|
@@ -34,6 +34,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
|
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
|
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_WITH_REMOTE_PORT_DEFAULT;
|
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_WITH_REMOTE_PORT_KEY;
|
|
|
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LOCK_MODEL_PROVIDER_DEFAULT;
|
|
|
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LOCK_MODEL_PROVIDER_KEY;
|
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_DEFAULT;
|
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_PERMISSIONS_SUPERUSER_ONLY_DEFAULT;
|
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_PERMISSIONS_SUPERUSER_ONLY_KEY;
|
|
@@ -96,6 +98,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SNAPSHOT_DIFF_LI
|
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SNAPSHOT_DIFF_LISTING_LIMIT_DEFAULT;
|
|
|
import static org.apache.hadoop.hdfs.DFSUtil.isParentEntry;
|
|
|
|
|
|
+import java.lang.reflect.Constructor;
|
|
|
import java.nio.charset.StandardCharsets;
|
|
|
import java.util.Optional;
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
@@ -115,6 +118,8 @@ import static org.apache.hadoop.ha.HAServiceProtocol.HAServiceState.OBSERVER;
|
|
|
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
|
|
|
|
|
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
|
|
|
+import org.apache.hadoop.hdfs.server.namenode.fgl.FSNLockManager;
|
|
|
+import org.apache.hadoop.hdfs.server.namenode.fgl.FSNamesystemLockMode;
|
|
|
import org.apache.hadoop.thirdparty.com.google.common.collect.Maps;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotDeletionGc;
|
|
|
import org.apache.hadoop.thirdparty.protobuf.ByteString;
|
|
@@ -623,7 +628,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|
|
private final int numCommittedAllowed;
|
|
|
|
|
|
/** Lock to protect FSNamesystem. */
|
|
|
- private final FSNamesystemLock fsLock;
|
|
|
+ private final FSNLockManager fsLock;
|
|
|
|
|
|
/**
|
|
|
* Checkpoint lock to protect FSNamesystem modification on standby NNs.
|
|
@@ -873,7 +878,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|
|
this.contextFieldSeparator =
|
|
|
conf.get(HADOOP_CALLER_CONTEXT_SEPARATOR_KEY,
|
|
|
HADOOP_CALLER_CONTEXT_SEPARATOR_DEFAULT);
|
|
|
- fsLock = new FSNamesystemLock(conf, detailedLockHoldTimeMetrics);
|
|
|
+ Class<? extends FSNLockManager> lockKlass = conf.getClass(
|
|
|
+ DFS_NAMENODE_LOCK_MODEL_PROVIDER_KEY, DFS_NAMENODE_LOCK_MODEL_PROVIDER_DEFAULT,
|
|
|
+ FSNLockManager.class);
|
|
|
+ fsLock = createLock(lockKlass, conf, detailedLockHoldTimeMetrics);
|
|
|
cpLock = new ReentrantLock();
|
|
|
|
|
|
this.fsImage = fsImage;
|
|
@@ -1082,6 +1090,18 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private <T> T createLock(Class<T> theClass, Configuration conf,
|
|
|
+ MutableRatesWithAggregation mutableRatesMetrics) {
|
|
|
+ try {
|
|
|
+ Constructor<T> meth = theClass.getDeclaredConstructor(
|
|
|
+ Configuration.class, MutableRatesWithAggregation.class);
|
|
|
+ meth.setAccessible(true);
|
|
|
+ return meth.newInstance(conf, mutableRatesMetrics);
|
|
|
+ } catch (Exception e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
private static void checkForAsyncLogEnabledByOldConfigs(Configuration conf) {
|
|
|
// dfs.namenode.audit.log.async is no longer in use. Use log4j properties instead.
|
|
|
if (conf.getBoolean("dfs.namenode.audit.log.async", false)) {
|
|
@@ -1793,70 +1813,74 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void readLock() {
|
|
|
- this.fsLock.readLock();
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void readLockInterruptibly() throws InterruptedException {
|
|
|
- this.fsLock.readLockInterruptibly();
|
|
|
+ public void readLock(FSNamesystemLockMode lockMode) {
|
|
|
+ this.fsLock.readLock(lockMode);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void readUnlock() {
|
|
|
- this.fsLock.readUnlock();
|
|
|
+ public void readLockInterruptibly(FSNamesystemLockMode lockMode) throws InterruptedException {
|
|
|
+ this.fsLock.readLockInterruptibly(lockMode);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void readUnlock(String opName) {
|
|
|
- this.fsLock.readUnlock(opName);
|
|
|
+ public void readUnlock(FSNamesystemLockMode lockMode, String opName) {
|
|
|
+ this.fsLock.readUnlock(lockMode, opName);
|
|
|
}
|
|
|
|
|
|
public void readUnlock(String opName,
|
|
|
Supplier<String> lockReportInfoSupplier) {
|
|
|
- this.fsLock.readUnlock(opName, lockReportInfoSupplier);
|
|
|
+ readUnlock(FSNamesystemLockMode.GLOBAL, opName, lockReportInfoSupplier);
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- public void writeLock() {
|
|
|
- this.fsLock.writeLock();
|
|
|
+ public void readUnlock(FSNamesystemLockMode lockMode, String opName,
|
|
|
+ Supplier<String> lockReportInfoSupplier) {
|
|
|
+ this.fsLock.readUnlock(lockMode, opName, lockReportInfoSupplier);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void writeLockInterruptibly() throws InterruptedException {
|
|
|
- this.fsLock.writeLockInterruptibly();
|
|
|
+ public void writeLock(FSNamesystemLockMode lockMode) {
|
|
|
+ this.fsLock.writeLock(lockMode);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void writeUnlock() {
|
|
|
- this.fsLock.writeUnlock();
|
|
|
+ public void writeLockInterruptibly(FSNamesystemLockMode lockMode) throws InterruptedException {
|
|
|
+ this.fsLock.writeLockInterruptibly(lockMode);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void writeUnlock(String opName) {
|
|
|
- this.fsLock.writeUnlock(opName);
|
|
|
+ public void writeUnlock(FSNamesystemLockMode lockMode, String opName) {
|
|
|
+ this.fsLock.writeUnlock(lockMode, opName);
|
|
|
}
|
|
|
|
|
|
public void writeUnlock(String opName, boolean suppressWriteLockReport) {
|
|
|
- this.fsLock.writeUnlock(opName, suppressWriteLockReport);
|
|
|
+ writeUnlock(FSNamesystemLockMode.GLOBAL, opName, suppressWriteLockReport);
|
|
|
+ }
|
|
|
+
|
|
|
+ public void writeUnlock(FSNamesystemLockMode lockMode, String opName,
|
|
|
+ boolean suppressWriteLockReport) {
|
|
|
+ this.fsLock.writeUnlock(lockMode, opName, suppressWriteLockReport);
|
|
|
+ }
|
|
|
+
|
|
|
+ public void writeUnlock(String opName, Supplier<String> lockReportInfoSupplier) {
|
|
|
+ writeUnlock(FSNamesystemLockMode.GLOBAL, opName, lockReportInfoSupplier);
|
|
|
}
|
|
|
|
|
|
- public void writeUnlock(String opName,
|
|
|
+ public void writeUnlock(FSNamesystemLockMode lockMode, String opName,
|
|
|
Supplier<String> lockReportInfoSupplier) {
|
|
|
- this.fsLock.writeUnlock(opName, lockReportInfoSupplier);
|
|
|
+ this.fsLock.writeUnlock(lockMode, opName, lockReportInfoSupplier);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public boolean hasWriteLock() {
|
|
|
- return this.fsLock.isWriteLockedByCurrentThread();
|
|
|
+ public boolean hasWriteLock(FSNamesystemLockMode lockMode) {
|
|
|
+ return this.fsLock.hasWriteLock(lockMode);
|
|
|
}
|
|
|
@Override
|
|
|
- public boolean hasReadLock() {
|
|
|
- return this.fsLock.getReadHoldCount() > 0 || hasWriteLock();
|
|
|
+ public boolean hasReadLock(FSNamesystemLockMode lockMode) {
|
|
|
+ return this.fsLock.hasReadLock(lockMode);
|
|
|
}
|
|
|
|
|
|
public int getReadHoldCount() {
|
|
|
- return this.fsLock.getReadHoldCount();
|
|
|
+ return this.fsLock.getReadHoldCount(FSNamesystemLockMode.GLOBAL);
|
|
|
}
|
|
|
|
|
|
/** Lock the checkpoint lock */
|
|
@@ -4952,21 +4976,21 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|
|
@Metric({"LockQueueLength", "Number of threads waiting to " +
|
|
|
"acquire FSNameSystemLock"})
|
|
|
public int getFsLockQueueLength() {
|
|
|
- return fsLock.getQueueLength();
|
|
|
+ return fsLock.getQueueLength(FSNamesystemLockMode.FS);
|
|
|
}
|
|
|
|
|
|
@Metric(value = {"ReadLockLongHoldCount", "The number of time " +
|
|
|
"the read lock has been held for longer than the threshold"},
|
|
|
type = Metric.Type.COUNTER)
|
|
|
public long getNumOfReadLockLongHold() {
|
|
|
- return fsLock.getNumOfReadLockLongHold();
|
|
|
+ return fsLock.getNumOfReadLockLongHold(FSNamesystemLockMode.FS);
|
|
|
}
|
|
|
|
|
|
@Metric(value = {"WriteLockLongHoldCount", "The number of time " +
|
|
|
"the write lock has been held for longer than the threshold"},
|
|
|
type = Metric.Type.COUNTER)
|
|
|
public long getNumOfWriteLockLongHold() {
|
|
|
- return fsLock.getNumOfWriteLockLongHold();
|
|
|
+ return fsLock.getNumOfWriteLockLongHold(FSNamesystemLockMode.FS);
|
|
|
}
|
|
|
|
|
|
int getNumberOfDatanodes(DatanodeReportType type) {
|
|
@@ -7129,12 +7153,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|
|
|
|
|
@VisibleForTesting
|
|
|
void setFsLockForTests(ReentrantReadWriteLock lock) {
|
|
|
- this.fsLock.coarseLock = lock;
|
|
|
+ this.fsLock.setLockForTests(lock);
|
|
|
}
|
|
|
|
|
|
@VisibleForTesting
|
|
|
public ReentrantReadWriteLock getFsLockForTests() {
|
|
|
- return fsLock.coarseLock;
|
|
|
+ return fsLock.getLockForTests();
|
|
|
}
|
|
|
|
|
|
@VisibleForTesting
|