|
@@ -20,20 +20,7 @@ package org.apache.hadoop.ozone.storage;
|
|
|
|
|
|
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH;
|
|
|
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT;
|
|
|
-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_DEFAULT;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KEY;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY;
|
|
|
-import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_HANDLER_COUNT_DEFAULT;
|
|
|
-import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_LOCATION_HANDLER_COUNT_KEY;
|
|
|
-import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_LOCATION_RPC_ADDRESS_DEFAULT;
|
|
|
-import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_LOCATION_RPC_ADDRESS_KEY;
|
|
|
-import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_LOCATION_RPC_BIND_HOST_KEY;
|
|
|
+import static org.apache.hadoop.ozone.OzoneConfigKeys.*;
|
|
|
import static org.apache.hadoop.util.ExitUtil.terminate;
|
|
|
|
|
|
import java.io.IOException;
|
|
@@ -47,19 +34,21 @@ import com.google.common.annotations.VisibleForTesting;
|
|
|
import com.google.common.collect.Sets;
|
|
|
import com.google.protobuf.BlockingService;
|
|
|
|
|
|
-import org.slf4j.Logger;
|
|
|
-import org.slf4j.LoggerFactory;
|
|
|
|
|
|
-import org.apache.hadoop.classification.InterfaceAudience;
|
|
|
-import org.apache.hadoop.ha.HAServiceProtocol;
|
|
|
-import org.apache.hadoop.hdfs.DFSUtil;
|
|
|
-import org.apache.hadoop.hdfs.DFSUtilClient;
|
|
|
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
|
|
|
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
|
|
|
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerData;
|
|
|
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.CreateContainerRequestProto;
|
|
|
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result;
|
|
|
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Type;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
+
|
|
|
+
|
|
|
+import org.apache.hadoop.classification.InterfaceAudience;
|
|
|
+import org.apache.hadoop.ha.HAServiceProtocol;
|
|
|
+import org.apache.hadoop.hdfs.DFSUtil;
|
|
|
+
|
|
|
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
|
|
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
|
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
|
@@ -73,7 +62,6 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolServerSideTranslatorPB;
|
|
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
|
|
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
|
|
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
|
|
|
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
|
|
@@ -88,8 +76,8 @@ import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
|
|
|
import org.apache.hadoop.io.IOUtils;
|
|
|
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
|
|
import org.apache.hadoop.ipc.RPC;
|
|
|
-import org.apache.hadoop.net.NetUtils;
|
|
|
import org.apache.hadoop.ozone.OzoneConfiguration;
|
|
|
+import org.apache.hadoop.ozone.OzoneClientUtils;
|
|
|
import org.apache.hadoop.ozone.container.common.helpers.Pipeline;
|
|
|
import org.apache.hadoop.ozone.container.common.transport.client.XceiverClient;
|
|
|
import org.apache.hadoop.ozone.container.common.transport.client.XceiverClientManager;
|
|
@@ -128,17 +116,13 @@ public class StorageContainerManager
|
|
|
private Pipeline singlePipeline;
|
|
|
|
|
|
/** The RPC server that listens to requests from DataNodes. */
|
|
|
- private final RPC.Server serviceRpcServer;
|
|
|
- private final InetSocketAddress serviceRpcAddress;
|
|
|
+ private final RPC.Server datanodeRpcServer;
|
|
|
+ private final InetSocketAddress datanodeRpcAddress;
|
|
|
|
|
|
/** The RPC server that listens to requests from clients. */
|
|
|
private final RPC.Server clientRpcServer;
|
|
|
private final InetSocketAddress clientRpcAddress;
|
|
|
|
|
|
- /** The RPC server that listens to requests from nodes to find containers. */
|
|
|
- private final RPC.Server storageRpcServer;
|
|
|
- private final InetSocketAddress storageRpcAddress;
|
|
|
-
|
|
|
/**
|
|
|
* Creates a new StorageContainerManager. Configuration will be updated with
|
|
|
* information on the actual listening addresses used for RPC servers.
|
|
@@ -157,58 +141,38 @@ public class StorageContainerManager
|
|
|
RPC.setProtocolEngine(conf, StorageContainerLocationProtocolPB.class,
|
|
|
ProtobufRpcEngine.class);
|
|
|
|
|
|
- int maxDataLength = conf.getInt(IPC_MAXIMUM_DATA_LENGTH,
|
|
|
+ final int handlerCount = conf.getInt(
|
|
|
+ OZONE_SCM_HANDLER_COUNT_KEY, OZONE_SCM_HANDLER_COUNT_DEFAULT);
|
|
|
+ final int maxDataLength = conf.getInt(IPC_MAXIMUM_DATA_LENGTH,
|
|
|
IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
|
|
|
- BlockingService dnProtoPbService =
|
|
|
- DatanodeProtocolProtos
|
|
|
- .DatanodeProtocolService
|
|
|
- .newReflectiveBlockingService(
|
|
|
+ BlockingService dnProtoPbService = DatanodeProtocolProtos.
|
|
|
+ DatanodeProtocolService.newReflectiveBlockingService(
|
|
|
new DatanodeProtocolServerSideTranslatorPB(this, maxDataLength));
|
|
|
|
|
|
- InetSocketAddress serviceRpcAddr = NameNode.getServiceAddress(conf, false);
|
|
|
- serviceRpcServer = startRpcServer(conf, serviceRpcAddr,
|
|
|
- DatanodeProtocolPB.class, dnProtoPbService,
|
|
|
- DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY,
|
|
|
- DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
|
|
|
- DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
|
|
|
- serviceRpcAddress = updateListenAddress(conf,
|
|
|
- DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, serviceRpcAddr, serviceRpcServer);
|
|
|
+ final InetSocketAddress datanodeRpcAddr =
|
|
|
+ OzoneClientUtils.getScmDataNodeBindAddress(conf);
|
|
|
+ datanodeRpcServer = startRpcServer(conf, datanodeRpcAddr,
|
|
|
+ DatanodeProtocolPB.class, dnProtoPbService, handlerCount);
|
|
|
+ datanodeRpcAddress = updateListenAddress(conf,
|
|
|
+ OZONE_SCM_DATANODE_ADDRESS_KEY, datanodeRpcAddr, datanodeRpcServer);
|
|
|
LOG.info(buildRpcServerStartMessage("Service RPC server",
|
|
|
- serviceRpcAddress));
|
|
|
-
|
|
|
- InetSocketAddress rpcAddr = DFSUtilClient.getNNAddress(conf);
|
|
|
- clientRpcServer = startRpcServer(conf, rpcAddr,
|
|
|
- DatanodeProtocolPB.class, dnProtoPbService,
|
|
|
- DFS_NAMENODE_RPC_BIND_HOST_KEY,
|
|
|
- DFS_NAMENODE_HANDLER_COUNT_KEY,
|
|
|
- DFS_NAMENODE_HANDLER_COUNT_DEFAULT);
|
|
|
- clientRpcAddress = updateListenAddress(conf,
|
|
|
- DFS_NAMENODE_RPC_ADDRESS_KEY, rpcAddr, clientRpcServer);
|
|
|
- conf.set(FS_DEFAULT_NAME_KEY, DFSUtilClient.getNNUri(clientRpcAddress)
|
|
|
- .toString());
|
|
|
- LOG.info(buildRpcServerStartMessage("RPC server", clientRpcAddress));
|
|
|
+ datanodeRpcAddress));
|
|
|
|
|
|
BlockingService storageProtoPbService =
|
|
|
StorageContainerLocationProtocolProtos
|
|
|
- .StorageContainerLocationProtocolService
|
|
|
- .newReflectiveBlockingService(
|
|
|
- new StorageContainerLocationProtocolServerSideTranslatorPB(this));
|
|
|
-
|
|
|
- InetSocketAddress storageRpcAddr = NetUtils.createSocketAddr(
|
|
|
- conf.getTrimmed(DFS_CONTAINER_LOCATION_RPC_ADDRESS_KEY,
|
|
|
- DFS_CONTAINER_LOCATION_RPC_ADDRESS_DEFAULT),
|
|
|
- -1, DFS_CONTAINER_LOCATION_RPC_ADDRESS_KEY);
|
|
|
+ .StorageContainerLocationProtocolService
|
|
|
+ .newReflectiveBlockingService(
|
|
|
+ new StorageContainerLocationProtocolServerSideTranslatorPB(this));
|
|
|
|
|
|
- storageRpcServer = startRpcServer(conf, storageRpcAddr,
|
|
|
+ final InetSocketAddress clientRpcAddr =
|
|
|
+ OzoneClientUtils.getScmClientBindAddress(conf);
|
|
|
+ clientRpcServer = startRpcServer(conf, clientRpcAddr,
|
|
|
StorageContainerLocationProtocolPB.class, storageProtoPbService,
|
|
|
- DFS_CONTAINER_LOCATION_RPC_BIND_HOST_KEY,
|
|
|
- DFS_CONTAINER_LOCATION_HANDLER_COUNT_KEY,
|
|
|
- DFS_CONTAINER_HANDLER_COUNT_DEFAULT);
|
|
|
- storageRpcAddress = updateListenAddress(conf,
|
|
|
- DFS_CONTAINER_LOCATION_RPC_ADDRESS_KEY,
|
|
|
- storageRpcAddr, storageRpcServer);
|
|
|
+ handlerCount);
|
|
|
+ clientRpcAddress = updateListenAddress(conf,
|
|
|
+ OZONE_SCM_CLIENT_ADDRESS_KEY, clientRpcAddr, clientRpcServer);
|
|
|
LOG.info(buildRpcServerStartMessage(
|
|
|
- "StorageContainerLocationProtocol RPC server", storageRpcAddress));
|
|
|
+ "StorageContainerLocationProtocol RPC server", clientRpcAddress));
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -216,7 +180,7 @@ public class StorageContainerManager
|
|
|
throws IOException {
|
|
|
LOG.trace("getStorageContainerLocations keys = {}", keys);
|
|
|
Pipeline pipeline = initSingleContainerPipeline();
|
|
|
- List<DatanodeDescriptor> liveNodes = new ArrayList<DatanodeDescriptor>();
|
|
|
+ List<DatanodeDescriptor> liveNodes = new ArrayList<>();
|
|
|
blockManager.getDatanodeManager().fetchDatanodes(liveNodes, null, false);
|
|
|
if (liveNodes.isEmpty()) {
|
|
|
throw new IOException("Storage container locations not found.");
|
|
@@ -385,13 +349,13 @@ public class StorageContainerManager
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Returns listen address of StorageContainerLocation RPC server.
|
|
|
+ * Returns listen address of client RPC server.
|
|
|
*
|
|
|
- * @return listen address of StorageContainerLocation RPC server
|
|
|
+ * @return listen address of client RPC server
|
|
|
*/
|
|
|
@VisibleForTesting
|
|
|
- public InetSocketAddress getStorageContainerLocationRpcAddress() {
|
|
|
- return storageRpcAddress;
|
|
|
+ public InetSocketAddress getClientRpcAddress() {
|
|
|
+ return clientRpcAddress;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -399,10 +363,7 @@ public class StorageContainerManager
|
|
|
*/
|
|
|
public void start() {
|
|
|
clientRpcServer.start();
|
|
|
- if (serviceRpcServer != null) {
|
|
|
- serviceRpcServer.start();
|
|
|
- }
|
|
|
- storageRpcServer.start();
|
|
|
+ datanodeRpcServer.start();
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -412,11 +373,8 @@ public class StorageContainerManager
|
|
|
if (clientRpcServer != null) {
|
|
|
clientRpcServer.stop();
|
|
|
}
|
|
|
- if (serviceRpcServer != null) {
|
|
|
- serviceRpcServer.stop();
|
|
|
- }
|
|
|
- if (storageRpcServer != null) {
|
|
|
- storageRpcServer.stop();
|
|
|
+ if (datanodeRpcServer != null) {
|
|
|
+ datanodeRpcServer.stop();
|
|
|
}
|
|
|
IOUtils.closeStream(ns);
|
|
|
}
|
|
@@ -427,10 +385,7 @@ public class StorageContainerManager
|
|
|
public void join() {
|
|
|
try {
|
|
|
clientRpcServer.join();
|
|
|
- if (serviceRpcServer != null) {
|
|
|
- serviceRpcServer.join();
|
|
|
- }
|
|
|
- storageRpcServer.join();
|
|
|
+ datanodeRpcServer.join();
|
|
|
} catch (InterruptedException e) {
|
|
|
Thread.currentThread().interrupt();
|
|
|
LOG.info("Interrupted during StorageContainerManager join.");
|
|
@@ -497,7 +452,7 @@ public class StorageContainerManager
|
|
|
private static String buildRpcServerStartMessage(String description,
|
|
|
InetSocketAddress addr) {
|
|
|
return addr != null ? String.format("%s is listening at %s",
|
|
|
- description, NetUtils.getHostPortString(addr)) :
|
|
|
+ description, addr.getHostString() + ":" + addr.getPort()) :
|
|
|
String.format("%s not started", description);
|
|
|
}
|
|
|
|
|
@@ -527,59 +482,47 @@ public class StorageContainerManager
|
|
|
* @param addr configured address of RPC server
|
|
|
* @param protocol RPC protocol provided by RPC server
|
|
|
* @param instance RPC protocol implementation instance
|
|
|
- * @param bindHostKey configuration key for setting explicit bind host. If
|
|
|
- * the property is not configured, then the bind host is taken from addr.
|
|
|
- * @param handlerCountKey configuration key for RPC server handler count
|
|
|
- * @param handlerCountDefault default RPC server handler count if unconfigured
|
|
|
- * @return RPC server, or null if addr is null
|
|
|
+ * @param handlerCount RPC server handler count
|
|
|
+ *
|
|
|
+ * @return RPC server
|
|
|
* @throws IOException if there is an I/O error while creating RPC server
|
|
|
*/
|
|
|
private static RPC.Server startRpcServer(OzoneConfiguration conf,
|
|
|
InetSocketAddress addr, Class<?> protocol, BlockingService instance,
|
|
|
- String bindHostKey, String handlerCountKey, int handlerCountDefault)
|
|
|
+ int handlerCount)
|
|
|
throws IOException {
|
|
|
- if (addr == null) {
|
|
|
- return null;
|
|
|
- }
|
|
|
- String bindHost = conf.getTrimmed(bindHostKey);
|
|
|
- if (bindHost == null || bindHost.isEmpty()) {
|
|
|
- bindHost = addr.getHostName();
|
|
|
- }
|
|
|
- int numHandlers = conf.getInt(handlerCountKey, handlerCountDefault);
|
|
|
RPC.Server rpcServer = new RPC.Builder(conf)
|
|
|
.setProtocol(protocol)
|
|
|
.setInstance(instance)
|
|
|
- .setBindAddress(bindHost)
|
|
|
+ .setBindAddress(addr.getHostString())
|
|
|
.setPort(addr.getPort())
|
|
|
- .setNumHandlers(numHandlers)
|
|
|
+ .setNumHandlers(handlerCount)
|
|
|
.setVerbose(false)
|
|
|
.setSecretManager(null)
|
|
|
.build();
|
|
|
+
|
|
|
DFSUtil.addPBProtocol(conf, protocol, instance, rpcServer);
|
|
|
return rpcServer;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* After starting an RPC server, updates configuration with the actual
|
|
|
- * listening address of that server. The listening address may be different
|
|
|
+ * listening address of that server. The listening address may be different
|
|
|
* from the configured address if, for example, the configured address uses
|
|
|
* port 0 to request use of an ephemeral port.
|
|
|
*
|
|
|
* @param conf configuration to update
|
|
|
* @param rpcAddressKey configuration key for RPC server address
|
|
|
* @param addr configured address
|
|
|
- * @param rpcServer started RPC server. If null, then the server was not
|
|
|
- * started, and this method is a no-op.
|
|
|
+ * @param rpcServer started RPC server.
|
|
|
*/
|
|
|
private static InetSocketAddress updateListenAddress(OzoneConfiguration conf,
|
|
|
String rpcAddressKey, InetSocketAddress addr, RPC.Server rpcServer) {
|
|
|
- if (rpcServer == null) {
|
|
|
- return null;
|
|
|
- }
|
|
|
InetSocketAddress listenAddr = rpcServer.getListenerAddress();
|
|
|
InetSocketAddress updatedAddr = new InetSocketAddress(
|
|
|
- addr.getHostName(), listenAddr.getPort());
|
|
|
- conf.set(rpcAddressKey, NetUtils.getHostPortString(updatedAddr));
|
|
|
+ addr.getHostString(), listenAddr.getPort());
|
|
|
+ conf.set(rpcAddressKey,
|
|
|
+ addr.getHostString() + ":" + listenAddr.getPort());
|
|
|
return updatedAddr;
|
|
|
}
|
|
|
|