|
@@ -25,6 +25,7 @@ import java.util.Map;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.concurrent.ConcurrentMap;
|
|
|
import java.util.concurrent.ConcurrentSkipListMap;
|
|
|
+import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
@@ -44,6 +45,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
|
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
|
|
+import org.apache.hadoop.yarn.event.EventHandler;
|
|
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
|
@@ -54,11 +56,10 @@ import org.apache.hadoop.yarn.server.nodemanager.webapp.WebServer;
|
|
|
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
|
|
import org.apache.hadoop.yarn.service.CompositeService;
|
|
|
import org.apache.hadoop.yarn.service.Service;
|
|
|
-import org.apache.hadoop.yarn.service.ServiceStateChangeListener;
|
|
|
import org.apache.hadoop.yarn.util.Records;
|
|
|
|
|
|
-public class NodeManager extends CompositeService implements
|
|
|
- ServiceStateChangeListener {
|
|
|
+public class NodeManager extends CompositeService
|
|
|
+ implements EventHandler<NodeManagerEvent> {
|
|
|
|
|
|
/**
|
|
|
* Priority of the NodeManager shutdown hook.
|
|
@@ -82,6 +83,8 @@ public class NodeManager extends CompositeService implements
|
|
|
|
|
|
private long waitForContainersOnShutdownMillis;
|
|
|
|
|
|
+ private AtomicBoolean isStopping = new AtomicBoolean(false);
|
|
|
+
|
|
|
public NodeManager() {
|
|
|
super(NodeManager.class.getName());
|
|
|
}
|
|
@@ -152,7 +155,6 @@ public class NodeManager extends CompositeService implements
|
|
|
|
|
|
NodeStatusUpdater nodeStatusUpdater =
|
|
|
createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);
|
|
|
- nodeStatusUpdater.register(this);
|
|
|
|
|
|
NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor();
|
|
|
addService(nodeResourceMonitor);
|
|
@@ -167,6 +169,7 @@ public class NodeManager extends CompositeService implements
|
|
|
addService(webServer);
|
|
|
|
|
|
dispatcher.register(ContainerManagerEventType.class, containerManager);
|
|
|
+ dispatcher.register(NodeManagerEventType.class, this);
|
|
|
addService(dispatcher);
|
|
|
|
|
|
DefaultMetricsSystem.initialize("NodeManager");
|
|
@@ -198,13 +201,17 @@ public class NodeManager extends CompositeService implements
|
|
|
|
|
|
@Override
|
|
|
public void stop() {
|
|
|
+ if (isStopping.getAndSet(true)) { // atomically sets the flag; a true result means an earlier stop() call already set it
|
|
|
+ return; // repeated call: skip container cleanup, super.stop() and metrics shutdown
|
|
|
+ }
|
|
|
+
|
|
|
cleanupContainers();
|
|
|
super.stop();
|
|
|
DefaultMetricsSystem.shutdown();
|
|
|
}
|
|
|
|
|
|
@SuppressWarnings("unchecked")
|
|
|
- private void cleanupContainers() {
|
|
|
+ protected void cleanupContainers() {
|
|
|
Map<ContainerId, Container> containers = context.getContainers();
|
|
|
if (containers.isEmpty()) {
|
|
|
return;
|
|
@@ -293,24 +300,10 @@ public class NodeManager extends CompositeService implements
|
|
|
return nodeHealthChecker;
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- public void stateChanged(Service service) {
|
|
|
- if (NodeStatusUpdaterImpl.class.getName().equals(service.getName())
|
|
|
- && STATE.STOPPED.equals(service.getServiceState())) {
|
|
|
-
|
|
|
- boolean hasToReboot = ((NodeStatusUpdaterImpl) service).hasToRebootNode();
|
|
|
-
|
|
|
- // Shutdown the Nodemanager when the NodeStatusUpdater is stopped.
|
|
|
- stop();
|
|
|
-
|
|
|
- // Reboot the whole node-manager if NodeStatusUpdater got a reboot command
|
|
|
- // from the RM.
|
|
|
- if (hasToReboot) {
|
|
|
- LOG.info("Rebooting the node manager.");
|
|
|
- NodeManager nodeManager = createNewNodeManager();
|
|
|
- nodeManager.initAndStartNodeManager(this.getConfig(), hasToReboot);
|
|
|
- }
|
|
|
- }
|
|
|
+ private void reboot() { // creates and starts a replacement NodeManager; this method does not stop the current instance (handle() calls stop() first)
|
|
|
+ LOG.info("Rebooting the node manager.");
|
|
|
+ NodeManager nodeManager = createNewNodeManager(); // obtained through the factory method marked "For testing" rather than a direct constructor call
|
|
|
+ nodeManager.initAndStartNodeManager(this.getConfig(), true); // passes this instance's configuration; true is the hasToReboot argument
|
|
|
}
|
|
|
|
|
|
private void initAndStartNodeManager(Configuration conf, boolean hasToReboot) {
|
|
@@ -333,6 +326,21 @@ public class NodeManager extends CompositeService implements
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ public void handle(NodeManagerEvent event) { // EventHandler callback; this object is registered with the dispatcher for NodeManagerEventType
|
|
|
+ switch (event.getType()) {
|
|
|
+ case SHUTDOWN:
|
|
|
+ stop(); // NOTE(review): stop() calls super.stop() and the dispatcher is one of the added services - confirm it is safe to stop it from the thread delivering this event
|
|
|
+ break;
|
|
|
+ case REBOOT:
|
|
|
+ stop(); // returns immediately if another stop() call has already set isStopping
|
|
|
+ reboot(); // NOTE(review): runs even when the stop() above returned early - confirm a reboot during an in-progress shutdown is intended
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ LOG.warn("Invalid shutdown event " + event.getType() + ". Ignoring."); // any other event type is logged and otherwise ignored
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
// For testing
|
|
|
NodeManager createNewNodeManager() {
|
|
|
return new NodeManager();
|