|
@@ -47,10 +47,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
|
|
|
import org.junit.Assert;
|
|
import org.junit.Assert;
|
|
import org.junit.Before;
|
|
import org.junit.Before;
|
|
import org.junit.Test;
|
|
import org.junit.Test;
|
|
|
|
+import org.junit.runner.RunWith;
|
|
|
|
+import org.junit.runners.Parameterized;
|
|
|
|
+import org.junit.runners.Parameterized.Parameters;
|
|
|
|
|
|
import java.util.ArrayList;
|
|
import java.util.ArrayList;
|
|
import java.util.Arrays;
|
|
import java.util.Arrays;
|
|
@@ -68,28 +70,43 @@ import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.cardinali
|
|
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetIn;
|
|
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetIn;
|
|
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetNotIn;
|
|
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetNotIn;
|
|
|
|
|
|
|
|
+/**
|
|
|
|
+ * Test Container Allocation with SchedulingRequest.
|
|
|
|
+ */
|
|
|
|
+@RunWith(Parameterized.class)
|
|
public class TestSchedulingRequestContainerAllocation {
|
|
public class TestSchedulingRequestContainerAllocation {
|
|
private static final int GB = 1024;
|
|
private static final int GB = 1024;
|
|
-
|
|
|
|
private YarnConfiguration conf;
|
|
private YarnConfiguration conf;
|
|
-
|
|
|
|
|
|
+ private String placementConstraintHandler;
|
|
RMNodeLabelsManager mgr;
|
|
RMNodeLabelsManager mgr;
|
|
|
|
|
|
|
|
+
|
|
|
|
+ @Parameters
|
|
|
|
+ public static Object[] placementConstarintHandlers() {
|
|
|
|
+ return new Object[] {
|
|
|
|
+ YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER,
|
|
|
|
+ YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER};
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public TestSchedulingRequestContainerAllocation(
|
|
|
|
+ String placementConstraintHandler) {
|
|
|
|
+ this.placementConstraintHandler = placementConstraintHandler;
|
|
|
|
+ }
|
|
|
|
+
|
|
@Before
|
|
@Before
|
|
public void setUp() throws Exception {
|
|
public void setUp() throws Exception {
|
|
conf = new YarnConfiguration();
|
|
conf = new YarnConfiguration();
|
|
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
|
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
|
ResourceScheduler.class);
|
|
ResourceScheduler.class);
|
|
|
|
+ conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
|
|
|
|
+ this.placementConstraintHandler);
|
|
mgr = new NullRMNodeLabelsManager();
|
|
mgr = new NullRMNodeLabelsManager();
|
|
mgr.init(conf);
|
|
mgr.init(conf);
|
|
}
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
|
|
|
+ @Test(timeout = 30000L)
|
|
public void testIntraAppAntiAffinity() throws Exception {
|
|
public void testIntraAppAntiAffinity() throws Exception {
|
|
- Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(
|
|
|
|
- new Configuration());
|
|
|
|
- csConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
|
|
|
|
- YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
|
|
|
|
|
|
+ Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(conf);
|
|
|
|
|
|
// inject node label manager
|
|
// inject node label manager
|
|
MockRM rm1 = new MockRM(csConf) {
|
|
MockRM rm1 = new MockRM(csConf) {
|
|
@@ -120,18 +137,9 @@ public class TestSchedulingRequestContainerAllocation {
|
|
ResourceSizing.newInstance(10, Resource.newInstance(1024, 1)),
|
|
ResourceSizing.newInstance(10, Resource.newInstance(1024, 1)),
|
|
Priority.newInstance(1), 1L, ImmutableSet.of("mapper"), "mapper");
|
|
Priority.newInstance(1), 1L, ImmutableSet.of("mapper"), "mapper");
|
|
|
|
|
|
- CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
|
|
|
|
-
|
|
|
|
- for (int i = 0; i < 3; i++) {
|
|
|
|
- for (int j = 0; j < 4; j++) {
|
|
|
|
- cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- // App1 should get 5 containers allocated (1 AM + 1 node each).
|
|
|
|
- FiCaSchedulerApp schedulerApp = cs.getApplicationAttempt(
|
|
|
|
- am1.getApplicationAttemptId());
|
|
|
|
- Assert.assertEquals(5, schedulerApp.getLiveContainers().size());
|
|
|
|
|
|
+ List<Container> allocated = waitForAllocation(4, 3000, am1, nms);
|
|
|
|
+ Assert.assertEquals(4, allocated.size());
|
|
|
|
+ Assert.assertEquals(4, getContainerNodesNum(allocated));
|
|
|
|
|
|
// Similarly, app1 asks 10 anti-affinity containers at different priority,
|
|
// Similarly, app1 asks 10 anti-affinity containers at different priority,
|
|
// it should be satisfied as well.
|
|
// it should be satisfied as well.
|
|
@@ -141,38 +149,30 @@ public class TestSchedulingRequestContainerAllocation {
|
|
ResourceSizing.newInstance(10, Resource.newInstance(2048, 1)),
|
|
ResourceSizing.newInstance(10, Resource.newInstance(2048, 1)),
|
|
Priority.newInstance(2), 1L, ImmutableSet.of("reducer"), "reducer");
|
|
Priority.newInstance(2), 1L, ImmutableSet.of("reducer"), "reducer");
|
|
|
|
|
|
- for (int i = 0; i < 3; i++) {
|
|
|
|
- for (int j = 0; j < 4; j++) {
|
|
|
|
- cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- // App1 should get 9 containers allocated (1 AM + 8 containers).
|
|
|
|
- Assert.assertEquals(9, schedulerApp.getLiveContainers().size());
|
|
|
|
|
|
+ allocated = waitForAllocation(4, 3000, am1, nms);
|
|
|
|
+ Assert.assertEquals(4, allocated.size());
|
|
|
|
+ Assert.assertEquals(4, getContainerNodesNum(allocated));
|
|
|
|
|
|
// Test anti-affinity to both of "mapper/reducer", we should only get no
|
|
// Test anti-affinity to both of "mapper/reducer", we should only get no
|
|
// container allocated
|
|
// container allocated
|
|
am1.allocateIntraAppAntiAffinity(
|
|
am1.allocateIntraAppAntiAffinity(
|
|
ResourceSizing.newInstance(10, Resource.newInstance(2048, 1)),
|
|
ResourceSizing.newInstance(10, Resource.newInstance(2048, 1)),
|
|
Priority.newInstance(3), 1L, ImmutableSet.of("reducer2"), "mapper");
|
|
Priority.newInstance(3), 1L, ImmutableSet.of("reducer2"), "mapper");
|
|
- for (int i = 0; i < 3; i++) {
|
|
|
|
- for (int j = 0; j < 4; j++) {
|
|
|
|
- cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
|
|
|
|
- // App1 should get 10 containers allocated (1 AM + 9 containers).
|
|
|
|
- Assert.assertEquals(9, schedulerApp.getLiveContainers().size());
|
|
|
|
|
|
+ boolean caughtException = false;
|
|
|
|
+ try {
|
|
|
|
+ allocated = waitForAllocation(1, 3000, am1, nms);
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ caughtException = true;
|
|
|
|
+ }
|
|
|
|
+ Assert.assertTrue(caughtException);
|
|
|
|
|
|
rm1.close();
|
|
rm1.close();
|
|
}
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
|
|
|
+ @Test(timeout = 30000L)
|
|
public void testIntraAppAntiAffinityWithMultipleTags() throws Exception {
|
|
public void testIntraAppAntiAffinityWithMultipleTags() throws Exception {
|
|
- Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(
|
|
|
|
- new Configuration());
|
|
|
|
- csConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
|
|
|
|
- YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
|
|
|
|
|
|
+ Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(conf);
|
|
|
|
|
|
// inject node label manager
|
|
// inject node label manager
|
|
MockRM rm1 = new MockRM(csConf) {
|
|
MockRM rm1 = new MockRM(csConf) {
|
|
@@ -203,18 +203,9 @@ public class TestSchedulingRequestContainerAllocation {
|
|
Priority.newInstance(1), 1L, ImmutableSet.of("tag_1_1", "tag_1_2"),
|
|
Priority.newInstance(1), 1L, ImmutableSet.of("tag_1_1", "tag_1_2"),
|
|
"tag_1_1", "tag_1_2");
|
|
"tag_1_1", "tag_1_2");
|
|
|
|
|
|
- CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
|
|
|
|
-
|
|
|
|
- for (int i = 0; i < 3; i++) {
|
|
|
|
- for (int j = 0; j < 4; j++) {
|
|
|
|
- cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- // App1 should get 3 containers allocated (1 AM + 2 task).
|
|
|
|
- FiCaSchedulerApp schedulerApp = cs.getApplicationAttempt(
|
|
|
|
- am1.getApplicationAttemptId());
|
|
|
|
- Assert.assertEquals(3, schedulerApp.getLiveContainers().size());
|
|
|
|
|
|
+ List<Container> allocated = waitForAllocation(2, 3000, am1, nms);
|
|
|
|
+ Assert.assertEquals(2, allocated.size());
|
|
|
|
+ Assert.assertEquals(2, getContainerNodesNum(allocated));
|
|
|
|
|
|
// app1 asks for 1 anti-affinity containers for the same app. anti-affinity
|
|
// app1 asks for 1 anti-affinity containers for the same app. anti-affinity
|
|
// to tag_1_1/tag_1_2. With allocation_tag = tag_2_1/tag_2_2
|
|
// to tag_1_1/tag_1_2. With allocation_tag = tag_2_1/tag_2_2
|
|
@@ -223,33 +214,22 @@ public class TestSchedulingRequestContainerAllocation {
|
|
Priority.newInstance(2), 1L, ImmutableSet.of("tag_2_1", "tag_2_2"),
|
|
Priority.newInstance(2), 1L, ImmutableSet.of("tag_2_1", "tag_2_2"),
|
|
"tag_1_1", "tag_1_2");
|
|
"tag_1_1", "tag_1_2");
|
|
|
|
|
|
- for (int i = 0; i < 3; i++) {
|
|
|
|
- for (int j = 0; j < 4; j++) {
|
|
|
|
- cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- // App1 should get 4 containers allocated (1 AM + 2 task (first request) +
|
|
|
|
- // 1 task (2nd request).
|
|
|
|
- Assert.assertEquals(4, schedulerApp.getLiveContainers().size());
|
|
|
|
|
|
+ List<Container> allocated1 = waitForAllocation(1, 3000, am1, nms);
|
|
|
|
+ Assert.assertEquals(1, allocated1.size());
|
|
|
|
+ allocated.addAll(allocated1);
|
|
|
|
+ Assert.assertEquals(3, getContainerNodesNum(allocated));
|
|
|
|
|
|
- // app1 asks for 10 anti-affinity containers for the same app. anti-affinity
|
|
|
|
|
|
+ // app1 asks for 1 anti-affinity containers for the same app. anti-affinity
|
|
// to tag_1_1/tag_1_2/tag_2_1/tag_2_2. With allocation_tag = tag_3
|
|
// to tag_1_1/tag_1_2/tag_2_1/tag_2_2. With allocation_tag = tag_3
|
|
am1.allocateIntraAppAntiAffinity(
|
|
am1.allocateIntraAppAntiAffinity(
|
|
ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)),
|
|
ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)),
|
|
Priority.newInstance(3), 1L, ImmutableSet.of("tag_3"),
|
|
Priority.newInstance(3), 1L, ImmutableSet.of("tag_3"),
|
|
"tag_1_1", "tag_1_2", "tag_2_1", "tag_2_2");
|
|
"tag_1_1", "tag_1_2", "tag_2_1", "tag_2_2");
|
|
|
|
|
|
- for (int i = 0; i < 3; i++) {
|
|
|
|
- for (int j = 0; j < 4; j++) {
|
|
|
|
- cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- // App1 should get 1 more containers allocated
|
|
|
|
- // 1 AM + 2 task (first request) + 1 task (2nd request) +
|
|
|
|
- // 1 task (3rd request)
|
|
|
|
- Assert.assertEquals(5, schedulerApp.getLiveContainers().size());
|
|
|
|
|
|
+ allocated1 = waitForAllocation(1, 3000, am1, nms);
|
|
|
|
+ Assert.assertEquals(1, allocated1.size());
|
|
|
|
+ allocated.addAll(allocated1);
|
|
|
|
+ Assert.assertEquals(4, getContainerNodesNum(allocated));
|
|
|
|
|
|
rm1.close();
|
|
rm1.close();
|
|
}
|
|
}
|
|
@@ -260,12 +240,9 @@ public class TestSchedulingRequestContainerAllocation {
|
|
* types, see more in TestPlacementConstraintsUtil.
|
|
* types, see more in TestPlacementConstraintsUtil.
|
|
* @throws Exception
|
|
* @throws Exception
|
|
*/
|
|
*/
|
|
- @Test
|
|
|
|
|
|
+ @Test(timeout = 30000L)
|
|
public void testInterAppAntiAffinity() throws Exception {
|
|
public void testInterAppAntiAffinity() throws Exception {
|
|
- Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(
|
|
|
|
- new Configuration());
|
|
|
|
- csConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
|
|
|
|
- YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
|
|
|
|
|
|
+ Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(conf);
|
|
|
|
|
|
// inject node label manager
|
|
// inject node label manager
|
|
MockRM rm1 = new MockRM(csConf) {
|
|
MockRM rm1 = new MockRM(csConf) {
|
|
@@ -296,13 +273,9 @@ public class TestSchedulingRequestContainerAllocation {
|
|
ResourceSizing.newInstance(3, Resource.newInstance(1024, 1)),
|
|
ResourceSizing.newInstance(3, Resource.newInstance(1024, 1)),
|
|
Priority.newInstance(1), 1L, ImmutableSet.of("mapper"), "mapper");
|
|
Priority.newInstance(1), 1L, ImmutableSet.of("mapper"), "mapper");
|
|
|
|
|
|
- CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
|
|
|
|
-
|
|
|
|
- for (int i = 0; i < 3; i++) {
|
|
|
|
- for (int j = 0; j < 4; j++) {
|
|
|
|
- cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
|
|
+ List<Container> allocated = waitForAllocation(3, 3000, am1, nms);
|
|
|
|
+ Assert.assertEquals(3, allocated.size());
|
|
|
|
+ Assert.assertEquals(3, getContainerNodesNum(allocated));
|
|
|
|
|
|
System.out.println("Mappers on HOST0: "
|
|
System.out.println("Mappers on HOST0: "
|
|
+ rmNodes[0].getAllocationTagsWithCount().get("mapper"));
|
|
+ rmNodes[0].getAllocationTagsWithCount().get("mapper"));
|
|
@@ -311,11 +284,6 @@ public class TestSchedulingRequestContainerAllocation {
|
|
System.out.println("Mappers on HOST2: "
|
|
System.out.println("Mappers on HOST2: "
|
|
+ rmNodes[2].getAllocationTagsWithCount().get("mapper"));
|
|
+ rmNodes[2].getAllocationTagsWithCount().get("mapper"));
|
|
|
|
|
|
- // App1 should get 4 containers allocated (1 AM + 3 mappers).
|
|
|
|
- FiCaSchedulerApp schedulerApp = cs.getApplicationAttempt(
|
|
|
|
- am1.getApplicationAttemptId());
|
|
|
|
- Assert.assertEquals(4, schedulerApp.getLiveContainers().size());
|
|
|
|
-
|
|
|
|
// app2 -> c
|
|
// app2 -> c
|
|
RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
|
|
RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
|
|
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nms[0]);
|
|
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nms[0]);
|
|
@@ -330,18 +298,17 @@ public class TestSchedulingRequestContainerAllocation {
|
|
Priority.newInstance(1), 1L, allNs.toString(),
|
|
Priority.newInstance(1), 1L, allNs.toString(),
|
|
ImmutableSet.of("foo"), "mapper");
|
|
ImmutableSet.of("foo"), "mapper");
|
|
|
|
|
|
- for (int i = 0; i < 3; i++) {
|
|
|
|
- for (int j = 0; j < 4; j++) {
|
|
|
|
- cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
|
|
+ List<Container> allocated1 = waitForAllocation(3, 3000, am2, nms);
|
|
|
|
+ Assert.assertEquals(3, allocated1.size());
|
|
|
|
+ Assert.assertEquals(1, getContainerNodesNum(allocated1));
|
|
|
|
+ allocated.addAll(allocated1);
|
|
|
|
+ Assert.assertEquals(4, getContainerNodesNum(allocated));
|
|
|
|
+
|
|
|
|
|
|
|
|
+ CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
|
|
FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
|
|
FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
|
|
am2.getApplicationAttemptId());
|
|
am2.getApplicationAttemptId());
|
|
|
|
|
|
- // App2 should get 4 containers allocated (1 AM + 3 container).
|
|
|
|
- Assert.assertEquals(4, schedulerApp2.getLiveContainers().size());
|
|
|
|
-
|
|
|
|
// The allocated node should not have mapper tag.
|
|
// The allocated node should not have mapper tag.
|
|
Assert.assertTrue(schedulerApp2.getLiveContainers()
|
|
Assert.assertTrue(schedulerApp2.getLiveContainers()
|
|
.stream().allMatch(rmContainer -> {
|
|
.stream().allMatch(rmContainer -> {
|
|
@@ -365,17 +332,11 @@ public class TestSchedulingRequestContainerAllocation {
|
|
Priority.newInstance(1), 1L, allNs.toString(),
|
|
Priority.newInstance(1), 1L, allNs.toString(),
|
|
ImmutableSet.of("mapper"), "mapper");
|
|
ImmutableSet.of("mapper"), "mapper");
|
|
|
|
|
|
- for (int i = 0; i < 3; i++) {
|
|
|
|
- for (int j = 0; j < 4; j++) {
|
|
|
|
- cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- FiCaSchedulerApp schedulerApp3 = cs.getApplicationAttempt(
|
|
|
|
- am3.getApplicationAttemptId());
|
|
|
|
|
|
|
|
- // App3 should get 2 containers allocated (1 AM + 1 container).
|
|
|
|
- Assert.assertEquals(2, schedulerApp3.getLiveContainers().size());
|
|
|
|
|
|
+ allocated1 = waitForAllocation(1, 3000, am3, nms);
|
|
|
|
+ Assert.assertEquals(1, allocated1.size());
|
|
|
|
+ allocated.addAll(allocated1);
|
|
|
|
+ Assert.assertEquals(4, getContainerNodesNum(allocated));
|
|
|
|
|
|
rm1.close();
|
|
rm1.close();
|
|
}
|
|
}
|
|
@@ -423,12 +384,9 @@ public class TestSchedulingRequestContainerAllocation {
|
|
rm1.close();
|
|
rm1.close();
|
|
}
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
|
|
|
+ @Test(timeout = 30000L)
|
|
public void testSchedulingRequestWithNullConstraint() throws Exception {
|
|
public void testSchedulingRequestWithNullConstraint() throws Exception {
|
|
- Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(
|
|
|
|
- new Configuration());
|
|
|
|
- csConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
|
|
|
|
- YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
|
|
|
|
|
|
+ Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(conf);
|
|
|
|
|
|
// inject node label manager
|
|
// inject node label manager
|
|
MockRM rm1 = new MockRM(csConf) {
|
|
MockRM rm1 = new MockRM(csConf) {
|
|
@@ -467,14 +425,8 @@ public class TestSchedulingRequestContainerAllocation {
|
|
.schedulingRequests(ImmutableList.of(sc)).build();
|
|
.schedulingRequests(ImmutableList.of(sc)).build();
|
|
am1.allocate(request);
|
|
am1.allocate(request);
|
|
|
|
|
|
- for (int i = 0; i < 4; i++) {
|
|
|
|
- cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- FiCaSchedulerApp schedApp = cs.getApplicationAttempt(
|
|
|
|
- am1.getApplicationAttemptId());
|
|
|
|
- Assert.assertEquals(2, schedApp.getLiveContainers().size());
|
|
|
|
-
|
|
|
|
|
|
+ List<Container> allocated = waitForAllocation(1, 3000, am1, nms);
|
|
|
|
+ Assert.assertEquals(1, allocated.size());
|
|
|
|
|
|
// Send another request with null placement constraint,
|
|
// Send another request with null placement constraint,
|
|
// ensure there is no NPE while handling this request.
|
|
// ensure there is no NPE while handling this request.
|
|
@@ -488,22 +440,19 @@ public class TestSchedulingRequestContainerAllocation {
|
|
.schedulingRequests(ImmutableList.of(sc)).build();
|
|
.schedulingRequests(ImmutableList.of(sc)).build();
|
|
am1.allocate(request1);
|
|
am1.allocate(request1);
|
|
|
|
|
|
- for (int i = 0; i < 4; i++) {
|
|
|
|
- cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- Assert.assertEquals(4, schedApp.getLiveContainers().size());
|
|
|
|
|
|
+ allocated = waitForAllocation(2, 3000, am1, nms);
|
|
|
|
+ Assert.assertEquals(2, allocated.size());
|
|
|
|
|
|
rm1.close();
|
|
rm1.close();
|
|
}
|
|
}
|
|
|
|
|
|
- private void doNodeHeartbeat(MockNM... nms) throws Exception {
|
|
|
|
|
|
+ private static void doNodeHeartbeat(MockNM... nms) throws Exception {
|
|
for (MockNM nm : nms) {
|
|
for (MockNM nm : nms) {
|
|
nm.nodeHeartbeat(true);
|
|
nm.nodeHeartbeat(true);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- private List<Container> waitForAllocation(int allocNum, int timeout,
|
|
|
|
|
|
+ public static List<Container> waitForAllocation(int allocNum, int timeout,
|
|
MockAM am, MockNM... nms) throws Exception {
|
|
MockAM am, MockNM... nms) throws Exception {
|
|
final List<Container> result = new ArrayList<>();
|
|
final List<Container> result = new ArrayList<>();
|
|
GenericTestUtils.waitFor(() -> {
|
|
GenericTestUtils.waitFor(() -> {
|
|
@@ -553,7 +502,7 @@ public class TestSchedulingRequestContainerAllocation {
|
|
.build();
|
|
.build();
|
|
}
|
|
}
|
|
|
|
|
|
- private int getContainerNodesNum(List<Container> containers) {
|
|
|
|
|
|
+ public static int getContainerNodesNum(List<Container> containers) {
|
|
Set<NodeId> nodes = new HashSet<>();
|
|
Set<NodeId> nodes = new HashSet<>();
|
|
if (containers != null) {
|
|
if (containers != null) {
|
|
containers.forEach(c -> nodes.add(c.getNodeId()));
|
|
containers.forEach(c -> nodes.add(c.getNodeId()));
|
|
@@ -566,12 +515,8 @@ public class TestSchedulingRequestContainerAllocation {
|
|
// This test both intra and inter app constraints.
|
|
// This test both intra and inter app constraints.
|
|
// Including simple affinity, anti-affinity, cardinality constraints,
|
|
// Including simple affinity, anti-affinity, cardinality constraints,
|
|
// and simple AND composite constraints.
|
|
// and simple AND composite constraints.
|
|
- YarnConfiguration config = new YarnConfiguration();
|
|
|
|
- config.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
|
|
|
|
- YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
|
|
|
|
- config.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
|
|
|
- ResourceScheduler.class);
|
|
|
|
- MockRM rm = new MockRM(config);
|
|
|
|
|
|
+
|
|
|
|
+ MockRM rm = new MockRM(conf);
|
|
try {
|
|
try {
|
|
rm.start();
|
|
rm.start();
|
|
|
|
|
|
@@ -698,12 +643,8 @@ public class TestSchedulingRequestContainerAllocation {
|
|
@Test(timeout = 30000L)
|
|
@Test(timeout = 30000L)
|
|
public void testMultiAllocationTagsConstraints() throws Exception {
|
|
public void testMultiAllocationTagsConstraints() throws Exception {
|
|
// This test simulates to use PC to avoid port conflicts
|
|
// This test simulates to use PC to avoid port conflicts
|
|
- YarnConfiguration config = new YarnConfiguration();
|
|
|
|
- config.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
|
|
|
|
- YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
|
|
|
|
- config.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
|
|
|
- ResourceScheduler.class);
|
|
|
|
- MockRM rm = new MockRM(config);
|
|
|
|
|
|
+
|
|
|
|
+ MockRM rm = new MockRM(conf);
|
|
try {
|
|
try {
|
|
rm.start();
|
|
rm.start();
|
|
|
|
|
|
@@ -779,12 +720,7 @@ public class TestSchedulingRequestContainerAllocation {
|
|
public void testInterAppConstraintsWithNamespaces() throws Exception {
|
|
public void testInterAppConstraintsWithNamespaces() throws Exception {
|
|
// This test verifies inter-app constraints with namespaces
|
|
// This test verifies inter-app constraints with namespaces
|
|
// not-self/app-id/app-tag
|
|
// not-self/app-id/app-tag
|
|
- YarnConfiguration config = new YarnConfiguration();
|
|
|
|
- config.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
|
|
|
|
- YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
|
|
|
|
- config.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
|
|
|
- ResourceScheduler.class);
|
|
|
|
- MockRM rm = new MockRM(config);
|
|
|
|
|
|
+ MockRM rm = new MockRM(conf);
|
|
try {
|
|
try {
|
|
rm.start();
|
|
rm.start();
|
|
|
|
|