|
@@ -33,7 +33,7 @@ import java.util.concurrent.ExecutorService;
|
|
|
import java.util.concurrent.Executors;
|
|
|
import java.util.concurrent.ThreadFactory;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
-import java.util.concurrent.locks.ReadWriteLock;
|
|
|
+import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|
|
|
|
|
import org.apache.ambari.annotations.TransactionalLock;
|
|
|
import org.apache.ambari.annotations.TransactionalLock.LockArea;
|
|
@@ -41,7 +41,6 @@ import org.apache.ambari.server.AmbariException;
|
|
|
import org.apache.ambari.server.api.services.AmbariMetaInfo;
|
|
|
import org.apache.ambari.server.configuration.Configuration;
|
|
|
import org.apache.ambari.server.controller.AmbariManagementController;
|
|
|
-import org.apache.ambari.server.orm.TransactionalLocks;
|
|
|
import org.apache.ambari.server.orm.dao.ClusterDAO;
|
|
|
import org.apache.ambari.server.orm.entities.ClusterConfigEntity;
|
|
|
import org.apache.ambari.server.state.PropertyInfo.PropertyType;
|
|
@@ -73,14 +72,22 @@ public class ConfigHelper {
|
|
|
private final boolean STALE_CONFIGS_CACHE_ENABLED;
|
|
|
private final int STALE_CONFIGS_CACHE_EXPIRATION_TIME;
|
|
|
|
|
|
+ /**
|
|
|
+ * This lock used to synchronize {@link #staleConfigCacheDesiredConfigs} variable writing and avoid
|
|
|
+ * {@link #staleConfigsCache} invalidation during stale config reading(it will be invalidated when all reading
|
|
|
+ * operations are finished, thus next updating of {@link #staleConfigsCache} will be performed with fresh
|
|
|
+ * {@link #staleConfigCacheDesiredConfigs}
|
|
|
+ */
|
|
|
+ private final ReentrantReadWriteLock staleConfigCacheLock = new ReentrantReadWriteLock();
|
|
|
/**
|
|
|
* A cache of which {@link ServiceComponentHost}s have stale configurations.
|
|
|
* This cache may be invalidated within the context of a running transaction
|
|
|
* which could potentially cause old data to be re-cached before the
|
|
|
* transaction has completed.
|
|
|
* <p/>
|
|
|
- * As a result, all invalidations of this cache should be done on a separate
|
|
|
- * thread using the {@link LockArea#STALE_CONFIG_CACHE}.
|
|
|
+ * As a result, all invalidation of this cache should be done on a separate
|
|
|
+ * thread using the {@link ConfigHelper#staleConfigCacheLock} with desiredConfigs
|
|
|
+ * from thread that requested invalidation.
|
|
|
*
|
|
|
* @see #cacheInvalidationExecutor
|
|
|
*/
|
|
@@ -121,24 +128,15 @@ public class ConfigHelper {
|
|
|
|
|
|
/**
|
|
|
* Used to invalidate the {@link #staleConfigsCache} on a separate thread.
|
|
|
- * This helps ensure that the {@link TransactionalLock} /
|
|
|
- * {@link LockArea#STALE_CONFIG_CACHE} doesn't cause a deadlock with the
|
|
|
- * "cluster global lock".
|
|
|
- * <p/>
|
|
|
- * Cache invalidations must happen after the transaction which invoked them
|
|
|
- * has completed, otherwise it's possible to cache invalid data before the
|
|
|
- * transaction is committed.
|
|
|
*/
|
|
|
private final ExecutorService cacheInvalidationExecutor = createCacheInvalidationExecutor();
|
|
|
|
|
|
/**
|
|
|
- * Used to ensure that methods which rely on the completion of
|
|
|
- * {@link Transactional} can detect when they are able to run.
|
|
|
- *
|
|
|
- * @see TransactionalLock
|
|
|
+ * Every request that updates configs or configs-groups must invalidate staleConfigCache and update this filed
|
|
|
+ * with its own updated desiredConfigs(they must be passed form updater request) to avoid any issues with EL caches
|
|
|
+ * during parallel requests.
|
|
|
*/
|
|
|
- @Inject
|
|
|
- private final TransactionalLocks transactionLocks = null;
|
|
|
+ private volatile Map<String, DesiredConfig> staleConfigCacheDesiredConfigs = null;
|
|
|
|
|
|
@Inject
|
|
|
public ConfigHelper(Clusters c, AmbariMetaInfo metaInfo, Configuration configuration, ClusterDAO clusterDAO) {
|
|
@@ -192,7 +190,7 @@ public class ConfigHelper {
|
|
|
* the cluster
|
|
|
* @param hostConfigOverrides
|
|
|
* the host overrides applied using config groups
|
|
|
- * @param desiredConfigs
|
|
|
+ * @param clusterDesired
|
|
|
* the desired configurations for the cluster. Obtaining these can be
|
|
|
* expensive, ans since this method could be called 10,000's of times
|
|
|
* when generating cluster/host responses. Therefore, the caller
|
|
@@ -464,7 +462,7 @@ public class ConfigHelper {
|
|
|
*
|
|
|
* @param sch
|
|
|
* the SCH to calcualte config staleness for (not {@code null}).
|
|
|
- * @param desiredConfigs
|
|
|
+ * @param requestDesiredConfigs
|
|
|
* the desired configurations for the cluster. Obtaining these can be
|
|
|
* expensive and since this method operates on SCH's, it could be
|
|
|
* called 10,000's of times when generating cluster/host responses.
|
|
@@ -474,41 +472,52 @@ public class ConfigHelper {
|
|
|
*
|
|
|
* @return <code>true</code> if the actual configs are stale
|
|
|
*/
|
|
|
- public boolean isStaleConfigs(ServiceComponentHost sch, Map<String, DesiredConfig> desiredConfigs)
|
|
|
+ public boolean isStaleConfigs(ServiceComponentHost sch, Map<String, DesiredConfig> requestDesiredConfigs)
|
|
|
throws AmbariException {
|
|
|
Boolean stale = null;
|
|
|
+ staleConfigCacheLock.readLock().lock();
|
|
|
+ try {
|
|
|
+ // try the cache
|
|
|
+ if (STALE_CONFIGS_CACHE_ENABLED) {
|
|
|
+ stale = staleConfigsCache.getIfPresent(sch);
|
|
|
+ }
|
|
|
|
|
|
- // try the cache
|
|
|
- if (STALE_CONFIGS_CACHE_ENABLED) {
|
|
|
- stale = staleConfigsCache.getIfPresent(sch);
|
|
|
- }
|
|
|
-
|
|
|
- if (stale == null) {
|
|
|
- stale = calculateIsStaleConfigs(sch, desiredConfigs);
|
|
|
- staleConfigsCache.put(sch, stale);
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("Cache configuration staleness for host {} and component {} as {}",
|
|
|
- sch.getHostName(), sch.getServiceComponentName(), stale);
|
|
|
+ if (stale == null) {
|
|
|
+ /*
|
|
|
+ prefer staleConfigCacheDesiredConfigs(they are supposed to be passed from thread that updated configs) to avoid
|
|
|
+ population staleConfigsCache with stale data in case current thread have stale EL caches(this can happen during
|
|
|
+ parallel requests)
|
|
|
+ */
|
|
|
+ Map<String, DesiredConfig> desiredConfigs = staleConfigCacheDesiredConfigs != null ? staleConfigCacheDesiredConfigs : requestDesiredConfigs;
|
|
|
+ stale = calculateIsStaleConfigs(sch, desiredConfigs);
|
|
|
+ staleConfigsCache.put(sch, stale);
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Cache configuration staleness for host {} and component {} as {}",
|
|
|
+ sch.getHostName(), sch.getServiceComponentName(), stale);
|
|
|
+ }
|
|
|
}
|
|
|
+ return stale;
|
|
|
+ } finally {
|
|
|
+ staleConfigCacheLock.readLock().unlock();
|
|
|
}
|
|
|
- return stale;
|
|
|
+
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Invalidates the stale configuration cache for all
|
|
|
* {@link ServiceComponentHost}s for the given host. This will execute the
|
|
|
- * invalidation on a separate thread. The thread will attempt to acquire the
|
|
|
- * {@link LockArea#STALE_CONFIG_CACHE} lock which is why this action is
|
|
|
- * performed on a separate thread. This way, it won't interfere with any
|
|
|
- * cluster global locks already acquired.
|
|
|
+ * invalidation on a separate thread that will try to get write lock from {@link ConfigHelper#staleConfigCacheLock} to
|
|
|
+ * wait for finish of all reading operations for stale configs. Every subsequent read operation will be blocked with
|
|
|
+ * write lock and will be executed only after {@link #staleConfigCacheDesiredConfigs} updated and cache invalidated.
|
|
|
*
|
|
|
* @param hostname
|
|
|
+ * @param desiredConfigs desired cluster configs that will be used for configuration invalidation
|
|
|
*/
|
|
|
- public void invalidateStaleConfigsCache(String hostname) {
|
|
|
+ public void invalidateStaleConfigsCache(String hostname, Map<String, DesiredConfig> desiredConfigs) {
|
|
|
try {
|
|
|
for (Cluster cluster : clusters.getClustersForHost(hostname)) {
|
|
|
List<ServiceComponentHost> serviceComponentHosts = cluster.getServiceComponentHosts(hostname);
|
|
|
- Runnable invalidationRunnable = new StaleConfigInvalidationRunnable(serviceComponentHosts);
|
|
|
+ Runnable invalidationRunnable = new StaleConfigInvalidationRunnable(serviceComponentHosts, desiredConfigs);
|
|
|
cacheInvalidationExecutor.execute(invalidationRunnable);
|
|
|
}
|
|
|
} catch (AmbariException e) {
|
|
@@ -518,26 +527,20 @@ public class ConfigHelper {
|
|
|
|
|
|
/**
|
|
|
* Invalidates the stale configuration cache for keys. This will execute the
|
|
|
- * invalidation on a separate thread. The thread will attempt to acquire the
|
|
|
- * {@link LockArea#STALE_CONFIG_CACHE} lock which is why this action is
|
|
|
- * performed on a separate thread. This way, it won't interfere with any
|
|
|
- * cluster global locks already acquired.
|
|
|
+ * invalidation on a separate thread.
|
|
|
*/
|
|
|
- public void invalidateStaleConfigsCache() {
|
|
|
- Runnable invalidationRunnable = new StaleConfigInvalidationRunnable();
|
|
|
+ public void invalidateStaleConfigsCache(Map<String, DesiredConfig> desiredConfigs) {
|
|
|
+ Runnable invalidationRunnable = new StaleConfigInvalidationRunnable(desiredConfigs);
|
|
|
cacheInvalidationExecutor.execute(invalidationRunnable);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Invalidates the stale configuration cache for a single
|
|
|
* {@link ServiceComponentHost}. This will execute the invalidation on a
|
|
|
- * separate thread. The thread will attempt to acquire the
|
|
|
- * {@link LockArea#STALE_CONFIG_CACHE} lock which is why this action is
|
|
|
- * performed on a separate thread. This way, it won't interfere with any
|
|
|
- * cluster global locks already acquired.
|
|
|
+ * separate thread.
|
|
|
*/
|
|
|
- public void invalidateStaleConfigsCache(ServiceComponentHost sch) {
|
|
|
- Runnable invalidationRunnable = new StaleConfigInvalidationRunnable(Collections.singletonList(sch));
|
|
|
+ public void invalidateStaleConfigsCache(ServiceComponentHost sch, Map<String, DesiredConfig> desiredConfigs) {
|
|
|
+ Runnable invalidationRunnable = new StaleConfigInvalidationRunnable(Collections.singletonList(sch), desiredConfigs);
|
|
|
cacheInvalidationExecutor.execute(invalidationRunnable);
|
|
|
}
|
|
|
|
|
@@ -1376,21 +1379,24 @@ public class ConfigHelper {
|
|
|
|
|
|
/**
|
|
|
* Invalidates the {@link ConfigHelper#staleConfigsCache} after acquiring a
|
|
|
- * lock around {@link LockArea#STALE_CONFIG_CACHE}. It is necessary to acquire
|
|
|
- * this lock since the event which caused the config to become stale, such as
|
|
|
- * a new configuration being created, may not have had its transaction
|
|
|
- * committed yet.
|
|
|
+ * lock around {@link ConfigHelper#staleConfigCacheLock}. It is necessary to acquire
|
|
|
+ * this lock to avoid parallel reading threads populate cache using stale desiredConfigs.
|
|
|
+ * This thread can receive desiredConfigs form thread that initialized invalidation and put
|
|
|
+ * them to {@link ConfigHelper}. {@link ConfigHelper#staleConfigCacheLock} guaranteeing that invalidation
|
|
|
+ * will happen only after desiredConfigs passed to {@link ConfigHelper} instance.
|
|
|
*/
|
|
|
private final class StaleConfigInvalidationRunnable implements Runnable {
|
|
|
|
|
|
private final List<ServiceComponentHost> m_keysToInvalidate;
|
|
|
+ private final Map<String, DesiredConfig> m_desiredConfigs;
|
|
|
|
|
|
/**
|
|
|
* Constructor.
|
|
|
*
|
|
|
*/
|
|
|
- private StaleConfigInvalidationRunnable() {
|
|
|
+ private StaleConfigInvalidationRunnable(Map<String, DesiredConfig> desiredConfigs) {
|
|
|
m_keysToInvalidate = null;
|
|
|
+ m_desiredConfigs = desiredConfigs;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1399,8 +1405,9 @@ public class ConfigHelper {
|
|
|
* @param keysToInvalidate
|
|
|
* the keys to invalidate in the cache.
|
|
|
*/
|
|
|
- private StaleConfigInvalidationRunnable(List<ServiceComponentHost> keysToInvalidate) {
|
|
|
+ private StaleConfigInvalidationRunnable(List<ServiceComponentHost> keysToInvalidate, Map<String, DesiredConfig> desiredConfigs) {
|
|
|
m_keysToInvalidate = keysToInvalidate;
|
|
|
+ m_desiredConfigs = desiredConfigs;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1408,16 +1415,16 @@ public class ConfigHelper {
|
|
|
*/
|
|
|
@Override
|
|
|
public void run() {
|
|
|
- ReadWriteLock lock = transactionLocks.getLock(LockArea.STALE_CONFIG_CACHE);
|
|
|
- lock.writeLock().lock();
|
|
|
+ staleConfigCacheLock.writeLock().lock();
|
|
|
try {
|
|
|
+ ConfigHelper.this.staleConfigCacheDesiredConfigs = m_desiredConfigs;
|
|
|
if (null == m_keysToInvalidate || m_keysToInvalidate.isEmpty()) {
|
|
|
staleConfigsCache.invalidateAll();
|
|
|
} else {
|
|
|
staleConfigsCache.invalidateAll(m_keysToInvalidate);
|
|
|
}
|
|
|
} finally {
|
|
|
- lock.writeLock().unlock();
|
|
|
+ staleConfigCacheLock.writeLock().unlock();
|
|
|
}
|
|
|
}
|
|
|
}
|