|
@@ -21,6 +21,7 @@ package org.apache.hadoop.ozone.om.lock;
|
|
|
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.List;
|
|
|
+import java.util.function.Consumer;
|
|
|
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
@@ -75,6 +76,9 @@ public class OzoneManagerLock {
|
|
|
private static final Logger LOG =
|
|
|
LoggerFactory.getLogger(OzoneManagerLock.class);
|
|
|
|
|
|
+ private static final String READ_LOCK = "read";
|
|
|
+ private static final String WRITE_LOCK = "write";
|
|
|
+
|
|
|
private final LockManager<String> manager;
|
|
|
private final ThreadLocal<Short> lockSet = ThreadLocal.withInitial(
|
|
|
() -> Short.valueOf((short)0));
|
|
@@ -105,15 +109,66 @@ public class OzoneManagerLock {
|
|
|
* should be bucket name. For remaining all resource only one param should
|
|
|
* be passed.
|
|
|
*/
|
|
|
+ @Deprecated
|
|
|
public boolean acquireLock(Resource resource, String... resources) {
|
|
|
String resourceName = generateResourceName(resource, resources);
|
|
|
+ return lock(resource, resourceName, manager::writeLock, WRITE_LOCK);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Acquire read lock on resource.
|
|
|
+ *
|
|
|
+ * For S3_BUCKET_LOCK, VOLUME_LOCK, BUCKET_LOCK type resource, same
|
|
|
+ * thread acquiring lock again is allowed.
|
|
|
+ *
|
|
|
+ * For USER_LOCK, PREFIX_LOCK, S3_SECRET_LOCK type resource, same thread
|
|
|
+ * acquiring lock again is not allowed.
|
|
|
+ *
|
|
|
+ * Special Note for USER_LOCK: Single thread can acquire single user lock/
|
|
|
+ * multi user lock. But not both at the same time.
|
|
|
+ * @param resource - Type of the resource.
|
|
|
+ * @param resources - Resource names on which user want to acquire lock.
|
|
|
+ * For Resource type BUCKET_LOCK, first param should be volume, second param
|
|
|
+ * should be bucket name. For remaining all resource only one param should
|
|
|
+ * be passed.
|
|
|
+ */
|
|
|
+ public boolean acquireReadLock(Resource resource, String... resources) {
|
|
|
+ String resourceName = generateResourceName(resource, resources);
|
|
|
+ return lock(resource, resourceName, manager::readLock, READ_LOCK);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Acquire write lock on resource.
|
|
|
+ *
|
|
|
+ * For S3_BUCKET_LOCK, VOLUME_LOCK, BUCKET_LOCK type resource, same
|
|
|
+ * thread acquiring lock again is allowed.
|
|
|
+ *
|
|
|
+ * For USER_LOCK, PREFIX_LOCK, S3_SECRET_LOCK type resource, same thread
|
|
|
+ * acquiring lock again is not allowed.
|
|
|
+ *
|
|
|
+ * Special Note for USER_LOCK: Single thread can acquire single user lock/
|
|
|
+ * multi user lock. But not both at the same time.
|
|
|
+ * @param resource - Type of the resource.
|
|
|
+ * @param resources - Resource names on which user want to acquire lock.
|
|
|
+ * For Resource type BUCKET_LOCK, first param should be volume, second param
|
|
|
+ * should be bucket name. For remaining all resource only one param should
|
|
|
+ * be passed.
|
|
|
+ */
|
|
|
+ public boolean acquireWriteLock(Resource resource, String... resources) {
|
|
|
+ String resourceName = generateResourceName(resource, resources);
|
|
|
+ return lock(resource, resourceName, manager::writeLock, WRITE_LOCK);
|
|
|
+ }
|
|
|
+
|
|
|
+ private boolean lock(Resource resource, String resourceName,
|
|
|
+ Consumer<String> lockFn, String lockType) {
|
|
|
if (!resource.canLock(lockSet.get())) {
|
|
|
String errorMessage = getErrorMessage(resource);
|
|
|
LOG.error(errorMessage);
|
|
|
throw new RuntimeException(errorMessage);
|
|
|
} else {
|
|
|
- manager.lock(resourceName);
|
|
|
- LOG.debug("Acquired {} lock on resource {}", resource.name,
|
|
|
+ lockFn.accept(resourceName);
|
|
|
+ LOG.debug("Acquired {} {} lock on resource {}", lockType, resource.name,
|
|
|
resourceName);
|
|
|
lockSet.set(resource.setLock(lockSet.get()));
|
|
|
return true;
|
|
@@ -197,19 +252,19 @@ public class OzoneManagerLock {
|
|
|
|
|
|
if (compare == 0) {
|
|
|
// both users are equal.
|
|
|
- manager.lock(firstUser);
|
|
|
+ manager.writeLock(firstUser);
|
|
|
} else {
|
|
|
- manager.lock(firstUser);
|
|
|
+ manager.writeLock(firstUser);
|
|
|
try {
|
|
|
- manager.lock(secondUser);
|
|
|
+ manager.writeLock(secondUser);
|
|
|
} catch (Exception ex) {
|
|
|
// We got an exception acquiring 2nd user lock. Release already
|
|
|
// acquired user lock, and throw exception to the user.
|
|
|
- manager.unlock(firstUser);
|
|
|
+ manager.writeUnlock(firstUser);
|
|
|
throw ex;
|
|
|
}
|
|
|
}
|
|
|
- LOG.debug("Acquired {} lock on resource {} and {}", resource.name,
|
|
|
+ LOG.debug("Acquired Write {} lock on resource {} and {}", resource.name,
|
|
|
firstUser, secondUser);
|
|
|
lockSet.set(resource.setLock(lockSet.get()));
|
|
|
return true;
|
|
@@ -240,35 +295,66 @@ public class OzoneManagerLock {
|
|
|
|
|
|
if (compare == 0) {
|
|
|
// both users are equal.
|
|
|
- manager.unlock(firstUser);
|
|
|
+ manager.writeUnlock(firstUser);
|
|
|
} else {
|
|
|
- manager.unlock(firstUser);
|
|
|
- manager.unlock(secondUser);
|
|
|
+ manager.writeUnlock(firstUser);
|
|
|
+ manager.writeUnlock(secondUser);
|
|
|
}
|
|
|
- LOG.debug("Release {} lock on resource {} and {}", resource.name,
|
|
|
+ LOG.debug("Release Write {} lock on resource {} and {}", resource.name,
|
|
|
firstUser, secondUser);
|
|
|
lockSet.set(resource.clearLock(lockSet.get()));
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Release lock on resource.
|
|
|
+ * Release write lock on resource.
|
|
|
+ * @param resource - Type of the resource.
|
|
|
+ * @param resources - Resource names on which user want to acquire lock.
|
|
|
+ * For Resource type BUCKET_LOCK, first param should be volume, second param
|
|
|
+ * should be bucket name. For remaining all resource only one param should
|
|
|
+ * be passed.
|
|
|
+ */
|
|
|
+ public void releaseWriteLock(Resource resource, String... resources) {
|
|
|
+ String resourceName = generateResourceName(resource, resources);
|
|
|
+ unlock(resource, resourceName, manager::writeUnlock, WRITE_LOCK);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Release read lock on resource.
|
|
|
+ * @param resource - Type of the resource.
|
|
|
+ * @param resources - Resource names on which user want to acquire lock.
|
|
|
+ * For Resource type BUCKET_LOCK, first param should be volume, second param
|
|
|
+ * should be bucket name. For remaining all resource only one param should
|
|
|
+ * be passed.
|
|
|
+ */
|
|
|
+ public void releaseReadLock(Resource resource, String... resources) {
|
|
|
+ String resourceName = generateResourceName(resource, resources);
|
|
|
+ unlock(resource, resourceName, manager::readUnlock, READ_LOCK);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Release write lock on resource.
|
|
|
* @param resource - Type of the resource.
|
|
|
* @param resources - Resource names on which user want to acquire lock.
|
|
|
* For Resource type BUCKET_LOCK, first param should be volume, second param
|
|
|
* should be bucket name. For remaining all resource only one param should
|
|
|
* be passed.
|
|
|
*/
|
|
|
+ @Deprecated
|
|
|
public void releaseLock(Resource resource, String... resources) {
|
|
|
String resourceName = generateResourceName(resource, resources);
|
|
|
+ unlock(resource, resourceName, manager::writeUnlock, WRITE_LOCK);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void unlock(Resource resource, String resourceName,
|
|
|
+ Consumer<String> lockFn, String lockType) {
|
|
|
// TODO: Not checking release of higher order level lock happened while
|
|
|
// releasing lower order level lock, as for that we need counter for
|
|
|
// locks, as some locks support acquiring lock again.
|
|
|
- manager.unlock(resourceName);
|
|
|
+ lockFn.accept(resourceName);
|
|
|
// clear lock
|
|
|
- LOG.debug("Release {}, lock on resource {}", resource.name,
|
|
|
- resource.name, resourceName);
|
|
|
+ LOG.debug("Release {} {}, lock on resource {}", lockType, resource.name,
|
|
|
+ resourceName);
|
|
|
lockSet.set(resource.clearLock(lockSet.get()));
|
|
|
-
|
|
|
}
|
|
|
|
|
|
/**
|