Browse Source

AMBARI-18042. Part 3. Query execution takes a long time in hive view version1.5.0. (dipayanb)

Dipayan Bhowmick 9 years ago
parent
commit
ecac23e6da

+ 21 - 12
contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/ConnectionSystem.java

@@ -23,13 +23,16 @@ import akka.actor.ActorSystem;
 import akka.actor.Inbox;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
+import com.google.common.collect.Multimap;
 import org.apache.ambari.view.ViewContext;
 import org.apache.ambari.view.hive2.actor.DeathWatch;
 import org.apache.ambari.view.hive2.actor.OperationController;
 import org.apache.ambari.view.hive2.internal.ConnectionSupplier;
 import org.apache.ambari.view.hive2.internal.DataStorageSupplier;
 import org.apache.ambari.view.hive2.internal.HdfsApiSupplier;
+import org.apache.ambari.view.hive2.internal.SafeViewContext;
 
+import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -39,7 +42,7 @@ public class ConnectionSystem {
   private ActorSystem actorSystem = null;
   private static volatile ConnectionSystem instance = null;
   private static final Object lock = new Object();
-  private static Map<String, ActorRef> operationControllerMap = new ConcurrentHashMap<>();
+  private static Map<String, Map<String, ActorRef>> operationControllerMap = new ConcurrentHashMap<>();
 
   private ConnectionSystem() {
     this.actorSystem = ActorSystem.create(ACTOR_SYSTEM_NAME);
@@ -71,27 +74,33 @@ public class ConnectionSystem {
   /**
    * Returns one operationController per View Instance
    *
-   * @param context
+   * @param viewContext
    * @return operationController Instance
    */
-  public ActorRef getOperationController(ViewContext context) {
+  public synchronized ActorRef getOperationController(ViewContext viewContext) {
+    SafeViewContext context = new SafeViewContext(viewContext);
     String instanceName = context.getInstanceName();
-    ActorRef ref = operationControllerMap.get(instanceName);
+    ActorRef ref = null;
+    Map<String, ActorRef> stringActorRefMap = operationControllerMap.get(instanceName);
+    if(stringActorRefMap != null) {
+      ref = stringActorRefMap.get(context.getUsername());
+    }
     if (ref == null) {
-      synchronized (lock) {
-        ref = operationControllerMap.get(instanceName);
-        if (ref == null) {
-          ref = createOperationController(context);
-          operationControllerMap.put(instanceName, ref);
-        }
+      ref = createOperationController(context);
+      if(stringActorRefMap == null) {
+        stringActorRefMap = new HashMap<>();
+        stringActorRefMap.put(context.getUsername(), ref);
+        operationControllerMap.put(instanceName, stringActorRefMap);
+      } else {
+        stringActorRefMap.put(context.getUsername(), ref);
       }
     }
     return ref;
   }
 
   public void removeOperationControllerFromCache(String viewInstanceName) {
-    ActorRef ref = operationControllerMap.remove(viewInstanceName);
-    if (ref != null) {
+    Map<String, ActorRef> refs = operationControllerMap.remove(viewInstanceName);
+    for (ActorRef ref : refs.values()) {
       Inbox inbox = Inbox.create(getActorSystem());
       inbox.send(ref, PoisonPill.getInstance());
     }

+ 7 - 5
contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/JdbcConnector.java

@@ -343,13 +343,13 @@ public class JdbcConnector extends HiveActor {
 
   public boolean checkConnection() {
     if (connectable == null) {
-      notifyConnectFailure();
+      notifyConnectFailure(new SQLException("Hive connection is not created"));
       return false;
     }
 
     Optional<HiveConnection> connectionOptional = connectable.getConnection();
     if (!connectionOptional.isPresent()) {
-      notifyConnectFailure();
+      notifyConnectFailure(new SQLException("Hive connection is not created"));
       return false;
     }
     return true;
@@ -375,10 +375,10 @@ public class JdbcConnector extends HiveActor {
     return executionType == HiveJob.Type.ASYNC;
   }
 
-  private void notifyConnectFailure() {
+  private void notifyConnectFailure(Exception ex) {
     executing = false;
     isFailure = true;
-    this.failure = new Failure("Cannot connect to hive", new SQLException("Cannot connect to hive"));
+    this.failure = new Failure("Cannot connect to hive", ex);
     if (isAsync()) {
       updateJobStatus(jobId.get(), Job.JOB_STATE_ERROR);
     } else {
@@ -416,9 +416,10 @@ public class JdbcConnector extends HiveActor {
         connectable.connect();
       }
     } catch (ConnectionException e) {
+      LOG.error("Failed to create a hive connection. {}", e);
       // set up job failure
       // notify parent about job failure
-      notifyConnectFailure();
+      notifyConnectFailure(e);
       return;
     }
     startTerminateInactivityScheduler();
@@ -568,6 +569,7 @@ public class JdbcConnector extends HiveActor {
     failure = null;
     resultSetIterator = null;
     isCancelCalled = false;
+    statementQueue = new ArrayDeque<>();
   }
 
   @Override

+ 179 - 0
contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/internal/SafeViewContext.java

@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.internal;
+
+import org.apache.ambari.view.AmbariStreamProvider;
+import org.apache.ambari.view.DataStore;
+import org.apache.ambari.view.HttpImpersonator;
+import org.apache.ambari.view.ImpersonatorSetting;
+import org.apache.ambari.view.ResourceProvider;
+import org.apache.ambari.view.SecurityException;
+import org.apache.ambari.view.URLConnectionProvider;
+import org.apache.ambari.view.URLStreamProvider;
+import org.apache.ambari.view.ViewContext;
+import org.apache.ambari.view.ViewController;
+import org.apache.ambari.view.ViewDefinition;
+import org.apache.ambari.view.ViewInstanceDefinition;
+import org.apache.ambari.view.cluster.Cluster;
+
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Wrapper to ViewContext. This delegates all the method calls to wrapped ViewContext object excepting for
+ * #getUsername() and #getLoggedinUser(). At the creation time, the username and loggedinuser are store
+ * in instance variable. This was done to bypass the ThreadLocal variables implicitly used in actual viewContext.
+ * So, object of this class should be used in the ActorSystem.
+ */
+public class SafeViewContext implements ViewContext {
+  private final ViewContext viewContext;
+  private final String username;
+  private final String loggedinUser;
+
+  public SafeViewContext(ViewContext viewContext) {
+    this.viewContext = viewContext;
+    username = viewContext.getUsername();
+    loggedinUser = viewContext.getLoggedinUser();
+  }
+
+  @Override
+  public String getUsername() {
+    return username;
+  }
+
+  @Override
+  public String getLoggedinUser() {
+    return loggedinUser;
+  }
+
+  @Override
+  public void hasPermission(String userName, String permissionName) throws SecurityException {
+    viewContext.hasPermission(userName, permissionName);
+  }
+
+  @Override
+  public String getViewName() {
+    return viewContext.getViewName();
+  }
+
+  @Override
+  public ViewDefinition getViewDefinition() {
+    return viewContext.getViewDefinition();
+  }
+
+  @Override
+  public String getInstanceName() {
+    return viewContext.getInstanceName();
+  }
+
+  @Override
+  public ViewInstanceDefinition getViewInstanceDefinition() {
+    return viewContext.getViewInstanceDefinition();
+  }
+
+  @Override
+  public Map<String, String> getProperties() {
+    return viewContext.getProperties();
+  }
+
+  @Override
+  public void putInstanceData(String key, String value) {
+    viewContext.putInstanceData(key, value);
+  }
+
+  @Override
+  public String getInstanceData(String key) {
+    return viewContext.getInstanceData(key);
+  }
+
+  @Override
+  public Map<String, String> getInstanceData() {
+    return viewContext.getInstanceData();
+  }
+
+  @Override
+  public void removeInstanceData(String key) {
+    viewContext.removeInstanceData(key);
+  }
+
+  @Override
+  public String getAmbariProperty(String key) {
+    return viewContext.getAmbariProperty(key);
+  }
+
+  @Override
+  public ResourceProvider<?> getResourceProvider(String type) {
+    return viewContext.getResourceProvider(type);
+  }
+
+  @Override
+  public URLStreamProvider getURLStreamProvider() {
+    return viewContext.getURLStreamProvider();
+  }
+
+  @Override
+  public URLConnectionProvider getURLConnectionProvider() {
+    return viewContext.getURLConnectionProvider();
+  }
+
+  @Override
+  public AmbariStreamProvider getAmbariStreamProvider() {
+    return viewContext.getAmbariStreamProvider();
+  }
+
+  @Override
+  public AmbariStreamProvider getAmbariClusterStreamProvider() {
+    return viewContext.getAmbariClusterStreamProvider();
+  }
+
+  @Override
+  public DataStore getDataStore() {
+    return viewContext.getDataStore();
+  }
+
+  @Override
+  public Collection<ViewDefinition> getViewDefinitions() {
+    return viewContext.getViewDefinitions();
+  }
+
+  @Override
+  public Collection<ViewInstanceDefinition> getViewInstanceDefinitions() {
+    return viewContext.getViewInstanceDefinitions();
+  }
+
+  @Override
+  public ViewController getController() {
+    return viewContext.getController();
+  }
+
+  @Override
+  public HttpImpersonator getHttpImpersonator() {
+    return viewContext.getHttpImpersonator();
+  }
+
+  @Override
+  public ImpersonatorSetting getImpersonatorSetting() {
+    return viewContext.getImpersonatorSetting();
+  }
+
+  @Override
+  public Cluster getCluster() {
+    return viewContext.getCluster();
+  }
+}

+ 1 - 1
contrib/views/hive-next/src/main/resources/application.conf

@@ -28,7 +28,7 @@ akka {
   # Log level for the very basic logger activated during ActorSystem startup.
   # This logger prints the log messages to stdout (System.out).
   # Options: OFF, ERROR, WARNING, INFO, DEBUG
-  stdout-loglevel = "WARN"
+  stdout-loglevel = "WARNING"
 
   actor {