Prechádzať zdrojové kódy

AMBARI-17106 - Deadlock While Updating Stale Configuration Cache During Upgrade (jonathanhurley)

Jonathan Hurley 9 rokov pred
rodič
commit
b024000615

+ 111 - 22
ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java

@@ -29,6 +29,9 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReadWriteLock;
 
@@ -51,6 +54,7 @@ import org.slf4j.LoggerFactory;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.google.inject.persist.Transactional;
@@ -68,6 +72,18 @@ public class ConfigHelper {
   public static final String CLUSTER_DEFAULT_TAG = "tag";
   private final boolean STALE_CONFIGS_CACHE_ENABLED;
   private final int STALE_CONFIGS_CACHE_EXPIRATION_TIME;
+
+  /**
+   * A cache of which {@link ServiceComponentHost}s have stale configurations.
+   * This cache may be invalidated within the context of a running transaction
+   * which could potentially cause old data to be re-cached before the
+   * transaction has completed.
+   * <p/>
+   * As a result, all invalidations of this cache should be done on a separate
+   * thread using the {@link LockArea#STALE_CONFIG_CACHE}.
+   *
+   * @see #cacheInvalidationExecutor
+   */
   private final Cache<ServiceComponentHost, Boolean> staleConfigsCache;
 
   private static final Logger LOG =
@@ -96,6 +112,25 @@ public class ConfigHelper {
    */
   public static final String FIRST_VERSION_TAG = "version1";
 
+  /**
+   * A {@link ThreadFactory} for the {@link #cacheInvalidationExecutor}.
+   */
+  private final ThreadFactory cacheInvalidationThreadFactory = new ThreadFactoryBuilder().setNameFormat(
+      "ambari-stale-config-invalidator-%d").setDaemon(true).build();
+
+  /**
+   * Used to invalidate the {@link #staleConfigsCache} on a separate thread.
+   * This helps ensure that the {@link TransactionalLock} /
+   * {@link LockArea#STALE_CONFIG_CACHE} doesn't cause a deadlock with the
+   * "cluster global lock".
+   * <p/>
+   * Cache invalidations must happen after the transaction which invoked them
+   * has completed, otherwise it's possible to cache invalid data before the
+   * transaction is committed.
+   */
+  private final ExecutorService cacheInvalidationExecutor = Executors.newSingleThreadExecutor(
+      cacheInvalidationThreadFactory);
+
   /**
    * Used to ensure that methods which rely on the completion of
    * {@link Transactional} can detect when they are able to run.
@@ -449,34 +484,32 @@ public class ConfigHelper {
     }
 
     if (stale == null) {
-      ReadWriteLock lock = transactionLocks.getLock(LockArea.STALE_CONFIG_CACHE);
-      lock.readLock().lock();
-
-      try {
-        stale = calculateIsStaleConfigs(sch, desiredConfigs);
-        staleConfigsCache.put(sch, stale);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Cache configuration staleness for host {} and component {} as {}",
-              sch.getHostName(), sch.getServiceComponentName(), stale);
-        }
-      } finally {
-        lock.readLock().unlock();
+      stale = calculateIsStaleConfigs(sch, desiredConfigs);
+      staleConfigsCache.put(sch, stale);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Cache configuration staleness for host {} and component {} as {}",
+            sch.getHostName(), sch.getServiceComponentName(), stale);
       }
     }
     return stale;
   }
 
   /**
-   * Invalidates cached isStale values for hostname
+   * Invalidates the stale configuration cache for all
+   * {@link ServiceComponentHost}s for the given host. This will execute the
+   * invalidation on a separate thread. The thread will attempt to acquire the
+   * {@link LockArea#STALE_CONFIG_CACHE} lock which is why this action is
+   * performed on a separate thread. This way, it won't interfere with any
+   * cluster global locks already acquired.
    *
    * @param hostname
    */
   public void invalidateStaleConfigsCache(String hostname) {
     try {
       for (Cluster cluster : clusters.getClustersForHost(hostname)) {
-        for (ServiceComponentHost sch : cluster.getServiceComponentHosts(hostname)) {
-          invalidateStaleConfigsCache(sch);
-        }
+        List<ServiceComponentHost> serviceComponentHosts = cluster.getServiceComponentHosts(hostname);
+        Runnable invalidationRunnable = new StaleConfigInvalidationRunnable(serviceComponentHosts);
+        cacheInvalidationExecutor.execute(invalidationRunnable);
       }
     } catch (AmbariException e) {
       LOG.warn("Unable to find clusters for host " + hostname);
@@ -484,19 +517,28 @@ public class ConfigHelper {
   }
 
   /**
-   * Invalidates isStale cache
+   * Invalidates the stale configuration cache for keys. This will execute the
+   * invalidation on a separate thread. The thread will attempt to acquire the
+   * {@link LockArea#STALE_CONFIG_CACHE} lock which is why this action is
+   * performed on a separate thread. This way, it won't interfere with any
+   * cluster global locks already acquired.
    */
   public void invalidateStaleConfigsCache() {
-    staleConfigsCache.invalidateAll();
+    Runnable invalidationRunnable = new StaleConfigInvalidationRunnable();
+    cacheInvalidationExecutor.execute(invalidationRunnable);
   }
 
   /**
-   * Invalidates cached isStale value for sch
-   *
-   * @param sch
+   * Invalidates the stale configuration cache for a single
+   * {@link ServiceComponentHost}. This will execute the invalidation on a
+   * separate thread. The thread will attempt to acquire the
+   * {@link LockArea#STALE_CONFIG_CACHE} lock which is why this action is
+   * performed on a separate thread. This way, it won't interfere with any
+   * cluster global locks already acquired.
    */
   public void invalidateStaleConfigsCache(ServiceComponentHost sch) {
-    staleConfigsCache.invalidate(sch);
+    Runnable invalidationRunnable = new StaleConfigInvalidationRunnable(Collections.singletonList(sch));
+    cacheInvalidationExecutor.execute(invalidationRunnable);
   }
 
   /**
@@ -1279,5 +1321,52 @@ public class ConfigHelper {
     return filename.substring(0, extIndex);
   }
 
+  /**
+   * Invalidates the {@link ConfigHelper#staleConfigsCache} after acquiring a
+   * lock around {@link LockArea#STALE_CONFIG_CACHE}. It is necessary to acquire
+   * this lock since the event which caused the config to become stale, such as
+   * a new configuration being created, may not have had its transaction
+   * committed yet.
+   */
+  private final class StaleConfigInvalidationRunnable implements Runnable {
+
+    private final List<ServiceComponentHost> m_keysToInvalidate;
+
+    /**
+     * Constructor.
+     *
+     */
+    private StaleConfigInvalidationRunnable() {
+      m_keysToInvalidate = null;
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param keysToInvalidate
+     *          the keys to invalidate in the cache.
+     */
+    private StaleConfigInvalidationRunnable(List<ServiceComponentHost> keysToInvalidate) {
+      m_keysToInvalidate = keysToInvalidate;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void run() {
+      ReadWriteLock lock = transactionLocks.getLock(LockArea.STALE_CONFIG_CACHE);
+      lock.writeLock().lock();
+      try {
+        if (null == m_keysToInvalidate || m_keysToInvalidate.isEmpty()) {
+          staleConfigsCache.invalidateAll();
+        } else {
+          staleConfigsCache.invalidateAll(m_keysToInvalidate);
+        }
+      } finally {
+        lock.writeLock().unlock();
+      }
+    }
+  }
 
 }

+ 14 - 0
ambari-server/src/main/java/org/apache/ambari/server/state/ConfigImpl.java

@@ -27,6 +27,9 @@ import java.util.Set;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.apache.ambari.annotations.TransactionalLock;
+import org.apache.ambari.annotations.TransactionalLock.LockArea;
+import org.apache.ambari.annotations.TransactionalLock.LockType;
 import org.apache.ambari.server.events.ClusterConfigChangedEvent;
 import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.orm.dao.ClusterDAO;
@@ -347,8 +350,19 @@ public class ConfigImpl implements Config {
     persist(true);
   }
 
+  /**
+   * {@inheritDoc}
+   * <p/>
+   * This method will attempt a lock around the
+   * {@link LockArea#STALE_CONFIG_CACHE} in order to ensure that the stale
+   * configuration cache can only be invalidated once this transaction has been
+   * committed. Without this lock, it's possible that the stale config cache
+   * might be invalidated and then re-populated with old data before this
+   * transaction commits.
+   */
   @Override
   @Transactional
+  @TransactionalLock(lockArea = LockArea.STALE_CONFIG_CACHE, lockType = LockType.WRITE)
   public void persist(boolean newConfig) {
     cluster.getClusterGlobalLock().writeLock().lock(); //null cluster is not expected, NPE anyway later in code
     try {

+ 0 - 5
ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java

@@ -41,9 +41,6 @@ import javax.annotation.Nullable;
 import javax.persistence.EntityManager;
 import javax.persistence.RollbackException;
 
-import org.apache.ambari.annotations.TransactionalLock;
-import org.apache.ambari.annotations.TransactionalLock.LockArea;
-import org.apache.ambari.annotations.TransactionalLock.LockType;
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.ConfigGroupNotFoundException;
 import org.apache.ambari.server.DuplicateResourceException;
@@ -2798,7 +2795,6 @@ public class ClusterImpl implements Cluster {
   }
 
   @Transactional
-  @TransactionalLock(lockArea = LockArea.STALE_CONFIG_CACHE, lockType = LockType.WRITE)
   void selectConfig(String type, String tag, String user) {
     Collection<ClusterConfigMappingEntity> entities =
         clusterDAO.getClusterConfigMappingEntitiesByCluster(getClusterId());
@@ -2827,7 +2823,6 @@ public class ClusterImpl implements Cluster {
   }
 
   @Transactional
-  @TransactionalLock(lockArea = LockArea.STALE_CONFIG_CACHE, lockType = LockType.WRITE)
   ServiceConfigVersionResponse applyConfigs(Set<Config> configs, String user, String serviceConfigVersionNote) {
 
     String serviceName = null;

+ 11 - 3
ambari-server/src/test/java/org/apache/ambari/server/state/ConfigHelperTest.java

@@ -23,6 +23,7 @@ import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.verify;
 
+import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -39,8 +40,6 @@ import java.util.concurrent.locks.ReentrantLock;
 
 import javax.persistence.EntityManager;
 
-import junit.framework.Assert;
-
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.actionmanager.RequestFactory;
 import org.apache.ambari.server.api.services.AmbariMetaInfo;
@@ -62,6 +61,7 @@ import org.apache.ambari.server.state.configgroup.ConfigGroup;
 import org.apache.ambari.server.state.configgroup.ConfigGroupFactory;
 import org.apache.ambari.server.state.host.HostFactory;
 import org.apache.ambari.server.state.stack.OsFamily;
+import org.apache.ambari.server.utils.SynchronousThreadPoolExecutor;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -69,13 +69,15 @@ import org.junit.experimental.runners.Enclosed;
 import org.junit.runner.RunWith;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.security.core.context.SecurityContextHolder;
 
 import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.persist.PersistService;
 import com.google.inject.persist.Transactional;
-import org.springframework.security.core.context.SecurityContextHolder;
+
+import junit.framework.Assert;
 
 
 @RunWith(Enclosed.class)
@@ -198,6 +200,12 @@ public class ConfigHelperTest {
       managementController.updateClusters(new HashSet<ClusterRequest>() {{
         add(clusterRequest4);
       }}, null);
+
+      // instrument the asynchronous stale config invalidation to be synchronous
+      // so that tests pass
+      Field field = ConfigHelper.class.getDeclaredField("cacheInvalidationExecutor");
+      field.setAccessible(true);
+      field.set(configHelper, new SynchronousThreadPoolExecutor());
     }
 
     @After