|
@@ -385,51 +385,70 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
/**
|
|
|
* Removes a collection of volumes from FsDataset.
|
|
|
* @param volumes the root directories of the volumes.
|
|
|
- *
|
|
|
- * DataNode should call this function before calling
|
|
|
- * {@link DataStorage#removeVolumes(java.util.Collection)}.
|
|
|
*/
|
|
|
@Override
|
|
|
- public synchronized void removeVolumes(Collection<StorageLocation> volumes) {
|
|
|
+ public void removeVolumes(Collection<StorageLocation> volumes) {
|
|
|
Set<String> volumeSet = new HashSet<String>();
|
|
|
for (StorageLocation sl : volumes) {
|
|
|
volumeSet.add(sl.getFile().getAbsolutePath());
|
|
|
}
|
|
|
- for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) {
|
|
|
- Storage.StorageDirectory sd = dataStorage.getStorageDir(idx);
|
|
|
- String volume = sd.getRoot().getAbsolutePath();
|
|
|
- if (volumeSet.contains(volume)) {
|
|
|
- LOG.info("Removing " + volume + " from FsDataset.");
|
|
|
-
|
|
|
- // Disable the volume from the service.
|
|
|
- asyncDiskService.removeVolume(sd.getCurrentDir());
|
|
|
- this.volumes.removeVolume(sd.getRoot());
|
|
|
-
|
|
|
- // Removed all replica information for the blocks on the volume. Unlike
|
|
|
- // updating the volumeMap in addVolume(), this operation does not scan
|
|
|
- // disks.
|
|
|
- for (String bpid : volumeMap.getBlockPoolList()) {
|
|
|
- List<Block> blocks = new ArrayList<Block>();
|
|
|
- for (Iterator<ReplicaInfo> it = volumeMap.replicas(bpid).iterator();
|
|
|
- it.hasNext(); ) {
|
|
|
- ReplicaInfo block = it.next();
|
|
|
- String absBasePath =
|
|
|
+
|
|
|
+ Map<String, List<ReplicaInfo>> blkToInvalidate =
|
|
|
+ new HashMap<String, List<ReplicaInfo>>();
|
|
|
+ List<String> storageToRemove = new ArrayList<String>();
|
|
|
+ synchronized (this) {
|
|
|
+ for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) {
|
|
|
+ Storage.StorageDirectory sd = dataStorage.getStorageDir(idx);
|
|
|
+ String volume = sd.getRoot().getAbsolutePath();
|
|
|
+ if (volumeSet.contains(volume)) {
|
|
|
+ LOG.info("Removing " + volume + " from FsDataset.");
|
|
|
+
|
|
|
+ // Disable the volume from the service.
|
|
|
+ asyncDiskService.removeVolume(sd.getCurrentDir());
|
|
|
+ this.volumes.removeVolume(sd.getRoot());
|
|
|
+
|
|
|
+ // Removed all replica information for the blocks on the volume.
|
|
|
+ // Unlike updating the volumeMap in addVolume(), this operation does
|
|
|
+ // not scan disks.
|
|
|
+ for (String bpid : volumeMap.getBlockPoolList()) {
|
|
|
+ List<ReplicaInfo> blocks = new ArrayList<ReplicaInfo>();
|
|
|
+ for (Iterator<ReplicaInfo> it = volumeMap.replicas(bpid).iterator();
|
|
|
+ it.hasNext(); ) {
|
|
|
+ ReplicaInfo block = it.next();
|
|
|
+ String absBasePath =
|
|
|
new File(block.getVolume().getBasePath()).getAbsolutePath();
|
|
|
- if (absBasePath.equals(volume)) {
|
|
|
- invalidate(bpid, block);
|
|
|
- blocks.add(block);
|
|
|
- it.remove();
|
|
|
+ if (absBasePath.equals(volume)) {
|
|
|
+ blocks.add(block);
|
|
|
+ it.remove();
|
|
|
+ }
|
|
|
}
|
|
|
+ blkToInvalidate.put(bpid, blocks);
|
|
|
+ // Delete blocks from the block scanner in batch.
|
|
|
+ datanode.getBlockScanner().deleteBlocks(bpid,
|
|
|
+ blocks.toArray(new Block[blocks.size()]));
|
|
|
}
|
|
|
- // Delete blocks from the block scanner in batch.
|
|
|
- datanode.getBlockScanner().deleteBlocks(bpid,
|
|
|
- blocks.toArray(new Block[blocks.size()]));
|
|
|
+
|
|
|
+ storageToRemove.add(sd.getStorageUuid());
|
|
|
}
|
|
|
+ }
|
|
|
+ setupAsyncLazyPersistThreads();
|
|
|
+ }
|
|
|
|
|
|
- storageMap.remove(sd.getStorageUuid());
|
|
|
+ // Call this outside the lock.
|
|
|
+ for (Map.Entry<String, List<ReplicaInfo>> entry :
|
|
|
+ blkToInvalidate.entrySet()) {
|
|
|
+ String bpid = entry.getKey();
|
|
|
+ List<ReplicaInfo> blocks = entry.getValue();
|
|
|
+ for (ReplicaInfo block : blocks) {
|
|
|
+ invalidate(bpid, block);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ synchronized (this) {
|
|
|
+ for(String storageUuid : storageToRemove) {
|
|
|
+ storageMap.remove(storageUuid);
|
|
|
}
|
|
|
}
|
|
|
- setupAsyncLazyPersistThreads();
|
|
|
}
|
|
|
|
|
|
private StorageType getStorageTypeFromLocations(
|
|
@@ -1639,15 +1658,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
public void invalidate(String bpid, ReplicaInfo block) {
|
|
|
// If a DFSClient has the replica in its cache of short-circuit file
|
|
|
// descriptors (and the client is using ShortCircuitShm), invalidate it.
|
|
|
- // The short-circuit registry is null in the unit tests, because the
|
|
|
- // datanode is mock object.
|
|
|
- if (datanode.getShortCircuitRegistry() != null) {
|
|
|
- datanode.getShortCircuitRegistry().processBlockInvalidation(
|
|
|
- new ExtendedBlockId(block.getBlockId(), bpid));
|
|
|
+ datanode.getShortCircuitRegistry().processBlockInvalidation(
|
|
|
+ new ExtendedBlockId(block.getBlockId(), bpid));
|
|
|
|
|
|
- // If the block is cached, start uncaching it.
|
|
|
- cacheManager.uncacheBlock(bpid, block.getBlockId());
|
|
|
- }
|
|
|
+ // If the block is cached, start uncaching it.
|
|
|
+ cacheManager.uncacheBlock(bpid, block.getBlockId());
|
|
|
|
|
|
datanode.notifyNamenodeDeletedBlock(new ExtendedBlock(bpid, block),
|
|
|
block.getStorageUuid());
|