|
@@ -147,8 +147,7 @@ public class NMClientImpl extends NMClient {
|
|
|
private ContainerState state;
|
|
|
|
|
|
|
|
|
- public StartedContainer(ContainerId containerId, NodeId nodeId,
|
|
|
- Token containerToken) {
|
|
|
+ public StartedContainer(ContainerId containerId, NodeId nodeId) {
|
|
|
this.containerId = containerId;
|
|
|
this.nodeId = nodeId;
|
|
|
state = ContainerState.NEW;
|
|
@@ -170,8 +169,6 @@ public class NMClientImpl extends NMClient {
|
|
|
throw RPCUtil.getRemoteException("Container "
|
|
|
+ startedContainer.containerId.toString() + " is already started");
|
|
|
}
|
|
|
- startedContainers
|
|
|
- .put(startedContainer.getContainerId(), startedContainer);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -181,7 +178,8 @@ public class NMClientImpl extends NMClient {
|
|
|
// Do synchronization on StartedContainer to prevent race condition
|
|
|
// between startContainer and stopContainer only when startContainer is
|
|
|
// in progress for a given container.
|
|
|
- StartedContainer startingContainer = createStartedContainer(container);
|
|
|
+ StartedContainer startingContainer =
|
|
|
+ new StartedContainer(container.getId(), container.getNodeId());
|
|
|
synchronized (startingContainer) {
|
|
|
addStartingContainer(startingContainer);
|
|
|
|
|
@@ -212,15 +210,16 @@ public class NMClientImpl extends NMClient {
|
|
|
} catch (YarnException e) {
|
|
|
startingContainer.state = ContainerState.COMPLETE;
|
|
|
// Remove the started container if it failed to start
|
|
|
- removeStartedContainer(startingContainer);
|
|
|
+ startedContainers.remove(startingContainer.containerId);
|
|
|
throw e;
|
|
|
} catch (IOException e) {
|
|
|
startingContainer.state = ContainerState.COMPLETE;
|
|
|
- removeStartedContainer(startingContainer);
|
|
|
+ // Remove the started container if it failed to start
|
|
|
+ startedContainers.remove(startingContainer.containerId);
|
|
|
throw e;
|
|
|
} catch (Throwable t) {
|
|
|
startingContainer.state = ContainerState.COMPLETE;
|
|
|
- removeStartedContainer(startingContainer);
|
|
|
+ startedContainers.remove(startingContainer.containerId);
|
|
|
throw RPCUtil.getRemoteException(t);
|
|
|
} finally {
|
|
|
if (proxy != null) {
|
|
@@ -234,7 +233,7 @@ public class NMClientImpl extends NMClient {
|
|
|
@Override
|
|
|
public void stopContainer(ContainerId containerId, NodeId nodeId)
|
|
|
throws YarnException, IOException {
|
|
|
- StartedContainer startedContainer = getStartedContainer(containerId);
|
|
|
+ StartedContainer startedContainer = startedContainers.get(containerId);
|
|
|
|
|
|
// Only allow one request of stopping the container to move forward
|
|
|
// When entering the block, check whether the precursor has already stopped
|
|
@@ -247,7 +246,7 @@ public class NMClientImpl extends NMClient {
|
|
|
stopContainerInternal(containerId, nodeId);
|
|
|
// Only after successful
|
|
|
startedContainer.state = ContainerState.COMPLETE;
|
|
|
- removeStartedContainer(startedContainer);
|
|
|
+ startedContainers.remove(startedContainer.containerId);
|
|
|
}
|
|
|
} else {
|
|
|
stopContainerInternal(containerId, nodeId);
|
|
@@ -304,23 +303,6 @@ public class NMClientImpl extends NMClient {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- protected synchronized StartedContainer createStartedContainer(
|
|
|
- Container container) throws YarnException, IOException {
|
|
|
- StartedContainer startedContainer = new StartedContainer(container.getId(),
|
|
|
- container.getNodeId(), container.getContainerToken());
|
|
|
- return startedContainer;
|
|
|
- }
|
|
|
-
|
|
|
- protected synchronized void
|
|
|
- removeStartedContainer(StartedContainer container) {
|
|
|
- startedContainers.remove(container.containerId);
|
|
|
- }
|
|
|
-
|
|
|
- protected synchronized StartedContainer getStartedContainer(
|
|
|
- ContainerId containerId) {
|
|
|
- return startedContainers.get(containerId);
|
|
|
- }
|
|
|
|
|
|
public AtomicBoolean getCleanupRunningContainers() {
|
|
|
return cleanupRunningContainers;
|