|
@@ -41,6 +41,8 @@ import org.apache.ambari.view.hive2.actor.message.job.ExecutionFailed;
|
|
|
import org.apache.ambari.view.hive2.actor.message.job.Failure;
|
|
|
import org.apache.ambari.view.hive2.actor.message.job.NoResult;
|
|
|
import org.apache.ambari.view.hive2.actor.message.job.ResultSetHolder;
|
|
|
+import org.apache.ambari.view.hive2.actor.message.job.SaveDagInformation;
|
|
|
+import org.apache.ambari.view.hive2.actor.message.job.SaveGuidToDB;
|
|
|
import org.apache.ambari.view.hive2.actor.message.lifecycle.CleanUp;
|
|
|
import org.apache.ambari.view.hive2.actor.message.lifecycle.DestroyConnector;
|
|
|
import org.apache.ambari.view.hive2.actor.message.lifecycle.FreeConnector;
|
|
@@ -189,6 +191,10 @@ public class JdbcConnector extends HiveActor {
|
|
|
fetchResult((FetchResult) message);
|
|
|
} else if (message instanceof FetchError) {
|
|
|
fetchError((FetchError) message);
|
|
|
+ } else if (message instanceof SaveGuidToDB) {
|
|
|
+ saveGuid((SaveGuidToDB) message);
|
|
|
+ } else if (message instanceof SaveDagInformation) {
|
|
|
+ saveDagInformation((SaveDagInformation) message);
|
|
|
} else {
|
|
|
unhandled(message);
|
|
|
}
|
|
@@ -418,16 +424,50 @@ public class JdbcConnector extends HiveActor {
|
|
|
startTerminateInactivityScheduler();
|
|
|
}
|
|
|
|
|
|
- private void updateJobStatus(String jobid, String status) {
|
|
|
- try {
|
|
|
- JobImpl job = storage.load(JobImpl.class, jobid);
|
|
|
- job.setStatus(status);
|
|
|
- job.setDuration(getUpdatedDuration(job.getDateSubmitted()));
|
|
|
- storage.store(JobImpl.class, job);
|
|
|
- LOG.info("Stored job status for Job id: {} as '{}'", jobid, status);
|
|
|
- } catch (ItemNotFound itemNotFound) {
|
|
|
- // Cannot do anything
|
|
|
+ private void updateJobStatus(String jobid, final String status) {
|
|
|
+ new JobSaver(jobid) {
|
|
|
+ @Override
|
|
|
+ protected void update(JobImpl job) {
|
|
|
+ job.setStatus(status);
|
|
|
+ job.setDuration(getUpdatedDuration(job.getDateSubmitted()));
|
|
|
+ }
|
|
|
+ }.save();
|
|
|
+ LOG.info("Stored job status for Job id: {} as '{}'", jobid, status);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void saveGuid(final SaveGuidToDB message) {
|
|
|
+ new JobSaver(message.getJobId()) {
|
|
|
+ @Override
|
|
|
+ protected void update(JobImpl job) {
|
|
|
+ job.setGuid(message.getGuid());
|
|
|
+ }
|
|
|
+ }.save();
|
|
|
+ LOG.info("Stored GUID for Job id: {} as '{}'", message.getJobId(), message.getGuid());
|
|
|
+ }
|
|
|
+
|
|
|
+ private void saveDagInformation(final SaveDagInformation message) {
|
|
|
+ if(message.getDagId() == null &&
|
|
|
+ message.getDagName() == null &&
|
|
|
+ message.getApplicationId() == null) {
|
|
|
+ LOG.error("Cannot save Dag Information for job Id: {} as all the properties are null.", message.getJobId());
|
|
|
+ return;
|
|
|
}
|
|
|
+ new JobSaver(message.getJobId()) {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected void update(JobImpl job) {
|
|
|
+ if (message.getApplicationId() != null) {
|
|
|
+ job.setApplicationId(message.getApplicationId());
|
|
|
+ }
|
|
|
+ if (message.getDagId() != null) {
|
|
|
+ job.setDagId(message.getDagId());
|
|
|
+ }
|
|
|
+ if(message.getDagName() != null) {
|
|
|
+ job.setDagName(message.getDagName());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }.save();
|
|
|
+ LOG.info("Store Dag Information for job. Job id: {}, dagName: {}, dagId: {}, applicationId: {}", message.getJobId(), message.getDagName(), message.getDagId(), message.getApplicationId());
|
|
|
}
|
|
|
|
|
|
private Long getUpdatedDuration(Long dateSubmitted) {
|
|
@@ -539,4 +579,34 @@ public class JdbcConnector extends HiveActor {
|
|
|
connectable.disconnect();
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Saves the job to database.
|
|
|
+ */
|
|
|
+ private abstract class JobSaver {
|
|
|
+ private final String jobId;
|
|
|
+
|
|
|
+ JobSaver(String jobId) {
|
|
|
+ this.jobId = jobId;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void save() {
|
|
|
+ try {
|
|
|
+ JobImpl job = storage.load(JobImpl.class, jobId);
|
|
|
+ update(job);
|
|
|
+ storage.store(JobImpl.class, job);
|
|
|
+ } catch (ItemNotFound itemNotFound) {
|
|
|
+ itemNotFound(jobId);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Override to handle Not found exception
|
|
|
+ */
|
|
|
+ private void itemNotFound(String jobId) {
|
|
|
+ // Nothing to do
|
|
|
+ }
|
|
|
+
|
|
|
+ protected abstract void update(JobImpl job);
|
|
|
+ }
|
|
|
}
|