
HADOOP-556. Contrib/streaming: send keep-alive reports to the tasktracker every 10 seconds rather than every 100 records. Contributed by Michel.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@449844 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 19 years ago
parent
commit
5432432da5

+ 5 - 0
CHANGES.txt

@@ -61,6 +61,11 @@ Trunk (unreleased changes)
     extend ObjectWritable to handle enums, so that they can be passed
     as RPC parameters.  (Sanjay Dahiya via cutting)
 
+16. HADOOP-556.  Contrib/streaming: send keep-alive reports to task
+    tracker every 10 seconds rather than every 100 records, to avoid
+    task timeouts.  (Michel Tourn via cutting)
+
+
 Release 0.6.2 (unreleased)
 
 1. HADOOP-532.  Fix a bug reading value-compressed sequence files,

+ 13 - 4
src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java

@@ -497,8 +497,12 @@ public abstract class PipeMapRed {
             output.collect(key, val);
           }
           numRecWritten_++;
-          if (numRecWritten_ % 100 == 0) {
-            logprintln(numRecRead_ + "/" + numRecWritten_);
+          long now = System.currentTimeMillis();
+          if (now-lastStdoutReport > reporterOutDelay_) {
+            lastStdoutReport = now;
+            String hline = "Records R/W=" + numRecRead_ + "/" + numRecWritten_;
+            reporter.setStatus(hline);
+            logprintln(hline);
             logflush();
           }
         }
@@ -511,6 +515,8 @@ public abstract class PipeMapRed {
     OutputCollector output;
     Reporter reporter;
     byte[] answer;
+    long lastStdoutReport = 0;
+    
   }
 
   class MRErrorThread extends Thread {
@@ -529,11 +535,11 @@ public abstract class PipeMapRed {
           String lineStr = new String(line, "UTF-8");
           logprintln(lineStr);
           long now = System.currentTimeMillis(); 
-          if (num < 10 || (now-lastStderrReport > 10*1000)) {
+          if (num < 20 || (now-lastStderrReport > reporterErrDelay_)) {
+            lastStderrReport = now;
             String hline = "MRErr: " + lineStr;
             System.err.println(hline);
             reporter.setStatus(hline);
-            lastStderrReport = now;
           }
         }
       } catch (IOException io) {
@@ -671,11 +677,14 @@ public abstract class PipeMapRed {
   long numRecSkipped_ = 0;
   long nextRecReadLog_ = 1;
 
+
   long minRecWrittenToEnableSkip_ = Long.MAX_VALUE;
 
   int keyCols_;
   final static int ALL_COLS = Integer.MAX_VALUE;
 
+  long reporterOutDelay_ = 10*1000L; 
+  long reporterErrDelay_ = 10*1000L; 
   long joinDelay_;
   JobConf job_;
   FileSystem fs_;
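
Both hunks above apply the same pattern: progress is reported to the tasktracker based on elapsed wall-clock time rather than record count, since a slow streaming command can take minutes per record and a count-based report would let the task timeout expire. Below is a minimal standalone sketch of that throttle; Reporter is stubbed so the snippet compiles on its own, and all names other than setStatus are illustrative, not taken from PipeMapRed:

    // Minimal sketch of the time-based keep-alive throttle used above.
    // Reporter is stubbed so the snippet compiles on its own; names other
    // than setStatus are illustrative, not taken from PipeMapRed.
    public class KeepAliveSketch {
      interface Reporter { void setStatus(String msg); }

      static final long REPORT_DELAY = 10 * 1000L; // at most one report per 10s
      long lastReport = 0;                          // time of the last report sent
      long numRecRead = 0, numRecWritten = 0;

      void onRecordWritten(Reporter reporter) {
        numRecWritten++;
        long now = System.currentTimeMillis();
        // Throttle on elapsed time, not on record count: a slow streaming
        // command may need minutes per record, and a count-based report
        // would let the tasktracker's task timeout fire in the meantime.
        if (now - lastReport > REPORT_DELAY) {
          lastReport = now;
          reporter.setStatus("Records R/W=" + numRecRead + "/" + numRecWritten);
        }
      }
    }

Holding the interval in fields (reporterOutDelay_, reporterErrDelay_) rather than a literal 10*1000 also leaves room to make it configurable later.
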

+ 20 - 16
src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java

@@ -46,7 +46,6 @@ public class StreamJob {
 
   protected static final Log LOG = LogFactory.getLog(StreamJob.class.getName());
   final static String REDUCE_NONE = "NONE";
-
   private boolean reducerNone_;
 
   public StreamJob(String[] argv, boolean mayExit) {
@@ -107,12 +106,13 @@ public class StreamJob {
     redCmd_ = unqualifyIfLocalPath(redCmd_);
   }
 
-  void validateNameEqValue(String neqv) {
+  String[] parseNameEqValue(String neqv) {
     String[] nv = neqv.split("=", 2);
     if (nv.length < 2) {
       fail("Invalid name=value spec: " + neqv);
     }
     msg("Recording name=value: name=" + nv[0] + " value=" + nv[1]);
+    return nv;
   }
 
   String unqualifyIfLocalPath(String cmd) throws IOException {
@@ -199,17 +199,17 @@ public class StreamJob {
         configPath_.add(s);
       } else if ((s = optionArg(argv_, i, "-dfs", false)) != null) {
         i++;
-        userJobConfProps_.add("fs.default.name=" + s);
+        userJobConfProps_.put("fs.default.name", s);
       } else if ((s = optionArg(argv_, i, "-jt", false)) != null) {
         i++;
-        userJobConfProps_.add("mapred.job.tracker=" + s);
+        userJobConfProps_.put("mapred.job.tracker", s);
       } else if ((s = optionArg(argv_, i, "-jobconf", false)) != null) {
         i++;
-        validateNameEqValue(s);
-        userJobConfProps_.add(s);
+        String[] nv = parseNameEqValue(s);
+        userJobConfProps_.put(nv[0], nv[1]);
       } else if ((s = optionArg(argv_, i, "-cmdenv", false)) != null) {
         i++;
-        validateNameEqValue(s);
+        parseNameEqValue(s);
         if (addTaskEnvironment_.length() > 0) {
           addTaskEnvironment_ += " ";
         }
@@ -389,8 +389,9 @@ public class StreamJob {
     // First try an explicit spec: it's too hard to find our own location in this case:
     // $HADOOP_HOME/bin/hadoop jar /not/first/on/classpath/custom-hadoop-streaming.jar
     // where findInClasspath() would find the version of hadoop-streaming.jar in $HADOOP_HOME
-    String runtimeClasses = jobConf_.get("stream.shipped.hadoopstreaming"); // jar or class dir
-
+    String runtimeClasses = userJobConfProps_.get("stream.shipped.hadoopstreaming"); // jar or class dir
+System.out.println(runtimeClasses + "=@@@userJobConfProps_.get(stream.shipped.hadoopstreaming");
+    
     if (runtimeClasses == null) {
       runtimeClasses = StreamUtil.findInClasspath(StreamJob.class.getName());
     }
@@ -433,13 +434,15 @@ public class StreamJob {
   }
 
   protected void setUserJobConfProps(boolean doEarlyProps) {
-    Iterator it = userJobConfProps_.iterator();
+    Iterator it = userJobConfProps_.keySet().iterator();
     while (it.hasNext()) {
-      String prop = (String) it.next();
-      String[] nv = prop.split("=", 2);
-      if (doEarlyProps == nv[0].equals("fs.default.name")) {
-        msg("xxxJobConf: set(" + nv[0] + ", " + nv[1] + ") early=" + doEarlyProps);
-        jobConf_.set(nv[0], nv[1]);
+      String key = (String) it.next();
+      String val = (String)userJobConfProps_.get(key);
+      boolean earlyName = key.equals("fs.default.name");
+      earlyName |= key.equals("stream.shipped.hadoopstreaming");
+      if (doEarlyProps == earlyName) {
+        msg("xxxJobConf: set(" + key + ", " + val + ") early=" + doEarlyProps);
+        jobConf_.set(key, val);
       }
     }
   }
@@ -752,7 +755,8 @@ public class StreamJob {
   protected boolean hasSimpleInputSpecs_;
   protected ArrayList packageFiles_ = new ArrayList(); // <String>
   protected ArrayList shippedCanonFiles_ = new ArrayList(); // <String>
-  protected ArrayList userJobConfProps_ = new ArrayList(); // <String> name=value
+  //protected ArrayList userJobConfProps_ = new ArrayList(); // <String> name=value
+  protected TreeMap<String, String> userJobConfProps_ = new TreeMap<String, String>(); 
   protected String output_;
   protected String mapsideoutURI_;
   protected String mapCmd_;
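
The StreamJob changes replace the ArrayList of raw "name=value" strings with a TreeMap, so -dfs, -jt, and -jobconf all funnel through one map and a property given twice keeps only its last value; setUserJobConfProps then applies fs.default.name and stream.shipped.hadoopstreaming in the early pass and everything else later. A self-contained sketch of that shape, with JobConf stood in by a plain Map so it runs on its own:

    import java.util.Map;
    import java.util.TreeMap;

    // Sketch of the name=value plumbing above: parse once, store in a map so
    // a repeated name keeps only its last value, then push the properties to
    // the job configuration in two passes. JobConf is stood in by a plain Map.
    public class JobConfPropsSketch {
      private final TreeMap<String, String> props = new TreeMap<String, String>();

      String[] parseNameEqValue(String neqv) {
        String[] nv = neqv.split("=", 2); // split on the first '=' only
        if (nv.length < 2) {
          throw new IllegalArgumentException("Invalid name=value spec: " + neqv);
        }
        return nv;
      }

      void add(String neqv) {
        String[] nv = parseNameEqValue(neqv);
        props.put(nv[0], nv[1]); // a repeated name replaces the old value
      }

      // fs.default.name and stream.shipped.hadoopstreaming must be set before
      // the job's file systems and jar path are resolved; the rest come later.
      void apply(Map<String, String> jobConf, boolean doEarlyProps) {
        for (Map.Entry<String, String> e : props.entrySet()) {
          boolean early = e.getKey().equals("fs.default.name")
              || e.getKey().equals("stream.shipped.hadoopstreaming");
          if (doEarlyProps == early) {
            jobConf.put(e.getKey(), e.getValue());
          }
        }
      }
    }

Calling apply(conf, true) before any file system is touched and apply(conf, false) once the job is otherwise assembled mirrors the two passes that the setUserJobConfProps(boolean) signature implies.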