|
@@ -19,6 +19,9 @@
|
|
|
package org.apache.hadoop.yarn.server.nodemanager;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.concurrent.ConcurrentMap;
|
|
|
import java.util.concurrent.ConcurrentSkipListMap;
|
|
@@ -61,14 +64,24 @@ public class NodeManager extends CompositeService implements
|
|
|
* Priority of the NodeManager shutdown hook.
|
|
|
*/
|
|
|
public static final int SHUTDOWN_HOOK_PRIORITY = 30;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Extra duration to wait for containers to be killed on shutdown.
|
|
|
+ */
|
|
|
+ private static final int SHUTDOWN_CLEANUP_SLOP_MS = 1000;
|
|
|
|
|
|
private static final Log LOG = LogFactory.getLog(NodeManager.class);
|
|
|
protected final NodeManagerMetrics metrics = NodeManagerMetrics.create();
|
|
|
private ApplicationACLsManager aclsManager;
|
|
|
private NodeHealthCheckerService nodeHealthChecker;
|
|
|
private LocalDirsHandlerService dirsHandler;
|
|
|
+ private Context context;
|
|
|
+ private AsyncDispatcher dispatcher;
|
|
|
+ private ContainerManagerImpl containerManager;
|
|
|
private static CompositeServiceShutdownHook nodeManagerShutdownHook;
|
|
|
|
|
|
+ private long waitForContainersOnShutdownMillis;
|
|
|
+
|
|
|
public NodeManager() {
|
|
|
super(NodeManager.class.getName());
|
|
|
}
|
|
@@ -115,7 +128,7 @@ public class NodeManager extends CompositeService implements
|
|
|
containerTokenSecretManager = new NMContainerTokenSecretManager(conf);
|
|
|
}
|
|
|
|
|
|
- Context context = new NMContext(containerTokenSecretManager);
|
|
|
+ this.context = new NMContext(containerTokenSecretManager);
|
|
|
|
|
|
this.aclsManager = new ApplicationACLsManager(conf);
|
|
|
|
|
@@ -131,7 +144,7 @@ public class NodeManager extends CompositeService implements
|
|
|
addService(del);
|
|
|
|
|
|
// NodeManager level dispatcher
|
|
|
- AsyncDispatcher dispatcher = new AsyncDispatcher();
|
|
|
+ this.dispatcher = new AsyncDispatcher();
|
|
|
|
|
|
nodeHealthChecker = new NodeHealthCheckerService();
|
|
|
addService(nodeHealthChecker);
|
|
@@ -144,7 +157,7 @@ public class NodeManager extends CompositeService implements
|
|
|
NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor();
|
|
|
addService(nodeResourceMonitor);
|
|
|
|
|
|
- ContainerManagerImpl containerManager =
|
|
|
+ containerManager =
|
|
|
createContainerManager(context, exec, del, nodeStatusUpdater,
|
|
|
this.aclsManager, dirsHandler);
|
|
|
addService(containerManager);
|
|
@@ -155,13 +168,20 @@ public class NodeManager extends CompositeService implements
|
|
|
|
|
|
dispatcher.register(ContainerManagerEventType.class, containerManager);
|
|
|
addService(dispatcher);
|
|
|
-
|
|
|
+
|
|
|
DefaultMetricsSystem.initialize("NodeManager");
|
|
|
|
|
|
// StatusUpdater should be added last so that it get started last
|
|
|
// so that we make sure everything is up before registering with RM.
|
|
|
addService(nodeStatusUpdater);
|
|
|
-
|
|
|
+
|
|
|
+ waitForContainersOnShutdownMillis =
|
|
|
+ conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS,
|
|
|
+ YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS) +
|
|
|
+ conf.getLong(YarnConfiguration.NM_PROCESS_KILL_WAIT_MS,
|
|
|
+ YarnConfiguration.DEFAULT_NM_PROCESS_KILL_WAIT_MS) +
|
|
|
+ SHUTDOWN_CLEANUP_SLOP_MS;
|
|
|
+
|
|
|
super.init(conf);
|
|
|
// TODO add local dirs to del
|
|
|
}
|
|
@@ -178,9 +198,44 @@ public class NodeManager extends CompositeService implements
|
|
|
|
|
|
@Override
|
|
|
public void stop() {
|
|
|
+ cleanupContainers();
|
|
|
super.stop();
|
|
|
DefaultMetricsSystem.shutdown();
|
|
|
}
|
|
|
+
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ private void cleanupContainers() {
|
|
|
+ Map<ContainerId, Container> containers = context.getContainers();
|
|
|
+ if (containers.isEmpty()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ LOG.info("Containers still running on shutdown: " + containers.keySet());
|
|
|
+
|
|
|
+ List<ContainerId> containerIds = new ArrayList<ContainerId>(containers.keySet());
|
|
|
+ dispatcher.getEventHandler().handle(
|
|
|
+ new CMgrCompletedContainersEvent(containerIds,
|
|
|
+ CMgrCompletedContainersEvent.Reason.ON_SHUTDOWN));
|
|
|
+
|
|
|
+ LOG.info("Waiting for containers to be killed");
|
|
|
+
|
|
|
+ long waitStartTime = System.currentTimeMillis();
|
|
|
+ while (!containers.isEmpty() &&
|
|
|
+ System.currentTimeMillis() - waitStartTime < waitForContainersOnShutdownMillis) {
|
|
|
+ try {
|
|
|
+ Thread.sleep(1000);
|
|
|
+ } catch (InterruptedException ex) {
|
|
|
+ LOG.warn("Interrupted while sleeping on container kill", ex);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // All containers killed
|
|
|
+ if (containers.isEmpty()) {
|
|
|
+ LOG.info("All containers in DONE state");
|
|
|
+ } else {
|
|
|
+ LOG.info("Done waiting for containers to be killed. Still alive: " +
|
|
|
+ containers.keySet());
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
public static class NMContext implements Context {
|
|
|
|
|
@@ -282,6 +337,11 @@ public class NodeManager extends CompositeService implements
|
|
|
NodeManager createNewNodeManager() {
|
|
|
return new NodeManager();
|
|
|
}
|
|
|
+
|
|
|
+ // For testing
|
|
|
+ ContainerManagerImpl getContainerManager() {
|
|
|
+ return containerManager;
|
|
|
+ }
|
|
|
|
|
|
public static void main(String[] args) {
|
|
|
Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
|