|
@@ -983,12 +983,15 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
|
|
return new Callable<ByteBuffer>() {
|
|
|
@Override
|
|
|
public ByteBuffer call() throws Exception {
|
|
|
- byte[] buf = bb.array();
|
|
|
- int offset = bb.position();
|
|
|
- actualGetFromOneDataNode(datanode, block, start, end, buf, offset,
|
|
|
- corruptedBlockMap);
|
|
|
- latch.countDown();
|
|
|
- return bb;
|
|
|
+ try {
|
|
|
+ byte[] buf = bb.array();
|
|
|
+ int offset = bb.position();
|
|
|
+ actualGetFromOneDataNode(datanode, block, start, end, buf, offset,
|
|
|
+ corruptedBlockMap);
|
|
|
+ return bb;
|
|
|
+ } finally {
|
|
|
+ latch.countDown();
|
|
|
+ }
|
|
|
}
|
|
|
};
|
|
|
}
|
|
@@ -1101,7 +1104,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
|
|
long end, byte[] buf, int offset,
|
|
|
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
|
|
|
throws IOException {
|
|
|
- ArrayList<Future<ByteBuffer>> futures = null;
|
|
|
+ ArrayList<Future<ByteBuffer>> futures = new ArrayList<Future<ByteBuffer>>();
|
|
|
ArrayList<DatanodeInfo> ignored = new ArrayList<DatanodeInfo>();
|
|
|
ByteBuffer bb = null;
|
|
|
int len = (int) (end - start + 1);
|
|
@@ -1112,7 +1115,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
|
|
DNAddrPair chosenNode = null;
|
|
|
Future<ByteBuffer> future = null;
|
|
|
// futures is null if there is no request already executing.
|
|
|
- if (futures == null) {
|
|
|
+ if (futures.isEmpty()) {
|
|
|
// chooseDataNode is a commitment. If no node, we go to
|
|
|
// the NN to reget block locations. Only go here on first read.
|
|
|
chosenNode = chooseDataNode(block, ignored);
|
|
@@ -1130,7 +1133,6 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
|
|
// Ignore this node on next go around.
|
|
|
ignored.add(chosenNode.info);
|
|
|
dfsClient.getHedgedReadMetrics().incHedgedReadOps();
|
|
|
- futures = new ArrayList<Future<ByteBuffer>>();
|
|
|
futures.add(future);
|
|
|
continue; // no need to refresh block locations
|
|
|
} catch (InterruptedException e) {
|