|
@@ -144,8 +144,6 @@ public class TestLeafQueue {
|
|
|
thenReturn(Resources.createResource(16*GB, 32));
|
|
|
when(csContext.getClusterResource()).
|
|
|
thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
|
|
|
- when(csContext.getApplicationComparator()).
|
|
|
- thenReturn(CapacityScheduler.applicationComparator);
|
|
|
when(csContext.getNonPartitionedQueueComparator()).
|
|
|
thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
|
|
|
when(csContext.getResourceCalculator()).
|
|
@@ -1910,7 +1908,7 @@ public class TestLeafQueue {
|
|
|
|
|
|
// before reinitialization
|
|
|
assertEquals(2, e.getNumActiveApplications());
|
|
|
- assertEquals(1, e.pendingApplications.size());
|
|
|
+ assertEquals(1, e.getNumPendingApplications());
|
|
|
|
|
|
csConf.setDouble(CapacitySchedulerConfiguration
|
|
|
.MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT,
|
|
@@ -1927,7 +1925,7 @@ public class TestLeafQueue {
|
|
|
|
|
|
// after reinitialization
|
|
|
assertEquals(3, e.getNumActiveApplications());
|
|
|
- assertEquals(0, e.pendingApplications.size());
|
|
|
+ assertEquals(0, e.getNumPendingApplications());
|
|
|
}
|
|
|
|
|
|
@Test (timeout = 30000)
|
|
@@ -1991,7 +1989,7 @@ public class TestLeafQueue {
|
|
|
|
|
|
// before updating cluster resource
|
|
|
assertEquals(2, e.getNumActiveApplications());
|
|
|
- assertEquals(1, e.pendingApplications.size());
|
|
|
+ assertEquals(1, e.getNumPendingApplications());
|
|
|
|
|
|
Resource clusterResource = Resources.createResource(200 * 16 * GB, 100 * 32);
|
|
|
e.updateClusterResource(clusterResource,
|
|
@@ -1999,7 +1997,7 @@ public class TestLeafQueue {
|
|
|
|
|
|
// after updating cluster resource
|
|
|
assertEquals(3, e.getNumActiveApplications());
|
|
|
- assertEquals(0, e.pendingApplications.size());
|
|
|
+ assertEquals(0, e.getNumPendingApplications());
|
|
|
}
|
|
|
|
|
|
public boolean hasQueueACL(List<QueueUserACLInfo> aclInfos, QueueACL acl) {
|
|
@@ -2350,8 +2348,9 @@ public class TestLeafQueue {
|
|
|
|
|
|
LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
|
|
|
|
|
|
- a.setOrderingPolicy(new FifoOrderingPolicy<FiCaSchedulerApp>());
|
|
|
-
|
|
|
+ a.setOrderingPolicy(new FifoOrderingPolicy<FiCaSchedulerApp>());
|
|
|
+ a.setPendingAppsOrderingPolicy(new FifoOrderingPolicy<FiCaSchedulerApp>());
|
|
|
+
|
|
|
String host_0_0 = "127.0.0.1";
|
|
|
String rack_0 = "rack_0";
|
|
|
FiCaSchedulerNode node_0_0 = TestUtils.getMockNode(host_0_0, rack_0, 0, 16*GB);
|
|
@@ -2367,14 +2366,14 @@ public class TestLeafQueue {
|
|
|
TestUtils.getMockApplicationAttemptId(0, 0);
|
|
|
FiCaSchedulerApp app_0 =
|
|
|
spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a,
|
|
|
- mock(ActiveUsersManager.class), spyRMContext));
|
|
|
+ mock(ActiveUsersManager.class), spyRMContext, Priority.newInstance(3)));
|
|
|
a.submitApplicationAttempt(app_0, user_0);
|
|
|
|
|
|
final ApplicationAttemptId appAttemptId_1 =
|
|
|
TestUtils.getMockApplicationAttemptId(1, 0);
|
|
|
FiCaSchedulerApp app_1 =
|
|
|
spy(new FiCaSchedulerApp(appAttemptId_1, user_0, a,
|
|
|
- mock(ActiveUsersManager.class), spyRMContext));
|
|
|
+ mock(ActiveUsersManager.class), spyRMContext, Priority.newInstance(5)));
|
|
|
a.submitApplicationAttempt(app_1, user_0);
|
|
|
|
|
|
Priority priority = TestUtils.createMockPriority(1);
|
|
@@ -2392,36 +2391,34 @@ public class TestLeafQueue {
|
|
|
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1,
|
|
|
true, priority, recordFactory));
|
|
|
app_1.updateResourceRequests(app_1_requests_0);
|
|
|
-
|
|
|
- a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
|
|
- Assert.assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
|
|
|
+
|
|
|
+ // app_1 will get containers as it has high priority
|
|
|
a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
|
|
Assert.assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
|
|
|
-
|
|
|
+ a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
|
|
+ Assert.assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
|
|
|
+
|
|
|
app_0_requests_0.clear();
|
|
|
app_0_requests_0.add(
|
|
|
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1,
|
|
|
true, priority, recordFactory));
|
|
|
app_0.updateResourceRequests(app_0_requests_0);
|
|
|
-
|
|
|
+
|
|
|
app_1_requests_0.clear();
|
|
|
app_1_requests_0.add(
|
|
|
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1,
|
|
|
true, priority, recordFactory));
|
|
|
app_1.updateResourceRequests(app_1_requests_0);
|
|
|
-
|
|
|
- //Even thought it already has more resources, app_0 will still get
|
|
|
- //assigned first
|
|
|
- a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
|
|
- Assert.assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
|
|
|
- Assert.assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
|
|
|
-
|
|
|
- //and only then will app_1
|
|
|
+
|
|
|
+ //app_1 will still get assigned first as priority is more.
|
|
|
a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
|
|
Assert.assertEquals(2*GB, app_1.getCurrentConsumption().getMemory());
|
|
|
+ Assert.assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
|
|
|
|
|
|
- }
|
|
|
-
|
|
|
+ //and only then will app_2
|
|
|
+ a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
|
|
+ Assert.assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
|
|
|
+}
|
|
|
@Test
|
|
|
public void testConcurrentAccess() throws Exception {
|
|
|
YarnConfiguration conf = new YarnConfiguration();
|
|
@@ -2500,6 +2497,7 @@ public class TestLeafQueue {
|
|
|
new FairOrderingPolicy<FiCaSchedulerApp>();
|
|
|
|
|
|
a.setOrderingPolicy(schedulingOrder);
|
|
|
+ a.setPendingAppsOrderingPolicy(new FairOrderingPolicy<FiCaSchedulerApp>());
|
|
|
|
|
|
String host_0_0 = "127.0.0.1";
|
|
|
String rack_0 = "rack_0";
|
|
@@ -2542,6 +2540,7 @@ public class TestLeafQueue {
|
|
|
true, priority, recordFactory));
|
|
|
app_1.updateResourceRequests(app_1_requests_0);
|
|
|
|
|
|
+ // app_0 will get containers as its submitted first.
|
|
|
a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
|
|
Assert.assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
|
|
|
a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|