Browse Source

HDDS-700. Support rack awared node placement policy based on network topology. Contributed by Sammi Chen.

Xiaoyu Yao 6 years ago
parent
commit
20a4ec351c
13 changed files with 662 additions and 34 deletions
  1. 29 8
      hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
  2. 1 1
      hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
  3. 2 1
      hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicy.java
  4. 4 6
      hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMCommonPolicy.java
  5. 5 4
      hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java
  6. 329 0
      hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackAware.java
  7. 5 3
      hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java
  8. 23 4
      hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
  9. 1 1
      hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
  10. 2 2
      hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementCapacity.java
  11. 257 0
      hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java
  12. 2 2
      hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRandom.java
  13. 2 2
      hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/placement/TestContainerPlacement.java

+ 29 - 8
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java

@@ -22,6 +22,8 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.net.NetConstants;
+import org.apache.hadoop.hdds.scm.net.NodeImpl;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -35,9 +37,9 @@ import java.util.UUID;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public class DatanodeDetails implements Comparable<DatanodeDetails> {
-
-  /**
+public class DatanodeDetails extends NodeImpl implements
+    Comparable<DatanodeDetails> {
+/**
    * DataNode's unique identifier in the cluster.
    */
   private final UUID uuid;
@@ -47,18 +49,19 @@ public class DatanodeDetails implements Comparable<DatanodeDetails> {
   private List<Port> ports;
   private String certSerialId;
 
-
   /**
    * Constructs DatanodeDetails instance. DatanodeDetails.Builder is used
    * for instantiating DatanodeDetails.
    * @param uuid DataNode's UUID
    * @param ipAddress IP Address of this DataNode
    * @param hostName DataNode's hostname
+   * @param networkLocation DataNode's network location path
    * @param ports Ports used by the DataNode
    * @param certSerialId serial id from SCM issued certificate.
    */
   private DatanodeDetails(String uuid, String ipAddress, String hostName,
-      List<Port> ports, String certSerialId) {
+      String networkLocation, List<Port> ports, String certSerialId) {
+    super(hostName, networkLocation, NetConstants.NODE_COST_DEFAULT);
     this.uuid = UUID.fromString(uuid);
     this.ipAddress = ipAddress;
     this.hostName = hostName;
@@ -67,6 +70,8 @@ public class DatanodeDetails implements Comparable<DatanodeDetails> {
   }
 
   protected DatanodeDetails(DatanodeDetails datanodeDetails) {
+    super(datanodeDetails.getHostName(), datanodeDetails.getNetworkLocation(),
+        datanodeDetails.getCost());
     this.uuid = datanodeDetails.uuid;
     this.ipAddress = datanodeDetails.ipAddress;
     this.hostName = datanodeDetails.hostName;
@@ -223,6 +228,8 @@ public class DatanodeDetails implements Comparable<DatanodeDetails> {
         ipAddress +
         ", host: " +
         hostName +
+        ", networkLocation: " +
+        getNetworkLocation() +
         ", certSerialId: " + certSerialId +
         "}";
   }
@@ -259,6 +266,7 @@ public class DatanodeDetails implements Comparable<DatanodeDetails> {
     private String id;
     private String ipAddress;
     private String hostName;
+    private String networkLocation;
     private List<Port> ports;
     private String certSerialId;
 
@@ -303,6 +311,17 @@ public class DatanodeDetails implements Comparable<DatanodeDetails> {
       return this;
     }
 
+    /**
+     * Sets the network location of DataNode.
+     * If no location is set, or it is set to null, build() falls back to
+     * NetConstants.DEFAULT_RACK.
+     *
+     * @param loc network location path of the DataNode, may be null
+     * @return DatanodeDetails.Builder
+     */
+    public Builder setNetworkLocation(String loc) {
+      this.networkLocation = loc;
+      return this;
+    }
+
     /**
      * Adds a DataNode Port.
      *
@@ -334,9 +353,12 @@ public class DatanodeDetails implements Comparable<DatanodeDetails> {
      */
     public DatanodeDetails build() {
       Preconditions.checkNotNull(id);
-      return new DatanodeDetails(id, ipAddress, hostName, ports, certSerialId);
+      if (networkLocation == null) {
+        networkLocation = NetConstants.DEFAULT_RACK;
+      }
+      return new DatanodeDetails(id, ipAddress, hostName, networkLocation,
+          ports, certSerialId);
     }
-
   }
 
   /**
@@ -437,5 +459,4 @@ public class DatanodeDetails implements Comparable<DatanodeDetails> {
   public void setCertSerialId(String certSerialId) {
     this.certSerialId = certSerialId;
   }
-
 }

+ 1 - 1
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java

@@ -484,7 +484,7 @@ public class ReplicationManager {
             .getReplicationFactor().getNumber();
         final int delta = replicationFactor - getReplicaCount(id, replicas);
         final List<DatanodeDetails> selectedDatanodes = containerPlacement
-            .chooseDatanodes(source, delta, container.getUsedBytes());
+            .chooseDatanodes(source, null, delta, container.getUsedBytes());
 
         LOG.info("Container {} is under replicated. Expected replica count" +
                 " is {}, but found {}.", id, replicationFactor,

+ 2 - 1
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicy.java

@@ -33,12 +33,13 @@ public interface ContainerPlacementPolicy {
    * that satisfy the nodes and size requirement.
    *
    * @param excludedNodes - list of nodes to be excluded.
+   * @param favoredNodes - list of nodes preferred.
    * @param nodesRequired - number of datanodes required.
    * @param sizeRequired - size required for the container or block.
    * @return list of datanodes chosen.
    * @throws IOException
    */
   List<DatanodeDetails> chooseDatanodes(List<DatanodeDetails> excludedNodes,
-      int nodesRequired, long sizeRequired)
+      List<DatanodeDetails> favoredNodes, int nodesRequired, long sizeRequired)
       throws IOException;
 }

+ 4 - 6
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMCommonPolicy.java

@@ -97,6 +97,7 @@ public abstract class SCMCommonPolicy implements ContainerPlacementPolicy {
    *
    *
    * @param excludedNodes - datanodes with existing replicas
+   * @param favoredNodes - list of nodes preferred.
    * @param nodesRequired - number of datanodes required.
    * @param sizeRequired - size required for the container or block.
    * @return list of datanodes chosen.
@@ -104,7 +105,7 @@ public abstract class SCMCommonPolicy implements ContainerPlacementPolicy {
    */
   @Override
   public List<DatanodeDetails> chooseDatanodes(
-      List<DatanodeDetails> excludedNodes,
+      List<DatanodeDetails> excludedNodes, List<DatanodeDetails> favoredNodes,
       int nodesRequired, final long sizeRequired) throws SCMException {
     List<DatanodeDetails> healthyNodes =
         nodeManager.getNodes(HddsProtos.NodeState.HEALTHY);
@@ -137,7 +138,6 @@ public abstract class SCMCommonPolicy implements ContainerPlacementPolicy {
       throw new SCMException(msg,
           SCMException.ResultCodes.FAILED_TO_FIND_NODES_WITH_SPACE);
     }
-
     return healthyList;
   }
 
@@ -147,8 +147,8 @@ public abstract class SCMCommonPolicy implements ContainerPlacementPolicy {
    * @param datanodeDetails DatanodeDetails
    * @return true if we have enough space.
    */
-  private boolean hasEnoughSpace(DatanodeDetails datanodeDetails,
-                                 long sizeRequired) {
+  boolean hasEnoughSpace(DatanodeDetails datanodeDetails,
+      long sizeRequired) {
     SCMNodeMetric nodeMetric = nodeManager.getNodeStat(datanodeDetails);
     return (nodeMetric != null) && (nodeMetric.get() != null)
         && nodeMetric.get().getRemaining().hasResources(sizeRequired);
@@ -196,6 +196,4 @@ public abstract class SCMCommonPolicy implements ContainerPlacementPolicy {
    */
   public abstract DatanodeDetails chooseNode(
       List<DatanodeDetails> healthyNodes);
-
-
 }

+ 5 - 4
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java

@@ -86,6 +86,7 @@ public final class SCMContainerPlacementCapacity extends SCMCommonPolicy {
    *
    *
    * @param excludedNodes - list of the datanodes to exclude.
+   * @param favoredNodes - list of nodes preferred.
    * @param nodesRequired - number of datanodes required.
    * @param sizeRequired - size required for the container or block.
    * @return List of datanodes.
@@ -93,10 +94,10 @@ public final class SCMContainerPlacementCapacity extends SCMCommonPolicy {
    */
   @Override
   public List<DatanodeDetails> chooseDatanodes(
-      List<DatanodeDetails> excludedNodes, final int nodesRequired,
-      final long sizeRequired) throws SCMException {
-    List<DatanodeDetails> healthyNodes =
-        super.chooseDatanodes(excludedNodes, nodesRequired, sizeRequired);
+      List<DatanodeDetails> excludedNodes, List<DatanodeDetails> favoredNodes,
+      final int nodesRequired, final long sizeRequired) throws SCMException {
+    List<DatanodeDetails> healthyNodes = super.chooseDatanodes(excludedNodes,
+        favoredNodes, nodesRequired, sizeRequired);
     if (healthyNodes.size() == nodesRequired) {
       return healthyNodes;
     }

+ 329 - 0
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackAware.java

@@ -0,0 +1,329 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.placement.algorithms;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.net.NetConstants;
+import org.apache.hadoop.hdds.scm.net.NetworkTopology;
+import org.apache.hadoop.hdds.scm.net.Node;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Container placement policy that chooses datanodes with network topology
+ * awareness, together with the space to satisfy the size constraints.
+ * <p>
+ * This placement policy complies with the algorithm used in HDFS. With the
+ * default of 3 replicas, two replicas will be on the same rack and the third
+ * one will be on a different rack.
+ * <p>
+ * This implementation applies to network topologies like "/rack/node". It is
+ * not recommended if the network topology has more layers.
+ * <p>
+ */
+public final class SCMContainerPlacementRackAware extends SCMCommonPolicy {
+  @VisibleForTesting
+  static final Logger LOG =
+      LoggerFactory.getLogger(SCMContainerPlacementRackAware.class);
+  // Ancestor generation handed to the topology when a node is picked.
+  private static final int RACK_LEVEL = 1;
+  // Candidates without enough space that are tried before giving up.
+  private static final int MAX_RETRY = 3;
+  private final NetworkTopology networkTopology;
+  // Whether chooseNode may relax its constraints when no node matches.
+  private final boolean fallback;
+
+  /**
+   * Constructs a Container Placement with rack awareness.
+   *
+   * @param nodeManager Node Manager
+   * @param conf Configuration
+   * @param networkTopology Network topology the datanodes are chosen from
+   * @param fallback Whether to relax the constraints and still choose a data
+   *                 node when no node satisfies all of them.
+   *                 Basically, false for open container placement, and true
+   *                 for closed container placement.
+   */
+  public SCMContainerPlacementRackAware(final NodeManager nodeManager,
+      final Configuration conf, final NetworkTopology networkTopology,
+      final boolean fallback) {
+    super(nodeManager, conf);
+    this.networkTopology = networkTopology;
+    this.fallback = fallback;
+  }
+
+  /**
+   * Called by SCM to choose datanodes.
+   * There are two scenarios, one is choosing all nodes for a new pipeline.
+   * Another is choosing node to meet replication requirement.
+   *
+   *
+   * @param excludedNodes - list of the datanodes to exclude.
+   * @param favoredNodes - list of nodes preferred. This is a hint to the
+   *                     allocator, whether the favored nodes will be used
+   *                     depends on whether the nodes meet the allocator's
+   *                     requirement.
+   * @param nodesRequired - number of datanodes required.
+   * @param sizeRequired - size required for the container or block.
+   * @return List of datanodes.
+   * @throws SCMException  if the topology holds fewer datanodes than
+   *                       nodesRequired plus the excluded ones, or if no
+   *                       suitable datanode can be found.
+   */
+  @Override
+  public List<DatanodeDetails> chooseDatanodes(
+      List<DatanodeDetails> excludedNodes, List<DatanodeDetails> favoredNodes,
+      int nodesRequired, final long sizeRequired) throws SCMException {
+    Preconditions.checkArgument(nodesRequired > 0);
+
+    int datanodeCount = networkTopology.getNumOfLeafNode(NetConstants.ROOT);
+    int excludedNodesCount = excludedNodes == null ? 0 : excludedNodes.size();
+    if (datanodeCount < nodesRequired + excludedNodesCount) {
+      throw new SCMException("No enough datanodes to choose.", null);
+    }
+    List<DatanodeDetails> mutableFavoredNodes = favoredNodes;
+    // sanity check of favoredNodes: an excluded node can never be favored.
+    // Work on a copy so the caller's list is left untouched.
+    if (mutableFavoredNodes != null && excludedNodes != null) {
+      mutableFavoredNodes = new ArrayList<>();
+      mutableFavoredNodes.addAll(favoredNodes);
+      mutableFavoredNodes.removeAll(excludedNodes);
+    }
+    int favoredNodeNum = mutableFavoredNodes == null? 0 :
+        mutableFavoredNodes.size();
+
+    List<Node> chosenNodes = new ArrayList<>();
+    int favorIndex = 0;
+    if (excludedNodes == null || excludedNodes.isEmpty()) {
+      // choose all nodes for a new pipeline case
+      // choose first datanode from scope ROOT or from favoredNodes if not null
+      Node favoredNode = favoredNodeNum > favorIndex ?
+          mutableFavoredNodes.get(favorIndex) : null;
+      Node firstNode;
+      if (favoredNode != null) {
+        firstNode = favoredNode;
+        favorIndex++;
+      } else {
+        firstNode = chooseNode(null, null, sizeRequired);
+      }
+      chosenNodes.add(firstNode);
+      nodesRequired--;
+      if (nodesRequired == 0) {
+        return Arrays.asList(chosenNodes.toArray(new DatanodeDetails[0]));
+      }
+
+      // choose second datanode on the same rack as first one
+      favoredNode = favoredNodeNum > favorIndex ?
+          mutableFavoredNodes.get(favorIndex) : null;
+      Node secondNode;
+      if (favoredNode != null &&
+          networkTopology.isSameParent(firstNode, favoredNode)) {
+        secondNode = favoredNode;
+        favorIndex++;
+      } else {
+        secondNode = chooseNode(chosenNodes, firstNode, sizeRequired);
+      }
+      chosenNodes.add(secondNode);
+      nodesRequired--;
+      if (nodesRequired == 0) {
+        return Arrays.asList(chosenNodes.toArray(new DatanodeDetails[0]));
+      }
+
+      // choose remaining datanodes on different rack as first and second
+      return chooseNodes(null, chosenNodes, mutableFavoredNodes, favorIndex,
+          nodesRequired, sizeRequired);
+    } else {
+      List<Node> mutableExcludedNodes = new ArrayList<>();
+      mutableExcludedNodes.addAll(excludedNodes);
+      // choose node to meet replication requirement
+      // case 1: one excluded node, choose one on the same rack as the excluded
+      // node, choose others on different racks.
+      Node favoredNode;
+      if (excludedNodes.size() == 1) {
+        favoredNode = favoredNodeNum > favorIndex ?
+            mutableFavoredNodes.get(favorIndex) : null;
+        Node firstNode;
+        if (favoredNode != null &&
+            networkTopology.isSameParent(excludedNodes.get(0), favoredNode)) {
+          firstNode = favoredNode;
+          favorIndex++;
+        } else {
+          firstNode = chooseNode(mutableExcludedNodes, excludedNodes.get(0),
+              sizeRequired);
+        }
+        chosenNodes.add(firstNode);
+        // keep the caller's excluded node excluded for the remaining picks,
+        // the same way case 2 below does
+        mutableExcludedNodes.add(firstNode);
+        nodesRequired--;
+        if (nodesRequired == 0) {
+          return Arrays.asList(chosenNodes.toArray(new DatanodeDetails[0]));
+        }
+        // choose remaining nodes on different racks
+        return chooseNodes(mutableExcludedNodes, chosenNodes,
+            mutableFavoredNodes, favorIndex, nodesRequired, sizeRequired);
+      }
+      // case 2: two or more excluded nodes, if these two nodes are
+      // in the same rack, then choose nodes on different racks, otherwise,
+      // choose one on the same rack as one of excluded nodes, remaining chosen
+      // are on different racks.
+      for(int i = 0; i < excludedNodesCount; i++) {
+        for (int j = i + 1; j < excludedNodesCount; j++) {
+          if (networkTopology.isSameParent(
+              excludedNodes.get(i), excludedNodes.get(j))) {
+            // choose remaining nodes on different racks
+            return chooseNodes(mutableExcludedNodes, chosenNodes,
+                mutableFavoredNodes, favorIndex, nodesRequired, sizeRequired);
+          }
+        }
+      }
+      // choose one data on the same rack with one excluded node
+      favoredNode = favoredNodeNum > favorIndex ?
+          mutableFavoredNodes.get(favorIndex) : null;
+      Node secondNode;
+      if (favoredNode != null && networkTopology.isSameParent(
+          mutableExcludedNodes.get(0), favoredNode)) {
+        secondNode = favoredNode;
+        favorIndex++;
+      } else {
+        // Exclude every node the caller excluded: chosenNodes is still empty
+        // here, so passing it would let an excluded node be handed back.
+        secondNode = chooseNode(mutableExcludedNodes,
+            mutableExcludedNodes.get(0), sizeRequired);
+      }
+      chosenNodes.add(secondNode);
+      mutableExcludedNodes.add(secondNode);
+      nodesRequired--;
+      if (nodesRequired == 0) {
+        return Arrays.asList(chosenNodes.toArray(new DatanodeDetails[0]));
+      }
+      // choose remaining nodes on different racks
+      return chooseNodes(mutableExcludedNodes, chosenNodes, mutableFavoredNodes,
+          favorIndex, nodesRequired, sizeRequired);
+    }
+  }
+
+  /**
+   * Always returns null. Within this class, nodes are picked through the
+   * private chooseNode(excludedNodes, affinityNode, sizeRequired) overload,
+   * and this list-based variant is not called.
+   *
+   * @param healthyNodes - ignored.
+   * @return always null.
+   */
+  @Override
+  public DatanodeDetails chooseNode(List<DatanodeDetails> healthyNodes) {
+    return null;
+  }
+
+  /**
+   * Choose a datanode which meets the requirements. If there is no node which
+   * meets all the requirements, the constraints are relaxed one at a time,
+   * provided fallback was allowed when this class was instantiated: first the
+   * affinity node is dropped, then the ancestor generation is lowered.
+   *
+   *
+   * @param excludedNodes - list of the datanodes to exclude. Can be null.
+   * @param affinityNode - the chosen node should be on the same rack as
+   *                    affinityNode. Can be null.
+   * @param sizeRequired - size required for the container or block.
+   * @return the chosen datanode.
+   * @throws SCMException  if no node satisfies the constraints, or if
+   *                       MAX_RETRY candidates lack the required space.
+   */
+  private Node chooseNode(List<Node> excludedNodes, Node affinityNode,
+      long sizeRequired) throws SCMException {
+    int ancestorGen = RACK_LEVEL;
+    int maxRetry = MAX_RETRY;
+    // local copy, so relaxing the constraint does not rewrite the parameter
+    Node affinity = affinityNode;
+    while (true) {
+      Node node = networkTopology.chooseRandom(NetConstants.ROOT, null,
+          excludedNodes, affinity, ancestorGen);
+      if (node == null) {
+        // cannot find the node which meets all constraints
+        LOG.warn("Failed to find the datanode. excludedNodes:{}, "
+            + "affinityNode:{}",
+            excludedNodes == null ? "" : excludedNodes.toString(),
+            affinity == null ? "" : affinity.getNetworkFullPath());
+        if (fallback) {
+          // fallback, don't consider the affinity node
+          if (affinity != null) {
+            affinity = null;
+            continue;
+          }
+          // fallback, don't consider cross rack
+          if (ancestorGen == RACK_LEVEL) {
+            ancestorGen--;
+            continue;
+          }
+        }
+        // there is no constraint left to relax, or fallback is disabled
+        throw new SCMException("No satisfied datanode to meet the " +
+            " excludedNodes and affinityNode constrains.", null);
+      }
+      if (hasEnoughSpace((DatanodeDetails)node, sizeRequired)) {
+        LOG.debug("Datanode {} is chosen. Required size is {}",
+            node, sizeRequired);
+        return node;
+      }
+      maxRetry--;
+      if (maxRetry == 0) {
+        // avoid the infinite loop
+        String errMsg = "No satisfied datanode to meet the space constrains. "
+            + " sizeRequired: " + sizeRequired;
+        LOG.info(errMsg);
+        // same result code SCMCommonPolicy reports for the same condition
+        throw new SCMException(errMsg,
+            SCMException.ResultCodes.FAILED_TO_FIND_NODES_WITH_SPACE);
+      }
+    }
+  }
+
+  /**
+   * Choose a batch of datanodes on different rack than excludedNodes or
+   * chosenNodes.
+   * <p>
+   * Both list arguments are modified: every node picked here is appended to
+   * chosenNodes, and also to excludedNodes when that list is not null.
+   *
+   *
+   * @param excludedNodes - list of the datanodes to excluded. Can be null, in
+   *                      which case chosenNodes is used as the exclusion list.
+   * @param chosenNodes - list of nodes already chosen. These nodes should also
+   *                    be excluded. Cannot be null.
+   * @param favoredNodes - list of favoredNodes. It's a hint. Whether the nodes
+   *                     are chosen depends on whether they meet the constrains.
+   *                     Can be null.
+   * @param favorIndex - the node index of favoredNodes which is not chosen yet.
+   * @param nodesRequired - number of datanodes required.
+   * @param sizeRequired - size required for the container or block.
+   * @return List of chosen datanodes, i.e. the content of chosenNodes once
+   *         nodesRequired further nodes have been added.
+   * @throws SCMException  SCMException
+   */
+  private List<DatanodeDetails> chooseNodes(List<Node> excludedNodes,
+      List<Node> chosenNodes, List<DatanodeDetails> favoredNodes,
+      int favorIndex, int nodesRequired, long sizeRequired)
+      throws SCMException {
+    Preconditions.checkArgument(chosenNodes != null);
+    // Without a separate exclusion list, the chosen nodes themselves are the
+    // exclusion list (same list instance, see the identity check below).
+    List<Node> excludedNodeList = excludedNodes != null ?
+        excludedNodes : chosenNodes;
+    int favoredNodeNum = favoredNodes == null? 0 : favoredNodes.size();
+    while(true) {
+      Node favoredNode = favoredNodeNum > favorIndex ?
+          favoredNodes.get(favorIndex) : null;
+      Node chosenNode;
+      // NOTE(review): the favored node is accepted when it shares a parent
+      // with the most recently excluded node, although this method is
+      // documented to pick nodes on different racks - confirm this condition
+      // is intended.
+      if (favoredNode != null && networkTopology.isSameParent(
+          excludedNodeList.get(excludedNodeList.size() - 1), favoredNode)) {
+        chosenNode = favoredNode;
+        favorIndex++;
+      } else {
+        chosenNode = chooseNode(excludedNodeList, null, sizeRequired);
+      }
+      excludedNodeList.add(chosenNode);
+      // When both names refer to the same list the node was already added
+      // above; adding again would duplicate it.
+      if (excludedNodeList != chosenNodes) {
+        chosenNodes.add(chosenNode);
+      }
+      nodesRequired--;
+      if (nodesRequired == 0) {
+        return Arrays.asList(chosenNodes.toArray(new DatanodeDetails[0]));
+      }
+    }
+  }
+}

+ 5 - 3
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java

@@ -58,6 +58,7 @@ public final class SCMContainerPlacementRandom extends SCMCommonPolicy
    *
    *
    * @param excludedNodes - list of the datanodes to exclude.
+   * @param favoredNodes - list of nodes preferred.
    * @param nodesRequired - number of datanodes required.
    * @param sizeRequired - size required for the container or block.
    * @return List of Datanodes.
@@ -65,10 +66,11 @@ public final class SCMContainerPlacementRandom extends SCMCommonPolicy
    */
   @Override
   public List<DatanodeDetails> chooseDatanodes(
-      List<DatanodeDetails> excludedNodes, final int nodesRequired,
-      final long sizeRequired) throws SCMException {
+      List<DatanodeDetails> excludedNodes, List<DatanodeDetails> favoredNodes,
+      final int nodesRequired, final long sizeRequired) throws SCMException {
     List<DatanodeDetails> healthyNodes =
-        super.chooseDatanodes(excludedNodes, nodesRequired, sizeRequired);
+        super.chooseDatanodes(excludedNodes, favoredNodes, nodesRequired,
+            sizeRequired);
 
     if (healthyNodes.size() == nodesRequired) {
       return healthyNodes;

+ 23 - 4
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java

@@ -96,6 +96,22 @@ public final class TestUtils {
     return createDatanodeDetails(UUID.randomUUID());
   }
 
+  /**
+   * Creates DatanodeDetails with a random UUID and a randomly generated
+   * dotted-quad IP address, using the given hostname and network location.
+   *
+   * @param hostname hostname of the datanode
+   * @param loc network location of the datanode
+   * @return DatanodeDetails
+   */
+  public static DatanodeDetails createDatanodeDetails(String hostname,
+       String loc) {
+    // four random octets joined by dots, drawn in left-to-right order
+    StringBuilder address = new StringBuilder();
+    for (int octet = 0; octet < 4; octet++) {
+      if (octet > 0) {
+        address.append(".");
+      }
+      address.append(random.nextInt(256));
+    }
+    String uuid = UUID.randomUUID().toString();
+    return createDatanodeDetails(uuid, hostname, address.toString(), loc);
+  }
+
   /**
    * Creates DatanodeDetails using the given UUID.
    *
@@ -108,7 +124,8 @@ public final class TestUtils {
         + "." + random.nextInt(256)
         + "." + random.nextInt(256)
         + "." + random.nextInt(256);
-    return createDatanodeDetails(uuid.toString(), "localhost", ipAddress);
+    return createDatanodeDetails(uuid.toString(), "localhost", ipAddress,
+        null);
   }
 
   /**
@@ -121,7 +138,8 @@ public final class TestUtils {
   public static DatanodeDetails getDatanodeDetails(
       RegisteredCommand registeredCommand) {
     return createDatanodeDetails(registeredCommand.getDatanodeUUID(),
-        registeredCommand.getHostName(), registeredCommand.getIpAddress());
+        registeredCommand.getHostName(), registeredCommand.getIpAddress(),
+        null);
   }
 
   /**
@@ -134,7 +152,7 @@ public final class TestUtils {
    * @return DatanodeDetails
    */
   private static DatanodeDetails createDatanodeDetails(String uuid,
-      String hostname, String ipAddress) {
+      String hostname, String ipAddress, String networkLocation) {
     DatanodeDetails.Port containerPort = DatanodeDetails.newPort(
         DatanodeDetails.Port.Name.STANDALONE, 0);
     DatanodeDetails.Port ratisPort = DatanodeDetails.newPort(
@@ -147,7 +165,8 @@ public final class TestUtils {
         .setIpAddress(ipAddress)
         .addPort(containerPort)
         .addPort(ratisPort)
-        .addPort(restPort);
+        .addPort(restPort)
+        .setNetworkLocation(networkLocation);
     return builder.build();
   }
 

+ 1 - 1
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java

@@ -96,7 +96,7 @@ public class TestReplicationManager {
     containerPlacementPolicy = Mockito.mock(ContainerPlacementPolicy.class);
 
     Mockito.when(containerPlacementPolicy.chooseDatanodes(
-        Mockito.anyListOf(DatanodeDetails.class),
+        Mockito.anyListOf(DatanodeDetails.class), null,
         Mockito.anyInt(), Mockito.anyLong()))
         .thenAnswer(invocation -> {
           int count = (int) invocation.getArguments()[1];

+ 2 - 2
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementCapacity.java

@@ -78,8 +78,8 @@ public class TestSCMContainerPlacementCapacity {
     for (int i = 0; i < 1000; i++) {
 
       //when
-      List<DatanodeDetails> datanodeDetails =
-          scmContainerPlacementRandom.chooseDatanodes(existingNodes, 1, 15);
+      List<DatanodeDetails> datanodeDetails = scmContainerPlacementRandom
+          .chooseDatanodes(existingNodes, null, 1, 15);
 
       //then
       Assert.assertEquals(1, datanodeDetails.size());

+ 257 - 0
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java

@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container.placement.algorithms;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
+import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.net.NetworkTopology;
+import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
+import org.apache.hadoop.hdds.scm.net.NodeSchema;
+import org.apache.hadoop.hdds.scm.net.NodeSchemaManager;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.hadoop.hdds.scm.net.NetConstants.LEAF_SCHEMA;
+import static org.apache.hadoop.hdds.scm.net.NetConstants.RACK_SCHEMA;
+import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT_SCHEMA;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link SCMContainerPlacementRackAware}, the rack aware container
+ * placement policy.
+ * <p>
+ * Every test runs against a three level topology (root / rack / leaf) that
+ * holds 15 datanodes spread evenly over 3 racks, backed by a mocked
+ * {@link NodeManager}.
+ */
+public class TestSCMContainerPlacementRackAware {
+  // node storage capacity
+  private static final long STORAGE_CAPACITY = 100L;
+
+  private NetworkTopology cluster;
+  private final List<DatanodeDetails> datanodes = new ArrayList<>();
+  // policy with fallback capability
+  private SCMContainerPlacementRackAware policy;
+  // policy prohibit fallback
+  private SCMContainerPlacementRackAware policyNoFallback;
+
+  /**
+   * Builds the network topology, the mocked node manager and the two
+   * placement policy instances (with and without fallback) used by the tests.
+   */
+  @Before
+  public void setup() {
+    //initialize network topology instance
+    Configuration conf = new OzoneConfiguration();
+    NodeSchema[] schemas = new NodeSchema[]
+        {ROOT_SCHEMA, RACK_SCHEMA, LEAF_SCHEMA};
+    NodeSchemaManager.getInstance().init(schemas, true);
+    cluster = new NetworkTopologyImpl(NodeSchemaManager.getInstance());
+
+    // build datanodes, and network topology
+    String rack = "/rack";
+    String hostname = "node";
+    for (int i = 0; i < 15; i++) {
+      // 3 racks in total, each has 5 datanodes
+      DatanodeDetails node = TestUtils.createDatanodeDetails(
+          hostname + i, rack + (i / 5));
+      datanodes.add(node);
+      cluster.add(node);
+    }
+
+    // create mock node manager
+    NodeManager nodeManager = Mockito.mock(NodeManager.class);
+    when(nodeManager.getNodes(NodeState.HEALTHY))
+        .thenReturn(new ArrayList<>(datanodes));
+    // default stat for every node; datanodes 2, 3 and 4 are overridden below
+    when(nodeManager.getNodeStat(anyObject()))
+        .thenReturn(new SCMNodeMetric(STORAGE_CAPACITY, 0L, 100L));
+    when(nodeManager.getNodeStat(datanodes.get(2)))
+        .thenReturn(new SCMNodeMetric(STORAGE_CAPACITY, 90L, 10L));
+    when(nodeManager.getNodeStat(datanodes.get(3)))
+        .thenReturn(new SCMNodeMetric(STORAGE_CAPACITY, 80L, 20L));
+    when(nodeManager.getNodeStat(datanodes.get(4)))
+        .thenReturn(new SCMNodeMetric(STORAGE_CAPACITY, 70L, 30L));
+
+    // create placement policy instances
+    policy =
+        new SCMContainerPlacementRackAware(nodeManager, conf, cluster, true);
+    policyNoFallback =
+        new SCMContainerPlacementRackAware(nodeManager, conf, cluster, false);
+  }
+
+
+  /**
+   * Chooses 1 to 4 datanodes without excluded or favored nodes and checks
+   * the rack relationship between the chosen nodes.
+   *
+   * @throws SCMException if the policy fails to choose the datanodes
+   */
+  @Test
+  public void chooseNodeWithNoExcludedNodes() throws SCMException {
+    // test choose new datanodes for new pipeline cases
+    // 1 replica
+    int nodeNum = 1;
+    List<DatanodeDetails> datanodeDetails =
+        policy.chooseDatanodes(null, null, nodeNum, 15);
+    Assert.assertEquals(nodeNum, datanodeDetails.size());
+
+    // 2 replicas
+    nodeNum = 2;
+    datanodeDetails = policy.chooseDatanodes(null, null, nodeNum, 15);
+    Assert.assertEquals(nodeNum, datanodeDetails.size());
+    Assert.assertTrue(cluster.isSameParent(datanodeDetails.get(0),
+        datanodeDetails.get(1)));
+
+    //  3 replicas
+    nodeNum = 3;
+    datanodeDetails = policy.chooseDatanodes(null, null, nodeNum, 15);
+    Assert.assertEquals(nodeNum, datanodeDetails.size());
+    Assert.assertTrue(cluster.isSameParent(datanodeDetails.get(0),
+        datanodeDetails.get(1)));
+    Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(0),
+        datanodeDetails.get(2)));
+    Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(1),
+        datanodeDetails.get(2)));
+
+    //  4 replicas
+    nodeNum = 4;
+    datanodeDetails = policy.chooseDatanodes(null, null, nodeNum, 15);
+    Assert.assertEquals(nodeNum, datanodeDetails.size());
+    Assert.assertTrue(cluster.isSameParent(datanodeDetails.get(0),
+        datanodeDetails.get(1)));
+    Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(0),
+        datanodeDetails.get(2)));
+    Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(1),
+        datanodeDetails.get(2)));
+    Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(0),
+        datanodeDetails.get(3)));
+    Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(2),
+        datanodeDetails.get(3)));
+  }
+
+  /**
+   * Chooses the missing datanodes of an under replicated pipeline, passing
+   * the already used datanodes as excluded nodes.
+   *
+   * @throws SCMException if the policy fails to choose the datanodes
+   */
+  @Test
+  public void chooseNodeWithExcludedNodes() throws SCMException {
+    // test choose new datanodes for under replicated pipeline
+    // 3 replicas, two existing datanodes on same rack
+    int nodeNum = 1;
+    List<DatanodeDetails> excludedNodes = new ArrayList<>();
+
+    excludedNodes.add(datanodes.get(0));
+    excludedNodes.add(datanodes.get(1));
+    List<DatanodeDetails> datanodeDetails = policy.chooseDatanodes(
+        excludedNodes, null, nodeNum, 15);
+    Assert.assertEquals(nodeNum, datanodeDetails.size());
+    Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(0),
+        excludedNodes.get(0)));
+    Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(0),
+        excludedNodes.get(1)));
+
+    // 3 replicas, two existing datanodes on different rack
+    excludedNodes.clear();
+    excludedNodes.add(datanodes.get(0));
+    excludedNodes.add(datanodes.get(7));
+    datanodeDetails = policy.chooseDatanodes(
+        excludedNodes, null, nodeNum, 15);
+    Assert.assertEquals(nodeNum, datanodeDetails.size());
+    Assert.assertTrue(cluster.isSameParent(
+        datanodeDetails.get(0), excludedNodes.get(0)) ||
+        cluster.isSameParent(datanodeDetails.get(0), excludedNodes.get(1)));
+
+    // 3 replicas, one existing datanode
+    nodeNum = 2;
+    excludedNodes.clear();
+    excludedNodes.add(datanodes.get(0));
+    datanodeDetails = policy.chooseDatanodes(
+        excludedNodes, null, nodeNum, 15);
+    Assert.assertEquals(nodeNum, datanodeDetails.size());
+    // excludedNodes holds a single entry here, so compare each of the two
+    // chosen nodes against it; reading excludedNodes.get(1) would throw
+    // IndexOutOfBoundsException.
+    Assert.assertTrue(cluster.isSameParent(
+        datanodeDetails.get(0), excludedNodes.get(0)) ||
+        cluster.isSameParent(datanodeDetails.get(1), excludedNodes.get(0)));
+  }
+
+  /**
+   * Requests more replicas than the rack rule can satisfy from a policy
+   * that is allowed to fall back, and expects all of them to be allocated.
+   *
+   * @throws SCMException if the policy fails to choose the datanodes
+   */
+  @Test
+  public void testFallback() throws SCMException {
+
+    // 5 replicas. there are only 3 racks. policy with fallback should
+    // allocate the 5th datanode though it will break the rack rule(first
+    // 2 replicas on same rack, others are different racks).
+    int nodeNum = 5;
+    List<DatanodeDetails>  datanodeDetails =
+        policy.chooseDatanodes(null, null, nodeNum, 15);
+    Assert.assertEquals(nodeNum, datanodeDetails.size());
+    Assert.assertTrue(cluster.isSameParent(datanodeDetails.get(0),
+        datanodeDetails.get(1)));
+    Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(0),
+        datanodeDetails.get(2)));
+    Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(1),
+        datanodeDetails.get(2)));
+    Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(0),
+        datanodeDetails.get(3)));
+    Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(2),
+        datanodeDetails.get(3)));
+  }
+
+
+  /**
+   * Requests more replicas than the rack rule can satisfy from a policy
+   * that prohibits fallback, and expects the request to be rejected.
+   *
+   * @throws SCMException expected, since fallback is prohibited
+   */
+  @Test(expected = SCMException.class)
+  public void testNoFallback() throws SCMException {
+    // 5 replicas. there are only 3 racks. policy prohibit fallback should fail.
+    int nodeNum = 5;
+    policyNoFallback.chooseDatanodes(null, null, nodeNum, 15);
+  }
+
+  /**
+   * Checks that a favored node is chosen unless it is also excluded.
+   *
+   * @throws SCMException if the policy fails to choose the datanodes
+   */
+  @Test
+  public void chooseNodeWithFavoredNodes() throws SCMException {
+    int nodeNum = 1;
+    List<DatanodeDetails> excludedNodes = new ArrayList<>();
+    List<DatanodeDetails> favoredNodes = new ArrayList<>();
+
+    // no excludedNodes, only favoredNodes
+    favoredNodes.add(datanodes.get(0));
+    List<DatanodeDetails> datanodeDetails = policy.chooseDatanodes(
+        excludedNodes, favoredNodes, nodeNum, 15);
+    Assert.assertEquals(nodeNum, datanodeDetails.size());
+    Assert.assertEquals(favoredNodes.get(0).getNetworkFullPath(),
+        datanodeDetails.get(0).getNetworkFullPath());
+
+    // no overlap between excludedNodes and favoredNodes, favoredNodes can be
+    // chosen.
+    excludedNodes.clear();
+    favoredNodes.clear();
+    excludedNodes.add(datanodes.get(0));
+    favoredNodes.add(datanodes.get(2));
+    datanodeDetails = policy.chooseDatanodes(
+        excludedNodes, favoredNodes, nodeNum, 15);
+    Assert.assertEquals(nodeNum, datanodeDetails.size());
+    Assert.assertEquals(favoredNodes.get(0).getNetworkFullPath(),
+        datanodeDetails.get(0).getNetworkFullPath());
+
+    // there is overlap between excludedNodes and favoredNodes, favoredNodes
+    // should not be chosen.
+    excludedNodes.clear();
+    favoredNodes.clear();
+    excludedNodes.add(datanodes.get(0));
+    favoredNodes.add(datanodes.get(0));
+    datanodeDetails = policy.chooseDatanodes(
+        excludedNodes, favoredNodes, nodeNum, 15);
+    Assert.assertEquals(nodeNum, datanodeDetails.size());
+    Assert.assertNotEquals(favoredNodes.get(0).getNetworkFullPath(),
+        datanodeDetails.get(0).getNetworkFullPath());
+  }
+
+  /**
+   * Requests more space than STORAGE_CAPACITY and expects the policy to
+   * fail with an exception rather than keep searching.
+   *
+   * @throws SCMException expected, since the request cannot be satisfied
+   */
+  @Test(expected = SCMException.class)
+  public void testNoInfiniteLoop() throws SCMException {
+    int nodeNum = 1;
+    // request storage space larger than node capability
+    policy.chooseDatanodes(null, null, nodeNum, STORAGE_CAPACITY + 15);
+  }
+}

