Browse Source

HADOOP-5539. Fixes a problem to do with not preserving intermediate output compression for merged data. Contributed by Jothi Padmanabhan and Billy Pearson.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@779571 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das 16 years ago
parent
commit
555ecfef77

+ 4 - 0
CHANGES.txt

@@ -796,6 +796,10 @@ Release 0.20.1 - Unreleased
     momentary spurts in memory usage due to java's fork() model.
     (yhemanth)
 
+    HADOOP-5539. Fixes a problem to do with not preserving intermediate
+    output compression for merged data.
+    (Jothi Padmanabhan and Billy Pearson via ddas)
+
 Release 0.20.0 - 2009-04-15
 
   INCOMPATIBLE CHANGES

+ 1 - 1
src/mapred/org/apache/hadoop/mapred/MapTask.java

@@ -1434,7 +1434,7 @@ class MapTask extends Task {
           //merge
           @SuppressWarnings("unchecked")
           RawKeyValueIterator kvIter = Merger.merge(job, rfs,
-                         keyClass, valClass,
+                         keyClass, valClass, codec,
                          segmentList, mergeFactor,
                          new Path(mapId.toString()),
                          job.getOutputKeyComparator(), reporter, sortSegments,

+ 47 - 0
src/mapred/org/apache/hadoop/mapred/Merger.java

@@ -97,6 +97,25 @@ class Merger {
                                                mergePhase);
   }
 
+  public static <K extends Object, V extends Object>
+  RawKeyValueIterator merge(Configuration conf, FileSystem fs,
+                            Class<K> keyClass, Class<V> valueClass,
+                            CompressionCodec codec,
+                            List<Segment<K, V>> segments,
+                            int mergeFactor, Path tmpDir,
+                            RawComparator<K> comparator, Progressable reporter,
+                            boolean sortSegments,
+                            Counters.Counter readsCounter,
+                            Counters.Counter writesCounter,
+                            Progress mergePhase)
+      throws IOException {
+    return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
+                           sortSegments, codec).merge(keyClass, valueClass,
+                                               mergeFactor, tmpDir,
+                                               readsCounter, writesCounter,
+                                               mergePhase);
+  }
+
   static <K extends Object, V extends Object>
     RawKeyValueIterator merge(Configuration conf, FileSystem fs,
                             Class<K> keyClass, Class<V> valueClass,
@@ -116,6 +135,27 @@ class Merger {
                                                mergePhase);
   }
 
+
+  static <K extends Object, V extends Object>
+  RawKeyValueIterator merge(Configuration conf, FileSystem fs,
+                          Class<K> keyClass, Class<V> valueClass,
+                          CompressionCodec codec,
+                          List<Segment<K, V>> segments,
+                          int mergeFactor, int inMemSegments, Path tmpDir,
+                          RawComparator<K> comparator, Progressable reporter,
+                          boolean sortSegments,
+                          Counters.Counter readsCounter,
+                          Counters.Counter writesCounter,
+                          Progress mergePhase)
+    throws IOException {
+  return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
+                         sortSegments, codec).merge(keyClass, valueClass,
+                                             mergeFactor, inMemSegments,
+                                             tmpDir,
+                                             readsCounter, writesCounter,
+                                             mergePhase);
+}
+
   public static <K extends Object, V extends Object>
   void writeFile(RawKeyValueIterator records, Writer<K, V> writer, 
                  Progressable progressable, Configuration conf) 
@@ -326,6 +366,13 @@ class Merger {
       }
     }
 
+    public MergeQueue(Configuration conf, FileSystem fs,
+        List<Segment<K, V>> segments, RawComparator<K> comparator,
+        Progressable reporter, boolean sortSegments, CompressionCodec codec) {
+      this(conf, fs, segments, comparator, reporter, sortSegments);
+      this.codec = codec;
+    }
+
     public void close() throws IOException {
       Segment<K, V> segment;
       while((segment = pop()) != null) {

+ 1 - 1
src/mapred/org/apache/hadoop/mapred/ReduceTask.java

@@ -2323,7 +2323,7 @@ class ReduceTask extends Task {
         memDiskSegments.clear();
         Progress mergePhase = (sortPhaseFinished) ? null : sortPhase; 
         RawKeyValueIterator diskMerge = Merger.merge(
-            job, fs, keyClass, valueClass, diskSegments,
+            job, fs, keyClass, valueClass, codec, diskSegments,
             ioSortFactor, 0 == numInMemSegments ? 0 : numInMemSegments - 1,
             tmpDir, comparator, reporter, false, spilledRecordsCounter, null,
             mergePhase);