瀏覽代碼

HADOOP-5623. Ensure streaming status messages aren't overwritten. Contributed by Rick Cox & Ravi Gummadi.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security-203@1098832 13f79535-47bb-0310-9956-ffa450edef68
Arun Murthy 14 年之前
父節點
當前提交
9e3f70c4e2

+ 3 - 0
CHANGES.txt

@@ -1694,6 +1694,9 @@ Release 0.20.2 - Unreleased
     MAPREDUCE-1070. Prevent a deadlock in the fair scheduler servlet.
     (Todd Lipcon via cdouglas)
 
+    HADOOP-5623. Ensure streaming status messages aren't overwritten. (Rick 
+    Cox & Ravi Gummadi via tomwhite)
+    
     MAPREDUCE-1163. Remove unused, hard-coded paths from libhdfs. (Allen
     Wittenauer via cdouglas)
 

+ 7 - 1
src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java

@@ -385,7 +385,11 @@ public abstract class PipeMapRed {
           if (now-lastStdoutReport > reporterOutDelay_) {
             lastStdoutReport = now;
             String hline = "Records R/W=" + numRecRead_ + "/" + numRecWritten_;
-            reporter.setStatus(hline);
+            if (!processProvidedStatus_) {
+              reporter.setStatus(hline);
+            } else {
+              reporter.progress();
+            }
             logprintln(hline);
             logflush();
           }
@@ -446,6 +450,7 @@ public abstract class PipeMapRed {
             if (matchesCounter(lineStr)) {
               incrCounter(lineStr);
             } else if (matchesStatus(lineStr)) {
+              processProvidedStatus_ = true;
               setStatus(lineStr);
             } else {
               LOG.warn("Cannot parse reporter line: " + lineStr);
@@ -671,4 +676,5 @@ public abstract class PipeMapRed {
   String LOGNAME;
   PrintStream log_;
 
+  volatile boolean processProvidedStatus_ = false;
 }

+ 11 - 2
src/contrib/streaming/src/test/org/apache/hadoop/streaming/StderrApp.java

@@ -32,8 +32,16 @@ public class StderrApp
    * postWriteLines to stderr.
    */
   public static void go(int preWriteLines, int sleep, int postWriteLines) throws IOException {
+    go(preWriteLines, sleep, postWriteLines, false);
+  }
+  
+  public static void go(int preWriteLines, int sleep, int postWriteLines, boolean status) throws IOException {
     BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
     String line;
+    
+    if (status) {
+      System.err.println("reporter:status:starting echo");
+    }      
        
     while (preWriteLines > 0) {
       --preWriteLines;
@@ -57,13 +65,14 @@ public class StderrApp
 
   public static void main(String[] args) throws IOException {
     if (args.length < 3) {
-      System.err.println("Usage: StderrApp PREWRITE SLEEP POSTWRITE");
+      System.err.println("Usage: StderrApp PREWRITE SLEEP POSTWRITE [STATUS]");
       return;
     }
     int preWriteLines = Integer.parseInt(args[0]);
     int sleep = Integer.parseInt(args[1]);
     int postWriteLines = Integer.parseInt(args[2]);
+    boolean status = args.length > 3 ? Boolean.parseBoolean(args[3]) : false;
     
-    go(preWriteLines, sleep, postWriteLines);
+    go(preWriteLines, sleep, postWriteLines, status);
   }
 }