|
@@ -182,6 +182,8 @@ public class ApplicationMaster {
|
|
|
DS_APP_ATTEMPT, DS_CONTAINER
|
|
|
}
|
|
|
|
|
|
+ private static final String YARN_SHELL_ID = "YARN_SHELL_ID";
|
|
|
+
|
|
|
// Configuration
|
|
|
private Configuration conf;
|
|
|
|
|
@@ -279,6 +281,8 @@ public class ApplicationMaster {
|
|
|
private final String linux_bash_command = "bash";
|
|
|
private final String windows_command = "cmd /c";
|
|
|
|
|
|
+ private int yarnShellIdCounter = 1;
|
|
|
+
|
|
|
@VisibleForTesting
|
|
|
protected final Set<ContainerId> launchedContainers =
|
|
|
Collections.newSetFromMap(new ConcurrentHashMap<ContainerId, Boolean>());
|
|
@@ -803,8 +807,11 @@ public class ApplicationMaster {
|
|
|
+ allocatedContainers.size());
|
|
|
numAllocatedContainers.addAndGet(allocatedContainers.size());
|
|
|
for (Container allocatedContainer : allocatedContainers) {
|
|
|
+ String yarnShellId = Integer.toString(yarnShellIdCounter);
|
|
|
+ yarnShellIdCounter++;
|
|
|
LOG.info("Launching shell command on a new container."
|
|
|
+ ", containerId=" + allocatedContainer.getId()
|
|
|
+ + ", yarnShellId=" + yarnShellId
|
|
|
+ ", containerNode=" + allocatedContainer.getNodeId().getHost()
|
|
|
+ ":" + allocatedContainer.getNodeId().getPort()
|
|
|
+ ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
|
|
@@ -815,7 +822,8 @@ public class ApplicationMaster {
|
|
|
// + ", containerToken"
|
|
|
// +allocatedContainer.getContainerToken().getIdentifier().toString());
|
|
|
|
|
|
- Thread launchThread = createLaunchContainerThread(allocatedContainer);
|
|
|
+ Thread launchThread = createLaunchContainerThread(allocatedContainer,
|
|
|
+ yarnShellId);
|
|
|
|
|
|
// launch and start the container on a separate thread to keep
|
|
|
// the main thread unblocked
|
|
@@ -927,7 +935,8 @@ public class ApplicationMaster {
|
|
|
private class LaunchContainerRunnable implements Runnable {
|
|
|
|
|
|
// Allocated container
|
|
|
- Container container;
|
|
|
+ private Container container;
|
|
|
+ private String shellId;
|
|
|
|
|
|
NMCallbackHandler containerListener;
|
|
|
|
|
@@ -935,10 +944,11 @@ public class ApplicationMaster {
|
|
|
* @param lcontainer Allocated container
|
|
|
* @param containerListener Callback handler of the container
|
|
|
*/
|
|
|
- public LaunchContainerRunnable(
|
|
|
- Container lcontainer, NMCallbackHandler containerListener) {
|
|
|
+ public LaunchContainerRunnable(Container lcontainer,
|
|
|
+ NMCallbackHandler containerListener, String shellId) {
|
|
|
this.container = lcontainer;
|
|
|
this.containerListener = containerListener;
|
|
|
+ this.shellId = shellId;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -949,7 +959,7 @@ public class ApplicationMaster {
|
|
|
*/
|
|
|
public void run() {
|
|
|
LOG.info("Setting up container launch container for containerid="
|
|
|
- + container.getId());
|
|
|
+ + container.getId() + " with shellid=" + shellId);
|
|
|
|
|
|
// Set the local resources
|
|
|
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
|
|
@@ -1038,8 +1048,11 @@ public class ApplicationMaster {
|
|
|
// download anyfiles in the distributed file-system. The tokens are
|
|
|
// otherwise also useful in cases, for e.g., when one is running a
|
|
|
// "hadoop dfs" command inside the distributed shell.
|
|
|
+ Map<String, String> myShellEnv = new HashMap<String, String>(shellEnv);
|
|
|
+ myShellEnv.put(YARN_SHELL_ID, shellId);
|
|
|
ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(
|
|
|
- localResources, shellEnv, commands, null, allTokens.duplicate(), null);
|
|
|
+ localResources, myShellEnv, commands, null, allTokens.duplicate(),
|
|
|
+ null);
|
|
|
containerListener.addContainer(container.getId(), container);
|
|
|
nmClientAsync.startContainerAsync(container, ctx);
|
|
|
}
|
|
@@ -1189,9 +1202,11 @@ public class ApplicationMaster {
|
|
|
}
|
|
|
|
|
|
@VisibleForTesting
|
|
|
- Thread createLaunchContainerThread(Container allocatedContainer) {
|
|
|
+ Thread createLaunchContainerThread(Container allocatedContainer,
|
|
|
+ String shellId) {
|
|
|
LaunchContainerRunnable runnableLaunchContainer =
|
|
|
- new LaunchContainerRunnable(allocatedContainer, containerListener);
|
|
|
+ new LaunchContainerRunnable(allocatedContainer, containerListener,
|
|
|
+ shellId);
|
|
|
return new Thread(runnableLaunchContainer);
|
|
|
}
|
|
|
}
|