
HDDS-245. Handle ContainerReports in the SCM. Contributed by Elek Marton.

Committed by Xiaoyu Yao (commit f5dbbfe2e9)

+ 6 - 1
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java

@@ -147,7 +147,12 @@ public class EventQueue implements EventPublisher, AutoCloseable {
 
         for (EventHandler handler : executorAndHandlers.getValue()) {
           queuedCount.incrementAndGet();
-
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Delivering event {} to executor/handler {}: {}",
+                event.getName(),
+                executorAndHandlers.getKey().getName(),
+                payload);
+          }
           executorAndHandlers.getKey()
               .onMessage(handler, payload, this);
 
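For context, a minimal usage sketch (not part of this patch; the sample event, handler, and LOG are hypothetical) of the code path the new debug line covers:

    // With DEBUG enabled for EventQueue, delivering "hello" to the handler
    // below is now logged (event name, executor/handler name, payload)
    // before onMessage is invoked on the executor.
    EventQueue queue = new EventQueue();
    TypedEvent<String> sample = new TypedEvent<>(String.class);
    queue.addHandler(sample, (payload, publisher) -> LOG.info("Got {}", payload));
    queue.fireEvent(sample, "hello");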

+ 105 - 3
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java

@@ -18,30 +18,132 @@
 
 package org.apache.hadoop.hdds.scm.container;
 
+import java.io.IOException;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.replication
+    .ReplicationActivityStatus;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap;
+import org.apache.hadoop.hdds.scm.node.states.ReportResult;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
     .ContainerReportFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * Handles container reports from datanode.
  */
 public class ContainerReportHandler implements
     EventHandler<ContainerReportFromDatanode> {
 
-  private final Mapping containerMapping;
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ContainerReportHandler.class);
+
   private final Node2ContainerMap node2ContainerMap;
 
+  private final Mapping containerMapping;
+
+  private ContainerStateManager containerStateManager;
+
+  private ReplicationActivityStatus replicationStatus;
+
+
   public ContainerReportHandler(Mapping containerMapping,
-                                Node2ContainerMap node2ContainerMap) {
+      Node2ContainerMap node2ContainerMap,
+      ReplicationActivityStatus replicationActivityStatus) {
+    Preconditions.checkNotNull(containerMapping);
+    Preconditions.checkNotNull(node2ContainerMap);
+    Preconditions.checkNotNull(replicationActivityStatus);
     this.containerMapping = containerMapping;
     this.node2ContainerMap = node2ContainerMap;
+    this.containerStateManager = containerMapping.getStateManager();
+    this.replicationStatus = replicationActivityStatus;
   }
 
   @Override
   public void onMessage(ContainerReportFromDatanode containerReportFromDatanode,
                         EventPublisher publisher) {
-    // TODO: process container report.
+
+    DatanodeDetails datanodeOrigin =
+        containerReportFromDatanode.getDatanodeDetails();
+
+    ContainerReportsProto containerReport =
+        containerReportFromDatanode.getReport();
+    try {
+
+      //update state in container db and trigger close container events
+      containerMapping.processContainerReports(datanodeOrigin, containerReport);
+
+      Set<ContainerID> containerIds = containerReport.getReportsList().stream()
+          .map(containerProto -> containerProto.getContainerID())
+          .map(ContainerID::new)
+          .collect(Collectors.toSet());
+
+      ReportResult reportResult = node2ContainerMap
+          .processReport(datanodeOrigin.getUuid(), containerIds);
+
+      //we have the report, so we can update the states for the next iteration.
+      node2ContainerMap
+          .setContainersForDatanode(datanodeOrigin.getUuid(), containerIds);
+
+      for (ContainerID containerID : reportResult.getMissingContainers()) {
+        containerStateManager
+            .removeContainerReplica(containerID, datanodeOrigin);
+        emitReplicationRequestEvent(containerID, publisher);
+      }
+
+      for (ContainerID containerID : reportResult.getNewContainers()) {
+        containerStateManager.addContainerReplica(containerID, datanodeOrigin);
+
+        emitReplicationRequestEvent(containerID, publisher);
+      }
+
+    } catch (IOException e) {
+      //TODO: stop all the replication?
+      LOG.error("Error on processing container report from datanode {}",
+          datanodeOrigin, e);
+    }
+
+  }
+
+  private void emitReplicationRequestEvent(ContainerID containerID,
+      EventPublisher publisher) throws SCMException {
+    ContainerInfo container = containerStateManager.getContainer(containerID);
+
+    if (container == null) {
+      //unknown container: skip the request rather than NPE below
+      LOG.warn(
+          "Container is missing from containerStateManager. Can't request "
+              + "replication. {}",
+          containerID);
+      return;
+    }
+    if (replicationStatus.isReplicationEnabled()) {
+
+      int existingReplicas =
+          containerStateManager.getContainerReplicas(containerID).size();
+
+      int expectedReplicas = container.getReplicationFactor().getNumber();
+
+      if (existingReplicas != expectedReplicas) {
+
+        publisher.fireEvent(SCMEvents.REPLICATE_CONTAINER,
+            new ReplicationRequest(containerID.getId(), existingReplicas,
+                expectedReplicas));
+      }
+    }
+
   }
 }
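To see the handler in action, a sketch (eventQueue, datanodeDetails, and containerReportsProto are hypothetical locals) of the event that drives onMessage once the handler is registered for SCMEvents.CONTAINER_REPORT, as done in StorageContainerManager below:

    // A datanode's report arrives as a CONTAINER_REPORT event; the handler
    // updates the container DB and the Node2ContainerMap, and fires
    // REPLICATE_CONTAINER when a replica count differs from the factor.
    eventQueue.fireEvent(SCMEvents.CONTAINER_REPORT,
        new ContainerReportFromDatanode(datanodeDetails, containerReportsProto));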

+ 86 - 0
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationActivityStatus.java

@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import javax.management.ObjectName;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.metrics2.util.MBeans;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Event listener to track the current state of replication.
+ */
+public class ReplicationActivityStatus
+    implements EventHandler<Boolean>, ReplicationActivityStatusMXBean,
+    Closeable {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ReplicationActivityStatus.class);
+
+  private AtomicBoolean replicationEnabled = new AtomicBoolean();
+
+  private ObjectName jmxObjectName;
+
+  public boolean isReplicationEnabled() {
+    return replicationEnabled.get();
+  }
+
+  @VisibleForTesting
+  public void setReplicationEnabled(boolean enabled) {
+    replicationEnabled.set(enabled);
+  }
+
+  @VisibleForTesting
+  public void enableReplication() {
+    replicationEnabled.set(true);
+  }
+
+  /**
+   * The replication status could be set by async events.
+   */
+  @Override
+  public void onMessage(Boolean enabled, EventPublisher publisher) {
+    replicationEnabled.set(enabled);
+  }
+
+  public void start() {
+    try {
+      this.jmxObjectName =
+          MBeans.register(
+              "StorageContainerManager", "ReplicationActivityStatus", this);
+    } catch (Exception ex) {
+      LOG.error("JMX bean for ReplicationActivityStatus can't be registered",
+          ex);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (this.jmxObjectName != null) {
+      MBeans.unregister(jmxObjectName);
+    }
+  }
+}

+ 28 - 0
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationActivityStatusMXBean.java

@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+/**
+ * JMX interface to monitor replication status.
+ */
+public interface ReplicationActivityStatusMXBean {
+
+  boolean isReplicationEnabled();
+
+  void setReplicationEnabled(boolean enabled);
+}
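A sketch of reading the flag from a JMX client (this assumes the standard Hadoop convention of registering under the "Hadoop" domain with the service/name passed to MBeans.register in ReplicationActivityStatus.start() above; imports from javax.management and java.lang.management plus exception handling are elided):

    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
    ObjectName name = new ObjectName(
        "Hadoop:service=StorageContainerManager,name=ReplicationActivityStatus");
    // The MXBean getter isReplicationEnabled() is exposed as the
    // "ReplicationEnabled" attribute.
    boolean enabled = (boolean) mbs.getAttribute(name, "ReplicationEnabled");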

+ 22 - 6
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationRequest.java

@@ -29,18 +29,24 @@ import org.apache.commons.lang3.builder.HashCodeBuilder;
 public class ReplicationRequest implements Comparable<ReplicationRequest>,
     Serializable {
   private final long containerId;
-  private final short replicationCount;
-  private final short expecReplicationCount;
+  private final int replicationCount;
+  private final int expecReplicationCount;
   private final long timestamp;
 
-  public ReplicationRequest(long containerId, short replicationCount,
-      long timestamp, short expecReplicationCount) {
+  public ReplicationRequest(long containerId, int replicationCount,
+      long timestamp, int expecReplicationCount) {
     this.containerId = containerId;
     this.replicationCount = replicationCount;
     this.timestamp = timestamp;
     this.expecReplicationCount = expecReplicationCount;
   }
 
+  public ReplicationRequest(long containerId, int replicationCount,
+      int expecReplicationCount) {
+    this(containerId, replicationCount, System.currentTimeMillis(),
+        expecReplicationCount);
+  }
+
   /**
    * Compares this object with the specified object for order.  Returns a
    * negative integer, zero, or a positive integer as this object is less
@@ -93,7 +99,7 @@ public class ReplicationRequest implements Comparable<ReplicationRequest>,
     return containerId;
   }
 
-  public short getReplicationCount() {
+  public int getReplicationCount() {
     return replicationCount;
   }
 
@@ -101,7 +107,17 @@ public class ReplicationRequest implements Comparable<ReplicationRequest>,
     return timestamp;
   }
 
-  public short getExpecReplicationCount() {
+  public int getExpecReplicationCount() {
     return expecReplicationCount;
   }
+
+  @Override
+  public String toString() {
+    return "ReplicationRequest{" +
+        "containerId=" + containerId +
+        ", replicationCount=" + replicationCount +
+        ", expecReplicationCount=" + expecReplicationCount +
+        ", timestamp=" + timestamp +
+        '}';
+  }
 }
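The new three-argument constructor stamps the request with the current time; a quick sketch of the semantics as used by ContainerReportHandler above (container id 42 is made up):

    // Container 42 has 2 live replicas against an expected factor of 3,
    // i.e. it is under-replicated; the timestamp is filled in automatically.
    ReplicationRequest req = new ReplicationRequest(42L, 2, 3);
    assert req.getReplicationCount() == 2;
    assert req.getExpecReplicationCount() == 3;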

+ 9 - 0
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java

@@ -173,6 +173,15 @@ public final class SCMEvents {
   public static final TypedEvent<ReplicationCompleted> REPLICATION_COMPLETE =
       new TypedEvent<>(ReplicationCompleted.class);
 
+  /**
+   * Signal for all the components (but especially for the replication
+   * manager and the container report handler) that replication may be
+   * started. Should be sent only once (almost) all of the container
+   * states are available from the datanodes.
+   */
+  public static final TypedEvent<Boolean> START_REPLICATION =
+      new TypedEvent<>(Boolean.class);
+
   /**
    * Private Ctor. Never Constructed.
    */
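A sketch of how the new signal is meant to be fired (eventQueue is a hypothetical reference; ReplicationActivityStatus is registered as the handler in StorageContainerManager below):

    // Once (almost) all container reports are in, flip the cluster-wide
    // replication flag; subsequent container reports may then produce
    // REPLICATE_CONTAINER events.
    eventQueue.fireEvent(SCMEvents.START_REPLICATION, Boolean.TRUE);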

+ 7 - 3
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java

@@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
@@ -68,7 +69,8 @@ public class Node2ContainerMap {
       throws SCMException {
     Preconditions.checkNotNull(containerIDs);
     Preconditions.checkNotNull(datanodeID);
-    if(dn2ContainerMap.putIfAbsent(datanodeID, containerIDs) != null) {
+    if (dn2ContainerMap.putIfAbsent(datanodeID, new HashSet<>(containerIDs))
+        != null) {
       throw new SCMException("Node already exists in the map",
                   DUPLICATE_DATANODE);
     }
@@ -82,11 +84,13 @@ public class Node2ContainerMap {
    * @throws SCMException - if we don't know about this datanode, for new DN
    *                      use insertNewDatanode.
    */
-  public void updateDatanodeMap(UUID datanodeID, Set<ContainerID> containers)
+  public void setContainersForDatanode(UUID datanodeID, Set<ContainerID> containers)
       throws SCMException {
     Preconditions.checkNotNull(datanodeID);
     Preconditions.checkNotNull(containers);
-    if(dn2ContainerMap.computeIfPresent(datanodeID, (k, v) -> v) == null){
+    if (dn2ContainerMap
+        .computeIfPresent(datanodeID, (k, v) -> new HashSet<>(containers))
+        == null) {
       throw new SCMException("No such datanode", NO_SUCH_DATANODE);
     }
   }
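Besides the rename, both methods now store a defensive HashSet copy. A sketch of the behaviour this guarantees (datanodeId is a hypothetical UUID), matching the new testUpdateDatanodeMap below:

    // The map keeps its own copy, so mutating the caller's set afterwards
    // does not change the recorded container list for the node.
    Node2ContainerMap map = new Node2ContainerMap();
    Set<ContainerID> ids = new HashSet<>();
    ids.add(new ContainerID(1L));
    map.insertNewDatanode(datanodeId, ids);
    ids.clear();
    // map.getContainers(datanodeId) still contains container #1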

+ 16 - 2
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/ReportResult.java

@@ -21,10 +21,13 @@ package org.apache.hadoop.hdds.scm.node.states;
 
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 
+import java.util.Collections;
 import java.util.Set;
 
+import com.google.common.base.Preconditions;
+
 /**
- * A Container Report gets processsed by the Node2Container and returns the
+ * A container report gets processed by the Node2ContainerMap and returns a
  * Report Result class.
  */
 public class ReportResult {
@@ -36,6 +39,8 @@ public class ReportResult {
       Set<ContainerID> missingContainers,
       Set<ContainerID> newContainers) {
     this.status = status;
+    Preconditions.checkNotNull(missingContainers);
+    Preconditions.checkNotNull(newContainers);
     this.missingContainers = missingContainers;
     this.newContainers = newContainers;
   }
@@ -80,7 +85,16 @@ public class ReportResult {
     }
 
     ReportResult build() {
-      return new ReportResult(status, missingContainers, newContainers);
+
+      Set<ContainerID> nullSafeMissingContainers = this.missingContainers;
+      Set<ContainerID> nullSafeNewContainers = this.newContainers;
+      if (nullSafeNewContainers == null) {
+        nullSafeNewContainers = Collections.emptySet();
+      }
+      if (nullSafeMissingContainers == null) {
+        nullSafeMissingContainers = Collections.emptySet();
+      }
+      return new ReportResult(status, nullSafeMissingContainers, nullSafeNewContainers);
     }
   }
 }

+ 25 - 2
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java

@@ -40,6 +40,8 @@ import org.apache.hadoop.hdds.scm.container.ContainerActionsHandler;
 import org.apache.hadoop.hdds.scm.container.ContainerMapping;
 import org.apache.hadoop.hdds.scm.container.ContainerReportHandler;
 import org.apache.hadoop.hdds.scm.container.Mapping;
+import org.apache.hadoop.hdds.scm.container.replication
+    .ReplicationActivityStatus;
 import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms
@@ -164,9 +166,13 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
    * Key = DatanodeUuid, value = ContainerStat.
    */
   private Cache<String, ContainerStat> containerReportCache;
+
   private final ReplicationManager replicationManager;
+
   private final LeaseManager<Long> commandWatcherLeaseManager;
 
+  private final ReplicationActivityStatus replicationStatus;
+
   /**
    * Creates a new StorageContainerManager. Configuration will be updated
    * with information on the
@@ -199,19 +205,26 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
 
     Node2ContainerMap node2ContainerMap = new Node2ContainerMap();
 
+    replicationStatus = new ReplicationActivityStatus();
+
     CloseContainerEventHandler closeContainerHandler =
         new CloseContainerEventHandler(scmContainerManager);
     NodeReportHandler nodeReportHandler =
         new NodeReportHandler(scmNodeManager);
-    ContainerReportHandler containerReportHandler =
-        new ContainerReportHandler(scmContainerManager, node2ContainerMap);
+
     CommandStatusReportHandler cmdStatusReportHandler =
         new CommandStatusReportHandler();
+
     NewNodeHandler newNodeHandler = new NewNodeHandler(node2ContainerMap);
     StaleNodeHandler staleNodeHandler = new StaleNodeHandler(node2ContainerMap);
     DeadNodeHandler deadNodeHandler = new DeadNodeHandler(node2ContainerMap);
     ContainerActionsHandler actionsHandler = new ContainerActionsHandler();
 
+    ContainerReportHandler containerReportHandler =
+        new ContainerReportHandler(scmContainerManager, node2ContainerMap,
+            replicationStatus);
+
+
     eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, scmNodeManager);
     eventQueue.addHandler(SCMEvents.NODE_REPORT, nodeReportHandler);
     eventQueue.addHandler(SCMEvents.CONTAINER_REPORT, containerReportHandler);
@@ -221,6 +234,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
     eventQueue.addHandler(SCMEvents.STALE_NODE, staleNodeHandler);
     eventQueue.addHandler(SCMEvents.DEAD_NODE, deadNodeHandler);
     eventQueue.addHandler(SCMEvents.CMD_STATUS_REPORT, cmdStatusReportHandler);
+    eventQueue.addHandler(SCMEvents.START_REPLICATION, replicationStatus);
 
     long watcherTimeout =
         conf.getTimeDuration(ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT,
@@ -580,6 +594,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
         "server", getDatanodeProtocolServer().getDatanodeRpcAddress()));
     getDatanodeProtocolServer().start();
 
+    replicationStatus.start();
     httpServer.start();
     scmBlockManager.start();
     replicationManager.start();
@@ -591,6 +606,14 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
    */
   public void stop() {
 
+    try {
+      LOG.info("Stopping Replication Activity Status tracker.");
+      replicationStatus.close();
+    } catch (Exception ex) {
+      LOG.error("Replication Activity Status tracker stop failed.", ex);
+    }
+
+
     try {
       LOG.info("Stopping Replication Manager Service.");
       replicationManager.stop();

+ 228 - 0
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java

@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo
+    .Builder;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.container.replication
+    .ReplicationActivityStatus;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
+import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap;
+import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
+    .ContainerReportFromDatanode;
+import org.apache.hadoop.hdds.server.events.Event;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import static org.mockito.Matchers.anyLong;
+import org.mockito.Mockito;
+import static org.mockito.Mockito.when;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test the behaviour of the ContainerReportHandler.
+ */
+public class TestContainerReportHandler implements EventPublisher {
+
+  private List<Object> publishedEvents = new ArrayList<>();
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestContainerReportHandler.class);
+
+  @Before
+  public void resetEventCollector() {
+    publishedEvents.clear();
+  }
+
+  @Test
+  public void test() throws IOException {
+
+    //given
+
+    OzoneConfiguration conf = new OzoneConfiguration();
+    Node2ContainerMap node2ContainerMap = new Node2ContainerMap();
+    Mapping mapping = Mockito.mock(Mapping.class);
+
+    when(mapping.getContainer(anyLong()))
+        .thenAnswer(
+            (Answer<ContainerInfo>) invocation ->
+                new Builder()
+                    .setReplicationFactor(ReplicationFactor.THREE)
+                    .setContainerID((Long) invocation.getArguments()[0])
+                    .build()
+        );
+
+    ContainerStateManager containerStateManager =
+        new ContainerStateManager(conf, mapping);
+
+    when(mapping.getStateManager()).thenReturn(containerStateManager);
+
+    ReplicationActivityStatus replicationActivityStatus =
+        new ReplicationActivityStatus();
+
+    ContainerReportHandler reportHandler =
+        new ContainerReportHandler(mapping, node2ContainerMap,
+            replicationActivityStatus);
+
+    DatanodeDetails dn1 = TestUtils.randomDatanodeDetails();
+    DatanodeDetails dn2 = TestUtils.randomDatanodeDetails();
+    DatanodeDetails dn3 = TestUtils.randomDatanodeDetails();
+    DatanodeDetails dn4 = TestUtils.randomDatanodeDetails();
+    node2ContainerMap.insertNewDatanode(dn1.getUuid(), new HashSet<>());
+    node2ContainerMap.insertNewDatanode(dn2.getUuid(), new HashSet<>());
+    node2ContainerMap.insertNewDatanode(dn3.getUuid(), new HashSet<>());
+    node2ContainerMap.insertNewDatanode(dn4.getUuid(), new HashSet<>());
+    PipelineSelector pipelineSelector = Mockito.mock(PipelineSelector.class);
+
+    Pipeline pipeline = new Pipeline("leader", LifeCycleState.CLOSED,
+        ReplicationType.STAND_ALONE, ReplicationFactor.THREE, "pipeline1");
+
+    when(pipelineSelector.getReplicationPipeline(ReplicationType.STAND_ALONE,
+        ReplicationFactor.THREE)).thenReturn(pipeline);
+
+    long c1 = containerStateManager
+        .allocateContainer(pipelineSelector, ReplicationType.STAND_ALONE,
+            ReplicationFactor.THREE, "root").getContainerInfo()
+        .getContainerID();
+
+    long c2 = containerStateManager
+        .allocateContainer(pipelineSelector, ReplicationType.STAND_ALONE,
+            ReplicationFactor.THREE, "root").getContainerInfo()
+        .getContainerID();
+
+    //when
+
+    //initial reports before replication is enabled. 2 containers with 3 replicas.
+    reportHandler.onMessage(
+        new ContainerReportFromDatanode(dn1,
+            createContainerReport(new long[] {c1, c2})), this);
+
+    reportHandler.onMessage(
+        new ContainerReportFromDatanode(dn2,
+            createContainerReport(new long[] {c1, c2})), this);
+
+    reportHandler.onMessage(
+        new ContainerReportFromDatanode(dn3,
+            createContainerReport(new long[] {c1, c2})), this);
+
+    reportHandler.onMessage(
+        new ContainerReportFromDatanode(dn4,
+            createContainerReport(new long[] {})), this);
+
+    Assert.assertEquals(0, publishedEvents.size());
+
+    replicationActivityStatus.enableReplication();
+
+    //no problem here
+    reportHandler.onMessage(
+        new ContainerReportFromDatanode(dn1,
+            createContainerReport(new long[] {c1, c2})), this);
+
+    Assert.assertEquals(0, publishedEvents.size());
+
+    //container c2 is missing from dn2's report
+    reportHandler.onMessage(
+        new ContainerReportFromDatanode(dn2,
+            createContainerReport(new long[] {c1})), this);
+
+    Assert.assertEquals(1, publishedEvents.size());
+    ReplicationRequest replicationRequest =
+        (ReplicationRequest) publishedEvents.get(0);
+
+    Assert.assertEquals(c2, replicationRequest.getContainerId());
+    Assert.assertEquals(3, replicationRequest.getExpecReplicationCount());
+    Assert.assertEquals(2, replicationRequest.getReplicationCount());
+
+    //container was replicated to dn4
+    reportHandler.onMessage(
+        new ContainerReportFromDatanode(dn4,
+            createContainerReport(new long[] {c2})), this);
+
+    //no more event, everything is perfect
+    Assert.assertEquals(1, publishedEvents.size());
+
+    //c2 was found at dn2 (it was missing before, magic)
+    reportHandler.onMessage(
+        new ContainerReportFromDatanode(dn2,
+            createContainerReport(new long[] {c1, c2})), this);
+
+    //c2 is over replicated (dn1,dn2,dn3,dn4)
+    Assert.assertEquals(2, publishedEvents.size());
+
+    replicationRequest =
+        (ReplicationRequest) publishedEvents.get(1);
+
+    Assert.assertEquals(c2, replicationRequest.getContainerId());
+    Assert.assertEquals(3, replicationRequest.getExpecReplicationCount());
+    Assert.assertEquals(4, replicationRequest.getReplicationCount());
+
+  }
+
+  private ContainerReportsProto createContainerReport(long[] containerIds) {
+
+    ContainerReportsProto.Builder crBuilder =
+        ContainerReportsProto.newBuilder();
+
+    for (long containerId : containerIds) {
+      org.apache.hadoop.hdds.protocol.proto
+          .StorageContainerDatanodeProtocolProtos.ContainerInfo.Builder
+          ciBuilder = org.apache.hadoop.hdds.protocol.proto
+          .StorageContainerDatanodeProtocolProtos.ContainerInfo.newBuilder();
+      ciBuilder.setFinalhash("e16cc9d6024365750ed8dbd194ea46d2")
+          .setSize(5368709120L)
+          .setUsed(2000000000L)
+          .setKeyCount(100000000L)
+          .setReadCount(100000000L)
+          .setWriteCount(100000000L)
+          .setReadBytes(2000000000L)
+          .setWriteBytes(2000000000L)
+          .setContainerID(containerId)
+          .setDeleteTransactionId(0);
+
+      crBuilder.addReports(ciBuilder.build());
+    }
+
+    return crBuilder.build();
+  }
+
+  @Override
+  public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void fireEvent(
+      EVENT_TYPE event, PAYLOAD payload) {
+    LOG.info("Event is published: {}", payload);
+    publishedEvents.add(payload);
+  }
+}

+ 24 - 4
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMapTest.java → hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNode2ContainerMap.java

@@ -38,7 +38,7 @@ import java.util.concurrent.ConcurrentHashMap;
 /**
  * Test classes for Node2ContainerMap.
  */
-public class Node2ContainerMapTest {
+public class TestNode2ContainerMap {
   private final static int DATANODE_COUNT = 300;
   private final static int CONTAINER_COUNT = 1000;
   private final Map<UUID, TreeSet<ContainerID>> testData = new
@@ -119,6 +119,26 @@ public class Node2ContainerMapTest {
         Node2ContainerMap.ReportStatus.ALL_IS_WELL);
   }
 
+  @Test
+  public void testUpdateDatanodeMap() throws SCMException {
+    UUID datanodeId = getFirstKey();
+    Set<ContainerID> values = testData.get(datanodeId);
+    Node2ContainerMap map = new Node2ContainerMap();
+    map.insertNewDatanode(datanodeId, values);
+    Assert.assertTrue(map.isKnownDatanode(datanodeId));
+    Assert.assertEquals(CONTAINER_COUNT, map.getContainers(datanodeId).size());
+
+    //remove one container
+    values.remove(values.iterator().next());
+    Assert.assertEquals(CONTAINER_COUNT - 1, values.size());
+    Assert.assertEquals(CONTAINER_COUNT, map.getContainers(datanodeId).size());
+
+    map.setContainersForDatanode(datanodeId, values);
+
+    Assert.assertEquals(values.size(), map.getContainers(datanodeId).size());
+    Assert.assertEquals(values, map.getContainers(datanodeId));
+  }
+
   @Test
   public void testProcessReportInsertAll() throws SCMException {
     Node2ContainerMap map = new Node2ContainerMap();
@@ -183,7 +203,7 @@ public class Node2ContainerMapTest {
 
     final int newCount = 100;
     // This is not a mistake, the treeset seems to be reverse sorted.
-    ContainerID last = values.pollFirst();
+    ContainerID last = values.first();
     TreeSet<ContainerID> addedContainers = new TreeSet<>();
     for (int x = 1; x <= newCount; x++) {
       long cTemp = last.getId() + x;
@@ -224,7 +244,7 @@ public class Node2ContainerMapTest {
     final int removeCount = 100;
     Random r = new Random();
 
-    ContainerID first = values.pollLast();
+    ContainerID first = values.last();
     TreeSet<ContainerID> removedContainers = new TreeSet<>();
 
     // Pick a random container to remove it is ok to collide no issues.
@@ -270,7 +290,7 @@ public class Node2ContainerMapTest {
     final int removeCount = 100;
     Random r = new Random();
 
-    ContainerID first = values.pollLast();
+    ContainerID first = values.last();
     TreeSet<ContainerID> removedContainers = new TreeSet<>();
 
     // Pick a random container to remove it is ok to collide no issues.