|
@@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerState;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
|
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
|
|
+import org.apache.hadoop.yarn.api.records.NodeState;
|
|
|
import org.apache.hadoop.yarn.api.records.Priority;
|
|
|
import org.apache.hadoop.yarn.api.records.Resource;
|
|
|
import org.apache.hadoop.yarn.api.records.ResourceOption;
|
|
@@ -73,7 +74,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReco
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
|
|
|
+import org.apache.hadoop.yarn.server.utils.Lock;
|
|
|
+import org.apache.hadoop.yarn.util.Clock;
|
|
|
+import org.apache.hadoop.yarn.util.SystemClock;
|
|
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
|
import com.google.common.util.concurrent.SettableFuture;
|
|
@@ -94,10 +100,14 @@ public abstract class AbstractYarnScheduler
|
|
|
protected Resource minimumAllocation;
|
|
|
|
|
|
protected volatile RMContext rmContext;
|
|
|
-
|
|
|
+
|
|
|
private volatile Priority maxClusterLevelAppPriority;
|
|
|
|
|
|
protected ActivitiesManager activitiesManager;
|
|
|
+ protected SchedulerHealth schedulerHealth = new SchedulerHealth();
|
|
|
+ protected volatile long lastNodeUpdateTime;
|
|
|
+
|
|
|
+ private volatile Clock clock;
|
|
|
|
|
|
/*
|
|
|
* All schedulers which are inheriting AbstractYarnScheduler should use
|
|
@@ -130,6 +140,7 @@ public abstract class AbstractYarnScheduler
|
|
|
*/
|
|
|
public AbstractYarnScheduler(String name) {
|
|
|
super(name);
|
|
|
+ clock = SystemClock.getInstance();
|
|
|
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
|
|
|
readLock = lock.readLock();
|
|
|
writeLock = lock.writeLock();
|
|
@@ -228,13 +239,25 @@ public abstract class AbstractYarnScheduler
|
|
|
nodeTracker.setConfiguredMaxAllocation(maximumAllocation);
|
|
|
}
|
|
|
|
|
|
+ public SchedulerHealth getSchedulerHealth() {
|
|
|
+ return this.schedulerHealth;
|
|
|
+ }
|
|
|
+
|
|
|
+ protected void setLastNodeUpdateTime(long time) {
|
|
|
+ this.lastNodeUpdateTime = time;
|
|
|
+ }
|
|
|
+
|
|
|
+ public long getLastNodeUpdateTime() {
|
|
|
+ return lastNodeUpdateTime;
|
|
|
+ }
|
|
|
+
|
|
|
protected void containerLaunchedOnNode(
|
|
|
ContainerId containerId, SchedulerNode node) {
|
|
|
try {
|
|
|
readLock.lock();
|
|
|
// Get the application for the finished container
|
|
|
- SchedulerApplicationAttempt application = getCurrentAttemptForContainer(
|
|
|
- containerId);
|
|
|
+ SchedulerApplicationAttempt application =
|
|
|
+ getCurrentAttemptForContainer(containerId);
|
|
|
if (application == null) {
|
|
|
LOG.info("Unknown application " + containerId.getApplicationAttemptId()
|
|
|
.getApplicationId() + " launched container " + containerId
|
|
@@ -249,7 +272,7 @@ public abstract class AbstractYarnScheduler
|
|
|
readLock.unlock();
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
protected void containerIncreasedOnNode(ContainerId containerId,
|
|
|
SchedulerNode node, Container increasedContainerReportedByNM) {
|
|
|
/*
|
|
@@ -276,6 +299,7 @@ public abstract class AbstractYarnScheduler
|
|
|
}
|
|
|
rmContainer.handle(new RMContainerNMDoneChangeResourceEvent(containerId,
|
|
|
increasedContainerReportedByNM.getResource()));
|
|
|
+
|
|
|
}
|
|
|
|
|
|
public T getApplicationAttempt(ApplicationAttemptId applicationAttemptId) {
|
|
@@ -360,7 +384,7 @@ public abstract class AbstractYarnScheduler
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public void recoverContainersOnNode(
|
|
|
+ public synchronized void recoverContainersOnNode(
|
|
|
List<NMContainerStatus> containerReports, RMNode nm) {
|
|
|
try {
|
|
|
writeLock.lock();
|
|
@@ -475,7 +499,7 @@ public abstract class AbstractYarnScheduler
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Recover resource request back from RMContainer when a container is
|
|
|
+ * Recover resource request back from RMContainer when a container is
|
|
|
* preempted before AM pulled the same. If container is pulled by
|
|
|
* AM, then RMContainer will not have resource request to recover.
|
|
|
* @param rmContainer rmContainer
|
|
@@ -621,7 +645,7 @@ public abstract class AbstractYarnScheduler
|
|
|
SchedulerApplicationAttempt attempt);
|
|
|
|
|
|
@Override
|
|
|
- public SchedulerNode getSchedulerNode(NodeId nodeId) {
|
|
|
+ public N getSchedulerNode(NodeId nodeId) {
|
|
|
return nodeTracker.getNode(nodeId);
|
|
|
}
|
|
|
|
|
@@ -832,4 +856,152 @@ public abstract class AbstractYarnScheduler
|
|
|
return this.activitiesManager;
|
|
|
}
|
|
|
|
|
|
+ public Clock getClock() {
|
|
|
+ return clock;
|
|
|
+ }
|
|
|
+
|
|
|
+ @VisibleForTesting
|
|
|
+ public void setClock(Clock clock) {
|
|
|
+ this.clock = clock;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Lock(Lock.NoLock.class)
|
|
|
+ public SchedulerNode getNode(NodeId nodeId) {
|
|
|
+ return nodeTracker.getNode(nodeId);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get lists of new containers from NodeManager and process them.
|
|
|
+ * @param nm The RMNode corresponding to the NodeManager
|
|
|
+ * @return list of completed containers
|
|
|
+ */
|
|
|
+ protected List<ContainerStatus> updateNewContainerInfo(RMNode nm) {
|
|
|
+ SchedulerNode node = getNode(nm.getNodeID());
|
|
|
+
|
|
|
+ List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
|
|
|
+ List<ContainerStatus> newlyLaunchedContainers =
|
|
|
+ new ArrayList<>();
|
|
|
+ List<ContainerStatus> completedContainers =
|
|
|
+ new ArrayList<>();
|
|
|
+
|
|
|
+ for(UpdatedContainerInfo containerInfo : containerInfoList) {
|
|
|
+ newlyLaunchedContainers
|
|
|
+ .addAll(containerInfo.getNewlyLaunchedContainers());
|
|
|
+ completedContainers.addAll(containerInfo.getCompletedContainers());
|
|
|
+ }
|
|
|
+
|
|
|
+ // Processing the newly launched containers
|
|
|
+ for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
|
|
|
+ containerLaunchedOnNode(launchedContainer.getContainerId(), node);
|
|
|
+ }
|
|
|
+
|
|
|
+ // Processing the newly increased containers
|
|
|
+ List<Container> newlyIncreasedContainers =
|
|
|
+ nm.pullNewlyIncreasedContainers();
|
|
|
+ for (Container container : newlyIncreasedContainers) {
|
|
|
+ containerIncreasedOnNode(container.getId(), node, container);
|
|
|
+ }
|
|
|
+
|
|
|
+ return completedContainers;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Process completed container list.
|
|
|
+ * @param completedContainers Extracted list of completed containers
|
|
|
+ * @param releasedResources Resource accumulator updated with the resources released by completed containers
|
|
|
+ * @return The total number of released containers
|
|
|
+ */
|
|
|
+ protected int updateCompletedContainers(List<ContainerStatus>
|
|
|
+ completedContainers, Resource releasedResources) {
|
|
|
+ int releasedContainers = 0;
|
|
|
+ for (ContainerStatus completedContainer : completedContainers) {
|
|
|
+ ContainerId containerId = completedContainer.getContainerId();
|
|
|
+ LOG.debug("Container FINISHED: " + containerId);
|
|
|
+ RMContainer container = getRMContainer(containerId);
|
|
|
+ completedContainer(getRMContainer(containerId),
|
|
|
+ completedContainer, RMContainerEventType.FINISHED);
|
|
|
+ if (container != null) {
|
|
|
+ releasedContainers++;
|
|
|
+ Resource ars = container.getAllocatedResource();
|
|
|
+ if (ars != null) {
|
|
|
+ Resources.addTo(releasedResources, ars);
|
|
|
+ }
|
|
|
+ Resource rrs = container.getReservedResource();
|
|
|
+ if (rrs != null) {
|
|
|
+ Resources.addTo(releasedResources, rrs);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return releasedContainers;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Update schedulerHealth information.
|
|
|
+ * @param releasedResources Reference resource object for completed containers
|
|
|
+ * @param releasedContainers Count of released containers
|
|
|
+ */
|
|
|
+ protected void updateSchedulerHealthInformation(Resource releasedResources,
|
|
|
+ int releasedContainers) {
|
|
|
+
|
|
|
+ schedulerHealth.updateSchedulerReleaseDetails(getLastNodeUpdateTime(),
|
|
|
+ releasedResources);
|
|
|
+ schedulerHealth.updateSchedulerReleaseCounts(releasedContainers);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Update the aggregated container utilization and node utilization information on the SchedulerNode.
|
|
|
+ * @param nm The RMNode corresponding to the NodeManager to update
|
|
|
+ */
|
|
|
+ protected void updateNodeResourceUtilization(RMNode nm) {
|
|
|
+ SchedulerNode node = getNode(nm.getNodeID());
|
|
|
+ // Updating node resource utilization
|
|
|
+ node.setAggregatedContainersUtilization(
|
|
|
+ nm.getAggregatedContainersUtilization());
|
|
|
+ node.setNodeUtilization(nm.getNodeUtilization());
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Process a heartbeat update from a node.
|
|
|
+ * @param nm The RMNode corresponding to the NodeManager
|
|
|
+ */
|
|
|
+ protected synchronized void nodeUpdate(RMNode nm) {
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("nodeUpdate: " + nm +
|
|
|
+ " cluster capacity: " + getClusterResource());
|
|
|
+ }
|
|
|
+
|
|
|
+ // Process new container information
|
|
|
+ List<ContainerStatus> completedContainers = updateNewContainerInfo(nm);
|
|
|
+
|
|
|
+ // Process completed containers
|
|
|
+ Resource releasedResources = Resource.newInstance(0, 0);
|
|
|
+ int releasedContainers = updateCompletedContainers(completedContainers,
|
|
|
+ releasedResources);
|
|
|
+
|
|
|
+ // If the node is decommissioning, send an update to have the total
|
|
|
+ // resource equal to the used resource, so no available resource to
|
|
|
+ // schedule.
|
|
|
+ // TODO YARN-5128: Fix possible race-condition when request comes in before
|
|
|
+ // update is propagated
|
|
|
+ if (nm.getState() == NodeState.DECOMMISSIONING) {
|
|
|
+ this.rmContext
|
|
|
+ .getDispatcher()
|
|
|
+ .getEventHandler()
|
|
|
+ .handle(
|
|
|
+ new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption
|
|
|
+ .newInstance(getSchedulerNode(nm.getNodeID())
|
|
|
+ .getAllocatedResource(), 0)));
|
|
|
+ }
|
|
|
+
|
|
|
+ updateSchedulerHealthInformation(releasedResources, releasedContainers);
|
|
|
+ updateNodeResourceUtilization(nm);
|
|
|
+
|
|
|
+ // Now node data structures are up-to-date and ready for scheduling.
|
|
|
+ if(LOG.isDebugEnabled()) {
|
|
|
+ SchedulerNode node = getNode(nm.getNodeID());
|
|
|
+ LOG.debug("Node being looked for scheduling " + nm +
|
|
|
+ " availableResource: " + node.getUnallocatedResource());
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|