|
@@ -31,6 +31,7 @@ import java.util.Set;
|
|
import java.util.TreeMap;
|
|
import java.util.TreeMap;
|
|
import java.util.Vector;
|
|
import java.util.Vector;
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
|
+
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
@@ -83,6 +84,7 @@ class JobInProgress {
|
|
int failedReduceTIPs = 0;
|
|
int failedReduceTIPs = 0;
|
|
private volatile boolean launchedCleanup = false;
|
|
private volatile boolean launchedCleanup = false;
|
|
private volatile boolean jobKilled = false;
|
|
private volatile boolean jobKilled = false;
|
|
|
|
+ private volatile boolean jobFailed = false;
|
|
|
|
|
|
JobPriority priority = JobPriority.NORMAL;
|
|
JobPriority priority = JobPriority.NORMAL;
|
|
JobTracker jobtracker = null;
|
|
JobTracker jobtracker = null;
|
|
@@ -873,7 +875,7 @@ class JobInProgress {
|
|
return false;
|
|
return false;
|
|
}
|
|
}
|
|
// check if job has failed or killed
|
|
// check if job has failed or killed
|
|
- if (jobKilled) {
|
|
|
|
|
|
+ if (jobKilled || jobFailed) {
|
|
return true;
|
|
return true;
|
|
}
|
|
}
|
|
// Check if all maps and reducers have finished.
|
|
// Check if all maps and reducers have finished.
|
|
@@ -1702,10 +1704,13 @@ class JobInProgress {
|
|
}
|
|
}
|
|
//
|
|
//
|
|
// The Job is done
|
|
// The Job is done
|
|
- //
|
|
|
|
- // if the job is killed, then mark the job failed.
|
|
|
|
|
|
+ // if the job is failed, then mark the job failed.
|
|
|
|
+ if (jobFailed) {
|
|
|
|
+ terminateJob(JobStatus.FAILED);
|
|
|
|
+ }
|
|
|
|
+ // if the job is killed, then mark the job killed.
|
|
if (jobKilled) {
|
|
if (jobKilled) {
|
|
- killJob();
|
|
|
|
|
|
+ terminateJob(JobStatus.KILLED);
|
|
}
|
|
}
|
|
else {
|
|
else {
|
|
jobComplete(metrics);
|
|
jobComplete(metrics);
|
|
@@ -1747,23 +1752,35 @@ class JobInProgress {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- private synchronized void killJob() {
|
|
|
|
|
|
+ private synchronized void terminateJob(int jobTerminationState) {
|
|
if ((status.getRunState() == JobStatus.RUNNING) ||
|
|
if ((status.getRunState() == JobStatus.RUNNING) ||
|
|
(status.getRunState() == JobStatus.PREP)) {
|
|
(status.getRunState() == JobStatus.PREP)) {
|
|
- this.status = new JobStatus(status.getJobID(),
|
|
|
|
- 1.0f, 1.0f, 1.0f, JobStatus.FAILED,
|
|
|
|
- status.getJobPriority());
|
|
|
|
- this.finishTime = System.currentTimeMillis();
|
|
|
|
- JobHistory.JobInfo.logFailed(this.status.getJobID(), finishTime,
|
|
|
|
- this.finishedMapTasks, this.finishedReduceTasks);
|
|
|
|
|
|
+ if (jobTerminationState == JobStatus.FAILED) {
|
|
|
|
+ this.status = new JobStatus(status.getJobID(),
|
|
|
|
+ 1.0f, 1.0f, 1.0f, JobStatus.FAILED,
|
|
|
|
+ status.getJobPriority());
|
|
|
|
+ this.finishTime = System.currentTimeMillis();
|
|
|
|
+ JobHistory.JobInfo.logFailed(this.status.getJobID(), finishTime,
|
|
|
|
+ this.finishedMapTasks,
|
|
|
|
+ this.finishedReduceTasks);
|
|
|
|
+ } else {
|
|
|
|
+ this.status = new JobStatus(status.getJobID(),
|
|
|
|
+ 1.0f, 1.0f, 1.0f, JobStatus.KILLED,
|
|
|
|
+ status.getJobPriority());
|
|
|
|
+ this.finishTime = System.currentTimeMillis();
|
|
|
|
+ JobHistory.JobInfo.logKilled(this.status.getJobID(), finishTime,
|
|
|
|
+ this.finishedMapTasks,
|
|
|
|
+ this.finishedReduceTasks);
|
|
|
|
+ }
|
|
garbageCollect();
|
|
garbageCollect();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
- * Kill the job and all its component tasks.
|
|
|
|
|
|
+ * Terminate the job and all its component tasks.
|
|
|
|
+ * @param jobTerminationState job termination state
|
|
*/
|
|
*/
|
|
- public synchronized void kill() {
|
|
|
|
|
|
+ private synchronized void terminate(int jobTerminationState) {
|
|
if ((status.getRunState() == JobStatus.RUNNING) ||
|
|
if ((status.getRunState() == JobStatus.RUNNING) ||
|
|
(status.getRunState() == JobStatus.PREP)) {
|
|
(status.getRunState() == JobStatus.PREP)) {
|
|
LOG.info("Killing job '" + this.status.getJobID() + "'");
|
|
LOG.info("Killing job '" + this.status.getJobID() + "'");
|
|
@@ -1778,10 +1795,28 @@ class JobInProgress {
|
|
for (int i = 0; i < reduces.length; i++) {
|
|
for (int i = 0; i < reduces.length; i++) {
|
|
reduces[i].kill();
|
|
reduces[i].kill();
|
|
}
|
|
}
|
|
- jobKilled = true;
|
|
|
|
|
|
+ if (jobTerminationState == JobStatus.FAILED) {
|
|
|
|
+ jobFailed = true;
|
|
|
|
+ } else if (jobTerminationState == JobStatus.KILLED) {
|
|
|
|
+ jobKilled = true;
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Kill the job and all its component tasks.
|
|
|
|
+ */
|
|
|
|
+ public synchronized void kill() {
|
|
|
|
+ terminate(JobStatus.KILLED);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Fails the job and all its component tasks.
|
|
|
|
+ */
|
|
|
|
+ synchronized void fail() {
|
|
|
|
+ terminate(JobStatus.FAILED);
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* A task assigned to this JobInProgress has reported in as failed.
|
|
* A task assigned to this JobInProgress has reported in as failed.
|
|
* Most of the time, we'll just reschedule execution. However, after
|
|
* Most of the time, we'll just reschedule execution. However, after
|
|
@@ -1930,9 +1965,9 @@ class JobInProgress {
|
|
} else {
|
|
} else {
|
|
cleanup[0].kill();
|
|
cleanup[0].kill();
|
|
}
|
|
}
|
|
- killJob();
|
|
|
|
|
|
+ terminateJob(JobStatus.FAILED);
|
|
} else {
|
|
} else {
|
|
- kill();
|
|
|
|
|
|
+ fail();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|