|
@@ -492,47 +492,98 @@ public class TestFsDatasetImpl {
|
|
|
// Will write and remove on dn0.
|
|
|
final ExtendedBlock eb = new ExtendedBlock(BLOCK_POOL_IDS[0], 0);
|
|
|
final CountDownLatch startFinalizeLatch = new CountDownLatch(1);
|
|
|
- final CountDownLatch brReceivedLatch = new CountDownLatch(1);
|
|
|
+ final CountDownLatch blockReportReceivedLatch = new CountDownLatch(1);
|
|
|
+ final CountDownLatch volRemoveStartedLatch = new CountDownLatch(1);
|
|
|
+ final CountDownLatch volRemoveCompletedLatch = new CountDownLatch(1);
|
|
|
class BlockReportThread extends Thread {
|
|
|
public void run() {
|
|
|
+ // Lets wait for the volume remove process to start
|
|
|
+ try {
|
|
|
+ volRemoveStartedLatch.await();
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.info("Unexpected exception when waiting for vol removal:", e);
|
|
|
+ }
|
|
|
LOG.info("Getting block report");
|
|
|
dataset.getBlockReports(eb.getBlockPoolId());
|
|
|
LOG.info("Successfully received block report");
|
|
|
- brReceivedLatch.countDown();
|
|
|
+ blockReportReceivedLatch.countDown();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- final BlockReportThread brt = new BlockReportThread();
|
|
|
class ResponderThread extends Thread {
|
|
|
public void run() {
|
|
|
try (ReplicaHandler replica = dataset
|
|
|
- .createRbw(StorageType.DEFAULT, eb, false)) {
|
|
|
- LOG.info("createRbw finished");
|
|
|
+ .createRbw(StorageType.DEFAULT, eb, false)) {
|
|
|
+ LOG.info("CreateRbw finished");
|
|
|
startFinalizeLatch.countDown();
|
|
|
|
|
|
- // Slow down while we're holding the reference to the volume
|
|
|
- Thread.sleep(1000);
|
|
|
+ // Slow down while we're holding the reference to the volume.
|
|
|
+ // As we finalize a block, the volume is removed in parallel.
|
|
|
+ // Ignore any interrupts coming out of volume shutdown.
|
|
|
+ try {
|
|
|
+ Thread.sleep(1000);
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
+ LOG.info("Ignoring ", ie);
|
|
|
+ }
|
|
|
+
|
|
|
+ // Lets wait for the other thread finish getting block report
|
|
|
+ blockReportReceivedLatch.await();
|
|
|
+
|
|
|
dataset.finalizeBlock(eb);
|
|
|
- LOG.info("finalizeBlock finished");
|
|
|
+ LOG.info("FinalizeBlock finished");
|
|
|
} catch (Exception e) {
|
|
|
LOG.warn("Exception caught. This should not affect the test", e);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- ResponderThread res = new ResponderThread();
|
|
|
- res.start();
|
|
|
+ class VolRemoveThread extends Thread {
|
|
|
+ public void run() {
|
|
|
+ Set<File> volumesToRemove = new HashSet<>();
|
|
|
+ try {
|
|
|
+ volumesToRemove.add(StorageLocation.parse(
|
|
|
+ dataset.getVolume(eb).getBasePath()).getFile());
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.info("Problem preparing volumes to remove: " + e);
|
|
|
+ Assert.fail("Exception in remove volume thread, check log for " +
|
|
|
+ "details.");
|
|
|
+ }
|
|
|
+ LOG.info("Removing volume " + volumesToRemove);
|
|
|
+ dataset.removeVolumes(volumesToRemove, true);
|
|
|
+ volRemoveCompletedLatch.countDown();
|
|
|
+ LOG.info("Removed volume " + volumesToRemove);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Start the volume write operation
|
|
|
+ ResponderThread responderThread = new ResponderThread();
|
|
|
+ responderThread.start();
|
|
|
startFinalizeLatch.await();
|
|
|
|
|
|
- Set<File> volumesToRemove = new HashSet<>();
|
|
|
- volumesToRemove.add(
|
|
|
- StorageLocation.parse(dataset.getVolume(eb).getBasePath()).getFile());
|
|
|
- LOG.info("Removing volume " + volumesToRemove);
|
|
|
- // Verify block report can be received during this
|
|
|
- brt.start();
|
|
|
- dataset.removeVolumes(volumesToRemove, true);
|
|
|
- LOG.info("Volumes removed");
|
|
|
- brReceivedLatch.await();
|
|
|
+ // Start the block report get operation
|
|
|
+ final BlockReportThread blockReportThread = new BlockReportThread();
|
|
|
+ blockReportThread.start();
|
|
|
+
|
|
|
+ // Start the volume remove operation
|
|
|
+ VolRemoveThread volRemoveThread = new VolRemoveThread();
|
|
|
+ volRemoveThread.start();
|
|
|
+
|
|
|
+ // Let volume write and remove operation be
|
|
|
+ // blocked for few seconds
|
|
|
+ Thread.sleep(2000);
|
|
|
+
|
|
|
+ // Signal block report receiver and volume writer
|
|
|
+ // thread to complete their operations so that vol
|
|
|
+ // remove can proceed
|
|
|
+ volRemoveStartedLatch.countDown();
|
|
|
+
|
|
|
+ // Verify if block report can be received
|
|
|
+ // when volume is in use and also being removed
|
|
|
+ blockReportReceivedLatch.await();
|
|
|
+
|
|
|
+ // Verify if volume can be removed safely when there
|
|
|
+ // are read/write operation in-progress
|
|
|
+ volRemoveCompletedLatch.await();
|
|
|
}
|
|
|
|
|
|
/**
|