Przeglądaj źródła

YARN-1512. Enhanced CapacityScheduler to be able to decouple scheduling from node-heartbeats. Contributed by Arun C Murthy.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1578722 13f79535-47bb-0310-9956-ffa450edef68
Vinod Kumar Vavilapalli 11 lat temu
rodzic
commit
57cdf8626a

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -312,6 +312,9 @@ Release 2.4.0 - UNRELEASED
     YARN-1824. Improved NodeManager and clients to be able to handle cross
     platform application submissions. (Jian He via vinodkv)
 
+    YARN-1512. Enhanced CapacityScheduler to be able to decouple scheduling from
+    node-heartbeats. (Arun C Murthy via vinodkv)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 6 - 0
hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml

@@ -172,6 +172,12 @@
     </Or>
     <Bug pattern="IS2_INCONSISTENT_SYNC" />
   </Match>
+  <!-- Inconsistent sync warning - scheduleAsynchronously is only initialized once and never changed -->
+  <Match>
+    <Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler" />
+    <Field name="scheduleAsynchronously" />
+    <Bug pattern="IS2_INCONSISTENT_SYNC" />
+  </Match>
   <!-- Inconsistent sync warning - minimumAllocation is only initialized once and never changed -->
   <Match>
     <Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler" />

+ 123 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

@@ -21,11 +21,14 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -194,6 +197,18 @@ public class CapacityScheduler extends AbstractYarnScheduler
   private ResourceCalculator calculator;
   private boolean usePortForNodeName;
 
+  private boolean scheduleAsynchronously;
+  private AsyncScheduleThread asyncSchedulerThread;
+  
+  /**
+   * EXPERT: time in milliseconds to sleep between asynchronous scheduling passes.
+   */
+  private long asyncScheduleInterval;
+  private static final String ASYNC_SCHEDULER_INTERVAL =
+      CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX
+          + ".scheduling-interval-ms";
+  private static final long DEFAULT_ASYNC_SCHEDULER_INTERVAL = 5;
+  
   public CapacityScheduler() {}
 
   @Override
@@ -272,11 +287,23 @@ public class CapacityScheduler extends AbstractYarnScheduler
 
       initializeQueues(this.conf);
       
+      scheduleAsynchronously = this.conf.getScheduleAynschronously();
+      asyncScheduleInterval = 
+          this.conf.getLong(ASYNC_SCHEDULER_INTERVAL, 
+              DEFAULT_ASYNC_SCHEDULER_INTERVAL);
+      if (scheduleAsynchronously) {
+        asyncSchedulerThread = new AsyncScheduleThread(this);
+        asyncSchedulerThread.start();
+      }
+      
       initialized = true;
       LOG.info("Initialized CapacityScheduler with " +
           "calculator=" + getResourceCalculator().getClass() + ", " +
           "minimumAllocation=<" + getMinimumResourceCapability() + ">, " +
-          "maximumAllocation=<" + getMaximumResourceCapability() + ">");
+          "maximumAllocation=<" + getMaximumResourceCapability() + ">, " +
+          "asynchronousScheduling=" + scheduleAsynchronously + ", " +
+          "asyncScheduleInterval=" + asyncScheduleInterval + "ms");
+      
     } else {
       CapacitySchedulerConfiguration oldConf = this.conf; 
       this.conf = loadCapacitySchedulerConfiguration(configuration);
@@ -290,7 +317,69 @@ public class CapacityScheduler extends AbstractYarnScheduler
       }
     }
   }
