
HDFS-16774. Improve async delete replica on datanode (#4903)

HDFS-16774. Improve async delete replica on datanode to reduce the probability of ReplicationNotFoundException

Co-authored-by: Haiyang Hu <haiyang.hu@shopee.com>
Reviewed-by: He Xiaoqiao <hexiaoqiao@apache.org>
huhaiyang 2 years ago
parent
commit
d14b88c698
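
Background for the change: before this patch, invalidate() removed the replica from the in-memory volumeMap right away and only queued the on-disk file deletion to the async disk service. If the deletion pool backed up, there was a window where the namenode (and thus clients) still knew about the replica while the datanode's memory did not, so reads could fail with ReplicaNotFoundException even though the block file was still on disk. The patch defers the in-memory removal into the async task itself. A minimal sketch of the ordering change (names follow the patch; volRef and trashDir abbreviate the actual arguments, and bodies are illustrative only):

    // Before: the in-memory removal happened eagerly in invalidate(),
    // while the file unlink could lag in the async disk service's queue.
    removing = volumeMap.remove(bpid, invalidBlks[i]);  // gone from memory now
    asyncDiskService.deleteAsync(volRef, removing, block, trashDir);

    // After: ReplicaFileDeleteTask.run() removes the replica from memory
    // right before it unlinks the files, closing the visibility gap.
    public void run() {
      if (!fsdatasetImpl.removeReplicaFromMem(block, volume)) {
        return;  // replica already gone or stale; nothing to delete
      }
      // ... unlink block and meta files ...
    }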

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java

@@ -157,4 +157,9 @@ public class DataNodeFaultInjector {
   public void badDecoding(ByteBuffer[] outputs) {}
 
   public void markSlow(String dnAddr, int[] replies) {}
+
+  /**
+   * Delay deleting the replica for a while.
+   */
+  public void delayDeleteReplica() {}
 }
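
DataNodeFaultInjector hooks are no-ops in production; tests interpose by installing a subclass via DataNodeFaultInjector.set(). A minimal sketch of pausing the delete task at this new hook (the Semaphore-based injector in the test added below is the authoritative usage; PausingDeleteInjector is a hypothetical name):

    import java.util.concurrent.CountDownLatch;

    import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;

    class PausingDeleteInjector extends DataNodeFaultInjector {
      private final CountDownLatch resume = new CountDownLatch(1);

      @Override
      public void delayDeleteReplica() {
        try {
          resume.await();                  // hold the async delete task here
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }

      void resumeDeletion() {
        resume.countDown();                // let the task proceed
      }
    }

A test would save DataNodeFaultInjector.get(), call set(new PausingDeleteInjector()), trigger the deletion, assert the replica is still visible, then call resumeDeletion() and restore the saved injector.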

+ 8 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java

@@ -31,6 +31,7 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
 import org.apache.hadoop.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -334,6 +335,13 @@ class FsDatasetAsyncDiskService {
     @Override
     public void run() {
       try {
+        // For testing: simulate the case where the asynchronous replica
+        // deletion task is stacked pending.
+        DataNodeFaultInjector.get().delayDeleteReplica();
+        if (!fsdatasetImpl.removeReplicaFromMem(block, volume)) {
+          return;
+        }
+
         final long blockLength = replicaToDelete.getBlockDataLength();
         final long metaLength = replicaToDelete.getMetadataLength();
         boolean result;

+ 94 - 36
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java

@@ -2305,10 +2305,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       throws IOException {
     final List<String> errors = new ArrayList<String>();
     for (int i = 0; i < invalidBlks.length; i++) {
-      final ReplicaInfo removing;
+      final ReplicaInfo info;
       final FsVolumeImpl v;
-      try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
-        final ReplicaInfo info = volumeMap.get(bpid, invalidBlks[i]);
+      try (AutoCloseableLock lock = lockManager.readLock(LockLevel.BLOCK_POOl, bpid)) {
+        info = volumeMap.get(bpid, invalidBlks[i]);
         if (info == null) {
           ReplicaInfo infoByBlockId =
               volumeMap.get(bpid, invalidBlks[i].getBlockId());
@@ -2342,48 +2342,21 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           LOG.warn("Parent directory check failed; replica {} is " +
               "not backed by a local file", info);
         }
-        removing = volumeMap.remove(bpid, invalidBlks[i]);
-        addDeletingBlock(bpid, removing.getBlockId());
-        LOG.debug("Block file {} is to be deleted", removing.getBlockURI());
-        datanode.getMetrics().incrBlocksRemoved(1);
-        if (removing instanceof ReplicaInPipeline) {
-          ((ReplicaInPipeline) removing).releaseAllBytesReserved();
-        }
-      }
-
-      if (v.isTransientStorage()) {
-        RamDiskReplica replicaInfo =
-          ramDiskReplicaTracker.getReplica(bpid, invalidBlks[i].getBlockId());
-        if (replicaInfo != null) {
-          if (!replicaInfo.getIsPersisted()) {
-            datanode.getMetrics().incrRamDiskBlocksDeletedBeforeLazyPersisted();
-          }
-          ramDiskReplicaTracker.discardReplica(replicaInfo.getBlockPoolId(),
-            replicaInfo.getBlockId(), true);
-        }
       }
 
-      // If a DFSClient has the replica in its cache of short-circuit file
-      // descriptors (and the client is using ShortCircuitShm), invalidate it.
-      datanode.getShortCircuitRegistry().processBlockInvalidation(
-                new ExtendedBlockId(invalidBlks[i].getBlockId(), bpid));
-
-      // If the block is cached, start uncaching it.
-      cacheManager.uncacheBlock(bpid, invalidBlks[i].getBlockId());
-
       try {
         if (async) {
           // Delete the block asynchronously to make sure we can do it fast
           // enough.
           // It's ok to unlink the block file before the uncache operation
           // finishes.
-          asyncDiskService.deleteAsync(v.obtainReference(), removing,
+          asyncDiskService.deleteAsync(v.obtainReference(), info,
               new ExtendedBlock(bpid, invalidBlks[i]),
-              dataStorage.getTrashDirectoryForReplica(bpid, removing));
+              dataStorage.getTrashDirectoryForReplica(bpid, info));
         } else {
-          asyncDiskService.deleteSync(v.obtainReference(), removing,
+          asyncDiskService.deleteSync(v.obtainReference(), info,
               new ExtendedBlock(bpid, invalidBlks[i]),
-              dataStorage.getTrashDirectoryForReplica(bpid, removing));
+              dataStorage.getTrashDirectoryForReplica(bpid, info));
         }
       } catch (ClosedChannelException e) {
         LOG.warn("Volume {} is closed, ignore the deletion task for " +
@@ -2422,6 +2395,91 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         block.getStorageUuid());
   }
 
+  /**
+   * Remove the replica from the in-memory replica map.
+   *
+   * @param block the block whose replica should be removed
+   * @param volume the volume the replica is expected to reside on
+   * @return true if the replica was removed from the replica map, false otherwise
+   */
+  boolean removeReplicaFromMem(final ExtendedBlock block, final FsVolumeImpl volume) {
+    final String bpid = block.getBlockPoolId();
+    final Block localBlock = block.getLocalBlock();
+    final long blockId = localBlock.getBlockId();
+    try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
+      final ReplicaInfo info = volumeMap.get(bpid, localBlock);
+      if (info == null) {
+        ReplicaInfo infoByBlockId = volumeMap.get(bpid, blockId);
+        if (infoByBlockId == null) {
+          // It is okay if the block is not found -- it
+          // may have been deleted earlier.
+          LOG.info("Failed to delete replica {}: ReplicaInfo not found " +
+              "in removeReplicaFromMem.", localBlock);
+        } else {
+          LOG.error("Failed to delete replica {}: GenerationStamp not matched, " +
+              "existing replica is {} in removeReplicaFromMem.",
+              localBlock, Block.toString(infoByBlockId));
+        }
+        return false;
+      }
+
+      FsVolumeImpl v = (FsVolumeImpl) info.getVolume();
+      if (v == null) {
+        LOG.error("Failed to delete replica {}. No volume for this replica {} " +
+            "in removeReplicaFromMem.", localBlock, info);
+        return false;
+      }
+
+      try {
+        File blockFile = new File(info.getBlockURI());
+        if (blockFile.getParentFile() == null) {
+          LOG.error("Failed to delete replica {}. Parent not found for block file: {} " +
+              "in removeReplicaFromMem.", localBlock, blockFile);
+          return false;
+        }
+      } catch (IllegalArgumentException e) {
+        LOG.warn("Parent directory check failed; replica {} is " +
+            "not backed by a local file in removeReplicaFromMem.", info);
+      }
+
+      if (!volume.getStorageID().equals(v.getStorageID())) {
+        LOG.error("Failed to delete replica {}. Appear different volumes, oldVolume: {} " +
+            "and newVolume: {} for this replica in removeReplicaFromMem.",
+            localBlock, volume, v);
+        return false;
+      }
+
+      ReplicaInfo removing = volumeMap.remove(bpid, localBlock);
+      addDeletingBlock(bpid, removing.getBlockId());
+      LOG.debug("Block file {} is to be deleted", removing.getBlockURI());
+      datanode.getMetrics().incrBlocksRemoved(1);
+      if (removing instanceof ReplicaInPipeline) {
+        ((ReplicaInPipeline) removing).releaseAllBytesReserved();
+      }
+    }
+
+    if (volume.isTransientStorage()) {
+      RamDiskReplicaTracker.RamDiskReplica replicaInfo = ramDiskReplicaTracker.
+          getReplica(bpid, blockId);
+      if (replicaInfo != null) {
+        if (!replicaInfo.getIsPersisted()) {
+          datanode.getMetrics().incrRamDiskBlocksDeletedBeforeLazyPersisted();
+        }
+        ramDiskReplicaTracker.discardReplica(replicaInfo.getBlockPoolId(),
+            replicaInfo.getBlockId(), true);
+      }
+    }
+
+    // If a DFSClient has the replica in its cache of short-circuit file
+    // descriptors (and the client is using ShortCircuitShm), invalidate it.
+    datanode.getShortCircuitRegistry().processBlockInvalidation(
+        ExtendedBlockId.fromExtendedBlock(block));
+
+    // If the block is cached, start uncaching it.
+    cacheManager.uncacheBlock(bpid, blockId);
+    return true;
+  }
+
   /**
    * Asynchronously attempts to cache a single block via {@link FsDatasetCache}.
    */
@@ -3628,8 +3686,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       }
     }
   }
-  
-  private void addDeletingBlock(String bpid, Long blockId) {
+
+  protected void addDeletingBlock(String bpid, Long blockId) {
     synchronized(deletingBlock) {
       Set<Long> s = deletingBlock.get(bpid);
       if (s == null) {
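
Note the locking change in invalidate(): it now takes only the block-pool read lock, and the write-locked removal moves into removeReplicaFromMem, which runs inside the async delete task. Because that task may execute long after invalidate() queued it, removeReplicaFromMem re-validates everything under the write lock before mutating state: the replica must still exist with a matching generation stamp, and it must still live on the volume the task was queued for. A condensed sketch of that check-then-act pattern (names from the patch, logic abbreviated):

    boolean removeReplicaFromMem(ExtendedBlock block, FsVolumeImpl volume) {
      String bpid = block.getBlockPoolId();
      try (AutoCloseableLock lock =
               lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
        ReplicaInfo info = volumeMap.get(bpid, block.getLocalBlock());
        if (info == null) {
          return false;  // already deleted, or generation stamp mismatch
        }
        FsVolumeImpl v = (FsVolumeImpl) info.getVolume();
        if (v == null || !volume.getStorageID().equals(v.getStorageID())) {
          return false;  // volume gone, or replica moved since the task was queued
        }
        volumeMap.remove(bpid, block.getLocalBlock());
        // ... metrics and deletingBlock bookkeeping ...
      }
      // ... RAM-disk, short-circuit, and cache invalidation outside the lock ...
      return true;
    }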

+ 92 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java

@@ -25,10 +25,13 @@ import java.util.Random;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeoutException;
 import java.util.function.Supplier;
 
 import org.apache.hadoop.fs.DF;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
 import org.apache.hadoop.hdfs.server.datanode.DataSetLockManager;
 import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner;
 import org.apache.hadoop.hdfs.server.datanode.LocalReplica;
@@ -1830,4 +1833,93 @@ public class TestFsDatasetImpl {
       assertEquals(3, metrics.getNativeCopyIoQuantiles().length);
     }
   }
+
+  /**
+   * The block should remain in the replicaMap while the async deletion task is pending.
+   */
+  @Test
+  public void testAysncDiskServiceDeleteReplica()
+      throws IOException, InterruptedException, TimeoutException {
+    HdfsConfiguration config = new HdfsConfiguration();
+    // Bump up replication interval.
+    config.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 10);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(config).numDataNodes(3).build();
+    DistributedFileSystem fs = cluster.getFileSystem();
+    String bpid = cluster.getNamesystem().getBlockPoolId();
+    DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
+    final Semaphore semaphore = new Semaphore(0);
+    try {
+      cluster.waitActive();
+      final DataNodeFaultInjector injector = new DataNodeFaultInjector() {
+        @Override
+        public void delayDeleteReplica() {
+          // Let's wait for the remove replica process.
+          try {
+            semaphore.acquire(1);
+          } catch (InterruptedException e) {
+            // ignore.
+          }
+        }
+      };
+      DataNodeFaultInjector.set(injector);
+
+      // Create file.
+      Path path = new Path("/testfile");
+      DFSTestUtil.createFile(fs, path, 1024, (short) 3, 0);
+      DFSTestUtil.waitReplication(fs, path, (short) 3);
+      LocatedBlock lb = DFSTestUtil.getAllBlocks(fs, path).get(0);
+      ExtendedBlock extendedBlock = lb.getBlock();
+      DatanodeInfo[] loc = lb.getLocations();
+      assertEquals(3, loc.length);
+
+      // DN side.
+      DataNode dn = cluster.getDataNode(loc[0].getIpcPort());
+      final FsDatasetImpl ds = (FsDatasetImpl) DataNodeTestUtils.getFSDataset(dn);
+      List<Block> blockList = Lists.newArrayList(extendedBlock.getLocalBlock());
+      assertNotNull(ds.getStoredBlock(bpid, extendedBlock.getBlockId()));
+      ds.invalidate(bpid, blockList.toArray(new Block[0]));
+
+      // Check the block's locations and datanodes.
+      loc = DFSTestUtil.getAllBlocks(fs, path).get(0).getLocations();
+      assertEquals(3, loc.length);
+      List<String> uuids = Lists.newArrayList();
+      for (DatanodeInfo datanodeInfo : loc) {
+        uuids.add(datanodeInfo.getDatanodeUuid());
+      }
+      assertTrue(uuids.contains(dn.getDatanodeUuid()));
+
+      // Verify that the replica has not been deleted from memory yet:
+      // the namenode still contains this replica, so a client may try to read it.
+      // If the replica were already removed from memory, the client would get a
+      // ReplicaNotFoundException.
+      assertNotNull(ds.getStoredBlock(bpid, extendedBlock.getBlockId()));
+
+      // Release the semaphore so removeReplicaFromMem can proceed.
+      semaphore.release(1);
+
+      // Wait for the async deletion task to finish.
+      GenericTestUtils.waitFor(() ->
+          ds.asyncDiskService.countPendingDeletions() == 0, 100, 1000);
+
+      // Sleep for two heartbeat intervals.
+      Thread.sleep(config.getTimeDuration(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
+          DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT,
+          TimeUnit.SECONDS, TimeUnit.MILLISECONDS) * 2);
+
+      // Check the block's locations and datanodes again.
+      loc = DFSTestUtil.getAllBlocks(fs, path).get(0).getLocations();
+      assertEquals(2, loc.length);
+      uuids = Lists.newArrayList();
+      for (DatanodeInfo datanodeInfo : loc) {
+        uuids.add(datanodeInfo.getDatanodeUuid());
+      }
+      // The namenode no longer contains this replica.
+      assertFalse(uuids.contains(dn.getDatanodeUuid()));
+
+      // This replica has been deleted from the datanode's memory.
+      assertNull(ds.getStoredBlock(bpid, extendedBlock.getBlockId()));
+    } finally {
+      cluster.shutdown();
+      DataNodeFaultInjector.set(oldInjector);
+    }
+  }
 }