|
@@ -113,7 +113,8 @@ public class TestEnhancedByteBufferAccess {
|
|
|
return resultArray;
|
|
|
}
|
|
|
|
|
|
- private static final int BLOCK_SIZE = 4096;
|
|
|
+ private static final int BLOCK_SIZE =
|
|
|
+ (int) NativeIO.POSIX.getCacheManipulator().getOperatingSystemPageSize();
|
|
|
|
|
|
public static HdfsConfiguration initZeroCopyTest() {
|
|
|
Assume.assumeTrue(NativeIO.isAvailable());
|
|
@@ -140,7 +141,7 @@ public class TestEnhancedByteBufferAccess {
|
|
|
MiniDFSCluster cluster = null;
|
|
|
final Path TEST_PATH = new Path("/a");
|
|
|
FSDataInputStream fsIn = null;
|
|
|
- final int TEST_FILE_LENGTH = 12345;
|
|
|
+ final int TEST_FILE_LENGTH = 3 * BLOCK_SIZE;
|
|
|
|
|
|
FileSystem fs = null;
|
|
|
try {
|
|
@@ -163,15 +164,15 @@ public class TestEnhancedByteBufferAccess {
|
|
|
IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH);
|
|
|
fsIn.close();
|
|
|
fsIn = fs.open(TEST_PATH);
|
|
|
- ByteBuffer result = fsIn.read(null, 4096,
|
|
|
+ ByteBuffer result = fsIn.read(null, BLOCK_SIZE,
|
|
|
EnumSet.of(ReadOption.SKIP_CHECKSUMS));
|
|
|
- Assert.assertEquals(4096, result.remaining());
|
|
|
+ Assert.assertEquals(BLOCK_SIZE, result.remaining());
|
|
|
HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn;
|
|
|
- Assert.assertEquals(4096,
|
|
|
+ Assert.assertEquals(BLOCK_SIZE,
|
|
|
dfsIn.getReadStatistics().getTotalBytesRead());
|
|
|
- Assert.assertEquals(4096,
|
|
|
+ Assert.assertEquals(BLOCK_SIZE,
|
|
|
dfsIn.getReadStatistics().getTotalZeroCopyBytesRead());
|
|
|
- Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, 4096),
|
|
|
+ Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, BLOCK_SIZE),
|
|
|
byteBufferToArray(result));
|
|
|
fsIn.releaseBuffer(result);
|
|
|
} finally {
|
|
@@ -187,7 +188,7 @@ public class TestEnhancedByteBufferAccess {
|
|
|
MiniDFSCluster cluster = null;
|
|
|
final Path TEST_PATH = new Path("/a");
|
|
|
FSDataInputStream fsIn = null;
|
|
|
- final int TEST_FILE_LENGTH = 12345;
|
|
|
+ final int TEST_FILE_LENGTH = 3 * BLOCK_SIZE;
|
|
|
|
|
|
FileSystem fs = null;
|
|
|
try {
|
|
@@ -210,24 +211,24 @@ public class TestEnhancedByteBufferAccess {
|
|
|
fsIn.close();
|
|
|
fsIn = fs.open(TEST_PATH);
|
|
|
|
|
|
- // Try to read 8192, but only get 4096 because of the block size.
|
|
|
+ // Try to read (2 * ${BLOCK_SIZE}), but only get ${BLOCK_SIZE} because of the block size.
|
|
|
HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn;
|
|
|
ByteBuffer result =
|
|
|
- dfsIn.read(null, 8192, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
|
|
|
- Assert.assertEquals(4096, result.remaining());
|
|
|
- Assert.assertEquals(4096,
|
|
|
+ dfsIn.read(null, 2 * BLOCK_SIZE, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
|
|
|
+ Assert.assertEquals(BLOCK_SIZE, result.remaining());
|
|
|
+ Assert.assertEquals(BLOCK_SIZE,
|
|
|
dfsIn.getReadStatistics().getTotalBytesRead());
|
|
|
- Assert.assertEquals(4096,
|
|
|
+ Assert.assertEquals(BLOCK_SIZE,
|
|
|
dfsIn.getReadStatistics().getTotalZeroCopyBytesRead());
|
|
|
- Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, 4096),
|
|
|
+ Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, BLOCK_SIZE),
|
|
|
byteBufferToArray(result));
|
|
|
dfsIn.releaseBuffer(result);
|
|
|
|
|
|
- // Try to read 4097, but only get 4096 because of the block size.
|
|
|
+ // Try to read (1 + ${BLOCK_SIZE}), but only get ${BLOCK_SIZE} because of the block size.
|
|
|
result =
|
|
|
- dfsIn.read(null, 4097, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
|
|
|
- Assert.assertEquals(4096, result.remaining());
|
|
|
- Assert.assertArrayEquals(Arrays.copyOfRange(original, 4096, 8192),
|
|
|
+ dfsIn.read(null, 1 + BLOCK_SIZE, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
|
|
|
+ Assert.assertEquals(BLOCK_SIZE, result.remaining());
|
|
|
+ Assert.assertArrayEquals(Arrays.copyOfRange(original, BLOCK_SIZE, 2 * BLOCK_SIZE),
|
|
|
byteBufferToArray(result));
|
|
|
dfsIn.releaseBuffer(result);
|
|
|
} finally {
|
|
@@ -243,7 +244,7 @@ public class TestEnhancedByteBufferAccess {
|
|
|
MiniDFSCluster cluster = null;
|
|
|
final Path TEST_PATH = new Path("/a");
|
|
|
FSDataInputStream fsIn = null;
|
|
|
- final int TEST_FILE_LENGTH = 12345;
|
|
|
+ final int TEST_FILE_LENGTH = 3 * BLOCK_SIZE;
|
|
|
|
|
|
FileSystem fs = null;
|
|
|
try {
|
|
@@ -269,18 +270,18 @@ public class TestEnhancedByteBufferAccess {
|
|
|
HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn;
|
|
|
ByteBuffer result;
|
|
|
try {
|
|
|
- result = dfsIn.read(null, 4097, EnumSet.noneOf(ReadOption.class));
|
|
|
+ result = dfsIn.read(null, BLOCK_SIZE + 1, EnumSet.noneOf(ReadOption.class));
|
|
|
Assert.fail("expected UnsupportedOperationException");
|
|
|
} catch (UnsupportedOperationException e) {
|
|
|
// expected
|
|
|
}
|
|
|
- result = dfsIn.read(null, 4096, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
|
|
|
- Assert.assertEquals(4096, result.remaining());
|
|
|
- Assert.assertEquals(4096,
|
|
|
+ result = dfsIn.read(null, BLOCK_SIZE, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
|
|
|
+ Assert.assertEquals(BLOCK_SIZE, result.remaining());
|
|
|
+ Assert.assertEquals(BLOCK_SIZE,
|
|
|
dfsIn.getReadStatistics().getTotalBytesRead());
|
|
|
- Assert.assertEquals(4096,
|
|
|
+ Assert.assertEquals(BLOCK_SIZE,
|
|
|
dfsIn.getReadStatistics().getTotalZeroCopyBytesRead());
|
|
|
- Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, 4096),
|
|
|
+ Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, BLOCK_SIZE),
|
|
|
byteBufferToArray(result));
|
|
|
} finally {
|
|
|
if (fsIn != null) fsIn.close();
|
|
@@ -330,7 +331,7 @@ public class TestEnhancedByteBufferAccess {
|
|
|
HdfsConfiguration conf = initZeroCopyTest();
|
|
|
MiniDFSCluster cluster = null;
|
|
|
final Path TEST_PATH = new Path("/a");
|
|
|
- final int TEST_FILE_LENGTH = 16385;
|
|
|
+ final int TEST_FILE_LENGTH = 5 * BLOCK_SIZE;
|
|
|
final int RANDOM_SEED = 23453;
|
|
|
final String CONTEXT = "testZeroCopyMmapCacheContext";
|
|
|
FSDataInputStream fsIn = null;
|
|
@@ -360,10 +361,10 @@ public class TestEnhancedByteBufferAccess {
|
|
|
final ShortCircuitCache cache = ClientContext.get(
|
|
|
CONTEXT, new DFSClient.Conf(conf)). getShortCircuitCache();
|
|
|
cache.accept(new CountingVisitor(0, 5, 5, 0));
|
|
|
- results[0] = fsIn.read(null, 4096,
|
|
|
+ results[0] = fsIn.read(null, BLOCK_SIZE,
|
|
|
EnumSet.of(ReadOption.SKIP_CHECKSUMS));
|
|
|
fsIn.seek(0);
|
|
|
- results[1] = fsIn.read(null, 4096,
|
|
|
+ results[1] = fsIn.read(null, BLOCK_SIZE,
|
|
|
EnumSet.of(ReadOption.SKIP_CHECKSUMS));
|
|
|
|
|
|
// The mmap should be of the first block of the file.
|
|
@@ -386,9 +387,9 @@ public class TestEnhancedByteBufferAccess {
|
|
|
});
|
|
|
|
|
|
// Read more blocks.
|
|
|
- results[2] = fsIn.read(null, 4096,
|
|
|
+ results[2] = fsIn.read(null, BLOCK_SIZE,
|
|
|
EnumSet.of(ReadOption.SKIP_CHECKSUMS));
|
|
|
- results[3] = fsIn.read(null, 4096,
|
|
|
+ results[3] = fsIn.read(null, BLOCK_SIZE,
|
|
|
EnumSet.of(ReadOption.SKIP_CHECKSUMS));
|
|
|
|
|
|
// we should have 3 mmaps, 1 evictable
|
|
@@ -592,7 +593,7 @@ public class TestEnhancedByteBufferAccess {
|
|
|
BlockReaderTestUtil.enableBlockReaderFactoryTracing();
|
|
|
BlockReaderTestUtil.enableHdfsCachingTracing();
|
|
|
|
|
|
- final int TEST_FILE_LENGTH = 16385;
|
|
|
+ final int TEST_FILE_LENGTH = BLOCK_SIZE;
|
|
|
final Path TEST_PATH = new Path("/a");
|
|
|
final int RANDOM_SEED = 23453;
|
|
|
HdfsConfiguration conf = initZeroCopyTest();
|
|
@@ -601,7 +602,8 @@ public class TestEnhancedByteBufferAccess {
|
|
|
final String CONTEXT = "testZeroCopyReadOfCachedData";
|
|
|
conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, CONTEXT);
|
|
|
conf.setLong(DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
|
|
|
- DFSTestUtil.roundUpToMultiple(TEST_FILE_LENGTH, 4096));
|
|
|
+ DFSTestUtil.roundUpToMultiple(TEST_FILE_LENGTH,
|
|
|
+ (int) NativeIO.POSIX.getCacheManipulator().getOperatingSystemPageSize()));
|
|
|
MiniDFSCluster cluster = null;
|
|
|
ByteBuffer result = null, result2 = null;
|
|
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|