Browse Source

YARN-2180. [YARN-1492] In-memory backing store for cache manager. (Chris Trezzo via kasha)

Karthik Kambatla 10 năm trước cách đây
mục cha
commit
4f426fe223
11 tập tin đã thay đổi với 1275 bổ sung và 2 xóa
  1. 3 0
      hadoop-yarn-project/CHANGES.txt
  2. 43 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
  3. 34 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
  4. 2 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/sharedcache/SharedCacheUtil.java
  5. 4 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/pom.xml
  6. 58 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java
  7. 514 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/InMemorySCMStore.java
  8. 133 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SCMStore.java
  9. 64 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SharedCacheResource.java
  10. 86 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SharedCacheResourceReference.java
  11. 334 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/TestInMemorySCMStore.java

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -36,6 +36,9 @@ Release 2.7.0 - UNRELEASED
     YARN-2179. [YARN-1492] Initial cache manager structure and context. 
     (Chris Trezzo via kasha) 
 
+    YARN-2180. [YARN-1492] In-memory backing store for cache manager. 
+    (Chris Trezzo via kasha)
+
   IMPROVEMENTS
 
     YARN-1979. TestDirectoryCollection fails when the umask is unusual.

+ 43 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -1332,6 +1332,49 @@ public class YarnConfiguration extends Configuration {
       SHARED_CACHE_PREFIX + "nested-level";
   public static final int DEFAULT_SHARED_CACHE_NESTED_LEVEL = 3;
   
+  // Shared Cache Manager Configs
+
+  public static final String SCM_STORE_PREFIX = SHARED_CACHE_PREFIX + "store.";
+
+  public static final String SCM_STORE_CLASS = SCM_STORE_PREFIX + "class";
+  public static final String DEFAULT_SCM_STORE_CLASS =
+      "org.apache.hadoop.yarn.server.sharedcachemanager.store.InMemorySCMStore";
+
+  public static final String SCM_APP_CHECKER_CLASS = SHARED_CACHE_PREFIX
+      + "app-checker.class";
+  public static final String DEFAULT_SCM_APP_CHECKER_CLASS =
+      "org.apache.hadoop.yarn.server.sharedcachemanager.RemoteAppChecker";
+
+  // In-memory SCM store configuration
+  
+  // The in-memory store keys live under the store prefix so that they match
+  // the yarn.sharedcache.store.in-memory.* names declared in yarn-default.xml.
+  public static final String IN_MEMORY_STORE_PREFIX =
+      SCM_STORE_PREFIX + "in-memory.";
+
+  /**
+   * A resource in the InMemorySCMStore is considered stale if the time since
+   * the last reference exceeds the staleness period. This value is specified in
+   * minutes.
+   */
+  public static final String IN_MEMORY_STALENESS_PERIOD =
+      IN_MEMORY_STORE_PREFIX + "staleness-period";
+  public static final int DEFAULT_IN_MEMORY_STALENESS_PERIOD = 7 * 24 * 60;
+
+  /**
+   * Initial delay before the in-memory store runs its first check to remove
+   * dead initial applications. Specified in minutes.
+   */
+  public static final String IN_MEMORY_INITIAL_DELAY =
+      IN_MEMORY_STORE_PREFIX + "initial-delay";
+  public static final int DEFAULT_IN_MEMORY_INITIAL_DELAY = 10;
+  
+  /**
+   * The frequency at which the in-memory store checks to remove dead initial
+   * applications. Specified in minutes.
+   */
+  public static final String IN_MEMORY_CHECK_PERIOD =
+      IN_MEMORY_STORE_PREFIX + "check-period";
+  public static final int DEFAULT_IN_MEMORY_CHECK_PERIOD = 12 * 60;
+
   ////////////////////////////////
   // Other Configs
   ////////////////////////////////

+ 34 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

@@ -1342,6 +1342,40 @@
     <value>3</value>
   </property>
 
+  <property>
+    <description>The implementation to be used for the SCM store</description>
+    <name>yarn.sharedcache.store.class</name>
+    <value>org.apache.hadoop.yarn.server.sharedcachemanager.store.InMemorySCMStore</value>
+  </property>
+
+  <property>
+    <description>The implementation to be used for the SCM app-checker</description>
+    <name>yarn.sharedcache.app-checker.class</name>
+    <value>org.apache.hadoop.yarn.server.sharedcachemanager.RemoteAppChecker</value>
+  </property>
+  
+  <property>
+    <description>A resource in the in-memory store is considered stale
+    if the time since the last reference exceeds the staleness period.
+    This value is specified in minutes.</description>
+    <name>yarn.sharedcache.store.in-memory.staleness-period</name>
+    <value>10080</value>
+  </property>
+  
+  <property>
+    <description>Initial delay before the in-memory store runs its first check
+    to remove dead initial applications. Specified in minutes.</description>
+    <name>yarn.sharedcache.store.in-memory.initial-delay</name>
+    <value>10</value>
+  </property>
+  
+  <property>
+    <description>The frequency at which the in-memory store checks to remove
+    dead initial applications. Specified in minutes.</description>
+    <name>yarn.sharedcache.store.in-memory.check-period</name>
+    <value>720</value>
+  </property>
+
   <!-- Other configuration -->
   <property>
     <description>The interval that the yarn client library uses to poll the

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/sharedcache/SharedCacheStructureUtil.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/sharedcache/SharedCacheUtil.java

@@ -32,9 +32,9 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
  */
 @Private
 @Unstable
-public class SharedCacheStructureUtil {
+public class SharedCacheUtil {
 
-  private static final Log LOG = LogFactory.getLog(SharedCacheStructureUtil.class);
+  private static final Log LOG = LogFactory.getLog(SharedCacheUtil.class);
 
   @Private
   public static int getCacheDepth(Configuration conf) {

+ 4 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/pom.xml

@@ -42,6 +42,10 @@
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-yarn-common</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-common</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-yarn-client</artifactId>

+ 58 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java

@@ -26,10 +26,15 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.source.JvmMetrics;
 import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore;
+
+import com.google.common.annotations.VisibleForTesting;
 
 /**
  * This service maintains the shared cache meta data. It handles claiming and
@@ -47,12 +52,18 @@ public class SharedCacheManager extends CompositeService {
 
   private static final Log LOG = LogFactory.getLog(SharedCacheManager.class);
 
+  private SCMStore store;
+
   public SharedCacheManager() {
     super("SharedCacheManager");
   }
 
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
+
+    this.store = createSCMStoreService(conf);
+    addService(store);
+
     // init metrics
     DefaultMetricsSystem.initialize("SharedCacheManager");
     JvmMetrics.initSingleton("SharedCacheManager", null);
@@ -60,6 +71,25 @@ public class SharedCacheManager extends CompositeService {
     super.serviceInit(conf);
   }
 
+  /**
+   * Instantiates the SCM store implementation configured under
+   * {@link YarnConfiguration#SCM_STORE_CLASS}, using
+   * {@link YarnConfiguration#DEFAULT_SCM_STORE_CLASS} as the default.
+   *
+   * @param conf the configuration used to look up and construct the store
+   * @return a new store instance created through reflection
+   * @throws YarnRuntimeException if the default store class cannot be loaded
+   */
+  @SuppressWarnings("unchecked")
+  private static SCMStore createSCMStoreService(Configuration conf) {
+    Class<? extends SCMStore> defaultStoreClass;
+    try {
+      defaultStoreClass =
+          (Class<? extends SCMStore>) Class
+              .forName(YarnConfiguration.DEFAULT_SCM_STORE_CLASS);
+    } catch (Exception e) {
+      // keep a separator so the class name does not run into the message
+      throw new YarnRuntimeException("Invalid default scm store class: "
+          + YarnConfiguration.DEFAULT_SCM_STORE_CLASS, e);
+    }
+
+    return ReflectionUtils.newInstance(conf.getClass(
+        YarnConfiguration.SCM_STORE_CLASS,
+        defaultStoreClass, SCMStore.class), conf);
+  }
+
   @Override
   protected void serviceStop() throws Exception {
 
@@ -67,6 +97,14 @@ public class SharedCacheManager extends CompositeService {
     super.serviceStop();
   }
 
+  /**
+   * For testing purposes only.
+   *
+   * @return the SCM store created in serviceInit; null before that point
+   */
+  @VisibleForTesting
+  SCMStore getSCMStore() {
+    return this.store;
+  }
+
   public static void main(String[] args) {
     Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
     StringUtils.startupShutdownMessage(SharedCacheManager.class, args, LOG);
@@ -83,4 +121,24 @@ public class SharedCacheManager extends CompositeService {
       System.exit(-1);
     }
   }
+
+  /**
+   * Instantiates the app checker implementation configured under
+   * {@link YarnConfiguration#SCM_APP_CHECKER_CLASS}, using
+   * {@link YarnConfiguration#DEFAULT_SCM_APP_CHECKER_CLASS} as the default.
+   *
+   * @param conf the configuration used to look up and construct the checker
+   * @return a new app checker instance created through reflection
+   * @throws YarnRuntimeException if the default checker class cannot be loaded
+   */
+  @Private
+  @SuppressWarnings("unchecked")
+  public static AppChecker createAppCheckerService(Configuration conf) {
+    Class<? extends AppChecker> defaultCheckerClass;
+    try {
+      defaultCheckerClass =
+          (Class<? extends AppChecker>) Class
+              .forName(YarnConfiguration.DEFAULT_SCM_APP_CHECKER_CLASS);
+    } catch (Exception e) {
+      // keep a separator so the class name does not run into the message
+      throw new YarnRuntimeException("Invalid default scm app checker class: "
+          + YarnConfiguration.DEFAULT_SCM_APP_CHECKER_CLASS, e);
+    }
+
+    return ReflectionUtils.newInstance(conf.getClass(
+        YarnConfiguration.SCM_APP_CHECKER_CLASS, defaultCheckerClass,
+        AppChecker.class), conf);
+  }
 }

+ 514 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/InMemorySCMStore.java

@@ -0,0 +1,514 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.sharedcachemanager.store;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringInterner;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.sharedcache.SharedCacheUtil;
+import org.apache.hadoop.yarn.server.sharedcachemanager.AppChecker;
+import org.apache.hadoop.yarn.server.sharedcachemanager.SharedCacheManager;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * A thread safe version of an in-memory SCM store. The thread safety is
+ * implemented with two key pieces: (1) at the mapping level a ConcurrentHashMap
+ * is used to allow concurrency to resources and their associated references,
+ * and (2) a key level lock is used to ensure mutual exclusion between any
+ * operation that accesses a resource with the same key. <br>
+ * <br>
+ * To ensure safe key-level locking, we use the original string key and intern
+ * it weakly using hadoop's <code>StringInterner</code>. It avoids the pitfalls
+ * of using built-in String interning. The interned strings are also weakly
+ * referenced, so it can be garbage collected once it is done. And there is
+ * little risk of keys being available for other parts of the code so they can
+ * be used as locks accidentally. <br>
+ * <br>
+ * Resources in the in-memory store are evicted based on a time staleness
+ * criteria. If a resource is not referenced (i.e. used) for a given period, it
+ * is designated as a stale resource and is considered evictable.
+ */
+@Private
+@Evolving
+public class InMemorySCMStore extends SCMStore {
+  private static final Log LOG = LogFactory.getLog(InMemorySCMStore.class);
+
+  private final Map<String, SharedCacheResource> cachedResources =
+      new ConcurrentHashMap<String, SharedCacheResource>();
+  private Collection<ApplicationId> initialApps =
+      new ArrayList<ApplicationId>();
+  private final Object initialAppsLock = new Object();
+  private long startTime;
+  private int stalenessMinutes;
+  private AppChecker appChecker;
+  private ScheduledExecutorService scheduler;
+  private int initialDelayMin;
+  private int checkPeriodMin;
+
+  /**
+   * Creates the store service, named after this class's fully qualified name.
+   */
+  public InMemorySCMStore() {
+    super(InMemorySCMStore.class.getName());
+  }
+
+  /**
+   * Weakly interns the key through hadoop's StringInterner. The returned
+   * string is what the store methods synchronize on for key-level locking.
+   */
+  private String intern(String key) {
+    return StringInterner.weakIntern(key);
+  }
+
+  /**
+   * Reads the store settings, creates and registers the app checker,
+   * bootstraps the store from the shared cache entries that exist in the file
+   * system, and creates the scheduler used for the app check task.
+   *
+   * @param conf the configuration to read the settings from
+   * @throws Exception if a setting is invalid, bootstrapping fails, or the
+   *         parent initialization fails
+   */
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+
+    // the start time serves as the last point of certainty for resources the
+    // store has no access time for (see isResourceEvictable)
+    this.startTime = System.currentTimeMillis();
+    this.initialDelayMin = getInitialDelay(conf);
+    this.checkPeriodMin = getCheckPeriod(conf);
+    this.stalenessMinutes = getStalenessPeriod(conf);
+
+    // registered as a composed service so it is initialized by
+    // super.serviceInit below
+    appChecker = createAppCheckerService(conf);
+    addService(appChecker);
+
+    bootstrap(conf);
+
+    ThreadFactory tf =
+        new ThreadFactoryBuilder().setNameFormat("InMemorySCMStore")
+            .build();
+    scheduler = Executors.newSingleThreadScheduledExecutor(tf);
+
+    super.serviceInit(conf);
+  }
+
+  /**
+   * Starts the composed services, records the applications that are active at
+   * this point, and schedules the periodic app check task.
+   */
+  @Override
+  protected void serviceStart() throws Exception {
+    // the composed services (including the app checker) must be running first
+    super.serviceStart();
+
+    LOG.info("Getting the active app list to initialize the in-memory scm store");
+    synchronized (initialAppsLock) {
+      initialApps = appChecker.getActiveApplications();
+    }
+    LOG.info(initialApps.size() + " apps recorded as active at this time");
+
+    scheduler.scheduleAtFixedRate(new AppCheckTask(appChecker),
+        initialDelayMin, checkPeriodMin, TimeUnit.MINUTES);
+    LOG.info("Scheduled the in-memory scm store app check task to run every "
+        + checkPeriodMin + " minutes.");
+  }
+
+  /**
+   * Stops the background app check task, waiting up to ten seconds for it to
+   * terminate, and then stops the composed services.
+   */
+  @Override
+  protected void serviceStop() throws Exception {
+    // the scheduler is created near the end of serviceInit, so it is still
+    // null if initialization failed earlier (e.g. during bootstrap)
+    if (scheduler != null) {
+      LOG.info("Shutting down the background thread.");
+      scheduler.shutdownNow();
+      try {
+        if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) {
+          LOG.warn("Gave up waiting for the app check task to shutdown.");
+        }
+      } catch (InterruptedException e) {
+        LOG.warn("The InMemorySCMStore was interrupted while shutting down the "
+            + "app check task.", e);
+        // restore the interrupt status so the caller can still observe it
+        Thread.currentThread().interrupt();
+      }
+      LOG.info("The background thread stopped.");
+    }
+
+    super.serviceStop();
+  }
+
+  /**
+   * Creates the app checker by delegating to
+   * SharedCacheManager.createAppCheckerService. Marked visible for testing,
+   * presumably so that tests can substitute their own checker.
+   */
+  @VisibleForTesting
+  AppChecker createAppCheckerService(Configuration conf) {
+    return SharedCacheManager.createAppCheckerService(conf);
+  }
+
+  /**
+   * Seeds the in-memory map with the resources returned by
+   * getInitialCachedResources, one per key.
+   *
+   * @param conf the configuration used to obtain the file system
+   * @throws IOException if the initial resources cannot be read
+   */
+  private void bootstrap(Configuration conf) throws IOException {
+    Map<String, String> found =
+        getInitialCachedResources(FileSystem.get(conf), conf);
+    LOG.info("Bootstrapping from " + found.size()
+        + " cache resources located in the file system");
+    for (Iterator<Map.Entry<String, String>> entries =
+        found.entrySet().iterator(); entries.hasNext();) {
+      Map.Entry<String, String> entry = entries.next();
+      // no key-level lock is taken here because this runs from serviceInit
+      cachedResources.put(intern(entry.getKey()),
+          new SharedCacheResource(entry.getValue()));
+      // drop the processed entry right away to reduce the footprint
+      entries.remove();
+    }
+    LOG.info("Bootstrapping complete");
+  }
+
+  /**
+   * Scans the shared cache root for cached files and maps each checksum (the
+   * name of a file's immediate parent directory) to one file name. When a
+   * checksum directory holds several files, only the first one encountered is
+   * kept.
+   *
+   * @param fs the file system that holds the shared cache
+   * @param conf the configuration providing the root and the nested level
+   * @return a map from checksum to file name
+   * @throws IOException if the shared cache root directory does not exist
+   */
+  @VisibleForTesting
+  Map<String, String> getInitialCachedResources(FileSystem fs,
+      Configuration conf) throws IOException {
+    String location =
+        conf.get(YarnConfiguration.SHARED_CACHE_ROOT,
+            YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT);
+    Path root = new Path(location);
+    if (!fs.exists(root)) {
+      String message =
+          "The shared cache root directory " + location + " was not found";
+      LOG.error(message);
+      throw new IOException(message);
+    }
+
+    // one wildcard per nested directory level plus one for the checksum
+    // directory, followed by one for the file itself
+    // (e.g. 9/c/d/<checksum>/file)
+    int nestedLevel = SharedCacheUtil.getCacheDepth(conf);
+    StringBuilder pattern = new StringBuilder();
+    for (int level = 0; level <= nestedLevel; level++) {
+      pattern.append("*/");
+    }
+    pattern.append("*");
+
+    LOG.info("Querying for all individual cached resource files");
+    FileStatus[] entries = fs.globStatus(new Path(root, pattern.toString()));
+    if (entries == null) {
+      entries = new FileStatus[0];
+    }
+    LOG.info("Found " + entries.length + " files: processing for one resource per "
+        + "key");
+
+    Map<String, String> initialCachedEntries = new HashMap<String, String>();
+    for (FileStatus entry : entries) {
+      Path file = entry.getPath();
+      if (!entry.isFile()) {
+        continue;
+      }
+      Path parent = file.getParent();
+      if (parent == null) {
+        continue;
+      }
+      String fileName = file.getName();
+      // the name of the immediate parent directory is the checksum
+      String key = parent.getName();
+      String existing = initialCachedEntries.get(key);
+      if (existing == null && !initialCachedEntries.containsKey(key)) {
+        initialCachedEntries.put(key, fileName);
+      } else {
+        // only the first file seen for a checksum is kept
+        LOG.warn("Key " + key + " is already mapped to file "
+            + existing + "; file " + fileName
+            + " will not be added");
+      }
+    }
+    LOG.info("A total of " + initialCachedEntries.size()
+        + " files are now mapped");
+    return initialCachedEntries;
+  }
+
+  /**
+   * Registers a resource under the key unless one is already registered. The
+   * returned filename reflects the store at the time of the call; the entry
+   * may change or be removed afterwards, and the caller should be prepared
+   * for that.
+   *
+   * @param key the unique identifier of the resource
+   * @param fileName the filename to register if the key is not yet mapped
+   * @return the filename of the newly inserted resource or that of the existing
+   *         resource
+   */
+  @Override
+  public String addResource(String key, String fileName) {
+    String interned = intern(key);
+    synchronized (interned) {
+      SharedCacheResource existing = cachedResources.get(interned);
+      if (existing != null) {
+        return existing.getFileName();
+      }
+      SharedCacheResource created = new SharedCacheResource(fileName);
+      cachedResources.put(interned, created);
+      return created.getFileName();
+    }
+  }
+
+  /**
+   * Registers the resource reference with the resource stored under the key
+   * and refreshes the resource's access time. A non-null return value means
+   * the caller may safely assume that the resource will not be removed at
+   * least until the app in this resource reference has terminated.
+   *
+   * @param key the unique identifier of the resource
+   * @param ref the reference to add
+   * @return the filename of the resource, or null if the resource is not found
+   */
+  @Override
+  public String addResourceReference(String key,
+      SharedCacheResourceReference ref) {
+    String interned = intern(key);
+    synchronized (interned) {
+      SharedCacheResource resource = cachedResources.get(interned);
+      if (resource != null) {
+        resource.addReference(ref);
+        resource.updateAccessTime();
+        return resource.getFileName();
+      }
+      // the key is not mapped
+      return null;
+    }
+  }
+
+  /**
+   * Returns an unmodifiable snapshot of the resource references registered
+   * under the key at the time of the call. The state may change after this
+   * method returns, so the caller should handle the case where some or all of
+   * the returned references are no longer relevant.
+   *
+   * @param key the unique identifier of the resource
+   * @return the collection that contains the resource references associated
+   *         with the resource; or an empty collection if the resource is not
+   *         found or has no references
+   */
+  @Override
+  public Collection<SharedCacheResourceReference> getResourceReferences(String key) {
+    String interned = intern(key);
+    synchronized (interned) {
+      SharedCacheResource resource = cachedResources.get(interned);
+      if (resource != null) {
+        // copy while holding the key lock so the caller gets a stable snapshot
+        Set<SharedCacheResourceReference> snapshot =
+            new HashSet<SharedCacheResourceReference>(
+                resource.getResourceReferences());
+        return Collections.unmodifiableSet(snapshot);
+      }
+      return Collections.emptySet();
+    }
+  }
+
+  /**
+   * Removes the provided resource reference from the resource stored under
+   * the key. Nothing is done if the resource does not exist.
+   *
+   * @param key the unique identifier of the resource
+   * @param ref the reference to remove
+   * @param updateAccessTime whether to refresh the resource's access time
+   * @return true if the reference was registered and has been removed; false
+   *         otherwise
+   */
+  @Override
+  public boolean removeResourceReference(String key, SharedCacheResourceReference ref,
+      boolean updateAccessTime) {
+    String interned = intern(key);
+    synchronized (interned) {
+      SharedCacheResource resource = cachedResources.get(interned);
+      if (resource == null) {
+        return false;
+      }
+      boolean removed = resource.getResourceReferences().remove(ref);
+      if (updateAccessTime) {
+        resource.updateAccessTime();
+      }
+      return removed;
+    }
+  }
+
+  /**
+   * Removes the provided collection of resource references from the resource
+   * stored under the key. Nothing is done if the resource does not exist.
+   *
+   * @param key the unique identifier of the resource
+   * @param refs the references to remove
+   * @param updateAccessTime whether to refresh the resource's access time
+   */
+  @Override
+  public void removeResourceReferences(String key,
+      Collection<SharedCacheResourceReference> refs, boolean updateAccessTime) {
+    String interned = intern(key);
+    synchronized (interned) {
+      SharedCacheResource resource = cachedResources.get(interned);
+      if (resource == null) {
+        return;
+      }
+      resource.getResourceReferences().removeAll(refs);
+      if (updateAccessTime) {
+        resource.updateAccessTime();
+      }
+    }
+  }
+
+  /**
+   * Removes the resource stored under the key, provided it has no remaining
+   * resource references.
+   *
+   * @param key the unique identifier of the resource
+   * @return true if the resource was removed or was not found; false if it
+   *         could not be removed because it is still referenced
+   */
+  @Override
+  public boolean removeResource(String key) {
+    String interned = intern(key);
+    synchronized (interned) {
+      SharedCacheResource resource = cachedResources.get(interned);
+      if (resource != null) {
+        if (!resource.getResourceReferences().isEmpty()) {
+          // still in use
+          return false;
+        }
+        cachedResources.remove(interned);
+      }
+      return true;
+    }
+  }
+
+  /**
+   * Looks up the access time of the resource stored under the key. The value
+   * is the view at the time of the call and may be updated afterwards.
+   *
+   * @param key the unique identifier of the resource
+   * @return the access time of the resource if found; -1 if the resource is not
+   *         found
+   */
+  @VisibleForTesting
+  long getAccessTime(String key) {
+    String interned = intern(key);
+    synchronized (interned) {
+      SharedCacheResource resource = cachedResources.get(interned);
+      if (resource == null) {
+        return -1;
+      }
+      return resource.getAccessTime();
+    }
+  }
+
+  /**
+   * Decides whether the resource is stale enough to be evicted. Nothing is
+   * evictable while any application recorded at service start is still in
+   * the initial app list. Otherwise the resource's access time is compared
+   * against the staleness period; for a resource unknown to the store, the
+   * later of the file's modification time and the store start time is used.
+   *
+   * @param key the unique identifier of the resource
+   * @param file the file status of the resource in the file system
+   * @return true if the resource's last known use is older than the
+   *         staleness period
+   */
+  @Override
+  public boolean isResourceEvictable(String key, FileStatus file) {
+    synchronized (initialAppsLock) {
+      if (!initialApps.isEmpty()) {
+        return false;
+      }
+    }
+
+    long now = System.currentTimeMillis();
+    long staleTime = now - TimeUnit.MINUTES.toMillis(this.stalenessMinutes);
+    long accessTime = getAccessTime(key);
+    if (accessTime != -1) {
+      return accessTime < staleTime;
+    }
+    // the store has no record of this resource: a modification time older
+    // than the store startup is replaced by the startup time, which is the
+    // last point of certainty
+    long lastUse = Math.max(file.getModificationTime(), this.startTime);
+    return lastUse < staleTime;
+  }
+
+  /**
+   * Reads the staleness period, in minutes, from the configuration.
+   *
+   * @param conf the configuration to read from
+   * @return the configured staleness period, or the default if it is not set
+   * @throws HadoopIllegalArgumentException if the value is not positive
+   */
+  private static int getStalenessPeriod(Configuration conf) {
+    int stalenessMinutes =
+        conf.getInt(YarnConfiguration.IN_MEMORY_STALENESS_PERIOD,
+            YarnConfiguration.DEFAULT_IN_MEMORY_STALENESS_PERIOD);
+    // a non-positive value is invalid; it is rejected, not replaced by the
+    // default
+    if (stalenessMinutes <= 0) {
+      throw new HadoopIllegalArgumentException("Non-positive staleness value: "
+          + stalenessMinutes
+          + ". The staleness value must be greater than zero.");
+    }
+    return stalenessMinutes;
+  }
+
+  /**
+   * Reads the initial delay of the app check task, in minutes, from the
+   * configuration.
+   *
+   * @param conf the configuration to read from
+   * @return the configured initial delay, or the default if it is not set
+   * @throws HadoopIllegalArgumentException if the value is not positive
+   */
+  private static int getInitialDelay(Configuration conf) {
+    int initialMinutes =
+        conf.getInt(YarnConfiguration.IN_MEMORY_INITIAL_DELAY,
+            YarnConfiguration.DEFAULT_IN_MEMORY_INITIAL_DELAY);
+    // a non-positive value is invalid; it is rejected, not replaced by the
+    // default
+    if (initialMinutes <= 0) {
+      throw new HadoopIllegalArgumentException(
+          "Non-positive initial delay value: " + initialMinutes
+              + ". The initial delay value must be greater than zero.");
+    }
+    return initialMinutes;
+  }
+
+  /**
+   * Reads the period of the app check task, in minutes, from the
+   * configuration.
+   *
+   * @param conf the configuration to read from
+   * @return the configured check period, or the default if it is not set
+   * @throws HadoopIllegalArgumentException if the value is not positive
+   */
+  private static int getCheckPeriod(Configuration conf) {
+    int checkMinutes =
+        conf.getInt(YarnConfiguration.IN_MEMORY_CHECK_PERIOD,
+            YarnConfiguration.DEFAULT_IN_MEMORY_CHECK_PERIOD);
+    // a non-positive value is invalid; it is rejected, not replaced by the
+    // default
+    if (checkMinutes <= 0) {
+      throw new HadoopIllegalArgumentException(
+          "Non-positive check period value: " + checkMinutes
+              + ". The check period value must be greater than zero.");
+    }
+    return checkMinutes;
+  }
+
+  @Private
+  @Evolving
+  class AppCheckTask implements Runnable {
+
+    private final AppChecker taskAppChecker;
+
+    public AppCheckTask(AppChecker appChecker) {
+      this.taskAppChecker = appChecker;
+    }
+
+    @Override
+    public void run() {
+      try {
+        LOG.info("Checking the initial app list for finished applications.");
+        synchronized (initialAppsLock) {
+          if (initialApps.isEmpty()) {
+            // we're fine, no-op; there are no active apps that were running at
+            // the time of the service start
+          } else {
+            LOG.info("Looking into " + initialApps.size()
+                + " apps to see if they are still active");
+            Iterator<ApplicationId> it = initialApps.iterator();
+            while (it.hasNext()) {
+              ApplicationId id = it.next();
+              try {
+                if (!taskAppChecker.isApplicationActive(id)) {
+                  // remove it from the list
+                  it.remove();
+                }
+              } catch (YarnException e) {
+                LOG.warn("Exception while checking the app status;"
+                    + " will leave the entry in the list", e);
+                // continue
+              }
+            }
+          }
+          LOG.info("There are now " + initialApps.size()
+              + " entries in the list");
+        }
+      } catch (Throwable e) {
+        LOG.error(
+            "Unexpected exception thrown during in-memory store app check task."
+                + " Rescheduling task.", e);
+      }
+
+    }
+  }
+}

+ 133 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SCMStore.java

@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.sharedcachemanager.store;
+
+import java.util.Collection;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.service.CompositeService;
+
+
+/**
+ * An abstract class for the data store used by the shared cache manager
+ * service. All implementations of the methods in this class need to be thread
+ * safe and atomic.
+ */
+@Private
+@Evolving
+public abstract class SCMStore extends CompositeService {
+
+  protected SCMStore(String name) {
+    super(name);
+  }
+
+  /**
+   * Add a resource and its associated filename to the shared cache. The
+   * resource is identified by a unique key. If the key already exists, no
+   * action is taken and the filename of the existing resource is returned. If
+   * the key does not exist, the resource is added, its access time is set, and
+   * the filename of the resource is returned.
+   *
+   * @param key a unique identifier for a resource
+   * @param fileName the filename of the resource
+   * @return the filename of the resource as represented by the cache
+   */
+  @Private
+  public abstract String addResource(String key, String fileName);
+
+  /**
+   * Remove a resource from the shared cache.
+   *
+   * @param key a unique identifier for a resource
+   * @return true if the resource was removed or did not exist, false if the
+   *         resource existed, contained at least one
+   *         <code>SharedCacheResourceReference</code> and was not removed.
+   */
+  @Private
+  public abstract boolean removeResource(String key);
+
+  /**
+   * Add a <code>SharedCacheResourceReference</code> to a resource and update
+   * the resource access time.
+   *
+   * @param key a unique identifier for a resource
+   * @param ref the <code>SharedCacheResourceReference</code> to add
+   * @return the filename of the resource if the
+   *         <code>SharedCacheResourceReference</code> was added or already
+   *         existed, or null if the resource did not exist
+   */
+  @Private
+  public abstract String addResourceReference(String key,
+      SharedCacheResourceReference ref);
+
+  /**
+   * Get the <code>SharedCacheResourceReference</code>(s) associated with the
+   * resource.
+   *
+   * @param key a unique identifier for a resource
+   * @return an unmodifiable collection of
+   *         <code>SharedCacheResourceReference</code>s. If the resource does
+   *         not exist, an empty set is returned.
+   */
+  @Private
+  public abstract Collection<SharedCacheResourceReference> getResourceReferences(
+      String key);
+
+  /**
+   * Remove a <code>SharedCacheResourceReference</code> from a resource.
+   *
+   * @param key a unique identifier for a resource
+   * @param ref the <code>SharedCacheResourceReference</code> to remove
+   * @param updateAccessTime true if the call should update the access time for
+   *          the resource
+   * @return true if the reference was removed, false otherwise
+   */
+  @Private
+  public abstract boolean removeResourceReference(String key,
+      SharedCacheResourceReference ref, boolean updateAccessTime);
+
+  /**
+   * Remove a collection of <code>SharedCacheResourceReference</code>s from a
+   * resource.
+   *
+   * @param key a unique identifier for a resource
+   * @param refs the collection of <code>SharedCacheResourceReference</code>s to
+   *          remove
+   * @param updateAccessTime true if the call should update the access time for
+   *          the resource
+   */
+  @Private
+  public abstract void removeResourceReferences(String key,
+      Collection<SharedCacheResourceReference> refs, boolean updateAccessTime);
+
+  /**
+   * Check if a specific resource is evictable according to the store's enabled
+   * cache eviction policies.
+   *
+   * @param key a unique identifier for a resource
+   * @param file the <code>FileStatus</code> object for the resource file in the
+   *          file system.
+   * @return true if the resource is evictable, false otherwise
+   */
+  @Private
+  public abstract boolean isResourceEvictable(String key, FileStatus file);
+
+}

+ 64 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SharedCacheResource.java

@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.sharedcachemanager.store;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+/**
+ * Holder for one cached resource: its filename, its last access time and the
+ * set of references pointing at it. Instances are not thread safe; callers
+ * must provide their own synchronization for everything except the filename,
+ * which is immutable.
+ */
+@Private
+@Evolving
+class SharedCacheResource {
+  private final String fileName;
+  private final Set<SharedCacheResourceReference> refs =
+      new HashSet<SharedCacheResourceReference>();
+  private long accessTime = System.currentTimeMillis();
+
+  /**
+   * Creates a resource with no references whose access time is the moment of
+   * construction.
+   *
+   * @param fileName the filename of the resource
+   */
+  SharedCacheResource(String fileName) {
+    this.fileName = fileName;
+  }
+
+  /**
+   * @return the last recorded access time, in milliseconds as reported by
+   *         <code>System.currentTimeMillis()</code>
+   */
+  long getAccessTime() {
+    return this.accessTime;
+  }
+
+  /** Sets the access time to the current time. */
+  void updateAccessTime() {
+    this.accessTime = System.currentTimeMillis();
+  }
+
+  /**
+   * @return the filename given at construction
+   */
+  String getFileName() {
+    return fileName;
+  }
+
+  /**
+   * @return the backing set of references itself, not a copy; changes made
+   *         through it are visible to this resource
+   */
+  Set<SharedCacheResourceReference> getResourceReferences() {
+    return refs;
+  }
+
+  /**
+   * @param ref the reference to add
+   * @return true if the reference was not already present
+   */
+  boolean addReference(SharedCacheResourceReference ref) {
+    return refs.add(ref);
+  }
+}

+ 86 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SharedCacheResourceReference.java

@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.sharedcachemanager.store;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+/**
+ * An immutable reference to a shared cache resource, made up of the
+ * referencing application and the short name of the user that created it.
+ * Two references are equal when both parts are equal.
+ */
+@Private
+@Evolving
+public class SharedCacheResourceReference {
+  private final ApplicationId appId;
+  private final String shortUserName;
+
+  /**
+   * Create a resource reference.
+   *
+   * @param appId <code>ApplicationId</code> that is referencing a resource.
+   * @param shortUserName <code>ShortUserName</code> of the user that created
+   *          the reference.
+   */
+  public SharedCacheResourceReference(ApplicationId appId, String shortUserName) {
+    this.appId = appId;
+    this.shortUserName = shortUserName;
+  }
+
+  /**
+   * @return the application that holds this reference
+   */
+  public ApplicationId getAppId() {
+    return appId;
+  }
+
+  /**
+   * @return the short name of the user that created this reference
+   */
+  public String getShortUserName() {
+    return shortUserName;
+  }
+
+  /**
+   * Combines the hash codes of both fields using the usual 31-based scheme; a
+   * null field contributes 0.
+   */
+  @Override
+  public int hashCode() {
+    int appIdHash = (appId != null) ? appId.hashCode() : 0;
+    int userHash = (shortUserName != null) ? shortUserName.hashCode() : 0;
+    return 31 * (31 + appIdHash) + userHash;
+  }
+
+  /**
+   * Two references are equal when they are of exactly the same class and
+   * carry equal (or both null) application ids and user names.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    SharedCacheResourceReference that = (SharedCacheResourceReference) obj;
+    // compare the app id first; the user name is only looked at if it matches
+    if (!nullSafeEquals(appId, that.appId)) {
+      return false;
+    }
+    return nullSafeEquals(shortUserName, that.shortUserName);
+  }
+
+  /** Null-tolerant equality: two nulls are equal, a null never equals a value. */
+  private static boolean nullSafeEquals(Object mine, Object theirs) {
+    return (mine == null) ? theirs == null : mine.equals(theirs);
+  }
+}

+ 334 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/TestInMemorySCMStore.java

@@ -0,0 +1,334 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.sharedcachemanager.store;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.sharedcachemanager.AppChecker;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestInMemorySCMStore {
+
+  private InMemorySCMStore store;
+  private AppChecker checker;
+
+  @Before
+  public void setup() {
+    this.store = spy(new InMemorySCMStore());
+    this.checker = spy(new DummyAppChecker());
+    doReturn(checker).when(store).createAppCheckerService(
+        isA(Configuration.class));
+  }
+
+  @After
+  public void cleanup() {
+    if (this.store != null) {
+      this.store.stop();
+    }
+  }
+
+  private void startEmptyStore() throws Exception {
+    doReturn(new ArrayList<ApplicationId>()).when(checker)
+        .getActiveApplications();
+    doReturn(new HashMap<String, String>()).when(store)
+        .getInitialCachedResources(isA(FileSystem.class),
+            isA(Configuration.class));
+    this.store.init(new Configuration());
+    this.store.start();
+  }
+
+  private Map<String, String> startStoreWithResources() throws Exception {
+    Map<String, String> initialCachedResources = new HashMap<String, String>();
+    int count = 10;
+    for (int i = 0; i < count; i++) {
+      String key = String.valueOf(i);
+      String fileName = key + ".jar";
+      initialCachedResources.put(key, fileName);
+    }
+    doReturn(new ArrayList<ApplicationId>()).when(checker)
+        .getActiveApplications();
+    doReturn(initialCachedResources).when(store).getInitialCachedResources(
+        isA(FileSystem.class), isA(Configuration.class));
+    this.store.init(new Configuration());
+    this.store.start();
+    return initialCachedResources;
+  }
+
+  private void startStoreWithApps() throws Exception {
+    ArrayList<ApplicationId> list = new ArrayList<ApplicationId>();
+    int count = 5;
+    for (int i = 0; i < count; i++) {
+      list.add(createAppId(i, i));
+    }
+    doReturn(list).when(checker).getActiveApplications();
+    doReturn(new HashMap<String, String>()).when(store)
+        .getInitialCachedResources(isA(FileSystem.class),
+            isA(Configuration.class));
+    this.store.init(new Configuration());
+    this.store.start();
+  }
+
+  @Test
+  public void testAddResourceConcurrency() throws Exception {
+    startEmptyStore();
+    final String key = "key1";
+    int count = 5;
+    ExecutorService exec = Executors.newFixedThreadPool(count);
+    List<Future<String>> futures = new ArrayList<Future<String>>(count);
+    final CountDownLatch start = new CountDownLatch(1);
+    for (int i = 0; i < count; i++) {
+      final String fileName = "foo-" + i + ".jar";
+      Callable<String> task = new Callable<String>() {
+        public String call() throws Exception {
+          start.await();
+          String result = store.addResource(key, fileName);
+          System.out.println("fileName: " + fileName + ", result: " + result);
+          return result;
+        }
+      };
+      futures.add(exec.submit(task));
+    }
+    // start them all at the same time
+    start.countDown();
+    // check the result; they should all agree with the value
+    Set<String> results = new HashSet<String>();
+    for (Future<String> future: futures) {
+      results.add(future.get());
+    }
+    assertSame(1, results.size());
+    exec.shutdown();
+  }
+
+  @Test
+  public void testAddResourceRefNonExistentResource() throws Exception {
+    startEmptyStore();
+    String key = "key1";
+    ApplicationId id = createAppId(1, 1L);
+    // try adding an app id without adding the key first
+    assertNull(store.addResourceReference(key,
+        new SharedCacheResourceReference(id, "user")));
+  }
+
+  @Test
+  public void testRemoveResourceEmptyRefs() throws Exception {
+    startEmptyStore();
+    String key = "key1";
+    String fileName = "foo.jar";
+    // first add resource
+    store.addResource(key, fileName);
+    // try removing the resource; it should return true
+    assertTrue(store.removeResource(key));
+  }
+
+  @Test
+  public void testAddResourceRefRemoveResource() throws Exception {
+    startEmptyStore();
+    String key = "key1";
+    ApplicationId id = createAppId(1, 1L);
+    String user = "user";
+    // add the resource, and then add a resource ref
+    store.addResource(key, "foo.jar");
+    store.addResourceReference(key, new SharedCacheResourceReference(id, user));
+    // removeResource should return false
+    assertTrue(!store.removeResource(key));
+    // the resource and the ref should be intact
+    Collection<SharedCacheResourceReference> refs = store.getResourceReferences(key);
+    assertTrue(refs != null);
+    assertEquals(Collections.singleton(new SharedCacheResourceReference(id, user)), refs);
+  }
+
+  @Test
+  public void testAddResourceRefConcurrency() throws Exception {
+    startEmptyStore();
+    final String key = "key1";
+    final String user = "user";
+    String fileName = "foo.jar";
+
+    // first add the resource
+    store.addResource(key, fileName);
+
+    // make concurrent addResourceRef calls (clients)
+    int count = 5;
+    ExecutorService exec = Executors.newFixedThreadPool(count);
+    List<Future<String>> futures = new ArrayList<Future<String>>(count);
+    final CountDownLatch start = new CountDownLatch(1);
+    for (int i = 0; i < count; i++) {
+      final ApplicationId id = createAppId(i, i);
+      Callable<String> task = new Callable<String>() {
+        public String call() throws Exception {
+          start.await();
+          return store.addResourceReference(key,
+              new SharedCacheResourceReference(id, user));
+        }
+      };
+      futures.add(exec.submit(task));
+    }
+    // start them all at the same time
+    start.countDown();
+    // check the result
+    Set<String> results = new HashSet<String>();
+    for (Future<String> future: futures) {
+      results.add(future.get());
+    }
+    // they should all have the same file name
+    assertSame(1, results.size());
+    assertEquals(Collections.singleton(fileName), results);
+    // there should be 5 refs as a result
+    Collection<SharedCacheResourceReference> refs = store.getResourceReferences(key);
+    assertSame(count, refs.size());
+    exec.shutdown();
+  }
+
+  @Test
+  public void testAddResourceRefAddResourceConcurrency() throws Exception {
+    startEmptyStore();
+    final String key = "key1";
+    final String fileName = "foo.jar";
+    final String user = "user";
+    final ApplicationId id = createAppId(1, 1L);
+    // add the resource and add the resource ref at the same time
+    ExecutorService exec = Executors.newFixedThreadPool(2);
+    final CountDownLatch start = new CountDownLatch(1);
+    Callable<String> addKeyTask = new Callable<String>() {
+      public String call() throws Exception {
+        start.await();
+        return store.addResource(key, fileName);
+      }
+    };
+    Callable<String> addAppIdTask = new Callable<String>() {
+      public String call() throws Exception {
+        start.await();
+        return store.addResourceReference(key,
+            new SharedCacheResourceReference(id, user));
+      }
+    };
+    Future<String> addAppIdFuture = exec.submit(addAppIdTask);
+    Future<String> addKeyFuture = exec.submit(addKeyTask);
+    // start them at the same time
+    start.countDown();
+    // get the results
+    String addKeyResult = addKeyFuture.get();
+    String addAppIdResult = addAppIdFuture.get();
+    assertEquals(fileName, addKeyResult);
+    System.out.println("addAppId() result: " + addAppIdResult);
+    // it may be null or the fileName depending on the timing
+    assertTrue(addAppIdResult == null || addAppIdResult.equals(fileName));
+    exec.shutdown();
+  }
+
+  @Test
+  public void testRemoveRef() throws Exception {
+    startEmptyStore();
+    String key = "key1";
+    String fileName = "foo.jar";
+    String user = "user";
+    // first add the resource
+    store.addResource(key, fileName);
+    // add a ref
+    ApplicationId id = createAppId(1, 1L);
+    SharedCacheResourceReference myRef = new SharedCacheResourceReference(id, user);
+    String result = store.addResourceReference(key, myRef);
+    assertEquals(fileName, result);
+    Collection<SharedCacheResourceReference> refs = store.getResourceReferences(key);
+    assertSame(1, refs.size());
+    assertEquals(Collections.singleton(myRef), refs);
+    // remove the same ref
+    store.removeResourceReferences(key, Collections.singleton(myRef), true);
+    Collection<SharedCacheResourceReference> newRefs = store.getResourceReferences(key);
+    assertTrue(newRefs == null || newRefs.isEmpty());
+  }
+
+  @Test
+  public void testBootstrapping() throws Exception {
+    Map<String, String> initialCachedResources = startStoreWithResources();
+    int count = initialCachedResources.size();
+    ApplicationId id = createAppId(1, 1L);
+    // the entries from the cached entries should now exist
+    for (int i = 0; i < count; i++) {
+      String key = String.valueOf(i);
+      String fileName = key + ".jar";
+      String result =
+          store.addResourceReference(key, new SharedCacheResourceReference(id,
+              "user"));
+      // the value should not be null (i.e. it has the key) and the filename should match
+      assertEquals(fileName, result);
+      // the initial input should be emptied
+      assertTrue(initialCachedResources.isEmpty());
+    }
+  }
+
+  @Test
+  public void testEvictableWithInitialApps() throws Exception {
+    startStoreWithApps();
+    assertFalse(store.isResourceEvictable("key", mock(FileStatus.class)));
+  }
+
+  private ApplicationId createAppId(int id, long timestamp) {
+    return ApplicationId.newInstance(timestamp, id);
+  }
+
+  class DummyAppChecker extends AppChecker {
+
+    @Override
+    @Private
+    public boolean isApplicationActive(ApplicationId id) throws YarnException {
+      // stub
+      return false;
+    }
+
+    @Override
+    @Private
+    public Collection<ApplicationId> getActiveApplications()
+        throws YarnException {
+      // stub
+      return null;
+    }
+
+  }
+}