HADOOP-613. Perform final merge while reducing. Contributed by Devaraj.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@474922 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting
commit 8cb0d8126b

+ 4 - 0
CHANGES.txt

@@ -56,6 +56,10 @@ Trunk (unreleased changes)
 17. HADOOP-705.  Fix a bug in the JobTracker when failed jobs were
     not completely cleaned up.  (Mahadev Konar via cutting)
 
+18. HADOOP-613.  Perform final merge while reducing.  This removes one
+    sort pass over the data and should significantly decrease overall
+    processing time.  (Devaraj Das via cutting)
+
 
 Release 0.8.0 - 2006-11-03
 

+ 52 - 1
src/java/org/apache/hadoop/io/SequenceFile.java

@@ -33,6 +33,7 @@ import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.ReflectionUtils;
 
 /** Support for flat files of binary key/value pairs. */
@@ -1538,6 +1539,34 @@ public class SequenceFile {
       }
     }
 
+    /** 
+     * Perform a file sort from a set of input files and return an iterator.
+     * @param inFiles the files to be sorted
+     * @param tempDir the directory where temp files are created during sort
+     * @param deleteInput should the input files be deleted as they are read?
+     * @return a RawKeyValueIterator over the sorted key/value pairs
+     */
+    public RawKeyValueIterator sortAndIterate(Path[] inFiles, Path tempDir, 
+                                    boolean deleteInput) throws IOException {
+      Path outFile = new Path(tempDir + Path.SEPARATOR + "all.2");
+      if (fs.exists(outFile)) {
+        throw new IOException("already exists: " + outFile);
+      }
+      this.inFiles = inFiles;
+      //outFile is used as the prefix for temp files when the sort produces
+      //multiple sorted segments; in the single-segment case, outFile itself
+      //contains the sorted data for that segment
+      this.outFile = outFile;
+
+      int segments = sortPass(deleteInput);
+      if (segments > 1)
+        return merge(outFile.suffix(".0"), outFile.suffix(".0.index"));
+      else if (segments == 1)
+        return merge(new Path[]{outFile}, true);
+      else return null;
+    }
+
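For orientation, here is a hedged sketch (not part of this commit) of how a caller might drive the new method. All arguments are assumed to be supplied by the caller; only the Sorter constructor and the iterator's next()/getKey()/getValue()/close() calls mirror this diff (see also the ReduceTask hunks below).

  // Sketch only; uses org.apache.hadoop.fs.*, org.apache.hadoop.io.*,
  // org.apache.hadoop.conf.Configuration.
  static void sortAndConsume(FileSystem fs, WritableComparator comparator,
                             Class valClass, Configuration conf,
                             Path[] inFiles, Path tempDir) throws IOException {
    SequenceFile.Sorter sorter =
        new SequenceFile.Sorter(fs, comparator, valClass, conf);
    SequenceFile.Sorter.RawKeyValueIterator iter =
        sorter.sortAndIterate(inFiles, tempDir, false);  // keep the inputs
    if (iter == null) return;                            // no input records
    while (iter.next()) {
      DataOutputBuffer rawKey = iter.getKey();           // serialized key bytes
      SequenceFile.ValueBytes rawVal = iter.getValue();  // serialized value
      // ... consume rawKey / rawVal ...
    }
    iter.close();
  }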
     /**
      * The backwards compatible interface to sort.
      * @param inFile the input file to sort
@@ -1799,6 +1828,10 @@ public class SequenceFile {
        * @throws IOException
        */
       void close() throws IOException;
+      /** Gets the Progress object; this holds a float (0.0 - 1.0)
+       * indicating the fraction of the iterator's bytes processed so far
+       */
+      Progress getProgress();
     }    
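As a hedged illustration of the accessor just added (mirroring how ReduceTask uses it further down), a consumer can hold the live Progress object and poll it between calls to next():

  // Sketch: 'iter' is any RawKeyValueIterator; the merge updates the
  // returned org.apache.hadoop.util.Progress object in place as raw
  // bytes are consumed.
  Progress progress = iter.getProgress();
  while (iter.next()) {
    float done = progress.get();  // a float in [0.0, 1.0]
    // ... feed 'done' to whatever progress reporter the caller has ...
  }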
     
   /**
@@ -1941,6 +1974,9 @@ public class SequenceFile {
       private boolean blockCompress;
       private DataOutputBuffer rawKey = new DataOutputBuffer();
       private ValueBytes rawValue;
+      private long totalBytesProcessed;
+      private float progPerByte;
+      private Progress mergeProgress = new Progress();
       
       //a TreeMap used to store the segments sorted by size (segment offset and
       //segment path name is used to break ties between segments of same sizes)
@@ -1992,7 +2028,7 @@ public class SequenceFile {
         //load the raw value. Re-use the existing rawValue buffer
         if(rawValue == null)
           rawValue = ms.in.createValueBytes();
-        ms.nextRawValue(rawValue);
+        int valLength = ms.nextRawValue(rawValue);
 
         if (ms.nextRawKey()) {
           adjustTop();
@@ -2000,9 +2036,17 @@ public class SequenceFile {
           pop();
           ms.cleanup();
         }
+        if (progPerByte > 0) {
+          totalBytesProcessed += rawKey.getLength() + valLength;
+          mergeProgress.set(totalBytesProcessed * progPerByte);
+        }
         return true;
       }
       
+      public Progress getProgress() {
+        return mergeProgress; 
+      }
+
      /** This is the single-level merge that is called multiple times 
       * depending on the merge factor and the number of segments
        * @return RawKeyValueIterator
@@ -2029,6 +2073,13 @@ public class SequenceFile {
          //if fewer segments remain than the merge factor, just return the
          //iterator; otherwise do another single-level merge
           if (numSegments <= factor) {
+            //calculate the total length of the remaining segments; required
+            //for computing the merge progress
+            long totalBytes = 0;
+            for (int i = 0; i < numSegments; i++)
+              totalBytes += mStream[i].segmentLength;
+            if (totalBytes != 0) //being paranoid
+              progPerByte = 1.0f / (float)totalBytes;
             return this;
           } else {
             //we want to spread the creation of temp files on multiple disks if 
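To make the progress bookkeeping above concrete, a worked example with illustrative numbers (none of these values come from the commit):

  long totalBytes = 100 + 150 + 250;                   // remaining segment lengths
  float progPerByte = 1.0f / (float) totalBytes;       // 1/500 = 0.002
  long totalBytesProcessed = 20 + 30;                  // one raw key + one raw value
  float progress = totalBytesProcessed * progPerByte;  // 50 * 0.002 = 0.1 (10%)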

+ 34 - 20
src/java/org/apache/hadoop/mapred/ReduceTask.java

@@ -122,23 +122,28 @@ class ReduceTask extends Task {
 
   /** Iterates values while keys match in sorted input. */
   private class ValuesIterator implements Iterator {
-    private SequenceFile.Reader in;               // input file
+    private SequenceFile.Sorter.RawKeyValueIterator in; //input iterator
     private WritableComparable key;               // current key
     private Writable value;                       // current value
     private boolean hasNext;                      // more w/ this key
     private boolean more;                         // more in file
-    private float progPerByte;
     private TaskUmbilicalProtocol umbilical;
     private WritableComparator comparator;
-
-    public ValuesIterator (SequenceFile.Reader in, long length,
-                           WritableComparator comparator,
-                           TaskUmbilicalProtocol umbilical)
+    private Class keyClass;
+    private Class valClass;
+    private DataOutputBuffer valOut = new DataOutputBuffer();
+    private DataInputBuffer valIn = new DataInputBuffer();
+    private DataInputBuffer keyIn = new DataInputBuffer();
+
+    public ValuesIterator (SequenceFile.Sorter.RawKeyValueIterator in, 
+                           WritableComparator comparator, Class keyClass,
+                           Class valClass, TaskUmbilicalProtocol umbilical)
       throws IOException {
       this.in = in;
-      this.progPerByte = 1.0f / (float)length;
       this.umbilical = umbilical;
       this.comparator = comparator;
+      this.keyClass = keyClass;
+      this.valClass = valClass;
       getNext();
     }
 
@@ -173,18 +178,26 @@ class ReduceTask extends Task {
     public WritableComparable getKey() { return key; }
 
     private void getNext() throws IOException {
-      reducePhase.set(in.getPosition()*progPerByte); // update progress
+      reducePhase.set(in.getProgress().get()); // update progress
       reportProgress(umbilical);
 
       Writable lastKey = key;                     // save previous key
       try {
-        key = (WritableComparable)in.getKeyClass().newInstance();
-        value = (Writable)in.getValueClass().newInstance();
+        key = (WritableComparable)keyClass.newInstance();
+        value = (Writable)valClass.newInstance();
       } catch (Exception e) {
         throw new RuntimeException(e);
       }
-      more = in.next(key, value);
+      more = in.next();
       if (more) {
+        //de-serialize the raw key/value
+        keyIn.reset(in.getKey().getData(), in.getKey().getLength());
+        key.readFields(keyIn);
+        valOut.reset();
+        (in.getValue()).writeUncompressedBytes(valOut);
+        valIn.reset(valOut.getData(), valOut.getLength());
+        value.readFields(valIn);
+
         if (lastKey == null) {
           hasNext = true;
         } else {
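The round trip in getNext() above is needed because ValueBytes exposes its payload only by writing to a stream. As a hedged standalone sketch of the same pattern (the iter/key/value locals are this example's own):

  DataInputBuffer keyIn = new DataInputBuffer();
  DataOutputBuffer valOut = new DataOutputBuffer();
  DataInputBuffer valIn = new DataInputBuffer();

  // The raw key already sits in a DataOutputBuffer; wrap it for reading.
  keyIn.reset(iter.getKey().getData(), iter.getKey().getLength());
  key.readFields(keyIn);

  // Spill the ValueBytes into valOut, then re-read the uncompressed
  // bytes through valIn to materialize the Writable value.
  valOut.reset();
  iter.getValue().writeUncompressedBytes(valOut);
  valIn.reset(valOut.getData(), valOut.getLength());
  value.readFields(valIn);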
@@ -233,10 +246,12 @@ class ReduceTask extends Task {
       };
     sortProgress.setName("Sort progress reporter for task "+getTaskId());
 
-    Path sortedFile = job.getLocalPath(getTaskId()+Path.SEPARATOR+"all.2");
+    Path tempDir = job.getLocalPath(getTaskId()); 
 
     WritableComparator comparator = job.getOutputKeyComparator();
-    
+   
+    SequenceFile.Sorter.RawKeyValueIterator rIter;
+ 
     try {
       setPhase(TaskStatus.Phase.SORT) ; 
       sortProgress.start();
@@ -244,7 +259,8 @@ class ReduceTask extends Task {
       // sort the input file
       SequenceFile.Sorter sorter =
         new SequenceFile.Sorter(lfs, comparator, valueClass, job);
-      sorter.sort(mapFiles, sortedFile, !conf.getKeepFailedTaskFiles()); // sort
+      rIter = sorter.sortAndIterate(mapFiles, tempDir, 
+                                    !conf.getKeepFailedTaskFiles()); // sort
 
     } finally {
       sortComplete = true;
@@ -269,11 +285,11 @@ class ReduceTask extends Task {
       };
     
     // apply reduce function
-    SequenceFile.Reader in = new SequenceFile.Reader(lfs, sortedFile, job);
-    long length = lfs.getLength(sortedFile);
     try {
-      ValuesIterator values = new ValuesIterator(in, length, comparator,
-                                                 umbilical);
+      Class keyClass = job.getMapOutputKeyClass();
+      Class valClass = job.getMapOutputValueClass();
+      ValuesIterator values = new ValuesIterator(rIter, comparator, keyClass, 
+                                                 valClass, umbilical);
       while (values.more()) {
         myMetrics.reduceInput();
         reducer.reduce(values.getKey(), values, collector, reporter);
@@ -282,8 +298,6 @@ class ReduceTask extends Task {
 
     } finally {
       reducer.close();
-      in.close();
-      lfs.delete(sortedFile);                     // remove sorted
       out.close(reporter);
     }
     done(umbilical);
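
Taken together, the reduce path now looks roughly like the following hedged sketch; names mirror the hunks above, and setup, metrics and cleanup are elided:

  // The sort feeds the reducer directly: no intermediate "all.2" file is
  // written, re-read and deleted any more.
  SequenceFile.Sorter sorter =
      new SequenceFile.Sorter(lfs, comparator, valueClass, job);
  SequenceFile.Sorter.RawKeyValueIterator rIter =
      sorter.sortAndIterate(mapFiles, tempDir, !conf.getKeepFailedTaskFiles());

  ValuesIterator values = new ValuesIterator(rIter, comparator,
      job.getMapOutputKeyClass(), job.getMapOutputValueClass(), umbilical);
  while (values.more()) {
    reducer.reduce(values.getKey(), values, collector, reporter);
    // ... advance past any remaining values for this key (elided) ...
  }
  reducer.close();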