Browse Source

HDFS-8674. Improve performance of postponed block scans. Contributed by Daryn Sharp.

(cherry picked from commit 0d8a35bd6de5d2a5a9b816ca98f31975e94bd7c6)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
Kihwal Lee 8 years ago
parent
commit
dd4acebb41

+ 24 - 51
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -29,6 +29,7 @@ import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -143,7 +144,6 @@ public class BlockManager implements BlockStatsMXBean {
   private boolean initializedReplQueues;
 
   private final AtomicLong excessBlocksCount = new AtomicLong(0L);
-  private final AtomicLong postponedMisreplicatedBlocksCount = new AtomicLong(0L);
   private final long startupDelayBlockDeletionInMs;
   private final BlockReportLeaseManager blockReportLeaseManager;
   private ObjectName mxBeanName;
@@ -178,7 +178,7 @@ public class BlockManager implements BlockStatsMXBean {
   }
   /** Used by metrics */
   public long getPostponedMisreplicatedBlocksCount() {
-    return postponedMisreplicatedBlocksCount.get();
+    return postponedMisreplicatedBlocks.size();
   }
   /** Used by metrics */
   public int getPendingDataNodeMessageCount() {
@@ -218,8 +218,10 @@ public class BlockManager implements BlockStatsMXBean {
    * notified of all block deletions that might have been pending
    * when the failover happened.
    */
-  private final LightWeightHashSet<Block> postponedMisreplicatedBlocks =
-      new LightWeightHashSet<>();
+  private final LinkedHashSet<Block> postponedMisreplicatedBlocks =
+      new LinkedHashSet<Block>();
+  private final int blocksPerPostpondedRescan;
+  private final ArrayList<Block> rescannedMisreplicatedBlocks;
 
   /**
    * Maps a StorageID to the set of blocks that are "extra" for this
@@ -316,6 +318,10 @@ public class BlockManager implements BlockStatsMXBean {
     datanodeManager = new DatanodeManager(this, namesystem, conf);
     heartbeatManager = datanodeManager.getHeartbeatManager();
 
+    blocksPerPostpondedRescan = (int)Math.min(Integer.MAX_VALUE,
+        datanodeManager.getBlocksPerPostponedMisreplicatedBlocksRescan());
+    rescannedMisreplicatedBlocks =
+        new ArrayList<Block>(blocksPerPostpondedRescan);
     startupDelayBlockDeletionInMs = conf.getLong(
         DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_KEY,
         DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_DEFAULT) * 1000L;
@@ -1424,9 +1430,7 @@ public class BlockManager implements BlockStatsMXBean {
 
 
   private void postponeBlock(Block blk) {
-    if (postponedMisreplicatedBlocks.add(blk)) {
-      postponedMisreplicatedBlocksCount.incrementAndGet();
-    }
+    postponedMisreplicatedBlocks.add(blk);
   }
   
   
@@ -2050,39 +2054,14 @@ public class BlockManager implements BlockStatsMXBean {
     if (getPostponedMisreplicatedBlocksCount() == 0) {
       return;
     }
-    long startTimeRescanPostponedMisReplicatedBlocks = Time.monotonicNow();
     namesystem.writeLock();
-    long startPostponedMisReplicatedBlocksCount =
-        getPostponedMisreplicatedBlocksCount();
+    long startTime = Time.monotonicNow();
+    long startSize = postponedMisreplicatedBlocks.size();
     try {
-      // blocksPerRescan is the configured number of blocks per rescan.
-      // Randomly select blocksPerRescan consecutive blocks from the HashSet
-      // when the number of blocks remaining is larger than blocksPerRescan.
-      // The reason we don't always pick the first blocksPerRescan blocks is to
-      // handle the case if for some reason some datanodes remain in
-      // content stale state for a long time and only impact the first
-      // blocksPerRescan blocks.
-      int i = 0;
-      long startIndex = 0;
-      long blocksPerRescan =
-          datanodeManager.getBlocksPerPostponedMisreplicatedBlocksRescan();
-      long base = getPostponedMisreplicatedBlocksCount() - blocksPerRescan;
-      if (base > 0) {
-        startIndex = ThreadLocalRandom.current().nextLong() % (base+1);
-        if (startIndex < 0) {
-          startIndex += (base+1);
-        }
-      }
       Iterator<Block> it = postponedMisreplicatedBlocks.iterator();
-      for (int tmp = 0; tmp < startIndex; tmp++) {
-        it.next();
-      }
-
-      for (;it.hasNext(); i++) {
+      for (int i=0; i < blocksPerPostpondedRescan && it.hasNext(); i++) {
         Block b = it.next();
-        if (i >= blocksPerRescan) {
-          break;
-        }
+        it.remove();
 
         BlockInfo bi = blocksMap.getStoredBlock(b);
         if (bi == null) {
@@ -2091,8 +2070,6 @@ public class BlockManager implements BlockStatsMXBean {
                 "Postponed mis-replicated block " + b + " no longer found " +
                 "in block map.");
           }
-          it.remove();
-          postponedMisreplicatedBlocksCount.decrementAndGet();
           continue;
         }
         MisReplicationResult res = processMisReplicatedBlock(bi);
@@ -2100,20 +2077,19 @@ public class BlockManager implements BlockStatsMXBean {
           LOG.debug("BLOCK* rescanPostponedMisreplicatedBlocks: " +
               "Re-scanned block " + b + ", result is " + res);
         }
-        if (res != MisReplicationResult.POSTPONE) {
-          it.remove();
-          postponedMisreplicatedBlocksCount.decrementAndGet();
+        if (res == MisReplicationResult.POSTPONE) {
+          rescannedMisreplicatedBlocks.add(b);
         }
       }
     } finally {
-      long endPostponedMisReplicatedBlocksCount =
-          getPostponedMisreplicatedBlocksCount();
+      postponedMisreplicatedBlocks.addAll(rescannedMisreplicatedBlocks);
+      rescannedMisreplicatedBlocks.clear();
+      long endSize = postponedMisreplicatedBlocks.size();
       namesystem.writeUnlock();
       LOG.info("Rescan of postponedMisreplicatedBlocks completed in " +
-          (Time.monotonicNow() - startTimeRescanPostponedMisReplicatedBlocks) +
-          " msecs. " + endPostponedMisReplicatedBlocksCount +
-          " blocks are left. " + (startPostponedMisReplicatedBlocksCount -
-          endPostponedMisReplicatedBlocksCount) + " blocks are removed.");
+          (Time.monotonicNow() - startTime) + " msecs. " +
+          endSize + " blocks are left. " +
+          (startSize - endSize) + " blocks were removed.");
     }
   }
   
@@ -3512,9 +3488,7 @@ public class BlockManager implements BlockStatsMXBean {
     // Remove the block from pendingReplications and neededReplications
     pendingReplications.remove(block);
     neededReplications.remove(block, UnderReplicatedBlocks.LEVEL);
-    if (postponedMisreplicatedBlocks.remove(block)) {
-      postponedMisreplicatedBlocksCount.decrementAndGet();
-    }
+    postponedMisreplicatedBlocks.remove(block);
   }
 
   public BlockInfo getStoredBlock(Block block) {
@@ -3816,7 +3790,6 @@ public class BlockManager implements BlockStatsMXBean {
     invalidateBlocks.clear();
     datanodeManager.clearPendingQueues();
     postponedMisreplicatedBlocks.clear();
-    postponedMisreplicatedBlocksCount.set(0);
   };
 
   public static LocatedBlock newLocatedBlock(