@@ -44,6 +44,7 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import com.google.common.base.Preconditions;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.ByteBufferReadable;
@@ -94,35 +95,35 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
   @VisibleForTesting
   public static boolean tcpReadsDisabledForTesting = false;
   private long hedgedReadOpsLoopNumForTesting = 0;
-  private final DFSClient dfsClient;
-  private AtomicBoolean closed = new AtomicBoolean(false);
-  private final String src;
-  private final boolean verifyChecksum;
+  protected final DFSClient dfsClient;
+  protected AtomicBoolean closed = new AtomicBoolean(false);
+  protected final String src;
+  protected final boolean verifyChecksum;
 
   // state by stateful read only:
   // (protected by lock on this)
   /////
   private DatanodeInfo currentNode = null;
-  private LocatedBlock currentLocatedBlock = null;
-  private long pos = 0;
-  private long blockEnd = -1;
+  protected LocatedBlock currentLocatedBlock = null;
+  protected long pos = 0;
+  protected long blockEnd = -1;
   private BlockReader blockReader = null;
   ////
 
   // state shared by stateful and positional read:
   // (protected by lock on infoLock)
   ////
-  private LocatedBlocks locatedBlocks = null;
+  protected LocatedBlocks locatedBlocks = null;
   private long lastBlockBeingWrittenLength = 0;
   private FileEncryptionInfo fileEncryptionInfo = null;
-  private CachingStrategy cachingStrategy;
+  protected CachingStrategy cachingStrategy;
   ////
 
-  private final ReadStatistics readStatistics = new ReadStatistics();
+  protected final ReadStatistics readStatistics = new ReadStatistics();
   // lock for state shared between read and pread
   // Note: Never acquire a lock on <this> with this lock held to avoid deadlocks
   // (it's OK to acquire this lock when the lock on <this> is held)
-  private final Object infoLock = new Object();
+  protected final Object infoLock = new Object();
 
   /**
    * Track the ByteBuffers that we have handed out to readers.
@@ -239,7 +240,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
    * back to the namenode to get a new list of block locations, and is
    * capped at maxBlockAcquireFailures
    */
-  private int failures = 0;
+  protected int failures = 0;
 
   /* XXX Use of CocurrentHashMap is temp fix. Need to fix
    * parallel accesses to DFSInputStream (through ptreads) properly */
@@ -252,24 +253,30 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
     deadNodes.put(dnInfo, dnInfo);
   }
 
-  DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum
-                 ) throws IOException, UnresolvedLinkException {
+  DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum,
+      LocatedBlocks locatedBlocks) throws IOException, UnresolvedLinkException {
     this.dfsClient = dfsClient;
     this.verifyChecksum = verifyChecksum;
     this.src = src;
     synchronized (infoLock) {
       this.cachingStrategy = dfsClient.getDefaultReadCachingStrategy();
     }
-    openInfo();
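+    // locatedBlocks may be handed in by the caller to save a namenode RPC;
+    // when it is null, openInfo(false) fetches the locations itself.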
+    this.locatedBlocks = locatedBlocks;
+    openInfo(false);
   }
 
   /**
    * Grab the open-file info from namenode
+   * @param refreshLocatedBlocks whether to re-fetch the located blocks
    */
-  void openInfo() throws IOException, UnresolvedLinkException {
+  void openInfo(boolean refreshLocatedBlocks) throws IOException,
+      UnresolvedLinkException {
     final DfsClientConf conf = dfsClient.getConf();
     synchronized(infoLock) {
-      lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
+      lastBlockBeingWrittenLength =
+          fetchLocatedBlocksAndGetLastBlockLength(refreshLocatedBlocks);
       int retriesForLastBlockLength = conf.getRetryTimesForGetLastBlockLength();
       while (retriesForLastBlockLength > 0) {
         // Getting last block length as -1 is a special case. When cluster
@@ -281,7 +286,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
               + "Datanodes might not have reported blocks completely."
               + " Will retry for " + retriesForLastBlockLength + " times");
           waitFor(conf.getRetryIntervalForGetLastBlockLength());
-          lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
+          lastBlockBeingWrittenLength =
+              fetchLocatedBlocksAndGetLastBlockLength(true);
         } else {
           break;
         }
@@ -302,8 +308,14 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
     }
   }
 
-  private long fetchLocatedBlocksAndGetLastBlockLength() throws IOException {
-    final LocatedBlocks newInfo = dfsClient.getLocatedBlocks(src, 0);
+  private long fetchLocatedBlocksAndGetLastBlockLength(boolean refresh)
+      throws IOException {
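+    // Reuse the cached locations unless the caller asked for a refresh or
+    // no locations have been cached yet.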
+    LocatedBlocks newInfo = locatedBlocks;
+    if (locatedBlocks == null || refresh) {
+      newInfo = dfsClient.getLocatedBlocks(src, 0);
+    }
     if (DFSClient.LOG.isDebugEnabled()) {
       DFSClient.LOG.debug("newInfo = " + newInfo);
     }
@@ -441,7 +451,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
    * @return located block
    * @throws IOException
    */
-  private LocatedBlock getBlockAt(long offset) throws IOException {
+  protected LocatedBlock getBlockAt(long offset) throws IOException {
     synchronized(infoLock) {
       assert (locatedBlocks != null) : "locatedBlocks is null";
 
@@ -476,7 +486,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
   }
 
   /** Fetch a block from namenode and cache it */
-  private void fetchBlockAt(long offset) throws IOException {
+  protected void fetchBlockAt(long offset) throws IOException {
     synchronized(infoLock) {
       int targetBlockIdx = locatedBlocks.findBlock(offset);
       if (targetBlockIdx < 0) { // block is not cached
@@ -579,7 +589,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
     }
 
     // Will be getting a new BlockReader.
-    closeCurrentBlockReader();
+    closeCurrentBlockReaders();
 
     //
     // Connect to best DataNode for desired Block, with potential offset
@@ -620,7 +630,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
         return chosenNode;
       } catch (IOException ex) {
         if (ex instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
-          DFSClient.LOG.info("Will fetch a new encryption key and retry, "
+          DFSClient.LOG.info("Will fetch a new encryption key and retry, "
               + "encryption key was invalid when connecting to " + targetAddr
               + " : " + ex);
           // The encryption key used is invalid.
@@ -631,8 +641,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
           fetchBlockAt(target);
         } else {
           connectFailedOnce = true;
-          DFSClient.LOG.warn("Failed to connect to " + targetAddr + " for block "
-            +targetBlock.getBlock()+ ", add to deadNodes and continue. " + ex, ex);
+          DFSClient.LOG.warn("Failed to connect to " + targetAddr + " for block"
+              + ", add to deadNodes and continue. " + ex, ex);
           // Put chosen node into dead list, continue
           addToDeadNodes(chosenNode);
         }
@@ -696,7 +706,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
           "unreleased ByteBuffers allocated by read(). " +
           "Please release " + builder.toString() + ".");
     }
-    closeCurrentBlockReader();
+    closeCurrentBlockReaders();
     super.close();
   }
 
@@ -713,12 +723,23 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
    * Wraps different possible read implementations so that readBuffer can be
    * strategy-agnostic.
    */
-  private interface ReaderStrategy {
+  interface ReaderStrategy {
     public int doRead(BlockReader blockReader, int off, int len)
        throws ChecksumException, IOException;
+
+    /**
+     * Copy data from the src ByteBuffer into the read buffer.
+     * @param src The src buffer where the data is copied from
+     * @param offset Useful only when the ReadStrategy is based on a byte array.
+     *               Indicates the offset in the byte array at which to copy.
+     * @param length Useful only when the ReadStrategy is based on a byte array.
+     *               Indicates the length of the data to copy.
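+     * @return the number of bytes actually copied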
+     */
+    public int copyFrom(ByteBuffer src, int offset, int length);
   }
 
-  private void updateReadStatistics(ReadStatistics readStatistics,
+  protected void updateReadStatistics(ReadStatistics readStatistics,
       int nRead, BlockReader blockReader) {
     if (nRead <= 0) return;
     synchronized(infoLock) {
@@ -749,12 +769,21 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
       updateReadStatistics(readStatistics, nRead, blockReader);
       return nRead;
     }
+
+    @Override
+    public int copyFrom(ByteBuffer src, int offset, int length) {
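+      // Copy through a duplicate so the caller's src position and limit
+      // stay untouched.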
+      ByteBuffer writeSlice = src.duplicate();
+      writeSlice.get(buf, offset, length);
+      return length;
+    }
   }
 
   /**
    * Used to read bytes into a user-supplied ByteBuffer
    */
-  private class ByteBufferStrategy implements ReaderStrategy {
+  protected class ByteBufferStrategy implements ReaderStrategy {
     final ByteBuffer buf;
     ByteBufferStrategy(ByteBuffer buf) {
       this.buf = buf;
@@ -770,6 +797,9 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
         int ret = blockReader.read(buf);
         success = true;
         updateReadStatistics(readStatistics, ret, blockReader);
+        if (ret == 0) {
+          DFSClient.LOG.warn("zero-byte read from blockReader for " + src);
+        }
         return ret;
       } finally {
         if (!success) {
@@ -779,6 +809,17 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
         }
       }
     }
+
+    @Override
+    public int copyFrom(ByteBuffer src, int offset, int length) {
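+      // Cap the copy at the smaller of the two remainders and bound the
+      // duplicate's limit so buf.put() cannot overflow.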
+      ByteBuffer writeSlice = src.duplicate();
+      int remaining = Math.min(buf.remaining(), writeSlice.remaining());
+      writeSlice.limit(writeSlice.position() + remaining);
+      buf.put(writeSlice);
+      return remaining;
+    }
   }
 
   /* This is a used by regular read() and handles ChecksumExceptions.
@@ -837,7 +876,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
     }
   }
 
-  private synchronized int readWithStrategy(ReaderStrategy strategy, int off, int len) throws IOException {
+  protected synchronized int readWithStrategy(ReaderStrategy strategy, int off, int len) throws IOException {
     dfsClient.checkOpen();
     if (closed.get()) {
       throw new IOException("Stream closed");
@@ -926,7 +965,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
   /**
    * Add corrupted block replica into map.
    */
-  private void addIntoCorruptedBlockMap(ExtendedBlock blk, DatanodeInfo node,
+  protected void addIntoCorruptedBlockMap(ExtendedBlock blk, DatanodeInfo node,
       Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
     Set<DatanodeInfo> dnSet = null;
     if((corruptedBlockMap.containsKey(blk))) {
@@ -985,8 +1024,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
         } catch (InterruptedException iex) {
         }
         deadNodes.clear(); //2nd option is to remove only nodes[blockId]
-        openInfo();
-        block = getBlockAt(block.getStartOffset());
+        openInfo(true);
+        block = refreshLocatedBlock(block);
         failures++;
       }
     }
@@ -998,7 +1037,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
    * @param ignoredNodes Do not choose nodes in this array (may be null)
    * @return The DNAddrPair of the best node. Null if no node can be chosen.
    */
-  private DNAddrPair getBestNodeDNAddrPair(LocatedBlock block,
+  protected DNAddrPair getBestNodeDNAddrPair(LocatedBlock block,
       Collection<DatanodeInfo> ignoredNodes) {
     DatanodeInfo[] nodes = block.getLocations();
     StorageType[] storageTypes = block.getStorageTypes();
@@ -1058,15 +1097,15 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
     return errMsgr.toString();
   }
 
-  private void fetchBlockByteRange(long blockStartOffset, long start, long end,
+  protected void fetchBlockByteRange(LocatedBlock block, long start, long end,
                                    byte[] buf, int offset,
                                    Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
       throws IOException {
-    LocatedBlock block = getBlockAt(blockStartOffset);
+    block = refreshLocatedBlock(block);
     while (true) {
       DNAddrPair addressPair = chooseDataNode(block, null);
       try {
-        actualGetFromOneDataNode(addressPair, blockStartOffset, start, end,
+        actualGetFromOneDataNode(addressPair, block, start, end,
            buf, offset, corruptedBlockMap);
         return;
       } catch (IOException e) {
@@ -1077,7 +1116,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
   }
 
   private Callable<ByteBuffer> getFromOneDataNode(final DNAddrPair datanode,
-      final long blockStartOffset, final long start, final long end,
+      final LocatedBlock block, final long start, final long end,
       final ByteBuffer bb,
       final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
      final int hedgedReadId) {
@@ -1090,7 +1129,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
         TraceScope scope =
             Trace.startSpan("hedgedRead" + hedgedReadId, parentSpan);
         try {
-          actualGetFromOneDataNode(datanode, blockStartOffset, start, end, buf,
+          actualGetFromOneDataNode(datanode, block, start, end, buf,
              offset, corruptedBlockMap);
           return bb;
         } finally {
@@ -1100,31 +1139,63 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
     };
   }
 
+  /**
+   * Used when reading contiguous blocks
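+   * It delegates to the multi-portion overload below with a single portion.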
+   */
   private void actualGetFromOneDataNode(final DNAddrPair datanode,
-      long blockStartOffset, final long start, final long end, byte[] buf,
+      LocatedBlock block, final long start, final long end, byte[] buf,
       int offset, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
       throws IOException {
+    final int length = (int) (end - start + 1);
+    actualGetFromOneDataNode(datanode, block, start, end, buf,
+        new int[]{offset}, new int[]{length}, corruptedBlockMap);
+  }
+
+  /**
+   * Read data from one DataNode.
+   * @param datanode the datanode from which to read data
+   * @param block the located block containing the requested data
+   * @param startInBlk the offset within the block where the read starts
+   * @param endInBlk the offset within the block where the read ends (inclusive)
+   * @param buf the given byte array into which the data is read
+   * @param offsets the data may be read into multiple segments of the buf
+   *                (when reading a striped block). This array indicates the
+   *                offset of each buf segment.
+   * @param lengths the length of each buf segment
+   * @param corruptedBlockMap map recording list of datanodes with corrupted
+   *                          block replica
+   */
+  void actualGetFromOneDataNode(final DNAddrPair datanode,
+      LocatedBlock block, final long startInBlk, final long endInBlk,
+      byte[] buf, int[] offsets, int[] lengths,
+      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+      throws IOException {
     DFSClientFaultInjector.get().startFetchFromDatanode();
     int refetchToken = 1; // only need to get a new access token once
     int refetchEncryptionKey = 1; // only need to get a new encryption key once
+    final int len = (int) (endInBlk - startInBlk + 1);
+    checkReadPortions(offsets, lengths, len);
 
     while (true) {
       // cached block locations may have been updated by chooseDataNode()
       // or fetchBlockAt(). Always get the latest list of locations at the
       // start of the loop.
-      LocatedBlock block = getBlockAt(blockStartOffset);
+      block = refreshLocatedBlock(block);
       BlockReader reader = null;
       try {
         DFSClientFaultInjector.get().fetchFromDatanodeException();
-        int len = (int) (end - start + 1);
-        reader = getBlockReader(block, start, len, datanode.addr,
+        reader = getBlockReader(block, startInBlk, len, datanode.addr,
            datanode.storageType, datanode.info);
-        int nread = reader.readAll(buf, offset, len);
-        updateReadStatistics(readStatistics, nread, reader);
-
-        if (nread != len) {
-          throw new IOException("truncated return from reader.read(): " +
-                                "excpected " + len + ", got " + nread);
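+        // The reader advances through the block, so consecutive readAll()
+        // calls fill the buffer segments with consecutive block bytes.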
+        for (int i = 0; i < offsets.length; i++) {
+          int nread = reader.readAll(buf, offsets[i], lengths[i]);
+          updateReadStatistics(readStatistics, nread, reader);
+          if (nread != lengths[i]) {
+            throw new IOException("truncated return from reader.read(): " +
+                "expected " + lengths[i] + ", got " + nread);
+          }
         }
         DFSClientFaultInjector.get().readFromDatanodeDelay();
         return;
@@ -1169,11 +1237,41 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
   }
 
   /**
-   * Like {@link #fetchBlockByteRange} except we start up a second, parallel,
+   * Refresh cached block locations.
+   * @param block The currently cached block locations
+   * @return Refreshed block locations
+   * @throws IOException
+   */
+  protected LocatedBlock refreshLocatedBlock(LocatedBlock block)
+      throws IOException {
+    return getBlockAt(block.getStartOffset());
+  }
+
+  /**
+   * This method verifies that the read portions are valid and do not overlap
+   * with each other.
+   */
+  private void checkReadPortions(int[] offsets, int[] lengths, int totalLen) {
+    Preconditions.checkArgument(offsets.length == lengths.length && offsets.length > 0);
+    int sum = 0;
+    for (int i = 0; i < lengths.length; i++) {
+      if (i > 0) {
+        int gap = offsets[i] - offsets[i - 1];
+        // make sure read portions do not overlap with each other
+        Preconditions.checkArgument(gap >= lengths[i - 1]);
+      }
+      sum += lengths[i];
+    }
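+    // all portions together must cover exactly totalLen bytes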
+    Preconditions.checkArgument(sum == totalLen);
+  }
+
+  /**
+   * Like {@link #fetchBlockByteRange} except we start up a second, parallel,
    * 'hedged' read if the first read is taking longer than configured amount of
    * time. We then wait on which ever read returns first.
    */
-  private void hedgedFetchBlockByteRange(long blockStartOffset, long start,
+  private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
       long end, byte[] buf, int offset,
       Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
       throws IOException {
@@ -1186,7 +1283,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
     ByteBuffer bb = null;
     int len = (int) (end - start + 1);
     int hedgedReadId = 0;
-    LocatedBlock block = getBlockAt(blockStartOffset);
+    block = refreshLocatedBlock(block);
     while (true) {
       // see HDFS-6591, this metric is used to verify/catch unnecessary loops
       hedgedReadOpsLoopNumForTesting++;
@@ -1198,7 +1295,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
         chosenNode = chooseDataNode(block, ignored);
         bb = ByteBuffer.wrap(buf, offset, len);
         Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
-            chosenNode, block.getStartOffset(), start, end, bb,
+            chosenNode, block, start, end, bb,
            corruptedBlockMap, hedgedReadId++);
         Future<ByteBuffer> firstRequest = hedgedService
            .submit(getFromDataNodeCallable);
@@ -1235,7 +1332,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
         }
         bb = ByteBuffer.allocate(len);
         Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
-            chosenNode, block.getStartOffset(), start, end, bb,
+            chosenNode, block, start, end, bb,
            corruptedBlockMap, hedgedReadId++);
         Future<ByteBuffer> oneMoreRequest = hedgedService
            .submit(getFromDataNodeCallable);
@@ -1319,7 +1416,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
    * @return true if block access token has expired or invalid and it should be
    *         refetched
    */
-  private static boolean tokenRefetchNeeded(IOException ex,
+  protected static boolean tokenRefetchNeeded(IOException ex,
       InetSocketAddress targetAddr) {
     /*
      * Get a new access token and retry. Retry is needed in 2 cases. 1)
@@ -1389,13 +1486,11 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
       long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart);
       try {
         if (dfsClient.isHedgedReadsEnabled()) {
-          hedgedFetchBlockByteRange(blk.getStartOffset(), targetStart,
-              targetStart + bytesToRead - 1, buffer, offset,
-              corruptedBlockMap);
+          hedgedFetchBlockByteRange(blk, targetStart,
+              targetStart + bytesToRead - 1, buffer, offset, corruptedBlockMap);
         } else {
-          fetchBlockByteRange(blk.getStartOffset(), targetStart,
-              targetStart + bytesToRead - 1, buffer, offset,
-              corruptedBlockMap);
+          fetchBlockByteRange(blk, targetStart, targetStart + bytesToRead - 1,
+              buffer, offset, corruptedBlockMap);
         }
       } finally {
         // Check and report if any block replicas are corrupted.
@@ -1427,7 +1522,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
    * @param corruptedBlockMap map of corrupted blocks
    * @param dataNodeCount number of data nodes who contains the block replicas
    */
-  private void reportCheckSumFailure(
+  protected void reportCheckSumFailure(
       Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
       int dataNodeCount) {
     if (corruptedBlockMap.isEmpty()) {
@@ -1556,7 +1651,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
   /**
   */
   @Override
-  public synchronized long getPos() throws IOException {
+  public synchronized long getPos() {
     return pos;
   }
 
@@ -1590,7 +1685,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
   }
 
   /** Utility class to encapsulate data node info and its address. */
-  private static final class DNAddrPair {
+  static final class DNAddrPair {
    final DatanodeInfo info;
    final InetSocketAddress addr;
    final StorageType storageType;
@@ -1627,7 +1722,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
     }
   }
 
-  private void closeCurrentBlockReader() {
+  protected void closeCurrentBlockReaders() {
     if (blockReader == null) return;
     // Close the current block reader so that the new caching settings can
     // take effect immediately.
@@ -1647,7 +1742,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
       this.cachingStrategy =
          new CachingStrategy.Builder(this.cachingStrategy).setReadahead(readahead).build();
     }
-    closeCurrentBlockReader();
+    closeCurrentBlockReaders();
   }
 
   @Override
@@ -1657,7 +1752,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
       this.cachingStrategy =
          new CachingStrategy.Builder(this.cachingStrategy).setDropBehind(dropBehind).build();
     }
-    closeCurrentBlockReader();
+    closeCurrentBlockReaders();
  }
 
   /**
@@ -1815,6 +1910,6 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
 
   @Override
   public synchronized void unbuffer() {
-    closeCurrentBlockReader();
+    closeCurrentBlockReaders();
   }
 }