Browse Source

YARN-3415. Non-AM containers can be counted towards amResourceUsage of a fairscheduler queue (Zhihai Xu via Sandy Ryza)

Sandy Ryza 10 years ago
parent
commit
6a6a59db7f

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -132,6 +132,9 @@ Release 2.8.0 - UNRELEASED
     YARN-3425. NPE from RMNodeLabelsManager.serviceStop when 
     NodeLabelsManager.serviceInit failed. (Bibin A Chundatt via wangda)
 
+    YARN-3415. Non-AM containers can be counted towards amResourceUsage of a
+    Fair Scheduler queue (Zhihai Xu via Sandy Ryza)
+
 Release 2.7.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 18 - 13
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java

@@ -523,8 +523,11 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
       // Inform the node
       node.allocateContainer(allocatedContainer);
 
-      // If this container is used to run AM, update the leaf queue's AM usage
-      if (getLiveContainers().size() == 1 && !getUnmanagedAM()) {
+      // If not running unmanaged, the first container we allocate is always
+      // the AM. Set the amResource for this app and update the leaf queue's AM
+      // usage
+      if (!isAmRunning() && !getUnmanagedAM()) {
+        setAMResource(container.getResource());
         getQueue().addAMResourceUsage(container.getResource());
         setAmRunning(true);
       }
@@ -551,6 +554,19 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
       LOG.debug("Node offered to app: " + getName() + " reserved: " + reserved);
     }
 
+    // Check the AM resource usage for the leaf queue
+    if (!isAmRunning() && !getUnmanagedAM()) {
+      List<ResourceRequest> ask = appSchedulingInfo.getAllResourceRequests();
+      if (ask.isEmpty() || !getQueue().canRunAppAM(
+          ask.get(0).getCapability())) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Skipping allocation because maxAMShare limit would " +
+              "be exceeded");
+        }
+        return Resources.none();
+      }
+    }
+
     Collection<Priority> prioritiesToTry = (reserved) ?
         Arrays.asList(node.getReservedContainer().getReservedPriority()) :
         getPriorities();
@@ -567,17 +583,6 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
 
         addSchedulingOpportunity(priority);
 
