|
@@ -1592,8 +1592,8 @@ public class TestLeafQueue {
|
|
|
@Test
|
|
|
public void testLocalityScheduling() throws Exception {
|
|
|
|
|
|
- // Manipulate queue 'a'
|
|
|
- LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
|
|
|
+ // Manipulate queue 'b'
|
|
|
+ LeafQueue a = stubLeafQueue((LeafQueue)queues.get(B));
|
|
|
|
|
|
// User
|
|
|
String user_0 = "user_0";
|
|
@@ -1708,25 +1708,26 @@ public class TestLeafQueue {
|
|
|
TestUtils.createResourceRequest(host_1, 1*GB, 1,
|
|
|
true, priority, recordFactory));
|
|
|
app_0_requests_0.add(
|
|
|
- TestUtils.createResourceRequest(rack_1, 1*GB, 1,
|
|
|
+ TestUtils.createResourceRequest(rack_1, 1*GB, 3,
|
|
|
true, priority, recordFactory));
|
|
|
app_0_requests_0.add(
|
|
|
- TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, // one extra
|
|
|
+ TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 4, // one extra
|
|
|
true, priority, recordFactory));
|
|
|
app_0.updateResourceRequests(app_0_requests_0);
|
|
|
- assertEquals(2, app_0.getTotalRequiredResources(priority));
|
|
|
+ assertEquals(4, app_0.getTotalRequiredResources(priority));
|
|
|
|
|
|
String host_3 = "127.0.0.4"; // on rack_1
|
|
|
FiCaSchedulerNode node_3 = TestUtils.getMockNode(host_3, rack_1, 0, 8*GB);
|
|
|
|
|
|
// Rack-delay
|
|
|
+ doReturn(true).when(a).getRackLocalityFullReset();
|
|
|
doReturn(1).when(a).getNodeLocalityDelay();
|
|
|
|
|
|
// Shouldn't assign RACK_LOCAL yet
|
|
|
assignment = a.assignContainers(clusterResource, node_3,
|
|
|
new ResourceLimits(clusterResource));
|
|
|
assertEquals(1, app_0.getSchedulingOpportunities(priority));
|
|
|
- assertEquals(2, app_0.getTotalRequiredResources(priority));
|
|
|
+ assertEquals(4, app_0.getTotalRequiredResources(priority));
|
|
|
assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
|
|
|
|
|
|
// Should assign RACK_LOCAL now
|
|
@@ -1735,10 +1736,70 @@ public class TestLeafQueue {
|
|
|
verify(app_0).allocate(eq(NodeType.RACK_LOCAL), eq(node_3),
|
|
|
any(Priority.class), any(ResourceRequest.class), any(Container.class));
|
|
|
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
|
|
|
+ assertEquals(3, app_0.getTotalRequiredResources(priority));
|
|
|
+ assertEquals(NodeType.RACK_LOCAL, assignment.getType());
|
|
|
+
|
|
|
+ // Shouldn't assign RACK_LOCAL because schedulingOpportunities should have gotten reset.
|
|
|
+ assignment = a.assignContainers(clusterResource, node_3,
|
|
|
+ new ResourceLimits(clusterResource));
|
|
|
+ assertEquals(1, app_0.getSchedulingOpportunities(priority));
|
|
|
+ assertEquals(3, app_0.getTotalRequiredResources(priority));
|
|
|
+
|
|
|
+ // Next time we schedule RACK_LOCAL, don't reset
|
|
|
+ doReturn(false).when(a).getRackLocalityFullReset();
|
|
|
+
|
|
|
+ // Should assign RACK_LOCAL now
|
|
|
+ assignment = a.assignContainers(clusterResource, node_3,
|
|
|
+ new ResourceLimits(clusterResource));
|
|
|
+ verify(app_0, Mockito.times(2)).allocate(eq(NodeType.RACK_LOCAL), eq(node_3),
|
|
|
+ any(Priority.class), any(ResourceRequest.class), any(Container.class));
|
|
|
+ assertEquals(2, app_0.getSchedulingOpportunities(priority)); // should NOT reset
|
|
|
+ assertEquals(2, app_0.getTotalRequiredResources(priority));
|
|
|
+ assertEquals(NodeType.RACK_LOCAL, assignment.getType());
|
|
|
+
|
|
|
+ // Another RACK_LOCAL since schedulingOpportunities not reset
|
|
|
+ assignment = a.assignContainers(clusterResource, node_3,
|
|
|
+ new ResourceLimits(clusterResource));
|
|
|
+ verify(app_0, Mockito.times(3)).allocate(eq(NodeType.RACK_LOCAL), eq(node_3),
|
|
|
+ any(Priority.class), any(ResourceRequest.class), any(Container.class));
|
|
|
+ assertEquals(3, app_0.getSchedulingOpportunities(priority)); // should NOT reset
|
|
|
assertEquals(1, app_0.getTotalRequiredResources(priority));
|
|
|
assertEquals(NodeType.RACK_LOCAL, assignment.getType());
|
|
|
+
|
|
|
+ // Add a request larger than cluster size to verify
|
|
|
+ // OFF_SWITCH delay is capped by cluster size
|
|
|
+ app_0.resetSchedulingOpportunities(priority);
|
|
|
+ app_0_requests_0.clear();
|
|
|
+ app_0_requests_0.add(
|
|
|
+ TestUtils.createResourceRequest(host_0, 1*GB, 100,
|
|
|
+ true, priority, recordFactory));
|
|
|
+ app_0_requests_0.add(
|
|
|
+ TestUtils.createResourceRequest(rack_0, 1*GB, 100,
|
|
|
+ true, priority, recordFactory));
|
|
|
+ app_0_requests_0.add(
|
|
|
+ TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 100,
|
|
|
+ true, priority, recordFactory));
|
|
|
+ app_0.updateResourceRequests(app_0_requests_0);
|
|
|
+
|
|
|
+ // Start with off switch. 3 nodes in cluster so shouldn't allocate first 3
|
|
|
+ for (int i = 0; i < numNodes; i++) {
|
|
|
+ assignment =
|
|
|
+ a.assignContainers(clusterResource, node_2, new ResourceLimits(
|
|
|
+ clusterResource));
|
|
|
+ verify(app_0, Mockito.times(1)).allocate(any(NodeType.class), eq(node_2),
|
|
|
+ any(Priority.class), any(ResourceRequest.class), any(Container.class));
|
|
|
+ assertEquals(i+1, app_0.getSchedulingOpportunities(priority));
|
|
|
+ }
|
|
|
+ // delay should be capped at numNodes so next one should allocate
|
|
|
+ assignment = a.assignContainers(clusterResource, node_2,
|
|
|
+ new ResourceLimits(clusterResource));
|
|
|
+ verify(app_0, Mockito.times(2)).allocate(eq(NodeType.OFF_SWITCH), eq(node_2),
|
|
|
+ any(Priority.class), any(ResourceRequest.class), any(Container.class));
|
|
|
+ assertEquals(numNodes+1, app_0.getSchedulingOpportunities(priority));
|
|
|
+ assertEquals(NodeType.OFF_SWITCH, assignment.getType());
|
|
|
+
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
@Test
|
|
|
public void testApplicationPriorityScheduling() throws Exception {
|
|
|
// Manipulate queue 'a'
|