|
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.util.StripedBlockUtil;
|
|
import org.apache.hadoop.io.erasurecode.ECSchema;
|
|
import org.apache.hadoop.io.erasurecode.ECSchema;
|
|
import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder;
|
|
import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder;
|
|
import org.junit.After;
|
|
import org.junit.After;
|
|
|
|
+import org.junit.Assert;
|
|
import org.junit.Before;
|
|
import org.junit.Before;
|
|
import org.junit.Test;
|
|
import org.junit.Test;
|
|
|
|
|
|
@@ -115,40 +116,55 @@ public class TestDFSStripedInputStream {
|
|
DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
|
|
DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
|
|
NUM_STRIPE_PER_BLOCK, false);
|
|
NUM_STRIPE_PER_BLOCK, false);
|
|
LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
|
|
LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
|
|
- filePath.toString(), 0, BLOCK_GROUP_SIZE);
|
|
|
|
|
|
+ filePath.toString(), 0, BLOCK_GROUP_SIZE * numBlocks);
|
|
|
|
+ int fileLen = BLOCK_GROUP_SIZE * numBlocks;
|
|
|
|
|
|
- assert lbs.get(0) instanceof LocatedStripedBlock;
|
|
|
|
- LocatedStripedBlock bg = (LocatedStripedBlock)(lbs.get(0));
|
|
|
|
- for (int i = 0; i < DATA_BLK_NUM; i++) {
|
|
|
|
- Block blk = new Block(bg.getBlock().getBlockId() + i,
|
|
|
|
- NUM_STRIPE_PER_BLOCK * CELLSIZE,
|
|
|
|
- bg.getBlock().getGenerationStamp());
|
|
|
|
- blk.setGenerationStamp(bg.getBlock().getGenerationStamp());
|
|
|
|
- cluster.injectBlocks(i, Arrays.asList(blk),
|
|
|
|
- bg.getBlock().getBlockPoolId());
|
|
|
|
- }
|
|
|
|
- DFSStripedInputStream in =
|
|
|
|
- new DFSStripedInputStream(fs.getClient(),
|
|
|
|
- filePath.toString(), false, schema);
|
|
|
|
- int readSize = BLOCK_GROUP_SIZE;
|
|
|
|
- byte[] readBuffer = new byte[readSize];
|
|
|
|
- int ret = in.read(0, readBuffer, 0, readSize);
|
|
|
|
|
|
+ byte[] expected = new byte[fileLen];
|
|
|
|
+ assertEquals(numBlocks, lbs.getLocatedBlocks().size());
|
|
|
|
+ for (int bgIdx = 0; bgIdx < numBlocks; bgIdx++) {
|
|
|
|
+ LocatedStripedBlock bg = (LocatedStripedBlock) (lbs.get(bgIdx));
|
|
|
|
+ for (int i = 0; i < DATA_BLK_NUM; i++) {
|
|
|
|
+ Block blk = new Block(bg.getBlock().getBlockId() + i,
|
|
|
|
+ NUM_STRIPE_PER_BLOCK * CELLSIZE,
|
|
|
|
+ bg.getBlock().getGenerationStamp());
|
|
|
|
+ blk.setGenerationStamp(bg.getBlock().getGenerationStamp());
|
|
|
|
+ cluster.injectBlocks(i, Arrays.asList(blk),
|
|
|
|
+ bg.getBlock().getBlockPoolId());
|
|
|
|
+ }
|
|
|
|
|
|
- byte[] expected = new byte[readSize];
|
|
|
|
- /** A variation of {@link DFSTestUtil#fillExpectedBuf} for striped blocks */
|
|
|
|
- for (int i = 0; i < NUM_STRIPE_PER_BLOCK; i++) {
|
|
|
|
- for (int j = 0; j < DATA_BLK_NUM; j++) {
|
|
|
|
- for (int k = 0; k < CELLSIZE; k++) {
|
|
|
|
- int posInBlk = i * CELLSIZE + k;
|
|
|
|
- int posInFile = i * CELLSIZE * DATA_BLK_NUM + j * CELLSIZE + k;
|
|
|
|
- expected[posInFile] = SimulatedFSDataset.simulatedByte(
|
|
|
|
- new Block(bg.getBlock().getBlockId() + j), posInBlk);
|
|
|
|
|
|
+ /** A variation of {@link DFSTestUtil#fillExpectedBuf} for striped blocks */
|
|
|
|
+ for (int i = 0; i < NUM_STRIPE_PER_BLOCK; i++) {
|
|
|
|
+ for (int j = 0; j < DATA_BLK_NUM; j++) {
|
|
|
|
+ for (int k = 0; k < CELLSIZE; k++) {
|
|
|
|
+ int posInBlk = i * CELLSIZE + k;
|
|
|
|
+ int posInFile = i * CELLSIZE * DATA_BLK_NUM + j * CELLSIZE + k;
|
|
|
|
+ expected[bgIdx*BLOCK_GROUP_SIZE + posInFile] =
|
|
|
|
+ SimulatedFSDataset.simulatedByte(
|
|
|
|
+ new Block(bg.getBlock().getBlockId() + j), posInBlk);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+ DFSStripedInputStream in = new DFSStripedInputStream(fs.getClient(),
|
|
|
|
+ filePath.toString(), false, schema);
|
|
|
|
|
|
- assertEquals(readSize, ret);
|
|
|
|
- assertArrayEquals(expected, readBuffer);
|
|
|
|
|
|
+ int[] startOffsets = {0, 1, CELLSIZE - 102, CELLSIZE, CELLSIZE + 102,
|
|
|
|
+ CELLSIZE*DATA_BLK_NUM, CELLSIZE*DATA_BLK_NUM + 102,
|
|
|
|
+ BLOCK_GROUP_SIZE - 102, BLOCK_GROUP_SIZE, BLOCK_GROUP_SIZE + 102,
|
|
|
|
+ fileLen - 1};
|
|
|
|
+ for (int startOffset : startOffsets) {
|
|
|
|
+ startOffset = Math.max(0, Math.min(startOffset, fileLen - 1));
|
|
|
|
+ int remaining = fileLen - startOffset;
|
|
|
|
+ byte[] buf = new byte[fileLen];
|
|
|
|
+ int ret = in.read(startOffset, buf, 0, fileLen);
|
|
|
|
+ assertEquals(remaining, ret);
|
|
|
|
+ for (int i = 0; i < remaining; i++) {
|
|
|
|
+ Assert.assertEquals("Byte at " + (startOffset + i) + " should be the " +
|
|
|
|
+ "same",
|
|
|
|
+ expected[startOffset + i], buf[i]);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ in.close();
|
|
}
|
|
}
|
|
|
|
|
|
@Test
|
|
@Test
|