Explorar o código

HDDS-801. Quasi close the container when close is not executed via Ratis.
Contributed by Nanda kumar.

Nanda kumar %!s(int64=6) %!d(string=hai) anos
pai
achega
c4d9640028
Modificáronse 38 ficheiros con 875 adicións e 368 borrados
  1. 2 2
      hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
  2. 16 1
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
  3. 5 9
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
  4. 14 1
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java
  5. 60 8
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
  6. 2 1
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ContainerReportPublisher.java
  7. 1 1
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
  8. 0 20
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
  9. 65 73
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
  10. 1 1
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java
  11. 5 0
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
  12. 7 0
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
  13. 17 1
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
  14. 32 9
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
  15. 81 38
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
  16. 124 0
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
  17. 47 77
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
  18. 7 9
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
  19. 5 5
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java
  20. 3 9
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java
  21. 4 4
      hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
  22. 23 3
      hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
  23. 14 2
      hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/interfaces/TestHandler.java
  24. 210 0
      hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java
  25. 5 3
      hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
  26. 1 1
      hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
  27. 2 2
      hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
  28. 1 2
      hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java
  29. 5 3
      hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
  30. 1 1
      hadoop-ozone/dist/src/main/smoketest/basic/basic.robot
  31. 9 9
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
  32. 13 13
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
  33. 36 44
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
  34. 1 2
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java
  35. 18 4
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java
  36. 21 5
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
  37. 16 3
      hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java
  38. 1 2
      hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeFastRestart.java

+ 2 - 2
hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto

