
YARN-1717. Enabled periodically discarding old data in LeveldbTimelineStore. Contributed by Billie Rinaldi.
svn merge --ignore-ancestry -c 1577693 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2.4@1577700 13f79535-47bb-0310-9956-ffa450edef68

Zhijie Shen authored 11 years ago · commit bc9353e608

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -111,6 +111,9 @@ Release 2.4.0 - UNRELEASED
     transferred containers from previous app-attempts to new AMs after YARN-1490.
     (Jian He via vinodkv)
 
+    YARN-1717. Enabled periodically discarding old data in LeveldbTimelineStore.
+    (Billie Rinaldi via zjshen)
+
   IMPROVEMENTS
 
     YARN-1007. Enhance History Reader interface for Containers. (Mayank Bansal via

+ 31 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -1100,6 +1100,17 @@ public class YarnConfiguration extends Configuration {
   public static final String TIMELINE_SERVICE_STORE =
       TIMELINE_SERVICE_PREFIX + "store-class";
 
+  /** Timeline service enable data age off */
+  public static final String TIMELINE_SERVICE_TTL_ENABLE =
+      TIMELINE_SERVICE_PREFIX + "ttl-enable";
+
+  /** Timeline service length of time to retain data */
+  public static final String TIMELINE_SERVICE_TTL_MS =
+      TIMELINE_SERVICE_PREFIX + "ttl-ms";
+
+  public static final long DEFAULT_TIMELINE_SERVICE_TTL_MS =
+      1000 * 60 * 60 * 24 * 7;
+
   public static final String TIMELINE_SERVICE_LEVELDB_PREFIX =
       TIMELINE_SERVICE_PREFIX + "leveldb-timeline-store.";
 
@@ -1107,16 +1118,36 @@ public class YarnConfiguration extends Configuration {
   public static final String TIMELINE_SERVICE_LEVELDB_PATH =
       TIMELINE_SERVICE_LEVELDB_PREFIX + "path";
 
+  /** Timeline service leveldb read cache (uncompressed blocks) */
+  public static final String TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE =
+      TIMELINE_SERVICE_LEVELDB_PREFIX + "read-cache-size";
+
+  public static final long DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE =
+      100 * 1024 * 1024;
+
   /** Timeline service leveldb start time read cache (number of entities) */
   public static final String
       TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE =
       TIMELINE_SERVICE_LEVELDB_PREFIX + "start-time-read-cache-size";
 
+  public static final int
+      DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE = 10000;
+
   /** Timeline service leveldb start time write cache (number of entities) */
   public static final String
       TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE =
       TIMELINE_SERVICE_LEVELDB_PREFIX + "start-time-write-cache-size";
 
+  public static final int
+      DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE = 10000;
+
+  /** Timeline service leveldb interval to wait between deletion rounds */
+  public static final String TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS =
+      TIMELINE_SERVICE_LEVELDB_PREFIX + "ttl-interval-ms";
+
+  public static final long DEFAULT_TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS =
+      1000 * 60 * 5;
+
   ////////////////////////////////
   // Other Configs
   ////////////////////////////////
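
For reference, a minimal sketch (not part of this patch) of how the new age-off settings resolve at runtime; it uses only the constants added above, and the values noted in comments are the defaults (seven days of retention, five-minute deletion cycles):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    public class TtlConfigSketch {
      public static void main(String[] args) {
        Configuration conf = new YarnConfiguration();
        // Age-off is on unless yarn.timeline-service.ttl-enable is set to false.
        boolean ttlEnable = conf.getBoolean(
            YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE, true);
        // How long entities are retained: 604800000 ms (7 days) by default.
        long ttlMs = conf.getLong(
            YarnConfiguration.TIMELINE_SERVICE_TTL_MS,
            YarnConfiguration.DEFAULT_TIMELINE_SERVICE_TTL_MS);
        // How often the deletion thread runs: 300000 ms (5 minutes) by default.
        long ttlIntervalMs = conf.getLong(
            YarnConfiguration.TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS,
            YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS);
        System.out.println(ttlEnable + " " + ttlMs + " " + ttlIntervalMs);
      }
    }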

+ 39 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

@@ -1107,23 +1107,59 @@
   </property>
 
   <property>
-    <description>The https adddress of the timeline service web application.</description>
+    <description>The https address of the timeline service web application.</description>
     <name>yarn.timeline-service.webapp.https.address</name>
     <value>${yarn.timeline-service.hostname}:8190</value>
   </property>
 
   <property>
-    <description>Store class name for timeline store</description>
+    <description>Store class name for timeline store.</description>
     <name>yarn.timeline-service.store-class</name>
     <value>org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.LeveldbTimelineStore</value>
   </property>
 
   <property>
-    <description>Store file name for leveldb timeline store</description>
+    <description>Enable age off of timeline store data.</description>
+    <name>yarn.timeline-service.ttl-enable</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <description>Time to live for timeline store data in milliseconds.</description>
+    <name>yarn.timeline-service.ttl-ms</name>
+    <value>604800000</value>
+  </property>
+
+  <property>
+    <description>Store file name for leveldb timeline store.</description>
     <name>yarn.timeline-service.leveldb-timeline-store.path</name>
     <value>${yarn.log.dir}/timeline</value>
   </property>
 
+  <property>
+    <description>Length of time to wait between deletion cycles of leveldb timeline store in milliseconds.</description>
+    <name>yarn.timeline-service.leveldb-timeline-store.ttl-interval-ms</name>
+    <value>300000</value>
+  </property>
+
+  <property>
+    <description>Size of read cache for uncompressed blocks for leveldb timeline store in bytes.</description>
+    <name>yarn.timeline-service.leveldb-timeline-store.read-cache-size</name>
+    <value>104857600</value>
+  </property>
+
+  <property>
+    <description>Size of cache for recently read entity start times for leveldb timeline store in number of entities.</description>
+    <name>yarn.timeline-service.leveldb-timeline-store.start-time-read-cache-size</name>
+    <value>10000</value>
+  </property>
+
+  <property>
+    <description>Size of cache for recently written entity start times for leveldb timeline store in number of entities.</description>
+    <name>yarn.timeline-service.leveldb-timeline-store.start-time-write-cache-size</name>
+    <value>10000</value>
+  </property>
+
   <property>
     <description>Handler thread count to serve the client RPC requests.</description>
     <name>yarn.timeline-service.handler-thread-count</name>

+ 3 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/GenericObjectMapper.java

@@ -56,7 +56,7 @@ public class GenericObjectMapper {
    *
    * @param o An Object
    * @return A byte array representation of the Object
-   * @throws IOException
+   * @throws IOException if there is a write error
    */
   public static byte[] write(Object o) throws IOException {
     if (o == null) {
@@ -71,7 +71,7 @@ public class GenericObjectMapper {
    *
    * @param b A byte array
    * @return An Object
-   * @throws IOException
+   * @throws IOException if there is a read error
    */
   public static Object read(byte[] b) throws IOException {
     return read(b, 0);
@@ -84,7 +84,7 @@ public class GenericObjectMapper {
    * @param b A byte array
    * @param offset Offset into the array
    * @return An Object
-   * @throws IOException
+   * @throws IOException if there is a read error
    */
   public static Object read(byte[] b, int offset) throws IOException {
     if (b == null || b.length == 0) {
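
Beyond the javadoc fixes, the contract of these two methods is a simple round trip; a quick sketch using only the public static signatures shown above:

    import java.io.IOException;
    import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.GenericObjectMapper;

    public class RoundTripSketch {
      public static void main(String[] args) throws IOException {
        byte[] bytes = GenericObjectMapper.write("some value");
        Object restored = GenericObjectMapper.read(bytes);
        System.out.println(restored); // prints "some value"
      }
    }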

+ 552 - 124
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/LeveldbTimelineStore.java

@@ -34,6 +34,7 @@ import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.collections.map.LRUMap;
@@ -57,13 +58,60 @@ import org.fusesource.leveldbjni.JniDBFactory;
 import org.iq80.leveldb.DB;
 import org.iq80.leveldb.DBIterator;
 import org.iq80.leveldb.Options;
+import org.iq80.leveldb.ReadOptions;
 import org.iq80.leveldb.WriteBatch;
+import org.iq80.leveldb.WriteOptions;
 
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.GenericObjectMapper.readReverseOrderedLong;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.GenericObjectMapper.writeReverseOrderedLong;
 
 /**
- * An implementation of a timeline store backed by leveldb.
+ * <p>An implementation of an application timeline store backed by leveldb.</p>
+ *
+ * <p>There are three sections of the db, the start time section,
+ * the entity section, and the indexed entity section.</p>
+ *
+ * <p>The start time section is used to retrieve the unique start time for
+ * a given entity. Its values each contain a start time while its keys are of
+ * the form:</p>
+ * <pre>
+ *   START_TIME_LOOKUP_PREFIX + entity type + entity id</pre>
+ *
+ * <p>The entity section is ordered by entity type, then entity start time
+ * descending, then entity ID. There are four sub-sections of the entity
+ * section: events, primary filters, related entities,
+ * and other info. The event entries have event info serialized into their
+ * values. The other info entries have values corresponding to the values of
+ * the other info name/value map for the entry (note the names are contained
+ * in the key). All other entries have empty values. The key structure is as
+ * follows:</p>
+ * <pre>
+ *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id
+ *
+ *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
+ *     EVENTS_COLUMN + reveventtimestamp + eventtype
+ *
+ *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
+ *     PRIMARY_FILTERS_COLUMN + name + value
+ *
+ *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
+ *     OTHER_INFO_COLUMN + name
+ *
+ *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
+ *     RELATED_ENTITIES_COLUMN + relatedentity type + relatedentity id
+ *
+ *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
+ *     INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN + relatedentity type +
+ *     relatedentity id</pre>
+ *
+ * <p>The indexed entity section contains a primary filter name and primary
+ * filter value as the prefix. Within a given name/value, entire entity
+ * entries are stored in the same format as described in the entity section
+ * above (below, "key" represents any one of the possible entity entry keys
+ * described above).</p>
+ * <pre>
+ *   INDEXED_ENTRY_PREFIX + primaryfilter name + primaryfilter value +
+ *     key</pre>
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
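
The "entity start time descending" ordering described above falls out of storing timestamps with writeReverseOrderedLong, which maps larger longs to lexicographically smaller byte arrays. A minimal sketch of the idea (an illustrative order-reversing encoding, not the actual GenericObjectMapper implementation):

    public class ReverseOrderSketch {
      // Illustrative encoder: XOR with Long.MIN_VALUE turns signed order into
      // unsigned order, and complementing reverses it, so larger timestamps
      // produce lexicographically smaller big-endian byte arrays.
      static byte[] encodeReverse(long l) {
        long v = ~(l ^ Long.MIN_VALUE);
        byte[] b = new byte[8];
        for (int i = 0; i < 8; i++) {
          b[i] = (byte) (v >>> (56 - 8 * i));
        }
        return b;
      }

      // Unsigned lexicographic comparison, the order leveldb applies to keys.
      static int compare(byte[] a, byte[] b) {
        for (int i = 0; i < 8; i++) {
          int d = (a[i] & 0xff) - (b[i] & 0xff);
          if (d != 0) {
            return d;
          }
        }
        return 0;
      }

      public static void main(String[] args) {
        byte[] older = encodeReverse(1000L);
        byte[] newer = encodeReverse(2000L);
        // The newer timestamp sorts first, so iteration over the entity
        // section sees the most recently started entities first.
        System.out.println(compare(newer, older) < 0); // true
      }
    }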
@@ -78,16 +126,15 @@ public class LeveldbTimelineStore extends AbstractService
   private static final byte[] ENTITY_ENTRY_PREFIX = "e".getBytes();
   private static final byte[] INDEXED_ENTRY_PREFIX = "i".getBytes();
 
-  private static final byte[] PRIMARY_FILTER_COLUMN = "f".getBytes();
+  private static final byte[] EVENTS_COLUMN = "e".getBytes();
+  private static final byte[] PRIMARY_FILTERS_COLUMN = "f".getBytes();
   private static final byte[] OTHER_INFO_COLUMN = "i".getBytes();
-  private static final byte[] RELATED_COLUMN = "r".getBytes();
-  private static final byte[] TIME_COLUMN = "t".getBytes();
+  private static final byte[] RELATED_ENTITIES_COLUMN = "r".getBytes();
+  private static final byte[] INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN =
+      "z".getBytes();
 
   private static final byte[] EMPTY_BYTES = new byte[0];
 
-  private static final int DEFAULT_START_TIME_READ_CACHE_SIZE = 10000;
-  private static final int DEFAULT_START_TIME_WRITE_CACHE_SIZE = 10000;
-
   private Map<EntityIdentifier, Long> startTimeWriteCache;
   private Map<EntityIdentifier, Long> startTimeReadCache;
 
@@ -97,8 +144,13 @@ public class LeveldbTimelineStore extends AbstractService
   private final LockMap<EntityIdentifier> writeLocks =
       new LockMap<EntityIdentifier>();
 
+  private final ReentrantReadWriteLock deleteLock =
+      new ReentrantReadWriteLock();
+
   private DB db;
 
+  private Thread deletionThread;
+
   public LeveldbTimelineStore() {
     super(LeveldbTimelineStore.class.getName());
   }
@@ -108,13 +160,18 @@ public class LeveldbTimelineStore extends AbstractService
   protected void serviceInit(Configuration conf) throws Exception {
     Options options = new Options();
     options.createIfMissing(true);
+    options.cacheSize(conf.getLong(
+        YarnConfiguration.TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE,
+        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE));
     JniDBFactory factory = new JniDBFactory();
     String path = conf.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH);
     File p = new File(path);
-    if (!p.exists())
-      if (!p.mkdirs())
+    if (!p.exists()) {
+      if (!p.mkdirs()) {
         throw new IOException("Couldn't create directory for leveldb " +
             "timeline store " + path);
+      }
+    }
     LOG.info("Using leveldb path " + path);
     db = factory.open(new File(path, FILENAME), options);
     startTimeWriteCache =
@@ -123,17 +180,65 @@ public class LeveldbTimelineStore extends AbstractService
     startTimeReadCache =
         Collections.synchronizedMap(new LRUMap(getStartTimeReadCacheSize(
             conf)));
+
+    if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE, true)) {
+      deletionThread = new EntityDeletionThread(conf);
+      deletionThread.start();
+    }
+
     super.serviceInit(conf);
   }
 
   @Override
   protected void serviceStop() throws Exception {
+    if (deletionThread != null) {
+      deletionThread.interrupt();
+      LOG.info("Waiting for deletion thread to complete its current action");
+      try {
+        deletionThread.join();
+      } catch (InterruptedException e) {
+        LOG.warn("Interrupted while waiting for deletion thread to complete," +
+            " closing db now", e);
+      }
+    }
     IOUtils.cleanup(LOG, db);
     super.serviceStop();
   }
 
+  private class EntityDeletionThread extends Thread {
+    private final long ttl;
+    private final long ttlInterval;
+
+    public EntityDeletionThread(Configuration conf) {
+      ttl  = conf.getLong(YarnConfiguration.TIMELINE_SERVICE_TTL_MS,
+          YarnConfiguration.DEFAULT_TIMELINE_SERVICE_TTL_MS);
+      ttlInterval = conf.getLong(
+          YarnConfiguration.TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS,
+          YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS);
+      LOG.info("Starting deletion thread with ttl " + ttl + " and cycle " +
+          "interval " + ttlInterval);
+    }
+
+    @Override
+    public void run() {
+      while (true) {
+        long timestamp = System.currentTimeMillis() - ttl;
+        try {
+          discardOldEntities(timestamp);
+          Thread.sleep(ttlInterval);
+        } catch (IOException e) {
+          LOG.error(e);
+        } catch (InterruptedException e) {
+          LOG.info("Deletion thread received interrupt, exiting");
+          break;
+        }
+      }
+    }
+  }
+
   private static class LockMap<K> {
     private static class CountingReentrantLock<K> extends ReentrantLock {
+      private static final long serialVersionUID = 1L;
       private int count;
       private K key;
 
@@ -201,8 +306,9 @@ public class LeveldbTimelineStore extends AbstractService
       b[index] = t;
       useSeparator[index] = sep;
       length += t.length;
-      if (sep)
+      if (sep) {
         length++;
+      }
       index++;
       return this;
     }
@@ -211,8 +317,9 @@ public class LeveldbTimelineStore extends AbstractService
       ByteArrayOutputStream baos = new ByteArrayOutputStream(length);
       for (int i = 0; i < index; i++) {
         baos.write(b[i]);
-        if (i < index-1 && useSeparator[i])
+        if (i < index-1 && useSeparator[i]) {
           baos.write(0x0);
+        }
       }
       return baos.toByteArray();
     }
@@ -221,8 +328,9 @@ public class LeveldbTimelineStore extends AbstractService
       ByteArrayOutputStream baos = new ByteArrayOutputStream(length);
       for (int i = 0; i < index; i++) {
         baos.write(b[i]);
-        if (useSeparator[i])
+        if (useSeparator[i]) {
           baos.write(0x0);
+        }
       }
       return baos.toByteArray();
     }
@@ -238,20 +346,23 @@ public class LeveldbTimelineStore extends AbstractService
     }
 
     public String getNextString() throws IOException {
-      if (offset >= b.length)
+      if (offset >= b.length) {
         throw new IOException(
             "tried to read nonexistent string from byte array");
+      }
       int i = 0;
-      while (offset+i < b.length && b[offset+i] != 0x0)
+      while (offset+i < b.length && b[offset+i] != 0x0) {
         i++;
+      }
       String s = new String(b, offset, i);
       offset = offset + i + 1;
       return s;
     }
 
     public long getNextLong() throws IOException {
-      if (offset+8 >= b.length)
+      if (offset+8 >= b.length) {
         throw new IOException("byte array ran out when trying to read long");
+      }
       long l = readReverseOrderedLong(b, offset);
       offset += 8;
       return l;
@@ -265,20 +376,21 @@ public class LeveldbTimelineStore extends AbstractService
   @Override
   public TimelineEntity getEntity(String entityId, String entityType,
       EnumSet<Field> fields) throws IOException {
+    Long revStartTime = getStartTimeLong(entityId, entityType);
+    if (revStartTime == null) {
+      return null;
+    }
+    byte[] prefix = KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX)
+        .add(entityType).add(writeReverseOrderedLong(revStartTime))
+        .add(entityId).getBytesForLookup();
+
     DBIterator iterator = null;
     try {
-      byte[] revStartTime = getStartTime(entityId, entityType);
-      if (revStartTime == null)
-        return null;
-      byte[] prefix = KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX)
-          .add(entityType).add(revStartTime).add(entityId).getBytesForLookup();
-
       iterator = db.iterator();
       iterator.seek(prefix);
 
-      return getEntity(entityId, entityType,
-          readReverseOrderedLong(revStartTime, 0), fields, iterator, prefix,
-          prefix.length);
+      return getEntity(entityId, entityType, revStartTime, fields, iterator,
+          prefix, prefix.length);
     } finally {
       IOUtils.cleanup(LOG, iterator);
     }
@@ -291,20 +403,18 @@ public class LeveldbTimelineStore extends AbstractService
   private static TimelineEntity getEntity(String entityId, String entityType,
       Long startTime, EnumSet<Field> fields, DBIterator iterator,
       byte[] prefix, int prefixlen) throws IOException {
-    if (fields == null)
+    if (fields == null) {
       fields = EnumSet.allOf(Field.class);
+    }
 
     TimelineEntity entity = new TimelineEntity();
     boolean events = false;
     boolean lastEvent = false;
     if (fields.contains(Field.EVENTS)) {
       events = true;
-      entity.setEvents(new ArrayList<TimelineEvent>());
     } else if (fields.contains(Field.LAST_EVENT_ONLY)) {
       lastEvent = true;
-      entity.setEvents(new ArrayList<TimelineEvent>());
-    }
-    else {
+    } else {
       entity.setEvents(null);
     }
     boolean relatedEntities = false;
@@ -322,7 +432,6 @@ public class LeveldbTimelineStore extends AbstractService
     boolean otherInfo = false;
     if (fields.contains(Field.OTHER_INFO)) {
       otherInfo = true;
-      entity.setOtherInfo(new HashMap<String, Object>());
     } else {
       entity.setOtherInfo(null);
     }
@@ -331,12 +440,16 @@ public class LeveldbTimelineStore extends AbstractService
     // of a requested field
     for (; iterator.hasNext(); iterator.next()) {
       byte[] key = iterator.peekNext().getKey();
-      if (!prefixMatches(prefix, prefixlen, key))
+      if (!prefixMatches(prefix, prefixlen, key)) {
         break;
-      if (key[prefixlen] == PRIMARY_FILTER_COLUMN[0]) {
+      }
+      if (key.length == prefixlen) {
+        continue;
+      }
+      if (key[prefixlen] == PRIMARY_FILTERS_COLUMN[0]) {
         if (primaryFilters) {
           addPrimaryFilter(entity, key,
-              prefixlen + PRIMARY_FILTER_COLUMN.length);
+              prefixlen + PRIMARY_FILTERS_COLUMN.length);
         }
       } else if (key[prefixlen] == OTHER_INFO_COLUMN[0]) {
         if (otherInfo) {
@@ -344,22 +457,26 @@ public class LeveldbTimelineStore extends AbstractService
               prefixlen + OTHER_INFO_COLUMN.length),
               GenericObjectMapper.read(iterator.peekNext().getValue()));
         }
-      } else if (key[prefixlen] == RELATED_COLUMN[0]) {
+      } else if (key[prefixlen] == RELATED_ENTITIES_COLUMN[0]) {
         if (relatedEntities) {
           addRelatedEntity(entity, key,
-              prefixlen + RELATED_COLUMN.length);
+              prefixlen + RELATED_ENTITIES_COLUMN.length);
         }
-      } else if (key[prefixlen] == TIME_COLUMN[0]) {
-        if (events || (lastEvent && entity.getEvents().size() == 0)) {
+      } else if (key[prefixlen] == EVENTS_COLUMN[0]) {
+        if (events || (lastEvent &&
+            entity.getEvents().size() == 0)) {
           TimelineEvent event = getEntityEvent(null, key, prefixlen +
-              TIME_COLUMN.length, iterator.peekNext().getValue());
+              EVENTS_COLUMN.length, iterator.peekNext().getValue());
           if (event != null) {
             entity.addEvent(event);
           }
         }
       } else {
-        LOG.warn(String.format("Found unexpected column for entity %s of " +
-            "type %s (0x%02x)", entityId, entityType, key[prefixlen]));
+        if (key[prefixlen] !=
+            INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN[0]) {
+          LOG.warn(String.format("Found unexpected column for entity %s of " +
+              "type %s (0x%02x)", entityId, entityType, key[prefixlen]));
+        }
       }
     }
 
@@ -375,8 +492,9 @@ public class LeveldbTimelineStore extends AbstractService
       SortedSet<String> entityIds, Long limit, Long windowStart,
       Long windowEnd, Set<String> eventType) throws IOException {
     TimelineEvents events = new TimelineEvents();
-    if (entityIds == null || entityIds.isEmpty())
+    if (entityIds == null || entityIds.isEmpty()) {
       return events;
+    }
     // create a lexicographically-ordered map from start time to entities
     Map<byte[], List<EntityIdentifier>> startTimeMap = new TreeMap<byte[],
         List<EntityIdentifier>>(new Comparator<byte[]>() {
@@ -390,15 +508,15 @@ public class LeveldbTimelineStore extends AbstractService
     try {
       // look up start times for the specified entities
       // skip entities with no start time
-      for (String entity : entityIds) {
-        byte[] startTime = getStartTime(entity, entityType);
+      for (String entityId : entityIds) {
+        byte[] startTime = getStartTime(entityId, entityType);
         if (startTime != null) {
           List<EntityIdentifier> entities = startTimeMap.get(startTime);
           if (entities == null) {
             entities = new ArrayList<EntityIdentifier>();
             startTimeMap.put(startTime, entities);
           }
-          entities.add(new EntityIdentifier(entity, entityType));
+          entities.add(new EntityIdentifier(entityId, entityType));
         }
       }
       for (Entry<byte[], List<EntityIdentifier>> entry :
@@ -407,14 +525,14 @@ public class LeveldbTimelineStore extends AbstractService
         // start time, end time, event types) for entities whose start times
         // were found and add the entities to the return list
         byte[] revStartTime = entry.getKey();
-        for (EntityIdentifier entityID : entry.getValue()) {
+        for (EntityIdentifier entityIdentifier : entry.getValue()) {
           EventsOfOneEntity entity = new EventsOfOneEntity();
-          entity.setEntityId(entityID.getId());
+          entity.setEntityId(entityIdentifier.getId());
           entity.setEntityType(entityType);
           events.addEvent(entity);
           KeyBuilder kb = KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX)
-              .add(entityType).add(revStartTime).add(entityID.getId())
-              .add(TIME_COLUMN);
+              .add(entityType).add(revStartTime).add(entityIdentifier.getId())
+              .add(EVENTS_COLUMN);
           byte[] prefix = kb.getBytesForLookup();
           if (windowEnd == null) {
             windowEnd = Long.MAX_VALUE;
@@ -436,12 +554,14 @@ public class LeveldbTimelineStore extends AbstractService
             byte[] key = iterator.peekNext().getKey();
             if (!prefixMatches(prefix, prefix.length, key) || (last != null &&
                 WritableComparator.compareBytes(key, 0, key.length, last, 0,
-                    last.length) > 0))
+                    last.length) > 0)) {
               break;
+            }
             TimelineEvent event = getEntityEvent(eventType, key, prefix.length,
                 iterator.peekNext().getValue());
-            if (event != null)
+            if (event != null) {
               entity.addEvent(event);
+            }
           }
         }
       }
@@ -456,8 +576,9 @@ public class LeveldbTimelineStore extends AbstractService
    */
   private static boolean prefixMatches(byte[] prefix, int prefixlen,
       byte[] b) {
-    if (b.length < prefixlen)
+    if (b.length < prefixlen) {
       return false;
+    }
     return WritableComparator.compareBytes(prefix, 0, prefixlen, b, 0,
         prefixlen) == 0;
   }
@@ -537,9 +658,10 @@ public class LeveldbTimelineStore extends AbstractService
         byte[] key = iterator.peekNext().getKey();
         if (!prefixMatches(prefix, prefix.length, key) || (last != null &&
             WritableComparator.compareBytes(key, 0, key.length, last, 0,
-                last.length) > 0))
+                last.length) > 0)) {
           break;
-        // read the start time and entityId from the current key
+        }
+        // read the start time and entity id from the current key
         KeyParser kp = new KeyParser(key, prefix.length);
         Long startTime = kp.getNextLong();
         String entityId = kp.getNextString();
@@ -547,8 +669,6 @@ public class LeveldbTimelineStore extends AbstractService
         // the entity
         TimelineEntity entity = getEntity(entityId, entityType, startTime,
             fields, iterator, key, kp.getOffset());
-        if (entity == null)
-          continue;
         // determine if the retrieved entity matches the provided secondary
         // filters, and if so add it to the list of entities to return
         boolean filterPassed = true;
@@ -568,8 +688,9 @@ public class LeveldbTimelineStore extends AbstractService
             }
           }
         }
-        if (filterPassed)
+        if (filterPassed) {
           entities.addEntity(entity);
+        }
       }
       return entities;
     } finally {
@@ -578,8 +699,8 @@ public class LeveldbTimelineStore extends AbstractService
   }
 
   /**
-   * Put a single entity.  If there is an error, add a TimelinePutError to the given
-   * response.
+   * Put a single entity.  If there is an error, add a TimelinePutError to the
+   * given response.
    */
   private void put(TimelineEntity entity, TimelinePutResponse response) {
     LockMap.CountingReentrantLock<EntityIdentifier> lock =
@@ -587,13 +708,15 @@ public class LeveldbTimelineStore extends AbstractService
             entity.getEntityType()));
     lock.lock();
     WriteBatch writeBatch = null;
+    List<EntityIdentifier> relatedEntitiesWithoutStartTimes =
+        new ArrayList<EntityIdentifier>();
+    byte[] revStartTime = null;
     try {
       writeBatch = db.createWriteBatch();
       List<TimelineEvent> events = entity.getEvents();
       // look up the start time for the entity
-      byte[] revStartTime = getAndSetStartTime(entity.getEntityId(),
-          entity.getEntityType(), entity.getStartTime(), events,
-          writeBatch);
+      revStartTime = getAndSetStartTime(entity.getEntityId(),
+          entity.getEntityType(), entity.getStartTime(), events);
       if (revStartTime == null) {
         // if no start time is found, add an error and return
         TimelinePutError error = new TimelinePutError();
@@ -603,9 +726,12 @@ public class LeveldbTimelineStore extends AbstractService
         response.addError(error);
         return;
       }
-      Long revStartTimeLong = readReverseOrderedLong(revStartTime, 0);
       Map<String, Set<Object>> primaryFilters = entity.getPrimaryFilters();
 
+      // write entity marker
+      writeBatch.put(createEntityMarkerKey(entity.getEntityId(),
+          entity.getEntityType(), revStartTime), EMPTY_BYTES);
+
       // write event entries
       if (events != null && !events.isEmpty()) {
         for (TimelineEvent event : events) {
@@ -627,25 +753,25 @@ public class LeveldbTimelineStore extends AbstractService
             relatedEntities.entrySet()) {
           String relatedEntityType = relatedEntityList.getKey();
           for (String relatedEntityId : relatedEntityList.getValue()) {
+            // invisible "reverse" entries (entity -> related entity)
+            byte[] key = createReverseRelatedEntityKey(entity.getEntityId(),
+                entity.getEntityType(), revStartTime, relatedEntityId,
+                relatedEntityType);
+            writeBatch.put(key, EMPTY_BYTES);
             // look up start time of related entity
-            byte[] relatedEntityStartTime = getAndSetStartTime(relatedEntityId,
-                relatedEntityType, null, null, writeBatch);
+            byte[] relatedEntityStartTime = getStartTime(relatedEntityId,
+                relatedEntityType);
+            // delay writing the related entity if no start time is found
             if (relatedEntityStartTime == null) {
-              // if start time is not found, set start time of the related
-              // entity to the start time of this entity, and write it to the
-              // db and the cache
-              relatedEntityStartTime = revStartTime;
-              writeBatch.put(createStartTimeLookupKey(relatedEntityId,
-                  relatedEntityType), relatedEntityStartTime);
-              startTimeWriteCache.put(new EntityIdentifier(relatedEntityId,
-                  relatedEntityType), revStartTimeLong);
+              relatedEntitiesWithoutStartTimes.add(
+                  new EntityIdentifier(relatedEntityId, relatedEntityType));
+              continue;
             }
-            // write reverse entry (related entity -> entity)
-            byte[] key = createReleatedEntityKey(relatedEntityId,
+            // write "forward" entry (related entity -> entity)
+            key = createRelatedEntityKey(relatedEntityId,
                 relatedEntityType, relatedEntityStartTime,
                 entity.getEntityId(), entity.getEntityType());
             writeBatch.put(key, EMPTY_BYTES);
-            // TODO: write forward entry (entity -> related entity)?
           }
         }
       }
@@ -690,6 +816,36 @@ public class LeveldbTimelineStore extends AbstractService
       writeLocks.returnLock(lock);
       IOUtils.cleanup(LOG, writeBatch);
     }
+
+    for (EntityIdentifier relatedEntity : relatedEntitiesWithoutStartTimes) {
+      lock = writeLocks.getLock(relatedEntity);
+      lock.lock();
+      try {
+        byte[] relatedEntityStartTime = getAndSetStartTime(
+            relatedEntity.getId(), relatedEntity.getType(),
+            readReverseOrderedLong(revStartTime, 0), null);
+        if (relatedEntityStartTime == null) {
+          throw new IOException("Error setting start time for related entity");
+        }
+        db.put(createRelatedEntityKey(relatedEntity.getId(),
+            relatedEntity.getType(), relatedEntityStartTime,
+            entity.getEntityId(), entity.getEntityType()), EMPTY_BYTES);
+        db.put(createEntityMarkerKey(relatedEntity.getId(),
+            relatedEntity.getType(), relatedEntityStartTime), EMPTY_BYTES);
+      } catch (IOException e) {
+        LOG.error("Error putting related entity " + relatedEntity.getId() +
+            " of type " + relatedEntity.getType() + " for entity " +
+            entity.getEntityId() + " of type " + entity.getEntityType(), e);
+        TimelinePutError error = new TimelinePutError();
+        error.setEntityId(entity.getEntityId());
+        error.setEntityType(entity.getEntityType());
+        error.setErrorCode(TimelinePutError.IO_EXCEPTION);
+        response.addError(error);
+      } finally {
+        lock.unlock();
+        writeLocks.returnLock(lock);
+      }
+    }
   }
 
   /**
@@ -711,11 +867,16 @@ public class LeveldbTimelineStore extends AbstractService
 
   @Override
   public TimelinePutResponse put(TimelineEntities entities) {
-    TimelinePutResponse response = new TimelinePutResponse();
-    for (TimelineEntity entity : entities.getEntities()) {
-      put(entity, response);
+    try {
+      deleteLock.readLock().lock();
+      TimelinePutResponse response = new TimelinePutResponse();
+      for (TimelineEntity entity : entities.getEntities()) {
+        put(entity, response);
+      }
+      return response;
+    } finally {
+      deleteLock.readLock().unlock();
     }
-    return response;
   }
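
The deleteLock usage here can look inverted at first: concurrent put calls share the read lock, while a deletion round takes the write lock so it runs with no writers present. A generic sketch of the pattern:

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class DeleteLockSketch {
      private final ReentrantReadWriteLock deleteLock =
          new ReentrantReadWriteLock();

      void put() {
        deleteLock.readLock().lock();    // many put calls may hold this at once
        try {
          // ... batch entity writes to the db ...
        } finally {
          deleteLock.readLock().unlock();
        }
      }

      void discardOldEntities() {
        deleteLock.writeLock().lock();   // excludes every put for the duration
        try {
          // ... delete expired keys ...
        } finally {
          deleteLock.writeLock().unlock();
        }
      }
    }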
 
   /**
@@ -725,16 +886,30 @@ public class LeveldbTimelineStore extends AbstractService
    *
    * @param entityId The id of the entity
    * @param entityType The type of the entity
-   * @return A byte array
+   * @return A byte array, null if not found
    * @throws IOException
    */
   private byte[] getStartTime(String entityId, String entityType)
       throws IOException {
+    Long l = getStartTimeLong(entityId, entityType);
+    return l == null ? null : writeReverseOrderedLong(l);
+  }
+
+  /**
+   * Get the unique start time for a given entity as a Long.
+   *
+   * @param entityId The id of the entity
+   * @param entityType The type of the entity
+   * @return A Long, null if not found
+   * @throws IOException
+   */
+  private Long getStartTimeLong(String entityId, String entityType)
+      throws IOException {
     EntityIdentifier entity = new EntityIdentifier(entityId, entityType);
     // start time is not provided, so try to look it up
     if (startTimeReadCache.containsKey(entity)) {
       // found the start time in the cache
-      return writeReverseOrderedLong(startTimeReadCache.get(entity));
+      return startTimeReadCache.get(entity);
     } else {
       // try to look up the start time in the db
       byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
@@ -744,8 +919,9 @@ public class LeveldbTimelineStore extends AbstractService
         return null;
       } else {
         // found the start time in the db
-        startTimeReadCache.put(entity, readReverseOrderedLong(v, 0));
-        return v;
+        Long l = readReverseOrderedLong(v, 0);
+        startTimeReadCache.put(entity, l);
+        return l;
       }
     }
   }
@@ -754,19 +930,18 @@ public class LeveldbTimelineStore extends AbstractService
    * Get the unique start time for a given entity as a byte array that sorts
    * the timestamps in reverse order (see {@link
    * GenericObjectMapper#writeReverseOrderedLong(long)}). If the start time
-   * doesn't exist, set it based on the information provided.
+   * doesn't exist, set it based on the information provided. Should only be
+   * called when a lock has been obtained on the entity.
    *
    * @param entityId The id of the entity
    * @param entityType The type of the entity
    * @param startTime The start time of the entity, or null
    * @param events A list of events for the entity, or null
-   * @param writeBatch A leveldb write batch, if the method is called by a
-   *                   put as opposed to a get
    * @return A byte array
    * @throws IOException
    */
   private byte[] getAndSetStartTime(String entityId, String entityType,
-      Long startTime, List<TimelineEvent> events, WriteBatch writeBatch)
+      Long startTime, List<TimelineEvent> events)
       throws IOException {
     EntityIdentifier entity = new EntityIdentifier(entityId, entityType);
     if (startTime == null) {
@@ -786,7 +961,7 @@ public class LeveldbTimelineStore extends AbstractService
           }
           startTime = min;
         }
-        return checkStartTimeInDb(entity, startTime, writeBatch);
+        return checkStartTimeInDb(entity, startTime);
       }
     } else {
       // start time is provided
@@ -801,7 +976,7 @@ public class LeveldbTimelineStore extends AbstractService
         return writeReverseOrderedLong(startTime);
       } else {
         // check the provided start time matches the db
-        return checkStartTimeInDb(entity, startTime, writeBatch);
+        return checkStartTimeInDb(entity, startTime);
       }
     }
   }
@@ -810,10 +985,11 @@ public class LeveldbTimelineStore extends AbstractService
    * Checks db for start time and returns it if it exists.  If it doesn't
    * exist, writes the suggested start time (if it is not null).  This is
    * only called when the start time is not found in the cache,
-   * so it adds it back into the cache if it is found.
+   * so it adds it back into the cache if it is found. Should only be called
+   * when a lock has been obtained on the entity.
    */
   private byte[] checkStartTimeInDb(EntityIdentifier entity,
-      Long suggestedStartTime, WriteBatch writeBatch) throws IOException {
+      Long suggestedStartTime) throws IOException {
     // create lookup key for start time
     byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
     // retrieve value for key
@@ -826,7 +1002,9 @@ public class LeveldbTimelineStore extends AbstractService
       }
       // write suggested start time
       revStartTime = writeReverseOrderedLong(suggestedStartTime);
-      writeBatch.put(b, revStartTime);
+      WriteOptions writeOptions = new WriteOptions();
+      writeOptions.sync(true);
+      db.put(b, revStartTime, writeOptions);
     } else {
       // found start time in db, so ignore suggested start time
       suggestedStartTime = readReverseOrderedLong(v, 0);
@@ -839,12 +1017,22 @@ public class LeveldbTimelineStore extends AbstractService
 
   /**
    * Creates a key for looking up the start time of a given entity,
-   * of the form START_TIME_LOOKUP_PREFIX + entitytype + entity.
+   * of the form START_TIME_LOOKUP_PREFIX + entity type + entity id.
    */
-  private static byte[] createStartTimeLookupKey(String entity,
-      String entitytype) throws IOException {
+  private static byte[] createStartTimeLookupKey(String entityId,
+      String entityType) throws IOException {
     return KeyBuilder.newInstance().add(START_TIME_LOOKUP_PREFIX)
-        .add(entitytype).add(entity).getBytes();
+        .add(entityType).add(entityId).getBytes();
+  }
+
+  /**
+   * Creates an entity marker, serializing ENTITY_ENTRY_PREFIX + entity type +
+   * revstarttime + entity id.
+   */
+  private static byte[] createEntityMarkerKey(String entityId,
+      String entityType, byte[] revStartTime) throws IOException {
+    return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX)
+        .add(entityType).add(revStartTime).add(entityId).getBytesForLookup();
   }
 
   /**
@@ -860,15 +1048,15 @@ public class LeveldbTimelineStore extends AbstractService
   }
 
   /**
-   * Creates an event key, serializing ENTITY_ENTRY_PREFIX + entitytype +
-   * revstarttime + entity + TIME_COLUMN + reveventtimestamp + eventtype.
+   * Creates an event key, serializing ENTITY_ENTRY_PREFIX + entity type +
+   * revstarttime + entity id + EVENTS_COLUMN + reveventtimestamp + event type.
    */
-  private static byte[] createEntityEventKey(String entity, String entitytype,
-      byte[] revStartTime, byte[] reveventtimestamp, String eventtype)
-      throws IOException {
+  private static byte[] createEntityEventKey(String entityId,
+      String entityType, byte[] revStartTime, byte[] revEventTimestamp,
+      String eventType) throws IOException {
     return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX)
-        .add(entitytype).add(revStartTime).add(entity).add(TIME_COLUMN)
-        .add(reveventtimestamp).add(eventtype).getBytes();
+        .add(entityType).add(revStartTime).add(entityId).add(EVENTS_COLUMN)
+        .add(revEventTimestamp).add(eventType).getBytes();
   }
 
   /**
@@ -876,8 +1064,8 @@ public class LeveldbTimelineStore extends AbstractService
    * event type is not contained in the specified set of event types,
    * returns null.
    */
-  private static TimelineEvent getEntityEvent(Set<String> eventTypes, byte[] key,
-      int offset, byte[] value) throws IOException {
+  private static TimelineEvent getEntityEvent(Set<String> eventTypes,
+      byte[] key, int offset, byte[] value) throws IOException {
     KeyParser kp = new KeyParser(key, offset);
     long ts = kp.getNextLong();
     String tstype = kp.getNextString();
@@ -902,13 +1090,14 @@ public class LeveldbTimelineStore extends AbstractService
 
   /**
    * Creates a primary filter key, serializing ENTITY_ENTRY_PREFIX +
-   * entitytype + revstarttime + entity + PRIMARY_FILTER_COLUMN + name + value.
+   * entity type + revstarttime + entity id + PRIMARY_FILTERS_COLUMN + name +
+   * value.
    */
-  private static byte[] createPrimaryFilterKey(String entity,
-      String entitytype, byte[] revStartTime, String name, Object value)
+  private static byte[] createPrimaryFilterKey(String entityId,
+      String entityType, byte[] revStartTime, String name, Object value)
       throws IOException {
-    return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX).add(entitytype)
-        .add(revStartTime).add(entity).add(PRIMARY_FILTER_COLUMN).add(name)
+    return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX).add(entityType)
+        .add(revStartTime).add(entityId).add(PRIMARY_FILTERS_COLUMN).add(name)
         .add(GenericObjectMapper.write(value)).getBytes();
   }
 
@@ -925,13 +1114,13 @@ public class LeveldbTimelineStore extends AbstractService
   }
 
   /**
-   * Creates an other info key, serializing ENTITY_ENTRY_PREFIX + entitytype +
-   * revstarttime + entity + OTHER_INFO_COLUMN + name.
+   * Creates an other info key, serializing ENTITY_ENTRY_PREFIX + entity type +
+   * revstarttime + entity id + OTHER_INFO_COLUMN + name.
    */
-  private static byte[] createOtherInfoKey(String entity, String entitytype,
+  private static byte[] createOtherInfoKey(String entityId, String entityType,
       byte[] revStartTime, String name) throws IOException {
-    return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX).add(entitytype)
-        .add(revStartTime).add(entity).add(OTHER_INFO_COLUMN).add(name)
+    return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX).add(entityType)
+        .add(revStartTime).add(entityId).add(OTHER_INFO_COLUMN).add(name)
         .getBytes();
   }
 
@@ -945,15 +1134,15 @@ public class LeveldbTimelineStore extends AbstractService
 
   /**
    * Creates a related entity key, serializing ENTITY_ENTRY_PREFIX +
-   * entitytype + revstarttime + entity + RELATED_COLUMN + relatedentitytype +
-   * relatedentity.
+   * entity type + revstarttime + entity id + RELATED_ENTITIES_COLUMN +
+   * relatedentity type + relatedentity id.
    */
-  private static byte[] createReleatedEntityKey(String entity,
-      String entitytype, byte[] revStartTime, String relatedEntity,
+  private static byte[] createRelatedEntityKey(String entityId,
+      String entityType, byte[] revStartTime, String relatedEntityId,
       String relatedEntityType) throws IOException {
-    return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX).add(entitytype)
-        .add(revStartTime).add(entity).add(RELATED_COLUMN)
-        .add(relatedEntityType).add(relatedEntity).getBytes();
+    return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX).add(entityType)
+        .add(revStartTime).add(entityId).add(RELATED_ENTITIES_COLUMN)
+        .add(relatedEntityType).add(relatedEntityId).getBytes();
   }
 
   /**
@@ -968,6 +1157,21 @@ public class LeveldbTimelineStore extends AbstractService
     entity.addRelatedEntity(type, id);
   }
 
+  /**
+   * Creates a reverse related entity key, serializing ENTITY_ENTRY_PREFIX +
+   * entity type + revstarttime + entity id +
+   * INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN +
+   * relatedentity type + relatedentity id.
+   */
+  private static byte[] createReverseRelatedEntityKey(String entityId,
+      String entityType, byte[] revStartTime, String relatedEntityId,
+      String relatedEntityType) throws IOException {
+    return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX).add(entityType)
+        .add(revStartTime).add(entityId)
+        .add(INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN)
+        .add(relatedEntityType).add(relatedEntityId).getBytes();
+  }
+
   /**
    * Clears the cache to test reloading start times from leveldb (only for
    * testing).
@@ -982,13 +1186,237 @@ public class LeveldbTimelineStore extends AbstractService
   static int getStartTimeReadCacheSize(Configuration conf) {
     return conf.getInt(
         YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE,
-        DEFAULT_START_TIME_READ_CACHE_SIZE);
+        YarnConfiguration.
+            DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE);
   }
 
   @VisibleForTesting
   static int getStartTimeWriteCacheSize(Configuration conf) {
     return conf.getInt(
         YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE,
-        DEFAULT_START_TIME_WRITE_CACHE_SIZE);
+        YarnConfiguration.
+            DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE);
+  }
+
+  // warning is suppressed to prevent eclipse from noting unclosed resource
+  @SuppressWarnings("resource")
+  @VisibleForTesting
+  List<String> getEntityTypes() throws IOException {
+    DBIterator iterator = null;
+    try {
+      iterator = getDbIterator(false);
+      List<String> entityTypes = new ArrayList<String>();
+      iterator.seek(ENTITY_ENTRY_PREFIX);
+      while (iterator.hasNext()) {
+        byte[] key = iterator.peekNext().getKey();
+        if (key[0] != ENTITY_ENTRY_PREFIX[0]) {
+          break;
+        }
+        KeyParser kp = new KeyParser(key,
+            ENTITY_ENTRY_PREFIX.length);
+        String entityType = kp.getNextString();
+        entityTypes.add(entityType);
+        byte[] lookupKey = KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX)
+            .add(entityType).getBytesForLookup();
+        if (lookupKey[lookupKey.length - 1] != 0x0) {
+          throw new IOException("Found unexpected end byte in lookup key");
+        }
+        lookupKey[lookupKey.length - 1] = 0x1;
+        iterator.seek(lookupKey);
+      }
+      return entityTypes;
+    } finally {
+      IOUtils.cleanup(LOG, iterator);
+    }
+  }
+
+  /**
+   * Finds all keys in the db that have a given prefix and deletes them on
+   * the given write batch.
+   */
+  private void deleteKeysWithPrefix(WriteBatch writeBatch, byte[] prefix,
+      DBIterator iterator) {
+    for (iterator.seek(prefix); iterator.hasNext(); iterator.next()) {
+      byte[] key = iterator.peekNext().getKey();
+      if (!prefixMatches(prefix, prefix.length, key)) {
+        break;
+      }
+      writeBatch.delete(key);
+    }
+  }
+
+  // warning is suppressed to prevent eclipse from noting unclosed resource
+  @SuppressWarnings("resource")
+  @VisibleForTesting
+  boolean deleteNextEntity(String entityType, byte[] reverseTimestamp,
+      DBIterator iterator, DBIterator pfIterator, boolean seeked)
+      throws IOException {
+    WriteBatch writeBatch = null;
+    try {
+      KeyBuilder kb = KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX)
+          .add(entityType);
+      byte[] typePrefix = kb.getBytesForLookup();
+      kb.add(reverseTimestamp);
+      if (!seeked) {
+        iterator.seek(kb.getBytesForLookup());
+      }
+      if (!iterator.hasNext()) {
+        return false;
+      }
+      byte[] entityKey = iterator.peekNext().getKey();
+      if (!prefixMatches(typePrefix, typePrefix.length, entityKey)) {
+        return false;
+      }
+
+      // read the start time and entity id from the current key
+      KeyParser kp = new KeyParser(entityKey, typePrefix.length + 8);
+      String entityId = kp.getNextString();
+      int prefixlen = kp.getOffset();
+      byte[] deletePrefix = new byte[prefixlen];
+      System.arraycopy(entityKey, 0, deletePrefix, 0, prefixlen);
+
+      writeBatch = db.createWriteBatch();
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Deleting entity type:" + entityType + " id:" + entityId);
+      }
+      // remove start time from cache and db
+      writeBatch.delete(createStartTimeLookupKey(entityId, entityType));
+      EntityIdentifier entityIdentifier =
+          new EntityIdentifier(entityId, entityType);
+      startTimeReadCache.remove(entityIdentifier);
+      startTimeWriteCache.remove(entityIdentifier);
+
+      // delete current entity
+      for (; iterator.hasNext(); iterator.next()) {
+        byte[] key = iterator.peekNext().getKey();
+        if (!prefixMatches(entityKey, prefixlen, key)) {
+          break;
+        }
+        writeBatch.delete(key);
+
+        if (key.length == prefixlen) {
+          continue;
+        }
+        if (key[prefixlen] == PRIMARY_FILTERS_COLUMN[0]) {
+          kp = new KeyParser(key,
+              prefixlen + PRIMARY_FILTERS_COLUMN.length);
+          String name = kp.getNextString();
+          Object value = GenericObjectMapper.read(key, kp.getOffset());
+          deleteKeysWithPrefix(writeBatch, addPrimaryFilterToKey(name, value,
+              deletePrefix), pfIterator);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Deleting entity type:" + entityType + " id:" +
+                entityId + " primary filter entry " + name + " " +
+                value);
+          }
+        } else if (key[prefixlen] == RELATED_ENTITIES_COLUMN[0]) {
+          kp = new KeyParser(key,
+              prefixlen + RELATED_ENTITIES_COLUMN.length);
+          String type = kp.getNextString();
+          String id = kp.getNextString();
+          byte[] relatedEntityStartTime = getStartTime(id, type);
+          if (relatedEntityStartTime == null) {
+            LOG.warn("Found no start time for " +
+                "related entity " + id + " of type " + type + " while " +
+                "deleting " + entityId + " of type " + entityType);
+            continue;
+          }
+          writeBatch.delete(createReverseRelatedEntityKey(id, type,
+              relatedEntityStartTime, entityId, entityType));
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Deleting entity type:" + entityType + " id:" +
+                entityId + " from invisible reverse related entity " +
+                "entry of type:" + type + " id:" + id);
+          }
+        } else if (key[prefixlen] ==
+            INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN[0]) {
+          kp = new KeyParser(key, prefixlen +
+              INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN.length);
+          String type = kp.getNextString();
+          String id = kp.getNextString();
+          byte[] relatedEntityStartTime = getStartTime(id, type);
+          if (relatedEntityStartTime == null) {
+            LOG.warn("Found no start time for reverse " +
+                "related entity " + id + " of type " + type + " while " +
+                "deleting " + entityId + " of type " + entityType);
+            continue;
+          }
+          writeBatch.delete(createRelatedEntityKey(id, type,
+              relatedEntityStartTime, entityId, entityType));
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Deleting entity type:" + entityType + " id:" +
+                entityId + " from related entity entry of type:" +
+                type + " id:" + id);
+          }
+        }
+      }
+      WriteOptions writeOptions = new WriteOptions();
+      writeOptions.sync(true);
+      db.write(writeBatch, writeOptions);
+      return true;
+    } finally {
+      IOUtils.cleanup(LOG, writeBatch);
+    }
+  }
+
+  /**
+   * Discards entities with start timestamp less than or equal to the given
+   * timestamp.
+   */
+  @VisibleForTesting
+  void discardOldEntities(long timestamp)
+      throws IOException, InterruptedException {
+    byte[] reverseTimestamp = writeReverseOrderedLong(timestamp);
+    long totalCount = 0;
+    long t1 = System.currentTimeMillis();
+    try {
+      List<String> entityTypes = getEntityTypes();
+      for (String entityType : entityTypes) {
+        DBIterator iterator = null;
+        DBIterator pfIterator = null;
+        long typeCount = 0;
+        try {
+          deleteLock.writeLock().lock();
+          iterator = getDbIterator(false);
+          pfIterator = getDbIterator(false);
+
+          if (deletionThread != null && deletionThread.isInterrupted()) {
+            throw new InterruptedException();
+          }
+          boolean seeked = false;
+          while (deleteNextEntity(entityType, reverseTimestamp, iterator,
+              pfIterator, seeked)) {
+            typeCount++;
+            totalCount++;
+            seeked = true;
+            if (deletionThread != null && deletionThread.isInterrupted()) {
+              throw new InterruptedException();
+            }
+          }
+        } catch (IOException e) {
+          LOG.error("Got IOException while deleting entities for type " +
+              entityType + ", continuing to next type", e);
+        } finally {
+          IOUtils.cleanup(LOG, iterator, pfIterator);
+          deleteLock.writeLock().unlock();
+          if (typeCount > 0) {
+            LOG.info("Deleted " + typeCount + " entities of type " +
+                entityType);
+          }
+        }
+      }
+    } finally {
+      long t2 = System.currentTimeMillis();
+      LOG.info("Discarded " + totalCount + " entities for timestamp " +
+          timestamp + " and earlier in " + (t2 - t1) / 1000.0 + " seconds");
+    }
+  }
+
+  @VisibleForTesting
+  DBIterator getDbIterator(boolean fillCache) {
+    ReadOptions readOptions = new ReadOptions();
+    readOptions.fillCache(fillCache);
+    return db.iterator(readOptions);
   }
 }
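
Putting the pieces together, a hedged end-to-end sketch (path and values arbitrary) of running the store with age-off enabled; per the patch, serviceInit starts the deletion thread and serviceStop interrupts and joins it before closing the db:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;
    import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.LeveldbTimelineStore;

    public class StoreLifecycleSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new YarnConfiguration();
        conf.set(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH,
            "/tmp/timeline");            // arbitrary test path
        conf.setLong(YarnConfiguration.TIMELINE_SERVICE_TTL_MS, 60 * 1000L);
        conf.setLong(
            YarnConfiguration.TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS,
            10 * 1000L);
        LeveldbTimelineStore store = new LeveldbTimelineStore();
        store.init(conf);  // ttl-enable defaults to true, so the deletion thread starts
        store.start();
        // ... put and get entities through the TimelineStore API ...
        store.stop();      // interrupts the deletion thread, joins it, closes the db
      }
    }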

+ 3 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java

@@ -259,7 +259,9 @@ public class TimelineWebServices {
               + TimelineUtils.dumpTimelineRecordtoJSON(entity));
         }
       }
-      LOG.info("Storing entities: " + CSV_JOINER.join(entityIDs));
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Storing entities: " + CSV_JOINER.join(entityIDs));
+      }
       return store.put(entities);
     } catch (IOException e) {
       LOG.error("Error putting entities", e);

+ 128 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestLeveldbTimelineStore.java

@@ -19,17 +19,29 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.TimelineReader.Field;
+import org.iq80.leveldb.DBIterator;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.GenericObjectMapper.writeReverseOrderedLong;
 import static org.junit.Assert.assertEquals;
 
 @InterfaceAudience.Private
@@ -48,6 +60,7 @@ public class TestLeveldbTimelineStore
     fsContext.delete(new Path(fsPath.getAbsolutePath()), true);
     conf.set(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH,
         fsPath.getAbsolutePath());
+    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE, false);
     store = new LeveldbTimelineStore();
     store.init(conf);
     store.start();
@@ -105,4 +118,119 @@ public class TestLeveldbTimelineStore
     assertEquals(10002, LeveldbTimelineStore.getStartTimeWriteCacheSize(conf));
   }
 
+  private boolean deleteNextEntity(String entityType, byte[] ts)
+      throws IOException, InterruptedException {
+    DBIterator iterator = null;
+    DBIterator pfIterator = null;
+    try {
+      iterator = ((LeveldbTimelineStore)store).getDbIterator(false);
+      pfIterator = ((LeveldbTimelineStore)store).getDbIterator(false);
+      return ((LeveldbTimelineStore)store).deleteNextEntity(entityType, ts,
+          iterator, pfIterator, false);
+    } finally {
+      IOUtils.cleanup(null, iterator, pfIterator);
+    }
+  }
+
+  @Test
+  public void testGetEntityTypes() throws IOException {
+    List<String> entityTypes = ((LeveldbTimelineStore)store).getEntityTypes();
+    assertEquals(2, entityTypes.size());
+    assertEquals(entityType1, entityTypes.get(0));
+    assertEquals(entityType2, entityTypes.get(1));
+  }
+
+  @Test
+  public void testDeleteEntities() throws IOException, InterruptedException {
+    assertEquals(2, store.getEntities("type_1", null, null, null, null, null,
+        null).getEntities().size());
+    assertEquals(1, store.getEntities("type_2", null, null, null, null, null,
+        null).getEntities().size());
+
+    assertEquals(false, deleteNextEntity(entityType1,
+        writeReverseOrderedLong(122l)));
+    assertEquals(2, store.getEntities("type_1", null, null, null, null, null,
+        null).getEntities().size());
+    assertEquals(1, store.getEntities("type_2", null, null, null, null, null,
+        null).getEntities().size());
+
+    assertEquals(true, deleteNextEntity(entityType1,
+        writeReverseOrderedLong(123l)));
+    List<TimelineEntity> entities =
+        store.getEntities("type_2", null, null, null, null, null,
+            EnumSet.allOf(Field.class)).getEntities();
+    assertEquals(1, entities.size());
+    verifyEntityInfo(entityId2, entityType2, events2, Collections.singletonMap(
+        entityType1, Collections.singleton(entityId1b)), EMPTY_PRIMARY_FILTERS,
+        EMPTY_MAP, entities.get(0));
+    entities = store.getEntities("type_1", null, null, null, userFilter, null,
+        EnumSet.allOf(Field.class)).getEntities();
+    assertEquals(1, entities.size());
+    verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES,
+        primaryFilters, otherInfo, entities.get(0));
+
+    ((LeveldbTimelineStore)store).discardOldEntities(-123l);
+    assertEquals(1, store.getEntities("type_1", null, null, null, null, null,
+        null).getEntities().size());
+    assertEquals(0, store.getEntities("type_2", null, null, null, null, null,
+        null).getEntities().size());
+    assertEquals(1, ((LeveldbTimelineStore)store).getEntityTypes().size());
+
+    ((LeveldbTimelineStore)store).discardOldEntities(123l);
+    assertEquals(0, store.getEntities("type_1", null, null, null, null, null,
+        null).getEntities().size());
+    assertEquals(0, store.getEntities("type_2", null, null, null, null, null,
+        null).getEntities().size());
+    assertEquals(0, ((LeveldbTimelineStore)store).getEntityTypes().size());
+    assertEquals(0, store.getEntities("type_1", null, null, null, userFilter,
+        null, null).getEntities().size());
+  }
+
+  @Test
+  public void testDeleteEntitiesPrimaryFilters()
+      throws IOException, InterruptedException {
+    Map<String, Set<Object>> primaryFilter =
+        Collections.singletonMap("user", Collections.singleton(
+            (Object) "otheruser"));
+    TimelineEntities atsEntities = new TimelineEntities();
+    atsEntities.setEntities(Collections.singletonList(createEntity(entityId1b,
+        entityType1, 789l, Collections.singletonList(ev2), null, primaryFilter,
+        null)));
+    TimelinePutResponse response = store.put(atsEntities);
+    assertEquals(0, response.getErrors().size());
+
+    NameValuePair pfPair = new NameValuePair("user", "otheruser");
+    List<TimelineEntity> entities = store.getEntities("type_1", null, null,
+        null, pfPair, null, null).getEntities();
+    assertEquals(1, entities.size());
+    verifyEntityInfo(entityId1b, entityType1, Collections.singletonList(ev2),
+        EMPTY_REL_ENTITIES, primaryFilter, EMPTY_MAP, entities.get(0));
+
+    entities = store.getEntities("type_1", null, null, null,
+        userFilter, null, null).getEntities();
+    assertEquals(2, entities.size());
+    verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES,
+        primaryFilters, otherInfo, entities.get(0));
+    verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES,
+        primaryFilters, otherInfo, entities.get(1));
+
+    ((LeveldbTimelineStore)store).discardOldEntities(-123l);
+    assertEquals(1, store.getEntities("type_1", null, null, null, pfPair, null,
+        null).getEntities().size());
+    assertEquals(2, store.getEntities("type_1", null, null, null, userFilter,
+        null, null).getEntities().size());
+
+    ((LeveldbTimelineStore)store).discardOldEntities(123l);
+    assertEquals(0, store.getEntities("type_1", null, null, null, null, null,
+        null).getEntities().size());
+    assertEquals(0, store.getEntities("type_2", null, null, null, null, null,
+        null).getEntities().size());
+    assertEquals(0, ((LeveldbTimelineStore)store).getEntityTypes().size());
+
+    assertEquals(0, store.getEntities("type_1", null, null, null, pfPair, null,
+        null).getEntities().size());
+    assertEquals(0, store.getEntities("type_1", null, null, null, userFilter,
+        null, null).getEntities().size());
+  }
+
 }