HADOOP-4490. Provide ability to run tasks as job owners. Contributed by Sreekanth Ramakrishnan.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@765713 13f79535-47bb-0310-9956-ffa450edef68
Hemanth Yamijala 16 years ago
parent
commit
64e4362359

+ 3 - 0
CHANGES.txt

@@ -237,6 +237,9 @@ Trunk (unreleased changes)
     without having to restart the daemon.
     (Sreekanth Ramakrishnan and Vinod Kumar Vavilapalli via yhemanth)
 
+    HADOOP-4490. Provide ability to run tasks as job owners.
+    (Sreekanth Ramakrishnan via yhemanth)
+
   OPTIMIZATIONS
 
     HADOOP-5595. NameNode does not need to run a replicator to choose a

+ 2 - 1
src/core/org/apache/hadoop/filecache/DistributedCache.java

@@ -428,7 +428,8 @@ public class DistributedCache {
      
      // do chmod here 
      try {
-    	FileUtil.chmod(parchive.toString(), "+x");
+        //Setting recursive permission to grant everyone read and execute
+        FileUtil.chmod(cacheStatus.baseDir.toString(), "ugo+rx",true);
      } catch(InterruptedException e) {
    	LOG.warn("Exception in chmod" + e.toString());
      }

+ 34 - 3
src/core/org/apache/hadoop/fs/FileUtil.java

@@ -27,6 +27,7 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+import org.mortbay.log.Log;
 
 /**
  * A collection of file-processing util methods
@@ -706,11 +707,41 @@ public class FileUtil {
   */
  public static int chmod(String filename, String perm
                          ) throws IOException, InterruptedException {
-    String cmd = "chmod " + perm + " " + filename;
-    Process p = Runtime.getRuntime().exec(cmd, null);
-    return p.waitFor();
+    return chmod(filename, perm, false);
  }
 
+  /**
+   * Change the permissions on a file / directory, recursively, if
+   * needed.
+   * @param filename name of the file whose permissions are to change
+   * @param perm permission string
+   * @param recursive true, if permissions should be changed recursively
+   * @return the exit code from the command.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public static int chmod(String filename, String perm, boolean recursive)
+                            throws IOException, InterruptedException {
+    StringBuffer cmdBuf = new StringBuffer();
+    cmdBuf.append("chmod ");
+    if (recursive) {
+      cmdBuf.append("-R ");
+    }
+    cmdBuf.append(perm).append(" ");
+    cmdBuf.append(filename);
+    String[] shellCmd = {"bash", "-c" ,cmdBuf.toString()};
+    ShellCommandExecutor shExec = new ShellCommandExecutor(shellCmd);
+    try {
+      shExec.execute();
+    }catch(Exception e) {
+      if(Log.isDebugEnabled()) {
+        Log.debug("Error while changing permission : " + filename 
+            +" Exception: " + StringUtils.stringifyException(e));
+      }
+    }
+    return shExec.getExitCode();
+  }
+  
  /**
   * Create a tmp file for a base file.
   * @param basefile the base file of the tmp

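The new overload shells out to chmod through bash and reports the command's exit code instead of throwing on failure (errors are only logged at debug level). A minimal usage sketch against the API added above; the paths are illustrative:

    import org.apache.hadoop.fs.FileUtil;

    public class ChmodDemo {
      public static void main(String[] args) throws Exception {
        // Existing two-argument form, now delegating to the new overload
        // with recursive == false.
        FileUtil.chmod("/tmp/demo/file.txt", "+x");

        // New recursive form: runs "chmod -R ugo+rx /tmp/demo" through
        // bash and returns the exit code rather than throwing.
        int exit = FileUtil.chmod("/tmp/demo", "ugo+rx", true);
        if (exit != 0) {
          System.err.println("chmod failed with exit code " + exit);
        }
      }
    }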
+ 116 - 0
src/docs/src/documentation/content/xdocs/cluster_setup.xml

@@ -474,6 +474,122 @@
            </ul>
          </section>
          
+          <section>
+            <title>Task Controllers</title>
+            <p>Task controllers are classes in the Hadoop Map/Reduce 
+            framework that define how users' map and reduce tasks 
+            are launched and controlled. They can 
+            be used in clusters that require some customization in 
+            the process of launching or controlling the user tasks.
+            For example, in some 
+            clusters, there may be a requirement to run tasks as 
+            the user who submitted the job, instead of as the task 
+            tracker user, which is how tasks are launched by default.
+            This section describes how to configure and use 
+            task controllers.</p>
+            <p>The following task controllers are available in
+            Hadoop.
+            </p>
+            <table>
+            <tr><th>Name</th><th>Class Name</th><th>Description</th></tr>
+            <tr>
+            <td>DefaultTaskController</td>
+            <td>org.apache.hadoop.mapred.DefaultTaskController</td>
+            <td> The default task controller which Hadoop uses to manage task
+            execution. The tasks run as the task tracker user.</td>
+            </tr>
+            <tr>
+            <td>LinuxTaskController</td>
+            <td>org.apache.hadoop.mapred.LinuxTaskController</td>
+            <td>This task controller, which is supported only on Linux, 
+            runs the tasks as the user who submitted the job. It requires
+            these user accounts to be created on the cluster nodes 
+            where the tasks are launched. It 
+            uses a setuid executable that is included in the Hadoop
+            distribution. The task tracker uses this executable to 
+            launch and kill tasks. The setuid executable switches to
+            the user who has submitted the job and launches or kills
+            the tasks. Currently, this task controller 
+            opens up permissions to local files and directories used 
+            by the tasks such as the job jar files, distributed archive 
+            files, intermediate files and task log files. In the future,
+            stricter file permissions are expected to be used.
+            </td>
+            </tr>
+            </table>
+            <section>
+            <title>Configuring Task Controllers</title>
+            <p>The task controller to be used can be configured by setting the
+            value of the following key in mapred-site.xml:</p>
+            <table>
+            <tr>
+            <th>Property</th><th>Value</th><th>Notes</th>
+            </tr>
+            <tr>
+            <td>mapred.task.tracker.task-controller</td>
+            <td>Fully qualified class name of the task controller class</td>
+            <td>Currently there are two implementations of task controller
+            in the Hadoop system, DefaultTaskController and LinuxTaskController.
+            Refer to the class names mentioned above to determine the value
+            to set for the class of choice.
+            </td>
+            </tr>
+            </table>
+            </section>
+            <section>
+            <title>Using the LinuxTaskController</title>
+            <p>This section of the document describes the steps required to
+            use the LinuxTaskController.</p>
+            
+            <p>In order to use the LinuxTaskController, a setuid executable
+            should be built and deployed on the compute nodes. The
+            executable is named task-controller. To build the executable, 
+            execute 
+            <em>ant task-controller -Dhadoop.conf.dir=/path/to/conf/dir</em>.
+            The path passed in <em>-Dhadoop.conf.dir</em> should be the path
+            on the cluster nodes where a configuration file for the setuid
+            executable would be located. The executable would be built to
+            <em>build.dir/dist.dir/bin</em> and should be installed to 
+            <em>$HADOOP_HOME/bin</em>.
+            </p>
+            
+            <p>
+            The executable must be deployed as a setuid executable, by changing
+            the ownership to <em>root</em> and giving it permissions <em>4755</em>. 
+            </p>
+            
+            <p>The executable requires a configuration file called 
+            <em>taskcontroller.cfg</em> to be
+            present in the configuration directory passed to the ant target 
+            mentioned above. If the binary was not built with a specific 
+            conf directory, the path defaults to <em>/path-to-binary/../conf</em>.
+            </p>
+            
+            <p>The executable requires the following configuration items to be 
+            present in the <em>taskcontroller.cfg</em> file. The items should
+            be mentioned as simple <em>key=value</em> pairs.
+            </p>
+            <table><tr><th>Name</th><th>Description</th></tr>
+            <tr>
+            <td>mapred.local.dir</td>
+            <td>Path to mapred local directories. Should be the same as the
+            value provided for this key in mapred-site.xml. This is required to
+            validate paths passed to the setuid executable in order to prevent
+            arbitrary paths being passed to it.</td>
+            </tr>
+            </table>
+
+            <p>
+            The LinuxTaskController requires that the paths leading up to
+            the directories specified in
+            <em>mapred.local.dir</em> and <em>hadoop.log.dir</em> have 755
+            permissions, and that the directories themselves have 777
+            permissions.
+            </p>
+            </section>
+            
+          </section>
+          
        </section>
        
        <section>

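To make the configuration steps documented above concrete: a cluster opting into job-owner execution would override the task controller in mapred-site.xml and ship a matching taskcontroller.cfg in the conf directory the setuid binary was built against. The values below are illustrative, not defaults:

    <!-- mapred-site.xml -->
    <property>
      <name>mapred.task.tracker.task-controller</name>
      <value>org.apache.hadoop.mapred.LinuxTaskController</value>
    </property>

    # taskcontroller.cfg (simple key=value pairs, as described above)
    mapred.local.dir=/grid/mapred/local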
+ 7 - 0
src/mapred/mapred-default.xml

@@ -970,4 +970,11 @@
  </description>
 </property>
 
+<property>
+  <name>mapred.task.tracker.task-controller</name>
+  <value>org.apache.hadoop.mapred.DefaultTaskController</value>
+  <description>TaskController which is used to launch and manage task execution.
+  </description>
+</property>
+
 </configuration>

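For reference, the TaskTracker change later in this commit resolves this property with the standard Configuration/ReflectionUtils machinery. Condensed into a standalone sketch (placed in the same package, since the controller classes are package-private):

    package org.apache.hadoop.mapred;

    import org.apache.hadoop.util.ReflectionUtils;

    class ControllerLookupSketch {
      static TaskController load(JobConf fConf) {
        // Falls back to DefaultTaskController when mapred-site.xml does
        // not override the property.
        Class<? extends TaskController> taskControllerClass =
            fConf.getClass("mapred.task.tracker.task-controller",
                           DefaultTaskController.class,
                           TaskController.class);
        TaskController controller = (TaskController)
            ReflectionUtils.newInstance(taskControllerClass, fConf);
        controller.setup();  // e.g. create job cache dirs, set permissions
        return controller;
      }
    }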
+ 127 - 0
src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java

@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JvmManager.JvmEnv;
+import org.apache.hadoop.util.ProcessTree;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * The default implementation for controlling tasks.
+ * 
+ * This class provides an implementation for launching and killing 
+ * tasks that need to be run as the tasktracker itself. Hence,
+ * many of the initializing or cleanup methods are not required here.
+ */
+class DefaultTaskController extends TaskController {
+
+  private static final Log LOG = 
+      LogFactory.getLog(DefaultTaskController.class);
+  /**
+   * Launch a new JVM for the task.
+   * 
+   * This method launches the new JVM for the task by executing
+   * the JVM command using the {@link Shell.ShellCommandExecutor}
+   */
+  void launchTaskJVM(TaskController.TaskControllerContext context) 
+                                      throws IOException {
+    JvmEnv env = context.env;
+    List<String> wrappedCommand = 
+      TaskLog.captureOutAndError(env.setup, env.vargs, env.stdout, env.stderr,
+          env.logSize, true, env.pidFile);
+    ShellCommandExecutor shexec = 
+        new ShellCommandExecutor(wrappedCommand.toArray(new String[0]), 
+                                  env.workDir, env.env);
+    // set the ShellCommandExecutor for later use.
+    context.shExec = shexec;
+    shexec.execute();
+  }
+  
+  /**
+   * Kills the JVM running the task stored in the context.
+   * 
+   * @param context the context storing the task running within the JVM
+   * that needs to be killed.
+   */
+  void killTaskJVM(TaskController.TaskControllerContext context) {
+    ShellCommandExecutor shexec = context.shExec;
+    JvmEnv env = context.env;
+    if (shexec != null) {
+      Process process = shexec.getProcess();
+      if (process != null) {
+        if (Shell.WINDOWS) {
+          process.destroy();
+        }
+        else {
+          Path pidFilePath = new Path(env.pidFile);
+          String pid = ProcessTree.getPidFromPidFile(
+                                                pidFilePath.toString());
+          if (pid != null) {
+            long sleeptimeBeforeSigkill = env.conf.getLong(
+                 "mapred.tasktracker.tasks.sleeptime-before-sigkill",
+                 ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
+
+            ProcessTree.destroy(pid, sleeptimeBeforeSigkill,
+                     ProcessTree.isSetsidAvailable, false);
+            try {
+              LOG.info("Process exited with exit code:" + process.waitFor());
+            } catch (InterruptedException ie) {}
+          }
+        }
+      }
+    }
+  }
+  
+  /**
+   * Initialize the task environment.
+   * 
+   * Since tasks are launched as the tasktracker user itself, this
+   * method has no action to perform.
+   */
+  void initializeTask(TaskController.TaskControllerContext context) {
+    // The default task controller does not need to set up
+    // any permissions for proper execution.
+    // So this is a dummy method.
+    return;
+  }
+  
+
+  @Override
+  void setup() {
+    // nothing to setup
+    return;
+  }
+
+  /*
+   * No need to do anything as we don't need anything extra from what
+   * TaskTracker has done.
+   */
+  @Override
+  void initializeJob(JobID jobId) {
+  }
+  
+}

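The kill path above hinges on the pid file that the wrapped command writes when the JVM starts. Condensed into a standalone sketch using only the calls shown above (the pid-file path is illustrative):

    import org.apache.hadoop.util.ProcessTree;

    public class KillSketch {
      public static void main(String[] args) {
        // Read the pid the launched JVM recorded in its pid file.
        String pid = ProcessTree.getPidFromPidFile("/tmp/demo-task.pid");
        if (pid != null) {
          // SIGTERM first, SIGKILL after the grace period; when setsid is
          // available the whole process group goes, not just the JVM.
          ProcessTree.destroy(pid,
              ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL,
              ProcessTree.isSetsidAvailable, false);
        }
      }
    }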
+ 44 - 34
src/mapred/org/apache/hadoop/mapred/JvmManager.java

@@ -32,6 +32,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.TaskController.TaskControllerContext;
 import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
@@ -54,9 +55,9 @@ class JvmManager {
   
   public JvmManager(TaskTracker tracker) {
     mapJvmManager = new JvmManagerForType(tracker.getMaxCurrentMapTasks(), 
-        true);
+        true, tracker);
     reduceJvmManager = new JvmManagerForType(tracker.getMaxCurrentReduceTasks(),
-        false);
+        false, tracker);
   }
   
   public void stop() {
@@ -124,11 +125,15 @@ class JvmManager {
     int maxJvms;
     boolean isMap;
     
+    TaskTracker tracker;
+    
     Random rand = new Random(System.currentTimeMillis());
 
-    public JvmManagerForType(int maxJvms, boolean isMap) {
+    public JvmManagerForType(int maxJvms, boolean isMap, 
+        TaskTracker tracker) {
       this.maxJvms = maxJvms;
       this.isMap = isMap;
+      this.tracker = tracker;
     }
 
     synchronized public void setRunningTaskForJvm(JVMId jvmId, 
@@ -140,7 +145,23 @@ class JvmManager {
     
     synchronized public TaskInProgress getTaskForJvm(JVMId jvmId) {
       if (jvmToRunningTask.containsKey(jvmId)) {
-        return jvmToRunningTask.get(jvmId).getTaskInProgress();
+        //Incase of JVM reuse, tasks are returned to previously launched
+        //JVM via this method. However when a new task is launched
+        //the task being returned has to be initialized.
+        TaskRunner taskRunner = jvmToRunningTask.get(jvmId);
+        JvmRunner jvmRunner = jvmIdToRunner.get(jvmId);
+        Task task = taskRunner.getTaskInProgress().getTask();
+        TaskControllerContext context = 
+          new TaskController.TaskControllerContext();
+        context.env = jvmRunner.env;
+        context.task = task;
+        //If we are returning the same task as which the JVM was launched
+        //we don't initialize task once again.
+        if(!jvmRunner.env.conf.get("mapred.task.id").
+            equals(task.getTaskID().toString())) {
+          tracker.getTaskController().initializeTask(context);
+        }
+        return taskRunner.getTaskInProgress();
       }
       return null;
     }
@@ -254,7 +275,7 @@ class JvmManager {
       }
       //*MUST* never reach this
       throw new RuntimeException("Inconsistent state!!! " +
-      		"JVM Manager reached an unstable state " +
+          "JVM Manager reached an unstable state " +
             "while reaping a JVM for task: " + t.getTask().getTaskID()+
             "while reaping a JVM for task: " + t.getTask().getTaskID()+
             " " + getDetails());
             " " + getDetails());
     }
     }
@@ -317,6 +338,9 @@ class JvmManager {
       JVMId jvmId;
       volatile boolean busy = true;
       private ShellCommandExecutor shexec; // shell terminal for running the task
+      //context used for starting JVM
+      private TaskControllerContext initalContext;
+      
       public JvmRunner(JvmEnv env, JobID jobId) {
         this.env = env;
         this.jvmId = new JVMId(jobId, isMap, rand.nextInt());
@@ -328,18 +352,19 @@ class JvmManager {
       }
 
       public void runChild(JvmEnv env) {
+        initalContext = new TaskControllerContext();
         try {
           env.vargs.add(Integer.toString(jvmId.getId()));
-          List<String> wrappedCommand = 
-            TaskLog.captureOutAndError(env.setup, env.vargs, env.stdout, env.stderr,
-                env.logSize, true, env.pidFile);
-          shexec = new ShellCommandExecutor(wrappedCommand.toArray(new String[0]), 
-              env.workDir, env.env);
-          shexec.execute();
+          //Launch the task controller to run task JVM
+          initalContext.task = jvmToRunningTask.get(jvmId).getTask();
+          initalContext.env = env;
+          tracker.getTaskController().initializeTask(initalContext);
+          tracker.getTaskController().launchTaskJVM(initalContext);
         } catch (IOException ioe) {
           // do nothing
           // error and output are appropriately redirected
         } finally { // handle the exit code
+          shexec = initalContext.shExec;
           if (shexec == null) {
             return;
           }
@@ -364,29 +389,14 @@ class JvmManager {
        * of processes) is created using setsid.
        */
       public void kill() {
-        if (shexec != null) {
-          Process process = shexec.getProcess();
-          if (process != null) {
-            if (Shell.WINDOWS) {
-              process.destroy();
-            }
-            else {
-              Path pidFilePath = new Path(env.pidFile);
-              String pid = ProcessTree.getPidFromPidFile(
-                                                    pidFilePath.toString());
-              if (pid != null) {
-                long sleeptimeBeforeSigkill = env.conf.getLong(
-                     "mapred.tasktracker.tasks.sleeptime-before-sigkill",
-                     ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
-
-                ProcessTree.destroy(pid, sleeptimeBeforeSigkill,
-                         ProcessTree.isSetsidAvailable, false);
-                try {
-                  LOG.info("Process exited with exit code:" + process.waitFor());
-                } catch (InterruptedException ie) {}
-              }
-            }
-          }
+        TaskController controller = tracker.getTaskController();
+        //Check initial context before issuing a kill to prevent situations
+        //where kill is issued before task is launched.
+        if(initalContext != null && initalContext.env != null) {
+          controller.killTaskJVM(initalContext);
+        } else {
+          LOG.info(String.format("JVM Not killed %s but just removed", 
+              jvmId.toString()));
         }
         removeJvm(jvmId);
       }

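The subtle piece of the getTaskForJvm() change above is the reuse test: a reused JVM keeps the JobConf it was launched with, so mapred.task.id identifies the original task, and initializeTask() is repeated only when a different task is handed to the JVM. As an isolated sketch (names mirror the diff; this is an illustration, not tracker code):

    package org.apache.hadoop.mapred;

    import org.apache.hadoop.mapred.TaskController.TaskControllerContext;

    class JvmReuseSketch {
      static void maybeInitialize(TaskController controller,
                                  JvmManager.JvmEnv env, Task task) {
        TaskControllerContext context = new TaskControllerContext();
        context.env = env;
        context.task = task;
        // The task id the JVM was launched with vs. the task now assigned.
        String launchedWith = env.conf.get("mapred.task.id");
        if (!launchedWith.equals(task.getTaskID().toString())) {
          controller.initializeTask(context);  // re-open permissions
        }
      }
    }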
+ 422 - 0
src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java

@@ -0,0 +1,422 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.hadoop.mapred;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.mapred.JvmManager.JvmEnv;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+
+/**
+ * A {@link TaskController} that runs the task JVMs as the user 
+ * who submits the job.
+ * 
+ * This class executes a setuid executable to implement methods
+ * of the {@link TaskController}, including launching the task 
+ * JVM and killing it when needed, and also initializing and
+ * finalizing the task environment. 
+ * <p> The setuid executable is launched using the command line:</p>
+ * <p>task-controller user-name command command-args, where</p>
+ * <p>user-name is the name of the owner who submits the job</p>
+ * <p>command is one of the ordinal values of the 
+ * {@link LinuxTaskController.TaskCommands} enumeration</p>
+ * <p>command-args depends on the command being launched.</p>
+ * 
+ * In addition to running and killing tasks, the class also 
+ * sets up appropriate access for the directories and files 
+ * that will be used by the tasks. 
+ */
+class LinuxTaskController extends TaskController {
+
+  private static final Log LOG = 
+            LogFactory.getLog(LinuxTaskController.class);
+
+  // Name of the executable script that will contain the child
+  // JVM command line. See writeCommand for details.
+  private static final String COMMAND_FILE = "taskjvm.sh";
+  
+  // Path to the setuid executable.
+  private static String taskControllerExe;
+  
+  static {
+    // the task-controller is expected to be under the $HADOOP_HOME/bin
+    // directory.
+    File hadoopBin = new File(System.getenv("HADOOP_HOME"), "bin");
+    taskControllerExe = 
+        new File(hadoopBin, "task-controller").getAbsolutePath();
+  }
+  
+  // The list of directory paths specified in the
+  // variable mapred.local.dir. This is used to determine
+  // which among the list of directories is picked up
+  // for storing data for a particular task.
+  private String[] mapredLocalDirs;
+  
+  // permissions to set on files and directories created.
+  // When localized files are handled securely, this string
+  // will change to something more restrictive. Until then,
+  // it opens up the permissions for all, so that the tasktracker
+  // and job owners can access files together.
+  private static final String FILE_PERMISSIONS = "ugo+rwx";
+  
+  // permissions to set on components of the path leading to
+  // localized files and directories. Read and execute permissions
+  // are required for different users to be able to access the
+  // files.
+  private static final String PATH_PERMISSIONS = "go+rx";
+  
+  public LinuxTaskController() {
+    super();
+  }
+  
+  @Override
+  public void setConf(Configuration conf) {
+    super.setConf(conf);
+    mapredLocalDirs = conf.getStrings("mapred.local.dir");
+    //Setting of the permissions of the local directory is done in 
+    //setup()
+  }
+  
+  /**
+   * List of commands that the setuid script will execute.
+   */
+  enum TaskCommands {
+    LAUNCH_TASK_JVM,
+    KILL_TASK_JVM
+  }
+  
+  /**
+   * Launch a task JVM that will run as the owner of the job.
+   * 
+   * This method launches a task JVM by executing a setuid
+   * executable that will switch to the user and run the
+   * task.
+   */
+  @Override
+  void launchTaskJVM(TaskController.TaskControllerContext context) 
+                                        throws IOException {
+    JvmEnv env = context.env;
+    // get the JVM command line.
+    String cmdLine = 
+      TaskLog.buildCommandLine(env.setup, env.vargs, env.stdout, env.stderr,
+          env.logSize, true, env.pidFile);
+
+    // write the command to a file in the
+    // task specific cache directory
+    writeCommand(cmdLine, getTaskCacheDirectory(context));
+    
+    // Call the taskcontroller with the right parameters.
+    List<String> launchTaskJVMArgs = buildTaskCommandArgs(context);
+    ShellCommandExecutor shExec =  buildTaskControllerExecutor(
+                                    TaskCommands.LAUNCH_TASK_JVM, 
+                                    env.conf.getUser(),
+                                    launchTaskJVMArgs, env);
+    context.shExec = shExec;
+    shExec.execute();
+    LOG.debug("output after executing task jvm = " + shExec.getOutput());
+  }
+
+  // convenience API for building command arguments for specific commands
+  private List<String> buildTaskCommandArgs(TaskControllerContext context) {
+    List<String> commandArgs = new ArrayList<String>(3);
+    String taskId = context.task.getTaskID().toString();
+    String jobId = getJobId(context);
+    commandArgs.add(jobId);
+    if(!context.task.isTaskCleanupTask()) {
+      commandArgs.add(taskId);
+    }else {
+      commandArgs.add(taskId + TaskTracker.TASK_CLEANUP_SUFFIX);
+    }
+    
+    LOG.debug("getting the task directory as: " 
+                + getTaskCacheDirectory(context));
+    commandArgs.add(getDirectoryChosenForTask(
+                              new File(getTaskCacheDirectory(context)), 
+                              context));
+    return commandArgs;
+  }
+  
+  // get the Job ID from the information in the TaskControllerContext
+  private String getJobId(TaskControllerContext context) {
+    String taskId = context.task.getTaskID().toString();
+    TaskAttemptID tId = TaskAttemptID.forName(taskId);
+    String jobId = tId.getJobID().toString();
+    return jobId;
+  }
+
+  // Get the directory from the list of directories configured
+  // in mapred.local.dir chosen for storing data pertaining to
+  // this task.
+  private String getDirectoryChosenForTask(File directory,
+                                            TaskControllerContext context) {
+    String jobId = getJobId(context);
+    String taskId = context.task.getTaskID().toString();
+    for (String dir : mapredLocalDirs) {
+      File mapredDir = new File(dir);
+      File taskDir = new File(mapredDir, TaskTracker.getLocalTaskDir(
+          jobId, taskId, context.task.isTaskCleanupTask()));
+      if (directory.equals(taskDir)) {
+        return dir;
+      }
+    }
+    
+    LOG.error("Couldn't parse task cache directory correctly");
+    throw new IllegalArgumentException("invalid task cache directory "
+                + directory.getAbsolutePath());
+  }
+  
+  /**
+   * Kill a launched task JVM running as the user of the job.
+   * 
+   * This method will launch the task controller setuid executable
+   * that in turn will kill the task JVM by sending a kill signal.
+   */
+  void killTaskJVM(TaskControllerContext context) {
+   
+    if(context.task == null) {
+      LOG.info("Context task null not killing the JVM");
+      return;
+    }
+    
+    JvmEnv env = context.env;
+    List<String> killTaskJVMArgs = buildTaskCommandArgs(context);
+    try {
+      ShellCommandExecutor shExec = buildTaskControllerExecutor(
+                                      TaskCommands.KILL_TASK_JVM,
+                                      context.env.conf.getUser(),
+                                      killTaskJVMArgs, 
+                                      context.env);
+      shExec.execute();
+      LOG.debug("Command output :" +shExec.getOutput());
+    } catch (IOException ioe) {
+      LOG.warn("IOException in killing task: " + ioe.getMessage());
+    }
+  }
+
+  /**
+   * Setup appropriate permissions for directories and files that
+   * are used by the task.
+   * 
+   * As the LinuxTaskController launches tasks as a user, different
+   * from the daemon, all directories and files that are potentially 
+   * used by the tasks are setup with appropriate permissions that
+   * will allow access.
+   * 
+   * Until secure data handling is implemented (see HADOOP-4491 and
+   * HADOOP-4493, for e.g.), the permissions are set up to allow
+   * read, write and execute access for everyone. This will be 
+   * changed to restricted access as data is handled securely.
+   */
+  void initializeTask(TaskControllerContext context) {
+    // Setup permissions for the job and task cache directories.
+    setupTaskCacheFileAccess(context);
+    // setup permissions for task log directory
+    setupTaskLogFileAccess(context);    
+  }
+  
+  // Allows access for the task to create log files under 
+  // the task log directory
+  private void setupTaskLogFileAccess(TaskControllerContext context) {
+    TaskAttemptID taskId = context.task.getTaskID();
+    File f = TaskLog.getTaskLogFile(taskId, TaskLog.LogName.SYSLOG);
+    String taskAttemptLogDir = f.getParentFile().getAbsolutePath();
+    changeDirectoryPermissions(taskAttemptLogDir, FILE_PERMISSIONS, false);
+  }
+
+  // Allows access for the task to read, write and execute 
+  // the files under the job and task cache directories
+  private void setupTaskCacheFileAccess(TaskControllerContext context) {
+    String taskId = context.task.getTaskID().toString();
+    JobID jobId = JobID.forName(getJobId(context));
+    //Change permission for the task across all the disks
+    for(String localDir : mapredLocalDirs) {
+      File f = new File(localDir);
+      File taskCacheDir = new File(f,TaskTracker.getLocalTaskDir(
+          jobId.toString(), taskId, context.task.isTaskCleanupTask()));
+      if(taskCacheDir.exists()) {
+        changeDirectoryPermissions(taskCacheDir.getPath(), 
+            FILE_PERMISSIONS, true);
+      }          
+    }//end of local directory Iteration 
+  }
+
+  // convenience method to execute chmod.
+  private void changeDirectoryPermissions(String dir, String mode, 
+                                              boolean isRecursive) {
+    int ret = 0;
+    try {
+      ret = FileUtil.chmod(dir, mode, isRecursive);
+    } catch (Exception e) {
+      LOG.warn("Exception in changing permissions for directory " + dir + 
+                  ". Exception: " + e.getMessage());
+    }
+    if (ret != 0) {
+      LOG.warn("Could not change permissions for directory " + dir);
+    }
+  }
+  
+  // convenience API to create the executor for launching the
+  // setuid script.
+  private ShellCommandExecutor buildTaskControllerExecutor(TaskCommands command, 
+                                          String userName, 
+                                          List<String> cmdArgs, JvmEnv env) 
+                                    throws IOException {
+    String[] taskControllerCmd = new String[3 + cmdArgs.size()];
+    taskControllerCmd[0] = taskControllerExe;
+    taskControllerCmd[1] = userName;
+    taskControllerCmd[2] = String.valueOf(command.ordinal());
+    int i = 3;
+    for (String cmdArg : cmdArgs) {
+      taskControllerCmd[i++] = cmdArg;
+    }
+    if (LOG.isDebugEnabled()) {
+      for (String cmd : taskControllerCmd) {
+        LOG.debug("taskctrl command = " + cmd);
+      }
+    }
+    ShellCommandExecutor shExec = null;
+    if(env.workDir != null && env.workDir.exists()) {
+      shExec = new ShellCommandExecutor(taskControllerCmd,
+          env.workDir, env.env);
+    } else {
+      shExec = new ShellCommandExecutor(taskControllerCmd);
+    }
+    
+    return shExec;
+  }
+  
+  // Return the task specific directory under the cache.
+  private String getTaskCacheDirectory(TaskControllerContext context) {
+    // In the case of JVM reuse, the task specific directory
+    // is different from what is set with respect with
+    // env.workDir. Hence building this from the taskId every time.
+    String taskId = context.task.getTaskID().toString();
+    File cacheDirForJob = context.env.workDir.getParentFile().getParentFile();
+    if(context.task.isTaskCleanupTask()) {
+      taskId = taskId + TaskTracker.TASK_CLEANUP_SUFFIX;
+    }
+    return new File(cacheDirForJob, taskId).getAbsolutePath(); 
+  }
+  
+  // Write the JVM command line to a file under the specified directory
+  // Note that the JVM will be launched using a setuid executable, and
+  // could potentially contain strings defined by a user. Hence, to
+  // prevent special character attacks, we write the command line to
+  // a file and execute it.
+  private void writeCommand(String cmdLine, 
+                                      String directory) throws IOException {
+    
+    PrintWriter pw = null;
+    String commandFile = directory + File.separator + COMMAND_FILE;
+    LOG.info("Writing commands to " + commandFile);
+    try {
+      FileWriter fw = new FileWriter(commandFile);
+      BufferedWriter bw = new BufferedWriter(fw);
+      pw = new PrintWriter(bw);
+      pw.write(cmdLine);
+    } catch (IOException ioe) {
+      LOG.error("Caught IOException while writing JVM command line to file. "
+                + ioe.getMessage());
+    } finally {
+      if (pw != null) {
+        pw.close();
+      }
+      // set execute permissions for all on the file.
+      File f = new File(commandFile);
+      if (f.exists()) {
+        f.setReadable(true, false);
+        f.setExecutable(true, false);
+      }
+    }
+  }
+  
+
+  /**
+   * Sets up the permissions of the following directories:
+   * 
+   * Job cache directory
+   * Archive directory
+   * Hadoop log directories
+   * 
+   */
+  @Override
+  void setup() {
+    //set up job cache directory and associated permissions
+    String localDirs[] = this.mapredLocalDirs;
+    for(String localDir : localDirs) {
+      //Cache root
+      File cacheDirectory = new File(localDir,TaskTracker.getCacheSubdir());
+      File jobCacheDirectory = new File(localDir,TaskTracker.getJobCacheSubdir());
+      if(!cacheDirectory.exists()) {
+        if(!cacheDirectory.mkdirs()) {
+          LOG.warn("Unable to create cache directory : " + 
+              cacheDirectory.getPath());
+        }
+      }
+      if(!jobCacheDirectory.exists()) {
+        if(!jobCacheDirectory.mkdirs()) {
+          LOG.warn("Unable to create job cache directory : " + 
+              jobCacheDirectory.getPath());
+        }
+      }
+      //Give world writable permission for every directory under
+      //mapred-local-dir.
+      //Child tries to write files under it when executing.
+      changeDirectoryPermissions(localDir, FILE_PERMISSIONS, true);
+    }//end of local directory manipulations
+    //setting up perms for user logs
+    File taskLog = TaskLog.getUserLogDir();
+    changeDirectoryPermissions(taskLog.getPath(), FILE_PERMISSIONS,false);
+  }
+
+  /*
+   * Create Job directories across disks and set their permissions to 777
+   * This way when tasks are run we just need to setup permissions for
+   * task folder.
+   */
+  @Override
+  void initializeJob(JobID jobid) {
+    for(String localDir : this.mapredLocalDirs) {
+      File jobDirectory = new File(localDir, 
+          TaskTracker.getLocalJobDir(jobid.toString()));
+      if(!jobDirectory.exists()) {
+        if(!jobDirectory.mkdir()) {
+          LOG.warn("Unable to create job cache directory : " 
+              + jobDirectory.getPath());
+          continue;
+        }
+      }
+      //Should be recursive because the jar and work folders might be 
+      //present under the job cache directory
+      changeDirectoryPermissions(
+          jobDirectory.getPath(), FILE_PERMISSIONS, true);
+    }
+  }
+  
+}
+

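Putting the pieces above together: commands are passed as enum ordinals, so LAUNCH_TASK_JVM is 0 and KILL_TASK_JVM is 1, and a launch invocation assembled by buildTaskControllerExecutor() comes out roughly like the following (user name, job/task ids and the chosen mapred.local.dir entry are illustrative):

    $HADOOP_HOME/bin/task-controller alice 0 \
        job_200904200000_0001 attempt_200904200000_0001_m_000000_0 \
        /grid/mapred/local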
+ 111 - 0
src/mapred/org/apache/hadoop/mapred/TaskController.java

@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JvmManager.JvmEnv;
+import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+
+/**
+ * Controls initialization, finalization and clean up of tasks, and
+ * also the launching and killing of task JVMs.
+ * 
+ * This class defines the API for initializing, finalizing and cleaning
+ * up tasks, as well as launching and killing task JVMs.
+ * Subclasses of this class will implement the logic required for
+ * performing the actual actions. 
+ */
+abstract class TaskController implements Configurable {
+  
+  private Configuration conf;
+  
+  public Configuration getConf() {
+    return conf;
+  }
+  
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+  
+  /**
+   * Setup task controller component.
+   * 
+   */
+  abstract void setup();
+  
+  
+  /**
+   * Launch a task JVM
+   * 
+   * This method defines how a JVM will be launched to run a task.
+   * @param context the context associated to the task
+   */
+  abstract void launchTaskJVM(TaskControllerContext context)
+                                      throws IOException;
+  
+  /**
+   * Kill a task JVM
+   * 
+   * This method defines how a JVM launched to execute one or more
+   * tasks will be killed.
+   * @param context
+   */
+  abstract void killTaskJVM(TaskControllerContext context);
+  
+  /**
+   * Perform initializing actions required before a task can run.
+   * 
+   * For instance, this method can be used to setup appropriate
+   * access permissions for files and directories that will be
+   * used by tasks. Tasks use the job cache, log, PID and distributed cache
+   * directories and files as part of their functioning. Typically,
+   * these files are shared between the daemon and the tasks
+   * themselves. So, a TaskController that is launching tasks
+   * as different users can implement this method to setup
+   * appropriate ownership and permissions for these directories
+   * and files.
+   */
+  abstract void initializeTask(TaskControllerContext context);
+  
+  
+  /**
+   * Contains task information required for the task controller.  
+   */
+  static class TaskControllerContext {
+    // task being executed
+    Task task; 
+    // the JVM environment for the task
+    JvmEnv env;
+    // the Shell executor executing the JVM for this task
+    ShellCommandExecutor shExec; 
+  }
+
+  /**
+   * Method which is called after the job is localized so that task controllers
+   * can implement their own job localization logic.
+   * 
+   * @param jobId ID of the job that has been localized.
+   */
+  abstract void initializeJob(JobID jobId);
+}

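To make the contract concrete, a minimal do-nothing implementation of this abstract API would look like the sketch below. The class is hypothetical and only illustrates the extension points; DefaultTaskController and LinuxTaskController in this commit are the real implementations:

    package org.apache.hadoop.mapred;

    import java.io.IOException;

    // Hypothetical no-op controller, for illustration only.
    class NoopTaskController extends TaskController {
      @Override
      void setup() { /* no daemon-side preparation */ }

      @Override
      void launchTaskJVM(TaskControllerContext context) throws IOException {
        // A real implementation must start the JVM described by
        // context.env and store its executor in context.shExec.
      }

      @Override
      void killTaskJVM(TaskControllerContext context) { /* nothing to kill */ }

      @Override
      void initializeTask(TaskControllerContext context) { /* no per-task prep */ }

      @Override
      void initializeJob(JobID jobId) { /* no per-job setup */ }
    }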
+ 38 - 5
src/mapred/org/apache/hadoop/mapred/TaskLog.java

@@ -476,11 +476,36 @@ public class TaskLog {
                                                 boolean useSetsid,
                                                 String pidFileName
                                                ) throws IOException {
-    String stdout = FileUtil.makeShellPath(stdoutFilename);
-    String stderr = FileUtil.makeShellPath(stderrFilename);
     List<String> result = new ArrayList<String>(3);
     result.add(bashCommand);
     result.add("-c");
+    String mergedCmd = buildCommandLine(setup, cmd, stdoutFilename,
+                                                    stderrFilename, tailLength, 
+                                                    useSetsid, pidFileName);
+    result.add(mergedCmd);
+    return result;
+  }
+  
+  /**
+   * Construct the command line for running the task JVM
+   * @param setup The setup commands for the execed process.
+   * @param cmd The command and the arguments that should be run
+   * @param stdoutFilename The filename that stdout should be saved to
+   * @param stderrFilename The filename that stderr should be saved to
+   * @param tailLength The length of the tail to be saved.
+   * @param pidFileName The name of the pid-file
+   * @return the command line as a String
+   * @throws IOException
+   */
+  static String buildCommandLine(List<String> setup, List<String> cmd, 
+                                      File stdoutFilename,
+                                      File stderrFilename,
+                                      long tailLength, 
+                                      boolean useSetsid, String pidFileName)
+                                throws IOException {
+    
+    String stdout = FileUtil.makeShellPath(stdoutFilename);
+    String stderr = FileUtil.makeShellPath(stderrFilename);    
     StringBuffer mergedCmd = new StringBuffer();
     
     // Spit out the pid to pidFileName
@@ -524,10 +549,9 @@ public class TaskLog {
       mergedCmd.append(" 2>> ");
       mergedCmd.append(" 2>> ");
       mergedCmd.append(stderr);
       mergedCmd.append(stderr);
     }
     }
-    result.add(mergedCmd.toString());
-    return result;
+    return mergedCmd.toString();
   }
-
+  
   /**
    * Add quotes to each of the command strings and
    * return as a single string 
@@ -594,4 +618,13 @@ public class TaskLog {
     return result;
   }
   
+  /**
+   * Method to return the location of user log directory.
+   * 
+   * @return base log directory
+   */
+  static File getUserLogDir() {
+    return LOG_DIR;
+  }
+  
 } // TaskLog

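The refactoring above lets LinuxTaskController fetch the merged command as a plain string (to write into taskjvm.sh) while existing callers keep the wrapped bash form. Contrasting the two entry points in a sketch (arguments are illustrative):

    package org.apache.hadoop.mapred;

    import java.io.File;
    import java.io.IOException;
    import java.util.Arrays;
    import java.util.List;

    class CommandLineSketch {
      static void demo() throws IOException {
        List<String> setup = Arrays.asList("export DEMO=1");
        List<String> vargs = Arrays.asList("java", "-Xmx200m", "Child");
        File stdout = new File("/tmp/demo/stdout");
        File stderr = new File("/tmp/demo/stderr");

        // New: just the merged command string, e.g. for taskjvm.sh.
        String cmd = TaskLog.buildCommandLine(setup, vargs, stdout, stderr,
            0L, true, "/tmp/demo/pid");

        // Existing: the same string wrapped as {"bash", "-c", cmd}.
        List<String> wrapped = TaskLog.captureOutAndError(setup, vargs,
            stdout, stderr, 0L, true, "/tmp/demo/pid");
        System.out.println(cmd.equals(wrapped.get(2)));  // true
      }
    }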
+ 26 - 3
src/mapred/org/apache/hadoop/mapred/TaskTracker.java

@@ -97,6 +97,7 @@ import org.apache.hadoop.util.Shell.ShellCommandExecutor;
  *******************************************************/
 public class TaskTracker 
              implements MRConstants, TaskUmbilicalProtocol, Runnable {
+  
   static final long WAIT_FOR_DONE = 3 * 1000;
   private int httpPort;
 
@@ -133,6 +134,8 @@ public class TaskTracker
     
   // last heartbeat response recieved
   short heartbeatResponseId = -1;
+  
+  static final String TASK_CLEANUP_SUFFIX = ".cleanup";
 
   /*
    * This is the last 'status' report sent by this tracker to the JobTracker.
@@ -263,7 +266,12 @@ public class TaskTracker
   private int probe_sample_size = 500;
 
   private IndexCache indexCache;
-    
+
+  /**
+  * Handle to the specific instance of the {@link TaskController} class
+  */
+  private TaskController taskController;
+  
   /*
    * A list of commitTaskActions for whom commit response has been received 
    */
@@ -371,7 +379,11 @@ public class TaskTracker
           }
         }
       }, "taskCleanup");
-    
+
+  TaskController getTaskController() {
+    return taskController;
+  }
+  
   private RunningJob addTaskToJob(JobID jobId, 
                                   TaskInProgress tip) {
     synchronized (runningJobs) {
@@ -431,7 +443,7 @@ public class TaskTracker
                                 boolean isCleanupAttempt) {
 	String taskDir = getLocalJobDir(jobid) + Path.SEPARATOR + taskid;
 	if (isCleanupAttempt) { 
-      taskDir = taskDir + ".cleanup";
+      taskDir = taskDir + TASK_CLEANUP_SUFFIX;
 	}
 	}
 	return taskDir;
 	return taskDir;
   }
   }
@@ -590,6 +602,15 @@ public class TaskTracker
     reduceLauncher = new TaskLauncher(maxCurrentReduceTasks);
     mapLauncher.start();
     reduceLauncher.start();
+    Class<? extends TaskController> taskControllerClass 
+                          = fConf.getClass("mapred.task.tracker.task-controller",
+                                            DefaultTaskController.class, 
+                                            TaskController.class); 
+    taskController = (TaskController)ReflectionUtils.newInstance(
+                                                      taskControllerClass, fConf);
+    
+    //setup and create jobcache directory with appropriate permissions
+    taskController.setup();
   }
   
   public static Class<? extends TaskTrackerInstrumentation> getInstrumentationClass(Configuration conf) {
@@ -874,6 +895,7 @@ public class TaskTracker
                              localJobConf.getKeepFailedTaskFiles());
         rjob.localized = true;
         rjob.jobConf = localJobConf;
+        taskController.initializeJob(jobId);
       }
     }
     launchTaskForJob(tip, new JobConf(rjob.jobConf)); 
@@ -1446,6 +1468,7 @@ public class TaskTracker
     synchronized(runningJobs) {
       runningJobs.remove(jobId);
     }
+    
   }