|
@@ -5029,4 +5029,132 @@ public class TestCapacityScheduler {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ @Test
|
|
|
|
+ public void testMoveAppWithActiveUsersWithOnlyPendingApps() throws Exception {
|
|
|
|
+
|
|
|
|
+ YarnConfiguration conf = new YarnConfiguration();
|
|
|
|
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
|
|
|
+ ResourceScheduler.class);
|
|
|
|
+
|
|
|
|
+ CapacitySchedulerConfiguration newConf =
|
|
|
|
+ new CapacitySchedulerConfiguration(conf);
|
|
|
|
+
|
|
|
|
+ // Define top-level queues
|
|
|
|
+ newConf.setQueues(CapacitySchedulerConfiguration.ROOT,
|
|
|
|
+ new String[] { "a", "b" });
|
|
|
|
+
|
|
|
|
+ newConf.setCapacity(A, 50);
|
|
|
|
+ newConf.setCapacity(B, 50);
|
|
|
|
+
|
|
|
|
+ // Define 2nd-level queues
|
|
|
|
+ newConf.setQueues(A, new String[] { "a1" });
|
|
|
|
+ newConf.setCapacity(A1, 100);
|
|
|
|
+ newConf.setUserLimitFactor(A1, 2.0f);
|
|
|
|
+ newConf.setMaximumAMResourcePercentPerPartition(A1, "", 0.1f);
|
|
|
|
+
|
|
|
|
+ newConf.setQueues(B, new String[] { "b1" });
|
|
|
|
+ newConf.setCapacity(B1, 100);
|
|
|
|
+ newConf.setUserLimitFactor(B1, 2.0f);
|
|
|
|
+
|
|
|
|
+ LOG.info("Setup top-level queues a and b");
|
|
|
|
+
|
|
|
|
+ MockRM rm = new MockRM(newConf);
|
|
|
|
+ rm.start();
|
|
|
|
+
|
|
|
|
+ CapacityScheduler scheduler =
|
|
|
|
+ (CapacityScheduler) rm.getResourceScheduler();
|
|
|
|
+
|
|
|
|
+ MockNM nm1 = rm.registerNode("h1:1234", 16 * GB);
|
|
|
|
+
|
|
|
|
+ // submit an app
|
|
|
|
+ RMApp app = rm.submitApp(GB, "test-move-1", "u1", null, "a1");
|
|
|
|
+ MockAM am1 = MockRM.launchAndRegisterAM(app, rm, nm1);
|
|
|
|
+
|
|
|
|
+ ApplicationAttemptId appAttemptId =
|
|
|
|
+ rm.getApplicationReport(app.getApplicationId())
|
|
|
|
+ .getCurrentApplicationAttemptId();
|
|
|
|
+
|
|
|
|
+ RMApp app2 = rm.submitApp(1 * GB, "app", "u2", null, "a1");
|
|
|
|
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm1);
|
|
|
|
+
|
|
|
|
+ RMApp app3 = rm.submitApp(1 * GB, "app", "u3", null, "a1");
|
|
|
|
+
|
|
|
|
+ RMApp app4 = rm.submitApp(1 * GB, "app", "u4", null, "a1");
|
|
|
|
+
|
|
|
|
+ // Each application asks 50 * 1GB containers
|
|
|
|
+ am1.allocate("*", 1 * GB, 50, null);
|
|
|
|
+ am2.allocate("*", 1 * GB, 50, null);
|
|
|
|
+
|
|
|
|
+ CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
|
|
|
|
+ RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
|
|
|
|
+
|
|
|
|
+ // check preconditions
|
|
|
|
+ List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
|
|
|
|
+ assertEquals(4, appsInA1.size());
|
|
|
|
+ String queue =
|
|
|
|
+ scheduler.getApplicationAttempt(appsInA1.get(0)).getQueue()
|
|
|
|
+ .getQueueName();
|
|
|
|
+ Assert.assertEquals("a1", queue);
|
|
|
|
+
|
|
|
|
+ List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
|
|
|
|
+ assertTrue(appsInA.contains(appAttemptId));
|
|
|
|
+ assertEquals(4, appsInA.size());
|
|
|
|
+
|
|
|
|
+ List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
|
|
|
|
+ assertTrue(appsInRoot.contains(appAttemptId));
|
|
|
|
+ assertEquals(4, appsInRoot.size());
|
|
|
|
+
|
|
|
|
+ List<ApplicationAttemptId> appsInB1 = scheduler.getAppsInQueue("b1");
|
|
|
|
+ assertTrue(appsInB1.isEmpty());
|
|
|
|
+
|
|
|
|
+ List<ApplicationAttemptId> appsInB = scheduler.getAppsInQueue("b");
|
|
|
|
+ assertTrue(appsInB.isEmpty());
|
|
|
|
+
|
|
|
|
+ UsersManager um =
|
|
|
|
+ (UsersManager) scheduler.getQueue("a1").getAbstractUsersManager();
|
|
|
|
+
|
|
|
|
+ assertEquals(4, um.getNumActiveUsers());
|
|
|
|
+ assertEquals(2, um.getNumActiveUsersWithOnlyPendingApps());
|
|
|
|
+
|
|
|
|
+ // now move the app
|
|
|
|
+ scheduler.moveAllApps("a1", "b1");
|
|
|
|
+
|
|
|
|
+ //Triggering this event so that user limit computation can
|
|
|
|
+ //happen again
|
|
|
|
+ for (int i = 0; i < 10; i++) {
|
|
|
|
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
|
|
|
|
+ Thread.sleep(500);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // check postconditions
|
|
|
|
+ appsInB1 = scheduler.getAppsInQueue("b1");
|
|
|
|
+
|
|
|
|
+ assertEquals(4, appsInB1.size());
|
|
|
|
+ queue =
|
|
|
|
+ scheduler.getApplicationAttempt(appsInB1.get(0)).getQueue()
|
|
|
|
+ .getQueueName();
|
|
|
|
+ Assert.assertEquals("b1", queue);
|
|
|
|
+
|
|
|
|
+ appsInB = scheduler.getAppsInQueue("b");
|
|
|
|
+ assertTrue(appsInB.contains(appAttemptId));
|
|
|
|
+ assertEquals(4, appsInB.size());
|
|
|
|
+
|
|
|
|
+ appsInRoot = scheduler.getAppsInQueue("root");
|
|
|
|
+ assertTrue(appsInRoot.contains(appAttemptId));
|
|
|
|
+ assertEquals(4, appsInRoot.size());
|
|
|
|
+
|
|
|
|
+ List<ApplicationAttemptId> oldAppsInA1 = scheduler.getAppsInQueue("a1");
|
|
|
|
+ assertEquals(0, oldAppsInA1.size());
|
|
|
|
+
|
|
|
|
+ UsersManager um_b1 =
|
|
|
|
+ (UsersManager) scheduler.getQueue("b1").getAbstractUsersManager();
|
|
|
|
+
|
|
|
|
+ assertEquals(2, um_b1.getNumActiveUsers());
|
|
|
|
+ assertEquals(2, um_b1.getNumActiveUsersWithOnlyPendingApps());
|
|
|
|
+
|
|
|
|
+ appsInB1 = scheduler.getAppsInQueue("b1");
|
|
|
|
+ assertEquals(4, appsInB1.size());
|
|
|
|
+ rm.close();
|
|
|
|
+ }
|
|
}
|
|
}
|