@@ -0,0 +1,757 @@
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class TestLeafQueue {
+  private static final Log LOG = LogFactory.getLog(TestLeafQueue.class);
+
+  private final RecordFactory recordFactory =
+      RecordFactoryProvider.getRecordFactory(null);
+
+  RMContext rmContext;
+  CapacityScheduler cs;
+  CapacitySchedulerConfiguration csConf;
+  CapacitySchedulerContext csContext;
+
+  Queue root;
+  Map<String, Queue> queues = new HashMap<String, Queue>();
+
+  final static int GB = 1024;
+  final static String DEFAULT_RACK = "/default";
+
+  @Before
+  public void setUp() throws Exception {
+    cs = new CapacityScheduler();
+    rmContext = TestUtils.getMockRMContext();
+
+    csConf =
+        new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(csConf);
+
+
+    csContext = mock(CapacitySchedulerContext.class);
+    when(csContext.getConfiguration()).thenReturn(csConf);
+    when(csContext.getMinimumResourceCapability()).thenReturn(Resources.createResource(GB));
+    when(csContext.getMaximumResourceCapability()).thenReturn(Resources.createResource(16*GB));
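+    // The scheduler context is mocked so that LeafQueue is exercised in
+    // isolation; container allocations range from a 1GB minimum to a 16GB maximum.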
+    root =
+        CapacityScheduler.parseQueue(csContext, csConf, null, "root",
+            queues, queues,
+            CapacityScheduler.queueComparator,
+            CapacityScheduler.applicationComparator,
+            TestUtils.spyHook);
+
+    cs.reinitialize(csConf, null, rmContext);
+  }
+
+  private static final String A = "a";
+  private static final String B = "b";
+  private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
+
+    // Define top-level queues
+    conf.setQueues(CapacityScheduler.ROOT, new String[] {A, B});
+    conf.setCapacity(CapacityScheduler.ROOT, 100);
+
+    final String Q_A = CapacityScheduler.ROOT + "." + A;
+    conf.setCapacity(Q_A, 10);
+
+    final String Q_B = CapacityScheduler.ROOT + "." + B;
+    conf.setCapacity(Q_B, 90);
+
+    LOG.info("Setup top-level queues a and b");
+  }
+
+  private LeafQueue stubLeafQueue(LeafQueue queue) {
+
+    // Mock some methods for ease in these unit tests
+
+    // 1. LeafQueue.createContainer to return dummy containers
+    doAnswer(
+        new Answer<Container>() {
+          @Override
+          public Container answer(InvocationOnMock invocation)
+              throws Throwable {
+            final SchedulerApp application =
+                (SchedulerApp)(invocation.getArguments()[0]);
+            final ContainerId containerId =
+                TestUtils.getMockContainerId(application);
+
+            Container container = TestUtils.getMockContainer(
+                containerId,
+                ((SchedulerNode)(invocation.getArguments()[1])).getNodeID(),
+                (Resource)(invocation.getArguments()[2]));
+            return container;
+          }
+        }
+      ).
+      when(queue).createContainer(
+          any(SchedulerApp.class),
+          any(SchedulerNode.class),
+          any(Resource.class));
+
+    // 2. Stub out LeafQueue.parent.completedContainer
+    Queue parent = queue.getParent();
+    doNothing().when(parent).completedContainer(
+        any(Resource.class), any(SchedulerApp.class), any(SchedulerNode.class),
+        any(RMContainer.class), any(RMContainerEventType.class));
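+    // (Otherwise completions would propagate up the queue hierarchy, which
+    // these single-queue tests don't exercise.)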
+
+    return queue;
+  }
+
+  @Test
+  public void testSingleQueueWithOneUser() throws Exception {
+
+    // Manipulate queue 'a'
+    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
+
+    // Users
+    final String user_0 = "user_0";
+
+    // Submit applications
+    final ApplicationAttemptId appAttemptId_0 =
+        TestUtils.getMockApplicationAttemptId(0, 0);
+    SchedulerApp app_0 =
+        new SchedulerApp(appAttemptId_0, user_0, a, rmContext, null);
+    a.submitApplication(app_0, user_0, A);
+
+    final ApplicationAttemptId appAttemptId_1 =
+        TestUtils.getMockApplicationAttemptId(1, 0);
+    SchedulerApp app_1 =
+        new SchedulerApp(appAttemptId_1, user_0, a, rmContext, null);
+    a.submitApplication(app_1, user_0, A);  // same user
+
+    // Setup some nodes
+    String host_0 = "host_0";
+    SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
+
+    final int numNodes = 1;
+    Resource clusterResource = Resources.createResource(numNodes * (8*GB));
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+    // Setup resource-requests
+    Priority priority = TestUtils.createMockPriority(1);
+    app_0.updateResourceRequests(Collections.singletonList(
+        TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 3, priority,
+            recordFactory)));
+
+    app_1.updateResourceRequests(Collections.singletonList(
+        TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 2, priority,
+            recordFactory)));
+
+    // Start testing...
+
+    // Only 1 container
+    a.assignContainers(clusterResource, node_0);
+    assertEquals(1*GB, a.getUsedResources().getMemory());
+    assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
+
+    // The 2nd also fits: min user-capacity = 1024MB since (0.1 * 8GB) < minAlloc,
+    // and a user can get one container more than the user-limit
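+    // Worked out: queue 'a' gets 10% of the 8GB cluster = 0.8GB, which is
+    // below the 1GB minimum allocation, so the effective per-user limit is
+    // presumably rounded up to 1GB; the extra-container allowance then takes
+    // user_0 to 2GB.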
+    a.assignContainers(clusterResource, node_0);
+    assertEquals(2*GB, a.getUsedResources().getMemory());
+    assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
+
+    // Can't allocate 3rd due to user-limit
+    a.assignContainers(clusterResource, node_0);
+    assertEquals(2*GB, a.getUsedResources().getMemory());
+    assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
+
+    // Bump up user-limit-factor, now allocate should work
+    a.setUserLimitFactor(10);
+    a.assignContainers(clusterResource, node_0);
+    assertEquals(3*GB, a.getUsedResources().getMemory());
+    assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
+
+    // One more should work, for app_1, due to user-limit-factor
+    a.assignContainers(clusterResource, node_0);
+    assertEquals(4*GB, a.getUsedResources().getMemory());
+    assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
+
+    // Test max-capacity
+    // Now - no more allocs since we are at max-cap
+    a.setMaxCapacity(0.5f);
+    a.assignContainers(clusterResource, node_0);
+    assertEquals(4*GB, a.getUsedResources().getMemory());
+    assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
+
+    // Release each container from app_0
+    for (RMContainer rmContainer : app_0.getLiveContainers()) {
+      a.completedContainer(clusterResource, app_0, node_0, rmContainer,
+          RMContainerEventType.KILL);
+    }
+    assertEquals(1*GB, a.getUsedResources().getMemory());
+    assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
+
+    // Release each container from app_1
+    for (RMContainer rmContainer : app_1.getLiveContainers()) {
+      a.completedContainer(clusterResource, app_1, node_0, rmContainer,
+          RMContainerEventType.KILL);
+    }
+    assertEquals(0*GB, a.getUsedResources().getMemory());
+    assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
+  }
+
+  @Test
+  public void testSingleQueueWithMultipleUsers() throws Exception {
+
+    // Mock the queue
+    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
+
+    // Users
+    final String user_0 = "user_0";
+    final String user_1 = "user_1";
+    final String user_2 = "user_2";
+
+    // Submit applications
+    final ApplicationAttemptId appAttemptId_0 =
+        TestUtils.getMockApplicationAttemptId(0, 0);
+    SchedulerApp app_0 =
+        new SchedulerApp(appAttemptId_0, user_0, a, rmContext, null);
+    a.submitApplication(app_0, user_0, A);
+
+    final ApplicationAttemptId appAttemptId_1 =
+        TestUtils.getMockApplicationAttemptId(1, 0);
+    SchedulerApp app_1 =
+        new SchedulerApp(appAttemptId_1, user_0, a, rmContext, null);
+    a.submitApplication(app_1, user_0, A);  // same user
+
+    // Setup some nodes
+    String host_0 = "host_0";
+    SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
+
+    final int numNodes = 1;
+    Resource clusterResource = Resources.createResource(numNodes * (8*GB));
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+    // Setup resource-requests
+    Priority priority = TestUtils.createMockPriority(1);
+    app_0.updateResourceRequests(Collections.singletonList(
+        TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 10, priority,
+            recordFactory)));
+
+    app_1.updateResourceRequests(Collections.singletonList(
+        TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 10, priority,
+            recordFactory)));
+
+    // Start testing...
+
+    // Only 1 container
+    a.assignContainers(clusterResource, node_0);
+    assertEquals(1*GB, a.getUsedResources().getMemory());
+    assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
+
+    // The 2nd also fits: min user-capacity = 1024MB since (0.1 * 8GB) < minAlloc,
+    // and a user can get one container more than the user-limit
+    a.assignContainers(clusterResource, node_0);
+    assertEquals(2*GB, a.getUsedResources().getMemory());
+    assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
+
+    // Can't allocate 3rd due to user-limit
+    a.setUserLimit(25);
+    a.assignContainers(clusterResource, node_0);
+    assertEquals(2*GB, a.getUsedResources().getMemory());
+    assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
+
+    // Submit more apps
+    final ApplicationAttemptId appAttemptId_2 =
+        TestUtils.getMockApplicationAttemptId(2, 0);
+    SchedulerApp app_2 =
+        new SchedulerApp(appAttemptId_2, user_1, a, rmContext, null);
+    a.submitApplication(app_2, user_1, A);
+
+    final ApplicationAttemptId appAttemptId_3 =
+        TestUtils.getMockApplicationAttemptId(3, 0);
+    SchedulerApp app_3 =
+        new SchedulerApp(appAttemptId_3, user_2, a, rmContext, null);
+    a.submitApplication(app_3, user_2, A);
+
+    app_2.updateResourceRequests(Collections.singletonList(
+        TestUtils.createResourceRequest(RMNodeImpl.ANY, 3*GB, 1, priority,
+            recordFactory)));
+
+    app_3.updateResourceRequests(Collections.singletonList(
+        TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 2, priority,
+            recordFactory)));
+
+    // Now allocations should go to app_2 since
+    // user_0 is at its limit despite the high user-limit-factor
+    a.setUserLimitFactor(10);
+    a.assignContainers(clusterResource, node_0);
+    assertEquals(5*GB, a.getUsedResources().getMemory());
+    assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
+    assertEquals(3*GB, app_2.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_3.getCurrentConsumption().getMemory());
+
+    // Now allocations should go to app_0 since
+    // user_0 is at its user-limit, not above it
+    a.assignContainers(clusterResource, node_0);
+    assertEquals(6*GB, a.getUsedResources().getMemory());
+    assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
+    assertEquals(3*GB, app_2.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_3.getCurrentConsumption().getMemory());
+
+    // Test max-capacity
+    // Now - no more allocs since we are at max-cap
+    a.setMaxCapacity(0.5f);
+    a.assignContainers(clusterResource, node_0);
+    assertEquals(6*GB, a.getUsedResources().getMemory());
+    assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
+    assertEquals(3*GB, app_2.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_3.getCurrentConsumption().getMemory());
+
+    // Revert max-capacity and user-limit-factor
+    // Now, allocations should go to app_3 since it's under its user-limit
+    a.setMaxCapacity(-1);
+    a.setUserLimitFactor(1);
+    a.assignContainers(clusterResource, node_0);
+    assertEquals(7*GB, a.getUsedResources().getMemory());
+    assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
+    assertEquals(3*GB, app_2.getCurrentConsumption().getMemory());
+    assertEquals(1*GB, app_3.getCurrentConsumption().getMemory());
+
+    // Now we should assign to app_3 again since user_2 is under user-limit
+    a.assignContainers(clusterResource, node_0);
+    assertEquals(8*GB, a.getUsedResources().getMemory());
+    assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
+    assertEquals(3*GB, app_2.getCurrentConsumption().getMemory());
+    assertEquals(2*GB, app_3.getCurrentConsumption().getMemory());
+
+    // Release each container from app_0
+    for (RMContainer rmContainer : app_0.getLiveContainers()) {
+      a.completedContainer(clusterResource, app_0, node_0, rmContainer,
+          RMContainerEventType.KILL);
+    }
+    assertEquals(5*GB, a.getUsedResources().getMemory());
+    assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
+    assertEquals(3*GB, app_2.getCurrentConsumption().getMemory());
+    assertEquals(2*GB, app_3.getCurrentConsumption().getMemory());
+
+    // Release each container from app_2
+    for (RMContainer rmContainer : app_2.getLiveContainers()) {
+      a.completedContainer(clusterResource, app_2, node_0, rmContainer,
+          RMContainerEventType.KILL);
+    }
+    assertEquals(2*GB, a.getUsedResources().getMemory());
+    assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_2.getCurrentConsumption().getMemory());
+    assertEquals(2*GB, app_3.getCurrentConsumption().getMemory());
+
+    // Release each container from app_3
+    for (RMContainer rmContainer : app_3.getLiveContainers()) {
+      a.completedContainer(clusterResource, app_3, node_0, rmContainer,
+          RMContainerEventType.KILL);
+    }
+    assertEquals(0*GB, a.getUsedResources().getMemory());
+    assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_2.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_3.getCurrentConsumption().getMemory());
+  }
+
+  @Test
+  public void testReservation() throws Exception {
+
+    // Manipulate queue 'a'
+    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
+
+    // Users
+    final String user_0 = "user_0";
+    final String user_1 = "user_1";
+
+    // Submit applications
+    final ApplicationAttemptId appAttemptId_0 =
+        TestUtils.getMockApplicationAttemptId(0, 0);
+    SchedulerApp app_0 =
+        new SchedulerApp(appAttemptId_0, user_0, a, rmContext, null);
+    a.submitApplication(app_0, user_0, A);
+
+    final ApplicationAttemptId appAttemptId_1 =
+        TestUtils.getMockApplicationAttemptId(1, 0);
+    SchedulerApp app_1 =
+        new SchedulerApp(appAttemptId_1, user_1, a, rmContext, null);
+    a.submitApplication(app_1, user_1, A);
+
+    // Setup some nodes
+    String host_0 = "host_0";
+    SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 4*GB);
+
+    final int numNodes = 1;
+    Resource clusterResource = Resources.createResource(numNodes * (8*GB));
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+    // Setup resource-requests
+    Priority priority = TestUtils.createMockPriority(1);
+    app_0.updateResourceRequests(Collections.singletonList(
+        TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 2, priority,
+            recordFactory)));
+
+    app_1.updateResourceRequests(Collections.singletonList(
+        TestUtils.createResourceRequest(RMNodeImpl.ANY, 4*GB, 1, priority,
+            recordFactory)));
+
+    // Start testing...
+
+    // Only 1 container
+    a.assignContainers(clusterResource, node_0);
+    assertEquals(1*GB, a.getUsedResources().getMemory());
+    assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
+
+    // The 2nd also fits: min user-capacity = 1024MB since (0.1 * 8GB) < minAlloc,
+    // and a user can get one container more than the user-limit
+    a.assignContainers(clusterResource, node_0);
+    assertEquals(2*GB, a.getUsedResources().getMemory());
+    assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
+
+    // Now, reservation should kick in for app_1
+    a.assignContainers(clusterResource, node_0);
+    assertEquals(6*GB, a.getUsedResources().getMemory());
+    assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
+    assertEquals(4*GB, app_1.getCurrentReservation().getMemory());
+    assertEquals(2*GB, node_0.getUsedResource().getMemory());
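+    // Note the bookkeeping: the queue's used-resources (6GB) includes the
+    // 4GB reservation for app_1 on top of app_0's 2GB, while the node
+    // itself still reports only 2GB actually allocated.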
+
+    // Now free 1 container from app_0, i.e. 1GB
+    a.completedContainer(clusterResource, app_0, node_0,
+        app_0.getLiveContainers().iterator().next(), RMContainerEventType.KILL);
+    a.assignContainers(clusterResource, node_0);
+    assertEquals(5*GB, a.getUsedResources().getMemory());
+    assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
+    assertEquals(4*GB, app_1.getCurrentReservation().getMemory());
+    assertEquals(1*GB, node_0.getUsedResource().getMemory());
+
+    // Now finish another container from app_0 and fulfill the reservation
+    a.completedContainer(clusterResource, app_0, node_0,
+        app_0.getLiveContainers().iterator().next(), RMContainerEventType.KILL);
+    a.assignContainers(clusterResource, node_0);
+    assertEquals(4*GB, a.getUsedResources().getMemory());
+    assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(4*GB, app_1.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_1.getCurrentReservation().getMemory());
+    assertEquals(4*GB, node_0.getUsedResource().getMemory());
+  }
+
+
+  @Test
+  public void testLocalityScheduling() throws Exception {
+
+    // Manipulate queue 'a'
+    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
+
+    // User
+    String user_0 = "user_0";
+
+    // Submit applications
+    final ApplicationAttemptId appAttemptId_0 =
+        TestUtils.getMockApplicationAttemptId(0, 0);
+    SchedulerApp app_0 =
+        spy(new SchedulerApp(appAttemptId_0, user_0, a, rmContext, null));
+    a.submitApplication(app_0, user_0, A);
+
+    // Setup some nodes and racks
+    String host_0 = "host_0";
+    String rack_0 = "rack_0";
+    SchedulerNode node_0 = TestUtils.getMockNode(host_0, rack_0, 0, 8*GB);
+
+    String host_1 = "host_1";
+    String rack_1 = "rack_1";
+    SchedulerNode node_1 = TestUtils.getMockNode(host_1, rack_1, 0, 8*GB);
+
+    String host_2 = "host_2";
+    String rack_2 = "rack_2";
+    SchedulerNode node_2 = TestUtils.getMockNode(host_2, rack_2, 0, 8*GB);
+
+    final int numNodes = 3;
+    Resource clusterResource = Resources.createResource(numNodes * (8*GB));
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+    // Setup resource-requests and submit
+    Priority priority = TestUtils.createMockPriority(1);
+    List<ResourceRequest> app_0_requests_0 = new ArrayList<ResourceRequest>();
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(host_0, 1*GB, 1,
+            priority, recordFactory));
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(rack_0, 1*GB, 1,
+            priority, recordFactory));
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(host_1, 1*GB, 1,
+            priority, recordFactory));
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(rack_1, 1*GB, 1,
+            priority, recordFactory));
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 3, // one extra
+            priority, recordFactory));
+    app_0.updateResourceRequests(app_0_requests_0);
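+    // Delay scheduling: an off-switch container should only be granted once
+    // the app has missed as many scheduling opportunities as it has required
+    // containers (3 here), so the first three off-switch offers are skipped.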
+
+    // Start testing...
+
+    // Start with off switch, shouldn't allocate due to delay scheduling
+    a.assignContainers(clusterResource, node_2);
+    verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
+        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+    assertEquals(1, app_0.getSchedulingOpportunities(priority));
+    assertEquals(3, app_0.getTotalRequiredResources(priority));
+
+    // Another off switch, shouldn't allocate due to delay scheduling
+    a.assignContainers(clusterResource, node_2);
+    verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
+        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+    assertEquals(2, app_0.getSchedulingOpportunities(priority));
+    assertEquals(3, app_0.getTotalRequiredResources(priority));
+
+    // Another off switch, shouldn't allocate due to delay scheduling
+    a.assignContainers(clusterResource, node_2);
+    verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
+        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+    assertEquals(3, app_0.getSchedulingOpportunities(priority));
+    assertEquals(3, app_0.getTotalRequiredResources(priority));
+
+    // Another off switch, now we should allocate
+    // since missedOpportunities=3 and reqdContainers=3
+    a.assignContainers(clusterResource, node_2);
+    verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_2),
+        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+    assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
+    assertEquals(2, app_0.getTotalRequiredResources(priority));
+
+    // NODE_LOCAL - node_0
+    a.assignContainers(clusterResource, node_0);
+    verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0),
+        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+    assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
+    assertEquals(1, app_0.getTotalRequiredResources(priority));
+
+    // NODE_LOCAL - node_1
+    a.assignContainers(clusterResource, node_1);
+    verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1),
+        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+    assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
+    assertEquals(0, app_0.getTotalRequiredResources(priority));
+
+    // Add 1 more request to check for RACK_LOCAL
+    app_0_requests_0.clear();
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(host_1, 1*GB, 1,
+            priority, recordFactory));
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(rack_1, 1*GB, 1,
+            priority, recordFactory));
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 1, // one extra
+            priority, recordFactory));
+    app_0.updateResourceRequests(app_0_requests_0);
+    assertEquals(1, app_0.getTotalRequiredResources(priority));
+
+    String host_3 = "host_3"; // on rack_1
+    SchedulerNode node_3 = TestUtils.getMockNode(host_3, rack_1, 0, 8*GB);
+
+    a.assignContainers(clusterResource, node_3);
+    verify(app_0).allocate(eq(NodeType.RACK_LOCAL), eq(node_3),
+        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+    assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
+    assertEquals(0, app_0.getTotalRequiredResources(priority));
+  }
+
+  @Test
+  public void testApplicationPriorityScheduling() throws Exception {
+    // Manipulate queue 'a'
+    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
+
+    // User
+    String user_0 = "user_0";
+
+    // Submit applications
+    final ApplicationAttemptId appAttemptId_0 =
+        TestUtils.getMockApplicationAttemptId(0, 0);
+    SchedulerApp app_0 =
+        spy(new SchedulerApp(appAttemptId_0, user_0, a, rmContext, null));
+    a.submitApplication(app_0, user_0, A);
+
+    // Setup some nodes and racks
+    String host_0 = "host_0";
+    String rack_0 = "rack_0";
+    SchedulerNode node_0 = TestUtils.getMockNode(host_0, rack_0, 0, 8*GB);
+
+    String host_1 = "host_1";
+    String rack_1 = "rack_1";
+    SchedulerNode node_1 = TestUtils.getMockNode(host_1, rack_1, 0, 8*GB);
+
+    String host_2 = "host_2";
+    String rack_2 = "rack_2";
+    SchedulerNode node_2 = TestUtils.getMockNode(host_2, rack_2, 0, 8*GB);
+
+    final int numNodes = 3;
+    Resource clusterResource = Resources.createResource(numNodes * (8*GB));
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+    // Setup resource-requests and submit
+    List<ResourceRequest> app_0_requests_0 = new ArrayList<ResourceRequest>();
+
+    // P1
+    Priority priority_1 = TestUtils.createMockPriority(1);
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(host_0, 1*GB, 1,
+            priority_1, recordFactory));
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(rack_0, 1*GB, 1,
+            priority_1, recordFactory));
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(host_1, 1*GB, 1,
+            priority_1, recordFactory));
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(rack_1, 1*GB, 1,
+            priority_1, recordFactory));
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 2,
+            priority_1, recordFactory));
+
+    // P2
+    Priority priority_2 = TestUtils.createMockPriority(2);
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(host_2, 2*GB, 1,
+            priority_2, recordFactory));
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(rack_2, 2*GB, 1,
+            priority_2, recordFactory));
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(RMNodeImpl.ANY, 2*GB, 1,
+            priority_2, recordFactory));
+
+    app_0.updateResourceRequests(app_0_requests_0);
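+    // Priorities are served in order: as the comments below note, until the
+    // P1 request is satisfiable at a node, nothing is handed to P2 there.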
+
+    // Start testing...
+
+    // Start with off switch, shouldn't allocate P1 due to delay scheduling
+    // thus, no P2 either!
+    a.assignContainers(clusterResource, node_2);
+    verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
+        eq(priority_1), any(ResourceRequest.class), any(Container.class));
+    assertEquals(1, app_0.getSchedulingOpportunities(priority_1));
+    assertEquals(2, app_0.getTotalRequiredResources(priority_1));
+    verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
+        eq(priority_2), any(ResourceRequest.class), any(Container.class));
+    assertEquals(0, app_0.getSchedulingOpportunities(priority_2));
+    assertEquals(1, app_0.getTotalRequiredResources(priority_2));
+
+    // Another off-switch, shouldn't allocate P1 due to delay scheduling
+    // thus, no P2 either!
+    a.assignContainers(clusterResource, node_2);
+    verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
+        eq(priority_1), any(ResourceRequest.class), any(Container.class));
+    assertEquals(2, app_0.getSchedulingOpportunities(priority_1));
+    assertEquals(2, app_0.getTotalRequiredResources(priority_1));
+    verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
+        eq(priority_2), any(ResourceRequest.class), any(Container.class));
+    assertEquals(0, app_0.getSchedulingOpportunities(priority_2));
+    assertEquals(1, app_0.getTotalRequiredResources(priority_2));
+
+    // Another off-switch, should allocate OFF_SWITCH P1
+    a.assignContainers(clusterResource, node_2);
+    verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_2),
+        eq(priority_1), any(ResourceRequest.class), any(Container.class));
+    assertEquals(0, app_0.getSchedulingOpportunities(priority_1));
+    assertEquals(1, app_0.getTotalRequiredResources(priority_1));
+    verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
+        eq(priority_2), any(ResourceRequest.class), any(Container.class));
+    assertEquals(0, app_0.getSchedulingOpportunities(priority_2));
+    assertEquals(1, app_0.getTotalRequiredResources(priority_2));
+
+    // Now, NODE_LOCAL for P1
+    a.assignContainers(clusterResource, node_0);
+    verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0),
+        eq(priority_1), any(ResourceRequest.class), any(Container.class));
+    assertEquals(0, app_0.getSchedulingOpportunities(priority_1));
+    assertEquals(0, app_0.getTotalRequiredResources(priority_1));
+    verify(app_0, never()).allocate(any(NodeType.class), eq(node_0),
+        eq(priority_2), any(ResourceRequest.class), any(Container.class));
+    assertEquals(0, app_0.getSchedulingOpportunities(priority_2));
+    assertEquals(1, app_0.getTotalRequiredResources(priority_2));
+
+    // Now, OFF_SWITCH for P2
+    a.assignContainers(clusterResource, node_1);
+    verify(app_0, never()).allocate(any(NodeType.class), eq(node_1),
+        eq(priority_1), any(ResourceRequest.class), any(Container.class));
+    assertEquals(0, app_0.getSchedulingOpportunities(priority_1));
+    assertEquals(0, app_0.getTotalRequiredResources(priority_1));
+    verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_1),
+        eq(priority_2), any(ResourceRequest.class), any(Container.class));
+    assertEquals(0, app_0.getSchedulingOpportunities(priority_2));
+    assertEquals(0, app_0.getTotalRequiredResources(priority_2));
+
+  }
+
+  @After
+  public void tearDown() throws Exception {
+  }
+}