Procházet zdrojové kódy

HDFS-17417. [FGL] HeartbeatManager and DatanodeAdminMonitor support fine-grained locking (#6656)

ZanderXu před 1 rokem
rodič
revize
40d54ebb66

+ 6 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -1830,7 +1830,7 @@ public class BlockManager implements BlockStatsMXBean {
 
   /** Remove the blocks associated to the given DatanodeStorageInfo. */
   void removeBlocksAssociatedTo(final DatanodeStorageInfo storageInfo) {
-    assert namesystem.hasWriteLock();
+    assert namesystem.hasWriteLock(FSNamesystemLockMode.BM);
     final Iterator<BlockInfo> it = storageInfo.getBlockIterator();
     DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
     while(it.hasNext()) {
@@ -4883,6 +4883,7 @@ public class BlockManager implements BlockStatsMXBean {
         NumberReplicas num = countNodes(block);
         if (shouldProcessExtraRedundancy(num, expectedReplication)) {
           // extra redundancy block
+          // Here involves storage policy ID.
           processExtraRedundancyBlock(block, (short) expectedReplication, null,
               null);
           numExtraRedundancy++;
@@ -4891,14 +4892,15 @@ public class BlockManager implements BlockStatsMXBean {
       // When called by tests like TestDefaultBlockPlacementPolicy.
       // testPlacementWithLocalRackNodesDecommissioned, it is not protected by
       // lock, only when called by DatanodeManager.refreshNodes have writeLock
-      if (namesystem.hasWriteLock()) {
-        namesystem.writeUnlock("processExtraRedundancyBlocksOnInService");
+      if (namesystem.hasWriteLock(FSNamesystemLockMode.GLOBAL)) {
+        namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL,
+            "processExtraRedundancyBlocksOnInService");
         try {
           Thread.sleep(1);
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
         }
-        namesystem.writeLock();
+        namesystem.writeLock(FSNamesystemLockMode.GLOBAL);
       }
     }
     LOG.info("Invalidated {} extra redundancy blocks on {} after "

+ 20 - 18
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminBackoffMonitor.java

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
+import org.apache.hadoop.hdfs.server.namenode.fgl.FSNamesystemLockMode;
 import org.apache.hadoop.thirdparty.com.google.common.collect.Iterables;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.server.namenode.INode;
@@ -170,7 +171,7 @@ public class DatanodeAdminBackoffMonitor extends DatanodeAdminMonitorBase
     numBlocksChecked = 0;
     // Check decommission or maintenance progress.
     try {
-      namesystem.writeLock();
+      namesystem.writeLock(FSNamesystemLockMode.BM);
       try {
         /**
          * Other threads can modify the pendingNode list and the cancelled
@@ -208,7 +209,7 @@ public class DatanodeAdminBackoffMonitor extends DatanodeAdminMonitorBase
 
         processPendingNodes();
       } finally {
-        namesystem.writeUnlock("DatanodeAdminMonitorV2Thread");
+        namesystem.writeUnlock(FSNamesystemLockMode.BM, "DatanodeAdminMonitorV2Thread");
       }
       // After processing the above, various parts of the check() method will
       // take and drop the read / write lock as needed. Aside from the
@@ -326,7 +327,7 @@ public class DatanodeAdminBackoffMonitor extends DatanodeAdminMonitorBase
    */
   private void processMaintenanceNodes() {
     // Check for any maintenance state nodes which need to be expired
-    namesystem.writeLock();
+    namesystem.writeLock(FSNamesystemLockMode.GLOBAL);
     try {
       for (DatanodeDescriptor dn : outOfServiceNodeBlocks.keySet()) {
         if (dn.isMaintenance() && dn.maintenanceExpired()) {
@@ -338,12 +339,12 @@ public class DatanodeAdminBackoffMonitor extends DatanodeAdminMonitorBase
           // which added the node to the cancelled list. Therefore expired
           // maintenance nodes do not need to be added to the toRemove list.
           dnAdmin.stopMaintenance(dn);
-          namesystem.writeUnlock("processMaintenanceNodes");
-          namesystem.writeLock();
+          namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "processMaintenanceNodes");
+          namesystem.writeLock(FSNamesystemLockMode.GLOBAL);
         }
       }
     } finally {
-      namesystem.writeUnlock("processMaintenanceNodes");
+      namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "processMaintenanceNodes");
     }
   }
 
@@ -360,7 +361,7 @@ public class DatanodeAdminBackoffMonitor extends DatanodeAdminMonitorBase
       // taking the write lock at all.
       return;
     }
-    namesystem.writeLock();
+    namesystem.writeLock(FSNamesystemLockMode.BM);
     try {
       for (DatanodeDescriptor dn : toRemove) {
         final boolean isHealthy =
@@ -402,7 +403,7 @@ public class DatanodeAdminBackoffMonitor extends DatanodeAdminMonitorBase
         }
       }
     } finally {
-      namesystem.writeUnlock("processCompletedNodes");
+      namesystem.writeUnlock(FSNamesystemLockMode.BM, "processCompletedNodes");
     }
   }
 
@@ -486,7 +487,7 @@ public class DatanodeAdminBackoffMonitor extends DatanodeAdminMonitorBase
       return;
     }
 
-    namesystem.writeLock();
+    namesystem.writeLock(FSNamesystemLockMode.GLOBAL);
     try {
       long repQueueSize = blockManager.getLowRedundancyBlocksCount();
 
@@ -524,8 +525,8 @@ public class DatanodeAdminBackoffMonitor extends DatanodeAdminMonitorBase
           // replication
           if (blocksProcessed >= blocksPerLock) {
             blocksProcessed = 0;
-            namesystem.writeUnlock("moveBlocksToPending");
-            namesystem.writeLock();
+            namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "moveBlocksToPending");
+            namesystem.writeLock(FSNamesystemLockMode.GLOBAL);
           }
           blocksProcessed++;
           if (nextBlockAddedToPending(blockIt, dn)) {
@@ -546,7 +547,7 @@ public class DatanodeAdminBackoffMonitor extends DatanodeAdminMonitorBase
         }
       }
     } finally {
-      namesystem.writeUnlock("moveBlocksToPending");
+      namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "moveBlocksToPending");
     }
     LOG.debug("{} blocks are now pending replication", pendingCount);
   }
