
HADOOP-3280. Separate the configuration of the virtual memory size
(mapred.child.ulimit) from the JVM heap size, so that 64-bit
streaming applications are supported even when running with 32-bit
JVMs. Contributed by acmurthy.


git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@652179 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley, 17 years ago
Parent commit: ed3433072e
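
In effect, the heap ceiling and the virtual-memory cap can now be tuned
independently. A minimal sketch of the idea (not part of this commit; the
values are illustrative):

    import org.apache.hadoop.mapred.JobConf;

    public class UlimitExample {
      public static void main(String[] args) {
        JobConf conf = new JobConf();
        // Keep the child JVM heap small enough for a 32-bit JVM ...
        conf.set("mapred.child.java.opts", "-Xmx512m");
        // ... while allowing a 64-bit streaming child up to 4 GB of
        // virtual memory (the value is in KB).
        conf.set("mapred.child.ulimit", "4194304");
      }
    }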

+ 5 - 0
CHANGES.txt

@@ -205,6 +205,11 @@ Release 0.17.0 - Unreleased
     HADOOP-3266. Removed HOD changes from CHANGES.txt, as they are now inside 
     src/contrib/hod  (Hemanth Yamijala via ddas)
 
+    HADOOP-3280. Separate the configuration of the virtual memory size
+    (mapred.child.ulimit) from the jvm heap size, so that 64 bit
+    streaming applications are supported even when running with 32 bit
+    jvms. (acmurthy via omalley)
+
   NEW FEATURES
 
     HADOOP-1398.  Add HBase in-memory block cache.  (tomwhite)

+ 17 - 2
conf/hadoop-default.xml

@@ -753,8 +753,23 @@ creations/deletions), or "all".</description>
   For example, to enable verbose gc logging to a file named for the taskid in
   /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
         -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
-  The value of -Xmx will also directly influence the amount of virtual memory
-  that a streaming/pipes task gets during execution.
+  
+  The configuration variable mapred.child.ulimit can be used to control the
+  maximum virtual memory of the child processes. 
+  </description>
+</property>
+
+<property>
+  <name>mapred.child.ulimit</name>
+  <value></value>
+  <description>The maximum virtual memory, in KB, of a process launched by the 
+  Map-Reduce framework. This can be used to control both the Mapper/Reducer 
+  tasks and applications using Hadoop Pipes, Hadoop Streaming etc. 
+  By default it is left unspecified to let cluster admins control it via 
+  limits.conf and other such relevant mechanisms.
+  
+  Note: mapred.child.ulimit must be greater than or equal to the -Xmx passed to
+  JavaVM, else the VM might not start. 
   </description>
 </property>
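
The description above requires mapred.child.ulimit >= -Xmx. A hypothetical
pre-flight check (UlimitCheck and toKB are not Hadoop APIs; this sketch
assumes -Xmx uses the usual k/m/g suffixes):

    import org.apache.hadoop.mapred.JobConf;

    // Hypothetical helper, not in this commit: fail fast when the
    // configured ulimit (in KB) is smaller than the child heap ceiling.
    public class UlimitCheck {
      public static void check(JobConf conf) {
        String ulimit = conf.get("mapred.child.ulimit");
        if (ulimit == null) {
          return;               // unset: limits.conf etc. govern the child
        }
        long limitKB = Long.parseLong(ulimit.trim());
        String opts = conf.get("mapred.child.java.opts", "");
        int i = opts.indexOf("-Xmx");
        if (i < 0) {
          return;               // no explicit heap ceiling to compare with
        }
        String xmx = opts.substring(i + 4).split("\\s")[0];  // e.g. "512m"
        if (limitKB < toKB(xmx)) {
          throw new IllegalArgumentException(
              "mapred.child.ulimit (" + limitKB + " KB) is below -Xmx" + xmx);
        }
      }

      private static long toKB(String v) {
        long n = Long.parseLong(v.replaceAll("[^0-9]", ""));
        switch (Character.toLowerCase(v.charAt(v.length() - 1))) {
          case 'g': return n * 1024 * 1024;
          case 'm': return n * 1024;
          case 'k': return n;
          default:  return n / 1024;       // plain bytes
        }
      }
    }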
 

+ 6 - 40
src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java

@@ -37,6 +37,7 @@ import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.TaskLog;
 import org.apache.hadoop.mapred.LineRecordReader.LineReader;
+import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringUtils;
 
 import org.apache.hadoop.io.Text;
@@ -165,23 +166,11 @@ public abstract class PipeMapRed {
       addEnvironment(childEnv, job_.get("stream.addenvironment"));
       // add TMPDIR environment variable with the value of java.io.tmpdir
       envPut(childEnv, "TMPDIR", System.getProperty("java.io.tmpdir"));
-      if (StreamUtil.isCygwin()) {
-        sim = Runtime.getRuntime().exec(argvSplit, childEnv.toArray());
-      } else {
-        List<String> cmd = new ArrayList<String>();
-        for (String arg : argvSplit) {
-          cmd.add(arg);
-        }
-        // set memory limit using ulimit.
-        ProcessBuilder builder;
-        List<String> setup = new ArrayList<String>();
-        setup.add("ulimit");
-        setup.add("-v"); 
-        setup.add(String.valueOf(Runtime.getRuntime().maxMemory() / 1024));
-        builder = new ProcessBuilder(wrapCommand(setup, cmd));
-        builder.environment().putAll(childEnv.toMap());
-        sim = builder.start();
-      }
+
+      // Start the process
+      ProcessBuilder builder = new ProcessBuilder(argvSplit);
+      builder.environment().putAll(childEnv.toMap());
+      sim = builder.start();
 
       clientOut_ = new DataOutputStream(new BufferedOutputStream(sim.getOutputStream()));
       clientIn_ = new DataInputStream(new BufferedInputStream(sim.getInputStream()));
@@ -196,29 +185,6 @@ public abstract class PipeMapRed {
       throw new RuntimeException("configuration exception", e);
     }
   }
-
-  /**
-   * Wrap command with bash -c with setup commands.
-   * Setup commands such as setting memory limit can be passed which 
-   * will be executed before exec.
-   * @param setup The setup commands for the execed process.
-   * @param cmd The command and the arguments that should be run
-   * @return the modified command that should be run
-   */
-  private List<String> wrapCommand( List<String> setup,
-                                    List<String> cmd 
-                                   ) throws IOException {
-    List<String> result = new ArrayList<String>();
-    result.add("bash");
-    result.add("-c");
-    StringBuffer mergedCmd = new StringBuffer();
-    mergedCmd.append(TaskLog.addCommand(setup, false));
-    mergedCmd.append(";");
-    mergedCmd.append("exec ");
-    mergedCmd.append(TaskLog.addCommand(cmd, true));
-    result.add(mergedCmd.toString());
-    return result;
-  }
   
   void setStreamJobDetails(JobConf job) {
     jobLog_ = job.get("stream.jobLog_");
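
The removed wrapping is not lost: it moves to TaskRunner (below), where
Shell.getUlimitMemoryCommand applies mapred.child.ulimit to the task JVM
itself rather than deriving the cap from Runtime.maxMemory(). Because a
process started via ProcessBuilder inherits its parent's resource limits,
the streaming child is covered without per-child wrapping. A small sketch
of that inheritance, assuming a *nix shell:

    import java.io.BufferedReader;
    import java.io.InputStreamReader;

    public class InheritLimit {
      public static void main(String[] args) throws Exception {
        // A child forked here inherits this JVM's "ulimit -v" setting,
        // so the child command itself needs no explicit wrapping.
        Process p = new ProcessBuilder("bash", "-c", "ulimit -v").start();
        BufferedReader r =
            new BufferedReader(new InputStreamReader(p.getInputStream()));
        System.out.println(r.readLine());   // the inherited limit, in KB
      }
    }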

+ 0 - 44
src/contrib/streaming/src/test/org/apache/hadoop/streaming/CatApp.java

@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.streaming;
-
-import java.io.*;
-
-/** A minimal Java implementation of /bin/cat
- *  The class also tries to allocate a huge array( 10MB) to test ulimits.
- *  Look at {@link TestUlimit}
- */
-public class CatApp {
-  public static void main(String args[]) throws IOException{
-    char s[] = null;
-    try {
-      s = new char[10*1024*1024];
-      BufferedReader in = new BufferedReader(
-                              new InputStreamReader(System.in));
-      String line;
-      while ((line = in.readLine()) != null) {
-        System.out.println(line);
-      }
-    } finally {
-      if (s == null) {
-        System.exit(-1);
-      }
-    }
-  }
-}

+ 11 - 24
src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java

@@ -41,9 +41,6 @@ import junit.framework.TestCase;
  * is expected to be a failure.  
  */
 public class TestUlimit extends TestCase {
-  private static final Log LOG =
-         LogFactory.getLog(TestUlimit.class.getName());
-  enum RESULT { FAILURE, SUCCESS };
   String input = "the dummy input";
   Path inputPath = new Path("/testing/in");
   Path outputPath = new Path("/testing/out");
@@ -51,6 +48,7 @@ public class TestUlimit extends TestCase {
   MiniDFSCluster dfs = null;
   MiniMRCluster mr = null;
   FileSystem fs = null;
+  private static String SET_MEMORY_LIMIT = "786432"; // 768MB
 
   String[] genArgs(String memLimit) {
     return new String[] {
@@ -59,10 +57,11 @@ public class TestUlimit extends TestCase {
       "-mapper", map,
       "-reducer", "org.apache.hadoop.mapred.lib.IdentityReducer",
       "-numReduceTasks", "0",
-      "-jobconf", "mapred.child.java.opts=" + memLimit,
+      "-jobconf", "mapred.child.ulimit=" + memLimit,
       "-jobconf", "mapred.job.tracker=" + "localhost:" +
                                            mr.getJobTrackerPort(),
-      "-jobconf", "fs.default.name=" + "localhost:" + dfs.getNameNodePort(),
+      "-jobconf", "fs.default.name=" + "hdfs://localhost:" 
+                   + dfs.getNameNodePort(),
       "-jobconf", "stream.tmpdir=" + 
                    System.getProperty("test.build.data","/tmp")
     };
@@ -87,12 +86,10 @@ public class TestUlimit extends TestCase {
       
       mr = new MiniMRCluster(numSlaves, fs.getUri().toString(), 1);
       writeInputFile(fs, inputPath);
-      map = StreamUtil.makeJavaCommand(CatApp.class, new String[]{});  
-      runProgram("-Xmx2048m", RESULT.SUCCESS);
+      map = StreamUtil.makeJavaCommand(UlimitApp.class, new String[]{});  
+      runProgram(SET_MEMORY_LIMIT);
       FileUtil.fullyDelete(fs, outputPath);
       assertFalse("output not cleaned up", fs.exists(outputPath));
-      // 100MB is not sufficient for launching jvm. This launch should fail.
-      runProgram("-Xmx0.5m", RESULT.FAILURE);
       mr.waitUntilIdle();
     } catch(IOException e) {
       fail(e.toString());
@@ -114,24 +111,14 @@ public class TestUlimit extends TestCase {
    * @param result Expected result
    * @throws IOException
    */
-  private void runProgram(String memLimit, RESULT result
-                          ) throws IOException {
+  private void runProgram(String memLimit) throws IOException {
     boolean mayExit = false;
-    int ret = 1;
-    try {
-      StreamJob job = new StreamJob(genArgs(memLimit), mayExit);
-      ret = job.go();
-    } catch (IOException ioe) {
-      LOG.warn("Job Failed! " + StringUtils.stringifyException(ioe));
-      ioe.printStackTrace();
-    }
+    StreamJob job = new StreamJob(genArgs(memLimit), mayExit);
+    job.go();
     String output = TestMiniMRWithDFS.readOutput(outputPath,
                                         mr.createJobConf());
-    if (RESULT.SUCCESS.name().equals(result.name())){
-      assertEquals("output is wrong", input, output.trim());
-    } else {
-      assertTrue("output is correct", !input.equals(output.trim()));
-    }
+    assertEquals("output is wrong", SET_MEMORY_LIMIT,
+                                    output.trim());
   }
   
   public static void main(String[]args) throws Exception
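
Why the single assertion suffices, assuming "ulimit -v" reports KB on the
test host:

    // task JVM:   launched under ulimit -v 786432 (via TaskRunner)
    // UlimitApp:  the child inherits the limit and prints "786432"
    // test:       assertEquals(SET_MEMORY_LIMIT, output.trim())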

+ 45 - 0
src/contrib/streaming/src/test/org/apache/hadoop/streaming/UlimitApp.java

@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.*;
+
+/** 
+ *  The UlimitApp discards the input
+ *  and exec's ulimit -v to know the ulimit value.
+ *  And writes the output to the standard out. 
+ *  @see {@link TestUlimit}
+ */
+public class UlimitApp {
+  public static void main(String args[]) throws IOException{
+    BufferedReader in = new BufferedReader(
+                            new InputStreamReader(System.in));
+    String line = null;
+    while ((line = in.readLine()) != null) {}
+
+    Process process = Runtime.getRuntime().exec(new String[]{
+                                 "bash", "-c", "ulimit -v"});
+    InputStream is = process.getInputStream();
+    InputStreamReader isr = new InputStreamReader(is);
+    BufferedReader br = new BufferedReader(isr);
+    while ((line = br.readLine()) != null) {
+      System.out.println(line);
+    }
+  }
+}
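
Note that ulimit is a shell builtin rather than a standalone executable,
which is why UlimitApp runs it through bash -c; draining stdin first also
keeps the streaming framework from blocking on a full pipe. A minimal
illustration of the builtin point (a sketch, *nix only):

    // Exec'ing "ulimit" directly typically throws IOException, since it
    // is usually not installed as a binary; it must go through a shell.
    Process p = Runtime.getRuntime().exec(
        new String[] {"bash", "-c", "ulimit -v"});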

+ 1 - 2
src/docs/src/documentation/content/xdocs/cluster_setup.xml

@@ -272,8 +272,7 @@
                     <td>mapred.child.java.opts</td>
                     <td>-Xmx512M</td>
                     <td>
-                      Larger heap-size for child jvms of maps/reduces. Also controls the amount 
-                      of virtual memory that a streaming/pipes task gets.
+                      Larger heap-size for child jvms of maps/reduces. 
                     </td>
                   </tr>
                   <tr>

+ 3 - 0
src/docs/src/documentation/content/xdocs/mapred_tutorial.xml

@@ -1065,6 +1065,9 @@
           <code>&lt;/property&gt;</code>
         </p>
         
+        <p>Users/admins can also specify the maximum virtual memory 
+        of the launched child-task using <code>mapred.child.ulimit</code>.</p>
+        
         <p>When the job starts, the localized job directory
         <code> ${mapred.local.dir}/taskTracker/jobcache/$jobid/</code>
         has the following directories: </p>
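
The tutorial note above can be exercised per job; for example, in the
-jobconf style the streaming test uses (a sketch, with illustrative paths
and values):

    String[] args = new String[] {
      "-input", "/testing/in",
      "-output", "/testing/out",
      "-mapper", "/bin/cat",
      "-jobconf", "mapred.child.java.opts=-Xmx512m",  // modest heap
      "-jobconf", "mapred.child.ulimit=4194304"       // 4 GB cap, in KB
    };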

+ 13 - 1
src/java/org/apache/hadoop/mapred/TaskRunner.java

@@ -24,8 +24,10 @@ import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.filecache.*;
 import org.apache.hadoop.util.*;
+
 import java.io.*;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Vector;
 import java.net.URI;
@@ -368,12 +370,22 @@ abstract class TaskRunner extends Thread {
       vargs.add(Integer.toString(address.getPort())); 
       vargs.add(taskid);                      // pass task identifier
 
+      // set memory limit using ulimit if feasible and necessary ...
+      String[] ulimitCmd = Shell.getUlimitMemoryCommand(conf);
+      List<String> setup = null;
+      if (ulimitCmd != null) {
+        setup = new ArrayList<String>();
+        for (String arg : ulimitCmd) {
+          setup.add(arg);
+        }
+      }
+
       // Set up the redirection of the task's stdout and stderr streams
       File stdout = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDOUT);
       File stderr = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR);
       stdout.getParentFile().mkdirs();
       List<String> wrappedCommand = 
-        TaskLog.captureOutAndError(vargs, stdout, stderr, logSize);
+        TaskLog.captureOutAndError(setup, vargs, stdout, stderr, logSize);
       
       // Run the task as child of the parent TaskTracker process
       runChild(wrappedCommand, workDir, taskid);
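
The setup list is prepended by TaskLog.captureOutAndError, so the cap is
applied to the task JVM itself and inherited by everything it forks
(streaming and pipes children included). A simplified sketch of the
composition, under the assumption that the wrapped form is roughly
"setup; exec task" run through bash (the real quoting and log redirection
live in TaskLog):

    import java.util.Arrays;
    import java.util.List;

    public class WrapSketch {
      public static void main(String[] args) throws Exception {
        List<String> setup = Arrays.asList("ulimit", "-v", "786432");
        List<String> task  = Arrays.asList("sleep", "1");  // stand-in child
        String cmd = String.join(" ", setup)
                   + "; exec " + String.join(" ", task);
        new ProcessBuilder("bash", "-c", cmd).inheritIO().start().waitFor();
      }
    }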

+ 2 - 10
src/java/org/apache/hadoop/mapred/pipes/Application.java

@@ -84,16 +84,8 @@ class Application<K1 extends WritableComparable, V1 extends Writable,
     File stdout = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDOUT);
     File stderr = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR);
     long logLength = TaskLog.getTaskLogLength(conf);
-    // set memory limit using ulimit.
-    if (!WINDOWS) {
-      List<String> setup = new ArrayList<String>();
-      setup.add("ulimit");
-      setup.add("-v"); 
-      setup.add(String.valueOf(Runtime.getRuntime().maxMemory() / 1024));
-      cmd = TaskLog.captureOutAndError(setup, cmd, stdout, stderr, logLength);
-    } else {
-      cmd = TaskLog.captureOutAndError(cmd, stdout, stderr, logLength);
-    }
+    cmd = TaskLog.captureOutAndError(cmd, stdout, stderr, logLength);
+
     process = runClient(cmd, env);
     clientSocket = serverSocket.accept();
     handler = new OutputHandler<K2, V2>(output, reporter);

+ 34 - 0
src/java/org/apache/hadoop/util/Shell.java

@@ -25,6 +25,7 @@ import java.io.BufferedReader;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.JobConf;
 
 /** 
  * A base class for running a Unix command.
@@ -54,6 +55,39 @@ abstract public class Shell {
     return new String[] {(WINDOWS ? "ls" : "/bin/ls"), "-ld"};
   }
 
+  /** 
+   * Get the Unix command for setting the maximum virtual memory available
+   * to a given child process. This is only relevant when we are forking a
+   * process from within the {@link org.apache.hadoop.mapred.Mapper} or the 
+   * {@link org.apache.hadoop.mapred.Reducer} implementations 
+   * e.g. <a href="{@docRoot}/org/apache/hadoop/mapred/pipes/package-summary.html">Hadoop Pipes</a> 
+   * or <a href="{@docRoot}/org/apache/hadoop/streaming/package-summary.html">Hadoop Streaming</a>.
+   * 
+   * It also checks to ensure that we are running on a *nix platform else 
+   * (e.g. in Cygwin/Windows) it returns <code>null</code>.
+   * @param job job configuration
+   * @return a <code>String[]</code> with the ulimit command arguments or 
+   *         <code>null</code> if we are running on a non *nix platform or
+   *         if the limit is unspecified.
+   */
+  public static String[] getUlimitMemoryCommand(JobConf job) {
+    // ulimit isn't supported on Windows
+    if (WINDOWS) {
+      return null;
+    }
+    
+    // get the memory limit from the JobConf
+    String ulimit = job.get("mapred.child.ulimit");
+    if (ulimit == null) {
+      return null;
+    }
+    
+    // Parse it to ensure it is legal/sane
+    int memoryLimit = Integer.valueOf(ulimit);
+
+    return new String[] {"ulimit", "-v", String.valueOf(memoryLimit)};
+  }
+  
   /** Set to true on Windows platforms */
   public static final boolean WINDOWS /* borrowed from Path.WINDOWS */
                 = System.getProperty("os.name").startsWith("Windows");
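
Usage, as TaskRunner above consumes it (a sketch; the value is
illustrative):

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.util.Shell;

    public class UlimitCmd {
      public static void main(String[] args) {
        JobConf conf = new JobConf();
        conf.set("mapred.child.ulimit", "786432");   // 768 MB, in KB
        String[] cmd = Shell.getUlimitMemoryCommand(conf);
        // *nix: [ulimit, -v, 786432]; on Windows or when unset: null
        System.out.println(cmd == null ? "null" : String.join(" ", cmd));
      }
    }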