|
@@ -2288,7 +2288,315 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
|
|
// Request should be fulfilled
|
|
|
assertEquals(2, scheduler.getSchedulerApp(attId1).getLiveContainers().size());
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ @Test (timeout = 5000)
|
|
|
+ public void testIncreaseQueueMaxRunningAppsOnTheFly() throws Exception {
|
|
|
+ String allocBefore = "<?xml version=\"1.0\"?>" +
|
|
|
+ "<allocations>" +
|
|
|
+ "<queue name=\"root\">" +
|
|
|
+ "<queue name=\"queue1\">" +
|
|
|
+ "<maxRunningApps>1</maxRunningApps>" +
|
|
|
+ "</queue>" +
|
|
|
+ "</queue>" +
|
|
|
+ "</allocations>";
|
|
|
+
|
|
|
+ String allocAfter = "<?xml version=\"1.0\"?>" +
|
|
|
+ "<allocations>" +
|
|
|
+ "<queue name=\"root\">" +
|
|
|
+ "<queue name=\"queue1\">" +
|
|
|
+ "<maxRunningApps>3</maxRunningApps>" +
|
|
|
+ "</queue>" +
|
|
|
+ "</queue>" +
|
|
|
+ "</allocations>";
|
|
|
+
|
|
|
+ testIncreaseQueueSettingOnTheFlyInternal(allocBefore, allocAfter);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test (timeout = 5000)
|
|
|
+ public void testIncreaseUserMaxRunningAppsOnTheFly() throws Exception {
|
|
|
+ String allocBefore = "<?xml version=\"1.0\"?>"+
|
|
|
+ "<allocations>"+
|
|
|
+ "<queue name=\"root\">"+
|
|
|
+ "<queue name=\"queue1\">"+
|
|
|
+ "<maxRunningApps>10</maxRunningApps>"+
|
|
|
+ "</queue>"+
|
|
|
+ "</queue>"+
|
|
|
+ "<user name=\"user1\">"+
|
|
|
+ "<maxRunningApps>1</maxRunningApps>"+
|
|
|
+ "</user>"+
|
|
|
+ "</allocations>";
|
|
|
+
|
|
|
+ String allocAfter = "<?xml version=\"1.0\"?>"+
|
|
|
+ "<allocations>"+
|
|
|
+ "<queue name=\"root\">"+
|
|
|
+ "<queue name=\"queue1\">"+
|
|
|
+ "<maxRunningApps>10</maxRunningApps>"+
|
|
|
+ "</queue>"+
|
|
|
+ "</queue>"+
|
|
|
+ "<user name=\"user1\">"+
|
|
|
+ "<maxRunningApps>3</maxRunningApps>"+
|
|
|
+ "</user>"+
|
|
|
+ "</allocations>";
|
|
|
+
|
|
|
+ testIncreaseQueueSettingOnTheFlyInternal(allocBefore, allocAfter);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void testIncreaseQueueSettingOnTheFlyInternal(String allocBefore,
|
|
|
+ String allocAfter) throws Exception {
|
|
|
+ // Set max running apps
|
|
|
+ conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
|
|
+
|
|
|
+ PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
|
|
+ out.println(allocBefore);
|
|
|
+ out.close();
|
|
|
+
|
|
|
+ scheduler.init(conf);
|
|
|
+ scheduler.start();
|
|
|
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
+
|
|
|
+ // Add a node
|
|
|
+ RMNode node1 =
|
|
|
+ MockNodes
|
|
|
+ .newNodeInfo(1, Resources.createResource(8192, 8), 1, "127.0.0.1");
|
|
|
+ NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
|
|
+ scheduler.handle(nodeEvent1);
|
|
|
+
|
|
|
+ // Request for app 1
|
|
|
+ ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1",
|
|
|
+ "user1", 1);
|
|
|
+
|
|
|
+ scheduler.update();
|
|
|
+ NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
|
|
|
+ scheduler.handle(updateEvent);
|
|
|
+
|
|
|
+ // App 1 should be running
|
|
|
+ assertEquals(1, scheduler.getSchedulerApp(attId1).getLiveContainers().size());
|
|
|
+
|
|
|
+ ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1",
|
|
|
+ "user1", 1);
|
|
|
+
|
|
|
+ scheduler.update();
|
|
|
+ scheduler.handle(updateEvent);
|
|
|
+
|
|
|
+ ApplicationAttemptId attId3 = createSchedulingRequest(1024, "queue1",
|
|
|
+ "user1", 1);
|
|
|
+
|
|
|
+ scheduler.update();
|
|
|
+ scheduler.handle(updateEvent);
|
|
|
+
|
|
|
+ ApplicationAttemptId attId4 = createSchedulingRequest(1024, "queue1",
|
|
|
+ "user1", 1);
|
|
|
+
|
|
|
+ scheduler.update();
|
|
|
+ scheduler.handle(updateEvent);
|
|
|
+
|
|
|
+ // App 2 should not be running
|
|
|
+ assertEquals(0, scheduler.getSchedulerApp(attId2).getLiveContainers().size());
|
|
|
+ // App 3 should not be running
|
|
|
+ assertEquals(0, scheduler.getSchedulerApp(attId3).getLiveContainers().size());
|
|
|
+ // App 4 should not be running
|
|
|
+ assertEquals(0, scheduler.getSchedulerApp(attId4).getLiveContainers().size());
|
|
|
+
|
|
|
+ out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
|
|
+ out.println(allocAfter);
|
|
|
+ out.close();
|
|
|
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
+
|
|
|
+ scheduler.update();
|
|
|
+ scheduler.handle(updateEvent);
|
|
|
+
|
|
|
+ // App 2 should be running
|
|
|
+ assertEquals(1, scheduler.getSchedulerApp(attId2).getLiveContainers().size());
|
|
|
+
|
|
|
+ scheduler.update();
|
|
|
+ scheduler.handle(updateEvent);
|
|
|
+
|
|
|
+ // App 3 should be running
|
|
|
+ assertEquals(1, scheduler.getSchedulerApp(attId3).getLiveContainers().size());
|
|
|
+
|
|
|
+ scheduler.update();
|
|
|
+ scheduler.handle(updateEvent);
|
|
|
+
|
|
|
+ // App 4 should not be running
|
|
|
+ assertEquals(0, scheduler.getSchedulerApp(attId4).getLiveContainers().size());
|
|
|
+
|
|
|
+ // Now remove app 1
|
|
|
+ AppAttemptRemovedSchedulerEvent appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent(
|
|
|
+ attId1, RMAppAttemptState.FINISHED, false);
|
|
|
+
|
|
|
+ scheduler.handle(appRemovedEvent1);
|
|
|
+ scheduler.update();
|
|
|
+ scheduler.handle(updateEvent);
|
|
|
+
|
|
|
+ // App 4 should be running
|
|
|
+ assertEquals(1, scheduler.getSchedulerApp(attId4).getLiveContainers().size());
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test (timeout = 5000)
|
|
|
+ public void testDecreaseQueueMaxRunningAppsOnTheFly() throws Exception {
|
|
|
+ String allocBefore = "<?xml version=\"1.0\"?>" +
|
|
|
+ "<allocations>" +
|
|
|
+ "<queue name=\"root\">" +
|
|
|
+ "<queue name=\"queue1\">" +
|
|
|
+ "<maxRunningApps>3</maxRunningApps>" +
|
|
|
+ "</queue>" +
|
|
|
+ "</queue>" +
|
|
|
+ "</allocations>";
|
|
|
+
|
|
|
+ String allocAfter = "<?xml version=\"1.0\"?>" +
|
|
|
+ "<allocations>" +
|
|
|
+ "<queue name=\"root\">" +
|
|
|
+ "<queue name=\"queue1\">" +
|
|
|
+ "<maxRunningApps>1</maxRunningApps>" +
|
|
|
+ "</queue>" +
|
|
|
+ "</queue>" +
|
|
|
+ "</allocations>";
|
|
|
+
|
|
|
+ testDecreaseQueueSettingOnTheFlyInternal(allocBefore, allocAfter);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test (timeout = 5000)
|
|
|
+ public void testDecreaseUserMaxRunningAppsOnTheFly() throws Exception {
|
|
|
+ String allocBefore = "<?xml version=\"1.0\"?>"+
|
|
|
+ "<allocations>"+
|
|
|
+ "<queue name=\"root\">"+
|
|
|
+ "<queue name=\"queue1\">"+
|
|
|
+ "<maxRunningApps>10</maxRunningApps>"+
|
|
|
+ "</queue>"+
|
|
|
+ "</queue>"+
|
|
|
+ "<user name=\"user1\">"+
|
|
|
+ "<maxRunningApps>3</maxRunningApps>"+
|
|
|
+ "</user>"+
|
|
|
+ "</allocations>";
|
|
|
+
|
|
|
+ String allocAfter = "<?xml version=\"1.0\"?>"+
|
|
|
+ "<allocations>"+
|
|
|
+ "<queue name=\"root\">"+
|
|
|
+ "<queue name=\"queue1\">"+
|
|
|
+ "<maxRunningApps>10</maxRunningApps>"+
|
|
|
+ "</queue>"+
|
|
|
+ "</queue>"+
|
|
|
+ "<user name=\"user1\">"+
|
|
|
+ "<maxRunningApps>1</maxRunningApps>"+
|
|
|
+ "</user>"+
|
|
|
+ "</allocations>";
|
|
|
+
|
|
|
+ testDecreaseQueueSettingOnTheFlyInternal(allocBefore, allocAfter);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void testDecreaseQueueSettingOnTheFlyInternal(String allocBefore,
|
|
|
+ String allocAfter) throws Exception {
|
|
|
+ // Set max running apps
|
|
|
+ conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
|
|
+
|
|
|
+ PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
|
|
+ out.println(allocBefore);
|
|
|
+ out.close();
|
|
|
+
|
|
|
+ scheduler.init(conf);
|
|
|
+ scheduler.start();
|
|
|
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
+
|
|
|
+ // Add a node
|
|
|
+ RMNode node1 =
|
|
|
+ MockNodes
|
|
|
+ .newNodeInfo(1, Resources.createResource(8192, 8), 1, "127.0.0.1");
|
|
|
+ NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
|
|
+ scheduler.handle(nodeEvent1);
|
|
|
+
|
|
|
+ // Request for app 1
|
|
|
+ ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1",
|
|
|
+ "user1", 1);
|
|
|
+
|
|
|
+ scheduler.update();
|
|
|
+ NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
|
|
|
+ scheduler.handle(updateEvent);
|
|
|
+
|
|
|
+ // App 1 should be running
|
|
|
+ assertEquals(1, scheduler.getSchedulerApp(attId1).getLiveContainers().size());
|
|
|
+
|
|
|
+ ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1",
|
|
|
+ "user1", 1);
|
|
|
+
|
|
|
+ scheduler.update();
|
|
|
+ scheduler.handle(updateEvent);
|
|
|
+
|
|
|
+ ApplicationAttemptId attId3 = createSchedulingRequest(1024, "queue1",
|
|
|
+ "user1", 1);
|
|
|
+
|
|
|
+ scheduler.update();
|
|
|
+ scheduler.handle(updateEvent);
|
|
|
+
|
|
|
+ ApplicationAttemptId attId4 = createSchedulingRequest(1024, "queue1",
|
|
|
+ "user1", 1);
|
|
|
+
|
|
|
+ scheduler.update();
|
|
|
+ scheduler.handle(updateEvent);
|
|
|
+
|
|
|
+ // App 2 should be running
|
|
|
+ assertEquals(1, scheduler.getSchedulerApp(attId2).getLiveContainers().size());
|
|
|
+ // App 3 should be running
|
|
|
+ assertEquals(1, scheduler.getSchedulerApp(attId3).getLiveContainers().size());
|
|
|
+ // App 4 should not be running
|
|
|
+ assertEquals(0, scheduler.getSchedulerApp(attId4).getLiveContainers().size());
|
|
|
+
|
|
|
+ out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
|
|
+ out.println(allocAfter);
|
|
|
+ out.close();
|
|
|
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
+
|
|
|
+ scheduler.update();
|
|
|
+ scheduler.handle(updateEvent);
|
|
|
+
|
|
|
+ // App 2 should still be running
|
|
|
+ assertEquals(1, scheduler.getSchedulerApp(attId2).getLiveContainers().size());
|
|
|
+
|
|
|
+ scheduler.update();
|
|
|
+ scheduler.handle(updateEvent);
|
|
|
+
|
|
|
+ // App 3 should still be running
|
|
|
+ assertEquals(1, scheduler.getSchedulerApp(attId3).getLiveContainers().size());
|
|
|
+
|
|
|
+ scheduler.update();
|
|
|
+ scheduler.handle(updateEvent);
|
|
|
+
|
|
|
+ // App 4 should not be running
|
|
|
+ assertEquals(0, scheduler.getSchedulerApp(attId4).getLiveContainers().size());
|
|
|
+
|
|
|
+ // Now remove app 1
|
|
|
+ AppAttemptRemovedSchedulerEvent appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent(
|
|
|
+ attId1, RMAppAttemptState.FINISHED, false);
|
|
|
+
|
|
|
+ scheduler.handle(appRemovedEvent1);
|
|
|
+ scheduler.update();
|
|
|
+ scheduler.handle(updateEvent);
|
|
|
+
|
|
|
+ // App 4 should not be running
|
|
|
+ assertEquals(0, scheduler.getSchedulerApp(attId4).getLiveContainers().size());
|
|
|
+
|
|
|
+ // Now remove app 2
|
|
|
+ appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent(
|
|
|
+ attId2, RMAppAttemptState.FINISHED, false);
|
|
|
+
|
|
|
+ scheduler.handle(appRemovedEvent1);
|
|
|
+ scheduler.update();
|
|
|
+ scheduler.handle(updateEvent);
|
|
|
+
|
|
|
+ // App 4 should not be running
|
|
|
+ assertEquals(0, scheduler.getSchedulerApp(attId4).getLiveContainers().size());
|
|
|
+
|
|
|
+ // Now remove app 3
|
|
|
+ appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent(
|
|
|
+ attId3, RMAppAttemptState.FINISHED, false);
|
|
|
+
|
|
|
+ scheduler.handle(appRemovedEvent1);
|
|
|
+ scheduler.update();
|
|
|
+ scheduler.handle(updateEvent);
|
|
|
+
|
|
|
+ // App 4 should be running now
|
|
|
+ assertEquals(1, scheduler.getSchedulerApp(attId4).getLiveContainers().size());
|
|
|
+ }
|
|
|
+
|
|
|
@Test (timeout = 5000)
|
|
|
public void testReservationWhileMultiplePriorities() throws IOException {
|
|
|
scheduler.init(conf);
|