|
@@ -19,9 +19,11 @@
|
|
|
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
|
|
|
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
|
|
|
+import org.apache.hadoop.yarn.api.records.Resource;
|
|
|
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.mockframework.ProportionalCapacityPreemptionPolicyMockFramework;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
|
|
|
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
|
|
|
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
|
|
import org.junit.Before;
|
|
@@ -29,8 +31,10 @@ import org.junit.Test;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
|
|
|
+import static org.mockito.ArgumentMatchers.any;
|
|
|
import static org.mockito.ArgumentMatchers.argThat;
|
|
|
import static org.mockito.Mockito.never;
|
|
|
+import static org.mockito.Mockito.reset;
|
|
|
import static org.mockito.Mockito.times;
|
|
|
import static org.mockito.Mockito.verify;
|
|
|
import static org.mockito.Mockito.when;
|
|
@@ -174,4 +178,72 @@ public class TestProportionalCapacityPreemptionPolicyInterQueueWithDRF
|
|
|
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
|
|
getAppAttemptId(2))));
|
|
|
}
|
|
|
+
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ @Test
|
|
|
+ public void testInterQueuePreemptionWithStrictAndRelaxedDRF()
|
|
|
+ throws IOException {
|
|
|
+
|
|
|
+ /*
|
|
|
+ * root
|
|
|
+ * / \ \
|
|
|
+ * a b c
|
|
|
+ *
|
|
|
+ * A / B / C have 33.3 / 33.3 / 33.4 resources
|
|
|
+ * Total cluster resource have mem=61440, cpu=600
|
|
|
+ *
|
|
|
+ * +=================+========================+
|
|
|
+ * | used in queue a | user limit for queue a |
|
|
|
+ * +=================+========================+
|
|
|
+ * | 61440:60 | 20480:200 |
|
|
|
+ * +=================+========================+
|
|
|
+ * In this case, the used memory is over the user limit but the used vCores
|
|
|
+ * is not. If conservative DRF is true, preemptions will not occur.
|
|
|
+ * If conservative DRF is false (default) preemptions will occur.
|
|
|
+ */
|
|
|
+ String labelsConfig = "=61440:600,true;";
|
|
|
+ String nodesConfig = "n1= res=61440:600"; // n1 is default partition
|
|
|
+ String queuesConfig =
|
|
|
+ // guaranteed,max,used,pending,reserved
|
|
|
+ "root(=[61440:600 61440:600 61440:600 20480:20 0]);" + // root
|
|
|
+ "-a(=[20480:200 61440:600 61440:60 0:0 0]);" + // b
|
|
|
+ "-b(=[20480:200 61440:600 0:0 20480:20 0]);" + // a
|
|
|
+ "-c(=[20480:200 61440:600 0:0 0:0 0])"; // c
|
|
|
+ String appsConfig =
|
|
|
+ //queueName\t(priority,resource,host,expression,#repeat,reserved)
|
|
|
+ "a\t" + "(1,1024:1,n1,,60,false,0:0,user1);" + // app1 in a
|
|
|
+ "b\t" + "(1,0:0,n1,,0,false,20480:20,user2);"; // app2 in b
|
|
|
+
|
|
|
+ conf.setBoolean(
|
|
|
+ CapacitySchedulerConfiguration.CROSS_QUEUE_PREEMPTION_CONSERVATIVE_DRF,
|
|
|
+ true);
|
|
|
+
|
|
|
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
|
|
|
+ Resource ul = Resource.newInstance(20480, 20);
|
|
|
+ when(((LeafQueue)(cs.getQueue("root.a")))
|
|
|
+ .getResourceLimitForAllUsers(any(), any(), any(), any())
|
|
|
+ ).thenReturn(ul);
|
|
|
+ policy.editSchedule();
|
|
|
+
|
|
|
+ verify(eventHandler, times(0)).handle(argThat(
|
|
|
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
|
|
+ getAppAttemptId(1))));
|
|
|
+
|
|
|
+ reset(eventHandler);
|
|
|
+
|
|
|
+ conf.setBoolean(
|
|
|
+ CapacitySchedulerConfiguration.CROSS_QUEUE_PREEMPTION_CONSERVATIVE_DRF,
|
|
|
+ false);
|
|
|
+
|
|
|
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
|
|
|
+ ul = Resource.newInstance(20480, 20);
|
|
|
+ when(((LeafQueue)(cs.getQueue("root.a")))
|
|
|
+ .getResourceLimitForAllUsers(any(), any(), any(), any())
|
|
|
+ ).thenReturn(ul);
|
|
|
+ policy.editSchedule();
|
|
|
+
|
|
|
+ verify(eventHandler, times(20)).handle(argThat(
|
|
|
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
|
|
+ getAppAttemptId(1))));
|
|
|
+ }
|
|
|
}
|