HDFS-8669. Erasure Coding: handle missing internal block locations in DFSStripedInputStream. Contributed by Jing Zhao.

Jing Zhao committed 10 years ago
parent commit b1e6429a6b

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt

@@ -344,3 +344,6 @@
 
     HDFS-8744. Erasure Coding: the number of chunks in packet is not updated
     when writing parity data. (Li Bo)
+
+    HDFS-8669. Erasure Coding: handle missing internal block locations in
+    DFSStripedInputStream. (jing9)

+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.EnumSet;
 
@@ -31,7 +32,7 @@ import org.apache.hadoop.util.DataChecksum;
  * from a single datanode.
  */
 @InterfaceAudience.Private
-public interface BlockReader extends ByteBufferReadable {
+public interface BlockReader extends ByteBufferReadable, Closeable {
 
   /* same interface as inputStream java.io.InputStream#read()
@@ -63,6 +64,7 @@ public interface BlockReader extends ByteBufferReadable {
    *
    * @throws IOException
    */
+  @Override // java.io.Closeable
   void close() throws IOException;
 
   /**

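With BlockReader now extending java.io.Closeable, the striped input stream below can hand readers to IOUtils.cleanup instead of wrapping each close() in its own try/catch. A minimal sketch of that pattern, using an illustrative stand-in reader rather than a real Hadoop BlockReader implementation:

import java.io.Closeable;
import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.IOUtils;

public class CloseableReaderSketch {
  private static final Log LOG = LogFactory.getLog(CloseableReaderSketch.class);

  /** Illustrative stand-in for a BlockReader implementation. */
  static class StubReader implements Closeable {
    @Override
    public void close() throws IOException {
      // release the socket / buffers held by the reader here
    }
  }

  public static void main(String[] args) {
    Closeable reader = new StubReader();
    // IOUtils.cleanup() logs and swallows any IOException thrown by close(),
    // which is the behavior closeReader() in DFSStripedInputStream relies on.
    IOUtils.cleanup(LOG, reader);
  }
}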
+ 203 - 128
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java

@@ -43,6 +43,7 @@ import static org.apache.hadoop.hdfs.util.StripedBlockUtil.AlignedStripe;
 import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunk;
 import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult;
 
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.erasurecode.CodecUtil;
 import org.apache.hadoop.io.erasurecode.ECSchema;
 
@@ -113,16 +114,43 @@ public class DFSStripedInputStream extends DFSInputStream {
     }
   }
 
-  private final BlockReader[] blockReaders;
-  /**
-   * when initializing block readers, their starting offsets are set to the same
-   * number: the smallest internal block offsets among all the readers. This is
-   * because it is possible that for some internal blocks we have to read
-   * "backwards" for decoding purpose. We thus use this offset array to track
-   * offsets for all the block readers so that we can skip data if necessary.
-   */
-  private final long[] blockReaderOffsets;
-  private final DatanodeInfo[] currentNodes;
+  private static class BlockReaderInfo {
+    final BlockReader reader;
+    final DatanodeInfo datanode;
+    /**
+     * When initializing block readers, their starting offsets are set to the
+     * same number: the smallest internal block offset among all the readers.
+     * This is because it is possible that for some internal blocks we have to
+     * read "backwards" for decoding purposes. We thus track this offset for
+     * each block reader so that we can skip data if necessary.
+     */
+    long blockReaderOffset;
+    LocatedBlock targetBlock;
+    /**
+     * We use this field to indicate whether we should use this reader. In case
+     * we hit any issue with this reader, we set this field to true and avoid
+     * using it for the next stripe.
+     */
+    boolean shouldSkip = false;
+
+    BlockReaderInfo(BlockReader reader, LocatedBlock targetBlock,
+        DatanodeInfo dn, long offset) {
+      this.reader = reader;
+      this.targetBlock = targetBlock;
+      this.datanode = dn;
+      this.blockReaderOffset = offset;
+    }
+
+    void setOffset(long offset) {
+      this.blockReaderOffset = offset;
+    }
+
+    void skip() {
+      this.shouldSkip = true;
+    }
+  }
+
+  private final BlockReaderInfo[] blockReaders;
   private final int cellSize;
   private final short dataBlkNum;
   private final short parityBlkNum;
@@ -151,9 +179,7 @@ public class DFSStripedInputStream extends DFSInputStream {
     dataBlkNum = (short) schema.getNumDataUnits();
     parityBlkNum = (short) schema.getNumParityUnits();
     groupSize = dataBlkNum + parityBlkNum;
-    blockReaders = new BlockReader[groupSize];
-    blockReaderOffsets = new long[groupSize];
-    currentNodes = new DatanodeInfo[groupSize];
+    blockReaders = new BlockReaderInfo[groupSize];
     curStripeRange = new StripeRange(0, 0);
     readingService =
         new ExecutorCompletionService<>(dfsClient.getStripedReadsThreadPool());
@@ -218,18 +244,26 @@ public class DFSStripedInputStream extends DFSInputStream {
     for (int i = 0; i < dataBlkNum; i++) {
       LocatedBlock targetBlock = targetBlocks[i];
       if (targetBlock != null) {
-        DNAddrPair retval = getBestNodeDNAddrPair(targetBlock, null);
-        if (retval != null) {
-          currentNodes[i] = retval.info;
-          blockReaders[i] = getBlockReaderWithRetry(targetBlock,
+        DNAddrPair dnInfo = getBestNodeDNAddrPair(targetBlock, null);
+        if (dnInfo != null) {
+          BlockReader reader = getBlockReaderWithRetry(targetBlock,
               minOffset, targetBlock.getBlockSize() - minOffset,
-              retval.addr, retval.storageType, retval.info, target, retry);
-          blockReaderOffsets[i] = minOffset;
+              dnInfo.addr, dnInfo.storageType, dnInfo.info, target, retry);
+          if (reader != null) {
+            blockReaders[i] = new BlockReaderInfo(reader, targetBlock,
+                dnInfo.info, minOffset);
+          }
         }
       }
     }
   }
 
+  /**
+   * @throws IOException only when failing to refetch block token, which happens
+   * when this client cannot get located block information from NameNode. This
+   * method returns null instead of throwing exception when failing to connect
+   * to the DataNode.
+   */
   private BlockReader getBlockReaderWithRetry(LocatedBlock targetBlock,
       long offsetInBlock, long length, InetSocketAddress targetAddr,
       StorageType storageType, DatanodeInfo datanode, long offsetInFile,
@@ -275,21 +309,16 @@ public class DFSStripedInputStream extends DFSInputStream {
     }
     for (int i = 0; i < groupSize; i++) {
       closeReader(i);
-      currentNodes[i] = null;
+      blockReaders[i] = null;
     }
     blockEnd = -1;
   }
 
   private void closeReader(int index) {
     if (blockReaders[index] != null) {
-      try {
-        blockReaders[index].close();
-      } catch (IOException e) {
-        DFSClient.LOG.error("error closing blockReader " + index, e);
-      }
-      blockReaders[index] = null;
+      IOUtils.cleanup(DFSClient.LOG, blockReaders[index].reader);
+      blockReaders[index].skip();
     }
-    blockReaderOffsets[index] = 0;
   }
 
   private long getOffsetInBlockGroup() {
@@ -323,16 +352,14 @@ public class DFSStripedInputStream extends DFSInputStream {
     AlignedStripe[] stripes = StripedBlockUtil.divideOneStripe(schema, cellSize,
         blockGroup, offsetInBlockGroup,
         offsetInBlockGroup + curStripeRange.length - 1, curStripeBuf);
-    // TODO handle null elements in blks (e.g., NN does not know locations for
-    // all the internal blocks)
     final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(
         blockGroup, cellSize, dataBlkNum, parityBlkNum);
     // read the whole stripe
     for (AlignedStripe stripe : stripes) {
       // Parse group to get chosen DN location
       StripeReader sreader = new StatefulStripeReader(readingService, stripe,
-          blks);
-      sreader.readStripe(blks, corruptedBlockMap);
+          blks, corruptedBlockMap);
+      sreader.readStripe();
     }
     curStripeBuf.position(stripeBufOffset);
     curStripeBuf.limit(stripeLimit);
@@ -549,14 +576,13 @@ public class DFSStripedInputStream extends DFSInputStream {
         blockGroup, start, end, buf, offset);
     CompletionService<Void> readService = new ExecutorCompletionService<>(
         dfsClient.getStripedReadsThreadPool());
-    // TODO handle null elements in blks (e.g., NN does not know locations for
-    // all the internal blocks)
     final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(
         blockGroup, cellSize, dataBlkNum, parityBlkNum);
     for (AlignedStripe stripe : stripes) {
       // Parse group to get chosen DN location
-      StripeReader preader = new PositionStripeReader(readService, stripe);
-      preader.readStripe(blks, corruptedBlockMap);
+      StripeReader preader = new PositionStripeReader(readService, stripe,
+          blks, corruptedBlockMap);
+      preader.readStripe();
     }
   }
 
@@ -586,43 +612,89 @@ public class DFSStripedInputStream extends DFSInputStream {
     final Map<Future<Void>, Integer> futures = new HashMap<>();
     final AlignedStripe alignedStripe;
     final CompletionService<Void> service;
+    final LocatedBlock[] targetBlocks;
+    final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap;
 
-    StripeReader(CompletionService<Void> service, AlignedStripe alignedStripe) {
+    StripeReader(CompletionService<Void> service, AlignedStripe alignedStripe,
+        LocatedBlock[] targetBlocks,
+        Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
       this.service = service;
       this.alignedStripe = alignedStripe;
+      this.targetBlocks = targetBlocks;
+      this.corruptedBlockMap = corruptedBlockMap;
     }
 
-    /** submit reading chunk task */
-    abstract void readChunk(final CompletionService<Void> service,
-        final LocatedBlock block, int chunkIndex,
-        Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap);
+    abstract boolean readChunk(final CompletionService<Void> service,
+        final LocatedBlock block, int chunkIndex);
 
-    /**
-     * When seeing first missing block, initialize decode input buffers.
-     * Also prepare the reading for data blocks outside of the reading range.
-     */
-    abstract void prepareDecodeInputs() throws IOException;
+    /** prepare all the data chunks */
+    abstract void prepareDecodeInputs();
 
-    /**
-     * Prepare reading for one more parity chunk.
-     */
-    abstract void prepareParityChunk() throws IOException;
+    /** prepare the parity chunk and block reader if necessary */
+    abstract boolean prepareParityChunk(int index) throws IOException;
 
     abstract void decode();
 
     abstract void updateState4SuccessRead(StripingChunkReadResult result);
 
+    private void checkMissingBlocks() throws IOException {
+      if (alignedStripe.missingChunksNum > parityBlkNum) {
+        clearFutures(futures.keySet());
+        throw new IOException(alignedStripe.missingChunksNum
+            + " missing blocks, the stripe is: " + alignedStripe);
+      }
+    }
+
+    /**
+     * We need decoding. Thus go through all the data chunks and make sure we
+     * submit read requests for all of them.
+     */
+    private void readDataForDecoding() throws IOException {
+      prepareDecodeInputs();
+      for (int i = 0; i < dataBlkNum; i++) {
+        Preconditions.checkNotNull(alignedStripe.chunks[i]);
+        if (alignedStripe.chunks[i].state == StripingChunk.REQUESTED) {
+          if (!readChunk(service, targetBlocks[i], i)) {
+            alignedStripe.missingChunksNum++;
+          }
+        }
+      }
+      checkMissingBlocks();
+    }
+
+    void readParityChunks(int num) throws IOException {
+      for (int i = dataBlkNum, j = 0; i < dataBlkNum + parityBlkNum && j < num;
+           i++) {
+        if (alignedStripe.chunks[i] == null) {
+          if (prepareParityChunk(i) && readChunk(service, targetBlocks[i], i)) {
+            j++;
+          } else {
+            alignedStripe.missingChunksNum++;
+          }
+        }
+      }
+      checkMissingBlocks();
+    }
+
     /** read the whole stripe. do decoding if necessary */
-    void readStripe(LocatedBlock[] blocks,
-        Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
-        throws IOException {
-      assert alignedStripe.getSpanInBlock() > 0;
-      for (short i = 0; i < dataBlkNum; i++) {
-        if (alignedStripe.chunks[i] != null
-            && alignedStripe.chunks[i].state != StripingChunk.ALLZERO) {
-          readChunk(service, blocks[i], i, corruptedBlockMap);
+    void readStripe() throws IOException {
+      for (int i = 0; i < dataBlkNum; i++) {
+        if (alignedStripe.chunks[i] != null &&
+            alignedStripe.chunks[i].state != StripingChunk.ALLZERO) {
+          if (!readChunk(service, targetBlocks[i], i)) {
+            alignedStripe.missingChunksNum++;
+          }
         }
       }
+      // There are missing block locations at this stage. Thus we need to read
+      // the full stripe and one more parity block.
+      if (alignedStripe.missingChunksNum > 0) {
+        checkMissingBlocks();
+        readDataForDecoding();
+        // read parity chunks
+        readParityChunks(alignedStripe.missingChunksNum);
+      }
+      // TODO: for a full stripe we can start reading (dataBlkNum + 1) chunks
 
       // Input buffers for potential decode operation, which remains null until
       // first read failure
@@ -648,24 +720,15 @@ public class DFSStripedInputStream extends DFSInputStream {
             }
           } else {
             returnedChunk.state = StripingChunk.MISSING;
-            alignedStripe.missingChunksNum++;
-            if (alignedStripe.missingChunksNum > parityBlkNum) {
-              clearFutures(futures.keySet());
-              throw new IOException("Too many blocks are missing: "
-                  + alignedStripe);
-            }
-
-            prepareDecodeInputs();
-            prepareParityChunk();
             // close the corresponding reader
             closeReader(r.index);
 
-            for (int i = 0; i < alignedStripe.chunks.length; i++) {
-              StripingChunk chunk = alignedStripe.chunks[i];
-              if (chunk != null && chunk.state == StripingChunk.REQUESTED) {
-                readChunk(service, blocks[i], i, corruptedBlockMap);
-              }
-            }
+            final int missing = alignedStripe.missingChunksNum;
+            alignedStripe.missingChunksNum++;
+            checkMissingBlocks();
+
+            readDataForDecoding();
+            readParityChunks(alignedStripe.missingChunksNum - missing);
           }
         } catch (InterruptedException ie) {
           String err = "Read request interrupted";
@@ -686,20 +749,24 @@ public class DFSStripedInputStream extends DFSInputStream {
     private byte[][] decodeInputs = null;
 
     PositionStripeReader(CompletionService<Void> service,
-        AlignedStripe alignedStripe) {
-      super(service, alignedStripe);
+        AlignedStripe alignedStripe, LocatedBlock[] targetBlocks,
+        Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
+      super(service, alignedStripe, targetBlocks, corruptedBlockMap);
     }
 
     @Override
-    void readChunk(final CompletionService<Void> service,
-        final LocatedBlock block, int chunkIndex,
-        Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
+    boolean readChunk(final CompletionService<Void> service,
+        final LocatedBlock block, int chunkIndex) {
+      final StripingChunk chunk = alignedStripe.chunks[chunkIndex];
+      if (block == null) {
+        chunk.state = StripingChunk.MISSING;
+        return false;
+      }
       DatanodeInfo loc = block.getLocations()[0];
       StorageType type = block.getStorageTypes()[0];
       DNAddrPair dnAddr = new DNAddrPair(loc, NetUtils.createSocketAddr(
           loc.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname())),
           type);
-      StripingChunk chunk = alignedStripe.chunks[chunkIndex];
       chunk.state = StripingChunk.PENDING;
       Callable<Void> readCallable = getFromOneDataNode(dnAddr,
           block, alignedStripe.getOffsetInBlock(),
@@ -715,6 +782,7 @@ public class DFSStripedInputStream extends DFSInputStream {
             + alignedStripe.getSpanInBlock() - 1));
       }
       futures.put(getFromDNRequest, chunkIndex);
+      return true;
     }
 
     @Override
@@ -728,18 +796,15 @@ public class DFSStripedInputStream extends DFSInputStream {
     }
 
     @Override
-    void prepareParityChunk() {
-      for (int i = dataBlkNum; i < dataBlkNum + parityBlkNum; i++) {
-        if (alignedStripe.chunks[i] == null) {
-          final int decodeIndex = convertIndex4Decode(i,
-              dataBlkNum, parityBlkNum);
-          alignedStripe.chunks[i] =
-              new StripingChunk(decodeInputs[decodeIndex]);
-          alignedStripe.chunks[i].addByteArraySlice(0,
-              (int) alignedStripe.getSpanInBlock());
-          break;
-        }
-      }
+    boolean prepareParityChunk(int index) {
+      Preconditions.checkState(index >= dataBlkNum &&
+          alignedStripe.chunks[index] == null);
+      final int decodeIndex = convertIndex4Decode(index, dataBlkNum,
+          parityBlkNum);
+      alignedStripe.chunks[index] = new StripingChunk(decodeInputs[decodeIndex]);
+      alignedStripe.chunks[index].addByteArraySlice(0,
+          (int) alignedStripe.getSpanInBlock());
+      return true;
     }
 
     @Override
@@ -753,39 +818,43 @@
 
   class StatefulStripeReader extends StripeReader {
     ByteBuffer[] decodeInputs;
-    final LocatedBlock[] targetBlocks;
 
     StatefulStripeReader(CompletionService<Void> service,
-        AlignedStripe alignedStripe, LocatedBlock[] targetBlocks) {
-      super(service, alignedStripe);
-      this.targetBlocks = targetBlocks;
+        AlignedStripe alignedStripe, LocatedBlock[] targetBlocks,
+        Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
+      super(service, alignedStripe, targetBlocks, corruptedBlockMap);
     }
 
     @Override
-    void readChunk(final CompletionService<Void> service,
-        final LocatedBlock block, int chunkIndex, Map<ExtendedBlock,
-        Set<DatanodeInfo>> corruptedBlockMap) {
-      StripingChunk chunk = alignedStripe.chunks[chunkIndex];
+    boolean readChunk(final CompletionService<Void> service,
+        final LocatedBlock block, int chunkIndex) {
+      final StripingChunk chunk = alignedStripe.chunks[chunkIndex];
+      final BlockReaderInfo readerInfo = blockReaders[chunkIndex];
+      if (readerInfo == null || block == null || readerInfo.shouldSkip) {
+        chunk.state = StripingChunk.MISSING;
+        return false;
+      }
       chunk.state = StripingChunk.PENDING;
       ByteBufferStrategy strategy = new ByteBufferStrategy(chunk.byteBuffer);
-      Callable<Void> readCallable = readCell(blockReaders[chunkIndex],
-          currentNodes[chunkIndex], blockReaderOffsets[chunkIndex],
+      Callable<Void> readCallable = readCell(readerInfo.reader,
+          readerInfo.datanode, readerInfo.blockReaderOffset,
           alignedStripe.getOffsetInBlock(), strategy,
           chunk.byteBuffer.remaining(), block.getBlock(), corruptedBlockMap);
       Future<Void> request = readingService.submit(readCallable);
       futures.put(request, chunkIndex);
+      return true;
     }
 
     @Override
     void updateState4SuccessRead(StripingChunkReadResult result) {
       Preconditions.checkArgument(
           result.state == StripingChunkReadResult.SUCCESSFUL);
-      blockReaderOffsets[result.index] =
-          alignedStripe.getOffsetInBlock() + alignedStripe.getSpanInBlock();
+      blockReaders[result.index].setOffset(alignedStripe.getOffsetInBlock()
+          + alignedStripe.getSpanInBlock());
     }
 
     @Override
-    void prepareDecodeInputs() throws IOException {
+    void prepareDecodeInputs() {
       if (decodeInputs == null) {
         decodeInputs = new ByteBuffer[dataBlkNum + parityBlkNum];
         ByteBuffer cur = curStripeBuf.duplicate();
@@ -799,52 +868,58 @@ public class DFSStripedInputStream extends DFSInputStream {
               parityBlkNum);
           decodeInputs[decodeIndex] = cur.slice();
           if (alignedStripe.chunks[i] == null) {
-            alignedStripe.chunks[i] =
-                new StripingChunk(decodeInputs[decodeIndex]);
+            alignedStripe.chunks[i] = new StripingChunk(
+                decodeInputs[decodeIndex]);
           }
         }
       }
     }
 
     @Override
-    void prepareParityChunk() throws IOException {
-      for (int i = dataBlkNum; i < dataBlkNum + parityBlkNum; i++) {
-        if (alignedStripe.chunks[i] == null) {
-          final int decodeIndex = convertIndex4Decode(i, dataBlkNum,
-              parityBlkNum);
-          decodeInputs[decodeIndex] = ByteBuffer.allocateDirect(
-              (int) alignedStripe.range.spanInBlock);
-          alignedStripe.chunks[i] =
-              new StripingChunk(decodeInputs[decodeIndex]);
-          if (blockReaders[i] == null) {
-            prepareParityBlockReader(i);
-          }
-          break;
-        }
+    boolean prepareParityChunk(int index) throws IOException {
+      Preconditions.checkState(index >= dataBlkNum
+          && alignedStripe.chunks[index] == null);
+      if (blockReaders[index] != null && blockReaders[index].shouldSkip) {
+        alignedStripe.chunks[index] = new StripingChunk(StripingChunk.MISSING);
+        // we have failed the block reader before
+        return false;
       }
+      final int decodeIndex = convertIndex4Decode(index, dataBlkNum,
+          parityBlkNum);
+      decodeInputs[decodeIndex] = ByteBuffer.allocateDirect(
+          (int) alignedStripe.range.spanInBlock);
+      alignedStripe.chunks[index] = new StripingChunk(decodeInputs[decodeIndex]);
+      if (blockReaders[index] == null && !prepareParityBlockReader(index)) {
+        alignedStripe.chunks[index] = new StripingChunk(StripingChunk.MISSING);
+        return false;
+      }
+      return true;
     }
 
-    private void prepareParityBlockReader(int i) throws IOException {
+    private boolean prepareParityBlockReader(int i) throws IOException {
       // prepare the block reader for the parity chunk
       LocatedBlock targetBlock = targetBlocks[i];
       if (targetBlock != null) {
         final long offsetInBlock = alignedStripe.getOffsetInBlock();
-        DNAddrPair retval = getBestNodeDNAddrPair(targetBlock, null);
-        if (retval != null) {
-          currentNodes[i] = retval.info;
-          blockReaders[i] = getBlockReaderWithRetry(targetBlock,
+        DNAddrPair dnInfo = getBestNodeDNAddrPair(targetBlock, null);
+        if (dnInfo != null) {
+          BlockReader reader = getBlockReaderWithRetry(targetBlock,
               offsetInBlock, targetBlock.getBlockSize() - offsetInBlock,
-              retval.addr, retval.storageType, retval.info,
+              dnInfo.addr, dnInfo.storageType, dnInfo.info,
               DFSStripedInputStream.this.getPos(), retry);
-          blockReaderOffsets[i] = offsetInBlock;
+          if (reader != null) {
+            blockReaders[i] = new BlockReaderInfo(reader, targetBlock,
+                dnInfo.info, offsetInBlock);
+            return true;
+          }
         }
       }
+      return false;
     }
 
     @Override
     void decode() {
-      // TODO no copy for data chunks. this depends on HADOOP-12047 for some
-      // decoders to work
+      // TODO no copy for data chunks. this depends on HADOOP-12047
       final int span = (int) alignedStripe.getSpanInBlock();
       for (int i = 0; i < alignedStripe.chunks.length; i++) {
         final int decodeIndex = convertIndex4Decode(i,

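The core of the change is the read path above: data chunks are submitted first, every chunk whose location is unusable bumps missingChunksNum, and only then are parity chunks prepared, while checkMissingBlocks() aborts once more than parityBlkNum chunks are unreadable. A simplified, self-contained sketch of that counting logic under an assumed 6+3 layout (the constants, class and method names here are made up for illustration; this is not the Hadoop code itself):

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class StripeRecoverySketch {
  // assumed RS(6,3)-like layout, for the example only
  static final int DATA_BLK_NUM = 6;
  static final int PARITY_BLK_NUM = 3;

  /** Returns how many parity chunks must be read to decode one stripe. */
  static int planStripeReads(Set<Integer> unreadable) throws IOException {
    // 1. try all data chunks of the stripe
    int missingData = 0;
    for (int i = 0; i < DATA_BLK_NUM; i++) {
      if (unreadable.contains(i)) {
        missingData++;
      }
    }
    int totalMissing = missingData;
    checkMissing(totalMissing);
    // 2. for every missing data chunk, schedule one readable parity chunk
    int scheduledParity = 0;
    for (int i = DATA_BLK_NUM;
         i < DATA_BLK_NUM + PARITY_BLK_NUM && scheduledParity < missingData;
         i++) {
      if (unreadable.contains(i)) {
        totalMissing++;      // this parity block is gone too; try the next one
        checkMissing(totalMissing);
      } else {
        scheduledParity++;   // this parity chunk will feed the decoder
      }
    }
    return scheduledParity;
  }

  static void checkMissing(int missing) throws IOException {
    if (missing > PARITY_BLK_NUM) {
      throw new IOException(missing + " missing blocks, cannot decode stripe");
    }
  }

  public static void main(String[] args) throws IOException {
    // internal blocks 1 and 4 unreadable: two parity chunks suffice to decode
    Set<Integer> unreadable = new HashSet<>(Arrays.asList(1, 4));
    System.out.println(planStripeReads(unreadable));  // prints 2
  }
}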
+ 7 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java

@@ -83,6 +83,7 @@ public class StripedBlockUtil {
     LocatedBlock[] lbs = new LocatedBlock[dataBlkNum + parityBlkNum];
     for (short i = 0; i < locatedBGSize; i++) {
       final int idx = bg.getBlockIndices()[i];
+      // for now we do not use redundant replica of an internal block
       if (idx < (dataBlkNum + parityBlkNum) && lbs[idx] == null) {
         lbs[idx] = constructInternalBlock(bg, i, cellSize,
             dataBlkNum, idx);
@@ -212,7 +213,9 @@ public class StripedBlockUtil {
         return new StripingChunkReadResult(StripingChunkReadResult.TIMEOUT);
       }
     } catch (ExecutionException e) {
-      DFSClient.LOG.warn("ExecutionException " + e);
+      if (DFSClient.LOG.isDebugEnabled()) {
+        DFSClient.LOG.debug("ExecutionException " + e);
+      }
       return new StripingChunkReadResult(futures.remove(future),
           StripingChunkReadResult.FAILED);
     } catch (CancellationException e) {
@@ -623,7 +626,7 @@ public class StripedBlockUtil {
             cellSize, dataBlkNum, i);
         if (internalBlkLen <= s.getOffsetInBlock()) {
           Preconditions.checkState(s.chunks[i] == null);
-          s.chunks[i] = new StripingChunk(); // chunk state is set to ALLZERO
+          s.chunks[i] = new StripingChunk(StripingChunk.ALLZERO);
         }
       }
     }
@@ -841,10 +844,10 @@ public class StripedBlockUtil {
       this.byteBuffer = buf;
     }
 
-    public StripingChunk() {
+    public StripingChunk(int state) {
       this.byteArray = null;
       this.byteBuffer = null;
-      this.state = ALLZERO;
+      this.state = state;
     }
 
     public void addByteArraySlice(int offset, int length) {

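The StripingChunk change above replaces the no-arg constructor (which hard-coded ALLZERO) with one that takes the state explicitly, so callers in DFSStripedInputStream can also mark a chunk MISSING when no location is available. A tiny illustrative sketch of the same pattern, with stand-in constants and a stand-in class rather than the real StripedBlockUtil.StripingChunk:

public class ChunkStateSketch {
  // stand-in state constants; the real values live in StripedBlockUtil.StripingChunk
  static final int ALLZERO = 1;
  static final int MISSING = 2;

  static class Chunk {
    final int state;
    Chunk(int state) {
      this.state = state;
    }
  }

  public static void main(String[] args) {
    // internal block shorter than the requested offset: treat it as zeros
    Chunk zeroed = new Chunk(ALLZERO);
    // NameNode returned no location for the internal block: mark it missing
    Chunk missing = new Chunk(MISSING);
    System.out.println(zeroed.state + " " + missing.state);
  }
}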
+ 128 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java

@@ -18,9 +18,16 @@
 package org.apache.hadoop.hdfs;
 
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.web.ByteRangeInputStream;
+import org.junit.Assert;
 
+import java.io.EOFException;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Random;
 
 public class StripedFileTestUtil {
@@ -56,4 +63,125 @@ public class StripedFileTestUtil {
     final int mod = 29;
     return (byte) (pos % mod + 1);
   }
+
+  static void verifyLength(FileSystem fs, Path srcPath, int fileLength)
+      throws IOException {
+    FileStatus status = fs.getFileStatus(srcPath);
+    Assert.assertEquals("File length should be the same", fileLength, status.getLen());
+  }
+
+  static void verifyPread(FileSystem fs, Path srcPath,  int fileLength,
+      byte[] expected, byte[] buf) throws IOException {
+    try (FSDataInputStream in = fs.open(srcPath)) {
+      int[] startOffsets = {0, 1, cellSize - 102, cellSize, cellSize + 102,
+          cellSize * (dataBlocks - 1), cellSize * (dataBlocks - 1) + 102,
+          cellSize * dataBlocks, fileLength - 102, fileLength - 1};
+      for (int startOffset : startOffsets) {
+        startOffset = Math.max(0, Math.min(startOffset, fileLength - 1));
+        int remaining = fileLength - startOffset;
+        in.readFully(startOffset, buf, 0, remaining);
+        for (int i = 0; i < remaining; i++) {
+          Assert.assertEquals("Byte at " + (startOffset + i) + " should be the " +
+              "same", expected[startOffset + i], buf[i]);
+        }
+      }
+    }
+  }
+
+  static void verifyStatefulRead(FileSystem fs, Path srcPath, int fileLength,
+      byte[] expected, byte[] buf) throws IOException {
+    try (FSDataInputStream in = fs.open(srcPath)) {
+      final byte[] result = new byte[fileLength];
+      int readLen = 0;
+      int ret;
+      while ((ret = in.read(buf, 0, buf.length)) >= 0) {
+        System.arraycopy(buf, 0, result, readLen, ret);
+        readLen += ret;
+      }
+      Assert.assertEquals("The length of file should be the same to write size",
+          fileLength, readLen);
+      Assert.assertArrayEquals(expected, result);
+    }
+  }
+
+  static void verifyStatefulRead(FileSystem fs, Path srcPath, int fileLength,
+      byte[] expected, ByteBuffer buf) throws IOException {
+    try (FSDataInputStream in = fs.open(srcPath)) {
+      ByteBuffer result = ByteBuffer.allocate(fileLength);
+      int readLen = 0;
+      int ret;
+      while ((ret = in.read(buf)) >= 0) {
+        readLen += ret;
+        buf.flip();
+        result.put(buf);
+        buf.clear();
+      }
+      Assert.assertEquals("The length of file should be the same to write size",
+          fileLength, readLen);
+      Assert.assertArrayEquals(expected, result.array());
+    }
+  }
+
+  static void verifySeek(FileSystem fs, Path srcPath, int fileLength)
+      throws IOException {
+    try (FSDataInputStream in = fs.open(srcPath)) {
+      // seek to 1/2 of content
+      int pos = fileLength / 2;
+      assertSeekAndRead(in, pos, fileLength);
+
+      // seek to 1/3 of content
+      pos = fileLength / 3;
+      assertSeekAndRead(in, pos, fileLength);
+
+      // seek to 0 pos
+      pos = 0;
+      assertSeekAndRead(in, pos, fileLength);
+
+      if (fileLength > cellSize) {
+        // seek to cellSize boundary
+        pos = cellSize - 1;
+        assertSeekAndRead(in, pos, fileLength);
+      }
+
+      if (fileLength > cellSize * dataBlocks) {
+        // seek to striped cell group boundary
+        pos = cellSize * dataBlocks - 1;
+        assertSeekAndRead(in, pos, fileLength);
+      }
+
+      if (fileLength > blockSize * dataBlocks) {
+        // seek to striped block group boundary
+        pos = blockSize * dataBlocks - 1;
+        assertSeekAndRead(in, pos, fileLength);
+      }
+
+      if (!(in.getWrappedStream() instanceof ByteRangeInputStream)) {
+        try {
+          in.seek(-1);
+          Assert.fail("Should be failed if seek to negative offset");
+        } catch (EOFException e) {
+          // expected
+        }
+
+        try {
+          in.seek(fileLength + 1);
+          Assert.fail("Should be failed if seek after EOF");
+        } catch (EOFException e) {
+          // expected
+        }
+      }
+    }
+  }
+
+  static void assertSeekAndRead(FSDataInputStream fsdis, int pos,
+      int writeBytes) throws IOException {
+    fsdis.seek(pos);
+    byte[] buf = new byte[writeBytes];
+    int readLen = StripedFileTestUtil.readAll(fsdis, buf);
+    Assert.assertEquals(readLen, writeBytes - pos);
+    for (int i = 0; i < readLen; i++) {
+      Assert.assertEquals("Byte at " + i + " should be the same",
+          StripedFileTestUtil.getByte(pos + i), buf[i]);
+    }
+  }
 }
 }
+ 150 - 0
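assertSeekAndRead above checks that, after seek(pos), a stateful read returns exactly writeBytes - pos bytes with the expected contents. The same pattern can be exercised against any Hadoop FileSystem; a rough sketch against the local filesystem (the file name, length, and class name are arbitrary for the example, not part of this patch):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SeekAndReadSketch {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    Path p = new Path("/tmp/seek-and-read-sketch");
    int len = 4096;
    byte[] data = new byte[len];
    for (int i = 0; i < len; i++) {
      data[i] = (byte) (i % 29 + 1);   // same byte pattern as getByte() above
    }
    try (FSDataOutputStream out = fs.create(p, true)) {
      out.write(data);
    }
    int pos = len / 3;
    try (FSDataInputStream in = fs.open(p)) {
      in.seek(pos);
      byte[] buf = new byte[len - pos];
      in.readFully(buf);               // stateful read from the seek position
      for (int i = 0; i < buf.length; i++) {
        if (buf[i] != data[pos + i]) {
          throw new AssertionError("mismatch at " + (pos + i));
        }
      }
    }
    fs.delete(p, false);
  }
}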
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithMissingBlocks.java

@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.blockSize;
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.cellSize;
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.dataBlocks;
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.numDNs;
+
+/**
+ * Test reading a striped file when some of its blocks are missing (not included
+ * in the block locations returned by the NameNode).
+ */
+public class TestReadStripedFileWithMissingBlocks {
+  public static final Log LOG = LogFactory
+      .getLog(TestReadStripedFileWithMissingBlocks.class);
+  private static MiniDFSCluster cluster;
+  private static FileSystem fs;
+  private static Configuration conf = new HdfsConfiguration();
+  private final int fileLength = blockSize * dataBlocks + 123;
+
+  @Before
+  public void setup() throws IOException {
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
+    cluster.getFileSystem().getClient().createErasureCodingZone("/",
+        null, cellSize);
+    fs = cluster.getFileSystem();
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testReadFileWithMissingBlocks1() throws IOException {
+    readFileWithMissingBlocks(new Path("/foo"), fileLength, 1, 0);
+  }
+
+  @Test
+  public void testReadFileWithMissingBlocks2() throws IOException {
+    readFileWithMissingBlocks(new Path("/foo"), fileLength, 1, 1);
+  }
+
+  @Test
+  public void testReadFileWithMissingBlocks3() throws IOException {
+    readFileWithMissingBlocks(new Path("/foo"), fileLength, 1, 2);
+  }
+
+  @Test
+  public void testReadFileWithMissingBlocks4() throws IOException {
+    readFileWithMissingBlocks(new Path("/foo"), fileLength, 2, 0);
+  }
+
+  @Test
+  public void testReadFileWithMissingBlocks5() throws IOException {
+    readFileWithMissingBlocks(new Path("/foo"), fileLength, 2, 1);
+  }
+
+  @Test
+  public void testReadFileWithMissingBlocks6() throws IOException {
+    readFileWithMissingBlocks(new Path("/foo"), fileLength, 3, 0);
+  }
+
+  private void readFileWithMissingBlocks(Path srcPath, int fileLength,
+      int missingDataNum, int missingParityNum)
+      throws IOException {
+    LOG.info("readFileWithMissingBlocks: (" + missingDataNum + ","
+        + missingParityNum + ")");
+    final byte[] expected = StripedFileTestUtil.generateBytes(fileLength);
+    DFSTestUtil.writeFile(fs, srcPath, new String(expected));
+    StripedFileTestUtil.verifyLength(fs, srcPath, fileLength);
+    int dataBlocks = (fileLength - 1) / cellSize + 1;
+    BlockLocation[] locs = fs.getFileBlockLocations(srcPath, 0, cellSize);
+
+    int[] missingDataNodes = new int[missingDataNum + missingParityNum];
+    for (int i = 0; i < missingDataNum; i++) {
+      missingDataNodes[i] = i;
+    }
+    for (int i = 0; i < missingParityNum; i++) {
+      missingDataNodes[i + missingDataNum] = i +
+          Math.min(StripedFileTestUtil.dataBlocks, dataBlocks);
+    }
+    stopDataNodes(locs, missingDataNodes);
+
+    // make sure there are missing block locations
+    BlockLocation[] newLocs = fs.getFileBlockLocations(srcPath, 0, cellSize);
+    Assert.assertTrue(newLocs[0].getNames().length < locs[0].getNames().length);
+
+    byte[] smallBuf = new byte[1024];
+    byte[] largeBuf = new byte[fileLength + 100];
+    StripedFileTestUtil.verifySeek(fs, srcPath, fileLength);
+    StripedFileTestUtil.verifyStatefulRead(fs, srcPath, fileLength, expected,
+        smallBuf);
+    StripedFileTestUtil.verifyPread(fs, srcPath, fileLength, expected, largeBuf);
+
+    // delete the file
+    fs.delete(srcPath, true);
+  }
+
+  private void stopDataNodes(BlockLocation[] locs, int[] datanodes)
+      throws IOException {
+    if (locs != null && locs.length > 0) {
+      for (int failedDNIdx : datanodes) {
+        String name = (locs[0].getNames())[failedDNIdx];
+        for (DataNode dn : cluster.getDataNodes()) {
+          int port = dn.getXferPort();
+          if (name.contains(Integer.toString(port))) {
+            dn.shutdown();
+            cluster.setDataNodeDead(dn.getDatanodeId());
+            LOG.info("stop datanode " + failedDNIdx);
+            break;
+          }
+        }
+      }
+    }
+  }
+}

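In readFileWithMissingBlocks above, the indices placed in missingDataNodes are the first missingDataNum data-block positions plus missingParityNum positions starting right after the data blocks, and stopDataNodes then shuts down the datanodes serving those internal blocks. A small worked example of that index arithmetic (the 6-data-block layout matches StripedFileTestUtil's defaults; the class name is illustrative):

import java.util.Arrays;

public class MissingIndexSketch {
  public static void main(String[] args) {
    final int dataBlocks = 6;      // StripedFileTestUtil.dataBlocks default
    int missingDataNum = 2;
    int missingParityNum = 1;

    int[] missing = new int[missingDataNum + missingParityNum];
    for (int i = 0; i < missingDataNum; i++) {
      missing[i] = i;                                  // data positions 0, 1
    }
    for (int i = 0; i < missingParityNum; i++) {
      missing[i + missingDataNum] = i + dataBlocks;    // first parity position 6
    }
    // prints [0, 1, 6]: the internal blocks whose datanodes get stopped
    System.out.println(Arrays.toString(missing));
  }
}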
+ 17 - 145
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java

@@ -21,20 +21,15 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.web.ByteRangeInputStream;
 import org.apache.hadoop.hdfs.web.WebHdfsConstants;
 import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
@@ -48,11 +43,10 @@ public class TestWriteReadStripedFile {
   public static final Log LOG = LogFactory.getLog(TestWriteReadStripedFile.class);
   private static MiniDFSCluster cluster;
   private static FileSystem fs;
-  private static Configuration conf;
+  private static Configuration conf = new HdfsConfiguration();
 
   @Before
   public void setup() throws IOException {
-    conf = new HdfsConfiguration();
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
     cluster.getFileSystem().getClient().createErasureCodingZone("/",
@@ -175,18 +169,6 @@ public class TestWriteReadStripedFile {
             + cellSize + 123, true);
   }
 
-  private void assertSeekAndRead(FSDataInputStream fsdis, int pos,
-                                 int writeBytes) throws IOException {
-    fsdis.seek(pos);
-    byte[] buf = new byte[writeBytes];
-    int readLen = StripedFileTestUtil.readAll(fsdis, buf);
-    Assert.assertEquals(readLen, writeBytes - pos);
-    for (int i = 0; i < readLen; i++) {
-      Assert.assertEquals("Byte at " + i + " should be the same",
-          StripedFileTestUtil.getByte(pos + i), buf[i]);
-    }
-  }
-
   private void testOneFileUsingDFSStripedInputStream(String src, int fileLength)
       throws IOException {
     testOneFileUsingDFSStripedInputStream(src, fileLength, false);
@@ -198,7 +180,7 @@ public class TestWriteReadStripedFile {
     Path srcPath = new Path(src);
     DFSTestUtil.writeFile(fs, srcPath, new String(expected));
 
-    verifyLength(fs, srcPath, fileLength);
+    StripedFileTestUtil.verifyLength(fs, srcPath, fileLength);
 
     if (withDataNodeFailure) {
       int dnIndex = 1; // TODO: StripedFileTestUtil.random.nextInt(dataBlocks);
@@ -208,14 +190,16 @@
 
     byte[] smallBuf = new byte[1024];
     byte[] largeBuf = new byte[fileLength + 100];
-    verifyPread(fs, srcPath, fileLength, expected, largeBuf);
+    StripedFileTestUtil.verifyPread(fs, srcPath, fileLength, expected, largeBuf);
 
-    verifyStatefulRead(fs, srcPath, fileLength, expected, largeBuf);
-    verifySeek(fs, srcPath, fileLength);
-    verifyStatefulRead(fs, srcPath, fileLength, expected,
+    StripedFileTestUtil.verifyStatefulRead(fs, srcPath, fileLength, expected,
+        largeBuf);
+    StripedFileTestUtil.verifySeek(fs, srcPath, fileLength);
+    StripedFileTestUtil.verifyStatefulRead(fs, srcPath, fileLength, expected,
         ByteBuffer.allocate(fileLength + 100));
-    verifyStatefulRead(fs, srcPath, fileLength, expected, smallBuf);
-    verifyStatefulRead(fs, srcPath, fileLength, expected,
+    StripedFileTestUtil.verifyStatefulRead(fs, srcPath, fileLength, expected,
+        smallBuf);
+    StripedFileTestUtil.verifyStatefulRead(fs, srcPath, fileLength, expected,
         ByteBuffer.allocate(1024));
   }
 
@@ -241,130 +225,18 @@ public class TestWriteReadStripedFile {
     final byte[] expected = StripedFileTestUtil.generateBytes(fileLength);
     FileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
         WebHdfsConstants.WEBHDFS_SCHEME);
-    Path srcPath = new Path("/testWriteReadUsingWebHdfs_stripe");
+    Path srcPath = new Path("/testWriteReadUsingWebHdfs");
     DFSTestUtil.writeFile(fs, srcPath, new String(expected));
 
-    verifyLength(fs, srcPath, fileLength);
+    StripedFileTestUtil.verifyLength(fs, srcPath, fileLength);
 
     byte[] smallBuf = new byte[1024];
     byte[] largeBuf = new byte[fileLength + 100];
-    verifyPread(fs, srcPath, fileLength, expected, largeBuf);
-
-    verifyStatefulRead(fs, srcPath, fileLength, expected, largeBuf);
-    verifySeek(fs, srcPath, fileLength);
-    verifyStatefulRead(fs, srcPath, fileLength, expected, smallBuf);
-    //webhdfs doesn't support bytebuffer read
-  }
-
-  void verifyLength(FileSystem fs, Path srcPath, int fileLength)
-      throws IOException {
-    FileStatus status = fs.getFileStatus(srcPath);
-    Assert.assertEquals("File length should be the same",
-        fileLength, status.getLen());
-  }
-
-  void verifyPread(FileSystem fs, Path srcPath,  int fileLength,
-                   byte[] expected, byte[] buf) throws IOException {
-    try (FSDataInputStream in = fs.open(srcPath)) {
-      int[] startOffsets = {0, 1, cellSize - 102, cellSize, cellSize + 102,
-          cellSize * (dataBlocks - 1), cellSize * (dataBlocks - 1) + 102,
-          cellSize * dataBlocks, fileLength - 102, fileLength - 1};
-      for (int startOffset : startOffsets) {
-        startOffset = Math.max(0, Math.min(startOffset, fileLength - 1));
-        int remaining = fileLength - startOffset;
-        in.readFully(startOffset, buf, 0, remaining);
-        for (int i = 0; i < remaining; i++) {
-          Assert.assertEquals("Byte at " + (startOffset + i) + " should be the " +
-              "same", expected[startOffset + i], buf[i]);
-        }
-      }
-    }
-  }
-
-  void verifyStatefulRead(FileSystem fs, Path srcPath, int fileLength,
-                          byte[] expected, byte[] buf) throws IOException {
-    try (FSDataInputStream in = fs.open(srcPath)) {
-      final byte[] result = new byte[fileLength];
-      int readLen = 0;
-      int ret;
-      while ((ret = in.read(buf, 0, buf.length)) >= 0) {
-        System.arraycopy(buf, 0, result, readLen, ret);
-        readLen += ret;
-      }
-      Assert.assertEquals("The length of file should be the same to write size",
-          fileLength, readLen);
-      Assert.assertArrayEquals(expected, result);
-    }
-  }
-
+    StripedFileTestUtil.verifyPread(fs, srcPath, fileLength, expected, largeBuf);
 
-  void verifyStatefulRead(FileSystem fs, Path srcPath, int fileLength,
-                          byte[] expected, ByteBuffer buf) throws IOException {
-    try (FSDataInputStream in = fs.open(srcPath)) {
-      ByteBuffer result = ByteBuffer.allocate(fileLength);
-      int readLen = 0;
-      int ret;
-      while ((ret = in.read(buf)) >= 0) {
-        readLen += ret;
-        buf.flip();
-        result.put(buf);
-        buf.clear();
-      }
-      Assert.assertEquals("The length of file should be the same to write size",
-          fileLength, readLen);
-      Assert.assertArrayEquals(expected, result.array());
-    }
-  }
-
-
-  void verifySeek(FileSystem fs, Path srcPath, int fileLength)
-      throws IOException {
-    try (FSDataInputStream in = fs.open(srcPath)) {
-      // seek to 1/2 of content
-      int pos = fileLength / 2;
-      assertSeekAndRead(in, pos, fileLength);
-
-      // seek to 1/3 of content
-      pos = fileLength / 3;
-      assertSeekAndRead(in, pos, fileLength);
-
-      // seek to 0 pos
-      pos = 0;
-      assertSeekAndRead(in, pos, fileLength);
-
-      if (fileLength > cellSize) {
-        // seek to cellSize boundary
-        pos = cellSize - 1;
-        assertSeekAndRead(in, pos, fileLength);
-      }
-
-      if (fileLength > cellSize * dataBlocks) {
-        // seek to striped cell group boundary
-        pos = cellSize * dataBlocks - 1;
-        assertSeekAndRead(in, pos, fileLength);
-      }
-
-      if (fileLength > blockSize * dataBlocks) {
-        // seek to striped block group boundary
-        pos = blockSize * dataBlocks - 1;
-        assertSeekAndRead(in, pos, fileLength);
-      }
-
-      if (!(in.getWrappedStream() instanceof ByteRangeInputStream)) {
-        try {
-          in.seek(-1);
-          Assert.fail("Should be failed if seek to negative offset");
-        } catch (EOFException e) {
-          // expected
-        }
-
-        try {
-          in.seek(fileLength + 1);
-          Assert.fail("Should be failed if seek after EOF");
-        } catch (EOFException e) {
-          // expected
-        }
-      }
-    }
+    StripedFileTestUtil.verifyStatefulRead(fs, srcPath, fileLength, expected, largeBuf);
+    StripedFileTestUtil.verifySeek(fs, srcPath, fileLength);
+    StripedFileTestUtil.verifyStatefulRead(fs, srcPath, fileLength, expected, smallBuf);
+    // webhdfs doesn't support bytebuffer read
   }
 }