|
@@ -22,6 +22,7 @@ import java.util.Collection;
|
|
import java.util.Collections;
|
|
import java.util.Collections;
|
|
import java.util.EnumSet;
|
|
import java.util.EnumSet;
|
|
import java.util.LinkedHashMap;
|
|
import java.util.LinkedHashMap;
|
|
|
|
+import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
import java.util.Set;
|
|
import java.util.Set;
|
|
import java.util.concurrent.locks.Lock;
|
|
import java.util.concurrent.locks.Lock;
|
|
@@ -108,6 +109,8 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
|
|
// they will come to be running when they get a Container
|
|
// they will come to be running when they get a Container
|
|
private int numberUncompletedAttempts = 0;
|
|
private int numberUncompletedAttempts = 0;
|
|
|
|
|
|
|
|
+ private boolean historyTaskStartGenerated = false;
|
|
|
|
+
|
|
private static final SingleArcTransition<TaskImpl, TaskEvent>
|
|
private static final SingleArcTransition<TaskImpl, TaskEvent>
|
|
ATTEMPT_KILLED_TRANSITION = new AttemptKilledTransition();
|
|
ATTEMPT_KILLED_TRANSITION = new AttemptKilledTransition();
|
|
private static final SingleArcTransition<TaskImpl, TaskEvent>
|
|
private static final SingleArcTransition<TaskImpl, TaskEvent>
|
|
@@ -571,7 +574,10 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
|
|
tce.setMapOutputServerAddress("http://" + attempt.getNodeHttpAddress().split(":")[0] + ":8080");
|
|
tce.setMapOutputServerAddress("http://" + attempt.getNodeHttpAddress().split(":")[0] + ":8080");
|
|
tce.setStatus(status);
|
|
tce.setStatus(status);
|
|
tce.setAttemptId(attempt.getID());
|
|
tce.setAttemptId(attempt.getID());
|
|
- tce.setAttemptRunTime(0); // TODO: set the exact run time of the task.
|
|
|
|
|
|
+ int runTime = 0;
|
|
|
|
+ if (attempt.getFinishTime() != 0 && attempt.getLaunchTime() !=0)
|
|
|
|
+ runTime = (int)(attempt.getFinishTime() - attempt.getLaunchTime());
|
|
|
|
+ tce.setAttemptRunTime(runTime);
|
|
|
|
|
|
//raise the event to job so that it adds the completion event to its
|
|
//raise the event to job so that it adds the completion event to its
|
|
//data structures
|
|
//data structures
|
|
@@ -589,13 +595,19 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
|
|
return tfe;
|
|
return tfe;
|
|
}
|
|
}
|
|
|
|
|
|
- private static TaskFailedEvent createTaskFailedEvent(TaskImpl task, String error, TaskState taskState, TaskAttemptId taId) {
|
|
|
|
|
|
+ private static TaskFailedEvent createTaskFailedEvent(TaskImpl task, List<String> diag, TaskState taskState, TaskAttemptId taId) {
|
|
|
|
+ StringBuilder errorSb = new StringBuilder();
|
|
|
|
+ if (diag != null) {
|
|
|
|
+ for (String d : diag) {
|
|
|
|
+ errorSb.append(", ").append(d);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
TaskFailedEvent taskFailedEvent = new TaskFailedEvent(
|
|
TaskFailedEvent taskFailedEvent = new TaskFailedEvent(
|
|
TypeConverter.fromYarn(task.taskId),
|
|
TypeConverter.fromYarn(task.taskId),
|
|
// Hack since getFinishTime needs isFinished to be true and that doesn't happen till after the transition.
|
|
// Hack since getFinishTime needs isFinished to be true and that doesn't happen till after the transition.
|
|
task.getFinishTime(taId),
|
|
task.getFinishTime(taId),
|
|
TypeConverter.fromYarn(task.getType()),
|
|
TypeConverter.fromYarn(task.getType()),
|
|
- error == null ? "" : error,
|
|
|
|
|
|
+ errorSb.toString(),
|
|
taskState.toString(),
|
|
taskState.toString(),
|
|
taId == null ? null : TypeConverter.fromYarn(taId));
|
|
taId == null ? null : TypeConverter.fromYarn(taId));
|
|
return taskFailedEvent;
|
|
return taskFailedEvent;
|
|
@@ -615,6 +627,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
|
|
.getTaskSplitMetaInfo().getLocations()) : "");
|
|
.getTaskSplitMetaInfo().getLocations()) : "");
|
|
task.eventHandler
|
|
task.eventHandler
|
|
.handle(new JobHistoryEvent(task.taskId.getJobId(), tse));
|
|
.handle(new JobHistoryEvent(task.taskId.getJobId(), tse));
|
|
|
|
+ task.historyTaskStartGenerated = true;
|
|
task.metrics.launchedTask(task);
|
|
task.metrics.launchedTask(task);
|
|
}
|
|
}
|
|
|
|
|
|
@@ -681,8 +694,12 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
|
|
task.taskId, TaskState.SUCCEEDED));
|
|
task.taskId, TaskState.SUCCEEDED));
|
|
LOG.info("Task succeeded with attempt " + task.successfulAttempt);
|
|
LOG.info("Task succeeded with attempt " + task.successfulAttempt);
|
|
// issue kill to all other attempts
|
|
// issue kill to all other attempts
|
|
- TaskFinishedEvent tfe = createTaskFinishedEvent(task, TaskState.SUCCEEDED);
|
|
|
|
- task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(), tfe));
|
|
|
|
|
|
+ if (task.historyTaskStartGenerated) {
|
|
|
|
+ TaskFinishedEvent tfe = createTaskFinishedEvent(task,
|
|
|
|
+ TaskState.SUCCEEDED);
|
|
|
|
+ task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(),
|
|
|
|
+ tfe));
|
|
|
|
+ }
|
|
for (TaskAttempt attempt : task.attempts.values()) {
|
|
for (TaskAttempt attempt : task.attempts.values()) {
|
|
if (attempt.getID() != task.successfulAttempt &&
|
|
if (attempt.getID() != task.successfulAttempt &&
|
|
// This is okay because it can only talk us out of sending a
|
|
// This is okay because it can only talk us out of sending a
|
|
@@ -726,10 +743,15 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
|
|
TaskAttemptCompletionEventStatus.KILLED);
|
|
TaskAttemptCompletionEventStatus.KILLED);
|
|
// check whether all attempts are finished
|
|
// check whether all attempts are finished
|
|
if (task.finishedAttempts == task.attempts.size()) {
|
|
if (task.finishedAttempts == task.attempts.size()) {
|
|
- TaskFailedEvent taskFailedEvent = createTaskFailedEvent(task, "",
|
|
|
|
- finalState, null); //TODO JH verify failedAttempt null
|
|
|
|
|
|
+ if (task.historyTaskStartGenerated) {
|
|
|
|
+ TaskFailedEvent taskFailedEvent = createTaskFailedEvent(task, null,
|
|
|
|
+ finalState, null); // TODO JH verify failedAttempt null
|
|
task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(),
|
|
task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(),
|
|
taskFailedEvent));
|
|
taskFailedEvent));
|
|
|
|
+ } else {
|
|
|
|
+ LOG.debug("Not generating HistoryFinish event since start event not" +
|
|
|
|
+ " generated for task: " + task.getID());
|
|
|
|
+ }
|
|
|
|
|
|
task.eventHandler.handle(
|
|
task.eventHandler.handle(
|
|
new JobTaskEvent(task.taskId, finalState));
|
|
new JobTaskEvent(task.taskId, finalState));
|
|
@@ -769,11 +791,15 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
|
|
TaskTAttemptEvent ev = (TaskTAttemptEvent) event;
|
|
TaskTAttemptEvent ev = (TaskTAttemptEvent) event;
|
|
TaskAttemptId taId = ev.getTaskAttemptID();
|
|
TaskAttemptId taId = ev.getTaskAttemptID();
|
|
|
|
|
|
- //TODO JH Populate the error string. FailReason from TaskAttempt(taId)
|
|
|
|
- TaskFailedEvent taskFailedEvent = createTaskFailedEvent(task, "",
|
|
|
|
|
|
+ if (task.historyTaskStartGenerated) {
|
|
|
|
+ TaskFailedEvent taskFailedEvent = createTaskFailedEvent(task, attempt.getDiagnostics(),
|
|
TaskState.FAILED, taId);
|
|
TaskState.FAILED, taId);
|
|
task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(),
|
|
task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(),
|
|
taskFailedEvent));
|
|
taskFailedEvent));
|
|
|
|
+ } else {
|
|
|
|
+ LOG.debug("Not generating HistoryFinish event since start event not" +
|
|
|
|
+ " generated for task: " + task.getID());
|
|
|
|
+ }
|
|
task.eventHandler.handle(
|
|
task.eventHandler.handle(
|
|
new JobTaskEvent(task.taskId, TaskState.FAILED));
|
|
new JobTaskEvent(task.taskId, TaskState.FAILED));
|
|
return task.finished(TaskState.FAILED);
|
|
return task.finished(TaskState.FAILED);
|
|
@@ -825,10 +851,15 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
|
|
@Override
|
|
@Override
|
|
public void transition(TaskImpl task, TaskEvent event) {
|
|
public void transition(TaskImpl task, TaskEvent event) {
|
|
|
|
|
|
|
|
+ if (task.historyTaskStartGenerated) {
|
|
TaskFailedEvent taskFailedEvent = createTaskFailedEvent(task, null,
|
|
TaskFailedEvent taskFailedEvent = createTaskFailedEvent(task, null,
|
|
- TaskState.KILLED, null); //TODO Verify failedAttemptId is null
|
|
|
|
|
|
+ TaskState.KILLED, null); // TODO Verify failedAttemptId is null
|
|
task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(),
|
|
task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(),
|
|
taskFailedEvent));
|
|
taskFailedEvent));
|
|
|
|
+ }else {
|
|
|
|
+ LOG.debug("Not generating HistoryFinish event since start event not" +
|
|
|
|
+ " generated for task: " + task.getID());
|
|
|
|
+ }
|
|
|
|
|
|
task.eventHandler.handle(
|
|
task.eventHandler.handle(
|
|
new JobTaskEvent(task.taskId, TaskState.KILLED));
|
|
new JobTaskEvent(task.taskId, TaskState.KILLED));
|