Parcourir la source

YARN-1730. Implemented simple write-locking in the LevelDB based timeline-store. Contributed by Billie Rinaldi.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1574145 13f79535-47bb-0310-9956-ffa450edef68
Vinod Kumar Vavilapalli il y a 11 ans
Parent
commit
40464fba22

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -249,6 +249,9 @@ Release 2.4.0 - UNRELEASED
     YARN-1765. Added test cases to verify that killApplication API works across
     ResourceManager failover. (Xuan Gong via vinodkv) 
 
+    YARN-1730. Implemented simple write-locking in the LevelDB based timeline-
+    store. (Billie Rinaldi via vinodkv)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 14 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -1073,9 +1073,22 @@ public class YarnConfiguration extends Configuration {
   public static final String TIMELINE_SERVICE_STORE =
       TIMELINE_SERVICE_PREFIX + "store-class";
 
+  public static final String TIMELINE_SERVICE_LEVELDB_PREFIX =
+      TIMELINE_SERVICE_PREFIX + "leveldb-timeline-store.";
+
   /** Timeline service leveldb path */
   public static final String TIMELINE_SERVICE_LEVELDB_PATH =
-      TIMELINE_SERVICE_PREFIX + "leveldb-timeline-store.path";
+      TIMELINE_SERVICE_LEVELDB_PREFIX + "path";
+
+  /** Timeline service leveldb start time read cache (number of entities) */
+  public static final String
+      TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE =
+      TIMELINE_SERVICE_LEVELDB_PREFIX + "start-time-read-cache-size";
+
+  /** Timeline service leveldb start time write cache (number of entities) */
+  public static final String
+      TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE =
+      TIMELINE_SERVICE_LEVELDB_PREFIX + "start-time-write-cache-size";
 
   ////////////////////////////////
   // Other Configs

+ 171 - 50
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/LeveldbTimelineStore.java

@@ -33,6 +33,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeMap;
+import java.util.concurrent.locks.ReentrantLock;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.collections.map.LRUMap;
@@ -84,11 +85,17 @@ public class LeveldbTimelineStore extends AbstractService
 
   private static final byte[] EMPTY_BYTES = new byte[0];
 
-  private static final int START_TIME_CACHE_SIZE = 10000;
+  private static final int DEFAULT_START_TIME_READ_CACHE_SIZE = 10000;
+  private static final int DEFAULT_START_TIME_WRITE_CACHE_SIZE = 10000;
 
-  @SuppressWarnings("unchecked")
-  private final Map<EntityIdentifier, Long> startTimeCache =
-      Collections.synchronizedMap(new LRUMap(START_TIME_CACHE_SIZE));
+  private Map<EntityIdentifier, Long> startTimeWriteCache;
+  private Map<EntityIdentifier, Long> startTimeReadCache;
+
+  /**
+   * Per-entity locks are obtained when writing.
+   */
+  private final LockMap<EntityIdentifier> writeLocks =
+      new LockMap<EntityIdentifier>();
 
   private DB db;
 
@@ -97,6 +104,7 @@ public class LeveldbTimelineStore extends AbstractService
   }
 
   @Override
+  @SuppressWarnings("unchecked")
   protected void serviceInit(Configuration conf) throws Exception {
     Options options = new Options();
     options.createIfMissing(true);
@@ -109,6 +117,12 @@ public class LeveldbTimelineStore extends AbstractService
             "timeline store " + path);
     LOG.info("Using leveldb path " + path);
     db = factory.open(new File(path, FILENAME), options);
+    startTimeWriteCache =
+        Collections.synchronizedMap(new LRUMap(getStartTimeWriteCacheSize(
+            conf)));
+    startTimeReadCache =
+        Collections.synchronizedMap(new LRUMap(getStartTimeReadCacheSize(
+            conf)));
     super.serviceInit(conf);
   }
 
@@ -118,6 +132,45 @@ public class LeveldbTimelineStore extends AbstractService
     super.serviceStop();
   }
 
