HADOOP-1470. Factor checksum generation and validation out of ChecksumFileSystem to make it reusable. Contributed by Hairong.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@555414 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 years ago
parent commit 628dbb929e

+ 4 - 0
CHANGES.txt

@@ -318,6 +318,10 @@ Trunk (unreleased changes)
  98. HADOOP-1486.  Fix so that fatal exceptions in namenode cause it
      to exit.  (Dhruba Borthakur via cutting)
 
+ 99. HADOOP-1470.  Factor checksum code out of ChecksumFileSystem so
+     that it can be reused by FileSystems with built-in checksumming.
+     (Hairong Kuang via cutting)
+
 
 Release 0.13.0 - 2007-06-08
 

+ 48 - 15
src/java/org/apache/hadoop/dfs/DFSClient.java

@@ -273,6 +273,12 @@ class DFSClient implements FSConstants {
     return new DFSInputStream(src.toString());
   }
 
+  public DFSInputStream open(UTF8 src, int buffersize) throws IOException {
+    checkOpen();
+    // Get block info from namenode
+    return new DFSInputStream(src.toString(), buffersize);
+  }
+
   /**
    * Create a new dfs file and return an output stream for writing into it. 
    * 
@@ -321,6 +327,7 @@ class DFSClient implements FSConstants {
     return create(src, overwrite, replication, blockSize, null);
   }
 
+  
   /**
    * Create a new dfs file with the specified block replication 
    * with write-progress reporting and return an output stream for writing
@@ -338,9 +345,30 @@ class DFSClient implements FSConstants {
                              long blockSize,
                              Progressable progress
                              ) throws IOException {
+    return create(src, overwrite, replication, blockSize, progress,
+        conf.getInt("io.file.buffer.size", 4096));
+  }
+  /**
+   * Create a new dfs file with the specified block replication 
+   * with write-progress reporting and return an output stream for writing
+   * into the file.  
+   * 
+   * @param src stream name
+   * @param overwrite do not check for file existence if true
+   * @param replication block replication
+   * @param blockSize block size of the file
+   * @param progress callback for reporting write progress, or null
+   * @param buffersize size of the buffer used when writing data
+   * @return output stream
+   * @throws IOException
+   */
+  public OutputStream create(UTF8 src, 
+                             boolean overwrite, 
+                             short replication,
+                             long blockSize,
+                             Progressable progress,
+                             int buffersize
+                             ) throws IOException {
     checkOpen();
-    OutputStream result = new DFSOutputStream(src, overwrite, 
-                                              replication, blockSize, progress);
+    OutputStream result = new DFSOutputStream(
+        src, overwrite, replication, blockSize, progress, buffersize);
     synchronized (pendingCreates) {
       pendingCreates.put(src.toString(), result);
     }
@@ -577,10 +605,16 @@ class DFSClient implements FSConstants {
     private long pos = 0;
     private long blockEnd = -1;
     private TreeSet<DatanodeInfo> deadNodes = new TreeSet<DatanodeInfo>();
+    private int buffersize;
         
     /**
      */
     public DFSInputStream(String src) throws IOException {
+      this(src, conf.getInt("io.file.buffer.size", 4096));
+    }
+    
+    public DFSInputStream(String src, int buffersize) throws IOException {
+      this.buffersize = buffersize;
       this.src = src;
       prefetchSize = conf.getLong("dfs.read.prefetch.size", prefetchSize);
       openInfo();
@@ -747,7 +781,7 @@ class DFSClient implements FSConstants {
           //
           // Get bytes in block, set streams
           //
-          DataInputStream in = new DataInputStream(new BufferedInputStream(s.getInputStream()));
+          DataInputStream in = new DataInputStream(new BufferedInputStream(s.getInputStream(), buffersize));
           long curBlockSize = in.readLong();
           long amtSkipped = in.readLong();
           if (curBlockSize != block.getNumBytes()) {
@@ -1066,39 +1100,35 @@ class DFSClient implements FSConstants {
     public void mark(int readLimit) {
     }
     public void reset() throws IOException {
-      throw new IOException("Mark not supported");
+      throw new IOException("Mark/reset not supported");
     }
   }
     
   static class DFSDataInputStream extends FSDataInputStream {
-    DFSDataInputStream(DFSInputStream in, Configuration conf)
+    DFSDataInputStream(DFSInputStream in)
       throws IOException {
-      super(in, conf);
-    }
-      
-    DFSDataInputStream(DFSInputStream in, int bufferSize) throws IOException {
-      super(in, bufferSize);
+      super(in);
     }
       
     /**
      * Returns the datanode from which the stream is currently reading.
      */
     public DatanodeInfo getCurrentDatanode() {
-      return ((DFSInputStream)inStream).getCurrentDatanode();
+      return ((DFSInputStream)in).getCurrentDatanode();
     }
       
     /**
      * Returns the block containing the target position. 
      */
     public Block getCurrentBlock() {
-      return ((DFSInputStream)inStream).getCurrentBlock();
+      return ((DFSInputStream)in).getCurrentBlock();
     }
 
     /**
      * Return collection of blocks that has already been located.
      */
     synchronized List<LocatedBlock> getAllBlocks() throws IOException {
-      return ((DFSInputStream)inStream).getAllBlocks();
+      return ((DFSInputStream)in).getAllBlocks();
     }
 
   }
@@ -1126,6 +1156,7 @@ class DFSClient implements FSConstants {
     private int bytesWrittenToBlock = 0;
     private String datanodeName;
     private long blockSize;
+    private int buffersize;
 
     private Progressable progress;
     /**
@@ -1133,7 +1164,8 @@ class DFSClient implements FSConstants {
      */
     public DFSOutputStream(UTF8 src, boolean overwrite, 
                            short replication, long blockSize,
-                           Progressable progress
+                           Progressable progress,
+                           int buffersize
                            ) throws IOException {
       this.src = src;
       this.overwrite = overwrite;
@@ -1145,6 +1177,7 @@ class DFSClient implements FSConstants {
       if (progress != null) {
         LOG.debug("Set non-null progress callback on DFSOutputStream "+src);
       }
+      this.buffersize = buffersize;
     }
 
     /* Wrapper for closing backupStream. This sets backupStream to null so
@@ -1234,7 +1267,7 @@ class DFSClient implements FSConstants {
         //
         // Xmit header info to datanode
         //
-        DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
+        DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream(), buffersize));
         out.write(OP_WRITE_BLOCK);
         out.writeBoolean(true);
         block.write(out);

+ 3 - 5
src/java/org/apache/hadoop/dfs/DistributedFileSystem.java

@@ -120,7 +120,7 @@ public class DistributedFileSystem extends ChecksumFileSystem {
     }
 
     public FSDataInputStream open(Path f, int bufferSize) throws IOException {
-      return new DFSClient.DFSDataInputStream(dfs.open(getPath(f)), bufferSize);
+      return new DFSClient.DFSDataInputStream(dfs.open(getPath(f), bufferSize));
     }
 
     public FSDataOutputStream create(Path f, boolean overwrite,
@@ -134,10 +134,8 @@ public class DistributedFileSystem extends ChecksumFileSystem {
         throw new IOException("Mkdirs failed to create " + parent);
       }
       
-      return new FSDataOutputStream(
-                                    dfs.create(getPath(f), overwrite,
-                                               replication, blockSize, progress),
-                                    bufferSize);
+      return new FSDataOutputStream(dfs.create(
+          getPath(f), overwrite, replication, blockSize, progress, bufferSize));
     }
     
     public boolean setReplication(Path src, 
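With this change the bufferSize argument to open() travels into DFSClient, where it sizes the BufferedInputStream over the datanode socket, instead of sizing a wrapper around the returned stream. A minimal usage sketch (the path and configuration are illustrative assumptions):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    class OpenSketch {
      static int readSome(Configuration conf) throws Exception {
        FileSystem fs = FileSystem.get(conf);  // e.g. a DistributedFileSystem
        // bufferSize is forwarded to DFSClient.open(src, bufferSize) and from
        // there sizes the BufferedInputStream over the datanode socket
        FSDataInputStream in = fs.open(new Path("/tmp/file"), 8192);
        byte[] buf = new byte[1024];
        int n = in.read(buf);
        in.close();
        return n;
      }
    }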

+ 96 - 0
src/java/org/apache/hadoop/fs/BufferedFSInputStream.java

@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+
+
+/**
+ * A class that optimizes reading from FSInputStream by buffering
+ */
+public class BufferedFSInputStream extends BufferedInputStream
+implements Seekable, PositionedReadable {
+  /**
+   * Creates a <code>BufferedFSInputStream</code>
+   * with the specified buffer size,
+   * and saves its  argument, the input stream
+   * <code>in</code>, for later use.  An internal
+   * buffer array of length  <code>size</code>
+   * is created and stored in <code>buf</code>.
+   *
+   * @param   in     the underlying input stream.
+   * @param   size   the buffer size.
+   * @exception IllegalArgumentException if size <= 0.
+   */
+  public BufferedFSInputStream(FSInputStream in, int size) {
+    super(in, size);
+  }
+
+  public long getPos() throws IOException {
+    return ((FSInputStream)in).getPos()-(count-pos);
+  }
+
+  public long skip(long n) throws IOException {
+    if (n <= 0) {
+      return 0;
+    }
+
+    seek(getPos()+n);
+    return n;
+  }
+
+  public void seek(long pos) throws IOException {
+    if( pos<0 ) {
+      return;
+    }
+    // optimize: check if the pos is in the buffer
+    long end = ((FSInputStream)in).getPos();
+    long start = end - count;
+    if( pos>=start && pos<end) {
+      this.pos = (int)(pos-start);
+      return;
+    }
+
+    // invalidate buffer
+    this.pos = 0;
+    this.count = 0;
+
+    ((FSInputStream)in).seek(pos);
+  }
+
+  public boolean seekToNewSource(long targetPos) throws IOException {
+    pos = 0;
+    count = 0;
+    return ((FSInputStream)in).seekToNewSource(targetPos);
+  }
+
+  public int read(long position, byte[] buffer, int offset, int length) throws IOException {
+    return ((FSInputStream)in).read(position, buffer, offset, length) ;
+  }
+
+  public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
+    ((FSInputStream)in).readFully(position, buffer, offset, length);
+  }
+
+  public void readFully(long position, byte[] buffer) throws IOException {
+    ((FSInputStream)in).readFully(position, buffer);
+  }
+}
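As a usage sketch (assuming some concrete FSInputStream raw), buffering can now be layered onto any FSInputStream before wrapping it in FSDataInputStream, which is exactly what RawLocalFileSystem does later in this commit:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.BufferedFSInputStream;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSInputStream;

    class BufferedOpenSketch {
      // raw stands in for any concrete FSInputStream implementation
      static FSDataInputStream open(FSInputStream raw, Configuration conf)
          throws IOException {
        int bufferSize = conf.getInt("io.file.buffer.size", 4096);
        // BufferedFSInputStream keeps the stream Seekable and
        // PositionedReadable, which FSDataInputStream now requires
        return new FSDataInputStream(new BufferedFSInputStream(raw, bufferSize));
      }
    }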

+ 86 - 234
src/java/org/apache/hadoop/fs/ChecksumFileSystem.java

@@ -21,7 +21,6 @@ package org.apache.hadoop.fs;
 import java.io.*;
 import java.util.Arrays;
 import java.util.zip.CRC32;
-import java.util.zip.Checksum;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -40,7 +39,7 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
   private static final byte[] CHECKSUM_VERSION = new byte[] {'c', 'r', 'c', 0};
 
   public static double getApproxChkSumLength(long size) {
-    return FSOutputSummer.CHKSUM_AS_FRACTION * size;
+    return ChecksumFSOutputSummer.CHKSUM_AS_FRACTION * size;
   }
   
   public ChecksumFileSystem(FileSystem fs) {
@@ -67,7 +66,7 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
    * actual file.
    **/
   public long getChecksumFileLength(Path file, long fileSize) {
-    return FSOutputSummer.getChecksumLength(fileSize, getBytesPerSum());
+    return ChecksumFSOutputSummer.getChecksumLength(fileSize, getBytesPerSum());
   }
 
   /** Return the bytes Per Checksum */
@@ -86,31 +85,28 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
    * For open()'s FSInputStream
    * It verifies that data matches checksums.
    *******************************************************/
-  private static class FSInputChecker extends FSInputStream {
+  private static class ChecksumFSInputChecker extends FSInputChecker {
     public static final Log LOG 
       = LogFactory.getLog("org.apache.hadoop.fs.FSInputChecker");
     
     private ChecksumFileSystem fs;
-    private Path file;
     private FSDataInputStream datas;
     private FSDataInputStream sums;
-    private Checksum sum = new CRC32();
-    private int inSum;
     
     private static final int HEADER_LENGTH = 8;
     
     private int bytesPerSum = 1;
     
-    public FSInputChecker(ChecksumFileSystem fs, Path file)
+    public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file)
       throws IOException {
       this(fs, file, fs.getConf().getInt("io.file.buffer.size", 4096));
     }
     
-    public FSInputChecker(ChecksumFileSystem fs, Path file, int bufferSize)
+    public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file, int bufferSize)
       throws IOException {
-      datas = fs.getRawFileSystem().open(file, bufferSize);
+      super( file, fs.getFileStatus(file).getReplication() );
+      this.datas = fs.getRawFileSystem().open(file, bufferSize);
       this.fs = fs;
-      this.file = file;
       Path sumFile = fs.getChecksumFile(file);
       try {
         int sumBufferSize = fs.getSumBufferSize(fs.getBytesPerSum(), bufferSize);
@@ -120,208 +116,92 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
         sums.readFully(version);
         if (!Arrays.equals(version, CHECKSUM_VERSION))
           throw new IOException("Not a checksum file: "+sumFile);
-        bytesPerSum = sums.readInt();
+        this.bytesPerSum = sums.readInt();
+        set(new CRC32(), bytesPerSum, 4);
       } catch (FileNotFoundException e) {         // quietly ignore
-        stopSumming();
+        set(null, 1, 0);
       } catch (IOException e) {                   // loudly ignore
         LOG.warn("Problem opening checksum file: "+ file + 
                  ".  Ignoring exception: " + 
                  StringUtils.stringifyException(e));
-        stopSumming();
+        set(null, 1, 0);
       }
     }
-
+    
     private long getChecksumFilePos( long dataPos ) {
       return HEADER_LENGTH + 4*(dataPos/bytesPerSum);
     }
     
-    public void seek(long desired) throws IOException {
-      // seek to a checksum boundary
-      long checksumBoundary = desired/bytesPerSum*bytesPerSum;
-      if (checksumBoundary != getPos()) {
-        datas.seek(checksumBoundary);
-        if (sums != null) {
-          sums.seek(getChecksumFilePos(checksumBoundary));
-        }
-      }
-      
-      if (sums != null) {
-        sum.reset();
-        inSum = 0;
-      }
-      
-      // scan to desired position
-      int delta = (int)(desired - checksumBoundary);
-      readBuffer(new byte[delta], 0, delta);
-    }
-    
-    public int read() throws IOException {
-      byte[] b = new byte[1];
-      readBuffer(b, 0, 1);
-      return b[0] & 0xff;
-    }
-
-    public int read(byte b[]) throws IOException {
-      return read(b, 0, b.length);
-    }
-
-    public int read(byte b[], int off, int len) throws IOException {
-      // make sure that it ends at a checksum boundary
-      long curPos = getPos();
-      long endPos = len+curPos/bytesPerSum*bytesPerSum;
-      return readBuffer(b, off, (int)(endPos-curPos));
+    protected long getChunkPosition( long dataPos ) {
+      return dataPos/bytesPerSum*bytesPerSum;
     }
     
-    private int readBuffer(byte b[], int off, int len) throws IOException {
-      int read;
-      boolean retry;
-      int retriesLeft = 3;
-      long oldPos = getPos();
-      do {
-        retriesLeft--;
-        retry = false;
-        
-        read = 0;
-        boolean endOfFile=false;
-        while (read < len && !endOfFile) {
-          int count = datas.read(b, off + read, len - read);
-          if (count < 0)
-            endOfFile = true;
-          else
-            read += count;
-        }
-        
-        if (sums != null && read!=0) {
-          long oldSumsPos = sums.getPos();
-          try {
-            int summed = 0;
-            while (summed < read) {
-              int goal = bytesPerSum - inSum;
-              int inBuf = read - summed;
-              int toSum = inBuf <= goal ? inBuf : goal;
-              
-              try {
-                sum.update(b, off+summed, toSum);
-              } catch (ArrayIndexOutOfBoundsException e) {
-                throw new RuntimeException("Summer buffer overflow b.len=" + 
-                                           b.length + ", off=" + off + 
-                                           ", summed=" + summed + ", read=" + 
-                                           read + ", bytesPerSum=" + bytesPerSum +
-                                           ", inSum=" + inSum, e);
-              }
-              summed += toSum;
-              
-              inSum += toSum;
-              if (inSum == bytesPerSum) {
-                verifySum(read-(summed-bytesPerSum));
-              } else if (read == summed && endOfFile) {
-                verifySum(read-read/bytesPerSum*bytesPerSum);
-              }
-            }
-          } catch (ChecksumException ce) {
-            LOG.info("Found checksum error: "+StringUtils.stringifyException(ce));
-            long errPos = ce.getPos();
-            boolean shouldRetry = fs.reportChecksumFailure(
-                                                           file, datas, errPos, sums, errPos/bytesPerSum);
-            if (!shouldRetry || retriesLeft == 0) {
-              throw ce;
-            }
-            
-            if (seekToNewSource(oldPos)) {
-              // Since at least one of the sources is different, 
-              // the read might succeed, so we'll retry.
-              retry = true;
-              seek(oldPos); //make sure Checksum sum's value gets restored
-            } else {
-              // Neither the data stream nor the checksum stream are being read
-              // from different sources, meaning we'll still get a checksum error 
-              // if we try to do the read again.  We throw an exception instead.
-              throw ce;
-            }
-          }
-        }
-      } while (retry);
-      return read==0?-1:read;
+    public int available() throws IOException {
+      return datas.available() + super.available();
     }
     
-    private void verifySum(int delta) throws IOException {
-      int crc;
-      try {
-        crc = sums.readInt();
-      } catch (IOException e) {
-        LOG.warn("Problem reading checksum file: "+e+". Ignoring.");
-        stopSumming();
-        return;
+    public int read(long position, byte[] b, int off, int len)
+      throws IOException {
+      // parameter check
+      if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
+        throw new IndexOutOfBoundsException();
+      } else if (len == 0) {
+        return 0;
       }
-      int sumValue = (int)sum.getValue();
-      sum.reset();
-      inSum = 0;
-      if (crc != sumValue) {
-        long pos = getPos() - delta;
-        throw new ChecksumException("Checksum error: "+file+" at "+pos, pos);
+      if( position<0 ) {
+        throw new IllegalArgumentException(
+            "Parameter position can not to be negative");
       }
-    }
-    
-    public long getPos() throws IOException {
-      return datas.getPos();
-    }
-    
-    public int read(long position, byte[] buffer, int offset, int length)
-      throws IOException {
-      return datas.read(position, buffer, offset, length);
-    }
-    
-    public void readFully(long position, byte[] buffer, int offset, int length)
-      throws IOException {
-      datas.readFully(position, buffer, offset, length);
-    }
-    
-    public void readFully(long position, byte[] buffer)
-      throws IOException {
-      datas.readFully(position, buffer);
+
+      ChecksumFSInputChecker checker = new ChecksumFSInputChecker(fs, file);
+      checker.seek(position);
+      int nread = checker.read(b, off, len);
+      checker.close();
+      return nread;
     }
     
     public void close() throws IOException {
       datas.close();
-      stopSumming();
-    }
-    
-    private void stopSumming() {
-      if (sums != null) {
-        try {
-          sums.close();
-        } catch (IOException f) {}
-        sums = null;
-        bytesPerSum = 1;
+      if( sums != null ) {
+        sums.close();
       }
+      set(null, 1, 0);
     }
     
-    public int available() throws IOException {
-      return datas.available();
-    }
-    
-    public boolean markSupported() {
-      return datas.markSupported();
-    }
-    
-    public synchronized void mark(int readlimit) {
-      datas.mark(readlimit);
-    }
-    
-    public synchronized void reset() throws IOException {
-      datas.reset();
-    }
-    
-    public long skip(long n) throws IOException {
-      return datas.skip(n);
-    }
 
     @Override
     public boolean seekToNewSource(long targetPos) throws IOException {
+      long sumsPos = getChecksumFilePos(targetPos);
+      fs.reportChecksumFailure(file, datas, targetPos, sums, sumsPos);
       boolean newDataSource = datas.seekToNewSource(targetPos);
-      return sums.seekToNewSource(getChecksumFilePos(targetPos)) || newDataSource;
+      return sums.seekToNewSource(sumsPos) || newDataSource;
     }
 
+    @Override
+    protected int readChunk(long pos, byte[] buf, int offset, int len,
+        byte[] checksum) throws IOException {
+      boolean eof = false;
+      if(needChecksum()) {
+        try {
+          long checksumPos = getChecksumFilePos(pos); 
+          if(checksumPos != sums.getPos()) {
+            sums.seek(checksumPos);
+          }
+          sums.readFully(checksum);
+        } catch (EOFException e) {
+          eof = true;
+        }
+        len = bytesPerSum;
+      }
+      if(pos != datas.getPos()) {
+        datas.seek(pos);
+      }
+      int nread = readFully(datas, buf, offset, len);
+      if( eof && nread > 0) {
+        throw new ChecksumException("Checksum error: "+file+" at "+pos, pos);
+      }
+      return nread;
+    }
   }
 
   /**
@@ -331,21 +211,18 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
    */
   @Override
   public FSDataInputStream open(Path f, int bufferSize) throws IOException {
-    return new FSDataInputStream(new FSInputChecker(this, f, bufferSize),
-                                 getBytesPerSum());
+    return new FSDataInputStream(
+        new ChecksumFSInputChecker(this, f, bufferSize) );
   }
 
   /** This class provides an output stream for a checksummed file.
    * It generates checksums for data. */
-  private static class FSOutputSummer extends FilterOutputStream {
-    
+  private static class ChecksumFSOutputSummer extends FSOutputSummer {
+    private FSDataOutputStream datas;    
     private FSDataOutputStream sums;
-    private Checksum sum = new CRC32();
-    private int inSum;
-    private int bytesPerSum;
     private static final float CHKSUM_AS_FRACTION = 0.01f;
     
-    public FSOutputSummer(ChecksumFileSystem fs, 
+    public ChecksumFSOutputSummer(ChecksumFileSystem fs, 
                           Path file, 
                           boolean overwrite, 
                           short replication,
@@ -357,7 +234,7 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
            replication, blockSize, null);
     }
     
-    public FSOutputSummer(ChecksumFileSystem fs, 
+    public ChecksumFSOutputSummer(ChecksumFileSystem fs, 
                           Path file, 
                           boolean overwrite,
                           int bufferSize,
@@ -365,51 +242,22 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
                           long blockSize,
                           Progressable progress)
       throws IOException {
-      super(fs.getRawFileSystem().create(file, overwrite, bufferSize, 
-                                         replication, blockSize, progress));
-      this.bytesPerSum = fs.getBytesPerSum();
+      super(new CRC32(), fs.getBytesPerSum(), 4);
+      int bytesPerSum = fs.getBytesPerSum();
+      this.datas = fs.getRawFileSystem().create(file, overwrite, bufferSize, 
+                                         replication, blockSize, progress);
       int sumBufferSize = fs.getSumBufferSize(bytesPerSum, bufferSize);
       this.sums = fs.getRawFileSystem().create(fs.getChecksumFile(file), true, 
                                                sumBufferSize, replication,
                                                blockSize);
       sums.write(CHECKSUM_VERSION, 0, CHECKSUM_VERSION.length);
-      sums.writeInt(this.bytesPerSum);
-    }
-    
-    public void write(byte b[], int off, int len) throws IOException {
-      int summed = 0;
-      while (summed < len) {
-        
-        int goal = this.bytesPerSum - inSum;
-        int inBuf = len - summed;
-        int toSum = inBuf <= goal ? inBuf : goal;
-        
-        sum.update(b, off+summed, toSum);
-        summed += toSum;
-        
-        inSum += toSum;
-        if (inSum == this.bytesPerSum) {
-          writeSum();
-        }
-      }
-      
-      out.write(b, off, len);
-    }
-    
-    private void writeSum() throws IOException {
-      if (inSum != 0) {
-        sums.writeInt((int)sum.getValue());
-        sum.reset();
-        inSum = 0;
-      }
+      sums.writeInt(bytesPerSum);
     }
     
     public void close() throws IOException {
-      writeSum();
-      if (sums != null) {
-        sums.close();
-      }
-      out.close();
+      flushBuffer();
+      sums.close();
+      datas.close();
     }
     
     public static long getChecksumLength(long size, int bytesPerSum) {
@@ -418,6 +266,13 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
       return ((long)(Math.ceil((float)size/bytesPerSum)) + 1) * 4 + 
         CHECKSUM_VERSION.length;  
     }
+
+    @Override
+    protected void writeChunk(byte[] b, int offset, int len, byte[] checksum)
+    throws IOException {
+      datas.write(b, offset, len);
+      sums.write(checksum);
+    }
   }
 
   /**
@@ -433,15 +288,12 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
   public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize,
                                    short replication, long blockSize, Progressable progress)
     throws IOException {
-    if (exists(f) && !overwrite) {
-      throw new IOException("File already exists:" + f);
-    }
     Path parent = f.getParent();
     if (parent != null && !mkdirs(parent)) {
       throw new IOException("Mkdirs failed to create " + parent);
     }
-    return new FSDataOutputStream(new FSOutputSummer(this, f, overwrite,
-                                                     bufferSize, replication, blockSize, progress), getBytesPerSum());
+    return new FSDataOutputStream(new ChecksumFSOutputSummer(
+        this, f, overwrite, bufferSize, replication, blockSize, progress));
   }
 
   /**
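The offset arithmetic in getChecksumFilePos follows from the checksum file layout: a 4-byte "crc\0" magic plus a 4-byte bytesPerSum int (HEADER_LENGTH == 8), then one 4-byte CRC per bytesPerSum bytes of data. A small worked example (values are illustrative):

    class ChecksumPosSketch {
      // mirrors getChecksumFilePos: 8-byte header, one 4-byte CRC per chunk
      static long checksumFilePos(long dataPos, int bytesPerSum) {
        return 8 + 4 * (dataPos / bytesPerSum);
      }
      public static void main(String[] args) {
        // with bytesPerSum = 512, data byte 1000 falls in chunk 1, so its
        // CRC is stored at offset 8 + 4*1 = 12 of the checksum file
        System.out.println(checksumFilePos(1000, 512)); // prints 12
      }
    }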

+ 12 - 105
src/java/org/apache/hadoop/fs/FSDataInputStream.java

@@ -19,137 +19,44 @@ package org.apache.hadoop.fs;
 
 import java.io.*;
 
-import org.apache.hadoop.conf.*;
-
 /** Utility that wraps a {@link FSInputStream} in a {@link DataInputStream}
  * and buffers input through a {@link BufferedInputStream}. */
 public class FSDataInputStream extends DataInputStream
     implements Seekable, PositionedReadable {
 
-  /** Cache the file position.  This improves performance significantly.*/
-  private static class PositionCache extends FilterInputStream {
-    long position;
-
-    public PositionCache(FSInputStream in) throws IOException {
-      super(in);
-    }
-
-    // This is the only read() method called by BufferedInputStream, so we trap
-    // calls to it in order to cache the position.
-    public int read(byte b[], int off, int len) throws IOException {
-      int result;
-      if ((result = in.read(b, off, len)) > 0)
-        position += result;
-      return result;
-    }
-
-    public void seek(long desired) throws IOException {
-      ((FSInputStream)in).seek(desired);          // seek underlying stream
-      position = desired;                         // update position
-    }
-      
-    public long getPos() throws IOException {
-      return position;                            // return cached position
-    }
-    
-    public int read(long position, byte[] buffer, int offset, int length)
-      throws IOException {
-      return ((FSInputStream)in).read(position, buffer, offset, length);
-    }
-    
-    public void readFully(long position, byte[] buffer, int offset, int length)
-      throws IOException {
-      ((FSInputStream)in).readFully(position, buffer, offset, length);
-    }
-  }
-
-  /** Buffer input.  This improves performance significantly.*/
-  private static class Buffer extends BufferedInputStream {
-    public Buffer(PositionCache in, int bufferSize)
-      throws IOException {
-      super(in, bufferSize);
-    }
-
-    public void seek(long desired) throws IOException {
-      long end = ((PositionCache)in).getPos();
-      long start = end - this.count;
-      int avail = this.count - this.pos;
-      if (desired >= start && desired < end && avail > 0) {
-        this.pos = (int)(desired - start);        // can position within buffer
-      } else {
-        this.count = 0;                           // invalidate buffer
-        this.pos = 0;
-        ((PositionCache)in).seek(desired);
-      }
-    }
-      
-    public long getPos() throws IOException {     // adjust for buffer
-      return ((PositionCache)in).getPos() - (this.count - this.pos);
-    }
-
-    // optimized version of read()
-    public int read() throws IOException {
-      if (pos >= count)
-        return super.read();
-      return buf[pos++] & 0xff;
-    }
-
-    public int read(long position, byte[] buffer, int offset, int length)
-      throws IOException {
-      return ((PositionCache)in).read(position, buffer, offset, length);
-    }
-    
-    public void readFully(long position, byte[] buffer, int offset, int length)
-      throws IOException {
-      ((PositionCache)in).readFully(position, buffer, offset, length);
-    }
-
-    // Disable marking, as its use can cause the BufferedInputStream superclass
-    // to read a smaller amount of data than bytesPerChecksum, which breaks
-    // ChecksumFileSystem.
-    public boolean markSupported() {
-      return false;
-    }
-
-    public synchronized void mark(int readlimit) {}
-  }
-
-  protected FSInputStream inStream;
-  
-  public FSDataInputStream(FSInputStream in, Configuration conf) throws IOException {
-    this(in, conf.getInt("io.file.buffer.size", 4096));
-  }
-  
-  public FSDataInputStream(FSInputStream in, int bufferSize)
+  public FSDataInputStream(InputStream in)
     throws IOException {
-    super(new Buffer(new PositionCache(in), bufferSize));
-    this.inStream = in;
+    super(in);
+    if( !(in instanceof Seekable) || !(in instanceof PositionedReadable) ) {
+      throw new IllegalArgumentException(
+          "In is not an instance of Seekable or PositionedReadable");
+    }
   }
   
   public synchronized void seek(long desired) throws IOException {
-    ((Buffer)in).seek(desired);
+    ((Seekable)in).seek(desired);
   }
 
   public long getPos() throws IOException {
-    return ((Buffer)in).getPos();
+    return ((Seekable)in).getPos();
   }
   
   public int read(long position, byte[] buffer, int offset, int length)
     throws IOException {
-    return ((Buffer)in).read(position, buffer, offset, length);
+    return ((PositionedReadable)in).read(position, buffer, offset, length);
   }
   
   public void readFully(long position, byte[] buffer, int offset, int length)
     throws IOException {
-    ((Buffer)in).readFully(position, buffer, offset, length);
+    ((PositionedReadable)in).readFully(position, buffer, offset, length);
   }
   
   public void readFully(long position, byte[] buffer)
     throws IOException {
-    ((Buffer)in).readFully(position, buffer, 0, buffer.length);
+    ((PositionedReadable)in).readFully(position, buffer, 0, buffer.length);
   }
   
   public boolean seekToNewSource(long targetPos) throws IOException {
-    return inStream.seekToNewSource(targetPos); 
+    return ((Seekable)in).seekToNewSource(targetPos); 
   }
 }
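FSDataInputStream no longer buffers or caches position itself; its constructor instead insists that the wrapped InputStream is both Seekable and PositionedReadable. A minimal in-memory stream satisfying that contract, as an illustrative sketch only:

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSInputStream;

    class InMemorySeekable extends FSInputStream {
      private final byte[] data;
      private int pos;
      InMemorySeekable(byte[] data) { this.data = data; }
      public int read() { return pos < data.length ? data[pos++] & 0xff : -1; }
      public void seek(long p) { pos = (int) p; }
      public long getPos() { return pos; }
      public boolean seekToNewSource(long targetPos) { return false; }
    }
    // new FSDataInputStream(new InMemorySeekable(bytes)) succeeds, whereas
    // wrapping a plain ByteArrayInputStream would throw IllegalArgumentException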

+ 8 - 36
src/java/org/apache/hadoop/fs/FSDataOutputStream.java

@@ -19,8 +19,6 @@ package org.apache.hadoop.fs;
 
 import java.io.*;
 
-import org.apache.hadoop.conf.Configuration;
-
 /** Utility that wraps a {@link OutputStream} in a {@link DataOutputStream},
  * buffers output through a {@link BufferedOutputStream} and creates a checksum
  * file. */
@@ -32,8 +30,11 @@ public class FSDataOutputStream extends DataOutputStream {
       super(out);
     }
 
-    // This is the only write() method called by BufferedOutputStream, so we
-    // trap calls to it in order to cache the position.
+    public void write(int b) throws IOException {
+      out.write(b);
+      position++;
+    }
+    
     public void write(byte b[], int off, int len) throws IOException {
       out.write(b, off, len);
       position += len;                            // update position
@@ -49,42 +50,13 @@ public class FSDataOutputStream extends DataOutputStream {
     }
   }
 
-  private static class Buffer extends BufferedOutputStream {
-    public Buffer(PositionCache out, int bufferSize) throws IOException {
-      super(out, bufferSize);
-    }
-
-    public long getPos() throws IOException {
-      return ((PositionCache)out).getPos() + this.count;
-    }
-
-    // optimized version of write(int)
-    public void write(int b) throws IOException {
-      if (count >= buf.length) {
-        super.write(b);
-      } else {
-        buf[count++] = (byte)b;
-      }
-    }
-
-    public void close() throws IOException {
-      flush();
-      out.close();
-    }
-  }
-
-  public FSDataOutputStream(OutputStream out, int bufferSize)
+  public FSDataOutputStream(OutputStream out)
     throws IOException {
-    super(new Buffer(new PositionCache(out), bufferSize));
+    super(new PositionCache(out));
   }
   
-  public FSDataOutputStream(OutputStream out, Configuration conf)
-    throws IOException {
-    this(out, conf.getInt("io.file.buffer.size", 4096));
-  }
-
   public long getPos() throws IOException {
-    return ((Buffer)out).getPos();
+    return ((PositionCache)out).getPos();
   }
 
   public void close() throws IOException {
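With the internal Buffer gone, getPos() reduces to the running byte count kept by PositionCache; callers that want buffering now wrap the target stream themselves, as RawLocalFileSystem does below with BufferedOutputStream. A quick sketch of the resulting semantics:

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.fs.FSDataOutputStream;

    class PosSketch {
      public static void main(String[] args) throws IOException {
        FSDataOutputStream out =
            new FSDataOutputStream(new ByteArrayOutputStream());
        out.write(new byte[100]);          // PositionCache counts every byte
        System.out.println(out.getPos());  // prints 100
      }
    }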

+ 399 - 0
src/java/org/apache/hadoop/fs/FSInputChecker.java

@@ -0,0 +1,399 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.zip.Checksum;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * This is a generic input stream for verifying checksums for
+ * data before it is read by a user.
+ */
+
+abstract public class FSInputChecker extends FSInputStream {
+  public static final Log LOG 
+  = LogFactory.getLog("org.apache.hadoop.fs.FSInputChecker");
+  
+  /** The file name from which data is read */
+  protected Path file;
+  private Checksum sum;
+  private byte[] buf;
+  private byte[] checksum;
+  private int pos;
+  private int count;
+  
+  private int numOfRetries;
+  
+  // cached file position
+  private long chunkPos = 0;
+  
+  /** Constructor
+   * 
+   * @param file The name of the file to be read
+   * @param numOfRetries Number of read retries when ChecksumError occurs
+   */
+  protected FSInputChecker( Path file, int numOfRetries) {
+    this.file = file;
+    this.numOfRetries = numOfRetries;
+  }
+  
+  /** Constructor
+   * 
+   * @param file The name of the file to be read
+   * @param numOfRetries Number of read retries when ChecksumError occurs
+   * @param sum the type of Checksum engine
+   * @param chunkSize maximum chunk size
+   * @param checksumSize the number of bytes per checksum
+   */
+  protected FSInputChecker( Path file, int numOfRetries, 
+      Checksum sum, int chunkSize, int checksumSize ) {
+    this(file, numOfRetries);
+    set(sum, chunkSize, checksumSize);
+  }
+  
+  /** Reads in the next checksum chunk data into <code>buf</code> at <code>offset</code>
+   * and its checksum into <code>checksum</code>.
+   * This method is used for implementing read; it should therefore be
+   * optimized for sequential reading.
+   * @param pos chunkPos
+   * @param buf destination buffer
+   * @param offset offset in buf at which to store data
+   * @param len maximum number of bytes to read
+   * @return number of bytes read
+   */
+  abstract protected int readChunk(long pos, byte[] buf, int offset, int len,
+      byte[] checksum) throws IOException;
+
+  /** Return position of beginning of chunk containing pos. 
+   *
+   * @param pos a position in the file
+   * @return the starting position of the chunk which contains the byte
+   */
+  abstract protected long getChunkPosition(long pos);
+
+  /** Return true if there is a need for checksum verification */
+  protected synchronized boolean needChecksum() {
+    return sum != null;
+  }
+  
+  /**
+   * Read one checksum-verified byte
+   * 
+   * @return     the next byte of data, or <code>-1</code> if the end of the
+   *             stream is reached.
+   * @exception  IOException  if an I/O error occurs.
+   */
+
+  public synchronized int read() throws IOException {
+    if (pos >= count) {
+      fill();
+      if (pos >= count) {
+        return -1;
+      }
+    }
+    return buf[pos++] & 0xff;
+  }
+  
+  /**
+   * Read checksum verified bytes from this byte-input stream into 
+   * the specified byte array, starting at the given offset.
+   *
+   * <p> This method implements the general contract of the corresponding
+   * <code>{@link InputStream#read(byte[], int, int) read}</code> method of
+   * the <code>{@link InputStream}</code> class.  As an additional
+   * convenience, it attempts to read as many bytes as possible by repeatedly
+   * invoking the <code>read</code> method of the underlying stream.  This
+   * iterated <code>read</code> continues until one of the following
+   * conditions becomes true: <ul>
+   *
+   *   <li> The specified number of bytes have been read,
+   *
+   *   <li> The <code>read</code> method of the underlying stream returns
+   *   <code>-1</code>, indicating end-of-file.
+   *
+   * </ul> If the first <code>read</code> on the underlying stream returns
+   * <code>-1</code> to indicate end-of-file then this method returns
+   * <code>-1</code>.  Otherwise this method returns the number of bytes
+   * actually read.
+   *
+   * @param      b     destination buffer.
+   * @param      off   offset at which to start storing bytes.
+   * @param      len   maximum number of bytes to read.
+   * @return     the number of bytes read, or <code>-1</code> if the end of
+   *             the stream has been reached.
+   * @exception  IOException  if an I/O error occurs.
+   *             ChecksumException if any checksum error occurs
+   */
+  public synchronized int read(byte[] b, int off, int len) throws IOException {
+    // parameter check
+    if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
+      throw new IndexOutOfBoundsException();
+    } else if (len == 0) {
+      return 0;
+    }
+
+    int n = 0;
+    for (;;) {
+      int nread = read1(b, off + n, len - n);
+      if (nread <= 0) 
+        return (n == 0) ? nread : n;
+      n += nread;
+      if (n >= len)
+        return n;
+    }
+  }
+  
+  /*
+   * Fills the buffer with one chunk of data.
+   * No mark is supported.
+   * This method assumes that all data in the buffer has already been read in,
+   * hence pos >= count.
+   */
+  private void fill() throws IOException {
+    assert(pos>=count);
+    // fill internal buffer
+    pos = 0;
+    count = readChecksumChunk(buf, 0, buf.length);
+  }
+  
+  /*
+   * Read characters into a portion of an array, reading from the underlying
+   * stream at most once if necessary.
+   */
+  private int read1(byte b[], int off, int len)
+  throws IOException {
+    int avail = count-pos;
+    if( avail <= 0 ) {
+      if(len>=buf.length) {
+        // read a chunk to user buffer directly; avoid one copy
+        int nread = readChecksumChunk(b, off, len);
+        return nread;
+      } else {
+        // read a chunk into the local buffer
+        fill();
+        if( count <= 0 ) {
+          return -1;
+        } else {
+          avail = count;
+        }
+      }
+    }
+    
+    // copy content of the local buffer to the user buffer
+    int cnt = (avail < len) ? avail : len;
+    System.arraycopy(buf, pos, b, off, cnt);
+    pos += cnt;
+    return cnt;    
+  }
+  
+  /* Read up to one checksum chunk into array <i>b</i> at position <i>off</i>.
+   * It requires that a checksum chunk boundary
+   * lies in between <cur_pos, cur_pos+len>
+   * and it stops reading at the boundary or at the end of the stream;
+   * otherwise an IllegalArgumentException is thrown.
+   * This makes sure that all data read are checksum verified.
+   * 
+   * @param b   the buffer into which the data is read.
+   * @param off the start offset in array <code>b</code>
+   *            at which the data is written.
+   * @param len the maximum number of bytes to read.
+   * @return    the total number of bytes read into the buffer, or
+   *            <code>-1</code> if there is no more data because the end of
+   *            the stream has been reached.
+   * @throws IOException if an I/O error occurs.
+   */ 
+  private int readChecksumChunk(byte b[], int off, int len)
+  throws IOException {
+    int read = 0;
+    boolean retry = true;
+    int retriesLeft = numOfRetries; 
+    do {
+      retriesLeft--;
+
+      try {
+        read = readChunk(chunkPos, b, off, len, checksum);
+        if( read > 0 ) {
+          if( needChecksum() ) {
+            sum.update(b, off, read);
+            verifySum(chunkPos);
+          }
+          chunkPos += read;
+        } 
+        retry = false;
+      } catch (ChecksumException ce) {
+          LOG.info("Found checksum error: "+StringUtils.stringifyException(ce));
+          if (retriesLeft == 0) {
+            throw ce;
+          }
+          
+          // invalidate buffer
+          count = pos = 0;
+          
+          // try a new replica
+          if (seekToNewSource(chunkPos)) {
+            // Since at least one of the sources is different, 
+            // the read might succeed, so we'll retry.
+            seek(chunkPos);
+          } else {
+            // Neither the data stream nor the checksum stream are being read
+            // from different sources, meaning we'll still get a checksum error 
+            // if we try to do the read again.  We throw an exception instead.
+            throw ce;
+          }
+        }
+    } while (retry);
+    return read;
+  }
+  
+  /* verify checksum for the chunk.
+   * @throws ChecksumException if there is a mismatch
+   */
+  private void verifySum(long errPos) throws ChecksumException {
+    long crc = getChecksum();
+    long sumValue = sum.getValue();
+    sum.reset();
+    if (crc != sumValue) {
+      throw new ChecksumException(
+          "Checksum error: "+file+" at "+errPos, errPos);
+    }
+  }
+  
+  /* calculate checksum value */
+  private long getChecksum() {
+    long crc = 0L;
+    for(int i=0; i<checksum.length; i++) {
+      crc |= (0xffL&(long)checksum[i])<<((checksum.length-i-1)*8);
+    }
+    return crc;
+  }
+  
+  @Override
+  public synchronized long getPos() throws IOException {
+    return chunkPos-(count-pos);
+  }
+
+  @Override
+  public synchronized int available() throws IOException {
+    return count-pos;
+  }
+  
+  @Override
+  public synchronized long skip(long n) throws IOException {
+    if (n <= 0) {
+      return 0;
+    }
+
+    seek(getPos()+n);
+    return n;
+  }
+
+  @Override
+  public synchronized void seek(long pos) throws IOException {
+    if( pos<0 ) {
+      return;
+    }
+    // optimize: check if the pos is in the buffer
+    long start = chunkPos - this.count;
+    if( pos>=start && pos<chunkPos) {
+      this.pos = (int)(pos-start);
+      return;
+    }
+    
+    // reset the current state
+    resetState();
+    
+    // seek to a checksum boundary
+    chunkPos = getChunkPosition(pos);
+    
+    // scan to the desired position
+    int delta = (int)(pos - chunkPos);
+    if( delta > 0) {
+      int nread = readFully(this, new byte[delta], 0, delta);
+      if (nread < delta) {
+        throw new IOException("Cannot seek after EOF");
+      }
+    }
+  }
+
+  /**
+   * A utility function that tries to read up to <code>len</code> bytes from
+   * <code>stm</code>
+   * 
+   * @param stm    an input stream
+   * @param buf    destination buffer
+   * @param offset offset at which to store data
+   * @param len    number of bytes to read
+   * @return actual number of bytes read
+   * @throws IOException if there is any IO error
+   */
+  protected static int readFully(InputStream stm, 
+      byte[] buf, int offset, int len) throws IOException {
+    int n = 0;
+    for (;;) {
+      int nread = stm.read(buf, offset + n, len - n);
+      if (nread <= 0) 
+        return (n == 0) ? nread : n;
+      n += nread;
+      if (n >= len)
+        return n;
+    }
+  }
+  
+  /**
+   * Set the checksum related parameters
+   * @param sum which type of checksum to use
+   * @param maxChunkSize maximum chunk size
+   * @param checksumSize checksum size
+   */
+  final protected synchronized void set(
+      Checksum sum, int maxChunkSize, int checksumSize ) {
+    this.sum = sum;
+    this.buf = new byte[maxChunkSize];
+    this.checksum = new byte[checksumSize];
+    this.count = 0;
+    this.pos = 0;
+  }
+
+  final public boolean markSupported() {
+    return false;
+  }
+  
+  final public void mark(int readlimit) {
+  }
+  
+  final public void reset() throws IOException {
+    throw new IOException("mark/reset not supported");
+  }
+  
+
+  /* reset this FSInputChecker's state */
+  private void resetState() {
+    // invalidate buffer
+    count = 0;
+    pos = 0;
+    // reset Checksum
+    if (sum != null) {
+      sum.reset();
+    }
+  }
+}
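A minimal concrete subclass, shown as a sketch only (the chunk size, stream layout, and class name are assumptions, and EOF handling for a partial trailing chunk is elided; see ChecksumFSInputChecker above for the full treatment):

    import java.io.IOException;
    import java.util.zip.CRC32;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSInputChecker;
    import org.apache.hadoop.fs.Path;

    class SimpleInputChecker extends FSInputChecker {
      private static final int BYTES_PER_SUM = 512; // assumed chunk size
      private final FSDataInputStream datas;        // data chunks
      private final FSDataInputStream sums;         // one 4-byte CRC per chunk

      SimpleInputChecker(Path file, FSDataInputStream datas,
                         FSDataInputStream sums) {
        super(file, 1, new CRC32(), BYTES_PER_SUM, 4);
        this.datas = datas;
        this.sums = sums;
      }

      @Override
      protected long getChunkPosition(long pos) {
        return pos / BYTES_PER_SUM * BYTES_PER_SUM; // start of enclosing chunk
      }

      @Override
      protected int readChunk(long pos, byte[] buf, int offset, int len,
                              byte[] checksum) throws IOException {
        sums.seek(4 * (pos / BYTES_PER_SUM)); // this sketch has no file header
        sums.readFully(checksum);
        datas.seek(pos);
        return readFully(datas, buf, offset, BYTES_PER_SUM);
      }

      @Override
      public boolean seekToNewSource(long targetPos) {
        return false; // single replica in this sketch
      }

      @Override
      public void close() throws IOException {
        datas.close();
        sums.close();
      }
    }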

+ 142 - 0
src/java/org/apache/hadoop/fs/FSOutputSummer.java

@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.zip.Checksum;
+
+/**
+ * This is a generic output stream for generating checksums for
+ * data before it is written to the underlying stream
+ */
+
+abstract public class FSOutputSummer extends OutputStream {
+  // data checksum
+  private Checksum sum;
+  // internal buffer for storing data before it is checksummed
+  private byte buf[];
+  // internal buffer for storing checksum
+  private byte checksum[];
+  // The number of valid bytes in the buffer.
+  private int count;
+  
+  protected FSOutputSummer(Checksum sum, int maxChunkSize, int checksumSize) {
+    this.sum = sum;
+    this.buf = new byte[maxChunkSize];
+    this.checksum = new byte[checksumSize];
+    this.count = 0;
+  }
+  
+  /* Write the data chunk in <code>b</code> starting at <code>offset</code> with
+   * a length of <code>len</code>, and its checksum
+   */
+  protected abstract void writeChunk(byte[] b, int offset, int len, byte[] checksum)
+  throws IOException;
+
+  /** Write one byte */
+  public synchronized void write(int b) throws IOException {
+    sum.update(b);
+    buf[count++] = (byte)b;
+    if(count == buf.length) {
+      flushBuffer();
+    }
+  }
+
+  /**
+   * Writes <code>len</code> bytes from the specified byte array 
+   * starting at offset <code>off</code> and generate a checksum for
+   * each data chunk.
+   *
+   * <p> This method stores bytes from the given array into this
+   * stream's buffer before it gets checksummed. The buffer gets checksummed
+   * and flushed to the underlying output stream when all data
+   * in a checksum chunk are in the buffer.  If the buffer is empty and the
+   * requested length is at least as large as the size of the next checksum
+   * chunk, this method will checksum and write the chunk directly
+   * to the underlying output stream.  Thus it avoids unnecessary data copying.
+   *
+   * @param      b     the data.
+   * @param      off   the start offset in the data.
+   * @param      len   the number of bytes to write.
+   * @exception  IOException  if an I/O error occurs.
+   */
+  public synchronized void write(byte b[], int off, int len)
+  throws IOException {
+    if (off < 0 || len < 0 || off > b.length - len) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+
+    for (int n=0;n<len;n+=write1(b, off+n, len-n)) {
+    }
+  }
+  
+  /*
+   * Write a portion of an array, flushing to the underlying
+   * stream at most once if necessary.
+   */
+
+  private int write1(byte b[], int off, int len) throws IOException {
+    if(count==0 && len>=buf.length) {
+      // local buffer is empty and user data has one chunk
+      // checksum and output data
+      sum.update(b, off, buf.length);
+      writeChecksumChunk(b, off, buf.length);
+      return buf.length;
+    }
+    
+    // copy user data to local buffer
+    int bytesToCopy = buf.length-count;
+    bytesToCopy = (len<bytesToCopy) ? len : bytesToCopy;
+    sum.update(b, off, bytesToCopy);
+    System.arraycopy(b, off, buf, count, bytesToCopy);
+    count += bytesToCopy;
+    if (count == buf.length) {
+      // local buffer is full
+      flushBuffer();
+    } 
+    return bytesToCopy;
+  }
+
+  /* Forces any buffered output bytes to be checksummed and written out to
+   * the underlying output stream. 
+   */
+  protected synchronized void flushBuffer() throws IOException {
+    if(count != 0) {
+      writeChecksumChunk(buf, 0, count);
+      count = 0;
+    }
+  }
+  
+  /* Generate checksum for the data chunk and output data chunk & checksum
+   * to the underlying output stream
+   */
+  private void writeChecksumChunk(byte b[], int off, int len)
+  throws IOException {
+    int tempChecksum = (int)sum.getValue();
+    sum.reset();
+    
+    checksum[0] = (byte)((tempChecksum >>> 24) & 0xFF);
+    checksum[1] = (byte)((tempChecksum >>> 16) & 0xFF);
+    checksum[2] = (byte)((tempChecksum >>>  8) & 0xFF);
+    checksum[3] = (byte)((tempChecksum >>>  0) & 0xFF);
+
+    writeChunk(b, off, len, checksum);
+  }
+}
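And the matching minimal FSOutputSummer subclass, again a sketch under assumed names and sizes: each 512-byte chunk goes to one stream and its 4-byte CRC32 to another, mirroring ChecksumFSOutputSummer above:

    import java.io.IOException;
    import java.io.OutputStream;
    import java.util.zip.CRC32;
    import org.apache.hadoop.fs.FSOutputSummer;

    class SimpleOutputSummer extends FSOutputSummer {
      private final OutputStream datas; // receives the data chunks
      private final OutputStream sums;  // receives one 4-byte CRC per chunk

      SimpleOutputSummer(OutputStream datas, OutputStream sums) {
        super(new CRC32(), 512, 4);     // CRC32, 512-byte chunks, 4-byte sums
        this.datas = datas;
        this.sums = sums;
      }

      @Override
      protected void writeChunk(byte[] b, int offset, int len, byte[] checksum)
          throws IOException {
        datas.write(b, offset, len);
        sums.write(checksum);
      }

      @Override
      public void close() throws IOException {
        flushBuffer();                  // checksum and emit any partial chunk
        sums.close();
        datas.close();
      }
    }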

+ 2 - 3
src/java/org/apache/hadoop/fs/InMemoryFileSystem.java

@@ -143,7 +143,7 @@ public class InMemoryFileSystem extends ChecksumFileSystem {
     }
 
     public FSDataInputStream open(Path f, int bufferSize) throws IOException {
-      return new FSDataInputStream(new InMemoryInputStream(f), bufferSize);
+      return new FSDataInputStream(new InMemoryInputStream(f));
     }
 
     private class InMemoryOutputStream extends OutputStream {
@@ -212,8 +212,7 @@ public class InMemoryFileSystem extends ChecksumFileSystem {
       // map) until close is called on the outputstream that this method is
       // going to return
       // Create an output stream out of data byte array
-      return new FSDataOutputStream(new InMemoryOutputStream(f, fAttr),
-                                    getConf());
+      return new FSDataOutputStream(new InMemoryOutputStream(f, fAttr));
     }
 
     public void close() throws IOException {

+ 5 - 7
src/java/org/apache/hadoop/fs/RawLocalFileSystem.java

@@ -83,7 +83,7 @@ public class RawLocalFileSystem extends FileSystem {
    *******************************************************/
   class LocalFSFileInputStream extends FSInputStream {
     FileInputStream fis;
-    
+
     public LocalFSFileInputStream(Path f) throws IOException {
       this.fis = new FileInputStream(pathToFile(f));
     }
@@ -140,7 +140,8 @@ public class RawLocalFileSystem extends FileSystem {
     if (!exists(f)) {
       throw new FileNotFoundException(f.toString());
     }
-    return new FSDataInputStream(new LocalFSFileInputStream(f), bufferSize);
+    return new FSDataInputStream(new BufferedFSInputStream(
+        new LocalFSFileInputStream(f), bufferSize));
   }
   
   /*********************************************************
@@ -153,10 +154,6 @@ public class RawLocalFileSystem extends FileSystem {
       this.fos = new FileOutputStream(pathToFile(f));
     }
     
-    public long getPos() throws IOException {
-      return fos.getChannel().position();
-    }
-    
     /*
      * Just forward to the fos
      */
@@ -189,7 +186,8 @@ public class RawLocalFileSystem extends FileSystem {
     if (parent != null && !mkdirs(parent)) {
       throw new IOException("Mkdirs failed to create " + parent.toString());
     }
-    return new FSDataOutputStream(new LocalFSFileOutputStream(f), getConf());
+    return new FSDataOutputStream(
+        new BufferedOutputStream(new LocalFSFileOutputStream(f), bufferSize));
   }
   
   /**

+ 11 - 0
src/java/org/apache/hadoop/fs/Seekable.java

@@ -27,4 +27,15 @@ public interface Seekable {
    * seek past the end of the file.
    */
   void seek(long pos) throws IOException;
+  
+  /**
+   * Return the current offset from the start of the file
+   */
+  long getPos() throws IOException;
+
+  /**
+   * Seeks a different copy of the data.  Returns true if 
+   * found a new source, false otherwise.
+   */
+  boolean seekToNewSource(long targetPos) throws IOException;
 }

+ 3 - 4
src/java/org/apache/hadoop/fs/s3/S3FileSystem.java

@@ -177,15 +177,14 @@ public class S3FileSystem extends FileSystem {
       }      
     }
     return new FSDataOutputStream(
-                                  new S3OutputStream(getConf(), store, makeAbsolute(file),
-                                                     blockSize, progress), bufferSize);
+        new S3OutputStream(getConf(), store, makeAbsolute(file),
+                           blockSize, progress, bufferSize));
   }
 
   @Override
   public FSDataInputStream open(Path path, int bufferSize) throws IOException {
     INode inode = checkFile(path);
-    return new FSDataInputStream(new S3InputStream(getConf(), store, inode),
-                                 bufferSize);
+    return new FSDataInputStream(new S3InputStream(getConf(), store, inode));
   }
 
   @Override

+ 3 - 2
src/java/org/apache/hadoop/fs/s3/S3OutputStream.java

@@ -46,7 +46,8 @@ class S3OutputStream extends OutputStream {
   private Block nextBlock;
 
   public S3OutputStream(Configuration conf, FileSystemStore store,
-                        Path path, long blockSize, Progressable progress) throws IOException {
+                        Path path, long blockSize, Progressable progress,
+                        int buffersize) throws IOException {
     
     this.conf = conf;
     this.store = store;
@@ -54,7 +55,7 @@ class S3OutputStream extends OutputStream {
     this.blockSize = blockSize;
     this.backupFile = newBackupFile();
     this.backupStream = new FileOutputStream(backupFile);
-    this.bufferSize = conf.getInt("io.file.buffer.size", 4096);
+    this.bufferSize = buffersize;
     this.outBuf = new byte[bufferSize];
 
   }

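The constructor change above threads the caller's buffer size into the stream instead of having the stream read io.file.buffer.size itself. A sketch of the same injection pattern in isolation (the class name is illustrative; only the pattern mirrors the patch):

import java.io.IOException;
import java.io.OutputStream;

// Sketch mirroring the S3OutputStream change: the buffer size arrives as a
// constructor argument, resolved once by the caller, instead of being read
// from the Configuration inside the stream.
class InjectedBufferStream extends OutputStream {
  private final OutputStream out;
  private final byte[] outBuf;
  private int filled = 0;

  InjectedBufferStream(OutputStream out, int bufferSize) {
    this.out = out;
    this.outBuf = new byte[bufferSize];   // size chosen by the caller
  }

  public void write(int b) throws IOException {
    outBuf[filled++] = (byte) b;
    if (filled == outBuf.length) {
      flush();
    }
  }

  public void flush() throws IOException {
    out.write(outBuf, 0, filled);
    filled = 0;
    out.flush();
  }

  public void close() throws IOException {
    flush();
    out.close();
  }
}
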
+ 241 - 0
src/test/org/apache/hadoop/dfs/TestFSInputChecker.java

@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import junit.framework.TestCase;
+import java.io.*;
+import java.util.Random;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ChecksumFileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * This class tests if FSInputChecker works correctly.
+ */
+public class TestFSInputChecker extends TestCase {
+  static final long seed = 0xDEADBEEFL;
+  static final int BYTES_PER_SUM = 10;
+  static final int BLOCK_SIZE = 2*BYTES_PER_SUM;
+  static final int HALF_CHUNK_SIZE = BYTES_PER_SUM/2;
+  static final int FILE_SIZE = 2*BLOCK_SIZE-1;
+  static final short NUM_OF_DATANODES = 2;
+  byte[] expected = new byte[FILE_SIZE];
+  byte[] actual;
+  FSDataInputStream stm;
+
+  /* create a file */
+  private void writeFile(FileSystem fileSys, Path name) throws IOException {
+    // create and write a file that spans two blocks of data
+    FSDataOutputStream stm = fileSys.create(name, true, 
+                     fileSys.getConf().getInt("io.file.buffer.size", 4096),
+                     NUM_OF_DATANODES, BLOCK_SIZE);
+    stm.write(expected);
+    stm.close();
+  }
+  
+  /* validate data */
+  private void checkAndEraseData(byte[] actual, int from, byte[] expected, 
+      String message) throws Exception {
+    for (int idx = 0; idx < actual.length; idx++) {
+      assertEquals(message+" byte "+(from+idx)+" differs. expected "+
+                        expected[from+idx]+" actual "+actual[idx],
+                        expected[from+idx], actual[idx]);
+      actual[idx] = 0;
+    }
+  }
+  
+  /* test read and getPos */
+  private void checkReadAndGetPos() throws Exception {
+    actual = new byte[FILE_SIZE];
+    // test reads that do not cross checksum boundary
+    stm.seek(0);
+    int offset;
+    for(offset=0; offset<BLOCK_SIZE+BYTES_PER_SUM;
+                  offset += BYTES_PER_SUM ) {
+      assertEquals(stm.getPos(), offset);
+      stm.readFully(actual, offset, BYTES_PER_SUM);
+    }
+    stm.readFully(actual, offset, FILE_SIZE-BLOCK_SIZE-BYTES_PER_SUM);
+    assertEquals(stm.getPos(), FILE_SIZE);
+    checkAndEraseData(actual, 0, expected, "Read Sanity Test");
+    
+    // test reads that cross checksum boundary
+    stm.seek(0L);
+    assertEquals(stm.getPos(), 0L);
+    stm.readFully(actual, 0, HALF_CHUNK_SIZE);
+    assertEquals(stm.getPos(), HALF_CHUNK_SIZE);
+    stm.readFully(actual, HALF_CHUNK_SIZE, BLOCK_SIZE-HALF_CHUNK_SIZE);
+    assertEquals(stm.getPos(), BLOCK_SIZE);
+    stm.readFully(actual, BLOCK_SIZE, BYTES_PER_SUM+HALF_CHUNK_SIZE);
+    assertEquals(stm.getPos(), BLOCK_SIZE+BYTES_PER_SUM+HALF_CHUNK_SIZE);
+    stm.readFully(actual, 2*BLOCK_SIZE-HALF_CHUNK_SIZE, 
+        FILE_SIZE-(2*BLOCK_SIZE-HALF_CHUNK_SIZE));
+    assertEquals(stm.getPos(), FILE_SIZE);
+    checkAndEraseData(actual, 0, expected, "Read Sanity Test");
+    
+    // test reads that cross block boundaries
+    stm.seek(0L);
+    stm.readFully(actual, 0, BYTES_PER_SUM+HALF_CHUNK_SIZE);
+    assertEquals(stm.getPos(), BYTES_PER_SUM+HALF_CHUNK_SIZE);
+    stm.readFully(actual, BYTES_PER_SUM+HALF_CHUNK_SIZE, BYTES_PER_SUM);
+    assertEquals(stm.getPos(), BLOCK_SIZE+HALF_CHUNK_SIZE);
+    stm.readFully(actual, BLOCK_SIZE+HALF_CHUNK_SIZE,
+        FILE_SIZE-BLOCK_SIZE-HALF_CHUNK_SIZE);
+    assertEquals(stm.getPos(), FILE_SIZE);
+    checkAndEraseData(actual, 0, expected, "Read Sanity Test");
+  }
+  
+  /* test if one seek is correct */
+  private void testSeek1(int offset) 
+  throws Exception {
+    stm.seek(offset);
+    assertEquals(offset, stm.getPos());
+    stm.readFully(actual);
+    checkAndEraseData(actual, offset, expected, "Read Sanity Test");
+  }
+
+  /* test seek() */
+  private void checkSeek() throws Exception {
+    actual = new byte[HALF_CHUNK_SIZE];
+    
+    // test seeks to checksum boundary
+    testSeek1(0);
+    testSeek1(BYTES_PER_SUM);
+    testSeek1(BLOCK_SIZE);
+    
+    // test seek to non-checksum-boundary pos
+    testSeek1(BLOCK_SIZE+HALF_CHUNK_SIZE);
+    testSeek1(HALF_CHUNK_SIZE);
+    
+    // test seek to a position within the same checksum chunk
+    testSeek1(HALF_CHUNK_SIZE/2);
+    testSeek1(HALF_CHUNK_SIZE*3/2);
+    
+    // test end of file
+    actual = new byte[1];
+    testSeek1(FILE_SIZE-1);
+    
+    String errMsg = null;
+    try {
+      stm.seek(FILE_SIZE);
+    } catch (IOException e) {
+      errMsg = e.getMessage();
+    }
+    assertTrue(errMsg==null);
+  }
+
+  /* test if one skip is correct */
+  private void testSkip1(int skippedBytes) 
+  throws Exception {
+    long oldPos = stm.getPos();
+    long nSkipped = stm.skip(skippedBytes);
+    long newPos = oldPos+nSkipped;
+    assertEquals(stm.getPos(), newPos);
+    stm.readFully(actual);
+    checkAndEraseData(actual, (int)newPos, expected, "Read Sanity Test");
+  }
+
+  /* test skip() */
+  private void checkSkip() throws Exception {
+    actual = new byte[HALF_CHUNK_SIZE];
+    
+    // test skip to a checksum boundary
+    stm.seek(0);
+    testSkip1(BYTES_PER_SUM);
+    testSkip1(HALF_CHUNK_SIZE);
+    testSkip1(HALF_CHUNK_SIZE);
+    
+    // test skip to non-checksum-boundary pos
+    stm.seek(0);
+    testSkip1(HALF_CHUNK_SIZE+1);
+    testSkip1(BYTES_PER_SUM);
+    testSkip1(HALF_CHUNK_SIZE);
+    
+    // test skip to a position within the same checksum chunk
+    stm.seek(0);
+    testSkip1(1);
+    testSkip1(1);
+    
+    // test skip to end of file
+    stm.seek(0);
+    actual = new byte[1];
+    testSkip1(FILE_SIZE-1);
+    
+    stm.seek(0);
+    assertEquals(stm.skip(FILE_SIZE), FILE_SIZE);
+  }
+
+  private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
+    assertTrue(fileSys.exists(name));
+    fileSys.delete(name);
+    assertTrue(!fileSys.exists(name));
+  }
+  
+  /**
+   * Tests read/seek/getPos/skip operations for the input stream.
+   */
+  private void testChecker(ChecksumFileSystem fileSys, boolean readCS)
+  throws Exception {
+    Path file = new Path("try.dat");
+    if (readCS) {
+      writeFile(fileSys, file);
+    } else {
+      writeFile(fileSys.getRawFileSystem(), file);
+    }
+    stm = fileSys.open(file);
+    checkReadAndGetPos();
+    checkSeek();
+    checkSkip();
+    // check that mark is not supported
+    assertFalse(stm.markSupported());
+    stm.close();
+    cleanupFile(fileSys, file);
+  }
+  
+  public void testFSInputChecker() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setLong("dfs.block.size", BLOCK_SIZE);
+    conf.setInt("io.bytes.per.checksum", BYTES_PER_SUM);
+    Random rand = new Random(seed);
+    rand.nextBytes(expected);
+
+    // test DFS
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
+    ChecksumFileSystem fileSys = (ChecksumFileSystem)cluster.getFileSystem();
+    try {
+      testChecker(fileSys, true);
+      testChecker(fileSys, false);
+    } finally {
+      fileSys.close();
+      cluster.shutdown();
+    }
+    
+    
+    // test Local FS
+    fileSys = FileSystem.getLocal(conf);
+    try {
+      testChecker(fileSys, true);
+      testChecker(fileSys, false);
+    } finally {
+      fileSys.close();
+    }
+  }
+}

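The geometry the test above relies on: with io.bytes.per.checksum set to 10, every offset falls inside a 10-byte checksum chunk, so each read either stays within one chunk or crosses into the next. A minimal sketch of the per-chunk model being exercised, assuming one CRC32 value stored per chunk (illustrative only; FSInputChecker's actual internals are not shown in this diff):

import java.io.IOException;
import java.util.zip.CRC32;

// Sketch of per-chunk validation: data is checked one bytesPerSum-sized
// chunk at a time, so a seek only needs to re-read from the chunk start.
class ChunkCheck {
  // First byte of the chunk containing pos.
  static long chunkStart(long pos, int bytesPerSum) {
    return pos - (pos % bytesPerSum);
  }

  // Verify one chunk against its stored CRC32 value.
  static void verifyChunk(byte[] chunk, int len, long storedSum)
      throws IOException {
    CRC32 crc = new CRC32();
    crc.update(chunk, 0, len);
    if (crc.getValue() != storedSum) {
      throw new IOException("Checksum error");
    }
  }
}
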
+ 131 - 0
src/test/org/apache/hadoop/dfs/TestFSOutputSummer.java

@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import junit.framework.TestCase;
+import java.io.*;
+import java.util.Random;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * This class tests if FSOutputSummer works correctly.
+ */
+public class TestFSOutputSummer extends TestCase {
+  private static final long seed = 0xDEADBEEFL;
+  private static final int BYTES_PER_CHECKSUM = 10;
+  private static final int BLOCK_SIZE = 2*BYTES_PER_CHECKSUM;
+  private static final int HALF_CHUNK_SIZE = BYTES_PER_CHECKSUM/2;
+  private static final int FILE_SIZE = 2*BLOCK_SIZE-1;
+  private static final short NUM_OF_DATANODES = 2;
+  private byte[] expected = new byte[FILE_SIZE];
+  private byte[] actual = new byte[FILE_SIZE];
+  private FileSystem fileSys;
+
+  /* create a file, write all data at once */
+  private void writeFile1(Path name) throws Exception {
+    FSDataOutputStream stm = fileSys.create(name, true, 
+               fileSys.getConf().getInt("io.file.buffer.size", 4096),
+               NUM_OF_DATANODES, BLOCK_SIZE);
+    stm.write(expected);
+    stm.close();
+    checkFile(name);
+    cleanupFile(name);
+  }
+  
+  /* create a file, write data chunk by chunk */
+  private void writeFile2(Path name) throws Exception {
+    FSDataOutputStream stm = fileSys.create(name, true, 
+               fileSys.getConf().getInt("io.file.buffer.size", 4096),
+               NUM_OF_DATANODES, BLOCK_SIZE);
+    int i=0;
+    for( ;i<FILE_SIZE-BYTES_PER_CHECKSUM; i+=BYTES_PER_CHECKSUM) {
+      stm.write(expected, i, BYTES_PER_CHECKSUM);
+    }
+    stm.write(expected, i, FILE_SIZE-3*BYTES_PER_CHECKSUM);
+    stm.close();
+    checkFile(name);
+    cleanupFile(name);
+  }
+  
+  /* create a file, write variable-sized amounts of data */
+  private void writeFile3(Path name) throws Exception {
+    FSDataOutputStream stm = fileSys.create(name, true, 
+        fileSys.getConf().getInt("io.file.buffer.size", 4096),
+        NUM_OF_DATANODES, BLOCK_SIZE);
+    stm.write(expected, 0, HALF_CHUNK_SIZE);
+    stm.write(expected, HALF_CHUNK_SIZE, BYTES_PER_CHECKSUM+2);
+    stm.write(expected, HALF_CHUNK_SIZE+BYTES_PER_CHECKSUM+2, 2);
+    stm.write(expected, HALF_CHUNK_SIZE+BYTES_PER_CHECKSUM+4, HALF_CHUNK_SIZE);
+    stm.write(expected, BLOCK_SIZE+4, BYTES_PER_CHECKSUM-4);
+    stm.write(expected, BLOCK_SIZE+BYTES_PER_CHECKSUM, 
+        FILE_SIZE-3*BYTES_PER_CHECKSUM);
+    stm.close();
+    checkFile(name);
+    cleanupFile(name);
+  }
+  private void checkAndEraseData(byte[] actual, int from, byte[] expected,
+      String message) throws Exception {
+    for (int idx = 0; idx < actual.length; idx++) {
+      assertEquals(message+" byte "+(from+idx)+" differs. expected "+
+                        expected[from+idx]+" actual "+actual[idx],
+                        expected[from+idx], actual[idx]);
+      actual[idx] = 0;
+    }
+  }
+  
+  private void checkFile(Path name) throws Exception {
+    FSDataInputStream stm = fileSys.open(name);
+    // do a sanity check. Read the file
+    stm.readFully(0, actual);
+    checkAndEraseData(actual, 0, expected, "Read Sanity Test");
+    stm.close();
+  }
+
+  private void cleanupFile(Path name) throws IOException {
+    assertTrue(fileSys.exists(name));
+    fileSys.delete(name);
+    assertTrue(!fileSys.exists(name));
+  }
+  
+  /**
+   * Tests the write operation for an output stream in DFS.
+   */
+  public void testFSOutputSummer() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setLong("dfs.block.size", BLOCK_SIZE);
+    conf.setInt("io.bytes.per.checksum", BYTES_PER_CHECKSUM);
+    MiniDFSCluster cluster = new MiniDFSCluster(
+        conf, NUM_OF_DATANODES, true, null);
+    fileSys = cluster.getFileSystem();
+    try {
+      Path file = new Path("try.dat");
+      Random rand = new Random(seed);
+      rand.nextBytes(expected);
+      writeFile1(file);
+      writeFile2(file);
+      writeFile3(file);
+    } finally {
+      fileSys.close();
+      cluster.shutdown();
+    }
+  }
+}
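
The three write patterns above (all at once, chunk-aligned, and variable-sized) probe the same invariant: one checksum is emitted per full bytesPerChecksum chunk, no matter how the writes are split. A minimal sketch of that summing behaviour using CRC32 (illustrative only; the actual FSOutputSummer code is not shown in this diff):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.CRC32;

// Sketch: emit one CRC32 value per full bytesPerChecksum chunk,
// independent of write() call boundaries.
class SummingOutputStream extends OutputStream {
  private final OutputStream dataOut;
  private final ByteArrayOutputStream sumsOut = new ByteArrayOutputStream();
  private final byte[] chunk;
  private int filled = 0;

  SummingOutputStream(OutputStream dataOut, int bytesPerChecksum) {
    this.dataOut = dataOut;
    this.chunk = new byte[bytesPerChecksum];
  }

  public void write(int b) throws IOException {
    chunk[filled++] = (byte) b;
    if (filled == chunk.length) {
      flushChunk();                       // a full chunk: sum and forward it
    }
  }

  private void flushChunk() throws IOException {
    CRC32 crc = new CRC32();
    crc.update(chunk, 0, filled);
    dataOut.write(chunk, 0, filled);      // data goes to the data stream
    long sum = crc.getValue();
    for (int i = 3; i >= 0; i--) {        // 4 CRC32 bytes, big-endian
      sumsOut.write((int) (sum >>> (8 * i)));
    }
    filled = 0;
  }

  public void close() throws IOException {
    if (filled > 0) {
      flushChunk();                       // partial trailing chunk
    }
    dataOut.close();
  }

  byte[] checksums() {
    return sumsOut.toByteArray();
  }
}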