|
@@ -74,6 +74,8 @@ import org.apache.hadoop.mapred.TaskStatus.Phase;
|
|
import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
|
|
import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
|
|
import org.apache.hadoop.mapred.pipes.Submitter;
|
|
import org.apache.hadoop.mapred.pipes.Submitter;
|
|
import org.apache.hadoop.mapreduce.TaskType;
|
|
import org.apache.hadoop.mapreduce.TaskType;
|
|
|
|
+import org.apache.hadoop.mapreduce.security.JobTokens;
|
|
|
|
+import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
|
|
import org.apache.hadoop.metrics.MetricsContext;
|
|
import org.apache.hadoop.metrics.MetricsContext;
|
|
import org.apache.hadoop.metrics.MetricsException;
|
|
import org.apache.hadoop.metrics.MetricsException;
|
|
import org.apache.hadoop.metrics.MetricsRecord;
|
|
import org.apache.hadoop.metrics.MetricsRecord;
|
|
@@ -186,7 +188,7 @@ public class TaskTracker
|
|
* Map from taskId -> TaskInProgress.
|
|
* Map from taskId -> TaskInProgress.
|
|
*/
|
|
*/
|
|
Map<TaskAttemptID, TaskInProgress> runningTasks = null;
|
|
Map<TaskAttemptID, TaskInProgress> runningTasks = null;
|
|
- Map<JobID, RunningJob> runningJobs = null;
|
|
|
|
|
|
+ Map<JobID, RunningJob> runningJobs = new TreeMap<JobID, RunningJob>();
|
|
volatile int mapTotal = 0;
|
|
volatile int mapTotal = 0;
|
|
volatile int reduceTotal = 0;
|
|
volatile int reduceTotal = 0;
|
|
boolean justStarted = true;
|
|
boolean justStarted = true;
|
|
@@ -211,6 +213,7 @@ public class TaskTracker
|
|
private static final String CACHEDIR = "archive";
|
|
private static final String CACHEDIR = "archive";
|
|
private static final String JOBCACHE = "jobcache";
|
|
private static final String JOBCACHE = "jobcache";
|
|
private static final String OUTPUT = "output";
|
|
private static final String OUTPUT = "output";
|
|
|
|
+ static final String JOB_TOKEN_FILE="jobToken"; //localized file
|
|
private JobConf originalConf;
|
|
private JobConf originalConf;
|
|
private JobConf fConf;
|
|
private JobConf fConf;
|
|
private int maxMapSlots;
|
|
private int maxMapSlots;
|
|
@@ -420,6 +423,10 @@ public class TaskTracker
|
|
this.taskLogsMonitor = t;
|
|
this.taskLogsMonitor = t;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ public static String getUserDir(String user) {
|
|
|
|
+ return TaskTracker.SUBDIR + Path.SEPARATOR + user;
|
|
|
|
+ }
|
|
|
|
+
|
|
static String getCacheSubdir() {
|
|
static String getCacheSubdir() {
|
|
return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.CACHEDIR;
|
|
return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.CACHEDIR;
|
|
}
|
|
}
|
|
@@ -428,6 +435,11 @@ public class TaskTracker
|
|
return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.JOBCACHE;
|
|
return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.JOBCACHE;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ public static String getLocalJobDir(String user, String jobid) {
|
|
|
|
+ return getUserDir(user) + Path.SEPARATOR + getJobCacheSubdir()
|
|
|
|
+ + Path.SEPARATOR + jobid;
|
|
|
|
+ }
|
|
|
|
+
|
|
static String getLocalJobDir(String jobid) {
|
|
static String getLocalJobDir(String jobid) {
|
|
return getJobCacheSubdir() + Path.SEPARATOR + jobid;
|
|
return getJobCacheSubdir() + Path.SEPARATOR + jobid;
|
|
}
|
|
}
|
|
@@ -440,6 +452,11 @@ public class TaskTracker
|
|
return getLocalTaskDir(jobid, taskid)
|
|
return getLocalTaskDir(jobid, taskid)
|
|
+ Path.SEPARATOR + TaskTracker.OUTPUT ;
|
|
+ Path.SEPARATOR + TaskTracker.OUTPUT ;
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ static String getLocalJobTokenFile(String user, String jobid) {
|
|
|
|
+ return getLocalJobDir(user, jobid) + Path.SEPARATOR + TaskTracker.JOB_TOKEN_FILE;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
|
|
static String getLocalTaskDir(String jobid,
|
|
static String getLocalTaskDir(String jobid,
|
|
String taskid,
|
|
String taskid,
|
|
@@ -890,8 +907,16 @@ public class TaskTracker
|
|
}
|
|
}
|
|
rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) ||
|
|
rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) ||
|
|
localJobConf.getKeepFailedTaskFiles());
|
|
localJobConf.getKeepFailedTaskFiles());
|
|
- rjob.localized = true;
|
|
|
|
rjob.jobConf = localJobConf;
|
|
rjob.jobConf = localJobConf;
|
|
|
|
+ // save local copy of JobToken file
|
|
|
|
+ localizeJobTokenFile(t.getUser(), jobId, localJobConf);
|
|
|
|
+ FSDataInputStream in = localFs.open(new Path(
|
|
|
|
+ rjob.jobConf.get(JobContext.JOB_TOKEN_FILE)));
|
|
|
|
+ JobTokens jt = new JobTokens();
|
|
|
|
+ jt.readFields(in);
|
|
|
|
+ rjob.jobTokens = jt; // store JobToken object per job
|
|
|
|
+
|
|
|
|
+ rjob.localized = true;
|
|
taskController.initializeJob(jobId);
|
|
taskController.initializeJob(jobId);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -2886,6 +2911,7 @@ public class TaskTracker
|
|
boolean localized;
|
|
boolean localized;
|
|
boolean keepJobFiles;
|
|
boolean keepJobFiles;
|
|
FetchStatus f;
|
|
FetchStatus f;
|
|
|
|
+ JobTokens jobTokens;
|
|
RunningJob(JobID jobid) {
|
|
RunningJob(JobID jobid) {
|
|
this.jobid = jobid;
|
|
this.jobid = jobid;
|
|
localized = false;
|
|
localized = false;
|
|
@@ -3073,6 +3099,8 @@ public class TaskTracker
|
|
TaskTracker tracker =
|
|
TaskTracker tracker =
|
|
(TaskTracker) context.getAttribute("task.tracker");
|
|
(TaskTracker) context.getAttribute("task.tracker");
|
|
|
|
|
|
|
|
+ verifyRequest(request, response, tracker, jobId);
|
|
|
|
+
|
|
long startTime = 0;
|
|
long startTime = 0;
|
|
try {
|
|
try {
|
|
shuffleMetrics.serverHandlerBusy();
|
|
shuffleMetrics.serverHandlerBusy();
|
|
@@ -3181,7 +3209,58 @@ public class TaskTracker
|
|
outStream.close();
|
|
outStream.close();
|
|
shuffleMetrics.successOutput();
|
|
shuffleMetrics.successOutput();
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * verify that request has correct HASH for the url
|
|
|
|
+ * and also add a field to reply header with hash of the HASH
|
|
|
|
+ * @param request
|
|
|
|
+ * @param response
|
|
|
|
+ * @param jt the job token
|
|
|
|
+ * @throws IOException
|
|
|
|
+ */
|
|
|
|
+ private void verifyRequest(HttpServletRequest request,
|
|
|
|
+ HttpServletResponse response, TaskTracker tracker, String jobId)
|
|
|
|
+ throws IOException {
|
|
|
|
+ JobTokens jt = null;
|
|
|
|
+ synchronized (tracker.runningJobs) {
|
|
|
|
+ RunningJob rjob = tracker.runningJobs.get(JobID.forName(jobId));
|
|
|
|
+ if (rjob == null) {
|
|
|
|
+ throw new IOException("Unknown job " + jobId + "!!");
|
|
|
|
+ }
|
|
|
|
+ jt = rjob.jobTokens;
|
|
|
|
+ }
|
|
|
|
+ // string to encrypt
|
|
|
|
+ String enc_str = SecureShuffleUtils.buildMsgFrom(request);
|
|
|
|
+
|
|
|
|
+ // hash from the fetcher
|
|
|
|
+ String urlHashStr = request.getHeader(SecureShuffleUtils.HTTP_HEADER_URL_HASH);
|
|
|
|
+ if(urlHashStr == null) {
|
|
|
|
+ response.sendError(HttpServletResponse.SC_UNAUTHORIZED);
|
|
|
|
+ throw new IOException("fetcher cannot be authenticated");
|
|
|
|
+ }
|
|
|
|
+ int len = urlHashStr.length();
|
|
|
|
+ LOG.debug("verifying request. enc_str="+enc_str+"; hash=..."+
|
|
|
|
+ urlHashStr.substring(len-len/2, len-1)); // half of the hash for debug
|
|
|
|
+
|
|
|
|
+ SecureShuffleUtils ssutil = new SecureShuffleUtils(jt.getShuffleJobToken());
|
|
|
|
+ // verify - throws exception
|
|
|
|
+ try {
|
|
|
|
+ ssutil.verifyReply(urlHashStr, enc_str);
|
|
|
|
+ } catch (IOException ioe) {
|
|
|
|
+ response.sendError(HttpServletResponse.SC_UNAUTHORIZED);
|
|
|
|
+ throw ioe;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // verification passed - encode the reply
|
|
|
|
+ String reply = ssutil.generateHash(urlHashStr.getBytes());
|
|
|
|
+ response.addHeader(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH, reply);
|
|
|
|
+
|
|
|
|
+ len = reply.length();
|
|
|
|
+ LOG.debug("Fetcher request verfied. enc_str="+enc_str+";reply="
|
|
|
|
+ +reply.substring(len-len/2, len-1));
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
|
|
// get the full paths of the directory in all the local disks.
|
|
// get the full paths of the directory in all the local disks.
|
|
Path[] getLocalFiles(JobConf conf, String subdir) throws IOException{
|
|
Path[] getLocalFiles(JobConf conf, String subdir) throws IOException{
|
|
@@ -3407,4 +3486,37 @@ public class TaskTracker
|
|
healthChecker = new NodeHealthCheckerService(conf);
|
|
healthChecker = new NodeHealthCheckerService(conf);
|
|
healthChecker.start();
|
|
healthChecker.start();
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Download the job-token file from the FS and save on local fs.
|
|
|
|
+ * @param user
|
|
|
|
+ * @param jobId
|
|
|
|
+ * @param jobConf
|
|
|
|
+ * @return the local file system path of the downloaded file.
|
|
|
|
+ * @throws IOException
|
|
|
|
+ */
|
|
|
|
+ private void localizeJobTokenFile(String user, JobID jobId, JobConf jobConf)
|
|
|
|
+ throws IOException {
|
|
|
|
+ // check if the tokenJob file is there..
|
|
|
|
+ Path skPath = new Path(systemDirectory,
|
|
|
|
+ jobId.toString()+"/"+JobTokens.JOB_TOKEN_FILENAME);
|
|
|
|
+
|
|
|
|
+ FileStatus status = null;
|
|
|
|
+ long jobTokenSize = -1;
|
|
|
|
+ status = systemFS.getFileStatus(skPath); //throws FileNotFoundException
|
|
|
|
+ jobTokenSize = status.getLen();
|
|
|
|
+
|
|
|
|
+ Path localJobTokenFile =
|
|
|
|
+ lDirAlloc.getLocalPathForWrite(getLocalJobTokenFile(user,
|
|
|
|
+ jobId.toString()), jobTokenSize, fConf);
|
|
|
|
+
|
|
|
|
+ LOG.debug("localizingJobTokenFile from sd="+skPath.toUri().getPath() +
|
|
|
|
+ " to " + localJobTokenFile.toUri().getPath());
|
|
|
|
+
|
|
|
|
+ // Download job_token
|
|
|
|
+ systemFS.copyToLocalFile(skPath, localJobTokenFile);
|
|
|
|
+ // set it into jobConf to transfer the name to TaskRunner
|
|
|
|
+ jobConf.set(JobContext.JOB_TOKEN_FILE,localJobTokenFile.toString());
|
|
|
|
+ }
|
|
|
|
+
|
|
}
|
|
}
|