Explorar o código

HDFS-17398. [FGL] Implement the FGL lock for FSNLockManager (#6599)

ZanderXu hai 1 ano
pai
achega
16c25b277d

+ 300 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/fgl/FineGrainedFSNamesystemLock.java

@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.fgl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystemLock;
+import org.apache.hadoop.metrics2.lib.MutableRatesWithAggregation;
+
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
+
+/**
+ * Splitting the global FSN lock into FSLock and BMLock.
+ * FSLock is used to protect directory tree-related operations.
+ * BMLock is used to protect block-related and dn-related operations.
+ * The lock order should be: FSLock,BMLock.
+ */
+public class FineGrainedFSNamesystemLock implements FSNLockManager {
+  private final FSNamesystemLock fsLock;
+  private final FSNamesystemLock bmLock;
+
+  /**
+   * Creates the lock manager backed by two separate {@link FSNamesystemLock}
+   * instances, one used as the FSLock and one used as the BMLock.
+   * @param conf configuration passed to both underlying locks
+   * @param aggregation metrics aggregation object passed to both underlying locks
+   */
+  public FineGrainedFSNamesystemLock(Configuration conf, MutableRatesWithAggregation aggregation) {
+    fsLock = new FSNamesystemLock(conf, aggregation);
+    bmLock = new FSNamesystemLock(conf, aggregation);
+  }
+
+  @Override
+  public void readLock(FSNamesystemLockMode lockMode) {
+    if (lockMode.equals(FSNamesystemLockMode.GLOBAL)) {
+      this.fsLock.readLock();
+      this.bmLock.readLock();
+    } else if (lockMode.equals(FSNamesystemLockMode.FS)) {
+      this.fsLock.readLock();
+    } else if (lockMode.equals(FSNamesystemLockMode.BM)) {
+      this.bmLock.readLock();
+    }
+  }
+
+  public void readLockInterruptibly(FSNamesystemLockMode lockMode) throws InterruptedException  {
+    if (lockMode.equals(FSNamesystemLockMode.GLOBAL)) {
+      this.fsLock.readLockInterruptibly();
+      try {
+        this.bmLock.readLockInterruptibly();
+      } catch (InterruptedException e) {
+        // The held FSLock should be released if the current thread is interrupted
+        // while acquiring the BMLock.
+        this.fsLock.readUnlock("BMReadLockInterruptiblyFailed");
+        throw e;
+      }
+    } else if (lockMode.equals(FSNamesystemLockMode.FS)) {
+      this.fsLock.readLockInterruptibly();
+    } else if (lockMode.equals(FSNamesystemLockMode.BM)) {
+      this.bmLock.readLockInterruptibly();
+    }
+  }
+
+  @Override
+  public void readUnlock(FSNamesystemLockMode lockMode, String opName) {
+    if (lockMode.equals(FSNamesystemLockMode.GLOBAL)) {
+      this.bmLock.readUnlock(opName);
+      this.fsLock.readUnlock(opName);
+    } else if (lockMode.equals(FSNamesystemLockMode.FS)) {
+      this.fsLock.readUnlock(opName);
+    } else if (lockMode.equals(FSNamesystemLockMode.BM)) {
+      this.bmLock.readUnlock(opName);
+    }
+  }
+
+  /**
+   * Releases the read lock(s) selected by {@code lockMode}, forwarding a
+   * report-info supplier to the underlying lock(s).
+   * GLOBAL releases the BMLock first and then the FSLock.
+   * @param lockMode which lock(s) to release
+   * @param opName operation name forwarded to the underlying lock(s)
+   * @param lockReportInfoSupplier supplier forwarded unchanged to the underlying lock(s)
+   */
+  public void readUnlock(FSNamesystemLockMode lockMode, String opName,
+      Supplier<String> lockReportInfoSupplier) {
+    switch (lockMode) {
+    case GLOBAL:
+      bmLock.readUnlock(opName, lockReportInfoSupplier);
+      fsLock.readUnlock(opName, lockReportInfoSupplier);
+      break;
+    case FS:
+      fsLock.readUnlock(opName, lockReportInfoSupplier);
+      break;
+    case BM:
+      bmLock.readUnlock(opName, lockReportInfoSupplier);
+      break;
+    default:
+      break;
+    }
+  }
+
+  @Override
+  public void writeLock(FSNamesystemLockMode lockMode) {
+    if (lockMode.equals(FSNamesystemLockMode.GLOBAL)) {
+      this.fsLock.writeLock();
+      this.bmLock.writeLock();
+    } else if (lockMode.equals(FSNamesystemLockMode.FS)) {
+      this.fsLock.writeLock();
+    } else if (lockMode.equals(FSNamesystemLockMode.BM)) {
+      this.bmLock.writeLock();
+    }
+  }
+
+  @Override
+  public void writeUnlock(FSNamesystemLockMode lockMode, String opName) {
+    if (lockMode.equals(FSNamesystemLockMode.GLOBAL)) {
+      this.bmLock.writeUnlock(opName);
+      this.fsLock.writeUnlock(opName);
+    } else if (lockMode.equals(FSNamesystemLockMode.FS)) {
+      this.fsLock.writeUnlock(opName);
+    } else if (lockMode.equals(FSNamesystemLockMode.BM)) {
+      this.bmLock.writeUnlock(opName);
+    }
+  }
+
+  /**
+   * Releases the write lock(s) selected by {@code lockMode}, forwarding the
+   * report-suppression flag to the underlying lock(s).
+   * GLOBAL releases the BMLock first and then the FSLock.
+   * @param lockMode which lock(s) to release
+   * @param opName operation name forwarded to the underlying lock(s)
+   * @param suppressWriteLockReport flag forwarded unchanged to the underlying lock(s)
+   */
+  @Override
+  public void writeUnlock(FSNamesystemLockMode lockMode, String opName,
+      boolean suppressWriteLockReport) {
+    switch (lockMode) {
+    case GLOBAL:
+      bmLock.writeUnlock(opName, suppressWriteLockReport);
+      fsLock.writeUnlock(opName, suppressWriteLockReport);
+      break;
+    case FS:
+      fsLock.writeUnlock(opName, suppressWriteLockReport);
+      break;
+    case BM:
+      bmLock.writeUnlock(opName, suppressWriteLockReport);
+      break;
+    default:
+      break;
+    }
+  }
+
+  /**
+   * Releases the write lock(s) selected by {@code lockMode}, forwarding a
+   * report-info supplier to the underlying lock(s).
+   * GLOBAL releases the BMLock first and then the FSLock.
+   * @param lockMode which lock(s) to release
+   * @param opName operation name forwarded to the underlying lock(s)
+   * @param lockReportInfoSupplier supplier forwarded unchanged to the underlying lock(s)
+   */
+  public void writeUnlock(FSNamesystemLockMode lockMode, String opName,
+      Supplier<String> lockReportInfoSupplier) {
+    switch (lockMode) {
+    case GLOBAL:
+      bmLock.writeUnlock(opName, lockReportInfoSupplier);
+      fsLock.writeUnlock(opName, lockReportInfoSupplier);
+      break;
+    case FS:
+      fsLock.writeUnlock(opName, lockReportInfoSupplier);
+      break;
+    case BM:
+      bmLock.writeUnlock(opName, lockReportInfoSupplier);
+      break;
+    default:
+      break;
+    }
+  }
+
+  @Override
+  public void writeLockInterruptibly(FSNamesystemLockMode lockMode)
+      throws InterruptedException {
+    if (lockMode.equals(FSNamesystemLockMode.GLOBAL)) {
+      this.fsLock.writeLockInterruptibly();
+      try {
+        this.bmLock.writeLockInterruptibly();
+      } catch (InterruptedException e) {
+        // The held FSLock should be released if the current thread is interrupted
+        // while acquiring the BMLock.
+        this.fsLock.writeUnlock("BMWriteLockInterruptiblyFailed");
+        throw e;
+      }
+    } else if (lockMode.equals(FSNamesystemLockMode.FS)) {
+      this.fsLock.writeLockInterruptibly();
+    } else if (lockMode.equals(FSNamesystemLockMode.BM)) {
+      this.bmLock.writeLockInterruptibly();
+    }
+  }
+
+  @Override
+  public boolean hasWriteLock(FSNamesystemLockMode lockMode) {
+    if (lockMode.equals(FSNamesystemLockMode.GLOBAL)) {
+      if (this.fsLock.isWriteLockedByCurrentThread()) {
+        // The bm writeLock should be held by the current thread.
+        assert this.bmLock.isWriteLockedByCurrentThread();
+        return true;
+      } else {
+        // The bm writeLock should not be held by the current thread.
+        assert !this.bmLock.isWriteLockedByCurrentThread();
+        return false;
+      }
+    } else if (lockMode.equals(FSNamesystemLockMode.FS)) {
+      return this.fsLock.isWriteLockedByCurrentThread();
+    } else if (lockMode.equals(FSNamesystemLockMode.BM)) {
+      return this.bmLock.isWriteLockedByCurrentThread();
+    }
+    return false;
+  }
+
+  @Override
+  public boolean hasReadLock(FSNamesystemLockMode lockMode) {
+    if (lockMode.equals(FSNamesystemLockMode.GLOBAL)) {
+      if (hasWriteLock(FSNamesystemLockMode.GLOBAL)) {
+        return true;
+      } else if (this.fsLock.getReadHoldCount() > 0) {
+        // The bm readLock should be held by the current thread.
+        assert this.bmLock.getReadHoldCount() > 0;
+        return true;
+      } else {
+        // The bm readLock should not be held by the current thread.
+        assert this.bmLock.getReadHoldCount() <= 0;
+        return false;
+      }
+    } else if (lockMode.equals(FSNamesystemLockMode.FS)) {
+      return this.fsLock.getReadHoldCount() > 0 || this.fsLock.isWriteLockedByCurrentThread();
+    } else if (lockMode.equals(FSNamesystemLockMode.BM)) {
+      return this.bmLock.getReadHoldCount() > 0 || this.bmLock.isWriteLockedByCurrentThread();
+    }
+    return false;
+  }
+
+  @Override
+  /**
+   * This method is only used for ComputeDirectoryContentSummary.
+   * For the GLOBAL mode, just return the FSLock's ReadHoldCount.
+   */
+  public int getReadHoldCount(FSNamesystemLockMode lockMode) {
+    if (lockMode.equals(FSNamesystemLockMode.GLOBAL)) {
+      return this.fsLock.getReadHoldCount();
+    } else if (lockMode.equals(FSNamesystemLockMode.FS)) {
+      return this.fsLock.getReadHoldCount();
+    } else if (lockMode.equals(FSNamesystemLockMode.BM)) {
+      return this.bmLock.getReadHoldCount();
+    }
+    return -1;
+  }
+
+  @Override
+  public int getQueueLength(FSNamesystemLockMode lockMode) {
+    if (lockMode.equals(FSNamesystemLockMode.GLOBAL)) {
+      return -1;
+    } else if (lockMode.equals(FSNamesystemLockMode.FS)) {
+      return this.fsLock.getQueueLength();
+    } else if (lockMode.equals(FSNamesystemLockMode.BM)) {
+      return this.bmLock.getQueueLength();
+    }
+    return -1;
+  }
+
+  @Override
+  public long getNumOfReadLockLongHold(FSNamesystemLockMode lockMode) {
+    if (lockMode.equals(FSNamesystemLockMode.GLOBAL)) {
+      return -1;
+    } else if (lockMode.equals(FSNamesystemLockMode.FS)) {
+      return this.fsLock.getNumOfReadLockLongHold();
+    } else if (lockMode.equals(FSNamesystemLockMode.BM)) {
+      return this.bmLock.getNumOfReadLockLongHold();
+    }
+    return -1;
+  }
+
+  @Override
+  public long getNumOfWriteLockLongHold(FSNamesystemLockMode lockMode) {
+    if (lockMode.equals(FSNamesystemLockMode.GLOBAL)) {
+      return -1;
+    } else if (lockMode.equals(FSNamesystemLockMode.FS)) {
+      return this.fsLock.getNumOfWriteLockLongHold();
+    } else if (lockMode.equals(FSNamesystemLockMode.BM)) {
+      return this.bmLock.getNumOfWriteLockLongHold();
+    }
+    return -1;
+  }
+
+  /**
+   * Reports whether metrics are enabled, as answered by the FSLock only.
+   * @return the FSLock's metrics-enabled flag
+   */
+  @Override
+  public boolean isMetricsEnabled() {
+    final boolean enabled = fsLock.isMetricsEnabled();
+    return enabled;
+  }
+
+  /**
+   * Applies the metrics-enabled flag to both the FSLock and the BMLock.
+   * @param metricsEnabled value forwarded to both underlying locks
+   */
+  public void setMetricsEnabled(boolean metricsEnabled) {
+    fsLock.setMetricsEnabled(metricsEnabled);
+    bmLock.setMetricsEnabled(metricsEnabled);
+  }
+
+  /**
+   * Applies the read-lock reporting threshold to both the FSLock and the BMLock.
+   * @param readLockReportingThresholdMs value forwarded to both underlying locks
+   */
+  @Override
+  public void setReadLockReportingThresholdMs(long readLockReportingThresholdMs) {
+    fsLock.setReadLockReportingThresholdMs(readLockReportingThresholdMs);
+    bmLock.setReadLockReportingThresholdMs(readLockReportingThresholdMs);
+  }
+
+  /**
+   * Returns the read-lock reporting threshold, as answered by the FSLock only.
+   * @return the FSLock's read-lock reporting threshold
+   */
+  @Override
+  public long getReadLockReportingThresholdMs() {
+    final long thresholdMs = fsLock.getReadLockReportingThresholdMs();
+    return thresholdMs;
+  }
+
+  /**
+   * Applies the write-lock reporting threshold to both the FSLock and the BMLock.
+   * @param writeLockReportingThresholdMs value forwarded to both underlying locks
+   */
+  @Override
+  public void setWriteLockReportingThresholdMs(long writeLockReportingThresholdMs) {
+    fsLock.setWriteLockReportingThresholdMs(writeLockReportingThresholdMs);
+    bmLock.setWriteLockReportingThresholdMs(writeLockReportingThresholdMs);
+  }
+
+  /**
+   * Returns the write-lock reporting threshold, as answered by the FSLock only.
+   * @return the FSLock's write-lock reporting threshold
+   */
+  @Override
+  public long getWriteLockReportingThresholdMs() {
+    final long thresholdMs = fsLock.getWriteLockReportingThresholdMs();
+    return thresholdMs;
+  }
+
+  /**
+   * Not supported by this implementation.
+   * @param lock ignored
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public void setLockForTests(ReentrantReadWriteLock lock) {
+    final String message = "SetLockTests is unsupported";
+    throw new UnsupportedOperationException(message);
+  }
+
+  /**
+   * Not supported by this implementation.
+   * @return never returns normally
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public ReentrantReadWriteLock getLockForTests() {
+    // The message names this getter so the failing call can be identified.
+    throw new UnsupportedOperationException("GetLockForTests is unsupported");
+  }
+}

+ 275 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/fgl/TestFineGrainedFSNamesystemLock.java

@@ -0,0 +1,275 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.fgl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class TestFineGrainedFSNamesystemLock {
+
+  private final Logger log = LoggerFactory.getLogger(TestFineGrainedFSNamesystemLock.class);
+
+  private int getLoopNumber() {
+    return ThreadLocalRandom.current().nextInt(2000, 3000);
+  }
+
+  /**
+   * Test read/write lock of Global, FS and BM model through multi-threading.
+   */
+  @Test(timeout=120000)
+  public void testMultipleThreadsUsingLocks()
+      throws InterruptedException, ExecutionException {
+    FineGrainedFSNamesystemLock fsn = new FineGrainedFSNamesystemLock(new Configuration(), null);
+    ExecutorService service = HadoopExecutors.newFixedThreadPool(1000);
+
+    AtomicLong globalCount = new AtomicLong(0);
+    AtomicLong fsCount = new AtomicLong(0);
+    AtomicLong bmCount = new AtomicLong(0);
+    AtomicLong globalNumber = new AtomicLong(0);
+    AtomicLong fsNumber = new AtomicLong(0);
+    AtomicLong bmNumber = new AtomicLong(0);
+
+    List<Callable<Boolean>> callableList = new ArrayList<>(1000);
+    for (int i = 0; i < 1000; i++) {
+      int index = i % 12;
+      String opName = Integer.toString(i);
+      if (index == 0) { // Test the global write lock via multiple threads.
+        callableList.add(() -> {
+          for (int startIndex = 0; startIndex < getLoopNumber(); startIndex++) {
+            writeLock(fsn, FSNamesystemLockMode.GLOBAL, opName, globalCount);
+            globalNumber.incrementAndGet();
+          }
+          return true;
+        });
+      } else if (index == 1) { // Test the fs write lock via multiple threads.
+        callableList.add(() -> {
+          for (int startIndex = 0; startIndex < getLoopNumber(); startIndex++) {
+            writeLock(fsn, FSNamesystemLockMode.FS, opName, fsCount);
+            fsNumber.incrementAndGet();
+          }
+          return true;
+        });
+      } else if (index == 2) { // Test the bm write lock via multiple threads.
+        callableList.add(() -> {
+          for (int startIndex = 0; startIndex < getLoopNumber(); startIndex++) {
+            writeLock(fsn, FSNamesystemLockMode.BM, opName, bmCount);
+            bmNumber.incrementAndGet();
+          }
+          return true;
+        });
+      } else if (index == 3) { // Test the bm read lock via multiple threads.
+        callableList.add(() -> {
+          for (int startIndex = 0; startIndex < getLoopNumber(); startIndex++) {
+            readLock(fsn, FSNamesystemLockMode.BM, opName, bmCount);
+            bmNumber.incrementAndGet();
+          }
+          return true;
+        });
+      } else if (index == 4) { // Test the fs read lock via multiple threads.
+        callableList.add(() -> {
+          for (int startIndex = 0; startIndex < getLoopNumber(); startIndex++) {
+            readLock(fsn, FSNamesystemLockMode.FS, opName, fsCount);
+            fsNumber.incrementAndGet();
+          }
+          return true;
+        });
+      } else if (index == 5) { // Test the global read lock via multiple threads.
+        callableList.add(() -> {
+          for (int startIndex = 0; startIndex < getLoopNumber(); startIndex++) {
+            readLock(fsn, FSNamesystemLockMode.GLOBAL, opName, globalCount);
+            globalNumber.incrementAndGet();
+          }
+          return true;
+        });
+      } else if (index == 6) { // Test the global interruptable write lock via multiple threads.
+        callableList.add(() -> {
+          for (int startIndex = 0; startIndex < getLoopNumber(); startIndex++) {
+            writeLockInterruptibly(fsn, FSNamesystemLockMode.GLOBAL, opName, globalCount);
+            globalNumber.incrementAndGet();
+          }
+          return true;
+        });
+      } else if (index == 7) { // Test the fs interruptable write lock via multiple threads.
+        callableList.add(() -> {
+          for (int startIndex = 0; startIndex < getLoopNumber(); startIndex++) {
+            writeLockInterruptibly(fsn, FSNamesystemLockMode.FS, opName, fsCount);
+            fsNumber.incrementAndGet();
+          }
+          return true;
+        });
+      } else if (index == 8) { // Test the bm interruptable write lock via multiple threads.
+        callableList.add(() -> {
+          for (int startIndex = 0; startIndex < getLoopNumber(); startIndex++) {
+            writeLockInterruptibly(fsn, FSNamesystemLockMode.BM, opName, bmCount);
+            bmNumber.incrementAndGet();
+          }
+          return true;
+        });
+      } else if (index == 9) { // Test the bm interruptable read lock via multiple threads.
+        callableList.add(() -> {
+          for (int startIndex = 0; startIndex < getLoopNumber(); startIndex++) {
+            readLockInterruptibly(fsn, FSNamesystemLockMode.BM, opName, bmCount);
+            bmNumber.incrementAndGet();
+          }
+          return true;
+        });
+      } else if (index == 10) { // Test the fs interruptable read lock via multiple threads.
+        callableList.add(() -> {
+          for (int startIndex = 0; startIndex < getLoopNumber(); startIndex++) {
+            readLockInterruptibly(fsn, FSNamesystemLockMode.FS, opName, fsCount);
+            fsNumber.incrementAndGet();
+          }
+          return true;
+        });
+      } else { // Test the global interruptable read lock via multiple threads.
+        callableList.add(() -> {
+          for (int startIndex = 0; startIndex < getLoopNumber(); startIndex++) {
+            readLockInterruptibly(fsn, FSNamesystemLockMode.GLOBAL, opName, globalCount);
+            globalNumber.incrementAndGet();
+          }
+          return true;
+        });
+      }
+    }
+
+    List<Future<Boolean>> futures = service.invokeAll(callableList);
+    for (Future<Boolean> f : futures) {
+      f.get();
+    }
+    log.info("Global executed {} times, FS executed {} times, BM executed {} times.",
+        globalNumber.get(), fsNumber.get(), bmNumber.get());
+    assert globalCount.get() == 0;
+    assert fsCount.get() == 0;
+    assert bmCount.get() == 0;
+  }
+
+  /**
+   * Runs two write-locked rounds for the input lock mode: the first round adds
+   * one to the counter and the second subtracts one, so a completed call leaves
+   * the counter unchanged.
+   * @param fsn FSNLockManager
+   * @param mode LockMode
+   * @param opName operation name
+   * @param counter counter to trace this lock mode
+   */
+  private void writeLock(FSNLockManager fsn, FSNamesystemLockMode mode,
+      String opName, AtomicLong counter) {
+    for (long delta : new long[] {1L, -1L}) {
+      fsn.writeLock(mode);
+      try {
+        counter.addAndGet(delta);
+      } finally {
+        fsn.writeUnlock(mode, opName);
+      }
+    }
+  }
+
+  /**
+   * Test read lock for the input lock mode: acquires the read lock, reads the
+   * counter, and releases the lock in a finally block.
+   * @param fsn FSNLockManager
+   * @param mode LockMode
+   * @param opName operation name passed to the unlock call
+   * @param counter counter to trace this lock mode; only read here, never modified
+   */
+  private void readLock(FSNLockManager fsn, FSNamesystemLockMode mode,
+      String opName, AtomicLong counter)  {
+    fsn.readLock(mode);
+    try {
+      counter.get();
+    } finally {
+      fsn.readUnlock(mode, opName);
+    }
+  }
+
+  /**
+   * Test the interruptible write lock for the input lock mode.
+   * The first round increments the counter under the write lock. If that round
+   * is interrupted before the lock is acquired, nothing is changed and the method
+   * returns. Otherwise the second round is retried until the counter has been
+   * decremented again, so a completed call leaves the counter unchanged.
+   * @param fsn FSNLockManager
+   * @param mode LockMode
+   * @param opName operation name passed to the unlock call and used in log messages
+   * @param counter counter to trace this lock mode
+   */
+  private void writeLockInterruptibly(FSNLockManager fsn, FSNamesystemLockMode mode,
+      String opName, AtomicLong counter)  {
+    boolean success = false;
+    try {
+      fsn.writeLockInterruptibly(mode);
+      try {
+        counter.incrementAndGet();
+        success = true;
+      } finally {
+        fsn.writeUnlock(mode, opName);
+      }
+    } catch (InterruptedException e) {
+      log.info("InterruptedException happens in thread {}" +
+          " during increasing the Count.", opName);
+      // Ignored: the counter was not incremented, so there is nothing to undo.
+    }
+    while (success) {
+      try {
+        fsn.writeLockInterruptibly(mode);
+        try {
+          counter.decrementAndGet();
+          success = false;
+        } finally {
+          fsn.writeUnlock(mode, opName);
+        }
+      } catch (InterruptedException e) {
+        log.info("InterruptedException happens in thread {}" +
+            " during decreasing the Count.", opName);
+        // Ignored: loop again so the earlier increment is eventually undone.
+      }
+    }
+  }
+
+  /**
+   * Test the interruptible read lock for the input lock mode: acquires the read
+   * lock, reads the counter, and releases the lock in a finally block.
+   * An interruption during acquisition is logged and otherwise ignored.
+   * @param fsn FSNLockManager
+   * @param mode LockMode
+   * @param opName operation name passed to the unlock call and used in log messages
+   * @param counter counter to trace this lock mode; only read here, never modified
+   */
+  private void readLockInterruptibly(FSNLockManager fsn, FSNamesystemLockMode mode,
+      String opName, AtomicLong counter)  {
+    try {
+      fsn.readLockInterruptibly(mode);
+      try {
+        counter.get();
+      } finally {
+        fsn.readUnlock(mode, opName);
+      }
+    } catch (InterruptedException e) {
+      log.info("InterruptedException happens in thread {}" +
+          " during getting the Count.", opName);
+      // Ignored: the lock was not acquired, so there is nothing to release.
+    }
+  }
+}