Ver Fonte

YARN-4987. Read cache concurrency issue between read and evict in EntityGroupFS timeline store. Contributed by Li Lu.
(cherry picked from commit 705286ccaeea36941d97ec1c1700746b74264924)

Junping Du há 9 anos atrás
pai
commit
dea79a7dd6

+ 57 - 9
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java

@@ -16,6 +16,8 @@
  */
  */
 package org.apache.hadoop.yarn.server.timeline;
 package org.apache.hadoop.yarn.server.timeline;
 
 
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Path;
@@ -30,6 +32,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 
 
 /**
 /**
  * Cache item for timeline server v1.5 reader cache. Each cache item has a
  * Cache item for timeline server v1.5 reader cache. Each cache item has a
@@ -40,12 +43,17 @@ public class EntityCacheItem {
       = LoggerFactory.getLogger(EntityCacheItem.class);
       = LoggerFactory.getLogger(EntityCacheItem.class);
 
 
   private TimelineStore store;
   private TimelineStore store;
+  private TimelineEntityGroupId groupId;
   private EntityGroupFSTimelineStore.AppLogs appLogs;
   private EntityGroupFSTimelineStore.AppLogs appLogs;
   private long lastRefresh;
   private long lastRefresh;
   private Configuration config;
   private Configuration config;
   private FileSystem fs;
   private FileSystem fs;
+  private int refCount = 0;
+  private static AtomicInteger activeStores = new AtomicInteger(0);
 
 
-  public EntityCacheItem(Configuration config, FileSystem fs) {
+  public EntityCacheItem(TimelineEntityGroupId gId, Configuration config,
+      FileSystem fs) {
+    this.groupId = gId;
     this.config = config;
     this.config = config;
     this.fs = fs;
     this.fs = fs;
   }
   }
@@ -70,17 +78,24 @@ public class EntityCacheItem {
 
 
   /**
   /**
    * @return The timeline store, either loaded or unloaded, of this cache item.
    * @return The timeline store, either loaded or unloaded, of this cache item.
+   * This method will not hold the storage from being reclaimed.
    */
    */
   public synchronized TimelineStore getStore() {
   public synchronized TimelineStore getStore() {
     return store;
     return store;
   }
   }
 
 
+  /**
+   * @return The number of currently active stores in all CacheItems.
+   */
+  public static int getActiveStores() {
+    return activeStores.get();
+  }
+
   /**
   /**
    * Refresh this cache item if it needs refresh. This will enforce an appLogs
    * Refresh this cache item if it needs refresh. This will enforce an appLogs
    * rescan and then load new data. The refresh process is synchronized with
    * rescan and then load new data. The refresh process is synchronized with
    * other operations on the same cache item.
    * other operations on the same cache item.
    *
    *
-   * @param groupId Group id of the cache
    * @param aclManager ACL manager for the timeline storage
    * @param aclManager ACL manager for the timeline storage
    * @param jsonFactory JSON factory for the storage
    * @param jsonFactory JSON factory for the storage
    * @param objMapper Object mapper for the storage
    * @param objMapper Object mapper for the storage
@@ -89,10 +104,9 @@ public class EntityCacheItem {
    *         object filled with all entities in the group.
    *         object filled with all entities in the group.
    * @throws IOException
    * @throws IOException
    */
    */
