瀏覽代碼

HADOOP-1771. Fix a NullPointerException in streaming caused by an IOException in MROutputThread. Contributed by lohit.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@584273 13f79535-47bb-0310-9956-ffa450edef68
Nigel Daley 18 年之前
父節點
當前提交
fe6eee62cb
共有 2 個文件被更改,包括 31 次插入7 次删除
  1. 3 0
      CHANGES.txt
  2. 28 7
      src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java

+ 3 - 0
CHANGES.txt

@@ -291,6 +291,9 @@ Trunk (unreleased changes)
     remain in the RUNNING state even after being killed by the JobTracker and
     thus handicap the cleanup of the task's output sub-directory. (acmurthy)
 
+    HADOOP-1771. Fix a NullPointerException in streaming caused by an 
+    IOException in MROutputThread. (lohit vijayarenu via nigel)
+
   IMPROVEMENTS
 
     HADOOP-1908. Restructure data node code so that block sending and 

+ 28 - 7
src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java

@@ -353,13 +353,23 @@ public abstract class PipeMapRed {
             logflush();
           }
         }
-      } catch (IOException io) {
-        io.printStackTrace(log_);
-        outerrThreadsThrowable = io;
+        if (clientIn_ != null) {
+          clientIn_.close();
+          clientIn_ = null;
+          LOG.info("MROutputThread done");
+        }
       } catch (Throwable th) {
         outerrThreadsThrowable = th;
+        LOG.warn(StringUtils.stringifyException(th));
+        try {
+          if (clientIn_ != null) {
+            clientIn_.close();
+            clientIn_ = null;
+          }
+        } catch (IOException io) {
+          LOG.info(StringUtils.stringifyException(io));
+        }
       }
-      logprintln("MROutputThread done");
     }
 
     OutputCollector output;
@@ -388,11 +398,22 @@ public abstract class PipeMapRed {
             reporter.progress();
           }
         }
-      } catch (IOException io) {
-        logStackTrace(io);
-        outerrThreadsThrowable = io;
+	if (clientErr_ != null) {
+          clientErr_.close();
+          clientErr_ = null;
+          LOG.info("MRErrorThread done");
+        }
       } catch (Throwable th) {
         outerrThreadsThrowable = th;
+        LOG.warn(StringUtils.stringifyException(th));
+        try {
+          if (clientErr_ != null) {
+            clientErr_.close();
+            clientErr_ = null;
+          }
+        } catch (IOException io) {
+          LOG.info(StringUtils.stringifyException(io));
+        }
       }
     }
     long lastStderrReport = 0;