|
@@ -18,6 +18,7 @@
|
|
|
package org.apache.hadoop.hdfs.server.datanode;
|
|
|
|
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_SKIP_RECENT_ACCESSED;
|
|
|
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY;
|
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
|
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND;
|
|
|
import static org.apache.hadoop.hdfs.server.datanode.BlockScanner.Conf.INTERNAL_DFS_DATANODE_SCAN_PERIOD_MS;
|
|
@@ -38,6 +39,7 @@ import java.util.List;
|
|
|
import java.util.Set;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.concurrent.Semaphore;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
import java.util.concurrent.TimeoutException;
|
|
|
|
|
|
import java.util.function.Supplier;
|
|
@@ -96,9 +98,19 @@ public class TestBlockScanner {
|
|
|
TestContext(Configuration conf, int numNameServices) throws Exception {
|
|
|
this.numNameServices = numNameServices;
|
|
|
File basedir = new File(GenericTestUtils.getRandomizedTempPath());
|
|
|
+ long volumeScannerTimeOutFromConf =
|
|
|
+ conf.getLong(DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY, -1);
|
|
|
+ long expectedVScannerTimeOut =
|
|
|
+ volumeScannerTimeOutFromConf == -1
|
|
|
+ ? MiniDFSCluster.DEFAULT_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC
|
|
|
+ : volumeScannerTimeOutFromConf;
|
|
|
MiniDFSCluster.Builder bld = new MiniDFSCluster.Builder(conf, basedir).
|
|
|
numDataNodes(1).
|
|
|
storagesPerDatanode(1);
|
|
|
+ // verify that the builder was initialized to get the default
|
|
|
+ // configuration designated for Junit tests.
|
|
|
+ assertEquals(expectedVScannerTimeOut,
|
|
|
+ conf.getLong(DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY, -1));
|
|
|
if (numNameServices > 1) {
|
|
|
bld.nnTopology(MiniDFSNNTopology.
|
|
|
simpleFederatedTopology(numNameServices));
|
|
@@ -1012,4 +1024,134 @@ public class TestBlockScanner {
|
|
|
0, info.blocksScanned);
|
|
|
ctx.close();
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Test a DN does not wait for the VolumeScanners to finish before shutting
|
|
|
+ * down.
|
|
|
+ *
|
|
|
+ * @throws Exception
|
|
|
+ */
|
|
|
+ @Test(timeout=120000)
|
|
|
+ public void testFastDatanodeShutdown() throws Exception {
|
|
|
+ // set the joinTimeOut to a value smaller than the completion time of the
|
|
|
+ // VolumeScanner.
|
|
|
+ testDatanodeShutDown(50L, 1000L, true);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Test a DN waits for the VolumeScanners to finish before shutting down.
|
|
|
+ *
|
|
|
+ * @throws Exception
|
|
|
+ */
|
|
|
+ @Test(timeout=120000)
|
|
|
+ public void testSlowDatanodeShutdown() throws Exception {
|
|
|
+ // Set the joinTimeOut to a value larger than the completion time of the
|
|
|
+ // volume scanner
|
|
|
+ testDatanodeShutDown(TimeUnit.MINUTES.toMillis(5), 1000L,
|
|
|
+ false);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void testDatanodeShutDown(final long joinTimeOutMS,
|
|
|
+ final long delayMS, boolean isFastShutdown) throws Exception {
|
|
|
+ VolumeScannerCBInjector prevVolumeScannerCBInject =
|
|
|
+ VolumeScannerCBInjector.get();
|
|
|
+ try {
|
|
|
+ DelayVolumeScannerResponseToInterrupt injectDelay =
|
|
|
+ new DelayVolumeScannerResponseToInterrupt(delayMS);
|
|
|
+ VolumeScannerCBInjector.set(injectDelay);
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 100L);
|
|
|
+ conf.set(INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER,
|
|
|
+ TestScanResultHandler.class.getName());
|
|
|
+ conf.setLong(INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS, 0L);
|
|
|
+ conf.setLong(DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY,
|
|
|
+ joinTimeOutMS);
|
|
|
+ final TestContext ctx = new TestContext(conf, 1);
|
|
|
+ final int numExpectedBlocks = 10;
|
|
|
+ ctx.createFiles(0, numExpectedBlocks, 1);
|
|
|
+ final TestScanResultHandler.Info info =
|
|
|
+ TestScanResultHandler.getInfo(ctx.volumes.get(0));
|
|
|
+ synchronized (info) {
|
|
|
+ info.sem = new Semaphore(5);
|
|
|
+ info.shouldRun = true;
|
|
|
+ info.notify();
|
|
|
+ }
|
|
|
+ // make sure that the scanners are doing progress
|
|
|
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
|
|
+ @Override
|
|
|
+ public Boolean get() {
|
|
|
+ synchronized (info) {
|
|
|
+ return info.blocksScanned >= 1;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }, 3, 30000);
|
|
|
+ // mark the time where the
|
|
|
+ long startShutdownTime = Time.monotonicNow();
|
|
|
+ ctx.datanode.shutdown();
|
|
|
+ long endShutdownTime = Time.monotonicNow();
|
|
|
+ long totalTimeShutdown = endShutdownTime - startShutdownTime;
|
|
|
+
|
|
|
+ if (isFastShutdown) {
|
|
|
+ assertTrue("total shutdown time of DN must be smaller than "
|
|
|
+ + "VolumeScanner Response time: " + totalTimeShutdown,
|
|
|
+ totalTimeShutdown < delayMS
|
|
|
+ && totalTimeShutdown >= joinTimeOutMS);
|
|
|
+ // wait for scanners to terminate before we move to the next test.
|
|
|
+ injectDelay.waitForScanners();
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ assertTrue("total shutdown time of DN must be larger than " +
|
|
|
+ "VolumeScanner Response time: " + totalTimeShutdown,
|
|
|
+ totalTimeShutdown >= delayMS
|
|
|
+ && totalTimeShutdown < joinTimeOutMS);
|
|
|
+ } finally {
|
|
|
+ // restore the VolumeScanner callback injector.
|
|
|
+ VolumeScannerCBInjector.set(prevVolumeScannerCBInject);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private static class DelayVolumeScannerResponseToInterrupt extends
|
|
|
+ VolumeScannerCBInjector {
|
|
|
+ final private long delayAmountNS;
|
|
|
+ final private Set<VolumeScanner> scannersToShutDown;
|
|
|
+
|
|
|
+ DelayVolumeScannerResponseToInterrupt(long delayMS) {
|
|
|
+ delayAmountNS =
|
|
|
+ TimeUnit.NANOSECONDS.convert(delayMS, TimeUnit.MILLISECONDS);
|
|
|
+ scannersToShutDown = ConcurrentHashMap.newKeySet();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void preSavingBlockIteratorTask(VolumeScanner volumeScanner) {
|
|
|
+ long remainingTimeNS = delayAmountNS;
|
|
|
+ // busy delay without sleep().
|
|
|
+ long startTime = Time.monotonicNowNanos();
|
|
|
+ long endTime = startTime + remainingTimeNS;
|
|
|
+ long currTime, waitTime = 0;
|
|
|
+ while ((currTime = Time.monotonicNowNanos()) < endTime) {
|
|
|
+ // empty loop. No need to sleep because the thread could be in an
|
|
|
+ // interrupt mode.
|
|
|
+ waitTime = currTime - startTime;
|
|
|
+ }
|
|
|
+ LOG.info("VolumeScanner {} finished delayed Task after {}",
|
|
|
+ volumeScanner.toString(),
|
|
|
+ TimeUnit.NANOSECONDS.convert(waitTime, TimeUnit.MILLISECONDS));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void shutdownCallBack(VolumeScanner volumeScanner) {
|
|
|
+ scannersToShutDown.add(volumeScanner);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void terminationCallBack(VolumeScanner volumeScanner) {
|
|
|
+ scannersToShutDown.remove(volumeScanner);
|
|
|
+ }
|
|
|
+
|
|
|
+ public void waitForScanners() throws TimeoutException,
|
|
|
+ InterruptedException {
|
|
|
+ GenericTestUtils.waitFor(
|
|
|
+ () -> scannersToShutDown.isEmpty(), 10, 120000);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|