Ver Fonte

AMBARI-16146. Hive View Synchronized Around Entire Connection Creation Causing Deadlock (Nitiraj Rathore via pallavkul)

Pallav Kulshreshtha há 9 anos atrás
pai
commit
3a7ea944bb

+ 8 - 5
contrib/views/hive/src/main/java/org/apache/ambari/view/hive/client/UserLocalConnection.java

@@ -35,11 +35,14 @@ public class UserLocalConnection extends UserLocal<Connection> {
       new UserLocalHiveAuthCredentials();
 
   @Override
-  protected synchronized Connection initialValue(ViewContext context) {
-    ConnectionFactory hiveConnectionFactory = new ConnectionFactory(context, authCredentialsLocal.get(context));
-    authCredentialsLocal.remove(context);  // we should not store credentials in memory,
-                                          // password is erased after connection established
-    return hiveConnectionFactory.create();
+  protected Connection initialValue(ViewContext context) {
+      LOG.debug("creating connection for context : {}" , context);
+      ConnectionFactory hiveConnectionFactory = new ConnectionFactory(context, authCredentialsLocal.get(context));
+      authCredentialsLocal.remove(context);  // we should not store credentials in memory,
+      // password is erased after connection established
+      Connection connection = hiveConnectionFactory.create();
+      LOG.debug("returning connection : {} for context : {} ", connection,context);
+      return connection;
   }
 
 }

+ 1 - 1
contrib/views/hive/src/main/java/org/apache/ambari/view/hive/client/UserLocalHiveAuthCredentials.java

@@ -27,7 +27,7 @@ public class UserLocalHiveAuthCredentials extends UserLocal<HiveAuthCredentials>
   }
 
   @Override
-  protected synchronized HiveAuthCredentials initialValue(ViewContext context) {
+  protected HiveAuthCredentials initialValue(ViewContext context) {
     return new HiveAuthCredentials();
   }
 }

+ 80 - 15
contrib/views/utils/src/main/java/org/apache/ambari/view/utils/UserLocal.java

@@ -19,10 +19,14 @@
 package org.apache.ambari.view.utils;
 
 import org.apache.ambari.view.ViewContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * Manages end user specific objects.
