|
@@ -1027,6 +1027,8 @@ public class TestLeafQueue {
|
|
Resource clusterResource =
|
|
Resource clusterResource =
|
|
Resources.createResource(numNodes * (8*GB), numNodes * 16);
|
|
Resources.createResource(numNodes * (8*GB), numNodes * 16);
|
|
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
|
|
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
|
|
|
|
+ root.updateClusterResource(clusterResource,
|
|
|
|
+ new ResourceLimits(clusterResource));
|
|
|
|
|
|
// Setup resource-requests
|
|
// Setup resource-requests
|
|
// app_0 asks for 3 3-GB containers
|
|
// app_0 asks for 3 3-GB containers
|
|
@@ -1083,9 +1085,15 @@ public class TestLeafQueue {
|
|
a.assignContainers(clusterResource, node_1,
|
|
a.assignContainers(clusterResource, node_1,
|
|
new ResourceLimits(clusterResource),
|
|
new ResourceLimits(clusterResource),
|
|
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
|
|
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
|
|
- assertEquals(12*GB, a.getUsedResources().getMemorySize());
|
|
|
|
- assertEquals(12*GB, app_0.getCurrentConsumption().getMemorySize());
|
|
|
|
- assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
|
|
|
|
|
|
+ assertEquals(9*GB, a.getUsedResources().getMemorySize());
|
|
|
|
+ assertEquals(8*GB, app_0.getCurrentConsumption().getMemorySize());
|
|
|
|
+ assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize());
|
|
|
|
+
|
|
|
|
+ assertEquals(4 * GB,
|
|
|
|
+ app_0.getTotalPendingRequestsPerPartition().get("").getMemorySize());
|
|
|
|
+
|
|
|
|
+ assertEquals(1 * GB,
|
|
|
|
+ app_1.getTotalPendingRequestsPerPartition().get("").getMemorySize());
|
|
}
|
|
}
|
|
|
|
|
|
@SuppressWarnings({ "unchecked", "rawtypes" })
|
|
@SuppressWarnings({ "unchecked", "rawtypes" })
|
|
@@ -1317,11 +1325,6 @@ public class TestLeafQueue {
|
|
Resource clusterResource = Resources.createResource(numNodes * (8*GB), 1);
|
|
Resource clusterResource = Resources.createResource(numNodes * (8*GB), 1);
|
|
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
|
|
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
|
|
|
|
|
|
- ParentQueue root = (ParentQueue) queues
|
|
|
|
- .get(CapacitySchedulerConfiguration.ROOT);
|
|
|
|
- root.updateClusterResource(clusterResource,
|
|
|
|
- new ResourceLimits(clusterResource));
|
|
|
|
-
|
|
|
|
// Setup resource-requests
|
|
// Setup resource-requests
|
|
Priority priority = TestUtils.createMockPriority(1);
|
|
Priority priority = TestUtils.createMockPriority(1);
|
|
app_0.updateResourceRequests(Collections.singletonList(
|
|
app_0.updateResourceRequests(Collections.singletonList(
|
|
@@ -1340,6 +1343,11 @@ public class TestLeafQueue {
|
|
a.setUserLimit(50);
|
|
a.setUserLimit(50);
|
|
a.setUserLimitFactor(2);
|
|
a.setUserLimitFactor(2);
|
|
|
|
|
|
|
|
+ ParentQueue root = (ParentQueue) queues
|
|
|
|
+ .get(CapacitySchedulerConfiguration.ROOT);
|
|
|
|
+ root.updateClusterResource(clusterResource,
|
|
|
|
+ new ResourceLimits(clusterResource));
|
|
|
|
+
|
|
// Now, only user_0 should be active since he is the only one with
|
|
// Now, only user_0 should be active since he is the only one with
|
|
// outstanding requests
|
|
// outstanding requests
|
|
assertEquals("There should only be 1 active user!",
|
|
assertEquals("There should only be 1 active user!",
|
|
@@ -1368,8 +1376,8 @@ public class TestLeafQueue {
|
|
assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize());
|
|
assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize());
|
|
assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize());
|
|
assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize());
|
|
assertEquals(0*GB, app_0.getHeadroom().getMemorySize());
|
|
assertEquals(0*GB, app_0.getHeadroom().getMemorySize());
|
|
- assertEquals(0*GB, app_1.getHeadroom().getMemorySize()); // 4G - 3G
|
|
|
|
-
|
|
|
|
|
|
+ assertEquals(0*GB, app_1.getHeadroom().getMemorySize());
|
|
|
|
+
|
|
// Submit requests for app_1 and set max-cap
|
|
// Submit requests for app_1 and set max-cap
|
|
a.setMaxCapacity(.1f);
|
|
a.setMaxCapacity(.1f);
|
|
root.updateClusterResource(clusterResource,
|
|
root.updateClusterResource(clusterResource,
|
|
@@ -1404,6 +1412,108 @@ public class TestLeafQueue {
|
|
assertEquals(0*GB, app_2.getHeadroom().getMemorySize()); // hit queue max-cap
|
|
assertEquals(0*GB, app_2.getHeadroom().getMemorySize()); // hit queue max-cap
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ @Test
|
|
|
|
+ public void testUserHeadroomMultiApp() throws Exception {
|
|
|
|
+ // Mock the queue
|
|
|
|
+ LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A));
|
|
|
|
+ // unset maxCapacity
|
|
|
|
+ a.setMaxCapacity(1.0f);
|
|
|
|
+
|
|
|
|
+ // Users
|
|
|
|
+ final String user_0 = "user_0";
|
|
|
|
+ final String user_1 = "user_1";
|
|
|
|
+
|
|
|
|
+ // Submit applications
|
|
|
|
+ final ApplicationAttemptId appAttemptId_0 = TestUtils
|
|
|
|
+ .getMockApplicationAttemptId(0, 0);
|
|
|
|
+ FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
|
|
|
|
+ a.getAbstractUsersManager(), spyRMContext);
|
|
|
|
+ a.submitApplicationAttempt(app_0, user_0);
|
|
|
|
+
|
|
|
|
+ final ApplicationAttemptId appAttemptId_1 = TestUtils
|
|
|
|
+ .getMockApplicationAttemptId(1, 0);
|
|
|
|
+ FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
|
|
|
|
+ a.getAbstractUsersManager(), spyRMContext);
|
|
|
|
+ a.submitApplicationAttempt(app_1, user_0); // same user
|
|
|
|
+
|
|
|
|
+ final ApplicationAttemptId appAttemptId_2 = TestUtils
|
|
|
|
+ .getMockApplicationAttemptId(2, 0);
|
|
|
|
+ FiCaSchedulerApp app_2 = new FiCaSchedulerApp(appAttemptId_2, user_1, a,
|
|
|
|
+ a.getAbstractUsersManager(), spyRMContext);
|
|
|
|
+ a.submitApplicationAttempt(app_2, user_1);
|
|
|
|
+
|
|
|
|
+ // Setup some nodes
|
|
|
|
+ String host_0 = "127.0.0.1";
|
|
|
|
+ FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0,
|
|
|
|
+ 16 * GB);
|
|
|
|
+ String host_1 = "127.0.0.2";
|
|
|
|
+ FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0,
|
|
|
|
+ 16 * GB);
|
|
|
|
+
|
|
|
|
+ Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
|
|
|
|
+ app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
|
|
|
|
+ app_1, app_2.getApplicationAttemptId(), app_2);
|
|
|
|
+ Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(),
|
|
|
|
+ node_0, node_1.getNodeID(), node_1);
|
|
|
|
+
|
|
|
|
+ final int numNodes = 2;
|
|
|
|
+ Resource clusterResource = Resources.createResource(numNodes * (16 * GB),
|
|
|
|
+ 1);
|
|
|
|
+ when(csContext.getNumClusterNodes()).thenReturn(numNodes);
|
|
|
|
+ root.updateClusterResource(clusterResource,
|
|
|
|
+ new ResourceLimits(clusterResource));
|
|
|
|
+
|
|
|
|
+ Priority priority = TestUtils.createMockPriority(1);
|
|
|
|
+
|
|
|
|
+ app_0.updateResourceRequests(
|
|
|
|
+ Collections.singletonList(TestUtils.createResourceRequest(
|
|
|
|
+ ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory)));
|
|
|
|
+
|
|
|
|
+ applyCSAssignment(clusterResource,
|
|
|
|
+ a.assignContainers(clusterResource, node_0,
|
|
|
|
+ new ResourceLimits(clusterResource),
|
|
|
|
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
|
|
|
|
+ a, nodes, apps);
|
|
|
|
+ assertEquals(1 * GB, a.getUsedResources().getMemorySize());
|
|
|
|
+ assertEquals(1 * GB, app_0.getCurrentConsumption().getMemorySize());
|
|
|
|
+ assertEquals(0 * GB, app_1.getCurrentConsumption().getMemorySize());
|
|
|
|
+ // Now, headroom is the same for all apps for a given user + queue combo
|
|
|
|
+ // and a change to any app's headroom is reflected for all the user's apps
|
|
|
|
+ // once those apps are active/have themselves calculated headroom for
|
|
|
|
+ // allocation at least one time
|
|
|
|
+ assertEquals(2 * GB, app_0.getHeadroom().getMemorySize());
|
|
|
|
+ assertEquals(2 * GB, app_1.getHeadroom().getMemorySize());// not yet active
|
|
|
|
+ assertEquals(3 * GB, app_2.getHeadroom().getMemorySize());// not yet active
|
|
|
|
+
|
|
|
|
+ app_1.updateResourceRequests(
|
|
|
|
+ Collections.singletonList(TestUtils.createResourceRequest(
|
|
|
|
+ ResourceRequest.ANY, 1 * GB, 2, true, priority, recordFactory)));
|
|
|
|
+
|
|
|
|
+ applyCSAssignment(clusterResource,
|
|
|
|
+ a.assignContainers(clusterResource, node_0,
|
|
|
|
+ new ResourceLimits(clusterResource),
|
|
|
|
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
|
|
|
|
+ a, nodes, apps);
|
|
|
|
+ assertEquals(2 * GB, a.getUsedResources().getMemorySize());
|
|
|
|
+ assertEquals(1 * GB, app_0.getCurrentConsumption().getMemorySize());
|
|
|
|
+ assertEquals(1 * GB, app_1.getCurrentConsumption().getMemorySize());
|
|
|
|
+ assertEquals(1 * GB, app_0.getHeadroom().getMemorySize());
|
|
|
|
+ assertEquals(1 * GB, app_1.getHeadroom().getMemorySize());// now active
|
|
|
|
+ assertEquals(3 * GB, app_2.getHeadroom().getMemorySize());// not yet active
|
|
|
|
+
|
|
|
|
+ // Complete container and verify that headroom is updated, for both apps
|
|
|
|
+ // for the user
|
|
|
|
+ RMContainer rmContainer = app_0.getLiveContainers().iterator().next();
|
|
|
|
+ a.completedContainer(clusterResource, app_0, node_0, rmContainer,
|
|
|
|
+ ContainerStatus.newInstance(rmContainer.getContainerId(),
|
|
|
|
+ ContainerState.COMPLETE, "",
|
|
|
|
+ ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
|
|
|
|
+ RMContainerEventType.KILL, null, true);
|
|
|
|
+
|
|
|
|
+ assertEquals(2 * GB, app_0.getHeadroom().getMemorySize());
|
|
|
|
+ assertEquals(2 * GB, app_1.getHeadroom().getMemorySize());
|
|
|
|
+ }
|
|
|
|
+
|
|
@Test
|
|
@Test
|
|
public void testSingleQueueWithMultipleUsers() throws Exception {
|
|
public void testSingleQueueWithMultipleUsers() throws Exception {
|
|
|
|
|