|
@@ -30,6 +30,9 @@ import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Set;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
+import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|
|
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
|
|
|
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
@@ -71,6 +74,7 @@ import org.apache.hadoop.yarn.exceptions.InvalidAuxServiceException;
|
|
|
import org.apache.hadoop.yarn.exceptions.InvalidContainerException;
|
|
|
import org.apache.hadoop.yarn.exceptions.NMNotYetReadyException;
|
|
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
|
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|
|
import org.apache.hadoop.yarn.ipc.RPCUtil;
|
|
|
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
|
|
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
|
@@ -83,6 +87,7 @@ import org.apache.hadoop.yarn.server.nodemanager.Context;
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger;
|
|
|
+import org.apache.hadoop.yarn.server.nodemanager.NodeManagerEventType;
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants;
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
|
|
@@ -120,6 +125,11 @@ public class ContainerManagerImpl extends CompositeService implements
|
|
|
ServiceStateChangeListener, ContainerManagementProtocol,
|
|
|
EventHandler<ContainerManagerEvent> {
|
|
|
|
|
|
+ /**
|
|
|
+ * Extra duration to wait for applications to be killed on shutdown.
|
|
|
+ */
|
|
|
+ private static final int SHUTDOWN_CLEANUP_SLOP_MS = 1000;
|
|
|
+
|
|
|
private static final Log LOG = LogFactory.getLog(ContainerManagerImpl.class);
|
|
|
|
|
|
final Context context;
|
|
@@ -138,6 +148,11 @@ public class ContainerManagerImpl extends CompositeService implements
|
|
|
|
|
|
private final DeletionService deletionService;
|
|
|
private AtomicBoolean blockNewContainerRequests = new AtomicBoolean(false);
|
|
|
+ private boolean serviceStopped = false;
|
|
|
+ private final ReadLock readLock;
|
|
|
+ private final WriteLock writeLock;
|
|
|
+
|
|
|
+ private long waitForContainersOnShutdownMillis;
|
|
|
|
|
|
public ContainerManagerImpl(Context context, ContainerExecutor exec,
|
|
|
DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
|
|
@@ -181,6 +196,10 @@ public class ContainerManagerImpl extends CompositeService implements
|
|
|
dispatcher.register(ContainersLauncherEventType.class, containersLauncher);
|
|
|
|
|
|
addService(dispatcher);
|
|
|
+
|
|
|
+ ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
|
|
|
+ this.readLock = lock.readLock();
|
|
|
+ this.writeLock = lock.writeLock();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -190,6 +209,13 @@ public class ContainerManagerImpl extends CompositeService implements
|
|
|
addIfService(logHandler);
|
|
|
dispatcher.register(LogHandlerEventType.class, logHandler);
|
|
|
|
|
|
+ waitForContainersOnShutdownMillis =
|
|
|
+ conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS,
|
|
|
+ YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS) +
|
|
|
+ conf.getLong(YarnConfiguration.NM_PROCESS_KILL_WAIT_MS,
|
|
|
+ YarnConfiguration.DEFAULT_NM_PROCESS_KILL_WAIT_MS) +
|
|
|
+ SHUTDOWN_CLEANUP_SLOP_MS;
|
|
|
+
|
|
|
super.serviceInit(conf);
|
|
|
}
|
|
|
|
|
@@ -275,6 +301,16 @@ public class ContainerManagerImpl extends CompositeService implements
|
|
|
|
|
|
@Override
|
|
|
public void serviceStop() throws Exception {
|
|
|
+ setBlockNewContainerRequests(true);
|
|
|
+ this.writeLock.lock();
|
|
|
+ try {
|
|
|
+ serviceStopped = true;
|
|
|
+ if (context != null) {
|
|
|
+ cleanUpApplications(NodeManagerEventType.SHUTDOWN);
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ this.writeLock.unlock();
|
|
|
+ }
|
|
|
if (auxiliaryServices.getServiceState() == STARTED) {
|
|
|
auxiliaryServices.unregisterServiceListener(this);
|
|
|
}
|
|
@@ -284,6 +320,60 @@ public class ContainerManagerImpl extends CompositeService implements
|
|
|
super.serviceStop();
|
|
|
}
|
|
|
|
|
|
+ public void cleanUpApplications(NodeManagerEventType eventType) {
|
|
|
+ Map<ApplicationId, Application> applications =
|
|
|
+ this.context.getApplications();
|
|
|
+ if (applications.isEmpty()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ LOG.info("Applications still running : " + applications.keySet());
|
|
|
+
|
|
|
+ List<ApplicationId> appIds =
|
|
|
+ new ArrayList<ApplicationId>(applications.keySet());
|
|
|
+ this.handle(
|
|
|
+ new CMgrCompletedAppsEvent(appIds,
|
|
|
+ CMgrCompletedAppsEvent.Reason.ON_SHUTDOWN));
|
|
|
+
|
|
|
+ LOG.info("Waiting for Applications to be Finished");
|
|
|
+
|
|
|
+ switch (eventType) {
|
|
|
+ case SHUTDOWN:
|
|
|
+ long waitStartTime = System.currentTimeMillis();
|
|
|
+ while (!applications.isEmpty()
|
|
|
+ && System.currentTimeMillis() - waitStartTime
|
|
|
+ < waitForContainersOnShutdownMillis) {
|
|
|
+ try {
|
|
|
+ Thread.sleep(1000);
|
|
|
+ } catch (InterruptedException ex) {
|
|
|
+ LOG.warn("Interrupted while sleeping on applications finish on shutdown",
|
|
|
+ ex);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ case RESYNC:
|
|
|
+ while (!applications.isEmpty()) {
|
|
|
+ try {
|
|
|
+ Thread.sleep(1000);
|
|
|
+ } catch (InterruptedException ex) {
|
|
|
+ LOG.warn("Interrupted while sleeping on applications finish on resync",
|
|
|
+ ex);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ throw new YarnRuntimeException("Get an unknown NodeManagerEventType: "
|
|
|
+ + eventType);
|
|
|
+ }
|
|
|
+
|
|
|
+ // All applications Finished
|
|
|
+ if (applications.isEmpty()) {
|
|
|
+ LOG.info("All applications in FINISHED state");
|
|
|
+ } else {
|
|
|
+ LOG.info("Done waiting for Applications to be Finished. Still alive: " +
|
|
|
+ applications.keySet());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
// Get the remoteUGI corresponding to the api call.
|
|
|
protected UserGroupInformation getRemoteUgi()
|
|
|
throws YarnException {
|
|
@@ -432,14 +522,14 @@ public class ContainerManagerImpl extends CompositeService implements
|
|
|
* correct RMIdentifier. d) It is not expired.
|
|
|
*/
|
|
|
authorizeStartRequest(nmTokenIdentifier, containerTokenIdentifier);
|
|
|
-
|
|
|
+
|
|
|
if (containerTokenIdentifier.getRMIdentifer() != nodeStatusUpdater
|
|
|
- .getRMIdentifier()) {
|
|
|
- // Is the container coming from unknown RM
|
|
|
- StringBuilder sb = new StringBuilder("\nContainer ");
|
|
|
- sb.append(containerTokenIdentifier.getContainerID().toString())
|
|
|
- .append(" rejected as it is allocated by a previous RM");
|
|
|
- throw new InvalidContainerException(sb.toString());
|
|
|
+ .getRMIdentifier()) {
|
|
|
+ // Is the container coming from unknown RM
|
|
|
+ StringBuilder sb = new StringBuilder("\nContainer ");
|
|
|
+ sb.append(containerTokenIdentifier.getContainerID().toString()).append(
|
|
|
+ " rejected as it is allocated by a previous RM");
|
|
|
+ throw new InvalidContainerException(sb.toString());
|
|
|
}
|
|
|
// update NMToken
|
|
|
updateNMTokenIdentifier(nmTokenIdentifier);
|
|
@@ -453,13 +543,13 @@ public class ContainerManagerImpl extends CompositeService implements
|
|
|
ContainerLaunchContext launchContext = request.getContainerLaunchContext();
|
|
|
|
|
|
Map<String, ByteBuffer> serviceData = getAuxServiceMetaData();
|
|
|
- if (launchContext.getServiceData()!=null &&
|
|
|
- !launchContext.getServiceData().isEmpty()) {
|
|
|
+ if (launchContext.getServiceData() != null
|
|
|
+ && !launchContext.getServiceData().isEmpty()) {
|
|
|
for (Map.Entry<String, ByteBuffer> meta : launchContext.getServiceData()
|
|
|
- .entrySet()) {
|
|
|
+ .entrySet()) {
|
|
|
if (null == serviceData.get(meta.getKey())) {
|
|
|
- throw new InvalidAuxServiceException("The auxService:" + meta.getKey()
|
|
|
- + " does not exist");
|
|
|
+ throw new InvalidAuxServiceException("The auxService:"
|
|
|
+ + meta.getKey() + " does not exist");
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -479,30 +569,42 @@ public class ContainerManagerImpl extends CompositeService implements
|
|
|
+ " already is running on this node!!");
|
|
|
}
|
|
|
|
|
|
- // Create the application
|
|
|
- Application application =
|
|
|
- new ApplicationImpl(dispatcher, this.aclsManager, user, applicationID,
|
|
|
- credentials, context);
|
|
|
- if (null == context.getApplications().putIfAbsent(applicationID,
|
|
|
- application)) {
|
|
|
- LOG.info("Creating a new application reference for app " + applicationID);
|
|
|
-
|
|
|
- dispatcher.getEventHandler().handle(
|
|
|
- new ApplicationInitEvent(applicationID, container.getLaunchContext()
|
|
|
- .getApplicationACLs()));
|
|
|
- }
|
|
|
+ this.readLock.lock();
|
|
|
+ try {
|
|
|
+ if (!serviceStopped) {
|
|
|
+ // Create the application
|
|
|
+ Application application =
|
|
|
+ new ApplicationImpl(dispatcher, this.aclsManager, user,
|
|
|
+ applicationID, credentials, context);
|
|
|
+ if (null == context.getApplications().putIfAbsent(applicationID,
|
|
|
+ application)) {
|
|
|
+ LOG.info("Creating a new application reference for app "
|
|
|
+ + applicationID);
|
|
|
+
|
|
|
+ dispatcher.getEventHandler().handle(
|
|
|
+ new ApplicationInitEvent(applicationID, container
|
|
|
+ .getLaunchContext().getApplicationACLs()));
|
|
|
+ }
|
|
|
|
|
|
- dispatcher.getEventHandler().handle(
|
|
|
- new ApplicationContainerInitEvent(container));
|
|
|
+ dispatcher.getEventHandler().handle(
|
|
|
+ new ApplicationContainerInitEvent(container));
|
|
|
|
|
|
- this.context.getContainerTokenSecretManager().startContainerSuccessful(
|
|
|
- containerTokenIdentifier);
|
|
|
- NMAuditLogger.logSuccess(user, AuditConstants.START_CONTAINER,
|
|
|
- "ContainerManageImpl", applicationID, containerId);
|
|
|
- // TODO launchedContainer misplaced -> doesn't necessarily mean a container
|
|
|
- // launch. A finished Application will not launch containers.
|
|
|
- metrics.launchedContainer();
|
|
|
- metrics.allocateContainer(containerTokenIdentifier.getResource());
|
|
|
+ this.context.getContainerTokenSecretManager().startContainerSuccessful(
|
|
|
+ containerTokenIdentifier);
|
|
|
+ NMAuditLogger.logSuccess(user, AuditConstants.START_CONTAINER,
|
|
|
+ "ContainerManageImpl", applicationID, containerId);
|
|
|
+ // TODO launchedContainer misplaced -> doesn't necessarily mean a
|
|
|
+ // container
|
|
|
+ // launch. A finished Application will not launch containers.
|
|
|
+ metrics.launchedContainer();
|
|
|
+ metrics.allocateContainer(containerTokenIdentifier.getResource());
|
|
|
+ } else {
|
|
|
+ throw new YarnException("Container start failed as the NodeManager is "
|
|
|
+ + "in the process of shutting down");
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ this.readLock.unlock();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
protected ContainerTokenIdentifier verifyAndGetContainerTokenIdentifier(
|
|
@@ -727,9 +829,15 @@ public class ContainerManagerImpl extends CompositeService implements
|
|
|
CMgrCompletedAppsEvent appsFinishedEvent =
|
|
|
(CMgrCompletedAppsEvent) event;
|
|
|
for (ApplicationId appID : appsFinishedEvent.getAppsToCleanup()) {
|
|
|
+ String diagnostic = "";
|
|
|
+ if (appsFinishedEvent.getReason() == CMgrCompletedAppsEvent.Reason.ON_SHUTDOWN) {
|
|
|
+ diagnostic = "Application killed on shutdown";
|
|
|
+ } else if (appsFinishedEvent.getReason() == CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER) {
|
|
|
+ diagnostic = "Application killed by ResourceManager";
|
|
|
+ }
|
|
|
this.dispatcher.getEventHandler().handle(
|
|
|
new ApplicationFinishEvent(appID,
|
|
|
- "Application Killed by ResourceManager"));
|
|
|
+ diagnostic));
|
|
|
}
|
|
|
break;
|
|
|
case FINISH_CONTAINERS:
|
|
@@ -737,20 +845,14 @@ public class ContainerManagerImpl extends CompositeService implements
|
|
|
(CMgrCompletedContainersEvent) event;
|
|
|
for (ContainerId container : containersFinishedEvent
|
|
|
.getContainersToCleanup()) {
|
|
|
- String diagnostic = "";
|
|
|
- if (containersFinishedEvent.getReason() ==
|
|
|
- CMgrCompletedContainersEvent.Reason.ON_SHUTDOWN) {
|
|
|
- diagnostic = "Container Killed on Shutdown";
|
|
|
- } else if (containersFinishedEvent.getReason() ==
|
|
|
- CMgrCompletedContainersEvent.Reason.BY_RESOURCEMANAGER) {
|
|
|
- diagnostic = "Container Killed by ResourceManager";
|
|
|
- }
|
|
|
- this.dispatcher.getEventHandler().handle(
|
|
|
- new ContainerKillEvent(container, diagnostic));
|
|
|
+ this.dispatcher.getEventHandler().handle(
|
|
|
+ new ContainerKillEvent(container,
|
|
|
+ "Container Killed by ResourceManager"));
|
|
|
}
|
|
|
break;
|
|
|
default:
|
|
|
- LOG.warn("Invalid event " + event.getType() + ". Ignoring.");
|
|
|
+ throw new YarnRuntimeException(
|
|
|
+ "Get an unknown ContainerManagerEvent type: " + event.getType());
|
|
|
}
|
|
|
}
|
|
|
|