|
@@ -19,19 +19,13 @@
|
|
|
package org.apache.ambari.view.utils.hdfs;
|
|
|
|
|
|
import org.apache.ambari.view.ViewContext;
|
|
|
-import org.apache.ambari.view.utils.ambari.AmbariApi;
|
|
|
-import org.apache.ambari.view.utils.ambari.NoClusterAssociatedException;
|
|
|
import org.apache.commons.lang3.StringUtils;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
-import org.apache.hadoop.fs.LocalFileSystem;
|
|
|
-import org.apache.hadoop.fs.azure.NativeAzureFileSystem;
|
|
|
-import org.apache.hadoop.fs.azure.Wasb;
|
|
|
-import org.apache.hadoop.hdfs.DistributedFileSystem;
|
|
|
-import org.apache.hadoop.hdfs.web.SWebHdfsFileSystem;
|
|
|
-import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
+import java.io.IOException;
|
|
|
+import java.io.StringWriter;
|
|
|
import java.net.URI;
|
|
|
import java.net.URISyntaxException;
|
|
|
import java.util.Map;
|
|
@@ -47,27 +41,27 @@ public class ConfigurationBuilder {
|
|
|
public static final String HDFS_SITE = "hdfs-site";
|
|
|
|
|
|
public static final String DEFAULT_FS_INSTANCE_PROPERTY = "webhdfs.url";
|
|
|
- public static final String DEFAULT_FS_CLUSTER_PROPERTY = "fs.defaultFS";
|
|
|
+ public static final String DEFAULT_FS_CLUSTER_PROPERTY = "fs.defaultFS";
|
|
|
|
|
|
public static final String NAMESERVICES_INSTANCE_PROPERTY = "webhdfs.nameservices";
|
|
|
- public static final String NAMESERVICES_CLUSTER_PROPERTY = "dfs.nameservices";
|
|
|
+ public static final String NAMESERVICES_CLUSTER_PROPERTY = "dfs.nameservices";
|
|
|
public static final String HA_NAMENODES_INSTANCE_PROPERTY = "webhdfs.ha.namenodes.list";
|
|
|
|
|
|
- public static final String HA_NAMENODES_CLUSTER_PROPERTY = "dfs.ha.namenodes.%s";
|
|
|
+ public static final String HA_NAMENODES_CLUSTER_PROPERTY = "dfs.ha.namenodes.%s";
|
|
|
public static final String NAMENODE_RPC_NN1_INSTANCE_PROPERTY = "webhdfs.ha.namenode.rpc-address.nn1";
|
|
|
public static final String NAMENODE_RPC_NN2_INSTANCE_PROPERTY = "webhdfs.ha.namenode.rpc-address.nn2";
|
|
|
- public static final String NAMENODE_RPC_NN_CLUSTER_PROPERTY = "dfs.namenode.rpc-address.%s.%s";
|
|
|
+ public static final String NAMENODE_RPC_NN_CLUSTER_PROPERTY = "dfs.namenode.rpc-address.%s.%s";
|
|
|
|
|
|
public static final String NAMENODE_HTTP_NN1_INSTANCE_PROPERTY = "webhdfs.ha.namenode.http-address.nn1";
|
|
|
public static final String NAMENODE_HTTP_NN2_INSTANCE_PROPERTY = "webhdfs.ha.namenode.http-address.nn2";
|
|
|
- public static final String NAMENODE_HTTP_NN_CLUSTER_PROPERTY = "dfs.namenode.http-address.%s.%s";
|
|
|
+ public static final String NAMENODE_HTTP_NN_CLUSTER_PROPERTY = "dfs.namenode.http-address.%s.%s";
|
|
|
|
|
|
public static final String NAMENODE_HTTPS_NN1_INSTANCE_PROPERTY = "webhdfs.ha.namenode.https-address.nn1";
|
|
|
public static final String NAMENODE_HTTPS_NN2_INSTANCE_PROPERTY = "webhdfs.ha.namenode.https-address.nn2";
|
|
|
- public static final String NAMENODE_HTTPS_NN_CLUSTER_PROPERTY = "dfs.namenode.https-address.%s.%s";
|
|
|
+ public static final String NAMENODE_HTTPS_NN_CLUSTER_PROPERTY = "dfs.namenode.https-address.%s.%s";
|
|
|
|
|
|
public static final String FAILOVER_PROXY_PROVIDER_INSTANCE_PROPERTY = "webhdfs.client.failover.proxy.provider";
|
|
|
- public static final String FAILOVER_PROXY_PROVIDER_CLUSTER_PROPERTY = "dfs.client.failover.proxy.provider.%s";
|
|
|
+ public static final String FAILOVER_PROXY_PROVIDER_CLUSTER_PROPERTY = "dfs.client.failover.proxy.provider.%s";
|
|
|
|
|
|
public static final String UMASK_CLUSTER_PROPERTY = "fs.permissions.umask-mode";
|
|
|
public static final String UMASK_INSTANCE_PROPERTY = "hdfs.umask-mode";
|
|
@@ -85,6 +79,8 @@ public class ConfigurationBuilder {
|
|
|
private AuthConfigurationBuilder authParamsBuilder;
|
|
|
private Map<String, String> authParams;
|
|
|
private URI defaultFsUri;
|
|
|
+ private Map<String, String> customProperties;
|
|
|
+
|
|
|
/**
|
|
|
* Constructor of ConfigurationBuilder based on ViewContext
|
|
|
* @param context ViewContext
|
|
@@ -94,6 +90,17 @@ public class ConfigurationBuilder {
|
|
|
this.authParamsBuilder = new AuthConfigurationBuilder(context);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * takes context and any extra custom properties that needs to be included into config
|
|
|
+ * @param context
|
|
|
+ * @param customProperties
|
|
|
+ */
|
|
|
+ public ConfigurationBuilder(ViewContext context, Map<String, String> customProperties) {
|
|
|
+ this.context = context;
|
|
|
+ this.authParamsBuilder = new AuthConfigurationBuilder(context);
|
|
|
+ this.customProperties = customProperties;
|
|
|
+ }
|
|
|
+
|
|
|
private void parseProperties() throws HdfsApiException {
|
|
|
String defaultFS = getDefaultFS(context);
|
|
|
|
|
@@ -113,7 +120,7 @@ public class ConfigurationBuilder {
|
|
|
|
|
|
} catch (URISyntaxException e) {
|
|
|
throw new HdfsApiException("HDFS060 Invalid " + DEFAULT_FS_INSTANCE_PROPERTY +
|
|
|
- "='" + defaultFS + "' URI", e);
|
|
|
+ "='" + defaultFS + "' URI", e);
|
|
|
}
|
|
|
|
|
|
conf.set("fs.defaultFS", defaultFS);
|
|
@@ -128,30 +135,30 @@ public class ConfigurationBuilder {
|
|
|
|
|
|
defaultFS = addProtocolIfMissing(defaultFS);
|
|
|
|
|
|
- if(context.getCluster() != null){
|
|
|
+ if (context.getCluster() != null) {
|
|
|
try {
|
|
|
URI fsUri = new URI(defaultFS);
|
|
|
String protocol = fsUri.getScheme();
|
|
|
String hostWithPort = defaultFS.substring(protocol.length() + 3);
|
|
|
|
|
|
- Boolean webHdfsEnabled = Boolean.valueOf(getProperty(HDFS_SITE,DFS_WEBHDFS_ENABLED));
|
|
|
- Boolean isHttps = DFS_HTTP_POLICY_HTTPS_ONLY.equals(getProperty(HDFS_SITE,DFS_HTTP_POLICY));
|
|
|
+ Boolean webHdfsEnabled = Boolean.valueOf(getProperty(HDFS_SITE, DFS_WEBHDFS_ENABLED));
|
|
|
+ Boolean isHttps = DFS_HTTP_POLICY_HTTPS_ONLY.equals(getProperty(HDFS_SITE, DFS_HTTP_POLICY));
|
|
|
|
|
|
boolean isHA = isHAEnabled(defaultFS);
|
|
|
|
|
|
- if(webHdfsEnabled && isHttps){
|
|
|
+ if (webHdfsEnabled && isHttps && "hdfs".equals(protocol)) {
|
|
|
protocol = "swebhdfs";
|
|
|
- String httpAddr = getProperty(HDFS_SITE,DFS_NAMENODE_HTTPS_ADDERSS);
|
|
|
- if(!isHA && httpAddr != null) hostWithPort = httpAddr ;
|
|
|
- }else if(webHdfsEnabled){
|
|
|
+ String httpAddr = getProperty(HDFS_SITE, DFS_NAMENODE_HTTPS_ADDERSS);
|
|
|
+ if (!isHA && httpAddr != null) hostWithPort = httpAddr;
|
|
|
+ } else if (webHdfsEnabled && "hdfs".equals(protocol)) {
|
|
|
protocol = "webhdfs";
|
|
|
- String httpsAddr = getProperty(HDFS_SITE,DFS_NAMENODE_HTTP_ADDERSS);
|
|
|
- if(!isHA) hostWithPort = httpsAddr;
|
|
|
+ String httpsAddr = getProperty(HDFS_SITE, DFS_NAMENODE_HTTP_ADDERSS);
|
|
|
+ if (!isHA) hostWithPort = httpsAddr;
|
|
|
}
|
|
|
|
|
|
- return protocol + "://" +hostWithPort;
|
|
|
+ return protocol + "://" + hostWithPort;
|
|
|
} catch (URISyntaxException e) {
|
|
|
- throw new HdfsApiException("Invalid URI format."+e.getMessage(),e);
|
|
|
+ throw new HdfsApiException("Invalid URI format." + e.getMessage(), e);
|
|
|
}
|
|
|
}
|
|
|
return defaultFS;
|
|
@@ -160,7 +167,7 @@ public class ConfigurationBuilder {
|
|
|
private String getProperty(String type, String key, String instanceProperty) {
|
|
|
String value;
|
|
|
|
|
|
- if(context.getCluster() != null) {
|
|
|
+ if (context.getCluster() != null) {
|
|
|
value = context.getCluster().getConfigurationValue(type, key);
|
|
|
} else {
|
|
|
value = context.getProperties().get(instanceProperty);
|
|
@@ -168,9 +175,9 @@ public class ConfigurationBuilder {
|
|
|
return value;
|
|
|
}
|
|
|
|
|
|
- private String getProperty(String type,String key){
|
|
|
- if(context.getCluster() != null){
|
|
|
- return context.getCluster().getConfigurationValue(type,key);
|
|
|
+ private String getProperty(String type, String key) {
|
|
|
+ if (context.getCluster() != null) {
|
|
|
+ return context.getCluster().getConfigurationValue(type, key);
|
|
|
}
|
|
|
return null;
|
|
|
}
|
|
@@ -178,7 +185,7 @@ public class ConfigurationBuilder {
|
|
|
private void copyPropertyIfExists(String type, String key) {
|
|
|
String value;
|
|
|
|
|
|
- if(context.getCluster() != null) {
|
|
|
+ if (context.getCluster() != null) {
|
|
|
value = context.getCluster().getConfigurationValue(type, key);
|
|
|
if (value != null) {
|
|
|
conf.set(key, value);
|
|
@@ -191,13 +198,35 @@ public class ConfigurationBuilder {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private void copyPropertiesBySite(String type) {
|
|
|
+ if (context.getCluster() != null) {
|
|
|
+ Map<String, String> configs = context.getCluster().getConfigByType(type);
|
|
|
+ LOG.debug("configs from core-site : {}", configs);
|
|
|
+ copyProperties(configs);
|
|
|
+ } else {
|
|
|
+ LOG.error("Cannot find cluster.");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void copyProperties(Map<String, String> configs) {
|
|
|
+ if (null != configs) {
|
|
|
+ for(Map.Entry<String, String> entry : configs.entrySet()){
|
|
|
+ String key = entry.getKey();
|
|
|
+ String value = entry.getValue();
|
|
|
+ conf.set(key, value);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ LOG.error("configs were null.");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
private void copyHAProperties(String defaultFS) throws URISyntaxException, HdfsApiException {
|
|
|
URI uri = new URI(defaultFS);
|
|
|
String nameservice = uri.getHost();
|
|
|
|
|
|
copyClusterProperty(NAMESERVICES_CLUSTER_PROPERTY, NAMESERVICES_INSTANCE_PROPERTY);
|
|
|
String namenodeIDs = copyClusterProperty(String.format(HA_NAMENODES_CLUSTER_PROPERTY, nameservice),
|
|
|
- HA_NAMENODES_INSTANCE_PROPERTY);
|
|
|
+ HA_NAMENODES_INSTANCE_PROPERTY);
|
|
|
|
|
|
String[] namenodes = namenodeIDs.split(",");
|
|
|
if (namenodes.length != 2) {
|
|
@@ -205,22 +234,22 @@ public class ConfigurationBuilder {
|
|
|
}
|
|
|
//NN1
|
|
|
copyClusterProperty(String.format(NAMENODE_RPC_NN_CLUSTER_PROPERTY, nameservice, namenodes[0]),
|
|
|
- NAMENODE_RPC_NN1_INSTANCE_PROPERTY);
|
|
|
+ NAMENODE_RPC_NN1_INSTANCE_PROPERTY);
|
|
|
copyClusterProperty(String.format(NAMENODE_HTTP_NN_CLUSTER_PROPERTY, nameservice, namenodes[0]),
|
|
|
- NAMENODE_HTTP_NN1_INSTANCE_PROPERTY);
|
|
|
+ NAMENODE_HTTP_NN1_INSTANCE_PROPERTY);
|
|
|
copyClusterProperty(String.format(NAMENODE_HTTPS_NN_CLUSTER_PROPERTY, nameservice, namenodes[0]),
|
|
|
NAMENODE_HTTPS_NN1_INSTANCE_PROPERTY);
|
|
|
|
|
|
//NN2
|
|
|
copyClusterProperty(String.format(NAMENODE_RPC_NN_CLUSTER_PROPERTY, nameservice, namenodes[1]),
|
|
|
- NAMENODE_RPC_NN2_INSTANCE_PROPERTY);
|
|
|
+ NAMENODE_RPC_NN2_INSTANCE_PROPERTY);
|
|
|
copyClusterProperty(String.format(NAMENODE_HTTP_NN_CLUSTER_PROPERTY, nameservice, namenodes[1]),
|
|
|
- NAMENODE_HTTP_NN2_INSTANCE_PROPERTY);
|
|
|
+ NAMENODE_HTTP_NN2_INSTANCE_PROPERTY);
|
|
|
copyClusterProperty(String.format(NAMENODE_HTTPS_NN_CLUSTER_PROPERTY, nameservice, namenodes[1]),
|
|
|
NAMENODE_HTTPS_NN2_INSTANCE_PROPERTY);
|
|
|
|
|
|
copyClusterProperty(String.format(FAILOVER_PROXY_PROVIDER_CLUSTER_PROPERTY, nameservice),
|
|
|
- FAILOVER_PROXY_PROVIDER_INSTANCE_PROPERTY);
|
|
|
+ FAILOVER_PROXY_PROVIDER_INSTANCE_PROPERTY);
|
|
|
}
|
|
|
|
|
|
private String copyClusterProperty(String propertyName, String instancePropertyName) {
|
|
@@ -236,7 +265,7 @@ public class ConfigurationBuilder {
|
|
|
URI uri = new URI(defaultFS);
|
|
|
String nameservice = uri.getHost();
|
|
|
String namenodeIDs = getProperty(HDFS_SITE, String.format(HA_NAMENODES_CLUSTER_PROPERTY, nameservice),
|
|
|
- HA_NAMENODES_INSTANCE_PROPERTY);
|
|
|
+ HA_NAMENODES_INSTANCE_PROPERTY);
|
|
|
return namenodeIDs != null;
|
|
|
}
|
|
|
|
|
@@ -280,49 +309,35 @@ public class ConfigurationBuilder {
|
|
|
public Configuration buildConfig() throws HdfsApiException {
|
|
|
parseProperties();
|
|
|
setAuthParams(buildAuthenticationConfig());
|
|
|
+ copyPropertiesBySite(CORE_SITE);
|
|
|
+ copyPropertiesBySite(HDFS_SITE);
|
|
|
|
|
|
String umask = context.getProperties().get(UMASK_INSTANCE_PROPERTY);
|
|
|
- if(umask != null && !umask.isEmpty()) conf.set(UMASK_CLUSTER_PROPERTY,umask);
|
|
|
+ if (umask != null && !umask.isEmpty()) conf.set(UMASK_CLUSTER_PROPERTY, umask);
|
|
|
|
|
|
- conf.set("fs.hdfs.impl", DistributedFileSystem.class.getName());
|
|
|
- conf.set("fs.webhdfs.impl", WebHdfsFileSystem.class.getName());
|
|
|
- conf.set("fs.file.impl", LocalFileSystem.class.getName());
|
|
|
- conf.set("fs.swebhdfs.impl", SWebHdfsFileSystem.class.getName());
|
|
|
-
|
|
|
- configureWASB();
|
|
|
- configureADL();
|
|
|
+ if(null != this.customProperties){
|
|
|
+ copyProperties(this.customProperties);
|
|
|
+ }
|
|
|
+
|
|
|
+ if(LOG.isDebugEnabled()){
|
|
|
+ LOG.debug("final conf : {}", printConf());
|
|
|
+ }
|
|
|
|
|
|
return conf;
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Fill Azure Blob Storage properties if wasb:// scheme configured
|
|
|
- */
|
|
|
- public void configureWASB() {
|
|
|
- LOG.debug("defaultFsUri.getScheme() == " + defaultFsUri.getScheme());
|
|
|
- if (defaultFsUri.getScheme().equals("wasb")) {
|
|
|
- conf.set("fs.AbstractFileSystem.wasb.impl", Wasb.class.getName());
|
|
|
- conf.set("fs.wasb.impl", NativeAzureFileSystem.class.getName());
|
|
|
-
|
|
|
- String account = defaultFsUri.getHost();
|
|
|
- LOG.debug("WASB account == " + account);
|
|
|
- copyPropertyIfExists(CORE_SITE, "fs.azure.account.key." + account);
|
|
|
- copyPropertyIfExists(CORE_SITE, "fs.azure.account.keyprovider." + account);
|
|
|
- copyPropertyIfExists(CORE_SITE, "fs.azure.shellkeyprovider.script");
|
|
|
+ private String printConf() {
|
|
|
+ try {
|
|
|
+ StringWriter stringWriter = new StringWriter();
|
|
|
+ conf.writeXml(stringWriter);
|
|
|
+ stringWriter.close();
|
|
|
+ return stringWriter.toString().replace("\n", "");
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.error("error while converting conf to xml : ", e);
|
|
|
+ return "";
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Fill adl properties if adl:// scheme configured
|
|
|
- */
|
|
|
- public void configureADL() {
|
|
|
- if (defaultFsUri.getScheme().equals("adl")) {
|
|
|
- conf.set("fs.adl.impl", "com.microsoft.azure.datalake.store.AdlFileSystem");
|
|
|
- copyPropertyIfExists(CORE_SITE,"dfs.webhdfs.oauth2.access.token.provider");
|
|
|
- copyPropertyIfExists(CORE_SITE,"fs.azure.datalake.token.provider.service.urls");
|
|
|
- copyPropertyIfExists(CORE_SITE,"fs.azure.datalake.token.provider.script");
|
|
|
- }
|
|
|
- }
|
|
|
|
|
|
/**
|
|
|
* Builds the authentication configuration
|