|
@@ -790,7 +790,6 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
LOG.info("Stopping TaskCommit thread");
|
|
LOG.info("Stopping TaskCommit thread");
|
|
this.taskCommitThread.interrupt();
|
|
this.taskCommitThread.interrupt();
|
|
try {
|
|
try {
|
|
- this.taskCommitThread.interrupt();
|
|
|
|
this.taskCommitThread.join();
|
|
this.taskCommitThread.join();
|
|
} catch (InterruptedException ex) {
|
|
} catch (InterruptedException ex) {
|
|
ex.printStackTrace();
|
|
ex.printStackTrace();
|
|
@@ -1871,11 +1870,21 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- public void addToCommitQueue(JobInProgress.JobWithTaskContext j) {
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Add a job's completed task (either successful or failed/killed) to the
|
|
|
|
+ * {@link TaskCommitQueue}.
|
|
|
|
+ * @param j completed task (either successful or failed/killed)
|
|
|
|
+ */
|
|
|
|
+ void addToCommitQueue(JobInProgress.JobWithTaskContext j) {
|
|
((TaskCommitQueue)taskCommitThread).addToQueue(j);
|
|
((TaskCommitQueue)taskCommitThread).addToQueue(j);
|
|
}
|
|
}
|
|
- //This thread takes care of things like moving outputs to their final
|
|
|
|
- //locations & deleting temporary outputs
|
|
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * A thread which does all of the {@link FileSystem}-related operations for
|
|
|
|
+ * tasks. It picks the next task in the queue, promotes outputs of
|
|
|
|
+ * {@link TaskStatus.State#SUCCEEDED} tasks and discards outputs for
|
|
|
|
+ * {@link TaskStatus.State#FAILED} or {@link TaskStatus.State#KILLED} tasks.
|
|
|
|
+ */
|
|
private class TaskCommitQueue extends Thread {
|
|
private class TaskCommitQueue extends Thread {
|
|
|
|
|
|
private LinkedBlockingQueue<JobInProgress.JobWithTaskContext> queue =
|
|
private LinkedBlockingQueue<JobInProgress.JobWithTaskContext> queue =
|
|
@@ -1898,109 +1907,116 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
|
|
|
public void run() {
|
|
public void run() {
|
|
while (!isInterrupted()) {
|
|
while (!isInterrupted()) {
|
|
- JobInProgress.JobWithTaskContext j;
|
|
|
|
try {
|
|
try {
|
|
- j = queue.take();
|
|
|
|
- } catch (InterruptedException ie) {
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
- JobInProgress job = j.getJob();
|
|
|
|
- TaskInProgress tip = j.getTIP();
|
|
|
|
- String taskid = j.getTaskId();
|
|
|
|
- JobTrackerMetrics metrics = j.getJobTrackerMetrics();
|
|
|
|
- Task t;
|
|
|
|
- TaskStatus status;
|
|
|
|
- boolean isTipComplete = false;
|
|
|
|
- TaskStatus.State state;
|
|
|
|
- synchronized (JobTracker.this) {
|
|
|
|
- synchronized (job) {
|
|
|
|
- synchronized (tip) {
|
|
|
|
- status = tip.getTaskStatus(taskid);
|
|
|
|
- t = tip.getTaskObject(taskid);
|
|
|
|
- state = status.getRunState();
|
|
|
|
- isTipComplete = tip.isComplete();
|
|
|
|
|
|
+ JobInProgress.JobWithTaskContext j = queue.take();
|
|
|
|
+ JobInProgress job = j.getJob();
|
|
|
|
+ TaskInProgress tip = j.getTIP();
|
|
|
|
+ String taskid = j.getTaskId();
|
|
|
|
+ JobTrackerMetrics metrics = j.getJobTrackerMetrics();
|
|
|
|
+ Task t;
|
|
|
|
+ TaskStatus status;
|
|
|
|
+ boolean isTipComplete = false;
|
|
|
|
+ TaskStatus.State state;
|
|
|
|
+ synchronized (JobTracker.this) {
|
|
|
|
+ synchronized (job) {
|
|
|
|
+ synchronized (tip) {
|
|
|
|
+ status = tip.getTaskStatus(taskid);
|
|
|
|
+ t = tip.getTaskObject(taskid);
|
|
|
|
+ state = status.getRunState();
|
|
|
|
+ isTipComplete = tip.isComplete();
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- }
|
|
|
|
- try {
|
|
|
|
- //For COMMIT_PENDING tasks, we save the task output in the dfs
|
|
|
|
- //as well as manipulate the JT datastructures to reflect a
|
|
|
|
- //successful task. This guarantees that we don't declare a task
|
|
|
|
- //as having succeeded until we have successfully completed the
|
|
|
|
- //dfs operations.
|
|
|
|
- //For failed tasks, we just do the dfs operations here. The
|
|
|
|
- //datastructures updates is done earlier as soon as the failure
|
|
|
|
- //is detected so that the JT can immediately schedule another
|
|
|
|
- //attempt for that task.
|
|
|
|
- if (state == TaskStatus.State.COMMIT_PENDING) {
|
|
|
|
- if (!isTipComplete) {
|
|
|
|
- t.saveTaskOutput();
|
|
|
|
- }
|
|
|
|
- synchronized (JobTracker.this) {
|
|
|
|
- //do a check for the case where after the task went to
|
|
|
|
- //COMMIT_PENDING, it was lost. So although we would have
|
|
|
|
- //saved the task output, we cannot declare it a SUCCESS.
|
|
|
|
- TaskStatus newStatus = null;
|
|
|
|
- synchronized (job) {
|
|
|
|
- synchronized (tip) {
|
|
|
|
- status = tip.getTaskStatus(taskid);
|
|
|
|
- if (!isTipComplete) {
|
|
|
|
- if (status.getRunState() !=
|
|
|
|
- TaskStatus.State.COMMIT_PENDING) {
|
|
|
|
- state = TaskStatus.State.KILLED;
|
|
|
|
|
|
+ try {
|
|
|
|
+ //For COMMIT_PENDING tasks, we save the task output in the dfs
|
|
|
|
+ //as well as manipulate the JT datastructures to reflect a
|
|
|
|
+ //successful task. This guarantees that we don't declare a task
|
|
|
|
+ //as having succeeded until we have successfully completed the
|
|
|
|
+ //dfs operations.
|
|
|
|
+ //For failed tasks, we just do the dfs operations here. The
|
|
|
|
+ //data structure updates are done earlier as soon as the failure
|
|
|
|
+ //is detected so that the JT can immediately schedule another
|
|
|
|
+ //attempt for that task.
|
|
|
|
+ if (state == TaskStatus.State.COMMIT_PENDING) {
|
|
|
|
+ if (!isTipComplete) {
|
|
|
|
+ t.saveTaskOutput();
|
|
|
|
+ }
|
|
|
|
+ synchronized (JobTracker.this) {
|
|
|
|
+ //do a check for the case where after the task went to
|
|
|
|
+ //COMMIT_PENDING, it was lost. So although we would have
|
|
|
|
+ //saved the task output, we cannot declare it a SUCCESS.
|
|
|
|
+ TaskStatus newStatus = null;
|
|
|
|
+ synchronized (job) {
|
|
|
|
+ synchronized (tip) {
|
|
|
|
+ status = tip.getTaskStatus(taskid);
|
|
|
|
+ if (!isTipComplete) {
|
|
|
|
+ if (status.getRunState() !=
|
|
|
|
+ TaskStatus.State.COMMIT_PENDING) {
|
|
|
|
+ state = TaskStatus.State.KILLED;
|
|
|
|
+ } else {
|
|
|
|
+ state = TaskStatus.State.SUCCEEDED;
|
|
|
|
+ }
|
|
} else {
|
|
} else {
|
|
- state = TaskStatus.State.SUCCEEDED;
|
|
|
|
- }
|
|
|
|
- } else {
|
|
|
|
- tip.addDiagnosticInfo(t.getTaskId(),"Already completed " +
|
|
|
|
- "TIP");
|
|
|
|
- state = TaskStatus.State.KILLED;
|
|
|
|
|
|
+ tip.addDiagnosticInfo(t.getTaskId(),"Already completed " +
|
|
|
|
+ "TIP");
|
|
|
|
+ state = TaskStatus.State.KILLED;
|
|
|
|
|
|
|
|
+ }
|
|
|
|
+ //create new status if required. If the state changed from
|
|
|
|
+ //COMMIT_PENDING to KILLED in the JobTracker, while we were
|
|
|
|
+ //saving the output, the JT would have called updateTaskStatus
|
|
|
|
+ //and we don't need to call it again
|
|
|
|
+ if (status.getRunState() == TaskStatus.State.COMMIT_PENDING){
|
|
|
|
+ newStatus = (TaskStatus)status.clone();
|
|
|
|
+ newStatus.setRunState(state);
|
|
|
|
+ newStatus.setProgress((state == TaskStatus.State.SUCCEEDED) ? 1.0f : 0.0f);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
- //create new status if required. If the state changed from
|
|
|
|
- //COMMIT_PENDING to KILLED in the JobTracker, while we were
|
|
|
|
- //saving the output,the JT would have called updateTaskStatus
|
|
|
|
- //and we don't need to call it again
|
|
|
|
- if (status.getRunState() == TaskStatus.State.COMMIT_PENDING){
|
|
|
|
- newStatus = (TaskStatus)status.clone();
|
|
|
|
- newStatus.setRunState(state);
|
|
|
|
- newStatus.setProgress((state == TaskStatus.State.SUCCEEDED) ? 1.0f : 0.0f);
|
|
|
|
|
|
+ if (newStatus != null) {
|
|
|
|
+ job.updateTaskStatus(tip, newStatus, metrics);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- if (newStatus != null) {
|
|
|
|
- job.updateTaskStatus(tip, newStatus, metrics);
|
|
|
|
- }
|
|
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+ } catch (IOException ioe) {
|
|
|
|
+ // Oops! Failed to copy the task's output to its final place;
|
|
|
|
+ // fail the task!
|
|
|
|
+ state = TaskStatus.State.FAILED;
|
|
|
|
+ synchronized (JobTracker.this) {
|
|
|
|
+ job.failedTask(tip, status.getTaskId(),
|
|
|
|
+ "Failed to rename output with the exception: " +
|
|
|
|
+ StringUtils.stringifyException(ioe),
|
|
|
|
+ (tip.isMapTask() ?
|
|
|
|
+ TaskStatus.Phase.MAP :
|
|
|
|
+ TaskStatus.Phase.REDUCE),
|
|
|
|
+ TaskStatus.State.FAILED,
|
|
|
|
+ status.getTaskTracker(), null);
|
|
|
|
+ }
|
|
|
|
+ LOG.info("Failed to rename the output of " + status.getTaskId() +
|
|
|
|
+ " with: " + StringUtils.stringifyException(ioe));
|
|
}
|
|
}
|
|
- } catch (IOException ioe) {
|
|
|
|
- // Oops! Failed to copy the task's output to its final place;
|
|
|
|
- // fail the task!
|
|
|
|
- state = TaskStatus.State.FAILED;
|
|
|
|
- synchronized (JobTracker.this) {
|
|
|
|
- job.failedTask(tip, status.getTaskId(),
|
|
|
|
- "Failed to rename output with the exception: " +
|
|
|
|
- StringUtils.stringifyException(ioe),
|
|
|
|
- (tip.isMapTask() ?
|
|
|
|
- TaskStatus.Phase.MAP :
|
|
|
|
- TaskStatus.Phase.REDUCE),
|
|
|
|
- TaskStatus.State.FAILED,
|
|
|
|
- status.getTaskTracker(), null);
|
|
|
|
|
|
+ if (state == TaskStatus.State.FAILED ||
|
|
|
|
+ state == TaskStatus.State.KILLED) {
|
|
|
|
+ try {
|
|
|
|
+ t.discardTaskOutput();
|
|
|
|
+ } catch (IOException ioe) {
|
|
|
|
+ LOG.info("Failed to discard the output of task " +
|
|
|
|
+ status.getTaskId() + " with: " +
|
|
|
|
+ StringUtils.stringifyException(ioe));
|
|
|
|
+ }
|
|
}
|
|
}
|
|
- LOG.info("Failed to rename the output of " + status.getTaskId() +
|
|
|
|
- " with: " + StringUtils.stringifyException(ioe));
|
|
|
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
|
+ LOG.warn(getName() + " exiting, got interrupted: " +
|
|
|
|
+ StringUtils.stringifyException(ie));
|
|
|
|
+ return;
|
|
}
|
|
}
|
|
- if (state == TaskStatus.State.FAILED ||
|
|
|
|
- state == TaskStatus.State.KILLED) {
|
|
|
|
- try {
|
|
|
|
- t.discardTaskOutput();
|
|
|
|
- } catch (IOException ioe) {
|
|
|
|
- LOG.info("Failed to discard the output of task " +
|
|
|
|
- status.getTaskId() + " with: " +
|
|
|
|
- StringUtils.stringifyException(ioe));
|
|
|
|
- }
|
|
|
|
|
|
+ catch (Throwable t) {
|
|
|
|
+ LOG.error(getName() + " got an exception: " +
|
|
|
|
+ StringUtils.stringifyException(t));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ LOG.warn(getName() + " exiting...");
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|