瀏覽代碼

MAPREDUCE-4154. streaming MR job succeeds even if the streaming command
fails. (Devaraj Das via tgraves)


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1.0@1329779 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley 13 年之前
父節點
當前提交
dad2192f76

+ 3 - 0
CHANGES.txt

@@ -36,6 +36,9 @@ Release 1.0.3 - unreleased
     HADOOP-8293. Fix the Makefile.am for the native library to include the
     JNI path. (omalley)
 
+    MAPREDUCE-4154. streaming MR job succeeds even if the streaming command 
+    fails. (Devaraj Das via tgraves)
+
 Release 1.0.2 - 2012.03.24
 
   NEW FEATURES

+ 4 - 0
src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java

@@ -569,6 +569,10 @@ public abstract class PipeMapRed {
           clientOut_.flush();
           clientOut_.close();
         }
+      } catch (IOException io) {
+        LOG.warn(StringUtils.stringifyException(io));
+      }
+      try {
         waitOutputThreads();
       } catch (IOException io) {
         LOG.warn(StringUtils.stringifyException(io));

+ 20 - 0
src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingFailure.java

@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ToolRunner;
 
 /**
  * This class tests if hadoopStreaming returns Exception 
@@ -82,6 +83,25 @@ public class TestStreamingFailure extends TestStreaming
       }
     }
   }
+  
+  public void testStreamingFailureForFailedProcess() throws Exception {
+    int ret = 0;
+    try {
+      createInput();
+      String[] args = {
+          "-input", INPUT_FILE.getAbsolutePath(),
+          "-output", OUTPUT_DIR.getAbsolutePath(),
+          "-mapper", "/bin/ls dsdsdsds-does-not-exist",
+          "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data",
+              "/tmp"),
+      };
+      ret = ToolRunner.run(new StreamJob(), args);
+    } finally {
+      INPUT_FILE.delete();
+      FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
+    }
+    assertEquals("Streaming job failure code expected", 1, ret);
+  }
 
   public static void main(String[]args) throws Exception
   {