@@ -0,0 +1,1361 @@
+/**
|
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one
|
|
|
+ * or more contributor license agreements. See the NOTICE file
|
|
|
+ * distributed with this work for additional information
|
|
|
+ * regarding copyright ownership. The ASF licenses this file
|
|
|
+ * to you under the Apache License, Version 2.0 (the
|
|
|
+ * "License"); you may not use this file except in compliance
|
|
|
+ * with the License. You may obtain a copy of the License at
|
|
|
+ *
|
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
|
+ *
|
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
+ * See the License for the specific language governing permissions and
|
|
|
+ * limitations under the License.
|
|
|
+ */
|
|
|
+
|
|
|
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
|
|
+
|
|
|
+import java.io.IOException;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.Collection;
|
|
|
+import java.util.Collections;
|
|
|
+import java.util.Comparator;
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.HashSet;
|
|
|
+import java.util.Iterator;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.Set;
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
+
|
|
|
+import org.apache.commons.logging.Log;
|
|
|
+import org.apache.commons.logging.LogFactory;
|
|
|
+import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
|
|
|
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
|
|
+import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.security.UserGroupInformation;
|
|
|
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
|
+import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
+import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
|
|
+import org.apache.hadoop.yarn.api.records.Container;
|
|
|
+import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
|
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
|
|
+import org.apache.hadoop.yarn.api.records.NodeId;
|
|
|
+import org.apache.hadoop.yarn.api.records.Priority;
|
|
|
+import org.apache.hadoop.yarn.api.records.QueueACL;
|
|
|
+import org.apache.hadoop.yarn.api.records.QueueInfo;
|
|
|
+import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
|
|
|
+import org.apache.hadoop.yarn.api.records.Resource;
|
|
|
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
|
|
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|
|
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
|
|
+import org.apache.hadoop.yarn.util.Clock;
|
|
|
+import org.apache.hadoop.yarn.util.SystemClock;
|
|
|
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
|
|
+import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
|
|
|
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
|
|
+import org.apache.hadoop.yarn.util.resource.Resources;
|
|
|
+
|
|
|
+import com.google.common.annotations.VisibleForTesting;
|
|
|
+
|
|
|
+/**
|
|
|
+ * A scheduler that schedules resources between a set of queues. The scheduler
|
|
|
+ * keeps track of the resources used by each queue, and attempts to maintain
|
|
|
+ * fairness by scheduling tasks at queues whose allocations are farthest below
|
|
|
+ * an ideal fair distribution.
|
|
|
+ *
|
|
|
+ * The fair scheduler supports hierarchical queues. All queues descend from a
|
|
|
+ * queue named "root". Available resources are distributed among the children
|
|
|
+ * of the root queue in the typical fair scheduling fashion. Then, the children
|
|
|
+ * distribute the resources assigned to them to their children in the same
|
|
|
+ * fashion. Applications may only be scheduled on leaf queues. Queues can be
|
|
|
+ * specified as children of other queues by placing them as sub-elements of their
|
|
|
+ * parents in the fair scheduler configuration file.
|
|
|
+ *
|
|
|
+ * A queue's name starts with the names of its parents, with periods as
|
|
|
+ * separators. So a queue named "queue1" under the root queue would be
|
|
|
+ * referred to as "root.queue1", and a queue named "queue2" under a queue
|
|
|
+ * named "parent1" would be referred to as "root.parent1.queue2".
|
|
|
+ */
|
|
|
+@LimitedPrivate("yarn")
|
|
|
+@Unstable
|
|
|
+@SuppressWarnings("unchecked")
|
|
|
+public class FairScheduler implements ResourceScheduler {
|
|
|
+ private boolean initialized;
|
|
|
+ private FairSchedulerConfiguration conf;
|
|
|
+ private RMContext rmContext;
|
|
|
+ private Resource minimumAllocation;
|
|
|
+ private Resource maximumAllocation;
|
|
|
+ private Resource incrAllocation;
|
|
|
+ private QueueManager queueMgr;
|
|
|
+ private Clock clock;
|
|
|
+ private boolean usePortForNodeName;
|
|
|
+
|
|
|
+ private static final Log LOG = LogFactory.getLog(FairScheduler.class);
|
|
|
+
|
|
|
+ private static final ResourceCalculator RESOURCE_CALCULATOR =
|
|
|
+ new DefaultResourceCalculator();
|
|
|
+
|
|
|
+ // Value that container assignment methods return when a container is
|
|
|
+ // reserved
|
|
|
+ public static final Resource CONTAINER_RESERVED = Resources.createResource(-1);
|
|
|
+
|
|
|
+ // How often fair shares are re-calculated (ms)
|
|
|
+ protected long UPDATE_INTERVAL = 500;
|
|
|
+
|
|
|
+ private final static List<Container> EMPTY_CONTAINER_LIST =
|
|
|
+ new ArrayList<Container>();
|
|
|
+
|
|
|
+ private static final Allocation EMPTY_ALLOCATION =
|
|
|
+ new Allocation(EMPTY_CONTAINER_LIST, Resources.createResource(0));
|
|
|
+
|
|
|
+ // Aggregate metrics
|
|
|
+ FSQueueMetrics rootMetrics;
|
|
|
+
|
|
|
+ // Time when we last updated preemption vars
|
|
|
+ protected long lastPreemptionUpdateTime;
|
|
|
+ // Time we last ran preemptTasksIfNecessary
|
|
|
+ private long lastPreemptCheckTime;
|
|
|
+
|
|
|
+ // This stores per-application scheduling information.
|
|
|
+ @VisibleForTesting
|
|
|
+ protected Map<ApplicationId, SchedulerApplication> applications =
|
|
|
+ new ConcurrentHashMap<ApplicationId, SchedulerApplication>();
|
|
|
+
|
|
|
+ // Nodes in the cluster, indexed by NodeId
|
|
|
+ private Map<NodeId, FSSchedulerNode> nodes =
|
|
|
+ new ConcurrentHashMap<NodeId, FSSchedulerNode>();
|
|
|
+
|
|
|
+ // Aggregate capacity of the cluster
|
|
|
+ private Resource clusterCapacity =
|
|
|
+ RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class);
|
|
|
+
|
|
|
+ // How often tasks are preempted
|
|
|
+ protected long preemptionInterval;
|
|
|
+
|
|
|
+ // ms to wait before force-killing a container (must be longer than a couple
|
|
|
+ // of heartbeats to give task-kill commands a chance to act).
|
|
|
+ protected long waitTimeBeforeKill;
|
|
|
+
|
|
|
+ // Containers whose AMs have been warned that they will be preempted soon.
|
|
|
+ private List<RMContainer> warnedContainers = new ArrayList<RMContainer>();
|
|
|
+
|
|
|
+ protected boolean preemptionEnabled;
|
|
|
+ protected boolean sizeBasedWeight; // Give larger weights to larger jobs
|
|
|
+ protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster
|
|
|
+ protected boolean continuousSchedulingEnabled; // Continuous Scheduling enabled or not
|
|
|
+ protected int continuousSchedulingSleepMs; // Sleep time for each pass in continuous scheduling
|
|
|
+ private Comparator<NodeId> nodeAvailableResourceComparator =
|
|
|
+ new NodeAvailableResourceComparator(); // Node available resource comparator
|
|
|
+ protected double nodeLocalityThreshold; // Cluster threshold for node locality
|
|
|
+ protected double rackLocalityThreshold; // Cluster threshold for rack locality
|
|
|
+ protected long nodeLocalityDelayMs; // Delay for node locality
|
|
|
+ protected long rackLocalityDelayMs; // Delay for rack locality
|
|
|
+ private FairSchedulerEventLog eventLog; // Machine-readable event log
|
|
|
+ protected boolean assignMultiple; // Allocate multiple containers per
|
|
|
+ // heartbeat
|
|
|
+ protected int maxAssign; // Max containers to assign per heartbeat
|
|
|
+
|
|
|
+ @VisibleForTesting
|
|
|
+ final MaxRunningAppsEnforcer maxRunningEnforcer;
|
|
|
+
|
|
|
+ private AllocationFileLoaderService allocsLoader;
|
|
|
+ @VisibleForTesting
|
|
|
+ AllocationConfiguration allocConf;
|
|
|
+
|
|
|
+ public FairScheduler() {
|
|
|
+ clock = new SystemClock();
|
|
|
+ allocsLoader = new AllocationFileLoaderService();
|
|
|
+ queueMgr = new QueueManager(this);
|
|
|
+ maxRunningEnforcer = new MaxRunningAppsEnforcer(this);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void validateConf(Configuration conf) {
|
|
|
+ // validate scheduler memory allocation setting
|
|
|
+ int minMem = conf.getInt(
|
|
|
+ YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
|
|
|
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
|
|
|
+ int maxMem = conf.getInt(
|
|
|
+ YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
|
|
|
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
|
|
|
+
|
|
|
+ if (minMem < 0 || minMem > maxMem) {
|
|
|
+ throw new YarnRuntimeException("Invalid resource scheduler memory"
|
|
|
+ + " allocation configuration"
|
|
|
+ + ", " + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB
|
|
|
+ + "=" + minMem
|
|
|
+ + ", " + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB
|
|
|
+ + "=" + maxMem + ", min should equal greater than 0"
|
|
|
+ + ", max should be no smaller than min.");
|
|
|
+ }
|
|
|
+
|
|
|
+ // validate scheduler vcores allocation setting
|
|
|
+ int minVcores = conf.getInt(
|
|
|
+ YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
|
|
|
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
|
|
|
+ int maxVcores = conf.getInt(
|
|
|
+ YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
|
|
|
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
|
|
|
+
|
|
|
+ if (minVcores < 0 || minVcores > maxVcores) {
|
|
|
+ throw new YarnRuntimeException("Invalid resource scheduler vcores"
|
|
|
+ + " allocation configuration"
|
|
|
+ + ", " + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES
|
|
|
+ + "=" + minVcores
|
|
|
+ + ", " + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES
|
|
|
+ + "=" + maxVcores + ", min should equal greater than 0"
|
|
|
+ + ", max should be no smaller than min.");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public FairSchedulerConfiguration getConf() {
|
|
|
+ return conf;
|
|
|
+ }
|
|
|
+
|
|
|
+ public QueueManager getQueueManager() {
|
|
|
+ return queueMgr;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public RMContainer getRMContainer(ContainerId containerId) {
|
|
|
+ FSSchedulerApp attempt = getCurrentAttemptForContainer(containerId);
|
|
|
+ return (attempt == null) ? null : attempt.getRMContainer(containerId);
|
|
|
+ }
|
|
|
+
|
|
|
+ private FSSchedulerApp getCurrentAttemptForContainer(
|
|
|
+ ContainerId containerId) {
|
|
|
+ SchedulerApplication app =
|
|
|
+ applications.get(containerId.getApplicationAttemptId()
|
|
|
+ .getApplicationId());
|
|
|
+ if (app != null) {
|
|
|
+ return (FSSchedulerApp) app.getCurrentAppAttempt();
|
|
|
+ }
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * A runnable which calls {@link FairScheduler#update()} every
|
|
|
+ * <code>UPDATE_INTERVAL</code> milliseconds.
|
|
|
+ */
|
|
|
+ private class UpdateThread implements Runnable {
|
|
|
+ public void run() {
|
|
|
+ while (true) {
|
|
|
+ try {
|
|
|
+ Thread.sleep(UPDATE_INTERVAL);
|
|
|
+ update();
|
|
|
+ preemptTasksIfNecessary();
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.error("Exception in fair scheduler UpdateThread", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Recompute the internal variables used by the scheduler - per-job weights,
|
|
|
+ * fair shares, deficits, minimum slot allocations, and amount of used and
|
|
|
+ * required resources per job.
|
|
|
+ */
|
|
|
+ protected synchronized void update() {
|
|
|
+ updatePreemptionVariables(); // Determine if any queues merit preemption
|
|
|
+
|
|
|
+ FSQueue rootQueue = queueMgr.getRootQueue();
|
|
|
+
|
|
|
+ // Recursively update demands for all queues
|
|
|
+ rootQueue.updateDemand();
|
|
|
+
|
|
|
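+ // The root queue's fair share is the entire cluster capacity.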
+ rootQueue.setFairShare(clusterCapacity);
|
|
|
+ // Recursively compute fair shares for all queues
|
|
|
+ // and update metrics
|
|
|
+ rootQueue.recomputeShares();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Update the preemption fields for all QueueSchedulables, i.e. the times since
|
|
|
+ * each queue last was at its guaranteed share and at > 1/2 of its fair share
|
|
|
+ * for each type of task.
|
|
|
+ */
|
|
|
+ private void updatePreemptionVariables() {
|
|
|
+ long now = clock.getTime();
|
|
|
+ lastPreemptionUpdateTime = now;
|
|
|
+ for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
|
|
|
+ if (!isStarvedForMinShare(sched)) {
|
|
|
+ sched.setLastTimeAtMinShare(now);
|
|
|
+ }
|
|
|
+ if (!isStarvedForFairShare(sched)) {
|
|
|
+ sched.setLastTimeAtHalfFairShare(now);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Is a queue below its min share for the given task type?
|
|
|
+ */
|
|
|
+ boolean isStarvedForMinShare(FSLeafQueue sched) {
|
|
|
+ Resource desiredShare = Resources.min(RESOURCE_CALCULATOR, clusterCapacity,
|
|
|
+ sched.getMinShare(), sched.getDemand());
|
|
|
+ return Resources.lessThan(RESOURCE_CALCULATOR, clusterCapacity,
|
|
|
+ sched.getResourceUsage(), desiredShare);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Is a queue being starved for fair share for the given task type? This is
|
|
|
+ * defined as being below half its fair share.
|
|
|
+ */
|
|
|
+ boolean isStarvedForFairShare(FSLeafQueue sched) {
|
|
|
+ Resource desiredFairShare = Resources.min(RESOURCE_CALCULATOR, clusterCapacity,
|
|
|
+ Resources.multiply(sched.getFairShare(), .5), sched.getDemand());
|
|
|
+ return Resources.lessThan(RESOURCE_CALCULATOR, clusterCapacity,
|
|
|
+ sched.getResourceUsage(), desiredFairShare);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Check for queues that need tasks preempted, either because they have been
|
|
|
+ * below their guaranteed share for minSharePreemptionTimeout or they have
|
|
|
+ * been below half their fair share for the fairSharePreemptionTimeout. If
|
|
|
+ * such queues exist, compute how many tasks of each type need to be preempted
|
|
|
+ * and then select the right ones using preemptTasks.
|
|
|
+ */
|
|
|
+ protected synchronized void preemptTasksIfNecessary() {
|
|
|
+ if (!preemptionEnabled) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ long curTime = clock.getTime();
|
|
|
+ if (curTime - lastPreemptCheckTime < preemptionInterval) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ lastPreemptCheckTime = curTime;
|
|
|
+
|
|
|
+ Resource resToPreempt = Resources.none();
|
|
|
+
|
|
|
+ for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
|
|
|
+ resToPreempt = Resources.add(resToPreempt, resToPreempt(sched, curTime));
|
|
|
+ }
|
|
|
+ if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, resToPreempt,
|
|
|
+ Resources.none())) {
|
|
|
+ preemptResources(queueMgr.getLeafQueues(), resToPreempt);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Preempt a quantity of resources from a list of QueueSchedulables. The
|
|
|
+ * policy for this is to pick apps from queues that are over their fair share,
|
|
|
+ * but make sure that no queue is placed below its fair share in the process.
|
|
|
+ * We further prioritize preemption by choosing containers with lowest
|
|
|
+ * priority to preempt.
|
|
|
+ */
|
|
|
+ protected void preemptResources(Collection<FSLeafQueue> scheds,
|
|
|
+ Resource toPreempt) {
|
|
|
+ if (scheds.isEmpty() || Resources.equals(toPreempt, Resources.none())) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ Map<RMContainer, FSSchedulerApp> apps =
|
|
|
+ new HashMap<RMContainer, FSSchedulerApp>();
|
|
|
+ Map<RMContainer, FSLeafQueue> queues =
|
|
|
+ new HashMap<RMContainer, FSLeafQueue>();
|
|
|
+
|
|
|
+ // Collect running containers from over-scheduled queues
|
|
|
+ List<RMContainer> runningContainers = new ArrayList<RMContainer>();
|
|
|
+ for (FSLeafQueue sched : scheds) {
|
|
|
+ if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
|
|
|
+ sched.getResourceUsage(), sched.getFairShare())) {
|
|
|
+ for (AppSchedulable as : sched.getRunnableAppSchedulables()) {
|
|
|
+ for (RMContainer c : as.getApp().getLiveContainers()) {
|
|
|
+ runningContainers.add(c);
|
|
|
+ apps.put(c, as.getApp());
|
|
|
+ queues.put(c, sched);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Sort containers into reverse order of priority
|
|
|
+ Collections.sort(runningContainers, new Comparator<RMContainer>() {
|
|
|
+ public int compare(RMContainer c1, RMContainer c2) {
|
|
|
+ int ret = c1.getContainer().getPriority().compareTo(
|
|
|
+ c2.getContainer().getPriority());
|
|
|
+ if (ret == 0) {
|
|
|
+ return c2.getContainerId().compareTo(c1.getContainerId());
|
|
|
+ }
|
|
|
+ return ret;
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+ // Scan down the list of containers we've already warned and kill them
|
|
|
+ // if we need to. Remove any containers from the list that we don't need
|
|
|
+ // or that are no longer running.
|
|
|
+ Iterator<RMContainer> warnedIter = warnedContainers.iterator();
|
|
|
+ Set<RMContainer> preemptedThisRound = new HashSet<RMContainer>();
|
|
|
+ while (warnedIter.hasNext()) {
|
|
|
+ RMContainer container = warnedIter.next();
|
|
|
+ if (container.getState() == RMContainerState.RUNNING &&
|
|
|
+ Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
|
|
|
+ toPreempt, Resources.none())) {
|
|
|
+ warnOrKillContainer(container, apps.get(container), queues.get(container));
|
|
|
+ preemptedThisRound.add(container);
|
|
|
+ Resources.subtractFrom(toPreempt, container.getContainer().getResource());
|
|
|
+ } else {
|
|
|
+ warnedIter.remove();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Scan down the rest of the containers until we've preempted enough, making
|
|
|
+ // sure we don't preempt too many from any queue
|
|
|
+ Iterator<RMContainer> runningIter = runningContainers.iterator();
|
|
|
+ while (runningIter.hasNext() &&
|
|
|
+ Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
|
|
|
+ toPreempt, Resources.none())) {
|
|
|
+ RMContainer container = runningIter.next();
|
|
|
+ FSLeafQueue sched = queues.get(container);
|
|
|
+ if (!preemptedThisRound.contains(container) &&
|
|
|
+ Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
|
|
|
+ sched.getResourceUsage(), sched.getFairShare())) {
|
|
|
+ warnOrKillContainer(container, apps.get(container), sched);
|
|
|
+
|
|
|
+ warnedContainers.add(container);
|
|
|
+ Resources.subtractFrom(toPreempt, container.getContainer().getResource());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void warnOrKillContainer(RMContainer container, FSSchedulerApp app,
|
|
|
+ FSLeafQueue queue) {
|
|
|
+ LOG.info("Preempting container (prio=" + container.getContainer().getPriority() +
|
|
|
+ "res=" + container.getContainer().getResource() +
|
|
|
+ ") from queue " + queue.getName());
|
|
|
+
|
|
|
+ Long time = app.getContainerPreemptionTime(container);
|
|
|
+
|
|
|
+ if (time != null) {
|
|
|
+ // if we asked for preemption more than waitTimeBeforeKill ms ago,
|
|
|
+ // proceed with kill
|
|
|
+ if (time + waitTimeBeforeKill < clock.getTime()) {
|
|
|
+ ContainerStatus status =
|
|
|
+ SchedulerUtils.createPreemptedContainerStatus(
|
|
|
+ container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER);
|
|
|
+
|
|
|
+ // TODO: Not sure if this ever actually adds this to the list of cleanup
|
|
|
+ // containers on the RMNode (see SchedulerNode.releaseContainer()).
|
|
|
+ completedContainer(container, status, RMContainerEventType.KILL);
|
|
|
+ LOG.info("Killing container" + container +
|
|
|
+ " (after waiting for premption for " +
|
|
|
+ (clock.getTime() - time) + "ms)");
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ // track the request in the FSSchedulerApp itself
|
|
|
+ app.addPreemption(container, clock.getTime());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Return the resource amount that this queue is allowed to preempt, if any.
|
|
|
+ * If the queue has been below its min share for at least its preemption
|
|
|
+ * timeout, it should preempt the difference between its current share and
|
|
|
+ * this min share. If it has been below half its fair share for at least the
|
|
|
+ * fairSharePreemptionTimeout, it should preempt enough tasks to get up to its
|
|
|
+ * full fair share. If both conditions hold, we preempt the max of the two
|
|
|
+ * amounts (this shouldn't happen unless someone sets the timeouts to be
|
|
|
+ * identical for some reason).
|
|
|
+ */
|
|
|
+ protected Resource resToPreempt(FSLeafQueue sched, long curTime) {
|
|
|
+ String queue = sched.getName();
|
|
|
+ long minShareTimeout = allocConf.getMinSharePreemptionTimeout(queue);
|
|
|
+ long fairShareTimeout = allocConf.getFairSharePreemptionTimeout();
|
|
|
+ Resource resDueToMinShare = Resources.none();
|
|
|
+ Resource resDueToFairShare = Resources.none();
|
|
|
+ if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) {
|
|
|
+ Resource target = Resources.min(RESOURCE_CALCULATOR, clusterCapacity,
|
|
|
+ sched.getMinShare(), sched.getDemand());
|
|
|
+ resDueToMinShare = Resources.max(RESOURCE_CALCULATOR, clusterCapacity,
|
|
|
+ Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
|
|
|
+ }
|
|
|
+ if (curTime - sched.getLastTimeAtHalfFairShare() > fairShareTimeout) {
|
|
|
+ Resource target = Resources.min(RESOURCE_CALCULATOR, clusterCapacity,
|
|
|
+ sched.getFairShare(), sched.getDemand());
|
|
|
+ resDueToFairShare = Resources.max(RESOURCE_CALCULATOR, clusterCapacity,
|
|
|
+ Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
|
|
|
+ }
|
|
|
+ Resource resToPreempt = Resources.max(RESOURCE_CALCULATOR, clusterCapacity,
|
|
|
+ resDueToMinShare, resDueToFairShare);
|
|
|
+ if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
|
|
|
+ resToPreempt, Resources.none())) {
|
|
|
+ String message = "Should preempt " + resToPreempt + " res for queue "
|
|
|
+ + sched.getName() + ": resDueToMinShare = " + resDueToMinShare
|
|
|
+ + ", resDueToFairShare = " + resDueToFairShare;
|
|
|
+ LOG.info(message);
|
|
|
+ }
|
|
|
+ return resToPreempt;
|
|
|
+ }
|
|
|
+
|
|
|
+ public RMContainerTokenSecretManager getContainerTokenSecretManager() {
|
|
|
+ return rmContext.getContainerTokenSecretManager();
|
|
|
+ }
|
|
|
+
|
|
|
+ // synchronized for sizeBasedWeight
|
|
|
+ public synchronized ResourceWeights getAppWeight(AppSchedulable app) {
|
|
|
+ double weight = 1.0;
|
|
|
+ if (sizeBasedWeight) {
|
|
|
+ // Set weight based on current memory demand
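+ // log1p(demand) / log(2) is log base 2 of (1 + memory demand in MB), so the
+ // weight grows with the app's size, but only logarithmically.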
|
|
|
+ weight = Math.log1p(app.getDemand().getMemory()) / Math.log(2);
|
|
|
+ }
|
|
|
+ weight *= app.getPriority().getPriority();
|
|
|
+ if (weightAdjuster != null) {
|
|
|
+ // Run weight through the user-supplied weightAdjuster
|
|
|
+ weight = weightAdjuster.adjustWeight(app, weight);
|
|
|
+ }
|
|
|
+ return new ResourceWeights((float)weight);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Resource getMinimumResourceCapability() {
|
|
|
+ return minimumAllocation;
|
|
|
+ }
|
|
|
+
|
|
|
+ public Resource getIncrementResourceCapability() {
|
|
|
+ return incrAllocation;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Resource getMaximumResourceCapability() {
|
|
|
+ return maximumAllocation;
|
|
|
+ }
|
|
|
+
|
|
|
+ public double getNodeLocalityThreshold() {
|
|
|
+ return nodeLocalityThreshold;
|
|
|
+ }
|
|
|
+
|
|
|
+ public double getRackLocalityThreshold() {
|
|
|
+ return rackLocalityThreshold;
|
|
|
+ }
|
|
|
+
|
|
|
+ public long getNodeLocalityDelayMs() {
|
|
|
+ return nodeLocalityDelayMs;
|
|
|
+ }
|
|
|
+
|
|
|
+ public long getRackLocalityDelayMs() {
|
|
|
+ return rackLocalityDelayMs;
|
|
|
+ }
|
|
|
+
|
|
|
+ public boolean isContinuousSchedulingEnabled() {
|
|
|
+ return continuousSchedulingEnabled;
|
|
|
+ }
|
|
|
+
|
|
|
+ public synchronized int getContinuousSchedulingSleepMs() {
|
|
|
+ return continuousSchedulingSleepMs;
|
|
|
+ }
|
|
|
+
|
|
|
+ public Resource getClusterCapacity() {
|
|
|
+ return clusterCapacity;
|
|
|
+ }
|
|
|
+
|
|
|
+ public synchronized Clock getClock() {
|
|
|
+ return clock;
|
|
|
+ }
|
|
|
+
|
|
|
+ protected synchronized void setClock(Clock clock) {
|
|
|
+ this.clock = clock;
|
|
|
+ }
|
|
|
+
|
|
|
+ public FairSchedulerEventLog getEventLog() {
|
|
|
+ return eventLog;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Add a new application to the scheduler, with a given id, queue name, and
|
|
|
+ * user. This will accept a new app even if the user or queue is above
|
|
|
+ * configured limits, but the app will not be marked as runnable.
|
|
|
+ */
|
|
|
+ protected synchronized void addApplication(ApplicationId applicationId,
|
|
|
+ String queueName, String user) {
|
|
|
+ if (queueName == null || queueName.isEmpty()) {
|
|
|
+ String message = "Reject application " + applicationId +
|
|
|
+ " submitted by user " + user + " with an empty queue name.";
|
|
|
+ LOG.info(message);
|
|
|
+ rmContext.getDispatcher().getEventHandler()
|
|
|
+ .handle(new RMAppRejectedEvent(applicationId, message));
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ RMApp rmApp = rmContext.getRMApps().get(applicationId);
|
|
|
+ FSLeafQueue queue = assignToQueue(rmApp, queueName, user);
|
|
|
+ if (queue == null) {
|
|
|
+ rmContext.getDispatcher().getEventHandler().handle(
|
|
|
+ new RMAppRejectedEvent(applicationId,
|
|
|
+ "Application rejected by queue placement policy"));
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // Enforce ACLs
|
|
|
+ UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(user);
|
|
|
+
|
|
|
+ if (!queue.hasAccess(QueueACL.SUBMIT_APPLICATIONS, userUgi)
|
|
|
+ && !queue.hasAccess(QueueACL.ADMINISTER_QUEUE, userUgi)) {
|
|
|
+ String msg = "User " + userUgi.getUserName() +
|
|
|
+ " cannot submit applications to queue " + queue.getName();
|
|
|
+ LOG.info(msg);
|
|
|
+ rmContext.getDispatcher().getEventHandler()
|
|
|
+ .handle(new RMAppRejectedEvent(applicationId, msg));
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ SchedulerApplication application =
|
|
|
+ new SchedulerApplication(queue, user);
|
|
|
+ applications.put(applicationId, application);
|
|
|
+ queue.getMetrics().submitApp(user);
|
|
|
+
|
|
|
+ LOG.info("Accepted application " + applicationId + " from user: " + user
|
|
|
+ + ", in queue: " + queueName + ", currently num of applications: "
|
|
|
+ + applications.size());
|
|
|
+ rmContext.getDispatcher().getEventHandler()
|
|
|
+ .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Add a new application attempt to the scheduler.
|
|
|
+ */
|
|
|
+ protected synchronized void addApplicationAttempt(
|
|
|
+ ApplicationAttemptId applicationAttemptId,
|
|
|
+ boolean transferStateFromPreviousAttempt) {
|
|
|
+ SchedulerApplication application =
|
|
|
+ applications.get(applicationAttemptId.getApplicationId());
|
|
|
+ String user = application.getUser();
|
|
|
+ FSLeafQueue queue = (FSLeafQueue) application.getQueue();
|
|
|
+
|
|
|
+ FSSchedulerApp attempt =
|
|
|
+ new FSSchedulerApp(applicationAttemptId, user,
|
|
|
+ queue, new ActiveUsersManager(getRootQueueMetrics()),
|
|
|
+ rmContext);
|
|
|
+ if (transferStateFromPreviousAttempt) {
|
|
|
+ attempt.transferStateFromPreviousAttempt(application
|
|
|
+ .getCurrentAppAttempt());
|
|
|
+ }
|
|
|
+ application.setCurrentAppAttempt(attempt);
|
|
|
+
|
|
|
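+ // Attempts that would exceed the queue or user maxRunningApps limits are
+ // tracked as non-runnable; the enforcer promotes them as other apps finish.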
+ boolean runnable = maxRunningEnforcer.canAppBeRunnable(queue, user);
|
|
|
+ queue.addApp(attempt, runnable);
|
|
|
+ if (runnable) {
|
|
|
+ maxRunningEnforcer.trackRunnableApp(attempt);
|
|
|
+ } else {
|
|
|
+ maxRunningEnforcer.trackNonRunnableApp(attempt);
|
|
|
+ }
|
|
|
+
|
|
|
+ queue.getMetrics().submitAppAttempt(user);
|
|
|
+
|
|
|
+ LOG.info("Added Application Attempt " + applicationAttemptId
|
|
|
+ + " to scheduler from user: " + user);
|
|
|
+ rmContext.getDispatcher().getEventHandler().handle(
|
|
|
+ new RMAppAttemptEvent(applicationAttemptId,
|
|
|
+ RMAppAttemptEventType.ATTEMPT_ADDED));
|
|
|
+ }
|
|
|
+
|
|
|
+ @VisibleForTesting
|
|
|
+ FSLeafQueue assignToQueue(RMApp rmApp, String queueName, String user) {
|
|
|
+ FSLeafQueue queue = null;
|
|
|
+ try {
|
|
|
+ QueuePlacementPolicy placementPolicy = allocConf.getPlacementPolicy();
|
|
|
+ queueName = placementPolicy.assignAppToQueue(queueName, user);
|
|
|
+ if (queueName == null) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ queue = queueMgr.getLeafQueue(queueName, true);
|
|
|
+ } catch (IOException ex) {
|
|
|
+ LOG.error("Error assigning app to queue, rejecting", ex);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (rmApp != null) {
|
|
|
+ rmApp.setQueue(queue.getName());
|
|
|
+ } else {
|
|
|
+ LOG.warn("Couldn't find RM app to set queue name on");
|
|
|
+ }
|
|
|
+
|
|
|
+ return queue;
|
|
|
+ }
|
|
|
+
|
|
|
+ private synchronized void removeApplication(ApplicationId applicationId,
|
|
|
+ RMAppState finalState) {
|
|
|
+ SchedulerApplication application = applications.get(applicationId);
|
|
|
+ if (application == null){
|
|
|
+ LOG.warn("Couldn't find application " + applicationId);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ application.stop(finalState);
|
|
|
+ applications.remove(applicationId);
|
|
|
+ }
|
|
|
+
|
|
|
+ private synchronized void removeApplicationAttempt(
|
|
|
+ ApplicationAttemptId applicationAttemptId,
|
|
|
+ RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) {
|
|
|
+ LOG.info("Application " + applicationAttemptId + " is done." +
|
|
|
+ " finalState=" + rmAppAttemptFinalState);
|
|
|
+ SchedulerApplication application =
|
|
|
+ applications.get(applicationAttemptId.getApplicationId());
|
|
|
+ FSSchedulerApp attempt = getSchedulerApp(applicationAttemptId);
|
|
|
+
|
|
|
+ if (attempt == null || application == null) {
|
|
|
+ LOG.info("Unknown application " + applicationAttemptId + " has completed!");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // Release all the running containers
|
|
|
+ for (RMContainer rmContainer : attempt.getLiveContainers()) {
|
|
|
+ if (keepContainers
|
|
|
+ && rmContainer.getState().equals(RMContainerState.RUNNING)) {
|
|
|
+ // do not kill the running container in the case of work-preserving AM
|
|
|
+ // restart.
|
|
|
+ LOG.info("Skip killing " + rmContainer.getContainerId());
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ completedContainer(rmContainer,
|
|
|
+ SchedulerUtils.createAbnormalContainerStatus(
|
|
|
+ rmContainer.getContainerId(),
|
|
|
+ SchedulerUtils.COMPLETED_APPLICATION),
|
|
|
+ RMContainerEventType.KILL);
|
|
|
+ }
|
|
|
+
|
|
|
+ // Release all reserved containers
|
|
|
+ for (RMContainer rmContainer : attempt.getReservedContainers()) {
|
|
|
+ completedContainer(rmContainer,
|
|
|
+ SchedulerUtils.createAbnormalContainerStatus(
|
|
|
+ rmContainer.getContainerId(),
|
|
|
+ "Application Complete"),
|
|
|
+ RMContainerEventType.KILL);
|
|
|
+ }
|
|
|
+ // Clean up pending requests, metrics etc.
|
|
|
+ attempt.stop(rmAppAttemptFinalState);
|
|
|
+
|
|
|
+ // Inform the queue
|
|
|
+ FSLeafQueue queue = queueMgr.getLeafQueue(attempt.getQueue()
|
|
|
+ .getQueueName(), false);
|
|
|
+ boolean wasRunnable = queue.removeApp(attempt);
|
|
|
+
|
|
|
+ if (wasRunnable) {
|
|
|
+ maxRunningEnforcer.updateRunnabilityOnAppRemoval(attempt);
|
|
|
+ } else {
|
|
|
+ maxRunningEnforcer.untrackNonRunnableApp(attempt);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Clean up a completed container.
|
|
|
+ */
|
|
|
+ private synchronized void completedContainer(RMContainer rmContainer,
|
|
|
+ ContainerStatus containerStatus, RMContainerEventType event) {
|
|
|
+ if (rmContainer == null) {
|
|
|
+ LOG.info("Null container completed...");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ Container container = rmContainer.getContainer();
|
|
|
+
|
|
|
+ // Get the application for the finished container
|
|
|
+ FSSchedulerApp application =
|
|
|
+ getCurrentAttemptForContainer(container.getId());
|
|
|
+ ApplicationId appId =
|
|
|
+ container.getId().getApplicationAttemptId().getApplicationId();
|
|
|
+ if (application == null) {
|
|
|
+ LOG.info("Container " + container + " of" +
|
|
|
+ " unknown application attempt " + appId +
|
|
|
+ " completed with event " + event);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // Get the node on which the container was allocated
|
|
|
+ FSSchedulerNode node = nodes.get(container.getNodeId());
|
|
|
+
|
|
|
+ if (rmContainer.getState() == RMContainerState.RESERVED) {
|
|
|
+ application.unreserve(node, rmContainer.getReservedPriority());
|
|
|
+ node.unreserveResource(application);
|
|
|
+ } else {
|
|
|
+ application.containerCompleted(rmContainer, containerStatus, event);
|
|
|
+ node.releaseContainer(container);
|
|
|
+ updateRootQueueMetrics();
|
|
|
+ }
|
|
|
+
|
|
|
+ LOG.info("Application attempt " + application.getApplicationAttemptId()
|
|
|
+ + " released container " + container.getId() + " on node: " + node
|
|
|
+ + " with event: " + event);
|
|
|
+ }
|
|
|
+
|
|
|
+ private synchronized void addNode(RMNode node) {
|
|
|
+ nodes.put(node.getNodeID(), new FSSchedulerNode(node, usePortForNodeName));
|
|
|
+ Resources.addTo(clusterCapacity, node.getTotalCapability());
|
|
|
+ updateRootQueueMetrics();
|
|
|
+
|
|
|
+ LOG.info("Added node " + node.getNodeAddress() +
|
|
|
+ " cluster capacity: " + clusterCapacity);
|
|
|
+ }
|
|
|
+
|
|
|
+ private synchronized void removeNode(RMNode rmNode) {
|
|
|
+ FSSchedulerNode node = nodes.get(rmNode.getNodeID());
|
|
|
+ // This can occur when an UNHEALTHY node reconnects
|
|
|
+ if (node == null) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ Resources.subtractFrom(clusterCapacity, rmNode.getTotalCapability());
|
|
|
+ updateRootQueueMetrics();
|
|
|
+
|
|
|
+ // Remove running containers
|
|
|
+ List<RMContainer> runningContainers = node.getRunningContainers();
|
|
|
+ for (RMContainer container : runningContainers) {
|
|
|
+ completedContainer(container,
|
|
|
+ SchedulerUtils.createAbnormalContainerStatus(
|
|
|
+ container.getContainerId(),
|
|
|
+ SchedulerUtils.LOST_CONTAINER),
|
|
|
+ RMContainerEventType.KILL);
|
|
|
+ }
|
|
|
+
|
|
|
+ // Remove reservations, if any
|
|
|
+ RMContainer reservedContainer = node.getReservedContainer();
|
|
|
+ if (reservedContainer != null) {
|
|
|
+ completedContainer(reservedContainer,
|
|
|
+ SchedulerUtils.createAbnormalContainerStatus(
|
|
|
+ reservedContainer.getContainerId(),
|
|
|
+ SchedulerUtils.LOST_CONTAINER),
|
|
|
+ RMContainerEventType.KILL);
|
|
|
+ }
|
|
|
+
|
|
|
+ nodes.remove(rmNode.getNodeID());
|
|
|
+ LOG.info("Removed node " + rmNode.getNodeAddress() +
|
|
|
+ " cluster capacity: " + clusterCapacity);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Allocation allocate(ApplicationAttemptId appAttemptId,
|
|
|
+ List<ResourceRequest> ask, List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals) {
|
|
|
+
|
|
|
+ // Make sure this application exists
|
|
|
+ FSSchedulerApp application = getSchedulerApp(appAttemptId);
|
|
|
+ if (application == null) {
|
|
|
+ LOG.info("Calling allocate on removed " +
|
|
|
+ "or non existant application " + appAttemptId);
|
|
|
+ return EMPTY_ALLOCATION;
|
|
|
+ }
|
|
|
+
|
|
|
+ // Sanity check
|
|
|
+ SchedulerUtils.normalizeRequests(ask, new DominantResourceCalculator(),
|
|
|
+ clusterCapacity, minimumAllocation, maximumAllocation, incrAllocation);
|
|
|
+
|
|
|
+ // Release containers
|
|
|
+ for (ContainerId releasedContainerId : release) {
|
|
|
+ RMContainer rmContainer = getRMContainer(releasedContainerId);
|
|
|
+ if (rmContainer == null) {
|
|
|
+ RMAuditLogger.logFailure(application.getUser(),
|
|
|
+ AuditConstants.RELEASE_CONTAINER,
|
|
|
+ "Unauthorized access or invalid container", "FairScheduler",
|
|
|
+ "Trying to release container not owned by app or with invalid id",
|
|
|
+ application.getApplicationId(), releasedContainerId);
|
|
|
+ }
|
|
|
+ completedContainer(rmContainer,
|
|
|
+ SchedulerUtils.createAbnormalContainerStatus(
|
|
|
+ releasedContainerId,
|
|
|
+ SchedulerUtils.RELEASED_CONTAINER),
|
|
|
+ RMContainerEventType.RELEASED);
|
|
|
+ }
|
|
|
+
|
|
|
+ synchronized (application) {
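+ // Record the updated requests and blacklist for this attempt, then hand back
+ // newly allocated containers, current headroom, and preemption warnings.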
|
|
|
+ if (!ask.isEmpty()) {
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("allocate: pre-update" +
|
|
|
+ " applicationAttemptId=" + appAttemptId +
|
|
|
+ " application=" + application.getApplicationId());
|
|
|
+ }
|
|
|
+ application.showRequests();
|
|
|
+
|
|
|
+ // Update application requests
|
|
|
+ application.updateResourceRequests(ask);
|
|
|
+
|
|
|
+ LOG.debug("allocate: post-update");
|
|
|
+ application.showRequests();
|
|
|
+ }
|
|
|
+
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("allocate:" +
|
|
|
+ " applicationAttemptId=" + appAttemptId +
|
|
|
+ " #ask=" + ask.size());
|
|
|
+
|
|
|
+ LOG.debug("Preempting " + application.getPreemptionContainers().size()
|
|
|
+ + " container(s)");
|
|
|
+ }
|
|
|
+
|
|
|
+ Set<ContainerId> preemptionContainerIds = new HashSet<ContainerId>();
|
|
|
+ for (RMContainer container : application.getPreemptionContainers()) {
|
|
|
+ preemptionContainerIds.add(container.getContainerId());
|
|
|
+ }
|
|
|
+
|
|
|
+ application.updateBlacklist(blacklistAdditions, blacklistRemovals);
|
|
|
+
|
|
|
+ return new Allocation(application.pullNewlyAllocatedContainers(),
|
|
|
+ application.getHeadroom(), preemptionContainerIds);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Process a container which has launched on a node, as reported by the node.
|
|
|
+ */
|
|
|
+ private void containerLaunchedOnNode(ContainerId containerId, FSSchedulerNode node) {
|
|
|
+ // Get the application for the finished container
|
|
|
+ FSSchedulerApp application = getCurrentAttemptForContainer(containerId);
|
|
|
+ if (application == null) {
|
|
|
+ LOG.info("Unknown application "
|
|
|
+ + containerId.getApplicationAttemptId().getApplicationId()
|
|
|
+ + " launched container " + containerId + " on node: " + node);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ application.containerLaunchedOnNode(containerId, node.getNodeID());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Process a heartbeat update from a node.
|
|
|
+ */
|
|
|
+ private synchronized void nodeUpdate(RMNode nm) {
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("nodeUpdate: " + nm + " cluster capacity: " + clusterCapacity);
|
|
|
+ }
|
|
|
+ eventLog.log("HEARTBEAT", nm.getHostName());
|
|
|
+ FSSchedulerNode node = nodes.get(nm.getNodeID());
|
|
|
+
|
|
|
+ // Update resource if any change
|
|
|
+ SchedulerUtils.updateResourceIfChanged(node, nm, clusterCapacity, LOG);
|
|
|
+
|
|
|
+ List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
|
|
|
+ List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
|
|
|
+ List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
|
|
|
+ for(UpdatedContainerInfo containerInfo : containerInfoList) {
|
|
|
+ newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
|
|
|
+ completedContainers.addAll(containerInfo.getCompletedContainers());
|
|
|
+ }
|
|
|
+ // Processing the newly launched containers
|
|
|
+ for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
|
|
|
+ containerLaunchedOnNode(launchedContainer.getContainerId(), node);
|
|
|
+ }
|
|
|
+
|
|
|
+ // Process completed containers
|
|
|
+ for (ContainerStatus completedContainer : completedContainers) {
|
|
|
+ ContainerId containerId = completedContainer.getContainerId();
|
|
|
+ LOG.debug("Container FINISHED: " + containerId);
|
|
|
+ completedContainer(getRMContainer(containerId),
|
|
|
+ completedContainer, RMContainerEventType.FINISHED);
|
|
|
+ }
|
|
|
+
|
|
|
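+ // With continuous scheduling enabled, a heartbeat only triggers scheduling
+ // when it freed resources (some container completed); the continuous
+ // scheduling thread handles the rest. Otherwise schedule on every heartbeat.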
+ if (continuousSchedulingEnabled) {
|
|
|
+ if (!completedContainers.isEmpty()) {
|
|
|
+ attemptScheduling(node);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ attemptScheduling(node);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void continuousScheduling() {
|
|
|
+ while (true) {
|
|
|
+ List<NodeId> nodeIdList = new ArrayList<NodeId>(nodes.keySet());
|
|
|
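+ // Visit the nodes with the most available resources first.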
+ Collections.sort(nodeIdList, nodeAvailableResourceComparator);
|
|
|
+
|
|
|
+ // iterate all nodes
|
|
|
+ for (NodeId nodeId : nodeIdList) {
|
|
|
+ if (nodes.containsKey(nodeId)) {
|
|
|
+ FSSchedulerNode node = nodes.get(nodeId);
|
|
|
+ try {
|
|
|
+ if (Resources.fitsIn(minimumAllocation,
|
|
|
+ node.getAvailableResource())) {
|
|
|
+ attemptScheduling(node);
|
|
|
+ }
|
|
|
+ } catch (Throwable ex) {
|
|
|
+ LOG.warn("Error while attempting scheduling for node " + node +
|
|
|
+ ": " + ex.toString(), ex);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ Thread.sleep(getContinuousSchedulingSleepMs());
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ LOG.warn("Error while doing sleep in continuous scheduling: " +
|
|
|
+ e.toString(), e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /** Sort nodes by available resource */
|
|
|
+ private class NodeAvailableResourceComparator implements Comparator<NodeId> {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public int compare(NodeId n1, NodeId n2) {
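+ // Arguments are swapped so nodes with more available resources sort first.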
|
|
|
+ return RESOURCE_CALCULATOR.compare(clusterCapacity,
|
|
|
+ nodes.get(n2).getAvailableResource(),
|
|
|
+ nodes.get(n1).getAvailableResource());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private synchronized void attemptScheduling(FSSchedulerNode node) {
|
|
|
+ // Assign new containers...
|
|
|
+ // 1. Check for reserved applications
|
|
|
+ // 2. Schedule if there are no reservations
|
|
|
+
|
|
|
+ AppSchedulable reservedAppSchedulable = node.getReservedAppSchedulable();
|
|
|
+ if (reservedAppSchedulable != null) {
|
|
|
+ Priority reservedPriority = node.getReservedContainer().getReservedPriority();
|
|
|
+ if (!reservedAppSchedulable.hasContainerForNode(reservedPriority, node)) {
|
|
|
+ // Don't hold the reservation if app can no longer use it
|
|
|
+ LOG.info("Releasing reservation that cannot be satisfied for application "
|
|
|
+ + reservedAppSchedulable.getApp().getApplicationAttemptId()
|
|
|
+ + " on node " + node);
|
|
|
+ reservedAppSchedulable.unreserve(reservedPriority, node);
|
|
|
+ reservedAppSchedulable = null;
|
|
|
+ } else {
|
|
|
+ // Reservation exists; try to fulfill the reservation
|
|
|
+ LOG.info("Trying to fulfill reservation for application "
|
|
|
+ + reservedAppSchedulable.getApp().getApplicationAttemptId()
|
|
|
+ + " on node: " + node);
|
|
|
+
|
|
|
+ node.getReservedAppSchedulable().assignReservedContainer(node);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (reservedAppSchedulable == null) {
|
|
|
+ // No reservation, schedule at queue which is farthest below fair share
|
|
|
+ int assignedContainers = 0;
|
|
|
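+ // Keep allocating from the root queue until it fails to place a container,
+ // the node becomes reserved, assignMultiple is disabled, or maxAssign (if
+ // positive) containers have been assigned in this pass.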
+ while (node.getReservedContainer() == null) {
|
|
|
+ boolean assignedContainer = false;
|
|
|
+ if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
|
|
|
+ queueMgr.getRootQueue().assignContainer(node),
|
|
|
+ Resources.none())) {
|
|
|
+ assignedContainers++;
|
|
|
+ assignedContainer = true;
|
|
|
+ }
|
|
|
+ if (!assignedContainer) { break; }
|
|
|
+ if (!assignMultiple) { break; }
|
|
|
+ if ((assignedContainers >= maxAssign) && (maxAssign > 0)) { break; }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ updateRootQueueMetrics();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public SchedulerNodeReport getNodeReport(NodeId nodeId) {
|
|
|
+ FSSchedulerNode node = nodes.get(nodeId);
|
|
|
+ return node == null ? null : new SchedulerNodeReport(node);
|
|
|
+ }
|
|
|
+
|
|
|
+ public FSSchedulerApp getSchedulerApp(ApplicationAttemptId appAttemptId) {
|
|
|
+ SchedulerApplication app =
|
|
|
+ applications.get(appAttemptId.getApplicationId());
|
|
|
+ if (app != null) {
|
|
|
+ return (FSSchedulerApp) app.getCurrentAppAttempt();
|
|
|
+ }
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public SchedulerAppReport getSchedulerAppInfo(
|
|
|
+ ApplicationAttemptId appAttemptId) {
|
|
|
+ FSSchedulerApp attempt = getSchedulerApp(appAttemptId);
|
|
|
+ if (attempt == null) {
|
|
|
+ LOG.error("Request for appInfo of unknown attempt" + appAttemptId);
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ return new SchedulerAppReport(attempt);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public ApplicationResourceUsageReport getAppResourceUsageReport(
|
|
|
+ ApplicationAttemptId appAttemptId) {
|
|
|
+ FSSchedulerApp attempt = getSchedulerApp(appAttemptId);
|
|
|
+ if (attempt == null) {
|
|
|
+ LOG.error("Request for appInfo of unknown attempt" + appAttemptId);
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ return attempt.getResourceUsageReport();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Subqueue metrics might be a little out of date because fair shares are
|
|
|
+ * recalculated at the update interval, but the root queue metrics needs to
|
|
|
+ * be updated synchronously with allocations and completions so that cluster
|
|
|
+ * metrics will be consistent.
|
|
|
+ */
|
|
|
+ private void updateRootQueueMetrics() {
|
|
|
+ rootMetrics.setAvailableResourcesToQueue(
|
|
|
+ Resources.subtract(
|
|
|
+ clusterCapacity, rootMetrics.getAllocatedResources()));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public QueueMetrics getRootQueueMetrics() {
|
|
|
+ return rootMetrics;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void handle(SchedulerEvent event) {
|
|
|
+ switch (event.getType()) {
|
|
|
+ case NODE_ADDED:
|
|
|
+ if (!(event instanceof NodeAddedSchedulerEvent)) {
|
|
|
+ throw new RuntimeException("Unexpected event type: " + event);
|
|
|
+ }
|
|
|
+ NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event;
|
|
|
+ addNode(nodeAddedEvent.getAddedRMNode());
|
|
|
+ break;
|
|
|
+ case NODE_REMOVED:
|
|
|
+ if (!(event instanceof NodeRemovedSchedulerEvent)) {
|
|
|
+ throw new RuntimeException("Unexpected event type: " + event);
|
|
|
+ }
|
|
|
+ NodeRemovedSchedulerEvent nodeRemovedEvent = (NodeRemovedSchedulerEvent)event;
|
|
|
+ removeNode(nodeRemovedEvent.getRemovedRMNode());
|
|
|
+ break;
|
|
|
+ case NODE_UPDATE:
|
|
|
+ if (!(event instanceof NodeUpdateSchedulerEvent)) {
|
|
|
+ throw new RuntimeException("Unexpected event type: " + event);
|
|
|
+ }
|
|
|
+ NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
|
|
|
+ nodeUpdate(nodeUpdatedEvent.getRMNode());
|
|
|
+ break;
|
|
|
+ case APP_ADDED:
|
|
|
+ if (!(event instanceof AppAddedSchedulerEvent)) {
|
|
|
+ throw new RuntimeException("Unexpected event type: " + event);
|
|
|
+ }
|
|
|
+ AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
|
|
|
+ addApplication(appAddedEvent.getApplicationId(),
|
|
|
+ appAddedEvent.getQueue(), appAddedEvent.getUser());
|
|
|
+ break;
|
|
|
+ case APP_REMOVED:
|
|
|
+ if (!(event instanceof AppRemovedSchedulerEvent)) {
|
|
|
+ throw new RuntimeException("Unexpected event type: " + event);
|
|
|
+ }
|
|
|
+ AppRemovedSchedulerEvent appRemovedEvent = (AppRemovedSchedulerEvent)event;
|
|
|
+ removeApplication(appRemovedEvent.getApplicationID(),
|
|
|
+ appRemovedEvent.getFinalState());
|
|
|
+ break;
|
|
|
+ case APP_ATTEMPT_ADDED:
|
|
|
+ if (!(event instanceof AppAttemptAddedSchedulerEvent)) {
|
|
|
+ throw new RuntimeException("Unexpected event type: " + event);
|
|
|
+ }
|
|
|
+ AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
|
|
|
+ (AppAttemptAddedSchedulerEvent) event;
|
|
|
+ addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
|
|
|
+ appAttemptAddedEvent.getTransferStateFromPreviousAttempt());
|
|
|
+ break;
|
|
|
+ case APP_ATTEMPT_REMOVED:
|
|
|
+ if (!(event instanceof AppAttemptRemovedSchedulerEvent)) {
|
|
|
+ throw new RuntimeException("Unexpected event type: " + event);
|
|
|
+ }
|
|
|
+ AppAttemptRemovedSchedulerEvent appAttemptRemovedEvent =
|
|
|
+ (AppAttemptRemovedSchedulerEvent) event;
|
|
|
+ removeApplicationAttempt(
|
|
|
+ appAttemptRemovedEvent.getApplicationAttemptID(),
|
|
|
+ appAttemptRemovedEvent.getFinalAttemptState(),
|
|
|
+ appAttemptRemovedEvent.getKeepContainersAcrossAppAttempts());
|
|
|
+ break;
|
|
|
+ case CONTAINER_EXPIRED:
|
|
|
+ if (!(event instanceof ContainerExpiredSchedulerEvent)) {
|
|
|
+ throw new RuntimeException("Unexpected event type: " + event);
|
|
|
+ }
|
|
|
+ ContainerExpiredSchedulerEvent containerExpiredEvent =
|
|
|
+ (ContainerExpiredSchedulerEvent)event;
|
|
|
+ ContainerId containerId = containerExpiredEvent.getContainerId();
|
|
|
+ completedContainer(getRMContainer(containerId),
|
|
|
+ SchedulerUtils.createAbnormalContainerStatus(
|
|
|
+ containerId,
|
|
|
+ SchedulerUtils.EXPIRED_CONTAINER),
|
|
|
+ RMContainerEventType.EXPIRE);
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ LOG.error("Unknown event arrived at FairScheduler: " + event.toString());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void recover(RMState state) throws Exception {
|
|
|
+ // NOT IMPLEMENTED
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public synchronized void reinitialize(Configuration conf, RMContext rmContext)
|
|
|
+ throws IOException {
|
|
|
+ if (!initialized) {
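+ // First initialization: read the scheduler configuration, set up root metrics
+ // and the event log, start the update thread (plus the continuous scheduling
+ // thread if enabled), and load the allocations file.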
|
|
|
+ this.conf = new FairSchedulerConfiguration(conf);
|
|
|
+ validateConf(this.conf);
|
|
|
+ minimumAllocation = this.conf.getMinimumAllocation();
|
|
|
+ maximumAllocation = this.conf.getMaximumAllocation();
|
|
|
+ incrAllocation = this.conf.getIncrementAllocation();
|
|
|
+ continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
|
|
|
+ continuousSchedulingSleepMs =
|
|
|
+ this.conf.getContinuousSchedulingSleepMs();
|
|
|
+ nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
|
|
|
+ rackLocalityThreshold = this.conf.getLocalityThresholdRack();
|
|
|
+ nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs();
|
|
|
+ rackLocalityDelayMs = this.conf.getLocalityDelayRackMs();
|
|
|
+ preemptionEnabled = this.conf.getPreemptionEnabled();
|
|
|
+ assignMultiple = this.conf.getAssignMultiple();
|
|
|
+ maxAssign = this.conf.getMaxAssign();
|
|
|
+ sizeBasedWeight = this.conf.getSizeBasedWeight();
|
|
|
+ preemptionInterval = this.conf.getPreemptionInterval();
|
|
|
+ waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
|
|
|
+ usePortForNodeName = this.conf.getUsePortForNodeName();
|
|
|
+
|
|
|
+ rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf);
|
|
|
+ this.rmContext = rmContext;
|
|
|
+ this.eventLog = new FairSchedulerEventLog();
|
|
|
+ eventLog.init(this.conf);
|
|
|
+
|
|
|
+ initialized = true;
|
|
|
+
|
|
|
+ allocConf = new AllocationConfiguration(conf);
|
|
|
+ try {
|
|
|
+ queueMgr.initialize(conf);
|
|
|
+ } catch (Exception e) {
|
|
|
+ throw new IOException("Failed to start FairScheduler", e);
|
|
|
+ }
|
|
|
+
|
|
|
+ Thread updateThread = new Thread(new UpdateThread());
|
|
|
+ updateThread.setName("FairSchedulerUpdateThread");
|
|
|
+ updateThread.setDaemon(true);
|
|
|
+ updateThread.start();
|
|
|
+
|
|
|
+ if (continuousSchedulingEnabled) {
|
|
|
+ // start continuous scheduling thread
|
|
|
+ Thread schedulingThread = new Thread(
|
|
|
+ new Runnable() {
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ continuousScheduling();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ );
|
|
|
+ schedulingThread.setName("ContinuousScheduling");
|
|
|
+ schedulingThread.setDaemon(true);
|
|
|
+ schedulingThread.start();
|
|
|
+ }
|
|
|
+
|
|
|
+ allocsLoader.init(conf);
|
|
|
+ allocsLoader.setReloadListener(new AllocationReloadListener());
|
|
|
+ // If we fail to load allocations file on initialize, we want to fail
|
|
|
+ // immediately. After a successful load, exceptions on future reloads
|
|
|
+ // will just result in leaving things as they are.
|
|
|
+ try {
|
|
|
+ allocsLoader.reloadAllocations();
|
|
|
+ } catch (Exception e) {
|
|
|
+ throw new IOException("Failed to initialize FairScheduler", e);
|
|
|
+ }
|
|
|
+ allocsLoader.start();
|
|
|
+ } else {
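+ // Already initialized: a reinitialize only reloads the allocations file.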
|
|
|
+ try {
|
|
|
+ allocsLoader.reloadAllocations();
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.error("Failed to reload allocations file", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public QueueInfo getQueueInfo(String queueName, boolean includeChildQueues,
|
|
|
+ boolean recursive) throws IOException {
|
|
|
+ if (!queueMgr.exists(queueName)) {
|
|
|
+ throw new IOException("queue " + queueName + " does not exist");
|
|
|
+ }
|
|
|
+ return queueMgr.getQueue(queueName).getQueueInfo(includeChildQueues,
|
|
|
+ recursive);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public List<QueueUserACLInfo> getQueueUserAclInfo() {
|
|
|
+ UserGroupInformation user = null;
|
|
|
+ try {
|
|
|
+ user = UserGroupInformation.getCurrentUser();
|
|
|
+ } catch (IOException ioe) {
|
|
|
+ return new ArrayList<QueueUserACLInfo>();
|
|
|
+ }
|
|
|
+
|
|
|
+ return queueMgr.getRootQueue().getQueueUserAclInfo(user);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public int getNumClusterNodes() {
|
|
|
+ return nodes.size();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public synchronized boolean checkAccess(UserGroupInformation callerUGI,
|
|
|
+ QueueACL acl, String queueName) {
|
|
|
+ FSQueue queue = getQueueManager().getQueue(queueName);
|
|
|
+ if (queue == null) {
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("ACL not found for queue access-type " + acl
|
|
|
+ + " for queue " + queueName);
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ return queue.hasAccess(acl, callerUGI);
|
|
|
+ }
|
|
|
+
|
|
|
+ public AllocationConfiguration getAllocationConfiguration() {
|
|
|
+ return allocConf;
|
|
|
+ }
|
|
|
+
|
|
|
+ private class AllocationReloadListener implements
|
|
|
+ AllocationFileLoaderService.Listener {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void onReload(AllocationConfiguration queueInfo) {
|
|
|
+ // Commit the reload; also create any queue defined in the alloc file
|
|
|
+ // if it does not already exist, so it can be displayed on the web UI.
|
|
|
+ synchronized (FairScheduler.this) {
|
|
|
+ allocConf = queueInfo;
|
|
|
+ allocConf.getDefaultSchedulingPolicy().initialize(clusterCapacity);
|
|
|
+ queueMgr.updateAllocationConfiguration(allocConf);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public List<ApplicationAttemptId> getAppsInQueue(String queueName) {
|
|
|
+ FSQueue queue = queueMgr.getQueue(queueName);
|
|
|
+ if (queue == null) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ List<ApplicationAttemptId> apps = new ArrayList<ApplicationAttemptId>();
|
|
|
+ queue.collectSchedulerApplications(apps);
|
|
|
+ return apps;
|
|
|
+ }
|
|
|
+
|
|
|
+}