|
@@ -113,6 +113,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.
|
|
|
+ ContainerExpiredSchedulerEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
|
@@ -166,6 +168,12 @@ public class TestCapacityScheduler {
|
|
|
private static final String B3 = B + ".b3";
|
|
|
private static float A_CAPACITY = 10.5f;
|
|
|
private static float B_CAPACITY = 89.5f;
|
|
|
+ private static final String P1 = CapacitySchedulerConfiguration.ROOT + ".p1";
|
|
|
+ private static final String P2 = CapacitySchedulerConfiguration.ROOT + ".p2";
|
|
|
+ private static final String X1 = P1 + ".x1";
|
|
|
+ private static final String X2 = P1 + ".x2";
|
|
|
+ private static final String Y1 = P2 + ".y1";
|
|
|
+ private static final String Y2 = P2 + ".y2";
|
|
|
private static float A1_CAPACITY = 30;
|
|
|
private static float A2_CAPACITY = 70;
|
|
|
private static float B1_CAPACITY = 79.2f;
|
|
@@ -411,7 +419,52 @@ public class TestCapacityScheduler {
|
|
|
LOG.info("Setup top-level queues a and b");
|
|
|
return conf;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ private CapacitySchedulerConfiguration setupBlockedQueueConfiguration(
|
|
|
+ CapacitySchedulerConfiguration conf) {
|
|
|
+
|
|
|
+ // Define top-level queues
|
|
|
+ conf.setQueues(CapacitySchedulerConfiguration.ROOT,
|
|
|
+ new String[]{"a", "b"});
|
|
|
+
|
|
|
+ conf.setCapacity(A, 80f);
|
|
|
+ conf.setCapacity(B, 20f);
|
|
|
+ conf.setUserLimitFactor(A, 100);
|
|
|
+ conf.setUserLimitFactor(B, 100);
|
|
|
+ conf.setMaximumCapacity(A, 100);
|
|
|
+ conf.setMaximumCapacity(B, 100);
|
|
|
+ LOG.info("Setup top-level queues a and b");
|
|
|
+ return conf;
|
|
|
+ }
|
|
|
+
|
|
|
+ private CapacitySchedulerConfiguration setupOtherBlockedQueueConfiguration(
|
|
|
+ CapacitySchedulerConfiguration conf) {
|
|
|
+
|
|
|
+ // Define top-level queues
|
|
|
+ conf.setQueues(CapacitySchedulerConfiguration.ROOT,
|
|
|
+ new String[]{"p1", "p2"});
|
|
|
+
|
|
|
+ conf.setCapacity(P1, 50f);
|
|
|
+ conf.setMaximumCapacity(P1, 50f);
|
|
|
+ conf.setCapacity(P2, 50f);
|
|
|
+ conf.setMaximumCapacity(P2, 100f);
|
|
|
+ // Define 2nd-level queues
|
|
|
+ conf.setQueues(P1, new String[] {"x1", "x2"});
|
|
|
+ conf.setCapacity(X1, 80f);
|
|
|
+ conf.setMaximumCapacity(X1, 100f);
|
|
|
+ conf.setUserLimitFactor(X1, 2f);
|
|
|
+ conf.setCapacity(X2, 20f);
|
|
|
+ conf.setMaximumCapacity(X2, 100f);
|
|
|
+ conf.setUserLimitFactor(X2, 2f);
|
|
|
+
|
|
|
+ conf.setQueues(P2, new String[]{"y1", "y2"});
|
|
|
+ conf.setCapacity(Y1, 80f);
|
|
|
+ conf.setUserLimitFactor(Y1, 2f);
|
|
|
+ conf.setCapacity(Y2, 20f);
|
|
|
+ conf.setUserLimitFactor(Y2, 2f);
|
|
|
+ return conf;
|
|
|
+ }
|
|
|
+
|
|
|
@Test
|
|
|
public void testMaximumCapacitySetup() {
|
|
|
float delta = 0.0000001f;
|
|
@@ -3415,4 +3468,237 @@ public class TestCapacityScheduler {
|
|
|
scheduler.handle(appRemovedEvent1);
|
|
|
rm.stop();
|
|
|
}
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testCSReservationWithRootUnblocked() throws Exception {
|
|
|
+ CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
|
|
+ conf.setResourceComparator(DominantResourceCalculator.class);
|
|
|
+ setupOtherBlockedQueueConfiguration(conf);
|
|
|
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
|
|
+ ResourceScheduler.class);
|
|
|
+ MockRM rm = new MockRM(conf);
|
|
|
+ rm.start();
|
|
|
+ CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
|
|
|
+ ParentQueue q = (ParentQueue) cs.getQueue("p1");
|
|
|
+
|
|
|
+ Assert.assertNotNull(q);
|
|
|
+ String host = "127.0.0.1";
|
|
|
+ String host1 = "test";
|
|
|
+ RMNode node =
|
|
|
+ MockNodes.newNodeInfo(0, Resource.newInstance(8 * GB, 8), 1, host);
|
|
|
+ RMNode node1 =
|
|
|
+ MockNodes.newNodeInfo(0, Resource.newInstance(8 * GB, 8), 2, host1);
|
|
|
+ cs.handle(new NodeAddedSchedulerEvent(node));
|
|
|
+ cs.handle(new NodeAddedSchedulerEvent(node1));
|
|
|
+ ApplicationAttemptId appAttemptId1 =
|
|
|
+ appHelper(rm, cs, 100, 1, "x1", "userX1");
|
|
|
+ ApplicationAttemptId appAttemptId2 =
|
|
|
+ appHelper(rm, cs, 100, 2, "x2", "userX2");
|
|
|
+ ApplicationAttemptId appAttemptId3 =
|
|
|
+ appHelper(rm, cs, 100, 3, "y1", "userY1");
|
|
|
+ RecordFactory recordFactory =
|
|
|
+ RecordFactoryProvider.getRecordFactory(null);
|
|
|
+
|
|
|
+ Priority priority = TestUtils.createMockPriority(1);
|
|
|
+ ResourceRequest y1Req = null;
|
|
|
+ ResourceRequest x1Req = null;
|
|
|
+ ResourceRequest x2Req = null;
|
|
|
+ for(int i=0; i < 4; i++) {
|
|
|
+ y1Req = TestUtils.createResourceRequest(
|
|
|
+ ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory);
|
|
|
+ cs.allocate(appAttemptId3,
|
|
|
+ Collections.<ResourceRequest>singletonList(y1Req),
|
|
|
+ Collections.<ContainerId>emptyList(),
|
|
|
+ null, null, null, null);
|
|
|
+ CapacityScheduler.schedule(cs);
|
|
|
+ }
|
|
|
+ assertEquals("Y1 Used Resource should be 4 GB", 4 * GB,
|
|
|
+ cs.getQueue("y1").getUsedResources().getMemorySize());
|
|
|
+ assertEquals("P2 Used Resource should be 4 GB", 4 * GB,
|
|
|
+ cs.getQueue("p2").getUsedResources().getMemorySize());
|
|
|
+
|
|
|
+ for(int i=0; i < 7; i++) {
|
|
|
+ x1Req = TestUtils.createResourceRequest(
|
|
|
+ ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory);
|
|
|
+ cs.allocate(appAttemptId1,
|
|
|
+ Collections.<ResourceRequest>singletonList(x1Req),
|
|
|
+ Collections.<ContainerId>emptyList(),
|
|
|
+ null, null, null, null);
|
|
|
+ CapacityScheduler.schedule(cs);
|
|
|
+ }
|
|
|
+ assertEquals("X1 Used Resource should be 7 GB", 7 * GB,
|
|
|
+ cs.getQueue("x1").getUsedResources().getMemorySize());
|
|
|
+ assertEquals("P1 Used Resource should be 7 GB", 7 * GB,
|
|
|
+ cs.getQueue("p1").getUsedResources().getMemorySize());
|
|
|
+
|
|
|
+ x2Req = TestUtils.createResourceRequest(
|
|
|
+ ResourceRequest.ANY, 2 * GB, 1, true, priority, recordFactory);
|
|
|
+ cs.allocate(appAttemptId2,
|
|
|
+ Collections.<ResourceRequest>singletonList(x2Req),
|
|
|
+ Collections.<ContainerId>emptyList(),
|
|
|
+ null, null, null, null);
|
|
|
+ CapacityScheduler.schedule(cs);
|
|
|
+ assertEquals("X2 Used Resource should be 0", 0,
|
|
|
+ cs.getQueue("x2").getUsedResources().getMemorySize());
|
|
|
+ assertEquals("P1 Used Resource should be 7 GB", 7 * GB,
|
|
|
+ cs.getQueue("p1").getUsedResources().getMemorySize());
|
|
|
+ //this assign should fail
|
|
|
+ x1Req = TestUtils.createResourceRequest(
|
|
|
+ ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory);
|
|
|
+ cs.allocate(appAttemptId1,
|
|
|
+ Collections.<ResourceRequest>singletonList(x1Req),
|
|
|
+ Collections.<ContainerId>emptyList(),
|
|
|
+ null, null, null, null);
|
|
|
+ CapacityScheduler.schedule(cs);
|
|
|
+ assertEquals("X1 Used Resource should be 7 GB", 7 * GB,
|
|
|
+ cs.getQueue("x1").getUsedResources().getMemorySize());
|
|
|
+ assertEquals("P1 Used Resource should be 7 GB", 7 * GB,
|
|
|
+ cs.getQueue("p1").getUsedResources().getMemorySize());
|
|
|
+
|
|
|
+ //this should get thru
|
|
|
+ for (int i=0; i < 4; i++) {
|
|
|
+ y1Req = TestUtils.createResourceRequest(
|
|
|
+ ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory);
|
|
|
+ cs.allocate(appAttemptId3,
|
|
|
+ Collections.<ResourceRequest>singletonList(y1Req),
|
|
|
+ Collections.<ContainerId>emptyList(),
|
|
|
+ null, null, null, null);
|
|
|
+ CapacityScheduler.schedule(cs);
|
|
|
+ }
|
|
|
+ assertEquals("P2 Used Resource should be 8 GB", 8 * GB,
|
|
|
+ cs.getQueue("p2").getUsedResources().getMemorySize());
|
|
|
+
|
|
|
+ //Free a container from X1
|
|
|
+ ContainerId containerId = ContainerId.newContainerId(appAttemptId1, 2);
|
|
|
+ cs.handle(new ContainerExpiredSchedulerEvent(containerId));
|
|
|
+
|
|
|
+ //Schedule pending request
|
|
|
+ CapacityScheduler.schedule(cs);
|
|
|
+ assertEquals("X2 Used Resource should be 2 GB", 2 * GB,
|
|
|
+ cs.getQueue("x2").getUsedResources().getMemorySize());
|
|
|
+ assertEquals("P1 Used Resource should be 8 GB", 8 * GB,
|
|
|
+ cs.getQueue("p1").getUsedResources().getMemorySize());
|
|
|
+ assertEquals("P2 Used Resource should be 8 GB", 8 * GB,
|
|
|
+ cs.getQueue("p2").getUsedResources().getMemorySize());
|
|
|
+ assertEquals("Root Used Resource should be 16 GB", 16 * GB,
|
|
|
+ cs.getRootQueue().getUsedResources().getMemorySize());
|
|
|
+ rm.stop();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testCSQueueBlocked() throws Exception {
|
|
|
+ CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
|
|
+ setupBlockedQueueConfiguration(conf);
|
|
|
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
|
|
+ ResourceScheduler.class);
|
|
|
+ MockRM rm = new MockRM(conf);
|
|
|
+ rm.start();
|
|
|
+ CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
|
|
|
+ LeafQueue q = (LeafQueue) cs.getQueue("a");
|
|
|
+
|
|
|
+ Assert.assertNotNull(q);
|
|
|
+ String host = "127.0.0.1";
|
|
|
+ String host1 = "test";
|
|
|
+ RMNode node =
|
|
|
+ MockNodes.newNodeInfo(0, Resource.newInstance(8 * GB, 8), 1, host);
|
|
|
+ RMNode node1 =
|
|
|
+ MockNodes.newNodeInfo(0, Resource.newInstance(8 * GB, 8), 2, host1);
|
|
|
+ cs.handle(new NodeAddedSchedulerEvent(node));
|
|
|
+ cs.handle(new NodeAddedSchedulerEvent(node1));
|
|
|
+ //add app begin
|
|
|
+ ApplicationAttemptId appAttemptId1 =
|
|
|
+ appHelper(rm, cs, 100, 1, "a", "user1");
|
|
|
+ ApplicationAttemptId appAttemptId2 =
|
|
|
+ appHelper(rm, cs, 100, 2, "b", "user2");
|
|
|
+ //add app end
|
|
|
+
|
|
|
+ RecordFactory recordFactory =
|
|
|
+ RecordFactoryProvider.getRecordFactory(null);
|
|
|
+
|
|
|
+ Priority priority = TestUtils.createMockPriority(1);
|
|
|
+ ResourceRequest r1 = TestUtils.createResourceRequest(
|
|
|
+ ResourceRequest.ANY, 2 * GB, 1, true, priority, recordFactory);
|
|
|
+ //This will allocate for app1
|
|
|
+ cs.allocate(appAttemptId1, Collections.<ResourceRequest>singletonList(r1),
|
|
|
+ Collections.<ContainerId>emptyList(),
|
|
|
+ null, null, null, null).getContainers().size();
|
|
|
+ CapacityScheduler.schedule(cs);
|
|
|
+ ResourceRequest r2 = null;
|
|
|
+ for (int i =0; i < 13; i++) {
|
|
|
+ r2 = TestUtils.createResourceRequest(
|
|
|
+ ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory);
|
|
|
+ cs.allocate(appAttemptId2,
|
|
|
+ Collections.<ResourceRequest>singletonList(r2),
|
|
|
+ Collections.<ContainerId>emptyList(),
|
|
|
+ null, null, null, null);
|
|
|
+ CapacityScheduler.schedule(cs);
|
|
|
+ }
|
|
|
+ assertEquals("A Used Resource should be 2 GB", 2 * GB,
|
|
|
+ cs.getQueue("a").getUsedResources().getMemorySize());
|
|
|
+ assertEquals("B Used Resource should be 2 GB", 13 * GB,
|
|
|
+ cs.getQueue("b").getUsedResources().getMemorySize());
|
|
|
+ r1 = TestUtils.createResourceRequest(
|
|
|
+ ResourceRequest.ANY, 2 * GB, 1, true, priority, recordFactory);
|
|
|
+ r2 = TestUtils.createResourceRequest(
|
|
|
+ ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory);
|
|
|
+ cs.allocate(appAttemptId1, Collections.<ResourceRequest>singletonList(r1),
|
|
|
+ Collections.<ContainerId>emptyList(),
|
|
|
+ null, null, null, null).getContainers().size();
|
|
|
+ CapacityScheduler.schedule(cs);
|
|
|
+
|
|
|
+ cs.allocate(appAttemptId2, Collections.<ResourceRequest>singletonList(r2),
|
|
|
+ Collections.<ContainerId>emptyList(), null, null, null, null);
|
|
|
+ CapacityScheduler.schedule(cs);
|
|
|
+ //Check blocked Resource
|
|
|
+ assertEquals("A Used Resource should be 2 GB", 2 * GB,
|
|
|
+ cs.getQueue("a").getUsedResources().getMemorySize());
|
|
|
+ assertEquals("B Used Resource should be 13 GB", 13 * GB,
|
|
|
+ cs.getQueue("b").getUsedResources().getMemorySize());
|
|
|
+
|
|
|
+ ContainerId containerId1 = ContainerId.newContainerId(appAttemptId2, 10);
|
|
|
+ ContainerId containerId2 =ContainerId.newContainerId(appAttemptId2, 11);
|
|
|
+
|
|
|
+ cs.handle(new ContainerExpiredSchedulerEvent(containerId1));
|
|
|
+ cs.handle(new ContainerExpiredSchedulerEvent(containerId2));
|
|
|
+ CapacityScheduler.schedule(cs);
|
|
|
+ rm.drainEvents();
|
|
|
+ assertEquals("A Used Resource should be 2 GB", 4 * GB,
|
|
|
+ cs.getQueue("a").getUsedResources().getMemorySize());
|
|
|
+ assertEquals("B Used Resource should be 12 GB", 12 * GB,
|
|
|
+ cs.getQueue("b").getUsedResources().getMemorySize());
|
|
|
+ assertEquals("Used Resource on Root should be 16 GB", 16 * GB,
|
|
|
+ cs.getRootQueue().getUsedResources().getMemorySize());
|
|
|
+ rm.stop();
|
|
|
+ }
|
|
|
+
|
|
|
+ private ApplicationAttemptId appHelper(MockRM rm, CapacityScheduler cs,
|
|
|
+ int clusterTs, int appId, String queue,
|
|
|
+ String user) {
|
|
|
+ ApplicationId appId1 = BuilderUtils.newApplicationId(clusterTs, appId);
|
|
|
+ ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId(
|
|
|
+ appId1, appId);
|
|
|
+
|
|
|
+ RMAppAttemptMetrics attemptMetric1 =
|
|
|
+ new RMAppAttemptMetrics(appAttemptId1, rm.getRMContext());
|
|
|
+ RMAppImpl app1 = mock(RMAppImpl.class);
|
|
|
+ when(app1.getApplicationId()).thenReturn(appId1);
|
|
|
+ RMAppAttemptImpl attempt1 = mock(RMAppAttemptImpl.class);
|
|
|
+ Container container = mock(Container.class);
|
|
|
+ when(attempt1.getMasterContainer()).thenReturn(container);
|
|
|
+ ApplicationSubmissionContext submissionContext = mock(
|
|
|
+ ApplicationSubmissionContext.class);
|
|
|
+ when(attempt1.getSubmissionContext()).thenReturn(submissionContext);
|
|
|
+ when(attempt1.getAppAttemptId()).thenReturn(appAttemptId1);
|
|
|
+ when(attempt1.getRMAppAttemptMetrics()).thenReturn(attemptMetric1);
|
|
|
+ when(app1.getCurrentAppAttempt()).thenReturn(attempt1);
|
|
|
+ rm.getRMContext().getRMApps().put(appId1, app1);
|
|
|
+
|
|
|
+ SchedulerEvent addAppEvent1 =
|
|
|
+ new AppAddedSchedulerEvent(appId1, queue, user);
|
|
|
+ cs.handle(addAppEvent1);
|
|
|
+ SchedulerEvent addAttemptEvent1 =
|
|
|
+ new AppAttemptAddedSchedulerEvent(appAttemptId1, false);
|
|
|
+ cs.handle(addAttemptEvent1);
|
|
|
+ return appAttemptId1;
|
|
|
+ }
|
|
|
}
|