+  
+  /**
+   * @return the configured pause, in milliseconds, between asynchronous
+   *         scheduling passes
+   */
+  long getAsyncScheduleInterval() {
+    return this.asyncScheduleInterval;
+  }
+
+  private final static Random random = new Random(System.currentTimeMillis());
+  
+  /**
+   * Runs one scheduling pass over every node known to the scheduler, then
+   * sleeps for {@link #getAsyncScheduleInterval()} milliseconds.
+   * <p>
+   * The pass first visits the nodes from a randomly chosen offset to the end
+   * of the collection, and then visits every node once more so that the nodes
+   * before the offset are not left out.
+   * <p>
+   * If no nodes are registered the allocation step is skipped (a random
+   * offset cannot be drawn from an empty range), but the method still sleeps
+   * so that a caller looping on it does not spin.
+   *
+   * @param cs the scheduler whose nodes should be offered for allocation
+   */
+  static void schedule(CapacityScheduler cs) {
+    Collection<FiCaSchedulerNode> nodes = cs.getAllNodes().values();
+    // Read the size once: this method does not hold the scheduler lock, so
+    // the node map may shrink underneath us, and Random.nextInt rejects a
+    // non-positive bound.
+    int nodeCount = nodes.size();
+    if (nodeCount > 0) {
+      // First randomize the start point
+      int start = random.nextInt(nodeCount);
+      int current = 0;
+      for (FiCaSchedulerNode node : nodes) {
+        if (current++ >= start) {
+          cs.allocateContainersToNode(node);
+        }
+      }
+      // Now, just get everyone to be safe
+      for (FiCaSchedulerNode node : nodes) {
+        cs.allocateContainersToNode(node);
+      }
+    }
+    try {
+      Thread.sleep(cs.getAsyncScheduleInterval());
+    } catch (InterruptedException e) {
+      // Keep the interrupt visible to the caller instead of swallowing it.
+      Thread.currentThread().interrupt();
+    }
+  }
+  
+  /**
+   * Daemon thread that repeatedly calls {@link #schedule(CapacityScheduler)}
+   * while scheduling is switched on via {@link #beginSchedule()}, and idles
+   * in 100 ms steps while it is switched off via {@link #suspendSchedule()}.
+   * Interrupting the thread ends its loop.
+   */
+  static class AsyncScheduleThread extends Thread {
+
+    private final CapacityScheduler cs;
+    // Toggled by begin/suspendSchedule from other threads; starts suspended.
+    private final AtomicBoolean runSchedules = new AtomicBoolean(false);
+
+    public AsyncScheduleThread(CapacityScheduler cs) {
+      this.cs = cs;
+      setDaemon(true);
+    }
 
+    @Override
+    public void run() {
+      // Exit once interrupted; schedule() and the idle sleep below both
+      // re-assert the interrupt flag so it is observed here.
+      while (!Thread.currentThread().isInterrupted()) {
+        if (!runSchedules.get()) {
+          try {
+            Thread.sleep(100);
+          } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+          }
+        } else {
+          schedule(cs);
+        }
+      }
+    }
+
+    /** Allows the run loop to start making scheduling passes. */
+    public void beginSchedule() {
+      runSchedules.set(true);
+    }
+
+    /** Makes the run loop idle instead of making scheduling passes. */
+    public void suspendSchedule() {
+      runSchedules.set(false);
+    }
+
+  }
+  
   @Private
   public static final String ROOT_QUEUE = 
     CapacitySchedulerConfiguration.PREFIX + CapacitySchedulerConfiguration.ROOT;
@@ -696,6 +785,9 @@ public class CapacityScheduler extends AbstractYarnScheduler
       LOG.debug("Node being looked for scheduling " + nm
         + " availableResource: " + node.getAvailableResource());
     }
+  }
+
+  private synchronized void allocateContainersToNode(FiCaSchedulerNode node) {
 
     // Assign new containers...
     // 1. Check for reserved applications
@@ -708,7 +800,8 @@ public class CapacityScheduler extends AbstractYarnScheduler
       
       // Try to fulfill the reservation
       LOG.info("Trying to fulfill reservation for application " + 
-          reservedApplication.getApplicationId() + " on node: " + nm);
+          reservedApplication.getApplicationId() + " on node: " + 
+          node.getNodeID());
       
       LeafQueue queue = ((LeafQueue)reservedApplication.getQueue());
       CSAssignment assignment = queue.assignContainers(clusterResource, node);
