
HADOOP-3514. Inline the CRCs in intermediate files as opposed to reading them from a different .crc file. Contributed by Jothi Padmanabhan.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@693455 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das 17 years ago
parent commit 96a1c00078
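
For context: before this patch the map-side intermediate (IFile) data went through the checksummed LocalFileSystem, which keeps CRCs in a separate side file; the patch instead appends a CRC-32 to the tail of each intermediate file and verifies it on read. A minimal layout sketch, using plain java.util.zip.CRC32 rather than Hadoop's DataChecksum (the helper class below is hypothetical and not part of the commit):

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.zip.CRC32;

// Resulting file layout: <data bytes><4-byte CRC-32 of those bytes>, i.e. the
// checksum is inlined at the end of the file instead of living in a .crc file.
class InlinedCrcLayoutSketch {
  static byte[] withTrailingCrc(byte[] data) throws IOException {
    CRC32 crc = new CRC32();
    crc.update(data, 0, data.length);
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    buf.write(data);
    new DataOutputStream(buf).writeInt((int) crc.getValue()); // 4 trailing bytes
    return buf.toByteArray();
  }
}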

+ 3 - 0
CHANGES.txt

@@ -316,6 +316,9 @@ Trunk (unreleased changes)
     GenericMRLoadGenerator public, so they can be used in other contexts. 
     (Lingyun Yang via omalley)
 
+    HADOOP-3514. Inline the CRCs in intermediate files as opposed to reading
+    it from a different .crc file. (Jothi Padmanabhan via ddas)
+
   BUG FIXES
 
     HADOOP-3563.  Refactor the distributed upgrade code so that it is 

+ 8 - 2
src/core/org/apache/hadoop/fs/LocalFileSystem.java

@@ -29,13 +29,19 @@ import java.util.*;
 public class LocalFileSystem extends ChecksumFileSystem {
   static final URI NAME = URI.create("file:///");
   static private Random rand = new Random();
-
+  FileSystem rfs;
+  
   public LocalFileSystem() {
-    super(new RawLocalFileSystem());
+    this(new RawLocalFileSystem());
+  }
+  
+  public FileSystem getRaw() {
+    return rfs;
   }
     
   public LocalFileSystem(FileSystem rawLocalFileSystem) {
     super(rawLocalFileSystem);
+    rfs = rawLocalFileSystem;
   }
     
   /** Convert a path to a File. */

+ 0 - 3
src/core/org/apache/hadoop/util/DataChecksum.java

@@ -223,9 +223,6 @@ public class DataChecksum implements Checksum {
       summer.update( b, off, len );
       inSum += len;
     }
-    // Can be removed.
-    assert inSum <= bytesPerChecksum : "DataChecksum.update() : inSum " + 
-                inSum + " > " + " bytesPerChecksum " + bytesPerChecksum ; 
   }
   public void update( int b ) {
     summer.update( b );

+ 55 - 17
src/mapred/org/apache/hadoop/mapred/IFile.java

@@ -66,6 +66,8 @@ class IFile {
     long decompressedBytesWritten = 0;
     long compressedBytesWritten = 0;
     
+    IFileOutputStream checksumOut;
+
     Class<K> keyClass;
     Class<V> valueClass;
     Serializer<K> keySerializer;
@@ -83,17 +85,18 @@ class IFile {
     public Writer(Configuration conf, FSDataOutputStream out, 
         Class<K> keyClass, Class<V> valueClass,
         CompressionCodec codec) throws IOException {
+      this.checksumOut = new IFileOutputStream(out);
       this.rawOut = out;
       this.start = this.rawOut.getPos();
       
       if (codec != null) {
         this.compressor = CodecPool.getCompressor(codec);
         this.compressor.reset();
-        this.compressedOut = codec.createOutputStream(out, compressor);
+        this.compressedOut = codec.createOutputStream(checksumOut, compressor);
         this.out = new FSDataOutputStream(this.compressedOut,  null);
         this.compressOutput = true;
       } else {
-        this.out = out;
+        this.out = new FSDataOutputStream(checksumOut,null);
       }
       
       this.keyClass = keyClass;
@@ -106,6 +109,7 @@ class IFile {
     }
     
     public void close() throws IOException {
+
       // Close the serializers
       keySerializer.close();
       valueSerializer.close();
@@ -115,24 +119,25 @@ class IFile {
       WritableUtils.writeVInt(out, EOF_MARKER);
       decompressedBytesWritten += 2 * WritableUtils.getVIntSize(EOF_MARKER);
       
+      //Flush the stream
+      out.flush();
+  
       if (compressOutput) {
-        // Flush data from buffers into the compressor
-        out.flush();
-        
         // Flush & return the compressor
         compressedOut.finish();
         compressedOut.resetState();
         CodecPool.returnCompressor(compressor);
         compressor = null;
       }
-
+      
       // Close the stream
-      rawOut.flush();
+      checksumOut.close();
+      
       compressedBytesWritten = rawOut.getPos() - start;
-
+    
       // Close the underlying stream iff we own it...
       if (ownOutputStream) {
-        out.close();
+        rawOut.close();
       }
       out = null;
     }
@@ -216,43 +221,71 @@ class IFile {
     private static final int DEFAULT_BUFFER_SIZE = 128*1024;
     private static final int MAX_VINT_SIZE = 9;
 
-    FSDataInputStream rawIn;   // Raw InputStream from file
     InputStream in;            // Possibly decompressed stream that we read
     Decompressor decompressor;
     long bytesRead = 0;
     long fileLength = 0;
     boolean eof = false;
+    IFileInputStream checksumIn;
     
     byte[] buffer = null;
     int bufferSize = DEFAULT_BUFFER_SIZE;
     DataInputBuffer dataIn = new DataInputBuffer();
 
     int recNo = 1;
-
+    
+    /**
+     * Construct an IFile Reader.
+     * 
+     * @param conf Configuration File 
+     * @param fs  FileSystem
+     * @param file Path of the file to be opened. This file should have
+     *             checksum bytes for the data at the end of the file.
+     * @param codec codec
+     * @throws IOException
+     */
+    
     public Reader(Configuration conf, FileSystem fs, Path file,
                   CompressionCodec codec) throws IOException {
-      this(conf, fs.open(file), fs.getFileStatus(file).getLen(), codec);
+      this(conf, fs.open(file), 
+           fs.getFileStatus(file).getLen(),
+           codec);
     }
     
     protected Reader() {}
+
+    /**
+     * Construct an IFile Reader.
+     * 
+     * @param conf Configuration File 
+     * @param in   The input stream
+     * @param length Length of the data in the stream, including the checksum
+     *               bytes.
+     * @param codec codec
+     * @throws IOException
+     */
     
     public Reader(Configuration conf, FSDataInputStream in, long length, 
                   CompressionCodec codec) throws IOException {
-      this.rawIn = in;
+      checksumIn = new IFileInputStream(in,length);
       if (codec != null) {
         decompressor = CodecPool.getDecompressor(codec);
-        this.in = codec.createInputStream(in, decompressor);
+        this.in = codec.createInputStream(checksumIn, decompressor);
       } else {
-        this.in = in;
+        this.in = checksumIn;
       }
       this.fileLength = length;
       
       this.bufferSize = conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE);
     }
     
-    public long getLength() { return fileLength; }
+    public long getLength() { 
+      return fileLength - checksumIn.getSize();
+    }
     
-    public long getPosition() throws IOException { return rawIn.getPos(); }
+    public long getPosition() throws IOException {    
+      return checksumIn.getPosition(); 
+    }
     
     /**
      * Read upto len bytes into buf starting at offset off.
@@ -414,6 +447,11 @@ class IFile {
       return bytesRead;
     }
     
+    @Override
+    public long getLength() { 
+      return fileLength;
+    }
+    
     private void dumpOnError() {
       File dumpFile = new File("../output/" + taskAttemptId + ".dump");
       System.err.println("Dumping corrupt map-output of " + taskAttemptId + 

+ 175 - 0
src/mapred/org/apache/hadoop/mapred/IFileInputStream.java

@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.DataChecksum;
+/**
+ * A checksum input stream, used for IFiles.
+ * Used to validate the checksum of files created by {@link IFileOutputStream}. 
+ */
+
+class IFileInputStream extends InputStream {
+  
+  private final InputStream in; //The input stream to be verified for checksum. 
+  private final long length; //The total length of the input file
+  private final long dataLength;
+  private DataChecksum sum;
+  private long currentOffset = 0;
+  private byte b[]; 
+  private byte csum[] = null;
+  private int checksumSize;
+  
+  /**
+   * Create a checksum input stream that reads
+   * @param in The input stream to be verified for checksum.
+   * @param len The length of the input stream including checksum bytes.
+   */
+  public IFileInputStream(InputStream in, long len) {
+    this.in = in;
+    sum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, 
+        Integer.MAX_VALUE);
+    checksumSize = sum.getChecksumSize();
+    length = len;
+    dataLength = length - checksumSize;
+    b = new byte[1];
+  }
+
+  /**
+   * Close the input stream.
+   */
+  @Override
+  public void close() throws IOException {
+    in.close();
+  }
+  
+  @Override
+  public long skip(long n) throws IOException {
+   throw new IOException("Skip not supported for IFileInputStream");
+  }
+  
+  public long getPosition() {
+    return (currentOffset >= dataLength) ? dataLength : currentOffset;
+  }
+  
+  public long getSize() {
+    return checksumSize;
+  }
+  
+  /**
+   * Read bytes from the stream.
+   * At EOF, checksum is validated, but the checksum
+   * bytes are not passed back in the buffer. 
+   */
+  public int read(byte[] b, int off, int len) throws IOException {
+
+    if (currentOffset >= dataLength) {
+      return -1;
+    }
+    
+    return doRead(b,off,len);
+  }
+
+  /**
+   * Read bytes from the stream.
+   * At EOF, checksum is validated and sent back
+   * as the last four bytes of the buffer. The caller should handle
+   * these bytes appropriately
+   */
+  public int readWithChecksum(byte[] b, int off, int len) throws IOException {
+
+    if (currentOffset == length) {
+      return -1;
+    }
+    else if (currentOffset >= dataLength) {
+      // If the previous read drained off all the data, then just return
+      // the checksum now. Note that checksum validation would have 
+      // happened in the earlier read
+      int lenToCopy = (int) (checksumSize - (currentOffset - dataLength));
+      if (len < lenToCopy) {
+        lenToCopy = len;
+      }
+      System.arraycopy(csum, (int) (currentOffset - dataLength), b, off, 
+          lenToCopy);
+      currentOffset += lenToCopy;
+      return lenToCopy;
+    }
+
+    int bytesRead = doRead(b,off,len);
+
+    if (currentOffset == dataLength) {
+      if (len >= bytesRead + checksumSize) {
+        System.arraycopy(csum, 0, b, off + bytesRead, checksumSize);
+        bytesRead += checksumSize;
+        currentOffset += checksumSize;
+      }
+    }
+    return bytesRead;
+  }
+
+  private int doRead(byte[]b, int off, int len) throws IOException {
+    
+    // If we are trying to read past the end of data, just read
+    // the left over data
+    if (currentOffset + len > dataLength) {
+      len = (int) dataLength - (int)currentOffset;
+    }
+    
+    int bytesRead = in.read(b, off, len);
+
+    if (bytesRead < 0) {
+      throw new ChecksumException("Checksum Error", 0);
+    }
+    
+    sum.update(b,off,bytesRead);
+
+    currentOffset += bytesRead;
+    
+    if (currentOffset == dataLength) {
+      // The last four bytes are checksum. Strip them and verify
+      csum = new byte[checksumSize];
+      IOUtils.readFully(in, csum, 0, checksumSize);
+      if (!sum.compare(csum, 0)) {
+        throw new ChecksumException("Checksum Error", 0);
+      }
+    }
+    return bytesRead;
+  }
+
+
+  @Override
+  public int read() throws IOException {    
+    b[0] = 0;
+    int l = read(b,0,1);
+    if (l < 0)  return l;
+    
+    // Upgrade the b[0] to an int so as not to misinterpret the
+    // first bit of the byte as a sign bit
+    int result = 0xFF & b[0];
+    return result;
+  }
+
+  public byte[] getChecksum() {
+    return csum;
+  }
+}
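
The two read paths above serve different callers. A rough sketch of the difference, assuming in-memory streams and a hypothetical class placed in the same package so the package-private stream is visible (not part of the commit):

package org.apache.hadoop.mapred;

import java.io.ByteArrayInputStream;
import java.io.IOException;

// read()            : verifies the trailing CRC at end of data, returns data only.
// readWithChecksum(): verifies and also returns the 4 checksum bytes, which is
//                     how the TaskTracker forwards map output so the reduce side
//                     can verify it again (see the TaskTracker change below).
class IFileReadModesSketch {
  static void show(byte[] fileBytes) throws IOException {   // data + 4-byte CRC
    IFileInputStream plain =
        new IFileInputStream(new ByteArrayInputStream(fileBytes), fileBytes.length);
    byte[] data = new byte[fileBytes.length - 4];
    int n = plain.read(data, 0, data.length);               // n == data.length here

    IFileInputStream withSum =
        new IFileInputStream(new ByteArrayInputStream(fileBytes), fileBytes.length);
    byte[] all = new byte[fileBytes.length];
    int m = withSum.readWithChecksum(all, 0, all.length);   // m == fileBytes.length here
    System.out.println("read: " + n + " bytes, readWithChecksum: " + m + " bytes");
  }
}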

+ 78 - 0
src/mapred/org/apache/hadoop/mapred/IFileOutputStream.java

@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.io.OutputStream; 
+import java.io.FilterOutputStream;
+
+import org.apache.hadoop.util.DataChecksum;
+/**
+ * A Checksum output stream.
+ * Checksum for the contents of the file is calculated and
+ * appended to the end of the file on close of the stream.
+ * Used for IFiles
+ */
+class IFileOutputStream extends FilterOutputStream {
+  /**
+   * The output stream to be checksummed. 
+   */
+  private final DataChecksum sum;
+  private byte[] barray;
+  private boolean closed = false;
+
+  /**
+   * Create a checksum output stream that writes
+   * the bytes to the given stream.
+   * @param out
+   */
+  public IFileOutputStream(OutputStream out) {
+    super(out);
+    sum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32,
+        Integer.MAX_VALUE);
+    barray = new byte[sum.getChecksumSize()];
+  }
+  
+  @Override
+  public void close() throws IOException {
+    if (closed) {
+      return;
+    }
+    closed = true;
+    sum.writeValue(barray, 0, false);
+    out.write (barray, 0, sum.getChecksumSize());
+    out.flush();
+  }
+  
+  /**
+   * Write bytes to the stream.
+   */
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    sum.update(b, off,len);
+    out.write(b,off,len);
+  }
+ 
+  @Override
+  public void write(int b) throws IOException {
+    barray[0] = (byte) (b & 0xFF);
+    write(barray,0,1);
+  }
+
+}
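
Taken together, the two new classes give a simple write/read round trip. A sketch under the assumption of in-memory streams (the class below is hypothetical and not part of the commit; the real code wraps FSDataOutputStream/FSDataInputStream for spill files and served map outputs):

package org.apache.hadoop.mapred;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

class InlineCrcRoundTripSketch {
  public static void main(String[] args) throws IOException {
    byte[] payload = "intermediate map output".getBytes("UTF-8");

    // Write: IFileOutputStream appends the CRC-32 of everything written
    // when the stream is closed.
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    IFileOutputStream out = new IFileOutputStream(sink);
    out.write(payload, 0, payload.length);
    out.close();
    byte[] fileBytes = sink.toByteArray();       // payload followed by 4 CRC bytes

    // Read: IFileInputStream is given the total length (data + checksum),
    // verifies the CRC once the data is consumed, and throws
    // ChecksumException on mismatch.
    IFileInputStream in =
        new IFileInputStream(new ByteArrayInputStream(fileBytes), fileBytes.length);
    byte[] back = new byte[payload.length];
    int read = in.read(back, 0, back.length);
    in.close();
    System.out.println("verified " + read + " bytes");
  }
}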

+ 24 - 17
src/mapred/org/apache/hadoop/mapred/MapTask.java

@@ -40,6 +40,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DataInputBuffer;
@@ -425,7 +426,8 @@ class MapTask extends Task {
     private final SpillThread spillThread = new SpillThread();
 
     private final FileSystem localFs;
-
+    private final FileSystem rfs;
+   
     private final Counters.Counter mapOutputByteCounter;
     private final Counters.Counter mapOutputRecordCounter;
     private final Counters.Counter combineInputCounter;
@@ -439,7 +441,10 @@ class MapTask extends Task {
       localFs = FileSystem.getLocal(job);
       partitions = job.getNumReduceTasks();
       partitioner = ReflectionUtils.newInstance(job.getPartitionerClass(), job);
-      // sanity checks
+       
+      rfs = ((LocalFileSystem)localFs).getRaw();
+
+      //sanity checks
       final float spillper = job.getFloat("io.sort.spill.percent",(float)0.8);
       final float recper = job.getFloat("io.sort.record.percent",(float)0.05);
       final int sortmb = job.getInt("io.sort.mb", 100);
@@ -891,7 +896,7 @@ class MapTask extends Task {
         // create spill file
         Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
                                       numSpills, size);
-        out = localFs.create(filename);
+        out = rfs.create(filename);
         // create spill index
         Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
                              getTaskID(), numSpills,
@@ -972,7 +977,7 @@ class MapTask extends Task {
         // create spill file
         Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
                                       numSpills, size);
-        out = localFs.create(filename);
+        out = rfs.create(filename);
         // create spill index
         Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
                              getTaskID(), numSpills,
@@ -1107,15 +1112,15 @@ class MapTask extends Task {
       for(int i = 0; i < numSpills; i++) {
         filename[i] = mapOutputFile.getSpillFile(getTaskID(), i);
         indexFileName[i] = mapOutputFile.getSpillIndexFile(getTaskID(), i);
-        finalOutFileSize += localFs.getFileStatus(filename[i]).getLen();
+        finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
       }
       
       if (numSpills == 1) { //the spill is the final output
-        localFs.rename(filename[0], 
-                       new Path(filename[0].getParent(), "file.out"));
-        localFs.rename(indexFileName[0], 
-                       new Path(indexFileName[0].getParent(),"file.out.index"));
-        return;
+    	  rfs.rename(filename[0],
+    			  new Path(filename[0].getParent(), "file.out"));
+    	  localFs.rename(indexFileName[0],
+    			  new Path(indexFileName[0].getParent(),"file.out.index"));
+    	  return;
       }
       //make correction in the length to include the sequence file header
       //lengths for each partition
@@ -1129,9 +1134,10 @@ class MapTask extends Task {
                             getTaskID(), finalIndexFileSize);
       
       //The output stream for the final single output file
-      FSDataOutputStream finalOut = localFs.create(finalOutputFile, true, 
-                                                   4096);
-      
+
+      FSDataOutputStream finalOut = rfs.create(finalOutputFile, true,
+                                               4096);
+
       //The final index file output stream
       FSDataOutputStream finalIndexOut = localFs.create(finalIndexFile, true,
                                                         4096);
@@ -1160,8 +1166,9 @@ class MapTask extends Task {
             long rawSegmentLength = indexIn.readLong();
             long segmentLength = indexIn.readLong();
             indexIn.close();
-            FSDataInputStream in = localFs.open(filename[i]);
+            FSDataInputStream in = rfs.open(filename[i]);
             in.seek(segmentOffset);
+
             Segment<K, V> s = 
               new Segment<K, V>(new Reader<K, V>(job, in, segmentLength, codec),
                                 true);
@@ -1176,7 +1183,7 @@ class MapTask extends Task {
           //merge
           @SuppressWarnings("unchecked")
           RawKeyValueIterator kvIter = 
-            Merger.merge(job, localFs, 
+            Merger.merge(job, rfs,
                          keyClass, valClass,
                          segmentList, job.getInt("io.sort.factor", 100), 
                          new Path(getTaskID().toString()), 
@@ -1203,7 +1210,7 @@ class MapTask extends Task {
         finalIndexOut.close();
         //cleanup
         for(int i = 0; i < numSpills; i++) {
-          localFs.delete(filename[i], true);
+          rfs.delete(filename[i],true);
           localFs.delete(indexFileName[i], true);
         }
       }
@@ -1223,7 +1230,7 @@ class MapTask extends Task {
       //StringBuffer sb = new StringBuffer();
       indexOut.writeLong(start);
       indexOut.writeLong(writer.getRawLength());
-      long segmentLength = out.getPos() - start;
+      long segmentLength = writer.getCompressedLength();
       indexOut.writeLong(segmentLength);
       LOG.info("Index: (" + start + ", " + writer.getRawLength() + ", " + 
                segmentLength + ")");

+ 4 - 1
src/mapred/org/apache/hadoop/mapred/Merger.java

@@ -128,7 +128,10 @@ class Merger {
     DataInputBuffer getKey() { return key; }
     DataInputBuffer getValue() { return value; }
 
-    long getLength() { return segmentLength; }
+    long getLength() { 
+      return (reader == null) ?
+        segmentLength : reader.getLength();
+    }
     
     boolean next() throws IOException {
       return reader.next(key, value);

+ 25 - 11
src/mapred/org/apache/hadoop/mapred/ReduceTask.java

@@ -24,6 +24,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.lang.Math;
 import java.net.URI;
 import java.net.URL;
 import java.net.URLClassLoader;
@@ -335,7 +336,8 @@ class ReduceTask extends Task {
     }
     
     FileSystem lfs = FileSystem.getLocal(job);
-    
+    FileSystem rfs = ((LocalFileSystem)lfs).getRaw();
+
     // Initialize the codec
     codec = initCodec();
 
@@ -362,7 +364,7 @@ class ReduceTask extends Task {
     LOG.info("Initiating final on-disk merge with " + mapFiles.length + 
     LOG.info("Initiating final on-disk merge with " + mapFiles.length + 
              " files");
              " files");
     RawKeyValueIterator rIter = 
     RawKeyValueIterator rIter = 
-      Merger.merge(job, lfs,
+      Merger.merge(job,rfs,
                    job.getMapOutputKeyClass(), job.getMapOutputValueClass(),
                    job.getMapOutputKeyClass(), job.getMapOutputValueClass(),
                    codec, mapFiles, !conf.getKeepFailedTaskFiles(), 
                    codec, mapFiles, !conf.getKeepFailedTaskFiles(), 
                    job.getInt("io.sort.factor", 100), tempDir, 
                    job.getInt("io.sort.factor", 100), tempDir, 
@@ -509,6 +511,7 @@ class ReduceTask extends Task {
      */
     private FileSystem localFileSys;
 
+    private FileSystem rfs;
     /**
      * Number of files to merge at a time
      */
@@ -1215,13 +1218,16 @@ class ReduceTask extends Task {
               compressedLength + " raw bytes) " + 
               "into RAM from " + mapOutputLoc.getTaskAttemptId());
 
-          mapOutput = shuffleInMemory(mapOutputLoc, connection, input, (int)decompressedLength);
+          mapOutput = shuffleInMemory(mapOutputLoc, connection, input,
+                                      (int)decompressedLength,
+                                      (int)compressedLength);
         } else {
           LOG.info("Shuffling " + decompressedLength + " bytes (" + 
               compressedLength + " raw bytes) " + 
               "into Local-FS from " + mapOutputLoc.getTaskAttemptId());
 
-          mapOutput = shuffleToDisk(mapOutputLoc, input, filename, compressedLength);
+          mapOutput = shuffleToDisk(mapOutputLoc, input, filename, 
+              compressedLength);
         }
             
         return mapOutput;
@@ -1266,7 +1272,8 @@ class ReduceTask extends Task {
       private MapOutput shuffleInMemory(MapOutputLocation mapOutputLoc,
                                         URLConnection connection, 
                                         InputStream input,
-                                        int mapOutputLength) 
+                                        int mapOutputLength,
+                                        int compressedLength)
       throws IOException, InterruptedException {
         // Reserve ram for the map-output
         boolean createdNow = ramManager.reserve(mapOutputLength, input);
@@ -1289,6 +1296,11 @@ class ReduceTask extends Task {
             throw ioe;
           }
         }
+
+        IFileInputStream checksumIn = 
+          new IFileInputStream(input,compressedLength);
+
+        input = checksumIn;       
       
         // Are map-outputs compressed?
         if (codec != null) {
@@ -1402,7 +1414,7 @@ class ReduceTask extends Task {
         OutputStream output = null;
         long bytesRead = 0;
         try {
-          output = localFileSys.create(localFilename);
+          output = rfs.create(localFilename);
           
           byte[] buf = new byte[64 * 1024];
           int n = input.read(buf, 0, buf.length);
@@ -1541,7 +1553,9 @@ class ReduceTask extends Task {
         (long)(MAX_INMEM_FILESYS_USE * ramManager.getMemoryLimit());
       
       localFileSys = FileSystem.getLocal(conf);
-      
+
+      rfs = ((LocalFileSystem)localFileSys).getRaw();
+
       // hosts -> next contact time
       this.penaltyBox = new LinkedHashMap<String, Long>();
       
@@ -2187,7 +2201,7 @@ class ReduceTask extends Task {
                                              approxOutputSize, conf)
               .suffix(".merged");
             Writer writer = 
-              new Writer(conf, localFileSys, outputPath, 
+              new Writer(conf,rfs, outputPath, 
                          conf.getMapOutputKeyClass(), 
                          conf.getMapOutputValueClass(),
                          codec);
@@ -2195,7 +2209,7 @@ class ReduceTask extends Task {
             Path tmpDir = new Path(reduceTask.getTaskID().toString());
             final Reporter reporter = getReporter(umbilical);
             try {
-              iter = Merger.merge(conf, localFileSys,
+              iter = Merger.merge(conf, rfs,
                                   conf.getMapOutputKeyClass(),
                                   conf.getMapOutputValueClass(),
                                   codec, mapFiles.toArray(new Path[mapFiles.size()]), 
@@ -2275,7 +2289,7 @@ class ReduceTask extends Task {
                           reduceTask.getTaskID(), ramfsMergeOutputSize);
 
         Writer writer = 
-          new Writer(conf, localFileSys, outputPath,
+          new Writer(conf, rfs, outputPath,
                      conf.getMapOutputKeyClass(),
                      conf.getMapOutputValueClass(),
                      codec);
@@ -2289,7 +2303,7 @@ class ReduceTask extends Task {
           LOG.info("Initiating in-memory merge with " + noInMemorySegments + 
           LOG.info("Initiating in-memory merge with " + noInMemorySegments + 
                    " segments...");
                    " segments...");
           
           
-          rIter = Merger.merge(conf, localFileSys,
+          rIter = Merger.merge(conf, rfs,
                                (Class<K>)conf.getMapOutputKeyClass(),
                                (Class<K>)conf.getMapOutputKeyClass(),
                                (Class<V>)conf.getMapOutputValueClass(),
                                (Class<V>)conf.getMapOutputValueClass(),
                                inMemorySegments, inMemorySegments.size(),
                                inMemorySegments, inMemorySegments.size(),

+ 14 - 8
src/mapred/org/apache/hadoop/mapred/TaskTracker.java

@@ -59,6 +59,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.WritableUtils;
@@ -2538,6 +2539,8 @@ public class TaskTracker
       OutputStream outStream = null;
       FSDataInputStream indexIn = null;
       FSDataInputStream mapOutputIn = null;
+ 
+      IFileInputStream checksumInputStream = null;
       
       long totalRead = 0;
       ShuffleServerMetrics shuffleMetrics = (ShuffleServerMetrics)
@@ -2598,8 +2601,9 @@ public class TaskTracker
          * send it to the reducer.
          */
         //open the map-output file
-        mapOutputIn = fileSys.open(mapOutputFileName);
-        
+        FileSystem rfs = ((LocalFileSystem)fileSys).getRaw();
+
+        mapOutputIn = rfs.open(mapOutputFileName);
         // TODO: Remove this after a 'fix' for HADOOP-3647
         // The clever trick here to reduce the impact of the extra seek for
         // logging the first key/value lengths is to read the lengths before
@@ -2618,8 +2622,9 @@ public class TaskTracker
 
         //seek to the correct offset for the reduce
         mapOutputIn.seek(startOffset);
+        checksumInputStream = new IFileInputStream(mapOutputIn,partLength);
           
-        int len = mapOutputIn.read(buffer, 0,
+        int len = checksumInputStream.readWithChecksum(buffer, 0,
                                    partLength < MAX_BYTES_TO_READ 
                                    ? (int)partLength : MAX_BYTES_TO_READ);
         while (len > 0) {
@@ -2633,9 +2638,9 @@ public class TaskTracker
           }
           totalRead += len;
           if (totalRead == partLength) break;
-          len = mapOutputIn.read(buffer, 0, 
-                                 (partLength - totalRead) < MAX_BYTES_TO_READ
-                                 ? (int)(partLength - totalRead) : MAX_BYTES_TO_READ);
+          len = checksumInputStream.readWithChecksum(buffer, 0, 
+                        (partLength - totalRead) < MAX_BYTES_TO_READ
+                          ? (int)(partLength - totalRead) : MAX_BYTES_TO_READ);
         }
         
         LOG.info("Sent out " + totalRead + " bytes for reduce: " + reduce + 
@@ -2660,8 +2665,9 @@ public class TaskTracker
         if (indexIn != null) {
           indexIn.close();
         }
-        if (mapOutputIn != null) {
-          mapOutputIn.close();
+
+        if (checksumInputStream != null) {
+          checksumInputStream.close();
         }
         shuffleMetrics.serverHandlerFree();
         if (ClientTraceLog.isInfoEnabled()) {

+ 5 - 3
src/test/org/apache/hadoop/mapred/TestReduceTask.java

@@ -23,6 +23,7 @@ import junit.framework.TestCase;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableComparator;
@@ -75,10 +76,11 @@ public class TestReduceTask extends TestCase {
   public void runValueIterator(Path tmpDir, Pair[] vals, 
                                Configuration conf, 
                                CompressionCodec codec) throws IOException {
-    FileSystem fs = tmpDir.getFileSystem(conf);
+    FileSystem localFs = FileSystem.getLocal(conf);
+    FileSystem rfs = ((LocalFileSystem)localFs).getRaw();
     Path path = new Path(tmpDir, "data.in");
     IFile.Writer<Text, Text> writer = 
-      new IFile.Writer<Text, Text>(conf, fs, path, Text.class, Text.class, codec);
+      new IFile.Writer<Text, Text>(conf, rfs, path, Text.class, Text.class, codec);
     for(Pair p: vals) {
       writer.append(new Text(p.key), new Text(p.value));
     }
@@ -86,7 +88,7 @@ public class TestReduceTask extends TestCase {
     
     @SuppressWarnings("unchecked")
     RawKeyValueIterator rawItr = 
-      Merger.merge(conf, fs, Text.class, Text.class, codec, new Path[]{path}, 
+      Merger.merge(conf, rfs, Text.class, Text.class, codec, new Path[]{path}, 
                    false, conf.getInt("io.sort.factor", 100), tmpDir, 
                    new Text.Comparator(), new NullProgress());
     @SuppressWarnings("unchecked") // WritableComparators are not generic