|
@@ -17,16 +17,19 @@
|
|
|
*/
|
|
|
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
|
|
|
|
|
|
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.BASE_YARN_RM_PREEMPTION;
|
|
|
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MAX_IGNORED_OVER_CAPACITY;
|
|
|
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MONITORING_INTERVAL;
|
|
|
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.NATURAL_TERMINATION_FACTOR;
|
|
|
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY;
|
|
|
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.SUFFIX_DISABLE_PREEMPTION;
|
|
|
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND;
|
|
|
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL;
|
|
|
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.KILL_CONTAINER;
|
|
|
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.PREEMPT_CONTAINER;
|
|
|
import static org.junit.Assert.assertEquals;
|
|
|
import static org.junit.Assert.assertNotNull;
|
|
|
+import static org.junit.Assert.assertTrue;
|
|
|
import static org.junit.Assert.fail;
|
|
|
import static org.mockito.Matchers.argThat;
|
|
|
import static org.mockito.Matchers.isA;
|
|
@@ -62,6 +65,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptE
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
|
@@ -86,6 +90,7 @@ public class TestProportionalCapacityPreemptionPolicy {
|
|
|
Clock mClock = null;
|
|
|
Configuration conf = null;
|
|
|
CapacityScheduler mCS = null;
|
|
|
+ CapacitySchedulerConfiguration schedConf = null;
|
|
|
EventHandler<ContainerPreemptEvent> mDisp = null;
|
|
|
ResourceCalculator rc = new DefaultResourceCalculator();
|
|
|
final ApplicationAttemptId appA = ApplicationAttemptId.newInstance(
|
|
@@ -98,6 +103,8 @@ public class TestProportionalCapacityPreemptionPolicy {
|
|
|
ApplicationId.newInstance(TS, 3), 0);
|
|
|
final ApplicationAttemptId appE = ApplicationAttemptId.newInstance(
|
|
|
ApplicationId.newInstance(TS, 4), 0);
|
|
|
+ final ApplicationAttemptId appF = ApplicationAttemptId.newInstance(
|
|
|
+ ApplicationId.newInstance(TS, 5), 0);
|
|
|
final ArgumentCaptor<ContainerPreemptEvent> evtCaptor =
|
|
|
ArgumentCaptor.forClass(ContainerPreemptEvent.class);
|
|
|
|
|
@@ -123,6 +130,8 @@ public class TestProportionalCapacityPreemptionPolicy {
|
|
|
mClock = mock(Clock.class);
|
|
|
mCS = mock(CapacityScheduler.class);
|
|
|
when(mCS.getResourceCalculator()).thenReturn(rc);
|
|
|
+ schedConf = new CapacitySchedulerConfiguration();
|
|
|
+ when(mCS.getConfiguration()).thenReturn(schedConf);
|
|
|
mDisp = mock(EventHandler.class);
|
|
|
rand = new Random();
|
|
|
long seed = rand.nextLong();
|
|
@@ -265,6 +274,240 @@ public class TestProportionalCapacityPreemptionPolicy {
|
|
|
verify(mDisp, never()).handle(isA(ContainerPreemptEvent.class));
|
|
|
}
|
|
|
|
|
|
+ @Test
|
|
|
+ public void testPerQueueDisablePreemption() {
|
|
|
+ int[][] qData = new int[][]{
|
|
|
+ // / A B C
|
|
|
+ { 100, 55, 25, 20 }, // abs
|
|
|
+ { 100, 100, 100, 100 }, // maxCap
|
|
|
+ { 100, 0, 54, 46 }, // used
|
|
|
+ { 10, 10, 0, 0 }, // pending
|
|
|
+ { 0, 0, 0, 0 }, // reserved
|
|
|
+ // appA appB appC
|
|
|
+ { 3, 1, 1, 1 }, // apps
|
|
|
+ { -1, 1, 1, 1 }, // req granularity
|
|
|
+ { 3, 0, 0, 0 }, // subqueues
|
|
|
+ };
|
|
|
+
|
|
|
+ schedConf.setBoolean(BASE_YARN_RM_PREEMPTION
|
|
|
+ + "root.queueB" + SUFFIX_DISABLE_PREEMPTION, true);
|
|
|
+
|
|
|
+ ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
|
|
|
+ policy.editSchedule();
|
|
|
+ // With PREEMPTION_DISABLED set for queueB, get resources from queueC
|
|
|
+ verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appC)));
|
|
|
+ verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appB)));
|
|
|
+
|
|
|
+ // With no PREEMPTION_DISABLED set for queueB, resources will be preempted
|
|
|
+ // from both queueB and queueC. Test must be reset so that the mDisp
|
|
|
+ // event handler will count only events from the following test and not the
|
|
|
+ // previous one.
|
|
|
+ setup();
|
|
|
+ ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData);
|
|
|
+
|
|
|
+ schedConf.setBoolean(BASE_YARN_RM_PREEMPTION
|
|
|
+ + "root.queueB" + SUFFIX_DISABLE_PREEMPTION, false);
|
|
|
+ policy2.editSchedule();
|
|
|
+
|
|
|
+ verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appB)));
|
|
|
+ verify(mDisp, times(6)).handle(argThat(new IsPreemptionRequestFor(appC)));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testPerQueueDisablePreemptionHierarchical() {
|
|
|
+ int[][] qData = new int[][] {
|
|
|
+ // / A D
|
|
|
+ // B C E F
|
|
|
+ { 200, 100, 50, 50, 100, 10, 90 }, // abs
|
|
|
+ { 200, 200, 200, 200, 200, 200, 200 }, // maxCap
|
|
|
+ { 200, 110, 60, 50, 90, 90, 0 }, // used
|
|
|
+ { 10, 0, 0, 0, 10, 0, 10 }, // pending
|
|
|
+ { 0, 0, 0, 0, 0, 0, 0 }, // reserved
|
|
|
+ // appA appB appC appD
|
|
|
+ { 4, 2, 1, 1, 2, 1, 1 }, // apps
|
|
|
+ { -1, -1, 1, 1, -1, 1, 1 }, // req granularity
|
|
|
+ { 2, 2, 0, 0, 2, 0, 0 }, // subqueues
|
|
|
+ };
|
|
|
+ ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
|
|
|
+ policy.editSchedule();
|
|
|
+ // verify capacity taken from queueB (appA), not queueE (appC) despite
|
|
|
+ // queueE being far over its absolute capacity because queueA (queueB's
|
|
|
+ // parent) is over capacity and queueD (queueE's parent) is not.
|
|
|
+ ApplicationAttemptId expectedAttemptOnQueueB =
|
|
|
+ ApplicationAttemptId.newInstance(
|
|
|
+ appA.getApplicationId(), appA.getAttemptId());
|
|
|
+ assertTrue("appA should be running on queueB",
|
|
|
+ mCS.getAppsInQueue("queueB").contains(expectedAttemptOnQueueB));
|
|
|
+ verify(mDisp, times(9)).handle(argThat(new IsPreemptionRequestFor(appA)));
|
|
|
+
|
|
|
+ // Need to call setup() again to reset mDisp
|
|
|
+ setup();
|
|
|
+ // Disable preemption for queueB and its children
|
|
|
+ schedConf.setBoolean(BASE_YARN_RM_PREEMPTION
|
|
|
+ + "root.queueA.queueB" + SUFFIX_DISABLE_PREEMPTION, true);
|
|
|
+ ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData);
|
|
|
+ policy2.editSchedule();
|
|
|
+ ApplicationAttemptId expectedAttemptOnQueueC =
|
|
|
+ ApplicationAttemptId.newInstance(
|
|
|
+ appB.getApplicationId(), appB.getAttemptId());
|
|
|
+ ApplicationAttemptId expectedAttemptOnQueueE =
|
|
|
+ ApplicationAttemptId.newInstance(
|
|
|
+ appC.getApplicationId(), appC.getAttemptId());
|
|
|
+ // Now, all of queueB's (appA) over capacity is not preemptable, so neither
|
|
|
+ // is queueA's. Verify that capacity is taken from queueE (appC).
|
|
|
+ assertTrue("appB should be running on queueC",
|
|
|
+ mCS.getAppsInQueue("queueC").contains(expectedAttemptOnQueueC));
|
|
|
+ assertTrue("appC should be running on queueE",
|
|
|
+ mCS.getAppsInQueue("queueE").contains(expectedAttemptOnQueueE));
|
|
|
+ // Resources should have come from queueE (appC) and neither of queueA's
|
|
|
+ // children.
|
|
|
+ verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA)));
|
|
|
+ verify(mDisp, times(9)).handle(argThat(new IsPreemptionRequestFor(appC)));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testPerQueueDisablePreemptionBroadHierarchical() {
|
|
|
+ int[][] qData = new int[][] {
|
|
|
+ // / A D G
|
|
|
+ // B C E F H I
|
|
|
+ {1000, 350, 150, 200, 400, 200, 200, 250, 100, 150 }, // abs
|
|
|
+ {1000,1000,1000,1000,1000,1000,1000,1000,1000,1000 }, // maxCap
|
|
|
+ {1000, 400, 200, 200, 400, 250, 150, 200, 150, 50 }, // used
|
|
|
+ { 50, 0, 0, 0, 50, 0, 50, 0, 0, 0 }, // pending
|
|
|
+ { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }, // reserved
|
|
|
+ // appA appB appC appD appE appF
|
|
|
+ { 6, 2, 1, 1, 2, 1, 1, 2, 1, 1 }, // apps
|
|
|
+ { -1, -1, 1, 1, -1, 1, 1, -1, 1, 1 }, // req granularity
|
|
|
+ { 3, 2, 0, 0, 2, 0, 0, 2, 0, 0 }, // subqueues
|
|
|
+ };
|
|
|
+
|
|
|
+ ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
|
|
|
+ policy.editSchedule();
|
|
|
+ // queueF(appD) wants resources, Verify that resources come from queueE(appC)
|
|
|
+ // because it's a sibling and queueB(appA) because queueA is over capacity.
|
|
|
+ verify(mDisp, times(28)).handle(argThat(new IsPreemptionRequestFor(appA)));
|
|
|
+ verify(mDisp, times(22)).handle(argThat(new IsPreemptionRequestFor(appC)));
|
|
|
+
|
|
|
+ // Need to call setup() again to reset mDisp
|
|
|
+ setup();
|
|
|
+ // Disable preemption for queueB(appA)
|
|
|
+ schedConf.setBoolean(BASE_YARN_RM_PREEMPTION
|
|
|
+ + "root.queueA.queueB" + SUFFIX_DISABLE_PREEMPTION, true);
|
|
|
+ ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData);
|
|
|
+ policy2.editSchedule();
|
|
|
+ // Now that queueB(appA) is not preemptable, verify that resources come
|
|
|
+ // from queueE(appC)
|
|
|
+ verify(mDisp, times(50)).handle(argThat(new IsPreemptionRequestFor(appC)));
|
|
|
+ verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA)));
|
|
|
+
|
|
|
+ setup();
|
|
|
+ // Disable preemption for two of the 3 queues with over-capacity.
|
|
|
+ schedConf.setBoolean(BASE_YARN_RM_PREEMPTION
|
|
|
+ + "root.queueD.queueE" + SUFFIX_DISABLE_PREEMPTION, true);
|
|
|
+ schedConf.setBoolean(BASE_YARN_RM_PREEMPTION
|
|
|
+ + "root.queueA.queueB" + SUFFIX_DISABLE_PREEMPTION, true);
|
|
|
+ ProportionalCapacityPreemptionPolicy policy3 = buildPolicy(qData);
|
|
|
+ policy3.editSchedule();
|
|
|
+
|
|
|
+ // Verify that the request was starved out even though queueH(appE) is
|
|
|
+ // over capacity. This is because queueG (queueH's parent) is NOT
|
|
|
+ // overcapacity.
|
|
|
+ verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA))); // queueB
|
|
|
+ verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appB))); // queueC
|
|
|
+ verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appC))); // queueE
|
|
|
+ verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appE))); // queueH
|
|
|
+ verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appF))); // queueI
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testPerQueueDisablePreemptionInheritParent() {
|
|
|
+ int[][] qData = new int[][] {
|
|
|
+ // / A E
|
|
|
+ // B C D F G H
|
|
|
+ {1000, 500, 200, 200, 100, 500, 200, 200, 100 }, // abs (guar)
|
|
|
+ {1000,1000,1000,1000,1000,1000,1000,1000,1000 }, // maxCap
|
|
|
+ {1000, 700, 0, 350, 350, 300, 0, 200, 100 }, // used
|
|
|
+ { 200, 0, 0, 0, 0, 200, 200, 0, 0 }, // pending
|
|
|
+ { 0, 0, 0, 0, 0, 0, 0, 0, 0 }, // reserved
|
|
|
+ // appA appB appC appD appE
|
|
|
+ { 5, 2, 0, 1, 1, 3, 1, 1, 1 }, // apps
|
|
|
+ { -1, -1, 1, 1, 1, -1, 1, 1, 1 }, // req granularity
|
|
|
+ { 2, 3, 0, 0, 0, 3, 0, 0, 0 }, // subqueues
|
|
|
+ };
|
|
|
+
|
|
|
+ ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
|
|
|
+ policy.editSchedule();
|
|
|
+ // With all queues preemptable, resources should be taken from queueC(appA)
|
|
|
+ // and queueD(appB). Resources taken more from queueD(appB) than
|
|
|
+ // queueC(appA) because it's over its capacity by a larger percentage.
|
|
|
+ verify(mDisp, times(16)).handle(argThat(new IsPreemptionRequestFor(appA)));
|
|
|
+ verify(mDisp, times(182)).handle(argThat(new IsPreemptionRequestFor(appB)));
|
|
|
+
|
|
|
+ // Disable preemption for queueA and its children. queueF(appC)'s request
|
|
|
+ // should starve.
|
|
|
+ setup(); // Call setup() to reset mDisp
|
|
|
+ schedConf.setBoolean(BASE_YARN_RM_PREEMPTION
|
|
|
+ + "root.queueA" + SUFFIX_DISABLE_PREEMPTION, true);
|
|
|
+ ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData);
|
|
|
+ policy2.editSchedule();
|
|
|
+ verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA))); // queueC
|
|
|
+ verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appB))); // queueD
|
|
|
+ verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appD))); // queueG
|
|
|
+ verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appE))); // queueH
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testPerQueuePreemptionNotAllUntouchable() {
|
|
|
+ int[][] qData = new int[][] {
|
|
|
+ // / A E
|
|
|
+ // B C D F G H
|
|
|
+ { 2000, 1000, 800, 100, 100, 1000, 500, 300, 200 }, // abs
|
|
|
+ { 2000, 2000, 2000, 2000, 2000, 2000, 2000, 2000, 2000 }, // maxCap
|
|
|
+ { 2000, 1300, 300, 800, 200, 700, 500, 0, 200 }, // used
|
|
|
+ { 300, 0, 0, 0, 0, 300, 0, 300, 0 }, // pending
|
|
|
+ { 0, 0, 0, 0, 0, 0, 0, 0, 0 }, // reserved
|
|
|
+ // appA appB appC appD appE appF
|
|
|
+ { 6, 3, 1, 1, 1, 3, 1, 1, 1 }, // apps
|
|
|
+ { -1, -1, 1, 1, 1, -1, 1, 1, 1 }, // req granularity
|
|
|
+ { 2, 3, 0, 0, 0, 3, 0, 0, 0 }, // subqueues
|
|
|
+ };
|
|
|
+ schedConf.setBoolean(BASE_YARN_RM_PREEMPTION
|
|
|
+ + "root.queueA.queueC" + SUFFIX_DISABLE_PREEMPTION, true);
|
|
|
+ ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
|
|
|
+ policy.editSchedule();
|
|
|
+ // Although queueC(appB) is way over capacity and is untouchable,
|
|
|
+ // queueD(appC) is preemptable. Request should be filled from queueD(appC).
|
|
|
+ verify(mDisp, times(100)).handle(argThat(new IsPreemptionRequestFor(appC)));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testPerQueueDisablePreemptionRootDisablesAll() {
|
|
|
+ int[][] qData = new int[][] {
|
|
|
+ // / A D G
|
|
|
+ // B C E F H I
|
|
|
+ {1000, 500, 250, 250, 250, 100, 150, 250, 100, 150 }, // abs
|
|
|
+ {1000,1000,1000,1000,1000,1000,1000,1000,1000,1000 }, // maxCap
|
|
|
+ {1000, 20, 0, 20, 490, 240, 250, 490, 240, 250 }, // used
|
|
|
+ { 200, 200, 200, 0, 0, 0, 0, 0, 0, 0 }, // pending
|
|
|
+ { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }, // reserved
|
|
|
+ // appA appB appC appD appE appF
|
|
|
+ { 6, 2, 1, 1, 2, 1, 1, 2, 1, 1 }, // apps
|
|
|
+ { -1, -1, 1, 1, -1, 1, 1, -1, 1, 1 }, // req granularity
|
|
|
+ { 3, 2, 0, 0, 2, 0, 0, 2, 0, 0 }, // subqueues
|
|
|
+ };
|
|
|
+
|
|
|
+ ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
|
|
|
+ schedConf.setBoolean(BASE_YARN_RM_PREEMPTION
|
|
|
+ + "root" + SUFFIX_DISABLE_PREEMPTION, true);
|
|
|
+ policy.editSchedule();
|
|
|
+ // All queues should be non-preemptable, so request should starve.
|
|
|
+ verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appB))); // queueC
|
|
|
+ verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appC))); // queueE
|
|
|
+ verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appD))); // queueF
|
|
|
+ verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appE))); // queueH
|
|
|
+ verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appF))); // queueI
|
|
|
+ }
|
|
|
+
|
|
|
@Test
|
|
|
public void testOverCapacityImbalance() {
|
|
|
int[][] qData = new int[][]{
|
|
@@ -341,7 +584,7 @@ public class TestProportionalCapacityPreemptionPolicy {
|
|
|
policy.editSchedule();
|
|
|
// verify capacity taken from A1, not B1 despite B1 being far over
|
|
|
// its absolute guaranteed capacity
|
|
|
- verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appA)));
|
|
|
+ verify(mDisp, times(9)).handle(argThat(new IsPreemptionRequestFor(appA)));
|
|
|
}
|
|
|
|
|
|
@Test
|
|
@@ -390,15 +633,17 @@ public class TestProportionalCapacityPreemptionPolicy {
|
|
|
@Test
|
|
|
public void testHierarchicalLarge() {
|
|
|
int[][] qData = new int[][] {
|
|
|
- // / A B C D E F G H I
|
|
|
- { 400, 200, 60, 140, 100, 70, 30, 100, 10, 90 }, // abs
|
|
|
- { 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, }, // maxCap
|
|
|
- { 400, 210, 70,140, 100, 50, 50, 90, 90, 0 }, // used
|
|
|
- { 10, 0, 0, 0, 0, 0, 0, 0, 0, 15 }, // pending
|
|
|
- { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }, // reserved
|
|
|
- { 6, 2, 1, 1, 2, 1, 1, 2, 1, 1 }, // apps
|
|
|
- { -1, -1, 1, 1, -1, 1, 1, -1, 1, 1 }, // req granularity
|
|
|
- { 3, 2, 0, 0, 2, 0, 0, 2, 0, 0 }, // subqueues
|
|
|
+ // / A D G
|
|
|
+ // B C E F H I
|
|
|
+ { 400, 200, 60, 140, 100, 70, 30, 100, 10, 90 }, // abs
|
|
|
+ { 400, 400, 400, 400, 400, 400, 400, 400, 400, 400 }, // maxCap
|
|
|
+ { 400, 210, 70, 140, 100, 50, 50, 90, 90, 0 }, // used
|
|
|
+ { 15, 0, 0, 0, 0, 0, 0, 0, 0, 15 }, // pending
|
|
|
+ { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }, // reserved
|
|
|
+ // appA appB appC appD appE appF
|
|
|
+ { 6, 2, 1, 1, 2, 1, 1, 2, 1, 1 }, // apps
|
|
|
+ { -1, -1, 1, 1, -1, 1, 1, -1, 1, 1 }, // req granularity
|
|
|
+ { 3, 2, 0, 0, 2, 0, 0, 2, 0, 0 }, // subqueues
|
|
|
};
|
|
|
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
|
|
|
policy.editSchedule();
|
|
@@ -407,8 +652,8 @@ public class TestProportionalCapacityPreemptionPolicy {
|
|
|
|
|
|
// XXX note: compensating for rounding error in Resources.multiplyTo
|
|
|
// which is likely triggered since we use small numbers for readability
|
|
|
- verify(mDisp, times(9)).handle(argThat(new IsPreemptionRequestFor(appA)));
|
|
|
- verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appE)));
|
|
|
+ verify(mDisp, times(7)).handle(argThat(new IsPreemptionRequestFor(appA)));
|
|
|
+ verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appE)));
|
|
|
}
|
|
|
|
|
|
@Test
|
|
@@ -629,6 +874,7 @@ public class TestProportionalCapacityPreemptionPolicy {
|
|
|
when(root.getAbsoluteUsedCapacity()).thenReturn(used[0] / tot);
|
|
|
when(root.getAbsoluteCapacity()).thenReturn(abs[0] / tot);
|
|
|
when(root.getAbsoluteMaximumCapacity()).thenReturn(maxCap[0] / tot);
|
|
|
+ when(root.getQueuePath()).thenReturn("root");
|
|
|
|
|
|
for (int i = 1; i < queues.length; ++i) {
|
|
|
final CSQueue q;
|
|
@@ -644,6 +890,10 @@ public class TestProportionalCapacityPreemptionPolicy {
|
|
|
when(q.getAbsoluteUsedCapacity()).thenReturn(used[i] / tot);
|
|
|
when(q.getAbsoluteCapacity()).thenReturn(abs[i] / tot);
|
|
|
when(q.getAbsoluteMaximumCapacity()).thenReturn(maxCap[i] / tot);
|
|
|
+ String parentPathName = p.getQueuePath();
|
|
|
+ parentPathName = (parentPathName == null) ? "root" : parentPathName;
|
|
|
+ String queuePathName = (parentPathName+"."+queueName).replace("/","root");
|
|
|
+ when(q.getQueuePath()).thenReturn(queuePathName);
|
|
|
}
|
|
|
assert 0 == pqs.size();
|
|
|
return root;
|
|
@@ -666,6 +916,8 @@ public class TestProportionalCapacityPreemptionPolicy {
|
|
|
LeafQueue mockLeafQueue(ParentQueue p, float tot, int i, int[] abs,
|
|
|
int[] used, int[] pending, int[] reserved, int[] apps, int[] gran) {
|
|
|
LeafQueue lq = mock(LeafQueue.class);
|
|
|
+ List<ApplicationAttemptId> appAttemptIdList =
|
|
|
+ new ArrayList<ApplicationAttemptId>();
|
|
|
when(lq.getTotalResourcePending()).thenReturn(
|
|
|
Resource.newInstance(pending[i], 0));
|
|
|
// consider moving where CapacityScheduler::comparator accessible
|
|
@@ -683,9 +935,14 @@ public class TestProportionalCapacityPreemptionPolicy {
|
|
|
int aPending = pending[i] / apps[i];
|
|
|
int aReserve = reserved[i] / apps[i];
|
|
|
for (int a = 0; a < apps[i]; ++a) {
|
|
|
- qApps.add(mockApp(i, appAlloc, aUsed, aPending, aReserve, gran[i]));
|
|
|
+ FiCaSchedulerApp mockFiCaApp =
|
|
|
+ mockApp(i, appAlloc, aUsed, aPending, aReserve, gran[i]);
|
|
|
+ qApps.add(mockFiCaApp);
|
|
|
++appAlloc;
|
|
|
+ appAttemptIdList.add(mockFiCaApp.getApplicationAttemptId());
|
|
|
}
|
|
|
+ when(mCS.getAppsInQueue("queue" + (char)('A' + i - 1)))
|
|
|
+ .thenReturn(appAttemptIdList);
|
|
|
}
|
|
|
when(lq.getApplications()).thenReturn(qApps);
|
|
|
if(setAMResourcePercent != 0.0f){
|