浏览代码

HADOOP-1328. Implement user counters in streaming. Contributed by Tom White.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@663366 13f79535-47bb-0310-9956-ffa450edef68
Owen O'Malley 17 年之前
父节点
当前提交
adb4183283

+ 3 - 0
CHANGES.txt

@@ -136,6 +136,9 @@ Trunk (unreleased changes)
     HADOOP-3177. Implement Syncable interface for FileSystem.
     (Tsz Wo (Nicholas), SZE via dhruba)
 
+    HADOOP-1328. Implement user counters in streaming. (tomwhite via
+    omalley)
+
   IMPROVEMENTS
    
     HADOOP-2928. Remove deprecated FileSystem.getContentLength().

+ 51 - 1
src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java

@@ -405,6 +405,9 @@ public abstract class PipeMapRed {
   class MRErrorThread extends Thread {
 
+    /**
+     * Creates the stderr-consuming thread as a daemon thread.
+     * The reporter prefix is read from the job property
+     * "stream.stderr.reporter.prefix" (default "reporter:"); the counter and
+     * status prefixes are that prefix followed by "counter:" and "status:".
+     */
     public MRErrorThread() {
+      this.reporterPrefix = job_.get("stream.stderr.reporter.prefix", "reporter:");
+      this.counterPrefix = reporterPrefix + "counter:";
+      this.statusPrefix = reporterPrefix + "status:";
       setDaemon(true);
     }
     
@@ -418,7 +421,18 @@ public abstract class PipeMapRed {
       try {
         lineReader = new LineReader((InputStream)clientErr_, job_);
         while (lineReader.readLine(line) > 0) {
-          System.err.println(line.toString());
+          String lineStr = line.toString();
+          if (matchesReporter(lineStr)) {
+            if (matchesCounter(lineStr)) {
+              incrCounter(lineStr);
+            } else if (matchesStatus(lineStr)) {
+              setStatus(lineStr);
+            } else {
+              LOG.warn("Cannot parse reporter line: " + lineStr);
+            }
+          } else {
+            System.err.println(lineStr);
+          }
           long now = System.currentTimeMillis(); 
           if (reporter != null && now-lastStderrReport > reporterErrDelay_) {
             lastStderrReport = now;
@@ -450,8 +464,44 @@ public abstract class PipeMapRed {
         }
       }
     }
+    
+    /**
+     * Tells whether a stderr line is addressed to the reporter, i.e. whether
+     * it begins with {@code reporterPrefix}.
+     *
+     * @param line one line read from the client's stderr
+     * @return true if the line starts with the reporter prefix
+     */
+    private boolean matchesReporter(String line) {
+      return line.startsWith(reporterPrefix);
+    }
+
+    /**
+     * Tells whether a stderr line is a counter update, i.e. whether it begins
+     * with {@code counterPrefix} (the reporter prefix followed by "counter:").
+     *
+     * @param line one line read from the client's stderr
+     * @return true if the line starts with the counter prefix
+     */
+    private boolean matchesCounter(String line) {
+      return line.startsWith(counterPrefix);
+    }
+
+    /**
+     * Tells whether a stderr line is a status update, i.e. whether it begins
+     * with {@code statusPrefix} (the reporter prefix followed by "status:").
+     *
+     * @param line one line read from the client's stderr
+     * @return true if the line starts with the status prefix
+     */
+    private boolean matchesStatus(String line) {
+      return line.startsWith(statusPrefix);
+    }
+
+    /**
+     * Applies a counter update received on the client's stderr.
+     * After {@code counterPrefix} the line must hold exactly three
+     * comma-separated columns; the first two are passed to
+     * {@code Reporter.incrCounter} unchanged and the third is parsed as a
+     * long increment, e.g. "reporter:counter:UserCounters,InputLines,1".
+     * Malformed lines are logged and ignored.
+     *
+     * @param line a stderr line that starts with the counter prefix
+     */
+    private void incrCounter(String line) {
+      // The reporter field is volatile and assigned from another thread; it
+      // can still be null when the first stderr lines arrive (the read loop
+      // guards it the same way). Read it once so the check and the use agree.
+      Reporter currentReporter = reporter;
+      if (currentReporter == null) {
+        LOG.warn("Reporter not available, dropping counter line: " + line);
+        return;
+      }
+      String trimmedLine = line.substring(counterPrefix.length()).trim();
+      String[] columns = trimmedLine.split(",");
+      if (columns.length == 3) {
+        try {
+          currentReporter.incrCounter(columns[0], columns[1],
+              Long.parseLong(columns[2]));
+        } catch (NumberFormatException e) {
+          LOG.warn("Cannot parse counter increment '" + columns[2] +
+              "' from line: " + line);
+        }
+      } else {
+        LOG.warn("Cannot parse counter line: " + line);
+      }
+    }
+
+    /**
+     * Applies a status update received on the client's stderr: everything
+     * after {@code statusPrefix}, trimmed, becomes the reported status.
+     *
+     * @param line a stderr line that starts with the status prefix
+     */
+    private void setStatus(String line) {
+      // The reporter field is volatile and may not have been assigned yet
+      // when early stderr output arrives; read it once and skip the update
+      // instead of failing with a NullPointerException.
+      Reporter currentReporter = reporter;
+      if (currentReporter == null) {
+        LOG.warn("Reporter not available, dropping status line: " + line);
+        return;
+      }
+      currentReporter.setStatus(line.substring(statusPrefix.length()).trim());
+    }
+    
     long lastStderrReport = 0;
     volatile Reporter reporter;
+    private final String reporterPrefix;
+    private final String counterPrefix;
+    private final String statusPrefix;
   }
 
   public void mapRedFinished() {

+ 1 - 14
src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java

@@ -20,10 +20,6 @@ package org.apache.hadoop.streaming;
 
 import junit.framework.TestCase;
 import java.io.*;
-import java.util.*;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 
 /**
  * This class tests hadoopStreaming in MapReduce local mode.
@@ -73,7 +69,7 @@ public class TestStreaming extends TestCase
     };
   }
   
-  public void testCommandLine()
+  public void testCommandLine() throws IOException
   {
     try {
       try {
@@ -94,8 +90,6 @@ public class TestStreaming extends TestCase
       System.err.println("outEx1=" + outputExpect);
       System.err.println("  out1=" + output);
       assertEquals(outputExpect, output);
-    } catch(Exception e) {
-      failTrace(e);
     } finally {
       File outFileCRC = new File(OUTPUT_DIR, ".part-00000.crc").getAbsoluteFile();
       INPUT_FILE.delete();
@@ -104,13 +98,6 @@ public class TestStreaming extends TestCase
     }
   }
 
-  private void failTrace(Exception e)
-  {
-    StringWriter sw = new StringWriter();
-    e.printStackTrace(new PrintWriter(sw));
-    fail(sw.toString());
-  }
-
   public static void main(String[]args) throws Exception
   {
     new TestStreaming().testCommandLine();

+ 1 - 0
src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java

@@ -91,6 +91,7 @@ public class TrApp
     while ((line = in.readLine()) != null) {
       String out = line.replace(find, replace);
       System.out.println(out);
+      System.err.println("reporter:counter:UserCounters,InputLines,1");
     }
   }