|
@@ -20,6 +20,10 @@ package org.apache.ambari.server.orm;
|
|
|
|
|
|
import java.lang.reflect.Method;
|
|
|
import java.sql.SQLException;
|
|
|
+import java.util.Iterator;
|
|
|
+import java.util.LinkedList;
|
|
|
+import java.util.concurrent.locks.Lock;
|
|
|
+import java.util.concurrent.locks.ReadWriteLock;
|
|
|
|
|
|
import javax.persistence.EntityManager;
|
|
|
import javax.persistence.EntityTransaction;
|
|
@@ -27,6 +31,9 @@ import javax.persistence.PersistenceException;
|
|
|
|
|
|
import org.aopalliance.intercept.MethodInterceptor;
|
|
|
import org.aopalliance.intercept.MethodInvocation;
|
|
|
+import org.apache.ambari.annotations.TransactionalLock;
|
|
|
+import org.apache.ambari.annotations.TransactionalLock.LockArea;
|
|
|
+import org.apache.ambari.annotations.TransactionalLock.LockType;
|
|
|
import org.eclipse.persistence.exceptions.EclipseLinkException;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
@@ -36,10 +43,49 @@ import com.google.inject.persist.Transactional;
|
|
|
import com.google.inject.persist.UnitOfWork;
|
|
|
import com.google.inject.persist.jpa.AmbariJpaPersistService;
|
|
|
|
|
|
+/**
|
|
|
+ * The {@link AmbariJpaLocalTxnInterceptor} is used to intercept method calls
|
|
|
+ * annotated with the {@link Transactional} annotation. If a transaction is not
|
|
|
+ * already in progress, then a new transaction is automatically started.
|
|
|
+ * Otherwise, the currently active transaction will be reused.
|
|
|
+ * <p/>
|
|
|
+ * This interceptor also works with {@link TransactionalLock}s to lock on
|
|
|
+ * {@link LockArea}s. If this interceptor encounters a {@link TransactionalLock}
|
|
|
+ * it will acquire the lock and then add the {@link LockArea} to a collection of
|
|
|
+ * areas which need to be released when the transaction is committed or rolled
|
|
|
+ * back. This ensures that transactional methods invoke from an already running
|
|
|
+ * transaction can have their lock invoked for the lifespan of the outer
|
|
|
+ * "parent" transaction.
|
|
|
+ */
|
|
|
public class AmbariJpaLocalTxnInterceptor implements MethodInterceptor {
|
|
|
|
|
|
private static final Logger LOG = LoggerFactory.getLogger(AmbariJpaLocalTxnInterceptor.class);
|
|
|
|
|
|
+ /**
|
|
|
+ * A list of all of the {@link TransactionalLock}s that this interceptor is
|
|
|
+ * responsible for. As a thread moves through the system encountering
|
|
|
+ * {@link Transactional} and {@link TransactionalLock} methods, this will keep
|
|
|
+ * track of which locks the outer-most interceptor will need to release.
|
|
|
+ */
|
|
|
+ private static final ThreadLocal<LinkedList<TransactionalLock>> s_transactionalLocks = new ThreadLocal<LinkedList<TransactionalLock>>() {
|
|
|
+ /**
|
|
|
+ * {@inheritDoc}
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ protected LinkedList<TransactionalLock> initialValue() {
|
|
|
+ return new LinkedList<>();
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Used to ensure that methods which rely on the completion of
|
|
|
+ * {@link Transactional} can detect when they are able to run.
|
|
|
+ *
|
|
|
+ * @see TransactionalLock
|
|
|
+ */
|
|
|
+ @Inject
|
|
|
+ private final TransactionalLocks transactionLocks = null;
|
|
|
+
|
|
|
@Inject
|
|
|
private final AmbariJpaPersistService emProvider = null;
|
|
|
|
|
@@ -49,6 +95,9 @@ public class AmbariJpaLocalTxnInterceptor implements MethodInterceptor {
|
|
|
// Tracks if the unit of work was begun implicitly by this transaction.
|
|
|
private final ThreadLocal<Boolean> didWeStartWork = new ThreadLocal<Boolean>();
|
|
|
|
|
|
+ /**
|
|
|
+ * {@inheritDoc}
|
|
|
+ */
|
|
|
@Override
|
|
|
public Object invoke(MethodInvocation methodInvocation) throws Throwable {
|
|
|
|
|
@@ -61,56 +110,64 @@ public class AmbariJpaLocalTxnInterceptor implements MethodInterceptor {
|
|
|
Transactional transactional = readTransactionMetadata(methodInvocation);
|
|
|
EntityManager em = emProvider.get();
|
|
|
|
|
|
+ // lock the transaction if needed
|
|
|
+ lockTransaction(methodInvocation);
|
|
|
+
|
|
|
// Allow 'joining' of transactions if there is an enclosing @Transactional method.
|
|
|
if (em.getTransaction().isActive()) {
|
|
|
return methodInvocation.proceed();
|
|
|
}
|
|
|
|
|
|
- Object result;
|
|
|
+ try {
|
|
|
+ // this is the outer-most transactional, begin a transaction
|
|
|
+ final EntityTransaction txn = em.getTransaction();
|
|
|
+ txn.begin();
|
|
|
+
|
|
|
+ Object result;
|
|
|
+ try {
|
|
|
+ result = methodInvocation.proceed();
|
|
|
+
|
|
|
+ } catch (Exception e) {
|
|
|
+ // commit transaction only if rollback didn't occur
|
|
|
+ if (rollbackIfNecessary(transactional, e, txn)) {
|
|
|
+ txn.commit();
|
|
|
+ }
|
|
|
|
|
|
- final EntityTransaction txn = em.getTransaction();
|
|
|
- txn.begin();
|
|
|
+ detailedLogForPersistenceError(e);
|
|
|
|
|
|
- try {
|
|
|
- result = methodInvocation.proceed();
|
|
|
+ // propagate whatever exception is thrown anyway
|
|
|
+ throw e;
|
|
|
+ } finally {
|
|
|
+ // Close the em if necessary (guarded so this code doesn't run unless
|
|
|
+ // catch fired).
|
|
|
+ if (null != didWeStartWork.get() && !txn.isActive()) {
|
|
|
+ didWeStartWork.remove();
|
|
|
+ unitOfWork.end();
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- } catch (Exception e) {
|
|
|
- // commit transaction only if rollback didn't occur
|
|
|
- if (rollbackIfNecessary(transactional, e, txn)) {
|
|
|
+ // everything was normal so commit the txn (do not move into try block
|
|
|
+ // above as it
|
|
|
+ // interferes with the advised method's throwing semantics)
|
|
|
+ try {
|
|
|
txn.commit();
|
|
|
+ } catch (Exception e) {
|
|
|
+ detailedLogForPersistenceError(e);
|
|
|
+ throw e;
|
|
|
+ } finally {
|
|
|
+ // close the em if necessary
|
|
|
+ if (null != didWeStartWork.get()) {
|
|
|
+ didWeStartWork.remove();
|
|
|
+ unitOfWork.end();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- detailedLogForPersistenceError(e);
|
|
|
-
|
|
|
- // propagate whatever exception is thrown anyway
|
|
|
- throw e;
|
|
|
+ // or return result
|
|
|
+ return result;
|
|
|
} finally {
|
|
|
- // Close the em if necessary (guarded so this code doesn't run unless
|
|
|
- // catch fired).
|
|
|
- if (null != didWeStartWork.get() && !txn.isActive()) {
|
|
|
- didWeStartWork.remove();
|
|
|
- unitOfWork.end();
|
|
|
- }
|
|
|
+ // unlock all lock areas for this transaction
|
|
|
+ unlockTransaction();
|
|
|
}
|
|
|
-
|
|
|
- // everything was normal so commit the txn (do not move into try block
|
|
|
- // above as it
|
|
|
- // interferes with the advised method's throwing semantics)
|
|
|
- try {
|
|
|
- txn.commit();
|
|
|
- } catch (Exception e) {
|
|
|
- detailedLogForPersistenceError(e);
|
|
|
- throw e;
|
|
|
- } finally {
|
|
|
- // close the em if necessary
|
|
|
- if (null != didWeStartWork.get()) {
|
|
|
- didWeStartWork.remove();
|
|
|
- unitOfWork.end();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- // or return result
|
|
|
- return result;
|
|
|
}
|
|
|
|
|
|
private void detailedLogForPersistenceError(Exception e) {
|
|
@@ -199,6 +256,68 @@ public class AmbariJpaLocalTxnInterceptor implements MethodInterceptor {
|
|
|
return commit;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Locks the {@link LockArea} specified on the {@link TransactionalLock}
|
|
|
+ * annotation if it exists. If the annotation does not exist, then no work is
|
|
|
+ * done.
|
|
|
+ * <p/>
|
|
|
+ * If a lock is acquired, then {@link #s_transactionalLocks} is updated with
|
|
|
+ * the lock so that the outer-most interceptor can release all locks when the
|
|
|
+ * transaction has completed.
|
|
|
+ *
|
|
|
+ * @param methodInvocation
|
|
|
+ */
|
|
|
+ private void lockTransaction(MethodInvocation methodInvocation) {
|
|
|
+ TransactionalLock annotation = methodInvocation.getMethod().getAnnotation(
|
|
|
+ TransactionalLock.class);
|
|
|
+
|
|
|
+ // no work to do if the annotation is not present
|
|
|
+ if (null == annotation) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // there is a lock area, so acquire the lock
|
|
|
+ LockArea lockArea = annotation.lockArea();
|
|
|
+ LockType lockType = annotation.lockType();
|
|
|
+
|
|
|
+ ReadWriteLock rwLock = transactionLocks.getLock(lockArea);
|
|
|
+ Lock lock = lockType == LockType.READ ? rwLock.readLock() : rwLock.writeLock();
|
|
|
+
|
|
|
+ lock.lock();
|
|
|
+
|
|
|
+ // ensure that we add this lock area, otherwise it will never be released
|
|
|
+ // when the outer most transaction is committed
|
|
|
+ s_transactionalLocks.get().add(annotation);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Unlocks all {@link LockArea}s associated with this transaction or any of
|
|
|
+ * the child transactions which were joined. The order that the locks are
|
|
|
+ * released is inverted from the order in which they were acquired.
|
|
|
+ */
|
|
|
+ private void unlockTransaction(){
|
|
|
+ LinkedList<TransactionalLock> annotations = s_transactionalLocks.get();
|
|
|
+ if (annotations.isEmpty()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // iterate through all locks which were encountered during the course of
|
|
|
+ // this transaction and release them all now that the transaction is
|
|
|
+ // committed; iterate reverse to unlock the most recently locked areas
|
|
|
+ Iterator<TransactionalLock> iterator = annotations.descendingIterator();
|
|
|
+ while (iterator.hasNext()) {
|
|
|
+ TransactionalLock annotation = iterator.next();
|
|
|
+ LockArea lockArea = annotation.lockArea();
|
|
|
+ LockType lockType = annotation.lockType();
|
|
|
+
|
|
|
+ ReadWriteLock rwLock = transactionLocks.getLock(lockArea);
|
|
|
+ Lock lock = lockType == LockType.READ ? rwLock.readLock() : rwLock.writeLock();
|
|
|
+
|
|
|
+ lock.unlock();
|
|
|
+ iterator.remove();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
@Transactional
|
|
|
private static class Internal {
|
|
|
}
|