@@ -1,1361 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.QueueACL;
-import org.apache.hadoop.yarn.api.records.QueueInfo;
-import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
-import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
-import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
-import org.apache.hadoop.yarn.util.Clock;
-import org.apache.hadoop.yarn.util.SystemClock;
-import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.Resources;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * A scheduler that schedules resources between a set of queues. The scheduler
- * keeps track of the resources used by each queue, and attempts to maintain
- * fairness by scheduling tasks at queues whose allocations are farthest below
- * an ideal fair distribution.
- *
- * The fair scheduler supports hierarchical queues. All queues descend from a
- * queue named "root". Available resources are distributed among the children
- * of the root queue in the typical fair scheduling fashion. Then, the children
- * distribute the resources assigned to them to their children in the same
- * fashion. Applications may only be scheduled on leaf queues. Queues can be
- * specified as children of other queues by placing them as sub-elements of
- * their parents in the fair scheduler configuration file.
- *
- * A queue's name starts with the names of its parents, with periods as
- * separators. So a queue named "queue1" directly under the root queue would
- * be referred to as "root.queue1", and a queue named "queue2" under a queue
- * named "parent1" would be referred to as "root.parent1.queue2".
- */
-@LimitedPrivate("yarn")
-@Unstable
-@SuppressWarnings("unchecked")
-public class FairScheduler implements ResourceScheduler {
-  private boolean initialized;
-  private FairSchedulerConfiguration conf;
-  private RMContext rmContext;
-  private Resource minimumAllocation;
-  private Resource maximumAllocation;
-  private Resource incrAllocation;
-  private QueueManager queueMgr;
-  private Clock clock;
-  private boolean usePortForNodeName;
-
-  private static final Log LOG = LogFactory.getLog(FairScheduler.class);
-
-  private static final ResourceCalculator RESOURCE_CALCULATOR =
-      new DefaultResourceCalculator();
-
-  // Value that container assignment methods return when a container is
-  // reserved
-  public static final Resource CONTAINER_RESERVED = Resources.createResource(-1);
-
-  // How often fair shares are re-calculated (ms)
-  protected long UPDATE_INTERVAL = 500;
-
-  private final static List<Container> EMPTY_CONTAINER_LIST =
-      new ArrayList<Container>();
-
-  private static final Allocation EMPTY_ALLOCATION =
-      new Allocation(EMPTY_CONTAINER_LIST, Resources.createResource(0));
-
-  // Aggregate metrics
-  FSQueueMetrics rootMetrics;
-
-  // Time when we last updated preemption vars
-  protected long lastPreemptionUpdateTime;
-  // Time we last ran preemptTasksIfNecessary
-  private long lastPreemptCheckTime;
-
-  // This stores per-application scheduling information.
-  @VisibleForTesting
-  protected Map<ApplicationId, SchedulerApplication> applications =
-      new ConcurrentHashMap<ApplicationId, SchedulerApplication>();
-
-  // Nodes in the cluster, indexed by NodeId
-  private Map<NodeId, FSSchedulerNode> nodes =
-      new ConcurrentHashMap<NodeId, FSSchedulerNode>();
-
-  // Aggregate capacity of the cluster
-  private Resource clusterCapacity =
-      RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class);
-
-  // How often tasks are preempted
-  protected long preemptionInterval;
-
-  // ms to wait before force-killing a container (must be longer than a couple
-  // of heartbeats to give task-kill commands a chance to act).
-  protected long waitTimeBeforeKill;
-
-  // Containers whose AMs have been warned that they will be preempted soon.
-  private List<RMContainer> warnedContainers = new ArrayList<RMContainer>();
-
-  protected boolean preemptionEnabled;
-  protected boolean sizeBasedWeight; // Give larger weights to larger jobs
-  protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster
-  protected boolean continuousSchedulingEnabled; // Continuous Scheduling enabled or not
-  protected int continuousSchedulingSleepMs; // Sleep time for each pass in continuous scheduling
-  private Comparator<NodeId> nodeAvailableResourceComparator =
-      new NodeAvailableResourceComparator(); // Node available resource comparator
-  protected double nodeLocalityThreshold; // Cluster threshold for node locality
-  protected double rackLocalityThreshold; // Cluster threshold for rack locality
-  protected long nodeLocalityDelayMs; // Delay for node locality
-  protected long rackLocalityDelayMs; // Delay for rack locality
-  private FairSchedulerEventLog eventLog; // Machine-readable event log
-  protected boolean assignMultiple; // Allocate multiple containers per
-                                    // heartbeat
-  protected int maxAssign; // Max containers to assign per heartbeat
-
-  @VisibleForTesting
-  final MaxRunningAppsEnforcer maxRunningEnforcer;
-
-  private AllocationFileLoaderService allocsLoader;
-  @VisibleForTesting
-  AllocationConfiguration allocConf;
-
-  public FairScheduler() {
-    clock = new SystemClock();
-    allocsLoader = new AllocationFileLoaderService();
-    queueMgr = new QueueManager(this);
-    maxRunningEnforcer = new MaxRunningAppsEnforcer(this);
-  }
-
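-  /**
-   * Sanity-check the minimum and maximum memory/vcore allocation settings,
-   * failing fast with a YarnRuntimeException on invalid values.
-   */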
-  private void validateConf(Configuration conf) {
-    // validate scheduler memory allocation setting
-    int minMem = conf.getInt(
-        YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
-        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
-    int maxMem = conf.getInt(
-        YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
-        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
-
-    if (minMem < 0 || minMem > maxMem) {
-      throw new YarnRuntimeException("Invalid resource scheduler memory"
-          + " allocation configuration"
-          + ", " + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB
-          + "=" + minMem
-          + ", " + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB
-          + "=" + maxMem + ", min should be equal to or greater than 0"
-          + ", max should be no smaller than min.");
-    }
-
-    // validate scheduler vcores allocation setting
-    int minVcores = conf.getInt(
-        YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
-        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
-    int maxVcores = conf.getInt(
-        YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
-        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
-
-    if (minVcores < 0 || minVcores > maxVcores) {
-      throw new YarnRuntimeException("Invalid resource scheduler vcores"
-          + " allocation configuration"
-          + ", " + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES
-          + "=" + minVcores
-          + ", " + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES
-          + "=" + maxVcores + ", min should be equal to or greater than 0"
-          + ", max should be no smaller than min.");
-    }
-  }
-
-  public FairSchedulerConfiguration getConf() {
-    return conf;
-  }
-
-  public QueueManager getQueueManager() {
-    return queueMgr;
-  }
-
-  @Override
-  public RMContainer getRMContainer(ContainerId containerId) {
-    FSSchedulerApp attempt = getCurrentAttemptForContainer(containerId);
-    return (attempt == null) ? null : attempt.getRMContainer(containerId);
-  }
-
-  private FSSchedulerApp getCurrentAttemptForContainer(
-      ContainerId containerId) {
-    SchedulerApplication app =
-        applications.get(containerId.getApplicationAttemptId()
-            .getApplicationId());
-    if (app != null) {
-      return (FSSchedulerApp) app.getCurrentAppAttempt();
-    }
-    return null;
-  }
-
-  /**
-   * A runnable which calls {@link FairScheduler#update()} every
-   * <code>UPDATE_INTERVAL</code> milliseconds.
-   */
-  private class UpdateThread implements Runnable {
-    public void run() {
-      while (true) {
-        try {
-          Thread.sleep(UPDATE_INTERVAL);
-          update();
-          preemptTasksIfNecessary();
-        } catch (Exception e) {
-          LOG.error("Exception in fair scheduler UpdateThread", e);
-        }
-      }
-    }
-  }
-
-  /**
-   * Recompute the internal variables used by the scheduler - per-job weights,
-   * fair shares, deficits, minimum slot allocations, and amount of used and
-   * required resources per job.
-   */
-  protected synchronized void update() {
-    updatePreemptionVariables(); // Determine if any queues merit preemption
-
-    FSQueue rootQueue = queueMgr.getRootQueue();
-
-    // Recursively update demands for all queues
-    rootQueue.updateDemand();
-
-    rootQueue.setFairShare(clusterCapacity);
-    // Recursively compute fair shares for all queues
-    // and update metrics
-    rootQueue.recomputeShares();
-  }
-
-  /**
-   * Update the preemption fields for all QueueSchedulables, i.e. the times
-   * since each queue was last at its guaranteed share and at > 1/2 of its
-   * fair share for each type of task.
-   */
-  private void updatePreemptionVariables() {
-    long now = clock.getTime();
-    lastPreemptionUpdateTime = now;
-    for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
-      if (!isStarvedForMinShare(sched)) {
-        sched.setLastTimeAtMinShare(now);
-      }
-      if (!isStarvedForFairShare(sched)) {
-        sched.setLastTimeAtHalfFairShare(now);
-      }
-    }
-  }
-
-  /**
-   * Is a queue below its min share for the given task type?
-   */
-  boolean isStarvedForMinShare(FSLeafQueue sched) {
-    Resource desiredShare = Resources.min(RESOURCE_CALCULATOR, clusterCapacity,
-        sched.getMinShare(), sched.getDemand());
-    return Resources.lessThan(RESOURCE_CALCULATOR, clusterCapacity,
-        sched.getResourceUsage(), desiredShare);
-  }
-
-  /**
-   * Is a queue being starved for fair share for the given task type? This is
-   * defined as being below half its fair share.
-   */
-  boolean isStarvedForFairShare(FSLeafQueue sched) {
-    Resource desiredFairShare = Resources.min(RESOURCE_CALCULATOR, clusterCapacity,
-        Resources.multiply(sched.getFairShare(), .5), sched.getDemand());
-    return Resources.lessThan(RESOURCE_CALCULATOR, clusterCapacity,
-        sched.getResourceUsage(), desiredFairShare);
-  }
-
-  /**
-   * Check for queues that need tasks preempted, either because they have been
-   * below their guaranteed share for minSharePreemptionTimeout or they have
-   * been below half their fair share for the fairSharePreemptionTimeout. If
-   * such queues exist, compute how many tasks of each type need to be
-   * preempted and then select the right ones using preemptTasks.
-   */
-  protected synchronized void preemptTasksIfNecessary() {
-    if (!preemptionEnabled) {
-      return;
-    }
-
-    long curTime = clock.getTime();
-    if (curTime - lastPreemptCheckTime < preemptionInterval) {
-      return;
-    }
-    lastPreemptCheckTime = curTime;
-
-    Resource resToPreempt = Resources.none();
-
-    for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
-      resToPreempt = Resources.add(resToPreempt, resToPreempt(sched, curTime));
-    }
-    if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, resToPreempt,
-        Resources.none())) {
-      preemptResources(queueMgr.getLeafQueues(), resToPreempt);
-    }
-  }
-
-  /**
-   * Preempt a quantity of resources from a list of QueueSchedulables. The
-   * policy for this is to pick apps from queues that are over their fair
-   * share, but make sure that no queue is placed below its fair share in the
-   * process. We further prioritize preemption by choosing containers with
-   * lowest priority to preempt.
-   */
-  protected void preemptResources(Collection<FSLeafQueue> scheds,
-      Resource toPreempt) {
-    if (scheds.isEmpty() || Resources.equals(toPreempt, Resources.none())) {
-      return;
-    }
-
-    Map<RMContainer, FSSchedulerApp> apps =
-        new HashMap<RMContainer, FSSchedulerApp>();
-    Map<RMContainer, FSLeafQueue> queues =
-        new HashMap<RMContainer, FSLeafQueue>();
-
-    // Collect running containers from over-scheduled queues
-    List<RMContainer> runningContainers = new ArrayList<RMContainer>();
-    for (FSLeafQueue sched : scheds) {
-      if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
-          sched.getResourceUsage(), sched.getFairShare())) {
-        for (AppSchedulable as : sched.getRunnableAppSchedulables()) {
-          for (RMContainer c : as.getApp().getLiveContainers()) {
-            runningContainers.add(c);
-            apps.put(c, as.getApp());
-            queues.put(c, sched);
-          }
-        }
-      }
-    }
-
-    // Sort containers into reverse order of priority
-    Collections.sort(runningContainers, new Comparator<RMContainer>() {
-      public int compare(RMContainer c1, RMContainer c2) {
-        int ret = c1.getContainer().getPriority().compareTo(
-            c2.getContainer().getPriority());
-        if (ret == 0) {
-          return c2.getContainerId().compareTo(c1.getContainerId());
-        }
-        return ret;
-      }
-    });
-
-    // Scan down the list of containers we've already warned and kill them
-    // if we need to. Remove any containers from the list that we don't need
-    // or that are no longer running.
-    Iterator<RMContainer> warnedIter = warnedContainers.iterator();
-    Set<RMContainer> preemptedThisRound = new HashSet<RMContainer>();
-    while (warnedIter.hasNext()) {
-      RMContainer container = warnedIter.next();
-      if (container.getState() == RMContainerState.RUNNING &&
-          Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
-              toPreempt, Resources.none())) {
-        warnOrKillContainer(container, apps.get(container), queues.get(container));
-        preemptedThisRound.add(container);
-        Resources.subtractFrom(toPreempt, container.getContainer().getResource());
-      } else {
-        warnedIter.remove();
-      }
-    }
-
-    // Scan down the rest of the containers until we've preempted enough,
-    // making sure we don't preempt too many from any queue
-    Iterator<RMContainer> runningIter = runningContainers.iterator();
-    while (runningIter.hasNext() &&
-        Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
-            toPreempt, Resources.none())) {
-      RMContainer container = runningIter.next();
-      FSLeafQueue sched = queues.get(container);
-      if (!preemptedThisRound.contains(container) &&
-          Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
-              sched.getResourceUsage(), sched.getFairShare())) {
-        warnOrKillContainer(container, apps.get(container), sched);
-
-        warnedContainers.add(container);
-        Resources.subtractFrom(toPreempt, container.getContainer().getResource());
-      }
-    }
-  }
-
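-  /**
-   * Warn the application that owns the given container that the container
-   * will be preempted, and kill the container once waitTimeBeforeKill ms
-   * have passed since the warning was first issued.
-   */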
-  private void warnOrKillContainer(RMContainer container, FSSchedulerApp app,
-      FSLeafQueue queue) {
-    LOG.info("Preempting container (prio=" + container.getContainer().getPriority() +
-        ", res=" + container.getContainer().getResource() +
-        ") from queue " + queue.getName());
-
-    Long time = app.getContainerPreemptionTime(container);
-
-    if (time != null) {
-      // if we asked for preemption more than waitTimeBeforeKill ms ago,
-      // proceed with the kill
-      if (time + waitTimeBeforeKill < clock.getTime()) {
-        ContainerStatus status =
-            SchedulerUtils.createPreemptedContainerStatus(
-                container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER);
-
-        // TODO: Not sure if this ever actually adds this to the list of cleanup
-        // containers on the RMNode (see SchedulerNode.releaseContainer()).
-        completedContainer(container, status, RMContainerEventType.KILL);
-        LOG.info("Killing container " + container +
-            " (after waiting for preemption for " +
-            (clock.getTime() - time) + "ms)");
-      }
-    } else {
-      // track the request in the FSSchedulerApp itself
-      app.addPreemption(container, clock.getTime());
-    }
-  }
-
-  /**
-   * Return the resource amount that this queue is allowed to preempt, if any.
-   * If the queue has been below its min share for at least its preemption
-   * timeout, it should preempt the difference between its current share and
-   * this min share. If it has been below half its fair share for at least the
-   * fairSharePreemptionTimeout, it should preempt enough tasks to get up to
-   * its full fair share. If both conditions hold, we preempt the max of the
-   * two amounts (this shouldn't happen unless someone sets the timeouts to be
-   * identical for some reason).
-   */
-  protected Resource resToPreempt(FSLeafQueue sched, long curTime) {
-    String queue = sched.getName();
-    long minShareTimeout = allocConf.getMinSharePreemptionTimeout(queue);
-    long fairShareTimeout = allocConf.getFairSharePreemptionTimeout();
-    Resource resDueToMinShare = Resources.none();
-    Resource resDueToFairShare = Resources.none();
-    if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) {
-      Resource target = Resources.min(RESOURCE_CALCULATOR, clusterCapacity,
-          sched.getMinShare(), sched.getDemand());
-      resDueToMinShare = Resources.max(RESOURCE_CALCULATOR, clusterCapacity,
-          Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
-    }
-    if (curTime - sched.getLastTimeAtHalfFairShare() > fairShareTimeout) {
-      Resource target = Resources.min(RESOURCE_CALCULATOR, clusterCapacity,
-          sched.getFairShare(), sched.getDemand());
-      resDueToFairShare = Resources.max(RESOURCE_CALCULATOR, clusterCapacity,
-          Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
-    }
-    Resource resToPreempt = Resources.max(RESOURCE_CALCULATOR, clusterCapacity,
-        resDueToMinShare, resDueToFairShare);
-    if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
-        resToPreempt, Resources.none())) {
-      String message = "Should preempt " + resToPreempt + " res for queue "
-          + sched.getName() + ": resDueToMinShare = " + resDueToMinShare
-          + ", resDueToFairShare = " + resDueToFairShare;
-      LOG.info(message);
-    }
-    return resToPreempt;
-  }
-
-  public RMContainerTokenSecretManager getContainerTokenSecretManager() {
-    return rmContext.getContainerTokenSecretManager();
-  }
-
-  // synchronized for sizeBasedWeight
-  public synchronized ResourceWeights getAppWeight(AppSchedulable app) {
-    double weight = 1.0;
-    if (sizeBasedWeight) {
-      // Set weight based on current memory demand
-      weight = Math.log1p(app.getDemand().getMemory()) / Math.log(2);
-    }
-    weight *= app.getPriority().getPriority();
-    if (weightAdjuster != null) {
-      // Run weight through the user-supplied weightAdjuster
-      weight = weightAdjuster.adjustWeight(app, weight);
-    }
-    return new ResourceWeights((float)weight);
-  }
-
-  @Override
-  public Resource getMinimumResourceCapability() {
-    return minimumAllocation;
-  }
-
-  public Resource getIncrementResourceCapability() {
-    return incrAllocation;
-  }
-
-  @Override
-  public Resource getMaximumResourceCapability() {
-    return maximumAllocation;
-  }
-
-  public double getNodeLocalityThreshold() {
-    return nodeLocalityThreshold;
-  }
-
-  public double getRackLocalityThreshold() {
-    return rackLocalityThreshold;
-  }
-
-  public long getNodeLocalityDelayMs() {
-    return nodeLocalityDelayMs;
-  }
-
-  public long getRackLocalityDelayMs() {
-    return rackLocalityDelayMs;
-  }
-
-  public boolean isContinuousSchedulingEnabled() {
-    return continuousSchedulingEnabled;
-  }
-
-  public synchronized int getContinuousSchedulingSleepMs() {
-    return continuousSchedulingSleepMs;
-  }
-
-  public Resource getClusterCapacity() {
-    return clusterCapacity;
-  }
-
-  public synchronized Clock getClock() {
-    return clock;
-  }
-
-  protected synchronized void setClock(Clock clock) {
-    this.clock = clock;
-  }
-
-  public FairSchedulerEventLog getEventLog() {
-    return eventLog;
-  }
-
-  /**
-   * Add a new application to the scheduler, with a given id, queue name, and
-   * user. This will accept a new app even if the user or queue is above
-   * configured limits, but the app will not be marked as runnable.
-   */
-  protected synchronized void addApplication(ApplicationId applicationId,
-      String queueName, String user) {
-    if (queueName == null || queueName.isEmpty()) {
-      String message = "Reject application " + applicationId +
-          " submitted by user " + user + " with an empty queue name.";
-      LOG.info(message);
-      rmContext.getDispatcher().getEventHandler()
-          .handle(new RMAppRejectedEvent(applicationId, message));
-      return;
-    }
-
-    RMApp rmApp = rmContext.getRMApps().get(applicationId);
-    FSLeafQueue queue = assignToQueue(rmApp, queueName, user);
-    if (queue == null) {
-      rmContext.getDispatcher().getEventHandler().handle(
-          new RMAppRejectedEvent(applicationId,
-              "Application rejected by queue placement policy"));
-      return;
-    }
-
-    // Enforce ACLs
-    UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(user);
-
-    if (!queue.hasAccess(QueueACL.SUBMIT_APPLICATIONS, userUgi)
-        && !queue.hasAccess(QueueACL.ADMINISTER_QUEUE, userUgi)) {
-      String msg = "User " + userUgi.getUserName() +
-          " cannot submit applications to queue " + queue.getName();
-      LOG.info(msg);
-      rmContext.getDispatcher().getEventHandler()
-          .handle(new RMAppRejectedEvent(applicationId, msg));
-      return;
-    }
-
-    SchedulerApplication application =
-        new SchedulerApplication(queue, user);
-    applications.put(applicationId, application);
-    queue.getMetrics().submitApp(user);
-
-    LOG.info("Accepted application " + applicationId + " from user: " + user
-        + ", in queue: " + queueName + ", current number of applications: "
-        + applications.size());
-    rmContext.getDispatcher().getEventHandler()
-        .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
-  }
-
-  /**
-   * Add a new application attempt to the scheduler.
-   */
-  protected synchronized void addApplicationAttempt(
-      ApplicationAttemptId applicationAttemptId,
-      boolean transferStateFromPreviousAttempt) {
-    SchedulerApplication application =
-        applications.get(applicationAttemptId.getApplicationId());
-    String user = application.getUser();
-    FSLeafQueue queue = (FSLeafQueue) application.getQueue();
-
-    FSSchedulerApp attempt =
-        new FSSchedulerApp(applicationAttemptId, user,
-            queue, new ActiveUsersManager(getRootQueueMetrics()),
-            rmContext);
-    if (transferStateFromPreviousAttempt) {
-      attempt.transferStateFromPreviousAttempt(application
-          .getCurrentAppAttempt());
-    }
-    application.setCurrentAppAttempt(attempt);
-
-    boolean runnable = maxRunningEnforcer.canAppBeRunnable(queue, user);
-    queue.addApp(attempt, runnable);
-    if (runnable) {
-      maxRunningEnforcer.trackRunnableApp(attempt);
-    } else {
-      maxRunningEnforcer.trackNonRunnableApp(attempt);
-    }
-
-    queue.getMetrics().submitAppAttempt(user);
-
-    LOG.info("Added Application Attempt " + applicationAttemptId
-        + " to scheduler from user: " + user);
-    rmContext.getDispatcher().getEventHandler().handle(
-        new RMAppAttemptEvent(applicationAttemptId,
-            RMAppAttemptEventType.ATTEMPT_ADDED));
-  }
-
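-  /**
-   * Resolve the queue for an application through the configured placement
-   * policy, creating the leaf queue if needed. Returns null if the placement
-   * policy rejects the application or placement fails.
-   */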
-  @VisibleForTesting
-  FSLeafQueue assignToQueue(RMApp rmApp, String queueName, String user) {
-    FSLeafQueue queue = null;
-    try {
-      QueuePlacementPolicy placementPolicy = allocConf.getPlacementPolicy();
-      queueName = placementPolicy.assignAppToQueue(queueName, user);
-      if (queueName == null) {
-        return null;
-      }
-      queue = queueMgr.getLeafQueue(queueName, true);
-    } catch (IOException ex) {
-      LOG.error("Error assigning app to queue, rejecting", ex);
-    }
-
-    if (rmApp == null) {
-      LOG.warn("Couldn't find RM app to set queue name on");
-    } else if (queue != null) {
-      rmApp.setQueue(queue.getName());
-    }
-
-    return queue;
-  }
-
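-  /**
-   * Remove an application from the scheduler, recording its final state.
-   */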
-  private synchronized void removeApplication(ApplicationId applicationId,
-      RMAppState finalState) {
-    SchedulerApplication application = applications.get(applicationId);
-    if (application == null) {
-      LOG.warn("Couldn't find application " + applicationId);
-      return;
-    }
-    application.stop(finalState);
-    applications.remove(applicationId);
-  }
-
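-  /**
-   * Remove an application attempt from the scheduler, releasing its live and
-   * reserved containers and informing the queue it ran in.
-   */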
-  private synchronized void removeApplicationAttempt(
-      ApplicationAttemptId applicationAttemptId,
-      RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) {
-    LOG.info("Application " + applicationAttemptId + " is done." +
-        " finalState=" + rmAppAttemptFinalState);
-    SchedulerApplication application =
-        applications.get(applicationAttemptId.getApplicationId());
-    FSSchedulerApp attempt = getSchedulerApp(applicationAttemptId);
-
-    if (attempt == null || application == null) {
-      LOG.info("Unknown application " + applicationAttemptId + " has completed!");
-      return;
-    }
-
-    // Release all the running containers
-    for (RMContainer rmContainer : attempt.getLiveContainers()) {
-      if (keepContainers
-          && rmContainer.getState().equals(RMContainerState.RUNNING)) {
-        // do not kill the running container in the case of work-preserving AM
-        // restart.
-        LOG.info("Skip killing " + rmContainer.getContainerId());
-        continue;
-      }
-      completedContainer(rmContainer,
-          SchedulerUtils.createAbnormalContainerStatus(
-              rmContainer.getContainerId(),
-              SchedulerUtils.COMPLETED_APPLICATION),
-          RMContainerEventType.KILL);
-    }
-
-    // Release all reserved containers
-    for (RMContainer rmContainer : attempt.getReservedContainers()) {
-      completedContainer(rmContainer,
-          SchedulerUtils.createAbnormalContainerStatus(
-              rmContainer.getContainerId(),
-              "Application Complete"),
-          RMContainerEventType.KILL);
-    }
-    // Clean up pending requests, metrics etc.
-    attempt.stop(rmAppAttemptFinalState);
-
-    // Inform the queue
-    FSLeafQueue queue = queueMgr.getLeafQueue(attempt.getQueue()
-        .getQueueName(), false);
-    boolean wasRunnable = queue.removeApp(attempt);
-
-    if (wasRunnable) {
-      maxRunningEnforcer.updateRunnabilityOnAppRemoval(attempt);
-    } else {
-      maxRunningEnforcer.untrackNonRunnableApp(attempt);
-    }
-  }
-
-  /**
-   * Clean up a completed container.
-   */
-  private synchronized void completedContainer(RMContainer rmContainer,
-      ContainerStatus containerStatus, RMContainerEventType event) {
-    if (rmContainer == null) {
-      LOG.info("Null container completed...");
-      return;
-    }
-
-    Container container = rmContainer.getContainer();
-
-    // Get the application for the finished container
-    FSSchedulerApp application =
-        getCurrentAttemptForContainer(container.getId());
-    ApplicationId appId =
-        container.getId().getApplicationAttemptId().getApplicationId();
-    if (application == null) {
-      LOG.info("Container " + container + " of" +
-          " unknown application attempt " + appId +
-          " completed with event " + event);
-      return;
-    }
-
-    // Get the node on which the container was allocated
-    FSSchedulerNode node = nodes.get(container.getNodeId());
-
-    if (rmContainer.getState() == RMContainerState.RESERVED) {
-      application.unreserve(node, rmContainer.getReservedPriority());
-      node.unreserveResource(application);
-    } else {
-      application.containerCompleted(rmContainer, containerStatus, event);
-      node.releaseContainer(container);
-      updateRootQueueMetrics();
-    }
-
-    LOG.info("Application attempt " + application.getApplicationAttemptId()
-        + " released container " + container.getId() + " on node: " + node
-        + " with event: " + event);
-  }
-
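-  /**
-   * Add a node to the scheduler and update the aggregate cluster capacity.
-   */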
-  private synchronized void addNode(RMNode node) {
-    nodes.put(node.getNodeID(), new FSSchedulerNode(node, usePortForNodeName));
-    Resources.addTo(clusterCapacity, node.getTotalCapability());
-    updateRootQueueMetrics();
-
-    LOG.info("Added node " + node.getNodeAddress() +
-        " cluster capacity: " + clusterCapacity);
-  }
-
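-  /**
-   * Remove a node from the scheduler, killing any containers still running
-   * there and releasing any reservation it holds.
-   */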
-  private synchronized void removeNode(RMNode rmNode) {
-    FSSchedulerNode node = nodes.get(rmNode.getNodeID());
-    // This can occur when an UNHEALTHY node reconnects
-    if (node == null) {
-      return;
-    }
-    Resources.subtractFrom(clusterCapacity, rmNode.getTotalCapability());
-    updateRootQueueMetrics();
-
-    // Remove running containers
-    List<RMContainer> runningContainers = node.getRunningContainers();
-    for (RMContainer container : runningContainers) {
-      completedContainer(container,
-          SchedulerUtils.createAbnormalContainerStatus(
-              container.getContainerId(),
-              SchedulerUtils.LOST_CONTAINER),
-          RMContainerEventType.KILL);
-    }
-
-    // Remove reservations, if any
-    RMContainer reservedContainer = node.getReservedContainer();
-    if (reservedContainer != null) {
-      completedContainer(reservedContainer,
-          SchedulerUtils.createAbnormalContainerStatus(
-              reservedContainer.getContainerId(),
-              SchedulerUtils.LOST_CONTAINER),
-          RMContainerEventType.KILL);
-    }
-
-    nodes.remove(rmNode.getNodeID());
-    LOG.info("Removed node " + rmNode.getNodeAddress() +
-        " cluster capacity: " + clusterCapacity);
-  }
-
-  @Override
-  public Allocation allocate(ApplicationAttemptId appAttemptId,
-      List<ResourceRequest> ask, List<ContainerId> release,
-      List<String> blacklistAdditions, List<String> blacklistRemovals) {
-
-    // Make sure this application exists
-    FSSchedulerApp application = getSchedulerApp(appAttemptId);
-    if (application == null) {
-      LOG.info("Calling allocate on removed " +
-          "or non-existent application " + appAttemptId);
-      return EMPTY_ALLOCATION;
-    }
-
-    // Sanity check
-    SchedulerUtils.normalizeRequests(ask, new DominantResourceCalculator(),
-        clusterCapacity, minimumAllocation, maximumAllocation, incrAllocation);
-
-    // Release containers
-    for (ContainerId releasedContainerId : release) {
-      RMContainer rmContainer = getRMContainer(releasedContainerId);
-      if (rmContainer == null) {
-        RMAuditLogger.logFailure(application.getUser(),
-            AuditConstants.RELEASE_CONTAINER,
-            "Unauthorized access or invalid container", "FairScheduler",
-            "Trying to release container not owned by app or with invalid id",
-            application.getApplicationId(), releasedContainerId);
-      }
-      completedContainer(rmContainer,
-          SchedulerUtils.createAbnormalContainerStatus(
-              releasedContainerId,
-              SchedulerUtils.RELEASED_CONTAINER),
-          RMContainerEventType.RELEASED);
-    }
-
-    synchronized (application) {
-      if (!ask.isEmpty()) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("allocate: pre-update" +
-              " applicationAttemptId=" + appAttemptId +
-              " application=" + application.getApplicationId());
-        }
-        application.showRequests();
-
-        // Update application requests
-        application.updateResourceRequests(ask);
-
-        LOG.debug("allocate: post-update");
-        application.showRequests();
-      }
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("allocate:" +
-            " applicationAttemptId=" + appAttemptId +
-            " #ask=" + ask.size());
-
-        LOG.debug("Preempting " + application.getPreemptionContainers().size()
-            + " container(s)");
-      }
-
-      Set<ContainerId> preemptionContainerIds = new HashSet<ContainerId>();
-      for (RMContainer container : application.getPreemptionContainers()) {
-        preemptionContainerIds.add(container.getContainerId());
-      }
-
-      application.updateBlacklist(blacklistAdditions, blacklistRemovals);
-
-      return new Allocation(application.pullNewlyAllocatedContainers(),
-          application.getHeadroom(), preemptionContainerIds);
-    }
-  }
-
-  /**
-   * Process a container which has launched on a node, as reported by the node.
-   */
-  private void containerLaunchedOnNode(ContainerId containerId, FSSchedulerNode node) {
-    // Get the application for the launched container
-    FSSchedulerApp application = getCurrentAttemptForContainer(containerId);
-    if (application == null) {
-      LOG.info("Unknown application "
-          + containerId.getApplicationAttemptId().getApplicationId()
-          + " launched container " + containerId + " on node: " + node);
-      return;
-    }
-
-    application.containerLaunchedOnNode(containerId, node.getNodeID());
-  }
-
-  /**
-   * Process a heartbeat update from a node.
-   */
-  private synchronized void nodeUpdate(RMNode nm) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("nodeUpdate: " + nm + " cluster capacity: " + clusterCapacity);
-    }
-    eventLog.log("HEARTBEAT", nm.getHostName());
-    FSSchedulerNode node = nodes.get(nm.getNodeID());
-
-    // Update resource if any change
-    SchedulerUtils.updateResourceIfChanged(node, nm, clusterCapacity, LOG);
-
-    List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
-    List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
-    List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
-    for (UpdatedContainerInfo containerInfo : containerInfoList) {
-      newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
-      completedContainers.addAll(containerInfo.getCompletedContainers());
-    }
-    // Process the newly launched containers
-    for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
-      containerLaunchedOnNode(launchedContainer.getContainerId(), node);
-    }
-
-    // Process completed containers
-    for (ContainerStatus completedContainer : completedContainers) {
-      ContainerId containerId = completedContainer.getContainerId();
-      LOG.debug("Container FINISHED: " + containerId);
-      completedContainer(getRMContainer(containerId),
-          completedContainer, RMContainerEventType.FINISHED);
-    }
-
-    if (continuousSchedulingEnabled) {
-      if (!completedContainers.isEmpty()) {
-        attemptScheduling(node);
-      }
-    } else {
-      attemptScheduling(node);
-    }
-  }
-
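-  /**
-   * Main loop of the continuous scheduling thread: repeatedly sort nodes by
-   * available resource and attempt scheduling on each, then sleep for
-   * continuousSchedulingSleepMs before the next pass.
-   */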
-  private void continuousScheduling() {
-    while (true) {
-      List<NodeId> nodeIdList = new ArrayList<NodeId>(nodes.keySet());
-      Collections.sort(nodeIdList, nodeAvailableResourceComparator);
-
-      // iterate all nodes
-      for (NodeId nodeId : nodeIdList) {
-        if (nodes.containsKey(nodeId)) {
-          FSSchedulerNode node = nodes.get(nodeId);
-          try {
-            if (Resources.fitsIn(minimumAllocation,
-                node.getAvailableResource())) {
-              attemptScheduling(node);
-            }
-          } catch (Throwable ex) {
-            LOG.warn("Error while attempting scheduling for node " + node +
-                ": " + ex.toString(), ex);
-          }
-        }
-      }
-      try {
-        Thread.sleep(getContinuousSchedulingSleepMs());
-      } catch (InterruptedException e) {
-        LOG.warn("Error while doing sleep in continuous scheduling: " +
-            e.toString(), e);
-      }
-    }
-  }
-
-  /** Sort nodes by available resource */
-  private class NodeAvailableResourceComparator implements Comparator<NodeId> {
-
-    @Override
-    public int compare(NodeId n1, NodeId n2) {
-      return RESOURCE_CALCULATOR.compare(clusterCapacity,
-          nodes.get(n2).getAvailableResource(),
-          nodes.get(n1).getAvailableResource());
-    }
-  }
-
-  private synchronized void attemptScheduling(FSSchedulerNode node) {
-    // Assign new containers...
-    // 1. Check for reserved applications
-    // 2. Schedule if there are no reservations
-
-    AppSchedulable reservedAppSchedulable = node.getReservedAppSchedulable();
-    if (reservedAppSchedulable != null) {
-      Priority reservedPriority = node.getReservedContainer().getReservedPriority();
-      if (!reservedAppSchedulable.hasContainerForNode(reservedPriority, node)) {
-        // Don't hold the reservation if app can no longer use it
-        LOG.info("Releasing reservation that cannot be satisfied for application "
-            + reservedAppSchedulable.getApp().getApplicationAttemptId()
-            + " on node " + node);
-        reservedAppSchedulable.unreserve(reservedPriority, node);
-        reservedAppSchedulable = null;
-      } else {
-        // Reservation exists; try to fulfill the reservation
-        LOG.info("Trying to fulfill reservation for application "
-            + reservedAppSchedulable.getApp().getApplicationAttemptId()
-            + " on node: " + node);
-
-        node.getReservedAppSchedulable().assignReservedContainer(node);
-      }
-    }
-    if (reservedAppSchedulable == null) {
-      // No reservation, schedule at queue which is farthest below fair share
-      int assignedContainers = 0;
-      while (node.getReservedContainer() == null) {
-        boolean assignedContainer = false;
-        if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
-            queueMgr.getRootQueue().assignContainer(node),
-            Resources.none())) {
-          assignedContainers++;
-          assignedContainer = true;
-        }
-        if (!assignedContainer) { break; }
-        if (!assignMultiple) { break; }
-        if ((assignedContainers >= maxAssign) && (maxAssign > 0)) { break; }
-      }
-    }
-    updateRootQueueMetrics();
-  }
-
-  @Override
-  public SchedulerNodeReport getNodeReport(NodeId nodeId) {
-    FSSchedulerNode node = nodes.get(nodeId);
-    return node == null ? null : new SchedulerNodeReport(node);
-  }
-
-  public FSSchedulerApp getSchedulerApp(ApplicationAttemptId appAttemptId) {
-    SchedulerApplication app =
-        applications.get(appAttemptId.getApplicationId());
-    if (app != null) {
-      return (FSSchedulerApp) app.getCurrentAppAttempt();
-    }
-    return null;
-  }
-
-  @Override
-  public SchedulerAppReport getSchedulerAppInfo(
-      ApplicationAttemptId appAttemptId) {
-    FSSchedulerApp attempt = getSchedulerApp(appAttemptId);
-    if (attempt == null) {
-      LOG.error("Request for appInfo of unknown attempt " + appAttemptId);
-      return null;
-    }
-    return new SchedulerAppReport(attempt);
-  }
-
-  @Override
-  public ApplicationResourceUsageReport getAppResourceUsageReport(
-      ApplicationAttemptId appAttemptId) {
-    FSSchedulerApp attempt = getSchedulerApp(appAttemptId);
-    if (attempt == null) {
-      LOG.error("Request for appInfo of unknown attempt " + appAttemptId);
-      return null;
-    }
-    return attempt.getResourceUsageReport();
-  }
-
-  /**
-   * Subqueue metrics might be a little out of date because fair shares are
-   * recalculated at the update interval, but the root queue metrics need to
-   * be updated synchronously with allocations and completions so that cluster
-   * metrics will be consistent.
-   */
-  private void updateRootQueueMetrics() {
-    rootMetrics.setAvailableResourcesToQueue(
-        Resources.subtract(
-            clusterCapacity, rootMetrics.getAllocatedResources()));
-  }
-
-  @Override
-  public QueueMetrics getRootQueueMetrics() {
-    return rootMetrics;
-  }
-
-  @Override
-  public void handle(SchedulerEvent event) {
-    switch (event.getType()) {
-    case NODE_ADDED:
-      if (!(event instanceof NodeAddedSchedulerEvent)) {
-        throw new RuntimeException("Unexpected event type: " + event);
-      }
-      NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event;
-      addNode(nodeAddedEvent.getAddedRMNode());
-      break;
-    case NODE_REMOVED:
-      if (!(event instanceof NodeRemovedSchedulerEvent)) {
-        throw new RuntimeException("Unexpected event type: " + event);
-      }
-      NodeRemovedSchedulerEvent nodeRemovedEvent = (NodeRemovedSchedulerEvent)event;
-      removeNode(nodeRemovedEvent.getRemovedRMNode());
-      break;
-    case NODE_UPDATE:
-      if (!(event instanceof NodeUpdateSchedulerEvent)) {
-        throw new RuntimeException("Unexpected event type: " + event);
-      }
-      NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
-      nodeUpdate(nodeUpdatedEvent.getRMNode());
-      break;
-    case APP_ADDED:
-      if (!(event instanceof AppAddedSchedulerEvent)) {
-        throw new RuntimeException("Unexpected event type: " + event);
-      }
-      AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
-      addApplication(appAddedEvent.getApplicationId(),
-          appAddedEvent.getQueue(), appAddedEvent.getUser());
-      break;
-    case APP_REMOVED:
-      if (!(event instanceof AppRemovedSchedulerEvent)) {
-        throw new RuntimeException("Unexpected event type: " + event);
-      }
-      AppRemovedSchedulerEvent appRemovedEvent = (AppRemovedSchedulerEvent)event;
-      removeApplication(appRemovedEvent.getApplicationID(),
-          appRemovedEvent.getFinalState());
-      break;
-    case APP_ATTEMPT_ADDED:
-      if (!(event instanceof AppAttemptAddedSchedulerEvent)) {
-        throw new RuntimeException("Unexpected event type: " + event);
-      }
-      AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
-          (AppAttemptAddedSchedulerEvent) event;
-      addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
-          appAttemptAddedEvent.getTransferStateFromPreviousAttempt());
-      break;
-    case APP_ATTEMPT_REMOVED:
-      if (!(event instanceof AppAttemptRemovedSchedulerEvent)) {
-        throw new RuntimeException("Unexpected event type: " + event);
-      }
-      AppAttemptRemovedSchedulerEvent appAttemptRemovedEvent =
-          (AppAttemptRemovedSchedulerEvent) event;
-      removeApplicationAttempt(
-          appAttemptRemovedEvent.getApplicationAttemptID(),
-          appAttemptRemovedEvent.getFinalAttemptState(),
-          appAttemptRemovedEvent.getKeepContainersAcrossAppAttempts());
-      break;
-    case CONTAINER_EXPIRED:
-      if (!(event instanceof ContainerExpiredSchedulerEvent)) {
-        throw new RuntimeException("Unexpected event type: " + event);
-      }
-      ContainerExpiredSchedulerEvent containerExpiredEvent =
-          (ContainerExpiredSchedulerEvent)event;
-      ContainerId containerId = containerExpiredEvent.getContainerId();
-      completedContainer(getRMContainer(containerId),
-          SchedulerUtils.createAbnormalContainerStatus(
-              containerId,
-              SchedulerUtils.EXPIRED_CONTAINER),
-          RMContainerEventType.EXPIRE);
-      break;
-    default:
-      LOG.error("Unknown event arrived at FairScheduler: " + event.toString());
-    }
-  }
-
-  @Override
-  public void recover(RMState state) throws Exception {
-    // NOT IMPLEMENTED
-  }
-
-  @Override
-  public synchronized void reinitialize(Configuration conf, RMContext rmContext)
-      throws IOException {
-    if (!initialized) {
-      this.conf = new FairSchedulerConfiguration(conf);
-      validateConf(this.conf);
-      minimumAllocation = this.conf.getMinimumAllocation();
-      maximumAllocation = this.conf.getMaximumAllocation();
-      incrAllocation = this.conf.getIncrementAllocation();
-      continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
-      continuousSchedulingSleepMs =
-          this.conf.getContinuousSchedulingSleepMs();
-      nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
-      rackLocalityThreshold = this.conf.getLocalityThresholdRack();
-      nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs();
-      rackLocalityDelayMs = this.conf.getLocalityDelayRackMs();
-      preemptionEnabled = this.conf.getPreemptionEnabled();
-      assignMultiple = this.conf.getAssignMultiple();
-      maxAssign = this.conf.getMaxAssign();
-      sizeBasedWeight = this.conf.getSizeBasedWeight();
-      preemptionInterval = this.conf.getPreemptionInterval();
-      waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
-      usePortForNodeName = this.conf.getUsePortForNodeName();
-
-      rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf);
-      this.rmContext = rmContext;
-      this.eventLog = new FairSchedulerEventLog();
-      eventLog.init(this.conf);
-
-      initialized = true;
-
-      allocConf = new AllocationConfiguration(conf);
-      try {
-        queueMgr.initialize(conf);
-      } catch (Exception e) {
-        throw new IOException("Failed to start FairScheduler", e);
-      }
-
-      Thread updateThread = new Thread(new UpdateThread());
-      updateThread.setName("FairSchedulerUpdateThread");
-      updateThread.setDaemon(true);
-      updateThread.start();
-
-      if (continuousSchedulingEnabled) {
-        // start continuous scheduling thread
-        Thread schedulingThread = new Thread(
-            new Runnable() {
-              @Override
-              public void run() {
-                continuousScheduling();
-              }
-            }
-        );
-        schedulingThread.setName("ContinuousScheduling");
-        schedulingThread.setDaemon(true);
-        schedulingThread.start();
-      }
-
-      allocsLoader.init(conf);
-      allocsLoader.setReloadListener(new AllocationReloadListener());
-      // If we fail to load the allocations file on initialize, we want to
-      // fail immediately. After a successful load, exceptions on future
-      // reloads will just result in leaving things as they are.
-      try {
-        allocsLoader.reloadAllocations();
-      } catch (Exception e) {
-        throw new IOException("Failed to initialize FairScheduler", e);
-      }
-      allocsLoader.start();
-    } else {
-      try {
-        allocsLoader.reloadAllocations();
-      } catch (Exception e) {
-        LOG.error("Failed to reload allocations file", e);
-      }
-    }
-  }
-
-  @Override
-  public QueueInfo getQueueInfo(String queueName, boolean includeChildQueues,
-      boolean recursive) throws IOException {
-    if (!queueMgr.exists(queueName)) {
-      throw new IOException("queue " + queueName + " does not exist");
-    }
-    return queueMgr.getQueue(queueName).getQueueInfo(includeChildQueues,
-        recursive);
-  }
-
-  @Override
-  public List<QueueUserACLInfo> getQueueUserAclInfo() {
-    UserGroupInformation user = null;
-    try {
-      user = UserGroupInformation.getCurrentUser();
-    } catch (IOException ioe) {
-      return new ArrayList<QueueUserACLInfo>();
-    }
-
-    return queueMgr.getRootQueue().getQueueUserAclInfo(user);
-  }
-
-  @Override
-  public int getNumClusterNodes() {
-    return nodes.size();
-  }
-
-  @Override
-  public synchronized boolean checkAccess(UserGroupInformation callerUGI,
-      QueueACL acl, String queueName) {
-    FSQueue queue = getQueueManager().getQueue(queueName);
-    if (queue == null) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("ACL not found for queue access-type " + acl
-            + " for queue " + queueName);
-      }
-      return false;
-    }
-    return queue.hasAccess(acl, callerUGI);
-  }
-
-  public AllocationConfiguration getAllocationConfiguration() {
-    return allocConf;
-  }
-
-  private class AllocationReloadListener implements
-      AllocationFileLoaderService.Listener {
-
-    @Override
-    public void onReload(AllocationConfiguration queueInfo) {
-      // Commit the reload; also create any queue defined in the alloc file
-      // if it does not already exist, so it can be displayed on the web UI.
-      synchronized (FairScheduler.this) {
-        allocConf = queueInfo;
-        allocConf.getDefaultSchedulingPolicy().initialize(clusterCapacity);
-        queueMgr.updateAllocationConfiguration(allocConf);
-      }
-    }
-  }
-
-  @Override
-  public List<ApplicationAttemptId> getAppsInQueue(String queueName) {
-    FSQueue queue = queueMgr.getQueue(queueName);
-    if (queue == null) {
-      return null;
-    }
-    List<ApplicationAttemptId> apps = new ArrayList<ApplicationAttemptId>();
-    queue.collectSchedulerApplications(apps);
-    return apps;
-  }
-
-}