|
@@ -61,7 +61,7 @@ public class DefaultContainerExecutor extends ContainerExecutor {
|
|
|
|
|
|
private static final int WIN_MAX_PATH = 260;
|
|
|
|
|
|
- private final FileContext lfs;
|
|
|
+ protected final FileContext lfs;
|
|
|
|
|
|
public DefaultContainerExecutor() {
|
|
|
try {
|
|
@@ -75,11 +75,19 @@ public class DefaultContainerExecutor extends ContainerExecutor {
|
|
|
this.lfs = lfs;
|
|
|
}
|
|
|
|
|
|
+ protected void copyFile(Path src, Path dst, String owner) throws IOException {
|
|
|
+ lfs.util().copy(src, dst);
|
|
|
+ }
|
|
|
+
|
|
|
+ protected void setScriptExecutable(Path script, String owner) throws IOException {
|
|
|
+ lfs.setPermission(script, ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION);
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public void init() throws IOException {
|
|
|
// nothing to do or verify here
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
@Override
|
|
|
public synchronized void startLocalizer(Path nmPrivateContainerTokensPath,
|
|
|
InetSocketAddress nmAddr, String user, String appId, String locId,
|
|
@@ -93,14 +101,14 @@ public class DefaultContainerExecutor extends ContainerExecutor {
|
|
|
createUserLocalDirs(localDirs, user);
|
|
|
createUserCacheDirs(localDirs, user);
|
|
|
createAppDirs(localDirs, user, appId);
|
|
|
- createAppLogDirs(appId, logDirs);
|
|
|
+ createAppLogDirs(appId, logDirs, user);
|
|
|
|
|
|
// TODO: Why pick first app dir. The same in LCE why not random?
|
|
|
Path appStorageDir = getFirstApplicationDir(localDirs, user, appId);
|
|
|
|
|
|
String tokenFn = String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT, locId);
|
|
|
Path tokenDst = new Path(appStorageDir, tokenFn);
|
|
|
- lfs.util().copy(nmPrivateContainerTokensPath, tokenDst);
|
|
|
+ copyFile(nmPrivateContainerTokensPath, tokenDst, user);
|
|
|
LOG.info("Copying from " + nmPrivateContainerTokensPath + " to " + tokenDst);
|
|
|
lfs.setWorkingDirectory(appStorageDir);
|
|
|
LOG.info("CWD set to " + appStorageDir + " = " + lfs.getWorkingDirectory());
|
|
@@ -129,30 +137,29 @@ public class DefaultContainerExecutor extends ContainerExecutor {
|
|
|
Path appCacheDir = new Path(userdir, ContainerLocalizer.APPCACHE);
|
|
|
Path appDir = new Path(appCacheDir, appIdStr);
|
|
|
Path containerDir = new Path(appDir, containerIdStr);
|
|
|
- createDir(containerDir, dirPerm, true);
|
|
|
+ createDir(containerDir, dirPerm, true, userName);
|
|
|
}
|
|
|
|
|
|
// Create the container log-dirs on all disks
|
|
|
- createContainerLogDirs(appIdStr, containerIdStr, logDirs);
|
|
|
+ createContainerLogDirs(appIdStr, containerIdStr, logDirs, userName);
|
|
|
|
|
|
Path tmpDir = new Path(containerWorkDir,
|
|
|
YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR);
|
|
|
- createDir(tmpDir, dirPerm, false);
|
|
|
+ createDir(tmpDir, dirPerm, false, userName);
|
|
|
|
|
|
// copy launch script to work dir
|
|
|
Path launchDst =
|
|
|
new Path(containerWorkDir, ContainerLaunch.CONTAINER_SCRIPT);
|
|
|
- lfs.util().copy(nmPrivateContainerScriptPath, launchDst);
|
|
|
+ copyFile(nmPrivateContainerScriptPath, launchDst, userName);
|
|
|
|
|
|
// copy container tokens to work dir
|
|
|
Path tokenDst =
|
|
|
new Path(containerWorkDir, ContainerLaunch.FINAL_CONTAINER_TOKENS_FILE);
|
|
|
- lfs.util().copy(nmPrivateTokensPath, tokenDst);
|
|
|
+ copyFile(nmPrivateTokensPath, tokenDst, userName);
|
|
|
|
|
|
// Create new local launch wrapper script
|
|
|
- LocalWrapperScriptBuilder sb = Shell.WINDOWS ?
|
|
|
- new WindowsLocalWrapperScriptBuilder(containerIdStr, containerWorkDir) :
|
|
|
- new UnixLocalWrapperScriptBuilder(containerWorkDir);
|
|
|
+ LocalWrapperScriptBuilder sb = getLocalWrapperScriptBuilder(
|
|
|
+ containerIdStr, containerWorkDir);
|
|
|
|
|
|
// Fail fast if attempting to launch the wrapper script would fail due to
|
|
|
// Windows path length limitation.
|
|
@@ -178,14 +185,12 @@ public class DefaultContainerExecutor extends ContainerExecutor {
|
|
|
// fork script
|
|
|
ShellCommandExecutor shExec = null;
|
|
|
try {
|
|
|
- lfs.setPermission(launchDst,
|
|
|
- ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION);
|
|
|
- lfs.setPermission(sb.getWrapperScriptPath(),
|
|
|
- ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION);
|
|
|
+ setScriptExecutable(launchDst, userName);
|
|
|
+ setScriptExecutable(sb.getWrapperScriptPath(), userName);
|
|
|
|
|
|
// Setup command to run
|
|
|
String[] command = getRunCommand(sb.getWrapperScriptPath().toString(),
|
|
|
- containerIdStr, this.getConf());
|
|
|
+ containerIdStr, userName, pidFile, this.getConf());
|
|
|
|
|
|
LOG.info("launchContainer: " + Arrays.toString(command));
|
|
|
shExec = new ShellCommandExecutor(
|
|
@@ -241,7 +246,14 @@ public class DefaultContainerExecutor extends ContainerExecutor {
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
|
- private abstract class LocalWrapperScriptBuilder {
|
|
|
+ protected LocalWrapperScriptBuilder getLocalWrapperScriptBuilder(
|
|
|
+ String containerIdStr, Path containerWorkDir) {
|
|
|
+ return Shell.WINDOWS ?
|
|
|
+ new WindowsLocalWrapperScriptBuilder(containerIdStr, containerWorkDir) :
|
|
|
+ new UnixLocalWrapperScriptBuilder(containerWorkDir);
|
|
|
+ }
|
|
|
+
|
|
|
+ protected abstract class LocalWrapperScriptBuilder {
|
|
|
|
|
|
private final Path wrapperScriptPath;
|
|
|
|
|
@@ -449,7 +461,7 @@ public class DefaultContainerExecutor extends ContainerExecutor {
|
|
|
* $logdir/$user/$appId */
|
|
|
static final short LOGDIR_PERM = (short)0710;
|
|
|
|
|
|
- private Path getFirstApplicationDir(List<String> localDirs, String user,
|
|
|
+ protected Path getFirstApplicationDir(List<String> localDirs, String user,
|
|
|
String appId) {
|
|
|
return getApplicationDir(new Path(localDirs.get(0)), user, appId);
|
|
|
}
|
|
@@ -472,8 +484,8 @@ public class DefaultContainerExecutor extends ContainerExecutor {
|
|
|
ContainerLocalizer.FILECACHE);
|
|
|
}
|
|
|
|
|
|
- private void createDir(Path dirPath, FsPermission perms,
|
|
|
- boolean createParent) throws IOException {
|
|
|
+ protected void createDir(Path dirPath, FsPermission perms,
|
|
|
+ boolean createParent, String user) throws IOException {
|
|
|
lfs.mkdir(dirPath, perms, createParent);
|
|
|
if (!perms.equals(perms.applyUMask(lfs.getUMask()))) {
|
|
|
lfs.setPermission(dirPath, perms);
|
|
@@ -493,7 +505,7 @@ public class DefaultContainerExecutor extends ContainerExecutor {
|
|
|
for (String localDir : localDirs) {
|
|
|
// create $local.dir/usercache/$user and its immediate parent
|
|
|
try {
|
|
|
- createDir(getUserCacheDir(new Path(localDir), user), userperms, true);
|
|
|
+ createDir(getUserCacheDir(new Path(localDir), user), userperms, true, user);
|
|
|
} catch (IOException e) {
|
|
|
LOG.warn("Unable to create the user directory : " + localDir, e);
|
|
|
continue;
|
|
@@ -529,7 +541,7 @@ public class DefaultContainerExecutor extends ContainerExecutor {
|
|
|
Path localDirPath = new Path(localDir);
|
|
|
final Path appDir = getAppcacheDir(localDirPath, user);
|
|
|
try {
|
|
|
- createDir(appDir, appCachePerms, true);
|
|
|
+ createDir(appDir, appCachePerms, true, user);
|
|
|
appcacheDirStatus = true;
|
|
|
} catch (IOException e) {
|
|
|
LOG.warn("Unable to create app cache directory : " + appDir, e);
|
|
@@ -537,7 +549,7 @@ public class DefaultContainerExecutor extends ContainerExecutor {
|
|
|
// create $local.dir/usercache/$user/filecache
|
|
|
final Path distDir = getFileCacheDir(localDirPath, user);
|
|
|
try {
|
|
|
- createDir(distDir, fileperms, true);
|
|
|
+ createDir(distDir, fileperms, true, user);
|
|
|
distributedCacheDirStatus = true;
|
|
|
} catch (IOException e) {
|
|
|
LOG.warn("Unable to create file cache directory : " + distDir, e);
|
|
@@ -570,7 +582,7 @@ public class DefaultContainerExecutor extends ContainerExecutor {
|
|
|
Path fullAppDir = getApplicationDir(new Path(localDir), user, appId);
|
|
|
// create $local.dir/usercache/$user/appcache/$appId
|
|
|
try {
|
|
|
- createDir(fullAppDir, appperms, true);
|
|
|
+ createDir(fullAppDir, appperms, true, user);
|
|
|
initAppDirStatus = true;
|
|
|
} catch (IOException e) {
|
|
|
LOG.warn("Unable to create app directory " + fullAppDir.toString(), e);
|
|
@@ -586,7 +598,7 @@ public class DefaultContainerExecutor extends ContainerExecutor {
|
|
|
/**
|
|
|
* Create application log directories on all disks.
|
|
|
*/
|
|
|
- void createAppLogDirs(String appId, List<String> logDirs)
|
|
|
+ void createAppLogDirs(String appId, List<String> logDirs, String user)
|
|
|
throws IOException {
|
|
|
|
|
|
boolean appLogDirStatus = false;
|
|
@@ -595,7 +607,7 @@ public class DefaultContainerExecutor extends ContainerExecutor {
|
|
|
// create $log.dir/$appid
|
|
|
Path appLogDir = new Path(rootLogDir, appId);
|
|
|
try {
|
|
|
- createDir(appLogDir, appLogDirPerms, true);
|
|
|
+ createDir(appLogDir, appLogDirPerms, true, user);
|
|
|
} catch (IOException e) {
|
|
|
LOG.warn("Unable to create the app-log directory : " + appLogDir, e);
|
|
|
continue;
|
|
@@ -612,7 +624,7 @@ public class DefaultContainerExecutor extends ContainerExecutor {
|
|
|
* Create application log directories on all disks.
|
|
|
*/
|
|
|
void createContainerLogDirs(String appId, String containerId,
|
|
|
- List<String> logDirs) throws IOException {
|
|
|
+ List<String> logDirs, String user) throws IOException {
|
|
|
|
|
|
boolean containerLogDirStatus = false;
|
|
|
FsPermission containerLogDirPerms = new FsPermission(LOGDIR_PERM);
|
|
@@ -621,7 +633,7 @@ public class DefaultContainerExecutor extends ContainerExecutor {
|
|
|
Path appLogDir = new Path(rootLogDir, appId);
|
|
|
Path containerLogDir = new Path(appLogDir, containerId);
|
|
|
try {
|
|
|
- createDir(containerLogDir, containerLogDirPerms, true);
|
|
|
+ createDir(containerLogDir, containerLogDirPerms, true, user);
|
|
|
} catch (IOException e) {
|
|
|
LOG.warn("Unable to create the container-log directory : "
|
|
|
+ appLogDir, e);
|