+ 2 - 2
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRandom.java

@@ -67,8 +67,8 @@ public class TestSCMContainerPlacementRandom {
 
     for (int i = 0; i < 100; i++) {
       //when
-      List<DatanodeDetails> datanodeDetails =
-          scmContainerPlacementRandom.chooseDatanodes(existingNodes, 1, 15);
+      List<DatanodeDetails> datanodeDetails = scmContainerPlacementRandom
+          .chooseDatanodes(existingNodes, null, 1, 15);
 
       //then
       Assert.assertEquals(1, datanodeDetails.size());

+ 2 - 2
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/placement/TestContainerPlacement.java

@@ -87,12 +87,12 @@ public class TestContainerPlacement {
     for (int x = 0; x < opsCount; x++) {
       long containerSize = random.nextInt(100) * OzoneConsts.GB;
       List<DatanodeDetails> nodesCapacity =
-          capacityPlacer.chooseDatanodes(new ArrayList<>(), nodesRequired,
+          capacityPlacer.chooseDatanodes(new ArrayList<>(), null, nodesRequired,
               containerSize);
       assertEquals(nodesRequired, nodesCapacity.size());
 
       List<DatanodeDetails> nodesRandom =
-          randomPlacer.chooseDatanodes(nodesCapacity, nodesRequired,
+          randomPlacer.chooseDatanodes(nodesCapacity, null, nodesRequired,
               containerSize);
 
       // One fifth of all calls are delete