|
@@ -18,10 +18,7 @@
|
|
|
|
|
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
|
|
|
|
|
-import java.util.ArrayList;
|
|
|
-import java.util.Collection;
|
|
|
import java.util.HashMap;
|
|
|
-import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Set;
|
|
|
|
|
@@ -30,7 +27,6 @@ import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
|
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
|
-import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
import org.apache.hadoop.yarn.api.records.Container;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
|
@@ -38,92 +34,39 @@ import org.apache.hadoop.yarn.api.records.NodeId;
|
|
|
import org.apache.hadoop.yarn.api.records.Priority;
|
|
|
import org.apache.hadoop.yarn.api.records.Resource;
|
|
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
|
|
-import org.apache.hadoop.yarn.factories.RecordFactory;
|
|
|
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent;
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
|
|
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
|
|
|
|
|
-import com.google.common.annotations.VisibleForTesting;
|
|
|
-import com.google.common.collect.HashMultiset;
|
|
|
-import com.google.common.collect.Multiset;
|
|
|
-
|
|
|
+/**
|
|
|
+ * Represents an application attempt from the viewpoint of the Fair Scheduler.
|
|
|
+ */
|
|
|
@Private
|
|
|
@Unstable
|
|
|
public class FSSchedulerApp extends SchedulerApplication {
|
|
|
|
|
|
private static final Log LOG = LogFactory.getLog(FSSchedulerApp.class);
|
|
|
|
|
|
- private final RecordFactory recordFactory = RecordFactoryProvider
|
|
|
- .getRecordFactory(null);
|
|
|
-
|
|
|
- private final AppSchedulingInfo appSchedulingInfo;
|
|
|
private AppSchedulable appSchedulable;
|
|
|
- private final Queue queue;
|
|
|
-
|
|
|
- private final Resource currentConsumption = recordFactory
|
|
|
- .newRecordInstance(Resource.class);
|
|
|
- private Resource resourceLimit = recordFactory
|
|
|
- .newRecordInstance(Resource.class);
|
|
|
-
|
|
|
- private Map<ContainerId, RMContainer> liveContainers
|
|
|
- = new HashMap<ContainerId, RMContainer>();
|
|
|
- private List<RMContainer> newlyAllocatedContainers =
|
|
|
- new ArrayList<RMContainer>();
|
|
|
-
|
|
|
- final Map<Priority, Map<NodeId, RMContainer>> reservedContainers =
|
|
|
- new HashMap<Priority, Map<NodeId, RMContainer>>();
|
|
|
|
|
|
final Map<RMContainer, Long> preemptionMap = new HashMap<RMContainer, Long>();
|
|
|
-
|
|
|
- /**
|
|
|
- * Count how many times the application has been given an opportunity
|
|
|
- * to schedule a task at each priority. Each time the scheduler
|
|
|
- * asks the application for a task at this priority, it is incremented,
|
|
|
- * and each time the application successfully schedules a task, it
|
|
|
- * is reset to 0.
|
|
|
- */
|
|
|
- Multiset<Priority> schedulingOpportunities = HashMultiset.create();
|
|
|
|
|
|
- Multiset<Priority> reReservations = HashMultiset.create();
|
|
|
-
|
|
|
- Resource currentReservation = recordFactory
|
|
|
- .newRecordInstance(Resource.class);
|
|
|
-
|
|
|
- private final RMContext rmContext;
|
|
|
public FSSchedulerApp(ApplicationAttemptId applicationAttemptId,
|
|
|
String user, Queue queue, ActiveUsersManager activeUsersManager,
|
|
|
RMContext rmContext) {
|
|
|
- this.rmContext = rmContext;
|
|
|
- this.appSchedulingInfo =
|
|
|
- new AppSchedulingInfo(applicationAttemptId, user, queue,
|
|
|
- activeUsersManager);
|
|
|
- this.queue = queue;
|
|
|
+ super(applicationAttemptId, user, queue, activeUsersManager, rmContext);
|
|
|
}
|
|
|
|
|
|
- public ApplicationId getApplicationId() {
|
|
|
- return appSchedulingInfo.getApplicationId();
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public ApplicationAttemptId getApplicationAttemptId() {
|
|
|
- return appSchedulingInfo.getApplicationAttemptId();
|
|
|
- }
|
|
|
-
|
|
|
public void setAppSchedulable(AppSchedulable appSchedulable) {
|
|
|
this.appSchedulable = appSchedulable;
|
|
|
}
|
|
@@ -132,83 +75,6 @@ public class FSSchedulerApp extends SchedulerApplication {
|
|
|
return appSchedulable;
|
|
|
}
|
|
|
|
|
|
- public String getUser() {
|
|
|
- return appSchedulingInfo.getUser();
|
|
|
- }
|
|
|
-
|
|
|
- public synchronized void updateResourceRequests(
|
|
|
- List<ResourceRequest> requests) {
|
|
|
- this.appSchedulingInfo.updateResourceRequests(requests);
|
|
|
- }
|
|
|
-
|
|
|
- public Map<String, ResourceRequest> getResourceRequests(Priority priority) {
|
|
|
- return appSchedulingInfo.getResourceRequests(priority);
|
|
|
- }
|
|
|
-
|
|
|
- public int getNewContainerId() {
|
|
|
- return appSchedulingInfo.getNewContainerId();
|
|
|
- }
|
|
|
-
|
|
|
- public Collection<Priority> getPriorities() {
|
|
|
- return appSchedulingInfo.getPriorities();
|
|
|
- }
|
|
|
-
|
|
|
- public ResourceRequest getResourceRequest(Priority priority, String nodeAddress) {
|
|
|
- return appSchedulingInfo.getResourceRequest(priority, nodeAddress);
|
|
|
- }
|
|
|
-
|
|
|
- public synchronized int getTotalRequiredResources(Priority priority) {
|
|
|
- return getResourceRequest(priority, ResourceRequest.ANY).getNumContainers();
|
|
|
- }
|
|
|
-
|
|
|
- public Resource getResource(Priority priority) {
|
|
|
- return appSchedulingInfo.getResource(priority);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Is this application pending?
|
|
|
- * @return true if it is else false.
|
|
|
- */
|
|
|
- @Override
|
|
|
- public boolean isPending() {
|
|
|
- return appSchedulingInfo.isPending();
|
|
|
- }
|
|
|
-
|
|
|
- public String getQueueName() {
|
|
|
- return appSchedulingInfo.getQueueName();
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Get the list of live containers
|
|
|
- * @return All of the live containers
|
|
|
- */
|
|
|
- @Override
|
|
|
- public synchronized Collection<RMContainer> getLiveContainers() {
|
|
|
- return new ArrayList<RMContainer>(liveContainers.values());
|
|
|
- }
|
|
|
-
|
|
|
- public synchronized void stop(RMAppAttemptState rmAppAttemptFinalState) {
|
|
|
- // Cleanup all scheduling information
|
|
|
- appSchedulingInfo.stop(rmAppAttemptFinalState);
|
|
|
- }
|
|
|
-
|
|
|
- @SuppressWarnings("unchecked")
|
|
|
- public synchronized void containerLaunchedOnNode(ContainerId containerId,
|
|
|
- NodeId nodeId) {
|
|
|
- // Inform the container
|
|
|
- RMContainer rmContainer =
|
|
|
- getRMContainer(containerId);
|
|
|
- if (rmContainer == null) {
|
|
|
- // Some unknown container sneaked into the system. Kill it.
|
|
|
- rmContext.getDispatcher().getEventHandler()
|
|
|
- .handle(new RMNodeCleanContainerEvent(nodeId, containerId));
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- rmContainer.handle(new RMContainerEvent(containerId,
|
|
|
- RMContainerEventType.LAUNCHED));
|
|
|
- }
|
|
|
-
|
|
|
synchronized public void containerCompleted(RMContainer rmContainer,
|
|
|
ContainerStatus containerStatus, RMContainerEventType event) {
|
|
|
|
|
@@ -241,122 +107,6 @@ public class FSSchedulerApp extends SchedulerApplication {
|
|
|
preemptionMap.remove(rmContainer);
|
|
|
}
|
|
|
|
|
|
- synchronized public List<Container> pullNewlyAllocatedContainers() {
|
|
|
- List<Container> returnContainerList = new ArrayList<Container>(
|
|
|
- newlyAllocatedContainers.size());
|
|
|
- for (RMContainer rmContainer : newlyAllocatedContainers) {
|
|
|
- rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(),
|
|
|
- RMContainerEventType.ACQUIRED));
|
|
|
- returnContainerList.add(rmContainer.getContainer());
|
|
|
- }
|
|
|
- newlyAllocatedContainers.clear();
|
|
|
- return returnContainerList;
|
|
|
- }
|
|
|
-
|
|
|
- public Resource getCurrentConsumption() {
|
|
|
- return this.currentConsumption;
|
|
|
- }
|
|
|
-
|
|
|
- synchronized public void showRequests() {
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- for (Priority priority : getPriorities()) {
|
|
|
- Map<String, ResourceRequest> requests = getResourceRequests(priority);
|
|
|
- if (requests != null) {
|
|
|
- LOG.debug("showRequests:" + " application=" + getApplicationId() +
|
|
|
- " headRoom=" + getHeadroom() +
|
|
|
- " currentConsumption=" + currentConsumption.getMemory());
|
|
|
- for (ResourceRequest request : requests.values()) {
|
|
|
- LOG.debug("showRequests:" + " application=" + getApplicationId()
|
|
|
- + " request=" + request);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- public synchronized RMContainer getRMContainer(ContainerId id) {
|
|
|
- return liveContainers.get(id);
|
|
|
- }
|
|
|
-
|
|
|
- synchronized public void addSchedulingOpportunity(Priority priority) {
|
|
|
- schedulingOpportunities.setCount(priority,
|
|
|
- schedulingOpportunities.count(priority) + 1);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Return the number of times the application has been given an opportunity
|
|
|
- * to schedule a task at the given priority since the last time it
|
|
|
- * successfully did so.
|
|
|
- */
|
|
|
- synchronized public int getSchedulingOpportunities(Priority priority) {
|
|
|
- return schedulingOpportunities.count(priority);
|
|
|
- }
|
|
|
-
|
|
|
- synchronized void resetReReservations(Priority priority) {
|
|
|
- reReservations.setCount(priority, 0);
|
|
|
- }
|
|
|
-
|
|
|
- synchronized void addReReservation(Priority priority) {
|
|
|
- reReservations.add(priority);
|
|
|
- }
|
|
|
-
|
|
|
- synchronized public int getReReservations(Priority priority) {
|
|
|
- return reReservations.count(priority);
|
|
|
- }
|
|
|
-
|
|
|
- public synchronized int getNumReservedContainers(Priority priority) {
|
|
|
- Map<NodeId, RMContainer> reservedContainers =
|
|
|
- this.reservedContainers.get(priority);
|
|
|
- return (reservedContainers == null) ? 0 : reservedContainers.size();
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Get total current reservations.
|
|
|
- * Used only by unit tests
|
|
|
- * @return total current reservations
|
|
|
- */
|
|
|
- @VisibleForTesting
|
|
|
- public synchronized Resource getCurrentReservation() {
|
|
|
- return currentReservation;
|
|
|
- }
|
|
|
-
|
|
|
- public synchronized RMContainer reserve(FSSchedulerNode node, Priority priority,
|
|
|
- RMContainer rmContainer, Container container) {
|
|
|
- // Create RMContainer if necessary
|
|
|
- if (rmContainer == null) {
|
|
|
- rmContainer =
|
|
|
- new RMContainerImpl(container, getApplicationAttemptId(),
|
|
|
- node.getNodeID(), rmContext.getDispatcher().getEventHandler(),
|
|
|
- rmContext.getContainerAllocationExpirer());
|
|
|
-
|
|
|
- Resources.addTo(currentReservation, container.getResource());
|
|
|
-
|
|
|
- // Reset the re-reservation count
|
|
|
- resetReReservations(priority);
|
|
|
- } else {
|
|
|
- // Note down the re-reservation
|
|
|
- addReReservation(priority);
|
|
|
- }
|
|
|
- rmContainer.handle(new RMContainerReservedEvent(container.getId(),
|
|
|
- container.getResource(), node.getNodeID(), priority));
|
|
|
-
|
|
|
- Map<NodeId, RMContainer> reservedContainers =
|
|
|
- this.reservedContainers.get(priority);
|
|
|
- if (reservedContainers == null) {
|
|
|
- reservedContainers = new HashMap<NodeId, RMContainer>();
|
|
|
- this.reservedContainers.put(priority, reservedContainers);
|
|
|
- }
|
|
|
- reservedContainers.put(node.getNodeID(), rmContainer);
|
|
|
-
|
|
|
- LOG.info("Application " + getApplicationId()
|
|
|
- + " reserved container " + rmContainer
|
|
|
- + " on node " + node + ", currently has " + reservedContainers.size()
|
|
|
- + " at priority " + priority
|
|
|
- + "; currentReservation " + currentReservation.getMemory());
|
|
|
-
|
|
|
- return rmContainer;
|
|
|
- }
|
|
|
-
|
|
|
public synchronized void unreserve(FSSchedulerNode node, Priority priority) {
|
|
|
Map<NodeId, RMContainer> reservedContainers =
|
|
|
this.reservedContainers.get(priority);
|
|
@@ -376,22 +126,6 @@ public class FSSchedulerApp extends SchedulerApplication {
|
|
|
+ priority + "; currentReservation " + currentReservation);
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Has the application reserved the given <code>node</code> at the
|
|
|
- * given <code>priority</code>?
|
|
|
- * @param node node to be checked
|
|
|
- * @param priority priority of reserved container
|
|
|
- * @return true is reserved, false if not
|
|
|
- */
|
|
|
- public synchronized boolean isReserved(FSSchedulerNode node, Priority priority) {
|
|
|
- Map<NodeId, RMContainer> reservedContainers =
|
|
|
- this.reservedContainers.get(priority);
|
|
|
- if (reservedContainers != null) {
|
|
|
- return reservedContainers.containsKey(node.getNodeID());
|
|
|
- }
|
|
|
- return false;
|
|
|
- }
|
|
|
-
|
|
|
public synchronized float getLocalityWaitFactor(
|
|
|
Priority priority, int clusterNodes) {
|
|
|
// Estimate: Required unique resources (i.e. hosts + racks)
|
|
@@ -402,42 +136,7 @@ public class FSSchedulerApp extends SchedulerApplication {
|
|
|
// i.e. no point skipping more than clustersize opportunities
|
|
|
return Math.min(((float)requiredResources / clusterNodes), 1.0f);
|
|
|
}
|
|
|
-
|
|
|
- /**
|
|
|
- * Get the list of reserved containers
|
|
|
- * @return All of the reserved containers.
|
|
|
- */
|
|
|
- @Override
|
|
|
- public synchronized List<RMContainer> getReservedContainers() {
|
|
|
- List<RMContainer> reservedContainers = new ArrayList<RMContainer>();
|
|
|
- for (Map.Entry<Priority, Map<NodeId, RMContainer>> e :
|
|
|
- this.reservedContainers.entrySet()) {
|
|
|
- reservedContainers.addAll(e.getValue().values());
|
|
|
- }
|
|
|
- return reservedContainers;
|
|
|
- }
|
|
|
|
|
|
- public synchronized void setHeadroom(Resource globalLimit) {
|
|
|
- this.resourceLimit = globalLimit;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Get available headroom in terms of resources for the application's user.
|
|
|
- * @return available resource headroom
|
|
|
- */
|
|
|
- public synchronized Resource getHeadroom() {
|
|
|
- // Corner case to deal with applications being slightly over-limit
|
|
|
- if (resourceLimit.getMemory() < 0) {
|
|
|
- resourceLimit.setMemory(0);
|
|
|
- }
|
|
|
-
|
|
|
- return resourceLimit;
|
|
|
- }
|
|
|
-
|
|
|
- public Queue getQueue() {
|
|
|
- return queue;
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* Delay scheduling: We often want to prioritize scheduling of node-local
|
|
|
* containers over rack-local or off-switch containers. To acheive this
|
|
@@ -453,26 +152,6 @@ public class FSSchedulerApp extends SchedulerApplication {
|
|
|
final Map<Priority, NodeType> allowedLocalityLevel = new HashMap<
|
|
|
Priority, NodeType>();
|
|
|
|
|
|
- // Time of the last container scheduled at the current allowed level
|
|
|
- Map<Priority, Long> lastScheduledContainer = new HashMap<Priority, Long>();
|
|
|
-
|
|
|
- /**
|
|
|
- * Should be called when an application has successfully scheduled a container,
|
|
|
- * or when the scheduling locality threshold is relaxed.
|
|
|
- * Reset various internal counters which affect delay scheduling
|
|
|
- *
|
|
|
- * @param priority The priority of the container scheduled.
|
|
|
- */
|
|
|
- synchronized public void resetSchedulingOpportunities(Priority priority) {
|
|
|
- resetSchedulingOpportunities(priority, System.currentTimeMillis());
|
|
|
- }
|
|
|
- // used for continuous scheduling
|
|
|
- synchronized public void resetSchedulingOpportunities(Priority priority,
|
|
|
- long currentTimeMs) {
|
|
|
- lastScheduledContainer.put(priority, currentTimeMs);
|
|
|
- schedulingOpportunities.setCount(priority, 0);
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* Return the level at which we are allowed to schedule containers, given the
|
|
|
* current size of the cluster and thresholds indicating how many nodes to
|