|
@@ -106,7 +106,7 @@ import org.mockito.Mockito;
|
|
|
|
|
|
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils.toSchedulerKey;
|
|
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils.toSchedulerKey;
|
|
|
|
|
|
-public class TestLeafQueue {
|
|
|
|
|
|
+public class TestLeafQueue {
|
|
private final RecordFactory recordFactory =
|
|
private final RecordFactory recordFactory =
|
|
RecordFactoryProvider.getRecordFactory(null);
|
|
RecordFactoryProvider.getRecordFactory(null);
|
|
private static final Log LOG = LogFactory.getLog(TestLeafQueue.class);
|
|
private static final Log LOG = LogFactory.getLog(TestLeafQueue.class);
|
|
@@ -2098,6 +2098,154 @@ public class TestLeafQueue {
|
|
assertEquals(numNodes+1, app_0.getSchedulingOpportunities(schedulerKey));
|
|
assertEquals(numNodes+1, app_0.getSchedulingOpportunities(schedulerKey));
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ @Test
|
|
|
|
+ public void testRackLocalityDelayScheduling() throws Exception {
|
|
|
|
+ // Checks when rack-local and off-switch containers are handed out relative to the configured node-locality delay, the additional rack-locality delay, and the cluster node count.
|
|
|
|
+ // Set node-locality delay to 2 and additional rack-locality delay to 1, then re-parse and reinitialize the queues with that configuration.
|
|
|
|
+ csConf.setInt(CapacitySchedulerConfiguration.NODE_LOCALITY_DELAY, 2);
|
|
|
|
+ csConf.setInt(
|
|
|
|
+ CapacitySchedulerConfiguration.RACK_LOCALITY_ADDITIONAL_DELAY, 1);
|
|
|
|
+ Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>();
|
|
|
|
+ CSQueue newRoot = CapacitySchedulerQueueManager.parseQueue(csContext,
|
|
|
|
+ csConf, null, CapacitySchedulerConfiguration.ROOT, newQueues, queues,
|
|
|
|
+ TestUtils.spyHook);
|
|
|
|
+ queues = newQueues;
|
|
|
|
+ root.reinitialize(newRoot, cs.getClusterResource());
|
|
|
|
+
|
|
|
|
+ // Manipulate queue 'b' (note: the local variable holding it is named 'a').
|
|
|
|
+ LeafQueue a = stubLeafQueue((LeafQueue) queues.get(B));
|
|
|
|
+
|
|
|
|
+ // Check that the queue reports the locality parameters configured above.
|
|
|
|
+ assertEquals(2, a.getNodeLocalityDelay());
|
|
|
|
+ assertEquals(1, a.getRackLocalityAdditionalDelay());
|
|
|
|
+
|
|
|
|
+ // User
|
|
|
|
+ String user1 = "user_1";
|
|
|
|
+
|
|
|
|
+ // Submit a single application attempt for user_1 to the queue.
|
|
|
|
+ final ApplicationAttemptId appAttemptId1 =
|
|
|
|
+ TestUtils.getMockApplicationAttemptId(0, 0);
|
|
|
|
+ FiCaSchedulerApp app1 = new FiCaSchedulerApp(appAttemptId1, user1, a,
|
|
|
|
+ mock(ActiveUsersManager.class), spyRMContext);
|
|
|
|
+ a.submitApplicationAttempt(app1, user1);
|
|
|
|
+
|
|
|
|
+ // Set up hosts and racks; only node2 (host3 on rack_2) and node3 (host4 on rack_3) are created as mock nodes.
|
|
|
|
+ String host1 = "127.0.0.1";
|
|
|
|
+ String host2 = "127.0.0.2";
|
|
|
|
+ String host3 = "127.0.0.3";
|
|
|
|
+ String host4 = "127.0.0.4";
|
|
|
|
+ String rack1 = "rack_1";
|
|
|
|
+ String rack2 = "rack_2";
|
|
|
|
+ String rack3 = "rack_3";
|
|
|
|
+ FiCaSchedulerNode node2 = TestUtils.getMockNode(host3, rack2, 0, 8 * GB);
|
|
|
|
+ FiCaSchedulerNode node3 = TestUtils.getMockNode(host4, rack3, 0, 8 * GB);
|
|
|
|
+
|
|
|
|
+ Map<ApplicationAttemptId, FiCaSchedulerApp> apps =
|
|
|
|
+ ImmutableMap.of(app1.getApplicationAttemptId(), app1);
|
|
|
|
+ Map<NodeId, FiCaSchedulerNode> nodes =
|
|
|
|
+ ImmutableMap.of(node2.getNodeID(), node2, node3.getNodeID(), node3);
|
|
|
|
+ // The scheduler is stubbed to report 5 cluster nodes even though only two mock nodes exist; the capping check at the end relies on this value.
|
|
|
|
+ final int numNodes = 5;
|
|
|
|
+ Resource clusterResource =
|
|
|
|
+ Resources.createResource(numNodes * (8 * GB), numNodes * 16);
|
|
|
|
+ when(spyRMContext.getScheduler().getNumClusterNodes()).thenReturn(numNodes);
|
|
|
|
+
|
|
|
|
+ // Set up resource requests and submit them: host1/rack_1 and host2/rack_2 at the same priority.
|
|
|
|
+ Priority priority = TestUtils.createMockPriority(1);
|
|
|
|
+ List<ResourceRequest> app1Requests1 = new ArrayList<ResourceRequest>();
|
|
|
|
+ app1Requests1.add(TestUtils.createResourceRequest(host1, 1 * GB, 1,
|
|
|
|
+ true, priority, recordFactory));
|
|
|
|
+ app1Requests1.add(TestUtils.createResourceRequest(rack1, 1 * GB, 1,
|
|
|
|
+ true, priority, recordFactory));
|
|
|
|
+ app1Requests1.add(TestUtils.createResourceRequest(host2, 1 * GB, 1,
|
|
|
|
+ true, priority, recordFactory));
|
|
|
|
+ app1Requests1.add(TestUtils.createResourceRequest(rack2, 1 * GB, 1,
|
|
|
|
+ true, priority, recordFactory));
|
|
|
|
+ // The ANY request asks for 3: one more than the two host-level requests.
|
|
|
|
+ app1Requests1.add(TestUtils.createResourceRequest(ResourceRequest.ANY,
|
|
|
|
+ 1 * GB, 3, true, priority, recordFactory));
|
|
|
|
+ app1.updateResourceRequests(app1Requests1);
|
|
|
|
+
|
|
|
|
+ // Start testing...
|
|
|
|
+ CSAssignment assignment = null;
|
|
|
|
+
|
|
|
|
+ SchedulerRequestKey schedulerKey = toSchedulerKey(priority);
|
|
|
|
+ assertEquals(3, app1.getTotalRequiredResources(schedulerKey));
|
|
|
|
+
|
|
|
|
+ // No rack-local yet: first missed opportunity on node2 (on requested rack_2, but not a requested host).
|
|
|
|
+ assignment = a.assignContainers(clusterResource, node2,
|
|
|
|
+ new ResourceLimits(clusterResource),
|
|
|
|
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
|
|
|
+ applyCSAssignment(clusterResource, assignment, a, nodes, apps);
|
|
|
|
+ verifyNoContainerAllocated(assignment);
|
|
|
|
+ assertEquals(1, app1.getSchedulingOpportunities(schedulerKey));
|
|
|
|
+ assertEquals(3, app1.getTotalRequiredResources(schedulerKey));
|
|
|
|
+ assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // no allocation is reported as NODE_LOCAL
|
|
|
|
+
|
|
|
|
+ // Still no rack-local: second missed opportunity on node2.
|
|
|
|
+ assignment = a.assignContainers(clusterResource, node2,
|
|
|
|
+ new ResourceLimits(clusterResource),
|
|
|
|
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
|
|
|
+ applyCSAssignment(clusterResource, assignment, a, nodes, apps);
|
|
|
|
+ assertEquals(2, app1.getSchedulingOpportunities(schedulerKey));
|
|
|
|
+ assertEquals(3, app1.getTotalRequiredResources(schedulerKey));
|
|
|
|
+ assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // no allocation is reported as NODE_LOCAL
|
|
|
|
+
|
|
|
|
+ // Rack-local now, after two missed opportunities; the opportunity counter goes back to 0 and one request is satisfied.
|
|
|
|
+ assignment = a.assignContainers(clusterResource, node2,
|
|
|
|
+ new ResourceLimits(clusterResource),
|
|
|
|
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
|
|
|
+ applyCSAssignment(clusterResource, assignment, a, nodes, apps);
|
|
|
|
+ assertEquals(0, app1.getSchedulingOpportunities(schedulerKey));
|
|
|
|
+ assertEquals(2, app1.getTotalRequiredResources(schedulerKey));
|
|
|
|
+ assertEquals(NodeType.RACK_LOCAL, assignment.getType());
|
|
|
|
+
|
|
|
|
+ // No off-switch until 3 missed opportunities on node3 (host4 on rack_3, neither of which was requested).
|
|
|
|
+ a.assignContainers(clusterResource, node3,
|
|
|
|
+ new ResourceLimits(clusterResource),
|
|
|
|
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
|
|
|
+ applyCSAssignment(clusterResource, assignment, a, nodes, apps); // NOTE(review): the assignContainers result above is discarded, so this passes the earlier RACK_LOCAL 'assignment' again - confirm intended
|
|
|
|
+ a.assignContainers(clusterResource, node3,
|
|
|
|
+ new ResourceLimits(clusterResource),
|
|
|
|
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
|
|
|
+ applyCSAssignment(clusterResource, assignment, a, nodes, apps); // NOTE(review): same as above - 'assignment' is still the earlier RACK_LOCAL result here
|
|
|
|
+ assignment = a.assignContainers(clusterResource, node3,
|
|
|
|
+ new ResourceLimits(clusterResource),
|
|
|
|
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
|
|
|
+ applyCSAssignment(clusterResource, assignment, a, nodes, apps);
|
|
|
|
+ assertEquals(3, app1.getSchedulingOpportunities(schedulerKey));
|
|
|
|
+ assertEquals(2, app1.getTotalRequiredResources(schedulerKey));
|
|
|
|
+ assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // no allocation is reported as NODE_LOCAL
|
|
|
|
+
|
|
|
|
+ // Now off-switch should succeed (fourth attempt on node3); the opportunity counter keeps increasing.
|
|
|
|
+ assignment = a.assignContainers(clusterResource, node3,
|
|
|
|
+ new ResourceLimits(clusterResource),
|
|
|
|
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
|
|
|
+ applyCSAssignment(clusterResource, assignment, a, nodes, apps);
|
|
|
|
+ assertEquals(4, app1.getSchedulingOpportunities(schedulerKey));
|
|
|
|
+ assertEquals(1, app1.getTotalRequiredResources(schedulerKey));
|
|
|
|
+ assertEquals(NodeType.OFF_SWITCH, assignment.getType());
|
|
|
|
+
|
|
|
|
+ // Check capping by number of cluster nodes: stub the additional rack-locality delay to 10, larger than the 5 reported nodes.
|
|
|
|
+ doReturn(10).when(a).getRackLocalityAdditionalDelay();
|
|
|
|
+ // Off-switch will happen at 6 missed opportunities now, since cluster size
|
|
|
|
+ // is 5.
|
|
|
|
+ assignment = a.assignContainers(clusterResource, node3,
|
|
|
|
+ new ResourceLimits(clusterResource),
|
|
|
|
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
|
|
|
+ applyCSAssignment(clusterResource, assignment, a, nodes, apps);
|
|
|
|
+ assertEquals(5, app1.getSchedulingOpportunities(schedulerKey));
|
|
|
|
+ assertEquals(1, app1.getTotalRequiredResources(schedulerKey));
|
|
|
|
+ assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // no allocation is reported as NODE_LOCAL
|
|
|
|
+ assignment = a.assignContainers(clusterResource, node3,
|
|
|
|
+ new ResourceLimits(clusterResource),
|
|
|
|
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
|
|
|
+ applyCSAssignment(clusterResource, assignment, a, nodes, apps);
|
|
|
|
+ assertEquals(6, app1.getSchedulingOpportunities(schedulerKey));
|
|
|
|
+ assertEquals(0, app1.getTotalRequiredResources(schedulerKey));
|
|
|
|
+ assertEquals(NodeType.OFF_SWITCH, assignment.getType());
|
|
|
|
+ }
|
|
|
|
+
|
|
@Test
|
|
@Test
|
|
public void testApplicationPriorityScheduling() throws Exception {
|
|
public void testApplicationPriorityScheduling() throws Exception {
|
|
// Manipulate queue 'a'
|
|
// Manipulate queue 'a'
|
|
@@ -2403,16 +2551,18 @@ public class TestLeafQueue {
|
|
}
|
|
}
|
|
|
|
|
|
@Test (timeout = 30000)
|
|
@Test (timeout = 30000)
|
|
- public void testNodeLocalityAfterQueueRefresh() throws Exception {
|
|
|
|
|
|
+ public void testLocalityDelaysAfterQueueRefresh() throws Exception {
|
|
|
|
|
|
// Manipulate queue 'e'
|
|
// Manipulate queue 'e'
|
|
LeafQueue e = stubLeafQueue((LeafQueue)queues.get(E));
|
|
LeafQueue e = stubLeafQueue((LeafQueue)queues.get(E));
|
|
|
|
|
|
// before reinitialization
|
|
// before reinitialization
|
|
assertEquals(40, e.getNodeLocalityDelay());
|
|
assertEquals(40, e.getNodeLocalityDelay());
|
|
|
|
+ assertEquals(-1, e.getRackLocalityAdditionalDelay());
|
|
|
|
|
|
- csConf.setInt(CapacitySchedulerConfiguration
|
|
|
|
- .NODE_LOCALITY_DELAY, 60);
|
|
|
|
|
|
+ csConf.setInt(CapacitySchedulerConfiguration.NODE_LOCALITY_DELAY, 60);
|
|
|
|
+ csConf.setInt(
|
|
|
|
+ CapacitySchedulerConfiguration.RACK_LOCALITY_ADDITIONAL_DELAY, 600);
|
|
Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>();
|
|
Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>();
|
|
CSQueue newRoot =
|
|
CSQueue newRoot =
|
|
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
|
|
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
|
|
@@ -2424,6 +2574,7 @@ public class TestLeafQueue {
|
|
|
|
|
|
// after reinitialization
|
|
// after reinitialization
|
|
assertEquals(60, e.getNodeLocalityDelay());
|
|
assertEquals(60, e.getNodeLocalityDelay());
|
|
|
|
+ assertEquals(600, e.getRackLocalityAdditionalDelay());
|
|
}
|
|
}
|
|
|
|
|
|
@Test (timeout = 30000)
|
|
@Test (timeout = 30000)
|