@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;

 import com.google.common.collect.Lists;
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystemTestHelper;
 import org.apache.hadoop.fs.Path;
@@ -65,6 +66,7 @@ import java.io.Writer;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -89,8 +91,11 @@ import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 public class TestFsDatasetImpl {
+  private static final Logger LOG = LoggerFactory.getLogger(TestFsDatasetImpl.class);
   private static final String BASE_DIR =
       new FileSystemTestHelper().getTestRootDir();
   private static final int NUM_INIT_VOLUMES = 2;
@@ -119,6 +124,7 @@ public class TestFsDatasetImpl {
     List<Storage.StorageDirectory> dirs =
         new ArrayList<Storage.StorageDirectory>();
     List<String> dirStrings = new ArrayList<String>();
+    FileUtils.deleteDirectory(new File(BASE_DIR));
     for (int i = 0; i < numDirs; i++) {
       File loc = new File(BASE_DIR + "/data" + i);
       dirStrings.add(new Path(loc.toString()).toUri().toString());
@@ -296,6 +302,7 @@ public class TestFsDatasetImpl {
       FsVolumeImpl volume = mock(FsVolumeImpl.class);
       oldVolumes.add(volume);
       when(volume.getBasePath()).thenReturn("data" + i);
+      when(volume.checkClosed()).thenReturn(true);
       FsVolumeReference ref = mock(FsVolumeReference.class);
       when(ref.getVolume()).thenReturn(volume);
       volumeList.addVolume(ref);
@@ -541,4 +548,52 @@ public class TestFsDatasetImpl {

     return dfsUsed;
   }
+
+  @Test(timeout = 30000)
+  public void testRemoveVolumeBeingWritten() throws Exception {
+    // Will write and remove on dn0.
+    final ExtendedBlock eb = new ExtendedBlock(BLOCK_POOL_IDS[0], 0);
+    final CountDownLatch startFinalizeLatch = new CountDownLatch(1);
+    final CountDownLatch brReceivedLatch = new CountDownLatch(1);
+    class BlockReportThread extends Thread {
+      public void run() {
+        LOG.info("Getting block report");
+        dataset.getBlockReports(eb.getBlockPoolId());
+        LOG.info("Successfully received block report");
+        brReceivedLatch.countDown();
+      }
+    }
+
+    final BlockReportThread brt = new BlockReportThread();
+    class ResponderThread extends Thread {
+      public void run() {
+        try (ReplicaHandler replica = dataset
+            .createRbw(StorageType.DEFAULT, eb, false)) {
+          LOG.info("createRbw finished");
+          startFinalizeLatch.countDown();
+
+          // Slow down while we're holding the reference to the volume
+          Thread.sleep(1000);
+          dataset.finalizeBlock(eb);
+          LOG.info("finalizeBlock finished");
+        } catch (Exception e) {
+          LOG.warn("Exception caught. This should not affect the test", e);
+        }
+      }
+    }
+
+    ResponderThread res = new ResponderThread();
+    res.start();
+    startFinalizeLatch.await();
+
+    Set<File> volumesToRemove = new HashSet<>();
+    volumesToRemove.add(
+        StorageLocation.parse(dataset.getVolume(eb).getBasePath()).getFile());
+    LOG.info("Removing volume " + volumesToRemove);
+    // Verify that a block report can be received while the removal is in progress
+    brt.start();
+    dataset.removeVolumes(volumesToRemove, true);
+    LOG.info("Volumes removed");
+    brReceivedLatch.await();
+  }
 }
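Note on the concurrency idiom above: the new test serializes three threads with
java.util.concurrent.CountDownLatch. The responder thread counts down
startFinalizeLatch once createRbw() holds a reference to the volume; only then
does the main thread start the block-report thread and call removeVolumes(), and
brReceivedLatch blocks the test until the block report has actually been served.
A minimal, self-contained sketch of this handshake pattern follows (the class
and variable names are illustrative only, not part of the patch):

    import java.util.concurrent.CountDownLatch;

    public class LatchHandshake {
      public static void main(String[] args) throws InterruptedException {
        // Worker signals main once it holds the resource being raced against.
        final CountDownLatch resourceHeld = new CountDownLatch(1);
        // Worker signals main once its work has completed.
        final CountDownLatch workDone = new CountDownLatch(1);

        Thread worker = new Thread(() -> {
          // ... acquire the shared resource here ...
          resourceHeld.countDown(); // main may now start the racing operation
          // ... keep working while the resource is held ...
          workDone.countDown();     // work finished
        });

        worker.start();
        resourceHeld.await();       // wait until the worker holds the resource
        // ... trigger the racing operation here (e.g. remove a volume) ...
        workDone.await();           // blocking here is the assertion: the test
                                    // timeout fires if the work never finishes
      }
    }

Pairing await() with a @Test(timeout = ...) annotation, as the patch does, turns
a potential deadlock into a test failure rather than a hung build.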