
YARN-957. Fixed a bug in CapacityScheduler because of which requests that need more than a node's total capability were incorrectly allocated on that node causing apps to hang. Contributed by Omkar Vinit Joshi.
svn merge --ignore-ancestry -c 1520187 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1520188 13f79535-47bb-0310-9956-ffa450edef68

Vinod Kumar Vavilapalli, 11 years ago
commit e8ce7cd541

+ 4 - 0
hadoop-yarn-project/CHANGES.txt

@@ -137,6 +137,10 @@ Release 2.1.1-beta - UNRELEASED
     YARN-1077. Fixed TestContainerLaunch test failure on Windows. (Chuan Liu via
     vinodkv)
 
+    YARN-957. Fixed a bug in CapacityScheduler because of which requests that
+    need more than a node's total capability were incorrectly allocated on that
+    node causing apps to hang. (Omkar Vinit Joshi via vinodkv)
+
 Release 2.1.0-beta - 2013-08-22
 
   INCOMPATIBLE CHANGES

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java

@@ -67,4 +67,9 @@ public abstract class SchedulerNode {
    */
   public abstract int getNumContainers();
 
+  /**
+   * Get total resources on the node.
+   * @return total resources on the node.
+   */
+  public abstract Resource getTotalResource();
 }
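
The new accessor exposes the node's total capability, as distinct from getAvailableResource(), which shrinks and grows while containers run. A standalone sketch of that contract, using hypothetical names rather than the real Hadoop types:

// Standalone sketch (hypothetical names, not Hadoop code): total capability
// is fixed when the node registers, while available capability varies as
// containers are allocated and released.
class NodeResourcesSketch {
  private final int totalMemoryMB;   // fixed at registration time
  private int availableMemoryMB;     // changes with running containers

  NodeResourcesSketch(int totalMemoryMB) {
    this.totalMemoryMB = totalMemoryMB;
    this.availableMemoryMB = totalMemoryMB;
  }

  int getTotalMemoryMB()     { return totalMemoryMB; }
  int getAvailableMemoryMB() { return availableMemoryMB; }

  void allocate(int memoryMB) { availableMemoryMB -= memoryMB; }
  void release(int memoryMB)  { availableMemoryMB += memoryMB; }
}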

+ 7 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java

@@ -1308,9 +1308,15 @@ public class LeafQueue implements CSQueue {
         + " request=" + request + " type=" + type);
     }
     Resource capability = request.getCapability();
-
     Resource available = node.getAvailableResource();
+    Resource totalResource = node.getTotalResource();
 
+    if (!Resources.fitsIn(capability, totalResource)) {
+      LOG.warn("Node : " + node.getNodeID()
+          + " does not have sufficient resource for request : " + request
+          + " node total capability : " + node.getTotalResource());
+      return Resources.none();
+    }
     assert Resources.greaterThan(
         resourceCalculator, clusterResource, available, Resources.none());
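
The guard added above rejects a request outright when it can never fit on the node, instead of letting the scheduler reserve the node for it and hang. It relies on a fits-in comparison between the request's capability and the node's total capability. A minimal standalone sketch of the semantics assumed here (the real check is Resources.fitsIn in org.apache.hadoop.yarn.util.resource.Resources and may differ in detail): the request fits only if both memory and virtual cores are within the node's total.

// Standalone sketch (not the Hadoop implementation) of the fits-in check:
// every dimension of the request must be within the node's total capability.
final class ResourceFitSketch {
  static boolean fitsIn(int reqMemoryMB, int reqVCores,
                        int totalMemoryMB, int totalVCores) {
    return reqMemoryMB <= totalMemoryMB && reqVCores <= totalVCores;
  }

  public static void main(String[] args) {
    // Mirrors the test below: a 3 GB request can never fit on a 2 GB node...
    System.out.println(fitsIn(3 * 1024, 1, 2 * 1024, 4)); // false
    // ...but does fit on a 3 GB node.
    System.out.println(fitsIn(3 * 1024, 1, 3 * 1024, 4)); // true
  }
}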
 

+ 9 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java

@@ -49,6 +49,7 @@ public class FiCaSchedulerNode extends SchedulerNode {
 
   private Resource availableResource = recordFactory.newRecordInstance(Resource.class);
   private Resource usedResource = recordFactory.newRecordInstance(Resource.class);
+  private Resource totalResourceCapability;
 
   private volatile int numContainers;
 
@@ -65,6 +66,9 @@ public class FiCaSchedulerNode extends SchedulerNode {
     this.rmNode = node;
     this.availableResource.setMemory(node.getTotalCapability().getMemory());
     this.availableResource.setVirtualCores(node.getTotalCapability().getVirtualCores());
+    totalResourceCapability =
+        Resource.newInstance(node.getTotalCapability().getMemory(), node
+            .getTotalCapability().getVirtualCores());
     if (usePortForNodeName) {
       nodeName = rmNode.getHostName() + ":" + node.getNodeID().getPort();
     } else {
@@ -126,6 +130,11 @@ public class FiCaSchedulerNode extends SchedulerNode {
     return this.usedResource;
   }
 
+  @Override
+  public Resource getTotalResource() {
+    return this.totalResourceCapability;
+  }
+
   private synchronized boolean isValidContainer(Container c) {    
     if (launchedContainers.containsKey(c.getId()))
       return true;

+ 9 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java

@@ -52,6 +52,7 @@ public class FSSchedulerNode extends SchedulerNode {
 
   private Resource availableResource;
   private Resource usedResource = recordFactory.newRecordInstance(Resource.class);
+  private Resource totalResourceCapability;
 
   private volatile int numContainers;
 
@@ -68,6 +69,9 @@ public class FSSchedulerNode extends SchedulerNode {
   public FSSchedulerNode(RMNode node, boolean usePortForNodeName) {
     this.rmNode = node;
     this.availableResource = Resources.clone(node.getTotalCapability());
+    totalResourceCapability =
+        Resource.newInstance(node.getTotalCapability().getMemory(), node
+            .getTotalCapability().getVirtualCores());
     if (usePortForNodeName) {
       nodeName = rmNode.getHostName() + ":" + node.getNodeID().getPort();
     } else {
@@ -173,6 +177,11 @@ public class FSSchedulerNode extends SchedulerNode {
     Resources.subtractFrom(usedResource, resource);
   }
 
+  @Override
+  public Resource getTotalResource() {
+    return this.totalResourceCapability;
+  }
+
   private synchronized void deductAvailableResource(Resource resource) {
     if (resource == null) {
       LOG.error("Invalid deduction of null resource for "

+ 8 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java

@@ -232,6 +232,14 @@ public class MockRM extends ResourceManager {
     return nm;
   }
 
+  public MockNM registerNode(String nodeIdStr, int memory, int vCores)
+      throws Exception {
+    MockNM nm =
+        new MockNM(nodeIdStr, memory, vCores, getResourceTrackerService());
+    nm.registerNode();
+    return nm;
+  }
+
   public void sendNodeStarted(MockNM nm) throws Exception {
     RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get(
         nm.getNodeId());

+ 109 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java

@@ -0,0 +1,109 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.TestFifoScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.junit.Test;
+
+
+public class TestContainerAllocation {
+
+  private static final Log LOG = LogFactory.getLog(TestFifoScheduler.class);
+
+  private final int GB = 1024;
+
+  @Test(timeout = 3000000)
+  public void testExcessReservationThanNodeManagerCapacity() throws Exception {
+    YarnConfiguration conf = new YarnConfiguration();
+    MockRM rm = new MockRM(conf);
+    rm.start();
+
+    // Register node1
+    MockNM nm1 = rm.registerNode("127.0.0.1:1234", 2 * GB, 4);
+    MockNM nm2 = rm.registerNode("127.0.0.1:2234", 3 * GB, 4);
+
+    nm1.nodeHeartbeat(true);
+    nm2.nodeHeartbeat(true);
+
+    // wait for both node managers to register
+    int waitCount = 20;
+    int size = rm.getRMContext().getRMNodes().size();
+    while ((size = rm.getRMContext().getRMNodes().size()) != 2
+        && waitCount-- > 0) {
+      LOG.info("Waiting for node managers to register : " + size);
+      Thread.sleep(100);
+    }
+    Assert.assertEquals(2, rm.getRMContext().getRMNodes().size());
+    // Submit an application
+    RMApp app1 = rm.submitApp(128);
+
+    // kick the scheduling
+    nm1.nodeHeartbeat(true);
+    RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+    MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
+    am1.registerAppAttempt();
+
+    LOG.info("sending container requests ");
+    am1.addRequests(new String[] {"*"}, 3 * GB, 1, 1);
+    AllocateResponse alloc1Response = am1.schedule(); // send the request
+
+    // kick the scheduler
+    nm1.nodeHeartbeat(true);
+    int waitCounter = 20;
+    LOG.info("heartbeating nm1");
+    while (alloc1Response.getAllocatedContainers().size() < 1
+        && waitCounter-- > 0) {
+      LOG.info("Waiting for containers to be created for app 1...");
+      Thread.sleep(500);
+      alloc1Response = am1.schedule();
+    }
+    LOG.info("received container : "
+        + alloc1Response.getAllocatedContainers().size());
+
+    // No container should be allocated.
+    // Internally it should not have been reserved.
+    Assert.assertTrue(alloc1Response.getAllocatedContainers().size() == 0);
+
+    LOG.info("heartbeating nm2");
+    waitCounter = 20;
+    nm2.nodeHeartbeat(true);
+    while (alloc1Response.getAllocatedContainers().size() < 1
+        && waitCounter-- > 0) {
+      LOG.info("Waiting for containers to be created for app 1...");
+      Thread.sleep(500);
+      alloc1Response = am1.schedule();
+    }
+    LOG.info("received container : "
+        + alloc1Response.getAllocatedContainers().size());
+    Assert.assertTrue(alloc1Response.getAllocatedContainers().size() == 1);
+
+    rm.stop();
+  }
+}