|
@@ -43,6 +43,7 @@ import java.util.concurrent.BlockingQueue;
|
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
|
import java.util.regex.Pattern;
|
|
|
|
|
|
+import javax.crypto.SecretKey;
|
|
|
import javax.servlet.ServletContext;
|
|
|
import javax.servlet.ServletException;
|
|
|
import javax.servlet.http.HttpServlet;
|
|
@@ -74,8 +75,9 @@ import org.apache.hadoop.mapred.TaskStatus.Phase;
|
|
|
import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
|
|
|
import org.apache.hadoop.mapred.pipes.Submitter;
|
|
|
import org.apache.hadoop.mapreduce.TaskType;
|
|
|
-import org.apache.hadoop.mapreduce.security.JobTokens;
|
|
|
import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
|
|
|
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
|
|
|
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
|
|
|
import org.apache.hadoop.metrics.MetricsContext;
|
|
|
import org.apache.hadoop.metrics.MetricsException;
|
|
|
import org.apache.hadoop.metrics.MetricsRecord;
|
|
@@ -90,6 +92,7 @@ import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
|
|
|
import org.apache.hadoop.util.DiskChecker;
|
|
|
import org.apache.hadoop.util.MemoryCalculatorPlugin;
|
|
|
import org.apache.hadoop.util.ProcfsBasedProcessTree;
|
|
|
+import org.apache.hadoop.security.token.Token;
|
|
|
import org.apache.hadoop.util.ReflectionUtils;
|
|
|
import org.apache.hadoop.util.RunJar;
|
|
|
import org.apache.hadoop.util.StringUtils;
|
|
@@ -189,6 +192,13 @@ public class TaskTracker
|
|
|
*/
|
|
|
Map<TaskAttemptID, TaskInProgress> runningTasks = null;
|
|
|
Map<JobID, RunningJob> runningJobs = new TreeMap<JobID, RunningJob>();
|
|
|
+ private final JobTokenSecretManager jobTokenSecretManager
|
|
|
+ = new JobTokenSecretManager();
|
|
|
+
|
|
|
+ JobTokenSecretManager getJobTokenSecretManager() {
|
|
|
+ return jobTokenSecretManager;
|
|
|
+ }
|
|
|
+
|
|
|
volatile int mapTotal = 0;
|
|
|
volatile int reduceTotal = 0;
|
|
|
boolean justStarted = true;
|
|
@@ -912,9 +922,9 @@ public class TaskTracker
|
|
|
localizeJobTokenFile(t.getUser(), jobId, localJobConf);
|
|
|
FSDataInputStream in = localFs.open(new Path(
|
|
|
rjob.jobConf.get(JobContext.JOB_TOKEN_FILE)));
|
|
|
- JobTokens jt = new JobTokens();
|
|
|
+ Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>();
|
|
|
jt.readFields(in);
|
|
|
- rjob.jobTokens = jt; // store JobToken object per job
|
|
|
+ getJobTokenSecretManager().addTokenForJob(jobId.toString(), jt);
|
|
|
|
|
|
rjob.localized = true;
|
|
|
taskController.initializeJob(jobId);
|
|
@@ -1544,7 +1554,7 @@ public class TaskTracker
|
|
|
synchronized(runningJobs) {
|
|
|
runningJobs.remove(jobId);
|
|
|
}
|
|
|
-
|
|
|
+ getJobTokenSecretManager().removeTokenForJob(jobId.toString());
|
|
|
}
|
|
|
|
|
|
|
|
@@ -2911,7 +2921,6 @@ public class TaskTracker
|
|
|
boolean localized;
|
|
|
boolean keepJobFiles;
|
|
|
FetchStatus f;
|
|
|
- JobTokens jobTokens;
|
|
|
RunningJob(JobID jobid) {
|
|
|
this.jobid = jobid;
|
|
|
localized = false;
|
|
@@ -3221,14 +3230,8 @@ public class TaskTracker
|
|
|
private void verifyRequest(HttpServletRequest request,
|
|
|
HttpServletResponse response, TaskTracker tracker, String jobId)
|
|
|
throws IOException {
|
|
|
- JobTokens jt = null;
|
|
|
- synchronized (tracker.runningJobs) {
|
|
|
- RunningJob rjob = tracker.runningJobs.get(JobID.forName(jobId));
|
|
|
- if (rjob == null) {
|
|
|
- throw new IOException("Unknown job " + jobId + "!!");
|
|
|
- }
|
|
|
- jt = rjob.jobTokens;
|
|
|
- }
|
|
|
+ SecretKey tokenSecret = tracker.getJobTokenSecretManager()
|
|
|
+ .retrieveTokenSecret(jobId);
|
|
|
// string to encrypt
|
|
|
String enc_str = SecureShuffleUtils.buildMsgFrom(request);
|
|
|
|
|
@@ -3242,17 +3245,16 @@ public class TaskTracker
|
|
|
LOG.debug("verifying request. enc_str="+enc_str+"; hash=..."+
|
|
|
urlHashStr.substring(len-len/2, len-1)); // half of the hash for debug
|
|
|
|
|
|
- SecureShuffleUtils ssutil = new SecureShuffleUtils(jt.getShuffleJobToken());
|
|
|
// verify - throws exception
|
|
|
try {
|
|
|
- ssutil.verifyReply(urlHashStr, enc_str);
|
|
|
+ SecureShuffleUtils.verifyReply(urlHashStr, enc_str, tokenSecret);
|
|
|
} catch (IOException ioe) {
|
|
|
response.sendError(HttpServletResponse.SC_UNAUTHORIZED);
|
|
|
throw ioe;
|
|
|
}
|
|
|
|
|
|
// verification passed - encode the reply
|
|
|
- String reply = ssutil.generateHash(urlHashStr.getBytes());
|
|
|
+ String reply = SecureShuffleUtils.generateHash(urlHashStr.getBytes(), tokenSecret);
|
|
|
response.addHeader(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH, reply);
|
|
|
|
|
|
len = reply.length();
|
|
@@ -3499,7 +3501,7 @@ public class TaskTracker
|
|
|
throws IOException {
|
|
|
// check if the tokenJob file is there..
|
|
|
Path skPath = new Path(systemDirectory,
|
|
|
- jobId.toString()+"/"+JobTokens.JOB_TOKEN_FILENAME);
|
|
|
+ jobId.toString()+"/"+SecureShuffleUtils.JOB_TOKEN_FILENAME);
|
|
|
|
|
|
FileStatus status = null;
|
|
|
long jobTokenSize = -1;
|