|
@@ -117,8 +117,8 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
|
|
private final InetSocketAddress serviceRPCAddress;
|
|
|
|
|
|
/** The RPC server that listens to requests from clients */
|
|
|
- protected final RPC.Server server;
|
|
|
- protected final InetSocketAddress rpcAddress;
|
|
|
+ protected final RPC.Server clientRpcServer;
|
|
|
+ protected final InetSocketAddress clientRpcAddress;
|
|
|
|
|
|
public NameNodeRpcServer(Configuration conf, NameNode nn)
|
|
|
throws IOException {
|
|
@@ -130,15 +130,30 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
|
|
conf.getInt(DFS_DATANODE_HANDLER_COUNT_KEY,
|
|
|
DFS_DATANODE_HANDLER_COUNT_DEFAULT);
|
|
|
InetSocketAddress socAddr = nn.getRpcServerAddress(conf);
|
|
|
-
|
|
|
+ ClientNamenodeProtocolServerSideTranslatorR23
|
|
|
+ clientProtocolServerTranslator =
|
|
|
+ new ClientNamenodeProtocolServerSideTranslatorR23(this);
|
|
|
+
|
|
|
InetSocketAddress dnSocketAddr = nn.getServiceRpcServerAddress(conf);
|
|
|
if (dnSocketAddr != null) {
|
|
|
int serviceHandlerCount =
|
|
|
conf.getInt(DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
|
|
|
DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
|
|
|
- this.serviceRpcServer = RPC.getServer(NamenodeProtocols.class, this,
|
|
|
- dnSocketAddr.getHostName(), dnSocketAddr.getPort(), serviceHandlerCount,
|
|
|
+ // Add all the RPC protocols that the namenode implements
|
|
|
+ this.serviceRpcServer =
|
|
|
+ RPC.getServer(org.apache.hadoop.hdfs.protocolR23Compatible.
|
|
|
+ ClientNamenodeWireProtocol.class, clientProtocolServerTranslator,
|
|
|
+ dnSocketAddr.getHostName(), dnSocketAddr.getPort(),
|
|
|
+ serviceHandlerCount,
|
|
|
false, conf, namesystem.getDelegationTokenSecretManager());
|
|
|
+ this.serviceRpcServer.addProtocol(DatanodeProtocol.class, this);
|
|
|
+ this.serviceRpcServer.addProtocol(NamenodeProtocol.class, this);
|
|
|
+ this.serviceRpcServer.addProtocol(
|
|
|
+ RefreshAuthorizationPolicyProtocol.class, this);
|
|
|
+ this.serviceRpcServer.addProtocol(
|
|
|
+ RefreshUserMappingsProtocol.class, this);
|
|
|
+ this.serviceRpcServer.addProtocol(GetUserMappingsProtocol.class, this);
|
|
|
+
|
|
|
this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress();
|
|
|
nn.setRpcServiceServerAddress(conf, serviceRPCAddress);
|
|
|
} else {
|
|
@@ -146,40 +161,40 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
|
|
serviceRPCAddress = null;
|
|
|
}
|
|
|
// Add all the RPC protocols that the namenode implements
|
|
|
- ClientNamenodeProtocolServerSideTranslatorR23 clientProtocolServerTranslator =
|
|
|
- new ClientNamenodeProtocolServerSideTranslatorR23(this);
|
|
|
- this.server = RPC.getServer(
|
|
|
- org.apache.hadoop.hdfs.protocolR23Compatible.ClientNamenodeWireProtocol.class,
|
|
|
+ this.clientRpcServer = RPC.getServer(
|
|
|
+ org.apache.hadoop.hdfs.protocolR23Compatible.
|
|
|
+ ClientNamenodeWireProtocol.class,
|
|
|
clientProtocolServerTranslator, socAddr.getHostName(),
|
|
|
socAddr.getPort(), handlerCount, false, conf,
|
|
|
namesystem.getDelegationTokenSecretManager());
|
|
|
- this.server.addProtocol(DatanodeProtocol.class, this);
|
|
|
- this.server.addProtocol(NamenodeProtocol.class, this);
|
|
|
- this.server.addProtocol(RefreshAuthorizationPolicyProtocol.class, this);
|
|
|
- this.server.addProtocol(RefreshUserMappingsProtocol.class, this);
|
|
|
- this.server.addProtocol(GetUserMappingsProtocol.class, this);
|
|
|
+ this.clientRpcServer.addProtocol(DatanodeProtocol.class, this);
|
|
|
+ this.clientRpcServer.addProtocol(NamenodeProtocol.class, this);
|
|
|
+ this.clientRpcServer.addProtocol(
|
|
|
+ RefreshAuthorizationPolicyProtocol.class, this);
|
|
|
+ this.clientRpcServer.addProtocol(RefreshUserMappingsProtocol.class, this);
|
|
|
+ this.clientRpcServer.addProtocol(GetUserMappingsProtocol.class, this);
|
|
|
|
|
|
|
|
|
// set service-level authorization security policy
|
|
|
if (serviceAuthEnabled =
|
|
|
conf.getBoolean(
|
|
|
CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {
|
|
|
- this.server.refreshServiceAcl(conf, new HDFSPolicyProvider());
|
|
|
+ this.clientRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
|
|
|
if (this.serviceRpcServer != null) {
|
|
|
this.serviceRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// The rpc-server port can be ephemeral... ensure we have the correct info
|
|
|
- this.rpcAddress = this.server.getListenerAddress();
|
|
|
- nn.setRpcServerAddress(conf, rpcAddress);
|
|
|
+ this.clientRpcAddress = this.clientRpcServer.getListenerAddress();
|
|
|
+ nn.setRpcServerAddress(conf, clientRpcAddress);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Actually start serving requests.
|
|
|
*/
|
|
|
void start() {
|
|
|
- server.start(); //start RPC server
|
|
|
+ clientRpcServer.start(); //start RPC server
|
|
|
if (serviceRpcServer != null) {
|
|
|
serviceRpcServer.start();
|
|
|
}
|
|
@@ -189,11 +204,11 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
|
|
* Wait until the RPC server has shut down.
|
|
|
*/
|
|
|
void join() throws InterruptedException {
|
|
|
- this.server.join();
|
|
|
+ this.clientRpcServer.join();
|
|
|
}
|
|
|
|
|
|
void stop() {
|
|
|
- if(server != null) server.stop();
|
|
|
+ if(clientRpcServer != null) clientRpcServer.stop();
|
|
|
if(serviceRpcServer != null) serviceRpcServer.stop();
|
|
|
}
|
|
|
|
|
@@ -202,7 +217,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
|
|
}
|
|
|
|
|
|
InetSocketAddress getRpcAddress() {
|
|
|
- return rpcAddress;
|
|
|
+ return clientRpcAddress;
|
|
|
}
|
|
|
|
|
|
@Override // VersionedProtocol
|
|
@@ -875,7 +890,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
|
|
throw new AuthorizationException("Service Level Authorization not enabled!");
|
|
|
}
|
|
|
|
|
|
- this.server.refreshServiceAcl(new Configuration(), new HDFSPolicyProvider());
|
|
|
+ this.clientRpcServer.refreshServiceAcl(new Configuration(), new HDFSPolicyProvider());
|
|
|
if (this.serviceRpcServer != null) {
|
|
|
this.serviceRpcServer.refreshServiceAcl(new Configuration(), new HDFSPolicyProvider());
|
|
|
}
|