Explorar o código

Merge -r 725904:725905 725906:725907 725909:725910 from trunk onto 0.18 branch. Fixes HADOOP-4620.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.18@725912 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das hai 16 anos
pai
achega
085c1d9d39

+ 3 - 0
CHANGES.txt

@@ -99,6 +99,9 @@ Release 0.18.3 - Unreleased
 
     HADOOP-4824. Should not use File.setWritable() in 0.18. (hairong)
 
+    HADOOP-4620. Fixes Streaming to handle well the cases of map/reduce with empty
+    input/output. (Ravi Gummadi via ddas)
+
 Release 0.18.2 - 2008-11-03
 
   BUG FIXES

+ 21 - 2
src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java

@@ -294,6 +294,22 @@ public abstract class PipeMapRed {
 
   void waitOutputThreads() {
     try {
+      if (outThread_ == null) {
+        // This happens only when reducer has empty input(So reduce() is not
+        // called at all in this task). If reducer still generates output,
+        // which is very uncommon and we may not have to support this case.
+        // So we don't write this output to HDFS, but we consume/collect
+        // this output just to avoid reducer hanging forever.
+
+        OutputCollector collector = new OutputCollector() {
+          public void collect(Object key, Object value)
+            throws IOException {
+            //just consume it, no need to write the record anywhere
+          }
+        };
+        Reporter reporter = Reporter.NULL;//dummy reporter
+        startOutputThreads(collector, reporter);
+      }
       int exitVal = sim.waitFor();
       // how'd it go?
       if (exitVal != 0) {
@@ -505,9 +521,11 @@ public abstract class PipeMapRed {
   }
 
   public void mapRedFinished() {
-    logprintln("mapRedFinished");
     try {
-      if (!doPipe_) return;
+      if (!doPipe_) {
+        logprintln("mapRedFinished");
+        return;
+      }
       try {
         if (clientOut_ != null) {
           clientOut_.flush();
@@ -517,6 +535,7 @@ public abstract class PipeMapRed {
       }
       waitOutputThreads();
       if (sim != null) sim.destroy();
+      logprintln("mapRedFinished");
     } catch (RuntimeException e) {
       logStackTrace(e);
       throw e;

+ 38 - 0
src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRunner.java

@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import org.apache.hadoop.mapred.MapRunner;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.OutputCollector;
+
+import java.io.IOException;
+
+import org.apache.hadoop.util.ReflectionUtils;
+
+public class PipeMapRunner<K1, V1, K2, V2> extends MapRunner<K1, V1, K2, V2> {
+  public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
+                  Reporter reporter)
+         throws IOException {
+    PipeMapper pipeMapper = (PipeMapper)getMapper();
+    pipeMapper.startOutputThreads(output, reporter);
+    super.run(input, output, reporter);
+  }
+}

+ 0 - 4
src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java

@@ -63,10 +63,6 @@ public class PipeMapper extends PipeMapRed implements Mapper {
   // (MapRed creates it reflectively)
 
   public void map(Object key, Object value, OutputCollector output, Reporter reporter) throws IOException {
-    // init
-    if (outThread_ == null) {
-      startOutputThreads(output, reporter);
-    }
     if (outerrThreadsThrowable != null) {
       mapRedFinished();
       throw new IOException ("MROutput/MRErrThread failed:"

+ 1 - 0
src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java

@@ -751,6 +751,7 @@ public class StreamJob {
         jobConf_.setMapperClass(c);
       } else {
         jobConf_.setMapperClass(PipeMapper.class);
+        jobConf_.setMapRunnerClass(PipeMapRunner.class);
         jobConf_.set("stream.map.streamprocessor", 
                      URLEncoder.encode(mapCmd_, "UTF-8"));
       }

+ 116 - 0
src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingEmptyInpNonemptyOut.java

@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import junit.framework.TestCase;
+import java.io.*;
+
+/**
+ * This class tests hadoopStreaming in MapReduce local mode by giving
+ * empty input to mapper and the mapper generates nonempty output. Since map()
+ * is not called at all, output thread was not getting created and the mapper
+ * was hanging forever. Now this issue is solved. Similarly reducer is also
+ * checked for task completion with empty input and nonempty output.
+ */
+public class TestStreamingEmptyInpNonemptyOut extends TestCase
+{
+
+  protected File INPUT_FILE = new File("emptyInputFile.txt");
+  protected File OUTPUT_DIR = new File("out");
+  protected File SCRIPT_FILE = new File("perlScript.pl");
+
+  protected String map = "perlScript.pl";
+  protected String reduce = "org.apache.hadoop.mapred.lib.IdentityReducer";
+  protected String script = "#!/usr/bin/perl\nfor($count = 1500; $count >= 1; $count--) {print \"$count \";}";
+
+  private StreamJob job;
+
+  public TestStreamingEmptyInpNonemptyOut() throws IOException
+  {
+    UtilTest utilTest = new UtilTest(getClass().getName());
+    utilTest.checkUserDir();
+    utilTest.redirectIfAntJunit();
+  }
+
+  protected void createInputAndScript() throws IOException
+  {
+    DataOutputStream out = new DataOutputStream(
+                           new FileOutputStream(INPUT_FILE.getAbsoluteFile()));
+    out.close();
+
+    out = new DataOutputStream(
+          new FileOutputStream(SCRIPT_FILE.getAbsoluteFile()));
+    out.write(script.getBytes("UTF-8"));
+    out.close();
+  }
+
+  protected String[] genArgs() {
+    return new String[] {
+      "-input", INPUT_FILE.getAbsolutePath(),
+      "-output", OUTPUT_DIR.getAbsolutePath(),
+      "-mapper", map,
+      "-reducer", reduce,
+      //"-verbose",
+      //"-jobconf", "stream.debug=set"
+      "-jobconf", "keep.failed.task.files=true",
+      "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp")
+    };
+  }
+  
+  public void testEmptyInputNonemptyOutput() throws IOException
+  {
+    try {
+      try {
+        OUTPUT_DIR.getAbsoluteFile().delete();
+      } catch (Exception e) {
+      }
+
+      createInputAndScript();
+      boolean mayExit = false;
+
+      // During tests, the default Configuration will use a local mapred
+      // So don't specify -config or -cluster.
+      // First let us test if mapper doesn't hang for empty i/p and nonempty o/p
+      job = new StreamJob(genArgs(), mayExit);      
+      job.go();
+      File outFile = new File(OUTPUT_DIR, "part-00000").getAbsoluteFile();
+      outFile.delete();
+
+      // Now let us test if reducer doesn't hang for empty i/p and nonempty o/p
+      map = "org.apache.hadoop.mapred.lib.IdentityMapper";
+      reduce = "perlScript.pl";
+      job = new StreamJob(genArgs(), mayExit);      
+      job.go();
+      outFile = new File(OUTPUT_DIR, "part-00000").getAbsoluteFile();
+      outFile.delete();
+    } finally {
+      File outFileCRC = new File(OUTPUT_DIR, ".part-00000.crc").getAbsoluteFile();
+      INPUT_FILE.delete();
+      SCRIPT_FILE.delete();
+      outFileCRC.delete();
+      OUTPUT_DIR.getAbsoluteFile().delete();
+    }
+  }
+
+  public static void main(String[]args) throws Exception
+  { // Standalone entry point: run this class's own test, not TestStreaming's.
+    new TestStreamingEmptyInpNonemptyOut().testEmptyInputNonemptyOutput();
+  }
+
+}

+ 3 - 0
src/mapred/org/apache/hadoop/mapred/MapRunner.java

@@ -51,4 +51,7 @@ public class MapRunner<K1, V1, K2, V2>
     }
   }
 
+  protected Mapper<K1, V1, K2, V2> getMapper() {
+    return mapper;
+  }
 }