소스 검색

HDFS-8319. Erasure Coding: support decoding for stateful read. Contributed by Jing Zhao.

Zhe Zhang 10 년 전
부모
커밋
c0929ab3c2

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt

@@ -283,3 +283,6 @@
 
     HDFS-8328. Follow-on to update decode for DataNode striped blocks
     reconstruction. (yliu)
+
+    HDFS-8319. Erasure Coding: support decoding for stateful read.
+    (Jing Zhao via zhz)

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java

@@ -1639,7 +1639,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
   /**
    */
   @Override
-  public synchronized long getPos() throws IOException {
+  public synchronized long getPos() {
     return pos;
   }
 

+ 399 - 196
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java

@@ -30,12 +30,13 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.io.ByteBufferPool;
 
+import static org.apache.hadoop.hdfs.util.StripedBlockUtil.convertIndex4Decode;
 import static org.apache.hadoop.hdfs.util.StripedBlockUtil.divideByteRangeIntoStripes;
-import static org.apache.hadoop.hdfs.util.StripedBlockUtil.initDecodeInputs;
 import static org.apache.hadoop.hdfs.util.StripedBlockUtil.finalizeDecodeInputs;
 import static org.apache.hadoop.hdfs.util.StripedBlockUtil.decodeAndFillBuffer;
 import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getNextCompletedStripedRead;
 import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getStartOffsetsForInternalBlocks;
+import static org.apache.hadoop.hdfs.util.StripedBlockUtil.initDecodeInputs;
 import static org.apache.hadoop.hdfs.util.StripedBlockUtil.parseStripedBlockGroup;
 import static org.apache.hadoop.hdfs.util.StripedBlockUtil.AlignedStripe;
 import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunk;
@@ -55,6 +56,7 @@ import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.Set;
 import java.util.Collection;
@@ -63,8 +65,6 @@ import java.util.HashMap;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.CancellationException;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Future;
 
@@ -113,11 +113,19 @@ public class DFSStripedInputStream extends DFSInputStream {
   }
 
   private final BlockReader[] blockReaders;
+  /**
+   * when initializing block readers, their starting offsets are set to the same
+   * number: the smallest internal block offsets among all the readers. This is
+   * because it is possible that for some internal blocks we have to read
+   * "backwards" for decoding purpose. We thus use this offset array to track
+   * offsets for all the block readers so that we can skip data if necessary.
+   */
+  private final long[] blockReaderOffsets;
   private final DatanodeInfo[] currentNodes;
   private final int cellSize;
   private final short dataBlkNum;
   private final short parityBlkNum;
-  private final short groupSize;
+  private final int groupSize;
   /** the buffer for a complete stripe */
   private ByteBuffer curStripeBuf;
   private final ECSchema schema;
@@ -128,7 +136,8 @@ public class DFSStripedInputStream extends DFSInputStream {
    * block group
    */
   private StripeRange curStripeRange;
-  private final CompletionService<Integer> readingService;
+  private final CompletionService<Void> readingService;
+  private ReaderRetryPolicy retry;
 
   DFSStripedInputStream(DFSClient dfsClient, String src, boolean verifyChecksum,
       ECSchema schema, int cellSize) throws IOException {
@@ -139,8 +148,9 @@ public class DFSStripedInputStream extends DFSInputStream {
     this.cellSize = cellSize;
     dataBlkNum = (short) schema.getNumDataUnits();
     parityBlkNum = (short) schema.getNumParityUnits();
-    groupSize = dataBlkNum;
+    groupSize = dataBlkNum + parityBlkNum;
     blockReaders = new BlockReader[groupSize];
+    blockReaderOffsets = new long[groupSize];
     currentNodes = new DatanodeInfo[groupSize];
     curStripeRange = new StripeRange(0, 0);
     readingService =
@@ -197,20 +207,21 @@ public class DFSStripedInputStream extends DFSInputStream {
     // The purpose is to get start offset into each block.
     long[] offsetsForInternalBlocks = getStartOffsetsForInternalBlocks(schema,
         cellSize, targetBlockGroup, offsetIntoBlockGroup);
-    Preconditions.checkNotNull(offsetsForInternalBlocks);
+    Preconditions.checkState(
+        offsetsForInternalBlocks.length == dataBlkNum + parityBlkNum);
+    long minOffset = offsetsForInternalBlocks[dataBlkNum];
 
-    final ReaderRetryPolicy retry = new ReaderRetryPolicy();
-    for (int i = 0; i < groupSize; i++) {
+    retry = new ReaderRetryPolicy();
+    for (int i = 0; i < dataBlkNum; i++) {
       LocatedBlock targetBlock = targetBlocks[i];
       if (targetBlock != null) {
-        long offsetInBlock = offsetsForInternalBlocks[i] < 0 ?
-            0 : offsetsForInternalBlocks[i];
         DNAddrPair retval = getBestNodeDNAddrPair(targetBlock, null);
         if (retval != null) {
           currentNodes[i] = retval.info;
           blockReaders[i] = getBlockReaderWithRetry(targetBlock,
-              offsetInBlock, targetBlock.getBlockSize() - offsetInBlock,
+              minOffset, targetBlock.getBlockSize() - minOffset,
               retval.addr, retval.storageType, retval.info, target, retry);
+          blockReaderOffsets[i] = minOffset;
         }
       }
     }
@@ -260,19 +271,24 @@ public class DFSStripedInputStream extends DFSInputStream {
       return;
     }
     for (int i = 0; i < groupSize; i++) {
-      if (blockReaders[i] != null) {
-        try {
-          blockReaders[i].close();
-        } catch (IOException e) {
-          DFSClient.LOG.error("error closing blockReader", e);
-        }
-        blockReaders[i] = null;
-      }
+      closeReader(i);
       currentNodes[i] = null;
     }
     blockEnd = -1;
   }
 
+  private void closeReader(int index) {
+    if (blockReaders[index] != null) {
+      try {
+        blockReaders[index].close();
+      } catch (IOException e) {
+        DFSClient.LOG.error("error closing blockReader " + index, e);
+      }
+      blockReaders[index] = null;
+    }
+    blockReaderOffsets[index] = 0;
+  }
+
   private long getOffsetInBlockGroup() {
     return getOffsetInBlockGroup(pos);
   }
@@ -300,54 +316,81 @@ public class DFSStripedInputStream extends DFSInputStream {
     curStripeRange = new StripeRange(offsetInBlockGroup,
         stripeLimit - stripeBufOffset);
 
-    final int startCell = stripeBufOffset / cellSize;
-    final int numCell = (stripeLimit - 1) / cellSize + 1;
-
-    // read the whole stripe in parallel
-    Map<Future<Integer>, Integer> futures = new HashMap<>();
-    for (int i = startCell; i < numCell; i++) {
-      int bufPos = i == startCell ? stripeBufOffset : cellSize * i;
-      curStripeBuf.position(bufPos);
-      curStripeBuf.limit(Math.min(cellSize * (i + 1), stripeLimit));
-      ByteBuffer buf = curStripeBuf.slice();
-      ByteBufferStrategy strategy = new ByteBufferStrategy(buf);
-      final int targetLength = buf.remaining();
-      Callable<Integer> readCallable = readCell(blockReaders[i],
-          currentNodes[i], strategy, targetLength, corruptedBlockMap);
-      Future<Integer> request = readingService.submit(readCallable);
-      futures.put(request, i);
-    }
-    while (!futures.isEmpty()) {
-      try {
-        waitNextCompletion(readingService, futures);
-        // TODO: decode and record bad reader if necessary
-      } catch (InterruptedException ignored) {
-        // ignore and retry
-      }
+    LocatedStripedBlock blockGroup = (LocatedStripedBlock) currentLocatedBlock;
+    AlignedStripe[] stripes = StripedBlockUtil.divideOneStripe(schema, cellSize,
+        blockGroup, offsetInBlockGroup,
+        offsetInBlockGroup + curStripeRange.length - 1, curStripeBuf);
+    // TODO handle null elements in blks (e.g., NN does not know locations for
+    // all the internal blocks)
+    final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(
+        blockGroup, cellSize, dataBlkNum, parityBlkNum);
+    // read the whole stripe
+    for (AlignedStripe stripe : stripes) {
+      // Parse group to get chosen DN location
+      StripeReader sreader = new StatefulStripeReader(readingService, stripe,
+          blks);
+      sreader.readStripe(blks, corruptedBlockMap);
     }
+    curStripeBuf.position(stripeBufOffset);
+    curStripeBuf.limit(stripeLimit);
   }
 
-  private Callable<Integer> readCell(final BlockReader reader,
-      final DatanodeInfo datanode, final ByteBufferStrategy strategy,
+  private Callable<Void> readCell(final BlockReader reader,
+      final DatanodeInfo datanode, final long currentReaderOffset,
+      final long targetReaderOffset, final ByteBufferStrategy strategy,
       final int targetLength,
       final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
-    return new Callable<Integer>() {
+    return new Callable<Void>() {
       @Override
-      public Integer call() throws Exception {
+      public Void call() throws Exception {
+        // reader can be null if getBlockReaderWithRetry failed or
+        // the reader hit exception before
+        if (reader == null) {
+          throw new IOException("The BlockReader is null. " +
+              "The BlockReader creation failed or the reader hit exception.");
+        }
+        Preconditions.checkState(currentReaderOffset <= targetReaderOffset);
+        if (currentReaderOffset < targetReaderOffset) {
+          long skipped = reader.skip(targetReaderOffset - currentReaderOffset);
+          Preconditions.checkState(
+              skipped == targetReaderOffset - currentReaderOffset);
+        }
         int result = 0;
         while (result < targetLength) {
-          int ret = readBuffer(reader, datanode, strategy, corruptedBlockMap);
+          int ret = readToBuffer(reader, datanode, strategy, corruptedBlockMap);
           if (ret < 0) {
             throw new IOException("Unexpected EOS from the reader");
           }
           result += ret;
         }
         updateReadStatistics(readStatistics, targetLength, reader);
-        return result;
+        return null;
       }
     };
   }
 
+  private int readToBuffer(BlockReader blockReader,
+      DatanodeInfo currentNode, ByteBufferStrategy readerStrategy,
+      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+      throws IOException {
+    try {
+      return readerStrategy.doRead(blockReader, 0, 0);
+    } catch (ChecksumException ce) {
+      DFSClient.LOG.warn("Found Checksum error for "
+          + getCurrentBlock() + " from " + currentNode
+          + " at " + ce.getPos());
+      // we want to remember which block replicas we have tried
+      addIntoCorruptedBlockMap(getCurrentBlock(), currentNode,
+          corruptedBlockMap);
+      throw ce;
+    } catch (IOException e) {
+      DFSClient.LOG.warn("Exception while reading from "
+          + getCurrentBlock() + " of " + src + " from "
+          + currentNode, e);
+      throw e;
+    }
+  }
+
   /**
    * Seek to a new arbitrary location
    */
@@ -416,7 +459,7 @@ public class DFSStripedInputStream extends DFSInputStream {
           if (!curStripeRange.include(getOffsetInBlockGroup())) {
             readOneStripe(corruptedBlockMap);
           }
-          int ret = copy(strategy, off + result, realLen - result);
+          int ret = copyToTargetBuf(strategy, off + result, realLen - result);
           result += ret;
           pos += ret;
         }
@@ -434,26 +477,6 @@ public class DFSStripedInputStream extends DFSInputStream {
     return -1;
   }
 
-  private int readBuffer(BlockReader blockReader,
-      DatanodeInfo currentNode, ByteBufferStrategy readerStrategy,
-      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
-    try {
-      return readerStrategy.doRead(blockReader, 0, 0);
-    } catch ( ChecksumException ce ) {
-      DFSClient.LOG.warn("Found Checksum error for "
-          + getCurrentBlock() + " from " + currentNode
-          + " at " + ce.getPos());
-      // we want to remember which block replicas we have tried
-      addIntoCorruptedBlockMap(getCurrentBlock(), currentNode,
-          corruptedBlockMap);
-    } catch (IOException e) {
-      DFSClient.LOG.warn("Exception while reading from "
-          + getCurrentBlock() + " of " + src + " from "
-          + currentNode, e);
-    }
-    return -1;
-  }
-
   /**
    * Copy the data from {@link #curStripeBuf} into the given buffer
    * @param strategy the ReaderStrategy containing the given buffer
@@ -462,7 +485,7 @@ public class DFSStripedInputStream extends DFSInputStream {
    * @param length target length
    * @return number of bytes copied
    */
-  private int copy(ReaderStrategy strategy, int offset, int length) {
+  private int copyToTargetBuf(ReaderStrategy strategy, int offset, int length) {
     final long offsetInBlk = getOffsetInBlockGroup();
     int bufOffset = getStripedBufOffset(offsetInBlk);
     curStripeBuf.position(bufOffset);
@@ -519,120 +542,19 @@ public class DFSStripedInputStream extends DFSInputStream {
 
     AlignedStripe[] stripes = divideByteRangeIntoStripes(schema, cellSize,
         blockGroup, start, end, buf, offset);
+    CompletionService<Void> readService = new ExecutorCompletionService<>(
+        dfsClient.getStripedReadsThreadPool());
+    // TODO handle null elements in blks (e.g., NN does not know locations for
+    // all the internal blocks)
+    final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(
+        blockGroup, cellSize, dataBlkNum, parityBlkNum);
     for (AlignedStripe stripe : stripes) {
-      fetchOneStripe(blockGroup, buf, stripe, corruptedBlockMap);
-    }
-  }
-
-  private void fetchOneStripe(LocatedStripedBlock blockGroup,
-      byte[] buf, AlignedStripe alignedStripe, Map<ExtendedBlock,
-      Set<DatanodeInfo>> corruptedBlockMap) throws IOException {
-    Map<Future<Void>, Integer> futures = new HashMap<>();
-    CompletionService<Void> service =
-        new ExecutorCompletionService<>(dfsClient.getStripedReadsThreadPool());
-    if (alignedStripe.getSpanInBlock() == 0) {
-      DFSClient.LOG.warn("Trying to read an empty stripe from" + blockGroup);
-      return;
-    }
-    // Parse group to get chosen DN location
-    LocatedBlock[] blks = StripedBlockUtil.
-        parseStripedBlockGroup(blockGroup, cellSize, dataBlkNum, parityBlkNum);
-    for (short i = 0; i < dataBlkNum; i++) {
-      if (alignedStripe.chunks[i] != null
-          && alignedStripe.chunks[i].state != StripingChunk.ALLZERO) {
-        fetchOneStripingChunk(futures, service, blks[i], alignedStripe, i,
-            corruptedBlockMap);
-      }
-    }
-    // Input buffers for potential decode operation, which remains null until
-    // first read failure
-    byte[][] decodeInputs = null;
-    while (!futures.isEmpty()) {
-      try {
-        StripingChunkReadResult r = getNextCompletedStripedRead(
-            service, futures, 0);
-        if (DFSClient.LOG.isDebugEnabled()) {
-          DFSClient.LOG.debug("Read task returned: " + r + ", for stripe " + alignedStripe);
-        }
-        StripingChunk returnedChunk = alignedStripe.chunks[r.index];
-        Preconditions.checkNotNull(returnedChunk);
-        Preconditions.checkState(returnedChunk.state == StripingChunk.PENDING);
-        if (r.state == StripingChunkReadResult.SUCCESSFUL) {
-          returnedChunk.state = StripingChunk.FETCHED;
-          alignedStripe.fetchedChunksNum++;
-          if (alignedStripe.fetchedChunksNum == dataBlkNum) {
-            clearFutures(futures.keySet());
-            break;
-          }
-        } else {
-          returnedChunk.state = StripingChunk.MISSING;
-          alignedStripe.missingChunksNum++;
-          if (alignedStripe.missingChunksNum > parityBlkNum) {
-            clearFutures(futures.keySet());
-            throw new IOException("Too many blocks are missing: " + alignedStripe);
-          }
-          // When seeing first missing block, initialize decode input buffers
-          if (decodeInputs == null) {
-            decodeInputs = initDecodeInputs(alignedStripe, dataBlkNum, parityBlkNum);
-          }
-          for (int i = 0; i < alignedStripe.chunks.length; i++) {
-            StripingChunk chunk = alignedStripe.chunks[i];
-            Preconditions.checkNotNull(chunk);
-            if (chunk.state == StripingChunk.REQUESTED && i <= dataBlkNum) {
-              fetchOneStripingChunk(futures, service, blks[i], alignedStripe, i,
-                  corruptedBlockMap);
-            }
-          }
-        }
-      } catch (InterruptedException ie) {
-        String err = "Read request interrupted";
-        DFSClient.LOG.error(err);
-        clearFutures(futures.keySet());
-        // Don't decode if read interrupted
-        throw new InterruptedIOException(err);
-      }
-    }
-
-    if (alignedStripe.missingChunksNum > 0) {
-      finalizeDecodeInputs(decodeInputs, dataBlkNum, parityBlkNum,
-          alignedStripe);
-      decodeAndFillBuffer(decodeInputs, buf, alignedStripe, dataBlkNum,
-          parityBlkNum, decoder);
+      // Parse group to get chosen DN location
+      StripeReader preader = new PositionStripeReader(readService, stripe);
+      preader.readStripe(blks, corruptedBlockMap);
     }
   }
 
-  /**
-   * Schedule a single read request to an internal block
-   * @param block The internal block
-   * @param index Index of the internal block in the group
-   * @param corruptedBlockMap Map of corrupted blocks
-   */
-  private void fetchOneStripingChunk(Map<Future<Void>, Integer> futures,
-      final CompletionService<Void> service, final LocatedBlock block,
-      final AlignedStripe alignedStripe, final int index,
-      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
-    DatanodeInfo loc = block.getLocations()[0];
-    StorageType type = block.getStorageTypes()[0];
-    DNAddrPair dnAddr = new DNAddrPair(loc, NetUtils.createSocketAddr(
-        loc.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname())),
-        type);
-    StripingChunk chunk = alignedStripe.chunks[index];
-    chunk.state = StripingChunk.PENDING;
-    Callable<Void> readCallable = getFromOneDataNode(dnAddr,
-        block, alignedStripe.getOffsetInBlock(),
-        alignedStripe.getOffsetInBlock() + alignedStripe.getSpanInBlock() - 1,
-        chunk.buf, chunk.getOffsets(), chunk.getLengths(),
-        corruptedBlockMap, index);
-    Future<Void> getFromDNRequest = service.submit(readCallable);
-    if (DFSClient.LOG.isDebugEnabled()) {
-      DFSClient.LOG.debug("Submitting striped read request for " + index +
-          ". Info of the block: " + block + ", offset in block is " +
-          alignedStripe.getOffsetInBlock() + ", end is " +
-          (alignedStripe.getOffsetInBlock() + alignedStripe.getSpanInBlock() - 1));
-    }
-    futures.put(getFromDNRequest, index);
-  }
-
   private Callable<Void> getFromOneDataNode(final DNAddrPair datanode,
       final LocatedBlock block, final long start, final long end,
       final byte[] buf, final int[] offsets, final int[] lengths,
@@ -655,21 +577,302 @@ public class DFSStripedInputStream extends DFSInputStream {
     };
   }
 
-  private <T> void waitNextCompletion(CompletionService<T> service,
-      Map<Future<T>, Integer> futures) throws InterruptedException {
-    if (futures.isEmpty()) {
-      throw new InterruptedException("Futures already empty");
+  private abstract class StripeReader {
+    final Map<Future<Void>, Integer> futures = new HashMap<>();
+    final AlignedStripe alignedStripe;
+    final CompletionService<Void> service;
+
+    StripeReader(CompletionService<Void> service, AlignedStripe alignedStripe) {
+      this.service = service;
+      this.alignedStripe = alignedStripe;
+    }
+
+    /** submit reading chunk task */
+    abstract void readChunk(final CompletionService<Void> service,
+        final LocatedBlock block, int chunkIndex,
+        Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap);
+
+    /**
+     * When seeing first missing block, initialize decode input buffers.
+     * Also prepare the reading for data blocks outside of the reading range.
+     */
+    abstract void prepareDecodeInputs() throws IOException;
+
+    /**
+     * Prepare reading for one more parity chunk.
+     */
+    abstract void prepareParityChunk() throws IOException;
+
+    abstract void decode();
+
+    abstract void updateState4SuccessRead(StripingChunkReadResult result);
+
+    /** read the whole stripe. do decoding if necessary */
+    void readStripe(LocatedBlock[] blocks,
+        Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+        throws IOException {
+      assert alignedStripe.getSpanInBlock() > 0;
+      for (short i = 0; i < dataBlkNum; i++) {
+        if (alignedStripe.chunks[i] != null
+            && alignedStripe.chunks[i].state != StripingChunk.ALLZERO) {
+          readChunk(service, blocks[i], i, corruptedBlockMap);
+        }
+      }
+
+      // Input buffers for potential decode operation, which remains null until
+      // first read failure
+      while (!futures.isEmpty()) {
+        try {
+          StripingChunkReadResult r = getNextCompletedStripedRead(service,
+              futures, 0);
+          if (DFSClient.LOG.isDebugEnabled()) {
+            DFSClient.LOG.debug("Read task returned: " + r + ", for stripe "
+                + alignedStripe);
+          }
+          StripingChunk returnedChunk = alignedStripe.chunks[r.index];
+          Preconditions.checkNotNull(returnedChunk);
+          Preconditions.checkState(returnedChunk.state == StripingChunk.PENDING);
+
+          if (r.state == StripingChunkReadResult.SUCCESSFUL) {
+            returnedChunk.state = StripingChunk.FETCHED;
+            alignedStripe.fetchedChunksNum++;
+            updateState4SuccessRead(r);
+            if (alignedStripe.fetchedChunksNum == dataBlkNum) {
+              clearFutures(futures.keySet());
+              break;
+            }
+          } else {
+            returnedChunk.state = StripingChunk.MISSING;
+            alignedStripe.missingChunksNum++;
+            if (alignedStripe.missingChunksNum > parityBlkNum) {
+              clearFutures(futures.keySet());
+              throw new IOException("Too many blocks are missing: "
+                  + alignedStripe);
+            }
+
+            prepareDecodeInputs();
+            prepareParityChunk();
+            // close the corresponding reader
+            closeReader(r.index);
+
+            for (int i = 0; i < alignedStripe.chunks.length; i++) {
+              StripingChunk chunk = alignedStripe.chunks[i];
+              if (chunk != null && chunk.state == StripingChunk.REQUESTED) {
+                readChunk(service, blocks[i], i, corruptedBlockMap);
+              }
+            }
+          }
+        } catch (InterruptedException ie) {
+          String err = "Read request interrupted";
+          DFSClient.LOG.error(err);
+          clearFutures(futures.keySet());
+          // Don't decode if read interrupted
+          throw new InterruptedIOException(err);
+        }
+      }
+
+      if (alignedStripe.missingChunksNum > 0) {
+        decode();
+      }
+    }
+  }
+
+  class PositionStripeReader extends StripeReader {
+    private byte[][] decodeInputs = null;
+
+    PositionStripeReader(CompletionService<Void> service,
+        AlignedStripe alignedStripe) {
+      super(service, alignedStripe);
+    }
+
+    @Override
+    void readChunk(final CompletionService<Void> service,
+        final LocatedBlock block, int chunkIndex,
+        Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
+      DatanodeInfo loc = block.getLocations()[0];
+      StorageType type = block.getStorageTypes()[0];
+      DNAddrPair dnAddr = new DNAddrPair(loc, NetUtils.createSocketAddr(
+          loc.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname())),
+          type);
+      StripingChunk chunk = alignedStripe.chunks[chunkIndex];
+      chunk.state = StripingChunk.PENDING;
+      Callable<Void> readCallable = getFromOneDataNode(dnAddr,
+          block, alignedStripe.getOffsetInBlock(),
+          alignedStripe.getOffsetInBlock() + alignedStripe.getSpanInBlock() - 1,
+          chunk.byteArray.buf(), chunk.byteArray.getOffsets(),
+          chunk.byteArray.getLengths(), corruptedBlockMap, chunkIndex);
+      Future<Void> getFromDNRequest = service.submit(readCallable);
+      if (DFSClient.LOG.isDebugEnabled()) {
+        DFSClient.LOG.debug("Submitting striped read request for " + chunkIndex
+            + ". Info of the block: " + block + ", offset in block is "
+            + alignedStripe.getOffsetInBlock() + ", end is "
+            + (alignedStripe.getOffsetInBlock()
+            + alignedStripe.getSpanInBlock() - 1));
+      }
+      futures.put(getFromDNRequest, chunkIndex);
+    }
+
+    @Override
+    void updateState4SuccessRead(StripingChunkReadResult r) {}
+
+    @Override
+    void prepareDecodeInputs() {
+      if (decodeInputs == null) {
+        decodeInputs = initDecodeInputs(alignedStripe, dataBlkNum, parityBlkNum);
+      }
+    }
+
+    @Override
+    void prepareParityChunk() {
+      for (int i = dataBlkNum; i < dataBlkNum + parityBlkNum; i++) {
+        if (alignedStripe.chunks[i] == null) {
+          final int decodeIndex = convertIndex4Decode(i, dataBlkNum, parityBlkNum);
+          alignedStripe.chunks[i] = new StripingChunk(decodeInputs[decodeIndex]);
+          alignedStripe.chunks[i].addByteArraySlice(0,
+              (int) alignedStripe.getSpanInBlock());
+          break;
+        }
+      }
+    }
+
+    @Override
+    void decode() {
+      finalizeDecodeInputs(decodeInputs, dataBlkNum, parityBlkNum,
+          alignedStripe);
+      decodeAndFillBuffer(decodeInputs, alignedStripe, dataBlkNum,
+          parityBlkNum, decoder);
+    }
+  }
+
+  class StatefulStripeReader extends StripeReader {
+    ByteBuffer[] decodeInputs;
+    final LocatedBlock[] targetBlocks;
+
+    StatefulStripeReader(CompletionService<Void> service,
+        AlignedStripe alignedStripe, LocatedBlock[] targetBlocks) {
+      super(service, alignedStripe);
+      this.targetBlocks = targetBlocks;
+    }
+
+    @Override
+    void readChunk(final CompletionService<Void> service,
+        final LocatedBlock block, int chunkIndex, Map<ExtendedBlock,
+        Set<DatanodeInfo>> corruptedBlockMap) {
+      StripingChunk chunk = alignedStripe.chunks[chunkIndex];
+      chunk.state = StripingChunk.PENDING;
+      ByteBufferStrategy strategy = new ByteBufferStrategy(chunk.byteBuffer);
+      Callable<Void> readCallable = readCell(blockReaders[chunkIndex],
+          currentNodes[chunkIndex], blockReaderOffsets[chunkIndex],
+          alignedStripe.getOffsetInBlock(), strategy,
+          chunk.byteBuffer.remaining(), corruptedBlockMap);
+      Future<Void> request = readingService.submit(readCallable);
+      futures.put(request, chunkIndex);
+    }
+
+    @Override
+    void updateState4SuccessRead(StripingChunkReadResult result) {
+      Preconditions.checkArgument(
+          result.state == StripingChunkReadResult.SUCCESSFUL);
+      blockReaderOffsets[result.index] =
+          alignedStripe.getOffsetInBlock() + alignedStripe.getSpanInBlock();
+    }
+
+    @Override
+    void prepareDecodeInputs() throws IOException {
+      if (decodeInputs == null) {
+        decodeInputs = new ByteBuffer[dataBlkNum + parityBlkNum];
+        ByteBuffer cur = curStripeBuf.duplicate();
+        StripedBlockUtil.VerticalRange range = alignedStripe.range;
+        for (int i = 0; i < dataBlkNum; i++) {
+          cur.limit(cur.capacity());
+          int pos = (int) (range.offsetInBlock % cellSize + cellSize * i);
+          cur.position(pos);
+          cur.limit((int) (pos + range.spanInBlock));
+          final int decodeIndex = convertIndex4Decode(i, dataBlkNum,
+              parityBlkNum);
+          decodeInputs[decodeIndex] = cur.slice();
+          if (alignedStripe.chunks[i] == null) {
+            alignedStripe.chunks[i] =
+                new StripingChunk(decodeInputs[decodeIndex]);
+          }
+        }
+      }
+    }
+
+    @Override
+    void prepareParityChunk() throws IOException {
+      for (int i = dataBlkNum; i < dataBlkNum + parityBlkNum; i++) {
+        if (alignedStripe.chunks[i] == null) {
+          final int decodeIndex = convertIndex4Decode(i, dataBlkNum,
+              parityBlkNum);
+          decodeInputs[decodeIndex] = ByteBuffer.allocateDirect(
+              (int) alignedStripe.range.spanInBlock);
+          alignedStripe.chunks[i] = new StripingChunk(decodeInputs[decodeIndex]);
+          if (blockReaders[i] == null) {
+            prepareParityBlockReader(i);
+          }
+          break;
+        }
+      }
+    }
+
+    private void prepareParityBlockReader(int i) throws IOException {
+      // prepare the block reader for the parity chunk
+      LocatedBlock targetBlock = targetBlocks[i];
+      if (targetBlock != null) {
+        final long offsetInBlock = alignedStripe.getOffsetInBlock();
+        DNAddrPair retval = getBestNodeDNAddrPair(targetBlock, null);
+        if (retval != null) {
+          currentNodes[i] = retval.info;
+          blockReaders[i] = getBlockReaderWithRetry(targetBlock,
+              offsetInBlock, targetBlock.getBlockSize() - offsetInBlock,
+              retval.addr, retval.storageType, retval.info,
+              DFSStripedInputStream.this.getPos(), retry);
+          blockReaderOffsets[i] = offsetInBlock;
+        }
+      }
+    }
+
+    @Override
+    void decode() {
+      // TODO no copy for data chunks. this depends on HADOOP-12047 for some
+      // decoders to work
+      final int span = (int) alignedStripe.getSpanInBlock();
+      for (int i = 0; i < alignedStripe.chunks.length; i++) {
+        final int decodeIndex = convertIndex4Decode(i, dataBlkNum, parityBlkNum);
+        if (alignedStripe.chunks[i] != null &&
+            alignedStripe.chunks[i].state == StripingChunk.ALLZERO) {
+          for (int j = 0; j < span; j++) {
+            decodeInputs[decodeIndex].put((byte) 0);
+          }
+          decodeInputs[decodeIndex].flip();
+        } else if (alignedStripe.chunks[i] != null &&
+            alignedStripe.chunks[i].state == StripingChunk.FETCHED) {
+          decodeInputs[decodeIndex].position(0);
+          decodeInputs[decodeIndex].limit(span);
+        }
+      }
+      int[] decodeIndices = new int[parityBlkNum];
+      int pos = 0;
+      for (int i = 0; i < alignedStripe.chunks.length; i++) {
+        if (alignedStripe.chunks[i] != null &&
+            alignedStripe.chunks[i].state == StripingChunk.MISSING) {
+          decodeIndices[pos++] = convertIndex4Decode(i, dataBlkNum, parityBlkNum);
+        }
+      }
+      decodeIndices = Arrays.copyOf(decodeIndices, pos);
+
+      final int decodeChunkNum = decodeIndices.length;
+      ByteBuffer[] outputs = new ByteBuffer[decodeChunkNum];
+      for (int i = 0; i < decodeChunkNum; i++) {
+        outputs[i] = decodeInputs[decodeIndices[i]];
+        outputs[i].position(0);
+        outputs[i].limit((int) alignedStripe.range.spanInBlock);
+        decodeInputs[decodeIndices[i]] = null;
+      }
+
+      decoder.decode(decodeInputs, decodeIndices, outputs);
     }
-    Future<T> future = null;
-    try {
-      future = service.take();
-      future.get();
-      futures.remove(future);
-    } catch (ExecutionException | CancellationException e) {
-      // already logged in the Callable
-      futures.remove(future);
-    }
-    throw new InterruptedException("let's retry");
   }
 
   /**

+ 147 - 68
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java

@@ -33,6 +33,7 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
 
+import java.nio.ByteBuffer;
 import java.util.*;
 import java.io.IOException;
 import java.util.concurrent.CancellationException;
@@ -79,7 +80,6 @@ public class StripedBlockUtil {
   public static LocatedBlock[] parseStripedBlockGroup(LocatedStripedBlock bg,
       int cellSize, int dataBlkNum, int parityBlkNum) {
     int locatedBGSize = bg.getBlockIndices().length;
-    // TODO not considering missing blocks for now, only identify data blocks
     LocatedBlock[] lbs = new LocatedBlock[dataBlkNum + parityBlkNum];
     for (short i = 0; i < locatedBGSize; i++) {
       final int idx = bg.getBlockIndices()[i];
@@ -212,7 +212,7 @@ public class StripedBlockUtil {
         return new StripingChunkReadResult(StripingChunkReadResult.TIMEOUT);
       }
     } catch (ExecutionException e) {
-      DFSClient.LOG.error("ExecutionException " + e);
+      DFSClient.LOG.warn("ExecutionException " + e);
       return new StripingChunkReadResult(futures.remove(future),
           StripingChunkReadResult.FAILED);
     } catch (CancellationException e) {
@@ -253,12 +253,13 @@ public class StripedBlockUtil {
       int dataBlkNum, int parityBlkNum) {
     byte[][] decodeInputs =
         new byte[dataBlkNum + parityBlkNum][(int) alignedStripe.getSpanInBlock()];
-    for (int i = 0; i < alignedStripe.chunks.length; i++) {
+    // read the full data aligned stripe
+    for (int i = 0; i < dataBlkNum; i++) {
       if (alignedStripe.chunks[i] == null) {
         final int decodeIndex = convertIndex4Decode(i, dataBlkNum, parityBlkNum);
         alignedStripe.chunks[i] = new StripingChunk(decodeInputs[decodeIndex]);
-        alignedStripe.chunks[i].offsetsInBuf.add(0);
-        alignedStripe.chunks[i].lengthsInBuf.add((int) alignedStripe.getSpanInBlock());
+        alignedStripe.chunks[i].addByteArraySlice(0,
+            (int) alignedStripe.getSpanInBlock());
       }
     }
     return decodeInputs;
@@ -276,14 +277,9 @@ public class StripedBlockUtil {
     for (int i = 0; i < alignedStripe.chunks.length; i++) {
       final StripingChunk chunk = alignedStripe.chunks[i];
       final int decodeIndex = convertIndex4Decode(i, dataBlkNum, parityBlkNum);
-      if (chunk.state == StripingChunk.FETCHED) {
-        int posInBuf = 0;
-        for (int j = 0; j < chunk.offsetsInBuf.size(); j++) {
-          System.arraycopy(chunk.buf, chunk.offsetsInBuf.get(j),
-              decodeInputs[decodeIndex], posInBuf, chunk.lengthsInBuf.get(j));
-          posInBuf += chunk.lengthsInBuf.get(j);
-        }
-      } else if (chunk.state == StripingChunk.ALLZERO) {
+      if (chunk != null && chunk.state == StripingChunk.FETCHED) {
+        chunk.copyTo(decodeInputs[decodeIndex]);
+      } else if (chunk != null && chunk.state == StripingChunk.ALLZERO) {
         Arrays.fill(decodeInputs[decodeIndex], (byte) 0);
       } else {
         decodeInputs[decodeIndex] = null;
@@ -315,13 +311,14 @@ public class StripedBlockUtil {
    * Decode based on the given input buffers and schema.
    */
   public static void decodeAndFillBuffer(final byte[][] decodeInputs,
-      byte[] buf, AlignedStripe alignedStripe, int dataBlkNum, int parityBlkNum,
+      AlignedStripe alignedStripe, int dataBlkNum, int parityBlkNum,
       RawErasureDecoder decoder) {
     // Step 1: prepare indices and output buffers for missing data units
     int[] decodeIndices = new int[parityBlkNum];
     int pos = 0;
     for (int i = 0; i < alignedStripe.chunks.length; i++) {
-      if (alignedStripe.chunks[i].state == StripingChunk.MISSING){
+      if (alignedStripe.chunks[i] != null &&
+          alignedStripe.chunks[i].state == StripingChunk.MISSING){
         decodeIndices[pos++] = convertIndex4Decode(i, dataBlkNum, parityBlkNum);
       }
     }
@@ -338,14 +335,56 @@ public class StripedBlockUtil {
           dataBlkNum, parityBlkNum);
       StripingChunk chunk = alignedStripe.chunks[missingBlkIdx];
       if (chunk.state == StripingChunk.MISSING) {
-        int srcPos = 0;
-        for (int j = 0; j < chunk.offsetsInBuf.size(); j++) {
-          System.arraycopy(decodeOutputs[i], srcPos, buf,
-              chunk.offsetsInBuf.get(j), chunk.lengthsInBuf.get(j));
-          srcPos += chunk.lengthsInBuf.get(j);
+        chunk.copyFrom(decodeOutputs[i]);
+      }
+    }
+  }
+
+  /**
+   * Similar functionality with {@link #divideByteRangeIntoStripes}, but is used
+   * by stateful read and uses ByteBuffer as reading target buffer. Besides the
+   * read range is within a single stripe thus the calculation logic is simpler.
+   */
+  public static AlignedStripe[] divideOneStripe(ECSchema ecSchema,
+      int cellSize, LocatedStripedBlock blockGroup, long rangeStartInBlockGroup,
+      long rangeEndInBlockGroup, ByteBuffer buf) {
+    final int dataBlkNum = ecSchema.getNumDataUnits();
+    // Step 1: map the byte range to StripingCells
+    StripingCell[] cells = getStripingCellsOfByteRange(ecSchema, cellSize,
+        blockGroup, rangeStartInBlockGroup, rangeEndInBlockGroup);
+
+    // Step 2: get the unmerged ranges on each internal block
+    VerticalRange[] ranges = getRangesForInternalBlocks(ecSchema, cellSize,
+        cells);
+
+    // Step 3: merge into stripes
+    AlignedStripe[] stripes = mergeRangesForInternalBlocks(ecSchema, ranges);
+
+    // Step 4: calculate each chunk's position in destination buffer. Since the
+    // whole read range is within a single stripe, the logic is simpler here.
+    int bufOffset = (int) (rangeStartInBlockGroup % (cellSize * dataBlkNum));
+    for (StripingCell cell : cells) {
+      long cellStart = cell.idxInInternalBlk * cellSize + cell.offset;
+      long cellEnd = cellStart + cell.size - 1;
+      for (AlignedStripe s : stripes) {
+        long stripeEnd = s.getOffsetInBlock() + s.getSpanInBlock() - 1;
+        long overlapStart = Math.max(cellStart, s.getOffsetInBlock());
+        long overlapEnd = Math.min(cellEnd, stripeEnd);
+        int overLapLen = (int) (overlapEnd - overlapStart + 1);
+        if (overLapLen > 0) {
+          Preconditions.checkState(s.chunks[cell.idxInStripe] == null);
+          final int pos = (int) (bufOffset + overlapStart - cellStart);
+          buf.position(pos);
+          buf.limit(pos + overLapLen);
+          s.chunks[cell.idxInStripe] = new StripingChunk(buf.slice());
         }
       }
+      bufOffset += cell.size;
     }
+
+    // Step 5: prepare ALLZERO blocks
+    prepareAllZeroChunks(blockGroup, stripes, cellSize, dataBlkNum);
+    return stripes;
   }
 
   /**
@@ -369,7 +408,7 @@ public class StripedBlockUtil {
       int offsetInBuf) {
 
     // Step 0: analyze range and calculate basic parameters
-    int dataBlkNum = ecSchema.getNumDataUnits();
+    final int dataBlkNum = ecSchema.getNumDataUnits();
 
     // Step 1: map the byte range to StripingCells
     StripingCell[] cells = getStripingCellsOfByteRange(ecSchema, cellSize,
@@ -386,7 +425,7 @@ public class StripedBlockUtil {
     calcualteChunkPositionsInBuf(cellSize, stripes, cells, buf, offsetInBuf);
 
     // Step 5: prepare ALLZERO blocks
-    prepareAllZeroChunks(blockGroup, buf, stripes, cellSize, dataBlkNum);
+    prepareAllZeroChunks(blockGroup, stripes, cellSize, dataBlkNum);
 
     return stripes;
   }
@@ -403,23 +442,25 @@ public class StripedBlockUtil {
     Preconditions.checkArgument(
         rangeStartInBlockGroup <= rangeEndInBlockGroup &&
             rangeEndInBlockGroup < blockGroup.getBlockSize());
-    int len = (int) (rangeEndInBlockGroup - rangeStartInBlockGroup + 1);
+    long len = rangeEndInBlockGroup - rangeStartInBlockGroup + 1;
     int firstCellIdxInBG = (int) (rangeStartInBlockGroup / cellSize);
     int lastCellIdxInBG = (int) (rangeEndInBlockGroup / cellSize);
     int numCells = lastCellIdxInBG - firstCellIdxInBG + 1;
     StripingCell[] cells = new StripingCell[numCells];
-    cells[0] = new StripingCell(ecSchema, cellSize, firstCellIdxInBG);
-    cells[numCells - 1] = new StripingCell(ecSchema, cellSize, lastCellIdxInBG);
 
-    cells[0].offset = (int) (rangeStartInBlockGroup % cellSize);
-    cells[0].size =
-        Math.min(cellSize - (int) (rangeStartInBlockGroup % cellSize), len);
+    final int firstCellOffset = (int) (rangeStartInBlockGroup % cellSize);
+    final int firstCellSize =
+        (int) Math.min(cellSize - (rangeStartInBlockGroup % cellSize), len);
+    cells[0] = new StripingCell(ecSchema, firstCellSize, firstCellIdxInBG,
+        firstCellOffset);
     if (lastCellIdxInBG != firstCellIdxInBG) {
-      cells[numCells - 1].size = (int) (rangeEndInBlockGroup % cellSize) + 1;
+      final int lastCellSize = (int) (rangeEndInBlockGroup % cellSize) + 1;
+      cells[numCells - 1] = new StripingCell(ecSchema, lastCellSize,
+          lastCellIdxInBG, 0);
     }
 
     for (int i = 1; i < numCells - 1; i++) {
-      cells[i] = new StripingCell(ecSchema, cellSize, i + firstCellIdxInBG);
+      cells[i] = new StripingCell(ecSchema, cellSize, i + firstCellIdxInBG, 0);
     }
 
     return cells;
@@ -438,8 +479,8 @@ public class StripedBlockUtil {
     long[] startOffsets = new long[dataBlkNum + parityBlkNum];
     Arrays.fill(startOffsets, -1L);
     int firstCellIdxInBG = (int) (rangeStartInBlockGroup / cellSize);
-    StripingCell firstCell = new StripingCell(ecSchema, cellSize, firstCellIdxInBG);
-    firstCell.offset = (int) (rangeStartInBlockGroup % cellSize);
+    StripingCell firstCell = new StripingCell(ecSchema, cellSize,
+        firstCellIdxInBG, (int) (rangeStartInBlockGroup % cellSize));
     startOffsets[firstCell.idxInStripe] =
         firstCell.idxInInternalBlk * cellSize + firstCell.offset;
     long earliestStart = startOffsets[firstCell.idxInStripe];
@@ -448,7 +489,7 @@ public class StripedBlockUtil {
       if (idx * (long) cellSize >= blockGroup.getBlockSize()) {
         break;
       }
-      StripingCell cell = new StripingCell(ecSchema, cellSize, idx);
+      StripingCell cell = new StripingCell(ecSchema, cellSize, idx, 0);
       startOffsets[cell.idxInStripe] = cell.idxInInternalBlk * (long) cellSize;
       if (startOffsets[cell.idxInStripe] < earliestStart) {
         earliestStart = startOffsets[cell.idxInStripe];
@@ -563,10 +604,8 @@ public class StripedBlockUtil {
         if (s.chunks[cell.idxInStripe] == null) {
           s.chunks[cell.idxInStripe] = new StripingChunk(buf);
         }
-
-        s.chunks[cell.idxInStripe].offsetsInBuf.
-            add((int)(offsetInBuf + done + overlapStart - cellStart));
-        s.chunks[cell.idxInStripe].lengthsInBuf.add(overLapLen);
+        s.chunks[cell.idxInStripe].addByteArraySlice(
+            (int)(offsetInBuf + done + overlapStart - cellStart), overLapLen);
       }
       done += cell.size;
     }
@@ -577,15 +616,14 @@ public class StripedBlockUtil {
    * size, the chunk should be treated as zero bytes in decoding.
    */
   private static void prepareAllZeroChunks(LocatedStripedBlock blockGroup,
-      byte[] buf, AlignedStripe[] stripes, int cellSize, int dataBlkNum) {
+      AlignedStripe[] stripes, int cellSize, int dataBlkNum) {
     for (AlignedStripe s : stripes) {
       for (int i = 0; i < dataBlkNum; i++) {
         long internalBlkLen = getInternalBlockLength(blockGroup.getBlockSize(),
             cellSize, dataBlkNum, i);
         if (internalBlkLen <= s.getOffsetInBlock()) {
           Preconditions.checkState(s.chunks[i] == null);
-          s.chunks[i] = new StripingChunk(buf);
-          s.chunks[i].state = StripingChunk.ALLZERO;
+          s.chunks[i] = new StripingChunk(); // chunk state is set to ALLZERO
         }
       }
     }
@@ -615,7 +653,7 @@ public class StripedBlockUtil {
    */
   @VisibleForTesting
   static class StripingCell {
-    public final ECSchema schema;
+    final ECSchema schema;
     /** Logical order in a block group, used when doing I/O to a block group */
     final int idxInBlkGroup;
     final int idxInInternalBlk;
@@ -626,27 +664,17 @@ public class StripedBlockUtil {
      * {@link #size} variable represent the start offset and size of the
      * overlap.
      */
-    int offset;
-    int size;
+    final int offset;
+    final int size;
 
-    StripingCell(ECSchema ecSchema, int cellSize, int idxInBlkGroup) {
+    StripingCell(ECSchema ecSchema, int cellSize, int idxInBlkGroup,
+        int offset) {
       this.schema = ecSchema;
       this.idxInBlkGroup = idxInBlkGroup;
       this.idxInInternalBlk = idxInBlkGroup / ecSchema.getNumDataUnits();
       this.idxInStripe = idxInBlkGroup -
           this.idxInInternalBlk * ecSchema.getNumDataUnits();
-      this.offset = 0;
-      this.size = cellSize;
-    }
-
-    StripingCell(ECSchema ecSchema, int cellSize, int idxInInternalBlk,
-        int idxInStripe) {
-      this.schema = ecSchema;
-      this.idxInInternalBlk = idxInInternalBlk;
-      this.idxInStripe = idxInStripe;
-      this.idxInBlkGroup =
-          idxInInternalBlk * ecSchema.getNumDataUnits() + idxInStripe;
-      this.offset = 0;
+      this.offset = offset;
       this.size = cellSize;
     }
   }
@@ -700,11 +728,6 @@ public class StripedBlockUtil {
       this.chunks = new StripingChunk[width];
     }
 
-    public AlignedStripe(VerticalRange range, int width) {
-      this.range = range;
-      this.chunks = new StripingChunk[width];
-    }
-
     public boolean include(long pos) {
       return range.include(pos);
     }
@@ -777,10 +800,6 @@ public class StripedBlockUtil {
    * |REQUESTED|  |REQUESTED|    ALLZERO    |  |null|  |null| <- AlignedStripe2
    * +---------+  +---------+               |  +----+  +----+
    * <----------- data blocks ------------> | <--- parity --->
-   *
-   * The class also carries {@link #buf}, {@link #offsetsInBuf}, and
-   * {@link #lengthsInBuf} to define how read task for this chunk should
-   * deliver the returned data.
    */
   public static class StripingChunk {
     /** Chunk has been successfully fetched */
@@ -808,11 +827,49 @@ public class StripedBlockUtil {
      * null (AlignedStripe created) -> REQUESTED (upon failure) -> PENDING ...
      */
     public int state = REQUESTED;
-    public byte[] buf;
-    public List<Integer> offsetsInBuf;
-    public List<Integer> lengthsInBuf;
+
+    public final ChunkByteArray byteArray;
+    public final ByteBuffer byteBuffer;
 
     public StripingChunk(byte[] buf) {
+      this.byteArray = new ChunkByteArray(buf);
+      byteBuffer = null;
+    }
+
+    public StripingChunk(ByteBuffer buf) {
+      this.byteArray = null;
+      this.byteBuffer = buf;
+    }
+
+    public StripingChunk() {
+      this.byteArray = null;
+      this.byteBuffer = null;
+      this.state = ALLZERO;
+    }
+
+    public void addByteArraySlice(int offset, int length) {
+      assert byteArray != null;
+      byteArray.offsetsInBuf.add(offset);
+      byteArray.lengthsInBuf.add(length);
+    }
+
+    void copyTo(byte[] target) {
+      assert byteArray != null;
+      byteArray.copyTo(target);
+    }
+
+    void copyFrom(byte[] src) {
+      assert byteArray != null;
+      byteArray.copyFrom(src);
+    }
+  }
+
+  public static class ChunkByteArray {
+    private final byte[] buf;
+    private final List<Integer> offsetsInBuf;
+    private final List<Integer> lengthsInBuf;
+
+    ChunkByteArray(byte[] buf) {
       this.buf = buf;
       this.offsetsInBuf = new ArrayList<>();
       this.lengthsInBuf = new ArrayList<>();
@@ -833,6 +890,28 @@ public class StripedBlockUtil {
       }
       return lens;
     }
+
+    public byte[] buf() {
+      return buf;
+    }
+
+    void copyTo(byte[] target) {
+      int posInBuf = 0;
+      for (int i = 0; i < offsetsInBuf.size(); i++) {
+        System.arraycopy(buf, offsetsInBuf.get(i),
+            target, posInBuf, lengthsInBuf.get(i));
+        posInBuf += lengthsInBuf.get(i);
+      }
+    }
+
+    void copyFrom(byte[] src) {
+      int srcPos = 0;
+      for (int j = 0; j < offsetsInBuf.size(); j++) {
+        System.arraycopy(src, srcPos, buf, offsetsInBuf.get(j),
+            lengthsInBuf.get(j));
+        srcPos += lengthsInBuf.get(j);
+      }
+    }
   }
 
   /**

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java

@@ -32,7 +32,7 @@ public class StripedFileTestUtil {
   static final int blockSize = cellSize * stripesPerBlock;
   static final int numDNs = dataBlocks + parityBlocks + 2;
 
-  static final Random r = new Random();
+  static final Random random = new Random();
 
   static byte[] generateBytes(int cnt) {
     byte[] bytes = new byte[cnt];

+ 29 - 12
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java

@@ -29,6 +29,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import static org.apache.hadoop.hdfs.StripedFileTestUtil.blockSize;
 import static org.apache.hadoop.hdfs.StripedFileTestUtil.cellSize;
@@ -58,28 +59,28 @@ public class TestReadStripedFileWithDecoding {
   }
 
   @Test
-  public void testWritePreadWithDNFailure1() throws IOException {
-    testWritePreadWithDNFailure("/foo", cellSize * (dataBlocks + 2), 0);
+  public void testReadWithDNFailure1() throws IOException {
+    testReadWithDNFailure("/foo", cellSize * (dataBlocks + 2), 0);
   }
 
   @Test
-  public void testWritePreadWithDNFailure2() throws IOException {
-    testWritePreadWithDNFailure("/foo", cellSize * (dataBlocks + 2), cellSize * 5);
+  public void testReadWithDNFailure2() throws IOException {
+    testReadWithDNFailure("/foo", cellSize * (dataBlocks + 2), cellSize * 5);
   }
 
   @Test
-  public void testWritePreadWithDNFailure3() throws IOException {
-    testWritePreadWithDNFailure("/foo", cellSize * dataBlocks, 0);
+  public void testReadWithDNFailure3() throws IOException {
+    testReadWithDNFailure("/foo", cellSize * dataBlocks, 0);
   }
 
-  private void testWritePreadWithDNFailure(String file, int fileSize, int startOffsetInFile)
-      throws IOException {
+  private void testReadWithDNFailure(String file, int fileSize,
+      int startOffsetInFile) throws IOException {
     final int failedDNIdx = 2;
     Path testPath = new Path(file);
     final byte[] bytes = StripedFileTestUtil.generateBytes(fileSize);
     DFSTestUtil.writeFile(fs, testPath, bytes);
 
-    // shut down the DN that holds the last internal data block
+    // shut down the DN that holds an internal data block
     BlockLocation[] locs = fs.getFileBlockLocations(testPath, cellSize * 5,
         cellSize);
     String name = (locs[0].getNames())[failedDNIdx];
@@ -99,14 +100,30 @@ public class TestReadStripedFileWithDecoding {
           fileSize - startOffsetInFile, readLen);
 
       byte[] expected = new byte[readLen];
-      for (int i = startOffsetInFile; i < fileSize; i++) {
-        expected[i - startOffsetInFile] = StripedFileTestUtil.getByte(i);
-      }
+      System.arraycopy(bytes, startOffsetInFile, expected, 0,
+          fileSize - startOffsetInFile);
 
       for (int i = startOffsetInFile; i < fileSize; i++) {
         Assert.assertEquals("Byte at " + i + " should be the same",
             expected[i - startOffsetInFile], buf[i - startOffsetInFile]);
       }
     }
+
+    // stateful read
+    ByteBuffer result = ByteBuffer.allocate(fileSize);
+    ByteBuffer buf = ByteBuffer.allocate(1024);
+    int readLen = 0;
+    int ret;
+    try (FSDataInputStream in = fs.open(testPath)) {
+      while ((ret = in.read(buf)) >= 0) {
+        readLen += ret;
+        buf.flip();
+        result.put(buf);
+        buf.clear();
+      }
+    }
+    Assert.assertEquals("The length of file should be the same to write size",
+        fileSize, readLen);
+    Assert.assertArrayEquals(bytes, result.array());
   }
 }

+ 63 - 6
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java

@@ -17,17 +17,21 @@
  */
 package org.apache.hadoop.hdfs;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.web.ByteRangeInputStream;
 import org.apache.hadoop.hdfs.web.WebHdfsConstants;
 import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
-import org.junit.AfterClass;
+import org.junit.After;
 import org.junit.Assert;
-import org.junit.BeforeClass;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.EOFException;
@@ -41,12 +45,13 @@ import static org.apache.hadoop.hdfs.StripedFileTestUtil.numDNs;
 import static org.apache.hadoop.hdfs.StripedFileTestUtil.stripesPerBlock;
 
 public class TestWriteReadStripedFile {
+  public static final Log LOG = LogFactory.getLog(TestWriteReadStripedFile.class);
   private static MiniDFSCluster cluster;
   private static FileSystem fs;
   private static Configuration conf;
 
-  @BeforeClass
-  public static void setup() throws IOException {
+  @Before
+  public void setup() throws IOException {
     conf = new HdfsConfiguration();
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
@@ -55,8 +60,8 @@ public class TestWriteReadStripedFile {
     fs = cluster.getFileSystem();
   }
 
-  @AfterClass
-  public static void tearDown() throws IOException {
+  @After
+  public void tearDown() throws IOException {
     if (cluster != null) {
       cluster.shutdown();
     }
@@ -65,75 +70,98 @@ public class TestWriteReadStripedFile {
   @Test
   public void testFileEmpty() throws IOException {
     testOneFileUsingDFSStripedInputStream("/EmptyFile", 0);
+    testOneFileUsingDFSStripedInputStream("/EmptyFile2", 0, true);
   }
 
   @Test
   public void testFileSmallerThanOneCell1() throws IOException {
     testOneFileUsingDFSStripedInputStream("/SmallerThanOneCell", 1);
+    testOneFileUsingDFSStripedInputStream("/SmallerThanOneCell2", 1, true);
   }
 
   @Test
   public void testFileSmallerThanOneCell2() throws IOException {
     testOneFileUsingDFSStripedInputStream("/SmallerThanOneCell", cellSize - 1);
+    testOneFileUsingDFSStripedInputStream("/SmallerThanOneCell2", cellSize - 1,
+        true);
   }
 
   @Test
   public void testFileEqualsWithOneCell() throws IOException {
     testOneFileUsingDFSStripedInputStream("/EqualsWithOneCell", cellSize);
+    testOneFileUsingDFSStripedInputStream("/EqualsWithOneCell2", cellSize, true);
   }
 
   @Test
   public void testFileSmallerThanOneStripe1() throws IOException {
     testOneFileUsingDFSStripedInputStream("/SmallerThanOneStripe",
         cellSize * dataBlocks - 1);
+    testOneFileUsingDFSStripedInputStream("/SmallerThanOneStripe2",
+        cellSize * dataBlocks - 1, true);
   }
 
   @Test
   public void testFileSmallerThanOneStripe2() throws IOException {
     testOneFileUsingDFSStripedInputStream("/SmallerThanOneStripe",
         cellSize + 123);
+    testOneFileUsingDFSStripedInputStream("/SmallerThanOneStripe2",
+        cellSize + 123, true);
   }
 
   @Test
   public void testFileEqualsWithOneStripe() throws IOException {
     testOneFileUsingDFSStripedInputStream("/EqualsWithOneStripe",
         cellSize * dataBlocks);
+    testOneFileUsingDFSStripedInputStream("/EqualsWithOneStripe2",
+        cellSize * dataBlocks, true);
   }
 
   @Test
   public void testFileMoreThanOneStripe1() throws IOException {
     testOneFileUsingDFSStripedInputStream("/MoreThanOneStripe1",
         cellSize * dataBlocks + 123);
+    testOneFileUsingDFSStripedInputStream("/MoreThanOneStripe12",
+        cellSize * dataBlocks + 123, true);
   }
 
   @Test
   public void testFileMoreThanOneStripe2() throws IOException {
     testOneFileUsingDFSStripedInputStream("/MoreThanOneStripe2",
         cellSize * dataBlocks + cellSize * dataBlocks + 123);
+    testOneFileUsingDFSStripedInputStream("/MoreThanOneStripe22",
+        cellSize * dataBlocks + cellSize * dataBlocks + 123, true);
   }
 
   @Test
   public void testLessThanFullBlockGroup() throws IOException {
     testOneFileUsingDFSStripedInputStream("/LessThanFullBlockGroup",
         cellSize * dataBlocks * (stripesPerBlock - 1) + cellSize);
+    testOneFileUsingDFSStripedInputStream("/LessThanFullBlockGroup2",
+        cellSize * dataBlocks * (stripesPerBlock - 1) + cellSize, true);
   }
 
   @Test
   public void testFileFullBlockGroup() throws IOException {
     testOneFileUsingDFSStripedInputStream("/FullBlockGroup",
         blockSize * dataBlocks);
+    testOneFileUsingDFSStripedInputStream("/FullBlockGroup2",
+        blockSize * dataBlocks, true);
   }
 
   @Test
   public void testFileMoreThanABlockGroup1() throws IOException {
     testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup1",
         blockSize * dataBlocks + 123);
+    testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup12",
+        blockSize * dataBlocks + 123, true);
   }
 
   @Test
   public void testFileMoreThanABlockGroup2() throws IOException {
     testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup2",
         blockSize * dataBlocks + cellSize + 123);
+    testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup22",
+        blockSize * dataBlocks + cellSize + 123, true);
   }
 
 
@@ -142,6 +170,9 @@ public class TestWriteReadStripedFile {
     testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup3",
         blockSize * dataBlocks * 3 + cellSize * dataBlocks
             + cellSize + 123);
+    testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup32",
+        blockSize * dataBlocks * 3 + cellSize * dataBlocks
+            + cellSize + 123, true);
   }
 
   private void assertSeekAndRead(FSDataInputStream fsdis, int pos,
@@ -158,12 +189,23 @@ public class TestWriteReadStripedFile {
 
   private void testOneFileUsingDFSStripedInputStream(String src, int fileLength)
       throws IOException {
+    testOneFileUsingDFSStripedInputStream(src, fileLength, false);
+  }
+
+  private void testOneFileUsingDFSStripedInputStream(String src, int fileLength,
+      boolean withDataNodeFailure) throws IOException {
     final byte[] expected = StripedFileTestUtil.generateBytes(fileLength);
     Path srcPath = new Path(src);
     DFSTestUtil.writeFile(fs, srcPath, new String(expected));
 
     verifyLength(fs, srcPath, fileLength);
 
+    if (withDataNodeFailure) {
+      int dnIndex = 1; // TODO: StripedFileTestUtil.random.nextInt(dataBlocks);
+      LOG.info("stop DataNode " + dnIndex);
+      stopDataNode(srcPath, dnIndex);
+    }
+
     byte[] smallBuf = new byte[1024];
     byte[] largeBuf = new byte[fileLength + 100];
     verifyPread(fs, srcPath, fileLength, expected, largeBuf);
@@ -177,6 +219,21 @@ public class TestWriteReadStripedFile {
         ByteBuffer.allocate(1024));
   }
 
+  private void stopDataNode(Path path, int failedDNIdx)
+      throws IOException {
+    BlockLocation[] locs = fs.getFileBlockLocations(path, 0, cellSize);
+    if (locs != null && locs.length > 0) {
+      String name = (locs[0].getNames())[failedDNIdx];
+      for (DataNode dn : cluster.getDataNodes()) {
+        int port = dn.getXferPort();
+        if (name.contains(Integer.toString(port))) {
+          dn.shutdown();
+          break;
+        }
+      }
+    }
+  }
+
   @Test
   public void testWriteReadUsingWebHdfs() throws Exception {
     int fileLength = blockSize * dataBlocks + cellSize + 123;

+ 6 - 6
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestStripedBlockUtil.java

@@ -152,7 +152,7 @@ public class TestStripedBlockUtil {
     int done = 0;
     while (done < bgSize) {
       Preconditions.checkState(done % CELLSIZE == 0);
-      StripingCell cell = new StripingCell(SCEHMA, CELLSIZE, done / CELLSIZE);
+      StripingCell cell = new StripingCell(SCEHMA, CELLSIZE, done / CELLSIZE, 0);
       int idxInStripe = cell.idxInStripe;
       int size = Math.min(CELLSIZE, bgSize - done);
       for (int i = 0; i < size; i++) {
@@ -176,8 +176,7 @@ public class TestStripedBlockUtil {
       assertFalse(blocks[i].isStriped());
       assertEquals(i,
           BlockIdManager.getBlockIndex(blocks[i].getBlock().getLocalBlock()));
-      assertEquals(i * CELLSIZE, blocks[i].getStartOffset());
-      /** TODO: properly define {@link LocatedBlock#offset} for internal blocks */
+      assertEquals(0, blocks[i].getStartOffset());
       assertEquals(1, blocks[i].getLocations().length);
       assertEquals(i, blocks[i].getLocations()[0].getIpcPort());
       assertEquals(i, blocks[i].getLocations()[0].getXferPort());
@@ -256,11 +255,12 @@ public class TestStripedBlockUtil {
                 continue;
               }
               int done = 0;
-              for (int j = 0; j < chunk.getLengths().length; j++) {
+              for (int j = 0; j < chunk.byteArray.getLengths().length; j++) {
                 System.arraycopy(internalBlkBufs[i],
                     (int) stripe.getOffsetInBlock() + done, assembled,
-                    chunk.getOffsets()[j], chunk.getLengths()[j]);
-                done += chunk.getLengths()[j];
+                    chunk.byteArray.getOffsets()[j],
+                    chunk.byteArray.getLengths()[j]);
+                done += chunk.byteArray.getLengths()[j];
               }
             }
           }