|
@@ -1221,6 +1221,79 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
|
|
scheduler.getSchedulerApp(app4).getPreemptionContainers().isEmpty());
|
|
|
}
|
|
|
|
|
|
+ @Test
|
|
|
+ public void testPreemptionIsNotDelayedToNextRound() throws Exception {
|
|
|
+ conf.setLong(FairSchedulerConfiguration.PREEMPTION_INTERVAL, 5000);
|
|
|
+ conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000);
|
|
|
+ conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
|
|
+ conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false");
|
|
|
+
|
|
|
+ MockClock clock = new MockClock();
|
|
|
+ scheduler.setClock(clock);
|
|
|
+
|
|
|
+ PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
|
|
+ out.println("<?xml version=\"1.0\"?>");
|
|
|
+ out.println("<allocations>");
|
|
|
+ out.println("<queue name=\"queueA\">");
|
|
|
+ out.println("<weight>8</weight>");
|
|
|
+ out.println("<queue name=\"queueA1\" />");
|
|
|
+ out.println("<queue name=\"queueA2\" />");
|
|
|
+ out.println("</queue>");
|
|
|
+ out.println("<queue name=\"queueB\">");
|
|
|
+ out.println("<weight>2</weight>");
|
|
|
+ out.println("</queue>");
|
|
|
+ out.print("<fairSharePreemptionTimeout>10</fairSharePreemptionTimeout>");
|
|
|
+ out.println("</allocations>");
|
|
|
+ out.close();
|
|
|
+
|
|
|
+ scheduler.init(conf);
|
|
|
+ scheduler.start();
|
|
|
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
+
|
|
|
+ // Add a node of 8G
|
|
|
+ RMNode node1 = MockNodes.newNodeInfo(1,
|
|
|
+ Resources.createResource(8 * 1024, 8), 1, "127.0.0.1");
|
|
|
+ NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
|
|
+ scheduler.handle(nodeEvent1);
|
|
|
+
|
|
|
+ // Run apps in queueA.A1 and queueB
|
|
|
+ ApplicationAttemptId app1 = createSchedulingRequest(1 * 1024, 1,
|
|
|
+ "queueA.queueA1", "user1", 7, 1);
|
|
|
+ // createSchedulingRequestExistingApplication(1 * 1024, 1, 2, app1);
|
|
|
+ ApplicationAttemptId app2 = createSchedulingRequest(1 * 1024, 1, "queueB",
|
|
|
+ "user2", 1, 1);
|
|
|
+
|
|
|
+ scheduler.update();
|
|
|
+
|
|
|
+ NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
|
|
|
+ for (int i = 0; i < 8; i++) {
|
|
|
+ scheduler.handle(nodeUpdate1);
|
|
|
+ }
|
|
|
+
|
|
|
+ // verify if the apps got the containers they requested
|
|
|
+ assertEquals(7, scheduler.getSchedulerApp(app1).getLiveContainers().size());
|
|
|
+ assertEquals(1, scheduler.getSchedulerApp(app2).getLiveContainers().size());
|
|
|
+
|
|
|
+ // Now submit an app in queueA.queueA2
|
|
|
+ ApplicationAttemptId app3 = createSchedulingRequest(1 * 1024, 1,
|
|
|
+ "queueA.queueA2", "user3", 7, 1);
|
|
|
+ scheduler.update();
|
|
|
+
|
|
|
+ // Let 11 sec pass
|
|
|
+ clock.tick(11);
|
|
|
+
|
|
|
+ scheduler.update();
|
|
|
+ Resource toPreempt = scheduler.resToPreempt(scheduler.getQueueManager()
|
|
|
+ .getLeafQueue("queueA.queueA2", false), clock.getTime());
|
|
|
+ assertEquals(2980, toPreempt.getMemory());
|
|
|
+
|
|
|
+ // verify if the 3 containers required by queueA2 are preempted in the same
|
|
|
+ // round
|
|
|
+ scheduler.preemptResources(toPreempt);
|
|
|
+ assertEquals(3, scheduler.getSchedulerApp(app1).getPreemptionContainers()
|
|
|
+ .size());
|
|
|
+ }
|
|
|
+
|
|
|
@Test (timeout = 5000)
|
|
|
/**
|
|
|
* Tests the timing of decision to preempt tasks.
|