
HADOOP-2765. Enables specifying ulimits for streaming/pipes tasks. Contributed by Amareshwari Sri Ramadasu and Devaraj Das.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@636044 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das committed 17 years ago
parent commit 76ba132bf7
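
In outline, the patch caps a task's memory by wrapping the child command in a shell that first applies ulimit -v, sized from the task JVM's maximum heap (Runtime.getRuntime().maxMemory(), in KB), and then exec's the real program. A minimal standalone sketch of that technique follows; the class name and sample command are hypothetical, not code from this patch:

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

public class UlimitWrapSketch {
  public static void main(String[] args)
      throws IOException, InterruptedException {
    // ulimit -v takes kilobytes; derive the cap from this JVM's max heap.
    long limitKb = Runtime.getRuntime().maxMemory() / 1024;
    // Hypothetical child command; the patch wraps the streaming/pipes
    // task executable here instead.
    String payload = "ulimit -v " + limitKb + "; exec /bin/echo hello";
    List<String> argv = Arrays.asList("bash", "-c", payload);
    Process child = new ProcessBuilder(argv).inheritIO().start();
    System.exit(child.waitFor());
  }
}

Applying the limit in the shell rather than in Java keeps it in effect for whatever the task exec's, including non-JVM streaming programs.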

+ 2 - 0
CHANGES.txt

@@ -77,6 +77,8 @@ Trunk (unreleased changes)
     HADOOP-2057.  Streaming should optionally treat a non-zero exit status
     of a child process as a failed task.  (Rick Cox via tomwhite)
 
+    HADOOP-2765. Enables specifying ulimits for streaming/pipes tasks (ddas)
+
   OPTIMIZATIONS
 
     HADOOP-2790.  Fixed inefficient method hasSpeculativeTask by removing

+ 2 - 0
conf/hadoop-default.xml

@@ -704,6 +704,8 @@ creations/deletions), or "all".</description>
   For example, to enable verbose gc logging to a file named for the taskid in
   /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
         -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
+  The value of -Xmx will also directly influence the amount of virtual memory
+  that a streaming/pipes task gets during execution.
   </description>
 </property>
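
For illustration, the child JVM options described above are set per job; a hypothetical snippet using the JobConf API of this era (the class name is invented for the example):

import org.apache.hadoop.mapred.JobConf;

public class ChildOptsExample {
  public static void main(String[] args) {
    JobConf conf = new JobConf();
    // Heap for the child JVM; after this change, a streaming/pipes child
    // is also held to roughly this much virtual memory via ulimit -v.
    conf.set("mapred.child.java.opts", "-Xmx512m");
    System.out.println(conf.get("mapred.child.java.opts"));
  }
}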
 

+ 2 - 1
docs/cluster_setup.html

@@ -505,7 +505,8 @@ document.write("Last Published: " + document.lastModified);
 <td colspan="1" rowspan="1">mapred.child.java.opts</td>
                     <td colspan="1" rowspan="1">-Xmx512M</td>
                     <td colspan="1" rowspan="1">
-                      Larger heap-size for child jvms of maps/reduces.
+                      Larger heap-size for child jvms of maps/reduces. Also controls the amount 
+                      of virtual memory that a streaming/pipes task gets.
                     </td>
                   
 </tr>

File diff suppressed because it is too large
+ 3 - 3
docs/cluster_setup.pdf


+ 6 - 4
docs/hadoop-default.html

@@ -68,6 +68,10 @@ creations/deletions), or "all".</td>
                for compression/decompression.</td>
 </tr>
 <tr>
+<td><a name="io.serializations">io.serializations</a></td><td>org.apache.hadoop.io.serializer.WritableSerialization</td><td>A list of serialization classes that can be used for
+  obtaining serializers and deserializers.</td>
+</tr>
+<tr>
 <td><a name="fs.default.name">fs.default.name</a></td><td>file:///</td><td>The name of the default file system.  A URI whose
   scheme and authority determine the FileSystem implementation.  The
   uri's scheme determines the config property (fs.SCHEME.impl) naming
@@ -429,6 +433,8 @@ creations/deletions), or "all".</td>
   For example, to enable verbose gc logging to a file named for the taskid in
   /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
         -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
+  The value of -Xmx will also directly influence the amount of virtual memory
+  that a streaming/pipes task gets during execution.
   </td>
 </tr>
 <tr>
@@ -538,10 +544,6 @@ creations/deletions), or "all".</td>
   </td>
 </tr>
 <tr>
-<td><a name="io.seqfile.compression.type">io.seqfile.compression.type</a></td><td>RECORD</td><td>The default compression type for SequenceFile.Writer.
-  </td>
-</tr>
-<tr>
 <td><a name="map.sort.class">map.sort.class</a></td><td>org.apache.hadoop.mapred.MergeSorter</td><td>The default sort class for sorting keys.
   </td>
 </tr>

+ 1 - 1
src/contrib/build-contrib.xml

@@ -193,7 +193,7 @@
     <mkdir dir="${hadoop.log.dir}"/>
     <junit
       printsummary="yes" showoutput="${test.output}" 
-      haltonfailure="no" fork="yes" maxmemory="256m"
+      haltonfailure="no" fork="yes" maxmemory="1280m"
       errorProperty="tests.failed" failureProperty="tests.failed"
       timeout="${test.timeout}">
       

+ 11 - 0
src/contrib/streaming/src/java/org/apache/hadoop/streaming/Environment.java

@@ -89,6 +89,17 @@ public class Environment extends Properties {
     return arr;
   }
 
+  public Map<String, String> toMap() {
+    Map<String, String> map = new HashMap<String, String>();
+    Enumeration<Object> it = super.keys();
+    while (it.hasMoreElements()) {
+      String key = (String) it.nextElement();
+      String val = (String) get(key);
+      map.put(key, val);
+    }
+    return map;
+  }
+  
   public String getHost() {
     String host = getProperty("HOST");
     if (host == null) {

+ 42 - 8
src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java

@@ -22,6 +22,7 @@ import java.io.*;
 import java.nio.charset.CharacterCodingException;
 import java.io.IOException;
 import java.util.Date;
+import java.util.List;
 import java.util.Map;
 import java.util.Iterator;
 import java.util.Arrays;
@@ -34,6 +35,7 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.TaskLog;
 import org.apache.hadoop.util.StringUtils;
 
 import org.apache.hadoop.io.Text;
@@ -164,14 +166,23 @@ public abstract class PipeMapRed {
       addEnvironment(childEnv, job_.get("stream.addenvironment"));
       // add TMPDIR environment variable with the value of java.io.tmpdir
       envPut(childEnv, "TMPDIR", System.getProperty("java.io.tmpdir"));
-      sim = Runtime.getRuntime().exec(argvSplit, childEnv.toArray());
-
-      /* // This way required jdk1.5
-         Builder processBuilder = new ProcessBuilder(argvSplit);
-         Map<String, String> env = processBuilder.environment();
-         addEnvironment(env, job_.get("stream.addenvironment"));
-         sim = processBuilder.start();
-      */
+      if (StreamUtil.isCygwin()) {
+        sim = Runtime.getRuntime().exec(argvSplit, childEnv.toArray());
+      } else {
+        List<String> cmd = new ArrayList<String>();
+        for (String arg : argvSplit) {
+          cmd.add(arg);
+        }
+        // set memory limit using ulimit.
+        ProcessBuilder builder;
+        List<String> setup = new ArrayList<String>();
+        setup.add("ulimit");
+        setup.add("-v"); 
+        setup.add(String.valueOf(Runtime.getRuntime().maxMemory() / 1024));
+        builder = new ProcessBuilder(wrapCommand(setup, cmd));
+        builder.environment().putAll(childEnv.toMap());
+        sim = builder.start();
+      }
 
       clientOut_ = new DataOutputStream(new BufferedOutputStream(sim.getOutputStream()));
       clientIn_ = new DataInputStream(new BufferedInputStream(sim.getInputStream()));
@@ -185,6 +196,29 @@ public abstract class PipeMapRed {
     }
   }
 
+  /**
+   * Wrap the command with bash -c, prefixed by setup commands.
+   * Setup commands, such as setting the memory limit, are executed
+   * before the command itself is exec'ed.
+   * @param setup The setup commands for the execed process.
+   * @param cmd The command and the arguments that should be run
+   * @return the modified command that should be run
+   */
+  private List<String> wrapCommand( List<String> setup,
+                                    List<String> cmd 
+                                   ) throws IOException {
+    List<String> result = new ArrayList<String>();
+    result.add("bash");
+    result.add("-c");
+    StringBuffer mergedCmd = new StringBuffer();
+    mergedCmd.append(TaskLog.addCommand(setup, false));
+    mergedCmd.append(";");
+    mergedCmd.append("exec ");
+    mergedCmd.append(TaskLog.addCommand(cmd, true));
+    result.add(mergedCmd.toString());
+    return result;
+  }
+  
   void setStreamJobDetails(JobConf job) {
     jobLog_ = job.get("stream.jobLog_");
     String s = job.get("stream.minRecWrittenToEnableSkip_");
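
To make the shape of the merged command concrete, the following standalone sketch reproduces the single-quote-per-token formatting that TaskLog.addCommand applies (omitting its shell-path conversion of the executable) and prints the payload that wrapCommand hands to bash -c. It is illustrative only, not code from the patch:

import java.util.Arrays;
import java.util.List;

public class WrapCommandSketch {
  // Mirrors TaskLog.addCommand's quoting: each token wrapped in single
  // quotes and separated by spaces (shell-path handling omitted).
  static String quote(List<String> cmd) {
    StringBuilder sb = new StringBuilder();
    for (String s : cmd) {
      sb.append('\'').append(s).append("' ");
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    List<String> setup = Arrays.asList("ulimit", "-v", "1048576");
    List<String> app = Arrays.asList("/bin/cat", "input.txt");
    // Prints: 'ulimit' '-v' '1048576' ;exec '/bin/cat' 'input.txt'
    System.out.println(quote(setup) + ";exec " + quote(app));
  }
}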

+ 44 - 0
src/contrib/streaming/src/test/org/apache/hadoop/streaming/CatApp.java

@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.*;
+
+/** A minimal Java implementation of /bin/cat
+ *  The class also tries to allocate a huge array (10MB) to test ulimits.
+ *  Look at {@link TestUlimit}
+ */
+public class CatApp {
+  public static void main(String args[]) throws IOException{
+    char s[] = null;
+    try {
+      s = new char[10*1024*1024];
+      BufferedReader in = new BufferedReader(
+                              new InputStreamReader(System.in));
+      String line;
+      while ((line = in.readLine()) != null) {
+        System.out.println(line);
+      }
+    } finally {
+      if (s == null) {
+        System.exit(-1);
+      }
+    }
+  }
+}

+ 142 - 0
src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java

@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.MiniDFSCluster;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapred.TestMiniMRWithDFS;
+import org.apache.hadoop.util.*;
+
+import junit.framework.TestCase;
+
+/**
+ * This tests the setting of the memory limit for streaming processes.
+ * It launches a streaming app that allocates 10MB of memory.
+ * First, the program is launched with sufficient memory and is expected
+ * to succeed. Then the program is launched with insufficient memory and
+ * is expected to fail.
+ */
+public class TestUlimit extends TestCase {
+  private static final Log LOG =
+         LogFactory.getLog(TestUlimit.class.getName());
+  enum RESULT { FAILURE, SUCCESS };
+  String input = "the dummy input";
+  Path inputPath = new Path("/testing/in");
+  Path outputPath = new Path("/testing/out");
+  String map = null;
+  MiniDFSCluster dfs = null;
+  MiniMRCluster mr = null;
+  FileSystem fs = null;
+
+  String[] genArgs(String memLimit) {
+    return new String[] {
+      "-input", inputPath.toString(),
+      "-output", outputPath.toString(),
+      "-mapper", map,
+      "-reducer", "org.apache.hadoop.mapred.lib.IdentityReducer",
+      "-numReduceTasks", "0",
+      "-jobconf", "mapred.child.java.opts=" + memLimit,
+      "-jobconf", "mapred.job.tracker=" + "localhost:" +
+                                           mr.getJobTrackerPort(),
+      "-jobconf", "fs.default.name=" + "localhost:" + dfs.getNameNodePort(),
+      "-jobconf", "stream.tmpdir=" + 
+                   System.getProperty("test.build.data","/tmp")
+    };
+  }
+
+  /**
+   * This tests the setting of the memory limit for streaming processes.
+   * It launches a streaming app that allocates 10MB of memory.
+   * First, the program is launched with sufficient memory and is expected
+   * to succeed. Then the program is launched with insufficient memory and
+   * is expected to fail.
+   */
+  public void testCommandLine() {
+    if (StreamUtil.isCygwin()) {
+      return;
+    }
+    try {
+      final int numSlaves = 2;
+      Configuration conf = new Configuration();
+      dfs = new MiniDFSCluster(conf, numSlaves, true, null);
+      fs = dfs.getFileSystem();
+      
+      mr = new MiniMRCluster(numSlaves, fs.getUri().toString(), 1);
+      writeInputFile(fs, inputPath);
+      map = StreamUtil.makeJavaCommand(CatApp.class, new String[]{});  
+      runProgram("-Xmx2048m", RESULT.SUCCESS);
+      FileUtil.fullyDelete(fs, outputPath);
+      assertFalse("output not cleaned up", fs.exists(outputPath));
+      // 0.5MB is not sufficient for launching a jvm. This launch should fail.
+      runProgram("-Xmx0.5m", RESULT.FAILURE);
+      mr.waitUntilIdle();
+    } catch(IOException e) {
+      fail(e.toString());
+    } finally {
+      mr.shutdown();
+      dfs.shutdown();
+    }
+  }
+
+  private void writeInputFile(FileSystem fs, Path dir) throws IOException {
+    DataOutputStream out = fs.create(new Path(dir, "part0"));
+    out.writeBytes(input);
+    out.close();
+  }
+
+  /**
+   * Runs the streaming program and asserts its result.
+   * @param memLimit memory limit to set for mapred child.
+   * @param result Expected result
+   * @throws IOException
+   */
+  private void runProgram(String memLimit, RESULT result
+                          ) throws IOException {
+    boolean mayExit = false;
+    int ret = 1;
+    try {
+      StreamJob job = new StreamJob(genArgs(memLimit), mayExit);
+      ret = job.go();
+    } catch (IOException ioe) {
+      LOG.warn("Job Failed! " + StringUtils.stringifyException(ioe));
+      ioe.printStackTrace();
+    }
+    String output = TestMiniMRWithDFS.readOutput(outputPath,
+                                        mr.createJobConf());
+    if (RESULT.SUCCESS.name().equals(result.name())){
+      assertEquals("output is wrong", input, output.trim());
+    } else {
+      assertTrue("output is correct", !input.equals(output.trim()));
+    }
+  }
+  
+  public static void main(String[] args) throws Exception
+  {
+    new TestUlimit().testCommandLine();
+  }
+
+}

+ 2 - 1
src/docs/src/documentation/content/xdocs/cluster_setup.xml

@@ -272,7 +272,8 @@
                     <td>mapred.child.java.opts</td>
                     <td>-Xmx512M</td>
                     <td>
-                      Larger heap-size for child jvms of maps/reduces.
+                      Larger heap-size for child jvms of maps/reduces. Also controls the amount 
+                      of virtual memory that a streaming/pipes task gets.
                     </td>
                   </tr>
                   <tr>

+ 55 - 14
src/java/org/apache/hadoop/mapred/TaskLog.java

@@ -205,31 +205,44 @@ public class TaskLog {
                                                 File stderrFilename,
                                                 long tailLength
                                                ) throws IOException {
+    return captureOutAndError(null, cmd, stdoutFilename,
+                              stderrFilename, tailLength );
+  }
+
+  /**
+   * Wrap a command in a shell to capture stdout and stderr to files.
+   * Setup commands, such as setting the memory limit, can be passed;
+   * they are executed before the command itself is exec'ed.
+   * If the tailLength is 0, the entire output will be saved.
+   * @param setup The setup commands for the execed process.
+   * @param cmd The command and the arguments that should be run
+   * @param stdoutFilename The filename that stdout should be saved to
+   * @param stderrFilename The filename that stderr should be saved to
+   * @param tailLength The length of the tail to be saved.
+   * @return the modified command that should be run
+   */
+  public static List<String> captureOutAndError(List<String> setup,
+                                                List<String> cmd, 
+                                                File stdoutFilename,
+                                                File stderrFilename,
+                                                long tailLength
+                                               ) throws IOException {
     String stdout = FileUtil.makeShellPath(stdoutFilename);
     String stderr = FileUtil.makeShellPath(stderrFilename);
     List<String> result = new ArrayList<String>(3);
     result.add(bashCommand);
     result.add("-c");
     StringBuffer mergedCmd = new StringBuffer();
+    if (setup != null && setup.size() > 0) {
+      mergedCmd.append(addCommand(setup, false));
+      mergedCmd.append(";");
+    }
     if (tailLength > 0) {
       mergedCmd.append("(");
     } else {
       mergedCmd.append("exec ");
     }
-    boolean isExecutable = true;
-    for(String s: cmd) {
-      mergedCmd.append('\'');
-      if (isExecutable) {
-        // the executable name needs to be expressed as a shell path for the  
-        // shell to find it.
-        mergedCmd.append(FileUtil.makeShellPath(new File(s)));
-        isExecutable = false; 
-      } else {
-        mergedCmd.append(s);
-      }
-      mergedCmd.append('\'');
-      mergedCmd.append(" ");
-    }
+    mergedCmd.append(addCommand(cmd, true));
     mergedCmd.append(" < /dev/null ");
     if (tailLength > 0) {
       mergedCmd.append(" | ");
@@ -254,6 +267,34 @@ public class TaskLog {
     result.add(mergedCmd.toString());
     return result;
   }
+
+  /**
+   * Quote each of the command strings and return them joined
+   * as a single string.
+   * @param cmd The command to be quoted
+   * @param isExecutable if true, the first token is converted to a
+   * shell path so the shell can locate the executable
+   * @return The quoted string.
+   * @throws IOException
+   */
+  public static String addCommand(List<String> cmd, boolean isExecutable) 
+  throws IOException {
+    StringBuffer command = new StringBuffer();
+    for(String s: cmd) {
+      command.append('\'');
+      if (isExecutable) {
+        // the executable name needs to be expressed as a shell path for the
+        // shell to find it.
+        command.append(FileUtil.makeShellPath(new File(s)));
+        isExecutable = false;
+      } else {
+        command.append(s);
+      }
+      command.append('\'');
+      command.append(" ");
+    }
+    return command.toString();
+  }
   
   /**
    * Wrap a command in a shell to capture debug script's 
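
One caveat visible in addCommand above: tokens are wrapped in single quotes without escaping embedded single quotes, so an argument containing a ' would produce a malformed shell string. The setup commands this patch passes (ulimit, -v, and a number) are controlled tokens, so the simple scheme is safe for them.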

+ 12 - 1
src/java/org/apache/hadoop/mapred/pipes/Application.java

@@ -52,6 +52,8 @@ class Application<K1 extends WritableComparable, V1 extends Writable,
   private Socket clientSocket;
   private OutputHandler<K2, V2> handler;
   private DownwardProtocol<K1, V1> downlink;
+  static final boolean WINDOWS =
+      System.getProperty("os.name").startsWith("Windows");
 
   /**
    * Start the child process to handle the task for us.
@@ -82,7 +84,16 @@ class Application<K1 extends WritableComparable, V1 extends Writable,
     File stdout = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDOUT);
     File stderr = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR);
     long logLength = TaskLog.getTaskLogLength(conf);
-    cmd = TaskLog.captureOutAndError(cmd, stdout, stderr, logLength);
+    // set memory limit using ulimit.
+    if (!WINDOWS) {
+      List<String> setup = new ArrayList<String>();
+      setup.add("ulimit");
+      setup.add("-v"); 
+      setup.add(String.valueOf(Runtime.getRuntime().maxMemory() / 1024));
+      cmd = TaskLog.captureOutAndError(setup, cmd, stdout, stderr, logLength);
+    } else {
+      cmd = TaskLog.captureOutAndError(cmd, stdout, stderr, logLength);
+    }
     process = runClient(cmd, env);
     clientSocket = serverSocket.accept();
     handler = new OutputHandler<K2, V2>(output, reporter);
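
Note that Runtime.getRuntime().maxMemory() in the setup above is evaluated inside the task's own JVM, so the pipes (and likewise streaming) subprocess inherits a virtual-memory cap equal to the task JVM's -Xmx from mapred.child.java.opts, which is what the documentation changes earlier in this diff describe.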

Some files were not shown because too many files changed in this diff