|
@@ -205,6 +205,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
State state = State.INITIALIZING;
|
|
|
private static final int FS_ACCESS_RETRY_PERIOD = 10000;
|
|
|
static final String JOB_INFO_FILE = "job-info";
|
|
|
+ static final String JOB_TOKEN_FILE = "jobToken";
|
|
|
private DNSToSwitchMapping dnsToSwitchMapping;
|
|
|
private NetworkTopology clusterMap = new NetworkTopology();
|
|
|
private int numTaskCacheLevels; // the max level to which we cache tasks
|
|
@@ -1215,179 +1216,6 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
/** A custom listener that replays the events in the order in which the
|
|
|
* events (task attempts) occurred.
|
|
|
*/
|
|
|
- class JobRecoveryListener implements Listener {
|
|
|
- // The owner job
|
|
|
- private JobInProgress jip;
|
|
|
-
|
|
|
- private JobHistory.JobInfo job; // current job's info object
|
|
|
-
|
|
|
- // Maintain the count of the (attempt) events recovered
|
|
|
- private int numEventsRecovered = 0;
|
|
|
-
|
|
|
- // Maintains open transactions
|
|
|
- private Map<String, String> hangingAttempts =
|
|
|
- new HashMap<String, String>();
|
|
|
-
|
|
|
- // Whether there are any updates for this job
|
|
|
- private boolean hasUpdates = false;
|
|
|
-
|
|
|
- public JobRecoveryListener(JobInProgress jip) {
|
|
|
- this.jip = jip;
|
|
|
- this.job = new JobHistory.JobInfo(jip.getJobID().toString());
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Process a task. Note that a task might commit a previously pending
|
|
|
- * transaction.
|
|
|
- */
|
|
|
- private void processTask(String taskId, JobHistory.Task task) {
|
|
|
- // Any TASK info commits the previous transaction
|
|
|
- boolean hasHanging = hangingAttempts.remove(taskId) != null;
|
|
|
- if (hasHanging) {
|
|
|
- numEventsRecovered += 2;
|
|
|
- }
|
|
|
-
|
|
|
- TaskID id = TaskID.forName(taskId);
|
|
|
- TaskInProgress tip = getTip(id);
|
|
|
-
|
|
|
- updateTip(tip, task);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Adds a task-attempt in the listener
|
|
|
- */
|
|
|
- private void processTaskAttempt(String taskAttemptId,
|
|
|
- JobHistory.TaskAttempt attempt)
|
|
|
- throws UnknownHostException {
|
|
|
- TaskAttemptID id = TaskAttemptID.forName(taskAttemptId);
|
|
|
-
|
|
|
- // Check if the transaction for this attempt can be committed
|
|
|
- String taskStatus = attempt.get(Keys.TASK_STATUS);
|
|
|
- TaskAttemptID taskID = TaskAttemptID.forName(taskAttemptId);
|
|
|
- JobInProgress jip = getJob(taskID.getJobID());
|
|
|
- JobStatus prevStatus = (JobStatus)jip.getStatus().clone();
|
|
|
-
|
|
|
- if (taskStatus.length() > 0) {
|
|
|
- // This means this is an update event
|
|
|
- if (taskStatus.equals(Values.SUCCESS.name())) {
|
|
|
- // Mark this attempt as hanging
|
|
|
- hangingAttempts.put(id.getTaskID().toString(), taskAttemptId);
|
|
|
- addSuccessfulAttempt(jip, id, attempt);
|
|
|
- } else {
|
|
|
- addUnsuccessfulAttempt(jip, id, attempt);
|
|
|
- numEventsRecovered += 2;
|
|
|
- }
|
|
|
- } else {
|
|
|
- createTaskAttempt(jip, id, attempt);
|
|
|
- }
|
|
|
-
|
|
|
- JobStatus newStatus = (JobStatus)jip.getStatus().clone();
|
|
|
- if (prevStatus.getRunState() != newStatus.getRunState()) {
|
|
|
- if(LOG.isDebugEnabled())
|
|
|
- LOG.debug("Status changed hence informing prevStatus" + prevStatus + " currentStatus "+ newStatus);
|
|
|
- JobStatusChangeEvent event =
|
|
|
- new JobStatusChangeEvent(jip, EventType.RUN_STATE_CHANGED,
|
|
|
- prevStatus, newStatus);
|
|
|
- updateJobInProgressListeners(event);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- public void handle(JobHistory.RecordTypes recType, Map<Keys,
|
|
|
- String> values) throws IOException {
|
|
|
- if (recType == JobHistory.RecordTypes.Job) {
|
|
|
- // Update the meta-level job information
|
|
|
- job.handle(values);
|
|
|
-
|
|
|
- // Forcefully init the job as we have some updates for it
|
|
|
- checkAndInit();
|
|
|
- } else if (recType.equals(JobHistory.RecordTypes.Task)) {
|
|
|
- String taskId = values.get(Keys.TASKID);
|
|
|
-
|
|
|
- // Create a task
|
|
|
- JobHistory.Task task = new JobHistory.Task();
|
|
|
- task.handle(values);
|
|
|
-
|
|
|
- // Ignore if its a cleanup task
|
|
|
- if (isCleanup(task)) {
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- // Process the task i.e update the tip state
|
|
|
- processTask(taskId, task);
|
|
|
- } else if (recType.equals(JobHistory.RecordTypes.MapAttempt)) {
|
|
|
- String attemptId = values.get(Keys.TASK_ATTEMPT_ID);
|
|
|
-
|
|
|
- // Create a task attempt
|
|
|
- JobHistory.MapAttempt attempt = new JobHistory.MapAttempt();
|
|
|
- attempt.handle(values);
|
|
|
-
|
|
|
- // Ignore if its a cleanup task
|
|
|
- if (isCleanup(attempt)) {
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- // Process the attempt i.e update the attempt state via job
|
|
|
- processTaskAttempt(attemptId, attempt);
|
|
|
- } else if (recType.equals(JobHistory.RecordTypes.ReduceAttempt)) {
|
|
|
- String attemptId = values.get(Keys.TASK_ATTEMPT_ID);
|
|
|
-
|
|
|
- // Create a task attempt
|
|
|
- JobHistory.ReduceAttempt attempt = new JobHistory.ReduceAttempt();
|
|
|
- attempt.handle(values);
|
|
|
-
|
|
|
- // Ignore if its a cleanup task
|
|
|
- if (isCleanup(attempt)) {
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- // Process the attempt i.e update the job state via job
|
|
|
- processTaskAttempt(attemptId, attempt);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- // Check if the task is of type CLEANUP
|
|
|
- private boolean isCleanup(JobHistory.Task task) {
|
|
|
- String taskType = task.get(Keys.TASK_TYPE);
|
|
|
- return Values.CLEANUP.name().equals(taskType);
|
|
|
- }
|
|
|
-
|
|
|
- // Init the job if its ready for init. Also make sure that the scheduler
|
|
|
- // is updated
|
|
|
- private void checkAndInit() throws IOException {
|
|
|
- String jobStatus = this.job.get(Keys.JOB_STATUS);
|
|
|
- if (Values.PREP.name().equals(jobStatus)) {
|
|
|
- hasUpdates = true;
|
|
|
- LOG.info("Calling init from RM for job " + jip.getJobID().toString());
|
|
|
- try {
|
|
|
- initJob(jip);
|
|
|
- } catch (Throwable t) {
|
|
|
- LOG.error("Job initialization failed : \n"
|
|
|
- + StringUtils.stringifyException(t));
|
|
|
- jip.status.setFailureInfo("Job Initialization failed: \n"
|
|
|
- + StringUtils.stringifyException(t));
|
|
|
- failJob(jip);
|
|
|
- throw new IOException(t);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- void close() {
|
|
|
- if (hasUpdates) {
|
|
|
- // Apply the final (job-level) updates
|
|
|
- JobStatusChangeEvent event = updateJob(jip, job);
|
|
|
-
|
|
|
- synchronized (JobTracker.this) {
|
|
|
- // Update the job listeners
|
|
|
- updateJobInProgressListeners(event);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- public int getNumEventsRecovered() {
|
|
|
- return numEventsRecovered;
|
|
|
- }
|
|
|
-
|
|
|
- }
|
|
|
|
|
|
public RecoveryManager() {
|
|
|
jobsToRecover = new TreeSet<JobID>();
|
|
@@ -1441,16 +1269,25 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
// checks if the job dir has the required files
|
|
|
public void checkAndAddJob(FileStatus status) throws IOException {
|
|
|
String fileName = status.getPath().getName();
|
|
|
- if (isJobNameValid(fileName)) {
|
|
|
- if (JobClient.isJobDirValid(status.getPath(), fs)) {
|
|
|
- recoveryManager.addJobForRecovery(JobID.forName(fileName));
|
|
|
- shouldRecover = true; // enable actual recovery if num-files > 1
|
|
|
- } else {
|
|
|
- LOG.info("Found an incomplete job directory " + fileName + "."
|
|
|
- + " Deleting it!!");
|
|
|
- fs.delete(status.getPath(), true);
|
|
|
- }
|
|
|
+ if (isJobNameValid(fileName) && isJobDirValid(JobID.forName(fileName))) {
|
|
|
+ recoveryManager.addJobForRecovery(JobID.forName(fileName));
|
|
|
+ shouldRecover = true; // enable actual recovery if num-files > 1
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private boolean isJobDirValid(JobID jobId) throws IOException {
|
|
|
+ boolean ret = false;
|
|
|
+ Path jobInfoFile = getSystemFileForJob(jobId);
|
|
|
+ final Path jobTokenFile = getTokenFileForJob(jobId);
|
|
|
+ JobConf job = new JobConf();
|
|
|
+ if (jobTokenFile.getFileSystem(job).exists(jobTokenFile)
|
|
|
+ && jobInfoFile.getFileSystem(job).exists(jobInfoFile)) {
|
|
|
+ ret = true;
|
|
|
+ } else {
|
|
|
+ LOG.warn("Job " + jobId
|
|
|
+ + " does not have valid info/token file so ignoring for recovery");
|
|
|
}
|
|
|
+ return ret;
|
|
|
}
|
|
|
|
|
|
private JobStatusChangeEvent updateJob(JobInProgress jip,
|
|
@@ -1713,11 +1550,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
fs.rename(tmpRestartFile, restartFile); // rename .rec to main file
|
|
|
} else {
|
|
|
// For the very first time the jobtracker will create a jobtracker.info
|
|
|
- // file. If the jobtracker has restarted then disable recovery as files'
|
|
|
- // needed for recovery are missing.
|
|
|
-
|
|
|
- // disable recovery if this is a restart
|
|
|
- shouldRecover = false;
|
|
|
+ // file.
|
|
|
+ // enable recovery if this is a restart
|
|
|
+ shouldRecover = true;
|
|
|
|
|
|
// write the jobtracker.info file
|
|
|
try {
|
|
@@ -1769,205 +1604,50 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
fs.rename(tmpRestartFile, restartFile);
|
|
|
}
|
|
|
|
|
|
- // mapred.JobID::forName returns
|
|
|
- @SuppressWarnings("unchecked") // mapreduce.JobID
|
|
|
public void recover() {
|
|
|
+ int recovered = 0;
|
|
|
+ long recoveryProcessStartTime = clock.getTime();
|
|
|
if (!shouldRecover()) {
|
|
|
// clean up jobs structure
|
|
|
jobsToRecover.clear();
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- LOG.info("Restart count of the jobtracker : " + restartCount);
|
|
|
-
|
|
|
- // I. Init the jobs and cache the recovered job history filenames
|
|
|
- Map<JobID, Path> jobHistoryFilenameMap = new HashMap<JobID, Path>();
|
|
|
- Iterator<JobID> idIter = jobsToRecover.iterator();
|
|
|
- JobInProgress job = null;
|
|
|
- File jobIdFile = null;
|
|
|
-
|
|
|
- // 0. Cleanup
|
|
|
- try {
|
|
|
- JobHistory.JobInfo.deleteConfFiles();
|
|
|
- } catch (IOException ioe) {
|
|
|
- LOG.info("Error in cleaning up job history folder", ioe);
|
|
|
- }
|
|
|
-
|
|
|
- while (idIter.hasNext()) {
|
|
|
- JobID id = idIter.next();
|
|
|
- LOG.info("Trying to recover details of job " + id);
|
|
|
+ LOG.info("Starting the recovery process for " + jobsToRecover.size()
|
|
|
+ + " jobs ...");
|
|
|
+ for (JobID jobId : jobsToRecover) {
|
|
|
+ LOG.info("Submitting job " + jobId);
|
|
|
try {
|
|
|
- // 1. Recover job owner and create JIP
|
|
|
- jobIdFile =
|
|
|
- new File(lDirAlloc.getLocalPathToRead(SUBDIR + "/" + id, conf).toString());
|
|
|
-
|
|
|
- String user = null;
|
|
|
- if (jobIdFile != null && jobIdFile.exists()) {
|
|
|
- LOG.info("File " + jobIdFile + " exists for job " + id);
|
|
|
- FileInputStream in = new FileInputStream(jobIdFile);
|
|
|
- BufferedReader reader = null;
|
|
|
- try {
|
|
|
- reader = new BufferedReader(new InputStreamReader(in));
|
|
|
- user = reader.readLine();
|
|
|
- LOG.info("Recovered user " + user + " for job " + id);
|
|
|
- } finally {
|
|
|
- if (reader != null) {
|
|
|
- reader.close();
|
|
|
+ Path jobInfoFile = getSystemFileForJob(jobId);
|
|
|
+ final Path jobTokenFile = getTokenFileForJob(jobId);
|
|
|
+ FSDataInputStream in = fs.open(jobInfoFile);
|
|
|
+ final JobInfo token = new JobInfo();
|
|
|
+ token.readFields(in);
|
|
|
+ in.close();
|
|
|
+ final UserGroupInformation ugi = UserGroupInformation
|
|
|
+ .createRemoteUser(token.getUser().toString());
|
|
|
+ ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
|
|
|
+ public JobStatus run() throws IOException, InterruptedException {
|
|
|
+ Credentials ts = null;
|
|
|
+ JobConf job = new JobConf();
|
|
|
+ if (jobTokenFile.getFileSystem(job).exists(jobTokenFile)) {
|
|
|
+ ts = Credentials.readTokenStorageFile(jobTokenFile, job);
|
|
|
}
|
|
|
- in.close();
|
|
|
+ return submitJob(JobID.downgrade(token.getJobID()), token
|
|
|
+ .getJobSubmitDir().toString(), ugi, ts, true);
|
|
|
}
|
|
|
- }
|
|
|
- if (user == null) {
|
|
|
- throw new RuntimeException("Incomplete job " + id);
|
|
|
- }
|
|
|
-
|
|
|
- // Create the job
|
|
|
- /* THIS PART OF THE CODE IS USELESS. JOB RECOVERY SHOULD BE
|
|
|
- * BACKPORTED (MAPREDUCE-873)
|
|
|
- */
|
|
|
- job = new JobInProgress(JobTracker.this, conf,
|
|
|
- new JobInfo((org.apache.hadoop.mapreduce.JobID) id,
|
|
|
- new Text(user), new Path(getStagingAreaDirInternal(user))),
|
|
|
- restartCount, new Credentials() /*HACK*/);
|
|
|
-
|
|
|
- // 2. Check if the user has appropriate access
|
|
|
- // Get the user group info for the job's owner
|
|
|
- UserGroupInformation ugi =
|
|
|
- UserGroupInformation.createRemoteUser(job.getJobConf().getUser());
|
|
|
- LOG.info("Submitting job " + id + " on behalf of user "
|
|
|
- + ugi.getShortUserName() + " in groups : "
|
|
|
- + StringUtils.arrayToString(ugi.getGroupNames()));
|
|
|
-
|
|
|
- // check the access
|
|
|
- try {
|
|
|
- aclsManager.checkAccess(job, ugi, Operation.SUBMIT_JOB);
|
|
|
- } catch (Throwable t) {
|
|
|
- LOG.warn("Access denied for user " + ugi.getShortUserName()
|
|
|
- + " in groups : ["
|
|
|
- + StringUtils.arrayToString(ugi.getGroupNames()) + "]");
|
|
|
- throw t;
|
|
|
- }
|
|
|
-
|
|
|
- // 3. Get the log file and the file path
|
|
|
- String logFileName =
|
|
|
- JobHistory.JobInfo.getJobHistoryFileName(job.getJobConf(), id);
|
|
|
- if (logFileName != null) {
|
|
|
- Path jobHistoryFilePath =
|
|
|
- JobHistory.JobInfo.getJobHistoryLogLocation(logFileName);
|
|
|
-
|
|
|
- // 4. Recover the history file. This involved
|
|
|
- // - deleting file.recover if file exists
|
|
|
- // - renaming file.recover to file if file doesnt exist
|
|
|
- // This makes sure that the (master) file exists
|
|
|
- JobHistory.JobInfo.recoverJobHistoryFile(job.getJobConf(),
|
|
|
- jobHistoryFilePath);
|
|
|
-
|
|
|
- // 5. Cache the history file name as it costs one dfs access
|
|
|
- jobHistoryFilenameMap.put(job.getJobID(), jobHistoryFilePath);
|
|
|
- } else {
|
|
|
- LOG.info("No history file found for job " + id);
|
|
|
- idIter.remove(); // remove from recovery list
|
|
|
- }
|
|
|
-
|
|
|
- // 6. Sumbit the job to the jobtracker
|
|
|
- addJob(id, job);
|
|
|
- } catch (Throwable t) {
|
|
|
- LOG.warn("Failed to recover job " + id + " Ignoring the job.", t);
|
|
|
- idIter.remove();
|
|
|
- if (jobIdFile != null) {
|
|
|
- jobIdFile.delete();
|
|
|
- jobIdFile = null;
|
|
|
- }
|
|
|
- if (job != null) {
|
|
|
- job.fail();
|
|
|
- job = null;
|
|
|
- }
|
|
|
- continue;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- long recoveryStartTime = clock.getTime();
|
|
|
-
|
|
|
- // II. Recover each job
|
|
|
- idIter = jobsToRecover.iterator();
|
|
|
- while (idIter.hasNext()) {
|
|
|
- JobID id = idIter.next();
|
|
|
- JobInProgress pJob = getJob(id);
|
|
|
-
|
|
|
- // 1. Get the required info
|
|
|
- // Get the recovered history file
|
|
|
- Path jobHistoryFilePath = jobHistoryFilenameMap.get(pJob.getJobID());
|
|
|
- String logFileName = jobHistoryFilePath.getName();
|
|
|
-
|
|
|
- FileSystem fs;
|
|
|
- try {
|
|
|
- fs = jobHistoryFilePath.getFileSystem(conf);
|
|
|
- } catch (IOException ioe) {
|
|
|
- LOG.warn("Failed to get the filesystem for job " + id + ". Ignoring.",
|
|
|
- ioe);
|
|
|
- continue;
|
|
|
- }
|
|
|
-
|
|
|
- // 2. Parse the history file
|
|
|
- // Note that this also involves job update
|
|
|
- JobRecoveryListener listener = new JobRecoveryListener(pJob);
|
|
|
- try {
|
|
|
- JobHistory.parseHistoryFromFS(jobHistoryFilePath.toString(),
|
|
|
- listener, fs);
|
|
|
- } catch (Throwable t) {
|
|
|
- LOG.info("Error reading history file of job " + pJob.getJobID()
|
|
|
- + ". Ignoring the error and continuing.", t);
|
|
|
- }
|
|
|
-
|
|
|
- // 3. Close the listener
|
|
|
- listener.close();
|
|
|
-
|
|
|
- // 4. Update the recovery metric
|
|
|
- totalEventsRecovered += listener.getNumEventsRecovered();
|
|
|
-
|
|
|
- // 5. Cleanup history
|
|
|
- // Delete the master log file as an indication that the new file
|
|
|
- // should be used in future
|
|
|
- try {
|
|
|
- synchronized (pJob) {
|
|
|
- JobHistory.JobInfo.checkpointRecovery(logFileName,
|
|
|
- pJob.getJobConf());
|
|
|
- }
|
|
|
- } catch (Throwable t) {
|
|
|
- LOG.warn("Failed to delete log file (" + logFileName + ") for job "
|
|
|
- + id + ". Continuing.", t);
|
|
|
- }
|
|
|
-
|
|
|
- if (pJob.isComplete()) {
|
|
|
- idIter.remove(); // no need to keep this job info as its successful
|
|
|
+ });
|
|
|
+ recovered++;
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.warn("Could not recover job " + jobId, e);
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- recoveryDuration = clock.getTime() - recoveryStartTime;
|
|
|
+ recoveryDuration = clock.getTime() - recoveryProcessStartTime;
|
|
|
hasRecovered = true;
|
|
|
|
|
|
- // III. Finalize the recovery
|
|
|
- synchronized (trackerExpiryQueue) {
|
|
|
- // Make sure that the tracker statuses in the expiry-tracker queue
|
|
|
- // are updated
|
|
|
- long now = clock.getTime();
|
|
|
- int size = trackerExpiryQueue.size();
|
|
|
- for (int i = 0; i < size ; ++i) {
|
|
|
- // Get the first tasktracker
|
|
|
- TaskTrackerStatus taskTracker = trackerExpiryQueue.first();
|
|
|
-
|
|
|
- // Remove it
|
|
|
- trackerExpiryQueue.remove(taskTracker);
|
|
|
-
|
|
|
- // Set the new time
|
|
|
- taskTracker.setLastSeen(now);
|
|
|
-
|
|
|
- // Add back to get the sorted list
|
|
|
- trackerExpiryQueue.add(taskTracker);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- LOG.info("Restoration complete");
|
|
|
+ LOG.info("Recovery done! Recovered " + recovered + " of "
|
|
|
+ + jobsToRecover.size() + " jobs.");
|
|
|
+ LOG.info("Recovery Duration (ms):" + recoveryDuration);
|
|
|
}
|
|
|
|
|
|
int totalEventsRecovered() {
|
|
@@ -3917,7 +3597,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
public synchronized JobID getNewJobId() throws IOException {
|
|
|
return new JobID(getTrackerIdentifier(), nextJobId++);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* JobTracker.submitJob() kicks off a new job.
|
|
|
*
|
|
@@ -3928,8 +3608,24 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
*/
|
|
|
public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
|
|
|
throws IOException {
|
|
|
+ return submitJob(jobId, jobSubmitDir, null, ts, false);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * JobTracker.submitJob() kicks off a new job.
|
|
|
+ *
|
|
|
+ * Create a 'JobInProgress' object, which contains both JobProfile and
|
|
|
+ * JobStatus. Those two sub-objects are sometimes shipped outside of the
|
|
|
+ * JobTracker. But JobInProgress adds info that's useful for the JobTracker
|
|
|
+ * alone.
|
|
|
+ */
|
|
|
+ public JobStatus submitJob(JobID jobId, String jobSubmitDir,
|
|
|
+ UserGroupInformation ugi, Credentials ts, boolean recovered)
|
|
|
+ throws IOException {
|
|
|
JobInfo jobInfo = null;
|
|
|
- UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
|
|
+ if (ugi == null) {
|
|
|
+ ugi = UserGroupInformation.getCurrentUser();
|
|
|
+ }
|
|
|
synchronized (this) {
|
|
|
if (jobs.containsKey(jobId)) {
|
|
|
// job already running, don't start twice
|
|
@@ -3970,10 +3666,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
} catch (IOException ioe) {
|
|
|
throw ioe;
|
|
|
}
|
|
|
- boolean recovered = true; // TODO: Once the Job recovery code is there,
|
|
|
- // (MAPREDUCE-873) we
|
|
|
- // must pass the "recovered" flag accurately.
|
|
|
- // This is handled in the trunk/0.22
|
|
|
+
|
|
|
if (!recovered) {
|
|
|
// Store the information in a file so that the job can be recovered
|
|
|
// later (if at all)
|
|
@@ -4002,7 +3695,6 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
failJob(job);
|
|
|
throw ioe;
|
|
|
}
|
|
|
-
|
|
|
return status;
|
|
|
}
|
|
|
}
|
|
@@ -4656,6 +4348,11 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
return new Path(getSystemDirectoryForJob(id)+"/" + JOB_INFO_FILE);
|
|
|
}
|
|
|
|
|
|
+ // Get the job token file in the system directory
|
|
|
+ Path getTokenFileForJob(JobID id) {
|
|
|
+ return new Path(getSystemDirectoryForJob(id)+"/" + JOB_TOKEN_FILE);
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Change the run-time priority of the given job.
|
|
|
*
|