|
@@ -21,6 +21,13 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS;
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS;
|
|
|
|
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
|
|
|
|
+import static org.junit.jupiter.api.Assertions.assertEquals;
|
|
|
|
+import static org.junit.jupiter.api.Assertions.assertNotNull;
|
|
|
|
+import static org.junit.jupiter.api.Assertions.assertNull;
|
|
|
|
+import static org.junit.jupiter.api.Assertions.assertTrue;
|
|
|
|
+import static org.junit.jupiter.api.Assertions.fail;
|
|
|
|
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
|
|
|
|
|
|
import java.io.File;
|
|
import java.io.File;
|
|
import java.io.FileInputStream;
|
|
import java.io.FileInputStream;
|
|
@@ -65,11 +72,10 @@ import org.apache.hadoop.net.unix.DomainSocket;
|
|
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
|
|
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
|
|
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
|
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
|
import org.apache.hadoop.test.GenericTestUtils;
|
|
import org.apache.hadoop.test.GenericTestUtils;
|
|
-import org.junit.AfterClass;
|
|
|
|
-import org.junit.Assert;
|
|
|
|
-import org.junit.Assume;
|
|
|
|
-import org.junit.BeforeClass;
|
|
|
|
-import org.junit.Test;
|
|
|
|
|
|
+import org.junit.jupiter.api.AfterAll;
|
|
|
|
+import org.junit.jupiter.api.BeforeAll;
|
|
|
|
+import org.junit.jupiter.api.Test;
|
|
|
|
+import org.junit.jupiter.api.Timeout;
|
|
|
|
|
|
import org.apache.hadoop.util.Preconditions;
|
|
import org.apache.hadoop.util.Preconditions;
|
|
import java.util.function.Supplier;
|
|
import java.util.function.Supplier;
|
|
@@ -85,7 +91,7 @@ public class TestEnhancedByteBufferAccess {
|
|
|
|
|
|
static private CacheManipulator prevCacheManipulator;
|
|
static private CacheManipulator prevCacheManipulator;
|
|
|
|
|
|
- @BeforeClass
|
|
|
|
|
|
+ @BeforeAll
|
|
public static void init() {
|
|
public static void init() {
|
|
sockDir = new TemporarySocketDirectory();
|
|
sockDir = new TemporarySocketDirectory();
|
|
DomainSocket.disableBindPathValidation();
|
|
DomainSocket.disableBindPathValidation();
|
|
@@ -99,7 +105,7 @@ public class TestEnhancedByteBufferAccess {
|
|
});
|
|
});
|
|
}
|
|
}
|
|
|
|
|
|
- @AfterClass
|
|
|
|
|
|
+ @AfterAll
|
|
public static void teardown() {
|
|
public static void teardown() {
|
|
// Restore the original CacheManipulator
|
|
// Restore the original CacheManipulator
|
|
NativeIO.POSIX.setCacheManipulator(prevCacheManipulator);
|
|
NativeIO.POSIX.setCacheManipulator(prevCacheManipulator);
|
|
@@ -116,8 +122,8 @@ public class TestEnhancedByteBufferAccess {
|
|
(int) NativeIO.POSIX.getCacheManipulator().getOperatingSystemPageSize();
|
|
(int) NativeIO.POSIX.getCacheManipulator().getOperatingSystemPageSize();
|
|
|
|
|
|
public static HdfsConfiguration initZeroCopyTest() {
|
|
public static HdfsConfiguration initZeroCopyTest() {
|
|
- Assume.assumeTrue(NativeIO.isAvailable());
|
|
|
|
- Assume.assumeTrue(SystemUtils.IS_OS_UNIX);
|
|
|
|
|
|
+ assumeTrue(NativeIO.isAvailable());
|
|
|
|
+ assumeTrue(SystemUtils.IS_OS_UNIX);
|
|
HdfsConfiguration conf = new HdfsConfiguration();
|
|
HdfsConfiguration conf = new HdfsConfiguration();
|
|
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
|
|
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
|
|
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
|
|
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
|
|
@@ -152,10 +158,10 @@ public class TestEnhancedByteBufferAccess {
|
|
try {
|
|
try {
|
|
DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
|
|
DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
|
|
} catch (InterruptedException e) {
|
|
} catch (InterruptedException e) {
|
|
- Assert.fail("unexpected InterruptedException during " +
|
|
|
|
|
|
+ fail("unexpected InterruptedException during " +
|
|
"waitReplication: " + e);
|
|
"waitReplication: " + e);
|
|
} catch (TimeoutException e) {
|
|
} catch (TimeoutException e) {
|
|
- Assert.fail("unexpected TimeoutException during " +
|
|
|
|
|
|
+ fail("unexpected TimeoutException during " +
|
|
"waitReplication: " + e);
|
|
"waitReplication: " + e);
|
|
}
|
|
}
|
|
fsIn = fs.open(TEST_PATH);
|
|
fsIn = fs.open(TEST_PATH);
|
|
@@ -165,13 +171,13 @@ public class TestEnhancedByteBufferAccess {
|
|
fsIn = fs.open(TEST_PATH);
|
|
fsIn = fs.open(TEST_PATH);
|
|
ByteBuffer result = fsIn.read(null, BLOCK_SIZE,
|
|
ByteBuffer result = fsIn.read(null, BLOCK_SIZE,
|
|
EnumSet.of(ReadOption.SKIP_CHECKSUMS));
|
|
EnumSet.of(ReadOption.SKIP_CHECKSUMS));
|
|
- Assert.assertEquals(BLOCK_SIZE, result.remaining());
|
|
|
|
|
|
+ assertEquals(BLOCK_SIZE, result.remaining());
|
|
HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn;
|
|
HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn;
|
|
- Assert.assertEquals(BLOCK_SIZE,
|
|
|
|
|
|
+ assertEquals(BLOCK_SIZE,
|
|
dfsIn.getReadStatistics().getTotalBytesRead());
|
|
dfsIn.getReadStatistics().getTotalBytesRead());
|
|
- Assert.assertEquals(BLOCK_SIZE,
|
|
|
|
|
|
+ assertEquals(BLOCK_SIZE,
|
|
dfsIn.getReadStatistics().getTotalZeroCopyBytesRead());
|
|
dfsIn.getReadStatistics().getTotalZeroCopyBytesRead());
|
|
- Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, BLOCK_SIZE),
|
|
|
|
|
|
+ assertArrayEquals(Arrays.copyOfRange(original, 0, BLOCK_SIZE),
|
|
byteBufferToArray(result));
|
|
byteBufferToArray(result));
|
|
fsIn.releaseBuffer(result);
|
|
fsIn.releaseBuffer(result);
|
|
} finally {
|
|
} finally {
|
|
@@ -198,10 +204,10 @@ public class TestEnhancedByteBufferAccess {
|
|
try {
|
|
try {
|
|
DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
|
|
DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
|
|
} catch (InterruptedException e) {
|
|
} catch (InterruptedException e) {
|
|
- Assert.fail("unexpected InterruptedException during " +
|
|
|
|
|
|
+ fail("unexpected InterruptedException during " +
|
|
"waitReplication: " + e);
|
|
"waitReplication: " + e);
|
|
} catch (TimeoutException e) {
|
|
} catch (TimeoutException e) {
|
|
- Assert.fail("unexpected TimeoutException during " +
|
|
|
|
|
|
+ fail("unexpected TimeoutException during " +
|
|
"waitReplication: " + e);
|
|
"waitReplication: " + e);
|
|
}
|
|
}
|
|
fsIn = fs.open(TEST_PATH);
|
|
fsIn = fs.open(TEST_PATH);
|
|
@@ -214,20 +220,20 @@ public class TestEnhancedByteBufferAccess {
|
|
HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn;
|
|
HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn;
|
|
ByteBuffer result =
|
|
ByteBuffer result =
|
|
dfsIn.read(null, 2 * BLOCK_SIZE, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
|
|
dfsIn.read(null, 2 * BLOCK_SIZE, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
|
|
- Assert.assertEquals(BLOCK_SIZE, result.remaining());
|
|
|
|
- Assert.assertEquals(BLOCK_SIZE,
|
|
|
|
|
|
+ assertEquals(BLOCK_SIZE, result.remaining());
|
|
|
|
+ assertEquals(BLOCK_SIZE,
|
|
dfsIn.getReadStatistics().getTotalBytesRead());
|
|
dfsIn.getReadStatistics().getTotalBytesRead());
|
|
- Assert.assertEquals(BLOCK_SIZE,
|
|
|
|
|
|
+ assertEquals(BLOCK_SIZE,
|
|
dfsIn.getReadStatistics().getTotalZeroCopyBytesRead());
|
|
dfsIn.getReadStatistics().getTotalZeroCopyBytesRead());
|
|
- Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, BLOCK_SIZE),
|
|
|
|
|
|
+ assertArrayEquals(Arrays.copyOfRange(original, 0, BLOCK_SIZE),
|
|
byteBufferToArray(result));
|
|
byteBufferToArray(result));
|
|
dfsIn.releaseBuffer(result);
|
|
dfsIn.releaseBuffer(result);
|
|
|
|
|
|
// Try to read (1 + ${BLOCK_SIZE}), but only get ${BLOCK_SIZE} because of the block size.
|
|
// Try to read (1 + ${BLOCK_SIZE}), but only get ${BLOCK_SIZE} because of the block size.
|
|
result =
|
|
result =
|
|
dfsIn.read(null, 1 + BLOCK_SIZE, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
|
|
dfsIn.read(null, 1 + BLOCK_SIZE, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
|
|
- Assert.assertEquals(BLOCK_SIZE, result.remaining());
|
|
|
|
- Assert.assertArrayEquals(Arrays.copyOfRange(original, BLOCK_SIZE, 2 * BLOCK_SIZE),
|
|
|
|
|
|
+ assertEquals(BLOCK_SIZE, result.remaining());
|
|
|
|
+ assertArrayEquals(Arrays.copyOfRange(original, BLOCK_SIZE, 2 * BLOCK_SIZE),
|
|
byteBufferToArray(result));
|
|
byteBufferToArray(result));
|
|
dfsIn.releaseBuffer(result);
|
|
dfsIn.releaseBuffer(result);
|
|
} finally {
|
|
} finally {
|
|
@@ -255,10 +261,10 @@ public class TestEnhancedByteBufferAccess {
|
|
try {
|
|
try {
|
|
DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
|
|
DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
|
|
} catch (InterruptedException e) {
|
|
} catch (InterruptedException e) {
|
|
- Assert.fail("unexpected InterruptedException during " +
|
|
|
|
|
|
+ fail("unexpected InterruptedException during " +
|
|
"waitReplication: " + e);
|
|
"waitReplication: " + e);
|
|
} catch (TimeoutException e) {
|
|
} catch (TimeoutException e) {
|
|
- Assert.fail("unexpected TimeoutException during " +
|
|
|
|
|
|
+ fail("unexpected TimeoutException during " +
|
|
"waitReplication: " + e);
|
|
"waitReplication: " + e);
|
|
}
|
|
}
|
|
fsIn = fs.open(TEST_PATH);
|
|
fsIn = fs.open(TEST_PATH);
|
|
@@ -270,17 +276,17 @@ public class TestEnhancedByteBufferAccess {
|
|
ByteBuffer result;
|
|
ByteBuffer result;
|
|
try {
|
|
try {
|
|
result = dfsIn.read(null, BLOCK_SIZE + 1, EnumSet.noneOf(ReadOption.class));
|
|
result = dfsIn.read(null, BLOCK_SIZE + 1, EnumSet.noneOf(ReadOption.class));
|
|
- Assert.fail("expected UnsupportedOperationException");
|
|
|
|
|
|
+ fail("expected UnsupportedOperationException");
|
|
} catch (UnsupportedOperationException e) {
|
|
} catch (UnsupportedOperationException e) {
|
|
// expected
|
|
// expected
|
|
}
|
|
}
|
|
result = dfsIn.read(null, BLOCK_SIZE, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
|
|
result = dfsIn.read(null, BLOCK_SIZE, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
|
|
- Assert.assertEquals(BLOCK_SIZE, result.remaining());
|
|
|
|
- Assert.assertEquals(BLOCK_SIZE,
|
|
|
|
|
|
+ assertEquals(BLOCK_SIZE, result.remaining());
|
|
|
|
+ assertEquals(BLOCK_SIZE,
|
|
dfsIn.getReadStatistics().getTotalBytesRead());
|
|
dfsIn.getReadStatistics().getTotalBytesRead());
|
|
- Assert.assertEquals(BLOCK_SIZE,
|
|
|
|
|
|
+ assertEquals(BLOCK_SIZE,
|
|
dfsIn.getReadStatistics().getTotalZeroCopyBytesRead());
|
|
dfsIn.getReadStatistics().getTotalZeroCopyBytesRead());
|
|
- Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, BLOCK_SIZE),
|
|
|
|
|
|
+ assertArrayEquals(Arrays.copyOfRange(original, 0, BLOCK_SIZE),
|
|
byteBufferToArray(result));
|
|
byteBufferToArray(result));
|
|
} finally {
|
|
} finally {
|
|
if (fsIn != null) fsIn.close();
|
|
if (fsIn != null) fsIn.close();
|
|
@@ -311,16 +317,16 @@ public class TestEnhancedByteBufferAccess {
|
|
LinkedMap evictable,
|
|
LinkedMap evictable,
|
|
LinkedMap evictableMmapped) {
|
|
LinkedMap evictableMmapped) {
|
|
if (expectedNumOutstandingMmaps >= 0) {
|
|
if (expectedNumOutstandingMmaps >= 0) {
|
|
- Assert.assertEquals(expectedNumOutstandingMmaps, numOutstandingMmaps);
|
|
|
|
|
|
+ assertEquals(expectedNumOutstandingMmaps, numOutstandingMmaps);
|
|
}
|
|
}
|
|
if (expectedNumReplicas >= 0) {
|
|
if (expectedNumReplicas >= 0) {
|
|
- Assert.assertEquals(expectedNumReplicas, replicas.size());
|
|
|
|
|
|
+ assertEquals(expectedNumReplicas, replicas.size());
|
|
}
|
|
}
|
|
if (expectedNumEvictable >= 0) {
|
|
if (expectedNumEvictable >= 0) {
|
|
- Assert.assertEquals(expectedNumEvictable, evictable.size());
|
|
|
|
|
|
+ assertEquals(expectedNumEvictable, evictable.size());
|
|
}
|
|
}
|
|
if (expectedNumMmapedEvictable >= 0) {
|
|
if (expectedNumMmapedEvictable >= 0) {
|
|
- Assert.assertEquals(expectedNumMmapedEvictable, evictableMmapped.size());
|
|
|
|
|
|
+ assertEquals(expectedNumMmapedEvictable, evictableMmapped.size());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -346,10 +352,10 @@ public class TestEnhancedByteBufferAccess {
|
|
try {
|
|
try {
|
|
DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
|
|
DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
|
|
} catch (InterruptedException e) {
|
|
} catch (InterruptedException e) {
|
|
- Assert.fail("unexpected InterruptedException during " +
|
|
|
|
|
|
+ fail("unexpected InterruptedException during " +
|
|
"waitReplication: " + e);
|
|
"waitReplication: " + e);
|
|
} catch (TimeoutException e) {
|
|
} catch (TimeoutException e) {
|
|
- Assert.fail("unexpected TimeoutException during " +
|
|
|
|
|
|
+ fail("unexpected TimeoutException during " +
|
|
"waitReplication: " + e);
|
|
"waitReplication: " + e);
|
|
}
|
|
}
|
|
fsIn = fs.open(TEST_PATH);
|
|
fsIn = fs.open(TEST_PATH);
|
|
@@ -378,10 +384,10 @@ public class TestEnhancedByteBufferAccess {
|
|
LinkedMap evictableMmapped) {
|
|
LinkedMap evictableMmapped) {
|
|
ShortCircuitReplica replica = replicas.get(
|
|
ShortCircuitReplica replica = replicas.get(
|
|
new ExtendedBlockId(firstBlock.getBlockId(), firstBlock.getBlockPoolId()));
|
|
new ExtendedBlockId(firstBlock.getBlockId(), firstBlock.getBlockPoolId()));
|
|
- Assert.assertNotNull(replica);
|
|
|
|
- Assert.assertTrue(replica.hasMmap());
|
|
|
|
|
|
+ assertNotNull(replica);
|
|
|
|
+ assertTrue(replica.hasMmap());
|
|
// The replica should not yet be evictable, since we have it open.
|
|
// The replica should not yet be evictable, since we have it open.
|
|
- Assert.assertNull(replica.getEvictableTimeNs());
|
|
|
|
|
|
+ assertNull(replica.getEvictableTimeNs());
|
|
}
|
|
}
|
|
});
|
|
});
|
|
|
|
|
|
@@ -449,10 +455,10 @@ public class TestEnhancedByteBufferAccess {
|
|
try {
|
|
try {
|
|
DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
|
|
DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
|
|
} catch (InterruptedException e) {
|
|
} catch (InterruptedException e) {
|
|
- Assert.fail("unexpected InterruptedException during " +
|
|
|
|
|
|
+ fail("unexpected InterruptedException during " +
|
|
"waitReplication: " + e);
|
|
"waitReplication: " + e);
|
|
} catch (TimeoutException e) {
|
|
} catch (TimeoutException e) {
|
|
- Assert.fail("unexpected TimeoutException during " +
|
|
|
|
|
|
+ fail("unexpected TimeoutException during " +
|
|
"waitReplication: " + e);
|
|
"waitReplication: " + e);
|
|
}
|
|
}
|
|
fsIn = fs.open(TEST_PATH);
|
|
fsIn = fs.open(TEST_PATH);
|
|
@@ -493,22 +499,22 @@ public class TestEnhancedByteBufferAccess {
|
|
stream instanceof ByteBufferReadable);
|
|
stream instanceof ByteBufferReadable);
|
|
|
|
|
|
ByteBuffer result = ByteBufferUtil.fallbackRead(stream, bufferPool, 10);
|
|
ByteBuffer result = ByteBufferUtil.fallbackRead(stream, bufferPool, 10);
|
|
- Assert.assertEquals(10, result.remaining());
|
|
|
|
- Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, 10),
|
|
|
|
|
|
+ assertEquals(10, result.remaining());
|
|
|
|
+ assertArrayEquals(Arrays.copyOfRange(original, 0, 10),
|
|
byteBufferToArray(result));
|
|
byteBufferToArray(result));
|
|
|
|
|
|
result = ByteBufferUtil.fallbackRead(stream, bufferPool, 5000);
|
|
result = ByteBufferUtil.fallbackRead(stream, bufferPool, 5000);
|
|
- Assert.assertEquals(5000, result.remaining());
|
|
|
|
- Assert.assertArrayEquals(Arrays.copyOfRange(original, 10, 5010),
|
|
|
|
|
|
+ assertEquals(5000, result.remaining());
|
|
|
|
+ assertArrayEquals(Arrays.copyOfRange(original, 10, 5010),
|
|
byteBufferToArray(result));
|
|
byteBufferToArray(result));
|
|
|
|
|
|
result = ByteBufferUtil.fallbackRead(stream, bufferPool, 9999999);
|
|
result = ByteBufferUtil.fallbackRead(stream, bufferPool, 9999999);
|
|
- Assert.assertEquals(11375, result.remaining());
|
|
|
|
- Assert.assertArrayEquals(Arrays.copyOfRange(original, 5010, 16385),
|
|
|
|
|
|
+ assertEquals(11375, result.remaining());
|
|
|
|
+ assertArrayEquals(Arrays.copyOfRange(original, 5010, 16385),
|
|
byteBufferToArray(result));
|
|
byteBufferToArray(result));
|
|
|
|
|
|
result = ByteBufferUtil.fallbackRead(stream, bufferPool, 10);
|
|
result = ByteBufferUtil.fallbackRead(stream, bufferPool, 10);
|
|
- Assert.assertNull(result);
|
|
|
|
|
|
+ assertNull(result);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -533,10 +539,10 @@ public class TestEnhancedByteBufferAccess {
|
|
try {
|
|
try {
|
|
DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
|
|
DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
|
|
} catch (InterruptedException e) {
|
|
} catch (InterruptedException e) {
|
|
- Assert.fail("unexpected InterruptedException during " +
|
|
|
|
|
|
+ fail("unexpected InterruptedException during " +
|
|
"waitReplication: " + e);
|
|
"waitReplication: " + e);
|
|
} catch (TimeoutException e) {
|
|
} catch (TimeoutException e) {
|
|
- Assert.fail("unexpected TimeoutException during " +
|
|
|
|
|
|
+ fail("unexpected TimeoutException during " +
|
|
"waitReplication: " + e);
|
|
"waitReplication: " + e);
|
|
}
|
|
}
|
|
fsIn = fs.open(TEST_PATH);
|
|
fsIn = fs.open(TEST_PATH);
|
|
@@ -584,7 +590,8 @@ public class TestEnhancedByteBufferAccess {
|
|
* Test that we can zero-copy read cached data even without disabling
|
|
* Test that we can zero-copy read cached data even without disabling
|
|
* checksums.
|
|
* checksums.
|
|
*/
|
|
*/
|
|
- @Test(timeout=120000)
|
|
|
|
|
|
+ @Test
|
|
|
|
+ @Timeout(value = 120)
|
|
public void testZeroCopyReadOfCachedData() throws Exception {
|
|
public void testZeroCopyReadOfCachedData() throws Exception {
|
|
BlockReaderTestUtil.enableShortCircuitShmTracing();
|
|
BlockReaderTestUtil.enableShortCircuitShmTracing();
|
|
BlockReaderTestUtil.enableBlockReaderFactoryTracing();
|
|
BlockReaderTestUtil.enableBlockReaderFactoryTracing();
|
|
@@ -618,7 +625,7 @@ public class TestEnhancedByteBufferAccess {
|
|
try {
|
|
try {
|
|
result = fsIn.read(null, TEST_FILE_LENGTH / 2,
|
|
result = fsIn.read(null, TEST_FILE_LENGTH / 2,
|
|
EnumSet.noneOf(ReadOption.class));
|
|
EnumSet.noneOf(ReadOption.class));
|
|
- Assert.fail("expected UnsupportedOperationException");
|
|
|
|
|
|
+ fail("expected UnsupportedOperationException");
|
|
} catch (UnsupportedOperationException e) {
|
|
} catch (UnsupportedOperationException e) {
|
|
// expected
|
|
// expected
|
|
}
|
|
}
|
|
@@ -637,9 +644,9 @@ public class TestEnhancedByteBufferAccess {
|
|
result = fsIn.read(null, TEST_FILE_LENGTH,
|
|
result = fsIn.read(null, TEST_FILE_LENGTH,
|
|
EnumSet.noneOf(ReadOption.class));
|
|
EnumSet.noneOf(ReadOption.class));
|
|
} catch (UnsupportedOperationException e) {
|
|
} catch (UnsupportedOperationException e) {
|
|
- Assert.fail("expected to be able to read cached file via zero-copy");
|
|
|
|
|
|
+ fail("expected to be able to read cached file via zero-copy");
|
|
}
|
|
}
|
|
- Assert.assertArrayEquals(Arrays.copyOfRange(original, 0,
|
|
|
|
|
|
+ assertArrayEquals(Arrays.copyOfRange(original, 0,
|
|
BLOCK_SIZE), byteBufferToArray(result));
|
|
BLOCK_SIZE), byteBufferToArray(result));
|
|
// Test that files opened after the cache operation has finished
|
|
// Test that files opened after the cache operation has finished
|
|
// still get the benefits of zero-copy (regression test for HDFS-6086)
|
|
// still get the benefits of zero-copy (regression test for HDFS-6086)
|
|
@@ -648,9 +655,9 @@ public class TestEnhancedByteBufferAccess {
|
|
result2 = fsIn2.read(null, TEST_FILE_LENGTH,
|
|
result2 = fsIn2.read(null, TEST_FILE_LENGTH,
|
|
EnumSet.noneOf(ReadOption.class));
|
|
EnumSet.noneOf(ReadOption.class));
|
|
} catch (UnsupportedOperationException e) {
|
|
} catch (UnsupportedOperationException e) {
|
|
- Assert.fail("expected to be able to read cached file via zero-copy");
|
|
|
|
|
|
+ fail("expected to be able to read cached file via zero-copy");
|
|
}
|
|
}
|
|
- Assert.assertArrayEquals(Arrays.copyOfRange(original, 0,
|
|
|
|
|
|
+ assertArrayEquals(Arrays.copyOfRange(original, 0,
|
|
BLOCK_SIZE), byteBufferToArray(result2));
|
|
BLOCK_SIZE), byteBufferToArray(result2));
|
|
fsIn2.releaseBuffer(result2);
|
|
fsIn2.releaseBuffer(result2);
|
|
fsIn2.close();
|
|
fsIn2.close();
|
|
@@ -688,10 +695,10 @@ public class TestEnhancedByteBufferAccess {
|
|
Map<ExtendedBlockId, InvalidToken> failedLoads,
|
|
Map<ExtendedBlockId, InvalidToken> failedLoads,
|
|
LinkedMap evictable,
|
|
LinkedMap evictable,
|
|
LinkedMap evictableMmapped) {
|
|
LinkedMap evictableMmapped) {
|
|
- Assert.assertEquals(expectedOutstandingMmaps, numOutstandingMmaps);
|
|
|
|
|
|
+ assertEquals(expectedOutstandingMmaps, numOutstandingMmaps);
|
|
ShortCircuitReplica replica =
|
|
ShortCircuitReplica replica =
|
|
replicas.get(ExtendedBlockId.fromExtendedBlock(block));
|
|
replicas.get(ExtendedBlockId.fromExtendedBlock(block));
|
|
- Assert.assertNotNull(replica);
|
|
|
|
|
|
+ assertNotNull(replica);
|
|
Slot slot = replica.getSlot();
|
|
Slot slot = replica.getSlot();
|
|
if ((expectedIsAnchorable != slot.isAnchorable()) ||
|
|
if ((expectedIsAnchorable != slot.isAnchorable()) ||
|
|
(expectedIsAnchored != slot.isAnchored())) {
|
|
(expectedIsAnchored != slot.isAnchored())) {
|
|
@@ -734,7 +741,7 @@ public class TestEnhancedByteBufferAccess {
|
|
fsIn = fs.open(TEST_PATH);
|
|
fsIn = fs.open(TEST_PATH);
|
|
try {
|
|
try {
|
|
fsIn.read(null, 1, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
|
|
fsIn.read(null, 1, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
|
|
- Assert.fail("expected zero-copy read to fail when client mmaps " +
|
|
|
|
|
|
+ fail("expected zero-copy read to fail when client mmaps " +
|
|
"were disabled.");
|
|
"were disabled.");
|
|
} catch (UnsupportedOperationException e) {
|
|
} catch (UnsupportedOperationException e) {
|
|
}
|
|
}
|
|
@@ -764,7 +771,7 @@ public class TestEnhancedByteBufferAccess {
|
|
// Test EOF behavior
|
|
// Test EOF behavior
|
|
IOUtils.skipFully(fsIn, TEST_FILE_LENGTH - 1);
|
|
IOUtils.skipFully(fsIn, TEST_FILE_LENGTH - 1);
|
|
buf = fsIn.read(null, 1, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
|
|
buf = fsIn.read(null, 1, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
|
|
- Assert.assertEquals(null, buf);
|
|
|
|
|
|
+ assertEquals(null, buf);
|
|
} finally {
|
|
} finally {
|
|
if (fsIn != null) fsIn.close();
|
|
if (fsIn != null) fsIn.close();
|
|
if (fs != null) fs.close();
|
|
if (fs != null) fs.close();
|
|
@@ -774,7 +781,7 @@ public class TestEnhancedByteBufferAccess {
|
|
|
|
|
|
@Test
|
|
@Test
|
|
public void test2GBMmapLimit() throws Exception {
|
|
public void test2GBMmapLimit() throws Exception {
|
|
- Assume.assumeTrue(BlockReaderTestUtil.shouldTestLargeFiles());
|
|
|
|
|
|
+ assumeTrue(BlockReaderTestUtil.shouldTestLargeFiles());
|
|
HdfsConfiguration conf = initZeroCopyTest();
|
|
HdfsConfiguration conf = initZeroCopyTest();
|
|
final long TEST_FILE_LENGTH = 2469605888L;
|
|
final long TEST_FILE_LENGTH = 2469605888L;
|
|
conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "NULL");
|
|
conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "NULL");
|
|
@@ -795,20 +802,20 @@ public class TestEnhancedByteBufferAccess {
|
|
|
|
|
|
fsIn = fs.open(TEST_PATH);
|
|
fsIn = fs.open(TEST_PATH);
|
|
buf1 = fsIn.read(null, 1, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
|
|
buf1 = fsIn.read(null, 1, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
|
|
- Assert.assertEquals(1, buf1.remaining());
|
|
|
|
|
|
+ assertEquals(1, buf1.remaining());
|
|
fsIn.releaseBuffer(buf1);
|
|
fsIn.releaseBuffer(buf1);
|
|
buf1 = null;
|
|
buf1 = null;
|
|
fsIn.seek(2147483640L);
|
|
fsIn.seek(2147483640L);
|
|
buf1 = fsIn.read(null, 1024, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
|
|
buf1 = fsIn.read(null, 1024, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
|
|
- Assert.assertEquals(7, buf1.remaining());
|
|
|
|
- Assert.assertEquals(Integer.MAX_VALUE, buf1.limit());
|
|
|
|
|
|
+ assertEquals(7, buf1.remaining());
|
|
|
|
+ assertEquals(Integer.MAX_VALUE, buf1.limit());
|
|
fsIn.releaseBuffer(buf1);
|
|
fsIn.releaseBuffer(buf1);
|
|
buf1 = null;
|
|
buf1 = null;
|
|
- Assert.assertEquals(2147483647L, fsIn.getPos());
|
|
|
|
|
|
+ assertEquals(2147483647L, fsIn.getPos());
|
|
try {
|
|
try {
|
|
buf1 = fsIn.read(null, 1024,
|
|
buf1 = fsIn.read(null, 1024,
|
|
EnumSet.of(ReadOption.SKIP_CHECKSUMS));
|
|
EnumSet.of(ReadOption.SKIP_CHECKSUMS));
|
|
- Assert.fail("expected UnsupportedOperationException");
|
|
|
|
|
|
+ fail("expected UnsupportedOperationException");
|
|
} catch (UnsupportedOperationException e) {
|
|
} catch (UnsupportedOperationException e) {
|
|
// expected; can't read past 2GB boundary.
|
|
// expected; can't read past 2GB boundary.
|
|
}
|
|
}
|
|
@@ -825,13 +832,13 @@ public class TestEnhancedByteBufferAccess {
|
|
fsIn2 = fs.open(TEST_PATH2);
|
|
fsIn2 = fs.open(TEST_PATH2);
|
|
fsIn2.seek(2147483640L);
|
|
fsIn2.seek(2147483640L);
|
|
buf2 = fsIn2.read(null, 1024, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
|
|
buf2 = fsIn2.read(null, 1024, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
|
|
- Assert.assertEquals(8, buf2.remaining());
|
|
|
|
- Assert.assertEquals(2147483648L, fsIn2.getPos());
|
|
|
|
|
|
+ assertEquals(8, buf2.remaining());
|
|
|
|
+ assertEquals(2147483648L, fsIn2.getPos());
|
|
fsIn2.releaseBuffer(buf2);
|
|
fsIn2.releaseBuffer(buf2);
|
|
buf2 = null;
|
|
buf2 = null;
|
|
buf2 = fsIn2.read(null, 1024, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
|
|
buf2 = fsIn2.read(null, 1024, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
|
|
- Assert.assertEquals(1024, buf2.remaining());
|
|
|
|
- Assert.assertEquals(2147484672L, fsIn2.getPos());
|
|
|
|
|
|
+ assertEquals(1024, buf2.remaining());
|
|
|
|
+ assertEquals(2147484672L, fsIn2.getPos());
|
|
fsIn2.releaseBuffer(buf2);
|
|
fsIn2.releaseBuffer(buf2);
|
|
buf2 = null;
|
|
buf2 = null;
|
|
} finally {
|
|
} finally {
|