
HADOOP-2721. Uses setsid when creating new tasks so that all subprocesses of a task run in a new session, with the task process as the session and process-group leader for all of them. Killing that leader, the main Java task in Hadoop's case, kills the entire subtree of processes. Contributed by Ravi Gummadi.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@739302 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das 16 years ago
parent commit 4a6d44b79b
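
To see the mechanism this change relies on outside of Hadoop: a command started under setsid leads a fresh session and process group, so its pid doubles as the process-group id of every descendant, and signalling the negative pid kills the whole subtree. A minimal, hypothetical Java sketch (Linux only; the pid-file location is made up, and Hadoop itself reads task pid-files rather than Process handles):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Paths;

    public class SetsidDemo {
      public static void main(String[] args) throws IOException, InterruptedException {
        String pidFile = "/tmp/leader.pid"; // hypothetical location
        // bash records its own pid ($$) before forking workers; under setsid
        // that pid is also the process-group id of all its descendants.
        new ProcessBuilder("setsid", "bash", "-c",
            "echo $$ > " + pidFile + "; sleep 60 & sleep 60 & wait").start();
        Thread.sleep(500); // crude wait for the pid-file to appear

        String pgid = new String(Files.readAllBytes(Paths.get(pidFile))).trim();
        // "kill -- -PGID" signals the entire group, which is what the new
        // ProcessTree.destroyProcessGroup() below does via a shell command.
        new ProcessBuilder("kill", "--", "-" + pgid).start().waitFor();
      }
    }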

+ 6 - 0
CHANGES.txt

@@ -50,6 +50,12 @@ Trunk (unreleased changes)
     HADOOP-5088. Include releaseaudit target as part of developer test-patch
     target.  (Giridharan Kesavan via nigel)
 
+    HADOOP-2721. Uses setsid when creating new tasks so that all subprocesses
+    of a task run in a new session, with the task process as the session and
+    process-group leader for all of them. Killing that leader, the main Java
+    task in Hadoop's case, kills the entire subtree of processes.
+    (Ravi Gummadi via ddas)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 285 - 0
src/core/org/apache/hadoop/util/ProcessTree.java

@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import java.io.BufferedReader;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.util.Shell.ExitCodeException;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+
+/** 
+ * Process tree related operations
+ */
+public class ProcessTree {
+
+  private static final Log LOG = LogFactory.getLog(ProcessTree.class);
+
+  public static final long DEFAULT_SLEEPTIME_BEFORE_SIGKILL = 5000L;
+
+  public static final boolean isSetsidAvailable = isSetsidSupported();
+  private static boolean isSetsidSupported() {
+    ShellCommandExecutor shexec = null;
+    boolean setsidSupported = true;
+    try {
+      String[] args = {"setsid", "bash", "-c", "echo $$"};
+      shexec = new ShellCommandExecutor(args);
+      shexec.execute();
+    } catch (IOException ioe) {
+      LOG.warn("setsid is not available on this machine. So not using it.");
+      setsidSupported = false;
+    } finally { // handle the exit code
+      LOG.info("setsid exited with exit code " + shexec.getExitCode());
+      return setsidSupported;
+    }
+  }
+
+  /**
+   * Kills the process (or process group) by sending SIGKILL
+   * in the current thread.
+   * @param pid Process id (or process group id) of the process to be killed
+   * @param isProcessGroup Whether pid is a process group id of the processes to be killed
+   */
+  private static void sigKillInCurrentThread(String pid, boolean isProcessGroup) {
+    // Kill the process tree with SIGKILL if it is still alive
+    if (ProcessTree.isAlive(pid)) {
+      ShellCommandExecutor shexec = null;
+
+      try {
+        String pid_pgrpid;
+        if(isProcessGroup) {//kill the whole process group
+          pid_pgrpid = "-" + pid;
+        }
+        else {//kill single process
+          pid_pgrpid = pid;
+        }
+        
+        String[] args = { "kill", "-9", pid_pgrpid };
+        shexec = new ShellCommandExecutor(args);
+        shexec.execute();
+      } catch (IOException ioe) {
+        LOG.warn("Error executing shell command " + ioe);
+      } finally {
+        if(isProcessGroup) {
+          LOG.info("Killing process group" + pid + " with SIGKILL. Exit code "
+            + shexec.getExitCode());
+        }
+        else {
+          LOG.info("Killing process " + pid + " with SIGKILL. Exit code "
+                    + shexec.getExitCode());
+        }
+      }
+    }
+  }
+
+  /** Kills the process (or process group) by sending SIGKILL.
+   * @param pid Process id (or process group id) of the process to be killed
+   * @param isProcessGroup Whether pid is a process group id of the processes to be killed
+   * @param sleeptimeBeforeSigkill The time to wait before sending SIGKILL
+   *                               after sending SIGTERM
+   * @param inBackground Whether the process is to be killed in the background
+   *                     by a separate thread
+   */
+  private static void sigKill(String pid, boolean isProcessGroup,
+                        long sleeptimeBeforeSigkill, boolean inBackground) {
+
+    if(inBackground) { // use a separate thread for killing
+      SigKillThread sigKillThread = new SigKillThread(pid, isProcessGroup,
+                                                      sleeptimeBeforeSigkill);
+      sigKillThread.setDaemon(true);
+      sigKillThread.start();
+    }
+    else {
+      sigKillInCurrentThread(pid, isProcessGroup);
+    }
+  }
+
+  /** Destroy the process.
+   * @param pid Process id of the process to be killed
+   * @param sleeptimeBeforeSigkill The time to wait before sending SIGKILL
+   *                               after sending SIGTERM
+   * @param inBackground Whether the process is to be killed in the background
+   *                     by a separate thread
+   */
+  protected static void destroyProcess(String pid, long sleeptimeBeforeSigkill,
+                                    boolean inBackground) {
+    ShellCommandExecutor shexec = null;
+    try {
+      String[] args = { "kill", pid };
+      shexec = new ShellCommandExecutor(args);
+      shexec.execute();
+    } catch (IOException ioe) {
+      LOG.warn("Error executing shell command " + ioe);
+    } finally {
+      LOG.info("Killing process " + pid +
+               " with SIGTERM. Exit code " + shexec.getExitCode());
+    }
+    
+    sigKill(pid, false, sleeptimeBeforeSigkill, inBackground);
+  }
+  
+  /** Destroy the process group.
+   * @param pgrpId Process group id of the processes to be killed
+   * @param sleeptimeBeforeSigkill The time to wait before sending SIGKILL
+   *                               after sending SIGTERM
+   * @param inBackground Whether the process group is to be killed in the
+   *                     background by a separate thread
+   */
+  protected static void destroyProcessGroup(String pgrpId,
+                       long sleeptimeBeforeSigkill, boolean inBackground) {
+    ShellCommandExecutor shexec = null;
+    try {
+      String[] args = { "kill", "--", "-" + pgrpId };
+      shexec = new ShellCommandExecutor(args);
+      shexec.execute();
+    } catch (IOException ioe) {
+      LOG.warn("Error executing shell command " + ioe);
+    } finally {
+      LOG.info("Killing all processes in the process group " + pgrpId +
+               " with SIGTERM. Exit code " + shexec.getExitCode());
+    }
+    
+    sigKill(pgrpId, true, sleeptimeBeforeSigkill, inBackground);
+  }
+
+  /**
+   * Destroy the process-tree.
+   * @param pid process id of the root process of the subtree of processes
+   *            to be killed
+   * @param sleeptimeBeforeSigkill The time to wait before sending SIGKILL
+   *                               after sending SIGTERM
+   * @param isProcessGroup Whether pid is a process group leader
+   * @param inBackground Whether the processes are to be killed in the
+   *                     background by a separate thread
+   */
+  public static void destroy(String pid, long sleeptimeBeforeSigkill,
+                             boolean isProcessGroup, boolean inBackground) {
+    if(isProcessGroup) {
+      destroyProcessGroup(pid, sleeptimeBeforeSigkill, inBackground);
+    }
+    else {
+      //TODO: Destroy all the processes in the subtree in this case also.
+      // For the time being, killing only the root process.
+      destroyProcess(pid, sleeptimeBeforeSigkill, inBackground);
+    }
+  }
+
+  /**
+   * Get PID from a pid-file.
+   *
+   * @param pidFileName
+   *          Name of the pid-file.
+   * @return the PID string read from the pid-file. Returns null if the
+   *         pidFileName points to a non-existing file or if read fails from
+   *         the file.
+   */
+  public static String getPidFromPidFile(String pidFileName) {
+    BufferedReader pidFile = null;
+    FileReader fReader = null;
+    String pid = null;
+
+    try {
+      fReader = new FileReader(pidFileName);
+      pidFile = new BufferedReader(fReader);
+    } catch (FileNotFoundException f) {
+      LOG.debug("PidFile doesn't exist : " + pidFileName);
+      return pid;
+    }
+
+    try {
+      pid = pidFile.readLine();
+    } catch (IOException i) {
+      LOG.error("Failed to read from " + pidFileName);
+    } finally {
+      try {
+        if (fReader != null) {
+          fReader.close();
+        }
+        try {
+          if (pidFile != null) {
+            pidFile.close();
+          }
+        } catch (IOException i) {
+          LOG.warn("Error closing the stream " + pidFile);
+        }
+      } catch (IOException i) {
+        LOG.warn("Error closing the stream " + fReader);
+      }
+    }
+    return pid;
+  }
+
+  /**
+   * Is the process with PID pid still alive?
+   * This method assumes that isAlive is called on a pid that was alive not
+   * too long ago, and hence assumes no chance of pid-wrapping-around.
+   */
+  public static boolean isAlive(String pid) {
+    ShellCommandExecutor shexec = null;
+    try {
+      String[] args = { "kill", "-0", pid };
+      shexec = new ShellCommandExecutor(args);
+      shexec.execute();
+    } catch (ExitCodeException ee) {
+      return false;
+    } catch (IOException ioe) {
+      LOG.warn("Error executing shell command "
+          + Arrays.toString(shexec.getExecString()) + ioe);
+      return false;
+    }
+    return shexec.getExitCode() == 0;
+  }
+
+  /**
+   * Helper thread class that kills process-tree with SIGKILL in background
+   */
+  static class SigKillThread extends Thread {
+    private static final Log LOG = LogFactory
+               .getLog(SigKillThread.class);
+
+    private String pid = null;
+    private boolean isProcessGroup = false;
+
+    private long sleepTimeBeforeSigKill = DEFAULT_SLEEPTIME_BEFORE_SIGKILL;
+
+    private SigKillThread(String pid, boolean isProcessGroup, long interval) {
+      this.pid = pid;
+      this.isProcessGroup = isProcessGroup;
+      this.setName(this.getClass().getName() + "-" + pid);
+      sleepTimeBeforeSigKill = interval;
+    }
+
+    public void run() {
+      try {
+        // Sleep for some time before sending SIGKILL
+        Thread.sleep(sleepTimeBeforeSigKill);
+      } catch (InterruptedException i) {
+        LOG.warn("Thread sleep is interrupted.");
+      }
+
+      sigKillInCurrentThread(pid, isProcessGroup);
+    }
+  }
+}
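
A hedged usage sketch for the helpers above (the pid-file path is hypothetical; everything else is the API added in this file):

    // Read the task's pid and tear its tree down: SIGTERM now, SIGKILL after
    // the 5-second grace period, issued from a background daemon thread.
    String pid = ProcessTree.getPidFromPidFile("/local/pids/attempt_x"); // hypothetical path
    if (pid != null && ProcessTree.isAlive(pid)) {
      ProcessTree.destroy(pid,
          ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL,
          ProcessTree.isSetsidAvailable, // treat pid as a process-group id if setsid was used
          true /* in the background */);
    }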

+ 105 - 131
src/core/org/apache/hadoop/util/ProcfsBasedProcessTree.java

@@ -29,39 +29,51 @@ import java.util.Map;
 import java.util.HashMap;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
-import java.util.Arrays;
 import java.util.LinkedList;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import org.apache.hadoop.util.Shell.ExitCodeException;
-import org.apache.hadoop.util.Shell.ShellCommandExecutor;
-
 /**
  * A Proc file-system based ProcessTree. Works only on Linux.
  */
-public class ProcfsBasedProcessTree {
+public class ProcfsBasedProcessTree extends ProcessTree {
 
   private static final Log LOG = LogFactory
-      .getLog("org.apache.hadoop.mapred.ProcfsBasedProcessTree");
+      .getLog(ProcfsBasedProcessTree.class);
 
   private static final String PROCFS = "/proc/";
-  public static final long DEFAULT_SLEEPTIME_BEFORE_SIGKILL = 5000L;
-  private long sleepTimeBeforeSigKill = DEFAULT_SLEEPTIME_BEFORE_SIGKILL;
+
   private static final Pattern PROCFS_STAT_FILE_FORMAT = Pattern
       .compile("^([0-9-]+)\\s([^\\s]+)\\s[^\\s]\\s([0-9-]+)\\s([0-9-]+)\\s([0-9-]+)\\s([0-9-]+\\s){16}([0-9]+)(\\s[0-9-]+){16}");
 
   private Integer pid = -1;
+  private boolean setsidUsed = false;
+  private long sleeptimeBeforeSigkill = DEFAULT_SLEEPTIME_BEFORE_SIGKILL;
 
   private Map<Integer, ProcessInfo> processTree = new HashMap<Integer, ProcessInfo>();
 
   public ProcfsBasedProcessTree(String pid) {
+    this(pid, false, DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
+  }
+
+  public ProcfsBasedProcessTree(String pid, boolean setsidUsed,
+                                long sigkillInterval) {
     this.pid = getValidPID(pid);
+    this.setsidUsed = setsidUsed;
+    sleeptimeBeforeSigkill = sigkillInterval;
   }
 
+  /**
+   * Sets SIGKILL interval
+   * @deprecated Use {@link ProcfsBasedProcessTree#ProcfsBasedProcessTree(
+   *                  String, boolean, long)} instead
+   * @param interval The time to wait before sending SIGKILL
+   *                 after sending SIGTERM
+   */
+  @Deprecated
   public void setSigKillInterval(long interval) {
-    sleepTimeBeforeSigKill = interval;
+    sleeptimeBeforeSigkill = interval;
   }
 
   /**
@@ -141,47 +153,107 @@ public class ProcfsBasedProcessTree {
   }
 
   /**
-   * Is the process-tree alive? Currently we care only about the status of the
-   * root-process.
+   * Is the root-process alive?
    * 
-   * @return true if the process-true is alive, false otherwise.
+   * @return true if the root-process is alive, false otherwise.
    */
   public boolean isAlive() {
     if (pid == -1) {
       return false;
     } else {
-      return this.isAlive(pid);
+      return isAlive(pid.toString());
+    }
+  }
+
+  /**
+   * Is any of the subprocesses in the process-tree alive?
+   * 
+   * @return true if any of the processes in the process-tree is
+   *           alive, false otherwise.
+   */
+  public boolean isAnyProcessInTreeAlive() {
+    for (Integer pId : processTree.keySet()) {
+      if (isAlive(pId.toString())) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /** Verify that the given process id is the same as its process group id.
+   * @param pidStr Process id of the process to be verified
+   */
+  private static boolean assertPidPgrpidForMatch(String pidStr) {
+    Integer pId = Integer.parseInt(pidStr);
+    // Get information for this process
+    ProcessInfo pInfo = new ProcessInfo(pId);
+    pInfo = constructProcessInfo(pInfo);
+    //make sure that pId and its pgrpId match
+    if (!pInfo.getPgrpId().equals(pId)) {
+      LOG.warn("Unexpected: Process with PID " + pId +
+               " is not a process group leader.");
+      return false;
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(pId + " is a process group leader, as expected.");
     }
+    return true;
+  }
+
+  /** Make sure that the given pid is a process group leader and then
+   * destroy the process group.
+   * @param pgrpId   Process group id of the processes to be killed
+   * @param interval The time to wait before sending SIGKILL
+   *                 after sending SIGTERM
+   * @param inBackground Whether the process group is to be killed in the
+   *                     background by a separate thread
+   */
+  public static void assertAndDestroyProcessGroup(String pgrpId, long interval,
+                       boolean inBackground)
+         throws IOException {
+    // Make sure that the pid given is a process group leader
+    if (!assertPidPgrpidForMatch(pgrpId)) {
+      throw new IOException("Process with PID " + pgrpId  +
+                          " is not a process group leader.");
+    }
+    destroyProcessGroup(pgrpId, interval, inBackground);
   }
 
   /**
-   * Destroy the process-tree. Currently we only make sure the root process is
-   * gone. It is the responsibility of the root process to make sure that all
-   * its descendants are cleaned up.
+   * Destroy the process-tree.
    */
   public void destroy() {
+    destroy(true);
+  }
+  
+  /**
+   * Destroy the process-tree.
+   * @param inBackground Whether the process-tree is to be killed in the
+   *                     background by a separate thread
+   */
+  public void destroy(boolean inBackground) {
     LOG.debug("Killing ProcfsBasedProcessTree of " + pid);
     if (pid == -1) {
       return;
     }
-    ShellCommandExecutor shexec = null;
 
-    if (isAlive(this.pid)) {
-      try {
-        String[] args = { "kill", this.pid.toString() };
-        shexec = new ShellCommandExecutor(args);
-        shexec.execute();
-      } catch (IOException ioe) {
-        LOG.warn("Error executing shell command " + ioe);
-      } finally {
-        LOG.info("Killing " + pid + " with SIGTERM. Exit code "
-            + shexec.getExitCode());
+    if (isAlive(pid.toString())) {
+      if (isSetsidAvailable && setsidUsed) {
+        // In this case, we know that pid got created using setsid. So kill the
+        // whole processGroup.
+        try {
+          assertAndDestroyProcessGroup(pid.toString(), sleeptimeBeforeSigkill,
+                              inBackground);
+        } catch (IOException e) {
+          LOG.warn(StringUtils.stringifyException(e));
+        }
+      }
+      else {
+        //TODO: Destroy all the processes in the subtree in this case also.
+        // For the time being, killing only the root process.
+        destroyProcess(pid.toString(), sleeptimeBeforeSigkill, inBackground);
       }
     }
-
-    SigKillThread sigKillThread = new SigKillThread();
-    sigKillThread.setDaemon(true);
-    sigKillThread.start();
   }
 
   /**
@@ -200,52 +272,7 @@ public class ProcfsBasedProcessTree {
     return total;
   }
 
-  /**
-   * Get PID from a pid-file.
-   * 
-   * @param pidFileName
-   *          Name of the pid-file.
-   * @return the PID string read from the pid-file. Returns null if the
-   *         pidFileName points to a non-existing file or if read fails from the
-   *         file.
-   */
-  public static String getPidFromPidFile(String pidFileName) {
-    BufferedReader pidFile = null;
-    FileReader fReader = null;
-    String pid = null;
-
-    try {
-      fReader = new FileReader(pidFileName);
-      pidFile = new BufferedReader(fReader);
-    } catch (FileNotFoundException f) {
-      LOG.debug("PidFile doesn't exist : " + pidFileName);
-      return pid;
-    }
-
-    try {
-      pid = pidFile.readLine();
-    } catch (IOException i) {
-      LOG.error("Failed to read from " + pidFileName);
-    } finally {
-      try {
-        if (fReader != null) {
-          fReader.close();
-        }
-        try {
-          if (pidFile != null) {
-            pidFile.close();
-          }
-        } catch (IOException i) {
-          LOG.warn("Error closing the stream " + pidFile);
-        }
-      } catch (IOException i) {
-        LOG.warn("Error closing the stream " + fReader);
-      }
-    }
-    return pid;
-  }
-
-  private Integer getValidPID(String pid) {
+  private static Integer getValidPID(String pid) {
     Integer retPid = -1;
     try {
       retPid = Integer.parseInt((String) pid);
@@ -285,7 +312,7 @@ public class ProcfsBasedProcessTree {
    * Construct the ProcessInfo using the process' PID and procfs and return the
    * same. Returns null on failing to read from procfs,
    */
-  private ProcessInfo constructProcessInfo(ProcessInfo pinfo) {
+  private static ProcessInfo constructProcessInfo(ProcessInfo pinfo) {
     ProcessInfo ret = null;
     // Read "/proc/<pid>/stat" file
     BufferedReader in = null;
@@ -333,59 +360,6 @@ public class ProcfsBasedProcessTree {
     return ret;
   }
 
-  /**
-   * Is the process with PID pid still alive?
-   */
-  private boolean isAlive(Integer pid) {
-    // This method assumes that isAlive is called on a pid that was alive not
-    // too long ago, and hence assumes no chance of pid-wrapping-around.
-    ShellCommandExecutor shexec = null;
-    try {
-      String[] args = { "kill", "-0", pid.toString() };
-      shexec = new ShellCommandExecutor(args);
-      shexec.execute();
-    } catch (ExitCodeException ee) {
-      return false;
-    } catch (IOException ioe) {
-      LOG.warn("Error executing shell command "
-          + Arrays.toString(shexec.getExecString()) + ioe);
-      return false;
-    }
-    return (shexec.getExitCode() == 0 ? true : false);
-  }
-
-  /**
-   * Helper thread class that kills process-tree with SIGKILL in background
-   */
-  private class SigKillThread extends Thread {
-
-    public void run() {
-      this.setName(this.getClass().getName() + "-" + String.valueOf(pid));
-      ShellCommandExecutor shexec = null;
-
-      try {
-        // Sleep for some time before sending SIGKILL
-        Thread.sleep(sleepTimeBeforeSigKill);
-      } catch (InterruptedException i) {
-        LOG.warn("Thread sleep is interrupted.");
-      }
-
-      // Kill the root process with SIGKILL if it is still alive
-      if (ProcfsBasedProcessTree.this.isAlive(pid)) {
-        try {
-          String[] args = { "kill", "-9", pid.toString() };
-          shexec = new ShellCommandExecutor(args);
-          shexec.execute();
-        } catch (IOException ioe) {
-          LOG.warn("Error executing shell command " + ioe);
-        } finally {
-          LOG.info("Killing " + pid + " with SIGKILL. Exit code "
-              + shexec.getExitCode());
-        }
-      }
-    }
-  }
-
   /**
    * Returns a string printing PIDs of process present in the
    * ProcfsBasedProcessTree. Output format : [pid pid ..]
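
Roughly how the reworked class is now driven (the pid value is illustrative; getProcessTree() and getCumulativeVmem() are existing methods of this class):

    ProcfsBasedProcessTree tree = new ProcfsBasedProcessTree("4231", // hypothetical pid
        ProcessTree.isSetsidAvailable,
        ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
    tree = tree.getProcessTree();          // walk /proc and build the tree
    long vmem = tree.getCumulativeVmem();  // memory accounting, as before
    tree.destroy(true /* kill in the background */);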

+ 3 - 4
src/mapred/mapred-default.xml

@@ -312,12 +312,11 @@
 </property>
 
 <property>
-  <name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name>
+  <name>mapred.tasktracker.sigkillthread.sleeptime-before-sigkill</name>
   <value>5000</value>
   <description>The time, in milliseconds, the tasktracker waits for sending a
-  SIGKILL to a process that has overrun memory limits, after it has been sent
-  a SIGTERM. Used only if tasks' memory management is enabled via
-  mapred.tasktracker.tasks.maxmemory.</description>
+  SIGKILL to a process, after it has been sent a SIGTERM.
+  </description>
 </property>
 
 <property>
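
For reference, the renamed key is consumed in Java as the JvmManager.kill() hunk below shows; "conf" here stands in for the surrounding JobConf:

    long sleeptimeBeforeSigkill = conf.getLong(
        "mapred.tasktracker.sigkillthread.sleeptime-before-sigkill",
        ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);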

+ 13 - 18
src/mapred/org/apache/hadoop/mapred/Child.java

@@ -88,10 +88,9 @@ class Child {
     t.setName("Thread for syncLogs");
     t.setDaemon(true);
     t.start();
-    //for the memory management, a PID file is written and the PID file
-    //is written once per JVM. We simply symlink the file on a per task
-    //basis later (see below). Long term, we should change the Memory
-    //manager to use JVMId instead of TaskAttemptId
+    // A PID file is written once per JVM. We simply symlink the file
+    // on a per task basis later (see below). Long term, we should change
+    // the Memory manager to use JVMId instead of TaskAttemptId
     Path srcPidPath = null;
     Path dstPidPath = null;
     int idleLoopCount = 0;
@@ -121,18 +120,15 @@ class Child {
         //are viewable immediately
         TaskLog.syncLogs(firstTaskid, taskid);
         JobConf job = new JobConf(task.getJobFile());
-        if (job.getBoolean("task.memory.mgmt.enabled", false)) {
-          if (srcPidPath == null) {
-            srcPidPath = TaskMemoryManagerThread.getPidFilePath(firstTaskid,
-                                                              job);
-          }
-          //since the JVM is running multiple tasks potentially, we need
-          //to do symlink stuff only for the subsequent tasks
-          if (!taskid.equals(firstTaskid)) {
-            dstPidPath = new Path(srcPidPath.getParent(), taskid.toString());
-            FileUtil.symLink(srcPidPath.toUri().getPath(), 
-                dstPidPath.toUri().getPath());
-          }
+        if (srcPidPath == null) {
+          srcPidPath = TaskTracker.getPidFilePath(firstTaskid, job);
+        }
+        //since the JVM is running multiple tasks potentially, we need
+        //to do symlink stuff only for the subsequent tasks
+        if (!taskid.equals(firstTaskid)) {
+          dstPidPath = new Path(srcPidPath.getParent(), taskid.toString());
+          FileUtil.symLink(srcPidPath.toUri().getPath(), 
+              dstPidPath.toUri().getPath());
         }
         //setupWorkDir actually sets up the symlinks for the distributed
         //cache. After a task exits we wipe the workdir clean, and hence
@@ -155,8 +151,7 @@ class Child {
           task.run(job, umbilical);             // run the task
         } finally {
           TaskLog.syncLogs(firstTaskid, taskid);
-          if (!taskid.equals(firstTaskid) && 
-              job.getBoolean("task.memory.mgmt.enabled", false)) {
+          if (!taskid.equals(firstTaskid)) {
             new File(dstPidPath.toUri().getPath()).delete();
           }
         }

+ 22 - 6
src/mapred/org/apache/hadoop/mapred/JvmManager.java

@@ -31,8 +31,12 @@ import java.util.Vector;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.ProcessTree;
+import org.apache.hadoop.util.ProcfsBasedProcessTree;
 
 class JvmManager {
 
@@ -339,7 +343,7 @@ class JvmManager {
           env.vargs.add(Integer.toString(jvmId.getId()));
           List<String> wrappedCommand = 
             TaskLog.captureOutAndError(env.setup, env.vargs, env.stdout, env.stderr,
-                env.logSize, env.pidFile);
+                env.logSize, true, env.pidFile);
           shexec = new ShellCommandExecutor(wrappedCommand.toArray(new String[0]), 
               env.workDir, env.env);
           shexec.execute();
@@ -362,20 +366,32 @@ class JvmManager {
               FileUtil.fullyDelete(env.workDir);
             }
           } catch (IOException ie){}
-          if (tracker.isTaskMemoryManagerEnabled()) {
           // Remove the associated pid-file, if any
-            tracker.getTaskMemoryManager().
-               removePidFile(TaskAttemptID.forName(
+          tracker.removePidFile(TaskAttemptID.forName(
                    env.conf.get("mapred.task.id")));
-          }
         }
       }
 
+      /**
+       * Kills the process. Also kills its subprocesses if the process (the root
+       * of the subtree of processes) was created using setsid.
+       */
       public void kill() {
         if (shexec != null) {
           Process process = shexec.getProcess();
           if (process != null) {
-            process.destroy();
+            Path pidFilePath = TaskTracker.getPidFilePath(
+                        TaskAttemptID.forName(env.conf.get("mapred.task.id")),
+                        env.conf);
+            String pid = ProcessTree.getPidFromPidFile(
+                                                    pidFilePath.toString());
+
+            long sleeptimeBeforeSigkill = env.conf.getLong(
+                 "mapred.tasktracker.sigkillthread.sleeptime-before-sigkill",
+                 ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
+
+            ProcessTree.destroy(pid, sleeptimeBeforeSigkill,
+                     ProcessTree.isSetsidAvailable, true/*in the background*/);
           }
         }
         removeJvm(jvmId);

+ 31 - 2
src/mapred/org/apache/hadoop/mapred/TaskLog.java

@@ -34,6 +34,7 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.util.ProcessTree;
 import org.apache.log4j.Appender;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -349,7 +350,7 @@ public class TaskLog {
                                                 long tailLength
                                                ) throws IOException {
     return captureOutAndError(null, cmd, stdoutFilename,
-                              stderrFilename, tailLength, null );
+                              stderrFilename, tailLength, false, null );
   }
 
   /**
@@ -371,7 +372,7 @@ public class TaskLog {
                                                 long tailLength
                                                ) throws IOException {
     return captureOutAndError(setup, cmd, stdoutFilename, stderrFilename,
-        tailLength, null);
+                              tailLength, false, null);
   }
 
   /**
@@ -394,6 +395,32 @@ public class TaskLog {
                                                 long tailLength,
                                                 String pidFileName
                                                ) throws IOException {
+    return captureOutAndError (setup, cmd, stdoutFilename, stderrFilename,
+                               tailLength, false, pidFileName);
+  }
+
+  /**
+   * Wrap a command in a shell to capture stdout and stderr to files.
+   * Setup commands such as setting memory limit can be passed which 
+   * will be executed before exec.
+   * If the tailLength is 0, the entire output will be saved.
+   * @param setup The setup commands for the execed process.
+   * @param cmd The command and the arguments that should be run
+   * @param stdoutFilename The filename that stdout should be saved to
+   * @param stderrFilename The filename that stderr should be saved to
+   * @param tailLength The length of the tail to be saved.
+   * @param useSetsid Should setsid be used in the command or not.
+   * @param pidFileName The name of the pid-file
+   * @return the modified command that should be run
+   */
+  public static List<String> captureOutAndError(List<String> setup,
+                                                List<String> cmd, 
+                                                File stdoutFilename,
+                                                File stderrFilename,
+                                                long tailLength,
+                                                boolean useSetsid,
+                                                String pidFileName
+                                               ) throws IOException {
     String stdout = FileUtil.makeShellPath(stdoutFilename);
     String stderr = FileUtil.makeShellPath(stderrFilename);
     List<String> result = new ArrayList<String>(3);
@@ -414,6 +441,8 @@ public class TaskLog {
     }
     if (tailLength > 0) {
       mergedCmd.append("(");
+    } else if(ProcessTree.isSetsidAvailable && useSetsid) {
+      mergedCmd.append("exec setsid ");
     } else {
       mergedCmd.append("exec ");
     }
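
The effect of the new useSetsid flag, sketched as a fragment with hypothetical file names and task command: with tailLength 0 and setsid available, the wrapped command starts with "exec setsid" so the task leads its own session.

    List<String> cmd = Arrays.asList("java", "org.example.MyTask"); // hypothetical
    List<String> wrapped = TaskLog.captureOutAndError(
        null,                        // no setup commands
        cmd,
        new File("/logs/stdout"),    // hypothetical
        new File("/logs/stderr"),    // hypothetical
        0L,                          // tailLength 0: keep the entire output
        true,                        // useSetsid
        null);                       // no pid-file
    // wrapped is roughly: ["bash", "-c",
    //   "exec setsid 'java' 'org.example.MyTask' > /logs/stdout 2> /logs/stderr"]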

+ 14 - 49
src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java

@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.mapred;
 
-import java.io.IOException;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -27,12 +26,12 @@ import java.util.ArrayList;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
+
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.TaskTracker;
 import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
 import org.apache.hadoop.util.ProcfsBasedProcessTree;
+import org.apache.hadoop.util.ProcessTree;
 
 /**
  * Manages memory usage of tasks running under this TT. Kills any task-trees
@@ -44,7 +43,6 @@ class TaskMemoryManagerThread extends Thread {
 
   private TaskTracker taskTracker;
   private long monitoringInterval;
-  private long sleepTimeBeforeSigKill;
 
   private long maxMemoryAllowedForAllTasks;
 
@@ -66,16 +64,12 @@ class TaskMemoryManagerThread extends Thread {
 
     monitoringInterval = taskTracker.getJobConf().getLong(
         "mapred.tasktracker.taskmemorymanager.monitoring-interval", 5000L);
-    sleepTimeBeforeSigKill = taskTracker.getJobConf().getLong(
-        "mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill",
-        ProcfsBasedProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
   }
 
   public void addTask(TaskAttemptID tid, long memLimit) {
     synchronized (tasksToBeAdded) {
       LOG.debug("Tracking ProcessTree " + tid + " for the first time");
-      ProcessTreeInfo ptInfo = new ProcessTreeInfo(tid, null, null, memLimit,
-          sleepTimeBeforeSigKill);
+      ProcessTreeInfo ptInfo = new ProcessTreeInfo(tid, null, null, memLimit);
       tasksToBeAdded.put(tid, ptInfo);
     }
   }
@@ -93,13 +87,10 @@ class TaskMemoryManagerThread extends Thread {
     private long memLimit;
 
     public ProcessTreeInfo(TaskAttemptID tid, String pid,
-        ProcfsBasedProcessTree pTree, long memLimit, long sleepTimeBeforeSigKill) {
+        ProcfsBasedProcessTree pTree, long memLimit) {
       this.tid = tid;
       this.pid = pid;
       this.pTree = pTree;
-      if (this.pTree != null) {
-        this.pTree.setSigKillInterval(sleepTimeBeforeSigKill);
-      }
       this.memLimit = memLimit;
     }
 
@@ -178,8 +169,13 @@ class TaskMemoryManagerThread extends Thread {
             // itself is still retained in runningTasks till successful
             // transmission to JT
 
+            long sleeptimeBeforeSigkill = taskTracker.getJobConf().getLong(
+                    "mapred.tasktracker.sigkillthread.sleeptime-before-sigkill",
+                    ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
+
             // create process tree object
-            ProcfsBasedProcessTree pt = new ProcfsBasedProcessTree(pId);
+            ProcfsBasedProcessTree pt = new ProcfsBasedProcessTree(pId,
+                       ProcessTree.isSetsidAvailable, sleeptimeBeforeSigkill);
             LOG.debug("Tracking ProcessTree " + pId + " for the first time");
 
             ptInfo.setPid(pId);
@@ -225,7 +221,7 @@ class TaskMemoryManagerThread extends Thread {
           taskTracker.cleanUpOverMemoryTask(tid, true, msg);
 
           // Now destroy the ProcessTree, remove it from monitoring map.
-          pTree.destroy();
+          pTree.destroy(true/*in the background*/);
           it.remove();
           LOG.info("Removed ProcessTree with root " + pId);
         } else {
@@ -296,7 +292,7 @@ class TaskMemoryManagerThread extends Thread {
         // Now destroy the ProcessTree, remove it from monitoring map.
         ProcessTreeInfo ptInfo = processTreeInfoMap.get(tid);
         ProcfsBasedProcessTree pTree = ptInfo.getProcessTree();
-        pTree.destroy();
+        pTree.destroy(true/*in the background*/);
         processTreeInfoMap.remove(tid);
         LOG.info("Removed ProcessTree with root " + ptInfo.getPID());
       }
@@ -313,43 +309,12 @@ class TaskMemoryManagerThread extends Thread {
    * @return the pid of the task process.
    */
   private String getPid(TaskAttemptID tipID) {
-    Path pidFileName = getPidFilePath(tipID, taskTracker.getJobConf());
+    Path pidFileName = TaskTracker.getPidFilePath(tipID, taskTracker.getJobConf());
     if (pidFileName == null) {
       return null;
     }
-    return ProcfsBasedProcessTree.getPidFromPidFile(pidFileName.toString());
+    return ProcessTree.getPidFromPidFile(pidFileName.toString());
   }
 
-  private static LocalDirAllocator lDirAlloc = 
-    new LocalDirAllocator("mapred.local.dir");
 
-  /**
-   * Get the pidFile path of a Task
-   * @param tipID
-   * @return pidFile's Path
-   */
-  public static Path getPidFilePath(TaskAttemptID tipID, JobConf conf) {
-    Path pidFileName = null;
-    try {
-      //this actually need not use a localdirAllocator since the PID
-      //files are really small..
-      pidFileName = lDirAlloc.getLocalPathToRead(
-          (TaskTracker.getPidFilesSubdir() + Path.SEPARATOR + tipID),
-          conf);
-    } catch (IOException i) {
-      // PID file is not there
-      LOG.debug("Failed to get pidFile name for " + tipID);
-    }
-    return pidFileName;
-  }
-  public void removePidFile(TaskAttemptID tid) {
-    if (taskTracker.isTaskMemoryManagerEnabled()) {
-      Path pidFilePath = getPidFilePath(tid, taskTracker.getJobConf());
-      if (pidFilePath != null) {
-        try {
-          FileSystem.getLocal(taskTracker.getJobConf()).delete(pidFilePath, false);
-        } catch(IOException ie) {}
-      }
-    }
-  }
 }

+ 1 - 3
src/mapred/org/apache/hadoop/mapred/TaskRunner.java

@@ -375,11 +375,9 @@ abstract class TaskRunner extends Thread {
       vargs.add(taskid.toString());                      // pass task identifier
 
       String pidFile = null;
-      if (tracker.isTaskMemoryManagerEnabled()) {
-        pidFile = lDirAlloc.getLocalPathForWrite(
+      pidFile = lDirAlloc.getLocalPathForWrite(
             (TaskTracker.getPidFilesSubdir() + Path.SEPARATOR + taskid),
             this.conf).toString();
-      }
 
       // set memory limit using ulimit if feasible and necessary ...
       String[] ulimitCmd = Shell.getUlimitMemoryCommand(conf);

+ 31 - 5
src/mapred/org/apache/hadoop/mapred/TaskTracker.java

@@ -81,6 +81,7 @@ import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
 import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.MemoryCalculatorPlugin;
 import org.apache.hadoop.util.ProcfsBasedProcessTree;
+import org.apache.hadoop.util.ProcessTree;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.RunJar;
 import org.apache.hadoop.util.StringUtils;
@@ -415,7 +416,35 @@ public class TaskTracker
   static String getPidFilesSubdir() {
     return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.PIDDIR;
   }
-    
+ 
+  /**
+   * Get the pidFile path of a Task
+   * @param tid the TaskAttemptID of the task for which pidFile's path is needed
+   * @return pidFile's Path
+   */
+  public static Path getPidFilePath(TaskAttemptID tid, JobConf conf) {
+    Path pidFileName = null;
+    try {
+      //this actually need not use a localdirAllocator since the PID
+      //files are really small..
+      pidFileName = lDirAlloc.getLocalPathToRead(
+          (TaskTracker.getPidFilesSubdir() + Path.SEPARATOR + tid),
+          conf);
+    } catch (IOException i) {
+      // PID file is not there
+      LOG.warn("Failed to get pidFile name for " + tid + " " + i);
+    }
+    return pidFileName;
+  }
+  public void removePidFile(TaskAttemptID tid) {
+    Path pidFilePath = getPidFilePath(tid, getJobConf());
+    if (pidFilePath != null) {
+      try {
+        FileSystem.getLocal(getJobConf()).delete(pidFilePath, false);
+      } catch(IOException ie) {}
+    }
+  }
+  
   public long getProtocolVersion(String protocol, 
                                  long clientVersion) throws IOException {
     if (protocol.equals(TaskUmbilicalProtocol.class.getName())) {
@@ -737,7 +766,7 @@ public class TaskTracker
     }
   }
 
-  private LocalDirAllocator lDirAlloc = 
+  private static LocalDirAllocator lDirAlloc = 
                               new LocalDirAllocator("mapred.local.dir");
 
   // intialize the job directory
@@ -1900,9 +1929,6 @@ public class TaskTracker
         //disable jvm reuse
         localJobConf.setNumTasksToExecutePerJvm(1);
       }
-      if (isTaskMemoryManagerEnabled()) {
-        localJobConf.setBoolean("task.memory.mgmt.enabled", true);
-      }
       OutputStream out = localFs.create(localTaskFile);
       try {
         localJobConf.writeXml(out);
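
A hypothetical caller of the pid-file helpers that moved onto TaskTracker in this change ("tid", "conf" and "tracker" stand in for the surrounding code):

    Path pidFilePath = TaskTracker.getPidFilePath(tid, conf);
    if (pidFilePath != null) {
      String pid = ProcessTree.getPidFromPidFile(pidFilePath.toString());
      // ... signal the task via ProcessTree.destroy(...), then clean up:
      tracker.removePidFile(tid);
    }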

+ 2 - 1
src/mapred/org/apache/hadoop/mapred/pipes/Application.java

@@ -96,7 +96,8 @@ class Application<K1 extends WritableComparable, V1 extends Writable,
     File stdout = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDOUT);
     File stderr = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR);
     long logLength = TaskLog.getTaskLogLength(conf);
-    cmd = TaskLog.captureOutAndError(cmd, stdout, stderr, logLength);
+    cmd = TaskLog.captureOutAndError(null, cmd, stdout, stderr, logLength,
+                                     false, null);
 
     process = runClient(cmd, env);
     clientSocket = serverSocket.accept();

+ 311 - 0
src/test/org/apache/hadoop/mapred/TestKillSubProcesses.java

@@ -0,0 +1,311 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
+import java.util.Iterator;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.ProcessTree;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A JUnit test that kills a job whose tasks have children, and checks that the
+ * children (subprocesses of the Java task) are also killed when a task is killed.
+ */
+public class TestKillSubProcesses extends TestCase {
+
+  private static volatile Log LOG = LogFactory
+            .getLog(TestKillSubProcesses.class);
+
+  private static String TEST_ROOT_DIR = new File(System.getProperty(
+      "test.build.data", "/tmp")).toURI().toString().replace(' ', '+');
+
+  static JobClient jobClient = null;
+
+  static MiniMRCluster mr = null;
+  static Path scriptDir = new Path(TEST_ROOT_DIR + "/killjob");
+
+  // number of levels in the subtree of subprocesses of map task
+  static int numLevelsOfSubProcesses = 6;
+
+  /**
+   * Runs a job, kills the job, and verifies that the map task and its
+   * subprocesses are killed properly.
+   */
+  static JobID runJobKill(JobTracker jt, JobConf conf) throws IOException {
+
+    conf.setJobName("testkillsubprocesses");
+    conf.setMapperClass(KillMapperWithChild.class);
+
+    RunningJob job = runJob(conf);
+    while (job.getJobState() != JobStatus.RUNNING) {
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException ie) {
+        LOG.warn("sleep is interrupted:" + ie);
+        break;
+      }
+    }
+    String pid = null;
+    String scriptDirName = scriptDir.toString().substring(5); // strip the "file:" scheme prefix
+
+    // get the taskAttemptID of the map task and use it to get the pid
+    // of map task from pid file
+    TaskReport[] mapReports = jobClient.getMapTaskReports(job.getID());
+
+    JobInProgress jip = jt.getJob(job.getID());
+    for(TaskReport tr : mapReports) {
+      TaskInProgress tip = jip.getTaskInProgress(tr.getTaskID());
+      assertTrue(tip.isRunning());
+
+      // for this tip, get active tasks of all attempts
+      while(tip.getActiveTasks().size() == 0) {
+        //wait till the activeTasks Tree is built
+        try {
+          Thread.sleep(500);
+        } catch (InterruptedException ie) {
+          LOG.warn("sleep is interrupted:" + ie);
+          break;
+        }
+      }
+
+      for (Iterator<TaskAttemptID> it = 
+        tip.getActiveTasks().keySet().iterator(); it.hasNext();) {
+        TaskAttemptID id = it.next();
+        LOG.info("taskAttemptID of map task is " + id);
+
+        String localDir = mr.getTaskTrackerLocalDir(0); // TT with index 0
+        LOG.info("localDir is " + localDir);
+        JobConf confForThisTask = new JobConf(conf);
+        confForThisTask.set("mapred.local.dir", localDir);//set the localDir
+
+        Path pidFilePath = TaskTracker.getPidFilePath(id, confForThisTask);
+        while (pidFilePath == null) {
+          //wait till the pid file is created
+          try {
+            Thread.sleep(500);
+          } catch (InterruptedException ie) {
+            LOG.warn("sleep is interrupted:" + ie);
+            break;
+          }
+          pidFilePath = TaskTracker.getPidFilePath(id, confForThisTask);
+        }
+
+        pid = ProcessTree.getPidFromPidFile(pidFilePath.toString());
+        LOG.info("pid of map task is " + pid);
+
+        // Checking if the map task is alive
+        assertTrue(ProcessTree.isAlive(pid));
+        LOG.info("The map task is alive before killJob, as expected.");
+      }
+    }
+
+    // Checking if the descendant processes of map task are alive
+    if(ProcessTree.isSetsidAvailable) {
+      String childPid = ProcessTree.getPidFromPidFile(
+                               scriptDirName + "/childPidFile" + 0);
+      while(childPid == null) {
+        try {
+          Thread.sleep(500);
+        } catch (InterruptedException ie) {
+          LOG.warn("sleep is interrupted:" + ie);
+          break;
+        }
+        childPid = ProcessTree.getPidFromPidFile(
+                               scriptDirName + "/childPidFile" + 0);
+      }
+
+      // As childPidFile0 (the leaf process in the subtree of processes with
+      // the map task as root) is created, all other child pid files should
+      // have been created already (see the script for details).
+      // Now check if the descendants of map task are alive.
+      for(int i=0; i <= numLevelsOfSubProcesses; i++) {
+        childPid = ProcessTree.getPidFromPidFile(
+                               scriptDirName + "/childPidFile" + i);
+        LOG.info("pid of the descendant process at level " + i +
+                 "in the subtree of processes(with the map task as the root)" +
+                 " is " + childPid);
+        assertTrue("Unexpected: The subprocess at level " + i +
+                   " in the subtree is not alive before killJob",
+                   ProcessTree.isAlive(childPid));
+      }
+    }
+
+    // kill the job now
+    job.killJob();
+
+    while (job.cleanupProgress() == 0.0f) {
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException ie) {
+        LOG.warn("sleep is interrupted:" + ie);
+        break;
+      }
+    }
+
+    // Checking that the Job got killed
+    assertTrue(job.isComplete());
+    assertEquals(job.getJobState(), JobStatus.KILLED);
+
+    // Checking if the map task got killed or not
+    assertTrue(!ProcessTree.isAlive(pid));
+    LOG.info("The map task is not alive after killJob, as expected.");
+
+    // Checking if the descendant processes of map task are killed properly
+    if(ProcessTree.isSetsidAvailable) {
+      for(int i=0; i <= numLevelsOfSubProcesses; i++) {
+        String childPid = ProcessTree.getPidFromPidFile(
+                               scriptDirName + "/childPidFile" + i);
+        LOG.info("pid of the descendant process at level " + i +
+                 "in the subtree of processes(with the map task as the root)" +
+                 " is " + childPid);
+        assertTrue("Unexpected: The subprocess at level " + i +
+                   " in the subtree is alive after killJob",
+                   !ProcessTree.isAlive(childPid));
+      }
+    }
+
+    return job.getID();
+  }
+
+  static RunningJob runJob(JobConf conf) throws IOException {
+
+    final Path inDir = new Path(TEST_ROOT_DIR + "/killjob/input");
+    final Path outDir = new Path(TEST_ROOT_DIR + "/killjob/output");
+
+    FileSystem fs = FileSystem.get(conf);
+    if(fs.exists(outDir)) {
+      fs.delete(outDir, true);
+    }
+    if(fs.exists(scriptDir)) {
+      fs.delete(scriptDir, true);
+    }
+    if (!fs.exists(inDir)) {
+      fs.mkdirs(inDir);
+    }
+    // create input file
+    String input = "The quick brown fox\n" + "has many silly\n"
+        + "red fox sox\n";
+    DataOutputStream file = fs.create(new Path(inDir, "part-0"));
+    file.writeBytes(input);
+    file.close();
+
+
+    conf.setInputFormat(TextInputFormat.class);
+    conf.setOutputKeyClass(Text.class);
+    conf.setOutputValueClass(IntWritable.class);
+
+    FileInputFormat.setInputPaths(conf, inDir);
+    FileOutputFormat.setOutputPath(conf, outDir);
+    conf.setNumMapTasks(1);
+    conf.setNumReduceTasks(0);
+    conf.set("test.build.data", TEST_ROOT_DIR);
+
+    jobClient = new JobClient(conf);
+    RunningJob job = jobClient.submitJob(conf);
+
+    return job;
+
+  }
+
+  public void testJobKill() throws IOException {
+    JobConf conf=null;
+    try {
+      mr = new MiniMRCluster(1, "file:///", 1);
+
+      // run the TCs
+      conf = mr.createJobConf();
+      JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
+      runJobKill(jt, conf);
+    } finally {
+      if (mr != null) {
+        mr.shutdown();
+      }
+      FileSystem fs = FileSystem.get(conf);
+      if(fs.exists(scriptDir)) {
+        fs.delete(scriptDir, true);
+      }
+    }
+  }
+
+  static class KillMapperWithChild extends MapReduceBase implements
+      Mapper<WritableComparable, Writable, WritableComparable, Writable> {
+    public void configure(JobConf conf) {
+      try {
+        FileSystem fs = FileSystem.get(conf);
+        TEST_ROOT_DIR = conf.get("test.build.data").toString().substring(5);
+        scriptDir = new Path(TEST_ROOT_DIR + "/killjob");
+
+        if(ProcessTree.isSetsidAvailable) {
+          // create shell script
+          Random rm = new Random();
+          Path scriptPath = new Path(scriptDir, "_shellScript_" + rm.nextInt() + ".sh");
+          String shellScript = scriptPath.toString();
+          String script =
+             "echo $$ > " + scriptDir.toString() + "/childPidFile" + "$1\n" +
+             "echo hello\nsleep 1\n" +
+             "if [ $1 != 0 ]\nthen\n" +
+             " sh " + shellScript + " $(($1-1))\n" +
+             "else\n" +
+             " while true\n do\n" +
+             "  sleep 2\n" +
+             " done\n" +
+             "fi";
+          DataOutputStream file = fs.create(scriptPath);
+          file.writeBytes(script);
+          file.close();
+
+          LOG.info("Calling script from map task : " + shellScript);
+          Runtime.getRuntime().exec(shellScript + " " +
+                                    numLevelsOfSubProcesses);
+        }
+      } catch (Exception e) {
+        LOG.warn("Exception in KillMapperWithChild.configure: " +
+                 StringUtils.stringifyException(e));
+      }
+    }
+    public void map(WritableComparable key, Writable value,
+        OutputCollector<WritableComparable, Writable> out, Reporter reporter)
+        throws IOException {
+
+      try {
+        while(true) {//just wait till kill happens
+          Thread.sleep(1000);
+        }
+      } catch (InterruptedException e) {
+        LOG.warn("Exception in KillMapperWithChild.map:" + e);
+      }
+    }
+  }
+}

+ 61 - 32
src/test/org/apache/hadoop/util/TestProcfsBasedProcessTree.java

@@ -22,36 +22,48 @@ import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.util.Random;
+import java.util.Vector;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.Shell.ExitCodeException;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 
 import junit.framework.TestCase;
 
+/**
+ * A JUnit test to test ProcfsBasedProcessTree.
+ */
 public class TestProcfsBasedProcessTree extends TestCase {
 
   private static final Log LOG = LogFactory
       .getLog(TestProcfsBasedProcessTree.class);
+  private static String TEST_ROOT_DIR = new Path(System.getProperty(
+         "test.build.data", "/tmp")).toString().replace(' ', '+');
+
   private ShellCommandExecutor shexec = null;
-  private String pidFile;
+  private String pidFile, lowestDescendant;
   private String shellScript;
-  private static final int N = 10; // Controls the RogueTask
-
-  private static final int memoryLimit = 15 * 1024 * 1024; // 15MB
-  private static final long PROCESSTREE_RECONSTRUCTION_INTERVAL =
-    ProcfsBasedProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL; // msec
+  private static final int N = 6; // Controls the RogueTask
 
   private class RogueTaskThread extends Thread {
     public void run() {
       try {
-        String args[] = { "bash", "-c",
-            "echo $$ > " + pidFile + "; sh " + shellScript + " " + N + ";" };
-        shexec = new ShellCommandExecutor(args);
+        Vector<String> args = new Vector<String>();
+        if(ProcessTree.isSetsidAvailable) {
+          args.add("setsid");
+        }
+        args.add("bash");
+        args.add("-c");
+        args.add(" echo $$ > " + pidFile + "; sh " +
+                          shellScript + " " + N + ";") ;
+        shexec = new ShellCommandExecutor(args.toArray(new String[0]));
         shexec.execute();
       } catch (ExitCodeException ee) {
-        LOG.info("Shell Command exit with a non-zero exit code. " + ee);
+        LOG.info("Shell Command exit with a non-zero exit code. This is" +
+                 " expected as we are killing the subprocesses of the" +
+                 " task intentionally. " + ee);
       } catch (IOException ioe) {
         LOG.info("Error executing shell command " + ioe);
       } finally {
@@ -71,7 +83,7 @@ public class TestProcfsBasedProcessTree extends TestCase {
     }
 
     // read from pidFile
-    return ProcfsBasedProcessTree.getPidFromPidFile(pidFile);
+    return ProcessTree.getPidFromPidFile(pidFile);
   }
 
   public void testProcessTree() {
@@ -88,26 +100,34 @@ public class TestProcfsBasedProcessTree extends TestCase {
     }
     // create shell script
     Random rm = new Random();
-    File tempFile = new File(this.getName() + "_shellScript_" + rm.nextInt()
-        + ".sh");
+    File tempFile = new File(TEST_ROOT_DIR, this.getName() + "_shellScript_" +
+                             rm.nextInt() + ".sh");
     tempFile.deleteOnExit();
-    shellScript = tempFile.getName();
+    shellScript = TEST_ROOT_DIR + File.separator + tempFile.getName();
 
     // create pid file
-    tempFile = new File(this.getName() + "_pidFile_" + rm.nextInt() + ".pid");
+    tempFile = new File(TEST_ROOT_DIR,  this.getName() + "_pidFile_" +
+                        rm.nextInt() + ".pid");
     tempFile.deleteOnExit();
-    pidFile = tempFile.getName();
+    pidFile = TEST_ROOT_DIR + File.separator + tempFile.getName();
+
+    lowestDescendant = TEST_ROOT_DIR + File.separator + "lowestDescendantPidFile";
 
     // write to shell-script
     try {
       FileWriter fWriter = new FileWriter(shellScript);
       fWriter.write(
           "# rogue task\n" +
-          "sleep 10\n" +
+          "sleep 1\n" +
           "echo hello\n" +
           "if [ $1 -ne 0 ]\n" +
           "then\n" +
           " sh " + shellScript + " $(($1-1))\n" +
+          "else\n" +
+          " echo $$ > " + lowestDescendant + "\n" +
+          " while true\n do\n" +
+          "  sleep 5\n" +
+          " done\n" +
           "fi");
       fWriter.close();
     } catch (IOException ioe) {
@@ -119,25 +139,34 @@ public class TestProcfsBasedProcessTree extends TestCase {
     t.start();
     String pid = getRogueTaskPID();
     LOG.info("Root process pid: " + pid);
-    ProcfsBasedProcessTree p = new ProcfsBasedProcessTree(pid);
+    ProcfsBasedProcessTree p = new ProcfsBasedProcessTree(pid,
+                               ProcessTree.isSetsidAvailable,
+                               ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
     p = p.getProcessTree(); // initialize
-    try {
-      while (true) {
-        LOG.info("ProcessTree: " + p.toString());
-        long mem = p.getCumulativeVmem();
-        LOG.info("Memory usage: " + mem + "bytes.");
-        if (mem > memoryLimit) {
-          p.destroy();
-          break;
-        }
-        Thread.sleep(PROCESSTREE_RECONSTRUCTION_INTERVAL);
-        p = p.getProcessTree(); // reconstruct
+    LOG.info("ProcessTree: " + p.toString());
+
+    File leaf = new File(lowestDescendant);
+    // wait till the lowest descendant process of the rogue task starts execution
+    while (!leaf.exists()) {
+      try {
+        Thread.sleep(500);
+      } catch (InterruptedException ie) {
+        break;
       }
-    } catch (InterruptedException ie) {
-      LOG.info("Interrupted.");
     }
 
-    assertEquals(false, p.isAlive()); // processtree should should be gone
+    p = p.getProcessTree(); // reconstruct
+    LOG.info("ProcessTree: " + p.toString());
+
+    // destroy the map task and all its subprocesses
+    p.destroy(true/*in the background*/);
+
+    if(ProcessTree.isSetsidAvailable) {// whole processtree should be gone
+      assertEquals(false, p.isAnyProcessInTreeAlive());
+    }
+    else {// process should be gone
+      assertEquals(false, p.isAlive());
+    }
     // Not able to join thread sometimes when forking with large N.
     try {
       t.join(2000);