Browse Source

MAPREDUCE-5958. Wrong reduce task progress if map output is compressed. Contributed by Emilio Coppa and Jason Lowe.
(cherry picked from commit 8f701ae07a0b1dc70b8e1eb8d4a5c35c0a1e76da)

Kihwal Lee 10 năm trước cách đây
mục cha
commit
a19123df9a

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -243,6 +243,9 @@ Release 2.6.0 - UNRELEASED
     MAPREDUCE-5960. JobSubmitter's check whether job.jar is local is incorrect
     with no authority in job jar path. (Gera Shegalov via jlowe)
 
+    MAPREDUCE-5958. Wrong reduce task progress if map output is compressed
+    (Emilio Coppa and jlowe via kihwal)
+
 Release 2.5.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 6 - 6
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java

@@ -515,9 +515,9 @@ public class Merger {
     }
 
     private void adjustPriorityQueue(Segment<K, V> reader) throws IOException{
-      long startPos = reader.getPosition();
+      long startPos = reader.getReader().bytesRead;
       boolean hasNext = reader.nextRawKey();
-      long endPos = reader.getPosition();
+      long endPos = reader.getReader().bytesRead;
       totalBytesProcessed += endPos - startPos;
       mergeProgress.set(totalBytesProcessed * progPerByte);
       if (hasNext) {
@@ -543,7 +543,7 @@ public class Merger {
         }
       }
       minSegment = top();
-      long startPos = minSegment.getPosition();
+      long startPos = minSegment.getReader().bytesRead;
       key = minSegment.getKey();
       if (!minSegment.inMemory()) {
         //When we load the value from an inmemory segment, we reset
@@ -560,7 +560,7 @@ public class Merger {
       } else {
         minSegment.getValue(value);
       }
-      long endPos = minSegment.getPosition();
+      long endPos = minSegment.getReader().bytesRead;
       totalBytesProcessed += endPos - startPos;
       mergeProgress.set(totalBytesProcessed * progPerByte);
       return true;
@@ -638,9 +638,9 @@ public class Merger {
             // Initialize the segment at the last possible moment;
             // this helps in ensuring we don't use buffers until we need them
             segment.init(readsCounter);
-            long startPos = segment.getPosition();
+            long startPos = segment.getReader().bytesRead;
             boolean hasNext = segment.nextRawKey();
-            long endPos = segment.getPosition();
+            long endPos = segment.getReader().bytesRead;
             
             if (hasNext) {
               startBytes += endPos - startPos;

+ 55 - 25
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java

@@ -18,13 +18,12 @@
 package org.apache.hadoop.mapreduce.task.reduce;
 
 import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.doAnswer;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
@@ -32,9 +31,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.junit.Assert;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
@@ -43,14 +41,15 @@ import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.Counters.Counter;
-import org.apache.hadoop.mapred.IFile.Reader;
 import org.apache.hadoop.mapred.IFile;
+import org.apache.hadoop.mapred.IFile.Reader;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MROutputFiles;
 import org.apache.hadoop.mapred.Merger;
 import org.apache.hadoop.mapred.Merger.Segment;
 import org.apache.hadoop.mapred.RawKeyValueIterator;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.CryptoUtils;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -58,21 +57,17 @@ import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.mapreduce.CryptoUtils;
-import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl;
 import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.CompressAwarePath;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.Progressable;
-import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
-import com.google.common.collect.Lists;
-
 public class TestMerger {
 
   private Configuration conf;
@@ -254,7 +249,7 @@ public class TestMerger {
     testMergeShouldReturnProperProgress(getUncompressedSegments());
   }
 
-  @SuppressWarnings( { "deprecation", "unchecked" })
+  @SuppressWarnings( { "unchecked" })
   public void testMergeShouldReturnProperProgress(
       List<Segment<Text, Text>> segments) throws IOException {
     Path tmpDir = new Path("localpath");
@@ -267,7 +262,38 @@ public class TestMerger {
     RawKeyValueIterator mergeQueue = Merger.merge(conf, fs, keyClass,
         valueClass, segments, 2, tmpDir, comparator, getReporter(),
         readsCounter, writesCounter, mergePhase);
-    Assert.assertEquals(1.0f, mergeQueue.getProgress().get(), 0.0f);
+    final float epsilon = 0.00001f;
+
+    // Reading 6 keys total, 3 each in 2 segments, so each key read moves the
+    // progress forward 1/6th of the way. Initially the first keys from each
+    // segment have been read as part of the merge setup, so progress = 2/6.
+    Assert.assertEquals(2/6.0f, mergeQueue.getProgress().get(), epsilon);
+
+    // The first next() returns one of the keys already read during merge setup
+    Assert.assertTrue(mergeQueue.next());
+    Assert.assertEquals(2/6.0f, mergeQueue.getProgress().get(), epsilon);
+
+    // Subsequent next() calls should read one key and move progress
+    Assert.assertTrue(mergeQueue.next());
+    Assert.assertEquals(3/6.0f, mergeQueue.getProgress().get(), epsilon);
+    Assert.assertTrue(mergeQueue.next());
+    Assert.assertEquals(4/6.0f, mergeQueue.getProgress().get(), epsilon);
+
+    // At this point we've exhausted all of the keys in one segment
+    // so getting the next key will return the already cached key from the
+    // other segment
+    Assert.assertTrue(mergeQueue.next());
+    Assert.assertEquals(4/6.0f, mergeQueue.getProgress().get(), epsilon);
+
+    // Subsequent next() calls should read one key and move progress
+    Assert.assertTrue(mergeQueue.next());
+    Assert.assertEquals(5/6.0f, mergeQueue.getProgress().get(), epsilon);
+    Assert.assertTrue(mergeQueue.next());
+    Assert.assertEquals(1.0f, mergeQueue.getProgress().get(), epsilon);
+
+    // Now there should be no more input
+    Assert.assertFalse(mergeQueue.next());
+    Assert.assertEquals(1.0f, mergeQueue.getProgress().get(), epsilon);
   }
 
   private Progressable getReporter() {
@@ -281,7 +307,7 @@ public class TestMerger {
 
   private List<Segment<Text, Text>> getUncompressedSegments() throws IOException {
     List<Segment<Text, Text>> segments = new ArrayList<Segment<Text, Text>>();
-    for (int i = 1; i < 1; i++) {
+    for (int i = 0; i < 2; i++) {
       segments.add(getUncompressedSegment(i));
     }
     return segments;
@@ -289,44 +315,51 @@ public class TestMerger {
 
   private List<Segment<Text, Text>> getCompressedSegments() throws IOException {
     List<Segment<Text, Text>> segments = new ArrayList<Segment<Text, Text>>();
-    for (int i = 1; i < 1; i++) {
+    for (int i = 0; i < 2; i++) {
       segments.add(getCompressedSegment(i));
     }
     return segments;
   }
 
   private Segment<Text, Text> getUncompressedSegment(int i) throws IOException {
-    return new Segment<Text, Text>(getReader(i), false);
+    return new Segment<Text, Text>(getReader(i, false), false);
   }
 
   private Segment<Text, Text> getCompressedSegment(int i) throws IOException {
-    return new Segment<Text, Text>(getReader(i), false, 3000l);
+    return new Segment<Text, Text>(getReader(i, true), false, 3000l);
   }
 
   @SuppressWarnings("unchecked")
-  private Reader<Text, Text> getReader(int i) throws IOException {
+  private Reader<Text, Text> getReader(int i, boolean isCompressedInput)
+      throws IOException {
     Reader<Text, Text> readerMock = mock(Reader.class);
+    when(readerMock.getLength()).thenReturn(30l);
     when(readerMock.getPosition()).thenReturn(0l).thenReturn(10l).thenReturn(
         20l);
     when(
         readerMock.nextRawKey(any(DataInputBuffer.class)))
-        .thenAnswer(getKeyAnswer("Segment" + i));
+        .thenAnswer(getKeyAnswer("Segment" + i, isCompressedInput));
     doAnswer(getValueAnswer("Segment" + i)).when(readerMock).nextRawValue(
         any(DataInputBuffer.class));
 
     return readerMock;
   }
 
-  private Answer<?> getKeyAnswer(final String segmentName) {
+  private Answer<?> getKeyAnswer(final String segmentName,
+      final boolean isCompressedInput) {
     return new Answer<Object>() {
       int i = 0;
 
+      @SuppressWarnings("unchecked")
       public Boolean answer(InvocationOnMock invocation) {
-        Object[] args = invocation.getArguments();
-        DataInputBuffer key = (DataInputBuffer) args[0];
-        if (i++ == 2) {
+        if (i++ == 3) {
           return false;
         }
+        Reader<Text,Text> mock = (Reader<Text,Text>) invocation.getMock();
+        int multiplier = isCompressedInput ? 100 : 1;
+        mock.bytesRead += 10 * multiplier;
+        Object[] args = invocation.getArguments();
+        DataInputBuffer key = (DataInputBuffer) args[0];
         key.reset(("Segment Key " + segmentName + i).getBytes(), 20);
         return true;
       }
@@ -340,9 +373,6 @@ public class TestMerger {
       public Void answer(InvocationOnMock invocation) {
         Object[] args = invocation.getArguments();
         DataInputBuffer key = (DataInputBuffer) args[0];
-        if (i++ == 2) {
-          return null;
-        }
         key.reset(("Segment Value " + segmentName + i).getBytes(), 20);
         return null;
       }