|
@@ -505,33 +505,36 @@ public class DFSInputStream extends FSInputStream
|
|
|
}
|
|
|
else {
|
|
|
// search cached blocks first
|
|
|
- int targetBlockIdx = locatedBlocks.findBlock(offset);
|
|
|
- if (targetBlockIdx < 0) { // block is not cached
|
|
|
- targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
|
|
|
- // fetch more blocks
|
|
|
- final LocatedBlocks newBlocks = dfsClient.getLocatedBlocks(src, offset);
|
|
|
- assert (newBlocks != null) : "Could not find target position " + offset;
|
|
|
- locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
|
|
|
- }
|
|
|
- blk = locatedBlocks.get(targetBlockIdx);
|
|
|
+ blk = fetchBlockAt(offset, 0, true);
|
|
|
}
|
|
|
return blk;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/** Fetch a block from namenode and cache it */
|
|
|
- protected void fetchBlockAt(long offset) throws IOException {
|
|
|
+ protected LocatedBlock fetchBlockAt(long offset) throws IOException {
|
|
|
+ return fetchBlockAt(offset, 0, false); // don't use cache
|
|
|
+ }
|
|
|
+
|
|
|
+ /** Fetch a block from namenode and cache it */
|
|
|
+ private LocatedBlock fetchBlockAt(long offset, long length, boolean useCache)
|
|
|
+ throws IOException {
|
|
|
synchronized(infoLock) {
|
|
|
int targetBlockIdx = locatedBlocks.findBlock(offset);
|
|
|
if (targetBlockIdx < 0) { // block is not cached
|
|
|
targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
|
|
|
+ useCache = false;
|
|
|
}
|
|
|
- // fetch blocks
|
|
|
- final LocatedBlocks newBlocks = dfsClient.getLocatedBlocks(src, offset);
|
|
|
- if (newBlocks == null) {
|
|
|
- throw new IOException("Could not find target position " + offset);
|
|
|
+ if (!useCache) { // fetch blocks
|
|
|
+ final LocatedBlocks newBlocks = (length == 0)
|
|
|
+ ? dfsClient.getLocatedBlocks(src, offset)
|
|
|
+ : dfsClient.getLocatedBlocks(src, offset, length);
|
|
|
+ if (newBlocks == null || newBlocks.locatedBlockCount() == 0) {
|
|
|
+ throw new EOFException("Could not find target position " + offset);
|
|
|
+ }
|
|
|
+ locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
|
|
|
}
|
|
|
- locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
|
|
|
+ return locatedBlocks.get(targetBlockIdx);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -586,28 +589,15 @@ public class DFSInputStream extends FSInputStream
|
|
|
assert (locatedBlocks != null) : "locatedBlocks is null";
|
|
|
List<LocatedBlock> blockRange = new ArrayList<>();
|
|
|
// search cached blocks first
|
|
|
- int blockIdx = locatedBlocks.findBlock(offset);
|
|
|
- if (blockIdx < 0) { // block is not cached
|
|
|
- blockIdx = LocatedBlocks.getInsertIndex(blockIdx);
|
|
|
- }
|
|
|
long remaining = length;
|
|
|
long curOff = offset;
|
|
|
while(remaining > 0) {
|
|
|
- LocatedBlock blk = null;
|
|
|
- if(blockIdx < locatedBlocks.locatedBlockCount())
|
|
|
- blk = locatedBlocks.get(blockIdx);
|
|
|
- if (blk == null || curOff < blk.getStartOffset()) {
|
|
|
- LocatedBlocks newBlocks;
|
|
|
- newBlocks = dfsClient.getLocatedBlocks(src, curOff, remaining);
|
|
|
- locatedBlocks.insertRange(blockIdx, newBlocks.getLocatedBlocks());
|
|
|
- continue;
|
|
|
- }
|
|
|
+ LocatedBlock blk = fetchBlockAt(curOff, remaining, true);
|
|
|
assert curOff >= blk.getStartOffset() : "Block not found";
|
|
|
blockRange.add(blk);
|
|
|
long bytesRead = blk.getStartOffset() + blk.getBlockSize() - curOff;
|
|
|
remaining -= bytesRead;
|
|
|
curOff += bytesRead;
|
|
|
- blockIdx++;
|
|
|
}
|
|
|
return blockRange;
|
|
|
}
|