|
@@ -732,7 +732,7 @@ public class JobInProgress {
|
|
|
if (numMapTasks > 0) {
|
|
|
nonRunningMapCache = createCache(splits, maxLevel);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
// set the launch time
|
|
|
this.launchTime = jobtracker.getClock().getTime();
|
|
|
|
|
@@ -789,12 +789,15 @@ public class JobInProgress {
|
|
|
|
|
|
synchronized(jobInitKillStatus){
|
|
|
jobInitKillStatus.initDone = true;
|
|
|
+
|
|
|
+ // set this before the throw to make sure cleanup works properly
|
|
|
+ tasksInited = true;
|
|
|
+
|
|
|
if(jobInitKillStatus.killed) {
|
|
|
throw new KillInterruptedException("Job " + jobId + " killed in init");
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- tasksInited = true;
|
|
|
JobHistory.JobInfo.logInited(profile.getJobID(), this.launchTime,
|
|
|
numMapTasks, numReduceTasks);
|
|
|
|
|
@@ -3204,11 +3207,16 @@ public class JobInProgress {
|
|
|
// Cancel task tracker reservation
|
|
|
cancelReservedSlots();
|
|
|
|
|
|
+ // Waiting metrics are incremented in JobInProgress.initTasks()
|
|
|
+ // If a job gets an exception before that, we do not want to
|
|
|
+ // incorrectly decrement.
|
|
|
+ if (tasksInited) {
|
|
|
+ jobtracker.getInstrumentation().decWaitingMaps(getJobID(), pendingMaps());
|
|
|
+ jobtracker.getInstrumentation().decWaitingReduces(getJobID(), pendingReduces());
|
|
|
+ this.queueMetrics.decWaitingMaps(getJobID(), pendingMaps());
|
|
|
+ this.queueMetrics.decWaitingReduces(getJobID(), pendingReduces());
|
|
|
+ }
|
|
|
// Let the JobTracker know that a job is complete
|
|
|
- jobtracker.getInstrumentation().decWaitingMaps(getJobID(), pendingMaps());
|
|
|
- jobtracker.getInstrumentation().decWaitingReduces(getJobID(), pendingReduces());
|
|
|
- this.queueMetrics.decWaitingMaps(getJobID(), pendingMaps());
|
|
|
- this.queueMetrics.decWaitingReduces(getJobID(), pendingReduces());
|
|
|
jobtracker.storeCompletedJob(this);
|
|
|
jobtracker.finalizeJob(this);
|
|
|
|