|
@@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
|
|
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
|
|
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
|
|
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
|
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
|
|
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
|
import org.apache.hadoop.yarn.api.records.ExecutionType;
|
|
import org.apache.hadoop.yarn.api.records.ExecutionType;
|
|
import org.apache.hadoop.yarn.api.records.Resource;
|
|
import org.apache.hadoop.yarn.api.records.Resource;
|
|
@@ -56,6 +57,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap
|
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl.ProcessTreeInfo;
|
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl.ProcessTreeInfo;
|
|
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
|
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
|
|
|
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState;
|
|
|
|
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
|
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.slf4j.LoggerFactory;
|
|
@@ -121,6 +124,11 @@ public class QueuingContainerManagerImpl extends ContainerManagerImpl {
|
|
hasResourcesAvailable(allocatedContInfo.getPti())) {
|
|
hasResourcesAvailable(allocatedContInfo.getPti())) {
|
|
startAllocatedContainer(allocatedContInfo);
|
|
startAllocatedContainer(allocatedContInfo);
|
|
} else {
|
|
} else {
|
|
|
|
+ this.context.getNMStateStore().storeContainer(containerTokenIdentifier
|
|
|
|
+ .getContainerID(), request);
|
|
|
|
+ this.context.getNMStateStore().storeContainerQueued(
|
|
|
|
+ containerTokenIdentifier.getContainerID());
|
|
|
|
+
|
|
if (allocatedContInfo.getExecutionType() == ExecutionType.GUARANTEED) {
|
|
if (allocatedContInfo.getExecutionType() == ExecutionType.GUARANTEED) {
|
|
queuedGuaranteedContainers.add(allocatedContInfo);
|
|
queuedGuaranteedContainers.add(allocatedContInfo);
|
|
// Kill running opportunistic containers to make space for
|
|
// Kill running opportunistic containers to make space for
|
|
@@ -150,6 +158,7 @@ public class QueuingContainerManagerImpl extends ContainerManagerImpl {
|
|
this.context.getQueuingContext().getKilledQueuedContainers().put(
|
|
this.context.getQueuingContext().getKilledQueuedContainers().put(
|
|
containerTokenId,
|
|
containerTokenId,
|
|
"Queued container request removed by ApplicationMaster.");
|
|
"Queued container request removed by ApplicationMaster.");
|
|
|
|
+ this.context.getNMStateStore().storeContainerKilled(containerID);
|
|
} else {
|
|
} else {
|
|
// The container started execution in the meanwhile.
|
|
// The container started execution in the meanwhile.
|
|
try {
|
|
try {
|
|
@@ -447,6 +456,38 @@ public class QueuingContainerManagerImpl extends ContainerManagerImpl {
|
|
return super.getContainerStatusInternal(containerID, nmTokenIdentifier);
|
|
return super.getContainerStatusInternal(containerID, nmTokenIdentifier);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Recover running or queued container.
|
|
|
|
+ */
|
|
|
|
+ @Override
|
|
|
|
+ protected void recoverActiveContainer(
|
|
|
|
+ ContainerLaunchContext launchContext, ContainerTokenIdentifier token,
|
|
|
|
+ RecoveredContainerState rcs) throws IOException {
|
|
|
|
+ if (rcs.getStatus() ==
|
|
|
|
+ RecoveredContainerStatus.QUEUED && !rcs.getKilled()) {
|
|
|
|
+ LOG.info(token.getContainerID()
|
|
|
|
+ + "will be added to the queued containers.");
|
|
|
|
+
|
|
|
|
+ AllocatedContainerInfo allocatedContInfo = new AllocatedContainerInfo(
|
|
|
|
+ token, rcs.getStartRequest(), token.getExecutionType(),
|
|
|
|
+ token.getResource(), getConfig());
|
|
|
|
+
|
|
|
|
+ this.context.getQueuingContext().getQueuedContainers().put(
|
|
|
|
+ token.getContainerID(), token);
|
|
|
|
+
|
|
|
|
+ if (allocatedContInfo.getExecutionType() == ExecutionType.GUARANTEED) {
|
|
|
|
+ queuedGuaranteedContainers.add(allocatedContInfo);
|
|
|
|
+ // Kill running opportunistic containers to make space for
|
|
|
|
+ // guaranteed container.
|
|
|
|
+ killOpportunisticContainers(allocatedContInfo);
|
|
|
|
+ } else {
|
|
|
|
+ queuedOpportunisticContainers.add(allocatedContInfo);
|
|
|
|
+ }
|
|
|
|
+ } else {
|
|
|
|
+ super.recoverActiveContainer(launchContext, token, rcs);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
@VisibleForTesting
|
|
@VisibleForTesting
|
|
public int getNumAllocatedGuaranteedContainers() {
|
|
public int getNumAllocatedGuaranteedContainers() {
|
|
return allocatedGuaranteedContainers.size();
|
|
return allocatedGuaranteedContainers.size();
|