Kaynağa Gözat

HDDS-428. OzoneManager lock optimization.
Contributed by Nanda Kumar.

(cherry picked from commit 6e2129cf4edcd34651631a409e065df47aac2b38)

Anu Engineer 6 yıl önce
ebeveyn
işleme
f8165803fa
16 değiştirilmiş dosya ile 833 ekleme ve 126 silme
  1. 5 0
      hadoop-hdds/common/pom.xml
  2. 4 0
      hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
  3. 101 0
      hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lock/ActiveLock.java
  4. 101 0
      hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lock/LockManager.java
  5. 43 0
      hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lock/PooledLockFactory.java
  6. 21 0
      hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lock/package-info.java
  7. 11 0
      hadoop-hdds/common/src/main/resources/ozone-default.xml
  8. 64 0
      hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/lock/TestLockManager.java
  9. 21 0
      hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/lock/package-info.java
  10. 13 30
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java
  11. 40 50
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
  12. 3 11
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
  13. 6 21
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
  14. 181 0
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerLock.java
  15. 27 14
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java
  16. 192 0
      hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerLock.java

+ 5 - 0
hadoop-hdds/common/pom.xml

@@ -100,6 +100,11 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
       <artifactId>disruptor</artifactId>
       <version>${disruptor.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-pool2</artifactId>
+      <version>2.6.0</version>
+    </dependency>
 
   </dependencies>
 

+ 4 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java

@@ -81,4 +81,8 @@ public final class HddsConfigKeys {
       "hdds.scm.chillmode.threshold.pct";
   public static final double HDDS_SCM_CHILLMODE_THRESHOLD_PCT_DEFAULT = 0.99;
 
+  public static final String HDDS_LOCK_MAX_CONCURRENCY =
+      "hdds.lock.max.concurrency";
+  public static final int HDDS_LOCK_MAX_CONCURRENCY_DEFAULT = 100;
+
 }

+ 101 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lock/ActiveLock.java

@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.lock;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Lock implementation which also maintains counter.
+ */
+public final class ActiveLock {
+
+  private Lock lock;
+  private AtomicInteger count;
+
+  /**
+   * Use ActiveLock#newInstance to create instance.
+   */
+  private ActiveLock() {
+    this.lock = new ReentrantLock();
+    this.count = new AtomicInteger(0);
+  }
+
+  /**
+   * Creates a new instance of ActiveLock.
+   *
+   * @return new ActiveLock
+   */
+  public static ActiveLock newInstance() {
+    return new ActiveLock();
+  }
+
+  /**
+   * Acquires the lock.
+   *
+   * <p>If the lock is not available then the current thread becomes
+   * disabled for thread scheduling purposes and lies dormant until the
+   * lock has been acquired.
+   */
+  public void lock() {
+    lock.lock();
+  }
+
+  /**
+   * Releases the lock.
+   */
+  public void unlock() {
+    lock.unlock();
+  }
+
+  /**
+   * Increment the active count of the lock.
+   */
+  void incrementActiveCount() {
+    count.incrementAndGet();
+  }
+
+  /**
+   * Decrement the active count of the lock.
+   */
+  void decrementActiveCount() {
+    count.decrementAndGet();
+  }
+
+  /**
+   * Returns the active count on the lock.
+   *
+   * @return Number of active leases on the lock.
+   */
+  int getActiveLockCount() {
+    return count.get();
+  }
+
+  /**
+   * Resets the active count on the lock.
+   */
+  void resetCounter() {
+    count.set(0);
+  }
+
+  @Override
+  public String toString() {
+    return lock.toString();
+  }
+}

+ 101 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lock/LockManager.java

@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.lock;
+
+import org.apache.commons.pool2.impl.GenericObjectPool;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Manages the locks on a given resource. A new lock is created for each
+ * and every unique resource. Uniqueness of resource depends on the
+ * {@code equals} implementation of it.
+ */
+public class LockManager<T> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LockManager.class);
+
+  private final Map<T, ActiveLock> activeLocks = new ConcurrentHashMap<>();
+  private final GenericObjectPool<ActiveLock> lockPool =
+      new GenericObjectPool<>(new PooledLockFactory());
+
+  /**
+   * Creates new LockManager instance.
+   *
+   * @param conf Configuration object
+   */
+  public LockManager(Configuration conf) {
+    int maxPoolSize = conf.getInt(HddsConfigKeys.HDDS_LOCK_MAX_CONCURRENCY,
+        HddsConfigKeys.HDDS_LOCK_MAX_CONCURRENCY_DEFAULT);
+    lockPool.setMaxTotal(maxPoolSize);
+  }
+
+
+  /**
+   * Acquires the lock on given resource.
+   *
+   * <p>If the lock is not available then the current thread becomes
+   * disabled for thread scheduling purposes and lies dormant until the
+   * lock has been acquired.
+   */
+  public void lock(T resource) {
+    activeLocks.compute(resource, (k, v) -> {
+      ActiveLock lock;
+      try {
+        if (v == null) {
+          lock = lockPool.borrowObject();
+        } else {
+          lock = v;
+        }
+        lock.incrementActiveCount();
+      } catch (Exception ex) {
+        LOG.error("Unable to obtain lock.", ex);
+        throw new RuntimeException(ex);
+      }
+      return lock;
+    }).lock();
+  }
+
+  /**
+   * Releases the lock on given resource.
+   */
+  public void unlock(T resource) {
+    ActiveLock lock = activeLocks.get(resource);
+    if (lock == null) {
+      // Someone is releasing a lock which was never acquired. Log and return.
+      LOG.warn("Trying to release the lock on {}, which was never acquired.",
+          resource);
+      return;
+    }
+    lock.unlock();
+    activeLocks.computeIfPresent(resource, (k, v) -> {
+      v.decrementActiveCount();
+      if (v.getActiveLockCount() != 0) {
+        return v;
+      }
+      lockPool.returnObject(v);
+      return null;
+    });
+  }
+
+}

