浏览代码

YARN-1444. Fix CapacityScheduler to deal with cases where applications specify host/rack requests without off-switch request. Contributed by Wangda Tan.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1576751 13f79535-47bb-0310-9956-ffa450edef68
Arun Murthy 11 年之前
父节点
当前提交
4ce0e4bf2e

+ 4 - 0
hadoop-yarn-project/CHANGES.txt

@@ -449,6 +449,10 @@ Release 2.4.0 - UNRELEASED
     YARN-1800. Fixed NodeManager to gracefully handle RejectedExecutionException
     in the public-localizer thread-pool. (Varun Vasudev via vinodkv)
 
+    YARN-1444. Fix CapacityScheduler to deal with cases where applications
+    specify host/rack requests without off-switch request. (Wangda Tan via
+    acmurthy)
+
 Release 2.3.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 7 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java

@@ -836,10 +836,14 @@ public class LeafQueue implements CSQueue {
         
         // Schedule in priority order
         for (Priority priority : application.getPriorities()) {
+          ResourceRequest anyRequest =
+              application.getResourceRequest(priority, ResourceRequest.ANY);
+          if (null == anyRequest) {
+            continue;
+          }
+          
           // Required resource
-          Resource required = 
-              application.getResourceRequest(
-                  priority, ResourceRequest.ANY).getCapability();
+          Resource required = anyRequest.getCapability();
 
           // Do we need containers at this 'priority'?
           if (!needContainers(application, priority, required)) {

+ 5 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java

@@ -503,9 +503,13 @@ public class FifoScheduler extends AbstractYarnScheduler implements
 
   private int getMaxAllocatableContainers(FiCaSchedulerApp application,
       Priority priority, FiCaSchedulerNode node, NodeType type) {
+    int maxContainers = 0;
+    
     ResourceRequest offSwitchRequest = 
       application.getResourceRequest(priority, ResourceRequest.ANY);
-    int maxContainers = offSwitchRequest.getNumContainers();
+    if (offSwitchRequest != null) {
+      maxContainers = offSwitchRequest.getNumContainers();
+    }
 
     if (type == NodeType.OFF_SWITCH) {
       return maxContainers;

+ 32 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java

@@ -91,6 +91,38 @@ public class TestFifoScheduler {
           "Invalid resource scheduler memory"));
     }
   }
+  
+  @Test
+  public void testAllocateContainerOnNodeWithoutOffSwitchSpecified()
+      throws Exception {
+    Logger rootLogger = LogManager.getRootLogger();
+    rootLogger.setLevel(Level.DEBUG);
+    
+    MockRM rm = new MockRM(conf);
+    rm.start();
+    MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB);
+
+    RMApp app1 = rm.submitApp(2048);
+    // kick the scheduling, 2 GB given to AM1, remaining 4GB on nm1
+    nm1.nodeHeartbeat(true);
+    RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+    MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
+    am1.registerAppAttempt();
+
+    // add request for containers
+    List<ResourceRequest> requests = new ArrayList<ResourceRequest>();
+    requests.add(am1.createResourceReq("127.0.0.1", 1 * GB, 1, 1));
+    requests.add(am1.createResourceReq("/default-rack", 1 * GB, 1, 1));
+    am1.allocate(requests, null); // send the request
+
+    try {
+      // kick the schedule
+      nm1.nodeHeartbeat(true);
+    } catch (NullPointerException e) {
+      Assert.fail("NPE when allocating container on node but "
+          + "forget to set off-switch request should be handled");
+    }
+  }
 
   @Test
   public void test() throws Exception {

+ 52 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java

@@ -34,11 +34,14 @@ import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import junit.framework.Assert;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -2012,6 +2015,55 @@ public class TestLeafQueue {
     //  100 * 20 * 0.2 = 400
     assertEquals(400, a.getMaximumActiveApplications());
   }
+  
+  @Test
+  public void testAllocateContainerOnNodeWithoutOffSwitchSpecified()
+      throws Exception {
+    // Manipulate queue 'a'
+    LeafQueue a = stubLeafQueue((LeafQueue) queues.get(B));
+
+    // Users
+    final String user_0 = "user_0";
+
+    // Submit applications
+    final ApplicationAttemptId appAttemptId_0 =
+        TestUtils.getMockApplicationAttemptId(0, 0);
+    FiCaSchedulerApp app_0 =
+        new FiCaSchedulerApp(appAttemptId_0, user_0, a,
+            mock(ActiveUsersManager.class), rmContext);
+    a.submitApplicationAttempt(app_0, user_0);
+
+    final ApplicationAttemptId appAttemptId_1 =
+        TestUtils.getMockApplicationAttemptId(1, 0);
+    FiCaSchedulerApp app_1 =
+        new FiCaSchedulerApp(appAttemptId_1, user_0, a,
+            mock(ActiveUsersManager.class), rmContext);
+    a.submitApplicationAttempt(app_1, user_0); // same user
+
+    // Setup some nodes
+    String host_0 = "127.0.0.1";
+    FiCaSchedulerNode node_0 =
+        TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8 * GB);
+
+    final int numNodes = 1;
+    Resource clusterResource =
+        Resources.createResource(numNodes * (8 * GB), numNodes * 16);
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+    // Setup resource-requests
+    Priority priority = TestUtils.createMockPriority(1);
+    app_0.updateResourceRequests(Arrays.asList(TestUtils.createResourceRequest(
+        "127.0.0.1", 1 * GB, 3, true, priority, recordFactory), TestUtils
+        .createResourceRequest(DEFAULT_RACK, 1 * GB, 3, true, priority,
+            recordFactory)));
+
+    try {
+      a.assignContainers(clusterResource, node_0);
+    } catch (NullPointerException e) {
+      Assert.fail("NPE when allocating container on node but "
+          + "forget to set off-switch request should be handled");
+    }
+  }
 
   private CapacitySchedulerContext mockCSContext(
       CapacitySchedulerConfiguration csConf, Resource clusterResource) {