Просмотр исходного кода

YARN-5802. updateApplicationPriority api in scheduler should ensure to re-insert app to correct ordering policy. Contributed by Bibin A Chundatt

Sunil 8 лет назад
Родитель
Сommit
ee5dd2c388

+ 11 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

@@ -2067,14 +2067,19 @@ public class CapacityScheduler extends
     // priority then reinsert back to make order correct.
     LeafQueue queue = (LeafQueue) getQueue(rmApp.getQueue());
     synchronized (queue) {
-      queue.getOrderingPolicy().removeSchedulableEntity(
-          application.getCurrentAppAttempt());
-
+      FiCaSchedulerApp attempt = application.getCurrentAppAttempt();
+      boolean isActive =
+          queue.getOrderingPolicy().removeSchedulableEntity(attempt);
+      if (!isActive) {
+        queue.getPendingAppsOrderingPolicy().removeSchedulableEntity(attempt);
+      }
       // Update new priority in SchedulerApplication
       application.setPriority(appPriority);
-
-      queue.getOrderingPolicy().addSchedulableEntity(
-          application.getCurrentAppAttempt());
+      if (isActive) {
+        queue.getOrderingPolicy().addSchedulableEntity(attempt);
+      } else {
+        queue.getPendingAppsOrderingPolicy().addSchedulableEntity(attempt);
+      }
     }
 
     // Update the changed application state to timeline server

+ 26 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java

@@ -1617,6 +1617,32 @@ public class TestClientRMService {
     rm.close();
   }
 
+  @Test(timeout = 120000)
+  public void testUpdatePriorityAndKillAppWithZeroClusterResource()
+      throws Exception {
+    int maxPriority = 10;
+    int appPriority = 5;
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY,
+        maxPriority);
+    MockRM rm = new MockRM(conf);
+    rm.init(conf);
+    rm.start();
+    RMApp app1 = rm.submitApp(1024, Priority.newInstance(appPriority));
+    ClientRMService rmService = rm.getClientRMService();
+    // Update application priority
+    UpdateApplicationPriorityRequest updateRequest =
+        UpdateApplicationPriorityRequest.newInstance(app1.getApplicationId(),
+            Priority.newInstance(appPriority));
+    rmService.updateApplicationPriority(updateRequest);
+    Assert.assertEquals(
+        "Application priority should be updated to " + appPriority, appPriority,
+        app1.getApplicationSubmissionContext().getPriority().getPriority());
+    rm.killApp(app1.getApplicationId());
+    rm.waitForState(app1.getApplicationId(), RMAppState.KILLED);
+    rm.stop();
+  }
+
   @Test(timeout = 120000)
   public void testUpdateApplicationPriorityRequest() throws Exception {
     int maxPriority = 10;

+ 74 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java

@@ -23,6 +23,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -36,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
@@ -54,6 +56,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeRepo
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -733,4 +736,75 @@ public class TestApplicationPriority {
     rm2.stop();
     rm1.stop();
   }
+
+  @Test(timeout = 120000)
+  public void testUpdatePriorityOnPendingAppAndKillAttempt() throws Exception {
+    int maxPriority = 10;
+    int appPriority = 5;
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY,
+        maxPriority);
+    MockRM rm = new MockRM(conf);
+    rm.init(conf);
+    rm.start();
+
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    CSQueue defaultQueue = (LeafQueue) cs.getQueue("default");
+
+    // Update priority and kill application with no resource
+    RMApp app1 = rm.submitApp(1024, Priority.newInstance(appPriority));
+    Collection<FiCaSchedulerApp> appsPending =
+        ((LeafQueue) defaultQueue).getPendingApplications();
+    Collection<FiCaSchedulerApp> activeApps =
+        ((LeafQueue) defaultQueue).getOrderingPolicy().getSchedulableEntities();
+
+    // Verify app is in pending state
+    Assert.assertEquals("Pending apps should be 1", 1, appsPending.size());
+    Assert.assertEquals("Active apps should be 0", 0, activeApps.size());
+
+    // kill app1 which is pending
+    killAppAndVerifyOrderingPolicy(rm, defaultQueue, 0, 0, app1);
+
+    // Check ordering policy size when resource is added
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 8096, rm.getResourceTrackerService());
+    nm1.registerNode();
+    RMApp app2 = rm.submitApp(1024, Priority.newInstance(appPriority));
+    Assert.assertEquals("Pending apps should be 0", 0, appsPending.size());
+    Assert.assertEquals("Active apps should be 1", 1, activeApps.size());
+    RMApp app3 = rm.submitApp(1024, Priority.newInstance(appPriority));
+    RMApp app4 = rm.submitApp(1024, Priority.newInstance(appPriority));
+    Assert.assertEquals("Pending apps should be 2", 2, appsPending.size());
+    Assert.assertEquals("Active apps should be 1", 1, activeApps.size());
+    // kill app3, pending apps should reduce to 1
+    killAppAndVerifyOrderingPolicy(rm, defaultQueue, 1, 1, app3);
+    // kill app2, running apps is killed and pending added to running
+    killAppAndVerifyOrderingPolicy(rm, defaultQueue, 0, 1, app2);
+    // kill app4, all apps are killed and both policy size should be zero
+    killAppAndVerifyOrderingPolicy(rm, defaultQueue, 0, 0, app4);
+    rm.stop();
+  }
+
+  private void killAppAndVerifyOrderingPolicy(MockRM rm, CSQueue defaultQueue,
+      int appsPendingExpected, int activeAppsExpected, RMApp app)
+      throws YarnException {
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    cs.updateApplicationPriority(Priority.newInstance(2),
+        app.getApplicationId());
+    SchedulerEvent removeAttempt;
+    removeAttempt = new AppAttemptRemovedSchedulerEvent(
+        app.getCurrentAppAttempt().getAppAttemptId(), RMAppAttemptState.KILLED,
+        false);
+    cs.handle(removeAttempt);
+    rm.drainEvents();
+    Collection<FiCaSchedulerApp> appsPending =
+        ((LeafQueue) defaultQueue).getPendingApplications();
+    Collection<FiCaSchedulerApp> activeApps =
+        ((LeafQueue) defaultQueue).getApplications();
+    Assert.assertEquals("Pending apps should be " + appsPendingExpected,
+        appsPendingExpected, appsPending.size());
+    Assert.assertEquals("Active apps should be " + activeAppsExpected,
+        activeAppsExpected, activeApps.size());
+  }
+
 }