|
@@ -23,7 +23,8 @@ import com.google.common.base.Preconditions;
|
|
|
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
|
|
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
|
|
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
|
|
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
|
|
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.PipelineID;
|
|
|
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
|
|
|
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
|
|
|
import org.apache.hadoop.hdds.protocol.proto
|
|
|
.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
|
|
@@ -46,12 +47,14 @@ import org.slf4j.LoggerFactory;
|
|
|
|
|
|
import java.io.*;
|
|
|
import java.util.ArrayList;
|
|
|
+import java.util.HashMap;
|
|
|
import java.util.Iterator;
|
|
|
+import java.util.Map;
|
|
|
|
|
|
import static org.apache.hadoop.ozone.OzoneConsts.INVALID_PORT;
|
|
|
|
|
|
/**
|
|
|
- * Ozone main class sets up the network server and initializes the container
|
|
|
+ * Ozone main class sets up the network servers and initializes the container
|
|
|
* layer.
|
|
|
*/
|
|
|
public class OzoneContainer {
|
|
@@ -64,7 +67,7 @@ public class OzoneContainer {
|
|
|
private final OzoneConfiguration config;
|
|
|
private final VolumeSet volumeSet;
|
|
|
private final ContainerSet containerSet;
|
|
|
- private final XceiverServerSpi[] server;
|
|
|
+ private final Map<ReplicationType, XceiverServerSpi> servers;
|
|
|
|
|
|
/**
|
|
|
* Construct OzoneContainer object.
|
|
@@ -82,14 +85,13 @@ public class OzoneContainer {
|
|
|
buildContainerSet();
|
|
|
hddsDispatcher = new HddsDispatcher(config, containerSet, volumeSet,
|
|
|
context);
|
|
|
- server = new XceiverServerSpi[]{
|
|
|
- new XceiverServerGrpc(datanodeDetails, this.config, this
|
|
|
- .hddsDispatcher, createReplicationService()),
|
|
|
- XceiverServerRatis.newXceiverServerRatis(datanodeDetails, this
|
|
|
- .config, hddsDispatcher, context)
|
|
|
- };
|
|
|
-
|
|
|
-
|
|
|
+ servers = new HashMap<>();
|
|
|
+ servers.put(ReplicationType.STAND_ALONE,
|
|
|
+ new XceiverServerGrpc(datanodeDetails, config, hddsDispatcher,
|
|
|
+ createReplicationService()));
|
|
|
+ servers.put(ReplicationType.RATIS, XceiverServerRatis
|
|
|
+ .newXceiverServerRatis(datanodeDetails, config, hddsDispatcher,
|
|
|
+ context));
|
|
|
}
|
|
|
|
|
|
private GrpcReplicationService createReplicationService() {
|
|
@@ -133,7 +135,7 @@ public class OzoneContainer {
|
|
|
*/
|
|
|
public void start() throws IOException {
|
|
|
LOG.info("Attempting to start container services.");
|
|
|
- for (XceiverServerSpi serverinstance : server) {
|
|
|
+ for (XceiverServerSpi serverinstance : servers.values()) {
|
|
|
serverinstance.start();
|
|
|
}
|
|
|
hddsDispatcher.init();
|
|
@@ -145,7 +147,7 @@ public class OzoneContainer {
|
|
|
public void stop() {
|
|
|
//TODO: at end of container IO integration work.
|
|
|
LOG.info("Attempting to stop container services.");
|
|
|
- for(XceiverServerSpi serverinstance: server) {
|
|
|
+ for(XceiverServerSpi serverinstance: servers.values()) {
|
|
|
serverinstance.stop();
|
|
|
}
|
|
|
hddsDispatcher.shutdown();
|
|
@@ -169,7 +171,7 @@ public class OzoneContainer {
|
|
|
public PipelineReportsProto getPipelineReport() {
|
|
|
PipelineReportsProto.Builder pipelineReportsProto =
|
|
|
PipelineReportsProto.newBuilder();
|
|
|
- for (XceiverServerSpi serverInstance : server) {
|
|
|
+ for (XceiverServerSpi serverInstance : servers.values()) {
|
|
|
pipelineReportsProto
|
|
|
.addAllPipelineReport(serverInstance.getPipelineReport());
|
|
|
}
|
|
@@ -181,82 +183,38 @@ public class OzoneContainer {
|
|
|
* @param request
|
|
|
* @param replicationType
|
|
|
* @param pipelineID
|
|
|
- * @throws IOException
|
|
|
*/
|
|
|
public void submitContainerRequest(
|
|
|
ContainerProtos.ContainerCommandRequestProto request,
|
|
|
- HddsProtos.ReplicationType replicationType,
|
|
|
- HddsProtos.PipelineID pipelineID) throws IOException {
|
|
|
- XceiverServerSpi serverInstance;
|
|
|
- long containerId = getContainerIdForCmd(request);
|
|
|
- if (replicationType == HddsProtos.ReplicationType.RATIS) {
|
|
|
- serverInstance = getRatisSerer();
|
|
|
- Preconditions.checkNotNull(serverInstance);
|
|
|
- serverInstance.submitRequest(request, pipelineID);
|
|
|
- LOG.info("submitting {} request over RATIS server for container {}",
|
|
|
- request.getCmdType(), containerId);
|
|
|
- } else {
|
|
|
- serverInstance = getStandaAloneSerer();
|
|
|
- Preconditions.checkNotNull(serverInstance);
|
|
|
- getStandaAloneSerer().submitRequest(request, pipelineID);
|
|
|
- LOG.info(
|
|
|
- "submitting {} request over STAND_ALONE server for container {}",
|
|
|
- request.getCmdType(), containerId);
|
|
|
+ ReplicationType replicationType,
|
|
|
+ PipelineID pipelineID) throws IOException {
|
|
|
+ if (containerSet.getContainer(request.getContainerID())
|
|
|
+ .getContainerData().isClosed()) {
|
|
|
+ LOG.debug("Container {} is already closed", request.getContainerID());
|
|
|
+ // It might happen that the where the first attempt of closing the
|
|
|
+ // container failed with NOT_LEADER_EXCEPTION. In such cases, SCM will
|
|
|
+ // retry to check the container got really closed via Ratis.
|
|
|
+ // In such cases of the retry attempt, if the container is already closed
|
|
|
+ // via Ratis, we should just return.
|
|
|
}
|
|
|
-
|
|
|
+ LOG.info("submitting {} request over {} server for container {}",
|
|
|
+ request.getCmdType(), replicationType, request.getContainerID());
|
|
|
+ Preconditions.checkState(servers.containsKey(replicationType));
|
|
|
+ servers.get(replicationType).submitRequest(request, pipelineID);
|
|
|
}
|
|
|
|
|
|
- private long getContainerIdForCmd(
|
|
|
- ContainerProtos.ContainerCommandRequestProto request)
|
|
|
- throws IllegalArgumentException {
|
|
|
- ContainerProtos.Type type = request.getCmdType();
|
|
|
- switch (type) {
|
|
|
- case CloseContainer:
|
|
|
- return request.getContainerID();
|
|
|
- // Right now, we handle only closeContainer via queuing it over the
|
|
|
- // over the XceiVerServer. For all other commands we throw Illegal
|
|
|
- // argument exception here. Will need to extend the switch cases
|
|
|
- // in case we want add another commands here.
|
|
|
- default:
|
|
|
- throw new IllegalArgumentException("Cmd " + request.getCmdType()
|
|
|
- + " not supported over HearBeat Response");
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private XceiverServerSpi getRatisSerer() {
|
|
|
- for (XceiverServerSpi serverInstance : server) {
|
|
|
- if (serverInstance instanceof XceiverServerRatis) {
|
|
|
- return serverInstance;
|
|
|
- }
|
|
|
- }
|
|
|
- return null;
|
|
|
- }
|
|
|
-
|
|
|
- private XceiverServerSpi getStandaAloneSerer() {
|
|
|
- for (XceiverServerSpi serverInstance : server) {
|
|
|
- if (!(serverInstance instanceof XceiverServerRatis)) {
|
|
|
- return serverInstance;
|
|
|
- }
|
|
|
- }
|
|
|
- return null;
|
|
|
- }
|
|
|
-
|
|
|
- private int getPortbyType(HddsProtos.ReplicationType replicationType) {
|
|
|
- for (XceiverServerSpi serverinstance : server) {
|
|
|
- if (serverinstance.getServerType() == replicationType) {
|
|
|
- return serverinstance.getIPCPort();
|
|
|
- }
|
|
|
- }
|
|
|
- return INVALID_PORT;
|
|
|
+ private int getPortByType(ReplicationType replicationType) {
|
|
|
+ return servers.containsKey(replicationType) ?
|
|
|
+ servers.get(replicationType).getIPCPort() : INVALID_PORT;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Returns the container server IPC port.
|
|
|
+ * Returns the container servers IPC port.
|
|
|
*
|
|
|
- * @return Container server IPC port.
|
|
|
+ * @return Container servers IPC port.
|
|
|
*/
|
|
|
public int getContainerServerPort() {
|
|
|
- return getPortbyType(HddsProtos.ReplicationType.STAND_ALONE);
|
|
|
+ return getPortByType(ReplicationType.STAND_ALONE);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -265,7 +223,7 @@ public class OzoneContainer {
|
|
|
* @return Ratis port.
|
|
|
*/
|
|
|
public int getRatisContainerServerPort() {
|
|
|
- return getPortbyType(HddsProtos.ReplicationType.RATIS);
|
|
|
+ return getPortByType(ReplicationType.RATIS);
|
|
|
}
|
|
|
|
|
|
/**
|