浏览代码

HADOOP-846. Report progress during entire map. Contributed by Devaraj.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@492677 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 年之前
父节点
当前提交
03e3066e5f
共有 3 个文件被更改,包括 42 次插入34 次删除
  1. 4 0
      CHANGES.txt
  2. 32 32
      src/java/org/apache/hadoop/mapred/MapTask.java
  3. 6 2
      src/java/org/apache/hadoop/mapred/Task.java

+ 4 - 0
CHANGES.txt

@@ -181,6 +181,10 @@ Trunk (unreleased changes)
     distributions.  Also add contrib and example documentation to
     distributions.  Also add contrib and example documentation to
     distributed javadoc, in separate sections.  (Nigel Daley via cutting)
     distributed javadoc, in separate sections.  (Nigel Daley via cutting)
 
 
+52. HADOOP-846.  Report progress during entire map, as sorting of
+    intermediate outputs may happen at any time, potentially causing
+    task timeouts.  (Devaraj Das via cutting)
+
 
 
 Release 0.9.2 - 2006-12-15
 Release 0.9.2 - 2006-12-15
 
 

+ 32 - 32
src/java/org/apache/hadoop/mapred/MapTask.java

@@ -161,7 +161,7 @@ class MapTask extends Task {
         public synchronized boolean next(Writable key, Writable value)
         public synchronized boolean next(Writable key, Writable value)
           throws IOException {
           throws IOException {
 
 
-          reportProgress(umbilical, getProgress());
+          setProgress(getProgress());
           long beforePos = getPos();
           long beforePos = getPos();
           boolean ret = rawIn.next(key, value);
           boolean ret = rawIn.next(key, value);
           myMetrics.mapInput(getPos() - beforePos);
           myMetrics.mapInput(getPos() - beforePos);
@@ -174,13 +174,13 @@ class MapTask extends Task {
         }
         }
       };
       };
 
 
+    Thread sortProgress = createProgressThread(umbilical);
     MapRunnable runner =
     MapRunnable runner =
       (MapRunnable)ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
       (MapRunnable)ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
 
 
     try {
     try {
+      sortProgress.start();
       runner.run(in, collector, reporter);      // run the map
       runner.run(in, collector, reporter);      // run the map
-    } finally {
-      in.close();                               // close input
       //check whether the length of the key/value buffer is 0. If not, then
       //check whether the length of the key/value buffer is 0. If not, then
       //we need to spill that to disk. Note that we reset the key/val buffer
       //we need to spill that to disk. Note that we reset the key/val buffer
       //upon each spill (so a length > 0 means that we have not spilled yet)
       //upon each spill (so a length > 0 means that we have not spilled yet)
@@ -189,12 +189,40 @@ class MapTask extends Task {
       }
       }
       //merge the partitions from the spilled files and create one output
       //merge the partitions from the spilled files and create one output
       collector.mergeParts();
       collector.mergeParts();
+    } finally {
       //close
       //close
+      in.close();                               // close input
       collector.close();
       collector.close();
+      sortProgress.interrupt();
     }
     }
     done(umbilical);
     done(umbilical);
   }
   }
 
 
+  private Thread createProgressThread(final TaskUmbilicalProtocol umbilical) {
+    //spawn a thread to give merge progress heartbeats
+    Thread sortProgress = new Thread() {
+      public void run() {
+        LOG.info("Started thread: " + getName());
+        while (true) {
+          try {
+            reportProgress(umbilical);
+            Thread.sleep(PROGRESS_INTERVAL);
+          } catch (InterruptedException e) {
+              return;
+          } catch (Throwable e) {
+              LOG.info("Thread Exception in " +
+                                 "reporting sort progress\n" +
+                                 StringUtils.stringifyException(e));
+              continue;
+          }
+        }
+      }
+    };
+    sortProgress.setName("Sort progress reporter for task "+getTaskId());
+    sortProgress.setDaemon(true);
+    return sortProgress;
+  }
+
   public void setConf(Configuration conf) {
   public void setConf(Configuration conf) {
     if (conf instanceof JobConf) {
     if (conf instanceof JobConf) {
       this.conf = (JobConf) conf;
       this.conf = (JobConf) conf;
@@ -298,7 +326,6 @@ class MapTask extends Task {
         int partNumber = partitioner.getPartition(key, value, partitions);
         int partNumber = partitioner.getPartition(key, value, partitions);
         sortImpl[partNumber].addKeyValue(keyOffset, keyLength, valLength);
         sortImpl[partNumber].addKeyValue(keyOffset, keyLength, valLength);
 
 
-        reportProgress(umbilical); 
         myMetrics.mapOutput(keyValBuffer.getLength() - keyOffset);
         myMetrics.mapOutput(keyValBuffer.getLength() - keyOffset);
 
 
         //now check whether we need to spill to disk
         //now check whether we need to spill to disk
@@ -348,7 +375,6 @@ class MapTask extends Task {
                   throws IOException {
                   throws IOException {
                   synchronized (this) {
                   synchronized (this) {
                     writer.append(key, value);
                     writer.append(key, value);
-                    reportProgress(umbilical);
                   }
                   }
                 }
                 }
               };
               };
@@ -374,7 +400,6 @@ class MapTask extends Task {
       while (values.more()) {
       while (values.more()) {
         combiner.reduce(values.getKey(), values, combineCollector, reporter);
         combiner.reduce(values.getKey(), values, combineCollector, reporter);
         values.nextKey();
         values.nextKey();
-        reportProgress(umbilical);
       }
       }
     }
     }
     
     
@@ -402,7 +427,6 @@ class MapTask extends Task {
         value.readFields(valIn);
         value.readFields(valIn);
 
 
         writer.append(key, value);
         writer.append(key, value);
-        reportProgress(umbilical);
       }
       }
     }
     }
     
     
