|
@@ -90,9 +90,13 @@ import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
|
|
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
|
|
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
|
|
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
|
|
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
|
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol;
|
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol;
|
|
|
|
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.NamenodeProtocolService;
|
|
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB;
|
|
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB;
|
|
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB;
|
|
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB;
|
|
|
|
+import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
|
|
|
|
+import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolServerSideTranslatorPB;
|
|
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
|
|
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
|
|
|
|
+import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
|
|
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
|
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
|
import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
|
|
import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
|
|
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
|
|
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
|
|
@@ -102,11 +106,18 @@ import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
|
|
import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
|
|
import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
|
|
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
|
|
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
|
|
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
|
|
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
|
|
|
|
+import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
|
|
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
|
|
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
|
|
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
|
|
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
|
|
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
|
|
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
|
|
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
|
|
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
|
|
|
|
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
|
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
|
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
|
|
|
|
+import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
|
|
|
|
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
|
|
|
|
+import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
|
|
|
|
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
|
|
|
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
|
|
import org.apache.hadoop.io.EnumSetWritable;
|
|
import org.apache.hadoop.io.EnumSetWritable;
|
|
import org.apache.hadoop.io.Text;
|
|
import org.apache.hadoop.io.Text;
|
|
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
|
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
|
@@ -134,7 +145,8 @@ import com.google.protobuf.BlockingService;
|
|
* the requests to the active
|
|
* the requests to the active
|
|
* {@link org.apache.hadoop.hdfs.server.namenode.NameNode NameNode}.
|
|
* {@link org.apache.hadoop.hdfs.server.namenode.NameNode NameNode}.
|
|
*/
|
|
*/
|
|
-public class RouterRpcServer extends AbstractService implements ClientProtocol {
|
|
|
|
|
|
+public class RouterRpcServer extends AbstractService
|
|
|
|
+ implements ClientProtocol, NamenodeProtocol {
|
|
|
|
|
|
private static final Logger LOG =
|
|
private static final Logger LOG =
|
|
LoggerFactory.getLogger(RouterRpcServer.class);
|
|
LoggerFactory.getLogger(RouterRpcServer.class);
|
|
@@ -176,6 +188,9 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
|
|
private final ThreadLocal<OperationCategory> opCategory = new ThreadLocal<>();
|
|
private final ThreadLocal<OperationCategory> opCategory = new ThreadLocal<>();
|
|
|
|
|
|
|
|
|
|
|
|
+ /** NamenodeProtocol calls. */
|
|
|
|
+ private final RouterNamenodeProtocol nnProto;
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Construct a router RPC server.
|
|
* Construct a router RPC server.
|
|
*
|
|
*
|
|
@@ -226,6 +241,11 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
|
|
BlockingService clientNNPbService = ClientNamenodeProtocol
|
|
BlockingService clientNNPbService = ClientNamenodeProtocol
|
|
.newReflectiveBlockingService(clientProtocolServerTranslator);
|
|
.newReflectiveBlockingService(clientProtocolServerTranslator);
|
|
|
|
|
|
|
|
+ NamenodeProtocolServerSideTranslatorPB namenodeProtocolXlator =
|
|
|
|
+ new NamenodeProtocolServerSideTranslatorPB(this);
|
|
|
|
+ BlockingService nnPbService = NamenodeProtocolService
|
|
|
|
+ .newReflectiveBlockingService(namenodeProtocolXlator);
|
|
|
|
+
|
|
InetSocketAddress confRpcAddress = conf.getSocketAddr(
|
|
InetSocketAddress confRpcAddress = conf.getSocketAddr(
|
|
RBFConfigKeys.DFS_ROUTER_RPC_BIND_HOST_KEY,
|
|
RBFConfigKeys.DFS_ROUTER_RPC_BIND_HOST_KEY,
|
|
RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY,
|
|
RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY,
|
|
@@ -244,6 +264,11 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
|
|
.setQueueSizePerHandler(handlerQueueSize)
|
|
.setQueueSizePerHandler(handlerQueueSize)
|
|
.setVerbose(false)
|
|
.setVerbose(false)
|
|
.build();
|
|
.build();
|
|
|
|
+
|
|
|
|
+ // Add all the RPC protocols that the Router implements
|
|
|
|
+ DFSUtil.addPBProtocol(
|
|
|
|
+ conf, NamenodeProtocolPB.class, nnPbService, this.rpcServer);
|
|
|
|
+
|
|
// We don't want the server to log the full stack trace for some exceptions
|
|
// We don't want the server to log the full stack trace for some exceptions
|
|
this.rpcServer.addTerseExceptions(
|
|
this.rpcServer.addTerseExceptions(
|
|
RemoteException.class,
|
|
RemoteException.class,
|
|
@@ -271,6 +296,9 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
|
|
// Create the client
|
|
// Create the client
|
|
this.rpcClient = new RouterRpcClient(this.conf, this.router.getRouterId(),
|
|
this.rpcClient = new RouterRpcClient(this.conf, this.router.getRouterId(),
|
|
this.namenodeResolver, this.rpcMonitor);
|
|
this.namenodeResolver, this.rpcMonitor);
|
|
|
|
+
|
|
|
|
+ // Initialize modules
|
|
|
|
+ this.nnProto = new RouterNamenodeProtocol(this);
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
@@ -315,6 +343,15 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
|
|
return rpcClient;
|
|
return rpcClient;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Get the subcluster resolver.
|
|
|
|
+ *
|
|
|
|
+ * @return Subcluster resolver.
|
|
|
|
+ */
|
|
|
|
+ public FileSubclusterResolver getSubclusterResolver() {
|
|
|
|
+ return subclusterResolver;
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Get the RPC monitor and metrics.
|
|
* Get the RPC monitor and metrics.
|
|
*
|
|
*
|
|
@@ -1314,7 +1351,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
|
|
action, isChecked);
|
|
action, isChecked);
|
|
Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
|
|
Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
|
|
Map<FederationNamespaceInfo, Boolean> results =
|
|
Map<FederationNamespaceInfo, Boolean> results =
|
|
- rpcClient.invokeConcurrent(nss, method, true, true, boolean.class);
|
|
|
|
|
|
+ rpcClient.invokeConcurrent(nss, method, true, true, Boolean.class);
|
|
|
|
|
|
// We only report true if all the name space are in safe mode
|
|
// We only report true if all the name space are in safe mode
|
|
int numSafemode = 0;
|
|
int numSafemode = 0;
|
|
@@ -1334,7 +1371,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
|
|
new Class<?>[] {String.class}, arg);
|
|
new Class<?>[] {String.class}, arg);
|
|
final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
|
|
final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
|
|
Map<FederationNamespaceInfo, Boolean> ret =
|
|
Map<FederationNamespaceInfo, Boolean> ret =
|
|
- rpcClient.invokeConcurrent(nss, method, true, false, boolean.class);
|
|
|
|
|
|
+ rpcClient.invokeConcurrent(nss, method, true, false, Boolean.class);
|
|
|
|
|
|
boolean success = true;
|
|
boolean success = true;
|
|
for (boolean s : ret.values()) {
|
|
for (boolean s : ret.values()) {
|
|
@@ -1941,6 +1978,77 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
|
|
return null;
|
|
return null;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ @Override // NamenodeProtocol
|
|
|
|
+ public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size)
|
|
|
|
+ throws IOException {
|
|
|
|
+ return nnProto.getBlocks(datanode, size);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override // NamenodeProtocol
|
|
|
|
+ public ExportedBlockKeys getBlockKeys() throws IOException {
|
|
|
|
+ return nnProto.getBlockKeys();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override // NamenodeProtocol
|
|
|
|
+ public long getTransactionID() throws IOException {
|
|
|
|
+ return nnProto.getTransactionID();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override // NamenodeProtocol
|
|
|
|
+ public long getMostRecentCheckpointTxId() throws IOException {
|
|
|
|
+ return nnProto.getMostRecentCheckpointTxId();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override // NamenodeProtocol
|
|
|
|
+ public CheckpointSignature rollEditLog() throws IOException {
|
|
|
|
+ return nnProto.rollEditLog();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override // NamenodeProtocol
|
|
|
|
+ public NamespaceInfo versionRequest() throws IOException {
|
|
|
|
+ return nnProto.versionRequest();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override // NamenodeProtocol
|
|
|
|
+ public void errorReport(NamenodeRegistration registration, int errorCode,
|
|
|
|
+ String msg) throws IOException {
|
|
|
|
+ nnProto.errorReport(registration, errorCode, msg);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override // NamenodeProtocol
|
|
|
|
+ public NamenodeRegistration registerSubordinateNamenode(
|
|
|
|
+ NamenodeRegistration registration) throws IOException {
|
|
|
|
+ return nnProto.registerSubordinateNamenode(registration);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override // NamenodeProtocol
|
|
|
|
+ public NamenodeCommand startCheckpoint(NamenodeRegistration registration)
|
|
|
|
+ throws IOException {
|
|
|
|
+ return nnProto.startCheckpoint(registration);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override // NamenodeProtocol
|
|
|
|
+ public void endCheckpoint(NamenodeRegistration registration,
|
|
|
|
+ CheckpointSignature sig) throws IOException {
|
|
|
|
+ nnProto.endCheckpoint(registration, sig);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override // NamenodeProtocol
|
|
|
|
+ public RemoteEditLogManifest getEditLogManifest(long sinceTxId)
|
|
|
|
+ throws IOException {
|
|
|
|
+ return nnProto.getEditLogManifest(sinceTxId);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override // NamenodeProtocol
|
|
|
|
+ public boolean isUpgradeFinalized() throws IOException {
|
|
|
|
+ return nnProto.isUpgradeFinalized();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override // NamenodeProtocol
|
|
|
|
+ public boolean isRollingUpgrade() throws IOException {
|
|
|
|
+ return nnProto.isRollingUpgrade();
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Locate the location with the matching block pool id.
|
|
* Locate the location with the matching block pool id.
|
|
*
|
|
*
|