
YARN-1838. Enhanced timeline service getEntities API to get entities from a given entity ID or insertion timestamp. Contributed by Billie Rinaldi.
svn merge --ignore-ancestry -c 1580960 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1580961 13f79535-47bb-0310-9956-ffa450edef68

Zhijie Shen 11 years ago
parent
commit
5f0f16c0f6
11 changed files with 538 additions and 176 deletions
  1. 3 0
      hadoop-yarn-project/CHANGES.txt
  2. 2 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
  3. 7 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/GenericObjectMapper.java
  4. 94 43
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/LeveldbTimelineStore.java
  5. 40 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/MemoryTimelineStore.java
  6. 17 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TimelineReader.java
  7. 4 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
  8. 62 45
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestLeveldbTimelineStore.java
  9. 11 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestMemoryTimelineStore.java
  10. 255 72
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TimelineStoreTestUtils.java
  11. 43 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestTimelineWebServices.java
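In short, the patch threads two new optional read parameters, fromId and fromTs, from the REST layer (TimelineWebServices) through TimelineReader down to both store implementations. A minimal, hedged sketch of a call against the extended interface (the store variable, entity type, and entity ID are illustrative, borrowed from the test data; the parameter order follows the TimelineReader change shown below):

    // Hedged sketch, not part of this commit: page the timeline store starting
    // at a known entity and ignore anything inserted after a snapshot time.
    TimelineEntities page = store.getEntities(
        "type_1",                     // entityType
        10L,                          // limit
        null, null,                   // windowStart, windowEnd
        "id_2",                       // fromId: start at (and include) this entity
        System.currentTimeMillis(),   // fromTs: skip entities inserted after this
        null, null,                   // primaryFilter, secondaryFilters
        EnumSet.allOf(Field.class));  // fields to retrieve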

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -163,6 +163,9 @@ Release 2.4.0 - UNRELEASED
     use smaps for obtaining used memory information. (Rajesh Balamohan via
     vinodkv)
 
+    YARN-1838. Enhanced timeline service getEntities API to get entities from a
+    given entity ID or insertion timestamp. (Billie Rinaldi via zjshen)
+
   IMPROVEMENTS
 
     YARN-1007. Enhance History Reader interface for Containers. (Mayank Bansal via

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java

@@ -192,7 +192,7 @@ public class TestDistributedShell {
         .getApplicationHistoryServer()
         .getTimelineStore()
         .getEntities(ApplicationMaster.DSEntity.DS_APP_ATTEMPT.toString(),
-            null, null, null, null, null, null);
+            null, null, null, null, null, null, null, null);
     Assert.assertNotNull(entitiesAttempts);
     Assert.assertEquals(1, entitiesAttempts.getEntities().size());
     Assert.assertEquals(2, entitiesAttempts.getEntities().get(0).getEvents()
@@ -203,7 +203,7 @@ public class TestDistributedShell {
         .getApplicationHistoryServer()
         .getTimelineStore()
         .getEntities(ApplicationMaster.DSEntity.DS_CONTAINER.toString(), null,
-            null, null, null, null, null);
+            null, null, null, null, null, null, null);
     Assert.assertNotNull(entities);
     Assert.assertEquals(2, entities.getEntities().size());
     Assert.assertEquals(entities.getEntities().get(0).getEntityType()

+ 7 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/GenericObjectMapper.java

@@ -102,11 +102,15 @@ public class GenericObjectMapper {
    */
   public static byte[] writeReverseOrderedLong(long l) {
     byte[] b = new byte[8];
-    b[0] = (byte)(0x7f ^ ((l >> 56) & 0xff));
-    for (int i = 1; i < 7; i++) {
+    return writeReverseOrderedLong(l, b, 0);
+  }
+
+  public static byte[] writeReverseOrderedLong(long l, byte[] b, int offset) {
+    b[offset] = (byte)(0x7f ^ ((l >> 56) & 0xff));
+    for (int i = offset+1; i < offset+7; i++) {
       b[i] = (byte)(0xff ^ ((l >> 8*(7-i)) & 0xff));
     }
-    b[offset+7] = (byte)(0xff ^ (l & 0xff));
+    b[offset+7] = (byte)(0xff ^ (l & 0xff));
     return b;
   }
 
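The refactored writeReverseOrderedLong can now also write into a caller-supplied buffer at an offset, which the LevelDB store below uses to pack two longs into one value. The encoding makes larger longs compare lexicographically smaller, so keys prefixed with a reverse-ordered timestamp iterate newest-first. A small standalone sketch of that property, not part of the patch:

    // Hedged illustration: a larger timestamp yields lexicographically smaller
    // bytes, so a LevelDB iterator over such keys visits newer entities first.
    byte[] newer = GenericObjectMapper.writeReverseOrderedLong(200L);
    byte[] older = GenericObjectMapper.writeReverseOrderedLong(100L);
    // Unsigned lexicographic comparison, as LevelDB's default comparator does.
    int cmp = 0;
    for (int i = 0; i < 8 && cmp == 0; i++) {
      cmp = Integer.compare(newer[i] & 0xff, older[i] & 0xff);
    }
    // cmp < 0 here: the encoding of 200 sorts before the encoding of 100.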

+ 94 - 43
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/LeveldbTimelineStore.java

@@ -135,7 +135,7 @@ public class LeveldbTimelineStore extends AbstractService
 
   private static final byte[] EMPTY_BYTES = new byte[0];
 
-  private Map<EntityIdentifier, Long> startTimeWriteCache;
+  private Map<EntityIdentifier, StartAndInsertTime> startTimeWriteCache;
   private Map<EntityIdentifier, Long> startTimeReadCache;
 
   /**
@@ -205,6 +205,16 @@ public class LeveldbTimelineStore extends AbstractService
     super.serviceStop();
   }
 
+  private static class StartAndInsertTime {
+    final long startTime;
+    final long insertTime;
+
+    public StartAndInsertTime(long startTime, long insertTime) {
+      this.startTime = startTime;
+      this.insertTime = insertTime;
+    }
+  }
+
   private class EntityDeletionThread extends Thread {
     private final long ttl;
     private final long ttlInterval;
@@ -585,14 +595,14 @@ public class LeveldbTimelineStore extends AbstractService
 
   @Override
   public TimelineEntities getEntities(String entityType,
-      Long limit, Long windowStart, Long windowEnd,
+      Long limit, Long windowStart, Long windowEnd, String fromId, Long fromTs,
       NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
       EnumSet<Field> fields) throws IOException {
     if (primaryFilter == null) {
       // if no primary filter is specified, prefix the lookup with
       // ENTITY_ENTRY_PREFIX
       return getEntityByTime(ENTITY_ENTRY_PREFIX, entityType, limit,
-          windowStart, windowEnd, secondaryFilters, fields);
+          windowStart, windowEnd, fromId, fromTs, secondaryFilters, fields);
     } else {
       // if a primary filter is specified, prefix the lookup with
       // INDEXED_ENTRY_PREFIX + primaryFilterName + primaryFilterValue +
@@ -602,7 +612,7 @@ public class LeveldbTimelineStore extends AbstractService
           .add(GenericObjectMapper.write(primaryFilter.getValue()), true)
           .add(ENTITY_ENTRY_PREFIX).getBytesForLookup();
       return getEntityByTime(base, entityType, limit, windowStart, windowEnd,
-          secondaryFilters, fields);
+          fromId, fromTs, secondaryFilters, fields);
     }
   }
 
@@ -614,6 +624,8 @@ public class LeveldbTimelineStore extends AbstractService
   * @param limit A limit on the number of entities to return
   * @param starttime The earliest entity start time to retrieve (exclusive)
   * @param endtime The latest entity start time to retrieve (inclusive)
+   * @param fromId Retrieve entities starting with this entity
+   * @param fromTs Ignore entities with insert timestamp later than this ts
   * @param secondaryFilters Filter pairs that the entities should match
   * @param fields The set of fields to retrieve
   * @return A list of entities
@@ -621,8 +633,8 @@ public class LeveldbTimelineStore extends AbstractService
   */
  private TimelineEntities getEntityByTime(byte[] base,
      String entityType, Long limit, Long starttime, Long endtime,
-      Collection<NameValuePair> secondaryFilters, EnumSet<Field> fields)
-      throws IOException {
+      String fromId, Long fromTs, Collection<NameValuePair> secondaryFilters,
+      EnumSet<Field> fields) throws IOException {
     DBIterator iterator = null;
     try {
       KeyBuilder kb = KeyBuilder.newInstance().add(base).add(entityType);
@@ -632,10 +644,25 @@ public class LeveldbTimelineStore extends AbstractService
         // if end time is null, place no restriction on end time
         endtime = Long.MAX_VALUE;
       }
-      // using end time, construct a first key that will be seeked to
-      byte[] revts = writeReverseOrderedLong(endtime);
-      kb.add(revts);
-      byte[] first = kb.getBytesForLookup();
+      // construct a first key that will be seeked to using end time or fromId
+      byte[] first = null;
+      if (fromId != null) {
+        Long fromIdStartTime = getStartTimeLong(fromId, entityType);
+        if (fromIdStartTime == null) {
+          // no start time for provided id, so return empty entities
+          return new TimelineEntities();
+        }
+        if (fromIdStartTime <= endtime) {
+          // if provided id's start time falls before the end of the window,
+          // use it to construct the seek key
+          first = kb.add(writeReverseOrderedLong(fromIdStartTime))
+              .add(fromId).getBytesForLookup();
+        }
+      }
+      // if seek key wasn't constructed using fromId, construct it using end ts
+      if (first == null) {
+        first = kb.add(writeReverseOrderedLong(endtime)).getBytesForLookup();
+      }
       byte[] last = null;
       if (starttime != null) {
         // if start time is not null, set a last key that will not be
@@ -665,6 +692,21 @@ public class LeveldbTimelineStore extends AbstractService
         KeyParser kp = new KeyParser(key, prefix.length);
         Long startTime = kp.getNextLong();
         String entityId = kp.getNextString();
+
+        if (fromTs != null) {
+          long insertTime = readReverseOrderedLong(iterator.peekNext()
+              .getValue(), 0);
+          if (insertTime > fromTs) {
+            byte[] firstKey = key;
+            while (iterator.hasNext() && prefixMatches(firstKey,
+                kp.getOffset(), key)) {
+              iterator.next();
+              key = iterator.peekNext().getKey();
+            }
+            continue;
+          }
+        }
+
         // parse the entity that owns this key, iterating over all keys for
         // the entity
         TimelineEntity entity = getEntity(entityId, entityType, startTime,
@@ -715,9 +757,10 @@ public class LeveldbTimelineStore extends AbstractService
       writeBatch = db.createWriteBatch();
       List<TimelineEvent> events = entity.getEvents();
       // look up the start time for the entity
-      revStartTime = getAndSetStartTime(entity.getEntityId(),
-          entity.getEntityType(), entity.getStartTime(), events);
-      if (revStartTime == null) {
+      StartAndInsertTime startAndInsertTime = getAndSetStartTime(
+          entity.getEntityId(), entity.getEntityType(),
+          entity.getStartTime(), events);
+      if (startAndInsertTime == null) {
         // if no start time is found, add an error and return
         TimelinePutError error = new TimelinePutError();
         error.setEntityId(entity.getEntityId());
@@ -726,11 +769,19 @@ public class LeveldbTimelineStore extends AbstractService
         response.addError(error);
         return;
       }
+      revStartTime = writeReverseOrderedLong(startAndInsertTime
+          .startTime);
+
       Map<String, Set<Object>> primaryFilters = entity.getPrimaryFilters();
 
       // write entity marker
-      writeBatch.put(createEntityMarkerKey(entity.getEntityId(),
-          entity.getEntityType(), revStartTime), EMPTY_BYTES);
+      byte[] markerKey = createEntityMarkerKey(entity.getEntityId(),
+          entity.getEntityType(), revStartTime);
+      byte[] markerValue = writeReverseOrderedLong(startAndInsertTime
+          .insertTime);
+      writeBatch.put(markerKey, markerValue);
+      writePrimaryFilterEntries(writeBatch, primaryFilters, markerKey,
+          markerValue);
 
       // write event entries
       if (events != null && !events.isEmpty()) {
@@ -821,17 +872,21 @@ public class LeveldbTimelineStore extends AbstractService
       lock = writeLocks.getLock(relatedEntity);
       lock.lock();
       try {
-        byte[] relatedEntityStartTime = getAndSetStartTime(
-            relatedEntity.getId(), relatedEntity.getType(),
+        StartAndInsertTime relatedEntityStartAndInsertTime =
+            getAndSetStartTime(relatedEntity.getId(), relatedEntity.getType(),
             readReverseOrderedLong(revStartTime, 0), null);
-        if (relatedEntityStartTime == null) {
+        if (relatedEntityStartAndInsertTime == null) {
           throw new IOException("Error setting start time for related entity");
         }
+        byte[] relatedEntityStartTime = writeReverseOrderedLong(
+            relatedEntityStartAndInsertTime.startTime);
         db.put(createRelatedEntityKey(relatedEntity.getId(),
             relatedEntity.getType(), relatedEntityStartTime,
             entity.getEntityId(), entity.getEntityType()), EMPTY_BYTES);
         db.put(createEntityMarkerKey(relatedEntity.getId(),
-            relatedEntity.getType(), relatedEntityStartTime), EMPTY_BYTES);
+            relatedEntity.getType(), relatedEntityStartTime),
+            writeReverseOrderedLong(relatedEntityStartAndInsertTime
+                .insertTime));
       } catch (IOException e) {
         LOG.error("Error putting related entity " + relatedEntity.getId() +
             " of type " + relatedEntity.getType() + " for entity " +
@@ -937,19 +992,18 @@ public class LeveldbTimelineStore extends AbstractService
   * @param entityType The type of the entity
   * @param startTime The start time of the entity, or null
   * @param events A list of events for the entity, or null
-   * @return A byte array
+   * @return A StartAndInsertTime
   * @throws IOException
   */
-  private byte[] getAndSetStartTime(String entityId, String entityType,
-      Long startTime, List<TimelineEvent> events)
+  private StartAndInsertTime getAndSetStartTime(String entityId,
+      String entityType, Long startTime, List<TimelineEvent> events)
       throws IOException {
     EntityIdentifier entity = new EntityIdentifier(entityId, entityType);
     if (startTime == null) {
       // start time is not provided, so try to look it up
       if (startTimeWriteCache.containsKey(entity)) {
         // found the start time in the cache
-        startTime = startTimeWriteCache.get(entity);
-        return writeReverseOrderedLong(startTime);
+        return startTimeWriteCache.get(entity);
       } else {
         if (events != null) {
           // prepare a start time from events in case it is needed
@@ -966,14 +1020,8 @@ public class LeveldbTimelineStore extends AbstractService
     } else {
       // start time is provided
       if (startTimeWriteCache.containsKey(entity)) {
-        // check the provided start time matches the cache
-        if (!startTime.equals(startTimeWriteCache.get(entity))) {
-          // the start time is already in the cache,
-          // and it is different from the provided start time,
-          // so use the one from the cache
-          startTime = startTimeWriteCache.get(entity);
-        }
-        return writeReverseOrderedLong(startTime);
+        // always use start time from cache if it exists
+        return startTimeWriteCache.get(entity);
       } else {
         // check the provided start time matches the db
         return checkStartTimeInDb(entity, startTime);
@@ -988,31 +1036,36 @@ public class LeveldbTimelineStore extends AbstractService
   * so it adds it back into the cache if it is found. Should only be called
   * when a lock has been obtained on the entity.
   */
-  private byte[] checkStartTimeInDb(EntityIdentifier entity,
+  private StartAndInsertTime checkStartTimeInDb(EntityIdentifier entity,
       Long suggestedStartTime) throws IOException {
+    StartAndInsertTime startAndInsertTime = null;
     // create lookup key for start time
     byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
     // retrieve value for key
     byte[] v = db.get(b);
-    byte[] revStartTime;
     if (v == null) {
       // start time doesn't exist in db
       if (suggestedStartTime == null) {
         return null;
       }
+      startAndInsertTime = new StartAndInsertTime(suggestedStartTime,
+          System.currentTimeMillis());
+
       // write suggested start time
-      revStartTime = writeReverseOrderedLong(suggestedStartTime);
+      v = new byte[16];
+      writeReverseOrderedLong(suggestedStartTime, v, 0);
+      writeReverseOrderedLong(startAndInsertTime.insertTime, v, 8);
       WriteOptions writeOptions = new WriteOptions();
       writeOptions.sync(true);
-      db.put(b, revStartTime, writeOptions);
+      db.put(b, v, writeOptions);
     } else {
       // found start time in db, so ignore suggested start time
-      suggestedStartTime = readReverseOrderedLong(v, 0);
-      revStartTime = v;
+      startAndInsertTime = new StartAndInsertTime(readReverseOrderedLong(v, 0),
+          readReverseOrderedLong(v, 8));
     }
-    startTimeWriteCache.put(entity, suggestedStartTime);
-    startTimeReadCache.put(entity, suggestedStartTime);
-    return revStartTime;
+    startTimeWriteCache.put(entity, startAndInsertTime);
+    startTimeReadCache.put(entity, startAndInsertTime.startTime);
+    return startAndInsertTime;
   }
 
   /**
@@ -1245,8 +1298,6 @@ public class LeveldbTimelineStore extends AbstractService
     }
   }
 
-  // warning is suppressed to prevent eclipse from noting unclosed resource
-  @SuppressWarnings("resource")
   @VisibleForTesting
   boolean deleteNextEntity(String entityType, byte[] reverseTimestamp,
       DBIterator iterator, DBIterator pfIterator, boolean seeked)
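With this change the start-time lookup record stores two reverse-ordered longs back to back: the entity's start time at offset 0 and its insert time at offset 8, and the same insert time becomes the entity marker's value so that getEntityByTime can skip entities inserted after fromTs. A hedged round-trip sketch of that 16-byte layout (standalone, not part of the patch):

    // Hedged sketch of the 16-byte start-time value introduced above.
    long startTime = 123L;
    long insertTime = System.currentTimeMillis();
    byte[] v = new byte[16];
    GenericObjectMapper.writeReverseOrderedLong(startTime, v, 0);
    GenericObjectMapper.writeReverseOrderedLong(insertTime, v, 8);
    // Reading it back mirrors what checkStartTimeInDb() does:
    long readStart = GenericObjectMapper.readReverseOrderedLong(v, 0);
    long readInsert = GenericObjectMapper.readReverseOrderedLong(v, 8);
    // readStart == startTime and readInsert == insertTime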

+ 40 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/MemoryTimelineStore.java

@@ -24,12 +24,14 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
@@ -56,6 +58,8 @@ public class MemoryTimelineStore
 
   private Map<EntityIdentifier, TimelineEntity> entities =
       new HashMap<EntityIdentifier, TimelineEntity>();
+  private Map<EntityIdentifier, Long> entityInsertTimes =
+      new HashMap<EntityIdentifier, Long>();
 
   public MemoryTimelineStore() {
     super(MemoryTimelineStore.class.getName());
@@ -63,8 +67,9 @@ public class MemoryTimelineStore
 
   @Override
   public TimelineEntities getEntities(String entityType, Long limit,
-      Long windowStart, Long windowEnd, NameValuePair primaryFilter,
-      Collection<NameValuePair> secondaryFilters, EnumSet<Field> fields) {
+      Long windowStart, Long windowEnd, String fromId, Long fromTs,
+      NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
+      EnumSet<Field> fields) {
     if (limit == null) {
       limit = DEFAULT_LIMIT;
     }
@@ -77,8 +82,26 @@ public class MemoryTimelineStore
     if (fields == null) {
       fields = EnumSet.allOf(Field.class);
     }
+
+    Iterator<TimelineEntity> entityIterator = null;
+    if (fromId != null) {
+      TimelineEntity firstEntity = entities.get(new EntityIdentifier(fromId,
+          entityType));
+      if (firstEntity == null) {
+        return new TimelineEntities();
+      } else {
+        entityIterator = new TreeSet<TimelineEntity>(entities.values())
+            .tailSet(firstEntity, true).iterator();
+      }
+    }
+    if (entityIterator == null) {
+      entityIterator = new PriorityQueue<TimelineEntity>(entities.values())
+          .iterator();
+    }
+
     List<TimelineEntity> entitiesSelected = new ArrayList<TimelineEntity>();
-    for (TimelineEntity entity : new PriorityQueue<TimelineEntity>(entities.values())) {
+    while (entityIterator.hasNext()) {
+      TimelineEntity entity = entityIterator.next();
       if (entitiesSelected.size() >= limit) {
         break;
       }
@@ -91,6 +114,10 @@ public class MemoryTimelineStore
       if (entity.getStartTime() > windowEnd) {
         continue;
       }
+      if (fromTs != null && entityInsertTimes.get(new EntityIdentifier(
+          entity.getEntityId(), entity.getEntityType())) > fromTs) {
+        continue;
+      }
       if (primaryFilter != null &&
           !matchPrimaryFilter(entity.getPrimaryFilters(), primaryFilter)) {
         continue;
@@ -196,6 +223,7 @@ public class MemoryTimelineStore
         existingEntity.setEntityType(entity.getEntityType());
         existingEntity.setStartTime(entity.getStartTime());
         entities.put(entityId, existingEntity);
+        entityInsertTimes.put(entityId, System.currentTimeMillis());
       }
       if (entity.getEvents() != null) {
         if (existingEntity.getEvents() == null) {
@@ -215,9 +243,16 @@ public class MemoryTimelineStore
           error.setErrorCode(TimelinePutError.NO_START_TIME);
           response.addError(error);
           entities.remove(entityId);
+          entityInsertTimes.remove(entityId);
           continue;
         } else {
-          existingEntity.setStartTime(entity.getEvents().get(0).getTimestamp());
+          Long min = Long.MAX_VALUE;
+          for (TimelineEvent e : entity.getEvents()) {
+            if (min > e.getTimestamp()) {
+              min = e.getTimestamp();
+            }
+          }
+          existingEntity.setStartTime(min);
         }
       }
       if (entity.getPrimaryFilters() != null) {
@@ -264,6 +299,7 @@ public class MemoryTimelineStore
             relatedEntity.addRelatedEntity(existingEntity.getEntityType(),
                 existingEntity.getEntityId());
             entities.put(relatedEntityId, relatedEntity);
+            entityInsertTimes.put(relatedEntityId, System.currentTimeMillis());
           }
         }
       }

+ 17 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TimelineReader.java

@@ -55,8 +55,11 @@ public interface TimelineReader {
   final long DEFAULT_LIMIT = 100;
 
   /**
-   * This method retrieves a list of entity information, {@link TimelineEntity}, sorted
-   * by the starting timestamp for the entity, descending.
+   * This method retrieves a list of entity information, {@link TimelineEntity},
+   * sorted by the starting timestamp for the entity, descending. The starting
+   * timestamp of an entity is a timestamp specified by the client. If it is not
+   * explicitly specified, it will be chosen by the store to be the earliest
+   * timestamp of the events received in the first put for the entity.
    * 
    * @param entityType
    *          The type of entities to return (required).
@@ -69,6 +72,17 @@ public interface TimelineReader {
    * @param windowEnd
    *          The latest start timestamp to retrieve (inclusive). If null,
    *          defaults to {@link Long#MAX_VALUE}
+   * @param fromId
+   *          If fromId is not null, retrieve entities earlier than and
+   *          including the specified ID. If no start time is found for the
+   *          specified ID, an empty list of entities will be returned. The
+   *          windowEnd parameter will take precedence if the start time of this
+   *          entity falls later than windowEnd.
+   * @param fromTs
+   *          If fromTs is not null, ignore entities that were inserted into the
+   *          store after the given timestamp. The entity's insert timestamp
+   *          used for this comparison is the store's system time when the first
+   *          put for the entity was received (not the entity's start time).
    * @param primaryFilter
    *          Retrieves only entities that have the specified primary filter. If
    *          null, retrieves all entities. This is an indexed retrieval, and no
    * @throws IOException
    * @throws IOException
    */
   TimelineEntities getEntities(String entityType,
-      Long limit, Long windowStart, Long windowEnd,
+      Long limit, Long windowStart, Long windowEnd, String fromId, Long fromTs,
       NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
       EnumSet<Field> fieldsToRetrieve) throws IOException;
 

+ 4 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java

@@ -134,6 +134,8 @@ public class TimelineWebServices {
       @QueryParam("secondaryFilter") String secondaryFilter,
       @QueryParam("secondaryFilter") String secondaryFilter,
       @QueryParam("windowStart") String windowStart,
       @QueryParam("windowStart") String windowStart,
       @QueryParam("windowEnd") String windowEnd,
       @QueryParam("windowEnd") String windowEnd,
+      @QueryParam("fromId") String fromId,
+      @QueryParam("fromTs") String fromTs,
       @QueryParam("limit") String limit,
       @QueryParam("limit") String limit,
       @QueryParam("fields") String fields) {
       @QueryParam("fields") String fields) {
     init(res);
     init(res);
@@ -144,6 +146,8 @@ public class TimelineWebServices {
           parseLongStr(limit),
           parseLongStr(windowStart),
           parseLongStr(windowEnd),
+          parseStr(fromId),
+          parseLongStr(fromTs),
           parsePairStr(primaryFilter, ":"),
           parsePairsStr(secondaryFilter, ",", ":"),
           parseFieldsStr(fields, ","));

+ 62 - 45
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestLeveldbTimelineStore.java

@@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline;
 import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
-import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -35,7 +34,6 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.TimelineReader.Field;
 import org.iq80.leveldb.DBIterator;
 import org.junit.After;
 import org.junit.Before;
@@ -46,8 +44,7 @@ import static org.junit.Assert.assertEquals;
 
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-public class TestLeveldbTimelineStore
-    extends TimelineStoreTestUtils {
+public class TestLeveldbTimelineStore extends TimelineStoreTestUtils {
   private FileContext fsContext;
   private File fsPath;
 
@@ -87,6 +84,16 @@ public class TestLeveldbTimelineStore
     super.testGetEntities();
   }
 
+  @Test
+  public void testGetEntitiesWithFromId() throws IOException {
+    super.testGetEntitiesWithFromId();
+  }
+
+  @Test
+  public void testGetEntitiesWithFromTs() throws IOException {
+    super.testGetEntitiesWithFromTs();
+  }
+
   @Test
   public void testGetEntitiesWithPrimaryFilters() throws IOException {
     super.testGetEntitiesWithPrimaryFilters();
@@ -135,55 +142,45 @@ public class TestLeveldbTimelineStore
   @Test
   public void testGetEntityTypes() throws IOException {
     List<String> entityTypes = ((LeveldbTimelineStore)store).getEntityTypes();
-    assertEquals(2, entityTypes.size());
+    assertEquals(4, entityTypes.size());
     assertEquals(entityType1, entityTypes.get(0));
     assertEquals(entityType2, entityTypes.get(1));
+    assertEquals(entityType4, entityTypes.get(2));
+    assertEquals(entityType5, entityTypes.get(3));
   }
 
   @Test
   public void testDeleteEntities() throws IOException, InterruptedException {
-    assertEquals(2, store.getEntities("type_1", null, null, null, null, null,
-        null).getEntities().size());
-    assertEquals(1, store.getEntities("type_2", null, null, null, null, null,
-        null).getEntities().size());
+    assertEquals(2, getEntities("type_1").size());
+    assertEquals(1, getEntities("type_2").size());
 
     assertEquals(false, deleteNextEntity(entityType1,
         writeReverseOrderedLong(122l)));
-    assertEquals(2, store.getEntities("type_1", null, null, null, null, null,
-        null).getEntities().size());
-    assertEquals(1, store.getEntities("type_2", null, null, null, null, null,
-        null).getEntities().size());
+    assertEquals(2, getEntities("type_1").size());
+    assertEquals(1, getEntities("type_2").size());
 
     assertEquals(true, deleteNextEntity(entityType1,
         writeReverseOrderedLong(123l)));
-    List<TimelineEntity> entities =
-        store.getEntities("type_2", null, null, null, null, null,
-            EnumSet.allOf(Field.class)).getEntities();
+    List<TimelineEntity> entities = getEntities("type_2");
     assertEquals(1, entities.size());
     verifyEntityInfo(entityId2, entityType2, events2, Collections.singletonMap(
         entityType1, Collections.singleton(entityId1b)), EMPTY_PRIMARY_FILTERS,
         EMPTY_MAP, entities.get(0));
-    entities = store.getEntities("type_1", null, null, null, userFilter, null,
-        EnumSet.allOf(Field.class)).getEntities();
+    entities = getEntitiesWithPrimaryFilter("type_1", userFilter);
     assertEquals(1, entities.size());
     verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES,
         primaryFilters, otherInfo, entities.get(0));
 
     ((LeveldbTimelineStore)store).discardOldEntities(-123l);
-    assertEquals(1, store.getEntities("type_1", null, null, null, null, null,
-        null).getEntities().size());
-    assertEquals(0, store.getEntities("type_2", null, null, null, null, null,
-        null).getEntities().size());
-    assertEquals(1, ((LeveldbTimelineStore)store).getEntityTypes().size());
+    assertEquals(1, getEntities("type_1").size());
+    assertEquals(0, getEntities("type_2").size());
+    assertEquals(3, ((LeveldbTimelineStore)store).getEntityTypes().size());
 
     ((LeveldbTimelineStore)store).discardOldEntities(123l);
-    assertEquals(0, store.getEntities("type_1", null, null, null, null, null,
-        null).getEntities().size());
-    assertEquals(0, store.getEntities("type_2", null, null, null, null, null,
-        null).getEntities().size());
+    assertEquals(0, getEntities("type_1").size());
+    assertEquals(0, getEntities("type_2").size());
     assertEquals(0, ((LeveldbTimelineStore)store).getEntityTypes().size());
-    assertEquals(0, store.getEntities("type_1", null, null, null, userFilter,
-        null, null).getEntities().size());
+    assertEquals(0, getEntitiesWithPrimaryFilter("type_1", userFilter).size());
   }
 
   @Test
@@ -200,14 +197,13 @@ public class TestLeveldbTimelineStore
     assertEquals(0, response.getErrors().size());
 
     NameValuePair pfPair = new NameValuePair("user", "otheruser");
-    List<TimelineEntity> entities = store.getEntities("type_1", null, null,
-        null, pfPair, null, null).getEntities();
+    List<TimelineEntity> entities = getEntitiesWithPrimaryFilter("type_1",
+        pfPair);
     assertEquals(1, entities.size());
     verifyEntityInfo(entityId1b, entityType1, Collections.singletonList(ev2),
         EMPTY_REL_ENTITIES, primaryFilter, EMPTY_MAP, entities.get(0));
 
-    entities = store.getEntities("type_1", null, null, null,
-        userFilter, null, null).getEntities();
+    entities = getEntitiesWithPrimaryFilter("type_1", userFilter);
     assertEquals(2, entities.size());
     verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES,
         primaryFilters, otherInfo, entities.get(0));
@@ -215,22 +211,43 @@ public class TestLeveldbTimelineStore
         primaryFilters, otherInfo, entities.get(1));
 
     ((LeveldbTimelineStore)store).discardOldEntities(-123l);
-    assertEquals(1, store.getEntities("type_1", null, null, null, pfPair, null,
-        null).getEntities().size());
-    assertEquals(2, store.getEntities("type_1", null, null, null, userFilter,
-        null, null).getEntities().size());
+    assertEquals(1, getEntitiesWithPrimaryFilter("type_1", pfPair).size());
+    assertEquals(2, getEntitiesWithPrimaryFilter("type_1", userFilter).size());
 
     ((LeveldbTimelineStore)store).discardOldEntities(123l);
-    assertEquals(0, store.getEntities("type_1", null, null, null, null, null,
-        null).getEntities().size());
-    assertEquals(0, store.getEntities("type_2", null, null, null, null, null,
-        null).getEntities().size());
+    assertEquals(0, getEntities("type_1").size());
+    assertEquals(0, getEntities("type_2").size());
     assertEquals(0, ((LeveldbTimelineStore)store).getEntityTypes().size());
 
-    assertEquals(0, store.getEntities("type_1", null, null, null, pfPair, null,
-        null).getEntities().size());
-    assertEquals(0, store.getEntities("type_1", null, null, null, userFilter,
-        null, null).getEntities().size());
+    assertEquals(0, getEntitiesWithPrimaryFilter("type_1", pfPair).size());
+    assertEquals(0, getEntitiesWithPrimaryFilter("type_1", userFilter).size());
+  }
+
+  @Test
+  public void testFromTsWithDeletion()
+      throws IOException, InterruptedException {
+    long l = System.currentTimeMillis();
+    assertEquals(2, getEntitiesFromTs("type_1", l).size());
+    assertEquals(1, getEntitiesFromTs("type_2", l).size());
+    assertEquals(2, getEntitiesFromTsWithPrimaryFilter("type_1", userFilter,
+        l).size());
+    ((LeveldbTimelineStore)store).discardOldEntities(123l);
+    assertEquals(0, getEntitiesFromTs("type_1", l).size());
+    assertEquals(0, getEntitiesFromTs("type_2", l).size());
+    assertEquals(0, getEntitiesFromTsWithPrimaryFilter("type_1", userFilter,
+        l).size());
+    assertEquals(0, getEntities("type_1").size());
+    assertEquals(0, getEntities("type_2").size());
+    assertEquals(0, getEntitiesFromTsWithPrimaryFilter("type_1", userFilter,
+        l).size());
+    loadTestData();
+    assertEquals(0, getEntitiesFromTs("type_1", l).size());
+    assertEquals(0, getEntitiesFromTs("type_2", l).size());
+    assertEquals(0, getEntitiesFromTsWithPrimaryFilter("type_1", userFilter,
+        l).size());
+    assertEquals(2, getEntities("type_1").size());
+    assertEquals(1, getEntities("type_2").size());
+    assertEquals(2, getEntitiesWithPrimaryFilter("type_1", userFilter).size());
   }
 
 }

+ 11 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestMemoryTimelineStore.java

@@ -19,16 +19,13 @@
 package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline;
 
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.TimelineStore;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.MemoryTimelineStore;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
 
-public class TestMemoryTimelineStore
-    extends TimelineStoreTestUtils {
+public class TestMemoryTimelineStore extends TimelineStoreTestUtils {
 
   @Before
   public void setup() throws Exception {
@@ -58,6 +55,16 @@ public class TestMemoryTimelineStore
     super.testGetEntities();
   }
 
+  @Test
+  public void testGetEntitiesWithFromId() throws IOException {
+    super.testGetEntitiesWithFromId();
+  }
+
+  @Test
+  public void testGetEntitiesWithFromTs() throws IOException {
+    super.testGetEntitiesWithFromTs();
+  }
+
   @Test
   public void testGetEntitiesWithPrimaryFilters() throws IOException {
     super.testGetEntitiesWithPrimaryFilters();

+ 255 - 72
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TimelineStoreTestUtils.java

@@ -41,12 +41,12 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity;
 import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.TimelineStore;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.NameValuePair;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.TimelineReader.Field;
 
 public class TimelineStoreTestUtils {
 
+  protected static final List<TimelineEvent> EMPTY_EVENTS =
+      Collections.emptyList();
   protected static final Map<String, Object> EMPTY_MAP =
       Collections.emptyMap();
   protected static final Map<String, Set<Object>> EMPTY_PRIMARY_FILTERS =
@@ -60,11 +60,16 @@ public class TimelineStoreTestUtils {
   protected String entityId1b;
   protected String entityId2;
   protected String entityType2;
+  protected String entityId4;
+  protected String entityType4;
+  protected String entityId5;
+  protected String entityType5;
   protected Map<String, Set<Object>> primaryFilters;
   protected Map<String, Object> secondaryFilters;
   protected Map<String, Object> allFilters;
   protected Map<String, Object> otherInfo;
   protected Map<String, Set<String>> relEntityMap;
+  protected Map<String, Set<String>> relEntityMap2;
   protected NameValuePair userFilter;
   protected NameValuePair numericFilter1;
   protected NameValuePair numericFilter2;
@@ -78,11 +83,13 @@ public class TimelineStoreTestUtils {
   protected Map<String, Object> eventInfo;
   protected List<TimelineEvent> events1;
   protected List<TimelineEvent> events2;
+  protected long beforeTs;
 
   /**
    * Load test data into the given store
    */
   protected void loadTestData() throws IOException {
+    beforeTs = System.currentTimeMillis()-1;
     TimelineEntities entities = new TimelineEntities();
     Map<String, Set<Object>> primaryFilters =
         new HashMap<String, Set<Object>>();
@@ -110,6 +117,10 @@ public class TimelineStoreTestUtils {
     String entityId1b = "id_2";
     String entityId1b = "id_2";
     String entityId2 = "id_2";
     String entityId2 = "id_2";
     String entityType2 = "type_2";
     String entityType2 = "type_2";
+    String entityId4 = "id_4";
+    String entityType4 = "type_4";
+    String entityId5 = "id_5";
+    String entityType5 = "type_5";
 
 
     Map<String, Set<String>> relatedEntities =
     Map<String, Set<String>> relatedEntities =
         new HashMap<String, Set<String>>();
         new HashMap<String, Set<String>>();
@@ -161,6 +172,13 @@ public class TimelineStoreTestUtils {
     assertEquals("badentityid", error.getEntityId());
     assertEquals("badentityid", error.getEntityId());
     assertEquals("badentity", error.getEntityType());
     assertEquals("badentity", error.getEntityType());
     assertEquals(TimelinePutError.NO_START_TIME, error.getErrorCode());
     assertEquals(TimelinePutError.NO_START_TIME, error.getErrorCode());
+
+    relatedEntities.clear();
+    relatedEntities.put(entityType5, Collections.singleton(entityId5));
+    entities.setEntities(Collections.singletonList(createEntity(entityId4,
+        entityType4, 42l, null, relatedEntities, null, null)));
+    response = store.put(entities);
+    assertEquals(0, response.getErrors().size());
   }
   }
 
 
   /**
   /**
@@ -211,6 +229,10 @@ public class TimelineStoreTestUtils {
     entityId1b = "id_2";
     entityId1b = "id_2";
     entityId2 = "id_2";
     entityId2 = "id_2";
     entityType2 = "type_2";
     entityType2 = "type_2";
+    entityId4 = "id_4";
+    entityType4 = "type_4";
+    entityId5 = "id_5";
+    entityType5 = "type_5";
 
 
     ev1 = createEvent(123l, "start_event", null);
     ev1 = createEvent(123l, "start_event", null);
 
 
@@ -228,6 +250,10 @@ public class TimelineStoreTestUtils {
     ids.add(entityId1b);
     relEntityMap.put(entityType1, ids);
 
+    relEntityMap2 =
+        new HashMap<String, Set<String>>();
+    relEntityMap2.put(entityType4, Collections.singleton(entityId4));
+
     ev3 = createEvent(789l, "launch_event", null);
     ev4 = createEvent(-123l, "init_event", null);
     events2 = new ArrayList<TimelineEvent>();
@@ -241,16 +267,24 @@ public class TimelineStoreTestUtils {
         store.getEntity("id_1", "type_2", EnumSet.allOf(Field.class)));
         store.getEntity("id_1", "type_2", EnumSet.allOf(Field.class)));
 
 
     verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES,
     verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES,
-        primaryFilters, otherInfo, store.getEntity(entityId1, entityType1,
-        EnumSet.allOf(Field.class)));
+        primaryFilters, otherInfo, 123l, store.getEntity(entityId1,
+        entityType1, EnumSet.allOf(Field.class)));
 
 
     verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES,
     verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES,
-        primaryFilters, otherInfo, store.getEntity(entityId1b, entityType1,
-        EnumSet.allOf(Field.class)));
+        primaryFilters, otherInfo, 123l, store.getEntity(entityId1b,
+        entityType1, EnumSet.allOf(Field.class)));
 
 
     verifyEntityInfo(entityId2, entityType2, events2, relEntityMap,
     verifyEntityInfo(entityId2, entityType2, events2, relEntityMap,
-        EMPTY_PRIMARY_FILTERS, EMPTY_MAP, store.getEntity(entityId2, entityType2,
-        EnumSet.allOf(Field.class)));
+        EMPTY_PRIMARY_FILTERS, EMPTY_MAP, -123l, store.getEntity(entityId2,
+        entityType2, EnumSet.allOf(Field.class)));
+
+    verifyEntityInfo(entityId4, entityType4, EMPTY_EVENTS, EMPTY_REL_ENTITIES,
+        EMPTY_PRIMARY_FILTERS, EMPTY_MAP, 42l, store.getEntity(entityId4,
+        entityType4, EnumSet.allOf(Field.class)));
+
+    verifyEntityInfo(entityId5, entityType5, EMPTY_EVENTS, relEntityMap2,
+        EMPTY_PRIMARY_FILTERS, EMPTY_MAP, 42l, store.getEntity(entityId5,
+        entityType5, EnumSet.allOf(Field.class)));
 
     // test getting single fields
     verifyEntityInfo(entityId1, entityType1, events1, null, null, null,
@@ -276,70 +310,132 @@ public class TimelineStoreTestUtils {
             EnumSet.of(Field.RELATED_ENTITIES)));
   }
 
+  protected List<TimelineEntity> getEntities(String entityType)
+      throws IOException {
+    return store.getEntities(entityType, null, null, null, null, null,
+        null, null, null).getEntities();
+  }
+
+  protected List<TimelineEntity> getEntitiesWithPrimaryFilter(
+      String entityType, NameValuePair primaryFilter) throws IOException {
+    return store.getEntities(entityType, null, null, null, null, null,
+        primaryFilter, null, null).getEntities();
+  }
+
+  protected List<TimelineEntity> getEntitiesFromId(String entityType,
+      String fromId) throws IOException {
+    return store.getEntities(entityType, null, null, null, fromId, null,
+        null, null, null).getEntities();
+  }
+
+  protected List<TimelineEntity> getEntitiesFromTs(String entityType,
+      long fromTs) throws IOException {
+    return store.getEntities(entityType, null, null, null, null, fromTs,
+        null, null, null).getEntities();
+  }
+
+  protected List<TimelineEntity> getEntitiesFromIdWithPrimaryFilter(
+      String entityType, NameValuePair primaryFilter, String fromId)
+      throws IOException {
+    return store.getEntities(entityType, null, null, null, fromId, null,
+        primaryFilter, null, null).getEntities();
+  }
+
+  protected List<TimelineEntity> getEntitiesFromTsWithPrimaryFilter(
+      String entityType, NameValuePair primaryFilter, long fromTs)
+      throws IOException {
+    return store.getEntities(entityType, null, null, null, null, fromTs,
+        primaryFilter, null, null).getEntities();
+  }
+
+  protected List<TimelineEntity> getEntitiesFromIdWithWindow(String entityType,
+      Long windowEnd, String fromId) throws IOException {
+    return store.getEntities(entityType, null, null, windowEnd, fromId, null,
+        null, null, null).getEntities();
+  }
+
+  protected List<TimelineEntity> getEntitiesFromIdWithPrimaryFilterAndWindow(
+      String entityType, Long windowEnd, String fromId,
+      NameValuePair primaryFilter) throws IOException {
+    return store.getEntities(entityType, null, null, windowEnd, fromId, null,
+        primaryFilter, null, null).getEntities();
+  }
+
+  protected List<TimelineEntity> getEntitiesWithFilters(String entityType,
+      NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters)
+      throws IOException {
+    return store.getEntities(entityType, null, null, null, null, null,
+        primaryFilter, secondaryFilters, null).getEntities();
+  }
+
+  protected List<TimelineEntity> getEntities(String entityType, Long limit,
+      Long windowStart, Long windowEnd, NameValuePair primaryFilter,
+      EnumSet<Field> fields) throws IOException {
+    return store.getEntities(entityType, limit, windowStart, windowEnd, null,
+        null, primaryFilter, null, fields).getEntities();
+  }
+
   public void testGetEntities() throws IOException {
     // test getting entities
     assertEquals("nonzero entities size for nonexistent type", 0,
-        store.getEntities("type_0", null, null, null, null, null,
-            null).getEntities().size());
+        getEntities("type_0").size());
+    assertEquals("nonzero entities size for nonexistent type", 0,
+        getEntities("type_3").size());
+    assertEquals("nonzero entities size for nonexistent type", 0,
+        getEntities("type_6").size());
     assertEquals("nonzero entities size for nonexistent type", 0,
-        store.getEntities("type_3", null, null, null, null, null,
-            null).getEntities().size());
+        getEntitiesWithPrimaryFilter("type_0", userFilter).size());
     assertEquals("nonzero entities size for nonexistent type", 0,
-        store.getEntities("type_0", null, null, null, userFilter,
-            null, null).getEntities().size());
+        getEntitiesWithPrimaryFilter("type_3", userFilter).size());
     assertEquals("nonzero entities size for nonexistent type", 0,
-        store.getEntities("type_3", null, null, null, userFilter,
-            null, null).getEntities().size());
+        getEntitiesWithPrimaryFilter("type_6", userFilter).size());
 
-    List<TimelineEntity> entities =
-        store.getEntities("type_1", null, null, null, null, null,
-            EnumSet.allOf(Field.class)).getEntities();
+    List<TimelineEntity> entities = getEntities("type_1");
     assertEquals(2, entities.size());
     verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES,
         primaryFilters, otherInfo, entities.get(0));
     verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES,
         primaryFilters, otherInfo, entities.get(1));
 
-    entities = store.getEntities("type_2", null, null, null, null, null,
-        EnumSet.allOf(Field.class)).getEntities();
+    entities = getEntities("type_2");
     assertEquals(1, entities.size());
     verifyEntityInfo(entityId2, entityType2, events2, relEntityMap,
         EMPTY_PRIMARY_FILTERS, EMPTY_MAP, entities.get(0));
 
-    entities = store.getEntities("type_1", 1l, null, null, null, null,
-        EnumSet.allOf(Field.class)).getEntities();
+    entities = getEntities("type_1", 1l, null, null, null,
+        EnumSet.allOf(Field.class));
     assertEquals(1, entities.size());
     verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES,
         primaryFilters, otherInfo, entities.get(0));
 
-    entities = store.getEntities("type_1", 1l, 0l, null, null, null,
-        EnumSet.allOf(Field.class)).getEntities();
+    entities = getEntities("type_1", 1l, 0l, null, null,
+        EnumSet.allOf(Field.class));
     assertEquals(1, entities.size());
     verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES,
         primaryFilters, otherInfo, entities.get(0));
 
-    entities = store.getEntities("type_1", null, 234l, null, null, null,
-        EnumSet.allOf(Field.class)).getEntities();
+    entities = getEntities("type_1", null, 234l, null, null,
+        EnumSet.allOf(Field.class));
     assertEquals(0, entities.size());
 
-    entities = store.getEntities("type_1", null, 123l, null, null, null,
-        EnumSet.allOf(Field.class)).getEntities();
+    entities = getEntities("type_1", null, 123l, null, null,
+        EnumSet.allOf(Field.class));
     assertEquals(0, entities.size());
 
-    entities = store.getEntities("type_1", null, 234l, 345l, null, null,
-        EnumSet.allOf(Field.class)).getEntities();
+    entities = getEntities("type_1", null, 234l, 345l, null,
+        EnumSet.allOf(Field.class));
     assertEquals(0, entities.size());
 
-    entities = store.getEntities("type_1", null, null, 345l, null, null,
-        EnumSet.allOf(Field.class)).getEntities();
+    entities = getEntities("type_1", null, null, 345l, null,
+        EnumSet.allOf(Field.class));
     assertEquals(2, entities.size());
     verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES,
         primaryFilters, otherInfo, entities.get(0));
     verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES,
         primaryFilters, otherInfo, entities.get(1));
 
-    entities = store.getEntities("type_1", null, null, 123l, null, null,
-        EnumSet.allOf(Field.class)).getEntities();
+    entities = getEntities("type_1", null, null, 123l, null,
+        EnumSet.allOf(Field.class));
     assertEquals(2, entities.size());
     verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES,
         primaryFilters, otherInfo, entities.get(0));
@@ -347,79 +443,152 @@ public class TimelineStoreTestUtils {
         primaryFilters, otherInfo, entities.get(1));
   }
 
+  public void testGetEntitiesWithFromId() throws IOException {
+    List<TimelineEntity> entities = getEntitiesFromId("type_1", entityId1);
+    assertEquals(2, entities.size());
+    verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES,
+        primaryFilters, otherInfo, entities.get(0));
+    verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES,
+        primaryFilters, otherInfo, entities.get(1));
+
+    entities = getEntitiesFromId("type_1", entityId1b);
+    assertEquals(1, entities.size());
+    verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES,
+        primaryFilters, otherInfo, entities.get(0));
+
+    entities = getEntitiesFromIdWithWindow("type_1", 0l, entityId1);
+    assertEquals(0, entities.size());
+
+    entities = getEntitiesFromId("type_2", "a");
+    assertEquals(0, entities.size());
+
+    entities = getEntitiesFromId("type_2", entityId2);
+    assertEquals(1, entities.size());
+    verifyEntityInfo(entityId2, entityType2, events2, relEntityMap,
+        EMPTY_PRIMARY_FILTERS, EMPTY_MAP, entities.get(0));
+
+    entities = getEntitiesFromIdWithWindow("type_2", -456l, null);
+    assertEquals(0, entities.size());
+
+    entities = getEntitiesFromIdWithWindow("type_2", -456l, "a");
+    assertEquals(0, entities.size());
+
+    entities = getEntitiesFromIdWithWindow("type_2", 0l, null);
+    assertEquals(1, entities.size());
+
+    entities = getEntitiesFromIdWithWindow("type_2", 0l, entityId2);
+    assertEquals(1, entities.size());
+
+    // same tests with primary filters
+    entities = getEntitiesFromIdWithPrimaryFilter("type_1", userFilter,
+        entityId1);
+    assertEquals(2, entities.size());
+    verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES,
+        primaryFilters, otherInfo, entities.get(0));
+    verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES,
+        primaryFilters, otherInfo, entities.get(1));
+
+    entities = getEntitiesFromIdWithPrimaryFilter("type_1", userFilter,
+        entityId1b);
+    assertEquals(1, entities.size());
+    verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES,
+        primaryFilters, otherInfo, entities.get(0));
+
+    entities = getEntitiesFromIdWithPrimaryFilterAndWindow("type_1", 0l,
+        entityId1, userFilter);
+    assertEquals(0, entities.size());
+
+    entities = getEntitiesFromIdWithPrimaryFilter("type_2", userFilter, "a");
+    assertEquals(0, entities.size());
+  }
+
+  public void testGetEntitiesWithFromTs() throws IOException {
+    assertEquals(0, getEntitiesFromTs("type_1", beforeTs).size());
+    assertEquals(0, getEntitiesFromTs("type_2", beforeTs).size());
+    assertEquals(0, getEntitiesFromTsWithPrimaryFilter("type_1", userFilter,
+        beforeTs).size());
+    long afterTs = System.currentTimeMillis();
+    assertEquals(2, getEntitiesFromTs("type_1", afterTs).size());
+    assertEquals(1, getEntitiesFromTs("type_2", afterTs).size());
+    assertEquals(2, getEntitiesFromTsWithPrimaryFilter("type_1", userFilter,
+        afterTs).size());
+    assertEquals(2, getEntities("type_1").size());
+    assertEquals(1, getEntities("type_2").size());
+    assertEquals(2, getEntitiesWithPrimaryFilter("type_1", userFilter).size());
+    // check insert time is not overwritten
+    long beforeTs = this.beforeTs;
+    loadTestData();
+    assertEquals(0, getEntitiesFromTs("type_1", beforeTs).size());
+    assertEquals(0, getEntitiesFromTs("type_2", beforeTs).size());
+    assertEquals(0, getEntitiesFromTsWithPrimaryFilter("type_1", userFilter,
+        beforeTs).size());
+    assertEquals(2, getEntitiesFromTs("type_1", afterTs).size());
+    assertEquals(1, getEntitiesFromTs("type_2", afterTs).size());
+    assertEquals(2, getEntitiesFromTsWithPrimaryFilter("type_1", userFilter,
+        afterTs).size());
+  }
+
   public void testGetEntitiesWithPrimaryFilters() throws IOException {
     // test using primary filter
     assertEquals("nonzero entities size for primary filter", 0,
-        store.getEntities("type_1", null, null, null,
-            new NameValuePair("none", "none"), null,
-            EnumSet.allOf(Field.class)).getEntities().size());
+        getEntitiesWithPrimaryFilter("type_1",
+            new NameValuePair("none", "none")).size());
     assertEquals("nonzero entities size for primary filter", 0,
-        store.getEntities("type_2", null, null, null,
-            new NameValuePair("none", "none"), null,
-            EnumSet.allOf(Field.class)).getEntities().size());
+        getEntitiesWithPrimaryFilter("type_2",
+            new NameValuePair("none", "none")).size());
     assertEquals("nonzero entities size for primary filter", 0,
-        store.getEntities("type_3", null, null, null,
-            new NameValuePair("none", "none"), null,
-            EnumSet.allOf(Field.class)).getEntities().size());
+        getEntitiesWithPrimaryFilter("type_3",
+            new NameValuePair("none", "none")).size());
 
-    List<TimelineEntity> entities = store.getEntities("type_1", null, null, null,
-        userFilter, null, EnumSet.allOf(Field.class)).getEntities();
+    List<TimelineEntity> entities = getEntitiesWithPrimaryFilter("type_1",
+        userFilter);
     assertEquals(2, entities.size());
     verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES,
         primaryFilters, otherInfo, entities.get(0));
     verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES,
         primaryFilters, otherInfo, entities.get(1));
 
-    store.getEntities("type_1", null, null, null,
-        numericFilter1, null, EnumSet.allOf(Field.class)).getEntities();
+    entities = getEntitiesWithPrimaryFilter("type_1", numericFilter1);
     assertEquals(2, entities.size());
     verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES,
         primaryFilters, otherInfo, entities.get(0));
     verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES,
         primaryFilters, otherInfo, entities.get(1));
 
-    store.getEntities("type_1", null, null, null,
-        numericFilter2, null, EnumSet.allOf(Field.class)).getEntities();
+    entities = getEntitiesWithPrimaryFilter("type_1", numericFilter2);
     assertEquals(2, entities.size());
     verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES,
         primaryFilters, otherInfo, entities.get(0));
     verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES,
         primaryFilters, otherInfo, entities.get(1));
 
-    store.getEntities("type_1", null, null, null,
-        numericFilter3, null, EnumSet.allOf(Field.class)).getEntities();
+    entities = getEntitiesWithPrimaryFilter("type_1", numericFilter3);
     assertEquals(2, entities.size());
     verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES,
         primaryFilters, otherInfo, entities.get(0));
     verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES,
         primaryFilters, otherInfo, entities.get(1));
 
-    entities = store.getEntities("type_2", null, null, null, userFilter, null,
-        EnumSet.allOf(Field.class)).getEntities();
+    entities = getEntitiesWithPrimaryFilter("type_2", userFilter);
     assertEquals(0, entities.size());
 
-    entities = store.getEntities("type_1", 1l, null, null, userFilter, null,
-        EnumSet.allOf(Field.class)).getEntities();
+    entities = getEntities("type_1", 1l, null, null, userFilter, null);
     assertEquals(1, entities.size());
     verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES,
         primaryFilters, otherInfo, entities.get(0));
 
-    entities = store.getEntities("type_1", 1l, 0l, null, userFilter, null,
-        EnumSet.allOf(Field.class)).getEntities();
+    entities = getEntities("type_1", 1l, 0l, null, userFilter, null);
     assertEquals(1, entities.size());
     verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES,
         primaryFilters, otherInfo, entities.get(0));
 
-    entities = store.getEntities("type_1", null, 234l, null, userFilter, null,
-        EnumSet.allOf(Field.class)).getEntities();
+    entities = getEntities("type_1", null, 234l, null, userFilter, null);
     assertEquals(0, entities.size());
 
-    entities = store.getEntities("type_1", null, 234l, 345l, userFilter, null,
-        EnumSet.allOf(Field.class)).getEntities();
+    entities = getEntities("type_1", null, 234l, 345l, userFilter, null);
     assertEquals(0, entities.size());
 
-    entities = store.getEntities("type_1", null, null, 345l, userFilter, null,
-        EnumSet.allOf(Field.class)).getEntities();
+    entities = getEntities("type_1", null, null, 345l, userFilter, null);
     assertEquals(2, entities.size());
     verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES,
         primaryFilters, otherInfo, entities.get(0));
@@ -429,28 +598,29 @@ public class TimelineStoreTestUtils {
 
   public void testGetEntitiesWithSecondaryFilters() throws IOException {
     // test using secondary filter
-    List<TimelineEntity> entities = store.getEntities("type_1", null, null, null,
-        null, goodTestingFilters, EnumSet.allOf(Field.class)).getEntities();
+    List<TimelineEntity> entities = getEntitiesWithFilters("type_1", null,
+        goodTestingFilters);
     assertEquals(2, entities.size());
     verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES,
         primaryFilters, otherInfo, entities.get(0));
     verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES,
         primaryFilters, otherInfo, entities.get(1));
 
-    entities = store.getEntities("type_1", null, null, null, userFilter,
-        goodTestingFilters, EnumSet.allOf(Field.class)).getEntities();
+    entities = getEntitiesWithFilters("type_1", userFilter, goodTestingFilters);
     assertEquals(2, entities.size());
     verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES,
         primaryFilters, otherInfo, entities.get(0));
     verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES,
         primaryFilters, otherInfo, entities.get(1));
 
-    entities = store.getEntities("type_1", null, null, null, null,
-        badTestingFilters, EnumSet.allOf(Field.class)).getEntities();
+    entities = getEntitiesWithFilters("type_1", null,
+        Collections.singleton(new NameValuePair("user", "none")));
     assertEquals(0, entities.size());
 
-    entities = store.getEntities("type_1", null, null, null, userFilter,
-        badTestingFilters, EnumSet.allOf(Field.class)).getEntities();
+    entities = getEntitiesWithFilters("type_1", null, badTestingFilters);
+    assertEquals(0, entities.size());
+
+    entities = getEntitiesWithFilters("type_1", userFilter, badTestingFilters);
     assertEquals(0, entities.size());
   }
 
@@ -514,6 +684,19 @@ public class TimelineStoreTestUtils {
     verifyEntityTimeline(timelines.get(0), entityId2, entityType2, ev3, ev4);
   }
 
+  /**
+   * Verify a single entity and its start time
+   */
+  protected static void verifyEntityInfo(String entityId, String entityType,
+      List<TimelineEvent> events, Map<String, Set<String>> relatedEntities,
+      Map<String, Set<Object>> primaryFilters, Map<String, Object> otherInfo,
+      Long startTime, TimelineEntity retrievedEntityInfo) {
+
+    verifyEntityInfo(entityId, entityType, events, relatedEntities,
+        primaryFilters, otherInfo, retrievedEntityInfo);
+    assertEquals(startTime, retrievedEntityInfo.getStartTime());
+  }
+
   /**
    * Verify a single entity
    */

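For orientation, here is a minimal sketch (not part of this patch) of how the widened getEntities signature exercised by the helper methods above might be called directly against a TimelineReader. It assumes the nine-argument ordering used in TimelineStoreTestUtils (entityType, limit, windowStart, windowEnd, fromId, fromTs, primaryFilter, secondaryFilters, fields) and that the result type is TimelineEntities; the method name printPage and the literal type/ID values are illustrative only.

    // Hypothetical fragment; assumes the timeline classes from this patch are on
    // the classpath and that "reader" is a started store (e.g. LeveldbTimelineStore).
    static void printPage(TimelineReader reader, long insertTs) throws IOException {
      TimelineEntities page = reader.getEntities(
          "type_1",                    // entityType
          100l,                        // limit on the number of returned entities
          null,                        // windowStart (earliest start time), unused here
          null,                        // windowEnd (latest start time), unused here
          "id_1",                      // fromId: resume the listing at this entity ID
          insertTs,                    // fromTs: ignore entities inserted after this time
          null,                        // primaryFilter
          null,                        // secondaryFilters
          EnumSet.allOf(Field.class)); // return all fields
      for (TimelineEntity entity : page.getEntities()) {
        System.out.println(entity.getEntityId() + " started at " + entity.getStartTime());
      }
    }

As the testGetEntitiesWithFromTs test above indicates, fromTs filters on insertion time (entities inserted after the timestamp are skipped), which is what makes repeated paging over a live store consistent.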
+ 43 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestTimelineWebServices.java

@@ -50,6 +50,7 @@ import com.sun.jersey.test.framework.WebAppDescriptor;
 public class TestTimelineWebServices extends JerseyTest {
 
   private static TimelineStore store;
+  private long beforeTime;
 
   private Injector injector = Guice.createInjector(new ServletModule() {
 
@@ -79,6 +80,7 @@ public class TestTimelineWebServices extends JerseyTest {
 
   private TimelineStore mockTimelineStore()
       throws Exception {
+    beforeTime = System.currentTimeMillis() - 1;
     TestMemoryTimelineStore store =
         new TestMemoryTimelineStore();
     store.setup();
@@ -141,6 +143,47 @@ public class TestTimelineWebServices extends JerseyTest {
     verifyEntities(response.getEntity(TimelineEntities.class));
   }
 
+  @Test
+  public void testFromId() throws Exception {
+    WebResource r = resource();
+    ClientResponse response = r.path("ws").path("v1").path("timeline")
+        .path("type_1").queryParam("fromId", "id_2")
+        .accept(MediaType.APPLICATION_JSON)
+        .get(ClientResponse.class);
+    assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+    assertEquals(1, response.getEntity(TimelineEntities.class).getEntities()
+        .size());
+
+    response = r.path("ws").path("v1").path("timeline")
+        .path("type_1").queryParam("fromId", "id_1")
+        .accept(MediaType.APPLICATION_JSON)
+        .get(ClientResponse.class);
+    assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+    assertEquals(2, response.getEntity(TimelineEntities.class).getEntities()
+        .size());
+  }
+
+  @Test
+  public void testFromTs() throws Exception {
+    WebResource r = resource();
+    ClientResponse response = r.path("ws").path("v1").path("timeline")
+        .path("type_1").queryParam("fromTs", Long.toString(beforeTime))
+        .accept(MediaType.APPLICATION_JSON)
+        .get(ClientResponse.class);
+    assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+    assertEquals(0, response.getEntity(TimelineEntities.class).getEntities()
+        .size());
+
+    response = r.path("ws").path("v1").path("timeline")
+        .path("type_1").queryParam("fromTs", Long.toString(
+            System.currentTimeMillis()))
+        .accept(MediaType.APPLICATION_JSON)
+        .get(ClientResponse.class);
+    assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+    assertEquals(2, response.getEntity(TimelineEntities.class).getEntities()
+        .size());
+  }
+
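The same two parameters surface as query parameters on the REST endpoint exercised by testFromId and testFromTs above. A hedged client-side sketch, reusing the Jersey WebResource pattern from this test class; the entity type, fromId value, and the idea of combining both parameters in one request are illustrative assumptions rather than something this patch demonstrates directly.

    // Hypothetical query mirroring the two tests above.
    WebResource r = resource();
    ClientResponse response = r.path("ws").path("v1").path("timeline")
        .path("type_1")
        .queryParam("fromId", "id_1")   // resume the listing at this entity ID
        .queryParam("fromTs", Long.toString(System.currentTimeMillis()))
        .accept(MediaType.APPLICATION_JSON)
        .get(ClientResponse.class);
    TimelineEntities entities = response.getEntity(TimelineEntities.class);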
   @Test
   public void testPrimaryFilterString() {
     WebResource r = resource();