|
@@ -0,0 +1,868 @@
|
|
|
+/**
|
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one
|
|
|
+ * or more contributor license agreements. See the NOTICE file
|
|
|
+ * distributed with this work for additional information
|
|
|
+ * regarding copyright ownership. The ASF licenses this file
|
|
|
+ * to you under the Apache License, Version 2.0 (the
|
|
|
+ * "License"); you may not use this file except in compliance
|
|
|
+ * with the License. You may obtain a copy of the License at
|
|
|
+ *
|
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
|
+ *
|
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
+ * See the License for the specific language governing permissions and
|
|
|
+ * limitations under the License.
|
|
|
+ */
|
|
|
+
|
|
|
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
|
|
|
+
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
|
|
+import org.junit.Before;
|
|
|
+import org.junit.Test;
|
|
|
+
|
|
|
+import java.io.IOException;
|
|
|
+
|
|
|
+import static org.mockito.Matchers.argThat;
|
|
|
+import static org.mockito.Mockito.never;
|
|
|
+import static org.mockito.Mockito.times;
|
|
|
+import static org.mockito.Mockito.verify;
|
|
|
+
|
|
|
+/**
|
|
|
+ * Test class for IntraQueuePreemption scenarios.
|
|
|
+ */
|
|
|
+public class TestProportionalCapacityPreemptionPolicyIntraQueue
|
|
|
+ extends
|
|
|
+ ProportionalCapacityPreemptionPolicyMockFramework {
|
|
|
+ @Before
|
|
|
+ public void setup() {
|
|
|
+ super.setup();
|
|
|
+ conf.setBoolean(
|
|
|
+ CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ENABLED, true);
|
|
|
+ policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testSimpleIntraQueuePreemption() throws IOException {
|
|
|
+ /**
|
|
|
+ * The simplest test preemption, Queue structure is:
|
|
|
+ *
|
|
|
+ * <pre>
|
|
|
+ * root
|
|
|
+ * / | | \
|
|
|
+ * a b c d
|
|
|
+ * </pre>
|
|
|
+ *
|
|
|
+ * Guaranteed resource of a/b/c/d are 11:40:20:29 Total cluster resource =
|
|
|
+ * 100
|
|
|
+ * Scenario:
|
|
|
+ * Queue B has few running apps and two high priority apps have demand.
|
|
|
+ * Apps which are running at low priority (4) will preempt few of its
|
|
|
+ * resources to meet the demand.
|
|
|
+ */
|
|
|
+ String labelsConfig = "=100,true;";
|
|
|
+ String nodesConfig = // n1 has no label
|
|
|
+ "n1= res=100";
|
|
|
+ String queuesConfig =
|
|
|
+ // guaranteed,max,used,pending,reserved
|
|
|
+ "root(=[100 100 80 120 0]);" + // root
|
|
|
+ "-a(=[11 100 11 50 0]);" + // a
|
|
|
+ "-b(=[40 100 38 60 0]);" + // b
|
|
|
+ "-c(=[20 100 10 10 0]);" + // c
|
|
|
+ "-d(=[29 100 20 0 0])"; // d
|
|
|
+
|
|
|
+ String appsConfig =
|
|
|
+ // queueName\t(priority,resource,host,expression,#repeat,reserved,
|
|
|
+ // pending)
|
|
|
+ "a\t" // app1 in a
|
|
|
+ + "(1,1,n1,,6,false,25);" + // app1 a
|
|
|
+ "a\t" // app2 in a
|
|
|
+ + "(1,1,n1,,5,false,25);" + // app2 a
|
|
|
+ "b\t" // app3 in b
|
|
|
+ + "(4,1,n1,,34,false,20);" + // app3 b
|
|
|
+ "b\t" // app4 in b
|
|
|
+ + "(4,1,n1,,2,false,10);" + // app4 b
|
|
|
+ "b\t" // app4 in b
|
|
|
+ + "(5,1,n1,,1,false,10);" + // app5 b
|
|
|
+ "b\t" // app4 in b
|
|
|
+ + "(6,1,n1,,1,false,10);" + // app6 in b
|
|
|
+ "c\t" // app1 in a
|
|
|
+ + "(1,1,n1,,10,false,10);" + "d\t" // app7 in c
|
|
|
+ + "(1,1,n1,,20,false,0)";
|
|
|
+
|
|
|
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
|
|
|
+ policy.editSchedule();
|
|
|
+
|
|
|
+ // For queue B, app3 and app4 were of lower priority. Hence take 8
|
|
|
+ // containers from them by hitting the intraQueuePreemptionDemand of 20%.
|
|
|
+ verify(mDisp, times(1)).handle(argThat(
|
|
|
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
|
|
+ getAppAttemptId(4))));
|
|
|
+ verify(mDisp, times(7)).handle(argThat(
|
|
|
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
|
|
+ getAppAttemptId(3))));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testNoPreemptionForSamePriorityApps() throws IOException {
|
|
|
+ /**
|
|
|
+ * Queue structure is:
|
|
|
+ *
|
|
|
+ * <pre>
|
|
|
+ * root
|
|
|
+ * / | | \
|
|
|
+ * a b c d
|
|
|
+ * </pre>
|
|
|
+ *
|
|
|
+ * Guaranteed resource of a/b/c/d are 10:40:20:30 Total cluster resource =
|
|
|
+ * 100
|
|
|
+ * Scenario: In queue A/B, all apps are running at same priority. However
|
|
|
+ * there are many demands also from these apps. Since all apps are at same
|
|
|
+ * priority, preemption should not occur here.
|
|
|
+ */
|
|
|
+ String labelsConfig = "=100,true;";
|
|
|
+ String nodesConfig = // n1 has no label
|
|
|
+ "n1= res=100";
|
|
|
+ String queuesConfig =
|
|
|
+ // guaranteed,max,used,pending,reserved
|
|
|
+ "root(=[100 100 80 120 0]);" + // root
|
|
|
+ "-a(=[10 100 10 50 0]);" + // a
|
|
|
+ "-b(=[40 100 40 60 0]);" + // b
|
|
|
+ "-c(=[20 100 10 10 0]);" + // c
|
|
|
+ "-d(=[30 100 20 0 0])"; // d
|
|
|
+
|
|
|
+ String appsConfig =
|
|
|
+ // queueName\t(priority,resource,host,expression,#repeat,reserved,
|
|
|
+ // pending)
|
|
|
+ "a\t" // app1 in a
|
|
|
+ + "(1,1,n1,,6,false,25);" + // app1 a
|
|
|
+ "a\t" // app2 in a
|
|
|
+ + "(1,1,n1,,5,false,25);" + // app2 a
|
|
|
+ "b\t" // app3 in b
|
|
|
+ + "(1,1,n1,,34,false,20);" + // app3 b
|
|
|
+ "b\t" // app4 in b
|
|
|
+ + "(1,1,n1,,2,false,10);" + // app4 b
|
|
|
+ "b\t" // app4 in b
|
|
|
+ + "(1,1,n1,,1,false,20);" + // app5 b
|
|
|
+ "b\t" // app4 in b
|
|
|
+ + "(1,1,n1,,1,false,10);" + // app6 in b
|
|
|
+ "c\t" // app1 in a
|
|
|
+ + "(1,1,n1,,10,false,10);" + "d\t" // app7 in c
|
|
|
+ + "(1,1,n1,,20,false,0)";
|
|
|
+
|
|
|
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
|
|
|
+ policy.editSchedule();
|
|
|
+
|
|
|
+ // For queue B, none of the apps should be preempted.
|
|
|
+ verify(mDisp, times(0)).handle(argThat(
|
|
|
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
|
|
+ getAppAttemptId(4))));
|
|
|
+ verify(mDisp, times(0)).handle(argThat(
|
|
|
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
|
|
+ getAppAttemptId(3))));
|
|
|
+ verify(mDisp, times(0)).handle(argThat(
|
|
|
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
|
|
+ getAppAttemptId(5))));
|
|
|
+ verify(mDisp, times(0)).handle(argThat(
|
|
|
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
|
|
+ getAppAttemptId(6))));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testNoPreemptionWhenQueueIsUnderCapacityLimit()
|
|
|
+ throws IOException {
|
|
|
+ /**
|
|
|
+ * Queue structure is:
|
|
|
+ *
|
|
|
+ * <pre>
|
|
|
+ * root
|
|
|
+ * / \
|
|
|
+ * a b
|
|
|
+ * </pre>
|
|
|
+ *
|
|
|
+ * Scenario:
|
|
|
+ * Guaranteed resource of a/b are 40:60 Total cluster resource = 100 BY
|
|
|
+ * default, this limit is 50%. Test to verify that there wont be any
|
|
|
+ * preemption since used capacity is under 50% for queue a/b even though
|
|
|
+ * there are demands from high priority apps in queue.
|
|
|
+ */
|
|
|
+ String labelsConfig = "=100,true;";
|
|
|
+ String nodesConfig = // n1 has no label
|
|
|
+ "n1= res=100";
|
|
|
+ String queuesConfig =
|
|
|
+ // guaranteed,max,used,pending,reserved
|
|
|
+ "root(=[100 100 35 80 0]);" + // root
|
|
|
+ "-a(=[40 100 10 50 0]);" + // a
|
|
|
+ "-b(=[60 100 25 30 0])"; // b
|
|
|
+
|
|
|
+ String appsConfig =
|
|
|
+ // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
|
|
|
+ "a\t" // app1 in a
|
|
|
+ + "(1,1,n1,,5,false,25);" + // app1 a
|
|
|
+ "a\t" // app2 in a
|
|
|
+ + "(2,1,n1,,5,false,25);" + // app2 a
|
|
|
+ "b\t" // app3 in b
|
|
|
+ + "(4,1,n1,,40,false,20);" + // app3 b
|
|
|
+ "b\t" // app1 in a
|
|
|
+ + "(6,1,n1,,5,false,20)";
|
|
|
+
|
|
|
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
|
|
|
+ policy.editSchedule();
|
|
|
+
|
|
|
+ // For queue A/B, none of the apps should be preempted as used capacity
|
|
|
+ // is under 50%.
|
|
|
+ verify(mDisp, times(0)).handle(argThat(
|
|
|
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
|
|
+ getAppAttemptId(1))));
|
|
|
+ verify(mDisp, times(0)).handle(argThat(
|
|
|
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
|
|
+ getAppAttemptId(2))));
|
|
|
+ verify(mDisp, times(0)).handle(argThat(
|
|
|
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
|
|
+ getAppAttemptId(3))));
|
|
|
+ verify(mDisp, times(0)).handle(argThat(
|
|
|
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
|
|
+ getAppAttemptId(4))));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testLimitPreemptionWithMaxIntraQueuePreemptableLimit()
|
|
|
+ throws IOException {
|
|
|
+ /**
|
|
|
+ * Queue structure is:
|
|
|
+ *
|
|
|
+ * <pre>
|
|
|
+ * root
|
|
|
+ * / \
|
|
|
+ * a b
|
|
|
+ * </pre>
|
|
|
+ *
|
|
|
+ * Guaranteed resource of a/b are 40:60 Total cluster resource = 100
|
|
|
+ * maxIntraQueuePreemptableLimit by default is 50%. This test is to verify
|
|
|
+ * that the maximum preemption should occur upto 50%, eventhough demand is
|
|
|
+ * more.
|
|
|
+ */
|
|
|
+
|
|
|
+ // Set max preemption limit as 50%.
|
|
|
+ conf.setFloat(CapacitySchedulerConfiguration.
|
|
|
+ INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
|
|
|
+ (float) 0.5);
|
|
|
+
|
|
|
+ String labelsConfig = "=100,true;";
|
|
|
+ String nodesConfig = // n1 has no label
|
|
|
+ "n1= res=100";
|
|
|
+ String queuesConfig =
|
|
|
+ // guaranteed,max,used,pending,reserved
|
|
|
+ "root(=[100 100 55 170 0]);" + // root
|
|
|
+ "-a(=[40 100 10 50 0]);" + // a
|
|
|
+ "-b(=[60 100 45 120 0])"; // b
|
|
|
+
|
|
|
+ String appsConfig =
|
|
|
+ // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
|
|
|
+ "a\t" // app1 in a
|
|
|
+ + "(1,1,n1,,5,false,25);" + // app1 a
|
|
|
+ "a\t" // app2 in a
|
|
|
+ + "(2,1,n1,,5,false,25);" + // app2 a
|
|
|
+ "b\t" // app3 in b
|
|
|
+ + "(4,1,n1,,40,false,20);" + // app3 b
|
|
|
+ "b\t" // app1 in a
|
|
|
+ + "(6,1,n1,,5,false,100)";
|
|
|
+
|
|
|
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
|
|
|
+ policy.editSchedule();
|
|
|
+
|
|
|
+ // For queueB, eventhough app4 needs 100 resources, only 30 resources were
|
|
|
+ // preempted. (max is 50% of guaranteed cap of any queue
|
|
|
+ // "maxIntraQueuePreemptable")
|
|
|
+ verify(mDisp, times(30)).handle(argThat(
|
|
|
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
|
|
+ getAppAttemptId(3))));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testLimitPreemptionWithTotalPreemptedResourceAllowed()
|
|
|
+ throws IOException {
|
|
|
+ /**
|
|
|
+ * Queue structure is:
|
|
|
+ *
|
|
|
+ * <pre>
|
|
|
+ * root
|
|
|
+ * / \
|
|
|
+ * a b
|
|
|
+ * </pre>
|
|
|
+ *
|
|
|
+ * Scenario:
|
|
|
+ * Guaranteed resource of a/b are 40:60 Total cluster resource = 100
|
|
|
+ * totalPreemption allowed is 10%. This test is to verify that only
|
|
|
+ * 10% is preempted.
|
|
|
+ */
|
|
|
+
|
|
|
+ // report "ideal" preempt as 10%. Ensure preemption happens only for 10%
|
|
|
+ conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND,
|
|
|
+ (float) 0.1);
|
|
|
+
|
|
|
+ String labelsConfig = "=100,true;";
|
|
|
+ String nodesConfig = // n1 has no label
|
|
|
+ "n1= res=100";
|
|
|
+ String queuesConfig =
|
|
|
+ // guaranteed,max,used,pending,reserved
|
|
|
+ "root(=[100 100 55 170 0]);" + // root
|
|
|
+ "-a(=[40 100 10 50 0]);" + // a
|
|
|
+ "-b(=[60 100 45 120 0])"; // b
|
|
|
+
|
|
|
+ String appsConfig =
|
|
|
+ // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
|
|
|
+ "a\t" // app1 in a
|
|
|
+ + "(1,1,n1,,5,false,25);" + // app1 a
|
|
|
+ "a\t" // app2 in a
|
|
|
+ + "(2,1,n1,,5,false,25);" + // app2 a
|
|
|
+ "b\t" // app3 in b
|
|
|
+ + "(4,1,n1,,40,false,20);" + // app3 b
|
|
|
+ "b\t" // app1 in a
|
|
|
+ + "(6,1,n1,,5,false,100)";
|
|
|
+
|
|
|
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
|
|
|
+ policy.editSchedule();
|
|
|
+
|
|
|
+ // For queue B eventhough app4 needs 100 resources, only 10 resources were
|
|
|
+ // preempted. This is the 10% limit of TOTAL_PREEMPTION_PER_ROUND.
|
|
|
+ verify(mDisp, times(10)).handle(argThat(
|
|
|
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
|
|
+ getAppAttemptId(3))));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testAlreadySelectedContainerFromInterQueuePreemption()
|
|
|
+ throws IOException {
|
|
|
+ /**
|
|
|
+ * Queue structure is:
|
|
|
+ *
|
|
|
+ * <pre>
|
|
|
+ * root
|
|
|
+ * / \
|
|
|
+ * a b
|
|
|
+ * </pre>
|
|
|
+ *
|
|
|
+ * Scenario:
|
|
|
+ * Guaranteed resource of a/b are 40:60 Total cluster resource = 100
|
|
|
+ * QueueB is under utilized and QueueA has to release 9 containers here.
|
|
|
+ * However within queue A, high priority app has also a demand for 20.
|
|
|
+ * So additional 11 more containers will be preempted making a tota of 20.
|
|
|
+ */
|
|
|
+
|
|
|
+ // Set max preemption limit as 50%.
|
|
|
+ conf.setFloat(CapacitySchedulerConfiguration.
|
|
|
+ INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
|
|
|
+ (float) 0.5);
|
|
|
+
|
|
|
+ String labelsConfig = "=100,true;";
|
|
|
+ String nodesConfig = // n1 has no label
|
|
|
+ "n1= res=100";
|
|
|
+ String queuesConfig =
|
|
|
+ // guaranteed,max,used,pending,reserved
|
|
|
+ "root(=[100 100 95 170 0]);" + // root
|
|
|
+ "-a(=[60 100 70 50 0]);" + // a
|
|
|
+ "-b(=[40 100 25 120 0])"; // b
|
|
|
+
|
|
|
+ String appsConfig =
|
|
|
+ // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
|
|
|
+ "a\t" // app1 in a
|
|
|
+ + "(1,1,n1,,50,false,15);" + // app1 a
|
|
|
+ "a\t" // app2 in a
|
|
|
+ + "(2,1,n1,,20,false,20);" + // app2 a
|
|
|
+ "b\t" // app3 in b
|
|
|
+ + "(4,1,n1,,20,false,20);" + // app3 b
|
|
|
+ "b\t" // app1 in a
|
|
|
+ + "(4,1,n1,,5,false,100)";
|
|
|
+
|
|
|
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
|
|
|
+ policy.editSchedule();
|
|
|
+
|
|
|
+ // As per intra queue preemption algorithm, 20 more containers were needed
|
|
|
+ // for app2 (in queue a). Inter queue pre-emption had already preselected 9
|
|
|
+ // containers and hence preempted only 11 more.
|
|
|
+ verify(mDisp, times(20)).handle(argThat(
|
|
|
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
|
|
+ getAppAttemptId(1))));
|
|
|
+ verify(mDisp, never()).handle(argThat(
|
|
|
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
|
|
+ getAppAttemptId(2))));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testSkipAMContainersInInterQueuePreemption() throws IOException {
|
|
|
+ /**
|
|
|
+ * Queue structure is:
|
|
|
+ *
|
|
|
+ * <pre>
|
|
|
+ * root
|
|
|
+ * / \
|
|
|
+ * a b
|
|
|
+ * </pre>
|
|
|
+ *
|
|
|
+ * Scenario:
|
|
|
+ * Guaranteed resource of a/b are 60:40 Total cluster resource = 100
|
|
|
+ * While preempting containers during intra-queue preemption, AM containers
|
|
|
+ * will be spared for now. Verify the same.
|
|
|
+ */
|
|
|
+
|
|
|
+ // Set max preemption limit as 50%.
|
|
|
+ conf.setFloat(CapacitySchedulerConfiguration.
|
|
|
+ INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
|
|
|
+ (float) 0.5);
|
|
|
+
|
|
|
+ String labelsConfig = "=100,true;";
|
|
|
+ String nodesConfig = // n1 has no label
|
|
|
+ "n1= res=100";
|
|
|
+ String queuesConfig =
|
|
|
+ // guaranteed,max,used,pending,reserved
|
|
|
+ "root(=[100 100 100 170 0]);" + // root
|
|
|
+ "-a(=[60 100 60 50 0]);" + // a
|
|
|
+ "-b(=[40 100 40 120 0])"; // b
|
|
|
+
|
|
|
+ String appsConfig =
|
|
|
+ // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
|
|
|
+ "a\t" // app1 in a
|
|
|
+ + "(1,1,n1,,30,false,10);" + "a\t" // app2 in a
|
|
|
+ + "(1,1,n1,,10,false,20);" + "a\t" // app3 in a
|
|
|
+ + "(2,1,n1,,20,false,20);" + "b\t" // app4 in b
|
|
|
+ + "(4,1,n1,,20,false,20);" + "b\t" // app5 in a
|
|
|
+ + "(4,1,n1,,20,false,100)";
|
|
|
+
|
|
|
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
|
|
|
+ policy.editSchedule();
|
|
|
+
|
|
|
+ // Ensure that only 9 containers are preempted from app2 (sparing 1 AM)
|
|
|
+ verify(mDisp, times(11)).handle(argThat(
|
|
|
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
|
|
+ getAppAttemptId(1))));
|
|
|
+ verify(mDisp, times(9)).handle(argThat(
|
|
|
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
|
|
+ getAppAttemptId(2))));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testSkipAMContainersInInterQueuePreemptionSingleApp()
|
|
|
+ throws IOException {
|
|
|
+ /**
|
|
|
+ * Queue structure is:
|
|
|
+ *
|
|
|
+ * <pre>
|
|
|
+ * root
|
|
|
+ * / \
|
|
|
+ * a b
|
|
|
+ * </pre>
|
|
|
+ *
|
|
|
+ * Scenario:
|
|
|
+ * Guaranteed resource of a/b are 50:50 Total cluster resource = 100
|
|
|
+ * Spare Am container from a lower priority app during its preemption
|
|
|
+ * cycle. Eventhough there are more demand and no other low priority
|
|
|
+ * apps are present, still AM contaier need to soared.
|
|
|
+ */
|
|
|
+
|
|
|
+ String labelsConfig = "=100,true;";
|
|
|
+ String nodesConfig = // n1 has no label
|
|
|
+ "n1= res=100";
|
|
|
+ String queuesConfig =
|
|
|
+ // guaranteed,max,used,pending,reserved
|
|
|
+ "root(=[100 100 100 170 0]);" + // root
|
|
|
+ "-a(=[50 100 50 50 0]);" + // a
|
|
|
+ "-b(=[50 100 50 120 0])"; // b
|
|
|
+
|
|
|
+ String appsConfig =
|
|
|
+ // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
|
|
|
+ "a\t" // app1 in a
|
|
|
+ + "(1,1,n1,,10,false,10);" + "a\t" // app1 in a
|
|
|
+ + "(2,1,n1,,40,false,10);" + "b\t" // app2 in a
|
|
|
+ + "(4,1,n1,,20,false,20);" + "b\t" // app3 in b
|
|
|
+ + "(4,1,n1,,30,false,100)";
|
|
|
+
|
|
|
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
|
|
|
+ policy.editSchedule();
|
|
|
+
|
|
|
+ // Make sure that app1's Am container is spared. Only 9/10 is preempted.
|
|
|
+ verify(mDisp, times(9)).handle(argThat(
|
|
|
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
|
|
+ getAppAttemptId(1))));
|
|
|
+ verify(mDisp, never()).handle(argThat(
|
|
|
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
|
|
+ getAppAttemptId(2))));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testNoPreemptionForSingleApp() throws IOException {
|
|
|
+ /**
|
|
|
+ * Queue structure is:
|
|
|
+ *
|
|
|
+ * <pre>
|
|
|
+ * root
|
|
|
+ * / \
|
|
|
+ * a b
|
|
|
+ * </pre>
|
|
|
+ *
|
|
|
+ * Scenario:
|
|
|
+ * Guaranteed resource of a/b are 60:40 Total cluster resource = 100
|
|
|
+ * Only one app is running in queue. And it has more demand but no
|
|
|
+ * resource are available in queue. Preemption must not occur here.
|
|
|
+ */
|
|
|
+
|
|
|
+ String labelsConfig = "=100,true;";
|
|
|
+ String nodesConfig = // n1 has no label
|
|
|
+ "n1= res=100";
|
|
|
+ String queuesConfig =
|
|
|
+ // guaranteed,max,used,pending,reserved
|
|
|
+ "root(=[100 100 20 50 0]);" + // root
|
|
|
+ "-a(=[60 100 20 50 0]);" + // a
|
|
|
+ "-b(=[40 100 0 0 0])"; // b
|
|
|
+
|
|
|
+ String appsConfig =
|
|
|
+ // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
|
|
|
+ "a\t" // app1 in a
|
|
|
+ + "(4,1,n1,,20,false,50)";
|
|
|
+
|
|
|
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
|
|
|
+ policy.editSchedule();
|
|
|
+
|
|
|
+ // Ensure there are 0 preemptions since only one app is running in queue.
|
|
|
+ verify(mDisp, times(0)).handle(argThat(
|
|
|
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
|
|
+ getAppAttemptId(1))));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testOverutilizedQueueResourceWithInterQueuePreemption()
|
|
|
+ throws IOException {
|
|
|
+ /**
|
|
|
+ * Queue structure is:
|
|
|
+ *
|
|
|
+ * <pre>
|
|
|
+ * root
|
|
|
+ * / \
|
|
|
+ * a b
|
|
|
+ * </pre>
|
|
|
+ * Scenario:
|
|
|
+ * Guaranteed resource of a/b are 20:80 Total cluster resource = 100
|
|
|
+ * QueueB is under utilized and 20 resource will be released from queueA.
|
|
|
+ * 10 containers will also selected for intra-queue too but it will be
|
|
|
+ * pre-selected.
|
|
|
+ */
|
|
|
+
|
|
|
+ String labelsConfig = "=100,true;";
|
|
|
+ String nodesConfig = // n1 has no label
|
|
|
+ "n1= res=100";
|
|
|
+ String queuesConfig =
|
|
|
+ // guaranteed,max,used,pending,reserved
|
|
|
+ "root(=[100 100 100 70 0]);" + // root
|
|
|
+ "-a(=[20 100 100 30 0]);" + // a
|
|
|
+ "-b(=[80 100 0 20 0])"; // b
|
|
|
+
|
|
|
+ String appsConfig =
|
|
|
+ // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
|
|
|
+ "a\t" // app1 in a
|
|
|
+ + "(1,1,n1,,50,false,0);" + "a\t" // app1 in a
|
|
|
+ + "(3,1,n1,,50,false,30);" + "b\t" // app2 in a
|
|
|
+ + "(4,1,n1,,0,false,20)";
|
|
|
+
|
|
|
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
|
|
|
+ policy.editSchedule();
|
|
|
+
|
|
|
+ // Complete demand request from QueueB for 20 resource must be preempted.
|
|
|
+ verify(mDisp, times(20)).handle(argThat(
|
|
|
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
|
|
+ getAppAttemptId(1))));
|
|
|
+ verify(mDisp, times(0)).handle(argThat(
|
|
|
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
|
|
+ getAppAttemptId(2))));
|
|
|
+ verify(mDisp, times(0)).handle(argThat(
|
|
|
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
|
|
+ getAppAttemptId(3))));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testNodePartitionIntraQueuePreemption() throws IOException {
|
|
|
+ /**
|
|
|
+ * The simplest test of node label, Queue structure is:
|
|
|
+ *
|
|
|
+ * <pre>
|
|
|
+ * root
|
|
|
+ * / \
|
|
|
+ * a b
|
|
|
+ * </pre>
|
|
|
+ *
|
|
|
+ * Scenario:
|
|
|
+ * Both a/b can access x, and guaranteed capacity of them is 50:50. Two
|
|
|
+ * nodes, n1 has 100 x, n2 has 100 NO_LABEL 4 applications in the cluster,
|
|
|
+ * app1/app2/app3 in a, and app4/app5 in b. app1 uses 50 x, app2 uses 50
|
|
|
+ * NO_LABEL, app3 uses 50 x, app4 uses 50 NO_LABEL. a has 20 pending
|
|
|
+ * resource for x for app3 of priority 2
|
|
|
+ *
|
|
|
+ * After preemption, it should preempt 20 from app1
|
|
|
+ */
|
|
|
+
|
|
|
+ // Set max preemption limit as 50%.
|
|
|
+ conf.setFloat(CapacitySchedulerConfiguration.
|
|
|
+ INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
|
|
|
+ (float) 0.5);
|
|
|
+
|
|
|
+ String labelsConfig = "=100,true;" + // default partition
|
|
|
+ "x=100,true"; // partition=x
|
|
|
+ String nodesConfig = "n1=x;" + // n1 has partition=x
|
|
|
+ "n2="; // n2 is default partition
|
|
|
+ String queuesConfig =
|
|
|
+ // guaranteed,max,used,pending
|
|
|
+ "root(=[100 100 100 100],x=[100 100 100 100]);" + // root
|
|
|
+ "-a(=[50 100 50 50],x=[50 100 50 50]);" + // a
|
|
|
+ "-b(=[50 100 50 50],x=[50 100 50 50])"; // b
|
|
|
+ String appsConfig =
|
|
|
+ // queueName\t(priority,resource,host,expression,#repeat,reserved)
|
|
|
+ "a\t" // app1 in a
|
|
|
+ + "(1,1,n1,x,50,false,10);" + // 50 * x in n1
|
|
|
+ "a\t" // app2 in a
|
|
|
+ + "(2,1,n1,x,0,false,20);" + // 0 * x in n1
|
|
|
+ "a\t" // app2 in a
|
|
|
+ + "(1,1,n2,,50,false);" + // 50 default in n2
|
|
|
+ "b\t" // app3 in b
|
|
|
+ + "(1,1,n1,x,50,false);" + // 50 * x in n1
|
|
|
+ "b\t" // app4 in b
|
|
|
+ + "(1,1,n2,,50,false)"; // 50 default in n2
|
|
|
+
|
|
|
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
|
|
|
+ policy.editSchedule();
|
|
|
+
|
|
|
+ // 20 preempted from app1
|
|
|
+ verify(mDisp, times(20))
|
|
|
+ .handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
|
|
|
+ verify(mDisp, never())
|
|
|
+ .handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
|
|
|
+ verify(mDisp, never())
|
|
|
+ .handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(3))));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testComplexIntraQueuePreemption() throws IOException {
|
|
|
+ /**
|
|
|
+ * The complex test preemption, Queue structure is:
|
|
|
+ *
|
|
|
+ * <pre>
|
|
|
+ * root
|
|
|
+ * / | | \
|
|
|
+ * a b c d
|
|
|
+ * </pre>
|
|
|
+ *
|
|
|
+ * Scenario:
|
|
|
+ * Guaranteed resource of a/b/c/d are 10:40:20:30 Total cluster resource =
|
|
|
+ * 100
|
|
|
+ * All queues under its capacity, but within each queue there are many
|
|
|
+ * under served applications.
|
|
|
+ */
|
|
|
+
|
|
|
+ // report "ideal" preempt as 50%. Ensure preemption happens only for 50%
|
|
|
+ conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND,
|
|
|
+ (float) 0.5);
|
|
|
+ // Set max preemption limit as 50%.
|
|
|
+ conf.setFloat(CapacitySchedulerConfiguration.
|
|
|
+ INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
|
|
|
+ (float) 0.5);
|
|
|
+
|
|
|
+ String labelsConfig = "=100,true;";
|
|
|
+ String nodesConfig = // n1 has no label
|
|
|
+ "n1= res=100";
|
|
|
+ String queuesConfig =
|
|
|
+ // guaranteed,max,used,pending,reserved
|
|
|
+ "root(=[100 100 75 130 0]);" + // root
|
|
|
+ "-a(=[10 100 5 50 0]);" + // a
|
|
|
+ "-b(=[40 100 35 60 0]);" + // b
|
|
|
+ "-c(=[20 100 10 10 0]);" + // c
|
|
|
+ "-d(=[30 100 25 10 0])"; // d
|
|
|
+
|
|
|
+ String appsConfig =
|
|
|
+ // queueName\t(priority,resource,host,expression,#repeat,reserved,
|
|
|
+ // pending)
|
|
|
+ "a\t" // app1 in a
|
|
|
+ + "(1,1,n1,,5,false,25);" + // app1 a
|
|
|
+ "a\t"
|
|
|
+ + "(4,1,n1,,0,false,25);" + // app2 a
|
|
|
+ "a\t"
|
|
|
+ + "(5,1,n1,,0,false,2);" + // app3 a
|
|
|
+ "b\t"
|
|
|
+ + "(3,1,n1,,5,false,20);" + // app4 b
|
|
|
+ "b\t"
|
|
|
+ + "(4,1,n1,,15,false,10);" + // app5 b
|
|
|
+ "b\t"
|
|
|
+ + "(4,1,n1,,10,false,10);" + // app6 b
|
|
|
+ "b\t"
|
|
|
+ + "(5,1,n1,,3,false,5);" + // app7 b
|
|
|
+ "b\t"
|
|
|
+ + "(5,1,n1,,0,false,2);" + // app8 b
|
|
|
+ "b\t"
|
|
|
+ + "(6,1,n1,,2,false,10);" + // app9 in b
|
|
|
+ "c\t"
|
|
|
+ + "(1,1,n1,,8,false,10);" + // app10 in c
|
|
|
+ "c\t"
|
|
|
+ + "(1,1,n1,,2,false,5);" + // app11 in c
|
|
|
+ "c\t"
|
|
|
+ + "(2,1,n1,,0,false,3);" + "d\t" // app12 in c
|
|
|
+ + "(2,1,n1,,25,false,0);" + "d\t" // app13 in d
|
|
|
+ + "(1,1,n1,,0,false,20)";
|
|
|
+
|
|
|
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
|
|
|
+ policy.editSchedule();
|
|
|
+
|
|
|
+ // High priority app in queueA has 30 resource demand. But low priority
|
|
|
+ // app has only 5 resource. Hence preempt 4 here sparing AM.
|
|
|
+ verify(mDisp, times(4)).handle(argThat(
|
|
|
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
|
|
+ getAppAttemptId(1))));
|
|
|
+ // Multiple high priority apps has demand of 17. This will be preempted
|
|
|
+ // from another set of low priority apps.
|
|
|
+ verify(mDisp, times(4)).handle(argThat(
|
|
|
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
|
|
+ getAppAttemptId(4))));
|
|
|
+ verify(mDisp, times(9)).handle(argThat(
|
|
|
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
|
|
+ getAppAttemptId(6))));
|
|
|
+ verify(mDisp, times(4)).handle(argThat(
|
|
|
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
|
|
+ getAppAttemptId(5))));
|
|
|
+ // Only 3 resources will be freed in this round for queue C as we
|
|
|
+ // are trying to save AM container.
|
|
|
+ verify(mDisp, times(2)).handle(argThat(
|
|
|
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
|
|
+ getAppAttemptId(10))));
|
|
|
+ verify(mDisp, times(1)).handle(argThat(
|
|
|
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
|
|
+ getAppAttemptId(11))));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testIntraQueuePreemptionWithTwoUsers()
|
|
|
+ throws IOException {
|
|
|
+ /**
|
|
|
+ * Queue structure is:
|
|
|
+ *
|
|
|
+ * <pre>
|
|
|
+ * root
|
|
|
+ * / \
|
|
|
+ * a b
|
|
|
+ * </pre>
|
|
|
+ *
|
|
|
+ * Scenario:
|
|
|
+ * Guaranteed resource of a/b are 40:60 Total cluster resource = 100
|
|
|
+ * Consider 2 users in a queue, assume minimum user limit factor is 50%.
|
|
|
+ * Hence in queueB of 40, each use has a quota of 20. app4 of high priority
|
|
|
+ * has a demand of 30 and its already using 5. Adhering to userlimit only
|
|
|
+ * 15 more must be preempted. If its same user,20 would have been preempted
|
|
|
+ */
|
|
|
+
|
|
|
+ // Set max preemption limit as 50%.
|
|
|
+ conf.setFloat(CapacitySchedulerConfiguration.
|
|
|
+ INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
|
|
|
+ (float) 0.5);
|
|
|
+
|
|
|
+ String labelsConfig = "=100,true;";
|
|
|
+ String nodesConfig = // n1 has no label
|
|
|
+ "n1= res=100";
|
|
|
+ String queuesConfig =
|
|
|
+ // guaranteed,max,used,pending,reserved
|
|
|
+ "root(=[100 100 55 170 0]);" + // root
|
|
|
+ "-a(=[60 100 10 50 0]);" + // a
|
|
|
+ "-b(=[40 100 40 120 0])"; // b
|
|
|
+
|
|
|
+ String appsConfig =
|
|
|
+ // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
|
|
|
+ "a\t" // app1 in a
|
|
|
+ + "(1,1,n1,,5,false,25);" + // app1 a
|
|
|
+ "a\t" // app2 in a
|
|
|
+ + "(2,1,n1,,5,false,25);" + // app2 a
|
|
|
+ "b\t" // app3 in b
|
|
|
+ + "(4,1,n1,,35,false,20,user1);" + // app3 b
|
|
|
+ "b\t" // app4 in b
|
|
|
+ + "(6,1,n1,,5,false,30,user2)";
|
|
|
+
|
|
|
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
|
|
|
+ policy.editSchedule();
|
|
|
+
|
|
|
+ // Considering user-limit of 50% since only 2 users are there, only preempt
|
|
|
+ // 15 more (5 is already running) eventhough demand is for 30.
|
|
|
+ verify(mDisp, times(15)).handle(argThat(
|
|
|
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
|
|
+ getAppAttemptId(3))));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testComplexNodePartitionIntraQueuePreemption()
|
|
|
+ throws IOException {
|
|
|
+ /**
|
|
|
+ * The simplest test of node label, Queue structure is:
|
|
|
+ *
|
|
|
+ * <pre>
|
|
|
+ * root
|
|
|
+ * / \
|
|
|
+ * a b
|
|
|
+ * </pre>
|
|
|
+ *
|
|
|
+ * Scenario:
|
|
|
+ * Both a/b can access x, and guaranteed capacity of them is 50:50. Two
|
|
|
+ * nodes, n1 has 100 x, n2 has 100 NO_LABEL 4 applications in the cluster,
|
|
|
+ * app1-app4 in a, and app5-app9 in b.
|
|
|
+ *
|
|
|
+ */
|
|
|
+
|
|
|
+ // Set max preemption limit as 50%.
|
|
|
+ conf.setFloat(CapacitySchedulerConfiguration.
|
|
|
+ INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
|
|
|
+ (float) 0.5);
|
|
|
+
|
|
|
+ String labelsConfig = "=100,true;" + // default partition
|
|
|
+ "x=100,true"; // partition=x
|
|
|
+ String nodesConfig = "n1=x;" + // n1 has partition=x
|
|
|
+ "n2="; // n2 is default partition
|
|
|
+ String queuesConfig =
|
|
|
+ // guaranteed,max,used,pending
|
|
|
+ "root(=[100 100 100 100],x=[100 100 100 100]);" + // root
|
|
|
+ "-a(=[50 100 50 50],x=[50 100 40 50]);" + // a
|
|
|
+ "-b(=[50 100 35 50],x=[50 100 50 50])"; // b
|
|
|
+ String appsConfig =
|
|
|
+ // queueName\t(priority,resource,host,expression,#repeat,reserved)
|
|
|
+ "a\t" // app1 in a
|
|
|
+ + "(1,1,n1,x,35,false,10);" + // 20 * x in n1
|
|
|
+ "a\t" // app2 in a
|
|
|
+ + "(1,1,n1,x,5,false,10);" + // 20 * x in n1
|
|
|
+ "a\t" // app3 in a
|
|
|
+ + "(2,1,n1,x,0,false,20);" + // 0 * x in n1
|
|
|
+ "a\t" // app4 in a
|
|
|
+ + "(1,1,n2,,50,false);" + // 50 default in n2
|
|
|
+ "b\t" // app5 in b
|
|
|
+ + "(1,1,n1,x,50,false);" + // 50 * x in n1
|
|
|
+ "b\t" // app6 in b
|
|
|
+ + "(1,1,n2,,25,false);" + // 25 * default in n2
|
|
|
+ "b\t" // app7 in b
|
|
|
+ + "(1,1,n2,,3,false);" + // 3 * default in n2
|
|
|
+ "b\t" // app8 in b
|
|
|
+ + "(1,1,n2,,2,false);" + // 2 * default in n2
|
|
|
+ "b\t" // app9 in b
|
|
|
+ + "(5,1,n2,,5,false,30)"; // 50 default in n2
|
|
|
+
|
|
|
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
|
|
|
+ policy.editSchedule();
|
|
|
+
|
|
|
+ // Label X: app3 has demand of 20 for label X. Hence app2 will loose
|
|
|
+ // 4 (sparing AM) and 16 more from app1 till preemption limit is met.
|
|
|
+ verify(mDisp, times(16))
|
|
|
+ .handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
|
|
|
+ verify(mDisp, times(4))
|
|
|
+ .handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
|
|
|
+
|
|
|
+ // Default Label:For a demand of 30, preempt from all low priority
|
|
|
+ // apps of default label. 25 will be preempted as preemption limit is
|
|
|
+ // met.
|
|
|
+ verify(mDisp, times(1))
|
|
|
+ .handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(8))));
|
|
|
+ verify(mDisp, times(2))
|
|
|
+ .handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(7))));
|
|
|
+ verify(mDisp, times(22))
|
|
|
+ .handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(6))));
|
|
|
+ }
|
|
|
+}
|