@@ -31,8 +35,12 @@ import java.util.concurrent.ConcurrentHashMap;
  * @param <T> user-local class
  */
 public class UserLocal<T> {
-  private static Map<Class, Map<String, Object>> viewSingletonObjects = new ConcurrentHashMap<Class, Map<String, Object>>();
+  private final static Logger LOG =
+    LoggerFactory.getLogger(UserLocal.class);
+
+  private final static Map<Class, Map<String, Object>> viewSingletonObjects = new ConcurrentHashMap<>();
   private final Class<? extends T> tClass;
+  private final static Map<String,ReentrantLock> locks = new HashMap<>();
 
   public UserLocal(Class<? extends T> tClass) {
     this.tClass =tClass;
@@ -49,6 +57,26 @@ public class UserLocal<T> {
     return null;
   }
 
+  /**
+   * gives the lock object for the specified key. creates if not yet created.
+   * @param key : the key for which the lock is required.
+   * @return : ReentrantLock for that key
+   */
+  private static ReentrantLock getLockFor(String key){
+    LOG.info("Finding lock for : {}", key);
+    if( null == locks.get(key)){
+      LOG.info("Lock not found for {} ",key);
+      synchronized (locks){
+        if(null == locks.get(key)){
+          LOG.info("Creating lock for {} ", key);
+          locks.put(key,new ReentrantLock());
+        }
+      }
+    }
+
+    return locks.get(key);
+  }
+
   /**
    * Returns user-local instance.
    * If instance of class is not present yet for user, calls initialValue to create it.
@@ -57,15 +85,43 @@ public class UserLocal<T> {
    */
   public T get(ViewContext context) {
     if (!viewSingletonObjects.containsKey(tClass)) {
-      viewSingletonObjects.put(tClass, new ConcurrentHashMap<String, Object>());
+      synchronized (viewSingletonObjects) {
+        if (!viewSingletonObjects.containsKey(tClass)) {
+          viewSingletonObjects.put(tClass, new ConcurrentHashMap<String, Object>());
+        }
+      }
     }
 
     Map<String, Object> instances = viewSingletonObjects.get(tClass);
 
-    if (!instances.containsKey(getTagName(context))) {
-      instances.put(getTagName(context), initialValue(context));
+    String key = getTagName(context);
+
+    LOG.debug("looking for key : {}", key);
+    if (!instances.containsKey(key)) {
+      String lockKey = tClass.getName() + "_" + key;
+      LOG.info("key {} not found. getting lock for {}", key,lockKey);
+
+      ReentrantLock lock = getLockFor(lockKey);
+      boolean gotLock = lock.tryLock();
+
+      if( !gotLock ){
+        LOG.error("Lock could not be obtained for {}. Throwing exception.",lockKey);
+        throw new RuntimeException(String.format("Failed to initialize %s for %s. Try Again.", tClass.getName(), key));
+      }
+      else {
+        try {
+          if (!instances.containsKey(key)) {
+            T initValue = initialValue(context);
+            LOG.info("Obtained initial value : {} for key : {}",initValue,key);
+            instances.put(key, initValue);
+          }
+        }finally{
+          lock.unlock();
+        }
+      }
     }
-    return (T) instances.get(getTagName(context));
+
+    return (T) instances.get(key);
   }
 
   /**
@@ -75,11 +131,16 @@ public class UserLocal<T> {
    */
   public void set(T obj, ViewContext context) {
     if (!viewSingletonObjects.containsKey(tClass)) {
-      viewSingletonObjects.put(tClass, new ConcurrentHashMap<String, Object>());
+      synchronized (viewSingletonObjects) {
+        if (!viewSingletonObjects.containsKey(tClass)) {
+          viewSingletonObjects.put(tClass, new ConcurrentHashMap<String, Object>());
+        }
+      }
     }
-
+    String key = getTagName(context);
+    LOG.info("setting key : value {} : {}", key,obj );
     Map<String, Object> instances = viewSingletonObjects.get(tClass);
-    instances.put(getTagName(context), obj);
+    instances.put(key, obj);
   }
 
   /**
@@ -87,11 +148,11 @@ public class UserLocal<T> {
    * @param context ViewContext that provides username and instance name
    */
   public void remove(ViewContext context) {
-    if (viewSingletonObjects.containsKey(tClass)) {
-      Map<String, Object> instances = viewSingletonObjects.get(tClass);
-      if (instances.containsKey(getTagName(context))) {
-        instances.remove(getTagName(context));
-      }
+    String key = getTagName(context);
+    LOG.info("removing key : {}", key);
+    Map<String, Object> instances = viewSingletonObjects.get(tClass);
+    if( null != instances ){
+      instances.remove(key);
     }
   }
 
@@ -113,9 +174,10 @@ public class UserLocal<T> {
    * @param tClass classname instances of which should be dropped
    */
   public static void dropAllConnections(Class tClass) {
+    LOG.info("removing all {} " ,tClass.getName());
     Map<String, Object> instances = viewSingletonObjects.get(tClass);
     if (instances != null) {
-      viewSingletonObjects.get(tClass).clear();
+      instances.clear();
     }
   }
 
@@ -125,6 +187,7 @@ public class UserLocal<T> {
    * Method should not normally be called from production code.
    */
   public static void dropAllConnections() {
+      LOG.info("clearing all viewSingletonObjects.");
       viewSingletonObjects.clear();
   }
 
@@ -132,13 +195,15 @@ public class UserLocal<T> {
    *
    * Drops all objects for give instance name.
    *
-   * @param instanceName
+   * @param instanceName : the name of the view instance for which the keys needs to be dropped.
    */
   public static void dropInstanceCache(String instanceName){
+    LOG.info("removing all the keys for instanceName : {}", instanceName);
     for(Map<String,Object> cache : viewSingletonObjects.values()){
       for(Iterator<Map.Entry<String, Object>> it = cache.entrySet().iterator();it.hasNext();){
         Map.Entry<String, Object> entry = it.next();
         if(entry.getKey().startsWith(instanceName+":")){
+          LOG.debug("removing key : {} ",entry.getKey());
           it.remove();
         }
       }