
YARN-3984. Adjusted the event column key schema and avoided missing empty event. Contributed by Vrushali C.

(cherry picked from commit 895ccfa1ab9e701f2908586e323249f670fe5544)
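
In short: the event column qualifier used to be eventId?infoKey, so an event
whose info map was null or empty produced no cell at all and was silently
lost on read. The patch embeds the inverted event timestamp in the qualifier
(eventId?invertedTimestamp?infoKey) and, for an event with no info, still
writes a cell under eventId?invertedTimestamp? with an empty value. A minimal
sketch of the new layout, assuming "?" stands for the Separator.VALUES
character and using hypothetical names:

    // Sketch only; not part of this commit.
    public class EventQualifierSketch {
      static long invert(long ts) { return Long.MAX_VALUE - ts; }

      public static void main(String[] args) {
        String eventId = "foo_event_id";
        long ts = 1436512802000L;
        // Event with info: one cell per info key.
        System.out.println(eventId + "?" + invert(ts) + "?someInfoKey");
        // Event without info: trailing separator, empty value stored.
        System.out.println(eventId + "?" + invert(ts) + "?");
      }
    }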
Zhijie Shen 10 years ago
parent
commit
2d97f86f98

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -121,6 +121,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
     YARN-3993. Changed to use the AM flag in ContainerContext determine AM
     container in TestPerNodeTimelineCollectorsAuxService. (Sunil G via zjshen)
 
+    YARN-3984. Adjusted the event column key schema and avoided missing empty
+    event. (Vrushali C via zjshen)
+
 Trunk - Unreleased
 
   INCOMPATIBLE CHANGES

+ 18 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java

@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
@@ -200,20 +201,32 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
                   "! Using the current timestamp");
               eventTimestamp = System.currentTimeMillis();
             }
+            byte[] columnQualifierFirst =
+                Bytes.toBytes(Separator.VALUES.encode(eventId));
+            byte[] columnQualifierWithTsBytes =
+                Separator.VALUES.join(columnQualifierFirst,
+                    Bytes.toBytes(TimelineWriterUtils.invert(eventTimestamp)));
             Map<String, Object> eventInfo = event.getInfo();
-            if (eventInfo != null) {
+            if ((eventInfo == null) || (eventInfo.size() == 0)) {
+              // add separator since event key is empty
+              byte[] compoundColumnQualifierBytes =
+                  Separator.VALUES.join(columnQualifierWithTsBytes,
+                      null);
+              String compoundColumnQualifier =
+                  Bytes.toString(compoundColumnQualifierBytes);
+              EntityColumnPrefix.EVENT.store(rowKey, entityTable,
+                  compoundColumnQualifier, null, TimelineWriterUtils.EMPTY_BYTES);
+            } else {
               for (Map.Entry<String, Object> info : eventInfo.entrySet()) {
                // eventId?invertedTimestamp?infoKey
-                byte[] columnQualifierFirst =
-                    Bytes.toBytes(Separator.VALUES.encode(eventId));
                 byte[] compoundColumnQualifierBytes =
-                    Separator.VALUES.join(columnQualifierFirst,
+                    Separator.VALUES.join(columnQualifierWithTsBytes,
                         Bytes.toBytes(info.getKey()));
                 // convert back to string to avoid additional API on store.
                 String compoundColumnQualifier =
                     Bytes.toString(compoundColumnQualifierBytes);
                 EntityColumnPrefix.EVENT.store(rowKey, entityTable,
-                    compoundColumnQualifier, eventTimestamp, info.getValue());
+                    compoundColumnQualifier, null, info.getValue());
               } // for info: eventInfo
             }
           }

+ 13 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java

@@ -124,4 +124,17 @@ public class TimelineWriterUtils {
     return segments;
   }
 
+  /**
+   * Converts a timestamp into its inverse timestamp to be used in (row) keys
+   * where we want to have the most recent timestamp at the top of the table
+   * (scans start at the most recent timestamp first).
+   *
+   * @param key value to be inverted so that the latest version will be first in
+   *          a scan.
+   * @return inverted long
+   */
+  public static long invert(Long key) {
+    return Long.MAX_VALUE - key;
+  }
+
 }
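
The inversion yields newest-first scan order: HBase sorts by unsigned
lexicographic byte order, and for non-negative longs that matches the numeric
order of Bytes.toBytes(long). A minimal sketch of the property the javadoc
describes (illustration only; class name is hypothetical):

    public class InvertOrderingDemo {
      static long invert(long key) { return Long.MAX_VALUE - key; }

      public static void main(String[] args) {
        long older = 1436512801000L;
        long newer = 1436512802000L;
        // Newer timestamps invert to smaller values, so an ascending scan
        // returns the most recent entry first.
        if (invert(newer) < invert(older)) {
          System.out.println("newer sorts before older");
        }
      }
    }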

+ 3 - 15
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
 
 /**
  * Represents a rowkey for the entity table.
@@ -47,7 +48,7 @@ public class EntityRowKey {
             flowId));
     // Note that flowRunId is a long, so we can't encode them all at the same
     // time.
-    byte[] second = Bytes.toBytes(EntityRowKey.invert(flowRunId));
+    byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId));
     byte[] third = Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(appId));
     return Separator.QUALIFIERS.join(first, second, third);
   }
@@ -70,24 +71,11 @@ public class EntityRowKey {
             flowId));
     // Note that flowRunId is a long, so we can't encode them all at the same
     // time.
-    byte[] second = Bytes.toBytes(EntityRowKey.invert(flowRunId));
+    byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId));
     byte[] third =
         Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(appId, te.getType(),
             te.getId()));
     return Separator.QUALIFIERS.join(first, second, third);
   }
 
-  /**
-   * Converts a timestamp into it's inverse timestamp to be used in (row) keys
-   * where we want to have the most recent timestamp in the top of the table
-   * (scans start at the most recent timestamp first).
-   *
-   * @param key value to be inverted so that the latest version will be first in
-   *          a scan.
-   * @return inverted long
-   */
-  public static long invert(Long key) {
-    return Long.MAX_VALUE - key;
-  }
-
 }

+ 91 - 14
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java

@@ -43,8 +43,8 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
-import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnFamily;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
@@ -274,7 +274,7 @@ public class TestHBaseTimelineWriterImpl {
     assertEquals(user, Bytes.toString(rowKeyComponents[0]));
     assertEquals(cluster, Bytes.toString(rowKeyComponents[1]));
     assertEquals(flow, Bytes.toString(rowKeyComponents[2]));
-    assertEquals(EntityRowKey.invert(runid), Bytes.toLong(rowKeyComponents[3]));
+    assertEquals(TimelineWriterUtils.invert(runid),
+        Bytes.toLong(rowKeyComponents[3]));
     assertEquals(appName, Bytes.toString(rowKeyComponents[4]));
     assertEquals(te.getType(), Bytes.toString(rowKeyComponents[5]));
     assertEquals(te.getId(), Bytes.toString(rowKeyComponents[6]));
@@ -317,7 +317,6 @@ public class TestHBaseTimelineWriterImpl {
       byte[] startRow =
           EntityRowKey.getRowKeyPrefix(cluster, user, flow, runid, appName);
       s.setStartRow(startRow);
-      s.setMaxVersions(Integer.MAX_VALUE);
       Connection conn = ConnectionFactory.createConnection(c1);
       ResultScanner scanner = new EntityTable().getResultScanner(c1, conn, s);
 
@@ -331,24 +330,23 @@ public class TestHBaseTimelineWriterImpl {
           assertTrue(isRowKeyCorrect(row1, cluster, user, flow, runid, appName,
               entity));
 
-          // check the events
-          NavigableMap<String, NavigableMap<Long, Object>> eventsResult =
-              EntityColumnPrefix.EVENT.readResultsWithTimestamps(result);
+          Map<String, Object> eventsResult =
+              EntityColumnPrefix.EVENT.readResults(result);
           // there should be only one event
           assertEquals(1, eventsResult.size());
           // key name for the event
-          String valueKey = eventId + Separator.VALUES.getValue() + expKey;
-          for (Map.Entry<String, NavigableMap<Long, Object>> e :
+          byte[] compoundColumnQualifierBytes =
+              Separator.VALUES.join(Bytes.toBytes(eventId),
+                  Bytes.toBytes(TimelineWriterUtils.invert(expTs)),
+                  Bytes.toBytes(expKey));
+          String valueKey = Bytes.toString(compoundColumnQualifierBytes);
+          for (Map.Entry<String, Object> e :
               eventsResult.entrySet()) {
             // the value key must match
             assertEquals(valueKey, e.getKey());
-            NavigableMap<Long, Object> value = e.getValue();
+            Object value = e.getValue();
            // the event info value must match
-            assertEquals(1, value.size());
-            for (Map.Entry<Long, Object> e2: value.entrySet()) {
-              assertEquals(expTs, e2.getKey());
-              assertEquals(expVal, e2.getValue());
-            }
+            assertEquals(expVal, value.toString());
           }
         }
       }
@@ -360,6 +358,85 @@ public class TestHBaseTimelineWriterImpl {
     }
   }
 
+  @Test
+  public void testAdditionalEntityEmptyEventInfo() throws IOException {
+    TimelineEvent event = new TimelineEvent();
+    String eventId = "foo_event_id";
+    event.setId(eventId);
+    Long expTs = 1436512802000L;
+    event.setTimestamp(expTs);
+
+    final TimelineEntity entity = new TimelineEntity();
+    entity.setId("attempt_1329348432655_0001_m_000008_18");
+    entity.setType("FOO_ATTEMPT");
+    entity.addEvent(event);
+
+    TimelineEntities entities = new TimelineEntities();
+    entities.addEntity(entity);
+
+    HBaseTimelineWriterImpl hbi = null;
+    try {
+      Configuration c1 = util.getConfiguration();
+      hbi = new HBaseTimelineWriterImpl(c1);
+      hbi.init(c1);
+      String cluster = "cluster_emptyeventkey";
+      String user = "user_emptyeventkey";
+      String flow = "other_flow_name";
+      String flowVersion = "1111F01C2287BA";
+      long runid = 1009876543218L;
+      String appName = "some app name";
+      byte[] startRow =
+          EntityRowKey.getRowKeyPrefix(cluster, user, flow, runid, appName);
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, entities);
+      hbi.stop();
+      // scan the table and see that entity exists
+      Scan s = new Scan();
+      s.setStartRow(startRow);
+      s.addFamily(EntityColumnFamily.INFO.getBytes());
+      Connection conn = ConnectionFactory.createConnection(c1);
+      ResultScanner scanner = new EntityTable().getResultScanner(c1, conn, s);
+
+      int rowCount = 0;
+      for (Result result : scanner) {
+        if (result != null && !result.isEmpty()) {
+          rowCount++;
+
+          // check the row key
+          byte[] row1 = result.getRow();
+          assertTrue(isRowKeyCorrect(row1, cluster, user, flow, runid, appName,
+              entity));
+
+          Map<String, Object> eventsResult =
+              EntityColumnPrefix.EVENT.readResults(result);
+          // there should be only one event
+          assertEquals(1, eventsResult.size());
+          // key name for the event
+          byte[] compoundColumnQualifierWithTsBytes =
+              Separator.VALUES.join(Bytes.toBytes(eventId),
+                  Bytes.toBytes(TimelineWriterUtils.invert(expTs)));
+          byte[] compoundColumnQualifierBytes =
+              Separator.VALUES.join(compoundColumnQualifierWithTsBytes,
+                  null);
+          String valueKey = Bytes.toString(compoundColumnQualifierBytes);
+          for (Map.Entry<String, Object> e :
+              eventsResult.entrySet()) {
+            // the column qualifier key must match
+            assertEquals(valueKey, e.getKey());
+            Object value = e.getValue();
+            // value should be empty
+            assertEquals("", value.toString());
+          }
+        }
+      }
+      assertEquals(1, rowCount);
+
+    } finally {
+      // guard against NPE if the writer failed to initialize
+      if (hbi != null) {
+        hbi.stop();
+        hbi.close();
+      }
+    }
+  }
+
+
   @AfterClass
   public static void tearDownAfterClass() throws Exception {
     util.shutdownMiniCluster();