
HDFS-17496. DataNode supports more fine-grained dataset lock based on blockid. (#7280). Contributed by hfutatzhanghb.

Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
hfutatzhanghb 3 months ago
commit b05c0ce972

+ 7 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/DataNodeLockManager.java

@@ -24,12 +24,17 @@ package org.apache.hadoop.hdfs.server.common;
 public interface DataNodeLockManager<T extends AutoCloseDataSetLock> {
 
   /**
-   * Acquire block pool level first if you want to Acquire volume lock.
+   * Acquire the block pool level and volume level locks first if you want to acquire a dir lock.
    * Or only acquire block pool level lock.
+   * There are several sequential locking patterns, as below:
+   * 1. block pool
+   * 2. block pool -> volume
+   * 3. block pool -> volume -> dir
    */
   enum LockLevel {
     BLOCK_POOl,
-    VOLUME
+    VOLUME,
+    DIR
   }
 
   /**
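For orientation, here is a minimal sketch of the three locking sequences the new javadoc describes. It assumes the classes touched by this patch (DataSetLockManager, AutoCloseDataSetLock, LockLevel) and a no-arg DataSetLockManager constructor as exercised in TestDataSetLockManager below; the block pool id, storage id, and subdir name are invented for the example, and close() is assumed to release the chained parent locks set up via setParentLock:

import org.apache.hadoop.hdfs.server.common.AutoCloseDataSetLock;
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager.LockLevel;
import org.apache.hadoop.hdfs.server.datanode.DataSetLockManager;

public class LockOrderSketch {
  public static void main(String[] args) {
    DataSetLockManager lm = new DataSetLockManager();
    // Registering a DIR lock also registers its BLOCK_POOl and VOLUME
    // parents (see the addLock change in DataSetLockManager below).
    lm.addLock(LockLevel.DIR, "BP-1", "storage-1", "subdir0/subdir1");

    // 1. block pool only
    try (AutoCloseDataSetLock l = lm.readLock(LockLevel.BLOCK_POOl, "BP-1")) {
      // guarded by the block pool lock alone
    }
    // 2. block pool -> volume
    try (AutoCloseDataSetLock l = lm.readLock(LockLevel.VOLUME, "BP-1", "storage-1")) {
      // the volume lock chains to its block pool parent
    }
    // 3. block pool -> volume -> dir
    try (AutoCloseDataSetLock l = lm.readLock(LockLevel.DIR, "BP-1", "storage-1", "subdir0/subdir1")) {
      // the dir lock chains to the volume lock, which chains to the block pool lock
    }
  }
}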

+ 33 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeLayoutSubLockStrategy.java

@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.util.List;
+
+public class DataNodeLayoutSubLockStrategy implements DataSetSubLockStrategy {
+  @Override
+  public String blockIdToSubLock(long blockid) {
+    return DatanodeUtil.idToBlockDirSuffix(blockid);
+  }
+
+  @Override
+  public List<String> getAllSubLockNames() {
+    return DatanodeUtil.getAllSubDirNameForDataSetLock();
+  }
+}

+ 36 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataSetLockManager.java

@@ -96,6 +96,13 @@ public class DataSetLockManager implements DataNodeLockManager<AutoCloseDataSetL
             + resources[0] + "volume lock :" + resources[1]);
       }
       return resources[0] + resources[1];
+    } else if (resources.length == 3 && level == LockLevel.DIR) {
+      if (resources[0] == null || resources[1] == null || resources[2] == null) {
+        throw new IllegalArgumentException("acquire a null dataset lock: "
+            + resources[0] + ", volume lock: " + resources[1]
+            + ", subdir lock: " + resources[2]);
+      }
+      return resources[0] + resources[1] + resources[2];
     } else {
       throw new IllegalArgumentException("lock level do not match resource");
     }
@@ -156,7 +163,7 @@ public class DataSetLockManager implements DataNodeLockManager<AutoCloseDataSetL
   public AutoCloseDataSetLock readLock(LockLevel level, String... resources) {
     if (level == LockLevel.BLOCK_POOl) {
       return getReadLock(level, resources[0]);
-    } else {
+    } else if (level == LockLevel.VOLUME) {
       AutoCloseDataSetLock bpLock = getReadLock(LockLevel.BLOCK_POOl, resources[0]);
       AutoCloseDataSetLock volLock = getReadLock(level, resources);
       volLock.setParentLock(bpLock);
@@ -165,6 +172,17 @@ public class DataSetLockManager implements DataNodeLockManager<AutoCloseDataSetL
             resources[0]);
       }
       return volLock;
+    } else {
+      AutoCloseDataSetLock bpLock = getReadLock(LockLevel.BLOCK_POOl, resources[0]);
+      AutoCloseDataSetLock volLock = getReadLock(LockLevel.VOLUME, resources[0], resources[1]);
+      volLock.setParentLock(bpLock);
+      AutoCloseDataSetLock dirLock = getReadLock(level, resources);
+      dirLock.setParentLock(volLock);
+      if (openLockTrace) {
+        LOG.debug("Sub lock " + resources[0] + resources[1] + resources[2] + " parent lock " +
+            resources[0] + resources[1]);
+      }
+      return dirLock;
     }
   }
 
@@ -172,7 +190,7 @@ public class DataSetLockManager implements DataNodeLockManager<AutoCloseDataSetL
   public AutoCloseDataSetLock writeLock(LockLevel level, String... resources) {
     if (level == LockLevel.BLOCK_POOl) {
       return getWriteLock(level, resources[0]);
-    } else {
+    } else if (level == LockLevel.VOLUME) {
       AutoCloseDataSetLock bpLock = getReadLock(LockLevel.BLOCK_POOl, resources[0]);
       AutoCloseDataSetLock volLock = getWriteLock(level, resources);
       volLock.setParentLock(bpLock);
@@ -181,6 +199,17 @@ public class DataSetLockManager implements DataNodeLockManager<AutoCloseDataSetL
             resources[0]);
       }
       return volLock;
+    } else {
+      AutoCloseDataSetLock bpLock = getReadLock(LockLevel.BLOCK_POOl, resources[0]);
+      AutoCloseDataSetLock volLock = getReadLock(LockLevel.VOLUME, resources[0], resources[1]);
+      volLock.setParentLock(bpLock);
+      AutoCloseDataSetLock dirLock = getWriteLock(level, resources);
+      dirLock.setParentLock(volLock);
+      if (openLockTrace) {
+        LOG.debug("Sub lock " + resources[0] + resources[1] + resources[2] + " parent lock " +
+            resources[0] + resources[1]);
+      }
+      return dirLock;
     }
   }
 
@@ -235,8 +264,13 @@ public class DataSetLockManager implements DataNodeLockManager<AutoCloseDataSetL
     String lockName = generateLockName(level, resources);
     if (level == LockLevel.BLOCK_POOl) {
       lockMap.addLock(lockName, new ReentrantReadWriteLock(isFair));
+    } else if (level == LockLevel.VOLUME) {
+      lockMap.addLock(resources[0], new ReentrantReadWriteLock(isFair));
+      lockMap.addLock(lockName, new ReentrantReadWriteLock(isFair));
     } else {
       lockMap.addLock(resources[0], new ReentrantReadWriteLock(isFair));
+      lockMap.addLock(generateLockName(LockLevel.VOLUME, resources[0], resources[1]),
+          new ReentrantReadWriteLock(isFair));
       lockMap.addLock(lockName, new ReentrantReadWriteLock(isFair));
     }
   }
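Note the asymmetry visible in the hunks above: even for a DIR-level writeLock, the block pool and volume levels are taken as read locks, and only the leaf sub-dir lock is exclusive. As a hedged sketch (continuing the lock manager and names from the example above), two writers under different subdirs of the same volume no longer serialize against each other:

// Thread A
try (AutoCloseDataSetLock a = lm.writeLock(LockLevel.DIR, "BP-1", "storage-1", "subdir0/subdir1")) {
  // exclusive only on subdir0/subdir1; BP-1 and storage-1 are read-locked
}
// Thread B, concurrently
try (AutoCloseDataSetLock b = lm.writeLock(LockLevel.DIR, "BP-1", "storage-1", "subdir2/subdir7")) {
  // does not contend with thread A's dir lock
}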

+ 36 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataSetSubLockStrategy.java

@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.util.List;
+
+/**
+ * This interface is used to generate the sub lock name for a given blockid.
+ */
+public interface DataSetSubLockStrategy {
+
+  /**
+   * Generate sub lock name for the given blockid.
+   * @param blockid the block id.
+   * @return sub lock name for the input blockid.
+   */
+  String blockIdToSubLock(long blockid);
+
+  List<String> getAllSubLockNames();
+}
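Taken together with DataNodeLayoutSubLockStrategy above, the intended usage looks roughly like the following sketch. Here bpid, storageUuid, and blockId are placeholders, and lockManager is a DataSetLockManager as held by FsDatasetImpl; this mirrors the addVolume and call-site changes further down:

DataSetSubLockStrategy strategy = new DataNodeLayoutSubLockStrategy();

// Pre-register one DIR lock per possible subdir pair for a volume.
for (String name : strategy.getAllSubLockNames()) {
  lockManager.addLock(LockLevel.DIR, bpid, storageUuid, name);
}

// At a call site, derive the sub-lock name from the block id.
try (AutoCloseDataSetLock l = lockManager.writeLock(LockLevel.DIR,
    bpid, storageUuid, strategy.blockIdToSubLock(blockId))) {
  // operate on the replica's files under that subdir
}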

+ 30 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java

@@ -21,6 +21,8 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -37,6 +39,7 @@ public class DatanodeUtil {
   public static final String DISK_ERROR = "Possible disk error: ";
 
   private static final String SEP = System.getProperty("file.separator");
+  private static final long MASK = 0x1F;
 
   /** Get the cause of an I/O exception if caused by a possible disk error
    * @param ioe an I/O exception
@@ -112,6 +115,21 @@ public class DatanodeUtil {
     return true;
   }
 
+  /**
+   * For example, given a block whose blockid maps to:
+   * "/data1/hadoop/hdfs/datanode/current/BP-xxxx/current/finalized/subdir0/subdir1"
+   * this method returns "subdir0/subdir1".
+   * @param blockId the block id.
+   * @return the two-level subdir string under which the block will be stored.
+   */
+  public static String idToBlockDirSuffix(long blockId) {
+    int d1 = (int) ((blockId >> 16) & MASK);
+    int d2 = (int) ((blockId >> 8) & MASK);
+    return DataStorage.BLOCK_SUBDIR_PREFIX + d1 + SEP +
+        DataStorage.BLOCK_SUBDIR_PREFIX + d2;
+  }
+
   /**
    * Get the directory where a finalized block with this ID should be stored.
    * Do not attempt to create the directory.
@@ -120,13 +138,21 @@ public class DatanodeUtil {
    * @return
    */
   public static File idToBlockDir(File root, long blockId) {
-    int d1 = (int) ((blockId >> 16) & 0x1F);
-    int d2 = (int) ((blockId >> 8) & 0x1F);
-    String path = DataStorage.BLOCK_SUBDIR_PREFIX + d1 + SEP +
-        DataStorage.BLOCK_SUBDIR_PREFIX + d2;
+    String path = idToBlockDirSuffix(blockId);
     return new File(root, path);
   }
 
+  public static List<String> getAllSubDirNameForDataSetLock() {
+    List<String> res = new ArrayList<>();
+    for (int d1 = 0; d1 <= MASK; d1++) {
+      for (int d2 = 0; d2 <= MASK; d2++) {
+        res.add(DataStorage.BLOCK_SUBDIR_PREFIX + d1 + SEP +
+            DataStorage.BLOCK_SUBDIR_PREFIX + d2);
+      }
+    }
+    return res;
+  }
+
   /**
    * @return the FileInputStream for the meta data of the given block.
    * @throws FileNotFoundException
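A quick worked example of the refactored helpers; the blockid is arbitrary and the path separator is assumed to be the Unix "/":

// idToBlockDirSuffix(0x12345L):
//   d1 = (0x12345 >> 16) & 0x1F = 0x1          -> "subdir1"
//   d2 = (0x12345 >> 8)  & 0x1F = 0x123 & 0x1F -> "subdir3"
String suffix = DatanodeUtil.idToBlockDirSuffix(0x12345L);  // "subdir1/subdir3"

// With a 5-bit mask per level there are 32 * 32 = 1024 possible names,
// which is how many DIR locks get pre-registered per block pool and volume.
int count = DatanodeUtil.getAllSubDirNameForDataSetLock().size();  // 1024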

+ 80 - 38
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java

@@ -64,7 +64,9 @@ import org.apache.hadoop.hdfs.server.common.AutoCloseDataSetLock;
 import org.apache.hadoop.hdfs.server.common.DataNodeLockManager;
 import org.apache.hadoop.hdfs.server.common.DataNodeLockManager.LockLevel;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeLayoutSubLockStrategy;
 import org.apache.hadoop.hdfs.server.datanode.DataSetLockManager;
+import org.apache.hadoop.hdfs.server.datanode.DataSetSubLockStrategy;
 import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
 import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
 import org.apache.hadoop.hdfs.server.datanode.LocalReplica;
@@ -198,8 +200,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override // FsDatasetSpi
   public Block getStoredBlock(String bpid, long blkid)
       throws IOException {
-    try (AutoCloseableLock lock = lockManager.readLock(LockLevel.BLOCK_POOl,
-        bpid)) {
+    try (AutoCloseableLock lock = lockManager.readLock(LockLevel.DIR,
+        bpid, getReplicaInfo(bpid, blkid).getStorageUuid(),
+        datasetSubLockStrategy.blockIdToSubLock(blkid))) {
       ReplicaInfo r = volumeMap.get(bpid, blkid);
       if (r == null) {
         return null;
@@ -288,6 +291,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   private long lastDirScannerNotifyTime;
   private volatile long lastDirScannerFinishTime;
 
+  private final DataSetSubLockStrategy datasetSubLockStrategy;
+
   /**
    * An FSDataset has a directory where it loads its data files.
    */
@@ -392,6 +397,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_MAX_NOTIFY_COUNT_KEY,
         DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_MAX_NOTIFY_COUNT_DEFAULT);
     lastDirScannerNotifyTime = System.currentTimeMillis();
+    this.datasetSubLockStrategy = new DataNodeLayoutSubLockStrategy();
   }
 
   /**
@@ -430,6 +436,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       FsVolumeReference ref) throws IOException {
     for (String bp : volumeMap.getBlockPoolList()) {
       lockManager.addLock(LockLevel.VOLUME, bp, ref.getVolume().getStorageID());
+      List<String> allSubDirNameForDataSetLock = datasetSubLockStrategy.getAllSubLockNames();
+      for (String dir : allSubDirNameForDataSetLock) {
+        lockManager.addLock(LockLevel.DIR, bp, ref.getVolume().getStorageID(), dir);
+        LOG.info("Added DIR lock for bpid:{}, volume storageid:{}, dir:{}",
+            bp, ref.getVolume().getStorageID(), dir);
+      }
     }
     DatanodeStorage dnStorage = storageMap.get(sd.getStorageUuid());
     if (dnStorage != null) {
@@ -629,6 +641,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       for (String storageUuid : storageToRemove) {
         storageMap.remove(storageUuid);
         for (String bp : volumeMap.getBlockPoolList()) {
+          List<String> allSubDirNameForDataSetLock = datasetSubLockStrategy.getAllSubLockNames();
+          for (String dir : allSubDirNameForDataSetLock) {
+            lockManager.removeLock(LockLevel.DIR, bp, storageUuid, dir);
+            LOG.info("Removed DIR lock for bpid:{}, volume storageid:{}, dir:{}",
+                bp, storageUuid, dir);
+          }
           lockManager.removeLock(LockLevel.VOLUME, bp, storageUuid);
         }
       }
@@ -819,8 +837,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       long seekOffset) throws IOException {
 
     ReplicaInfo info;
-    try (AutoCloseableLock lock = lockManager.readLock(LockLevel.BLOCK_POOl,
-        b.getBlockPoolId())) {
+    try (AutoCloseableLock lock = lockManager.readLock(LockLevel.DIR,
+        b.getBlockPoolId(), getStorageUuidForLock(b),
+        datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) {
       info = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
     }
 
@@ -914,8 +933,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override // FsDatasetSpi
   public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
       long blkOffset, long metaOffset) throws IOException {
-    try (AutoCloseDataSetLock l = lockManager.readLock(LockLevel.VOLUME,
-        b.getBlockPoolId(), getStorageUuidForLock(b))) {
+    try (AutoCloseDataSetLock l = lockManager.readLock(LockLevel.DIR,
+        b.getBlockPoolId(), getStorageUuidForLock(b),
+        datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) {
       ReplicaInfo info = getReplicaInfo(b);
       FsVolumeReference ref = info.getVolume().obtainReference();
       try {
@@ -1380,8 +1400,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override  // FsDatasetSpi
   public ReplicaHandler append(ExtendedBlock b,
       long newGS, long expectedBlockLen) throws IOException {
-    try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
-        b.getBlockPoolId(), getStorageUuidForLock(b))) {
+    try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR,
+        b.getBlockPoolId(), getStorageUuidForLock(b),
+        datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) {
       // If the block was successfully finalized because all packets
       // were successfully processed at the Datanode but the ack for
       // some of the packets were not received by the client. The client
@@ -1433,8 +1454,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   private ReplicaInPipeline append(String bpid,
       ReplicaInfo replicaInfo, long newGS, long estimateBlockLen)
       throws IOException {
-    try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
-        bpid, replicaInfo.getStorageUuid())) {
+    try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR,
+        bpid, replicaInfo.getStorageUuid(),
+        datasetSubLockStrategy.blockIdToSubLock(replicaInfo.getBlockId()))) {
       // If the block is cached, start uncaching it.
       if (replicaInfo.getState() != ReplicaState.FINALIZED) {
         throw new IOException("Only a Finalized replica can be appended to; "
@@ -1530,8 +1552,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
     while (true) {
       try {
-        try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
-            b.getBlockPoolId())) {
+        try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR,
+            b.getBlockPoolId(), getStorageUuidForLock(b),
+            datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) {
           ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
           FsVolumeReference ref = replicaInfo.getVolume().obtainReference();
           ReplicaInPipeline replica;
@@ -1564,8 +1587,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         b, newGS, expectedBlockLen);
     while (true) {
       try {
-        try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
-            b.getBlockPoolId(), getStorageUuidForLock(b))) {
+        try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR,
+            b.getBlockPoolId(), getStorageUuidForLock(b),
+            datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) {
           // check replica's state
           ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
           // bump the replica's GS
@@ -1650,8 +1674,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       }
 
       ReplicaInPipeline newReplicaInfo;
-      try (AutoCloseableLock l = lockManager.writeLock(LockLevel.VOLUME,
-          b.getBlockPoolId(), v.getStorageID())) {
+      try (AutoCloseableLock l = lockManager.writeLock(LockLevel.DIR,
+          b.getBlockPoolId(), v.getStorageID(),
+          datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) {
         newReplicaInfo = v.createRbw(b);
         if (newReplicaInfo.getReplicaInfo().getState() != ReplicaState.RBW) {
           throw new IOException("CreateRBW returned a replica of state "
@@ -1681,8 +1706,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     try {
       while (true) {
         try {
-          try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
-              b.getBlockPoolId(), getStorageUuidForLock(b))) {
+          try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR,
+              b.getBlockPoolId(), getStorageUuidForLock(b),
+              datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) {
             ReplicaInfo replicaInfo =
                 getReplicaInfo(b.getBlockPoolId(), b.getBlockId());
             // check the replica's state
@@ -1713,8 +1739,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   private ReplicaHandler recoverRbwImpl(ReplicaInPipeline rbw,
       ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd)
       throws IOException {
-    try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
-        b.getBlockPoolId(), getStorageUuidForLock(b))) {
+    try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR,
+        b.getBlockPoolId(), getStorageUuidForLock(b),
+        datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) {
       // check generation stamp
       long replicaGenerationStamp = rbw.getGenerationStamp();
       if (replicaGenerationStamp < b.getGenerationStamp() ||
@@ -1775,8 +1802,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   public ReplicaInPipeline convertTemporaryToRbw(
       final ExtendedBlock b) throws IOException {
     long startTimeMs = Time.monotonicNow();
-    try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
-        b.getBlockPoolId(), getStorageUuidForLock(b))) {
+    try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR,
+        b.getBlockPoolId(), getStorageUuidForLock(b),
+        datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) {
       final long blockId = b.getBlockId();
       final long expectedGs = b.getGenerationStamp();
       final long visible = b.getNumBytes();
@@ -1915,8 +1943,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         .getNumBytes());
     FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
     ReplicaInPipeline newReplicaInfo;
-    try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
-        b.getBlockPoolId(), v.getStorageID())) {
+    try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR,
+        b.getBlockPoolId(), v.getStorageID(),
+        datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) {
       try {
         newReplicaInfo = v.createTemporary(b);
         LOG.debug("creating temporary for block: {} on volume: {}",
@@ -1973,8 +2002,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     ReplicaInfo replicaInfo = null;
     ReplicaInfo finalizedReplicaInfo = null;
     long startTimeMs = Time.monotonicNow();
-    try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
-        b.getBlockPoolId(), getStorageUuidForLock(b))) {
+    try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR,
+        b.getBlockPoolId(), getStorageUuidForLock(b),
+        datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) {
       if (Thread.interrupted()) {
         // Don't allow data modifications from interrupted threads
         throw new IOException("Cannot finalize block: " + b + " from Interrupted Thread");
@@ -2010,8 +2040,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
   private ReplicaInfo finalizeReplica(String bpid, ReplicaInfo replicaInfo)
       throws IOException {
-    try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
-        bpid, replicaInfo.getStorageUuid())) {
+    try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR,
+        bpid, replicaInfo.getStorageUuid(),
+        datasetSubLockStrategy.blockIdToSubLock(replicaInfo.getBlockId()))) {
       // Compare generation stamp of old and new replica before finalizing
       if (volumeMap.get(bpid, replicaInfo.getBlockId()).getGenerationStamp()
           > replicaInfo.getGenerationStamp()) {
@@ -2060,8 +2091,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override // FsDatasetSpi
   public void unfinalizeBlock(ExtendedBlock b) throws IOException {
     long startTimeMs = Time.monotonicNow();
-    try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
-        b.getBlockPoolId(), getStorageUuidForLock(b))) {
+    try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR,
+        b.getBlockPoolId(), getStorageUuidForLock(b),
+        datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) {
       ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
           b.getLocalBlock());
       if (replicaInfo != null &&
@@ -2459,7 +2491,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     final String bpid = block.getBlockPoolId();
     final Block localBlock = block.getLocalBlock();
     final long blockId = localBlock.getBlockId();
-    try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
+    try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR, bpid, volume.getStorageID(),
+        datasetSubLockStrategy.blockIdToSubLock(blockId))) {
       final ReplicaInfo info = volumeMap.get(bpid, localBlock);
       if (info == null) {
         ReplicaInfo infoByBlockId = volumeMap.get(bpid, blockId);
@@ -2548,8 +2581,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           bpid + ": ReplicaInfo not found.");
       return;
     }
-    try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME, bpid,
-        info.getStorageUuid())) {
+    try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR, bpid,
+        info.getStorageUuid(), datasetSubLockStrategy.blockIdToSubLock(blockId))) {
       boolean success = false;
       try {
         info = volumeMap.get(bpid, blockId);
@@ -2746,7 +2779,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       lastDirScannerNotifyTime = startTimeMs;
     }
     String storageUuid = vol.getStorageID();
-    try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME, bpid, storageUuid)) {
+    try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR, bpid,
+        vol.getStorageID(), datasetSubLockStrategy.blockIdToSubLock(blockId))) {
       if (!storageMap.containsKey(storageUuid)) {
         // Storage was already removed
         return;
@@ -3231,8 +3265,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override // FsDatasetSpi
   public long getReplicaVisibleLength(final ExtendedBlock block)
   throws IOException {
-    try (AutoCloseableLock lock = lockManager.readLock(LockLevel.BLOCK_POOl,
-        block.getBlockPoolId())) {
+    try (AutoCloseableLock lock = lockManager.readLock(LockLevel.DIR,
+        block.getBlockPoolId(), getStorageUuidForLock(block),
+        datasetSubLockStrategy.blockIdToSubLock(block.getBlockId()))) {
       final Replica replica = getReplicaInfo(block.getBlockPoolId(),
           block.getBlockId());
       if (replica.getGenerationStamp() < block.getGenerationStamp()) {
@@ -3259,6 +3294,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       Set<String> vols = storageMap.keySet();
       for (String v : vols) {
         lockManager.addLock(LockLevel.VOLUME, bpid, v);
+        List<String> allSubDirNameForDataSetLock = datasetSubLockStrategy.getAllSubLockNames();
+        for (String dir : allSubDirNameForDataSetLock) {
+          lockManager.addLock(LockLevel.DIR, bpid, v, dir);
+          LOG.info("Added DIR lock for bpid:{}, volume storageid:{}, dir:{}",
+              bpid, v, dir);
+        }
       }
     }
     try {
@@ -3386,8 +3427,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override // FsDatasetSpi
   public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block)
       throws IOException {
-    try (AutoCloseableLock lock = lockManager.readLock(LockLevel.BLOCK_POOl,
-        block.getBlockPoolId())) {
+    try (AutoCloseableLock lock = lockManager.readLock(LockLevel.DIR,
+        block.getBlockPoolId(), getStorageUuidForLock(block),
+        datasetSubLockStrategy.blockIdToSubLock(block.getBlockId()))) {
       final Replica replica = volumeMap.get(block.getBlockPoolId(),
           block.getBlockId());
       if (replica == null) {
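One behavioral consequence, which explains the test updates below: getStoredBlock and similar paths now resolve the replica up front (via getReplicaInfo) to choose the DIR sub-lock, so a block absent from the dataset can surface as a ReplicaNotFoundException instead of a null return. A hedged sketch of the caller-side handling, mirroring the TestFsDatasetImpl change:

try {
  Block stored = dataset.getStoredBlock(bpid, blockId);
  // null is still possible if the replica vanishes after the lock is taken
} catch (ReplicaNotFoundException e) {
  // thrown while resolving the replica to pick the sub-lock
}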

+ 11 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataSetLockManager.java

@@ -37,6 +37,7 @@ public class TestDataSetLockManager {
   public void testBaseFunc() {
     manager.addLock(LockLevel.BLOCK_POOl, "BPtest");
     manager.addLock(LockLevel.VOLUME, "BPtest", "Volumetest");
+    manager.addLock(LockLevel.DIR, "BPtest", "Volumetest", "SubDirtest");
 
     AutoCloseDataSetLock lock = manager.writeLock(LockLevel.BLOCK_POOl, "BPtest");
     AutoCloseDataSetLock lock1 = manager.readLock(LockLevel.BLOCK_POOl, "BPtest");
@@ -62,6 +63,16 @@ public class TestDataSetLockManager {
     manager.lockLeakCheck();
     assertNull(manager.getLastException());
 
+    AutoCloseDataSetLock lock6 = manager.writeLock(LockLevel.BLOCK_POOl, "BPtest");
+    AutoCloseDataSetLock lock7 = manager.readLock(LockLevel.VOLUME, "BPtest", "Volumetest");
+    AutoCloseDataSetLock lock8 = manager.readLock(LockLevel.DIR,
+        "BPtest", "Volumetest", "SubDirtest");
+    lock8.close();
+    lock7.close();
+    lock6.close();
+    manager.lockLeakCheck();
+    assertNull(manager.getLastException());
+
     manager.writeLock(LockLevel.VOLUME, "BPtest", "Volumetest");
     manager.lockLeakCheck();
 

+ 6 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java

@@ -1946,7 +1946,12 @@ public class TestFsDatasetImpl {
       assertFalse(uuids.contains(dn.getDatanodeUuid()));
 
       // This replica has deleted from datanode memory.
-      assertNull(ds.getStoredBlock(bpid, extendedBlock.getBlockId()));
+      try {
+        Block storedBlock = ds.getStoredBlock(bpid, extendedBlock.getBlockId());
+        assertNull(storedBlock);
+      } catch (Exception e) {
+        GenericTestUtils.assertExceptionContains("ReplicaNotFoundException", e);
+      }
     } finally {
       cluster.shutdown();
       DataNodeFaultInjector.set(oldInjector);

+ 8 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java

@@ -50,6 +50,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.util.RwLockMode;
@@ -597,9 +598,13 @@ public class TestDNFencing {
       throws IOException {
     int count = 0;
     for (DataNode dn : cluster.getDataNodes()) {
-      if (DataNodeTestUtils.getFSDataset(dn).getStoredBlock(
-          block.getBlockPoolId(), block.getBlockId()) != null) {
-        count++;
+      try {
+        if (DataNodeTestUtils.getFSDataset(dn).getStoredBlock(
+            block.getBlockPoolId(), block.getBlockId()) != null) {
+          count++;
+        }
+      } catch (ReplicaNotFoundException e) {
+        continue;
       }
     }
     return count;