
AMBARI-17269. INSERT and SELECT not working. jobId was used in actor names to make them unique, but jobId is not a unique identifier across view instances. (dipayanb)

Dipayan Bhowmick, 9 years ago
parent commit 92efc25eb7
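
A minimal, self-contained sketch of the failure mode the commit message describes (the actor class, jobId value, and system name below are illustrative, not taken from the patch): Akka requires top-level actor names to be unique within an ActorSystem, so naming connector actors after a jobId that can repeat across view instances fails on the second actorOf call, while a random UUID cannot collide.

import java.util.UUID;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.InvalidActorNameException;
import akka.actor.Props;
import akka.actor.UntypedActor;

public class ActorNameCollisionDemo {
  public static class NoopActor extends UntypedActor {
    @Override
    public void onReceive(Object message) {
      unhandled(message);
    }
  }

  public static void main(String[] args) {
    ActorSystem system = ActorSystem.create("DemoSystem");
    String jobId = "42"; // two view instances can hand out the same job id
    system.actorOf(Props.create(NoopActor.class), "jobId:" + jobId + ":asyncjdbcConnector");
    try {
      // A second instance reusing the same jobId clashes on the actor name.
      system.actorOf(Props.create(NoopActor.class), "jobId:" + jobId + ":asyncjdbcConnector");
    } catch (InvalidActorNameException e) {
      // The fix applied in this commit: derive the name from a UUID instead.
      ActorRef unique = system.actorOf(Props.create(NoopActor.class),
          UUID.randomUUID().toString() + ":asyncjdbcConnector");
      System.out.println("created " + unique.path());
    }
    system.shutdown(); // Akka 2.3-era API, matching ConnectionSystem#shutdown below
  }
}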

+ 11 - 9
contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/ConnectionSystem.java

@@ -47,13 +47,14 @@ public class ConnectionSystem {
   private static Map<String, ActorRef> operationControllerMap = new HashMap<>();
 
   private ConnectionSystem() {
-    this.actorSystem = ActorSystem.create(ACTOR_SYSTEM_NAME);;
+    this.actorSystem = ActorSystem.create(ACTOR_SYSTEM_NAME);
+    ;
   }
 
   public static ConnectionSystem getInstance() {
-    if(instance == null) {
+    if (instance == null) {
       synchronized (lock) {
-        if(instance == null) {
+        if (instance == null) {
           instance = new ConnectionSystem();
         }
       }
@@ -61,10 +62,10 @@ public class ConnectionSystem {
     return instance;
   }
 
-  private ActorRef createOperationController() {
+  private ActorRef createOperationController(ViewContext context) {
     ActorRef deathWatch = actorSystem.actorOf(Props.create(DeathWatch.class));
     return actorSystem.actorOf(
-      Props.create(OperationController.class, actorSystem,deathWatch,
+      Props.create(OperationController.class, actorSystem, deathWatch, context,
         new ConnectionSupplier(), new DataStorageSupplier(), new HdfsApiSupplier()));
   }
 
@@ -74,17 +75,18 @@ public class ConnectionSystem {
 
   /**
    * Returns one operationController per View Instance
+   *
    * @param context
    * @return operationController Instance
    */
   public ActorRef getOperationController(ViewContext context) {
     String instanceName = context.getInstanceName();
     ActorRef ref = operationControllerMap.get(instanceName);
-    if(ref == null) {
+    if (ref == null) {
       synchronized (lock) {
         ref = operationControllerMap.get(instanceName);
-        if(ref == null) {
-          ref = createOperationController();
+        if (ref == null) {
+          ref = createOperationController(context);
           operationControllerMap.put(instanceName, ref);
         }
       }
@@ -93,7 +95,7 @@ public class ConnectionSystem {
   }
 
   public void shutdown() {
-    if(!actorSystem.isTerminated()) {
+    if (!actorSystem.isTerminated()) {
       actorSystem.shutdown();
     }
   }
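
The hunk above caches one controller per view instance with double-checked locking, but the lookup on line `operationControllerMap.get(instanceName)` reads a plain HashMap outside the lock. A hedged alternative sketch (OperationControllerCache and Factory are hypothetical names, not part of the patch) built on ConcurrentHashMap, which makes the racy read safe at the cost of possibly constructing a controller that loses the race:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import akka.actor.ActorRef;
import org.apache.ambari.view.ViewContext;

public class OperationControllerCache {
  /** Hypothetical factory standing in for ConnectionSystem#createOperationController. */
  public interface Factory {
    ActorRef create(ViewContext context);
  }

  private final ConcurrentMap<String, ActorRef> controllers = new ConcurrentHashMap<>();
  private final Factory factory;

  public OperationControllerCache(Factory factory) {
    this.factory = factory;
  }

  public ActorRef getOperationController(ViewContext context) {
    String instanceName = context.getInstanceName();
    ActorRef ref = controllers.get(instanceName);
    if (ref == null) {
      ActorRef created = factory.create(context);
      ActorRef existing = controllers.putIfAbsent(instanceName, created);
      // If another thread won the race, 'created' is an orphan actor that a real
      // implementation would stop -- one reason the patch keeps a lock instead.
      ref = (existing == null) ? created : existing;
    }
    return ref;
  }
}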

+ 3 - 1
contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/JdbcConnector.java

@@ -354,7 +354,9 @@ public class JdbcConnector extends HiveActor {
   }
 
   private ActorRef getStatementExecutor() {
-    return getContext().actorOf(Props.create(StatementExecutor.class, hdfsApi, storage, connectable.getConnection().get(), connectionDelegate).withDispatcher("akka.actor.result-dispatcher"), "StatementExecutor");
+    return getContext().actorOf(Props.create(StatementExecutor.class, hdfsApi, storage, connectable.getConnection().get(), connectionDelegate)
+      .withDispatcher("akka.actor.result-dispatcher"),
+      "StatementExecutor:" + UUID.randomUUID().toString());
   }
 
   private boolean isAsync() {
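
The UUID suffix above avoids an InvalidActorNameException when a child named "StatementExecutor" from a previous statement has not finished stopping. A hedged sketch of the other idiomatic option, reusing the same fields as the method above: omit the name entirely and let Akka generate a unique one (e.g. "$a").

private ActorRef getStatementExecutor() {
  // Unnamed children always get fresh generated names, so no collision is possible.
  return getContext().actorOf(
      Props.create(StatementExecutor.class, hdfsApi, storage,
          connectable.getConnection().get(), connectionDelegate)
        .withDispatcher("akka.actor.result-dispatcher"));
}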

+ 28 - 17
contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/OperationController.java

@@ -32,11 +32,9 @@ import org.apache.ambari.view.hive2.actor.message.HiveJob;
 import org.apache.ambari.view.hive2.actor.message.HiveMessage;
 import org.apache.ambari.view.hive2.actor.message.JobRejected;
 import org.apache.ambari.view.hive2.actor.message.RegisterActor;
-import org.apache.ambari.view.hive2.actor.message.ResultNotReady;
-import org.apache.ambari.view.hive2.actor.message.ResultReady;
 import org.apache.ambari.view.hive2.actor.message.SQLStatementJob;
-import org.apache.ambari.view.hive2.actor.message.job.AsyncExecutionFailed;
 import org.apache.ambari.view.hive2.actor.message.job.CancelJob;
+import org.apache.ambari.view.hive2.actor.message.job.FetchFailed;
 import org.apache.ambari.view.hive2.actor.message.lifecycle.DestroyConnector;
 import org.apache.ambari.view.hive2.actor.message.lifecycle.FreeConnector;
 import org.apache.ambari.view.hive2.internal.ContextSupplier;
@@ -94,13 +92,18 @@ public class OperationController extends HiveActor {
    */
   private final Map<String, Set<ActorRef>> syncBusyConnections;
 
+
+  private final ViewContext context;
+
   public OperationController(ActorSystem system,
                              ActorRef deathWatch,
+                             ViewContext context,
                              ContextSupplier<ConnectionDelegate> connectionSupplier,
                              ContextSupplier<Storage> storageSupplier,
                              ContextSupplier<Optional<HdfsApi>> hdfsApiSupplier) {
     this.system = system;
     this.deathWatch = deathWatch;
+    this.context = context;
     this.connectionSupplier = connectionSupplier;
     this.storageSupplier = storageSupplier;
     this.hdfsApiSupplier = hdfsApiSupplier;
@@ -151,7 +154,9 @@ public class OperationController extends HiveActor {
     if (actorRef != null) {
       actorRef.tell(message, sender());
     } else {
-      LOG.error("Failed to find a running job. Cannot cancel jobId: {}.", message.getJobId());
+      String msg = String.format("Cannot cancel job. Job with id: %s for instance: %s has either not started or has expired.", message.getJobId(), context.getInstanceName());
+      LOG.error(msg);
+      sender().tell(new FetchFailed(msg), self());
     }
   }
 
@@ -159,8 +164,12 @@ public class OperationController extends HiveActor {
     String jobId = message.getJobId();
     String username = message.getUsername();
     ActorRef actorRef = asyncBusyConnections.get(username).get(jobId);
-    if(actorRef != null) {
+    if (actorRef != null) {
       actorRef.tell(message, sender());
+    } else {
+      String msg = String.format("Cannot fetch error for job. Job with id: %s for instance: %s has either not started or has expired.", message.getJobId(), context.getInstanceName());
+      LOG.error(msg);
+      sender().tell(new FetchFailed(msg), self());
     }
   }
 
@@ -170,6 +179,10 @@ public class OperationController extends HiveActor {
     ActorRef actorRef = asyncBusyConnections.get(username).get(jobId);
     if (actorRef != null) {
       actorRef.tell(message, sender());
+    } else {
+      String msg = String.format("Cannot fetch result for job. Job with id: %s for instance: %s has either not started or has expired.", message.getJobId(), context.getInstanceName());
+      LOG.error(msg);
+      sender().tell(new FetchFailed(msg), self());
     }
   }
 
@@ -179,9 +192,8 @@ public class OperationController extends HiveActor {
     ActorRef subActor = null;
     // Check if there is available actors to process this
     subActor = getActorRefFromAsyncPool(username);
-    ViewContext viewContext = job.getViewContext();
     if (subActor == null) {
-      Optional<HdfsApi> hdfsApiOptional = hdfsApiSupplier.get(viewContext);
+      Optional<HdfsApi> hdfsApiOptional = hdfsApiSupplier.get(context);
       if (!hdfsApiOptional.isPresent()) {
         sender().tell(new JobRejected(username, jobId, "Failed to connect to Hive."), self());
         return;
@@ -189,10 +201,10 @@ public class OperationController extends HiveActor {
       HdfsApi hdfsApi = hdfsApiOptional.get();
 
       subActor = system.actorOf(
-        Props.create(JdbcConnector.class, viewContext, self(),
-          deathWatch, hdfsApi, connectionSupplier.get(viewContext),
-          storageSupplier.get(viewContext)).withDispatcher("akka.actor.jdbc-connector-dispatcher"),
-        "jobId:" + jobId + ":asyncjdbcConnector");
+        Props.create(JdbcConnector.class, context, self(),
+          deathWatch, hdfsApi, connectionSupplier.get(context),
+          storageSupplier.get(context)).withDispatcher("akka.actor.jdbc-connector-dispatcher"),
+        UUID.randomUUID().toString() + ":asyncjdbcConnector");
       deathWatch.tell(new RegisterActor(subActor), self());
     }
 
@@ -242,10 +254,9 @@ public class OperationController extends HiveActor {
     ActorRef subActor = null;
     // Check if there is available actors to process this
     subActor = getActorRefFromSyncPool(username);
-    ViewContext viewContext = job.getViewContext();
 
     if (subActor == null) {
-      Optional<HdfsApi> hdfsApiOptional = hdfsApiSupplier.get(viewContext);
+      Optional<HdfsApi> hdfsApiOptional = hdfsApiSupplier.get(context);
       if (!hdfsApiOptional.isPresent()) {
         sender().tell(new JobRejected(username, ExecuteJob.SYNC_JOB_MARKER, "Failed to connect to HDFS."), ActorRef.noSender());
         return;
@@ -253,10 +264,10 @@ public class OperationController extends HiveActor {
       HdfsApi hdfsApi = hdfsApiOptional.get();
 
       subActor = system.actorOf(
-        Props.create(JdbcConnector.class, viewContext, self(),
-          deathWatch, hdfsApi,  connectionSupplier.get(viewContext),
-          storageSupplier.get(viewContext)).withDispatcher("akka.actor.jdbc-connector-dispatcher"),
-        UUID.randomUUID().toString() + ":SyncjdbcConnector");
+        Props.create(JdbcConnector.class, context, self(),
+          deathWatch, hdfsApi, connectionSupplier.get(context),
+          storageSupplier.get(context)).withDispatcher("akka.actor.jdbc-connector-dispatcher"),
+        UUID.randomUUID().toString() + ":syncjdbcConnector");
       deathWatch.tell(new RegisterActor(subActor), self());
     }
 

+ 8 - 10
contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/GetColumnMetadataJob.java

@@ -18,33 +18,31 @@
 
 package org.apache.ambari.view.hive2.actor.message;
 
-import org.apache.ambari.view.ViewContext;
-
 public class GetColumnMetadataJob extends HiveJob {
   private final String schemaPattern;
   private final String tablePattern;
   private final String columnPattern;
 
-  public GetColumnMetadataJob(String username, ViewContext viewContext,
+  public GetColumnMetadataJob(String username,
                               String schemaPattern, String tablePattern, String columnPattern) {
-    super(Type.SYNC, username, viewContext);
+    super(Type.SYNC, username);
     this.schemaPattern = schemaPattern;
     this.tablePattern = tablePattern;
     this.columnPattern = columnPattern;
   }
 
-  public GetColumnMetadataJob(String username, ViewContext viewContext,
+  public GetColumnMetadataJob(String username,
                               String tablePattern, String columnPattern) {
-    this(username, viewContext, "*", tablePattern, columnPattern);
+    this(username, "*", tablePattern, columnPattern);
   }
 
-  public GetColumnMetadataJob(String username, ViewContext viewContext,
+  public GetColumnMetadataJob(String username,
                               String columnPattern) {
-    this(username, viewContext, "*", "*", columnPattern);
+    this(username, "*", "*", columnPattern);
   }
 
-  public GetColumnMetadataJob(String username, ViewContext viewContext) {
-    this(username, viewContext, "*", "*", "*");
+  public GetColumnMetadataJob(String username) {
+    this(username, "*", "*", "*");
   }
 
   public String getSchemaPattern() {
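
A short usage sketch of the telescoping constructors after the ViewContext removal (the username and patterns are illustrative); each omitted pattern defaults to "*":

GetColumnMetadataJob all      = new GetColumnMetadataJob("admin");                          // schema *, table *, column *
GetColumnMetadataJob byColumn = new GetColumnMetadataJob("admin", "id");                    // schema *, table *
GetColumnMetadataJob byTable  = new GetColumnMetadataJob("admin", "users", "id");           // schema *
GetColumnMetadataJob exact    = new GetColumnMetadataJob("admin", "default", "users", "id");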

+ 1 - 16
contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/HiveJob.java

@@ -18,23 +18,14 @@
 
 package org.apache.ambari.view.hive2.actor.message;
 
-import com.google.common.collect.ImmutableList;
-import org.apache.ambari.view.ViewContext;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-
 public abstract class HiveJob {
 
   private final String username;
   private final Type type;
-  private final ViewContext viewContext;
 
-  public HiveJob(Type type, String username,ViewContext viewContext) {
+  public HiveJob(Type type, String username) {
     this.type = type;
     this.username = username;
-    this.viewContext = viewContext;
   }
 
   public String getUsername() {
@@ -49,12 +40,6 @@ public abstract class HiveJob {
   }
 
 
-
-  public ViewContext getViewContext() {
-    return viewContext;
-  }
-
-
   public enum Type {
     SYNC,
     ASYNC

+ 4 - 4
contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/SQLStatementJob.java

@@ -34,8 +34,8 @@ public class SQLStatementJob extends HiveJob {
   private final String jobId;
   private final String logFile;
 
-  public SQLStatementJob(Type type, String[] statements, String username, String jobId, String logFile, ViewContext viewContext) {
-    super(type, username, viewContext);
+  public SQLStatementJob(Type type, String[] statements, String username, String jobId, String logFile) {
+    super(type, username);
     this.statements = new String[statements.length];
     this.jobId = jobId;
     this.logFile = logFile;
@@ -43,8 +43,8 @@ public class SQLStatementJob extends HiveJob {
       this.statements[i] = clean(statements[i]);
     }
   }
-  public SQLStatementJob(Type type, String[] statements, String username, ViewContext viewContext) {
-    this(type, statements, username, null, null, viewContext);
+  public SQLStatementJob(Type type, String[] statements, String username) {
+    this(type, statements, username, null, null);
   }
 
   private String clean(String statement) {

+ 2 - 2
contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/DDLDelegatorImpl.java

@@ -144,7 +144,7 @@ public class DDLDelegatorImpl implements DDLDelegator {
 
   private Optional<Result> getRowsFromDB(ConnectionConfig config, String[] statements) {
     Connect connect = config.createConnectMessage();
-    HiveJob job = new SQLStatementJob(HiveJob.Type.SYNC, statements, config.getUsername(), context);
+    HiveJob job = new SQLStatementJob(HiveJob.Type.SYNC, statements, config.getUsername());
     ExecuteJob execute = new ExecuteJob(connect, job);
 
     LOG.info("Executing query: {}, for user: {}", getJoinedStatements(statements), job.getUsername());
@@ -154,7 +154,7 @@ public class DDLDelegatorImpl implements DDLDelegator {
 
   private Optional<Result> getTableDescription(ConnectionConfig config, String databasePattern, String tablePattern, String columnPattern) {
     Connect connect = config.createConnectMessage();
-    HiveJob job = new GetColumnMetadataJob(config.getUsername(), context, databasePattern, tablePattern, columnPattern);
+    HiveJob job = new GetColumnMetadataJob(config.getUsername(), databasePattern, tablePattern, columnPattern);
     ExecuteJob execute = new ExecuteJob(connect, job);
 
     LOG.info("Executing query to fetch the column description for dbPattern: {}, tablePattern: {}, columnPattern: {}, for user: {}",

+ 1 - 1
contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/viewJobs/JobControllerImpl.java

@@ -107,7 +107,7 @@ public class JobControllerImpl implements JobController, ModifyNotificationDeleg
         String query = getQueryForJob();
         ConnectionSystem system = ConnectionSystem.getInstance();
         AsyncJobRunner asyncJobRunner = new AsyncJobRunnerImpl(context, system.getOperationController(context), system.getActorSystem());
-        SQLStatementJob asyncJob = new SQLStatementJob(HiveJob.Type.ASYNC, getStatements(jobDatabase, query), context.getUsername(), job.getId(), job.getLogFile(), context);
+        SQLStatementJob asyncJob = new SQLStatementJob(HiveJob.Type.ASYNC, getStatements(jobDatabase, query), context.getUsername(), job.getId(), job.getLogFile());
         asyncJobRunner.submitJob(getHiveConnectionConfig(), asyncJob, job);
 
     }