|
@@ -17,12 +17,10 @@
|
|
|
*/
|
|
|
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
|
|
|
|
|
|
-import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.BASE_YARN_RM_PREEMPTION;
|
|
|
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MAX_IGNORED_OVER_CAPACITY;
|
|
|
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MONITORING_INTERVAL;
|
|
|
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.NATURAL_TERMINATION_FACTOR;
|
|
|
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY;
|
|
|
-import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.SUFFIX_DISABLE_PREEMPTION;
|
|
|
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND;
|
|
|
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL;
|
|
|
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.KILL_CONTAINER;
|
|
@@ -52,6 +50,7 @@ import java.util.Map;
|
|
|
import java.util.NavigableSet;
|
|
|
import java.util.Random;
|
|
|
import java.util.Set;
|
|
|
+import java.util.StringTokenizer;
|
|
|
import java.util.TreeSet;
|
|
|
|
|
|
import org.apache.commons.collections.map.HashedMap;
|
|
@@ -322,24 +321,22 @@ public class TestProportionalCapacityPreemptionPolicy {
|
|
|
{ 3, 0, 0, 0 }, // subqueues
|
|
|
};
|
|
|
|
|
|
- schedConf.setBoolean(BASE_YARN_RM_PREEMPTION
|
|
|
- + "root.queueB" + SUFFIX_DISABLE_PREEMPTION, true);
|
|
|
+ schedConf.setPreemptionDisabled("root.queueB", true);
|
|
|
|
|
|
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
|
|
|
policy.editSchedule();
|
|
|
- // With PREEMPTION_DISABLED set for queueB, get resources from queueC
|
|
|
+ // Since queueB is not preemptable, get resources from queueC
|
|
|
verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appC)));
|
|
|
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appB)));
|
|
|
|
|
|
- // With no PREEMPTION_DISABLED set for queueB, resources will be preempted
|
|
|
- // from both queueB and queueC. Test must be reset for so that the mDisp
|
|
|
+ // Since queueB is preemptable, resources will be preempted
|
|
|
+ // from both queueB and queueC. Test must be reset so that the mDisp
|
|
|
// event handler will count only events from the following test and not the
|
|
|
// previous one.
|
|
|
setup();
|
|
|
+ schedConf.setPreemptionDisabled("root.queueB", false);
|
|
|
ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData);
|
|
|
-
|
|
|
- schedConf.setBoolean(BASE_YARN_RM_PREEMPTION
|
|
|
- + "root.queueB" + SUFFIX_DISABLE_PREEMPTION, false);
|
|
|
+
|
|
|
policy2.editSchedule();
|
|
|
|
|
|
verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appB)));
|
|
@@ -375,9 +372,8 @@ public class TestProportionalCapacityPreemptionPolicy {
|
|
|
|
|
|
// Need to call setup() again to reset mDisp
|
|
|
setup();
|
|
|
- // Disable preemption for queueB and it's children
|
|
|
- schedConf.setBoolean(BASE_YARN_RM_PREEMPTION
|
|
|
- + "root.queueA.queueB" + SUFFIX_DISABLE_PREEMPTION, true);
|
|
|
+ // Turn off preemption for queueB and its children
|
|
|
+ schedConf.setPreemptionDisabled("root.queueA.queueB", true);
|
|
|
ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData);
|
|
|
policy2.editSchedule();
|
|
|
ApplicationAttemptId expectedAttemptOnQueueC =
|
|
@@ -423,9 +419,8 @@ public class TestProportionalCapacityPreemptionPolicy {
|
|
|
|
|
|
// Need to call setup() again to reset mDisp
|
|
|
setup();
|
|
|
- // Disable preemption for queueB(appA)
|
|
|
- schedConf.setBoolean(BASE_YARN_RM_PREEMPTION
|
|
|
- + "root.queueA.queueB" + SUFFIX_DISABLE_PREEMPTION, true);
|
|
|
+ // Turn off preemption for queueB(appA)
|
|
|
+ schedConf.setPreemptionDisabled("root.queueA.queueB", true);
|
|
|
ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData);
|
|
|
policy2.editSchedule();
|
|
|
// Now that queueB(appA) is not preemptable, verify that resources come
|
|
@@ -434,11 +429,9 @@ public class TestProportionalCapacityPreemptionPolicy {
|
|
|
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA)));
|
|
|
|
|
|
setup();
|
|
|
- // Disable preemption for two of the 3 queues with over-capacity.
|
|
|
- schedConf.setBoolean(BASE_YARN_RM_PREEMPTION
|
|
|
- + "root.queueD.queueE" + SUFFIX_DISABLE_PREEMPTION, true);
|
|
|
- schedConf.setBoolean(BASE_YARN_RM_PREEMPTION
|
|
|
- + "root.queueA.queueB" + SUFFIX_DISABLE_PREEMPTION, true);
|
|
|
+ // Turn off preemption for two of the 3 queues with over-capacity.
|
|
|
+ schedConf.setPreemptionDisabled("root.queueD.queueE", true);
|
|
|
+ schedConf.setPreemptionDisabled("root.queueA.queueB", true);
|
|
|
ProportionalCapacityPreemptionPolicy policy3 = buildPolicy(qData);
|
|
|
policy3.editSchedule();
|
|
|
|
|
@@ -476,11 +469,10 @@ public class TestProportionalCapacityPreemptionPolicy {
|
|
|
verify(mDisp, times(16)).handle(argThat(new IsPreemptionRequestFor(appA)));
|
|
|
verify(mDisp, times(182)).handle(argThat(new IsPreemptionRequestFor(appB)));
|
|
|
|
|
|
- // Disable preemption for queueA and it's children. queueF(appC)'s request
|
|
|
+ // Turn off preemption for queueA and its children. queueF(appC)'s request
|
|
|
// should starve.
|
|
|
setup(); // Call setup() to reset mDisp
|
|
|
- schedConf.setBoolean(BASE_YARN_RM_PREEMPTION
|
|
|
- + "root.queueA" + SUFFIX_DISABLE_PREEMPTION, true);
|
|
|
+ schedConf.setPreemptionDisabled("root.queueA", true);
|
|
|
ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData);
|
|
|
policy2.editSchedule();
|
|
|
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA))); // queueC
|
|
@@ -504,8 +496,7 @@ public class TestProportionalCapacityPreemptionPolicy {
|
|
|
{ -1, -1, 1, 1, 1, -1, 1, 1, 1 }, // req granularity
|
|
|
{ 2, 3, 0, 0, 0, 3, 0, 0, 0 }, // subqueues
|
|
|
};
|
|
|
- schedConf.setBoolean(BASE_YARN_RM_PREEMPTION
|
|
|
- + "root.queueA.queueC" + SUFFIX_DISABLE_PREEMPTION, true);
|
|
|
+ schedConf.setPreemptionDisabled("root.queueA.queueC", true);
|
|
|
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
|
|
|
policy.editSchedule();
|
|
|
// Although queueC(appB) is way over capacity and is untouchable,
|
|
@@ -529,9 +520,8 @@ public class TestProportionalCapacityPreemptionPolicy {
|
|
|
{ 3, 2, 0, 0, 2, 0, 0, 2, 0, 0 }, // subqueues
|
|
|
};
|
|
|
|
|
|
+ schedConf.setPreemptionDisabled("root", true);
|
|
|
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
|
|
|
- schedConf.setBoolean(BASE_YARN_RM_PREEMPTION
|
|
|
- + "root" + SUFFIX_DISABLE_PREEMPTION, true);
|
|
|
policy.editSchedule();
|
|
|
// All queues should be non-preemptable, so request should starve.
|
|
|
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appB))); // queueC
|
|
@@ -893,7 +883,7 @@ public class TestProportionalCapacityPreemptionPolicy {
|
|
|
verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appA)));
|
|
|
setAMContainer = false;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
static class IsPreemptionRequestFor
|
|
|
extends ArgumentMatcher<ContainerPreemptEvent> {
|
|
|
private final ApplicationAttemptId appAttId;
|
|
@@ -952,6 +942,8 @@ public class TestProportionalCapacityPreemptionPolicy {
|
|
|
when(root.getAbsoluteCapacity()).thenReturn(abs[0] / tot);
|
|
|
when(root.getAbsoluteMaximumCapacity()).thenReturn(maxCap[0] / tot);
|
|
|
when(root.getQueuePath()).thenReturn("root");
|
|
|
+ boolean preemptionDisabled = mockPreemptionStatus("root");
|
|
|
+ when(root.getPreemptionDisabled()).thenReturn(preemptionDisabled);
|
|
|
|
|
|
for (int i = 1; i < queues.length; ++i) {
|
|
|
final CSQueue q;
|
|
@@ -971,11 +963,29 @@ public class TestProportionalCapacityPreemptionPolicy {
|
|
|
parentPathName = (parentPathName == null) ? "root" : parentPathName;
|
|
|
String queuePathName = (parentPathName+"."+queueName).replace("/","root");
|
|
|
when(q.getQueuePath()).thenReturn(queuePathName);
|
|
|
+ preemptionDisabled = mockPreemptionStatus(queuePathName);
|
|
|
+ when(q.getPreemptionDisabled()).thenReturn(preemptionDisabled);
|
|
|
}
|
|
|
assert 0 == pqs.size();
|
|
|
return root;
|
|
|
}
|
|
|
|
|
|
+ // Determine if any of the elements in the queuepath have preemption disabled.
|
|
|
+ // Also must handle the case where preemption disabled property is explicitly
|
|
|
+ // set to something other than the default. Assumes system-wide preemption
|
|
|
+ // property is true.
|
|
|
+ private boolean mockPreemptionStatus(String queuePathName) {
|
|
|
+ boolean preemptionDisabled = false;
|
|
|
+ StringTokenizer tokenizer = new StringTokenizer(queuePathName, ".");
|
|
|
+ String qName = "";
|
|
|
+ while(tokenizer.hasMoreTokens()) {
|
|
|
+ qName += tokenizer.nextToken();
|
|
|
+ preemptionDisabled = schedConf.getPreemptionDisabled(qName, preemptionDisabled);
|
|
|
+ qName += ".";
|
|
|
+ }
|
|
|
+ return preemptionDisabled;
|
|
|
+ }
|
|
|
+
|
|
|
ParentQueue mockParentQueue(ParentQueue p, int subqueues,
|
|
|
Deque<ParentQueue> pqs) {
|
|
|
ParentQueue pq = mock(ParentQueue.class);
|