|
@@ -36,8 +36,12 @@ import java.util.List;
|
|
|
import java.util.Set;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.concurrent.Semaphore;
|
|
|
+import java.util.concurrent.TimeoutException;
|
|
|
|
|
|
import com.google.common.base.Supplier;
|
|
|
+import org.apache.hadoop.fs.FSDataOutputStream;
|
|
|
+import org.apache.hadoop.fs.FileSystem;
|
|
|
+import org.apache.hadoop.hdfs.AppendTestUtil;
|
|
|
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
|
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.FsDatasetTestUtils.MaterializedReplica;
|
|
@@ -870,4 +874,100 @@ public class TestBlockScanner {
|
|
|
}
|
|
|
info.sem.release(1);
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Test concurrent append and scan.
|
|
|
+ * @throws Exception
|
|
|
+ */
|
|
|
+ @Test(timeout=120000)
|
|
|
+ public void testAppendWhileScanning() throws Exception {
|
|
|
+ GenericTestUtils.setLogLevel(DataNode.LOG, Level.ALL);
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ // throttle the block scanner: 1MB per second
|
|
|
+ conf.setLong(DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND, 1048576);
|
|
|
+ // Set a really long scan period.
|
|
|
+ conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 100L);
|
|
|
+ conf.set(INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER,
|
|
|
+ TestScanResultHandler.class.getName());
|
|
|
+ conf.setLong(INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS, 0L);
|
|
|
+ final int numExpectedFiles = 1;
|
|
|
+ final int numExpectedBlocks = 1;
|
|
|
+ final int numNameServices = 1;
|
|
|
+ // the initial file length can not be too small.
|
|
|
+ // Otherwise checksum file stream buffer will be pre-filled and
|
|
|
+ // BlockSender will not see the updated checksum.
|
|
|
+ final int initialFileLength = 2*1024*1024+100;
|
|
|
+ final TestContext ctx = new TestContext(conf, numNameServices);
|
|
|
+ // create one file, with one block.
|
|
|
+ ctx.createFiles(0, numExpectedFiles, initialFileLength);
|
|
|
+ final TestScanResultHandler.Info info =
|
|
|
+ TestScanResultHandler.getInfo(ctx.volumes.get(0));
|
|
|
+ String storageID = ctx.volumes.get(0).getStorageID();
|
|
|
+ synchronized (info) {
|
|
|
+ info.sem = new Semaphore(numExpectedBlocks*2);
|
|
|
+ info.shouldRun = true;
|
|
|
+ info.notify();
|
|
|
+ }
|
|
|
+ // VolumeScanner scans the first block when DN starts.
|
|
|
+ // Due to throttler, this should take approximately 2 seconds.
|
|
|
+ waitForRescan(info, numExpectedBlocks);
|
|
|
+
|
|
|
+ // update throttler to schedule rescan immediately.
|
|
|
+ // this number must be larger than initial file length, otherwise
|
|
|
+ // throttler prevents immediate rescan.
|
|
|
+ conf.setLong(DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND,
|
|
|
+ initialFileLength+32*1024);
|
|
|
+ BlockScanner.Conf newConf = new BlockScanner.Conf(conf);
|
|
|
+ ctx.datanode.getBlockScanner().setConf(newConf);
|
|
|
+ // schedule the first block for scanning
|
|
|
+ ExtendedBlock first = ctx.getFileBlock(0, 0);
|
|
|
+ ctx.datanode.getBlockScanner().markSuspectBlock(storageID, first);
|
|
|
+
|
|
|
+ // append the file before VolumeScanner completes scanning the block,
|
|
|
+ // which takes approximately 2 seconds to complete.
|
|
|
+ FileSystem fs = ctx.cluster.getFileSystem();
|
|
|
+ FSDataOutputStream os = fs.append(ctx.getPath(0));
|
|
|
+ long seed = -1;
|
|
|
+ int size = 200;
|
|
|
+ final byte[] bytes = AppendTestUtil.randomBytes(seed, size);
|
|
|
+ os.write(bytes);
|
|
|
+ os.hflush();
|
|
|
+ os.close();
|
|
|
+ fs.close();
|
|
|
+
|
|
|
+ // verify that volume scanner does not find bad blocks after append.
|
|
|
+ waitForRescan(info, numExpectedBlocks);
|
|
|
+
|
|
|
+ GenericTestUtils.setLogLevel(DataNode.LOG, Level.INFO);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void waitForRescan(final TestScanResultHandler.Info info,
|
|
|
+ final int numExpectedBlocks)
|
|
|
+ throws TimeoutException, InterruptedException {
|
|
|
+ LOG.info("Waiting for the first 1 blocks to be scanned.");
|
|
|
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
|
|
+ @Override
|
|
|
+ public Boolean get() {
|
|
|
+ synchronized (info) {
|
|
|
+ if (info.blocksScanned >= numExpectedBlocks) {
|
|
|
+ LOG.info("info = {}. blockScanned has now reached 1.", info);
|
|
|
+ return true;
|
|
|
+ } else {
|
|
|
+ LOG.info("info = {}. Waiting for blockScanned to reach 1.", info);
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }, 1000, 30000);
|
|
|
+
|
|
|
+ synchronized (info) {
|
|
|
+ assertEquals("Expected 1 good block.",
|
|
|
+ numExpectedBlocks, info.goodBlocks.size());
|
|
|
+ info.goodBlocks.clear();
|
|
|
+ assertEquals("Expected 1 blocksScanned",
|
|
|
+ numExpectedBlocks, info.blocksScanned);
|
|
|
+ assertEquals("Did not expect bad blocks.", 0, info.badBlocks.size());
|
|
|
+ info.blocksScanned = 0;
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|