|
@@ -1279,7 +1279,7 @@ public class DFSInputStream extends FSInputStream
|
|
|
// chooseDataNode is a commitment. If no node, we go to
|
|
|
// the NN to reget block locations. Only go here on first read.
|
|
|
chosenNode = chooseDataNode(block, ignored);
|
|
|
- bb = ByteBuffer.wrap(buf, offset, len);
|
|
|
+ bb = ByteBuffer.allocate(len);
|
|
|
Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
|
|
|
chosenNode, block, start, end, bb,
|
|
|
corruptedBlocks, hedgedReadId++);
|
|
@@ -1290,7 +1290,9 @@ public class DFSInputStream extends FSInputStream
|
|
|
Future<ByteBuffer> future = hedgedService.poll(
|
|
|
conf.getHedgedReadThresholdMillis(), TimeUnit.MILLISECONDS);
|
|
|
if (future != null) {
|
|
|
- future.get();
|
|
|
+ ByteBuffer result = future.get();
|
|
|
+ System.arraycopy(result.array(), result.position(), buf, offset,
|
|
|
+ len);
|
|
|
return;
|
|
|
}
|
|
|
DFSClient.LOG.debug("Waited {}ms to read from {}; spawning hedged "
|
|
@@ -1328,13 +1330,9 @@ public class DFSInputStream extends FSInputStream
|
|
|
ByteBuffer result = getFirstToComplete(hedgedService, futures);
|
|
|
// cancel the rest.
|
|
|
cancelAll(futures);
|
|
|
- if (result.array() != buf) { // compare the array pointers
|
|
|
- dfsClient.getHedgedReadMetrics().incHedgedReadWins();
|
|
|
- System.arraycopy(result.array(), result.position(), buf, offset,
|
|
|
- len);
|
|
|
- } else {
|
|
|
- dfsClient.getHedgedReadMetrics().incHedgedReadOps();
|
|
|
- }
|
|
|
+ dfsClient.getHedgedReadMetrics().incHedgedReadWins();
|
|
|
+ System.arraycopy(result.array(), result.position(), buf, offset,
|
|
|
+ len);
|
|
|
return;
|
|
|
} catch (InterruptedException ie) {
|
|
|
// Ignore and retry
|