|
@@ -137,6 +137,7 @@ public abstract class TaskAttemptImpl implements
|
|
|
protected final Configuration conf;
|
|
|
protected final Path jobFile;
|
|
|
protected final int partition;
|
|
|
+ @SuppressWarnings("rawtypes")
|
|
|
protected final EventHandler eventHandler;
|
|
|
private final TaskAttemptId attemptId;
|
|
|
private final Clock clock;
|
|
@@ -431,7 +432,8 @@ public abstract class TaskAttemptImpl implements
|
|
|
//this is the last status reported by the REMOTE running attempt
|
|
|
private TaskAttemptStatus reportedStatus;
|
|
|
|
|
|
- public TaskAttemptImpl(TaskId taskId, int i, EventHandler eventHandler,
|
|
|
+ public TaskAttemptImpl(TaskId taskId, int i,
|
|
|
+ @SuppressWarnings("rawtypes") EventHandler eventHandler,
|
|
|
TaskAttemptListener taskAttemptListener, Path jobFile, int partition,
|
|
|
Configuration conf, String[] dataLocalHosts, OutputCommitter committer,
|
|
|
Token<JobTokenIdentifier> jobToken,
|
|
@@ -527,6 +529,13 @@ public abstract class TaskAttemptImpl implements
|
|
|
ContainerLaunchContext container =
|
|
|
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
|
|
|
|
|
+ // Application resources
|
|
|
+ Map<String, LocalResource> localResources =
|
|
|
+ new HashMap<String, LocalResource>();
|
|
|
+
|
|
|
+ // Application environment
|
|
|
+ Map<String, String> environment = new HashMap<String, String>();
|
|
|
+
|
|
|
try {
|
|
|
FileSystem remoteFS = FileSystem.get(conf);
|
|
|
|
|
@@ -535,7 +544,7 @@ public abstract class TaskAttemptImpl implements
|
|
|
Path remoteJobJar = (new Path(remoteTask.getConf().get(
|
|
|
MRJobConfig.JAR))).makeQualified(remoteFS.getUri(),
|
|
|
remoteFS.getWorkingDirectory());
|
|
|
- container.setLocalResource(
|
|
|
+ localResources.put(
|
|
|
MRConstants.JOB_JAR,
|
|
|
createLocalResource(remoteFS, recordFactory, remoteJobJar,
|
|
|
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
|
|
@@ -557,7 +566,7 @@ public abstract class TaskAttemptImpl implements
|
|
|
new Path(path, oldJobId.toString());
|
|
|
Path remoteJobConfPath =
|
|
|
new Path(remoteJobSubmitDir, MRConstants.JOB_CONF_FILE);
|
|
|
- container.setLocalResource(
|
|
|
+ localResources.put(
|
|
|
MRConstants.JOB_CONF_FILE,
|
|
|
createLocalResource(remoteFS, recordFactory, remoteJobConfPath,
|
|
|
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
|
|
@@ -565,9 +574,14 @@ public abstract class TaskAttemptImpl implements
|
|
|
+ remoteJobConfPath.toUri().toASCIIString());
|
|
|
// //////////// End of JobConf setup
|
|
|
|
|
|
+
|
|
|
// Setup DistributedCache
|
|
|
- setupDistributedCache(remoteFS, conf, container);
|
|
|
+ setupDistributedCache(remoteFS, conf, localResources, environment);
|
|
|
|
|
|
+ // Set local-resources and environment
|
|
|
+ container.setLocalResources(localResources);
|
|
|
+ container.setEnv(environment);
|
|
|
+
|
|
|
// Setup up tokens
|
|
|
Credentials taskCredentials = new Credentials();
|
|
|
|
|
@@ -594,12 +608,12 @@ public abstract class TaskAttemptImpl implements
|
|
|
|
|
|
// Add shuffle token
|
|
|
LOG.info("Putting shuffle token in serviceData");
|
|
|
- container
|
|
|
- .setServiceData(
|
|
|
- ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
|
|
|
- ShuffleHandler.serializeServiceData(jobToken));
|
|
|
+ Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
|
|
|
+ serviceData.put(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
|
|
|
+ ShuffleHandler.serializeServiceData(jobToken));
|
|
|
+ container.setServiceData(serviceData);
|
|
|
|
|
|
- MRApps.addToClassPath(container.getAllEnv(), getInitialClasspath());
|
|
|
+ MRApps.addToClassPath(container.getEnv(), getInitialClasspath());
|
|
|
} catch (IOException e) {
|
|
|
throw new YarnException(e);
|
|
|
}
|
|
@@ -622,11 +636,11 @@ public abstract class TaskAttemptImpl implements
|
|
|
classPaths.add(workDir.toString()); // TODO
|
|
|
|
|
|
// Construct the actual Container
|
|
|
- container.addAllCommands(MapReduceChildJVM.getVMCommand(
|
|
|
+ container.setCommands(MapReduceChildJVM.getVMCommand(
|
|
|
taskAttemptListener.getAddress(), remoteTask, javaHome,
|
|
|
workDir.toString(), containerLogDir, childTmpDir, jvmID));
|
|
|
|
|
|
- MapReduceChildJVM.setVMEnv(container.getAllEnv(), classPaths,
|
|
|
+ MapReduceChildJVM.setVMEnv(container.getEnv(), classPaths,
|
|
|
workDir.toString(), containerLogDir, nmLdLibraryPath, remoteTask,
|
|
|
localizedApplicationTokensFile);
|
|
|
|
|
@@ -648,11 +662,15 @@ public abstract class TaskAttemptImpl implements
|
|
|
return result;
|
|
|
}
|
|
|
|
|
|
- private void setupDistributedCache(FileSystem remoteFS, Configuration conf,
|
|
|
- ContainerLaunchContext container) throws IOException {
|
|
|
+ private void setupDistributedCache(FileSystem remoteFS,
|
|
|
+ Configuration conf,
|
|
|
+ Map<String, LocalResource> localResources,
|
|
|
+ Map<String, String> env)
|
|
|
+ throws IOException {
|
|
|
|
|
|
// Cache archives
|
|
|
- parseDistributedCacheArtifacts(remoteFS, container, LocalResourceType.ARCHIVE,
|
|
|
+ parseDistributedCacheArtifacts(remoteFS, localResources, env,
|
|
|
+ LocalResourceType.ARCHIVE,
|
|
|
DistributedCache.getCacheArchives(conf),
|
|
|
parseTimeStamps(DistributedCache.getArchiveTimestamps(conf)),
|
|
|
getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES),
|
|
@@ -660,7 +678,9 @@ public abstract class TaskAttemptImpl implements
|
|
|
DistributedCache.getArchiveClassPaths(conf));
|
|
|
|
|
|
// Cache files
|
|
|
- parseDistributedCacheArtifacts(remoteFS, container, LocalResourceType.FILE,
|
|
|
+ parseDistributedCacheArtifacts(remoteFS,
|
|
|
+ localResources, env,
|
|
|
+ LocalResourceType.FILE,
|
|
|
DistributedCache.getCacheFiles(conf),
|
|
|
parseTimeStamps(DistributedCache.getFileTimestamps(conf)),
|
|
|
getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES),
|
|
@@ -672,7 +692,10 @@ public abstract class TaskAttemptImpl implements
|
|
|
// Use TaskDistributedCacheManager.CacheFiles.makeCacheFiles(URI[],
|
|
|
// long[], boolean[], Path[], FileType)
|
|
|
private void parseDistributedCacheArtifacts(
|
|
|
- FileSystem remoteFS, ContainerLaunchContext container, LocalResourceType type,
|
|
|
+ FileSystem remoteFS,
|
|
|
+ Map<String, LocalResource> localResources,
|
|
|
+ Map<String, String> env,
|
|
|
+ LocalResourceType type,
|
|
|
URI[] uris, long[] timestamps, long[] sizes, boolean visibilities[],
|
|
|
Path[] pathsToPutOnClasspath) throws IOException {
|
|
|
|
|
@@ -709,7 +732,7 @@ public abstract class TaskAttemptImpl implements
|
|
|
throw new IllegalArgumentException("Resource name must be relative");
|
|
|
}
|
|
|
String linkName = name.toUri().getPath();
|
|
|
- container.setLocalResource(
|
|
|
+ localResources.put(
|
|
|
linkName,
|
|
|
BuilderUtils.newLocalResource(
|
|
|
p.toUri(), type,
|
|
@@ -719,8 +742,7 @@ public abstract class TaskAttemptImpl implements
|
|
|
sizes[i], timestamps[i])
|
|
|
);
|
|
|
if (classPaths.containsKey(u.getPath())) {
|
|
|
- Map<String, String> environment = container.getAllEnv();
|
|
|
- MRApps.addToClassPath(environment, linkName);
|
|
|
+ MRApps.addToClassPath(env, linkName);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -892,6 +914,7 @@ public abstract class TaskAttemptImpl implements
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
@Override
|
|
|
public void handle(TaskAttemptEvent event) {
|
|
|
LOG.info("Processing " + event.getTaskAttemptID() +
|
|
@@ -1034,6 +1057,7 @@ public abstract class TaskAttemptImpl implements
|
|
|
public RequestContainerTransition(boolean rescheduled) {
|
|
|
this.rescheduled = rescheduled;
|
|
|
}
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
@Override
|
|
|
public void transition(TaskAttemptImpl taskAttempt,
|
|
|
TaskAttemptEvent event) {
|
|
@@ -1062,6 +1086,7 @@ public abstract class TaskAttemptImpl implements
|
|
|
|
|
|
private static class ContainerAssignedTransition implements
|
|
|
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
@Override
|
|
|
public void transition(final TaskAttemptImpl taskAttempt,
|
|
|
TaskAttemptEvent event) {
|
|
@@ -1111,6 +1136,7 @@ public abstract class TaskAttemptImpl implements
|
|
|
this.finalState = finalState;
|
|
|
this.withdrawsContainerRequest = withdrawsContainerRequest;
|
|
|
}
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
@Override
|
|
|
public void transition(TaskAttemptImpl taskAttempt,
|
|
|
TaskAttemptEvent event) {
|
|
@@ -1157,6 +1183,7 @@ public abstract class TaskAttemptImpl implements
|
|
|
|
|
|
private static class LaunchedContainerTransition implements
|
|
|
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
@Override
|
|
|
public void transition(TaskAttemptImpl taskAttempt,
|
|
|
TaskAttemptEvent evnt) {
|
|
@@ -1207,6 +1234,7 @@ public abstract class TaskAttemptImpl implements
|
|
|
|
|
|
private static class CommitPendingTransition implements
|
|
|
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
@Override
|
|
|
public void transition(TaskAttemptImpl taskAttempt,
|
|
|
TaskAttemptEvent event) {
|
|
@@ -1218,6 +1246,7 @@ public abstract class TaskAttemptImpl implements
|
|
|
|
|
|
private static class TaskCleanupTransition implements
|
|
|
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
@Override
|
|
|
public void transition(TaskAttemptImpl taskAttempt,
|
|
|
TaskAttemptEvent event) {
|
|
@@ -1233,6 +1262,7 @@ public abstract class TaskAttemptImpl implements
|
|
|
|
|
|
private static class SucceededTransition implements
|
|
|
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
@Override
|
|
|
public void transition(TaskAttemptImpl taskAttempt,
|
|
|
TaskAttemptEvent event) {
|
|
@@ -1262,6 +1292,7 @@ public abstract class TaskAttemptImpl implements
|
|
|
|
|
|
private static class FailedTransition implements
|
|
|
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
@Override
|
|
|
public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) {
|
|
|
// set the finish time
|
|
@@ -1286,6 +1317,7 @@ public abstract class TaskAttemptImpl implements
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ @SuppressWarnings({ "unchecked" })
|
|
|
private void logAttemptFinishedEvent(TaskAttemptState state) {
|
|
|
//Log finished events only if an attempt started.
|
|
|
if (getLaunchTime() == 0) return;
|
|
@@ -1319,6 +1351,7 @@ public abstract class TaskAttemptImpl implements
|
|
|
|
|
|
private static class TooManyFetchFailureTransition implements
|
|
|
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
@Override
|
|
|
public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) {
|
|
|
//add to diagnostic
|
|
@@ -1346,6 +1379,7 @@ public abstract class TaskAttemptImpl implements
|
|
|
private static class KilledTransition implements
|
|
|
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
|
|
|
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
@Override
|
|
|
public void transition(TaskAttemptImpl taskAttempt,
|
|
|
TaskAttemptEvent event) {
|
|
@@ -1372,6 +1406,7 @@ public abstract class TaskAttemptImpl implements
|
|
|
|
|
|
private static class CleanupContainerTransition implements
|
|
|
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
@Override
|
|
|
public void transition(TaskAttemptImpl taskAttempt,
|
|
|
TaskAttemptEvent event) {
|
|
@@ -1398,6 +1433,7 @@ public abstract class TaskAttemptImpl implements
|
|
|
|
|
|
private static class StatusUpdater
|
|
|
implements SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
@Override
|
|
|
public void transition(TaskAttemptImpl taskAttempt,
|
|
|
TaskAttemptEvent event) {
|