|
@@ -130,12 +130,12 @@ public class DFSStripedInputStream extends DFSInputStream {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- private final short groupSize = HdfsConstants.NUM_DATA_BLOCKS;
|
|
|
|
- private final BlockReader[] blockReaders = new BlockReader[groupSize];
|
|
|
|
- private final DatanodeInfo[] currentNodes = new DatanodeInfo[groupSize];
|
|
|
|
|
|
+ private final BlockReader[] blockReaders;
|
|
|
|
+ private final DatanodeInfo[] currentNodes;
|
|
private final int cellSize;
|
|
private final int cellSize;
|
|
private final short dataBlkNum;
|
|
private final short dataBlkNum;
|
|
private final short parityBlkNum;
|
|
private final short parityBlkNum;
|
|
|
|
+ private final short groupSize;
|
|
/** the buffer for a complete stripe */
|
|
/** the buffer for a complete stripe */
|
|
private ByteBuffer curStripeBuf;
|
|
private ByteBuffer curStripeBuf;
|
|
private final ECSchema schema;
|
|
private final ECSchema schema;
|
|
@@ -155,6 +155,9 @@ public class DFSStripedInputStream extends DFSInputStream {
|
|
cellSize = schema.getChunkSize();
|
|
cellSize = schema.getChunkSize();
|
|
dataBlkNum = (short) schema.getNumDataUnits();
|
|
dataBlkNum = (short) schema.getNumDataUnits();
|
|
parityBlkNum = (short) schema.getNumParityUnits();
|
|
parityBlkNum = (short) schema.getNumParityUnits();
|
|
|
|
+ groupSize = dataBlkNum;
|
|
|
|
+ blockReaders = new BlockReader[groupSize];
|
|
|
|
+ currentNodes = new DatanodeInfo[groupSize];
|
|
curStripeRange = new StripeRange(0, 0);
|
|
curStripeRange = new StripeRange(0, 0);
|
|
readingService =
|
|
readingService =
|
|
new ExecutorCompletionService<>(dfsClient.getStripedReadsThreadPool());
|
|
new ExecutorCompletionService<>(dfsClient.getStripedReadsThreadPool());
|
|
@@ -391,6 +394,12 @@ public class DFSStripedInputStream extends DFSInputStream {
|
|
return (int) (offsetInBlockGroup % stripeLen);
|
|
return (int) (offsetInBlockGroup % stripeLen);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ @Override
|
|
|
|
+ public synchronized boolean seekToNewSource(long targetPos)
|
|
|
|
+ throws IOException {
|
|
|
|
+ return false;
|
|
|
|
+ }
|
|
|
|
+
|
|
@Override
|
|
@Override
|
|
protected synchronized int readWithStrategy(ReaderStrategy strategy,
|
|
protected synchronized int readWithStrategy(ReaderStrategy strategy,
|
|
int off, int len) throws IOException {
|
|
int off, int len) throws IOException {
|