|
@@ -95,6 +95,7 @@ import org.apache.hadoop.util.DiskChecker;
|
|
|
import org.apache.hadoop.util.MemoryCalculatorPlugin;
|
|
|
import org.apache.hadoop.util.ProcfsBasedProcessTree;
|
|
|
import org.apache.hadoop.security.token.Token;
|
|
|
+import org.apache.hadoop.security.token.TokenIdentifier;
|
|
|
import org.apache.hadoop.util.ReflectionUtils;
|
|
|
import org.apache.hadoop.util.RunJar;
|
|
|
import org.apache.hadoop.util.StringUtils;
|
|
@@ -102,7 +103,7 @@ import org.apache.hadoop.util.VersionInfo;
|
|
|
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
|
|
|
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
|
|
|
import org.apache.hadoop.mapreduce.security.TokenCache;
|
|
|
-import org.apache.hadoop.mapreduce.security.TokenStorage;
|
|
|
+import org.apache.hadoop.security.TokenStorage;
|
|
|
|
|
|
/*******************************************************
|
|
|
* TaskTracker is a process that starts and tracks MR Tasks
|
|
@@ -205,6 +206,10 @@ public class TaskTracker
|
|
|
return jobTokenSecretManager;
|
|
|
}
|
|
|
|
|
|
+ RunningJob getRunningJob(JobID jobId) {
|
|
|
+ return runningJobs.get(jobId);
|
|
|
+ }
|
|
|
+
|
|
|
volatile int mapTotal = 0;
|
|
|
volatile int reduceTotal = 0;
|
|
|
boolean justStarted = true;
|
|
@@ -543,10 +548,11 @@ public class TaskTracker
|
|
|
return getLocalJobDir(user, jobid) + Path.SEPARATOR + TaskTracker.JOB_TOKEN_FILE;
|
|
|
}
|
|
|
|
|
|
- private FileSystem getFS(final Path filePath, String user,
|
|
|
+ private FileSystem getFS(final Path filePath, JobID jobId,
|
|
|
final Configuration conf) throws IOException, InterruptedException {
|
|
|
- UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
|
|
|
- FileSystem userFs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
|
|
|
+ RunningJob rJob = runningJobs.get(jobId);
|
|
|
+ FileSystem userFs =
|
|
|
+ rJob.ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
|
|
|
public FileSystem run() throws IOException {
|
|
|
return filePath.getFileSystem(conf);
|
|
|
}});
|
|
@@ -940,7 +946,7 @@ public class TaskTracker
|
|
|
|
|
|
synchronized (rjob) {
|
|
|
if (!rjob.localized) {
|
|
|
- JobConf localJobConf = localizeJobFiles(t);
|
|
|
+ JobConf localJobConf = localizeJobFiles(t, rjob);
|
|
|
|
|
|
// Now initialize the job via task-controller so as to set
|
|
|
// ownership/permissions of jars, job-work-dir. Note that initializeJob
|
|
@@ -955,11 +961,6 @@ public class TaskTracker
|
|
|
rjob.jobConf = localJobConf;
|
|
|
rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) ||
|
|
|
localJobConf.getKeepFailedTaskFiles());
|
|
|
-
|
|
|
- TokenStorage ts = TokenCache.loadTokens(rjob.jobConf.get(TokenCache.JOB_TOKEN_FILENAME), rjob.jobConf);
|
|
|
-
|
|
|
- Token<JobTokenIdentifier> jt = (Token<JobTokenIdentifier>)ts.getJobToken();
|
|
|
- getJobTokenSecretManager().addTokenForJob(jobId.toString(), jt);
|
|
|
|
|
|
rjob.localized = true;
|
|
|
}
|
|
@@ -983,18 +984,35 @@ public class TaskTracker
|
|
|
* job as a starting point.
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- JobConf localizeJobFiles(Task t)
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ JobConf localizeJobFiles(Task t, RunningJob rjob)
|
|
|
throws IOException, InterruptedException {
|
|
|
JobID jobId = t.getJobID();
|
|
|
|
|
|
Path jobFile = new Path(t.getJobFile());
|
|
|
String userName = t.getUser();
|
|
|
JobConf userConf = new JobConf(getJobConf());
|
|
|
- FileSystem userFs = getFS(jobFile, userName, userConf);
|
|
|
-
|
|
|
+
|
|
|
// Initialize the job directories first
|
|
|
FileSystem localFs = FileSystem.getLocal(fConf);
|
|
|
getLocalizer().initializeJobDirs(userName, jobId);
|
|
|
+
|
|
|
+ // save local copy of JobToken file
|
|
|
+ String localJobTokenFile = localizeJobTokenFile(t.getUser(), jobId);
|
|
|
+ rjob.ugi = UserGroupInformation.createRemoteUser(t.getUser());
|
|
|
+
|
|
|
+
|
|
|
+ TokenStorage ts = TokenCache.loadTokens(localJobTokenFile, fConf);
|
|
|
+ Token<JobTokenIdentifier> jt =
|
|
|
+ (Token<JobTokenIdentifier>)TokenCache.getJobToken(ts);
|
|
|
+ if (jt != null) { //could be null in the case of some unit tests
|
|
|
+ getJobTokenSecretManager().addTokenForJob(jobId.toString(), jt);
|
|
|
+ }
|
|
|
+ for (Token<? extends TokenIdentifier> token : ts.getAllTokens()) {
|
|
|
+ rjob.ugi.addToken(token);
|
|
|
+ }
|
|
|
+
|
|
|
+ FileSystem userFs = getFS(jobFile, jobId, userConf);
|
|
|
|
|
|
// Download the job.xml for this job from the system FS
|
|
|
Path localJobFile =
|
|
@@ -1004,6 +1022,11 @@ public class TaskTracker
|
|
|
//WE WILL TRUST THE USERNAME THAT WE GOT FROM THE JOBTRACKER
|
|
|
//AS PART OF THE TASK OBJECT
|
|
|
localJobConf.setUser(userName);
|
|
|
+
|
|
|
+ // set the location of the token file into jobConf to transfer
|
|
|
+ // the name to TaskRunner
|
|
|
+ localJobConf.set(TokenCache.JOB_TOKENS_FILENAME,
|
|
|
+ localJobTokenFile.toString());
|
|
|
// create the 'job-work' directory: job-specific shared directory for use as
|
|
|
// scratch space by all tasks of the same job running on this TaskTracker.
|
|
|
Path workDir =
|
|
@@ -1019,9 +1042,6 @@ public class TaskTracker
|
|
|
// Download the job.jar for this job from the system FS
|
|
|
localizeJobJarFile(userName, jobId, userFs, localJobConf);
|
|
|
|
|
|
- // save local copy of JobToken file
|
|
|
- localizeJobTokenFile(userName, jobId, localJobConf);
|
|
|
-
|
|
|
return localJobConf;
|
|
|
}
|
|
|
|
|
@@ -3119,6 +3139,7 @@ public class TaskTracker
|
|
|
volatile Set<TaskInProgress> tasks;
|
|
|
boolean localized;
|
|
|
boolean keepJobFiles;
|
|
|
+ UserGroupInformation ugi;
|
|
|
FetchStatus f;
|
|
|
RunningJob(JobID jobid) {
|
|
|
this.jobid = jobid;
|
|
@@ -3131,6 +3152,10 @@ public class TaskTracker
|
|
|
return jobid;
|
|
|
}
|
|
|
|
|
|
+ UserGroupInformation getUGI() {
|
|
|
+ return ugi;
|
|
|
+ }
|
|
|
+
|
|
|
void setFetchStatus(FetchStatus f) {
|
|
|
this.f = f;
|
|
|
}
|
|
@@ -3701,11 +3726,10 @@ public class TaskTracker
|
|
|
* Download the job-token file from the FS and save on local fs.
|
|
|
* @param user
|
|
|
* @param jobId
|
|
|
- * @param jobConf
|
|
|
* @return the local file system path of the downloaded file.
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- private void localizeJobTokenFile(String user, JobID jobId, JobConf jobConf)
|
|
|
+ private String localizeJobTokenFile(String user, JobID jobId)
|
|
|
throws IOException {
|
|
|
// check if the tokenJob file is there..
|
|
|
Path skPath = new Path(systemDirectory,
|
|
@@ -3720,13 +3744,14 @@ public class TaskTracker
|
|
|
lDirAlloc.getLocalPathForWrite(getLocalJobTokenFile(user,
|
|
|
jobId.toString()), jobTokenSize, fConf);
|
|
|
|
|
|
- LOG.debug("localizingJobTokenFile from sd="+skPath.toUri().getPath() +
|
|
|
- " to " + localJobTokenFile.toUri().getPath());
|
|
|
+ String localJobTokenFileStr = localJobTokenFile.toUri().getPath();
|
|
|
+ if(LOG.isDebugEnabled())
|
|
|
+ LOG.debug("localizingJobTokenFile from sd="+skPath.toUri().getPath() +
|
|
|
+ " to " + localJobTokenFileStr);
|
|
|
|
|
|
// Download job_token
|
|
|
systemFS.copyToLocalFile(skPath, localJobTokenFile);
|
|
|
- // set it into jobConf to transfer the name to TaskRunner
|
|
|
- jobConf.set(TokenCache.JOB_TOKEN_FILENAME, localJobTokenFile.toString());
|
|
|
+ return localJobTokenFileStr;
|
|
|
}
|
|
|
|
|
|
}
|