
HADOOP-2095. Improve the Map-Reduce shuffle/merge by cutting down buffer-copies: the intermediate sort/merge now uses the new IFile format rather than SequenceFiles, and map-outputs are compressed by compressing the entire file rather than via SequenceFile compression. The shuffle has also been changed to use a simple byte-buffer manager rather than the InMemoryFileSystem.
Configuration changes to hadoop-default.xml:
deprecated mapred.map.output.compression.type


git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@663440 13f79535-47bb-0310-9956-ffa450edef68
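
For readers skimming the patch: the IFile format introduced below (src/java/org/apache/hadoop/mapred/IFile.java) frames each record as vint key-length, vint value-length, raw key bytes, raw value bytes, and ends the stream with a pair of -1 markers; when compression is enabled, the codec wraps the whole stream rather than individual records. A minimal sketch of that framing, using only the WritableUtils calls the patch itself uses ('out' and the byte arrays are illustrative):

    // Sketch only: how one IFile record is framed (see IFile.Writer.append below).
    // 'out' is any DataOutputStream; keyBytes/valueBytes are already-serialized data.
    WritableUtils.writeVInt(out, keyBytes.length);    // key length as a vint
    WritableUtils.writeVInt(out, valueBytes.length);  // value length as a vint
    out.write(keyBytes);                              // raw key bytes
    out.write(valueBytes);                            // raw value bytes
    // ... and at close(): two EOF markers terminate the stream
    WritableUtils.writeVInt(out, -1);
    WritableUtils.writeVInt(out, -1);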

Arun Murthy 17 years ago
parent
commit
7d941288e6
32 changed files with 1951 additions and 889 deletions
  1. CHANGES.txt (+10, -0)
  2. conf/hadoop-default.xml (+0, -8)
  3. src/java/org/apache/hadoop/io/DataInputBuffer.java (+5, -2)
  4. src/java/org/apache/hadoop/io/DataOutputBuffer.java (+12, -0)
  5. src/java/org/apache/hadoop/io/SequenceFile.java (+11, -80)
  6. src/java/org/apache/hadoop/io/compress/CodecPool.java (+154, -0)
  7. src/java/org/apache/hadoop/io/compress/CompressionCodec.java (+2, -2)
  8. src/java/org/apache/hadoop/io/compress/DefaultCodec.java (+2, -2)
  9. src/java/org/apache/hadoop/io/compress/GzipCodec.java (+2, -2)
  10. src/java/org/apache/hadoop/io/compress/LzoCodec.java (+2, -2)
  11. src/java/org/apache/hadoop/io/compress/zlib/ZlibFactory.java (+4, -2)
  12. src/java/org/apache/hadoop/mapred/IFile.java (+402, -0)
  13. src/java/org/apache/hadoop/mapred/JobClient.java (+20, -12)
  14. src/java/org/apache/hadoop/mapred/JobConf.java (+11, -1)
  15. src/java/org/apache/hadoop/mapred/JobInProgress.java (+3, -4)
  16. src/java/org/apache/hadoop/mapred/LocalJobRunner.java (+1, -1)
  17. src/java/org/apache/hadoop/mapred/MRConstants.java (+6, -0)
  18. src/java/org/apache/hadoop/mapred/MapOutputFile.java (+4, -2)
  19. src/java/org/apache/hadoop/mapred/MapOutputLocation.java (+0, -289)
  20. src/java/org/apache/hadoop/mapred/MapTask.java (+142, -160)
  21. src/java/org/apache/hadoop/mapred/Merger.java (+416, -0)
  22. src/java/org/apache/hadoop/mapred/RamManager.java (+48, -0)
  23. src/java/org/apache/hadoop/mapred/RawKeyValueIterator.java (+66, -0)
  24. src/java/org/apache/hadoop/mapred/ReduceTask.java (+459, -287)
  25. src/java/org/apache/hadoop/mapred/Task.java (+125, -7)
  26. src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java (+2, -2)
  27. src/java/org/apache/hadoop/mapred/TaskTracker.java (+12, -2)
  28. src/java/org/apache/hadoop/util/ReflectionUtils.java (+11, -0)
  29. src/test/org/apache/hadoop/io/compress/TestCodecFactory.java (+2, -2)
  30. src/test/org/apache/hadoop/mapred/TestJobStatusPersistency.java (+1, -1)
  31. src/test/org/apache/hadoop/mapred/TestMapRed.java (+8, -12)
  32. src/test/org/apache/hadoop/mapred/TestReduceTask.java (+8, -7)

+ 10 - 0
CHANGES.txt

@@ -221,6 +221,16 @@ Trunk (unreleased changes)
     layout versions are consistent in the data node. (Steve Loughran
     via omalley)
 
+    HADOOP-2095. Improve the Map-Reduce shuffle/merge by cutting down
+    buffer-copies; changed intermediate sort/merge to use the new IFile format
+    rather than SequenceFiles and compression of map-outputs is now
+    implemented by compressing the entire file rather than SequenceFile
+    compression. Shuffle also has been changed to use a simple byte-buffer
+    manager rather than the InMemoryFileSystem. 
+    Configuration changes to hadoop-default.xml:
+      deprecated mapred.map.output.compression.type 
+    (acmurthy)
+
   OPTIMIZATIONS
 
     HADOOP-3274. The default constructor of BytesWritable creates empty 

+ 0 - 8
conf/hadoop-default.xml

@@ -920,14 +920,6 @@ creations/deletions), or "all".</description>
   </description>
 </property>
 
-<property>
-  <name>mapred.map.output.compression.type</name>
-  <value>RECORD</value>
-  <description>If the map outputs are to compressed, how should they
-               be compressed? Should be one of NONE, RECORD or BLOCK.
-  </description>
-</property>
-
 <property>
   <name>mapred.map.output.compression.codec</name>
   <value>org.apache.hadoop.io.compress.DefaultCodec</value>

+ 5 - 2
src/java/org/apache/hadoop/io/DataInputBuffer.java

@@ -20,7 +20,6 @@ package org.apache.hadoop.io;
 
 import java.io.*;
 
-
 /** A reusable {@link DataInput} implementation that reads from an in-memory
  * buffer.
  *
@@ -40,7 +39,6 @@ import java.io.*;
  *  
  */
 public class DataInputBuffer extends DataInputStream {
-
   private static class Buffer extends ByteArrayInputStream {
     public Buffer() {
       super(new byte[] {});
@@ -53,6 +51,7 @@ public class DataInputBuffer extends DataInputStream {
       this.pos = start;
     }
 
+    public byte[] getData() { return buf; }
     public int getPosition() { return pos; }
     public int getLength() { return count; }
   }
@@ -78,6 +77,10 @@ public class DataInputBuffer extends DataInputStream {
   public void reset(byte[] input, int start, int length) {
     buffer.reset(input, start, length);
   }
+  
+  public byte[] getData() {
+    return buffer.getData();
+  }
 
   /** Returns the current position in the input. */
   public int getPosition() { return buffer.getPosition(); }

+ 12 - 0
src/java/org/apache/hadoop/io/DataOutputBuffer.java

@@ -46,6 +46,14 @@ public class DataOutputBuffer extends DataOutputStream {
     public int getLength() { return count; }
     public void reset() { count = 0; }
 
+    public Buffer() {
+      super();
+    }
+    
+    public Buffer(int size) {
+      super(size);
+    }
+    
     public void write(DataInput in, int len) throws IOException {
       int newcount = count + len;
       if (newcount > buf.length) {
@@ -65,6 +73,10 @@ public class DataOutputBuffer extends DataOutputStream {
     this(new Buffer());
   }
   
+  public DataOutputBuffer(int size) {
+    this(new Buffer(size));
+  }
+  
   private DataOutputBuffer(Buffer buffer) {
     super(buffer);
     this.buffer = buffer;

+ 11 - 80
src/java/org/apache/hadoop/io/SequenceFile.java

@@ -24,6 +24,7 @@ import java.rmi.server.UID;
 import java.security.MessageDigest;
 import org.apache.commons.logging.*;
 import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.compress.CodecPool;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionInputStream;
 import org.apache.hadoop.io.compress.CompressionOutputStream;
@@ -778,13 +779,6 @@ public class SequenceFile {
   
   /** Write key/value pairs to a sequence-format file. */
   public static class Writer implements java.io.Closeable {
-    /**
-     * A global compressor pool used to save the expensive 
-     * construction/destruction of (possibly native) compression codecs.
-     */
-    private static final CodecPool<Compressor> compressorPool = 
-      new CodecPool<Compressor>();
-    
     Configuration conf;
     FSDataOutputStream out;
     boolean ownOutputStream = true;
@@ -919,10 +913,7 @@ public class SequenceFile {
       this.uncompressedValSerializer.open(buffer);
       if (this.codec != null) {
         ReflectionUtils.setConf(this.codec, this.conf);
-        compressor = compressorPool.getCodec(this.codec.getCompressorType());
-        if (compressor == null) {
-          compressor = this.codec.createCompressor();
-        }
+        this.compressor = CodecPool.getCompressor(this.codec);
         this.deflateFilter = this.codec.createOutputStream(buffer, compressor);
         this.deflateOut = 
           new DataOutputStream(new BufferedOutputStream(deflateFilter));
@@ -954,7 +945,7 @@ public class SequenceFile {
     
     /** Close the file. */
     public synchronized void close() throws IOException {
-      compressorPool.returnCodec(compressor);
+      CodecPool.returnCompressor(compressor);
       
       keySerializer.close();
       uncompressedValSerializer.close();
@@ -1361,13 +1352,6 @@ public class SequenceFile {
   
   /** Reads key/value pairs from a sequence-format file. */
   public static class Reader implements java.io.Closeable {
-    /**
-     * A global decompressor pool used to save the expensive 
-     * construction/destruction of (possibly native) decompression codecs.
-     */
-    private static final CodecPool<Decompressor> decompressorPool = 
-      new CodecPool<Decompressor>();
-    
     private Path file;
     private FSDataInputStream in;
     private DataOutputBuffer outBuf = new DataOutputBuffer();
@@ -1451,16 +1435,6 @@ public class SequenceFile {
       return fs.open(file, bufferSize);
     }
     
-    private Decompressor getPooledOrNewDecompressor() {
-      Decompressor decompressor = null;
-      decompressor = decompressorPool.getCodec(codec.getDecompressorType());
-      if (decompressor == null) {
-        decompressor = codec.createDecompressor();
-      }
-      return decompressor;
-    }
-    
-
     /**
      * Initialize the {@link Reader}
      * @param tmpReader <code>true</code> if we are constructing a temporary
@@ -1540,7 +1514,7 @@ public class SequenceFile {
       if (!tempReader) {
         valBuffer = new DataInputBuffer();
         if (decompress) {
-          valDecompressor = getPooledOrNewDecompressor();
+          valDecompressor = CodecPool.getDecompressor(codec);
           valInFilter = codec.createInputStream(valBuffer, valDecompressor);
           valIn = new DataInputStream(valInFilter);
         } else {
@@ -1552,16 +1526,16 @@ public class SequenceFile {
           keyBuffer = new DataInputBuffer();
           valLenBuffer = new DataInputBuffer();
 
-          keyLenDecompressor = getPooledOrNewDecompressor();
+          keyLenDecompressor = CodecPool.getDecompressor(codec);
           keyLenInFilter = codec.createInputStream(keyLenBuffer, 
                                                    keyLenDecompressor);
           keyLenIn = new DataInputStream(keyLenInFilter);
 
-          keyDecompressor = getPooledOrNewDecompressor();
+          keyDecompressor = CodecPool.getDecompressor(codec);
           keyInFilter = codec.createInputStream(keyBuffer, keyDecompressor);
           keyIn = new DataInputStream(keyInFilter);
 
-          valLenDecompressor = getPooledOrNewDecompressor();
+          valLenDecompressor = CodecPool.getDecompressor(codec);
           valLenInFilter = codec.createInputStream(valLenBuffer, 
                                                    valLenDecompressor);
           valLenIn = new DataInputStream(valLenInFilter);
@@ -1572,10 +1546,10 @@ public class SequenceFile {
     /** Close the file. */
     public synchronized void close() throws IOException {
       // Return the decompressors to the pool
-      decompressorPool.returnCodec(keyLenDecompressor);
-      decompressorPool.returnCodec(keyDecompressor);
-      decompressorPool.returnCodec(valLenDecompressor);
-      decompressorPool.returnCodec(valDecompressor);
+      CodecPool.returnDecompressor(keyLenDecompressor);
+      CodecPool.returnDecompressor(keyDecompressor);
+      CodecPool.returnDecompressor(valLenDecompressor);
+      CodecPool.returnDecompressor(valDecompressor);
       
       // Close the input-stream
       in.close();
@@ -2100,49 +2074,6 @@ public class SequenceFile {
 
   }
 
-  private static class CodecPool<T> {
-
-    private Map<Class, List<T>> pool = new HashMap<Class, List<T>>();
-    
-    public T getCodec(Class codecClass) {
-      T codec = null;
-      
-      // Check if an appropriate codec is available
-      synchronized (pool) {
-        if (pool.containsKey(codecClass)) {
-          List<T> codecList = pool.get(codecClass);
-          
-          if (codecList != null) {
-            synchronized (codecList) {
-              if (!codecList.isEmpty()) {
-                codec = codecList.remove(0);
-              }
-            }
-          }
-        }
-      }
-      
-      return codec;
-    }
-
-    public void returnCodec(T codec) {
-      if (codec != null) {
-        Class codecClass = codec.getClass();
-        synchronized (pool) {
-          if (!pool.containsKey(codecClass)) {
-            pool.put(codecClass, new ArrayList<T>());
-          }
-
-          List<T> codecList = pool.get(codecClass);
-          synchronized (codecList) {
-            codecList.add(codec);
-          }
-        }
-      }
-    }
-
-  }
-  
   /** Sorts key/value pairs in a sequence-format file.
    *
    * <p>For best performance, applications should make sure that the {@link

+ 154 - 0
src/java/org/apache/hadoop/io/compress/CodecPool.java

@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.compress;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * A global compressor/decompressor pool used to save and reuse 
+ * (possibly native) compression/decompression codecs.
+ */
+public class CodecPool {
+  private static final Log LOG = LogFactory.getLog(CodecPool.class);
+  
+  /**
+   * A global compressor pool used to save the expensive 
+   * construction/destruction of (possibly native) decompression codecs.
+   */
+  private static final Map<Class<Compressor>, List<Compressor>> compressorPool = 
+    new HashMap<Class<Compressor>, List<Compressor>>();
+  
+  /**
+   * A global decompressor pool used to save the expensive 
+   * construction/destruction of (possibly native) decompression codecs.
+   */
+  private static final Map<Class<Decompressor>, List<Decompressor>> decompressorPool = 
+    new HashMap<Class<Decompressor>, List<Decompressor>>();
+
+  private static <T> T borrow(Map<Class<T>, List<T>> pool,
+                             Class<? extends T> codecClass) {
+    T codec = null;
+    
+    // Check if an appropriate codec is available
+    synchronized (pool) {
+      if (pool.containsKey(codecClass)) {
+        List<T> codecList = pool.get(codecClass);
+        
+        if (codecList != null) {
+          synchronized (codecList) {
+            if (!codecList.isEmpty()) {
+              codec = codecList.remove(0);
+            }
+          }
+        }
+      }
+    }
+    
+    return codec;
+  }
+
+  private static <T> void payback(Map<Class<T>, List<T>> pool, T codec) {
+    if (codec != null) {
+      Class<T> codecClass = ReflectionUtils.getClass(codec);
+      synchronized (pool) {
+        if (!pool.containsKey(codecClass)) {
+          pool.put(codecClass, new ArrayList<T>());
+        }
+
+        List<T> codecList = pool.get(codecClass);
+        synchronized (codecList) {
+          codecList.add(codec);
+        }
+      }
+    }
+  }
+  
+  /**
+   * Get a {@link Compressor} for the given {@link CompressionCodec} from the 
+   * pool or a new one.
+   *
+   * @param codec the <code>CompressionCodec</code> for which to get the 
+   *              <code>Compressor</code>
+   * @return <code>Compressor</code> for the given 
+   *         <code>CompressionCodec</code> from the pool or a new one
+   */
+  public static Compressor getCompressor(CompressionCodec codec) {
+    Compressor compressor = borrow(compressorPool, codec.getCompressorType());
+    if (compressor == null) {
+      compressor = codec.createCompressor();
+      LOG.debug("Got brand-new compressor");
+    } else {
+      LOG.debug("Got recycled compressor");
+    }
+    return compressor;
+  }
+  
+  /**
+   * Get a {@link Decompressor} for the given {@link CompressionCodec} from the
+   * pool or a new one.
+   *  
+   * @param codec the <code>CompressionCodec</code> for which to get the 
+   *              <code>Decompressor</code>
+   * @return <code>Decompressor</code> for the given 
+   *         <code>CompressionCodec</code> the pool or a new one
+   */
+  public static Decompressor getDecompressor(CompressionCodec codec) {
+    Decompressor decompressor = borrow(decompressorPool, codec.getDecompressorType());
+    if (decompressor == null) {
+      decompressor = codec.createDecompressor();
+      LOG.debug("Got brand-new decompressor");
+    } else {
+      LOG.debug("Got recycled decompressor");
+    }
+    return decompressor;
+  }
+  
+  /**
+   * Return the {@link Compressor} to the pool.
+   * 
+   * @param compressor the <code>Compressor</code> to be returned to the pool
+   */
+  public static void returnCompressor(Compressor compressor) {
+    if (compressor == null) {
+      return;
+    }
+    compressor.reset();
+    payback(compressorPool, compressor);
+  }
+  
+  /**
+   * Return the {@link Decompressor} to the pool.
+   * 
+   * @param decompressor the <code>Decompressor</code> to be returned to the 
+   *                     pool
+   */
+  public static void returnDecompressor(Decompressor decompressor) {
+    if (decompressor == null) {
+      return;
+    }
+    decompressor.reset();
+    payback(decompressorPool, decompressor);
+  }
+}
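
As a quick orientation for the new pool: SequenceFile's private per-class pools are replaced by the static CodecPool above, which hands out a pooled (possibly native) Compressor or Decompressor per codec and recycles it on return. A minimal borrowing sketch, assuming a configured CompressionCodec and an open output stream (both names are illustrative):

    // Sketch: borrowing a pooled compressor, mirroring the SequenceFile.Writer
    // changes in this patch ('codec' and 'rawOut' are assumed to exist).
    Compressor compressor = CodecPool.getCompressor(codec);       // pooled, or brand-new
    try {
      CompressionOutputStream deflateOut = codec.createOutputStream(rawOut, compressor);
      // ... write the compressed stream ...
      deflateOut.finish();
    } finally {
      CodecPool.returnCompressor(compressor);                     // reset() and recycle
    }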

+ 2 - 2
src/java/org/apache/hadoop/io/compress/CompressionCodec.java

@@ -56,7 +56,7 @@ public interface CompressionCodec {
    * 
    * @return the type of compressor needed by this codec.
    */
-  Class getCompressorType();
+  Class<? extends Compressor> getCompressorType();
   
   /**
    * Create a new {@link Compressor} for use by this {@link CompressionCodec}.
@@ -93,7 +93,7 @@ public interface CompressionCodec {
    * 
    * @return the type of decompressor needed by this codec.
    */
-  Class getDecompressorType();
+  Class<? extends Decompressor> getDecompressorType();
   
   /**
    * Create a new {@link Decompressor} for use by this {@link CompressionCodec}.

+ 2 - 2
src/java/org/apache/hadoop/io/compress/DefaultCodec.java

@@ -51,7 +51,7 @@ public class DefaultCodec implements Configurable, CompressionCodec {
                                 conf.getInt("io.file.buffer.size", 4*1024));
   }
 
-  public Class getCompressorType() {
+  public Class<? extends Compressor> getCompressorType() {
     return ZlibFactory.getZlibCompressorType(conf);
   }
 
@@ -72,7 +72,7 @@ public class DefaultCodec implements Configurable, CompressionCodec {
                                   conf.getInt("io.file.buffer.size", 4*1024));
   }
 
-  public Class getDecompressorType() {
+  public Class<? extends Decompressor> getDecompressorType() {
     return ZlibFactory.getZlibDecompressorType(conf);
   }
 

+ 2 - 2
src/java/org/apache/hadoop/io/compress/GzipCodec.java

@@ -161,7 +161,7 @@ public class GzipCodec extends DefaultCodec {
                null;
   }
 
-  public Class getCompressorType() {
+  public Class<? extends Compressor> getCompressorType() {
     return ZlibFactory.getZlibCompressorType(conf);
   }
 
@@ -191,7 +191,7 @@ public class GzipCodec extends DefaultCodec {
                null;                               
   }
 
-  public Class getDecompressorType() {
+  public Class<? extends Decompressor> getDecompressorType() {
     return ZlibFactory.getZlibDecompressorType(conf);
   }
 

+ 2 - 2
src/java/org/apache/hadoop/io/compress/LzoCodec.java

@@ -124,7 +124,7 @@ public class LzoCodec implements Configurable, CompressionCodec {
                                      compressionOverhead);
   }
 
-  public Class getCompressorType() {
+  public Class<? extends Compressor> getCompressorType() {
     // Ensure native-lzo library is loaded & initialized
     if (!isNativeLzoLoaded(conf)) {
       throw new RuntimeException("native-lzo library not available");
@@ -164,7 +164,7 @@ public class LzoCodec implements Configurable, CompressionCodec {
         conf.getInt("io.compression.codec.lzo.buffersize", 64*1024));
   }
 
-  public Class getDecompressorType() {
+  public Class<? extends Decompressor> getDecompressorType() {
     // Ensure native-lzo library is loaded & initialized
     if (!isNativeLzoLoaded(conf)) {
       throw new RuntimeException("native-lzo library not available");

+ 4 - 2
src/java/org/apache/hadoop/io/compress/zlib/ZlibFactory.java

@@ -67,7 +67,8 @@ public class ZlibFactory {
    * @param conf configuration
    * @return the appropriate type of the zlib compressor.
    */
-  public static Class getZlibCompressorType(Configuration conf) {
+  public static Class<? extends Compressor> 
+  getZlibCompressorType(Configuration conf) {
     return (isNativeZlibLoaded(conf)) ? 
             ZlibCompressor.class : BuiltInZlibDeflater.class;
   }
@@ -89,7 +90,8 @@ public class ZlibFactory {
    * @param conf configuration
    * @return the appropriate type of the zlib decompressor.
    */
-  public static Class getZlibDecompressorType(Configuration conf) {
+  public static Class<? extends Decompressor> 
+  getZlibDecompressorType(Configuration conf) {
     return (isNativeZlibLoaded(conf)) ? 
             ZlibDecompressor.class : BuiltInZlibInflater.class;
   }

+ 402 - 0
src/java/org/apache/hadoop/mapred/IFile.java

@@ -0,0 +1,402 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
+
+/**
+ * <code>IFile</code> is the simple <key-len, key, value-len, value> format
+ * for the intermediate map-outputs in Map-Reduce.
+ * 
+ * There is a <code>Writer</code> to write out map-outputs in this format and 
+ * a <code>Reader</code> to read files of this format.
+ */
+class IFile {
+
+  private static int EOF_MARKER = -1;
+  
+  /**
+   * <code>IFile.Writer</code> to write out intermediate map-outputs. 
+   */
+  public static class Writer<K extends Object, V extends Object> {
+    FSDataOutputStream out;
+    boolean ownOutputStream = false;
+    long start = 0;
+    
+    CompressionOutputStream compressedOut;
+    Compressor compressor;
+    boolean compressOutput = false;
+    
+    long decompressedBytesWritten = 0;
+    long compressedBytesWritten = 0;
+    
+    Class<K> keyClass;
+    Class<V> valueClass;
+    Serializer<K> keySerializer;
+    Serializer<V> valueSerializer;
+    
+    DataOutputBuffer buffer = new DataOutputBuffer();
+
+    public Writer(Configuration conf, FileSystem fs, Path file, 
+                  Class<K> keyClass, Class<V> valueClass,
+                  CompressionCodec codec) throws IOException {
+      this(conf, fs.create(file), keyClass, valueClass, codec);
+      ownOutputStream = true;
+    }
+    
+    public Writer(Configuration conf, FSDataOutputStream out, 
+        Class<K> keyClass, Class<V> valueClass,
+        CompressionCodec codec) throws IOException {
+      if (codec != null) {
+        this.compressor = CodecPool.getCompressor(codec);
+        this.compressor.reset();
+        this.compressedOut = codec.createOutputStream(out, compressor);
+        this.out = new FSDataOutputStream(this.compressedOut,  null);
+        this.compressOutput = true;
+      } else {
+        this.out = out;
+      }
+      this.start = this.out.getPos();
+      
+      this.keyClass = keyClass;
+      this.valueClass = valueClass;
+      SerializationFactory serializationFactory = new SerializationFactory(conf);
+      this.keySerializer = serializationFactory.getSerializer(keyClass);
+      this.keySerializer.open(buffer);
+      this.valueSerializer = serializationFactory.getSerializer(valueClass);
+      this.valueSerializer.open(buffer);
+    }
+    
+    public void close() throws IOException {
+      // Write EOF_MARKER for key/value length
+      WritableUtils.writeVInt(out, EOF_MARKER);
+      WritableUtils.writeVInt(out, EOF_MARKER);
+      decompressedBytesWritten += 2 * WritableUtils.getVIntSize(EOF_MARKER);
+      
+      if (compressOutput) {
+        // Return the compressor
+        compressedOut.finish();
+        compressedOut.resetState();
+        CodecPool.returnCompressor(compressor);
+      }
+      
+      // Close the serializers
+      keySerializer.close();
+      valueSerializer.close();
+
+      // Close the stream
+      if (out != null) {
+        out.flush();
+        compressedBytesWritten = out.getPos() - start;
+        
+        // Close the underlying stream iff we own it...
+        if (ownOutputStream) {
+          out.close();
+        }
+        
+        out = null;
+      }
+    }
+
+    public void append(K key, V value) throws IOException {
+      if (key.getClass() != keyClass)
+        throw new IOException("wrong key class: "+ key.getClass()
+                              +" is not "+ keyClass);
+      if (value.getClass() != valueClass)
+        throw new IOException("wrong value class: "+ value.getClass()
+                              +" is not "+ valueClass);
+
+      // Append the 'key'
+      keySerializer.serialize(key);
+      int keyLength = buffer.getLength();
+      if (keyLength == 0)
+        throw new IOException("zero length keys not allowed: " + key);
+
+      // Append the 'value'
+      valueSerializer.serialize(value);
+      int valueLength = buffer.getLength() - keyLength;
+      
+      // Write the record out
+      WritableUtils.writeVInt(out, keyLength);                  // key length
+      WritableUtils.writeVInt(out, valueLength);                // value length
+      out.write(buffer.getData(), 0, buffer.getLength());       // data
+
+      // Reset
+      buffer.reset();
+      
+      // Update bytes written
+      decompressedBytesWritten += keyLength + valueLength + 
+                                  WritableUtils.getVIntSize(keyLength) + 
+                                  WritableUtils.getVIntSize(valueLength);
+    }
+    
+    public void append(DataInputBuffer key, DataInputBuffer value)
+    throws IOException {
+      int keyLength = key.getLength() - key.getPosition();
+      int valueLength = value.getLength() - value.getPosition();
+      
+      WritableUtils.writeVInt(out, keyLength);
+      WritableUtils.writeVInt(out, valueLength);
+      out.write(key.getData(), key.getPosition(), keyLength); 
+      out.write(value.getData(), value.getPosition(), valueLength); 
+
+      // Update bytes written
+      decompressedBytesWritten += keyLength + valueLength + 
+                      WritableUtils.getVIntSize(keyLength) + 
+                      WritableUtils.getVIntSize(valueLength);
+}
+    
+    public long getRawLength() {
+      return decompressedBytesWritten;
+    }
+    
+    public long getCompressedLength() {
+      return compressedBytesWritten;
+    }
+  }
+
+  /**
+   * <code>IFile.Reader</code> to read intermediate map-outputs. 
+   */
+  public static class Reader<K extends Object, V extends Object> {
+    private static final int DEFAULT_BUFFER_SIZE = 128*1024;
+    private static final int MAX_VINT_SIZE = 5;
+
+    InputStream in;
+    Decompressor decompressor;
+    long bytesRead = 0;
+    long fileLength = 0;
+    boolean eof = false;
+    
+    byte[] buffer = null;
+    int bufferSize = DEFAULT_BUFFER_SIZE;
+    DataInputBuffer dataIn = new DataInputBuffer();
+
+    public Reader(Configuration conf, FileSystem fs, Path file,
+                  CompressionCodec codec) throws IOException {
+      this(conf, fs.open(file), fs.getFileStatus(file).getLen(), codec);
+    }
+    
+    protected Reader() {}
+    
+    public Reader(Configuration conf, InputStream in, long length, 
+                  CompressionCodec codec) throws IOException {
+      if (codec != null) {
+        decompressor = CodecPool.getDecompressor(codec);
+        this.in = codec.createInputStream(in, decompressor);
+      } else {
+        this.in = in;
+      }
+      this.fileLength = length;
+      
+      this.bufferSize = conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE);
+    }
+    
+    public long getLength() { return fileLength; }
+    
+    private int readData(byte[] buf, int off, int len) throws IOException {
+      int bytesRead = 0;
+      while (bytesRead < len) {
+        int n = in.read(buf, off+bytesRead, len-bytesRead);
+        if (n < 0) {
+          return bytesRead;
+        }
+        bytesRead += n;
+      }
+      return len;
+    }
+    
+    void readNextBlock(int minSize) throws IOException {
+      if (buffer == null) {
+        buffer = new byte[bufferSize];
+        dataIn.reset(buffer, 0, 0);
+      }
+      buffer = 
+        rejigData(buffer, 
+                  (bufferSize < minSize) ? new byte[minSize << 1] : buffer);
+      bufferSize = buffer.length;
+    }
+    
+    private byte[] rejigData(byte[] source, byte[] destination) 
+    throws IOException{
+      // Copy remaining data into the destination array
+      int bytesRemaining = dataIn.getLength()-dataIn.getPosition();
+      if (bytesRemaining > 0) {
+        System.arraycopy(source, dataIn.getPosition(), 
+            destination, 0, bytesRemaining);
+      }
+      
+      // Read as much data as will fit from the underlying stream 
+      int n = readData(destination, bytesRemaining, 
+                       (destination.length - bytesRemaining));
+      dataIn.reset(destination, 0, (bytesRemaining + n));
+      
+      return destination;
+    }
+    
+    public boolean next(DataInputBuffer key, DataInputBuffer value) 
+    throws IOException {
+      // Sanity check
+      if (eof) {
+        throw new EOFException("Completed reading " + bytesRead);
+      }
+      
+      // Check if we have enough data to read lengths
+      if ((dataIn.getLength() - dataIn.getPosition()) < 2*MAX_VINT_SIZE) {
+        readNextBlock(2*MAX_VINT_SIZE);
+      }
+      
+      // Read key and value lengths
+      int oldPos = dataIn.getPosition();
+      int keyLength = WritableUtils.readVInt(dataIn);
+      int valueLength = WritableUtils.readVInt(dataIn);
+      int pos = dataIn.getPosition();
+      bytesRead += pos - oldPos;
+      
+      // Check for EOF
+      if (keyLength == EOF_MARKER && valueLength == EOF_MARKER) {
+        eof = true;
+        return false;
+      }
+      
+      final int recordLength = keyLength + valueLength;
+      
+      // Check if we have the raw key/value in the buffer
+      if ((dataIn.getLength()-pos) < recordLength) {
+        readNextBlock(recordLength);
+        
+        // Sanity check
+        if ((dataIn.getLength() - dataIn.getPosition()) < recordLength) {
+          throw new EOFException("Could read the next record");
+        }
+      }
+
+      // Setup the key and value
+      pos = dataIn.getPosition();
+      byte[] data = dataIn.getData();
+      key.reset(data, pos, keyLength);
+      value.reset(data, (pos + keyLength), valueLength);
+      
+      // Position for the next record
+      dataIn.skip(recordLength);
+      bytesRead += recordLength;
+
+      return true;
+    }
+
+    public void close() throws IOException {
+      // Return the decompressor
+      if (decompressor != null) {
+        decompressor.reset();
+        CodecPool.returnDecompressor(decompressor);
+      }
+      
+      // Close the underlying stream
+      if (in != null) {
+        in.close();
+      }
+      
+      // Release the buffer
+      dataIn = null;
+      buffer = null;
+    }
+  }    
+  
+  /**
+   * <code>IFile.InMemoryReader</code> to read map-outputs present in-memory.
+   */
+  public static class InMemoryReader<K, V> extends Reader<K, V> {
+    RamManager ramManager;
+    
+    public InMemoryReader(RamManager ramManager, 
+                          byte[] data, int start, int length) {
+      this.ramManager = ramManager;
+      
+      buffer = data;
+      fileLength = bufferSize = (length - start);
+      dataIn.reset(buffer, start, length);
+    }
+    
+    public boolean next(DataInputBuffer key, DataInputBuffer value) 
+    throws IOException {
+      // Sanity check
+      if (eof) {
+        throw new EOFException("Completed reading " + bytesRead);
+      }
+      
+      // Read key and value lengths
+      int oldPos = dataIn.getPosition();
+      int keyLength = WritableUtils.readVInt(dataIn);
+      int valueLength = WritableUtils.readVInt(dataIn);
+      int pos = dataIn.getPosition();
+      bytesRead += pos - oldPos;
+      
+      // Check for EOF
+      if (keyLength == EOF_MARKER && valueLength == EOF_MARKER) {
+        eof = true;
+        return false;
+      }
+      
+      final int recordLength = keyLength + valueLength;
+      
+      // Setup the key and value
+      pos = dataIn.getPosition();
+      byte[] data = dataIn.getData();
+      key.reset(data, pos, keyLength);
+      value.reset(data, (pos + keyLength), valueLength);
+      
+      // Position for the next record
+      long skipped = dataIn.skip(recordLength);
+      if (skipped != recordLength) {
+        throw new IOException("Failed to skip past record of length: " + 
+                              recordLength);
+      }
+      
+      // Record the byte
+      bytesRead += recordLength;
+
+      return true;
+    }
+      
+    public void close() {
+      // Release
+      dataIn = null;
+      buffer = null;
+      
+      // Inform the RamManager
+      ramManager.unreserve(bufferSize);
+    }
+  }
+}
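
Putting the Writer and Reader above together, the new intermediate format has a very small surface. A hedged round-trip sketch under illustrative names (Text/IntWritable key and value types, a local path, no codec); note that IFile is package-private, so this only compiles inside org.apache.hadoop.mapred:

    // Sketch: round-tripping one record through the new intermediate format.
    JobConf conf = new JobConf();
    FileSystem fs = FileSystem.getLocal(conf);
    Path path = new Path("/tmp/example.ifile");          // illustrative path

    IFile.Writer<Text, IntWritable> writer =
      new IFile.Writer<Text, IntWritable>(conf, fs, path, Text.class, IntWritable.class, null);
    writer.append(new Text("key"), new IntWritable(1));
    writer.close();

    IFile.Reader<Text, IntWritable> reader =
      new IFile.Reader<Text, IntWritable>(conf, fs, path, null);
    DataInputBuffer key = new DataInputBuffer();
    DataInputBuffer value = new DataInputBuffer();
    while (reader.next(key, value)) {                    // hands back raw serialized bytes
      // deserialize from 'key'/'value' as needed
    }
    reader.close();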

+ 20 - 12
src/java/org/apache/hadoop/mapred/JobClient.java

@@ -919,11 +919,11 @@ public class JobClient extends Configured implements MRConstants, Tool  {
 
   private static void downloadProfile(TaskCompletionEvent e
                                       ) throws IOException  {
-    URLConnection connection = new URL(e.getTaskTrackerHttp() + 
-                                       "&plaintext=true&filter=profile"
-                                       ).openConnection();
+    URLConnection connection = 
+      new URL(getTaskLogURL(e.getTaskAttemptId(), e.getTaskTrackerHttp()) + 
+              "&filter=profile").openConnection();
     InputStream in = connection.getInputStream();
-    OutputStream out = new FileOutputStream(e.getTaskID() + ".profile");
+    OutputStream out = new FileOutputStream(e.getTaskAttemptId() + ".profile");
     IOUtils.copyBytes(in, out, 64 * 1024, true);
   }
 
@@ -1005,7 +1005,7 @@ public class JobClient extends Configured implements MRConstants, Tool  {
               if (event.getTaskStatus() == 
                 TaskCompletionEvent.Status.SUCCEEDED){
                 LOG.info(event.toString());
-                displayTaskLogs(event.getTaskID(), event.getTaskTrackerHttp());
+                displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp());
               }
               break; 
             case FAILED:
@@ -1013,7 +1013,7 @@ public class JobClient extends Configured implements MRConstants, Tool  {
                 TaskCompletionEvent.Status.FAILED){
                 LOG.info(event.toString());
                 // Displaying the task diagnostic information
-                TaskAttemptID taskId = event.getTaskID();
+                TaskAttemptID taskId = event.getTaskAttemptId();
                 String[] taskDiagnostics = 
                   jc.jobSubmitClient.getTaskDiagnostics(taskId); 
                 if (taskDiagnostics != null) {
@@ -1022,7 +1022,7 @@ public class JobClient extends Configured implements MRConstants, Tool  {
                   }
                 }
                 // Displaying the task logs
-                displayTaskLogs(event.getTaskID(), event.getTaskTrackerHttp());
+                displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp());
               }
               break; 
             case KILLED:
@@ -1032,7 +1032,7 @@ public class JobClient extends Configured implements MRConstants, Tool  {
               break; 
             case ALL:
               LOG.info(event.toString());
-              displayTaskLogs(event.getTaskID(), event.getTaskTrackerHttp());
+              displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp());
               break;
             }
           }
@@ -1061,15 +1061,22 @@ public class JobClient extends Configured implements MRConstants, Tool  {
     return running;
   }
 
+  static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
+    return (baseUrl + "/tasklog?plaintext=true&taskid=" + taskId); 
+  }
+  
   private static void displayTaskLogs(TaskAttemptID taskId, String baseUrl)
     throws IOException {
     // The tasktracker for a 'failed/killed' job might not be around...
     if (baseUrl != null) {
+      // Construct the url for the tasklogs
+      String taskLogUrl = getTaskLogURL(taskId, baseUrl);
+      
       // Copy tasks's stdout of the JobClient
-      getTaskLogs(taskId, new URL(baseUrl+"&filter=stdout"), System.out);
+      getTaskLogs(taskId, new URL(taskLogUrl+"&filter=stdout"), System.out);
         
       // Copy task's stderr to stderr of the JobClient 
-      getTaskLogs(taskId, new URL(baseUrl+"&filter=stderr"), System.err);
+      getTaskLogs(taskId, new URL(taskLogUrl+"&filter=stderr"), System.err);
     }
   }
     
@@ -1370,8 +1377,9 @@ public class JobClient extends Configured implements MRConstants, Tool  {
     System.out.println("Number of events (from " + fromEventId + 
                        ") are: " + events.length);
     for(TaskCompletionEvent event: events) {
-      System.out.println(event.getTaskStatus() + " " + event.getTaskID() + 
-                         " " + event.getTaskTrackerHttp());
+      System.out.println(event.getTaskStatus() + " " + event.getTaskAttemptId() + " " + 
+                         getTaskLogURL(event.getTaskAttemptId(), 
+                                       event.getTaskTrackerHttp()));
     }
   }
 

+ 11 - 1
src/java/org/apache/hadoop/mapred/JobConf.java

@@ -461,21 +461,31 @@ public class JobConf extends Configuration {
    * 
    * @param style the {@link CompressionType} to control how the map outputs  
    *              are compressed.
+   * @deprecated {@link CompressionType} is no longer valid for intermediate
+   *             map-outputs. 
    */
+  @Deprecated
   public void setMapOutputCompressionType(CompressionType style) {
     setCompressMapOutput(true);
     set("mapred.map.output.compression.type", style.toString());
+    LOG.warn("SequenceFile compression is no longer valid for intermediate " +
+    		     "map-outputs!");
   }
   
   /**
    * Get the {@link CompressionType} for the map outputs.
    * 
    * @return the {@link CompressionType} for map outputs, defaulting to 
-   *         {@link CompressionType#RECORD}. 
+   *         {@link CompressionType#RECORD}.
+   * @deprecated {@link CompressionType} is no longer valid for intermediate
+   *             map-outputs. 
    */
+  @Deprecated
   public CompressionType getMapOutputCompressionType() {
     String val = get("mapred.map.output.compression.type", 
                      CompressionType.RECORD.toString());
+    LOG.warn("SequenceFile compression is no longer valid for intermediate " +
+    "map-outputs!");
     return CompressionType.valueOf(val);
   }
 

+ 3 - 4
src/java/org/apache/hadoop/mapred/JobInProgress.java

@@ -497,9 +497,8 @@ class JobInProgress {
         } else {
           host = ttStatus.getHost();
         }
-        httpTaskLogLocation = "http://" + host + ":" + 
-          ttStatus.getHttpPort() + "/tasklog?plaintext=true&taskid=" +
-          status.getTaskID();
+        httpTaskLogLocation = "http://" + host + ":" + ttStatus.getHttpPort(); 
+           //+ "/tasklog?plaintext=true&taskid=" + status.getTaskID();
       }
 
       TaskCompletionEvent taskEvent = null;
@@ -530,7 +529,7 @@ class JobInProgress {
         if ((eventNumber = tip.getSuccessEventNumber()) != -1) {
           TaskCompletionEvent t = 
             this.taskCompletionEvents.get(eventNumber);
-          if (t.getTaskID().equals(status.getTaskID()))
+          if (t.getTaskAttemptId().equals(status.getTaskID()))
             t.setTaskStatus(TaskCompletionEvent.Status.OBSOLETE);
         }
         

+ 1 - 1
src/java/org/apache/hadoop/mapred/LocalJobRunner.java

@@ -166,7 +166,7 @@ class LocalJobRunner implements JobSubmissionProtocol {
             for (int i = 0; i < mapIds.size(); i++) {
               TaskAttemptID mapId = mapIds.get(i);
               Path mapOut = this.mapoutputFile.getOutputFile(mapId);
-              Path reduceIn = this.mapoutputFile.getInputFileForWrite(i,reduceId,
+              Path reduceIn = this.mapoutputFile.getInputFileForWrite(mapId.getTaskID(),reduceId,
                   localFs.getLength(mapOut));
               if (!localFs.mkdirs(reduceIn.getParent())) {
                 throw new IOException("Mkdirs failed to create "

+ 6 - 0
src/java/org/apache/hadoop/mapred/MRConstants.java

@@ -36,6 +36,7 @@ interface MRConstants {
    * Constant denoting when a merge of in memory files will be triggered 
    */
   public static final float MAX_INMEM_FILESYS_USE = 0.5f;
+  
   /**
    * Constant denoting the max size (in terms of the fraction of the total 
    * size of the filesys) of a map output file that we will try
@@ -55,6 +56,11 @@ interface MRConstants {
    */
   public static final String MAP_OUTPUT_LENGTH = "Map-Output-Length";
 
+  /**
+   * The custom http header used for the "raw" map output length.
+   */
+  public static final String RAW_MAP_OUTPUT_LENGTH = "Raw-Map-Output-Length";
+
   /**
    * Temporary directory name 
    */
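
The new Raw-Map-Output-Length header accompanies the existing Map-Output-Length header on the shuffle's HTTP response, so the reduce side can see the decompressed ("raw") size of a map output as well as the number of bytes actually sent. A hedged sketch of reading both (the URLConnection is illustrative; the removed MapOutputLocation code below parsed Map-Output-Length the same way):

    // Sketch: reading the shuffle response headers (names from MRConstants).
    long onWireLength =
      Long.parseLong(connection.getHeaderField(MRConstants.MAP_OUTPUT_LENGTH));
    long rawLength =
      Long.parseLong(connection.getHeaderField(MRConstants.RAW_MAP_OUTPUT_LENGTH));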

+ 4 - 2
src/java/org/apache/hadoop/mapred/MapOutputFile.java

@@ -153,12 +153,14 @@ class MapOutputFile {
    * @param reduceTaskId a reduce task id
    * @param size the size of the file
    */
-  public Path getInputFileForWrite(int mapId, TaskAttemptID reduceTaskId, long size)
+  public Path getInputFileForWrite(TaskID mapId, TaskAttemptID reduceTaskId, 
+                                   long size)
     throws IOException {
     // TODO *oom* should use a format here
     return lDirAlloc.getLocalPathForWrite(jobDir + Path.SEPARATOR +
                                           reduceTaskId + Path.SEPARATOR +
-                                          "output" + "/map_" + mapId + ".out",
+                                          ("output" + "/map_" + mapId.getId() + 
+                                           ".out"), 
                                           size, conf);
   }
 

+ 0 - 289
src/java/org/apache/hadoop/mapred/MapOutputLocation.java

@@ -1,289 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapred;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.URL;
-import java.net.URLConnection;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.InMemoryFileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableFactories;
-import org.apache.hadoop.io.WritableFactory;
-import org.apache.hadoop.mapred.ReduceTask.ReduceCopier.ShuffleClientMetrics;
-import org.apache.hadoop.util.Progressable;
-
-/** The location of a map output file, as passed to a reduce task via the
- * {@link InterTrackerProtocol}. */ 
-class MapOutputLocation implements Writable, MRConstants {
-
-  static {                                      // register a ctor
-    WritableFactories.setFactory
-      (MapOutputLocation.class,
-       new WritableFactory() {
-         public Writable newInstance() { return new MapOutputLocation(); }
-       });
-  }
-
-  private TaskAttemptID mapTaskId;
-  private int mapId;
-  private String host;
-  private int port;
-  
-  // basic/unit connection timeout (in milliseconds)
-  private final static int UNIT_CONNECT_TIMEOUT = 30 * 1000;
-  // default read timeout (in milliseconds)
-  private final static int DEFAULT_READ_TIMEOUT = 3 * 60 * 1000;
-
-  /** RPC constructor **/
-  public MapOutputLocation() {
-  }
-
-  /** Construct a location. */
-  public MapOutputLocation(TaskAttemptID mapTaskId, int mapId, 
-                           String host, int port) {
-    this.mapTaskId = mapTaskId;
-    this.mapId = mapId;
-    this.host = host;
-    this.port = port;
-  }
-
-  /** The map task id. */
-  public TaskAttemptID getMapTaskID() { return mapTaskId; }
-  
-  /**
-   * Get the map's id number.
-   * @return The numeric id for this map
-   */
-  public int getMapId() {
-    return mapId;
-  }
-
-  /** The host the task completed on. */
-  public String getHost() { return host; }
-
-  /** The port listening for {@link MapOutputProtocol} connections. */
-  public int getPort() { return port; }
-
-  public void write(DataOutput out) throws IOException {
-    mapTaskId.write(out);
-    out.writeInt(mapId);
-    Text.writeString(out, host);
-    out.writeInt(port);
-  }
-
-  public void readFields(DataInput in) throws IOException {
-    this.mapTaskId = TaskAttemptID.read(in);
-    this.mapId = in.readInt();
-    this.host = Text.readString(in);
-    this.port = in.readInt();
-  }
-
-  @Override
-  public String toString() {
-    return "http://" + host + ":" + port + "/mapOutput?job=" + mapTaskId.getJobID() +
-           "&map=" + mapTaskId;
-  }
-  
-  /** 
-   * The connection establishment is attempted multiple times and is given up 
-   * only on the last failure. Instead of connecting with a timeout of 
-   * X, we try connecting with a timeout of x < X but multiple times. 
-   */
-  private InputStream getInputStream(URLConnection connection, 
-                                     int connectionTimeout, 
-                                     int readTimeout) 
-  throws IOException {
-    int unit = 0;
-    if (connectionTimeout < 0) {
-      throw new IOException("Invalid timeout "
-                            + "[timeout = " + connectionTimeout + " ms]");
-    } else if (connectionTimeout > 0) {
-      unit = (UNIT_CONNECT_TIMEOUT > connectionTimeout)
-             ? connectionTimeout
-             : UNIT_CONNECT_TIMEOUT;
-    }
-    // set the read timeout to the total timeout
-    connection.setReadTimeout(readTimeout);
-    // set the connect timeout to the unit-connect-timeout
-    connection.setConnectTimeout(unit);
-    while (true) {
-      try {
-        return connection.getInputStream();
-      } catch (IOException ioe) {
-        // update the total remaining connect-timeout
-        connectionTimeout -= unit;
-
-        // throw an exception if we have waited for timeout amount of time
-        // note that the updated value if timeout is used here
-        if (connectionTimeout == 0) {
-          throw ioe;
-        }
-
-        // reset the connect timeout for the last try
-        if (connectionTimeout < unit) {
-          unit = connectionTimeout;
-          // reset the connect time out for the final connect
-          connection.setConnectTimeout(unit);
-        }
-      }
-    }
-  }
-
-  /**
-   * Get the map output into a local file (either in the inmemory fs or on the 
-   * local fs) from the remote server.
-   * We use the file system so that we generate checksum files on the data.
-   * @param inMemFileSys the inmemory filesystem to write the file to
-   * @param localFileSys the local filesystem to write the file to
-   * @param shuffleMetrics the metrics context
-   * @param localFilename the filename to write the data into
-   * @param lDirAlloc the LocalDirAllocator object
-   * @param conf the Configuration object
-   * @param reduce the reduce id to get for
-   * @param timeout number of milliseconds for connection timeout
-   * @return the path of the file that got created
-   * @throws IOException when something goes wrong
-   */
-  public Path getFile(InMemoryFileSystem inMemFileSys,
-                      FileSystem localFileSys,
-                      ShuffleClientMetrics shuffleMetrics,
-                      Path localFilename, 
-                      LocalDirAllocator lDirAlloc,
-                      Configuration conf, int reduce,
-                      int timeout, Progressable progressable) 
-  throws IOException, InterruptedException {
-    return getFile(inMemFileSys, localFileSys, shuffleMetrics, localFilename, 
-                   lDirAlloc, conf, reduce, timeout, DEFAULT_READ_TIMEOUT, 
-                   progressable);
-  }
-
-  /**
-   * Get the map output into a local file (either in the inmemory fs or on the 
-   * local fs) from the remote server.
-   * We use the file system so that we generate checksum files on the data.
-   * @param inMemFileSys the inmemory filesystem to write the file to
-   * @param localFileSys the local filesystem to write the file to
-   * @param shuffleMetrics the metrics context
-   * @param localFilename the filename to write the data into
-   * @param lDirAlloc the LocalDirAllocator object
-   * @param conf the Configuration object
-   * @param reduce the reduce id to get for
-   * @param connectionTimeout number of milliseconds for connection timeout
-   * @param readTimeout number of milliseconds for read timeout
-   * @return the path of the file that got created
-   * @throws IOException when something goes wrong
-   */
-  public Path getFile(InMemoryFileSystem inMemFileSys,
-                      FileSystem localFileSys,
-                      ShuffleClientMetrics shuffleMetrics,
-                      Path localFilename, 
-                      LocalDirAllocator lDirAlloc,
-                      Configuration conf, int reduce,
-                      int connectionTimeout, int readTimeout, 
-                      Progressable progressable) 
-  throws IOException, InterruptedException {
-    boolean good = false;
-    long totalBytes = 0;
-    FileSystem fileSys = localFileSys;
-    Thread currentThread = Thread.currentThread();
-    URL path = new URL(toString() + "&reduce=" + reduce);
-    try {
-      URLConnection connection = path.openConnection();
-      InputStream input = getInputStream(connection, connectionTimeout, 
-                                         readTimeout); 
-      OutputStream output = null;
-      
-      //We will put a file in memory if it meets certain criteria:
-      //1. The size of the file should be less than 25% of the total inmem fs
-      //2. There is space available in the inmem fs
-      
-      long length = Long.parseLong(connection.getHeaderField(MAP_OUTPUT_LENGTH));
-      long inMemFSSize = inMemFileSys.getFSSize();
-      long checksumLength = (int)inMemFileSys.getChecksumFileLength(
-                                                  localFilename, length);
-      
-      boolean createInMem = false; 
-      if (inMemFSSize > 0)  
-        createInMem = (((float)(length + checksumLength) / inMemFSSize <= 
-                        MAX_INMEM_FILESIZE_FRACTION) && 
-                       inMemFileSys.reserveSpaceWithCheckSum(localFilename, length));
-      if (createInMem) {
-        fileSys = inMemFileSys;
-      }
-      else {
-        //now hit the localFS to find out a suitable location for the output
-        localFilename = lDirAlloc.getLocalPathForWrite(
-            localFilename.toUri().getPath(), length + checksumLength, conf);
-      }
-      
-      output = fileSys.create(localFilename);
-      try {  
-        try {
-          byte[] buffer = new byte[64 * 1024];
-          if (currentThread.isInterrupted()) {
-            throw new InterruptedException();
-          }
-          int len = input.read(buffer);
-          while (len > 0) {
-            totalBytes += len;
-            shuffleMetrics.inputBytes(len);
-            output.write(buffer, 0 , len);
-            if (currentThread.isInterrupted()) {
-              throw new InterruptedException();
-            }
-            // indicate we're making progress
-            progressable.progress();
-            len = input.read(buffer);
-          }
-        } finally {
-          output.close();
-        }
-      } finally {
-        input.close();
-      }
-      good = (totalBytes == length);
-      if (!good) {
-        throw new IOException("Incomplete map output received for " + path +
-                              " (" + totalBytes + " instead of " + length + ")"
-                              );
-      }
-    } finally {
-      if (!good) {
-        try {
-          fileSys.delete(localFilename, true);
-          totalBytes = 0;
-        } catch (Throwable th) {
-          // IGNORED because we are cleaning up
-        }
-      }
-    }
-    return fileSys.makeQualified(localFilename);
-  }
-
-}

+ 142 - 160
src/java/org/apache/hadoop/mapred/MapTask.java

@@ -41,22 +41,15 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.io.SequenceFile.Sorter;
-import org.apache.hadoop.io.SequenceFile.ValueBytes;
-import org.apache.hadoop.io.SequenceFile.Writer;
-import org.apache.hadoop.io.SequenceFile.Sorter.RawKeyValueIterator;
-import org.apache.hadoop.io.SequenceFile.Sorter.SegmentDescriptor;
 import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionOutputStream;
-import org.apache.hadoop.io.compress.Compressor;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.mapred.IFile.Writer;
+import org.apache.hadoop.mapred.IFile.Reader;
+import org.apache.hadoop.mapred.Merger.Segment;
 import org.apache.hadoop.util.IndexedSortable;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.QuickSort;
@@ -64,6 +57,11 @@ import org.apache.hadoop.util.ReflectionUtils;
 
 /** A Map task. */
 class MapTask extends Task {
+  /**
+   * The size of each record in the index file for the map-outputs.
+   */
+  public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
+  
 
   private BytesWritable split = new BytesWritable();
   private String splitClass;
@@ -281,29 +279,23 @@ class MapTask extends Task {
     
   }
 
-  class MapOutputBuffer implements MapOutputCollector, IndexedSortable {
+  class MapOutputBuffer<K extends Object, V extends Object> 
+  implements MapOutputCollector<K, V>, IndexedSortable {
     private final int partitions;
-    private final Partitioner partitioner;
+    private final Partitioner<K, V> partitioner;
     private final JobConf job;
     private final Reporter reporter;
-    private final Class keyClass;
-    private final Class valClass;
-    private final RawComparator comparator;
+    private final Class<K> keyClass;
+    private final Class<V> valClass;
+    private final RawComparator<K> comparator;
     private final SerializationFactory serializationFactory;
-    private final Serializer keySerializer;
-    private final Serializer valSerializer;
+    private final Serializer<K> keySerializer;
+    private final Serializer<V> valSerializer;
     private final Class<? extends Reducer> combinerClass;
-    private final CombineOutputCollector combineCollector;
-    private final boolean compressMapOutput;
-    private final CompressionCodec codec;
-    private final CompressionType compressionType;
-
-    // used if compressMapOutput && compressionType == RECORD
-    // DataOutputBuffer req b/c compression codecs req continguous buffer
-    private final DataOutputBuffer rawBuffer;
-    private final CompressionOutputStream deflateFilter;
-    private final DataOutputStream deflateStream;
-    private final Compressor compressor;
+    private final CombineOutputCollector<K, V> combineCollector;
+    
+    // Compression for map-outputs
+    private CompressionCodec codec = null;
 
     // k/v accounting
     private volatile int kvstart = 0;  // marks beginning of spill
@@ -380,8 +372,8 @@ class MapTask extends Task {
       LOG.info("record buffer = " + softRecordLimit + "/" + kvoffsets.length);
       // k/v serialization
       comparator = job.getOutputKeyComparator();
-      keyClass = job.getMapOutputKeyClass();
-      valClass = job.getMapOutputValueClass();
+      keyClass = (Class<K>)job.getMapOutputKeyClass();
+      valClass = (Class<V>)job.getMapOutputValueClass();
       serializationFactory = new SerializationFactory(job);
       keySerializer = serializationFactory.getSerializer(keyClass);
       keySerializer.open(bb);
@@ -393,45 +385,23 @@ class MapTask extends Task {
       mapOutputRecordCounter = counters.findCounter(MAP_OUTPUT_RECORDS);
       combineInputCounter = counters.findCounter(COMBINE_INPUT_RECORDS);
       combineOutputCounter = counters.findCounter(COMBINE_OUTPUT_RECORDS);
-      // combiner and compression
-      compressMapOutput = job.getCompressMapOutput();
-      combinerClass = job.getCombinerClass();
-      minSpillsForCombine = job.getInt("min.num.spills.for.combine", 3);
-      if (compressMapOutput) {
-        compressionType = job.getMapOutputCompressionType();
+      // compression
+      if (job.getCompressMapOutput()) {
         Class<? extends CompressionCodec> codecClass =
           job.getMapOutputCompressorClass(DefaultCodec.class);
         codec = (CompressionCodec)
           ReflectionUtils.newInstance(codecClass, job);
-        if (CompressionType.RECORD == compressionType
-            && null == combinerClass) {
-          compressor = codec.createCompressor();
-          rawBuffer = new DataOutputBuffer();
-          deflateFilter = codec.createOutputStream(rawBuffer, compressor);
-          deflateStream = new DataOutputStream(deflateFilter);
-          valSerializer.close();
-          valSerializer.open(deflateStream);
-        } else {
-          rawBuffer = null;
-          compressor = null;
-          deflateStream = null;
-          deflateFilter = null;
-        }
-      } else {
-        compressionType = CompressionType.NONE;
-        codec = null;
-        rawBuffer = null;
-        compressor = null;
-        deflateStream = null;
-        deflateFilter = null;
       }
+      // combiner
+      combinerClass = job.getCombinerClass();
       combineCollector = (null != combinerClass)
         ? new CombineOutputCollector(combineOutputCounter)
         : null;
+      minSpillsForCombine = job.getInt("min.num.spills.for.combine", 3);
     }
 
     @SuppressWarnings("unchecked")
-    public synchronized void collect(Object key, Object value)
+    public synchronized void collect(K key, V value)
         throws IOException {
       reporter.progress();
       if (key.getClass() != keyClass) {
@@ -449,6 +419,7 @@ class MapTask extends Task {
             ).initCause(sortSpillException);
       }
       try {
+          // serialize key bytes into buffer
         int keystart = bufindex;
         keySerializer.serialize(key);
         if (bufindex < keystart) {
@@ -456,27 +427,14 @@ class MapTask extends Task {
           bb.reset();
           keystart = 0;
         }
+        // serialize value bytes into buffer
         int valstart = bufindex;
-        if (compressMapOutput && CompressionType.RECORD == compressionType
-            && null == combinerClass) {
-          // compress serialized value bytes
-          rawBuffer.reset();
-          deflateFilter.resetState();
-          valSerializer.serialize(value);
-          deflateStream.flush();
-          deflateFilter.finish();
-          bb.write(rawBuffer.getData(), 0, rawBuffer.getLength());
-          bb.markRecord();
-          mapOutputByteCounter.increment((valstart - keystart) +
-              compressor.getBytesRead());
-        } else {
-          // serialize value bytes into buffer
-          valSerializer.serialize(value);
-          int valend = bb.markRecord();
-          mapOutputByteCounter.increment(valend > keystart
-              ? valend - keystart
-              : (bufvoid - keystart) + valend);
-        }
+        valSerializer.serialize(value);
+        int valend = bb.markRecord();
+        mapOutputByteCounter.increment(valend > keystart
+                ? valend - keystart
+                        : (bufvoid - keystart) + valend);
+
         int partition = partitioner.getPartition(key, value, partitions);
         if (partition < 0 || partition >= partitions) {
           throw new IOException("Illegal partition for " + key + " (" +
@@ -766,31 +724,34 @@ class MapTask extends Task {
         out = localFs.create(filename);
         // create spill index
         Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
-                             getTaskID(), numSpills, partitions * 16);
+                             getTaskID(), numSpills, partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
         indexOut = localFs.create(indexFilename);
         final int endPosition = (kvend > kvstart)
           ? kvend
           : kvoffsets.length + kvend;
         sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter);
         int spindex = kvstart;
-        InMemValBytes vbytes = new InMemValBytes();
+        InMemValBytes value = new InMemValBytes();
         for (int i = 0; i < partitions; ++i) {
-          SequenceFile.Writer writer = null;
+          IFile.Writer<K, V> writer = null;
           try {
             long segmentStart = out.getPos();
-            writer = SequenceFile.createWriter(job, out,
-                keyClass, valClass, compressionType, codec);
+            writer = new Writer<K, V>(job, out, keyClass, valClass, codec);
             if (null == combinerClass) {
               // spill directly
+              DataInputBuffer key = new DataInputBuffer();
+              long recordNo = 0;
               while (spindex < endPosition &&
                   kvindices[kvoffsets[spindex % kvoffsets.length]
                             + PARTITION] == i) {
                 final int kvoff = kvoffsets[spindex % kvoffsets.length];
-                getVBytesForOffset(kvoff, vbytes);
-                writer.appendRaw(kvbuffer, kvindices[kvoff + KEYSTART],
-                    kvindices[kvoff + VALSTART] - kvindices[kvoff + KEYSTART],
-                    vbytes);
+                getVBytesForOffset(kvoff, value);
+                key.reset(kvbuffer, kvindices[kvoff + KEYSTART],
+                          (kvindices[kvoff + VALSTART] - 
+                           kvindices[kvoff + KEYSTART]));
+                writer.append(key, value);
                 ++spindex;
+                ++recordNo;
               }
             } else {
               int spstart = spindex;
@@ -812,12 +773,13 @@ class MapTask extends Task {
                 combineAndSpill(kvIter, combineInputCounter);
               }
             }
-            // we need to close the writer to flush buffered data, obtaining
-            // the correct offset
+
+            // close the writer
             writer.close();
+            
+            // write the index as <offset, raw-length, compressed-length> 
+            writeIndexRecord(indexOut, out, segmentStart, writer);
             writer = null;
-            indexOut.writeLong(segmentStart);
-            indexOut.writeLong(out.getPos() - segmentStart);
           } finally {
             if (null != writer) writer.close();
           }
@@ -836,7 +798,7 @@ class MapTask extends Task {
      * directly to a spill file. Consider this "losing".
      */
     @SuppressWarnings("unchecked")
-    private void spillSingleRecord(final Object key, final Object value) 
+    private void spillSingleRecord(final K key, final V value) 
         throws IOException {
       long size = kvbuffer.length + partitions * APPROX_HEADER_LENGTH;
       FSDataOutputStream out = null;
@@ -849,15 +811,16 @@ class MapTask extends Task {
         out = localFs.create(filename);
         // create spill index
         Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
-                             getTaskID(), numSpills, partitions * 16);
+                             getTaskID(), numSpills, partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
         indexOut = localFs.create(indexFilename);
         // we don't run the combiner for a single record
         for (int i = 0; i < partitions; ++i) {
-          SequenceFile.Writer writer = null;
+          IFile.Writer writer = null;
           try {
             long segmentStart = out.getPos();
-            writer = SequenceFile.createWriter(job, out,
-                keyClass, valClass, compressionType, codec);
+            // use the configured map-output codec (may be null) for this spill
+            writer = new IFile.Writer(job, out, keyClass, valClass, codec);
+
             if (i == partition) {
               final long recordStart = out.getPos();
               writer.append(key, value);
@@ -866,8 +829,9 @@ class MapTask extends Task {
               mapOutputByteCounter.increment(out.getPos() - recordStart);
             }
             writer.close();
-            indexOut.writeLong(segmentStart);
-            indexOut.writeLong(out.getPos() - segmentStart);
+
+            // index record
+            writeIndexRecord(indexOut, out, segmentStart, writer);
           } catch (IOException e) {
             if (null != writer) writer.close();
             throw e;
@@ -891,7 +855,7 @@ class MapTask extends Task {
       int vallen = (nextindex > kvindices[kvoff + VALSTART])
         ? nextindex - kvindices[kvoff + VALSTART]
         : (bufvoid - kvindices[kvoff + VALSTART]) + nextindex;
-      vbytes.reset(kvindices[kvoff + VALSTART], vallen);
+      vbytes.reset(kvbuffer, kvindices[kvoff + VALSTART], vallen);
     }
 
     @SuppressWarnings("unchecked")
@@ -917,36 +881,30 @@ class MapTask extends Task {
     /**
      * Inner class wrapping valuebytes, used for appendRaw.
      */
-    protected class InMemValBytes implements ValueBytes {
+    protected class InMemValBytes extends DataInputBuffer {
+      private byte[] buffer;
       private int start;
-      private int len;
-      public void reset(int start, int len) {
+      private int length;
+            
+      public void reset(byte[] buffer, int start, int length) {
+        this.buffer = buffer;
         this.start = start;
-        this.len = len;
-      }
-      public int getSize() {
-        return len;
-      }
-      public void writeUncompressedBytes(DataOutputStream outStream)
-          throws IOException {
-        if (start + len > bufvoid) {
+        this.length = length;
+        
+        if (start + length > bufvoid) {
+          this.buffer = new byte[this.length];
           final int taillen = bufvoid - start;
-          outStream.write(kvbuffer, start, taillen);
-          outStream.write(kvbuffer, 0, len - taillen);
-          return;
+          System.arraycopy(buffer, start, this.buffer, 0, taillen);
+          System.arraycopy(buffer, 0, this.buffer, taillen, length-taillen);
+          this.start = 0;
         }
-        outStream.write(kvbuffer, start, len);
-      }
-      public void writeCompressedBytes(DataOutputStream outStream)
-          throws IOException {
-        // If writing record-compressed data, kvbuffer vals rec-compressed
-        // and may be written directly. Note: not contiguous
-        writeUncompressedBytes(outStream);
+        
+        super.reset(this.buffer, this.start, this.length);
       }
     }
 
     protected class MRResultIterator implements RawKeyValueIterator {
-      private final DataOutputBuffer keybuf = new DataOutputBuffer();
+      private final DataInputBuffer keybuf = new DataInputBuffer();
       private final InMemValBytes vbytes = new InMemValBytes();
       private final int end;
       private int current;
@@ -957,14 +915,13 @@ class MapTask extends Task {
       public boolean next() throws IOException {
         return ++current < end;
       }
-      public DataOutputBuffer getKey() throws IOException {
+      public DataInputBuffer getKey() throws IOException {
         final int kvoff = kvoffsets[current % kvoffsets.length];
-        keybuf.reset();
-        keybuf.write(kvbuffer, kvindices[kvoff + KEYSTART],
-            kvindices[kvoff + VALSTART] - kvindices[kvoff + KEYSTART]);
+        keybuf.reset(kvbuffer, kvindices[kvoff + KEYSTART],
+                     kvindices[kvoff + VALSTART] - kvindices[kvoff + KEYSTART]);
         return keybuf;
       }
-      public ValueBytes getValue() throws IOException {
+      public DataInputBuffer getValue() throws IOException {
         getVBytesForOffset(kvoffsets[current % kvoffsets.length], vbytes);
         return vbytes;
       }
@@ -999,7 +956,7 @@ class MapTask extends Task {
       //lengths for each partition
       finalOutFileSize += partitions * APPROX_HEADER_LENGTH;
       
-      finalIndexFileSize = partitions * 16;
+      finalIndexFileSize = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
       
       Path finalOutputFile = mapOutputFile.getOutputFileForWrite(getTaskID(), 
                              finalOutFileSize);
@@ -1009,6 +966,7 @@ class MapTask extends Task {
       //The output stream for the final single output file
       FSDataOutputStream finalOut = localFs.create(finalOutputFile, true, 
                                                    4096);
+      
       //The final index file output stream
       FSDataOutputStream finalIndexOut = localFs.create(finalIndexFile, true,
                                                         4096);
@@ -1018,12 +976,11 @@ class MapTask extends Task {
         //create dummy files
         for (int i = 0; i < partitions; i++) {
           segmentStart = finalOut.getPos();
-          Writer writer = SequenceFile.createWriter(job, finalOut, 
-                                                    job.getMapOutputKeyClass(), 
-                                                    job.getMapOutputValueClass(), 
-                                                    compressionType, codec);
+          Writer<K, V> writer = new Writer<K, V>(job, finalOut, 
+                                                 keyClass, valClass, null);
           finalIndexOut.writeLong(segmentStart);
           finalIndexOut.writeLong(finalOut.getPos() - segmentStart);
+          finalIndexOut.writeLong(finalOut.getPos() - segmentStart);
           writer.close();
         }
         finalOut.close();
@@ -1031,48 +988,55 @@ class MapTask extends Task {
         return;
       }
       {
-        //create a sorter object as we need access to the SegmentDescriptor
-        //class and merge methods
-        Sorter sorter = new Sorter(localFs, job.getOutputKeyComparator(),
-                                   keyClass, valClass, job);
-        sorter.setProgressable(reporter);
-        
         for (int parts = 0; parts < partitions; parts++){
-          List<SegmentDescriptor> segmentList =
-            new ArrayList<SegmentDescriptor>(numSpills);
+          //create the segments to be merged
+          List<Segment<K, V>> segmentList =
+            new ArrayList<Segment<K, V>>(numSpills);
           for(int i = 0; i < numSpills; i++) {
             FSDataInputStream indexIn = localFs.open(indexFileName[i]);
-            indexIn.seek(parts * 16);
+            indexIn.seek(parts * MAP_OUTPUT_INDEX_RECORD_LENGTH);
             long segmentOffset = indexIn.readLong();
+            long rawSegmentLength = indexIn.readLong();
             long segmentLength = indexIn.readLong();
             indexIn.close();
-            SegmentDescriptor s = sorter.new SegmentDescriptor(segmentOffset,
-                                                               segmentLength, filename[i]);
-            s.preserveInput(true);
-            s.doSync();
+            FSDataInputStream in = localFs.open(filename[i]);
+            in.seek(segmentOffset);
+            Segment<K, V> s = 
+              new Segment<K, V>(new Reader<K, V>(job, in, segmentLength, codec),
+                                true);
             segmentList.add(i, s);
+            
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Index: (" + indexFileName[i] + ", " + segmentOffset + 
+                        ", " + rawSegmentLength + ", " + segmentLength + ")");
+            }
           }
+          
+          //merge
+          @SuppressWarnings("unchecked")
+          RawKeyValueIterator kvIter = 
+            Merger.merge(job, localFs, 
+                         keyClass, valClass,
+                         segmentList, job.getInt("io.sort.factor", 100), 
+                         new Path(getTaskID().toString()), 
+                         job.getOutputKeyComparator(), reporter);
+
+          //write merged output to disk
           segmentStart = finalOut.getPos();
-          RawKeyValueIterator kvIter = sorter.merge(segmentList, new Path(getTaskID().toString())); 
-          SequenceFile.Writer writer = SequenceFile.createWriter(job, finalOut, 
-                                                                 job.getMapOutputKeyClass(), job.getMapOutputValueClass(), 
-                                                                 compressionType, codec);
+          Writer<K, V> writer = 
+              new Writer<K, V>(job, finalOut, keyClass, valClass, codec);
           if (null == combinerClass || numSpills < minSpillsForCombine) {
-            sorter.writeFile(kvIter, writer);
+            Merger.writeFile(kvIter, writer, reporter);
           } else {
             combineCollector.setWriter(writer);
             combineAndSpill(kvIter, combineInputCounter);
           }
-          //close the file - required esp. for block compression to ensure
-          //partition data don't span partition boundaries
+
+          //close
           writer.close();
-          //when we write the offset/length to the final index file, we write
-          //longs for both. This helps us to reliably seek directly to the
-          //offset/length for a partition when we start serving the byte-ranges
-          //to the reduces. We probably waste some space in the file by doing
-          //this as opposed to writing VLong but it helps us later on.
-          finalIndexOut.writeLong(segmentStart);
-          finalIndexOut.writeLong(finalOut.getPos()-segmentStart);
+          
+          //write index record
+          writeIndexRecord(finalIndexOut, finalOut, segmentStart, writer);
         }
         finalOut.close();
         finalIndexOut.close();
@@ -1084,8 +1048,26 @@ class MapTask extends Task {
       }
     }
 
-  }
-
+    private void writeIndexRecord(FSDataOutputStream indexOut, 
+                                  FSDataOutputStream out, long start, 
+                                  Writer<K, V> writer) 
+    throws IOException {
+      //when we write the offset/decompressed-length/compressed-length to  
+      //the final index file, we write longs for both compressed and 
+      //decompressed lengths. This helps us to reliably seek directly to 
+      //the offset/length for a partition when we start serving the 
+      //byte-ranges to the reduces. We probably waste some space in the 
+      //file by doing this as opposed to writing VLong but it helps us later on.
+      // index record: <offset, raw-length, compressed-length> 
+      indexOut.writeLong(start);
+      indexOut.writeLong(writer.getRawLength());
+      long segmentLength = out.getPos() - start;
+      indexOut.writeLong(segmentLength);
+    }
+    
+  } // MapOutputBuffer
+  
   /**
    * Exception indicating that the allocated sort buffer is insufficient
    * to hold the current record.

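The 24-byte figure behind MAP_OUTPUT_INDEX_RECORD_LENGTH is three 8-byte longs per partition, <offset, raw-length, compressed-length>, as written by writeIndexRecord above (the dummy-segment branch simply writes the same value into both length fields). The sketch below is editorial, not part of the patch: it shows one way to read a partition's record back; the class and variable names are illustrative.

import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;

class SpillIndexSketch {
  // Read the index record for one reduce partition: 3 longs * 8 bytes = 24 bytes.
  static long[] readIndexRecord(FSDataInputStream indexIn, int partition)
      throws IOException {
    indexIn.seek(partition * 24L);               // MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH
    long segmentOffset = indexIn.readLong();     // where this partition's segment starts
    long rawLength = indexIn.readLong();         // decompressed (raw) length
    long compressedLength = indexIn.readLong();  // on-disk, possibly compressed, length
    return new long[] { segmentOffset, rawLength, compressedLength };
  }
}

TaskTracker's map-output servlet (further below in this change) performs the same seek-and-read when serving a reduce's byte range.
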
+ 416 - 0
src/java/org/apache/hadoop/mapred/Merger.java

@@ -0,0 +1,416 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ChecksumFileSystem;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapred.IFile.Reader;
+import org.apache.hadoop.mapred.IFile.Writer;
+import org.apache.hadoop.util.PriorityQueue;
+import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.util.Progressable;
+
+class Merger {  
+  private static final Log LOG = LogFactory.getLog(Merger.class);
+  
+  private static final long PROGRESS_BAR = 10000;
+  
+  public static <K extends Object, V extends Object>
+  RawKeyValueIterator merge(Configuration conf, FileSystem fs,
+                            Class<K> keyClass, Class<V> valueClass, 
+                            CompressionCodec codec,
+                            Path[] inputs, boolean deleteInputs, 
+                            int mergeFactor, Path tmpDir,
+                            RawComparator<K> comparator, Progressable reporter)
+  throws IOException {
+    return 
+      new MergeQueue<K, V>(conf, fs, inputs, deleteInputs, codec, comparator, 
+                           reporter).merge(keyClass, valueClass,
+                                           mergeFactor, tmpDir);
+  }
+  
+  public static <K extends Object, V extends Object>
+  RawKeyValueIterator merge(Configuration conf, FileSystem fs, 
+                            Class<K> keyClass, Class<V> valueClass, 
+                            List<Segment<K, V>> segments, 
+                            int mergeFactor, Path tmpDir,
+                            RawComparator<K> comparator, Progressable reporter)
+  throws IOException {
+    return 
+      new MergeQueue<K, V>(conf, fs, segments, 
+                           comparator, reporter).merge(keyClass, valueClass,
+                                                       mergeFactor, tmpDir);
+  }
+
+  public static <K extends Object, V extends Object>
+  void writeFile(RawKeyValueIterator records, Writer<K, V> writer, 
+                 Progressable progressable) 
+  throws IOException {
+    long recordCtr = 0;
+    while(records.next()) {
+      writer.append(records.getKey(), records.getValue());
+      
+      if ((++recordCtr % PROGRESS_BAR) == 0) {
+        progressable.progress();
+      }
+    }
+  }
+
+  public static class Segment<K extends Object, V extends Object> {
+    Reader<K, V> reader = null;
+    DataInputBuffer key = new DataInputBuffer();
+    DataInputBuffer value = new DataInputBuffer();
+    
+    Configuration conf = null;
+    FileSystem fs = null;
+    Path file = null;
+    boolean preserve = false;
+    CompressionCodec codec = null;
+    long segmentLength = -1;
+    
+    public Segment(Configuration conf, FileSystem fs, Path file,
+                   CompressionCodec codec, boolean preserve) throws IOException {
+      this.conf = conf;
+      this.fs = fs;
+      this.file = file;
+      this.codec = codec;
+      this.preserve = preserve;
+      
+      this.segmentLength = fs.getFileStatus(file).getLen();
+    }
+    
+    public Segment(Reader<K, V> reader, boolean preserve) {
+      this.reader = reader;
+      this.preserve = preserve;
+      
+      this.segmentLength = reader.getLength();
+    }
+
+    private void init() throws IOException {
+      if (reader == null) {
+        reader = new Reader<K, V>(conf, fs, file, codec);
+      }
+    }
+    
+    DataInputBuffer getKey() { return key; }
+    DataInputBuffer getValue() { return value; }
+
+    long getLength() { return segmentLength; }
+    
+    boolean next() throws IOException {
+      return reader.next(key, value);
+    }
+    
+    void close() throws IOException {
+      reader.close();
+      
+      if (!preserve && fs != null) {
+        fs.delete(file, false);
+      }
+    }
+  }
+  
+  private static class MergeQueue<K extends Object, V extends Object> 
+  extends PriorityQueue implements RawKeyValueIterator {
+    Configuration conf;
+    FileSystem fs;
+    CompressionCodec codec;
+    
+    List<Segment<K, V>> segments = new ArrayList<Segment<K,V>>();
+    
+    RawComparator<K> comparator;
+    
+    private long totalBytesProcessed;
+    private float progPerByte;
+    private Progress mergeProgress = new Progress();
+    
+    Progressable reporter;
+    
+    DataInputBuffer key;
+    DataInputBuffer value;
+    
+    Segment<K, V> minSegment;
+    Comparator<Segment<K, V>> segmentComparator =   
+      new Comparator<Segment<K, V>>() {
+      public int compare(Segment<K, V> o1, Segment<K, V> o2) {
+        if (o1.getLength() == o2.getLength()) {
+          return 0;
+        }
+
+        return o1.getLength() < o2.getLength() ? -1 : 1;
+      }
+    };
+
+    
+    public MergeQueue(Configuration conf, FileSystem fs, 
+                      Path[] inputs, boolean deleteInputs, 
+                      CompressionCodec codec, RawComparator<K> comparator,
+                      Progressable reporter) 
+    throws IOException {
+      this.conf = conf;
+      this.fs = fs;
+      this.codec = codec;
+      this.comparator = comparator;
+      this.reporter = reporter;
+      
+      for (Path file : inputs) {
+        segments.add(new Segment<K, V>(conf, fs, file, codec, !deleteInputs));
+      }
+      
+      // Sort segments on file-lengths
+      Collections.sort(segments, segmentComparator); 
+    }
+    
+
+    public MergeQueue(Configuration conf, FileSystem fs, 
+        List<Segment<K, V>> segments, RawComparator<K> comparator,
+        Progressable reporter) {
+      this.conf = conf;
+      this.fs = fs;
+      this.comparator = comparator;
+      this.segments = segments;
+      this.reporter = reporter;
+    }
+
+    @SuppressWarnings("unchecked")
+    public void close() throws IOException {
+      Segment<K, V> segment;
+      while((segment = (Segment<K, V>)pop()) != null) {
+        segment.close();
+      }
+    }
+
+    public DataInputBuffer getKey() throws IOException {
+      return key;
+    }
+
+    public DataInputBuffer getValue() throws IOException {
+      return value;
+    }
+
+    private void adjustPriorityQueue(Segment<K, V> reader) throws IOException{
+      if (reader.next()) {
+        adjustTop();
+      } else {
+        pop();
+        reader.close();
+      }
+    }
+
+    @SuppressWarnings("unchecked")
+    public boolean next() throws IOException {
+      if (size() == 0)
+        return false;
+
+      if (minSegment != null) {
+        //minSegment is non-null for all invocations of next except the first
+        //one. For the first invocation, the priority queue is ready for use
+        //but for the subsequent invocations, first adjust the queue 
+        adjustPriorityQueue(minSegment);
+        if (size() == 0) {
+          minSegment = null;
+          return false;
+        }
+      }
+      minSegment = (Segment<K, V>)top();
+      
+      key = minSegment.getKey();
+      value = minSegment.getValue();
+
+      totalBytesProcessed += (key.getLength()-key.getPosition()) + 
+                             (value.getLength()-value.getPosition());
+      mergeProgress.set(totalBytesProcessed * progPerByte);
+
+      return true;
+    }
+
+    @SuppressWarnings("unchecked")
+    protected boolean lessThan(Object a, Object b) {
+      DataInputBuffer key1 = ((Segment<K, V>)a).getKey();
+      DataInputBuffer key2 = ((Segment<K, V>)b).getKey();
+      int s1 = key1.getPosition();
+      int l1 = key1.getLength() - s1;
+      int s2 = key2.getPosition();
+      int l2 = key2.getLength() - s2;
+
+      return comparator.compare(key1.getData(), s1, l1, key2.getData(), s2, l2) < 0;
+    }
+    
+    public RawKeyValueIterator merge(Class<K> keyClass, Class<V> valueClass,
+                                     int factor, Path tmpDir) 
+    throws IOException {
+      LOG.info("Merging " + segments.size() + " sorted segments");
+      
+      //create the merge streams from the sorted segment list built in the
+      //constructor and dump intermediate merge outputs to disk as needed
+      int numSegments = segments.size();
+      int origFactor = factor;
+      int passNo = 1;
+      LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
+      do {
+        //get the factor for this pass of merge
+        factor = getPassFactor(factor, passNo, numSegments);
+        List<Segment<K, V>> segmentsToMerge =
+          new ArrayList<Segment<K, V>>();
+        int segmentsConsidered = 0;
+        int numSegmentsToConsider = factor;
+        while (true) {
+          //extract the smallest 'factor' number of segments  
+          //Call cleanup on the empty segments (no key/value data)
+          List<Segment<K, V>> mStream = 
+            getSegmentDescriptors(numSegmentsToConsider);
+          for (Segment<K, V> segment : mStream) {
+            // Initialize the segment at the last possible moment;
+            // this helps in ensuring we don't use buffers until we need them
+            segment.init();
+            
+            if (segment.next()) {
+              segmentsToMerge.add(segment);
+              segmentsConsidered++;
+            }
+            else {
+              segment.close();
+              numSegments--; //we ignore this segment for the merge
+            }
+          }
+          //if we have the desired number of segments
+          //or looked at all available segments, we break
+          if (segmentsConsidered == factor || 
+              segments.size() == 0) {
+            break;
+          }
+            
+          numSegmentsToConsider = factor - segmentsConsidered;
+        }
+        
+        //feed the streams to the priority queue
+        initialize(segmentsToMerge.size()); clear();
+        for (Segment<K, V> segment : segmentsToMerge) {
+          put(segment);
+        }
+        
+        //if we have lesser number of segments remaining, then just return the
+        //iterator, else do another single level merge
+        if (numSegments <= factor) {
+          //calculate the length of the remaining segments. Required for 
+          //calculating the merge progress
+          long totalBytes = 0;
+          for (int i = 0; i < segmentsToMerge.size(); i++) {
+            totalBytes += segmentsToMerge.get(i).getLength();
+          }
+          if (totalBytes != 0) //being paranoid
+            progPerByte = 1.0f / (float)totalBytes;
+
+          // Reset bytes-processed to track the progress of the final merge
+          totalBytesProcessed = 0;
+          
+          LOG.info("Down to the last merge-pass, with " + numSegments + 
+                   " segments left of total size: " + totalBytes + " bytes");
+          return this;
+        } else {
+          LOG.info("Merging " + segmentsToMerge.size() + 
+                   " intermediate segments out of a total of " + 
+                   (segments.size()+segmentsToMerge.size()));
+          
+          //we want to spread the creation of temp files on multiple disks if 
+          //available under the space constraints
+          long approxOutputSize = 0; 
+          for (Segment<K, V> s : segmentsToMerge) {
+            approxOutputSize += s.getLength() + 
+                                ChecksumFileSystem.getApproxChkSumLength(
+                                s.getLength());
+          }
+          Path tmpFilename = 
+            new Path(tmpDir, "intermediate").suffix("." + passNo);
+
+          Path outputFile =  lDirAlloc.getLocalPathForWrite(
+                                              tmpFilename.toString(),
+                                              approxOutputSize, conf);
+
+          Writer<K, V> writer = 
+            new Writer<K, V>(conf, fs, outputFile, keyClass, valueClass, codec);
+          writeFile(this, writer, reporter);
+          writer.close();
+          
+          //we finished one single level merge; now clean up the priority 
+          //queue
+          this.close();
+
+          // Add the newly create segment to the list of segments to be merged
+          Segment<K, V> tempSegment = 
+            new Segment<K, V>(conf, fs, outputFile, codec, false);
+          segments.add(tempSegment);
+          numSegments = segments.size();
+          Collections.sort(segments, segmentComparator);
+          
+          passNo++;
+        }
+        //we are worried about only the first pass merge factor. So reset the 
+        //factor to what it originally was
+        factor = origFactor;
+      } while(true);
+    }
+    
+    //HADOOP-591
+    private int getPassFactor(int factor, int passNo, int numSegments) {
+      if (passNo > 1 || numSegments <= factor || factor == 1) 
+        return factor;
+      int mod = (numSegments - 1) % (factor - 1);
+      if (mod == 0)
+        return factor;
+      return mod + 1;
+    }
+    
+    /** Return (& remove) the requested number of segments from the
+     * head of the sorted segment list.
+     */
+    private List<Segment<K, V>> getSegmentDescriptors(int numDescriptors) {
+      if (numDescriptors > segments.size()) {
+        List<Segment<K, V>> subList = new ArrayList<Segment<K,V>>(segments);
+        segments.clear();
+        return subList;
+      }
+      
+      List<Segment<K, V>> subList = 
+        new ArrayList<Segment<K,V>>(segments.subList(0, numDescriptors));
+      for (int i=0; i < numDescriptors; ++i) {
+        segments.remove(0);
+      }
+      return subList;
+    }
+
+    public Progress getProgress() {
+      return mergeProgress;
+    }
+
+  }
+}

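getPassFactor (the HADOOP-591 logic above) shrinks only the first merge pass, so that every later pass, including the final one, can consume a full factor's worth of segments. A worked example under an assumed io.sort.factor of 10 (the code above defaults it to 100):

// Editorial worked example of the first-pass factor computation.
class FirstPassFactorExample {
  public static void main(String[] args) {
    int numSegments = 13;                                 // spill segments to merge
    int factor = 10;                                      // assumed io.sort.factor
    int mod = (numSegments - 1) % (factor - 1);           // (13 - 1) % (10 - 1) = 3
    int firstPassFactor = (mod == 0) ? factor : mod + 1;  // 4
    // First pass merges 4 segments into 1, leaving 13 - 4 + 1 = 10 segments,
    // which exactly fills the final merge pass.
    System.out.println("first pass merges " + firstPassFactor + " segments");
  }
}
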
+ 48 - 0
src/java/org/apache/hadoop/mapred/RamManager.java

@@ -0,0 +1,48 @@
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+
+class RamManager {
+  volatile private int numReserved = 0;
+  volatile private int size = 0;
+  private final int maxSize;
+  
+  public RamManager(Configuration conf) {
+    maxSize = conf.getInt("fs.inmemory.size.mb", 100) * 1024 * 1024;
+  }
+  
+  synchronized boolean reserve(long requestedSize) {
+    if (requestedSize > Integer.MAX_VALUE || 
+        (size + requestedSize) > Integer.MAX_VALUE) {
+      return false;
+    }
+    
+    if ((size + requestedSize) < maxSize) {
+      size += requestedSize;
+      ++numReserved;
+      return true;
+    }
+    return false;
+  }
+  
+  synchronized void unreserve(int requestedSize) {
+    size -= requestedSize;
+    --numReserved;
+  }
+  
+  int getUsedMemory() {
+    return size;
+  }
+  
+  float getPercentUsed() {
+    return (float)size/maxSize;
+  }
+  
+  int getReservedFiles() {
+    return numReserved;
+  }
+  
+  int getMemoryLimit() {
+    return maxSize;
+  }
+}

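RamManager is the simple byte-buffer manager that replaces the InMemoryFileSystem on the shuffle side: it only does the accounting against fs.inmemory.size.mb, while the caller owns the actual buffers. A hypothetical usage sketch follows; the class, method and variable names here are assumptions, since the real caller lives in the ReduceTask changes whose diff is omitted below.

package org.apache.hadoop.mapred;

// Hypothetical sketch: decide whether a fetched map output may be buffered in RAM.
class RamManagerUsageSketch {
  static boolean shuffleInMemory(RamManager ramManager, long mapOutputLength) {
    if (ramManager.reserve(mapOutputLength)) {
      // Copy the map output into a byte[] of this size; once the in-memory
      // segment has been merged, give the space back:
      //   ramManager.unreserve((int) mapOutputLength);
      return true;
    }
    // Over budget (or the size would overflow an int): shuffle to local disk.
    return false;
  }
}
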
+ 66 - 0
src/java/org/apache/hadoop/mapred/RawKeyValueIterator.java

@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.util.Progress;
+
+/**
+ * <code>RawKeyValueIterator</code> is an iterator used to iterate over
+ * the raw keys and values during sort/merge of intermediate data. 
+ */
+interface RawKeyValueIterator {
+  /** 
+   * Gets the current raw key.
+   * 
+   * @return Gets the current raw key as a DataInputBuffer
+   * @throws IOException
+   */
+  DataInputBuffer getKey() throws IOException;
+  
+  /** 
+   * Gets the current raw value.
+   * 
+   * @return Gets the current raw value as a DataInputBuffer 
+   * @throws IOException
+   */
+  DataInputBuffer getValue() throws IOException;
+  
+  /** 
+   * Sets up the current key and value (for getKey and getValue).
+   * 
+   * @return <code>true</code> if there exists a key/value, 
+   *         <code>false</code> otherwise. 
+   * @throws IOException
+   */
+  boolean next() throws IOException;
+  
+  /** 
+   * Closes the iterator so that the underlying streams can be closed.
+   * 
+   * @throws IOException
+   */
+  void close() throws IOException;
+  
+  /** Gets the Progress object; this has a float (0.0 - 1.0) 
+   * indicating the bytes processed by the iterator so far
+   */
+  Progress getProgress();
+}

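RawKeyValueIterator hands out serialized bytes only; deserializing them is the caller's job (see the new ValuesIterator in Task.java below). A small editorial sketch of draining one and totalling the bytes it yields, much like the progress accounting in Merger:

package org.apache.hadoop.mapred;

import java.io.IOException;

import org.apache.hadoop.io.DataInputBuffer;

// Editorial sketch: consume a RawKeyValueIterator and report its total payload size.
class RawIteratorSketch {
  static long totalBytes(RawKeyValueIterator rawIter) throws IOException {
    long bytes = 0;
    while (rawIter.next()) {
      DataInputBuffer key = rawIter.getKey();
      DataInputBuffer value = rawIter.getValue();
      bytes += (key.getLength() - key.getPosition())
             + (value.getLength() - value.getPosition());
    }
    rawIter.close();
    return bytes;
  }
}
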
Diff content too large to display
+ 459 - 287
src/java/org/apache/hadoop/mapred/ReduceTask.java


+ 125 - 7
src/java/org/apache/hadoop/mapred/Task.java

@@ -24,7 +24,9 @@ import java.io.IOException;
 import java.net.URI;
 import java.text.NumberFormat;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
+import java.util.NoSuchElementException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
@@ -40,13 +42,16 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.fs.kfs.KosmosFileSystem;
 import org.apache.hadoop.fs.s3.S3FileSystem;
+import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.ReduceTask.ValuesIterator;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.mapred.IFile.Writer;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 
@@ -602,28 +607,141 @@ abstract class Task implements Writable, Configurable {
   /**
    * OutputCollector for the combiner.
    */
-  protected static class CombineOutputCollector implements OutputCollector {
-    private SequenceFile.Writer writer;
+  protected static class CombineOutputCollector<K extends Object, V extends Object> 
+  implements OutputCollector<K, V> {
+    private Writer<K, V> writer;
     private Counters.Counter outCounter;
     public CombineOutputCollector(Counters.Counter outCounter) {
       this.outCounter = outCounter;
     }
-    public synchronized void setWriter(SequenceFile.Writer writer) {
+    public synchronized void setWriter(Writer<K, V> writer) {
       this.writer = writer;
     }
-    public synchronized void collect(Object key, Object value)
+    public synchronized void collect(K key, V value)
         throws IOException {
       outCounter.increment(1);
       writer.append(key, value);
     }
   }
 
+  /** Iterates values while keys match in sorted input. */
+  static class ValuesIterator<KEY,VALUE> implements Iterator<VALUE> {
+    protected RawKeyValueIterator in; //input iterator
+    private KEY key;               // current key
+    private KEY nextKey;
+    private VALUE value;             // current value
+    private boolean hasNext;                      // more w/ this key
+    private boolean more;                         // more in file
+    private RawComparator<KEY> comparator;
+    protected Progressable reporter;
+    private Deserializer<KEY> keyDeserializer;
+    private Deserializer<VALUE> valDeserializer;
+    private DataInputBuffer keyIn = new DataInputBuffer();
+    private DataInputBuffer valueIn = new DataInputBuffer();
+    
+    @SuppressWarnings("unchecked")
+    public ValuesIterator (RawKeyValueIterator in, 
+                           RawComparator<KEY> comparator, 
+                           Class<KEY> keyClass,
+                           Class<VALUE> valClass, Configuration conf, 
+                           Progressable reporter)
+      throws IOException {
+      this.in = in;
+      this.comparator = comparator;
+      this.reporter = reporter;
+      SerializationFactory serializationFactory = new SerializationFactory(conf);
+      this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
+      this.keyDeserializer.open(keyIn);
+      this.valDeserializer = serializationFactory.getDeserializer(valClass);
+      this.valDeserializer.open(this.valueIn);
+      readNextKey();
+      key = nextKey;
+      nextKey = null; // force new instance creation
+      hasNext = more;
+    }
+
+    RawKeyValueIterator getRawIterator() { return in; }
+    
+    /// Iterator methods
+
+    public boolean hasNext() { return hasNext; }
+
+    private int ctr = 0;
+    public VALUE next() {
+      if (!hasNext) {
+        throw new NoSuchElementException("iterate past last value");
+      }
+      try {
+        readNextValue();
+        readNextKey();
+      } catch (IOException ie) {
+        throw new RuntimeException("problem advancing post rec#"+ctr, ie);
+      }
+      reporter.progress();
+      return value;
+    }
+
+    public void remove() { throw new RuntimeException("not implemented"); }
+
+    /// Auxiliary methods
+
+    /** Start processing next unique key. */
+    void nextKey() throws IOException {
+      // read until we find a new key
+      while (hasNext) { 
+        readNextKey();
+      }
+      ++ctr;
+      
+      // move the next key to the current one
+      KEY tmpKey = key;
+      key = nextKey;
+      nextKey = tmpKey;
+      hasNext = more;
+    }
+
+    /** True iff more keys remain. */
+    boolean more() { 
+      return more; 
+    }
+
+    /** The current key. */
+    KEY getKey() { 
+      return key; 
+    }
+
+    /** 
+     * read the next key 
+     */
+    private void readNextKey() throws IOException {
+      more = in.next();
+      if (more) {
+        DataInputBuffer nextKeyBytes = in.getKey();
+        keyIn.reset(nextKeyBytes.getData(), nextKeyBytes.getPosition(), nextKeyBytes.getLength());
+        nextKey = keyDeserializer.deserialize(nextKey);
+        hasNext = key != null && (comparator.compare(key, nextKey) == 0);
+      } else {
+        hasNext = false;
+      }
+    }
+
+    /**
+     * Read the next value
+     * @throws IOException
+     */
+    private void readNextValue() throws IOException {
+      DataInputBuffer nextValueBytes = in.getValue();
+      valueIn.reset(nextValueBytes.getData(), nextValueBytes.getPosition(), nextValueBytes.getLength());
+      value = valDeserializer.deserialize(value);
+    }
+  }
+
   protected static class CombineValuesIterator<KEY,VALUE>
       extends ValuesIterator<KEY,VALUE> {
 
     private final Counters.Counter combineInputCounter;
 
-    public CombineValuesIterator(SequenceFile.Sorter.RawKeyValueIterator in,
+    public CombineValuesIterator(RawKeyValueIterator in,
         RawComparator<KEY> comparator, Class<KEY> keyClass,
         Class<VALUE> valClass, Configuration conf, Reporter reporter,
         Counters.Counter combineInputCounter) throws IOException {

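ValuesIterator groups consecutive records whose keys compare equal under the supplied RawComparator, deserializing keys and values lazily from the raw iterator. A hedged sketch of the calling pattern; the reducer, output collector and reporter are assumptions here, since the actual ReduceTask diff is omitted above.

package org.apache.hadoop.mapred;

import java.io.IOException;

// Editorial sketch: hand each distinct key and its grouped values to an old-API Reducer.
class ValuesIteratorSketch {
  static <K, V, K2, V2> void runReduce(Reducer<K, V, K2, V2> reducer,
                                       Task.ValuesIterator<K, V> values,
                                       OutputCollector<K2, V2> output,
                                       Reporter reporter) throws IOException {
    while (values.more()) {
      reducer.reduce(values.getKey(), values, output, reporter);
      values.nextKey();   // drop any unread values, advance to the next distinct key
    }
  }
}
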
+ 2 - 2
src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java

@@ -79,7 +79,7 @@ public class TaskCompletionEvent implements Writable{
   /**
    * Returns task id. 
    * @return task id
-   * @deprecated use {@link #getTaskID()} instead.
+   * @deprecated use {@link #getTaskAttemptId()} instead.
    */
   @Deprecated
   public String getTaskId() {
@@ -90,7 +90,7 @@ public class TaskCompletionEvent implements Writable{
    * Returns task id. 
    * @return task id
    */
-  public TaskAttemptID getTaskID() {
+  public TaskAttemptID getTaskAttemptId() {
     return taskId;
   }
   

+ 12 - 2
src/java/org/apache/hadoop/mapred/TaskTracker.java

@@ -2346,18 +2346,24 @@ public class TaskTracker
         indexIn = fileSys.open(indexFileName);
 
         //seek to the correct offset for the given reduce
-        indexIn.seek(reduce * 16);
+        indexIn.seek(reduce * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH);
           
         //read the offset and length of the partition data
         long startOffset = indexIn.readLong();
+        long rawPartLength = indexIn.readLong();
         long partLength = indexIn.readLong();
 
         indexIn.close();
         indexIn = null;
           
+        //set the custom "Raw-Map-Output-Length" http header to 
+        //the raw (decompressed) length
+        response.setHeader(RAW_MAP_OUTPUT_LENGTH, Long.toString(rawPartLength));
+
         //set the custom "Map-Output-Length" http header to 
         //the actual number of bytes being transferred
-        response.setHeader(MAP_OUTPUT_LENGTH, Long.toString(partLength));
+        response.setHeader(MAP_OUTPUT_LENGTH, 
+                           Long.toString(partLength));
 
         //use the same buffersize as used for reading the data from disk
         response.setBufferSize(MAX_BYTES_TO_READ);
@@ -2390,6 +2396,10 @@ public class TaskTracker
                                  (partLength - totalRead) < MAX_BYTES_TO_READ
                                  ? (int)(partLength - totalRead) : MAX_BYTES_TO_READ);
         }
+        
+        LOG.info("Sent out " + totalRead + " bytes for reduce: " + reduce + 
+                 " from map: " + mapId + " given " + partLength + "/" + 
+                 rawPartLength);
       } catch (IOException ie) {
         TaskTracker tracker = 
           (TaskTracker) context.getAttribute("task.tracker");

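Besides the existing Map-Output-Length header, the servlet now also advertises the decompressed size, so the fetching reduce can budget its in-memory shuffle before reading a single payload byte. A hypothetical reader-side sketch; the header strings mirror the comments above (the actual fetch code is in the omitted ReduceTask diff), and the connection handling here is assumed.

import java.io.IOException;
import java.net.URL;
import java.net.URLConnection;

// Hypothetical sketch: read back the two length headers set by the servlet.
class MapOutputHeaderSketch {
  static long[] readLengths(URL mapOutputUrl) throws IOException {
    URLConnection connection = mapOutputUrl.openConnection();
    // On-disk (possibly compressed) bytes that actually travel over the wire.
    long compressedLength =
        Long.parseLong(connection.getHeaderField("Map-Output-Length"));
    // Decompressed size: what the in-memory merge has to budget for.
    long rawLength =
        Long.parseLong(connection.getHeaderField("Raw-Map-Output-Length"));
    return new long[] { compressedLength, rawLength };
  }
}
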
+ 11 - 0
src/java/org/apache/hadoop/util/ReflectionUtils.java

@@ -171,6 +171,17 @@ public class ReflectionUtils {
     }
   }
 
+  /**
+   * Return the correctly-typed {@link Class} of the given object.
+   *  
+   * @param o object whose correctly-typed <code>Class</code> is to be obtained
+   * @return the correctly typed <code>Class</code> of the given object.
+   */
+  @SuppressWarnings("unchecked")
+  public static <T> Class<T> getClass(T o) {
+    return (Class<T>)o.getClass();
+  }
+  
   // methods to support testing
   static void clearCache() {
     CONSTRUCTOR_CACHE.clear();

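ReflectionUtils.getClass is a small convenience that returns a correctly-typed Class and spares callers an unchecked cast of o.getClass(). A trivial usage sketch with illustrative values:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.ReflectionUtils;

class GetClassSketch {
  public static void main(String[] args) {
    Text key = new Text("example");
    Class<Text> keyClass = ReflectionUtils.getClass(key);  // inferred as Class<Text>
    System.out.println(keyClass.getName());                // org.apache.hadoop.io.Text
  }
}
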
+ 2 - 2
src/test/org/apache/hadoop/io/compress/TestCodecFactory.java

@@ -45,7 +45,7 @@ public class TestCodecFactory extends TestCase {
       return null;
     }
     
-    public Class getCompressorType() {
+    public Class<? extends Compressor> getCompressorType() {
       return null;
     }
 
@@ -70,7 +70,7 @@ public class TestCodecFactory extends TestCase {
       return null;
     }
 
-    public Class getDecompressorType() {
+    public Class<? extends Decompressor> getDecompressorType() {
       return null;
     }
 

+ 1 - 1
src/test/org/apache/hadoop/mapred/TestJobStatusPersistency.java

@@ -98,7 +98,7 @@ public class TestJobStatusPersistency extends ClusterMapReduceTestCase {
     TaskCompletionEvent[] events1 = rj1.getTaskCompletionEvents(0);
     assertEquals(events0.length, events1.length);    
     for (int i = 0; i < events0.length; i++) {
-      assertEquals(events0[i].getTaskID(), events1[i].getTaskID());
+      assertEquals(events0[i].getTaskAttemptId(), events1[i].getTaskAttemptId());
       assertEquals(events0[i].getTaskStatus(), events1[i].getTaskStatus());
     }
   }

+ 8 - 12
src/test/org/apache/hadoop/mapred/TestMapRed.java

@@ -329,9 +329,8 @@ public class TestMapRed extends TestCase {
     // partition too low
     conf.setBoolean("test.testmapred.badpartition", true);
     boolean pass = true;
-    RunningJob rj = null;
     try {
-      rj = JobClient.runJob(conf);
+      JobClient.runJob(conf);
     } catch (IOException e) {
       pass = false;
     }
@@ -341,14 +340,14 @@ public class TestMapRed extends TestCase {
     conf.setBoolean("test.testmapred.badpartition", false);
     pass = true;
     try {
-      rj = JobClient.runJob(conf);
+      JobClient.runJob(conf);
     } catch (IOException e) {
       pass = false;
     }
     assertFalse("should fail for partition >= numPartitions", pass);
   }
     
-  private void checkCompression(CompressionType mapCompression,
+  private void checkCompression(boolean compressMapOutputs,
                                 CompressionType redCompression,
                                 boolean includeCombine
                                 ) throws Exception {
@@ -368,8 +367,7 @@ public class TestMapRed extends TestCase {
     if (includeCombine) {
       conf.setCombinerClass(IdentityReducer.class);
     }
-    conf.setMapOutputCompressionType(mapCompression);
-    conf.setCompressMapOutput(mapCompression != CompressionType.NONE);
+    conf.setCompressMapOutput(compressMapOutputs);
     SequenceFileOutputFormat.setOutputCompressionType(conf, redCompression);
     try {
       if (!fs.mkdirs(testdir)) {
@@ -404,12 +402,10 @@ public class TestMapRed extends TestCase {
   public void testCompression() throws Exception {
     EnumSet<SequenceFile.CompressionType> seq =
       EnumSet.allOf(SequenceFile.CompressionType.class);
-    for (CompressionType mapCompression : seq) {
-      for (CompressionType redCompression : seq) {
-        for(int combine=0; combine < 2; ++combine) {
-          checkCompression(mapCompression, redCompression,
-                           combine == 1);
-        }
+    for (CompressionType redCompression : seq) {
+      for(int combine=0; combine < 2; ++combine) {
+        checkCompression(false, redCompression, combine == 1);
+        checkCompression(true, redCompression, combine == 1);
       }
     }
   }

+ 8 - 7
src/test/org/apache/hadoop/mapred/TestReduceTask.java

@@ -76,17 +76,18 @@ public class TestReduceTask extends TestCase {
                                Configuration conf) throws IOException {
     FileSystem fs = tmpDir.getFileSystem(conf);
     Path path = new Path(tmpDir, "data.in");
-    SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path,
-                                                         Text.class, 
-                                                         Text.class);
+    IFile.Writer<Text, Text> writer = 
+      new IFile.Writer<Text, Text>(conf, fs, path, Text.class, Text.class, null);
     for(Pair p: vals) {
       writer.append(new Text(p.key), new Text(p.value));
     }
     writer.close();
-    SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs, Text.class, 
-                                                         Text.class, conf);
-    SequenceFile.Sorter.RawKeyValueIterator rawItr = 
-      sorter.merge(new Path[]{path}, false, tmpDir);
+    
+    @SuppressWarnings("unchecked")
+    RawKeyValueIterator rawItr = 
+      Merger.merge(conf, fs, Text.class, Text.class, null, new Path[]{path}, 
+                   false, conf.getInt("io.sort.factor", 100), tmpDir, 
+                   new Text.Comparator(), new NullProgress());
     @SuppressWarnings("unchecked") // WritableComparators are not generic
     ReduceTask.ValuesIterator valItr = 
       new ReduceTask.ValuesIterator<Text,Text>(rawItr,

Some files are not shown because too many files were changed in this commit