|
@@ -1,976 +0,0 @@
|
|
-/**
|
|
|
|
- * Licensed to the Apache Software Foundation (ASF) under one
|
|
|
|
- * or more contributor license agreements. See the NOTICE file
|
|
|
|
- * distributed with this work for additional information
|
|
|
|
- * regarding copyright ownership. The ASF licenses this file
|
|
|
|
- * to you under the Apache License, Version 2.0 (the
|
|
|
|
- * "License"); you may not use this file except in compliance
|
|
|
|
- * with the License. You may obtain a copy of the License at
|
|
|
|
- *
|
|
|
|
- * http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
- *
|
|
|
|
- * Unless required by applicable law or agreed to in writing, software
|
|
|
|
- * distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
- * See the License for the specific language governing permissions and
|
|
|
|
- * limitations under the License.
|
|
|
|
- */
|
|
|
|
-
|
|
|
|
-package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
|
|
|
|
-
|
|
|
|
-import org.slf4j.Logger;
|
|
|
|
-import org.slf4j.LoggerFactory;
|
|
|
|
-import org.apache.hadoop.conf.Configuration;
|
|
|
|
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
|
|
-import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
|
-import org.apache.hadoop.yarn.api.records.Container;
|
|
|
|
-import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
|
|
-import org.apache.hadoop.yarn.api.records.NodeId;
|
|
|
|
-import org.apache.hadoop.yarn.api.records.Priority;
|
|
|
|
-import org.apache.hadoop.yarn.api.records.Resource;
|
|
|
|
-import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
|
|
|
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
|
-import org.apache.hadoop.yarn.event.Dispatcher;
|
|
|
|
-import org.apache.hadoop.yarn.event.EventHandler;
|
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
|
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas;
|
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
|
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.QueueOrderingPolicy;
|
|
|
|
-import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
|
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
|
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
|
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
|
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
|
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
|
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User;
|
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
|
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
|
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
|
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
|
|
|
|
-import org.apache.hadoop.yarn.util.Clock;
|
|
|
|
-import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
|
|
|
-import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
|
|
|
|
-import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
|
|
|
-import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
|
|
|
-import org.apache.hadoop.yarn.util.resource.Resources;
|
|
|
|
-import org.junit.After;
|
|
|
|
-import org.junit.Assert;
|
|
|
|
-import org.junit.Before;
|
|
|
|
-import org.mockito.ArgumentMatcher;
|
|
|
|
-import org.mockito.Mockito;
|
|
|
|
-import org.mockito.invocation.InvocationOnMock;
|
|
|
|
-import org.mockito.stubbing.Answer;
|
|
|
|
-
|
|
|
|
-import java.io.IOException;
|
|
|
|
-import java.util.ArrayList;
|
|
|
|
-import java.util.Collections;
|
|
|
|
-import java.util.Comparator;
|
|
|
|
-import java.util.HashMap;
|
|
|
|
-import java.util.HashSet;
|
|
|
|
-import java.util.List;
|
|
|
|
-import java.util.Map;
|
|
|
|
-import java.util.TreeSet;
|
|
|
|
-import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|
|
|
-import org.apache.hadoop.yarn.event.Event;
|
|
|
|
-
|
|
|
|
-import static org.mockito.Matchers.any;
|
|
|
|
-import static org.mockito.Matchers.anyString;
|
|
|
|
-import static org.mockito.Matchers.eq;
|
|
|
|
-import static org.mockito.Matchers.isA;
|
|
|
|
-import static org.mockito.Mockito.doAnswer;
|
|
|
|
-import static org.mockito.Mockito.mock;
|
|
|
|
-import static org.mockito.Mockito.when;
|
|
|
|
-
|
|
|
|
-public class ProportionalCapacityPreemptionPolicyMockFramework {
|
|
|
|
- static final Logger LOG = LoggerFactory.getLogger(
|
|
|
|
- TestProportionalCapacityPreemptionPolicyForNodePartitions.class);
|
|
|
|
- final String ROOT = CapacitySchedulerConfiguration.ROOT;
|
|
|
|
-
|
|
|
|
- Map<String, CSQueue> nameToCSQueues = null;
|
|
|
|
- Map<String, Resource> partitionToResource = null;
|
|
|
|
- Map<NodeId, FiCaSchedulerNode> nodeIdToSchedulerNodes = null;
|
|
|
|
- RMNodeLabelsManager nlm = null;
|
|
|
|
- RMContext rmContext = null;
|
|
|
|
-
|
|
|
|
- ResourceCalculator rc = new DefaultResourceCalculator();
|
|
|
|
- Clock mClock = null;
|
|
|
|
- CapacitySchedulerConfiguration conf = null;
|
|
|
|
- CapacityScheduler cs = null;
|
|
|
|
- @SuppressWarnings("rawtypes")
|
|
|
|
- EventHandler<Event> mDisp = null;
|
|
|
|
- ProportionalCapacityPreemptionPolicy policy = null;
|
|
|
|
- Resource clusterResource = null;
|
|
|
|
- // Initialize resource map
|
|
|
|
- Map<String, ResourceInformation> riMap = new HashMap<>();
|
|
|
|
-
|
|
|
|
- private void resetResourceInformationMap() {
|
|
|
|
- // Initialize mandatory resources
|
|
|
|
- ResourceInformation memory = ResourceInformation.newInstance(
|
|
|
|
- ResourceInformation.MEMORY_MB.getName(),
|
|
|
|
- ResourceInformation.MEMORY_MB.getUnits(),
|
|
|
|
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
|
|
|
|
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
|
|
|
|
- ResourceInformation vcores = ResourceInformation.newInstance(
|
|
|
|
- ResourceInformation.VCORES.getName(),
|
|
|
|
- ResourceInformation.VCORES.getUnits(),
|
|
|
|
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
|
|
|
|
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
|
|
|
|
- riMap.put(ResourceInformation.MEMORY_URI, memory);
|
|
|
|
- riMap.put(ResourceInformation.VCORES_URI, vcores);
|
|
|
|
-
|
|
|
|
- ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- @SuppressWarnings("unchecked")
|
|
|
|
- @Before
|
|
|
|
- public void setup() {
|
|
|
|
- resetResourceInformationMap();
|
|
|
|
-
|
|
|
|
- org.apache.log4j.Logger.getRootLogger().setLevel(
|
|
|
|
- org.apache.log4j.Level.DEBUG);
|
|
|
|
-
|
|
|
|
- conf = new CapacitySchedulerConfiguration(new Configuration(false));
|
|
|
|
- conf.setLong(
|
|
|
|
- CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL, 10000);
|
|
|
|
- conf.setLong(CapacitySchedulerConfiguration.PREEMPTION_MONITORING_INTERVAL,
|
|
|
|
- 3000);
|
|
|
|
- // report "ideal" preempt
|
|
|
|
- conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND,
|
|
|
|
- (float) 1.0);
|
|
|
|
- conf.setFloat(
|
|
|
|
- CapacitySchedulerConfiguration.PREEMPTION_NATURAL_TERMINATION_FACTOR,
|
|
|
|
- (float) 1.0);
|
|
|
|
-
|
|
|
|
- mClock = mock(Clock.class);
|
|
|
|
- cs = mock(CapacityScheduler.class);
|
|
|
|
- when(cs.getResourceCalculator()).thenReturn(rc);
|
|
|
|
- when(cs.getPreemptionManager()).thenReturn(new PreemptionManager());
|
|
|
|
- when(cs.getConfiguration()).thenReturn(conf);
|
|
|
|
-
|
|
|
|
- nlm = mock(RMNodeLabelsManager.class);
|
|
|
|
- mDisp = mock(EventHandler.class);
|
|
|
|
-
|
|
|
|
- rmContext = mock(RMContext.class);
|
|
|
|
- when(rmContext.getNodeLabelManager()).thenReturn(nlm);
|
|
|
|
- Dispatcher disp = mock(Dispatcher.class);
|
|
|
|
- when(rmContext.getDispatcher()).thenReturn(disp);
|
|
|
|
- when(disp.getEventHandler()).thenReturn(mDisp);
|
|
|
|
- when(cs.getRMContext()).thenReturn(rmContext);
|
|
|
|
-
|
|
|
|
- partitionToResource = new HashMap<>();
|
|
|
|
- nodeIdToSchedulerNodes = new HashMap<>();
|
|
|
|
- nameToCSQueues = new HashMap<>();
|
|
|
|
- clusterResource = Resource.newInstance(0, 0);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- @After
|
|
|
|
- public void cleanup() {
|
|
|
|
- resetResourceInformationMap();
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- public void buildEnv(String labelsConfig, String nodesConfig,
|
|
|
|
- String queuesConfig, String appsConfig) throws IOException {
|
|
|
|
- buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig, false);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- public void buildEnv(String labelsConfig, String nodesConfig,
|
|
|
|
- String queuesConfig, String appsConfig,
|
|
|
|
- boolean useDominantResourceCalculator) throws IOException {
|
|
|
|
- if (useDominantResourceCalculator) {
|
|
|
|
- when(cs.getResourceCalculator()).thenReturn(
|
|
|
|
- new DominantResourceCalculator());
|
|
|
|
- }
|
|
|
|
- mockNodeLabelsManager(labelsConfig);
|
|
|
|
- mockSchedulerNodes(nodesConfig);
|
|
|
|
- for (NodeId nodeId : nodeIdToSchedulerNodes.keySet()) {
|
|
|
|
- when(cs.getSchedulerNode(nodeId)).thenReturn(
|
|
|
|
- nodeIdToSchedulerNodes.get(nodeId));
|
|
|
|
- }
|
|
|
|
- List<FiCaSchedulerNode> allNodes = new ArrayList<>(
|
|
|
|
- nodeIdToSchedulerNodes.values());
|
|
|
|
- when(cs.getAllNodes()).thenReturn(allNodes);
|
|
|
|
- ParentQueue root = mockQueueHierarchy(queuesConfig);
|
|
|
|
- when(cs.getRootQueue()).thenReturn(root);
|
|
|
|
- when(cs.getClusterResource()).thenReturn(clusterResource);
|
|
|
|
- mockApplications(appsConfig);
|
|
|
|
-
|
|
|
|
- policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs,
|
|
|
|
- mClock);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- public void updateQueueConfig(String queuesConfig) {
|
|
|
|
- ParentQueue root = mockQueueHierarchy(queuesConfig);
|
|
|
|
- when(cs.getRootQueue()).thenReturn(root);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- private void mockContainers(String containersConfig, FiCaSchedulerApp app,
|
|
|
|
- ApplicationAttemptId attemptId, String queueName,
|
|
|
|
- List<RMContainer> reservedContainers, List<RMContainer> liveContainers) {
|
|
|
|
- int containerId = 1;
|
|
|
|
- int start = containersConfig.indexOf("=") + 1;
|
|
|
|
- int end = -1;
|
|
|
|
-
|
|
|
|
- Resource used = Resource.newInstance(0, 0);
|
|
|
|
- Resource pending = Resource.newInstance(0, 0);
|
|
|
|
- Priority pri = Priority.newInstance(0);
|
|
|
|
-
|
|
|
|
- while (start < containersConfig.length()) {
|
|
|
|
- while (start < containersConfig.length()
|
|
|
|
- && containersConfig.charAt(start) != '(') {
|
|
|
|
- start++;
|
|
|
|
- }
|
|
|
|
- if (start >= containersConfig.length()) {
|
|
|
|
- throw new IllegalArgumentException(
|
|
|
|
- "Error containers specification, line=" + containersConfig);
|
|
|
|
- }
|
|
|
|
- end = start + 1;
|
|
|
|
- while (end < containersConfig.length()
|
|
|
|
- && containersConfig.charAt(end) != ')') {
|
|
|
|
- end++;
|
|
|
|
- }
|
|
|
|
- if (end >= containersConfig.length()) {
|
|
|
|
- throw new IllegalArgumentException(
|
|
|
|
- "Error containers specification, line=" + containersConfig);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- // now we found start/end, get container values
|
|
|
|
- String[] values = containersConfig.substring(start + 1, end).split(",");
|
|
|
|
- if (values.length < 6 || values.length > 8) {
|
|
|
|
- throw new IllegalArgumentException("Format to define container is:"
|
|
|
|
- + "(priority,resource,host,expression,repeat,reserved, pending)");
|
|
|
|
- }
|
|
|
|
- pri.setPriority(Integer.valueOf(values[0]));
|
|
|
|
- Resource res = parseResourceFromString(values[1]);
|
|
|
|
- NodeId host = NodeId.newInstance(values[2], 1);
|
|
|
|
- String label = values[3];
|
|
|
|
- String userName = "user";
|
|
|
|
- int repeat = Integer.valueOf(values[4]);
|
|
|
|
- boolean reserved = Boolean.valueOf(values[5]);
|
|
|
|
- if (values.length >= 7) {
|
|
|
|
- Resources.addTo(pending, parseResourceFromString(values[6]));
|
|
|
|
- }
|
|
|
|
- if (values.length == 8) {
|
|
|
|
- userName = values[7];
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- for (int i = 0; i < repeat; i++) {
|
|
|
|
- Container c = mock(Container.class);
|
|
|
|
- Resources.addTo(used, res);
|
|
|
|
- when(c.getResource()).thenReturn(res);
|
|
|
|
- when(c.getPriority()).thenReturn(pri);
|
|
|
|
- SchedulerRequestKey sk = SchedulerRequestKey.extractFrom(c);
|
|
|
|
- RMContainerImpl rmc = mock(RMContainerImpl.class);
|
|
|
|
- when(rmc.getAllocatedSchedulerKey()).thenReturn(sk);
|
|
|
|
- when(rmc.getAllocatedNode()).thenReturn(host);
|
|
|
|
- when(rmc.getNodeLabelExpression()).thenReturn(label);
|
|
|
|
- when(rmc.getAllocatedResource()).thenReturn(res);
|
|
|
|
- when(rmc.getContainer()).thenReturn(c);
|
|
|
|
- when(rmc.getApplicationAttemptId()).thenReturn(attemptId);
|
|
|
|
- when(rmc.getQueueName()).thenReturn(queueName);
|
|
|
|
- final ContainerId cId = ContainerId.newContainerId(attemptId,
|
|
|
|
- containerId);
|
|
|
|
- when(rmc.getContainerId()).thenReturn(cId);
|
|
|
|
- doAnswer(new Answer<Integer>() {
|
|
|
|
- @Override
|
|
|
|
- public Integer answer(InvocationOnMock invocation) throws Throwable {
|
|
|
|
- return cId.compareTo(
|
|
|
|
- ((RMContainer) invocation.getArguments()[0]).getContainerId());
|
|
|
|
- }
|
|
|
|
- }).when(rmc).compareTo(any(RMContainer.class));
|
|
|
|
-
|
|
|
|
- if (containerId == 1) {
|
|
|
|
- when(rmc.isAMContainer()).thenReturn(true);
|
|
|
|
- when(app.getAMResource(label)).thenReturn(res);
|
|
|
|
- when(app.getAppAMNodePartitionName()).thenReturn(label);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- if (reserved) {
|
|
|
|
- reservedContainers.add(rmc);
|
|
|
|
- when(rmc.getReservedResource()).thenReturn(res);
|
|
|
|
- } else {
|
|
|
|
- liveContainers.add(rmc);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- // Add container to scheduler-node
|
|
|
|
- addContainerToSchedulerNode(host, rmc, reserved);
|
|
|
|
-
|
|
|
|
- // If this is a non-exclusive allocation
|
|
|
|
- String partition = null;
|
|
|
|
- if (label.isEmpty()
|
|
|
|
- && !(partition = nodeIdToSchedulerNodes.get(host).getPartition())
|
|
|
|
- .isEmpty()) {
|
|
|
|
- LeafQueue queue = (LeafQueue) nameToCSQueues.get(queueName);
|
|
|
|
- Map<String, TreeSet<RMContainer>> ignoreExclusivityContainers = queue
|
|
|
|
- .getIgnoreExclusivityRMContainers();
|
|
|
|
- if (!ignoreExclusivityContainers.containsKey(partition)) {
|
|
|
|
- ignoreExclusivityContainers.put(partition,
|
|
|
|
- new TreeSet<RMContainer>());
|
|
|
|
- }
|
|
|
|
- ignoreExclusivityContainers.get(partition).add(rmc);
|
|
|
|
- }
|
|
|
|
- LOG.debug("add container to app=" + attemptId + " res=" + res + " node="
|
|
|
|
- + host + " nodeLabelExpression=" + label + " partition="
|
|
|
|
- + partition);
|
|
|
|
-
|
|
|
|
- containerId++;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- // If app has 0 container, and it has only pending, still make sure to
|
|
|
|
- // update label.
|
|
|
|
- if (repeat == 0) {
|
|
|
|
- when(app.getAppAMNodePartitionName()).thenReturn(label);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- // Some more app specific aggregated data can be better filled here.
|
|
|
|
- when(app.getPriority()).thenReturn(pri);
|
|
|
|
- when(app.getUser()).thenReturn(userName);
|
|
|
|
- when(app.getCurrentConsumption()).thenReturn(used);
|
|
|
|
- when(app.getCurrentReservation())
|
|
|
|
- .thenReturn(Resources.createResource(0, 0));
|
|
|
|
-
|
|
|
|
- Map<String, Resource> pendingForDefaultPartition =
|
|
|
|
- new HashMap<String, Resource>();
|
|
|
|
- // Add for default partition for now.
|
|
|
|
- pendingForDefaultPartition.put(label, pending);
|
|
|
|
- when(app.getTotalPendingRequestsPerPartition())
|
|
|
|
- .thenReturn(pendingForDefaultPartition);
|
|
|
|
-
|
|
|
|
- // need to set pending resource in resource usage as well
|
|
|
|
- ResourceUsage ru = Mockito.spy(new ResourceUsage());
|
|
|
|
- ru.setUsed(label, used);
|
|
|
|
- when(ru.getCachedUsed(anyString())).thenReturn(used);
|
|
|
|
- when(app.getAppAttemptResourceUsage()).thenReturn(ru);
|
|
|
|
- when(app.getSchedulingResourceUsage()).thenReturn(ru);
|
|
|
|
-
|
|
|
|
- start = end + 1;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * Format is:
|
|
|
|
- * <pre>
|
|
|
|
- * queueName\t // app1
|
|
|
|
- * (priority,resource,host,expression,#repeat,reserved)
|
|
|
|
- * (priority,resource,host,expression,#repeat,reserved);
|
|
|
|
- * queueName\t // app2
|
|
|
|
- * </pre>
|
|
|
|
- */
|
|
|
|
- private void mockApplications(String appsConfig) {
|
|
|
|
- int id = 1;
|
|
|
|
- HashMap<String, HashSet<String>> userMap = new HashMap<String, HashSet<String>>();
|
|
|
|
- HashMap<String, HashMap<String, HashMap<String, ResourceUsage>>> userResourceUsagePerLabel = new HashMap<>();
|
|
|
|
- LeafQueue queue = null;
|
|
|
|
- int mulp = -1;
|
|
|
|
- for (String a : appsConfig.split(";")) {
|
|
|
|
- String[] strs = a.split("\t");
|
|
|
|
- String queueName = strs[0];
|
|
|
|
- if (mulp <= 0 && strs.length > 2 && strs[2] != null) {
|
|
|
|
- mulp = 100 / (new Integer(strs[2]).intValue());
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- // get containers
|
|
|
|
- List<RMContainer> liveContainers = new ArrayList<RMContainer>();
|
|
|
|
- List<RMContainer> reservedContainers = new ArrayList<RMContainer>();
|
|
|
|
- ApplicationId appId = ApplicationId.newInstance(0L, id);
|
|
|
|
- ApplicationAttemptId appAttemptId = ApplicationAttemptId
|
|
|
|
- .newInstance(appId, 1);
|
|
|
|
-
|
|
|
|
- FiCaSchedulerApp app = mock(FiCaSchedulerApp.class);
|
|
|
|
- when(app.getAMResource(anyString()))
|
|
|
|
- .thenReturn(Resources.createResource(0, 0));
|
|
|
|
- mockContainers(strs[1], app, appAttemptId, queueName, reservedContainers,
|
|
|
|
- liveContainers);
|
|
|
|
- LOG.debug("Application mock: queue: " + queueName + ", appId:" + appId);
|
|
|
|
-
|
|
|
|
- when(app.getLiveContainers()).thenReturn(liveContainers);
|
|
|
|
- when(app.getReservedContainers()).thenReturn(reservedContainers);
|
|
|
|
- when(app.getApplicationAttemptId()).thenReturn(appAttemptId);
|
|
|
|
- when(app.getApplicationId()).thenReturn(appId);
|
|
|
|
- when(app.getQueueName()).thenReturn(queueName);
|
|
|
|
-
|
|
|
|
- // add to LeafQueue
|
|
|
|
- queue = (LeafQueue) nameToCSQueues.get(queueName);
|
|
|
|
- queue.getApplications().add(app);
|
|
|
|
- queue.getAllApplications().add(app);
|
|
|
|
- when(queue.getMinimumAllocation())
|
|
|
|
- .thenReturn(Resource.newInstance(1,1));
|
|
|
|
- when(app.getCSLeafQueue()).thenReturn(queue);
|
|
|
|
-
|
|
|
|
- HashSet<String> users = userMap.get(queueName);
|
|
|
|
- if (null == users) {
|
|
|
|
- users = new HashSet<String>();
|
|
|
|
- userMap.put(queueName, users);
|
|
|
|
- }
|
|
|
|
- users.add(app.getUser());
|
|
|
|
-
|
|
|
|
- String label = app.getAppAMNodePartitionName();
|
|
|
|
-
|
|
|
|
- // Get label to queue
|
|
|
|
- HashMap<String, HashMap<String, ResourceUsage>> userResourceUsagePerQueue = userResourceUsagePerLabel
|
|
|
|
- .get(label);
|
|
|
|
- if (null == userResourceUsagePerQueue) {
|
|
|
|
- userResourceUsagePerQueue = new HashMap<>();
|
|
|
|
- userResourceUsagePerLabel.put(label, userResourceUsagePerQueue);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- // Get queue to user based resource map
|
|
|
|
- HashMap<String, ResourceUsage> userResourceUsage = userResourceUsagePerQueue
|
|
|
|
- .get(queueName);
|
|
|
|
- if (null == userResourceUsage) {
|
|
|
|
- userResourceUsage = new HashMap<>();
|
|
|
|
- userResourceUsagePerQueue.put(queueName, userResourceUsage);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- // Get user to its resource usage.
|
|
|
|
- ResourceUsage usage = userResourceUsage.get(app.getUser());
|
|
|
|
- if (null == usage) {
|
|
|
|
- usage = new ResourceUsage();
|
|
|
|
- userResourceUsage.put(app.getUser(), usage);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- usage.incAMUsed(app.getAMResource(label));
|
|
|
|
- usage.incUsed(app.getAppAttemptResourceUsage().getUsed(label));
|
|
|
|
- id++;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- for (String label : userResourceUsagePerLabel.keySet()) {
|
|
|
|
- for (String queueName : userMap.keySet()) {
|
|
|
|
- queue = (LeafQueue) nameToCSQueues.get(queueName);
|
|
|
|
- // Currently we have user-limit test support only for default label.
|
|
|
|
- Resource totResoucePerPartition = partitionToResource.get("");
|
|
|
|
- Resource capacity = Resources.multiply(totResoucePerPartition,
|
|
|
|
- queue.getQueueCapacities().getAbsoluteCapacity());
|
|
|
|
- HashSet<String> users = userMap.get(queue.getQueueName());
|
|
|
|
- when(queue.getAllUsers()).thenReturn(users);
|
|
|
|
- Resource userLimit;
|
|
|
|
- if (mulp > 0) {
|
|
|
|
- userLimit = Resources.divideAndCeil(rc, capacity, mulp);
|
|
|
|
- } else {
|
|
|
|
- userLimit = Resources.divideAndCeil(rc, capacity,
|
|
|
|
- users.size());
|
|
|
|
- }
|
|
|
|
- LOG.debug("Updating user-limit from mock: totResoucePerPartition="
|
|
|
|
- + totResoucePerPartition + ", capacity=" + capacity
|
|
|
|
- + ", users.size()=" + users.size() + ", userlimit= " + userLimit
|
|
|
|
- + ",label= " + label + ",queueName= " + queueName);
|
|
|
|
-
|
|
|
|
- HashMap<String, ResourceUsage> userResourceUsage = userResourceUsagePerLabel
|
|
|
|
- .get(label).get(queueName);
|
|
|
|
- for (String userName : users) {
|
|
|
|
- User user = new User(userName);
|
|
|
|
- if (userResourceUsage != null) {
|
|
|
|
- user.setResourceUsage(userResourceUsage.get(userName));
|
|
|
|
- }
|
|
|
|
- when(queue.getUser(eq(userName))).thenReturn(user);
|
|
|
|
- when(queue.getResourceLimitForAllUsers(eq(userName),
|
|
|
|
- any(Resource.class), anyString(), any(SchedulingMode.class)))
|
|
|
|
- .thenReturn(userLimit);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- private void addContainerToSchedulerNode(NodeId nodeId, RMContainer container,
|
|
|
|
- boolean isReserved) {
|
|
|
|
- SchedulerNode node = nodeIdToSchedulerNodes.get(nodeId);
|
|
|
|
- assert node != null;
|
|
|
|
-
|
|
|
|
- if (isReserved) {
|
|
|
|
- when(node.getReservedContainer()).thenReturn(container);
|
|
|
|
- } else {
|
|
|
|
- node.getCopiedListOfRunningContainers().add(container);
|
|
|
|
- Resources.subtractFrom(node.getUnallocatedResource(),
|
|
|
|
- container.getAllocatedResource());
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * Format is:
|
|
|
|
- * host1=partition[ res=resource];
|
|
|
|
- * host2=partition[ res=resource];
|
|
|
|
- */
|
|
|
|
- private void mockSchedulerNodes(String schedulerNodesConfigStr)
|
|
|
|
- throws IOException {
|
|
|
|
- String[] nodesConfigStrArray = schedulerNodesConfigStr.split(";");
|
|
|
|
- for (String p : nodesConfigStrArray) {
|
|
|
|
- String[] arr = p.split(" ");
|
|
|
|
-
|
|
|
|
- NodeId nodeId = NodeId.newInstance(arr[0].substring(0, arr[0].indexOf("=")), 1);
|
|
|
|
- String partition = arr[0].substring(arr[0].indexOf("=") + 1, arr[0].length());
|
|
|
|
-
|
|
|
|
- FiCaSchedulerNode sn = mock(FiCaSchedulerNode.class);
|
|
|
|
- when(sn.getNodeID()).thenReturn(nodeId);
|
|
|
|
- when(sn.getPartition()).thenReturn(partition);
|
|
|
|
-
|
|
|
|
- Resource totalRes = Resources.createResource(0);
|
|
|
|
- if (arr.length > 1) {
|
|
|
|
- String res = arr[1];
|
|
|
|
- if (res.contains("res=")) {
|
|
|
|
- String resSring = res.substring(
|
|
|
|
- res.indexOf("res=") + "res=".length());
|
|
|
|
- totalRes = parseResourceFromString(resSring);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- when(sn.getTotalResource()).thenReturn(totalRes);
|
|
|
|
- when(sn.getUnallocatedResource()).thenReturn(Resources.clone(totalRes));
|
|
|
|
-
|
|
|
|
- // TODO, add settings of killable resources when necessary
|
|
|
|
- when(sn.getTotalKillableResources()).thenReturn(Resources.none());
|
|
|
|
-
|
|
|
|
- List<RMContainer> liveContainers = new ArrayList<>();
|
|
|
|
- when(sn.getCopiedListOfRunningContainers()).thenReturn(liveContainers);
|
|
|
|
-
|
|
|
|
- nodeIdToSchedulerNodes.put(nodeId, sn);
|
|
|
|
-
|
|
|
|
- LOG.debug("add scheduler node, id=" + nodeId + ", partition=" + partition);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * Format is:
|
|
|
|
- * <pre>
|
|
|
|
- * partition0=total_resource,exclusivity;
|
|
|
|
- * partition1=total_resource,exclusivity;
|
|
|
|
- * ...
|
|
|
|
- * </pre>
|
|
|
|
- */
|
|
|
|
- private void mockNodeLabelsManager(String nodeLabelsConfigStr) throws IOException {
|
|
|
|
- String[] partitionConfigArr = nodeLabelsConfigStr.split(";");
|
|
|
|
- clusterResource = Resources.createResource(0);
|
|
|
|
- for (String p : partitionConfigArr) {
|
|
|
|
- String partitionName = p.substring(0, p.indexOf("="));
|
|
|
|
- Resource res = parseResourceFromString(p.substring(p.indexOf("=") + 1,
|
|
|
|
- p.indexOf(",")));
|
|
|
|
- boolean exclusivity =
|
|
|
|
- Boolean.valueOf(p.substring(p.indexOf(",") + 1, p.length()));
|
|
|
|
- when(nlm.getResourceByLabel(eq(partitionName), any(Resource.class)))
|
|
|
|
- .thenReturn(res);
|
|
|
|
- when(nlm.isExclusiveNodeLabel(eq(partitionName))).thenReturn(exclusivity);
|
|
|
|
-
|
|
|
|
- // add to partition to resource
|
|
|
|
- partitionToResource.put(partitionName, res);
|
|
|
|
- LOG.debug("add partition=" + partitionName + " totalRes=" + res
|
|
|
|
- + " exclusivity=" + exclusivity);
|
|
|
|
- Resources.addTo(clusterResource, res);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- when(nlm.getClusterNodeLabelNames()).thenReturn(
|
|
|
|
- partitionToResource.keySet());
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- private Resource parseResourceFromString(String p) {
|
|
|
|
- String[] resource = p.split(":");
|
|
|
|
- Resource res;
|
|
|
|
- if (resource.length == 1) {
|
|
|
|
- res = Resources.createResource(Integer.valueOf(resource[0]));
|
|
|
|
- } else {
|
|
|
|
- res = Resources.createResource(Integer.valueOf(resource[0]),
|
|
|
|
- Integer.valueOf(resource[1]));
|
|
|
|
- if (resource.length > 2) {
|
|
|
|
- // Using the same order of resources from ResourceUtils, set resource
|
|
|
|
- // informations.
|
|
|
|
- ResourceInformation[] storedResourceInfo = ResourceUtils
|
|
|
|
- .getResourceTypesArray();
|
|
|
|
- for (int i = 2; i < resource.length; i++) {
|
|
|
|
- res.setResourceInformation(storedResourceInfo[i].getName(),
|
|
|
|
- ResourceInformation.newInstance(storedResourceInfo[i].getName(),
|
|
|
|
- storedResourceInfo[i].getUnits(),
|
|
|
|
- Integer.valueOf(resource[i])));
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- return res;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * Format is:
|
|
|
|
- * <pre>
|
|
|
|
- * root (<partition-name-1>=[guaranteed max used pending (reserved)],<partition-name-2>=..);
|
|
|
|
- * -A(...);
|
|
|
|
- * --A1(...);
|
|
|
|
- * --A2(...);
|
|
|
|
- * -B...
|
|
|
|
- * </pre>
|
|
|
|
- * ";" splits queues, and there should no empty lines, no extra spaces
|
|
|
|
- *
|
|
|
|
- * For each queue, it has configurations to specify capacities (to each
|
|
|
|
- * partition), format is:
|
|
|
|
- * <pre>
|
|
|
|
- * -<queueName> (<labelName1>=[guaranteed max used pending], \
|
|
|
|
- * <labelName2>=[guaranteed max used pending])
|
|
|
|
- * {key1=value1,key2=value2}; // Additional configs
|
|
|
|
- * </pre>
|
|
|
|
- */
|
|
|
|
- @SuppressWarnings({ "unchecked", "rawtypes" })
|
|
|
|
- private ParentQueue mockQueueHierarchy(String queueExprs) {
|
|
|
|
- String[] queueExprArray = queueExprs.split(";");
|
|
|
|
- ParentQueue rootQueue = null;
|
|
|
|
- for (int idx = 0; idx < queueExprArray.length; idx++) {
|
|
|
|
- String q = queueExprArray[idx];
|
|
|
|
- CSQueue queue;
|
|
|
|
-
|
|
|
|
- // Initialize queue
|
|
|
|
- if (isParent(queueExprArray, idx)) {
|
|
|
|
- ParentQueue parentQueue = mock(ParentQueue.class);
|
|
|
|
- queue = parentQueue;
|
|
|
|
- List<CSQueue> children = new ArrayList<CSQueue>();
|
|
|
|
- when(parentQueue.getChildQueues()).thenReturn(children);
|
|
|
|
- QueueOrderingPolicy policy = mock(QueueOrderingPolicy.class);
|
|
|
|
- when(policy.getConfigName()).thenReturn(
|
|
|
|
- CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY);
|
|
|
|
- when(parentQueue.getQueueOrderingPolicy()).thenReturn(policy);
|
|
|
|
- } else {
|
|
|
|
- LeafQueue leafQueue = mock(LeafQueue.class);
|
|
|
|
- final TreeSet<FiCaSchedulerApp> apps = new TreeSet<>(
|
|
|
|
- new Comparator<FiCaSchedulerApp>() {
|
|
|
|
- @Override
|
|
|
|
- public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) {
|
|
|
|
- if (a1.getPriority() != null
|
|
|
|
- && !a1.getPriority().equals(a2.getPriority())) {
|
|
|
|
- return a1.getPriority().compareTo(a2.getPriority());
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- int res = a1.getApplicationId()
|
|
|
|
- .compareTo(a2.getApplicationId());
|
|
|
|
- return res;
|
|
|
|
- }
|
|
|
|
- });
|
|
|
|
- when(leafQueue.getApplications()).thenReturn(apps);
|
|
|
|
- when(leafQueue.getAllApplications()).thenReturn(apps);
|
|
|
|
- OrderingPolicy<FiCaSchedulerApp> so = mock(OrderingPolicy.class);
|
|
|
|
- String opName = conf.get(CapacitySchedulerConfiguration.PREFIX
|
|
|
|
- + CapacitySchedulerConfiguration.ROOT + "." + getQueueName(q)
|
|
|
|
- + ".ordering-policy", "fifo");
|
|
|
|
- if (opName.equals("fair")) {
|
|
|
|
- so = Mockito.spy(new FairOrderingPolicy<FiCaSchedulerApp>());
|
|
|
|
- }
|
|
|
|
- when(so.getPreemptionIterator()).thenAnswer(new Answer() {
|
|
|
|
- public Object answer(InvocationOnMock invocation) {
|
|
|
|
- return apps.descendingIterator();
|
|
|
|
- }
|
|
|
|
- });
|
|
|
|
- when(leafQueue.getOrderingPolicy()).thenReturn(so);
|
|
|
|
-
|
|
|
|
- Map<String, TreeSet<RMContainer>> ignorePartitionContainers =
|
|
|
|
- new HashMap<>();
|
|
|
|
- when(leafQueue.getIgnoreExclusivityRMContainers()).thenReturn(
|
|
|
|
- ignorePartitionContainers);
|
|
|
|
- queue = leafQueue;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
|
|
|
|
- when(queue.getReadLock()).thenReturn(lock.readLock());
|
|
|
|
- setupQueue(queue, q, queueExprArray, idx);
|
|
|
|
- if (queue.getQueueName().equals(ROOT)) {
|
|
|
|
- rootQueue = (ParentQueue) queue;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- return rootQueue;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- private void setupQueue(CSQueue queue, String q, String[] queueExprArray,
|
|
|
|
- int idx) {
|
|
|
|
- LOG.debug("*** Setup queue, source=" + q);
|
|
|
|
- String queuePath = null;
|
|
|
|
-
|
|
|
|
- int myLevel = getLevel(q);
|
|
|
|
- if (0 == myLevel) {
|
|
|
|
- // It's root
|
|
|
|
- when(queue.getQueueName()).thenReturn(ROOT);
|
|
|
|
- queuePath = ROOT;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- String queueName = getQueueName(q);
|
|
|
|
- when(queue.getQueueName()).thenReturn(queueName);
|
|
|
|
-
|
|
|
|
- // Setup parent queue, and add myself to parentQueue.children-list
|
|
|
|
- ParentQueue parentQueue = getParentQueue(queueExprArray, idx, myLevel);
|
|
|
|
- if (null != parentQueue) {
|
|
|
|
- when(queue.getParent()).thenReturn(parentQueue);
|
|
|
|
- parentQueue.getChildQueues().add(queue);
|
|
|
|
-
|
|
|
|
- // Setup my path
|
|
|
|
- queuePath = parentQueue.getQueuePath() + "." + queueName;
|
|
|
|
- }
|
|
|
|
- when(queue.getQueuePath()).thenReturn(queuePath);
|
|
|
|
-
|
|
|
|
- QueueCapacities qc = new QueueCapacities(0 == myLevel);
|
|
|
|
- ResourceUsage ru = new ResourceUsage();
|
|
|
|
- QueueResourceQuotas qr = new QueueResourceQuotas();
|
|
|
|
-
|
|
|
|
- when(queue.getQueueCapacities()).thenReturn(qc);
|
|
|
|
- when(queue.getQueueResourceUsage()).thenReturn(ru);
|
|
|
|
- when(queue.getQueueResourceQuotas()).thenReturn(qr);
|
|
|
|
-
|
|
|
|
- LOG.debug("Setup queue, name=" + queue.getQueueName() + " path="
|
|
|
|
- + queue.getQueuePath());
|
|
|
|
- LOG.debug("Parent=" + (parentQueue == null ? "null" : parentQueue
|
|
|
|
- .getQueueName()));
|
|
|
|
-
|
|
|
|
- // Setup other fields like used resource, guaranteed resource, etc.
|
|
|
|
- String capacitySettingStr = q.substring(q.indexOf("(") + 1, q.indexOf(")"));
|
|
|
|
- for (String s : capacitySettingStr.split(",")) {
|
|
|
|
- String partitionName = s.substring(0, s.indexOf("="));
|
|
|
|
- String[] values = s.substring(s.indexOf("[") + 1, s.indexOf("]")).split(" ");
|
|
|
|
- // Add a small epsilon to capacities to avoid truncate when doing
|
|
|
|
- // Resources.multiply
|
|
|
|
- float epsilon = 1e-6f;
|
|
|
|
- Resource totResoucePerPartition = partitionToResource.get(partitionName);
|
|
|
|
- float absGuaranteed = Resources.divide(rc, totResoucePerPartition,
|
|
|
|
- parseResourceFromString(values[0].trim()), totResoucePerPartition)
|
|
|
|
- + epsilon;
|
|
|
|
- float absMax = Resources.divide(rc, totResoucePerPartition,
|
|
|
|
- parseResourceFromString(values[1].trim()), totResoucePerPartition)
|
|
|
|
- + epsilon;
|
|
|
|
- float absUsed = Resources.divide(rc, totResoucePerPartition,
|
|
|
|
- parseResourceFromString(values[2].trim()), totResoucePerPartition)
|
|
|
|
- + epsilon;
|
|
|
|
- float used = Resources.divide(rc, totResoucePerPartition,
|
|
|
|
- parseResourceFromString(values[2].trim()),
|
|
|
|
- parseResourceFromString(values[0].trim())) + epsilon;
|
|
|
|
- Resource pending = parseResourceFromString(values[3].trim());
|
|
|
|
- qc.setAbsoluteCapacity(partitionName, absGuaranteed);
|
|
|
|
- qc.setAbsoluteMaximumCapacity(partitionName, absMax);
|
|
|
|
- qc.setAbsoluteUsedCapacity(partitionName, absUsed);
|
|
|
|
- qc.setUsedCapacity(partitionName, used);
|
|
|
|
- qr.setEffectiveMaxResource(parseResourceFromString(values[1].trim()));
|
|
|
|
- qr.setEffectiveMinResource(parseResourceFromString(values[0].trim()));
|
|
|
|
- qr.setEffectiveMaxResource(partitionName,
|
|
|
|
- parseResourceFromString(values[1].trim()));
|
|
|
|
- qr.setEffectiveMinResource(partitionName,
|
|
|
|
- parseResourceFromString(values[0].trim()));
|
|
|
|
- when(queue.getUsedCapacity()).thenReturn(used);
|
|
|
|
- when(queue.getEffectiveCapacity(partitionName))
|
|
|
|
- .thenReturn(parseResourceFromString(values[0].trim()));
|
|
|
|
- when(queue.getEffectiveMaxCapacity(partitionName))
|
|
|
|
- .thenReturn(parseResourceFromString(values[1].trim()));
|
|
|
|
- ru.setPending(partitionName, pending);
|
|
|
|
- // Setup reserved resource if it contained by input config
|
|
|
|
- Resource reserved = Resources.none();
|
|
|
|
- if(values.length == 5) {
|
|
|
|
- reserved = parseResourceFromString(values[4].trim());
|
|
|
|
- ru.setReserved(partitionName, reserved);
|
|
|
|
- }
|
|
|
|
- if (!isParent(queueExprArray, idx)) {
|
|
|
|
- LeafQueue lq = (LeafQueue) queue;
|
|
|
|
- when(lq.getTotalPendingResourcesConsideringUserLimit(isA(Resource.class),
|
|
|
|
- isA(String.class), eq(false))).thenReturn(pending);
|
|
|
|
- when(lq.getTotalPendingResourcesConsideringUserLimit(isA(Resource.class),
|
|
|
|
- isA(String.class), eq(true))).thenReturn(
|
|
|
|
- Resources.subtract(pending, reserved));
|
|
|
|
- }
|
|
|
|
- ru.setUsed(partitionName, parseResourceFromString(values[2].trim()));
|
|
|
|
-
|
|
|
|
- LOG.debug("Setup queue=" + queueName + " partition=" + partitionName
|
|
|
|
- + " [abs_guaranteed=" + absGuaranteed + ",abs_max=" + absMax
|
|
|
|
- + ",abs_used" + absUsed + ",pending_resource=" + pending
|
|
|
|
- + ", reserved_resource=" + reserved + "]");
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- // Setup preemption disabled
|
|
|
|
- when(queue.getPreemptionDisabled()).thenReturn(
|
|
|
|
- conf.getPreemptionDisabled(queuePath, false));
|
|
|
|
-
|
|
|
|
- // Setup other queue configurations
|
|
|
|
- Map<String, String> otherConfigs = getOtherConfigurations(
|
|
|
|
- queueExprArray[idx]);
|
|
|
|
- if (otherConfigs.containsKey("priority")) {
|
|
|
|
- when(queue.getPriority()).thenReturn(
|
|
|
|
- Priority.newInstance(Integer.valueOf(otherConfigs.get("priority"))));
|
|
|
|
- } else {
|
|
|
|
- // set queue's priority to 0 by default
|
|
|
|
- when(queue.getPriority()).thenReturn(Priority.newInstance(0));
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- // Setup disable preemption of queues
|
|
|
|
- if (otherConfigs.containsKey("disable_preemption")) {
|
|
|
|
- when(queue.getPreemptionDisabled()).thenReturn(
|
|
|
|
- Boolean.valueOf(otherConfigs.get("disable_preemption")));
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- nameToCSQueues.put(queueName, queue);
|
|
|
|
- when(cs.getQueue(eq(queueName))).thenReturn(queue);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * Get additional queue's configurations
|
|
|
|
- * @param queueExpr queue expr
|
|
|
|
- * @return maps of configs
|
|
|
|
- */
|
|
|
|
- private Map<String, String> getOtherConfigurations(String queueExpr) {
|
|
|
|
- if (queueExpr.contains("{")) {
|
|
|
|
- int left = queueExpr.indexOf('{');
|
|
|
|
- int right = queueExpr.indexOf('}');
|
|
|
|
-
|
|
|
|
- if (right > left) {
|
|
|
|
- Map<String, String> configs = new HashMap<>();
|
|
|
|
-
|
|
|
|
- String subStr = queueExpr.substring(left + 1, right);
|
|
|
|
- for (String kv : subStr.split(",")) {
|
|
|
|
- if (kv.contains("=")) {
|
|
|
|
- String key = kv.substring(0, kv.indexOf("="));
|
|
|
|
- String value = kv.substring(kv.indexOf("=") + 1);
|
|
|
|
- configs.put(key, value);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- return configs;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- return Collections.emptyMap();
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * Level of a queue is how many "-" at beginning, root's level is 0
|
|
|
|
- */
|
|
|
|
- private int getLevel(String q) {
|
|
|
|
- int level = 0; // level = how many "-" at beginning
|
|
|
|
- while (level < q.length() && q.charAt(level) == '-') {
|
|
|
|
- level++;
|
|
|
|
- }
|
|
|
|
- return level;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- private String getQueueName(String q) {
|
|
|
|
- int idx = 0;
|
|
|
|
- // find first != '-' char
|
|
|
|
- while (idx < q.length() && q.charAt(idx) == '-') {
|
|
|
|
- idx++;
|
|
|
|
- }
|
|
|
|
- if (idx == q.length()) {
|
|
|
|
- throw new IllegalArgumentException("illegal input:" + q);
|
|
|
|
- }
|
|
|
|
- // name = after '-' and before '('
|
|
|
|
- String name = q.substring(idx, q.indexOf('('));
|
|
|
|
- if (name.isEmpty()) {
|
|
|
|
- throw new IllegalArgumentException("queue name shouldn't be empty:" + q);
|
|
|
|
- }
|
|
|
|
- if (name.contains(".")) {
|
|
|
|
- throw new IllegalArgumentException("queue name shouldn't contain '.':"
|
|
|
|
- + name);
|
|
|
|
- }
|
|
|
|
- return name;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- private ParentQueue getParentQueue(String[] queueExprArray, int idx, int myLevel) {
|
|
|
|
- idx--;
|
|
|
|
- while (idx >= 0) {
|
|
|
|
- int level = getLevel(queueExprArray[idx]);
|
|
|
|
- if (level < myLevel) {
|
|
|
|
- String parentQueuName = getQueueName(queueExprArray[idx]);
|
|
|
|
- return (ParentQueue) nameToCSQueues.get(parentQueuName);
|
|
|
|
- }
|
|
|
|
- idx--;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- return null;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * Get if a queue is ParentQueue
|
|
|
|
- */
|
|
|
|
- private boolean isParent(String[] queues, int idx) {
|
|
|
|
- int myLevel = getLevel(queues[idx]);
|
|
|
|
- idx++;
|
|
|
|
- while (idx < queues.length && getLevel(queues[idx]) == myLevel) {
|
|
|
|
- idx++;
|
|
|
|
- }
|
|
|
|
- if (idx >= queues.length || getLevel(queues[idx]) < myLevel) {
|
|
|
|
- // It's a LeafQueue
|
|
|
|
- return false;
|
|
|
|
- } else {
|
|
|
|
- return true;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- public ApplicationAttemptId getAppAttemptId(int id) {
|
|
|
|
- ApplicationId appId = ApplicationId.newInstance(0L, id);
|
|
|
|
- ApplicationAttemptId appAttemptId =
|
|
|
|
- ApplicationAttemptId.newInstance(appId, 1);
|
|
|
|
- return appAttemptId;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- public void checkContainerNodesInApp(FiCaSchedulerApp app,
|
|
|
|
- int expectedContainersNumber, String host) {
|
|
|
|
- NodeId nodeId = NodeId.newInstance(host, 1);
|
|
|
|
- int num = 0;
|
|
|
|
- for (RMContainer c : app.getLiveContainers()) {
|
|
|
|
- if (c.getAllocatedNode().equals(nodeId)) {
|
|
|
|
- num++;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- for (RMContainer c : app.getReservedContainers()) {
|
|
|
|
- if (c.getAllocatedNode().equals(nodeId)) {
|
|
|
|
- num++;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- Assert.assertEquals(expectedContainersNumber, num);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- public FiCaSchedulerApp getApp(String queueName, int appId) {
|
|
|
|
- for (FiCaSchedulerApp app : ((LeafQueue) cs.getQueue(queueName))
|
|
|
|
- .getApplications()) {
|
|
|
|
- if (app.getApplicationId().getId() == appId) {
|
|
|
|
- return app;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- return null;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- public void checkAbsCapacities(CSQueue queue, String partition,
|
|
|
|
- float guaranteed, float max, float used) {
|
|
|
|
- QueueCapacities qc = queue.getQueueCapacities();
|
|
|
|
- Assert.assertEquals(guaranteed, qc.getAbsoluteCapacity(partition), 1e-3);
|
|
|
|
- Assert.assertEquals(max, qc.getAbsoluteMaximumCapacity(partition), 1e-3);
|
|
|
|
- Assert.assertEquals(used, qc.getAbsoluteUsedCapacity(partition), 1e-3);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- public void checkPendingResource(CSQueue queue, String partition, int pending) {
|
|
|
|
- ResourceUsage ru = queue.getQueueResourceUsage();
|
|
|
|
- Assert.assertEquals(pending, ru.getPending(partition).getMemorySize());
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- public void checkPriority(CSQueue queue, int expectedPriority) {
|
|
|
|
- Assert.assertEquals(expectedPriority, queue.getPriority().getPriority());
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- public void checkReservedResource(CSQueue queue, String partition, int reserved) {
|
|
|
|
- ResourceUsage ru = queue.getQueueResourceUsage();
|
|
|
|
- Assert.assertEquals(reserved, ru.getReserved(partition).getMemorySize());
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- static class IsPreemptionRequestForQueueAndNode
|
|
|
|
- extends ArgumentMatcher<ContainerPreemptEvent> {
|
|
|
|
- private final ApplicationAttemptId appAttId;
|
|
|
|
- private final String queueName;
|
|
|
|
- private final NodeId nodeId;
|
|
|
|
-
|
|
|
|
- IsPreemptionRequestForQueueAndNode(ApplicationAttemptId appAttId,
|
|
|
|
- String queueName, NodeId nodeId) {
|
|
|
|
- this.appAttId = appAttId;
|
|
|
|
- this.queueName = queueName;
|
|
|
|
- this.nodeId = nodeId;
|
|
|
|
- }
|
|
|
|
- @Override
|
|
|
|
- public boolean matches(Object o) {
|
|
|
|
- ContainerPreemptEvent cpe = (ContainerPreemptEvent)o;
|
|
|
|
-
|
|
|
|
- return appAttId.equals(cpe.getAppId())
|
|
|
|
- && queueName.equals(cpe.getContainer().getQueueName())
|
|
|
|
- && nodeId.equals(cpe.getContainer().getAllocatedNode());
|
|
|
|
- }
|
|
|
|
- @Override
|
|
|
|
- public String toString() {
|
|
|
|
- return appAttId.toString();
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-}
|
|
|