@@ -626,15 +627,16 @@ public class DatanodeAdminBackoffMonitor extends DatanodeAdminMonitorBase
     }
 
     DatanodeStorageInfo[] storage;
-    namesystem.readLock();
+    namesystem.readLock(FSNamesystemLockMode.BM);
     try {
       storage = dn.getStorageInfos();
     } finally {
-      namesystem.readUnlock("scanDatanodeStorage");
+      namesystem.readUnlock(FSNamesystemLockMode.BM, "scanDatanodeStorage");
     }
 
     for (DatanodeStorageInfo s : storage) {
-      namesystem.readLock();
+      // isBlockReplicatedOk involves FS.
+      namesystem.readLock(FSNamesystemLockMode.GLOBAL);
       try {
         // As the lock is dropped and re-taken between each storage, we need
         // to check the storage is still present before processing it, as it
@@ -660,7 +662,7 @@ public class DatanodeAdminBackoffMonitor extends DatanodeAdminMonitorBase
           numBlocksChecked++;
         }
       } finally {
-        namesystem.readUnlock("scanDatanodeStorage");
+        namesystem.readUnlock(FSNamesystemLockMode.GLOBAL, "scanDatanodeStorage");
       }
     }
   }
@@ -683,7 +685,7 @@ public class DatanodeAdminBackoffMonitor extends DatanodeAdminMonitorBase
    * namenode write lock while it runs.
    */
   private void processPendingReplication() {
-    namesystem.writeLock();
+    namesystem.writeLock(FSNamesystemLockMode.GLOBAL);
     try {
       for (Iterator<Map.Entry<DatanodeDescriptor, List<BlockInfo>>>
            entIt = pendingRep.entrySet().iterator(); entIt.hasNext();) {
@@ -715,7 +717,7 @@ public class DatanodeAdminBackoffMonitor extends DatanodeAdminMonitorBase
             suspectBlocks.getOutOfServiceBlockCount());
       }
     } finally {
-      namesystem.writeUnlock("processPendingReplication");
+      namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "processPendingReplication");
     }
   }
 

