|
@@ -94,7 +94,6 @@ import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
|
|
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
|
|
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
|
|
import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent;
|
|
import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent;
|
|
import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanupEvent;
|
|
import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanupEvent;
|
|
-import org.apache.hadoop.mapreduce.v2.jobhistory.JHConfig;
|
|
|
|
import org.apache.hadoop.mapreduce.v2.util.MRApps;
|
|
import org.apache.hadoop.mapreduce.v2.util.MRApps;
|
|
import org.apache.hadoop.net.NetUtils;
|
|
import org.apache.hadoop.net.NetUtils;
|
|
import org.apache.hadoop.security.Credentials;
|
|
import org.apache.hadoop.security.Credentials;
|
|
@@ -138,6 +137,7 @@ public abstract class TaskAttemptImpl implements
|
|
protected final Configuration conf;
|
|
protected final Configuration conf;
|
|
protected final Path jobFile;
|
|
protected final Path jobFile;
|
|
protected final int partition;
|
|
protected final int partition;
|
|
|
|
+ @SuppressWarnings("rawtypes")
|
|
protected final EventHandler eventHandler;
|
|
protected final EventHandler eventHandler;
|
|
private final TaskAttemptId attemptId;
|
|
private final TaskAttemptId attemptId;
|
|
private final Clock clock;
|
|
private final Clock clock;
|
|
@@ -204,6 +204,11 @@ public abstract class TaskAttemptImpl implements
|
|
.addTransition(TaskAttemptState.ASSIGNED, TaskAttemptState.FAILED,
|
|
.addTransition(TaskAttemptState.ASSIGNED, TaskAttemptState.FAILED,
|
|
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED,
|
|
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED,
|
|
new DeallocateContainerTransition(TaskAttemptState.FAILED, false))
|
|
new DeallocateContainerTransition(TaskAttemptState.FAILED, false))
|
|
|
|
+ .addTransition(TaskAttemptState.ASSIGNED,
|
|
|
|
+ TaskAttemptState.FAIL_CONTAINER_CLEANUP,
|
|
|
|
+ TaskAttemptEventType.TA_CONTAINER_COMPLETED,
|
|
|
|
+ CLEANUP_CONTAINER_TRANSITION)
|
|
|
|
+ // ^ If RM kills the container due to expiry, preemption etc.
|
|
.addTransition(TaskAttemptState.ASSIGNED,
|
|
.addTransition(TaskAttemptState.ASSIGNED,
|
|
TaskAttemptState.KILL_CONTAINER_CLEANUP,
|
|
TaskAttemptState.KILL_CONTAINER_CLEANUP,
|
|
TaskAttemptEventType.TA_KILL, CLEANUP_CONTAINER_TRANSITION)
|
|
TaskAttemptEventType.TA_KILL, CLEANUP_CONTAINER_TRANSITION)
|
|
@@ -432,7 +437,8 @@ public abstract class TaskAttemptImpl implements
|
|
//this is the last status reported by the REMOTE running attempt
|
|
//this is the last status reported by the REMOTE running attempt
|
|
private TaskAttemptStatus reportedStatus;
|
|
private TaskAttemptStatus reportedStatus;
|
|
|
|
|
|
- public TaskAttemptImpl(TaskId taskId, int i, EventHandler eventHandler,
|
|
|
|
|
|
+ public TaskAttemptImpl(TaskId taskId, int i,
|
|
|
|
+ @SuppressWarnings("rawtypes") EventHandler eventHandler,
|
|
TaskAttemptListener taskAttemptListener, Path jobFile, int partition,
|
|
TaskAttemptListener taskAttemptListener, Path jobFile, int partition,
|
|
Configuration conf, String[] dataLocalHosts, OutputCommitter committer,
|
|
Configuration conf, String[] dataLocalHosts, OutputCommitter committer,
|
|
Token<JobTokenIdentifier> jobToken,
|
|
Token<JobTokenIdentifier> jobToken,
|
|
@@ -528,6 +534,13 @@ public abstract class TaskAttemptImpl implements
|
|
ContainerLaunchContext container =
|
|
ContainerLaunchContext container =
|
|
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
|
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
|
|
|
|
|
|
|
+ // Application resources
|
|
|
|
+ Map<String, LocalResource> localResources =
|
|
|
|
+ new HashMap<String, LocalResource>();
|
|
|
|
+
|
|
|
|
+ // Application environment
|
|
|
|
+ Map<String, String> environment = new HashMap<String, String>();
|
|
|
|
+
|
|
try {
|
|
try {
|
|
FileSystem remoteFS = FileSystem.get(conf);
|
|
FileSystem remoteFS = FileSystem.get(conf);
|
|
|
|
|
|
@@ -536,7 +549,7 @@ public abstract class TaskAttemptImpl implements
|
|
Path remoteJobJar = (new Path(remoteTask.getConf().get(
|
|
Path remoteJobJar = (new Path(remoteTask.getConf().get(
|
|
MRJobConfig.JAR))).makeQualified(remoteFS.getUri(),
|
|
MRJobConfig.JAR))).makeQualified(remoteFS.getUri(),
|
|
remoteFS.getWorkingDirectory());
|
|
remoteFS.getWorkingDirectory());
|
|
- container.setLocalResource(
|
|
|
|
|
|
+ localResources.put(
|
|
MRConstants.JOB_JAR,
|
|
MRConstants.JOB_JAR,
|
|
createLocalResource(remoteFS, recordFactory, remoteJobJar,
|
|
createLocalResource(remoteFS, recordFactory, remoteJobJar,
|
|
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
|
|
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
|
|
@@ -558,7 +571,7 @@ public abstract class TaskAttemptImpl implements
|
|
new Path(path, oldJobId.toString());
|
|
new Path(path, oldJobId.toString());
|
|
Path remoteJobConfPath =
|
|
Path remoteJobConfPath =
|
|
new Path(remoteJobSubmitDir, MRConstants.JOB_CONF_FILE);
|
|
new Path(remoteJobSubmitDir, MRConstants.JOB_CONF_FILE);
|
|
- container.setLocalResource(
|
|
|
|
|
|
+ localResources.put(
|
|
MRConstants.JOB_CONF_FILE,
|
|
MRConstants.JOB_CONF_FILE,
|
|
createLocalResource(remoteFS, recordFactory, remoteJobConfPath,
|
|
createLocalResource(remoteFS, recordFactory, remoteJobConfPath,
|
|
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
|
|
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
|
|
@@ -566,9 +579,14 @@ public abstract class TaskAttemptImpl implements
|
|
+ remoteJobConfPath.toUri().toASCIIString());
|
|
+ remoteJobConfPath.toUri().toASCIIString());
|
|
// //////////// End of JobConf setup
|
|
// //////////// End of JobConf setup
|
|
|
|
|
|
|
|
+
|
|
// Setup DistributedCache
|
|
// Setup DistributedCache
|
|
- setupDistributedCache(remoteFS, conf, container);
|
|
|
|
|
|
+ setupDistributedCache(remoteFS, conf, localResources, environment);
|
|
|
|
|
|
|
|
+ // Set local-resources and environment
|
|
|
|
+ container.setLocalResources(localResources);
|
|
|
|
+ container.setEnv(environment);
|
|
|
|
+
|
|
// Setup up tokens
|
|
// Setup up tokens
|
|
Credentials taskCredentials = new Credentials();
|
|
Credentials taskCredentials = new Credentials();
|
|
|
|
|
|
@@ -595,12 +613,12 @@ public abstract class TaskAttemptImpl implements
|
|
|
|
|
|
// Add shuffle token
|
|
// Add shuffle token
|
|
LOG.info("Putting shuffle token in serviceData");
|
|
LOG.info("Putting shuffle token in serviceData");
|
|
- container
|
|
|
|
- .setServiceData(
|
|
|
|
- ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
|
|
|
|
- ShuffleHandler.serializeServiceData(jobToken));
|
|
|
|
|
|
+ Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
|
|
|
|
+ serviceData.put(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
|
|
|
|
+ ShuffleHandler.serializeServiceData(jobToken));
|
|
|
|
+ container.setServiceData(serviceData);
|
|
|
|
|
|
- MRApps.addToClassPath(container.getAllEnv(), getInitialClasspath());
|
|
|
|
|
|
+ MRApps.addToClassPath(container.getEnv(), getInitialClasspath());
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
throw new YarnException(e);
|
|
throw new YarnException(e);
|
|
}
|
|
}
|
|
@@ -623,11 +641,11 @@ public abstract class TaskAttemptImpl implements
|
|
classPaths.add(workDir.toString()); // TODO
|
|
classPaths.add(workDir.toString()); // TODO
|
|
|
|
|
|
// Construct the actual Container
|
|
// Construct the actual Container
|
|
- container.addAllCommands(MapReduceChildJVM.getVMCommand(
|
|
|
|
|
|
+ container.setCommands(MapReduceChildJVM.getVMCommand(
|
|
taskAttemptListener.getAddress(), remoteTask, javaHome,
|
|
taskAttemptListener.getAddress(), remoteTask, javaHome,
|
|
workDir.toString(), containerLogDir, childTmpDir, jvmID));
|
|
workDir.toString(), containerLogDir, childTmpDir, jvmID));
|
|
|
|
|
|
- MapReduceChildJVM.setVMEnv(container.getAllEnv(), classPaths,
|
|
|
|
|
|
+ MapReduceChildJVM.setVMEnv(container.getEnv(), classPaths,
|
|
workDir.toString(), containerLogDir, nmLdLibraryPath, remoteTask,
|
|
workDir.toString(), containerLogDir, nmLdLibraryPath, remoteTask,
|
|
localizedApplicationTokensFile);
|
|
localizedApplicationTokensFile);
|
|
|
|
|
|
@@ -649,11 +667,15 @@ public abstract class TaskAttemptImpl implements
|
|
return result;
|
|
return result;
|
|
}
|
|
}
|
|
|
|
|
|
- private void setupDistributedCache(FileSystem remoteFS, Configuration conf,
|
|
|
|
- ContainerLaunchContext container) throws IOException {
|
|
|
|
|
|
+ private void setupDistributedCache(FileSystem remoteFS,
|
|
|
|
+ Configuration conf,
|
|
|
|
+ Map<String, LocalResource> localResources,
|
|
|
|
+ Map<String, String> env)
|
|
|
|
+ throws IOException {
|
|
|
|
|
|
// Cache archives
|
|
// Cache archives
|
|
- parseDistributedCacheArtifacts(remoteFS, container, LocalResourceType.ARCHIVE,
|
|
|
|
|
|
+ parseDistributedCacheArtifacts(remoteFS, localResources, env,
|
|
|
|
+ LocalResourceType.ARCHIVE,
|
|
DistributedCache.getCacheArchives(conf),
|
|
DistributedCache.getCacheArchives(conf),
|
|
parseTimeStamps(DistributedCache.getArchiveTimestamps(conf)),
|
|
parseTimeStamps(DistributedCache.getArchiveTimestamps(conf)),
|
|
getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES),
|
|
getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES),
|
|
@@ -661,7 +683,9 @@ public abstract class TaskAttemptImpl implements
|
|
DistributedCache.getArchiveClassPaths(conf));
|
|
DistributedCache.getArchiveClassPaths(conf));
|
|
|
|
|
|
// Cache files
|
|
// Cache files
|
|
- parseDistributedCacheArtifacts(remoteFS, container, LocalResourceType.FILE,
|
|
|
|
|
|
+ parseDistributedCacheArtifacts(remoteFS,
|
|
|
|
+ localResources, env,
|
|
|
|
+ LocalResourceType.FILE,
|
|
DistributedCache.getCacheFiles(conf),
|
|
DistributedCache.getCacheFiles(conf),
|
|
parseTimeStamps(DistributedCache.getFileTimestamps(conf)),
|
|
parseTimeStamps(DistributedCache.getFileTimestamps(conf)),
|
|
getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES),
|
|
getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES),
|
|
@@ -673,7 +697,10 @@ public abstract class TaskAttemptImpl implements
|
|
// Use TaskDistributedCacheManager.CacheFiles.makeCacheFiles(URI[],
|
|
// Use TaskDistributedCacheManager.CacheFiles.makeCacheFiles(URI[],
|
|
// long[], boolean[], Path[], FileType)
|
|
// long[], boolean[], Path[], FileType)
|
|
private void parseDistributedCacheArtifacts(
|
|
private void parseDistributedCacheArtifacts(
|
|
- FileSystem remoteFS, ContainerLaunchContext container, LocalResourceType type,
|
|
|
|
|
|
+ FileSystem remoteFS,
|
|
|
|
+ Map<String, LocalResource> localResources,
|
|
|
|
+ Map<String, String> env,
|
|
|
|
+ LocalResourceType type,
|
|
URI[] uris, long[] timestamps, long[] sizes, boolean visibilities[],
|
|
URI[] uris, long[] timestamps, long[] sizes, boolean visibilities[],
|
|
Path[] pathsToPutOnClasspath) throws IOException {
|
|
Path[] pathsToPutOnClasspath) throws IOException {
|
|
|
|
|
|
@@ -710,9 +737,9 @@ public abstract class TaskAttemptImpl implements
|
|
throw new IllegalArgumentException("Resource name must be relative");
|
|
throw new IllegalArgumentException("Resource name must be relative");
|
|
}
|
|
}
|
|
String linkName = name.toUri().getPath();
|
|
String linkName = name.toUri().getPath();
|
|
- container.setLocalResource(
|
|
|
|
|
|
+ localResources.put(
|
|
linkName,
|
|
linkName,
|
|
- BuilderUtils.newLocalResource(recordFactory,
|
|
|
|
|
|
+ BuilderUtils.newLocalResource(
|
|
p.toUri(), type,
|
|
p.toUri(), type,
|
|
visibilities[i]
|
|
visibilities[i]
|
|
? LocalResourceVisibility.PUBLIC
|
|
? LocalResourceVisibility.PUBLIC
|
|
@@ -720,8 +747,7 @@ public abstract class TaskAttemptImpl implements
|
|
sizes[i], timestamps[i])
|
|
sizes[i], timestamps[i])
|
|
);
|
|
);
|
|
if (classPaths.containsKey(u.getPath())) {
|
|
if (classPaths.containsKey(u.getPath())) {
|
|
- Map<String, String> environment = container.getAllEnv();
|
|
|
|
- MRApps.addToClassPath(environment, linkName);
|
|
|
|
|
|
+ MRApps.addToClassPath(env, linkName);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -893,6 +919,7 @@ public abstract class TaskAttemptImpl implements
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ @SuppressWarnings("unchecked")
|
|
@Override
|
|
@Override
|
|
public void handle(TaskAttemptEvent event) {
|
|
public void handle(TaskAttemptEvent event) {
|
|
LOG.info("Processing " + event.getTaskAttemptID() +
|
|
LOG.info("Processing " + event.getTaskAttemptID() +
|
|
@@ -903,7 +930,8 @@ public abstract class TaskAttemptImpl implements
|
|
try {
|
|
try {
|
|
stateMachine.doTransition(event.getType(), event);
|
|
stateMachine.doTransition(event.getType(), event);
|
|
} catch (InvalidStateTransitonException e) {
|
|
} catch (InvalidStateTransitonException e) {
|
|
- LOG.error("Can't handle this event at current state", e);
|
|
|
|
|
|
+ LOG.error("Can't handle this event at current state for "
|
|
|
|
+ + this.attemptId, e);
|
|
eventHandler.handle(new JobDiagnosticsUpdateEvent(
|
|
eventHandler.handle(new JobDiagnosticsUpdateEvent(
|
|
this.attemptId.getTaskId().getJobId(), "Invalid event " + event.getType() +
|
|
this.attemptId.getTaskId().getJobId(), "Invalid event " + event.getType() +
|
|
" on TaskAttempt " + this.attemptId));
|
|
" on TaskAttempt " + this.attemptId));
|
|
@@ -981,8 +1009,8 @@ public abstract class TaskAttemptImpl implements
|
|
try {
|
|
try {
|
|
if (progressSplitBlock == null) {
|
|
if (progressSplitBlock == null) {
|
|
progressSplitBlock = new WrappedProgressSplitsBlock(conf.getInt(
|
|
progressSplitBlock = new WrappedProgressSplitsBlock(conf.getInt(
|
|
- JHConfig.JOBHISTORY_TASKPROGRESS_NUMBER_SPLITS_KEY,
|
|
|
|
- WrappedProgressSplitsBlock.DEFAULT_NUMBER_PROGRESS_SPLITS));
|
|
|
|
|
|
+ MRJobConfig.MR_AM_NUM_PROGRESS_SPLITS,
|
|
|
|
+ MRJobConfig.DEFAULT_MR_AM_NUM_PROGRESS_SPLITS));
|
|
}
|
|
}
|
|
return progressSplitBlock;
|
|
return progressSplitBlock;
|
|
} finally {
|
|
} finally {
|
|
@@ -1035,6 +1063,7 @@ public abstract class TaskAttemptImpl implements
|
|
public RequestContainerTransition(boolean rescheduled) {
|
|
public RequestContainerTransition(boolean rescheduled) {
|
|
this.rescheduled = rescheduled;
|
|
this.rescheduled = rescheduled;
|
|
}
|
|
}
|
|
|
|
+ @SuppressWarnings("unchecked")
|
|
@Override
|
|
@Override
|
|
public void transition(TaskAttemptImpl taskAttempt,
|
|
public void transition(TaskAttemptImpl taskAttempt,
|
|
TaskAttemptEvent event) {
|
|
TaskAttemptEvent event) {
|
|
@@ -1063,6 +1092,7 @@ public abstract class TaskAttemptImpl implements
|
|
|
|
|
|
private static class ContainerAssignedTransition implements
|
|
private static class ContainerAssignedTransition implements
|
|
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
|
|
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
|
|
|
|
+ @SuppressWarnings("unchecked")
|
|
@Override
|
|
@Override
|
|
public void transition(final TaskAttemptImpl taskAttempt,
|
|
public void transition(final TaskAttemptImpl taskAttempt,
|
|
TaskAttemptEvent event) {
|
|
TaskAttemptEvent event) {
|
|
@@ -1112,6 +1142,7 @@ public abstract class TaskAttemptImpl implements
|
|
this.finalState = finalState;
|
|
this.finalState = finalState;
|
|
this.withdrawsContainerRequest = withdrawsContainerRequest;
|
|
this.withdrawsContainerRequest = withdrawsContainerRequest;
|
|
}
|
|
}
|
|
|
|
+ @SuppressWarnings("unchecked")
|
|
@Override
|
|
@Override
|
|
public void transition(TaskAttemptImpl taskAttempt,
|
|
public void transition(TaskAttemptImpl taskAttempt,
|
|
TaskAttemptEvent event) {
|
|
TaskAttemptEvent event) {
|
|
@@ -1158,6 +1189,7 @@ public abstract class TaskAttemptImpl implements
|
|
|
|
|
|
private static class LaunchedContainerTransition implements
|
|
private static class LaunchedContainerTransition implements
|
|
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
|
|
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
|
|
|
|
+ @SuppressWarnings("unchecked")
|
|
@Override
|
|
@Override
|
|
public void transition(TaskAttemptImpl taskAttempt,
|
|
public void transition(TaskAttemptImpl taskAttempt,
|
|
TaskAttemptEvent evnt) {
|
|
TaskAttemptEvent evnt) {
|
|
@@ -1208,6 +1240,7 @@ public abstract class TaskAttemptImpl implements
|
|
|
|
|
|
private static class CommitPendingTransition implements
|
|
private static class CommitPendingTransition implements
|
|
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
|
|
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
|
|
|
|
+ @SuppressWarnings("unchecked")
|
|
@Override
|
|
@Override
|
|
public void transition(TaskAttemptImpl taskAttempt,
|
|
public void transition(TaskAttemptImpl taskAttempt,
|
|
TaskAttemptEvent event) {
|
|
TaskAttemptEvent event) {
|
|
@@ -1219,6 +1252,7 @@ public abstract class TaskAttemptImpl implements
|
|
|
|
|
|
private static class TaskCleanupTransition implements
|
|
private static class TaskCleanupTransition implements
|
|
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
|
|
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
|
|
|
|
+ @SuppressWarnings("unchecked")
|
|
@Override
|
|
@Override
|
|
public void transition(TaskAttemptImpl taskAttempt,
|
|
public void transition(TaskAttemptImpl taskAttempt,
|
|
TaskAttemptEvent event) {
|
|
TaskAttemptEvent event) {
|
|
@@ -1234,6 +1268,7 @@ public abstract class TaskAttemptImpl implements
|
|
|
|
|
|
private static class SucceededTransition implements
|
|
private static class SucceededTransition implements
|
|
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
|
|
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
|
|
|
|
+ @SuppressWarnings("unchecked")
|
|
@Override
|
|
@Override
|
|
public void transition(TaskAttemptImpl taskAttempt,
|
|
public void transition(TaskAttemptImpl taskAttempt,
|
|
TaskAttemptEvent event) {
|
|
TaskAttemptEvent event) {
|
|
@@ -1263,6 +1298,7 @@ public abstract class TaskAttemptImpl implements
|
|
|
|
|
|
private static class FailedTransition implements
|
|
private static class FailedTransition implements
|
|
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
|
|
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
|
|
|
|
+ @SuppressWarnings("unchecked")
|
|
@Override
|
|
@Override
|
|
public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) {
|
|
public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) {
|
|
// set the finish time
|
|
// set the finish time
|
|
@@ -1287,6 +1323,7 @@ public abstract class TaskAttemptImpl implements
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ @SuppressWarnings({ "unchecked" })
|
|
private void logAttemptFinishedEvent(TaskAttemptState state) {
|
|
private void logAttemptFinishedEvent(TaskAttemptState state) {
|
|
//Log finished events only if an attempt started.
|
|
//Log finished events only if an attempt started.
|
|
if (getLaunchTime() == 0) return;
|
|
if (getLaunchTime() == 0) return;
|
|
@@ -1320,6 +1357,7 @@ public abstract class TaskAttemptImpl implements
|
|
|
|
|
|
private static class TooManyFetchFailureTransition implements
|
|
private static class TooManyFetchFailureTransition implements
|
|
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
|
|
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
|
|
|
|
+ @SuppressWarnings("unchecked")
|
|
@Override
|
|
@Override
|
|
public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) {
|
|
public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) {
|
|
//add to diagnostic
|
|
//add to diagnostic
|
|
@@ -1347,6 +1385,7 @@ public abstract class TaskAttemptImpl implements
|
|
private static class KilledTransition implements
|
|
private static class KilledTransition implements
|
|
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
|
|
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
|
|
|
|
|
|
|
|
+ @SuppressWarnings("unchecked")
|
|
@Override
|
|
@Override
|
|
public void transition(TaskAttemptImpl taskAttempt,
|
|
public void transition(TaskAttemptImpl taskAttempt,
|
|
TaskAttemptEvent event) {
|
|
TaskAttemptEvent event) {
|
|
@@ -1373,6 +1412,7 @@ public abstract class TaskAttemptImpl implements
|
|
|
|
|
|
private static class CleanupContainerTransition implements
|
|
private static class CleanupContainerTransition implements
|
|
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
|
|
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
|
|
|
|
+ @SuppressWarnings("unchecked")
|
|
@Override
|
|
@Override
|
|
public void transition(TaskAttemptImpl taskAttempt,
|
|
public void transition(TaskAttemptImpl taskAttempt,
|
|
TaskAttemptEvent event) {
|
|
TaskAttemptEvent event) {
|
|
@@ -1399,6 +1439,7 @@ public abstract class TaskAttemptImpl implements
|
|
|
|
|
|
private static class StatusUpdater
|
|
private static class StatusUpdater
|
|
implements SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
|
|
implements SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
|
|
|
|
+ @SuppressWarnings("unchecked")
|
|
@Override
|
|
@Override
|
|
public void transition(TaskAttemptImpl taskAttempt,
|
|
public void transition(TaskAttemptImpl taskAttempt,
|
|
TaskAttemptEvent event) {
|
|
TaskAttemptEvent event) {
|