Browse Source

YARN-7587. Skip dispatching opportunistic containers to nodes whose queue is already full. (Weiwei Yang via asuresh)

(cherry picked from commit 37ca4169508c3003dbe9044fefd37eb8cd8c0503)
Arun Suresh 7 years ago
parent
commit
6f123aae41

+ 19 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/OpportunisticContainersStatus.java

@@ -149,4 +149,23 @@ public abstract class OpportunisticContainersStatus {
   @Unstable
   public abstract void setEstimatedQueueWaitTime(int queueWaitTime);
 
+
+  /**
+   * Gets the capacity of the opportunistic containers queue on the node.
+   * On the NodeManager side this is filled in from
+   * {@code ContainerScheduler#getOpportunisticQueueCapacity()}; on the
+   * ResourceManager side {@code NodeQueueLoadMonitor} reads it and only
+   * treats a node's queue as full when the capacity is positive.
+   *
+   * @return queue capacity.
+   */
+  @Private
+  @Unstable
+  public abstract int getOpportQueueCapacity();
+
+
+  /**
+   * Sets the capacity of the opportunistic containers queue on the node.
+   * In this change the only caller is the NodeManager's
+   * {@code ContainerScheduler}, which passes its {@code maxOppQueueLength}
+   * each time it refreshes its opportunistic containers status.
+   *
+   * @param queueCapacity queue capacity.
+   */
+  @Private
+  @Unstable
+  public abstract void setOpportQueueCapacity(int queueCapacity);
 }

+ 13 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/OpportunisticContainersStatusPBImpl.java

@@ -136,4 +136,17 @@ public class OpportunisticContainersStatusPBImpl
     maybeInitBuilder();
     builder.setEstimatedQueueWaitTime(queueWaitTime);
   }
+
+  /**
+   * Reads the opportunistic containers queue capacity from whichever
+   * protobuf view is currently selected: {@code proto} when
+   * {@code viaProto} is set, {@code builder} otherwise.
+   *
+   * @return queue capacity.
+   */
+  @Override
+  public int getOpportQueueCapacity() {
+    if (viaProto) {
+      return proto.getOpportQueueCapacity();
+    }
+    return builder.getOpportQueueCapacity();
+  }
+
+  /**
+   * Stores the capacity of the opportunistic containers queue in the
+   * protobuf builder, after calling {@code maybeInitBuilder()}.
+   *
+   * @param queueCapacity queue capacity; named to match the abstract
+   *                      declaration this method overrides.
+   */
+  @Override
+  public void setOpportQueueCapacity(int queueCapacity) {
+    maybeInitBuilder();
+    builder.setOpportQueueCapacity(queueCapacity);
+  }
 }

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto

@@ -49,6 +49,7 @@ message OpportunisticContainersStatusProto {
   optional int32 queued_opport_containers = 4;
   optional int32 wait_queue_length = 5;
   optional int32 estimated_queue_wait_time = 6;
+  optional int32 opport_queue_capacity = 7;
 }
 
 message MasterKeyProto {

+ 12 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java

@@ -68,6 +68,7 @@ public class ContainerScheduler extends AbstractService implements
       LoggerFactory.getLogger(ContainerScheduler.class);
 
   private final Context context;
+  // Capacity of the queue for opportunistic Containers.
   private final int maxOppQueueLength;
 
   // Queue of Guaranteed Containers waiting for resources to run
@@ -258,6 +259,15 @@ public class ContainerScheduler extends AbstractService implements
         + this.queuedOpportunisticContainers.size();
   }
 
+  /**
+   * Returns the capacity of the queue for opportunistic containers
+   * on this node, i.e. the final {@code maxOppQueueLength} field of this
+   * scheduler. The value is copied into the opportunistic containers
+   * status through {@code setOpportQueueCapacity}.
+   * @return queue capacity.
+   */
+  public int getOpportunisticQueueCapacity() {
+    return this.maxOppQueueLength;
+  }
+
   @VisibleForTesting
   public int getNumQueuedGuaranteedContainers() {
     return this.queuedGuaranteedContainers.size();
@@ -290,6 +300,8 @@ public class ContainerScheduler extends AbstractService implements
         metrics.getAllocatedOpportunisticVCores());
     this.opportunisticContainersStatus.setRunningOpportContainers(
         metrics.getRunningOpportunisticContainers());
+    this.opportunisticContainersStatus.setOpportQueueCapacity(
+        getOpportunisticQueueCapacity());
     return this.opportunisticContainersStatus;
   }
 

+ 20 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java

@@ -75,6 +75,7 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
     int queueWaitTime = -1;
     double timestamp;
     final NodeId nodeId;
+    private int queueCapacity = 0;
 
     public ClusterNode(NodeId nodeId) {
       this.nodeId = nodeId;
@@ -95,6 +96,16 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
       this.timestamp = System.currentTimeMillis();
       return this;
     }
