
HDFS-17416. [FGL] Monitor threads in BlockManager.class support fine-grained lock (#6647)

ZanderXu, 1 year ago
parent
commit ac0605db41

+ 47 - 29
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -2107,7 +2107,9 @@ public class BlockManager implements BlockStatsMXBean {
    */
   int computeBlockReconstructionWork(int blocksToProcess) {
     List<List<BlockInfo>> blocksToReconstruct = null;
-    namesystem.writeLock();
+    // TODO: Change it to readLock(FSNamesystemLockMode.BM)
+    //  since chooseLowRedundancyBlocks is thread safe.
+    namesystem.writeLock(FSNamesystemLockMode.BM);
     try {
       boolean reset = false;
       if (replQueueResetToHeadThreshold > 0) {
@@ -2122,7 +2124,7 @@ public class BlockManager implements BlockStatsMXBean {
       blocksToReconstruct = neededReconstruction
           .chooseLowRedundancyBlocks(blocksToProcess, reset);
     } finally {
-      namesystem.writeUnlock("computeBlockReconstructionWork");
+      namesystem.writeUnlock(FSNamesystemLockMode.BM, "computeBlockReconstructionWork");
     }
     return computeReconstructionWorkForBlocks(blocksToReconstruct);
   }
@@ -2141,7 +2143,9 @@ public class BlockManager implements BlockStatsMXBean {
     List<BlockReconstructionWork> reconWork = new ArrayList<>();
 
     // Step 1: categorize at-risk blocks into replication and EC tasks
-    namesystem.writeLock();
+    // TODO: Change to readLock(FSNamesystemLockMode.GLOBAL)
+    //  since neededReconstruction is thread safe.
+    namesystem.writeLock(FSNamesystemLockMode.GLOBAL);
     try {
       synchronized (neededReconstruction) {
         for (int priority = 0; priority < blocksToReconstruct
@@ -2156,7 +2160,7 @@ public class BlockManager implements BlockStatsMXBean {
         }
       }
     } finally {
-      namesystem.writeUnlock("computeReconstructionWorkForBlocks");
+      namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "computeReconstructionWorkForBlocks");
     }
 
     // Step 2: choose target nodes for each reconstruction task
@@ -2181,7 +2185,9 @@ public class BlockManager implements BlockStatsMXBean {
     }
 
     // Step 3: add tasks to the DN
-    namesystem.writeLock();
+    // TODO: Change to readLock(FSNamesystemLockMode.BM)
+    //  since pendingReconstruction and neededReconstruction are thread safe.
+    namesystem.writeLock(FSNamesystemLockMode.BM);
     try {
       for (BlockReconstructionWork rw : reconWork) {
         final DatanodeStorageInfo[] targets = rw.getTargets();
@@ -2197,7 +2203,7 @@ public class BlockManager implements BlockStatsMXBean {
         }
       }
     } finally {
-      namesystem.writeUnlock("computeReconstructionWorkForBlocks");
+      namesystem.writeUnlock(FSNamesystemLockMode.BM, "computeReconstructionWorkForBlocks");
     }
 
     if (blockLog.isDebugEnabled()) {
@@ -2688,7 +2694,9 @@ public class BlockManager implements BlockStatsMXBean {
   void processPendingReconstructions() {
     BlockInfo[] timedOutItems = pendingReconstruction.getTimedOutBlocks();
     if (timedOutItems != null) {
-      namesystem.writeLock();
+      // TODO: Change to readLock(FSNamesystemLockMode.BM)
+      //  since neededReconstruction is thread safe.
+      namesystem.writeLock(FSNamesystemLockMode.BM);
       try {
         for (int i = 0; i < timedOutItems.length; i++) {
           /*
@@ -2707,7 +2715,7 @@ public class BlockManager implements BlockStatsMXBean {
           }
         }
       } finally {
-        namesystem.writeUnlock("processPendingReconstructions");
+        namesystem.writeUnlock(FSNamesystemLockMode.BM, "processPendingReconstructions");
       }
       /* If we know the target datanodes where the replication timedout,
        * we could invoke decBlocksScheduled() on it. Its ok for now.
@@ -2902,7 +2910,7 @@ public class BlockManager implements BlockStatsMXBean {
       final DatanodeStorage storage,
       final BlockListAsLongs newReport,
       BlockReportContext context) throws IOException {
-    namesystem.writeLock();
+    namesystem.writeLock(FSNamesystemLockMode.GLOBAL);
     final long startTime = Time.monotonicNow(); //after acquiring write lock
     final long endTime;
     DatanodeDescriptor node;
@@ -2960,7 +2968,7 @@ public class BlockManager implements BlockStatsMXBean {
       storageInfo.receivedBlockReport();
     } finally {
       endTime = Time.monotonicNow();
-      namesystem.writeUnlock("processReport");
+      namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "processReport");
     }
 
     if (blockLog.isDebugEnabled()) {
@@ -3033,7 +3041,7 @@ public class BlockManager implements BlockStatsMXBean {
     if (getPostponedMisreplicatedBlocksCount() == 0) {
       return;
     }
-    namesystem.writeLock();
+    namesystem.writeLock(FSNamesystemLockMode.GLOBAL);
     long startTime = Time.monotonicNow();
     long startSize = postponedMisreplicatedBlocks.size();
     try {
@@ -3062,7 +3070,8 @@ public class BlockManager implements BlockStatsMXBean {
       postponedMisreplicatedBlocks.addAll(rescannedMisreplicatedBlocks);
       rescannedMisreplicatedBlocks.clear();
       long endSize = postponedMisreplicatedBlocks.size();
-      namesystem.writeUnlock("rescanPostponedMisreplicatedBlocks");
+      namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL,
+          "rescanPostponedMisreplicatedBlocks");
       LOG.info("Rescan of postponedMisreplicatedBlocks completed in {}" +
           " msecs. {} blocks are left. {} blocks were removed.",
           (Time.monotonicNow() - startTime), endSize, (startSize - endSize));
@@ -3104,7 +3113,8 @@ public class BlockManager implements BlockStatsMXBean {
     if (excessRedundancyMap.size() == 0) {
       return;
     }
-    namesystem.writeLock();
+    // TODO: Change to readLock(FSNamesystemLockMode.BM) since invalidateBlocks is thread safe.
+    namesystem.writeLock(FSNamesystemLockMode.BM);
     long now = Time.monotonicNow();
     int processed = 0;
     try {
@@ -3158,7 +3168,7 @@ public class BlockManager implements BlockStatsMXBean {
         }
       }
     } finally {
-      namesystem.writeUnlock("processTimedOutExcessBlocks");
+      namesystem.writeUnlock(FSNamesystemLockMode.BM, "processTimedOutExcessBlocks");
       LOG.info("processTimedOutExcessBlocks {} msecs.", (Time.monotonicNow() - now));
     }
   }
@@ -3264,7 +3274,7 @@ public class BlockManager implements BlockStatsMXBean {
       final DatanodeStorageInfo storageInfo,
       final BlockListAsLongs report) throws IOException {
     if (report == null) return;
-    assert (namesystem.hasWriteLock());
+    assert (namesystem.hasWriteLock(FSNamesystemLockMode.GLOBAL));
     assert (storageInfo.getBlockReportCount() == 0);
 
     for (BlockReportReplica iblk : report) {
@@ -3316,6 +3326,7 @@ public class BlockManager implements BlockStatsMXBean {
         // OpenFileBlocks only inside snapshots also will be added to safemode
         // threshold. So we need to update such blocks to safemode
         // refer HDFS-5283
+        // isInSnapshot involves the full path, so it needs FSReadLock.
         if (namesystem.isInSnapshot(storedBlock.getBlockCollectionId())) {
           int numOfReplicas = storedBlock.getUnderConstructionFeature()
               .getNumExpectedLocations();
@@ -3731,7 +3742,7 @@ public class BlockManager implements BlockStatsMXBean {
   private void addStoredBlockImmediate(BlockInfo storedBlock, Block reported,
       DatanodeStorageInfo storageInfo)
   throws IOException {
-    assert (storedBlock != null && namesystem.hasWriteLock());
+    assert (storedBlock != null && namesystem.hasWriteLock(FSNamesystemLockMode.GLOBAL));
     if (!namesystem.isInStartupSafeMode()
         || isPopulatingReplQueues()) {
       addStoredBlock(storedBlock, reported, storageInfo, null, false);
@@ -3766,7 +3777,7 @@ public class BlockManager implements BlockStatsMXBean {
                                DatanodeDescriptor delNodeHint,
                                boolean logEveryBlock)
   throws IOException {
-    assert block != null && namesystem.hasWriteLock();
+    assert block != null && namesystem.hasWriteLock(FSNamesystemLockMode.GLOBAL);
     BlockInfo storedBlock;
     DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
     if (!block.isComplete()) {
@@ -4002,7 +4013,7 @@ public class BlockManager implements BlockStatsMXBean {
 
     while (namesystem.isRunning() && !Thread.currentThread().isInterrupted()) {
       int processed = 0;
-      namesystem.writeLockInterruptibly();
+      namesystem.writeLockInterruptibly(FSNamesystemLockMode.GLOBAL);
       try {
         while (processed < numBlocksPerIteration && blocksItr.hasNext()) {
           BlockInfo block = blocksItr.next();
@@ -4061,7 +4072,7 @@ public class BlockManager implements BlockStatsMXBean {
           break;
         }
       } finally {
-        namesystem.writeUnlock("processMisReplicatesAsync");
+        namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "processMisReplicatesAsync");
         LOG.info("Reconstruction queues initialisation progress: {}, total number of blocks " +
             "processed: {}/{}", reconstructionQueuesInitProgress, totalProcessed, totalBlocks);
         // Make sure it is out of the write lock for sufficiently long time.
@@ -4214,7 +4225,7 @@ public class BlockManager implements BlockStatsMXBean {
   private boolean processExtraRedundancyBlockWithoutPostpone(final BlockInfo block,
       final short replication, final DatanodeDescriptor addedNode,
       DatanodeDescriptor delNodeHint) {
-    assert namesystem.hasWriteLock();
+    assert namesystem.hasWriteLock(FSNamesystemLockMode.GLOBAL);
     if (addedNode == delNodeHint) {
       delNodeHint = null;
     }
@@ -4258,7 +4269,10 @@ public class BlockManager implements BlockStatsMXBean {
       BlockInfo storedBlock, short replication,
       DatanodeDescriptor addedNode,
       DatanodeDescriptor delNodeHint) {
-    assert namesystem.hasWriteLock();
+    // bc.getStoragePolicyID() needs FSReadLock.
+    // TODO: Change to hasReadLock(FSNamesystemLockMode.GLOBAL)
+    //  since chooseExcessRedundancyContiguous is thread safe.
+    assert namesystem.hasWriteLock(FSNamesystemLockMode.GLOBAL);
     // first form a rack to datanodes map and
     BlockCollection bc = getBlockCollection(storedBlock);
     if (storedBlock.isStriped()) {
@@ -4627,7 +4641,7 @@ public class BlockManager implements BlockStatsMXBean {
    */
   public void processIncrementalBlockReport(final DatanodeID nodeID,
       final StorageReceivedDeletedBlocks srdb) throws IOException {
-    assert namesystem.hasWriteLock();
+    assert namesystem.hasWriteLock(FSNamesystemLockMode.GLOBAL);
     final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
     if (node == null || !node.isRegistered()) {
       blockLog.warn("BLOCK* processIncrementalBlockReport"
@@ -4991,6 +5005,8 @@ public class BlockManager implements BlockStatsMXBean {
   /** updates a block in needed reconstruction queue. */
   private void updateNeededReconstructions(final BlockInfo block,
       final int curReplicasDelta, int expectedReplicasDelta) {
+    // TODO: Change to readLock(FSNamesystemLockMode.BM)
+    //  since pendingReconstruction and neededReconstruction are thread safe.
     namesystem.writeLock(FSNamesystemLockMode.BM);
     try {
       if (!isPopulatingReplQueues() || !block.isComplete()) {
@@ -5042,8 +5058,9 @@ public class BlockManager implements BlockStatsMXBean {
    */
   private int invalidateWorkForOneNode(DatanodeInfo dn) {
     final List<Block> toInvalidate;
-    
-    namesystem.writeLock();
+
+    // TODO: Change to readLock(FSNamesystemLockMode.BM) since invalidateBlocks is thread safe.
+    namesystem.writeLock(FSNamesystemLockMode.BM);
     try {
       // blocks should not be replicated or removed if safe mode is on
       if (namesystem.isInSafeMode()) {
@@ -5067,7 +5084,7 @@ public class BlockManager implements BlockStatsMXBean {
         return 0;
       }
     } finally {
-      namesystem.writeUnlock("invalidateWorkForOneNode");
+      namesystem.writeUnlock(FSNamesystemLockMode.BM, "invalidateWorkForOneNode");
     }
     if (blockLog.isDebugEnabled()) {
       blockLog.debug("BLOCK* {}: ask {} to delete {}",
@@ -5295,7 +5312,7 @@ public class BlockManager implements BlockStatsMXBean {
 
     private void remove(long time) {
       if (checkToDeleteIterator()) {
-        namesystem.writeLock();
+        namesystem.writeLock(FSNamesystemLockMode.BM);
         try {
           while (toDeleteIterator.hasNext()) {
             removeBlock(toDeleteIterator.next());
@@ -5306,7 +5323,7 @@ public class BlockManager implements BlockStatsMXBean {
             }
           }
         } finally {
-          namesystem.writeUnlock("markedDeleteBlockScrubberThread");
+          namesystem.writeUnlock(FSNamesystemLockMode.BM, "markedDeleteBlockScrubberThread");
         }
       }
     }
@@ -5420,12 +5437,13 @@ public class BlockManager implements BlockStatsMXBean {
     int workFound = this.computeBlockReconstructionWork(blocksToProcess);
 
     // Update counters
-    namesystem.writeLock();
+    // TODO: Make corruptReplicas thread safe to remove this lock.
+    namesystem.writeLock(FSNamesystemLockMode.BM);
     try {
       this.updateState();
       this.scheduledReplicationBlocksCount = workFound;
     } finally {
-      namesystem.writeUnlock("computeDatanodeWork");
+      namesystem.writeUnlock(FSNamesystemLockMode.BM, "computeDatanodeWork");
     }
     workFound += this.computeInvalidateWork(nodesToProcess);
     return workFound;

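The changes above follow one mechanical pattern: every namesystem.writeLock()/writeUnlock(opName) pair gains an explicit FSNamesystemLockMode argument, BM where only block-manager state is touched and GLOBAL where FS-side state (paths, quota, storage policy) is also involved, and TODO comments record where the write lock can later be relaxed to a read lock once the underlying queue is made thread safe. A minimal sketch of the pattern is below; the LockedNamesystem interface and FineGrainedLockExample class are simplified stand-ins for illustration, not types from the patch.

import org.apache.hadoop.hdfs.server.namenode.fgl.FSNamesystemLockMode;

/**
 * Illustrative sketch only: a simplified stand-in for the namesystem lock API,
 * not the real org.apache.hadoop.hdfs.server.namenode.Namesystem interface.
 */
interface LockedNamesystem {
  void writeLock(FSNamesystemLockMode mode);
  void writeUnlock(FSNamesystemLockMode mode, String opName);
}

class FineGrainedLockExample {
  private final LockedNamesystem namesystem;

  FineGrainedLockExample(LockedNamesystem namesystem) {
    this.namesystem = namesystem;
  }

  void mutateBlockManagerState() {
    // BM mode: only block-manager data structures are touched, so FS-side
    // operations do not have to wait on this critical section.
    namesystem.writeLock(FSNamesystemLockMode.BM);
    try {
      // ... mutate block-manager state (reconstruction queues, etc.) ...
    } finally {
      // The operation name is passed to the unlock so long lock holds can be
      // attributed to "mutateBlockManagerState" in the namenode's lock logging.
      namesystem.writeUnlock(FSNamesystemLockMode.BM, "mutateBlockManagerState");
    }
  }
}
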
+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java

@@ -145,7 +145,7 @@ public class ProvidedStorageMap {
 
   private void processProvidedStorageReport()
       throws IOException {
-    assert lock.hasWriteLock() : "Not holding write lock";
+    assert lock.hasWriteLock(FSNamesystemLockMode.GLOBAL) : "Not holding write lock";
     if (providedStorageInfo.getBlockReportCount() == 0
         || providedDescriptor.activeProvidedDatanodes() == 0) {
       LOG.info("Calling process first blk report from storage: "

+ 6 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -3973,7 +3973,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
   @Override
   public boolean isInSnapshot(long blockCollectionID) {
-    assert hasReadLock();
+    assert hasReadLock(FSNamesystemLockMode.FS);
     final INodeFile bc = getBlockCollection(blockCollectionID);
     if (bc == null || !bc.isUnderConstruction()) {
       return false;
@@ -5371,11 +5371,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   public void processIncrementalBlockReport(final DatanodeID nodeID,
       final StorageReceivedDeletedBlocks srdb)
       throws IOException {
-    writeLock();
+    // completeBlock will updateQuota, so it needs BMWriteLock and FSWriteLock.
+    // processExtraRedundancyBlock chooses excess replicas depending on storage policyId,
+    // so it needs FSReadLock.
+    writeLock(FSNamesystemLockMode.GLOBAL);
     try {
       blockManager.processIncrementalBlockReport(nodeID, srdb);
     } finally {
-      writeUnlock("processIncrementalBlockReport");
+      writeUnlock(FSNamesystemLockMode.GLOBAL, "processIncrementalBlockReport");
     }
   }
   

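The comments added in FSNamesystem above state the rule for picking a mode: completing a block updates quota and choosing excess replicas reads the storage policy, so processIncrementalBlockReport stays on the GLOBAL write lock, while isInSnapshot only needs the FS read lock because it resolves the file's path. A small sketch of documenting such a requirement with the same mode-aware assert style; the LockModeAwareOp class and its method are illustrative only, not part of the patch.

import org.apache.hadoop.hdfs.server.namenode.fgl.FSNamesystemLockMode;

/** Illustrative only: not part of the patch. */
abstract class LockModeAwareOp {

  /** Simplified stand-in for FSNamesystem's per-mode read-lock check. */
  abstract boolean hasReadLock(FSNamesystemLockMode mode);

  /**
   * A path-dependent check documents its lock requirement with the same
   * mode-aware assert style used in the hunks above.
   */
  void checkPathDependentState(long blockCollectionId) {
    assert hasReadLock(FSNamesystemLockMode.FS)
        : "resolving the block collection's path requires the FS read lock";
    // ... look up the INodeFile and read its snapshot / storage-policy state ...
  }
}
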
+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java

@@ -2334,4 +2334,4 @@ public class TestBlockManager {
       DataNodeFaultInjector.set(oldInjector);
     }
   }
-}
+}

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestProvidedStorageMap.java

@@ -24,6 +24,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.TestProvidedImpl;
+import org.apache.hadoop.hdfs.server.namenode.fgl.FSNamesystemLockMode;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.util.RwLock;
 import org.junit.Before;
@@ -87,7 +88,7 @@ public class TestProvidedStorageMap {
     DatanodeStorage dn1DiskStorage = new DatanodeStorage(
         "sid-1", DatanodeStorage.State.NORMAL, StorageType.DISK);
 
-    when(nameSystemLock.hasWriteLock()).thenReturn(true);
+    when(nameSystemLock.hasWriteLock(FSNamesystemLockMode.GLOBAL)).thenReturn(true);
     DatanodeStorageInfo dns1Provided =
         providedMap.getStorage(dn1, dn1ProvidedStorage);
     DatanodeStorageInfo dns1Disk = providedMap.getStorage(dn1, dn1DiskStorage);

+ 7 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java

@@ -66,6 +66,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
 import org.apache.hadoop.hdfs.server.namenode.TestINodeFile;
+import org.apache.hadoop.hdfs.server.namenode.fgl.FSNamesystemLockMode;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -1406,6 +1407,12 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
     FSNamesystem mockNS = mock(FSNamesystem.class);
     when(mockNS.hasWriteLock()).thenReturn(true);
     when(mockNS.hasReadLock()).thenReturn(true);
+    when(mockNS.hasWriteLock(FSNamesystemLockMode.GLOBAL)).thenReturn(true);
+    when(mockNS.hasReadLock(FSNamesystemLockMode.GLOBAL)).thenReturn(true);
+    when(mockNS.hasWriteLock(FSNamesystemLockMode.BM)).thenReturn(true);
+    when(mockNS.hasReadLock(FSNamesystemLockMode.BM)).thenReturn(true);
+    when(mockNS.hasWriteLock(FSNamesystemLockMode.FS)).thenReturn(true);
+    when(mockNS.hasReadLock(FSNamesystemLockMode.FS)).thenReturn(true);
     BlockManager bm = new BlockManager(mockNS, false, new HdfsConfiguration());
     LowRedundancyBlocks lowRedundancyBlocks = bm.neededReconstruction;
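
With per-mode lock checks, tests that mock FSNamesystem now stub hasWriteLock/hasReadLock for each mode in addition to the legacy no-argument variants, as the hunk above does. If more tests grow the same boilerplate, it could be collected into a small helper; a possible sketch follows (the MockLockUtil class is hypothetical and assumes FSNamesystemLockMode is an enum over the three modes used above).

import static org.mockito.Mockito.when;

import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.fgl.FSNamesystemLockMode;

/** Hypothetical test helper, not part of this patch. */
final class MockLockUtil {
  private MockLockUtil() {
  }

  /** Stub every lock check on the mocked namesystem to report "held". */
  static void grantAllLocks(FSNamesystem mockNS) {
    // Keep the legacy no-argument checks for code that is not yet converted.
    when(mockNS.hasWriteLock()).thenReturn(true);
    when(mockNS.hasReadLock()).thenReturn(true);
    // And the fine-grained, per-mode checks introduced by the FGL work.
    for (FSNamesystemLockMode mode : FSNamesystemLockMode.values()) {
      when(mockNS.hasWriteLock(mode)).thenReturn(true);
      when(mockNS.hasReadLock(mode)).thenReturn(true);
    }
  }
}

The stubbing block above would then shrink to a single MockLockUtil.grantAllLocks(mockNS); call right after mock(FSNamesystem.class).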