|
@@ -18,8 +18,6 @@
|
|
|
|
|
|
package org.apache.hadoop.mapreduce.v2.app2.rm.container;
|
|
|
|
|
|
-import java.io.IOException;
|
|
|
-import java.nio.ByteBuffer;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Collections;
|
|
|
import java.util.EnumSet;
|
|
@@ -27,56 +25,30 @@ import java.util.HashMap;
|
|
|
import java.util.LinkedList;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
-import java.util.Map.Entry;
|
|
|
-import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|
|
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
|
|
|
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
-import org.apache.hadoop.conf.Configuration;
|
|
|
-import org.apache.hadoop.fs.FileStatus;
|
|
|
-import org.apache.hadoop.fs.FileSystem;
|
|
|
-import org.apache.hadoop.fs.Path;
|
|
|
-import org.apache.hadoop.io.DataOutputBuffer;
|
|
|
-import org.apache.hadoop.mapred.MapReduceChildJVM2;
|
|
|
-import org.apache.hadoop.mapred.ShuffleHandler;
|
|
|
-import org.apache.hadoop.mapred.Task;
|
|
|
+import org.apache.hadoop.mapred.JobID;
|
|
|
import org.apache.hadoop.mapred.WrappedJvmID;
|
|
|
-import org.apache.hadoop.mapreduce.MRJobConfig;
|
|
|
import org.apache.hadoop.mapreduce.TypeConverter;
|
|
|
import org.apache.hadoop.mapreduce.jobhistory.ContainerHeartbeatHandler;
|
|
|
-import org.apache.hadoop.mapreduce.security.TokenCache;
|
|
|
-import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
|
|
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
|
|
|
import org.apache.hadoop.mapreduce.v2.app2.AppContext;
|
|
|
import org.apache.hadoop.mapreduce.v2.app2.TaskAttemptListener;
|
|
|
import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptDiagnosticsUpdateEvent;
|
|
|
-import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEvent;
|
|
|
import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventKillRequest;
|
|
|
import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventTerminated;
|
|
|
-import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventType;
|
|
|
import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEventContainerCompleted;
|
|
|
-import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerTALaunchRequestEvent;
|
|
|
import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorLaunchRequestEvent;
|
|
|
import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorStopRequestEvent;
|
|
|
import org.apache.hadoop.mapreduce.v2.app2.rm.RMCommunicatorContainerDeAllocateRequestEvent;
|
|
|
-import org.apache.hadoop.mapreduce.v2.util.MRApps;
|
|
|
-import org.apache.hadoop.security.Credentials;
|
|
|
-import org.apache.hadoop.security.UserGroupInformation;
|
|
|
-import org.apache.hadoop.security.token.Token;
|
|
|
-import org.apache.hadoop.yarn.YarnException;
|
|
|
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
|
|
|
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
|
|
import org.apache.hadoop.yarn.api.records.Container;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
|
|
-import org.apache.hadoop.yarn.api.records.LocalResource;
|
|
|
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
|
|
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
|
|
-import org.apache.hadoop.yarn.api.records.Resource;
|
|
|
-import org.apache.hadoop.yarn.api.records.URL;
|
|
|
import org.apache.hadoop.yarn.event.Event;
|
|
|
import org.apache.hadoop.yarn.event.EventHandler;
|
|
|
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
|
|
@@ -84,10 +56,8 @@ import org.apache.hadoop.yarn.state.MultipleArcTransition;
|
|
|
import org.apache.hadoop.yarn.state.SingleArcTransition;
|
|
|
import org.apache.hadoop.yarn.state.StateMachine;
|
|
|
import org.apache.hadoop.yarn.state.StateMachineFactory;
|
|
|
-import org.apache.hadoop.yarn.util.Apps;
|
|
|
-import org.apache.hadoop.yarn.util.BuilderUtils;
|
|
|
-import org.apache.hadoop.yarn.util.ConverterUtils;
|
|
|
|
|
|
+@SuppressWarnings("rawtypes")
|
|
|
public class AMContainerImpl implements AMContainer {
|
|
|
|
|
|
private static final Log LOG = LogFactory.getLog(AMContainerImpl.class);
|
|
@@ -101,13 +71,8 @@ public class AMContainerImpl implements AMContainer {
|
|
|
private final AppContext appContext;
|
|
|
private final ContainerHeartbeatHandler containerHeartbeatHandler;
|
|
|
private final TaskAttemptListener taskAttemptListener;
|
|
|
-
|
|
|
- private static Object commonContainerSpecLock = new Object();
|
|
|
- private static ContainerLaunchContext commonContainerSpec = null;
|
|
|
- private static final Object classpathLock = new Object();
|
|
|
- private static AtomicBoolean initialClasspathFlag = new AtomicBoolean();
|
|
|
- private static String initialClasspath = null;
|
|
|
-
|
|
|
+ protected final EventHandler eventHandler;
|
|
|
+
|
|
|
private final List<TaskAttemptId> completedAttempts = new LinkedList<TaskAttemptId>();
|
|
|
|
|
|
// TODO Maybe this should be pulled from the TaskAttempt.s
|
|
@@ -130,8 +95,7 @@ public class AMContainerImpl implements AMContainer {
|
|
|
|
|
|
private ContainerLaunchContext clc;
|
|
|
private WrappedJvmID jvmId;
|
|
|
- @SuppressWarnings("rawtypes")
|
|
|
- protected EventHandler eventHandler;
|
|
|
+
|
|
|
|
|
|
private static boolean stateMachineInited = false;
|
|
|
private static StateMachineFactory
|
|
@@ -194,16 +158,17 @@ public class AMContainerImpl implements AMContainer {
|
|
|
|
|
|
.installTopology();
|
|
|
}
|
|
|
-
|
|
|
- @SuppressWarnings("rawtypes")
|
|
|
+
|
|
|
+ // Attempting to use a container based purely on reosurces required, etc needs
|
|
|
+ // additional change - JvmID, YarnChild, etc depend on TaskType.
|
|
|
public AMContainerImpl(Container container, ContainerHeartbeatHandler chh,
|
|
|
- TaskAttemptListener tal, EventHandler eventHandler, AppContext appContext) {
|
|
|
+ TaskAttemptListener tal, AppContext appContext) {
|
|
|
ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
|
|
|
this.readLock = rwLock.readLock();
|
|
|
this.writeLock = rwLock.writeLock();
|
|
|
this.container = container;
|
|
|
this.containerId = container.getId();
|
|
|
- this.eventHandler = eventHandler;
|
|
|
+ this.eventHandler = appContext.getEventHandler();
|
|
|
this.appContext = appContext;
|
|
|
this.containerHeartbeatHandler = chh;
|
|
|
this.taskAttemptListener = tal;
|
|
@@ -307,8 +272,7 @@ public class AMContainerImpl implements AMContainer {
|
|
|
this.eventHandler.handle(event);
|
|
|
}
|
|
|
|
|
|
- // TODO Maybe have pullTA send out an attemptId. TAL talks to the TaskAttempt
|
|
|
- // to fetch the actual RemoteTask.
|
|
|
+ // Push the TaskAttempt to the TAL, instead of the TAL pulling when a JVM asks for a TaskAttempt.
|
|
|
public org.apache.hadoop.mapred.Task pullTaskAttempt() {
|
|
|
this.writeLock.lock();
|
|
|
try {
|
|
@@ -333,23 +297,29 @@ public class AMContainerImpl implements AMContainer {
|
|
|
@Override
|
|
|
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
|
|
|
AMContainerLaunchRequestEvent event = (AMContainerLaunchRequestEvent) cEvent;
|
|
|
- AMSchedulerTALaunchRequestEvent taEvent = event.getLaunchRequestEvent();
|
|
|
-
|
|
|
- // TODO LATER May be possible to forget about the clc or a part of it after
|
|
|
- // launch. Save AM resources.
|
|
|
-
|
|
|
- container.jvmId = new WrappedJvmID(taEvent.getRemoteTask().getJobID(), taEvent.getRemoteTask().isMapTask(), container.containerId.getId());
|
|
|
|
|
|
- container.clc = createContainerLaunchContext(
|
|
|
- event.getApplicationAcls(), container.getContainerId(),
|
|
|
- container.appContext.getJob(event.getJobId()).getConf(), taEvent.getJobToken(),
|
|
|
- taEvent.getRemoteTask(), TypeConverter.fromYarn(event.getJobId()),
|
|
|
+ JobID oldJobID = TypeConverter.fromYarn(event.getJobId());
|
|
|
+ container.jvmId = new WrappedJvmID(oldJobID,
|
|
|
+ event.getTaskTypeForContainer() == TaskType.MAP,
|
|
|
+ container.containerId.getId());
|
|
|
+
|
|
|
+ container.clc = AMContainerHelpers.createContainerLaunchContext(
|
|
|
+ container.appContext.getApplicationACLs(),
|
|
|
+ container.getContainerId(), event.getJobConf(),
|
|
|
+ event.getTaskTypeForContainer(), event.getJobToken(),
|
|
|
+ TypeConverter.fromYarn(event.getJobId()),
|
|
|
container.getContainer().getResource(), container.jvmId,
|
|
|
- container.taskAttemptListener, taEvent.getCredentials());
|
|
|
+ container.taskAttemptListener, event.getCredentials(),
|
|
|
+ event.shouldProfile());
|
|
|
|
|
|
- container.sendEvent(new NMCommunicatorLaunchRequestEvent(container.clc, container.container));
|
|
|
+ container.sendEvent(new NMCommunicatorLaunchRequestEvent(container.clc,
|
|
|
+ container.container));
|
|
|
LOG.info("Sending Launch Request for Container with id: "
|
|
|
+ container.clc.getContainerId());
|
|
|
+ // Forget about the clc to save resources. At some point, part of the clc
|
|
|
+ // info may need to be exposed to the scheduler to figure out whether a
|
|
|
+ // container can be used for a specific TaskAttempt.
|
|
|
+ container.clc = null;
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -364,7 +334,9 @@ public class AMContainerImpl implements AMContainer {
|
|
|
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
|
|
|
AMContainerAssignTAEvent event = (AMContainerAssignTAEvent) cEvent;
|
|
|
container.inError = true;
|
|
|
- container.sendTerminatedToTaskAttempt(event.getTaskAttemptId());
|
|
|
+ container.sendTerminatedToTaskAttempt(event.getTaskAttemptId(),
|
|
|
+ "AMScheduler Error: TaskAttempt should not be" +
|
|
|
+ " allocated before a launch request.");
|
|
|
container.sendCompletedToScheduler();
|
|
|
container.deAllocate();
|
|
|
LOG.warn("Unexpected TA Assignment: TAId: " + event.getTaskAttemptId()
|
|
@@ -422,8 +394,11 @@ public class AMContainerImpl implements AMContainer {
|
|
|
protected void sendCompletedToScheduler() {
|
|
|
sendEvent(new AMSchedulerEventContainerCompleted(containerId));
|
|
|
}
|
|
|
-
|
|
|
- protected void sendTerminatedToTaskAttempt(TaskAttemptId taId) {
|
|
|
+
|
|
|
+ protected void sendTerminatedToTaskAttempt(TaskAttemptId taId, String message) {
|
|
|
+ if (message != null) {
|
|
|
+ sendEvent(new TaskAttemptDiagnosticsUpdateEvent(taId, message));
|
|
|
+ }
|
|
|
sendEvent(new TaskAttemptEventTerminated(taId));
|
|
|
}
|
|
|
|
|
@@ -462,14 +437,19 @@ public class AMContainerImpl implements AMContainer {
|
|
|
AMContainerAssignTAEvent event = (AMContainerAssignTAEvent) cEvent;
|
|
|
if (container.pendingAttempt != null) {
|
|
|
container.inError = true;
|
|
|
- container.sendTerminatedToTaskAttempt(event.getTaskAttemptId());
|
|
|
+ String errorMessage = "AMScheduler Error: Multiple simultaneous " +
|
|
|
+ "taskAttempt allocations to: " + container.getContainerId();
|
|
|
+ container.sendTerminatedToTaskAttempt(event.getTaskAttemptId(),
|
|
|
+ errorMessage);
|
|
|
container.deAllocate();
|
|
|
+ LOG.warn(errorMessage);
|
|
|
return AMContainerState.STOPPING;
|
|
|
}
|
|
|
container.pendingAttempt = event.getTaskAttemptId();
|
|
|
container.remoteTaskMap.put(event.getTaskAttemptId(),
|
|
|
event.getRemoteTask());
|
|
|
- // TODO XXX: Consider registering with the TAL, instead of the TAL pulling.
|
|
|
+ // TODO Consider registering with the TAL, instead of the TAL pulling.
|
|
|
+ // Possibly after splitting TAL and ContainerListener.
|
|
|
return container.getState();
|
|
|
}
|
|
|
}
|
|
@@ -515,7 +495,6 @@ public class AMContainerImpl implements AMContainer {
|
|
|
}
|
|
|
LOG.info("XXX: Assigned task + [" + container.runningAttempt + "] to container: [" + container.getContainerId() + "]");
|
|
|
return AMContainerState.RUNNING;
|
|
|
- // TODO XXX: Make sure the TAL sends out a TA_STARTED_REMOTELY, along with the shuffle port.
|
|
|
} else {
|
|
|
return AMContainerState.IDLE;
|
|
|
}
|
|
@@ -531,7 +510,6 @@ public class AMContainerImpl implements AMContainer {
|
|
|
SingleArcTransition<AMContainerImpl, AMContainerEvent> {
|
|
|
@Override
|
|
|
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
|
|
|
- // TODO XXX: Send diagnostics to pending task attempt. Update action in transition table.
|
|
|
if (container.pendingAttempt != null) {
|
|
|
AMContainerEventLaunchFailed event = (AMContainerEventLaunchFailed) cEvent;
|
|
|
container.sendEvent(new TaskAttemptDiagnosticsUpdateEvent(
|
|
@@ -551,7 +529,10 @@ public class AMContainerImpl implements AMContainer {
|
|
|
@Override
|
|
|
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
|
|
|
if (container.pendingAttempt != null) {
|
|
|
- container.sendTerminatedToTaskAttempt(container.pendingAttempt);
|
|
|
+ String errorMessage = "Container" + container.getContainerId()
|
|
|
+ + " failed. Received COMPLETED event while trying to launch";
|
|
|
+ container.sendTerminatedToTaskAttempt(container.pendingAttempt,errorMessage);
|
|
|
+ LOG.warn(errorMessage);
|
|
|
// TODO XXX Maybe nullify pendingAttempt.
|
|
|
}
|
|
|
container.sendCompletedToScheduler();
|
|
@@ -605,10 +586,15 @@ public class AMContainerImpl implements AMContainer {
|
|
|
AMContainerAssignTAEvent event = (AMContainerAssignTAEvent) cEvent;
|
|
|
if (container.pendingAttempt != null) {
|
|
|
container.inError = true;
|
|
|
- container.sendTerminatedToTaskAttempt(event.getTaskAttemptId());
|
|
|
+ String errorMessage = "AMScheduler Error: Multiple simultaneous "
|
|
|
+ + "taskAttempt allocations to: " + container.getContainerId();
|
|
|
+ container.sendTerminatedToTaskAttempt(event.getTaskAttemptId(),
|
|
|
+ errorMessage);
|
|
|
+ LOG.warn(errorMessage);
|
|
|
container.sendStopRequestToNM();
|
|
|
container.deAllocate();
|
|
|
container.containerHeartbeatHandler.unregister(container.containerId);
|
|
|
+
|
|
|
return AMContainerState.STOPPING;
|
|
|
}
|
|
|
container.pendingAttempt = event.getTaskAttemptId();
|
|
@@ -631,7 +617,7 @@ public class AMContainerImpl implements AMContainer {
|
|
|
LOG.info("Cotnainer with id: " + container.getContainerId()
|
|
|
+ " Completed." + " Previous state was: " + container.getState());
|
|
|
if (container.pendingAttempt != null) {
|
|
|
- container.sendTerminatedToTaskAttempt(container.pendingAttempt);
|
|
|
+ container.sendTerminatedToTaskAttempt(container.pendingAttempt, null);
|
|
|
}
|
|
|
container.sendCompletedToScheduler();
|
|
|
container.containerHeartbeatHandler.unregister(container.containerId);
|
|
@@ -689,7 +675,7 @@ public class AMContainerImpl implements AMContainer {
|
|
|
SingleArcTransition<AMContainerImpl, AMContainerEvent> {
|
|
|
@Override
|
|
|
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
|
|
|
- container.sendTerminatedToTaskAttempt(container.runningAttempt);
|
|
|
+ container.sendTerminatedToTaskAttempt(container.runningAttempt, null);
|
|
|
container.sendCompletedToScheduler();
|
|
|
container.containerHeartbeatHandler.unregister(container.containerId);
|
|
|
container.unregisterAttemptFromListener(container.runningAttempt);
|
|
@@ -756,7 +742,11 @@ public class AMContainerImpl implements AMContainer {
|
|
|
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
|
|
|
AMContainerAssignTAEvent event = (AMContainerAssignTAEvent) cEvent;
|
|
|
container.inError = true;
|
|
|
- container.sendTerminatedToTaskAttempt(event.getTaskAttemptId());
|
|
|
+ String errorMessage = "AttemptId: " + event.getTaskAttemptId()
|
|
|
+ + " cannot be allocated to container: " + container.getContainerId()
|
|
|
+ + " in STOPPING state";
|
|
|
+ container.sendTerminatedToTaskAttempt(event.getTaskAttemptId(),
|
|
|
+ errorMessage);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -784,7 +774,11 @@ public class AMContainerImpl implements AMContainer {
|
|
|
@Override
|
|
|
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
|
|
|
AMContainerAssignTAEvent event = (AMContainerAssignTAEvent) cEvent;
|
|
|
- container.sendTerminatedToTaskAttempt(event.getTaskAttemptId());
|
|
|
+ String errorMessage = "AttemptId: " + event.getTaskAttemptId()
|
|
|
+ + " cannot be allocated to container: " + container.getContainerId()
|
|
|
+ + " in COMPLETED state";
|
|
|
+ container.sendTerminatedToTaskAttempt(event.getTaskAttemptId(),
|
|
|
+ errorMessage);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -797,14 +791,15 @@ public class AMContainerImpl implements AMContainer {
|
|
|
SingleArcTransition<AMContainerImpl, AMContainerEvent> {
|
|
|
@Override
|
|
|
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
|
|
|
+ // XXX: Would some of these events not have gone out when entering the STOPPING state. Fix errorMessages
|
|
|
if (container.pendingAttempt != null) {
|
|
|
- container.sendTerminatedToTaskAttempt(container.pendingAttempt);
|
|
|
+ container.sendTerminatedToTaskAttempt(container.pendingAttempt, null);
|
|
|
}
|
|
|
if (container.runningAttempt != null) {
|
|
|
- container.sendTerminatedToTaskAttempt(container.runningAttempt);
|
|
|
+ container.sendTerminatedToTaskAttempt(container.runningAttempt, null);
|
|
|
}
|
|
|
if (container.interruptedEvent != null) {
|
|
|
- container.sendTerminatedToTaskAttempt(container.interruptedEvent);
|
|
|
+ container.sendTerminatedToTaskAttempt(container.interruptedEvent, null);
|
|
|
}
|
|
|
container.sendCompletedToScheduler();
|
|
|
}
|
|
@@ -876,7 +871,10 @@ public class AMContainerImpl implements AMContainer {
|
|
|
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
|
|
|
AMContainerAssignTAEvent event = (AMContainerAssignTAEvent) cEvent;
|
|
|
container.inError = true;
|
|
|
- container.sendTerminatedToTaskAttempt(event.getTaskAttemptId());
|
|
|
+ String errorMessage = "AttemptId: " + event.getTaskAttemptId()
|
|
|
+ + " cannot be allocated to container: " + container.getContainerId()
|
|
|
+ + " in RUNNING state";
|
|
|
+ container.sendTerminatedToTaskAttempt(event.getTaskAttemptId(), errorMessage);
|
|
|
container.sendStopRequestToNM();
|
|
|
container.deAllocate();
|
|
|
container.unregisterAttemptFromListener(container.runningAttempt);
|
|
@@ -948,217 +946,5 @@ public class AMContainerImpl implements AMContainer {
|
|
|
|
|
|
// TODO Create a generic ERROR state. Container tries informing relevant components in this case.
|
|
|
|
|
|
- /**
|
|
|
- * Create a {@link LocalResource} record with all the given parameters.
|
|
|
- */
|
|
|
- private static LocalResource createLocalResource(FileSystem fc, Path file,
|
|
|
- LocalResourceType type, LocalResourceVisibility visibility)
|
|
|
- throws IOException {
|
|
|
- FileStatus fstat = fc.getFileStatus(file);
|
|
|
- URL resourceURL = ConverterUtils.getYarnUrlFromPath(fc.resolvePath(fstat
|
|
|
- .getPath()));
|
|
|
- long resourceSize = fstat.getLen();
|
|
|
- long resourceModificationTime = fstat.getModificationTime();
|
|
|
-
|
|
|
- return BuilderUtils.newLocalResource(resourceURL, type, visibility,
|
|
|
- resourceSize, resourceModificationTime);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Lock this on initialClasspath so that there is only one fork in the AM for
|
|
|
- * getting the initial class-path. TODO: We already construct
|
|
|
- * a parent CLC and use it for all the containers, so this should go away
|
|
|
- * once the mr-generated-classpath stuff is gone.
|
|
|
- */
|
|
|
- private static String getInitialClasspath(Configuration conf) throws IOException {
|
|
|
- synchronized (classpathLock) {
|
|
|
- if (initialClasspathFlag.get()) {
|
|
|
- return initialClasspath;
|
|
|
- }
|
|
|
- Map<String, String> env = new HashMap<String, String>();
|
|
|
- MRApps.setClasspath(env, conf);
|
|
|
- initialClasspath = env.get(Environment.CLASSPATH.name());
|
|
|
- initialClasspathFlag.set(true);
|
|
|
- return initialClasspath;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Create the common {@link ContainerLaunchContext} for all attempts.
|
|
|
- * @param applicationACLs
|
|
|
- */
|
|
|
- private static ContainerLaunchContext createCommonContainerLaunchContext(
|
|
|
- Map<ApplicationAccessType, String> applicationACLs, Configuration conf,
|
|
|
- Token<JobTokenIdentifier> jobToken,
|
|
|
- final org.apache.hadoop.mapred.JobID oldJobId,
|
|
|
- Credentials credentials) {
|
|
|
-
|
|
|
- // Application resources
|
|
|
- Map<String, LocalResource> localResources =
|
|
|
- new HashMap<String, LocalResource>();
|
|
|
-
|
|
|
- // Application environment
|
|
|
- Map<String, String> environment = new HashMap<String, String>();
|
|
|
-
|
|
|
- // Service data
|
|
|
- Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
|
|
|
|
|
|
- // Tokens
|
|
|
- ByteBuffer taskCredentialsBuffer = ByteBuffer.wrap(new byte[]{});
|
|
|
- try {
|
|
|
- FileSystem remoteFS = FileSystem.get(conf);
|
|
|
-
|
|
|
- // //////////// Set up JobJar to be localized properly on the remote NM.
|
|
|
- String jobJar = conf.get(MRJobConfig.JAR);
|
|
|
- if (jobJar != null) {
|
|
|
- Path remoteJobJar = (new Path(jobJar)).makeQualified(remoteFS
|
|
|
- .getUri(), remoteFS.getWorkingDirectory());
|
|
|
- localResources.put(
|
|
|
- MRJobConfig.JOB_JAR,
|
|
|
- createLocalResource(remoteFS, remoteJobJar,
|
|
|
- LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
|
|
|
- LOG.info("The job-jar file on the remote FS is "
|
|
|
- + remoteJobJar.toUri().toASCIIString());
|
|
|
- } else {
|
|
|
- // Job jar may be null. For e.g, for pipes, the job jar is the hadoop
|
|
|
- // mapreduce jar itself which is already on the classpath.
|
|
|
- LOG.info("Job jar is not present. "
|
|
|
- + "Not adding any jar to the list of resources.");
|
|
|
- }
|
|
|
- // //////////// End of JobJar setup
|
|
|
-
|
|
|
- // //////////// Set up JobConf to be localized properly on the remote NM.
|
|
|
- Path path =
|
|
|
- MRApps.getStagingAreaDir(conf, UserGroupInformation
|
|
|
- .getCurrentUser().getShortUserName());
|
|
|
- Path remoteJobSubmitDir =
|
|
|
- new Path(path, oldJobId.toString());
|
|
|
- Path remoteJobConfPath =
|
|
|
- new Path(remoteJobSubmitDir, MRJobConfig.JOB_CONF_FILE);
|
|
|
- localResources.put(
|
|
|
- MRJobConfig.JOB_CONF_FILE,
|
|
|
- createLocalResource(remoteFS, remoteJobConfPath,
|
|
|
- LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
|
|
|
- LOG.info("The job-conf file on the remote FS is "
|
|
|
- + remoteJobConfPath.toUri().toASCIIString());
|
|
|
- // //////////// End of JobConf setup
|
|
|
-
|
|
|
- // Setup DistributedCache
|
|
|
- MRApps.setupDistributedCache(conf, localResources);
|
|
|
-
|
|
|
- // Setup up task credentials buffer
|
|
|
- Credentials taskCredentials = new Credentials();
|
|
|
-
|
|
|
- if (UserGroupInformation.isSecurityEnabled()) {
|
|
|
- LOG.info("Adding #" + credentials.numberOfTokens()
|
|
|
- + " tokens and #" + credentials.numberOfSecretKeys()
|
|
|
- + " secret keys for NM use for launching container");
|
|
|
- taskCredentials.addAll(credentials);
|
|
|
- }
|
|
|
-
|
|
|
- // LocalStorageToken is needed irrespective of whether security is enabled
|
|
|
- // or not.
|
|
|
- TokenCache.setJobToken(jobToken, taskCredentials);
|
|
|
-
|
|
|
- DataOutputBuffer containerTokens_dob = new DataOutputBuffer();
|
|
|
- LOG.info("Size of containertokens_dob is "
|
|
|
- + taskCredentials.numberOfTokens());
|
|
|
- taskCredentials.writeTokenStorageToStream(containerTokens_dob);
|
|
|
- taskCredentialsBuffer =
|
|
|
- ByteBuffer.wrap(containerTokens_dob.getData(), 0,
|
|
|
- containerTokens_dob.getLength());
|
|
|
-
|
|
|
- // Add shuffle token
|
|
|
- LOG.info("Putting shuffle token in serviceData");
|
|
|
- serviceData.put(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
|
|
|
- ShuffleHandler.serializeServiceData(jobToken));
|
|
|
-
|
|
|
- Apps.addToEnvironment(
|
|
|
- environment,
|
|
|
- Environment.CLASSPATH.name(),
|
|
|
- getInitialClasspath(conf));
|
|
|
- } catch (IOException e) {
|
|
|
- throw new YarnException(e);
|
|
|
- }
|
|
|
-
|
|
|
- // Shell
|
|
|
- environment.put(
|
|
|
- Environment.SHELL.name(),
|
|
|
- conf.get(
|
|
|
- MRJobConfig.MAPRED_ADMIN_USER_SHELL,
|
|
|
- MRJobConfig.DEFAULT_SHELL)
|
|
|
- );
|
|
|
-
|
|
|
- // Add pwd to LD_LIBRARY_PATH, add this before adding anything else
|
|
|
- Apps.addToEnvironment(
|
|
|
- environment,
|
|
|
- Environment.LD_LIBRARY_PATH.name(),
|
|
|
- Environment.PWD.$());
|
|
|
-
|
|
|
- // Add the env variables passed by the admin
|
|
|
- Apps.setEnvFromInputString(
|
|
|
- environment,
|
|
|
- conf.get(
|
|
|
- MRJobConfig.MAPRED_ADMIN_USER_ENV,
|
|
|
- MRJobConfig.DEFAULT_MAPRED_ADMIN_USER_ENV)
|
|
|
- );
|
|
|
-
|
|
|
- // Construct the actual Container
|
|
|
- // The null fields are per-container and will be constructed for each
|
|
|
- // container separately.
|
|
|
- ContainerLaunchContext container = BuilderUtils
|
|
|
- .newContainerLaunchContext(null, conf
|
|
|
- .get(MRJobConfig.USER_NAME), null, localResources,
|
|
|
- environment, null, serviceData, taskCredentialsBuffer,
|
|
|
- applicationACLs);
|
|
|
-
|
|
|
- return container;
|
|
|
- }
|
|
|
-
|
|
|
- static ContainerLaunchContext createContainerLaunchContext(
|
|
|
- Map<ApplicationAccessType, String> applicationACLs,
|
|
|
- ContainerId containerID, Configuration conf,
|
|
|
- Token<JobTokenIdentifier> jobToken, Task remoteTask,
|
|
|
- final org.apache.hadoop.mapred.JobID oldJobId,
|
|
|
- Resource assignedCapability, WrappedJvmID jvmID,
|
|
|
- TaskAttemptListener taskAttemptListener,
|
|
|
- Credentials credentials) {
|
|
|
-
|
|
|
- synchronized (commonContainerSpecLock) {
|
|
|
- if (commonContainerSpec == null) {
|
|
|
- commonContainerSpec = createCommonContainerLaunchContext(
|
|
|
- applicationACLs, conf, jobToken, oldJobId, credentials);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- // Fill in the fields needed per-container that are missing in the common
|
|
|
- // spec.
|
|
|
-
|
|
|
- // Setup environment by cloning from common env.
|
|
|
- Map<String, String> env = commonContainerSpec.getEnvironment();
|
|
|
- Map<String, String> myEnv = new HashMap<String, String>(env.size());
|
|
|
- myEnv.putAll(env);
|
|
|
- MapReduceChildJVM2.setVMEnv(myEnv, remoteTask);
|
|
|
-
|
|
|
- // Set up the launch command
|
|
|
- List<String> commands = MapReduceChildJVM2.getVMCommand(
|
|
|
- taskAttemptListener.getAddress(), remoteTask, jvmID);
|
|
|
-
|
|
|
- // Duplicate the ByteBuffers for access by multiple containers.
|
|
|
- Map<String, ByteBuffer> myServiceData = new HashMap<String, ByteBuffer>();
|
|
|
- for (Entry<String, ByteBuffer> entry : commonContainerSpec
|
|
|
- .getServiceData().entrySet()) {
|
|
|
- myServiceData.put(entry.getKey(), entry.getValue().duplicate());
|
|
|
- }
|
|
|
-
|
|
|
- // Construct the actual Container
|
|
|
- ContainerLaunchContext container = BuilderUtils.newContainerLaunchContext(
|
|
|
- containerID, commonContainerSpec.getUser(), assignedCapability,
|
|
|
- commonContainerSpec.getLocalResources(), myEnv, commands,
|
|
|
- myServiceData, commonContainerSpec.getContainerTokens().duplicate(),
|
|
|
- applicationACLs);
|
|
|
-
|
|
|
- return container;
|
|
|
- }
|
|
|
-
|
|
|
}
|