|
@@ -24,6 +24,7 @@ import java.io.InputStreamReader;
|
|
|
import java.net.InetSocketAddress;
|
|
|
import java.net.URI;
|
|
|
import java.net.URISyntaxException;
|
|
|
+import java.security.PrivilegedAction;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.List;
|
|
@@ -38,23 +39,22 @@ import org.apache.commons.cli.Options;
|
|
|
import org.apache.commons.cli.ParseException;
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
-
|
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
|
import org.apache.hadoop.classification.InterfaceStability;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.net.NetUtils;
|
|
|
+import org.apache.hadoop.security.UserGroupInformation;
|
|
|
+import org.apache.hadoop.security.token.Token;
|
|
|
import org.apache.hadoop.yarn.api.AMRMProtocol;
|
|
|
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
|
|
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
|
|
|
import org.apache.hadoop.yarn.api.ContainerExitStatus;
|
|
|
import org.apache.hadoop.yarn.api.ContainerManager;
|
|
|
-
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
|
|
|
-
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
|
import org.apache.hadoop.yarn.api.records.Container;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
@@ -74,7 +74,9 @@ import org.apache.hadoop.yarn.client.AMRMClientAsync;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
|
|
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
|
|
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
|
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
|
|
+import org.apache.hadoop.yarn.util.ProtoUtils;
|
|
|
import org.apache.hadoop.yarn.util.Records;
|
|
|
|
|
|
/**
|
|
@@ -663,10 +665,22 @@ public class ApplicationMaster {
|
|
|
+ container.getId());
|
|
|
String cmIpPortStr = container.getNodeId().getHost() + ":"
|
|
|
+ container.getNodeId().getPort();
|
|
|
- InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);
|
|
|
+ final InetSocketAddress cmAddress =
|
|
|
+ NetUtils.createSocketAddr(cmIpPortStr);
|
|
|
LOG.info("Connecting to ContainerManager at " + cmIpPortStr);
|
|
|
- this.cm = ((ContainerManager) rpc.getProxy(ContainerManager.class,
|
|
|
- cmAddress, conf));
|
|
|
+ UserGroupInformation ugi =
|
|
|
+ UserGroupInformation.createRemoteUser(container.getId().toString());
|
|
|
+ Token<ContainerTokenIdentifier> token =
|
|
|
+ ProtoUtils.convertFromProtoFormat(container.getContainerToken(),
|
|
|
+ cmAddress);
|
|
|
+ ugi.addToken(token);
|
|
|
+ this.cm = ugi.doAs(new PrivilegedAction<ContainerManager>() {
|
|
|
+ @Override
|
|
|
+ public ContainerManager run() {
|
|
|
+ return ((ContainerManager) rpc.getProxy(ContainerManager.class,
|
|
|
+ cmAddress, conf));
|
|
|
+ }
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
@Override
|