|
@@ -14,6 +14,7 @@ import org.apache.hadoop.yarn.api.records.QueueACL;
|
|
|
import org.apache.hadoop.yarn.api.records.Resource;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
|
|
import org.junit.After;
|
|
|
import org.junit.Before;
|
|
|
import org.junit.Test;
|
|
@@ -101,8 +102,10 @@ public class TestApplicationLimits {
|
|
|
|
|
|
CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class);
|
|
|
when(csContext.getConfiguration()).thenReturn(csConf);
|
|
|
- when(csContext.getMinimumResourceCapability()).thenReturn(Resources.createResource(GB));
|
|
|
- when(csContext.getMaximumResourceCapability()).thenReturn(Resources.createResource(16*GB));
|
|
|
+ when(csContext.getMinimumResourceCapability()).
|
|
|
+ thenReturn(Resources.createResource(GB));
|
|
|
+ when(csContext.getMaximumResourceCapability()).
|
|
|
+ thenReturn(Resources.createResource(16*GB));
|
|
|
|
|
|
// Say cluster has 100 nodes of 16G each
|
|
|
Resource clusterResource = Resources.createResource(100 * 16 * GB);
|
|
@@ -227,6 +230,76 @@ public class TestApplicationLimits {
|
|
|
assertEquals(0, queue.getNumPendingApplications(user_1));
|
|
|
}
|
|
|
|
|
|
+ @Test
|
|
|
+ public void testHeadroom() throws Exception {
|
|
|
+ CapacitySchedulerConfiguration csConf =
|
|
|
+ new CapacitySchedulerConfiguration();
|
|
|
+ csConf.setUserLimit(CapacityScheduler.ROOT + "." + A, 25);
|
|
|
+ setupQueueConfiguration(csConf);
|
|
|
+
|
|
|
+ CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class);
|
|
|
+ when(csContext.getConfiguration()).thenReturn(csConf);
|
|
|
+ when(csContext.getMinimumResourceCapability()).
|
|
|
+ thenReturn(Resources.createResource(GB));
|
|
|
+ when(csContext.getMaximumResourceCapability()).
|
|
|
+ thenReturn(Resources.createResource(16*GB));
|
|
|
+
|
|
|
+ // Say cluster has 100 nodes of 16G each
|
|
|
+ Resource clusterResource = Resources.createResource(100 * 16 * GB);
|
|
|
+ when(csContext.getClusterResources()).thenReturn(clusterResource);
|
|
|
+
|
|
|
+ Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
|
|
|
+ CapacityScheduler.parseQueue(csContext, csConf, null, "root",
|
|
|
+ queues, queues,
|
|
|
+ CapacityScheduler.queueComparator,
|
|
|
+ CapacityScheduler.applicationComparator,
|
|
|
+ TestUtils.spyHook);
|
|
|
+
|
|
|
+ // Manipulate queue 'a'
|
|
|
+ LeafQueue queue = TestLeafQueue.stubLeafQueue((LeafQueue)queues.get(A));
|
|
|
+
|
|
|
+ String host_0 = "host_0";
|
|
|
+ String rack_0 = "rack_0";
|
|
|
+ SchedulerNode node_0 = TestUtils.getMockNode(host_0, rack_0, 0, 16*GB);
|
|
|
+
|
|
|
+ final String user_0 = "user_0";
|
|
|
+ final String user_1 = "user_1";
|
|
|
+
|
|
|
+ int APPLICATION_ID = 0;
|
|
|
+
|
|
|
+ // Submit first application from user_0, check headroom
|
|
|
+ SchedulerApp app_0_0 = getMockApplication(APPLICATION_ID++, user_0);
|
|
|
+ queue.submitApplication(app_0_0, user_0, A);
|
|
|
+ queue.assignContainers(clusterResource, node_0); // Schedule to compute
|
|
|
+ Resource expectedHeadroom = Resources.createResource(10*16*GB);
|
|
|
+ verify(app_0_0).setAvailableResourceLimit(eq(expectedHeadroom));
|
|
|
+
|
|
|
+ // Submit second application from user_0, check headroom
|
|
|
+ SchedulerApp app_0_1 = getMockApplication(APPLICATION_ID++, user_0);
|
|
|
+ queue.submitApplication(app_0_1, user_0, A);
|
|
|
+ queue.assignContainers(clusterResource, node_0); // Schedule to compute
|
|
|
+ verify(app_0_0, times(2)).setAvailableResourceLimit(eq(expectedHeadroom));
|
|
|
+ verify(app_0_1).setAvailableResourceLimit(eq(expectedHeadroom));// no change
|
|
|
+
|
|
|
+ // Submit first application from user_1, check for new headroom
|
|
|
+ SchedulerApp app_1_0 = getMockApplication(APPLICATION_ID++, user_1);
|
|
|
+ queue.submitApplication(app_1_0, user_1, A);
|
|
|
+ queue.assignContainers(clusterResource, node_0); // Schedule to compute
|
|
|
+ expectedHeadroom = Resources.createResource(10*16*GB / 2); // changes
|
|
|
+ verify(app_0_0).setAvailableResourceLimit(eq(expectedHeadroom));
|
|
|
+ verify(app_0_1).setAvailableResourceLimit(eq(expectedHeadroom));
|
|
|
+ verify(app_1_0).setAvailableResourceLimit(eq(expectedHeadroom));
|
|
|
+
|
|
|
+ // Now reduce cluster size and check for the smaller headroom
|
|
|
+ clusterResource = Resources.createResource(90*16*GB);
|
|
|
+ queue.assignContainers(clusterResource, node_0); // Schedule to compute
|
|
|
+ expectedHeadroom = Resources.createResource(9*16*GB / 2); // changes
|
|
|
+ verify(app_0_0).setAvailableResourceLimit(eq(expectedHeadroom));
|
|
|
+ verify(app_0_1).setAvailableResourceLimit(eq(expectedHeadroom));
|
|
|
+ verify(app_1_0).setAvailableResourceLimit(eq(expectedHeadroom));
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
@After
|
|
|
public void tearDown() {
|
|
|
|