
HADOOP-4714. Report status between merges and make the number of records
between progress reports configurable. Contributed by Jothi Padmanabhan.


git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@722573 13f79535-47bb-0310-9956-ffa450edef68

Christopher Douglas 16 years ago
commit 3166f41719

+ 3 - 0
CHANGES.txt

@@ -1282,6 +1282,9 @@ Release 0.18.3 - Unreleased
 
     HADOOP-4635. Fix a memory leak in fuse dfs. (pete wyckoff via mahadev)
 
+    HADOOP-4714. Report status between merges and make the number of records
+    between progress reports configurable. (Jothi Padmanabhan via cdouglas)
+
 Release 0.18.2 - 2008-11-03
 
   BUG FIXES

+ 8 - 0
conf/hadoop-default.xml

@@ -1553,4 +1553,12 @@ creations/deletions), or "all".</description>
   </description>
 </property>
 
+<property>
+  <name>mapred.merge.recordsBeforeProgress</name>
+  <value>10000</value>
+  <description> The number of records to process during merge before
+   sending a progress notification to the TaskTracker.
+  </description>
+</property>
+
 </configuration>
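
The new default of 10000 can also be tuned per job rather than cluster-wide.
A minimal sketch using the old org.apache.hadoop.mapred.JobConf API that this
patch targets; MergeProgressConfExample and the value 1000 are illustrative,
not part of the patch:

  import org.apache.hadoop.mapred.JobConf;

  public class MergeProgressConfExample {
    public static JobConf configure(Class<?> jobClass) {
      JobConf job = new JobConf(jobClass);
      // Ask Merger.writeFile() to report progress every 1000 merged records
      // instead of the default 10000 defined above.
      job.setLong("mapred.merge.recordsBeforeProgress", 1000L);
      return job;
    }
  }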

+ 1 - 1
src/mapred/org/apache/hadoop/mapred/MapTask.java

@@ -1248,7 +1248,7 @@ class MapTask extends Task {
               new Writer<K, V>(job, finalOut, keyClass, valClass, codec,
                                spilledRecordsCounter);
           if (null == combinerClass || numSpills < minSpillsForCombine) {
-            Merger.writeFile(kvIter, writer, reporter);
+            Merger.writeFile(kvIter, writer, reporter, job);
           } else {
             combineCollector.setWriter(writer);
             combineAndSpill(kvIter, combineInputCounter);

+ 5 - 5
src/mapred/org/apache/hadoop/mapred/Merger.java

@@ -42,8 +42,6 @@ import org.apache.hadoop.util.Progressable;
 
 class Merger {  
   private static final Log LOG = LogFactory.getLog(Merger.class);
-  
-  private static final long PROGRESS_BAR = 10000;
 
   // Local directories
   private static LocalDirAllocator lDirAlloc = 
@@ -114,13 +112,15 @@ class Merger {
 
   public static <K extends Object, V extends Object>
   void writeFile(RawKeyValueIterator records, Writer<K, V> writer, 
-                 Progressable progressable) 
+                 Progressable progressable, Configuration conf) 
   throws IOException {
+    long progressBar = conf.getLong("mapred.merge.recordsBeforeProgress",
+        10000);
     long recordCtr = 0;
     while(records.next()) {
       writer.append(records.getKey(), records.getValue());
       
-      if ((++recordCtr % PROGRESS_BAR) == 0) {
+      if (((recordCtr++) % progressBar) == 0) {
         progressable.progress();
       }
     }
@@ -446,7 +446,7 @@ class Merger {
           Writer<K, V> writer = 
             new Writer<K, V>(conf, fs, outputFile, keyClass, valueClass, codec,
                              writesCounter);
-          writeFile(this, writer, reporter);
+          writeFile(this, writer, reporter, conf);
           writer.close();
           
           //we finished one single level merge; now clean up the priority 
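
The hunk above is the heart of the change: the hard-coded PROGRESS_BAR constant
becomes a value read from the job Configuration, and progress() is pinged on
that interval while records are merged so the TaskTracker does not time the
task out during a long merge. A minimal standalone sketch of the same pattern;
ProgressLoopSketch, drain, and the Iterator<String> stand-in are illustrative,
not part of this patch:

  import java.util.Iterator;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.util.Progressable;

  public class ProgressLoopSketch {
    // Walks the records and pings the Progressable every `interval` records,
    // with the interval read from configuration rather than hard-coded.
    static long drain(Iterator<String> records, Progressable progressable,
                      Configuration conf) {
      long interval = conf.getLong("mapred.merge.recordsBeforeProgress", 10000);
      long recordCtr = 0;
      while (records.hasNext()) {
        records.next();                    // stand-in for writer.append(key, value)
        // Post-increment: progress() fires on the first record, then once
        // every `interval` records thereafter.
        if ((recordCtr++ % interval) == 0) {
          progressable.progress();         // heartbeat to the TaskTracker
        }
      }
      return recordCtr;
    }
  }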

+ 3 - 3
src/mapred/org/apache/hadoop/mapred/ReduceTask.java

@@ -2084,7 +2084,7 @@ class ReduceTask extends Task {
           final Writer writer = new Writer(job, fs, outputPath,
               keyClass, valueClass, codec, null);
           try {
-            Merger.writeFile(rIter, writer, reporter);
+            Merger.writeFile(rIter, writer, reporter, job);
             addToMapOutputFilesOnDisk(fs.getFileStatus(outputPath));
           } catch (Exception e) {
             if (null != outputPath) {
@@ -2396,7 +2396,7 @@ class ReduceTask extends Task {
                                   conf.getOutputKeyComparator(), reporter,
                                   spilledRecordsCounter, null);
               
-              Merger.writeFile(iter, writer, reporter);
+              Merger.writeFile(iter, writer, reporter, conf);
               writer.close();
             } catch (Exception e) {
               localFileSys.delete(outputPath, true);
@@ -2495,7 +2495,7 @@ class ReduceTask extends Task {
                                spilledRecordsCounter, null);
           
           if (null == combinerClass) {
-            Merger.writeFile(rIter, writer, reporter);
+            Merger.writeFile(rIter, writer, reporter, conf);
           } else {
             combineCollector.setWriter(writer);
             combineAndSpill(rIter, reduceCombineInputCounter);