|
@@ -29,8 +29,10 @@ import java.util.Map.Entry;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
+import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.LocalFileSystem;
|
|
|
import org.apache.hadoop.fs.FileUtil;
|
|
|
+import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
|
|
|
import org.apache.hadoop.mapred.JvmManager.JvmEnv;
|
|
|
import org.apache.hadoop.util.StringUtils;
|
|
@@ -49,7 +51,7 @@ import org.apache.hadoop.util.Shell.ShellCommandExecutor;
|
|
|
* <p>task-controller user-name command command-args, where</p>
|
|
|
* <p>user-name is the name of the owner who submits the job</p>
|
|
|
* <p>command is one of the cardinal value of the
|
|
|
- * {@link LinuxTaskController.TaskCommands} enumeration</p>
|
|
|
+ * {@link LinuxTaskController.TaskControllerCommands} enumeration</p>
|
|
|
* <p>command-args depends on the command being launched.</p>
|
|
|
*
|
|
|
* In addition to running and killing tasks, the class also
|
|
@@ -83,7 +85,7 @@ class LinuxTaskController extends TaskController {
|
|
|
/**
|
|
|
* List of commands that the setuid script will execute.
|
|
|
*/
|
|
|
- enum TaskCommands {
|
|
|
+ enum TaskControllerCommands {
|
|
|
INITIALIZE_USER,
|
|
|
INITIALIZE_JOB,
|
|
|
INITIALIZE_DISTRIBUTEDCACHE_FILE,
|
|
@@ -91,7 +93,8 @@ class LinuxTaskController extends TaskController {
|
|
|
INITIALIZE_TASK,
|
|
|
TERMINATE_TASK_JVM,
|
|
|
KILL_TASK_JVM,
|
|
|
- ENABLE_TASK_FOR_CLEANUP
|
|
|
+ ENABLE_TASK_FOR_CLEANUP,
|
|
|
+ ENABLE_JOB_FOR_CLEANUP
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -152,7 +155,7 @@ class LinuxTaskController extends TaskController {
|
|
|
// Call the taskcontroller with the right parameters.
|
|
|
List<String> launchTaskJVMArgs = buildLaunchTaskArgs(context);
|
|
|
ShellCommandExecutor shExec = buildTaskControllerExecutor(
|
|
|
- TaskCommands.LAUNCH_TASK_JVM,
|
|
|
+ TaskControllerCommands.LAUNCH_TASK_JVM,
|
|
|
env.conf.getUser(),
|
|
|
launchTaskJVMArgs, env.workDir, env.env);
|
|
|
context.shExec = shExec;
|
|
@@ -181,41 +184,42 @@ class LinuxTaskController extends TaskController {
|
|
|
/**
|
|
|
* Helper method that runs a LinuxTaskController command
|
|
|
*
|
|
|
- * @param taskCommand
|
|
|
+ * @param taskControllerCommand
|
|
|
* @param user
|
|
|
* @param cmdArgs
|
|
|
* @param env
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- private void runCommand(TaskCommands taskCommand, String user,
|
|
|
- List<String> cmdArgs, File workDir, Map<String, String> env)
|
|
|
+ private void runCommand(TaskControllerCommands taskControllerCommand,
|
|
|
+ String user, List<String> cmdArgs, File workDir, Map<String, String> env)
|
|
|
throws IOException {
|
|
|
|
|
|
ShellCommandExecutor shExec =
|
|
|
- buildTaskControllerExecutor(taskCommand, user, cmdArgs, workDir, env);
|
|
|
+ buildTaskControllerExecutor(taskControllerCommand, user, cmdArgs,
|
|
|
+ workDir, env);
|
|
|
try {
|
|
|
shExec.execute();
|
|
|
} catch (Exception e) {
|
|
|
- LOG.warn("Exit code from " + taskCommand.toString() + " is : "
|
|
|
+ LOG.warn("Exit code from " + taskControllerCommand.toString() + " is : "
|
|
|
+ shExec.getExitCode());
|
|
|
- LOG.warn("Exception thrown by " + taskCommand.toString() + " : "
|
|
|
+ LOG.warn("Exception thrown by " + taskControllerCommand.toString() + " : "
|
|
|
+ StringUtils.stringifyException(e));
|
|
|
- LOG.info("Output from LinuxTaskController's " + taskCommand.toString()
|
|
|
- + " follows:");
|
|
|
+ LOG.info("Output from LinuxTaskController's "
|
|
|
+ + taskControllerCommand.toString() + " follows:");
|
|
|
logOutput(shExec.getOutput());
|
|
|
throw new IOException(e);
|
|
|
}
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
- LOG.info("Output from LinuxTaskController's " + taskCommand.toString()
|
|
|
- + " follows:");
|
|
|
+ LOG.info("Output from LinuxTaskController's "
|
|
|
+ + taskControllerCommand.toString() + " follows:");
|
|
|
logOutput(shExec.getOutput());
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Returns list of arguments to be passed while initializing a new task. See
|
|
|
- * {@code buildTaskControllerExecutor(TaskCommands, String, List<String>,
|
|
|
- * JvmEnv)} documentation.
|
|
|
+ * {@code buildTaskControllerExecutor(TaskControllerCommands, String,
|
|
|
+ * List<String>, JvmEnv)} documentation.
|
|
|
*
|
|
|
* @param context
|
|
|
* @return Argument to be used while launching Task VM
|
|
@@ -237,10 +241,12 @@ class LinuxTaskController extends TaskController {
|
|
|
void initializeTask(TaskControllerContext context)
|
|
|
throws IOException {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("Going to do " + TaskCommands.INITIALIZE_TASK.toString()
|
|
|
+ LOG.debug("Going to do "
|
|
|
+ + TaskControllerCommands.INITIALIZE_TASK.toString()
|
|
|
+ " for " + context.task.getTaskID().toString());
|
|
|
}
|
|
|
- runCommand(TaskCommands.INITIALIZE_TASK, context.env.conf.getUser(),
|
|
|
+ runCommand(TaskControllerCommands.INITIALIZE_TASK,
|
|
|
+ context.env.conf.getUser(),
|
|
|
buildInitializeTaskArgs(context), context.env.workDir, context.env.env);
|
|
|
}
|
|
|
|
|
@@ -249,7 +255,7 @@ class LinuxTaskController extends TaskController {
|
|
|
* cleanup. Last arg in this List is either $attemptId or $attemptId/work
|
|
|
*/
|
|
|
private List<String> buildTaskCleanupArgs(
|
|
|
- TaskControllerPathDeletionContext context) {
|
|
|
+ TaskControllerTaskPathDeletionContext context) {
|
|
|
List<String> commandArgs = new ArrayList<String>(3);
|
|
|
commandArgs.add(context.mapredLocalDir.toUri().getPath());
|
|
|
commandArgs.add(context.task.getJobID().toString());
|
|
@@ -268,6 +274,19 @@ class LinuxTaskController extends TaskController {
|
|
|
return commandArgs;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Builds the args to be passed to task-controller for enabling of job for
|
|
|
+ * cleanup. Last arg in this List is $jobid.
|
|
|
+ */
|
|
|
+ private List<String> buildJobCleanupArgs(
|
|
|
+ TaskControllerJobPathDeletionContext context) {
|
|
|
+ List<String> commandArgs = new ArrayList<String>(2);
|
|
|
+ commandArgs.add(context.mapredLocalDir.toUri().getPath());
|
|
|
+ commandArgs.add(context.jobId.toString());
|
|
|
+
|
|
|
+ return commandArgs;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Enables the task for cleanup by changing permissions of the specified path
|
|
|
* in the local filesystem
|
|
@@ -275,33 +294,63 @@ class LinuxTaskController extends TaskController {
|
|
|
@Override
|
|
|
void enableTaskForCleanup(PathDeletionContext context)
|
|
|
throws IOException {
|
|
|
+ if (context instanceof TaskControllerTaskPathDeletionContext) {
|
|
|
+ TaskControllerTaskPathDeletionContext tContext =
|
|
|
+ (TaskControllerTaskPathDeletionContext) context;
|
|
|
+ enablePathForCleanup(tContext,
|
|
|
+ TaskControllerCommands.ENABLE_TASK_FOR_CLEANUP,
|
|
|
+ buildTaskCleanupArgs(tContext));
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ throw new IllegalArgumentException("PathDeletionContext provided is not "
|
|
|
+ + "TaskControllerTaskPathDeletionContext.");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Enables the job for cleanup by changing permissions of the specified path
|
|
|
+ * in the local filesystem
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ void enableJobForCleanup(PathDeletionContext context)
|
|
|
+ throws IOException {
|
|
|
+ if (context instanceof TaskControllerJobPathDeletionContext) {
|
|
|
+ TaskControllerJobPathDeletionContext tContext =
|
|
|
+ (TaskControllerJobPathDeletionContext) context;
|
|
|
+ enablePathForCleanup(tContext,
|
|
|
+ TaskControllerCommands.ENABLE_JOB_FOR_CLEANUP,
|
|
|
+ buildJobCleanupArgs(tContext));
|
|
|
+ } else {
|
|
|
+ throw new IllegalArgumentException("PathDeletionContext provided is not "
|
|
|
+ + "TaskControllerJobPathDeletionContext.");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Enable a path for cleanup
|
|
|
+ * @param c {@link TaskControllerPathDeletionContext} for the path to be
|
|
|
+ * cleaned up
|
|
|
+ * @param command {@link TaskControllerCommands} for task/job cleanup
|
|
|
+ * @param cleanupArgs arguments for the {@link LinuxTaskController} to enable
|
|
|
+ * path cleanup
|
|
|
+ */
|
|
|
+ private void enablePathForCleanup(TaskControllerPathDeletionContext c,
|
|
|
+ TaskControllerCommands command,
|
|
|
+ List<String> cleanupArgs) {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("Going to do " + TaskCommands.ENABLE_TASK_FOR_CLEANUP.toString()
|
|
|
- + " for " + context.fullPath);
|
|
|
+ LOG.debug("Going to do " + command.toString() + " for " + c.fullPath);
|
|
|
}
|
|
|
|
|
|
- if (context instanceof TaskControllerPathDeletionContext) {
|
|
|
- TaskControllerPathDeletionContext tContext =
|
|
|
- (TaskControllerPathDeletionContext) context;
|
|
|
-
|
|
|
- if (tContext.task.getUser() != null &&
|
|
|
- tContext.fs instanceof LocalFileSystem) {
|
|
|
- try {
|
|
|
- runCommand(TaskCommands.ENABLE_TASK_FOR_CLEANUP,
|
|
|
- tContext.task.getUser(),
|
|
|
- buildTaskCleanupArgs(tContext), null, null);
|
|
|
- } catch(IOException e) {
|
|
|
- LOG.warn("Uanble to change permissions for " + tContext.fullPath);
|
|
|
- }
|
|
|
- }
|
|
|
- else {
|
|
|
- throw new IllegalArgumentException("Either user is null or the " +
|
|
|
- "file system is not local file system.");
|
|
|
+ if ( c.user != null && c.fs instanceof LocalFileSystem) {
|
|
|
+ try {
|
|
|
+ runCommand(command, c.user, cleanupArgs, null, null);
|
|
|
+ } catch(IOException e) {
|
|
|
+ LOG.warn("Unable to change permissions for " + c.fullPath);
|
|
|
}
|
|
|
}
|
|
|
else {
|
|
|
- throw new IllegalArgumentException("PathDeletionContext provided is not "
|
|
|
- + "TaskControllerPathDeletionContext.");
|
|
|
+ throw new IllegalArgumentException("Either user is null or the "
|
|
|
+ + "file system is not local file system.");
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -323,7 +372,7 @@ class LinuxTaskController extends TaskController {
|
|
|
|
|
|
/**
|
|
|
* Returns list of arguments to be passed while launching task VM.
|
|
|
- * See {@code buildTaskControllerExecutor(TaskCommands,
|
|
|
+ * See {@code buildTaskControllerExecutor(TaskControllerCommands,
|
|
|
* String, List<String>, JvmEnv)} documentation.
|
|
|
* @param context
|
|
|
* @return Argument to be used while launching Task VM
|
|
@@ -379,8 +428,8 @@ class LinuxTaskController extends TaskController {
|
|
|
args.add("--");
|
|
|
args.add(context.localizedBaseDir.toString());
|
|
|
args.add(context.uniqueString);
|
|
|
- runCommand(TaskCommands.INITIALIZE_DISTRIBUTEDCACHE_FILE, context.user,
|
|
|
- args, context.workDir, null);
|
|
|
+ runCommand(TaskControllerCommands.INITIALIZE_DISTRIBUTEDCACHE_FILE,
|
|
|
+ context.user, args, context.workDir, null);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -388,7 +437,7 @@ class LinuxTaskController extends TaskController {
|
|
|
throws IOException {
|
|
|
LOG.debug("Going to initialize user directories for " + context.user
|
|
|
+ " on the TT");
|
|
|
- runCommand(TaskCommands.INITIALIZE_USER, context.user,
|
|
|
+ runCommand(TaskControllerCommands.INITIALIZE_USER, context.user,
|
|
|
new ArrayList<String>(), context.workDir, null);
|
|
|
}
|
|
|
|
|
@@ -412,7 +461,7 @@ class LinuxTaskController extends TaskController {
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
private ShellCommandExecutor buildTaskControllerExecutor(
|
|
|
- TaskCommands command, String userName, List<String> cmdArgs,
|
|
|
+ TaskControllerCommands command, String userName, List<String> cmdArgs,
|
|
|
File workDir, Map<String, String> env)
|
|
|
throws IOException {
|
|
|
String[] taskControllerCmd = new String[3 + cmdArgs.size()];
|
|
@@ -500,14 +549,14 @@ class LinuxTaskController extends TaskController {
|
|
|
throws IOException {
|
|
|
LOG.debug("Going to initialize job " + context.jobid.toString()
|
|
|
+ " on the TT");
|
|
|
- runCommand(TaskCommands.INITIALIZE_JOB, context.user,
|
|
|
+ runCommand(TaskControllerCommands.INITIALIZE_JOB, context.user,
|
|
|
buildInitializeJobCommandArgs(context), context.workDir, null);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* API which builds the command line to be pass to LinuxTaskController
|
|
|
* binary to terminate/kill the task. See
|
|
|
- * {@code buildTaskControllerExecutor(TaskCommands,
|
|
|
+ * {@code buildTaskControllerExecutor(TaskControllerCommands,
|
|
|
* String, List<String>, JvmEnv)} documentation.
|
|
|
*
|
|
|
*
|
|
@@ -529,7 +578,7 @@ class LinuxTaskController extends TaskController {
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
private void finishTask(TaskControllerContext context,
|
|
|
- TaskCommands command) throws IOException{
|
|
|
+ TaskControllerCommands command) throws IOException{
|
|
|
if(context.task == null) {
|
|
|
LOG.info("Context task null not killing the JVM");
|
|
|
return;
|
|
@@ -549,7 +598,7 @@ class LinuxTaskController extends TaskController {
|
|
|
@Override
|
|
|
void terminateTask(TaskControllerContext context) {
|
|
|
try {
|
|
|
- finishTask(context, TaskCommands.TERMINATE_TASK_JVM);
|
|
|
+ finishTask(context, TaskControllerCommands.TERMINATE_TASK_JVM);
|
|
|
} catch (Exception e) {
|
|
|
LOG.warn("Exception thrown while sending kill to the Task VM " +
|
|
|
StringUtils.stringifyException(e));
|
|
@@ -559,7 +608,7 @@ class LinuxTaskController extends TaskController {
|
|
|
@Override
|
|
|
void killTask(TaskControllerContext context) {
|
|
|
try {
|
|
|
- finishTask(context, TaskCommands.KILL_TASK_JVM);
|
|
|
+ finishTask(context, TaskControllerCommands.KILL_TASK_JVM);
|
|
|
} catch (Exception e) {
|
|
|
LOG.warn("Exception thrown while sending destroy to the Task VM " +
|
|
|
StringUtils.stringifyException(e));
|