
Merge -r 652178:652179 from trunk to branch-0.17 to fix HADOOP-3280.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.17@652182 13f79535-47bb-0310-9956-ffa450edef68
Owen O'Malley 17 years ago
parent
commit
303ccc1a2e

+ 5 - 0
CHANGES.txt

@@ -92,6 +92,11 @@ Release 0.17.0 - Unreleased
     HADOOP-3266. Removed HOD changes from CHANGES.txt, as they are now inside 
     src/contrib/hod  (Hemanth Yamijala via ddas)
 
+    HADOOP-3280. Separate the configuration of the virtual memory size
+    (mapred.child.ulimit) from the jvm heap size, so that 64 bit
+    streaming applications are supported even when running with 32 bit
+    jvms. (acmurthy via omalley)
+
   NEW FEATURES
 
     HADOOP-1398.  Add HBase in-memory block cache.  (tomwhite)
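
To illustrate the decoupling this entry describes, here is a minimal sketch of setting the two knobs independently via JobConf (the property names come from this patch; the concrete values are only examples):

import org.apache.hadoop.mapred.JobConf;

public class UlimitConfigSketch {
  public static void main(String[] args) {
    JobConf conf = new JobConf();
    // Child JVM heap, configured as before
    conf.set("mapred.child.java.opts", "-Xmx512m");
    // Virtual-memory limit, in KB, for processes the task launches;
    // now decoupled from -Xmx (4 GB here is purely illustrative)
    conf.set("mapred.child.ulimit", String.valueOf(4L * 1024 * 1024));
  }
}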

+ 17 - 2
conf/hadoop-default.xml

@@ -736,8 +736,23 @@ creations/deletions), or "all".</description>
   For example, to enable verbose gc logging to a file named for the taskid in
   /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
         -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
-  The value of -Xmx will also directly influence the amount of virtual memory
-  that a streaming/pipes task gets during execution.
+  
+  The configuration variable mapred.child.ulimit can be used to control the
+  maximum virtual memory of the child processes. 
+  </description>
+</property>
+
+<property>
+  <name>mapred.child.ulimit</name>
+  <value></value>
+  <description>The maximum virtual memory, in KB, of a process launched by the 
+  Map-Reduce framework. This can be used to control both the Mapper/Reducer 
+  tasks and applications using Hadoop Pipes, Hadoop Streaming, etc. 
+  By default it is left unspecified so that cluster admins can control it via 
+  limits.conf and similar mechanisms.
+  
+  Note: mapred.child.ulimit must be greater than or equal to the -Xmx passed 
+  to the JVM, or the VM might not start. 
   </description>
 </property>
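
As a hedged sketch of the note above (a hypothetical check, not part of the patch): the limit is expressed in KB and must cover the child JVM's -Xmx.

public class UlimitCheckSketch {
  public static void main(String[] args) {
    // Hypothetical sanity check, for illustration only
    long ulimitKB = 786432;                // mapred.child.ulimit: 768 MB in KB
    long heapBytes = 512L * 1024 * 1024;   // from -Xmx512m
    if (ulimitKB * 1024 < heapBytes) {
      throw new IllegalArgumentException(
          "mapred.child.ulimit must be >= the -Xmx passed to the JVM");
    }
  }
}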
 

+ 6 - 40
src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java

@@ -37,6 +37,7 @@ import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.TaskLog;
 import org.apache.hadoop.mapred.LineRecordReader.LineReader;
+import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringUtils;
 
 import org.apache.hadoop.io.Text;
@@ -165,23 +166,11 @@ public abstract class PipeMapRed {
       addEnvironment(childEnv, job_.get("stream.addenvironment"));
       // add TMPDIR environment variable with the value of java.io.tmpdir
       envPut(childEnv, "TMPDIR", System.getProperty("java.io.tmpdir"));
-      if (StreamUtil.isCygwin()) {
-        sim = Runtime.getRuntime().exec(argvSplit, childEnv.toArray());
-      } else {
-        List<String> cmd = new ArrayList<String>();
-        for (String arg : argvSplit) {
-          cmd.add(arg);
-        }
-        // set memory limit using ulimit.
-        ProcessBuilder builder;
-        List<String> setup = new ArrayList<String>();
-        setup.add("ulimit");
-        setup.add("-v"); 
-        setup.add(String.valueOf(Runtime.getRuntime().maxMemory() / 1024));
-        builder = new ProcessBuilder(wrapCommand(setup, cmd));
-        builder.environment().putAll(childEnv.toMap());
-        sim = builder.start();
-      }
+
+      // Start the process
+      ProcessBuilder builder = new ProcessBuilder(argvSplit);
+      builder.environment().putAll(childEnv.toMap());
+      sim = builder.start();
 
       clientOut_ = new DataOutputStream(new BufferedOutputStream(sim.getOutputStream()));
       clientIn_ = new DataInputStream(new BufferedInputStream(sim.getInputStream()));
@@ -196,29 +185,6 @@ public abstract class PipeMapRed {
       throw new RuntimeException("configuration exception", e);
     }
   }
-
-  /**
-   * Wrap command with bash -c with setup commands.
-   * Setup commands such as setting memory limit can be passed which 
-   * will be executed before exec.
-   * @param setup The setup commands for the execed process.
-   * @param cmd The command and the arguments that should be run
-   * @return the modified command that should be run
-   */
-  private List<String> wrapCommand( List<String> setup,
-                                    List<String> cmd 
-                                   ) throws IOException {
-    List<String> result = new ArrayList<String>();
-    result.add("bash");
-    result.add("-c");
-    StringBuffer mergedCmd = new StringBuffer();
-    mergedCmd.append(TaskLog.addCommand(setup, false));
-    mergedCmd.append(";");
-    mergedCmd.append("exec ");
-    mergedCmd.append(TaskLog.addCommand(cmd, true));
-    result.add(mergedCmd.toString());
-    return result;
-  }
   
   void setStreamJobDetails(JobConf job) {
     jobLog_ = job.get("stream.jobLog_");
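
The hunk above drops the Cygwin special case and the ulimit wrapper: the limit is now applied by the TaskTracker when it launches the task JVM (see the TaskRunner hunk below), so the streaming child is started directly. A minimal standalone sketch of the simplified launch path, with an illustrative command:

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class PipeLaunchSketch {
  public static void main(String[] args) throws IOException {
    String[] argvSplit = { "cat" };   // illustrative streaming command
    // Child environment, e.g. TMPDIR as PipeMapRed sets it
    Map<String, String> childEnv = new HashMap<String, String>();
    childEnv.put("TMPDIR", System.getProperty("java.io.tmpdir"));

    ProcessBuilder builder = new ProcessBuilder(argvSplit);
    builder.environment().putAll(childEnv);
    Process sim = builder.start();    // no bash/ulimit wrapping any more
    sim.getOutputStream().close();    // no input for this sketch
  }
}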

+ 0 - 44
src/contrib/streaming/src/test/org/apache/hadoop/streaming/CatApp.java

@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.streaming;
-
-import java.io.*;
-
-/** A minimal Java implementation of /bin/cat
- *  The class also tries to allocate a huge array( 10MB) to test ulimits.
- *  Look at {@link TestUlimit}
- */
-public class CatApp {
-  public static void main(String args[]) throws IOException{
-    char s[] = null;
-    try {
-      s = new char[10*1024*1024];
-      BufferedReader in = new BufferedReader(
-                              new InputStreamReader(System.in));
-      String line;
-      while ((line = in.readLine()) != null) {
-        System.out.println(line);
-      }
-    } finally {
-      if (s == null) {
-        System.exit(-1);
-      }
-    }
-  }
-}

+ 11 - 24
src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java

@@ -41,9 +41,6 @@ import junit.framework.TestCase;
  * is expected to be a failure.  
  */
 public class TestUlimit extends TestCase {
-  private static final Log LOG =
-         LogFactory.getLog(TestUlimit.class.getName());
-  enum RESULT { FAILURE, SUCCESS };
   String input = "the dummy input";
   Path inputPath = new Path("/testing/in");
   Path outputPath = new Path("/testing/out");
@@ -51,6 +48,7 @@ public class TestUlimit extends TestCase {
   MiniDFSCluster dfs = null;
   MiniMRCluster mr = null;
   FileSystem fs = null;
+  private static String SET_MEMORY_LIMIT = "786432"; // 768MB
 
   String[] genArgs(String memLimit) {
     return new String[] {
@@ -59,10 +57,11 @@ public class TestUlimit extends TestCase {
       "-mapper", map,
       "-reducer", "org.apache.hadoop.mapred.lib.IdentityReducer",
       "-numReduceTasks", "0",
-      "-jobconf", "mapred.child.java.opts=" + memLimit,
+      "-jobconf", "mapred.child.ulimit=" + memLimit,
       "-jobconf", "mapred.job.tracker=" + "localhost:" +
                                            mr.getJobTrackerPort(),
-      "-jobconf", "fs.default.name=" + "localhost:" + dfs.getNameNodePort(),
+      "-jobconf", "fs.default.name=" + "hdfs://localhost:" 
+                   + dfs.getNameNodePort(),
       "-jobconf", "stream.tmpdir=" + 
                    System.getProperty("test.build.data","/tmp")
     };
@@ -87,12 +86,10 @@ public class TestUlimit extends TestCase {
       
       mr = new MiniMRCluster(numSlaves, fs.getUri().toString(), 1);
       writeInputFile(fs, inputPath);
-      map = StreamUtil.makeJavaCommand(CatApp.class, new String[]{});  
-      runProgram("-Xmx2048m", RESULT.SUCCESS);
+      map = StreamUtil.makeJavaCommand(UlimitApp.class, new String[]{});  
+      runProgram(SET_MEMORY_LIMIT);
       FileUtil.fullyDelete(fs, outputPath);
       assertFalse("output not cleaned up", fs.exists(outputPath));
-      // 100MB is not sufficient for launching jvm. This launch should fail.
-      runProgram("-Xmx0.5m", RESULT.FAILURE);
       mr.waitUntilIdle();
     } catch(IOException e) {
       fail(e.toString());
@@ -114,24 +111,14 @@ public class TestUlimit extends TestCase {
    * @param result Expected result
    * @throws IOException
    */
-  private void runProgram(String memLimit, RESULT result
-                          ) throws IOException {
+  private void runProgram(String memLimit) throws IOException {
     boolean mayExit = false;
-    int ret = 1;
-    try {
-      StreamJob job = new StreamJob(genArgs(memLimit), mayExit);
-      ret = job.go();
-    } catch (IOException ioe) {
-      LOG.warn("Job Failed! " + StringUtils.stringifyException(ioe));
-      ioe.printStackTrace();
-    }
+    StreamJob job = new StreamJob(genArgs(memLimit), mayExit);
+    job.go();
     String output = TestMiniMRWithDFS.readOutput(outputPath,
                                         mr.createJobConf());
-    if (RESULT.SUCCESS.name().equals(result.name())){
-      assertEquals("output is wrong", input, output.trim());
-    } else {
-      assertTrue("output is correct", !input.equals(output.trim()));
-    }
+    assertEquals("output is wrong", SET_MEMORY_LIMIT,
+                                    output.trim());
   }
   
   public static void main(String[]args) throws Exception

+ 45 - 0
src/contrib/streaming/src/test/org/apache/hadoop/streaming/UlimitApp.java

@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.*;
+
+/** 
+ *  UlimitApp discards its input, execs 'ulimit -v' to read the
+ *  current virtual-memory limit, and writes the result to
+ *  standard output. 
+ *  @see TestUlimit
+ */
+public class UlimitApp {
+  public static void main(String args[]) throws IOException{
+    BufferedReader in = new BufferedReader(
+                            new InputStreamReader(System.in));
+    String line = null;
+    while ((line = in.readLine()) != null) {}
+
+    Process process = Runtime.getRuntime().exec(new String[]{
+                                 "bash", "-c", "ulimit -v"});
+    InputStream is = process.getInputStream();
+    InputStreamReader isr = new InputStreamReader(is);
+    BufferedReader br = new BufferedReader(isr);
+    while ((line = br.readLine()) != null) {
+      System.out.println(line);
+    }
+  }
+}

+ 1 - 2
src/docs/src/documentation/content/xdocs/cluster_setup.xml

@@ -272,8 +272,7 @@
                     <td>mapred.child.java.opts</td>
                     <td>-Xmx512M</td>
                     <td>
-                      Larger heap-size for child jvms of maps/reduces. Also controls the amount 
-                      of virtual memory that a streaming/pipes task gets.
+                      Larger heap-size for child jvms of maps/reduces. 
                     </td>
                   </tr>
                   <tr>

+ 3 - 0
src/docs/src/documentation/content/xdocs/mapred_tutorial.xml

@@ -1065,6 +1065,9 @@
           <code>&lt;/property&gt;</code>
         </p>
         
+        <p>Users/admins can also specify the maximum virtual memory 
+        of the launched child-task using <code>mapred.child.ulimit</code>.</p>
+        
         <p>When the job starts, the localized job directory
         <code> ${mapred.local.dir}/taskTracker/jobcache/$jobid/</code>
         has the following directories: </p>

+ 13 - 1
src/java/org/apache/hadoop/mapred/TaskRunner.java

@@ -24,8 +24,10 @@ import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.filecache.*;
 import org.apache.hadoop.util.*;
+
 import java.io.*;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Vector;
 import java.net.URI;
@@ -368,12 +370,22 @@ abstract class TaskRunner extends Thread {
       vargs.add(Integer.toString(address.getPort())); 
       vargs.add(taskid);                      // pass task identifier
 
+      // set memory limit using ulimit if feasible and necessary ...
+      String[] ulimitCmd = Shell.getUlimitMemoryCommand(conf);
+      List<String> setup = null;
+      if (ulimitCmd != null) {
+        setup = new ArrayList<String>();
+        for (String arg : ulimitCmd) {
+          setup.add(arg);
+        }
+      }
+
       // Set up the redirection of the task's stdout and stderr streams
       File stdout = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDOUT);
       File stderr = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR);
       stdout.getParentFile().mkdirs();
       List<String> wrappedCommand = 
-        TaskLog.captureOutAndError(vargs, stdout, stderr, logSize);
+        TaskLog.captureOutAndError(setup, vargs, stdout, stderr, logSize);
       
       // Run the task as child of the parent TaskTracker process
       runChild(wrappedCommand, workDir, taskid);
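
Following the wrapCommand logic removed from PipeMapRed above, the setup list prepended here conceptually yields a wrapper like bash -c 'ulimit -v <limit>; exec <task command> ...'. A hedged sketch of that merge (the exact string TaskLog.captureOutAndError builds may differ; org.example.Child is a made-up class name):

import java.util.Arrays;
import java.util.List;

public class WrapSketch {
  public static void main(String[] args) {
    // The ulimit prefix produced by Shell.getUlimitMemoryCommand(conf)
    List<String> setup = Arrays.asList("ulimit", "-v", "786432");
    // Illustrative task command line
    List<String> vargs = Arrays.asList("java", "-Xmx512m", "org.example.Child");

    StringBuffer mergedCmd = new StringBuffer();
    for (String s : setup) {
      mergedCmd.append(s).append(' ');
    }
    mergedCmd.append("; exec ");
    for (String v : vargs) {
      mergedCmd.append(v).append(' ');
    }
    // Roughly what bash -c would receive:
    //   ulimit -v 786432 ; exec java -Xmx512m org.example.Child
    System.out.println(mergedCmd.toString().trim());
  }
}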

+ 2 - 10
src/java/org/apache/hadoop/mapred/pipes/Application.java

@@ -84,16 +84,8 @@ class Application<K1 extends WritableComparable, V1 extends Writable,
     File stdout = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDOUT);
     File stderr = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR);
     long logLength = TaskLog.getTaskLogLength(conf);
-    // set memory limit using ulimit.
-    if (!WINDOWS) {
-      List<String> setup = new ArrayList<String>();
-      setup.add("ulimit");
-      setup.add("-v"); 
-      setup.add(String.valueOf(Runtime.getRuntime().maxMemory() / 1024));
-      cmd = TaskLog.captureOutAndError(setup, cmd, stdout, stderr, logLength);
-    } else {
-      cmd = TaskLog.captureOutAndError(cmd, stdout, stderr, logLength);
-    }
+    cmd = TaskLog.captureOutAndError(cmd, stdout, stderr, logLength);
+
     process = runClient(cmd, env);
     clientSocket = serverSocket.accept();
     handler = new OutputHandler<K2, V2>(output, reporter);

+ 34 - 0
src/java/org/apache/hadoop/util/Shell.java

@@ -25,6 +25,7 @@ import java.io.BufferedReader;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.JobConf;
 
 /** 
  * A base class for running a Unix command.
@@ -54,6 +55,39 @@ abstract public class Shell {
     return new String[] {(WINDOWS ? "ls" : "/bin/ls"), "-ld"};
   }
 
+  /** 
+   * Get the Unix command for setting the maximum virtual memory available
+   * to a given child process. This is only relevant when we are forking a
+   * process from within the {@link org.apache.hadoop.mapred.Mapper} or the 
+   * {@link org.apache.hadoop.mapred.Reducer} implementations 
+   * e.g. <a href="{@docRoot}/org/apache/hadoop/mapred/pipes/package-summary.html">Hadoop Pipes</a> 
+   * or <a href="{@docRoot}/org/apache/hadoop/streaming/package-summary.html">Hadoop Streaming</a>.
+   * 
+   * It also checks that we are running on a *nix platform; otherwise 
+   * (e.g. on Cygwin/Windows) it returns <code>null</code>.
+   * @param job job configuration
+   * @return a <code>String[]</code> with the ulimit command arguments or 
+   *         <code>null</code> if we are running on a non *nix platform or
+   *         if the limit is unspecified.
+   */
+  public static String[] getUlimitMemoryCommand(JobConf job) {
+    // ulimit isn't supported on Windows
+    if (WINDOWS) {
+      return null;
+    }
+    
+    // get the memory limit from the JobConf
+    String ulimit = job.get("mapred.child.ulimit");
+    if (ulimit == null) {
+      return null;
+    }
+    
+    // Parse it to ensure it is legal/sane
+    int memoryLimit = Integer.valueOf(ulimit);
+
+    return new String[] {"ulimit", "-v", String.valueOf(memoryLimit)};
+  }
+  
   /** Set to true on Windows platforms */
   public static final boolean WINDOWS /* borrowed from Path.WINDOWS */
                 = System.getProperty("os.name").startsWith("Windows");
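
A short usage sketch for the new helper (the configuration value is illustrative):

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.Shell;

public class UlimitCommandSketch {
  public static void main(String[] args) {
    JobConf conf = new JobConf();
    conf.set("mapred.child.ulimit", "786432");  // KB, i.e. 768 MB
    String[] cmd = Shell.getUlimitMemoryCommand(conf);
    // On *nix: {"ulimit", "-v", "786432"}; null on Windows or when unset.
    if (cmd != null) {
      StringBuffer sb = new StringBuffer();
      for (String c : cmd) {
        sb.append(c).append(' ');
      }
      System.out.println(sb.toString().trim());
    }
  }
}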