@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive.resources.jobs;
+
+import org.apache.ambari.view.hive.persistence.utils.FilteringStrategy;
+import org.apache.ambari.view.hive.persistence.utils.Indexed;
+import org.apache.ambari.view.hive.persistence.utils.ItemNotFound;
+import org.apache.ambari.view.hive.persistence.utils.OnlyOwnersFilteringStrategy;
+import org.apache.ambari.view.hive.resources.IResourceManager;
+import org.apache.ambari.view.hive.resources.jobs.atsJobs.HiveQueryId;
+import org.apache.ambari.view.hive.resources.jobs.atsJobs.IATSParser;
+import org.apache.ambari.view.hive.resources.jobs.atsJobs.TezDagId;
+import org.apache.ambari.view.hive.resources.jobs.viewJobs.Job;
+import org.apache.ambari.view.hive.resources.jobs.viewJobs.JobImpl;
+import org.apache.commons.beanutils.PropertyUtils;
+import org.apache.commons.codec.binary.Base64;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * View Jobs and ATS Jobs aggregator.
+ * Not every view job creates an ATS job.
+ */
+public class Aggregator {
+  protected final static Logger LOG =
+      LoggerFactory.getLogger(Aggregator.class);
+
+  private final IATSParser ats;
+  private final IOperationHandleResourceManager operationHandleResourceManager;
+  private IResourceManager<Job> viewJobResourceManager;
+
+  public Aggregator(IResourceManager<Job> jobResourceManager,
+                    IOperationHandleResourceManager operationHandleResourceManager,
+                    IATSParser ats) {
+    this.viewJobResourceManager = jobResourceManager;
+    this.operationHandleResourceManager = operationHandleResourceManager;
+    this.ats = ats;
+  }
+
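+  /**
+   * Returns every job visible to the given user: ATS Hive query entries merged
+   * with the matching view jobs (matched by operation id), plus view jobs whose
+   * queries never produced an ATS entry.
+   */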
+  public List<Job> readAll(String username) {
+    Set<String> addedOperationIds = new HashSet<String>();
+
+    List<Job> allJobs = new LinkedList<Job>();
+    for (HiveQueryId atsHiveQuery : ats.getHiveQuieryIdsList(username)) {
+
+      TezDagId atsTezDag;
+      if (atsHiveQuery.dagNames != null && atsHiveQuery.dagNames.size() > 0) {
+        String dagName = atsHiveQuery.dagNames.get(0);
+
+        atsTezDag = ats.getTezDAGByName(dagName);
+      } else {
+        atsTezDag = new TezDagId();
+      }
+
+      JobImpl atsJob;
+      if (hasOperationId(atsHiveQuery)) {
+        try {
+          Job viewJob = getJobByOperationId(urlSafeBase64ToHexString(atsHiveQuery.operationId));
+          saveJobInfoIfNeeded(atsHiveQuery, atsTezDag, viewJob);
+
+          atsJob = mergeAtsJobWithViewJob(atsHiveQuery, atsTezDag, viewJob);
+        } catch (ItemNotFound itemNotFound) {
+          // Executed from HS2, but outside of Hive View
+          atsJob = atsOnlyJob(atsHiveQuery, atsTezDag);
+        }
+      } else {
+        atsJob = atsOnlyJob(atsHiveQuery, atsTezDag);
+      }
+      allJobs.add(atsJob);
+
+      addedOperationIds.add(atsHiveQuery.operationId);
+    }
+
+    // cover the case when an operationId is present but does not exist in ATS,
+    // e.g. optimized queries that execute no jobs, like "SELECT * FROM TABLE"
+    for (Job job : viewJobResourceManager.readAll(new OnlyOwnersFilteringStrategy(username))) {
+      List<StoredOperationHandle> operationHandles = operationHandleResourceManager.readJobRelatedHandles(job);
+      assert operationHandles.size() <= 1;
+
+      if (operationHandles.size() > 0) {
+        StoredOperationHandle operationHandle = operationHandles.get(0);
+
+        if (!addedOperationIds.contains(hexStringToUrlSafeBase64(operationHandle.getGuid()))) {
+          // e.g. query without hadoop job: select * from table
+          allJobs.add(job);
+        }
+      }
+    }
+
+    return allJobs;
+  }
+
+  protected boolean hasOperationId(HiveQueryId atsHiveQuery) {
+    return atsHiveQuery.operationId != null;
+  }
+
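+  /**
+   * Copies the view job's properties into a fresh JobImpl and overlays the
+   * ATS/Tez fields on top of them; returns null if the bean copy fails.
+   */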
+  protected JobImpl mergeAtsJobWithViewJob(HiveQueryId atsHiveQuery, TezDagId atsTezDag, Job viewJob) {
+    JobImpl atsJob;
+    try {
+      atsJob = new JobImpl(PropertyUtils.describe(viewJob));
+    } catch (IllegalAccessException e) {
+      LOG.error("Can't instantiate JobImpl", e);
+      return null;
+    } catch (InvocationTargetException e) {
+      LOG.error("Can't instantiate JobImpl", e);
+      return null;
+    } catch (NoSuchMethodException e) {
+      LOG.error("Can't instantiate JobImpl", e);
+      return null;
+    }
+    fillAtsJobFields(atsJob, atsHiveQuery, atsTezDag);
+    return atsJob;
+  }
+
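+  /**
+   * Persists the Tez DAG name and the latest DAG status from ATS on the view
+   * job when they are missing or stale.
+   */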
+  protected void saveJobInfoIfNeeded(HiveQueryId hiveQueryId, TezDagId tezDagId, Job viewJob) throws ItemNotFound {
+    if (viewJob.getDagName() == null) {
+      viewJob.setDagName(tezDagId.dagName);
+      viewJobResourceManager.update(viewJob, viewJob.getId());
+    }
+    if (!viewJob.getStatus().equals(tezDagId.status)) {
+      viewJob.setStatus(tezDagId.status);
+      viewJobResourceManager.update(viewJob, viewJob.getId());
+    }
+  }
+
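+  /**
+   * Builds a job purely from ATS data, for queries executed outside of Hive View.
+   */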
+  protected JobImpl atsOnlyJob(HiveQueryId atsHiveQuery, TezDagId atsTezDag) {
+    JobImpl atsJob = new JobImpl();
+    atsJob.setId(atsHiveQuery.entity);
+    fillAtsJobFields(atsJob, atsHiveQuery, atsTezDag);
+
+    String query = atsHiveQuery.query;
+    atsJob.setTitle(query.substring(0, Math.min(42, query.length())));
+
+    atsJob.setQueryFile("fakefile://" + Base64.encodeBase64URLSafeString(query.getBytes())); // fake queryFile
+    return atsJob;
+  }
+
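+  /**
+   * Copies application id, DAG name, status, start time and duration from ATS data.
+   */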
+  protected JobImpl fillAtsJobFields(JobImpl atsJob, HiveQueryId atsHiveQuery, TezDagId atsTezDag) {
+    atsJob.setApplicationId(atsTezDag.applicationId);
+
+    atsJob.setDagName(atsTezDag.dagName);
+    if (!atsTezDag.status.equals(TezDagId.STATUS_UNKNOWN))
+      atsJob.setStatus(atsTezDag.status);
+    if (atsHiveQuery.starttime != 0)
+      atsJob.setDateSubmitted(atsHiveQuery.starttime);
+    atsJob.setDuration(atsHiveQuery.duration);
+    return atsJob;
+  }
+
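+  /**
+   * Finds the view job whose stored operation handle GUID equals opId;
+   * throws ItemNotFound unless exactly one handle matches.
+   */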
+  protected Job getJobByOperationId(final String opId) throws ItemNotFound {
+    List<StoredOperationHandle> operationHandles = operationHandleResourceManager.readAll(new FilteringStrategy() {
+      @Override
+      public boolean isConform(Indexed item) {
+        StoredOperationHandle opHandle = (StoredOperationHandle) item;
+        return opHandle.getGuid().equals(opId);
+      }
+
+      @Override
+      public String whereStatement() {
+        return "guid='" + opId + "'";
+      }
+    });
+
+    if (operationHandles.size() != 1)
+      throw new ItemNotFound();
+
+    return viewJobResourceManager.read(operationHandles.get(0).getJobId());
+  }
+
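+  /**
+   * Converts an ATS operation id (URL-safe Base64) into the hex GUID format
+   * used by stored operation handles.
+   */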
+  protected static String urlSafeBase64ToHexString(String urlsafeBase64) {
+    byte[] decoded = Base64.decodeBase64(urlsafeBase64);
+
+    StringBuilder sb = new StringBuilder();
+    for (byte b : decoded) {
+      sb.append(String.format("%02x", b));
+    }
+    return sb.toString();
+  }
+
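+  /**
+   * Inverse of urlSafeBase64ToHexString: converts a hex GUID into URL-safe Base64.
+   */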
+  protected static String hexStringToUrlSafeBase64(String hexString) {
+    byte[] decoded = new byte[hexString.length() / 2];
+
+    for (int i = 0; i < hexString.length(); i += 2) {
+      decoded[i / 2] = (byte) Integer.parseInt(String.format("%c%c", hexString.charAt(i), hexString.charAt(i + 1)), 16);
+    }
+    return Base64.encodeBase64URLSafeString(decoded);
+  }
+}