|
@@ -36,10 +36,9 @@ import org.apache.hadoop.security.AccessControlException;
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
import org.apache.hadoop.yarn.Lock;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
|
-import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
import org.apache.hadoop.yarn.api.records.Container;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
|
-import org.apache.hadoop.yarn.api.records.ContainerState;
|
|
|
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
|
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
|
|
import org.apache.hadoop.yarn.api.records.QueueInfo;
|
|
|
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
|
|
@@ -59,11 +58,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
|
|
@@ -127,6 +126,8 @@ implements ResourceScheduler, CapacitySchedulerContext {
|
|
|
|
|
|
private boolean initialized = false;
|
|
|
|
|
|
+ public CapacityScheduler() {}
|
|
|
+
|
|
|
public CSQueue getRootQueue() {
|
|
|
return root;
|
|
|
}
|
|
@@ -392,12 +393,20 @@ implements ResourceScheduler, CapacitySchedulerContext {
|
|
|
|
|
|
// Release all the running containers
|
|
|
for (RMContainer rmContainer : application.getLiveContainers()) {
|
|
|
- completedContainer(rmContainer, RMContainerEventType.KILL);
|
|
|
+ completedContainer(rmContainer,
|
|
|
+ SchedulerUtils.createAbnormalContainerStatus(
|
|
|
+ rmContainer.getContainerId(),
|
|
|
+ SchedulerUtils.COMPLETED_APPLICATION),
|
|
|
+ RMContainerEventType.KILL);
|
|
|
}
|
|
|
|
|
|
// Release all reserved containers
|
|
|
for (RMContainer rmContainer : application.getAllReservedContainers()) {
|
|
|
- completedContainer(rmContainer, RMContainerEventType.KILL);
|
|
|
+ completedContainer(rmContainer,
|
|
|
+ SchedulerUtils.createAbnormalContainerStatus(
|
|
|
+ rmContainer.getContainerId(),
|
|
|
+ "Application Complete"),
|
|
|
+ RMContainerEventType.KILL);
|
|
|
}
|
|
|
|
|
|
// Clean up pending requests, metrics etc.
|
|
@@ -445,7 +454,11 @@ implements ResourceScheduler, CapacitySchedulerContext {
|
|
|
"Trying to release container not owned by app or with invalid id",
|
|
|
application.getApplicationId(), releasedContainerId);
|
|
|
}
|
|
|
- completedContainer(rmContainer, RMContainerEventType.RELEASED);
|
|
|
+ completedContainer(rmContainer,
|
|
|
+ SchedulerUtils.createAbnormalContainerStatus(
|
|
|
+ releasedContainerId,
|
|
|
+ SchedulerUtils.RELEASED_CONTAINER),
|
|
|
+ RMContainerEventType.RELEASED);
|
|
|
}
|
|
|
|
|
|
synchronized (application) {
|
|
@@ -521,22 +534,23 @@ implements ResourceScheduler, CapacitySchedulerContext {
|
|
|
}
|
|
|
|
|
|
private synchronized void nodeUpdate(RMNode nm,
|
|
|
- Map<ApplicationId, List<Container>> containers ) {
|
|
|
+ List<ContainerStatus> newlyLaunchedContainers,
|
|
|
+ List<ContainerStatus> completedContainers) {
|
|
|
LOG.info("nodeUpdate: " + nm + " clusterResources: " + clusterResource);
|
|
|
|
|
|
SchedulerNode node = getNode(nm.getNodeID());
|
|
|
|
|
|
- // Processing the current containers running/finished on node
|
|
|
- for (List<Container> appContainers : containers.values()) {
|
|
|
- for (Container container : appContainers) {
|
|
|
- if (container.getState() == ContainerState.RUNNING) {
|
|
|
- containerLaunchedOnNode(container, node);
|
|
|
- } else { // has to be 'COMPLETE'
|
|
|
- LOG.info("DEBUG --- Container FINISHED: " + container.getId());
|
|
|
- completedContainer(getRMContainer(container.getId()),
|
|
|
- RMContainerEventType.FINISHED);
|
|
|
- }
|
|
|
- }
|
|
|
+ // Processing the newly launched containers
|
|
|
+ for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
|
|
|
+ containerLaunchedOnNode(launchedContainer.getContainerId(), node);
|
|
|
+ }
|
|
|
+
|
|
|
+ // Process completed containers
|
|
|
+ for (ContainerStatus completedContainer : completedContainers) {
|
|
|
+ ContainerId containerId = completedContainer.getContainerId();
|
|
|
+ LOG.info("DEBUG --- Container FINISHED: " + containerId);
|
|
|
+ completedContainer(getRMContainer(containerId),
|
|
|
+ completedContainer, RMContainerEventType.FINISHED);
|
|
|
}
|
|
|
|
|
|
// Now node data structures are up to date and ready for scheduling.
|
|
@@ -571,18 +585,18 @@ implements ResourceScheduler, CapacitySchedulerContext {
|
|
|
|
|
|
}
|
|
|
|
|
|
- private void containerLaunchedOnNode(Container container, SchedulerNode node) {
|
|
|
+ private void containerLaunchedOnNode(ContainerId containerId, SchedulerNode node) {
|
|
|
// Get the application for the launched container
|
|
|
- ApplicationAttemptId applicationAttemptId = container.getId().getAppAttemptId();
|
|
|
+ ApplicationAttemptId applicationAttemptId = containerId.getAppAttemptId();
|
|
|
SchedulerApp application = getApplication(applicationAttemptId);
|
|
|
if (application == null) {
|
|
|
LOG.info("Unknown application: " + applicationAttemptId +
|
|
|
- " launched container " + container.getId() +
|
|
|
+ " launched container " + containerId +
|
|
|
" on node: " + node);
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- application.containerLaunchedOnNode(container.getId());
|
|
|
+ application.containerLaunchedOnNode(containerId);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -604,7 +618,8 @@ implements ResourceScheduler, CapacitySchedulerContext {
|
|
|
{
|
|
|
NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
|
|
|
nodeUpdate(nodeUpdatedEvent.getRMNode(),
|
|
|
- nodeUpdatedEvent.getContainers());
|
|
|
+ nodeUpdatedEvent.getNewlyLaunchedContainers(),
|
|
|
+ nodeUpdatedEvent.getCompletedContainers());
|
|
|
}
|
|
|
break;
|
|
|
case APP_ADDED:
|
|
@@ -625,7 +640,11 @@ implements ResourceScheduler, CapacitySchedulerContext {
|
|
|
{
|
|
|
ContainerExpiredSchedulerEvent containerExpiredEvent =
|
|
|
(ContainerExpiredSchedulerEvent) event;
|
|
|
- completedContainer(getRMContainer(containerExpiredEvent.getContainerId()),
|
|
|
+ ContainerId containerId = containerExpiredEvent.getContainerId();
|
|
|
+ completedContainer(getRMContainer(containerId),
|
|
|
+ SchedulerUtils.createAbnormalContainerStatus(
|
|
|
+ containerId,
|
|
|
+ SchedulerUtils.EXPIRED_CONTAINER),
|
|
|
RMContainerEventType.EXPIRE);
|
|
|
}
|
|
|
break;
|
|
@@ -652,13 +671,21 @@ implements ResourceScheduler, CapacitySchedulerContext {
|
|
|
// Remove running containers
|
|
|
List<RMContainer> runningContainers = node.getRunningContainers();
|
|
|
for (RMContainer container : runningContainers) {
|
|
|
- completedContainer(container, RMContainerEventType.KILL);
|
|
|
+ completedContainer(container,
|
|
|
+ SchedulerUtils.createAbnormalContainerStatus(
|
|
|
+ container.getContainerId(),
|
|
|
+ SchedulerUtils.LOST_CONTAINER),
|
|
|
+ RMContainerEventType.KILL);
|
|
|
}
|
|
|
|
|
|
// Remove reservations, if any
|
|
|
RMContainer reservedContainer = node.getReservedContainer();
|
|
|
if (reservedContainer != null) {
|
|
|
- completedContainer(reservedContainer, RMContainerEventType.KILL);
|
|
|
+ completedContainer(reservedContainer,
|
|
|
+ SchedulerUtils.createAbnormalContainerStatus(
|
|
|
+ reservedContainer.getContainerId(),
|
|
|
+ SchedulerUtils.LOST_CONTAINER),
|
|
|
+ RMContainerEventType.KILL);
|
|
|
}
|
|
|
|
|
|
this.nodes.remove(nodeInfo.getNodeID());
|
|
@@ -667,8 +694,8 @@ implements ResourceScheduler, CapacitySchedulerContext {
|
|
|
}
|
|
|
|
|
|
@Lock(CapacityScheduler.class)
|
|
|
- private synchronized void completedContainer(RMContainer rmContainer,
|
|
|
- RMContainerEventType event) {
|
|
|
+ private synchronized void completedContainer(RMContainer rmContainer,
|
|
|
+ ContainerStatus containerStatus, RMContainerEventType event) {
|
|
|
if (rmContainer == null) {
|
|
|
LOG.info("Null container completed...");
|
|
|
return;
|
|
@@ -692,7 +719,7 @@ implements ResourceScheduler, CapacitySchedulerContext {
|
|
|
// Inform the queue
|
|
|
LeafQueue queue = (LeafQueue)application.getQueue();
|
|
|
queue.completedContainer(clusterResource, application, node,
|
|
|
- rmContainer, event);
|
|
|
+ rmContainer, containerStatus, event);
|
|
|
|
|
|
LOG.info("Application " + applicationAttemptId +
|
|
|
" released container " + container.getId() +
|