|
@@ -29,6 +29,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitorManager;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitorManager;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
|
|
|
|
+
|
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels
|
|
|
|
+ .RMNodeLabelsManager;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
|
@@ -86,8 +89,19 @@ public class TestCapacitySchedulerSurgicalPreemption
|
|
* After preemption, we should expect:
|
|
* After preemption, we should expect:
|
|
* Preempt 4 containers from app1 on n1.
|
|
* Preempt 4 containers from app1 on n1.
|
|
*/
|
|
*/
|
|
- MockRM rm1 = new MockRM(conf);
|
|
|
|
- rm1.getRMContext().setNodeLabelManager(mgr);
|
|
|
|
|
|
+ testSimpleSurgicalPreemption("a", "c", "user", "user");
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ protected void testSimpleSurgicalPreemption(String queue1, String queue2,
|
|
|
|
+ String user1, String user2)
|
|
|
|
+ throws Exception {
|
|
|
|
+
|
|
|
|
+ MockRM rm1 = new MockRM(conf) {
|
|
|
|
+ protected RMNodeLabelsManager createNodeLabelManager() {
|
|
|
|
+ return mgr;
|
|
|
|
+ }
|
|
|
|
+ };
|
|
|
|
+
|
|
rm1.start();
|
|
rm1.start();
|
|
|
|
|
|
MockNM nm1 = rm1.registerNode("h1:1234", 20 * GB);
|
|
MockNM nm1 = rm1.registerNode("h1:1234", 20 * GB);
|
|
@@ -97,7 +111,7 @@ public class TestCapacitySchedulerSurgicalPreemption
|
|
RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
|
|
RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
|
|
|
|
|
|
// launch an app to queue, AM container should be launched in nm1
|
|
// launch an app to queue, AM container should be launched in nm1
|
|
- RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
|
|
|
|
|
|
+ RMApp app1 = rm1.submitApp(1 * GB, "app", user1, null, queue1);
|
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
|
|
|
|
|
am1.allocate("*", 1 * GB, 32, new ArrayList<ContainerId>());
|
|
am1.allocate("*", 1 * GB, 32, new ArrayList<ContainerId>());
|
|
@@ -120,7 +134,7 @@ public class TestCapacitySchedulerSurgicalPreemption
|
|
|
|
|
|
|
|
|
|
// Submit app2 to queue-c and asks for a 1G container for AM
|
|
// Submit app2 to queue-c and asks for a 1G container for AM
|
|
- RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
|
|
|
|
|
|
+ RMApp app2 = rm1.submitApp(1 * GB, "app", user2, null, queue2);
|
|
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
|
|
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
|
|
|
|
|
|
// NM1/NM2 has available resource = 2G/4G
|
|
// NM1/NM2 has available resource = 2G/4G
|
|
@@ -632,6 +646,21 @@ public class TestCapacitySchedulerSurgicalPreemption
|
|
* Highest priority queue (b)
|
|
* Highest priority queue (b)
|
|
*/
|
|
*/
|
|
|
|
|
|
|
|
+ // A/B has higher priority
|
|
|
|
+ conf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".a" , 1);
|
|
|
|
+ conf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".b", 2);
|
|
|
|
+ conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".a", 45f);
|
|
|
|
+ conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".b", 45f);
|
|
|
|
+ conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".c", 10f);
|
|
|
|
+
|
|
|
|
+ testPriorityPreemptionFromHighestPriorityQueueAndOldestContainer(new
|
|
|
|
+ String[] {"a", "b", "c"}, new String[] {"user", "user", "user"});
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ protected void
|
|
|
|
+ testPriorityPreemptionFromHighestPriorityQueueAndOldestContainer(String[]
|
|
|
|
+ queues, String[] users) throws Exception {
|
|
// Total preemption = 1G per round, which is 5% of cluster resource (20G)
|
|
// Total preemption = 1G per round, which is 5% of cluster resource (20G)
|
|
conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND,
|
|
conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND,
|
|
0.05f);
|
|
0.05f);
|
|
@@ -641,15 +670,11 @@ public class TestCapacitySchedulerSurgicalPreemption
|
|
conf.setQueueOrderingPolicy(CapacitySchedulerConfiguration.ROOT,
|
|
conf.setQueueOrderingPolicy(CapacitySchedulerConfiguration.ROOT,
|
|
CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY);
|
|
CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY);
|
|
|
|
|
|
- // A/B has higher priority
|
|
|
|
- conf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".a", 1);
|
|
|
|
- conf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".b", 2);
|
|
|
|
- conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".a", 45f);
|
|
|
|
- conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".b", 45f);
|
|
|
|
- conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".c", 10f);
|
|
|
|
-
|
|
|
|
- MockRM rm1 = new MockRM(conf);
|
|
|
|
- rm1.getRMContext().setNodeLabelManager(mgr);
|
|
|
|
|
|
+ MockRM rm1 = new MockRM(conf) {
|
|
|
|
+ protected RMNodeLabelsManager createNodeLabelManager() {
|
|
|
|
+ return mgr;
|
|
|
|
+ }
|
|
|
|
+ };
|
|
rm1.start();
|
|
rm1.start();
|
|
|
|
|
|
MockNM[] mockNMs = new MockNM[5];
|
|
MockNM[] mockNMs = new MockNM[5];
|
|
@@ -665,7 +690,7 @@ public class TestCapacitySchedulerSurgicalPreemption
|
|
}
|
|
}
|
|
|
|
|
|
// launch an app to queue, AM container should be launched in nm1
|
|
// launch an app to queue, AM container should be launched in nm1
|
|
- RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "c");
|
|
|
|
|
|
+ RMApp app1 = rm1.submitApp(1 * GB, "app", users[2], null, queues[2]);
|
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, mockNMs[4]);
|
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, mockNMs[4]);
|
|
|
|
|
|
am1.allocate("*", 1 * GB, 4, new ArrayList<>());
|
|
am1.allocate("*", 1 * GB, 4, new ArrayList<>());
|
|
@@ -685,7 +710,7 @@ public class TestCapacitySchedulerSurgicalPreemption
|
|
}
|
|
}
|
|
|
|
|
|
// Submit app2 to queue-a and asks for a 0.5G container for AM (on n0)
|
|
// Submit app2 to queue-a and asks for a 0.5G container for AM (on n0)
|
|
- RMApp app2 = rm1.submitApp(512, "app", "user", null, "a");
|
|
|
|
|
|
+ RMApp app2 = rm1.submitApp(512, "app", users[0], null, queues[0]);
|
|
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, mockNMs[0]);
|
|
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, mockNMs[0]);
|
|
FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
|
|
FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
|
|
ApplicationAttemptId.newInstance(app2.getApplicationId(), 1));
|
|
ApplicationAttemptId.newInstance(app2.getApplicationId(), 1));
|
|
@@ -703,11 +728,11 @@ public class TestCapacitySchedulerSurgicalPreemption
|
|
Assert.assertNotNull("Should reserve on nm-" + i,
|
|
Assert.assertNotNull("Should reserve on nm-" + i,
|
|
cs.getNode(rmNodes[i].getNodeID()).getReservedContainer());
|
|
cs.getNode(rmNodes[i].getNodeID()).getReservedContainer());
|
|
Assert.assertEquals(cs.getNode(rmNodes[i].getNodeID())
|
|
Assert.assertEquals(cs.getNode(rmNodes[i].getNodeID())
|
|
- .getReservedContainer().getQueueName(), "a");
|
|
|
|
|
|
+ .getReservedContainer().getQueueName(), queues[0]);
|
|
}
|
|
}
|
|
|
|
|
|
// Submit app3 to queue-b and asks for a 0.5G container for AM (on n2)
|
|
// Submit app3 to queue-b and asks for a 0.5G container for AM (on n2)
|
|
- RMApp app3 = rm1.submitApp(512, "app", "user", null, "b");
|
|
|
|
|
|
+ RMApp app3 = rm1.submitApp(512, "app", users[1], null, queues[1]);
|
|
MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, mockNMs[2]);
|
|
MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, mockNMs[2]);
|
|
FiCaSchedulerApp schedulerApp3 = cs.getApplicationAttempt(
|
|
FiCaSchedulerApp schedulerApp3 = cs.getApplicationAttempt(
|
|
ApplicationAttemptId.newInstance(app3.getApplicationId(), 1));
|
|
ApplicationAttemptId.newInstance(app3.getApplicationId(), 1));
|
|
@@ -725,7 +750,7 @@ public class TestCapacitySchedulerSurgicalPreemption
|
|
Assert.assertNotNull("Should reserve on nm-" + i,
|
|
Assert.assertNotNull("Should reserve on nm-" + i,
|
|
cs.getNode(rmNodes[i].getNodeID()).getReservedContainer());
|
|
cs.getNode(rmNodes[i].getNodeID()).getReservedContainer());
|
|
Assert.assertEquals(cs.getNode(rmNodes[i].getNodeID())
|
|
Assert.assertEquals(cs.getNode(rmNodes[i].getNodeID())
|
|
- .getReservedContainer().getQueueName(), "b");
|
|
|
|
|
|
+ .getReservedContainer().getQueueName(), queues[1]);
|
|
}
|
|
}
|
|
|
|
|
|
// Sleep the timeout interval, we should be able to see 1 container selected
|
|
// Sleep the timeout interval, we should be able to see 1 container selected
|
|
@@ -831,6 +856,7 @@ public class TestCapacitySchedulerSurgicalPreemption
|
|
rm1.close();
|
|
rm1.close();
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+
|
|
@Test(timeout = 60000)
|
|
@Test(timeout = 60000)
|
|
public void testPreemptionForFragmentatedCluster() throws Exception {
|
|
public void testPreemptionForFragmentatedCluster() throws Exception {
|
|
// Set additional_balance_queue_based_on_reserved_res to true to get
|
|
// Set additional_balance_queue_based_on_reserved_res to true to get
|