|
@@ -258,14 +258,14 @@ class JobInProgress {
|
|
|
/**
|
|
|
* Create an almost empty JobInProgress, which can be used only for tests. It stores the given conf, job id and tracker, reads the map and reduce task counts from conf, derives the cache levels from NetworkTopology.DEFAULT_HOST_LEVEL, and sets restartCount to 0.
|
|
|
*/
|
|
|
- protected JobInProgress(JobID jobid, JobConf conf) {
|
|
|
+ protected JobInProgress(JobID jobid, JobConf conf, JobTracker tracker) {
|
|
|
this.conf = conf;
|
|
|
this.jobId = jobid;
|
|
|
this.numMapTasks = conf.getNumMapTasks();
|
|
|
this.numReduceTasks = conf.getNumReduceTasks();
|
|
|
this.maxLevel = NetworkTopology.DEFAULT_HOST_LEVEL;
|
|
|
this.anyCacheLevel = this.maxLevel+1;
|
|
|
- this.jobtracker = null;
|
|
|
+ this.jobtracker = tracker;
|
|
|
this.restartCount = 0;
|
|
|
}
|
|
|
|
|
@@ -292,6 +292,7 @@ class JobInProgress {
|
|
|
+ jobtracker.getInfoPort() + "/jobdetails.jsp?jobid=" + jobid;
|
|
|
this.jobtracker = jobtracker;
|
|
|
this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.PREP);
|
|
|
+ this.jobtracker.getInstrumentation().addPrepJob(conf, jobid);
|
|
|
this.startTime = System.currentTimeMillis();
|
|
|
status.setStartTime(startTime);
|
|
|
this.localFs = FileSystem.getLocal(default_conf);
|
|
@@ -1413,6 +1414,7 @@ class JobInProgress {
|
|
|
name = Values.CLEANUP.name();
|
|
|
} else if (tip.isMapTask()) {
|
|
|
++runningMapTasks;
|
|
|
+ metrics.addRunningMaps(jobId, 1);
|
|
|
name = Values.MAP.name();
|
|
|
counter = Counter.TOTAL_LAUNCHED_MAPS;
|
|
|
splits = tip.getSplitNodes();
|
|
@@ -1421,6 +1423,7 @@ class JobInProgress {
|
|
|
metrics.launchMap(id);
|
|
|
} else {
|
|
|
++runningReduceTasks;
|
|
|
+ metrics.addRunningReduces(jobId, 1);
|
|
|
name = Values.REDUCE.name();
|
|
|
counter = Counter.TOTAL_LAUNCHED_REDUCES;
|
|
|
if (tip.getActiveTasks().size() > 1)
|
|
@@ -1541,8 +1544,10 @@ class JobInProgress {
|
|
|
long now = System.currentTimeMillis();
|
|
|
|
|
|
FallowSlotInfo info = map.get(taskTracker);
|
|
|
+ int reservedSlots = 0;
|
|
|
if (info == null) {
|
|
|
info = new FallowSlotInfo(now, numSlots);
|
|
|
+ reservedSlots = numSlots;
|
|
|
} else {
|
|
|
// Increment metering info if the reservation is changing
|
|
|
if (info.getNumSlots() != numSlots) {
|
|
@@ -1554,11 +1559,18 @@ class JobInProgress {
|
|
|
jobCounters.incrCounter(counter, fallowSlotMillis);
|
|
|
|
|
|
// Update
|
|
|
+ reservedSlots = numSlots - info.getNumSlots();
|
|
|
info.setTimestamp(now);
|
|
|
info.setNumSlots(numSlots);
|
|
|
}
|
|
|
}
|
|
|
map.put(taskTracker, info);
|
|
|
+ if (type == TaskType.MAP) {
|
|
|
+ jobtracker.getInstrumentation().addReservedMapSlots(reservedSlots);
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ jobtracker.getInstrumentation().addReservedReduceSlots(reservedSlots);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public synchronized void unreserveTaskTracker(TaskTracker taskTracker,
|
|
@@ -1584,6 +1596,13 @@ class JobInProgress {
|
|
|
jobCounters.incrCounter(counter, fallowSlotMillis);
|
|
|
|
|
|
map.remove(taskTracker);
|
|
|
+ if (type == TaskType.MAP) {
|
|
|
+ jobtracker.getInstrumentation().decReservedMapSlots(info.getNumSlots());
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ jobtracker.getInstrumentation().decReservedReduceSlots(
|
|
|
+ info.getNumSlots());
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public int getNumReservedTaskTrackersForMaps() {
|
|
@@ -2265,7 +2284,7 @@ class JobInProgress {
|
|
|
this.status.setSetupProgress(1.0f);
|
|
|
// move the job to running state if the job is in prep state
|
|
|
if (this.status.getRunState() == JobStatus.PREP) {
|
|
|
- this.status.setRunState(JobStatus.RUNNING);
|
|
|
+ changeStateTo(JobStatus.RUNNING);
|
|
|
JobHistory.JobInfo.logStarted(profile.getJobID());
|
|
|
}
|
|
|
} else if (tip.isJobCleanupTask()) {
|
|
@@ -2294,6 +2313,7 @@ class JobInProgress {
|
|
|
jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
|
|
|
} else if (tip.isMapTask()) {
|
|
|
runningMapTasks -= 1;
|
|
|
+ metrics.decRunningMaps(jobId, 1);
|
|
|
// check if this was a speculative task
|
|
|
if (oldNumAttempts > 1) {
|
|
|
speculativeMapTasks -= (oldNumAttempts - newNumAttempts);
|
|
@@ -2307,6 +2327,7 @@ class JobInProgress {
|
|
|
}
|
|
|
} else {
|
|
|
runningReduceTasks -= 1;
|
|
|
+ metrics.decRunningReduces(jobId, 1);
|
|
|
if (oldNumAttempts > 1) {
|
|
|
speculativeReduceTasks -= (oldNumAttempts - newNumAttempts);
|
|
|
}
|
|
@@ -2322,6 +2343,31 @@ class JobInProgress {
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Job state change must happen through this call: it sets the new run state on this job's status and then notifies the job tracker's instrumentation of the state being left and the state being entered. It does nothing when newState equals the current run state.
|
|
|
+ */
|
|
|
+ private void changeStateTo(int newState) {
|
|
|
+ int oldState = this.status.getRunState();
|
|
|
+ if (oldState == newState) {
|
|
|
+ return; // no transition: old and new states are the same, so leave status and metrics untouched
|
|
|
+ }
|
|
|
+ this.status.setRunState(newState);
|
|
|
+
|
|
|
+ // Update the metrics: first call the instrumentation's dec* hook for the state being left, then its add* hook for the state being entered. Only PREP and RUNNING have hooks here; other states trigger no call in this method.
|
|
|
+ if (oldState == JobStatus.PREP) {
|
|
|
+ this.jobtracker.getInstrumentation().decPrepJob(conf, jobId);
|
|
|
+ } else if (oldState == JobStatus.RUNNING) {
|
|
|
+ this.jobtracker.getInstrumentation().decRunningJob(conf, jobId);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (newState == JobStatus.PREP) {
|
|
|
+ this.jobtracker.getInstrumentation().addPrepJob(conf, jobId);
|
|
|
+ } else if (newState == JobStatus.RUNNING) {
|
|
|
+ this.jobtracker.getInstrumentation().addRunningJob(conf, jobId);
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* The job is done since all its component tasks are either
|
|
|
* successful or have failed.
|
|
@@ -2332,7 +2378,7 @@ class JobInProgress {
|
|
|
// All tasks are complete, then the job is done!
|
|
|
//
|
|
|
if (this.status.getRunState() == JobStatus.RUNNING ) {
|
|
|
- this.status.setRunState(JobStatus.SUCCEEDED);
|
|
|
+ changeStateTo(JobStatus.SUCCEEDED);
|
|
|
this.status.setCleanupProgress(1.0f);
|
|
|
if (maps.length == 0) {
|
|
|
this.status.setMapProgress(1.0f);
|
|
@@ -2371,7 +2417,7 @@ class JobInProgress {
|
|
|
this.status.setCleanupProgress(1.0f);
|
|
|
|
|
|
if (jobTerminationState == JobStatus.FAILED) {
|
|
|
- this.status.setRunState(JobStatus.FAILED);
|
|
|
+ changeStateTo(JobStatus.FAILED);
|
|
|
|
|
|
// Log the job summary
|
|
|
JobSummary.logJobSummary(this, jobtracker.getClusterStatus(false));
|
|
@@ -2381,7 +2427,7 @@ class JobInProgress {
|
|
|
this.finishedMapTasks,
|
|
|
this.finishedReduceTasks);
|
|
|
} else {
|
|
|
- this.status.setRunState(JobStatus.KILLED);
|
|
|
+ changeStateTo(JobStatus.KILLED);
|
|
|
|
|
|
// Log the job summary
|
|
|
JobSummary.logJobSummary(this, jobtracker.getClusterStatus(false));
|
|
@@ -2395,6 +2441,13 @@ class JobInProgress {
|
|
|
|
|
|
jobtracker.getInstrumentation().terminateJob(
|
|
|
this.conf, this.status.getJobID());
|
|
|
+ if (jobTerminationState == JobStatus.FAILED) {
|
|
|
+ jobtracker.getInstrumentation().failedJob(
|
|
|
+ this.conf, this.status.getJobID());
|
|
|
+ } else {
|
|
|
+ jobtracker.getInstrumentation().killedJob(
|
|
|
+ this.conf, this.status.getJobID());
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -2536,6 +2589,7 @@ class JobInProgress {
|
|
|
launchedSetup = false;
|
|
|
} else if (tip.isMapTask()) {
|
|
|
runningMapTasks -= 1;
|
|
|
+ metrics.decRunningMaps(jobId, 1);
|
|
|
metrics.failedMap(taskid);
|
|
|
// remove from the running queue and put it in the non-running cache
|
|
|
// if the tip is not complete i.e if the tip still needs to be run
|
|
@@ -2545,6 +2599,7 @@ class JobInProgress {
|
|
|
}
|
|
|
} else {
|
|
|
runningReduceTasks -= 1;
|
|
|
+ metrics.decRunningReduces(jobId, 1);
|
|
|
metrics.failedReduce(taskid);
|
|
|
// remove from the running queue and put in the failed queue if the tip
|
|
|
// is not complete
|