@@ -91,7 +91,6 @@ import org.slf4j.event.Level;
 public class TestDirectoryScanner {
   private static final Logger LOG =
       LoggerFactory.getLogger(TestDirectoryScanner.class);
-  private static final Configuration CONF = new HdfsConfiguration();
   private static final int DEFAULT_GEN_STAMP = 9999;

   private MiniDFSCluster cluster;
@@ -103,12 +102,14 @@ public class TestDirectoryScanner {
   private final Random r = new Random();
   private static final int BLOCK_LENGTH = 100;

-  static {
-    CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_LENGTH);
-    CONF.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 1);
-    CONF.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
-    CONF.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
+  public Configuration getConfiguration() {
+    Configuration configuration = new HdfsConfiguration();
+    configuration.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_LENGTH);
+    configuration.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 1);
+    configuration.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
+    configuration.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
         getMemlockLimit(Long.MAX_VALUE));
+    return configuration;
   }

   @Before
@@ -361,7 +362,8 @@ public class TestDirectoryScanner {

   @Test(timeout = 300000)
   public void testRetainBlockOnPersistentStorage() throws Exception {
-    cluster = new MiniDFSCluster.Builder(CONF)
+    Configuration conf = getConfiguration();
+    cluster = new MiniDFSCluster.Builder(conf)
         .storageTypes(
             new StorageType[] { StorageType.RAM_DISK, StorageType.DEFAULT })
         .numDataNodes(1).build();
@@ -370,7 +372,7 @@ public class TestDirectoryScanner {
     bpid = cluster.getNamesystem().getBlockPoolId();
     fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
     client = cluster.getFileSystem().getClient();
-    scanner = new DirectoryScanner(fds, CONF);
+    scanner = new DirectoryScanner(fds, conf);
     scanner.setRetainDiffs(true);
     FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0));

@@ -413,8 +415,9 @@ public class TestDirectoryScanner {
         new WriterAppender(new SimpleLayout(), loggerStream);
     rootLogger.addAppender(writerAppender);

+    Configuration conf = getConfiguration();
     cluster = new MiniDFSCluster
-        .Builder(CONF)
+        .Builder(conf)
         .storageTypes(new StorageType[] {
             StorageType.RAM_DISK, StorageType.DEFAULT })
         .numDataNodes(1)
@@ -424,7 +427,7 @@ public class TestDirectoryScanner {
     bpid = cluster.getNamesystem().getBlockPoolId();
     fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
     client = cluster.getFileSystem().getClient();
-    scanner = new DirectoryScanner(fds, CONF);
+    scanner = new DirectoryScanner(fds, conf);
     scanner.setRetainDiffs(true);
     FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0));

@@ -464,7 +467,8 @@ public class TestDirectoryScanner {

   @Test(timeout = 300000)
   public void testDeleteBlockOnTransientStorage() throws Exception {
-    cluster = new MiniDFSCluster.Builder(CONF)
+    Configuration conf = getConfiguration();
+    cluster = new MiniDFSCluster.Builder(conf)
         .storageTypes(
             new StorageType[] { StorageType.RAM_DISK, StorageType.DEFAULT })
         .numDataNodes(1).build();
@@ -473,7 +477,7 @@ public class TestDirectoryScanner {
     bpid = cluster.getNamesystem().getBlockPoolId();
     fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
     client = cluster.getFileSystem().getClient();
-    scanner = new DirectoryScanner(fds, CONF);
+    scanner = new DirectoryScanner(fds, conf);
     scanner.setRetainDiffs(true);
     FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0));

@@ -512,16 +516,17 @@ public class TestDirectoryScanner {
   }

   public void runTest(int parallelism) throws Exception {
-    cluster = new MiniDFSCluster.Builder(CONF).build();
+    Configuration conf = getConfiguration();
+    cluster = new MiniDFSCluster.Builder(conf).build();
     try {
       cluster.waitActive();
       bpid = cluster.getNamesystem().getBlockPoolId();
       fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
       client = cluster.getFileSystem().getClient();
-      CONF.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY,
+      conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY,
           parallelism);

-      scanner = new DirectoryScanner(fds, CONF);
+      scanner = new DirectoryScanner(fds, conf);
       scanner.setRetainDiffs(true);

       // Add files with 100 blocks
@@ -672,9 +677,9 @@ public class TestDirectoryScanner {
    *
    * @throws Exception thrown on unexpected failure
    */
-  @Test(timeout = 600000)
+  @Test
   public void testThrottling() throws Exception {
-    Configuration conf = new Configuration(CONF);
+    Configuration conf = new Configuration(getConfiguration());

     // We need lots of blocks so the report compiler threads have enough to
     // keep them busy while we watch them.
@@ -714,7 +719,7 @@ public class TestDirectoryScanner {
     // Waiting should be about 9x running.
     LOG.info("RATIO: " + ratio);
     assertTrue("Throttle is too restrictive", ratio <= 10f);
-    assertTrue("Throttle is too permissive", ratio >= 7f);
+    assertTrue("Throttle is too permissive: " + ratio, ratio >= 7f);

     // Test with a different limit
     conf.setInt(
@@ -754,7 +759,7 @@ public class TestDirectoryScanner {
     assertTrue("Throttle is too permissive", ratio >= 7f);

     // Test with no limit
-    scanner = new DirectoryScanner(fds, CONF);
+    scanner = new DirectoryScanner(fds, getConfiguration());
     scanner.setRetainDiffs(true);
     scan(blocks, 0, 0, 0, 0, 0);
     scanner.shutdown();
@@ -1095,13 +1100,14 @@ public class TestDirectoryScanner {
    */
   @Test(timeout = 60000)
   public void testExceptionHandlingWhileDirectoryScan() throws Exception {
-    cluster = new MiniDFSCluster.Builder(CONF).build();
+    Configuration conf = getConfiguration();
+    cluster = new MiniDFSCluster.Builder(conf).build();
     try {
       cluster.waitActive();
       bpid = cluster.getNamesystem().getBlockPoolId();
       fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
       client = cluster.getFileSystem().getClient();
-      CONF.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY, 1);
+      conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY, 1);

       // Add files with 2 blocks
       createFile(GenericTestUtils.getMethodName(), BLOCK_LENGTH * 2, false);
@@ -1121,7 +1127,7 @@ public class TestDirectoryScanner {
       FsDatasetSpi<? extends FsVolumeSpi> spyFds = Mockito.spy(fds);
       Mockito.doReturn(volReferences).when(spyFds).getFsVolumeReferences();

-      scanner = new DirectoryScanner(spyFds, CONF);
+      scanner = new DirectoryScanner(spyFds, conf);
       scanner.setRetainDiffs(true);
       scanner.reconcile();
     } finally {
@@ -1135,7 +1141,7 @@ public class TestDirectoryScanner {

   @Test
   public void testDirectoryScannerInFederatedCluster() throws Exception {
-    HdfsConfiguration conf = new HdfsConfiguration(CONF);
+    HdfsConfiguration conf = new HdfsConfiguration(getConfiguration());
     // Create Federated cluster with two nameservices and one DN
     try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
         .nnTopology(MiniDFSNNTopology.simpleHAFederatedTopology(2))
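
// ---------------------------------------------------------------------------
// Illustrative sketch only, not part of the patch above: it shows why moving
// from the shared static CONF to a per-call getConfiguration() factory keeps
// configuration mutations local to each test. The class name
// PerTestConfigurationExample is hypothetical; the Hadoop keys and JUnit 4
// APIs are the ones already used in the patch (hadoop-hdfs test dependencies
// assumed on the classpath).

import static org.junit.Assert.assertEquals;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.junit.Test;

public class PerTestConfigurationExample {

  // Every caller gets a fresh Configuration, mirroring the new
  // getConfiguration() helper in TestDirectoryScanner.
  private Configuration getConfiguration() {
    Configuration configuration = new HdfsConfiguration();
    configuration.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 100);
    return configuration;
  }

  @Test
  public void testMutationStaysLocal() {
    // Mutating this instance (as runTest() does with the scanner thread
    // count) cannot leak into other tests the way a shared static would.
    Configuration conf = getConfiguration();
    conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY, 1);

    // A second, independently created instance still sees the defaults.
    assertEquals(100L,
        getConfiguration().getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 0L));
  }
}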