+ 43 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lock/PooledLockFactory.java

@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.lock;
+
+import org.apache.commons.pool2.BasePooledObjectFactory;
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
+
+/**
+ * Pool factory to create {@code ActiveLock} instances.
+ */
+public class PooledLockFactory extends BasePooledObjectFactory<ActiveLock> {
+
+  @Override
+  public ActiveLock create() throws Exception {
+    return ActiveLock.newInstance();
+  }
+
+  @Override
+  public PooledObject<ActiveLock> wrap(ActiveLock activeLock) {
+    return new DefaultPooledObject<>(activeLock);
+  }
+
+  @Override
+  public void activateObject(PooledObject<ActiveLock> pooledObject) {
+    pooledObject.getObject().resetCounter();
+  }
+}

+ 21 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lock/package-info.java

@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.lock;
+/*
+ This package contains the lock related classes.
+ */

+ 11 - 0
hadoop-hdds/common/src/main/resources/ozone-default.xml

@@ -1168,4 +1168,15 @@
       (in compressed format), but doesn't require fast io access such as SSD.
     </description>
   </property>
+
+  <property>
+    <name>hdds.lock.max.concurrency</name>
+    <value>100</value>
+    <tag>HDDS</tag>
+    <description>Locks in HDDS/Ozone uses object pool to maintain active locks
+      in the system, this property defines the max limit for the locks that
+      will be maintained in the pool.
+    </description>
+  </property>
+
 </configuration>

+ 64 - 0
hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/lock/TestLockManager.java

@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.lock;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Test-cases to test LockManager.
+ */
+public class TestLockManager {
+
+  @Test(timeout = 1000)
+  public void testWithDifferentResource() {
+    LockManager<String> manager = new LockManager<>(new OzoneConfiguration());
+    manager.lock("/resourceOne");
+    // This should work, as they are different resource.
+    manager.lock("/resourceTwo");
+    manager.unlock("/resourceOne");
+    manager.unlock("/resourceTwo");
+    Assert.assertTrue(true);
+  }
+
+  @Test
+  public void testWithSameResource() throws Exception {
+    LockManager<String> manager = new LockManager<>(new OzoneConfiguration());
+    manager.lock("/resourceOne");
+    AtomicBoolean gotLock = new AtomicBoolean(false);
+    new Thread(() -> {
+      manager.lock("/resourceOne");
+      gotLock.set(true);
+      manager.unlock("/resourceOne");
+    }).start();
+    // Let's give some time for the new thread to run
+    Thread.sleep(100);
+    // Since the new thread is trying to get lock on same object, it will wait.
+    Assert.assertFalse(gotLock.get());
+    manager.unlock("/resourceOne");
+    // Since we have released the lock, the new thread should have the lock
+    // now
+    // Let's give some time for the new thread to run
+    Thread.sleep(100);
+    Assert.assertTrue(gotLock.get());
+  }
+
+}

+ 21 - 0
hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/lock/package-info.java

@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.lock;
+/*
+ This package contains the lock related test classes.
+ */

+ 13 - 30
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java

