
HDFS-16043. Add markedDeleteBlockScrubberThread to delete blocks asynchronously (#3882). Contributed by Xiangyi Zhu.

Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
Author: Xiangyi Zhu
Commit: 6da346a358
18 changed files with 242 additions and 70 deletions
  1. +104 -1   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
  2. +12 -33   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
  3. +16 -0    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
  4. +2 -0     hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlocksScheduledCounter.java
  5. +3 -0     hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSRename.java
  6. +5 -0     hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java
  7. +4 -1     hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java
  8. +17 -1    hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
  9. +2 -0     hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestComputeInvalidateWork.java
  10. +3 -0    hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistLockedMemory.java
  11. +2 -0    hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java
  12. +27 -20  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileTruncate.java
  13. +3 -0    hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLargeDirectoryDelete.java
  14. +2 -0    hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestMetaSave.java
  15. +28 -11  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java
  16. +4 -0    hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java
  17. +2 -0    hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java
  18. +6 -3    hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotDeletion.java

+ 104 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -47,6 +47,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 import java.util.concurrent.atomic.AtomicLong;
 import javax.management.ObjectName;
@@ -190,6 +191,9 @@ public class BlockManager implements BlockStatsMXBean {
   private volatile long lowRedundancyBlocksCount = 0L;
   private volatile long scheduledReplicationBlocksCount = 0L;
 
+  private final long deleteBlockLockTimeMs = 500;
+  private final long deleteBlockUnlockIntervalTimeMs = 100;
+
   /** flag indicating whether replication queues have been initialized */
   private boolean initializedReplQueues;
 
@@ -323,6 +327,12 @@ public class BlockManager implements BlockStatsMXBean {
    * {@link #redundancyThread} has run at least one full iteration.
    */
   private final AtomicLong lastRedundancyCycleTS = new AtomicLong(-1);
+  /**
+   * markedDeleteBlockScrubber thread for handling async delete blocks.
+   */
+  private final Daemon markedDeleteBlockScrubberThread =
+      new Daemon(new MarkedDeleteBlockScrubber());
+
   /** Block report thread for handling async reports. */
   private final BlockReportProcessingThread blockReportThread;
 
@@ -421,6 +431,12 @@ public class BlockManager implements BlockStatsMXBean {
    */
   private int numBlocksPerIteration;
 
+  /**
+   * The blocks of deleted files are put into the queue,
+   * and the cleanup thread processes these blocks periodically.
+   */
+  private final ConcurrentLinkedQueue<List<BlockInfo>> markedDeleteQueue;
+
   /**
    * Progress of the Reconstruction queues initialisation.
    */
@@ -474,7 +490,7 @@ public class BlockManager implements BlockStatsMXBean {
         datanodeManager.getBlockInvalidateLimit(),
         startupDelayBlockDeletionInMs,
         blockIdManager);
-
+    markedDeleteQueue = new ConcurrentLinkedQueue<>();
     // Compute the map capacity by allocating 2% of total memory
     blocksMap = new BlocksMap(
         LightWeightGSet.computeCapacity(2.0, "BlocksMap"));
@@ -724,6 +740,9 @@ public class BlockManager implements BlockStatsMXBean {
     datanodeManager.activate(conf);
     this.redundancyThread.setName("RedundancyMonitor");
     this.redundancyThread.start();
+    this.markedDeleteBlockScrubberThread.
+        setName("MarkedDeleteBlockScrubberThread");
+    this.markedDeleteBlockScrubberThread.start();
     this.blockReportThread.start();
     mxBeanName = MBeans.register("NameNode", "BlockStats", this);
     bmSafeMode.activate(blockTotal);
@@ -737,8 +756,10 @@ public class BlockManager implements BlockStatsMXBean {
     try {
       redundancyThread.interrupt();
       blockReportThread.interrupt();
+      markedDeleteBlockScrubberThread.interrupt();
       redundancyThread.join(3000);
       blockReportThread.join(3000);
+      markedDeleteBlockScrubberThread.join(3000);
     } catch (InterruptedException ie) {
     }
     datanodeManager.close();
@@ -4877,6 +4898,77 @@ public class BlockManager implements BlockStatsMXBean {
     return lastRedundancyCycleTS.get();
   }
 
+  /**
+   * Periodically deletes the marked block.
+   */
+  private class MarkedDeleteBlockScrubber implements Runnable {
+    private Iterator<BlockInfo> toDeleteIterator = null;
+    private boolean isSleep;
+    private NameNodeMetrics metrics;
+
+    private void remove(long time) {
+      if (checkToDeleteIterator()) {
+        namesystem.writeLock();
+        try {
+          while (toDeleteIterator.hasNext()) {
+            removeBlock(toDeleteIterator.next());
+            metrics.decrPendingDeleteBlocksCount();
+            if (Time.monotonicNow() - time > deleteBlockLockTimeMs) {
+              isSleep = true;
+              break;
+            }
+          }
+        } finally {
+          namesystem.writeUnlock();
+        }
+      }
+    }
+
+    private boolean checkToDeleteIterator() {
+      return toDeleteIterator != null && toDeleteIterator.hasNext();
+    }
+
+    @Override
+    public void run() {
+      LOG.info("Start MarkedDeleteBlockScrubber thread");
+      while (namesystem.isRunning() &&
+          !Thread.currentThread().isInterrupted()) {
+        if (!markedDeleteQueue.isEmpty() || checkToDeleteIterator()) {
+          try {
+            metrics = NameNode.getNameNodeMetrics();
+            metrics.setDeleteBlocksQueued(markedDeleteQueue.size());
+            isSleep = false;
+            long startTime = Time.monotonicNow();
+            remove(startTime);
+            while (!isSleep && !markedDeleteQueue.isEmpty() &&
+                !Thread.currentThread().isInterrupted()) {
+              List<BlockInfo> markedDeleteList = markedDeleteQueue.poll();
+              if (markedDeleteList != null) {
+                toDeleteIterator = markedDeleteList.listIterator();
+              }
+              remove(startTime);
+            }
+          } catch (Exception e){
+            LOG.warn("MarkedDeleteBlockScrubber encountered an exception" +
+                " during the block deletion process, " +
+                " the deletion of the block will retry in {} millisecond.",
+                deleteBlockUnlockIntervalTimeMs, e);
+          }
+        }
+        if (isSleep) {
+          LOG.debug("Clear markedDeleteQueue over {}" +
+              " millisecond to release the write lock", deleteBlockLockTimeMs);
+        }
+        try {
+          Thread.sleep(deleteBlockUnlockIntervalTimeMs);
+        } catch (InterruptedException e) {
+          LOG.info("Stopping MarkedDeleteBlockScrubber.");
+          break;
+        }
+      }
+    }
+  }
+
   /**
    * Periodically calls computeBlockRecoveryWork().
    */
@@ -5223,6 +5315,17 @@ public class BlockManager implements BlockStatsMXBean {
     return blockIdManager;
   }
 
+  @VisibleForTesting
+  public ConcurrentLinkedQueue<List<BlockInfo>> getMarkedDeleteQueue() {
+    return markedDeleteQueue;
+  }
+
+  public void addBLocksToMarkedDeleteQueue(List<BlockInfo> blockInfos) {
+    markedDeleteQueue.add(blockInfos);
+    NameNode.getNameNodeMetrics().
+        incrPendingDeleteBlocksCount(blockInfos.size());
+  }
+
   public long nextGenerationStamp(boolean legacyBlock) throws IOException {
     return blockIdManager.nextGenerationStamp(legacyBlock);
   }
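
Note (not part of the patch): the scrubber above caps each write-lock hold at deleteBlockLockTimeMs (500 ms) and backs off for deleteBlockUnlockIntervalTimeMs (100 ms) before reacquiring the lock, so large deletes no longer pin the namesystem lock from inside the delete RPC. The stand-alone Java sketch below illustrates the same enqueue-and-drain pattern with simplified, hypothetical names (AsyncBlockDeleter, markForDeletion, removeBlock); the real scrubber additionally drains several queued lists per lock hold and updates metrics.

    import java.util.Iterator;
    import java.util.List;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class AsyncBlockDeleter {
      private static final long LOCK_TIME_MS = 500;   // cf. deleteBlockLockTimeMs
      private static final long SLEEP_MS = 100;       // cf. deleteBlockUnlockIntervalTimeMs

      private final ConcurrentLinkedQueue<List<Long>> markedDeleteQueue =
          new ConcurrentLinkedQueue<>();
      private final ReentrantReadWriteLock nsLock = new ReentrantReadWriteLock();

      /** Producer side: a delete operation enqueues its block list and returns. */
      public void markForDeletion(List<Long> blockIds) {
        markedDeleteQueue.add(blockIds);
      }

      /** Consumer side: run in a daemon thread, e.g. new Thread(this::scrub). */
      public void scrub() {
        Iterator<Long> pending = null;      // carries leftovers across lock releases
        while (!Thread.currentThread().isInterrupted()) {
          if (pending == null || !pending.hasNext()) {
            List<Long> next = markedDeleteQueue.poll();
            pending = (next == null) ? null : next.iterator();
          }
          if (pending != null) {
            long start = System.nanoTime();
            nsLock.writeLock().lock();      // plays the role of namesystem.writeLock()
            try {
              while (pending.hasNext()) {
                removeBlock(pending.next());
                if ((System.nanoTime() - start) / 1_000_000 > LOCK_TIME_MS) {
                  break;                    // let other lock waiters in
                }
              }
            } finally {
              nsLock.writeLock().unlock();
            }
          }
          try {
            Thread.sleep(SLEEP_MS);         // back off before reacquiring the lock
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
          }
        }
      }

      private void removeBlock(long blockId) {
        // stand-in for BlockManager#removeBlock(BlockInfo)
      }
    }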

+ 12 - 33
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -2277,8 +2277,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       }
       getEditLog().logSync();
       if (!toRemoveBlocks.getToDeleteList().isEmpty()) {
-        removeBlocks(toRemoveBlocks);
-        toRemoveBlocks.clear();
+        blockManager.addBLocksToMarkedDeleteQueue(
+            toRemoveBlocks.getToDeleteList());
       }
       logAuditEvent(true, operationName, src, null, r.getFileStatus());
     } catch (AccessControlException e) {
@@ -2717,8 +2717,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       if (!skipSync) {
         getEditLog().logSync();
         if (toRemoveBlocks != null) {
-          removeBlocks(toRemoveBlocks);
-          toRemoveBlocks.clear();
+          blockManager.addBLocksToMarkedDeleteQueue(
+              toRemoveBlocks.getToDeleteList());
         }
       }
     }
@@ -3236,8 +3236,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
     BlocksMapUpdateInfo collectedBlocks = res.collectedBlocks;
     if (!collectedBlocks.getToDeleteList().isEmpty()) {
-      removeBlocks(collectedBlocks);
-      collectedBlocks.clear();
+      blockManager.addBLocksToMarkedDeleteQueue(
+          collectedBlocks.getToDeleteList());
     }
 
     logAuditEvent(true, operationName + " (options=" +
@@ -3276,7 +3276,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     getEditLog().logSync();
     logAuditEvent(ret, operationName, src);
     if (toRemovedBlocks != null) {
-      removeBlocks(toRemovedBlocks); // Incremental deletion of blocks
+      blockManager.addBLocksToMarkedDeleteQueue(
+          toRemovedBlocks.getToDeleteList());
     }
     return ret;
   }
@@ -3286,30 +3287,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     return dir.getPermissionChecker();
   }
 
-  /**
-   * From the given list, incrementally remove the blocks from blockManager
-   * Writelock is dropped and reacquired every BLOCK_DELETION_INCREMENT to
-   * ensure that other waiters on the lock can get in. See HDFS-2938
-   * 
-   * @param blocks
-   *          An instance of {@link BlocksMapUpdateInfo} which contains a list
-   *          of blocks that need to be removed from blocksMap
-   */
-  void removeBlocks(BlocksMapUpdateInfo blocks) {
-    List<BlockInfo> toDeleteList = blocks.getToDeleteList();
-    Iterator<BlockInfo> iter = toDeleteList.iterator();
-    while (iter.hasNext()) {
-      writeLock();
-      try {
-        for (int i = 0; i < blockDeletionIncrement && iter.hasNext(); i++) {
-          blockManager.removeBlock(iter.next());
-        }
-      } finally {
-        writeUnlock("removeBlocks");
-      }
-    }
-  }
-  
   /**
    * Remove leases and inodes related to a given path
    * @param removedUCFiles INodes whose leases need to be released
@@ -4508,7 +4485,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
                   INodesInPath.fromINode((INodeFile) bc), false);
           changed |= toRemoveBlocks != null;
           if (toRemoveBlocks != null) {
-            removeBlocks(toRemoveBlocks); // Incremental deletion of blocks
+            blockManager.addBLocksToMarkedDeleteQueue(
+                toRemoveBlocks.getToDeleteList());
           }
         }
       } finally {
@@ -7170,7 +7148,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     // Breaking the pattern as removing blocks have to happen outside of the
     // global lock
     if (blocksToBeDeleted != null) {
-      removeBlocks(blocksToBeDeleted);
+      blockManager.addBLocksToMarkedDeleteQueue(
+          blocksToBeDeleted.getToDeleteList());
     }
     logAuditEvent(true, operationName, rootPath, null, null);
   }

+ 16 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java

@@ -87,6 +87,10 @@ public class NameNodeMetrics {
   MutableCounterLong blockOpsBatched;
   @Metric("Number of pending edits")
   MutableGaugeInt pendingEditsCount;
+  @Metric("Number of delete blocks Queued")
+  MutableGaugeInt deleteBlocksQueued;
+  @Metric("Number of pending deletion blocks")
+  MutableGaugeInt pendingDeleteBlocksCount;
 
   @Metric("Number of file system operations")
   public long totalFileOps(){
@@ -334,6 +338,18 @@ public class NameNodeMetrics {
     blockOpsQueued.set(size);
   }
 
+  public void setDeleteBlocksQueued(int size) {
+    deleteBlocksQueued.set(size);
+  }
+
+  public void incrPendingDeleteBlocksCount(int size) {
+    pendingDeleteBlocksCount.incr(size);
+  }
+
+  public void decrPendingDeleteBlocksCount() {
+    pendingDeleteBlocksCount.decr();
+  }
+
   public void addBlockOpsBatched(int count) {
     blockOpsBatched.incr(count);
   }
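
Note (not part of the patch): the two gauges above expose the backlog of asynchronously deleted blocks. A hedged sketch of checking them from a test with MetricsAsserts, assuming the usual metrics2 conventions (record name NameNodeActivity, fields surfaced with leading capitals as DeleteBlocksQueued and PendingDeleteBlocksCount); verify the exact names against your build.

    import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
    import static org.apache.hadoop.test.MetricsAsserts.getMetrics;

    import org.apache.hadoop.metrics2.MetricsRecordBuilder;

    public class PendingDeleteMetricsCheck {
      // Record name used by existing NameNode metrics tests (assumed here).
      private static final String NN_METRICS = "NameNodeActivity";

      static void assertDeleteBacklogDrained() {
        MetricsRecordBuilder rb = getMetrics(NN_METRICS);
        assertGauge("DeleteBlocksQueued", 0, rb);        // from deleteBlocksQueued
        assertGauge("PendingDeleteBlocksCount", 0, rb);  // from pendingDeleteBlocksCount
      }
    }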

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlocksScheduledCounter.java

@@ -190,6 +190,8 @@ public class TestBlocksScheduledCounter {
 
       // 4. delete the file
       dfs.delete(filePath, true);
+      BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
+          cluster.getNamesystem(0).getBlockManager());
       int blocksScheduled = 0;
       for (DatanodeDescriptor descriptor : dnList) {
         if (descriptor.getBlocksScheduled() != 0) {

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSRename.java

@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -161,6 +162,8 @@ public class TestDFSRename {
       assertTrue(bm.getStoredBlock(lbs.getLocatedBlocks().get(0).getBlock().
           getLocalBlock()) != null);
       dfs.rename(srcPath, dstPath, Rename.OVERWRITE);
+      BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
+          cluster.getNamesystem(0).getBlockManager());
       assertTrue(bm.getStoredBlock(lbs.getLocatedBlocks().get(0).getBlock().
           getLocalBlock()) == null);
       

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java

@@ -75,6 +75,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
@@ -1350,6 +1351,8 @@ public class TestFileCreation {
       assertBlocks(bm, oldBlocks, true);
       
       out = dfs.create(filePath, true);
+      BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
+          cluster.getNamesystem(0).getBlockManager());
       byte[] newData = AppendTestUtil.randomBytes(seed, fileSize);
       try {
         out.write(newData);
@@ -1357,6 +1360,8 @@ public class TestFileCreation {
         out.close();
       }
       dfs.deleteOnExit(filePath);
+      BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
+          cluster.getNamesystem(0).getBlockManager());
       
       LocatedBlocks newBlocks = NameNodeAdapter.getBlockLocations(
           nn, file, 0, fileSize);

+ 4 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java

@@ -23,6 +23,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
@@ -127,7 +128,7 @@ public class TestReadStripedFileWithDecoding {
   }
 
   @Test
-  public void testInvalidateBlock() throws IOException {
+  public void testInvalidateBlock() throws IOException, InterruptedException {
     final Path file = new Path("/invalidate");
     final int length = 10;
     final byte[] bytes = StripedFileTestUtil.generateBytes(length);
@@ -151,6 +152,8 @@ public class TestReadStripedFileWithDecoding {
     try {
       // delete the file
       dfs.delete(file, true);
+      BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
+          cluster.getNamesystem().getBlockManager());
       // check the block is added to invalidateBlocks
       final FSNamesystem fsn = cluster.getNamesystem();
       final BlockManager bm = fsn.getBlockManager();

+ 17 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java

@@ -39,6 +39,9 @@ import org.junit.Assert;
 import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
 
 public class BlockManagerTestUtil {
+
+  static final long SLEEP_TIME = 1000;
+
   public static void setNodeReplicationLimit(final BlockManager blockManager,
       final int limit) {
     blockManager.maxReplicationStreams = limit;
@@ -178,7 +181,20 @@ public class BlockManagerTestUtil {
    */
   public static  CorruptReplicasMap getCorruptReplicas(final BlockManager blockManager){
     return blockManager.corruptReplicas;
-    
+
+  }
+
+  /**
+   * Wait for the processing of the marked deleted block to complete.
+   */
+  public static void waitForMarkedDeleteQueueIsEmpty(
+      BlockManager blockManager) throws InterruptedException {
+    while (true) {
+      if (blockManager.getMarkedDeleteQueue().isEmpty()) {
+        return;
+      }
+      Thread.sleep(SLEEP_TIME);
+    }
   }
 
   /**
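
Note (not part of the patch): because deletes are now applied asynchronously, tests that assert on block counts right after a delete must first wait for the scrubber to drain the queue, which is what this helper provides. A minimal sketch of the call pattern used throughout the test changes below; it assumes the deleted path held the only blocks in a running MiniDFSCluster.

    import static org.junit.Assert.assertEquals;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DistributedFileSystem;
    import org.apache.hadoop.hdfs.MiniDFSCluster;
    import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;

    public class AsyncDeleteTestPattern {
      /** Delete a path, wait for the async scrubber, then assert on block counts. */
      static void deleteAndAwait(MiniDFSCluster cluster, DistributedFileSystem dfs,
          Path path) throws Exception {
        dfs.delete(path, true);
        // Without this wait, the assertion races the MarkedDeleteBlockScrubberThread.
        BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
            cluster.getNamesystem(0).getBlockManager());
        // Assumes the deleted path held the cluster's only blocks.
        assertEquals(0, cluster.getNamesystem(0).getBlocksTotal());
      }
    }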

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestComputeInvalidateWork.java

@@ -253,6 +253,8 @@ public class TestComputeInvalidateWork {
     }
     dfs.delete(path, false);
     dfs.delete(ecFile, false);
+    BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
+        cluster.getNamesystem(0).getBlockManager());
     namesystem.writeLock();
     InvalidateBlocks invalidateBlocks;
     int totalStripedDataBlocks = totalBlockGroups * (ecPolicy.getNumDataUnits()

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistLockedMemory.java

@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSOutputStream;
 import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -175,6 +176,8 @@ public class TestLazyPersistLockedMemory extends LazyPersistTestCase {
 
     // Delete the file and ensure locked RAM goes to zero.
     fs.delete(path, false);
+    BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
+        cluster.getNamesystem(0).getBlockManager());
     DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0));
     waitForLockedBytesUsed(fsd, 0);
   }

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java

@@ -445,6 +445,8 @@ public class TestDecommissioningStatus {
     // Delete the under-replicated file, which should let the 
     // DECOMMISSION_IN_PROGRESS node become DECOMMISSIONED
     AdminStatesBaseTest.cleanupFile(fileSys, f);
+    BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
+        cluster.getNamesystem(0).getBlockManager());
     BlockManagerTestUtil.recheckDecommissionState(dm);
     // Block until the admin's monitor updates the number of tracked nodes.
     waitForDecommissionedNodes(dm.getDatanodeAdminManager(), 0);

+ 27 - 20
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileTruncate.java

@@ -33,6 +33,7 @@ import static org.junit.Assert.fail;
 import java.io.IOException;
 import java.util.concurrent.ThreadLocalRandom;
 
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.test.LambdaTestUtils;
@@ -321,7 +322,8 @@ public class TestFileTruncate {
   }
 
   @Test
-  public void testSnapshotWithAppendTruncate() throws IOException {
+  public void testSnapshotWithAppendTruncate()
+      throws IOException, InterruptedException {
     testSnapshotWithAppendTruncate(0, 1, 2);
     testSnapshotWithAppendTruncate(0, 2, 1);
     testSnapshotWithAppendTruncate(1, 0, 2);
@@ -335,7 +337,8 @@ public class TestFileTruncate {
    * Delete snapshots in the specified order and verify that
    * remaining snapshots are still readable.
    */
-  void testSnapshotWithAppendTruncate(int ... deleteOrder) throws IOException {
+  void testSnapshotWithAppendTruncate(int... deleteOrder)
+      throws IOException, InterruptedException {
     FSDirectory fsDir = cluster.getNamesystem().getFSDirectory();
     fs.mkdirs(parent);
     fs.setQuota(parent, 100, 1000);
@@ -383,16 +386,16 @@ public class TestFileTruncate {
     // Truncate to block boundary
     int newLength = length[0] + BLOCK_SIZE / 2;
     boolean isReady = fs.truncate(src, newLength);
+    BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
+        cluster.getNamesystem(0).getBlockManager());
     assertTrue("Recovery is not expected.", isReady);
     assertFileLength(snapshotFiles[2], length[2]);
     assertFileLength(snapshotFiles[1], length[1]);
     assertFileLength(snapshotFiles[0], length[0]);
     assertBlockNotPresent(appendedBlk);
-
     // Diskspace consumed should be 16 bytes * 3. [blk 1,2,3 SS:4]
     contentSummary = fs.getContentSummary(parent);
     assertThat(contentSummary.getSpaceConsumed(), is(48L));
-
     // Truncate full block again
     newLength = length[0] - BLOCK_SIZE / 2;
     isReady = fs.truncate(src, newLength);
@@ -400,11 +403,9 @@ public class TestFileTruncate {
     assertFileLength(snapshotFiles[2], length[2]);
     assertFileLength(snapshotFiles[1], length[1]);
     assertFileLength(snapshotFiles[0], length[0]);
-
     // Diskspace consumed should be 16 bytes * 3. [blk 1,2 SS:3,4]
     contentSummary = fs.getContentSummary(parent);
     assertThat(contentSummary.getSpaceConsumed(), is(48L));
-
     // Truncate half of the last block
     newLength -= BLOCK_SIZE / 2;
     isReady = fs.truncate(src, newLength);
@@ -415,15 +416,12 @@ public class TestFileTruncate {
     assertFileLength(snapshotFiles[0], length[0]);
     Block replacedBlk = getLocatedBlocks(src).getLastLocatedBlock()
         .getBlock().getLocalBlock();
-
     // Diskspace consumed should be 16 bytes * 3. [blk 1,6 SS:2,3,4]
     contentSummary = fs.getContentSummary(parent);
     assertThat(contentSummary.getSpaceConsumed(), is(54L));
-
     snapshotDir = fs.createSnapshot(parent, ss[3]);
     snapshotFiles[3] = new Path(snapshotDir, truncateFile);
     length[3] = newLength;
-
     // Delete file. Should still be able to read snapshots
     int numINodes = fsDir.getInodeMapSize();
     isReady = fs.delete(src, false);
@@ -434,17 +432,15 @@ public class TestFileTruncate {
     assertFileLength(snapshotFiles[0], length[0]);
     assertEquals("Number of INodes should not change",
         numINodes, fsDir.getInodeMapSize());
-
     fs.deleteSnapshot(parent, ss[3]);
-
+    BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
+        cluster.getNamesystem(0).getBlockManager());
     assertBlockExists(firstBlk);
     assertBlockExists(lastBlk);
     assertBlockNotPresent(replacedBlk);
-
     // Diskspace consumed should be 16 bytes * 3. [SS:1,2,3,4]
     contentSummary = fs.getContentSummary(parent);
     assertThat(contentSummary.getSpaceConsumed(), is(48L));
-
     // delete snapshots in the specified order
     fs.deleteSnapshot(parent, ss[deleteOrder[0]]);
     assertFileLength(snapshotFiles[deleteOrder[1]], length[deleteOrder[1]]);
@@ -453,12 +449,12 @@ public class TestFileTruncate {
     assertBlockExists(lastBlk);
     assertEquals("Number of INodes should not change",
         numINodes, fsDir.getInodeMapSize());
-
     // Diskspace consumed should be 16 bytes * 3. [SS:1,2,3,4]
     contentSummary = fs.getContentSummary(parent);
     assertThat(contentSummary.getSpaceConsumed(), is(48L));
-
     fs.deleteSnapshot(parent, ss[deleteOrder[1]]);
+    BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
+        cluster.getNamesystem(0).getBlockManager());
     assertFileLength(snapshotFiles[deleteOrder[2]], length[deleteOrder[2]]);
     assertBlockExists(firstBlk);
     contentSummary = fs.getContentSummary(parent);
@@ -472,11 +468,11 @@ public class TestFileTruncate {
     }
     assertEquals("Number of INodes should not change",
         numINodes, fsDir .getInodeMapSize());
-
     fs.deleteSnapshot(parent, ss[deleteOrder[2]]);
+    BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
+        cluster.getNamesystem(0).getBlockManager());
     assertBlockNotPresent(firstBlk);
     assertBlockNotPresent(lastBlk);
-
     // Diskspace consumed should be 0 bytes * 3. []
     contentSummary = fs.getContentSummary(parent);
     assertThat(contentSummary.getSpaceConsumed(), is(0L));
@@ -490,7 +486,8 @@ public class TestFileTruncate {
    * remaining snapshots are still readable.
    */
   @Test
-  public void testSnapshotWithTruncates() throws IOException {
+  public void testSnapshotWithTruncates()
+      throws IOException, InterruptedException {
     testSnapshotWithTruncates(0, 1, 2);
     testSnapshotWithTruncates(0, 2, 1);
     testSnapshotWithTruncates(1, 0, 2);
@@ -499,7 +496,8 @@ public class TestFileTruncate {
     testSnapshotWithTruncates(2, 1, 0);
   }
 
-  void testSnapshotWithTruncates(int ... deleteOrder) throws IOException {
+  void testSnapshotWithTruncates(int... deleteOrder)
+      throws IOException, InterruptedException {
     fs.mkdirs(parent);
     fs.setQuota(parent, 100, 1000);
     fs.allowSnapshot(parent);
@@ -546,6 +544,8 @@ public class TestFileTruncate {
     assertThat(contentSummary.getSpaceConsumed(), is(42L));
 
     fs.deleteSnapshot(parent, ss[deleteOrder[0]]);
+    BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
+        cluster.getNamesystem(0).getBlockManager());
     assertFileLength(snapshotFiles[deleteOrder[1]], length[deleteOrder[1]]);
     assertFileLength(snapshotFiles[deleteOrder[2]], length[deleteOrder[2]]);
     assertFileLength(src, length[2]);
@@ -563,6 +563,8 @@ public class TestFileTruncate {
     }
 
     fs.deleteSnapshot(parent, ss[deleteOrder[1]]);
+    BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
+        cluster.getNamesystem(0).getBlockManager());
     assertFileLength(snapshotFiles[deleteOrder[2]], length[deleteOrder[2]]);
     assertFileLength(src, length[2]);
     assertBlockExists(firstBlk);
@@ -583,6 +585,8 @@ public class TestFileTruncate {
     }
 
     fs.deleteSnapshot(parent, ss[deleteOrder[2]]);
+    BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
+        cluster.getNamesystem(0).getBlockManager());
     assertFileLength(src, length[2]);
     assertBlockExists(firstBlk);
 
@@ -592,6 +596,8 @@ public class TestFileTruncate {
     assertThat(contentSummary.getLength(), is(6L));
 
     fs.delete(src, false);
+    BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
+        cluster.getNamesystem().getBlockManager());
     assertBlockNotPresent(firstBlk);
 
     // Diskspace consumed should be 0 bytes * 3. []
@@ -1258,7 +1264,8 @@ public class TestFileTruncate {
         cluster.getNamesystem().getFSDirectory().getBlockManager()
             .getTotalBlocks());
     fs.delete(p, true);
-
+    BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
+        cluster.getNamesystem().getBlockManager());
     assertEquals("block num should 0", 0,
         cluster.getNamesystem().getFSDirectory().getBlockManager()
             .getTotalBlocks());

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLargeDirectoryDelete.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode;
 import java.io.IOException;
 import java.util.Random;
 
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -140,6 +141,8 @@ public class TestLargeDirectoryDelete {
     
     final long start = Time.now();
     mc.getFileSystem().delete(new Path("/root"), true); // recursive delete
+    BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
+        mc.getNamesystem(0).getBlockManager());
     final long end = Time.now();
     threads[0].endThread();
     threads[1].endThread();

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestMetaSave.java

@@ -139,6 +139,8 @@ public class TestMetaSave {
     nnRpc.delete("/filestatus0", true);
     nnRpc.delete("/filestatus1", true);
 
+    BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
+        cluster.getNamesystem().getBlockManager());
     nnRpc.metaSave("metasaveAfterDelete.out.txt");
 
     // Verification

+ 28 - 11
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java

@@ -1031,7 +1031,8 @@ public class TestNameNodeMXBean {
   @Test
   public void testTotalBlocksMetrics() throws Exception {
     MiniDFSCluster cluster = null;
-    FSNamesystem namesystem = null;
+    FSNamesystem activeNn = null;
+    FSNamesystem standbyNn = null;
     DistributedFileSystem fs = null;
     try {
       Configuration conf = new HdfsConfiguration();
@@ -1046,12 +1047,16 @@ public class TestNameNodeMXBean {
       conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
 
       cluster = new MiniDFSCluster.Builder(conf)
-          .numDataNodes(totalSize).build();
-      namesystem = cluster.getNamesystem();
-      fs = cluster.getFileSystem();
+          .nnTopology(MiniDFSNNTopology.simpleHAFederatedTopology(1)).
+              numDataNodes(totalSize).build();
+      cluster.waitActive();
+      cluster.transitionToActive(0);
+      activeNn = cluster.getNamesystem(0);
+      standbyNn = cluster.getNamesystem(1);
+      fs = cluster.getFileSystem(0);
       fs.enableErasureCodingPolicy(
           StripedFileTestUtil.getDefaultECPolicy().getName());
-      verifyTotalBlocksMetrics(0L, 0L, namesystem.getTotalBlocks());
+      verifyTotalBlocksMetrics(0L, 0L, activeNn.getTotalBlocks());
 
       // create small file
       Path replDirPath = new Path("/replicated");
@@ -1068,7 +1073,7 @@ public class TestNameNodeMXBean {
       final int smallLength = cellSize * dataBlocks;
       final byte[] smallBytes = StripedFileTestUtil.generateBytes(smallLength);
       DFSTestUtil.writeFile(fs, ecFileSmall, smallBytes);
-      verifyTotalBlocksMetrics(1L, 1L, namesystem.getTotalBlocks());
+      verifyTotalBlocksMetrics(1L, 1L, activeNn.getTotalBlocks());
 
       // create learge file
       Path replFileLarge = new Path(replDirPath, "replfile_large");
@@ -1079,15 +1084,20 @@ public class TestNameNodeMXBean {
       final int largeLength = blockSize * totalSize + smallLength;
       final byte[] largeBytes = StripedFileTestUtil.generateBytes(largeLength);
       DFSTestUtil.writeFile(fs, ecFileLarge, largeBytes);
-      verifyTotalBlocksMetrics(3L, 3L, namesystem.getTotalBlocks());
+      verifyTotalBlocksMetrics(3L, 3L, activeNn.getTotalBlocks());
 
       // delete replicated files
       fs.delete(replDirPath, true);
-      verifyTotalBlocksMetrics(0L, 3L, namesystem.getTotalBlocks());
+      BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
+          cluster.getNamesystem(0).getBlockManager());
+      verifyTotalBlocksMetrics(0L, 3L, activeNn.getTotalBlocks());
 
       // delete ec files
       fs.delete(ecDirPath, true);
-      verifyTotalBlocksMetrics(0L, 0L, namesystem.getTotalBlocks());
+      BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
+          cluster.getNamesystem(0).getBlockManager());
+      verifyTotalBlocksMetrics(0L, 0L, activeNn.getTotalBlocks());
+      verifyTotalBlocksMetrics(0L, 0L, standbyNn.getTotalBlocks());
     } finally {
       if (fs != null) {
         try {
@@ -1096,9 +1106,16 @@ public class TestNameNodeMXBean {
           throw e;
         }
       }
-      if (namesystem != null) {
+      if (activeNn != null) {
+        try {
+          activeNn.close();
+        } catch (Exception e) {
+          throw e;
+        }
+      }
+      if (standbyNn != null) {
         try {
-          namesystem.close();
+          standbyNn.close();
         } catch (Exception e) {
           throw e;
         }

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java

@@ -345,6 +345,8 @@ public class TestHASafeMode {
     // once it starts up
     banner("Removing the blocks without rolling the edit log");
     fs.delete(new Path("/test"), true);
+    BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
+        cluster.getNamesystem(0).getBlockManager());
     BlockManagerTestUtil.computeAllPendingWork(
         nn0.getNamesystem().getBlockManager());
     cluster.triggerHeartbeats();
@@ -384,6 +386,8 @@ public class TestHASafeMode {
     // ACKed when due to block removals.
     banner("Removing the blocks without rolling the edit log");
     fs.delete(new Path("/test"), true);
+    BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
+        cluster.getNamesystem(0).getBlockManager());
     BlockManagerTestUtil.computeAllPendingWork(
         nn0.getNamesystem().getBlockManager());
     

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java

@@ -661,6 +661,8 @@ public class TestNameNodeMetrics {
     // verify ExcessBlocks metric is decremented and
     // excessReplicateMap is cleared after deleting a file
     fs.delete(file, true);
+    BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
+        cluster.getNamesystem().getBlockManager());
     rb = getMetrics(NS_METRICS);
     assertGauge("ExcessBlocks", 0L, rb);
     assertEquals(0L, bm.getExcessBlocksCount());

+ 6 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotDeletion.java

@@ -44,6 +44,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.INode;
@@ -1128,7 +1129,8 @@ public class TestSnapshotDeletion {
   }
 
   @Test
-  public void testCorrectNumberOfBlocksAfterRestart() throws IOException {
+  public void testCorrectNumberOfBlocksAfterRestart()
+      throws IOException, InterruptedException {
     final Path foo = new Path("/foo");
     final Path bar = new Path(foo, "bar");
     final Path file = new Path(foo, "file");
@@ -1149,9 +1151,10 @@ public class TestSnapshotDeletion {
     hdfs.delete(bar, true);
     hdfs.delete(foo, true);
 
-    long numberOfBlocks = cluster.getNamesystem().getBlocksTotal();
     cluster.restartNameNode(0);
-    assertEquals(numberOfBlocks, cluster.getNamesystem().getBlocksTotal());
+    BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
+        cluster.getNamesystem().getBlockManager());
+    assertEquals(0, cluster.getNamesystem().getBlocksTotal());
   }
 
   /*