+  private static class LockMap<K> {
+    private static class CountingReentrantLock<K> extends ReentrantLock {
+      private int count;
+      private K key;
+
+      CountingReentrantLock(K key) {
+        super();
+        this.count = 0;
+        this.key = key;
+      }
+    }
+
+    private Map<K, CountingReentrantLock<K>> locks =
+        new HashMap<K, CountingReentrantLock<K>>();
+
+    synchronized CountingReentrantLock<K> getLock(K key) {
+      CountingReentrantLock<K> lock = locks.get(key);
+      if (lock == null) {
+        lock = new CountingReentrantLock<K>(key);
+        locks.put(key, lock);
+      }
+
+      lock.count++;
+      return lock;
+    }
+
+    synchronized void returnLock(CountingReentrantLock<K> lock) {
+      if (lock.count == 0) {
+        throw new IllegalStateException("Returned lock more times than it " +
+            "was retrieved");
+      }
+      lock.count--;
+
+      if (lock.count == 0) {
+        locks.remove(lock.key);
+      }
+    }
+  }
+
   private static class KeyBuilder {
     private static final int MAX_NUMBER_OF_KEY_ELEMENTS = 10;
     private byte[][] b;
@@ -214,7 +267,7 @@ public class LeveldbTimelineStore extends AbstractService
       EnumSet<Field> fields) throws IOException {
     DBIterator iterator = null;
     try {
-      byte[] revStartTime = getStartTime(entityId, entityType, null, null, null);
+      byte[] revStartTime = getStartTime(entityId, entityType);
       if (revStartTime == null)
         return null;
       byte[] prefix = KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX)
@@ -338,7 +391,7 @@ public class LeveldbTimelineStore extends AbstractService
       // look up start times for the specified entities
       // skip entities with no start time
       for (String entity : entityIds) {
-        byte[] startTime = getStartTime(entity, entityType, null, null, null);
+        byte[] startTime = getStartTime(entity, entityType);
         if (startTime != null) {
           List<EntityIdentifier> entities = startTimeMap.get(startTime);
           if (entities == null) {
@@ -529,12 +582,16 @@ public class LeveldbTimelineStore extends AbstractService
    * response.
    */
   private void put(TimelineEntity entity, TimelinePutResponse response) {
+    LockMap.CountingReentrantLock<EntityIdentifier> lock =
+        writeLocks.getLock(new EntityIdentifier(entity.getEntityId(),
+            entity.getEntityType()));
+    lock.lock();
     WriteBatch writeBatch = null;
     try {
       writeBatch = db.createWriteBatch();
       List<TimelineEvent> events = entity.getEvents();
       // look up the start time for the entity
-      byte[] revStartTime = getStartTime(entity.getEntityId(),
+      byte[] revStartTime = getAndSetStartTime(entity.getEntityId(),
           entity.getEntityType(), entity.getStartTime(), events,
           writeBatch);
       if (revStartTime == null) {
@@ -571,7 +628,7 @@ public class LeveldbTimelineStore extends AbstractService
           String relatedEntityType = relatedEntityList.getKey();
           for (String relatedEntityId : relatedEntityList.getValue()) {
             // look up start time of related entity
-            byte[] relatedEntityStartTime = getStartTime(relatedEntityId,
+            byte[] relatedEntityStartTime = getAndSetStartTime(relatedEntityId,
                 relatedEntityType, null, null, writeBatch);
             if (relatedEntityStartTime == null) {
               // if start time is not found, set start time of the related
@@ -580,7 +637,7 @@ public class LeveldbTimelineStore extends AbstractService
               relatedEntityStartTime = revStartTime;
               writeBatch.put(createStartTimeLookupKey(relatedEntityId,
                   relatedEntityType), relatedEntityStartTime);
-              startTimeCache.put(new EntityIdentifier(relatedEntityId,
+              startTimeWriteCache.put(new EntityIdentifier(relatedEntityId,
                   relatedEntityType), revStartTimeLong);
             }
             // write reverse entry (related entity -> entity)
@@ -629,6 +686,8 @@ public class LeveldbTimelineStore extends AbstractService
       error.setErrorCode(TimelinePutError.IO_EXCEPTION);
       response.addError(error);
     } finally {
+      lock.unlock();
+      writeLocks.returnLock(lock);
       IOUtils.cleanup(LOG, writeBatch);
     }
   }
@@ -666,6 +725,39 @@ public class LeveldbTimelineStore extends AbstractService
    *
    * @param entityId The id of the entity
    * @param entityType The type of the entity
+   * @return A byte array
+   * @throws IOException
+   */
+  private byte[] getStartTime(String entityId, String entityType)
+      throws IOException {
+    EntityIdentifier entity = new EntityIdentifier(entityId, entityType);
+    // start time is not provided, so try to look it up
+    if (startTimeReadCache.containsKey(entity)) {
+      // found the start time in the cache
+      return writeReverseOrderedLong(startTimeReadCache.get(entity));
+    } else {
+      // try to look up the start time in the db
+      byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
+      byte[] v = db.get(b);
+      if (v == null) {
+        // did not find the start time in the db
+        return null;
+      } else {
+        // found the start time in the db
+        startTimeReadCache.put(entity, readReverseOrderedLong(v, 0));
+        return v;
+      }
+    }
+  }
+
+  /**
+   * Get the unique start time for a given entity as a byte array that sorts
+   * the timestamps in reverse order (see {@link
+   * GenericObjectMapper#writeReverseOrderedLong(long)}). If the start time
+   * doesn't exist, set it based on the information provided.
+   *
+   * @param entityId The id of the entity
+   * @param entityType The type of the entity
    * @param startTime The start time of the entity, or null
    * @param events A list of events for the entity, or null
    * @param writeBatch A leveldb write batch, if the method is called by a
@@ -673,62 +765,76 @@ public class LeveldbTimelineStore extends AbstractService
    * @return A byte array
    * @throws IOException
    */
-  private byte[] getStartTime(String entityId, String entityType,
+  private byte[] getAndSetStartTime(String entityId, String entityType,
       Long startTime, List<TimelineEvent> events, WriteBatch writeBatch)
       throws IOException {
     EntityIdentifier entity = new EntityIdentifier(entityId, entityType);
     if (startTime == null) {
       // start time is not provided, so try to look it up
-      if (startTimeCache.containsKey(entity)) {
+      if (startTimeWriteCache.containsKey(entity)) {
         // found the start time in the cache
-        startTime = startTimeCache.get(entity);
+        startTime = startTimeWriteCache.get(entity);
+        return writeReverseOrderedLong(startTime);
       } else {
-        // try to look up the start time in the db
-        byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
-        byte[] v = db.get(b);
-        if (v == null) {
-          // did not find the start time in the db
-          // if this is a put, try to set it from the provided events
-          if (events == null || writeBatch == null) {
-            // no events, or not a put, so return null
-            return null;
-          }
+        if (events != null) {
+          // prepare a start time from events in case it is needed
           Long min = Long.MAX_VALUE;
-          for (TimelineEvent e : events)
-            if (min > e.getTimestamp())
+          for (TimelineEvent e : events) {
+            if (min > e.getTimestamp()) {
               min = e.getTimestamp();
-          startTime = min;
-          // selected start time as minimum timestamp of provided events
-          // write start time to db and cache
-          writeBatch.put(b, writeReverseOrderedLong(startTime));
-          startTimeCache.put(entity, startTime);
-        } else {
-          // found the start time in the db
-          startTime = readReverseOrderedLong(v, 0);
-          if (writeBatch != null) {
-            // if this is a put, re-add the start time to the cache
-            startTimeCache.put(entity, startTime);
+            }
           }
+          startTime = min;
         }
+        return checkStartTimeInDb(entity, startTime, writeBatch);
       }
     } else {
       // start time is provided
-      // TODO: verify start time in db as well as cache?
-      if (startTimeCache.containsKey(entity)) {
-        // if the start time is already in the cache,
-        // and it is different from the provided start time,
-        // use the one from the cache
-        if (!startTime.equals(startTimeCache.get(entity)))
-          startTime = startTimeCache.get(entity);
-      } else if (writeBatch != null) {
-        // if this is a put, write the provided start time to the db and the
-        // cache
-        byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
-        writeBatch.put(b, writeReverseOrderedLong(startTime));
-        startTimeCache.put(entity, startTime);
+      if (startTimeWriteCache.containsKey(entity)) {
+        // check the provided start time matches the cache
+        if (!startTime.equals(startTimeWriteCache.get(entity))) {
+          // the start time is already in the cache,
+          // and it is different from the provided start time,
+          // so use the one from the cache
+          startTime = startTimeWriteCache.get(entity);
+        }
+        return writeReverseOrderedLong(startTime);
+      } else {
+        // check the provided start time matches the db
+        return checkStartTimeInDb(entity, startTime, writeBatch);
       }
     }
-    return writeReverseOrderedLong(startTime);
+  }
+
+  /**
+   * Checks db for start time and returns it if it exists.  If it doesn't
+   * exist, writes the suggested start time (if it is not null).  This is
+   * only called when the start time is not found in the cache,
+   * so it adds it back into the cache if it is found.
+   */
+  private byte[] checkStartTimeInDb(EntityIdentifier entity,
+      Long suggestedStartTime, WriteBatch writeBatch) throws IOException {
+    // create lookup key for start time
+    byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
+    // retrieve value for key
+    byte[] v = db.get(b);
+    byte[] revStartTime;
+    if (v == null) {
+      // start time doesn't exist in db
+      if (suggestedStartTime == null) {
+        return null;
+      }
+      // write suggested start time
+      revStartTime = writeReverseOrderedLong(suggestedStartTime);
+      writeBatch.put(b, revStartTime);
+    } else {
+      // found start time in db, so ignore suggested start time
+      suggestedStartTime = readReverseOrderedLong(v, 0);
+      revStartTime = v;
+    }
+    startTimeWriteCache.put(entity, suggestedStartTime);
+    startTimeReadCache.put(entity, suggestedStartTime);
+    return revStartTime;
   }
 
   /**
@@ -868,6 +974,21 @@ public class LeveldbTimelineStore extends AbstractService
    */
   @VisibleForTesting
   void clearStartTimeCache() {
-    startTimeCache.clear();
+    startTimeWriteCache.clear();
+    startTimeReadCache.clear();
+  }
+
+  @VisibleForTesting
+  static int getStartTimeReadCacheSize(Configuration conf) {
+    return conf.getInt(
+        YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE,
+        DEFAULT_START_TIME_READ_CACHE_SIZE);
+  }
+
+  @VisibleForTesting
+  static int getStartTimeWriteCacheSize(Configuration conf) {
+    return conf.getInt(
+        YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE,
+        DEFAULT_START_TIME_WRITE_CACHE_SIZE);
   }
 }

+ 19 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestLeveldbTimelineStore.java

@@ -30,6 +30,8 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 public class TestLeveldbTimelineStore
@@ -64,6 +66,7 @@ public class TestLeveldbTimelineStore
     super.testGetSingleEntity();
     ((LeveldbTimelineStore)store).clearStartTimeCache();
     super.testGetSingleEntity();
+    loadTestData();
   }
 
   @Test
@@ -86,4 +89,20 @@ public class TestLeveldbTimelineStore
     super.testGetEvents();
   }
 
+  @Test
+  public void testCacheSizes() {
+    Configuration conf = new Configuration();
+    assertEquals(10000, LeveldbTimelineStore.getStartTimeReadCacheSize(conf));
+    assertEquals(10000, LeveldbTimelineStore.getStartTimeWriteCacheSize(conf));
+    conf.setInt(
+        YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE,
+        10001);
+    assertEquals(10001, LeveldbTimelineStore.getStartTimeReadCacheSize(conf));
+    conf = new Configuration();
+    conf.setInt(
+        YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE,
+        10002);
+    assertEquals(10002, LeveldbTimelineStore.getStartTimeWriteCacheSize(conf));
+  }
+
 }