YARN-3448. Added a rolling time-to-live LevelDB timeline store implementation. Contributed by Jonathan Eagles.

(cherry picked from commit daf3e4ef8bf73cbe4a799d51b4765809cd81089f)
Zhijie Shen committed 10 years ago (commit bb035ff087)
12 changed files with 2907 additions and 66 deletions
  1. 3 0
      hadoop-yarn-project/CHANGES.txt
  2. 6 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timeline/TimelinePutResponse.java
  3. 46 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
  4. 5 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml
  5. 420 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/RollingLevelDB.java
  6. 1807 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/RollingLevelDBTimelineStore.java
  7. 18 26
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManager.java
  8. 56 17
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/util/LeveldbUtils.java
  9. 13 16
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java
  10. 100 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestRollingLevelDB.java
  11. 427 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestRollingLevelDBTimelineStore.java
  12. 6 6
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TimelineStoreTestUtils.java
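
For orientation (not part of this patch): a minimal Java sketch of how the new store and its rolling period could be selected through configuration. The yarn.timeline-service.store-class key is assumed to be the usual store selector; the other keys correspond to the YarnConfiguration constants touched by this commit, and the path value is purely hypothetical.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class RollingStoreConfigSketch {
  public static Configuration sketch() {
    Configuration conf = new YarnConfiguration();
    // Assumed store selector key; the class below is the store added by this commit.
    conf.set("yarn.timeline-service.store-class",
        "org.apache.hadoop.yarn.server.timeline.RollingLevelDBTimelineStore");
    // Keys from the YarnConfiguration diff below; the path is a hypothetical example.
    conf.set("yarn.timeline-service.leveldb-timeline-store.path", "/tmp/timeline");
    conf.set("yarn.timeline-service.rolling-period", "daily"); // default is "hourly"
    return conf;
  }
}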

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -60,6 +60,9 @@ Release 2.8.0 - UNRELEASED
     YARN-2619. Added NodeManager support for disk io isolation through cgroups.
     (Varun Vasudev and Wei Yan via vinodkv)
 
+    YARN-3448. Added a rolling time-to-live LevelDB timeline store implementation.
+    (Jonathan Eagles via zjshen)
+
   IMPROVEMENTS
 
     YARN-1880. Cleanup TestApplicationClientProtocolOnHA

+ 6 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timeline/TimelinePutResponse.java

@@ -129,6 +129,12 @@ public class TimelinePutResponse {
      */
     public static final int FORBIDDEN_RELATION = 6;
 
+    /**
+     * Error code returned if the entity start time is before the eviction
+     * period of old data.
+     */
+    public static final int EXPIRED_ENTITY = 7;
+
     private String entityId;
     private String entityType;
     private int errorCode;
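
For reference (not part of this patch): a minimal sketch of how a client might detect the new error code after a put, using the existing TimelinePutResponse accessors.

import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;

public class ExpiredEntityCheck {
  public static void logExpired(TimelinePutResponse response) {
    for (TimelinePutError error : response.getErrors()) {
      if (error.getErrorCode() == TimelinePutError.EXPIRED_ENTITY) {
        // The entity's start time fell before the eviction window, so it was not stored.
        System.err.println("Entity " + error.getEntityId()
            + " rejected: start time is older than the retention period");
      }
    }
  }
}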

+ 46 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -1431,6 +1431,18 @@ public class YarnConfiguration extends Configuration {
   public static final long DEFAULT_TIMELINE_SERVICE_TTL_MS =
       1000 * 60 * 60 * 24 * 7;
 
+  /** Timeline service rolling period. Valid values are daily, half_daily,
+   * quarter_daily, and hourly. */
+  public static final String TIMELINE_SERVICE_ROLLING_PERIOD =
+      TIMELINE_SERVICE_PREFIX + "rolling-period";
+
+  /** Roll a new database each hour. */
+  public static final String DEFAULT_TIMELINE_SERVICE_ROLLING_PERIOD =
+      "hourly";
+
+  /** Implementation specific configuration prefix for Timeline Service
+   * leveldb.
+   */
   public static final String TIMELINE_SERVICE_LEVELDB_PREFIX =
       TIMELINE_SERVICE_PREFIX + "leveldb-timeline-store.";
 
@@ -1438,13 +1450,36 @@ public class YarnConfiguration extends Configuration {
   public static final String TIMELINE_SERVICE_LEVELDB_PATH =
       TIMELINE_SERVICE_LEVELDB_PREFIX + "path";
 
-  /** Timeline service leveldb read cache (uncompressed blocks) */
+  /** Timeline service leveldb read cache (uncompressed blocks). This is
+   * per rolling instance so should be tuned if using the rolling leveldb
+   * timeline store. */
   public static final String TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE =
       TIMELINE_SERVICE_LEVELDB_PREFIX + "read-cache-size";
 
+  /** Default leveldb read cache size if no configuration is specified. */
   public static final long DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE =
       100 * 1024 * 1024;
 
+  /** Timeline service leveldb write buffer size. */
+  public static final String TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE =
+      TIMELINE_SERVICE_LEVELDB_PREFIX + "write-buffer-size";
+
+  /** Default leveldb write buffer size if no configuration is specified. This
+   * is per rolling instance so should be tuned if using the rolling leveldb
+   * timeline store. */
+  public static final int DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE =
+      16 * 1024 * 1024;
+
+  /** Timeline service leveldb write batch size. This value can be tuned down
+   * to reduce lock time for ttl eviction. */
+  public static final String
+      TIMELINE_SERVICE_LEVELDB_WRITE_BATCH_SIZE =
+      TIMELINE_SERVICE_LEVELDB_PREFIX + "write-batch-size";
+
+  /** Default leveldb write batch size if no configuration is specified. */
+  public static final int
+      DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BATCH_SIZE = 10000;
+
   /** Timeline service leveldb start time read cache (number of entities) */
   public static final String
       TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE =
@@ -1468,6 +1503,16 @@ public class YarnConfiguration extends Configuration {
   public static final long DEFAULT_TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS =
       1000 * 60 * 5;
 
+  /** Timeline service leveldb number of concurrent open files. Tune this
+   * configuration to stay within system limits. This is per rolling instance
+   * so should be tuned if using the rolling leveldb timeline store. */
+  public static final String TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES =
+      TIMELINE_SERVICE_LEVELDB_PREFIX + "max-open-files";
+
+  /** Default leveldb max open files if no configuration is specified. */
+  public static final int DEFAULT_TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES =
+      1000;
+
   /** The Kerberos principal for the timeline server.*/
   public static final String TIMELINE_SERVICE_PRINCIPAL =
       TIMELINE_SERVICE_PREFIX + "principal";

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml

@@ -180,6 +180,11 @@
       <artifactId>bcprov-jdk16</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>de.ruedigermoeller</groupId>
+      <artifactId>fst</artifactId>
+      <version>2.24</version>
+    </dependency>
   </dependencies>
 
   <build>

+ 420 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/RollingLevelDB.java

@@ -0,0 +1,420 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timeline;
+
+import java.io.File;
+import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.GregorianCalendar;
+import java.util.Iterator;
+import java.util.Locale;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.TreeMap;
+import java.util.Map.Entry;
+
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.lang.time.FastDateFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.fusesource.leveldbjni.JniDBFactory;
+import org.iq80.leveldb.DB;
+import org.iq80.leveldb.Options;
+import org.iq80.leveldb.WriteBatch;
+
+/**
+ * Contains the logic to look up a leveldb by timestamp so that multiple smaller
+ * databases can roll according to the configured period and be evicted
+ * efficiently via operating system directory removal.
+ */
+class RollingLevelDB {
+
+  /** Logger for this class. */
+  private static final Log LOG = LogFactory.getLog(RollingLevelDB.class);
+  /** Factory to open and create new leveldb instances. */
+  private static JniDBFactory factory = new JniDBFactory();
+  /** Thread safe date formatter. */
+  private FastDateFormat fdf;
+  /** Date parser. */
+  private SimpleDateFormat sdf;
+  /** Calendar to calculate the current and next rolling period. */
+  private GregorianCalendar cal = new GregorianCalendar(
+      TimeZone.getTimeZone("GMT"));
+  /** Collection of all active rolling leveldb instances. */
+  private final TreeMap<Long, DB> rollingdbs;
+  /** Collection of all rolling leveldb instances to evict. */
+  private final TreeMap<Long, DB> rollingdbsToEvict;
+  /** Name of this rolling level db. */
+  private final String name;
+  /** Calculated timestamp of when to roll a new leveldb instance. */
+  private volatile long nextRollingCheckMillis = 0;
+  /** File system instance to find and create new leveldb instances. */
+  private FileSystem lfs = null;
+  /** Directory to store rolling leveldb instances. */
+  private Path rollingDBPath;
+  /** Configuration for this object. */
+  private Configuration conf;
+  /** Rolling period. */
+  private RollingPeriod rollingPeriod;
+  /**
+   * Rolling leveldb instances are evicted when their endtime is earlier than
+   * the current time minus the time to live value.
+   */
+  private long ttl;
+  /** Whether time to live is enabled. */
+  private boolean ttlEnabled;
+
+  /** Encapsulates the rolling period to date format lookup. */
+  enum RollingPeriod {
+    DAILY {
+      @Override
+      public String dateFormat() {
+        return "yyyy-MM-dd";
+      }
+    },
+    HALF_DAILY {
+      @Override
+      public String dateFormat() {
+        return "yyyy-MM-dd-HH";
+      }
+    },
+    QUARTER_DAILY {
+      @Override
+      public String dateFormat() {
+        return "yyyy-MM-dd-HH";
+      }
+    },
+    HOURLY {
+      @Override
+      public String dateFormat() {
+        return "yyyy-MM-dd-HH";
+      }
+    },
+    MINUTELY {
+      @Override
+      public String dateFormat() {
+        return "yyyy-MM-dd-HH-mm";
+      }
+    };
+    public abstract String dateFormat();
+  }
+
+  /**
+   * Convenience class for associating a write batch with its rolling leveldb
+   * instance.
+   */
+  public static class RollingWriteBatch {
+    /** Leveldb object. */
+    private final DB db;
+    /** Write batch for the db object. */
+    private final WriteBatch writeBatch;
+
+    public RollingWriteBatch(final DB db, final WriteBatch writeBatch) {
+      this.db = db;
+      this.writeBatch = writeBatch;
+    }
+
+    public DB getDB() {
+      return db;
+    }
+
+    public WriteBatch getWriteBatch() {
+      return writeBatch;
+    }
+
+    public void write() {
+      db.write(writeBatch);
+    }
+
+    public void close() {
+      IOUtils.cleanup(LOG, writeBatch);
+    }
+  }
+
+  RollingLevelDB(String name) {
+    this.name = name;
+    this.rollingdbs = new TreeMap<Long, DB>();
+    this.rollingdbsToEvict = new TreeMap<Long, DB>();
+  }
+
+  protected String getName() {
+    return name;
+  }
+
+  protected long currentTimeMillis() {
+    return System.currentTimeMillis();
+  }
+
+  public long getNextRollingTimeMillis() {
+    return nextRollingCheckMillis;
+  }
+
+  public long getTimeToLive() {
+    return ttl;
+  }
+
+  public boolean getTimeToLiveEnabled() {
+    return ttlEnabled;
+  }
+
+  protected void setNextRollingTimeMillis(final long timestamp) {
+    this.nextRollingCheckMillis = timestamp;
+    LOG.info("Next rolling time for " + getName() + " is "
+        + fdf.format(nextRollingCheckMillis));
+  }
+
+  public void init(final Configuration config) throws Exception {
+    LOG.info("Initializing RollingLevelDB for " + getName());
+    this.conf = config;
+    this.ttl = conf.getLong(YarnConfiguration.TIMELINE_SERVICE_TTL_MS,
+        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_TTL_MS);
+    this.ttlEnabled = conf.getBoolean(
+        YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE, true);
+    this.rollingDBPath = new Path(
+        conf.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH),
+        RollingLevelDBTimelineStore.FILENAME);
+    initFileSystem();
+    initRollingPeriod();
+    initHistoricalDBs();
+  }
+
+  protected void initFileSystem() throws IOException {
+    lfs = FileSystem.getLocal(conf);
+    boolean success = lfs.mkdirs(rollingDBPath,
+        RollingLevelDBTimelineStore.LEVELDB_DIR_UMASK);
+    if (!success) {
+      throw new IOException("Failed to create leveldb root directory "
+          + rollingDBPath);
+    }
+  }
+
+  protected synchronized void initRollingPeriod() {
+    final String lcRollingPeriod = conf.get(
+        YarnConfiguration.TIMELINE_SERVICE_ROLLING_PERIOD,
+        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ROLLING_PERIOD);
+    this.rollingPeriod = RollingPeriod.valueOf(lcRollingPeriod
+        .toUpperCase(Locale.ENGLISH));
+    fdf = FastDateFormat.getInstance(rollingPeriod.dateFormat(),
+        TimeZone.getTimeZone("GMT"));
+    sdf = new SimpleDateFormat(rollingPeriod.dateFormat());
+    sdf.setTimeZone(fdf.getTimeZone());
+  }
+
+  protected synchronized void initHistoricalDBs() throws IOException {
+    Path rollingDBGlobPath = new Path(rollingDBPath, getName() + ".*");
+    FileStatus[] statuses = lfs.globStatus(rollingDBGlobPath);
+    for (FileStatus status : statuses) {
+      String dbName = FilenameUtils.getExtension(status.getPath().toString());
+      try {
+        Long dbStartTime = sdf.parse(dbName).getTime();
+        initRollingLevelDB(dbStartTime, status.getPath());
+      } catch (ParseException pe) {
+        LOG.warn("Failed to initialize rolling leveldb " + dbName + " for "
+            + getName());
+      }
+    }
+  }
+
+  private void initRollingLevelDB(Long dbStartTime,
+      Path rollingInstanceDBPath) {
+    if (rollingdbs.containsKey(dbStartTime)) {
+      return;
+    }
+    Options options = new Options();
+    options.createIfMissing(true);
+    options.cacheSize(conf.getLong(
+        YarnConfiguration.TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE,
+        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE));
+    options.maxOpenFiles(conf.getInt(
+        YarnConfiguration.TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES,
+        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES));
+    options.writeBufferSize(conf.getInt(
+        YarnConfiguration.TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE,
+        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE));
+    LOG.info("Initializing rolling leveldb instance :" + rollingInstanceDBPath
+        + " for start time: " + dbStartTime);
+    DB db = null;
+    try {
+      db = factory.open(
+          new File(rollingInstanceDBPath.toUri().getPath()), options);
+      rollingdbs.put(dbStartTime, db);
+      String dbName = fdf.format(dbStartTime);
+      LOG.info("Added rolling leveldb instance " + dbName + " to " + getName());
+    } catch (IOException ioe) {
+      LOG.warn("Failed to open rolling leveldb instance :"
+          + new File(rollingInstanceDBPath.toUri().getPath()), ioe);
+    }
+  }
+
+  synchronized DB getPreviousDB(DB db) {
+    Iterator<DB> iterator = rollingdbs.values().iterator();
+    DB prev = null;
+    while (iterator.hasNext()) {
+      DB cur = iterator.next();
+      if (cur == db) {
+        break;
+      }
+      prev = cur;
+    }
+    return prev;
+  }
+
+  synchronized long getStartTimeFor(DB db) {
+    long startTime = -1;
+    for (Map.Entry<Long, DB> entry : rollingdbs.entrySet()) {
+      if (entry.getValue() == db) {
+        startTime = entry.getKey();
+      }
+    }
+    return startTime;
+  }
+
+  public synchronized DB getDBForStartTime(long startTime) {
+    // make sure we sanitize this input
+    startTime = Math.min(startTime, currentTimeMillis());
+
+    if (startTime >= getNextRollingTimeMillis()) {
+      roll(startTime);
+    }
+    Entry<Long, DB> entry = rollingdbs.floorEntry(startTime);
+    if (entry == null) {
+      return null;
+    }
+    return entry.getValue();
+  }
+
+  private void roll(long startTime) {
+    LOG.info("Rolling new DB instance for " + getName());
+    long currentStartTime = computeCurrentCheckMillis(startTime);
+    setNextRollingTimeMillis(computeNextCheckMillis(currentStartTime));
+    String currentRollingDBInstance = fdf.format(currentStartTime);
+    String currentRollingDBName = getName() + "." + currentRollingDBInstance;
+    Path currentRollingDBPath = new Path(rollingDBPath, currentRollingDBName);
+    if (getTimeToLiveEnabled()) {
+      scheduleOldDBsForEviction();
+    }
+    initRollingLevelDB(currentStartTime, currentRollingDBPath);
+  }
+
+  private synchronized void scheduleOldDBsForEviction() {
+    // keep at least time to live amount of data
+    long evictionThreshold = computeCurrentCheckMillis(currentTimeMillis()
+        - getTimeToLive());
+
+    LOG.info("Scheduling " + getName() + " DBs older than "
+        + fdf.format(evictionThreshold) + " for eviction");
+    Iterator<Entry<Long, DB>> iterator = rollingdbs.entrySet().iterator();
+    while (iterator.hasNext()) {
+      Entry<Long, DB> entry = iterator.next();
+      // parse this in gmt time
+      if (entry.getKey() < evictionThreshold) {
+        LOG.info("Scheduling " + getName() + " eviction for "
+            + fdf.format(entry.getKey()));
+        iterator.remove();
+        rollingdbsToEvict.put(entry.getKey(), entry.getValue());
+      }
+    }
+  }
+
+  public synchronized void evictOldDBs() {
+    LOG.info("Evicting " + getName() + " DBs scheduled for eviction");
+    Iterator<Entry<Long, DB>> iterator = rollingdbsToEvict.entrySet()
+        .iterator();
+    while (iterator.hasNext()) {
+      Entry<Long, DB> entry = iterator.next();
+      IOUtils.cleanup(LOG, entry.getValue());
+      String dbName = fdf.format(entry.getKey());
+      Path path = new Path(rollingDBPath, getName() + "." + dbName);
+      try {
+        LOG.info("Removing old db directory contents in " + path);
+        lfs.delete(path, true);
+      } catch (IOException ioe) {
+        LOG.warn("Failed to evict old db " + path, ioe);
+      }
+      iterator.remove();
+    }
+  }
+
+  public void stop() throws Exception {
+    for (DB db : rollingdbs.values()) {
+      IOUtils.cleanup(LOG, db);
+    }
+    IOUtils.cleanup(LOG, lfs);
+  }
+
+  private long computeNextCheckMillis(long now) {
+    return computeCheckMillis(now, true);
+  }
+
+  public long computeCurrentCheckMillis(long now) {
+    return computeCheckMillis(now, false);
+  }
+
+  private synchronized long computeCheckMillis(long now, boolean next) {
+    // needs to be called synchronously due to shared Calendar
+    cal.setTimeInMillis(now);
+    cal.set(Calendar.SECOND, 0);
+    cal.set(Calendar.MILLISECOND, 0);
+
+    if (rollingPeriod == RollingPeriod.DAILY) {
+      cal.set(Calendar.HOUR_OF_DAY, 0);
+      cal.set(Calendar.MINUTE, 0);
+      if (next) {
+        cal.add(Calendar.DATE, 1);
+      }
+    } else if (rollingPeriod == RollingPeriod.HALF_DAILY) {
+      // round down to 12 hour interval
+      int hour = (cal.get(Calendar.HOUR) / 12) * 12;
+      cal.set(Calendar.HOUR, hour);
+      cal.set(Calendar.MINUTE, 0);
+      if (next) {
+        cal.add(Calendar.HOUR_OF_DAY, 12);
+      }
+    } else if (rollingPeriod == RollingPeriod.QUARTER_DAILY) {
+      // round down to 6 hour interval
+      int hour = (cal.get(Calendar.HOUR) / 6) * 6;
+      cal.set(Calendar.HOUR, hour);
+      cal.set(Calendar.MINUTE, 0);
+      if (next) {
+        cal.add(Calendar.HOUR_OF_DAY, 6);
+      }
+    } else if (rollingPeriod == RollingPeriod.HOURLY) {
+      cal.set(Calendar.MINUTE, 0);
+      if (next) {
+        cal.add(Calendar.HOUR_OF_DAY, 1);
+      }
+    } else if (rollingPeriod == RollingPeriod.MINUTELY) {
+      // round down to 5 minute interval
+      int minute = (cal.get(Calendar.MINUTE) / 5) * 5;
+      cal.set(Calendar.MINUTE, minute);
+      if (next) {
+        cal.add(Calendar.MINUTE, 5);
+      }
+    }
+    return cal.getTimeInMillis();
+  }
+}
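
For reference (not part of this patch): a standalone sketch that mirrors the HOURLY branch of computeCheckMillis above. Minutes, seconds, and milliseconds are cleared to obtain the start time of the rolling instance that owns a timestamp, and one hour is added to obtain the next roll time.

import java.util.Calendar;
import java.util.GregorianCalendar;
import java.util.TimeZone;

public class HourlyRollingSketch {
  public static void main(String[] args) {
    GregorianCalendar cal = new GregorianCalendar(TimeZone.getTimeZone("GMT"));
    long now = System.currentTimeMillis();
    cal.setTimeInMillis(now);
    cal.set(Calendar.MINUTE, 0);
    cal.set(Calendar.SECOND, 0);
    cal.set(Calendar.MILLISECOND, 0);
    long currentStart = cal.getTimeInMillis(); // DB instance that owns 'now'
    cal.add(Calendar.HOUR_OF_DAY, 1);
    long nextRoll = cal.getTimeInMillis();     // when a new DB instance is rolled
    System.out.println(currentStart + " .. " + nextRoll);
  }
}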

+ 1807 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/RollingLevelDBTimelineStore.java

@@ -0,0 +1,1807 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timeline;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeMap;
+
+import org.apache.commons.collections.map.LRUMap;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
+import org.apache.hadoop.yarn.server.timeline.RollingLevelDB.RollingWriteBatch;
+import org.apache.hadoop.yarn.server.timeline.TimelineDataManager.CheckAcl;
+import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.KeyBuilder;
+import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.KeyParser;
+
+import org.fusesource.leveldbjni.JniDBFactory;
+import org.iq80.leveldb.DB;
+import org.iq80.leveldb.DBException;
+import org.iq80.leveldb.DBIterator;
+import org.iq80.leveldb.Options;
+import org.iq80.leveldb.ReadOptions;
+import org.iq80.leveldb.WriteBatch;
+import org.nustaq.serialization.FSTConfiguration;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.readReverseOrderedLong;
+import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.writeReverseOrderedLong;
+import static org.apache.hadoop.yarn.server.timeline.TimelineDataManager.DEFAULT_DOMAIN_ID;
+import static org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.prefixMatches;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BATCH_SIZE;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_TTL_MS;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_WRITE_BATCH_SIZE;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_TTL_MS;
+
+import static org.fusesource.leveldbjni.JniDBFactory.bytes;
+
+/**
+ * <p>
+ * An implementation of an application timeline store backed by leveldb.
+ * </p>
+ *
+ * <p>
+ * There are three sections of the db, the start time section, the entity
+ * section, and the indexed entity section.
+ * </p>
+ *
+ * <p>
+ * The start time section is used to retrieve the unique start time for a given
+ * entity. Its values each contain a start time while its keys are of the form:
+ * </p>
+ *
+ * <pre>
+ *   START_TIME_LOOKUP_PREFIX + entity type + entity id
+ * </pre>
+ *
+ * <p>
+ * The entity section is ordered by entity type, then entity start time
+ * descending, then entity ID. There are four sub-sections of the entity
+ * section: events, primary filters, related entities, and other info. The event
+ * entries have event info serialized into their values. The other info entries
+ * have values corresponding to the values of the other info name/value map for
+ * the entry (note the names are contained in the key). All other entries have
+ * empty values. The key structure is as follows:
+ * </p>
+ *
+ * <pre>
+ *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id
+ *
+ *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
+ *     DOMAIN_ID_COLUMN
+ *
+ *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
+ *     EVENTS_COLUMN + reveventtimestamp + eventtype
+ *
+ *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
+ *     PRIMARY_FILTERS_COLUMN + name + value
+ *
+ *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
+ *     OTHER_INFO_COLUMN + name
+ *
+ *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
+ *     RELATED_ENTITIES_COLUMN + relatedentity type + relatedentity id
+ * </pre>
+ *
+ * <p>
+ * The indexed entity section contains a primary filter name and primary filter
+ * value as the prefix. Within a given name/value, entire entity entries are
+ * stored in the same format as described in the entity section above (below,
+ * "key" represents any one of the possible entity entry keys described above).
+ * </p>
+ *
+ * <pre>
+ *   INDEXED_ENTRY_PREFIX + primaryfilter name + primaryfilter value +
+ *     key
+ * </pre>
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class RollingLevelDBTimelineStore extends AbstractService implements
+    TimelineStore {
+  private static final Log LOG = LogFactory
+      .getLog(RollingLevelDBTimelineStore.class);
+  private static FSTConfiguration fstConf =
+      FSTConfiguration.createDefaultConfiguration();
+
+  static {
+    fstConf.setShareReferences(false);
+  }
+
+  @Private
+  @VisibleForTesting
+  static final String FILENAME = "leveldb-timeline-store";
+  static final String DOMAIN = "domain-ldb";
+  static final String ENTITY = "entity-ldb";
+  static final String INDEX = "indexes-ldb";
+  static final String STARTTIME = "starttime-ldb";
+  static final String OWNER = "owner-ldb";
+
+  private static final byte[] DOMAIN_ID_COLUMN = "d".getBytes(UTF_8);
+  private static final byte[] EVENTS_COLUMN = "e".getBytes(UTF_8);
+  private static final byte[] PRIMARY_FILTERS_COLUMN = "f".getBytes(UTF_8);
+  private static final byte[] OTHER_INFO_COLUMN = "i".getBytes(UTF_8);
+  private static final byte[] RELATED_ENTITIES_COLUMN = "r".getBytes(UTF_8);
+
+  private static final byte[] DESCRIPTION_COLUMN = "d".getBytes(UTF_8);
+  private static final byte[] OWNER_COLUMN = "o".getBytes(UTF_8);
+  private static final byte[] READER_COLUMN = "r".getBytes(UTF_8);
+  private static final byte[] WRITER_COLUMN = "w".getBytes(UTF_8);
+  private static final byte[] TIMESTAMP_COLUMN = "t".getBytes(UTF_8);
+
+  private static final byte[] EMPTY_BYTES = new byte[0];
+
+  private static final String TIMELINE_STORE_VERSION_KEY =
+      "timeline-store-version";
+
+  private static final Version CURRENT_VERSION_INFO = Version.newInstance(1, 0);
+
+  private static long writeBatchSize = 10000;
+
+  @Private
+  @VisibleForTesting
+  static final FsPermission LEVELDB_DIR_UMASK = FsPermission
+      .createImmutable((short) 0700);
+
+  private Map<EntityIdentifier, Long> startTimeWriteCache;
+  private Map<EntityIdentifier, Long> startTimeReadCache;
+
+  private DB domaindb;
+  private RollingLevelDB entitydb;
+  private RollingLevelDB indexdb;
+  private DB starttimedb;
+  private DB ownerdb;
+
+  private Thread deletionThread;
+
+  public RollingLevelDBTimelineStore() {
+    super(RollingLevelDBTimelineStore.class.getName());
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  protected void serviceInit(Configuration conf) throws Exception {
+    Preconditions
+        .checkArgument(conf.getLong(TIMELINE_SERVICE_TTL_MS,
+            DEFAULT_TIMELINE_SERVICE_TTL_MS) > 0,
+            "%s property value should be greater than zero",
+            TIMELINE_SERVICE_TTL_MS);
+    Preconditions.checkArgument(conf.getLong(
+        TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS,
+        DEFAULT_TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS) > 0,
+        "%s property value should be greater than zero",
+        TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS);
+    Preconditions.checkArgument(conf.getLong(
+        TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE,
+        DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE) >= 0,
+        "%s property value should be greater than or equal to zero",
+        TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE);
+    Preconditions.checkArgument(conf.getLong(
+        TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE,
+        DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE) > 0,
+        " %s property value should be greater than zero",
+        TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE);
+    Preconditions.checkArgument(conf.getLong(
+        TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE,
+        DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE) > 0,
+        "%s property value should be greater than zero",
+        TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE);
+    Preconditions.checkArgument(conf.getLong(
+        TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES,
+        DEFAULT_TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES) > 0,
+        "%s property value should be greater than zero",
+        TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES);
+    Preconditions.checkArgument(conf.getLong(
+        TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE,
+        DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE) > 0,
+        "%s property value should be greater than zero",
+        TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE);
+
+    Options options = new Options();
+    options.createIfMissing(true);
+    options.cacheSize(conf.getLong(
+        TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE,
+        DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE));
+    JniDBFactory factory = new JniDBFactory();
+    Path dbPath = new Path(
+        conf.get(TIMELINE_SERVICE_LEVELDB_PATH), FILENAME);
+    Path domainDBPath = new Path(dbPath, DOMAIN);
+    Path starttimeDBPath = new Path(dbPath, STARTTIME);
+    Path ownerDBPath = new Path(dbPath, OWNER);
+    FileSystem localFS = null;
+    try {
+      localFS = FileSystem.getLocal(conf);
+      if (!localFS.exists(dbPath)) {
+        if (!localFS.mkdirs(dbPath)) {
+          throw new IOException("Couldn't create directory for leveldb "
+              + "timeline store " + dbPath);
+        }
+        localFS.setPermission(dbPath, LEVELDB_DIR_UMASK);
+      }
+      if (!localFS.exists(domainDBPath)) {
+        if (!localFS.mkdirs(domainDBPath)) {
+          throw new IOException("Couldn't create directory for leveldb "
+              + "timeline store " + domainDBPath);
+        }
+        localFS.setPermission(domainDBPath, LEVELDB_DIR_UMASK);
+      }
+      if (!localFS.exists(starttimeDBPath)) {
+        if (!localFS.mkdirs(starttimeDBPath)) {
+          throw new IOException("Couldn't create directory for leveldb "
+              + "timeline store " + starttimeDBPath);
+        }
+        localFS.setPermission(starttimeDBPath, LEVELDB_DIR_UMASK);
+      }
+      if (!localFS.exists(ownerDBPath)) {
+        if (!localFS.mkdirs(ownerDBPath)) {
+          throw new IOException("Couldn't create directory for leveldb "
+              + "timeline store " + ownerDBPath);
+        }
+        localFS.setPermission(ownerDBPath, LEVELDB_DIR_UMASK);
+      }
+    } finally {
+      IOUtils.cleanup(LOG, localFS);
+    }
+    options.maxOpenFiles(conf.getInt(
+        TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES,
+        DEFAULT_TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES));
+    options.writeBufferSize(conf.getInt(
+        TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE,
+        DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE));
+    LOG.info("Using leveldb path " + dbPath);
+    domaindb = factory.open(new File(domainDBPath.toString()), options);
+    entitydb = new RollingLevelDB(ENTITY);
+    entitydb.init(conf);
+    indexdb = new RollingLevelDB(INDEX);
+    indexdb.init(conf);
+    starttimedb = factory.open(new File(starttimeDBPath.toString()), options);
+    ownerdb = factory.open(new File(ownerDBPath.toString()), options);
+    checkVersion();
+    startTimeWriteCache = Collections.synchronizedMap(new LRUMap(
+        getStartTimeWriteCacheSize(conf)));
+    startTimeReadCache = Collections.synchronizedMap(new LRUMap(
+        getStartTimeReadCacheSize(conf)));
+
+    writeBatchSize = conf.getInt(
+        TIMELINE_SERVICE_LEVELDB_WRITE_BATCH_SIZE,
+        DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BATCH_SIZE);
+
+    super.serviceInit(conf);
+  }
+  
+  @Override
+  protected void serviceStart() throws Exception {
+    if (getConfig().getBoolean(TIMELINE_SERVICE_TTL_ENABLE, true)) {
+      deletionThread = new EntityDeletionThread(getConfig());
+      deletionThread.start();
+    }
+    super.serviceStart();
+   }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    if (deletionThread != null) {
+      deletionThread.interrupt();
+      LOG.info("Waiting for deletion thread to complete its current action");
+      try {
+        deletionThread.join();
+      } catch (InterruptedException e) {
+        LOG.warn("Interrupted while waiting for deletion thread to complete,"
+            + " closing db now", e);
+      }
+    }
+    IOUtils.cleanup(LOG, domaindb);
+    IOUtils.cleanup(LOG, starttimedb);
+    IOUtils.cleanup(LOG, ownerdb);
+    entitydb.stop();
+    indexdb.stop();
+    super.serviceStop();
+  }
+
+  private class EntityDeletionThread extends Thread {
+    private final long ttl;
+    private final long ttlInterval;
+
+    public EntityDeletionThread(Configuration conf) {
+      ttl = conf.getLong(TIMELINE_SERVICE_TTL_MS,
+          DEFAULT_TIMELINE_SERVICE_TTL_MS);
+      ttlInterval = conf.getLong(
+          TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS,
+          DEFAULT_TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS);
+      LOG.info("Starting deletion thread with ttl " + ttl + " and cycle "
+          + "interval " + ttlInterval);
+    }
+
+    @Override
+    public void run() {
+      Thread.currentThread().setName("Leveldb Timeline Store Retention");
+      while (true) {
+        long timestamp = System.currentTimeMillis() - ttl;
+        try {
+          discardOldEntities(timestamp);
+          Thread.sleep(ttlInterval);
+        } catch (IOException e) {
+          LOG.error(e);
+        } catch (InterruptedException e) {
+          LOG.info("Deletion thread received interrupt, exiting");
+          break;
+        }
+      }
+    }
+  }
+
+  @Override
+  public TimelineEntity getEntity(String entityId, String entityType,
+      EnumSet<Field> fields) throws IOException {
+    Long revStartTime = getStartTimeLong(entityId, entityType);
+    if (revStartTime == null) {
+      return null;
+    }
+    byte[] prefix = KeyBuilder.newInstance().add(entityType)
+        .add(writeReverseOrderedLong(revStartTime)).add(entityId)
+        .getBytesForLookup();
+
+    DBIterator iterator = null;
+    try {
+      DB db = entitydb.getDBForStartTime(revStartTime);
+      if (db == null) {
+        return null;
+      }
+      iterator = db.iterator();
+      iterator.seek(prefix);
+
+      return getEntity(entityId, entityType, revStartTime, fields, iterator,
+          prefix, prefix.length);
+    } finally {
+      IOUtils.cleanup(LOG, iterator);
+    }
+  }
+
+  /**
+   * Read entity from a db iterator. If no information is found in the specified
+   * fields for this entity, return null.
+   */
+  private static TimelineEntity getEntity(String entityId, String entityType,
+      Long startTime, EnumSet<Field> fields, DBIterator iterator,
+      byte[] prefix, int prefixlen) throws IOException {
+    if (fields == null) {
+      fields = EnumSet.allOf(Field.class);
+    }
+
+    TimelineEntity entity = new TimelineEntity();
+    boolean events = false;
+    boolean lastEvent = false;
+    if (fields.contains(Field.EVENTS)) {
+      events = true;
+    } else if (fields.contains(Field.LAST_EVENT_ONLY)) {
+      lastEvent = true;
+    } else {
+      entity.setEvents(null);
+    }
+    boolean relatedEntities = false;
+    if (fields.contains(Field.RELATED_ENTITIES)) {
+      relatedEntities = true;
+    } else {
+      entity.setRelatedEntities(null);
+    }
+    boolean primaryFilters = false;
+    if (fields.contains(Field.PRIMARY_FILTERS)) {
+      primaryFilters = true;
+    } else {
+      entity.setPrimaryFilters(null);
+    }
+    boolean otherInfo = false;
+    if (fields.contains(Field.OTHER_INFO)) {
+      otherInfo = true;
+    } else {
+      entity.setOtherInfo(null);
+    }
+
+    // iterate through the entity's entry, parsing information if it is part
+    // of a requested field
+    for (; iterator.hasNext(); iterator.next()) {
+      byte[] key = iterator.peekNext().getKey();
+      if (!prefixMatches(prefix, prefixlen, key)) {
+        break;
+      }
+      if (key.length == prefixlen) {
+        continue;
+      }
+      if (key[prefixlen] == PRIMARY_FILTERS_COLUMN[0]) {
+        if (primaryFilters) {
+          addPrimaryFilter(entity, key, prefixlen
+              + PRIMARY_FILTERS_COLUMN.length);
+        }
+      } else if (key[prefixlen] == OTHER_INFO_COLUMN[0]) {
+        if (otherInfo) {
+          entity.addOtherInfo(
+              parseRemainingKey(key, prefixlen + OTHER_INFO_COLUMN.length),
+              fstConf.asObject(iterator.peekNext().getValue()));
+        }
+      } else if (key[prefixlen] == RELATED_ENTITIES_COLUMN[0]) {
+        if (relatedEntities) {
+          addRelatedEntity(entity, key, prefixlen
+              + RELATED_ENTITIES_COLUMN.length);
+        }
+      } else if (key[prefixlen] == EVENTS_COLUMN[0]) {
+        if (events || (lastEvent && entity.getEvents().size() == 0)) {
+          TimelineEvent event = getEntityEvent(null, key, prefixlen
+              + EVENTS_COLUMN.length, iterator.peekNext().getValue());
+          if (event != null) {
+            entity.addEvent(event);
+          }
+        }
+      } else if (key[prefixlen] == DOMAIN_ID_COLUMN[0]) {
+        byte[] v = iterator.peekNext().getValue();
+        String domainId = new String(v, UTF_8);
+        entity.setDomainId(domainId);
+      } else {
+        LOG.warn(String.format("Found unexpected column for entity %s of "
+            + "type %s (0x%02x)", entityId, entityType, key[prefixlen]));
+      }
+    }
+
+    entity.setEntityId(entityId);
+    entity.setEntityType(entityType);
+    entity.setStartTime(startTime);
+
+    return entity;
+  }
+
+  @Override
+  public TimelineEvents getEntityTimelines(String entityType,
+      SortedSet<String> entityIds, Long limit, Long windowStart,
+      Long windowEnd, Set<String> eventType) throws IOException {
+    TimelineEvents events = new TimelineEvents();
+    if (entityIds == null || entityIds.isEmpty()) {
+      return events;
+    }
+    // create a lexicographically-ordered map from start time to entities
+    Map<byte[], List<EntityIdentifier>> startTimeMap =
+        new TreeMap<byte[], List<EntityIdentifier>>(
+        new Comparator<byte[]>() {
+          @Override
+          public int compare(byte[] o1, byte[] o2) {
+            return WritableComparator.compareBytes(o1, 0, o1.length, o2, 0,
+                o2.length);
+          }
+        });
+    DBIterator iterator = null;
+    try {
+      // look up start times for the specified entities
+      // skip entities with no start time
+      for (String entityId : entityIds) {
+        byte[] startTime = getStartTime(entityId, entityType);
+        if (startTime != null) {
+          List<EntityIdentifier> entities = startTimeMap.get(startTime);
+          if (entities == null) {
+            entities = new ArrayList<EntityIdentifier>();
+            startTimeMap.put(startTime, entities);
+          }
+          entities.add(new EntityIdentifier(entityId, entityType));
+        }
+      }
+      for (Entry<byte[], List<EntityIdentifier>> entry : startTimeMap
+          .entrySet()) {
+        // look up the events matching the given parameters (limit,
+        // start time, end time, event types) for entities whose start times
+        // were found and add the entities to the return list
+        byte[] revStartTime = entry.getKey();
+        for (EntityIdentifier entityIdentifier : entry.getValue()) {
+          EventsOfOneEntity entity = new EventsOfOneEntity();
+          entity.setEntityId(entityIdentifier.getId());
+          entity.setEntityType(entityType);
+          events.addEvent(entity);
+          KeyBuilder kb = KeyBuilder.newInstance().add(entityType)
+              .add(revStartTime).add(entityIdentifier.getId())
+              .add(EVENTS_COLUMN);
+          byte[] prefix = kb.getBytesForLookup();
+          if (windowEnd == null) {
+            windowEnd = Long.MAX_VALUE;
+          }
+          byte[] revts = writeReverseOrderedLong(windowEnd);
+          kb.add(revts);
+          byte[] first = kb.getBytesForLookup();
+          byte[] last = null;
+          if (windowStart != null) {
+            last = KeyBuilder.newInstance().add(prefix)
+                .add(writeReverseOrderedLong(windowStart)).getBytesForLookup();
+          }
+          if (limit == null) {
+            limit = DEFAULT_LIMIT;
+          }
+          DB db = entitydb.getDBForStartTime(readReverseOrderedLong(
+              revStartTime, 0));
+          if (db == null) {
+            continue;
+          }
+          iterator = db.iterator();
+          for (iterator.seek(first); entity.getEvents().size() < limit
+              && iterator.hasNext(); iterator.next()) {
+            byte[] key = iterator.peekNext().getKey();
+            if (!prefixMatches(prefix, prefix.length, key)
+                || (last != null && WritableComparator.compareBytes(key, 0,
+                    key.length, last, 0, last.length) > 0)) {
+              break;
+            }
+            TimelineEvent event = getEntityEvent(eventType, key, prefix.length,
+                iterator.peekNext().getValue());
+            if (event != null) {
+              entity.addEvent(event);
+            }
+          }
+        }
+      }
+    } finally {
+      IOUtils.cleanup(LOG, iterator);
+    }
+    return events;
+  }
+
+  @Override
+  public TimelineEntities getEntities(String entityType, Long limit,
+      Long windowStart, Long windowEnd, String fromId, Long fromTs,
+      NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
+      EnumSet<Field> fields, CheckAcl checkAcl) throws IOException {
+    if (primaryFilter == null) {
+      // if no primary filter is specified, prefix the lookup with
+      // ENTITY_ENTRY_PREFIX
+      return getEntityByTime(EMPTY_BYTES, entityType, limit, windowStart,
+          windowEnd, fromId, fromTs, secondaryFilters, fields, checkAcl, false);
+    } else {
+      // if a primary filter is specified, prefix the lookup with
+      // INDEXED_ENTRY_PREFIX + primaryFilterName + primaryFilterValue +
+      // ENTITY_ENTRY_PREFIX
+      byte[] base = KeyBuilder.newInstance().add(primaryFilter.getName())
+          .add(fstConf.asByteArray(primaryFilter.getValue()), true)
+          .getBytesForLookup();
+      return getEntityByTime(base, entityType, limit, windowStart, windowEnd,
+          fromId, fromTs, secondaryFilters, fields, checkAcl, true);
+    }
+  }
+
+  /**
+   * Retrieves a list of entities satisfying given parameters.
+   *
+   * @param base
+   *          A byte array prefix for the lookup
+   * @param entityType
+   *          The type of the entity
+   * @param limit
+   *          A limit on the number of entities to return
+   * @param starttime
+   *          The earliest entity start time to retrieve (exclusive)
+   * @param endtime
+   *          The latest entity start time to retrieve (inclusive)
+   * @param fromId
+   *          Retrieve entities starting with this entity
+   * @param fromTs
+   *          Ignore entities with insert timestamp later than this ts
+   * @param secondaryFilters
+   *          Filter pairs that the entities should match
+   * @param fields
+   *          The set of fields to retrieve
+   * @param usingPrimaryFilter
+   *          true if this query is using a primary filter
+   * @return A list of entities
+   * @throws IOException
+   */
+  private TimelineEntities getEntityByTime(byte[] base, String entityType,
+      Long limit, Long starttime, Long endtime, String fromId, Long fromTs,
+      Collection<NameValuePair> secondaryFilters, EnumSet<Field> fields,
+      CheckAcl checkAcl, boolean usingPrimaryFilter) throws IOException {
+    DBIterator iterator = null;
+    try {
+      KeyBuilder kb = KeyBuilder.newInstance().add(base).add(entityType);
+      // only db keys matching the prefix (base + entity type) will be parsed
+      byte[] prefix = kb.getBytesForLookup();
+      if (endtime == null) {
+        // if end time is null, place no restriction on end time
+        endtime = Long.MAX_VALUE;
+      }
+
+      // Sanitize the fields parameter
+      if (fields == null) {
+        fields = EnumSet.allOf(Field.class);
+      }
+
+      // construct a first key that will be seeked to using end time or fromId
+      long firstStartTime = Long.MAX_VALUE;
+      byte[] first = null;
+      if (fromId != null) {
+        Long fromIdStartTime = getStartTimeLong(fromId, entityType);
+        if (fromIdStartTime == null) {
+          // no start time for provided id, so return empty entities
+          return new TimelineEntities();
+        }
+        if (fromIdStartTime <= endtime) {
+          // if provided id's start time falls before the end of the window,
+          // use it to construct the seek key
+          firstStartTime = fromIdStartTime;
+          first = kb.add(writeReverseOrderedLong(fromIdStartTime)).add(fromId)
+              .getBytesForLookup();
+        }
+      }
+      // if seek key wasn't constructed using fromId, construct it using end ts
+      if (first == null) {
+        firstStartTime = endtime;
+        first = kb.add(writeReverseOrderedLong(endtime)).getBytesForLookup();
+      }
+      byte[] last = null;
+      if (starttime != null) {
+        // if start time is not null, set a last key that will not be
+        // iterated past
+        last = KeyBuilder.newInstance().add(base).add(entityType)
+            .add(writeReverseOrderedLong(starttime)).getBytesForLookup();
+      }
+      if (limit == null) {
+        // if limit is not specified, use the default
+        limit = DEFAULT_LIMIT;
+      }
+
+      TimelineEntities entities = new TimelineEntities();
+      RollingLevelDB rollingdb = null;
+      if (usingPrimaryFilter) {
+        rollingdb = indexdb;
+      } else {
+        rollingdb = entitydb;
+      }
+
+      DB db = rollingdb.getDBForStartTime(firstStartTime);
+      while (entities.getEntities().size() < limit && db != null) {
+        iterator = db.iterator();
+        iterator.seek(first);
+
+        // iterate until one of the following conditions is met: limit is
+        // reached, there are no more keys, the key prefix no longer matches,
+        // or a start time has been specified and reached/exceeded
+        while (entities.getEntities().size() < limit && iterator.hasNext()) {
+          byte[] key = iterator.peekNext().getKey();
+          if (!prefixMatches(prefix, prefix.length, key)
+              || (last != null && WritableComparator.compareBytes(key, 0,
+                  key.length, last, 0, last.length) > 0)) {
+            break;
+          }
+          // read the start time and entity id from the current key
+          KeyParser kp = new KeyParser(key, prefix.length);
+          Long startTime = kp.getNextLong();
+          String entityId = kp.getNextString();
+
+          if (fromTs != null) {
+            long insertTime = readReverseOrderedLong(iterator.peekNext()
+                .getValue(), 0);
+            if (insertTime > fromTs) {
+              byte[] firstKey = key;
+              while (iterator.hasNext()) {
+                key = iterator.peekNext().getKey();
+                iterator.next();
+                if (!prefixMatches(firstKey, kp.getOffset(), key)) {
+                  break;
+                }
+              }
+              continue;
+            }
+          }
+          // Even if other info and primary filter fields are not included, we
+          // still need to load them to match secondary filters when they are
+          // non-empty
+          EnumSet<Field> queryFields = EnumSet.copyOf(fields);
+          boolean addPrimaryFilters = false;
+          boolean addOtherInfo = false;
+          if (secondaryFilters != null && secondaryFilters.size() > 0) {
+            if (!queryFields.contains(Field.PRIMARY_FILTERS)) {
+              queryFields.add(Field.PRIMARY_FILTERS);
+              addPrimaryFilters = true;
+            }
+            if (!queryFields.contains(Field.OTHER_INFO)) {
+              queryFields.add(Field.OTHER_INFO);
+              addOtherInfo = true;
+            }
+          }
+
+          // parse the entity that owns this key, iterating over all keys for
+          // the entity
+          TimelineEntity entity = null;
+          if (usingPrimaryFilter) {
+            entity = getEntity(entityId, entityType, queryFields);
+            iterator.next();
+          } else {
+            entity = getEntity(entityId, entityType, startTime, queryFields,
+                iterator, key, kp.getOffset());
+          }
+          // determine if the retrieved entity matches the provided secondary
+          // filters, and if so add it to the list of entities to return
+          boolean filterPassed = true;
+          if (secondaryFilters != null) {
+            for (NameValuePair filter : secondaryFilters) {
+              Object v = entity.getOtherInfo().get(filter.getName());
+              if (v == null) {
+                Set<Object> vs = entity.getPrimaryFilters()
+                    .get(filter.getName());
+                if (vs == null || !vs.contains(filter.getValue())) {
+                  filterPassed = false;
+                  break;
+                }
+              } else if (!v.equals(filter.getValue())) {
+                filterPassed = false;
+                break;
+              }
+            }
+          }
+          if (filterPassed) {
+            if (entity.getDomainId() == null) {
+              entity.setDomainId(DEFAULT_DOMAIN_ID);
+            }
+            if (checkAcl == null || checkAcl.check(entity)) {
+              // Remove primary filter and other info if they are added for
+              // matching secondary filters
+              if (addPrimaryFilters) {
+                entity.setPrimaryFilters(null);
+              }
+              if (addOtherInfo) {
+                entity.setOtherInfo(null);
+              }
+              entities.addEntity(entity);
+            }
+          }
+        }
+        db = rollingdb.getPreviousDB(db);
+      }
+      return entities;
+    } finally {
+      IOUtils.cleanup(LOG, iterator);
+    }
+  }
+
+  /**
+   * Put a single entity. If there is an error, add a TimelinePutError to the
+   * given response.
+   *
+   * @param entityUpdates
+   *          a map containing all the scheduled writes for this put to the
+   *          entity db
+   * @param indexUpdates
+   *          a map containing all the scheduled writes for this put to the
+   *          index db
+   */
+  private long putEntities(TreeMap<Long, RollingWriteBatch> entityUpdates,
+      TreeMap<Long, RollingWriteBatch> indexUpdates, TimelineEntity entity,
+      TimelinePutResponse response) {
+
+    long putCount = 0;
+    List<EntityIdentifier> relatedEntitiesWithoutStartTimes =
+        new ArrayList<EntityIdentifier>();
+    byte[] revStartTime = null;
+    Map<String, Set<Object>> primaryFilters = null;
+    try {
+      List<TimelineEvent> events = entity.getEvents();
+      // look up the start time for the entity
+      Long startTime = getAndSetStartTime(entity.getEntityId(),
+          entity.getEntityType(), entity.getStartTime(), events);
+      if (startTime == null) {
+        // if no start time is found, add an error and return
+        TimelinePutError error = new TimelinePutError();
+        error.setEntityId(entity.getEntityId());
+        error.setEntityType(entity.getEntityType());
+        error.setErrorCode(TimelinePutError.NO_START_TIME);
+        response.addError(error);
+        return putCount;
+      }
+
+      // Must have a domain
+      if (StringUtils.isEmpty(entity.getDomainId())) {
+        TimelinePutError error = new TimelinePutError();
+        error.setEntityId(entity.getEntityId());
+        error.setEntityType(entity.getEntityType());
+        error.setErrorCode(TimelinePutError.NO_DOMAIN);
+        response.addError(error);
+        return putCount;
+      }
+
+      revStartTime = writeReverseOrderedLong(startTime);
+      long roundedStartTime = entitydb.computeCurrentCheckMillis(startTime);
+      RollingWriteBatch rollingWriteBatch = entityUpdates.get(roundedStartTime);
+      if (rollingWriteBatch == null) {
+        DB db = entitydb.getDBForStartTime(startTime);
+        if (db != null) {
+          WriteBatch writeBatch = db.createWriteBatch();
+          rollingWriteBatch = new RollingWriteBatch(db, writeBatch);
+          entityUpdates.put(roundedStartTime, rollingWriteBatch);
+        }
+      }
+      if (rollingWriteBatch == null) {
+        // if no rolling db covers this start time, the entity has expired;
+        // add an error and return
+        TimelinePutError error = new TimelinePutError();
+        error.setEntityId(entity.getEntityId());
+        error.setEntityType(entity.getEntityType());
+        error.setErrorCode(TimelinePutError.EXPIRED_ENTITY);
+        response.addError(error);
+        return putCount;
+      }
+      WriteBatch writeBatch = rollingWriteBatch.getWriteBatch();
+
+      // Convert the ids to bytes once up front to avoid repeating the work
+      byte[] entityIdBytes = entity.getEntityId().getBytes(UTF_8);
+      byte[] entityTypeBytes = entity.getEntityType().getBytes(UTF_8);
+      byte[] domainIdBytes = entity.getDomainId().getBytes(UTF_8);
+
+      // write entity marker
+      byte[] markerKey = KeyBuilder.newInstance(3).add(entityTypeBytes, true)
+          .add(revStartTime).add(entityIdBytes, true).getBytesForLookup();
+      writeBatch.put(markerKey, EMPTY_BYTES);
+      ++putCount;
+
+      // write domain id entry
+      byte[] domainkey = KeyBuilder.newInstance(4).add(entityTypeBytes, true)
+          .add(revStartTime).add(entityIdBytes, true).add(DOMAIN_ID_COLUMN)
+          .getBytes();
+      writeBatch.put(domainkey, domainIdBytes);
+      ++putCount;
+
+      // write event entries
+      if (events != null) {
+        for (TimelineEvent event : events) {
+          byte[] revts = writeReverseOrderedLong(event.getTimestamp());
+          byte[] key = KeyBuilder.newInstance().add(entityTypeBytes, true)
+              .add(revStartTime).add(entityIdBytes, true).add(EVENTS_COLUMN)
+              .add(revts).add(event.getEventType().getBytes(UTF_8)).getBytes();
+          byte[] value = fstConf.asByteArray(event.getEventInfo());
+          writeBatch.put(key, value);
+          ++putCount;
+        }
+      }
+
+      // write primary filter entries
+      primaryFilters = entity.getPrimaryFilters();
+      if (primaryFilters != null) {
+        for (Entry<String, Set<Object>> primaryFilter : primaryFilters
+            .entrySet()) {
+          for (Object primaryFilterValue : primaryFilter.getValue()) {
+            byte[] key = KeyBuilder.newInstance(6).add(entityTypeBytes, true)
+                .add(revStartTime).add(entityIdBytes, true)
+                .add(PRIMARY_FILTERS_COLUMN).add(primaryFilter.getKey())
+                .add(fstConf.asByteArray(primaryFilterValue)).getBytes();
+            writeBatch.put(key, EMPTY_BYTES);
+            ++putCount;
+          }
+        }
+      }
+
+      // write other info entries
+      Map<String, Object> otherInfo = entity.getOtherInfo();
+      if (otherInfo != null) {
+        for (Entry<String, Object> info : otherInfo.entrySet()) {
+          byte[] key = KeyBuilder.newInstance(5).add(entityTypeBytes, true)
+              .add(revStartTime).add(entityIdBytes, true)
+              .add(OTHER_INFO_COLUMN).add(info.getKey()).getBytes();
+          byte[] value = fstConf.asByteArray(info.getValue());
+          writeBatch.put(key, value);
+          ++putCount;
+        }
+      }
+
+      // write related entity entries
+      Map<String, Set<String>> relatedEntities = entity.getRelatedEntities();
+      if (relatedEntities != null) {
+        for (Entry<String, Set<String>> relatedEntityList : relatedEntities
+            .entrySet()) {
+          String relatedEntityType = relatedEntityList.getKey();
+          for (String relatedEntityId : relatedEntityList.getValue()) {
+            // look up start time of related entity
+            Long relatedStartTimeLong = getStartTimeLong(relatedEntityId,
+                relatedEntityType);
+            // delay writing the related entity if no start time is found
+            if (relatedStartTimeLong == null) {
+              relatedEntitiesWithoutStartTimes.add(new EntityIdentifier(
+                  relatedEntityId, relatedEntityType));
+              continue;
+            }
+
+            byte[] relatedEntityStartTime =
+                writeReverseOrderedLong(relatedStartTimeLong);
+            long relatedRoundedStartTime = entitydb
+                .computeCurrentCheckMillis(relatedStartTimeLong);
+            RollingWriteBatch relatedRollingWriteBatch = entityUpdates
+                .get(relatedRoundedStartTime);
+            if (relatedRollingWriteBatch == null) {
+              DB db = entitydb.getDBForStartTime(relatedStartTimeLong);
+              if (db != null) {
+                WriteBatch relatedWriteBatch = db.createWriteBatch();
+                relatedRollingWriteBatch = new RollingWriteBatch(db,
+                    relatedWriteBatch);
+                entityUpdates.put(relatedRoundedStartTime,
+                    relatedRollingWriteBatch);
+              }
+            }
+            if (relatedRollingWriteBatch == null) {
+              // if no rolling db covers the related entity's start time, it
+              // has expired; add an error and skip this relation
+              TimelinePutError error = new TimelinePutError();
+              error.setEntityId(entity.getEntityId());
+              error.setEntityType(entity.getEntityType());
+              error.setErrorCode(TimelinePutError.EXPIRED_ENTITY);
+              response.addError(error);
+              continue;
+            }
+            // This is the existing entity
+            byte[] relatedDomainIdBytes = relatedRollingWriteBatch.getDB().get(
+                createDomainIdKey(relatedEntityId, relatedEntityType,
+                    relatedEntityStartTime));
+            // The timeline data created by the server before 2.6 won't have
+            // the domain field. We assume this timeline data is in the
+            // default timeline domain.
+            String domainId = null;
+            if (relatedDomainIdBytes == null) {
+              domainId = TimelineDataManager.DEFAULT_DOMAIN_ID;
+            } else {
+              domainId = new String(relatedDomainIdBytes, UTF_8);
+            }
+            if (!domainId.equals(entity.getDomainId())) {
+              // in this case the entity will be put, but the relation will be
+              // ignored
+              TimelinePutError error = new TimelinePutError();
+              error.setEntityId(entity.getEntityId());
+              error.setEntityType(entity.getEntityType());
+              error.setErrorCode(TimelinePutError.FORBIDDEN_RELATION);
+              response.addError(error);
+              continue;
+            }
+            // write "forward" entry (related entity -> entity)
+            byte[] key = createRelatedEntityKey(relatedEntityId,
+                relatedEntityType, relatedEntityStartTime,
+                entity.getEntityId(), entity.getEntityType());
+            WriteBatch relatedWriteBatch = relatedRollingWriteBatch
+                .getWriteBatch();
+            relatedWriteBatch.put(key, EMPTY_BYTES);
+            ++putCount;
+          }
+        }
+      }
+
+      // write index entities
+      RollingWriteBatch indexRollingWriteBatch = indexUpdates
+          .get(roundedStartTime);
+      if (indexRollingWriteBatch == null) {
+        DB db = indexdb.getDBForStartTime(startTime);
+        if (db != null) {
+          WriteBatch indexWriteBatch = db.createWriteBatch();
+          indexRollingWriteBatch = new RollingWriteBatch(db, indexWriteBatch);
+          indexUpdates.put(roundedStartTime, indexRollingWriteBatch);
+        }
+      }
+      if (indexRollingWriteBatch == null) {
+        // if no rolling index db covers this start time, the entity has
+        // expired; add an error and return
+        TimelinePutError error = new TimelinePutError();
+        error.setEntityId(entity.getEntityId());
+        error.setEntityType(entity.getEntityType());
+        error.setErrorCode(TimelinePutError.EXPIRED_ENTITY);
+        response.addError(error);
+        return putCount;
+      }
+      WriteBatch indexWriteBatch = indexRollingWriteBatch.getWriteBatch();
+      putCount += writePrimaryFilterEntries(indexWriteBatch, primaryFilters,
+          markerKey, EMPTY_BYTES);
+    } catch (IOException e) {
+      LOG.error("Error putting entity " + entity.getEntityId() + " of type "
+          + entity.getEntityType(), e);
+      TimelinePutError error = new TimelinePutError();
+      error.setEntityId(entity.getEntityId());
+      error.setEntityType(entity.getEntityType());
+      error.setErrorCode(TimelinePutError.IO_EXCEPTION);
+      response.addError(error);
+    }
+
+    for (EntityIdentifier relatedEntity : relatedEntitiesWithoutStartTimes) {
+      try {
+        Long relatedEntityStartAndInsertTime = getAndSetStartTime(
+            relatedEntity.getId(), relatedEntity.getType(),
+            readReverseOrderedLong(revStartTime, 0), null);
+        if (relatedEntityStartAndInsertTime == null) {
+          throw new IOException("Error setting start time for related entity");
+        }
+        long relatedStartTimeLong = relatedEntityStartAndInsertTime;
+        long relatedRoundedStartTime = entitydb
+            .computeCurrentCheckMillis(relatedStartTimeLong);
+        RollingWriteBatch relatedRollingWriteBatch = entityUpdates
+            .get(relatedRoundedStartTime);
+        if (relatedRollingWriteBatch == null) {
+          DB db = entitydb.getDBForStartTime(relatedStartTimeLong);
+          if (db != null) {
+            WriteBatch relatedWriteBatch = db.createWriteBatch();
+            relatedRollingWriteBatch = new RollingWriteBatch(db,
+                relatedWriteBatch);
+            entityUpdates
+                .put(relatedRoundedStartTime, relatedRollingWriteBatch);
+          }
+        }
+        if (relatedRollingWriteBatch == null) {
+          // if no rolling db covers the related entity's start time, it has
+          // expired; add an error and skip this related entity
+          TimelinePutError error = new TimelinePutError();
+          error.setEntityId(entity.getEntityId());
+          error.setEntityType(entity.getEntityType());
+          error.setErrorCode(TimelinePutError.EXPIRED_ENTITY);
+          response.addError(error);
+          continue;
+        }
+        WriteBatch relatedWriteBatch = relatedRollingWriteBatch.getWriteBatch();
+        byte[] relatedEntityStartTime =
+            writeReverseOrderedLong(relatedEntityStartAndInsertTime);
+        // This is the new entity, the domain should be the same
+        byte[] key = createDomainIdKey(relatedEntity.getId(),
+            relatedEntity.getType(), relatedEntityStartTime);
+        relatedWriteBatch.put(key, entity.getDomainId().getBytes(UTF_8));
+        ++putCount;
+        relatedWriteBatch.put(
+            createRelatedEntityKey(relatedEntity.getId(),
+                relatedEntity.getType(), relatedEntityStartTime,
+                entity.getEntityId(), entity.getEntityType()), EMPTY_BYTES);
+        ++putCount;
+        relatedWriteBatch.put(
+            createEntityMarkerKey(relatedEntity.getId(),
+                relatedEntity.getType(), relatedEntityStartTime), EMPTY_BYTES);
+        ++putCount;
+      } catch (IOException e) {
+        LOG.error(
+            "Error putting related entity " + relatedEntity.getId()
+                + " of type " + relatedEntity.getType() + " for entity "
+                + entity.getEntityId() + " of type " + entity.getEntityType(),
+            e);
+        TimelinePutError error = new TimelinePutError();
+        error.setEntityId(entity.getEntityId());
+        error.setEntityType(entity.getEntityType());
+        error.setErrorCode(TimelinePutError.IO_EXCEPTION);
+        response.addError(error);
+      }
+    }
+
+    return putCount;
+  }
+
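putEntities groups its writes by rolling period through RollingWriteBatch holders. That class is defined in RollingLevelDB.java, which is part of this change but not shown in this hunk; judging only from the calls made above (getDB, getWriteBatch, write, close), a minimal holder could look like the sketch below. This is an illustration, not the actual RollingLevelDB source.

    import java.io.IOException;

    import org.iq80.leveldb.DB;
    import org.iq80.leveldb.WriteBatch;

    /** Illustrative holder pairing a rolling-period DB with its WriteBatch. */
    class RollingWriteBatchSketch {
      private final DB db;
      private final WriteBatch writeBatch;

      RollingWriteBatchSketch(DB db, WriteBatch writeBatch) {
        this.db = db;
        this.writeBatch = writeBatch;
      }

      DB getDB() {
        return db;
      }

      WriteBatch getWriteBatch() {
        return writeBatch;
      }

      /** Atomically applies every put scheduled on this batch to its db. */
      void write() {
        db.write(writeBatch);
      }

      /** Releases the native resources held by the batch. */
      void close() {
        try {
          writeBatch.close();
        } catch (IOException e) {
          // best-effort close, matching the cleanup pattern used in put()
        }
      }
    }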
+  /**
+   * For a given key / value pair that has been written to the db, write
+   * additional entries to the db for each primary filter.
+   */
+  private static long writePrimaryFilterEntries(WriteBatch writeBatch,
+      Map<String, Set<Object>> primaryFilters, byte[] key, byte[] value)
+      throws IOException {
+    long putCount = 0;
+    if (primaryFilters != null) {
+      for (Entry<String, Set<Object>> pf : primaryFilters.entrySet()) {
+        for (Object pfval : pf.getValue()) {
+          writeBatch.put(addPrimaryFilterToKey(pf.getKey(), pfval, key), value);
+          ++putCount;
+        }
+      }
+    }
+    return putCount;
+  }
+
+  @Override
+  public TimelinePutResponse put(TimelineEntities entities) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Starting put");
+    }
+    TimelinePutResponse response = new TimelinePutResponse();
+    TreeMap<Long, RollingWriteBatch> entityUpdates =
+        new TreeMap<Long, RollingWriteBatch>();
+    TreeMap<Long, RollingWriteBatch> indexUpdates =
+        new TreeMap<Long, RollingWriteBatch>();
+
+    long entityCount = 0;
+    long indexCount = 0;
+
+    try {
+
+      for (TimelineEntity entity : entities.getEntities()) {
+        entityCount += putEntities(entityUpdates, indexUpdates, entity,
+            response);
+      }
+
+      for (RollingWriteBatch entityUpdate : entityUpdates.values()) {
+        entityUpdate.write();
+      }
+
+      for (RollingWriteBatch indexUpdate : indexUpdates.values()) {
+        indexUpdate.write();
+      }
+
+    } finally {
+
+      for (RollingWriteBatch entityRollingWriteBatch : entityUpdates.values()) {
+        entityRollingWriteBatch.close();
+      }
+      for (RollingWriteBatch indexRollingWriteBatch : indexUpdates.values()) {
+        indexRollingWriteBatch.close();
+      }
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Put " + entityCount + " new leveldb entity entries and "
+          + indexCount + " new leveldb index entries from "
+          + entities.getEntities().size() + " timeline entities");
+    }
+    return response;
+  }
+
+  /**
+   * Get the unique start time for a given entity as a byte array that sorts the
+   * timestamps in reverse order (see
+   * {@link GenericObjectMapper#writeReverseOrderedLong(long)}).
+   *
+   * @param entityId
+   *          The id of the entity
+   * @param entityType
+   *          The type of the entity
+   * @return A byte array, null if not found
+   * @throws IOException
+   */
+  private byte[] getStartTime(String entityId, String entityType)
+      throws IOException {
+    Long l = getStartTimeLong(entityId, entityType);
+    return l == null ? null : writeReverseOrderedLong(l);
+  }
+
+  /**
+   * Get the unique start time for a given entity as a Long.
+   *
+   * @param entityId
+   *          The id of the entity
+   * @param entityType
+   *          The type of the entity
+   * @return A Long, null if not found
+   * @throws IOException
+   */
+  private Long getStartTimeLong(String entityId, String entityType)
+      throws IOException {
+    EntityIdentifier entity = new EntityIdentifier(entityId, entityType);
+    // start time is not provided, so try to look it up
+    if (startTimeReadCache.containsKey(entity)) {
+      // found the start time in the cache
+      return startTimeReadCache.get(entity);
+    } else {
+      // try to look up the start time in the db
+      byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
+      byte[] v = starttimedb.get(b);
+      if (v == null) {
+        // did not find the start time in the db
+        return null;
+      } else {
+        // found the start time in the db
+        Long l = readReverseOrderedLong(v, 0);
+        startTimeReadCache.put(entity, l);
+        return l;
+      }
+    }
+  }
+
+  /**
+   * Get the unique start time for a given entity as a Long. If the start
+   * time doesn't exist, set it based on the information provided. Should only
+   * be called when a lock has been obtained on the entity.
+   *
+   * @param entityId
+   *          The id of the entity
+   * @param entityType
+   *          The type of the entity
+   * @param startTime
+   *          The start time of the entity, or null
+   * @param events
+   *          A list of events for the entity, or null
+   * @return the start time of the entity as a Long, or null if it cannot be
+   *         determined
+   * @throws IOException
+   */
+  private Long getAndSetStartTime(String entityId, String entityType,
+      Long startTime, List<TimelineEvent> events) throws IOException {
+    EntityIdentifier entity = new EntityIdentifier(entityId, entityType);
+    Long time = startTimeWriteCache.get(entity);
+    if (time != null) {
+      // return the value in the cache
+      return time;
+    }
+    if (startTime == null && events != null) {
+      // calculate best guess start time based on lowest event time
+      startTime = Long.MAX_VALUE;
+      for (TimelineEvent e : events) {
+        if (e.getTimestamp() < startTime) {
+          startTime = e.getTimestamp();
+        }
+      }
+    }
+    // check the provided start time matches the db
+    return checkStartTimeInDb(entity, startTime);
+  }
+
+  /**
+   * Checks the db for the start time and returns it if it exists. If it does
+   * not exist, writes the suggested start time (if it is not null). This is
+   * only called when the start time is missing from the cache, and it updates
+   * both caches with whichever value ends up in the db. Should only be called
+   * when a lock has been obtained on the entity.
+   */
+  private Long checkStartTimeInDb(EntityIdentifier entity,
+      Long suggestedStartTime) throws IOException {
+    Long startAndInsertTime = null;
+    // create lookup key for start time
+    byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
+    // retrieve value for key
+    byte[] v = starttimedb.get(b);
+    if (v == null) {
+      // start time doesn't exist in db
+      if (suggestedStartTime == null) {
+        return null;
+      }
+      startAndInsertTime = suggestedStartTime;
+
+      // write suggested start time
+      starttimedb.put(b, writeReverseOrderedLong(suggestedStartTime));
+    } else {
+      // found start time in db, so ignore suggested start time
+      startAndInsertTime = readReverseOrderedLong(v, 0);
+    }
+    startTimeWriteCache.put(entity, startAndInsertTime);
+    startTimeReadCache.put(entity, startAndInsertTime);
+    return startAndInsertTime;
+  }
+
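The start time caches consulted above are sized by the read and write cache settings read further down in this file (getStartTimeReadCacheSize and getStartTimeWriteCacheSize). The backing collection is not shown in this hunk; one common way to get the bounded, LRU-style behaviour those settings imply is an access-ordered LinkedHashMap, sketched here purely as an illustration rather than as the store's actual cache.

    import java.util.Collections;
    import java.util.LinkedHashMap;
    import java.util.Map;

    /** Illustrative bounded LRU map; not necessarily the store's own cache. */
    final class BoundedCaches {
      static <K, V> Map<K, V> boundedCache(final int maxEntries) {
        return Collections.synchronizedMap(
            new LinkedHashMap<K, V>(16, 0.75f, true) {
              @Override
              protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                // evict the least recently accessed entry once the cap is hit
                return size() > maxEntries;
              }
            });
      }
    }

    // e.g. startTimeReadCache =
    //          BoundedCaches.boundedCache(getStartTimeReadCacheSize(conf));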
+  /**
+   * Creates a key for looking up the start time of a given entity in the
+   * dedicated start time db, of the form entity type + entity id.
+   */
+  private static byte[] createStartTimeLookupKey(String entityId,
+      String entityType) throws IOException {
+    return KeyBuilder.newInstance().add(entityType).add(entityId).getBytes();
+  }
+
+  /**
+   * Creates an entity marker, serializing entity type + revstarttime +
+   * entity id.
+   */
+  private static byte[] createEntityMarkerKey(String entityId,
+      String entityType, byte[] revStartTime) throws IOException {
+    return KeyBuilder.newInstance().add(entityType).add(revStartTime)
+        .add(entityId).getBytesForLookup();
+  }
+
+  /**
+   * Creates an index entry for the given key, of the form primaryfiltername +
+   * primaryfiltervalue + key.
+   */
+  private static byte[] addPrimaryFilterToKey(String primaryFilterName,
+      Object primaryFilterValue, byte[] key) throws IOException {
+    return KeyBuilder.newInstance().add(primaryFilterName)
+        .add(fstConf.asByteArray(primaryFilterValue), true).add(key).getBytes();
+  }
+
+  /**
+   * Creates an event object from the given key, offset, and value. If the event
+   * type is not contained in the specified set of event types, returns null.
+   */
+  private static TimelineEvent getEntityEvent(Set<String> eventTypes,
+      byte[] key, int offset, byte[] value) throws IOException {
+    KeyParser kp = new KeyParser(key, offset);
+    long ts = kp.getNextLong();
+    String tstype = kp.getNextString();
+    if (eventTypes == null || eventTypes.contains(tstype)) {
+      TimelineEvent event = new TimelineEvent();
+      event.setTimestamp(ts);
+      event.setEventType(tstype);
+      Object o = fstConf.asObject(value);
+      if (o == null) {
+        event.setEventInfo(null);
+      } else if (o instanceof Map) {
+        @SuppressWarnings("unchecked")
+        Map<String, Object> m = (Map<String, Object>) o;
+        event.setEventInfo(m);
+      } else {
+        throw new IOException("Couldn't deserialize event info map");
+      }
+      return event;
+    }
+    return null;
+  }
+
+  /**
+   * Parses the primary filter from the given key at the given offset and adds
+   * it to the given entity.
+   */
+  private static void addPrimaryFilter(TimelineEntity entity, byte[] key,
+      int offset) throws IOException {
+    KeyParser kp = new KeyParser(key, offset);
+    String name = kp.getNextString();
+    byte[] bytes = kp.getRemainingBytes();
+    Object value = fstConf.asObject(bytes);
+    entity.addPrimaryFilter(name, value);
+  }
+
+  /**
+   * Creates a string representation of the byte array from the given offset to
+   * the end of the array (for parsing other info keys).
+   */
+  private static String parseRemainingKey(byte[] b, int offset) {
+    return new String(b, offset, b.length - offset, UTF_8);
+  }
+
+  /**
+   * Creates a related entity key, serializing entity type + revstarttime +
+   * entity id + RELATED_ENTITIES_COLUMN + relatedentity type +
+   * relatedentity id.
+   */
+  private static byte[] createRelatedEntityKey(String entityId,
+      String entityType, byte[] revStartTime, String relatedEntityId,
+      String relatedEntityType) throws IOException {
+    return KeyBuilder.newInstance().add(entityType).add(revStartTime)
+        .add(entityId).add(RELATED_ENTITIES_COLUMN).add(relatedEntityType)
+        .add(relatedEntityId).getBytes();
+  }
+
+  /**
+   * Parses the related entity from the given key at the given offset and adds
+   * it to the given entity.
+   */
+  private static void addRelatedEntity(TimelineEntity entity, byte[] key,
+      int offset) throws IOException {
+    KeyParser kp = new KeyParser(key, offset);
+    String type = kp.getNextString();
+    String id = kp.getNextString();
+    entity.addRelatedEntity(type, id);
+  }
+
+  /**
+   * Creates a domain id key, serializing entity type + revstarttime +
+   * entity id + DOMAIN_ID_COLUMN.
+   */
+  private static byte[] createDomainIdKey(String entityId, String entityType,
+      byte[] revStartTime) throws IOException {
+    return KeyBuilder.newInstance().add(entityType).add(revStartTime)
+        .add(entityId).add(DOMAIN_ID_COLUMN).getBytes();
+  }
+
+  /**
+   * Clears the cache to test reloading start times from leveldb (only for
+   * testing).
+   */
+  @VisibleForTesting
+  void clearStartTimeCache() {
+    startTimeWriteCache.clear();
+    startTimeReadCache.clear();
+  }
+
+  @VisibleForTesting
+  static int getStartTimeReadCacheSize(Configuration conf) {
+    return conf
+        .getInt(
+            TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE,
+            DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE);
+  }
+
+  @VisibleForTesting
+  static int getStartTimeWriteCacheSize(Configuration conf) {
+    return conf
+        .getInt(
+            TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE,
+            DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE);
+  }
+
+  @VisibleForTesting
+  long evictOldStartTimes(long minStartTime) throws IOException {
+    LOG.info("Searching for start times to evict earlier than " + minStartTime);
+
+    long batchSize = 0;
+    long totalCount = 0;
+    long startTimesCount = 0;
+
+    WriteBatch writeBatch = null;
+    DBIterator iterator = null;
+
+    try {
+      writeBatch = starttimedb.createWriteBatch();
+      ReadOptions readOptions = new ReadOptions();
+      readOptions.fillCache(false);
+      iterator = starttimedb.iterator(readOptions);
+      // seek to the first start time entry
+      iterator.seekToFirst();
+
+      // evaluate each start time entry to see if it needs to be evicted or not
+      while (iterator.hasNext()) {
+        Map.Entry<byte[], byte[]> current = iterator.next();
+        byte[] entityKey = current.getKey();
+        byte[] entityValue = current.getValue();
+        long startTime = readReverseOrderedLong(entityValue, 0);
+        if (startTime < minStartTime) {
+          ++batchSize;
+          ++startTimesCount;
+          writeBatch.delete(entityKey);
+
+          // a large delete will hold the lock for too long
+          if (batchSize >= writeBatchSize) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Preparing to delete a batch of " + batchSize
+                  + " old start times");
+            }
+            starttimedb.write(writeBatch);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Deleted batch of " + batchSize
+                  + ". Total start times deleted so far this cycle: "
+                  + startTimesCount);
+            }
+            IOUtils.cleanup(LOG, writeBatch);
+            writeBatch = starttimedb.createWriteBatch();
+            batchSize = 0;
+          }
+        }
+        ++totalCount;
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Preparing to delete a batch of " + batchSize
+            + " old start times");
+      }
+      starttimedb.write(writeBatch);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Deleted batch of " + batchSize
+            + ". Total start times deleted so far this cycle: "
+            + startTimesCount);
+      }
+      LOG.info("Deleted " + startTimesCount + "/" + totalCount
+          + " start time entities earlier than " + minStartTime);
+    } finally {
+      IOUtils.cleanup(LOG, writeBatch);
+      IOUtils.cleanup(LOG, iterator);
+    }
+    return startTimesCount;
+  }
+
+  /**
+   * Discards entities whose start timestamp is earlier than the given
+   * timestamp, and evicts any rolling entity and index dbs that have aged out.
+   */
+  @VisibleForTesting
+  void discardOldEntities(long timestamp) throws IOException,
+      InterruptedException {
+    long totalCount = 0;
+    long t1 = System.currentTimeMillis();
+    try {
+      totalCount += evictOldStartTimes(timestamp);
+      indexdb.evictOldDBs();
+      entitydb.evictOldDBs();
+    } finally {
+      long t2 = System.currentTimeMillis();
+      LOG.info("Discarded " + totalCount + " entities for timestamp "
+          + timestamp + " and earlier in " + (t2 - t1) / 1000.0 + " seconds");
+    }
+  }
+
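discardOldEntities above is the hook a TTL eviction cycle calls: it drops expired start time entries and lets the rolling entity and index stores evict whole aged-out dbs. The store starts its own deletion thread elsewhere in this class (not shown in this hunk), so the snippet below only illustrates the cadence; it assumes java.util.concurrent imports, `conf` is a YarnConfiguration, `store` is the initialized store (the method is package-private, so this would only compile from the same package), and the five-minute interval is hypothetical.

    // Illustration of the eviction cadence; the store's real deletion thread
    // is created elsewhere in this class and does not use this executor.
    final long ttlMs = conf.getLong(YarnConfiguration.TIMELINE_SERVICE_TTL_MS,
        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_TTL_MS);
    final long intervalMs = 5 * 60 * 1000L; // hypothetical eviction interval
    ScheduledExecutorService deletionExecutor =
        Executors.newSingleThreadScheduledExecutor();
    deletionExecutor.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
        try {
          // evict everything whose start time is older than now - ttl
          store.discardOldEntities(System.currentTimeMillis() - ttlMs);
        } catch (Exception e) {
          // log and continue; the next cycle will retry
        }
      }
    }, intervalMs, intervalMs, TimeUnit.MILLISECONDS);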
+  Version loadVersion() throws IOException {
+    byte[] data = starttimedb.get(bytes(TIMELINE_STORE_VERSION_KEY));
+    // if version is not stored previously, treat it as 1.0.
+    if (data == null || data.length == 0) {
+      return Version.newInstance(1, 0);
+    }
+    Version version = new VersionPBImpl(VersionProto.parseFrom(data));
+    return version;
+  }
+
+  // Only used for test
+  @VisibleForTesting
+  void storeVersion(Version state) throws IOException {
+    dbStoreVersion(state);
+  }
+
+  private void dbStoreVersion(Version state) throws IOException {
+    String key = TIMELINE_STORE_VERSION_KEY;
+    byte[] data = ((VersionPBImpl) state).getProto().toByteArray();
+    try {
+      starttimedb.put(bytes(key), data);
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+  Version getCurrentVersion() {
+    return CURRENT_VERSION_INFO;
+  }
+
+  /**
+   * 1) The timeline store is versioned as major.minor, e.g. 1.0, 1.1,
+   * 1.2, ..., 2.0. 2) Any incompatible change to the store is a major
+   * upgrade, and any compatible change is a minor upgrade. 3) Within a minor
+   * upgrade, say 1.1 to 1.2: overwrite the version info and proceed as
+   * normal. 4) Across a major upgrade, say 1.2 to 2.0: throw an exception and
+   * direct the user to use a separate upgrade tool to upgrade the timeline
+   * store, or to remove the incompatible old state.
+   */
+  private void checkVersion() throws IOException {
+    Version loadedVersion = loadVersion();
+    LOG.info("Loaded timeline store version info " + loadedVersion);
+    if (loadedVersion.equals(getCurrentVersion())) {
+      return;
+    }
+    if (loadedVersion.isCompatibleTo(getCurrentVersion())) {
+      LOG.info("Storing timeline store version info " + getCurrentVersion());
+      dbStoreVersion(CURRENT_VERSION_INFO);
+    } else {
+      String incompatibleMessage = "Incompatible version for timeline store: "
+          + "expecting version " + getCurrentVersion()
+          + ", but loading version " + loadedVersion;
+      LOG.fatal(incompatibleMessage);
+      throw new IOException(incompatibleMessage);
+    }
+  }
+
+  // TODO: make data retention work with the domain data as well
+  @Override
+  public void put(TimelineDomain domain) throws IOException {
+    WriteBatch domainWriteBatch = null;
+    WriteBatch ownerWriteBatch = null;
+    try {
+      domainWriteBatch = domaindb.createWriteBatch();
+      ownerWriteBatch = ownerdb.createWriteBatch();
+      if (domain.getId() == null || domain.getId().length() == 0) {
+        throw new IllegalArgumentException("Domain doesn't have an ID");
+      }
+      if (domain.getOwner() == null || domain.getOwner().length() == 0) {
+        throw new IllegalArgumentException("Domain doesn't have an owner.");
+      }
+
+      // Write description
+      byte[] domainEntryKey = createDomainEntryKey(domain.getId(),
+          DESCRIPTION_COLUMN);
+      byte[] ownerLookupEntryKey = createOwnerLookupKey(domain.getOwner(),
+          domain.getId(), DESCRIPTION_COLUMN);
+      if (domain.getDescription() != null) {
+        domainWriteBatch.put(domainEntryKey,
+            domain.getDescription().getBytes(UTF_8));
+        ownerWriteBatch.put(ownerLookupEntryKey, domain.getDescription()
+            .getBytes(UTF_8));
+      } else {
+        domainWriteBatch.put(domainEntryKey, EMPTY_BYTES);
+        ownerWriteBatch.put(ownerLookupEntryKey, EMPTY_BYTES);
+      }
+
+      // Write owner
+      domainEntryKey = createDomainEntryKey(domain.getId(), OWNER_COLUMN);
+      ownerLookupEntryKey = createOwnerLookupKey(domain.getOwner(),
+          domain.getId(), OWNER_COLUMN);
+      // Null check for owner is done before
+      domainWriteBatch.put(domainEntryKey, domain.getOwner().getBytes(UTF_8));
+      ownerWriteBatch.put(ownerLookupEntryKey, domain.getOwner()
+          .getBytes(UTF_8));
+
+      // Write readers
+      domainEntryKey = createDomainEntryKey(domain.getId(), READER_COLUMN);
+      ownerLookupEntryKey = createOwnerLookupKey(domain.getOwner(),
+          domain.getId(), READER_COLUMN);
+      if (domain.getReaders() != null && domain.getReaders().length() > 0) {
+        domainWriteBatch.put(domainEntryKey, domain.getReaders()
+            .getBytes(UTF_8));
+        ownerWriteBatch.put(ownerLookupEntryKey,
+            domain.getReaders().getBytes(UTF_8));
+      } else {
+        domainWriteBatch.put(domainEntryKey, EMPTY_BYTES);
+        ownerWriteBatch.put(ownerLookupEntryKey, EMPTY_BYTES);
+      }
+
+      // Write writers
+      domainEntryKey = createDomainEntryKey(domain.getId(), WRITER_COLUMN);
+      ownerLookupEntryKey = createOwnerLookupKey(domain.getOwner(),
+          domain.getId(), WRITER_COLUMN);
+      if (domain.getWriters() != null && domain.getWriters().length() > 0) {
+        domainWriteBatch.put(domainEntryKey, domain.getWriters()
+            .getBytes(UTF_8));
+        ownerWriteBatch.put(ownerLookupEntryKey,
+            domain.getWriters().getBytes(UTF_8));
+      } else {
+        domainWriteBatch.put(domainEntryKey, EMPTY_BYTES);
+        ownerWriteBatch.put(ownerLookupEntryKey, EMPTY_BYTES);
+      }
+
+      // Write creation time and modification time
+      // We put both timestamps together because they are always retrieved
+      // together, and store them in the same way as we did for the entity's
+      // start time and insert time.
+      domainEntryKey = createDomainEntryKey(domain.getId(), TIMESTAMP_COLUMN);
+      ownerLookupEntryKey = createOwnerLookupKey(domain.getOwner(),
+          domain.getId(), TIMESTAMP_COLUMN);
+      long currentTimestamp = System.currentTimeMillis();
+      byte[] timestamps = domaindb.get(domainEntryKey);
+      if (timestamps == null) {
+        timestamps = new byte[16];
+        writeReverseOrderedLong(currentTimestamp, timestamps, 0);
+        writeReverseOrderedLong(currentTimestamp, timestamps, 8);
+      } else {
+        writeReverseOrderedLong(currentTimestamp, timestamps, 8);
+      }
+      domainWriteBatch.put(domainEntryKey, timestamps);
+      ownerWriteBatch.put(ownerLookupEntryKey, timestamps);
+      domaindb.write(domainWriteBatch);
+      ownerdb.write(ownerWriteBatch);
+    } finally {
+      IOUtils.cleanup(LOG, domainWriteBatch);
+      IOUtils.cleanup(LOG, ownerWriteBatch);
+    }
+  }
+
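The domain write path above pairs with getDomain and getDomains below. A minimal round trip is sketched here; the field values are hypothetical and `store` stands for an initialized timeline store.

    // Put a domain and read it back (values are hypothetical).
    TimelineDomain domain = new TimelineDomain();
    domain.setId("domain_1");
    domain.setOwner("alice");
    domain.setDescription("test domain");
    domain.setReaders("alice,bob");
    domain.setWriters("alice");
    store.put(domain);

    TimelineDomain fetched = store.getDomain("domain_1");
    // createdTime and modifiedTime come from the TIMESTAMP_COLUMN written
    // above; putting the same domain again updates only the modified time.
    System.out.println(fetched.getCreatedTime() + " " + fetched.getModifiedTime());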
+  /**
+   * Creates a domain entry key with a column name suffix, of the form
+   * domain id + column name.
+   */
+  private static byte[] createDomainEntryKey(String domainId, byte[] columnName)
+      throws IOException {
+    return KeyBuilder.newInstance().add(domainId).add(columnName).getBytes();
+  }
+
+  /**
+   * Creates an owner lookup key with a column name suffix, of the form
+   * owner + domain id + column name.
+   */
+  private static byte[] createOwnerLookupKey(String owner, String domainId,
+      byte[] columnName) throws IOException {
+    return KeyBuilder.newInstance().add(owner).add(domainId).add(columnName)
+        .getBytes();
+  }
+
+  @Override
+  public TimelineDomain getDomain(String domainId) throws IOException {
+    DBIterator iterator = null;
+    try {
+      byte[] prefix = KeyBuilder.newInstance().add(domainId)
+          .getBytesForLookup();
+      iterator = domaindb.iterator();
+      iterator.seek(prefix);
+      return getTimelineDomain(iterator, domainId, prefix);
+    } finally {
+      IOUtils.cleanup(LOG, iterator);
+    }
+  }
+
+  @Override
+  public TimelineDomains getDomains(String owner) throws IOException {
+    DBIterator iterator = null;
+    try {
+      byte[] prefix = KeyBuilder.newInstance().add(owner).getBytesForLookup();
+      List<TimelineDomain> domains = new ArrayList<TimelineDomain>();
+      for (iterator = ownerdb.iterator(), iterator.seek(prefix); iterator
+          .hasNext();) {
+        byte[] key = iterator.peekNext().getKey();
+        if (!prefixMatches(prefix, prefix.length, key)) {
+          break;
+        }
+        // Iterator to parse the rows of an individual domain
+        KeyParser kp = new KeyParser(key, prefix.length);
+        String domainId = kp.getNextString();
+        byte[] prefixExt = KeyBuilder.newInstance().add(owner).add(domainId)
+            .getBytesForLookup();
+        TimelineDomain domainToReturn = getTimelineDomain(iterator, domainId,
+            prefixExt);
+        if (domainToReturn != null) {
+          domains.add(domainToReturn);
+        }
+      }
+      // Sort the domains to return
+      Collections.sort(domains, new Comparator<TimelineDomain>() {
+        @Override
+        public int compare(TimelineDomain domain1, TimelineDomain domain2) {
+          int result = domain2.getCreatedTime().compareTo(
+              domain1.getCreatedTime());
+          if (result == 0) {
+            return domain2.getModifiedTime().compareTo(
+                domain1.getModifiedTime());
+          } else {
+            return result;
+          }
+        }
+      });
+      TimelineDomains domainsToReturn = new TimelineDomains();
+      domainsToReturn.addDomains(domains);
+      return domainsToReturn;
+    } finally {
+      IOUtils.cleanup(LOG, iterator);
+    }
+  }
+
+  private static TimelineDomain getTimelineDomain(DBIterator iterator,
+      String domainId, byte[] prefix) throws IOException {
+    // Iterate over all the rows whose key starts with prefix to retrieve the
+    // domain information.
+    TimelineDomain domain = new TimelineDomain();
+    domain.setId(domainId);
+    boolean noRows = true;
+    for (; iterator.hasNext(); iterator.next()) {
+      byte[] key = iterator.peekNext().getKey();
+      if (!prefixMatches(prefix, prefix.length, key)) {
+        break;
+      }
+      if (noRows) {
+        noRows = false;
+      }
+      byte[] value = iterator.peekNext().getValue();
+      if (value != null && value.length > 0) {
+        if (key[prefix.length] == DESCRIPTION_COLUMN[0]) {
+          domain.setDescription(new String(value, UTF_8));
+        } else if (key[prefix.length] == OWNER_COLUMN[0]) {
+          domain.setOwner(new String(value, UTF_8));
+        } else if (key[prefix.length] == READER_COLUMN[0]) {
+          domain.setReaders(new String(value, UTF_8));
+        } else if (key[prefix.length] == WRITER_COLUMN[0]) {
+          domain.setWriters(new String(value, UTF_8));
+        } else if (key[prefix.length] == TIMESTAMP_COLUMN[0]) {
+          domain.setCreatedTime(readReverseOrderedLong(value, 0));
+          domain.setModifiedTime(readReverseOrderedLong(value, 8));
+        } else {
+          LOG.error("Unrecognized domain column: " + key[prefix.length]);
+        }
+      }
+    }
+    if (noRows) {
+      return null;
+    } else {
+      return domain;
+    }
+  }
+}
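A sketch of wiring the new store into the timeline server configuration. The store-class and leveldb-path keys are existing YarnConfiguration constants; the rolling-period key is added by this patch in the YarnConfiguration changes listed above, so its exact name and valid values (daily, half_daily, ...) should be taken from that file rather than from this sketch.

    YarnConfiguration conf = new YarnConfiguration();
    conf.set(YarnConfiguration.TIMELINE_SERVICE_STORE,
        "org.apache.hadoop.yarn.server.timeline.RollingLevelDBTimelineStore");
    conf.set(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH,
        "/var/lib/hadoop-yarn/timeline"); // hypothetical path
    // assumed key name; see the YarnConfiguration diff in this patch
    conf.set("yarn.timeline-service.rolling-period", "daily");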

+ 18 - 26
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManager.java

@@ -18,8 +18,6 @@
 
 package org.apache.hadoop.yarn.server.timeline;
 
-import static org.apache.hadoop.yarn.util.StringHelper.CSV_JOINER;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -43,7 +41,6 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field;
 import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
-import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -51,7 +48,7 @@ import com.google.common.annotations.VisibleForTesting;
  * The class wrap over the timeline store and the ACLs manager. It does some non
  * trivial manipulation of the timeline data before putting or after getting it
  * from the timeline store, and checks the user's access to it.
- * 
+ *
  */
 public class TimelineDataManager extends AbstractService {
 
@@ -119,7 +116,7 @@ public class TimelineDataManager extends AbstractService {
    * Get the timeline entities that the given user have access to. The meaning
    * of each argument has been documented with
    * {@link TimelineReader#getEntities}.
-   * 
+   *
    * @see TimelineReader#getEntities
    */
   public TimelineEntities getEntities(
@@ -156,7 +153,7 @@ public class TimelineDataManager extends AbstractService {
    * Get the single timeline entity that the given user has access to. The
    * meaning of each argument has been documented with
    * {@link TimelineReader#getEntity}.
-   * 
+   *
    * @see TimelineReader#getEntity
    */
   public TimelineEntity getEntity(
@@ -182,7 +179,7 @@ public class TimelineDataManager extends AbstractService {
    * Get the events whose entities the given user has access to. The meaning of
    * each argument has been documented with
    * {@link TimelineReader#getEntityTimelines}.
-   * 
+   *
    * @see TimelineReader#getEntityTimelines
    */
   public TimelineEvents getEvents(
@@ -218,7 +215,7 @@ public class TimelineDataManager extends AbstractService {
             eventsItr.remove();
           }
         } catch (Exception e) {
-          LOG.error("Error when verifying access for user " + callerUGI
+          LOG.warn("Error when verifying access for user " + callerUGI
               + " on the events of the timeline entity "
               + new EntityIdentifier(eventsOfOneEntity.getEntityId(),
                   eventsOfOneEntity.getEntityType()), e);
@@ -242,13 +239,10 @@ public class TimelineDataManager extends AbstractService {
     if (entities == null) {
       return new TimelinePutResponse();
     }
-    List<EntityIdentifier> entityIDs = new ArrayList<EntityIdentifier>();
     TimelineEntities entitiesToPut = new TimelineEntities();
     List<TimelinePutResponse.TimelinePutError> errors =
         new ArrayList<TimelinePutResponse.TimelinePutError>();
     for (TimelineEntity entity : entities.getEntities()) {
-      EntityIdentifier entityID =
-          new EntityIdentifier(entity.getEntityId(), entity.getEntityType());
 
       // if the domain id is not specified, the entity will be put into
       // the default domain
@@ -261,44 +255,42 @@ public class TimelineDataManager extends AbstractService {
       TimelineEntity existingEntity = null;
       try {
         existingEntity =
-            store.getEntity(entityID.getId(), entityID.getType(),
+            store.getEntity(entity.getEntityId(), entity.getEntityType(),
                 EnumSet.of(Field.PRIMARY_FILTERS));
         if (existingEntity != null) {
           addDefaultDomainIdIfAbsent(existingEntity);
           if (!existingEntity.getDomainId().equals(entity.getDomainId())) {
             throw new YarnException("The domain of the timeline entity "
-              + entityID + " is not allowed to be changed.");
+              + "{ id: " + entity.getEntityId() + ", type: "
+              + entity.getEntityType() + " } is not allowed to be changed from "
+              + existingEntity.getDomainId() + " to " + entity.getDomainId());
           }
         }
         if (!timelineACLsManager.checkAccess(
             callerUGI, ApplicationAccessType.MODIFY_APP, entity)) {
           throw new YarnException(callerUGI
-              + " is not allowed to put the timeline entity " + entityID
-              + " into the domain " + entity.getDomainId() + ".");
+              + " is not allowed to put the timeline entity "
+              + "{ id: " + entity.getEntityId() + ", type: "
+              + entity.getEntityType() + " } into the domain "
+              + entity.getDomainId() + ".");
         }
       } catch (Exception e) {
         // Skip the entity which already exists and was put by others
-        LOG.error("Skip the timeline entity: " + entityID, e);
+        LOG.warn("Skip the timeline entity: { id: " + entity.getEntityId()
+            + ", type: "+ entity.getEntityType() + " }", e);
         TimelinePutResponse.TimelinePutError error =
             new TimelinePutResponse.TimelinePutError();
-        error.setEntityId(entityID.getId());
-        error.setEntityType(entityID.getType());
+        error.setEntityId(entity.getEntityId());
+        error.setEntityType(entity.getEntityType());
         error.setErrorCode(
             TimelinePutResponse.TimelinePutError.ACCESS_DENIED);
         errors.add(error);
         continue;
       }
 
-      entityIDs.add(entityID);
       entitiesToPut.addEntity(entity);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Storing the entity " + entityID + ", JSON-style content: "
-            + TimelineUtils.dumpTimelineRecordtoJSON(entity));
-      }
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Storing entities: " + CSV_JOINER.join(entityIDs));
     }
+
     TimelinePutResponse response = store.put(entitiesToPut);
     // add the errors of timeline system filter key conflict
     response.addErrors(errors);

+ 56 - 17
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/util/LeveldbUtils.java

@@ -21,15 +21,16 @@ package org.apache.hadoop.yarn.server.timeline.util;
 
 import org.apache.hadoop.io.WritableComparator;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.nio.charset.Charset;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.readReverseOrderedLong;
 
 public class LeveldbUtils {
 
+  /** A builder utility for constructing timeline server leveldb keys. */
   public static class KeyBuilder {
+    /** Maximum subkeys that can be added to construct a key. */
     private static final int MAX_NUMBER_OF_KEY_ELEMENTS = 10;
     private byte[][] b;
     private boolean[] useSeparator;
@@ -47,8 +48,15 @@ public class LeveldbUtils {
       return new KeyBuilder(MAX_NUMBER_OF_KEY_ELEMENTS);
     }
 
+    /** Instantiates a new key builder with the given maximum number of subkeys.
+     * @param size maximum subkeys that can be added to this key builder
+     * @return a newly constructed key builder */
+    public static KeyBuilder newInstance(final int size) {
+      return new KeyBuilder(size);
+    }
+
     public KeyBuilder add(String s) {
-      return add(s.getBytes(Charset.forName("UTF-8")), true);
+      return add(s.getBytes(UTF_8), true);
     }
 
     public KeyBuilder add(byte[] t) {
@@ -66,26 +74,37 @@ public class LeveldbUtils {
       return this;
     }
 
-    public byte[] getBytes() throws IOException {
-      ByteArrayOutputStream baos = new ByteArrayOutputStream(length);
+    /** Builds a byte array without the final string delimiter. */
+    public byte[] getBytes() {
+      // check the last valid entry to see the final length
+      int bytesLength = length;
+      if (useSeparator[index - 1]) {
+        bytesLength = length - 1;
+      }
+      byte[] bytes = new byte[bytesLength];
+      int curPos = 0;
       for (int i = 0; i < index; i++) {
-        baos.write(b[i]);
+        System.arraycopy(b[i], 0, bytes, curPos, b[i].length);
+        curPos += b[i].length;
         if (i < index - 1 && useSeparator[i]) {
-          baos.write(0x0);
+          bytes[curPos++] = 0x0;
         }
       }
-      return baos.toByteArray();
+      return bytes;
     }
 
-    public byte[] getBytesForLookup() throws IOException {
-      ByteArrayOutputStream baos = new ByteArrayOutputStream(length);
+    /** Builds a byte array including the final string delimiter. */
+    public byte[] getBytesForLookup() {
+      byte[] bytes = new byte[length];
+      int curPos = 0;
       for (int i = 0; i < index; i++) {
-        baos.write(b[i]);
+        System.arraycopy(b[i], 0, bytes, curPos, b[i].length);
+        curPos += b[i].length;
         if (useSeparator[i]) {
-          baos.write(0x0);
+          bytes[curPos++] = 0x0;
         }
       }
-      return baos.toByteArray();
+      return bytes;
     }
   }
 
@@ -93,11 +112,12 @@ public class LeveldbUtils {
     private final byte[] b;
     private int offset;
 
-    public KeyParser(byte[] b, int offset) {
+    public KeyParser(final byte[] b, final int offset) {
       this.b = b;
       this.offset = offset;
     }
 
+    /** Returns a string from the offset until the next string delimiter. */
     public String getNextString() throws IOException {
       if (offset >= b.length) {
         throw new IOException(
@@ -107,23 +127,42 @@ public class LeveldbUtils {
       while (offset + i < b.length && b[offset + i] != 0x0) {
         i++;
       }
-      String s = new String(b, offset, i, Charset.forName("UTF-8"));
+      String s = new String(b, offset, i, UTF_8);
       offset = offset + i + 1;
       return s;
     }
 
+    /** Moves current position until after the next end of string marker. */
+    public void skipNextString() throws IOException {
+      if (offset >= b.length) {
+        throw new IOException("tried to read nonexistent string from byte array");
+      }
+      while (offset < b.length && b[offset] != 0x0) {
+        ++offset;
+      }
+      ++offset;
+    }
+
+    /** Read the next 8 bytes in the byte buffer as a long. */
     public long getNextLong() throws IOException {
       if (offset + 8 >= b.length) {
         throw new IOException("byte array ran out when trying to read long");
       }
-      long l = readReverseOrderedLong(b, offset);
+      long value = readReverseOrderedLong(b, offset);
       offset += 8;
-      return l;
+      return value;
     }
 
     public int getOffset() {
       return offset;
     }
+
+    /** Returns a copy of the remaining bytes. */
+    public byte[] getRemainingBytes() {
+      byte[] bytes = new byte[b.length - offset];
+      System.arraycopy(b, offset, bytes, 0, b.length - offset);
+      return bytes;
+    }
   }
 
   /**
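The KeyBuilder/KeyParser pair above is the key-encoding backbone for every store key built earlier in this patch. A small round trip using only the API shown here; writeReverseOrderedLong comes from GenericObjectMapper, as referenced in the store's javadoc.

    // String subkeys are appended with a 0x0 separator; raw byte[] subkeys
    // (such as an 8-byte reverse-ordered long) are appended as-is.
    byte[] revStartTime = GenericObjectMapper.writeReverseOrderedLong(1234L);
    byte[] key = KeyBuilder.newInstance()
        .add("entity_type")
        .add(revStartTime)
        .add("entity_id")
        .getBytes();          // getBytes() omits the trailing separator

    KeyParser kp = new KeyParser(key, 0);
    String type = kp.getNextString();   // "entity_type"
    long startTime = kp.getNextLong();  // 1234L
    String id = kp.getNextString();     // "entity_id"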

+ 13 - 16
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java

@@ -43,8 +43,6 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
 import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.records.Version;
-import org.apache.hadoop.yarn.server.timeline.LeveldbTimelineStore;
-import org.apache.hadoop.yarn.server.timeline.NameValuePair;
 import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
 import org.iq80.leveldb.DBException;
 import org.junit.After;
@@ -155,7 +153,7 @@ public class TestLeveldbTimelineStore extends TimelineStoreTestUtils {
       return ((LeveldbTimelineStore)store).deleteNextEntity(entityType, ts,
           iterator, pfIterator, false);
     } catch(DBException e) {
-      throw new IOException(e);   	
+      throw new IOException(e);
     } finally {
       IOUtils.cleanup(null, iterator, pfIterator);
     }
@@ -179,12 +177,12 @@ public class TestLeveldbTimelineStore extends TimelineStoreTestUtils {
     assertEquals(1, getEntities("type_2").size());
 
     assertEquals(false, deleteNextEntity(entityType1,
-        writeReverseOrderedLong(60l)));
+        writeReverseOrderedLong(60L)));
     assertEquals(3, getEntities("type_1").size());
     assertEquals(1, getEntities("type_2").size());
 
     assertEquals(true, deleteNextEntity(entityType1,
-        writeReverseOrderedLong(123l)));
+        writeReverseOrderedLong(123L)));
     List<TimelineEntity> entities = getEntities("type_2");
     assertEquals(1, entities.size());
     verifyEntityInfo(entityId2, entityType2, events2, Collections.singletonMap(
@@ -198,12 +196,12 @@ public class TestLeveldbTimelineStore extends TimelineStoreTestUtils {
     verifyEntityInfo(entityId6, entityType1, EMPTY_EVENTS, EMPTY_REL_ENTITIES,
         primaryFilters, otherInfo, entities.get(1), domainId2);
 
-    ((LeveldbTimelineStore)store).discardOldEntities(-123l);
+    ((LeveldbTimelineStore)store).discardOldEntities(0L);
     assertEquals(2, getEntities("type_1").size());
     assertEquals(0, getEntities("type_2").size());
     assertEquals(6, ((LeveldbTimelineStore)store).getEntityTypes().size());
 
-    ((LeveldbTimelineStore)store).discardOldEntities(123l);
+    ((LeveldbTimelineStore)store).discardOldEntities(123L);
     assertEquals(0, getEntities("type_1").size());
     assertEquals(0, getEntities("type_2").size());
     assertEquals(0, ((LeveldbTimelineStore)store).getEntityTypes().size());
@@ -240,11 +238,11 @@ public class TestLeveldbTimelineStore extends TimelineStoreTestUtils {
     verifyEntityInfo(entityId6, entityType1, EMPTY_EVENTS, EMPTY_REL_ENTITIES,
         primaryFilters, otherInfo, entities.get(2), domainId2);
 
-    ((LeveldbTimelineStore)store).discardOldEntities(-123l);
+    ((LeveldbTimelineStore)store).discardOldEntities(-123L);
     assertEquals(1, getEntitiesWithPrimaryFilter("type_1", pfPair).size());
     assertEquals(3, getEntitiesWithPrimaryFilter("type_1", userFilter).size());
 
-    ((LeveldbTimelineStore)store).discardOldEntities(123l);
+    ((LeveldbTimelineStore)store).discardOldEntities(123L);
     assertEquals(0, getEntities("type_1").size());
     assertEquals(0, getEntities("type_2").size());
     assertEquals(0, ((LeveldbTimelineStore)store).getEntityTypes().size());
@@ -261,7 +259,7 @@ public class TestLeveldbTimelineStore extends TimelineStoreTestUtils {
     assertEquals(1, getEntitiesFromTs("type_2", l).size());
     assertEquals(3, getEntitiesFromTsWithPrimaryFilter("type_1", userFilter,
         l).size());
-    ((LeveldbTimelineStore)store).discardOldEntities(123l);
+    ((LeveldbTimelineStore)store).discardOldEntities(123L);
     assertEquals(0, getEntitiesFromTs("type_1", l).size());
     assertEquals(0, getEntitiesFromTs("type_2", l).size());
     assertEquals(0, getEntitiesFromTsWithPrimaryFilter("type_1", userFilter,
@@ -279,7 +277,7 @@ public class TestLeveldbTimelineStore extends TimelineStoreTestUtils {
     assertEquals(1, getEntities("type_2").size());
     assertEquals(3, getEntitiesWithPrimaryFilter("type_1", userFilter).size());
   }
-  
+
   @Test
   public void testCheckVersion() throws IOException {
     LeveldbTimelineStore dbStore = (LeveldbTimelineStore) store;
@@ -299,16 +297,15 @@ public class TestLeveldbTimelineStore extends TimelineStoreTestUtils {
     Assert.assertEquals(defaultVersion, dbStore.loadVersion());
 
     // incompatible version
-    Version incompatibleVersion =
-      Version.newInstance(defaultVersion.getMajorVersion() + 1,
-          defaultVersion.getMinorVersion());
+    Version incompatibleVersion = Version.newInstance(
+        defaultVersion.getMajorVersion() + 1, defaultVersion.getMinorVersion());
     dbStore.storeVersion(incompatibleVersion);
     try {
       restartTimelineStore();
       Assert.fail("Incompatible version, should expect fail here.");
     } catch (ServiceStateException e) {
-      Assert.assertTrue("Exception message mismatch", 
-        e.getMessage().contains("Incompatible version for timeline store"));
+      Assert.assertTrue("Exception message mismatch",
+          e.getMessage().contains("Incompatible version for timeline store"));
     }
   }
 

+ 100 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestRollingLevelDB.java

@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timeline;
+
+import java.io.File;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.iq80.leveldb.DB;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/** Test class for verification of RollingLevelDB. */
+public class TestRollingLevelDB {
+  private Configuration conf = new YarnConfiguration();
+  private FileSystem lfs;
+  private MyRollingLevelDB rollingLevelDB;
+
+  /** RollingLevelDB subclass for testing with a settable current time. */
+  public static class MyRollingLevelDB extends RollingLevelDB {
+    private long currentTimeMillis;
+
+    MyRollingLevelDB() {
+      super("Test");
+      this.currentTimeMillis = System.currentTimeMillis();
+    }
+
+    @Override
+    protected long currentTimeMillis() {
+      return currentTimeMillis;
+    }
+
+    public void setCurrentTimeMillis(long time) {
+      this.currentTimeMillis = time;
+    }
+  };
+
+  @Before
+  public void setup() throws Exception {
+    lfs = FileSystem.getLocal(conf);
+    File fsPath = new File("target", this.getClass().getSimpleName() +
+        "-tmpDir").getAbsoluteFile();
+    conf.set(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH,
+        fsPath.getAbsolutePath());
+    lfs.delete(new Path(fsPath.getAbsolutePath()), true);
+    rollingLevelDB = new MyRollingLevelDB();
+  }
+
+  @Test
+  public void testInsertAfterRollPeriodRollsDB() throws Exception {
+
+    rollingLevelDB.init(conf);
+    long now = rollingLevelDB.currentTimeMillis();
+    DB db = rollingLevelDB.getDBForStartTime(now);
+    long startTime = rollingLevelDB.getStartTimeFor(db);
+    Assert.assertEquals("Received level db for incorrect start time",
+        rollingLevelDB.computeCurrentCheckMillis(now),
+        startTime);
+    now = rollingLevelDB.getNextRollingTimeMillis();
+    rollingLevelDB.setCurrentTimeMillis(now);
+    db = rollingLevelDB.getDBForStartTime(now);
+    startTime = rollingLevelDB.getStartTimeFor(db);
+    Assert.assertEquals("Received level db for incorrect start time",
+        rollingLevelDB.computeCurrentCheckMillis(now),
+        startTime);
+  }
+
+  @Test
+  public void testInsertForPreviousPeriodAfterRollPeriodRollsDB()
+      throws Exception {
+
+    rollingLevelDB.init(conf);
+    long now = rollingLevelDB.currentTimeMillis();
+    now = rollingLevelDB.computeCurrentCheckMillis(now);
+    rollingLevelDB.setCurrentTimeMillis(now);
+    DB db = rollingLevelDB.getDBForStartTime(now - 1);
+    long startTime = rollingLevelDB.getStartTimeFor(db);
+    Assert.assertEquals("Received level db for incorrect start time",
+        rollingLevelDB.computeCurrentCheckMillis(now - 1),
+        startTime);
+  }
+}
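The rolling contract this test pins down can be condensed into a small sketch (not part of the patch). It sits in the same package so the package-visible RollingLevelDB methods used by the test are reachable, and it assumes the RollingLevelDB(String) constructor is callable from there (the test subclass reaches it via super); the class name and the /tmp path are illustrative, and only methods already exercised above are called:

package org.apache.hadoop.yarn.server.timeline;

import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.iq80.leveldb.DB;

public class RollingLevelDBSketch {
  public static void main(String[] args) throws Exception {
    YarnConfiguration conf = new YarnConfiguration();
    // Illustrative path; the test points this at a per-test directory under target/.
    conf.set(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH,
        "/tmp/rolling-leveldb-sketch");

    RollingLevelDB rolling = new RollingLevelDB("Sketch");
    rolling.init(conf);

    long now = System.currentTimeMillis();
    // Any start time inside the current rolling period resolves to the same DB...
    DB db = rolling.getDBForStartTime(now);
    // ...whose recorded start time is the beginning of that period.
    System.out.println("period start  = " + rolling.getStartTimeFor(db));
    System.out.println("period begins = " + rolling.computeCurrentCheckMillis(now));
    // Once the clock passes this boundary, the next getDBForStartTime() rolls to a new DB,
    // which is what testInsertAfterRollPeriodRollsDB verifies.
    System.out.println("next roll at  = " + rolling.getNextRollingTimeMillis());
  }
}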

+ 427 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestRollingLevelDBTimelineStore.java

@@ -0,0 +1,427 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timeline;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.service.ServiceStateException;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mortbay.log.Log;
+
+/** Test class to verify RollingLevelDBTimelineStore. */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class TestRollingLevelDBTimelineStore extends TimelineStoreTestUtils {
+  private FileContext fsContext;
+  private File fsPath;
+  private Configuration config = new YarnConfiguration();
+
+  @Before
+  public void setup() throws Exception {
+    fsContext = FileContext.getLocalFSFileContext();
+    fsPath = new File("target", this.getClass().getSimpleName() +
+        "-tmpDir").getAbsoluteFile();
+    fsContext.delete(new Path(fsPath.getAbsolutePath()), true);
+    config.set(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH,
+        fsPath.getAbsolutePath());
+    config.setBoolean(YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE, false);
+    store = new RollingLevelDBTimelineStore();
+    store.init(config);
+    store.start();
+    loadTestEntityData();
+    loadVerificationEntityData();
+    loadTestDomainData();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    store.stop();
+    fsContext.delete(new Path(fsPath.getAbsolutePath()), true);
+  }
+
+  @Test
+  public void testRootDirPermission() throws IOException {
+    FileSystem fs = FileSystem.getLocal(new YarnConfiguration());
+    FileStatus file = fs.getFileStatus(new Path(fsPath.getAbsolutePath(),
+        RollingLevelDBTimelineStore.FILENAME));
+    assertNotNull(file);
+    assertEquals(RollingLevelDBTimelineStore.LEVELDB_DIR_UMASK,
+        file.getPermission());
+  }
+
+  @Test
+  public void testGetSingleEntity() throws IOException {
+    super.testGetSingleEntity();
+    ((RollingLevelDBTimelineStore)store).clearStartTimeCache();
+    super.testGetSingleEntity();
+    loadTestEntityData();
+  }
+
+  @Test
+  public void testGetEntities() throws IOException {
+    super.testGetEntities();
+  }
+
+  @Test
+  public void testGetEntitiesWithFromId() throws IOException {
+    super.testGetEntitiesWithFromId();
+  }
+
+  @Test
+  public void testGetEntitiesWithFromTs() throws IOException {
+    // feature not supported
+  }
+
+  @Test
+  public void testGetEntitiesWithPrimaryFilters() throws IOException {
+    super.testGetEntitiesWithPrimaryFilters();
+  }
+
+  @Test
+  public void testGetEntitiesWithSecondaryFilters() throws IOException {
+    super.testGetEntitiesWithSecondaryFilters();
+  }
+
+  @Test
+  public void testGetEvents() throws IOException {
+    super.testGetEvents();
+  }
+
+  @Test
+  public void testCacheSizes() {
+    Configuration conf = new Configuration();
+    assertEquals(10000,
+        RollingLevelDBTimelineStore.getStartTimeReadCacheSize(conf));
+    assertEquals(10000,
+        RollingLevelDBTimelineStore.getStartTimeWriteCacheSize(conf));
+    conf.setInt(
+        YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE,
+        10001);
+    assertEquals(10001,
+        RollingLevelDBTimelineStore.getStartTimeReadCacheSize(conf));
+    conf = new Configuration();
+    conf.setInt(
+        YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE,
+        10002);
+    assertEquals(10002,
+        RollingLevelDBTimelineStore.getStartTimeWriteCacheSize(conf));
+  }
+
+  @Test
+  public void testCheckVersion() throws IOException {
+    RollingLevelDBTimelineStore dbStore = (RollingLevelDBTimelineStore) store;
+    // default version
+    Version defaultVersion = dbStore.getCurrentVersion();
+    Assert.assertEquals(defaultVersion, dbStore.loadVersion());
+
+    // compatible version
+    Version compatibleVersion =
+        Version.newInstance(defaultVersion.getMajorVersion(),
+          defaultVersion.getMinorVersion() + 2);
+    dbStore.storeVersion(compatibleVersion);
+    Assert.assertEquals(compatibleVersion, dbStore.loadVersion());
+    restartTimelineStore();
+    dbStore = (RollingLevelDBTimelineStore) store;
+    // overwrite the compatible version
+    Assert.assertEquals(defaultVersion, dbStore.loadVersion());
+
+    // incompatible version
+    Version incompatibleVersion =
+        Version.newInstance(defaultVersion.getMajorVersion() + 1,
+          defaultVersion.getMinorVersion());
+    dbStore.storeVersion(incompatibleVersion);
+    try {
+      restartTimelineStore();
+      Assert.fail("Incompatible version, should expect fail here.");
+    } catch (ServiceStateException e) {
+      Assert.assertTrue("Exception message mismatch",
+          e.getMessage().contains("Incompatible version for timeline store"));
+    }
+  }
+
+  @Test
+  public void testValidateConfig() throws IOException {
+    Configuration copyConfig = new YarnConfiguration(config);
+    try {
+      Configuration newConfig = new YarnConfiguration(copyConfig);
+      newConfig.setLong(YarnConfiguration.TIMELINE_SERVICE_TTL_MS, 0);
+      config = newConfig;
+      restartTimelineStore();
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains(
+          YarnConfiguration.TIMELINE_SERVICE_TTL_MS));
+    }
+    try {
+      Configuration newConfig = new YarnConfiguration(copyConfig);
+      newConfig.setLong(
+          YarnConfiguration.TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS, 0);
+      config = newConfig;
+      restartTimelineStore();
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains(
+          YarnConfiguration.TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS));
+    }
+    try {
+      Configuration newConfig = new YarnConfiguration(copyConfig);
+      newConfig.setLong(
+          YarnConfiguration.TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE, -1);
+      config = newConfig;
+      restartTimelineStore();
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains(
+          YarnConfiguration.TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE));
+    }
+    try {
+      Configuration newConfig = new YarnConfiguration(copyConfig);
+      newConfig.setLong(
+          YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE,
+          0);
+      config = newConfig;
+      restartTimelineStore();
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains(
+          YarnConfiguration
+          .TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE));
+    }
+    try {
+      Configuration newConfig = new YarnConfiguration(copyConfig);
+      newConfig.setLong(
+          YarnConfiguration
+          .TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE,
+          0);
+      config = newConfig;
+      restartTimelineStore();
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains(
+          YarnConfiguration
+          .TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE));
+    }
+    config = copyConfig;
+    restartTimelineStore();
+  }
+
+  private void restartTimelineStore() throws IOException {
+    // need to close so leveldb releases database lock
+    if (store != null) {
+      store.close();
+    }
+    store = new RollingLevelDBTimelineStore();
+    store.init(config);
+    store.start();
+  }
+
+  @Test
+  public void testGetDomain() throws IOException {
+    super.testGetDomain();
+  }
+
+  @Test
+  public void testGetDomains() throws IOException {
+    super.testGetDomains();
+  }
+
+  @Test
+  public void testRelatingToNonExistingEntity() throws IOException {
+    TimelineEntity entityToStore = new TimelineEntity();
+    entityToStore.setEntityType("TEST_ENTITY_TYPE_1");
+    entityToStore.setEntityId("TEST_ENTITY_ID_1");
+    entityToStore.setDomainId(TimelineDataManager.DEFAULT_DOMAIN_ID);
+    entityToStore.addRelatedEntity("TEST_ENTITY_TYPE_2", "TEST_ENTITY_ID_2");
+    TimelineEntities entities = new TimelineEntities();
+    entities.addEntity(entityToStore);
+    store.put(entities);
+    TimelineEntity entityToGet =
+        store.getEntity("TEST_ENTITY_ID_2", "TEST_ENTITY_TYPE_2", null);
+    Assert.assertNotNull(entityToGet);
+    Assert.assertEquals("DEFAULT", entityToGet.getDomainId());
+    Assert.assertEquals("TEST_ENTITY_TYPE_1",
+        entityToGet.getRelatedEntities().keySet().iterator().next());
+    Assert.assertEquals("TEST_ENTITY_ID_1",
+        entityToGet.getRelatedEntities().values().iterator().next()
+            .iterator().next());
+  }
+
+  @Test
+  public void testRelatingToEntityInSamePut() throws IOException {
+    TimelineEntity entityToRelate = new TimelineEntity();
+    entityToRelate.setEntityType("TEST_ENTITY_TYPE_2");
+    entityToRelate.setEntityId("TEST_ENTITY_ID_2");
+    entityToRelate.setDomainId("TEST_DOMAIN");
+    TimelineEntity entityToStore = new TimelineEntity();
+    entityToStore.setEntityType("TEST_ENTITY_TYPE_1");
+    entityToStore.setEntityId("TEST_ENTITY_ID_1");
+    entityToStore.setDomainId("TEST_DOMAIN");
+    entityToStore.addRelatedEntity("TEST_ENTITY_TYPE_2", "TEST_ENTITY_ID_2");
+    TimelineEntities entities = new TimelineEntities();
+    entities.addEntity(entityToStore);
+    entities.addEntity(entityToRelate);
+    store.put(entities);
+    TimelineEntity entityToGet =
+        store.getEntity("TEST_ENTITY_ID_2", "TEST_ENTITY_TYPE_2", null);
+    Assert.assertNotNull(entityToGet);
+    Assert.assertEquals("TEST_DOMAIN", entityToGet.getDomainId());
+    Assert.assertEquals("TEST_ENTITY_TYPE_1",
+        entityToGet.getRelatedEntities().keySet().iterator().next());
+    Assert.assertEquals("TEST_ENTITY_ID_1",
+        entityToGet.getRelatedEntities().values().iterator().next()
+            .iterator().next());
+  }
+
+  @Test
+  public void testRelatingToOldEntityWithoutDomainId() throws IOException {
+    // New entity is put in the default domain
+    TimelineEntity entityToStore = new TimelineEntity();
+    entityToStore.setEntityType("NEW_ENTITY_TYPE_1");
+    entityToStore.setEntityId("NEW_ENTITY_ID_1");
+    entityToStore.setDomainId(TimelineDataManager.DEFAULT_DOMAIN_ID);
+    entityToStore.addRelatedEntity("OLD_ENTITY_TYPE_1", "OLD_ENTITY_ID_1");
+    TimelineEntities entities = new TimelineEntities();
+    entities.addEntity(entityToStore);
+    store.put(entities);
+
+    TimelineEntity entityToGet =
+        store.getEntity("OLD_ENTITY_ID_1", "OLD_ENTITY_TYPE_1", null);
+    Assert.assertNotNull(entityToGet);
+    Assert.assertEquals("DEFAULT", entityToGet.getDomainId());
+    Assert.assertEquals("NEW_ENTITY_TYPE_1",
+        entityToGet.getRelatedEntities().keySet().iterator().next());
+    Assert.assertEquals("NEW_ENTITY_ID_1",
+        entityToGet.getRelatedEntities().values().iterator().next()
+            .iterator().next());
+
+    // New entity is not put in the default domain
+    entityToStore = new TimelineEntity();
+    entityToStore.setEntityType("NEW_ENTITY_TYPE_2");
+    entityToStore.setEntityId("NEW_ENTITY_ID_2");
+    entityToStore.setDomainId("NON_DEFAULT");
+    entityToStore.addRelatedEntity("OLD_ENTITY_TYPE_1", "OLD_ENTITY_ID_1");
+    entities = new TimelineEntities();
+    entities.addEntity(entityToStore);
+    TimelinePutResponse response = store.put(entities);
+    Assert.assertEquals(1, response.getErrors().size());
+    Assert.assertEquals(TimelinePutError.FORBIDDEN_RELATION,
+        response.getErrors().get(0).getErrorCode());
+    entityToGet =
+        store.getEntity("OLD_ENTITY_ID_1", "OLD_ENTITY_TYPE_1", null);
+    Assert.assertNotNull(entityToGet);
+    Assert.assertEquals("DEFAULT", entityToGet.getDomainId());
+    // Still have one related entity
+    Assert.assertEquals(1, entityToGet.getRelatedEntities().keySet().size());
+    Assert.assertEquals(1, entityToGet.getRelatedEntities().values()
+        .iterator().next().size());
+  }
+
+  public void testStorePerformance() throws IOException {
+    TimelineEntity entityToStorePrep = new TimelineEntity();
+    entityToStorePrep.setEntityType("TEST_ENTITY_TYPE_PREP");
+    entityToStorePrep.setEntityId("TEST_ENTITY_ID_PREP");
+    entityToStorePrep.setDomainId("TEST_DOMAIN");
+    entityToStorePrep.addRelatedEntity("TEST_ENTITY_TYPE_2",
+        "TEST_ENTITY_ID_2");
+    entityToStorePrep.setStartTime(0L);
+
+    TimelineEntities entitiesPrep = new TimelineEntities();
+    entitiesPrep.addEntity(entityToStorePrep);
+    store.put(entitiesPrep);
+
+    long start = System.currentTimeMillis();
+    int num = 1000000;
+
+    Log.info("Start test for " + num);
+
+    final String tezTaskAttemptId = "TEZ_TA";
+    final String tezEntityId = "attempt_1429158534256_0001_1_00_000000_";
+    final String tezTaskId = "TEZ_T";
+    final String tezDomainId = "Tez_ATS_application_1429158534256_0001";
+
+    TimelineEntity entityToStore = new TimelineEntity();
+    TimelineEvent startEvt = new TimelineEvent();
+    entityToStore.setEntityType(tezTaskAttemptId);
+
+    startEvt.setEventType("TASK_ATTEMPT_STARTED");
+    startEvt.setTimestamp(0);
+    entityToStore.addEvent(startEvt);
+    entityToStore.setDomainId(tezDomainId);
+
+    entityToStore.addPrimaryFilter("status", "SUCCEEDED");
+    entityToStore.addPrimaryFilter("applicationId",
+        "application_1429158534256_0001");
+    entityToStore.addPrimaryFilter("TEZ_VERTEX_ID",
+        "vertex_1429158534256_0001_1_00");
+    entityToStore.addPrimaryFilter("TEZ_DAG_ID", "dag_1429158534256_0001_1");
+    entityToStore.addPrimaryFilter("TEZ_TASK_ID",
+        "task_1429158534256_0001_1_00_000000");
+
+    entityToStore.setStartTime(0L);
+    entityToStore.addOtherInfo("startTime", 0);
+    entityToStore.addOtherInfo("inProgressLogsURL",
+        "localhost:8042/inProgressLogsURL");
+    entityToStore.addOtherInfo("completedLogsURL", "");
+    entityToStore.addOtherInfo("nodeId", "localhost:54450");
+    entityToStore.addOtherInfo("nodeHttpAddress", "localhost:8042");
+    entityToStore.addOtherInfo("containerId",
+        "container_1429158534256_0001_01_000002");
+    entityToStore.addOtherInfo("status", "RUNNING");
+    entityToStore.addRelatedEntity(tezTaskId, "TEZ_TASK_ID_1");
+
+    TimelineEntities entities = new TimelineEntities();
+    entities.addEntity(entityToStore);
+
+    for (int i = 0; i < num; ++i) {
+      entityToStore.setEntityId(tezEntityId + i);
+      store.put(entities);
+    }
+
+    long duration = System.currentTimeMillis() - start;
+    Log.info("Duration for " + num + ": " + duration);
+  }
+
+  public static void main(String[] args) throws Exception {
+    TestRollingLevelDBTimelineStore store =
+        new TestRollingLevelDBTimelineStore();
+    store.setup();
+    store.testStorePerformance();
+    store.tearDown();
+  }
+}
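For reference, the store lifecycle these tests rely on reduces to a short sketch (not part of the patch): configure the LevelDB path, init/start the store, put entities, read them back, and stop. The class name, entity type/id, and /tmp path below are illustrative; every store and record call is one the tests above already use:

package org.apache.hadoop.yarn.server.timeline;

import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class RollingLevelDBTimelineStoreSketch {
  public static void main(String[] args) throws Exception {
    YarnConfiguration conf = new YarnConfiguration();
    // Illustrative path; the test uses a per-test directory under target/.
    conf.set(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH,
        "/tmp/rolling-timeline-sketch");
    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE, false);

    RollingLevelDBTimelineStore store = new RollingLevelDBTimelineStore();
    store.init(conf);
    store.start();
    try {
      TimelineEntity entity = new TimelineEntity();
      entity.setEntityType("SKETCH_TYPE");   // illustrative type and id
      entity.setEntityId("SKETCH_ID");
      entity.setDomainId(TimelineDataManager.DEFAULT_DOMAIN_ID);
      entity.setStartTime(System.currentTimeMillis());
      TimelineEntities entities = new TimelineEntities();
      entities.addEntity(entity);
      store.put(entities);

      TimelineEntity readBack = store.getEntity("SKETCH_ID", "SKETCH_TYPE", null);
      System.out.println("start time read back = " + readBack.getStartTime());
    } finally {
      // Closing releases the LevelDB lock, as restartTimelineStore() notes above.
      store.stop();
    }
  }
}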

+ 6 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TimelineStoreTestUtils.java

@@ -70,7 +70,7 @@ public class TimelineStoreTestUtils {
   protected String entityId6;
   protected String entityId7;
   protected String entityType7;
-  
+
   protected Map<String, Set<Object>> primaryFilters;
   protected Map<String, Object> secondaryFilters;
   protected Map<String, Object> allFilters;
@@ -105,7 +105,7 @@ public class TimelineStoreTestUtils {
     Set<Object> l1 = new HashSet<Object>();
     l1.add("username");
     Set<Object> l2 = new HashSet<Object>();
-    l2.add((long)Integer.MAX_VALUE);
+    l2.add(Integer.MAX_VALUE);
     Set<Object> l3 = new HashSet<Object>();
     l3.add("123abc");
     Set<Object> l4 = new HashSet<Object>();
@@ -115,7 +115,7 @@ public class TimelineStoreTestUtils {
     primaryFilters.put("other", l3);
     primaryFilters.put("long", l4);
     Map<String, Object> secondaryFilters = new HashMap<String, Object>();
-    secondaryFilters.put("startTime", 123456l);
+    secondaryFilters.put("startTime", 123456);
     secondaryFilters.put("status", "RUNNING");
     Map<String, Object> otherInfo1 = new HashMap<String, Object>();
     otherInfo1.put("info1", "val1");
@@ -139,7 +139,7 @@ public class TimelineStoreTestUtils {
     relatedEntities.put(entityType2, Collections.singleton(entityId2));
 
     TimelineEvent ev3 = createEvent(789l, "launch_event", null);
-    TimelineEvent ev4 = createEvent(-123l, "init_event", null);
+    TimelineEvent ev4 = createEvent(0l, "init_event", null);
     List<TimelineEvent> events = new ArrayList<TimelineEvent>();
     events.add(ev3);
     events.add(ev4);
@@ -302,7 +302,7 @@ public class TimelineStoreTestUtils {
     relEntityMap2.put(entityType4, Collections.singleton(entityId4));
 
     ev3 = createEvent(789l, "launch_event", null);
-    ev4 = createEvent(-123l, "init_event", null);
+    ev4 = createEvent(0l, "init_event", null);
     events2 = new ArrayList<TimelineEvent>();
     events2.add(ev3);
     events2.add(ev4);
@@ -384,7 +384,7 @@ public class TimelineStoreTestUtils {
         entityType1, EnumSet.allOf(Field.class)), domainId1);
 
     verifyEntityInfo(entityId2, entityType2, events2, relEntityMap,
-        EMPTY_PRIMARY_FILTERS, EMPTY_MAP, -123l, store.getEntity(entityId2,
+        EMPTY_PRIMARY_FILTERS, EMPTY_MAP, 0l, store.getEntity(entityId2,
         entityType2, EnumSet.allOf(Field.class)), domainId1);
 
     verifyEntityInfo(entityId4, entityType4, EMPTY_EVENTS, EMPTY_REL_ENTITIES,