@@ -17,24 +17,21 @@
 */
package org.apache.hadoop.hdfs;

-import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks;
+import org.apache.hadoop.hdfs.StripeReader.BlockReaderInfo;
+import org.apache.hadoop.hdfs.StripeReader.ReaderRetryPolicy;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil.AlignedStripe;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripeRange;
import org.apache.hadoop.io.ByteBufferPool;

-import static org.apache.hadoop.hdfs.util.StripedBlockUtil.AlignedStripe;
-import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunk;
-import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult;
-
import org.apache.hadoop.io.ElasticByteBufferPool;
import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
@@ -44,7 +41,6 @@ import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;

import java.io.EOFException;
import java.io.IOException;
-import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
@@ -53,111 +49,32 @@ import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.Collection;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;

/**
- * DFSStripedInputStream reads from striped block groups
+ * DFSStripedInputStream reads from striped block groups.
 */
@InterfaceAudience.Private
public class DFSStripedInputStream extends DFSInputStream {

-  private static class ReaderRetryPolicy {
-    private int fetchEncryptionKeyTimes = 1;
-    private int fetchTokenTimes = 1;
-
-    void refetchEncryptionKey() {
-      fetchEncryptionKeyTimes--;
-    }
-
-    void refetchToken() {
-      fetchTokenTimes--;
-    }
-
-    boolean shouldRefetchEncryptionKey() {
-      return fetchEncryptionKeyTimes > 0;
-    }
-
-    boolean shouldRefetchToken() {
-      return fetchTokenTimes > 0;
-    }
-  }
-
-  /** Used to indicate the buffered data's range in the block group */
-  private static class StripeRange {
-    /** start offset in the block group (inclusive) */
-    final long offsetInBlock;
-    /** length of the stripe range */
-    final long length;
-
-    StripeRange(long offsetInBlock, long length) {
-      Preconditions.checkArgument(offsetInBlock >= 0 && length >= 0);
-      this.offsetInBlock = offsetInBlock;
-      this.length = length;
-    }
-
-    boolean include(long pos) {
-      return pos >= offsetInBlock && pos < offsetInBlock + length;
-    }
-  }
-
-  private static class BlockReaderInfo {
-    final BlockReader reader;
-    final DatanodeInfo datanode;
-    /**
-     * when initializing block readers, their starting offsets are set to the same
-     * number: the smallest internal block offsets among all the readers. This is
-     * because it is possible that for some internal blocks we have to read
-     * "backwards" for decoding purpose. We thus use this offset array to track
-     * offsets for all the block readers so that we can skip data if necessary.
-     */
-    long blockReaderOffset;
-    /**
-     * We use this field to indicate whether we should use this reader. In case
-     * we hit any issue with this reader, we set this field to true and avoid
-     * using it for the next stripe.
-     */
-    boolean shouldSkip = false;
-
-    BlockReaderInfo(BlockReader reader, DatanodeInfo dn, long offset) {
-      this.reader = reader;
-      this.datanode = dn;
-      this.blockReaderOffset = offset;
-    }
-
-    void setOffset(long offset) {
-      this.blockReaderOffset = offset;
-    }
-
-    void skip() {
-      this.shouldSkip = true;
-    }
-  }
-
  private static final ByteBufferPool BUFFER_POOL = new ElasticByteBufferPool();
-
  private final BlockReaderInfo[] blockReaders;
  private final int cellSize;
  private final short dataBlkNum;
  private final short parityBlkNum;
  private final int groupSize;
-  /** the buffer for a complete stripe */
+  /** the buffer for a complete stripe. */
  private ByteBuffer curStripeBuf;
  private ByteBuffer parityBuf;
  private final ErasureCodingPolicy ecPolicy;
  private final RawErasureDecoder decoder;

  /**
-   * indicate the start/end offset of the current buffered stripe in the
-   * block group
+   * Indicate the start/end offset of the current buffered stripe in the
+   * block group.
   */
  private StripeRange curStripeRange;
-  private final CompletionService<Void> readingService;

