|
@@ -31,6 +31,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptID;
|
|
|
import org.apache.hadoop.mapreduce.TaskType;
|
|
|
import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent;
|
|
|
import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
|
|
|
+import org.apache.hadoop.mapreduce.jobhistory.JobFinished;
|
|
|
import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent;
|
|
|
import org.apache.hadoop.mapreduce.jobhistory.JobInfoChangeEvent;
|
|
|
import org.apache.hadoop.mapreduce.jobhistory.JobInitedEvent;
|
|
@@ -45,14 +46,15 @@ import org.apache.hadoop.mapreduce.jobhistory.ReduceAttemptFinishedEvent;
|
|
|
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptFinished;
|
|
|
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptFinishedEvent;
|
|
|
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptStartedEvent;
|
|
|
+import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletion;
|
|
|
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent;
|
|
|
+import org.apache.hadoop.mapreduce.jobhistory.TaskFailed;
|
|
|
import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent;
|
|
|
import org.apache.hadoop.mapreduce.jobhistory.TaskFinished;
|
|
|
import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
|
|
|
import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
|
|
|
import org.apache.hadoop.mapreduce.jobhistory.TaskUpdatedEvent;
|
|
|
import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;
|
|
|
-import org.apache.hadoop.tools.rumen.datatypes.JobProperties;
|
|
|
import org.apache.hadoop.util.StringUtils;
|
|
|
|
|
|
/**
|
|
@@ -67,16 +69,16 @@ public class JobBuilder {
|
|
|
|
|
|
private boolean finalized = false;
|
|
|
|
|
|
- private LoggedJob result = new LoggedJob();
|
|
|
+ private ParsedJob result = new ParsedJob();
|
|
|
|
|
|
- private Map<String, LoggedTask> mapTasks = new HashMap<String, LoggedTask>();
|
|
|
- private Map<String, LoggedTask> reduceTasks =
|
|
|
- new HashMap<String, LoggedTask>();
|
|
|
- private Map<String, LoggedTask> otherTasks =
|
|
|
- new HashMap<String, LoggedTask>();
|
|
|
+ private Map<String, ParsedTask> mapTasks = new HashMap<String, ParsedTask>();
|
|
|
+ private Map<String, ParsedTask> reduceTasks =
|
|
|
+ new HashMap<String, ParsedTask>();
|
|
|
+ private Map<String, ParsedTask> otherTasks =
|
|
|
+ new HashMap<String, ParsedTask>();
|
|
|
|
|
|
- private Map<String, LoggedTaskAttempt> attempts =
|
|
|
- new HashMap<String, LoggedTaskAttempt>();
|
|
|
+ private Map<String, ParsedTaskAttempt> attempts =
|
|
|
+ new HashMap<String, ParsedTaskAttempt>();
|
|
|
|
|
|
private Map<ParsedHost, ParsedHost> allHosts =
|
|
|
new HashMap<ParsedHost, ParsedHost>();
|
|
@@ -123,7 +125,7 @@ public class JobBuilder {
|
|
|
public void process(HistoryEvent event) {
|
|
|
if (finalized) {
|
|
|
throw new IllegalStateException(
|
|
|
- "JobBuilder.process(HistoryEvent event) called after LoggedJob built");
|
|
|
+ "JobBuilder.process(HistoryEvent event) called after ParsedJob built");
|
|
|
}
|
|
|
|
|
|
// these are in lexicographical order by class name.
|
|
@@ -229,12 +231,16 @@ public class JobBuilder {
|
|
|
public void process(Properties conf) {
|
|
|
if (finalized) {
|
|
|
throw new IllegalStateException(
|
|
|
- "JobBuilder.process(Properties conf) called after LoggedJob built");
|
|
|
+ "JobBuilder.process(Properties conf) called after ParsedJob built");
|
|
|
}
|
|
|
|
|
|
//TODO remove this once the deprecate APIs in LoggedJob are removed
|
|
|
- result.setQueue(extract(conf, JobConfPropertyNames.QUEUE_NAMES
|
|
|
- .getCandidates(), "default"));
|
|
|
+ String queue = extract(conf, JobConfPropertyNames.QUEUE_NAMES
|
|
|
+ .getCandidates(), null);
|
|
|
+ // set the queue name if existing
|
|
|
+ if (queue != null) {
|
|
|
+ result.setQueue(queue);
|
|
|
+ }
|
|
|
result.setJobName(extract(conf, JobConfPropertyNames.JOB_NAMES
|
|
|
.getCandidates(), null));
|
|
|
|
|
@@ -252,9 +258,9 @@ public class JobBuilder {
|
|
|
* Request the builder to build the final object. Once called, the
|
|
|
* {@link JobBuilder} would accept no more events or job-conf properties.
|
|
|
*
|
|
|
- * @return Parsed {@link LoggedJob} object.
|
|
|
+ * @return Parsed {@link ParsedJob} object.
|
|
|
*/
|
|
|
- public LoggedJob build() {
|
|
|
+ public ParsedJob build() {
|
|
|
// The main job here is to build CDFs and manage the conf
|
|
|
finalized = true;
|
|
|
|
|
@@ -416,7 +422,7 @@ public class JobBuilder {
|
|
|
}
|
|
|
|
|
|
private void processTaskUpdatedEvent(TaskUpdatedEvent event) {
|
|
|
- LoggedTask task = getTask(event.getTaskId().toString());
|
|
|
+ ParsedTask task = getTask(event.getTaskId().toString());
|
|
|
if (task == null) {
|
|
|
return;
|
|
|
}
|
|
@@ -424,7 +430,7 @@ public class JobBuilder {
|
|
|
}
|
|
|
|
|
|
private void processTaskStartedEvent(TaskStartedEvent event) {
|
|
|
- LoggedTask task =
|
|
|
+ ParsedTask task =
|
|
|
getOrMakeTask(event.getTaskType(), event.getTaskId().toString(), true);
|
|
|
task.setStartTime(event.getStartTime());
|
|
|
task.setPreferredLocations(preferredLocationForSplits(event
|
|
@@ -432,7 +438,7 @@ public class JobBuilder {
|
|
|
}
|
|
|
|
|
|
private void processTaskFinishedEvent(TaskFinishedEvent event) {
|
|
|
- LoggedTask task =
|
|
|
+ ParsedTask task =
|
|
|
getOrMakeTask(event.getTaskType(), event.getTaskId().toString(), false);
|
|
|
if (task == null) {
|
|
|
return;
|
|
@@ -443,18 +449,22 @@ public class JobBuilder {
|
|
|
}
|
|
|
|
|
|
private void processTaskFailedEvent(TaskFailedEvent event) {
|
|
|
- LoggedTask task =
|
|
|
+ ParsedTask task =
|
|
|
getOrMakeTask(event.getTaskType(), event.getTaskId().toString(), false);
|
|
|
if (task == null) {
|
|
|
return;
|
|
|
}
|
|
|
task.setFinishTime(event.getFinishTime());
|
|
|
task.setTaskStatus(getPre21Value(event.getTaskStatus()));
|
|
|
+ TaskFailed t = (TaskFailed)(event.getDatum());
|
|
|
+ task.putDiagnosticInfo(t.error.toString());
|
|
|
+ task.putFailedDueToAttemptId(t.failedDueToAttempt.toString());
|
|
|
+ // No counters in TaskFailedEvent
|
|
|
}
|
|
|
|
|
|
private void processTaskAttemptUnsuccessfulCompletionEvent(
|
|
|
TaskAttemptUnsuccessfulCompletionEvent event) {
|
|
|
- LoggedTaskAttempt attempt =
|
|
|
+ ParsedTaskAttempt attempt =
|
|
|
getOrMakeTaskAttempt(event.getTaskType(), event.getTaskId().toString(),
|
|
|
event.getTaskAttemptId().toString());
|
|
|
|
|
@@ -476,20 +486,27 @@ public class JobBuilder {
|
|
|
attempt.arraySetCpuUsages(event.getCpuUsages());
|
|
|
attempt.arraySetVMemKbytes(event.getVMemKbytes());
|
|
|
attempt.arraySetPhysMemKbytes(event.getPhysMemKbytes());
|
|
|
+ TaskAttemptUnsuccessfulCompletion t =
|
|
|
+ (TaskAttemptUnsuccessfulCompletion) (event.getDatum());
|
|
|
+ attempt.putDiagnosticInfo(t.error.toString());
|
|
|
+ // No counters in TaskAttemptUnsuccessfulCompletionEvent
|
|
|
}
|
|
|
|
|
|
private void processTaskAttemptStartedEvent(TaskAttemptStartedEvent event) {
|
|
|
- LoggedTaskAttempt attempt =
|
|
|
+ ParsedTaskAttempt attempt =
|
|
|
getOrMakeTaskAttempt(event.getTaskType(), event.getTaskId().toString(),
|
|
|
event.getTaskAttemptId().toString());
|
|
|
if (attempt == null) {
|
|
|
return;
|
|
|
}
|
|
|
attempt.setStartTime(event.getStartTime());
|
|
|
+ attempt.putTrackerName(event.getTrackerName());
|
|
|
+ attempt.putHttpPort(event.getHttpPort());
|
|
|
+ attempt.putShufflePort(event.getShufflePort());
|
|
|
}
|
|
|
|
|
|
private void processTaskAttemptFinishedEvent(TaskAttemptFinishedEvent event) {
|
|
|
- LoggedTaskAttempt attempt =
|
|
|
+ ParsedTaskAttempt attempt =
|
|
|
getOrMakeTaskAttempt(event.getTaskType(), event.getTaskId().toString(),
|
|
|
event.getAttemptId().toString());
|
|
|
if (attempt == null) {
|
|
@@ -507,7 +524,7 @@ public class JobBuilder {
|
|
|
|
|
|
private void processReduceAttemptFinishedEvent(
|
|
|
ReduceAttemptFinishedEvent event) {
|
|
|
- LoggedTaskAttempt attempt =
|
|
|
+ ParsedTaskAttempt attempt =
|
|
|
getOrMakeTaskAttempt(event.getTaskType(), event.getTaskId().toString(),
|
|
|
event.getAttemptId().toString());
|
|
|
if (attempt == null) {
|
|
@@ -536,7 +553,7 @@ public class JobBuilder {
|
|
|
}
|
|
|
|
|
|
private void processMapAttemptFinishedEvent(MapAttemptFinishedEvent event) {
|
|
|
- LoggedTaskAttempt attempt =
|
|
|
+ ParsedTaskAttempt attempt =
|
|
|
getOrMakeTaskAttempt(event.getTaskType(), event.getTaskId().toString(),
|
|
|
event.getAttemptId().toString());
|
|
|
if (attempt == null) {
|
|
@@ -568,6 +585,7 @@ public class JobBuilder {
|
|
|
result.setOutcome(Pre21JobHistoryConstants.Values
|
|
|
.valueOf(event.getStatus()));
|
|
|
result.setFinishTime(event.getFinishTime());
|
|
|
+ // No counters in JobUnsuccessfulCompletionEvent
|
|
|
}
|
|
|
|
|
|
private void processJobSubmittedEvent(JobSubmittedEvent event) {
|
|
@@ -575,8 +593,14 @@ public class JobBuilder {
|
|
|
result.setJobName(event.getJobName());
|
|
|
result.setUser(event.getUserName());
|
|
|
result.setSubmitTime(event.getSubmitTime());
|
|
|
- // job queue name is set when conf file is processed.
|
|
|
- // See JobBuilder.process(Properties) method for details.
|
|
|
+ result.putJobConfPath(event.getJobConfPath());
|
|
|
+ result.putJobAcls(event.getJobAcls());
|
|
|
+
|
|
|
+ // set the queue name if existing
|
|
|
+ String queue = event.getJobQueueName();
|
|
|
+ if (queue != null) {
|
|
|
+ result.setQueue(queue);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private void processJobStatusChangedEvent(JobStatusChangedEvent event) {
|
|
@@ -603,10 +627,19 @@ public class JobBuilder {
|
|
|
result.setFinishTime(event.getFinishTime());
|
|
|
result.setJobID(jobID);
|
|
|
result.setOutcome(Values.SUCCESS);
|
|
|
+
|
|
|
+ JobFinished job = (JobFinished)event.getDatum();
|
|
|
+ Map<String, Long> countersMap =
|
|
|
+ JobHistoryUtils.extractCounters(job.totalCounters);
|
|
|
+ result.putTotalCounters(countersMap);
|
|
|
+ countersMap = JobHistoryUtils.extractCounters(job.mapCounters);
|
|
|
+ result.putMapCounters(countersMap);
|
|
|
+ countersMap = JobHistoryUtils.extractCounters(job.reduceCounters);
|
|
|
+ result.putReduceCounters(countersMap);
|
|
|
}
|
|
|
|
|
|
- private LoggedTask getTask(String taskIDname) {
|
|
|
- LoggedTask result = mapTasks.get(taskIDname);
|
|
|
+ private ParsedTask getTask(String taskIDname) {
|
|
|
+ ParsedTask result = mapTasks.get(taskIDname);
|
|
|
|
|
|
if (result != null) {
|
|
|
return result;
|
|
@@ -630,9 +663,9 @@ public class JobBuilder {
|
|
|
* if true, we can create a task.
|
|
|
* @return
|
|
|
*/
|
|
|
- private LoggedTask getOrMakeTask(TaskType type, String taskIDname,
|
|
|
+ private ParsedTask getOrMakeTask(TaskType type, String taskIDname,
|
|
|
boolean allowCreate) {
|
|
|
- Map<String, LoggedTask> taskMap = otherTasks;
|
|
|
+ Map<String, ParsedTask> taskMap = otherTasks;
|
|
|
List<LoggedTask> tasks = this.result.getOtherTasks();
|
|
|
|
|
|
switch (type) {
|
|
@@ -650,10 +683,10 @@ public class JobBuilder {
|
|
|
// no code
|
|
|
}
|
|
|
|
|
|
- LoggedTask result = taskMap.get(taskIDname);
|
|
|
+ ParsedTask result = taskMap.get(taskIDname);
|
|
|
|
|
|
if (result == null && allowCreate) {
|
|
|
- result = new LoggedTask();
|
|
|
+ result = new ParsedTask();
|
|
|
result.setTaskType(getPre21Value(type.toString()));
|
|
|
result.setTaskID(taskIDname);
|
|
|
taskMap.put(taskIDname, result);
|
|
@@ -663,13 +696,13 @@ public class JobBuilder {
|
|
|
return result;
|
|
|
}
|
|
|
|
|
|
- private LoggedTaskAttempt getOrMakeTaskAttempt(TaskType type,
|
|
|
+ private ParsedTaskAttempt getOrMakeTaskAttempt(TaskType type,
|
|
|
String taskIDName, String taskAttemptName) {
|
|
|
- LoggedTask task = getOrMakeTask(type, taskIDName, false);
|
|
|
- LoggedTaskAttempt result = attempts.get(taskAttemptName);
|
|
|
+ ParsedTask task = getOrMakeTask(type, taskIDName, false);
|
|
|
+ ParsedTaskAttempt result = attempts.get(taskAttemptName);
|
|
|
|
|
|
if (result == null && task != null) {
|
|
|
- result = new LoggedTaskAttempt();
|
|
|
+ result = new ParsedTaskAttempt();
|
|
|
result.setAttemptID(taskAttemptName);
|
|
|
attempts.put(taskAttemptName, result);
|
|
|
task.getAttempts().add(result);
|