Ver código fonte

HDDS-868. Handle quasi closed container replicas in SCM.
Contributed by Nanda kumar.

Nanda kumar 6 anos atrás
pai
commit
099e723406

+ 25 - 4
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java

@@ -70,6 +70,10 @@ public class ContainerInfo implements Comparator<ContainerInfo>,
   private String owner;
   private long containerID;
   private long deleteTransactionId;
+  // The sequenceId of a close container cannot change, and all the
+  // container replica should have the same sequenceId.
+  private long sequenceId;
+
   /**
    * Allows you to maintain private data on ContainerInfo. This is not
    * serialized via protobuf, just allows us to maintain some private data.
@@ -86,6 +90,7 @@ public class ContainerInfo implements Comparator<ContainerInfo>,
       long stateEnterTime,
       String owner,
       long deleteTransactionId,
+      long sequenceId,
       ReplicationFactor replicationFactor,
       ReplicationType repType) {
     this.containerID = containerID;
@@ -97,6 +102,7 @@ public class ContainerInfo implements Comparator<ContainerInfo>,
     this.stateEnterTime = stateEnterTime;
     this.owner = owner;
     this.deleteTransactionId = deleteTransactionId;
+    this.sequenceId = sequenceId;
     this.replicationFactor = replicationFactor;
     this.replicationType = repType;
   }
@@ -105,8 +111,8 @@ public class ContainerInfo implements Comparator<ContainerInfo>,
     this(info.getContainerID(), info.getState(), info.getPipelineID(),
         info.getUsedBytes(), info.getNumberOfKeys(),
         info.getStateEnterTime(), info.getOwner(),
-        info.getDeleteTransactionId(), info.getReplicationFactor(),
-        info.getReplicationType());
+        info.getDeleteTransactionId(), info.getSequenceId(),
+        info.getReplicationFactor(), info.getReplicationType());
   }
   /**
    * Needed for serialization findbugs.
@@ -174,10 +180,19 @@ public class ContainerInfo implements Comparator<ContainerInfo>,
     return deleteTransactionId;
   }
 
+  public long getSequenceId() {
+    return sequenceId;
+  }
+
   public void updateDeleteTransactionId(long transactionId) {
     deleteTransactionId = max(transactionId, deleteTransactionId);
   }
 
+  public void updateSequenceId(long sequenceID) {
+    assert (isOpen() || state == HddsProtos.LifeCycleState.QUASI_CLOSED);
+    sequenceId = max(sequenceID, sequenceId);
+  }
+
   public ContainerID containerID() {
     return new ContainerID(getContainerID());
   }
@@ -380,6 +395,7 @@ public class ContainerInfo implements Comparator<ContainerInfo>,
     private String owner;
     private long containerID;
     private long deleteTransactionId;
+    private long sequenceId;
     private PipelineID pipelineID;
     private ReplicationFactor replicationFactor;
     private ReplicationType replicationType;
@@ -436,10 +452,15 @@ public class ContainerInfo implements Comparator<ContainerInfo>,
       return this;
     }
 
+    public Builder setSequenceId(long sequenceID) {
+      this.sequenceId = sequenceID;
+      return this;
+    }
+
     public ContainerInfo build() {
       return new ContainerInfo(containerID, state, pipelineID,
-              used, keys, stateEnterTime, owner, deleteTransactionId,
-          replicationFactor, replicationType);
+          used, keys, stateEnterTime, owner, deleteTransactionId,
+          sequenceId, replicationFactor, replicationType);
     }
   }
 

+ 12 - 8
hadoop-hdds/common/src/main/proto/hdds.proto

@@ -113,16 +113,19 @@ message NodePool {
 enum LifeCycleState {
     OPEN = 1;
     CLOSING = 2;
-    CLOSED = 3;
-    DELETING = 4;
-    DELETED = 5; // object is deleted.
+    QUASI_CLOSED = 3;
+    CLOSED = 4;
+    DELETING = 5;
+    DELETED = 6; // object is deleted.
 }
 
 enum LifeCycleEvent {
     FINALIZE = 1;
-    CLOSE = 2; // !!Event after this has not been used yet.
-    DELETE = 3;
-    CLEANUP = 4;
+    QUASI_CLOSE = 2;
+    CLOSE = 3; // !!Event after this has not been used yet.
+    FORCE_CLOSE = 4;
+    DELETE = 5;
+    CLEANUP = 6;
 }
 
 message ContainerInfoProto {
@@ -134,8 +137,9 @@ message ContainerInfoProto {
     optional int64 stateEnterTime = 6;
     required string owner = 7;
     optional int64 deleteTransactionId = 8;
-    required ReplicationFactor replicationFactor  = 9;
-    required ReplicationType replicationType  = 10;
+    optional int64 sequenceId = 9;
+    required ReplicationFactor replicationFactor  = 10;
+    required ReplicationType replicationType  = 11;
 }
 
 message ContainerWithPipeline {

+ 12 - 38
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdds.scm.container;
 
+import java.io.IOException;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -154,10 +155,13 @@ public class ContainerReportHandler implements
       try {
         final ContainerID containerID = ContainerID.valueof(
             replicaProto.getContainerID());
+
+        ReportHandlerHelper.processContainerReplica(containerManager,
+            containerID, replicaProto, datanodeDetails, publisher, LOG);
+
         final ContainerInfo containerInfo = containerManager
             .getContainer(containerID);
-        updateContainerState(datanodeDetails, containerInfo,
-            replicaProto, publisher);
+
         if (containerInfo.getDeleteTransactionId() >
             replicaProto.getDeleteTransactionId()) {
           pendingDeleteStatusList
@@ -166,8 +170,12 @@ public class ContainerReportHandler implements
                   containerInfo.getContainerID());
         }
       } catch (ContainerNotFoundException e) {
-        LOG.error("Received container report for an unknown container {}",
-            replicaProto.getContainerID());
+        LOG.error("Received container report for an unknown container {} from" +
+                " datanode {}", replicaProto.getContainerID(), datanodeDetails);
+      } catch (IOException e) {
+        LOG.error("Exception while processing container report for container" +
+                " {} from datanode {}",
+            replicaProto.getContainerID(), datanodeDetails);
       }
     }
     if (pendingDeleteStatusList.getNumPendingDeletes() > 0) {
@@ -176,40 +184,6 @@ public class ContainerReportHandler implements
     }
   }
 
-  private void updateContainerState(final DatanodeDetails datanodeDetails,
-      final ContainerInfo containerInfo,
-      final ContainerReplicaProto replicaProto,
-      final EventPublisher publisher)
-      throws ContainerNotFoundException {
-
-    final ContainerID id = containerInfo.containerID();
-    final ContainerReplica datanodeContainerReplica = ContainerReplica
-        .newBuilder()
-        .setContainerID(id)
-        .setContainerState(replicaProto.getState())
-        .setDatanodeDetails(datanodeDetails)
-        .build();
-    // TODO: Add bcsid and origin datanode to replica.
-
-    final ContainerReplica scmContainerReplica = containerManager
-        .getContainerReplicas(id)
-        .stream()
-        .filter(replica ->
-            replica.getDatanodeDetails().equals(datanodeDetails))
-        .findFirst().orElse(null);
-
-    // This is an in-memory update.
-    containerManager.updateContainerReplica(id, datanodeContainerReplica);
-    containerInfo.setUsedBytes(replicaProto.getUsed());
-    containerInfo.setNumberOfKeys(replicaProto.getKeyCount());
-
-    // Check if there is state change in container replica.
-    if (scmContainerReplica == null ||
-        scmContainerReplica.getState() != datanodeContainerReplica.getState()) {
-      //TODO: Handler replica state change.
-    }
-  }
-
   private void checkReplicationState(ContainerID containerID,
       EventPublisher publisher) {
     try {

+ 41 - 17
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java

@@ -156,40 +156,64 @@ public class ContainerStateManager {
    *
    * Event and State Transition Mapping:
    *
-   * State: OPEN      ---------------> CLOSING
-   * Event:               FINALIZE
+   * State: OPEN         ----------------> CLOSING
+   * Event:                    FINALIZE
    *
-   * State: CLOSING   ---------------> CLOSED
-   * Event:                CLOSE
+   * State: CLOSING      ----------------> QUASI_CLOSED
+   * Event:                  QUASI_CLOSE
    *
-   * State: CLOSED   ----------------> DELETING
-   * Event:                DELETE
+   * State: CLOSING      ----------------> CLOSED
+   * Event:                     CLOSE
    *
-   * State: DELETING ----------------> DELETED
-   * Event:               CLEANUP
+   * State: QUASI_CLOSED ----------------> CLOSED
+   * Event:                  FORCE_CLOSE
+   *
+   * State: CLOSED       ----------------> DELETING
+   * Event:                    DELETE
+   *
+   * State: DELETING     ----------------> DELETED
+   * Event:                    CLEANUP
    *
    *
    * Container State Flow:
    *
-   * [OPEN]-------->[CLOSING]------->[CLOSED]
-   *       (FINALIZE)         (CLOSE)   |
-   *                                    |
-   *                                    |
-   *                            (DELETE)|
-   *                                    |
-   *                                    |
-   *                                [DELETING] ----------> [DELETED]
-   *                                            (CLEANUP)
+   * [OPEN]--------------->[CLOSING]--------------->[QUASI_CLOSED]
+   *          (FINALIZE)      |      (QUASI_CLOSE)        |
+   *                          |                           |
+   *                          |                           |
+   *                  (CLOSE) |             (FORCE_CLOSE) |
+   *                          |                           |
+   *                          |                           |
+   *                          +--------->[CLOSED]<--------+
+   *                                        |
+   *                                (DELETE)|
+   *                                        |
+   *                                        |
+   *                                   [DELETING]
+   *                                        |
+   *                              (CLEANUP) |
+   *                                        |
+   *                                        V
+   *                                    [DELETED]
+   *
    */
   private void initializeStateMachine() {
     stateMachine.addTransition(LifeCycleState.OPEN,
         LifeCycleState.CLOSING,
         LifeCycleEvent.FINALIZE);
 
+    stateMachine.addTransition(LifeCycleState.CLOSING,
+        LifeCycleState.QUASI_CLOSED,
+        LifeCycleEvent.QUASI_CLOSE);
+
     stateMachine.addTransition(LifeCycleState.CLOSING,
         LifeCycleState.CLOSED,
         LifeCycleEvent.CLOSE);
 
+    stateMachine.addTransition(LifeCycleState.QUASI_CLOSED,
+        LifeCycleState.CLOSED,
+        LifeCycleEvent.FORCE_CLOSE);
+
     stateMachine.addTransition(LifeCycleState.CLOSED,
         LifeCycleState.DELETING,
         LifeCycleEvent.DELETE);

+ 3 - 21
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java

@@ -20,7 +20,6 @@ package org.apache.hadoop.hdds.scm.container;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
@@ -66,26 +65,8 @@ public class IncrementalContainerReportHandler implements
             .getDatanodeDetails();
         final ContainerID containerID = ContainerID
             .valueof(replicaProto.getContainerID());
-        final ContainerInfo containerInfo = containerManager
-            .getContainer(containerID);
-
-        ContainerReplica replica = ContainerReplica.newBuilder()
-            .setContainerID(ContainerID.valueof(replicaProto.getContainerID()))
-            .setContainerState(replicaProto.getState())
-            .setDatanodeDetails(datanodeDetails)
-            .build();
-
-        containerManager.updateContainerReplica(containerID, replica);
-
-        // Check if the state of the container is changed.
-        if (replicaProto.getState() == ContainerReplicaProto.State.CLOSED &&
-            containerInfo.getState() == HddsProtos.LifeCycleState.CLOSING) {
-          containerManager.updateContainerState(containerID,
-              HddsProtos.LifeCycleEvent.CLOSE);
-        }
-
-        // TODO: Handler replica state change
-
+        ReportHandlerHelper.processContainerReplica(containerManager,
+            containerID, replicaProto, datanodeDetails, publisher, LOG);
       } catch (ContainerNotFoundException e) {
         LOG.warn("Container {} not found!", replicaProto.getContainerID());
       } catch (IOException e) {
@@ -95,4 +76,5 @@ public class IncrementalContainerReportHandler implements
     }
 
   }
