@@ -22,22 +22,26 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Set;
 
 public class TestCapacitySchedulerSurgicalPreemption
     extends CapacitySchedulerPreemptionTestBase {
@@ -167,8 +171,7 @@ public class TestCapacitySchedulerSurgicalPreemption
      *
      * 1) Two nodes (n1/n2) in the cluster, each of them has 20G.
      *
-     * 2) app1 submit to queue-a first, it asked 38 * 1G containers
-     * We will allocate 20 on n1 and 19 on n2.
+     * 2) app1 submit to queue-b, asks for 1G * 5
      *
      * 3) app2 submit to queue-c, ask for one 4G container (for AM)
     *
@@ -243,4 +246,569 @@ public class TestCapacitySchedulerSurgicalPreemption
 
     rm1.close();
   }
+
+  @Test(timeout = 60000)
+  public void testPriorityPreemptionWhenAllQueuesAreBelowGuaranteedCapacities()
+      throws Exception {
+    /**
+     * Test case: Submit two applications (app1/app2) to different queues, queue
+     * structure:
+     *
+     * <pre>
+     *             Root
+     *            /  |  \
+     *           a   b   c
+     *          10   20  70
+     * </pre>
+     *
+     * 1) Two nodes (n1/n2) in the cluster, each of them has 20G.
+     *
+     * 2) app1 is submitted to queue-b first; it asks for 6 * 1G containers.
+     * We will allocate 4 on n1 (including AM) and 3 on n2.
+     *
+     * 3) app2 is submitted to queue-c and asks for one 18G container (for AM).
+     *
+     * After preemption, we should expect:
+     * 2 containers preempted from app1, and the AM of app2 successfully allocated.
+     */
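+    // Allow preemption even though every queue is below its guaranteed
+    // capacity; to-preempt containers are only selected after the 1000ms
+    // delay configured below (see the sleep later in this test).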
+    conf.setPUOrderingPolicyUnderUtilizedPreemptionEnabled(true);
+    conf.setPUOrderingPolicyUnderUtilizedPreemptionDelay(1000);
+    conf.setQueueOrderingPolicy(CapacitySchedulerConfiguration.ROOT,
+        CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY);
+
+    // Queue c has higher priority than a/b
+    conf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".c", 1);
+
+    MockRM rm1 = new MockRM(conf);
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+
+    MockNM nm1 = rm1.registerNode("h1:1234", 20 * GB);
+    MockNM nm2 = rm1.registerNode("h2:1234", 20 * GB);
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+    // launch an app to queue, AM container should be launched in nm1
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    am1.allocate("*", 1 * GB, 6, new ArrayList<>());
+
+    // Do allocation for node1/node2
+    for (int i = 0; i < 3; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+    }
+
+    // App1 should have 7 containers now, so the abs-used-cap of b is
+    // 7 / 40 = 17.5% < 20% (guaranteed)
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+        am1.getApplicationAttemptId());
+    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+    // 4 from n1 and 3 from n2
+    waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode1.getNodeID()),
+        am1.getApplicationAttemptId(), 4);
+    waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode2.getNodeID()),
+        am1.getApplicationAttemptId(), 3);
+
+    // Submit app2 to queue-c and ask for an 18G container for its AM
+    RMApp app2 = rm1.submitApp(18 * GB, "app", "user", null, "c");
+    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+        ApplicationAttemptId.newInstance(app2.getApplicationId(), 1));
+
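+    // Keep heart-beating n1 until the 18G AM request from app2 is reserved on it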
+    while (cs.getNode(rmNode1.getNodeID()).getReservedContainer() == null) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+      Thread.sleep(10);
+    }
+
+    // Call editSchedule immediately: containers are not selected
+    ProportionalCapacityPreemptionPolicy editPolicy =
+        (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
+    editPolicy.editSchedule();
+    Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
+
+    // Sleep the timeout interval, we should be able to see containers selected
+    Thread.sleep(1000);
+    editPolicy.editSchedule();
+    Assert.assertEquals(2, editPolicy.getToPreemptContainers().size());
+
+    // Call editSchedule again: selected containers are killed, and new AM
+    // container launched
+    editPolicy.editSchedule();
+
+    // Do allocation till reserved container allocated
+    while (cs.getNode(rmNode1.getNodeID()).getReservedContainer() != null) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+      Thread.sleep(10);
+    }
+
+    waitNumberOfLiveContainersFromApp(schedulerApp2, 1);
+
+    rm1.close();
+  }
+
+  @Test(timeout = 300000)
+  public void testPriorityPreemptionRequiresMoveReservation()
+      throws Exception {
+    /**
+     * Test case: Submit two applications (app1/app2) to different queues, queue
+     * structure:
+     *
+     * <pre>
+     *             Root
+     *            /  |  \
+     *           a   b   c
+     *          10   20  70
+     * </pre>
+     *
+     * 1) 3 nodes in the cluster, 10G each.
+     *
+     * 2) app1 is submitted to queue-b first and asks for 2G containers;
+     * it gets 2G on n1 (AM) and 2 * 2G on n2.
+     *
+     * 3) app2 is submitted to queue-c with a 2G AM container (allocated on n3).
+     * app2 then requires 9G, which will be reserved on n3.
+     *
+     * We should expect the container unreserved from n3 and allocated on n2.
+     */
+    conf.setPUOrderingPolicyUnderUtilizedPreemptionEnabled(true);
+    conf.setPUOrderingPolicyUnderUtilizedPreemptionDelay(1000);
+    conf.setQueueOrderingPolicy(CapacitySchedulerConfiguration.ROOT,
+        CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY);
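+    // Allow the preemption policy to move an existing reservation to another
+    // node (asserted below: the 9G reservation moves from n3 to n2) before
+    // containers are preempted for it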
+    conf.setPUOrderingPolicyUnderUtilizedPreemptionMoveReservation(true);
+
+    // Queue c has higher priority than a/b
+    conf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".c", 1);
+
+    MockRM rm1 = new MockRM(conf);
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+
+    MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB);
+    MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB);
+    MockNM nm3 = rm1.registerNode("h3:1234", 10 * GB);
+
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+    RMNode rmNode3 = rm1.getRMContext().getRMNodes().get(nm3.getNodeId());
+
+    // launch an app to queue, AM container should be launched in nm1
+    RMApp app1 = rm1.submitApp(2 * GB, "app", "user", null, "b");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    am1.allocate("*", 2 * GB, 2, new ArrayList<>());
+
+    // Do allocation for node2 twice
+    for (int i = 0; i < 2; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+    }
+
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+        am1.getApplicationAttemptId());
+    Assert.assertEquals(3, schedulerApp1.getLiveContainers().size());
+
+    // 1 from n1 and 2 from n2
+    waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode1.getNodeID()),
+        am1.getApplicationAttemptId(), 1);
+    waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode2.getNodeID()),
+        am1.getApplicationAttemptId(), 2);
+
+    // Submit app2 to queue-c and asks for a 2G container for AM, on n3
+    RMApp app2 = rm1.submitApp(2 * GB, "app", "user", null, "c");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm3);
+    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+        ApplicationAttemptId.newInstance(app2.getApplicationId(), 1));
+
+    // Asks 1 * 9G container
+    am2.allocate("*", 9 * GB, 1, new ArrayList<>());
+
+    // Do allocation for node3 once
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode3));
+
+    // Make sure container reserved on node3
+    Assert.assertNotNull(
+        cs.getNode(rmNode3.getNodeID()).getReservedContainer());
+
+    // Call editSchedule immediately: nothing happens
+    ProportionalCapacityPreemptionPolicy editPolicy =
+        (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
+    editPolicy.editSchedule();
+    Assert.assertNotNull(
+        cs.getNode(rmNode3.getNodeID()).getReservedContainer());
+
+    // Sleep the timeout interval, we should be able to see reserved container
+    // moved to n2 (n1 occupied by AM)
+    Thread.sleep(1000);
+    editPolicy.editSchedule();
+    Assert.assertNull(
+        cs.getNode(rmNode3.getNodeID()).getReservedContainer());
+    Assert.assertNotNull(
+        cs.getNode(rmNode2.getNodeID()).getReservedContainer());
+    Assert.assertEquals(am2.getApplicationAttemptId(), cs.getNode(
+        rmNode2.getNodeID()).getReservedContainer().getApplicationAttemptId());
+
+    // Do it again, we should see containers marked to be preempted
+    editPolicy.editSchedule();
+    Assert.assertEquals(2, editPolicy.getToPreemptContainers().size());
+
+    // Call editSchedule again: selected containers are killed
+    editPolicy.editSchedule();
+
+    // Do allocation till reserved container allocated
+    while (schedulerApp2.getLiveContainers().size() < 2) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+      Thread.sleep(200);
+    }
+
+    waitNumberOfLiveContainersFromApp(schedulerApp1, 1);
+
+    rm1.close();
+  }
+
+  @Test(timeout = 60000)
+  public void testPriorityPreemptionOnlyTriggeredWhenDemandingQueueUnsatisfied()
+      throws Exception {
+    /**
+     * Test case: Submit two applications (app1/app2) to different queues, queue
+     * structure:
+     *
+     * <pre>
+     *             Root
+     *            /  |  \
+     *           a   b   c
+     *          10   20  70
+     * </pre>
+     *
+     * 1) 10 nodes (n0-n9) in the cluster, each of them has 10G.
+     *
+     * 2) app1 is submitted to queue-b first and asks for 8 * 1G containers.
+     * We will allocate 1 container on each of n0-n8 (9 in total, including AM).
+     *
+     * 3) app2 is submitted to queue-c and asks for 10 * 10G containers (including AM).
+     *
+     * After preemption, we should expect:
+     * 6 containers preempted from app1, and usage of app2 reaches 70%.
+     */
+    conf.setPUOrderingPolicyUnderUtilizedPreemptionEnabled(true);
+    conf.setPUOrderingPolicyUnderUtilizedPreemptionDelay(1000);
+    conf.setQueueOrderingPolicy(CapacitySchedulerConfiguration.ROOT,
+        CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY);
+
+    // Queue c has higher priority than a/b
+    conf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".c", 1);
+
+    MockRM rm1 = new MockRM(conf);
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+
+    MockNM[] mockNMs = new MockNM[10];
+    for (int i = 0; i < 10; i++) {
+      mockNMs[i] = rm1.registerNode("h" + i + ":1234", 10 * GB);
+    }
+
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+
+    RMNode[] rmNodes = new RMNode[10];
+    for (int i = 0; i < 10; i++) {
+      rmNodes[i] = rm1.getRMContext().getRMNodes().get(mockNMs[i].getNodeId());
+    }
+
+    // launch an app to queue-b, AM container should be launched on nm0
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, mockNMs[0]);
+
+    am1.allocate("*", 1 * GB, 8, new ArrayList<>());
+
+    // Do allocation for nm1-nm8
+    for (int i = 1; i < 9; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
+    }
+
+    // App1 should have 9 containers now, so the abs-used-cap of b is 9%
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+        am1.getApplicationAttemptId());
+    Assert.assertEquals(9, schedulerApp1.getLiveContainers().size());
+    for (int i = 0; i < 9; i++) {
+      waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNodes[i].getNodeID()),
+          am1.getApplicationAttemptId(), 1);
+    }
+
+    // Submit app2 to queue-c and asks for a 10G container for AM
+    // Launch AM in NM9
+    RMApp app2 = rm1.submitApp(10 * GB, "app", "user", null, "c");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, mockNMs[9]);
+    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+        ApplicationAttemptId.newInstance(app2.getApplicationId(), 1));
+
+    // Ask 10 * 10GB containers
+    am2.allocate("*", 10 * GB, 10, new ArrayList<>());
+
+    // Do allocation for all nms
+    for (int i = 1; i < 10; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
+    }
+
+    // Check am2 reserved resource on nm1-nm8
+    for (int i = 1; i < 9; i++) {
+      Assert.assertNotNull("Should reserve on nm-" + i,
+          cs.getNode(rmNodes[i].getNodeID()).getReservedContainer());
+    }
+
+    // Sleep the timeout interval, we should be able to see 6 containers selected
+    // 6 (selected) + 1 (allocated) which makes target capacity to 70%
+    Thread.sleep(1000);
+
+    ProportionalCapacityPreemptionPolicy editPolicy =
+        (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
+    editPolicy.editSchedule();
+    checkNumberOfPreemptionCandidateFromApp(editPolicy, 6,
+        am1.getApplicationAttemptId());
+
+    // Call editSchedule again: selected containers are killed
+    editPolicy.editSchedule();
+    waitNumberOfLiveContainersFromApp(schedulerApp1, 3);
+
+    // Do allocation for all nms
+    for (int i = 1; i < 10; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
+    }
+
+    waitNumberOfLiveContainersFromApp(schedulerApp2, 7);
+    waitNumberOfLiveContainersFromApp(schedulerApp1, 3);
+
+    rm1.close();
+  }
+
+  @Test(timeout = 600000)
+  public void testPriorityPreemptionFromHighestPriorityQueueAndOldestContainer()
+      throws Exception {
+    /**
+     * Test case: Submit three applications (app1/app2/app3) to different queues,
+     * queue structure:
+     *
+     * <pre>
+     *             Root
+     *            /  |  \
+     *           a   b   c
+     *          45  45  10
+     * </pre>
+     *
+     * Priority of queue_a = 1
+     * Priority of queue_b = 2
+     *
+     * 1) 5 nodes (n0-n4) in the cluster, each of them has 4G.
+     *
+     * 2) app1 is submitted to queue-c first (AM=1G) and asks for 4 * 1G containers.
+     * We will allocate 1 container on each of n0-n4. AM on n4.
+     *
+     * 3) app2 is submitted to queue-a, AM container=0.5G, allocated on n0.
+     * It asks for 2 * 3.5G containers (reserved on n0/n1).
+     *
+     * 4) app3 is submitted to queue-b, AM container=0.5G, allocated on n2.
+     * It asks for 2 * 3.5G containers (reserved on n2/n3).
+     *
+     * We preempt the container on n2 first, since n2 holds the oldest reserved
+     * container of the highest-priority queue (b).
+     */
+
+    // Total preemption = 1G per round, which is 5% of cluster resource (20G)
+    conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND,
+        0.05f);
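+    // Lower the minimum allocation to 512MB so the 0.5G AM containers and
+    // 3.5G requests used below are valid allocation sizes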
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
+    conf.setPUOrderingPolicyUnderUtilizedPreemptionEnabled(true);
+    conf.setPUOrderingPolicyUnderUtilizedPreemptionDelay(1000);
+    conf.setQueueOrderingPolicy(CapacitySchedulerConfiguration.ROOT,
+        CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY);
+
+    // A/B has higher priority
+    conf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".a", 1);
+    conf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".b", 2);
+    conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".a", 45f);
+    conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".b", 45f);
+    conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".c", 10f);
+
+    MockRM rm1 = new MockRM(conf);
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+
+    MockNM[] mockNMs = new MockNM[5];
+    for (int i = 0; i < 5; i++) {
+      mockNMs[i] = rm1.registerNode("h" + i + ":1234", 4 * GB);
+    }
+
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+
+    RMNode[] rmNodes = new RMNode[5];
+    for (int i = 0; i < 5; i++) {
+      rmNodes[i] = rm1.getRMContext().getRMNodes().get(mockNMs[i].getNodeId());
+    }
+
+    // launch an app to queue-c, AM container should be launched on nm4
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, mockNMs[4]);
+
+    am1.allocate("*", 1 * GB, 4, new ArrayList<>());
+
+    // Do allocation for nm0-nm3
+    for (int i = 0; i < 4; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
+    }
+
+    // App1 should have 5 containers now, one for each node
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+        am1.getApplicationAttemptId());
+    Assert.assertEquals(5, schedulerApp1.getLiveContainers().size());
+    for (int i = 0; i < 5; i++) {
+      waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNodes[i].getNodeID()),
+          am1.getApplicationAttemptId(), 1);
+    }
+
+    // Submit app2 to queue-a and asks for a 0.5G container for AM (on n0)
+    RMApp app2 = rm1.submitApp(512, "app", "user", null, "a");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, mockNMs[0]);
+    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+        ApplicationAttemptId.newInstance(app2.getApplicationId(), 1));
+
+    // Ask 2 * 3.5GB containers
+    am2.allocate("*", 3 * GB + 512, 2, new ArrayList<>());
+
+    // Do allocation for n0-n1
+    for (int i = 0; i < 2; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
+    }
+
+    // Check am2 reserved resource from nm0-nm1
+    for (int i = 0; i < 2; i++) {
+      Assert.assertNotNull("Should reserve on nm-" + i,
+          cs.getNode(rmNodes[i].getNodeID()).getReservedContainer());
+      Assert.assertEquals(cs.getNode(rmNodes[i].getNodeID())
+          .getReservedContainer().getQueueName(), "a");
+    }
+
+    // Submit app3 to queue-b and asks for a 0.5G container for AM (on n2)
+    RMApp app3 = rm1.submitApp(512, "app", "user", null, "b");
+    MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, mockNMs[2]);
+    FiCaSchedulerApp schedulerApp3 = cs.getApplicationAttempt(
+        ApplicationAttemptId.newInstance(app3.getApplicationId(), 1));
+
+    // Ask 2 * 3.5GB containers
+    am3.allocate("*", 3 * GB + 512, 2, new ArrayList<>());
+
+    // Do allocation for n2-n3
+    for (int i = 2; i < 4; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
+    }
+
+    // Check am3 reserved resource on nm2-nm3
+    for (int i = 2; i < 4; i++) {
+      Assert.assertNotNull("Should reserve on nm-" + i,
+          cs.getNode(rmNodes[i].getNodeID()).getReservedContainer());
+      Assert.assertEquals(cs.getNode(rmNodes[i].getNodeID())
+          .getReservedContainer().getQueueName(), "b");
+    }
+
+    // Sleep the timeout interval, we should be able to see 1 container selected
+    Thread.sleep(1000);
+
+    /* 1st container preempted is on n2 */
+    ProportionalCapacityPreemptionPolicy editPolicy =
+        (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
+    editPolicy.editSchedule();
+
+    // We should have one to-preempt container, on node[2]
+    Set<RMContainer> selectedToPreempt =
+        editPolicy.getToPreemptContainers().keySet();
+    Assert.assertEquals(1, selectedToPreempt.size());
+    Assert.assertEquals(mockNMs[2].getNodeId(),
+        selectedToPreempt.iterator().next().getAllocatedNode());
+
+    // Call editSchedule again: selected containers are killed
+    editPolicy.editSchedule();
+
+    // Do allocation for all nms
+    for (int i = 0; i < 4; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
+    }
+    waitNumberOfLiveContainersFromApp(schedulerApp1, 4);
+
+    waitNumberOfLiveContainersFromApp(schedulerApp1, 4);
+    waitNumberOfLiveContainersFromApp(schedulerApp2, 1);
+    waitNumberOfLiveContainersFromApp(schedulerApp3, 2);
+
+    /* 2nd container preempted is on n3 */
+    editPolicy.editSchedule();
+
+    // We should have one to-preempt container, on node[3]
+    selectedToPreempt =
+        editPolicy.getToPreemptContainers().keySet();
+    Assert.assertEquals(1, selectedToPreempt.size());
+    Assert.assertEquals(mockNMs[3].getNodeId(),
+        selectedToPreempt.iterator().next().getAllocatedNode());
+
+    // Call editSchedule again: selected containers are killed
+    editPolicy.editSchedule();
+    waitNumberOfLiveContainersFromApp(schedulerApp1, 3);
+
+    // Do allocation for all nms
+    for (int i = 0; i < 4; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
+    }
+
+    waitNumberOfLiveContainersFromApp(schedulerApp1, 3);
+    waitNumberOfLiveContainersFromApp(schedulerApp2, 1);
+    waitNumberOfLiveContainersFromApp(schedulerApp3, 3);
+
+    /* 3rd container preempted is on n0 */
+    editPolicy.editSchedule();
+
+    // We should have one to-preempt container, on node[0]
+    selectedToPreempt =
+        editPolicy.getToPreemptContainers().keySet();
+    Assert.assertEquals(1, selectedToPreempt.size());
+    Assert.assertEquals(mockNMs[0].getNodeId(),
+        selectedToPreempt.iterator().next().getAllocatedNode());
+
+    // Call editSchedule again: selected containers are killed
+    editPolicy.editSchedule();
+    waitNumberOfLiveContainersFromApp(schedulerApp1, 2);
+
+    // Do allocation for all nms
+    for (int i = 0; i < 4; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
+    }
+
+    waitNumberOfLiveContainersFromApp(schedulerApp1, 2);
+    waitNumberOfLiveContainersFromApp(schedulerApp2, 2);
+    waitNumberOfLiveContainersFromApp(schedulerApp3, 3);
+
+    /* 4th container preempted is on n1 */
+    editPolicy.editSchedule();
+
+    // We should have one to-preempt container, on node[1]
+    selectedToPreempt =
+        editPolicy.getToPreemptContainers().keySet();
+    Assert.assertEquals(1, selectedToPreempt.size());
+    Assert.assertEquals(mockNMs[1].getNodeId(),
+        selectedToPreempt.iterator().next().getAllocatedNode());
+
+    // Call editSchedule again: selected containers are killed
+    editPolicy.editSchedule();
+    waitNumberOfLiveContainersFromApp(schedulerApp1, 1);
+
+    // Do allocation for all nms
+    for (int i = 0; i < 4; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
+    }
+
+    waitNumberOfLiveContainersFromApp(schedulerApp1, 1);
+    waitNumberOfLiveContainersFromApp(schedulerApp2, 3);
+    waitNumberOfLiveContainersFromApp(schedulerApp3, 3);
+
+    rm1.close();
+  }
+
 }