|
@@ -24,7 +24,6 @@ import java.net.InetSocketAddress;
|
|
import java.net.UnknownHostException;
|
|
import java.net.UnknownHostException;
|
|
import java.nio.ByteBuffer;
|
|
import java.nio.ByteBuffer;
|
|
import java.util.ArrayList;
|
|
import java.util.ArrayList;
|
|
-import java.util.Collection;
|
|
|
|
import java.util.EnumSet;
|
|
import java.util.EnumSet;
|
|
import java.util.HashMap;
|
|
import java.util.HashMap;
|
|
import java.util.HashSet;
|
|
import java.util.HashSet;
|
|
@@ -102,7 +101,6 @@ import org.apache.hadoop.net.NetUtils;
|
|
import org.apache.hadoop.security.Credentials;
|
|
import org.apache.hadoop.security.Credentials;
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
import org.apache.hadoop.security.token.Token;
|
|
import org.apache.hadoop.security.token.Token;
|
|
-import org.apache.hadoop.security.token.TokenIdentifier;
|
|
|
|
import org.apache.hadoop.util.StringUtils;
|
|
import org.apache.hadoop.util.StringUtils;
|
|
import org.apache.hadoop.yarn.Clock;
|
|
import org.apache.hadoop.yarn.Clock;
|
|
import org.apache.hadoop.yarn.YarnException;
|
|
import org.apache.hadoop.yarn.YarnException;
|
|
@@ -157,7 +155,7 @@ public abstract class TaskAttemptImpl implements
|
|
private final Lock readLock;
|
|
private final Lock readLock;
|
|
private final Lock writeLock;
|
|
private final Lock writeLock;
|
|
private final AppContext appContext;
|
|
private final AppContext appContext;
|
|
- private Collection<Token<? extends TokenIdentifier>> fsTokens;
|
|
|
|
|
|
+ private Credentials credentials;
|
|
private Token<JobTokenIdentifier> jobToken;
|
|
private Token<JobTokenIdentifier> jobToken;
|
|
private static AtomicBoolean initialClasspathFlag = new AtomicBoolean();
|
|
private static AtomicBoolean initialClasspathFlag = new AtomicBoolean();
|
|
private static String initialClasspath = null;
|
|
private static String initialClasspath = null;
|
|
@@ -458,7 +456,7 @@ public abstract class TaskAttemptImpl implements
|
|
TaskAttemptListener taskAttemptListener, Path jobFile, int partition,
|
|
TaskAttemptListener taskAttemptListener, Path jobFile, int partition,
|
|
JobConf conf, String[] dataLocalHosts, OutputCommitter committer,
|
|
JobConf conf, String[] dataLocalHosts, OutputCommitter committer,
|
|
Token<JobTokenIdentifier> jobToken,
|
|
Token<JobTokenIdentifier> jobToken,
|
|
- Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock,
|
|
|
|
|
|
+ Credentials credentials, Clock clock,
|
|
AppContext appContext) {
|
|
AppContext appContext) {
|
|
oldJobId = TypeConverter.fromYarn(taskId.getJobId());
|
|
oldJobId = TypeConverter.fromYarn(taskId.getJobId());
|
|
this.conf = conf;
|
|
this.conf = conf;
|
|
@@ -477,7 +475,7 @@ public abstract class TaskAttemptImpl implements
|
|
readLock = readWriteLock.readLock();
|
|
readLock = readWriteLock.readLock();
|
|
writeLock = readWriteLock.writeLock();
|
|
writeLock = readWriteLock.writeLock();
|
|
|
|
|
|
- this.fsTokens = fsTokens;
|
|
|
|
|
|
+ this.credentials = credentials;
|
|
this.jobToken = jobToken;
|
|
this.jobToken = jobToken;
|
|
this.eventHandler = eventHandler;
|
|
this.eventHandler = eventHandler;
|
|
this.committer = committer;
|
|
this.committer = committer;
|
|
@@ -554,7 +552,7 @@ public abstract class TaskAttemptImpl implements
|
|
Map<ApplicationAccessType, String> applicationACLs, Configuration conf,
|
|
Map<ApplicationAccessType, String> applicationACLs, Configuration conf,
|
|
Token<JobTokenIdentifier> jobToken,
|
|
Token<JobTokenIdentifier> jobToken,
|
|
final org.apache.hadoop.mapred.JobID oldJobId,
|
|
final org.apache.hadoop.mapred.JobID oldJobId,
|
|
- Collection<Token<? extends TokenIdentifier>> fsTokens) {
|
|
|
|
|
|
+ Credentials credentials) {
|
|
|
|
|
|
// Application resources
|
|
// Application resources
|
|
Map<String, LocalResource> localResources =
|
|
Map<String, LocalResource> localResources =
|
|
@@ -567,7 +565,7 @@ public abstract class TaskAttemptImpl implements
|
|
Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
|
|
Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
|
|
|
|
|
|
// Tokens
|
|
// Tokens
|
|
- ByteBuffer tokens = ByteBuffer.wrap(new byte[]{});
|
|
|
|
|
|
+ ByteBuffer taskCredentialsBuffer = ByteBuffer.wrap(new byte[]{});
|
|
try {
|
|
try {
|
|
FileSystem remoteFS = FileSystem.get(conf);
|
|
FileSystem remoteFS = FileSystem.get(conf);
|
|
|
|
|
|
@@ -609,16 +607,14 @@ public abstract class TaskAttemptImpl implements
|
|
// Setup DistributedCache
|
|
// Setup DistributedCache
|
|
MRApps.setupDistributedCache(conf, localResources);
|
|
MRApps.setupDistributedCache(conf, localResources);
|
|
|
|
|
|
- // Setup up tokens
|
|
|
|
|
|
+ // Setup up task credentials buffer
|
|
Credentials taskCredentials = new Credentials();
|
|
Credentials taskCredentials = new Credentials();
|
|
|
|
|
|
if (UserGroupInformation.isSecurityEnabled()) {
|
|
if (UserGroupInformation.isSecurityEnabled()) {
|
|
- // Add file-system tokens
|
|
|
|
- for (Token<? extends TokenIdentifier> token : fsTokens) {
|
|
|
|
- LOG.info("Putting fs-token for NM use for launching container : "
|
|
|
|
- + token.toString());
|
|
|
|
- taskCredentials.addToken(token.getService(), token);
|
|
|
|
- }
|
|
|
|
|
|
+ LOG.info("Adding #" + credentials.numberOfTokens()
|
|
|
|
+ + " tokens and #" + credentials.numberOfSecretKeys()
|
|
|
|
+ + " secret keys for NM use for launching container");
|
|
|
|
+ taskCredentials.addAll(credentials);
|
|
}
|
|
}
|
|
|
|
|
|
// LocalStorageToken is needed irrespective of whether security is enabled
|
|
// LocalStorageToken is needed irrespective of whether security is enabled
|
|
@@ -629,7 +625,7 @@ public abstract class TaskAttemptImpl implements
|
|
LOG.info("Size of containertokens_dob is "
|
|
LOG.info("Size of containertokens_dob is "
|
|
+ taskCredentials.numberOfTokens());
|
|
+ taskCredentials.numberOfTokens());
|
|
taskCredentials.writeTokenStorageToStream(containerTokens_dob);
|
|
taskCredentials.writeTokenStorageToStream(containerTokens_dob);
|
|
- tokens =
|
|
|
|
|
|
+ taskCredentialsBuffer =
|
|
ByteBuffer.wrap(containerTokens_dob.getData(), 0,
|
|
ByteBuffer.wrap(containerTokens_dob.getData(), 0,
|
|
containerTokens_dob.getLength());
|
|
containerTokens_dob.getLength());
|
|
|
|
|
|
@@ -674,7 +670,8 @@ public abstract class TaskAttemptImpl implements
|
|
ContainerLaunchContext container = BuilderUtils
|
|
ContainerLaunchContext container = BuilderUtils
|
|
.newContainerLaunchContext(null, conf
|
|
.newContainerLaunchContext(null, conf
|
|
.get(MRJobConfig.USER_NAME), null, localResources,
|
|
.get(MRJobConfig.USER_NAME), null, localResources,
|
|
- environment, null, serviceData, tokens, applicationACLs);
|
|
|
|
|
|
+ environment, null, serviceData, taskCredentialsBuffer,
|
|
|
|
+ applicationACLs);
|
|
|
|
|
|
return container;
|
|
return container;
|
|
}
|
|
}
|
|
@@ -686,12 +683,12 @@ public abstract class TaskAttemptImpl implements
|
|
final org.apache.hadoop.mapred.JobID oldJobId,
|
|
final org.apache.hadoop.mapred.JobID oldJobId,
|
|
Resource assignedCapability, WrappedJvmID jvmID,
|
|
Resource assignedCapability, WrappedJvmID jvmID,
|
|
TaskAttemptListener taskAttemptListener,
|
|
TaskAttemptListener taskAttemptListener,
|
|
- Collection<Token<? extends TokenIdentifier>> fsTokens) {
|
|
|
|
|
|
+ Credentials credentials) {
|
|
|
|
|
|
synchronized (commonContainerSpecLock) {
|
|
synchronized (commonContainerSpecLock) {
|
|
if (commonContainerSpec == null) {
|
|
if (commonContainerSpec == null) {
|
|
commonContainerSpec = createCommonContainerLaunchContext(
|
|
commonContainerSpec = createCommonContainerLaunchContext(
|
|
- applicationACLs, conf, jobToken, oldJobId, fsTokens);
|
|
|
|
|
|
+ applicationACLs, conf, jobToken, oldJobId, credentials);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -1162,7 +1159,7 @@ public abstract class TaskAttemptImpl implements
|
|
taskAttempt.conf, taskAttempt.jobToken, taskAttempt.remoteTask,
|
|
taskAttempt.conf, taskAttempt.jobToken, taskAttempt.remoteTask,
|
|
taskAttempt.oldJobId, taskAttempt.assignedCapability,
|
|
taskAttempt.oldJobId, taskAttempt.assignedCapability,
|
|
taskAttempt.jvmID, taskAttempt.taskAttemptListener,
|
|
taskAttempt.jvmID, taskAttempt.taskAttemptListener,
|
|
- taskAttempt.fsTokens);
|
|
|
|
|
|
+ taskAttempt.credentials);
|
|
taskAttempt.eventHandler.handle(new ContainerRemoteLaunchEvent(
|
|
taskAttempt.eventHandler.handle(new ContainerRemoteLaunchEvent(
|
|
taskAttempt.attemptId, taskAttempt.containerID,
|
|
taskAttempt.attemptId, taskAttempt.containerID,
|
|
taskAttempt.containerMgrAddress, taskAttempt.containerToken,
|
|
taskAttempt.containerMgrAddress, taskAttempt.containerToken,
|