@@ -229,8 +229,8 @@ message ContainerDataProto {
   enum State {
     OPEN = 1;
     CLOSING = 2;
-    CLOSED = 3;
-    QUASI_CLOSED = 4;
+    QUASI_CLOSED =3;
+    CLOSED = 4;
     UNHEALTHY = 5;
     INVALID = 6;
   }

+ 16 - 1
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java

@@ -237,10 +237,25 @@ public abstract class ContainerData {
    * checks if the container is closed.
    * @return - boolean
    */
-  public synchronized  boolean isClosed() {
+  public synchronized boolean isClosed() {
     return ContainerDataProto.State.CLOSED == state;
   }
 
+  /**
+   * checks if the container is quasi closed.
+   * @return - boolean
+   */
+  public synchronized boolean isQuasiClosed() {
+    return ContainerDataProto.State.QUASI_CLOSED == state;
+  }
+
+  /**
+   * Marks this container as quasi closed.
+   */
+  public synchronized void quasiCloseContainer() {
+    setState(ContainerDataProto.State.QUASI_CLOSED);
+  }
+
   /**
    * Marks this container as closed.
    */

+ 5 - 9
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java

@@ -20,7 +20,6 @@ package org.apache.hadoop.ozone.container.common.impl;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
@@ -76,18 +75,14 @@ public class HddsDispatcher implements ContainerDispatcher {
    * XceiverServerHandler.
    */
   public HddsDispatcher(Configuration config, ContainerSet contSet,
-      VolumeSet volumes, StateContext context) {
+      VolumeSet volumes, Map<ContainerType, Handler> handlers,
+      StateContext context, ContainerMetrics metrics) {
     this.conf = config;
     this.containerSet = contSet;
     this.volumeSet = volumes;
     this.context = context;
-    this.handlers = Maps.newHashMap();
-    this.metrics = ContainerMetrics.create(conf);
-    for (ContainerType containerType : ContainerType.values()) {
-      handlers.put(containerType,
-          Handler.getHandlerForContainerType(
-              containerType, conf, containerSet, volumeSet, metrics));
-    }
+    this.handlers = handlers;
+    this.metrics = metrics;
     this.containerCloseThreshold = conf.getFloat(
         HddsConfigKeys.HDDS_CONTAINER_CLOSE_THRESHOLD,
         HddsConfigKeys.HDDS_CONTAINER_CLOSE_THRESHOLD_DEFAULT);
@@ -347,6 +342,7 @@ public class HddsDispatcher implements ContainerDispatcher {
     }
   }
 
+  @VisibleForTesting
   public Container getContainer(long containerID) {
     return containerSet.getContainer(containerID);
   }

+ 14 - 1
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java

@@ -82,11 +82,24 @@ public interface Container<CONTAINERDATA extends ContainerData> extends RwLock {
   ContainerProtos.ContainerDataProto.State getContainerState();
 
   /**
-   * Closes a open container, if it is already closed or does not exist a
+   * Marks the container for closing. Moves the container to CLOSING state.
+   */
+  void markContainerForClose() throws StorageContainerException;
+
+  /**
+   * Quasi Closes a open container, if it is already closed or does not exist a
    * StorageContainerException is thrown.
    *
    * @throws StorageContainerException
    */
+  void quasiClose() throws StorageContainerException;
+
+  /**
+   * Closes a open/quasi closed container, if it is already closed or does not
+   * exist a StorageContainerException is thrown.
+   *
+   * @throws StorageContainerException
+   */
   void close() throws StorageContainerException;
 
   /**

+ 60 - 8
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java

@@ -29,8 +29,12 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerType;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
 import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
@@ -47,26 +51,47 @@ public abstract class Handler {
   protected String scmID;
   protected final ContainerMetrics metrics;
 
-  protected Handler(Configuration config, ContainerSet contSet,
-      VolumeSet volumeSet, ContainerMetrics containerMetrics) {
-    conf = config;
-    containerSet = contSet;
+  private final StateContext context;
+
+  protected Handler(Configuration config, StateContext context,
+      ContainerSet contSet, VolumeSet volumeSet,
+      ContainerMetrics containerMetrics) {
+    this.conf = config;
+    this.context = context;
+    this.containerSet = contSet;
     this.volumeSet = volumeSet;
     this.metrics = containerMetrics;
   }
 
-  public static Handler getHandlerForContainerType(ContainerType containerType,
-      Configuration config, ContainerSet contSet, VolumeSet volumeSet,
-                                                   ContainerMetrics metrics) {
+  public static Handler getHandlerForContainerType(
+      final ContainerType containerType, final Configuration config,
+      final StateContext context, final ContainerSet contSet,
+      final VolumeSet volumeSet, final ContainerMetrics metrics) {
     switch (containerType) {
     case KeyValueContainer:
-      return new KeyValueHandler(config, contSet, volumeSet, metrics);
+      return new KeyValueHandler(config, context, contSet, volumeSet, metrics);
     default:
       throw new IllegalArgumentException("Handler for ContainerType: " +
         containerType + "doesn't exist.");
     }
   }
 
+  /**
+   * This should be called whenever there is state change. It will trigger
+   * an ICR to SCM.
+   *
+   * @param container Container for which ICR has to be sent
+   */
+  protected void sendICR(final Container container)
+      throws StorageContainerException {
+    IncrementalContainerReportProto icr = IncrementalContainerReportProto
+        .newBuilder()
+        .addReport(container.getContainerReport())
+        .build();
+    context.addReport(icr);
+    context.getParent().triggerHeartbeat();
+  }
+
   public abstract ContainerCommandResponseProto handle(
       ContainerCommandRequestProto msg, Container container);
 
@@ -80,6 +105,33 @@ public abstract class Handler {
       TarContainerPacker packer)
       throws IOException;
 
+  /**
+   * Marks the container for closing. Moves the container to CLOSING state.
+   *
+   * @param container container to update
+   * @throws IOException in case of exception
+   */
+  public abstract void markContainerForClose(Container container)
+      throws IOException;
+
+  /**
+   * Moves the Container to QUASI_CLOSED state.
+   *
+   * @param container container to be quasi closed
+   * @throws IOException
+   */
+  public abstract void quasiCloseContainer(Container container)
+      throws IOException;
+
+  /**
+   * Moves the Container to CLOSED state.
+   *
+   * @param container container to be closed
+   * @throws IOException
+   */
+  public abstract void closeContainer(Container container)
+      throws IOException;
+
   public void setScmID(String scmId) {
     this.scmID = scmId;
   }

+ 2 - 1
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ContainerReportPublisher.java

@@ -80,6 +80,7 @@ public class ContainerReportPublisher extends
 
   @Override
   protected ContainerReportsProto getReport() throws IOException {
-    return getContext().getParent().getContainer().getContainerReport();
+    return getContext().getParent().getContainer()
+        .getController().getContainerReport();
   }
 }

+ 1 - 1
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java

@@ -99,7 +99,7 @@ public class DatanodeStateMachine implements Closeable {
 
     ContainerReplicator replicator =
         new DownloadAndImportReplicator(container.getContainerSet(),
-            container.getDispatcher(),
+            container.getController(),
             new SimpleContainerDownloader(conf), new TarContainerPacker());
 
     supervisor =

+ 0 - 20
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java

@@ -59,8 +59,6 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Consumer;
 
-import static org.apache.hadoop.ozone.OzoneConsts.INVALID_PORT;
-
 /**
  * Current Context of State Machine.
  */
@@ -115,24 +113,6 @@ public class StateContext {
     return parent;
   }
 
-  /**
-   * Get the container server port.
-   * @return The container server port if available, return -1 if otherwise
-   */
-  public int getContainerPort() {
-    return parent == null ?
-        INVALID_PORT : parent.getContainer().getContainerServerPort();
-  }
-
-  /**
-   * Gets the Ratis Port.
-   * @return int , return -1 if not valid.
-   */
-  public int getRatisPort() {
-    return parent == null ?
-        INVALID_PORT : parent.getContainer().getRatisContainerServerPort();
-  }
-
   /**
    * Returns true if we are entering a new state.
    *

+ 65 - 73
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java

@@ -16,21 +16,20 @@
  */
 package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
 
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto
+    .ContainerProtos.ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.CloseContainerCommandProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
-    ContainerDataProto.State;
-import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.common.statemachine
     .SCMConnectionManager;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.util.Time;
@@ -38,17 +37,19 @@ import org.apache.ratis.protocol.NotLeaderException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.UUID;
 
 /**
  * Handler for close container command received from SCM.
  */
 public class CloseContainerCommandHandler implements CommandHandler {
-  static final Logger LOG =
+
+  private static final Logger LOG =
       LoggerFactory.getLogger(CloseContainerCommandHandler.class);
+
   private int invocationCount;
   private long totalTime;
-  private boolean cmdExecuted;
 
   /**
    * Constructs a ContainerReport handler.
@@ -67,79 +68,70 @@ public class CloseContainerCommandHandler implements CommandHandler {
   @Override
   public void handle(SCMCommand command, OzoneContainer ozoneContainer,
       StateContext context, SCMConnectionManager connectionManager) {
-    LOG.debug("Processing Close Container command.");
-    invocationCount++;
-    long startTime = Time.monotonicNow();
-    // TODO: define this as INVALID_CONTAINER_ID in HddsConsts.java (TBA)
-    long containerID = -1;
     try {
-      CloseContainerCommandProto closeContainerProto =
+      LOG.debug("Processing Close Container command.");
+      invocationCount++;
+      final long startTime = Time.monotonicNow();
+      final DatanodeDetails datanodeDetails = context.getParent()
+          .getDatanodeDetails();
+      final CloseContainerCommandProto closeCommand =
           CloseContainerCommandProto.parseFrom(command.getProtoBufMessage());
-      containerID = closeContainerProto.getContainerID();
-      // CloseContainer operation is idempotent, if the container is already
-      // closed, then do nothing.
-      // TODO: Non-existent container should be handled properly
-      Container container =
-          ozoneContainer.getContainerSet().getContainer(containerID);
-      if (container == null) {
-        LOG.error("Container {} does not exist in datanode. "
-            + "Container close failed.", containerID);
-        cmdExecuted = false;
-        return;
-      }
-      ContainerData containerData = container.getContainerData();
-      State containerState = container.getContainerData().getState();
-      if (containerState != State.CLOSED) {
-        LOG.debug("Closing container {}.", containerID);
-        // when a closeContainerCommand arrives at a Datanode and if the
-        // container is open, each replica will be moved to closing state first.
-        if (containerState == State.OPEN) {
-          containerData.setState(State.CLOSING);
+      final ContainerController controller = ozoneContainer.getController();
+      final long containerId = closeCommand.getContainerID();
+      try {
+        // TODO: Closing of QUASI_CLOSED container.
+
+        final Container container = controller.getContainer(containerId);
+
+        if (container == null) {
+          LOG.error("Container #{} does not exist in datanode. "
+              + "Container close failed.", containerId);
+          return;
         }
 
-        // if the container is already closed, it will be just ignored.
-        // ICR will get triggered to change the replica state in SCM.
-        HddsProtos.PipelineID pipelineID = closeContainerProto.getPipelineID();
-        HddsProtos.ReplicationType replicationType =
-            closeContainerProto.getReplicationType();
-
-        ContainerProtos.ContainerCommandRequestProto.Builder request =
-            ContainerProtos.ContainerCommandRequestProto.newBuilder();
-        request.setCmdType(ContainerProtos.Type.CloseContainer);
-        request.setContainerID(containerID);
-        request.setCloseContainer(
-            ContainerProtos.CloseContainerRequestProto.getDefaultInstance());
-        request.setTraceID(UUID.randomUUID().toString());
-        request.setDatanodeUuid(
-            context.getParent().getDatanodeDetails().getUuidString());
-        // submit the close container request for the XceiverServer to handle
-        ozoneContainer.submitContainerRequest(request.build(), replicationType,
-            pipelineID);
-        // Since the container is closed, we trigger an ICR
-        IncrementalContainerReportProto icr =
-            IncrementalContainerReportProto.newBuilder().addReport(
-                ozoneContainer.getContainerSet().getContainer(containerID)
-                    .getContainerReport()).build();
-        context.addReport(icr);
-        context.getParent().triggerHeartbeat();
-      }
-    } catch (Exception e) {
-      if (e instanceof NotLeaderException) {
-        // If the particular datanode is not the Ratis leader, the close
-        // container command will not be executed by the follower but will be
-        // executed by Ratis stateMachine transactions via leader to follower.
-        // There can also be case where the datanode is in candidate state.
-        // In these situations, NotLeaderException is thrown.
-        LOG.info("Follower cannot close the container {}.", containerID);
-      } else {
-        LOG.error("Can't close container " + containerID, e);
+        // Move the container to CLOSING state
+        controller.markContainerForClose(containerId);
+
+        // If the container is part of open pipeline, close it via write channel
+        if (ozoneContainer.getWriteChannel()
+            .isExist(closeCommand.getPipelineID())) {
+          ContainerCommandRequestProto request =
+              getContainerCommandRequestProto(datanodeDetails,
+                  closeCommand.getContainerID());
+          ozoneContainer.getWriteChannel().submitRequest(
+              request, closeCommand.getPipelineID());
+          return;
+        }
+
+        // The container is not part of any open pipeline.
+        // QUASI_CLOSE the container using ContainerController.
+        controller.quasiCloseContainer(containerId);
+      } catch (NotLeaderException e) {
+        LOG.debug("Follower cannot close container #{}.", containerId);
+      } catch (IOException e) {
+        LOG.error("Can't close container #{}", containerId, e);
+      } finally {
+        long endTime = Time.monotonicNow();
+        totalTime += endTime - startTime;
       }
-    } finally {
-      long endTime = Time.monotonicNow();
-      totalTime += endTime - startTime;
+    } catch (InvalidProtocolBufferException ex) {
+      LOG.error("Exception while closing container", ex);
     }
   }
 
+  private ContainerCommandRequestProto getContainerCommandRequestProto(
+      final DatanodeDetails datanodeDetails, final long containerId) {
+    final ContainerCommandRequestProto.Builder command =
+        ContainerCommandRequestProto.newBuilder();
+    command.setCmdType(ContainerProtos.Type.CloseContainer);
+    command.setContainerID(containerId);
+    command.setCloseContainer(
+        ContainerProtos.CloseContainerRequestProto.getDefaultInstance());
+    command.setTraceID(UUID.randomUUID().toString());
+    command.setDatanodeUuid(datanodeDetails.getUuidString());
+    return command.build();
+  }
+
   /**
    * Returns the command type that this command handler handles.
    *

+ 1 - 1
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java

@@ -111,7 +111,7 @@ public final class RegisterEndpointTask implements
     try {
 
       ContainerReportsProto containerReport = datanodeContainerManager
-          .getContainerReport();
+          .getController().getContainerReport();
       NodeReportProto nodeReport = datanodeContainerManager.getNodeReport();
       PipelineReportsProto pipelineReportsProto =
               datanodeContainerManager.getPipelineReport();

+ 5 - 0
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java

@@ -140,6 +140,11 @@ public final class XceiverServerGrpc implements XceiverServerSpi {
     }
   }
 
+  @Override
+  public boolean isExist(HddsProtos.PipelineID pipelineId) {
+    return PipelineID.valueOf(id).getProtobuf().equals(pipelineId);
+  }
+
   @Override
   public List<PipelineReport> getPipelineReport() {
     return Collections.singletonList(

+ 7 - 0
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java

@@ -53,6 +53,13 @@ public interface XceiverServerSpi {
       HddsProtos.PipelineID pipelineID)
       throws IOException;
 
+  /**
+   * Returns true if the given pipeline exist.
+   *
+   * @return true if pipeline present, else false
+   */
+  boolean isExist(HddsProtos.PipelineID pipelineId);
+
   /**
    * Get pipeline report for the XceiverServer instance.
    * @return list of report for each pipeline.

+ 17 - 1
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java

@@ -86,7 +86,8 @@ import java.util.concurrent.atomic.AtomicLong;
  * Ozone containers.
  */
 public final class XceiverServerRatis implements XceiverServerSpi {
-  static final Logger LOG = LoggerFactory.getLogger(XceiverServerRatis.class);
+  private static final Logger LOG = LoggerFactory
+      .getLogger(XceiverServerRatis.class);
   private static final AtomicLong CALL_ID_COUNTER = new AtomicLong();
 
   private static long nextCallId() {
@@ -457,6 +458,21 @@ public final class XceiverServerRatis implements XceiverServerSpi {
             + ".Reason : " + action.getClosePipeline().getDetailedReason());
   }
 
+  @Override
+  public boolean isExist(HddsProtos.PipelineID pipelineId) {
+    try {
+      for (RaftGroupId groupId : server.getGroupIds()) {
+        if (PipelineID.valueOf(
+            groupId.getUuid()).getProtobuf().equals(pipelineId)) {
+          return true;
+        }
+      }
+      return false;
+    } catch (IOException e) {
+      return false;
+    }
+  }
+
   @Override
   public List<PipelineReport> getPipelineReport() {
     try {

+ 32 - 9
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java

@@ -31,6 +31,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerDataProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerType;
 import org.apache.hadoop.hdds.protocol.proto
@@ -266,30 +268,48 @@ public class KeyValueContainer implements Container<KeyValueContainerData> {
     }
   }
 
+  @Override
+  public void markContainerForClose() throws StorageContainerException {
+    updateContainerData(() ->
+        containerData.setState(ContainerDataProto.State.CLOSING));
+  }
+
+  @Override
+  public void quasiClose() throws StorageContainerException {
+    updateContainerData(containerData::quasiCloseContainer);
+  }
+
   @Override
   public void close() throws StorageContainerException {
+    updateContainerData(containerData::closeContainer);
+    // It is ok if this operation takes a bit of time.
+    // Close container is not expected to be instantaneous.
+    compactDB();
+  }
 
-    //TODO: writing .container file and compaction can be done
-    // asynchronously, otherwise rpc call for this will take a lot of time to
-    // complete this action
+  private void updateContainerData(Runnable update)
+      throws StorageContainerException {
+    ContainerDataProto.State oldState = null;
     try {
       writeLock();
-
-      containerData.closeContainer();
+      oldState = containerData.getState();
+      update.run();
       File containerFile = getContainerFile();
       // update the new container data to .container File
       updateContainerFile(containerFile);
 
     } catch (StorageContainerException ex) {
-      // Failed to update .container file. Reset the state to CLOSING
-      containerData.setState(ContainerProtos.ContainerDataProto.State.CLOSING);
+      if (oldState != null) {
+        // Failed to update .container file. Reset the state to CLOSING
+        containerData.setState(oldState);
+      }
       throw ex;
     } finally {
       writeUnlock();
     }
+  }
 
-    // It is ok if this operation takes a bit of time.
-    // Close container is not expected to be instantaneous.
+  private void compactDB() throws StorageContainerException {
     try {
       MetadataStore db = BlockUtils.getDB(containerData, config);
       db.compactDB();
@@ -549,6 +569,9 @@ public class KeyValueContainer implements Container<KeyValueContainerData> {
     case CLOSING:
       state = ContainerReplicaProto.State.CLOSING;
       break;
+    case QUASI_CLOSED:
+      state = ContainerReplicaProto.State.QUASI_CLOSED;
+      break;
     case CLOSED:
       state = ContainerReplicaProto.State.CLOSED;
       break;

+ 81 - 38
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java

@@ -33,7 +33,7 @@ import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerDataProto;
+    .ContainerDataProto.State;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
@@ -57,6 +57,7 @@ import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.common.interfaces.Handler;
 import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume
     .RoundRobinVolumeChoosingPolicy;
@@ -109,9 +110,9 @@ public class KeyValueHandler extends Handler {
   private final long maxContainerSize;
   private final AutoCloseableLock handlerLock;
 
-  public KeyValueHandler(Configuration config, ContainerSet contSet,
-      VolumeSet volSet, ContainerMetrics metrics) {
-    super(config, contSet, volSet, metrics);
+  public KeyValueHandler(Configuration config, StateContext context,
+      ContainerSet contSet, VolumeSet volSet, ContainerMetrics metrics) {
+    super(config, context, contSet, volSet, metrics);
     containerType = ContainerType.KeyValueContainer;
     blockManager = new BlockManagerImpl(config);
     chunkManager = new ChunkManagerImpl();
@@ -372,20 +373,10 @@ public class KeyValueHandler extends Handler {
           request.getTraceID());
       return ContainerUtils.malformedRequest(request);
     }
-
-    long containerID = kvContainer.getContainerData().getContainerID();
     try {
-      checkContainerOpen(kvContainer);
-      // TODO : The close command should move the container to either quasi
-      // closed/closed depending upon how the closeContainer gets executed.
-      // If it arrives by Standalone, it will be moved to Quasi Closed or
-      // otherwise moved to Closed state if it gets executed via Ratis.
-      kvContainer.close();
+      markContainerForClose(kvContainer);
+      closeContainer(kvContainer);
     } catch (StorageContainerException ex) {
-      if (ex.getResult() == CLOSED_CONTAINER_IO) {
-        LOG.debug("Container {} is already closed.", containerID);
-        return ContainerUtils.getSuccessResponse(request);
-      }
       return ContainerUtils.logAndReturnError(LOG, ex, request);
     } catch (IOException ex) {
       return ContainerUtils.logAndReturnError(LOG,
@@ -745,38 +736,39 @@ public class KeyValueHandler extends Handler {
   private void checkContainerOpen(KeyValueContainer kvContainer)
       throws StorageContainerException {
 
-    ContainerDataProto.State containerState = kvContainer.getContainerState();
+    final State containerState = kvContainer.getContainerState();
 
-    /**
+    /*
      * In a closing state, follower will receive transactions from leader.
      * Once the leader is put to closing state, it will reject further requests
      * from clients. Only the transactions which happened before the container
      * in the leader goes to closing state, will arrive here even the container
      * might already be in closing state here.
      */
-    if (containerState == ContainerDataProto.State.OPEN
-        || containerState == ContainerDataProto.State.CLOSING) {
+    if (containerState == State.OPEN || containerState == State.CLOSING) {
       return;
-    } else {
-      String msg = "Requested operation not allowed as ContainerState is " +
-          containerState;
-      ContainerProtos.Result result = null;
-      switch (containerState) {
-      case CLOSED:
-        result = CLOSED_CONTAINER_IO;
-        break;
-      case UNHEALTHY:
-        result = CONTAINER_UNHEALTHY;
-        break;
-      case INVALID:
-        result = INVALID_CONTAINER_STATE;
-        break;
-      default:
-        result = CONTAINER_INTERNAL_ERROR;
-      }
+    }
 
-      throw new StorageContainerException(msg, result);
+    final ContainerProtos.Result result;
+    switch (containerState) {
+    case QUASI_CLOSED:
+      result = CLOSED_CONTAINER_IO;
+      break;
+    case CLOSED:
+      result = CLOSED_CONTAINER_IO;
+      break;
+    case UNHEALTHY:
+      result = CONTAINER_UNHEALTHY;
+      break;
+    case INVALID:
+      result = INVALID_CONTAINER_STATE;
+      break;
+    default:
+      result = CONTAINER_INTERNAL_ERROR;
     }
+    String msg = "Requested operation not allowed as ContainerState is " +
+        containerState;
+    throw new StorageContainerException(msg, result);
   }
 
   public Container importContainer(long containerID, long maxSize,
@@ -796,4 +788,55 @@ public class KeyValueHandler extends Handler {
     return container;
 
   }
+
+  @Override
+  public void markContainerForClose(Container container)
+      throws IOException {
+    State currentState = container.getContainerState();
+    // Move the container to CLOSING state only if it's OPEN
+    if (currentState == State.OPEN) {
+      container.markContainerForClose();
+      sendICR(container);
+    }
+  }
+
+  @Override
+  public void quasiCloseContainer(Container container)
+      throws IOException {
+    final State state = container.getContainerState();
+    // Quasi close call is idempotent.
+    if (state == State.QUASI_CLOSED) {
+      return;
+    }
+    // The container has to be in CLOSING state.
+    if (state != State.CLOSING) {
+      ContainerProtos.Result error = state == State.INVALID ?
+          INVALID_CONTAINER_STATE : CONTAINER_INTERNAL_ERROR;
+      throw new StorageContainerException("Cannot quasi close container #" +
+          container.getContainerData().getContainerID() + " while in " +
+          state + " state.", error);
+    }
+    container.quasiClose();
+    sendICR(container);
+  }
+
+  @Override
+  public void closeContainer(Container container)
+      throws IOException {
+    final State state = container.getContainerState();
+    // Close call is idempotent.
+    if (state == State.CLOSED) {
+      return;
+    }
+    // The container has to be either in CLOSING or in QUASI_CLOSED state.
+    if (state != State.CLOSING && state != State.QUASI_CLOSED) {
+      ContainerProtos.Result error = state == State.INVALID ?
+          INVALID_CONTAINER_STATE : CONTAINER_INTERNAL_ERROR;
+      throw new StorageContainerException("Cannot close container #" +
+          container.getContainerData().getContainerID() + " while in " +
+          state + " state.", error);
+    }
+    container.close();
+    sendICR(container);
+  }
 }

+ 124 - 0
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java

@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.ozoneimpl;
+
+import org.apache.hadoop.hdds.protocol.datanode.proto
+    .ContainerProtos.ContainerType;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerDataProto.State;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.interfaces.Handler;
+import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Control plane for container management in datanode.
+ */
+public class ContainerController {
+
+  private final ContainerSet containerSet;
+  private final Map<ContainerType, Handler> handlers;
+
+  public ContainerController(final ContainerSet containerSet,
+      final Map<ContainerType, Handler> handlers) {
+    this.containerSet = containerSet;
+    this.handlers = handlers;
+  }
+
+  /**
+   * Returns the Container given a container id.
+   *
+   * @param containerId ID of the container
+   * @return Container
+   */
+  public Container getContainer(final long containerId) {
+    return containerSet.getContainer(containerId);
+  }
+
+  /**
+   * Marks the container for closing. Moves the container to CLOSING state.
+   *
+   * @param containerId Id of the container to update
+   * @throws IOException in case of exception
+   */
+  public void markContainerForClose(final long containerId)
+      throws IOException {
+    Container container = containerSet.getContainer(containerId);
+
+    if (container.getContainerState() == State.OPEN) {
+      getHandler(container).markContainerForClose(container);
+    }
+  }
+
+  /**
+   * Returns the container report.
+   *
+   * @return ContainerReportsProto
+   * @throws IOException in case of exception
+   */
+  public ContainerReportsProto getContainerReport()
+      throws IOException {
+    return containerSet.getContainerReport();
+  }
+
+  /**
+   * Quasi closes a container given its id.
+   *
+   * @param containerId Id of the container to quasi close
+   * @throws IOException in case of exception
+   */
+  public void quasiCloseContainer(final long containerId) throws IOException {
+    final Container container = containerSet.getContainer(containerId);
+    getHandler(container).quasiCloseContainer(container);
+  }
+
+  /**
+   * Closes a container given its id.
+   *
+   * @param containerId Id of the container to close
+   * @throws IOException in case of exception
+   */
+  public void closeContainer(final long containerId) throws IOException {
+    final Container container = containerSet.getContainer(containerId);
+    getHandler(container).closeContainer(container);
+  }
+
+  public Container importContainer(final ContainerType type,
+      final long containerId, final long maxSize,
+      final FileInputStream rawContainerStream, final TarContainerPacker packer)
+      throws IOException {
+    return handlers.get(type).importContainer(
+        containerId, maxSize, rawContainerStream, packer);
+  }
+
+  /**
+   * Given a container, returns its handler instance.
+   *
+   * @param container Container
+   * @return handler of the container
+   */
+  private Handler getHandler(final Container container) {
+    return handlers.get(container.getContainerType());
+  }
+}

+ 47 - 77
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java

@@ -19,18 +19,20 @@
 package org.apache.hadoop.ozone.container.ozoneimpl;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.PipelineID;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto
+    .ContainerProtos.ContainerType;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.hdds.protocol.proto
         .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+import org.apache.hadoop.ozone.container.common.interfaces.Handler;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerGrpc;
 import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
@@ -47,27 +49,26 @@ import org.slf4j.LoggerFactory;
 
 import java.io.*;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 
-import static org.apache.hadoop.ozone.OzoneConsts.INVALID_PORT;
-
 /**
  * Ozone main class sets up the network servers and initializes the container
  * layer.
  */
 public class OzoneContainer {
 
-  public static final Logger LOG = LoggerFactory.getLogger(
+  private static final Logger LOG = LoggerFactory.getLogger(
       OzoneContainer.class);
 
   private final HddsDispatcher hddsDispatcher;
-  private final DatanodeDetails dnDetails;
+  private final Map<ContainerType, Handler> handlers;
   private final OzoneConfiguration config;
   private final VolumeSet volumeSet;
   private final ContainerSet containerSet;
-  private final Map<ReplicationType, XceiverServerSpi> servers;
+  private final XceiverServerSpi writeChannel;
+  private final XceiverServerSpi readChannel;
+  private final ContainerController controller;
 
   /**
    * Construct OzoneContainer object.
@@ -78,31 +79,42 @@ public class OzoneContainer {
    */
   public OzoneContainer(DatanodeDetails datanodeDetails, OzoneConfiguration
       conf, StateContext context) throws IOException {
-    this.dnDetails = datanodeDetails;
     this.config = conf;
     this.volumeSet = new VolumeSet(datanodeDetails.getUuidString(), conf);
     this.containerSet = new ContainerSet();
     buildContainerSet();
-    hddsDispatcher = new HddsDispatcher(config, containerSet, volumeSet,
-        context);
-    servers = new HashMap<>();
-    servers.put(ReplicationType.STAND_ALONE,
-        new XceiverServerGrpc(datanodeDetails, config, hddsDispatcher,
-            createReplicationService()));
-    servers.put(ReplicationType.RATIS, XceiverServerRatis
-        .newXceiverServerRatis(datanodeDetails, config, hddsDispatcher,
-            context));
+    final ContainerMetrics metrics = ContainerMetrics.create(conf);
+    this.handlers = Maps.newHashMap();
+    for (ContainerType containerType : ContainerType.values()) {
+      handlers.put(containerType,
+          Handler.getHandlerForContainerType(
+              containerType, conf, context, containerSet, volumeSet, metrics));
+    }
+    this.hddsDispatcher = new HddsDispatcher(config, containerSet, volumeSet,
+        handlers, context, metrics);
+
+    /*
+     * ContainerController is the control plane
+     * XceiverServerRatis is the write channel
+     * XceiverServerGrpc is the read channel
+     */
+    this.controller = new ContainerController(containerSet, handlers);
+    this.writeChannel = XceiverServerRatis.newXceiverServerRatis(
+        datanodeDetails, config, hddsDispatcher, context);
+    this.readChannel = new XceiverServerGrpc(
+        datanodeDetails, config, hddsDispatcher, createReplicationService());
+
   }
 
   private GrpcReplicationService createReplicationService() {
     return new GrpcReplicationService(
-        new OnDemandContainerReplicationSource(containerSet));
+        new OnDemandContainerReplicationSource(controller));
   }
 
   /**
    * Build's container map.
    */
-  public void buildContainerSet() {
+  private void buildContainerSet() {
     Iterator<HddsVolume> volumeSetIterator = volumeSet.getVolumesList()
         .iterator();
     ArrayList<Thread> volumeThreads = new ArrayList<Thread>();
@@ -111,7 +123,6 @@ public class OzoneContainer {
     // And also handle disk failure tolerance need to be added
     while (volumeSetIterator.hasNext()) {
       HddsVolume volume = volumeSetIterator.next();
-      File hddsVolumeRootDir = volume.getHddsRootDir();
       Thread thread = new Thread(new ContainerReader(volumeSet, volume,
           containerSet, config));
       thread.start();
@@ -135,9 +146,8 @@ public class OzoneContainer {
    */
   public void start() throws IOException {
     LOG.info("Attempting to start container services.");
-    for (XceiverServerSpi serverinstance : servers.values()) {
-      serverinstance.start();
-    }
+    writeChannel.start();
+    readChannel.start();
     hddsDispatcher.init();
   }
 
@@ -147,9 +157,8 @@ public class OzoneContainer {
   public void stop() {
     //TODO: at end of container IO integration work.
     LOG.info("Attempting to stop container services.");
-    for(XceiverServerSpi serverinstance: servers.values()) {
-      serverinstance.stop();
-    }
+    writeChannel.stop();
+    readChannel.stop();
     hddsDispatcher.shutdown();
   }
 
@@ -163,58 +172,24 @@ public class OzoneContainer {
    * @return - container report.
    * @throws IOException
    */
-  public StorageContainerDatanodeProtocolProtos.ContainerReportsProto
-      getContainerReport() throws IOException {
-    return this.containerSet.getContainerReport();
-  }
 
   public PipelineReportsProto getPipelineReport() {
     PipelineReportsProto.Builder pipelineReportsProto =
-            PipelineReportsProto.newBuilder();
-    for (XceiverServerSpi serverInstance : servers.values()) {
-      pipelineReportsProto
-              .addAllPipelineReport(serverInstance.getPipelineReport());
-    }
+        PipelineReportsProto.newBuilder();
+    pipelineReportsProto.addAllPipelineReport(writeChannel.getPipelineReport());
     return pipelineReportsProto.build();
   }
 
-  /**
-   * Submit ContainerRequest.
-   * @param request
-   * @param replicationType
-   * @param pipelineID
-   */
-  public void submitContainerRequest(
-      ContainerProtos.ContainerCommandRequestProto request,
-      ReplicationType replicationType,
-      PipelineID pipelineID) throws IOException {
-    LOG.info("submitting {} request over {} server for container {}",
-        request.getCmdType(), replicationType, request.getContainerID());
-    Preconditions.checkState(servers.containsKey(replicationType));
-    servers.get(replicationType).submitRequest(request, pipelineID);
-  }
-
-  private int getPortByType(ReplicationType replicationType) {
-    return servers.containsKey(replicationType) ?
-        servers.get(replicationType).getIPCPort() : INVALID_PORT;
+  public XceiverServerSpi getWriteChannel() {
+    return writeChannel;
   }
 
-  /**
-   * Returns the container servers IPC port.
-   *
-   * @return Container servers IPC port.
-   */
-  public int getContainerServerPort() {
-    return getPortByType(ReplicationType.STAND_ALONE);
+  public XceiverServerSpi getReadChannel() {
+    return readChannel;
   }
 
-  /**
-   * Returns the Ratis container Server IPC port.
-   *
-   * @return Ratis port.
-   */
-  public int getRatisContainerServerPort() {
-    return getPortByType(ReplicationType.RATIS);
+  public ContainerController getController() {
+    return controller;
   }
 
   /**
@@ -230,11 +205,6 @@ public class OzoneContainer {
     return this.hddsDispatcher;
   }
 
-  @VisibleForTesting
-  public XceiverServerSpi getServer(ReplicationType replicationType) {
-    return servers.get(replicationType);
-  }
-
   public VolumeSet getVolumeSet() {
     return volumeSet;
   }

+ 7 - 9
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java

@@ -28,9 +28,8 @@ import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
-import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
-import org.apache.hadoop.ozone.container.common.interfaces.Handler;
 import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
+import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
 import org.apache.hadoop.ozone.container.replication.ReplicationTask.Status;
 
 import org.slf4j.Logger;
@@ -49,7 +48,7 @@ public class DownloadAndImportReplicator implements ContainerReplicator {
 
   private final ContainerSet containerSet;
 
-  private final ContainerDispatcher containerDispatcher;
+  private final ContainerController controller;
 
   private final ContainerDownloader downloader;
 
@@ -57,11 +56,11 @@ public class DownloadAndImportReplicator implements ContainerReplicator {
 
   public DownloadAndImportReplicator(
       ContainerSet containerSet,
-      ContainerDispatcher containerDispatcher,
+      ContainerController controller,
       ContainerDownloader downloader,
       TarContainerPacker packer) {
     this.containerSet = containerSet;
-    this.containerDispatcher = containerDispatcher;
+    this.controller = controller;
     this.downloader = downloader;
     this.packer = packer;
   }
@@ -80,10 +79,9 @@ public class DownloadAndImportReplicator implements ContainerReplicator {
       try (FileInputStream tempContainerTarStream = new FileInputStream(
           tarFilePath.toFile())) {
 
-        Handler handler = containerDispatcher.getHandler(
-            originalContainerData.getContainerType());
-
-        Container container = handler.importContainer(containerID,
+        Container container = controller.importContainer(
+            originalContainerData.getContainerType(),
+            containerID,
             originalContainerData.getMaxSize(),
             tempContainerTarStream,
             packer);

+ 5 - 5
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java

@@ -20,12 +20,12 @@ package org.apache.hadoop.ozone.container.replication;
 import java.io.IOException;
 import java.io.OutputStream;
 
-import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerPacker;
 import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,13 +39,13 @@ public class OnDemandContainerReplicationSource
   private static final Logger LOG =
       LoggerFactory.getLogger(ContainerReplicationSource.class);
 
-  private ContainerSet containerSet;
+  private ContainerController controller;
 
   private ContainerPacker packer = new TarContainerPacker();
 
   public OnDemandContainerReplicationSource(
-      ContainerSet containerSet) {
-    this.containerSet = containerSet;
+      ContainerController controller) {
+    this.controller = controller;
   }
 
   @Override
@@ -57,7 +57,7 @@ public class OnDemandContainerReplicationSource
   public void copyData(long containerId, OutputStream destination)
       throws IOException {
 
-    Container container = containerSet.getContainer(containerId);
+    Container container = controller.getContainer(containerId);
 
     Preconditions
         .checkNotNull(container, "Container is not found " + containerId);

+ 3 - 9
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java

@@ -17,7 +17,6 @@
 package org.apache.hadoop.ozone.protocol.commands;
 
 import com.google.common.base.Preconditions;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
 import org.apache.hadoop.hdds.protocol.proto
@@ -30,14 +29,11 @@ import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 public class CloseContainerCommand
     extends SCMCommand<CloseContainerCommandProto> {
 
-  private HddsProtos.ReplicationType replicationType;
-  private PipelineID pipelineID;
+  private final PipelineID pipelineID;
 
-  public CloseContainerCommand(long containerID,
-      HddsProtos.ReplicationType replicationType,
-      PipelineID pipelineID) {
+  public CloseContainerCommand(final long containerID,
+      final PipelineID pipelineID) {
     super(containerID);
-    this.replicationType = replicationType;
     this.pipelineID = pipelineID;
   }
 
@@ -65,7 +61,6 @@ public class CloseContainerCommand
     return CloseContainerCommandProto.newBuilder()
         .setContainerID(getId())
         .setCmdId(getId())
-        .setReplicationType(replicationType)
         .setPipelineID(pipelineID.getProtobuf())
         .build();
   }
@@ -74,7 +69,6 @@ public class CloseContainerCommand
       CloseContainerCommandProto closeContainerProto) {
     Preconditions.checkNotNull(closeContainerProto);
     return new CloseContainerCommand(closeContainerProto.getCmdId(),
-        closeContainerProto.getReplicationType(),
         PipelineID.getFromProtobuf(closeContainerProto.getPipelineID()));
   }
 

+ 4 - 4
hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto

@@ -140,8 +140,8 @@ message ContainerReplicaProto {
   enum State {
     OPEN = 1;
     CLOSING = 2;
-    CLOSED = 3;
-    QUASI_CLOSED = 4;
+    QUASI_CLOSED = 3;
+    CLOSED = 4;
     UNHEALTHY = 5;
     INVALID = 6;
   }
@@ -289,9 +289,9 @@ This command asks the datanode to close a specific container.
 */
 message CloseContainerCommandProto {
   required int64 containerID = 1;
-  required hadoop.hdds.ReplicationType replicationType = 2;
+  required PipelineID pipelineID = 2;
+  // cmdId will be removed
   required int64 cmdId = 3;
-  required PipelineID pipelineID = 4;
 }
 
 /**

+ 23 - 3
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.ozone.container.common.impl;
 
+import com.google.common.collect.Maps;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.StorageUnit;
@@ -25,6 +26,8 @@ import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto
+    .ContainerProtos.ContainerType;
 import org.apache.hadoop.hdds.protocol.datanode.proto
     .ContainerProtos.ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
@@ -33,7 +36,9 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .WriteChunkRequestProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerAction;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.interfaces.Handler;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
@@ -47,6 +52,7 @@ import org.mockito.Mockito;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Map;
 import java.util.UUID;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
@@ -77,8 +83,15 @@ public class TestHddsDispatcher {
       container.create(volumeSet, new RoundRobinVolumeChoosingPolicy(),
           scmId.toString());
       containerSet.addContainer(container);
+      ContainerMetrics metrics = ContainerMetrics.create(conf);
+      Map<ContainerType, Handler> handlers = Maps.newHashMap();
+      for (ContainerType containerType : ContainerType.values()) {
+        handlers.put(containerType,
+            Handler.getHandlerForContainerType(
+                containerType, conf, null, containerSet, volumeSet, metrics));
+      }
       HddsDispatcher hddsDispatcher = new HddsDispatcher(
-          conf, containerSet, volumeSet, context);
+          conf, containerSet, volumeSet, handlers, context, metrics);
       hddsDispatcher.setScmId(scmId.toString());
       ContainerCommandResponseProto responseOne = hddsDispatcher.dispatch(
           getWriteChunkRequest(dd.getUuidString(), 1L, 1L));
@@ -113,8 +126,15 @@ public class TestHddsDispatcher {
       ContainerSet containerSet = new ContainerSet();
       VolumeSet volumeSet = new VolumeSet(dd.getUuidString(), conf);
       StateContext context = Mockito.mock(StateContext.class);
-      HddsDispatcher hddsDispatcher =
-          new HddsDispatcher(conf, containerSet, volumeSet, context);
+      ContainerMetrics metrics = ContainerMetrics.create(conf);
+      Map<ContainerType, Handler> handlers = Maps.newHashMap();
+      for (ContainerType containerType : ContainerType.values()) {
+        handlers.put(containerType,
+            Handler.getHandlerForContainerType(
+                containerType, conf, null, containerSet, volumeSet, metrics));
+      }
+      HddsDispatcher hddsDispatcher = new HddsDispatcher(
+          conf, containerSet, volumeSet, handlers, context, metrics);
       hddsDispatcher.setScmId(scmId.toString());
       ContainerCommandRequestProto writeChunkRequest =
           getWriteChunkRequest(dd.getUuidString(), 1L, 1L);

+ 14 - 2
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/interfaces/TestHandler.java

@@ -18,8 +18,10 @@
 
 package org.apache.hadoop.ozone.container.common.interfaces;
 
+import com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
@@ -32,6 +34,8 @@ import org.junit.rules.TestRule;
 import org.junit.rules.Timeout;
 import org.mockito.Mockito;
 
+import java.util.Map;
+
 /**
  * Tests Handler interface.
  */
@@ -50,8 +54,16 @@ public class TestHandler {
     this.conf = new Configuration();
     this.containerSet = Mockito.mock(ContainerSet.class);
     this.volumeSet = Mockito.mock(VolumeSet.class);
-
-    this.dispatcher = new HddsDispatcher(conf, containerSet, volumeSet, null);
+    ContainerMetrics metrics = ContainerMetrics.create(conf);
+    Map<ContainerProtos.ContainerType, Handler> handlers = Maps.newHashMap();
+    for (ContainerProtos.ContainerType containerType :
+        ContainerProtos.ContainerType.values()) {
+      handlers.put(containerType,
+          Handler.getHandlerForContainerType(
+              containerType, conf, null, containerSet, volumeSet, metrics));
+    }
+    this.dispatcher = new HddsDispatcher(
+        conf, containerSet, volumeSet, handlers, null, metrics);
   }
 
   @Test

+ 210 - 0
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java

@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.ratis.RatisHelper;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.retry.RetryPolicy;
+import org.apache.ratis.rpc.SupportedRpcType;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.UUID;
+
+/**
+ * Test cases to verify CloseContainerCommandHandler in datanode.
+ */
+public class TestCloseContainerCommandHandler {
+
+  private static final StateContext CONTEXT = Mockito.mock(StateContext.class);
+  private static File testDir;
+
+
+  private OzoneContainer getOzoneContainer(final OzoneConfiguration conf,
+      final DatanodeDetails datanodeDetails) throws IOException {
+    testDir = GenericTestUtils.getTestDir(
+        TestCloseContainerCommandHandler.class.getName() + UUID.randomUUID());
+    conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getPath());
+    conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY, testDir.getPath());
+
+    return new OzoneContainer(datanodeDetails, conf, CONTEXT);
+  }
+
+
+  @Test
+  public void testCloseContainerViaRatis()
+      throws IOException, InterruptedException {
+    final OzoneConfiguration conf = new OzoneConfiguration();
+    final DatanodeDetails datanodeDetails = randomDatanodeDetails();
+    final OzoneContainer container = getOzoneContainer(conf, datanodeDetails);
+    container.getDispatcher().setScmId(UUID.randomUUID().toString());
+    container.start();
+    // Give some time for ratis for leader election.
+    final PipelineID pipelineID = PipelineID.randomId();
+    final RaftGroupId raftGroupId = RaftGroupId.valueOf(pipelineID.getId());
+    final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(conf);
+    final RaftPeer peer = RatisHelper.toRaftPeer(datanodeDetails);
+    final RaftGroup group = RatisHelper.newRaftGroup(raftGroupId,
+        Collections.singleton(datanodeDetails));
+    final RaftClient client = RatisHelper.newRaftClient(
+        SupportedRpcType.GRPC, peer, retryPolicy);
+    System.out.println(client.groupAdd(group, peer.getId()).isSuccess());
+    Thread.sleep(2000);
+    final ContainerID containerId = ContainerID.valueof(1);
+    ContainerProtos.ContainerCommandRequestProto.Builder request =
+        ContainerProtos.ContainerCommandRequestProto.newBuilder();
+    request.setCmdType(ContainerProtos.Type.CreateContainer);
+    request.setContainerID(containerId.getId());
+    request.setCreateContainer(
+        ContainerProtos.CreateContainerRequestProto.getDefaultInstance());
+    request.setTraceID(UUID.randomUUID().toString());
+    request.setDatanodeUuid(datanodeDetails.getUuidString());
+    container.getWriteChannel().submitRequest(
+        request.build(), pipelineID.getProtobuf());
+
+    Assert.assertEquals(ContainerProtos.ContainerDataProto.State.OPEN,
+        container.getContainerSet().getContainer(
+            containerId.getId()).getContainerState());
+
+    // We have created a container via ratis. Now close the container on ratis.
+    final CloseContainerCommandHandler closeHandler =
+        new CloseContainerCommandHandler();
+    final CloseContainerCommand command = new CloseContainerCommand(
+        containerId.getId(), pipelineID);
+    final DatanodeStateMachine datanodeStateMachine = Mockito.mock(
+        DatanodeStateMachine.class);
+
+    Mockito.when(datanodeStateMachine.getDatanodeDetails())
+        .thenReturn(datanodeDetails);
+    Mockito.when(CONTEXT.getParent()).thenReturn(datanodeStateMachine);
+
+    closeHandler.handle(command, container, CONTEXT, null);
+
+    Assert.assertEquals(ContainerProtos.ContainerDataProto.State.CLOSED,
+        container.getContainerSet().getContainer(
+            containerId.getId()).getContainerState());
+
+    Mockito.verify(datanodeStateMachine, Mockito.times(2)).triggerHeartbeat();
+    container.stop();
+  }
+
+  @Test
+  public void testCloseContainerViaStandalone()
+      throws IOException, InterruptedException {
+    final OzoneConfiguration conf = new OzoneConfiguration();
+    final DatanodeDetails datanodeDetails = randomDatanodeDetails();
+    final OzoneContainer container = getOzoneContainer(conf, datanodeDetails);
+    container.getDispatcher().setScmId(UUID.randomUUID().toString());
+    container.start();
+    // Give some time for ratis for leader election.
+    final PipelineID pipelineID = PipelineID.randomId();
+    final RaftGroupId raftGroupId = RaftGroupId.valueOf(pipelineID.getId());
+    final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(conf);
+    final RaftPeer peer = RatisHelper.toRaftPeer(datanodeDetails);
+    final RaftGroup group = RatisHelper.newRaftGroup(raftGroupId,
+        Collections.singleton(datanodeDetails));
+    final RaftClient client = RatisHelper.newRaftClient(
+        SupportedRpcType.GRPC, peer, retryPolicy);
+    System.out.println(client.groupAdd(group, peer.getId()).isSuccess());
+    Thread.sleep(2000);
+    final ContainerID containerId = ContainerID.valueof(2);
+    ContainerProtos.ContainerCommandRequestProto.Builder request =
+        ContainerProtos.ContainerCommandRequestProto.newBuilder();
+    request.setCmdType(ContainerProtos.Type.CreateContainer);
+    request.setContainerID(containerId.getId());
+    request.setCreateContainer(
+        ContainerProtos.CreateContainerRequestProto.getDefaultInstance());
+    request.setTraceID(UUID.randomUUID().toString());
+    request.setDatanodeUuid(datanodeDetails.getUuidString());
+    container.getWriteChannel().submitRequest(
+        request.build(), pipelineID.getProtobuf());
+
+    Assert.assertEquals(ContainerProtos.ContainerDataProto.State.OPEN,
+        container.getContainerSet().getContainer(
+            containerId.getId()).getContainerState());
+
+    // We have created a container via ratis. Now quasi close it
+    final CloseContainerCommandHandler closeHandler =
+        new CloseContainerCommandHandler();
+    // Specify a pipeline which doesn't exist in the datanode.
+    final CloseContainerCommand command = new CloseContainerCommand(
+        containerId.getId(), PipelineID.randomId());
+    final DatanodeStateMachine datanodeStateMachine = Mockito.mock(
+        DatanodeStateMachine.class);
+
+    Mockito.when(datanodeStateMachine.getDatanodeDetails())
+        .thenReturn(datanodeDetails);
+    Mockito.when(CONTEXT.getParent()).thenReturn(datanodeStateMachine);
+
+    closeHandler.handle(command, container, CONTEXT, null);
+
+    Assert.assertEquals(ContainerProtos.ContainerDataProto.State.QUASI_CLOSED,
+        container.getContainerSet().getContainer(
+            containerId.getId()).getContainerState());
+
+    Mockito.verify(datanodeStateMachine, Mockito.times(2)).triggerHeartbeat();
+    container.stop();
+  }
+
+  /**
+   * Creates a random DatanodeDetails.
+   * @return DatanodeDetails
+   */
+  private static DatanodeDetails randomDatanodeDetails() {
+    String ipAddress = "127.0.0.1";
+    DatanodeDetails.Port containerPort = DatanodeDetails.newPort(
+        DatanodeDetails.Port.Name.STANDALONE, 0);
+    DatanodeDetails.Port ratisPort = DatanodeDetails.newPort(
+        DatanodeDetails.Port.Name.RATIS, 0);
+    DatanodeDetails.Port restPort = DatanodeDetails.newPort(
+        DatanodeDetails.Port.Name.REST, 0);
+    DatanodeDetails.Builder builder = DatanodeDetails.newBuilder();
+    builder.setUuid(UUID.randomUUID().toString())
+        .setHostName("localhost")
+        .setIpAddress(ipAddress)
+        .addPort(containerPort)
+        .addPort(ratisPort)
+        .addPort(restPort);
+    return builder.build();
+  }
+
+  @AfterClass
+  public static void teardown() throws IOException {
+    FileUtils.deleteDirectory(testDir);
+  }
+}

+ 5 - 3
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java

@@ -54,6 +54,7 @@ import static org.mockito.Mockito.times;
 
 
 import java.io.File;
+import java.io.IOException;
 import java.util.UUID;
 
 /**
@@ -224,7 +225,7 @@ public class TestKeyValueHandler {
       interval[0] = 2;
       ContainerMetrics metrics = new ContainerMetrics(interval);
       VolumeSet volumeSet = new VolumeSet(UUID.randomUUID().toString(), conf);
-      KeyValueHandler keyValueHandler = new KeyValueHandler(conf, cset,
+      KeyValueHandler keyValueHandler = new KeyValueHandler(conf, null, cset,
           volumeSet, metrics);
       assertEquals("org.apache.hadoop.ozone.container.common" +
           ".volume.RoundRobinVolumeChoosingPolicy",
@@ -235,7 +236,7 @@ public class TestKeyValueHandler {
       conf.set(HDDS_DATANODE_VOLUME_CHOOSING_POLICY,
           "org.apache.hadoop.ozone.container.common.impl.HddsDispatcher");
       try {
-        new KeyValueHandler(conf, cset, volumeSet, metrics);
+        new KeyValueHandler(conf, null, cset, volumeSet, metrics);
       } catch (RuntimeException ex) {
         GenericTestUtils.assertExceptionContains("class org.apache.hadoop" +
             ".ozone.container.common.impl.HddsDispatcher not org.apache" +
@@ -261,7 +262,7 @@ public class TestKeyValueHandler {
 
 
   @Test
-  public void testCloseInvalidContainer() {
+  public void testCloseInvalidContainer() throws IOException {
     long containerID = 1234L;
     Configuration conf = new Configuration();
     KeyValueContainerData kvData = new KeyValueContainerData(containerID,
@@ -282,6 +283,7 @@ public class TestKeyValueHandler {
 
     Mockito.when(handler.handleCloseContainer(any(), any()))
         .thenCallRealMethod();
+    doCallRealMethod().when(handler).closeContainer(any());
     // Closing invalid container should return error response.
     ContainerProtos.ContainerCommandResponseProto response =
         handler.handleCloseContainer(closeContainerRequest, container);

+ 1 - 1
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java

@@ -172,7 +172,7 @@ public class BlockManagerImpl implements EventHandler<Boolean>,
           LOG.warn("Unable to allocate container.");
         }
       } catch (IOException ex) {
-        LOG.warn("Unable to allocate container: {}", ex);
+        LOG.warn("Unable to allocate container.", ex);
       }
     }
   }

+ 2 - 2
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java

@@ -74,8 +74,8 @@ public class CloseContainerEventHandler implements EventHandler<ContainerID> {
       if (container.getState() == LifeCycleState.CLOSING) {
 
         final CloseContainerCommand closeContainerCommand =
-            new CloseContainerCommand(containerID.getId(),
-                container.getReplicationType(), container.getPipelineID());
+            new CloseContainerCommand(
+                containerID.getId(), container.getPipelineID());
 
         getNodes(container).forEach(node -> publisher.fireEvent(
             DATANODE_COMMAND,

+ 1 - 2
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java

@@ -19,7 +19,6 @@ package org.apache.hadoop.hdds.scm.node;
 
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.HddsConfigKeys;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
@@ -960,7 +959,7 @@ public class TestNodeManager {
                   TestUtils.getRandomPipelineReports());
       eq.fireEvent(DATANODE_COMMAND,
           new CommandForDatanode<>(datanodeDetails.getUuid(),
-              new CloseContainerCommand(1L, ReplicationType.STAND_ALONE,
+              new CloseContainerCommand(1L,
                   PipelineID.randomId())));
 
       eq.processAll(1000L);

+ 5 - 3
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java

@@ -21,7 +21,6 @@ import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.HddsConfigKeys;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.
     StorageContainerDatanodeProtocolProtos.CloseContainerCommandProto;
@@ -66,6 +65,7 @@ import org.apache.hadoop.ozone.container.common.states.endpoint
 import org.apache.hadoop.ozone.container.common.states.endpoint
     .VersionEndpointTask;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -75,6 +75,7 @@ import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
 import static org.mockito.Mockito.mock;
@@ -309,8 +310,10 @@ public class TestEndPoint {
     OzoneContainer ozoneContainer = mock(OzoneContainer.class);
     when(ozoneContainer.getNodeReport()).thenReturn(TestUtils
         .createNodeReport(getStorageReports(UUID.randomUUID())));
-    when(ozoneContainer.getContainerReport()).thenReturn(
+    ContainerController controller = Mockito.mock(ContainerController.class);
+    when(controller.getContainerReport()).thenReturn(
         TestUtils.getRandomContainerReports(10));
+    when(ozoneContainer.getController()).thenReturn(controller);
     when(ozoneContainer.getPipelineReport()).thenReturn(
             TestUtils.getRandomPipelineReports());
     RegisterEndpointTask endpointTask =
@@ -433,7 +436,6 @@ public class TestEndPoint {
         .setCloseContainerCommandProto(
             CloseContainerCommandProto.newBuilder().setCmdId(1)
         .setContainerID(1)
-        .setReplicationType(ReplicationType.RATIS)
         .setPipelineID(PipelineID.randomId().getProtobuf())
         .build())
         .setCommandType(Type.closeContainerCommand)

+ 1 - 1
hadoop-ozone/dist/src/main/smoketest/basic/basic.robot

@@ -42,6 +42,6 @@ Check webui static resources
                        Should contain         ${result}    200
 
 Start freon testing
-    ${result} =        Execute              ozone freon randomkeys --numOfVolumes 5 --numOfBuckets 5 --numOfKeys 5 --numOfThreads 10
+    ${result} =        Execute              ozone freon randomkeys --numOfVolumes 5 --numOfBuckets 5 --numOfKeys 5 --numOfThreads 1
                        Wait Until Keyword Succeeds      3min       10sec     Should contain   ${result}   Number of Keys added: 125
                        Should Not Contain               ${result}  ERROR

+ 9 - 9
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java

@@ -176,14 +176,14 @@ public class TestMiniOzoneCluster {
             TestUtils.randomDatanodeDetails(), ozoneConf)
     ) {
       HashSet<Integer> ports = new HashSet<Integer>();
-      assertTrue(ports.add(sm1.getContainer().getContainerServerPort()));
-      assertTrue(ports.add(sm2.getContainer().getContainerServerPort()));
-      assertTrue(ports.add(sm3.getContainer().getContainerServerPort()));
+      assertTrue(ports.add(sm1.getContainer().getReadChannel().getIPCPort()));
+      assertTrue(ports.add(sm2.getContainer().getReadChannel().getIPCPort()));
+      assertTrue(ports.add(sm3.getContainer().getReadChannel().getIPCPort()));
 
       // Assert that ratis is also on a different port.
-      assertTrue(ports.add(sm1.getContainer().getRatisContainerServerPort()));
-      assertTrue(ports.add(sm2.getContainer().getRatisContainerServerPort()));
-      assertTrue(ports.add(sm3.getContainer().getRatisContainerServerPort()));
+      assertTrue(ports.add(sm1.getContainer().getWriteChannel().getIPCPort()));
+      assertTrue(ports.add(sm2.getContainer().getWriteChannel().getIPCPort()));
+      assertTrue(ports.add(sm3.getContainer().getWriteChannel().getIPCPort()));
 
 
     }
@@ -199,9 +199,9 @@ public class TestMiniOzoneCluster {
             TestUtils.randomDatanodeDetails(), ozoneConf)
     ) {
       HashSet<Integer> ports = new HashSet<Integer>();
-      assertTrue(ports.add(sm1.getContainer().getContainerServerPort()));
-      assertFalse(ports.add(sm2.getContainer().getContainerServerPort()));
-      assertFalse(ports.add(sm3.getContainer().getContainerServerPort()));
+      assertTrue(ports.add(sm1.getContainer().getReadChannel().getIPCPort()));
+      assertFalse(ports.add(sm2.getContainer().getReadChannel().getIPCPort()));
+      assertFalse(ports.add(sm3.getContainer().getReadChannel().getIPCPort()));
       assertEquals(ports.iterator().next().intValue(),
           conf.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
               OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT));

+ 13 - 13
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java

@@ -134,7 +134,7 @@ public class TestCloseContainerHandlingByClient {
         .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName)
         .build();
 
-    waitForContainerClose(keyName, key, HddsProtos.ReplicationType.RATIS);
+    waitForContainerClose(keyName, key);
     key.write(data);
     key.flush();
     key.close();
@@ -162,11 +162,12 @@ public class TestCloseContainerHandlingByClient {
     Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream);
     //get the name of a valid container
     OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
-        .setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
+        .setBucketName(bucketName)
+        .setType(HddsProtos.ReplicationType.RATIS)
         .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName)
         .build();
 
-    waitForContainerClose(keyName, key, HddsProtos.ReplicationType.RATIS);
+    waitForContainerClose(keyName, key);
     key.close();
     // read the key from OM again and match the length.The length will still
     // be the equal to the original data size.
@@ -199,7 +200,7 @@ public class TestCloseContainerHandlingByClient {
         .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName)
         .build();
 
-    waitForContainerClose(keyName, key, HddsProtos.ReplicationType.RATIS);
+    waitForContainerClose(keyName, key);
     // write 1 more block worth of data. It will fail and new block will be
     // allocated
     key.write(ContainerTestHelper.getFixedLengthString(keyString, blockSize)
@@ -249,7 +250,7 @@ public class TestCloseContainerHandlingByClient {
         .setFactor(HddsProtos.ReplicationFactor.THREE).setKeyName(keyName)
         .build();
 
-    waitForContainerClose(keyName, key, HddsProtos.ReplicationType.RATIS);
+    waitForContainerClose(keyName, key);
 
     key.close();
     // read the key from OM again and match the length.The length will still
@@ -291,7 +292,7 @@ public class TestCloseContainerHandlingByClient {
         .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName)
         .build();
 
-    waitForContainerClose(keyName, key, HddsProtos.ReplicationType.RATIS);
+    waitForContainerClose(keyName, key);
     // write 3 more chunks worth of data. It will fail and new block will be
     // allocated. This write completes 4 blocks worth of data written to key
     data = Arrays.copyOfRange(writtenData, 3 * blockSize + chunkSize, keyLen);
@@ -321,7 +322,7 @@ public class TestCloseContainerHandlingByClient {
   }
 
   private void waitForContainerClose(String keyName,
-      OzoneOutputStream outputStream, HddsProtos.ReplicationType type)
+      OzoneOutputStream outputStream)
       throws Exception {
     ChunkGroupOutputStream groupOutputStream =
         (ChunkGroupOutputStream) outputStream.getOutputStream();
@@ -332,11 +333,10 @@ public class TestCloseContainerHandlingByClient {
       containerIdList.add(info.getContainerID());
     }
     Assert.assertTrue(!containerIdList.isEmpty());
-    waitForContainerClose(type, containerIdList.toArray(new Long[0]));
+    waitForContainerClose(containerIdList.toArray(new Long[0]));
   }
 
-  private void waitForContainerClose(HddsProtos.ReplicationType type,
-      Long... containerIdList)
+  private void waitForContainerClose(Long... containerIdList)
       throws ContainerNotFoundException, PipelineNotFoundException,
       TimeoutException, InterruptedException {
     List<Pipeline> pipelineList = new ArrayList<>();
@@ -358,7 +358,7 @@ public class TestCloseContainerHandlingByClient {
         // send the order to close the container
         cluster.getStorageContainerManager().getScmNodeManager()
             .addDatanodeCommand(details.getUuid(),
-                new CloseContainerCommand(containerID, type, pipeline.getId()));
+                new CloseContainerCommand(containerID, pipeline.getId()));
       }
     }
     int index = 0;
@@ -413,7 +413,7 @@ public class TestCloseContainerHandlingByClient {
             .getPipeline(container.getPipelineID());
     List<DatanodeDetails> datanodes = pipeline.getNodes();
     Assert.assertEquals(1, datanodes.size());
-    waitForContainerClose(keyName, key, HddsProtos.ReplicationType.RATIS);
+    waitForContainerClose(keyName, key);
     dataString =
         ContainerTestHelper.getFixedLengthString(keyString, (1 * blockSize));
     data = dataString.getBytes(UTF_8);
@@ -459,7 +459,7 @@ public class TestCloseContainerHandlingByClient {
         .build();
 
     Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream);
-    waitForContainerClose(keyName, key, HddsProtos.ReplicationType.RATIS);
+    waitForContainerClose(keyName, key);
     // Again Write the Data. This will throw an exception which will be handled
     // and new blocks will be allocated
     key.write(data);

+ 36 - 44
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java

@@ -22,9 +22,11 @@ import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.ozone.HddsDatanodeService;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.client.ObjectStore;
@@ -32,7 +34,6 @@ import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientFactory;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
-import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
@@ -63,8 +64,9 @@ public class TestCloseContainerByPipeline {
   @BeforeClass
   public static void init() throws Exception {
     conf = new OzoneConfiguration();
+    conf.set(ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE, "1");
     cluster = MiniOzoneCluster.newBuilder(conf)
-        .setNumDatanodes(3).build();
+        .setNumDatanodes(9).build();
     cluster.waitForClusterToBeReady();
     //the easiest way to create an open container is creating a key
     client = OzoneClientFactory.getClient(conf);
@@ -86,7 +88,7 @@ public class TestCloseContainerByPipeline {
   @Test
   public void testIfCloseContainerCommandHandlerIsInvoked() throws Exception {
     OzoneOutputStream key = objectStore.getVolume("test").getBucket("test")
-        .createKey("testCloseContainer", 1024, ReplicationType.STAND_ALONE,
+        .createKey("standalone", 1024, ReplicationType.RATIS,
             ReplicationFactor.ONE);
     key.write("standalone".getBytes());
     key.close();
@@ -94,10 +96,9 @@ public class TestCloseContainerByPipeline {
     //get the name of a valid container
     OmKeyArgs keyArgs =
         new OmKeyArgs.Builder().setVolumeName("test").setBucketName("test")
-            .setType(HddsProtos.ReplicationType.STAND_ALONE)
+            .setType(HddsProtos.ReplicationType.RATIS)
             .setFactor(HddsProtos.ReplicationFactor.ONE).setDataSize(1024)
-            .setKeyName("testCloseContainer").build();
-
+            .setKeyName("standalone").build();
     OmKeyLocationInfo omKeyLocationInfo =
         cluster.getOzoneManager().lookupKey(keyArgs).getKeyLocationVersions()
             .get(0).getBlocksLatestVersionOnly().get(0);
@@ -127,8 +128,7 @@ public class TestCloseContainerByPipeline {
     //send the order to close the container
     cluster.getStorageContainerManager().getScmNodeManager()
         .addDatanodeCommand(datanodeDetails.getUuid(),
-            new CloseContainerCommand(containerID,
-                HddsProtos.ReplicationType.STAND_ALONE, pipeline.getId()));
+            new CloseContainerCommand(containerID, pipeline.getId()));
     GenericTestUtils
         .waitFor(() -> isContainerClosed(cluster, containerID, datanodeDetails),
             500, 5 * 1000);
@@ -142,7 +142,7 @@ public class TestCloseContainerByPipeline {
       throws IOException, TimeoutException, InterruptedException {
 
     OzoneOutputStream key = objectStore.getVolume("test").getBucket("test")
-        .createKey("standalone", 1024, ReplicationType.STAND_ALONE,
+        .createKey("standalone", 1024, ReplicationType.RATIS,
             ReplicationFactor.ONE);
     key.write("standalone".getBytes());
     key.close();
@@ -150,7 +150,7 @@ public class TestCloseContainerByPipeline {
     //get the name of a valid container
     OmKeyArgs keyArgs =
         new OmKeyArgs.Builder().setVolumeName("test").setBucketName("test")
-            .setType(HddsProtos.ReplicationType.STAND_ALONE)
+            .setType(HddsProtos.ReplicationType.RATIS)
             .setFactor(HddsProtos.ReplicationFactor.ONE).setDataSize(1024)
             .setKeyName("standalone").build();
 
@@ -170,30 +170,20 @@ public class TestCloseContainerByPipeline {
     Assert
         .assertFalse(isContainerClosed(cluster, containerID, datanodeDetails));
 
-    GenericTestUtils.LogCapturer logCapturer =
-        GenericTestUtils.LogCapturer.captureLogs(OzoneContainer.LOG);
-    //send the order to close the container
+    // Send the order to close the container, give random pipeline id so that
+    // the container will not be closed via RATIS
     cluster.getStorageContainerManager().getScmNodeManager()
         .addDatanodeCommand(datanodeDetails.getUuid(),
-            new CloseContainerCommand(containerID,
-                HddsProtos.ReplicationType.STAND_ALONE, pipeline.getId()));
-
-    // The log will appear after the state changed to closed in standalone,
-    // wait for the log to ensure the operation has been done.
-    GenericTestUtils.waitFor(() -> logCapturer.getOutput().contains(
-        "submitting CloseContainer request over STAND_ALONE server for"
-            + " container " + containerID), 500, 5 * 1000);
+            new CloseContainerCommand(containerID, PipelineID.randomId()));
 
     //double check if it's really closed (waitFor also throws an exception)
-    Assert.assertTrue(isContainerClosed(cluster, containerID, datanodeDetails));
-    Assert.assertTrue(logCapturer.getOutput().contains(
-        "submitting CloseContainer request over STAND_ALONE server for"
-            + " container " + containerID));
-    // Make sure it was really closed via StandAlone not Ratis server
-    Assert.assertFalse((logCapturer.getOutput().contains(
-        "submitting CloseContainer request over RATIS server for container "
-            + containerID)));
-    logCapturer.stopCapturing();
+    // TODO: change the below line after implementing QUASI_CLOSED to CLOSED
+    // logic. The container will be QUASI closed as of now
+    GenericTestUtils
+        .waitFor(() -> isContainerQuasiClosed(
+            cluster, containerID, datanodeDetails), 500, 5 * 1000);
+    Assert.assertTrue(
+        isContainerQuasiClosed(cluster, containerID, datanodeDetails));
   }
 
   @Test
@@ -224,18 +214,14 @@ public class TestCloseContainerByPipeline {
     List<DatanodeDetails> datanodes = pipeline.getNodes();
     Assert.assertEquals(3, datanodes.size());
 
-    GenericTestUtils.LogCapturer logCapturer =
-        GenericTestUtils.LogCapturer.captureLogs(OzoneContainer.LOG);
-
     for (DatanodeDetails details : datanodes) {
       Assert.assertFalse(isContainerClosed(cluster, containerID, details));
       //send the order to close the container
       cluster.getStorageContainerManager().getScmNodeManager()
           .addDatanodeCommand(details.getUuid(),
-              new CloseContainerCommand(containerID,
-                  HddsProtos.ReplicationType.RATIS, pipeline.getId()));
+              new CloseContainerCommand(containerID, pipeline.getId()));
     }
-
+    // Make sure that it is CLOSED
     for (DatanodeDetails datanodeDetails : datanodes) {
       GenericTestUtils.waitFor(
           () -> isContainerClosed(cluster, containerID, datanodeDetails), 500,
@@ -244,14 +230,6 @@ public class TestCloseContainerByPipeline {
       Assert.assertTrue(isContainerClosed(cluster,
           containerID, datanodeDetails));
     }
-    // Make sure it was really closed via Ratis not STAND_ALONE server
-    Assert.assertFalse(logCapturer.getOutput().contains(
-        "submitting CloseContainer request over STAND_ALONE "
-            + "server for container " + containerID));
-    Assert.assertTrue((logCapturer.getOutput().contains(
-        "submitting CloseContainer request over RATIS server for container "
-            + containerID)));
-    logCapturer.stopCapturing();
   }
 
   private Boolean isContainerClosed(MiniOzoneCluster cluster, long containerID,
@@ -267,4 +245,18 @@ public class TestCloseContainerByPipeline {
     }
     return false;
   }
+
+  private Boolean isContainerQuasiClosed(MiniOzoneCluster miniCluster,
+      long containerID, DatanodeDetails datanode) {
+    ContainerData containerData;
+    for (HddsDatanodeService datanodeService : miniCluster.getHddsDatanodes()) {
+      if (datanode.equals(datanodeService.getDatanodeDetails())) {
+        containerData =
+            datanodeService.getDatanodeStateMachine().getContainer()
+                .getContainerSet().getContainer(containerID).getContainerData();
+        return containerData.isQuasiClosed();
+      }
+    }
+    return false;
+  }
 }

+ 1 - 2
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java

@@ -95,8 +95,7 @@ public class TestCloseContainerHandler {
     //send the order to close the container
     cluster.getStorageContainerManager().getScmNodeManager()
         .addDatanodeCommand(datanodeDetails.getUuid(),
-            new CloseContainerCommand(containerId.getId(),
-                HddsProtos.ReplicationType.STAND_ALONE, pipeline.getId()));
+            new CloseContainerCommand(containerId.getId(), pipeline.getId()));
 
     GenericTestUtils.waitFor(() ->
             isContainerClosed(cluster, containerId.getId()),

+ 18 - 4
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java

@@ -21,6 +21,7 @@ import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
 import static org.apache.hadoop.test.MetricsAsserts.assertQuantileGauges;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 
+import com.google.common.collect.Maps;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
@@ -36,12 +37,15 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
+import org.apache.hadoop.ozone.container.common.interfaces.Handler;
 import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerGrpc;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
 import org.apache.hadoop.ozone.container.replication.GrpcReplicationService;
 import org.apache.hadoop.ozone.container.replication.OnDemandContainerReplicationSource;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -49,6 +53,7 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.File;
+import java.util.Map;
 import java.util.UUID;
 
 /**
@@ -57,9 +62,9 @@ import java.util.UUID;
 public class TestContainerMetrics {
 
   private GrpcReplicationService createReplicationService(
-      ContainerSet containerSet) {
+      ContainerController controller) {
     return new GrpcReplicationService(
-        new OnDemandContainerReplicationSource(containerSet));
+        new OnDemandContainerReplicationSource(controller));
   }
 
   @Test
@@ -85,12 +90,21 @@ public class TestContainerMetrics {
       VolumeSet volumeSet = new VolumeSet(
           datanodeDetails.getUuidString(), conf);
       ContainerSet containerSet = new ContainerSet();
+      ContainerMetrics metrics = ContainerMetrics.create(conf);
+      Map<ContainerProtos.ContainerType, Handler> handlers = Maps.newHashMap();
+      for (ContainerProtos.ContainerType containerType :
+          ContainerProtos.ContainerType.values()) {
+        handlers.put(containerType,
+            Handler.getHandlerForContainerType(
+                containerType, conf, null, containerSet, volumeSet, metrics));
+      }
       HddsDispatcher dispatcher = new HddsDispatcher(conf, containerSet,
-          volumeSet, null);
+          volumeSet, handlers, null, metrics);
       dispatcher.setScmId(UUID.randomUUID().toString());
 
       server = new XceiverServerGrpc(datanodeDetails, conf, dispatcher,
-          createReplicationService(containerSet));
+          createReplicationService(new ContainerController(
+              containerSet, handlers)));
       client = new XceiverClientGrpc(pipeline, conf);
 
       server.start();

+ 21 - 5
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java

@@ -18,11 +18,14 @@
 
 package org.apache.hadoop.ozone.container.server;
 
+import com.google.common.collect.Maps;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
 import org.apache.hadoop.ozone.container.common.interfaces.Handler;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
 import org.apache.hadoop.ozone.container.replication.GrpcReplicationService;
 import org.apache.hadoop.ozone.container.replication.OnDemandContainerReplicationSource;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -57,6 +60,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 import static org.apache.ratis.rpc.SupportedRpcType.GRPC;
 import static org.apache.ratis.rpc.SupportedRpcType.NETTY;
@@ -71,15 +75,17 @@ public class TestContainerServer {
       = GenericTestUtils.getTestDir("dfs").getAbsolutePath() + File.separator;
 
   private GrpcReplicationService createReplicationService(
-      ContainerSet containerSet) {
+      ContainerController containerController) {
     return new GrpcReplicationService(
-        new OnDemandContainerReplicationSource(containerSet));
+        new OnDemandContainerReplicationSource(containerController));
   }
 
   @Test
   public void testClientServer() throws Exception {
     DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails();
     ContainerSet containerSet = new ContainerSet();
+    ContainerController controller = new ContainerController(
+        containerSet, null);
     runTestClientServer(1, (pipeline, conf) -> conf
             .setInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
                 pipeline.getFirstNode()
@@ -87,7 +93,7 @@ public class TestContainerServer {
         XceiverClientGrpc::new,
         (dn, conf) -> new XceiverServerGrpc(datanodeDetails, conf,
             new TestContainerDispatcher(),
-            createReplicationService(containerSet)), (dn, p) -> {
+            createReplicationService(controller)), (dn, p) -> {
         });
   }
 
@@ -185,12 +191,22 @@ public class TestContainerServer {
               .getPort(DatanodeDetails.Port.Name.STANDALONE).getValue());
 
       ContainerSet containerSet = new ContainerSet();
+      VolumeSet volumeSet = mock(VolumeSet.class);
+      ContainerMetrics metrics = ContainerMetrics.create(conf);
+      Map<ContainerProtos.ContainerType, Handler> handlers = Maps.newHashMap();
+      for (ContainerProtos.ContainerType containerType :
+          ContainerProtos.ContainerType.values()) {
+        handlers.put(containerType,
+            Handler.getHandlerForContainerType(
+                containerType, conf, null, containerSet, volumeSet, metrics));
+      }
       HddsDispatcher dispatcher = new HddsDispatcher(
-          conf, mock(ContainerSet.class), mock(VolumeSet.class), null);
+          conf, containerSet, volumeSet, handlers, null, metrics);
       dispatcher.init();
       DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails();
       server = new XceiverServerGrpc(datanodeDetails, conf, dispatcher,
-          createReplicationService(containerSet));
+          createReplicationService(
+              new ContainerController(containerSet, null)));
       client = new XceiverClientGrpc(pipeline, conf);
 
       server.start();

+ 16 - 3
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java

@@ -17,9 +17,12 @@
  */
 package org.apache.hadoop.ozone.genesis;
 
+import com.google.common.collect.Maps;
 import org.apache.hadoop.hdds.HddsUtils;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
+import org.apache.hadoop.ozone.container.common.interfaces.Handler;
 import org.apache.hadoop.ozone.container.common.statemachine
     .DatanodeStateMachine.DatanodeStates;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
@@ -44,6 +47,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -100,9 +104,18 @@ public class BenchMarkDatanodeDispatcher {
 
     ContainerSet containerSet = new ContainerSet();
     VolumeSet volumeSet = new VolumeSet(datanodeUuid, conf);
-
-    dispatcher = new HddsDispatcher(conf, containerSet, volumeSet,
-        new StateContext(conf, DatanodeStates.RUNNING, null));
+    StateContext context = new StateContext(
+        conf, DatanodeStates.RUNNING, null);
+    ContainerMetrics metrics = ContainerMetrics.create(conf);
+    Map<ContainerProtos.ContainerType, Handler> handlers = Maps.newHashMap();
+    for (ContainerProtos.ContainerType containerType :
+        ContainerProtos.ContainerType.values()) {
+      handlers.put(containerType,
+          Handler.getHandlerForContainerType(
+              containerType, conf, context, containerSet, volumeSet, metrics));
+    }
+    dispatcher = new HddsDispatcher(conf, containerSet, volumeSet, handlers,
+        context, metrics);
     dispatcher.init();
 
     containerCount = new AtomicInteger();

+ 1 - 2
hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeFastRestart.java

@@ -21,7 +21,6 @@ package org.apache.hadoop.ozone.freon;
 import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.container.common.transport
     .server.XceiverServerSpi;
@@ -128,7 +127,7 @@ public class TestFreonWithDatanodeFastRestart {
   private StateMachine getStateMachine() throws Exception {
     XceiverServerSpi server =
         cluster.getHddsDatanodes().get(0).getDatanodeStateMachine().
-            getContainer().getServer(HddsProtos.ReplicationType.RATIS);
+            getContainer().getWriteChannel();
     RaftServerProxy proxy =
         (RaftServerProxy)(((XceiverServerRatis)server).getServer());
     RaftGroupId groupId = proxy.getGroupIds().iterator().next();