Browse Source

commit 46247fca6d3a0fe6e475109a6ccfdc95af454388
Author: Yahoo! <ltucker@yahoo-inc.com>
Date: Tue Aug 18 09:22:17 2009 -0700

Apply Patch for MAPREDUCE:478 from: http://issues.apache.org/jira/secure/attachment/12416638/MAPREDUCE-478_3_20090814_yhadoop.patch


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security-patches@1076978 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley 14 years ago
parent
commit
4c8c02aab4

+ 12 - 3
src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleCachefiles.java

@@ -76,9 +76,18 @@ public class TestMultipleCachefiles extends TestCase
           "-jobconf", strNamenode,
           "-jobconf", strJobtracker,
           "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp"),
-          "-jobconf", "mapred.child.java.opts=-Dcontrib.name=" + System.getProperty("contrib.name") + " " +
-                      "-Dbuild.test=" + System.getProperty("build.test") + " " +
-                      conf.get("mapred.child.java.opts",""),
+          "-jobconf", 
+            JobConf.MAPRED_MAP_TASK_JAVA_OPTS + "=" +
+              "-Dcontrib.name=" + System.getProperty("contrib.name") + " " +
+              "-Dbuild.test=" + System.getProperty("build.test") + " " +
+              conf.get(JobConf.MAPRED_MAP_TASK_JAVA_OPTS, 
+                       conf.get(JobConf.MAPRED_TASK_JAVA_OPTS, "")),
+          "-jobconf", 
+            JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS + "=" +
+              "-Dcontrib.name=" + System.getProperty("contrib.name") + " " +
+              "-Dbuild.test=" + System.getProperty("build.test") + " " +
+              conf.get(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS, 
+                       conf.get(JobConf.MAPRED_TASK_JAVA_OPTS, "")),
           "-cacheFile", "hdfs://"+fileSys.getName()+CACHE_FILE + "#" + mapString,
           "-cacheFile", "hdfs://"+fileSys.getName()+CACHE_FILE_2 + "#" + mapString2
         };

+ 12 - 3
src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java

@@ -73,9 +73,18 @@ public class TestSymLink extends TestCase
           "-jobconf", strNamenode,
           "-jobconf", strJobtracker,
           "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp"),
-          "-jobconf", "mapred.child.java.opts=-Dcontrib.name=" + System.getProperty("contrib.name") + " " +
-                      "-Dbuild.test=" + System.getProperty("build.test") + " " +
-                      conf.get("mapred.child.java.opts",""),
+          "-jobconf", 
+            JobConf.MAPRED_MAP_TASK_JAVA_OPTS + "=" +
+              "-Dcontrib.name=" + System.getProperty("contrib.name") + " " +
+              "-Dbuild.test=" + System.getProperty("build.test") + " " +
+              conf.get(JobConf.MAPRED_MAP_TASK_JAVA_OPTS, 
+                       conf.get(JobConf.MAPRED_TASK_JAVA_OPTS, "")),
+          "-jobconf", 
+            JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS + "=" +
+              "-Dcontrib.name=" + System.getProperty("contrib.name") + " " +
+              "-Dbuild.test=" + System.getProperty("build.test") + " " +
+              conf.get(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS, 
+                       conf.get(JobConf.MAPRED_TASK_JAVA_OPTS, "")),
           "-cacheFile", "hdfs://"+fileSys.getName()+CACHE_FILE + "#testlink"
         };
 

+ 2 - 1
src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java

@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.hadoop.mapred.TestMiniMRWithDFS;
 
