|
@@ -27,6 +27,7 @@ import java.util.EnumSet;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
+import java.util.Map.Entry;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
import java.util.concurrent.locks.Lock;
|
|
|
import java.util.concurrent.locks.ReadWriteLock;
|
|
@@ -109,6 +110,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
|
|
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
|
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
|
|
import org.apache.hadoop.yarn.api.records.Resource;
|
|
|
+import org.apache.hadoop.yarn.api.records.URL;
|
|
|
import org.apache.hadoop.yarn.event.EventHandler;
|
|
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
|
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|
@@ -154,6 +156,8 @@ public abstract class TaskAttemptImpl implements
|
|
|
private Token<JobTokenIdentifier> jobToken;
|
|
|
private static AtomicBoolean initialClasspathFlag = new AtomicBoolean();
|
|
|
private static String initialClasspath = null;
|
|
|
+ private static Object commonContainerSpecLock = new Object();
|
|
|
+ private static ContainerLaunchContext commonContainerSpec = null;
|
|
|
private static final Object classpathLock = new Object();
|
|
|
private long launchTime;
|
|
|
private long finishTime;
|
|
@@ -497,29 +501,27 @@ public abstract class TaskAttemptImpl implements
|
|
|
|
|
|
/**
|
|
|
* Create a {@link LocalResource} record with all the given parameters.
|
|
|
- * TODO: This should pave way for Builder pattern.
|
|
|
*/
|
|
|
- private static LocalResource createLocalResource(FileSystem fc,
|
|
|
- RecordFactory recordFactory, Path file, LocalResourceType type,
|
|
|
- LocalResourceVisibility visibility) throws IOException {
|
|
|
+ private static LocalResource createLocalResource(FileSystem fc, Path file,
|
|
|
+ LocalResourceType type, LocalResourceVisibility visibility)
|
|
|
+ throws IOException {
|
|
|
FileStatus fstat = fc.getFileStatus(file);
|
|
|
- LocalResource resource =
|
|
|
- recordFactory.newRecordInstance(LocalResource.class);
|
|
|
- resource.setResource(ConverterUtils.getYarnUrlFromPath(fc.resolvePath(fstat
|
|
|
- .getPath())));
|
|
|
- resource.setType(type);
|
|
|
- resource.setVisibility(visibility);
|
|
|
- resource.setSize(fstat.getLen());
|
|
|
- resource.setTimestamp(fstat.getModificationTime());
|
|
|
- return resource;
|
|
|
+ URL resourceURL = ConverterUtils.getYarnUrlFromPath(fc.resolvePath(fstat
|
|
|
+ .getPath()));
|
|
|
+ long resourceSize = fstat.getLen();
|
|
|
+ long resourceModificationTime = fstat.getModificationTime();
|
|
|
+
|
|
|
+ return BuilderUtils.newLocalResource(resourceURL, type, visibility,
|
|
|
+ resourceSize, resourceModificationTime);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Lock this on initialClasspath so that there is only one fork in the AM for
|
|
|
- * getting the initial class-path. TODO: This should go away once we construct
|
|
|
- * a parent CLC and use it for all the containers.
|
|
|
+ * getting the initial class-path. TODO: We already construct
|
|
|
+ * a parent CLC and use it for all the containers, so this should go away
|
|
|
+ * once the mr-generated-classpath stuff is gone.
|
|
|
*/
|
|
|
- private String getInitialClasspath() throws IOException {
|
|
|
+ private static String getInitialClasspath() throws IOException {
|
|
|
synchronized (classpathLock) {
|
|
|
if (initialClasspathFlag.get()) {
|
|
|
return initialClasspath;
|
|
@@ -534,11 +536,14 @@ public abstract class TaskAttemptImpl implements
|
|
|
|
|
|
|
|
|
/**
|
|
|
- * Create the {@link ContainerLaunchContext} for this attempt.
|
|
|
+ * Create the common {@link ContainerLaunchContext} for all attempts.
|
|
|
* @param applicationACLs
|
|
|
*/
|
|
|
- private ContainerLaunchContext createContainerLaunchContext(
|
|
|
- Map<ApplicationAccessType, String> applicationACLs) {
|
|
|
+ private static ContainerLaunchContext createCommonContainerLaunchContext(
|
|
|
+ Map<ApplicationAccessType, String> applicationACLs, Configuration conf,
|
|
|
+ Token<JobTokenIdentifier> jobToken,
|
|
|
+ final org.apache.hadoop.mapred.JobID oldJobId,
|
|
|
+ Collection<Token<? extends TokenIdentifier>> fsTokens) {
|
|
|
|
|
|
// Application resources
|
|
|
Map<String, LocalResource> localResources =
|
|
@@ -556,13 +561,13 @@ public abstract class TaskAttemptImpl implements
|
|
|
FileSystem remoteFS = FileSystem.get(conf);
|
|
|
|
|
|
// //////////// Set up JobJar to be localized properly on the remote NM.
|
|
|
- if (conf.get(MRJobConfig.JAR) != null) {
|
|
|
- Path remoteJobJar = (new Path(remoteTask.getConf().get(
|
|
|
- MRJobConfig.JAR))).makeQualified(remoteFS.getUri(),
|
|
|
- remoteFS.getWorkingDirectory());
|
|
|
+ String jobJar = conf.get(MRJobConfig.JAR);
|
|
|
+ if (jobJar != null) {
|
|
|
+ Path remoteJobJar = (new Path(jobJar)).makeQualified(remoteFS
|
|
|
+ .getUri(), remoteFS.getWorkingDirectory());
|
|
|
localResources.put(
|
|
|
MRJobConfig.JOB_JAR,
|
|
|
- createLocalResource(remoteFS, recordFactory, remoteJobJar,
|
|
|
+ createLocalResource(remoteFS, remoteJobJar,
|
|
|
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
|
|
|
LOG.info("The job-jar file on the remote FS is "
|
|
|
+ remoteJobJar.toUri().toASCIIString());
|
|
@@ -584,7 +589,7 @@ public abstract class TaskAttemptImpl implements
|
|
|
new Path(remoteJobSubmitDir, MRJobConfig.JOB_CONF_FILE);
|
|
|
localResources.put(
|
|
|
MRJobConfig.JOB_CONF_FILE,
|
|
|
- createLocalResource(remoteFS, recordFactory, remoteJobConfPath,
|
|
|
+ createLocalResource(remoteFS, remoteJobConfPath,
|
|
|
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
|
|
|
LOG.info("The job-conf file on the remote FS is "
|
|
|
+ remoteJobConfPath.toUri().toASCIIString());
|
|
@@ -630,19 +635,81 @@ public abstract class TaskAttemptImpl implements
|
|
|
throw new YarnException(e);
|
|
|
}
|
|
|
|
|
|
- // Setup environment
|
|
|
- MapReduceChildJVM.setVMEnv(environment, remoteTask);
|
|
|
+ // Shell
|
|
|
+ environment.put(
|
|
|
+ Environment.SHELL.name(),
|
|
|
+ conf.get(
|
|
|
+ MRJobConfig.MAPRED_ADMIN_USER_SHELL,
|
|
|
+ MRJobConfig.DEFAULT_SHELL)
|
|
|
+ );
|
|
|
+
|
|
|
+ // Add pwd to LD_LIBRARY_PATH, add this before adding anything else
|
|
|
+ Apps.addToEnvironment(
|
|
|
+ environment,
|
|
|
+ Environment.LD_LIBRARY_PATH.name(),
|
|
|
+ Environment.PWD.$());
|
|
|
+
|
|
|
+ // Add the env variables passed by the admin
|
|
|
+ Apps.setEnvFromInputString(
|
|
|
+ environment,
|
|
|
+ conf.get(
|
|
|
+ MRJobConfig.MAPRED_ADMIN_USER_ENV,
|
|
|
+ MRJobConfig.DEFAULT_MAPRED_ADMIN_USER_ENV)
|
|
|
+ );
|
|
|
+
|
|
|
+ // Construct the actual Container
|
|
|
+ // The null fields are per-container and will be constructed for each
|
|
|
+ // container separately.
|
|
|
+ ContainerLaunchContext container = BuilderUtils
|
|
|
+ .newContainerLaunchContext(null, conf
|
|
|
+ .get(MRJobConfig.USER_NAME), null, localResources,
|
|
|
+ environment, null, serviceData, tokens, applicationACLs);
|
|
|
+
|
|
|
+ return container;
|
|
|
+ }
|
|
|
+
|
|
|
+ static ContainerLaunchContext createContainerLaunchContext(
|
|
|
+ Map<ApplicationAccessType, String> applicationACLs,
|
|
|
+ ContainerId containerID, Configuration conf,
|
|
|
+ Token<JobTokenIdentifier> jobToken, Task remoteTask,
|
|
|
+ final org.apache.hadoop.mapred.JobID oldJobId,
|
|
|
+ Resource assignedCapability, WrappedJvmID jvmID,
|
|
|
+ TaskAttemptListener taskAttemptListener,
|
|
|
+ Collection<Token<? extends TokenIdentifier>> fsTokens) {
|
|
|
+
|
|
|
+ synchronized (commonContainerSpecLock) {
|
|
|
+ if (commonContainerSpec == null) {
|
|
|
+ commonContainerSpec = createCommonContainerLaunchContext(
|
|
|
+ applicationACLs, conf, jobToken, oldJobId, fsTokens);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Fill in the fields needed per-container that are missing in the common
|
|
|
+ // spec.
|
|
|
+
|
|
|
+ // Setup environment by cloning from common env.
|
|
|
+ Map<String, String> env = commonContainerSpec.getEnvironment();
|
|
|
+ Map<String, String> myEnv = new HashMap<String, String>(env.size());
|
|
|
+ myEnv.putAll(env);
|
|
|
+ MapReduceChildJVM.setVMEnv(myEnv, remoteTask);
|
|
|
|
|
|
// Set up the launch command
|
|
|
List<String> commands = MapReduceChildJVM.getVMCommand(
|
|
|
- taskAttemptListener.getAddress(), remoteTask,
|
|
|
- jvmID);
|
|
|
-
|
|
|
+ taskAttemptListener.getAddress(), remoteTask, jvmID);
|
|
|
+
|
|
|
+ // Duplicate the ByteBuffers for access by multiple containers.
|
|
|
+ Map<String, ByteBuffer> myServiceData = new HashMap<String, ByteBuffer>();
|
|
|
+ for (Entry<String, ByteBuffer> entry : commonContainerSpec
|
|
|
+ .getServiceData().entrySet()) {
|
|
|
+ myServiceData.put(entry.getKey(), entry.getValue().duplicate());
|
|
|
+ }
|
|
|
+
|
|
|
// Construct the actual Container
|
|
|
- ContainerLaunchContext container = BuilderUtils
|
|
|
- .newContainerLaunchContext(containerID, conf
|
|
|
- .get(MRJobConfig.USER_NAME), assignedCapability, localResources,
|
|
|
- environment, commands, serviceData, tokens, applicationACLs);
|
|
|
+ ContainerLaunchContext container = BuilderUtils.newContainerLaunchContext(
|
|
|
+ containerID, commonContainerSpec.getUser(), assignedCapability,
|
|
|
+ commonContainerSpec.getLocalResources(), myEnv, commands,
|
|
|
+ myServiceData, commonContainerSpec.getContainerTokens().duplicate(),
|
|
|
+ applicationACLs);
|
|
|
|
|
|
return container;
|
|
|
}
|
|
@@ -1022,7 +1089,7 @@ public abstract class TaskAttemptImpl implements
|
|
|
|
|
|
private static class ContainerAssignedTransition implements
|
|
|
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
|
|
|
- @SuppressWarnings({ "unchecked", "deprecation" })
|
|
|
+ @SuppressWarnings({ "unchecked" })
|
|
|
@Override
|
|
|
public void transition(final TaskAttemptImpl taskAttempt,
|
|
|
TaskAttemptEvent event) {
|
|
@@ -1046,20 +1113,16 @@ public abstract class TaskAttemptImpl implements
|
|
|
|
|
|
//launch the container
|
|
|
//create the container object to be launched for a given Task attempt
|
|
|
- taskAttempt.eventHandler.handle(
|
|
|
- new ContainerRemoteLaunchEvent(taskAttempt.attemptId,
|
|
|
- taskAttempt.containerID,
|
|
|
- taskAttempt.containerMgrAddress, taskAttempt.containerToken) {
|
|
|
- @Override
|
|
|
- public ContainerLaunchContext getContainer() {
|
|
|
- return taskAttempt.createContainerLaunchContext(cEvent
|
|
|
- .getApplicationACLs());
|
|
|
- }
|
|
|
- @Override
|
|
|
- public Task getRemoteTask() { // classic mapred Task, not YARN version
|
|
|
- return taskAttempt.remoteTask;
|
|
|
- }
|
|
|
- });
|
|
|
+ ContainerLaunchContext launchContext = createContainerLaunchContext(
|
|
|
+ cEvent.getApplicationACLs(), taskAttempt.containerID,
|
|
|
+ taskAttempt.conf, taskAttempt.jobToken, taskAttempt.remoteTask,
|
|
|
+ taskAttempt.oldJobId, taskAttempt.assignedCapability,
|
|
|
+ taskAttempt.jvmID, taskAttempt.taskAttemptListener,
|
|
|
+ taskAttempt.fsTokens);
|
|
|
+ taskAttempt.eventHandler.handle(new ContainerRemoteLaunchEvent(
|
|
|
+ taskAttempt.attemptId, taskAttempt.containerID,
|
|
|
+ taskAttempt.containerMgrAddress, taskAttempt.containerToken,
|
|
|
+ launchContext, taskAttempt.remoteTask));
|
|
|
|
|
|
// send event to speculator that our container needs are satisfied
|
|
|
taskAttempt.eventHandler.handle
|
|
@@ -1197,7 +1260,6 @@ public abstract class TaskAttemptImpl implements
|
|
|
@Override
|
|
|
public void transition(TaskAttemptImpl taskAttempt,
|
|
|
TaskAttemptEvent event) {
|
|
|
- @SuppressWarnings("deprecation")
|
|
|
TaskAttemptContext taskContext =
|
|
|
new TaskAttemptContextImpl(taskAttempt.conf,
|
|
|
TypeConverter.fromYarn(taskAttempt.attemptId));
|