@@ -79,9 +79,10 @@ public class BucketManagerImpl implements BucketManager {
   @Override
   public void createBucket(OmBucketInfo bucketInfo) throws IOException {
     Preconditions.checkNotNull(bucketInfo);
-    metadataManager.writeLock().lock();
     String volumeName = bucketInfo.getVolumeName();
     String bucketName = bucketInfo.getBucketName();
+    metadataManager.getLock().acquireVolumeLock(volumeName);
+    metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
     try {
       byte[] volumeKey = metadataManager.getVolumeKey(volumeName);
       byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
@@ -118,7 +119,8 @@ public class BucketManagerImpl implements BucketManager {
       }
       throw ex;
     } finally {
-      metadataManager.writeLock().unlock();
+      metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
+      metadataManager.getLock().releaseVolumeLock(volumeName);
     }
   }
 
@@ -133,7 +135,7 @@ public class BucketManagerImpl implements BucketManager {
       throws IOException {
     Preconditions.checkNotNull(volumeName);
     Preconditions.checkNotNull(bucketName);
-    metadataManager.readLock().lock();
+    metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
     try {
       byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
       byte[] value = metadataManager.getBucketTable().get(bucketKey);
@@ -151,7 +153,7 @@ public class BucketManagerImpl implements BucketManager {
       }
       throw ex;
     } finally {
-      metadataManager.readLock().unlock();
+      metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
     }
   }
 
@@ -164,18 +166,11 @@ public class BucketManagerImpl implements BucketManager {
   @Override
   public void setBucketProperty(OmBucketArgs args) throws IOException {
     Preconditions.checkNotNull(args);
-    metadataManager.writeLock().lock();
     String volumeName = args.getVolumeName();
     String bucketName = args.getBucketName();
+    metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
     try {
       byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
-      //Check if volume exists
-      if (metadataManager.getVolumeTable()
-          .get(metadataManager.getVolumeKey(volumeName)) == null) {
-        LOG.debug("volume: {} not found ", volumeName);
-        throw new OMException("Volume doesn't exist",
-            OMException.ResultCodes.FAILED_VOLUME_NOT_FOUND);
-      }
       byte[] value = metadataManager.getBucketTable().get(bucketKey);
       //Check if bucket exist
       if (value == null) {
@@ -230,7 +225,7 @@ public class BucketManagerImpl implements BucketManager {
       }
       throw ex;
     } finally {
-      metadataManager.writeLock().unlock();
+      metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
     }
   }
 
@@ -266,16 +261,8 @@ public class BucketManagerImpl implements BucketManager {
       throws IOException {
     Preconditions.checkNotNull(volumeName);
     Preconditions.checkNotNull(bucketName);
-    metadataManager.writeLock().lock();
+    metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
     try {
-      //Check if volume exists
-      if (metadataManager.getVolumeTable()
-          .get(metadataManager.getVolumeKey(volumeName)) == null) {
-        LOG.debug("volume: {} not found ", volumeName);
-        throw new OMException("Volume doesn't exist",
-            OMException.ResultCodes.FAILED_VOLUME_NOT_FOUND);
-      }
-
       //Check if bucket exists
       byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
       if (metadataManager.getBucketTable().get(bucketKey) == null) {
@@ -297,7 +284,7 @@ public class BucketManagerImpl implements BucketManager {
       }
       throw ex;
     } finally {
-      metadataManager.writeLock().unlock();
+      metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
     }
   }
 
@@ -309,12 +296,8 @@ public class BucketManagerImpl implements BucketManager {
       String startBucket, String bucketPrefix, int maxNumOfBuckets)
       throws IOException {
     Preconditions.checkNotNull(volumeName);
-    metadataManager.readLock().lock();
-    try {
-      return metadataManager.listBuckets(
-          volumeName, startBucket, bucketPrefix, maxNumOfBuckets);
-    } finally {
-      metadataManager.readLock().unlock();
-    }
+    return metadataManager.listBuckets(
+        volumeName, startBucket, bucketPrefix, maxNumOfBuckets);
+
   }
 }

+ 40 - 50
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java

@@ -139,44 +139,38 @@ public class KeyManagerImpl implements KeyManager {
   public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID)
       throws IOException {
     Preconditions.checkNotNull(args);
-    metadataManager.writeLock().lock();
     String volumeName = args.getVolumeName();
     String bucketName = args.getBucketName();
     String keyName = args.getKeyName();
-
-    try {
-      validateBucket(volumeName, bucketName);
-      byte[] openKey = metadataManager.getOpenKeyBytes(
-          volumeName, bucketName, keyName, clientID);
-
-      byte[] keyData = metadataManager.getOpenKeyTable().get(openKey);
-      if (keyData == null) {
-        LOG.error("Allocate block for a key not in open status in meta store" +
-            " /{}/{}/{} with ID {}", volumeName, bucketName, keyName, clientID);
-        throw new OMException("Open Key not found",
-            OMException.ResultCodes.FAILED_KEY_NOT_FOUND);
-      }
-      OmKeyInfo keyInfo =
-          OmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(keyData));
-      AllocatedBlock allocatedBlock =
-          scmBlockClient.allocateBlock(scmBlockSize, keyInfo.getType(),
-              keyInfo.getFactor(), omId);
-      OmKeyLocationInfo info = new OmKeyLocationInfo.Builder()
-          .setBlockID(allocatedBlock.getBlockID())
-          .setShouldCreateContainer(allocatedBlock.getCreateContainer())
-          .setLength(scmBlockSize)
-          .setOffset(0)
-          .build();
-      // current version not committed, so new blocks coming now are added to
-      // the same version
-      keyInfo.appendNewBlocks(Collections.singletonList(info));
-      keyInfo.updateModifcationTime();
-      metadataManager.getOpenKeyTable().put(openKey,
-          keyInfo.getProtobuf().toByteArray());
-      return info;
-    } finally {
-      metadataManager.writeLock().unlock();
+    validateBucket(volumeName, bucketName);
+    byte[] openKey = metadataManager.getOpenKeyBytes(
+        volumeName, bucketName, keyName, clientID);
+
+    byte[] keyData = metadataManager.getOpenKeyTable().get(openKey);
+    if (keyData == null) {
+      LOG.error("Allocate block for a key not in open status in meta store" +
+          " /{}/{}/{} with ID {}", volumeName, bucketName, keyName, clientID);
+      throw new OMException("Open Key not found",
+          OMException.ResultCodes.FAILED_KEY_NOT_FOUND);
     }
+    OmKeyInfo keyInfo =
+        OmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(keyData));
+    AllocatedBlock allocatedBlock =
+        scmBlockClient.allocateBlock(scmBlockSize, keyInfo.getType(),
+            keyInfo.getFactor(), omId);
+    OmKeyLocationInfo info = new OmKeyLocationInfo.Builder()
+        .setBlockID(allocatedBlock.getBlockID())
+        .setShouldCreateContainer(allocatedBlock.getCreateContainer())
+        .setLength(scmBlockSize)
+        .setOffset(0)
+        .build();
+    // current version not committed, so new blocks coming now are added to
+    // the same version
+    keyInfo.appendNewBlocks(Collections.singletonList(info));
+    keyInfo.updateModifcationTime();
+    metadataManager.getOpenKeyTable().put(openKey,
+        keyInfo.getProtobuf().toByteArray());
+    return info;
   }
 
   @Override
@@ -186,7 +180,7 @@ public class KeyManagerImpl implements KeyManager {
     String bucketName = args.getBucketName();
     validateBucket(volumeName, bucketName);
 
-    metadataManager.writeLock().lock();
+    metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
     String keyName = args.getKeyName();
     ReplicationFactor factor = args.getFactor();
     ReplicationType type = args.getType();
@@ -286,17 +280,17 @@ public class KeyManagerImpl implements KeyManager {
       throw new OMException(ex.getMessage(),
           OMException.ResultCodes.FAILED_KEY_ALLOCATION);
     } finally {
-      metadataManager.writeLock().unlock();
+      metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
     }
   }
 
   @Override
   public void commitKey(OmKeyArgs args, long clientID) throws IOException {
     Preconditions.checkNotNull(args);
-    metadataManager.writeLock().lock();
     String volumeName = args.getVolumeName();
     String bucketName = args.getBucketName();
     String keyName = args.getKeyName();
+    metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
     try {
       validateBucket(volumeName, bucketName);
       byte[] openKey = metadataManager.getOpenKeyBytes(volumeName, bucketName,
@@ -329,17 +323,17 @@ public class KeyManagerImpl implements KeyManager {
       throw new OMException(ex.getMessage(),
           OMException.ResultCodes.FAILED_KEY_ALLOCATION);
     } finally {
-      metadataManager.writeLock().unlock();
+      metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
     }
   }
 
   @Override
   public OmKeyInfo lookupKey(OmKeyArgs args) throws IOException {
     Preconditions.checkNotNull(args);
-    metadataManager.writeLock().lock();
     String volumeName = args.getVolumeName();
     String bucketName = args.getBucketName();
     String keyName = args.getKeyName();
+    metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
     try {
       byte[] keyBytes = metadataManager.getOzoneKeyBytes(
           volumeName, bucketName, keyName);
@@ -357,7 +351,7 @@ public class KeyManagerImpl implements KeyManager {
       throw new OMException(ex.getMessage(),
           OMException.ResultCodes.FAILED_KEY_NOT_FOUND);
     } finally {
-      metadataManager.writeLock().unlock();
+      metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
     }
   }
 
@@ -375,7 +369,7 @@ public class KeyManagerImpl implements KeyManager {
           ResultCodes.FAILED_INVALID_KEY_NAME);
     }
 
-    metadataManager.writeLock().lock();
+    metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
     try {
       // fromKeyName should exist
       byte[] fromKey = metadataManager.getOzoneKeyBytes(
@@ -431,17 +425,17 @@ public class KeyManagerImpl implements KeyManager {
       throw new OMException(ex.getMessage(),
           ResultCodes.FAILED_KEY_RENAME);
     } finally {
-      metadataManager.writeLock().unlock();
+      metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
     }
   }
 
   @Override
   public void deleteKey(OmKeyArgs args) throws IOException {
     Preconditions.checkNotNull(args);
-    metadataManager.writeLock().lock();
     String volumeName = args.getVolumeName();
     String bucketName = args.getBucketName();
     String keyName = args.getKeyName();
+    metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
     try {
       byte[] objectKey = metadataManager.getOzoneKeyBytes(
           volumeName, bucketName, keyName);
@@ -470,7 +464,7 @@ public class KeyManagerImpl implements KeyManager {
       throw new OMException(ex.getMessage(), ex,
           ResultCodes.FAILED_KEY_DELETION);
     } finally {
-      metadataManager.writeLock().unlock();
+      metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
     }
   }
 
@@ -506,12 +500,8 @@ public class KeyManagerImpl implements KeyManager {
 
   @Override
   public List<BlockGroup> getExpiredOpenKeys() throws IOException {
-    metadataManager.readLock().lock();
-    try {
-      return metadataManager.getExpiredOpenKeys();
-    } finally {
-      metadataManager.readLock().unlock();
-    }
+    return metadataManager.getExpiredOpenKeys();
+
   }
 
   @Override

+ 3 - 11
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java

@@ -26,7 +26,6 @@ import org.apache.hadoop.utils.db.Table;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.concurrent.locks.Lock;
 
 /**
  * OM metadata manager interface.
@@ -51,18 +50,11 @@ public interface OMMetadataManager {
   DBStore getStore();
 
   /**
-   * Returns the read lock used on Metadata DB.
+   * Returns the OzoneManagerLock used on Metadata DB.
    *
-   * @return readLock
+   * @return OzoneManagerLock
    */
-  Lock readLock();
-
-  /**
-   * Returns the write lock used on Metadata DB.
-   *
-   * @return writeLock
-   */
-  Lock writeLock();
+  OzoneManagerLock getLock();
 
   /**
    * Given a volume return the corresponding DB key.

+ 6 - 21
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java

@@ -52,9 +52,6 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
 import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath;
@@ -102,9 +99,7 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
 
   private final DBStore store;
 
-  // TODO: Make this lock move into Table instead of *ONE* lock for the whole
-  // DB.
-  private final ReadWriteLock lock;
+  private final OzoneManagerLock lock;
   private final long openKeyExpireThresholdMS;
 
   private final Table userTable;
@@ -116,7 +111,7 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
 
   public OmMetadataManagerImpl(OzoneConfiguration conf) throws IOException {
     File metaDir = getOzoneMetaDirPath(conf);
-    this.lock = new ReentrantReadWriteLock();
+    this.lock = new OzoneManagerLock(conf);
     this.openKeyExpireThresholdMS = 1000 * conf.getInt(
         OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS,
         OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS_DEFAULT);
@@ -280,23 +275,13 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
   }
 
   /**
-   * Returns the read lock used on Metadata DB.
+   * Returns the OzoneManagerLock used on Metadata DB.
    *
-   * @return readLock
+   * @return OzoneManagerLock
    */
   @Override
-  public Lock readLock() {
-    return lock.readLock();
-  }
-
-  /**
-   * Returns the write lock used on Metadata DB.
-   *
-   * @return writeLock
-   */
-  @Override
-  public Lock writeLock() {
-    return lock.writeLock();
+  public OzoneManagerLock getLock() {
+    return lock;
   }
 
   /**

+ 181 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerLock.java

@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ozone.lock.LockManager;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_USER_PREFIX;
+
+/**
+ * Provides different locks to handle concurrency in OzoneMaster.
+ * We also maintain lock hierarchy, based on the weight.
+ *
+ * <table>
+ *   <tr>
+ *     <td><b> WEIGHT </b></td> <td><b> LOCK </b></td>
+ *   </tr>
+ *   <tr>
+ *     <td> 0 </td> <td> User Lock </td>
+ *   </tr>
+ *   <tr>
+ *     <td> 1 </td> <td> Volume Lock </td>
+ *   </tr>
+ *   <tr>
+ *     <td> 2 </td> <td> Bucket Lock </td>
+ *   </tr>
+ * </table>
+ *
+ * One cannot obtain a lower weight lock while holding a lock with higher
+ * weight. The other way around is possible. <br>
+ * <br>
+ * <p>
+ * For example:
+ * <br>
+ * -> acquireVolumeLock (will work)<br>
+ *   +-> acquireBucketLock (will work)<br>
+ *     +--> acquireUserLock (will throw Exception)<br>
+ * </p>
+ * <br>
+ *
+ * To acquire a user lock you should not hold any Volume/Bucket lock. Similarly
+ * to acquire a Volume lock you should not hold any Bucket lock.
+ *
+ */
+public final class OzoneManagerLock {
+
+  private static final String VOLUME_LOCK = "volumeLock";
+  private static final String BUCKET_LOCK = "bucketLock";
+
+
+  private final LockManager<String> manager;
+
+  // To maintain locks held by current thread.
+  private final ThreadLocal<Map<String, AtomicInteger>> myLocks =
+      ThreadLocal.withInitial(() -> ImmutableMap.of(
+          VOLUME_LOCK, new AtomicInteger(0),
+          BUCKET_LOCK, new AtomicInteger(0)));
+
+  /**
+   * Creates new OzoneManagerLock instance.
+   * @param conf Configuration object
+   */
+  public OzoneManagerLock(Configuration conf) {
+    manager = new LockManager<>(conf);
+  }
+
+  /**
+   * Acquires user lock on the given resource.
+   *
+   * <p>If the lock is not available then the current thread becomes
+   * disabled for thread scheduling purposes and lies dormant until the
+   * lock has been acquired.
+   *
+   * @param user User on which the lock has to be acquired
+   */
+  public void acquireUserLock(String user) {
+    // Calling thread should not hold any volume or bucket lock.
+    if (hasAnyVolumeLock() || hasAnyBucketLock()) {
+      throw new RuntimeException(
+          "Thread '" + Thread.currentThread().getName() +
+              "' cannot acquire user lock" +
+              " while holding volume/bucket lock(s).");
+    }
+    manager.lock(OM_USER_PREFIX + user);
+  }
+
+  /**
+   * Releases the user lock on given resource.
+   */
+  public void releaseUserLock(String user) {
+    manager.unlock(OM_USER_PREFIX + user);
+  }
+
+  /**
+   * Acquires volume lock on the given resource.
+   *
+   * <p>If the lock is not available then the current thread becomes
+   * disabled for thread scheduling purposes and lies dormant until the
+   * lock has been acquired.
+   *
+   * @param volume Volume on which the lock has to be acquired
+   */
+  public void acquireVolumeLock(String volume) {
+    // Calling thread should not hold any bucket lock.
+    if (hasAnyBucketLock()) {
+      throw new RuntimeException(
+          "Thread '" + Thread.currentThread().getName() +
+              "' cannot acquire volume lock while holding bucket lock(s).");
+    }
+    manager.lock(OM_KEY_PREFIX + volume);
+    myLocks.get().get(VOLUME_LOCK).incrementAndGet();
+  }
+
+  /**
+   * Releases the volume lock on given resource.
+   */
+  public void releaseVolumeLock(String volume) {
+    manager.unlock(OM_KEY_PREFIX + volume);
+    myLocks.get().get(VOLUME_LOCK).decrementAndGet();
+  }
+
+  /**
+   * Acquires bucket lock on the given resource.
+   *
+   * <p>If the lock is not available then the current thread becomes
+   * disabled for thread scheduling purposes and lies dormant until the
+   * lock has been acquired.
+   *
+   * @param bucket Bucket on which the lock has to be acquired
+   */
+  public void acquireBucketLock(String volume, String bucket) {
+    manager.lock(OM_KEY_PREFIX + volume + OM_KEY_PREFIX + bucket);
+    myLocks.get().get(BUCKET_LOCK).incrementAndGet();
+  }
+
+
+  /**
+   * Releases the bucket lock on given resource.
+   */
+  public void releaseBucketLock(String volume, String bucket) {
+    manager.unlock(OM_KEY_PREFIX + volume + OM_KEY_PREFIX + bucket);
+    myLocks.get().get(BUCKET_LOCK).decrementAndGet();
+  }
+
+  /**
+   * Returns true if the current thread holds any volume lock.
+   * @return true if current thread holds volume lock, else false
+   */
+  private boolean hasAnyVolumeLock() {
+    return myLocks.get().get(VOLUME_LOCK).get() != 0;
+  }
+
+  /**
+   * Returns true if the current thread holds any bucket lock.
+   * @return true if current thread holds bucket lock, else false
+   */
+  private boolean hasAnyBucketLock() {
+    return myLocks.get().get(BUCKET_LOCK).get() != 0;
+  }
+
+}

+ 27 - 14
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java

@@ -126,7 +126,8 @@ public class VolumeManagerImpl implements VolumeManager {
   @Override
   public void createVolume(OmVolumeArgs args) throws IOException {
     Preconditions.checkNotNull(args);
-    metadataManager.writeLock().lock();
+    metadataManager.getLock().acquireUserLock(args.getOwnerName());
+    metadataManager.getLock().acquireVolumeLock(args.getVolume());
     try {
       byte[] dbVolumeKey = metadataManager.getVolumeKey(args.getVolume());
       byte[] volumeInfo = metadataManager.getVolumeTable().get(dbVolumeKey);
@@ -177,7 +178,8 @@ public class VolumeManagerImpl implements VolumeManager {
         throw (IOException) ex;
       }
     } finally {
-      metadataManager.writeLock().unlock();
+      metadataManager.getLock().releaseVolumeLock(args.getVolume());
+      metadataManager.getLock().releaseUserLock(args.getOwnerName());
     }
   }
 
@@ -192,7 +194,8 @@ public class VolumeManagerImpl implements VolumeManager {
   public void setOwner(String volume, String owner) throws IOException {
     Preconditions.checkNotNull(volume);
     Preconditions.checkNotNull(owner);
-    metadataManager.writeLock().lock();
+    metadataManager.getLock().acquireUserLock(owner);
+    metadataManager.getLock().acquireVolumeLock(volume);
     try {
       byte[] dbVolumeKey = metadataManager.getVolumeKey(volume);
       byte[] volInfo = metadataManager.getVolumeTable().get(dbVolumeKey);
@@ -235,7 +238,8 @@ public class VolumeManagerImpl implements VolumeManager {
         throw (IOException) ex;
       }
     } finally {
-      metadataManager.writeLock().unlock();
+      metadataManager.getLock().releaseVolumeLock(volume);
+      metadataManager.getLock().releaseUserLock(owner);
     }
   }
 
@@ -248,7 +252,7 @@ public class VolumeManagerImpl implements VolumeManager {
    */
   public void setQuota(String volume, long quota) throws IOException {
     Preconditions.checkNotNull(volume);
-    metadataManager.writeLock().lock();
+    metadataManager.getLock().acquireVolumeLock(volume);
     try {
       byte[] dbVolumeKey = metadataManager.getVolumeKey(volume);
       byte[] volInfo = metadataManager.getVolumeTable().get(dbVolumeKey);
@@ -279,7 +283,7 @@ public class VolumeManagerImpl implements VolumeManager {
       }
       throw ex;
     } finally {
-      metadataManager.writeLock().unlock();
+      metadataManager.getLock().releaseVolumeLock(volume);
     }
   }
 
@@ -291,7 +295,7 @@ public class VolumeManagerImpl implements VolumeManager {
    */
   public OmVolumeArgs getVolumeInfo(String volume) throws IOException {
     Preconditions.checkNotNull(volume);
-    metadataManager.readLock().lock();
+    metadataManager.getLock().acquireVolumeLock(volume);
     try {
       byte[] dbVolumeKey = metadataManager.getVolumeKey(volume);
       byte[] volInfo = metadataManager.getVolumeTable().get(dbVolumeKey);
@@ -310,7 +314,7 @@ public class VolumeManagerImpl implements VolumeManager {
       }
       throw ex;
     } finally {
-      metadataManager.readLock().unlock();
+      metadataManager.getLock().releaseVolumeLock(volume);
     }
   }
 
@@ -323,7 +327,15 @@ public class VolumeManagerImpl implements VolumeManager {
   @Override
   public void deleteVolume(String volume) throws IOException {
     Preconditions.checkNotNull(volume);
-    metadataManager.writeLock().lock();
+    String owner;
+    metadataManager.getLock().acquireVolumeLock(volume);
+    try {
+      owner = getVolumeInfo(volume).getOwnerName();
+    } finally {
+      metadataManager.getLock().releaseVolumeLock(volume);
+    }
+    metadataManager.getLock().acquireUserLock(owner);
+    metadataManager.getLock().acquireVolumeLock(volume);
     try {
 
       byte[] dbVolumeKey = metadataManager.getVolumeKey(volume);
@@ -359,7 +371,8 @@ public class VolumeManagerImpl implements VolumeManager {
         throw (IOException) ex;
       }
     } finally {
-      metadataManager.writeLock().unlock();
+      metadataManager.getLock().releaseVolumeLock(volume);
+      metadataManager.getLock().releaseUserLock(owner);
     }
   }
 
@@ -375,7 +388,7 @@ public class VolumeManagerImpl implements VolumeManager {
       throws IOException {
     Preconditions.checkNotNull(volume);
     Preconditions.checkNotNull(userAcl);
-    metadataManager.readLock().lock();
+    metadataManager.getLock().acquireVolumeLock(volume);
     try {
       byte[] dbVolumeKey = metadataManager.getVolumeKey(volume);
       byte[] volInfo = metadataManager.getVolumeTable().get(dbVolumeKey);
@@ -395,7 +408,7 @@ public class VolumeManagerImpl implements VolumeManager {
       }
       throw ex;
     } finally {
-      metadataManager.readLock().unlock();
+      metadataManager.getLock().releaseVolumeLock(volume);
     }
   }
 
@@ -405,12 +418,12 @@ public class VolumeManagerImpl implements VolumeManager {
   @Override
   public List<OmVolumeArgs> listVolumes(String userName,
       String prefix, String startKey, int maxKeys) throws IOException {
-    metadataManager.readLock().lock();
+    metadataManager.getLock().acquireUserLock(userName);
     try {
       return metadataManager.listVolumes(
           userName, prefix, startKey, maxKeys);
     } finally {
-      metadataManager.readLock().unlock();
+      metadataManager.getLock().releaseUserLock(userName);
     }
   }
 }

+ 192 - 0
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerLock.java

@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Contains test-cases to verify OzoneManagerLock.
+ */
+public class TestOzoneManagerLock {
+
+  @Test(timeout = 1000)
+  public void testDifferentUserLock() {
+    OzoneManagerLock lock = new OzoneManagerLock(new OzoneConfiguration());
+    lock.acquireUserLock("userOne");
+    lock.acquireUserLock("userTwo");
+    lock.releaseUserLock("userOne");
+    lock.releaseUserLock("userTwo");
+    Assert.assertTrue(true);
+  }
+
+  @Test
+  public void testSameUserLock() throws Exception {
+    OzoneManagerLock lock = new OzoneManagerLock(new OzoneConfiguration());
+    lock.acquireUserLock("userOne");
+    AtomicBoolean gotLock = new AtomicBoolean(false);
+    new Thread(() -> {
+      lock.acquireUserLock("userOne");
+      gotLock.set(true);
+      lock.releaseUserLock("userOne");
+    }).start();
+    // Let's give some time for the new thread to run
+    Thread.sleep(100);
+    // Since the new thread is trying to get lock on same user, it will wait.
+    Assert.assertFalse(gotLock.get());
+    lock.releaseUserLock("userOne");
+    // Since we have released the lock, the new thread should have the lock
+    // now
+    // Let's give some time for the new thread to run
+    Thread.sleep(100);
+    Assert.assertTrue(gotLock.get());
+  }
+
+  @Test(timeout = 1000)
+  public void testDifferentVolumeLock() {
+    OzoneManagerLock lock = new OzoneManagerLock(new OzoneConfiguration());
+    lock.acquireVolumeLock("volOne");
+    lock.acquireVolumeLock("volTwo");
+    lock.releaseVolumeLock("volOne");
+    lock.releaseVolumeLock("volTwo");
+    Assert.assertTrue(true);
+  }
+
+  @Test
+  public void testSameVolumeLock() throws Exception {
+    OzoneManagerLock lock = new OzoneManagerLock(new OzoneConfiguration());
+    lock.acquireVolumeLock("volOne");
+    AtomicBoolean gotLock = new AtomicBoolean(false);
+    new Thread(() -> {
+      lock.acquireVolumeLock("volOne");
+      gotLock.set(true);
+      lock.releaseVolumeLock("volOne");
+    }).start();
+    // Let's give some time for the new thread to run
+    Thread.sleep(100);
+    // Since the new thread is trying to get lock on same user, it will wait.
+    Assert.assertFalse(gotLock.get());
+    lock.releaseVolumeLock("volOne");
+    // Since we have released the lock, the new thread should have the lock
+    // now
+    // Let's give some time for the new thread to run
+    Thread.sleep(100);
+    Assert.assertTrue(gotLock.get());
+  }
+
+  @Test(timeout = 1000)
+  public void testDifferentBucketLock() {
+    OzoneManagerLock lock = new OzoneManagerLock(new OzoneConfiguration());
+    lock.acquireBucketLock("volOne", "bucketOne");
+    lock.acquireBucketLock("volOne", "bucketTwo");
+    lock.releaseBucketLock("volOne", "bucketTwo");
+    lock.releaseBucketLock("volOne", "bucketOne");
+    Assert.assertTrue(true);
+  }
+
+  @Test
+  public void testSameBucketLock() throws Exception {
+    OzoneManagerLock lock = new OzoneManagerLock(new OzoneConfiguration());
+    lock.acquireBucketLock("volOne", "bucketOne");
+    AtomicBoolean gotLock = new AtomicBoolean(false);
+    new Thread(() -> {
+      lock.acquireBucketLock("volOne", "bucketOne");
+      gotLock.set(true);
+      lock.releaseBucketLock("volOne", "bucketOne");
+    }).start();
+    // Let's give some time for the new thread to run
+    Thread.sleep(100);
+    // Since the new thread is trying to get lock on same user, it will wait.
+    Assert.assertFalse(gotLock.get());
+    lock.releaseBucketLock("volOne", "bucketOne");
+    // Since we have released the lock, the new thread should have the lock
+    // now
+    // Let's give some time for the new thread to run
+    Thread.sleep(100);
+    Assert.assertTrue(gotLock.get());
+  }
+
+  @Test(timeout = 1000)
+  public void testVolumeLockAfterUserLock() {
+    OzoneManagerLock lock = new OzoneManagerLock(new OzoneConfiguration());
+    lock.acquireUserLock("userOne");
+    lock.acquireVolumeLock("volOne");
+    lock.releaseVolumeLock("volOne");
+    lock.releaseUserLock("userOne");
+    Assert.assertTrue(true);
+  }
+
+  @Test(timeout = 1000)
+  public void testBucketLockAfterVolumeLock() {
+    OzoneManagerLock lock = new OzoneManagerLock(new OzoneConfiguration());
+    lock.acquireVolumeLock("volOne");
+    lock.acquireBucketLock("volOne", "bucketOne");
+    lock.releaseBucketLock("volOne", "bucketOne");
+    lock.releaseVolumeLock("volOne");
+    Assert.assertTrue(true);
+  }
+
+  @Test(timeout = 1000)
+  public void testBucketLockAfterVolumeLockAfterUserLock() {
+    OzoneManagerLock lock = new OzoneManagerLock(new OzoneConfiguration());
+    lock.acquireUserLock("userOne");
+    lock.acquireVolumeLock("volOne");
+    lock.acquireBucketLock("volOne", "bucketOne");
+    lock.releaseBucketLock("volOne", "bucketOne");
+    lock.releaseVolumeLock("volOne");
+    lock.releaseUserLock("userOne");
+    Assert.assertTrue(true);
+  }
+
+  @Test
+  public void testUserLockAfterVolumeLock() {
+    OzoneManagerLock lock = new OzoneManagerLock(new OzoneConfiguration());
+    lock.acquireVolumeLock("volOne");
+    try {
+      lock.acquireUserLock("userOne");
+      Assert.fail();
+    } catch (RuntimeException ex) {
+      String msg =
+          "cannot acquire user lock while holding volume/bucket lock(s).";
+      Assert.assertTrue(ex.getMessage().contains(msg));
+    }
+    lock.releaseVolumeLock("volOne");
+    Assert.assertTrue(true);
+  }
+
+  @Test
+  public void testVolumeLockAfterBucketLock() {
+    OzoneManagerLock lock = new OzoneManagerLock(new OzoneConfiguration());
+    lock.acquireBucketLock("volOne", "bucketOne");
+    try {
+      lock.acquireVolumeLock("volOne");
+      Assert.fail();
+    } catch (RuntimeException ex) {
+      String msg =
+          "cannot acquire volume lock while holding bucket lock(s).";
+      Assert.assertTrue(ex.getMessage().contains(msg));
+    }
+    lock.releaseBucketLock("volOne", "bucketOne");
+    Assert.assertTrue(true);
+  }
+
+
+}