-        // Check the AM resource usage for the leaf queue
-        if (getLiveContainers().size() == 0 && !getUnmanagedAM()) {
-          if (!getQueue().canRunAppAM(getAMResource())) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Skipping allocation because maxAMShare limit would " +
-                  "be exceeded");
-            }
-            return Resources.none();
-          }
-        }
-
         ResourceRequest rackLocalRequest = getResourceRequest(priority,
             node.getRackName());
         ResourceRequest localRequest = getResourceRequest(priority,

+ 3 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java

@@ -124,8 +124,9 @@ public class FSLeafQueue extends FSQueue {
       writeLock.unlock();
     }
 
-    // Update AM resource usage if needed
-    if (runnable && app.isAmRunning() && app.getAMResource() != null) {
+    // Update AM resource usage if needed. If isAMRunning is true, we're not
+    // running an unmanaged AM.
+    if (runnable && app.isAmRunning()) {
       Resources.subtractFrom(amResourceUsage, app.getAMResource());
     }
 

+ 0 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java

@@ -901,12 +901,6 @@ public class FairScheduler extends
     // Record container allocation start time
     application.recordContainerRequestTime(getClock().getTime());
 
-    // Set amResource for this app
-    if (!application.getUnmanagedAM() && ask.size() == 1
-        && application.getLiveContainers().isEmpty()) {
-      application.setAMResource(ask.get(0).getCapability());
-    }
-
     // Release containers
     releaseContainers(release, application);
 

+ 40 - 10
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java

@@ -92,6 +92,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtil
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
@@ -3548,8 +3549,8 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     FSAppAttempt app3 = scheduler.getSchedulerApp(attId3);
     scheduler.update();
     scheduler.handle(updateEvent);
-    assertEquals("Application3's AM requests 1024 MB memory",
-        1024, app3.getAMResource().getMemory());
+    assertEquals("Application3's AM resource shouldn't be updated",
+        0, app3.getAMResource().getMemory());
     assertEquals("Application3's AM should not be running",
         0, app3.getLiveContainers().size());
     assertEquals("Queue1's AM resource usage should be 2048 MB memory",
@@ -3574,6 +3575,8 @@ public class TestFairScheduler extends FairSchedulerTestBase {
         0, app1.getLiveContainers().size());
     assertEquals("Application3's AM should be running",
         1, app3.getLiveContainers().size());
+    assertEquals("Application3's AM requests 1024 MB memory",
+        1024, app3.getAMResource().getMemory());
     assertEquals("Queue1's AM resource usage should be 2048 MB memory",
         2048, queue1.getAmResourceUsage().getMemory());
 
@@ -3584,8 +3587,8 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     FSAppAttempt app4 = scheduler.getSchedulerApp(attId4);
     scheduler.update();
     scheduler.handle(updateEvent);
-    assertEquals("Application4's AM requests 2048 MB memory",
-        2048, app4.getAMResource().getMemory());
+    assertEquals("Application4's AM resource shouldn't be updated",
+        0, app4.getAMResource().getMemory());
     assertEquals("Application4's AM should not be running",
         0, app4.getLiveContainers().size());
     assertEquals("Queue1's AM resource usage should be 2048 MB memory",
@@ -3598,8 +3601,8 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     FSAppAttempt app5 = scheduler.getSchedulerApp(attId5);
     scheduler.update();
     scheduler.handle(updateEvent);
-    assertEquals("Application5's AM requests 2048 MB memory",
-        2048, app5.getAMResource().getMemory());
+    assertEquals("Application5's AM resource shouldn't be updated",
+        0, app5.getAMResource().getMemory());
     assertEquals("Application5's AM should not be running",
         0, app5.getLiveContainers().size());
     assertEquals("Queue1's AM resource usage should be 2048 MB memory",
@@ -3631,6 +3634,33 @@ public class TestFairScheduler extends FairSchedulerTestBase {
         0, app3.getLiveContainers().size());
     assertEquals("Application5's AM should be running",
         1, app5.getLiveContainers().size());
+    assertEquals("Application5's AM requests 2048 MB memory",
+        2048, app5.getAMResource().getMemory());
+    assertEquals("Queue1's AM resource usage should be 2048 MB memory",
+        2048, queue1.getAmResourceUsage().getMemory());
+
+    // request non-AM container for app5
+    createSchedulingRequestExistingApplication(1024, 1, attId5);
+    assertEquals("Application5's AM should have 1 container",
+        1, app5.getLiveContainers().size());
+    // complete AM container before non-AM container is allocated.
+    // spark application hit this situation.
+    RMContainer amContainer5 = (RMContainer)app5.getLiveContainers().toArray()[0];
+    ContainerExpiredSchedulerEvent containerExpired =
+        new ContainerExpiredSchedulerEvent(amContainer5.getContainerId());
+    scheduler.handle(containerExpired);
+    assertEquals("Application5's AM should have 0 container",
+        0, app5.getLiveContainers().size());
+    assertEquals("Queue1's AM resource usage should be 2048 MB memory",
+        2048, queue1.getAmResourceUsage().getMemory());
+    scheduler.update();
+    scheduler.handle(updateEvent);
+    // non-AM container should be allocated
+    // check non-AM container allocation is not rejected
+    // due to queue MaxAMShare limitation.
+    assertEquals("Application5 should have 1 container",
+        1, app5.getLiveContainers().size());
+    // check non-AM container allocation won't affect queue AmResourceUsage
     assertEquals("Queue1's AM resource usage should be 2048 MB memory",
         2048, queue1.getAmResourceUsage().getMemory());
 
@@ -3643,8 +3673,8 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     scheduler.handle(updateEvent);
     assertEquals("Application6's AM should not be running",
         0, app6.getLiveContainers().size());
-    assertEquals("Application6's AM requests 2048 MB memory",
-        2048, app6.getAMResource().getMemory());
+    assertEquals("Application6's AM resource shouldn't be updated",
+        0, app6.getAMResource().getMemory());
     assertEquals("Queue1's AM resource usage should be 2048 MB memory",
         2048, queue1.getAmResourceUsage().getMemory());
 
@@ -3748,8 +3778,8 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     FSAppAttempt app2 = scheduler.getSchedulerApp(attId2);
     scheduler.update();
     scheduler.handle(updateEvent);
-    assertEquals("Application2's AM requests 1024 MB memory",
-        1024, app2.getAMResource().getMemory());
+    assertEquals("Application2's AM resource shouldn't be updated",
+        0, app2.getAMResource().getMemory());
     assertEquals("Application2's AM should not be running",
         0, app2.getLiveContainers().size());
     assertEquals("Queue2's AM resource usage should be 0 MB memory",