
HDDS-896. Handle over replicated containers in SCM.
Contributed by Nandakumar.

Anu Engineer committed 6 years ago (commit ddc0a40507)
22 changed files with 600 additions and 182 deletions
  1. hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java (+9 -0)
  2. hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java (+3 -0)
  3. hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java (+19 -15)
  4. hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java (+45 -49)
  5. hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java (+84 -0)
  6. hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java (+11 -0)
  7. hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java (+34 -28)
  8. hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java (+11 -1)
  9. hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java (+4 -9)
  10. hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlocksCommand.java (+1 -5)
  11. hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteContainerCommand.java (+62 -0)
  12. hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReplicateContainerCommand.java (+0 -4)
  13. hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReregisterCommand.java (+1 -10)
  14. hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java (+1 -1)
  15. hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java (+13 -5)
  16. hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java (+12 -0)
  17. hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/DeleteContainerCommandWatcher.java (+56 -0)
  18. hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReportHandlerHelper.java (+1 -4)
  19. hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java (+148 -22)
  20. hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java (+14 -0)
  21. hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java (+69 -26)
  22. hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java (+2 -3)

+ 9 - 0
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java

@@ -146,6 +146,15 @@ public abstract class Handler {
   public abstract void closeContainer(Container container)
       throws IOException;
 
+  /**
+   * Deletes the given container.
+   *
+   * @param container container to be deleted
+   * @throws IOException
+   */
+  public abstract void deleteContainer(Container container)
+      throws IOException;
+
   public void setScmID(String scmId) {
     this.scmID = scmId;
   }

+ 3 - 0
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java

@@ -40,6 +40,8 @@ import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
     .CommandDispatcher;
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
     .DeleteBlocksCommandHandler;
+import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
+    .DeleteContainerCommandHandler;
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
     .ReplicateContainerCommandHandler;
 import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
@@ -115,6 +117,7 @@ public class DatanodeStateMachine implements Closeable {
         .addHandler(new DeleteBlocksCommandHandler(container.getContainerSet(),
             conf))
         .addHandler(new ReplicateContainerCommandHandler(conf, supervisor))
+        .addHandler(new DeleteContainerCommandHandler())
         .setConnectionManager(connectionManager)
         .setContainer(container)
         .setContext(context)
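
Registering the handler once is all the datanode needs: CommandDispatcher routes each incoming SCMCommand to the handler whose getCommandType() matches the command's type. A minimal sketch of that routing pattern, using made-up generic types rather than the hdds classes:

    import java.util.EnumMap;
    import java.util.Map;

    final class MiniDispatcher<E extends Enum<E>> {

      interface Handler<C> {
        void handle(C commandType);
      }

      private final Map<E, Handler<E>> handlers;

      MiniDispatcher(Class<E> enumType) {
        this.handlers = new EnumMap<>(enumType);
      }

      // Chainable registration, mirroring CommandDispatcher.Builder#addHandler.
      MiniDispatcher<E> addHandler(E type, Handler<E> handler) {
        handlers.put(type, handler);
        return this;
      }

      void dispatch(E type) {
        Handler<E> handler = handlers.get(type);
        if (handler == null) {
          throw new IllegalArgumentException("No handler for " + type);
        }
        handler.handle(type);
      }
    }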

+ 19 - 15
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java

@@ -19,10 +19,9 @@ package org.apache.hadoop.ozone.container.common.statemachine;
 import com.google.common.base.Preconditions;
 import com.google.protobuf.GeneratedMessage;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.PipelineAction;
 import org.apache.hadoop.hdds.protocol.proto
@@ -427,22 +426,27 @@ public class StateContext {
    * @param cmd - {@link SCMCommand}.
    */
   public void addCmdStatus(SCMCommand cmd) {
-    if (cmd.getType().equals(Type.closeContainerCommand)) {
-      // We will be removing CommandStatus completely.
-      // As a first step, removed it for CloseContainerCommand.
-      return;
-    }
-    CommandStatusBuilder statusBuilder;
-    if (cmd.getType() == Type.deleteBlocksCommand) {
-      statusBuilder = new DeleteBlockCommandStatusBuilder();
-    } else {
-      statusBuilder = CommandStatusBuilder.newBuilder();
+    final Optional<CommandStatusBuilder> cmdStatusBuilder;
+    switch (cmd.getType()) {
+    case replicateContainerCommand:
+      cmdStatusBuilder = Optional.of(CommandStatusBuilder.newBuilder());
+      break;
+    case deleteBlocksCommand:
+      cmdStatusBuilder = Optional.of(
+          DeleteBlockCommandStatusBuilder.newBuilder());
+      break;
+    case deleteContainerCommand:
+      cmdStatusBuilder = Optional.of(CommandStatusBuilder.newBuilder());
+      break;
+    default:
+      cmdStatusBuilder = Optional.empty();
     }
-    this.addCmdStatus(cmd.getId(),
-        statusBuilder.setCmdId(cmd.getId())
+    cmdStatusBuilder.ifPresent(statusBuilder ->
+        addCmdStatus(cmd.getId(), statusBuilder
+            .setCmdId(cmd.getId())
             .setStatus(Status.PENDING)
             .setType(cmd.getType())
-            .build());
+            .build()));
   }
 
   /**
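
After this change, addCmdStatus records a PENDING CommandStatus only for the command types that SCM still reconciles through command status reports (replicate, delete blocks, delete container); closeContainerCommand falls into the default branch and is tracked via container reports instead. The same Optional-based selection in a compilable toy form, with invented types:

    import java.util.Optional;

    final class CmdStatusDemo {

      enum Type { closeContainerCommand, replicateContainerCommand,
          deleteBlocksCommand, deleteContainerCommand }

      static Optional<String> pendingStatusFor(Type type, long cmdId) {
        final Optional<String> status;
        switch (type) {
        case replicateContainerCommand:
        case deleteBlocksCommand:
        case deleteContainerCommand:
          status = Optional.of("PENDING " + type + " cmdId=" + cmdId);
          break;
        default:
          // e.g. closeContainerCommand: tracked through container reports
          status = Optional.empty();
        }
        return status;
      }

      public static void main(String[] args) {
        pendingStatusFor(Type.deleteContainerCommand, 7L)
            .ifPresent(System.out::println);  // prints a PENDING entry
        pendingStatusFor(Type.closeContainerCommand, 8L)
            .ifPresent(System.out::println);  // prints nothing
      }
    }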

+ 45 - 49
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java

@@ -16,7 +16,6 @@
  */
 package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
 
-import com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto
@@ -31,6 +30,7 @@ import org.apache.hadoop.ozone.container.common.statemachine
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.util.Time;
 import org.apache.ratis.protocol.NotLeaderException;
@@ -68,61 +68,57 @@ public class CloseContainerCommandHandler implements CommandHandler {
   @Override
   public void handle(SCMCommand command, OzoneContainer ozoneContainer,
       StateContext context, SCMConnectionManager connectionManager) {
+    LOG.debug("Processing Close Container command.");
+    invocationCount++;
+    final long startTime = Time.monotonicNow();
+    final DatanodeDetails datanodeDetails = context.getParent()
+        .getDatanodeDetails();
+    final CloseContainerCommandProto closeCommand =
+        ((CloseContainerCommand)command).getProto();
+    final ContainerController controller = ozoneContainer.getController();
+    final long containerId = closeCommand.getContainerID();
     try {
-      LOG.debug("Processing Close Container command.");
-      invocationCount++;
-      final long startTime = Time.monotonicNow();
-      final DatanodeDetails datanodeDetails = context.getParent()
-          .getDatanodeDetails();
-      final CloseContainerCommandProto closeCommand =
-          CloseContainerCommandProto.parseFrom(command.getProtoBufMessage());
-      final ContainerController controller = ozoneContainer.getController();
-      final long containerId = closeCommand.getContainerID();
-      try {
-        final Container container = controller.getContainer(containerId);
+      final Container container = controller.getContainer(containerId);
 
-        if (container == null) {
-          LOG.error("Container #{} does not exist in datanode. "
-              + "Container close failed.", containerId);
-          return;
-        }
+      if (container == null) {
+        LOG.error("Container #{} does not exist in datanode. "
+            + "Container close failed.", containerId);
+        return;
+      }
 
-        // Move the container to CLOSING state
-        controller.markContainerForClose(containerId);
+      // Move the container to CLOSING state
+      controller.markContainerForClose(containerId);
 
-        // If the container is part of open pipeline, close it via write channel
-        if (ozoneContainer.getWriteChannel()
-            .isExist(closeCommand.getPipelineID())) {
-          if (closeCommand.getForce()) {
-            LOG.warn("Cannot force close a container when the container is" +
-                " part of an active pipeline.");
-            return;
-          }
-          ContainerCommandRequestProto request =
-              getContainerCommandRequestProto(datanodeDetails,
-                  closeCommand.getContainerID());
-          ozoneContainer.getWriteChannel().submitRequest(
-              request, closeCommand.getPipelineID());
+      // If the container is part of open pipeline, close it via write channel
+      if (ozoneContainer.getWriteChannel()
+          .isExist(closeCommand.getPipelineID())) {
+        if (closeCommand.getForce()) {
+          LOG.warn("Cannot force close a container when the container is" +
+              " part of an active pipeline.");
           return;
         }
-        // If we reach here, there is no active pipeline for this container.
-        if (!closeCommand.getForce()) {
-          // QUASI_CLOSE the container.
-          controller.quasiCloseContainer(containerId);
-        } else {
-          // SCM told us to force close the container.
-          controller.closeContainer(containerId);
-        }
-      } catch (NotLeaderException e) {
-        LOG.debug("Follower cannot close container #{}.", containerId);
-      } catch (IOException e) {
-        LOG.error("Can't close container #{}", containerId, e);
-      } finally {
-        long endTime = Time.monotonicNow();
-        totalTime += endTime - startTime;
+        ContainerCommandRequestProto request =
+            getContainerCommandRequestProto(datanodeDetails,
+                closeCommand.getContainerID());
+        ozoneContainer.getWriteChannel().submitRequest(
+            request, closeCommand.getPipelineID());
+        return;
+      }
+      // If we reach here, there is no active pipeline for this container.
+      if (!closeCommand.getForce()) {
+        // QUASI_CLOSE the container.
+        controller.quasiCloseContainer(containerId);
+      } else {
+        // SCM told us to force close the container.
+        controller.closeContainer(containerId);
       }
-    } catch (InvalidProtocolBufferException ex) {
-      LOG.error("Exception while closing container", ex);
+    } catch (NotLeaderException e) {
+      LOG.debug("Follower cannot close container #{}.", containerId);
+    } catch (IOException e) {
+      LOG.error("Can't close container #{}", containerId, e);
+    } finally {
+      long endTime = Time.monotonicNow();
+      totalTime += endTime - startTime;
     }
   }
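
With the nesting flattened, the handler has three outcomes: if the container still belongs to an open pipeline, the close request is submitted through the write channel (and a force close is refused with a warning while the pipeline is active); with no live pipeline, the container is moved to QUASI_CLOSED by default; only an explicit force flag from SCM produces a full close. Hoisting the typed CloseContainerCommand#getProto() call above the try block is what eliminates the old InvalidProtocolBufferException path, since no byte[] parsing remains.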
 

+ 84 - 0
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java

@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.ozone.container.common.statemachine
+    .SCMConnectionManager;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Handler to process the DeleteContainerCommand from SCM.
+ */
+public class DeleteContainerCommandHandler implements CommandHandler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DeleteContainerCommandHandler.class);
+
+  private int invocationCount;
+  private long totalTime;
+
+  @Override
+  public void handle(final SCMCommand command,
+                     final OzoneContainer ozoneContainer,
+                     final StateContext context,
+                     final SCMConnectionManager connectionManager) {
+    final long startTime = Time.monotonicNow();
+    invocationCount++;
+    try {
+      final DeleteContainerCommand deleteContainerCommand =
+          (DeleteContainerCommand) command;
+      final ContainerController controller = ozoneContainer.getController();
+      controller.deleteContainer(deleteContainerCommand.getContainerID());
+      updateCommandStatus(context, command,
+          (cmdStatus) -> cmdStatus.setStatus(true), LOG);
+    } catch (IOException e) {
+      updateCommandStatus(context, command,
+          (cmdStatus) -> cmdStatus.setStatus(false), LOG);
+      LOG.error("Exception occurred while deleting the container.", e);
+    } finally {
+      totalTime += Time.monotonicNow() - startTime;
+    }
+
+  }
+
+  @Override
+  public SCMCommandProto.Type getCommandType() {
+    return SCMCommandProto.Type.deleteContainerCommand;
+  }
+
+  @Override
+  public int getInvocationCount() {
+    return this.invocationCount;
+  }
+
+  @Override
+  public long getAverageRunTime() {
+    return invocationCount == 0 ? 0 : totalTime / invocationCount;
+  }
+}
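
A hypothetical Mockito sketch (not part of this commit) of how the handler could be driven: it casts the SCMCommand, delegates the deletion to ContainerController, and updates the CommandStatus on the StateContext. The mocked StateContext is assumed to absorb the status update as a no-op:

    import org.apache.hadoop.ozone.container.common.statemachine
        .StateContext;
    import org.apache.hadoop.ozone.container.common.statemachine
        .commandhandler.DeleteContainerCommandHandler;
    import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
    import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
    import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
    import org.mockito.Mockito;

    public class DeleteContainerCommandHandlerSketch {
      public static void main(String[] args) throws Exception {
        final ContainerController controller =
            Mockito.mock(ContainerController.class);
        final OzoneContainer ozoneContainer =
            Mockito.mock(OzoneContainer.class);
        Mockito.when(ozoneContainer.getController()).thenReturn(controller);

        new DeleteContainerCommandHandler().handle(
            new DeleteContainerCommand(1L), ozoneContainer,
            Mockito.mock(StateContext.class), null);

        // The deletion itself must have been delegated to the controller.
        Mockito.verify(controller).deleteContainer(1L);
      }
    }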

+ 11 - 0
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java

@@ -47,6 +47,7 @@ import org.apache.hadoop.ozone.container.common.statemachine
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
+import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
 
 import org.slf4j.Logger;
@@ -294,6 +295,16 @@ public class HeartbeatEndpointTask
         }
         this.context.addCommand(replicateContainerCommand);
         break;
+      case deleteContainerCommand:
+        DeleteContainerCommand deleteContainerCommand =
+            DeleteContainerCommand.getFromProtobuf(
+                commandResponseProto.getDeleteContainerCommandProto());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Received SCM delete container request for container {}",
+              deleteContainerCommand.getContainerID());
+        }
+        this.context.addCommand(deleteContainerCommand);
+        break;
       default:
         throw new IllegalArgumentException("Unknown response : "
             + commandResponseProto.getCommandType().name());

+ 34 - 28
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java

@@ -240,6 +240,7 @@ public class KeyValueHandler extends Handler {
       if (containerSet.getContainer(containerID) == null) {
         newContainer.create(volumeSet, volumeChoosingPolicy, scmID);
         containerSet.addContainer(newContainer);
+        sendICR(newContainer);
       } else {
 
         // The create container request for an already existing container can
@@ -335,37 +336,10 @@ public class KeyValueHandler extends Handler {
     }
 
     boolean forceDelete = request.getDeleteContainer().getForceDelete();
-    kvContainer.writeLock();
     try {
-      // Check if container is open
-      if (kvContainer.getContainerData().isOpen()) {
-        kvContainer.writeUnlock();
-        throw new StorageContainerException(
-            "Deletion of Open Container is not allowed.",
-            DELETE_ON_OPEN_CONTAINER);
-      } else if (!forceDelete && kvContainer.getContainerData().getKeyCount()
-          > 0) {
-        // If the container is not empty and cannot be deleted forcibly,
-        // then throw a SCE to stop deleting.
-        kvContainer.writeUnlock();
-        throw new StorageContainerException(
-            "Container cannot be deleted because it is not empty.",
-            ContainerProtos.Result.ERROR_CONTAINER_NOT_EMPTY);
-      } else {
-        long containerId = kvContainer.getContainerData().getContainerID();
-        containerSet.removeContainer(containerId);
-        // Release the lock first.
-        // Avoid holding write locks for disk operations
-        kvContainer.writeUnlock();
-
-        kvContainer.delete(forceDelete);
-      }
+      deleteInternal(kvContainer, forceDelete);
     } catch (StorageContainerException ex) {
       return ContainerUtils.logAndReturnError(LOG, ex, request);
-    } finally {
-      if (kvContainer.hasWriteLock()) {
-        kvContainer.writeUnlock();
-      }
     }
     return ContainerUtils.getSuccessResponse(request);
   }
@@ -823,6 +797,7 @@ public class KeyValueHandler extends Handler {
 
     populateContainerPathFields(container, maxSize);
     container.importContainerData(rawContainerStream, packer);
+    sendICR(container);
     return container;
 
   }
@@ -877,4 +852,35 @@ public class KeyValueHandler extends Handler {
     container.close();
     sendICR(container);
   }
+
+  @Override
+  public void deleteContainer(Container container) throws IOException {
+    deleteInternal(container, true);
+  }
+
+  private void deleteInternal(Container container, boolean force)
+      throws StorageContainerException {
+    container.writeLock();
+    try {
+      // Check if container is open
+      if (container.getContainerData().isOpen()) {
+        throw new StorageContainerException(
+            "Deletion of Open Container is not allowed.",
+            DELETE_ON_OPEN_CONTAINER);
+      }
+      if (!force && container.getContainerData().getKeyCount() > 0) {
+        // If the container is not empty and cannot be deleted forcibly,
+        // then throw a SCE to stop deleting.
+        throw new StorageContainerException(
+            "Container cannot be deleted because it is not empty.",
+            ContainerProtos.Result.ERROR_CONTAINER_NOT_EMPTY);
+      }
+      long containerId = container.getContainerData().getContainerID();
+      containerSet.removeContainer(containerId);
+    } finally {
+      container.writeUnlock();
+    }
+    // Avoid holding write locks for disk operations
+    container.delete(force);
+  }
 }

+ 11 - 1
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java

@@ -94,7 +94,7 @@ public class ContainerController {
   }
 
   /**
-   * Closes a container given its id.
+   * Closes a container given its Id.
    *
    * @param containerId Id of the container to close
    * @throws IOException in case of exception
@@ -113,6 +113,16 @@ public class ContainerController {
         originPipelineId, originNodeId, rawContainerStream, packer);
   }
 
+  /**
+   * Deletes a container given its Id.
+   * @param containerId Id of the container to be deleted
+   * @throws IOException
+   */
+  public void deleteContainer(final long containerId) throws IOException {
+    final Container container = containerSet.getContainer(containerId);
+    getHandler(container).deleteContainer(container);
+  }
+
   /**
    * Given a container, returns its handler instance.
    *

+ 4 - 9
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java

@@ -54,16 +54,7 @@ public class CloseContainerCommand
     return SCMCommandProto.Type.closeContainerCommand;
   }
 
-  /**
-   * Gets the protobuf message of this object.
-   *
-   * @return A protobuf message.
-   */
   @Override
-  public byte[] getProtoBufMessage() {
-    return getProto().toByteArray();
-  }
-
   public CloseContainerCommandProto getProto() {
     return CloseContainerCommandProto.newBuilder()
         .setContainerID(getId())
@@ -84,4 +75,8 @@ public class CloseContainerCommand
   public long getContainerID() {
     return getId();
   }
+
+  public PipelineID getPipelineID() {
+    return pipelineID;
+  }
 }

+ 1 - 5
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlocksCommand.java

@@ -56,17 +56,13 @@ public class DeleteBlocksCommand extends
     return SCMCommandProto.Type.deleteBlocksCommand;
   }
 
-  @Override
-  public byte[] getProtoBufMessage() {
-    return getProto().toByteArray();
-  }
-
   public static DeleteBlocksCommand getFromProtobuf(
       DeleteBlocksCommandProto deleteBlocksProto) {
     return new DeleteBlocksCommand(deleteBlocksProto
         .getDeletedBlocksTransactionsList(), deleteBlocksProto.getCmdId());
   }
 
+  @Override
   public DeleteBlocksCommandProto getProto() {
     return DeleteBlocksCommandProto.newBuilder()
         .setCmdId(getId())

+ 62 - 0
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteContainerCommand.java

@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.protocol.commands;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.DeleteContainerCommandProto;
+
+/**
+ * SCM command which tells the datanode to delete a container.
+ */
+public class DeleteContainerCommand extends
+    SCMCommand<DeleteContainerCommandProto> {
+
+  private final long containerId;
+
+  public DeleteContainerCommand(long containerId) {
+    this.containerId = containerId;
+  }
+
+  @Override
+  public SCMCommandProto.Type getType() {
+    return SCMCommandProto.Type.deleteContainerCommand;
+  }
+
+  @Override
+  public DeleteContainerCommandProto getProto() {
+    DeleteContainerCommandProto.Builder builder =
+        DeleteContainerCommandProto.newBuilder();
+    builder.setCmdId(getId())
+        .setContainerID(getContainerID());
+    return builder.build();
+  }
+
+  public long getContainerID() {
+    return containerId;
+  }
+
+  public static DeleteContainerCommand getFromProtobuf(
+      DeleteContainerCommandProto protoMessage) {
+    Preconditions.checkNotNull(protoMessage);
+    return new DeleteContainerCommand(protoMessage.getContainerID());
+  }
+}
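
A small hypothetical round trip for the new command, with the SCM side serializing and the datanode side decoding the heartbeat payload (the assert is illustrative only):

    import org.apache.hadoop.hdds.protocol.proto
        .StorageContainerDatanodeProtocolProtos.DeleteContainerCommandProto;
    import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;

    public class DeleteContainerCommandRoundTrip {
      public static void main(String[] args) {
        // SCM side: wrap the container id and serialize.
        DeleteContainerCommand original = new DeleteContainerCommand(42L);
        DeleteContainerCommandProto proto = original.getProto();

        // Datanode side: rebuild the command from the heartbeat payload.
        DeleteContainerCommand decoded =
            DeleteContainerCommand.getFromProtobuf(proto);

        assert decoded.getContainerID() == 42L;
      }
    }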

+ 0 - 4
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReplicateContainerCommand.java

@@ -63,10 +63,6 @@ public class ReplicateContainerCommand
   }
 
   @Override
-  public byte[] getProtoBufMessage() {
-    return getProto().toByteArray();
-  }
-
   public ReplicateContainerCommandProto getProto() {
     Builder builder = ReplicateContainerCommandProto.newBuilder()
         .setCmdId(getId())

+ 1 - 10
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReregisterCommand.java

@@ -39,16 +39,6 @@ public class ReregisterCommand extends
     return SCMCommandProto.Type.reregisterCommand;
   }
 
-  /**
-   * Gets the protobuf message of this object.
-   *
-   * @return A protobuf message.
-   */
-  @Override
-  public byte[] getProtoBufMessage() {
-    return getProto().toByteArray();
-  }
-
   /**
    * Not implemented for ReregisterCommand.
    *
@@ -59,6 +49,7 @@ public class ReregisterCommand extends
     return 0;
   }
 
+  @Override
   public ReregisterCommandProto getProto() {
     return ReregisterCommandProto
         .newBuilder()

+ 1 - 1
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java

@@ -49,7 +49,7 @@ public abstract class SCMCommand<T extends GeneratedMessage> implements
    * Gets the protobuf message of this object.
    * @return A protobuf message.
    */
-  public abstract byte[] getProtoBufMessage();
+  public abstract T getProto();
 
   /**
    * Gets the commandId of this object.
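
Pulling getProto() up as the abstract method means every subclass now hands back its own strongly typed protobuf instead of raw bytes, which is what lets CloseContainerCommandHandler above drop parseFrom and its InvalidProtocolBufferException handling. A simplified model of the new contract:

    import com.google.protobuf.GeneratedMessage;

    // Subclasses expose their typed message, so call sites read fields
    // directly instead of re-parsing bytes.
    abstract class TypedCommand<T extends GeneratedMessage> {
      public abstract T getProto();

      // Old shape, kept here only as a comment:
      //   public abstract byte[] getProtoBufMessage();
      // which forced callers into Proto.parseFrom(bytes) and an
      // InvalidProtocolBufferException catch block.
    }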

+ 13 - 5
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java

@@ -70,6 +70,8 @@ public class TestCloseContainerCommandHandler {
     try {
       final Container container =
           createContainer(conf, datanodeDetails, ozoneContainer);
+      Mockito.verify(context.getParent(),
+          Mockito.times(1)).triggerHeartbeat();
       final long containerId = container.getContainerData().getContainerID();
       final PipelineID pipelineId = PipelineID.valueOf(UUID.fromString(
           container.getContainerData().getOriginPipelineId()));
@@ -88,7 +90,7 @@ public class TestCloseContainerCommandHandler {
               .getContainerState());
 
       Mockito.verify(context.getParent(),
-          Mockito.times(2)).triggerHeartbeat();
+          Mockito.times(3)).triggerHeartbeat();
     } finally {
       ozoneContainer.stop();
     }
@@ -105,6 +107,8 @@ public class TestCloseContainerCommandHandler {
     try {
       final Container container =
           createContainer(conf, datanodeDetails, ozoneContainer);
+      Mockito.verify(context.getParent(),
+          Mockito.times(1)).triggerHeartbeat();
       final long containerId = container.getContainerData().getContainerID();
       // To quasi close specify a pipeline which doesn't exist in the datanode.
       final PipelineID pipelineId = PipelineID.randomId();
@@ -122,7 +126,7 @@ public class TestCloseContainerCommandHandler {
               .getContainerState());
 
       Mockito.verify(context.getParent(),
-          Mockito.times(2)).triggerHeartbeat();
+          Mockito.times(3)).triggerHeartbeat();
     } finally {
       ozoneContainer.stop();
     }
@@ -138,6 +142,8 @@ public class TestCloseContainerCommandHandler {
     try {
       final Container container =
           createContainer(conf, datanodeDetails, ozoneContainer);
+      Mockito.verify(context.getParent(),
+          Mockito.times(1)).triggerHeartbeat();
       final long containerId = container.getContainerData().getContainerID();
       // A pipeline which doesn't exist in the datanode.
       final PipelineID pipelineId = PipelineID.randomId();
@@ -155,7 +161,7 @@ public class TestCloseContainerCommandHandler {
               .getContainerState());
 
       Mockito.verify(context.getParent(),
-          Mockito.times(2)).triggerHeartbeat();
+          Mockito.times(3)).triggerHeartbeat();
 
       // The container is quasi closed. Force close the container now.
       final CloseContainerCommand closeCommand = new CloseContainerCommand(
@@ -168,7 +174,7 @@ public class TestCloseContainerCommandHandler {
               .getContainerState());
 
       Mockito.verify(context.getParent(),
-          Mockito.times(3)).triggerHeartbeat();
+          Mockito.times(4)).triggerHeartbeat();
     } finally {
       ozoneContainer.stop();
     }
@@ -184,6 +190,8 @@ public class TestCloseContainerCommandHandler {
     try {
       final Container container =
           createContainer(conf, datanodeDetails, ozoneContainer);
+      Mockito.verify(context.getParent(),
+          Mockito.times(1)).triggerHeartbeat();
       final long containerId = container.getContainerData().getContainerID();
       // A pipeline which doesn't exist in the datanode.
       final PipelineID pipelineId = PipelineID.randomId();
@@ -201,7 +209,7 @@ public class TestCloseContainerCommandHandler {
               .getContainerState());
 
       Mockito.verify(context.getParent(),
-          Mockito.times(2)).triggerHeartbeat();
+          Mockito.times(3)).triggerHeartbeat();
     } finally {
       ozoneContainer.stop();
     }
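
The expected triggerHeartbeat counts shift up by one across these tests because KeyValueHandler now fires an incremental container report from handleCreateContainer (the sendICR call added above), which triggers an extra heartbeat as soon as the container is created; the new times(1) verification right after createContainer pins that down.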

+ 12 - 0
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.hdds.scm.command;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.CommandStatus;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
     .CommandStatusReportFromDatanode;
@@ -57,6 +58,11 @@ public class CommandStatusReportHandler implements
       case replicateContainerCommand:
         publisher.fireEvent(SCMEvents.REPLICATION_STATUS, new
             ReplicationStatus(cmdStatus));
+        if (cmdStatus.getStatus() == CommandStatus.Status.EXECUTED) {
+          publisher.fireEvent(SCMEvents.REPLICATION_COMPLETE,
+              new ReplicationManager.ReplicationCompleted(
+                  cmdStatus.getCmdId()));
+        }
         break;
       case deleteBlocksCommand:
         if (cmdStatus.getStatus() == CommandStatus.Status.EXECUTED) {
@@ -64,6 +70,12 @@ public class CommandStatusReportHandler implements
               new DeleteBlockStatus(cmdStatus));
         }
         break;
+      case deleteContainerCommand:
+        if (cmdStatus.getStatus() == CommandStatus.Status.EXECUTED) {
+          publisher.fireEvent(SCMEvents.DELETE_CONTAINER_COMMAND_COMPLETE,
+              new ReplicationManager.DeleteContainerCommandCompleted(
+                  cmdStatus.getCmdId()));
+        }
+        break;
       default:
         LOGGER.debug("CommandStatus of type:{} not handled in " +
             "CommandStatusReportHandler.", cmdStatus.getType());

+ 56 - 0
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/DeleteContainerCommandWatcher.java

@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container;
+
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager
+    .DeletionRequestToRepeat;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager
+    .DeleteContainerCommandCompleted;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.server.events.Event;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.hdds.server.events.EventWatcher;
+import org.apache.hadoop.ozone.lease.LeaseManager;
+
+/**
+ * Command watcher to track the delete container commands.
+ */
+public class DeleteContainerCommandWatcher extends
+    EventWatcher<DeletionRequestToRepeat, DeleteContainerCommandCompleted> {
+
+  public DeleteContainerCommandWatcher(
+      Event<DeletionRequestToRepeat> startEvent,
+      Event<DeleteContainerCommandCompleted> completionEvent,
+      LeaseManager<Long> leaseManager) {
+    super(startEvent, completionEvent, leaseManager);
+  }
+
+  @Override
+  protected void onTimeout(EventPublisher publisher,
+      DeletionRequestToRepeat payload) {
+    //put back to the original queue
+    publisher.fireEvent(SCMEvents.REPLICATE_CONTAINER,
+        payload.getRequest());
+  }
+
+  @Override
+  protected void onFinished(EventPublisher publisher,
+      DeletionRequestToRepeat payload) {
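+    // Completion arrived in time; tracking already stopped, nothing to do.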
+
+  }
+}
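
The watcher builds on the generic EventWatcher/LeaseManager machinery: firing TRACK_DELETE_CONTAINER_COMMAND arms a lease keyed by the command id, a DeleteContainerCommandCompleted event with the same id releases it, and an expired lease invokes onTimeout, which re-fires the original replication request so the manager re-evaluates the container. A toy model of that arm/complete/timeout cycle (plain JDK, not the hdds API):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.TimeUnit;

    final class MiniCommandWatcher {

      private final Map<Long, ScheduledFuture<?>> pending =
          new ConcurrentHashMap<>();
      private final ScheduledExecutorService timer =
          Executors.newSingleThreadScheduledExecutor();

      // Start event: arm a lease for the command id.
      void track(long commandId, Runnable onTimeout, long timeoutMillis) {
        pending.put(commandId, timer.schedule(() -> {
          pending.remove(commandId);
          onTimeout.run();              // lease expired: repeat the request
        }, timeoutMillis, TimeUnit.MILLISECONDS));
      }

      // Completion event: the datanode reported EXECUTED in time.
      void complete(long commandId) {
        ScheduledFuture<?> lease = pending.remove(commandId);
        if (lease != null) {
          lease.cancel(false);
        }
      }
    }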

+ 1 - 4
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReportHandlerHelper.java

@@ -204,10 +204,7 @@ public final class ReportHandlerHelper {
               .distinct()
               .count();
 
-          float quasiClosePercent = ((float) uniqueQuasiClosedReplicaCount) /
-              ((float) replicationFactor);
-
-          if (quasiClosePercent > 0.5F) {
+          if (uniqueQuasiClosedReplicaCount > (replicationFactor / 2)) {
             // Quorum of unique replica has been QUASI_CLOSED
             long sequenceId = forceCloseContainerReplicaWithHighestSequenceId(
                 container, quasiClosedReplicas, publisher);
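
The rewritten condition is the same strict-majority predicate without floating point: for replication factor 3, replicationFactor / 2 is 1, so two unique QUASI_CLOSED origins form a quorum and one does not. In isolation:

    final class QuorumCheck {
      // Strict majority, equivalent to the removed quasiClosePercent > 0.5F.
      static boolean hasQuorum(long uniqueQuasiClosed, int replicationFactor) {
        return uniqueQuasiClosed > (replicationFactor / 2); // integer division
      }
      // hasQuorum(2, 3) -> true, hasQuorum(1, 3) -> false,
      // hasQuorum(2, 4) -> false (factor 4 needs 3 unique origins)
    }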

+ 148 - 22
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java

@@ -18,18 +18,24 @@ package org.apache.hadoop.hdds.scm.container.replication;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.ThreadFactory;
 import java.util.stream.Collectors;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.DeleteContainerCommandWatcher;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms
     .ContainerPlacementPolicy;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
@@ -38,11 +44,14 @@ import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.hdds.server.events.IdentifiableEventPayload;
 import org.apache.hadoop.ozone.lease.LeaseManager;
 import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
+import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import static org.apache.hadoop.hdds.scm.events.SCMEvents
+    .TRACK_DELETE_CONTAINER_COMMAND;
 import static org.apache.hadoop.hdds.scm.events.SCMEvents
     .TRACK_REPLICATE_COMMAND;
 import org.slf4j.Logger;
@@ -63,6 +72,7 @@ public class ReplicationManager implements Runnable {
   private EventPublisher eventPublisher;
 
   private ReplicationCommandWatcher replicationCommandWatcher;
+  private DeleteContainerCommandWatcher deleteContainerCommandWatcher;
 
   private boolean running = true;
 
@@ -80,6 +90,11 @@ public class ReplicationManager implements Runnable {
         new ReplicationCommandWatcher(TRACK_REPLICATE_COMMAND,
             SCMEvents.REPLICATION_COMPLETE, commandWatcherLeaseManager);
 
+    this.deleteContainerCommandWatcher =
+        new DeleteContainerCommandWatcher(TRACK_DELETE_CONTAINER_COMMAND,
+            SCMEvents.DELETE_CONTAINER_COMMAND_COMPLETE,
+            commandWatcherLeaseManager);
+
     this.replicationQueue = new ReplicationQueue();
 
     eventQueue.addHandler(SCMEvents.REPLICATE_CONTAINER,
@@ -108,15 +123,15 @@ public class ReplicationManager implements Runnable {
         request = replicationQueue.take();
 
         ContainerID containerID = new ContainerID(request.getContainerId());
-        ContainerInfo containerInfo =
-            containerManager.getContainer(containerID);
-
-        Preconditions.checkNotNull(containerInfo,
-            "No information about the container " + request.getContainerId());
-
-        Preconditions
-            .checkState(containerInfo.getState() == LifeCycleState.CLOSED,
-                "Container should be in closed state");
+        ContainerInfo container = containerManager.getContainer(containerID);
+        final HddsProtos.LifeCycleState state = container.getState();
+
+        if (state != LifeCycleState.CLOSED &&
+            state != LifeCycleState.QUASI_CLOSED) {
+          LOG.warn("Cannot replicate the container {} when in {} state.",
+              containerID, state);
+          continue;
+        }
 
         //check the current replication
         List<ContainerReplica> containerReplicas =
@@ -130,28 +145,41 @@ public class ReplicationManager implements Runnable {
           return;
         }
 
-        ReplicationRequest finalRequest = request;
+        final ReplicationRequest finalRequest = request;
 
         int inFlightReplications = replicationCommandWatcher.getTimeoutEvents(
-            e -> e.request.getContainerId() == finalRequest.getContainerId())
+            e -> e.getRequest().getContainerId()
+                == finalRequest.getContainerId())
+            .size();
+
+        int inFlightDelete = deleteContainerCommandWatcher.getTimeoutEvents(
+            e -> e.getRequest().getContainerId()
+                == finalRequest.getContainerId())
             .size();
 
         int deficit =
-            request.getExpecReplicationCount() - containerReplicas.size()
-                - inFlightReplications;
+            (request.getExpecReplicationCount() - containerReplicas.size())
+                - (inFlightReplications - inFlightDelete);
 
         if (deficit > 0) {
 
           List<DatanodeDetails> datanodes = containerReplicas.stream()
+              .sorted((r1, r2) ->
+                  r2.getSequenceId().compareTo(r1.getSequenceId()))
               .map(ContainerReplica::getDatanodeDetails)
               .collect(Collectors.toList());
           List<DatanodeDetails> selectedDatanodes = containerPlacement
-              .chooseDatanodes(datanodes, deficit,
-                  containerInfo.getUsedBytes());
+              .chooseDatanodes(datanodes, deficit, container.getUsedBytes());
 
           //send the command
           for (DatanodeDetails datanode : selectedDatanodes) {
 
+            LOG.info("Container {} is under replicated." +
+                " Expected replica count is {}, but found {}." +
+                " Re-replicating it on {}.",
+                container.containerID(), request.getExpecReplicationCount(),
+                containerReplicas.size(), datanode);
+
             ReplicateContainerCommand replicateCommand =
                 new ReplicateContainerCommand(containerID.getId(), datanodes);
 
@@ -168,8 +196,62 @@ public class ReplicationManager implements Runnable {
           }
 
         } else if (deficit < 0) {
-          //TODO: too many replicas. Not handled yet.
-          LOG.debug("Too many replicas is not handled yet.");
+
+          int numberOfReplicasToDelete = Math.abs(deficit);
+
+          final Map<UUID, List<DatanodeDetails>> originIdToDnMap =
+              new LinkedHashMap<>();
+
+          containerReplicas.stream()
+              .sorted(Comparator.comparing(ContainerReplica::getSequenceId))
+              .forEach(replica -> {
+                originIdToDnMap.computeIfAbsent(
+                    replica.getOriginDatanodeId(), key -> new ArrayList<>());
+                originIdToDnMap.get(replica.getOriginDatanodeId())
+                    .add(replica.getDatanodeDetails());
+              });
+
+          for(UUID originId : originIdToDnMap.keySet()) {
+            final List<DatanodeDetails> listOfReplica =
+                originIdToDnMap.get(originId);
+            if (listOfReplica.size() > 1) {
+              final int toDelete = Math.min(listOfReplica.size() - 1,
+                  numberOfReplicasToDelete);
+              final DeleteContainerCommand deleteContainer =
+                  new DeleteContainerCommand(containerID.getId());
+              for (int i = 0; i < toDelete; i++) {
+                LOG.info("Container {} is over replicated." +
+                    " Expected replica count is {}, but found {}." +
+                    " Deleting the replica on {}.",
+                    container.containerID(), request.getExpecReplicationCount(),
+                    containerReplicas.size(), listOfReplica.get(i));
+                eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND,
+                    new CommandForDatanode<>(listOfReplica.get(i).getUuid(),
+                        deleteContainer));
+                DeletionRequestToRepeat timeoutEvent =
+                    new DeletionRequestToRepeat(deleteContainer.getId(),
+                        request);
+
+                eventPublisher.fireEvent(
+                    TRACK_DELETE_CONTAINER_COMMAND, timeoutEvent);
+              }
+              numberOfReplicasToDelete -= toDelete;
+            }
+            if (numberOfReplicasToDelete == 0) {
+              break;
+            }
+          }
+
+          if (numberOfReplicasToDelete != 0) {
+            final int expectedReplicaCount = container
+                .getReplicationFactor().getNumber();
+
+            LOG.warn("Not able to delete the container replica of Container" +
+                " {} even though it is over replicated. Expected replica" +
+                " count is {}, current replica count is {}.",
+                containerID, expectedReplicaCount,
+                expectedReplicaCount + numberOfReplicasToDelete);
+          }
         }
 
       } catch (Exception e) {
@@ -196,17 +278,43 @@ public class ReplicationManager implements Runnable {
   }
 
   /**
-   * Event for the ReplicationCommandWatcher to repeate the embedded request.
+   * Event for the ReplicationCommandWatcher to repeat the embedded request.
    * in case of timeout.
    */
   public static class ReplicationRequestToRepeat
+      extends ContainerRequestToRepeat {
+
+    public ReplicationRequestToRepeat(
+        long commandId, ReplicationRequest request) {
+      super(commandId, request);
+    }
+  }
+
+  /**
+   * Event for the DeleteContainerCommandWatcher to repeat the
+   * embedded request in case of timeout.
+   */
+  public static class DeletionRequestToRepeat
+      extends ContainerRequestToRepeat {
+
+    public DeletionRequestToRepeat(
+        long commandId, ReplicationRequest request) {
+      super(commandId, request);
+    }
+  }
+
+  /**
+   * Container Request wrapper which will be used by ReplicationManager to
+   * perform the intended operation.
+   */
+  public static class ContainerRequestToRepeat
       implements IdentifiableEventPayload {
 
     private final long commandId;
 
     private final ReplicationRequest request;
 
-    public ReplicationRequestToRepeat(long commandId,
+    ContainerRequestToRepeat(long commandId,
         ReplicationRequest request) {
       this.commandId = commandId;
       this.request = request;
@@ -229,7 +337,7 @@ public class ReplicationManager implements Runnable {
       if (o == null || getClass() != o.getClass()) {
         return false;
       }
-      ReplicationRequestToRepeat that = (ReplicationRequestToRepeat) o;
+      ContainerRequestToRepeat that = (ContainerRequestToRepeat) o;
       return Objects.equals(request, that.request);
     }
 
@@ -241,7 +349,7 @@ public class ReplicationManager implements Runnable {
   }
 
   /**
-   * Add javadoc.
+   * Event which indicates that the replicate operation is completed.
    */
   public static class ReplicationCompleted
       implements IdentifiableEventPayload {
@@ -257,4 +365,22 @@ public class ReplicationManager implements Runnable {
       return uuid;
     }
   }
+
+  /**
+   * Event which indicates that the container deletion operation is completed.
+   */
+  public static class DeleteContainerCommandCompleted
+      implements IdentifiableEventPayload {
+
+    private final long uuid;
+
+    public DeleteContainerCommandCompleted(long uuid) {
+      this.uuid = uuid;
+    }
+
+    @Override
+    public long getId() {
+      return uuid;
+    }
+  }
 }

+ 14 - 0
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java

@@ -40,6 +40,8 @@ import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
     .NodeReportFromDatanode;
 import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager
+    .DeleteContainerCommandCompleted;
 import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager
     .ReplicationCompleted;
 import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
@@ -204,6 +206,14 @@ public final class SCMEvents {
   public static final TypedEvent<ReplicationManager.ReplicationRequestToRepeat>
       TRACK_REPLICATE_COMMAND =
       new TypedEvent<>(ReplicationManager.ReplicationRequestToRepeat.class);
+
+  /**
+   * This event is sent by the ReplicaManager to the
+   * DeleteContainerCommandWatcher to track the in-progress delete commands.
+   */
+  public static final TypedEvent<ReplicationManager.DeletionRequestToRepeat>
+      TRACK_DELETE_CONTAINER_COMMAND =
+      new TypedEvent<>(ReplicationManager.DeletionRequestToRepeat.class);
   /**
    * This event comes from the Heartbeat dispatcher (in fact from the
    * datanode) to notify the scm that the replication is done. This is
@@ -216,6 +226,10 @@ public final class SCMEvents {
   public static final TypedEvent<ReplicationCompleted> REPLICATION_COMPLETE =
       new TypedEvent<>(ReplicationCompleted.class);
 
+  public static final TypedEvent<DeleteContainerCommandCompleted>
+      DELETE_CONTAINER_COMMAND_COMPLETE =
+      new TypedEvent<>(DeleteContainerCommandCompleted.class);
+
   /**
    * Signal for all the components (but especially for the replication
    * manager and container report handler) that the replication could be

+ 69 - 26
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java

@@ -21,13 +21,10 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Objects;
 import java.util.stream.IntStream;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
 import org.apache.hadoop.hdds.protocol.proto
@@ -35,21 +32,24 @@ import org.apache.hadoop.hdds.protocol.proto
 import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms
     .ContainerPlacementPolicy;
 import org.apache.hadoop.hdds.scm.container.replication
     .ReplicationManager.ReplicationRequestToRepeat;
+import org.apache.hadoop.hdds.scm.container.replication
+    .ReplicationManager.DeletionRequestToRepeat;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.ozone.lease.LeaseManager;
 import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
 
-import com.google.common.base.Preconditions;
-import static org.apache.hadoop.hdds.scm.events.SCMEvents.TRACK_REPLICATE_COMMAND;
+import static org.apache.hadoop.hdds.scm.events.SCMEvents
+    .TRACK_DELETE_CONTAINER_COMMAND;
+import static org.apache.hadoop.hdds.scm.events.SCMEvents
+    .TRACK_REPLICATE_COMMAND;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -65,6 +65,7 @@ public class TestReplicationManager {
   private EventQueue queue;
 
   private List<ReplicationRequestToRepeat> trackReplicationEvents;
+  private List<DeletionRequestToRepeat> trackDeleteEvents;
 
   private List<CommandForDatanode<ReplicateContainerCommandProto>> copyEvents;
 
@@ -87,6 +88,8 @@ public class TestReplicationManager {
       listOfContainerReplica.add(ContainerReplica.newBuilder()
           .setContainerID(ContainerID.valueof(i))
           .setContainerState(ContainerReplicaProto.State.CLOSED)
+          .setSequenceId(10000L)
+          .setOriginNodeId(dd.getUuid())
           .setDatanodeDetails(dd).build());
     });
 
@@ -119,6 +122,10 @@ public class TestReplicationManager {
     queue.addHandler(TRACK_REPLICATE_COMMAND,
         (event, publisher) -> trackReplicationEvents.add(event));
 
+    trackDeleteEvents = new ArrayList<>();
+    queue.addHandler(TRACK_DELETE_CONTAINER_COMMAND,
+        (event, publisher) -> trackDeleteEvents.add(event));
+
     copyEvents = new ArrayList<>();
     queue.addHandler(SCMEvents.DATANODE_COMMAND,
         (event, publisher) -> copyEvents.add(event));
@@ -128,8 +135,6 @@ public class TestReplicationManager {
     replicationManager = new ReplicationManager(containerPlacementPolicy,
         containerManager, queue, leaseManager);
 
-
-
   }
 
   /**
@@ -160,6 +165,57 @@ public class TestReplicationManager {
     }
   }
 
+  @Test
+  public void testOverReplication() throws ContainerNotFoundException,
+      InterruptedException {
+    try {
+      leaseManager.start();
+      replicationManager.start();
+
+      final ContainerID containerID = ContainerID.valueof(5L);
+
+      final ContainerReplica duplicateReplicaOne = ContainerReplica.newBuilder()
+          .setContainerID(containerID)
+          .setContainerState(ContainerReplicaProto.State.CLOSED)
+          .setSequenceId(10000L)
+          .setOriginNodeId(listOfDatanodeDetails.get(0).getUuid())
+          .setDatanodeDetails(listOfDatanodeDetails.get(3))
+          .build();
+
+      final ContainerReplica duplicateReplicaTwo = ContainerReplica.newBuilder()
+          .setContainerID(containerID)
+          .setContainerState(ContainerReplicaProto.State.CLOSED)
+          .setSequenceId(10000L)
+          .setOriginNodeId(listOfDatanodeDetails.get(1).getUuid())
+          .setDatanodeDetails(listOfDatanodeDetails.get(4))
+          .build();
+
+      when(containerManager.getContainerReplicas(new ContainerID(5L)))
+          .thenReturn(new HashSet<>(Arrays.asList(
+              listOfContainerReplica.get(0),
+              listOfContainerReplica.get(1),
+              listOfContainerReplica.get(2),
+              duplicateReplicaOne,
+              duplicateReplicaTwo
+          )));
+
+      queue.fireEvent(SCMEvents.REPLICATE_CONTAINER,
+          new ReplicationRequest(5L, (short) 5, System.currentTimeMillis(),
+              (short) 3));
+      Thread.sleep(500L);
+      queue.processAll(1000L);
+
+      //THEN
+      Assert.assertEquals(2, trackDeleteEvents.size());
+      Assert.assertEquals(2, copyEvents.size());
+
+    } finally {
+      if (leaseManager != null) {
+        leaseManager.shutdown();
+      }
+    }
+  }
+
   @Test
   public void testEventSending() throws InterruptedException, IOException {
 
@@ -196,6 +252,7 @@ public class TestReplicationManager {
         containerManager, queue, rapidLeaseManager);
 
     try {
+      leaseManager.start();
       rapidLeaseManager.start();
       replicationManager.start();
 
@@ -223,25 +280,11 @@ public class TestReplicationManager {
       Assert.assertEquals(2, copyEvents.size());
 
     } finally {
-      if (rapidLeaseManager != null) {
-        rapidLeaseManager.shutdown();
+      rapidLeaseManager.shutdown();
+      if (leaseManager != null) {
+        leaseManager.shutdown();
       }
     }
   }
 
-  public static Pipeline createPipeline(Iterable<DatanodeDetails> ids)
-      throws IOException {
-    Objects.requireNonNull(ids, "ids == null");
-    Preconditions.checkArgument(ids.iterator().hasNext());
-    List<DatanodeDetails> dns = new ArrayList<>();
-    ids.forEach(dns::add);
-    return Pipeline.newBuilder()
-        .setState(Pipeline.PipelineState.OPEN)
-        .setId(PipelineID.randomId())
-        .setType(HddsProtos.ReplicationType.STAND_ALONE)
-        .setFactor(ReplicationFactor.ONE)
-        .setNodes(dns)
-        .build();
-  }
-
 }

+ 2 - 3
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java

@@ -75,13 +75,12 @@ public class TestOzoneContainer {
       conf.setBoolean(
           OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, false);
 
-      DatanodeDetails datanodeDetails = Mockito.mock(DatanodeDetails.class);
+      DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails();
       StateContext context = Mockito.mock(StateContext.class);
       DatanodeStateMachine dsm = Mockito.mock(DatanodeStateMachine.class);
       Mockito.when(dsm.getDatanodeDetails()).thenReturn(datanodeDetails);
       Mockito.when(context.getParent()).thenReturn(dsm);
-      container = new OzoneContainer(TestUtils.randomDatanodeDetails(),
-          conf, context);
+      container = new OzoneContainer(datanodeDetails, conf, context);
       //Setting scmId, as we start manually ozone container.
       container.getDispatcher().setScmId(UUID.randomUUID().toString());
       container.start();