Explorar el Código

AMBARI-14081. HiveView fails to find DAGs corresponding to query. (Dipayan Bhowmick via yusaku)

Yusaku Sako hace 9 años
padre
commit
21b37c20f6

+ 12 - 12
contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/Aggregator.java

@@ -82,14 +82,7 @@ public class Aggregator {
     List<Job> allJobs = new LinkedList<Job>();
     for (HiveQueryId atsHiveQuery : ats.getHiveQueryIdsList(username)) {
 
-      TezDagId atsTezDag;
-      if (atsHiveQuery.dagNames != null && atsHiveQuery.dagNames.size() > 0) {
-        String dagName = atsHiveQuery.dagNames.get(0);
-
-        atsTezDag = ats.getTezDAGByName(dagName);
-      } else {
-        atsTezDag = new TezDagId();
-      }
+      TezDagId atsTezDag = getTezDagFromHiveQueryId(atsHiveQuery);
 
       JobImpl atsJob;
       if (hasOperationId(atsHiveQuery)) {
@@ -135,17 +128,24 @@ public class Aggregator {
     String hexGuid = Hex.encodeHexString(operationHandle.getOperationId().getGuid());
     HiveQueryId atsHiveQuery = ats.getHiveQueryIdByOperationId(hexStringToUrlSafeBase64(hexGuid));
 
+    TezDagId atsTezDag = getTezDagFromHiveQueryId(atsHiveQuery);
+
+    saveJobInfoIfNeeded(atsHiveQuery, atsTezDag, viewJob);
+    return mergeAtsJobWithViewJob(atsHiveQuery, atsTezDag, viewJob);
+  }
+
+  private TezDagId getTezDagFromHiveQueryId(HiveQueryId atsHiveQuery) {
     TezDagId atsTezDag;
-    if (atsHiveQuery.dagNames != null && atsHiveQuery.dagNames.size() > 0) {
+    if (atsHiveQuery.version >= HiveQueryId.ATS_15_RESPONSE_VERSION) {
+      atsTezDag = ats.getTezDAGByEntity(atsHiveQuery.entity);
+    } else if (atsHiveQuery.dagNames != null && atsHiveQuery.dagNames.size() > 0) {
       String dagName = atsHiveQuery.dagNames.get(0);
 
       atsTezDag = ats.getTezDAGByName(dagName);
     } else {
       atsTezDag = new TezDagId();
     }
-
-    saveJobInfoIfNeeded(atsHiveQuery, atsTezDag, viewJob);
-    return mergeAtsJobWithViewJob(atsHiveQuery, atsTezDag, viewJob);
+    return atsTezDag;
   }
 
   protected boolean hasOperationId(HiveQueryId atsHiveQuery) {

+ 14 - 0
contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSParser.java

@@ -94,6 +94,16 @@ public class ATSParser implements IATSParser {
   @Override
   public TezDagId getTezDAGByName(String name) {
     JSONArray tezDagEntities = (JSONArray) delegate.tezDagByName(name).get("entities");
+    return parseTezDag(tezDagEntities);
+  }
+
+  @Override
+  public TezDagId getTezDAGByEntity(String entity) {
+    JSONArray tezDagEntities = (JSONArray) delegate.tezDagByEntity(entity).get("entities");
+    return parseTezDag(tezDagEntities);
+  }
+
+  private TezDagId parseTezDag(JSONArray tezDagEntities) {
     assert tezDagEntities.size() <= 1;
     if (tezDagEntities.size() == 0) {
       return new TezDagId();
@@ -151,6 +161,10 @@ public class ATSParser implements IATSParser {
       parsedJob.dagNames = dagIds;
       parsedJob.stages = stagesList;
     }
+
+    if (otherinfo.get("VERSION") != null) {
+      parsedJob.version = (Long) otherinfo.get("VERSION");
+    }
     return parsedJob;
   }
 

+ 2 - 0
contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSRequestsDelegate.java

@@ -38,4 +38,6 @@ public interface ATSRequestsDelegate {
   JSONObject tezDagByName(String name);
 
   JSONObject tezVerticesListForDAG(String dagId);
+
+  JSONObject tezDagByEntity(String entity);
 }

+ 13 - 0
contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSRequestsDelegateImpl.java

@@ -96,12 +96,25 @@ public class ATSRequestsDelegateImpl implements ATSRequestsDelegate {
     return (JSONObject) JSONValue.parse(response);
   }
 
+  @Override
+  public JSONObject tezDagByEntity(String entity) {
+    String tezDagEntityUrl = tezDagEntityUrl(entity);
+    String response = readFromWithDefault(tezDagEntityUrl, EMPTY_ENTITIES_JSON);
+    return (JSONObject) JSONValue.parse(response);
+  }
+
+  private String tezDagEntityUrl(String entity) {
+    return atsUrl + "/ws/v1/timeline/TEZ_DAG_ID?primaryFilter=callerId:" + entity;
+  }
+
   @Override
   public JSONObject tezVerticesListForDAG(String dagId) {
     String response = readFromWithDefault(tezVerticesListForDAGUrl(dagId), "{ \"entities\" : [  ] }");
     return (JSONObject) JSONValue.parse(response);
   }
 
+
+
   protected String readFromWithDefault(String atsUrl, String defaultResponse) {
     String response;
     try {

+ 3 - 0
contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/HiveQueryId.java

@@ -23,6 +23,8 @@ import org.json.simple.JSONObject;
 import java.util.List;
 
 public class HiveQueryId {
+  public static long ATS_15_RESPONSE_VERSION = 2; // version returned from ATS 1.5 release
+
   public String url;
 
   public String entity;
@@ -36,4 +38,5 @@ public class HiveQueryId {
   public long duration;
   public String operationId;
   public String user;
+  public long version;
 }

+ 2 - 0
contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/IATSParser.java

@@ -28,4 +28,6 @@ public interface IATSParser {
   HiveQueryId getHiveQueryIdByOperationId(String guidString);
 
   TezDagId getTezDAGByName(String name);
+
+  TezDagId getTezDAGByEntity(String entity);
 }

La diferencia del archivo ha sido suprimido porque es demasiado grande
+ 38 - 0
contrib/views/hive/src/test/java/org/apache/ambari/view/hive/resources/jobs/ATSParserTest.java


+ 42 - 0
contrib/views/hive/src/test/java/org/apache/ambari/view/hive/resources/jobs/AggregatorTest.java

@@ -139,6 +139,33 @@ public class AggregatorTest {
     Assert.assertEquals("1", job.getId());
   }
 
+  @Test
+  public void testReadJobBothATSAndViewV2() throws Exception {
+    HiveQueryId hiveQueryId = getSampleHiveQueryIdV2("ENTITY-NAME");
+    hiveQueryId.operationId = Aggregator.hexStringToUrlSafeBase64("1b2b");
+    MockATSParser atsParser = getMockATSWithQueries(hiveQueryId);
+
+    MockJobResourceManager jobResourceManager = getJobResourceManagerWithJobs(getSampleViewJob("1"));
+
+    StoredOperationHandle operationHandle = getSampleOperationHandle("5", "1");
+    operationHandle.setGuid("1b2b");
+    MockOperationHandleResourceManager operationHandleResourceManager = getOperationHandleRMWithEntities(operationHandle);
+
+    Aggregator aggregator = new Aggregator(jobResourceManager,
+      operationHandleResourceManager,
+      atsParser);
+
+    List<Job> aggregated = aggregator.readAll("luke");
+
+    Assert.assertEquals(1, aggregated.size());
+    Job job = aggregated.get(0);
+    Assert.assertEquals("1", job.getId());
+    Assert.assertEquals("app_test_1", job.getApplicationId());
+    Assert.assertEquals("ENTITY-NAME", job.getDagId());
+    Assert.assertEquals("SUCCEEDED", job.getStatus());
+  }
+
+
   @Test
   public void testReadJobComplex() throws Exception {
     //job both on ATS and View
@@ -232,6 +259,12 @@ public class AggregatorTest {
     return hiveQueryId;
   }
 
+  private HiveQueryId getSampleHiveQueryIdV2(String id) {
+    HiveQueryId hiveQueryId = getSampleHiveQueryId(id);
+    hiveQueryId.version = HiveQueryId.ATS_15_RESPONSE_VERSION;
+    return hiveQueryId;
+  }
+
   @Test
   public void testGetJobByOperationId() throws Exception {
 
@@ -412,6 +445,15 @@ public class AggregatorTest {
       return new TezDagId();
     }
 
+    @Override
+    public TezDagId getTezDAGByEntity(String entity) {
+      TezDagId dagId = new TezDagId();
+      dagId.applicationId = "app_test_1";
+      dagId.entity = entity;
+      dagId.status = "SUCCEEDED";
+      return dagId;
+    }
+
     public List<HiveQueryId> getHiveQueryIds() {
       return hiveQueryIds;
     }

Algunos archivos no se mostraron porque demasiados archivos cambiaron en este cambio