@@ -54,7 +55,7 @@ public class TestUlimit extends TestCase {
       "-reducer", "org.apache.hadoop.mapred.lib.IdentityReducer",
       "-numReduceTasks", "0",
       "-jobconf", "mapred.map.tasks=1",
-      "-jobconf", "mapred.child.ulimit=" + memLimit,
+      "-jobconf", JobConf.MAPRED_MAP_TASK_ULIMIT + "=" + memLimit,
       "-jobconf", "mapred.job.tracker=" + "localhost:" +
                                            mr.getJobTrackerPort(),
       "-jobconf", "fs.default.name=" + "hdfs://localhost:" 

+ 29 - 2
src/core/org/apache/hadoop/util/Shell.java

@@ -63,6 +63,31 @@ abstract public class Shell {
   /** If or not script timed out*/
   private AtomicBoolean timedOut;
 
+  /** a Unix command to get ulimit of a process. */
+  public static final String ULIMIT_COMMAND = "ulimit";
+  
+  /** 
+   * Get the Unix command for setting the maximum virtual memory available
+   * to a given child process. This is only relevant when we are forking a
+   * process from within the Mapper or the Reducer implementations.
+   * Also see Hadoop Pipes and Hadoop Streaming.
+   * 
+   * It also checks to ensure that we are running on a *nix platform else 
+   * (e.g. in Cygwin/Windows) it returns <code>null</code>.
+   * @param memoryLimit virtual memory limit
+   * @return a <code>String[]</code> with the ulimit command arguments or 
+   *         <code>null</code> if we are running on a non *nix platform or
+   *         if the limit is unspecified.
+   */
+  public static String[] getUlimitMemoryCommand(int memoryLimit) {
+    // ulimit isn't supported on Windows
+    if (WINDOWS) {
+      return null;
+    }
+    
+    return new String[] {ULIMIT_COMMAND, "-v", String.valueOf(memoryLimit)};
+  }
+  
   /** 
    * Get the Unix command for setting the maximum virtual memory available
    * to a given child process. This is only relevant when we are forking a
@@ -77,7 +102,9 @@ abstract public class Shell {
    * @return a <code>String[]</code> with the ulimit command arguments or 
    *         <code>null</code> if we are running on a non *nix platform or
    *         if the limit is unspecified.
+   * @deprecated Use {@link #getUlimitMemoryCommand(int)}
    */
+  @Deprecated
   public static String[] getUlimitMemoryCommand(Configuration conf) {
     // ulimit isn't supported on Windows
     if (WINDOWS) {
@@ -92,8 +119,8 @@ abstract public class Shell {
     
     // Parse it to ensure it is legal/sane
     int memoryLimit = Integer.valueOf(ulimit);
-
-    return new String[] {"ulimit", "-v", String.valueOf(memoryLimit)};
+    
+    return getUlimitMemoryCommand(memoryLimit);
   }
   
   /** Set to true on Windows platforms */

+ 20 - 4
src/docs/src/documentation/content/xdocs/cluster_setup.xml

@@ -392,10 +392,18 @@
                   </tr>
                   <tr>
                     <td>conf/mapred-site.xml</td>
-                    <td>mapred.child.java.opts</td>
+                    <td>mapred.map.child.java.opts</td>
                     <td>-Xmx512M</td>
                     <td>
-                      Larger heap-size for child jvms of maps/reduces. 
+                      Larger heap-size for child jvms of maps. 
+                    </td>
+                  </tr>
+                  <tr>
+                    <td>conf/mapred-site.xml</td>
+                    <td>mapred.reduce.child.java.opts</td>
+                    <td>-Xmx512M</td>
+                    <td>
+                      Larger heap-size for child jvms of reduces. 
                     </td>
                   </tr>
                   <tr>
@@ -465,9 +473,17 @@
                   </tr>
                   <tr>
                     <td>conf/mapred-site.xml</td>
-                    <td>mapred.child.java.opts</td>
+                    <td>mapred.map.child.java.opts</td>
+                    <td>-Xmx512M</td>
+                    <td>
+                      Larger heap-size for child jvms of maps. 
+                    </td>
+                  </tr>
+                  <tr>
+                    <td>conf/mapred-site.xml</td>
+                    <td>mapred.reduce.child.java.opts</td>
                     <td>-Xmx1024M</td>
-                    <td>Larger heap-size for child jvms of maps/reduces.</td>
+                    <td>Larger heap-size for child jvms of reduces.</td>
                   </tr>
                 </table>
               </li>

+ 31 - 16
src/docs/src/documentation/content/xdocs/mapred_tutorial.xml

@@ -1051,24 +1051,25 @@
         
         <p>The child-task inherits the environment of the parent 
         <code>TaskTracker</code>. The user can specify additional options to the
-        child-jvm via the <code>mapred.child.java.opts</code> configuration
-        parameter in the <code>JobConf</code> such as non-standard paths for the 
-        run-time linker to search shared libraries via 
+        child-jvm via the <code>mapred.{map|reduce}.child.java.opts</code> 
+        configuration parameter in the <code>JobConf</code> such as non-standard 
+         paths for the run-time linker to search shared libraries via 
         <code>-Djava.library.path=&lt;&gt;</code> etc. If the 
-        <code>mapred.child.java.opts</code> contains the symbol <em>@taskid@</em> 
-        it is interpolated with value of <code>taskid</code> of the map/reduce
-        task.</p>
+        <code>mapred.{map|reduce}.child.java.opts</code> parameters contain the 
+        symbol <em>@taskid@</em> it is interpolated with value of 
+        <code>taskid</code> of the map/reduce task.</p>
         
         <p>Here is an example with multiple arguments and substitutions, 
         showing jvm GC logging, and start of a passwordless JVM JMX agent so that
         it can connect with jconsole and the likes to watch child memory, 
         threads and get thread dumps. It also sets the maximum heap-size of the 
-        child jvm to 512MB and adds an additional path to the 
-        <code>java.library.path</code> of the child-jvm.</p>
+        map and reduce child jvm to 512MB &amp; 1024MB respectively. It also 
+        adds an additional path to the <code>java.library.path</code> of the 
+        child-jvm.</p>
 
         <p>
           <code>&lt;property&gt;</code><br/>
-          &nbsp;&nbsp;<code>&lt;name&gt;mapred.child.java.opts&lt;/name&gt;</code><br/>
+          &nbsp;&nbsp;<code>&lt;name&gt;mapred.map.child.java.opts&lt;/name&gt;</code><br/>
           &nbsp;&nbsp;<code>&lt;value&gt;</code><br/>
           &nbsp;&nbsp;&nbsp;&nbsp;<code>
                     -Xmx512M -Djava.library.path=/home/mycompany/lib
@@ -1080,19 +1081,33 @@
           <code>&lt;/property&gt;</code>
         </p>
         
+        <p>
+          <code>&lt;property&gt;</code><br/>
+          &nbsp;&nbsp;<code>&lt;name&gt;mapred.reduce.child.java.opts&lt;/name&gt;</code><br/>
+          &nbsp;&nbsp;<code>&lt;value&gt;</code><br/>
+          &nbsp;&nbsp;&nbsp;&nbsp;<code>
+                    -Xmx1024M -Djava.library.path=/home/mycompany/lib
+                    -verbose:gc -Xloggc:/tmp/@taskid@.gc</code><br/>
+          &nbsp;&nbsp;&nbsp;&nbsp;<code>
+                    -Dcom.sun.management.jmxremote.authenticate=false 
+                    -Dcom.sun.management.jmxremote.ssl=false</code><br/>
+          &nbsp;&nbsp;<code>&lt;/value&gt;</code><br/>
+          <code>&lt;/property&gt;</code>
+        </p>
+        
         <section>
         <title> Memory management</title>
         <p>Users/admins can also specify the maximum virtual memory 
         of the launched child-task, and any sub-process it launches 
-        recursively, using <code>mapred.child.ulimit</code>. Note that
-        the value set here is a per process limit.
-        The value for <code>mapred.child.ulimit</code> should be specified 
-        in kilo bytes (KB). And also the value must be greater than
+        recursively, using <code>mapred.{map|reduce}.child.ulimit</code>. Note 
+        that the value set here is a per process limit.
+        The value for <code>mapred.{map|reduce}.child.ulimit</code> should be 
+        specified in kilo bytes (KB). And also the value must be greater than
         or equal to the -Xmx passed to JavaVM, else the VM might not start. 
         </p>
         
-        <p>Note: <code>mapred.child.java.opts</code> are used only for 
-        configuring the launched child tasks from task tracker. Configuring 
+        <p>Note: <code>mapred.{map|reduce}.child.java.opts</code> are used only 
+        for configuring the launched child tasks from task tracker. Configuring 
         the memory options for daemons is documented in 
         <a href="cluster_setup.html#Configuring+the+Environment+of+the+Hadoop+Daemons">
         cluster_setup.html </a></p>
@@ -1229,7 +1244,7 @@
                 shuffle.</td></tr>
             <tr><td>mapred.job.shuffle.input.buffer.percent</td><td>float</td>
                 <td>The percentage of memory- relative to the maximum heapsize
-                as typically specified in <code>mapred.child.java.opts</code>-
+                as typically specified in <code>mapred.reduce.child.java.opts</code>-
                 that can be allocated to storing map outputs during the
                 shuffle. Though some memory should be set aside for the
                 framework, in general it is advantageous to set this high

+ 147 - 0
src/mapred/org/apache/hadoop/mapred/JobConf.java

@@ -157,6 +157,153 @@ public class JobConf extends Configuration {
   static final String MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY =
       "mapred.job.reduce.memory.mb";
 
+  /**
+   * Configuration key to set the java command line options for the child
+   * map and reduce tasks.
+   * 
+   * Java opts for the task tracker child processes.
+   * The following symbol, if present, will be interpolated: @taskid@. 
+   * It is replaced by current TaskID. Any other occurrences of '@' will go 
+   * unchanged.
+   * For example, to enable verbose gc logging to a file named for the taskid in
+   * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
+   *          -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
+   * 
+   * The configuration variable {@link #MAPRED_TASK_ULIMIT} can be used to 
+   * control the maximum virtual memory of the child processes.
+   * 
+   * The configuration variable {@link #MAPRED_TASK_ENV} can be used to pass 
+   * other environment variables to the child processes.
+   * 
+   * @deprecated Use {@link #MAPRED_MAP_TASK_JAVA_OPTS} or 
+   *                 {@link #MAPRED_REDUCE_TASK_JAVA_OPTS}
+   */
+  @Deprecated
+  public static final String MAPRED_TASK_JAVA_OPTS = "mapred.child.java.opts";
+  
+  /**
+   * Configuration key to set the java command line options for the map tasks.
+   * 
+   * Java opts for the task tracker child map processes.
+   * The following symbol, if present, will be interpolated: @taskid@. 
+   * It is replaced by current TaskID. Any other occurrences of '@' will go 
+   * unchanged.
+   * For example, to enable verbose gc logging to a file named for the taskid in
+   * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
+   *          -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
+   * 
+   * The configuration variable {@link #MAPRED_MAP_TASK_ULIMIT} can be used to 
+   * control the maximum virtual memory of the map processes.
+   * 
+   * The configuration variable {@link #MAPRED_MAP_TASK_ENV} can be used to pass 
+   * other environment variables to the map processes.
+   */
+  public static final String MAPRED_MAP_TASK_JAVA_OPTS = 
+    "mapred.map.child.java.opts";
+  
+  /**
+   * Configuration key to set the java command line options for the reduce tasks.
+   * 
+   * Java opts for the task tracker child reduce processes.
+   * The following symbol, if present, will be interpolated: @taskid@. 
+   * It is replaced by current TaskID. Any other occurrences of '@' will go 
+   * unchanged.
+   * For example, to enable verbose gc logging to a file named for the taskid in
+   * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
+   *          -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
+   * 
+   * The configuration variable {@link #MAPRED_REDUCE_TASK_ULIMIT} can be used  
+   * to control the maximum virtual memory of the reduce processes.
+   * 
+   * The configuration variable {@link #MAPRED_REDUCE_TASK_ENV} can be used to 
+   * pass process environment variables to the reduce processes.
+   */
+  public static final String MAPRED_REDUCE_TASK_JAVA_OPTS = 
+    "mapred.reduce.child.java.opts";
+  
+  public static final String DEFAULT_MAPRED_TASK_JAVA_OPTS = "-Xmx200m";
+  
+  /**
+   * Configuration key to set the maximum virtual memory available to the child
+   * map and reduce tasks (in kilo-bytes).
+   * 
+   * Note: This must be greater than or equal to the -Xmx passed to the JavaVM
+   *       via {@link #MAPRED_TASK_JAVA_OPTS}, else the VM might not start.
+   * 
+   * @deprecated Use {@link #MAPRED_MAP_TASK_ULIMIT} or 
+   *                 {@link #MAPRED_REDUCE_TASK_ULIMIT}
+   */
+  @Deprecated
+  public static final String MAPRED_TASK_ULIMIT = "mapred.child.ulimit";
+
+  /**
+   * Configuration key to set the maximum virtual memory available to the
+   * map tasks (in kilo-bytes).
+   * 
+   * Note: This must be greater than or equal to the -Xmx passed to the JavaVM
+   *       via {@link #MAPRED_MAP_TASK_JAVA_OPTS}, else the VM might not start.
+   */
+  public static final String MAPRED_MAP_TASK_ULIMIT = "mapred.map.child.ulimit";
+  
+  /**
+   * Configuration key to set the maximum virtual memory available to the
+   * reduce tasks (in kilo-bytes).
+   * 
+   * Note: This must be greater than or equal to the -Xmx passed to the JavaVM
+   *       via {@link #MAPRED_REDUCE_TASK_JAVA_OPTS}, else the VM might not start.
+   */
+  public static final String MAPRED_REDUCE_TASK_ULIMIT =
+    "mapred.reduce.child.ulimit";
+
+  /**
+   * Configuration key to set the environment of the child map/reduce tasks.
+   * 
+   * The format of the value is <code>k1=v1,k2=v2</code>. Further it can 
+   * reference existing environment variables via <code>$key</code>.
+   * 
+   * Example:
+   * <ul>
+   *   <li> A=foo - This will set the env variable A to foo. </li>
+   *   <li> B=$X:c This will inherit the tasktracker's X env variable. </li>
+   * </ul>
+   * 
+   * @deprecated Use {@link #MAPRED_MAP_TASK_ENV} or 
+   *                 {@link #MAPRED_REDUCE_TASK_ENV}
+   */
+  @Deprecated
+  public static final String MAPRED_TASK_ENV = "mapred.child.env";
+
+  /**
+   * Configuration key to set the environment of the
+   * map tasks.
+   * 
+   * The format of the value is <code>k1=v1,k2=v2</code>. Further it can 
+   * reference existing environment variables via <code>$key</code>.
+   * 
+   * Example:
+   * <ul>
+   *   <li> A=foo - This will set the env variable A to foo. </li>
+   *   <li> B=$X:c This will inherit the tasktracker's X env variable. </li>
+   * </ul>
+   */
+  public static final String MAPRED_MAP_TASK_ENV = "mapred.map.child.env";
+  
+  /**
+   * Configuration key to set the environment of the
+   * reduce tasks.
+   * 
+   * The format of the value is <code>k1=v1,k2=v2</code>. Further it can 
+   * reference existing environment variables via <code>$key</code>.
+   * 
+   * Example:
+   * <ul>
+   *   <li> A=foo - This will set the env variable A to foo. </li>
+   *   <li> B=$X:c This will inherit the tasktracker's X env variable. </li>
+   * </ul>
+   */
+  public static final String MAPRED_REDUCE_TASK_ENV =
+    "mapred.reduce.child.env";
+
   /**
    * Construct a map/reduce job configuration.
    */

+ 20 - 1
src/mapred/org/apache/hadoop/mapred/MapTaskRunner.java

@@ -23,7 +23,7 @@ import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
 
 /** Runs a map task. */
 class MapTaskRunner extends TaskRunner {
-
+  
   public MapTaskRunner(TaskInProgress task, TaskTracker tracker, JobConf conf) {
     super(task, tracker, conf);
   }
@@ -43,4 +43,23 @@ class MapTaskRunner extends TaskRunner {
     LOG.info(getTask()+" done; removing files.");
     mapOutputFile.removeAll(getTask().getTaskID());
   }
+
+  @Override
+  public String getChildJavaOpts(JobConf jobConf, String defaultValue) {
+    return jobConf.get(JobConf.MAPRED_MAP_TASK_JAVA_OPTS, 
+                       super.getChildJavaOpts(jobConf, 
+                           JobConf.DEFAULT_MAPRED_TASK_JAVA_OPTS));
+  }
+  
+  @Override
+  public int getChildUlimit(JobConf jobConf) {
+    return jobConf.getInt(JobConf.MAPRED_MAP_TASK_ULIMIT, 
+                          super.getChildUlimit(jobConf));
+  }
+
+  @Override
+  public String getChildEnv(JobConf jobConf) {
+    return jobConf.get(JobConf.MAPRED_MAP_TASK_ENV, super.getChildEnv(jobConf));
+  }
+
 }

+ 21 - 1
src/mapred/org/apache/hadoop/mapred/ReduceTaskRunner.java

@@ -23,7 +23,7 @@ import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
 
 /** Runs a reduce task. */
 class ReduceTaskRunner extends TaskRunner {
-  
+
   public ReduceTaskRunner(TaskInProgress task, TaskTracker tracker, 
                           JobConf conf) throws IOException {
     
@@ -48,4 +48,24 @@ class ReduceTaskRunner extends TaskRunner {
     getTask().getProgress().setStatus("closed");
     mapOutputFile.removeAll(getTask().getTaskID());
   }
+
+  @Override
+  public String getChildJavaOpts(JobConf jobConf, String defaultValue) {
+    return jobConf.get(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS, 
+                       super.getChildJavaOpts(jobConf, 
+                           JobConf.DEFAULT_MAPRED_TASK_JAVA_OPTS));
+  }
+ 
+  @Override
+  public int getChildUlimit(JobConf jobConf) {
+    return jobConf.getInt(JobConf.MAPRED_REDUCE_TASK_ULIMIT, 
+                          super.getChildUlimit(jobConf));
+  }
+
+  @Override
+  public String getChildEnv(JobConf jobConf) {
+    return jobConf.get(JobConf.MAPRED_REDUCE_TASK_ENV, 
+                       super.getChildEnv(jobConf));
+  }
+
 }

+ 58 - 8
src/mapred/org/apache/hadoop/mapred/TaskRunner.java

@@ -99,6 +99,47 @@ abstract class TaskRunner extends Thread {
     return str.toString();
   }
   
+  /**
+   * Get the java command line options for the child map/reduce tasks.
+   * @param jobConf job configuration
+   * @param defaultValue default value
+   * @return the java command line options for child map/reduce tasks
+   * @deprecated Use command line options specific to map or reduce tasks set 
+   *             via {@link JobConf#MAPRED_MAP_TASK_JAVA_OPTS} or 
+   *             {@link JobConf#MAPRED_REDUCE_TASK_JAVA_OPTS}
+   */
+  @Deprecated
+  public String getChildJavaOpts(JobConf jobConf, String defaultValue) {
+    return jobConf.get(JobConf.MAPRED_TASK_JAVA_OPTS, defaultValue);
+  }
+  
+  /**
+   * Get the maximum virtual memory of the child map/reduce tasks.
+   * @param jobConf job configuration
+   * @return the maximum virtual memory of the child task or <code>-1</code> if
+   *         none is specified
+   * @deprecated Use limits specific to the map or reduce tasks set via
+   *             {@link JobConf#MAPRED_MAP_TASK_ULIMIT} or
+   *             {@link JobConf#MAPRED_REDUCE_TASK_ULIMIT} 
+   */
+  @Deprecated
+  public int getChildUlimit(JobConf jobConf) {
+    return jobConf.getInt(JobConf.MAPRED_TASK_ULIMIT, -1);
+  }
+  
+  /**
+   * Get the environment variables for the child map/reduce tasks.
+   * @param jobConf job configuration
+   * @return the environment variables for the child map/reduce tasks or
+   *         <code>null</code> if unspecified
+   * @deprecated Use environment variables specific to the map or reduce tasks
+   *             set via {@link JobConf#MAPRED_MAP_TASK_ENV} or
+   *             {@link JobConf#MAPRED_REDUCE_TASK_ENV}
+   */
+  public String getChildEnv(JobConf jobConf) {
+    return jobConf.get(JobConf.MAPRED_TASK_ENV);
+  }
+  
   @Override
   public final void run() {
     String errorInfo = "Child Error";
@@ -273,8 +314,8 @@ abstract class TaskRunner extends Thread {
 
       // Add child (task) java-vm options.
       //
-      // The following symbols if present in mapred.child.java.opts value are
-      // replaced:
+      // The following symbols if present in mapred.{map|reduce}.child.java.opts 
+      // value are replaced:
       // + @taskid@ is interpolated with value of TaskID.
       // Other occurrences of @ will not be altered.
       //
@@ -284,14 +325,23 @@ abstract class TaskRunner extends Thread {
       // and get thread dumps.
       //
       //  <property>
-      //    <name>mapred.child.java.opts</name>
-      //    <value>-verbose:gc -Xloggc:/tmp/@taskid@.gc \
+      //    <name>mapred.map.child.java.opts</name>
+      //    <value>-Xmx512M -verbose:gc -Xloggc:/tmp/@taskid@.gc \
+      //           -Dcom.sun.management.jmxremote.authenticate=false \
+      //           -Dcom.sun.management.jmxremote.ssl=false \
+      //    </value>
+      //  </property>
+      //
+      //  <property>
+      //    <name>mapred.reduce.child.java.opts</name>
+      //    <value>-Xmx1024M -verbose:gc -Xloggc:/tmp/@taskid@.gc \
       //           -Dcom.sun.management.jmxremote.authenticate=false \
       //           -Dcom.sun.management.jmxremote.ssl=false \
       //    </value>
       //  </property>
       //
-      String javaOpts = conf.get("mapred.child.java.opts", "-Xmx200m");
+      String javaOpts = getChildJavaOpts(conf, 
+                                         JobConf.DEFAULT_MAPRED_TASK_JAVA_OPTS);
       javaOpts = javaOpts.replace("@taskid@", taskid.toString());
       String [] javaOptsSplit = javaOpts.split(" ");
       
@@ -302,7 +352,7 @@ abstract class TaskRunner extends Thread {
       // 2. We also add the 'cwd' of the task to it's java.library.path to help 
       //    users distribute native libraries via the DistributedCache.
       // 3. The user can also specify extra paths to be added to the 
-      //    java.library.path via mapred.child.java.opts.
+      //    java.library.path via mapred.{map|reduce}.child.java.opts.
       //
       String libraryPath = System.getProperty("java.library.path");
       if (libraryPath == null) {
@@ -372,7 +422,7 @@ abstract class TaskRunner extends Thread {
       tracker.addToMemoryManager(t.getTaskID(), t.isMapTask(), conf);
 
       // set memory limit using ulimit if feasible and necessary ...
-      String[] ulimitCmd = Shell.getUlimitMemoryCommand(conf);
+      String[] ulimitCmd = Shell.getUlimitMemoryCommand(getChildUlimit(conf));
       List<String> setup = null;
       if (ulimitCmd != null) {
         setup = new ArrayList<String>();
@@ -399,7 +449,7 @@ abstract class TaskRunner extends Thread {
       env.put("LD_LIBRARY_PATH", ldLibraryPath.toString());
       
       // add the env variables passed by the user
-      String mapredChildEnv = conf.get("mapred.child.env");
+      String mapredChildEnv = getChildEnv(conf);
       if (mapredChildEnv != null && mapredChildEnv.length() > 0) {
         String childEnvs[] = mapredChildEnv.split(",");
         for (String cEnv : childEnvs) {

+ 167 - 46
src/test/org/apache/hadoop/mapred/TestMiniMRChildTask.java

@@ -18,6 +18,8 @@
 package org.apache.hadoop.mapred;
 
 import java.io.*;
+import java.util.Iterator;
+
 import junit.framework.TestCase;
 
 import org.apache.commons.logging.Log;
@@ -42,6 +44,11 @@ public class TestMiniMRChildTask extends TestCase {
   private static final Log LOG =
     LogFactory.getLog(TestMiniMRChildTask.class.getName());
 
+  private final static String OLD_CONFIGS = "test.old.configs";
+  private final static String TASK_OPTS_VAL = "-Xmx200m";
+  private final static String MAP_OPTS_VAL = "-Xmx200m";
+  private final static String REDUCE_OPTS_VAL = "-Xmx300m";
+
   private MiniMRCluster mr;
   private MiniDFSCluster dfs;
   private FileSystem fileSys;
@@ -85,7 +92,8 @@ public class TestMiniMRChildTask extends TestCase {
 
   // configure a job
   private void configure(JobConf conf, Path inDir, Path outDir, String input,
-                         Class<? extends Mapper> map) 
+                         Class<? extends Mapper> map, 
+                         Class<? extends Reducer> reduce) 
   throws IOException {
     // set up the input file system and write input text.
     FileSystem inFs = inDir.getFileSystem(conf);
@@ -104,7 +112,7 @@ public class TestMiniMRChildTask extends TestCase {
     // configure the mapred Job which creates a tempfile in map.
     conf.setJobName("testmap");
     conf.setMapperClass(map);
-    conf.setReducerClass(IdentityReducer.class);
+    conf.setReducerClass(reduce);
     conf.setNumMapTasks(1);
     conf.setNumReduceTasks(0);
     FileInputFormat.setInputPaths(conf, inDir);
@@ -127,7 +135,8 @@ public class TestMiniMRChildTask extends TestCase {
                          Path outDir,
                          String input)
   throws IOException {
-    configure(conf, inDir, outDir, input, MapClass.class);
+    configure(conf, inDir, outDir, input, 
+              MapClass.class, IdentityReducer.class);
 
     FileSystem outFs = outDir.getFileSystem(conf);
     
@@ -147,16 +156,52 @@ public class TestMiniMRChildTask extends TestCase {
     outFs.delete(outDir, true);
   }
 
+  private static void checkEnv(String envName, String expValue, String mode) {
+    String envValue = System.getenv(envName).trim();
+    if ("append".equals(mode)) {
+      if (envValue == null || !envValue.contains(":")) {
+        throw new RuntimeException("Missing env variable");
+      } else {
+        String parts[] = envValue.split(":");
+        // check if the value is appended
+        if (!parts[parts.length - 1].equals(expValue)) {
+          throw new RuntimeException("Wrong env variable in append mode");
+        }
+      }
+    } else {
+      if (envValue == null || !envValue.equals(expValue)) {
+        throw new RuntimeException("Wrong env variable in noappend mode");
+      }
+    }
+  }
+
   // Mappers that simply checks if the desired user env are present or not
   static class EnvCheckMapper extends MapReduceBase implements
       Mapper<WritableComparable, Writable, WritableComparable, Writable> {
-    private static String PATH;
     
-    public void map(WritableComparable key, Writable value,
-        OutputCollector<WritableComparable, Writable> out, Reporter reporter)
-        throws IOException {
+    public void configure(JobConf job) {
+      boolean oldConfigs = job.getBoolean(OLD_CONFIGS, false);
+      if (oldConfigs) {
+        String javaOpts = job.get(JobConf.MAPRED_TASK_JAVA_OPTS);
+        assertNotNull(JobConf.MAPRED_TASK_JAVA_OPTS + " is null!", 
+                      javaOpts);
+        assertEquals(JobConf.MAPRED_TASK_JAVA_OPTS + " has value of: " + 
+                     javaOpts, 
+                     javaOpts, TASK_OPTS_VAL);
+      } else {
+        String mapJavaOpts = job.get(JobConf.MAPRED_MAP_TASK_JAVA_OPTS);
+        assertNotNull(JobConf.MAPRED_MAP_TASK_JAVA_OPTS + " is null!", 
+                      mapJavaOpts);
+        assertEquals(JobConf.MAPRED_MAP_TASK_JAVA_OPTS + " has value of: " + 
+                     mapJavaOpts, 
+                     mapJavaOpts, MAP_OPTS_VAL);
+      }
+
+      String path = job.get("path");
+      
       // check if the pwd is there in LD_LIBRARY_PATH
       String pwd = System.getenv("PWD");
+      
       assertTrue("LD doesnt contain pwd", 
                  System.getenv("LD_LIBRARY_PATH").contains(pwd));
       
@@ -170,34 +215,69 @@ public class TestMiniMRChildTask extends TestCase {
       checkEnv("NEW_PATH", ":/tmp", "noappend");
       // check if X=$(tt's X var):/tmp for an old env variable inherited from 
       // the tt
-      checkEnv("PATH",  PATH + ":/tmp", "noappend");
+      checkEnv("PATH",  path + ":/tmp", "noappend");
     }
 
-    private void checkEnv(String envName, String expValue, String mode) 
-    throws IOException {
-      String envValue = System.getenv(envName).trim();
-      if ("append".equals(mode)) {
-        if (envValue == null || !envValue.contains(":")) {
-          throw new  IOException("Missing env variable");
-        } else {
-          String parts[] = envValue.split(":");
-          // check if the value is appended
-          if (!parts[parts.length - 1].equals(expValue)) {
-            throw new  IOException("Wrong env variable in append mode");
-          }
-        }
+    public void map(WritableComparable key, Writable value,
+                    OutputCollector<WritableComparable, Writable> out, 
+                    Reporter reporter)
+        throws IOException {
+    }
+  }
+
+  static class EnvCheckReducer extends MapReduceBase 
+  implements Reducer<WritableComparable, Writable, WritableComparable, Writable> {
+    
+    @Override
+    public void configure(JobConf job) {
+      boolean oldConfigs = job.getBoolean(OLD_CONFIGS, false);
+      if (oldConfigs) {
+        String javaOpts = job.get(JobConf.MAPRED_TASK_JAVA_OPTS);
+        assertNotNull(JobConf.MAPRED_TASK_JAVA_OPTS + " is null!", 
+                      javaOpts);
+        assertEquals(JobConf.MAPRED_TASK_JAVA_OPTS + " has value of: " + 
+                     javaOpts, 
+                     TASK_OPTS_VAL, javaOpts);
       } else {
-        if (envValue == null || !envValue.equals(expValue)) {
-          throw new  IOException("Wrong env variable in noappend mode");
-        }
+        String reduceJavaOpts = job.get(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS);
+        assertNotNull(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS + " is null!", 
+                      reduceJavaOpts);
+        assertEquals(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS + " has value of: " + 
+                     reduceJavaOpts, 
+                     REDUCE_OPTS_VAL, reduceJavaOpts);
       }
+
+      String path = job.get("path");
+      
+      // check if the pwd is there in LD_LIBRARY_PATH
+      String pwd = System.getenv("PWD");
+      
+      assertTrue("LD doesnt contain pwd", 
+                 System.getenv("LD_LIBRARY_PATH").contains(pwd));
+      
+      // check if X=$X:/abc works for LD_LIBRARY_PATH
+      checkEnv("LD_LIBRARY_PATH", "/tmp", "append");
+      // check if X=/tmp works for an already existing parameter
+      checkEnv("HOME", "/tmp", "noappend");
+      // check if X=/tmp for a new env variable
+      checkEnv("MY_PATH", "/tmp", "noappend");
+      // check if X=$X:/tmp works for a new env var and results into :/tmp
+      checkEnv("NEW_PATH", ":/tmp", "noappend");
+      // check if X=$(tt's X var):/tmp for an old env variable inherited from 
+      // the tt
+      checkEnv("PATH",  path + ":/tmp", "noappend");
+
     }
-    
-    public void configure(JobConf conf) {
-      PATH = conf.get("path");
+
+    @Override
+    public void reduce(WritableComparable key, Iterator<Writable> values,
+                       OutputCollector<WritableComparable, Writable> output, 
+                       Reporter reporter)
+        throws IOException {
     }
+    
   }
-
+  
   @Override
   public void setUp() {
     try {
@@ -265,28 +345,11 @@ public class TestMiniMRChildTask extends TestCase {
   public void testTaskEnv(){
     try {
       JobConf conf = mr.createJobConf();
-      
       // initialize input, output directories
       Path inDir = new Path("testing/wc/input1");
       Path outDir = new Path("testing/wc/output1");
-      String input = "The input";
-      
-      configure(conf, inDir, outDir, input, EnvCheckMapper.class);
-
       FileSystem outFs = outDir.getFileSystem(conf);
-      
-      // test 
-      //  - new SET of new var (MY_PATH)
-      //  - set of old var (HOME)
-      //  - append to an old var from modified env (LD_LIBRARY_PATH)
-      //  - append to an old var from tt's env (PATH)
-      //  - append to a new var (NEW_PATH)
-      conf.set("mapred.child.env", 
-               "MY_PATH=/tmp,HOME=/tmp,LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/tmp,"
-               + "PATH=$PATH:/tmp,NEW_PATH=$NEW_PATH:/tmp");
-      conf.set("path", System.getenv("PATH"));
-
-      JobClient.runJob(conf);
+      runTestTaskEnv(conf, inDir, outDir, false);
       outFs.delete(outDir, true);
     } catch(Exception e) {
       e.printStackTrace();
@@ -294,4 +357,62 @@ public class TestMiniMRChildTask extends TestCase {
       tearDown();
     }
   }
+  
+  /**
+   * Test to test if the user set *old* env variables reflect in the child
+   * processes. Mainly
+   *   - x=y (x can be a already existing env variable or a new variable)
+   *   - x=$x:y (replace $x with the current value of x)
+   */
+  public void testTaskOldEnv(){
+    try {
+      JobConf conf = mr.createJobConf();
+      // initialize input, output directories
+      Path inDir = new Path("testing/wc/input1");
+      Path outDir = new Path("testing/wc/output1");
+      FileSystem outFs = outDir.getFileSystem(conf);
+      runTestTaskEnv(conf, inDir, outDir, true);
+      outFs.delete(outDir, true);
+    } catch(Exception e) {
+      e.printStackTrace();
+      fail("Exception in testing child env");
+      tearDown();
+    }
+  }
+  
+  void runTestTaskEnv(JobConf conf, Path inDir, Path outDir, boolean oldConfigs) 
+  throws IOException {
+    String input = "The input";
+    configure(conf, inDir, outDir, input, 
+              EnvCheckMapper.class, EnvCheckReducer.class);
+    // test 
+    //  - new SET of new var (MY_PATH)
+    //  - set of old var (HOME)
+    //  - append to an old var from modified env (LD_LIBRARY_PATH)
+    //  - append to an old var from tt's env (PATH)
+    //  - append to a new var (NEW_PATH)
+    String mapTaskEnvKey = JobConf.MAPRED_MAP_TASK_ENV;
+    String reduceTaskEnvKey = JobConf.MAPRED_REDUCE_TASK_ENV;
+    String mapTaskJavaOptsKey = JobConf.MAPRED_MAP_TASK_JAVA_OPTS;
+    String reduceTaskJavaOptsKey = JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS;
+    String mapTaskJavaOpts = MAP_OPTS_VAL;
+    String reduceTaskJavaOpts = REDUCE_OPTS_VAL;
+    conf.setBoolean(OLD_CONFIGS, oldConfigs);
+    if (oldConfigs) {
+      mapTaskEnvKey = reduceTaskEnvKey = JobConf.MAPRED_TASK_ENV;
+      mapTaskJavaOptsKey = reduceTaskJavaOptsKey = JobConf.MAPRED_TASK_JAVA_OPTS;
+      mapTaskJavaOpts = reduceTaskJavaOpts = TASK_OPTS_VAL;
+    }
+    conf.set(mapTaskEnvKey, 
+             "MY_PATH=/tmp,HOME=/tmp,LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/tmp," +
+             "PATH=$PATH:/tmp,NEW_PATH=$NEW_PATH:/tmp");
+    conf.set(reduceTaskEnvKey, 
+             "MY_PATH=/tmp,HOME=/tmp,LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/tmp," +
+             "PATH=$PATH:/tmp,NEW_PATH=$NEW_PATH:/tmp");
+    conf.set("path", System.getenv("PATH"));
+    conf.set(mapTaskJavaOptsKey, mapTaskJavaOpts);
+    conf.set(reduceTaskJavaOptsKey, reduceTaskJavaOpts);
+    RunningJob job = JobClient.runJob(conf);
+    assertTrue("The environment checker job failed.", job.isSuccessful());
+  }
 }

+ 1 - 1
src/test/org/apache/hadoop/mapred/TestReduceFetch.java

@@ -121,7 +121,7 @@ public class TestReduceFetch extends TestCase {
     job.set("mapred.job.reduce.input.buffer.percent", "1.0");
     job.setInt("mapred.reduce.parallel.copies", 1);
     job.setInt("io.sort.mb", 10);
-    job.set("mapred.child.java.opts", "-Xmx128m");
+    job.set(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS, "-Xmx128m");
     job.set("mapred.job.shuffle.input.buffer.percent", "0.14");
     job.setNumTasksToExecutePerJvm(1);
     job.set("mapred.job.shuffle.merge.percent", "1.0");

+ 314 - 0
src/test/org/apache/hadoop/mapreduce/MapReduceTestUtil.java

@@ -0,0 +1,314 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.BufferedReader;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.text.NumberFormat;
+import java.util.Iterator;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.OutputLogFilter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+/**
+ * Utility methods used in various Job Control unit tests.
+ */
+public class MapReduceTestUtil {
+  public static final Log LOG = 
+    LogFactory.getLog(MapReduceTestUtil.class.getName());
+
+  static private Random rand = new Random();
+
+  private static NumberFormat idFormat = NumberFormat.getInstance();
+
+  static {
+    idFormat.setMinimumIntegerDigits(4);
+    idFormat.setGroupingUsed(false);
+  }
+
+  /**
+   * Cleans the data from the passed Path in the passed FileSystem.
+   * 
+   * @param fs FileSystem to delete data from.
+   * @param dirPath Path to be deleted.
+   * @throws IOException If an error occurs cleaning the data.
+   */
+  public static void cleanData(FileSystem fs, Path dirPath) 
+      throws IOException {
+    fs.delete(dirPath, true);
+  }
+
+  /**
+   * Generates a string of random digits.
+   * 
+   * @return A random string.
+   */
+  public static String generateRandomWord() {
+    return idFormat.format(rand.nextLong());
+  }
+
+  /**
+   * Generates a line of random text.
+   * 
+   * @return A line of random text.
+   */
+  public static String generateRandomLine() {
+    long r = rand.nextLong() % 7;
+    long n = r + 20;
+    StringBuffer sb = new StringBuffer();
+    for (int i = 0; i < n; i++) {
+      sb.append(generateRandomWord()).append(" ");
+    }
+    sb.append("\n");
+    return sb.toString();
+  }
+
+  /**
+   * Generates random data consisting of 10000 lines.
+   * 
+   * @param fs FileSystem to create data in.
+   * @param dirPath Path to create the data in.
+   * @throws IOException If an error occurs creating the data.
+   */
+  public static void generateData(FileSystem fs, Path dirPath) 
+      throws IOException {
+    FSDataOutputStream out = fs.create(new Path(dirPath, "data.txt"));
+    for (int i = 0; i < 10000; i++) {
+      String line = generateRandomLine();
+      out.write(line.getBytes("UTF-8"));
+    }
+    out.close();
+  }
+
+  /**
+   * Creates a simple copy job.
+   * 
+   * @param conf Configuration object
+   * @param outdir Output directory.
+   * @param indirs Comma separated input directories.
+   * @return Job initialized for a data copy job.
+   * @throws Exception If an error occurs creating job configuration.
+   */
+  public static Job createCopyJob(Configuration conf, Path outdir, 
+      Path... indirs) throws Exception {
+    conf.setInt("mapred.map.tasks", 3);
+    Job theJob = new Job(conf);
+    theJob.setJobName("DataMoveJob");
+
+    FileInputFormat.setInputPaths(theJob, indirs);
+    theJob.setMapperClass(DataCopyMapper.class);
+    FileOutputFormat.setOutputPath(theJob, outdir);
+    theJob.setOutputKeyClass(Text.class);
+    theJob.setOutputValueClass(Text.class);
+    theJob.setReducerClass(DataCopyReducer.class);
+    theJob.setNumReduceTasks(1);
+    return theJob;
+  }
+
+  /**
+   * Creates a simple fail job.
+   * 
+   * @param conf Configuration object
+   * @param outdir Output directory.
+   * @param indirs Comma separated input directories.
+   * @return Job initialized for a simple fail job.
+   * @throws Exception If an error occurs creating job configuration.
+   */
+  public static Job createFailJob(Configuration conf, Path outdir, 
+      Path... indirs) throws Exception {
+
+    conf.setInt("mapred.map.max.attempts", 2);
+    Job theJob = new Job(conf);
+    theJob.setJobName("Fail-Job");
+
+    FileInputFormat.setInputPaths(theJob, indirs);
+    theJob.setMapperClass(FailMapper.class);
+    theJob.setReducerClass(Reducer.class);
+    theJob.setNumReduceTasks(0);
+    FileOutputFormat.setOutputPath(theJob, outdir);
+    theJob.setOutputKeyClass(Text.class);
+    theJob.setOutputValueClass(Text.class);
+    return theJob;
+  }
+
+  /**
+   * Creates a simple fail job.
+   * 
+   * @param conf Configuration object
+   * @param outdir Output directory.
+   * @param indirs Comma separated input directories.
+   * @return Job initialized for a simple kill job.
+   * @throws Exception If an error occurs creating job configuration.
+   */
+  public static Job createKillJob(Configuration conf, Path outdir, 
+      Path... indirs) throws Exception {
+
+    Job theJob = new Job(conf);
+    theJob.setJobName("Kill-Job");
+
+    FileInputFormat.setInputPaths(theJob, indirs);
+    theJob.setMapperClass(KillMapper.class);
+    theJob.setReducerClass(Reducer.class);
+    theJob.setNumReduceTasks(0);
+    FileOutputFormat.setOutputPath(theJob, outdir);
+    theJob.setOutputKeyClass(Text.class);
+    theJob.setOutputValueClass(Text.class);
+    return theJob;
+  }
+
+  /**
+   * Simple Mapper and Reducer implementation which copies data it reads in.
+   */
+  public static class DataCopyMapper extends 
+      Mapper<LongWritable, Text, Text, Text> {
+    public void map(LongWritable key, Text value, Context context) 
+        throws IOException, InterruptedException {
+      context.write(new Text(key.toString()), value);
+    }
+  }
+
+  public static class DataCopyReducer extends Reducer<Text, Text, Text, Text> {
+    // Note: the new-API Reducer contract is reduce(KEYIN, Iterable<VALUEIN>, Context);
+    // an Iterator parameter would not override it and would never be invoked.
+    public void reduce(Text key, Iterable<Text> values, Context context)
+    throws IOException, InterruptedException {
+      Text dumbKey = new Text("");
+      for (Text data : values) {
+        context.write(dumbKey, data);
+      }
+    }
+  }
+
+  // Mapper that fails
+  public static class FailMapper extends 
+    Mapper<WritableComparable<?>, Writable, WritableComparable<?>, Writable> {
+
+    public void map(WritableComparable<?> key, Writable value, Context context)
+        throws IOException {
+      throw new RuntimeException("failing map");
+    }
+  }
+
+  // Mapper that sleeps for a long time.
+  // Used for running a job that will be killed
+  public static class KillMapper extends 
+    Mapper<WritableComparable<?>, Writable, WritableComparable<?>, Writable> {
+
+    public void map(WritableComparable<?> key, Writable value, Context context)
+        throws IOException {
+      try {
+        Thread.sleep(1000000);
+      } catch (InterruptedException e) {
+        // Do nothing
+      }
+    }
+  }
+
+  public static Job createJob(Configuration conf, Path inDir, Path outDir, 
+      int numInputFiles, int numReds) throws IOException {
+    String input = "The quick brown fox\n" + "has many silly\n"
+      + "red fox sox\n";
+    return createJob(conf, inDir, outDir, numInputFiles, numReds, input);
+  }
+
+  public static Job createJob(Configuration conf, Path inDir, Path outDir, 
+      int numInputFiles, int numReds, String input) throws IOException {
+    Job job = new Job(conf);
+    FileSystem fs = FileSystem.get(conf);
+    if (fs.exists(outDir)) {
+      fs.delete(outDir, true);
+    }
+    if (fs.exists(inDir)) {
+      fs.delete(inDir, true);
+    }
+    fs.mkdirs(inDir);
+    for (int i = 0; i < numInputFiles; ++i) {
+      DataOutputStream file = fs.create(new Path(inDir, "part-" + i));
+      file.writeBytes(input);
+      file.close();
+    }    
+
+    FileInputFormat.setInputPaths(job, inDir);
+    FileOutputFormat.setOutputPath(job, outDir);
+    job.setNumReduceTasks(numReds);
+    return job;
+  }
+
+  public static TaskAttemptContext createDummyMapTaskAttemptContext(
+      Configuration conf) {
+    TaskAttemptID tid = new TaskAttemptID("jt", 1, true, 0, 0);
+    conf.set("mapred.task.id", tid.toString());
+    return new TaskAttemptContext(conf, tid);    
+  }
+
+  public static StatusReporter createDummyReporter() {
+    return new StatusReporter() {
+      public void setStatus(String s) {
+      }
+      public void progress() {
+      }
+      public Counter getCounter(Enum<?> name) {
+        return new Counters().findCounter(name);
+      }
+      public Counter getCounter(String group, String name) {
+        return new Counters().findCounter(group, name);
+      }
+    };
+  }
+  
+  public static String readOutput(Path outDir, Configuration conf) 
+      throws IOException {
+    FileSystem fs = outDir.getFileSystem(conf);
+    StringBuffer result = new StringBuffer();
+
+    Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
+           new OutputLogFilter()));
+    for (Path outputFile : fileList) {
+      LOG.info("Path" + ": "+ outputFile);
+      BufferedReader file = 
+        new BufferedReader(new InputStreamReader(fs.open(outputFile)));
+      String line = file.readLine();
+      while (line != null) {
+        result.append(line);
+        result.append("\n");
+        line = file.readLine();
+      }
+      file.close();
+    }
+    return result.toString();
+  }
+
+}

+ 142 - 0
src/test/org/apache/hadoop/mapreduce/TestChild.java

@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.HadoopTestCase;
+import org.apache.hadoop.mapred.JobConf;
+
+public class TestChild extends HadoopTestCase {
+  private static String TEST_ROOT_DIR =
+    new File(System.getProperty("test.build.data","/tmp"))
+    .toURI().toString().replace(' ', '+');
+  private final Path inDir = new Path(TEST_ROOT_DIR, "./wc/input");
+  private final Path outDir = new Path(TEST_ROOT_DIR, "./wc/output");
+  
+  private final static String OLD_CONFIGS = "test.old.configs";
+  private final static String TASK_OPTS_VAL = "-Xmx200m";
+  private final static String MAP_OPTS_VAL = "-Xmx200m";
+  private final static String REDUCE_OPTS_VAL = "-Xmx300m";
+  
+  public TestChild() throws IOException {
+    super(HadoopTestCase.CLUSTER_MR , HadoopTestCase.LOCAL_FS, 2, 2);
+  }
+
+  static class MyMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
+
+    @Override
+    protected void setup(Context context) throws IOException,
+        InterruptedException {
+      Configuration conf = context.getConfiguration();
+      boolean oldConfigs = conf.getBoolean(OLD_CONFIGS, false);
+      if (oldConfigs) {
+        String javaOpts = conf.get(JobConf.MAPRED_TASK_JAVA_OPTS);
+        assertNotNull(JobConf.MAPRED_TASK_JAVA_OPTS + " is null!", 
+                      javaOpts);
+        assertEquals(JobConf.MAPRED_TASK_JAVA_OPTS + " has value of: " + 
+                     javaOpts, 
+                     TASK_OPTS_VAL, javaOpts);
+      } else {
+        String mapJavaOpts = conf.get(JobConf.MAPRED_MAP_TASK_JAVA_OPTS);
+        assertNotNull(JobConf.MAPRED_MAP_TASK_JAVA_OPTS + " is null!", 
+                      mapJavaOpts);
+        assertEquals(JobConf.MAPRED_MAP_TASK_JAVA_OPTS + " has value of: " + 
+                     mapJavaOpts, 
+                     MAP_OPTS_VAL, mapJavaOpts);
+      }
+    }
+  }
+  
+  static class MyReducer 
+  extends Reducer<LongWritable, Text, LongWritable, Text> {
+
+    @Override
+    protected void setup(Context context)
+        throws IOException, InterruptedException {
+      Configuration conf = context.getConfiguration();
+      boolean oldConfigs = conf.getBoolean(OLD_CONFIGS, false);
+      if (oldConfigs) {
+        String javaOpts = conf.get(JobConf.MAPRED_TASK_JAVA_OPTS);
+        assertNotNull(JobConf.MAPRED_TASK_JAVA_OPTS + " is null!", 
+                      javaOpts);
+        assertEquals(JobConf.MAPRED_TASK_JAVA_OPTS + " has value of: " + 
+                     javaOpts, 
+                     TASK_OPTS_VAL, javaOpts);
+      } else {
+        String reduceJavaOpts = conf.get(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS);
+        assertNotNull(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS + " is null!", 
+                      reduceJavaOpts);
+        assertEquals(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS + " has value of: " + 
+                     reduceJavaOpts, 
+                     REDUCE_OPTS_VAL, reduceJavaOpts);
+      }
+    }
+  }
+  
+  private Job submitAndValidateJob(JobConf conf, int numMaps, int numReds, 
+                                   boolean oldConfigs) 
+      throws IOException, InterruptedException, ClassNotFoundException {
+    conf.setBoolean(OLD_CONFIGS, oldConfigs);
+    if (oldConfigs) {
+      conf.set(JobConf.MAPRED_TASK_JAVA_OPTS, TASK_OPTS_VAL);
+    } else {
+      conf.set(JobConf.MAPRED_MAP_TASK_JAVA_OPTS, MAP_OPTS_VAL);
+      conf.set(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS, REDUCE_OPTS_VAL);
+    }
+    
+    Job job = MapReduceTestUtil.createJob(conf, inDir, outDir, 
+                numMaps, numReds);
+    job.setMapperClass(MyMapper.class);
+    job.setReducerClass(MyReducer.class);
+    job.waitForCompletion(true);
+    assertTrue(job.isSuccessful());
+
+    // Check output directory
+    FileSystem fs = FileSystem.get(conf);
+    assertTrue("Job output directory doesn't exist!", fs.exists(outDir));
+    FileStatus[] list = fs.listStatus(outDir, new OutputFilter());
+    int numPartFiles = numReds == 0 ? numMaps : numReds;
+    assertTrue("Number of part-files is " + list.length + " and not "
+        + numPartFiles, list.length == numPartFiles);
+    return job;
+  }
+  
+  public void testChild() throws Exception {
+    try {
+      submitAndValidateJob(createJobConf(), 1, 1, true);
+      submitAndValidateJob(createJobConf(), 1, 1, false);
+    } finally {
+      tearDown();
+    }
+  }
+  
+  private static class OutputFilter implements PathFilter {
+    public boolean accept(Path path) {
+      return !(path.getName().startsWith("_"));
+    }
+  }
+}