
HDFS-13070. Ozone: SCM: Support for container replica reconciliation - 1. Contributed by Nanda kumar.

Nanda kumar 7 years ago
parent
commit
74484754ac
12 changed files with 200 additions and 115 deletions
  1. + 6 - 0    hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
  2. + 1 - 3    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
  3. + 15 - 12  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java
  4. + 3 - 11   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/Mapping.java
  5. + 74 - 49  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/ContainerSupervisor.java
  6. + 26 - 14  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/InProgressPool.java
  7. + 6 - 0    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java
  8. + 5 - 0    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java
  9. + 18 - 2   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/ReplicationNodeManagerMock.java
  10. + 25 - 20 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerSupervisor.java
  11. + 7 - 0   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/MockNodeManager.java
  12. + 14 - 4  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/TestContainerMapping.java

+ 6 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java

@@ -223,6 +223,12 @@ public final class ScmConfigKeys {
   public static final String
       OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL_DEFAULT = "60s";
 
+  /**
+   * This determines the total number of pools to be processed in parallel.
+   */
+  public static final String OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS =
+      "ozone.scm.max.nodepool.processing.threads";
+  public static final int OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS_DEFAULT = 1;
   /**
    * These 2 settings control the number of threads in executor pool and time
    * outs for the container reports from all nodes.

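For reference, a minimal sketch of how the new limit is read (a standalone snippet, not part of this patch; the class name is illustrative). The default of 1 keeps node-pool reconciliation serialized; operators can raise ozone.scm.max.nodepool.processing.threads to let more pools be processed in parallel.

    // Illustrative only: reads the new key the same way ContainerSupervisor does.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.OzoneConfiguration;
    import org.apache.hadoop.scm.ScmConfigKeys;

    public class NodePoolProcessingConfigSketch {
      public static void main(String[] args) {
        Configuration conf = new OzoneConfiguration();
        int inProgressPoolMaxCount = conf.getInt(
            ScmConfigKeys.OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS,
            ScmConfigKeys.OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS_DEFAULT);
        System.out.println("Node pools processed in parallel: "
            + inProgressPoolMaxCount);
      }
    }
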
+ 1 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java

@@ -984,9 +984,7 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
     updateContainerReportMetrics(reports);
 
     // should we process container reports async?
-    scmContainerManager.processContainerReports(
-        DatanodeID.getFromProtoBuf(reports.getDatanodeID()),
-        reports.getType(), reports.getReportsList());
+    scmContainerManager.processContainerReports(reports);
     return ContainerReportsResponseProto.newBuilder().build();
   }
 

+ 15 - 12
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java

@@ -20,7 +20,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.lease.Lease;
 import org.apache.hadoop.ozone.lease.LeaseException;
@@ -29,7 +28,9 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos;
-import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
+import org.apache.hadoop.ozone.scm.container.replication.ContainerSupervisor;
 import org.apache.hadoop.ozone.scm.exceptions.SCMException;
 import org.apache.hadoop.ozone.scm.node.NodeManager;
 import org.apache.hadoop.ozone.scm.pipelines.PipelineSelector;
@@ -74,6 +75,7 @@ public class ContainerMapping implements Mapping {
   private final PipelineSelector pipelineSelector;
   private final ContainerStateManager containerStateManager;
   private final LeaseManager<ContainerInfo> containerLeaseManager;
+  private final ContainerSupervisor containerSupervisor;
   private final float containerCloseThreshold;
 
   /**
@@ -113,6 +115,9 @@ public class ContainerMapping implements Mapping {
     this.pipelineSelector = new PipelineSelector(nodeManager, conf);
     this.containerStateManager =
         new ContainerStateManager(conf, this);
+    this.containerSupervisor =
+        new ContainerSupervisor(conf, nodeManager,
+            nodeManager.getNodePoolManager());
     this.containerCloseThreshold = conf.getFloat(
         ScmConfigKeys.OZONE_SCM_CONTAINER_CLOSE_THRESHOLD,
         ScmConfigKeys.OZONE_SCM_CONTAINER_CLOSE_THRESHOLD_DEFAULT);
@@ -347,16 +352,14 @@ public class ContainerMapping implements Mapping {
   /**
    * Process container report from Datanode.
    *
-   * @param datanodeID Datanode ID
-   * @param reportType Type of report
-   * @param containerInfos container details
+   * @param reports Container report
    */
   @Override
-  public void processContainerReports(
-      DatanodeID datanodeID,
-      ContainerReportsRequestProto.reportType reportType,
-      List<StorageContainerDatanodeProtocolProtos.ContainerInfo>
-          containerInfos) throws IOException {
+  public void processContainerReports(ContainerReportsRequestProto reports)
+      throws IOException {
+    List<StorageContainerDatanodeProtocolProtos.ContainerInfo>
+        containerInfos = reports.getReportsList();
+    containerSupervisor.handleContainerReport(reports);
     for (StorageContainerDatanodeProtocolProtos.ContainerInfo containerInfo :
         containerInfos) {
       byte[] dbKey = containerInfo.getContainerNameBytes().toByteArray();
@@ -395,7 +398,7 @@ public class ContainerMapping implements Mapping {
           // TODO: Handling of containers which are already in close queue.
           if (containerUsedPercentage >= containerCloseThreshold) {
             // TODO: The container has to be moved to close container queue.
-            // For now, we are just updating the container state to CLOSED.
+            // For now, we are just updating the container state to CLOSING.
             // Close container implementation can decide on how to maintain
             // list of containers to be closed, this is the place where we
             // have to add the containers to that list.
@@ -412,7 +415,7 @@ public class ContainerMapping implements Mapping {
           // Container not found in our container db.
           LOG.error("Error while processing container report from datanode :" +
               " {}, for container: {}, reason: container doesn't exist in" +
-              "container database.", datanodeID,
+              "container database.", reports.getDatanodeID(),
               containerInfo.getContainerName());
         }
       } finally {

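The entry point now takes the whole ContainerReportsRequestProto: the datanode ID and the per-container list are unpacked inside, and the request is first forwarded to the new ContainerSupervisor. A caller-side sketch (the helper method and its name are illustrative; the builder calls mirror the updated TestContainerMapping below):

    // Illustrative helper, not part of the patch: packaging a datanode's report
    // for the new single-argument processContainerReports(...) call.
    static void submitContainerReport(Mapping mapping, DatanodeID datanodeID,
        ContainerReportsRequestProto.reportType reportType,
        List<StorageContainerDatanodeProtocolProtos.ContainerInfo> reports)
        throws IOException {
      ContainerReportsRequestProto request =
          ContainerReportsRequestProto.newBuilder()
              .setDatanodeID(datanodeID.getProtoBufMessage())
              .setType(reportType)
              .addAllReports(reports)
              .build();
      // ContainerMapping hands the full request to ContainerSupervisor first,
      // then walks the report list to update its own container database.
      mapping.processContainerReports(request);
    }
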
+ 3 - 11
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/Mapping.java

@@ -17,10 +17,7 @@
 package org.apache.hadoop.ozone.scm.container;
 
 
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
-import org.apache.hadoop.ozone.protocol.proto
-    .StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
 import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
@@ -102,14 +99,9 @@ public interface Mapping extends Closeable {
   /**
    * Process container report from Datanode.
    *
-   * @param datanodeID Datanode ID
-   * @param reportType Type of report
-   * @param containerInfos container details
+   * @param reports Container report
    */
-  void processContainerReports(
-      DatanodeID datanodeID,
-      ContainerReportsRequestProto.reportType reportType,
-      List<StorageContainerDatanodeProtocolProtos.ContainerInfo>
-          containerInfos) throws IOException;
+  void processContainerReports(ContainerReportsRequestProto reports)
+      throws IOException;
 
 }

+ 74 - 49
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/ContainerReplicationManager.java → hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/ContainerSupervisor.java

@@ -19,12 +19,11 @@ package org.apache.hadoop.ozone.scm.container.replication;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
 import org.apache.hadoop.ozone.scm.exceptions.SCMException;
-import org.apache.hadoop.ozone.scm.node.CommandQueue;
 import org.apache.hadoop.ozone.scm.node.NodeManager;
 import org.apache.hadoop.ozone.scm.node.NodePoolManager;
 import org.apache.hadoop.util.Time;
@@ -43,6 +42,8 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import static com.google.common.util.concurrent.Uninterruptibles
     .sleepUninterruptibly;
@@ -58,17 +59,20 @@ import static org.apache.hadoop.scm.ScmConfigKeys
     .OZONE_SCM_MAX_CONTAINER_REPORT_THREADS;
 import static org.apache.hadoop.scm.ScmConfigKeys
     .OZONE_SCM_MAX_CONTAINER_REPORT_THREADS_DEFAULT;
+import static org.apache.hadoop.scm.ScmConfigKeys
+    .OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS;
+import static org.apache.hadoop.scm.ScmConfigKeys
+    .OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS_DEFAULT;
 
 /**
  * This class takes a set of container reports that belong to a pool and then
  * computes the replication levels for each container.
  */
-public class ContainerReplicationManager implements Closeable {
+public class ContainerSupervisor implements Closeable {
   public static final Logger LOG =
-      LoggerFactory.getLogger(ContainerReplicationManager.class);
+      LoggerFactory.getLogger(ContainerSupervisor.class);
 
   private final NodePoolManager poolManager;
-  private final CommandQueue commandQueue;
   private final HashSet<String> poolNames;
   private final PriorityQueue<PeriodicPool> poolQueue;
   private final NodeManager nodeManager;
@@ -79,6 +83,9 @@ public class ContainerReplicationManager implements Closeable {
   private long poolProcessCount;
   private final List<InProgressPool> inProgressPoolList;
   private final AtomicInteger threadFaultCount;
+  private final int inProgressPoolMaxCount;
+
+  private final ReadWriteLock inProgressPoolListLock;
 
   /**
    * Returns the number of times we have processed pools.
@@ -95,13 +102,10 @@ public class ContainerReplicationManager implements Closeable {
    * @param conf - OzoneConfiguration
    * @param nodeManager - Node Manager
    * @param poolManager - Pool Manager
-   * @param commandQueue - Datanodes Command Queue.
    */
-  public ContainerReplicationManager(OzoneConfiguration conf,
-      NodeManager nodeManager, NodePoolManager poolManager,
-      CommandQueue commandQueue) {
+  public ContainerSupervisor(Configuration conf, NodeManager nodeManager,
+                             NodePoolManager poolManager) {
     Preconditions.checkNotNull(poolManager);
-    Preconditions.checkNotNull(commandQueue);
     Preconditions.checkNotNull(nodeManager);
     this.containerProcessingLag =
         conf.getTimeDuration(OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL,
@@ -116,18 +120,21 @@ public class ContainerReplicationManager implements Closeable {
         conf.getTimeDuration(OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT,
             OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_DEFAULT,
             TimeUnit.MILLISECONDS);
+    this.inProgressPoolMaxCount = conf.getInt(
+        OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS,
+        OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS_DEFAULT);
     this.poolManager = poolManager;
-    this.commandQueue = commandQueue;
     this.nodeManager = nodeManager;
     this.poolNames = new HashSet<>();
     this.poolQueue = new PriorityQueue<>();
-    runnable = new AtomicBoolean(true);
+    this.runnable = new AtomicBoolean(true);
     this.threadFaultCount = new AtomicInteger(0);
-    executorService = HadoopExecutors.newCachedThreadPool(
+    this.executorService = HadoopExecutors.newCachedThreadPool(
         new ThreadFactoryBuilder().setDaemon(true)
             .setNameFormat("Container Reports Processing Thread - %d")
             .build(), maxContainerReportThreads);
-    inProgressPoolList = new LinkedList<>();
+    this.inProgressPoolList = new LinkedList<>();
+    this.inProgressPoolListLock = new ReentrantReadWriteLock();
 
     initPoolProcessThread();
   }
@@ -211,31 +218,49 @@ public class ContainerReplicationManager implements Closeable {
       while (runnable.get()) {
         // Make sure that we don't have any new pools.
         refreshPools();
-        PeriodicPool pool = poolQueue.poll();
-        if (pool != null) {
-          if (pool.getLastProcessedTime() + this.containerProcessingLag <
-              Time.monotonicNow()) {
-            LOG.debug("Adding pool {} to container processing queue", pool
-                .getPoolName());
-            InProgressPool inProgressPool =  new InProgressPool(maxPoolWait,
-                pool, this.nodeManager, this.poolManager, this.commandQueue,
-                this.executorService);
+        while (inProgressPoolList.size() < inProgressPoolMaxCount) {
+          PeriodicPool pool = poolQueue.poll();
+          if (pool != null) {
+            if (pool.getLastProcessedTime() + this.containerProcessingLag >
+                Time.monotonicNow()) {
+              LOG.debug("Not within the time window for processing: {}",
+                  pool.getPoolName());
+              // we might over sleep here, not a big deal.
+              sleepUninterruptibly(this.containerProcessingLag,
+                  TimeUnit.MILLISECONDS);
+            }
+            LOG.debug("Adding pool {} to container processing queue",
+                pool.getPoolName());
+            InProgressPool inProgressPool = new InProgressPool(maxPoolWait,
+                pool, this.nodeManager, this.poolManager, this.executorService);
             inProgressPool.startReconciliation();
-            inProgressPoolList.add(inProgressPool);
+            inProgressPoolListLock.writeLock().lock();
+            try {
+              inProgressPoolList.add(inProgressPool);
+            } finally {
+              inProgressPoolListLock.writeLock().unlock();
+            }
             poolProcessCount++;
-
           } else {
-
-            LOG.debug("Not within the time window for processing: {}",
-                pool.getPoolName());
-            // Put back this pool since we are not planning to process it.
-            poolQueue.add(pool);
-            // we might over sleep here, not a big deal.
-            sleepUninterruptibly(this.containerProcessingLag,
-                TimeUnit.MILLISECONDS);
+            break;
           }
         }
         sleepUninterruptibly(this.maxPoolWait, TimeUnit.MILLISECONDS);
+        inProgressPoolListLock.readLock().lock();
+        try {
+          for (InProgressPool inProgressPool : inProgressPoolList) {
+            inProgressPool.finalizeReconciliation();
+            poolQueue.add(inProgressPool.getPool());
+          }
+        } finally {
+          inProgressPoolListLock.readLock().unlock();
+        }
+        inProgressPoolListLock.writeLock().lock();
+        try {
+          inProgressPoolList.clear();
+        } finally {
+          inProgressPoolListLock.writeLock().unlock();
+        }
       }
     };
 
@@ -263,28 +288,28 @@ public class ContainerReplicationManager implements Closeable {
    */
   public void handleContainerReport(
       ContainerReportsRequestProto containerReport) {
-    String poolName = null;
-    DatanodeID datanodeID = DatanodeID
-        .getFromProtoBuf(containerReport.getDatanodeID());
+    DatanodeID datanodeID = DatanodeID.getFromProtoBuf(
+        containerReport.getDatanodeID());
+    inProgressPoolListLock.readLock().lock();
     try {
-      poolName = poolManager.getNodePool(datanodeID);
+      String poolName = poolManager.getNodePool(datanodeID);
+      for (InProgressPool ppool : inProgressPoolList) {
+        if (ppool.getPoolName().equalsIgnoreCase(poolName)) {
+          ppool.handleContainerReport(containerReport);
+          return;
+        }
+      }
+      // TODO: Decide if we can do anything else with this report.
+      LOG.debug("Discarding the container report for pool {}. " +
+              "That pool is not currently in the pool reconciliation process." +
+              " Container Name: {}", poolName, containerReport.getDatanodeID());
     } catch (SCMException e) {
       LOG.warn("Skipping processing container report from datanode {}, "
               + "cause: failed to get the corresponding node pool",
           datanodeID.toString(), e);
-      return;
-    }
-
-    for(InProgressPool ppool : inProgressPoolList) {
-      if(ppool.getPoolName().equalsIgnoreCase(poolName)) {
-        ppool.handleContainerReport(containerReport);
-        return;
-      }
+    } finally {
+      inProgressPoolListLock.readLock().unlock();
     }
-    // TODO: Decide if we can do anything else with this report.
-    LOG.debug("Discarding the container report for pool {}. That pool is not " +
-        "currently in the pool reconciliation process. Container Name: {}",
-        poolName, containerReport.getDatanodeID());
   }
 
   /**

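The pool-processing thread no longer handles a single pool per pass: it admits up to inProgressPoolMaxCount pools, sleeps for maxPoolWait, then finalizes every in-progress pool and returns it to the queue. The in-progress list itself is guarded by a ReentrantReadWriteLock so report handlers can look pools up concurrently. A simplified, standalone sketch of that locking discipline (class and method names are illustrative, not from the patch):

    import java.util.LinkedList;
    import java.util.List;
    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    /** Illustrative only: the read/write-lock discipline around the pool list. */
    public class InProgressListSketch {
      private final List<String> inProgressPools = new LinkedList<>();
      private final ReadWriteLock lock = new ReentrantReadWriteLock();

      /** Pool-processing thread: admit a pool for reconciliation (write lock). */
      void admitPool(String poolName) {
        lock.writeLock().lock();
        try {
          inProgressPools.add(poolName);
        } finally {
          lock.writeLock().unlock();
        }
      }

      /** Report handlers: find the pool a report belongs to (read lock). */
      boolean dispatchReport(String poolName) {
        lock.readLock().lock();
        try {
          for (String pool : inProgressPools) {
            if (pool.equalsIgnoreCase(poolName)) {
              return true;   // handed to the matching in-progress pool
            }
          }
          return false;      // pool not being reconciled; the report is discarded
        } finally {
          lock.readLock().unlock();
        }
      }

      /** Pool-processing thread: end of a cycle, finalize and empty the list. */
      void finishCycle() {
        lock.writeLock().lock();
        try {
          inProgressPools.clear();
        } finally {
          lock.writeLock().unlock();
        }
      }
    }
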
+ 26 - 14
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/InProgressPool.java

@@ -21,9 +21,10 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.ozone.protocol.commands.SendContainerCommand;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState;
-import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerInfo;
-import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
-import org.apache.hadoop.ozone.scm.node.CommandQueue;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerInfo;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
 import org.apache.hadoop.ozone.scm.node.NodeManager;
 import org.apache.hadoop.ozone.scm.node.NodePoolManager;
 import org.apache.hadoop.util.Time;
@@ -39,10 +40,14 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
-import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
-import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState.HEALTHY;
-import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState.STALE;
-import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState.UNKNOWN;
+import static com.google.common.util.concurrent.Uninterruptibles
+    .sleepUninterruptibly;
+import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos
+    .NodeState.HEALTHY;
+import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos
+    .NodeState.STALE;
+import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos
+    .NodeState.UNKNOWN;
 
 /**
  * These are pools that are actively checking for replication status of the
@@ -51,8 +56,8 @@ import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState.UNKNO
 public final class InProgressPool {
   public static final Logger LOG =
       LoggerFactory.getLogger(InProgressPool.class);
+
   private final PeriodicPool pool;
-  private final CommandQueue commandQueue;
   private final NodeManager nodeManager;
   private final NodePoolManager poolManager;
   private final ExecutorService executorService;
@@ -70,22 +75,19 @@ public final class InProgressPool {
    * @param pool - Pool that we are working against
    * @param nodeManager - Nodemanager
    * @param poolManager - pool manager
-   * @param commandQueue - Command queue
    * @param executorService - Shared Executor service.
    */
   InProgressPool(long maxWaitTime, PeriodicPool pool,
       NodeManager nodeManager, NodePoolManager poolManager,
-      CommandQueue commandQueue, ExecutorService executorService) {
+                 ExecutorService executorService) {
     Preconditions.checkNotNull(pool);
     Preconditions.checkNotNull(nodeManager);
     Preconditions.checkNotNull(poolManager);
-    Preconditions.checkNotNull(commandQueue);
     Preconditions.checkNotNull(executorService);
     Preconditions.checkArgument(maxWaitTime > 0);
     this.pool = pool;
     this.nodeManager = nodeManager;
     this.poolManager = poolManager;
-    this.commandQueue = commandQueue;
     this.executorService = executorService;
     this.containerCountMap = new ConcurrentHashMap<>();
     this.processedNodeSet = new ConcurrentHashMap<>();
@@ -186,7 +188,7 @@ public final class InProgressPool {
         // Queue commands to all datanodes in this pool to send us container
         // report. Since we ignore dead nodes, it is possible that we would have
         // over replicated the container if the node comes back.
-        commandQueue.addCommand(id, cmd);
+        nodeManager.addDatanodeCommand(id, cmd);
       }
     }
     this.status = ProgressStatus.InProgress;
@@ -235,7 +237,12 @@ public final class InProgressPool {
    */
   public void handleContainerReport(
       ContainerReportsRequestProto containerReport) {
-    executorService.submit(processContainerReport(containerReport));
+    if (status == ProgressStatus.InProgress) {
+      executorService.submit(processContainerReport(containerReport));
+    } else {
+      LOG.debug("Cannot handle container report when the pool is in {} status.",
+          status);
+    }
   }
 
   private Runnable processContainerReport(
@@ -292,6 +299,11 @@ public final class InProgressPool {
     return pool.getPoolName();
   }
 
+  public void finalizeReconciliation() {
+    status = ProgressStatus.Done;
+    //TODO: Add finalizing logic. This is where actual reconciliation happens.
+  }
+
   /**
    * Current status of the computing replication status.
    */

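Two behavioral changes here: container-report commands are now issued via nodeManager.addDatanodeCommand(...) instead of a CommandQueue handed to the pool, and incoming reports are processed only while the pool is still InProgress; finalizeReconciliation() flips the status to Done, after which reports are logged and dropped. A minimal sketch of the new command path (the wrapper class and method are illustrative; addDatanodeCommand is the NodeManager method the pool now calls, which implementations such as the test mock below forward to their CommandQueue):

    import org.apache.hadoop.hdfs.protocol.DatanodeID;
    import org.apache.hadoop.ozone.protocol.commands.SendContainerCommand;
    import org.apache.hadoop.ozone.scm.node.NodeManager;

    /** Illustrative only: where container-report commands go after this change. */
    public final class CommandPathSketch {
      static void requestContainerReport(NodeManager nodeManager, DatanodeID id,
          SendContainerCommand cmd) {
        // Previously: commandQueue.addCommand(id, cmd);
        // Now the NodeManager exposes the single entry point for datanode commands.
        nodeManager.addDatanodeCommand(id, cmd);
      }
    }
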
+ 6 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java

@@ -122,6 +122,12 @@ public interface NodeManager extends StorageContainerNodeProtocol,
    */
   SCMNodeMetric getNodeStat(DatanodeID datanodeID);
 
+  /**
+   * Returns the NodePoolManager associated with the NodeManager.
+   * @return NodePoolManager
+   */
+  NodePoolManager getNodePoolManager();
+
   /**
    * Wait for the heartbeat is processed by NodeManager.
    * @return true if heartbeat has been processed.

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java

@@ -857,6 +857,11 @@ public class SCMNodeManager
     return new SCMNodeMetric(nodeStats.get(datanodeID.getDatanodeUuid()));
   }
 
+  @Override
+  public NodePoolManager getNodePoolManager() {
+    return nodePoolManager;
+  }
+
   @Override
   public Map<String, Integer> getNodeCount() {
     Map<String, Integer> nodeCountMap = new HashMap<String, Integer>();

+ 18 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/ReplicationNodeManagerMock.java

@@ -29,26 +29,32 @@ import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
 import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeMetric;
 import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeStat;
+import org.apache.hadoop.ozone.scm.node.CommandQueue;
 import org.apache.hadoop.ozone.scm.node.NodeManager;
 
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState;
+import org.apache.hadoop.ozone.scm.node.NodePoolManager;
+import org.mockito.Mockito;
 
 /**
  * A Node Manager to test replication.
  */
 public class ReplicationNodeManagerMock implements NodeManager {
   private final Map<DatanodeID, NodeState> nodeStateMap;
+  private final CommandQueue commandQueue;
 
   /**
    * A list of Datanodes and current states.
    * @param nodeState A node state map.
    */
-  public ReplicationNodeManagerMock(Map<DatanodeID, NodeState> nodeState) {
+  public ReplicationNodeManagerMock(Map<DatanodeID, NodeState> nodeState,
+                                    CommandQueue commandQueue) {
     Preconditions.checkNotNull(nodeState);
-    nodeStateMap = nodeState;
+    this.nodeStateMap = nodeState;
+    this.commandQueue = commandQueue;
   }
 
   /**
@@ -194,6 +200,11 @@ public class ReplicationNodeManagerMock implements NodeManager {
     return null;
   }
 
+  @Override
+  public NodePoolManager getNodePoolManager() {
+    return Mockito.mock(NodePoolManager.class);
+  }
+
   /**
    * Wait for the heartbeat is processed by NodeManager.
    *
@@ -304,4 +315,9 @@ public class ReplicationNodeManagerMock implements NodeManager {
     nodeStateMap.put(id, state);
   }
 
+  @Override
+  public void addDatanodeCommand(DatanodeID id, SCMCommand command) {
+    this.commandQueue.addCommand(id, command);
+  }
+
 }

+ 25 - 20
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerReplicationManager.java → hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerSupervisor.java

@@ -28,8 +28,7 @@ import org.apache.hadoop.ozone.container.common.SCMTestUtils;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState;
 import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
-import org.apache.hadoop.ozone.scm.container.replication
-    .ContainerReplicationManager;
+import org.apache.hadoop.ozone.scm.container.replication.ContainerSupervisor;
 import org.apache.hadoop.ozone.scm.container.replication.InProgressPool;
 import org.apache.hadoop.ozone.scm.node.CommandQueue;
 import org.apache.hadoop.ozone.scm.node.NodeManager;
@@ -53,35 +52,37 @@ import java.util.concurrent.TimeoutException;
 import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState.HEALTHY;
 import static org.apache.hadoop.scm.ScmConfigKeys
     .OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT;
+import static org.apache.hadoop.scm.ScmConfigKeys
+    .OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL;
 import static org.apache.ratis.shaded.com.google.common.util.concurrent
     .Uninterruptibles.sleepUninterruptibly;
 
 /**
  * Tests for the container manager.
  */
-public class TestContainerReplicationManager {
+public class TestContainerSupervisor {
   final static String POOL_NAME_TEMPLATE = "Pool%d";
   static final int MAX_DATANODES = 72;
   static final int POOL_SIZE = 24;
   static final int POOL_COUNT = 3;
   private LogCapturer logCapturer = LogCapturer.captureLogs(
-      LogFactory.getLog(ContainerReplicationManager.class));
+      LogFactory.getLog(ContainerSupervisor.class));
   private List<DatanodeID> datanodes = new LinkedList<>();
   private NodeManager nodeManager;
   private NodePoolManager poolManager;
   private CommandQueue commandQueue;
-  private ContainerReplicationManager replicationManager;
+  private ContainerSupervisor containerSupervisor;
   private ReplicationDatanodeStateManager datanodeStateManager;
 
   @After
   public void tearDown() throws Exception {
     logCapturer.stopCapturing();
-    GenericTestUtils.setLogLevel(ContainerReplicationManager.LOG, Level.INFO);
+    GenericTestUtils.setLogLevel(ContainerSupervisor.LOG, Level.INFO);
   }
 
   @Before
   public void setUp() throws Exception {
-    GenericTestUtils.setLogLevel(ContainerReplicationManager.LOG, Level.DEBUG);
+    GenericTestUtils.setLogLevel(ContainerSupervisor.LOG, Level.DEBUG);
     Map<DatanodeID, NodeState> nodeStateMap = new HashMap<>();
     // We are setting up 3 pools with 24 nodes each in this cluster.
     // First we create 72 Datanodes.
@@ -91,10 +92,12 @@ public class TestContainerReplicationManager {
       nodeStateMap.put(datanode, HEALTHY);
     }
 
+    commandQueue = new CommandQueue();
+
     // All nodes in this cluster are healthy for time being.
-    nodeManager = new ReplicationNodeManagerMock(nodeStateMap);
+    nodeManager = new ReplicationNodeManagerMock(nodeStateMap, commandQueue);
     poolManager = new ReplicationNodePoolManagerMock();
-    commandQueue = new CommandQueue();
+
 
     Assert.assertEquals("Max datanodes should be equal to POOL_SIZE * " +
         "POOL_COUNT", POOL_COUNT * POOL_SIZE, MAX_DATANODES);
@@ -108,10 +111,12 @@ public class TestContainerReplicationManager {
       }
     }
     OzoneConfiguration config = SCMTestUtils.getOzoneConf();
-    config.setTimeDuration(OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT, 1,
+    config.setTimeDuration(OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT, 2,
+        TimeUnit.SECONDS);
+    config.setTimeDuration(OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL, 1,
         TimeUnit.SECONDS);
-    replicationManager = new ContainerReplicationManager(config,
-        nodeManager, poolManager, commandQueue);
+    containerSupervisor = new ContainerSupervisor(config,
+        nodeManager, poolManager);
     datanodeStateManager = new ReplicationDatanodeStateManager(nodeManager,
         poolManager);
     // Sleep for one second to make sure all threads get time to run.
@@ -125,13 +130,13 @@ public class TestContainerReplicationManager {
   public void testAssertPoolsAreProcessed() {
     // This asserts that replication manager has started processing at least
     // one pool.
-    Assert.assertTrue(replicationManager.getInProgressPoolCount() > 0);
+    Assert.assertTrue(containerSupervisor.getInProgressPoolCount() > 0);
 
     // Since all datanodes are flagged as healthy in this test, for each
     // datanode we must have queued a command.
-    Assert.assertEquals("Commands are in queue :", commandQueue
-        .getCommandsInQueue(), POOL_SIZE * replicationManager
-        .getInProgressPoolCount());
+    Assert.assertEquals("Commands are in queue :",
+        POOL_SIZE * containerSupervisor.getInProgressPoolCount(),
+        commandQueue.getCommandsInQueue());
   }
 
   @Test
@@ -144,7 +149,7 @@ public class TestContainerReplicationManager {
       InterruptedException {
     String singleNodeContainer = "SingleNodeContainer";
     String threeNodeContainer = "ThreeNodeContainer";
-    InProgressPool ppool = replicationManager.getInProcessPoolList().get(0);
+    InProgressPool ppool = containerSupervisor.getInProcessPoolList().get(0);
     // Only single datanode reporting that "SingleNodeContainer" exists.
     List<ContainerReportsRequestProto> clist =
         datanodeStateManager.getContainerReport(singleNodeContainer,
@@ -180,7 +185,7 @@ public class TestContainerReplicationManager {
     String normalContainer = "NormalContainer";
     String overReplicated = "OverReplicatedContainer";
     String wayOverReplicated = "WayOverReplicated";
-    InProgressPool ppool = replicationManager.getInProcessPoolList().get(0);
+    InProgressPool ppool = containerSupervisor.getInProcessPoolList().get(0);
 
     List<ContainerReportsRequestProto> clist =
         datanodeStateManager.getContainerReport(normalContainer,
@@ -221,7 +226,7 @@ public class TestContainerReplicationManager {
   public void testAllPoolsAreProcessed() throws TimeoutException,
       InterruptedException {
     // Verify that we saw all three pools being picked up for processing.
-    GenericTestUtils.waitFor(() -> replicationManager.getPoolProcessCount()
+    GenericTestUtils.waitFor(() -> containerSupervisor.getPoolProcessCount()
         >= 3, 200, 15 * 1000);
     Assert.assertTrue(logCapturer.getOutput().contains("Pool1") &&
         logCapturer.getOutput().contains("Pool2") &&
@@ -253,7 +258,7 @@ public class TestContainerReplicationManager {
       List<ContainerReportsRequestProto> clist =
           datanodeStateManager.getContainerReport("NewContainer1",
               "PoolNew", 1);
-      replicationManager.handleContainerReport(clist.get(0));
+      containerSupervisor.handleContainerReport(clist.get(0));
       GenericTestUtils.waitFor(() ->
           inProgressLog.getOutput().contains("NewContainer1") && inProgressLog
               .getOutput().contains(id.getDatanodeUuid()), 200, 10 * 1000);

+ 7 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/MockNodeManager.java

@@ -35,6 +35,8 @@ import org.apache.hadoop.ozone.protocol.proto
 import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeMetric;
 import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeStat;
 import org.apache.hadoop.ozone.scm.node.NodeManager;
+import org.apache.hadoop.ozone.scm.node.NodePoolManager;
+import org.mockito.Mockito;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -269,6 +271,11 @@ public class MockNodeManager implements NodeManager {
     return new SCMNodeMetric(nodeMetricMap.get(datanodeID.toString()));
   }
 
+  @Override
+  public NodePoolManager getNodePoolManager() {
+    return Mockito.mock(NodePoolManager.class);
+  }
+
   /**
    * Used for testing.
    *

+ 14 - 4
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/TestContainerMapping.java

@@ -203,8 +203,7 @@ public class TestContainerMapping {
   }
 
   @Test
-  public void testFullContainerReport() throws IOException,
-      InterruptedException {
+  public void testFullContainerReport() throws IOException {
     String containerName = UUID.randomUUID().toString();
     ContainerInfo info = createContainer(containerName);
     DatanodeID datanodeID = SCMTestUtils.getDatanodeID();
@@ -227,7 +226,13 @@ public class TestContainerMapping {
         .setContainerID(info.getContainerID());
 
     reports.add(ciBuilder.build());
-    mapping.processContainerReports(datanodeID, reportType, reports);
+
+    ContainerReportsRequestProto.Builder crBuilder =
+        ContainerReportsRequestProto.newBuilder();
+    crBuilder.setDatanodeID(datanodeID.getProtoBufMessage())
+        .setType(reportType).addAllReports(reports);
+
+    mapping.processContainerReports(crBuilder.build());
 
     ContainerInfo updatedContainer = mapping.getContainer(containerName);
     Assert.assertEquals(100000000L, updatedContainer.getNumberOfKeys());
@@ -260,7 +265,12 @@ public class TestContainerMapping {
 
     reports.add(ciBuilder.build());
 
-    mapping.processContainerReports(datanodeID, reportType, reports);
+    ContainerReportsRequestProto.Builder crBuilder =
+        ContainerReportsRequestProto.newBuilder();
+    crBuilder.setDatanodeID(datanodeID.getProtoBufMessage())
+        .setType(reportType).addAllReports(reports);
+
+    mapping.processContainerReports(crBuilder.build());
 
     ContainerInfo updatedContainer = mapping.getContainer(containerName);
     Assert.assertEquals(500000000L, updatedContainer.getNumberOfKeys());