|
@@ -31,7 +31,6 @@ import java.util.Set;
|
|
import java.util.TreeMap;
|
|
import java.util.TreeMap;
|
|
import java.util.Vector;
|
|
import java.util.Vector;
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
-
|
|
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
@@ -84,7 +83,6 @@ class JobInProgress {
|
|
int failedReduceTIPs = 0;
|
|
int failedReduceTIPs = 0;
|
|
private volatile boolean launchedCleanup = false;
|
|
private volatile boolean launchedCleanup = false;
|
|
private volatile boolean jobKilled = false;
|
|
private volatile boolean jobKilled = false;
|
|
- private volatile boolean jobFailed = false;
|
|
|
|
|
|
|
|
JobPriority priority = JobPriority.NORMAL;
|
|
JobPriority priority = JobPriority.NORMAL;
|
|
JobTracker jobtracker = null;
|
|
JobTracker jobtracker = null;
|
|
@@ -870,7 +868,7 @@ class JobInProgress {
|
|
return false;
|
|
return false;
|
|
}
|
|
}
|
|
// check if job has failed or killed
|
|
// check if job has failed or killed
|
|
- if (jobKilled || jobFailed) {
|
|
|
|
|
|
+ if (jobKilled) {
|
|
return true;
|
|
return true;
|
|
}
|
|
}
|
|
// Check if all maps and reducers have finished.
|
|
// Check if all maps and reducers have finished.
|
|
@@ -1699,13 +1697,10 @@ class JobInProgress {
|
|
}
|
|
}
|
|
//
|
|
//
|
|
// The Job is done
|
|
// The Job is done
|
|
- // if the job is failed, then mark the job failed.
|
|
|
|
- if (jobFailed) {
|
|
|
|
- terminateJob(JobStatus.FAILED);
|
|
|
|
- }
|
|
|
|
- // if the job is killed, then mark the job killed.
|
|
|
|
|
|
+ //
|
|
|
|
+ // if the job is killed, then mark the job failed.
|
|
if (jobKilled) {
|
|
if (jobKilled) {
|
|
- terminateJob(JobStatus.KILLED);
|
|
|
|
|
|
+ killJob();
|
|
}
|
|
}
|
|
else {
|
|
else {
|
|
jobComplete(metrics);
|
|
jobComplete(metrics);
|
|
@@ -1747,31 +1742,24 @@ class JobInProgress {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- private synchronized void terminateJob(int jobState) {
|
|
|
|
- if ((status.getRunState() == JobStatus.RUNNING)
|
|
|
|
- || (status.getRunState() == JobStatus.PREP)) {
|
|
|
|
|
|
+ private synchronized void killJob() {
|
|
|
|
+ if ((status.getRunState() == JobStatus.RUNNING) ||
|
|
|
|
+ (status.getRunState() == JobStatus.PREP)) {
|
|
|
|
+ this.status = new JobStatus(status.getJobID(),
|
|
|
|
+ 1.0f, 1.0f, 1.0f, JobStatus.FAILED);
|
|
this.finishTime = System.currentTimeMillis();
|
|
this.finishTime = System.currentTimeMillis();
|
|
- if (jobState == JobStatus.FAILED) {
|
|
|
|
- this.status = new JobStatus(status.getJobID(), 1.0f, 1.0f, 1.0f,
|
|
|
|
- JobStatus.FAILED);
|
|
|
|
- JobHistory.JobInfo.logFailed(this.status.getJobID(), finishTime,
|
|
|
|
- this.finishedMapTasks, this.finishedReduceTasks);
|
|
|
|
- } else if (jobState == JobStatus.KILLED) {
|
|
|
|
- this.status = new JobStatus(status.getJobID(), 1.0f, 1.0f, 1.0f,
|
|
|
|
- JobStatus.KILLED);
|
|
|
|
- JobHistory.JobInfo.logKilled(this.status.getJobID(), finishTime,
|
|
|
|
- this.finishedMapTasks, this.finishedReduceTasks);
|
|
|
|
- }
|
|
|
|
|
|
+ JobHistory.JobInfo.logFailed(this.status.getJobID(), finishTime,
|
|
|
|
+ this.finishedMapTasks, this.finishedReduceTasks);
|
|
garbageCollect();
|
|
garbageCollect();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Kill the job and all its component tasks.
|
|
* Kill the job and all its component tasks.
|
|
*/
|
|
*/
|
|
- private synchronized void terminate(int jobState) {
|
|
|
|
- if ((status.getRunState() == JobStatus.RUNNING)
|
|
|
|
- || (status.getRunState() == JobStatus.PREP)) {
|
|
|
|
|
|
+ public synchronized void kill() {
|
|
|
|
+ if ((status.getRunState() == JobStatus.RUNNING) ||
|
|
|
|
+ (status.getRunState() == JobStatus.PREP)) {
|
|
LOG.info("Killing job '" + this.status.getJobID() + "'");
|
|
LOG.info("Killing job '" + this.status.getJobID() + "'");
|
|
this.runningMapTasks = 0;
|
|
this.runningMapTasks = 0;
|
|
this.runningReduceTasks = 0;
|
|
this.runningReduceTasks = 0;
|
|
@@ -1784,28 +1772,10 @@ class JobInProgress {
|
|
for (int i = 0; i < reduces.length; i++) {
|
|
for (int i = 0; i < reduces.length; i++) {
|
|
reduces[i].kill();
|
|
reduces[i].kill();
|
|
}
|
|
}
|
|
- if (jobState == JobStatus.FAILED) {
|
|
|
|
- jobFailed = true;
|
|
|
|
- } else if (jobState == JobStatus.KILLED) {
|
|
|
|
- jobKilled = true;
|
|
|
|
- }
|
|
|
|
|
|
+ jobKilled = true;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- /**
|
|
|
|
- * Kill the job and all its component tasks.
|
|
|
|
- */
|
|
|
|
- public synchronized void kill() {
|
|
|
|
- terminate(JobStatus.KILLED);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * Fails the job and all its component tasks.
|
|
|
|
- */
|
|
|
|
- synchronized void fail() {
|
|
|
|
- terminate(JobStatus.FAILED);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
/**
|
|
/**
|
|
* A task assigned to this JobInProgress has reported in as failed.
|
|
* A task assigned to this JobInProgress has reported in as failed.
|
|
* Most of the time, we'll just reschedule execution. However, after
|
|
* Most of the time, we'll just reschedule execution. However, after
|
|
@@ -1954,9 +1924,9 @@ class JobInProgress {
|
|
} else {
|
|
} else {
|
|
cleanup[0].kill();
|
|
cleanup[0].kill();
|
|
}
|
|
}
|
|
- terminateJob(JobStatus.FAILED);
|
|
|
|
|
|
+ killJob();
|
|
} else {
|
|
} else {
|
|
- terminate(JobStatus.FAILED);
|
|
|
|
|
|
+ kill();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|