|
@@ -35,6 +35,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
+import org.apache.hadoop.classification.InterfaceAudience.Private;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
@@ -106,7 +107,7 @@ import org.apache.hadoop.yarn.state.StateMachineFactory;
|
|
|
/** Implementation of Job interface. Maintains the state machines of Job.
|
|
|
* The read and write calls use ReadWriteLock for concurrency.
|
|
|
*/
|
|
|
-@SuppressWarnings({ "rawtypes", "deprecation", "unchecked" })
|
|
|
+@SuppressWarnings({ "rawtypes", "unchecked" })
|
|
|
public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
|
|
|
EventHandler<JobEvent> {
|
|
|
|
|
@@ -153,6 +154,10 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
|
|
|
private boolean lazyTasksCopyNeeded = false;
|
|
|
volatile Map<TaskId, Task> tasks = new LinkedHashMap<TaskId, Task>();
|
|
|
private Counters jobCounters = new Counters();
|
|
|
+ private Object fullCountersLock = new Object();
|
|
|
+ private Counters fullCounters = null;
|
|
|
+ private Counters finalMapCounters = null;
|
|
|
+ private Counters finalReduceCounters = null;
|
|
|
// FIXME:
|
|
|
//
|
|
|
// Can then replace task-level uber counters (MR-2424) with job-level ones
|
|
@@ -473,11 +478,21 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
|
|
|
|
|
|
@Override
|
|
|
public Counters getAllCounters() {
|
|
|
- Counters counters = new Counters();
|
|
|
+
|
|
|
readLock.lock();
|
|
|
+
|
|
|
try {
|
|
|
+ JobState state = getState();
|
|
|
+ if (state == JobState.ERROR || state == JobState.FAILED
|
|
|
+ || state == JobState.KILLED || state == JobState.SUCCEEDED) {
|
|
|
+ this.mayBeConstructFinalFullCounters();
|
|
|
+ return fullCounters;
|
|
|
+ }
|
|
|
+
|
|
|
+ Counters counters = new Counters();
|
|
|
counters.incrAllCounters(jobCounters);
|
|
|
return incrTaskCounters(counters, tasks.values());
|
|
|
+
|
|
|
} finally {
|
|
|
readLock.unlock();
|
|
|
}
|
|
@@ -525,17 +540,21 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
|
|
|
try {
|
|
|
JobState state = getState();
|
|
|
|
|
|
+ // jobFile can be null if the job is not yet inited.
|
|
|
+ String jobFile =
|
|
|
+ remoteJobConfFile == null ? "" : remoteJobConfFile.toString();
|
|
|
+
|
|
|
if (getState() == JobState.NEW) {
|
|
|
return MRBuilderUtils.newJobReport(jobId, jobName, username, state,
|
|
|
appSubmitTime, startTime, finishTime, setupProgress, 0.0f, 0.0f,
|
|
|
- cleanupProgress, remoteJobConfFile.toString(), amInfos, isUber);
|
|
|
+ cleanupProgress, jobFile, amInfos, isUber);
|
|
|
}
|
|
|
|
|
|
computeProgress();
|
|
|
return MRBuilderUtils.newJobReport(jobId, jobName, username, state,
|
|
|
appSubmitTime, startTime, finishTime, setupProgress,
|
|
|
this.mapProgress, this.reduceProgress,
|
|
|
- cleanupProgress, remoteJobConfFile.toString(), amInfos, isUber);
|
|
|
+ cleanupProgress, jobFile, amInfos, isUber);
|
|
|
} finally {
|
|
|
readLock.unlock();
|
|
|
}
|
|
@@ -1143,26 +1162,49 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
|
|
|
// not be generated for KilledJobs, etc.
|
|
|
private static JobFinishedEvent createJobFinishedEvent(JobImpl job) {
|
|
|
|
|
|
- Counters mapCounters = new Counters();
|
|
|
- Counters reduceCounters = new Counters();
|
|
|
- for (Task t : job.tasks.values()) {
|
|
|
- Counters counters = t.getCounters();
|
|
|
- switch (t.getType()) {
|
|
|
- case MAP: mapCounters.incrAllCounters(counters); break;
|
|
|
- case REDUCE: reduceCounters.incrAllCounters(counters); break;
|
|
|
- }
|
|
|
- }
|
|
|
+ job.mayBeConstructFinalFullCounters();
|
|
|
|
|
|
JobFinishedEvent jfe = new JobFinishedEvent(
|
|
|
job.oldJobId, job.finishTime,
|
|
|
job.succeededMapTaskCount, job.succeededReduceTaskCount,
|
|
|
job.failedMapTaskCount, job.failedReduceTaskCount,
|
|
|
- mapCounters,
|
|
|
- reduceCounters,
|
|
|
- job.getAllCounters());
|
|
|
+ job.finalMapCounters,
|
|
|
+ job.finalReduceCounters,
|
|
|
+ job.fullCounters);
|
|
|
return jfe;
|
|
|
}
|
|
|
|
|
|
+ private void mayBeConstructFinalFullCounters() {
|
|
|
+ // Calculating full-counters. This should happen only once for the job.
|
|
|
+ synchronized (this.fullCountersLock) {
|
|
|
+ if (this.fullCounters != null) {
|
|
|
+ // Already constructed. Just return.
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ this.constructFinalFullcounters();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Private
|
|
|
+ public void constructFinalFullcounters() {
|
|
|
+ this.fullCounters = new Counters();
|
|
|
+ this.finalMapCounters = new Counters();
|
|
|
+ this.finalReduceCounters = new Counters();
|
|
|
+ this.fullCounters.incrAllCounters(jobCounters);
|
|
|
+ for (Task t : this.tasks.values()) {
|
|
|
+ Counters counters = t.getCounters();
|
|
|
+ switch (t.getType()) {
|
|
|
+ case MAP:
|
|
|
+ this.finalMapCounters.incrAllCounters(counters);
|
|
|
+ break;
|
|
|
+ case REDUCE:
|
|
|
+ this.finalReduceCounters.incrAllCounters(counters);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ this.fullCounters.incrAllCounters(counters);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
// Task-start has been moved out of InitTransition, so this arc simply
|
|
|
// hardcodes 0 for both map and reduce finished tasks.
|
|
|
private static class KillNewJobTransition
|