Browse Source

YARN-1290. Let continuous scheduling achieve more balanced task assignment (Wei Yan via Sandy Ryza)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1537735 13f79535-47bb-0310-9956-ffa450edef68
Sanford Ryza 11 years ago
parent
commit
7ff13102d5

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -59,6 +59,9 @@ Release 2.3.0 - UNRELEASED
     applications so that clients can get information about them post RM-restart.
     applications so that clients can get information about them post RM-restart.
     (Jian He via vinodkv)
     (Jian He via vinodkv)
 
 
+    YARN-1290. Let continuous scheduling achieve more balanced task assignment
+    (Wei Yan via Sandy Ryza)
+
   OPTIMIZATIONS
   OPTIMIZATIONS
 
 
   BUG FIXES
   BUG FIXES

+ 28 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java

@@ -181,6 +181,8 @@ public class FairScheduler implements ResourceScheduler {
   protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster
   protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster
   protected boolean continuousSchedulingEnabled; // Continuous Scheduling enabled or not
   protected boolean continuousSchedulingEnabled; // Continuous Scheduling enabled or not
   protected int continuousSchedulingSleepMs; // Sleep time for each pass in continuous scheduling
   protected int continuousSchedulingSleepMs; // Sleep time for each pass in continuous scheduling
+  private Comparator nodeAvailableResourceComparator =
+          new NodeAvailableResourceComparator(); // Node available resource comparator
   protected double nodeLocalityThreshold; // Cluster threshold for node locality
   protected double nodeLocalityThreshold; // Cluster threshold for node locality
   protected double rackLocalityThreshold; // Cluster threshold for rack locality
   protected double rackLocalityThreshold; // Cluster threshold for rack locality
   protected long nodeLocalityDelayMs; // Delay for node locality
   protected long nodeLocalityDelayMs; // Delay for node locality
@@ -948,14 +950,22 @@ public class FairScheduler implements ResourceScheduler {
 
 
   private void continuousScheduling() {
   private void continuousScheduling() {
     while (true) {
     while (true) {
-      for (FSSchedulerNode node : nodes.values()) {
-        try {
-          if (Resources.fitsIn(minimumAllocation, node.getAvailableResource())) {
-            attemptScheduling(node);
+      List<NodeId> nodeIdList = new ArrayList<NodeId>(nodes.keySet());
+      Collections.sort(nodeIdList, nodeAvailableResourceComparator);
+
+      // iterate all nodes
+      for (NodeId nodeId : nodeIdList) {
+        if (nodes.containsKey(nodeId)) {
+          FSSchedulerNode node = nodes.get(nodeId);
+          try {
+            if (Resources.fitsIn(minimumAllocation,
+                    node.getAvailableResource())) {
+              attemptScheduling(node);
+            }
+          } catch (Throwable ex) {
+            LOG.warn("Error while attempting scheduling for node " + node +
+                    ": " + ex.toString(), ex);
           }
           }
-        } catch (Throwable ex) {
-          LOG.warn("Error while attempting scheduling for node " + node + ": " +
-                  ex.toString(), ex);
         }
         }
       }
       }
       try {
       try {
@@ -966,6 +976,17 @@ public class FairScheduler implements ResourceScheduler {
       }
       }
     }
     }
   }
   }
+
+  /** Sort nodes by available resource */
+  private class NodeAvailableResourceComparator implements Comparator<NodeId> {
+
+    @Override
+    public int compare(NodeId n1, NodeId n2) {
+      return RESOURCE_CALCULATOR.compare(clusterCapacity,
+              nodes.get(n2).getAvailableResource(),
+              nodes.get(n1).getAvailableResource());
+    }
+  }
   
   
   private synchronized void attemptScheduling(FSSchedulerNode node) {
   private synchronized void attemptScheduling(FSSchedulerNode node) {
     // Assign new containers...
     // Assign new containers...

+ 36 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java

@@ -33,6 +33,9 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Collections;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
 
 
@@ -53,6 +56,7 @@ import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -2348,7 +2352,7 @@ public class TestFairScheduler {
         fs.applications, FSSchedulerApp.class);
         fs.applications, FSSchedulerApp.class);
   }
   }
 
 
-  @Test (timeout = 5000)
+  @Test (timeout = 10000)
   public void testContinuousScheduling() throws Exception {
   public void testContinuousScheduling() throws Exception {
     // set continuous scheduling enabled
     // set continuous scheduling enabled
     FairScheduler fs = new FairScheduler();
     FairScheduler fs = new FairScheduler();
@@ -2359,16 +2363,21 @@ public class TestFairScheduler {
     Assert.assertTrue("Continuous scheduling should be enabled.",
     Assert.assertTrue("Continuous scheduling should be enabled.",
             fs.isContinuousSchedulingEnabled());
             fs.isContinuousSchedulingEnabled());
 
 
-    // Add one node
+    // Add two nodes
     RMNode node1 =
     RMNode node1 =
             MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1,
             MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1,
                     "127.0.0.1");
                     "127.0.0.1");
     NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
     NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
     fs.handle(nodeEvent1);
     fs.handle(nodeEvent1);
+    RMNode node2 =
+            MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 2,
+                    "127.0.0.2");
+    NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
+    fs.handle(nodeEvent2);
 
 
     // available resource
     // available resource
-    Assert.assertEquals(fs.getClusterCapacity().getMemory(), 8 * 1024);
-    Assert.assertEquals(fs.getClusterCapacity().getVirtualCores(), 8);
+    Assert.assertEquals(fs.getClusterCapacity().getMemory(), 16 * 1024);
+    Assert.assertEquals(fs.getClusterCapacity().getVirtualCores(), 16);
 
 
     // send application request
     // send application request
     ApplicationAttemptId appAttemptId =
     ApplicationAttemptId appAttemptId =
@@ -2387,10 +2396,32 @@ public class TestFairScheduler {
     FSSchedulerApp app = fs.applications.get(appAttemptId);
     FSSchedulerApp app = fs.applications.get(appAttemptId);
     // Wait until app gets resources.
     // Wait until app gets resources.
     while (app.getCurrentConsumption().equals(Resources.none())) { }
     while (app.getCurrentConsumption().equals(Resources.none())) { }
-    
+
     // check consumption
     // check consumption
     Assert.assertEquals(1024, app.getCurrentConsumption().getMemory());
     Assert.assertEquals(1024, app.getCurrentConsumption().getMemory());
     Assert.assertEquals(1, app.getCurrentConsumption().getVirtualCores());
     Assert.assertEquals(1, app.getCurrentConsumption().getVirtualCores());
+
+    // another request
+    request =
+            createResourceRequest(1024, 1, ResourceRequest.ANY, 2, 1, true);
+    ask.clear();
+    ask.add(request);
+    fs.allocate(appAttemptId, ask, new ArrayList<ContainerId>(), null, null);
+
+    // Wait until app gets resources
+    while (app.getCurrentConsumption()
+            .equals(Resources.createResource(1024, 1))) { }
+
+    Assert.assertEquals(2048, app.getCurrentConsumption().getMemory());
+    Assert.assertEquals(2, app.getCurrentConsumption().getVirtualCores());
+
+    // 2 containers should be assigned to 2 nodes
+    Set<NodeId> nodes = new HashSet<NodeId>();
+    Iterator<RMContainer> it = app.getLiveContainers().iterator();
+    while (it.hasNext()) {
+      nodes.add(it.next().getContainer().getNodeId());
+    }
+    Assert.assertEquals(2, nodes.size());
   }
   }