|
@@ -26,51 +26,54 @@ import org.apache.hadoop.yarn.api.records.Priority;
|
|
import org.apache.hadoop.yarn.api.records.QueueState;
|
|
import org.apache.hadoop.yarn.api.records.QueueState;
|
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
|
|
|
-import org.apache.hadoop.yarn.event.Event;
|
|
|
|
-import org.apache.hadoop.yarn.event.EventHandler;
|
|
|
|
-import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
|
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
|
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule;
|
|
|
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.placement
|
|
|
|
+ .ApplicationPlacementContext;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
|
|
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt
|
|
|
|
+ .RMAppAttemptState;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
|
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
|
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
|
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
|
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
|
|
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler
|
|
|
|
+ .SchedulerDynamicEditException;
|
|
|
|
+
|
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
|
|
|
|
+ .queuemanagement.GuaranteedOrZeroCapacityOverTimePolicy;
|
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common
|
|
|
|
+ .QueueEntitlement;
|
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event
|
|
|
|
+ .AppAddedSchedulerEvent;
|
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event
|
|
|
|
+ .NodeAddedSchedulerEvent;
|
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event
|
|
|
|
+ .SchedulerEvent;
|
|
|
|
+
|
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy
|
|
|
|
+ .FairOrderingPolicy;
|
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.security
|
|
|
|
+ .ClientToAMTokenSecretManagerInRM;
|
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.security
|
|
|
|
+ .NMTokenSecretManagerInRM;
|
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.security
|
|
|
|
+ .RMContainerTokenSecretManager;
|
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
|
-import org.junit.After;
|
|
|
|
-import org.junit.Before;
|
|
|
|
import org.junit.Test;
|
|
import org.junit.Test;
|
|
|
|
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
-import java.util.ArrayList;
|
|
|
|
import java.util.Collections;
|
|
import java.util.Collections;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
-import java.util.concurrent.BlockingQueue;
|
|
|
|
-import java.util.concurrent.LinkedBlockingQueue;
|
|
|
|
-import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
|
|
-import static org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.CURRENT_USER_MAPPING;
|
|
|
|
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueUtils.EPSILON;
|
|
|
|
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DOT;
|
|
|
|
|
|
+import static org.apache.hadoop.yarn.server.resourcemanager.placement
|
|
|
|
+ .UserGroupMappingPlacementRule.CURRENT_USER_MAPPING;
|
|
|
|
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
|
|
|
|
+ .capacity.CSQueueUtils.EPSILON;
|
|
import static org.junit.Assert.assertEquals;
|
|
import static org.junit.Assert.assertEquals;
|
|
import static org.junit.Assert.assertNotNull;
|
|
import static org.junit.Assert.assertNotNull;
|
|
import static org.junit.Assert.assertTrue;
|
|
import static org.junit.Assert.assertTrue;
|
|
@@ -79,198 +82,14 @@ import static org.mockito.Mockito.mock;
|
|
import static org.mockito.Mockito.when;
|
|
import static org.mockito.Mockito.when;
|
|
|
|
|
|
/**
|
|
/**
|
|
- * Tests for creation and reinitilization of auto created leaf queues
|
|
|
|
|
|
+ * Tests for creation and reinitialization of auto created leaf queues
|
|
* under a ManagedParentQueue.
|
|
* under a ManagedParentQueue.
|
|
*/
|
|
*/
|
|
-public class TestCapacitySchedulerAutoQueueCreation {
|
|
|
|
-
|
|
|
|
- private static final Log LOG = LogFactory.getLog(TestCapacityScheduler.class);
|
|
|
|
- private final int GB = 1024;
|
|
|
|
- private final static ContainerUpdates NULL_UPDATE_REQUESTS =
|
|
|
|
- new ContainerUpdates();
|
|
|
|
-
|
|
|
|
- private static final String A = CapacitySchedulerConfiguration.ROOT + ".a";
|
|
|
|
- private static final String B = CapacitySchedulerConfiguration.ROOT + ".b";
|
|
|
|
- private static final String C = CapacitySchedulerConfiguration.ROOT + ".c";
|
|
|
|
- private static final String D = CapacitySchedulerConfiguration.ROOT + ".d";
|
|
|
|
- private static final String A1 = A + ".a1";
|
|
|
|
- private static final String A2 = A + ".a2";
|
|
|
|
- private static final String B1 = B + ".b1";
|
|
|
|
- private static final String B2 = B + ".b2";
|
|
|
|
- private static final String B3 = B + ".b3";
|
|
|
|
- private static final String C1 = C + ".c1";
|
|
|
|
- private static final String C2 = C + ".c2";
|
|
|
|
- private static final String C3 = C + ".c3";
|
|
|
|
- private static float A_CAPACITY = 20f;
|
|
|
|
- private static float B_CAPACITY = 40f;
|
|
|
|
- private static float C_CAPACITY = 20f;
|
|
|
|
- private static float D_CAPACITY = 20f;
|
|
|
|
- private static float A1_CAPACITY = 30;
|
|
|
|
- private static float A2_CAPACITY = 70;
|
|
|
|
- private static float B1_CAPACITY = 60f;
|
|
|
|
- private static float B2_CAPACITY = 20f;
|
|
|
|
- private static float B3_CAPACITY = 20f;
|
|
|
|
- private static float C1_CAPACITY = 20f;
|
|
|
|
- private static float C2_CAPACITY = 20f;
|
|
|
|
-
|
|
|
|
- private static String USER = "user_";
|
|
|
|
- private static String USER0 = USER + 0;
|
|
|
|
- private static String USER2 = USER + 2;
|
|
|
|
- private static String PARENT_QUEUE = "c";
|
|
|
|
-
|
|
|
|
- private MockRM mockRM = null;
|
|
|
|
-
|
|
|
|
- private CapacityScheduler cs;
|
|
|
|
-
|
|
|
|
- private final TestCapacityScheduler tcs = new TestCapacityScheduler();
|
|
|
|
-
|
|
|
|
- private static SpyDispatcher dispatcher;
|
|
|
|
-
|
|
|
|
- private static EventHandler<Event> rmAppEventEventHandler;
|
|
|
|
-
|
|
|
|
- private static class SpyDispatcher extends AsyncDispatcher {
|
|
|
|
-
|
|
|
|
- private static BlockingQueue<Event> eventQueue =
|
|
|
|
- new LinkedBlockingQueue<>();
|
|
|
|
-
|
|
|
|
- private static class SpyRMAppEventHandler implements EventHandler<Event> {
|
|
|
|
- public void handle(Event event) {
|
|
|
|
- eventQueue.add(event);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- @Override
|
|
|
|
- protected void dispatch(Event event) {
|
|
|
|
- eventQueue.add(event);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- @Override
|
|
|
|
- public EventHandler<Event> getEventHandler() {
|
|
|
|
- return rmAppEventEventHandler;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- void spyOnNextEvent(Event expectedEvent, long timeout)
|
|
|
|
- throws InterruptedException {
|
|
|
|
-
|
|
|
|
- Event event = eventQueue.poll(timeout, TimeUnit.MILLISECONDS);
|
|
|
|
- assertEquals(expectedEvent.getType(), event.getType());
|
|
|
|
- assertEquals(expectedEvent.getClass(), event.getClass());
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- @Before
|
|
|
|
- public void setUp() throws Exception {
|
|
|
|
- CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
|
|
|
- setupQueueConfiguration(conf);
|
|
|
|
- conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
|
|
|
- ResourceScheduler.class);
|
|
|
|
-
|
|
|
|
- List<String> queuePlacementRules = new ArrayList<>();
|
|
|
|
- queuePlacementRules.add(YarnConfiguration.USER_GROUP_PLACEMENT_RULE);
|
|
|
|
- conf.setQueuePlacementRules(queuePlacementRules);
|
|
|
|
-
|
|
|
|
- setupQueueMappings(conf);
|
|
|
|
-
|
|
|
|
- mockRM = new MockRM(conf);
|
|
|
|
- cs = (CapacityScheduler) mockRM.getResourceScheduler();
|
|
|
|
-
|
|
|
|
- dispatcher = new SpyDispatcher();
|
|
|
|
- rmAppEventEventHandler = new SpyDispatcher.SpyRMAppEventHandler();
|
|
|
|
- dispatcher.register(RMAppEventType.class, rmAppEventEventHandler);
|
|
|
|
- cs.updatePlacementRules();
|
|
|
|
- mockRM.start();
|
|
|
|
-
|
|
|
|
- cs.start();
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- private CapacitySchedulerConfiguration setupQueueMappings(
|
|
|
|
- CapacitySchedulerConfiguration conf) {
|
|
|
|
-
|
|
|
|
- //set queue mapping
|
|
|
|
- List<UserGroupMappingPlacementRule.QueueMapping> queueMappings =
|
|
|
|
- new ArrayList<>();
|
|
|
|
- for (int i = 0; i <= 3; i++) {
|
|
|
|
- //Set C as parent queue name for auto queue creation
|
|
|
|
- UserGroupMappingPlacementRule.QueueMapping userQueueMapping =
|
|
|
|
- new UserGroupMappingPlacementRule.QueueMapping(
|
|
|
|
- UserGroupMappingPlacementRule.QueueMapping.MappingType.USER,
|
|
|
|
- USER + i, getQueueMapping(PARENT_QUEUE, USER + i));
|
|
|
|
- queueMappings.add(userQueueMapping);
|
|
|
|
- }
|
|
|
|
|
|
+public class TestCapacitySchedulerAutoQueueCreation
|
|
|
|
+ extends TestCapacitySchedulerAutoCreatedQueueBase {
|
|
|
|
|
|
- conf.setQueueMappings(queueMappings);
|
|
|
|
- //override with queue mappings
|
|
|
|
- conf.setOverrideWithQueueMappings(true);
|
|
|
|
- return conf;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * @param conf, to be modified
|
|
|
|
- * @return, CS configuration which has C
|
|
|
|
- * as an auto creation enabled parent queue
|
|
|
|
- * <p>
|
|
|
|
- * root
|
|
|
|
- * / \ \ \
|
|
|
|
- * a b c d
|
|
|
|
- * / \ / | \
|
|
|
|
- * a1 a2 b1 b2 b3
|
|
|
|
- */
|
|
|
|
- private CapacitySchedulerConfiguration setupQueueConfiguration(
|
|
|
|
- CapacitySchedulerConfiguration conf) {
|
|
|
|
-
|
|
|
|
- //setup new queues with one of them auto enabled
|
|
|
|
- // Define top-level queues
|
|
|
|
- // Set childQueue for root
|
|
|
|
- conf.setQueues(CapacitySchedulerConfiguration.ROOT,
|
|
|
|
- new String[] { "a", "b", "c", "d" });
|
|
|
|
-
|
|
|
|
- conf.setCapacity(A, A_CAPACITY);
|
|
|
|
- conf.setCapacity(B, B_CAPACITY);
|
|
|
|
- conf.setCapacity(C, C_CAPACITY);
|
|
|
|
- conf.setCapacity(D, D_CAPACITY);
|
|
|
|
-
|
|
|
|
- // Define 2nd-level queues
|
|
|
|
- conf.setQueues(A, new String[] { "a1", "a2" });
|
|
|
|
- conf.setCapacity(A1, A1_CAPACITY);
|
|
|
|
- conf.setUserLimitFactor(A1, 100.0f);
|
|
|
|
- conf.setCapacity(A2, A2_CAPACITY);
|
|
|
|
- conf.setUserLimitFactor(A2, 100.0f);
|
|
|
|
-
|
|
|
|
- conf.setQueues(B, new String[] { "b1", "b2", "b3" });
|
|
|
|
- conf.setCapacity(B1, B1_CAPACITY);
|
|
|
|
- conf.setUserLimitFactor(B1, 100.0f);
|
|
|
|
- conf.setCapacity(B2, B2_CAPACITY);
|
|
|
|
- conf.setUserLimitFactor(B2, 100.0f);
|
|
|
|
- conf.setCapacity(B3, B3_CAPACITY);
|
|
|
|
- conf.setUserLimitFactor(B3, 100.0f);
|
|
|
|
-
|
|
|
|
- conf.setUserLimitFactor(C, 1.0f);
|
|
|
|
- conf.setAutoCreateChildQueueEnabled(C, true);
|
|
|
|
-
|
|
|
|
- //Setup leaf queue template configs
|
|
|
|
- conf.setAutoCreatedLeafQueueTemplateCapacity(C, 50.0f);
|
|
|
|
- conf.setAutoCreatedLeafQueueTemplateMaxCapacity(C, 100.0f);
|
|
|
|
-
|
|
|
|
- LOG.info("Setup " + C + " as an auto leaf creation enabled parent queue");
|
|
|
|
-
|
|
|
|
- conf.setUserLimitFactor(D, 1.0f);
|
|
|
|
- conf.setAutoCreateChildQueueEnabled(D, true);
|
|
|
|
-
|
|
|
|
- //Setup leaf queue template configs
|
|
|
|
- conf.setAutoCreatedLeafQueueTemplateCapacity(D, 10.0f);
|
|
|
|
- conf.setAutoCreatedLeafQueueTemplateMaxCapacity(D, 100.0f);
|
|
|
|
-
|
|
|
|
- LOG.info("Setup " + D + " as an auto leaf creation enabled parent queue");
|
|
|
|
-
|
|
|
|
- return conf;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- @After
|
|
|
|
- public void tearDown() throws Exception {
|
|
|
|
- if (mockRM != null) {
|
|
|
|
- mockRM.stop();
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
|
|
+ private static final Log LOG = LogFactory.getLog(
|
|
|
|
+ TestCapacitySchedulerAutoQueueCreation.class);
|
|
|
|
|
|
@Test(timeout = 10000)
|
|
@Test(timeout = 10000)
|
|
public void testAutoCreateLeafQueueCreation() throws Exception {
|
|
public void testAutoCreateLeafQueueCreation() throws Exception {
|
|
@@ -289,7 +108,11 @@ public class TestCapacitySchedulerAutoQueueCreation {
|
|
ManagedParentQueue parentQueue = (ManagedParentQueue) cs.getQueue(
|
|
ManagedParentQueue parentQueue = (ManagedParentQueue) cs.getQueue(
|
|
PARENT_QUEUE);
|
|
PARENT_QUEUE);
|
|
assertEquals(parentQueue, autoCreatedLeafQueue.getParent());
|
|
assertEquals(parentQueue, autoCreatedLeafQueue.getParent());
|
|
- validateCapacities(autoCreatedLeafQueue);
|
|
|
|
|
|
+ validateInitialQueueEntitlement(parentQueue, USER0, 0.1f);
|
|
|
|
+ validateUserAndAppLimits(autoCreatedLeafQueue, 1000, 1000);
|
|
|
|
+
|
|
|
|
+ assertTrue(autoCreatedLeafQueue
|
|
|
|
+ .getOrderingPolicy() instanceof FairOrderingPolicy);
|
|
} finally {
|
|
} finally {
|
|
cleanupQueue(USER0);
|
|
cleanupQueue(USER0);
|
|
}
|
|
}
|
|
@@ -297,7 +120,6 @@ public class TestCapacitySchedulerAutoQueueCreation {
|
|
|
|
|
|
@Test
|
|
@Test
|
|
public void testReinitializeStoppedAutoCreatedLeafQueue() throws Exception {
|
|
public void testReinitializeStoppedAutoCreatedLeafQueue() throws Exception {
|
|
-
|
|
|
|
try {
|
|
try {
|
|
String host = "127.0.0.1";
|
|
String host = "127.0.0.1";
|
|
RMNode node = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1,
|
|
RMNode node = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1,
|
|
@@ -306,20 +128,28 @@ public class TestCapacitySchedulerAutoQueueCreation {
|
|
|
|
|
|
// submit an app
|
|
// submit an app
|
|
|
|
|
|
- RMApp app = mockRM.submitApp(GB, "test-auto-queue-creation-1", USER0,
|
|
|
|
|
|
+ RMApp app1 = mockRM.submitApp(GB, "test-auto-queue-creation-1", USER0,
|
|
null, USER0);
|
|
null, USER0);
|
|
|
|
+
|
|
|
|
+ RMApp app2 = mockRM.submitApp(GB, "test-auto-queue-creation-2", USER1,
|
|
|
|
+ null, USER1);
|
|
// check preconditions
|
|
// check preconditions
|
|
List<ApplicationAttemptId> appsInC = cs.getAppsInQueue(PARENT_QUEUE);
|
|
List<ApplicationAttemptId> appsInC = cs.getAppsInQueue(PARENT_QUEUE);
|
|
- assertEquals(1, appsInC.size());
|
|
|
|
|
|
+ assertEquals(2, appsInC.size());
|
|
|
|
|
|
assertNotNull(cs.getQueue(USER0));
|
|
assertNotNull(cs.getQueue(USER0));
|
|
|
|
+ assertNotNull(cs.getQueue(USER1));
|
|
|
|
|
|
- AutoCreatedLeafQueue autoCreatedLeafQueue =
|
|
|
|
- (AutoCreatedLeafQueue) cs.getQueue(USER0);
|
|
|
|
|
|
+ AutoCreatedLeafQueue user0Queue = (AutoCreatedLeafQueue) cs.getQueue(
|
|
|
|
+ USER0);
|
|
|
|
+ AutoCreatedLeafQueue user1Queue = (AutoCreatedLeafQueue) cs.getQueue(
|
|
|
|
+ USER0);
|
|
ManagedParentQueue parentQueue = (ManagedParentQueue) cs.getQueue(
|
|
ManagedParentQueue parentQueue = (ManagedParentQueue) cs.getQueue(
|
|
PARENT_QUEUE);
|
|
PARENT_QUEUE);
|
|
- assertEquals(parentQueue, autoCreatedLeafQueue.getParent());
|
|
|
|
- validateCapacities(autoCreatedLeafQueue);
|
|
|
|
|
|
+ assertEquals(parentQueue, user0Queue.getParent());
|
|
|
|
+ assertEquals(parentQueue, user1Queue.getParent());
|
|
|
|
+ validateInitialQueueEntitlement(parentQueue, USER0, 0.2f);
|
|
|
|
+ validateInitialQueueEntitlement(parentQueue, USER1, 0.2f);
|
|
|
|
|
|
ApplicationAttemptId appAttemptId = appsInC.get(0);
|
|
ApplicationAttemptId appAttemptId = appsInC.get(0);
|
|
|
|
|
|
@@ -337,7 +167,7 @@ public class TestCapacitySchedulerAutoQueueCreation {
|
|
CapacityScheduler.schedule(cs);
|
|
CapacityScheduler.schedule(cs);
|
|
|
|
|
|
//change state to draining
|
|
//change state to draining
|
|
- autoCreatedLeafQueue.stopQueue();
|
|
|
|
|
|
+ user0Queue.stopQueue();
|
|
|
|
|
|
cs.killAllAppsInQueue(USER0);
|
|
cs.killAllAppsInQueue(USER0);
|
|
|
|
|
|
@@ -346,80 +176,24 @@ public class TestCapacitySchedulerAutoQueueCreation {
|
|
mockRM.waitForState(appAttemptId.getApplicationId(), RMAppState.KILLED);
|
|
mockRM.waitForState(appAttemptId.getApplicationId(), RMAppState.KILLED);
|
|
|
|
|
|
//change state to stopped
|
|
//change state to stopped
|
|
- autoCreatedLeafQueue.stopQueue();
|
|
|
|
|
|
+ user0Queue.stopQueue();
|
|
assertEquals(QueueState.STOPPED,
|
|
assertEquals(QueueState.STOPPED,
|
|
- autoCreatedLeafQueue.getQueueInfo().getQueueState());
|
|
|
|
|
|
+ user0Queue.getQueueInfo().getQueueState());
|
|
|
|
|
|
cs.reinitialize(cs.getConf(), mockRM.getRMContext());
|
|
cs.reinitialize(cs.getConf(), mockRM.getRMContext());
|
|
|
|
|
|
- AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) cs.getQueue(
|
|
|
|
- USER0);
|
|
|
|
- validateCapacities(leafQueue);
|
|
|
|
-
|
|
|
|
- } finally {
|
|
|
|
- cleanupQueue(USER0);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- @Test
|
|
|
|
- public void testRefreshQueuesWithAutoCreatedLeafQueues() throws Exception {
|
|
|
|
-
|
|
|
|
- MockRM newMockRM = setupSchedulerInstance();
|
|
|
|
- try {
|
|
|
|
- CapacityScheduler newCS =
|
|
|
|
- (CapacityScheduler) newMockRM.getResourceScheduler();
|
|
|
|
- CapacitySchedulerConfiguration conf = newCS.getConfiguration();
|
|
|
|
-
|
|
|
|
- // Test add one auto created queue dynamically and manually modify
|
|
|
|
- // capacity
|
|
|
|
- ManagedParentQueue parentQueue = (ManagedParentQueue) newCS.getQueue("c");
|
|
|
|
- AutoCreatedLeafQueue c1 = new AutoCreatedLeafQueue(newCS, "c1",
|
|
|
|
- parentQueue);
|
|
|
|
- newCS.addQueue(c1);
|
|
|
|
- c1.setEntitlement(new QueueEntitlement(C1_CAPACITY / 100, 1f));
|
|
|
|
-
|
|
|
|
- // Test add another auto created queue and use setEntitlement to modify
|
|
|
|
- // capacity
|
|
|
|
- AutoCreatedLeafQueue c2 = new AutoCreatedLeafQueue(newCS, "c2",
|
|
|
|
- (ManagedParentQueue) newCS.getQueue("c"));
|
|
|
|
- newCS.addQueue(c2);
|
|
|
|
- newCS.setEntitlement("c2", new QueueEntitlement(C2_CAPACITY / 100, 1f));
|
|
|
|
-
|
|
|
|
- // Verify all allocations match
|
|
|
|
- checkQueueCapacities(newCS, C_CAPACITY, D_CAPACITY);
|
|
|
|
-
|
|
|
|
- // Reinitialize and verify all dynamic queued survived
|
|
|
|
-
|
|
|
|
- conf.setCapacity(A, 20f);
|
|
|
|
- conf.setCapacity(B, 20f);
|
|
|
|
- conf.setCapacity(C, 40f);
|
|
|
|
- conf.setCapacity(D, 20f);
|
|
|
|
- newCS.reinitialize(conf, newMockRM.getRMContext());
|
|
|
|
-
|
|
|
|
- checkQueueCapacities(newCS, 40f, 20f);
|
|
|
|
-
|
|
|
|
- //chnage parent template configs and reinitialize
|
|
|
|
- conf.setAutoCreatedLeafQueueTemplateCapacity(C, 30.0f);
|
|
|
|
- conf.setAutoCreatedLeafQueueTemplateMaxCapacity(C, 100.0f);
|
|
|
|
- newCS.reinitialize(conf, newMockRM.getRMContext());
|
|
|
|
|
|
+ AutoCreatedLeafQueue user0QueueReinited =
|
|
|
|
+ (AutoCreatedLeafQueue) cs.getQueue(USER0);
|
|
|
|
|
|
- ManagedParentQueue c = (ManagedParentQueue) newCS.getQueue("c");
|
|
|
|
- AutoCreatedLeafQueue c3 = new AutoCreatedLeafQueue(newCS, "c3", c);
|
|
|
|
- newCS.addQueue(c3);
|
|
|
|
|
|
+ validateCapacities(user0QueueReinited, 0.0f, 0.0f, 1.0f, 1.0f);
|
|
|
|
|
|
- AbstractManagedParentQueue.AutoCreatedLeafQueueTemplate
|
|
|
|
- leafQueueTemplate = parentQueue.getLeafQueueTemplate();
|
|
|
|
- QueueCapacities cap = leafQueueTemplate.getQueueCapacities();
|
|
|
|
- c3.setEntitlement(
|
|
|
|
- new QueueEntitlement(cap.getCapacity(), cap.getMaximumCapacity()));
|
|
|
|
- newCS.reinitialize(conf, newMockRM.getRMContext());
|
|
|
|
|
|
+ AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) cs.getQueue(
|
|
|
|
+ USER1);
|
|
|
|
+ validateInitialQueueEntitlement(parentQueue, leafQueue.getQueueName(),
|
|
|
|
+ 0.1f);
|
|
|
|
|
|
- checkQueueCapacities(newCS, 40f, 20f);
|
|
|
|
} finally {
|
|
} finally {
|
|
- if (newMockRM != null) {
|
|
|
|
- ((CapacityScheduler) newMockRM.getResourceScheduler()).stop();
|
|
|
|
- newMockRM.stop();
|
|
|
|
- }
|
|
|
|
|
|
+ cleanupQueue(USER0);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -460,7 +234,7 @@ public class TestCapacitySchedulerAutoQueueCreation {
|
|
CapacitySchedulerConfiguration newConf =
|
|
CapacitySchedulerConfiguration newConf =
|
|
new CapacitySchedulerConfiguration();
|
|
new CapacitySchedulerConfiguration();
|
|
setupQueueConfiguration(newConf);
|
|
setupQueueConfiguration(newConf);
|
|
- newConf.setAutoCreatedLeafQueueTemplateCapacity(A1, A1_CAPACITY / 10);
|
|
|
|
|
|
+ newConf.setAutoCreatedLeafQueueConfigCapacity(A1, A1_CAPACITY / 10);
|
|
newConf.setAutoCreateChildQueueEnabled(A1, true);
|
|
newConf.setAutoCreateChildQueueEnabled(A1, true);
|
|
|
|
|
|
newCS.setConf(new YarnConfiguration());
|
|
newCS.setConf(new YarnConfiguration());
|
|
@@ -490,7 +264,7 @@ public class TestCapacitySchedulerAutoQueueCreation {
|
|
CapacitySchedulerConfiguration newConf =
|
|
CapacitySchedulerConfiguration newConf =
|
|
new CapacitySchedulerConfiguration();
|
|
new CapacitySchedulerConfiguration();
|
|
setupQueueConfiguration(newConf);
|
|
setupQueueConfiguration(newConf);
|
|
- newConf.setAutoCreatedLeafQueueTemplateCapacity(A, A_CAPACITY / 10);
|
|
|
|
|
|
+ newConf.setAutoCreatedLeafQueueConfigCapacity(A, A_CAPACITY / 10);
|
|
newConf.setAutoCreateChildQueueEnabled(A, true);
|
|
newConf.setAutoCreateChildQueueEnabled(A, true);
|
|
|
|
|
|
newCS.setConf(new YarnConfiguration());
|
|
newCS.setConf(new YarnConfiguration());
|
|
@@ -531,39 +305,6 @@ public class TestCapacitySchedulerAutoQueueCreation {
|
|
assertEquals(RMAppState.FAILED, app.getState());
|
|
assertEquals(RMAppState.FAILED, app.getState());
|
|
}
|
|
}
|
|
|
|
|
|
- private void validateCapacities(AutoCreatedLeafQueue autoCreatedLeafQueue) {
|
|
|
|
- assertEquals(autoCreatedLeafQueue.getCapacity(), 0.0f, EPSILON);
|
|
|
|
- assertEquals(autoCreatedLeafQueue.getAbsoluteCapacity(), 0.0f, EPSILON);
|
|
|
|
- assertEquals(autoCreatedLeafQueue.getMaximumCapacity(), 0.0f, EPSILON);
|
|
|
|
- assertEquals(autoCreatedLeafQueue.getAbsoluteMaximumCapacity(), 0.0f,
|
|
|
|
- EPSILON);
|
|
|
|
- int maxAppsForAutoCreatedQueues = (int) (
|
|
|
|
- CapacitySchedulerConfiguration.DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS
|
|
|
|
- * autoCreatedLeafQueue.getParent().getAbsoluteCapacity());
|
|
|
|
- assertEquals(autoCreatedLeafQueue.getMaxApplicationsPerUser(),
|
|
|
|
- maxAppsForAutoCreatedQueues);
|
|
|
|
- assertEquals(autoCreatedLeafQueue.getMaxApplicationsPerUser(),
|
|
|
|
- (int) (maxAppsForAutoCreatedQueues * (cs.getConfiguration()
|
|
|
|
- .getUserLimitFactor(
|
|
|
|
- autoCreatedLeafQueue.getParent().getQueuePath()))));
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- private void cleanupQueue(String queueName) throws YarnException {
|
|
|
|
- AutoCreatedLeafQueue queue = (AutoCreatedLeafQueue) cs.getQueue(queueName);
|
|
|
|
- if (queue != null) {
|
|
|
|
- queue.setEntitlement(new QueueEntitlement(0.0f, 0.0f));
|
|
|
|
- ((ManagedParentQueue) queue.getParent()).removeChildQueue(
|
|
|
|
- queue.getQueueName());
|
|
|
|
- cs.getCapacitySchedulerQueueManager().removeQueue(queue.getQueueName());
|
|
|
|
- } else{
|
|
|
|
- throw new YarnException("Queue does not exist " + queueName);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- String getQueueMapping(String parentQueue, String leafQueue) {
|
|
|
|
- return parentQueue + DOT + leafQueue;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
@Test(timeout = 10000)
|
|
@Test(timeout = 10000)
|
|
public void testQueueMappingValidationFailsWithInvalidParentQueueInMapping()
|
|
public void testQueueMappingValidationFailsWithInvalidParentQueueInMapping()
|
|
throws Exception {
|
|
throws Exception {
|
|
@@ -586,8 +327,7 @@ public class TestCapacitySchedulerAutoQueueCreation {
|
|
//expected exception
|
|
//expected exception
|
|
assertTrue(e.getMessage().contains(
|
|
assertTrue(e.getMessage().contains(
|
|
"invalid parent queue which does not have auto creation of leaf "
|
|
"invalid parent queue which does not have auto creation of leaf "
|
|
- + "queues enabled ["
|
|
|
|
- + "a" + "]"));
|
|
|
|
|
|
+ + "queues enabled [" + "a" + "]"));
|
|
}
|
|
}
|
|
|
|
|
|
//"a" is not auto create enabled and app_user does not exist as a leaf
|
|
//"a" is not auto create enabled and app_user does not exist as a leaf
|
|
@@ -650,9 +390,6 @@ public class TestCapacitySchedulerAutoQueueCreation {
|
|
(CapacityScheduler) newMockRM.getResourceScheduler();
|
|
(CapacityScheduler) newMockRM.getResourceScheduler();
|
|
|
|
|
|
try {
|
|
try {
|
|
- newMockRM.start();
|
|
|
|
- newCS.start();
|
|
|
|
-
|
|
|
|
submitApp(newCS, USER0, USER0, PARENT_QUEUE);
|
|
submitApp(newCS, USER0, USER0, PARENT_QUEUE);
|
|
|
|
|
|
assertNotNull(newCS.getQueue(USER0));
|
|
assertNotNull(newCS.getQueue(USER0));
|
|
@@ -700,12 +437,16 @@ public class TestCapacitySchedulerAutoQueueCreation {
|
|
AutoCreatedLeafQueue c1 = new AutoCreatedLeafQueue(newCS, "c1",
|
|
AutoCreatedLeafQueue c1 = new AutoCreatedLeafQueue(newCS, "c1",
|
|
parentQueue);
|
|
parentQueue);
|
|
newCS.addQueue(c1);
|
|
newCS.addQueue(c1);
|
|
- c1.setEntitlement(new QueueEntitlement(0.5f, 1f));
|
|
|
|
|
|
+ c1.setCapacity(0.5f);
|
|
|
|
+ c1.setAbsoluteCapacity(c1.getParent().getAbsoluteCapacity() * 1f);
|
|
|
|
+ c1.setMaxCapacity(1f);
|
|
|
|
+
|
|
|
|
+ setEntitlement(c1, new QueueEntitlement(0.5f, 1f));
|
|
|
|
|
|
AutoCreatedLeafQueue c2 = new AutoCreatedLeafQueue(newCS, "c2",
|
|
AutoCreatedLeafQueue c2 = new AutoCreatedLeafQueue(newCS, "c2",
|
|
parentQueue);
|
|
parentQueue);
|
|
newCS.addQueue(c2);
|
|
newCS.addQueue(c2);
|
|
- c2.setEntitlement(new QueueEntitlement(0.5f, 1f));
|
|
|
|
|
|
+ setEntitlement(c2, new QueueEntitlement(0.5f, 1f));
|
|
|
|
|
|
try {
|
|
try {
|
|
AutoCreatedLeafQueue c3 = new AutoCreatedLeafQueue(newCS, "c3",
|
|
AutoCreatedLeafQueue c3 = new AutoCreatedLeafQueue(newCS, "c3",
|
|
@@ -723,72 +464,160 @@ public class TestCapacitySchedulerAutoQueueCreation {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- private List<UserGroupMappingPlacementRule.QueueMapping> setupQueueMapping(
|
|
|
|
- CapacityScheduler newCS, String user, String parentQueue, String queue) {
|
|
|
|
- List<UserGroupMappingPlacementRule.QueueMapping> queueMappings =
|
|
|
|
- new ArrayList<>();
|
|
|
|
- queueMappings.add(new UserGroupMappingPlacementRule.QueueMapping(
|
|
|
|
- UserGroupMappingPlacementRule.QueueMapping.MappingType.USER, user,
|
|
|
|
- getQueueMapping(parentQueue, queue)));
|
|
|
|
- newCS.getConfiguration().setQueueMappings(queueMappings);
|
|
|
|
- return queueMappings;
|
|
|
|
- }
|
|
|
|
|
|
+ @Test
|
|
|
|
+ public void testAutoCreatedQueueActivationDeactivation() throws Exception {
|
|
|
|
+
|
|
|
|
+ try {
|
|
|
|
+ String host = "127.0.0.1";
|
|
|
|
+ RMNode node = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1,
|
|
|
|
+ host);
|
|
|
|
+ cs.handle(new NodeAddedSchedulerEvent(node));
|
|
|
|
+
|
|
|
|
+ CSQueue parentQueue = cs.getQueue(PARENT_QUEUE);
|
|
|
|
+
|
|
|
|
+ //submit app1 as USER1
|
|
|
|
+ submitApp(mockRM, parentQueue, USER1, USER1, 1, 1);
|
|
|
|
+ validateInitialQueueEntitlement(parentQueue, USER1, 0.1f);
|
|
|
|
+
|
|
|
|
+ //submit another app2 as USER2
|
|
|
|
+ ApplicationId user2AppId = submitApp(mockRM, parentQueue, USER2, USER2, 2,
|
|
|
|
+ 1);
|
|
|
|
+ validateInitialQueueEntitlement(parentQueue, USER2, 0.2f);
|
|
|
|
+
|
|
|
|
+ //submit another app3 as USER1
|
|
|
|
+ submitApp(mockRM, parentQueue, USER1, USER1, 3, 2);
|
|
|
|
+
|
|
|
|
+ //validate total activated abs capacity remains the same
|
|
|
|
+ GuaranteedOrZeroCapacityOverTimePolicy autoCreatedQueueManagementPolicy =
|
|
|
|
+ (GuaranteedOrZeroCapacityOverTimePolicy) ((ManagedParentQueue)
|
|
|
|
+ parentQueue)
|
|
|
|
+ .getAutoCreatedQueueManagementPolicy();
|
|
|
|
+ assertEquals(autoCreatedQueueManagementPolicy
|
|
|
|
+ .getAbsoluteActivatedChildQueueCapacity(), 0.2f, EPSILON);
|
|
|
|
+
|
|
|
|
+ //submit user_3 app. This cant be scheduled since there is no capacity
|
|
|
|
+ submitApp(mockRM, parentQueue, USER3, USER3, 4, 1);
|
|
|
|
+ final CSQueue user3LeafQueue = cs.getQueue(USER3);
|
|
|
|
+ validateCapacities((AutoCreatedLeafQueue) user3LeafQueue, 0.0f, 0.0f,
|
|
|
|
+ 1.0f, 1.0f);
|
|
|
|
|
|
- private MockRM setupSchedulerInstance() {
|
|
|
|
- CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
|
|
|
- setupQueueConfiguration(conf);
|
|
|
|
- conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
|
|
|
- ResourceScheduler.class);
|
|
|
|
|
|
+ assertEquals(autoCreatedQueueManagementPolicy
|
|
|
|
+ .getAbsoluteActivatedChildQueueCapacity(), 0.2f, EPSILON);
|
|
|
|
|
|
- List<String> queuePlacementRules = new ArrayList<String>();
|
|
|
|
- queuePlacementRules.add(YarnConfiguration.USER_GROUP_PLACEMENT_RULE);
|
|
|
|
- conf.setQueuePlacementRules(queuePlacementRules);
|
|
|
|
|
|
+ //deactivate USER2 queue
|
|
|
|
+ cs.killAllAppsInQueue(USER2);
|
|
|
|
+ mockRM.waitForState(user2AppId, RMAppState.KILLED);
|
|
|
|
|
|
- setupQueueMappings(conf);
|
|
|
|
|
|
+ //Verify if USER_2 can be deactivated since it has no pending appsA
|
|
|
|
+ List<QueueManagementChange> queueManagementChanges =
|
|
|
|
+ autoCreatedQueueManagementPolicy.computeQueueManagementChanges();
|
|
|
|
|
|
- MockRM newMockRM = new MockRM(conf);
|
|
|
|
- return newMockRM;
|
|
|
|
|
|
+ ManagedParentQueue managedParentQueue = (ManagedParentQueue) parentQueue;
|
|
|
|
+ managedParentQueue.validateAndApplyQueueManagementChanges(
|
|
|
|
+ queueManagementChanges);
|
|
|
|
+
|
|
|
|
+ validateDeactivatedQueueEntitlement(parentQueue, USER2, 0.2f,
|
|
|
|
+ queueManagementChanges);
|
|
|
|
+
|
|
|
|
+ //USER_3 should now get activated
|
|
|
|
+ validateActivatedQueueEntitlement(parentQueue, USER3, 0.2f,
|
|
|
|
+ queueManagementChanges);
|
|
|
|
+
|
|
|
|
+ } finally {
|
|
|
|
+ cleanupQueue(USER1);
|
|
|
|
+ cleanupQueue(USER2);
|
|
|
|
+ cleanupQueue(USER3);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
- void checkQueueCapacities(CapacityScheduler newCS, float capacityC,
|
|
|
|
- float capacityD) {
|
|
|
|
- CSQueue rootQueue = newCS.getRootQueue();
|
|
|
|
- CSQueue queueC = tcs.findQueue(rootQueue, C);
|
|
|
|
- CSQueue queueD = tcs.findQueue(rootQueue, D);
|
|
|
|
- CSQueue queueC1 = tcs.findQueue(queueC, C1);
|
|
|
|
- CSQueue queueC2 = tcs.findQueue(queueC, C2);
|
|
|
|
- CSQueue queueC3 = tcs.findQueue(queueC, C3);
|
|
|
|
-
|
|
|
|
- float capC = capacityC / 100.0f;
|
|
|
|
- float capD = capacityD / 100.0f;
|
|
|
|
-
|
|
|
|
- tcs.checkQueueCapacity(queueC, capC, capC, 1.0f, 1.0f);
|
|
|
|
- tcs.checkQueueCapacity(queueD, capD, capD, 1.0f, 1.0f);
|
|
|
|
- tcs.checkQueueCapacity(queueC1, C1_CAPACITY / 100.0f,
|
|
|
|
- (C1_CAPACITY / 100.0f) * capC, 1.0f, 1.0f);
|
|
|
|
- tcs.checkQueueCapacity(queueC2, C2_CAPACITY / 100.0f,
|
|
|
|
- (C2_CAPACITY / 100.0f) * capC, 1.0f, 1.0f);
|
|
|
|
-
|
|
|
|
- if (queueC3 != null) {
|
|
|
|
- ManagedParentQueue parentQueue = (ManagedParentQueue) queueC;
|
|
|
|
- QueueCapacities cap =
|
|
|
|
- parentQueue.getLeafQueueTemplate().getQueueCapacities();
|
|
|
|
- tcs.checkQueueCapacity(queueC3, cap.getCapacity(),
|
|
|
|
- (cap.getCapacity()) * capC, 1.0f, 1.0f);
|
|
|
|
|
|
+ @Test
|
|
|
|
+ public void testAutoCreatedQueueInheritsNodeLabels() throws Exception {
|
|
|
|
+
|
|
|
|
+ try {
|
|
|
|
+ String host = "127.0.0.1";
|
|
|
|
+ RMNode node = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1,
|
|
|
|
+ host);
|
|
|
|
+ cs.handle(new NodeAddedSchedulerEvent(node));
|
|
|
|
+
|
|
|
|
+ CSQueue parentQueue = cs.getQueue(PARENT_QUEUE);
|
|
|
|
+
|
|
|
|
+ submitApp(USER1, USER1, NODEL_LABEL_GPU);
|
|
|
|
+ //submit app1 as USER1
|
|
|
|
+ validateInitialQueueEntitlement(parentQueue, USER1, 0.1f);
|
|
|
|
+ } finally {
|
|
|
|
+ cleanupQueue(USER1);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- ApplicationAttemptId submitApp(CapacityScheduler newCS, String user,
|
|
|
|
- String queue, String parentQueue) {
|
|
|
|
- ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
|
|
|
|
- SchedulerEvent addAppEvent = new AppAddedSchedulerEvent(appId, queue, user,
|
|
|
|
- new ApplicationPlacementContext(queue, parentQueue));
|
|
|
|
- ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
|
|
|
|
- appId, 1);
|
|
|
|
- SchedulerEvent addAttemptEvent = new AppAttemptAddedSchedulerEvent(
|
|
|
|
- appAttemptId, false);
|
|
|
|
- newCS.handle(addAppEvent);
|
|
|
|
- newCS.handle(addAttemptEvent);
|
|
|
|
- return appAttemptId;
|
|
|
|
|
|
+ @Test
|
|
|
|
+ public void testReinitializeQueuesWithAutoCreatedLeafQueues()
|
|
|
|
+ throws Exception {
|
|
|
|
+
|
|
|
|
+ MockRM newMockRM = setupSchedulerInstance();
|
|
|
|
+ try {
|
|
|
|
+ CapacityScheduler newCS =
|
|
|
|
+ (CapacityScheduler) newMockRM.getResourceScheduler();
|
|
|
|
+ CapacitySchedulerConfiguration conf = newCS.getConfiguration();
|
|
|
|
+
|
|
|
|
+ String host = "127.0.0.1";
|
|
|
|
+ RMNode node = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1,
|
|
|
|
+ host);
|
|
|
|
+ newCS.handle(new NodeAddedSchedulerEvent(node));
|
|
|
|
+
|
|
|
|
+ CSQueue parentQueue = newCS.getQueue(PARENT_QUEUE);
|
|
|
|
+
|
|
|
|
+ //submit app1 as USER1
|
|
|
|
+ submitApp(newMockRM, parentQueue, USER1, USER1, 1, 1);
|
|
|
|
+ validateInitialQueueEntitlement(newCS, parentQueue, USER1, 0.1f);
|
|
|
|
+
|
|
|
|
+ //submit another app2 as USER2
|
|
|
|
+ ApplicationId user2AppId = submitApp(newMockRM, parentQueue, USER2, USER2,
|
|
|
|
+ 2, 1);
|
|
|
|
+ validateInitialQueueEntitlement(newCS, parentQueue, USER2, 0.2f);
|
|
|
|
+
|
|
|
|
+ //update parent queue capacity
|
|
|
|
+ conf.setCapacity(C, 30f);
|
|
|
|
+ conf.setCapacity(D, 10f);
|
|
|
|
+ conf.setMaximumCapacity(C, 50f);
|
|
|
|
+
|
|
|
|
+ newCS.reinitialize(conf, newMockRM.getRMContext());
|
|
|
|
+
|
|
|
|
+ // validate that leaf queues abs capacity is now changed
|
|
|
|
+ AutoCreatedLeafQueue user0Queue = (AutoCreatedLeafQueue) newCS.getQueue(
|
|
|
|
+ USER1);
|
|
|
|
+ validateCapacities(user0Queue, 0.5f, 0.15f, 1.0f, 0.5f);
|
|
|
|
+ validateUserAndAppLimits(user0Queue, 1500, 1500);
|
|
|
|
+
|
|
|
|
+ //update leaf queue template capacities
|
|
|
|
+ conf.setAutoCreatedLeafQueueConfigCapacity(C, 30f);
|
|
|
|
+ conf.setAutoCreatedLeafQueueConfigMaxCapacity(C, 40f);
|
|
|
|
+
|
|
|
|
+ newCS.reinitialize(conf, newMockRM.getRMContext());
|
|
|
|
+ validateCapacities(user0Queue, 0.3f, 0.09f, 0.4f, 0.2f);
|
|
|
|
+ validateUserAndAppLimits(user0Queue, 900, 900);
|
|
|
|
+
|
|
|
|
+ //submit app1 as USER3
|
|
|
|
+ submitApp(newMockRM, parentQueue, USER3, USER3, 3, 1);
|
|
|
|
+ validateInitialQueueEntitlement(newCS, parentQueue, USER3, 0.27f);
|
|
|
|
+ AutoCreatedLeafQueue user3Queue = (AutoCreatedLeafQueue) newCS.getQueue(
|
|
|
|
+ USER1);
|
|
|
|
+ validateCapacities(user3Queue, 0.3f, 0.09f, 0.4f, 0.2f);
|
|
|
|
+ validateUserAndAppLimits(user3Queue, 900, 900);
|
|
|
|
+
|
|
|
|
+ //submit app1 as USER1 - is already activated. there should be no diff
|
|
|
|
+ // in capacities
|
|
|
|
+ submitApp(newMockRM, parentQueue, USER3, USER3, 4, 2);
|
|
|
|
+ validateInitialQueueEntitlement(newCS, parentQueue, USER3, 0.27f);
|
|
|
|
+ validateCapacities(user3Queue, 0.3f, 0.09f, 0.4f, 0.2f);
|
|
|
|
+ validateUserAndAppLimits(user3Queue, 900, 900);
|
|
|
|
+
|
|
|
|
+ } finally {
|
|
|
|
+ cleanupQueue(USER1);
|
|
|
|
+ cleanupQueue(USER2);
|
|
|
|
+ if (newMockRM != null) {
|
|
|
|
+ ((CapacityScheduler) newMockRM.getResourceScheduler()).stop();
|
|
|
|
+ newMockRM.stop();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|