Browse Source

YARN-3231. FairScheduler: Changing queueMaxRunningApps interferes with pending jobs. (Siqi Li via kasha)

(cherry picked from commit 22426a1c9f4bd616558089b6862fd34ab42d19a7)
(cherry picked from commit 721d7b574126c4070322f70ec5b49a7b8558a4c7)
(cherry picked from commit 5dfa25f22a989222e8b3d1013117b3350a48b2c5)
Karthik Kambatla 10 years ago
parent
commit
dbc5bab9fd

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -91,6 +91,9 @@ Release 2.6.1 - UNRELEASED
     YARN-3222. Fixed RMNode to send scheduler events in sequential order when a
     node reconnects. (Rohith Sharma K S via jianhe)
 
+    YARN-3231. FairScheduler: Changing queueMaxRunningApps interferes with pending 
+    jobs. (Siqi Li via kasha)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java

@@ -1388,6 +1388,7 @@ public class FairScheduler extends
         allocConf = queueInfo;
         allocConf.getDefaultSchedulingPolicy().initialize(clusterResource);
         queueMgr.updateAllocationConfiguration(allocConf);
+        maxRunningEnforcer.updateRunnabilityOnReload();
       }
     }
   }

+ 35 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java

@@ -104,6 +104,26 @@ public class MaxRunningAppsEnforcer {
     usersNonRunnableApps.put(user, app);
   }
 
+  /**
+   * This is called after reloading the allocation configuration when the
+   * scheduler is reinitilized
+   *
+   * Checks to see whether any non-runnable applications become runnable
+   * now that the max running apps of given queue has been changed
+   *
+   * Runs in O(n) where n is the number of apps that are non-runnable and in
+   * the queues that went from having no slack to having slack.
+   */
+  public void updateRunnabilityOnReload() {
+    FSParentQueue rootQueue = scheduler.getQueueManager().getRootQueue();
+    List<List<FSAppAttempt>> appsNowMaybeRunnable =
+        new ArrayList<List<FSAppAttempt>>();
+
+    gatherPossiblyRunnableAppLists(rootQueue, appsNowMaybeRunnable);
+
+    updateAppsRunnability(appsNowMaybeRunnable, Integer.MAX_VALUE);
+  }
+
   /**
    * Checks to see whether any other applications runnable now that the given
    * application has been removed from the given queue.  And makes them so.
@@ -156,6 +176,19 @@ public class MaxRunningAppsEnforcer {
       }
     }
 
+    updateAppsRunnability(appsNowMaybeRunnable,
+        appsNowMaybeRunnable.size());
+  }
+
+  /**
+   * Checks to see whether applications are runnable now by iterating
+   * through each one of them and check if the queue and user have slack
+   *
+   * if we know how many apps can be runnable, there is no need to iterate
+   * through all apps, maxRunnableApps is used to break out of the iteration
+   */
+  private void updateAppsRunnability(List<List<FSAppAttempt>>
+      appsNowMaybeRunnable, int maxRunnableApps) {
     // Scan through and check whether this means that any apps are now runnable
     Iterator<FSAppAttempt> iter = new MultiListStartTimeIterator(
         appsNowMaybeRunnable);
@@ -173,9 +206,7 @@ public class MaxRunningAppsEnforcer {
         next.getQueue().getRunnableAppSchedulables().add(appSched);
         noLongerPendingApps.add(appSched);
 
-        // No more than one app per list will be able to be made runnable, so
-        // we can stop looking after we've found that many
-        if (noLongerPendingApps.size() >= appsNowMaybeRunnable.size()) {
+        if (noLongerPendingApps.size() >= maxRunnableApps) {
           break;
         }
       }
@@ -195,11 +226,10 @@ public class MaxRunningAppsEnforcer {
       
       if (!usersNonRunnableApps.remove(appSched.getUser(), appSched)) {
         LOG.error("Waiting app " + appSched + " expected to be in "
-        		+ "usersNonRunnableApps, but was not. This should never happen.");
+            + "usersNonRunnableApps, but was not. This should never happen.");
       }
     }
   }
