|
@@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
|
|
import org.apache.hadoop.yarn.api.records.LocalResource;
|
|
|
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
|
|
+import org.apache.hadoop.yarn.api.records.Resource;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
|
|
import org.apache.hadoop.yarn.event.EventHandler;
|
|
@@ -76,6 +77,7 @@ public class ContainerImpl implements Container {
|
|
|
private final Credentials credentials;
|
|
|
private final NodeManagerMetrics metrics;
|
|
|
private final ContainerLaunchContext launchContext;
|
|
|
+ private final org.apache.hadoop.yarn.api.records.Container container;
|
|
|
private int exitCode = YarnConfiguration.INVALID_CONTAINER_EXIT_STATUS;
|
|
|
private final StringBuilder diagnostics;
|
|
|
|
|
@@ -96,12 +98,13 @@ public class ContainerImpl implements Container {
|
|
|
new ArrayList<LocalResourceRequest>();
|
|
|
|
|
|
public ContainerImpl(Configuration conf,
|
|
|
- Dispatcher dispatcher,
|
|
|
- ContainerLaunchContext launchContext, Credentials creds,
|
|
|
- NodeManagerMetrics metrics) {
|
|
|
+ Dispatcher dispatcher, ContainerLaunchContext launchContext,
|
|
|
+ org.apache.hadoop.yarn.api.records.Container container,
|
|
|
+ Credentials creds, NodeManagerMetrics metrics) {
|
|
|
this.daemonConf = conf;
|
|
|
this.dispatcher = dispatcher;
|
|
|
this.launchContext = launchContext;
|
|
|
+ this.container = container;
|
|
|
this.diagnostics = new StringBuilder();
|
|
|
this.credentials = creds;
|
|
|
this.metrics = metrics;
|
|
@@ -312,7 +315,7 @@ public class ContainerImpl implements Container {
|
|
|
public ContainerId getContainerID() {
|
|
|
this.readLock.lock();
|
|
|
try {
|
|
|
- return this.launchContext.getContainerId();
|
|
|
+ return this.container.getId();
|
|
|
} finally {
|
|
|
this.readLock.unlock();
|
|
|
}
|
|
@@ -373,50 +376,61 @@ public class ContainerImpl implements Container {
|
|
|
public ContainerStatus cloneAndGetContainerStatus() {
|
|
|
this.readLock.lock();
|
|
|
try {
|
|
|
- return BuilderUtils.newContainerStatus(this.getContainerID(),
|
|
|
+ return BuilderUtils.newContainerStatus(this.container.getId(),
|
|
|
getCurrentState(), diagnostics.toString(), exitCode);
|
|
|
} finally {
|
|
|
this.readLock.unlock();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ public Resource getResource() {
|
|
|
+ this.readLock.lock();
|
|
|
+ try {
|
|
|
+ return this.container.getResource();
|
|
|
+ } finally {
|
|
|
+ this.readLock.unlock();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
@SuppressWarnings({"fallthrough", "unchecked"})
|
|
|
private void finished() {
|
|
|
+ ContainerId containerID = this.container.getId();
|
|
|
+ String user = this.launchContext.getUser();
|
|
|
switch (getContainerState()) {
|
|
|
case EXITED_WITH_SUCCESS:
|
|
|
metrics.endRunningContainer();
|
|
|
metrics.completedContainer();
|
|
|
- NMAuditLogger.logSuccess(getUser(),
|
|
|
+ NMAuditLogger.logSuccess(user,
|
|
|
AuditConstants.FINISH_SUCCESS_CONTAINER, "ContainerImpl",
|
|
|
- getContainerID().getApplicationAttemptId().getApplicationId(),
|
|
|
- getContainerID());
|
|
|
+ containerID.getApplicationAttemptId()
|
|
|
+ .getApplicationId(), containerID);
|
|
|
break;
|
|
|
case EXITED_WITH_FAILURE:
|
|
|
metrics.endRunningContainer();
|
|
|
// fall through
|
|
|
case LOCALIZATION_FAILED:
|
|
|
metrics.failedContainer();
|
|
|
- NMAuditLogger.logFailure(getUser(),
|
|
|
+ NMAuditLogger.logFailure(user,
|
|
|
AuditConstants.FINISH_FAILED_CONTAINER, "ContainerImpl",
|
|
|
"Container failed with state: " + getContainerState(),
|
|
|
- getContainerID().getApplicationAttemptId().getApplicationId(),
|
|
|
- getContainerID());
|
|
|
+ containerID.getApplicationAttemptId()
|
|
|
+ .getApplicationId(), containerID);
|
|
|
break;
|
|
|
case CONTAINER_CLEANEDUP_AFTER_KILL:
|
|
|
metrics.endRunningContainer();
|
|
|
// fall through
|
|
|
case NEW:
|
|
|
metrics.killedContainer();
|
|
|
- NMAuditLogger.logSuccess(getUser(),
|
|
|
+ NMAuditLogger.logSuccess(user,
|
|
|
AuditConstants.FINISH_KILLED_CONTAINER, "ContainerImpl",
|
|
|
- getContainerID().getApplicationAttemptId().getApplicationId(),
|
|
|
- getContainerID());
|
|
|
+ containerID.getApplicationAttemptId().getApplicationId(),
|
|
|
+ containerID);
|
|
|
}
|
|
|
|
|
|
- metrics.releaseContainer(getLaunchContext().getResource());
|
|
|
+ metrics.releaseContainer(this.container.getResource());
|
|
|
|
|
|
// Inform the application
|
|
|
- ContainerId containerID = getContainerID();
|
|
|
@SuppressWarnings("rawtypes")
|
|
|
EventHandler eventHandler = dispatcher.getEventHandler();
|
|
|
eventHandler.handle(new ApplicationContainerFinishedEvent(containerID));
|
|
@@ -475,7 +489,7 @@ public class ContainerImpl implements Container {
|
|
|
@Override
|
|
|
public ContainerState transition(ContainerImpl container,
|
|
|
ContainerEvent event) {
|
|
|
- final ContainerLaunchContext ctxt = container.getLaunchContext();
|
|
|
+ final ContainerLaunchContext ctxt = container.launchContext;
|
|
|
container.metrics.initingContainer();
|
|
|
|
|
|
// Inform the AuxServices about the opaque serviceData
|
|
@@ -486,9 +500,9 @@ public class ContainerImpl implements Container {
|
|
|
for (Map.Entry<String,ByteBuffer> service : csd.entrySet()) {
|
|
|
container.dispatcher.getEventHandler().handle(
|
|
|
new AuxServicesEvent(AuxServicesEventType.APPLICATION_INIT,
|
|
|
- ctxt.getUser(),
|
|
|
- ctxt.getContainerId().getApplicationAttemptId().getApplicationId(),
|
|
|
- service.getKey().toString(), service.getValue()));
|
|
|
+ ctxt.getUser(), container.container.getId()
|
|
|
+ .getApplicationAttemptId().getApplicationId(),
|
|
|
+ service.getKey().toString(), service.getValue()));
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -571,7 +585,7 @@ public class ContainerImpl implements Container {
|
|
|
container.pendingResources.remove(rsrcEvent.getResource());
|
|
|
if (null == syms) {
|
|
|
LOG.warn("Localized unknown resource " + rsrcEvent.getResource() +
|
|
|
- " for container " + container.getContainerID());
|
|
|
+ " for container " + container.container.getId());
|
|
|
assert false;
|
|
|
// fail container?
|
|
|
return ContainerState.LOCALIZING;
|
|
@@ -599,14 +613,14 @@ public class ContainerImpl implements Container {
|
|
|
// Inform the ContainersMonitor to start monitoring the container's
|
|
|
// resource usage.
|
|
|
long pmemBytes =
|
|
|
- container.getLaunchContext().getResource().getMemory() * 1024 * 1024L;
|
|
|
+ container.container.getResource().getMemory() * 1024 * 1024L;
|
|
|
float pmemRatio = container.daemonConf.getFloat(
|
|
|
YarnConfiguration.NM_VMEM_PMEM_RATIO,
|
|
|
YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
|
|
|
long vmemBytes = (long) (pmemRatio * pmemBytes);
|
|
|
|
|
|
container.dispatcher.getEventHandler().handle(
|
|
|
- new ContainerStartMonitoringEvent(container.getContainerID(),
|
|
|
+ new ContainerStartMonitoringEvent(container.container.getId(),
|
|
|
vmemBytes, pmemBytes));
|
|
|
container.metrics.runningContainer();
|
|
|
}
|
|
@@ -740,7 +754,7 @@ public class ContainerImpl implements Container {
|
|
|
container.pendingResources.remove(rsrcEvent.getResource());
|
|
|
if (null == syms) {
|
|
|
LOG.warn("Localized unknown resource " + rsrcEvent.getResource() +
|
|
|
- " for container " + container.getContainerID());
|
|
|
+ " for container " + container.container.getId());
|
|
|
assert false;
|
|
|
// fail container?
|
|
|
return;
|
|
@@ -845,10 +859,9 @@ public class ContainerImpl implements Container {
|
|
|
public String toString() {
|
|
|
this.readLock.lock();
|
|
|
try {
|
|
|
- return ConverterUtils.toString(launchContext.getContainerId());
|
|
|
+ return ConverterUtils.toString(container.getId());
|
|
|
} finally {
|
|
|
this.readLock.unlock();
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
}
|