
YARN-9697. Efficient allocation of Opportunistic containers. Contributed by Abhishek Modi.

Abhishek Modi 5 years ago
parent
commit
fb512f5087
9 changed files with 1312 additions and 43 deletions
  1. +7 -6
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/DistributedOpportunisticContainerAllocator.java
  2. +26 -12
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
  3. +1 -1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java
  4. +7 -3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestDistributedOpportunisticContainerAllocator.java
  5. +14 -6
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
  6. +340 -0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/CentralizedOpportunisticContainerAllocator.java
  7. +114 -12
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
  8. +669 -0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestCentralizedOpportunisticContainerAllocator.java
  9. +134 -3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java

+ 7 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/DistributedOpportunisticContainerAllocator.java

@@ -201,7 +201,7 @@ public class DistributedOpportunisticContainerAllocator
     // * Rack local candidates selected in loop == 1
     // * From loop == 2 onwards, we revert to off switch allocations.
     int loopIndex = OFF_SWITCH_LOOP;
-    if (enrichedAsk.getNodeLocations().size() > 0) {
+    if (enrichedAsk.getNodeMap().size() > 0) {
       loopIndex = NODE_LOCAL_LOOP;
     }
     while (numAllocated < toAllocate) {
@@ -218,7 +218,7 @@ public class DistributedOpportunisticContainerAllocator
         }
         String location = ResourceRequest.ANY;
         if (loopIndex == NODE_LOCAL_LOOP) {
-          if (enrichedAsk.getNodeLocations().contains(rNodeHost)) {
+          if (enrichedAsk.getNodeMap().containsKey(rNodeHost)) {
             location = rNodeHost;
           } else {
             continue;
@@ -229,7 +229,8 @@ public class DistributedOpportunisticContainerAllocator
           continue;
         }
         if (loopIndex == RACK_LOCAL_LOOP) {
-          if (enrichedAsk.getRackLocations().contains(rNode.getRackName())) {
+          if (enrichedAsk.getRackMap().containsKey(
+              rNode.getRackName())) {
             location = rNode.getRackName();
           } else {
             continue;
@@ -248,7 +249,7 @@ public class DistributedOpportunisticContainerAllocator
         }
       }
       if (loopIndex == NODE_LOCAL_LOOP &&
-          enrichedAsk.getRackLocations().size() > 0) {
+          enrichedAsk.getRackMap().size() > 0) {
         loopIndex = RACK_LOCAL_LOOP;
       } else {
         loopIndex++;
@@ -318,7 +319,7 @@ public class DistributedOpportunisticContainerAllocator
     String partition = getRequestPartition(enrichedRR);
     for (RemoteNode rNode : allNodes.values()) {
       if (StringUtils.equals(partition, getRemoteNodePartition(rNode)) &&
-          enrichedRR.getRackLocations().contains(rNode.getRackName())) {
+          enrichedRR.getRackMap().containsKey(rNode.getRackName())) {
         String rHost = rNode.getNodeId().getHost();
         if (blackList.contains(rHost)) {
           continue;
@@ -341,7 +342,7 @@ public class DistributedOpportunisticContainerAllocator
       EnrichedResourceRequest enrichedRR, List<RemoteNode> retList,
       int numContainers) {
     String partition = getRequestPartition(enrichedRR);
-    for (String nodeName : enrichedRR.getNodeLocations()) {
+    for (String nodeName : enrichedRR.getNodeMap().keySet()) {
       RemoteNode remoteNode = allNodes.get(nodeName);
       if (remoteNode != null &&
           StringUtils.equals(partition, getRemoteNodePartition(remoteNode))) {

+ 26 - 12
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java

@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.yarn.server.scheduler;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.util.Time;
@@ -48,7 +47,6 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -205,20 +203,36 @@ public abstract class OpportunisticContainerAllocator {
 
   private final BaseContainerTokenSecretManager tokenSecretManager;
 
-  static class Allocation {
+  /**
+   * This class encapsulates container and resourceName for an allocation.
+   */
+  public static class Allocation {
     private final Container container;
     private final String resourceName;
 
-    Allocation(Container container, String resourceName) {
+    /**
+     * Creates an instance of Allocation.
+     * @param container allocated container.
+     * @param resourceName location where it got allocated.
+     */
+    public Allocation(Container container, String resourceName) {
       this.container = container;
       this.resourceName = resourceName;
     }
 
-    Container getContainer() {
+    /**
+     * Get container of the allocation.
+     * @return container of the allocation.
+     */
+    public Container getContainer() {
       return container;
     }
 
-    String getResourceName() {
+    /**
+     * Get resource name of the allocation.
+     * @return resource name of the allocation.
+     */
+    public String getResourceName() {
       return resourceName;
     }
   }
@@ -273,12 +287,12 @@ public abstract class OpportunisticContainerAllocator {
       }
     }
 
-    public Set<String> getNodeLocations() {
-      return nodeLocations.keySet();
+    public Map<String, AtomicInteger> getNodeMap() {
+      return nodeLocations;
     }
 
-    public Set<String> getRackLocations() {
-      return rackLocations.keySet();
+    public Map<String, AtomicInteger> getRackMap() {
+      return rackLocations;
     }
   }
 
@@ -304,8 +318,8 @@ public abstract class OpportunisticContainerAllocator {
     this.maxAllocationsPerAMHeartbeat = maxAllocationsPerAMHeartbeat;
   }
 
-  @VisibleForTesting
-  void setMaxAllocationsPerAMHeartbeat(int maxAllocationsPerAMHeartbeat) {
+  public void setMaxAllocationsPerAMHeartbeat(
+      int maxAllocationsPerAMHeartbeat) {
     this.maxAllocationsPerAMHeartbeat = maxAllocationsPerAMHeartbeat;
   }
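
With getNodeMap()/getRackMap() now exposing the per-location counts (Map<String, AtomicInteger>) instead of only the key sets, a caller can see how many containers are still outstanding at each host or rack. A small standalone sketch of that usage pattern follows, mirroring how the centralized allocator further down walks the node map; the class name, hosts, and counts here are made up for illustration.

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class NodeMapSketch {
  public static void main(String[] args) {
    // Stand-in for EnrichedResourceRequest.getNodeMap(): host -> outstanding count.
    Map<String, AtomicInteger> nodeMap = new LinkedHashMap<>();
    nodeMap.put("h1", new AtomicInteger(3));
    nodeMap.put("h2", new AtomicInteger(2));

    int maxToAllocate = 4;  // overall cap for this request (illustrative)
    for (Map.Entry<String, AtomicInteger> e : nodeMap.entrySet()) {
      // Allocate no more than the host asked for, and no more than the cap.
      int onThisHost = Math.min(e.getValue().get(), maxToAllocate);
      if (onThisHost <= 0) {
        break;
      }
      maxToAllocate -= onThisHost;
      System.out.println(e.getKey() + " -> " + onThisHost);
    }
  }
}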
 

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java

@@ -201,7 +201,7 @@ public class OpportunisticContainerContext {
   }
 
   @VisibleForTesting
-  OpportunisticSchedulerMetrics getOppSchedulerMetrics() {
+  public OpportunisticSchedulerMetrics getOppSchedulerMetrics() {
     return OpportunisticSchedulerMetrics.getMetrics();
   }
 }

+ 7 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestDistributedOpportunisticContainerAllocator.java

@@ -56,12 +56,16 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-public class TestOpportunisticContainerAllocator {
+/**
+ * Test cases for DistributedOpportunisticContainerAllocator.
+ */
+public class TestDistributedOpportunisticContainerAllocator {
 
   private static final Logger LOG =
-      LoggerFactory.getLogger(TestOpportunisticContainerAllocator.class);
+      LoggerFactory.getLogger(
+          TestDistributedOpportunisticContainerAllocator.class);
   private static final int GB = 1024;
-  private OpportunisticContainerAllocator allocator = null;
+  private DistributedOpportunisticContainerAllocator allocator = null;
   private OpportunisticContainerContext oppCntxt = null;
   private static final Priority PRIORITY_NORMAL = Priority.newInstance(1);
   private static final Resource CAPABILITY_1GB =

+ 14 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java

@@ -20,7 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.yarn.server.metrics.OpportunisticSchedulerMetrics;
-import org.apache.hadoop.yarn.server.scheduler.DistributedOpportunisticContainerAllocator;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.CentralizedOpportunisticContainerAllocator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -232,10 +232,6 @@ public class OpportunisticContainerAllocatorAMService
         YarnConfiguration.OPP_CONTAINER_MAX_ALLOCATIONS_PER_AM_HEARTBEAT,
         YarnConfiguration.
             DEFAULT_OPP_CONTAINER_MAX_ALLOCATIONS_PER_AM_HEARTBEAT);
-    this.oppContainerAllocator =
-        new DistributedOpportunisticContainerAllocator(
-            rmContext.getContainerTokenSecretManager(),
-            maxAllocationsPerAMHeartbeat);
     this.numNodes = rmContext.getYarnConfiguration().getInt(
         YarnConfiguration.OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED,
         YarnConfiguration.DEFAULT_OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED);
@@ -253,7 +249,7 @@ public class OpportunisticContainerAllocatorAMService
                     DEFAULT_NM_CONTAINER_QUEUING_LOAD_COMPARATOR));
 
     NodeQueueLoadMonitor topKSelector =
-        new NodeQueueLoadMonitor(nodeSortInterval, comparator);
+        new NodeQueueLoadMonitor(nodeSortInterval, comparator, numNodes);
 
     float sigma = rmContext.getYarnConfiguration()
         .getFloat(YarnConfiguration.NM_CONTAINER_QUEUING_LIMIT_STDEV,
@@ -285,6 +281,10 @@ public class OpportunisticContainerAllocatorAMService
 
     topKSelector.initThresholdCalculator(sigma, limitMin, limitMax);
     this.nodeMonitor = topKSelector;
+    this.oppContainerAllocator =
+        new CentralizedOpportunisticContainerAllocator(
+            rmContext.getContainerTokenSecretManager(),
+            maxAllocationsPerAMHeartbeat, nodeMonitor);
   }
 
   @Override
@@ -371,6 +371,14 @@ public class OpportunisticContainerAllocatorAMService
     }
   }
 
+  @Override
+  protected void serviceStop() throws Exception {
+    if (nodeMonitor != null) {
+      nodeMonitor.stop();
+    }
+    super.serviceStop();
+  }
+
   @Override
   public void handle(SchedulerEvent event) {
     switch (event.getType()) {

+ 340 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/CentralizedOpportunisticContainerAllocator.java

@@ -0,0 +1,340 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;
+import org.apache.hadoop.yarn.server.metrics.OpportunisticSchedulerMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
+import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * <p>
+ * The CentralizedOpportunisticContainerAllocator allocates opportunistic
+ * containers by considering all the nodes present in the cluster, after
+ * modifying the container sizes to respect the limits set by the
+ * ResourceManager. It tries to distribute the containers as evenly as
+ * possible.
+ * </p>
+ */
+public class CentralizedOpportunisticContainerAllocator extends
+    OpportunisticContainerAllocator {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(CentralizedOpportunisticContainerAllocator.class);
+
+  private NodeQueueLoadMonitor nodeQueueLoadMonitor;
+  private OpportunisticSchedulerMetrics metrics =
+      OpportunisticSchedulerMetrics.getMetrics();
+
+  /**
+   * Create a new Centralized Opportunistic Container Allocator.
+   * @param tokenSecretManager TokenSecretManager
+   */
+  public CentralizedOpportunisticContainerAllocator(
+      BaseContainerTokenSecretManager tokenSecretManager) {
+    super(tokenSecretManager);
+  }
+
+  /**
+   * Create a new Centralized Opportunistic Container Allocator.
+   * @param tokenSecretManager TokenSecretManager
+   * @param maxAllocationsPerAMHeartbeat max number of containers to be
+   *                                     allocated in one AM heartbeat
+   */
+  public CentralizedOpportunisticContainerAllocator(
+      BaseContainerTokenSecretManager tokenSecretManager,
+      int maxAllocationsPerAMHeartbeat,
+      NodeQueueLoadMonitor nodeQueueLoadMonitor) {
+    super(tokenSecretManager, maxAllocationsPerAMHeartbeat);
+    this.nodeQueueLoadMonitor = nodeQueueLoadMonitor;
+  }
+
+  @VisibleForTesting
+  void setNodeQueueLoadMonitor(NodeQueueLoadMonitor nodeQueueLoadMonitor) {
+    this.nodeQueueLoadMonitor = nodeQueueLoadMonitor;
+  }
+
+  @Override
+  public List<Container> allocateContainers(
+      ResourceBlacklistRequest blackList, List<ResourceRequest> oppResourceReqs,
+      ApplicationAttemptId applicationAttemptId,
+      OpportunisticContainerContext opportContext, long rmIdentifier,
+      String appSubmitter) throws YarnException {
+
+    updateBlacklist(blackList, opportContext);
+
+    // Add OPPORTUNISTIC requests to the outstanding ones.
+    opportContext.addToOutstandingReqs(oppResourceReqs);
+
+    Set<String> nodeBlackList = new HashSet<>(opportContext.getBlacklist());
+    List<Container> allocatedContainers = new ArrayList<>();
+    int maxAllocationsPerAMHeartbeat = getMaxAllocationsPerAMHeartbeat();
+    List<Map<Resource, List<Allocation>>> allocations = new ArrayList<>();
+
+    for (SchedulerRequestKey schedulerKey :
+        opportContext.getOutstandingOpReqs().descendingKeySet()) {
+      // Allocated containers :
+      //  Key = Requested Capability,
+      //  Value = List of Containers of given cap (the actual container size
+      //          might be different than what is requested, which is why
+      //          we need the requested capability (key) to match against
+      //          the outstanding reqs)
+      int remAllocs = -1;
+      if (maxAllocationsPerAMHeartbeat > 0) {
+        remAllocs =
+            maxAllocationsPerAMHeartbeat - getTotalAllocations(allocations);
+        if (remAllocs <= 0) {
+          LOG.info("Not allocating more containers as we have reached max "
+                  + "allocations per AM heartbeat {}",
+              maxAllocationsPerAMHeartbeat);
+          break;
+        }
+      }
+      Map<Resource, List<Allocation>> allocation = allocatePerSchedulerKey(
+          rmIdentifier, opportContext, schedulerKey, applicationAttemptId,
+          appSubmitter, nodeBlackList, remAllocs);
+      if (allocation.size() > 0) {
+        allocations.add(allocation);
+      }
+    }
+    matchAllocation(allocations, allocatedContainers, opportContext);
+    return allocatedContainers;
+  }
+
+  private Map<Resource, List<Allocation>> allocatePerSchedulerKey(
+      long rmIdentifier, OpportunisticContainerContext appContext,
+      SchedulerRequestKey schedKey, ApplicationAttemptId appAttId,
+      String userName, Set<String> blackList, int maxAllocations)
+      throws YarnException {
+    Map<Resource, List<Allocation>> allocations = new HashMap<>();
+    int totalAllocated = 0;
+    for (EnrichedResourceRequest enrichedAsk :
+        appContext.getOutstandingOpReqs().get(schedKey).values()) {
+      int remainingAllocs = -1;
+      if (maxAllocations > 0) {
+        remainingAllocs = maxAllocations - totalAllocated;
+        if (remainingAllocs <= 0) {
+          LOG.info("Not allocating more containers as max allocations per AM "
+              + "heartbeat {} has reached", getMaxAllocationsPerAMHeartbeat());
+          break;
+        }
+      }
+
+      totalAllocated += allocateContainersPerRequest(rmIdentifier,
+          appContext.getAppParams(),
+          appContext.getContainerIdGenerator(), blackList,
+          appAttId, userName, allocations, enrichedAsk,
+          remainingAllocs);
+      ResourceRequest anyAsk = enrichedAsk.getRequest();
+      if (!allocations.isEmpty()) {
+        LOG.info("Opportunistic allocation requested for [priority={}, "
+                + "allocationRequestId={}, num_containers={}, capability={}] "
+                + "allocated = {}", anyAsk.getPriority(),
+            anyAsk.getAllocationRequestId(), anyAsk.getNumContainers(),
+            anyAsk.getCapability(), allocations.keySet());
+      }
+    }
+    return allocations;
+  }
+
+  @SuppressWarnings("checkstyle:parameternumber")
+  private int allocateContainersPerRequest(long rmIdentifier,
+      AllocationParams appParams, ContainerIdGenerator idCounter,
+      Set<String> blacklist,
+      ApplicationAttemptId id,
+      String userName, Map<Resource, List<Allocation>> allocations,
+      EnrichedResourceRequest enrichedAsk, int maxAllocations)
+      throws YarnException {
+    ResourceRequest anyAsk = enrichedAsk.getRequest();
+    int totalAllocated = 0;
+    int maxToAllocate = anyAsk.getNumContainers()
+        - (allocations.isEmpty() ? 0 :
+        allocations.get(anyAsk.getCapability()).size());
+    if (maxAllocations >= 0) {
+      maxToAllocate = Math.min(maxAllocations, maxToAllocate);
+    }
+
+    // allocate node local
+    if (maxToAllocate > 0) {
+      Map<String, AtomicInteger> nodeLocations = enrichedAsk.getNodeMap();
+      for (Map.Entry<String, AtomicInteger> nodeLocation :
+          nodeLocations.entrySet()) {
+        int numContainers = nodeLocation.getValue().get();
+        numContainers = Math.min(numContainers, maxToAllocate);
+        List<Container> allocatedContainers =
+            allocateNodeLocal(enrichedAsk, nodeLocation.getKey(),
+                numContainers, rmIdentifier, appParams, idCounter, blacklist,
+                id, userName, allocations);
+        totalAllocated += allocatedContainers.size();
+        maxToAllocate -= allocatedContainers.size();
+        // no more containers to allocate
+        if (maxToAllocate <= 0) {
+          break;
+        }
+      }
+    }
+
+    // if still left, allocate rack local
+    if (maxToAllocate > 0) {
+      Map<String, AtomicInteger> rackLocations = enrichedAsk.getRackMap();
+      for (Map.Entry<String, AtomicInteger> rack : rackLocations.entrySet()) {
+        int numContainers = rack.getValue().get();
+        numContainers = Math.min(numContainers, maxToAllocate);
+        List<Container> allocatedContainers =
+            allocateRackLocal(enrichedAsk, rack.getKey(), numContainers,
+                rmIdentifier, appParams, idCounter, blacklist, id,
+                userName, allocations);
+        totalAllocated += allocatedContainers.size();
+        maxToAllocate -= allocatedContainers.size();
+        // no more containers to allocate
+        if (maxToAllocate <= 0) {
+          break;
+        }
+      }
+    }
+
+    // if still left, try on ANY
+    if (maxToAllocate > 0) {
+      List<Container> allocatedContainers = allocateAny(enrichedAsk,
+          maxToAllocate, rmIdentifier, appParams, idCounter, blacklist,
+          id, userName, allocations);
+      totalAllocated += allocatedContainers.size();
+    }
+    return totalAllocated;
+  }
+
+  @SuppressWarnings("checkstyle:parameternumber")
+  private List<Container> allocateNodeLocal(
+      EnrichedResourceRequest enrichedAsk,
+      String nodeLocation,
+      int toAllocate, long rmIdentifier,
+      AllocationParams appParams, ContainerIdGenerator idCounter,
+      Set<String> blacklist,
+      ApplicationAttemptId id,
+      String userName, Map<Resource, List<Allocation>> allocations)
+      throws YarnException {
+    List<Container> allocatedContainers = new ArrayList<>();
+    while (toAllocate > 0) {
+      RMNode node = nodeQueueLoadMonitor.selectLocalNode(nodeLocation,
+          blacklist);
+      if (node != null) {
+        toAllocate--;
+        Container container = createContainer(rmIdentifier, appParams,
+            idCounter, id, userName, allocations, nodeLocation,
+            enrichedAsk.getRequest(), convertToRemoteNode(node));
+        allocatedContainers.add(container);
+        LOG.info("Allocated [{}] as opportunistic at location [{}]",
+            container.getId(), nodeLocation);
+        metrics.incrNodeLocalOppContainers();
+      } else {
+        // we couldn't allocate any - break the loop.
+        break;
+      }
+    }
+    return allocatedContainers;
+  }
+
+  @SuppressWarnings("checkstyle:parameternumber")
+  private List<Container> allocateRackLocal(EnrichedResourceRequest enrichedAsk,
+      String rackLocation, int toAllocate, long rmIdentifier,
+      AllocationParams appParams, ContainerIdGenerator idCounter,
+      Set<String> blacklist,
+      ApplicationAttemptId id,
+      String userName, Map<Resource, List<Allocation>> allocations)
+      throws YarnException {
+    List<Container> allocatedContainers = new ArrayList<>();
+    while (toAllocate > 0) {
+      RMNode node = nodeQueueLoadMonitor.selectRackLocalNode(rackLocation,
+          blacklist);
+      if (node != null) {
+        toAllocate--;
+        Container container = createContainer(rmIdentifier, appParams,
+            idCounter, id, userName, allocations, rackLocation,
+            enrichedAsk.getRequest(), convertToRemoteNode(node));
+        allocatedContainers.add(container);
+        metrics.incrRackLocalOppContainers();
+        LOG.info("Allocated [{}] as opportunistic at location [{}]",
+            container.getId(), rackLocation);
+      } else {
+        // we couldn't allocate any - break the loop.
+        break;
+      }
+    }
+    return allocatedContainers;
+  }
+
+  @SuppressWarnings("checkstyle:parameternumber")
+  private List<Container> allocateAny(EnrichedResourceRequest enrichedAsk,
+      int toAllocate, long rmIdentifier,
+      AllocationParams appParams, ContainerIdGenerator idCounter,
+      Set<String> blacklist,
+      ApplicationAttemptId id,
+      String userName, Map<Resource, List<Allocation>> allocations)
+      throws YarnException {
+    List<Container> allocatedContainers = new ArrayList<>();
+    while (toAllocate > 0) {
+      RMNode node = nodeQueueLoadMonitor.selectAnyNode(blacklist);
+      if (node != null) {
+        toAllocate--;
+        Container container = createContainer(rmIdentifier, appParams,
+            idCounter, id, userName, allocations, ResourceRequest.ANY,
+            enrichedAsk.getRequest(), convertToRemoteNode(node));
+        allocatedContainers.add(container);
+        metrics.incrOffSwitchOppContainers();
+        LOG.info("Allocated [{}] as opportunistic at location [{}]",
+            container.getId(), ResourceRequest.ANY);
+      } else {
+        // we couldn't allocate any - break the loop.
+        break;
+      }
+    }
+    return allocatedContainers;
+  }
+
+  private RemoteNode convertToRemoteNode(RMNode rmNode) {
+    if (rmNode != null) {
+      RemoteNode rNode = RemoteNode.newInstance(rmNode.getNodeID(),
+          rmNode.getHttpAddress());
+      rNode.setRackName(rmNode.getRackName());
+      return rNode;
+    }
+    return null;
+  }
+}
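
A minimal wiring sketch of the new allocator, assuming a BaseContainerTokenSecretManager is already available (the RM hands in rmContext.getContainerTokenSecretManager() in the service above). The class name, re-sort interval, and node-pool size below are illustrative values, not defaults; only constructors visible in this commit are used.

import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.CentralizedOpportunisticContainerAllocator;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.NodeQueueLoadMonitor;
import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;

public final class CentralizedAllocatorWiringSketch {
  static CentralizedOpportunisticContainerAllocator build(
      BaseContainerTokenSecretManager secretManager) {
    // Rank nodes by queue length, re-sort every second, and keep 10 candidate
    // nodes for ANY requests (both numbers are illustrative).
    NodeQueueLoadMonitor nodeMonitor = new NodeQueueLoadMonitor(1000L,
        NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH, 10);
    // A non-positive max leaves per-AM-heartbeat allocations uncapped, as the
    // tests below exercise with setMaxAllocationsPerAMHeartbeat(-1).
    return new CentralizedOpportunisticContainerAllocator(secretManager, -1,
        nodeMonitor);
  }
}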

+ 114 - 12
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java

@@ -34,12 +34,17 @@ import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED;
+
 /**
  * The NodeQueueLoadMonitor keeps track of load metrics (such as queue length
  * and total wait time) associated with Container Queues on the Node Manager.
@@ -48,9 +53,12 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
  */
 public class NodeQueueLoadMonitor implements ClusterMonitor {
 
-  final static Logger LOG = LoggerFactory.
+  private final static Logger LOG = LoggerFactory.
       getLogger(NodeQueueLoadMonitor.class);
 
+  private int numNodesForAnyAllocation =
+      DEFAULT_OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED;
+
   /**
    * The comparator used to specify the metric against which the load
    * of two Nodes are compared.
@@ -68,14 +76,34 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
     }
 
     public int getMetric(ClusterNode c) {
-      return (this == QUEUE_LENGTH) ? c.queueLength : c.queueWaitTime;
+      return (this == QUEUE_LENGTH) ?
+          c.queueLength.get() : c.queueWaitTime.get();
+    }
+
+    /**
+     * Increment the metric by a delta if it is below the threshold.
+     * @param c ClusterNode
+     * @param incrementSize increment size
+     * @return true if the metric was below threshold and was incremented.
+     */
+    public boolean compareAndIncrement(ClusterNode c, int incrementSize) {
+      if(this == QUEUE_LENGTH) {
+        int ret = c.queueLength.addAndGet(incrementSize);
+        if (ret <= c.queueCapacity) {
+          return true;
+        }
+        c.queueLength.addAndGet(-incrementSize);
+        return false;
+      }
+      // for queue wait time, we don't have any threshold.
+      return true;
     }
   }
 
   static class ClusterNode {
-    int queueLength = 0;
-    int queueWaitTime = -1;
-    double timestamp;
+    private AtomicInteger queueLength = new AtomicInteger(0);
+    private AtomicInteger queueWaitTime = new AtomicInteger(-1);
+    private long timestamp;
     final NodeId nodeId;
     private int queueCapacity = 0;
 
@@ -85,12 +113,12 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
     }
 
     public ClusterNode setQueueLength(int qLength) {
-      this.queueLength = qLength;
+      this.queueLength.set(qLength);
       return this;
     }
 
     public ClusterNode setQueueWaitTime(int wTime) {
-      this.queueWaitTime = wTime;
+      this.queueWaitTime.set(wTime);
       return this;
     }
 
@@ -106,7 +134,7 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
 
     public boolean isQueueFull() {
       return this.queueCapacity > 0 &&
-          this.queueLength >= this.queueCapacity;
+          this.queueLength.get() >= this.queueCapacity;
     }
   }
 
@@ -115,6 +143,10 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
   private final List<NodeId> sortedNodes;
   private final Map<NodeId, ClusterNode> clusterNodes =
       new ConcurrentHashMap<>();
+  private final Map<String, RMNode> nodeByHostName =
+      new ConcurrentHashMap<>();
+  private final Map<String, Set<NodeId>> nodeIdsByRack =
+      new ConcurrentHashMap<>();
   private final LoadComparator comparator;
   private QueueLimitCalculator thresholdCalculator;
   private ReentrantReadWriteLock sortedNodesLock = new ReentrantReadWriteLock();
@@ -151,13 +183,14 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
   }
 
   public NodeQueueLoadMonitor(long nodeComputationInterval,
-      LoadComparator comparator) {
+      LoadComparator comparator, int numNodes) {
     this.sortedNodes = new ArrayList<>();
     this.scheduledExecutor = Executors.newScheduledThreadPool(1);
     this.comparator = comparator;
     this.scheduledExecutor.scheduleAtFixedRate(computeTask,
         nodeComputationInterval, nodeComputationInterval,
         TimeUnit.MILLISECONDS);
+    numNodesForAnyAllocation = numNodes;
   }
 
   List<NodeId> getSortedNodes() {
@@ -168,6 +201,12 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
     return thresholdCalculator;
   }
 
+  public void stop() {
+    if (scheduledExecutor != null) {
+      scheduledExecutor.shutdown();
+    }
+  }
+
   Map<NodeId, ClusterNode> getClusterNodes() {
     return clusterNodes;
   }
@@ -184,15 +223,17 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
   @Override
   public void addNode(List<NMContainerStatus> containerStatuses,
       RMNode rmNode) {
-    LOG.debug("Node added event from: {}", rmNode.getNode().getName());
-
+    this.nodeByHostName.put(rmNode.getHostName(), rmNode);
+    addIntoNodeIdsByRack(rmNode);
     // Ignoring this currently : at least one NODE_UPDATE heartbeat is
     // required to ensure node eligibility.
   }
 
   @Override
   public void removeNode(RMNode removedRMNode) {
-    LOG.debug("Node delete event for: {}", removedRMNode.getNode().getName());
+    LOG.info("Node delete event for: {}", removedRMNode.getNode().getName());
+    this.nodeByHostName.remove(removedRMNode.getHostName());
+    removeFromNodeIdsByRack(removedRMNode);
     ReentrantReadWriteLock.WriteLock writeLock = clusterNodesLock.writeLock();
     writeLock.lock();
     ClusterNode node;
@@ -303,6 +344,67 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
     }
   }
 
+  public RMNode selectLocalNode(String hostName, Set<String> blacklist) {
+    if (blacklist.contains(hostName)) {
+      return null;
+    }
+    RMNode node = nodeByHostName.get(hostName);
+    if (node != null) {
+      ClusterNode clusterNode = clusterNodes.get(node.getNodeID());
+      if (comparator.compareAndIncrement(clusterNode, 1)) {
+        return node;
+      }
+    }
+    return null;
+  }
+
+  public RMNode selectRackLocalNode(String rackName, Set<String> blacklist) {
+    Set<NodeId> nodesOnRack = nodeIdsByRack.get(rackName);
+    if (nodesOnRack != null) {
+      for (NodeId nodeId : nodesOnRack) {
+        if (!blacklist.contains(nodeId.getHost())) {
+          ClusterNode node = clusterNodes.get(nodeId);
+          if (node != null && comparator.compareAndIncrement(node, 1)) {
+            return nodeByHostName.get(nodeId.getHost());
+          }
+        }
+      }
+    }
+    return null;
+  }
+
+  public RMNode selectAnyNode(Set<String> blacklist) {
+    List<NodeId> nodeIds = selectLeastLoadedNodes(numNodesForAnyAllocation);
+    int size = nodeIds.size();
+    if (size <= 0) {
+      return null;
+    }
+    Random rand = new Random();
+    int startIndex = rand.nextInt(size);
+    for (int i = 0; i < size; ++i) {
+      int index = i + startIndex;
+      index %= size;
+      NodeId nodeId = nodeIds.get(index);
+      if (nodeId != null && !blacklist.contains(nodeId.getHost())) {
+        ClusterNode node = clusterNodes.get(nodeId);
+        if (node != null && comparator.compareAndIncrement(node, 1)) {
+          return nodeByHostName.get(nodeId.getHost());
+        }
+      }
+    }
+    return null;
+  }
+
+  private void removeFromNodeIdsByRack(RMNode removedNode) {
+    nodeIdsByRack.computeIfPresent(removedNode.getRackName(),
+        (k, v) -> v).remove(removedNode.getNodeID());
+  }
+
+  private void addIntoNodeIdsByRack(RMNode addedNode) {
+    nodeIdsByRack.compute(addedNode.getRackName(), (k, v) -> v == null ?
+        ConcurrentHashMap.newKeySet() : v).add(addedNode.getNodeID());
+  }
+
   private List<NodeId> sortNodes() {
     ReentrantReadWriteLock.ReadLock readLock = clusterNodesLock.readLock();
     readLock.lock();
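
The new selectLocalNode/selectRackLocalNode/selectAnyNode methods reserve a queue slot on the chosen node through LoadComparator.compareAndIncrement, which bumps the node's queueLength atomically and rolls the bump back if it would exceed the queue capacity the node reported. A standalone sketch of that reserve-then-rollback pattern follows; the QueueSlot class and capacities are illustrative and are not the Hadoop class.

import java.util.concurrent.atomic.AtomicInteger;

class QueueSlot {
  private final AtomicInteger queueLength = new AtomicInteger(0);
  private final int queueCapacity;

  QueueSlot(int queueCapacity) {
    this.queueCapacity = queueCapacity;
  }

  // Try to reserve n queued containers; undo the increment on overflow.
  boolean tryReserve(int n) {
    int after = queueLength.addAndGet(n);  // optimistic bump
    if (after <= queueCapacity) {
      return true;                         // reservation stands
    }
    queueLength.addAndGet(-n);             // roll back, node is full
    return false;
  }

  public static void main(String[] args) {
    QueueSlot node = new QueueSlot(2);
    System.out.println(node.tryReserve(1)); // true  (1 of 2 used)
    System.out.println(node.tryReserve(1)); // true  (2 of 2 used)
    System.out.println(node.tryReserve(1)); // false (would exceed capacity)
  }
}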

+ 669 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestCentralizedOpportunisticContainerAllocator.java

@@ -0,0 +1,669 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.records.MasterKey;
+import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
+import org.apache.hadoop.yarn.server.metrics.OpportunisticSchedulerMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
+import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test cases for Centralized Opportunistic Container Allocator.
+ */
+public class TestCentralizedOpportunisticContainerAllocator {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      TestCentralizedOpportunisticContainerAllocator.class);
+  private static final int GB = 1024;
+  private CentralizedOpportunisticContainerAllocator allocator = null;
+  private OpportunisticContainerContext oppCntxt = null;
+  private static final Priority PRIORITY_NORMAL = Priority.newInstance(1);
+  private static final Resource CAPABILITY_1GB =
+      Resources.createResource(GB);
+  private static final ResourceBlacklistRequest EMPTY_BLACKLIST_REQUEST =
+      ResourceBlacklistRequest.newInstance(
+          new ArrayList<>(), new ArrayList<>());
+
+  @Before
+  public void setup() {
+    // creating a dummy master key to be used for creation of container.
+    final MasterKey mKey = new MasterKey() {
+      @Override
+      public int getKeyId() {
+        return 1;
+      }
+      @Override
+      public void setKeyId(int keyId) {}
+      @Override
+      public ByteBuffer getBytes() {
+        return ByteBuffer.allocate(8);
+      }
+      @Override
+      public void setBytes(ByteBuffer bytes) {}
+    };
+
+    // creating a dummy tokenSecretManager to be used for creation of
+    // container.
+    BaseContainerTokenSecretManager secMan =
+        new BaseContainerTokenSecretManager(new Configuration()) {
+          @Override
+          public MasterKey getCurrentKey() {
+            return mKey;
+          }
+
+          @Override
+          public byte[] createPassword(ContainerTokenIdentifier identifier) {
+            return new byte[]{1, 2};
+          }
+        };
+
+    allocator = new CentralizedOpportunisticContainerAllocator(secMan);
+    oppCntxt = new OpportunisticContainerContext();
+    oppCntxt.getAppParams().setMinResource(Resource.newInstance(1024, 1));
+    oppCntxt.getAppParams().setIncrementResource(Resource.newInstance(512, 1));
+    oppCntxt.getAppParams().setMaxResource(Resource.newInstance(1024, 10));
+  }
+
+  /**
+   * Tests allocation of an Opportunistic container from single application.
+   * @throws Exception
+   */
+  @Test
+  public void testSimpleAllocation() throws Exception {
+    List<ResourceRequest> reqs =
+        Collections.singletonList(createResourceRequest(1, "*", 1));
+    ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
+        ApplicationId.newInstance(0L, 1), 1);
+
+    allocator.setNodeQueueLoadMonitor(createNodeQueueLoadMonitor(1, 2, 100));
+
+    List<Container> containers = allocator.allocateContainers(
+        EMPTY_BLACKLIST_REQUEST, reqs, appAttId, oppCntxt, 1L, "user");
+    assertEquals(1, containers.size());
+    assertEquals(0, oppCntxt.getOutstandingOpReqs().size());
+  }
+
+  /**
+   * Tests Opportunistic container should not be allocated on blacklisted
+   * nodes.
+   * @throws Exception
+   */
+  @Test
+  public void testBlacklistRejection() throws Exception {
+    ResourceBlacklistRequest blacklistRequest =
+        ResourceBlacklistRequest.newInstance(
+            Arrays.asList("h1", "h2"), new ArrayList<>());
+    List<ResourceRequest> reqs =
+        Collections.singletonList(createResourceRequest(1, "*", 1));
+    ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
+        ApplicationId.newInstance(0L, 1), 1);
+
+    allocator.setNodeQueueLoadMonitor(createNodeQueueLoadMonitor(2, 2, 100));
+
+    List<Container> containers = allocator.allocateContainers(
+        blacklistRequest, reqs, appAttId, oppCntxt, 1L, "user");
+    assertEquals(0, containers.size());
+    assertEquals(1, oppCntxt.getOutstandingOpReqs().size());
+  }
+
+  /**
+   * Tests that allocation of Opportunistic containers should be spread out.
+   * @throws Exception
+   */
+  @Test
+  public void testRoundRobinSimpleAllocation() throws Exception {
+    List<ResourceRequest> reqs =
+        Arrays.asList(
+            createResourceRequest(1, ResourceRequest.ANY, 1),
+            createResourceRequest(2, ResourceRequest.ANY, 1),
+            createResourceRequest(3, ResourceRequest.ANY, 1));
+    ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
+        ApplicationId.newInstance(0L, 1), 1);
+
+    allocator.setNodeQueueLoadMonitor(createNodeQueueLoadMonitor(3, 2, 3));
+
+    List<Container> containers = allocator.allocateContainers(
+        EMPTY_BLACKLIST_REQUEST, reqs, appAttId, oppCntxt, 1L, "user");
+    LOG.info("Containers: {}", containers);
+    Set<String> allocatedNodes = new HashSet<>();
+    for (Container c : containers) {
+      allocatedNodes.add(c.getNodeId().toString());
+    }
+    assertTrue(allocatedNodes.contains("h1:1234"));
+    assertTrue(allocatedNodes.contains("h2:1234"));
+    assertTrue(allocatedNodes.contains("h3:1234"));
+    assertEquals(3, containers.size());
+  }
+
+  /**
+   * Tests allocation of node local Opportunistic container requests.
+   * @throws Exception
+   */
+  @Test
+  public void testNodeLocalAllocation() throws Exception {
+    List<ResourceRequest> reqs =
+        Arrays.asList(
+            createResourceRequest(1, ResourceRequest.ANY, 1),
+            createResourceRequest(2, "/r1", 1),
+            createResourceRequest(2, "h1", 1),
+            createResourceRequest(2, ResourceRequest.ANY, 1),
+            createResourceRequest(3, "/r1", 1),
+            createResourceRequest(3, "h1", 1),
+            createResourceRequest(3, ResourceRequest.ANY, 1));
+    ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
+        ApplicationId.newInstance(0L, 1), 1);
+
+    allocator.setNodeQueueLoadMonitor(createNodeQueueLoadMonitor(3, 2, 5));
+
+    List<Container> containers = allocator.allocateContainers(
+        EMPTY_BLACKLIST_REQUEST, reqs, appAttId, oppCntxt, 1L, "user");
+    LOG.info("Containers: {}", containers);
+    // all 3 containers should be allocated.
+    assertEquals(3, containers.size());
+    // container with allocation id 2 and 3 should be allocated on node h1
+    for (Container c : containers) {
+      if (c.getAllocationRequestId() == 2 || c.getAllocationRequestId() == 3) {
+        assertEquals("h1:1234", c.getNodeId().toString());
+      }
+    }
+  }
+
+  /**
+   * Tests node local allocation of Opportunistic container requests with
+   * same allocation request id.
+   * @throws Exception
+   */
+  @Test
+  public void testNodeLocalAllocationSameSchedulerKey() throws Exception {
+    List<ResourceRequest> reqs =
+        Arrays.asList(
+            createResourceRequest(2, "/r1", 2),
+            createResourceRequest(2, "h1", 2),
+            createResourceRequest(2, ResourceRequest.ANY, 2));
+    ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
+        ApplicationId.newInstance(0L, 1), 1);
+
+    allocator.setNodeQueueLoadMonitor(createNodeQueueLoadMonitor(3, 2, 5));
+
+    List<Container> containers = allocator.allocateContainers(
+        EMPTY_BLACKLIST_REQUEST, reqs, appAttId, oppCntxt, 1L, "user");
+    LOG.info("Containers: {}", containers);
+    Set<String> allocatedHosts = new HashSet<>();
+    for (Container c : containers) {
+      allocatedHosts.add(c.getNodeId().toString());
+    }
+    assertEquals(2, containers.size());
+    assertTrue(allocatedHosts.contains("h1:1234"));
+    assertFalse(allocatedHosts.contains("h2:1234"));
+    assertFalse(allocatedHosts.contains("h3:1234"));
+  }
+
+  /**
+   * Tests rack local allocation of Opportunistic container requests.
+   * @throws Exception
+   */
+  @Test
+  public void testSimpleRackLocalAllocation() throws Exception {
+    List<ResourceRequest> reqs =
+        Arrays.asList(
+            createResourceRequest(2, "/r1", 1),
+            createResourceRequest(2, "h4", 1),
+            createResourceRequest(2, ResourceRequest.ANY, 1));
+    ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
+        ApplicationId.newInstance(0L, 1), 1);
+
+    NodeQueueLoadMonitor selector = createNodeQueueLoadMonitor(
+        Arrays.asList("h1", "h2", "h3"), Arrays.asList("/r2", "/r1", "/r3"),
+        Arrays.asList(2, 2, 2), Arrays.asList(5, 5, 5));
+    allocator.setNodeQueueLoadMonitor(selector);
+
+    List<Container> containers = allocator.allocateContainers(
+        EMPTY_BLACKLIST_REQUEST, reqs, appAttId, oppCntxt, 1L, "user");
+    Set<String> allocatedHosts = new HashSet<>();
+    for (Container c : containers) {
+      allocatedHosts.add(c.getNodeId().toString());
+    }
+    assertTrue(allocatedHosts.contains("h2:1234"));
+    assertFalse(allocatedHosts.contains("h3:1234"));
+    assertFalse(allocatedHosts.contains("h4:1234"));
+    assertEquals(1, containers.size());
+  }
+
+  /**
+   * Tests that allocation of rack local Opportunistic container requests
+   * should be spread out.
+   * @throws Exception
+   */
+  @Test
+  public void testRoundRobinRackLocalAllocation() throws Exception {
+    List<ResourceRequest> reqs =
+        Arrays.asList(
+            createResourceRequest(1, "/r1", 1),
+            createResourceRequest(1, "h5", 1),
+            createResourceRequest(1, ResourceRequest.ANY, 1),
+            createResourceRequest(2, "/r1", 1),
+            createResourceRequest(2, "h5", 1),
+            createResourceRequest(2, ResourceRequest.ANY, 1));
+    ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
+        ApplicationId.newInstance(0L, 1), 1);
+
+    NodeQueueLoadMonitor selector = createNodeQueueLoadMonitor(
+        Arrays.asList("h1", "h2", "h3", "h4"),
+        Arrays.asList("/r2", "/r1", "/r3", "/r1"),
+        Arrays.asList(4, 4, 4, 4), Arrays.asList(5, 5, 5, 5));
+    allocator.setNodeQueueLoadMonitor(selector);
+
+    List<Container> containers = allocator.allocateContainers(
+        EMPTY_BLACKLIST_REQUEST, reqs, appAttId, oppCntxt, 1L, "user");
+    Set<String> allocatedHosts = new HashSet<>();
+    for (Container c : containers) {
+      allocatedHosts.add(c.getNodeId().toString());
+    }
+    LOG.info("Containers: {}", containers);
+    assertTrue(allocatedHosts.contains("h2:1234"));
+    assertTrue(allocatedHosts.contains("h4:1234"));
+    assertFalse(allocatedHosts.contains("h1:1234"));
+    assertFalse(allocatedHosts.contains("h3:1234"));
+    assertEquals(2, containers.size());
+  }
+
+  /**
+   * Tests that allocation of rack local Opportunistic container requests
+   * with same allocation request id should be spread out.
+   * @throws Exception
+   */
+  @Test
+  public void testRoundRobinRackLocalAllocationSameSchedulerKey()
+      throws Exception {
+    List<ResourceRequest> reqs =
+        Arrays.asList(
+            createResourceRequest(2, "/r1", 2),
+            createResourceRequest(2, "h5", 2),
+            createResourceRequest(2, ResourceRequest.ANY, 2));
+    ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
+        ApplicationId.newInstance(0L, 1), 1);
+
+    NodeQueueLoadMonitor selector = createNodeQueueLoadMonitor(
+        Arrays.asList("h1", "h2", "h3", "h4"),
+        Arrays.asList("/r2", "/r1", "/r3", "/r1"),
+        Arrays.asList(4, 4, 4, 4), Arrays.asList(5, 5, 5, 5));
+    allocator.setNodeQueueLoadMonitor(selector);
+
+    List<Container> containers = allocator.allocateContainers(
+        EMPTY_BLACKLIST_REQUEST, reqs, appAttId, oppCntxt, 1L, "user");
+    Set<String> allocatedHosts = new HashSet<>();
+    for (Container c : containers) {
+      allocatedHosts.add(c.getNodeId().toString());
+    }
+    LOG.info("Containers: {}", containers);
+    assertTrue(allocatedHosts.contains("h2:1234"));
+    assertTrue(allocatedHosts.contains("h4:1234"));
+    assertFalse(allocatedHosts.contains("h1:1234"));
+    assertFalse(allocatedHosts.contains("h3:1234"));
+    assertEquals(2, containers.size());
+  }
+
+  /**
+   * Tests off switch allocation of Opportunistic containers.
+   * @throws Exception
+   */
+  @Test
+  public void testOffSwitchAllocationWhenNoNodeOrRack() throws Exception {
+    List<ResourceRequest> reqs =
+        Arrays.asList(
+            createResourceRequest(2, "/r3", 2),
+            createResourceRequest(2, "h6", 2),
+            createResourceRequest(2, ResourceRequest.ANY, 2));
+    ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
+        ApplicationId.newInstance(0L, 1), 1);
+
+    NodeQueueLoadMonitor selector = createNodeQueueLoadMonitor(
+        Arrays.asList("h1", "h2", "h3", "h4"),
+        Arrays.asList("/r2", "/r1", "/r2", "/r1"),
+        Arrays.asList(4, 4, 4, 4), Arrays.asList(5, 5, 5, 5));
+    allocator.setNodeQueueLoadMonitor(selector);
+
+    List<Container> containers = allocator.allocateContainers(
+        EMPTY_BLACKLIST_REQUEST, reqs, appAttId, oppCntxt, 1L, "user");
+    LOG.info("Containers: {}", containers);
+    assertEquals(2, containers.size());
+  }
+
+  /**
+   * Tests allocation of rack local Opportunistic containers with same
+   * scheduler key.
+   * @throws Exception
+   */
+  @Test
+  public void testLotsOfContainersRackLocalAllocationSameSchedulerKey()
+      throws Exception {
+    List<ResourceRequest> reqs =
+        Arrays.asList(
+            createResourceRequest(2, "/r1", 1000),
+            createResourceRequest(2, "h1", 1000),
+            createResourceRequest(2, ResourceRequest.ANY, 1000));
+    ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
+        ApplicationId.newInstance(0L, 1), 1);
+
+    NodeQueueLoadMonitor selector = createNodeQueueLoadMonitor(
+        Arrays.asList("h1", "h2", "h3", "h4"),
+        Arrays.asList("/r1", "/r1", "/r1", "/r2"),
+        Arrays.asList(0, 0, 0, 0), Arrays.asList(500, 500, 500, 300));
+    allocator.setNodeQueueLoadMonitor(selector);
+
+    List<Container> containers = allocator.allocateContainers(
+        EMPTY_BLACKLIST_REQUEST, reqs, appAttId, oppCntxt, 1L, "user");
+
+    Map<String, Integer> hostsToNumContainerMap = new HashMap<>();
+    for (Container c : containers) {
+      String host = c.getNodeId().toString();
+      int numContainers = 0;
+      if (hostsToNumContainerMap.containsKey(host)) {
+        numContainers = hostsToNumContainerMap.get(host);
+      }
+      hostsToNumContainerMap.put(host, numContainers + 1);
+    }
+    assertEquals(1000, containers.size());
+    assertEquals(500, hostsToNumContainerMap.get("h1:1234").intValue());
+    assertFalse(hostsToNumContainerMap.containsKey("h4:1234"));
+  }
+
+  /**
+   * Tests scheduling of many rack local Opportunistic container requests.
+   * @throws Exception
+   */
+  @Test
+  public void testLotsOfContainersRackLocalAllocation()
+      throws Exception {
+    List<ResourceRequest> reqs = new ArrayList<>();
+    // add 100 container requests.
+    for (int i = 0; i < 100; i++) {
+      reqs.add(createResourceRequest(i + 1, ResourceRequest.ANY, 1));
+      reqs.add(createResourceRequest(i + 1, "h5", 1));
+      reqs.add(createResourceRequest(i + 1, "/r1", 1));
+    }
+    ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
+        ApplicationId.newInstance(0L, 1), 1);
+
+    NodeQueueLoadMonitor selector = createNodeQueueLoadMonitor(
+        Arrays.asList("h1", "h2", "h3", "h4"),
+        Arrays.asList("/r1", "/r1", "/r1", "/r2"),
+        Arrays.asList(0, 0, 0, 0), Arrays.asList(500, 500, 500, 300));
+    allocator.setNodeQueueLoadMonitor(selector);
+
+    List<Container> containers = new ArrayList<>();
+    containers = allocator.allocateContainers(
+        EMPTY_BLACKLIST_REQUEST, reqs, appAttId, oppCntxt, 1L, "user");
+    assertEquals(100, containers.size());
+  }
+
+  /**
+   * Tests maximum number of opportunistic containers that can be allocated in
+   * AM heartbeat.
+   * @throws Exception
+   */
+  @Test
+  public void testMaxAllocationsPerAMHeartbeat() throws Exception {
+    allocator.setMaxAllocationsPerAMHeartbeat(2);
+    List<ResourceRequest> reqs = Arrays.asList(
+        createResourceRequest(2, "/r3", 3),
+        createResourceRequest(2, "h6", 3),
+        createResourceRequest(2, ResourceRequest.ANY, 3));
+    ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
+        ApplicationId.newInstance(0L, 1), 1);
+
+    allocator.setNodeQueueLoadMonitor(createNodeQueueLoadMonitor(3, 2, 5));
+
+    List<Container> containers = allocator.allocateContainers(
+        EMPTY_BLACKLIST_REQUEST, reqs, appAttId, oppCntxt, 1L, "user");
+    LOG.info("Containers: {}", containers);
+    // Although capacity is present, only 2 containers should be allocated
+    // as max allocations per AM heartbeat is set to 2.
+    assertEquals(2, containers.size());
+    containers = allocator.allocateContainers(
+        EMPTY_BLACKLIST_REQUEST, new ArrayList<>(), appAttId,
+        oppCntxt, 1L, "user");
+    LOG.info("Containers: {}", containers);
+    // Remaining 1 container should be allocated.
+    assertEquals(1, containers.size());
+  }
+
+  /**
+   * Tests maximum opportunistic container allocation per AM heartbeat for
+   * allocation requests with different scheduler key.
+   * @throws Exception
+   */
+  @Test
+  public void testMaxAllocationsPerAMHeartbeatDifferentSchedKey()
+      throws Exception {
+    allocator.setMaxAllocationsPerAMHeartbeat(2);
+    List<ResourceRequest> reqs =
+        Arrays.asList(
+            createResourceRequest(1, ResourceRequest.ANY, 1),
+            createResourceRequest(2, "h6", 2),
+            createResourceRequest(3, "/r3", 2));
+    ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
+        ApplicationId.newInstance(0L, 1), 1);
+
+    allocator.setNodeQueueLoadMonitor(createNodeQueueLoadMonitor(3, 2, 5));
+
+    List<Container> containers = allocator.allocateContainers(
+        EMPTY_BLACKLIST_REQUEST, reqs, appAttId, oppCntxt, 1L, "user");
+    LOG.info("Containers: {}", containers);
+    // Although capacity is present, only 2 containers should be allocated
+    // as max allocations per AM heartbeat is set to 2.
+    assertEquals(2, containers.size());
+    containers = allocator.allocateContainers(
+        EMPTY_BLACKLIST_REQUEST, new ArrayList<>(), appAttId,
+        oppCntxt, 1L, "user");
+    LOG.info("Containers: {}", containers);
+    // 2 more containers should be allocated from pending allocation requests.
+    assertEquals(2, containers.size());
+    containers = allocator.allocateContainers(
+        EMPTY_BLACKLIST_REQUEST, new ArrayList<>(), appAttId,
+        oppCntxt, 1L, "user");
+    LOG.info("Containers: {}", containers);
+    // Remaining 1 container should be allocated.
+    assertEquals(1, containers.size());
+  }
+
+  /**
+   * Tests maximum opportunistic container allocation per AM heartbeat when
+   * limit is set to -1.
+   * @throws Exception
+   */
+  @Test
+  public void testMaxAllocationsPerAMHeartbeatWithNoLimit() throws Exception {
+    allocator.setMaxAllocationsPerAMHeartbeat(-1);
+
+    List<ResourceRequest> reqs = new ArrayList<>();
+    final int numContainers = 20;
+    for (int i = 0; i < numContainers; i++) {
+      reqs.add(createResourceRequest(i + 1, "h1", 1));
+    }
+    ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
+        ApplicationId.newInstance(0L, 1), 1);
+
+    allocator.setNodeQueueLoadMonitor(createNodeQueueLoadMonitor(3, 2, 500));
+
+    List<Container> containers = allocator.allocateContainers(
+        EMPTY_BLACKLIST_REQUEST, reqs, appAttId, oppCntxt, 1L, "user");
+
+    // All containers should be allocated in a single heartbeat since no
+    // limit is set.
+    assertEquals(numContainers, containers.size());
+  }
+
+  /**
+   * Tests maximum opportunistic container allocation per AM heartbeat when
+   * the limit is set to a value higher than the number of requests.
+   * @throws Exception
+   */
+  @Test
+  public void testMaxAllocationsPerAMHeartbeatWithHighLimit()
+      throws Exception {
+    allocator.setMaxAllocationsPerAMHeartbeat(100);
+    final int numContainers = 20;
+    List<ResourceRequest> reqs = new ArrayList<>();
+    for (int i = 0; i < numContainers; i++) {
+      reqs.add(createResourceRequest(i + 1, "h1", 1));
+    }
+    ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
+        ApplicationId.newInstance(0L, 1), 1);
+
+    allocator.setNodeQueueLoadMonitor(createNodeQueueLoadMonitor(3, 2, 500));
+
+    List<Container> containers = allocator.allocateContainers(
+        EMPTY_BLACKLIST_REQUEST, reqs, appAttId, oppCntxt, 1L, "user");
+
+    // All containers should be allocated in a single heartbeat.
+    assertEquals(numContainers, containers.size());
+  }
+
+  /**
+   * Test opportunistic container allocation latency metrics.
+   * @throws Exception
+   */
+  @Test
+  public void testAllocationLatencyMetrics() throws Exception {
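+    // Spy the context and inject a mocked metrics object so latency
+    // recordings can be verified.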
+    oppCntxt = spy(oppCntxt);
+    OpportunisticSchedulerMetrics metrics =
+        mock(OpportunisticSchedulerMetrics.class);
+    when(oppCntxt.getOppSchedulerMetrics()).thenReturn(metrics);
+    List<ResourceRequest> reqs = Arrays.asList(
+        createResourceRequest(2, "/r3", 2),
+        createResourceRequest(2, "h6", 2),
+        createResourceRequest(2, ResourceRequest.ANY, 2));
+    ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
+        ApplicationId.newInstance(0L, 1), 1);
+
+    allocator.setNodeQueueLoadMonitor(createNodeQueueLoadMonitor(3, 2, 5));
+
+    List<Container> containers = allocator.allocateContainers(
+        EMPTY_BLACKLIST_REQUEST, reqs, appAttId, oppCntxt, 1L, "user");
+    LOG.info("Containers: {}", containers);
+    assertEquals(2, containers.size());
+    // For each allocated container, an allocation latency entry should be
+    // recorded.
+    verify(metrics, times(2)).addAllocateOLatencyEntry(anyLong());
+  }
+
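+  // Builds a QUEUE_LENGTH based NodeQueueLoadMonitor with numNodes mock
+  // nodes (h1..hN), each reporting the given queue length and capacity,
+  // and refreshes its sorted node list.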
+  private NodeQueueLoadMonitor createNodeQueueLoadMonitor(int numNodes,
+      int queueLength, int queueCapacity) {
+    NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor(
+        NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH);
+    for (int i = 1; i <= numNodes; ++i) {
+      RMNode node = createRMNode("h" + i, 1234, queueLength, queueCapacity);
+      selector.addNode(null, node);
+      selector.updateNode(node);
+    }
+    selector.computeTask.run();
+    return selector;
+  }
+
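+  // Variant that allows per-node hosts, racks, queue lengths and capacities.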
+  private NodeQueueLoadMonitor createNodeQueueLoadMonitor(List<String> hosts,
+      List<String> racks, List<Integer> queueLengths,
+      List<Integer> queueCapacities) {
+    NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor(
+        NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH);
+    for (int i = 0; i < hosts.size(); ++i) {
+      RMNode node = createRMNode(hosts.get(i), 1234, racks.get(i),
+          queueLengths.get(i), queueCapacities.get(i));
+      selector.addNode(null, node);
+      selector.updateNode(node);
+    }
+    selector.computeTask.run();
+    return selector;
+  }
+
+  private ResourceRequest createResourceRequest(int allocationId,
+      String location, int numContainers) {
+    return ResourceRequest.newBuilder()
+        .allocationRequestId(allocationId)
+        .priority(PRIORITY_NORMAL)
+        .resourceName(location)
+        .capability(CAPABILITY_1GB)
+        .relaxLocality(true)
+        .numContainers(numContainers)
+        .executionType(ExecutionType.OPPORTUNISTIC).build();
+  }
+
+  private RMNode createRMNode(String host, int port, int queueLength,
+      int queueCapacity) {
+    return createRMNode(host, port, "default", queueLength,
+        queueCapacity);
+  }
+
+  private RMNode createRMNode(String host, int port, String rack,
+      int queueLength, int queueCapacity) {
+    RMNode node1 = Mockito.mock(RMNode.class);
+    NodeId nID1 = new TestNodeQueueLoadMonitor.FakeNodeId(host, port);
+    Mockito.when(node1.getHostName()).thenReturn(host);
+    Mockito.when(node1.getRackName()).thenReturn(rack);
+    Mockito.when(node1.getNodeID()).thenReturn(nID1);
+    Mockito.when(node1.getState()).thenReturn(NodeState.RUNNING);
+    OpportunisticContainersStatus status1 =
+        Mockito.mock(OpportunisticContainersStatus.class);
+    Mockito.when(status1.getEstimatedQueueWaitTime())
+        .thenReturn(-1);
+    Mockito.when(status1.getWaitQueueLength())
+        .thenReturn(queueLength);
+    Mockito.when(status1.getOpportQueueCapacity())
+        .thenReturn(queueCapacity);
+    Mockito.when(node1.getOpportunisticContainersStatus()).thenReturn(status1);
+    return node1;
+  }
+}

+ 134 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java

@@ -27,7 +27,9 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 /**
  * Unit tests for NodeQueueLoadMonitor.
@@ -228,6 +230,127 @@ public class TestNodeQueueLoadMonitor {
 
   }
 
+  /**
+   * Tests selection of a local node from NodeQueueLoadMonitor. This test
+   * covers node selection based on queue capacity limits and blacklisted
+   * nodes.
+   */
+  @Test
+  public void testSelectLocalNode() {
+    NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor(
+        NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH);
+
+    RMNode h1 = createRMNode("h1", 1, -1, 2, 5);
+    RMNode h2 = createRMNode("h2", 2, -1, 5, 5);
+    RMNode h3 = createRMNode("h3", 3, -1, 4, 5);
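+    // h1 has spare queue capacity (2/5), h2 is full (5/5), h3 has spare
+    // capacity (4/5).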
+
+    selector.addNode(null, h1);
+    selector.addNode(null, h2);
+    selector.addNode(null, h3);
+
+    selector.updateNode(h1);
+    selector.updateNode(h2);
+    selector.updateNode(h3);
+
+    // Basic test for selecting a node whose queue length is less
+    // than its queue capacity.
+    Set<String> blacklist = new HashSet<>();
+    RMNode node = selector.selectLocalNode("h1", blacklist);
+    Assert.assertEquals("h1", node.getHostName());
+
+    // Once a node is blacklisted, it should not be selected.
+    blacklist.add("h1");
+    node = selector.selectLocalNode("h1", blacklist);
+    Assert.assertNull(node);
+
+    node = selector.selectLocalNode("h2", blacklist);
+    Assert.assertNull(node);
+
+    node = selector.selectLocalNode("h3", blacklist);
+    Assert.assertEquals("h3", node.getHostName());
+  }
+
+  /**
+   * Tests selection of a rack-local node from NodeQueueLoadMonitor. This
+   * test covers node selection based on queue capacity limits and
+   * blacklisted nodes.
+   */
+  @Test
+  public void testSelectRackLocalNode() {
+    NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor(
+        NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH);
+
+    RMNode h1 = createRMNode("h1", 1, "rack1", -1, 2, 5);
+    RMNode h2 = createRMNode("h2", 2, "rack2", -1, 5, 5);
+    RMNode h3 = createRMNode("h3", 3, "rack2", -1, 4, 5);
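+    // rack1 has h1 (spare capacity 2/5); rack2 has h2 (full, 5/5) and
+    // h3 (spare capacity 4/5).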
+
+    selector.addNode(null, h1);
+    selector.addNode(null, h2);
+    selector.addNode(null, h3);
+
+    selector.updateNode(h1);
+    selector.updateNode(h2);
+    selector.updateNode(h3);
+
+    // Basic test for selecting a node whose queue length is less
+    // than its queue capacity.
+    Set<String> blacklist = new HashSet<>();
+    RMNode node = selector.selectRackLocalNode("rack1", blacklist);
+    Assert.assertEquals("h1", node.getHostName());
+
+    // Once a node is blacklisted, it should not be selected.
+    blacklist.add("h1");
+    node = selector.selectRackLocalNode("rack1", blacklist);
+    Assert.assertNull(node);
+
+    node = selector.selectRackLocalNode("rack2", blacklist);
+    Assert.assertEquals("h3", node.getHostName());
+
+    blacklist.add("h3");
+    node = selector.selectRackLocalNode("rack2", blacklist);
+    Assert.assertNull(node);
+  }
+
+  /**
+   * Tests selection of any node from NodeQueueLoadMonitor. This test
+   * covers node selection based on queue capacity limits and blacklisted
+   * nodes.
+   */
+  @Test
+  public void testSelectAnyNode() {
+    NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor(
+        NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH);
+
+    RMNode h1 = createRMNode("h1", 1, "rack1", -1, 2, 5);
+    RMNode h2 = createRMNode("h2", 2, "rack2", -1, 5, 5);
+    RMNode h3 = createRMNode("h3", 3, "rack2", -1, 4, 10);
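+    // h2 is at full queue capacity (5/5); h1 (2/5) and h3 (4/10) have room.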
+
+    selector.addNode(null, h1);
+    selector.addNode(null, h2);
+    selector.addNode(null, h3);
+
+    selector.updateNode(h1);
+    selector.updateNode(h2);
+    selector.updateNode(h3);
+
+    selector.computeTask.run();
+
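+    // Only the two nodes with spare queue capacity should appear in the
+    // sorted node list.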
+    Assert.assertEquals(2, selector.getSortedNodes().size());
+
+    // Basic test for selecting a node whose queue length
+    // is less than its queue capacity.
+    Set<String> blacklist = new HashSet<>();
+    RMNode node = selector.selectAnyNode(blacklist);
+    Assert.assertTrue(node.getHostName().equals("h1") ||
+        node.getHostName().equals("h3"));
+
+    // Once a node is blacklisted, it should not be selected.
+    blacklist.add("h1");
+    node = selector.selectAnyNode(blacklist);
+    Assert.assertEquals("h3", node.getHostName());
+
+    blacklist.add("h3");
+    node = selector.selectAnyNode(blacklist);
+    Assert.assertNull(node);
+  }
+
   private RMNode createRMNode(String host, int port,
       int waitTime, int queueLength) {
     return createRMNode(host, port, waitTime, queueLength,
@@ -236,20 +359,28 @@ public class TestNodeQueueLoadMonitor {
 
   private RMNode createRMNode(String host, int port,
       int waitTime, int queueLength, NodeState state) {
-    return createRMNode(host, port, waitTime, queueLength,
+    return createRMNode(host, port, "default", waitTime, queueLength,
         DEFAULT_MAX_QUEUE_LENGTH, state);
   }
 
   private RMNode createRMNode(String host, int port,
       int waitTime, int queueLength, int queueCapacity) {
-    return createRMNode(host, port, waitTime, queueLength, queueCapacity,
+    return createRMNode(host, port, "default", waitTime, queueLength,
+        queueCapacity, NodeState.RUNNING);
+  }
+
+  private RMNode createRMNode(String host, int port, String rack,
+      int waitTime, int queueLength, int queueCapacity) {
+    return createRMNode(host, port, rack, waitTime, queueLength, queueCapacity,
         NodeState.RUNNING);
   }
 
-  private RMNode createRMNode(String host, int port,
+  private RMNode createRMNode(String host, int port, String rack,
       int waitTime, int queueLength, int queueCapacity, NodeState state) {
     RMNode node1 = Mockito.mock(RMNode.class);
     NodeId nID1 = new FakeNodeId(host, port);
+    Mockito.when(node1.getHostName()).thenReturn(host);
+    Mockito.when(node1.getRackName()).thenReturn(rack);
     Mockito.when(node1.getNodeID()).thenReturn(nID1);
     Mockito.when(node1.getState()).thenReturn(state);
     OpportunisticContainersStatus status1 =