Browse Source

Merge -r 1450722:1450723 from trunk to branch-2. Fixes: MAPREDUCE-5008. Merger progress miscounts with respect to EOF_MARKER. Contributed by Sandy Ryza.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1450724 13f79535-47bb-0310-9956-ffa450edef68
Thomas White 12 năm trước cách đây
mục cha
commit
7b943682fb

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -34,6 +34,9 @@ Release 2.0.4-beta - UNRELEASED
     MAPREDUCE-4951. Container preemption interpreted as task failure.
     (Sandy Ryza via tomwhite)
 
+    MAPREDUCE-5008. Merger progress miscounts with respect to EOF_MARKER.
+    (Sandy Ryza via tomwhite)
+
 Release 2.0.3-alpha - 2013-02-06 
 
   INCOMPATIBLE CHANGES

+ 6 - 4
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java

@@ -475,9 +475,9 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
           combineCollector.setWriter(writer);
           combineAndSpill(rIter, reduceCombineInputCounter);
         }
+        writer.close();
         compressAwarePath = new CompressAwarePath(outputPath,
             writer.getRawLength());
-        writer.close();
 
         LOG.info(reduceId +  
             " Merge of the " + noInMemorySegments +
@@ -552,9 +552,9 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
                             mergedMapOutputsCounter, null);
 
         Merger.writeFile(iter, writer, reporter, jobConf);
+        writer.close();
         compressAwarePath = new CompressAwarePath(outputPath,
             writer.getRawLength());
-        writer.close();
       } catch (IOException e) {
         localFS.delete(outputPath, true);
         throw e;
@@ -713,13 +713,15 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
             keyClass, valueClass, memDiskSegments, numMemDiskSegments,
             tmpDir, comparator, reporter, spilledRecordsCounter, null, 
             mergePhase);
-        final Writer<K,V> writer = new Writer<K,V>(job, fs, outputPath,
+        Writer<K,V> writer = new Writer<K,V>(job, fs, outputPath,
             keyClass, valueClass, codec, null);
         try {
           Merger.writeFile(rIter, writer, reporter, job);
-          // add to list of final disk outputs.
+          writer.close();
           onDiskMapOutputs.add(new CompressAwarePath(outputPath,
               writer.getRawLength()));
+          writer = null;
+          // add to list of final disk outputs.
         } catch (IOException e) {
           if (null != outputPath) {
             try {