|
@@ -20,15 +20,6 @@ package org.apache.hadoop.yarn.server.nodemanager;
|
|
|
|
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
|
import com.google.common.base.Optional;
|
|
|
-
|
|
|
-import java.io.File;
|
|
|
-import java.io.IOException;
|
|
|
-import java.net.InetSocketAddress;
|
|
|
-import java.util.ArrayList;
|
|
|
-import java.util.Arrays;
|
|
|
-import java.util.List;
|
|
|
-import java.util.regex.Pattern;
|
|
|
-
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
@@ -46,10 +37,14 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException;
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
|
|
|
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandler;
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerModule;
|
|
|
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.DelegatingLinuxContainerRuntime;
|
|
|
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntime;
|
|
|
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
|
|
|
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerExecutionException;
|
|
|
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerRuntimeContext;
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerLivenessContext;
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReacquisitionContext;
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
|
|
@@ -60,6 +55,22 @@ import org.apache.hadoop.yarn.server.nodemanager.util.DefaultLCEResourcesHandler
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.util.LCEResourcesHandler;
|
|
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
|
|
|
|
|
+import java.io.File;
|
|
|
+import java.io.IOException;
|
|
|
+import java.net.InetSocketAddress;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.Arrays;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.regex.Pattern;
|
|
|
+
|
|
|
+import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.*;
|
|
|
+
|
|
|
+/** Container execution for Linux. Provides linux-specific localization
|
|
|
+ * mechanisms, resource management via cgroups and can switch between multiple
|
|
|
+ * container runtimes, e.g. the standard "Process Tree" runtime, Docker, etc.
|
|
|
+ */
|
|
|
+
|
|
|
public class LinuxContainerExecutor extends ContainerExecutor {
|
|
|
|
|
|
private static final Log LOG = LogFactory
|
|
@@ -73,6 +84,15 @@ public class LinuxContainerExecutor extends ContainerExecutor {
|
|
|
private int containerSchedPriorityAdjustment = 0;
|
|
|
private boolean containerLimitUsers;
|
|
|
private ResourceHandler resourceHandlerChain;
|
|
|
+ private LinuxContainerRuntime linuxContainerRuntime;
|
|
|
+
|
|
|
+ public LinuxContainerExecutor() { // no runtime is injected here, so the linuxContainerRuntime field stays null
|
|
|
+ }
|
|
|
+
|
|
|
+ // Created primarily for testing: stores a caller-supplied runtime, so the null check that would otherwise create a DelegatingLinuxContainerRuntime is skipped.
|
|
|
+ public LinuxContainerExecutor(LinuxContainerRuntime linuxContainerRuntime) {
|
|
|
+ this.linuxContainerRuntime = linuxContainerRuntime;
|
|
|
+ }
|
|
|
|
|
|
@Override
|
|
|
public void setConf(Configuration conf) {
|
|
@@ -85,10 +105,10 @@ public class LinuxContainerExecutor extends ContainerExecutor {
|
|
|
resourcesHandler.setConf(conf);
|
|
|
|
|
|
if (conf.get(YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY) != null) {
|
|
|
- containerSchedPriorityIsSet = true;
|
|
|
- containerSchedPriorityAdjustment = conf
|
|
|
- .getInt(YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY,
|
|
|
- YarnConfiguration.DEFAULT_NM_CONTAINER_EXECUTOR_SCHED_PRIORITY);
|
|
|
+ containerSchedPriorityIsSet = true;
|
|
|
+ containerSchedPriorityAdjustment = conf
|
|
|
+ .getInt(YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY,
|
|
|
+ YarnConfiguration.DEFAULT_NM_CONTAINER_EXECUTOR_SCHED_PRIORITY);
|
|
|
}
|
|
|
nonsecureLocalUser = conf.get(
|
|
|
YarnConfiguration.NM_NONSECURE_MODE_LOCAL_USER_KEY,
|
|
@@ -122,46 +142,6 @@ public class LinuxContainerExecutor extends ContainerExecutor {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * List of commands that the setuid script will execute.
|
|
|
- */
|
|
|
- enum Commands {
|
|
|
- INITIALIZE_CONTAINER(0),
|
|
|
- LAUNCH_CONTAINER(1),
|
|
|
- SIGNAL_CONTAINER(2),
|
|
|
- DELETE_AS_USER(3);
|
|
|
-
|
|
|
- private int value;
|
|
|
- Commands(int value) {
|
|
|
- this.value = value;
|
|
|
- }
|
|
|
- int getValue() {
|
|
|
- return value;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Result codes returned from the C container-executor.
|
|
|
- * These must match the values in container-executor.h.
|
|
|
- */
|
|
|
- enum ResultCode {
|
|
|
- OK(0),
|
|
|
- INVALID_USER_NAME(2),
|
|
|
- UNABLE_TO_EXECUTE_CONTAINER_SCRIPT(7),
|
|
|
- INVALID_CONTAINER_PID(9),
|
|
|
- INVALID_CONTAINER_EXEC_PERMISSIONS(22),
|
|
|
- INVALID_CONFIG_FILE(24),
|
|
|
- WRITE_CGROUP_FAILED(27);
|
|
|
-
|
|
|
- private final int value;
|
|
|
- ResultCode(int value) {
|
|
|
- this.value = value;
|
|
|
- }
|
|
|
- int getValue() {
|
|
|
- return value;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
protected String getContainerExecutorExecutablePath(Configuration conf) {
|
|
|
String yarnHomeEnvVar =
|
|
|
System.getenv(ApplicationConstants.Environment.HADOOP_YARN_HOME.key());
|
|
@@ -203,9 +183,9 @@ public class LinuxContainerExecutor extends ContainerExecutor {
|
|
|
+ " (error=" + exitCode + ")", e);
|
|
|
}
|
|
|
|
|
|
- try {
|
|
|
- Configuration conf = super.getConf();
|
|
|
+ Configuration conf = super.getConf();
|
|
|
|
|
|
+ try {
|
|
|
resourceHandlerChain = ResourceHandlerModule
|
|
|
.getConfiguredResourceHandlerChain(conf);
|
|
|
if (resourceHandlerChain != null) {
|
|
@@ -216,9 +196,20 @@ public class LinuxContainerExecutor extends ContainerExecutor {
|
|
|
throw new IOException("Failed to bootstrap configured resource subsystems!");
|
|
|
}
|
|
|
|
|
|
+ try { // set up the container runtime unless one was already supplied through the constructor
|
|
|
+ if (linuxContainerRuntime == null) {
|
|
|
+ LinuxContainerRuntime runtime = new DelegatingLinuxContainerRuntime();
|
|
|
+ // publish the runtime to the field only after initialize(conf) has succeeded
|
|
|
+ runtime.initialize(conf);
|
|
|
+ this.linuxContainerRuntime = runtime;
|
|
|
+ }
|
|
|
+ } catch (ContainerExecutionException e) { // chain e as the cause so the underlying failure is not lost
|
|
|
+ throw new IOException("Failed to initialize linux container runtime(s)!", e);
|
|
|
+ }
|
|
|
+
|
|
|
resourcesHandler.init(this);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
@Override
|
|
|
public void startLocalizer(LocalizerStartContext ctx)
|
|
|
throws IOException, InterruptedException {
|
|
@@ -238,7 +229,7 @@ public class LinuxContainerExecutor extends ContainerExecutor {
|
|
|
command.addAll(Arrays.asList(containerExecutorExe,
|
|
|
runAsUser,
|
|
|
user,
|
|
|
- Integer.toString(Commands.INITIALIZE_CONTAINER.getValue()),
|
|
|
+ Integer.toString(PrivilegedOperation.RunAsUserCommand.INITIALIZE_CONTAINER.getValue()),
|
|
|
appId,
|
|
|
nmPrivateContainerTokensPath.toUri().getPath().toString(),
|
|
|
StringUtils.join(PrivilegedOperation.LINUX_FILE_PATH_SEPARATOR,
|
|
@@ -294,6 +285,7 @@ public class LinuxContainerExecutor extends ContainerExecutor {
|
|
|
Path containerWorkDir = ctx.getContainerWorkDir();
|
|
|
List<String> localDirs = ctx.getLocalDirs();
|
|
|
List<String> logDirs = ctx.getLogDirs();
|
|
|
+ Map<Path, List<String>> localizedResources = ctx.getLocalizedResources();
|
|
|
|
|
|
verifyUsernamePattern(user);
|
|
|
String runAsUser = getRunAsUser(user);
|
|
@@ -351,50 +343,48 @@ public class LinuxContainerExecutor extends ContainerExecutor {
|
|
|
throw new IOException("ResourceHandlerChain.preStart() failed!");
|
|
|
}
|
|
|
|
|
|
- ShellCommandExecutor shExec = null;
|
|
|
-
|
|
|
try {
|
|
|
Path pidFilePath = getPidFilePath(containerId);
|
|
|
if (pidFilePath != null) {
|
|
|
- List<String> command = new ArrayList<String>();
|
|
|
- addSchedPriorityCommand(command);
|
|
|
- command.addAll(Arrays.asList(
|
|
|
- containerExecutorExe, runAsUser, user, Integer
|
|
|
- .toString(Commands.LAUNCH_CONTAINER.getValue()), appId,
|
|
|
- containerIdStr, containerWorkDir.toString(),
|
|
|
- nmPrivateContainerScriptPath.toUri().getPath().toString(),
|
|
|
- nmPrivateTokensPath.toUri().getPath().toString(),
|
|
|
- pidFilePath.toString(),
|
|
|
- StringUtils.join(PrivilegedOperation.LINUX_FILE_PATH_SEPARATOR,
|
|
|
- localDirs),
|
|
|
- StringUtils.join(PrivilegedOperation.LINUX_FILE_PATH_SEPARATOR,
|
|
|
- logDirs),
|
|
|
- resourcesOptions));
|
|
|
+ List<String> prefixCommands= new ArrayList<>();
|
|
|
+ ContainerRuntimeContext.Builder builder = new ContainerRuntimeContext
|
|
|
+ .Builder(container);
|
|
|
+
|
|
|
+ addSchedPriorityCommand(prefixCommands);
|
|
|
+ if (prefixCommands.size() > 0) {
|
|
|
+ builder.setExecutionAttribute(CONTAINER_LAUNCH_PREFIX_COMMANDS,
|
|
|
+ prefixCommands);
|
|
|
+ }
|
|
|
+
|
|
|
+ builder.setExecutionAttribute(LOCALIZED_RESOURCES, localizedResources)
|
|
|
+ .setExecutionAttribute(RUN_AS_USER, runAsUser)
|
|
|
+ .setExecutionAttribute(USER, user)
|
|
|
+ .setExecutionAttribute(APPID, appId)
|
|
|
+ .setExecutionAttribute(CONTAINER_ID_STR, containerIdStr)
|
|
|
+ .setExecutionAttribute(CONTAINER_WORK_DIR, containerWorkDir)
|
|
|
+ .setExecutionAttribute(NM_PRIVATE_CONTAINER_SCRIPT_PATH,
|
|
|
+ nmPrivateContainerScriptPath)
|
|
|
+ .setExecutionAttribute(NM_PRIVATE_TOKENS_PATH, nmPrivateTokensPath)
|
|
|
+ .setExecutionAttribute(PID_FILE_PATH, pidFilePath)
|
|
|
+ .setExecutionAttribute(LOCAL_DIRS, localDirs)
|
|
|
+ .setExecutionAttribute(LOG_DIRS, logDirs)
|
|
|
+ .setExecutionAttribute(RESOURCES_OPTIONS, resourcesOptions);
|
|
|
|
|
|
if (tcCommandFile != null) {
|
|
|
- command.add(tcCommandFile);
|
|
|
+ builder.setExecutionAttribute(TC_COMMAND_FILE, tcCommandFile);
|
|
|
}
|
|
|
|
|
|
- String[] commandArray = command.toArray(new String[command.size()]);
|
|
|
- shExec = new ShellCommandExecutor(commandArray, null, // NM's cwd
|
|
|
- container.getLaunchContext().getEnvironment()); // sanitized env
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("launchContainer: " + Arrays.toString(commandArray));
|
|
|
- }
|
|
|
- shExec.execute();
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- logOutput(shExec.getOutput());
|
|
|
- }
|
|
|
+ linuxContainerRuntime.launchContainer(builder.build());
|
|
|
} else {
|
|
|
LOG.info("Container was marked as inactive. Returning terminated error");
|
|
|
return ExitCode.TERMINATED.getExitCode();
|
|
|
}
|
|
|
- } catch (ExitCodeException e) {
|
|
|
- int exitCode = shExec.getExitCode();
|
|
|
+ } catch (ContainerExecutionException e) {
|
|
|
+ int exitCode = e.getExitCode();
|
|
|
LOG.warn("Exit code from container " + containerId + " is : " + exitCode);
|
|
|
// 143 (SIGTERM) and 137 (SIGKILL) exit codes means the container was
|
|
|
// terminated/killed forcefully. In all other cases, log the
|
|
|
- // container-executor's output
|
|
|
+ // output
|
|
|
if (exitCode != ExitCode.FORCE_KILLED.getExitCode()
|
|
|
&& exitCode != ExitCode.TERMINATED.getExitCode()) {
|
|
|
LOG.warn("Exception from container-launch with container ID: "
|
|
@@ -404,13 +394,13 @@ public class LinuxContainerExecutor extends ContainerExecutor {
|
|
|
builder.append("Exception from container-launch.\n");
|
|
|
builder.append("Container id: " + containerId + "\n");
|
|
|
builder.append("Exit code: " + exitCode + "\n");
|
|
|
- if (!Optional.fromNullable(e.getMessage()).or("").isEmpty()) {
|
|
|
- builder.append("Exception message: " + e.getMessage() + "\n");
|
|
|
+ if (!Optional.fromNullable(e.getErrorOutput()).or("").isEmpty()) {
|
|
|
+ builder.append("Exception message: " + e.getErrorOutput() + "\n");
|
|
|
}
|
|
|
builder.append("Stack trace: "
|
|
|
+ StringUtils.stringifyException(e) + "\n");
|
|
|
- if (!shExec.getOutput().isEmpty()) {
|
|
|
- builder.append("Shell output: " + shExec.getOutput() + "\n");
|
|
|
+ if (!e.getOutput().isEmpty()) {
|
|
|
+ builder.append("Shell output: " + e.getOutput() + "\n");
|
|
|
}
|
|
|
String diagnostics = builder.toString();
|
|
|
logOutput(diagnostics);
|
|
@@ -433,10 +423,7 @@ public class LinuxContainerExecutor extends ContainerExecutor {
|
|
|
"containerId: " + containerId + ". Exception: " + e);
|
|
|
}
|
|
|
}
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("Output from LinuxContainerExecutor's launchContainer follows:");
|
|
|
- logOutput(shExec.getOutput());
|
|
|
- }
|
|
|
+
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
@@ -474,6 +461,7 @@ public class LinuxContainerExecutor extends ContainerExecutor {
|
|
|
@Override
|
|
|
public boolean signalContainer(ContainerSignalContext ctx)
|
|
|
throws IOException {
|
|
|
+ Container container = ctx.getContainer();
|
|
|
String user = ctx.getUser();
|
|
|
String pid = ctx.getPid();
|
|
|
Signal signal = ctx.getSignal();
|
|
@@ -481,30 +469,27 @@ public class LinuxContainerExecutor extends ContainerExecutor {
|
|
|
verifyUsernamePattern(user);
|
|
|
String runAsUser = getRunAsUser(user);
|
|
|
|
|
|
- String[] command =
|
|
|
- new String[] { containerExecutorExe,
|
|
|
- runAsUser,
|
|
|
- user,
|
|
|
- Integer.toString(Commands.SIGNAL_CONTAINER.getValue()),
|
|
|
- pid,
|
|
|
- Integer.toString(signal.getValue()) };
|
|
|
- ShellCommandExecutor shExec = new ShellCommandExecutor(command);
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("signalContainer: " + Arrays.toString(command));
|
|
|
- }
|
|
|
+ ContainerRuntimeContext runtimeContext = new ContainerRuntimeContext
|
|
|
+ .Builder(container)
|
|
|
+ .setExecutionAttribute(RUN_AS_USER, runAsUser)
|
|
|
+ .setExecutionAttribute(USER, user)
|
|
|
+ .setExecutionAttribute(PID, pid)
|
|
|
+ .setExecutionAttribute(SIGNAL, signal)
|
|
|
+ .build();
|
|
|
+
|
|
|
try {
|
|
|
- shExec.execute();
|
|
|
- } catch (ExitCodeException e) {
|
|
|
- int ret_code = shExec.getExitCode();
|
|
|
- if (ret_code == ResultCode.INVALID_CONTAINER_PID.getValue()) {
|
|
|
+ linuxContainerRuntime.signalContainer(runtimeContext);
|
|
|
+ } catch (ContainerExecutionException e) {
|
|
|
+ int retCode = e.getExitCode();
|
|
|
+ if (retCode == PrivilegedOperation.ResultCode.INVALID_CONTAINER_PID.getValue()) {
|
|
|
return false;
|
|
|
}
|
|
|
LOG.warn("Error in signalling container " + pid + " with " + signal
|
|
|
- + "; exit = " + ret_code, e);
|
|
|
- logOutput(shExec.getOutput());
|
|
|
+ + "; exit = " + retCode, e);
|
|
|
+ logOutput(e.getOutput());
|
|
|
throw new IOException("Problem signalling container " + pid + " with "
|
|
|
- + signal + "; output: " + shExec.getOutput() + " and exitCode: "
|
|
|
- + ret_code, e);
|
|
|
+ + signal + "; output: " + e.getOutput() + " and exitCode: "
|
|
|
+ + retCode, e);
|
|
|
}
|
|
|
return true;
|
|
|
}
|
|
@@ -524,7 +509,8 @@ public class LinuxContainerExecutor extends ContainerExecutor {
|
|
|
Arrays.asList(containerExecutorExe,
|
|
|
runAsUser,
|
|
|
user,
|
|
|
- Integer.toString(Commands.DELETE_AS_USER.getValue()),
|
|
|
+ Integer.toString(PrivilegedOperation.
|
|
|
+ RunAsUserCommand.DELETE_AS_USER.getValue()),
|
|
|
dirString));
|
|
|
List<String> pathsToDelete = new ArrayList<String>();
|
|
|
if (baseDirs == null || baseDirs.size() == 0) {
|
|
@@ -558,13 +544,15 @@ public class LinuxContainerExecutor extends ContainerExecutor {
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public boolean isContainerProcessAlive(ContainerLivenessContext ctx)
|
|
|
+ public boolean isContainerAlive(ContainerLivenessContext ctx)
|
|
|
throws IOException {
|
|
|
String user = ctx.getUser();
|
|
|
String pid = ctx.getPid();
|
|
|
+ Container container = ctx.getContainer();
|
|
|
|
|
|
// Send a test signal to the process as the user to see if it's alive
|
|
|
return signalContainer(new ContainerSignalContext.Builder()
|
|
|
+ .setContainer(container)
|
|
|
.setUser(user)
|
|
|
.setPid(pid)
|
|
|
.setSignal(Signal.NULL)
|