+ 7 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminDefaultMonitor.java

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
+import org.apache.hadoop.hdfs.server.namenode.fgl.FSNamesystemLockMode;
 import org.apache.hadoop.util.Preconditions;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
@@ -182,7 +183,9 @@ public class DatanodeAdminDefaultMonitor extends DatanodeAdminMonitorBase
     numBlocksCheckedPerLock = 0;
     numNodesChecked = 0;
     // Check decommission or maintenance progress.
-    namesystem.writeLock();
+    // dnAdmin.stopMaintenance(dn) needs FSReadLock
+    // since processExtraRedundancyBlock involves storage policy and isSufficient involves bc.
+    namesystem.writeLock(FSNamesystemLockMode.GLOBAL);
     try {
       processCancelledNodes();
       processPendingNodes();
@@ -191,7 +194,7 @@ public class DatanodeAdminDefaultMonitor extends DatanodeAdminMonitorBase
       LOG.warn("DatanodeAdminMonitor caught exception when processing node.",
           e);
     } finally {
-      namesystem.writeUnlock("DatanodeAdminMonitorThread");
+      namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "DatanodeAdminMonitorThread");
     }
     if (numBlocksChecked + numNodesChecked > 0) {
       LOG.info("Checked {} blocks and {} nodes this tick. {} nodes are now " +
@@ -426,7 +429,7 @@ public class DatanodeAdminDefaultMonitor extends DatanodeAdminMonitorBase
         // lock.
         // Yielding is required in case of block number is greater than the
         // configured per-iteration-limit.
-        namesystem.writeUnlock("processBlocksInternal");
+        namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "processBlocksInternal");
         try {
           LOG.debug("Yielded lock during decommission/maintenance check");
           Thread.sleep(0, 500);
@@ -435,7 +438,7 @@ public class DatanodeAdminDefaultMonitor extends DatanodeAdminMonitorBase
         }
         // reset
         numBlocksCheckedPerLock = 0;
-        namesystem.writeLock();
+        namesystem.writeLock(FSNamesystemLockMode.GLOBAL);
       }
       numBlocksChecked++;
       numBlocksCheckedPerLock++;

+ 3 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java

@@ -1343,12 +1343,13 @@ public class DatanodeManager {
    */
   public void refreshNodes(final Configuration conf) throws IOException {
     refreshHostsReader(conf);
-    namesystem.writeLock();
+    // processExtraRedundancyBlocksOnInService involves FS in stopMaintenance and stopDecommission.
+    namesystem.writeLock(FSNamesystemLockMode.GLOBAL);
     try {
       refreshDatanodes();
       countSoftwareVersions();
     } finally {
-      namesystem.writeUnlock("refreshNodes");
+      namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "refreshNodes");
     }
   }
 

+ 5 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java

@@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
+import org.apache.hadoop.hdfs.server.namenode.fgl.FSNamesystemLockMode;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
 import org.apache.hadoop.util.Daemon;
@@ -514,20 +515,20 @@ class HeartbeatManager implements DatanodeStatistics {
 
       for (DatanodeDescriptor dead : deadDatanodes) {
         // acquire the fsnamesystem lock, and then remove the dead node.
-        namesystem.writeLock();
+        namesystem.writeLock(FSNamesystemLockMode.BM);
         try {
           dm.removeDeadDatanode(dead, !dead.isMaintenance());
         } finally {
-          namesystem.writeUnlock("removeDeadDatanode");
+          namesystem.writeUnlock(FSNamesystemLockMode.BM, "removeDeadDatanode");
         }
       }
       for (DatanodeStorageInfo failedStorage : failedStorages) {
         // acquire the fsnamesystem lock, and remove blocks on the storage.
-        namesystem.writeLock();
+        namesystem.writeLock(FSNamesystemLockMode.BM);
         try {
           blockManager.removeBlocksAssociatedTo(failedStorage);
         } finally {
-          namesystem.writeUnlock("removeBlocksAssociatedTo");
+          namesystem.writeUnlock(FSNamesystemLockMode.BM, "removeBlocksAssociatedTo");
         }
       }
     }