
HDFS-15150. Introduce read write lock to Datanode. Contributed by Stephen O'Donnell.

Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org>
Stephen O'Donnell 5 years ago
parent commit: d7c136b9ed
16 files changed, 160 additions and 93 deletions
  1. +1 -1    hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedReadWriteLock.java
  2. +7 -0    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
  3. +7 -0    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
  4. +2 -2    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
  5. +75 -60  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
  6. +2 -2    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java
  7. +18 -13  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
  8. +21 -0   hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
  9. +6 -0    hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
  10. +5 -0   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
  11. +1 -1   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java
  12. +2 -2   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
  13. +2 -2   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java
  14. +4 -4   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestProvidedImpl.java
  15. +5 -4   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestReplicaMap.java
  16. +2 -2   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
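
Taken together, the patch replaces the Datanode's single exclusive FsDataset lock with an instrumented ReentrantReadWriteLock, with each half wrapped in an AutoCloseableLock so callers keep the existing try-with-resources style. In this commit every caller still acquires the write lock, so locking semantics are unchanged; the read lock is only plumbed through so that read-only paths can be migrated later. A minimal sketch of the pattern, assuming an org.slf4j Logger and using the timing defaults from the patch (the class and method names here are illustrative, not from the patch):

  import org.apache.hadoop.util.AutoCloseableLock;
  import org.apache.hadoop.util.InstrumentedReadWriteLock;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;

  public class DatasetLockSketch {
    private static final Logger LOG =
        LoggerFactory.getLogger(DatasetLockSketch.class);

    // One instrumented lock; the wrappers let callers use try-with-resources.
    private final InstrumentedReadWriteLock rwLock =
        new InstrumentedReadWriteLock(
            true,               // fair, per dfs.datanode.lock.fair
            "FsDatasetRWLock",  // name used in slow-lock warnings
            LOG,
            10000,              // warning suppression interval, ms
            300);               // lock-held warning threshold, ms
    private final AutoCloseableLock writeLock =
        new AutoCloseableLock(rwLock.writeLock());
    private final AutoCloseableLock readLock =
        new AutoCloseableLock(rwLock.readLock());

    void mutate() {
      try (AutoCloseableLock l = writeLock.acquire()) {
        // update shared replica state here
      }
    }

    int readOnly() {
      try (AutoCloseableLock l = readLock.acquire()) {
        return 0; // inspect shared replica state here
      }
    }
  }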

+ 1 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedReadWriteLock.java

@@ -37,7 +37,7 @@ public class InstrumentedReadWriteLock implements ReadWriteLock {
   private final Lock readLock;
   private final Lock writeLock;
 
-  InstrumentedReadWriteLock(boolean fair, String name, Logger logger,
+  public InstrumentedReadWriteLock(boolean fair, String name, Logger logger,
       long minLoggingGapMs, long lockWarningThresholdMs) {
     ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(fair);
     readLock = new InstrumentedReadLock(name, logger, readWriteLock,

+ 7 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -569,6 +569,13 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       "dfs.lock.suppress.warning.interval";
   public static final long DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT =
       10000; //ms
+  public static final String DFS_DATANODE_LOCK_FAIR_KEY =
+      "dfs.datanode.lock.fair";
+  public static final boolean DFS_DATANODE_LOCK_FAIR_DEFAULT = true;
+  public static final String  DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_KEY =
+      "dfs.datanode.lock-reporting-threshold-ms";
+  public static final long
+      DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT = 300L;
 
   public static final String  DFS_UPGRADE_DOMAIN_FACTOR = "dfs.namenode.upgrade.domain.factor";
   public static final int DFS_UPGRADE_DOMAIN_FACTOR_DEFAULT = DFS_REPLICATION_DEFAULT;
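
The new keys are consumed in the FsDatasetImpl constructor shown further down. A fragment showing how the values are resolved, assuming a Hadoop Configuration named conf is in scope:

  boolean fair = conf.getBoolean(
      DFSConfigKeys.DFS_DATANODE_LOCK_FAIR_KEY,       // dfs.datanode.lock.fair
      DFSConfigKeys.DFS_DATANODE_LOCK_FAIR_DEFAULT);  // true
  long reportingThresholdMs = conf.getTimeDuration(
      DFSConfigKeys.DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_KEY,
      DFSConfigKeys.DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT,
      TimeUnit.MILLISECONDS);                         // 300 ms default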

+ 7 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java

@@ -661,6 +661,13 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
    */
   AutoCloseableLock acquireDatasetLock();
 
+  /**
+   * Acquire the read lock of the data set.
+   * @return The AutoCloseable read lock instance.
+   */
+  AutoCloseableLock acquireDatasetReadLock();
+
+
   /**
    * Deep copy the replica info belonging to given block pool.
    * @param bpid Specified block pool id.
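
Like the existing acquireDatasetLock(), the new method is intended to be used with try-with-resources so the lock is released on every exit path. A usage sketch, where dataset is assumed to be any FsDatasetSpi implementation:

  try (AutoCloseableLock lock = dataset.acquireDatasetReadLock()) {
    // Read-only inspection of replica state goes here. Concurrent readers
    // are admitted; writers are excluded until the block exits.
  }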

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java

@@ -43,6 +43,7 @@ import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.ForkJoinTask;
 import java.util.concurrent.RecursiveAction;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.hadoop.hdfs.server.datanode.FSCachingGetSpaceUsed;
 import org.slf4j.Logger;
@@ -67,7 +68,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTrack
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.MultipleIOException;
-import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DataChecksum.Type;
 import org.apache.hadoop.util.DiskChecker;
@@ -875,7 +875,7 @@ class BlockPoolSlice {
 
   private boolean readReplicasFromCache(ReplicaMap volumeMap,
       final RamDiskReplicaTracker lazyWriteReplicaMap) {
-    ReplicaMap tmpReplicaMap = new ReplicaMap(new AutoCloseableLock());
+    ReplicaMap tmpReplicaMap = new ReplicaMap(new ReentrantReadWriteLock());
     File replicaFile = new File(currentDir, REPLICA_CACHE_FILE);
     // Check whether the file exists or not.
     if (!replicaFile.exists()) {

+ 75 - 60
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java

@@ -41,8 +41,8 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import javax.management.NotCompliantMBeanException;
 import javax.management.ObjectName;
@@ -115,7 +115,7 @@ import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
-import org.apache.hadoop.util.InstrumentedLock;
+import org.apache.hadoop.util.InstrumentedReadWriteLock;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.Timer;
@@ -182,7 +182,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
   @Override
   public FsVolumeImpl getVolume(final ExtendedBlock b) {
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       final ReplicaInfo r =
           volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
       return r != null ? (FsVolumeImpl) r.getVolume() : null;
@@ -192,7 +192,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override // FsDatasetSpi
   public Block getStoredBlock(String bpid, long blkid)
       throws IOException {
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       ReplicaInfo r = volumeMap.get(bpid, blkid);
       if (r == null) {
         return null;
@@ -205,7 +205,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   public Set<? extends Replica> deepCopyReplica(String bpid)
       throws IOException {
     Set<? extends Replica> replicas = null;
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       replicas = new HashSet<>(volumeMap.replicas(bpid) == null ? Collections.
           EMPTY_SET : volumeMap.replicas(bpid));
     }
@@ -269,8 +269,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   private final int maxDataLength;
 
   @VisibleForTesting
-  final AutoCloseableLock datasetLock;
-  private final Condition datasetLockCondition;
+  final AutoCloseableLock datasetWriteLock;
+  @VisibleForTesting
+  final AutoCloseableLock datasetReadLock;
+  @VisibleForTesting
+  final InstrumentedReadWriteLock datasetRWLock;
+  private final Condition datasetWriteLockCondition;
   private static String blockPoolId = "";
   
   /**
@@ -283,15 +287,20 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     this.dataStorage = storage;
     this.conf = conf;
     this.smallBufferSize = DFSUtilClient.getSmallBufferSize(conf);
-    this.datasetLock = new AutoCloseableLock(
-        new InstrumentedLock(getClass().getName(), LOG,
-          new ReentrantLock(true),
-          conf.getTimeDuration(
-            DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY,
-            DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT,
-            TimeUnit.MILLISECONDS),
-          300));
-    this.datasetLockCondition = datasetLock.newCondition();
+    this.datasetRWLock = new InstrumentedReadWriteLock(
+        conf.getBoolean(DFSConfigKeys.DFS_DATANODE_LOCK_FAIR_KEY,
+            DFSConfigKeys.DFS_DATANODE_LOCK_FAIR_DEFAULT),
+        "FsDatasetRWLock", LOG, conf.getTimeDuration(
+        DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY,
+        DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT,
+        TimeUnit.MILLISECONDS),
+        conf.getTimeDuration(
+            DFSConfigKeys.DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_KEY,
+            DFSConfigKeys.DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT,
+            TimeUnit.MILLISECONDS));
+    this.datasetWriteLock = new AutoCloseableLock(datasetRWLock.writeLock());
+    this.datasetReadLock = new AutoCloseableLock(datasetRWLock.readLock());
+    this.datasetWriteLockCondition = datasetWriteLock.newCondition();
 
     // The number of volumes required for operation is the total number
     // of volumes minus the number of failed volumes we can tolerate.
@@ -330,7 +339,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
 
     storageMap = new ConcurrentHashMap<String, DatanodeStorage>();
-    volumeMap = new ReplicaMap(datasetLock);
+    volumeMap = new ReplicaMap(datasetRWLock);
     ramDiskReplicaTracker = RamDiskReplicaTracker.getInstance(conf, this);
 
     @SuppressWarnings("unchecked")
@@ -384,7 +393,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
   @Override
   public AutoCloseableLock acquireDatasetLock() {
-    return datasetLock.acquire();
+    return datasetWriteLock.acquire();
+  }
+
+  @Override
+  public AutoCloseableLock acquireDatasetReadLock() {
+    return datasetReadLock.acquire();
   }
 
   /**
@@ -425,7 +439,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       ReplicaMap replicaMap,
       Storage.StorageDirectory sd, StorageType storageType,
       FsVolumeReference ref) throws IOException {
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       DatanodeStorage dnStorage = storageMap.get(sd.getStorageUuid());
       if (dnStorage != null) {
         final String errorMsg = String.format(
@@ -458,7 +472,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
                               .setConf(this.conf)
                               .build();
     FsVolumeReference ref = fsVolume.obtainReference();
-    ReplicaMap tempVolumeMap = new ReplicaMap(datasetLock);
+    ReplicaMap tempVolumeMap = new ReplicaMap(datasetRWLock);
     fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker);
 
     activateVolume(tempVolumeMap, sd, storageLocation.getStorageType(), ref);
@@ -497,7 +511,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     StorageType storageType = location.getStorageType();
     final FsVolumeImpl fsVolume =
         createFsVolume(sd.getStorageUuid(), sd, location);
-    final ReplicaMap tempVolumeMap = new ReplicaMap(new AutoCloseableLock());
+    final ReplicaMap tempVolumeMap =
+        new ReplicaMap(new ReentrantReadWriteLock());
     ArrayList<IOException> exceptions = Lists.newArrayList();
 
     for (final NamespaceInfo nsInfo : nsInfos) {
@@ -542,7 +557,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         new ArrayList<>(storageLocsToRemove);
     Map<String, List<ReplicaInfo>> blkToInvalidate = new HashMap<>();
     List<String> storageToRemove = new ArrayList<>();
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) {
         Storage.StorageDirectory sd = dataStorage.getStorageDir(idx);
         final StorageLocation sdLocation = sd.getStorageLocation();
@@ -554,7 +569,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           // Disable the volume from the service.
           asyncDiskService.removeVolume(sd.getStorageUuid());
           volumes.removeVolume(sdLocation, clearFailure);
-          volumes.waitVolumeRemoved(5000, datasetLockCondition);
+          volumes.waitVolumeRemoved(5000, datasetWriteLockCondition);
 
           // Removed all replica information for the blocks on the volume.
           // Unlike updating the volumeMap in addVolume(), this operation does
@@ -602,7 +617,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       }
     }
 
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       for(String storageUuid : storageToRemove) {
         storageMap.remove(storageUuid);
       }
@@ -793,7 +808,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       long seekOffset) throws IOException {
 
     ReplicaInfo info;
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       info = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
     }
 
@@ -881,7 +896,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override // FsDatasetSpi
   public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
       long blkOffset, long metaOffset) throws IOException {
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       ReplicaInfo info = getReplicaInfo(b);
       FsVolumeReference ref = info.getVolume().obtainReference();
       try {
@@ -1006,7 +1021,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
 
     FsVolumeReference volumeRef = null;
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       volumeRef = volumes.getNextVolume(targetStorageType, targetStorageId,
           block.getNumBytes());
     }
@@ -1120,7 +1135,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
     FsVolumeReference volumeRef = null;
 
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       volumeRef = destination.obtainReference();
     }
 
@@ -1208,7 +1223,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override  // FsDatasetSpi
   public ReplicaHandler append(ExtendedBlock b,
       long newGS, long expectedBlockLen) throws IOException {
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       // If the block was successfully finalized because all packets
       // were successfully processed at the Datanode but the ack for
       // some of the packets were not received by the client. The client
@@ -1260,7 +1275,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   private ReplicaInPipeline append(String bpid,
       ReplicaInfo replicaInfo, long newGS, long estimateBlockLen)
       throws IOException {
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       // If the block is cached, start uncaching it.
       if (replicaInfo.getState() != ReplicaState.FINALIZED) {
         throw new IOException("Only a Finalized replica can be appended to; "
@@ -1356,7 +1371,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
     while (true) {
       try {
-        try (AutoCloseableLock lock = datasetLock.acquire()) {
+        try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
           ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
           FsVolumeReference ref = replicaInfo.getVolume().obtainReference();
           ReplicaInPipeline replica;
@@ -1388,7 +1403,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     LOG.info("Recover failed close " + b);
     while (true) {
       try {
-        try (AutoCloseableLock lock = datasetLock.acquire()) {
+        try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
           // check replica's state
           ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
           // bump the replica's GS
@@ -1410,7 +1425,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   public ReplicaHandler createRbw(
       StorageType storageType, String storageId, ExtendedBlock b,
       boolean allowLazyPersist) throws IOException {
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
           b.getBlockId());
       if (replicaInfo != null) {
@@ -1481,7 +1496,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
     while (true) {
       try {
-        try (AutoCloseableLock lock = datasetLock.acquire()) {
+        try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
           ReplicaInfo replicaInfo =
               getReplicaInfo(b.getBlockPoolId(), b.getBlockId());
           // check the replica's state
@@ -1506,7 +1521,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   private ReplicaHandler recoverRbwImpl(ReplicaInPipeline rbw,
       ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd)
       throws IOException {
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       // check generation stamp
       long replicaGenerationStamp = rbw.getGenerationStamp();
       if (replicaGenerationStamp < b.getGenerationStamp() ||
@@ -1567,7 +1582,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   public ReplicaInPipeline convertTemporaryToRbw(
       final ExtendedBlock b) throws IOException {
 
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       final long blockId = b.getBlockId();
       final long expectedGs = b.getGenerationStamp();
       final long visible = b.getNumBytes();
@@ -1641,7 +1656,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     ReplicaInfo lastFoundReplicaInfo = null;
     boolean isInPipeline = false;
     do {
-      try (AutoCloseableLock lock = datasetLock.acquire()) {
+      try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
         ReplicaInfo currentReplicaInfo =
             volumeMap.get(b.getBlockPoolId(), b.getBlockId());
         if (currentReplicaInfo == lastFoundReplicaInfo) {
@@ -1694,7 +1709,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       invalidate(b.getBlockPoolId(), new Block[] { lastFoundReplicaInfo },
           false);
     }
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       FsVolumeReference ref = volumes.getNextVolume(storageType, storageId, b
           .getNumBytes());
       FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
@@ -1745,7 +1760,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       throws IOException {
     ReplicaInfo replicaInfo = null;
     ReplicaInfo finalizedReplicaInfo = null;
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       if (Thread.interrupted()) {
         // Don't allow data modifications from interrupted threads
         throw new IOException("Cannot finalize block from Interrupted Thread");
@@ -1776,7 +1791,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
   private ReplicaInfo finalizeReplica(String bpid, ReplicaInfo replicaInfo)
       throws IOException {
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       // Compare generation stamp of old and new replica before finalizing
       if (volumeMap.get(bpid, replicaInfo.getBlockId()).getGenerationStamp()
           > replicaInfo.getGenerationStamp()) {
@@ -1821,7 +1836,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    */
   @Override // FsDatasetSpi
   public void unfinalizeBlock(ExtendedBlock b) throws IOException {
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
           b.getLocalBlock());
       if (replicaInfo != null &&
@@ -1874,7 +1889,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         new HashMap<String, BlockListAsLongs.Builder>();
 
     List<FsVolumeImpl> curVolumes = null;
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       curVolumes = volumes.getVolumes();
       for (FsVolumeSpi v : curVolumes) {
         builders.put(v.getStorageID(), BlockListAsLongs.builder(maxDataLength));
@@ -1925,7 +1940,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    * Gets a list of references to the finalized blocks for the given block pool.
    * <p>
    * Callers of this function should call
-   * {@link FsDatasetSpi#acquireDatasetLock} to avoid blocks' status being
+   * {@link FsDatasetSpi#acquireDatasetLock()} to avoid blocks' status being
    * changed during list iteration.
    * </p>
    * @return a list of references to the finalized blocks for the given block
@@ -1933,7 +1948,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    */
   @Override
   public List<ReplicaInfo> getFinalizedBlocks(String bpid) {
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       final List<ReplicaInfo> finalized = new ArrayList<ReplicaInfo>(
           volumeMap.size(bpid));
       for (ReplicaInfo b : volumeMap.replicas(bpid)) {
@@ -2026,7 +2041,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   ReplicaInfo validateBlockFile(String bpid, long blockId) {
     //Should we check for metadata file too?
     final ReplicaInfo r;
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       r = volumeMap.get(bpid, blockId);
     }
     if (r != null) {
@@ -2077,7 +2092,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     for (int i = 0; i < invalidBlks.length; i++) {
       final ReplicaInfo removing;
       final FsVolumeImpl v;
-      try (AutoCloseableLock lock = datasetLock.acquire()) {
+      try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
         final ReplicaInfo info = volumeMap.get(bpid, invalidBlks[i]);
         if (info == null) {
           ReplicaInfo infoByBlockId =
@@ -2203,7 +2218,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     long length, genstamp;
     Executor volumeExecutor;
 
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       ReplicaInfo info = volumeMap.get(bpid, blockId);
       boolean success = false;
       try {
@@ -2271,7 +2286,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
   @Override // FsDatasetSpi
   public boolean contains(final ExtendedBlock block) {
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       final long blockId = block.getLocalBlock().getBlockId();
       final String bpid = block.getBlockPoolId();
       final ReplicaInfo r = volumeMap.get(bpid, blockId);
@@ -2391,7 +2406,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
     Block corruptBlock = null;
     ReplicaInfo memBlockInfo;
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       memBlockInfo = volumeMap.get(bpid, blockId);
       if (memBlockInfo != null &&
           memBlockInfo.getState() != ReplicaState.FINALIZED) {
@@ -2592,7 +2607,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
   @Override 
   public String getReplicaString(String bpid, long blockId) {
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       final Replica r = volumeMap.get(bpid, blockId);
       return r == null ? "null" : r.toString();
     }
@@ -2699,7 +2714,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
                                     final long recoveryId,
                                     final long newBlockId,
                                     final long newlength) throws IOException {
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       //get replica
       final String bpid = oldBlock.getBlockPoolId();
       final ReplicaInfo replica = volumeMap.get(bpid, oldBlock.getBlockId());
@@ -2812,7 +2827,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override // FsDatasetSpi
   public long getReplicaVisibleLength(final ExtendedBlock block)
   throws IOException {
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       final Replica replica = getReplicaInfo(block.getBlockPoolId(),
           block.getBlockId());
       if (replica.getGenerationStamp() < block.getGenerationStamp()) {
@@ -2829,7 +2844,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       throws IOException {
     LOG.info("Adding block pool " + bpid);
     AddBlockPoolException volumeExceptions = new AddBlockPoolException();
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       try {
         volumes.addBlockPool(bpid, conf);
       } catch (AddBlockPoolException e) {
@@ -2859,7 +2874,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
   @Override
   public void shutdownBlockPool(String bpid) {
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       LOG.info("Removing block pool " + bpid);
       Map<DatanodeStorage, BlockListAsLongs> blocksPerVolume
           = getBlockReports(bpid);
@@ -2933,7 +2948,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override //FsDatasetSpi
   public void deleteBlockPool(String bpid, boolean force)
       throws IOException {
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       List<FsVolumeImpl> curVolumes = volumes.getVolumes();
       if (!force) {
         for (FsVolumeImpl volume : curVolumes) {
@@ -2962,7 +2977,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override // FsDatasetSpi
   public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block)
       throws IOException {
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       final Replica replica = volumeMap.get(block.getBlockPoolId(),
           block.getBlockId());
       if (replica == null) {
@@ -3014,7 +3029,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override
   public void onCompleteLazyPersist(String bpId, long blockId,
       long creationTime, File[] savedFiles, FsVolumeImpl targetVolume) {
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       ramDiskReplicaTracker.recordEndLazyPersist(bpId, blockId, savedFiles);
 
       targetVolume.incDfsUsedAndNumBlocks(bpId, savedFiles[0].length()
@@ -3148,7 +3163,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       try {
         block = ramDiskReplicaTracker.dequeueNextReplicaToPersist();
         if (block != null) {
-          try (AutoCloseableLock lock = datasetLock.acquire()) {
+          try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
             replicaInfo = volumeMap.get(block.getBlockPoolId(), block.getBlockId());
 
             // If replicaInfo is null, the block was either deleted before
@@ -3215,7 +3230,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         ReplicaInfo replicaInfo, newReplicaInfo;
         final String bpid = replicaState.getBlockPoolId();
 
-        try (AutoCloseableLock lock = datasetLock.acquire()) {
+        try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
           replicaInfo = getReplicaInfo(replicaState.getBlockPoolId(),
                                        replicaState.getBlockId());
           Preconditions.checkState(replicaInfo.getVolume().isTransientStorage());
@@ -3388,7 +3403,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }
 
   void stopAllDataxceiverThreads(FsVolumeImpl volume) {
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       for (String bpid : volumeMap.getBlockPoolList()) {
         Collection<ReplicaInfo> replicas = volumeMap.replicas(bpid);
         for (ReplicaInfo replicaInfo : replicas) {
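
Nearly every hunk in FsDatasetImpl above is a mechanical rename from datasetLock to datasetWriteLock, so the locking semantics are unchanged by this commit. The one subtlety is the Condition used when removing volumes: it must come from the write half, because ReentrantReadWriteLock only supports newCondition() on its write lock (the read lock throws UnsupportedOperationException). A sketch of the wait pattern, with a hypothetical helper name mirroring waitVolumeRemoved:

  // Conditions are only supported on the write lock of a
  // ReentrantReadWriteLock, hence datasetWriteLock.newCondition().
  private final Condition volumeRemoved = datasetWriteLock.newCondition();

  void awaitVolumeRemoval(long timeoutMs) throws InterruptedException {
    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
      volumeRemoved.await(timeoutMs, TimeUnit.MILLISECONDS);
    }
  }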

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java

@@ -30,6 +30,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
@@ -55,7 +56,6 @@ import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
-import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Time;
@@ -134,7 +134,7 @@ class ProvidedVolumeImpl extends FsVolumeImpl {
     ProvidedBlockPoolSlice(String bpid, ProvidedVolumeImpl volume,
         Configuration conf) {
       this.providedVolume = volume;
-      bpVolumeMap = new ReplicaMap(new AutoCloseableLock());
+      bpVolumeMap = new ReplicaMap(new ReentrantReadWriteLock());
       Class<? extends BlockAliasMap> fmt =
           conf.getClass(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_CLASS,
               TextFileRegionAliasMap.class, BlockAliasMap.class);

+ 18 - 13
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java

@@ -21,6 +21,7 @@ import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.locks.ReadWriteLock;
 
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -32,8 +33,10 @@ import org.apache.hadoop.util.AutoCloseableLock;
  * Maintains the replica map. 
  */
 class ReplicaMap {
+  private final ReadWriteLock rwLock;
   // Lock object to synchronize this instance.
-  private final AutoCloseableLock lock;
+  private final AutoCloseableLock readLock;
+  private final AutoCloseableLock writeLock;
   
   // Map of block pool Id to a set of ReplicaInfo.
   private final Map<String, FoldedTreeSet<ReplicaInfo>> map = new HashMap<>();
@@ -50,16 +53,18 @@ class ReplicaMap {
         }
       };
 
-  ReplicaMap(AutoCloseableLock lock) {
+  ReplicaMap(ReadWriteLock lock) {
     if (lock == null) {
       throw new HadoopIllegalArgumentException(
           "Lock to synchronize on cannot be null");
     }
-    this.lock = lock;
+    this.rwLock = lock;
+    this.readLock = new AutoCloseableLock(rwLock.readLock());
+    this.writeLock = new AutoCloseableLock(rwLock.writeLock());
   }
   
   String[] getBlockPoolList() {
-    try (AutoCloseableLock l = lock.acquire()) {
+    try (AutoCloseableLock l = writeLock.acquire()) {
       return map.keySet().toArray(new String[map.keySet().size()]);   
     }
   }
@@ -104,7 +109,7 @@ class ReplicaMap {
    */
   ReplicaInfo get(String bpid, long blockId) {
     checkBlockPool(bpid);
-    try (AutoCloseableLock l = lock.acquire()) {
+    try (AutoCloseableLock l = writeLock.acquire()) {
       FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
       if (set == null) {
         return null;
@@ -124,7 +129,7 @@ class ReplicaMap {
   ReplicaInfo add(String bpid, ReplicaInfo replicaInfo) {
     checkBlockPool(bpid);
     checkBlock(replicaInfo);
-    try (AutoCloseableLock l = lock.acquire()) {
+    try (AutoCloseableLock l = writeLock.acquire()) {
       FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
       if (set == null) {
         // Add an entry for block pool if it does not exist already
@@ -142,7 +147,7 @@ class ReplicaMap {
   ReplicaInfo addAndGet(String bpid, ReplicaInfo replicaInfo) {
     checkBlockPool(bpid);
     checkBlock(replicaInfo);
-    try (AutoCloseableLock l = lock.acquire()) {
+    try (AutoCloseableLock l = writeLock.acquire()) {
       FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
       if (set == null) {
         // Add an entry for block pool if it does not exist already
@@ -192,7 +197,7 @@ class ReplicaMap {
   ReplicaInfo remove(String bpid, Block block) {
     checkBlockPool(bpid);
     checkBlock(block);
-    try (AutoCloseableLock l = lock.acquire()) {
+    try (AutoCloseableLock l = writeLock.acquire()) {
       FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
       if (set != null) {
         ReplicaInfo replicaInfo =
@@ -215,7 +220,7 @@ class ReplicaMap {
    */
   ReplicaInfo remove(String bpid, long blockId) {
     checkBlockPool(bpid);
-    try (AutoCloseableLock l = lock.acquire()) {
+    try (AutoCloseableLock l = writeLock.acquire()) {
       FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
       if (set != null) {
         return set.removeAndGet(blockId, LONG_AND_BLOCK_COMPARATOR);
@@ -230,7 +235,7 @@ class ReplicaMap {
    * @return the number of replicas in the map
    */
   int size(String bpid) {
-    try (AutoCloseableLock l = lock.acquire()) {
+    try (AutoCloseableLock l = writeLock.acquire()) {
       FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
       return set != null ? set.size() : 0;
     }
@@ -252,7 +257,7 @@ class ReplicaMap {
 
   void initBlockPool(String bpid) {
     checkBlockPool(bpid);
-    try (AutoCloseableLock l = lock.acquire()) {
+    try (AutoCloseableLock l = writeLock.acquire()) {
       FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
       if (set == null) {
         // Add an entry for block pool if it does not exist already
@@ -264,7 +269,7 @@ class ReplicaMap {
   
   void cleanUpBlockPool(String bpid) {
     checkBlockPool(bpid);
-    try (AutoCloseableLock l = lock.acquire()) {
+    try (AutoCloseableLock l = writeLock.acquire()) {
       map.remove(bpid);
     }
   }
@@ -274,6 +279,6 @@ class ReplicaMap {
    * @return lock object
    */
   AutoCloseableLock getLock() {
-    return lock;
+    return writeLock;
   }
 }
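
ReplicaMap now takes a ReadWriteLock instead of a pre-wrapped AutoCloseableLock and builds its own read/write AutoCloseableLock pair. Passing FsDatasetImpl's datasetRWLock keeps the map synchronized with the dataset, while throwaway maps get a private lock of their own. Note that in this commit every accessor, including get() and size(), still acquires the write lock, and getLock() now returns the write half; only the plumbing for shared reads is added. A construction sketch mirroring the test changes below:

  import java.util.concurrent.locks.ReentrantReadWriteLock;

  // A scratch map confined to a single thread can own a private lock
  // instance rather than sharing the dataset lock.
  ReplicaMap scratch = new ReplicaMap(new ReentrantReadWriteLock());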

+ 21 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

@@ -3180,6 +3180,27 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.datanode.lock.fair</name>
+  <value>true</value>
+  <description>If this is true, the Datanode FsDataset lock will be used in Fair
+    mode, which will help to prevent writer threads from being starved, but can
+    lower lock throughput. See java.util.concurrent.locks.ReentrantReadWriteLock
+    for more information on fair/non-fair locks.
+  </description>
+</property>
+
+<property>
+  <name>dfs.datanode.lock-reporting-threshold-ms</name>
+  <value>300</value>
+  <description>When a thread waits to obtain a lock, or holds a lock, for
+    longer than this threshold, a log message will be written. Note that
+    dfs.lock.suppress.warning.interval ensures a single log message is
+    emitted per interval for waiting threads and a single message for holding
+    threads to avoid excessive logging.
+  </description>
+</property>
+
 <property>
   <name>dfs.namenode.startup.delay.block.deletion.sec</name>
   <value>0</value>
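
Both knobs can be overridden like any other HDFS setting; for example, a deployment that prefers raw lock throughput over writer fairness could disable fair mode. A sketch of setting them programmatically (values illustrative):

  Configuration conf = new HdfsConfiguration();
  conf.setBoolean("dfs.datanode.lock.fair", false);  // non-fair: higher throughput
  conf.setLong("dfs.datanode.lock-reporting-threshold-ms", 1000L); // log only slow holds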

+ 6 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java

@@ -1577,6 +1577,12 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     return datasetLock.acquire();
   }
 
+  @Override
+  public AutoCloseableLock acquireDatasetReadLock() {
+    // No RW lock implementation in simulated dataset currently.
+    return datasetLock.acquire();
+  }
+
   @Override
   public Set<? extends Replica> deepCopyReplica(String bpid)
       throws IOException {

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java

@@ -455,6 +455,11 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
     return null;
   }
 
+  @Override
+  public AutoCloseableLock acquireDatasetReadLock() {
+    return null;
+  }
+
   @Override
   public Set<? extends Replica> deepCopyReplica(String bpid)
       throws IOException {

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java

@@ -434,7 +434,7 @@ public class FsDatasetImplTestUtils implements FsDatasetTestUtils {
   @Override
   public Iterator<Replica> getStoredReplicas(String bpid) throws IOException {
     // Reload replicas from the disk.
-    ReplicaMap replicaMap = new ReplicaMap(dataset.datasetLock);
+    ReplicaMap replicaMap = new ReplicaMap(dataset.datasetRWLock);
     try (FsVolumeReferences refs = dataset.getFsVolumeReferences()) {
       for (FsVolumeSpi vol : refs) {
         FsVolumeImpl volume = (FsVolumeImpl) vol;

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java

@@ -37,7 +37,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
 import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.Before;
 import org.junit.Test;
@@ -53,6 +52,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DU_RESERVED_PERCENTAGE_KEY;
 import static org.junit.Assert.assertEquals;
@@ -370,7 +370,7 @@ public class TestFsVolumeList {
     fs.close();
     FsDatasetImpl fsDataset = (FsDatasetImpl) cluster.getDataNodes().get(0)
         .getFSDataset();
-    ReplicaMap volumeMap = new ReplicaMap(new AutoCloseableLock());
+    ReplicaMap volumeMap = new ReplicaMap(new ReentrantReadWriteLock());
     RamDiskReplicaTracker ramDiskReplicaMap = RamDiskReplicaTracker
         .getInstance(conf, fsDataset);
     FsVolumeImpl vol = (FsVolumeImpl) fsDataset.getFsVolumeReferences().get(0);

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java

@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.SocketTimeoutException;
 import java.util.List;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -59,7 +60,6 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.util.AutoCloseableLock;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -236,7 +236,7 @@ public class TestInterDatanodeProtocol {
     final long firstblockid = 10000L;
     final long gs = 7777L;
     final long length = 22L;
-    final ReplicaMap map = new ReplicaMap(new AutoCloseableLock());
+    final ReplicaMap map = new ReplicaMap(new ReentrantReadWriteLock());
     String bpid = "BP-TEST";
     final Block[] blocks = new Block[5];
     for(int i = 0; i < blocks.length; i++) {

+ 4 - 4
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestProvidedImpl.java

@@ -44,6 +44,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -79,7 +80,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeRef
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.BlockIterator;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.Before;
 import org.junit.Test;
@@ -400,7 +400,7 @@ public class TestProvidedImpl {
   public void testBlockLoad() throws IOException {
     for (int i = 0; i < providedVolumes.size(); i++) {
       FsVolumeImpl vol = providedVolumes.get(i);
-      ReplicaMap volumeMap = new ReplicaMap(new AutoCloseableLock());
+      ReplicaMap volumeMap = new ReplicaMap(new ReentrantReadWriteLock());
       vol.getVolumeMap(volumeMap, null);
 
       assertEquals(vol.getBlockPoolList().length, BLOCK_POOL_IDS.length);
@@ -476,7 +476,7 @@ public class TestProvidedImpl {
       vol.setFileRegionProvider(BLOCK_POOL_IDS[CHOSEN_BP_ID],
           new TestFileRegionBlockAliasMap(fileRegionIterator, minBlockId,
               numBlocks));
-      ReplicaMap volumeMap = new ReplicaMap(new AutoCloseableLock());
+      ReplicaMap volumeMap = new ReplicaMap(new ReentrantReadWriteLock());
       vol.getVolumeMap(BLOCK_POOL_IDS[CHOSEN_BP_ID], volumeMap, null);
       totalBlocks += volumeMap.size(BLOCK_POOL_IDS[CHOSEN_BP_ID]);
     }
@@ -586,7 +586,7 @@ public class TestProvidedImpl {
   public void testProvidedReplicaPrefix() throws Exception {
     for (int i = 0; i < providedVolumes.size(); i++) {
       FsVolumeImpl vol = providedVolumes.get(i);
-      ReplicaMap volumeMap = new ReplicaMap(new AutoCloseableLock());
+      ReplicaMap volumeMap = new ReplicaMap(new ReentrantReadWriteLock());
       vol.getVolumeMap(volumeMap, null);
 
       Path expectedPrefix = new Path(

+ 5 - 4
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestReplicaMap.java

@@ -23,15 +23,16 @@ import static org.junit.Assert.fail;
 
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
-import org.apache.hadoop.util.AutoCloseableLock;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 /**
  * Unit test for ReplicasMap class
  */
 public class TestReplicaMap {
-  private final ReplicaMap map = new ReplicaMap(new AutoCloseableLock());
+  private final ReplicaMap map = new ReplicaMap(new ReentrantReadWriteLock());
   private final String bpid = "BP-TEST";
   private final  Block block = new Block(1234, 1234, 1234);
   
@@ -111,7 +112,7 @@ public class TestReplicaMap {
 
   @Test
   public void testMergeAll() {
-    ReplicaMap temReplicaMap = new ReplicaMap(new AutoCloseableLock());
+    ReplicaMap temReplicaMap = new ReplicaMap(new ReentrantReadWriteLock());
     Block tmpBlock = new Block(5678, 5678, 5678);
     temReplicaMap.add(bpid, new FinalizedReplica(tmpBlock, null, null));
 
@@ -122,7 +123,7 @@ public class TestReplicaMap {
 
   @Test
   public void testAddAll() {
-    ReplicaMap temReplicaMap = new ReplicaMap(new AutoCloseableLock());
+    ReplicaMap temReplicaMap = new ReplicaMap(new ReentrantReadWriteLock());
     Block tmpBlock = new Block(5678, 5678, 5678);
     temReplicaMap.add(bpid, new FinalizedReplica(tmpBlock, null, null));
 

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java

@@ -27,6 +27,7 @@ import java.io.RandomAccessFile;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
@@ -47,7 +48,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.junit.Assert;
 import org.junit.Test;
@@ -550,7 +550,7 @@ public class TestWriteToReplica {
           bpList.size() == 2);
       
       createReplicas(bpList, volumes, cluster.getFsDatasetTestUtils(dn));
-      ReplicaMap oldReplicaMap = new ReplicaMap(new AutoCloseableLock());
+      ReplicaMap oldReplicaMap = new ReplicaMap(new ReentrantReadWriteLock());
       oldReplicaMap.addAll(dataSet.volumeMap);
 
       cluster.restartDataNode(0);