|
@@ -18,11 +18,14 @@
|
|
|
|
|
|
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
|
|
|
|
|
|
+import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
|
|
|
import org.apache.hadoop.yarn.api.records.Resource;
|
|
|
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.mockframework.ProportionalCapacityPreemptionPolicyMockFramework;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
|
|
|
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
|
|
|
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
|
|
import org.junit.Before;
|
|
|
import org.junit.Test;
|
|
|
|
|
@@ -189,6 +192,86 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueueWithDRF
|
|
|
getAppAttemptId(1))));
|
|
|
}
|
|
|
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ @Test
|
|
|
+ public void testIntraQueuePreemptionFairOrdering3ResourcesWithStrictAndRelaxedDRF()
|
|
|
+ throws IOException {
|
|
|
+ /**
|
|
|
+ * Continue to allow intra-queue preemption when only one of the user's
|
|
|
+ * resources is above the user limit.
|
|
|
+ * Queue structure is:
|
|
|
+ *
|
|
|
+ * <pre>
|
|
|
+ * root
|
|
|
+ * / |
|
|
|
+ * a b
|
|
|
+ * </pre>
|
|
|
+ *
|
|
|
+ * Guaranteed resource of a and b are 30720:300 and 30720:300 Total cluster
|
|
|
+ * resource = 61440:600.
|
|
|
+ * Scenario: Queue B has one running app using 61720:60 resources with no
|
|
|
+ * pending resources, and one app with no used resources and 30720:30
|
|
|
+ * pending resources.
|
|
|
+ *
|
|
|
+ * The first part of the test is to show what happens when the conservative
|
|
|
+ * DRF property is set. Since the memory is above and the vcores is below
|
|
|
+ * the user limit, only the minimum number of containers is allowed.
|
|
|
+ * In the second part, since conservative DRF is relaxed, all containers
|
|
|
+ * needed are allowed to be preempted (minus the AM size).
|
|
|
+ */
|
|
|
+
|
|
|
+ conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
|
|
|
+ "userlimit_first");
|
|
|
+ conf.set(CapacitySchedulerConfiguration.PREFIX
|
|
|
+ + "root.b." + CapacitySchedulerConfiguration.ORDERING_POLICY, "fair");
|
|
|
+ conf.setBoolean(
|
|
|
+ CapacitySchedulerConfiguration.IN_QUEUE_PREEMPTION_CONSERVATIVE_DRF,
|
|
|
+ true);
|
|
|
+ String RESOURCE_1 = "res1";
|
|
|
+ riMap.put(RESOURCE_1, ResourceInformation
|
|
|
+ .newInstance(RESOURCE_1, "", 0, ResourceTypes.COUNTABLE, 0,
|
|
|
+ Integer.MAX_VALUE));
|
|
|
+
|
|
|
+ ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
|
|
|
+
|
|
|
+ String labelsConfig = "=61440:600,true;";
|
|
|
+ String nodesConfig = // n1 has no label
|
|
|
+ "n1= res=61440:600";
|
|
|
+ String queuesConfig =
|
|
|
+ // guaranteed,max,used,pending,reserved
|
|
|
+ "root(=[61440:600 61440:600 61440:600 30720:30 0]);" + // root
|
|
|
+ "-a(=[30720:300 61440:600 0:0 0:0 0]);" + // a
|
|
|
+ "-b(=[30720:300 61440:600 61440:60 30720:30 0]);"; // b
|
|
|
+
|
|
|
+ String appsConfig =
|
|
|
+ "b\t" + "(1,1024:1,n1,,60,false,0:0,user1);" + // app1 in b
|
|
|
+ "b\t" + "(1,0:0,n1,,0,false,30720:30,user3);"; // app2 in b
|
|
|
+
|
|
|
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
|
|
|
+ Resource ul = Resource.newInstance(30720, 300);
|
|
|
+ when(((LeafQueue)(cs.getQueue("root.b")))
|
|
|
+ .getResourceLimitForAllUsers(any(), any(), any(), any())
|
|
|
+ ).thenReturn(ul);
|
|
|
+ policy.editSchedule();
|
|
|
+
|
|
|
+ verify(eventHandler, times(0)).handle(argThat(
|
|
|
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
|
|
+ getAppAttemptId(1))));
|
|
|
+ reset(eventHandler);
|
|
|
+
|
|
|
+ conf.setBoolean(
|
|
|
+ CapacitySchedulerConfiguration.IN_QUEUE_PREEMPTION_CONSERVATIVE_DRF,
|
|
|
+ false);
|
|
|
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
|
|
|
+ when(((LeafQueue)(cs.getQueue("root.b")))
|
|
|
+ .getResourceLimitForAllUsers(any(), any(), any(), any())
|
|
|
+ ).thenReturn(ul);
|
|
|
+ policy.editSchedule();
|
|
|
+ verify(eventHandler, times(29)).handle(argThat(
|
|
|
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
|
|
+ getAppAttemptId(1))));
|
|
|
+ }
|
|
|
+
|
|
|
@Test
|
|
|
public void testIntraQueuePreemptionWithDominantVCoreResource()
|
|
|
throws IOException {
|