@@ -0,0 +1,768 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
+import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+/**
+ * Represents an application attempt from the viewpoint of the Fair Scheduler.
+ */
+@Private
+@Unstable
+public class FSAppAttempt extends SchedulerApplicationAttempt
+    implements Schedulable {
+
+  private static final Log LOG = LogFactory.getLog(FSAppAttempt.class);
+  private static final DefaultResourceCalculator RESOURCE_CALCULATOR
+      = new DefaultResourceCalculator();
+
+  private long startTime;
+  private Priority priority;
+  private ResourceWeights resourceWeights;
+  private Resource demand = Resources.createResource(0);
+  private FairScheduler scheduler;
+  private Resource fairShare = Resources.createResource(0, 0);
+  private Resource preemptedResources = Resources.createResource(0);
+  private RMContainerComparator comparator = new RMContainerComparator();
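+  // Containers that have been marked for preemption, mapped to the time at
+  // which each was marked; entries are added in addPreemption() and removed
+  // when the container completes.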
+  private final Map<RMContainer, Long> preemptionMap =
+      new HashMap<RMContainer, Long>();
+
+  /**
+   * Delay scheduling: We often want to prioritize scheduling of node-local
+   * containers over rack-local or off-switch containers. To achieve this
+   * we first only allow node-local assignments for a given priority level,
+   * then relax the locality threshold once we've had a long enough period
+   * without successfully scheduling. We measure both the number of "missed"
+   * scheduling opportunities since the last container was scheduled
+   * at the current allowed level and the time since the last container
+   * was scheduled. Currently we use only the former.
+   */
+  private final Map<Priority, NodeType> allowedLocalityLevel =
+      new HashMap<Priority, NodeType>();
+
+  public FSAppAttempt(FairScheduler scheduler,
+      ApplicationAttemptId applicationAttemptId, String user, FSLeafQueue queue,
+      ActiveUsersManager activeUsersManager, RMContext rmContext) {
+    super(applicationAttemptId, user, queue, activeUsersManager, rmContext);
+
+    this.scheduler = scheduler;
+    this.startTime = scheduler.getClock().getTime();
+    this.priority = Priority.newInstance(1);
+    this.resourceWeights = new ResourceWeights();
+  }
+
+  public ResourceWeights getResourceWeights() {
+    return resourceWeights;
+  }
+
+  /**
+   * Get metrics reference from containing queue.
+   */
+  public QueueMetrics getMetrics() {
+    return queue.getMetrics();
+  }
+
+  synchronized public void containerCompleted(RMContainer rmContainer,
+      ContainerStatus containerStatus, RMContainerEventType event) {
+
+    Container container = rmContainer.getContainer();
+    ContainerId containerId = container.getId();
+
+    // Remove from the list of newly allocated containers if found
+    newlyAllocatedContainers.remove(rmContainer);
+
+    // Inform the container
+    rmContainer.handle(
+        new RMContainerFinishedEvent(
+            containerId,
+            containerStatus,
+            event)
+        );
+    LOG.info("Completed container: " + rmContainer.getContainerId() +
+        " in state: " + rmContainer.getState() + " event:" + event);
+
+    // Remove from the list of containers
+    liveContainers.remove(rmContainer.getContainerId());
+
+    RMAuditLogger.logSuccess(getUser(),
+        AuditConstants.RELEASE_CONTAINER, "SchedulerApp",
+        getApplicationId(), containerId);
+
+    // Update usage metrics
+    Resource containerResource = rmContainer.getContainer().getResource();
+    queue.getMetrics().releaseResources(getUser(), 1, containerResource);
+    Resources.subtractFrom(currentConsumption, containerResource);
+
+    // remove from preemption map if it is completed
+    preemptionMap.remove(rmContainer);
+  }
+
+  private synchronized void unreserveInternal(
+      Priority priority, FSSchedulerNode node) {
+    Map<NodeId, RMContainer> reservedContainers =
+        this.reservedContainers.get(priority);
+    RMContainer reservedContainer = reservedContainers.remove(node.getNodeID());
+    if (reservedContainers.isEmpty()) {
+      this.reservedContainers.remove(priority);
+    }
+
+    // Reset the re-reservation count
+    resetReReservations(priority);
+
+    Resource resource = reservedContainer.getContainer().getResource();
+    Resources.subtractFrom(currentReservation, resource);
+
+    LOG.info("Application " + getApplicationId() + " unreserved on node "
+        + node + ", currently has " + reservedContainers.size()
+        + " at priority " + priority + "; currentReservation "
+        + currentReservation);
+  }
+
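+  // Worked example: getResourceRequests(priority) holds one entry per
+  // requested host and rack plus the ResourceRequest.ANY entry, so an app
+  // asking on 2 hosts and 1 rack has 4 entries, requiredResources = 3, and
+  // on a 10-node cluster the wait factor is 0.3.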
+  public synchronized float getLocalityWaitFactor(
+      Priority priority, int clusterNodes) {
+    // Estimate: Required unique resources (i.e. hosts + racks)
+    int requiredResources =
+        Math.max(this.getResourceRequests(priority).size() - 1, 0);
+
+    // waitFactor can't be more than '1'
+    // i.e. no point skipping more than clustersize opportunities
+    return Math.min(((float)requiredResources / clusterNodes), 1.0f);
+  }
+
+  /**
+   * Return the level at which we are allowed to schedule containers, given the
+   * current size of the cluster and thresholds indicating how many nodes to
+   * fail at (as a fraction of cluster size) before relaxing scheduling
+   * constraints.
+   */
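+  // Example: with numNodes = 100 and nodeLocalityThreshold = 0.4, an attempt
+  // must miss more than 40 scheduling opportunities at NODE_LOCAL before the
+  // allowed level for that priority relaxes to RACK_LOCAL.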
+  public synchronized NodeType getAllowedLocalityLevel(Priority priority,
+      int numNodes, double nodeLocalityThreshold, double rackLocalityThreshold) {
+    // upper limit on threshold
+    if (nodeLocalityThreshold > 1.0) { nodeLocalityThreshold = 1.0; }
+    if (rackLocalityThreshold > 1.0) { rackLocalityThreshold = 1.0; }
+
+    // If delay scheduling is not being used, can schedule anywhere
+    if (nodeLocalityThreshold < 0.0 || rackLocalityThreshold < 0.0) {
+      return NodeType.OFF_SWITCH;
+    }
+
+    // Default level is NODE_LOCAL
+    if (!allowedLocalityLevel.containsKey(priority)) {
+      allowedLocalityLevel.put(priority, NodeType.NODE_LOCAL);
+      return NodeType.NODE_LOCAL;
+    }
+
+    NodeType allowed = allowedLocalityLevel.get(priority);
+
+    // If level is already most liberal, we're done
+    if (allowed.equals(NodeType.OFF_SWITCH)) {
+      return NodeType.OFF_SWITCH;
+    }
+
+    double threshold = allowed.equals(NodeType.NODE_LOCAL) ?
+        nodeLocalityThreshold : rackLocalityThreshold;
+
+    // Relax locality constraints once we've surpassed threshold.
+    if (getSchedulingOpportunities(priority) > (numNodes * threshold)) {
+      if (allowed.equals(NodeType.NODE_LOCAL)) {
+        allowedLocalityLevel.put(priority, NodeType.RACK_LOCAL);
+        resetSchedulingOpportunities(priority);
+      } else if (allowed.equals(NodeType.RACK_LOCAL)) {
+        allowedLocalityLevel.put(priority, NodeType.OFF_SWITCH);
+        resetSchedulingOpportunities(priority);
+      }
+    }
+    return allowedLocalityLevel.get(priority);
+  }
+
+  /**
+   * Return the level at which we are allowed to schedule containers,
+   * given the delay thresholds indicating how much time must pass before
+   * relaxing scheduling constraints.
+   */
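+  // Example: with nodeLocalityDelayMs = 5000, an attempt must wait more than
+  // 5 seconds after the last container scheduled at a priority (or since the
+  // attempt started) before that priority's allowed level relaxes from
+  // NODE_LOCAL to RACK_LOCAL.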
+  public synchronized NodeType getAllowedLocalityLevelByTime(Priority priority,
+      long nodeLocalityDelayMs, long rackLocalityDelayMs,
+      long currentTimeMs) {
+
+    // if not being used, can schedule anywhere
+    if (nodeLocalityDelayMs < 0 || rackLocalityDelayMs < 0) {
+      return NodeType.OFF_SWITCH;
+    }
+
+    // default level is NODE_LOCAL
+    if (!allowedLocalityLevel.containsKey(priority)) {
+      allowedLocalityLevel.put(priority, NodeType.NODE_LOCAL);
+      return NodeType.NODE_LOCAL;
+    }
+
+    NodeType allowed = allowedLocalityLevel.get(priority);
+
+    // if level is already most liberal, we're done
+    if (allowed.equals(NodeType.OFF_SWITCH)) {
+      return NodeType.OFF_SWITCH;
+    }
+
+    // check waiting time
+    long waitTime = currentTimeMs;
+    if (lastScheduledContainer.containsKey(priority)) {
+      waitTime -= lastScheduledContainer.get(priority);
+    } else {
+      waitTime -= getStartTime();
+    }
+
+    long thresholdTime = allowed.equals(NodeType.NODE_LOCAL) ?
+        nodeLocalityDelayMs : rackLocalityDelayMs;
+
+    if (waitTime > thresholdTime) {
+      if (allowed.equals(NodeType.NODE_LOCAL)) {
+        allowedLocalityLevel.put(priority, NodeType.RACK_LOCAL);
+        resetSchedulingOpportunities(priority, currentTimeMs);
+      } else if (allowed.equals(NodeType.RACK_LOCAL)) {
+        allowedLocalityLevel.put(priority, NodeType.OFF_SWITCH);
+        resetSchedulingOpportunities(priority, currentTimeMs);
+      }
+    }
+    return allowedLocalityLevel.get(priority);
+  }
+
+  synchronized public RMContainer allocate(NodeType type, FSSchedulerNode node,
+      Priority priority, ResourceRequest request,
+      Container container) {
+    // Update allowed locality level
+    NodeType allowed = allowedLocalityLevel.get(priority);
+    if (allowed != null) {
+      if (allowed.equals(NodeType.OFF_SWITCH) &&
+          (type.equals(NodeType.NODE_LOCAL) ||
+              type.equals(NodeType.RACK_LOCAL))) {
+        this.resetAllowedLocalityLevel(priority, type);
+      } else if (allowed.equals(NodeType.RACK_LOCAL) &&
+          type.equals(NodeType.NODE_LOCAL)) {
+        this.resetAllowedLocalityLevel(priority, type);
+      }
+    }
+
+    // Required sanity check - AM can call 'allocate' to update resource
+    // request without locking the scheduler, hence we need to check
+    if (getTotalRequiredResources(priority) <= 0) {
+      return null;
+    }
+
+    // Create RMContainer
+    RMContainer rmContainer = new RMContainerImpl(container,
+        getApplicationAttemptId(), node.getNodeID(),
+        appSchedulingInfo.getUser(), rmContext);
+
+    // Add it to allContainers list.
+    newlyAllocatedContainers.add(rmContainer);
+    liveContainers.put(container.getId(), rmContainer);
+
+    // Update consumption and track allocations
+    List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
+        type, node, priority, request, container);
+    Resources.addTo(currentConsumption, container.getResource());
+
+    // Update resource requests related to "request" and store in RMContainer
+    ((RMContainerImpl) rmContainer).setResourceRequests(resourceRequestList);
+
+    // Inform the container
+    rmContainer.handle(
+        new RMContainerEvent(container.getId(), RMContainerEventType.START));
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("allocate: applicationAttemptId="
+          + container.getId().getApplicationAttemptId()
+          + " container=" + container.getId() + " host="
+          + container.getNodeId().getHost() + " type=" + type);
+    }
+    RMAuditLogger.logSuccess(getUser(),
+        AuditConstants.ALLOC_CONTAINER, "SchedulerApp",
+        getApplicationId(), container.getId());
+
+    return rmContainer;
+  }
+
+  /**
+   * Should be called when the scheduler assigns a container at a higher
+   * degree of locality than the current threshold. Reset the allowed locality
+   * level to a higher degree of locality.
+   */
+  public synchronized void resetAllowedLocalityLevel(Priority priority,
+      NodeType level) {
+    NodeType old = allowedLocalityLevel.get(priority);
+    LOG.info("Raising locality level from " + old + " to " + level +
+        " at priority " + priority);
+    allowedLocalityLevel.put(priority, level);
+  }
+
+  // Preemption-related methods
+  public void addPreemption(RMContainer container, long time) {
+    assert preemptionMap.get(container) == null;
+    preemptionMap.put(container, time);
+    Resources.addTo(preemptedResources, container.getAllocatedResource());
+  }
+
+  public Long getContainerPreemptionTime(RMContainer container) {
+    return preemptionMap.get(container);
+  }
+
+  public Set<RMContainer> getPreemptionContainers() {
+    return preemptionMap.keySet();
+  }
+
+  @Override
+  public FSLeafQueue getQueue() {
+    return (FSLeafQueue) super.getQueue();
+  }
+
+  public Resource getPreemptedResources() {
+    return preemptedResources;
+  }
+
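+  // Recompute the preempted-resource total from the containers currently
+  // marked for preemption.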
+  public void resetPreemptedResources() {
+    preemptedResources = Resources.createResource(0);
+    for (RMContainer container : getPreemptionContainers()) {
+      Resources.addTo(preemptedResources, container.getAllocatedResource());
+    }
+  }
+
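+  // Zero out the tracked preemption total without touching the preemption map.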
+  public void clearPreemptedResources() {
+    preemptedResources.setMemory(0);
+    preemptedResources.setVirtualCores(0);
+  }
+
+  /**
+   * Create and return a container object reflecting an allocation for the
+   * given application on the given node with the given capability and
+   * priority.
+   */
+  public Container createContainer(
+      FSSchedulerNode node, Resource capability, Priority priority) {
+
+    NodeId nodeId = node.getRMNode().getNodeID();
+    ContainerId containerId = BuilderUtils.newContainerId(
+        getApplicationAttemptId(), getNewContainerId());
+
+    // Create the container
+    Container container =
+        BuilderUtils.newContainer(containerId, nodeId, node.getRMNode()
+            .getHttpAddress(), capability, priority, null);
+
+    return container;
+  }
+
+  /**
+   * Reserve a spot for {@code container} on this {@code node}. If
+   * the container is {@code alreadyReserved} on the node, simply
+   * update relevant bookkeeping. This dispatches to relevant handlers
+   * in {@link FSSchedulerNode}.
+   */
+  private void reserve(Priority priority, FSSchedulerNode node,
+      Container container, boolean alreadyReserved) {
+    LOG.info("Making reservation: node=" + node.getNodeName() +
+        " app_id=" + getApplicationId());
+
+    if (!alreadyReserved) {
+      getMetrics().reserveResource(getUser(), container.getResource());
+      RMContainer rmContainer =
+          super.reserve(node, priority, null, container);
+      node.reserveResource(this, priority, rmContainer);
+    } else {
+      RMContainer rmContainer = node.getReservedContainer();
+      super.reserve(node, priority, rmContainer, container);
+      node.reserveResource(this, priority, rmContainer);
+    }
+  }
+
+  /**
+   * Remove the reservation on {@code node} at the given {@link Priority}.
+   * This dispatches SchedulerNode handlers as well.
+   */
+  public void unreserve(Priority priority, FSSchedulerNode node) {
+    RMContainer rmContainer = node.getReservedContainer();
+    unreserveInternal(priority, node);
+    node.unreserveResource(this);
+    getMetrics().unreserveResource(
+        getUser(), rmContainer.getContainer().getResource());
+  }
+
+  /**
+   * Assign a container to this node to facilitate {@code request}. If the node
+   * does not have enough resources, create a reservation. This is called once
+   * we are sure the particular request should be facilitated by this node.
+   *
+   * @param node
+   *     The node to try placing the container on.
+   * @param request
+   *     The ResourceRequest we're trying to satisfy.
+   * @param type
+   *     The locality of the assignment.
+   * @param reserved
+   *     Whether there's already a container reserved for this app on the node.
+   * @return
+   *     If an assignment was made, returns the resources allocated to the
+   *     container. If a reservation was made, returns
+   *     FairScheduler.CONTAINER_RESERVED. If no assignment or reservation was
+   *     made, returns an empty resource.
+   */
+  private Resource assignContainer(
+      FSSchedulerNode node, ResourceRequest request, NodeType type,
+      boolean reserved) {
+
+    // How much does this request need?
+    Resource capability = request.getCapability();
+
+    // How much does the node have?
+    Resource available = node.getAvailableResource();
+
+    Container container = null;
+    if (reserved) {
+      container = node.getReservedContainer().getContainer();
+    } else {
+      container = createContainer(node, capability, request.getPriority());
+    }
+
+    // Can we allocate a container on this node?
+    if (Resources.fitsIn(capability, available)) {
+      // Inform the application of the new container for this request
+      RMContainer allocatedContainer =
+          allocate(type, node, request.getPriority(), request, container);
+      if (allocatedContainer == null) {
+        // Did the application need this resource?
+        if (reserved) {
+          unreserve(request.getPriority(), node);
+        }
+        return Resources.none();
+      }
+
+      // If we had previously made a reservation, delete it
+      if (reserved) {
+        unreserve(request.getPriority(), node);
+      }
+
+      // Inform the node
+      node.allocateContainer(allocatedContainer);
+
+      // If this container is used to run AM, update the leaf queue's AM usage
+      if (getLiveContainers().size() == 1 && !getUnmanagedAM()) {
+        getQueue().addAMResourceUsage(container.getResource());
+        setAmRunning(true);
+      }
+
+      return container.getResource();
+    } else {
+      // The desired container won't fit here, so reserve
+      reserve(request.getPriority(), node, container, reserved);
+
+      return FairScheduler.CONTAINER_RESERVED;
+    }
+  }
+
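+  // Walks this attempt's priorities (or just the reserved priority when
+  // reserved is true), applies the delay-scheduling rules above to pick the
+  // best allowed locality, and delegates to
+  // assignContainer(node, request, type, reserved).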
+  private Resource assignContainer(FSSchedulerNode node, boolean reserved) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Node offered to app: " + getName() + " reserved: " + reserved);
+    }
+
+    Collection<Priority> prioritiesToTry = (reserved) ?
+        Arrays.asList(node.getReservedContainer().getReservedPriority()) :
+        getPriorities();
+
+    // For each priority, see if we can schedule a node local, rack local
+    // or off-switch request. Rack or off-switch requests may be delayed
+    // (not scheduled) in order to promote better locality.
+    synchronized (this) {
+      for (Priority priority : prioritiesToTry) {
+        if (getTotalRequiredResources(priority) <= 0 ||
+            !hasContainerForNode(priority, node)) {
+          continue;
+        }
+
+        addSchedulingOpportunity(priority);
+
+        // Check the AM resource usage for the leaf queue
+        if (getLiveContainers().size() == 0 && !getUnmanagedAM()) {
+          if (!getQueue().canRunAppAM(getAMResource())) {
+            return Resources.none();
+          }
+        }
+
+        ResourceRequest rackLocalRequest = getResourceRequest(priority,
+            node.getRackName());
+        ResourceRequest localRequest = getResourceRequest(priority,
+            node.getNodeName());
+
+        if (localRequest != null && !localRequest.getRelaxLocality()) {
+          LOG.warn("Relax locality off is not supported on local request: "
+              + localRequest);
+        }
+
+        NodeType allowedLocality;
+        if (scheduler.isContinuousSchedulingEnabled()) {
+          allowedLocality = getAllowedLocalityLevelByTime(priority,
+              scheduler.getNodeLocalityDelayMs(),
+              scheduler.getRackLocalityDelayMs(),
+              scheduler.getClock().getTime());
+        } else {
+          allowedLocality = getAllowedLocalityLevel(priority,
+              scheduler.getNumClusterNodes(),
+              scheduler.getNodeLocalityThreshold(),
+              scheduler.getRackLocalityThreshold());
+        }
+
+        if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
+            && localRequest != null && localRequest.getNumContainers() != 0) {
+          return assignContainer(node, localRequest,
+              NodeType.NODE_LOCAL, reserved);
+        }
+
+        if (rackLocalRequest != null && !rackLocalRequest.getRelaxLocality()) {
+          continue;
+        }
+
+        if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
+            && (allowedLocality.equals(NodeType.RACK_LOCAL) ||
+                allowedLocality.equals(NodeType.OFF_SWITCH))) {
+          return assignContainer(node, rackLocalRequest,
+              NodeType.RACK_LOCAL, reserved);
+        }
+
+        ResourceRequest offSwitchRequest =
+            getResourceRequest(priority, ResourceRequest.ANY);
+        if (offSwitchRequest != null && !offSwitchRequest.getRelaxLocality()) {
+          continue;
+        }
+
+        if (offSwitchRequest != null && offSwitchRequest.getNumContainers() != 0
+            && allowedLocality.equals(NodeType.OFF_SWITCH)) {
+          return assignContainer(node, offSwitchRequest,
+              NodeType.OFF_SWITCH, reserved);
+        }
+      }
+    }
+    return Resources.none();
+  }
+
+  /**
+   * Called when this application already has an existing reservation on the
+   * given node. Sees whether we can turn the reservation into an allocation.
+   * Also checks whether the application needs the reservation anymore, and
+   * releases it if not.
+   *
+   * @param node
+   *     Node that the application has an existing reservation on
+   */
+  public Resource assignReservedContainer(FSSchedulerNode node) {
+    RMContainer rmContainer = node.getReservedContainer();
+    Priority priority = rmContainer.getReservedPriority();
+
+    // Make sure the application still needs requests at this priority
+    if (getTotalRequiredResources(priority) == 0) {
+      unreserve(priority, node);
+      return Resources.none();
+    }
+
+    // Fail early if the reserved container won't fit.
+    // Note that we have an assumption here that there's only one container size
+    // per priority.
+    if (!Resources.fitsIn(node.getReservedContainer().getReservedResource(),
+        node.getAvailableResource())) {
+      return Resources.none();
+    }
+
+    return assignContainer(node, true);
+  }
+
+  /**
+   * Whether this app has container requests that could be satisfied on the
+   * given node, if the node had full space.
+   */
+  public boolean hasContainerForNode(Priority prio, FSSchedulerNode node) {
+    ResourceRequest anyRequest = getResourceRequest(prio, ResourceRequest.ANY);
+    ResourceRequest rackRequest = getResourceRequest(prio, node.getRackName());
+    ResourceRequest nodeRequest = getResourceRequest(prio, node.getNodeName());
+
+    return
+        // There must be outstanding requests at the given priority:
+        anyRequest != null && anyRequest.getNumContainers() > 0 &&
+        // If locality relaxation is turned off at *-level, there must be a
+        // non-zero request for the node's rack:
+        (anyRequest.getRelaxLocality() ||
+            (rackRequest != null && rackRequest.getNumContainers() > 0)) &&
+        // If locality relaxation is turned off at rack-level, there must be a
+        // non-zero request at the node:
+        (rackRequest == null || rackRequest.getRelaxLocality() ||
+            (nodeRequest != null && nodeRequest.getNumContainers() > 0)) &&
+        // The requested container must be able to fit on the node:
+        Resources.lessThanOrEqual(RESOURCE_CALCULATOR, null,
+            anyRequest.getCapability(), node.getRMNode().getTotalCapability());
+  }
+
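+  // Orders containers by priority and, within a priority, puts more recently
+  // launched containers (higher container IDs) first; preemptContainer() picks
+  // the first live, not-yet-preempted container in this ordering.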
+  static class RMContainerComparator implements Comparator<RMContainer>,
+      Serializable {
+    @Override
+    public int compare(RMContainer c1, RMContainer c2) {
+      int ret = c1.getContainer().getPriority().compareTo(
+          c2.getContainer().getPriority());
+      if (ret == 0) {
+        return c2.getContainerId().compareTo(c1.getContainerId());
+      }
+      return ret;
+    }
+  }
+
+  /* Schedulable methods implementation */
+
+  @Override
+  public String getName() {
+    return getApplicationId().toString();
+  }
+
+  @Override
+  public Resource getDemand() {
+    return demand;
+  }
+
+  @Override
+  public long getStartTime() {
+    return startTime;
+  }
+
+  @Override
+  public Resource getMinShare() {
+    return Resources.none();
+  }
+
+  @Override
+  public Resource getMaxShare() {
+    return Resources.unbounded();
+  }
+
+  @Override
+  public Resource getResourceUsage() {
+    // Here getPreemptedResources() always returns zero, except during
+    // a preemption round
+    return Resources.subtract(getCurrentConsumption(), getPreemptedResources());
+  }
+
+  @Override
+  public ResourceWeights getWeights() {
+    return scheduler.getAppWeight(this);
+  }
+
+  @Override
+  public Priority getPriority() {
+    // Right now per-app priorities are not passed to scheduler,
+    // so everyone has the same priority.
+    return priority;
+  }
+
+  @Override
+  public Resource getFairShare() {
+    return this.fairShare;
+  }
+
+  @Override
+  public void setFairShare(Resource fairShare) {
+    this.fairShare = fairShare;
+  }
+
+  @Override
+  public boolean isActive() {
+    return true;
+  }
+
+  @Override
+  public void updateDemand() {
+    demand = Resources.createResource(0);
+    // Demand is current consumption plus outstanding requests
+    Resources.addTo(demand, getCurrentConsumption());
+
+    // Add up outstanding resource requests
+    synchronized (this) {
+      for (Priority p : getPriorities()) {
+        for (ResourceRequest r : getResourceRequests(p).values()) {
+          Resource total = Resources.multiply(r.getCapability(), r.getNumContainers());
+          Resources.addTo(demand, total);
+        }
+      }
+    }
+  }
+
+  @Override
+  public Resource assignContainer(FSSchedulerNode node) {
+    return assignContainer(node, false);
+  }
+
+  /**
+   * Preempt a running container, chosen according to the priority ordering.
+   */
+  @Override
+  public RMContainer preemptContainer() {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("App " + getName() + " is going to preempt a running " +
+          "container");
+    }
+
+    RMContainer toBePreempted = null;
+    for (RMContainer container : getLiveContainers()) {
+      if (!getPreemptionContainers().contains(container) &&
+          (toBePreempted == null ||
+              comparator.compare(toBePreempted, container) > 0)) {
+        toBePreempted = container;
+      }
+    }
+    return toBePreempted;
+  }
+}