@@ -729,9 +822,16 @@ public class CapacityScheduler extends AbstractYarnScheduler
 
     // Try to schedule more if there are no reservations to fulfill
     if (node.getReservedContainer() == null) {
-      root.assignContainers(clusterResource, node);
+      if (Resources.greaterThanOrEqual(calculator, getClusterResources(),
+          node.getAvailableResource(), minimumAllocation)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Trying to schedule on node: " + node.getNodeName() +
+              ", available: " + node.getAvailableResource());
+        }
+        root.assignContainers(clusterResource, node);
+      }
     } else {
-      LOG.info("Skipping scheduling since node " + nm + 
+      LOG.info("Skipping scheduling since node " + node.getNodeID() + 
           " is reserved by application " + 
           node.getReservedContainer().getContainerId().getApplicationAttemptId()
           );
@@ -772,7 +872,11 @@ public class CapacityScheduler extends AbstractYarnScheduler
     case NODE_UPDATE:
     {
       NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
-      nodeUpdate(nodeUpdatedEvent.getRMNode());
+      RMNode node = nodeUpdatedEvent.getRMNode();
+      nodeUpdate(node);
+      if (!scheduleAsynchronously) {
+        allocateContainersToNode(getNode(node.getNodeID()));
+      }
     }
     break;
     case APP_ADDED:
@@ -831,6 +935,10 @@ public class CapacityScheduler extends AbstractYarnScheduler
     ++numNodeManagers;
     LOG.info("Added node " + nodeManager.getNodeAddress() + 
         " clusterResource: " + clusterResource);
+
+    if (scheduleAsynchronously && numNodeManagers == 1) {
+      asyncSchedulerThread.beginSchedule();
+    }
   }
 
   private synchronized void removeNode(RMNode nodeInfo) {
@@ -842,6 +950,10 @@ public class CapacityScheduler extends AbstractYarnScheduler
     root.updateClusterResource(clusterResource);
     --numNodeManagers;
 
+    if (scheduleAsynchronously && numNodeManagers == 0) {
+      asyncSchedulerThread.suspendSchedule();
+    }
+    
     // Remove running containers
     List<RMContainer> runningContainers = node.getRunningContainers();
     for (RMContainer container : runningContainers) {
@@ -931,7 +1043,12 @@ public class CapacityScheduler extends AbstractYarnScheduler
   FiCaSchedulerNode getNode(NodeId nodeId) {
     return nodes.get(nodeId);
   }
-
+  
+  /**
+   * Exposes the scheduler's node map without taking the scheduler lock.
+   * The returned map is the live internal instance, not a copy.
+   *
+   * @return the map of node ids to scheduler nodes
+   */
+  @Lock(Lock.NoLock.class)
+  Map<NodeId, FiCaSchedulerNode> getAllNodes() {
+    return this.nodes;
+  }
+  
   @Override
   public RMContainer getRMContainer(ContainerId containerId) {
     FiCaSchedulerApp attempt = getCurrentAttemptForContainer(containerId);

+ 21 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java

@@ -135,6 +135,17 @@ public class CapacitySchedulerConfiguration extends Configuration {
   @Private 
   public static final int DEFAULT_NODE_LOCALITY_DELAY = -1;
 
+  @Private
+  public static final String SCHEDULE_ASYNCHRONOUSLY_PREFIX =
+      PREFIX + "schedule-asynchronously";
+
+  @Private
+  public static final String SCHEDULE_ASYNCHRONOUSLY_ENABLE =
+      SCHEDULE_ASYNCHRONOUSLY_PREFIX + ".enable";
+
+  @Private
+  public static final boolean DEFAULT_SCHEDULE_ASYNCHRONOUSLY_ENABLE = false;
+  
   public CapacitySchedulerConfiguration() {
     this(new Configuration());
   }
@@ -357,4 +368,14 @@ public class CapacitySchedulerConfiguration extends Configuration {
         resourceCalculatorClass, 
         ResourceCalculator.class);
   }
+
+  /**
+   * @return whether asynchronous scheduling is enabled; falls back to
+   *         {@link #DEFAULT_SCHEDULE_ASYNCHRONOUSLY_ENABLE} when unset
+   */
+  public boolean getScheduleAynschronously() {
+    final boolean enabled = getBoolean(SCHEDULE_ASYNCHRONOUSLY_ENABLE,
+        DEFAULT_SCHEDULE_ASYNCHRONOUSLY_ENABLE);
+    return enabled;
+  }
+
+  /**
+   * Turns asynchronous scheduling on or off in this configuration.
+   *
+   * @param async {@code true} to enable asynchronous scheduling
+   */
+  public void setScheduleAynschronously(boolean async) {
+    this.setBoolean(SCHEDULE_ASYNCHRONOUSLY_ENABLE, async);
+  }
+
 }

+ 27 - 9
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java

@@ -19,20 +19,15 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
-import java.lang.reflect.Constructor;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.junit.Assert;
 
@@ -61,11 +56,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.Task;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@@ -650,4 +642,30 @@ public class TestCapacityScheduler {
           cs.getSchedulerApplications(), cs, "a1");
     Assert.assertEquals("a1", app.getQueue().getQueueName());
   }
- }
+  
+  /**
+   * Registers a batch of nodes and then drives the scheduling pass directly,
+   * checking that it runs to completion without throwing.
+   */
+  @Test
+  public void testAsyncScheduling() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    MockRM rm = new MockRM(conf);
+    rm.start();
+    try {
+      CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+
+      final int NODES = 100;
+
+      // Register nodes
+      for (int i = 0; i < NODES; ++i) {
+        String host = "192.168.1." + i;
+        RMNode node =
+            MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, host);
+        cs.handle(new NodeAddedSchedulerEvent(node));
+      }
+
+      // Now directly exercise the scheduling loop
+      for (int i = 0; i < NODES; ++i) {
+        CapacityScheduler.schedule(cs);
+      }
+    } finally {
+      // Always shut the mock RM down so it does not outlive this test.
+      rm.stop();
+    }
+  }
+
+}