|
@@ -111,7 +111,7 @@ public class NameNode implements ClientProtocol, DatanodeProtocol,
|
|
|
}
|
|
|
|
|
|
public long getProtocolVersion(String protocol,
|
|
|
- long clientVersion) throws IOException {
|
|
|
+ long clientVersion) throws IOException {
|
|
|
if (protocol.equals(ClientProtocol.class.getName())) {
|
|
|
return ClientProtocol.versionID;
|
|
|
} else if (protocol.equals(DatanodeProtocol.class.getName())){
|
|
@@ -134,8 +134,17 @@ public class NameNode implements ClientProtocol, DatanodeProtocol,
|
|
|
public FSNamesystem namesystem; // TODO: This should private. Use getNamesystem() instead.
|
|
|
/** RPC server */
|
|
|
private Server server;
|
|
|
+ /** RPC server for HDFS Services communication.
|
|
|
+ BackupNode, Datanodes and all other services
|
|
|
+ should be connecting to this server if it is
|
|
|
+ configured. Clients should only go to NameNode#server
|
|
|
+ */
|
|
|
+ private Server serviceRpcServer;
|
|
|
+
|
|
|
/** RPC server address */
|
|
|
private InetSocketAddress serverAddress = null;
|
|
|
+ /** RPC server for DN address */
|
|
|
+ protected InetSocketAddress serviceRPCAddress = null;
|
|
|
/** httpServer */
|
|
|
private HttpServer httpServer;
|
|
|
/** HTTP server address */
|
|
@@ -166,6 +175,32 @@ public class NameNode implements ClientProtocol, DatanodeProtocol,
|
|
|
return NetUtils.createSocketAddr(address, DEFAULT_PORT);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Set the configuration property for the service rpc address
|
|
|
+ * to address
|
|
|
+ */
|
|
|
+ public static void setServiceAddress(Configuration conf,
|
|
|
+ String address) {
|
|
|
+ LOG.info("Setting ADDRESS " + address);
|
|
|
+ conf.set(DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, address);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Fetches the address for services to use when connecting to namenode
|
|
|
+ * based on the value of fallback returns null if the special
|
|
|
+ * address is not specified or returns the default namenode address
|
|
|
+ * to be used by both clients and services.
|
|
|
+ * Services here are datanodes, backup node, any non client connection
|
|
|
+ */
|
|
|
+ public static InetSocketAddress getServiceAddress(Configuration conf,
|
|
|
+ boolean fallback) {
|
|
|
+ String addr = conf.get(DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY);
|
|
|
+ if (addr == null || addr.isEmpty()) {
|
|
|
+ return fallback ? getAddress(conf) : null;
|
|
|
+ }
|
|
|
+ return getAddress(addr);
|
|
|
+ }
|
|
|
+
|
|
|
public static InetSocketAddress getAddress(Configuration conf) {
|
|
|
return getAddress(FileSystem.getDefaultUri(conf).getAuthority());
|
|
|
}
|
|
@@ -176,6 +211,25 @@ public class NameNode implements ClientProtocol, DatanodeProtocol,
|
|
|
return URI.create("hdfs://"+ namenode.getHostName()+portString);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Given a configuration get the address of the service rpc server
|
|
|
+ * If the service rpc is not configured returns null
|
|
|
+ */
|
|
|
+ protected InetSocketAddress getServiceRpcServerAddress(Configuration conf)
|
|
|
+ throws IOException {
|
|
|
+ return NameNode.getServiceAddress(conf, false);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Modifies the configuration passed to contain the service rpc address setting
|
|
|
+ */
|
|
|
+ protected void setRpcServiceServerAddress(Configuration conf) {
|
|
|
+ String address = serviceRPCAddress.getHostName() + ":"
|
|
|
+ + serviceRPCAddress.getPort();
|
|
|
+ setServiceAddress(conf, address);
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Initialize name-node.
|
|
|
*
|
|
@@ -202,7 +256,18 @@ public class NameNode implements ClientProtocol, DatanodeProtocol,
|
|
|
namesystem.activateSecretManager();
|
|
|
}
|
|
|
|
|
|
- // create rpc server
|
|
|
+ // create rpc server
|
|
|
+ InetSocketAddress dnSocketAddr = getServiceRpcServerAddress(conf);
|
|
|
+ if (dnSocketAddr != null) {
|
|
|
+ int serviceHandlerCount =
|
|
|
+ conf.getInt(DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
|
|
|
+ DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
|
|
|
+ this.serviceRpcServer = RPC.getServer(this, dnSocketAddr.getHostName(),
|
|
|
+ dnSocketAddr.getPort(), serviceHandlerCount,
|
|
|
+ false, conf, namesystem.getDelegationTokenSecretManager());
|
|
|
+ this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress();
|
|
|
+ setRpcServiceServerAddress(conf);
|
|
|
+ }
|
|
|
this.server = RPC.getServer(this, socAddr.getHostName(),
|
|
|
socAddr.getPort(), handlerCount, false, conf, namesystem
|
|
|
.getDelegationTokenSecretManager());
|
|
@@ -216,6 +281,9 @@ public class NameNode implements ClientProtocol, DatanodeProtocol,
|
|
|
|
|
|
startHttpServer(conf);
|
|
|
this.server.start(); //start RPC server
|
|
|
+ if (serviceRpcServer != null) {
|
|
|
+ serviceRpcServer.start();
|
|
|
+ }
|
|
|
startTrashEmptier(conf);
|
|
|
}
|
|
|
|
|
@@ -395,6 +463,7 @@ public class NameNode implements ClientProtocol, DatanodeProtocol,
|
|
|
if(namesystem != null) namesystem.close();
|
|
|
if(emptier != null) emptier.interrupt();
|
|
|
if(server != null) server.stop();
|
|
|
+ if(serviceRpcServer != null) serviceRpcServer.stop();
|
|
|
if (myMetrics != null) {
|
|
|
myMetrics.shutdown();
|
|
|
}
|