-  
   /**
    * Updates the relevant tracking variables after a runnable app with the given
    * queue and user has been removed.

+ 309 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java

@@ -2035,7 +2035,315 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     // Request should be fulfilled
     assertEquals(2, scheduler.getSchedulerApp(attId1).getLiveContainers().size());
   }
-  
+
+  @Test (timeout = 5000)
+  public void testIncreaseQueueMaxRunningAppsOnTheFly() throws Exception {
+  String allocBefore = "<?xml version=\"1.0\"?>" +
+        "<allocations>" +
+        "<queue name=\"root\">" +
+        "<queue name=\"queue1\">" +
+        "<maxRunningApps>1</maxRunningApps>" +
+        "</queue>" +
+        "</queue>" +
+        "</allocations>";
+
+    String allocAfter = "<?xml version=\"1.0\"?>" +
+        "<allocations>" +
+        "<queue name=\"root\">" +
+        "<queue name=\"queue1\">" +
+        "<maxRunningApps>3</maxRunningApps>" +
+        "</queue>" +
+        "</queue>" +
+        "</allocations>";
+
+    testIncreaseQueueSettingOnTheFlyInternal(allocBefore, allocAfter);
+  }
+
+  @Test (timeout = 5000)
+  public void testIncreaseUserMaxRunningAppsOnTheFly() throws Exception {
+    String allocBefore = "<?xml version=\"1.0\"?>"+
+        "<allocations>"+
+        "<queue name=\"root\">"+
+        "<queue name=\"queue1\">"+
+        "<maxRunningApps>10</maxRunningApps>"+
+        "</queue>"+
+        "</queue>"+
+        "<user name=\"user1\">"+
+        "<maxRunningApps>1</maxRunningApps>"+
+        "</user>"+
+        "</allocations>";
+
+    String allocAfter = "<?xml version=\"1.0\"?>"+
+        "<allocations>"+
+        "<queue name=\"root\">"+
+        "<queue name=\"queue1\">"+
+        "<maxRunningApps>10</maxRunningApps>"+
+        "</queue>"+
+        "</queue>"+
+        "<user name=\"user1\">"+
+        "<maxRunningApps>3</maxRunningApps>"+
+        "</user>"+
+        "</allocations>";
+
+    testIncreaseQueueSettingOnTheFlyInternal(allocBefore, allocAfter);
+  }
+
+  private void testIncreaseQueueSettingOnTheFlyInternal(String allocBefore,
+      String allocAfter) throws Exception {
+    // Set max running apps
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println(allocBefore);
+    out.close();
+
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    // Add a node
+    RMNode node1 =
+        MockNodes
+            .newNodeInfo(1, Resources.createResource(8192, 8), 1, "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+
+    // Request for app 1
+    ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1",
+        "user1", 1);
+
+    scheduler.update();
+    NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
+    scheduler.handle(updateEvent);
+
+    // App 1 should be running
+    assertEquals(1, scheduler.getSchedulerApp(attId1).getLiveContainers().size());
+
+    ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1",
+        "user1", 1);
+
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    ApplicationAttemptId attId3 = createSchedulingRequest(1024, "queue1",
+        "user1", 1);
+
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    ApplicationAttemptId attId4 = createSchedulingRequest(1024, "queue1",
+        "user1", 1);
+
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    // App 2 should not be running
+    assertEquals(0, scheduler.getSchedulerApp(attId2).getLiveContainers().size());
+    // App 3 should not be running
+    assertEquals(0, scheduler.getSchedulerApp(attId3).getLiveContainers().size());
+    // App 4 should not be running
+    assertEquals(0, scheduler.getSchedulerApp(attId4).getLiveContainers().size());
+
+    out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println(allocAfter);
+    out.close();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    // App 2 should be running
+    assertEquals(1, scheduler.getSchedulerApp(attId2).getLiveContainers().size());
+
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    // App 3 should be running
+    assertEquals(1, scheduler.getSchedulerApp(attId3).getLiveContainers().size());
+
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    // App 4 should not be running
+    assertEquals(0, scheduler.getSchedulerApp(attId4).getLiveContainers().size());
+
+    // Now remove app 1
+    AppAttemptRemovedSchedulerEvent appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent(
+        attId1, RMAppAttemptState.FINISHED, false);
+
+    scheduler.handle(appRemovedEvent1);
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    // App 4 should be running
+    assertEquals(1, scheduler.getSchedulerApp(attId4).getLiveContainers().size());
+  }
+
+  @Test (timeout = 5000)
+  public void testDecreaseQueueMaxRunningAppsOnTheFly() throws Exception {
+  String allocBefore = "<?xml version=\"1.0\"?>" +
+        "<allocations>" +
+        "<queue name=\"root\">" +
+        "<queue name=\"queue1\">" +
+        "<maxRunningApps>3</maxRunningApps>" +
+        "</queue>" +
+        "</queue>" +
+        "</allocations>";
+
+    String allocAfter = "<?xml version=\"1.0\"?>" +
+        "<allocations>" +
+        "<queue name=\"root\">" +
+        "<queue name=\"queue1\">" +
+        "<maxRunningApps>1</maxRunningApps>" +
+        "</queue>" +
+        "</queue>" +
+        "</allocations>";
+
+    testDecreaseQueueSettingOnTheFlyInternal(allocBefore, allocAfter);
+  }
+
+  @Test (timeout = 5000)
+  public void testDecreaseUserMaxRunningAppsOnTheFly() throws Exception {
+    String allocBefore = "<?xml version=\"1.0\"?>"+
+        "<allocations>"+
+        "<queue name=\"root\">"+
+        "<queue name=\"queue1\">"+
+        "<maxRunningApps>10</maxRunningApps>"+
+        "</queue>"+
+        "</queue>"+
+        "<user name=\"user1\">"+
+        "<maxRunningApps>3</maxRunningApps>"+
+        "</user>"+
+        "</allocations>";
+
+    String allocAfter = "<?xml version=\"1.0\"?>"+
+        "<allocations>"+
+        "<queue name=\"root\">"+
+        "<queue name=\"queue1\">"+
+        "<maxRunningApps>10</maxRunningApps>"+
+        "</queue>"+
+        "</queue>"+
+        "<user name=\"user1\">"+
+        "<maxRunningApps>1</maxRunningApps>"+
+        "</user>"+
+        "</allocations>";
+
+    testDecreaseQueueSettingOnTheFlyInternal(allocBefore, allocAfter);
+  }
+
+  private void testDecreaseQueueSettingOnTheFlyInternal(String allocBefore,
+      String allocAfter) throws Exception {
+    // Set max running apps
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println(allocBefore);
+    out.close();
+
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    // Add a node
+    RMNode node1 =
+        MockNodes
+            .newNodeInfo(1, Resources.createResource(8192, 8), 1, "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+
+    // Request for app 1
+    ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1",
+        "user1", 1);
+
+    scheduler.update();
+    NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
+    scheduler.handle(updateEvent);
+
+    // App 1 should be running
+    assertEquals(1, scheduler.getSchedulerApp(attId1).getLiveContainers().size());
+
+    ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1",
+        "user1", 1);
+
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    ApplicationAttemptId attId3 = createSchedulingRequest(1024, "queue1",
+        "user1", 1);
+
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    ApplicationAttemptId attId4 = createSchedulingRequest(1024, "queue1",
+        "user1", 1);
+
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    // App 2 should be running
+    assertEquals(1, scheduler.getSchedulerApp(attId2).getLiveContainers().size());
+    // App 3 should be running
+    assertEquals(1, scheduler.getSchedulerApp(attId3).getLiveContainers().size());
+    // App 4 should not be running
+    assertEquals(0, scheduler.getSchedulerApp(attId4).getLiveContainers().size());
+
+    out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println(allocAfter);
+    out.close();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    // App 2 should still be running
+    assertEquals(1, scheduler.getSchedulerApp(attId2).getLiveContainers().size());
+
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    // App 3 should still be running
+    assertEquals(1, scheduler.getSchedulerApp(attId3).getLiveContainers().size());
+
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    // App 4 should not be running
+    assertEquals(0, scheduler.getSchedulerApp(attId4).getLiveContainers().size());
+
+    // Now remove app 1
+    AppAttemptRemovedSchedulerEvent appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent(
+        attId1, RMAppAttemptState.FINISHED, false);
+
+    scheduler.handle(appRemovedEvent1);
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    // App 4 should not be running
+    assertEquals(0, scheduler.getSchedulerApp(attId4).getLiveContainers().size());
+
+    // Now remove app 2
+    appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent(
+        attId2, RMAppAttemptState.FINISHED, false);
+
+    scheduler.handle(appRemovedEvent1);
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    // App 4 should not be running
+    assertEquals(0, scheduler.getSchedulerApp(attId4).getLiveContainers().size());
+
+    // Now remove app 3
+    appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent(
+        attId3, RMAppAttemptState.FINISHED, false);
+
+    scheduler.handle(appRemovedEvent1);
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    // App 4 should be running now
+    assertEquals(1, scheduler.getSchedulerApp(attId4).getLiveContainers().size());
+  }
+
   @Test (timeout = 5000)
   public void testReservationWhileMultiplePriorities() throws IOException {
     scheduler.init(conf);