|
@@ -633,6 +633,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
|
chosenNode = retval.info;
|
|
chosenNode = retval.info;
|
|
InetSocketAddress targetAddr = retval.addr;
|
|
InetSocketAddress targetAddr = retval.addr;
|
|
StorageType storageType = retval.storageType;
|
|
StorageType storageType = retval.storageType;
|
|
|
|
+ // Latest block if refreshed by chooseDatanode()
|
|
|
|
+ targetBlock = retval.block;
|
|
|
|
|
|
try {
|
|
try {
|
|
ExtendedBlock blk = targetBlock.getBlock();
|
|
ExtendedBlock blk = targetBlock.getBlock();
|
|
@@ -1045,7 +1047,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
|
DFSClient.LOG.debug("Connecting to datanode " + dnAddr);
|
|
DFSClient.LOG.debug("Connecting to datanode " + dnAddr);
|
|
}
|
|
}
|
|
InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr);
|
|
InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr);
|
|
- return new DNAddrPair(chosenNode, targetAddr, storageType);
|
|
|
|
|
|
+ return new DNAddrPair(chosenNode, targetAddr, storageType, block);
|
|
}
|
|
}
|
|
|
|
|
|
private static String getBestNodeDNAddrPairErrorString(
|
|
private static String getBestNodeDNAddrPairErrorString(
|
|
@@ -1077,11 +1079,11 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
|
byte[] buf, int offset,
|
|
byte[] buf, int offset,
|
|
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
|
|
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
|
|
throws IOException {
|
|
throws IOException {
|
|
- block = getBlockAt(block.getStartOffset());
|
|
|
|
while (true) {
|
|
while (true) {
|
|
DNAddrPair addressPair = chooseDataNode(block, null);
|
|
DNAddrPair addressPair = chooseDataNode(block, null);
|
|
|
|
+ block = addressPair.block;
|
|
try {
|
|
try {
|
|
- actualGetFromOneDataNode(addressPair, block, start, end, buf, offset,
|
|
|
|
|
|
+ actualGetFromOneDataNode(addressPair, start, end, buf, offset,
|
|
corruptedBlockMap);
|
|
corruptedBlockMap);
|
|
return;
|
|
return;
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
@@ -1105,7 +1107,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
|
TraceScope scope =
|
|
TraceScope scope =
|
|
Trace.startSpan("hedgedRead" + hedgedReadId, parentSpan);
|
|
Trace.startSpan("hedgedRead" + hedgedReadId, parentSpan);
|
|
try {
|
|
try {
|
|
- actualGetFromOneDataNode(datanode, block, start, end, buf, offset,
|
|
|
|
|
|
+ actualGetFromOneDataNode(datanode, start, end, buf, offset,
|
|
corruptedBlockMap);
|
|
corruptedBlockMap);
|
|
return bb;
|
|
return bb;
|
|
} finally {
|
|
} finally {
|
|
@@ -1116,20 +1118,16 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
|
}
|
|
}
|
|
|
|
|
|
private void actualGetFromOneDataNode(final DNAddrPair datanode,
|
|
private void actualGetFromOneDataNode(final DNAddrPair datanode,
|
|
- LocatedBlock block, final long start, final long end, byte[] buf,
|
|
|
|
- int offset, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
|
|
|
|
|
|
+ final long start, final long end, byte[] buf, int offset,
|
|
|
|
+ Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
|
|
throws IOException {
|
|
throws IOException {
|
|
DFSClientFaultInjector.get().startFetchFromDatanode();
|
|
DFSClientFaultInjector.get().startFetchFromDatanode();
|
|
int refetchToken = 1; // only need to get a new access token once
|
|
int refetchToken = 1; // only need to get a new access token once
|
|
int refetchEncryptionKey = 1; // only need to get a new encryption key once
|
|
int refetchEncryptionKey = 1; // only need to get a new encryption key once
|
|
-
|
|
|
|
|
|
+ LocatedBlock block = datanode.block;
|
|
while (true) {
|
|
while (true) {
|
|
- // cached block locations may have been updated by chooseDataNode()
|
|
|
|
- // or fetchBlockAt(). Always get the latest list of locations at the
|
|
|
|
- // start of the loop.
|
|
|
|
CachingStrategy curCachingStrategy;
|
|
CachingStrategy curCachingStrategy;
|
|
boolean allowShortCircuitLocalReads;
|
|
boolean allowShortCircuitLocalReads;
|
|
- block = getBlockAt(block.getStartOffset());
|
|
|
|
synchronized(infoLock) {
|
|
synchronized(infoLock) {
|
|
curCachingStrategy = cachingStrategy;
|
|
curCachingStrategy = cachingStrategy;
|
|
allowShortCircuitLocalReads = !shortCircuitForbidden();
|
|
allowShortCircuitLocalReads = !shortCircuitForbidden();
|
|
@@ -1187,7 +1185,6 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
|
// The encryption key used is invalid.
|
|
// The encryption key used is invalid.
|
|
refetchEncryptionKey--;
|
|
refetchEncryptionKey--;
|
|
dfsClient.clearDataEncryptionKey();
|
|
dfsClient.clearDataEncryptionKey();
|
|
- continue;
|
|
|
|
} else if (refetchToken > 0 && tokenRefetchNeeded(e, targetAddr)) {
|
|
} else if (refetchToken > 0 && tokenRefetchNeeded(e, targetAddr)) {
|
|
refetchToken--;
|
|
refetchToken--;
|
|
try {
|
|
try {
|
|
@@ -1195,7 +1192,6 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
|
} catch (IOException fbae) {
|
|
} catch (IOException fbae) {
|
|
// ignore IOE, since we can retry it later in a loop
|
|
// ignore IOE, since we can retry it later in a loop
|
|
}
|
|
}
|
|
- continue;
|
|
|
|
} else {
|
|
} else {
|
|
String msg = "Failed to connect to " + targetAddr + " for file "
|
|
String msg = "Failed to connect to " + targetAddr + " for file "
|
|
+ src + " for block " + block.getBlock() + ":" + e;
|
|
+ src + " for block " + block.getBlock() + ":" + e;
|
|
@@ -1203,6 +1199,9 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
|
addToDeadNodes(chosenNode);
|
|
addToDeadNodes(chosenNode);
|
|
throw new IOException(msg);
|
|
throw new IOException(msg);
|
|
}
|
|
}
|
|
|
|
+ // Refresh the block for updated tokens in case of token failures or
|
|
|
|
+ // encryption key failures.
|
|
|
|
+ block = getBlockAt(block.getStartOffset());
|
|
} finally {
|
|
} finally {
|
|
if (reader != null) {
|
|
if (reader != null) {
|
|
reader.close();
|
|
reader.close();
|
|
@@ -1229,7 +1228,6 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
|
ByteBuffer bb = null;
|
|
ByteBuffer bb = null;
|
|
int len = (int) (end - start + 1);
|
|
int len = (int) (end - start + 1);
|
|
int hedgedReadId = 0;
|
|
int hedgedReadId = 0;
|
|
- block = getBlockAt(block.getStartOffset());
|
|
|
|
while (true) {
|
|
while (true) {
|
|
// see HDFS-6591, this metric is used to verify/catch unnecessary loops
|
|
// see HDFS-6591, this metric is used to verify/catch unnecessary loops
|
|
hedgedReadOpsLoopNumForTesting++;
|
|
hedgedReadOpsLoopNumForTesting++;
|
|
@@ -1239,6 +1237,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
|
// chooseDataNode is a commitment. If no node, we go to
|
|
// chooseDataNode is a commitment. If no node, we go to
|
|
// the NN to reget block locations. Only go here on first read.
|
|
// the NN to reget block locations. Only go here on first read.
|
|
chosenNode = chooseDataNode(block, ignored);
|
|
chosenNode = chooseDataNode(block, ignored);
|
|
|
|
+ // Latest block, if refreshed internally
|
|
|
|
+ block = chosenNode.block;
|
|
bb = ByteBuffer.allocate(len);
|
|
bb = ByteBuffer.allocate(len);
|
|
Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
|
|
Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
|
|
chosenNode, block, start, end, bb, corruptedBlockMap,
|
|
chosenNode, block, start, end, bb, corruptedBlockMap,
|
|
@@ -1279,6 +1279,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
|
} catch (IOException ioe) {
|
|
} catch (IOException ioe) {
|
|
chosenNode = chooseDataNode(block, ignored);
|
|
chosenNode = chooseDataNode(block, ignored);
|
|
}
|
|
}
|
|
|
|
+ // Latest block, if refreshed internally
|
|
|
|
+ block = chosenNode.block;
|
|
bb = ByteBuffer.allocate(len);
|
|
bb = ByteBuffer.allocate(len);
|
|
Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
|
|
Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
|
|
chosenNode, block, start, end, bb, corruptedBlockMap,
|
|
chosenNode, block, start, end, bb, corruptedBlockMap,
|
|
@@ -1631,12 +1633,14 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
|
final DatanodeInfo info;
|
|
final DatanodeInfo info;
|
|
final InetSocketAddress addr;
|
|
final InetSocketAddress addr;
|
|
final StorageType storageType;
|
|
final StorageType storageType;
|
|
|
|
+ final LocatedBlock block;
|
|
|
|
|
|
DNAddrPair(DatanodeInfo info, InetSocketAddress addr,
|
|
DNAddrPair(DatanodeInfo info, InetSocketAddress addr,
|
|
- StorageType storageType) {
|
|
|
|
|
|
+ StorageType storageType, LocatedBlock block) {
|
|
this.info = info;
|
|
this.info = info;
|
|
this.addr = addr;
|
|
this.addr = addr;
|
|
this.storageType = storageType;
|
|
this.storageType = storageType;
|
|
|
|
+ this.block = block;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|