|
@@ -39,6 +39,7 @@ import java.util.Set;
|
|
|
import java.util.TreeMap;
|
|
|
import java.util.TreeSet;
|
|
|
import java.util.Vector;
|
|
|
+import java.util.concurrent.LinkedBlockingQueue;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
@@ -609,6 +610,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
Path systemDir = null;
|
|
|
private JobConf conf;
|
|
|
|
|
|
+ private Thread taskCommitThread;
|
|
|
+
|
|
|
/**
|
|
|
* Start the JobTracker process, listen on the indicated port
|
|
|
*/
|
|
@@ -663,15 +666,6 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
|
|
|
myMetrics = new JobTrackerMetrics(this, jobConf);
|
|
|
|
|
|
- this.expireTrackersThread = new Thread(this.expireTrackers,
|
|
|
- "expireTrackers");
|
|
|
- this.expireTrackersThread.start();
|
|
|
- this.retireJobsThread = new Thread(this.retireJobs, "retireJobs");
|
|
|
- this.retireJobsThread.start();
|
|
|
- this.initJobsThread = new Thread(this.initJobs, "initJobs");
|
|
|
- this.initJobsThread.start();
|
|
|
- expireLaunchingTaskThread.start();
|
|
|
-
|
|
|
// The rpc/web-server ports can be ephemeral ports...
|
|
|
// ... ensure we have the correct info
|
|
|
this.port = interTrackerServer.getListenerAddress().getPort();
|
|
@@ -726,6 +720,17 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
* Run forever
|
|
|
*/
|
|
|
public void offerService() throws InterruptedException {
|
|
|
+ this.expireTrackersThread = new Thread(this.expireTrackers, // background threads are created and started here, before blocking on the RPC server below
|
|
|
+ "expireTrackers");
|
|
|
+ this.expireTrackersThread.start();
|
|
|
+ this.retireJobsThread = new Thread(this.retireJobs, "retireJobs");
|
|
|
+ this.retireJobsThread.start();
|
|
|
+ this.initJobsThread = new Thread(this.initJobs, "initJobs");
|
|
|
+ this.initJobsThread.start();
|
|
|
+ expireLaunchingTaskThread.start();
|
|
|
+ this.taskCommitThread = new TaskCommitQueue(); // TaskCommitQueue extends Thread; addToCommitQueue() casts this field back to TaskCommitQueue
|
|
|
+ this.taskCommitThread.start(); // from here on the commit thread drains its queue
|
|
|
+
|
|
|
this.interTrackerServer.join();
|
|
|
LOG.info("Stopped interTrackerServer");
|
|
|
}
|
|
@@ -781,6 +786,16 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
ex.printStackTrace();
|
|
|
}
|
|
|
}
|
|
|
+ if (this.taskCommitThread != null) {
|
|
|
+ LOG.info("Stopping TaskCommit thread");
|
|
|
+ this.taskCommitThread.interrupt();
|
|
|
+ try {
|
|
|
+ this.taskCommitThread.interrupt();
|
|
|
+ this.taskCommitThread.join();
|
|
|
+ } catch (InterruptedException ex) {
|
|
|
+ ex.printStackTrace();
|
|
|
+ }
|
|
|
+ }
|
|
|
LOG.info("stopped all jobtracker services");
|
|
|
return;
|
|
|
}
|
|
@@ -853,7 +868,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
void markCompletedJob(JobInProgress job) {
|
|
|
for (TaskInProgress tip : job.getMapTasks()) {
|
|
|
for (TaskStatus taskStatus : tip.getTaskStatuses()) {
|
|
|
- if (taskStatus.getRunState() != TaskStatus.State.RUNNING) {
|
|
|
+ if (taskStatus.getRunState() != TaskStatus.State.RUNNING &&
|
|
|
+ taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING) {
|
|
|
markCompletedTaskAttempt(taskStatus.getTaskTracker(),
|
|
|
taskStatus.getTaskId());
|
|
|
}
|
|
@@ -861,7 +877,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
}
|
|
|
for (TaskInProgress tip : job.getReduceTasks()) {
|
|
|
for (TaskStatus taskStatus : tip.getTaskStatuses()) {
|
|
|
- if (taskStatus.getRunState() != TaskStatus.State.RUNNING) {
|
|
|
+ if (taskStatus.getRunState() != TaskStatus.State.RUNNING &&
|
|
|
+ taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING) {
|
|
|
markCompletedTaskAttempt(taskStatus.getTaskTracker(),
|
|
|
taskStatus.getTaskId());
|
|
|
}
|
|
@@ -1837,6 +1854,146 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ public void addToCommitQueue(JobInProgress.JobWithTaskContext j) {
|
|
|
+ ((TaskCommitQueue)taskCommitThread).addToQueue(j);
|
|
|
+ }
|
|
|
+ //This thread takes care of things like moving outputs to their final
|
|
|
+ //locations & deleting temporary outputs
|
|
|
+ private class TaskCommitQueue extends Thread {
|
|
|
+
|
|
|
+ private LinkedBlockingQueue<JobInProgress.JobWithTaskContext> queue =
|
|
|
+ new LinkedBlockingQueue <JobInProgress.JobWithTaskContext>();
|
|
|
+
|
|
|
+ public TaskCommitQueue() {
|
|
|
+ setName("Task Commit Thread");
|
|
|
+ setDaemon(true);
|
|
|
+ }
|
|
|
+
|
|
|
+ public void addToQueue(JobInProgress.JobWithTaskContext j) {
|
|
|
+ while (!queue.add(j)) {
|
|
|
+ LOG.warn("Couldn't add to the Task Commit queue now. Will " +
|
|
|
+ "try again");
|
|
|
+ try {
|
|
|
+ Thread.sleep(2000);
|
|
|
+ } catch (InterruptedException ie) {}
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public void run() {
|
|
|
+ while (!isInterrupted()) {
|
|
|
+ JobInProgress.JobWithTaskContext j;
|
|
|
+ try {
|
|
|
+ j = queue.take();
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ JobInProgress job = j.getJob();
|
|
|
+ TaskInProgress tip = j.getTIP();
|
|
|
+ String taskid = j.getTaskId();
|
|
|
+ JobTrackerMetrics metrics = j.getJobTrackerMetrics();
|
|
|
+ Task t;
|
|
|
+ TaskStatus status;
|
|
|
+ boolean isTipComplete = false;
|
|
|
+ TaskStatus.State state;
|
|
|
+ synchronized (JobTracker.this) {
|
|
|
+ synchronized (job) {
|
|
|
+ synchronized (tip) {
|
|
|
+ status = tip.getTaskStatus(taskid);
|
|
|
+ t = tip.getTaskObject(taskid);
|
|
|
+ state = status.getRunState();
|
|
|
+ isTipComplete = tip.isComplete();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ //For COMMIT_PENDING tasks, we save the task output in the dfs
|
|
|
+ //as well as manipulate the JT datastructures to reflect a
|
|
|
+ //successful task. This guarantees that we don't declare a task
|
|
|
+ //as having succeeded until we have successfully completed the
|
|
|
+ //dfs operations.
|
|
|
+ //For failed tasks, we just do the dfs operations here. The
|
|
|
+ //datastructures updates is done earlier as soon as the failure
|
|
|
+ //is detected so that the JT can immediately schedule another
|
|
|
+ //attempt for that task.
|
|
|
+ if (state == TaskStatus.State.COMMIT_PENDING) {
|
|
|
+ if (!isTipComplete) {
|
|
|
+ t.saveTaskOutput();
|
|
|
+ }
|
|
|
+ synchronized (JobTracker.this) {
|
|
|
+ //do a check for the case where after the task went to
|
|
|
+ //COMMIT_PENDING, it was lost. So although we would have
|
|
|
+ //saved the task output, we cannot declare it a SUCCESS.
|
|
|
+ TaskStatus newStatus = null;
|
|
|
+ synchronized (job) {
|
|
|
+ synchronized (tip) {
|
|
|
+ status = tip.getTaskStatus(taskid);
|
|
|
+ if (!isTipComplete) {
|
|
|
+ if (status.getRunState() !=
|
|
|
+ TaskStatus.State.COMMIT_PENDING) {
|
|
|
+ state = TaskStatus.State.KILLED;
|
|
|
+ } else {
|
|
|
+ state = TaskStatus.State.SUCCEEDED;
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ tip.addDiagnosticInfo(t.getTaskId(),"Already completed " +
|
|
|
+ "TIP");
|
|
|
+ state = TaskStatus.State.KILLED;
|
|
|
+
|
|
|
+ }
|
|
|
+ //create new status if required. If the state changed from
|
|
|
+ //COMMIT_PENDING to KILLED in the JobTracker, while we were
|
|
|
+ //saving the output,the JT would have called updateTaskStatus
|
|
|
+ //and we don't need to call it again
|
|
|
+ if (status.getRunState() == TaskStatus.State.COMMIT_PENDING){
|
|
|
+ newStatus = TaskStatus.createTaskStatus(
|
|
|
+ tip.isMapTask(),
|
|
|
+ taskid,
|
|
|
+ state == TaskStatus.State.SUCCEEDED ? 1.0f : 0.0f,
|
|
|
+ state,
|
|
|
+ status.getDiagnosticInfo(),
|
|
|
+ status.getStateString(),
|
|
|
+ status.getTaskTracker(), status.getPhase(),
|
|
|
+ status.getCounters());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (newStatus != null) {
|
|
|
+ job.updateTaskStatus(tip, newStatus, metrics);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (IOException ioe) {
|
|
|
+ // Oops! Failed to copy the task's output to its final place;
|
|
|
+ // fail the task!
|
|
|
+ state = TaskStatus.State.FAILED;
|
|
|
+ synchronized (JobTracker.this) {
|
|
|
+ job.failedTask(tip, status.getTaskId(),
|
|
|
+ "Failed to rename output with the exception: " +
|
|
|
+ StringUtils.stringifyException(ioe),
|
|
|
+ (tip.isMapTask() ?
|
|
|
+ TaskStatus.Phase.MAP :
|
|
|
+ TaskStatus.Phase.REDUCE),
|
|
|
+ TaskStatus.State.FAILED,
|
|
|
+ status.getTaskTracker(), null);
|
|
|
+ }
|
|
|
+ LOG.info("Failed to rename the output of " + status.getTaskId() +
|
|
|
+ " with: " + StringUtils.stringifyException(ioe));
|
|
|
+ }
|
|
|
+ if (state == TaskStatus.State.FAILED ||
|
|
|
+ state == TaskStatus.State.KILLED) {
|
|
|
+ try {
|
|
|
+ t.discardTaskOutput();
|
|
|
+ } catch (IOException ioe) {
|
|
|
+ LOG.info("Failed to discard the output of task " +
|
|
|
+ status.getTaskId() + " with: " +
|
|
|
+ StringUtils.stringifyException(ioe));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
/**
|
|
|
* Get the localized job file path on the job trackers local file system
|
|
|
* @param jobId id of the job
|