Explorar o código

HDFS-1057 Concurrent readers hit ChecksumExceptions if following a writer to very end of file. Contributed by Sam Rash.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/trunk@959324 13f79535-47bb-0310-9956-ffa450edef68
Hairong Kuang %!s(int64=15) %!d(string=hai) anos
pai
achega
e8714ec7b8

+ 3 - 0
CHANGES.txt

@@ -1004,6 +1004,9 @@ Release 0.21.0 - Unreleased
 
     HDFS-1256. libhdfs is missing from the tarball. (tomwhite)
 
+    HDFS_1057. Concurrent readers hit ChecksumExceptions if following a
+    writer to very end of file. (sam rash via hairong)
+
 Release 0.20.3 - Unreleased
 
   IMPROVEMENTS

+ 21 - 1
src/java/org/apache/hadoop/hdfs/DFSInputStream.java

@@ -36,7 +36,9 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.StringUtils;
@@ -139,6 +141,8 @@ class DFSInputStream extends FSInputStream {
     if (locatedblock == null || locatedblock.getLocations().length == 0) {
       return 0;
     }
+    int replicaNotFoundCount = locatedblock.getLocations().length;
+    
     for(DatanodeInfo datanode : locatedblock.getLocations()) {
       try {
         final ClientDatanodeProtocol cdp = DFSClient.createClientDatanodeProtocolProxy(
@@ -149,12 +153,28 @@ class DFSInputStream extends FSInputStream {
         }
       }
       catch(IOException ioe) {
+        if (ioe instanceof RemoteException &&
+          (((RemoteException) ioe).unwrapRemoteException() instanceof
+            ReplicaNotFoundException)) {
+          // special case : replica might not be on the DN, treat as 0 length
+          replicaNotFoundCount--;
+        }
+        
         if (DFSClient.LOG.isDebugEnabled()) {
-          DFSClient.LOG.debug("Faild to getReplicaVisibleLength from datanode "
+          DFSClient.LOG.debug("Failed to getReplicaVisibleLength from datanode "
               + datanode + " for block " + locatedblock.getBlock(), ioe);
         }
       }
     }
+
+    // Namenode told us about these locations, but none know about the replica
+    // means that we hit the race between pipeline creation start and end.
+    // we require all 3 because some other exception could have happened
+    // on a DN that has it.  we want to report that error
+    if (replicaNotFoundCount == 0) {
+      return 0;
+    }
+
     throw new IOException("Cannot obtain block length for " + locatedblock);
   }
   

+ 17 - 2
src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

@@ -28,6 +28,7 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.zip.Checksum;
 
@@ -509,6 +510,8 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
         verifyChunks(pktBuf, dataOff, len, pktBuf, checksumOff);
       }
 
+      byte[] lastChunkChecksum;
+      
       try {
         long onDiskLen = replicaInfo.getBytesOnDisk();
         if (onDiskLen<offsetInBlock) {
@@ -546,16 +549,28 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
             }
             partialCrc.update(pktBuf, startByteToDisk, numBytesToDisk);
             byte[] buf = FSOutputSummer.convertToByteStream(partialCrc, checksumSize);
+            lastChunkChecksum = Arrays.copyOfRange(
+              buf, buf.length - checksumSize, buf.length
+            );
             checksumOut.write(buf);
             LOG.debug("Writing out partial crc for data len " + len);
             partialCrc = null;
           } else {
+            lastChunkChecksum = Arrays.copyOfRange(
+              pktBuf, 
+              checksumOff + checksumLen - checksumSize, 
+              checksumOff + checksumLen
+            );
             checksumOut.write(pktBuf, checksumOff, checksumLen);
           }
-          replicaInfo.setBytesOnDisk(offsetInBlock);
-          datanode.myMetrics.bytesWritten.inc(len);
           /// flush entire packet
           flush();
+          
+          replicaInfo.setLastChecksumAndDataLen(
+            offsetInBlock, lastChunkChecksum
+          );
+          
+          datanode.myMetrics.bytesWritten.inc(len);
         }
       } catch (IOException iex) {
         datanode.checkDiskError(iex);

+ 51 - 9
src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java

@@ -76,6 +76,7 @@ class BlockSender implements java.io.Closeable, FSConstants {
    * not sure if there will be much more improvement.
    */
   private static final int MIN_BUFFER_WITH_TRANSFERTO = 64*1024;
+  private volatile ChunkChecksum lastChunkChecksum = null;
 
   
   BlockSender(Block block, long startOffset, long length,
@@ -98,6 +99,32 @@ class BlockSender implements java.io.Closeable, FSConstants {
         }
         this.replicaVisibleLength = replica.getVisibleLength();
       }
+      long minEndOffset = startOffset + length;
+      // if this is a write in progress
+      ChunkChecksum chunkChecksum = null;
+      if (replica instanceof ReplicaBeingWritten) {
+        for (int i = 0; i < 30 && replica.getBytesOnDisk() < minEndOffset; i++) {
+          try {
+            Thread.sleep(100);
+          } catch (InterruptedException ie) {
+            throw new IOException(ie);
+          }
+        }
+
+        long currentBytesOnDisk = replica.getBytesOnDisk();
+        
+        if (currentBytesOnDisk < minEndOffset) {
+          throw new IOException(String.format(
+            "need %d bytes, but only %d bytes available",
+            minEndOffset,
+            currentBytesOnDisk
+          ));
+        }
+
+        ReplicaInPipeline rip = (ReplicaInPipeline) replica;
+        chunkChecksum = rip.getLastChecksumAndDataLen();
+      }
+
       if (replica.getGenerationStamp() < block.getGenerationStamp()) {
         throw new IOException(
             "replica.getGenerationStamp() < block.getGenerationStamp(), block="
@@ -154,7 +181,14 @@ class BlockSender implements java.io.Closeable, FSConstants {
         length = replicaVisibleLength;
       }
 
-      endOffset = replicaVisibleLength;
+      // end is either last byte on disk or the length for which we have a 
+      // checksum
+      if (chunkChecksum != null) {
+        endOffset = chunkChecksum.getDataLength();
+      } else {
+        endOffset = replica.getBytesOnDisk();
+      }
+      
       if (startOffset < 0 || startOffset > endOffset
           || (length + startOffset) > endOffset) {
         String msg = " Offset " + startOffset + " and length " + length
@@ -172,7 +206,12 @@ class BlockSender implements java.io.Closeable, FSConstants {
           tmpLen += (bytesPerChecksum - tmpLen % bytesPerChecksum);
         }
         if (tmpLen < endOffset) {
+          // will use on-disk checksum here since the end is a stable chunk
           endOffset = tmpLen;
+        } else if (chunkChecksum != null) {
+          //in last chunk which is changing. flag that we need to use in-memory 
+          // checksum 
+          this.lastChunkChecksum = chunkChecksum;
         }
       }
 
@@ -187,14 +226,6 @@ class BlockSender implements java.io.Closeable, FSConstants {
       }
       seqno = 0;
 
-      //sleep a few times if getBytesOnDisk() < visible length
-      for(int i = 0; i < 30 && replica.getBytesOnDisk() < replicaVisibleLength; i++) {
-        try {
-          Thread.sleep(100);
-        } catch (InterruptedException ie) {
-          throw new IOException(ie);
-        }
-      }
       if (DataNode.LOG.isDebugEnabled()) {
         DataNode.LOG.debug("replica=" + replica);
       }
@@ -272,6 +303,7 @@ class BlockSender implements java.io.Closeable, FSConstants {
                        bytesPerChecksum*maxChunks);
     int numChunks = (len + bytesPerChecksum - 1)/bytesPerChecksum;
     int packetLen = len + numChunks*checksumSize + 4;
+    boolean lastDataPacket = offset + len == endOffset && len > 0;
     pkt.clear();
     
     // write packet header
@@ -304,6 +336,16 @@ class BlockSender implements java.io.Closeable, FSConstants {
           throw e;
         }
       }
+
+      // write in progress that we need to use to get last checksum
+      if (lastDataPacket && lastChunkChecksum != null) {
+        int start = checksumOff + checksumLen - checksumSize;
+        byte[] updatedChecksum = lastChunkChecksum.getChecksum();
+        
+        if (updatedChecksum != null) {
+          System.arraycopy(updatedChecksum, 0, buf, start, checksumSize);
+        }
+      }
     }
     
     int dataOff = checksumOff + checksumLen;

+ 46 - 0
src/java/org/apache/hadoop/hdfs/server/datanode/ChunkChecksum.java

@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+/**
+ * holder class that holds checksum bytes and the length in a block at which
+ * the checksum bytes end
+ * 
+ * ex: length = 1023 and checksum is 4 bytes which is for 512 bytes, then
+ *     the checksum applies for the last chunk, or bytes 512 - 1023
+ */
+
+class ChunkChecksum {
+  private final long dataLength;
+  // can be null if not available
+  private final byte[] checksum;
+
+  ChunkChecksum(long dataLength, byte[] checksum) {
+    this.dataLength = dataLength;
+    this.checksum = checksum;
+  }
+
+  public long getDataLength() {
+    return dataLength;
+  }
+
+  public byte[] getChecksum() {
+    return checksum;
+  }
+}

+ 10 - 3
src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java

@@ -41,6 +41,7 @@ class ReplicaInPipeline extends ReplicaInfo
                         implements ReplicaInPipelineInterface {
   private long bytesAcked;
   private long bytesOnDisk;
+  private byte[] lastChecksum;  
   private Thread writer;
   
   /**
@@ -122,11 +123,17 @@ class ReplicaInPipeline extends ReplicaInfo
     return bytesOnDisk;
   }
   
-  @Override //ReplicaInPipelineInterface
-  public void setBytesOnDisk(long bytesOnDisk) {
-    this.bytesOnDisk = bytesOnDisk;
+  @Override // ReplicaInPipelineInterface
+  public synchronized void setLastChecksumAndDataLen(long dataLength, byte[] lastChecksum) {
+    this.bytesOnDisk = dataLength;
+    this.lastChecksum = lastChecksum;
   }
   
+  @Override // ReplicaInPipelineInterface
+  public synchronized ChunkChecksum getLastChecksumAndDataLen() {
+    return new ChunkChecksum(getBytesOnDisk(), lastChecksum);
+  }
+
   /**
    * Set the thread that is writing to this replica
    * @param writer a thread writing to this replica

+ 10 - 3
src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java

@@ -44,10 +44,17 @@ interface ReplicaInPipelineInterface extends Replica {
   void setBytesAcked(long bytesAcked);
   
   /**
-   * Set the number of bytes on disk
-   * @param bytesOnDisk number of bytes on disk
+   * store the checksum for the last chunk along with the data length
+   * @param dataLength number of bytes on disk
+   * @param lastChecksum - checksum bytes for the last chunk
    */
-  void setBytesOnDisk(long bytesOnDisk);
+  public void setLastChecksumAndDataLen(long dataLength, byte[] lastChecksum);
+  
+  /**
+   * gets the last chunk checksum and the length of the block corresponding
+   * to that checksum
+   */
+  public ChunkChecksum getLastChecksumAndDataLen();
   
   /**
    * Create output streams for writing to this replica, 

+ 1 - 1
src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java

@@ -27,7 +27,7 @@ import org.apache.hadoop.hdfs.protocol.Block;
  * Exception indicating that DataNode does not have a replica
  * that matches the target block.  
  */
-class ReplicaNotFoundException extends IOException {
+public class ReplicaNotFoundException extends IOException {
   private static final long serialVersionUID = 1L;
   final static String NON_RBW_REPLICA = "Cannot recover a non-RBW replica ";
   final static String UNFINALIZED_REPLICA = 

+ 10 - 0
src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java

@@ -361,4 +361,14 @@ public class DFSTestUtil {
       }
     });
   }
+
+  public static byte[] generateSequentialBytes(int start, int length) {
+    byte[] result = new byte[length];
+
+    for (int i = 0; i < length; i++) {
+      result[i] = (byte) ((start + i) % 127);
+    }
+
+    return result;
+  }
 }

+ 442 - 0
src/test/hdfs/org/apache/hadoop/hdfs/TestFileConcurrentReader.java

@@ -0,0 +1,442 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+
+/**
+ * This class tests the cases of a concurrent reads/writes to a file;
+ * ie, one writer and one or more readers can see unfinsihed blocks
+ */
+public class TestFileConcurrentReader extends junit.framework.TestCase {
+
+  private enum SyncType {
+    SYNC,
+    APPEND,
+  }
+
+
+  private static final Logger LOG =
+    Logger.getLogger(TestFileConcurrentReader.class);
+
+  {
+    ((Log4JLogger) LeaseManager.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger) FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger) DFSClient.LOG).getLogger().setLevel(Level.ALL);
+  }
+
+  static final long seed = 0xDEADBEEFL;
+  static final int blockSize = 8192;
+  private static final int DEFAULT_WRITE_SIZE = 1024 + 1;
+  private static final int SMALL_WRITE_SIZE = 61;
+  private Configuration conf;
+  private MiniDFSCluster cluster;
+  private FileSystem fileSystem;
+
+
+  @Override
+  protected void setUp() throws IOException {
+    conf = new Configuration();
+    init(conf);
+  }
+
+  @Override
+  protected void tearDown() throws Exception {
+    cluster.shutdown();
+    cluster = null;
+    
+    super.tearDown();
+  }
+
+  private void init(Configuration conf) throws IOException {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+    cluster = new MiniDFSCluster(conf, 1, true, null);
+    cluster.waitClusterUp();
+    fileSystem = cluster.getFileSystem();
+  }
+
+  private void writeFileAndSync(FSDataOutputStream stm, int size)
+    throws IOException {
+    byte[] buffer = DFSTestUtil.generateSequentialBytes(0, size);
+    stm.write(buffer, 0, size);
+    stm.hflush();
+  }
+
+  private void checkCanRead(FileSystem fileSys, Path path, int numBytes)
+    throws IOException {
+    waitForBlocks(fileSys, path);
+    assertBytesAvailable(fileSys, path, numBytes);
+  }
+
+  // make sure bytes are available and match expected
+  private void assertBytesAvailable(
+    FileSystem fileSystem,
+    Path path,
+    int numBytes
+  ) throws IOException {
+    byte[] buffer = new byte[numBytes];
+    FSDataInputStream inputStream = fileSystem.open(path);
+    IOUtils.readFully(inputStream, buffer, 0, numBytes);
+    inputStream.close();
+
+    assertTrue(
+      "unable to validate bytes",
+      validateSequentialBytes(buffer, 0, numBytes)
+    );
+  }
+
+  private void waitForBlocks(FileSystem fileSys, Path name)
+    throws IOException {
+    // wait until we have at least one block in the file to read.
+    boolean done = false;
+
+    while (!done) {
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+      }
+      done = true;
+      BlockLocation[] locations = fileSys.getFileBlockLocations(
+        fileSys.getFileStatus(name), 0, blockSize);
+      if (locations.length < 1) {
+        done = false;
+        continue;
+      }
+    }
+  }
+
+  /**
+   * Test that that writes to an incomplete block are available to a reader
+   */
+  public void testUnfinishedBlockRead()
+    throws IOException {
+    // create a new file in the root, write data, do no close
+    Path file1 = new Path("/unfinished-block");
+    FSDataOutputStream stm = TestFileCreation.createFile(fileSystem, file1, 1);
+
+    // write partial block and sync
+    int partialBlockSize = blockSize / 2;
+    writeFileAndSync(stm, partialBlockSize);
+
+    // Make sure a client can read it before it is closed
+    checkCanRead(fileSystem, file1, partialBlockSize);
+
+    stm.close();
+  }
+
+  /**
+   * test case: if the BlockSender decides there is only one packet to send,
+   * the previous computation of the pktSize based on transferToAllowed
+   * would result in too small a buffer to do the buffer-copy needed
+   * for partial chunks.
+   */
+  public void testUnfinishedBlockPacketBufferOverrun() throws IOException {
+    // check that / exists
+    Path path = new Path("/");
+    System.out.println("Path : \"" + path.toString() + "\"");
+
+    // create a new file in the root, write data, do no close
+    Path file1 = new Path("/unfinished-block");
+    final FSDataOutputStream stm =
+      TestFileCreation.createFile(fileSystem, file1, 1);
+
+    // write partial block and sync
+    final int bytesPerChecksum = conf.getInt("io.bytes.per.checksum", 512);
+    final int partialBlockSize = bytesPerChecksum - 1;
+
+    writeFileAndSync(stm, partialBlockSize);
+
+    // Make sure a client can read it before it is closed
+    checkCanRead(fileSystem, file1, partialBlockSize);
+
+    stm.close();
+  }
+
+  // use a small block size and a large write so that DN is busy creating
+  // new blocks.  This makes it almost 100% sure we can reproduce
+  // case of client getting a DN that hasn't yet created the blocks
+  public void testImmediateReadOfNewFile()
+    throws IOException {
+    final int blockSize = 64 * 1024;
+    final int writeSize = 10 * blockSize;
+    Configuration conf = new Configuration();
+    
+    conf.setLong("dfs.block.size", blockSize);
+    init(conf);
+
+    final int requiredSuccessfulOpens = 100;
+    final Path file = new Path("/file1");
+    final AtomicBoolean openerDone = new AtomicBoolean(false);
+    final AtomicReference<String> errorMessage = new AtomicReference<String>();
+    final FSDataOutputStream out = fileSystem.create(file);
+    
+    final Thread writer = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          while (!openerDone.get()) {
+            out.write(DFSTestUtil.generateSequentialBytes(0, writeSize));
+            out.hflush();
+          }
+        } catch (IOException e) {
+          LOG.warn("error in writer", e);
+        } finally {
+          try {
+            out.close();
+          } catch (IOException e) {
+            LOG.error("unable to close file");
+          }
+        }
+      }
+    });
+    
+    Thread opener = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          for (int i = 0; i < requiredSuccessfulOpens; i++) {
+            fileSystem.open(file).close();
+          }
+          openerDone.set(true);
+        } catch (IOException e) {
+          openerDone.set(true);
+          errorMessage.set(String.format(
+            "got exception : %s",
+            StringUtils.stringifyException(e)
+          ));
+        } catch (Exception e) {
+          openerDone.set(true);
+          errorMessage.set(String.format(
+            "got exception : %s", 
+            StringUtils.stringifyException(e)
+          ));
+          writer.interrupt();
+          fail("here");
+        }
+      }
+    });
+    
+    writer.start();
+    opener.start();
+
+    try {
+      writer.join();
+      opener.join();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+    
+    assertNull(errorMessage.get(), errorMessage.get());
+  }
+
+  // for some reason, using tranferTo evokes the race condition more often
+  // so test separately
+  public void testUnfinishedBlockCRCErrorTransferTo() throws IOException {
+    runTestUnfinishedBlockCRCError(true, SyncType.SYNC, DEFAULT_WRITE_SIZE);
+  }
+
+  public void testUnfinishedBlockCRCErrorTransferToVerySmallWrite()
+    throws IOException {
+    runTestUnfinishedBlockCRCError(true, SyncType.SYNC, SMALL_WRITE_SIZE);
+  }
+
+  // fails due to issue w/append, disable 
+  public void _testUnfinishedBlockCRCErrorTransferToAppend()
+    throws IOException {
+    runTestUnfinishedBlockCRCError(true, SyncType.APPEND, DEFAULT_WRITE_SIZE);
+  }
+
+  public void testUnfinishedBlockCRCErrorNormalTransfer() throws IOException {
+    runTestUnfinishedBlockCRCError(false, SyncType.SYNC, DEFAULT_WRITE_SIZE);
+  }
+
+  public void testUnfinishedBlockCRCErrorNormalTransferVerySmallWrite()
+    throws IOException {
+    runTestUnfinishedBlockCRCError(false, SyncType.SYNC, SMALL_WRITE_SIZE);
+  }
+
+  // fails due to issue w/append, disable 
+  public void _testUnfinishedBlockCRCErrorNormalTransferAppend()
+    throws IOException {
+    runTestUnfinishedBlockCRCError(false, SyncType.APPEND, DEFAULT_WRITE_SIZE);
+  }
+
+  private void runTestUnfinishedBlockCRCError(
+    final boolean transferToAllowed, SyncType syncType, int writeSize
+  ) throws IOException {
+    runTestUnfinishedBlockCRCError(
+      transferToAllowed, syncType, writeSize, new Configuration()
+    );
+  }
+
+  private void runTestUnfinishedBlockCRCError(
+    final boolean transferToAllowed,
+    final SyncType syncType,
+    final int writeSize,
+    Configuration conf
+  ) throws IOException {
+    conf.setBoolean("dfs.support.append", syncType == SyncType.APPEND);
+    conf.setBoolean("dfs.datanode.transferTo.allowed", transferToAllowed);
+    init(conf);
+
+    final Path file = new Path("/block-being-written-to");
+    final int numWrites = 2000;
+    final AtomicBoolean writerDone = new AtomicBoolean(false);
+    final AtomicBoolean writerStarted = new AtomicBoolean(false);
+    final AtomicBoolean error = new AtomicBoolean(false);
+    final FSDataOutputStream initialOutputStream = fileSystem.create(file);
+    final Thread writer = new Thread(new Runnable() {
+      private FSDataOutputStream outputStream = initialOutputStream;
+
+      @Override
+      public void run() {
+        try {
+          for (int i = 0; !error.get() && i < numWrites; i++) {
+            try {
+              final byte[] writeBuf =
+                DFSTestUtil.generateSequentialBytes(i * writeSize, writeSize);
+              outputStream.write(writeBuf);
+              if (syncType == SyncType.SYNC) {
+                outputStream.hflush();
+              } else { // append
+                outputStream.close();
+                outputStream = fileSystem.append(file);
+              }
+              writerStarted.set(true);
+            } catch (IOException e) {
+              error.set(true);
+              LOG.error("error writing to file", e);
+            }
+          }
+
+          writerDone.set(true);
+          outputStream.close();
+        } catch (Exception e) {
+          LOG.error("error in writer", e);
+
+          throw new RuntimeException(e);
+        }
+      }
+    });
+    Thread tailer = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          long startPos = 0;
+          while (!writerDone.get() && !error.get()) {
+            if (writerStarted.get()) {
+              try {
+                startPos = tailFile(file, startPos);
+              } catch (IOException e) {
+                LOG.error(String.format("error tailing file %s", file), e);
+
+                throw new RuntimeException(e);
+              }
+            }
+          }
+        } catch (RuntimeException e) {
+          if (e.getCause() instanceof ChecksumException) {
+            error.set(true);
+          }
+
+          writer.interrupt();
+          LOG.error("error in tailer", e);
+          throw e;
+        }
+      }
+    });
+
+    writer.start();
+    tailer.start();
+
+    try {
+      writer.join();
+      tailer.join();
+
+      assertFalse(
+        "error occurred, see log above", error.get()
+      );
+    } catch (InterruptedException e) {
+      LOG.info("interrupted waiting for writer or tailer to complete");
+
+      Thread.currentThread().interrupt();
+    }
+    initialOutputStream.close();
+  }
+
+  private boolean validateSequentialBytes(byte[] buf, int startPos, int len) {
+    for (int i = 0; i < len; i++) {
+      int expected = (i + startPos) % 127;
+
+      if (buf[i] % 127 != expected) {
+        LOG.error(String.format("at position [%d], got [%d] and expected [%d]",
+          startPos, buf[i], expected));
+
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  private long tailFile(Path file, long startPos) throws IOException {
+    long numRead = 0;
+    FSDataInputStream inputStream = fileSystem.open(file);
+    inputStream.seek(startPos);
+
+    int len = 4 * 1024;
+    byte[] buf = new byte[len];
+    int read;
+    while ((read = inputStream.read(buf)) > -1) {
+      LOG.info(String.format("read %d bytes", read));
+
+      if (!validateSequentialBytes(buf, (int) (startPos + numRead), read)) {
+        LOG.error(String.format("invalid bytes: [%s]\n", Arrays.toString(buf)));
+        throw new ChecksumException(
+          String.format("unable to validate bytes"),
+          startPos
+        );
+      }
+
+      numRead += read;
+    }
+
+    inputStream.close();
+    return numRead + startPos - 1;
+  }
+}

+ 7 - 4
src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java

@@ -249,10 +249,13 @@ public class SimulatedFSDataset  implements FSConstants, FSDatasetInterface, Con
     }
 
     @Override
-    synchronized public void setBytesOnDisk(long bytesOnDisk) {
-      if (!finalized) {
-        oStream.setLength(bytesOnDisk);
-      }
+    public void setLastChecksumAndDataLen(long dataLength, byte[] lastChecksum) {
+      oStream.setLength(dataLength);
+    }
+
+    @Override
+    public ChunkChecksum getLastChecksumAndDataLen() {
+      return new ChunkChecksum(oStream.getLength(), null);
     }
   }