HADOOP-3131. Fix reduce progress reporting for compressed intermediate data. Contributed by Matei Zaharia.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@681243 13f79535-47bb-0310-9956-ffa450edef68
Arun Murthy 17 years ago
commit 23a53802ea
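
The bug, in one sentence: the reduce-side merge measured its total (the denominator of the progress fraction) in on-disk bytes, but charged progress (the numerator) in uncompressed key/value lengths, so the two scales only agree when the intermediate data is uncompressed. The standalone snippet below, an illustration rather than part of the patch, shows how far the two counts drift apart for repetitive records like the ones the new test writes.

    import java.util.zip.Deflater;

    // Illustration only: repetitive records compress dramatically, so a byte
    // count taken after decompression bears no relation to the on-disk length
    // that the merge uses as its progress denominator.
    public class CompressedVsUncompressed {
      public static void main(String[] args) {
        byte[] record =
            "valuevaluevaluevaluevaluevaluevaluevaluevaluevaluevalue".getBytes();
        byte[] uncompressed = new byte[record.length * 10000];
        for (int i = 0; i < 10000; i++) {
          System.arraycopy(record, 0, uncompressed, i * record.length, record.length);
        }

        Deflater deflater = new Deflater();
        deflater.setInput(uncompressed);
        deflater.finish();
        byte[] buf = new byte[uncompressed.length];
        long compressedLen = 0;
        while (!deflater.finished()) {
          compressedLen += deflater.deflate(buf);
        }
        deflater.end();

        System.out.println("uncompressed = " + uncompressed.length
            + " bytes, compressed = " + compressedLen + " bytes");
      }
    }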

+ 3 - 0
CHANGES.txt

@@ -199,6 +199,9 @@ Trunk (unreleased changes)
     HADOOP-3848. Cache calls to getSystemDir in the TaskTracker instead of
     calling it for each task start. (acmurthy via omalley)
 
+    HADOOP-3131. Fix reduce progress reporting for compressed intermediate
+    data. (Matei Zaharia via acmurthy) 
+
 Release 0.18.0 - Unreleased
 
   INCOMPATIBLE CHANGES

+ 19 - 7
src/core/org/apache/hadoop/io/SequenceFile.java

@@ -2808,7 +2808,6 @@ public class SequenceFile {
       public boolean next() throws IOException {
         if (size() == 0)
           return false;
-        int valLength;
         if (minSegment != null) {
           //minSegment is non-null for all invocations of next except the first
           //one. For the first invocation, the priority queue is ready for use
@@ -2820,17 +2819,16 @@ public class SequenceFile {
           }
         }
         minSegment = (SegmentDescriptor)top();
+        long startPos = minSegment.in.getPosition(); // Current position in stream
         //save the raw key reference
         rawKey = minSegment.getKey();
         //load the raw value. Re-use the existing rawValue buffer
         if (rawValue == null) {
           rawValue = minSegment.in.createValueBytes();
         }
-        valLength = minSegment.nextRawValue(rawValue);
-        if (progPerByte > 0) {
-          totalBytesProcessed += rawKey.getLength() + valLength;
-          mergeProgress.set(totalBytesProcessed * progPerByte);
-        }
+        minSegment.nextRawValue(rawValue);
+        long endPos = minSegment.in.getPosition(); // End position after reading value
+        updateProgress(endPos - startPos);
         return true;
       }
       
@@ -2839,13 +2837,25 @@ public class SequenceFile {
       }
 
       private void adjustPriorityQueue(SegmentDescriptor ms) throws IOException{
-        if (ms.nextRawKey()) {
+        long startPos = ms.in.getPosition(); // Current position in stream
+        boolean hasNext = ms.nextRawKey();
+        long endPos = ms.in.getPosition(); // End position after reading key
+        updateProgress(endPos - startPos);
+        if (hasNext) {
           adjustTop();
         } else {
           pop();
           ms.cleanup();
         }
       }
+
+      private void updateProgress(long bytesProcessed) {
+        totalBytesProcessed += bytesProcessed;
+        if (progPerByte > 0) {
+          mergeProgress.set(totalBytesProcessed * progPerByte);
+        }
+      }
+      
       /** This is the single level merge that is called multiple times 
        * depending on the factor size and the number of segments
        * @return RawKeyValueIterator
@@ -2874,6 +2884,8 @@ public class SequenceFile {
               if (mStream[i].nextRawKey()) {
                 segmentsToMerge.add(mStream[i]);
                 segmentsConsidered++;
+                // Count the fact that we read some bytes in calling nextRawKey()
+                updateProgress(mStream[i].in.getPosition());
               }
               else {
                 mStream[i].cleanup();
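
The pattern introduced here, and repeated in Merger.java below, is to stop deriving progress from decoded record lengths and instead charge the merge with the number of raw bytes the segment's stream advanced while the record was read. A minimal sketch of that accounting, using an illustrative Segment interface rather than the real SegmentDescriptor API:

    import java.io.IOException;

    // Sketch of position-delta progress accounting; names are illustrative.
    class MergeProgressSketch {
      interface Segment {
        long getPosition() throws IOException; // offset in the raw (possibly compressed) file
        boolean next() throws IOException;     // advance to the next key/value pair
      }

      private long totalBytesProcessed = 0;
      private float progPerByte = 0f;          // 1 / total raw bytes, once totals are known

      void setTotalBytes(long totalRawBytes) {
        if (totalRawBytes != 0) {
          progPerByte = 1.0f / (float) totalRawBytes;
        }
      }

      /** Advance a segment and charge the raw bytes it consumed. */
      boolean advance(Segment segment) throws IOException {
        long startPos = segment.getPosition();
        boolean hasNext = segment.next();
        updateProgress(segment.getPosition() - startPos);
        return hasNext;
      }

      private void updateProgress(long bytesProcessed) {
        // both counters are in raw-file bytes, so the ratio is meaningful
        totalBytesProcessed += bytesProcessed;
      }

      float getProgress() {
        return progPerByte > 0 ? totalBytesProcessed * progPerByte : 0f;
      }
    }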

+ 15 - 2
src/mapred/org/apache/hadoop/mapred/IFile.java

@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.io.InputStream;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -214,7 +215,8 @@ class IFile {
     private static final int DEFAULT_BUFFER_SIZE = 128*1024;
     private static final int MAX_VINT_SIZE = 9;
 
-    InputStream in;
+    FSDataInputStream rawIn;   // Raw InputStream from file
+    InputStream in;            // Possibly decompressed stream that we read
     Decompressor decompressor;
     long bytesRead = 0;
     long fileLength = 0;
@@ -233,8 +235,9 @@ class IFile {
     
     protected Reader() {}
     
-    public Reader(Configuration conf, InputStream in, long length, 
+    public Reader(Configuration conf, FSDataInputStream in, long length, 
                   CompressionCodec codec) throws IOException {
+      this.rawIn = in;
       if (codec != null) {
         decompressor = CodecPool.getDecompressor(codec);
         this.in = codec.createInputStream(in, decompressor);
@@ -248,6 +251,8 @@ class IFile {
     
     public long getLength() { return fileLength; }
     
+    public long getPosition() throws IOException { return rawIn.getPos(); }
+    
     /**
      * Read upto len bytes into buf starting at offset off.
      * 
@@ -399,6 +404,14 @@ class IFile {
       dataIn.reset(buffer, start, length);
     }
     
+    @Override
+    public long getPosition() throws IOException {
+      // InMemoryReader does not initialize streams like Reader, so in.getPos()
+      // would not work. Instead, return the number of uncompressed bytes read,
+      // which will be correct since in-memory data is not compressed.
+      return bytesRead;
+    }
+    
     private void dumpOnError() {
       File dumpFile = new File("../output/" + taskAttemptId + ".dump");
       System.err.println("Dumping corrupt map-output of " + taskAttemptId + 
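
This is the hook that makes position-based accounting possible for IFiles: records are read through the codec's decompressing wrapper, but getPosition() must report the offset in the raw FSDataInputStream underneath it. A small probe of that split, written against the public FileSystem and codec APIs rather than the IFile reader itself (the path argument is assumed to point at a DefaultCodec-compressed file):

    import java.io.IOException;
    import java.io.InputStream;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.compress.DefaultCodec;

    // Sketch only: reads flow through the decompressing stream while position is
    // taken from the raw stream, the same split the new rawIn field preserves.
    public class PositionProbe {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.getLocal(conf);
        Path file = new Path(args[0]);           // assumed: a DefaultCodec-compressed file

        DefaultCodec codec = new DefaultCodec();
        codec.setConf(conf);

        FSDataInputStream rawIn = fs.open(file);          // raw, seekable stream
        InputStream in = codec.createInputStream(rawIn);  // decompressed view

        byte[] buf = new byte[64 * 1024];
        long uncompressedBytes = 0;
        int n;
        while ((n = in.read(buf)) > 0) {
          uncompressedBytes += n;
        }
        long rawPos = rawIn.getPos();  // bytes pulled from the file on disk
        in.close();

        System.out.println("raw position = " + rawPos
            + ", uncompressed bytes read = " + uncompressedBytes);
      }
    }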

+ 21 - 10
src/mapred/org/apache/hadoop/mapred/Merger.java

@@ -141,6 +141,10 @@ class Merger {
         fs.delete(file, false);
       }
     }
+
+    public long getPosition() throws IOException {
+      return reader.getPosition();
+    }
   }
   
   private static class MergeQueue<K extends Object, V extends Object> 
@@ -221,7 +225,12 @@ class Merger {
     }
 
     private void adjustPriorityQueue(Segment<K, V> reader) throws IOException{
-      if (reader.next()) {
+      long startPos = reader.getPosition();
+      boolean hasNext = reader.next();
+      long endPos = reader.getPosition();
+      totalBytesProcessed += endPos - startPos;
+      mergeProgress.set(totalBytesProcessed * progPerByte);
+      if (hasNext) {
         adjustTop();
       } else {
         pop();
@@ -248,10 +257,6 @@ class Merger {
       key = minSegment.getKey();
       value = minSegment.getValue();
 
-      totalBytesProcessed += (key.getLength()-key.getPosition()) + 
-                             (value.getLength()-value.getPosition());
-      mergeProgress.set(totalBytesProcessed * progPerByte);
-
       return true;
     }
 
@@ -293,8 +298,12 @@ class Merger {
             // Initialize the segment at the last possible moment;
             // this helps in ensuring we don't use buffers until we need them
             segment.init();
-            
-            if (segment.next()) {
+            long startPos = segment.getPosition();
+            boolean hasNext = segment.next();
+            long endPos = segment.getPosition();
+            totalBytesProcessed += endPos - startPos;
+            mergeProgress.set(totalBytesProcessed * progPerByte);
+            if (hasNext) {
               segmentsToMerge.add(segment);
               segmentsConsidered++;
             }
@@ -330,9 +339,11 @@ class Merger {
           }
           if (totalBytes != 0) //being paranoid
             progPerByte = 1.0f / (float)totalBytes;
-
-          // Reset bytes-processed to track the progress of the final merge
-          totalBytesProcessed = 0;
+          
+          if (totalBytes != 0)
+            mergeProgress.set(totalBytesProcessed * progPerByte);
+          else
+            mergeProgress.set(1.0f); // Last pass and no segments left - we're done
           
           LOG.info("Down to the last merge-pass, with " + numSegments + 
                    " segments left of total size: " + totalBytes + " bytes");

+ 98 - 0
src/test/org/apache/hadoop/io/TestSequenceFileMergeProgress.java

@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.SequenceFile.Sorter.RawKeyValueIterator;
+import org.apache.hadoop.io.SequenceFile.Sorter.SegmentDescriptor;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapred.*;
+
+import junit.framework.TestCase;
+import org.apache.commons.logging.*;
+
+public class TestSequenceFileMergeProgress extends TestCase {
+  private static final Log LOG = FileInputFormat.LOG;
+  private static final int RECORDS = 10000;
+  
+  public void testMergeProgressWithNoCompression() throws IOException {
+    runTest(SequenceFile.CompressionType.NONE);
+  }
+
+  public void testMergeProgressWithRecordCompression() throws IOException {
+    runTest(SequenceFile.CompressionType.RECORD);
+  }
+
+  public void testMergeProgressWithBlockCompression() throws IOException {
+    runTest(SequenceFile.CompressionType.BLOCK);
+  }
+
+  public void runTest(CompressionType compressionType) throws IOException {
+    JobConf job = new JobConf();
+    FileSystem fs = FileSystem.getLocal(job);
+    Path dir = new Path(System.getProperty("test.build.data",".") + "/mapred");
+    Path file = new Path(dir, "test.seq");
+    Path tempDir = new Path(dir, "tmp");
+
+    fs.delete(dir, true);
+    FileInputFormat.setInputPaths(job, dir);
+    fs.mkdirs(tempDir);
+
+    LongWritable tkey = new LongWritable();
+    Text tval = new Text();
+
+    SequenceFile.Writer writer =
+      SequenceFile.createWriter(fs, job, file, LongWritable.class, Text.class,
+        compressionType, new DefaultCodec());
+    try {
+      for (int i = 0; i < RECORDS; ++i) {
+        tkey.set(1234);
+        tval.set("valuevaluevaluevaluevaluevaluevaluevaluevaluevaluevalue");
+        writer.append(tkey, tval);
+      }
+    } finally {
+      writer.close();
+    }
+    
+    long fileLength = fs.getFileStatus(file).getLen();
+    LOG.info("With compression = " + compressionType + ": "
+        + "compressed length = " + fileLength);
+    
+    SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs, 
+        job.getOutputKeyComparator(), job.getMapOutputKeyClass(),
+        job.getMapOutputValueClass(), job);
+    Path[] paths = new Path[] {file};
+    RawKeyValueIterator rIter = sorter.merge(paths, tempDir, false);
+    int count = 0;
+    while (rIter.next()) {
+      count++;
+    }
+    assertEquals(RECORDS, count);
+    assertEquals(1.0f, rIter.getProgress().get());
+  }
+
+}
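
The new test writes 10,000 identical, highly compressible records, merges the single resulting segment through SequenceFile.Sorter, and asserts both the record count and that getProgress() lands on exactly 1.0f, for each of NONE, RECORD and BLOCK compression. If one wanted the test to pin down a little more, the iteration loop above could hypothetically be replaced with a variant that also checks the gauge never moves backwards; this is a suggestion, not part of the patch:

    // Hypothetical drop-in for the while loop in runTest(); rIter, RECORDS and
    // the JUnit asserts come from the surrounding test class.
    int count = 0;
    float previous = 0.0f;
    while (rIter.next()) {
      count++;
      float current = rIter.getProgress().get();
      assertTrue("progress went backwards: " + current + " < " + previous,
                 current >= previous);
      previous = current;
    }
    assertEquals(RECORDS, count);
    assertEquals(1.0f, rIter.getProgress().get());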

+ 19 - 6
src/test/org/apache/hadoop/mapred/TestReduceTask.java

@@ -24,10 +24,10 @@ import junit.framework.TestCase;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.util.Progressable;
 
 /**
@@ -73,11 +73,12 @@ public class TestReduceTask extends TestCase {
     };
   
   public void runValueIterator(Path tmpDir, Pair[] vals, 
-                               Configuration conf) throws IOException {
+                               Configuration conf, 
+                               CompressionCodec codec) throws IOException {
     FileSystem fs = tmpDir.getFileSystem(conf);
     Path path = new Path(tmpDir, "data.in");
     IFile.Writer<Text, Text> writer = 
-      new IFile.Writer<Text, Text>(conf, fs, path, Text.class, Text.class, null);
+      new IFile.Writer<Text, Text>(conf, fs, path, Text.class, Text.class, codec);
     for(Pair p: vals) {
       writer.append(new Text(p.key), new Text(p.value));
     }
@@ -85,7 +86,7 @@ public class TestReduceTask extends TestCase {
     
     @SuppressWarnings("unchecked")
     RawKeyValueIterator rawItr = 
-      Merger.merge(conf, fs, Text.class, Text.class, null, new Path[]{path}, 
+      Merger.merge(conf, fs, Text.class, Text.class, codec, new Path[]{path}, 
                    false, conf.getInt("io.sort.factor", 100), tmpDir, 
                    new Text.Comparator(), new NullProgress());
     @SuppressWarnings("unchecked") // WritableComparators are not generic
@@ -114,13 +115,25 @@ public class TestReduceTask extends TestCase {
       valItr.nextKey();
     }
     assertEquals(vals.length, i);
+    // make sure we have progress equal to 1.0
+    assertEquals(1.0f, rawItr.getProgress().get());
   }
 
   public void testValueIterator() throws Exception {
     Path tmpDir = new Path("build/test/test.reduce.task");
     Configuration conf = new Configuration();
     for (Pair[] testCase: testCases) {
-      runValueIterator(tmpDir, testCase, conf);
+      runValueIterator(tmpDir, testCase, conf, null);
+    }
+  }
+  
+  public void testValueIteratorWithCompression() throws Exception {
+    Path tmpDir = new Path("build/test/test.reduce.task.compression");
+    Configuration conf = new Configuration();
+    DefaultCodec codec = new DefaultCodec();
+    codec.setConf(conf);
+    for (Pair[] testCase: testCases) {
+      runValueIterator(tmpDir, testCase, conf, codec);
     }
   }
 }