|
@@ -2492,6 +2492,333 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
|
|
1536, scheduler.resourceDeficit(queueC, clock.getTime()).getMemory());
|
|
|
}
|
|
|
|
|
|
+ @Test
|
|
|
+ /**
|
|
|
+ * Tests the decision to preempt tasks respect to non-preemptable queues
|
|
|
+ * 1, Queues as follow:
|
|
|
+ * queueA(non-preemptable)
|
|
|
+ * queueB(preemptable)
|
|
|
+ * parentQueue(non-preemptable)
|
|
|
+ * --queueC(preemptable)
|
|
|
+ * queueD(preemptable)
|
|
|
+ * 2, Submit request to queueA, queueB, queueC, and all of them are over MinShare
|
|
|
+ * 3, Now all resource are occupied
|
|
|
+ * 4, Submit request to queueD, and need to preempt resource from other queues
|
|
|
+ * 5, Only preemptable queue(queueB) would be preempted.
|
|
|
+ */
|
|
|
+ public void testPreemptionDecisionWithNonPreemptableQueue() throws Exception {
|
|
|
+ conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
|
|
+ ControlledClock clock = new ControlledClock();
|
|
|
+ scheduler.setClock(clock);
|
|
|
+
|
|
|
+ PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
|
|
+ out.println("<?xml version=\"1.0\"?>");
|
|
|
+ out.println("<allocations>");
|
|
|
+ out.println("<queue name=\"default\">");
|
|
|
+ out.println("<maxResources>0mb,0vcores</maxResources>");
|
|
|
+ out.println("</queue>");
|
|
|
+ out.println("<queue name=\"queueA\">");
|
|
|
+ out.println("<weight>.25</weight>");
|
|
|
+ out.println("<minResources>1024mb,0vcores</minResources>");
|
|
|
+ out.println("<allowPreemptionFrom>false</allowPreemptionFrom>");
|
|
|
+ out.println("</queue>");
|
|
|
+ out.println("<queue name=\"queueB\">");
|
|
|
+ out.println("<weight>.25</weight>");
|
|
|
+ out.println("<minResources>1024mb,0vcores</minResources>");
|
|
|
+ out.println("</queue>");
|
|
|
+ out.println("<queue name=\"parentQueue\">");
|
|
|
+ out.println("<allowPreemptionFrom>false</allowPreemptionFrom>");
|
|
|
+ out.println("<queue name=\"queueC\">");
|
|
|
+ out.println("<weight>.25</weight>");
|
|
|
+ out.println("<minResources>1024mb,0vcores</minResources>");
|
|
|
+ out.println("</queue>");
|
|
|
+ out.println("</queue>");
|
|
|
+ out.println("<queue name=\"queueD\">");
|
|
|
+ out.println("<weight>.25</weight>");
|
|
|
+ out.println("<minResources>2048mb,0vcores</minResources>");
|
|
|
+ out.println("</queue>");
|
|
|
+ out.println("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
|
|
|
+ out.println("<defaultFairSharePreemptionTimeout>10</defaultFairSharePreemptionTimeout>");
|
|
|
+ out.println("<defaultFairSharePreemptionThreshold>.5</defaultFairSharePreemptionThreshold>");
|
|
|
+ out.println("</allocations>");
|
|
|
+ out.close();
|
|
|
+
|
|
|
+ scheduler.init(conf);
|
|
|
+ scheduler.start();
|
|
|
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
+
|
|
|
+ // Create four nodes(3G each)
|
|
|
+ RMNode node1 =
|
|
|
+ MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 1,
|
|
|
+ "127.0.0.1");
|
|
|
+ NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
|
|
+ scheduler.handle(nodeEvent1);
|
|
|
+
|
|
|
+ RMNode node2 =
|
|
|
+ MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 2,
|
|
|
+ "127.0.0.2");
|
|
|
+ NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
|
|
|
+ scheduler.handle(nodeEvent2);
|
|
|
+
|
|
|
+ RMNode node3 =
|
|
|
+ MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 3,
|
|
|
+ "127.0.0.3");
|
|
|
+ NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
|
|
|
+ scheduler.handle(nodeEvent3);
|
|
|
+
|
|
|
+ RMNode node4 =
|
|
|
+ MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 4,
|
|
|
+ "127.0.0.4");
|
|
|
+ NodeAddedSchedulerEvent nodeEvent4 = new NodeAddedSchedulerEvent(node4);
|
|
|
+ scheduler.handle(nodeEvent4);
|
|
|
+
|
|
|
+ // Submit apps to queueA, queueB, queueC,
|
|
|
+ // now all resource of the cluster is occupied
|
|
|
+ ApplicationAttemptId app1 =
|
|
|
+ createSchedulingRequest(1 * 1024, "queueA", "user1", 4, 1);
|
|
|
+ ApplicationAttemptId app2 =
|
|
|
+ createSchedulingRequest(1 * 1024, "queueB", "user1", 4, 2);
|
|
|
+ ApplicationAttemptId app3 =
|
|
|
+ createSchedulingRequest(1 * 1024, "parentQueue.queueC", "user1", 4, 3);
|
|
|
+
|
|
|
+ scheduler.update();
|
|
|
+
|
|
|
+ // Sufficient node check-ins to fully schedule containers
|
|
|
+ for (int i = 0; i < 3; i++) {
|
|
|
+ NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
|
|
|
+ scheduler.handle(nodeUpdate1);
|
|
|
+
|
|
|
+ NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
|
|
|
+ scheduler.handle(nodeUpdate2);
|
|
|
+
|
|
|
+ NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
|
|
|
+ scheduler.handle(nodeUpdate3);
|
|
|
+
|
|
|
+ NodeUpdateSchedulerEvent nodeUpdate4 = new NodeUpdateSchedulerEvent(node4);
|
|
|
+ scheduler.handle(nodeUpdate4);
|
|
|
+ }
|
|
|
+
|
|
|
+ assertEquals(4, scheduler.getSchedulerApp(app1).getLiveContainers().size());
|
|
|
+ assertEquals(4, scheduler.getSchedulerApp(app2).getLiveContainers().size());
|
|
|
+ assertEquals(4, scheduler.getSchedulerApp(app3).getLiveContainers().size());
|
|
|
+
|
|
|
+ // Now new requests arrive from queues D
|
|
|
+ ApplicationAttemptId app4 =
|
|
|
+ createSchedulingRequest(1 * 1024, "queueD", "user1", 4, 1);
|
|
|
+ scheduler.update();
|
|
|
+ FSLeafQueue schedD =
|
|
|
+ scheduler.getQueueManager().getLeafQueue("queueD", true);
|
|
|
+
|
|
|
+ // After minSharePreemptionTime has passed, 2G resource should preempted from
|
|
|
+ // queueB to queueD
|
|
|
+ clock.tickSec(6);
|
|
|
+ assertEquals(2048,
|
|
|
+ scheduler.resourceDeficit(schedD, clock.getTime()).getMemory());
|
|
|
+
|
|
|
+ scheduler.preemptResources(Resources.createResource(2 * 1024));
|
|
|
+ // now only app2 is selected to be preempted
|
|
|
+ assertTrue("App2 should have container to be preempted",
|
|
|
+ !Collections.disjoint(
|
|
|
+ scheduler.getSchedulerApp(app2).getLiveContainers(),
|
|
|
+ scheduler.getSchedulerApp(app2).getPreemptionContainers()));
|
|
|
+ assertTrue("App1 should not have container to be preempted",
|
|
|
+ Collections.disjoint(
|
|
|
+ scheduler.getSchedulerApp(app1).getLiveContainers(),
|
|
|
+ scheduler.getSchedulerApp(app1).getPreemptionContainers()));
|
|
|
+ assertTrue("App3 should not have container to be preempted",
|
|
|
+ Collections.disjoint(
|
|
|
+ scheduler.getSchedulerApp(app3).getLiveContainers(),
|
|
|
+ scheduler.getSchedulerApp(app3).getPreemptionContainers()));
|
|
|
+ // Pretend 20 seconds have passed
|
|
|
+ clock.tickSec(20);
|
|
|
+ scheduler.preemptResources(Resources.createResource(2 * 1024));
|
|
|
+ for (int i = 0; i < 3; i++) {
|
|
|
+ NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
|
|
|
+ scheduler.handle(nodeUpdate1);
|
|
|
+
|
|
|
+ NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
|
|
|
+ scheduler.handle(nodeUpdate2);
|
|
|
+
|
|
|
+ NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
|
|
|
+ scheduler.handle(nodeUpdate3);
|
|
|
+
|
|
|
+ NodeUpdateSchedulerEvent nodeUpdate4 = new NodeUpdateSchedulerEvent(node4);
|
|
|
+ scheduler.handle(nodeUpdate4);
|
|
|
+ }
|
|
|
+ // after preemption
|
|
|
+ assertEquals(4, scheduler.getSchedulerApp(app1).getLiveContainers().size());
|
|
|
+ assertEquals(2, scheduler.getSchedulerApp(app2).getLiveContainers().size());
|
|
|
+ assertEquals(4, scheduler.getSchedulerApp(app3).getLiveContainers().size());
|
|
|
+ assertEquals(2, scheduler.getSchedulerApp(app4).getLiveContainers().size());
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ /**
|
|
|
+ * Tests the decision to preempt tasks when allowPreemptionFrom is set false on
|
|
|
+ * all queues.
|
|
|
+ * Then none of them would be preempted actually.
|
|
|
+ * 1, Queues as follow:
|
|
|
+ * queueA(non-preemptable)
|
|
|
+ * queueB(non-preemptable)
|
|
|
+ * parentQueue(non-preemptable)
|
|
|
+ * --queueC(preemptable)
|
|
|
+ * parentQueue(preemptable)
|
|
|
+ * --queueD(non-preemptable)
|
|
|
+ * 2, Submit request to queueB, queueC, queueD, and all of them are over MinShare
|
|
|
+ * 3, Now all resource are occupied
|
|
|
+ * 4, Submit request to queueA, and need to preempt resource from other queues
|
|
|
+ * 5, None of queues would be preempted.
|
|
|
+ */
|
|
|
+ public void testPreemptionDecisionWhenPreemptionDisabledOnAllQueues()
|
|
|
+ throws Exception {
|
|
|
+ conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
|
|
+ ControlledClock clock = new ControlledClock();
|
|
|
+ scheduler.setClock(clock);
|
|
|
+
|
|
|
+ PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
|
|
+ out.println("<?xml version=\"1.0\"?>");
|
|
|
+ out.println("<allocations>");
|
|
|
+ out.println("<queue name=\"default\">");
|
|
|
+ out.println("<maxResources>0mb,0vcores</maxResources>");
|
|
|
+ out.println("</queue>");
|
|
|
+ out.println("<queue name=\"queueA\">");
|
|
|
+ out.println("<weight>.25</weight>");
|
|
|
+ out.println("<minResources>2048mb,0vcores</minResources>");
|
|
|
+ out.println("<allowPreemptionFrom>false</allowPreemptionFrom>");
|
|
|
+ out.println("</queue>");
|
|
|
+ out.println("<queue name=\"queueB\">");
|
|
|
+ out.println("<weight>.25</weight>");
|
|
|
+ out.println("<minResources>1024mb,0vcores</minResources>");
|
|
|
+ out.println("<allowPreemptionFrom>false</allowPreemptionFrom>");
|
|
|
+ out.println("</queue>");
|
|
|
+ out.println("<queue name=\"parentQueue1\">");
|
|
|
+ out.println("<allowPreemptionFrom>false</allowPreemptionFrom>");
|
|
|
+ out.println("<queue name=\"queueC\">");
|
|
|
+ out.println("<weight>.25</weight>");
|
|
|
+ out.println("<minResources>1024mb,0vcores</minResources>");
|
|
|
+ out.println("</queue>");
|
|
|
+ out.println("</queue>");
|
|
|
+ out.println("<queue name=\"parentQueue2\">");
|
|
|
+ out.println("<queue name=\"queueD\">");
|
|
|
+ out.println("<weight>.25</weight>");
|
|
|
+ out.println("<minResources>1024mb,0vcores</minResources>");
|
|
|
+ out.println("<allowPreemptionFrom>false</allowPreemptionFrom>");
|
|
|
+ out.println("</queue>");
|
|
|
+ out.println("</queue>");
|
|
|
+ out.println("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
|
|
|
+ out.println("<defaultFairSharePreemptionTimeout>10</defaultFairSharePreemptionTimeout>");
|
|
|
+ out.println("<defaultFairSharePreemptionThreshold>.5</defaultFairSharePreemptionThreshold>");
|
|
|
+ out.println("</allocations>");
|
|
|
+ out.close();
|
|
|
+
|
|
|
+ scheduler.init(conf);
|
|
|
+ scheduler.start();
|
|
|
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
+
|
|
|
+ // Create four nodes(3G each)
|
|
|
+ RMNode node1 =
|
|
|
+ MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 1,
|
|
|
+ "127.0.0.1");
|
|
|
+ NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
|
|
+ scheduler.handle(nodeEvent1);
|
|
|
+
|
|
|
+ RMNode node2 =
|
|
|
+ MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 2,
|
|
|
+ "127.0.0.2");
|
|
|
+ NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
|
|
|
+ scheduler.handle(nodeEvent2);
|
|
|
+
|
|
|
+ RMNode node3 =
|
|
|
+ MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 3,
|
|
|
+ "127.0.0.3");
|
|
|
+ NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
|
|
|
+ scheduler.handle(nodeEvent3);
|
|
|
+
|
|
|
+ RMNode node4 =
|
|
|
+ MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 4,
|
|
|
+ "127.0.0.4");
|
|
|
+ NodeAddedSchedulerEvent nodeEvent4 = new NodeAddedSchedulerEvent(node4);
|
|
|
+ scheduler.handle(nodeEvent4);
|
|
|
+
|
|
|
+ // Submit apps to queueB, queueC, queueD
|
|
|
+ // now all resource of the cluster is occupied
|
|
|
+
|
|
|
+ ApplicationAttemptId app1 =
|
|
|
+ createSchedulingRequest(1 * 1024, "queueB", "user1", 4, 1);
|
|
|
+ ApplicationAttemptId app2 =
|
|
|
+ createSchedulingRequest(1 * 1024, "parentQueue1.queueC", "user1", 4, 2);
|
|
|
+ ApplicationAttemptId app3 =
|
|
|
+ createSchedulingRequest(1 * 1024, "parentQueue2.queueD", "user1", 4, 3);
|
|
|
+ scheduler.update();
|
|
|
+
|
|
|
+ // Sufficient node check-ins to fully schedule containers
|
|
|
+ for (int i = 0; i < 3; i++) {
|
|
|
+ NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
|
|
|
+ scheduler.handle(nodeUpdate1);
|
|
|
+
|
|
|
+ NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
|
|
|
+ scheduler.handle(nodeUpdate2);
|
|
|
+
|
|
|
+ NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
|
|
|
+ scheduler.handle(nodeUpdate3);
|
|
|
+
|
|
|
+ NodeUpdateSchedulerEvent nodeUpdate4 = new NodeUpdateSchedulerEvent(node4);
|
|
|
+ scheduler.handle(nodeUpdate4);
|
|
|
+ }
|
|
|
+
|
|
|
+ assertEquals(4, scheduler.getSchedulerApp(app1).getLiveContainers().size());
|
|
|
+ assertEquals(4, scheduler.getSchedulerApp(app2).getLiveContainers().size());
|
|
|
+ assertEquals(4, scheduler.getSchedulerApp(app3).getLiveContainers().size());
|
|
|
+
|
|
|
+ // Now new requests arrive from queues A
|
|
|
+ ApplicationAttemptId app4 =
|
|
|
+ createSchedulingRequest(1 * 1024, "queueA", "user1", 4, 1);
|
|
|
+ scheduler.update();
|
|
|
+ FSLeafQueue schedA =
|
|
|
+ scheduler.getQueueManager().getLeafQueue("queueA", true);
|
|
|
+
|
|
|
+ // After minSharePreemptionTime has passed, resource deficit is 2G
|
|
|
+ clock.tickSec(6);
|
|
|
+ assertEquals(2048,
|
|
|
+ scheduler.resourceDeficit(schedA, clock.getTime()).getMemory());
|
|
|
+
|
|
|
+ scheduler.preemptResources(Resources.createResource(2 * 1024));
|
|
|
+ // now none app is selected to be preempted
|
|
|
+ assertTrue("App1 should have container to be preempted",
|
|
|
+ Collections.disjoint(
|
|
|
+ scheduler.getSchedulerApp(app1).getLiveContainers(),
|
|
|
+ scheduler.getSchedulerApp(app1).getPreemptionContainers()));
|
|
|
+ assertTrue("App2 should not have container to be preempted",
|
|
|
+ Collections.disjoint(
|
|
|
+ scheduler.getSchedulerApp(app2).getLiveContainers(),
|
|
|
+ scheduler.getSchedulerApp(app2).getPreemptionContainers()));
|
|
|
+ assertTrue("App3 should not have container to be preempted",
|
|
|
+ Collections.disjoint(
|
|
|
+ scheduler.getSchedulerApp(app3).getLiveContainers(),
|
|
|
+ scheduler.getSchedulerApp(app3).getPreemptionContainers()));
|
|
|
+ // Pretend 20 seconds have passed
|
|
|
+ clock.tickSec(20);
|
|
|
+ scheduler.preemptResources(Resources.createResource(2 * 1024));
|
|
|
+ for (int i = 0; i < 3; i++) {
|
|
|
+ NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
|
|
|
+ scheduler.handle(nodeUpdate1);
|
|
|
+
|
|
|
+ NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
|
|
|
+ scheduler.handle(nodeUpdate2);
|
|
|
+
|
|
|
+ NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
|
|
|
+ scheduler.handle(nodeUpdate3);
|
|
|
+
|
|
|
+ NodeUpdateSchedulerEvent nodeUpdate4 = new NodeUpdateSchedulerEvent(node4);
|
|
|
+ scheduler.handle(nodeUpdate4);
|
|
|
+ }
|
|
|
+ // after preemption
|
|
|
+ assertEquals(4, scheduler.getSchedulerApp(app1).getLiveContainers().size());
|
|
|
+ assertEquals(4, scheduler.getSchedulerApp(app2).getLiveContainers().size());
|
|
|
+ assertEquals(4, scheduler.getSchedulerApp(app3).getLiveContainers().size());
|
|
|
+ assertEquals(0, scheduler.getSchedulerApp(app4).getLiveContainers().size());
|
|
|
+ }
|
|
|
+
|
|
|
@Test
|
|
|
public void testBackwardsCompatiblePreemptionConfiguration() throws Exception {
|
|
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|