
HADOOP-5539. Fixes a problem to do with not preserving intermediate output compression for merged data. Contributed by Jothi Padmanabhan and Billy Pearson.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.20@779572 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das 16 years ago
Parent commit 7c5b96d313

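The bug: when spilled segments were merged, Merger.merge() was invoked without a CompressionCodec, so the merged intermediate file came out uncompressed even when the job had enabled map-output compression. The patch threads the codec through new Merger.merge() overloads into MergeQueue. For context, a hedged sketch of how a 0.20-era task resolves that codec from the job configuration before handing it to the merge (illustrative only, not part of this patch; the conf keys follow 0.20 naming):

// Illustrative only: resolve the intermediate (map-output) compression
// codec from the job conf, the way 0.20 tasks do before merging.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.util.ReflectionUtils;

class MapOutputCodecLookup {
  static CompressionCodec getMapOutputCodec(Configuration job) {
    // Intermediate compression is opt-in in 0.20.
    if (!job.getBoolean("mapred.compress.map.output", false)) {
      return null;  // null codec => merged data is written uncompressed
    }
    Class<? extends CompressionCodec> codecClass =
        job.getClass("mapred.map.output.compression.codec",
                     DefaultCodec.class, CompressionCodec.class);
    return ReflectionUtils.newInstance(codecClass, job);
  }
}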
+ 4 - 0
CHANGES.txt

@@ -93,6 +93,10 @@ Release 0.20.1 - Unreleased
     momentary spurts in memory usage due to java's fork() model.
     (yhemanth)
 
+    HADOOP-5539. Fixes a problem to do with not preserving intermediate
+    output compression for merged data.
+    (Jothi Padmanabhan and Billy Pearson via ddas)
+
 Release 0.20.0 - 2009-04-15
 
   INCOMPATIBLE CHANGES

+ 1 - 1
src/mapred/org/apache/hadoop/mapred/MapTask.java

@@ -1415,7 +1415,7 @@ class MapTask extends Task {
           //merge
           @SuppressWarnings("unchecked")
           RawKeyValueIterator kvIter = Merger.merge(job, rfs,
-                         keyClass, valClass,
+                         keyClass, valClass, codec,
                          segmentList, job.getInt("io.sort.factor", 100),
                          new Path(mapId.toString()),
                          job.getOutputKeyComparator(), reporter,

+ 43 - 0
src/mapred/org/apache/hadoop/mapred/Merger.java

@@ -64,6 +64,23 @@ class Merger {
                                            readsCounter, writesCounter);
   }
   
+  public static <K extends Object, V extends Object>
+  RawKeyValueIterator merge(Configuration conf, FileSystem fs,
+                                   Class<K> keyClass, Class<V> valueClass,
+                                   CompressionCodec codec,
+                                   List<Segment<K, V>> segments, 
+                                   int mergeFactor, Path tmpDir,
+                                   RawComparator<K> comparator, Progressable reporter,
+                                   Counters.Counter readsCounter,
+                                   Counters.Counter writesCounter)
+      throws IOException {
+    return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
+        false, codec).merge(keyClass, valueClass,
+            mergeFactor, tmpDir,
+            readsCounter, writesCounter);
+
+  }
+
   public static <K extends Object, V extends Object>
   RawKeyValueIterator merge(Configuration conf, FileSystem fs, 
                             Class<K> keyClass, Class<V> valueClass, 
@@ -110,6 +127,25 @@ class Merger {
                                                readsCounter, writesCounter);
   }
 
+
+  static <K extends Object, V extends Object>
+  RawKeyValueIterator merge(Configuration conf, FileSystem fs,
+                          Class<K> keyClass, Class<V> valueClass,
+                          CompressionCodec codec,
+                          List<Segment<K, V>> segments,
+                          int mergeFactor, int inMemSegments, Path tmpDir,
+                          RawComparator<K> comparator, Progressable reporter,
+                          boolean sortSegments,
+                          Counters.Counter readsCounter,
+                          Counters.Counter writesCounter)
+    throws IOException {
+  return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
+                         sortSegments, codec).merge(keyClass, valueClass,
+                                             mergeFactor, inMemSegments,
+                                             tmpDir,
+                                             readsCounter, writesCounter);
+}
+
   public static <K extends Object, V extends Object>
   void writeFile(RawKeyValueIterator records, Writer<K, V> writer, 
                  Progressable progressable, Configuration conf) 
@@ -267,6 +303,13 @@ class Merger {
       }
     }
 
+    public MergeQueue(Configuration conf, FileSystem fs,
+        List<Segment<K, V>> segments, RawComparator<K> comparator,
+        Progressable reporter, boolean sortSegments, CompressionCodec codec) {
+      this(conf, fs, segments, comparator, reporter, sortSegments);
+      this.codec = codec;
+    }
+
     public void close() throws IOException {
       Segment<K, V> segment;
       while((segment = pop()) != null) {

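The two new overloads simply delegate: the added MergeQueue constructor stores the codec, and the codec matters because multi-pass merges write intermediate segments back to disk through an IFile.Writer. A hedged reconstruction of that write path inside MergeQueue.merge() in 0.20-era Merger (from memory, not part of this diff):

// With the codec stored on the queue, intermediate merge passes build
// their on-disk writer with compression; without it, the codec was
// silently dropped at exactly this point.
Writer<K, V> writer =
    new Writer<K, V>(conf, fs, outputFile, keyClass, valueClass,
                     codec, writesCounter);
writeFile(this, writer, reporter, conf);
writer.close();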
+ 1 - 1
src/mapred/org/apache/hadoop/mapred/ReduceTask.java

@@ -2234,7 +2234,7 @@ class ReduceTask extends Task {
         diskSegments.addAll(0, memDiskSegments);
         memDiskSegments.clear();
         RawKeyValueIterator diskMerge = Merger.merge(
-            job, fs, keyClass, valueClass, diskSegments,
+            job, fs, keyClass, valueClass, codec, diskSegments,
             ioSortFactor, numInMemSegments, tmpDir, comparator,
             reporter, false, spilledRecordsCounter, null);
         diskSegments.clear();
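
From the user side, a job that enables map-output compression now keeps that compression through the disk merges above. A minimal sketch of such a job setup, assuming a hypothetical WordCount job class:

// Hypothetical job setup; WordCount stands in for any job class.
JobConf job = new JobConf(WordCount.class);
job.setCompressMapOutput(true);
job.setMapOutputCompressorClass(
    org.apache.hadoop.io.compress.GzipCodec.class);
// Before HADOOP-5539, output produced by Merger.merge() ignored this
// setting; with the codec threaded through, merged data honors it.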