|
@@ -648,6 +648,8 @@ public class DFSInputStream extends FSInputStream
|
|
chosenNode = retval.info;
|
|
chosenNode = retval.info;
|
|
InetSocketAddress targetAddr = retval.addr;
|
|
InetSocketAddress targetAddr = retval.addr;
|
|
StorageType storageType = retval.storageType;
|
|
StorageType storageType = retval.storageType;
|
|
|
|
+ // Latest block if refreshed by chooseDatanode()
|
|
|
|
+ targetBlock = retval.block;
|
|
|
|
|
|
try {
|
|
try {
|
|
blockReader = getBlockReader(targetBlock, offsetIntoBlock,
|
|
blockReader = getBlockReader(targetBlock, offsetIntoBlock,
|
|
@@ -1123,7 +1125,7 @@ public class DFSInputStream extends FSInputStream
|
|
chosenNode.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname());
|
|
chosenNode.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname());
|
|
DFSClient.LOG.debug("Connecting to datanode {}", dnAddr);
|
|
DFSClient.LOG.debug("Connecting to datanode {}", dnAddr);
|
|
InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr);
|
|
InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr);
|
|
- return new DNAddrPair(chosenNode, targetAddr, storageType);
|
|
|
|
|
|
+ return new DNAddrPair(chosenNode, targetAddr, storageType, block);
|
|
}
|
|
}
|
|
|
|
|
|
private static String getBestNodeDNAddrPairErrorString(
|
|
private static String getBestNodeDNAddrPairErrorString(
|
|
@@ -1155,12 +1157,13 @@ public class DFSInputStream extends FSInputStream
|
|
byte[] buf, int offset,
|
|
byte[] buf, int offset,
|
|
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
|
|
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
|
|
throws IOException {
|
|
throws IOException {
|
|
- block = refreshLocatedBlock(block);
|
|
|
|
while (true) {
|
|
while (true) {
|
|
DNAddrPair addressPair = chooseDataNode(block, null);
|
|
DNAddrPair addressPair = chooseDataNode(block, null);
|
|
|
|
+ // Latest block, if refreshed internally
|
|
|
|
+ block = addressPair.block;
|
|
try {
|
|
try {
|
|
- actualGetFromOneDataNode(addressPair, block, start, end,
|
|
|
|
- buf, offset, corruptedBlockMap);
|
|
|
|
|
|
+ actualGetFromOneDataNode(addressPair, start, end, buf, offset,
|
|
|
|
+ corruptedBlockMap);
|
|
return;
|
|
return;
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
checkInterrupted(e); // check if the read has been interrupted
|
|
checkInterrupted(e); // check if the read has been interrupted
|
|
@@ -1183,8 +1186,8 @@ public class DFSInputStream extends FSInputStream
|
|
int offset = bb.position();
|
|
int offset = bb.position();
|
|
try (TraceScope ignored = dfsClient.getTracer().
|
|
try (TraceScope ignored = dfsClient.getTracer().
|
|
newScope("hedgedRead" + hedgedReadId, parentSpanId)) {
|
|
newScope("hedgedRead" + hedgedReadId, parentSpanId)) {
|
|
- actualGetFromOneDataNode(datanode, block, start, end, buf,
|
|
|
|
- offset, corruptedBlockMap);
|
|
|
|
|
|
+ actualGetFromOneDataNode(datanode, start, end, buf, offset,
|
|
|
|
+ corruptedBlockMap);
|
|
return bb;
|
|
return bb;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -1195,18 +1198,17 @@ public class DFSInputStream extends FSInputStream
|
|
* Used when reading contiguous blocks
|
|
* Used when reading contiguous blocks
|
|
*/
|
|
*/
|
|
private void actualGetFromOneDataNode(final DNAddrPair datanode,
|
|
private void actualGetFromOneDataNode(final DNAddrPair datanode,
|
|
- LocatedBlock block, final long start, final long end, byte[] buf,
|
|
|
|
- int offset, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
|
|
|
|
|
|
+ final long start, final long end, byte[] buf, int offset,
|
|
|
|
+ Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
|
|
throws IOException {
|
|
throws IOException {
|
|
final int length = (int) (end - start + 1);
|
|
final int length = (int) (end - start + 1);
|
|
- actualGetFromOneDataNode(datanode, block, start, end, buf,
|
|
|
|
- new int[]{offset}, new int[]{length}, corruptedBlockMap);
|
|
|
|
|
|
+ actualGetFromOneDataNode(datanode, start, end, buf, new int[] { offset },
|
|
|
|
+ new int[] { length }, corruptedBlockMap);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
* Read data from one DataNode.
|
|
* Read data from one DataNode.
|
|
* @param datanode the datanode from which to read data
|
|
* @param datanode the datanode from which to read data
|
|
- * @param block the located block containing the requested data
|
|
|
|
* @param startInBlk the startInBlk offset of the block
|
|
* @param startInBlk the startInBlk offset of the block
|
|
* @param endInBlk the endInBlk offset of the block
|
|
* @param endInBlk the endInBlk offset of the block
|
|
* @param buf the given byte array into which the data is read
|
|
* @param buf the given byte array into which the data is read
|
|
@@ -1218,9 +1220,8 @@ public class DFSInputStream extends FSInputStream
|
|
* block replica
|
|
* block replica
|
|
*/
|
|
*/
|
|
void actualGetFromOneDataNode(final DNAddrPair datanode,
|
|
void actualGetFromOneDataNode(final DNAddrPair datanode,
|
|
- LocatedBlock block, final long startInBlk, final long endInBlk,
|
|
|
|
- byte[] buf, int[] offsets, int[] lengths,
|
|
|
|
- Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
|
|
|
|
|
|
+ final long startInBlk, final long endInBlk, byte[] buf,
|
|
|
|
+ int[] offsets, int[] lengths, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
|
|
throws IOException {
|
|
throws IOException {
|
|
DFSClientFaultInjector.get().startFetchFromDatanode();
|
|
DFSClientFaultInjector.get().startFetchFromDatanode();
|
|
int refetchToken = 1; // only need to get a new access token once
|
|
int refetchToken = 1; // only need to get a new access token once
|
|
@@ -1228,11 +1229,8 @@ public class DFSInputStream extends FSInputStream
|
|
final int len = (int) (endInBlk - startInBlk + 1);
|
|
final int len = (int) (endInBlk - startInBlk + 1);
|
|
checkReadPortions(offsets, lengths, len);
|
|
checkReadPortions(offsets, lengths, len);
|
|
|
|
|
|
|
|
+ LocatedBlock block = datanode.block;
|
|
while (true) {
|
|
while (true) {
|
|
- // cached block locations may have been updated by chooseDataNode()
|
|
|
|
- // or fetchBlockAt(). Always get the latest list of locations at the
|
|
|
|
- // start of the loop.
|
|
|
|
- block = refreshLocatedBlock(block);
|
|
|
|
BlockReader reader = null;
|
|
BlockReader reader = null;
|
|
try {
|
|
try {
|
|
DFSClientFaultInjector.get().fetchFromDatanodeException();
|
|
DFSClientFaultInjector.get().fetchFromDatanodeException();
|
|
@@ -1283,6 +1281,9 @@ public class DFSInputStream extends FSInputStream
|
|
addToDeadNodes(datanode.info);
|
|
addToDeadNodes(datanode.info);
|
|
throw new IOException(msg);
|
|
throw new IOException(msg);
|
|
}
|
|
}
|
|
|
|
+ // Refresh the block for updated tokens in case of token failures or
|
|
|
|
+ // encryption key failures.
|
|
|
|
+ block = refreshLocatedBlock(block);
|
|
} finally {
|
|
} finally {
|
|
if (reader != null) {
|
|
if (reader != null) {
|
|
reader.close();
|
|
reader.close();
|
|
@@ -1337,7 +1338,6 @@ public class DFSInputStream extends FSInputStream
|
|
ByteBuffer bb;
|
|
ByteBuffer bb;
|
|
int len = (int) (end - start + 1);
|
|
int len = (int) (end - start + 1);
|
|
int hedgedReadId = 0;
|
|
int hedgedReadId = 0;
|
|
- block = refreshLocatedBlock(block);
|
|
|
|
while (true) {
|
|
while (true) {
|
|
// see HDFS-6591, this metric is used to verify/catch unnecessary loops
|
|
// see HDFS-6591, this metric is used to verify/catch unnecessary loops
|
|
hedgedReadOpsLoopNumForTesting++;
|
|
hedgedReadOpsLoopNumForTesting++;
|
|
@@ -1347,6 +1347,8 @@ public class DFSInputStream extends FSInputStream
|
|
// chooseDataNode is a commitment. If no node, we go to
|
|
// chooseDataNode is a commitment. If no node, we go to
|
|
// the NN to reget block locations. Only go here on first read.
|
|
// the NN to reget block locations. Only go here on first read.
|
|
chosenNode = chooseDataNode(block, ignored);
|
|
chosenNode = chooseDataNode(block, ignored);
|
|
|
|
+ // Latest block, if refreshed internally
|
|
|
|
+ block = chosenNode.block;
|
|
bb = ByteBuffer.allocate(len);
|
|
bb = ByteBuffer.allocate(len);
|
|
Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
|
|
Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
|
|
chosenNode, block, start, end, bb,
|
|
chosenNode, block, start, end, bb,
|
|
@@ -1384,6 +1386,8 @@ public class DFSInputStream extends FSInputStream
|
|
if (chosenNode == null) {
|
|
if (chosenNode == null) {
|
|
chosenNode = chooseDataNode(block, ignored);
|
|
chosenNode = chooseDataNode(block, ignored);
|
|
}
|
|
}
|
|
|
|
+ // Latest block, if refreshed internally
|
|
|
|
+ block = chosenNode.block;
|
|
bb = ByteBuffer.allocate(len);
|
|
bb = ByteBuffer.allocate(len);
|
|
Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
|
|
Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
|
|
chosenNode, block, start, end, bb,
|
|
chosenNode, block, start, end, bb,
|
|
@@ -1736,12 +1740,14 @@ public class DFSInputStream extends FSInputStream
|
|
final DatanodeInfo info;
|
|
final DatanodeInfo info;
|
|
final InetSocketAddress addr;
|
|
final InetSocketAddress addr;
|
|
final StorageType storageType;
|
|
final StorageType storageType;
|
|
|
|
+ final LocatedBlock block;
|
|
|
|
|
|
DNAddrPair(DatanodeInfo info, InetSocketAddress addr,
|
|
DNAddrPair(DatanodeInfo info, InetSocketAddress addr,
|
|
- StorageType storageType) {
|
|
|
|
|
|
+ StorageType storageType, LocatedBlock block) {
|
|
this.info = info;
|
|
this.info = info;
|
|
this.addr = addr;
|
|
this.addr = addr;
|
|
this.storageType = storageType;
|
|
this.storageType = storageType;
|
|
|
|
+ this.block = block;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|