|
@@ -1006,54 +1006,80 @@ public class DFSInputStream extends FSInputStream
|
|
|
|
|
|
private DNAddrPair chooseDataNode(LocatedBlock block,
|
|
|
Collection<DatanodeInfo> ignoredNodes) throws IOException {
|
|
|
+ return chooseDataNode(block, ignoredNodes, true);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Choose datanode to read from.
|
|
|
+ *
|
|
|
+ * @param block Block to choose datanode addr from
|
|
|
+ * @param ignoredNodes Nodes to ignore when choosing a datanode.
|
|
|
+ * @param refetchIfRequired Whether to refetch if no nodes to choose
|
|
|
+ * from.
|
|
|
+ * @return Returns chosen DNAddrPair; can be null if refetchIfRequired is
|
|
|
+ * false.
|
|
|
+ */
|
|
|
+ private DNAddrPair chooseDataNode(LocatedBlock block,
|
|
|
+ Collection<DatanodeInfo> ignoredNodes, boolean refetchIfRequired)
|
|
|
+ throws IOException {
|
|
|
while (true) {
|
|
|
DNAddrPair result = getBestNodeDNAddrPair(block, ignoredNodes);
|
|
|
if (result != null) {
|
|
|
return result;
|
|
|
+ } else if (refetchIfRequired) {
|
|
|
+ block = refetchLocations(block, ignoredNodes);
|
|
|
} else {
|
|
|
- String errMsg = getBestNodeDNAddrPairErrorString(block.getLocations(),
|
|
|
- deadNodes, ignoredNodes);
|
|
|
- String blockInfo = block.getBlock() + " file=" + src;
|
|
|
- if (failures >= dfsClient.getConf().getMaxBlockAcquireFailures()) {
|
|
|
- String description = "Could not obtain block: " + blockInfo;
|
|
|
- DFSClient.LOG.warn(description + errMsg
|
|
|
- + ". Throwing a BlockMissingException");
|
|
|
- throw new BlockMissingException(src, description,
|
|
|
- block.getStartOffset());
|
|
|
- }
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- DatanodeInfo[] nodes = block.getLocations();
|
|
|
- if (nodes == null || nodes.length == 0) {
|
|
|
- DFSClient.LOG.info("No node available for " + blockInfo);
|
|
|
- }
|
|
|
- DFSClient.LOG.info("Could not obtain " + block.getBlock()
|
|
|
- + " from any node: " + errMsg
|
|
|
- + ". Will get new block locations from namenode and retry...");
|
|
|
- try {
|
|
|
- // Introducing a random factor to the wait time before another retry.
|
|
|
- // The wait time is dependent on # of failures and a random factor.
|
|
|
- // At the first time of getting a BlockMissingException, the wait time
|
|
|
- // is a random number between 0..3000 ms. If the first retry
|
|
|
- // still fails, we will wait 3000 ms grace period before the 2nd retry.
|
|
|
- // Also at the second retry, the waiting window is expanded to 6000 ms
|
|
|
- // alleviating the request rate from the server. Similarly the 3rd retry
|
|
|
- // will wait 6000ms grace period before retry and the waiting window is
|
|
|
- // expanded to 9000ms.
|
|
|
- final int timeWindow = dfsClient.getConf().getTimeWindow();
|
|
|
- double waitTime = timeWindow * failures + // grace period for the last round of attempt
|
|
|
- // expanding time window for each failure
|
|
|
- timeWindow * (failures + 1) *
|
|
|
+ private LocatedBlock refetchLocations(LocatedBlock block,
|
|
|
+ Collection<DatanodeInfo> ignoredNodes) throws IOException {
|
|
|
+ String errMsg = getBestNodeDNAddrPairErrorString(block.getLocations(),
|
|
|
+ deadNodes, ignoredNodes);
|
|
|
+ String blockInfo = block.getBlock() + " file=" + src;
|
|
|
+ if (failures >= dfsClient.getConf().getMaxBlockAcquireFailures()) {
|
|
|
+ String description = "Could not obtain block: " + blockInfo;
|
|
|
+ DFSClient.LOG.warn(description + errMsg
|
|
|
+ + ". Throwing a BlockMissingException");
|
|
|
+ throw new BlockMissingException(src, description,
|
|
|
+ block.getStartOffset());
|
|
|
+ }
|
|
|
+
|
|
|
+ DatanodeInfo[] nodes = block.getLocations();
|
|
|
+ if (nodes == null || nodes.length == 0) {
|
|
|
+ DFSClient.LOG.info("No node available for " + blockInfo);
|
|
|
+ }
|
|
|
+ DFSClient.LOG.info("Could not obtain " + block.getBlock()
|
|
|
+ + " from any node: " + errMsg
|
|
|
+ + ". Will get new block locations from namenode and retry...");
|
|
|
+ try {
|
|
|
+ // Introducing a random factor to the wait time before another retry.
|
|
|
+ // The wait time is dependent on # of failures and a random factor.
|
|
|
+ // At the first time of getting a BlockMissingException, the wait time
|
|
|
+ // is a random number between 0..3000 ms. If the first retry
|
|
|
+ // still fails, we will wait 3000 ms grace period before the 2nd retry.
|
|
|
+ // Also at the second retry, the waiting window is expanded to 6000 ms
|
|
|
+ // alleviating the request rate from the server. Similarly the 3rd retry
|
|
|
+ // will wait 6000ms grace period before retry and the waiting window is
|
|
|
+ // expanded to 9000ms.
|
|
|
+ final int timeWindow = dfsClient.getConf().getTimeWindow();
|
|
|
+ // grace period for the last round of attempt
|
|
|
+ double waitTime = timeWindow * failures +
|
|
|
+ // expanding time window for each failure
|
|
|
+ timeWindow * (failures + 1) *
|
|
|
ThreadLocalRandom.current().nextDouble();
|
|
|
- DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1) + " IOException, will wait for " + waitTime + " msec.");
|
|
|
- Thread.sleep((long)waitTime);
|
|
|
- } catch (InterruptedException ignored) {
|
|
|
- }
|
|
|
- deadNodes.clear(); //2nd option is to remove only nodes[blockId]
|
|
|
- openInfo(true);
|
|
|
- block = refreshLocatedBlock(block);
|
|
|
- failures++;
|
|
|
- }
|
|
|
+ DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1) +
|
|
|
+ " IOException, will wait for " + waitTime + " msec.");
|
|
|
+ Thread.sleep((long)waitTime);
|
|
|
+ } catch (InterruptedException ignored) {
|
|
|
}
|
|
|
+ deadNodes.clear(); //2nd option is to remove only nodes[blockId]
|
|
|
+ openInfo(true);
|
|
|
+ block = refreshLocatedBlock(block);
|
|
|
+ failures++;
|
|
|
+ return block;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1148,6 +1174,7 @@ public class DFSInputStream extends FSInputStream
|
|
|
return new Callable<ByteBuffer>() {
|
|
|
@Override
|
|
|
public ByteBuffer call() throws Exception {
|
|
|
+ DFSClientFaultInjector.get().sleepBeforeHedgedGet();
|
|
|
byte[] buf = bb.array();
|
|
|
int offset = bb.position();
|
|
|
try (TraceScope ignored = dfsClient.getTracer().
|
|
@@ -1341,20 +1368,22 @@ public class DFSInputStream extends FSInputStream
|
|
|
// We are starting up a 'hedged' read. We have a read already
|
|
|
// ongoing. Call getBestNodeDNAddrPair instead of chooseDataNode.
|
|
|
// If no nodes to do hedged reads against, pass.
|
|
|
+ boolean refetch = false;
|
|
|
try {
|
|
|
- chosenNode = getBestNodeDNAddrPair(block, ignored);
|
|
|
- if (chosenNode == null) {
|
|
|
- chosenNode = chooseDataNode(block, ignored);
|
|
|
+ chosenNode = chooseDataNode(block, ignored, false);
|
|
|
+ if (chosenNode != null) {
|
|
|
+ // Latest block, if refreshed internally
|
|
|
+ block = chosenNode.block;
|
|
|
+ bb = ByteBuffer.allocate(len);
|
|
|
+ Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
|
|
|
+ chosenNode, block, start, end, bb, corruptedBlockMap,
|
|
|
+ hedgedReadId++);
|
|
|
+ Future<ByteBuffer> oneMoreRequest = hedgedService
|
|
|
+ .submit(getFromDataNodeCallable);
|
|
|
+ futures.add(oneMoreRequest);
|
|
|
+ } else {
|
|
|
+ refetch = true;
|
|
|
}
|
|
|
- // Latest block, if refreshed internally
|
|
|
- block = chosenNode.block;
|
|
|
- bb = ByteBuffer.allocate(len);
|
|
|
- Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
|
|
|
- chosenNode, block, start, end, bb,
|
|
|
- corruptedBlockMap, hedgedReadId++);
|
|
|
- Future<ByteBuffer> oneMoreRequest = hedgedService
|
|
|
- .submit(getFromDataNodeCallable);
|
|
|
- futures.add(oneMoreRequest);
|
|
|
} catch (IOException ioe) {
|
|
|
DFSClient.LOG.debug("Failed getting node for hedged read: {}",
|
|
|
ioe.getMessage());
|
|
@@ -1372,6 +1401,9 @@ public class DFSInputStream extends FSInputStream
|
|
|
} catch (InterruptedException ie) {
|
|
|
// Ignore and retry
|
|
|
}
|
|
|
+ if (refetch) {
|
|
|
+ refetchLocations(block, ignored);
|
|
|
+ }
|
|
|
// We got here if exception. Ignore this node on next go around IFF
|
|
|
// we found a chosenNode to hedge read against.
|
|
|
if (chosenNode != null && chosenNode.info != null) {
|