Browse Source

YARN-11005. Implement the core QUEUE_LENGTH_THEN_RESOURCES OContainer allocation policy (#3717)

Andrew Chung 3 years ago
parent
commit
ffee92bbf1

+ 35 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java

@@ -404,18 +404,51 @@ public class DominantResourceCalculator extends ResourceCalculator {
 
   @Override
   public float ratio(Resource a, Resource b) {
-    float ratio = 0.0f;
+    return ratio(a, b, true);
+  }
+
+  /**
+   * Computes the ratio of resource a over resource b,
+   * where the boolean flag {@literal isDominantShare} allows
+   * specification of whether the max- or min-share should be computed.
+   * @param a the numerator resource.
+   * @param b the denominator resource.
+   * @param isDominantShare whether the dominant (max) share should be computed,
+   *                        computes the min-share if false.
+   * @return the max- or min-share ratio of the resources.
+   */
+  private float ratio(Resource a, Resource b, boolean isDominantShare) {
+    float ratio = isDominantShare ? 0.0f : 1.0f;
     int maxLength = ResourceUtils.getNumberOfCountableResourceTypes();
     for (int i = 0; i < maxLength; i++) {
       ResourceInformation aResourceInformation = a.getResourceInformation(i);
       ResourceInformation bResourceInformation = b.getResourceInformation(i);
       final float tmp = divideSafelyAsFloat(aResourceInformation.getValue(),
           bResourceInformation.getValue());
-      ratio = ratio > tmp ? ratio : tmp;
+      if (isDominantShare) {
+        ratio = Math.max(ratio, tmp);
+      } else {
+        ratio = Math.min(ratio, tmp);
+      }
     }
     return ratio;
   }
 
+  /**
+   * Computes the ratio of resource a over resource b.
+   * However, different from ratio(Resource, Resource),
+   * this returns the min-share of the resources.
+   * For example, ratio(Resource(10, 50), Resource(100, 100)) would return 0.5,
+   * whereas minRatio(Resource(10, 50), Resource(100, 100)) would return 0.1.
+   * @param a the numerator resource.
+   * @param b the denominator resource.
+   * @return the min-share ratio of the resources.
+   */
+  @Unstable
+  public float minRatio(Resource a, Resource b) {
+    return ratio(a, b, false);
+  }
+
   @Override
   public Resource divideAndCeil(Resource numerator, int denominator) {
     return divideAndCeil(numerator, (long) denominator);

+ 3 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java

@@ -257,7 +257,9 @@ public class OpportunisticContainerAllocatorAMService
 
     int limitMin, limitMax;
 
-    if (comparator == NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH) {
+    if (comparator == NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH ||
+        comparator ==
+            NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH_THEN_RESOURCES) {
       limitMin = rmContext.getYarnConfiguration()
           .getInt(YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH,
               YarnConfiguration.

+ 10 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/CentralizedOpportunisticContainerAllocator.java

@@ -251,14 +251,15 @@ public class CentralizedOpportunisticContainerAllocator extends
       String userName, Map<Resource, List<Allocation>> allocations)
       throws YarnException {
     List<Container> allocatedContainers = new ArrayList<>();
+    final ResourceRequest resourceRequest = enrichedAsk.getRequest();
     while (toAllocate > 0) {
       RMNode node = nodeQueueLoadMonitor.selectLocalNode(nodeLocation,
-          blacklist);
+          blacklist, resourceRequest.getCapability());
       if (node != null) {
         toAllocate--;
         Container container = createContainer(rmIdentifier, appParams,
             idCounter, id, userName, allocations, nodeLocation,
-            enrichedAsk.getRequest(), convertToRemoteNode(node));
+            resourceRequest, convertToRemoteNode(node));
         allocatedContainers.add(container);
         LOG.info("Allocated [{}] as opportunistic at location [{}]",
             container.getId(), nodeLocation);
@@ -280,14 +281,15 @@ public class CentralizedOpportunisticContainerAllocator extends
       String userName, Map<Resource, List<Allocation>> allocations)
       throws YarnException {
     List<Container> allocatedContainers = new ArrayList<>();
+    final ResourceRequest resourceRequest = enrichedAsk.getRequest();
     while (toAllocate > 0) {
       RMNode node = nodeQueueLoadMonitor.selectRackLocalNode(rackLocation,
-          blacklist);
+          blacklist, resourceRequest.getCapability());
       if (node != null) {
         toAllocate--;
         Container container = createContainer(rmIdentifier, appParams,
             idCounter, id, userName, allocations, rackLocation,
-            enrichedAsk.getRequest(), convertToRemoteNode(node));
+            resourceRequest, convertToRemoteNode(node));
         allocatedContainers.add(container);
         metrics.incrRackLocalOppContainers();
         LOG.info("Allocated [{}] as opportunistic at location [{}]",
@@ -309,13 +311,15 @@ public class CentralizedOpportunisticContainerAllocator extends
       String userName, Map<Resource, List<Allocation>> allocations)
       throws YarnException {
     List<Container> allocatedContainers = new ArrayList<>();
+    final ResourceRequest resourceRequest = enrichedAsk.getRequest();
     while (toAllocate > 0) {
-      RMNode node = nodeQueueLoadMonitor.selectAnyNode(blacklist);
+      RMNode node = nodeQueueLoadMonitor.selectAnyNode(
+          blacklist, resourceRequest.getCapability());
       if (node != null) {
         toAllocate--;
         Container container = createContainer(rmIdentifier, appParams,
             idCounter, id, userName, allocations, ResourceRequest.ANY,
-            enrichedAsk.getRequest(), convertToRemoteNode(node));
+            resourceRequest, convertToRemoteNode(node));
         allocatedContainers.add(container);
         metrics.incrOffSwitchOppContainers();
         LOG.info("Allocated [{}] as opportunistic at location [{}]",

+ 199 - 30
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/ClusterNode.java

@@ -20,74 +20,243 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed;
 
 import java.util.Collection;
 import java.util.HashSet;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
 
 /**
  * Represents a node in the cluster from the NodeQueueLoadMonitor's perspective
  */
 public class ClusterNode {
-  private final AtomicInteger queueLength = new AtomicInteger(0);
-  private final AtomicInteger queueWaitTime = new AtomicInteger(-1);
+  /**
+   * Properties class used to initialize/change fields in ClusterNode.
+   */
+  public static final class Properties {
+    private int queueLength = 0;
+    private int queueWaitTime = -1;
+    private long timestamp;
+    private int queueCapacity = 0;
+    private boolean queueCapacityIsSet = false;
+    private final HashSet<String> labels;
+    private Resource capability = null;
+    private Resource allocatedResource = null;
+
+    public static Properties newInstance() {
+      return new Properties();
+    }
+
+    Properties setQueueLength(int qLength) {
+      this.queueLength = qLength;
+      return this;
+    }
+
+    Properties setQueueWaitTime(int wTime) {
+      this.queueWaitTime = wTime;
+      return this;
+    }
+
+    Properties updateTimestamp() {
+      this.timestamp = System.currentTimeMillis();
+      return this;
+    }
+
+    Properties setQueueCapacity(int capacity) {
+      this.queueCapacity = capacity;
+      this.queueCapacityIsSet = true;
+      return this;
+    }
+
+    Properties setNodeLabels(Collection<String> labelsToAdd) {
+      labels.clear();
+      labels.addAll(labelsToAdd);
+      return this;
+    }
+
+    Properties setCapability(Resource nodeCapability) {
+      this.capability = nodeCapability;
+      return this;
+    }
+
+    Properties setAllocatedResource(Resource allocResource) {
+      this.allocatedResource = allocResource;
+      return this;
+    }
+
+    private Properties() {
+      labels = new HashSet<>();
+    }
+  }
+
+  private int queueLength = 0;
+  private int queueWaitTime = -1;
   private long timestamp;
   final NodeId nodeId;
   private int queueCapacity = 0;
   private final HashSet<String> labels;
+  private Resource capability = Resources.none();
+  private Resource allocatedResource = Resources.none();
+  private final ReentrantReadWriteLock.WriteLock writeLock;
+  private final ReentrantReadWriteLock.ReadLock readLock;
 
   public ClusterNode(NodeId nodeId) {
     this.nodeId = nodeId;
     this.labels = new HashSet<>();
-    updateTimestamp();
+    final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    this.writeLock = lock.writeLock();
+    this.readLock = lock.readLock();
+    this.timestamp = System.currentTimeMillis();
   }
 
-  public ClusterNode setQueueLength(int qLength) {
-    this.queueLength.set(qLength);
-    return this;
-  }
+  public ClusterNode setProperties(final Properties properties) {
+    writeLock.lock();
+    try {
+      if (properties.capability == null) {
+        this.capability = Resources.none();
+      } else {
+        this.capability = properties.capability;
+      }
 
-  public ClusterNode setQueueWaitTime(int wTime) {
-    this.queueWaitTime.set(wTime);
-    return this;
+      if (properties.allocatedResource == null) {
+        this.allocatedResource = Resources.none();
+      } else {
+        this.allocatedResource = properties.allocatedResource;
+      }
+
+      this.queueLength = properties.queueLength;
+      this.queueWaitTime = properties.queueWaitTime;
+      this.timestamp = properties.timestamp;
+      if (properties.queueCapacityIsSet) {
+        // queue capacity is only set on node add, not on node updates
+        this.queueCapacity = properties.queueCapacity;
+      }
+      this.labels.clear();
+      this.labels.addAll(properties.labels);
+      return this;
+    } finally {
+      writeLock.unlock();
+    }
   }
 
-  public ClusterNode updateTimestamp() {
-    this.timestamp = System.currentTimeMillis();
-    return this;
+  public Resource getAllocatedResource() {
+    readLock.lock();
+    try {
+      return this.allocatedResource;
+    } finally {
+      readLock.unlock();
+    }
   }
 
-  public ClusterNode setQueueCapacity(int capacity) {
-    this.queueCapacity = capacity;
-    return this;
+  public Resource getAvailableResource() {
+    readLock.lock();
+    try {
+      return Resources.subtractNonNegative(capability, allocatedResource);
+    } finally {
+      readLock.unlock();
+    }
   }
 
-  public ClusterNode setNodeLabels(Collection<String> labelsToAdd) {
-    labels.clear();
-    labels.addAll(labelsToAdd);
-    return this;
+  public Resource getCapability() {
+    readLock.lock();
+    try {
+      return this.capability;
+    } finally {
+      readLock.unlock();
+    }
   }
 
   public boolean hasLabel(String label) {
-    return this.labels.contains(label);
+    readLock.lock();
+    try {
+      return this.labels.contains(label);
+    } finally {
+      readLock.unlock();
+    }
   }
 
   public long getTimestamp() {
-    return this.timestamp;
+    readLock.lock();
+    try {
+      return this.timestamp;
+    } finally {
+      readLock.unlock();
+    }
   }
 
-  public AtomicInteger getQueueLength() {
-    return this.queueLength;
+  public int getQueueLength() {
+    readLock.lock();
+    try {
+      return this.queueLength;
+    } finally {
+      readLock.unlock();
+    }
   }
 
-  public AtomicInteger getQueueWaitTime() {
-    return this.queueWaitTime;
+  public int getQueueWaitTime() {
+    readLock.lock();
+    try {
+      return this.queueWaitTime;
+    } finally {
+      readLock.unlock();
+    }
   }
 
   public int getQueueCapacity() {
-    return this.queueCapacity;
+    readLock.lock();
+    try {
+      return this.queueCapacity;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  public boolean compareAndIncrementAllocation(
+      final int incrementQLen,
+      final ResourceCalculator resourceCalculator,
+      final Resource requested) {
+    writeLock.lock();
+    try {
+      final Resource currAvailable = Resources.subtractNonNegative(
+          capability, allocatedResource);
+      if (resourceCalculator.fitsIn(requested, currAvailable)) {
+        allocatedResource = Resources.add(allocatedResource, requested);
+        return true;
+      }
+
+      if (!resourceCalculator.fitsIn(requested, capability)) {
+        // If does not fit at all, do not allocate
+        return false;
+      }
+
+      return compareAndIncrementAllocation(incrementQLen);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  public boolean compareAndIncrementAllocation(final int incrementQLen) {
+    writeLock.lock();
+    try {
+      final int added = queueLength + incrementQLen;
+      if (added <= queueCapacity) {
+        queueLength = added;
+        return true;
+      }
+      return false;
+    } finally {
+      writeLock.unlock();
+    }
   }
 
   public boolean isQueueFull() {
-    return this.queueCapacity > 0 &&
-        this.queueLength.get() >= this.queueCapacity;
+    readLock.lock();
+    try {
+      return this.queueCapacity > 0 &&
+          this.queueLength >= this.queueCapacity;
+    } finally {
+      readLock.unlock();
+    }
   }
 }

+ 224 - 47
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java

@@ -18,8 +18,13 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed;
 
+import org.apache.commons.math3.util.Precision;
 import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -64,39 +69,165 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
    * of two Nodes are compared.
    */
   public enum LoadComparator implements Comparator<ClusterNode> {
+    /**
+     * This policy only considers queue length.
+     * When allocating, increments queue length without looking at resources
+     * available on the node, and when sorting, also only sorts by queue length.
+     */
     QUEUE_LENGTH,
-    QUEUE_WAIT_TIME;
+    /**
+     * This policy only considers the wait time of containers in the queue.
+     * Neither looks at resources nor at queue length.
+     */
+    QUEUE_WAIT_TIME,
+    /**
+     * This policy considers both queue length and resources.
+     * When allocating, first decrements resources available on a node.
+     * If resources are available, does not place OContainers on the node queue.
+     * When sorting, it first sorts by queue length,
+     * then by available resources.
+     */
+    QUEUE_LENGTH_THEN_RESOURCES;
+
+    private Resource clusterResource = Resources.none();
+    private final DominantResourceCalculator resourceCalculator =
+        new DominantResourceCalculator();
+
+    private boolean shouldPerformMinRatioComputation() {
+      if (clusterResource == null) {
+        return false;
+      }
+
+      return !resourceCalculator.isAnyMajorResourceZeroOrNegative(
+          clusterResource);
+    }
+
+    /**
+     * Compares queue length of nodes first (shortest first),
+     * then compares available resources normalized
+     * over cluster resources (most available resources first).
+     * @param o1 the first ClusterNode
+     * @param o2 the second ClusterNode
+     * @return the difference the two ClusterNodes for sorting
+     */
+    private int compareQueueLengthThenResources(
+        final ClusterNode o1, final ClusterNode o2) {
+      int diff = o1.getQueueLength() - o2.getQueueLength();
+      if (diff != 0) {
+        return diff;
+      }
+
+      final Resource availableResource1 = o1.getAvailableResource();
+      final Resource availableResource2 = o2.getAvailableResource();
+
+      // Cluster resource should be valid before performing min-ratio logic
+      // Use raw available resource comparison otherwise
+      if (shouldPerformMinRatioComputation()) {
+        // Takes the least available resource of the two nodes,
+        // normalized to the overall cluster resource
+        final float availableRatio1 =
+            resourceCalculator.minRatio(availableResource1, clusterResource);
+        final float availableRatio2 =
+            resourceCalculator.minRatio(availableResource2, clusterResource);
+
+        // The one with more available resources should be placed first
+        diff = Precision.compareTo(
+            availableRatio2, availableRatio1, Precision.EPSILON);
+      }
+
+      if (diff == 0) {
+        // Compare absolute value if ratios are the same
+        diff = availableResource2.getVirtualCores() - availableResource1.getVirtualCores();
+      }
+
+      if (diff == 0) {
+        diff = Long.compare(availableResource2.getMemorySize(),
+            availableResource1.getMemorySize());
+      }
+
+      return diff;
+    }
 
     @Override
     public int compare(ClusterNode o1, ClusterNode o2) {
-      if (getMetric(o1) == getMetric(o2)) {
-        return (int)(o2.getTimestamp() - o1.getTimestamp());
+      int diff;
+      switch (this) {
+      case QUEUE_LENGTH_THEN_RESOURCES:
+        diff = compareQueueLengthThenResources(o1, o2);
+        break;
+      case QUEUE_WAIT_TIME:
+      case QUEUE_LENGTH:
+      default:
+        diff = getMetric(o1) - getMetric(o2);
+        break;
       }
-      return getMetric(o1) - getMetric(o2);
+
+      if (diff == 0) {
+        return (int) (o2.getTimestamp() - o1.getTimestamp());
+      }
+      return diff;
+    }
+
+    @VisibleForTesting
+    void setClusterResource(Resource clusterResource) {
+      this.clusterResource = clusterResource;
+    }
+
+    public ResourceCalculator getResourceCalculator() {
+      return resourceCalculator;
     }
 
     public int getMetric(ClusterNode c) {
-      return (this == QUEUE_LENGTH) ?
-          c.getQueueLength().get() : c.getQueueWaitTime().get();
+      switch (this) {
+      case QUEUE_WAIT_TIME:
+        return c.getQueueWaitTime();
+      case QUEUE_LENGTH:
+      case QUEUE_LENGTH_THEN_RESOURCES:
+      default:
+        return c.getQueueLength();
+      }
     }
 
     /**
      * Increment the metric by a delta if it is below the threshold.
      * @param c ClusterNode
      * @param incrementSize increment size
+     * @param requested the requested resource
      * @return true if the metric was below threshold and was incremented.
      */
-    public boolean compareAndIncrement(ClusterNode c, int incrementSize) {
-      if(this == QUEUE_LENGTH) {
-        int ret = c.getQueueLength().addAndGet(incrementSize);
-        if (ret <= c.getQueueCapacity()) {
-          return true;
+    public boolean compareAndIncrement(
+        ClusterNode c, int incrementSize, Resource requested) {
+      switch (this) {
+      case QUEUE_LENGTH_THEN_RESOURCES:
+        return c.compareAndIncrementAllocation(
+            incrementSize, resourceCalculator, requested);
+      case QUEUE_WAIT_TIME:
+        // for queue wait time, we don't have any threshold.
+        return true;
+      case QUEUE_LENGTH:
+      default:
+        return c.compareAndIncrementAllocation(incrementSize);
+      }
+    }
+
+    /**
+     * Whether we should be placing OContainers on a node.
+     * @param cn the clusterNode
+     * @return whether we should be placing OContainers on a node.
+     */
+    public boolean isNodeAvailable(final ClusterNode cn) {
+      int queueCapacity = cn.getQueueCapacity();
+      int queueLength = cn.getQueueLength();
+      if (this == LoadComparator.QUEUE_LENGTH_THEN_RESOURCES) {
+        if (queueCapacity <= 0) {
+          return queueLength <= 0;
+        } else {
+          return queueLength < queueCapacity;
         }
-        c.getQueueLength().addAndGet(-incrementSize);
-        return false;
       }
-      // for queue wait time, we don't have any threshold.
-      return true;
+      // In the special case where queueCapacity is 0 for the node,
+      // the container can be allocated on the node but will be rejected there
+      return queueCapacity <= 0 || queueLength < queueCapacity;
     }
   }
 
@@ -261,13 +392,21 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
 
     if (rmNode.getState() != NodeState.DECOMMISSIONING &&
         (estimatedQueueWaitTime != -1 ||
-            comparator == LoadComparator.QUEUE_LENGTH)) {
-      this.clusterNodes.put(rmNode.getNodeID(),
-          new ClusterNode(rmNode.getNodeID())
+            comparator == LoadComparator.QUEUE_LENGTH ||
+            comparator == LoadComparator.QUEUE_LENGTH_THEN_RESOURCES)) {
+      final ClusterNode.Properties properties =
+          ClusterNode.Properties.newInstance()
               .setQueueWaitTime(estimatedQueueWaitTime)
               .setQueueLength(waitQueueLength)
               .setNodeLabels(rmNode.getNodeLabels())
-              .setQueueCapacity(opportQueueCapacity));
+              .setCapability(rmNode.getTotalCapability())
+              .setAllocatedResource(rmNode.getAllocatedContainerResource())
+              .setQueueCapacity(opportQueueCapacity)
+              .updateTimestamp();
+
+      this.clusterNodes.put(rmNode.getNodeID(),
+          new ClusterNode(rmNode.getNodeID()).setProperties(properties));
+
       LOG.info(
           "Inserting ClusterNode [{}] with queue wait time [{}] and "
               + "wait queue length [{}]",
@@ -295,12 +434,19 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
 
     if (rmNode.getState() != NodeState.DECOMMISSIONING &&
         (estimatedQueueWaitTime != -1 ||
-            comparator == LoadComparator.QUEUE_LENGTH)) {
-      clusterNode
-          .setQueueWaitTime(estimatedQueueWaitTime)
-          .setQueueLength(waitQueueLength)
-          .setNodeLabels(rmNode.getNodeLabels())
-          .updateTimestamp();
+            comparator == LoadComparator.QUEUE_LENGTH ||
+            comparator == LoadComparator.QUEUE_LENGTH_THEN_RESOURCES)) {
+      final ClusterNode.Properties properties =
+          ClusterNode.Properties.newInstance()
+              .setQueueWaitTime(estimatedQueueWaitTime)
+              .setQueueLength(waitQueueLength)
+              .setNodeLabels(rmNode.getNodeLabels())
+              .setCapability(rmNode.getTotalCapability())
+              .setAllocatedResource(rmNode.getAllocatedContainerResource())
+              .updateTimestamp();
+
+      clusterNode.setProperties(properties);
+
       LOG.debug("Updating ClusterNode [{}] with queue wait time [{}] and"
               + " wait queue length [{}]", rmNode.getNodeID(),
           estimatedQueueWaitTime, waitQueueLength);
@@ -345,27 +491,47 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
     }
   }
 
-  public RMNode selectLocalNode(String hostName, Set<String> blacklist) {
+  /**
+   * Selects the node as specified by hostName for resource allocation,
+   * unless the node has been blacklisted.
+   * @param hostName the hostname of the node for local resource allocation
+   * @param blacklist the blacklisted nodes
+   * @param request the requested resource
+   * @return the selected node, null if the node is full or is blacklisted
+   */
+  public RMNode selectLocalNode(
+      String hostName, Set<String> blacklist, Resource request) {
     if (blacklist.contains(hostName)) {
       return null;
     }
     RMNode node = nodeByHostName.get(hostName);
     if (node != null) {
       ClusterNode clusterNode = clusterNodes.get(node.getNodeID());
-      if (comparator.compareAndIncrement(clusterNode, 1)) {
+      if (clusterNode != null && comparator
+          .compareAndIncrement(clusterNode, 1, request)) {
         return node;
       }
     }
     return null;
   }
 
-  public RMNode selectRackLocalNode(String rackName, Set<String> blacklist) {
+  /**
+   * Selects a node from the rack as specified by rackName
+   * for resource allocation, excluding blacklisted nodes
+   * @param rackName the rack name for rack-local resource allocation
+   * @param blacklist the blacklisted nodes
+   * @param request the requested resource
+   * @return the selected node, null if no suitable nodes
+   */
+  public RMNode selectRackLocalNode(
+      String rackName, Set<String> blacklist, Resource request) {
     Set<NodeId> nodesOnRack = nodeIdsByRack.get(rackName);
     if (nodesOnRack != null) {
       for (NodeId nodeId : nodesOnRack) {
         if (!blacklist.contains(nodeId.getHost())) {
           ClusterNode node = clusterNodes.get(nodeId);
-          if (node != null && comparator.compareAndIncrement(node, 1)) {
+          if (node != null &&
+              comparator.compareAndIncrement(node, 1, request)) {
             return nodeByHostName.get(nodeId.getHost());
           }
         }
@@ -374,7 +540,14 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
     return null;
   }
 
-  public RMNode selectAnyNode(Set<String> blacklist) {
+  /**
+   * Selects a node from all ClusterNodes for resource allocation,
+   * excluding blacklisted nodes.
+   * @param blacklist the blacklisted nodes
+   * @param request the requested resource
+   * @return the selected node, null if no suitable nodes
+   */
+  public RMNode selectAnyNode(Set<String> blacklist, Resource request) {
     List<NodeId> nodeIds = getCandidatesForSelectAnyNode();
     int size = nodeIds.size();
     if (size <= 0) {
@@ -388,7 +561,8 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
       NodeId nodeId = nodeIds.get(index);
       if (nodeId != null && !blacklist.contains(nodeId.getHost())) {
         ClusterNode node = clusterNodes.get(nodeId);
-        if (node != null && comparator.compareAndIncrement(node, 1)) {
+        if (node != null && comparator.compareAndIncrement(
+            node, 1, request)) {
           return nodeByHostName.get(nodeId.getHost());
         }
       }
@@ -402,7 +576,10 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
 
   protected void removeFromNodeIdsByRack(RMNode removedNode) {
     nodeIdsByRack.computeIfPresent(removedNode.getRackName(),
-        (k, v) -> v).remove(removedNode.getNodeID());
+        (k, v) -> {
+          v.remove(removedNode.getNodeID());
+          return v;
+        });
   }
 
   protected void addIntoNodeIdsByRack(RMNode addedNode) {
@@ -414,21 +591,21 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
     ReentrantReadWriteLock.ReadLock readLock = clusterNodesLock.readLock();
     readLock.lock();
     try {
-      ArrayList<ClusterNode> aList = new ArrayList<>(this.clusterNodes.values());
-      List<ClusterNode> retList = new ArrayList<>();
-      Object[] nodes = aList.toArray();
-      // Collections.sort would do something similar by calling Arrays.sort
-      // internally but would finally iterate through the input list (aList)
-      // to reset the value of each element. Since we don't really care about
-      // 'aList', we can use the iteration to create the list of nodeIds which
-      // is what we ultimately care about.
-      Arrays.sort(nodes, (Comparator)comparator);
-      for (int j=0; j < nodes.length; j++) {
-        ClusterNode cNode = (ClusterNode)nodes[j];
-        // Only add node to the result list when either condition is met:
-        // 1. we don't exclude full nodes
-        // 2. we do exclude full nodes, but the current node is not full
-        if (!excludeFullNodes || !cNode.isQueueFull()) {
+      final ClusterNode[] nodes = new ClusterNode[clusterNodes.size()];
+      int nodesIdx = 0;
+      final Resource clusterResource = Resource.newInstance(Resources.none());
+      for (final ClusterNode node : this.clusterNodes.values()) {
+        Resources.addTo(clusterResource, node.getCapability());
+        nodes[nodesIdx] = node;
+        nodesIdx++;
+      }
+
+      comparator.setClusterResource(clusterResource);
+
+      final List<ClusterNode> retList = new ArrayList<>();
+      Arrays.sort(nodes, comparator);
+      for (final ClusterNode cNode : nodes) {
+        if (!excludeFullNodes || comparator.isNodeAvailable(cNode)) {
           retList.add(cNode);
         }
       }

+ 506 - 14
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java

@@ -18,17 +18,29 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed;
 
+import org.apache.hadoop.net.NodeBase;
+import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
 import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
+import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Assert;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 /**
@@ -36,8 +48,18 @@ import java.util.Set;
  */
 public class TestNodeQueueLoadMonitor {
 
+  // Extra resource type to test that all resource dimensions are considered
+  private static final String NETWORK_RESOURCE = "network";
   private final static int DEFAULT_MAX_QUEUE_LENGTH = 200;
 
+  // Note: The following variables are private static resources
+  // re-initialized on each test because resource dimensions considered
+  // are initialized in a static method.
+  // Declaring them as static final will "lock-in" resource dimensions and
+  // disallow specification of a new resource dimension ("network") in tests.
+  private static Resource defaultResourceRequested;
+  private static Resource defaultCapacity;
+
   static class FakeNodeId extends NodeId {
     final String host;
     final int port;
@@ -70,6 +92,44 @@ public class TestNodeQueueLoadMonitor {
     }
   }
 
+  private static Resource newResourceInstance(long memory, int vCores) {
+    return newResourceInstance(memory, vCores, 0L);
+  }
+
+  private static Resource newResourceInstance(
+      final long memory, final int vCores, final long network) {
+    return Resource.newInstance(memory, vCores,
+        ImmutableMap.of(NETWORK_RESOURCE, network));
+  }
+
+  private static long getNetworkResourceValue(final Resource resource) {
+    return resource.getResourceValue(NETWORK_RESOURCE);
+  }
+
+  public static void addNewTypesToResources(String... resourceTypes) {
+    // Initialize resource map
+    Map<String, ResourceInformation> riMap = new HashMap<>();
+
+    // Initialize mandatory resources
+    riMap.put(ResourceInformation.MEMORY_URI, ResourceInformation.MEMORY_MB);
+    riMap.put(ResourceInformation.VCORES_URI, ResourceInformation.VCORES);
+
+    for (String newResource : resourceTypes) {
+      riMap.put(newResource, ResourceInformation
+          .newInstance(newResource, "", 0, ResourceTypes.COUNTABLE, 0,
+              Integer.MAX_VALUE));
+    }
+
+    ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
+  }
+
+  @BeforeClass
+  public static void classSetUp() {
+    addNewTypesToResources(NETWORK_RESOURCE);
+    defaultResourceRequested = newResourceInstance(128, 1, 1);
+    defaultCapacity = newResourceInstance(1024, 8, 1000);
+  }
+
   @Test
   public void testWaitTimeSort() {
     NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor(
@@ -79,7 +139,6 @@ public class TestNodeQueueLoadMonitor {
     selector.updateNode(createRMNode("h3", 3, 10, 10));
     selector.computeTask.run();
     List<NodeId> nodeIds = selector.selectNodes();
-    System.out.println("1-> " + nodeIds);
     Assert.assertEquals("h2:2", nodeIds.get(0).toString());
     Assert.assertEquals("h3:3", nodeIds.get(1).toString());
     Assert.assertEquals("h1:1", nodeIds.get(2).toString());
@@ -88,7 +147,6 @@ public class TestNodeQueueLoadMonitor {
     selector.updateNode(createRMNode("h3", 3, 2, 10));
     selector.computeTask.run();
     nodeIds = selector.selectNodes();
-    System.out.println("2-> "+ nodeIds);
     Assert.assertEquals("h3:3", nodeIds.get(0).toString());
     Assert.assertEquals("h2:2", nodeIds.get(1).toString());
     Assert.assertEquals("h1:1", nodeIds.get(2).toString());
@@ -97,7 +155,6 @@ public class TestNodeQueueLoadMonitor {
     selector.updateNode(createRMNode("h4", 4, -1, 10));
     selector.computeTask.run();
     nodeIds = selector.selectNodes();
-    System.out.println("3-> "+ nodeIds);
     // No change
     Assert.assertEquals("h3:3", nodeIds.get(0).toString());
     Assert.assertEquals("h2:2", nodeIds.get(1).toString());
@@ -186,6 +243,208 @@ public class TestNodeQueueLoadMonitor {
     Assert.assertEquals("h4:4", nodeIds.get(2).toString());
   }
 
+  @Test
+  public void testQueueLengthThenResourcesSort() {
+    NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor(
+        NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH_THEN_RESOURCES);
+
+    // Node and queue sizes were selected such that we can determine the
+    // order of these nodes in the selectNodes call deterministically
+    // h2 -> h1 -> h3 -> h4
+    selector.updateNode(createRMNode(
+        "h1", 1, -1, 0,
+        Resources.multiply(defaultResourceRequested, 3), defaultCapacity));
+    selector.updateNode(createRMNode(
+        "h2", 2, -1, 0,
+        Resources.multiply(defaultResourceRequested, 2), defaultCapacity));
+    selector.updateNode(createRMNode(
+        "h3", 3, -1, 5,
+        Resources.multiply(defaultResourceRequested, 3), defaultCapacity));
+    selector.updateNode(createRMNode(
+        "h4", 4, -1, 10,
+        Resources.multiply(defaultResourceRequested, 2), defaultCapacity));
+    selector.computeTask.run();
+    List<NodeId> nodeIds = selector.selectNodes();
+    Assert.assertEquals("h2:2", nodeIds.get(0).toString());
+    Assert.assertEquals("h1:1", nodeIds.get(1).toString());
+    Assert.assertEquals("h3:3", nodeIds.get(2).toString());
+    Assert.assertEquals("h4:4", nodeIds.get(3).toString());
+
+    // Now update node3
+    // node3 should now rank after node4 since it has the same queue length
+    // but less resources available
+    selector.updateNode(createRMNode(
+        "h3", 3, -1, 10,
+        Resources.multiply(defaultResourceRequested, 3), defaultCapacity));
+    selector.computeTask.run();
+    nodeIds = selector.selectNodes();
+    Assert.assertEquals("h2:2", nodeIds.get(0).toString());
+    Assert.assertEquals("h1:1", nodeIds.get(1).toString());
+    Assert.assertEquals("h4:4", nodeIds.get(2).toString());
+    Assert.assertEquals("h3:3", nodeIds.get(3).toString());
+
+    // Now update h3 and fill its queue -- it should no longer be available
+    selector.updateNode(createRMNode("h3", 3, -1,
+        DEFAULT_MAX_QUEUE_LENGTH));
+    selector.computeTask.run();
+    nodeIds = selector.selectNodes();
+    // h3 is queued up, so we should only have 3 nodes left
+    Assert.assertEquals(3, nodeIds.size());
+    Assert.assertEquals("h2:2", nodeIds.get(0).toString());
+    Assert.assertEquals("h1:1", nodeIds.get(1).toString());
+    Assert.assertEquals("h4:4", nodeIds.get(2).toString());
+
+    // Now update h2 to Decommissioning state
+    selector.updateNode(createRMNode("h2", 2, -1,
+        5, NodeState.DECOMMISSIONING));
+    selector.computeTask.run();
+    nodeIds = selector.selectNodes();
+    // h2 is decommissioned, and h3 is full, so we should only have 2 nodes
+    Assert.assertEquals(2, nodeIds.size());
+    Assert.assertEquals("h1:1", nodeIds.get(0).toString());
+    Assert.assertEquals("h4:4", nodeIds.get(1).toString());
+
+    // Now update h2 back to Running state
+    selector.updateNode(createRMNode(
+        "h2", 2, -1, 0,
+        Resources.multiply(defaultResourceRequested, 2), defaultCapacity));
+    selector.computeTask.run();
+    nodeIds = selector.selectNodes();
+    Assert.assertEquals(3, nodeIds.size());
+    Assert.assertEquals("h2:2", nodeIds.get(0).toString());
+    Assert.assertEquals("h1:1", nodeIds.get(1).toString());
+    Assert.assertEquals("h4:4", nodeIds.get(2).toString());
+
+    // Now update h2 to have a zero queue capacity.
+    // Make sure that here it is still in the pool.
+    selector.updateNode(createRMNode(
+        "h2", 2, -1, 0, 0,
+        Resources.multiply(defaultResourceRequested, 2),
+        defaultCapacity));
+    selector.computeTask.run();
+    nodeIds = selector.selectNodes();
+    Assert.assertEquals(3, nodeIds.size());
+    Assert.assertEquals("h2:2", nodeIds.get(0).toString());
+    Assert.assertEquals("h1:1", nodeIds.get(1).toString());
+    Assert.assertEquals("h4:4", nodeIds.get(2).toString());
+
+    // Now update h2 to have a positive queue length but a zero queue capacity.
+    // Make sure that here it is no longer in the pool.
+    // Need to first remove the node, because node capacity is not updated.
+    selector.removeNode(createRMNode(
+        "h2", 2, -1, 0, 0,
+        Resources.multiply(defaultResourceRequested, 2),
+        defaultCapacity));
+    selector.updateNode(createRMNode(
+        "h2", 2, -1, 1, 0,
+        Resources.multiply(defaultResourceRequested, 2),
+        defaultCapacity));
+    selector.computeTask.run();
+    nodeIds = selector.selectNodes();
+    Assert.assertEquals(2, nodeIds.size());
+    Assert.assertEquals("h1:1", nodeIds.get(0).toString());
+    Assert.assertEquals("h4:4", nodeIds.get(1).toString());
+  }
+
+  /**
+   * Tests that when using QUEUE_LENGTH_THEN_RESOURCES decrements the amount
+   * of resources on the internal {@link ClusterNode} representation.
+   */
+  @Test
+  public void testQueueLengthThenResourcesDecrementsAvailable() {
+    NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor(
+        NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH_THEN_RESOURCES);
+    RMNode node = createRMNode("h1", 1, -1, 0);
+    selector.addNode(null, node);
+    selector.updateNode(node);
+    selector.updateSortedNodes();
+
+    ClusterNode clusterNode = selector.getClusterNodes().get(node.getNodeID());
+    Assert.assertEquals(Resources.none(),
+        clusterNode.getAllocatedResource());
+
+    // Has enough resources
+    RMNode selectedNode = selector.selectAnyNode(
+        Collections.emptySet(), defaultResourceRequested);
+    Assert.assertNotNull(selectedNode);
+    Assert.assertEquals(node.getNodeID(), selectedNode.getNodeID());
+
+    clusterNode = selector.getClusterNodes().get(node.getNodeID());
+    Assert.assertEquals(defaultResourceRequested,
+        clusterNode.getAllocatedResource());
+
+    // Does not have enough resources, but can queue
+    selectedNode = selector.selectAnyNode(
+        Collections.emptySet(), defaultCapacity);
+    Assert.assertNotNull(selectedNode);
+    Assert.assertEquals(node.getNodeID(), selectedNode.getNodeID());
+
+    clusterNode = selector.getClusterNodes().get(node.getNodeID());
+    Assert.assertEquals(1, clusterNode.getQueueLength());
+
+    // Does not have enough resources and cannot queue
+    selectedNode = selector.selectAnyNode(
+        Collections.emptySet(),
+        Resources.add(defaultResourceRequested, defaultCapacity));
+    Assert.assertNull(selectedNode);
+  }
+
+  @Test
+  public void testQueueLengthThenResourcesCapabilityChange() {
+    NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor(
+        NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH_THEN_RESOURCES);
+
+    // Node sizes were selected such that we can determine the
+    // order of these nodes in the selectNodes call deterministically
+    // h1 -> h2 -> h3 -> h4
+    selector.updateNode(createRMNode(
+        "h1", 1, -1, 0,
+        Resources.multiply(defaultResourceRequested, 1), defaultCapacity));
+    selector.updateNode(createRMNode(
+        "h2", 2, -1, 0,
+        Resources.multiply(defaultResourceRequested, 2), defaultCapacity));
+    selector.updateNode(createRMNode(
+        "h3", 3, -1, 0,
+        Resources.multiply(defaultResourceRequested, 3), defaultCapacity));
+    selector.updateNode(createRMNode(
+        "h4", 4, -1, 0,
+        Resources.multiply(defaultResourceRequested, 4), defaultCapacity));
+    selector.computeTask.run();
+    List<NodeId> nodeIds = selector.selectNodes();
+    Assert.assertEquals("h1:1", nodeIds.get(0).toString());
+    Assert.assertEquals("h2:2", nodeIds.get(1).toString());
+    Assert.assertEquals("h3:3", nodeIds.get(2).toString());
+    Assert.assertEquals("h4:4", nodeIds.get(3).toString());
+
+    // Now update node1 to have only defaultResourceRequested available
+    // by changing its capability to 2x defaultResourceReqeusted
+    // node1 should now rank last
+    selector.updateNode(createRMNode(
+        "h1", 1, -1, 0,
+        Resources.multiply(defaultResourceRequested, 1),
+        Resources.multiply(defaultResourceRequested, 2)));
+    selector.computeTask.run();
+    nodeIds = selector.selectNodes();
+    Assert.assertEquals("h2:2", nodeIds.get(0).toString());
+    Assert.assertEquals("h3:3", nodeIds.get(1).toString());
+    Assert.assertEquals("h4:4", nodeIds.get(2).toString());
+    Assert.assertEquals("h1:1", nodeIds.get(3).toString());
+
+    // Now update node2 to have no resources available
+    // by changing its capability to 1x defaultResourceReqeusted
+    // node2 should now rank last
+    selector.updateNode(createRMNode(
+        "h2", 2, -1, 0,
+        Resources.multiply(defaultResourceRequested, 1),
+        Resources.multiply(defaultResourceRequested, 1)));
+    selector.computeTask.run();
+    nodeIds = selector.selectNodes();
+    Assert.assertEquals("h3:3", nodeIds.get(0).toString());
+    Assert.assertEquals("h4:4", nodeIds.get(1).toString());
+    Assert.assertEquals("h1:1", nodeIds.get(2).toString());
+    Assert.assertEquals("h2:2", nodeIds.get(3).toString());
+  }
+
   @Test
   public void testContainerQueuingLimit() {
     NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor(
@@ -254,18 +513,22 @@ public class TestNodeQueueLoadMonitor {
     // basic test for selecting node which has queue length less
     // than queue capacity.
     Set<String> blacklist = new HashSet<>();
-    RMNode node = selector.selectLocalNode("h1", blacklist);
+    RMNode node = selector.selectLocalNode(
+        "h1", blacklist, defaultResourceRequested);
     Assert.assertEquals("h1", node.getHostName());
 
     // if node has been added to blacklist
     blacklist.add("h1");
-    node = selector.selectLocalNode("h1", blacklist);
+    node = selector.selectLocalNode(
+        "h1", blacklist, defaultResourceRequested);
     Assert.assertNull(node);
 
-    node = selector.selectLocalNode("h2", blacklist);
+    node = selector.selectLocalNode(
+        "h2", blacklist, defaultResourceRequested);
     Assert.assertNull(node);
 
-    node = selector.selectLocalNode("h3", blacklist);
+    node = selector.selectLocalNode(
+        "h3", blacklist, defaultResourceRequested);
     Assert.assertEquals("h3", node.getHostName());
   }
 
@@ -293,19 +556,23 @@ public class TestNodeQueueLoadMonitor {
     // basic test for selecting node which has queue length less
     // than queue capacity.
     Set<String> blacklist = new HashSet<>();
-    RMNode node = selector.selectRackLocalNode("rack1", blacklist);
+    RMNode node = selector.selectRackLocalNode(
+        "rack1", blacklist, defaultResourceRequested);
     Assert.assertEquals("h1", node.getHostName());
 
     // if node has been added to blacklist
     blacklist.add("h1");
-    node = selector.selectRackLocalNode("rack1", blacklist);
+    node = selector.selectRackLocalNode(
+        "rack1", blacklist, defaultResourceRequested);
     Assert.assertNull(node);
 
-    node = selector.selectRackLocalNode("rack2", blacklist);
+    node = selector.selectRackLocalNode(
+        "rack2", blacklist, defaultResourceRequested);
     Assert.assertEquals("h3", node.getHostName());
 
     blacklist.add("h3");
-    node = selector.selectRackLocalNode("rack2", blacklist);
+    node = selector.selectRackLocalNode(
+        "rack2", blacklist, defaultResourceRequested);
     Assert.assertNull(node);
   }
 
@@ -337,20 +604,217 @@ public class TestNodeQueueLoadMonitor {
     // basic test for selecting node which has queue length
     // less than queue capacity.
     Set<String> blacklist = new HashSet<>();
-    RMNode node = selector.selectAnyNode(blacklist);
+    RMNode node = selector.selectAnyNode(blacklist, defaultResourceRequested);
     Assert.assertTrue(node.getHostName().equals("h1") ||
         node.getHostName().equals("h3"));
 
     // if node has been added to blacklist
     blacklist.add("h1");
-    node = selector.selectAnyNode(blacklist);
+    node = selector.selectAnyNode(blacklist, defaultResourceRequested);
     Assert.assertEquals("h3", node.getHostName());
 
     blacklist.add("h3");
-    node = selector.selectAnyNode(blacklist);
+    node = selector.selectAnyNode(blacklist, defaultResourceRequested);
     Assert.assertNull(node);
   }
 
+  @Test
+  public void testQueueLengthThenResourcesComparator() {
+    NodeQueueLoadMonitor.LoadComparator comparator =
+        NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH_THEN_RESOURCES;
+
+    NodeId n1 = new FakeNodeId("n1", 5000);
+    NodeId n2 = new FakeNodeId("n2", 5000);
+
+    // Case 1: larger available cores should be ranked first
+    {
+      ClusterNode.Properties cn1Props =
+          ClusterNode.Properties.newInstance()
+              .setAllocatedResource(newResourceInstance(5, 5))
+              .setCapability(newResourceInstance(10, 10, 1000))
+              .setQueueLength(10);
+      ClusterNode cn1 = new ClusterNode(n1);
+      cn1.setProperties(cn1Props);
+
+      ClusterNode.Properties cn2Props =
+          ClusterNode.Properties.newInstance()
+              .setAllocatedResource(newResourceInstance(6, 6))
+              .setCapability(newResourceInstance(10, 10, 1000))
+              .setQueueLength(10);
+      ClusterNode cn2 = new ClusterNode(n2);
+      cn2.setProperties(cn2Props);
+
+      comparator.setClusterResource(
+          Resources.add(cn1.getCapability(), cn2.getCapability()));
+      Assert.assertTrue(comparator.compare(cn1, cn2) < 0);
+    }
+
+    // Case 2: Shorter queue should be ranked first before comparing resources
+    {
+      ClusterNode.Properties cn1Props =
+          ClusterNode.Properties.newInstance()
+              .setAllocatedResource(newResourceInstance(5, 5))
+              .setCapability(newResourceInstance(10, 10, 1000))
+              .setQueueLength(5);
+      ClusterNode cn1 = new ClusterNode(n1);
+      cn1.setProperties(cn1Props);
+
+      ClusterNode.Properties cn2Props =
+          ClusterNode.Properties.newInstance()
+              .setAllocatedResource(newResourceInstance(3, 3))
+              .setCapability(newResourceInstance(10, 10, 1000))
+              .setQueueLength(10);
+      ClusterNode cn2 = new ClusterNode(n2);
+      cn2.setProperties(cn2Props);
+
+      comparator.setClusterResource(
+          Resources.add(cn1.getCapability(), cn2.getCapability()));
+      Assert.assertTrue(comparator.compare(cn1, cn2) < 0);
+    }
+
+    // Case 3: No capability vs with capability,
+    // with capability should come first
+    {
+      ClusterNode.Properties cn1Props =
+          ClusterNode.Properties.newInstance()
+              .setAllocatedResource(Resources.none())
+              .setCapability(newResourceInstance(1, 1, 1000))
+              .setQueueLength(5);
+      ClusterNode cn1 = new ClusterNode(n1);
+      cn1.setProperties(cn1Props);
+
+      ClusterNode.Properties cn2Props =
+          ClusterNode.Properties.newInstance()
+              .setAllocatedResource(Resources.none())
+              .setCapability(Resources.none())
+              .setQueueLength(5);
+      ClusterNode cn2 = new ClusterNode(n2);
+      cn2.setProperties(cn2Props);
+
+      comparator.setClusterResource(
+          Resources.add(cn1.getCapability(), cn2.getCapability()));
+      Assert.assertTrue(comparator.compare(cn1, cn2) < 0);
+    }
+
+    // Case 4: Compare same values
+    {
+      ClusterNode.Properties cn1Props =
+          ClusterNode.Properties.newInstance()
+              .setAllocatedResource(newResourceInstance(5, 5))
+              .setCapability(newResourceInstance(10, 10, 1000))
+              .setQueueLength(10);
+      ClusterNode cn1 = new ClusterNode(n1);
+      cn1.setProperties(cn1Props);
+
+      ClusterNode.Properties cn2Props =
+          ClusterNode.Properties.newInstance()
+              .setAllocatedResource(newResourceInstance(5, 5))
+              .setCapability(newResourceInstance(10, 10, 1000))
+              .setQueueLength(10);
+      ClusterNode cn2 = new ClusterNode(n2);
+      cn2.setProperties(cn2Props);
+
+      comparator.setClusterResource(
+          Resources.add(cn1.getCapability(), cn2.getCapability()));
+      Assert.assertEquals(0, comparator.compare(cn1, cn2));
+    }
+
+    // Case 5: If ratio is the same, compare raw values
+    // by VCores first, then memory
+    {
+      ClusterNode.Properties cn1Props =
+          ClusterNode.Properties.newInstance()
+              .setAllocatedResource(newResourceInstance(6, 5))
+              .setCapability(newResourceInstance(10, 10, 1000))
+              .setQueueLength(10);
+      ClusterNode cn1 = new ClusterNode(n1);
+      cn1.setProperties(cn1Props);
+
+      ClusterNode.Properties cn2Props =
+          ClusterNode.Properties.newInstance()
+              .setAllocatedResource(newResourceInstance(5, 6))
+              .setCapability(newResourceInstance(10, 10, 1000))
+              .setQueueLength(10);
+      ClusterNode cn2 = new ClusterNode(n2);
+      cn2.setProperties(cn2Props);
+
+      comparator.setClusterResource(
+          Resources.add(cn1.getCapability(), cn2.getCapability()));
+      // Both are 60% allocated, but CN1 has 5 avail VCores, CN2 only has 4
+      Assert.assertTrue(comparator.compare(cn1, cn2) < 0);
+    }
+
+    // Case 6: by VCores absolute value
+    {
+      ClusterNode.Properties cn1Props =
+          ClusterNode.Properties.newInstance()
+              .setAllocatedResource(newResourceInstance(5, 5))
+              .setCapability(newResourceInstance(10, 10, 1000))
+              .setQueueLength(10);
+      ClusterNode cn1 = new ClusterNode(n1);
+      cn1.setProperties(cn1Props);
+
+      ClusterNode.Properties cn2Props =
+          ClusterNode.Properties.newInstance()
+              .setAllocatedResource(newResourceInstance(5, 6))
+              .setCapability(newResourceInstance(10, 12, 1000))
+              .setQueueLength(10);
+      ClusterNode cn2 = new ClusterNode(n2);
+      cn2.setProperties(cn2Props);
+
+      comparator.setClusterResource(
+          Resources.add(cn1.getCapability(), cn2.getCapability()));
+      Assert.assertTrue(comparator.compare(cn2, cn1) < 0);
+    }
+
+    // Case 7: by memory absolute value
+    {
+      ClusterNode.Properties cn1Props =
+          ClusterNode.Properties.newInstance()
+              .setAllocatedResource(newResourceInstance(5, 5))
+              .setCapability(newResourceInstance(10, 10, 1000))
+              .setQueueLength(10);
+      ClusterNode cn1 = new ClusterNode(n1);
+      cn1.setProperties(cn1Props);
+
+      ClusterNode.Properties cn2Props =
+          ClusterNode.Properties.newInstance()
+              .setAllocatedResource(newResourceInstance(6, 5))
+              .setCapability(newResourceInstance(12, 10, 1000))
+              .setQueueLength(10);
+      ClusterNode cn2 = new ClusterNode(n2);
+      cn2.setProperties(cn2Props);
+
+      comparator.setClusterResource(
+          Resources.add(cn1.getCapability(), cn2.getCapability()));
+      Assert.assertTrue(comparator.compare(cn2, cn1) < 0);
+    }
+
+    // Case 8: Memory should be more constraining in the overall cluster,
+    // so rank the node with less allocated memory first
+    {
+      ClusterNode.Properties cn1Props =
+          ClusterNode.Properties.newInstance()
+              .setAllocatedResource(newResourceInstance(5, 11))
+              .setCapability(newResourceInstance(10, 100, 1000))
+              .setQueueLength(10);
+      ClusterNode cn1 = new ClusterNode(n1);
+      cn1.setProperties(cn1Props);
+
+      ClusterNode.Properties cn2Props =
+          ClusterNode.Properties.newInstance()
+              .setAllocatedResource(newResourceInstance(6, 10))
+              .setCapability(newResourceInstance(10, 100, 1000))
+              .setQueueLength(10);
+      ClusterNode cn2 = new ClusterNode(n2);
+      cn2.setProperties(cn2Props);
+
+      comparator.setClusterResource(
+          Resources.add(cn1.getCapability(), cn2.getCapability()));
+      Assert.assertTrue(comparator.compare(cn1, cn2) < 0);
+    }
+  }
+
   private RMNode createRMNode(String host, int port,
       int waitTime, int queueLength) {
     return createRMNode(host, port, waitTime, queueLength,
@@ -377,12 +841,40 @@ public class TestNodeQueueLoadMonitor {
 
   private RMNode createRMNode(String host, int port, String rack,
       int waitTime, int queueLength, int queueCapacity, NodeState state) {
+    return createRMNode(host, port, rack, waitTime, queueLength, queueCapacity,
+        state, Resources.none(), defaultCapacity);
+  }
+
+  private RMNode createRMNode(
+      String host, int port, int waitTime, int queueLength,
+      Resource allocatedResource, Resource nodeResource) {
+    return createRMNode(host, port, waitTime, queueLength,
+        DEFAULT_MAX_QUEUE_LENGTH, allocatedResource, nodeResource);
+  }
+
+  private RMNode createRMNode(
+      String host, int port, int waitTime, int queueLength, int queueCapacity,
+      Resource allocatedResource, Resource nodeResource) {
+    return createRMNode(host, port, "default", waitTime, queueLength,
+        queueCapacity, NodeState.RUNNING, allocatedResource, nodeResource);
+  }
+
+  @SuppressWarnings("parameternumber")
+  private RMNode createRMNode(String host, int port, String rack,
+      int waitTime, int queueLength, int queueCapacity, NodeState state,
+      Resource allocatedResource, Resource nodeResource) {
     RMNode node1 = Mockito.mock(RMNode.class);
     NodeId nID1 = new FakeNodeId(host, port);
     Mockito.when(node1.getHostName()).thenReturn(host);
     Mockito.when(node1.getRackName()).thenReturn(rack);
+    Mockito.when(node1.getNode()).thenReturn(new NodeBase("/" + host));
     Mockito.when(node1.getNodeID()).thenReturn(nID1);
     Mockito.when(node1.getState()).thenReturn(state);
+    Mockito.when(node1.getTotalCapability()).thenReturn(nodeResource);
+    Mockito.when(node1.getNodeUtilization()).thenReturn(
+        ResourceUtilization.newInstance(0, 0, 0));
+    Mockito.when(node1.getAllocatedContainerResource()).thenReturn(
+        allocatedResource);
     OpportunisticContainersStatus status1 =
         Mockito.mock(OpportunisticContainersStatus.class);
     Mockito.when(status1.getEstimatedQueueWaitTime())