@@ -2894,11 +2894,11 @@ public class TestFairScheduler extends FairSchedulerTestBase {
   }

   /**
-   * Test that NO OPPORTUNISTIC containers can be allocated on a node that
-   * is fully allocated and with a very high utilization.
+   * Test that NO OPPORTUNISTIC containers can be allocated on a node where
+   * the memory is fully allocated and with a very high utilization.
    */
   @Test
-  public void testAllocateNoOpportunisticContainersOnBusyNode()
+  public void testAllocateNoOpportunisticContainersOnMemoryBusyNode()
       throws IOException {
     conf.setBoolean(
         YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED, true);
@@ -2918,7 +2918,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());

-    // Add a node with 4G of memory and 4 vcores and an overallocation
+    // Add a node with 2G of memory and 2 vcores and an overallocation
     // threshold of 0.75f and 0.75f for memory and cpu respectively
     OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance(
         ResourceThresholds.newInstance(0.75f, 0.75f));
@@ -2928,7 +2928,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {

     // create a scheduling request that takes up the node's full memory
     ApplicationAttemptId appAttempt1 =
-        createSchedulingRequest(2048, "queue1", "user1", 1);
+        createSchedulingRequest(2048, 1, "queue1", "user1", 1);
     scheduler.handle(new NodeUpdateSchedulerEvent(node));
     assertEquals(2048, scheduler.getQueueManager().getQueue("queue1").
         getGuaranteedResourceUsage().getMemorySize());
@@ -2939,18 +2939,18 @@ public class TestFairScheduler extends FairSchedulerTestBase {
         ExecutionType.GUARANTEED,
         allocatedContainers1.get(0).getExecutionType());

-    // node utilization shoots up after the container runs on the node
+    // memory utilization shoots up after the container runs on the node
     ContainerStatus containerStatus = ContainerStatus.newInstance(
         allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "",
         ContainerExitStatus.SUCCESS);
     node.updateContainersInfoAndUtilization(
         new UpdatedContainerInfo(Collections.singletonList(containerStatus),
             Collections.emptyList()),
-        ResourceUtilization.newInstance(2000, 0, 0.8f));
+        ResourceUtilization.newInstance(2000, 0, 0.0f));
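+    // 2000 MB used out of 2048 MB is above the 0.75f memory threshold
+    // (1536 MB); CPU utilization is zeroed so that only memory decides
+    // whether the node can be overallocated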

     // create another scheduling request
     ApplicationAttemptId appAttempt2
-        = createSchedulingRequest(100, "queue2", "user1", 1);
+        = createSchedulingRequest(100, 1, "queue2", "user1", 1);
     scheduler.handle(new NodeUpdateSchedulerEvent(node));
     List<Container> allocatedContainers2 =
         scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers();
@@ -2975,13 +2975,99 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     }
   }

+  /**
+   * Test that NO OPPORTUNISTIC containers can be allocated on a node where
+   * the CPU is fully allocated and with a very high utilization.
+   */
+  @Test
+  public void testAllocateNoOpportunisticContainersOnCPUBusyNode()
+      throws Exception {
+    conf.setBoolean(
+        YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED, true);
+    // disable resource request normalization in fair scheduler
+    int memoryAllocationIncrement = conf.getInt(
+        FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+        FairSchedulerConfiguration.
+            DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB);
+    conf.setInt(
+        FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 1);
+    int memoryAllocationMinimum = conf.getInt(
+        YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1);
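+    // (minimum and increment are both set to 1 MB so memory requests are
+    // used exactly as asked rather than rounded up; the saved defaults are
+    // restored in the finally block below)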
+    try {
+      scheduler.init(conf);
+      scheduler.start();
+      scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+      // Add a node with 4G of memory and 4 vcores and an overallocation
+      // threshold of 0.75f and 0.75f for memory and cpu respectively
+      OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance(
+          ResourceThresholds.newInstance(0.75f, 0.75f));
+      MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1,
+          Resources.createResource(4096, 4), overAllocationInfo);
+      scheduler.handle(new NodeAddedSchedulerEvent(node));
+
+      // create a scheduling request that takes up the node's full CPU
+      ApplicationAttemptId appAttempt1 =
+          createSchedulingRequest(1024, 4, "queue1", "user1", 1);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+
+      List<Container> allocatedContainers1 =
+          scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers();
+      assertEquals(1, allocatedContainers1.size());
+      assertEquals("unexpected container execution type",
+          ExecutionType.GUARANTEED,
+          allocatedContainers1.get(0).getExecutionType());
+      assertEquals(4, scheduler.getQueueManager().getQueue("queue1").
+          getGuaranteedResourceUsage().getVirtualCores());
+
+      // cpu utilization shoots up after the container runs on the node
+      ContainerStatus containerStatus = ContainerStatus.newInstance(
+          allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "",
+          ContainerExitStatus.SUCCESS);
+      node.updateContainersInfoAndUtilization(
+          new UpdatedContainerInfo(Collections.singletonList(containerStatus),
+              Collections.emptyList()),
+          ResourceUtilization.newInstance(0, 0, 3.5f));
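+      // CPU utilization of 3.5 vcores exceeds the 0.75f threshold of the
+      // node's 4 vcores (3.0), so the node counts as too busy to
+      // overallocate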
+
+      // create another scheduling request that should NOT be satisfied
+      // immediately because the node cpu utilization is over its
+      // overallocation threshold
+      ApplicationAttemptId appAttempt2
+          = createSchedulingRequest(1024, 1, "queue2", "user1", 1);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      List<Container> allocatedContainers2 =
+          scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers();
+      assertEquals("Expecting no containers allocated",
+          0, allocatedContainers2.size());
+      assertEquals(0, scheduler.getQueueManager().getQueue("queue2").
+          getOpportunisticResourceUsage().getVirtualCores());
+
+      // verify that a reservation is made for the second resource request
+      Resource reserved = scheduler.getNode(node.getNodeID()).
+          getReservedContainer().getReservedResource();
+      assertEquals("Expect a reservation made for the second resource request",
+          Resource.newInstance(1024, 1), reserved);
+    } finally {
+      conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+          false);
+      conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+          memoryAllocationMinimum);
+      conf.setInt(
+          FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+          memoryAllocationIncrement);
+    }
+  }
+
   /**
    * Test that OPPORTUNISTIC containers can be allocated on a node with low
-   * utilization even though there is not enough unallocated resource on the
-   * node to accommodate the request.
+   * memory utilization even though there is not enough unallocated resource
+   * on the node to accommodate the request.
    */
   @Test
-  public void testAllocateOpportunisticContainersOnPartiallyOverAllocatedNode()
+  public void
+      testAllocateOpportunisticContainersOnMemoryPartiallyOverAllocatedNode()
       throws IOException {
     conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
         true);
@@ -3010,9 +3096,9 @@ public class TestFairScheduler extends FairSchedulerTestBase {
         Resources.createResource(4096, 4), overAllocationInfo);
     scheduler.handle(new NodeAddedSchedulerEvent(node));

-    // create a scheduling request that leaves some unallocated resources
+    // create a scheduling request that leaves some unallocated memory
     ApplicationAttemptId appAttempt1 =
-        createSchedulingRequest(3600, "queue1", "user1", 1);
+        createSchedulingRequest(3600, 1, "queue1", "user1", 1);
     scheduler.handle(new NodeUpdateSchedulerEvent(node));
     assertEquals(3600, scheduler.getQueueManager().getQueue("queue1").
         getGuaranteedResourceUsage().getMemorySize());
@@ -3023,19 +3109,19 @@ public class TestFairScheduler extends FairSchedulerTestBase {
         ExecutionType.GUARANTEED,
         allocatedContainers1.get(0).getExecutionType());

-    // node utilization is low after the container is launched on the node
+    // memory utilization is low after the container is launched on the node
     ContainerStatus containerStatus = ContainerStatus.newInstance(
         allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "",
         ContainerExitStatus.SUCCESS);
     node.updateContainersInfoAndUtilization(
         new UpdatedContainerInfo(Collections.singletonList(containerStatus),
             Collections.emptyList()),
-        ResourceUtilization.newInstance(1800, 0, 0.5f));
+        ResourceUtilization.newInstance(1800, 0, 0.0f));

-    // create another scheduling request that asks for more than what's left
+    // create another scheduling request that asks for more than the memory
     // unallocated on the node but can be served with overallocation.
     ApplicationAttemptId appAttempt2 =
-        createSchedulingRequest(1024, "queue2", "user1", 1);
+        createSchedulingRequest(1024, 1, "queue2", "user1", 1);
     scheduler.handle(new NodeUpdateSchedulerEvent(node));
     assertEquals(1024, scheduler.getQueueManager().getQueue("queue2").
         getOpportunisticResourceUsage().getMemorySize());
@@ -3063,11 +3149,98 @@ public class TestFairScheduler extends FairSchedulerTestBase {
   }

   /**
-   * Test opportunistic containers can be allocated on a node that is fully
-   * allocated but whose utilization is very low.
+   * Test that OPPORTUNISTIC containers can be allocated on a node with low
+   * cpu utilization even though there is not enough unallocated resource
+   * on the node to accommodate the request.
+   */
+  @Test
+  public void
+      testAllocateOpportunisticContainersOnCPUPartiallyOverAllocatedNode()
+      throws IOException {
+    conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+        true);
+    // disable resource request normalization in fair scheduler
+    int memoryAllocationIncrement = conf.getInt(
+        FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+        FairSchedulerConfiguration.
+            DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB);
+    conf.setInt(
+        FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 1);
+    int memoryAllocationMinimum = conf.getInt(
+        YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1);
+
+    try {
+      scheduler.init(conf);
+      scheduler.start();
+      scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+      // Add a node with 4G of memory and 4 vcores and an overallocation
+      // threshold of 0.75f and 0.75f for memory and cpu respectively
+      OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance(
+          ResourceThresholds.newInstance(0.75f, 0.75f));
+      MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1,
+          Resources.createResource(4096, 4), overAllocationInfo);
+      scheduler.handle(new NodeAddedSchedulerEvent(node));
+
+      // create a scheduling request that leaves some unallocated CPU resources
+      ApplicationAttemptId appAttempt1 =
+          createSchedulingRequest(1024, 3, "queue1", "user1", 1);
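+      // 3 of the node's 4 vcores are now allocated, leaving only 1 vcore
+      // unallocated for the next request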
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(3, scheduler.getQueueManager().getQueue("queue1").
+          getGuaranteedResourceUsage().getVirtualCores());
+      List<Container> allocatedContainers1 =
+          scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers();
+      assertEquals(1, allocatedContainers1.size());
+      assertEquals("unexpected container execution type",
+          ExecutionType.GUARANTEED,
+          allocatedContainers1.get(0).getExecutionType());
+
+      // cpu utilization is low after the container is launched on the node
+      ContainerStatus containerStatus = ContainerStatus.newInstance(
+          allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "",
+          ContainerExitStatus.SUCCESS);
+      node.updateContainersInfoAndUtilization(
+          new UpdatedContainerInfo(Collections.singletonList(containerStatus),
+              Collections.emptyList()),
+          ResourceUtilization.newInstance(0, 0, 1.0f));
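+      // CPU utilization of 1.0 vcore is well below the 0.75f threshold
+      // (3.0 of 4 vcores), so the 2-vcore request below can be served
+      // with an OPPORTUNISTIC container despite only 1 vcore being free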
+
+      // create another scheduling request that asks for more than what's left
+      // unallocated on the node but can be served with overallocation.
+      ApplicationAttemptId appAttempt2 =
+          createSchedulingRequest(1024, 2, "queue2", "user1", 1);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(2, scheduler.getQueueManager().getQueue("queue2").
+          getOpportunisticResourceUsage().getVirtualCores());
+      List<Container> allocatedContainers2 =
+          scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers();
+      assertEquals(1, allocatedContainers2.size());
+      assertEquals("unexpected container execution type",
+          ExecutionType.OPPORTUNISTIC,
+          allocatedContainers2.get(0).getExecutionType());
+
+      // verify that no reservation is made for the second request given
+      // that it's satisfied by an OPPORTUNISTIC container allocation.
+      assertNull("No reservation should be made because we have satisfied" +
+          " the second request with an OPPORTUNISTIC container allocation",
+          scheduler.getNode(node.getNodeID()).getReservedContainer());
+    } finally {
+      conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+          false);
+      conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+          memoryAllocationMinimum);
+      conf.setInt(
+          FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+          memoryAllocationIncrement);
+    }
+  }
+
+  /**
+   * Test opportunistic containers can be allocated on a node where the memory
+   * is fully allocated but whose utilization is very low.
    */
   @Test
-  public void testAllocateOpportunisticContainersOnFullyAllocatedNode()
+  public void testAllocateOpportunisticContainersOnMemoryFullyAllocatedNode()
       throws IOException {
     conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
         true);
@@ -3096,9 +3269,9 @@ public class TestFairScheduler extends FairSchedulerTestBase {
         Resources.createResource(4096, 4), overAllocationInfo);
     scheduler.handle(new NodeAddedSchedulerEvent(node));

-    // create a scheduling request that takes up the whole node
+    // create a scheduling request that takes up all memory
     ApplicationAttemptId appAttempt1 = createSchedulingRequest(
-        4096, "queue1", "user1", 4);
+        4096, 1, "queue1", "user1", 4);
     scheduler.handle(new NodeUpdateSchedulerEvent(node));
     assertEquals(4096, scheduler.getQueueManager().getQueue("queue1").
         getGuaranteedResourceUsage().getMemorySize());
@@ -3109,20 +3282,20 @@ public class TestFairScheduler extends FairSchedulerTestBase {
         ExecutionType.GUARANTEED,
         allocatedContainers1.get(0).getExecutionType());

-    // node utilization is low after the container is launched on the node
+    // memory utilization is low after the container is launched on the node
     ContainerStatus containerStatus = ContainerStatus.newInstance(
         allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "",
         ContainerExitStatus.SUCCESS);
     node.updateContainersInfoAndUtilization(
         new UpdatedContainerInfo(Collections.singletonList(containerStatus),
             Collections.emptyList()),
-        ResourceUtilization.newInstance(1800, 0, 0.5f));
+        ResourceUtilization.newInstance(1800, 0, 0.0f));

     // create another scheduling request now that there is no unallocated
-    // resources left on the node, the request should be served with an
+    // memory resources left on the node, the request should be served with an
     // allocation of an opportunistic container
     ApplicationAttemptId appAttempt2 = createSchedulingRequest(
-        1024, "queue2", "user1", 1);
+        1024, 1, "queue2", "user1", 1);
     scheduler.handle(new NodeUpdateSchedulerEvent(node));
     assertEquals(1024, scheduler.getQueueManager().getQueue("queue2").
         getOpportunisticResourceUsage().getMemorySize());
@@ -3149,6 +3322,93 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     }
   }

+  /**
+   * Test opportunistic containers can be allocated on a node where the CPU
+   * is fully allocated but whose utilization is very low.
+   */
+  @Test
+  public void testAllocateOpportunisticContainersOnCPUFullyAllocatedNode()
+      throws IOException {
+    conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+        true);
+    // disable resource request normalization in fair scheduler
+    int memoryAllocationIncrement = conf.getInt(
+        FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+        FairSchedulerConfiguration.
+            DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB);
+    conf.setInt(
+        FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 1);
+    int memoryAllocationMinimum = conf.getInt(
+        YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1);
+
+    try {
+      scheduler.init(conf);
+      scheduler.start();
+      scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+      // Add a node with 4G of memory and 4 vcores and an overallocation
+      // threshold of 0.75f and 0.75f for memory and cpu respectively
+      OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance(
+          ResourceThresholds.newInstance(0.75f, 0.75f));
+      MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1,
+          Resources.createResource(4096, 4), overAllocationInfo);
+      scheduler.handle(new NodeAddedSchedulerEvent(node));
+
+      // create a scheduling request that takes up all vcores
+      ApplicationAttemptId appAttempt1 = createSchedulingRequest(
+          1024, 4, "queue1", "user1", 1);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(4, scheduler.getQueueManager().getQueue("queue1").
+          getGuaranteedResourceUsage().getVirtualCores());
+      List<Container> allocatedContainers1 =
+          scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers();
+      assertEquals(1, allocatedContainers1.size());
+      assertEquals("unexpected container execution type",
+          ExecutionType.GUARANTEED,
+          allocatedContainers1.get(0).getExecutionType());
+
+      // cpu utilization is low after the container is launched on the node
+      ContainerStatus containerStatus = ContainerStatus.newInstance(
+          allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "",
+          ContainerExitStatus.SUCCESS);
+      node.updateContainersInfoAndUtilization(
+          new UpdatedContainerInfo(Collections.singletonList(containerStatus),
+              Collections.emptyList()),
+          ResourceUtilization.newInstance(0, 0, 1.0f));
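+      // all 4 vcores are allocated, but utilization of 1.0 vcore is far
+      // below the 0.75f threshold (3.0 vcores), leaving room to
+      // overallocate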
+
+      // create another scheduling request now that there are no unallocated
+      // cpu resources left on the node, the request should be served with an
+      // allocation of an opportunistic container
+      ApplicationAttemptId appAttempt2 = createSchedulingRequest(
+          1024, 2, "queue2", "user1", 1);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(2, scheduler.getQueueManager().getQueue("queue2").
+          getOpportunisticResourceUsage().getVirtualCores());
+      List<Container> allocatedContainers2 =
+          scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers();
+      assertEquals(1, allocatedContainers2.size());
+      assertEquals("unexpected container execution type",
+          ExecutionType.OPPORTUNISTIC,
+          allocatedContainers2.get(0).getExecutionType());
+
+      // verify that no reservation is made for the second request given
+      // that it's satisfied by an OPPORTUNISTIC container allocation.
+      assertNull("No reservation should be made because we have satisfied" +
+          " the second request with an OPPORTUNISTIC container allocation",
+          scheduler.getNode(node.getNodeID()).getReservedContainer());
+    } finally {
+      conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+          false);
+      conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+          memoryAllocationMinimum);
+      conf.setInt(
+          FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+          memoryAllocationIncrement);
+    }
+  }
+
   /**
    * Test opportunistic containers can be allocated on a node with a low
    * utilization even though there are GUARANTEED containers allocated.
@@ -3277,7 +3537,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
    * @throws Exception
    */
   @Test
-  public void testMaxOverallocationPerNode() throws Exception {
+  public void testMaxMemoryOverallocationPerNode() throws Exception {
     conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
         true);
     // disable resource request normalization in fair scheduler
@@ -3309,9 +3569,9 @@ public class TestFairScheduler extends FairSchedulerTestBase {
         Resources.createResource(1024, 1), overAllocationInfo);
     scheduler.handle(new NodeAddedSchedulerEvent(node));

-    // create a scheduling request that takes up the whole node
+    // create a scheduling request that takes up all memory on the node
     ApplicationAttemptId appAttempt1 =
-        createSchedulingRequest(1024, "queue1", "user1", 1);
+        createSchedulingRequest(1024, 1, "queue1", "user1", 1);
     scheduler.handle(new NodeUpdateSchedulerEvent(node));
     assertEquals(1024, scheduler.getQueueManager().getQueue("queue1").
         getGuaranteedResourceUsage().getMemorySize());
@@ -3332,7 +3592,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
         ResourceUtilization.newInstance(0, 0, 0.0f));

     // create a scheduling request that should get allocated an OPPORTUNISTIC
-    // container because the node utilization is zero
+    // container because the memory utilization is zero
     ApplicationAttemptId appAttempt2 =
         createSchedulingRequest(1024, "queue2", "user1", 1);
     scheduler.handle(new NodeUpdateSchedulerEvent(node));
@@ -3355,7 +3615,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
         ResourceUtilization.newInstance(0, 0, 0.0f));

     // create another scheduling request that should not get any allocation
-    // because of the max overallocation on the node will be exceeded.
+    // because the max memory overallocation on the node would be exceeded.
     ApplicationAttemptId appAttempt3 =
         createSchedulingRequest(1024, "queue3", "user1", 1);
     scheduler.handle(new NodeUpdateSchedulerEvent(node));
@@ -3379,6 +3639,112 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     }
   }

+  /**
+   * Test that max CPU overallocation per node is enforced by Fair Scheduler.
+   */
+  @Test
+  public void testMaxCPUOverallocationPerNode() throws Exception {
+    conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+        true);
+    // disable resource request normalization in fair scheduler
+    int memoryAllocationIncrement = conf.getInt(
+        FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+        FairSchedulerConfiguration.
+            DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB);
+    conf.setInt(
+        FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 1);
+    int memoryAllocationMinimum = conf.getInt(
+        YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1);
+    float maxOverallocationRatio = conf.getFloat(
+        YarnConfiguration.PER_NODE_MAX_OVERALLOCATION_RATIO,
+        YarnConfiguration.DEFAULT_PER_NODE_MAX_OVERALLOCATION_RATIO);
+    conf.setFloat(YarnConfiguration.PER_NODE_MAX_OVERALLOCATION_RATIO, 1.0f);
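+    // a ratio of 1.0f caps overallocated (OPPORTUNISTIC) resources on a
+    // node at one times its advertised capacity, i.e. 2 vcores on the
+    // node below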
+
+    try {
+      scheduler.init(conf);
+      scheduler.start();
+      scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+      // Add a node with 4G of memory and 2 vcores and an overallocation
+      // threshold of 1.0f and 1.0f for memory and cpu respectively
+      OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance(
+          ResourceThresholds.newInstance(1f, 1f));
+      MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1,
+          Resources.createResource(4096, 2), overAllocationInfo);
+      scheduler.handle(new NodeAddedSchedulerEvent(node));
+
+      // create a scheduling request that takes up all CPU on the node
+      ApplicationAttemptId appAttempt1 =
+          createSchedulingRequest(1024, 2, "queue1", "user1", 1);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(2, scheduler.getQueueManager().getQueue("queue1").
+          getGuaranteedResourceUsage().getVirtualCores());
+      List<Container> allocatedContainers1 =
+          scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers();
+      assertEquals(1, allocatedContainers1.size());
+      assertEquals("unexpected container execution type",
+          ExecutionType.GUARANTEED,
+          allocatedContainers1.get(0).getExecutionType());
+
+      // cpu utilization is zero after the container runs
+      ContainerStatus containerStatus1 = ContainerStatus.newInstance(
+          allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "",
+          ContainerExitStatus.SUCCESS);
+      node.updateContainersInfoAndUtilization(
+          new UpdatedContainerInfo(Collections.singletonList(containerStatus1),
+              Collections.emptyList()),
+          ResourceUtilization.newInstance(0, 0, 0.0f));
+
+      // create a scheduling request that should get allocated an OPPORTUNISTIC
+      // container because the cpu utilization is zero
+      ApplicationAttemptId appAttempt2 =
+          createSchedulingRequest(1024, 2, "queue2", "user1", 1);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      List<Container> allocatedContainers2 =
+          scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers();
+      assertEquals(1, allocatedContainers2.size());
+      assertEquals("unexpected container execution type",
+          ExecutionType.OPPORTUNISTIC,
+          allocatedContainers2.get(0).getExecutionType());
+      assertEquals(2, scheduler.getQueueManager().getQueue("queue2").
+          getOpportunisticResourceUsage().getVirtualCores());
+
+      // node utilization is still zero after the container runs
+      ContainerStatus containerStatus2 = ContainerStatus.newInstance(
+          allocatedContainers2.get(0).getId(), ContainerState.RUNNING, "",
+          ContainerExitStatus.SUCCESS);
+      node.updateContainersInfoAndUtilization(
+          new UpdatedContainerInfo(Collections.singletonList(containerStatus2),
+              Collections.emptyList()),
+          ResourceUtilization.newInstance(0, 0, 0.0f));
+
+      // create another scheduling request that should not get any allocation
+      // because the max CPU overallocation on the node would be exceeded.
+      ApplicationAttemptId appAttempt3 =
+          createSchedulingRequest(1024, 1, "queue3", "user1", 1);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(0, scheduler.getQueueManager().getQueue("queue3").
+          getOpportunisticResourceUsage().getVirtualCores());
+      List<Container> allocatedContainers3 =
+          scheduler.getSchedulerApp(appAttempt3).pullNewlyAllocatedContainers();
+      assertEquals(0, allocatedContainers3.size());
+      assertEquals(0, scheduler.getQueueManager().getQueue("queue3").
+          getOpportunisticResourceUsage().getVirtualCores());
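+      // queue2 already holds 2 OPPORTUNISTIC vcores, the maximum the 1.0f
+      // per-node overallocation ratio allows, so the extra vcore for
+      // queue3 is not allocated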
+    } finally {
+      conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+          false);
+      conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+          memoryAllocationMinimum);
+      conf.setInt(
+          FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+          memoryAllocationIncrement);
+      conf.setFloat(YarnConfiguration.PER_NODE_MAX_OVERALLOCATION_RATIO,
+          maxOverallocationRatio);
+    }
+  }
+
   /**
    * Test promotion of a single OPPORTUNISTIC container when no resources are
    * reserved on the node where the container is allocated.
|