|
@@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
|
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
|
@@ -174,6 +175,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
|
|
rmContainer.getNodeLabelExpression(),
|
|
rmContainer.getNodeLabelExpression(),
|
|
getUser(), 1, containerResource);
|
|
getUser(), 1, containerResource);
|
|
this.attemptResourceUsage.decUsed(containerResource);
|
|
this.attemptResourceUsage.decUsed(containerResource);
|
|
|
|
+ getQueue().decUsedResource(containerResource);
|
|
|
|
|
|
// Clear resource utilization metrics cache.
|
|
// Clear resource utilization metrics cache.
|
|
lastMemoryAggregateAllocationUpdateTime = -1;
|
|
lastMemoryAggregateAllocationUpdateTime = -1;
|
|
@@ -468,6 +470,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
|
|
List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
|
|
List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
|
|
type, node, schedulerKey, container);
|
|
type, node, schedulerKey, container);
|
|
this.attemptResourceUsage.incUsed(container.getResource());
|
|
this.attemptResourceUsage.incUsed(container.getResource());
|
|
|
|
+ getQueue().incUsedResource(container.getResource());
|
|
|
|
|
|
// Update resource requests related to "request" and store in RMContainer
|
|
// Update resource requests related to "request" and store in RMContainer
|
|
((RMContainerImpl) rmContainer).setResourceRequests(resourceRequestList);
|
|
((RMContainerImpl) rmContainer).setResourceRequests(resourceRequestList);
|
|
@@ -651,6 +654,22 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
|
|
schedulerKey.getAllocationRequestId());
|
|
schedulerKey.getAllocationRequestId());
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ @Override
|
|
|
|
+ public synchronized void recoverContainer(SchedulerNode node,
|
|
|
|
+ RMContainer rmContainer) {
|
|
|
|
+ try {
|
|
|
|
+ writeLock.lock();
|
|
|
|
+
|
|
|
|
+ super.recoverContainer(node, rmContainer);
|
|
|
|
+
|
|
|
|
+ if (!rmContainer.getState().equals(RMContainerState.COMPLETED)) {
|
|
|
|
+ getQueue().incUsedResource(rmContainer.getContainer().getResource());
|
|
|
|
+ }
|
|
|
|
+ } finally {
|
|
|
|
+ writeLock.unlock();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Reserve a spot for {@code container} on this {@code node}. If
|
|
* Reserve a spot for {@code container} on this {@code node}. If
|
|
* the container is {@code alreadyReserved} on the node, simply
|
|
* the container is {@code alreadyReserved} on the node, simply
|