-  public synchronized TimelineStore refreshCache(TimelineEntityGroupId groupId,
-      TimelineACLsManager aclManager, JsonFactory jsonFactory,
-      ObjectMapper objMapper, EntityGroupFSTimelineStoreMetrics metrics)
-      throws IOException {
+  public synchronized TimelineStore refreshCache(TimelineACLsManager aclManager,
+      JsonFactory jsonFactory, ObjectMapper objMapper,
+      EntityGroupFSTimelineStoreMetrics metrics) throws IOException {
     if (needRefresh()) {
     if (needRefresh()) {
       long startTime = Time.monotonicNow();
       long startTime = Time.monotonicNow();
       // If an application is not finished, we only update summary logs (and put
       // If an application is not finished, we only update summary logs (and put
@@ -105,6 +119,7 @@ public class EntityCacheItem {
       }
       }
       if (!appLogs.getDetailLogs().isEmpty()) {
       if (!appLogs.getDetailLogs().isEmpty()) {
         if (store == null) {
         if (store == null) {
+          activeStores.getAndIncrement();
           store = new LevelDBCacheTimelineStore(groupId.toString(),
           store = new LevelDBCacheTimelineStore(groupId.toString(),
               "LeveldbCache." + groupId);
               "LeveldbCache." + groupId);
           store.init(config);
           store.init(config);
@@ -148,11 +163,35 @@ public class EntityCacheItem {
   }
   }
 
 
   /**
   /**
-   * Release the cache item for the given group id.
+   * Increase the number of references to this cache item by 1.
+   */
+  public synchronized void incrRefs() {
+    refCount++;
+  }
+
+  /**
+   * Unregister a reader. Try to release the cache if the reader to current
+   * cache reaches 0.
    *
    *
-   * @param groupId the group id that the cache should release
+   * @return true if the cache has been released, otherwise false
    */
    */
-  public synchronized void releaseCache(TimelineEntityGroupId groupId) {
+  public synchronized boolean tryRelease() {
+    refCount--;
+    // Only reclaim the storage if there is no reader.
+    if (refCount > 0) {
+      LOG.debug("{} references left for cached group {}, skipping the release",
+          refCount, groupId);
+      return false;
+    }
+    forceRelease();
+    return true;
+  }
+
+  /**
+   * Force releasing the cache item for the given group id, even though there
+   * may be active references.
+   */
+  public synchronized void forceRelease() {
     try {
     try {
       if (store != null) {
       if (store != null) {
         store.close();
         store.close();
@@ -161,12 +200,21 @@ public class EntityCacheItem {
       LOG.warn("Error closing timeline store", e);
       LOG.warn("Error closing timeline store", e);
     }
     }
     store = null;
     store = null;
+    activeStores.getAndDecrement();
+    refCount = 0;
     // reset offsets so next time logs are re-parsed
     // reset offsets so next time logs are re-parsed
     for (LogInfo log : appLogs.getDetailLogs()) {
     for (LogInfo log : appLogs.getDetailLogs()) {
       if (log.getFilename().contains(groupId.toString())) {
       if (log.getFilename().contains(groupId.toString())) {
         log.setOffset(0);
         log.setOffset(0);
       }
       }
     }
     }
+    LOG.debug("Cache for group {} released. ", groupId);
+  }
+
+  @InterfaceAudience.Private
+  @VisibleForTesting
+  synchronized int getRefCount() {
+    return refCount;
   }
   }
 
 
   private boolean needRefresh() {
   private boolean needRefresh() {

+ 60 - 22
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java

@@ -108,6 +108,10 @@ public class EntityGroupFSTimelineStore extends CompositeService
           + "%04d" + Path.SEPARATOR // app num / 1,000,000
           + "%04d" + Path.SEPARATOR // app num / 1,000,000
           + "%03d" + Path.SEPARATOR // (app num / 1000) % 1000
           + "%03d" + Path.SEPARATOR // (app num / 1000) % 1000
           + "%s" + Path.SEPARATOR; // full app id
           + "%s" + Path.SEPARATOR; // full app id
+  // Indicates when to force release a cache item even if there are active
+  // readers. Enlarge this factor may increase memory usage for the reader since
+  // there may be more cache items "hanging" in memory but not in cache.
+  private static final int CACHE_ITEM_OVERFLOW_FACTOR = 2;
 
 
   private YarnClient yarnClient;
   private YarnClient yarnClient;
   private TimelineStore summaryStore;
   private TimelineStore summaryStore;
@@ -172,7 +176,15 @@ public class EntityGroupFSTimelineStore extends CompositeService
               TimelineEntityGroupId groupId = eldest.getKey();
               TimelineEntityGroupId groupId = eldest.getKey();
               LOG.debug("Evicting {} due to space limitations", groupId);
               LOG.debug("Evicting {} due to space limitations", groupId);
               EntityCacheItem cacheItem = eldest.getValue();
               EntityCacheItem cacheItem = eldest.getValue();
-              cacheItem.releaseCache(groupId);
+              int activeStores = EntityCacheItem.getActiveStores();
+              if (activeStores > appCacheMaxSize * CACHE_ITEM_OVERFLOW_FACTOR) {
+                LOG.debug("Force release cache {} since {} stores are active",
+                    groupId, activeStores);
+                cacheItem.forceRelease();
+              } else {
+                LOG.debug("Try release cache {}", groupId);
+                cacheItem.tryRelease();
+              }
               if (cacheItem.getAppLogs().isDone()) {
               if (cacheItem.getAppLogs().isDone()) {
                 appIdLogMap.remove(groupId.getApplicationId());
                 appIdLogMap.remove(groupId.getApplicationId());
               }
               }
@@ -826,17 +838,19 @@ public class EntityGroupFSTimelineStore extends CompositeService
   @InterfaceAudience.Private
   @InterfaceAudience.Private
   @VisibleForTesting
   @VisibleForTesting
   void setCachedLogs(TimelineEntityGroupId groupId, EntityCacheItem cacheItem) {
   void setCachedLogs(TimelineEntityGroupId groupId, EntityCacheItem cacheItem) {
+    cacheItem.incrRefs();
     cachedLogs.put(groupId, cacheItem);
     cachedLogs.put(groupId, cacheItem);
   }
   }
 
 
   private List<TimelineStore> getTimelineStoresFromCacheIds(
   private List<TimelineStore> getTimelineStoresFromCacheIds(
-      Set<TimelineEntityGroupId> groupIds, String entityType)
+      Set<TimelineEntityGroupId> groupIds, String entityType,
+      List<EntityCacheItem> cacheItems)
       throws IOException {
       throws IOException {
     List<TimelineStore> stores = new LinkedList<TimelineStore>();
     List<TimelineStore> stores = new LinkedList<TimelineStore>();
     // For now we just handle one store in a context. We return the first
     // For now we just handle one store in a context. We return the first
     // non-null storage for the group ids.
     // non-null storage for the group ids.
     for (TimelineEntityGroupId groupId : groupIds) {
     for (TimelineEntityGroupId groupId : groupIds) {
-      TimelineStore storeForId = getCachedStore(groupId);
+      TimelineStore storeForId = getCachedStore(groupId, cacheItems);
       if (storeForId != null) {
       if (storeForId != null) {
         LOG.debug("Adding {} as a store for the query", storeForId.getName());
         LOG.debug("Adding {} as a store for the query", storeForId.getName());
         stores.add(storeForId);
         stores.add(storeForId);
@@ -851,8 +865,9 @@ public class EntityGroupFSTimelineStore extends CompositeService
     return stores;
     return stores;
   }
   }
 
 
-  private List<TimelineStore> getTimelineStoresForRead(String entityId,
-      String entityType) throws IOException {
+  protected List<TimelineStore> getTimelineStoresForRead(String entityId,
+      String entityType, List<EntityCacheItem> cacheItems)
+      throws IOException {
     Set<TimelineEntityGroupId> groupIds = new HashSet<TimelineEntityGroupId>();
     Set<TimelineEntityGroupId> groupIds = new HashSet<TimelineEntityGroupId>();
     for (TimelineEntityGroupPlugin cacheIdPlugin : cacheIdPlugins) {
     for (TimelineEntityGroupPlugin cacheIdPlugin : cacheIdPlugins) {
       LOG.debug("Trying plugin {} for id {} and type {}",
       LOG.debug("Trying plugin {} for id {} and type {}",
@@ -871,12 +886,12 @@ public class EntityGroupFSTimelineStore extends CompositeService
             cacheIdPlugin.getClass().getName());
             cacheIdPlugin.getClass().getName());
       }
       }
     }
     }
-    return getTimelineStoresFromCacheIds(groupIds, entityType);
+    return getTimelineStoresFromCacheIds(groupIds, entityType, cacheItems);
   }
   }
 
 
   private List<TimelineStore> getTimelineStoresForRead(String entityType,
   private List<TimelineStore> getTimelineStoresForRead(String entityType,
-      NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters)
-      throws IOException {
+      NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
+      List<EntityCacheItem> cacheItems) throws IOException {
     Set<TimelineEntityGroupId> groupIds = new HashSet<TimelineEntityGroupId>();
     Set<TimelineEntityGroupId> groupIds = new HashSet<TimelineEntityGroupId>();
     for (TimelineEntityGroupPlugin cacheIdPlugin : cacheIdPlugins) {
     for (TimelineEntityGroupPlugin cacheIdPlugin : cacheIdPlugins) {
       Set<TimelineEntityGroupId> idsFromPlugin =
       Set<TimelineEntityGroupId> idsFromPlugin =
@@ -888,24 +903,26 @@ public class EntityGroupFSTimelineStore extends CompositeService
         groupIds.addAll(idsFromPlugin);
         groupIds.addAll(idsFromPlugin);
       }
       }
     }
     }
-    return getTimelineStoresFromCacheIds(groupIds, entityType);
+    return getTimelineStoresFromCacheIds(groupIds, entityType, cacheItems);
   }
   }
 
 
   // find a cached timeline store or null if it cannot be located
   // find a cached timeline store or null if it cannot be located
-  private TimelineStore getCachedStore(TimelineEntityGroupId groupId)
-      throws IOException {
+  private TimelineStore getCachedStore(TimelineEntityGroupId groupId,
+      List<EntityCacheItem> cacheItems) throws IOException {
     EntityCacheItem cacheItem;
     EntityCacheItem cacheItem;
     synchronized (this.cachedLogs) {
     synchronized (this.cachedLogs) {
       // Note that the content in the cache log storage may be stale.
       // Note that the content in the cache log storage may be stale.
       cacheItem = this.cachedLogs.get(groupId);
       cacheItem = this.cachedLogs.get(groupId);
       if (cacheItem == null) {
       if (cacheItem == null) {
         LOG.debug("Set up new cache item for id {}", groupId);
         LOG.debug("Set up new cache item for id {}", groupId);
-        cacheItem = new EntityCacheItem(getConfig(), fs);
+        cacheItem = new EntityCacheItem(groupId, getConfig(), fs);
         AppLogs appLogs = getAndSetAppLogs(groupId.getApplicationId());
         AppLogs appLogs = getAndSetAppLogs(groupId.getApplicationId());
         if (appLogs != null) {
         if (appLogs != null) {
           LOG.debug("Set applogs {} for group id {}", appLogs, groupId);
           LOG.debug("Set applogs {} for group id {}", appLogs, groupId);
           cacheItem.setAppLogs(appLogs);
           cacheItem.setAppLogs(appLogs);
           this.cachedLogs.put(groupId, cacheItem);
           this.cachedLogs.put(groupId, cacheItem);
+          // Add the reference by the cache
+          cacheItem.incrRefs();
         } else {
         } else {
           LOG.warn("AppLogs for groupId {} is set to null!", groupId);
           LOG.warn("AppLogs for groupId {} is set to null!", groupId);
         }
         }
@@ -915,30 +932,43 @@ public class EntityGroupFSTimelineStore extends CompositeService
     if (cacheItem.getAppLogs() != null) {
     if (cacheItem.getAppLogs() != null) {
       AppLogs appLogs = cacheItem.getAppLogs();
       AppLogs appLogs = cacheItem.getAppLogs();
       LOG.debug("try refresh cache {} {}", groupId, appLogs.getAppId());
       LOG.debug("try refresh cache {} {}", groupId, appLogs.getAppId());
-      store = cacheItem.refreshCache(groupId, aclManager, jsonFactory,
-          objMapper, metrics);
+      // Add the reference by the store
+      cacheItem.incrRefs();
+      cacheItems.add(cacheItem);
+      store = cacheItem.refreshCache(aclManager, jsonFactory, objMapper,
+          metrics);
     } else {
     } else {
       LOG.warn("AppLogs for group id {} is null", groupId);
       LOG.warn("AppLogs for group id {} is null", groupId);
     }
     }
     return store;
     return store;
   }
   }
 
 
+  protected void tryReleaseCacheItems(List<EntityCacheItem> relatedCacheItems) {
+    for (EntityCacheItem item : relatedCacheItems) {
+      item.tryRelease();
+    }
+  }
+
   @Override
   @Override
   public TimelineEntities getEntities(String entityType, Long limit,
   public TimelineEntities getEntities(String entityType, Long limit,
       Long windowStart, Long windowEnd, String fromId, Long fromTs,
       Long windowStart, Long windowEnd, String fromId, Long fromTs,
       NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
       NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
       EnumSet<Field> fieldsToRetrieve, CheckAcl checkAcl) throws IOException {
       EnumSet<Field> fieldsToRetrieve, CheckAcl checkAcl) throws IOException {
     LOG.debug("getEntities type={} primary={}", entityType, primaryFilter);
     LOG.debug("getEntities type={} primary={}", entityType, primaryFilter);
+    List<EntityCacheItem> relatedCacheItems = new ArrayList<>();
     List<TimelineStore> stores = getTimelineStoresForRead(entityType,
     List<TimelineStore> stores = getTimelineStoresForRead(entityType,
-        primaryFilter, secondaryFilters);
+        primaryFilter, secondaryFilters, relatedCacheItems);
     TimelineEntities returnEntities = new TimelineEntities();
     TimelineEntities returnEntities = new TimelineEntities();
     for (TimelineStore store : stores) {
     for (TimelineStore store : stores) {
       LOG.debug("Try timeline store {} for the request", store.getName());
       LOG.debug("Try timeline store {} for the request", store.getName());
-      returnEntities.addEntities(
-          store.getEntities(entityType, limit, windowStart, windowEnd, fromId,
-              fromTs, primaryFilter, secondaryFilters, fieldsToRetrieve,
-              checkAcl).getEntities());
+      TimelineEntities entities = store.getEntities(entityType, limit,
+          windowStart, windowEnd, fromId, fromTs, primaryFilter,
+          secondaryFilters, fieldsToRetrieve, checkAcl);
+      if (entities != null) {
+        returnEntities.addEntities(entities.getEntities());
+      }
     }
     }
+    tryReleaseCacheItems(relatedCacheItems);
     return returnEntities;
     return returnEntities;
   }
   }
 
 
@@ -946,17 +976,21 @@ public class EntityGroupFSTimelineStore extends CompositeService
   public TimelineEntity getEntity(String entityId, String entityType,
   public TimelineEntity getEntity(String entityId, String entityType,
       EnumSet<Field> fieldsToRetrieve) throws IOException {
       EnumSet<Field> fieldsToRetrieve) throws IOException {
     LOG.debug("getEntity type={} id={}", entityType, entityId);
     LOG.debug("getEntity type={} id={}", entityType, entityId);
-    List<TimelineStore> stores = getTimelineStoresForRead(entityId, entityType);
+    List<EntityCacheItem> relatedCacheItems = new ArrayList<>();
+    List<TimelineStore> stores = getTimelineStoresForRead(entityId, entityType,
+        relatedCacheItems);
     for (TimelineStore store : stores) {
     for (TimelineStore store : stores) {
       LOG.debug("Try timeline store {}:{} for the request", store.getName(),
       LOG.debug("Try timeline store {}:{} for the request", store.getName(),
           store.toString());
           store.toString());
       TimelineEntity e =
       TimelineEntity e =
           store.getEntity(entityId, entityType, fieldsToRetrieve);
           store.getEntity(entityId, entityType, fieldsToRetrieve);
       if (e != null) {
       if (e != null) {
+        tryReleaseCacheItems(relatedCacheItems);
         return e;
         return e;
       }
       }
     }
     }
     LOG.debug("getEntity: Found nothing");
     LOG.debug("getEntity: Found nothing");
+    tryReleaseCacheItems(relatedCacheItems);
     return null;
     return null;
   }
   }
 
 
@@ -966,10 +1000,11 @@ public class EntityGroupFSTimelineStore extends CompositeService
       Long windowEnd, Set<String> eventTypes) throws IOException {
       Long windowEnd, Set<String> eventTypes) throws IOException {
     LOG.debug("getEntityTimelines type={} ids={}", entityType, entityIds);
     LOG.debug("getEntityTimelines type={} ids={}", entityType, entityIds);
     TimelineEvents returnEvents = new TimelineEvents();
     TimelineEvents returnEvents = new TimelineEvents();
+    List<EntityCacheItem> relatedCacheItems = new ArrayList<>();
     for (String entityId : entityIds) {
     for (String entityId : entityIds) {
       LOG.debug("getEntityTimeline type={} id={}", entityType, entityId);
       LOG.debug("getEntityTimeline type={} id={}", entityType, entityId);
       List<TimelineStore> stores
       List<TimelineStore> stores
-          = getTimelineStoresForRead(entityId, entityType);
+          = getTimelineStoresForRead(entityId, entityType, relatedCacheItems);
       for (TimelineStore store : stores) {
       for (TimelineStore store : stores) {
         LOG.debug("Try timeline store {}:{} for the request", store.getName(),
         LOG.debug("Try timeline store {}:{} for the request", store.getName(),
             store.toString());
             store.toString());
@@ -978,9 +1013,12 @@ public class EntityGroupFSTimelineStore extends CompositeService
         TimelineEvents events =
         TimelineEvents events =
             store.getEntityTimelines(entityType, entityIdSet, limit,
             store.getEntityTimelines(entityType, entityIdSet, limit,
                 windowStart, windowEnd, eventTypes);
                 windowStart, windowEnd, eventTypes);
-        returnEvents.addEvents(events.getAllEvents());
+        if (events != null) {
+          returnEvents.addEvents(events.getAllEvents());
+        }
       }
       }
     }
     }
+    tryReleaseCacheItems(relatedCacheItems);
     return returnEvents;
     return returnEvents;
   }
   }
 
 

+ 11 - 8
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/EntityGroupPlugInForTest.java

@@ -18,7 +18,9 @@
 package org.apache.hadoop.yarn.server.timeline;
 package org.apache.hadoop.yarn.server.timeline;
 
 
 import com.google.common.collect.Sets;
 import com.google.common.collect.Sets;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 
 
 import java.util.Collection;
 import java.util.Collection;
 import java.util.Set;
 import java.util.Set;
@@ -26,31 +28,32 @@ import java.util.SortedSet;
 
 
 class EntityGroupPlugInForTest extends TimelineEntityGroupPlugin {
 class EntityGroupPlugInForTest extends TimelineEntityGroupPlugin {
 
 
-  private static TimelineEntityGroupId timelineEntityGroupId
-      = TimelineEntityGroupId.newInstance(
-      TestEntityGroupFSTimelineStore.TEST_APPLICATION_ID, "test");
+  static final String APP_ID_FILTER_NAME = "appid";
 
 
   @Override
   @Override
   public Set<TimelineEntityGroupId> getTimelineEntityGroupId(String entityType,
   public Set<TimelineEntityGroupId> getTimelineEntityGroupId(String entityType,
       NameValuePair primaryFilter,
       NameValuePair primaryFilter,
       Collection<NameValuePair> secondaryFilters) {
       Collection<NameValuePair> secondaryFilters) {
-    return Sets.newHashSet(timelineEntityGroupId);
+    ApplicationId appId
+        = ConverterUtils.toApplicationId(primaryFilter.getValue().toString());
+    return Sets.newHashSet(getStandardTimelineGroupId(appId));
   }
   }
 
 
   @Override
   @Override
   public Set<TimelineEntityGroupId> getTimelineEntityGroupId(String entityId,
   public Set<TimelineEntityGroupId> getTimelineEntityGroupId(String entityId,
       String entityType) {
       String entityType) {
-    return Sets.newHashSet(timelineEntityGroupId);
+    ApplicationId appId = ConverterUtils.toApplicationId(entityId);
+    return Sets.newHashSet(getStandardTimelineGroupId(appId));
   }
   }
 
 
   @Override
   @Override
   public Set<TimelineEntityGroupId> getTimelineEntityGroupId(String entityType,
   public Set<TimelineEntityGroupId> getTimelineEntityGroupId(String entityType,
       SortedSet<String> entityIds,
       SortedSet<String> entityIds,
       Set<String> eventTypes) {
       Set<String> eventTypes) {
-    return Sets.newHashSet(timelineEntityGroupId);
+    return Sets.newHashSet();
   }
   }
 
 
-  static TimelineEntityGroupId getStandardTimelineGroupId() {
-    return timelineEntityGroupId;
+  static TimelineEntityGroupId getStandardTimelineGroupId(ApplicationId appId) {
+    return TimelineEntityGroupId.newInstance(appId, "test");
   }
   }
 }
 }

+ 197 - 51
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java

@@ -44,8 +44,17 @@ import org.junit.Test;
 import org.junit.rules.TestName;
 import org.junit.rules.TestName;
 
 
 import java.io.IOException;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
 
 
 import static org.apache.hadoop.yarn.server.timeline.EntityGroupFSTimelineStore.AppState;
 import static org.apache.hadoop.yarn.server.timeline.EntityGroupFSTimelineStore.AppState;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertEquals;
@@ -53,24 +62,15 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 
 public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
 public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
 
 
-  private static final String SAMPLE_APP_NAME = "1234_5678";
+  private static final String SAMPLE_APP_PREFIX_CACHE_TEST = "1234_000";
+  private static final int CACHE_TEST_CACHE_SIZE = 5;
 
 
-  static final ApplicationId TEST_APPLICATION_ID
-      = ConverterUtils.toApplicationId(
-      ConverterUtils.APPLICATION_PREFIX + "_" + SAMPLE_APP_NAME);
-
-  private static final String TEST_APP_DIR_NAME
-      = TEST_APPLICATION_ID.toString();
-  private static final String TEST_ATTEMPT_DIR_NAME
-      = ApplicationAttemptId.appAttemptIdStrPrefix + SAMPLE_APP_NAME + "_1";
   private static final String TEST_SUMMARY_LOG_FILE_NAME
   private static final String TEST_SUMMARY_LOG_FILE_NAME
       = EntityGroupFSTimelineStore.SUMMARY_LOG_PREFIX + "test";
       = EntityGroupFSTimelineStore.SUMMARY_LOG_PREFIX + "test";
-  private static final String TEST_ENTITY_LOG_FILE_NAME
-      = EntityGroupFSTimelineStore.ENTITY_LOG_PREFIX
-          + EntityGroupPlugInForTest.getStandardTimelineGroupId();
   private static final String TEST_DOMAIN_LOG_FILE_NAME
   private static final String TEST_DOMAIN_LOG_FILE_NAME
       = EntityGroupFSTimelineStore.DOMAIN_LOG_PREFIX + "test";
       = EntityGroupFSTimelineStore.DOMAIN_LOG_PREFIX + "test";
 
 
@@ -78,9 +78,6 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
       = new Path(System.getProperty("test.build.data",
       = new Path(System.getProperty("test.build.data",
           System.getProperty("java.io.tmpdir")),
           System.getProperty("java.io.tmpdir")),
       TestEntityGroupFSTimelineStore.class.getSimpleName());
       TestEntityGroupFSTimelineStore.class.getSimpleName());
-  private static Path testAppDirPath;
-  private static Path testAttemptDirPath;
-  private static Path testDoneDirPath;
 
 
   private static Configuration config = new YarnConfiguration();
   private static Configuration config = new YarnConfiguration();
   private static MiniDFSCluster hdfsCluster;
   private static MiniDFSCluster hdfsCluster;
@@ -88,7 +85,14 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
   private static FileContext fc;
   private static FileContext fc;
   private static FileContextTestHelper fileContextTestHelper =
   private static FileContextTestHelper fileContextTestHelper =
       new FileContextTestHelper("/tmp/TestEntityGroupFSTimelineStore");
       new FileContextTestHelper("/tmp/TestEntityGroupFSTimelineStore");
-  private EntityGroupFSTimelineStore store;
+
+  private static List<ApplicationId> sampleAppIds;
+  private static ApplicationId mainTestAppId;
+  private static Path mainTestAppDirPath;
+  private static Path testDoneDirPath;
+  private static String mainEntityLogFileName;
+
+  private EntityGroupFSTimelineStoreForTest store;
   private TimelineEntity entityNew;
   private TimelineEntity entityNew;
 
 
   @Rule
   @Rule
@@ -101,23 +105,44 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
         YarnConfiguration
         YarnConfiguration
             .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_ENTITY_TYPES,
             .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_ENTITY_TYPES,
         "YARN_APPLICATION,YARN_APPLICATION_ATTEMPT,YARN_CONTAINER");
         "YARN_APPLICATION,YARN_APPLICATION_ATTEMPT,YARN_CONTAINER");
+    config.setInt(
+        YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_APP_CACHE_SIZE,
+        CACHE_TEST_CACHE_SIZE);
     config.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR.toString());
     config.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR.toString());
     HdfsConfiguration hdfsConfig = new HdfsConfiguration();
     HdfsConfiguration hdfsConfig = new HdfsConfiguration();
     hdfsCluster
     hdfsCluster
         = new MiniDFSCluster.Builder(hdfsConfig).numDataNodes(1).build();
         = new MiniDFSCluster.Builder(hdfsConfig).numDataNodes(1).build();
     fs = hdfsCluster.getFileSystem();
     fs = hdfsCluster.getFileSystem();
     fc = FileContext.getFileContext(hdfsCluster.getURI(0), config);
     fc = FileContext.getFileContext(hdfsCluster.getURI(0), config);
-    testAppDirPath = getTestRootPath(TEST_APPLICATION_ID.toString());
-    testAttemptDirPath = new Path(testAppDirPath, TEST_ATTEMPT_DIR_NAME);
-    testDoneDirPath = getTestRootPath("done");
-    config.set(YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR, testDoneDirPath.toString());
 
 
+    sampleAppIds = new ArrayList<>(CACHE_TEST_CACHE_SIZE + 1);
+    for (int i = 0; i < CACHE_TEST_CACHE_SIZE + 1; i++) {
+      ApplicationId appId = ConverterUtils.toApplicationId(
+          ConverterUtils.APPLICATION_PREFIX + "_" + SAMPLE_APP_PREFIX_CACHE_TEST
+              + i);
+      sampleAppIds.add(appId);
+    }
+    // Among all sample applicationIds, choose the first one for most of the
+    // tests.
+    mainTestAppId = sampleAppIds.get(0);
+    mainTestAppDirPath = getTestRootPath(mainTestAppId.toString());
+    mainEntityLogFileName = EntityGroupFSTimelineStore.ENTITY_LOG_PREFIX
+          + EntityGroupPlugInForTest.getStandardTimelineGroupId(mainTestAppId);
+
+    testDoneDirPath = getTestRootPath("done");
+    config.set(YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR,
+        testDoneDirPath.toString());
   }
   }
 
 
   @Before
   @Before
   public void setup() throws Exception {
   public void setup() throws Exception {
-    createTestFiles();
-    store = new EntityGroupFSTimelineStore();
+    for (ApplicationId appId : sampleAppIds) {
+      Path attemotDirPath = new Path(getTestRootPath(appId.toString()),
+          getAttemptDirName(appId));
+      createTestFiles(appId, attemotDirPath);
+    }
+
+    store = new EntityGroupFSTimelineStoreForTest();
     if (currTestName.getMethodName().contains("Plugin")) {
     if (currTestName.getMethodName().contains("Plugin")) {
       config.set(YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES,
       config.set(YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES,
           EntityGroupPlugInForTest.class.getName());
           EntityGroupPlugInForTest.class.getName());
@@ -130,7 +155,9 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
   @After
   @After
   public void tearDown() throws Exception {
   public void tearDown() throws Exception {
     store.stop();
     store.stop();
-    fs.delete(testAppDirPath, true);
+    for (ApplicationId appId : sampleAppIds) {
+      fs.delete(getTestRootPath(appId.toString()), true);
+    }
   }
   }
 
 
   @AfterClass
   @AfterClass
@@ -144,7 +171,7 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
   @Test
   @Test
   public void testAppLogsScanLogs() throws Exception {
   public void testAppLogsScanLogs() throws Exception {
     EntityGroupFSTimelineStore.AppLogs appLogs =
     EntityGroupFSTimelineStore.AppLogs appLogs =
-        store.new AppLogs(TEST_APPLICATION_ID, testAppDirPath,
+        store.new AppLogs(mainTestAppId, mainTestAppDirPath,
         AppState.COMPLETED);
         AppState.COMPLETED);
     appLogs.scanForLogs();
     appLogs.scanForLogs();
     List<LogInfo> summaryLogs = appLogs.getSummaryLogs();
     List<LogInfo> summaryLogs = appLogs.getSummaryLogs();
@@ -160,14 +187,14 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
 
 
     for (LogInfo log : detailLogs) {
     for (LogInfo log : detailLogs) {
       String fileName = log.getFilename();
       String fileName = log.getFilename();
-      assertEquals(fileName, TEST_ENTITY_LOG_FILE_NAME);
+      assertEquals(fileName, mainEntityLogFileName);
     }
     }
   }
   }
 
 
   @Test
   @Test
   public void testMoveToDone() throws Exception {
   public void testMoveToDone() throws Exception {
     EntityGroupFSTimelineStore.AppLogs appLogs =
     EntityGroupFSTimelineStore.AppLogs appLogs =
-        store.new AppLogs(TEST_APPLICATION_ID, testAppDirPath,
+        store.new AppLogs(mainTestAppId, mainTestAppDirPath,
         AppState.COMPLETED);
         AppState.COMPLETED);
     Path pathBefore = appLogs.getAppDirPath();
     Path pathBefore = appLogs.getAppDirPath();
     appLogs.moveToDone();
     appLogs.moveToDone();
@@ -182,7 +209,7 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
     MutableCounterLong scanned = store.metrics.getEntitiesReadToSummary();
     MutableCounterLong scanned = store.metrics.getEntitiesReadToSummary();
     long beforeScan = scanned.value();
     long beforeScan = scanned.value();
     EntityGroupFSTimelineStore.AppLogs appLogs =
     EntityGroupFSTimelineStore.AppLogs appLogs =
-        store.new AppLogs(TEST_APPLICATION_ID, testAppDirPath,
+        store.new AppLogs(mainTestAppId, mainTestAppDirPath,
         AppState.COMPLETED);
         AppState.COMPLETED);
     appLogs.scanForLogs();
     appLogs.scanForLogs();
     appLogs.parseSummaryLogs(tdm);
     appLogs.parseSummaryLogs(tdm);
@@ -194,6 +221,9 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
   public void testCleanLogs() throws Exception {
   public void testCleanLogs() throws Exception {
     // Create test dirs and files
     // Create test dirs and files
     // Irrelevant file, should not be reclaimed
     // Irrelevant file, should not be reclaimed
+    String appDirName = mainTestAppId.toString();
+    String attemptDirName = ApplicationAttemptId.appAttemptIdStrPrefix
+        + appDirName + "_1";
     Path irrelevantFilePath = new Path(
     Path irrelevantFilePath = new Path(
             testDoneDirPath, "irrelevant.log");
             testDoneDirPath, "irrelevant.log");
     FSDataOutputStream stream = fs.create(irrelevantFilePath);
     FSDataOutputStream stream = fs.create(irrelevantFilePath);
@@ -204,29 +234,29 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
 
 
     Path doneAppHomeDir = new Path(new Path(testDoneDirPath, "0000"), "001");
     Path doneAppHomeDir = new Path(new Path(testDoneDirPath, "0000"), "001");
     // First application, untouched after creation
     // First application, untouched after creation
-    Path appDirClean = new Path(doneAppHomeDir, TEST_APP_DIR_NAME);
-    Path attemptDirClean = new Path(appDirClean, TEST_ATTEMPT_DIR_NAME);
+    Path appDirClean = new Path(doneAppHomeDir, appDirName);
+    Path attemptDirClean = new Path(appDirClean, attemptDirName);
     fs.mkdirs(attemptDirClean);
     fs.mkdirs(attemptDirClean);
     Path filePath = new Path(attemptDirClean, "test.log");
     Path filePath = new Path(attemptDirClean, "test.log");
     stream = fs.create(filePath);
     stream = fs.create(filePath);
     stream.close();
     stream.close();
     // Second application, one file touched after creation
     // Second application, one file touched after creation
-    Path appDirHoldByFile = new Path(doneAppHomeDir, TEST_APP_DIR_NAME + "1");
+    Path appDirHoldByFile = new Path(doneAppHomeDir, appDirName + "1");
     Path attemptDirHoldByFile
     Path attemptDirHoldByFile
-        = new Path(appDirHoldByFile, TEST_ATTEMPT_DIR_NAME);
+        = new Path(appDirHoldByFile, attemptDirName);
     fs.mkdirs(attemptDirHoldByFile);
     fs.mkdirs(attemptDirHoldByFile);
     Path filePathHold = new Path(attemptDirHoldByFile, "test1.log");
     Path filePathHold = new Path(attemptDirHoldByFile, "test1.log");
     stream = fs.create(filePathHold);
     stream = fs.create(filePathHold);
     stream.close();
     stream.close();
     // Third application, one dir touched after creation
     // Third application, one dir touched after creation
-    Path appDirHoldByDir = new Path(doneAppHomeDir, TEST_APP_DIR_NAME + "2");
-    Path attemptDirHoldByDir = new Path(appDirHoldByDir, TEST_ATTEMPT_DIR_NAME);
+    Path appDirHoldByDir = new Path(doneAppHomeDir, appDirName + "2");
+    Path attemptDirHoldByDir = new Path(appDirHoldByDir, attemptDirName);
     fs.mkdirs(attemptDirHoldByDir);
     fs.mkdirs(attemptDirHoldByDir);
     Path dirPathHold = new Path(attemptDirHoldByDir, "hold");
     Path dirPathHold = new Path(attemptDirHoldByDir, "hold");
     fs.mkdirs(dirPathHold);
     fs.mkdirs(dirPathHold);
     // Fourth application, empty dirs
     // Fourth application, empty dirs
-    Path appDirEmpty = new Path(doneAppHomeDir, TEST_APP_DIR_NAME + "3");
-    Path attemptDirEmpty = new Path(appDirEmpty, TEST_ATTEMPT_DIR_NAME);
+    Path appDirEmpty = new Path(doneAppHomeDir, appDirName + "3");
+    Path attemptDirEmpty = new Path(appDirEmpty, attemptDirName);
     fs.mkdirs(attemptDirEmpty);
     fs.mkdirs(attemptDirEmpty);
     Path dirPathEmpty = new Path(attemptDirEmpty, "empty");
     Path dirPathEmpty = new Path(attemptDirEmpty, "empty");
     fs.mkdirs(dirPathEmpty);
     fs.mkdirs(dirPathEmpty);
@@ -274,12 +304,15 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
             YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES));
             YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES));
     // Load data and cache item, prepare timeline store by making a cache item
     // Load data and cache item, prepare timeline store by making a cache item
     EntityGroupFSTimelineStore.AppLogs appLogs =
     EntityGroupFSTimelineStore.AppLogs appLogs =
-        store.new AppLogs(TEST_APPLICATION_ID, testAppDirPath,
+        store.new AppLogs(mainTestAppId, mainTestAppDirPath,
         AppState.COMPLETED);
         AppState.COMPLETED);
-    EntityCacheItem cacheItem = new EntityCacheItem(config, fs);
+    EntityCacheItem cacheItem = new EntityCacheItem(
+        EntityGroupPlugInForTest.getStandardTimelineGroupId(mainTestAppId),
+        config, fs);
     cacheItem.setAppLogs(appLogs);
     cacheItem.setAppLogs(appLogs);
     store.setCachedLogs(
     store.setCachedLogs(
-        EntityGroupPlugInForTest.getStandardTimelineGroupId(), cacheItem);
+        EntityGroupPlugInForTest.getStandardTimelineGroupId(mainTestAppId),
+        cacheItem);
     MutableCounterLong detailLogEntityRead =
     MutableCounterLong detailLogEntityRead =
         store.metrics.getGetEntityToDetailOps();
         store.metrics.getGetEntityToDetailOps();
     MutableStat cacheRefresh = store.metrics.getCacheRefresh();
     MutableStat cacheRefresh = store.metrics.getCacheRefresh();
@@ -291,16 +324,20 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
         = PluginStoreTestUtils.getTdmWithStore(config, store);
         = PluginStoreTestUtils.getTdmWithStore(config, store);
 
 
     // Verify single entity read
     // Verify single entity read
-    TimelineEntity entity3 = tdm.getEntity("type_3", "id_3",
+    TimelineEntity entity3 = tdm.getEntity("type_3", mainTestAppId.toString(),
         EnumSet.allOf(TimelineReader.Field.class),
         EnumSet.allOf(TimelineReader.Field.class),
         UserGroupInformation.getLoginUser());
         UserGroupInformation.getLoginUser());
     assertNotNull(entity3);
     assertNotNull(entity3);
     assertEquals(entityNew.getStartTime(), entity3.getStartTime());
     assertEquals(entityNew.getStartTime(), entity3.getStartTime());
+    assertEquals(1, cacheItem.getRefCount());
+    assertEquals(1, EntityCacheItem.getActiveStores());
     // Verify multiple entities read
     // Verify multiple entities read
-    TimelineEntities entities = tdm.getEntities("type_3", null, null, null,
-        null, null, null, null, EnumSet.allOf(TimelineReader.Field.class),
+    NameValuePair primaryFilter = new NameValuePair(
+        EntityGroupPlugInForTest.APP_ID_FILTER_NAME, mainTestAppId.toString());
+    TimelineEntities entities = tdm.getEntities("type_3", primaryFilter, null,
+        null, null, null, null, null, EnumSet.allOf(TimelineReader.Field.class),
         UserGroupInformation.getLoginUser());
         UserGroupInformation.getLoginUser());
-    assertEquals(entities.getEntities().size(), 1);
+    assertEquals(1, entities.getEntities().size());
     for (TimelineEntity entity : entities.getEntities()) {
     for (TimelineEntity entity : entities.getEntities()) {
       assertEquals(entityNew.getStartTime(), entity.getStartTime());
       assertEquals(entityNew.getStartTime(), entity.getStartTime());
     }
     }
@@ -309,11 +346,79 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
     assertEquals(cacheRefreshBefore + 1L, cacheRefresh.lastStat().numSamples());
     assertEquals(cacheRefreshBefore + 1L, cacheRefresh.lastStat().numSamples());
   }
   }
 
 
+  @Test(timeout = 90000L)
+  public void testMultiplePluginRead() throws Exception {
+    Thread mainThread = Thread.currentThread();
+    mainThread.setName("testMain");
+    // Verify precondition
+    assertEquals(EntityGroupPlugInForTest.class.getName(),
+        store.getConfig().get(
+            YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES));
+    // Prepare timeline store by making cache items
+    EntityGroupFSTimelineStore.AppLogs appLogs =
+        store.new AppLogs(mainTestAppId, mainTestAppDirPath,
+            AppState.COMPLETED);
+    final EntityCacheItem cacheItem = new EntityCacheItem(
+        EntityGroupPlugInForTest.getStandardTimelineGroupId(mainTestAppId),
+        config, fs);
+
+    cacheItem.setAppLogs(appLogs);
+    store.setCachedLogs(
+        EntityGroupPlugInForTest.getStandardTimelineGroupId(mainTestAppId),
+        cacheItem);
+
+    // Launch the blocking read call in a future
+    ExecutorService threadExecutor = Executors.newSingleThreadExecutor();
+    FutureTask<TimelineEntity> blockingReader =
+        new FutureTask<>(new Callable<TimelineEntity>() {
+          public TimelineEntity call() throws Exception {
+            Thread currThread = Thread.currentThread();
+            currThread.setName("blockingReader");
+            return store.getEntityBlocking(mainTestAppId.toString(), "type_3",
+                EnumSet.allOf(TimelineReader.Field.class));
+          }});
+    threadExecutor.execute(blockingReader);
+    try {
+      while (!store.testCacheReferenced) {
+        Thread.sleep(300);
+      }
+    } catch (InterruptedException e) {
+      fail("Interrupted on exception " + e);
+    }
+    // Try refill the cache after the first cache item is referenced
+    for (ApplicationId appId : sampleAppIds) {
+      // Skip the first appId since it's already in cache
+      if (appId.equals(mainTestAppId)) {
+        continue;
+      }
+      EntityGroupFSTimelineStore.AppLogs currAppLog =
+          store.new AppLogs(appId, getTestRootPath(appId.toString()),
+              AppState.COMPLETED);
+      EntityCacheItem item = new EntityCacheItem(
+          EntityGroupPlugInForTest.getStandardTimelineGroupId(appId),
+          config, fs);
+      item.setAppLogs(currAppLog);
+      store.setCachedLogs(
+          EntityGroupPlugInForTest.getStandardTimelineGroupId(appId),
+          item);
+    }
+    // At this time, the cache item of the blocking reader should be evicted.
+    assertEquals(1, cacheItem.getRefCount());
+    store.testCanProceed = true;
+    TimelineEntity entity3 = blockingReader.get();
+
+    assertNotNull(entity3);
+    assertEquals(entityNew.getStartTime(), entity3.getStartTime());
+    assertEquals(0, cacheItem.getRefCount());
+
+    threadExecutor.shutdownNow();
+  }
+
   @Test
   @Test
   public void testSummaryRead() throws Exception {
   public void testSummaryRead() throws Exception {
     // Load data
     // Load data
     EntityGroupFSTimelineStore.AppLogs appLogs =
     EntityGroupFSTimelineStore.AppLogs appLogs =
-        store.new AppLogs(TEST_APPLICATION_ID, testAppDirPath,
+        store.new AppLogs(mainTestAppId, mainTestAppDirPath,
         AppState.COMPLETED);
         AppState.COMPLETED);
     MutableCounterLong summaryLogEntityRead
     MutableCounterLong summaryLogEntityRead
         = store.metrics.getGetEntityToSummaryOps();
         = store.metrics.getGetEntityToSummaryOps();
@@ -331,28 +436,32 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
         UserGroupInformation.getLoginUser());
         UserGroupInformation.getLoginUser());
     assertEquals(entities.getEntities().size(), 1);
     assertEquals(entities.getEntities().size(), 1);
     for (TimelineEntity entity : entities.getEntities()) {
     for (TimelineEntity entity : entities.getEntities()) {
-      assertEquals((Long) 123l, entity.getStartTime());
+      assertEquals((Long) 123L, entity.getStartTime());
     }
     }
     // Verify metrics
     // Verify metrics
     assertEquals(numEntityReadBefore + 5L, summaryLogEntityRead.value());
     assertEquals(numEntityReadBefore + 5L, summaryLogEntityRead.value());
 
 
   }
   }
 
 
-  private void createTestFiles() throws IOException {
+  private void createTestFiles(ApplicationId appId, Path attemptDirPath)
+      throws IOException {
     TimelineEntities entities = PluginStoreTestUtils.generateTestEntities();
     TimelineEntities entities = PluginStoreTestUtils.generateTestEntities();
     PluginStoreTestUtils.writeEntities(entities,
     PluginStoreTestUtils.writeEntities(entities,
-        new Path(testAttemptDirPath, TEST_SUMMARY_LOG_FILE_NAME), fs);
-
+        new Path(attemptDirPath, TEST_SUMMARY_LOG_FILE_NAME), fs);
+    Map<String, Set<Object>> primaryFilters = new HashMap<>();
+    Set<Object> appSet = new HashSet<Object>();
+    appSet.add(appId.toString());
+    primaryFilters.put(EntityGroupPlugInForTest.APP_ID_FILTER_NAME, appSet);
     entityNew = PluginStoreTestUtils
     entityNew = PluginStoreTestUtils
-        .createEntity("id_3", "type_3", 789l, null, null,
-            null, null, "domain_id_1");
+        .createEntity(appId.toString(), "type_3", 789L, null, null,
+            primaryFilters, null, "domain_id_1");
     TimelineEntities entityList = new TimelineEntities();
     TimelineEntities entityList = new TimelineEntities();
     entityList.addEntity(entityNew);
     entityList.addEntity(entityNew);
     PluginStoreTestUtils.writeEntities(entityList,
     PluginStoreTestUtils.writeEntities(entityList,
-        new Path(testAttemptDirPath, TEST_ENTITY_LOG_FILE_NAME), fs);
+        new Path(attemptDirPath, mainEntityLogFileName), fs);
 
 
     FSDataOutputStream out = fs.create(
     FSDataOutputStream out = fs.create(
-        new Path(testAttemptDirPath, TEST_DOMAIN_LOG_FILE_NAME));
+        new Path(attemptDirPath, TEST_DOMAIN_LOG_FILE_NAME));
     out.close();
     out.close();
   }
   }
 
 
@@ -360,4 +469,41 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
     return fileContextTestHelper.getTestRootPath(fc, pathString);
     return fileContextTestHelper.getTestRootPath(fc, pathString);
   }
   }
 
 
+  private static String getAttemptDirName(ApplicationId appId) {
+    return ApplicationAttemptId.appAttemptIdStrPrefix + appId.toString() + "_1";
+  }
+
+  private static class EntityGroupFSTimelineStoreForTest
+      extends EntityGroupFSTimelineStore {
+    // Flags used for the concurrent testing environment
+    private volatile boolean testCanProceed = false;
+    private volatile boolean testCacheReferenced = false;
+
+    TimelineEntity getEntityBlocking(String entityId, String entityType,
+        EnumSet<Field> fieldsToRetrieve) throws IOException {
+      List<EntityCacheItem> relatedCacheItems = new ArrayList<>();
+      List<TimelineStore> stores = getTimelineStoresForRead(entityId,
+          entityType, relatedCacheItems);
+
+      testCacheReferenced = true;
+      try {
+        while (!testCanProceed) {
+          Thread.sleep(1000);
+        }
+      } catch (InterruptedException e) {
+        fail("Interrupted " + e);
+      }
+
+      for (TimelineStore store : stores) {
+        TimelineEntity e =
+            store.getEntity(entityId, entityType, fieldsToRetrieve);
+        if (e != null) {
+          tryReleaseCacheItems(relatedCacheItems);
+          return e;
+        }
+      }
+      tryReleaseCacheItems(relatedCacheItems);
+      return null;
+    }
+  }
 }
 }