|
@@ -22,7 +22,6 @@ import org.apache.ambari.view.ViewContext;
|
|
|
import org.apache.ambari.view.hive2.client.AsyncJobRunner;
|
|
|
import org.apache.ambari.view.hive2.client.AsyncJobRunnerImpl;
|
|
|
import org.apache.ambari.view.hive2.client.ConnectionConfig;
|
|
|
-import org.apache.ambari.view.hive2.client.HiveClientRuntimeException;
|
|
|
import org.apache.ambari.view.hive2.persistence.utils.ItemNotFound;
|
|
|
import org.apache.ambari.view.hive2.resources.jobs.ModifyNotificationDelegate;
|
|
|
import org.apache.ambari.view.hive2.resources.jobs.ModifyNotificationInvocationHandler;
|
|
@@ -36,9 +35,6 @@ import org.apache.ambari.view.hive2.utils.ServiceFormattedException;
|
|
|
import org.apache.ambari.view.hive2.ConnectionFactory;
|
|
|
import org.apache.ambari.view.hive2.ConnectionSystem;
|
|
|
import org.apache.ambari.view.hive2.actor.message.AsyncJob;
|
|
|
-import org.apache.ambari.view.hive2.actor.message.job.AsyncExecutionFailed;
|
|
|
-import org.apache.ambari.view.hive2.internal.AsyncExecutionSuccess;
|
|
|
-import org.apache.ambari.view.hive2.internal.Either;
|
|
|
import org.apache.ambari.view.utils.hdfs.HdfsApi;
|
|
|
import org.apache.ambari.view.utils.hdfs.HdfsApiException;
|
|
|
import org.apache.ambari.view.utils.hdfs.HdfsUtil;
|
|
@@ -49,9 +45,7 @@ import org.slf4j.LoggerFactory;
|
|
|
import java.io.IOException;
|
|
|
import java.lang.reflect.Proxy;
|
|
|
import java.text.SimpleDateFormat;
|
|
|
-import java.util.Arrays;
|
|
|
import java.util.Date;
|
|
|
-import java.util.List;
|
|
|
|
|
|
public class JobControllerImpl implements JobController, ModifyNotificationDelegate {
|
|
|
private final static Logger LOG =
|
|
@@ -63,7 +57,6 @@ public class JobControllerImpl implements JobController, ModifyNotificationDeleg
|
|
|
private Job job;
|
|
|
private boolean modified;
|
|
|
|
|
|
- //private OperationHandleControllerFactory opHandleControllerFactory;
|
|
|
private SavedQueryResourceManager savedQueryResourceManager;
|
|
|
private IATSParser atsParser;
|
|
|
|
|
@@ -72,19 +65,15 @@ public class JobControllerImpl implements JobController, ModifyNotificationDeleg
|
|
|
* Warning: Create JobControllers ONLY using JobControllerFactory!
|
|
|
*/
|
|
|
public JobControllerImpl(ViewContext context, Job job,
|
|
|
- //OperationHandleControllerFactory opHandleControllerFactory,
|
|
|
SavedQueryResourceManager savedQueryResourceManager,
|
|
|
IATSParser atsParser,
|
|
|
HdfsApi hdfsApi) {
|
|
|
this.context = context;
|
|
|
setJobPOJO(job);
|
|
|
- //this.opHandleControllerFactory = opHandleControllerFactory;
|
|
|
this.savedQueryResourceManager = savedQueryResourceManager;
|
|
|
this.atsParser = atsParser;
|
|
|
this.hdfsApi = hdfsApi;
|
|
|
|
|
|
- //UserLocalConnection connectionLocal = new UserLocalConnection();
|
|
|
- //this.hiveConnection = new ConnectionController(opHandleControllerFactory, connectionLocal.get(context));
|
|
|
}
|
|
|
|
|
|
public String getQueryForJob() {
|
|
@@ -110,11 +99,6 @@ public class JobControllerImpl implements JobController, ModifyNotificationDeleg
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /*@Override
|
|
|
- public OperationHandleController.OperationStatus getStatus() throws ItemNotFound, HiveClientException, NoOperationStatusSetException {
|
|
|
- OperationHandleController handle = opHandleControllerFactory.getHandleForJob(job);
|
|
|
- return handle.getOperationStatus();
|
|
|
- }*/
|
|
|
|
|
|
@Override
|
|
|
public void submit() throws Throwable {
|
|
@@ -122,8 +106,6 @@ public class JobControllerImpl implements JobController, ModifyNotificationDeleg
|
|
|
String query = getQueryForJob();
|
|
|
ConnectionSystem system = ConnectionSystem.getInstance();
|
|
|
AsyncJobRunner asyncJobRunner = new AsyncJobRunnerImpl(context, system.getOperationController(context), system.getActorSystem());
|
|
|
- // create async Job
|
|
|
- //
|
|
|
AsyncJob asyncJob = new AsyncJob(job.getId(), context.getUsername(), getStatements(jobDatabase, query), job.getLogFile(), context);
|
|
|
asyncJobRunner.submitJob(getHiveConnectionConfig(), asyncJob, job);
|
|
|
|
|
@@ -140,72 +122,14 @@ public class JobControllerImpl implements JobController, ModifyNotificationDeleg
|
|
|
|
|
|
@Override
|
|
|
public void cancel() throws ItemNotFound {
|
|
|
- //OperationHandleController handle = opHandleControllerFactory.getHandleForJob(job);
|
|
|
- //handle.cancel();
|
|
|
+ //TODO: Cancel job
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void update() {
|
|
|
- updateOperationStatus();
|
|
|
- updateOperationLogs();
|
|
|
-
|
|
|
updateJobDuration();
|
|
|
}
|
|
|
|
|
|
- public void updateOperationStatus() {
|
|
|
- try {
|
|
|
-
|
|
|
- //OperationHandleController handle = opHandleControllerFactory.getHandleForJob(job);
|
|
|
- //OperationHandleController.OperationStatus status = handle.getOperationStatus();
|
|
|
- /*job.setStatus(status.status);
|
|
|
- job.setStatusMessage(status.message);
|
|
|
- job.setSqlState(status.sqlState);*/
|
|
|
- LOG.debug("Status of job#" + job.getId() + " is " + job.getStatus());
|
|
|
-
|
|
|
- } catch (Exception /*NoOperationStatusSetException*/ e) {
|
|
|
- LOG.info("Operation state is not set for job#" + job.getId());
|
|
|
-
|
|
|
- } /*catch (HiveErrorStatusException e) {
|
|
|
- LOG.debug("Error updating status for job#" + job.getId() + ": " + e.getMessage());
|
|
|
- job.setStatus(ExecuteJob.JOB_STATE_UNKNOWN);
|
|
|
-
|
|
|
- } catch (HiveClientException e) {
|
|
|
- throw new HiveClientFormattedException(e);
|
|
|
-
|
|
|
- } catch (ItemNotFound itemNotFound) {
|
|
|
- LOG.debug("No TOperationHandle for job#" + job.getId() + ", can't update status");
|
|
|
- }*/
|
|
|
- }
|
|
|
-
|
|
|
- public void updateOperationLogs() {
|
|
|
- try {
|
|
|
- //OperationHandleController handle = opHandleControllerFactory.getHandleForJob(job);
|
|
|
- //String logs = handle.getLogs();
|
|
|
-
|
|
|
- //LogParser info = LogParser.parseLog(logs);
|
|
|
- //LogParser.AppId app = info.getLastAppInList();
|
|
|
- /*if (app != null) {
|
|
|
- job.setApplicationId(app.getIdentifier());
|
|
|
- }*/
|
|
|
-
|
|
|
- String logFilePath = job.getLogFile();
|
|
|
- //HdfsUtil.putStringToFile(hdfsApi, logFilePath, logs);
|
|
|
-
|
|
|
- } catch (HiveClientRuntimeException ex) {
|
|
|
- LOG.error("Error while fetching logs: " + ex.getMessage());
|
|
|
- } /*catch (ItemNotFound itemNotFound) {
|
|
|
- LOG.debug("No TOperationHandle for job#" + job.getId() + ", can't read logs");
|
|
|
- } catch (HdfsApiException e) {
|
|
|
- throw new ServiceFormattedException(e);
|
|
|
- }*/
|
|
|
- }
|
|
|
-
|
|
|
- public boolean isJobEnded() {
|
|
|
- String status = job.getStatus();
|
|
|
- return status.equals(Job.JOB_STATE_FINISHED) || status.equals(Job.JOB_STATE_CANCELED) ||
|
|
|
- status.equals(Job.JOB_STATE_CLOSED) || status.equals(Job.JOB_STATE_ERROR) ||
|
|
|
- status.equals(Job.JOB_STATE_UNKNOWN); // Unknown is not finished, but polling makes no sense
|
|
|
- }
|
|
|
|
|
|
@Override
|
|
|
public Job getJob() {
|
|
@@ -230,18 +154,6 @@ public class JobControllerImpl implements JobController, ModifyNotificationDeleg
|
|
|
this.jobUnproxied = jobPOJO;
|
|
|
}
|
|
|
|
|
|
- /*@Override
|
|
|
- public Cursor getResults() throws ItemNotFound {
|
|
|
- OperationHandleController handle = opHandleControllerFactory.getHandleForJob(job);
|
|
|
- return handle.getResults();
|
|
|
- }*/
|
|
|
-
|
|
|
- @Override
|
|
|
- public boolean hasResults() throws ItemNotFound {
|
|
|
- //OperationHandleController handle = opHandleControllerFactory.getHandleForJob(job);
|
|
|
- //return handle.hasResults();
|
|
|
- return false;
|
|
|
- }
|
|
|
|
|
|
@Override
|
|
|
public void afterCreation() {
|