+
+    public ClusterNode setQueueCapacity(int capacity) {
+      this.queueCapacity = capacity; // <= 0 makes isQueueFull() return false
+      return this; // returned so calls can be chained with other setters
+    }
+
+    /**
+     * Tells whether this node's queue has reached its capacity.
+     * A non-positive capacity is never reported as full; presumably this
+     * keeps nodes that never reported a capacity (the field defaults to 0)
+     * from being excluded -- confirm against the intended NM behaviour.
+     *
+     * @return true if the capacity is positive and the queue length has
+     *         reached or exceeded it, false otherwise.
+     */
+    public boolean isQueueFull() {
+      if (this.queueCapacity <= 0) {
+        return false;
+      }
+      return this.queueLength >= this.queueCapacity;
+    }
   }
 
   private final ScheduledExecutorService scheduledExecutor;
@@ -207,6 +218,8 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
       opportunisticContainersStatus =
           OpportunisticContainersStatus.newInstance();
     }
+    int opportQueueCapacity =
+        opportunisticContainersStatus.getOpportQueueCapacity();
     int estimatedQueueWaitTime =
         opportunisticContainersStatus.getEstimatedQueueWaitTime();
     int waitQueueLength = opportunisticContainersStatus.getWaitQueueLength();
@@ -222,7 +235,8 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
           this.clusterNodes.put(rmNode.getNodeID(),
               new ClusterNode(rmNode.getNodeID())
                   .setQueueWaitTime(estimatedQueueWaitTime)
-                  .setQueueLength(waitQueueLength));
+                  .setQueueLength(waitQueueLength)
+                  .setQueueCapacity(opportQueueCapacity));
           LOG.info("Inserting ClusterNode [" + rmNode.getNodeID() + "] " +
               "with queue wait time [" + estimatedQueueWaitTime + "] and " +
               "wait queue length [" + waitQueueLength + "]");
@@ -301,7 +315,11 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
       // is what we ultimately care about.
       Arrays.sort(nodes, (Comparator)comparator);
       for (int j=0; j < nodes.length; j++) {
-        retList.add(((ClusterNode)nodes[j]).nodeId);
+        ClusterNode cNode = (ClusterNode)nodes[j];
+        // Exclude nodes whose queue is already full.
+        if (!cNode.isQueueFull()) {
+          retList.add(cNode.nodeId);
+        }
       }
       return retList;
     } finally {

+ 21 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java

@@ -33,6 +33,8 @@ import java.util.List;
  */
 public class TestNodeQueueLoadMonitor {
 
+  private final static int DEFAULT_MAX_QUEUE_LENGTH = 200;
+
   static class FakeNodeId extends NodeId {
     final String host;
     final int port;
@@ -132,6 +134,17 @@ public class TestNodeQueueLoadMonitor {
     Assert.assertEquals("h2:2", nodeIds.get(1).toString());
     Assert.assertEquals("h1:1", nodeIds.get(2).toString());
     Assert.assertEquals("h4:4", nodeIds.get(3).toString());
+
+    // Now update h3 and fill its queue.
+    selector.updateNode(createRMNode("h3", 3, -1,
+        DEFAULT_MAX_QUEUE_LENGTH));
+    selector.computeTask.run();
+    nodeIds = selector.selectNodes();
+    System.out.println("4-> "+ nodeIds);
+    Assert.assertEquals(3, nodeIds.size());
+    Assert.assertEquals("h2:2", nodeIds.get(0).toString());
+    Assert.assertEquals("h1:1", nodeIds.get(1).toString());
+    Assert.assertEquals("h4:4", nodeIds.get(2).toString());
   }
 
   @Test
@@ -180,6 +193,12 @@ public class TestNodeQueueLoadMonitor {
 
   private RMNode createRMNode(String host, int port,
       int waitTime, int queueLength) {
+    return createRMNode(host, port, waitTime, queueLength,
+        DEFAULT_MAX_QUEUE_LENGTH); // use the default queue capacity
+  }
+
+  private RMNode createRMNode(String host, int port,
+      int waitTime, int queueLength, int queueCapacity) {
     RMNode node1 = Mockito.mock(RMNode.class);
     NodeId nID1 = new FakeNodeId(host, port);
     Mockito.when(node1.getNodeID()).thenReturn(nID1);
@@ -189,6 +208,8 @@ public class TestNodeQueueLoadMonitor {
         .thenReturn(waitTime);
     Mockito.when(status1.getWaitQueueLength())
         .thenReturn(queueLength);
+    Mockito.when(status1.getOpportQueueCapacity())
+        .thenReturn(queueCapacity);
     Mockito.when(node1.getOpportunisticContainersStatus()).thenReturn(status1);
     return node1;
   }