|
@@ -39,6 +39,7 @@ import java.util.regex.Pattern;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
+import org.apache.hadoop.classification.InterfaceAudience.Private;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
@@ -72,10 +73,10 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
|
|
|
-import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.AppContext;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
|
|
|
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttemptStateInternal;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
|
|
@@ -88,7 +89,6 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
|
|
|
-import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
|
|
@@ -132,6 +132,7 @@ import org.apache.hadoop.yarn.util.BuilderUtils;
|
|
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
|
|
import org.apache.hadoop.yarn.util.RackResolver;
|
|
|
|
|
|
+import com.google.common.annotations.VisibleForTesting;
|
|
|
import com.google.common.base.Preconditions;
|
|
|
|
|
|
/**
|
|
@@ -184,149 +185,149 @@ public abstract class TaskAttemptImpl implements
|
|
|
= new DiagnosticInformationUpdater();
|
|
|
|
|
|
private static final StateMachineFactory
|
|
|
- <TaskAttemptImpl, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
|
|
|
+ <TaskAttemptImpl, TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent>
|
|
|
stateMachineFactory
|
|
|
= new StateMachineFactory
|
|
|
- <TaskAttemptImpl, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
|
|
|
- (TaskAttemptState.NEW)
|
|
|
+ <TaskAttemptImpl, TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent>
|
|
|
+ (TaskAttemptStateInternal.NEW)
|
|
|
|
|
|
// Transitions from the NEW state.
|
|
|
- .addTransition(TaskAttemptState.NEW, TaskAttemptState.UNASSIGNED,
|
|
|
+ .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.UNASSIGNED,
|
|
|
TaskAttemptEventType.TA_SCHEDULE, new RequestContainerTransition(false))
|
|
|
- .addTransition(TaskAttemptState.NEW, TaskAttemptState.UNASSIGNED,
|
|
|
+ .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.UNASSIGNED,
|
|
|
TaskAttemptEventType.TA_RESCHEDULE, new RequestContainerTransition(true))
|
|
|
- .addTransition(TaskAttemptState.NEW, TaskAttemptState.KILLED,
|
|
|
+ .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.KILLED,
|
|
|
TaskAttemptEventType.TA_KILL, new KilledTransition())
|
|
|
- .addTransition(TaskAttemptState.NEW, TaskAttemptState.FAILED,
|
|
|
+ .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.FAILED,
|
|
|
TaskAttemptEventType.TA_FAILMSG, new FailedTransition())
|
|
|
|
|
|
// Transitions from the UNASSIGNED state.
|
|
|
- .addTransition(TaskAttemptState.UNASSIGNED,
|
|
|
- TaskAttemptState.ASSIGNED, TaskAttemptEventType.TA_ASSIGNED,
|
|
|
+ .addTransition(TaskAttemptStateInternal.UNASSIGNED,
|
|
|
+ TaskAttemptStateInternal.ASSIGNED, TaskAttemptEventType.TA_ASSIGNED,
|
|
|
new ContainerAssignedTransition())
|
|
|
- .addTransition(TaskAttemptState.UNASSIGNED, TaskAttemptState.KILLED,
|
|
|
+ .addTransition(TaskAttemptStateInternal.UNASSIGNED, TaskAttemptStateInternal.KILLED,
|
|
|
TaskAttemptEventType.TA_KILL, new DeallocateContainerTransition(
|
|
|
- TaskAttemptState.KILLED, true))
|
|
|
- .addTransition(TaskAttemptState.UNASSIGNED, TaskAttemptState.FAILED,
|
|
|
+ TaskAttemptStateInternal.KILLED, true))
|
|
|
+ .addTransition(TaskAttemptStateInternal.UNASSIGNED, TaskAttemptStateInternal.FAILED,
|
|
|
TaskAttemptEventType.TA_FAILMSG, new DeallocateContainerTransition(
|
|
|
- TaskAttemptState.FAILED, true))
|
|
|
+ TaskAttemptStateInternal.FAILED, true))
|
|
|
|
|
|
// Transitions from the ASSIGNED state.
|
|
|
- .addTransition(TaskAttemptState.ASSIGNED, TaskAttemptState.RUNNING,
|
|
|
+ .addTransition(TaskAttemptStateInternal.ASSIGNED, TaskAttemptStateInternal.RUNNING,
|
|
|
TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
|
|
|
new LaunchedContainerTransition())
|
|
|
- .addTransition(TaskAttemptState.ASSIGNED, TaskAttemptState.ASSIGNED,
|
|
|
+ .addTransition(TaskAttemptStateInternal.ASSIGNED, TaskAttemptStateInternal.ASSIGNED,
|
|
|
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
|
|
|
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
|
|
|
- .addTransition(TaskAttemptState.ASSIGNED, TaskAttemptState.FAILED,
|
|
|
+ .addTransition(TaskAttemptStateInternal.ASSIGNED, TaskAttemptStateInternal.FAILED,
|
|
|
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED,
|
|
|
- new DeallocateContainerTransition(TaskAttemptState.FAILED, false))
|
|
|
- .addTransition(TaskAttemptState.ASSIGNED,
|
|
|
- TaskAttemptState.FAIL_CONTAINER_CLEANUP,
|
|
|
+ new DeallocateContainerTransition(TaskAttemptStateInternal.FAILED, false))
|
|
|
+ .addTransition(TaskAttemptStateInternal.ASSIGNED,
|
|
|
+ TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
|
|
|
TaskAttemptEventType.TA_CONTAINER_COMPLETED,
|
|
|
CLEANUP_CONTAINER_TRANSITION)
|
|
|
// ^ If RM kills the container due to expiry, preemption etc.
|
|
|
- .addTransition(TaskAttemptState.ASSIGNED,
|
|
|
- TaskAttemptState.KILL_CONTAINER_CLEANUP,
|
|
|
+ .addTransition(TaskAttemptStateInternal.ASSIGNED,
|
|
|
+ TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
|
|
|
TaskAttemptEventType.TA_KILL, CLEANUP_CONTAINER_TRANSITION)
|
|
|
- .addTransition(TaskAttemptState.ASSIGNED,
|
|
|
- TaskAttemptState.FAIL_CONTAINER_CLEANUP,
|
|
|
+ .addTransition(TaskAttemptStateInternal.ASSIGNED,
|
|
|
+ TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
|
|
|
TaskAttemptEventType.TA_FAILMSG, CLEANUP_CONTAINER_TRANSITION)
|
|
|
|
|
|
// Transitions from RUNNING state.
|
|
|
- .addTransition(TaskAttemptState.RUNNING, TaskAttemptState.RUNNING,
|
|
|
+ .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.RUNNING,
|
|
|
TaskAttemptEventType.TA_UPDATE, new StatusUpdater())
|
|
|
- .addTransition(TaskAttemptState.RUNNING, TaskAttemptState.RUNNING,
|
|
|
+ .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.RUNNING,
|
|
|
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
|
|
|
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
|
|
|
// If no commit is required, task directly goes to success
|
|
|
- .addTransition(TaskAttemptState.RUNNING,
|
|
|
- TaskAttemptState.SUCCESS_CONTAINER_CLEANUP,
|
|
|
+ .addTransition(TaskAttemptStateInternal.RUNNING,
|
|
|
+ TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
|
|
|
TaskAttemptEventType.TA_DONE, CLEANUP_CONTAINER_TRANSITION)
|
|
|
// If commit is required, task goes through commit pending state.
|
|
|
- .addTransition(TaskAttemptState.RUNNING,
|
|
|
- TaskAttemptState.COMMIT_PENDING,
|
|
|
+ .addTransition(TaskAttemptStateInternal.RUNNING,
|
|
|
+ TaskAttemptStateInternal.COMMIT_PENDING,
|
|
|
TaskAttemptEventType.TA_COMMIT_PENDING, new CommitPendingTransition())
|
|
|
// Failure handling while RUNNING
|
|
|
- .addTransition(TaskAttemptState.RUNNING,
|
|
|
- TaskAttemptState.FAIL_CONTAINER_CLEANUP,
|
|
|
+ .addTransition(TaskAttemptStateInternal.RUNNING,
|
|
|
+ TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
|
|
|
TaskAttemptEventType.TA_FAILMSG, CLEANUP_CONTAINER_TRANSITION)
|
|
|
//for handling container exit without sending the done or fail msg
|
|
|
- .addTransition(TaskAttemptState.RUNNING,
|
|
|
- TaskAttemptState.FAIL_CONTAINER_CLEANUP,
|
|
|
+ .addTransition(TaskAttemptStateInternal.RUNNING,
|
|
|
+ TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
|
|
|
TaskAttemptEventType.TA_CONTAINER_COMPLETED,
|
|
|
CLEANUP_CONTAINER_TRANSITION)
|
|
|
// Timeout handling while RUNNING
|
|
|
- .addTransition(TaskAttemptState.RUNNING,
|
|
|
- TaskAttemptState.FAIL_CONTAINER_CLEANUP,
|
|
|
+ .addTransition(TaskAttemptStateInternal.RUNNING,
|
|
|
+ TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
|
|
|
TaskAttemptEventType.TA_TIMED_OUT, CLEANUP_CONTAINER_TRANSITION)
|
|
|
// if container killed by AM shutting down
|
|
|
- .addTransition(TaskAttemptState.RUNNING,
|
|
|
- TaskAttemptState.KILLED,
|
|
|
+ .addTransition(TaskAttemptStateInternal.RUNNING,
|
|
|
+ TaskAttemptStateInternal.KILLED,
|
|
|
TaskAttemptEventType.TA_CONTAINER_CLEANED, new KilledTransition())
|
|
|
// Kill handling
|
|
|
- .addTransition(TaskAttemptState.RUNNING,
|
|
|
- TaskAttemptState.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL,
|
|
|
+ .addTransition(TaskAttemptStateInternal.RUNNING,
|
|
|
+ TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL,
|
|
|
CLEANUP_CONTAINER_TRANSITION)
|
|
|
|
|
|
// Transitions from COMMIT_PENDING state
|
|
|
- .addTransition(TaskAttemptState.COMMIT_PENDING,
|
|
|
- TaskAttemptState.COMMIT_PENDING, TaskAttemptEventType.TA_UPDATE,
|
|
|
+ .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
|
|
|
+ TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptEventType.TA_UPDATE,
|
|
|
new StatusUpdater())
|
|
|
- .addTransition(TaskAttemptState.COMMIT_PENDING,
|
|
|
- TaskAttemptState.COMMIT_PENDING,
|
|
|
+ .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
|
|
|
+ TaskAttemptStateInternal.COMMIT_PENDING,
|
|
|
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
|
|
|
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
|
|
|
- .addTransition(TaskAttemptState.COMMIT_PENDING,
|
|
|
- TaskAttemptState.SUCCESS_CONTAINER_CLEANUP,
|
|
|
+ .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
|
|
|
+ TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
|
|
|
TaskAttemptEventType.TA_DONE, CLEANUP_CONTAINER_TRANSITION)
|
|
|
- .addTransition(TaskAttemptState.COMMIT_PENDING,
|
|
|
- TaskAttemptState.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL,
|
|
|
+ .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
|
|
|
+ TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL,
|
|
|
CLEANUP_CONTAINER_TRANSITION)
|
|
|
// if container killed by AM shutting down
|
|
|
- .addTransition(TaskAttemptState.COMMIT_PENDING,
|
|
|
- TaskAttemptState.KILLED,
|
|
|
+ .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
|
|
|
+ TaskAttemptStateInternal.KILLED,
|
|
|
TaskAttemptEventType.TA_CONTAINER_CLEANED, new KilledTransition())
|
|
|
- .addTransition(TaskAttemptState.COMMIT_PENDING,
|
|
|
- TaskAttemptState.FAIL_CONTAINER_CLEANUP,
|
|
|
+ .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
|
|
|
+ TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
|
|
|
TaskAttemptEventType.TA_FAILMSG, CLEANUP_CONTAINER_TRANSITION)
|
|
|
- .addTransition(TaskAttemptState.COMMIT_PENDING,
|
|
|
- TaskAttemptState.FAIL_CONTAINER_CLEANUP,
|
|
|
+ .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
|
|
|
+ TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
|
|
|
TaskAttemptEventType.TA_CONTAINER_COMPLETED,
|
|
|
CLEANUP_CONTAINER_TRANSITION)
|
|
|
- .addTransition(TaskAttemptState.COMMIT_PENDING,
|
|
|
- TaskAttemptState.FAIL_CONTAINER_CLEANUP,
|
|
|
+ .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
|
|
|
+ TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
|
|
|
TaskAttemptEventType.TA_TIMED_OUT, CLEANUP_CONTAINER_TRANSITION)
|
|
|
|
|
|
// Transitions from SUCCESS_CONTAINER_CLEANUP state
|
|
|
// kill and cleanup the container
|
|
|
- .addTransition(TaskAttemptState.SUCCESS_CONTAINER_CLEANUP,
|
|
|
- TaskAttemptState.SUCCEEDED, TaskAttemptEventType.TA_CONTAINER_CLEANED,
|
|
|
+ .addTransition(TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
|
|
|
+ TaskAttemptStateInternal.SUCCEEDED, TaskAttemptEventType.TA_CONTAINER_CLEANED,
|
|
|
new SucceededTransition())
|
|
|
.addTransition(
|
|
|
- TaskAttemptState.SUCCESS_CONTAINER_CLEANUP,
|
|
|
- TaskAttemptState.SUCCESS_CONTAINER_CLEANUP,
|
|
|
+ TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
|
|
|
+ TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
|
|
|
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
|
|
|
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
|
|
|
// Ignore-able events
|
|
|
- .addTransition(TaskAttemptState.SUCCESS_CONTAINER_CLEANUP,
|
|
|
- TaskAttemptState.SUCCESS_CONTAINER_CLEANUP,
|
|
|
+ .addTransition(TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
|
|
|
+ TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
|
|
|
EnumSet.of(TaskAttemptEventType.TA_KILL,
|
|
|
TaskAttemptEventType.TA_FAILMSG,
|
|
|
TaskAttemptEventType.TA_TIMED_OUT,
|
|
|
TaskAttemptEventType.TA_CONTAINER_COMPLETED))
|
|
|
|
|
|
// Transitions from FAIL_CONTAINER_CLEANUP state.
|
|
|
- .addTransition(TaskAttemptState.FAIL_CONTAINER_CLEANUP,
|
|
|
- TaskAttemptState.FAIL_TASK_CLEANUP,
|
|
|
+ .addTransition(TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
|
|
|
+ TaskAttemptStateInternal.FAIL_TASK_CLEANUP,
|
|
|
TaskAttemptEventType.TA_CONTAINER_CLEANED, new TaskCleanupTransition())
|
|
|
- .addTransition(TaskAttemptState.FAIL_CONTAINER_CLEANUP,
|
|
|
- TaskAttemptState.FAIL_CONTAINER_CLEANUP,
|
|
|
+ .addTransition(TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
|
|
|
+ TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
|
|
|
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
|
|
|
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
|
|
|
// Ignore-able events
|
|
|
- .addTransition(TaskAttemptState.FAIL_CONTAINER_CLEANUP,
|
|
|
- TaskAttemptState.FAIL_CONTAINER_CLEANUP,
|
|
|
+ .addTransition(TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
|
|
|
+ TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
|
|
|
EnumSet.of(TaskAttemptEventType.TA_KILL,
|
|
|
TaskAttemptEventType.TA_CONTAINER_COMPLETED,
|
|
|
TaskAttemptEventType.TA_UPDATE,
|
|
@@ -339,17 +340,17 @@ public abstract class TaskAttemptImpl implements
|
|
|
TaskAttemptEventType.TA_TIMED_OUT))
|
|
|
|
|
|
// Transitions from KILL_CONTAINER_CLEANUP
|
|
|
- .addTransition(TaskAttemptState.KILL_CONTAINER_CLEANUP,
|
|
|
- TaskAttemptState.KILL_TASK_CLEANUP,
|
|
|
+ .addTransition(TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
|
|
|
+ TaskAttemptStateInternal.KILL_TASK_CLEANUP,
|
|
|
TaskAttemptEventType.TA_CONTAINER_CLEANED, new TaskCleanupTransition())
|
|
|
- .addTransition(TaskAttemptState.KILL_CONTAINER_CLEANUP,
|
|
|
- TaskAttemptState.KILL_CONTAINER_CLEANUP,
|
|
|
+ .addTransition(TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
|
|
|
+ TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
|
|
|
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
|
|
|
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
|
|
|
// Ignore-able events
|
|
|
.addTransition(
|
|
|
- TaskAttemptState.KILL_CONTAINER_CLEANUP,
|
|
|
- TaskAttemptState.KILL_CONTAINER_CLEANUP,
|
|
|
+ TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
|
|
|
+ TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
|
|
|
EnumSet.of(TaskAttemptEventType.TA_KILL,
|
|
|
TaskAttemptEventType.TA_CONTAINER_COMPLETED,
|
|
|
TaskAttemptEventType.TA_UPDATE,
|
|
@@ -362,16 +363,16 @@ public abstract class TaskAttemptImpl implements
|
|
|
|
|
|
// Transitions from FAIL_TASK_CLEANUP
|
|
|
// run the task cleanup
|
|
|
- .addTransition(TaskAttemptState.FAIL_TASK_CLEANUP,
|
|
|
- TaskAttemptState.FAILED, TaskAttemptEventType.TA_CLEANUP_DONE,
|
|
|
+ .addTransition(TaskAttemptStateInternal.FAIL_TASK_CLEANUP,
|
|
|
+ TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CLEANUP_DONE,
|
|
|
new FailedTransition())
|
|
|
- .addTransition(TaskAttemptState.FAIL_TASK_CLEANUP,
|
|
|
- TaskAttemptState.FAIL_TASK_CLEANUP,
|
|
|
+ .addTransition(TaskAttemptStateInternal.FAIL_TASK_CLEANUP,
|
|
|
+ TaskAttemptStateInternal.FAIL_TASK_CLEANUP,
|
|
|
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
|
|
|
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
|
|
|
// Ignore-able events
|
|
|
- .addTransition(TaskAttemptState.FAIL_TASK_CLEANUP,
|
|
|
- TaskAttemptState.FAIL_TASK_CLEANUP,
|
|
|
+ .addTransition(TaskAttemptStateInternal.FAIL_TASK_CLEANUP,
|
|
|
+ TaskAttemptStateInternal.FAIL_TASK_CLEANUP,
|
|
|
EnumSet.of(TaskAttemptEventType.TA_KILL,
|
|
|
TaskAttemptEventType.TA_CONTAINER_COMPLETED,
|
|
|
TaskAttemptEventType.TA_UPDATE,
|
|
@@ -384,16 +385,16 @@ public abstract class TaskAttemptImpl implements
|
|
|
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED))
|
|
|
|
|
|
// Transitions from KILL_TASK_CLEANUP
|
|
|
- .addTransition(TaskAttemptState.KILL_TASK_CLEANUP,
|
|
|
- TaskAttemptState.KILLED, TaskAttemptEventType.TA_CLEANUP_DONE,
|
|
|
+ .addTransition(TaskAttemptStateInternal.KILL_TASK_CLEANUP,
|
|
|
+ TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_CLEANUP_DONE,
|
|
|
new KilledTransition())
|
|
|
- .addTransition(TaskAttemptState.KILL_TASK_CLEANUP,
|
|
|
- TaskAttemptState.KILL_TASK_CLEANUP,
|
|
|
+ .addTransition(TaskAttemptStateInternal.KILL_TASK_CLEANUP,
|
|
|
+ TaskAttemptStateInternal.KILL_TASK_CLEANUP,
|
|
|
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
|
|
|
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
|
|
|
// Ignore-able events
|
|
|
- .addTransition(TaskAttemptState.KILL_TASK_CLEANUP,
|
|
|
- TaskAttemptState.KILL_TASK_CLEANUP,
|
|
|
+ .addTransition(TaskAttemptStateInternal.KILL_TASK_CLEANUP,
|
|
|
+ TaskAttemptStateInternal.KILL_TASK_CLEANUP,
|
|
|
EnumSet.of(TaskAttemptEventType.TA_KILL,
|
|
|
TaskAttemptEventType.TA_CONTAINER_COMPLETED,
|
|
|
TaskAttemptEventType.TA_UPDATE,
|
|
@@ -406,31 +407,31 @@ public abstract class TaskAttemptImpl implements
|
|
|
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED))
|
|
|
|
|
|
// Transitions from SUCCEEDED
|
|
|
- .addTransition(TaskAttemptState.SUCCEEDED, //only possible for map attempts
|
|
|
- TaskAttemptState.FAILED,
|
|
|
+ .addTransition(TaskAttemptStateInternal.SUCCEEDED, //only possible for map attempts
|
|
|
+ TaskAttemptStateInternal.FAILED,
|
|
|
TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE,
|
|
|
new TooManyFetchFailureTransition())
|
|
|
- .addTransition(TaskAttemptState.SUCCEEDED,
|
|
|
- EnumSet.of(TaskAttemptState.SUCCEEDED, TaskAttemptState.KILLED),
|
|
|
+ .addTransition(TaskAttemptStateInternal.SUCCEEDED,
|
|
|
+ EnumSet.of(TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.KILLED),
|
|
|
TaskAttemptEventType.TA_KILL,
|
|
|
new KilledAfterSuccessTransition())
|
|
|
.addTransition(
|
|
|
- TaskAttemptState.SUCCEEDED, TaskAttemptState.SUCCEEDED,
|
|
|
+ TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.SUCCEEDED,
|
|
|
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
|
|
|
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
|
|
|
// Ignore-able events for SUCCEEDED state
|
|
|
- .addTransition(TaskAttemptState.SUCCEEDED,
|
|
|
- TaskAttemptState.SUCCEEDED,
|
|
|
+ .addTransition(TaskAttemptStateInternal.SUCCEEDED,
|
|
|
+ TaskAttemptStateInternal.SUCCEEDED,
|
|
|
EnumSet.of(TaskAttemptEventType.TA_FAILMSG,
|
|
|
TaskAttemptEventType.TA_CONTAINER_CLEANED,
|
|
|
TaskAttemptEventType.TA_CONTAINER_COMPLETED))
|
|
|
|
|
|
// Transitions from FAILED state
|
|
|
- .addTransition(TaskAttemptState.FAILED, TaskAttemptState.FAILED,
|
|
|
+ .addTransition(TaskAttemptStateInternal.FAILED, TaskAttemptStateInternal.FAILED,
|
|
|
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
|
|
|
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
|
|
|
// Ignore-able events for FAILED state
|
|
|
- .addTransition(TaskAttemptState.FAILED, TaskAttemptState.FAILED,
|
|
|
+ .addTransition(TaskAttemptStateInternal.FAILED, TaskAttemptStateInternal.FAILED,
|
|
|
EnumSet.of(TaskAttemptEventType.TA_KILL,
|
|
|
TaskAttemptEventType.TA_ASSIGNED,
|
|
|
TaskAttemptEventType.TA_CONTAINER_COMPLETED,
|
|
@@ -445,11 +446,11 @@ public abstract class TaskAttemptImpl implements
|
|
|
TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE))
|
|
|
|
|
|
// Transitions from KILLED state
|
|
|
- .addTransition(TaskAttemptState.KILLED, TaskAttemptState.KILLED,
|
|
|
+ .addTransition(TaskAttemptStateInternal.KILLED, TaskAttemptStateInternal.KILLED,
|
|
|
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
|
|
|
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
|
|
|
// Ignore-able events for KILLED state
|
|
|
- .addTransition(TaskAttemptState.KILLED, TaskAttemptState.KILLED,
|
|
|
+ .addTransition(TaskAttemptStateInternal.KILLED, TaskAttemptStateInternal.KILLED,
|
|
|
EnumSet.of(TaskAttemptEventType.TA_KILL,
|
|
|
TaskAttemptEventType.TA_ASSIGNED,
|
|
|
TaskAttemptEventType.TA_CONTAINER_COMPLETED,
|
|
@@ -466,7 +467,7 @@ public abstract class TaskAttemptImpl implements
|
|
|
.installTopology();
|
|
|
|
|
|
private final StateMachine
|
|
|
- <TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
|
|
|
+ <TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent>
|
|
|
stateMachine;
|
|
|
|
|
|
private ContainerId containerID;
|
|
@@ -874,9 +875,9 @@ public abstract class TaskAttemptImpl implements
|
|
|
readLock.lock();
|
|
|
try {
|
|
|
// TODO: Use stateMachine level method?
|
|
|
- return (getState() == TaskAttemptState.SUCCEEDED ||
|
|
|
- getState() == TaskAttemptState.FAILED ||
|
|
|
- getState() == TaskAttemptState.KILLED);
|
|
|
+ return (getInternalState() == TaskAttemptStateInternal.SUCCEEDED ||
|
|
|
+ getInternalState() == TaskAttemptStateInternal.FAILED ||
|
|
|
+ getInternalState() == TaskAttemptStateInternal.KILLED);
|
|
|
} finally {
|
|
|
readLock.unlock();
|
|
|
}
|
|
@@ -953,7 +954,7 @@ public abstract class TaskAttemptImpl implements
|
|
|
public TaskAttemptState getState() {
|
|
|
readLock.lock();
|
|
|
try {
|
|
|
- return stateMachine.getCurrentState();
|
|
|
+ return getExternalState(stateMachine.getCurrentState());
|
|
|
} finally {
|
|
|
readLock.unlock();
|
|
|
}
|
|
@@ -968,7 +969,7 @@ public abstract class TaskAttemptImpl implements
|
|
|
}
|
|
|
writeLock.lock();
|
|
|
try {
|
|
|
- final TaskAttemptState oldState = getState();
|
|
|
+ final TaskAttemptStateInternal oldState = getInternalState() ;
|
|
|
try {
|
|
|
stateMachine.doTransition(event.getType(), event);
|
|
|
} catch (InvalidStateTransitonException e) {
|
|
@@ -980,16 +981,58 @@ public abstract class TaskAttemptImpl implements
|
|
|
eventHandler.handle(new JobEvent(this.attemptId.getTaskId().getJobId(),
|
|
|
JobEventType.INTERNAL_ERROR));
|
|
|
}
|
|
|
- if (oldState != getState()) {
|
|
|
+ if (oldState != getInternalState()) {
|
|
|
LOG.info(attemptId + " TaskAttempt Transitioned from "
|
|
|
+ oldState + " to "
|
|
|
- + getState());
|
|
|
+ + getInternalState());
|
|
|
}
|
|
|
} finally {
|
|
|
writeLock.unlock();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ @VisibleForTesting
|
|
|
+ public TaskAttemptStateInternal getInternalState() {
|
|
|
+ readLock.lock();
|
|
|
+ try {
|
|
|
+ return stateMachine.getCurrentState();
|
|
|
+ } finally {
|
|
|
+ readLock.unlock();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private static TaskAttemptState getExternalState(
|
|
|
+ TaskAttemptStateInternal smState) {
|
|
|
+ switch (smState) {
|
|
|
+ case ASSIGNED:
|
|
|
+ case UNASSIGNED:
|
|
|
+ return TaskAttemptState.STARTING;
|
|
|
+ case COMMIT_PENDING:
|
|
|
+ return TaskAttemptState.COMMIT_PENDING;
|
|
|
+ case FAILED:
|
|
|
+ return TaskAttemptState.FAILED;
|
|
|
+ case KILLED:
|
|
|
+ return TaskAttemptState.KILLED;
|
|
|
+ // All CLEANUP states considered as RUNNING since events have not gone out
|
|
|
+ // to the Task yet. May be possible to consider them as a Finished state.
|
|
|
+ case FAIL_CONTAINER_CLEANUP:
|
|
|
+ case FAIL_TASK_CLEANUP:
|
|
|
+ case KILL_CONTAINER_CLEANUP:
|
|
|
+ case KILL_TASK_CLEANUP:
|
|
|
+ case SUCCESS_CONTAINER_CLEANUP:
|
|
|
+ case RUNNING:
|
|
|
+ return TaskAttemptState.RUNNING;
|
|
|
+ case NEW:
|
|
|
+ return TaskAttemptState.NEW;
|
|
|
+ case SUCCEEDED:
|
|
|
+ return TaskAttemptState.SUCCEEDED;
|
|
|
+ default:
|
|
|
+ throw new YarnException("Attempt to convert invalid "
|
|
|
+ + "stateMachineTaskAttemptState to externalTaskAttemptState: "
|
|
|
+ + smState);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
//always called in write lock
|
|
|
private void setFinishTime() {
|
|
|
//set the finish time only if launch time is set
|
|
@@ -1066,7 +1109,7 @@ public abstract class TaskAttemptImpl implements
|
|
|
private static
|
|
|
TaskAttemptUnsuccessfulCompletionEvent
|
|
|
createTaskAttemptUnsuccessfulCompletionEvent(TaskAttemptImpl taskAttempt,
|
|
|
- TaskAttemptState attemptState) {
|
|
|
+ TaskAttemptStateInternal attemptState) {
|
|
|
TaskAttemptUnsuccessfulCompletionEvent tauce =
|
|
|
new TaskAttemptUnsuccessfulCompletionEvent(
|
|
|
TypeConverter.fromYarn(taskAttempt.attemptId),
|
|
@@ -1247,10 +1290,10 @@ public abstract class TaskAttemptImpl implements
|
|
|
|
|
|
private static class DeallocateContainerTransition implements
|
|
|
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
|
|
|
- private final TaskAttemptState finalState;
|
|
|
+ private final TaskAttemptStateInternal finalState;
|
|
|
private final boolean withdrawsContainerRequest;
|
|
|
DeallocateContainerTransition
|
|
|
- (TaskAttemptState finalState, boolean withdrawsContainerRequest) {
|
|
|
+ (TaskAttemptStateInternal finalState, boolean withdrawsContainerRequest) {
|
|
|
this.finalState = finalState;
|
|
|
this.withdrawsContainerRequest = withdrawsContainerRequest;
|
|
|
}
|
|
@@ -1288,10 +1331,10 @@ public abstract class TaskAttemptImpl implements
|
|
|
TaskAttemptUnsuccessfulCompletionEvent tauce =
|
|
|
createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
|
|
|
finalState);
|
|
|
- if(finalState == TaskAttemptState.FAILED) {
|
|
|
+ if(finalState == TaskAttemptStateInternal.FAILED) {
|
|
|
taskAttempt.eventHandler
|
|
|
.handle(createJobCounterUpdateEventTAFailed(taskAttempt, false));
|
|
|
- } else if(finalState == TaskAttemptState.KILLED) {
|
|
|
+ } else if(finalState == TaskAttemptStateInternal.KILLED) {
|
|
|
taskAttempt.eventHandler
|
|
|
.handle(createJobCounterUpdateEventTAKilled(taskAttempt, false));
|
|
|
}
|
|
@@ -1405,7 +1448,7 @@ public abstract class TaskAttemptImpl implements
|
|
|
JobCounter.SLOTS_MILLIS_MAPS : JobCounter.SLOTS_MILLIS_REDUCES,
|
|
|
slotMillis);
|
|
|
taskAttempt.eventHandler.handle(jce);
|
|
|
- taskAttempt.logAttemptFinishedEvent(TaskAttemptState.SUCCEEDED);
|
|
|
+ taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.SUCCEEDED);
|
|
|
taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
|
|
|
taskAttempt.attemptId,
|
|
|
TaskEventType.T_ATTEMPT_SUCCEEDED));
|
|
@@ -1428,10 +1471,10 @@ public abstract class TaskAttemptImpl implements
|
|
|
.handle(createJobCounterUpdateEventTAFailed(taskAttempt, false));
|
|
|
TaskAttemptUnsuccessfulCompletionEvent tauce =
|
|
|
createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
|
|
|
- TaskAttemptState.FAILED);
|
|
|
+ TaskAttemptStateInternal.FAILED);
|
|
|
taskAttempt.eventHandler.handle(new JobHistoryEvent(
|
|
|
taskAttempt.attemptId.getTaskId().getJobId(), tauce));
|
|
|
- // taskAttempt.logAttemptFinishedEvent(TaskAttemptState.FAILED); Not
|
|
|
+ // taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.FAILED); Not
|
|
|
// handling failed map/reduce events.
|
|
|
}else {
|
|
|
LOG.debug("Not generating HistoryFinish event since start event not " +
|
|
@@ -1443,7 +1486,7 @@ public abstract class TaskAttemptImpl implements
|
|
|
}
|
|
|
|
|
|
@SuppressWarnings({ "unchecked" })
|
|
|
- private void logAttemptFinishedEvent(TaskAttemptState state) {
|
|
|
+ private void logAttemptFinishedEvent(TaskAttemptStateInternal state) {
|
|
|
//Log finished events only if an attempt started.
|
|
|
if (getLaunchTime() == 0) return;
|
|
|
if (attemptId.getTaskId().getTaskType() == TaskType.MAP) {
|
|
@@ -1500,7 +1543,7 @@ public abstract class TaskAttemptImpl implements
|
|
|
.handle(createJobCounterUpdateEventTAFailed(taskAttempt, true));
|
|
|
TaskAttemptUnsuccessfulCompletionEvent tauce =
|
|
|
createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
|
|
|
- TaskAttemptState.FAILED);
|
|
|
+ TaskAttemptStateInternal.FAILED);
|
|
|
taskAttempt.eventHandler.handle(new JobHistoryEvent(
|
|
|
taskAttempt.attemptId.getTaskId().getJobId(), tauce));
|
|
|
}else {
|
|
@@ -1513,11 +1556,11 @@ public abstract class TaskAttemptImpl implements
|
|
|
}
|
|
|
|
|
|
private static class KilledAfterSuccessTransition implements
|
|
|
- MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent, TaskAttemptState> {
|
|
|
+ MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent, TaskAttemptStateInternal> {
|
|
|
|
|
|
@SuppressWarnings("unchecked")
|
|
|
@Override
|
|
|
- public TaskAttemptState transition(TaskAttemptImpl taskAttempt,
|
|
|
+ public TaskAttemptStateInternal transition(TaskAttemptImpl taskAttempt,
|
|
|
TaskAttemptEvent event) {
|
|
|
if(taskAttempt.getID().getTaskId().getTaskType() == TaskType.REDUCE) {
|
|
|
// after a reduce task has succeeded, its outputs are in safe in HDFS.
|
|
@@ -1530,7 +1573,7 @@ public abstract class TaskAttemptImpl implements
|
|
|
// ignore this for reduce tasks
|
|
|
LOG.info("Ignoring killed event for successful reduce task attempt" +
|
|
|
taskAttempt.getID().toString());
|
|
|
- return TaskAttemptState.SUCCEEDED;
|
|
|
+ return TaskAttemptStateInternal.SUCCEEDED;
|
|
|
}
|
|
|
if(event instanceof TaskAttemptKillEvent) {
|
|
|
TaskAttemptKillEvent msgEvent = (TaskAttemptKillEvent) event;
|
|
@@ -1545,12 +1588,12 @@ public abstract class TaskAttemptImpl implements
|
|
|
taskAttempt.eventHandler
|
|
|
.handle(createJobCounterUpdateEventTAKilled(taskAttempt, true));
|
|
|
TaskAttemptUnsuccessfulCompletionEvent tauce = createTaskAttemptUnsuccessfulCompletionEvent(
|
|
|
- taskAttempt, TaskAttemptState.KILLED);
|
|
|
+ taskAttempt, TaskAttemptStateInternal.KILLED);
|
|
|
taskAttempt.eventHandler.handle(new JobHistoryEvent(taskAttempt.attemptId
|
|
|
.getTaskId().getJobId(), tauce));
|
|
|
taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
|
|
|
taskAttempt.attemptId, TaskEventType.T_ATTEMPT_KILLED));
|
|
|
- return TaskAttemptState.KILLED;
|
|
|
+ return TaskAttemptStateInternal.KILLED;
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -1568,14 +1611,14 @@ public abstract class TaskAttemptImpl implements
|
|
|
.handle(createJobCounterUpdateEventTAKilled(taskAttempt, false));
|
|
|
TaskAttemptUnsuccessfulCompletionEvent tauce =
|
|
|
createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
|
|
|
- TaskAttemptState.KILLED);
|
|
|
+ TaskAttemptStateInternal.KILLED);
|
|
|
taskAttempt.eventHandler.handle(new JobHistoryEvent(
|
|
|
taskAttempt.attemptId.getTaskId().getJobId(), tauce));
|
|
|
}else {
|
|
|
LOG.debug("Not generating HistoryFinish event since start event not " +
|
|
|
"generated for taskAttempt: " + taskAttempt.getID());
|
|
|
}
|
|
|
-// taskAttempt.logAttemptFinishedEvent(TaskAttemptState.KILLED); Not logging Map/Reduce attempts in case of failure.
|
|
|
+// taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.KILLED); Not logging Map/Reduce attempts in case of failure.
|
|
|
taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
|
|
|
taskAttempt.attemptId,
|
|
|
TaskEventType.T_ATTEMPT_KILLED));
|