@@ -79,7 +79,9 @@ import org.apache.hadoop.mapred.QueueManager.QueueACL;
import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
import org.apache.hadoop.mapreduce.ClusterMetrics;
import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.security.token.DelegationTokenRenewal;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenSecretManager;
@@ -205,7 +207,6 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
volatile State state = State.INITIALIZING;
private static final int FS_ACCESS_RETRY_PERIOD = 1000;
static final String JOB_INFO_FILE = "job-info";
- static final String JOB_TOKEN_FILE = "jobToken";
private DNSToSwitchMapping dnsToSwitchMapping;
private NetworkTopology clusterMap;
private int numTaskCacheLevels; // the max level to which we cache tasks
@@ -3579,10 +3580,26 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
new Path(jobSubmitDir));
}
 
+ // Store the job-info in a file so that the job can be recovered
+ // later, if recovery is ever needed
+ // Note: jobDir & jobInfo are owned by the JT user, since we are using
+ // the JT's fs object to write them
+ if (!recovered) {
+ Path jobDir = getSystemDirectoryForJob(jobId);
+ FileSystem.mkdirs(fs, jobDir, new FsPermission(SYSTEM_DIR_PERMISSION));
+ FSDataOutputStream out = fs.create(getSystemFileForJob(jobId));
+ jobInfo.write(out);
+ out.close();
+ }
+
// Create the JobInProgress, do not lock the JobTracker since
- // we are about to copy job.xml from HDFS
+ // we are about to copy job.xml from HDFS and write jobToken file to HDFS
JobInProgress job = null;
try {
+ if (ts == null) {
+ ts = new Credentials();
+ }
+ generateAndStoreJobTokens(jobId, ts);
job = new JobInProgress(this, this.conf, jobInfo, 0, ts);
} catch (Exception e) {
throw new IOException(e);
@@ -3618,16 +3635,6 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
throw ioe;
}
 
- if (!recovered) {
- // Store the information in a file so that the job can be recovered
- // later (if at all)
- Path jobDir = getSystemDirectoryForJob(jobId);
- FileSystem.mkdirs(fs, jobDir, new FsPermission(SYSTEM_DIR_PERMISSION));
- FSDataOutputStream out = fs.create(getSystemFileForJob(jobId));
- jobInfo.write(out);
- out.close();
- }
-
try {
this.taskScheduler.checkJobSubmission(job);
} catch (IOException ioe){
@@ -4346,12 +4353,13 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
 
//Get the job token file in system directory
Path getSystemFileForJob(JobID id) {
- return new Path(getSystemDirectoryForJob(id)+"/" + JOB_INFO_FILE);
+ return new Path(getSystemDirectoryForJob(id), JOB_INFO_FILE);
}
 
//Get the job token file in system directory
Path getTokenFileForJob(JobID id) {
- return new Path(getSystemDirectoryForJob(id)+"/" + JOB_TOKEN_FILE);
+ return new Path(
+ getSystemDirectoryForJob(id), TokenCache.JOB_TOKEN_HDFS_FILE);
}
 
/**
@@ -5184,4 +5192,47 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
}
}
 
+ /**
+ * Generate the job token for this job and store it, together with the
+ * user's credentials, in the job-token file in the job's system directory.
+ * @throws IOException if the token file cannot be written
+ */
+ private void
+ generateAndStoreJobTokens(final JobID jobId, final Credentials tokenStorage)
+ throws IOException {
+
+ // Write out jobToken as JT user
+ try {
+ getMROwner().doAs(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws IOException {
+
+ Path jobDir = getSystemDirectoryForJob(jobId);
+ Path keysFile = new Path(jobDir, TokenCache.JOB_TOKEN_HDFS_FILE);
+ // Create and sign the job token for this job
+ JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(jobId
+ .toString()));
+ Token<JobTokenIdentifier> token =
+ new Token<JobTokenIdentifier>(
+ identifier, getJobTokenSecretManager());
+ token.setService(identifier.getJobId());
+
+ TokenCache.setJobToken(token, tokenStorage);
+
+ // Write the credentials, including the new job token, out to the token file
+ tokenStorage.writeTokenStorageFile(keysFile, getConf());
+ LOG.info("jobToken generated and stored with the user's keys in "
+ + keysFile.toUri().getPath());
+
+ return null;
+
+ }
+ });
+ } catch (InterruptedException ie) {
+ // Re-wrap the InterruptedException so callers only have to handle IOException
+ throw new IOException(ie);
+ }
+
+ }
+
}
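
The file written by generateAndStoreJobTokens() above is an ordinary Credentials
token-storage file (it is produced with Credentials.writeTokenStorageFile), so it
can be read back with the symmetric Credentials/TokenCache calls. The stand-alone
sketch below is illustrative only and is not part of the patch; the class name and
the caller-supplied path are assumptions.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;

public class JobTokenFileSketch {

  /**
   * Load the credentials stored for a job and return its job token.
   * jobTokenFile is assumed to be the path returned by
   * JobTracker.getTokenFileForJob(JobID), i.e. the job's system directory
   * plus TokenCache.JOB_TOKEN_HDFS_FILE.
   */
  public static Token<? extends TokenIdentifier> readJobToken(
      Path jobTokenFile, Configuration conf) throws IOException {
    // Symmetric to tokenStorage.writeTokenStorageFile(keysFile, conf) above:
    // recovers the job token plus any user credentials stored alongside it.
    Credentials ts = Credentials.readTokenStorageFile(jobTokenFile, conf);
    return TokenCache.getJobToken(ts);
  }
}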