|
@@ -22,6 +22,8 @@ import static org.junit.Assert.assertEquals;
|
|
|
import static org.junit.Assert.assertFalse;
|
|
|
import static org.junit.Assert.assertTrue;
|
|
|
import static org.mockito.Matchers.any;
|
|
|
+import static org.mockito.Mockito.doNothing;
|
|
|
+import static org.mockito.Mockito.doReturn;
|
|
|
import static org.mockito.Mockito.mock;
|
|
|
import static org.mockito.Mockito.spy;
|
|
|
import static org.mockito.Mockito.when;
|
|
@@ -30,6 +32,7 @@ import java.util.Collections;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.Map;
|
|
|
|
|
|
+import com.google.common.collect.ImmutableMap;
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
@@ -59,6 +62,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicat
|
|
|
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
|
@@ -190,6 +194,15 @@ public class TestReservations {
|
|
|
}
|
|
|
|
|
|
static LeafQueue stubLeafQueue(LeafQueue queue) {
|
|
|
+ ParentQueue parent = (ParentQueue) queue.getParent();
|
|
|
+
|
|
|
+ if (parent != null) {
|
|
|
+ // Stub out parent queue's accept and apply.
|
|
|
+ doReturn(true).when(parent).accept(any(Resource.class),
|
|
|
+ any(ResourceCommitRequest.class));
|
|
|
+ doNothing().when(parent).apply(any(Resource.class),
|
|
|
+ any(ResourceCommitRequest.class));
|
|
|
+ }
|
|
|
return queue;
|
|
|
}
|
|
|
|
|
@@ -239,6 +252,12 @@ public class TestReservations {
|
|
|
FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, DEFAULT_RACK, 0,
|
|
|
8 * GB);
|
|
|
|
|
|
+ Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
|
|
|
+ app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
|
|
|
+ app_1);
|
|
|
+ Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(),
|
|
|
+ node_0, node_1.getNodeID(), node_1, node_2.getNodeID(), node_2);
|
|
|
+
|
|
|
when(csContext.getNode(node_0.getNodeID())).thenReturn(node_0);
|
|
|
when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1);
|
|
|
when(csContext.getNode(node_2.getNodeID())).thenReturn(node_2);
|
|
@@ -268,8 +287,10 @@ public class TestReservations {
|
|
|
|
|
|
// Start testing...
|
|
|
// Only AM
|
|
|
- a.assignContainers(clusterResource, node_0,
|
|
|
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
|
|
+ TestUtils.applyResourceCommitRequest(clusterResource,
|
|
|
+ a.assignContainers(clusterResource, node_0,
|
|
|
+ new ResourceLimits(clusterResource),
|
|
|
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
|
|
|
assertEquals(2 * GB, a.getUsedResources().getMemorySize());
|
|
|
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemorySize());
|
|
|
assertEquals(0 * GB, a.getMetrics().getReservedMB());
|
|
@@ -280,8 +301,10 @@ public class TestReservations {
|
|
|
assertEquals(0 * GB, node_2.getAllocatedResource().getMemorySize());
|
|
|
|
|
|
// Only 1 map - simulating reduce
|
|
|
- a.assignContainers(clusterResource, node_0,
|
|
|
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
|
|
+ TestUtils.applyResourceCommitRequest(clusterResource,
|
|
|
+ a.assignContainers(clusterResource, node_0,
|
|
|
+ new ResourceLimits(clusterResource),
|
|
|
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
|
|
|
assertEquals(5 * GB, a.getUsedResources().getMemorySize());
|
|
|
assertEquals(5 * GB, app_0.getCurrentConsumption().getMemorySize());
|
|
|
assertEquals(0 * GB, a.getMetrics().getReservedMB());
|
|
@@ -292,8 +315,10 @@ public class TestReservations {
|
|
|
assertEquals(0 * GB, node_2.getAllocatedResource().getMemorySize());
|
|
|
|
|
|
// Only 1 map to other node - simulating reduce
|
|
|
- a.assignContainers(clusterResource, node_1,
|
|
|
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
|
|
+ TestUtils.applyResourceCommitRequest(clusterResource,
|
|
|
+ a.assignContainers(clusterResource, node_1,
|
|
|
+ new ResourceLimits(clusterResource),
|
|
|
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
|
|
|
assertEquals(8 * GB, a.getUsedResources().getMemorySize());
|
|
|
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize());
|
|
|
assertEquals(0 * GB, a.getMetrics().getReservedMB());
|
|
@@ -308,8 +333,10 @@ public class TestReservations {
|
|
|
toSchedulerKey(priorityReduce)));
|
|
|
|
|
|
// try to assign reducer (5G on node 0 and should reserve)
|
|
|
- a.assignContainers(clusterResource, node_0,
|
|
|
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
|
|
+ TestUtils.applyResourceCommitRequest(clusterResource,
|
|
|
+ a.assignContainers(clusterResource, node_0,
|
|
|
+ new ResourceLimits(clusterResource),
|
|
|
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
|
|
|
assertEquals(13 * GB, a.getUsedResources().getMemorySize());
|
|
|
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize());
|
|
|
assertEquals(5 * GB, a.getMetrics().getReservedMB());
|
|
@@ -325,8 +352,10 @@ public class TestReservations {
|
|
|
toSchedulerKey(priorityReduce)));
|
|
|
|
|
|
// assign reducer to node 2
|
|
|
- a.assignContainers(clusterResource, node_2,
|
|
|
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
|
|
+ TestUtils.applyResourceCommitRequest(clusterResource,
|
|
|
+ a.assignContainers(clusterResource, node_2,
|
|
|
+ new ResourceLimits(clusterResource),
|
|
|
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
|
|
|
assertEquals(18 * GB, a.getUsedResources().getMemorySize());
|
|
|
assertEquals(13 * GB, app_0.getCurrentConsumption().getMemorySize());
|
|
|
assertEquals(5 * GB, a.getMetrics().getReservedMB());
|
|
@@ -343,8 +372,10 @@ public class TestReservations {
|
|
|
|
|
|
// node_1 heartbeat and unreserves from node_0 in order to allocate
|
|
|
// on node_1
|
|
|
- a.assignContainers(clusterResource, node_1,
|
|
|
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
|
|
+ TestUtils.applyResourceCommitRequest(clusterResource,
|
|
|
+ a.assignContainers(clusterResource, node_1,
|
|
|
+ new ResourceLimits(clusterResource),
|
|
|
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
|
|
|
assertEquals(18 * GB, a.getUsedResources().getMemorySize());
|
|
|
assertEquals(18 * GB, app_0.getCurrentConsumption().getMemorySize());
|
|
|
assertEquals(0 * GB, a.getMetrics().getReservedMB());
|
|
@@ -412,6 +443,12 @@ public class TestReservations {
|
|
|
when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1);
|
|
|
when(csContext.getNode(node_2.getNodeID())).thenReturn(node_2);
|
|
|
|
|
|
+ Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
|
|
|
+ app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
|
|
|
+ app_1);
|
|
|
+ Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(),
|
|
|
+ node_0, node_1.getNodeID(), node_1, node_2.getNodeID(), node_2);
|
|
|
+
|
|
|
cs.getNodeTracker().addNode(node_0);
|
|
|
cs.getNodeTracker().addNode(node_1);
|
|
|
cs.getNodeTracker().addNode(node_2);
|
|
@@ -434,8 +471,10 @@ public class TestReservations {
|
|
|
|
|
|
// Start testing...
|
|
|
// Only AM
|
|
|
- a.assignContainers(clusterResource, node_0,
|
|
|
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
|
|
+ TestUtils.applyResourceCommitRequest(clusterResource,
|
|
|
+ a.assignContainers(clusterResource, node_0,
|
|
|
+ new ResourceLimits(clusterResource),
|
|
|
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
|
|
|
assertEquals(2 * GB, a.getUsedResources().getMemorySize());
|
|
|
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemorySize());
|
|
|
assertEquals(0 * GB, app_1.getCurrentConsumption().getMemorySize());
|
|
@@ -446,8 +485,10 @@ public class TestReservations {
|
|
|
assertEquals(0 * GB, node_1.getAllocatedResource().getMemorySize());
|
|
|
assertEquals(0 * GB, node_2.getAllocatedResource().getMemorySize());
|
|
|
|
|
|
- a.assignContainers(clusterResource, node_1,
|
|
|
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
|
|
+ TestUtils.applyResourceCommitRequest(clusterResource,
|
|
|
+ a.assignContainers(clusterResource, node_1,
|
|
|
+ new ResourceLimits(clusterResource),
|
|
|
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
|
|
|
assertEquals(4 * GB, a.getUsedResources().getMemorySize());
|
|
|
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemorySize());
|
|
|
assertEquals(2 * GB, app_1.getCurrentConsumption().getMemorySize());
|
|
@@ -467,8 +508,10 @@ public class TestReservations {
|
|
|
priorityMap, recordFactory)));
|
|
|
|
|
|
// add a reservation for app_0
|
|
|
- a.assignContainers(clusterResource, node_0,
|
|
|
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
|
|
+ TestUtils.applyResourceCommitRequest(clusterResource,
|
|
|
+ a.assignContainers(clusterResource, node_0,
|
|
|
+ new ResourceLimits(clusterResource),
|
|
|
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
|
|
|
assertEquals(12 * GB, a.getUsedResources().getMemorySize());
|
|
|
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemorySize());
|
|
|
assertEquals(2 * GB, app_1.getCurrentConsumption().getMemorySize());
|
|
@@ -481,8 +524,10 @@ public class TestReservations {
|
|
|
|
|
|
// next assignment is beyond user limit for user_0 but it should assign to
|
|
|
// app_1 for user_1
|
|
|
- a.assignContainers(clusterResource, node_1,
|
|
|
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
|
|
+ TestUtils.applyResourceCommitRequest(clusterResource,
|
|
|
+ a.assignContainers(clusterResource, node_1,
|
|
|
+ new ResourceLimits(clusterResource),
|
|
|
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
|
|
|
assertEquals(14 * GB, a.getUsedResources().getMemorySize());
|
|
|
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemorySize());
|
|
|
assertEquals(4 * GB, app_1.getCurrentConsumption().getMemorySize());
|
|
@@ -544,6 +589,12 @@ public class TestReservations {
|
|
|
FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, DEFAULT_RACK, 0,
|
|
|
8 * GB);
|
|
|
|
|
|
+ Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
|
|
|
+ app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
|
|
|
+ app_1);
|
|
|
+ Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(),
|
|
|
+ node_0, node_1.getNodeID(), node_1, node_2.getNodeID(), node_2);
|
|
|
+
|
|
|
when(csContext.getNode(node_0.getNodeID())).thenReturn(node_0);
|
|
|
when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1);
|
|
|
when(csContext.getNode(node_2.getNodeID())).thenReturn(node_2);
|
|
@@ -569,8 +620,10 @@ public class TestReservations {
|
|
|
|
|
|
// Start testing...
|
|
|
// Only AM
|
|
|
- a.assignContainers(clusterResource, node_0,
|
|
|
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
|
|
+ TestUtils.applyResourceCommitRequest(clusterResource,
|
|
|
+ a.assignContainers(clusterResource, node_0,
|
|
|
+ new ResourceLimits(clusterResource),
|
|
|
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
|
|
|
assertEquals(2 * GB, a.getUsedResources().getMemorySize());
|
|
|
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemorySize());
|
|
|
assertEquals(0 * GB, a.getMetrics().getReservedMB());
|
|
@@ -581,8 +634,10 @@ public class TestReservations {
|
|
|
assertEquals(0 * GB, node_2.getAllocatedResource().getMemorySize());
|
|
|
|
|
|
// Only 1 map - simulating reduce
|
|
|
- a.assignContainers(clusterResource, node_0,
|
|
|
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
|
|
+ TestUtils.applyResourceCommitRequest(clusterResource,
|
|
|
+ a.assignContainers(clusterResource, node_0,
|
|
|
+ new ResourceLimits(clusterResource),
|
|
|
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
|
|
|
assertEquals(5 * GB, a.getUsedResources().getMemorySize());
|
|
|
assertEquals(5 * GB, app_0.getCurrentConsumption().getMemorySize());
|
|
|
assertEquals(0 * GB, a.getMetrics().getReservedMB());
|
|
@@ -593,8 +648,10 @@ public class TestReservations {
|
|
|
assertEquals(0 * GB, node_2.getAllocatedResource().getMemorySize());
|
|
|
|
|
|
// Only 1 map to other node - simulating reduce
|
|
|
- a.assignContainers(clusterResource, node_1,
|
|
|
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
|
|
+ TestUtils.applyResourceCommitRequest(clusterResource,
|
|
|
+ a.assignContainers(clusterResource, node_1,
|
|
|
+ new ResourceLimits(clusterResource),
|
|
|
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
|
|
|
assertEquals(8 * GB, a.getUsedResources().getMemorySize());
|
|
|
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize());
|
|
|
assertEquals(0 * GB, a.getMetrics().getReservedMB());
|
|
@@ -609,8 +666,10 @@ public class TestReservations {
|
|
|
toSchedulerKey(priorityReduce)));
|
|
|
|
|
|
// try to assign reducer (5G on node 0 and should reserve)
|
|
|
- a.assignContainers(clusterResource, node_0,
|
|
|
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
|
|
+ TestUtils.applyResourceCommitRequest(clusterResource,
|
|
|
+ a.assignContainers(clusterResource, node_0,
|
|
|
+ new ResourceLimits(clusterResource),
|
|
|
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
|
|
|
assertEquals(13 * GB, a.getUsedResources().getMemorySize());
|
|
|
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize());
|
|
|
assertEquals(5 * GB, a.getMetrics().getReservedMB());
|
|
@@ -626,8 +685,10 @@ public class TestReservations {
|
|
|
toSchedulerKey(priorityReduce)));
|
|
|
|
|
|
// assign reducer to node 2
|
|
|
- a.assignContainers(clusterResource, node_2,
|
|
|
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
|
|
+ TestUtils.applyResourceCommitRequest(clusterResource,
|
|
|
+ a.assignContainers(clusterResource, node_2,
|
|
|
+ new ResourceLimits(clusterResource),
|
|
|
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
|
|
|
assertEquals(18 * GB, a.getUsedResources().getMemorySize());
|
|
|
assertEquals(13 * GB, app_0.getCurrentConsumption().getMemorySize());
|
|
|
assertEquals(5 * GB, a.getMetrics().getReservedMB());
|
|
@@ -644,8 +705,10 @@ public class TestReservations {
|
|
|
|
|
|
// node_1 heartbeat and won't unreserve from node_0, potentially stuck
|
|
|
// if AM doesn't handle
|
|
|
- a.assignContainers(clusterResource, node_1,
|
|
|
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
|
|
+ TestUtils.applyResourceCommitRequest(clusterResource,
|
|
|
+ a.assignContainers(clusterResource, node_1,
|
|
|
+ new ResourceLimits(clusterResource),
|
|
|
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
|
|
|
assertEquals(18 * GB, a.getUsedResources().getMemorySize());
|
|
|
assertEquals(13 * GB, app_0.getCurrentConsumption().getMemorySize());
|
|
|
assertEquals(5 * GB, a.getMetrics().getReservedMB());
|
|
@@ -706,6 +769,12 @@ public class TestReservations {
|
|
|
FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0,
|
|
|
8 * GB);
|
|
|
|
|
|
+ Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
|
|
|
+ app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
|
|
|
+ app_1);
|
|
|
+ Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(),
|
|
|
+ node_0, node_1.getNodeID(), node_1);
|
|
|
+
|
|
|
cs.getNodeTracker().addNode(node_0);
|
|
|
cs.getNodeTracker().addNode(node_1);
|
|
|
|
|
@@ -733,8 +802,10 @@ public class TestReservations {
|
|
|
|
|
|
// Start testing...
|
|
|
// Only AM
|
|
|
- a.assignContainers(clusterResource, node_0,
|
|
|
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
|
|
+ TestUtils.applyResourceCommitRequest(clusterResource,
|
|
|
+ a.assignContainers(clusterResource, node_0,
|
|
|
+ new ResourceLimits(clusterResource),
|
|
|
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
|
|
|
assertEquals(2 * GB, a.getUsedResources().getMemorySize());
|
|
|
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemorySize());
|
|
|
assertEquals(0 * GB, a.getMetrics().getReservedMB());
|
|
@@ -744,8 +815,10 @@ public class TestReservations {
|
|
|
assertEquals(0 * GB, node_1.getAllocatedResource().getMemorySize());
|
|
|
|
|
|
// Only 1 map - simulating reduce
|
|
|
- a.assignContainers(clusterResource, node_0,
|
|
|
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
|
|
+ TestUtils.applyResourceCommitRequest(clusterResource,
|
|
|
+ a.assignContainers(clusterResource, node_0,
|
|
|
+ new ResourceLimits(clusterResource),
|
|
|
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
|
|
|
assertEquals(5 * GB, a.getUsedResources().getMemorySize());
|
|
|
assertEquals(5 * GB, app_0.getCurrentConsumption().getMemorySize());
|
|
|
assertEquals(0 * GB, a.getMetrics().getReservedMB());
|
|
@@ -755,8 +828,10 @@ public class TestReservations {
|
|
|
assertEquals(0 * GB, node_1.getAllocatedResource().getMemorySize());
|
|
|
|
|
|
// Only 1 map to other node - simulating reduce
|
|
|
- a.assignContainers(clusterResource, node_1,
|
|
|
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
|
|
+ TestUtils.applyResourceCommitRequest(clusterResource,
|
|
|
+ a.assignContainers(clusterResource, node_1,
|
|
|
+ new ResourceLimits(clusterResource),
|
|
|
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
|
|
|
assertEquals(8 * GB, a.getUsedResources().getMemorySize());
|
|
|
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize());
|
|
|
assertEquals(0 * GB, a.getMetrics().getReservedMB());
|
|
@@ -770,8 +845,10 @@ public class TestReservations {
|
|
|
toSchedulerKey(priorityReduce)));
|
|
|
|
|
|
// try to assign reducer (5G on node 0 and should reserve)
|
|
|
- a.assignContainers(clusterResource, node_0,
|
|
|
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
|
|
+ TestUtils.applyResourceCommitRequest(clusterResource,
|
|
|
+ a.assignContainers(clusterResource, node_0,
|
|
|
+ new ResourceLimits(clusterResource),
|
|
|
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
|
|
|
assertEquals(13 * GB, a.getUsedResources().getMemorySize());
|
|
|
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize());
|
|
|
assertEquals(5 * GB, a.getMetrics().getReservedMB());
|
|
@@ -786,8 +863,10 @@ public class TestReservations {
|
|
|
toSchedulerKey(priorityReduce)));
|
|
|
|
|
|
// could allocate but told need to unreserve first
|
|
|
- a.assignContainers(clusterResource, node_1,
|
|
|
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
|
|
+ TestUtils.applyResourceCommitRequest(clusterResource,
|
|
|
+ a.assignContainers(clusterResource, node_1,
|
|
|
+ new ResourceLimits(clusterResource),
|
|
|
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
|
|
|
assertEquals(13 * GB, a.getUsedResources().getMemorySize());
|
|
|
assertEquals(13 * GB, app_0.getCurrentConsumption().getMemorySize());
|
|
|
assertEquals(0 * GB, a.getMetrics().getReservedMB());
|
|
@@ -983,6 +1062,12 @@ public class TestReservations {
|
|
|
when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1);
|
|
|
when(csContext.getNode(node_2.getNodeID())).thenReturn(node_2);
|
|
|
|
|
|
+ Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
|
|
|
+ app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
|
|
|
+ app_1);
|
|
|
+ Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(),
|
|
|
+ node_0, node_1.getNodeID(), node_1, node_2.getNodeID(), node_2);
|
|
|
+
|
|
|
final int numNodes = 2;
|
|
|
Resource clusterResource = Resources.createResource(numNodes * (8 * GB));
|
|
|
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
|
|
@@ -1004,8 +1089,10 @@ public class TestReservations {
|
|
|
|
|
|
// Start testing...
|
|
|
// Only AM
|
|
|
- a.assignContainers(clusterResource, node_0,
|
|
|
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
|
|
+ TestUtils.applyResourceCommitRequest(clusterResource,
|
|
|
+ a.assignContainers(clusterResource, node_0,
|
|
|
+ new ResourceLimits(clusterResource),
|
|
|
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
|
|
|
assertEquals(2 * GB, a.getUsedResources().getMemorySize());
|
|
|
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemorySize());
|
|
|
assertEquals(0 * GB, a.getMetrics().getReservedMB());
|
|
@@ -1015,8 +1102,10 @@ public class TestReservations {
|
|
|
assertEquals(0 * GB, node_1.getAllocatedResource().getMemorySize());
|
|
|
|
|
|
// Only 1 map - simulating reduce
|
|
|
- a.assignContainers(clusterResource, node_0,
|
|
|
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
|
|
+ TestUtils.applyResourceCommitRequest(clusterResource,
|
|
|
+ a.assignContainers(clusterResource, node_0,
|
|
|
+ new ResourceLimits(clusterResource),
|
|
|
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
|
|
|
assertEquals(5 * GB, a.getUsedResources().getMemorySize());
|
|
|
assertEquals(5 * GB, app_0.getCurrentConsumption().getMemorySize());
|
|
|
assertEquals(0 * GB, a.getMetrics().getReservedMB());
|
|
@@ -1026,8 +1115,10 @@ public class TestReservations {
|
|
|
assertEquals(0 * GB, node_1.getAllocatedResource().getMemorySize());
|
|
|
|
|
|
// Only 1 map to other node - simulating reduce
|
|
|
- a.assignContainers(clusterResource, node_1,
|
|
|
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
|
|
+ TestUtils.applyResourceCommitRequest(clusterResource,
|
|
|
+ a.assignContainers(clusterResource, node_1,
|
|
|
+ new ResourceLimits(clusterResource),
|
|
|
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
|
|
|
assertEquals(8 * GB, a.getUsedResources().getMemorySize());
|
|
|
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize());
|
|
|
assertEquals(0 * GB, a.getMetrics().getReservedMB());
|
|
@@ -1040,8 +1131,10 @@ public class TestReservations {
|
|
|
// now add in reservations and make sure it continues if config set
|
|
|
// allocate to queue so that the potential new capacity is greater then
|
|
|
// absoluteMaxCapacity
|
|
|
- a.assignContainers(clusterResource, node_0,
|
|
|
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
|
|
+ TestUtils.applyResourceCommitRequest(clusterResource,
|
|
|
+ a.assignContainers(clusterResource, node_0,
|
|
|
+ new ResourceLimits(clusterResource),
|
|
|
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
|
|
|
assertEquals(13 * GB, a.getUsedResources().getMemorySize());
|
|
|
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize());
|
|
|
assertEquals(5 * GB, a.getMetrics().getReservedMB());
|
|
@@ -1153,6 +1246,12 @@ public class TestReservations {
|
|
|
FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, DEFAULT_RACK, 0,
|
|
|
8 * GB);
|
|
|
|
|
|
+ Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
|
|
|
+ app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
|
|
|
+ app_1);
|
|
|
+ Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(),
|
|
|
+ node_0, node_1.getNodeID(), node_1, node_2.getNodeID(), node_2);
|
|
|
+
|
|
|
when(csContext.getNode(node_0.getNodeID())).thenReturn(node_0);
|
|
|
when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1);
|
|
|
when(csContext.getNode(node_2.getNodeID())).thenReturn(node_2);
|
|
@@ -1178,8 +1277,10 @@ public class TestReservations {
|
|
|
|
|
|
// Start testing...
|
|
|
// Only AM
|
|
|
- a.assignContainers(clusterResource, node_0,
|
|
|
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
|
|
+ TestUtils.applyResourceCommitRequest(clusterResource,
|
|
|
+ a.assignContainers(clusterResource, node_0,
|
|
|
+ new ResourceLimits(clusterResource),
|
|
|
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
|
|
|
assertEquals(2 * GB, a.getUsedResources().getMemorySize());
|
|
|
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemorySize());
|
|
|
assertEquals(0 * GB, a.getMetrics().getReservedMB());
|
|
@@ -1189,8 +1290,10 @@ public class TestReservations {
|
|
|
assertEquals(0 * GB, node_1.getAllocatedResource().getMemorySize());
|
|
|
|
|
|
// Only 1 map - simulating reduce
|
|
|
- a.assignContainers(clusterResource, node_0,
|
|
|
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
|
|
+ TestUtils.applyResourceCommitRequest(clusterResource,
|
|
|
+ a.assignContainers(clusterResource, node_0,
|
|
|
+ new ResourceLimits(clusterResource),
|
|
|
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
|
|
|
assertEquals(5 * GB, a.getUsedResources().getMemorySize());
|
|
|
assertEquals(5 * GB, app_0.getCurrentConsumption().getMemorySize());
|
|
|
assertEquals(0 * GB, a.getMetrics().getReservedMB());
|
|
@@ -1200,8 +1303,10 @@ public class TestReservations {
|
|
|
assertEquals(0 * GB, node_1.getAllocatedResource().getMemorySize());
|
|
|
|
|
|
// Only 1 map to other node - simulating reduce
|
|
|
- a.assignContainers(clusterResource, node_1,
|
|
|
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
|
|
+ TestUtils.applyResourceCommitRequest(clusterResource,
|
|
|
+ a.assignContainers(clusterResource, node_1,
|
|
|
+ new ResourceLimits(clusterResource),
|
|
|
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
|
|
|
assertEquals(8 * GB, a.getUsedResources().getMemorySize());
|
|
|
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize());
|
|
|
assertEquals(0 * GB, a.getMetrics().getReservedMB());
|
|
@@ -1214,8 +1319,10 @@ public class TestReservations {
|
|
|
// now add in reservations and make sure it continues if config set
|
|
|
// allocate to queue so that the potential new capacity is greater then
|
|
|
// absoluteMaxCapacity
|
|
|
- a.assignContainers(clusterResource, node_0,
|
|
|
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
|
|
+ TestUtils.applyResourceCommitRequest(clusterResource,
|
|
|
+ a.assignContainers(clusterResource, node_0,
|
|
|
+ new ResourceLimits(clusterResource),
|
|
|
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
|
|
|
assertEquals(13 * GB, a.getUsedResources().getMemorySize());
|
|
|
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize());
|
|
|
assertEquals(5 * GB, app_0.getCurrentReservation().getMemorySize());
|
|
@@ -1301,6 +1408,12 @@ public class TestReservations {
|
|
|
FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, DEFAULT_RACK, 0,
|
|
|
8 * GB);
|
|
|
|
|
|
+ Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
|
|
|
+ app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
|
|
|
+ app_1);
|
|
|
+ Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(),
|
|
|
+ node_0, node_1.getNodeID(), node_1, node_2.getNodeID(), node_2);
|
|
|
+
|
|
|
when(csContext.getNode(node_0.getNodeID())).thenReturn(node_0);
|
|
|
when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1);
|
|
|
when(csContext.getNode(node_2.getNodeID())).thenReturn(node_2);
|
|
@@ -1330,8 +1443,10 @@ public class TestReservations {
|
|
|
|
|
|
// Start testing...
|
|
|
// Only AM
|
|
|
- a.assignContainers(clusterResource, node_0,
|
|
|
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
|
|
+ TestUtils.applyResourceCommitRequest(clusterResource,
|
|
|
+ a.assignContainers(clusterResource, node_0,
|
|
|
+ new ResourceLimits(clusterResource),
|
|
|
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
|
|
|
assertEquals(2 * GB, a.getUsedResources().getMemorySize());
|
|
|
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemorySize());
|
|
|
assertEquals(0 * GB, a.getMetrics().getReservedMB());
|
|
@@ -1342,8 +1457,10 @@ public class TestReservations {
|
|
|
assertEquals(0 * GB, node_2.getAllocatedResource().getMemorySize());
|
|
|
|
|
|
// Only 1 map - simulating reduce
|
|
|
- a.assignContainers(clusterResource, node_0,
|
|
|
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
|
|
+ TestUtils.applyResourceCommitRequest(clusterResource,
|
|
|
+ a.assignContainers(clusterResource, node_0,
|
|
|
+ new ResourceLimits(clusterResource),
|
|
|
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
|
|
|
assertEquals(5 * GB, a.getUsedResources().getMemorySize());
|
|
|
assertEquals(5 * GB, app_0.getCurrentConsumption().getMemorySize());
|
|
|
assertEquals(0 * GB, a.getMetrics().getReservedMB());
|
|
@@ -1354,8 +1471,10 @@ public class TestReservations {
|
|
|
assertEquals(0 * GB, node_2.getAllocatedResource().getMemorySize());
|
|
|
|
|
|
// Only 1 map to other node - simulating reduce
|
|
|
- a.assignContainers(clusterResource, node_1,
|
|
|
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
|
|
+ TestUtils.applyResourceCommitRequest(clusterResource,
|
|
|
+ a.assignContainers(clusterResource, node_1,
|
|
|
+ new ResourceLimits(clusterResource),
|
|
|
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
|
|
|
assertEquals(8 * GB, a.getUsedResources().getMemorySize());
|
|
|
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize());
|
|
|
assertEquals(0 * GB, a.getMetrics().getReservedMB());
|
|
@@ -1370,8 +1489,10 @@ public class TestReservations {
|
|
|
// used (8G) + required (5G). It will not reserved since it has to unreserve
|
|
|
// some resource. Even with continous reservation looking, we don't allow
|
|
|
// unreserve resource to reserve container.
|
|
|
- a.assignContainers(clusterResource, node_0,
|
|
|
- new ResourceLimits(Resources.createResource(10 * GB)), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
|
|
+ TestUtils.applyResourceCommitRequest(clusterResource,
|
|
|
+ a.assignContainers(clusterResource, node_0,
|
|
|
+ new ResourceLimits(Resources.createResource(10 * GB)),
|
|
|
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
|
|
|
assertEquals(8 * GB, a.getUsedResources().getMemorySize());
|
|
|
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize());
|
|
|
assertEquals(0 * GB, a.getMetrics().getReservedMB());
|
|
@@ -1386,8 +1507,10 @@ public class TestReservations {
|
|
|
// try to assign reducer (5G on node 0), but tell it's resource limits <
|
|
|
// used (8G) + required (5G). It will not reserved since it has to unreserve
|
|
|
// some resource. Unfortunately, there's nothing to unreserve.
|
|
|
- a.assignContainers(clusterResource, node_2,
|
|
|
- new ResourceLimits(Resources.createResource(10 * GB)), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
|
|
+ TestUtils.applyResourceCommitRequest(clusterResource,
|
|
|
+ a.assignContainers(clusterResource, node_2,
|
|
|
+ new ResourceLimits(Resources.createResource(10 * GB)),
|
|
|
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
|
|
|
assertEquals(8 * GB, a.getUsedResources().getMemorySize());
|
|
|
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize());
|
|
|
assertEquals(0 * GB, a.getMetrics().getReservedMB());
|
|
@@ -1400,8 +1523,10 @@ public class TestReservations {
|
|
|
assertEquals(0 * GB, node_2.getAllocatedResource().getMemorySize());
|
|
|
|
|
|
// let it assign 5G to node_2
|
|
|
- a.assignContainers(clusterResource, node_2,
|
|
|
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
|
|
+ TestUtils.applyResourceCommitRequest(clusterResource,
|
|
|
+ a.assignContainers(clusterResource, node_2,
|
|
|
+ new ResourceLimits(clusterResource),
|
|
|
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
|
|
|
assertEquals(13 * GB, a.getUsedResources().getMemorySize());
|
|
|
assertEquals(13 * GB, app_0.getCurrentConsumption().getMemorySize());
|
|
|
assertEquals(0 * GB, a.getMetrics().getReservedMB());
|
|
@@ -1413,8 +1538,10 @@ public class TestReservations {
|
|
|
assertEquals(5 * GB, node_2.getAllocatedResource().getMemorySize());
|
|
|
|
|
|
// reserve 8G node_0
|
|
|
- a.assignContainers(clusterResource, node_0,
|
|
|
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
|
|
+ TestUtils.applyResourceCommitRequest(clusterResource,
|
|
|
+ a.assignContainers(clusterResource, node_0,
|
|
|
+ new ResourceLimits(clusterResource),
|
|
|
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
|
|
|
assertEquals(21 * GB, a.getUsedResources().getMemorySize());
|
|
|
assertEquals(13 * GB, app_0.getCurrentConsumption().getMemorySize());
|
|
|
assertEquals(8 * GB, a.getMetrics().getReservedMB());
|
|
@@ -1428,8 +1555,10 @@ public class TestReservations {
|
|
|
// try to assign (8G on node 2). No room to allocate,
|
|
|
// continued to try due to having reservation above,
|
|
|
// but hits queue limits so can't reserve anymore.
|
|
|
- a.assignContainers(clusterResource, node_2,
|
|
|
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
|
|
+ TestUtils.applyResourceCommitRequest(clusterResource,
|
|
|
+ a.assignContainers(clusterResource, node_2,
|
|
|
+ new ResourceLimits(clusterResource),
|
|
|
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
|
|
|
assertEquals(21 * GB, a.getUsedResources().getMemorySize());
|
|
|
assertEquals(13 * GB, app_0.getCurrentConsumption().getMemorySize());
|
|
|
assertEquals(8 * GB, a.getMetrics().getReservedMB());
|