+
 }

+ 368 - 0
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReportHandlerHelper.java

@@ -0,0 +1,368 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hdds.scm.events.SCMEvents.DATANODE_COMMAND;
+
+/**
+ * Helper functions to handler container reports.
+ */
+public final class ReportHandlerHelper {
+
+  private ReportHandlerHelper() {}
+
+  /**
+   * Processes the container replica and updates the container state in SCM.
+   * If needed, sends command to datanode to update the replica state.
+   *
+   * @param containerManager ContainerManager instance
+   * @param containerId Id of the container
+   * @param replicaProto replica of the container
+   * @param datanodeDetails datanode where the replica resides
+   * @param publisher event publisher
+   * @param logger for logging
+   * @throws IOException
+   */
+  static void processContainerReplica(final ContainerManager containerManager,
+      final ContainerID containerId, final ContainerReplicaProto replicaProto,
+      final DatanodeDetails datanodeDetails, final EventPublisher publisher,
+      final Logger logger) throws IOException {
+
+    final ContainerReplica replica = ContainerReplica.newBuilder()
+        .setContainerID(containerId)
+        .setContainerState(replicaProto.getState())
+        .setDatanodeDetails(datanodeDetails)
+        .setOriginNodeId(UUID.fromString(replicaProto.getOriginNodeId()))
+        .setSequenceId(replicaProto.getBlockCommitSequenceId())
+        .build();
+
+    // This is an in-memory update.
+    containerManager.updateContainerReplica(containerId, replica);
+    ReportHandlerHelper.reconcileContainerState(containerManager,
+        containerId, publisher, logger);
+
+    final ContainerInfo containerInfo = containerManager
+        .getContainer(containerId);
+    if (containerInfo.getUsedBytes() < replicaProto.getUsed()) {
+      containerInfo.setUsedBytes(replicaProto.getUsed());
+    }
+
+    if (containerInfo.getNumberOfKeys() < replicaProto.getKeyCount()) {
+      containerInfo.setNumberOfKeys(replicaProto.getKeyCount());
+    }
+
+    // Now we have reconciled the container state. If the container state and
+    // the replica state doesn't match, then take appropriate action.
+    ReportHandlerHelper.sendReplicaCommands(
+        datanodeDetails, containerInfo, replica, publisher, logger);
+  }
+
+
+  /**
+   * Reconcile the container state based on the ContainerReplica states.
+   * ContainerState is updated after the reconciliation.
+   *
+   * @param manager ContainerManager
+   * @param containerId container id
+   * @throws ContainerNotFoundException
+   */
+  private static void reconcileContainerState(final ContainerManager manager,
+      final ContainerID containerId, final EventPublisher publisher,
+      final Logger logger) throws IOException {
+    // TODO: handle unhealthy replica.
+    synchronized (manager.getContainer(containerId)) {
+      final ContainerInfo container = manager.getContainer(containerId);
+      final Set<ContainerReplica> replicas = manager.getContainerReplicas(
+          containerId);
+      final LifeCycleState containerState = container.getState();
+      switch (containerState) {
+      case OPEN:
+        /*
+         * If the container state is OPEN.
+         * None of the replica should be in any other state.
+         *
+         */
+        List<ContainerReplica> invalidReplicas = replicas.stream()
+            .filter(replica -> replica.getState() != State.OPEN)
+            .collect(Collectors.toList());
+        if (!invalidReplicas.isEmpty()) {
+          logger.warn("Container {} has invalid replica state." +
+              "Invalid Replicas: {}", containerId, invalidReplicas);
+        }
+        // A container cannot be over replicated when in OPEN state.
+        break;
+      case CLOSING:
+        /*
+         * SCM has asked DataNodes to close the container. Now the replicas
+         * can be in any of the following states.
+         *
+         * 1. OPEN
+         * 2. CLOSING
+         * 3. QUASI_CLOSED
+         * 4. CLOSED
+         *
+         * If all the replica are either in OPEN or CLOSING state, do nothing.
+         *
+         * If any one of the replica is in QUASI_CLOSED state, move the
+         * container to QUASI_CLOSED state.
+         *
+         * If any one of the replica is in CLOSED state, mark the container as
+         * CLOSED. The close has happened via Ratis.
+         *
+         */
+        Optional<ContainerReplica> closedReplica = replicas.stream()
+            .filter(replica -> replica.getState() == State.CLOSED)
+            .findFirst();
+        if (closedReplica.isPresent()) {
+          container.updateSequenceId(closedReplica.get().getSequenceId());
+          manager.updateContainerState(
+              containerId, HddsProtos.LifeCycleEvent.CLOSE);
+
+          // TODO: remove container from OPEN pipeline, since the container is
+          // closed we can go ahead and remove it from Ratis pipeline.
+        } else if (replicas.stream()
+            .anyMatch(replica -> replica.getState() == State.QUASI_CLOSED)) {
+          manager.updateContainerState(
+              containerId, HddsProtos.LifeCycleEvent.QUASI_CLOSE);
+        }
+        break;
+      case QUASI_CLOSED:
+        /*
+         * The container is in QUASI_CLOSED state, this means that at least
+         * one of the replica is in QUASI_CLOSED/CLOSED state.
+         * Other replicas can be in any of the following state.
+         *
+         * 1. OPEN
+         * 2. CLOSING
+         * 3. QUASI_CLOSED
+         * 4. CLOSED
+         *
+         * If <50% of container replicas are in QUASI_CLOSED state and all
+         * the other replica are either in OPEN or CLOSING state, do nothing.
+         * We cannot identify the correct replica since we don't have quorum
+         * yet.
+         *
+         * If >50% (quorum) of replicas are in QUASI_CLOSED state and other
+         * replicas are either in OPEN or CLOSING state, try to identify
+         * the latest container replica using originNodeId and sequenceId.
+         * Force close those replica(s) which have the latest sequenceId.
+         *
+         * If at least one of the replica is in CLOSED state, mark the
+         * container as CLOSED. Force close the replicas which matches the
+         * sequenceId of the CLOSED replica.
+         *
+         */
+        if (replicas.stream()
+            .anyMatch(replica -> replica.getState() == State.CLOSED)) {
+          manager.updateContainerState(
+              containerId, HddsProtos.LifeCycleEvent.FORCE_CLOSE);
+          // TODO: remove container from OPEN pipeline, since the container is
+          // closed we can go ahead and remove it from Ratis pipeline.
+        } else {
+          final int replicationFactor = container
+              .getReplicationFactor().getNumber();
+          final List<ContainerReplica> quasiClosedReplicas = replicas.stream()
+              .filter(replica -> replica.getState() == State.QUASI_CLOSED)
+              .collect(Collectors.toList());
+          final long uniqueQuasiClosedReplicaCount = quasiClosedReplicas
+              .stream()
+              .map(ContainerReplica::getOriginDatanodeId)
+              .distinct()
+              .count();
+
+          float quasiClosePercent = ((float) uniqueQuasiClosedReplicaCount) /
+              ((float) replicationFactor);
+
+          if (quasiClosePercent > 0.5F) {
+            // Quorum of unique replica has been QUASI_CLOSED
+            long sequenceId = forceCloseContainerReplicaWithHighestSequenceId(
+                container, quasiClosedReplicas, publisher);
+            if (sequenceId != -1L) {
+              container.updateSequenceId(sequenceId);
+            }
+          }
+        }
+        break;
+      case CLOSED:
+        /*
+         * The container is already in closed state. do nothing.
+         */
+        break;
+      case DELETING:
+        // Not handled.
+        throw new UnsupportedOperationException("Unsupported container state" +
+            " 'DELETING'.");
+      case DELETED:
+        // Not handled.
+        throw new UnsupportedOperationException("Unsupported container state" +
+            " 'DELETED'.");
+      default:
+        break;
+      }
+    }
+  }
+
+  /**
+   * Compares the QUASI_CLOSED replicas of a container and sends close command.
+   *
+   * @param quasiClosedReplicas list of quasi closed replicas
+   * @return the sequenceId of the closed replica.
+   */
+  private static long forceCloseContainerReplicaWithHighestSequenceId(
+      final ContainerInfo container,
+      final List<ContainerReplica> quasiClosedReplicas,
+      final EventPublisher publisher) {
+
+    final long highestSequenceId = quasiClosedReplicas.stream()
+        .map(ContainerReplica::getSequenceId)
+        .max(Long::compare)
+        .orElse(-1L);
+
+    if (highestSequenceId != -1L) {
+      quasiClosedReplicas.stream()
+          .filter(replica -> replica.getSequenceId() == highestSequenceId)
+          .forEach(replica -> {
+            CloseContainerCommand closeContainerCommand =
+                new CloseContainerCommand(container.getContainerID(),
+                    container.getPipelineID(), true);
+            publisher.fireEvent(DATANODE_COMMAND,
+                new CommandForDatanode<>(
+                    replica.getDatanodeDetails().getUuid(),
+                    closeContainerCommand));
+          });
+    }
+    return highestSequenceId;
+  }
+
+  /**
+   * Based on the container and replica state, send command to datanode if
+   * required.
+   *
+   * @param datanodeDetails datanode where the replica resides
+   * @param containerInfo container information
+   * @param replica replica information
+   * @param publisher queue to publish the datanode command event
+   * @param log for logging
+   */
+  static void sendReplicaCommands(
+      final DatanodeDetails datanodeDetails,
+      final ContainerInfo containerInfo,
+      final ContainerReplica replica,
+      final EventPublisher publisher,
+      final Logger log) {
+    final HddsProtos.LifeCycleState containerState = containerInfo.getState();
+    final ContainerReplicaProto.State replicaState = replica.getState();
+
+    if(!ReportHandlerHelper.compareState(containerState, replicaState)) {
+      if (containerState == HddsProtos.LifeCycleState.OPEN) {
+        // When a container state in SCM is OPEN, there is no way a datanode
+        // can quasi close/close the container.
+        log.warn("Invalid container replica state for container {}" +
+                " from datanode {}. Expected state is OPEN.",
+            containerInfo.containerID(), datanodeDetails);
+        // The replica can go CORRUPT, we have to handle it.
+      }
+      if (containerState == HddsProtos.LifeCycleState.CLOSING ||
+          containerState == HddsProtos.LifeCycleState.QUASI_CLOSED) {
+        // Resend close container event for this datanode if the container
+        // replica state is OPEN/CLOSING.
+        if (replicaState == ContainerReplicaProto.State.OPEN ||
+            replicaState == ContainerReplicaProto.State.CLOSING) {
+          CloseContainerCommand closeContainerCommand =
+              new CloseContainerCommand(containerInfo.getContainerID(),
+                  containerInfo.getPipelineID());
+          publisher.fireEvent(DATANODE_COMMAND,
+              new CommandForDatanode<>(
+                  replica.getDatanodeDetails().getUuid(),
+                  closeContainerCommand));
+        }
+      }
+      if (containerState == HddsProtos.LifeCycleState.CLOSED) {
+        if (replicaState == ContainerReplicaProto.State.OPEN ||
+            replicaState == ContainerReplicaProto.State.CLOSING ||
+            replicaState == ContainerReplicaProto.State.QUASI_CLOSED) {
+          // Send force close container event for this datanode if the container
+          // replica state is OPEN/CLOSING/QUASI_CLOSED.
+
+          // Close command will be send only if this replica matches the
+          // sequence of the container.
+          if (containerInfo.getSequenceId() ==
+              replica.getSequenceId()) {
+            CloseContainerCommand closeContainerCommand =
+                new CloseContainerCommand(containerInfo.getContainerID(),
+                    containerInfo.getPipelineID(), true);
+            publisher.fireEvent(DATANODE_COMMAND,
+                new CommandForDatanode<>(
+                    replica.getDatanodeDetails().getUuid(),
+                    closeContainerCommand));
+          }
+          // TODO: delete the replica if the BCSID doesn't match.
+        }
+      }
+    }
+
+  }
+
+  /**
+   * Compares the container and replica state.
+   *
+   * @param containerState container state
+   * @param replicaState replica state
+   * @return true if the states are same, else false
+   */
+  private static boolean compareState(final LifeCycleState containerState,
+                              final State replicaState) {
+    // TODO: handle unhealthy replica.
+    switch (containerState) {
+    case OPEN:
+      return replicaState == State.OPEN;
+    case CLOSING:
+      return replicaState == State.CLOSING;
+    case QUASI_CLOSED:
+      return replicaState == State.QUASI_CLOSED;
+    case CLOSED:
+      return replicaState == State.CLOSED;
+    case DELETING:
+      return false;
+    case DELETED:
+      return false;
+    default:
+      return false;
+    }
+  }
+
+}