@@ -435,34 +459,12 @@ class MapTask extends Task {
                   compressionType, codec);
                   compressionType, codec);
           finalIndexOut.writeLong(segmentStart);
           finalIndexOut.writeLong(segmentStart);
           finalIndexOut.writeLong(finalOut.getPos() - segmentStart);
           finalIndexOut.writeLong(finalOut.getPos() - segmentStart);
-          reportProgress(umbilical);
         }
         }
         finalOut.close();
         finalOut.close();
         finalIndexOut.close();
         finalIndexOut.close();
         return;
         return;
       }
       }
-      //spawn a thread to give merge progress heartbeats
-      Thread sortProgress = new Thread() {
-        public void run() {
-          while (true) {
-            try {
-              reportProgress(umbilical);
-              Thread.sleep(PROGRESS_INTERVAL);
-            } catch (InterruptedException e) {
-                return;
-            } catch (Throwable e) {
-                LOG.info("Thread Exception in " +
-                                   "reporting sort progress\n" +
-                                   StringUtils.stringifyException(e));
-                continue;
-            }
-          }
-        }
-      };
-      sortProgress.setName("Sort progress reporter for task "+getTaskId());
-      sortProgress.setDaemon(true);
-      sortProgress.start();
-      try {
+      {
         Path [] filename = new Path[numSpills];
         Path [] filename = new Path[numSpills];
         Path [] indexFileName = new Path[numSpills];
         Path [] indexFileName = new Path[numSpills];
         FSDataInputStream in[] = new FSDataInputStream[numSpills];
         FSDataInputStream in[] = new FSDataInputStream[numSpills];
@@ -514,8 +516,6 @@ class MapTask extends Task {
           in[i].close(); localFs.delete(filename[i]);
           in[i].close(); localFs.delete(filename[i]);
           indexIn[i].close(); localFs.delete(indexFileName[i]);
           indexIn[i].close(); localFs.delete(indexFileName[i]);
         }
         }
-      } finally {
-        sortProgress.interrupt();
       }
       }
     }
     }
     
     

+ 6 - 2
src/java/org/apache/hadoop/mapred/Task.java

@@ -142,7 +142,7 @@ abstract class Task implements Writable, Configurable {
   /** The number of milliseconds between progress reports. */
   /** The number of milliseconds between progress reports. */
   public static final int PROGRESS_INTERVAL = 1000;
   public static final int PROGRESS_INTERVAL = 1000;
 
 
-  private transient Progress taskProgress = new Progress();
+  private volatile Progress taskProgress = new Progress();
   private transient long nextProgressTime =
   private transient long nextProgressTime =
     System.currentTimeMillis() + PROGRESS_INTERVAL;
     System.currentTimeMillis() + PROGRESS_INTERVAL;
 
 
@@ -165,9 +165,13 @@ abstract class Task implements Writable, Configurable {
       };
       };
   }
   }
 
 
+  public void setProgress(float progress) {
+    taskProgress.set(progress);
+  }
+
   public void reportProgress(TaskUmbilicalProtocol umbilical, float progress)
   public void reportProgress(TaskUmbilicalProtocol umbilical, float progress)
     throws IOException {
     throws IOException {
-    taskProgress.set(progress);
+    setProgress(progress);
     reportProgress(umbilical);
     reportProgress(umbilical);
   }
   }