|
@@ -96,6 +96,8 @@ import static org.mockito.Mockito.doReturn;
|
|
|
import static org.mockito.Mockito.doThrow;
|
|
|
import static org.mockito.Mockito.mock;
|
|
|
import static org.mockito.Mockito.spy;
|
|
|
+import static org.mockito.Mockito.times;
|
|
|
+import static org.mockito.Mockito.verify;
|
|
|
import static org.mockito.Mockito.when;
|
|
|
|
|
|
import org.slf4j.Logger;
|
|
@@ -253,16 +255,24 @@ public class TestFsDatasetImpl {
|
|
|
}
|
|
|
|
|
|
@Test(timeout = 30000)
|
|
|
- public void testRemoveVolumes() throws IOException {
|
|
|
+ public void testRemoveOneVolume() throws IOException {
|
|
|
// Feed FsDataset with block metadata.
|
|
|
- final int NUM_BLOCKS = 100;
|
|
|
- for (int i = 0; i < NUM_BLOCKS; i++) {
|
|
|
- String bpid = BLOCK_POOL_IDS[NUM_BLOCKS % BLOCK_POOL_IDS.length];
|
|
|
+ final int numBlocks = 100;
|
|
|
+ for (int i = 0; i < numBlocks; i++) {
|
|
|
+ String bpid = BLOCK_POOL_IDS[numBlocks % BLOCK_POOL_IDS.length];
|
|
|
ExtendedBlock eb = new ExtendedBlock(bpid, i);
|
|
|
- try (ReplicaHandler replica =
|
|
|
- dataset.createRbw(StorageType.DEFAULT, null, eb, false)) {
|
|
|
+ ReplicaHandler replica = null;
|
|
|
+ try {
|
|
|
+ replica = dataset.createRbw(StorageType.DEFAULT, null, eb,
|
|
|
+ false);
|
|
|
+ } finally {
|
|
|
+ if (replica != null) {
|
|
|
+ replica.close();
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ // Remove one volume
|
|
|
final String[] dataDirs =
|
|
|
conf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY).split(",");
|
|
|
final String volumePathToRemove = dataDirs[0];
|
|
@@ -285,6 +295,11 @@ public class TestFsDatasetImpl {
|
|
|
assertEquals("The volume has been removed from the storageMap.",
|
|
|
expectedNumVolumes, dataset.storageMap.size());
|
|
|
|
|
|
+ // DataNode.notifyNamenodeDeletedBlock() should be called 50 times
|
|
|
+      // as we deleted one volume that held 50 blocks
|
|
|
+ verify(datanode, times(50))
|
|
|
+ .notifyNamenodeDeletedBlock(any(), any());
|
|
|
+
|
|
|
try {
|
|
|
dataset.asyncDiskService.execute(volumeToRemove,
|
|
|
new Runnable() {
|
|
@@ -302,10 +317,81 @@ public class TestFsDatasetImpl {
|
|
|
totalNumReplicas += dataset.volumeMap.size(bpid);
|
|
|
}
|
|
|
assertEquals("The replica infos on this volume has been removed from the "
|
|
|
- + "volumeMap.", NUM_BLOCKS / NUM_INIT_VOLUMES,
|
|
|
+ + "volumeMap.", numBlocks / NUM_INIT_VOLUMES,
|
|
|
totalNumReplicas);
|
|
|
}
|
|
|
|
|
|
+ @Test(timeout = 30000)
|
|
|
+ public void testRemoveTwoVolumes() throws IOException {
|
|
|
+ // Feed FsDataset with block metadata.
|
|
|
+ final int numBlocks = 100;
|
|
|
+ for (int i = 0; i < numBlocks; i++) {
|
|
|
+ String bpid = BLOCK_POOL_IDS[numBlocks % BLOCK_POOL_IDS.length];
|
|
|
+ ExtendedBlock eb = new ExtendedBlock(bpid, i);
|
|
|
+ ReplicaHandler replica = null;
|
|
|
+ try {
|
|
|
+ replica = dataset.createRbw(StorageType.DEFAULT, null, eb,
|
|
|
+ false);
|
|
|
+ } finally {
|
|
|
+ if (replica != null) {
|
|
|
+ replica.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Remove two volumes
|
|
|
+ final String[] dataDirs =
|
|
|
+ conf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY).split(",");
|
|
|
+ Set<StorageLocation> volumesToRemove = new HashSet<>();
|
|
|
+ volumesToRemove.add(StorageLocation.parse(dataDirs[0]));
|
|
|
+ volumesToRemove.add(StorageLocation.parse(dataDirs[1]));
|
|
|
+
|
|
|
+ FsVolumeReferences volReferences = dataset.getFsVolumeReferences();
|
|
|
+ Set<FsVolumeImpl> volumes = new HashSet<>();
|
|
|
+ for (FsVolumeSpi vol: volReferences) {
|
|
|
+ for (StorageLocation volume : volumesToRemove) {
|
|
|
+ if (vol.getStorageLocation().equals(volume)) {
|
|
|
+ volumes.add((FsVolumeImpl) vol);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ assertEquals(2, volumes.size());
|
|
|
+ volReferences.close();
|
|
|
+
|
|
|
+ dataset.removeVolumes(volumesToRemove, true);
|
|
|
+ int expectedNumVolumes = dataDirs.length - 2;
|
|
|
+ assertEquals("The volume has been removed from the volumeList.",
|
|
|
+ expectedNumVolumes, getNumVolumes());
|
|
|
+ assertEquals("The volume has been removed from the storageMap.",
|
|
|
+ expectedNumVolumes, dataset.storageMap.size());
|
|
|
+
|
|
|
+ // DataNode.notifyNamenodeDeletedBlock() should be called 100 times
|
|
|
+    // as we deleted 2 volumes that held 100 blocks in total
|
|
|
+ verify(datanode, times(100))
|
|
|
+ .notifyNamenodeDeletedBlock(any(), any());
|
|
|
+
|
|
|
+ for (FsVolumeImpl volume : volumes) {
|
|
|
+ try {
|
|
|
+ dataset.asyncDiskService.execute(volume,
|
|
|
+ new Runnable() {
|
|
|
+ @Override
|
|
|
+ public void run() {}
|
|
|
+ });
|
|
|
+ fail("Expect RuntimeException: the volume has been removed from the "
|
|
|
+ + "AsyncDiskService.");
|
|
|
+ } catch (RuntimeException e) {
|
|
|
+ GenericTestUtils.assertExceptionContains("Cannot find volume", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ int totalNumReplicas = 0;
|
|
|
+ for (String bpid : dataset.volumeMap.getBlockPoolList()) {
|
|
|
+ totalNumReplicas += dataset.volumeMap.size(bpid);
|
|
|
+ }
|
|
|
+ assertEquals("The replica infos on this volume has been removed from the "
|
|
|
+ + "volumeMap.", 0, totalNumReplicas);
|
|
|
+ }
|
|
|
+
|
|
|
@Test(timeout = 5000)
|
|
|
public void testRemoveNewlyAddedVolume() throws IOException {
|
|
|
final int numExistingVolumes = getNumVolumes();
|