+ 1 - 0
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java

@@ -94,6 +94,7 @@ public class MockNodeManager implements NodeManager {
     if (initializeFakeNodes) {
       for (int x = 0; x < nodeCount; x++) {
         DatanodeDetails dd = TestUtils.randomDatanodeDetails();
+        register(dd, null, null);
         populateNodeMetric(dd, x);
       }
     }

+ 609 - 163
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java

@@ -16,208 +16,654 @@
  */
 package org.apache.hadoop.hdds.scm.container;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-
-import org.apache.hadoop.hdds.HddsConfigKeys;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
-import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
 import org.apache.hadoop.hdds.scm.container.replication
     .ReplicationActivityStatus;
-import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
-import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
-    .ContainerReportFromDatanode;
-import org.apache.hadoop.hdds.server.events.Event;
+import org.apache.hadoop.hdds.scm.server
+    .SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
-
-import org.apache.hadoop.hdds.server.events.EventQueue;
-import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.stubbing.Answer;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.hadoop.hdds.scm.container
+    .TestContainerReportHelper.getReplicas;
+import static org.apache.hadoop.hdds.scm.container
+    .TestContainerReportHelper.getContainer;
+import static org.apache.hadoop.hdds.scm.container
+    .TestContainerReportHelper.addContainerToContainerManager;
+import static org.apache.hadoop.hdds.scm.container
+    .TestContainerReportHelper.mockUpdateContainerReplica;
+import static org.apache.hadoop.hdds.scm.container
+    .TestContainerReportHelper.mockUpdateContainerState;
 
 /**
  * Test the behaviour of the ContainerReportHandler.
  */
-public class TestContainerReportHandler implements EventPublisher {
+public class TestContainerReportHandler {
 
-  private List<Object> publishedEvents = new ArrayList<>();
-  private final NodeManager nodeManager = new MockNodeManager(true, 15);
 
-  private static final Logger LOG =
-      LoggerFactory.getLogger(TestContainerReportHandler.class);
+  @Test
+  public void testUnderReplicatedContainer()
+      throws NodeNotFoundException, ContainerNotFoundException,
+      ContainerReplicaNotFoundException {
+
+    final NodeManager nodeManager = new MockNodeManager(true, 10);
+    final ContainerManager containerManager = Mockito.mock(
+        ContainerManager.class);
+    final ReplicationActivityStatus replicationActivityStatus =
+        new ReplicationActivityStatus();
+    replicationActivityStatus.enableReplication();
 
-  @Before
-  public void resetEventCollector() {
-    publishedEvents.clear();
+    final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
+    final ContainerReportHandler reportHandler = new ContainerReportHandler(
+        nodeManager, pipelineManager, containerManager,
+        replicationActivityStatus);
+    final Iterator<DatanodeDetails> nodeIterator = nodeManager.getNodes(
+        NodeState.HEALTHY).iterator();
+    final DatanodeDetails datanodeOne = nodeIterator.next();
+    final DatanodeDetails datanodeTwo = nodeIterator.next();
+    final DatanodeDetails datanodeThree = nodeIterator.next();
+    final ContainerInfo containerOne = getContainer(LifeCycleState.CLOSED);
+    final ContainerInfo containerTwo = getContainer(LifeCycleState.CLOSED);
+    final Set<ContainerID> containerIDSet = Stream.of(
+        containerOne.containerID(), containerTwo.containerID())
+        .collect(Collectors.toSet());
+    final Set<ContainerReplica> containerOneReplicas = getReplicas(
+        containerOne.containerID(), ContainerReplicaProto.State.CLOSED,
+        datanodeOne, datanodeTwo, datanodeThree);
+    final Set<ContainerReplica> containerTwoReplicas = getReplicas(
+        containerTwo.containerID(), ContainerReplicaProto.State.CLOSED,
+        datanodeOne, datanodeTwo, datanodeThree);
+
+    nodeManager.setContainers(datanodeOne, containerIDSet);
+    nodeManager.setContainers(datanodeTwo, containerIDSet);
+    nodeManager.setContainers(datanodeThree, containerIDSet);
+
+    addContainerToContainerManager(
+        containerManager, containerOne, containerOneReplicas);
+    addContainerToContainerManager(
+        containerManager, containerTwo, containerTwoReplicas);
+
+    Mockito.doAnswer((Answer<Void>) invocation -> {
+      Object[] args = invocation.getArguments();
+      if (args[0].equals(containerOne.containerID())) {
+        ContainerReplica replica = (ContainerReplica) args[1];
+        containerOneReplicas.remove(replica);
+      }
+      return null;
+    }).when(containerManager).removeContainerReplica(
+        Mockito.any(ContainerID.class), Mockito.any(ContainerReplica.class));
+
+
+    Mockito.when(
+        containerManager.getContainerReplicas(containerOne.containerID()))
+        .thenReturn(containerOneReplicas);
+    Mockito.when(
+        containerManager.getContainerReplicas(containerTwo.containerID()))
+        .thenReturn(containerTwoReplicas);
+
+    // SCM expects both containerOne and containerTwo to be in all the three
+    // datanodes datanodeOne, datanodeTwo and datanodeThree
+
+    // Now datanodeOne is sending container report in which containerOne is
+    // missing.
+
+    // containerOne becomes under replicated.
+    final ContainerReportsProto containerReport = getContainerReportsProto(
+        containerTwo.containerID(), ContainerReplicaProto.State.CLOSED,
+        datanodeOne.getUuidString());
+    final EventPublisher publisher = Mockito.mock(EventPublisher.class);
+    final ContainerReportFromDatanode containerReportFromDatanode =
+        new ContainerReportFromDatanode(datanodeOne, containerReport);
+    reportHandler.onMessage(containerReportFromDatanode, publisher);
+
+    // Now we should get a replication request for containerOne
+    Mockito.verify(publisher, Mockito.times(1))
+        .fireEvent(Mockito.any(), Mockito.any());
+
+    // TODO: verify whether are actually getting a replication request event
+    // for containerOne
   }
 
-  //TODO: Rewrite it
-  @Ignore
   @Test
-  public void test() throws IOException, NodeNotFoundException {
-    String testDir = GenericTestUtils.getTempPath(
-        this.getClass().getSimpleName());
-    //GIVEN
-    OzoneConfiguration conf = new OzoneConfiguration();
-    conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir);
-    EventQueue eventQueue = new EventQueue();
-    PipelineManager pipelineManager =
-        new SCMPipelineManager(conf, nodeManager, eventQueue);
-    SCMContainerManager containerManager = new SCMContainerManager(
-        conf, nodeManager, pipelineManager, eventQueue);
-
-    ReplicationActivityStatus replicationActivityStatus =
-        new ReplicationActivityStatus();
-
-    ContainerReportHandler reportHandler =
-        new ContainerReportHandler(nodeManager, pipelineManager,
-            containerManager, replicationActivityStatus);
-
-    DatanodeDetails dn1 = TestUtils.randomDatanodeDetails();
-    DatanodeDetails dn2 = TestUtils.randomDatanodeDetails();
-    DatanodeDetails dn3 = TestUtils.randomDatanodeDetails();
-    DatanodeDetails dn4 = TestUtils.randomDatanodeDetails();
-    nodeManager.setContainers(dn1, new HashSet<>());
-    nodeManager.setContainers(dn2, new HashSet<>());
-    nodeManager.setContainers(dn3, new HashSet<>());
-    nodeManager.setContainers(dn4, new HashSet<>());
-
-    ContainerInfo cont1 = containerManager
-        .allocateContainer(ReplicationType.STAND_ALONE,
-            ReplicationFactor.THREE, "root");
-    ContainerInfo cont2 = containerManager
-        .allocateContainer(ReplicationType.STAND_ALONE,
-            ReplicationFactor.THREE, "root");
-    // Open Container
-    ContainerInfo cont3 = containerManager
-        .allocateContainer(ReplicationType.STAND_ALONE,
-            ReplicationFactor.THREE, "root");
-
-    long c1 = cont1.getContainerID();
-    long c2 = cont2.getContainerID();
-    long c3 = cont3.getContainerID();
-
-    // Close remaining containers
-    TestUtils.closeContainer(containerManager, cont1.containerID());
-    TestUtils.closeContainer(containerManager, cont2.containerID());
-
-    //when
-
-    //initial reports before replication is enabled. 2 containers w 3 replicas.
-    reportHandler.onMessage(
-        new ContainerReportFromDatanode(dn1,
-            createContainerReport(new long[] {c1, c2, c3})), this);
-
-    reportHandler.onMessage(
-        new ContainerReportFromDatanode(dn2,
-            createContainerReport(new long[] {c1, c2, c3})), this);
-
-    reportHandler.onMessage(
-        new ContainerReportFromDatanode(dn3,
-            createContainerReport(new long[] {c1, c2})), this);
-
-    reportHandler.onMessage(
-        new ContainerReportFromDatanode(dn4,
-            createContainerReport(new long[] {})), this);
-
-    Assert.assertEquals(0, publishedEvents.size());
+  public void testOverReplicatedContainer() throws NodeNotFoundException,
+      ContainerNotFoundException {
 
+    final NodeManager nodeManager = new MockNodeManager(true, 10);
+    final ContainerManager containerManager = Mockito.mock(
+        ContainerManager.class);
+    final ReplicationActivityStatus replicationActivityStatus =
+        new ReplicationActivityStatus();
     replicationActivityStatus.enableReplication();
 
-    //no problem here
-    reportHandler.onMessage(
-        new ContainerReportFromDatanode(dn1,
-            createContainerReport(new long[] {c1, c2})), this);
-
-    Assert.assertEquals(0, publishedEvents.size());
+    final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
+    final ContainerReportHandler reportHandler = new ContainerReportHandler(
+        nodeManager, pipelineManager, containerManager,
+        replicationActivityStatus);
+    final Iterator<DatanodeDetails> nodeIterator = nodeManager.getNodes(
+        NodeState.HEALTHY).iterator();
+    final DatanodeDetails datanodeOne = nodeIterator.next();
+    final DatanodeDetails datanodeTwo = nodeIterator.next();
+    final DatanodeDetails datanodeThree = nodeIterator.next();
+    final DatanodeDetails datanodeFour = nodeIterator.next();
+    final ContainerInfo containerOne = getContainer(LifeCycleState.CLOSED);
+    final ContainerInfo containerTwo = getContainer(LifeCycleState.CLOSED);
+    final Set<ContainerID> containerIDSet = Stream.of(
+        containerOne.containerID(), containerTwo.containerID())
+        .collect(Collectors.toSet());
+    final Set<ContainerReplica> containerOneReplicas = getReplicas(
+        containerOne.containerID(), ContainerReplicaProto.State.CLOSED,
+        datanodeOne, datanodeTwo, datanodeThree);
+    final Set<ContainerReplica> containerTwoReplicas = getReplicas(
+        containerTwo.containerID(), ContainerReplicaProto.State.CLOSED,
+        datanodeOne, datanodeTwo, datanodeThree);
+
+    nodeManager.setContainers(datanodeOne, containerIDSet);
+    nodeManager.setContainers(datanodeTwo, containerIDSet);
+    nodeManager.setContainers(datanodeThree, containerIDSet);
+
+    addContainerToContainerManager(
+        containerManager, containerOne, containerOneReplicas);
+    addContainerToContainerManager(
+        containerManager, containerTwo, containerTwoReplicas);
+
+    Mockito.doAnswer((Answer<Void>) invocation -> {
+      Object[] args = invocation.getArguments();
+      if (args[0].equals(containerOne.containerID())) {
+        containerOneReplicas.add((ContainerReplica) args[1]);
+      }
+      return null;
+    }).when(containerManager).updateContainerReplica(
+        Mockito.any(ContainerID.class), Mockito.any(ContainerReplica.class));
+
+
+
+    // SCM expects both containerOne and containerTwo to be in all the three
+    // datanodes datanodeOne, datanodeTwo and datanodeThree
+
+    // Now datanodeFour is sending container report which has containerOne.
+
+    // containerOne becomes over replicated.
+
+    final ContainerReportsProto containerReport = getContainerReportsProto(
+        containerOne.containerID(), ContainerReplicaProto.State.CLOSED,
+        datanodeOne.getUuidString());
+    final EventPublisher publisher = Mockito.mock(EventPublisher.class);
+    final ContainerReportFromDatanode containerReportFromDatanode =
+        new ContainerReportFromDatanode(datanodeFour, containerReport);
+    reportHandler.onMessage(containerReportFromDatanode, publisher);
+    Mockito.verify(publisher, Mockito.times(1))
+        .fireEvent(Mockito.any(), Mockito.any());
+
+    // TODO: verify whether are actually getting a replication request event
+    // for containerOne
+  }
 
-    //container is missing from d2
-    reportHandler.onMessage(
-        new ContainerReportFromDatanode(dn2,
-            createContainerReport(new long[] {c1})), this);
+  @Test
+  public void testOpenToClosing()
+      throws NodeNotFoundException, ContainerNotFoundException {
+    /*
+     * The container is in CLOSING state and all the replicas are either in
+     * OPEN or CLOSING state.
+     *
+     * The datanode reports that the replica is still in OPEN state.
+     *
+     * In this case SCM should trigger close container event to the datanode.
+     */
+
+    final NodeManager nodeManager = new MockNodeManager(true, 10);
+    final ContainerManager containerManager = Mockito.mock(
+        ContainerManager.class);
+    final ReplicationActivityStatus replicationActivityStatus =
+        new ReplicationActivityStatus();
+    replicationActivityStatus.enableReplication();
 
-    Assert.assertEquals(1, publishedEvents.size());
-    ReplicationRequest replicationRequest =
-        (ReplicationRequest) publishedEvents.get(0);
+    final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
+    final ContainerReportHandler reportHandler = new ContainerReportHandler(
+        nodeManager, pipelineManager, containerManager,
+        replicationActivityStatus);
+    final Iterator<DatanodeDetails> nodeIterator = nodeManager.getNodes(
+        NodeState.HEALTHY).iterator();
+    final DatanodeDetails datanodeOne = nodeIterator.next();
+    final DatanodeDetails datanodeTwo = nodeIterator.next();
+    final DatanodeDetails datanodeThree = nodeIterator.next();
+    final ContainerInfo containerOne = getContainer(LifeCycleState.CLOSING);
+    final ContainerInfo containerTwo = getContainer(LifeCycleState.CLOSED);
+    final Set<ContainerID> containerIDSet = Stream.of(
+        containerOne.containerID(), containerTwo.containerID())
+        .collect(Collectors.toSet());
+    final Set<ContainerReplica> containerOneReplicas = getReplicas(
+        containerOne.containerID(), ContainerReplicaProto.State.OPEN,
+        datanodeOne);
+
+    containerOneReplicas.addAll(getReplicas(
+        containerOne.containerID(), ContainerReplicaProto.State.CLOSING,
+        datanodeTwo, datanodeThree));
+
+    final Set<ContainerReplica> containerTwoReplicas = getReplicas(
+        containerTwo.containerID(), ContainerReplicaProto.State.CLOSED,
+        datanodeOne, datanodeTwo, datanodeThree);
+
+    nodeManager.setContainers(datanodeOne, containerIDSet);
+    nodeManager.setContainers(datanodeTwo, containerIDSet);
+    nodeManager.setContainers(datanodeThree, containerIDSet);
+
+    addContainerToContainerManager(
+        containerManager, containerOne, containerOneReplicas);
+    addContainerToContainerManager(
+        containerManager, containerTwo, containerTwoReplicas);
+    mockUpdateContainerReplica(
+        containerManager, containerOne, containerOneReplicas);
+
+    // Replica in datanodeOne of containerOne is in OPEN state.
+    final ContainerReportsProto containerReport = getContainerReportsProto(
+        containerOne.containerID(), ContainerReplicaProto.State.OPEN,
+        datanodeOne.getUuidString());
+    final EventPublisher publisher = Mockito.mock(EventPublisher.class);
+    final ContainerReportFromDatanode containerReportFromDatanode =
+        new ContainerReportFromDatanode(datanodeOne, containerReport);
+    reportHandler.onMessage(containerReportFromDatanode, publisher);
+
+    // Now we should get close container event for containerOne on datanodeOne
+    Mockito.verify(publisher, Mockito.times(1))
+        .fireEvent(Mockito.any(), Mockito.any());
+
+    // TODO: verify whether are actually getting a close container
+    // datanode command for containerOne/datanodeOne
+
+    /*
+     * The container is in CLOSING state and all the replicas are either in
+     * OPEN or CLOSING state.
+     *
+     * The datanode reports that the replica is in CLOSING state.
+     *
+     * In this case SCM should trigger close container event to the datanode.
+     */
+
+    // Replica in datanodeOne of containerOne is in OPEN state.
+    final ContainerReportsProto containerReportTwo = getContainerReportsProto(
+        containerOne.containerID(), ContainerReplicaProto.State.OPEN,
+        datanodeOne.getUuidString());
+    final ContainerReportFromDatanode containerReportTwoFromDatanode =
+        new ContainerReportFromDatanode(datanodeOne, containerReportTwo);
+    reportHandler.onMessage(containerReportTwoFromDatanode, publisher);
+
+    // Now we should get close container event for containerOne on datanodeOne
+    Mockito.verify(publisher, Mockito.times(2))
+        .fireEvent(Mockito.any(), Mockito.any());
+
+    // TODO: verify whether are actually getting a close container
+    // datanode command for containerOne/datanodeOne
+  }
 
-    Assert.assertEquals(c2, replicationRequest.getContainerId());
-    Assert.assertEquals(3, replicationRequest.getExpecReplicationCount());
-    Assert.assertEquals(2, replicationRequest.getReplicationCount());
+  @Test
+  public void testClosingToClosed() throws NodeNotFoundException, IOException {
+    /*
+     * The container is in CLOSING state and all the replicas are in
+     * OPEN/CLOSING state.
+     *
+     * The datanode reports that one of the replica is now CLOSED.
+     *
+     * In this case SCM should mark the container as CLOSED.
+     */
+    final NodeManager nodeManager = new MockNodeManager(true, 10);
+    final ContainerManager containerManager = Mockito.mock(
+        ContainerManager.class);
+    final ReplicationActivityStatus replicationActivityStatus =
+        new ReplicationActivityStatus();
+    replicationActivityStatus.enableReplication();
 
-    //container was replicated to dn4
-    reportHandler.onMessage(
-        new ContainerReportFromDatanode(dn4,
-            createContainerReport(new long[] {c2})), this);
+    final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
+    final ContainerReportHandler reportHandler = new ContainerReportHandler(
+        nodeManager, pipelineManager, containerManager,
+        replicationActivityStatus);
+    final Iterator<DatanodeDetails> nodeIterator = nodeManager.getNodes(
+        NodeState.HEALTHY).iterator();
+    final DatanodeDetails datanodeOne = nodeIterator.next();
+    final DatanodeDetails datanodeTwo = nodeIterator.next();
+    final DatanodeDetails datanodeThree = nodeIterator.next();
+    final ContainerInfo containerOne = getContainer(LifeCycleState.CLOSING);
+    final ContainerInfo containerTwo = getContainer(LifeCycleState.CLOSED);
+    final Set<ContainerID> containerIDSet = Stream.of(
+        containerOne.containerID(), containerTwo.containerID())
+        .collect(Collectors.toSet());
+    final Set<ContainerReplica> containerOneReplicas = getReplicas(
+        containerOne.containerID(),
+        ContainerReplicaProto.State.CLOSING,
+        datanodeOne);
+
+    containerOneReplicas.addAll(getReplicas(
+        containerOne.containerID(),
+        ContainerReplicaProto.State.OPEN,
+        datanodeTwo, datanodeThree));
+
+    final Set<ContainerReplica> containerTwoReplicas = getReplicas(
+        containerTwo.containerID(),
+        ContainerReplicaProto.State.CLOSED,
+        datanodeOne, datanodeTwo, datanodeThree);
+
+    nodeManager.setContainers(datanodeOne, containerIDSet);
+    nodeManager.setContainers(datanodeTwo, containerIDSet);
+    nodeManager.setContainers(datanodeThree, containerIDSet);
+
+    addContainerToContainerManager(
+        containerManager, containerOne, containerOneReplicas);
+    addContainerToContainerManager(
+        containerManager, containerTwo, containerTwoReplicas);
+    mockUpdateContainerReplica(
+        containerManager, containerOne, containerOneReplicas);
+    mockUpdateContainerState(containerManager, containerOne,
+        LifeCycleEvent.CLOSE, LifeCycleState.CLOSED);
+
+    final ContainerReportsProto containerReport = getContainerReportsProto(
+        containerOne.containerID(), ContainerReplicaProto.State.CLOSED,
+        datanodeOne.getUuidString());
+    final EventPublisher publisher = Mockito.mock(EventPublisher.class);
+    final ContainerReportFromDatanode containerReportFromDatanode =
+        new ContainerReportFromDatanode(datanodeOne, containerReport);
+    reportHandler.onMessage(containerReportFromDatanode, publisher);
+
+    Assert.assertEquals(
+        LifeCycleState.CLOSED, containerOne.getState());
+  }
 
-    //no more event, everything is perfect
-    Assert.assertEquals(1, publishedEvents.size());
+  @Test
+  public void testClosingToQuasiClosed()
+      throws NodeNotFoundException, IOException {
+    /*
+     * The container is in CLOSING state and all the replicas are in
+     * OPEN/CLOSING state.
+     *
+     * The datanode reports that the replica is now QUASI_CLOSED.
+     *
+     * In this case SCM should move the container to QUASI_CLOSED.
+     */
+    final NodeManager nodeManager = new MockNodeManager(true, 10);
+    final ContainerManager containerManager = Mockito.mock(
+        ContainerManager.class);
+    final ReplicationActivityStatus replicationActivityStatus =
+        new ReplicationActivityStatus();
+    replicationActivityStatus.enableReplication();
 
-    //c2 was found at dn2 (it was missing before, magic)
-    reportHandler.onMessage(
-        new ContainerReportFromDatanode(dn2,
-            createContainerReport(new long[] {c1, c2})), this);
+    final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
+    final ContainerReportHandler reportHandler = new ContainerReportHandler(
+        nodeManager, pipelineManager, containerManager,
+        replicationActivityStatus);
+    final Iterator<DatanodeDetails> nodeIterator = nodeManager.getNodes(
+        NodeState.HEALTHY).iterator();
+    final DatanodeDetails datanodeOne = nodeIterator.next();
+    final DatanodeDetails datanodeTwo = nodeIterator.next();
+    final DatanodeDetails datanodeThree = nodeIterator.next();
+    final ContainerInfo containerOne =
+        getContainer(LifeCycleState.CLOSING);
+    final ContainerInfo containerTwo =
+        getContainer(LifeCycleState.CLOSED);
+    final Set<ContainerID> containerIDSet = Stream.of(
+        containerOne.containerID(), containerTwo.containerID())
+        .collect(Collectors.toSet());
+    final Set<ContainerReplica> containerOneReplicas = getReplicas(
+        containerOne.containerID(),
+        ContainerReplicaProto.State.CLOSING,
+        datanodeOne, datanodeTwo);
+    containerOneReplicas.addAll(getReplicas(
+        containerOne.containerID(),
+        ContainerReplicaProto.State.OPEN,
+        datanodeThree));
+    final Set<ContainerReplica> containerTwoReplicas = getReplicas(
+        containerTwo.containerID(),
+        ContainerReplicaProto.State.CLOSED,
+        datanodeOne, datanodeTwo, datanodeThree);
+
+    nodeManager.setContainers(datanodeOne, containerIDSet);
+    nodeManager.setContainers(datanodeTwo, containerIDSet);
+    nodeManager.setContainers(datanodeThree, containerIDSet);
+
+    addContainerToContainerManager(
+        containerManager, containerOne, containerOneReplicas);
+    addContainerToContainerManager(
+        containerManager, containerTwo, containerTwoReplicas);
+    mockUpdateContainerReplica(
+        containerManager, containerOne, containerOneReplicas);
+    mockUpdateContainerState(containerManager, containerOne,
+        LifeCycleEvent.QUASI_CLOSE, LifeCycleState.QUASI_CLOSED);
+
+    final ContainerReportsProto containerReport = getContainerReportsProto(
+        containerOne.containerID(), ContainerReplicaProto.State.QUASI_CLOSED,
+        datanodeOne.getUuidString());
+    final EventPublisher publisher = Mockito.mock(EventPublisher.class);
+    final ContainerReportFromDatanode containerReportFromDatanode =
+        new ContainerReportFromDatanode(datanodeOne, containerReport);
+    reportHandler.onMessage(containerReportFromDatanode, publisher);
+
+    Assert.assertEquals(
+        LifeCycleState.QUASI_CLOSED, containerOne.getState());
+  }
 
-    //c2 is over replicated (dn1,dn2,dn3,dn4)
-    Assert.assertEquals(2, publishedEvents.size());
+  @Test
+  public void testQuasiClosedWithDifferentOriginNodeReplica()
+      throws NodeNotFoundException, IOException {
+    /*
+     * The container is in QUASI_CLOSED state.
+     *  - One of the replica is in QUASI_CLOSED state
+     *  - The other two replica are in OPEN/CLOSING state
+     *
+     * The datanode reports the second replica is now QUASI_CLOSED.
+     *
+     * In this case SCM should CLOSE the container with highest BCSID and
+     * send force close command to the datanode.
+     */
+    final NodeManager nodeManager = new MockNodeManager(true, 10);
+    final ContainerManager containerManager = Mockito.mock(
+        ContainerManager.class);
+    final ReplicationActivityStatus replicationActivityStatus =
+        new ReplicationActivityStatus();
+    replicationActivityStatus.enableReplication();
 
-    replicationRequest =
-        (ReplicationRequest) publishedEvents.get(1);
+    final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
+    final ContainerReportHandler reportHandler = new ContainerReportHandler(
+        nodeManager, pipelineManager, containerManager,
+        replicationActivityStatus);
+    final Iterator<DatanodeDetails> nodeIterator = nodeManager.getNodes(
+        NodeState.HEALTHY).iterator();
+    final DatanodeDetails datanodeOne = nodeIterator.next();
+    final DatanodeDetails datanodeTwo = nodeIterator.next();
+    final DatanodeDetails datanodeThree = nodeIterator.next();
+    final ContainerInfo containerOne =
+        getContainer(LifeCycleState.QUASI_CLOSED);
+    final ContainerInfo containerTwo =
+        getContainer(LifeCycleState.CLOSED);
+    final Set<ContainerID> containerIDSet = Stream.of(
+        containerOne.containerID(), containerTwo.containerID())
+        .collect(Collectors.toSet());
+    final Set<ContainerReplica> containerOneReplicas = getReplicas(
+        containerOne.containerID(),
+        ContainerReplicaProto.State.QUASI_CLOSED,
+        10000L,
+        datanodeOne);
+    containerOneReplicas.addAll(getReplicas(
+        containerOne.containerID(),
+        ContainerReplicaProto.State.CLOSING,
+        datanodeTwo, datanodeThree));
+    final Set<ContainerReplica> containerTwoReplicas = getReplicas(
+        containerTwo.containerID(),
+        ContainerReplicaProto.State.CLOSED,
+        datanodeOne, datanodeTwo, datanodeThree);
+
+    nodeManager.setContainers(datanodeOne, containerIDSet);
+    nodeManager.setContainers(datanodeTwo, containerIDSet);
+    nodeManager.setContainers(datanodeThree, containerIDSet);
+
+    addContainerToContainerManager(
+        containerManager, containerOne, containerOneReplicas);
+    addContainerToContainerManager(
+        containerManager, containerTwo, containerTwoReplicas);
+    mockUpdateContainerReplica(
+        containerManager, containerOne, containerOneReplicas);
+    mockUpdateContainerState(containerManager, containerOne,
+        LifeCycleEvent.FORCE_CLOSE, LifeCycleState.CLOSED);
+
+    // Container replica with datanodeOne as originNodeId is already
+    // QUASI_CLOSED. Now we will tell SCM that container replica from
+    // datanodeTwo is also QUASI_CLOSED, but has higher sequenceId.
+    final ContainerReportsProto containerReport = getContainerReportsProto(
+        containerOne.containerID(), ContainerReplicaProto.State.QUASI_CLOSED,
+        datanodeTwo.getUuidString(), 999999L);
+
+    final EventPublisher publisher = Mockito.mock(EventPublisher.class);
+    final ContainerReportFromDatanode containerReportFromDatanode =
+        new ContainerReportFromDatanode(datanodeTwo, containerReport);
+    reportHandler.onMessage(containerReportFromDatanode, publisher);
+
+    // Now we should get force close container event for containerOne on
+    // datanodeTwo
+    Mockito.verify(publisher, Mockito.times(1))
+        .fireEvent(Mockito.any(), Mockito.any());
+    // TODO: verify whether are actually getting a force close container
+    // datanode command for containerOne/datanodeTwo
+
+    // The sequence id of the container should have been updated.
+    Assert.assertEquals(999999L, containerOne.getSequenceId());
+
+    // Now datanodeTwo should close containerOne.
+    final ContainerReportsProto containerReportTwo = getContainerReportsProto(
+        containerOne.containerID(), ContainerReplicaProto.State.CLOSED,
+        datanodeTwo.getUuidString(), 999999L);
+    final ContainerReportFromDatanode containerReportFromDatanodeTwo =
+        new ContainerReportFromDatanode(datanodeTwo, containerReportTwo);
+    reportHandler.onMessage(containerReportFromDatanodeTwo, publisher);
+
+    // The container should be closed in SCM now.
+    Assert.assertEquals(LifeCycleState.CLOSED, containerOne.getState());
+  }
 
-    Assert.assertEquals(c2, replicationRequest.getContainerId());
-    Assert.assertEquals(3, replicationRequest.getExpecReplicationCount());
-    Assert.assertEquals(4, replicationRequest.getReplicationCount());
+  @Test
+  public void testQuasiClosedWithSameOriginNodeReplica()
+      throws NodeNotFoundException, IOException {
+    /*
+     * The container is in QUASI_CLOSED state.
+     *  - One of the replica is in QUASI_CLOSED state
+     *  - The other two replica are in OPEN/CLOSING state
+     *
+     * The datanode reports a QUASI_CLOSED replica which has the same
+     * origin node id as the existing QUASI_CLOSED replica.
+     *
+     * In this case SCM should not CLOSE the container.
+     */
+    final NodeManager nodeManager = new MockNodeManager(true, 10);
+    final ContainerManager containerManager = Mockito.mock(
+        ContainerManager.class);
+    final ReplicationActivityStatus replicationActivityStatus =
+        new ReplicationActivityStatus();
+    replicationActivityStatus.enableReplication();
 
+    final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
+    final ContainerReportHandler reportHandler = new ContainerReportHandler(
+        nodeManager, pipelineManager, containerManager,
+        replicationActivityStatus);
+    final Iterator<DatanodeDetails> nodeIterator = nodeManager.getNodes(
+        NodeState.HEALTHY).iterator();
+    final DatanodeDetails datanodeOne = nodeIterator.next();
+    final DatanodeDetails datanodeTwo = nodeIterator.next();
+    final DatanodeDetails datanodeThree = nodeIterator.next();
+    final ContainerInfo containerOne =
+        getContainer(LifeCycleState.QUASI_CLOSED);
+    final ContainerInfo containerTwo =
+        getContainer(LifeCycleState.CLOSED);
+    final Set<ContainerID> containerIDSet = Stream.of(
+        containerOne.containerID(), containerTwo.containerID())
+        .collect(Collectors.toSet());
+    final Set<ContainerReplica> containerOneReplicas = getReplicas(
+        containerOne.containerID(),
+        ContainerReplicaProto.State.QUASI_CLOSED,
+        datanodeOne);
+    containerOneReplicas.addAll(getReplicas(
+        containerOne.containerID(),
+        ContainerReplicaProto.State.CLOSING,
+        datanodeTwo));
+    final Set<ContainerReplica> containerTwoReplicas = getReplicas(
+        containerTwo.containerID(),
+        ContainerReplicaProto.State.CLOSED,
+        datanodeOne, datanodeTwo, datanodeThree);
+
+    nodeManager.setContainers(datanodeOne, containerIDSet);
+    nodeManager.setContainers(datanodeTwo, containerIDSet);
+    nodeManager.setContainers(datanodeThree,
+        Collections.singleton(containerTwo.containerID()));
+
+    addContainerToContainerManager(
+        containerManager, containerOne, containerOneReplicas);
+    addContainerToContainerManager(
+        containerManager, containerTwo, containerTwoReplicas);
+
+    mockUpdateContainerReplica(
+        containerManager, containerOne, containerOneReplicas);
+    mockUpdateContainerState(containerManager, containerOne,
+        LifeCycleEvent.FORCE_CLOSE, LifeCycleState.CLOSED);
+
+    // containerOne is QUASI_CLOSED in datanodeOne and CLOSING in datanodeTwo.
+    // Now datanodeThree is sending container report which says that it has
+    // containerOne replica, but the originNodeId of this replica is
+    // datanodeOne. In this case we should not force close the container even
+    // though we got two QUASI_CLOSED replicas.
+    final ContainerReportsProto containerReport = getContainerReportsProto(
+        containerOne.containerID(), ContainerReplicaProto.State.QUASI_CLOSED,
+        datanodeOne.getUuidString());
+    final EventPublisher publisher = Mockito.mock(EventPublisher.class);
+    final ContainerReportFromDatanode containerReportFromDatanode =
+        new ContainerReportFromDatanode(datanodeThree, containerReport);
+    reportHandler.onMessage(containerReportFromDatanode, publisher);
+
+    Mockito.verify(publisher, Mockito.times(0))
+        .fireEvent(Mockito.any(), Mockito.any());
   }
 
-  private ContainerReportsProto createContainerReport(long[] containerIds) {
+  private static ContainerReportsProto getContainerReportsProto(
+      final ContainerID containerId, final ContainerReplicaProto.State state,
+      final String originNodeId) {
+    return getContainerReportsProto(containerId, state, originNodeId, 100L);
+  }
 
-    ContainerReportsProto.Builder crBuilder =
+  private static ContainerReportsProto getContainerReportsProto(
+      final ContainerID containerId, final ContainerReplicaProto.State state,
+      final String originNodeId, final long bcsid) {
+    final ContainerReportsProto.Builder crBuilder =
         ContainerReportsProto.newBuilder();
-
-    for (long containerId : containerIds) {
-      org.apache.hadoop.hdds.protocol.proto
-          .StorageContainerDatanodeProtocolProtos
-          .ContainerReplicaProto.Builder
-          ciBuilder = org.apache.hadoop.hdds.protocol.proto
-          .StorageContainerDatanodeProtocolProtos
-          .ContainerReplicaProto.newBuilder();
-      ciBuilder.setFinalhash("e16cc9d6024365750ed8dbd194ea46d2")
-          .setSize(5368709120L)
-          .setUsed(2000000000L)
-          .setKeyCount(100000000L)
-          .setReadCount(100000000L)
-          .setWriteCount(100000000L)
-          .setReadBytes(2000000000L)
-          .setWriteBytes(2000000000L)
-          .setContainerID(containerId)
-          .setDeleteTransactionId(0);
-
-      crBuilder.addReports(ciBuilder.build());
-    }
-
-    return crBuilder.build();
+    final ContainerReplicaProto replicaProto =
+        ContainerReplicaProto.newBuilder()
+            .setContainerID(containerId.getId())
+            .setState(state)
+            .setOriginNodeId(originNodeId)
+            .setFinalhash("e16cc9d6024365750ed8dbd194ea46d2")
+            .setSize(5368709120L)
+            .setUsed(2000000000L)
+            .setKeyCount(100000000L)
+            .setReadCount(100000000L)
+            .setWriteCount(100000000L)
+            .setReadBytes(2000000000L)
+            .setWriteBytes(2000000000L)
+            .setBlockCommitSequenceId(bcsid)
+            .setDeleteTransactionId(0)
+            .build();
+    return crBuilder.addReports(replicaProto).build();
   }
 
-  @Override
-  public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void fireEvent(
-      EVENT_TYPE event, PAYLOAD payload) {
-    LOG.info("Event is published: {}", payload);
-    publishedEvents.add(payload);
-  }
 }

+ 113 - 0
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHelper.java

@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container;
+
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
+import org.mockito.Mockito;
+import org.mockito.stubbing.Answer;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Helper methods for testing ContainerReportHandler and
+ * IncrementalContainerReportHandler.
+ */
+public final class TestContainerReportHelper {
+
+  private TestContainerReportHelper() {}
+
+  static void addContainerToContainerManager(
+      final ContainerManager containerManager, final ContainerInfo container,
+      final Set<ContainerReplica> replicas) throws ContainerNotFoundException {
+    Mockito.when(containerManager.getContainer(container.containerID()))
+        .thenReturn(container);
+    Mockito.when(
+        containerManager.getContainerReplicas(container.containerID()))
+        .thenReturn(replicas);
+  }
+
+  static void mockUpdateContainerReplica(
+      final ContainerManager containerManager,
+      final ContainerInfo containerInfo, final Set<ContainerReplica> replicas)
+      throws ContainerNotFoundException {
+    Mockito.doAnswer((Answer<Void>) invocation -> {
+      Object[] args = invocation.getArguments();
+      if (args[0].equals(containerInfo.containerID())) {
+        ContainerReplica replica = (ContainerReplica) args[1];
+        replicas.remove(replica);
+        replicas.add(replica);
+      }
+      return null;
+    }).when(containerManager).updateContainerReplica(
+        Mockito.any(ContainerID.class), Mockito.any(ContainerReplica.class));
+  }
+
+  static void mockUpdateContainerState(
+      final ContainerManager containerManager,
+      final ContainerInfo containerInfo,
+      final LifeCycleEvent event, final LifeCycleState state)
+      throws IOException {
+    Mockito.doAnswer((Answer<LifeCycleState>) invocation -> {
+      containerInfo.setState(state);
+      return containerInfo.getState();
+    }).when(containerManager).updateContainerState(
+        containerInfo.containerID(), event);
+  }
+
+  public static ContainerInfo getContainer(final LifeCycleState state) {
+    return new ContainerInfo.Builder()
+        .setContainerID(RandomUtils.nextLong())
+        .setReplicationType(ReplicationType.RATIS)
+        .setReplicationFactor(ReplicationFactor.THREE)
+        .setState(state)
+        .build();
+  }
+
+  static Set<ContainerReplica> getReplicas(
+      final ContainerID containerId,
+      final ContainerReplicaProto.State state,
+      final DatanodeDetails... datanodeDetails) {
+    return getReplicas(containerId, state, 10000L, datanodeDetails);
+  }
+
+  static Set<ContainerReplica> getReplicas(
+      final ContainerID containerId,
+      final ContainerReplicaProto.State state,
+      final long sequenceId,
+      final DatanodeDetails... datanodeDetails) {
+    Set<ContainerReplica> replicas = new HashSet<>();
+    for (DatanodeDetails datanode : datanodeDetails) {
+      replicas.add(ContainerReplica.newBuilder()
+          .setContainerID(containerId)
+          .setContainerState(state)
+          .setDatanodeDetails(datanode)
+          .setOriginNodeId(datanode.getUuid())
+          .setSequenceId(sequenceId)
+          .build());
+    }
+    return replicas;
+  }
+}

+ 218 - 0
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java

@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto;
+import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
+    .IncrementalContainerReportFromDatanode;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.Set;
+
+import static org.apache.hadoop.hdds.scm.container
+    .TestContainerReportHelper.addContainerToContainerManager;
+import static org.apache.hadoop.hdds.scm.container
+    .TestContainerReportHelper.getContainer;
+import static org.apache.hadoop.hdds.scm.container
+    .TestContainerReportHelper.getReplicas;
+import static org.apache.hadoop.hdds.scm.container
+    .TestContainerReportHelper.mockUpdateContainerReplica;
+import static org.apache.hadoop.hdds.scm.container
+    .TestContainerReportHelper.mockUpdateContainerState;
+
+/**
+ * Test cases to verify the functionality of IncrementalContainerReportHandler.
+ */
+public class TestIncrementalContainerReportHandler {
+
+  @Test
+  public void testClosingToClosed() throws IOException {
+    final ContainerManager containerManager = Mockito.mock(
+        ContainerManager.class);
+    final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
+    final IncrementalContainerReportHandler reportHandler =
+        new IncrementalContainerReportHandler(
+            pipelineManager, containerManager);
+    final ContainerInfo container = getContainer(LifeCycleState.CLOSING);
+    final DatanodeDetails datanodeOne = TestUtils.randomDatanodeDetails();
+    final DatanodeDetails datanodeTwo = TestUtils.randomDatanodeDetails();
+    final DatanodeDetails datanodeThree = TestUtils.randomDatanodeDetails();
+    final Set<ContainerReplica> containerReplicas = getReplicas(
+        container.containerID(),
+        ContainerReplicaProto.State.CLOSING,
+        datanodeOne, datanodeTwo, datanodeThree);
+
+    addContainerToContainerManager(
+        containerManager, container, containerReplicas);
+    mockUpdateContainerReplica(
+        containerManager, container, containerReplicas);
+    mockUpdateContainerState(containerManager, container,
+        LifeCycleEvent.CLOSE, LifeCycleState.CLOSED);
+
+    final IncrementalContainerReportProto containerReport =
+        getIncrementalContainerReportProto(container.containerID(),
+            ContainerReplicaProto.State.CLOSED,
+            datanodeOne.getUuidString());
+    final EventPublisher publisher = Mockito.mock(EventPublisher.class);
+    final IncrementalContainerReportFromDatanode icrFromDatanode =
+        new IncrementalContainerReportFromDatanode(
+            datanodeOne, containerReport);
+    reportHandler.onMessage(icrFromDatanode, publisher);
+    Assert.assertEquals(
+        LifeCycleState.CLOSED, container.getState());
+  }
+
+  @Test
+  public void testClosingToQuasiClosed() throws IOException {
+    final ContainerManager containerManager = Mockito.mock(
+        ContainerManager.class);
+    final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
+    final IncrementalContainerReportHandler reportHandler =
+        new IncrementalContainerReportHandler(
+            pipelineManager, containerManager);
+    final ContainerInfo container = getContainer(LifeCycleState.CLOSING);
+    final DatanodeDetails datanodeOne = TestUtils.randomDatanodeDetails();
+    final DatanodeDetails datanodeTwo = TestUtils.randomDatanodeDetails();
+    final DatanodeDetails datanodeThree = TestUtils.randomDatanodeDetails();
+    final Set<ContainerReplica> containerReplicas = getReplicas(
+        container.containerID(),
+        ContainerReplicaProto.State.CLOSING,
+        datanodeOne, datanodeTwo, datanodeThree);
+
+    addContainerToContainerManager(
+        containerManager, container, containerReplicas);
+    mockUpdateContainerReplica(
+        containerManager, container, containerReplicas);
+    mockUpdateContainerState(containerManager, container,
+        LifeCycleEvent.QUASI_CLOSE, LifeCycleState.QUASI_CLOSED);
+
+    final IncrementalContainerReportProto containerReport =
+        getIncrementalContainerReportProto(container.containerID(),
+            ContainerReplicaProto.State.QUASI_CLOSED,
+            datanodeOne.getUuidString());
+    final EventPublisher publisher = Mockito.mock(EventPublisher.class);
+    final IncrementalContainerReportFromDatanode icrFromDatanode =
+        new IncrementalContainerReportFromDatanode(
+            datanodeOne, containerReport);
+    reportHandler.onMessage(icrFromDatanode, publisher);
+    Assert.assertEquals(
+        LifeCycleState.QUASI_CLOSED, container.getState());
+  }
+
+  @Test
+  public void testQuasiClosedToClosed() throws IOException {
+    final ContainerManager containerManager = Mockito.mock(
+        ContainerManager.class);
+    final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
+    final IncrementalContainerReportHandler reportHandler =
+        new IncrementalContainerReportHandler(
+            pipelineManager, containerManager);
+    final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
+    final DatanodeDetails datanodeOne = TestUtils.randomDatanodeDetails();
+    final DatanodeDetails datanodeTwo = TestUtils.randomDatanodeDetails();
+    final DatanodeDetails datanodeThree = TestUtils.randomDatanodeDetails();
+    final Set<ContainerReplica> containerReplicas = getReplicas(
+        container.containerID(),
+        ContainerReplicaProto.State.CLOSING,
+        datanodeOne, datanodeTwo);
+    containerReplicas.addAll(getReplicas(
+        container.containerID(),
+        ContainerReplicaProto.State.QUASI_CLOSED,
+        datanodeThree));
+
+
+    addContainerToContainerManager(
+        containerManager, container, containerReplicas);
+    mockUpdateContainerReplica(
+        containerManager, container, containerReplicas);
+    mockUpdateContainerState(containerManager, container,
+        LifeCycleEvent.FORCE_CLOSE, LifeCycleState.CLOSED);
+
+    final IncrementalContainerReportProto containerReport =
+        getIncrementalContainerReportProto(container.containerID(),
+            ContainerReplicaProto.State.QUASI_CLOSED,
+            datanodeOne.getUuidString(), 999999L);
+    final EventPublisher publisher = Mockito.mock(EventPublisher.class);
+    final IncrementalContainerReportFromDatanode icrFromDatanode =
+        new IncrementalContainerReportFromDatanode(
+            datanodeOne, containerReport);
+    reportHandler.onMessage(icrFromDatanode, publisher);
+
+    // SCM should issue force close.
+    Mockito.verify(publisher, Mockito.times(1))
+        .fireEvent(Mockito.any(), Mockito.any());
+
+    final IncrementalContainerReportProto containerReportTwo =
+        getIncrementalContainerReportProto(container.containerID(),
+            ContainerReplicaProto.State.CLOSED,
+            datanodeOne.getUuidString(), 999999L);
+    final IncrementalContainerReportFromDatanode icrTwoFromDatanode =
+        new IncrementalContainerReportFromDatanode(
+            datanodeOne, containerReportTwo);
+    reportHandler.onMessage(icrTwoFromDatanode, publisher);
+    Assert.assertEquals(
+        LifeCycleState.CLOSED, container.getState());
+  }
+
+  private static IncrementalContainerReportProto
+      getIncrementalContainerReportProto(
+          final ContainerID containerId,
+          final ContainerReplicaProto.State state,
+          final String originNodeId) {
+    return getIncrementalContainerReportProto(
+        containerId, state, originNodeId, 100L);
+  }
+
+  private static IncrementalContainerReportProto
+      getIncrementalContainerReportProto(
+          final ContainerID containerId,
+          final ContainerReplicaProto.State state,
+          final String originNodeId, final long bcsid) {
+    final IncrementalContainerReportProto.Builder crBuilder =
+        IncrementalContainerReportProto.newBuilder();
+    final ContainerReplicaProto replicaProto =
+        ContainerReplicaProto.newBuilder()
+            .setContainerID(containerId.getId())
+            .setState(state)
+            .setOriginNodeId(originNodeId)
+            .setFinalhash("e16cc9d6024365750ed8dbd194ea46d2")
+            .setSize(5368709120L)
+            .setUsed(2000000000L)
+            .setKeyCount(100000000L)
+            .setReadCount(100000000L)
+            .setWriteCount(100000000L)
+            .setReadBytes(2000000000L)
+            .setWriteBytes(2000000000L)
+            .setBlockCommitSequenceId(bcsid)
+            .setDeleteTransactionId(0)
+            .build();
+    return crBuilder.addReport(replicaProto).build();
+  }
+}