|
@@ -21,10 +21,10 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager;
|
|
|
import static org.apache.hadoop.yarn.service.Service.STATE.STARTED;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
-import java.net.InetAddress;
|
|
|
import java.net.InetSocketAddress;
|
|
|
import java.nio.ByteBuffer;
|
|
|
import java.util.Map;
|
|
|
+import java.util.Set;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
@@ -96,7 +96,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.Contai
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
|
|
|
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
|
|
-import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
|
|
|
import org.apache.hadoop.yarn.service.CompositeService;
|
|
|
import org.apache.hadoop.yarn.service.Service;
|
|
|
import org.apache.hadoop.yarn.service.ServiceStateChangeListener;
|
|
@@ -110,14 +109,12 @@ public class ContainerManagerImpl extends CompositeService implements
|
|
|
final Context context;
|
|
|
private final ContainersMonitor containersMonitor;
|
|
|
private Server server;
|
|
|
- private InetAddress resolvedAddress = null;
|
|
|
private final ResourceLocalizationService rsrcLocalizationSrvc;
|
|
|
private final ContainersLauncher containersLauncher;
|
|
|
private final AuxServices auxiliaryServices;
|
|
|
private final NodeManagerMetrics metrics;
|
|
|
|
|
|
private final NodeStatusUpdater nodeStatusUpdater;
|
|
|
- private ContainerTokenSecretManager containerTokenSecretManager;
|
|
|
|
|
|
private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
|
|
|
|
|
@@ -129,8 +126,7 @@ public class ContainerManagerImpl extends CompositeService implements
|
|
|
|
|
|
public ContainerManagerImpl(Context context, ContainerExecutor exec,
|
|
|
DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
|
|
|
- NodeManagerMetrics metrics, ContainerTokenSecretManager
|
|
|
- containerTokenSecretManager, ApplicationACLsManager aclsManager,
|
|
|
+ NodeManagerMetrics metrics, ApplicationACLsManager aclsManager,
|
|
|
LocalDirsHandlerService dirsHandler) {
|
|
|
super(ContainerManagerImpl.class.getName());
|
|
|
this.context = context;
|
|
@@ -149,7 +145,6 @@ public class ContainerManagerImpl extends CompositeService implements
|
|
|
addService(containersLauncher);
|
|
|
|
|
|
this.nodeStatusUpdater = nodeStatusUpdater;
|
|
|
- this.containerTokenSecretManager = containerTokenSecretManager;
|
|
|
this.aclsManager = aclsManager;
|
|
|
|
|
|
// Start configurable services
|
|
@@ -232,7 +227,7 @@ public class ContainerManagerImpl extends CompositeService implements
|
|
|
|
|
|
server =
|
|
|
rpc.getServer(ContainerManager.class, this, initialAddress, conf,
|
|
|
- this.containerTokenSecretManager,
|
|
|
+ this.context.getContainerTokenSecretManager(),
|
|
|
conf.getInt(YarnConfiguration.NM_CONTAINER_MGR_THREAD_COUNT,
|
|
|
YarnConfiguration.DEFAULT_NM_CONTAINER_MGR_THREAD_COUNT));
|
|
|
|
|
@@ -267,56 +262,78 @@ public class ContainerManagerImpl extends CompositeService implements
|
|
|
super.stop();
|
|
|
}
|
|
|
|
|
|
+ // Get the remoteUGI corresponding to the api call.
|
|
|
+ private UserGroupInformation getRemoteUgi(String containerIDStr)
|
|
|
+ throws YarnRemoteException {
|
|
|
+ UserGroupInformation remoteUgi;
|
|
|
+ try {
|
|
|
+ remoteUgi = UserGroupInformation.getCurrentUser();
|
|
|
+ } catch (IOException e) {
|
|
|
+ String msg = "Cannot obtain the user-name for containerId: "
|
|
|
+ + containerIDStr + ". Got exception: "
|
|
|
+ + StringUtils.stringifyException(e);
|
|
|
+ LOG.warn(msg);
|
|
|
+ throw RPCUtil.getRemoteException(msg);
|
|
|
+ }
|
|
|
+ return remoteUgi;
|
|
|
+ }
|
|
|
+
|
|
|
+ // Obtain the needed ContainerTokenIdentifier from the remote-UGI. RPC layer
|
|
|
+ // currently sets only the required id, but iterate through anyways just to
|
|
|
+ // be sure.
|
|
|
+ private ContainerTokenIdentifier selectContainerTokenIdentifier(
|
|
|
+ UserGroupInformation remoteUgi) {
|
|
|
+ Set<TokenIdentifier> tokenIdentifiers = remoteUgi.getTokenIdentifiers();
|
|
|
+ ContainerTokenIdentifier resultId = null;
|
|
|
+ for (TokenIdentifier id : tokenIdentifiers) {
|
|
|
+ if (id instanceof ContainerTokenIdentifier) {
|
|
|
+ resultId = (ContainerTokenIdentifier) id;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return resultId;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Authorize the request.
|
|
|
*
|
|
|
- * @param containerID
|
|
|
+ * @param containerIDStr
|
|
|
* of the container
|
|
|
* @param launchContext
|
|
|
* passed if verifying the startContainer, null otherwise.
|
|
|
+ * @param remoteUgi
|
|
|
+ * ugi corresponding to the remote end making the api-call
|
|
|
* @throws YarnRemoteException
|
|
|
*/
|
|
|
- private void authorizeRequest(ContainerId containerID,
|
|
|
- ContainerLaunchContext launchContext) throws YarnRemoteException {
|
|
|
+ private void authorizeRequest(String containerIDStr,
|
|
|
+ ContainerLaunchContext launchContext, UserGroupInformation remoteUgi)
|
|
|
+ throws YarnRemoteException {
|
|
|
|
|
|
if (!UserGroupInformation.isSecurityEnabled()) {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- String containerIDStr = containerID.toString();
|
|
|
-
|
|
|
- UserGroupInformation remoteUgi;
|
|
|
- try {
|
|
|
- remoteUgi = UserGroupInformation.getCurrentUser();
|
|
|
- } catch (IOException e) {
|
|
|
- String msg = "Cannot obtain the user-name for containerId: "
|
|
|
- + containerIDStr + ". Got exception: "
|
|
|
- + StringUtils.stringifyException(e);
|
|
|
- LOG.warn(msg);
|
|
|
- throw RPCUtil.getRemoteException(msg);
|
|
|
- }
|
|
|
-
|
|
|
boolean unauthorized = false;
|
|
|
- StringBuilder messageBuilder = new StringBuilder(
|
|
|
- "Unauthorized request to start container. ");
|
|
|
+ StringBuilder messageBuilder =
|
|
|
+ new StringBuilder("Unauthorized request to start container. ");
|
|
|
|
|
|
if (!remoteUgi.getUserName().equals(containerIDStr)) {
|
|
|
unauthorized = true;
|
|
|
messageBuilder.append("\nExpected containerId: "
|
|
|
+ remoteUgi.getUserName() + " Found: " + containerIDStr);
|
|
|
- }
|
|
|
-
|
|
|
- if (launchContext != null) {
|
|
|
-
|
|
|
- // Verify other things for startContainer() request.
|
|
|
+ } else if (launchContext != null) {
|
|
|
+ // Verify other things also for startContainer() request.
|
|
|
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("Number of TokenIdentifiers in the UGI from RPC: "
|
|
|
- + remoteUgi.getTokenIdentifiers().size());
|
|
|
+ LOG.debug("Number of TokenIdentifiers in the UGI from RPC: "
|
|
|
+ + remoteUgi.getTokenIdentifiers().size());
|
|
|
}
|
|
|
- // We must and should get only one TokenIdentifier from the RPC.
|
|
|
- ContainerTokenIdentifier tokenId = (ContainerTokenIdentifier) remoteUgi
|
|
|
- .getTokenIdentifiers().iterator().next();
|
|
|
+
|
|
|
+
|
|
|
+ // Get the tokenId from the remote user ugi
|
|
|
+ ContainerTokenIdentifier tokenId =
|
|
|
+ selectContainerTokenIdentifier(remoteUgi);
|
|
|
+
|
|
|
if (tokenId == null) {
|
|
|
unauthorized = true;
|
|
|
messageBuilder
|
|
@@ -324,6 +341,15 @@ public class ContainerManagerImpl extends CompositeService implements
|
|
|
+ containerIDStr);
|
|
|
} else {
|
|
|
|
|
|
+ // Is the container being relaunched? Or RPC layer let startCall with
|
|
|
+ // tokens generated off old-secret through
|
|
|
+ if (!this.context.getContainerTokenSecretManager()
|
|
|
+ .isValidStartContainerRequest(tokenId)) {
|
|
|
+ unauthorized = true;
|
|
|
+ messageBuilder.append("\n Attempt to relaunch the same " +
|
|
|
+ "container with id " + containerIDStr + ".");
|
|
|
+ }
|
|
|
+
|
|
|
// Ensure the token is not expired.
|
|
|
// Token expiry is not checked for stopContainer/getContainerStatus
|
|
|
if (tokenId.getExpiryTimeStamp() < System.currentTimeMillis()) {
|
|
@@ -348,7 +374,7 @@ public class ContainerManagerImpl extends CompositeService implements
|
|
|
throw RPCUtil.getRemoteException(msg);
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* Start a container on this NodeManager.
|
|
|
*/
|
|
@@ -359,10 +385,13 @@ public class ContainerManagerImpl extends CompositeService implements
|
|
|
ContainerLaunchContext launchContext = request.getContainerLaunchContext();
|
|
|
|
|
|
ContainerId containerID = launchContext.getContainerId();
|
|
|
- authorizeRequest(containerID, launchContext);
|
|
|
+ String containerIDStr = containerID.toString();
|
|
|
+
|
|
|
+ UserGroupInformation remoteUgi = getRemoteUgi(containerIDStr);
|
|
|
+ authorizeRequest(containerIDStr, launchContext, remoteUgi);
|
|
|
|
|
|
- LOG.info("Start request for " + launchContext.getContainerId()
|
|
|
- + " by user " + launchContext.getUser());
|
|
|
+ LOG.info("Start request for " + containerIDStr + " by user "
|
|
|
+ + launchContext.getUser());
|
|
|
|
|
|
// //////////// Parse credentials
|
|
|
ByteBuffer tokens = launchContext.getContainerTokens();
|
|
@@ -394,14 +423,14 @@ public class ContainerManagerImpl extends CompositeService implements
|
|
|
AuditConstants.START_CONTAINER, "ContainerManagerImpl",
|
|
|
"Container already running on this node!",
|
|
|
applicationID, containerID);
|
|
|
- throw RPCUtil.getRemoteException("Container " + containerID
|
|
|
+ throw RPCUtil.getRemoteException("Container " + containerIDStr
|
|
|
+ " already is running on this node!!");
|
|
|
}
|
|
|
|
|
|
// Create the application
|
|
|
Application application =
|
|
|
new ApplicationImpl(dispatcher, this.aclsManager,
|
|
|
- launchContext.getUser(), applicationID, credentials, context);
|
|
|
+ launchContext.getUser(), applicationID, credentials, context);
|
|
|
if (null ==
|
|
|
context.getApplications().putIfAbsent(applicationID, application)) {
|
|
|
LOG.info("Creating a new application reference for app "
|
|
@@ -414,6 +443,12 @@ public class ContainerManagerImpl extends CompositeService implements
|
|
|
// TODO: Validate the request
|
|
|
dispatcher.getEventHandler().handle(
|
|
|
new ApplicationContainerInitEvent(container));
|
|
|
+ if (UserGroupInformation.isSecurityEnabled()) {
|
|
|
+ ContainerTokenIdentifier tokenId =
|
|
|
+ selectContainerTokenIdentifier(remoteUgi);
|
|
|
+ this.context.getContainerTokenSecretManager().startContainerSuccessful(
|
|
|
+ tokenId);
|
|
|
+ }
|
|
|
|
|
|
NMAuditLogger.logSuccess(launchContext.getUser(),
|
|
|
AuditConstants.START_CONTAINER, "ContainerManageImpl",
|
|
@@ -438,8 +473,12 @@ public class ContainerManagerImpl extends CompositeService implements
|
|
|
throws YarnRemoteException {
|
|
|
|
|
|
ContainerId containerID = request.getContainerId();
|
|
|
+ String containerIDStr = containerID.toString();
|
|
|
+
|
|
|
// TODO: Only the container's owner can kill containers today.
|
|
|
- authorizeRequest(containerID, null);
|
|
|
+
|
|
|
+ UserGroupInformation remoteUgi = getRemoteUgi(containerIDStr);
|
|
|
+ authorizeRequest(containerIDStr, null, remoteUgi);
|
|
|
|
|
|
StopContainerResponse response =
|
|
|
recordFactory.newRecordInstance(StopContainerResponse.class);
|
|
@@ -476,10 +515,14 @@ public class ContainerManagerImpl extends CompositeService implements
|
|
|
GetContainerStatusRequest request) throws YarnRemoteException {
|
|
|
|
|
|
ContainerId containerID = request.getContainerId();
|
|
|
+ String containerIDStr = containerID.toString();
|
|
|
+
|
|
|
// TODO: Only the container's owner can get containers' status today.
|
|
|
- authorizeRequest(containerID, null);
|
|
|
|
|
|
- LOG.info("Getting container-status for " + containerID);
|
|
|
+ UserGroupInformation remoteUgi = getRemoteUgi(containerIDStr);
|
|
|
+ authorizeRequest(containerIDStr, null, remoteUgi);
|
|
|
+
|
|
|
+ LOG.info("Getting container-status for " + containerIDStr);
|
|
|
Container container = this.context.getContainers().get(containerID);
|
|
|
if (container != null) {
|
|
|
ContainerStatus containerStatus = container.cloneAndGetContainerStatus();
|
|
@@ -490,7 +533,7 @@ public class ContainerManagerImpl extends CompositeService implements
|
|
|
return response;
|
|
|
}
|
|
|
|
|
|
- throw RPCUtil.getRemoteException("Container " + containerID
|
|
|
+ throw RPCUtil.getRemoteException("Container " + containerIDStr
|
|
|
+ " is not handled by this NodeManager");
|
|
|
}
|
|
|
|