  /**
   * When warning the user of a lost block in striping mode, we remember the
@@ -167,8 +84,8 @@ public class DFSStripedInputStream extends DFSInputStream {
   *
   * To minimize the overhead, we only store the datanodeUuid in this set
   */
-  private final Set<String> warnedNodes = Collections.newSetFromMap(
-      new ConcurrentHashMap<String, Boolean>());
+  private final Set<String> warnedNodes =
+      Collections.newSetFromMap(new ConcurrentHashMap<>());

  DFSStripedInputStream(DFSClient dfsClient, String src,
      boolean verifyChecksum, ErasureCodingPolicy ecPolicy,
@@ -183,8 +100,6 @@ public class DFSStripedInputStream extends DFSInputStream {
    groupSize = dataBlkNum + parityBlkNum;
    blockReaders = new BlockReaderInfo[groupSize];
    curStripeRange = new StripeRange(0, 0);
-    readingService =
-        new ExecutorCompletionService<>(dfsClient.getStripedReadsThreadPool());
    ErasureCoderOptions coderOptions = new ErasureCoderOptions(
        dataBlkNum, parityBlkNum);
    decoder = CodecUtil.createRawDecoder(dfsClient.getConfiguration(),
@@ -198,7 +113,7 @@ public class DFSStripedInputStream extends DFSInputStream {
    return decoder.preferDirectBuffer();
  }

-  private void resetCurStripeBuffer() {
+  void resetCurStripeBuffer() {
    if (curStripeBuf == null) {
      curStripeBuf = BUFFER_POOL.getBuffer(useDirectBuffer(),
          cellSize * dataBlkNum);
@@ -207,7 +122,7 @@ public class DFSStripedInputStream extends DFSInputStream {
    curStripeRange = new StripeRange(0, 0);
  }

-  private ByteBuffer getParityBuffer() {
+  protected ByteBuffer getParityBuffer() {
    if (parityBuf == null) {
      parityBuf = BUFFER_POOL.getBuffer(useDirectBuffer(),
          cellSize * parityBlkNum);
@@ -216,6 +131,29 @@ public class DFSStripedInputStream extends DFSInputStream {
    return parityBuf;
  }

+  protected ByteBuffer getCurStripeBuf() {
+    return curStripeBuf;
+  }
+
+  protected String getSrc() {
+    return src;
+  }
+
+  protected DFSClient getDFSClient() {
+    return dfsClient;
+  }
+
+  protected LocatedBlocks getLocatedBlocks() {
+    return locatedBlocks;
+  }
+
+  protected ByteBufferPool getBufferPool() {
+    return BUFFER_POOL;
+  }
+
+  protected ThreadPoolExecutor getStripedReadsThreadPool(){
+    return dfsClient.getStripedReadsThreadPool();
+  }
  /**
   * When seeking into a new block group, create blockReader for each internal
   * block in the group.
@@ -268,7 +206,7 @@ public class DFSStripedInputStream extends DFSInputStream {
    blockEnd = -1;
  }

-  private void closeReader(BlockReaderInfo readerInfo) {
+  protected void closeReader(BlockReaderInfo readerInfo) {
    if (readerInfo != null) {
      if (readerInfo.reader != null) {
        try {
@@ -288,6 +226,59 @@ public class DFSStripedInputStream extends DFSInputStream {
    return pos - currentLocatedBlock.getStartOffset();
  }

+  boolean createBlockReader(LocatedBlock block, long offsetInBlock,
+      LocatedBlock[] targetBlocks, BlockReaderInfo[] readerInfos,
+      int chunkIndex) throws IOException {
+    BlockReader reader = null;
+    final ReaderRetryPolicy retry = new ReaderRetryPolicy();
+    DFSInputStream.DNAddrPair dnInfo =
+        new DFSInputStream.DNAddrPair(null, null, null);
+
+    while (true) {
+      try {
+        // the cached block location might have been re-fetched, so always
+        // get it from cache.
+        block = refreshLocatedBlock(block);
+        targetBlocks[chunkIndex] = block;
+
+        // internal block has one location, just rule out the deadNodes
+        dnInfo = getBestNodeDNAddrPair(block, null);
+        if (dnInfo == null) {
+          break;
+        }
+        reader = getBlockReader(block, offsetInBlock,
+            block.getBlockSize() - offsetInBlock,
+            dnInfo.addr, dnInfo.storageType, dnInfo.info);
+      } catch (IOException e) {
+        if (e instanceof InvalidEncryptionKeyException &&
+            retry.shouldRefetchEncryptionKey()) {
+          DFSClient.LOG.info("Will fetch a new encryption key and retry, "
+              + "encryption key was invalid when connecting to " + dnInfo.addr
+              + " : " + e);
+          dfsClient.clearDataEncryptionKey();
+          retry.refetchEncryptionKey();
+        } else if (retry.shouldRefetchToken() &&
+            tokenRefetchNeeded(e, dnInfo.addr)) {
+          fetchBlockAt(block.getStartOffset());
+          retry.refetchToken();
+        } else {
+          //TODO: handles connection issues
+          DFSClient.LOG.warn("Failed to connect to " + dnInfo.addr + " for " +
+              "block" + block.getBlock(), e);
+          // re-fetch the block in case the block has been moved
+          fetchBlockAt(block.getStartOffset());
+          addToDeadNodes(dnInfo.info);
+        }
+      }
+      if (reader != null) {
+        readerInfos[chunkIndex] =
+            new BlockReaderInfo(reader, dnInfo.info, offsetInBlock);
+        return true;
+      }
+    }
+    return false;
+  }
+
  /**
   * Read a new stripe covering the current position, and store the data in the
   * {@link #curStripeBuf}.
@@ -303,20 +294,20 @@ public class DFSStripedInputStream extends DFSInputStream {
    final int stripeBufOffset = (int) (offsetInBlockGroup % stripeLen);
    final int stripeLimit = (int) Math.min(currentLocatedBlock.getBlockSize()
        - (stripeIndex * stripeLen), stripeLen);
-    StripeRange stripeRange = new StripeRange(offsetInBlockGroup,
-        stripeLimit - stripeBufOffset);
+    StripeRange stripeRange =
+        new StripeRange(offsetInBlockGroup, stripeLimit - stripeBufOffset);

    LocatedStripedBlock blockGroup = (LocatedStripedBlock) currentLocatedBlock;
    AlignedStripe[] stripes = StripedBlockUtil.divideOneStripe(ecPolicy,
        cellSize, blockGroup, offsetInBlockGroup,
-        offsetInBlockGroup + stripeRange.length - 1, curStripeBuf);
+        offsetInBlockGroup + stripeRange.getLength() - 1, curStripeBuf);
    final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(
        blockGroup, cellSize, dataBlkNum, parityBlkNum);
    // read the whole stripe
    for (AlignedStripe stripe : stripes) {
      // Parse group to get chosen DN location
-      StripeReader sreader = new StatefulStripeReader(readingService, stripe,
-          blks, blockReaders, corruptedBlocks);
+      StripeReader sreader = new StatefulStripeReader(stripe, ecPolicy, blks,
+          blockReaders, corruptedBlocks, decoder, this);
      sreader.readStripe();
    }
    curStripeBuf.position(stripeBufOffset);
@@ -324,69 +315,8 @@ public class DFSStripedInputStream extends DFSInputStream {
    curStripeRange = stripeRange;
  }

-  private Callable<Void> readCells(final BlockReader reader,
-      final DatanodeInfo datanode, final long currentReaderOffset,
-      final long targetReaderOffset, final ByteBufferStrategy[] strategies,
-      final ExtendedBlock currentBlock,
-      final CorruptedBlocks corruptedBlocks) {
-    return new Callable<Void>() {
-      @Override
-      public Void call() throws Exception {
-        // reader can be null if getBlockReaderWithRetry failed or
-        // the reader hit exception before
-        if (reader == null) {
-          throw new IOException("The BlockReader is null. " +
-              "The BlockReader creation failed or the reader hit exception.");
-        }
-        Preconditions.checkState(currentReaderOffset <= targetReaderOffset);
-        if (currentReaderOffset < targetReaderOffset) {
-          long skipped = reader.skip(targetReaderOffset - currentReaderOffset);
-          Preconditions.checkState(
-              skipped == targetReaderOffset - currentReaderOffset);
-        }
-        int result = 0;
-        for (ByteBufferStrategy strategy : strategies) {
-          result += readToBuffer(reader, datanode, strategy, currentBlock,
-              corruptedBlocks);
-        }
-        return null;
-      }
-    };
-  }
-
-  private int readToBuffer(BlockReader blockReader,
-      DatanodeInfo currentNode, ByteBufferStrategy strategy,
-      ExtendedBlock currentBlock,
-      CorruptedBlocks corruptedBlocks)
-      throws IOException {
-    final int targetLength = strategy.getTargetLength();
-    int length = 0;
-    try {
-      while (length < targetLength) {
-        int ret = strategy.readFromBlock(blockReader);
-        if (ret < 0) {
-          throw new IOException("Unexpected EOS from the reader");
-        }
-        length += ret;
-      }
-      return length;
-    } catch (ChecksumException ce) {
-      DFSClient.LOG.warn("Found Checksum error for "
-          + currentBlock + " from " + currentNode
-          + " at " + ce.getPos());
-      // we want to remember which block replicas we have tried
-      corruptedBlocks.addCorruptedBlock(currentBlock, currentNode);
-      throw ce;
-    } catch (IOException e) {
-      DFSClient.LOG.warn("Exception while reading from "
-          + currentBlock + " of " + src + " from "
-          + currentNode, e);
-      throw e;
-    }
-  }
-
  /**
-   * Seek to a new arbitrary location
+   * Seek to a new arbitrary location.
   */
  @Override
  public synchronized void seek(long targetPos) throws IOException {
@@ -469,7 +399,7 @@ public class DFSStripedInputStream extends DFSInputStream {
  }

  /**
-   * Copy the data from {@link #curStripeBuf} into the given buffer
+   * Copy the data from {@link #curStripeBuf} into the given buffer.
   * @param strategy the ReaderStrategy containing the given buffer
   * @param length target length
   * @return number of bytes copied
@@ -530,17 +460,19 @@ public class DFSStripedInputStream extends DFSInputStream {

    AlignedStripe[] stripes = StripedBlockUtil.divideByteRangeIntoStripes(
        ecPolicy, cellSize, blockGroup, start, end, buf);
-    CompletionService<Void> readService = new ExecutorCompletionService<>(
-        dfsClient.getStripedReadsThreadPool());
    final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(
        blockGroup, cellSize, dataBlkNum, parityBlkNum);
    final BlockReaderInfo[] preaderInfos = new BlockReaderInfo[groupSize];
    try {
      for (AlignedStripe stripe : stripes) {
        // Parse group to get chosen DN location
-        StripeReader preader = new PositionStripeReader(readService, stripe,
-            blks, preaderInfos, corruptedBlocks);
-        preader.readStripe();
+        StripeReader preader = new PositionStripeReader(stripe, ecPolicy, blks,
+            preaderInfos, corruptedBlocks, decoder, this);
+        try {
+          preader.readStripe();
+        } finally {
+          preader.close();
+        }
      }
      buf.position(buf.position() + (int)(end - start + 1));
    } finally {
@@ -570,376 +502,6 @@ public class DFSStripedInputStream extends DFSInputStream {
    }
  }

-  /**
-   * The reader for reading a complete {@link AlignedStripe}. Note that an
-   * {@link AlignedStripe} may cross multiple stripes with cellSize width.
-   */
-  private abstract class StripeReader {
-    final Map<Future<Void>, Integer> futures = new HashMap<>();
-    final AlignedStripe alignedStripe;
-    final CompletionService<Void> service;
-    final LocatedBlock[] targetBlocks;
-    final CorruptedBlocks corruptedBlocks;
-    final BlockReaderInfo[] readerInfos;
-
-    StripeReader(CompletionService<Void> service, AlignedStripe alignedStripe,
-        LocatedBlock[] targetBlocks, BlockReaderInfo[] readerInfos,
-        CorruptedBlocks corruptedBlocks) {
-      this.service = service;
-      this.alignedStripe = alignedStripe;
-      this.targetBlocks = targetBlocks;
-      this.readerInfos = readerInfos;
-      this.corruptedBlocks = corruptedBlocks;
-    }
-
-    /** prepare all the data chunks */
-    abstract void prepareDecodeInputs();
-
-    /** prepare the parity chunk and block reader if necessary */
-    abstract boolean prepareParityChunk(int index);
-
-    abstract void decode();
-
-    void updateState4SuccessRead(StripingChunkReadResult result) {
-      Preconditions.checkArgument(
-          result.state == StripingChunkReadResult.SUCCESSFUL);
-      readerInfos[result.index].setOffset(alignedStripe.getOffsetInBlock()
-          + alignedStripe.getSpanInBlock());
-    }
-
-    private void checkMissingBlocks() throws IOException {
-      if (alignedStripe.missingChunksNum > parityBlkNum) {
-        clearFutures(futures.keySet());
-        throw new IOException(alignedStripe.missingChunksNum
-            + " missing blocks, the stripe is: " + alignedStripe
-            + "; locatedBlocks is: " + locatedBlocks);
-      }
-    }
-
-    /**
-     * We need decoding. Thus go through all the data chunks and make sure we
-     * submit read requests for all of them.
-     */
-    private void readDataForDecoding() throws IOException {
-      prepareDecodeInputs();
-      for (int i = 0; i < dataBlkNum; i++) {
-        Preconditions.checkNotNull(alignedStripe.chunks[i]);
-        if (alignedStripe.chunks[i].state == StripingChunk.REQUESTED) {
-          if (!readChunk(targetBlocks[i], i)) {
-            alignedStripe.missingChunksNum++;
-          }
-        }
-      }
-      checkMissingBlocks();
-    }
-
-    void readParityChunks(int num) throws IOException {
-      for (int i = dataBlkNum, j = 0; i < dataBlkNum + parityBlkNum && j < num;
-           i++) {
-        if (alignedStripe.chunks[i] == null) {
-          if (prepareParityChunk(i) && readChunk(targetBlocks[i], i)) {
-            j++;
-          } else {
-            alignedStripe.missingChunksNum++;
-          }
-        }
-      }
-      checkMissingBlocks();
-    }
-
-    boolean createBlockReader(LocatedBlock block, int chunkIndex)
-        throws IOException {
-      BlockReader reader = null;
-      final ReaderRetryPolicy retry = new ReaderRetryPolicy();
-      DNAddrPair dnInfo = new DNAddrPair(null, null, null);
-
-      while(true) {
-        try {
-          // the cached block location might have been re-fetched, so always
-          // get it from cache.
-          block = refreshLocatedBlock(block);
-          targetBlocks[chunkIndex] = block;
-
-          // internal block has one location, just rule out the deadNodes
-          dnInfo = getBestNodeDNAddrPair(block, null);
-          if (dnInfo == null) {
-            break;
-          }
-          reader = getBlockReader(block, alignedStripe.getOffsetInBlock(),
-              block.getBlockSize() - alignedStripe.getOffsetInBlock(),
-              dnInfo.addr, dnInfo.storageType, dnInfo.info);
-        } catch (IOException e) {
-          if (e instanceof InvalidEncryptionKeyException &&
-              retry.shouldRefetchEncryptionKey()) {
-            DFSClient.LOG.info("Will fetch a new encryption key and retry, "
-                + "encryption key was invalid when connecting to " + dnInfo.addr
-                + " : " + e);
-            dfsClient.clearDataEncryptionKey();
-            retry.refetchEncryptionKey();
-          } else if (retry.shouldRefetchToken() &&
-              tokenRefetchNeeded(e, dnInfo.addr)) {
-            fetchBlockAt(block.getStartOffset());
-            retry.refetchToken();
-          } else {
-            //TODO: handles connection issues
-            DFSClient.LOG.warn("Failed to connect to " + dnInfo.addr + " for " +
-                "block" + block.getBlock(), e);
-            // re-fetch the block in case the block has been moved
-            fetchBlockAt(block.getStartOffset());
-            addToDeadNodes(dnInfo.info);
-          }
-        }
-        if (reader != null) {
-          readerInfos[chunkIndex] = new BlockReaderInfo(reader, dnInfo.info,
-              alignedStripe.getOffsetInBlock());
-          return true;
-        }
-      }
-      return false;
-    }
-
-    private ByteBufferStrategy[] getReadStrategies(StripingChunk chunk) {
-      if (chunk.useByteBuffer()) {
-        ByteBufferStrategy strategy = new ByteBufferStrategy(
-            chunk.getByteBuffer(), readStatistics, dfsClient);
-        return new ByteBufferStrategy[]{strategy};
-      } else {
-        ByteBufferStrategy[] strategies =
-            new ByteBufferStrategy[chunk.getChunkBuffer().getSlices().size()];
-        for (int i = 0; i < strategies.length; i++) {
-          ByteBuffer buffer = chunk.getChunkBuffer().getSlice(i);
-          strategies[i] =
-              new ByteBufferStrategy(buffer, readStatistics, dfsClient);
-        }
-        return strategies;
-      }
-    }
-
-    boolean readChunk(final LocatedBlock block, int chunkIndex)
-        throws IOException {
-      final StripingChunk chunk = alignedStripe.chunks[chunkIndex];
-      if (block == null) {
-        chunk.state = StripingChunk.MISSING;
-        return false;
-      }
-      if (readerInfos[chunkIndex] == null) {
-        if (!createBlockReader(block, chunkIndex)) {
-          chunk.state = StripingChunk.MISSING;
-          return false;
-        }
-      } else if (readerInfos[chunkIndex].shouldSkip) {
-        chunk.state = StripingChunk.MISSING;
-        return false;
-      }
-
-      chunk.state = StripingChunk.PENDING;
-      Callable<Void> readCallable = readCells(readerInfos[chunkIndex].reader,
-          readerInfos[chunkIndex].datanode,
-          readerInfos[chunkIndex].blockReaderOffset,
-          alignedStripe.getOffsetInBlock(), getReadStrategies(chunk),
-          block.getBlock(), corruptedBlocks);
-
-      Future<Void> request = service.submit(readCallable);
-      futures.put(request, chunkIndex);
-      return true;
-    }
-
-    /** read the whole stripe. do decoding if necessary */
-    void readStripe() throws IOException {
-      for (int i = 0; i < dataBlkNum; i++) {
-        if (alignedStripe.chunks[i] != null &&
-            alignedStripe.chunks[i].state != StripingChunk.ALLZERO) {
-          if (!readChunk(targetBlocks[i], i)) {
-            alignedStripe.missingChunksNum++;
-          }
-        }
-      }
-      // There are missing block locations at this stage. Thus we need to read
-      // the full stripe and one more parity block.
-      if (alignedStripe.missingChunksNum > 0) {
-        checkMissingBlocks();
-        readDataForDecoding();
-        // read parity chunks
-        readParityChunks(alignedStripe.missingChunksNum);
-      }
-      // TODO: for a full stripe we can start reading (dataBlkNum + 1) chunks
-
-      // Input buffers for potential decode operation, which remains null until
-      // first read failure
-      while (!futures.isEmpty()) {
-        try {
-          StripingChunkReadResult r = StripedBlockUtil
-              .getNextCompletedStripedRead(service, futures, 0);
-          if (DFSClient.LOG.isDebugEnabled()) {
-            DFSClient.LOG.debug("Read task returned: " + r + ", for stripe "
-                + alignedStripe);
-          }
-          StripingChunk returnedChunk = alignedStripe.chunks[r.index];
-          Preconditions.checkNotNull(returnedChunk);
-          Preconditions.checkState(returnedChunk.state == StripingChunk.PENDING);
-
-          if (r.state == StripingChunkReadResult.SUCCESSFUL) {
-            returnedChunk.state = StripingChunk.FETCHED;
-            alignedStripe.fetchedChunksNum++;
-            updateState4SuccessRead(r);
-            if (alignedStripe.fetchedChunksNum == dataBlkNum) {
-              clearFutures(futures.keySet());
-              break;
-            }
-          } else {
-            returnedChunk.state = StripingChunk.MISSING;
-            // close the corresponding reader
-            closeReader(readerInfos[r.index]);
-
-            final int missing = alignedStripe.missingChunksNum;
-            alignedStripe.missingChunksNum++;
-            checkMissingBlocks();
-
-            readDataForDecoding();
-            readParityChunks(alignedStripe.missingChunksNum - missing);
-          }
-        } catch (InterruptedException ie) {
-          String err = "Read request interrupted";
-          DFSClient.LOG.error(err);
-          clearFutures(futures.keySet());
-          // Don't decode if read interrupted
-          throw new InterruptedIOException(err);
-        }
-      }
-
-      if (alignedStripe.missingChunksNum > 0) {
-        decode();
-      }
-    }
-  }
-
-  class PositionStripeReader extends StripeReader {
-    private ByteBuffer[] decodeInputs = null;
-
-    PositionStripeReader(CompletionService<Void> service,
-        AlignedStripe alignedStripe, LocatedBlock[] targetBlocks,
-        BlockReaderInfo[] readerInfos, CorruptedBlocks corruptedBlocks) {
-      super(service, alignedStripe, targetBlocks, readerInfos,
-          corruptedBlocks);
-    }
-
-    @Override
-    void prepareDecodeInputs() {
-      if (decodeInputs == null) {
-        decodeInputs = StripedBlockUtil.initDecodeInputs(alignedStripe,
-            dataBlkNum, parityBlkNum);
-      }
-    }
-
-    @Override
-    boolean prepareParityChunk(int index) {
-      Preconditions.checkState(index >= dataBlkNum &&
-          alignedStripe.chunks[index] == null);
-      alignedStripe.chunks[index] = new StripingChunk(decodeInputs[index]);
-      return true;
-    }
-
-    @Override
-    void decode() {
-      StripedBlockUtil.finalizeDecodeInputs(decodeInputs, alignedStripe);
-      StripedBlockUtil.decodeAndFillBuffer(decodeInputs, alignedStripe,
-          dataBlkNum, parityBlkNum, decoder);
-    }
-  }
-
-  class StatefulStripeReader extends StripeReader {
-    ByteBuffer[] decodeInputs;
-
-    StatefulStripeReader(CompletionService<Void> service,
-        AlignedStripe alignedStripe, LocatedBlock[] targetBlocks,
-        BlockReaderInfo[] readerInfos, CorruptedBlocks corruptedBlocks) {
-      super(service, alignedStripe, targetBlocks, readerInfos,
-          corruptedBlocks);
-    }
-
-    @Override
-    void prepareDecodeInputs() {
-      if (decodeInputs == null) {
-        decodeInputs = new ByteBuffer[dataBlkNum + parityBlkNum];
-        final ByteBuffer cur;
-        synchronized (DFSStripedInputStream.this) {
-          cur = curStripeBuf.duplicate();
-        }
-        StripedBlockUtil.VerticalRange range = alignedStripe.range;
-        for (int i = 0; i < dataBlkNum; i++) {
-          cur.limit(cur.capacity());
-          int pos = (int) (range.offsetInBlock % cellSize + cellSize * i);
-          cur.position(pos);
-          cur.limit((int) (pos + range.spanInBlock));
-          decodeInputs[i] = cur.slice();
-          if (alignedStripe.chunks[i] == null) {
-            alignedStripe.chunks[i] = new StripingChunk(decodeInputs[i]);
-          }
-        }
-      }
-    }
-
-    @Override
-    boolean prepareParityChunk(int index) {
-      Preconditions.checkState(index >= dataBlkNum
-          && alignedStripe.chunks[index] == null);
-      if (blockReaders[index] != null && blockReaders[index].shouldSkip) {
-        alignedStripe.chunks[index] = new StripingChunk(StripingChunk.MISSING);
-        // we have failed the block reader before
-        return false;
-      }
-      final int parityIndex = index - dataBlkNum;
-      ByteBuffer buf = getParityBuffer().duplicate();
-      buf.position(cellSize * parityIndex);
-      buf.limit(cellSize * parityIndex + (int) alignedStripe.range.spanInBlock);
-      decodeInputs[index] = buf.slice();
-      alignedStripe.chunks[index] = new StripingChunk(decodeInputs[index]);
-      return true;
-    }
-
-    @Override
-    void decode() {
-      final int span = (int) alignedStripe.getSpanInBlock();
-      for (int i = 0; i < alignedStripe.chunks.length; i++) {
-        if (alignedStripe.chunks[i] != null &&
-            alignedStripe.chunks[i].state == StripingChunk.ALLZERO) {
-          for (int j = 0; j < span; j++) {
-            decodeInputs[i].put((byte) 0);
-          }
-          decodeInputs[i].flip();
-        } else if (alignedStripe.chunks[i] != null &&
-            alignedStripe.chunks[i].state == StripingChunk.FETCHED) {
-          decodeInputs[i].position(0);
-          decodeInputs[i].limit(span);
-        }
-      }
-      int[] decodeIndices = new int[parityBlkNum];
-      int pos = 0;
-      for (int i = 0; i < alignedStripe.chunks.length; i++) {
-        if (alignedStripe.chunks[i] != null &&
-            alignedStripe.chunks[i].state == StripingChunk.MISSING) {
-          if (i < dataBlkNum) {
-            decodeIndices[pos++] = i;
-          } else {
-            decodeInputs[i] = null;
-          }
-        }
-      }
-      decodeIndices = Arrays.copyOf(decodeIndices, pos);
-
-      final int decodeChunkNum = decodeIndices.length;
-      ByteBuffer[] outputs = new ByteBuffer[decodeChunkNum];
-      for (int i = 0; i < decodeChunkNum; i++) {
-        outputs[i] = decodeInputs[decodeIndices[i]];
-        outputs[i].position(0);
-        outputs[i].limit((int) alignedStripe.range.spanInBlock);
-        decodeInputs[decodeIndices[i]] = null;
-      }
-
-      decoder.decode(decodeInputs, decodeIndices, outputs);
-    }
-  }
-
  /**
   * May need online read recovery, zero-copy read doesn't make
   * sense, so don't support it.
@@ -957,12 +519,4 @@ public class DFSStripedInputStream extends DFSInputStream {
    throw new UnsupportedOperationException(
        "Not support enhanced byte buffer access.");
  }
-
-  /** A variation to {@link DFSInputStream#cancelAll} */
-  private void clearFutures(Collection<Future<Void>> futures) {
-    for (Future<Void> future : futures) {
-      future.cancel(false);
-    }
-    futures.clear();
-  }
}