|
@@ -136,7 +136,7 @@ public class NameNode implements NamenodeProtocols, FSConstants {
|
|
|
}
|
|
|
|
|
|
public long getProtocolVersion(String protocol,
|
|
|
- long clientVersion) throws IOException {
|
|
|
+ long clientVersion) throws IOException {
|
|
|
if (protocol.equals(ClientProtocol.class.getName())) {
|
|
|
return ClientProtocol.versionID;
|
|
|
} else if (protocol.equals(DatanodeProtocol.class.getName())){
|
|
@@ -159,10 +159,18 @@ public class NameNode implements NamenodeProtocols, FSConstants {
|
|
|
|
|
|
protected FSNamesystem namesystem;
|
|
|
protected NamenodeRole role;
|
|
|
- /** RPC server */
|
|
|
+ /** RPC server. */
|
|
|
protected Server server;
|
|
|
+ /** RPC server for HDFS Services communication.
|
|
|
+ BackupNode, Datanodes and all other services
|
|
|
+ should be connecting to this server if it is
|
|
|
+ configured. Clients should only go to NameNode#server
|
|
|
+ */
|
|
|
+ protected Server serviceRpcServer;
|
|
|
/** RPC server address */
|
|
|
protected InetSocketAddress rpcAddress = null;
|
|
|
+ /** RPC server for DN address */
|
|
|
+ protected InetSocketAddress serviceRPCAddress = null;
|
|
|
/** httpServer */
|
|
|
protected HttpServer httpServer;
|
|
|
/** HTTP server address */
|
|
@@ -204,6 +212,32 @@ public class NameNode implements NamenodeProtocols, FSConstants {
|
|
|
return NetUtils.createSocketAddr(address, DEFAULT_PORT);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Set the configuration property for the service rpc address
|
|
|
+ * to address
|
|
|
+ */
|
|
|
+ public static void setServiceAddress(Configuration conf,
|
|
|
+ String address) {
|
|
|
+ LOG.info("Setting ADDRESS " + address);
|
|
|
+ conf.set(DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, address);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Fetches the address for services to use when connecting to namenode
|
|
|
+ * based on the value of fallback returns null if the special
|
|
|
+ * address is not specified or returns the default namenode address
|
|
|
+ * to be used by both clients and services.
|
|
|
+ * Services here are datanodes, backup node, any non client connection
|
|
|
+ */
|
|
|
+ public static InetSocketAddress getServiceAddress(Configuration conf,
|
|
|
+ boolean fallback) {
|
|
|
+ String addr = conf.get(DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY);
|
|
|
+ if (addr == null || addr.isEmpty()) {
|
|
|
+ return fallback ? getAddress(conf) : null;
|
|
|
+ }
|
|
|
+ return getAddress(addr);
|
|
|
+ }
|
|
|
+
|
|
|
public static InetSocketAddress getAddress(Configuration conf) {
|
|
|
URI filesystemURI = FileSystem.getDefaultUri(conf);
|
|
|
String authority = filesystemURI.getAuthority();
|
|
@@ -247,9 +281,25 @@ public class NameNode implements NamenodeProtocols, FSConstants {
|
|
|
return role.equals(that);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Given a configuration get the address of the service rpc server
|
|
|
+ * If the service rpc is not configured returns null
|
|
|
+ */
|
|
|
+ protected InetSocketAddress getServiceRpcServerAddress(Configuration conf)
|
|
|
+ throws IOException {
|
|
|
+ return NameNode.getServiceAddress(conf, false);
|
|
|
+ }
|
|
|
+
|
|
|
protected InetSocketAddress getRpcServerAddress(Configuration conf) throws IOException {
|
|
|
return getAddress(conf);
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Modifies the configuration passed to contain the service rpc address setting
|
|
|
+ */
|
|
|
+ protected void setRpcServiceServerAddress(Configuration conf) {
|
|
|
+ setServiceAddress(conf, getHostPortString(serviceRPCAddress));
|
|
|
+ }
|
|
|
|
|
|
protected void setRpcServerAddress(Configuration conf) {
|
|
|
FileSystem.setDefaultUri(conf, getUri(rpcAddress));
|
|
@@ -298,7 +348,18 @@ public class NameNode implements NamenodeProtocols, FSConstants {
|
|
|
|
|
|
NameNode.initMetrics(conf, this.getRole());
|
|
|
loadNamesystem(conf);
|
|
|
- // create rpc server
|
|
|
+ // create rpc server
|
|
|
+ InetSocketAddress dnSocketAddr = getServiceRpcServerAddress(conf);
|
|
|
+ if (dnSocketAddr != null) {
|
|
|
+ int serviceHandlerCount =
|
|
|
+ conf.getInt(DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
|
|
|
+ DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
|
|
|
+ this.serviceRpcServer = RPC.getServer(NamenodeProtocols.class, this,
|
|
|
+ dnSocketAddr.getHostName(), dnSocketAddr.getPort(), serviceHandlerCount,
|
|
|
+ false, conf, namesystem.getDelegationTokenSecretManager());
|
|
|
+ this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress();
|
|
|
+ setRpcServiceServerAddress(conf);
|
|
|
+ }
|
|
|
this.server = RPC.getServer(NamenodeProtocols.class, this,
|
|
|
socAddr.getHostName(), socAddr.getPort(),
|
|
|
handlerCount, false, conf,
|
|
@@ -309,6 +370,9 @@ public class NameNode implements NamenodeProtocols, FSConstants {
|
|
|
|
|
|
activate(conf);
|
|
|
LOG.info(getRole() + " up at: " + rpcAddress);
|
|
|
+ if (serviceRPCAddress != null) {
|
|
|
+ LOG.info(getRole() + " service server is up at: " + serviceRPCAddress);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -322,6 +386,9 @@ public class NameNode implements NamenodeProtocols, FSConstants {
|
|
|
namesystem.activate(conf);
|
|
|
startHttpServer(conf);
|
|
|
server.start(); //start RPC server
|
|
|
+ if (serviceRpcServer != null) {
|
|
|
+ serviceRpcServer.start();
|
|
|
+ }
|
|
|
startTrashEmptier(conf);
|
|
|
|
|
|
plugins = conf.getInstances("dfs.namenode.plugins", ServicePlugin.class);
|
|
@@ -471,6 +538,7 @@ public class NameNode implements NamenodeProtocols, FSConstants {
|
|
|
if(namesystem != null) namesystem.close();
|
|
|
if(emptier != null) emptier.interrupt();
|
|
|
if(server != null) server.stop();
|
|
|
+ if(serviceRpcServer != null) serviceRpcServer.stop();
|
|
|
if (myMetrics != null) {
|
|
|
myMetrics.shutdown();
|
|
|
}
|