|
@@ -110,15 +110,17 @@ public class TestParentQueue {
|
|
|
|
|
|
private static final String A = "a";
|
|
|
private static final String B = "b";
|
|
|
+ private static final String Q_A =
|
|
|
+ CapacitySchedulerConfiguration.ROOT + "." + A;
|
|
|
+ private static final String Q_B =
|
|
|
+ CapacitySchedulerConfiguration.ROOT + "." + B;
|
|
|
private void setupSingleLevelQueues(CapacitySchedulerConfiguration conf) {
|
|
|
|
|
|
// Define top-level queues
|
|
|
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {A, B});
|
|
|
|
|
|
- final String Q_A = CapacitySchedulerConfiguration.ROOT + "." + A;
|
|
|
conf.setCapacity(Q_A, 30);
|
|
|
|
|
|
- final String Q_B = CapacitySchedulerConfiguration.ROOT + "." + B;
|
|
|
conf.setCapacity(Q_B, 70);
|
|
|
|
|
|
LOG.info("Setup top-level queues a and b");
|
|
@@ -130,11 +132,9 @@ public class TestParentQueue {
|
|
|
// Define top-level queues
|
|
|
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[]{A, B});
|
|
|
|
|
|
- final String Q_A = CapacitySchedulerConfiguration.ROOT + "." + A;
|
|
|
conf.setMinimumResourceRequirement("", Q_A,
|
|
|
QUEUE_A_RESOURCE);
|
|
|
|
|
|
- final String Q_B = CapacitySchedulerConfiguration.ROOT + "." + B;
|
|
|
conf.setMinimumResourceRequirement("", Q_B,
|
|
|
QUEUE_B_RESOURCE);
|
|
|
|
|
@@ -370,9 +370,7 @@ public class TestParentQueue {
|
|
|
public void testSingleLevelQueuesPrecision() throws Exception {
|
|
|
// Setup queue configs
|
|
|
setupSingleLevelQueues(csConf);
|
|
|
- final String Q_A = CapacitySchedulerConfiguration.ROOT + "." + "a";
|
|
|
csConf.setCapacity(Q_A, 30);
|
|
|
- final String Q_B = CapacitySchedulerConfiguration.ROOT + "." + "b";
|
|
|
csConf.setCapacity(Q_B, 70.5F);
|
|
|
|
|
|
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
|
|
@@ -436,10 +434,8 @@ public class TestParentQueue {
|
|
|
// Define top-level queues
|
|
|
csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {A, B, C, D});
|
|
|
|
|
|
- final String Q_A = CapacitySchedulerConfiguration.ROOT + "." + A;
|
|
|
conf.setCapacity(Q_A, 10);
|
|
|
|
|
|
- final String Q_B = CapacitySchedulerConfiguration.ROOT + "." + B;
|
|
|
conf.setCapacity(Q_B, 50);
|
|
|
|
|
|
final String Q_C = CapacitySchedulerConfiguration.ROOT + "." + C;
|
|
@@ -658,7 +654,6 @@ public class TestParentQueue {
|
|
|
setupMultiLevelQueues(csConf);
|
|
|
|
|
|
// set child queues capacity to 0 when parents not 0
|
|
|
- final String Q_B = CapacitySchedulerConfiguration.ROOT + "." + B;
|
|
|
csConf.setCapacity(Q_B + "." + B1, 0);
|
|
|
csConf.setCapacity(Q_B + "." + B2, 0);
|
|
|
csConf.setCapacity(Q_B + "." + B3, 0);
|
|
@@ -675,9 +670,7 @@ public class TestParentQueue {
|
|
|
setupMultiLevelQueues(csConf);
|
|
|
|
|
|
// set parent capacity to 0 when child not 0
|
|
|
- final String Q_B = CapacitySchedulerConfiguration.ROOT + "." + B;
|
|
|
csConf.setCapacity(Q_B, 0);
|
|
|
- final String Q_A = CapacitySchedulerConfiguration.ROOT + "." + A;
|
|
|
csConf.setCapacity(Q_A, 60);
|
|
|
|
|
|
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
|
|
@@ -692,13 +685,11 @@ public class TestParentQueue {
|
|
|
setupMultiLevelQueues(csConf);
|
|
|
|
|
|
// set parent and child capacity to 0
|
|
|
- final String Q_B = CapacitySchedulerConfiguration.ROOT + "." + B;
|
|
|
csConf.setCapacity(Q_B, 0);
|
|
|
csConf.setCapacity(Q_B + "." + B1, 0);
|
|
|
csConf.setCapacity(Q_B + "." + B2, 0);
|
|
|
csConf.setCapacity(Q_B + "." + B3, 0);
|
|
|
|
|
|
- final String Q_A = CapacitySchedulerConfiguration.ROOT + "." + A;
|
|
|
csConf.setCapacity(Q_A, 60);
|
|
|
|
|
|
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
|
|
@@ -1026,10 +1017,125 @@ public class TestParentQueue {
|
|
|
QUEUE_B_RESOURCE_70PERC);
|
|
|
}
|
|
|
|
|
|
+ @Test
|
|
|
+ public void testDeriveCapacityFromAbsoluteConfigurations() throws Exception {
|
|
|
+ // Setup queue configs
|
|
|
+ setupSingleLevelQueuesWithAbsoluteResource(csConf);
|
|
|
+
|
|
|
+ CSQueueStore queues = new CSQueueStore();
|
|
|
+ CSQueue root = CapacitySchedulerQueueManager.parseQueue(csContext, csConf,
|
|
|
+ null, CapacitySchedulerConfiguration.ROOT, queues, queues,
|
|
|
+ TestUtils.spyHook);
|
|
|
+
|
|
|
+ // Setup some nodes
|
|
|
+ int numNodes = 2;
|
|
|
+ final long memoryPerNode = (QUEUE_A_RESOURCE.getMemorySize() +
|
|
|
+ QUEUE_B_RESOURCE.getMemorySize()) / numNodes;
|
|
|
+ int coresPerNode = (QUEUE_A_RESOURCE.getVirtualCores() +
|
|
|
+ QUEUE_B_RESOURCE.getVirtualCores()) / numNodes;
|
|
|
+
|
|
|
+ Resource clusterResource = Resources.createResource(
|
|
|
+ numNodes * memoryPerNode, numNodes * coresPerNode);
|
|
|
+ when(csContext.getNumClusterNodes()).thenReturn(numNodes);
|
|
|
+ root.updateClusterResource(clusterResource,
|
|
|
+ new ResourceLimits(clusterResource));
|
|
|
+
|
|
|
+ // Start testing
|
|
|
+ // Only MaximumSystemApplications is set in csConf
|
|
|
+ LeafQueue a = (LeafQueue) queues.get(A);
|
|
|
+ LeafQueue b = (LeafQueue) queues.get(B);
|
|
|
+
|
|
|
+ float queueAScale = (float) QUEUE_A_RESOURCE.getMemorySize() /
|
|
|
+ (float) clusterResource.getMemorySize();
|
|
|
+ float queueBScale = (float) QUEUE_B_RESOURCE.getMemorySize() /
|
|
|
+ (float) clusterResource.getMemorySize();
|
|
|
+
|
|
|
+ assertEquals(queueAScale, a.getQueueCapacities().getCapacity(),
|
|
|
+ DELTA);
|
|
|
+ assertEquals(1f, a.getQueueCapacities().getMaximumCapacity(),
|
|
|
+ DELTA);
|
|
|
+ assertEquals(queueAScale, a.getQueueCapacities().getAbsoluteCapacity(),
|
|
|
+ DELTA);
|
|
|
+ assertEquals(1f,
|
|
|
+ a.getQueueCapacities().getAbsoluteMaximumCapacity(), DELTA);
|
|
|
+ assertEquals((int) (csConf.getMaximumSystemApplications() * queueAScale),
|
|
|
+ a.getMaxApplications());
|
|
|
+ assertEquals(a.getMaxApplications(), a.getMaxApplicationsPerUser());
|
|
|
+
|
|
|
+ assertEquals(queueBScale,
|
|
|
+ b.getQueueCapacities().getCapacity(), DELTA);
|
|
|
+ assertEquals(1f,
|
|
|
+ b.getQueueCapacities().getMaximumCapacity(), DELTA);
|
|
|
+ assertEquals(queueBScale,
|
|
|
+ b.getQueueCapacities().getAbsoluteCapacity(), DELTA);
|
|
|
+ assertEquals(1f,
|
|
|
+ b.getQueueCapacities().getAbsoluteMaximumCapacity(), DELTA);
|
|
|
+ assertEquals((int) (csConf.getMaximumSystemApplications() * queueBScale),
|
|
|
+ b.getMaxApplications());
|
|
|
+ assertEquals(b.getMaxApplications(), b.getMaxApplicationsPerUser());
|
|
|
+
|
|
|
+ // Set GlobalMaximumApplicationsPerQueue in csConf
|
|
|
+ csConf.setGlobalMaximumApplicationsPerQueue(20000);
|
|
|
+ root.updateClusterResource(clusterResource,
|
|
|
+ new ResourceLimits(clusterResource));
|
|
|
+
|
|
|
+ assertEquals((int) (csConf.getGlobalMaximumApplicationsPerQueue() *
|
|
|
+ queueAScale), a.getMaxApplications());
|
|
|
+ assertEquals(a.getMaxApplications(), a.getMaxApplicationsPerUser());
|
|
|
+ assertEquals((int) (csConf.getGlobalMaximumApplicationsPerQueue() *
|
|
|
+ queueBScale), b.getMaxApplications());
|
|
|
+ assertEquals(b.getMaxApplications(), b.getMaxApplicationsPerUser());
|
|
|
+
|
|
|
+ // Set MaximumApplicationsPerQueue in csConf
|
|
|
+ int queueAMaxApplications = 30000;
|
|
|
+ int queueBMaxApplications = 30000;
|
|
|
+ csConf.set("yarn.scheduler.capacity." + Q_A + ".maximum-applications",
|
|
|
+ Integer.toString(queueAMaxApplications));
|
|
|
+ csConf.set("yarn.scheduler.capacity." + Q_B + ".maximum-applications",
|
|
|
+ Integer.toString(queueBMaxApplications));
|
|
|
+ root.updateClusterResource(clusterResource,
|
|
|
+ new ResourceLimits(clusterResource));
|
|
|
+
|
|
|
+ assertEquals(queueAMaxApplications, a.getMaxApplications());
|
|
|
+ assertEquals(a.getMaxApplications(), a.getMaxApplicationsPerUser());
|
|
|
+ assertEquals(queueBMaxApplications, b.getMaxApplications());
|
|
|
+ assertEquals(b.getMaxApplications(), b.getMaxApplicationsPerUser());
|
|
|
+
|
|
|
+ // Extra cases for testing maxApplicationsPerUser
|
|
|
+ int halfPercent = 50;
|
|
|
+ int oneAndQuarterPercent = 125;
|
|
|
+ a.getUsersManager().setUserLimit(halfPercent);
|
|
|
+ b.getUsersManager().setUserLimit(oneAndQuarterPercent);
|
|
|
+ root.updateClusterResource(clusterResource,
|
|
|
+ new ResourceLimits(clusterResource));
|
|
|
+
|
|
|
+ assertEquals(a.getMaxApplications() * halfPercent / 100,
|
|
|
+ a.getMaxApplicationsPerUser());
|
|
|
+ // Q_B's limit per user shouldn't be greater
|
|
|
+ // than the whole queue's application limit
|
|
|
+ assertEquals(b.getMaxApplications(), b.getMaxApplicationsPerUser());
|
|
|
+
|
|
|
+ float userLimitFactorQueueA = 0.9f;
|
|
|
+ float userLimitFactorQueueB = 1.1f;
|
|
|
+ a.getUsersManager().setUserLimit(halfPercent);
|
|
|
+ a.getUsersManager().setUserLimitFactor(userLimitFactorQueueA);
|
|
|
+ b.getUsersManager().setUserLimit(100);
|
|
|
+ b.getUsersManager().setUserLimitFactor(userLimitFactorQueueB);
|
|
|
+ root.updateClusterResource(clusterResource,
|
|
|
+ new ResourceLimits(clusterResource));
|
|
|
+
|
|
|
+ assertEquals((int) (a.getMaxApplications() * halfPercent *
|
|
|
+ userLimitFactorQueueA / 100), a.getMaxApplicationsPerUser());
|
|
|
+ // Q_B's limit per user shouldn't be greater
|
|
|
+ // than the whole queue's application limit
|
|
|
+ assertEquals(b.getMaxApplications(), b.getMaxApplicationsPerUser());
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
@After
|
|
|
public void tearDown() throws Exception {
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
private ResourceLimits anyResourceLimits() {
|
|
|
return any(ResourceLimits.class);
|
|
|
}
|