Quellcode durchsuchen

HDFS-17413. [FGL] CacheReplicationMonitor supports fine-grained lock (#6641)

ZanderXu vor 1 Jahr
Ursprung
Commit
d39f034f98

+ 7 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java

@@ -48,6 +48,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
+import org.apache.hadoop.hdfs.server.namenode.fgl.FSNamesystemLockMode;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
 import org.apache.hadoop.util.GSet;
@@ -223,7 +224,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
    * after are not atomic.
    */
   public void waitForRescanIfNeeded() {
-    Preconditions.checkArgument(!namesystem.hasWriteLock(),
+    Preconditions.checkArgument(!namesystem.hasWriteLock(FSNamesystemLockMode.FS),
         "Must not hold the FSN write lock when waiting for a rescan.");
     Preconditions.checkArgument(lock.isHeldByCurrentThread(),
         "Must hold the CRM lock when waiting for a rescan.");
@@ -268,7 +269,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
    */
   @Override
   public void close() throws IOException {
-    Preconditions.checkArgument(namesystem.hasWriteLock());
+    Preconditions.checkArgument(namesystem.hasWriteLock(FSNamesystemLockMode.GLOBAL));
     lock.lock();
     try {
       if (shutdown) return;
@@ -291,7 +292,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
     scannedBlocks = 0;
     lastScanTimeMs = Time.monotonicNow();
     try {
-      namesystem.writeLock();
+      namesystem.writeLock(FSNamesystemLockMode.GLOBAL);
       try {
         lock.lock();
         if (shutdown) {
@@ -308,7 +309,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
       rescanCachedBlockMap();
       blockManager.getDatanodeManager().resetLastCachingDirectiveSentTime();
     } finally {
-      namesystem.writeUnlock("cacheReplicationMonitorRescan");
+      namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "cacheReplicationMonitorRescan");
     }
   }
 
@@ -325,11 +326,11 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
     long now = Time.monotonicNow();
     if (now - last > cacheManager.getMaxLockTimeMs()) {
       try {
-        namesystem.writeUnlock();
+        namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "cacheReplicationMonitorRescan");
         Thread.sleep(cacheManager.getSleepTimeMs());
       } catch (InterruptedException e) {
       } finally {
-        namesystem.writeLock();
+        namesystem.writeLock(FSNamesystemLockMode.GLOBAL);
       }
     }
   }

+ 17 - 16
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java

@@ -80,6 +80,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBl
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type;
 import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.CacheManagerSection;
+import org.apache.hadoop.hdfs.server.namenode.fgl.FSNamesystemLockMode;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
@@ -317,7 +318,7 @@ public class CacheManager {
   }
 
   public void clearDirectiveStats() {
-    assert namesystem.hasWriteLock();
+    assert namesystem.hasWriteLock(FSNamesystemLockMode.FS);
     for (CacheDirective directive : directivesById.values()) {
       directive.resetStatistics();
     }
@@ -327,7 +328,7 @@ public class CacheManager {
    * @return Unmodifiable view of the collection of CachePools.
    */
   public Collection<CachePool> getCachePools() {
-    assert namesystem.hasReadLock();
+    assert namesystem.hasReadLock(FSNamesystemLockMode.FS);
     return Collections.unmodifiableCollection(cachePools.values());
   }
 
@@ -335,18 +336,18 @@ public class CacheManager {
    * @return Unmodifiable view of the collection of CacheDirectives.
    */
   public Collection<CacheDirective> getCacheDirectives() {
-    assert namesystem.hasReadLock();
+    assert namesystem.hasReadLock(FSNamesystemLockMode.FS);
     return Collections.unmodifiableCollection(directivesById.values());
   }
   
   @VisibleForTesting
   public GSet<CachedBlock, CachedBlock> getCachedBlocks() {
-    assert namesystem.hasReadLock();
+    assert namesystem.hasReadLock(FSNamesystemLockMode.BM);
     return cachedBlocks;
   }
 
   private long getNextDirectiveId() throws IOException {
-    assert namesystem.hasWriteLock();
+    assert namesystem.hasWriteLock(FSNamesystemLockMode.FS);
     if (nextDirectiveId >= Long.MAX_VALUE - 1) {
       throw new IOException("No more available IDs.");
     }
@@ -574,7 +575,7 @@ public class CacheManager {
   public CacheDirectiveInfo addDirective(
       CacheDirectiveInfo info, FSPermissionChecker pc, EnumSet<CacheFlag> flags)
       throws IOException {
-    assert namesystem.hasWriteLock();
+    assert namesystem.hasWriteLock(FSNamesystemLockMode.FS);
     CacheDirective directive;
     try {
       CachePool pool = getCachePool(validatePoolName(info));
@@ -652,7 +653,7 @@ public class CacheManager {
 
   public void modifyDirective(CacheDirectiveInfo info,
       FSPermissionChecker pc, EnumSet<CacheFlag> flags) throws IOException {
-    assert namesystem.hasWriteLock();
+    assert namesystem.hasWriteLock(FSNamesystemLockMode.FS);
     String idString =
         (info.getId() == null) ?
             "(null)" : info.getId().toString();
@@ -703,7 +704,7 @@ public class CacheManager {
 
   private void removeInternal(CacheDirective directive)
       throws InvalidRequestException {
-    assert namesystem.hasWriteLock();
+    assert namesystem.hasWriteLock(FSNamesystemLockMode.FS);
     // Remove the corresponding entry in directivesByPath.
     String path = directive.getPath();
     if (!directivesByPath.remove(path, directive)) {
@@ -724,7 +725,7 @@ public class CacheManager {
 
   public void removeDirective(long id, FSPermissionChecker pc)
       throws IOException {
-    assert namesystem.hasWriteLock();
+    assert namesystem.hasWriteLock(FSNamesystemLockMode.FS);
     try {
       CacheDirective directive = getById(id);
       checkWritePermission(pc, directive.getPool());
@@ -740,7 +741,7 @@ public class CacheManager {
         listCacheDirectives(long prevId,
             CacheDirectiveInfo filter,
             FSPermissionChecker pc) throws IOException {
-    assert namesystem.hasReadLock();
+    assert namesystem.hasReadLock(FSNamesystemLockMode.FS);
     final int NUM_PRE_ALLOCATED_ENTRIES = 16;
     String filterPath = null;
     if (filter.getPath() != null) {
@@ -815,7 +816,7 @@ public class CacheManager {
    */
   public CachePoolInfo addCachePool(CachePoolInfo info)
       throws IOException {
-    assert namesystem.hasWriteLock();
+    assert namesystem.hasWriteLock(FSNamesystemLockMode.FS);
     CachePool pool;
     try {
       CachePoolInfo.validate(info);
@@ -845,7 +846,7 @@ public class CacheManager {
    */
   public void modifyCachePool(CachePoolInfo info)
       throws IOException {
-    assert namesystem.hasWriteLock();
+    assert namesystem.hasWriteLock(FSNamesystemLockMode.FS);
     StringBuilder bld = new StringBuilder();
     try {
       CachePoolInfo.validate(info);
@@ -915,7 +916,7 @@ public class CacheManager {
    */
   public void removeCachePool(String poolName)
       throws IOException {
-    assert namesystem.hasWriteLock();
+    assert namesystem.hasWriteLock(FSNamesystemLockMode.FS);
     try {
       CachePoolInfo.validateName(poolName);
       CachePool pool = cachePools.remove(poolName);
@@ -941,7 +942,7 @@ public class CacheManager {
 
   public BatchedListEntries<CachePoolEntry>
       listCachePools(FSPermissionChecker pc, String prevKey) {
-    assert namesystem.hasReadLock();
+    assert namesystem.hasReadLock(FSNamesystemLockMode.FS);
     final int NUM_PRE_ALLOCATED_ENTRIES = 16;
     ArrayList<CachePoolEntry> results = 
         new ArrayList<CachePoolEntry>(NUM_PRE_ALLOCATED_ENTRIES);
@@ -1008,7 +1009,7 @@ public class CacheManager {
           datanodeID, DFS_NAMENODE_CACHING_ENABLED_KEY, blockIds.size());
       return;
     }
-    namesystem.writeLock();
+    namesystem.writeLock(FSNamesystemLockMode.BM);
     final long startTime = Time.monotonicNow();
     final long endTime;
     try {
@@ -1022,7 +1023,7 @@ public class CacheManager {
       processCacheReportImpl(datanode, blockIds);
     } finally {
       endTime = Time.monotonicNow();
-      namesystem.writeUnlock("processCacheReport");
+      namesystem.writeUnlock(FSNamesystemLockMode.BM, "processCacheReport");
     }
 
     // Log the block report processing stats from Namenode perspective

+ 16 - 16
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -7802,7 +7802,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     checkOperation(OperationCategory.WRITE);
     FSPermissionChecker.setOperationType(operationName);
     try {
-      writeLock();
+      writeLock(FSNamesystemLockMode.FS);
       try {
         checkOperation(OperationCategory.WRITE);
         checkNameNodeSafeMode("Cannot add cache directive");
@@ -7811,7 +7811,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       } finally {
         effectiveDirectiveStr = effectiveDirective != null ?
             effectiveDirective.toString() : null;
-        writeUnlock(operationName,
+        writeUnlock(FSNamesystemLockMode.FS, operationName,
             getLockReportInfoSupplier(effectiveDirectiveStr));
       }
     } catch (AccessControlException ace) {
@@ -7835,14 +7835,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     FSPermissionChecker.setOperationType(operationName);
     checkOperation(OperationCategory.WRITE);
     try {
-      writeLock();
+      writeLock(FSNamesystemLockMode.FS);
       try {
         checkOperation(OperationCategory.WRITE);
         checkNameNodeSafeMode("Cannot add cache directive");
         FSNDNCacheOp.modifyCacheDirective(this, cacheManager, directive, flags,
             logRetryCache);
       } finally {
-        writeUnlock(operationName,
+        writeUnlock(FSNamesystemLockMode.FS, operationName,
             getLockReportInfoSupplier(idStr, directive.toString()));
       }
     } catch (AccessControlException ace) {
@@ -7861,14 +7861,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     checkOperation(OperationCategory.WRITE);
     FSPermissionChecker.setOperationType(operationName);
     try {
-      writeLock();
+      writeLock(FSNamesystemLockMode.FS);
       try {
         checkOperation(OperationCategory.WRITE);
         checkNameNodeSafeMode("Cannot remove cache directives");
         FSNDNCacheOp.removeCacheDirective(this, cacheManager, id,
             logRetryCache);
       } finally {
-        writeUnlock(operationName, getLockReportInfoSupplier(idStr));
+        writeUnlock(FSNamesystemLockMode.FS, operationName, getLockReportInfoSupplier(idStr));
       }
     } catch (AccessControlException ace) {
       logAuditEvent(false, operationName, idStr, null, null);
@@ -7886,13 +7886,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     BatchedListEntries<CacheDirectiveEntry> results;
     cacheManager.waitForRescanIfNeeded();
     try {
-      readLock();
+      readLock(FSNamesystemLockMode.FS);
       try {
         checkOperation(OperationCategory.READ);
         results = FSNDNCacheOp.listCacheDirectives(this, cacheManager, startId,
             filter);
       } finally {
-        readUnlock(operationName,
+        readUnlock(FSNamesystemLockMode.FS, operationName,
             getLockReportInfoSupplier(filter.toString()));
       }
     } catch (AccessControlException ace) {
@@ -7911,7 +7911,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     String poolName = req == null ? null : req.getPoolName();
     checkSuperuserPrivilege(operationName, poolName);
     try {
-      writeLock();
+      writeLock(FSNamesystemLockMode.FS);
       try {
         checkOperation(OperationCategory.WRITE);
         checkNameNodeSafeMode("Cannot add cache pool"
@@ -7920,7 +7920,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
             logRetryCache);
         poolInfoStr = info.toString();
       } finally {
-        writeUnlock(operationName, getLockReportInfoSupplier(poolInfoStr));
+        writeUnlock(FSNamesystemLockMode.FS, operationName, getLockReportInfoSupplier(poolInfoStr));
       }
     } catch (AccessControlException ace) {
       logAuditEvent(false, operationName, poolInfoStr);
@@ -7938,14 +7938,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         (req == null ? null : req.getPoolName()) + "}";
     checkSuperuserPrivilege(operationName, poolNameStr);
     try {
-      writeLock();
+      writeLock(FSNamesystemLockMode.FS);
       try {
         checkOperation(OperationCategory.WRITE);
         checkNameNodeSafeMode("Cannot modify cache pool"
             + (req == null ? null : req.getPoolName()));
         FSNDNCacheOp.modifyCachePool(this, cacheManager, req, logRetryCache);
       } finally {
-        writeUnlock(operationName, getLockReportInfoSupplier(poolNameStr,
+        writeUnlock(FSNamesystemLockMode.FS, operationName, getLockReportInfoSupplier(poolNameStr,
             req == null ? null : req.toString()));
       }
     } catch (AccessControlException ace) {
@@ -7965,14 +7965,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     String poolNameStr = "{poolName: " + cachePoolName + "}";
     checkSuperuserPrivilege(operationName, poolNameStr);
     try {
-      writeLock();
+      writeLock(FSNamesystemLockMode.FS);
       try {
         checkOperation(OperationCategory.WRITE);
         checkNameNodeSafeMode("Cannot modify cache pool" + cachePoolName);
         FSNDNCacheOp.removeCachePool(this, cacheManager, cachePoolName,
             logRetryCache);
       } finally {
-        writeUnlock(operationName, getLockReportInfoSupplier(poolNameStr));
+        writeUnlock(FSNamesystemLockMode.FS, operationName, getLockReportInfoSupplier(poolNameStr));
       }
     } catch (AccessControlException ace) {
       logAuditEvent(false, operationName, poolNameStr);
@@ -7990,12 +7990,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     FSPermissionChecker.setOperationType(operationName);
     cacheManager.waitForRescanIfNeeded();
     try {
-      readLock();
+      readLock(FSNamesystemLockMode.FS);
       try {
         checkOperation(OperationCategory.READ);
         results = FSNDNCacheOp.listCachePools(this, cacheManager, prevKey);
       } finally {
-        readUnlock(operationName, getLockReportInfoSupplier(null));
+        readUnlock(FSNamesystemLockMode.FS, operationName, getLockReportInfoSupplier(null));
       }
     } catch (AccessControlException ace) {
       logAuditEvent(false, operationName, null);