Explorar o código

HADOOP-1154. Fail a streaming task if the threads reading from or writing to the streaming process fail. Contributed by Koji Noguchi.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@529574 13f79535-47bb-0310-9956-ffa450edef68
Thomas White %!s(int64=18) %!d(string=hai) anos
pai
achega
f829ed16c1

+ 3 - 0
CHANGES.txt

@@ -219,6 +219,9 @@ Trunk (unreleased changes)
 66. HADOOP-1224.  Fix "Browse the filesystem" link to no longer point 
     to dead datanodes.  (Enis Soztutar via tomwhite)
 
+67. HADOOP-1154.  Fail a streaming task if the threads reading from or 
+    writing to the streaming process fail.  (Koji Noguchi via tomwhite)
+
 
 Release 0.12.3 - 2007-04-06
 

+ 8 - 0
src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java

@@ -515,6 +515,9 @@ public abstract class PipeMapRed {
         }
       } catch (IOException io) {
         io.printStackTrace(log_);
+        outerrThreadsThrowable = io;
+      } catch (Throwable th) {
+        outerrThreadsThrowable = th;
       }
       logprintln("MROutputThread done");
     }
@@ -551,6 +554,9 @@ public abstract class PipeMapRed {
         }
       } catch (IOException io) {
         logStackTrace(io);
+        outerrThreadsThrowable = io;
+      } catch (Throwable th) {
+        outerrThreadsThrowable = th;
       }
     }
     long lastStderrReport = 0;
@@ -719,6 +725,8 @@ public abstract class PipeMapRed {
 
   private OutputStream sideEffectOut_;
 
+  protected volatile Throwable outerrThreadsThrowable;
+
   String LOGNAME;
   PrintStream log_;
 

+ 7 - 0
src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java

@@ -25,6 +25,7 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.util.StringUtils;
 
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.Writable;
@@ -69,6 +70,12 @@ public class PipeMapper extends PipeMapRed implements Mapper {
     if (outThread_ == null) {
       startOutputThreads(output, reporter);
     }
+    if( outerrThreadsThrowable != null ) {
+      mapRedFinished();
+      throw new IOException ("MROutput/MRErrThread failed:"
+                             + StringUtils.stringifyException(
+                                          outerrThreadsThrowable));
+    }
     try {
       // 1/4 Hadoop in
       numRecRead_++;

+ 7 - 0
src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java

@@ -27,6 +27,7 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.util.StringUtils;
 
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.Writable;
@@ -73,6 +74,12 @@ public class PipeReducer extends PipeMapRed implements Reducer {
         numRecRead_++;
         maybeLogRecord();
         if (doPipe_) {
+          if( outerrThreadsThrowable != null ) {
+            mapRedFinished();
+            throw new IOException ("MROutput/MRErrThread failed:"
+                                   + StringUtils.stringifyException( 
+                                               outerrThreadsThrowable));
+          }
           write(key);
           clientOut_.write('\t');
           write(val);