|
@@ -159,10 +159,11 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
|
int handlerCount =
|
|
int handlerCount =
|
|
conf.getInt(DFS_NAMENODE_HANDLER_COUNT_KEY,
|
|
conf.getInt(DFS_NAMENODE_HANDLER_COUNT_KEY,
|
|
DFS_NAMENODE_HANDLER_COUNT_DEFAULT);
|
|
DFS_NAMENODE_HANDLER_COUNT_DEFAULT);
|
|
- InetSocketAddress socAddr = nn.getRpcServerAddress(conf);
|
|
|
|
- RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class,
|
|
|
|
- ProtobufRpcEngine.class);
|
|
|
|
- ClientNamenodeProtocolServerSideTranslatorPB
|
|
|
|
|
|
+
|
|
|
|
+ RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class,
|
|
|
|
+ ProtobufRpcEngine.class);
|
|
|
|
+
|
|
|
|
+ ClientNamenodeProtocolServerSideTranslatorPB
|
|
clientProtocolServerTranslator =
|
|
clientProtocolServerTranslator =
|
|
new ClientNamenodeProtocolServerSideTranslatorPB(this);
|
|
new ClientNamenodeProtocolServerSideTranslatorPB(this);
|
|
BlockingService clientNNPbService = ClientNamenodeProtocol.
|
|
BlockingService clientNNPbService = ClientNamenodeProtocol.
|
|
@@ -199,22 +200,24 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
|
.newReflectiveBlockingService(haServiceProtocolXlator);
|
|
.newReflectiveBlockingService(haServiceProtocolXlator);
|
|
|
|
|
|
WritableRpcEngine.ensureInitialized();
|
|
WritableRpcEngine.ensureInitialized();
|
|
-
|
|
|
|
- InetSocketAddress dnSocketAddr = nn.getServiceRpcServerAddress(conf);
|
|
|
|
- if (dnSocketAddr != null) {
|
|
|
|
|
|
+
|
|
|
|
+ InetSocketAddress serviceRpcAddr = nn.getServiceRpcServerAddress(conf);
|
|
|
|
+ if (serviceRpcAddr != null) {
|
|
int serviceHandlerCount =
|
|
int serviceHandlerCount =
|
|
conf.getInt(DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
|
|
conf.getInt(DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
|
|
DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
|
|
DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
|
|
- // Add all the RPC protocols that the namenode implements
|
|
|
|
- this.serviceRpcServer = new RPC.Builder(conf)
|
|
|
|
|
|
+ serviceRpcServer = new RPC.Builder(conf)
|
|
.setProtocol(
|
|
.setProtocol(
|
|
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)
|
|
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)
|
|
.setInstance(clientNNPbService)
|
|
.setInstance(clientNNPbService)
|
|
- .setBindAddress(dnSocketAddr.getHostName())
|
|
|
|
- .setPort(dnSocketAddr.getPort()).setNumHandlers(serviceHandlerCount)
|
|
|
|
|
|
+ .setBindAddress(serviceRpcAddr.getHostName())
|
|
|
|
+ .setPort(serviceRpcAddr.getPort())
|
|
|
|
+ .setNumHandlers(serviceHandlerCount)
|
|
.setVerbose(false)
|
|
.setVerbose(false)
|
|
.setSecretManager(namesystem.getDelegationTokenSecretManager())
|
|
.setSecretManager(namesystem.getDelegationTokenSecretManager())
|
|
.build();
|
|
.build();
|
|
|
|
+
|
|
|
|
+ // Add all the RPC protocols that the namenode implements
|
|
DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService,
|
|
DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService,
|
|
serviceRpcServer);
|
|
serviceRpcServer);
|
|
DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,
|
|
DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,
|
|
@@ -228,20 +231,26 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
|
DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class,
|
|
DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class,
|
|
getUserMappingService, serviceRpcServer);
|
|
getUserMappingService, serviceRpcServer);
|
|
|
|
|
|
- this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress();
|
|
|
|
|
|
+ serviceRPCAddress = serviceRpcServer.getListenerAddress();
|
|
nn.setRpcServiceServerAddress(conf, serviceRPCAddress);
|
|
nn.setRpcServiceServerAddress(conf, serviceRPCAddress);
|
|
} else {
|
|
} else {
|
|
serviceRpcServer = null;
|
|
serviceRpcServer = null;
|
|
serviceRPCAddress = null;
|
|
serviceRPCAddress = null;
|
|
}
|
|
}
|
|
- // Add all the RPC protocols that the namenode implements
|
|
|
|
- this.clientRpcServer = new RPC.Builder(conf)
|
|
|
|
|
|
+
|
|
|
|
+ InetSocketAddress rpcAddr = nn.getRpcServerAddress(conf);
|
|
|
|
+ clientRpcServer = new RPC.Builder(conf)
|
|
.setProtocol(
|
|
.setProtocol(
|
|
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)
|
|
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)
|
|
- .setInstance(clientNNPbService).setBindAddress(socAddr.getHostName())
|
|
|
|
- .setPort(socAddr.getPort()).setNumHandlers(handlerCount)
|
|
|
|
|
|
+ .setInstance(clientNNPbService)
|
|
|
|
+ .setBindAddress(rpcAddr.getHostName())
|
|
|
|
+ .setPort(rpcAddr.getPort())
|
|
|
|
+ .setNumHandlers(handlerCount)
|
|
.setVerbose(false)
|
|
.setVerbose(false)
|
|
- .setSecretManager(namesystem.getDelegationTokenSecretManager()).build();
|
|
|
|
|
|
+ .setSecretManager(namesystem.getDelegationTokenSecretManager())
|
|
|
|
+ .build();
|
|
|
|
+
|
|
|
|
+ // Add all the RPC protocols that the namenode implements
|
|
DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService,
|
|
DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService,
|
|
clientRpcServer);
|
|
clientRpcServer);
|
|
DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,
|
|
DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,
|
|
@@ -259,44 +268,51 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
|
if (serviceAuthEnabled =
|
|
if (serviceAuthEnabled =
|
|
conf.getBoolean(
|
|
conf.getBoolean(
|
|
CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {
|
|
CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {
|
|
- this.clientRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
|
|
|
|
- if (this.serviceRpcServer != null) {
|
|
|
|
- this.serviceRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
|
|
|
|
|
|
+ clientRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
|
|
|
|
+ if (serviceRpcServer != null) {
|
|
|
|
+ serviceRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
// The rpc-server port can be ephemeral... ensure we have the correct info
|
|
// The rpc-server port can be ephemeral... ensure we have the correct info
|
|
- this.clientRpcAddress = this.clientRpcServer.getListenerAddress();
|
|
|
|
|
|
+ clientRpcAddress = clientRpcServer.getListenerAddress();
|
|
nn.setRpcServerAddress(conf, clientRpcAddress);
|
|
nn.setRpcServerAddress(conf, clientRpcAddress);
|
|
|
|
|
|
- this.minimumDataNodeVersion = conf.get(
|
|
|
|
|
|
+ minimumDataNodeVersion = conf.get(
|
|
DFSConfigKeys.DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_KEY,
|
|
DFSConfigKeys.DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_KEY,
|
|
DFSConfigKeys.DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_DEFAULT);
|
|
DFSConfigKeys.DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_DEFAULT);
|
|
|
|
|
|
// Set terse exception whose stack trace won't be logged
|
|
// Set terse exception whose stack trace won't be logged
|
|
- this.clientRpcServer.addTerseExceptions(SafeModeException.class);
|
|
|
|
|
|
+ clientRpcServer.addTerseExceptions(SafeModeException.class);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
- * Actually start serving requests.
|
|
|
|
|
|
+ * Start client and service RPC servers.
|
|
*/
|
|
*/
|
|
void start() {
|
|
void start() {
|
|
- clientRpcServer.start(); //start RPC server
|
|
|
|
|
|
+ clientRpcServer.start();
|
|
if (serviceRpcServer != null) {
|
|
if (serviceRpcServer != null) {
|
|
serviceRpcServer.start();
|
|
serviceRpcServer.start();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
- * Wait until the RPC server has shut down.
|
|
|
|
|
|
+ * Wait until the client RPC server has shutdown.
|
|
*/
|
|
*/
|
|
void join() throws InterruptedException {
|
|
void join() throws InterruptedException {
|
|
- this.clientRpcServer.join();
|
|
|
|
|
|
+ clientRpcServer.join();
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Stop client and service RPC servers.
|
|
|
|
+ */
|
|
void stop() {
|
|
void stop() {
|
|
- if(clientRpcServer != null) clientRpcServer.stop();
|
|
|
|
- if(serviceRpcServer != null) serviceRpcServer.stop();
|
|
|
|
|
|
+ if (clientRpcServer != null) {
|
|
|
|
+ clientRpcServer.stop();
|
|
|
|
+ }
|
|
|
|
+ if (serviceRpcServer != null) {
|
|
|
|
+ serviceRpcServer.stop();
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
InetSocketAddress getServiceRpcAddress() {
|
|
InetSocketAddress getServiceRpcAddress() {
|
|
@@ -333,8 +349,9 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
|
namesystem.checkOperation(OperationCategory.UNCHECKED);
|
|
namesystem.checkOperation(OperationCategory.UNCHECKED);
|
|
verifyRequest(registration);
|
|
verifyRequest(registration);
|
|
LOG.info("Error report from " + registration + ": " + msg);
|
|
LOG.info("Error report from " + registration + ": " + msg);
|
|
- if(errorCode == FATAL)
|
|
|
|
|
|
+ if (errorCode == FATAL) {
|
|
namesystem.releaseBackupNode(registration);
|
|
namesystem.releaseBackupNode(registration);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
@Override // NamenodeProtocol
|
|
@Override // NamenodeProtocol
|