Bladeren bron

YARN-6207. Move application across queues should handle delayed event processing. Contributed by Bibin A Chundatt.

(cherry picked from commit 1eb81867032b016a59662043cbae50daa52dafa9)
Sunil G 8 jaren geleden
bovenliggende
commit
ec4c2d42fe

+ 4 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java

@@ -1107,6 +1107,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
       QueueMetrics newMetrics = newQueue.getMetrics();
       String newQueueName = newQueue.getQueueName();
       String user = getUser();
+
       for (RMContainer liveContainer : liveContainers.values()) {
         Resource resource = liveContainer.getContainer().getResource();
         ((RMContainerImpl) liveContainer).setQueueName(newQueueName);
@@ -1122,7 +1123,9 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
         }
       }
 
-      appSchedulingInfo.move(newQueue);
+      if (!isStopped) {
+        appSchedulingInfo.move(newQueue);
+      }
       this.queue = newQueue;
     } finally {
       writeLock.unlock();

+ 44 - 25
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

@@ -2042,36 +2042,47 @@ public class CapacityScheduler extends
       String targetQueueName) throws YarnException {
     try {
       writeLock.lock();
-      FiCaSchedulerApp app = getApplicationAttempt(
-          ApplicationAttemptId.newInstance(appId, 0));
-      String sourceQueueName = app.getQueue().getQueueName();
-      LeafQueue source = this.queueManager.getAndCheckLeafQueue(
-          sourceQueueName);
+      SchedulerApplication<FiCaSchedulerApp> application =
+          applications.get(appId);
+      if (application == null) {
+        throw new YarnException("App to be moved " + appId + " not found.");
+      }
+      String sourceQueueName = application.getQueue().getQueueName();
+      LeafQueue source =
+          this.queueManager.getAndCheckLeafQueue(sourceQueueName);
       String destQueueName = handleMoveToPlanQueue(targetQueueName);
       LeafQueue dest = this.queueManager.getAndCheckLeafQueue(destQueueName);
 
-      String user = app.getUser();
+      String user = application.getUser();
       try {
         dest.submitApplication(appId, user, destQueueName);
       } catch (AccessControlException e) {
         throw new YarnException(e);
       }
-      // Move all live containers
-      for (RMContainer rmContainer : app.getLiveContainers()) {
-        source.detachContainer(getClusterResource(), app, rmContainer);
-        // attach the Container to another queue
-        dest.attachContainer(getClusterResource(), app, rmContainer);
+
+      FiCaSchedulerApp app = application.getCurrentAppAttempt();
+      if (app != null) {
+        // Move all live containers even when stopped.
+        // For transferStateFromPreviousAttempt required
+        for (RMContainer rmContainer : app.getLiveContainers()) {
+          source.detachContainer(getClusterResource(), app, rmContainer);
+          // attach the Container to another queue
+          dest.attachContainer(getClusterResource(), app, rmContainer);
+        }
+        if (!app.isStopped()) {
+          source.finishApplicationAttempt(app, sourceQueueName);
+          // Submit to a new queue
+          dest.submitApplicationAttempt(app, user);
+        }
+        // Finish app & update metrics
+        app.move(dest);
       }
+      source.appFinished();
       // Detach the application..
-      source.finishApplicationAttempt(app, sourceQueueName);
-      source.getParent().finishApplication(appId, app.getUser());
-      // Finish app & update metrics
-      app.move(dest);
-      // Submit to a new queue
-      dest.submitApplicationAttempt(app, user);
-      applications.get(appId).setQueue(dest);
-      LOG.info("App: " + app.getApplicationId() + " successfully moved from "
-          + sourceQueueName + " to: " + destQueueName);
+      source.getParent().finishApplication(appId, user);
+      application.setQueue(dest);
+      LOG.info("App: " + appId + " successfully moved from " + sourceQueueName
+          + " to: " + destQueueName);
       return targetQueueName;
     } finally {
       writeLock.unlock();
@@ -2083,15 +2094,23 @@ public class CapacityScheduler extends
       throws YarnException {
     try {
       writeLock.lock();
-      FiCaSchedulerApp app = getApplicationAttempt(
-          ApplicationAttemptId.newInstance(appId, 0));
-      String sourceQueueName = app.getQueue().getQueueName();
+      SchedulerApplication<FiCaSchedulerApp> application =
+          applications.get(appId);
+      if (application == null) {
+        throw new YarnException("App to be moved " + appId + " not found.");
+      }
+      String sourceQueueName = application.getQueue().getQueueName();
       this.queueManager.getAndCheckLeafQueue(sourceQueueName);
       String destQueueName = handleMoveToPlanQueue(newQueue);
       LeafQueue dest = this.queueManager.getAndCheckLeafQueue(destQueueName);
       // Validation check - ACLs, submission limits for user & queue
-      String user = app.getUser();
-      checkQueuePartition(app, dest);
+      String user = application.getUser();
+      // Check active partition only when attempt is available
+      FiCaSchedulerApp appAttempt =
+          getApplicationAttempt(ApplicationAttemptId.newInstance(appId, 0));
+      if (null != appAttempt) {
+        checkQueuePartition(appAttempt, dest);
+      }
       try {
         dest.validateSubmitApplication(appId, user, destQueueName);
       } catch (AccessControlException e) {

+ 200 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java

@@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Priority;
@@ -92,12 +93,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptS
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
@@ -2207,6 +2210,203 @@ public class TestCapacityScheduler {
     rm.stop();
   }
 
+  @Test(timeout = 60000)
+  public void testMoveAttemptNotAdded() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    MockRM rm = new MockRM(getCapacityConfiguration(conf));
+    rm.start();
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+
+    ApplicationId appId = BuilderUtils.newApplicationId(100, 1);
+    ApplicationAttemptId appAttemptId =
+        BuilderUtils.newApplicationAttemptId(appId, 1);
+
+    RMAppAttemptMetrics attemptMetric =
+        new RMAppAttemptMetrics(appAttemptId, rm.getRMContext());
+    RMAppImpl app = mock(RMAppImpl.class);
+    when(app.getApplicationId()).thenReturn(appId);
+    RMAppAttemptImpl attempt = mock(RMAppAttemptImpl.class);
+    Container container = mock(Container.class);
+    when(attempt.getMasterContainer()).thenReturn(container);
+    ApplicationSubmissionContext submissionContext =
+        mock(ApplicationSubmissionContext.class);
+    when(attempt.getSubmissionContext()).thenReturn(submissionContext);
+    when(attempt.getAppAttemptId()).thenReturn(appAttemptId);
+    when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric);
+    when(app.getCurrentAppAttempt()).thenReturn(attempt);
+
+    rm.getRMContext().getRMApps().put(appId, app);
+
+    SchedulerEvent addAppEvent =
+        new AppAddedSchedulerEvent(appId, "a1", "user");
+    try {
+      cs.moveApplication(appId, "b1");
+      fail("Move should throw exception app not available");
+    } catch (YarnException e) {
+      assertEquals("App to be moved application_100_0001 not found.",
+          e.getMessage());
+    }
+    cs.handle(addAppEvent);
+    cs.moveApplication(appId, "b1");
+    SchedulerEvent addAttemptEvent =
+        new AppAttemptAddedSchedulerEvent(appAttemptId, false);
+    cs.handle(addAttemptEvent);
+    CSQueue rootQ = cs.getRootQueue();
+    CSQueue queueB = cs.getQueue("b");
+    CSQueue queueA = cs.getQueue("a");
+    CSQueue queueA1 = cs.getQueue("a1");
+    CSQueue queueB1 = cs.getQueue("b1");
+    Assert.assertEquals(1, rootQ.getNumApplications());
+    Assert.assertEquals(0, queueA.getNumApplications());
+    Assert.assertEquals(1, queueB.getNumApplications());
+    Assert.assertEquals(0, queueA1.getNumApplications());
+    Assert.assertEquals(1, queueB1.getNumApplications());
+
+    rm.close();
+  }
+
+  @Test
+  public void testRemoveAttemptMoveAdded() throws Exception {
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        CapacityScheduler.class);
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
+    // Create Mock RM
+    MockRM rm = new MockRM(getCapacityConfiguration(conf));
+    CapacityScheduler sch = (CapacityScheduler) rm.getResourceScheduler();
+    // add node
+    Resource newResource = Resource.newInstance(4 * GB, 1);
+    RMNode node = MockNodes.newNodeInfo(0, newResource, 1, "127.0.0.1");
+    SchedulerEvent addNode = new NodeAddedSchedulerEvent(node);
+    sch.handle(addNode);
+    // create appid
+    ApplicationId appId = BuilderUtils.newApplicationId(100, 1);
+    ApplicationAttemptId appAttemptId =
+        BuilderUtils.newApplicationAttemptId(appId, 1);
+
+    RMAppAttemptMetrics attemptMetric =
+        new RMAppAttemptMetrics(appAttemptId, rm.getRMContext());
+    RMAppImpl app = mock(RMAppImpl.class);
+    when(app.getApplicationId()).thenReturn(appId);
+    RMAppAttemptImpl attempt = mock(RMAppAttemptImpl.class);
+    Container container = mock(Container.class);
+    when(attempt.getMasterContainer()).thenReturn(container);
+    ApplicationSubmissionContext submissionContext =
+        mock(ApplicationSubmissionContext.class);
+    when(attempt.getSubmissionContext()).thenReturn(submissionContext);
+    when(attempt.getAppAttemptId()).thenReturn(appAttemptId);
+    when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric);
+    when(app.getCurrentAppAttempt()).thenReturn(attempt);
+
+    rm.getRMContext().getRMApps().put(appId, app);
+    // Add application
+    SchedulerEvent addAppEvent =
+        new AppAddedSchedulerEvent(appId, "a1", "user");
+    sch.handle(addAppEvent);
+    // Add application attempt
+    SchedulerEvent addAttemptEvent =
+        new AppAttemptAddedSchedulerEvent(appAttemptId, false);
+    sch.handle(addAttemptEvent);
+    // get Queues
+    CSQueue queueA1 = sch.getQueue("a1");
+    CSQueue queueB = sch.getQueue("b");
+    CSQueue queueB1 = sch.getQueue("b1");
+
+    // add Running rm container and simulate live containers to a1
+    ContainerId newContainerId = ContainerId.newContainerId(appAttemptId, 2);
+    RMContainerImpl rmContainer = mock(RMContainerImpl.class);
+    when(rmContainer.getState()).thenReturn(RMContainerState.RUNNING);
+    Container container2 = mock(Container.class);
+    when(rmContainer.getContainer()).thenReturn(container2);
+    Resource resource = Resource.newInstance(1024, 1);
+    when(container2.getResource()).thenReturn(resource);
+    when(rmContainer.getExecutionType()).thenReturn(ExecutionType.GUARANTEED);
+    when(container2.getNodeId()).thenReturn(node.getNodeID());
+    when(container2.getId()).thenReturn(newContainerId);
+    when(rmContainer.getNodeLabelExpression())
+        .thenReturn(RMNodeLabelsManager.NO_LABEL);
+    when(rmContainer.getContainerId()).thenReturn(newContainerId);
+    sch.getApplicationAttempt(appAttemptId).getLiveContainersMap()
+        .put(newContainerId, rmContainer);
+    QueueMetrics queueA1M = queueA1.getMetrics();
+    queueA1M.incrPendingResources("user1", 1, resource);
+    queueA1M.allocateResources("user1", resource);
+    // remove attempt
+    sch.handle(new AppAttemptRemovedSchedulerEvent(appAttemptId,
+        RMAppAttemptState.KILLED, true));
+    // Move application to queue b1
+    sch.moveApplication(appId, "b1");
+    // Check queue metrics after move
+    Assert.assertEquals(0, queueA1.getNumApplications());
+    Assert.assertEquals(1, queueB.getNumApplications());
+    Assert.assertEquals(0, queueB1.getNumApplications());
+
+    // Release attempt add event
+    ApplicationAttemptId appAttemptId2 =
+        BuilderUtils.newApplicationAttemptId(appId, 2);
+    SchedulerEvent addAttemptEvent2 =
+        new AppAttemptAddedSchedulerEvent(appAttemptId2, true);
+    sch.handle(addAttemptEvent2);
+
+    // Check metrics after attempt added
+    Assert.assertEquals(0, queueA1.getNumApplications());
+    Assert.assertEquals(1, queueB.getNumApplications());
+    Assert.assertEquals(1, queueB1.getNumApplications());
+
+
+    QueueMetrics queueB1M = queueB1.getMetrics();
+    QueueMetrics queueBM = queueB.getMetrics();
+    // Verify allocation MB of current state
+    Assert.assertEquals(0, queueA1M.getAllocatedMB());
+    Assert.assertEquals(0, queueA1M.getAllocatedVirtualCores());
+    Assert.assertEquals(1024, queueB1M.getAllocatedMB());
+    Assert.assertEquals(1, queueB1M.getAllocatedVirtualCores());
+
+    // remove attempt
+    sch.handle(new AppAttemptRemovedSchedulerEvent(appAttemptId2,
+        RMAppAttemptState.FINISHED, false));
+
+    Assert.assertEquals(0, queueA1M.getAllocatedMB());
+    Assert.assertEquals(0, queueA1M.getAllocatedVirtualCores());
+    Assert.assertEquals(0, queueB1M.getAllocatedMB());
+    Assert.assertEquals(0, queueB1M.getAllocatedVirtualCores());
+
+    verifyQueueMetrics(queueB1M);
+    verifyQueueMetrics(queueBM);
+    // Verify queue A1 metrics
+    verifyQueueMetrics(queueA1M);
+    rm.close();
+  }
+
+  private void verifyQueueMetrics(QueueMetrics queue) {
+    Assert.assertEquals(0, queue.getPendingMB());
+    Assert.assertEquals(0, queue.getActiveUsers());
+    Assert.assertEquals(0, queue.getActiveApps());
+    Assert.assertEquals(0, queue.getAppsPending());
+    Assert.assertEquals(0, queue.getAppsRunning());
+    Assert.assertEquals(0, queue.getAllocatedMB());
+    Assert.assertEquals(0, queue.getAllocatedVirtualCores());
+  }
+
+  private Configuration getCapacityConfiguration(Configuration config) {
+    CapacitySchedulerConfiguration conf =
+        new CapacitySchedulerConfiguration(config);
+
+    // Define top-level queues
+    conf.setQueues(CapacitySchedulerConfiguration.ROOT,
+        new String[] {"a", "b"});
+    conf.setCapacity(A, 50);
+    conf.setCapacity(B, 50);
+    conf.setQueues(A, new String[] {"a1", "a2"});
+    conf.setCapacity(A1, 50);
+    conf.setCapacity(A2, 50);
+    conf.setQueues(B, new String[] {"b1"});
+    conf.setCapacity(B1, 100);
+    return conf;
+  }
+
   @Test
   public void testKillAllAppsInQueue() throws Exception {
     MockRM rm = setUpMove();