Browse Source

YARN-3027. Scheduler should use totalAvailable resource from node instead of availableResource for maxAllocation. (adhoot via rkanter)

(cherry picked from commit ae7bf31fe1c63f323ba5271e50fd0e4425a7510f)
Robert Kanter 10 năm trước cách đây
mục cha
commit
e5059b91bb

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -313,6 +313,9 @@ Release 2.7.0 - UNRELEASED
     YARN-3014. Replaces labels on a host should update all NM's labels on that
     host. (Wangda Tan via jianhe)
 
+    YARN-3027. Scheduler should use totalAvailable resource from node instead of
+    availableResource for maxAllocation. (adhoot via rkanter)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

+ 6 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java

@@ -576,23 +576,23 @@ public abstract class AbstractYarnScheduler
     writeLock.lock();
     try {
       if (add) { // added node
-        int nodeMemory = node.getAvailableResource().getMemory();
+        int nodeMemory = node.getTotalResource().getMemory();
         if (nodeMemory > maxNodeMemory) {
           maxNodeMemory = nodeMemory;
           maximumAllocation.setMemory(Math.min(
               configuredMaximumAllocation.getMemory(), maxNodeMemory));
         }
-        int nodeVCores = node.getAvailableResource().getVirtualCores();
+        int nodeVCores = node.getTotalResource().getVirtualCores();
         if (nodeVCores > maxNodeVCores) {
           maxNodeVCores = nodeVCores;
           maximumAllocation.setVirtualCores(Math.min(
               configuredMaximumAllocation.getVirtualCores(), maxNodeVCores));
         }
       } else {  // removed node
-        if (maxNodeMemory == node.getAvailableResource().getMemory()) {
+        if (maxNodeMemory == node.getTotalResource().getMemory()) {
           maxNodeMemory = -1;
         }
-        if (maxNodeVCores == node.getAvailableResource().getVirtualCores()) {
+        if (maxNodeVCores == node.getTotalResource().getVirtualCores()) {
           maxNodeVCores = -1;
         }
         // We only have to iterate through the nodes if the current max memory
@@ -600,12 +600,12 @@ public abstract class AbstractYarnScheduler
         if (maxNodeMemory == -1 || maxNodeVCores == -1) {
           for (Map.Entry<NodeId, N> nodeEntry : nodes.entrySet()) {
             int nodeMemory =
-                nodeEntry.getValue().getAvailableResource().getMemory();
+                nodeEntry.getValue().getTotalResource().getMemory();
             if (nodeMemory > maxNodeMemory) {
               maxNodeMemory = nodeMemory;
             }
             int nodeVCores =
-                nodeEntry.getValue().getAvailableResource().getVirtualCores();
+                nodeEntry.getValue().getTotalResource().getVirtualCores();
             if (nodeVCores > maxNodeVCores) {
               maxNodeVCores = nodeVCores;
             }

+ 80 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java

@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
@@ -25,11 +27,17 @@ import org.apache.hadoop.yarn.server.resourcemanager.ParameterizedSchedulerTestB
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
-
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.HashMap;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@SuppressWarnings("unchecked")
 public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
 
   public TestAbstractYarnScheduler(SchedulerType type) {
@@ -210,4 +218,75 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
     Assert.assertEquals(0, scheduler.getNumClusterNodes());
   }
 
+  @Test
+  public void testUpdateMaxAllocationUsesTotal() throws IOException {
+    final int configuredMaxVCores = 20;
+    final int configuredMaxMemory = 10 * 1024;
+    Resource configuredMaximumResource = Resource.newInstance
+        (configuredMaxMemory, configuredMaxVCores);
+
+    configureScheduler();
+    YarnConfiguration conf = getConf();
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
+        configuredMaxVCores);
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+        configuredMaxMemory);
+    conf.setLong(
+        YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
+        0);
+
+    MockRM rm = new MockRM(conf);
+    try {
+      rm.start();
+      AbstractYarnScheduler scheduler = (AbstractYarnScheduler) rm
+          .getResourceScheduler();
+
+      Resource emptyResource = Resource.newInstance(0, 0);
+      Resource fullResource1 = Resource.newInstance(1024, 5);
+      Resource fullResource2 = Resource.newInstance(2048, 10);
+
+      SchedulerNode mockNode1 = mock(SchedulerNode.class);
+      when(mockNode1.getNodeID()).thenReturn(NodeId.newInstance("foo", 8080));
+      when(mockNode1.getAvailableResource()).thenReturn(emptyResource);
+      when(mockNode1.getTotalResource()).thenReturn(fullResource1);
+
+      SchedulerNode mockNode2 = mock(SchedulerNode.class);
+      when(mockNode1.getNodeID()).thenReturn(NodeId.newInstance("bar", 8081));
+      when(mockNode2.getAvailableResource()).thenReturn(emptyResource);
+      when(mockNode2.getTotalResource()).thenReturn(fullResource2);
+
+      verifyMaximumResourceCapability(configuredMaximumResource, scheduler);
+
+      scheduler.nodes = new HashMap<NodeId, SchedulerNode>();
+
+      scheduler.nodes.put(mockNode1.getNodeID(), mockNode1);
+      scheduler.updateMaximumAllocation(mockNode1, true);
+      verifyMaximumResourceCapability(fullResource1, scheduler);
+
+      scheduler.nodes.put(mockNode2.getNodeID(), mockNode2);
+      scheduler.updateMaximumAllocation(mockNode2, true);
+      verifyMaximumResourceCapability(fullResource2, scheduler);
+
+      scheduler.nodes.remove(mockNode2.getNodeID());
+      scheduler.updateMaximumAllocation(mockNode2, false);
+      verifyMaximumResourceCapability(fullResource1, scheduler);
+
+      scheduler.nodes.remove(mockNode1.getNodeID());
+      scheduler.updateMaximumAllocation(mockNode1, false);
+      verifyMaximumResourceCapability(configuredMaximumResource, scheduler);
+    } finally {
+      rm.stop();
+    }
+  }
+
+  private void verifyMaximumResourceCapability(
+      Resource expectedMaximumResource, AbstractYarnScheduler scheduler) {
+
+    final Resource schedulerMaximumResourceCapability = scheduler
+        .getMaximumResourceCapability();
+    Assert.assertEquals(expectedMaximumResource.getMemory(),
+        schedulerMaximumResourceCapability.getMemory());
+    Assert.assertEquals(expectedMaximumResource.getVirtualCores(),
+        schedulerMaximumResourceCapability.getVirtualCores());
+  }
 }