
HDDS-199. Implement ReplicationManager to handle underreplication of closed containers. Contributed by Elek Marton.

Xiaoyu Yao 7 years ago
parent
commit
3a9e25edf5
23 changed files with 857 additions and 47 deletions
  1. 7 0
      hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
  2. 1 0
      hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
  3. 10 0
      hadoop-hdds/common/src/main/resources/ozone-default.xml
  4. 2 2
      hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java
  5. 5 0
      hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/TypedEvent.java
  6. 2 4
      hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventWatcher.java
  7. 4 1
      hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicy.java
  8. 6 2
      hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMCommonPolicy.java
  9. 10 6
      hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java
  10. 5 2
      hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java
  11. 56 0
      hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationCommandWatcher.java
  12. 242 0
      hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
  13. 13 16
      hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationQueue.java
  14. 2 1
      hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationRequest.java
  15. 2 2
      hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/package-info.java
  16. 31 0
      hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
  17. 41 1
      hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
  18. 106 0
      hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementCapacity.java
  19. 86 0
      hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRandom.java
  20. 215 0
      hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
  21. 7 7
      hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationQueue.java
  22. 1 1
      hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/package-info.java
  23. 3 2
      hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/placement/TestContainerPlacement.java

+ 7 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java

@@ -251,6 +251,13 @@ public final class ScmConfigKeys {
   public static final String OZONE_SCM_CONTAINER_CLOSE_THRESHOLD =
       "ozone.scm.container.close.threshold";
   public static final float OZONE_SCM_CONTAINER_CLOSE_THRESHOLD_DEFAULT = 0.9f;
+
+  public static final String HDDS_SCM_WATCHER_TIMEOUT =
+      "hdds.scm.watcher.timeout";
+
+  public static final String HDDS_SCM_WATCHER_TIMEOUT_DEFAULT =
+      "10m";
+
   /**
    * Never constructed.
    */

+ 1 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java

@@ -23,6 +23,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+
 import org.apache.ratis.util.TimeDuration;
 
 /**

+ 10 - 0
hadoop-hdds/common/src/main/resources/ozone-default.xml

@@ -1108,4 +1108,14 @@
     </description>
   </property>
 
+  <property>
+    <name>hdds.scm.watcher.timeout</name>
+    <value>10m</value>
+    <tag>OZONE, SCM, MANAGEMENT</tag>
+    <description>
+      Timeout for the HDDS SCM command watchers. After this duration the
+      Copy/Delete container commands will be sent again to the datanode
+      unless the datanode confirms completion.
+    </description>
+  </property>
 </configuration>
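
As a quick illustration of how this property is consumed (a sketch, not part of the patch; OzoneConfiguration, ScmConfigKeys and getTimeDuration are taken from this change, the class name is made up):

    import java.util.concurrent.TimeUnit;

    import org.apache.hadoop.hdds.conf.OzoneConfiguration;
    import org.apache.hadoop.hdds.scm.ScmConfigKeys;

    public final class WatcherTimeoutSketch {
      public static void main(String[] args) {
        OzoneConfiguration conf = new OzoneConfiguration();
        // Override the 10m default, e.g. for a short-lived test cluster.
        conf.set(ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT, "2m");
        // Resolved to milliseconds, exactly as StorageContainerManager
        // does further down in this change.
        long timeoutMs = conf.getTimeDuration(
            ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT,
            ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT_DEFAULT,
            TimeUnit.MILLISECONDS);
        System.out.println("watcher timeout = " + timeoutMs + " ms");
      }
    }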

+ 2 - 2
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java

@@ -180,9 +180,9 @@ public abstract class EventWatcher<TIMEOUT_PAYLOAD extends
 
   }
 
-  abstract void onTimeout(EventPublisher publisher, TIMEOUT_PAYLOAD payload);
+  protected abstract void onTimeout(EventPublisher publisher, TIMEOUT_PAYLOAD payload);
 
-  abstract void onFinished(EventPublisher publisher, TIMEOUT_PAYLOAD payload);
+  protected abstract void onFinished(EventPublisher publisher, TIMEOUT_PAYLOAD payload);
 
   public List<TIMEOUT_PAYLOAD> getTimeoutEvents(
       Predicate<? super TIMEOUT_PAYLOAD> predicate) {

+ 5 - 0
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/TypedEvent.java

@@ -48,4 +48,9 @@ public class TypedEvent<T> implements Event<T> {
     return name;
   }
 
+  @Override
+  public String toString() {
+    return "TypedEvent{" + "payloadType=" + payloadType + ", name='" + name
+        + '\'' + '}';
+  }
 }

+ 2 - 4
hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventWatcher.java

@@ -216,12 +216,12 @@ public class TestEventWatcher {
     }
 
     @Override
-    void onTimeout(EventPublisher publisher, UnderreplicatedEvent payload) {
+    protected void onTimeout(EventPublisher publisher, UnderreplicatedEvent payload) {
       publisher.fireEvent(UNDER_REPLICATED, payload);
     }
 
     @Override
-    void onFinished(EventPublisher publisher, UnderreplicatedEvent payload) {
+    protected void onFinished(EventPublisher publisher, UnderreplicatedEvent payload) {
       //Good job. We did it.
     }
 
@@ -231,8 +231,6 @@ public class TestEventWatcher {
     }
   }
 
-  ;
-
   private static class ReplicationCompletedEvent
       implements IdentifiableEventPayload {
 

+ 4 - 1
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicy.java

@@ -31,11 +31,14 @@ public interface ContainerPlacementPolicy {
   /**
    * Given the replication factor and size required, return set of datanodes
    * that satisfy the nodes and size requirement.
+   *
+   * @param excludedNodes - list of nodes to be excluded.
    * @param nodesRequired - number of datanodes required.
    * @param sizeRequired - size required for the container or block.
    * @return list of datanodes chosen.
    * @throws IOException
    */
-  List<DatanodeDetails> chooseDatanodes(int nodesRequired, long sizeRequired)
+  List<DatanodeDetails> chooseDatanodes(List<DatanodeDetails> excludedNodes,
+      int nodesRequired, long sizeRequired)
       throws IOException;
 }
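
The extra parameter makes replica-aware placement possible: callers pass the datanodes that already hold a replica, and the policy will not hand them back. A minimal caller sketch (any ContainerPlacementPolicy implementation works; the helper name is hypothetical):

    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.hdds.protocol.DatanodeDetails;
    import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicy;

    final class PlacementCallSketch {
      // Picks target nodes for the missing replicas of one container.
      static List<DatanodeDetails> pickTargets(ContainerPlacementPolicy policy,
          List<DatanodeDetails> existingReplicas, int missing, long sizeBytes)
          throws IOException {
        // Nodes already holding a replica are excluded up front, so only
        // fresh targets can come back.
        return policy.chooseDatanodes(existingReplicas, missing, sizeBytes);
      }
    }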

+ 6 - 2
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMCommonPolicy.java

@@ -95,16 +95,20 @@ public abstract class SCMCommonPolicy implements ContainerPlacementPolicy {
    * 3. if a set of containers are requested, we either meet the required
    * number of nodes or we fail that request.
    *
+   *
+   * @param excludedNodes - datanodes with existing replicas
    * @param nodesRequired - number of datanodes required.
    * @param sizeRequired - size required for the container or block.
    * @return list of datanodes chosen.
    * @throws SCMException SCM exception.
    */
 
-  public List<DatanodeDetails> chooseDatanodes(int nodesRequired, final long
-      sizeRequired) throws SCMException {
+  public List<DatanodeDetails> chooseDatanodes(
+      List<DatanodeDetails> excludedNodes,
+      int nodesRequired, final long sizeRequired) throws SCMException {
     List<DatanodeDetails> healthyNodes =
         nodeManager.getNodes(HddsProtos.NodeState.HEALTHY);
+    healthyNodes.removeAll(excludedNodes);
     String msg;
     if (healthyNodes.size() == 0) {
       msg = "No healthy node found to allocate container.";

+ 10 - 6
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java

@@ -17,17 +17,18 @@
 
 package org.apache.hadoop.hdds.scm.container.placement.algorithms;
 
-import com.google.common.annotations.VisibleForTesting;
+import java.util.List;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+
+import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.List;
-
 /**
  * Container placement policy that randomly choose datanodes with remaining
  * space to satisfy the size constraints.
@@ -83,6 +84,8 @@ public final class SCMContainerPlacementCapacity extends SCMCommonPolicy {
   /**
    * Called by SCM to choose datanodes.
    *
+   *
+   * @param excludedNodes - list of the datanodes to exclude.
    * @param nodesRequired - number of datanodes required.
    * @param sizeRequired - size required for the container or block.
    * @return List of datanodes.
@@ -90,9 +93,10 @@ public final class SCMContainerPlacementCapacity extends SCMCommonPolicy {
    */
   @Override
   public List<DatanodeDetails> chooseDatanodes(
-      final int nodesRequired, final long sizeRequired) throws SCMException {
+      List<DatanodeDetails> excludedNodes, final int nodesRequired,
+      final long sizeRequired) throws SCMException {
     List<DatanodeDetails> healthyNodes =
-        super.chooseDatanodes(nodesRequired, sizeRequired);
+        super.chooseDatanodes(excludedNodes, nodesRequired, sizeRequired);
     if (healthyNodes.size() == nodesRequired) {
       return healthyNodes;
     }

+ 5 - 2
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java

@@ -56,6 +56,8 @@ public final class SCMContainerPlacementRandom extends SCMCommonPolicy
   /**
    * Choose datanodes called by the SCM to choose the datanode.
    *
+   *
+   * @param excludedNodes - list of the datanodes to exclude.
    * @param nodesRequired - number of datanodes required.
    * @param sizeRequired - size required for the container or block.
    * @return List of Datanodes.
@@ -63,9 +65,10 @@ public final class SCMContainerPlacementRandom extends SCMCommonPolicy
    */
   @Override
   public List<DatanodeDetails> chooseDatanodes(
-      final int nodesRequired, final long sizeRequired) throws SCMException {
+      List<DatanodeDetails> excludedNodes, final int nodesRequired,
+      final long sizeRequired) throws SCMException {
     List<DatanodeDetails> healthyNodes =
-        super.chooseDatanodes(nodesRequired, sizeRequired);
+        super.chooseDatanodes(excludedNodes, nodesRequired, sizeRequired);
 
     if (healthyNodes.size() == nodesRequired) {
       return healthyNodes;

+ 56 - 0
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationCommandWatcher.java

@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager
+    .ReplicationCompleted;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager
+    .ReplicationRequestToRepeat;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.server.events.Event;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.hdds.server.events.EventWatcher;
+import org.apache.hadoop.ozone.lease.LeaseManager;
+
+/**
+ * Command watcher to track the replication commands.
+ */
+public class ReplicationCommandWatcher
+    extends
+    EventWatcher<ReplicationManager.ReplicationRequestToRepeat,
+        ReplicationManager.ReplicationCompleted> {
+
+  public ReplicationCommandWatcher(Event<ReplicationRequestToRepeat> startEvent,
+      Event<ReplicationCompleted> completionEvent,
+      LeaseManager<Long> leaseManager) {
+    super(startEvent, completionEvent, leaseManager);
+  }
+
+  @Override
+  protected void onTimeout(EventPublisher publisher,
+      ReplicationRequestToRepeat payload) {
+    //put back to the original queue
+    publisher.fireEvent(SCMEvents.REPLICATE_CONTAINER,
+        payload.getRequest());
+  }
+
+  @Override
+  protected void onFinished(EventPublisher publisher,
+      ReplicationRequestToRepeat payload) {
+
+  }
+}

+ 242 - 0
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java

@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ThreadFactory;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerStateManager;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms
+    .ContainerPlacementPolicy;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.hdds.server.events.EventQueue;
+import org.apache.hadoop.hdds.server.events.IdentifiableEventPayload;
+import org.apache.hadoop.ozone.lease.LeaseManager;
+import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
+import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import static org.apache.hadoop.hdds.scm.events.SCMEvents
+    .TRACK_REPLICATE_COMMAND;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Replication Manager manages the replication of closed containers.
+ */
+public class ReplicationManager implements Runnable {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ReplicationManager.class);
+
+  private ReplicationQueue replicationQueue;
+
+  private ContainerPlacementPolicy containerPlacement;
+
+  private EventPublisher eventPublisher;
+
+  private ReplicationCommandWatcher replicationCommandWatcher;
+
+  private boolean running = true;
+
+  private ContainerStateManager containerStateManager;
+
+  public ReplicationManager(ContainerPlacementPolicy containerPlacement,
+      ContainerStateManager containerStateManager, EventQueue eventQueue,
+      LeaseManager<Long> commandWatcherLeaseManager) {
+
+    this.containerPlacement = containerPlacement;
+    this.containerStateManager = containerStateManager;
+    this.eventPublisher = eventQueue;
+
+    this.replicationCommandWatcher =
+        new ReplicationCommandWatcher(TRACK_REPLICATE_COMMAND,
+            SCMEvents.REPLICATION_COMPLETE, commandWatcherLeaseManager);
+
+    this.replicationQueue = new ReplicationQueue();
+
+    eventQueue.addHandler(SCMEvents.REPLICATE_CONTAINER,
+        (replicationRequest, publisher) -> replicationQueue
+            .add(replicationRequest));
+
+    this.replicationCommandWatcher.start(eventQueue);
+
+  }
+
+  public void start() {
+
+    ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true)
+        .setNameFormat("Replication Manager").build();
+
+    threadFactory.newThread(this).start();
+  }
+
+  public void run() {
+
+    while (running) {
+      ReplicationRequest request = null;
+      try {
+        //TODO: add throttling here
+        request = replicationQueue.take();
+
+        ContainerID containerID = new ContainerID(request.getContainerId());
+        ContainerInfo containerInfo =
+            containerStateManager.getContainer(containerID);
+
+        Preconditions.checkNotNull(containerInfo,
+            "No information about the container " + request.getContainerId());
+
+        Preconditions
+            .checkState(containerInfo.getState() == LifeCycleState.CLOSED,
+                "Container should be in closed state");
+
+        //check the current replication
+        List<DatanodeDetails> datanodesWithReplicas =
+            getCurrentReplicas(request);
+
+        ReplicationRequest finalRequest = request;
+
+        int inFlightReplications = replicationCommandWatcher.getTimeoutEvents(
+            e -> e.request.getContainerId() == finalRequest.getContainerId())
+            .size();
+
+        int deficit =
+            request.getExpecReplicationCount() - datanodesWithReplicas.size()
+                - inFlightReplications;
+
+        if (deficit > 0) {
+
+          List<DatanodeDetails> selectedDatanodes = containerPlacement
+              .chooseDatanodes(datanodesWithReplicas, deficit,
+                  containerInfo.getUsedBytes());
+
+          //send the command
+          for (DatanodeDetails datanode : selectedDatanodes) {
+
+            ReplicateContainerCommand replicateCommand =
+                new ReplicateContainerCommand(containerID.getId(),
+                    datanodesWithReplicas);
+
+            eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND,
+                new CommandForDatanode<>(
+                    datanode.getUuid(), replicateCommand));
+
+            ReplicationRequestToRepeat timeoutEvent =
+                new ReplicationRequestToRepeat(replicateCommand.getId(),
+                    request);
+
+            eventPublisher.fireEvent(TRACK_REPLICATE_COMMAND, timeoutEvent);
+
+          }
+
+        } else if (deficit < 0) {
+          //TODO: too many replicas. Not handled yet.
+        }
+
+      } catch (Exception e) {
+        LOG.error("Can't replicate container {}", request, e);
+      }
+    }
+
+  }
+
+  @VisibleForTesting
+  protected List<DatanodeDetails> getCurrentReplicas(ReplicationRequest request)
+      throws IOException {
+    //TODO: replication information is not yet available after HDDS-175,
+    // should be fixed after HDDS-228
+    return new ArrayList<>();
+  }
+
+  @VisibleForTesting
+  public ReplicationQueue getReplicationQueue() {
+    return replicationQueue;
+  }
+
+  public void stop() {
+    running = false;
+  }
+
+  /**
+   * Event for the ReplicationCommandWatcher to repeat the embedded request
+   * in case of timeout.
+   */
+  public static class ReplicationRequestToRepeat
+      implements IdentifiableEventPayload {
+
+    private final long commandId;
+
+    private final ReplicationRequest request;
+
+    public ReplicationRequestToRepeat(long commandId,
+        ReplicationRequest request) {
+      this.commandId = commandId;
+      this.request = request;
+    }
+
+    public ReplicationRequest getRequest() {
+      return request;
+    }
+
+    @Override
+    public long getId() {
+      return commandId;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      ReplicationRequestToRepeat that = (ReplicationRequestToRepeat) o;
+      return Objects.equals(request, that.request);
+    }
+
+    @Override
+    public int hashCode() {
+
+      return Objects.hash(request);
+    }
+  }
+
+  public static class ReplicationCompleted implements IdentifiableEventPayload {
+
+    private final long uuid;
+
+    public ReplicationCompleted(long uuid) {
+      this.uuid = uuid;
+    }
+
+    @Override
+    public long getId() {
+      return uuid;
+    }
+  }
+}
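
The heart of run() is the deficit arithmetic: the expected count minus known replicas minus replications already in flight. A standalone sketch with illustrative numbers:

    final class DeficitSketch {
      public static void main(String[] args) {
        int expectedReplicationCount = 3; // from the ReplicationRequest
        int currentReplicas = 1;          // datanodes with an existing replica
        int inFlightReplications = 1;     // tracked by ReplicationCommandWatcher

        // Same formula as ReplicationManager.run():
        int deficit = expectedReplicationCount - currentReplicas
            - inFlightReplications;

        // deficit == 1 here: exactly one more ReplicateContainerCommand is
        // sent; deficit < 0 would mean over-replication, a TODO in this patch.
        System.out.println("deficit = " + deficit);
      }
    }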

+ 13 - 16
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationQueue.java → hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationQueue.java

@@ -15,11 +15,11 @@
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  */
-package org.apache.hadoop.ozone.container.replication;
+package org.apache.hadoop.hdds.scm.container.replication;
 
 import java.util.List;
-import java.util.PriorityQueue;
-import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
 
 /**
  * Priority queue to handle under-replicated and over replicated containers
@@ -28,13 +28,13 @@ import java.util.Queue;
  */
 public class ReplicationQueue {
 
-  private final Queue<ReplicationRequest> queue;
+  private final BlockingQueue<ReplicationRequest> queue;
 
-  ReplicationQueue() {
-    queue = new PriorityQueue<>();
+  public ReplicationQueue() {
+    queue = new PriorityBlockingQueue<>();
   }
 
-  public synchronized boolean add(ReplicationRequest repObj) {
+  public boolean add(ReplicationRequest repObj) {
     if (this.queue.contains(repObj)) {
       // Remove the earlier message and insert this one
       this.queue.remove(repObj);
@@ -42,7 +42,7 @@ public class ReplicationQueue {
     return this.queue.add(repObj);
   }
 
-  public synchronized boolean remove(ReplicationRequest repObj) {
+  public boolean remove(ReplicationRequest repObj) {
     return queue.remove(repObj);
   }
 
@@ -52,21 +52,18 @@ public class ReplicationQueue {
    *
    * @return the head of this queue, or {@code null} if this queue is empty
    */
-  public synchronized ReplicationRequest peek() {
+  public ReplicationRequest peek() {
     return queue.peek();
   }
 
   /**
-   * Retrieves and removes the head of this queue,
-   * or returns {@code null} if this queue is empty.
-   *
-   * @return the head of this queue, or {@code null} if this queue is empty
+   * Retrieves and removes the head of this queue, blocking until an
+   * element becomes available.
    */
-  public synchronized ReplicationRequest poll() {
-    return queue.poll();
+  public ReplicationRequest take() throws InterruptedException {
+    return queue.take();
   }
 
-  public synchronized boolean removeAll(List<ReplicationRequest> repObjs) {
+  public boolean removeAll(List<ReplicationRequest> repObjs) {
     return queue.removeAll(repObjs);
   }
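
With PriorityBlockingQueue, take() parks the ReplicationManager thread until a request arrives, which is why the synchronized modifiers could be dropped. A minimal producer/consumer sketch using only types from this change:

    import org.apache.hadoop.hdds.scm.container.replication.ReplicationQueue;
    import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;

    final class QueueTakeSketch {
      public static void main(String[] args) throws InterruptedException {
        ReplicationQueue queue = new ReplicationQueue();

        // Producer side: normally the REPLICATE_CONTAINER event handler.
        queue.add(new ReplicationRequest(1L, (short) 2,
            System.currentTimeMillis(), (short) 3));

        // Consumer side: blocks until a request is available.
        ReplicationRequest request = queue.take();
        System.out.println("handling container " + request.getContainerId());
      }
    }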
 

+ 2 - 1
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationRequest.java → hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationRequest.java

@@ -15,9 +15,10 @@
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  */
-package org.apache.hadoop.ozone.container.replication;
+package org.apache.hadoop.hdds.scm.container.replication;
 
 import java.io.Serializable;
+
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 

+ 2 - 2
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/package-info.java → hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/package-info.java

@@ -16,8 +16,8 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.ozone.container.replication;
+package org.apache.hadoop.hdds.scm.container.replication;
 
 /**
- * Ozone Container replicaton related classes.
+ * HDDS (Closed) Container replication related classes.
  */

+ 31 - 0
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java

@@ -28,6 +28,10 @@ import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
     .ContainerReportFromDatanode;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
     .NodeReportFromDatanode;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager
+    .ReplicationCompleted;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
 
 import org.apache.hadoop.hdds.server.events.Event;
 import org.apache.hadoop.hdds.server.events.TypedEvent;
@@ -128,6 +132,33 @@ public final class SCMEvents {
       new TypedEvent(DeleteBlockCommandStatus.class,
           "DeleteBlockCommandStatus");
 
+  /**
+   * This is the command for ReplicationManager to handle under/over
+   * replication. Sent by the ContainerReportHandler after processing the
+   * heartbeat.
+   */
+  public static final TypedEvent<ReplicationRequest> REPLICATE_CONTAINER =
+      new TypedEvent<>(ReplicationRequest.class);
+
+  /**
+   * This event is sent by the ReplicationManager to the
+   * ReplicationCommandWatcher to track the in-progress replication.
+   */
+  public static final TypedEvent<ReplicationManager.ReplicationRequestToRepeat>
+      TRACK_REPLICATE_COMMAND =
+      new TypedEvent<>(ReplicationManager.ReplicationRequestToRepeat.class);
+  /**
+   * This event comes from the Heartbeat dispatcher (in fact from the
+   * datanode) to notify the SCM that the replication is done. This is
+   * received by the replicate command watcher to mark the in-progress task
+   * as finished.
+   * <p>
+   * TODO: Temporary event, should be replaced by a specific Heartbeat
+   * ActionRequired event.
+   */
+  public static final TypedEvent<ReplicationCompleted> REPLICATION_COMPLETE =
+      new TypedEvent<>(ReplicationCompleted.class);
+
   /**
    * Private Ctor. Never Constructed.
    */
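
The three events close one retry loop: REPLICATE_CONTAINER enqueues the work, TRACK_REPLICATE_COMMAND arms the watcher, and a REPLICATION_COMPLETE carrying the same command id disarms it. A sketch of the completion side (the caller is assumed to know the command id; the class and method names are made up):

    import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager.ReplicationCompleted;
    import org.apache.hadoop.hdds.scm.events.SCMEvents;
    import org.apache.hadoop.hdds.server.events.EventQueue;

    final class ReplicationDoneSketch {
      // commandId must match the id of the tracked ReplicationRequestToRepeat,
      // otherwise the watcher times out and re-fires REPLICATE_CONTAINER.
      static void markDone(EventQueue queue, long commandId) {
        queue.fireEvent(SCMEvents.REPLICATION_COMPLETE,
            new ReplicationCompleted(commandId));
      }
    }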

+ 41 - 1
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java

@@ -31,6 +31,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.block.BlockManager;
 import org.apache.hadoop.hdds.scm.block.BlockManagerImpl;
 import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler;
@@ -38,7 +39,12 @@ import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler;
 import org.apache.hadoop.hdds.scm.container.ContainerMapping;
 import org.apache.hadoop.hdds.scm.container.ContainerReportHandler;
 import org.apache.hadoop.hdds.scm.container.Mapping;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms
+    .ContainerPlacementPolicy;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms
+    .SCMContainerPlacementCapacity;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.ContainerStat;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMMetrics;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
@@ -61,9 +67,13 @@ import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.common.Storage.StorageState;
 import org.apache.hadoop.ozone.common.StorageInfo;
+import org.apache.hadoop.ozone.lease.LeaseManager;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.StringUtils;
+
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .HDDS_SCM_WATCHER_TIMEOUT_DEFAULT;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -153,6 +163,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
    * Key = DatanodeUuid, value = ContainerStat.
    */
   private Cache<String, ContainerStat> containerReportCache;
+  private final ReplicationManager replicationManager;
+  private final LeaseManager<Long> commandWatcherLeaseManager;
 
   /**
    * Creates a new StorageContainerManager. Configuration will be updated
@@ -207,6 +219,20 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
     eventQueue.addHandler(SCMEvents.DEAD_NODE, deadNodeHandler);
     eventQueue.addHandler(SCMEvents.CMD_STATUS_REPORT, cmdStatusReportHandler);
 
+    long watcherTimeout =
+        conf.getTimeDuration(ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT,
+            HDDS_SCM_WATCHER_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
+
+    commandWatcherLeaseManager = new LeaseManager<>(watcherTimeout);
+
+    //TODO: support configurable containerPlacement policy
+    ContainerPlacementPolicy containerPlacementPolicy =
+        new SCMContainerPlacementCapacity(scmNodeManager, conf);
+
+    replicationManager = new ReplicationManager(containerPlacementPolicy,
+        scmContainerManager.getStateManager(), eventQueue,
+        commandWatcherLeaseManager);
+
     scmAdminUsernames = conf.getTrimmedStringCollection(OzoneConfigKeys
         .OZONE_ADMINISTRATORS);
     scmUsername = UserGroupInformation.getCurrentUser().getUserName();
@@ -552,7 +578,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
 
     httpServer.start();
     scmBlockManager.start();
-
+    replicationManager.start();
     setStartTime();
   }
 
@@ -561,6 +587,20 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
    */
   public void stop() {
 
+    try {
+      LOG.info("Stopping Replication Manager Service.");
+      replicationManager.stop();
+    } catch (Exception ex) {
+      LOG.error("Replication manager service stop failed.", ex);
+    }
+
+    try {
+      LOG.info("Stopping Lease Manager of the command watchers");
+      commandWatcherLeaseManager.shutdown();
+    } catch (Exception ex) {
+      LOG.error("Lease Manager of the command watchers stop failed");
+    }
+
     try {
       LOG.info("Stopping datanode service RPC server");
       getDatanodeProtocolServer().stop();

+ 106 - 0
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementCapacity.java

@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container.placement.algorithms;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
+import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+
+import org.junit.Assert;
+import org.junit.Test;
+import static org.mockito.Matchers.anyObject;
+import org.mockito.Mockito;
+import static org.mockito.Mockito.when;
+
+public class TestSCMContainerPlacementCapacity {
+  @Test
+  public void chooseDatanodes() throws SCMException {
+    //given
+    Configuration conf = new OzoneConfiguration();
+
+    List<DatanodeDetails> datanodes = new ArrayList<>();
+    for (int i = 0; i < 7; i++) {
+      datanodes.add(TestUtils.getDatanodeDetails());
+    }
+
+    NodeManager mockNodeManager = Mockito.mock(NodeManager.class);
+    when(mockNodeManager.getNodes(NodeState.HEALTHY))
+        .thenReturn(new ArrayList<>(datanodes));
+
+    when(mockNodeManager.getNodeStat(anyObject()))
+        .thenReturn(new SCMNodeMetric(100l, 0L, 100L));
+    when(mockNodeManager.getNodeStat(datanodes.get(2)))
+        .thenReturn(new SCMNodeMetric(100l, 90L, 10L));
+    when(mockNodeManager.getNodeStat(datanodes.get(3)))
+        .thenReturn(new SCMNodeMetric(100l, 80L, 20L));
+    when(mockNodeManager.getNodeStat(datanodes.get(4)))
+        .thenReturn(new SCMNodeMetric(100l, 70L, 30L));
+
+    SCMContainerPlacementCapacity scmContainerPlacementRandom =
+        new SCMContainerPlacementCapacity(mockNodeManager, conf);
+
+    List<DatanodeDetails> existingNodes = new ArrayList<>();
+    existingNodes.add(datanodes.get(0));
+    existingNodes.add(datanodes.get(1));
+
+    Map<DatanodeDetails, Integer> selectedCount = new HashMap<>();
+    for (DatanodeDetails datanode : datanodes) {
+      selectedCount.put(datanode, 0);
+    }
+
+    for (int i = 0; i < 1000; i++) {
+
+      //when
+      List<DatanodeDetails> datanodeDetails =
+          scmContainerPlacementRandom.chooseDatanodes(existingNodes, 1, 15);
+
+      //then
+      Assert.assertEquals(1, datanodeDetails.size());
+      DatanodeDetails datanode0Details = datanodeDetails.get(0);
+
+      Assert.assertNotEquals(
+          "Datanode 0 should not been selected: excluded by parameter",
+          datanodes.get(0), datanode0Details);
+      Assert.assertNotEquals(
+          "Datanode 1 should not been selected: excluded by parameter",
+          datanodes.get(1), datanode0Details);
+      Assert.assertNotEquals(
+          "Datanode 2 should not been selected: not enough space there",
+          datanodes.get(2), datanode0Details);
+
+      selectedCount
+          .put(datanode0Details, selectedCount.get(datanode0Details) + 1);
+
+    }
+
+    //datanode 4 has less space. Should be selected fewer times.
+    Assert.assertTrue(selectedCount.get(datanodes.get(3)) > selectedCount
+        .get(datanodes.get(6)));
+    Assert.assertTrue(selectedCount.get(datanodes.get(4)) > selectedCount
+        .get(datanodes.get(6)));
+  }
+}

+ 86 - 0
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRandom.java

@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container.placement.algorithms;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
+import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+
+import org.junit.Assert;
+import org.junit.Test;
+import static org.mockito.Matchers.anyObject;
+import org.mockito.Mockito;
+import static org.mockito.Mockito.when;
+
+public class TestSCMContainerPlacementRandom {
+
+  @Test
+  public void chooseDatanodes() throws SCMException {
+    //given
+    Configuration conf = new OzoneConfiguration();
+
+    List<DatanodeDetails> datanodes = new ArrayList<>();
+    for (int i = 0; i < 5; i++) {
+      datanodes.add(TestUtils.getDatanodeDetails());
+    }
+
+    NodeManager mockNodeManager = Mockito.mock(NodeManager.class);
+    when(mockNodeManager.getNodes(NodeState.HEALTHY))
+        .thenReturn(new ArrayList<>(datanodes));
+
+    when(mockNodeManager.getNodeStat(anyObject()))
+        .thenReturn(new SCMNodeMetric(100l, 0l, 100l));
+    when(mockNodeManager.getNodeStat(datanodes.get(2)))
+        .thenReturn(new SCMNodeMetric(100l, 90l, 10l));
+
+    SCMContainerPlacementRandom scmContainerPlacementRandom =
+        new SCMContainerPlacementRandom(mockNodeManager, conf);
+
+    List<DatanodeDetails> existingNodes = new ArrayList<>();
+    existingNodes.add(datanodes.get(0));
+    existingNodes.add(datanodes.get(1));
+
+    for (int i = 0; i < 100; i++) {
+      //when
+      List<DatanodeDetails> datanodeDetails =
+          scmContainerPlacementRandom.chooseDatanodes(existingNodes, 1, 15);
+
+      //then
+      Assert.assertEquals(1, datanodeDetails.size());
+      DatanodeDetails datanode0Details = datanodeDetails.get(0);
+
+      Assert.assertNotEquals(
+          "Datanode 0 should not been selected: excluded by parameter",
+          datanodes.get(0), datanode0Details);
+      Assert.assertNotEquals(
+          "Datanode 1 should not been selected: excluded by parameter",
+          datanodes.get(1), datanode0Details);
+      Assert.assertNotEquals(
+          "Datanode 2 should not been selected: not enough space there",
+          datanodes.get(2), datanode0Details);
+
+    }
+  }
+}

+ 215 - 0
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java

@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.UUID;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ReplicateContainerCommandProto;
+import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.container.ContainerStateManager;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms
+    .ContainerPlacementPolicy;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager
+    .ReplicationRequestToRepeat;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.server.events.EventQueue;
+import org.apache.hadoop.ozone.lease.LeaseManager;
+import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
+
+import com.google.common.base.Preconditions;
+import static org.apache.hadoop.hdds.scm.events.SCMEvents
+    .TRACK_REPLICATE_COMMAND;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import static org.mockito.Matchers.anyObject;
+import org.mockito.Mockito;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test behaviour of the ReplicationManager.
+ */
+public class TestReplicationManager {
+
+  private EventQueue queue;
+
+  private List<ReplicationRequestToRepeat> trackReplicationEvents;
+
+  private List<CommandForDatanode<ReplicateContainerCommandProto>> copyEvents;
+
+  private ContainerStateManager containerStateManager;
+
+  private ContainerPlacementPolicy containerPlacementPolicy;
+  private List<DatanodeDetails> listOfDatanodeDetails;
+
+  @Before
+  public void initReplicationManager() throws IOException {
+
+    listOfDatanodeDetails = TestUtils.getListOfDatanodeDetails(5);
+
+    containerPlacementPolicy =
+        (excludedNodes, nodesRequired, sizeRequired) -> listOfDatanodeDetails
+            .subList(2, 2 + nodesRequired);
+
+    containerStateManager = Mockito.mock(ContainerStateManager.class);
+
+    //container with 2 replicas
+    ContainerInfo containerInfo = new ContainerInfo.Builder()
+        .setState(LifeCycleState.CLOSED)
+        .build();
+
+    when(containerStateManager.getContainer(anyObject()))
+        .thenReturn(containerInfo);
+
+    queue = new EventQueue();
+
+    trackReplicationEvents = new ArrayList<>();
+    queue.addHandler(TRACK_REPLICATE_COMMAND,
+        (event, publisher) -> trackReplicationEvents.add(event));
+
+    copyEvents = new ArrayList<>();
+    queue.addHandler(SCMEvents.DATANODE_COMMAND,
+        (event, publisher) -> copyEvents.add(event));
+
+  }
+
+  @Test
+  public void testEventSending() throws InterruptedException, IOException {
+
+    //GIVEN
+
+    LeaseManager<Long> leaseManager = new LeaseManager<>(100000L);
+    try {
+      leaseManager.start();
+
+      ReplicationManager replicationManager =
+          new ReplicationManager(containerPlacementPolicy,
+              containerStateManager,
+              queue, leaseManager) {
+            @Override
+            protected List<DatanodeDetails> getCurrentReplicas(
+                ReplicationRequest request) throws IOException {
+              return listOfDatanodeDetails.subList(0, 2);
+            }
+          };
+      replicationManager.start();
+
+      //WHEN
+
+      queue.fireEvent(SCMEvents.REPLICATE_CONTAINER,
+          new ReplicationRequest(1l, (short) 2, System.currentTimeMillis(),
+              (short) 3));
+
+      Thread.sleep(500L);
+      queue.processAll(1000L);
+
+      //THEN
+
+      Assert.assertEquals(1, trackReplicationEvents.size());
+      Assert.assertEquals(1, copyEvents.size());
+    } finally {
+      if (leaseManager != null) {
+        leaseManager.shutdown();
+      }
+    }
+  }
+
+  @Test
+  public void testCommandWatcher() throws InterruptedException, IOException {
+
+    Logger.getRootLogger().setLevel(Level.DEBUG);
+    LeaseManager<Long> leaseManager = new LeaseManager<>(1000L);
+
+    try {
+      leaseManager.start();
+
+      ReplicationManager replicationManager =
+          new ReplicationManager(containerPlacementPolicy,
+              containerStateManager, queue, leaseManager) {
+            @Override
+            protected List<DatanodeDetails> getCurrentReplicas(
+                ReplicationRequest request) throws IOException {
+              return listOfDatanodeDetails.subList(0, 2);
+            }
+          };
+      replicationManager.start();
+
+      queue.fireEvent(SCMEvents.REPLICATE_CONTAINER,
+          new ReplicationRequest(1l, (short) 2, System.currentTimeMillis(),
+              (short) 3));
+
+      Thread.sleep(500L);
+
+      queue.processAll(1000L);
+
+      Assert.assertEquals(1, trackReplicationEvents.size());
+      Assert.assertEquals(1, copyEvents.size());
+
+      Assert.assertEquals(trackReplicationEvents.get(0).getId(),
+          copyEvents.get(0).getCommand().getId());
+
+      //event is timed out
+      Thread.sleep(1500);
+
+      queue.processAll(1000L);
+
+      //original copy command + retry
+      Assert.assertEquals(2, trackReplicationEvents.size());
+      Assert.assertEquals(2, copyEvents.size());
+
+    } finally {
+      if (leaseManager != null) {
+        leaseManager.shutdown();
+      }
+    }
+  }
+
+  public static Pipeline createPipeline(Iterable<DatanodeDetails> ids)
+      throws IOException {
+    Objects.requireNonNull(ids, "ids == null");
+    final Iterator<DatanodeDetails> i = ids.iterator();
+    Preconditions.checkArgument(i.hasNext());
+    final DatanodeDetails leader = i.next();
+    String pipelineName = "TEST-" + UUID.randomUUID().toString().substring(3);
+    final Pipeline pipeline =
+        new Pipeline(leader.getUuidString(), LifeCycleState.OPEN,
+            ReplicationType.STAND_ALONE, ReplicationFactor.ONE, pipelineName);
+    pipeline.addMember(leader);
+    for (; i.hasNext(); ) {
+      pipeline.addMember(i.next());
+    }
+    return pipeline;
+  }
+
+}

+ 7 - 7
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationQueue.java → hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationQueue.java

@@ -15,7 +15,7 @@
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  */
-package org.apache.hadoop.ozone.container.replication;
+package org.apache.hadoop.hdds.scm.container.replication;
 
 import java.util.Random;
 import java.util.UUID;
@@ -39,7 +39,7 @@ public class TestReplicationQueue {
   }
 
   @Test
-  public void testDuplicateAddOp() {
+  public void testDuplicateAddOp() throws InterruptedException {
     long contId = random.nextLong();
     String nodeId = UUID.randomUUID().toString();
     ReplicationRequest obj1, obj2, obj3;
@@ -53,12 +53,12 @@ public class TestReplicationQueue {
     replicationQueue.add(obj3);
     Assert.assertEquals("Should add only 1 msg as second one is duplicate",
         1, replicationQueue.size());
-    ReplicationRequest temp = replicationQueue.poll();
+    ReplicationRequest temp = replicationQueue.take();
     Assert.assertEquals(temp, obj3);
   }
 
   @Test
-  public void testPollOp() {
+  public void testPollOp() throws InterruptedException {
     long contId = random.nextLong();
     String nodeId = UUID.randomUUID().toString();
     ReplicationRequest msg1, msg2, msg3, msg4, msg5;
@@ -82,19 +82,19 @@ public class TestReplicationQueue {
     // Since Priority queue orders messages according to replication count,
     // message with lowest replication should be first
     ReplicationRequest temp;
-    temp = replicationQueue.poll();
+    temp = replicationQueue.take();
     Assert.assertEquals("Should have 2 objects",
         2, replicationQueue.size());
     Assert.assertEquals(temp, msg3);
 
-    temp = replicationQueue.poll();
+    temp = replicationQueue.take();
     Assert.assertEquals("Should have 1 objects",
         1, replicationQueue.size());
     Assert.assertEquals(temp, msg5);
 
     // Message 2 should be ordered before message 5 as both have same replication
     // number but message 2 has earlier timestamp.
-    temp = replicationQueue.poll();
+    temp = replicationQueue.take();
     Assert.assertEquals("Should have 0 objects",
         replicationQueue.size(), 0);
     Assert.assertEquals(temp, msg4);

+ 1 - 1
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java → hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/package-info.java

@@ -19,5 +19,5 @@
 /**
  * SCM Testing and Mocking Utils.
  */
-package org.apache.hadoop.ozone.container.replication;
+package org.apache.hadoop.hdds.scm.container.replication;
 // Test classes for Replication functionality.

+ 3 - 2
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/placement/TestContainerPlacement.java

@@ -30,6 +30,7 @@ import org.apache.hadoop.ozone.OzoneConsts;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 
@@ -86,11 +87,11 @@ public class TestContainerPlacement {
     for (int x = 0; x < opsCount; x++) {
       long containerSize = random.nextInt(100) * OzoneConsts.GB;
       List<DatanodeDetails> nodesCapacity =
-          capacityPlacer.chooseDatanodes(nodesRequired, containerSize);
+          capacityPlacer.chooseDatanodes(new ArrayList<>(), nodesRequired, containerSize);
       assertEquals(nodesRequired, nodesCapacity.size());
 
       List<DatanodeDetails> nodesRandom =
-          randomPlacer.chooseDatanodes(nodesRequired, containerSize);
+          randomPlacer.chooseDatanodes(nodesCapacity, nodesRequired, containerSize);
 
       // One fifth of all calls are delete
       if (x % 5 == 0) {