|
@@ -17,17 +17,25 @@
|
|
|
*/
|
|
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
|
|
|
|
|
+import com.google.common.collect.ImmutableMap;
|
|
|
+import com.google.common.collect.ImmutableSet;
|
|
|
import org.apache.commons.lang.math.RandomUtils;
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
+import org.apache.hadoop.yarn.api.records.NodeId;
|
|
|
+import org.apache.hadoop.yarn.api.records.Resource;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
|
|
import org.apache.hadoop.yarn.event.Event;
|
|
|
import org.apache.hadoop.yarn.event.EventHandler;
|
|
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels
|
|
|
+ .NullRMNodeLabelsManager;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.placement
|
|
|
.ApplicationPlacementContext;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.placement
|
|
@@ -51,13 +59,17 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event
|
|
|
.SchedulerEvent;
|
|
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
|
|
+import org.apache.hadoop.yarn.util.resource.Resources;
|
|
|
import org.junit.After;
|
|
|
import org.junit.Assert;
|
|
|
import org.junit.Before;
|
|
|
|
|
|
+import java.io.IOException;
|
|
|
import java.util.ArrayList;
|
|
|
+import java.util.HashMap;
|
|
|
import java.util.HashSet;
|
|
|
import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
import java.util.Set;
|
|
|
import java.util.concurrent.BlockingQueue;
|
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
@@ -73,13 +85,14 @@ import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
|
|
|
.capacity.CapacitySchedulerConfiguration.FAIR_APP_ORDERING_POLICY;
|
|
|
import static org.junit.Assert.assertEquals;
|
|
|
import static org.junit.Assert.assertNotNull;
|
|
|
+import static org.junit.Assert.assertTrue;
|
|
|
import static org.junit.Assert.fail;
|
|
|
|
|
|
public class TestCapacitySchedulerAutoCreatedQueueBase {
|
|
|
|
|
|
private static final Log LOG = LogFactory.getLog(
|
|
|
TestCapacitySchedulerAutoCreatedQueueBase.class);
|
|
|
- public final int GB = 1024;
|
|
|
+ public static final int GB = 1024;
|
|
|
public final static ContainerUpdates NULL_UPDATE_REQUESTS =
|
|
|
new ContainerUpdates();
|
|
|
|
|
@@ -107,6 +120,12 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
|
|
|
public static final float C1_CAPACITY = 20f;
|
|
|
public static final float C2_CAPACITY = 20f;
|
|
|
|
|
|
+ public static final int NODE_MEMORY = 16;
|
|
|
+
|
|
|
+ public static final int NODE1_VCORES = 16;
|
|
|
+ public static final int NODE2_VCORES = 32;
|
|
|
+ public static final int NODE3_VCORES = 48;
|
|
|
+
|
|
|
public static final String USER = "user_";
|
|
|
public static final String USER0 = USER + 0;
|
|
|
public static final String USER1 = USER + 1;
|
|
@@ -120,6 +139,9 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
|
|
|
public static final String NODEL_LABEL_SSD = "SSD";
|
|
|
|
|
|
protected MockRM mockRM = null;
|
|
|
+ protected MockNM nm1 = null;
|
|
|
+ protected MockNM nm2 = null;
|
|
|
+ protected MockNM nm3 = null;
|
|
|
protected CapacityScheduler cs;
|
|
|
private final TestCapacityScheduler tcs = new TestCapacityScheduler();
|
|
|
protected SpyDispatcher dispatcher;
|
|
@@ -163,16 +185,43 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
|
|
|
|
|
|
setupQueueMappings(conf);
|
|
|
|
|
|
- mockRM = new MockRM(conf);
|
|
|
- cs = (CapacityScheduler) mockRM.getResourceScheduler();
|
|
|
-
|
|
|
dispatcher = new SpyDispatcher();
|
|
|
rmAppEventEventHandler = new SpyDispatcher.SpyRMAppEventHandler();
|
|
|
dispatcher.register(RMAppEventType.class, rmAppEventEventHandler);
|
|
|
+
|
|
|
+ RMNodeLabelsManager mgr = setupNodeLabelManager(conf);
|
|
|
+
|
|
|
+ mockRM = new MockRM(conf) {
|
|
|
+ protected RMNodeLabelsManager createNodeLabelManager() {
|
|
|
+ return mgr;
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ cs = (CapacityScheduler) mockRM.getResourceScheduler();
|
|
|
cs.updatePlacementRules();
|
|
|
mockRM.start();
|
|
|
-
|
|
|
cs.start();
|
|
|
+
|
|
|
+ setupNodes(mockRM);
|
|
|
+ }
|
|
|
+
|
|
|
+ protected void setupNodes(MockRM newMockRM) throws Exception {
|
|
|
+ nm1 = // label = SSD
|
|
|
+ new MockNM("h1:1234", NODE_MEMORY * GB, NODE1_VCORES, newMockRM
|
|
|
+ .getResourceTrackerService());
|
|
|
+ nm1.registerNode();
|
|
|
+
|
|
|
+ nm2 = // label = GPU
|
|
|
+ new MockNM("h2:1234", NODE_MEMORY * GB, NODE2_VCORES, newMockRM
|
|
|
+ .getResourceTrackerService
|
|
|
+ ());
|
|
|
+ nm2.registerNode();
|
|
|
+
|
|
|
+ nm3 = // label = ""
|
|
|
+ new MockNM("h3:1234", NODE_MEMORY * GB, NODE3_VCORES, newMockRM
|
|
|
+ .getResourceTrackerService
|
|
|
+ ());
|
|
|
+ nm3.registerNode();
|
|
|
}
|
|
|
|
|
|
public static CapacitySchedulerConfiguration setupQueueMappings(
|
|
@@ -340,21 +389,23 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
|
|
|
return queueMappings;
|
|
|
}
|
|
|
|
|
|
- protected MockRM setupSchedulerInstance() {
|
|
|
+ protected MockRM setupSchedulerInstance() throws Exception {
|
|
|
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
|
|
setupQueueConfiguration(conf);
|
|
|
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
|
|
ResourceScheduler.class);
|
|
|
|
|
|
- List<String> queuePlacementRules = new ArrayList<String>();
|
|
|
- queuePlacementRules.add(YarnConfiguration.USER_GROUP_PLACEMENT_RULE);
|
|
|
- conf.setQueuePlacementRules(queuePlacementRules);
|
|
|
-
|
|
|
setupQueueMappings(conf);
|
|
|
|
|
|
- MockRM newMockRM = new MockRM(conf);
|
|
|
+ RMNodeLabelsManager mgr = setupNodeLabelManager(conf);
|
|
|
+ MockRM newMockRM = new MockRM(conf) {
|
|
|
+ protected RMNodeLabelsManager createNodeLabelManager() {
|
|
|
+ return mgr;
|
|
|
+ }
|
|
|
+ };
|
|
|
newMockRM.start();
|
|
|
((CapacityScheduler) newMockRM.getResourceScheduler()).start();
|
|
|
+ setupNodes(newMockRM);
|
|
|
return newMockRM;
|
|
|
}
|
|
|
|
|
@@ -390,6 +441,21 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
|
|
|
return parentQueue + DOT + leafQueue;
|
|
|
}
|
|
|
|
|
|
+ protected RMNodeLabelsManager setupNodeLabelManager(
|
|
|
+ CapacitySchedulerConfiguration conf) throws IOException {
|
|
|
+ final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
|
|
|
+ mgr.init(conf);
|
|
|
+ mgr.addToCluserNodeLabelsWithDefaultExclusivity(
|
|
|
+ ImmutableSet.of(NODEL_LABEL_SSD, NODEL_LABEL_GPU));
|
|
|
+ mgr.addLabelsToNode(ImmutableMap
|
|
|
+ .of(NodeId.newInstance("h1", 0),
|
|
|
+ TestUtils.toSet(NODEL_LABEL_SSD)));
|
|
|
+ mgr.addLabelsToNode(ImmutableMap
|
|
|
+ .of(NodeId.newInstance("h2", 0),
|
|
|
+ TestUtils.toSet(NODEL_LABEL_GPU)));
|
|
|
+ return mgr;
|
|
|
+ }
|
|
|
+
|
|
|
protected ApplicationAttemptId submitApp(CapacityScheduler newCS, String user,
|
|
|
String queue, String parentQueue) {
|
|
|
ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
|
|
@@ -460,8 +526,19 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
|
|
|
AutoCreatedLeafQueue leafQueue =
|
|
|
(AutoCreatedLeafQueue) capacityScheduler.getQueue(leafQueueName);
|
|
|
|
|
|
+ Map<String, QueueEntitlement> expectedEntitlements = new HashMap<>();
|
|
|
+ QueueCapacities cap = autoCreateEnabledParentQueue.getLeafQueueTemplate()
|
|
|
+ .getQueueCapacities();
|
|
|
+
|
|
|
for (String label : accessibleNodeLabelsOnC) {
|
|
|
validateCapacitiesByLabel(autoCreateEnabledParentQueue, leafQueue, label);
|
|
|
+
|
|
|
+ QueueEntitlement expectedEntitlement = new QueueEntitlement(
|
|
|
+ cap.getCapacity(label), cap.getMaximumCapacity(label));
|
|
|
+
|
|
|
+ expectedEntitlements.put(label, expectedEntitlement);
|
|
|
+
|
|
|
+ validateEffectiveMinResource(leafQueue, label, expectedEntitlements);
|
|
|
}
|
|
|
|
|
|
assertEquals(true, policy.isActive(leafQueue));
|
|
@@ -480,6 +557,28 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
|
|
|
leafQueue.getQueueCapacities().getMaximumCapacity(label), EPSILON);
|
|
|
}
|
|
|
|
|
|
+ protected void validateEffectiveMinResource(CSQueue leafQueue,
|
|
|
+ String label, Map<String, QueueEntitlement> expectedQueueEntitlements) {
|
|
|
+ ManagedParentQueue parentQueue = (ManagedParentQueue) leafQueue.getParent();
|
|
|
+
|
|
|
+ Resource resourceByLabel = mockRM.getRMContext().getNodeLabelManager().
|
|
|
+ getResourceByLabel(label, cs.getClusterResource());
|
|
|
+ Resource effMinCapacity = Resources.multiply(resourceByLabel,
|
|
|
+ expectedQueueEntitlements.get(label).getCapacity() * parentQueue
|
|
|
+ .getQueueCapacities().getAbsoluteCapacity(label));
|
|
|
+ assertEquals(effMinCapacity, Resources.multiply(resourceByLabel,
|
|
|
+ leafQueue.getQueueCapacities().getAbsoluteCapacity(label)));
|
|
|
+ assertEquals(effMinCapacity, leafQueue.getEffectiveCapacity(label));
|
|
|
+
|
|
|
+ if (leafQueue.getQueueCapacities().getAbsoluteCapacity(label) > 0) {
|
|
|
+ assertTrue(Resources
|
|
|
+ .greaterThan(cs.getResourceCalculator(), cs.getClusterResource(),
|
|
|
+ effMinCapacity, Resources.none()));
|
|
|
+ } else{
|
|
|
+ assertTrue(Resources.equals(effMinCapacity, Resources.none()));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
protected void validateActivatedQueueEntitlement(CSQueue parentQueue,
|
|
|
String leafQueueName, float expectedTotalChildQueueAbsCapacity,
|
|
|
List<QueueManagementChange> queueManagementChanges)
|
|
@@ -552,6 +651,8 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
|
|
|
QueueEntitlement expectedQueueEntitlement,
|
|
|
final List<QueueManagementChange> queueEntitlementChanges) {
|
|
|
boolean found = false;
|
|
|
+
|
|
|
+ Map<String, QueueEntitlement> expectedQueueEntitlements = new HashMap<>();
|
|
|
for (QueueManagementChange entitlementChange : queueEntitlementChanges) {
|
|
|
if (leafQueue.getQueueName().equals(
|
|
|
entitlementChange.getQueue().getQueueName())) {
|
|
@@ -565,6 +666,9 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
|
|
|
updatedQueueTemplate.getQueueCapacities()
|
|
|
.getMaximumCapacity(label));
|
|
|
assertEquals(expectedQueueEntitlement, newEntitlement);
|
|
|
+ expectedQueueEntitlements.put(label, expectedQueueEntitlement);
|
|
|
+ validateEffectiveMinResource(leafQueue, label,
|
|
|
+ expectedQueueEntitlements);
|
|
|
}
|
|
|
found = true;
|
|
|
break;
|