@@ -28,6 +28,7 @@ import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
     .capacity.CapacitySchedulerConfiguration.ROOT;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
@@ -45,15 +46,18 @@ import java.util.ConcurrentModificationException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicBoolean;

 import com.google.common.collect.ImmutableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
@@ -896,6 +900,537 @@ public class TestLeafQueue {
     assertEquals(expectedRatio, b.getUsersManager().getUsageRatio(""), 0.001);
   }

+  @Test
+  public void testUserLimitCache() throws Exception {
+    // Parameters
+    final int numNodes = 4;
+    final int nodeSize = 100;
+    final int numAllocationThreads = 2;
+    final int numUsers = 40;
+    final int containerSize = 1 * GB;
+    final int numContainersPerApp = 10;
+    final int runTime = 5000; // in ms
+
+    Random random = new Random();
+
+    // Setup nodes
+    FiCaSchedulerNode[] nodes = new FiCaSchedulerNode[numNodes];
+    Map<NodeId, FiCaSchedulerNode> nodesMap = new HashMap<>(nodes.length);
+    for (int i = 0; i < numNodes; i++) {
+      String host = "127.0.0." + i;
+      FiCaSchedulerNode node = TestUtils.getMockNode(host, DEFAULT_RACK, 0,
+          nodeSize * GB, nodeSize);
+      nodes[i] = node;
+      nodesMap.put(node.getNodeID(), node);
+    }
+
+    Resource clusterResource =
+        Resources.createResource(numNodes * (nodeSize * GB),
+            numNodes * nodeSize);
+
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+    when(csContext.getClusterResource()).thenReturn(clusterResource);
+
+    // working with just one queue
+    csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[]{A});
+    csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + A, 100);
+    csConf.setMaximumCapacity(CapacitySchedulerConfiguration.ROOT + "." + A,
+        100);
+
+    // reinitialize queues
+    Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>();
+    CSQueue newRoot =
+        CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
+            CapacitySchedulerConfiguration.ROOT,
+            newQueues, queues,
+            TestUtils.spyHook);
+    queues = newQueues;
+    root.reinitialize(newRoot, csContext.getClusterResource());
+    root.updateClusterResource(clusterResource,
+        new ResourceLimits(clusterResource));
+
+    // Mock the queue
+    LeafQueue leafQueue = stubLeafQueue((LeafQueue) queues.get(A));
+
+    // Set user limit factor so some users are at their limit and the
+    // user limit cache has more than just a few entries
+    leafQueue.setUserLimitFactor(10f / nodeSize);
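+    // (note the float literal: integer division 10 / nodeSize would
+    // truncate to 0; 10f / nodeSize gives a factor of 0.1, capping each
+    // user at roughly 10% of the queue so many users sit at their limit)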
+
+    // Flag to let allocation threads know to stop
+    AtomicBoolean stopThreads = new AtomicBoolean(false);
+    AtomicBoolean errorInThreads = new AtomicBoolean(false);
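+    // (AtomicBooleans give safe cross-thread visibility between this
+    // test thread and the allocation threads started below)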
+
+    // Set up allocation threads
+    Thread[] threads = new Thread[numAllocationThreads];
+    for (int i = 0; i < numAllocationThreads; i++) {
+      threads[i] = new Thread(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            boolean alwaysNull = true;
+            while (!stopThreads.get()) {
+              CSAssignment assignment = leafQueue.assignContainers(
+                  clusterResource,
+                  nodes[random.nextInt(numNodes)],
+                  new ResourceLimits(clusterResource),
+                  SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+              applyCSAssignment(clusterResource, assignment, leafQueue,
+                  nodesMap, leafQueue.applicationAttemptMap);
+
+              if (assignment != CSAssignment.NULL_ASSIGNMENT) {
+                alwaysNull = false;
+              }
+              Thread.sleep(500);
+            }
+
+            // One more assignment but not committing so that the
+            // user limits cache is updated to the latest version
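+            // (applyCSAssignment is deliberately skipped here, so nothing
+            // is committed and the cache version stays in sync)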
+            CSAssignment assignment = leafQueue.assignContainers(
+                clusterResource,
+                nodes[random.nextInt(numNodes)],
+                new ResourceLimits(clusterResource),
+                SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+
+            if (alwaysNull && assignment == CSAssignment.NULL_ASSIGNMENT) {
+              LOG.error("Thread only got null assignments");
+              errorInThreads.set(true);
+            }
+          } catch (Exception e) {
+            LOG.error("Thread exiting because of exception", e);
+            errorInThreads.set(true);
+          }
+        }
+      }, "Scheduling Thread " + i);
+    }
+
+    // Set up users and some apps
+    final String[] users = new String[numUsers];
+    for (int i = 0; i < users.length; i++) {
+      users[i] = "user_" + i;
+    }
+    List<ApplicationAttemptId> applicationAttemptIds =
+        new ArrayList<>(10);
+    List<FiCaSchedulerApp> apps = new ArrayList<>(10);
+    Priority priority = TestUtils.createMockPriority(1);
+
+    // Start up 10 apps to begin with
+    int appId;
+    for (appId = 0; appId < 10; appId++) {
+      String user = users[random.nextInt(users.length)];
+      ApplicationAttemptId applicationAttemptId =
+          TestUtils.getMockApplicationAttemptId(appId, 0);
+      FiCaSchedulerApp app = new FiCaSchedulerApp(applicationAttemptId,
+          user,
+          leafQueue, leafQueue.getUsersManager(), spyRMContext);
+
+      leafQueue.submitApplicationAttempt(app, user);
+      app.updateResourceRequests(Collections.singletonList(
+          TestUtils.createResourceRequest(ResourceRequest.ANY, containerSize,
+              numContainersPerApp, true, priority, recordFactory)));
+
+      applicationAttemptIds.add(applicationAttemptId);
+      apps.add(app);
+    }
+
+    // Start threads
+    for (int i = 0; i < numAllocationThreads; i++) {
+      threads[i].start();
+    }
+
+    final long startTime = Time.monotonicNow();
+    while (true) {
+      // Start a new app on about half of the iterations and stop a random
+      // app on the rest of the iterations
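+      // (churning apps keeps users activating and deactivating while the
+      // allocation threads are concurrently reading the cache)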
+      boolean startOrStopApp = random.nextBoolean();
+      if (startOrStopApp || (apps.size() == 1)) {
+        // start a new app
+        String user = users[random.nextInt(users.length)];
+        ApplicationAttemptId applicationAttemptId =
+            TestUtils.getMockApplicationAttemptId(appId, 0);
+        FiCaSchedulerApp app = new FiCaSchedulerApp(applicationAttemptId,
+            user,
+            leafQueue, leafQueue.getUsersManager(), spyRMContext);
+
+        leafQueue.submitApplicationAttempt(app, user);
+        app.updateResourceRequests(Collections.singletonList(
+            TestUtils.createResourceRequest(ResourceRequest.ANY, containerSize,
+                numContainersPerApp, true, priority, recordFactory)));
+
+        applicationAttemptIds.add(applicationAttemptId);
+        apps.add(app);
+
+        appId++;
+      } else {
+        // stop a random app
+        int i = random.nextInt(apps.size());
+        FiCaSchedulerApp app = apps.get(i);
+        leafQueue.finishApplication(app.getApplicationId(), app.getUser());
+        leafQueue.releaseResource(clusterResource, app,
+            app.getCurrentConsumption(), "", null);
+        apps.remove(i);
+        applicationAttemptIds.remove(i);
+      }
+
+      if (errorInThreads.get() || (Time.monotonicNow() - startTime) > runTime) {
+        break;
+      }
+    }
+
+    // signal allocation threads to stop
+    stopThreads.set(true);
+
+    // wait for allocation threads to be done
+    for (int i = 0; i < numAllocationThreads; i++) {
+      threads[i].join();
+    }
+
+    // check if there was an error in the allocation threads
+    assertFalse(errorInThreads.get());
+
+    // check there is only one partition in the user limits cache
+    assertEquals(1, leafQueue.userLimitsCache.size());
+
+    Map<SchedulingMode, ConcurrentMap<String, LeafQueue.CachedUserLimit>>
+        uLCByPartition = leafQueue.userLimitsCache.get(nodes[0].getPartition());
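+    // (the cache is keyed partition -> scheduling mode -> user name)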
+
+    // check there is only one scheduling mode
+    assertEquals(1, uLCByPartition.size());
+
+    ConcurrentMap<String, LeafQueue.CachedUserLimit> uLCBySchedulingMode =
+        uLCByPartition.get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+
+    // check entries in the user limits cache
+    for (Map.Entry<String, LeafQueue.CachedUserLimit> entry :
+        uLCBySchedulingMode.entrySet()) {
+      String user = entry.getKey();
+      Resource userLimit = entry.getValue().userLimit;
+
+      Resource expectedUL = leafQueue.getResourceLimitForActiveUsers(user,
+          clusterResource, nodes[0].getPartition(),
+          SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+
+      assertEquals(expectedUL, userLimit);
+    }
+
+    // check the current version in the user limits cache
+    assertEquals(leafQueue.getUsersManager().getLatestVersionOfUsersState(),
+        leafQueue.currentUserLimitCacheVersion);
+    assertTrue(leafQueue.currentUserLimitCacheVersion > 0);
+  }
+
+  @Test
+  public void testUserLimitCacheActiveUsersChanged() throws Exception {
+    // Setup some nodes
+    String host_0 = "127.0.0.1";
+    FiCaSchedulerNode node_0 =
+        TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 6*GB);
+    String host_1 = "127.0.0.2";
+    FiCaSchedulerNode node_1 =
+        TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 6*GB);
+    String host_2 = "127.0.0.3";
+    FiCaSchedulerNode node_2 =
+        TestUtils.getMockNode(host_2, DEFAULT_RACK, 0, 6*GB);
+    String host_3 = "127.0.0.4";
+    FiCaSchedulerNode node_3 =
+        TestUtils.getMockNode(host_3, DEFAULT_RACK, 0, 6*GB);
+
+    Map<NodeId, FiCaSchedulerNode> nodes =
+        ImmutableMap.of(
+            node_0.getNodeID(), node_0,
+            node_1.getNodeID(), node_1,
+            node_2.getNodeID(), node_2,
+            node_3.getNodeID(), node_3
+        );
+
+    final int numNodes = 4;
+    Resource clusterResource =
+        Resources.createResource(numNodes * (6*GB), numNodes);
+
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+    when(csContext.getClusterResource()).thenReturn(clusterResource);
+
+    // working with just one queue
+    csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {A});
+    csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + A, 100);
+    csConf.setMaximumCapacity(CapacitySchedulerConfiguration.ROOT + "." + A,
+        100);
+
+    // reinitialize queues
+    Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>();
+    CSQueue newRoot =
+        CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
+            CapacitySchedulerConfiguration.ROOT,
+            newQueues, queues,
+            TestUtils.spyHook);
+    queues = newQueues;
+    root.reinitialize(newRoot, csContext.getClusterResource());
+    root.updateClusterResource(clusterResource,
+        new ResourceLimits(clusterResource));
+
+    // Mock the queue
+    LeafQueue leafQueue = stubLeafQueue((LeafQueue)queues.get(A));
+
+    // initial check
+    assertEquals(0, leafQueue.userLimitsCache.size());
+    assertEquals(0,
+        leafQueue.getUsersManager().preComputedAllUserLimit.size());
+    assertEquals(0,
+        leafQueue.getUsersManager().preComputedActiveUserLimit.size());
+
+    // 4 users
+    final String user_0 = "user_0";
+    final String user_1 = "user_1";
+    final String user_2 = "user_2";
+    final String user_3 = "user_3";
+
+    // Set user-limit
+    leafQueue.setUserLimit(0);
+    leafQueue.setUserLimitFactor(1.0f);
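+    // (with a minimum user limit percent of 0 and a factor of 1, the
+    // effective limit is driven by usage and the number of active users)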
+
+    Priority priority = TestUtils.createMockPriority(1);
+
+    // Fill queue because user limit is calculated as (used / #active users).
+    final ApplicationAttemptId appAttemptId_9 =
+        TestUtils.getMockApplicationAttemptId(9, 0);
+    FiCaSchedulerApp app_9 =
+        new FiCaSchedulerApp(appAttemptId_9, user_0, leafQueue,
+            leafQueue.getUsersManager(), spyRMContext);
+    leafQueue.submitApplicationAttempt(app_9, user_0);
+
+    Map<ApplicationAttemptId, FiCaSchedulerApp> apps =
+        ImmutableMap.of(app_9.getApplicationAttemptId(), app_9);
+
+    app_9.updateResourceRequests(Arrays.asList(
+        TestUtils.createResourceRequest(host_0, 1*GB, 5, true,
+            priority, recordFactory),
+        TestUtils.createResourceRequest(DEFAULT_RACK, 1*GB, 5, true,
+            priority, recordFactory),
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 5, true,
+            priority, recordFactory)));
+    assertEquals(1, leafQueue.getUsersManager().getNumActiveUsers());
+
+    CSAssignment assignment;
+    for (int i = 0; i < 5; i++) {
+      assignment = leafQueue.assignContainers(clusterResource, node_0,
+          new ResourceLimits(clusterResource),
+          SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+      applyCSAssignment(clusterResource, assignment, leafQueue, nodes, apps);
+    }
+    app_9.updateResourceRequests(Arrays.asList(
+        TestUtils.createResourceRequest(host_1, 1*GB, 5, true,
+            priority, recordFactory),
+        TestUtils.createResourceRequest(DEFAULT_RACK, 1*GB, 5, true,
+            priority, recordFactory),
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 5, true,
+            priority, recordFactory)));
+    for (int i = 0; i < 5; i++) {
+      assignment = leafQueue.assignContainers(clusterResource, node_1,
+          new ResourceLimits(clusterResource),
+          SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+      applyCSAssignment(clusterResource, assignment, leafQueue, nodes, apps);
+    }
+    // A total of 10GB has been allocated
+    assertEquals(10*GB, leafQueue.getUsedResources().getMemorySize());
+    assertEquals(10*GB, app_9.getCurrentConsumption().getMemorySize());
+    // For one user who should have been cached in the assignContainers call
+    assertEquals(1, leafQueue.userLimitsCache
+        .get(RMNodeLabelsManager.NO_LABEL)
+        .get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)
+        .size());
+    // But the cache is stale because an allocation was made
+    assertNotEquals(leafQueue.currentUserLimitCacheVersion,
+        leafQueue.getUsersManager().getLatestVersionOfUsersState());
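+    // (a committed allocation moves the users state version forward in
+    // UsersManager, which is what invalidates the cached limits)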
+    // Have not made any calls to fill up the all user limit in UsersManager
+    assertEquals(0,
+        leafQueue.getUsersManager().preComputedAllUserLimit.size());
+    // But the user limit cache in leafQueue got filled up using the active
+    // user limit in UsersManager
+    assertEquals(1,
+        leafQueue.getUsersManager().preComputedActiveUserLimit.size());
+
+    // submit 3 applications for now
+    final ApplicationAttemptId appAttemptId_0 =
+        TestUtils.getMockApplicationAttemptId(0, 0);
+    FiCaSchedulerApp app_0 =
+        new FiCaSchedulerApp(appAttemptId_0, user_0, leafQueue,
+            leafQueue.getUsersManager(), spyRMContext);
+    leafQueue.submitApplicationAttempt(app_0, user_0);
+
+    final ApplicationAttemptId appAttemptId_1 =
+        TestUtils.getMockApplicationAttemptId(1, 0);
+    FiCaSchedulerApp app_1 =
+        new FiCaSchedulerApp(appAttemptId_1, user_1, leafQueue,
+            leafQueue.getUsersManager(), spyRMContext);
+    leafQueue.submitApplicationAttempt(app_1, user_1);
+
+    final ApplicationAttemptId appAttemptId_2 =
+        TestUtils.getMockApplicationAttemptId(2, 0);
+    FiCaSchedulerApp app_2 =
+        new FiCaSchedulerApp(appAttemptId_2, user_2, leafQueue,
+            leafQueue.getUsersManager(), spyRMContext);
+    leafQueue.submitApplicationAttempt(app_2, user_2);
+
+    apps = ImmutableMap.of(
+        app_0.getApplicationAttemptId(), app_0,
+        app_1.getApplicationAttemptId(), app_1,
+        app_2.getApplicationAttemptId(), app_2
+    );
+
+    // requests from first three users (all of which will be locality delayed)
+    app_0.updateResourceRequests(Arrays.asList(
+        TestUtils.createResourceRequest(host_0, 4*GB, 1, true,
+            priority, recordFactory),
+        TestUtils.createResourceRequest(DEFAULT_RACK, 4*GB, 1, true,
+            priority, recordFactory),
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 10, true,
+            priority, recordFactory)));
+
+    app_1.updateResourceRequests(Arrays.asList(
+        TestUtils.createResourceRequest(host_0, 4*GB, 1, true,
+            priority, recordFactory),
+        TestUtils.createResourceRequest(DEFAULT_RACK, 4*GB, 1, true,
+            priority, recordFactory),
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 10, true,
+            priority, recordFactory)));
+
+    app_2.updateResourceRequests(Arrays.asList(
+        TestUtils.createResourceRequest(host_0, 4*GB, 1, true,
+            priority, recordFactory),
+        TestUtils.createResourceRequest(DEFAULT_RACK, 4*GB, 1, true,
+            priority, recordFactory),
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 10, true,
+            priority, recordFactory)));
+
+    // There are 3 active users right now
+    assertEquals(3, leafQueue.getUsersManager().getNumActiveUsers());
+
+    // fill up user limit cache
+    assignment = leafQueue.assignContainers(clusterResource, node_1,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource, assignment, leafQueue, nodes, apps);
+    // A total of 10GB has been allocated
+    assertEquals(10*GB, leafQueue.getUsedResources().getMemorySize());
+    assertEquals(0*GB, app_0.getCurrentConsumption().getMemorySize());
+    assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
+    assertEquals(0*GB, app_2.getCurrentConsumption().getMemorySize());
+    assertEquals(10*GB, app_9.getCurrentConsumption().getMemorySize());
+    // There are three users who should have been cached
+    assertEquals(3, leafQueue.userLimitsCache
+        .get(RMNodeLabelsManager.NO_LABEL)
+        .get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)
+        .size());
+    // There are three users so each has a limit of 12/3 = 4GB
+    assertEquals(4*GB, leafQueue.userLimitsCache
+        .get(RMNodeLabelsManager.NO_LABEL)
+        .get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)
+        .get(user_0).userLimit.getMemorySize());
+    assertEquals(4*GB, leafQueue.userLimitsCache
+        .get(RMNodeLabelsManager.NO_LABEL)
+        .get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)
+        .get(user_1).userLimit.getMemorySize());
+    assertEquals(4*GB, leafQueue.userLimitsCache
+        .get(RMNodeLabelsManager.NO_LABEL)
+        .get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)
+        .get(user_2).userLimit.getMemorySize());
+    // And the cache is NOT stale because no allocation was made
+    assertEquals(leafQueue.currentUserLimitCacheVersion,
+        leafQueue.getUsersManager().getLatestVersionOfUsersState());
+    // Have not made any calls to fill up the all user limit in UsersManager
+    assertEquals(0,
+        leafQueue.getUsersManager().preComputedAllUserLimit.size());
+    // But the user limit cache in leafQueue got filled up using the active
+    // user limit in UsersManager with 4GB limit (since there are three users
+    // so 12/3 = 4GB each)
+    assertEquals(1, leafQueue.getUsersManager()
+        .preComputedActiveUserLimit.size());
+    assertEquals(1, leafQueue.getUsersManager()
+        .preComputedActiveUserLimit.get(RMNodeLabelsManager.NO_LABEL).size());
+    assertEquals(4*GB, leafQueue.getUsersManager()
+        .preComputedActiveUserLimit.get(RMNodeLabelsManager.NO_LABEL)
+        .get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY).getMemorySize());
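+    // (preComputedActiveUserLimit holds one limit per partition and
+    // scheduling mode, shared by all active users)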
+
+    // submit the 4th application
+    final ApplicationAttemptId appAttemptId_3 =
+        TestUtils.getMockApplicationAttemptId(3, 0);
+    FiCaSchedulerApp app_3 =
+        new FiCaSchedulerApp(appAttemptId_3, user_3, leafQueue,
+            leafQueue.getUsersManager(), spyRMContext);
+    leafQueue.submitApplicationAttempt(app_3, user_3);
+
+    apps = ImmutableMap.of(
+        app_0.getApplicationAttemptId(), app_0,
+        app_1.getApplicationAttemptId(), app_1,
+        app_2.getApplicationAttemptId(), app_2,
+        app_3.getApplicationAttemptId(), app_3
+    );
+
+    app_3.updateResourceRequests(Arrays.asList(
+        TestUtils.createResourceRequest(host_0, 4*GB, 1, true,
+            priority, recordFactory),
+        TestUtils.createResourceRequest(DEFAULT_RACK, 4*GB, 1, true,
+            priority, recordFactory),
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 10, true,
+            priority, recordFactory)));
+
+    // 4 active users now
+    assertEquals(4, leafQueue.getUsersManager().getNumActiveUsers());
+    // Check that the user limits cache has become stale
+    assertNotEquals(leafQueue.currentUserLimitCacheVersion,
+        leafQueue.getUsersManager().getLatestVersionOfUsersState());
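+    // (activating a new user changes the users state, so the version
+    // moves even though no allocation has happened yet)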
+
+    // Even though there are no allocations, the user limit cache is
+    // repopulated
+    assignment = leafQueue.assignContainers(clusterResource, node_1,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource, assignment, leafQueue, nodes, apps);
+    // A total of 10GB has been allocated
+    assertEquals(10*GB, leafQueue.getUsedResources().getMemorySize());
+    assertEquals(0*GB, app_0.getCurrentConsumption().getMemorySize());
+    assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
+    assertEquals(0*GB, app_2.getCurrentConsumption().getMemorySize());
+    assertEquals(0*GB, app_3.getCurrentConsumption().getMemorySize());
+    assertEquals(10*GB, app_9.getCurrentConsumption().getMemorySize());
+    // There are four users who should have been cached
+    assertEquals(4, leafQueue.userLimitsCache
+        .get(RMNodeLabelsManager.NO_LABEL)
+        .get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)
+        .size());
+    // There are four users so each has a limit of 12/4 = 3GB
+    assertEquals(3*GB, leafQueue.userLimitsCache
+        .get(RMNodeLabelsManager.NO_LABEL)
+        .get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)
+        .get(user_0).userLimit.getMemorySize());
+    assertEquals(3*GB, leafQueue.userLimitsCache
+        .get(RMNodeLabelsManager.NO_LABEL)
+        .get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)
+        .get(user_1).userLimit.getMemorySize());
+    assertEquals(3*GB, leafQueue.userLimitsCache
+        .get(RMNodeLabelsManager.NO_LABEL)
+        .get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)
+        .get(user_2).userLimit.getMemorySize());
+    assertEquals(3*GB, leafQueue.userLimitsCache
+        .get(RMNodeLabelsManager.NO_LABEL)
+        .get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)
+        .get(user_3).userLimit.getMemorySize());
+    // And the cache is NOT stale because no allocation was made
+    assertEquals(leafQueue.currentUserLimitCacheVersion,
+        leafQueue.getUsersManager().getLatestVersionOfUsersState());
+    // Have not made any calls to fill up the all user limit in UsersManager
+    assertEquals(0,
+        leafQueue.getUsersManager().preComputedAllUserLimit.size());
+    // But the user limit cache in leafQueue got filled up using the active
+    // user limit in UsersManager with 3GB limit (since there are four users
+    // so 12/4 = 3GB each)
+    assertEquals(1, leafQueue.getUsersManager()
+        .preComputedActiveUserLimit.size());
+    assertEquals(1, leafQueue.getUsersManager()
+        .preComputedActiveUserLimit.get(RMNodeLabelsManager.NO_LABEL).size());
+    assertEquals(3*GB, leafQueue.getUsersManager()
+        .preComputedActiveUserLimit.get(RMNodeLabelsManager.NO_LABEL)
+        .get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY).getMemorySize());
+  }
+
   @Test
   public void testUserLimits() throws Exception {
     // Mock the queue