Browse Source

YARN-2183. [YARN-1492] Cleaner service for cache manager. (Chris Trezzo and Sangjin Lee via kasha)

Karthik Kambatla 10 năm trước
mục cha
commit
c51e53d7aa
13 tập tin đã thay đổi với 1054 bổ sung và 40 xóa
  1. 3 0
      hadoop-yarn-project/CHANGES.txt
  2. 38 9
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
  3. 24 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
  4. 10 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/sharedcache/SharedCacheUtil.java
  5. 218 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerService.java
  6. 308 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerTask.java
  7. 7 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java
  8. 172 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/CleanerMetrics.java
  9. 21 24
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/InMemorySCMStore.java
  10. 35 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SCMStore.java
  11. 152 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestCleanerTask.java
  12. 65 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/TestCleanerMetrics.java
  13. 1 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/TestInMemorySCMStore.java

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -39,6 +39,9 @@ Release 2.7.0 - UNRELEASED
     YARN-2180. [YARN-1492] In-memory backing store for cache manager. 
     (Chris Trezzo via kasha)
 
+    YARN-2183. [YARN-1492] Cleaner service for cache manager.
+    (Chris Trezzo and Sangjin Lee via kasha)
+
   IMPROVEMENTS
 
     YARN-1979. TestDirectoryCollection fails when the umask is unusual.

+ 38 - 9
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -1393,25 +1393,54 @@ public class YarnConfiguration extends Configuration {
    * the last reference exceeds the staleness period. This value is specified in
    * minutes.
    */
-  public static final String IN_MEMORY_STALENESS_PERIOD =
-      IN_MEMORY_STORE_PREFIX + "staleness-period";
-  public static final int DEFAULT_IN_MEMORY_STALENESS_PERIOD = 7 * 24 * 60;
+  public static final String IN_MEMORY_STALENESS_PERIOD_MINS =
+      IN_MEMORY_STORE_PREFIX + "staleness-period-mins";
+  public static final int DEFAULT_IN_MEMORY_STALENESS_PERIOD_MINS =
+      7 * 24 * 60;
 
   /**
    * Initial delay before the in-memory store runs its first check to remove
    * dead initial applications. Specified in minutes.
    */
-  public static final String IN_MEMORY_INITIAL_DELAY =
-      IN_MEMORY_STORE_PREFIX + "initial-delay";
-  public static final int DEFAULT_IN_MEMORY_INITIAL_DELAY = 10;
+  public static final String IN_MEMORY_INITIAL_DELAY_MINS =
+      IN_MEMORY_STORE_PREFIX + "initial-delay-mins";
+  public static final int DEFAULT_IN_MEMORY_INITIAL_DELAY_MINS = 10;
   
   /**
    * The frequency at which the in-memory store checks to remove dead initial
    * applications. Specified in minutes.
    */
-  public static final String IN_MEMORY_CHECK_PERIOD =
-      IN_MEMORY_STORE_PREFIX + "check-period";
-  public static final int DEFAULT_IN_MEMORY_CHECK_PERIOD = 12 * 60;
+  public static final String IN_MEMORY_CHECK_PERIOD_MINS =
+      IN_MEMORY_STORE_PREFIX + "check-period-mins";
+  public static final int DEFAULT_IN_MEMORY_CHECK_PERIOD_MINS = 12 * 60;
+
+  // SCM Cleaner service configuration
+
+  private static final String SCM_CLEANER_PREFIX = SHARED_CACHE_PREFIX
+      + "cleaner.";
+
+  /**
+   * The frequency at which a cleaner task runs. Specified in minutes.
+   */
+  public static final String SCM_CLEANER_PERIOD_MINS =
+      SCM_CLEANER_PREFIX + "period-mins";
+  public static final int DEFAULT_SCM_CLEANER_PERIOD_MINS = 24 * 60;
+
+  /**
+   * Initial delay before the first cleaner task is scheduled. Specified in
+   * minutes.
+   */
+  public static final String SCM_CLEANER_INITIAL_DELAY_MINS =
+      SCM_CLEANER_PREFIX + "initial-delay-mins";
+  public static final int DEFAULT_SCM_CLEANER_INITIAL_DELAY_MINS = 10;
+
+  /**
+   * The time to sleep between processing each shared cache resource. Specified
+   * in milliseconds.
+   */
+  public static final String SCM_CLEANER_RESOURCE_SLEEP_MS =
+      SCM_CLEANER_PREFIX + "resource-sleep-ms";
+  public static final long DEFAULT_SCM_CLEANER_RESOURCE_SLEEP_MS = 0L;
 
   ////////////////////////////////
   // Other Configs

+ 24 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

@@ -1380,24 +1380,45 @@
     <description>A resource in the in-memory store is considered stale
     if the time since the last reference exceeds the staleness period.
     This value is specified in minutes.</description>
-    <name>yarn.sharedcache.store.in-memory.staleness-period</name>
+    <name>yarn.sharedcache.store.in-memory.staleness-period-mins</name>
     <value>10080</value>
   </property>
   
   <property>
     <description>Initial delay before the in-memory store runs its first check
     to remove dead initial applications. Specified in minutes.</description>
-    <name>yarn.sharedcache.store.in-memory.initial-delay</name>
+    <name>yarn.sharedcache.store.in-memory.initial-delay-mins</name>
     <value>10</value>
   </property>
   
   <property>
     <description>The frequency at which the in-memory store checks to remove
     dead initial applications. Specified in minutes.</description>
-    <name>yarn.sharedcache.store.in-memory.check-period</name>
+    <name>yarn.sharedcache.store.in-memory.check-period-mins</name>
     <value>720</value>
   </property>
 
+  <property>
+    <description>The frequency at which a cleaner task runs.
+    Specified in minutes.</description>
+    <name>yarn.sharedcache.cleaner.period-mins</name>
+    <value>1440</value>
+  </property>
+
+  <property>
+    <description>Initial delay before the first cleaner task is scheduled.
+    Specified in minutes.</description>
+    <name>yarn.sharedcache.cleaner.initial-delay-mins</name>
+    <value>10</value>
+  </property>
+
+  <property>
+    <description>The time to sleep between processing each shared cache
+    resource. Specified in milliseconds.</description>
+    <name>yarn.sharedcache.cleaner.resource-sleep-ms</name>
+    <value>0</value>
+  </property>
+
   <!-- Other configuration -->
   <property>
     <description>The interval that the yarn client library uses to poll the

+ 10 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/sharedcache/SharedCacheUtil.java

@@ -78,4 +78,14 @@ public class SharedCacheUtil {
 
     return sb.toString();
   }
+
+  /**
+   * Builds the glob used to enumerate cache entry directories that sit
+   * {@code depth} directory levels below the shared cache root.
+   *
+   * @param depth number of intermediate directory levels; a value of zero or
+   *          less yields a single wildcard
+   * @return {@code depth + 1} wildcard path components joined by slashes
+   */
+  @Private
+  public static String getCacheEntryGlobPattern(int depth) {
+    // start with the wildcard for the entry itself, then add one more
+    // wildcard component per nesting level
+    StringBuilder glob = new StringBuilder("*");
+    for (int level = depth; level > 0; level--) {
+      glob.append("/*");
+    }
+    return glob.toString();
+  }
 }

+ 218 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerService.java

@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.sharedcachemanager;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.CleanerMetrics;
+import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * The cleaner service that maintains the shared cache area, and cleans up stale
+ * entries on a regular basis.
+ *
+ * On start it claims a global pid file under the shared cache root and
+ * schedules a periodic {@link CleanerTask}. On stop it shuts the executor down
+ * and removes the pid file, but only if this instance created it.
+ */
+@Private
+@Evolving
+public class CleanerService extends CompositeService {
+  /**
+   * The name of the global cleaner lock that the cleaner creates to indicate
+   * that a cleaning process is in progress.
+   */
+  public static final String GLOBAL_CLEANER_PID = ".cleaner_pid";
+
+  private static final Log LOG = LogFactory.getLog(CleanerService.class);
+
+  private Configuration conf;
+  private CleanerMetrics metrics;
+  private ScheduledExecutorService scheduledExecutor;
+  private final SCMStore store;
+  private final Lock cleanerTaskLock;
+  // true only while this instance is the one that created the pid file;
+  // guards against deleting a file that belongs to another instance
+  private boolean pidFileOwned = false;
+
+  /**
+   * Creates the service.
+   *
+   * @param store the SCM store handed to every cleaner task
+   */
+  public CleanerService(SCMStore store) {
+    super("CleanerService");
+    this.store = store;
+    this.cleanerTaskLock = new ReentrantLock();
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    this.conf = conf;
+
+    // create scheduler executor service that services the cleaner tasks
+    // use 2 threads to accommodate the on-demand tasks and reduce the chance of
+    // back-to-back runs
+    ThreadFactory tf =
+        new ThreadFactoryBuilder().setNameFormat("Shared cache cleaner").build();
+    scheduledExecutor = Executors.newScheduledThreadPool(2, tf);
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    // validate the schedule first so that an invalid configuration is
+    // rejected before anything is written to the filesystem
+    int initialDelayInMinutes = getInitialDelay(conf);
+    long periodInMinutes = getPeriod(conf);
+
+    if (!writeGlobalCleanerPidFile()) {
+      throw new YarnException("The global cleaner pid file already exists! " +
+          "It appears there is another CleanerService running in the cluster");
+    }
+
+    this.metrics = CleanerMetrics.initSingleton(conf);
+
+    // Start dependent services (i.e. AppChecker)
+    super.serviceStart();
+
+    Runnable task =
+        CleanerTask.create(conf, store, metrics, cleanerTaskLock);
+    scheduledExecutor.scheduleAtFixedRate(task, initialDelayInMinutes,
+        periodInMinutes, TimeUnit.MINUTES);
+    LOG.info("Scheduled the shared cache cleaner task to run every "
+        + periodInMinutes + " minutes.");
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    boolean interrupted = false;
+    // the executor is only created in serviceInit(), so it is absent if this
+    // service is stopped without having been initialized
+    if (scheduledExecutor != null) {
+      LOG.info("Shutting down the background thread.");
+      scheduledExecutor.shutdownNow();
+      try {
+        if (scheduledExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
+          LOG.info("The background thread stopped.");
+        } else {
+          LOG.warn("Gave up waiting for the cleaner task to shutdown.");
+        }
+      } catch (InterruptedException e) {
+        LOG.warn("The cleaner service was interrupted while shutting down the task.",
+            e);
+        interrupted = true;
+      }
+    }
+
+    try {
+      removeGlobalCleanerPidFile();
+
+      super.serviceStop();
+    } finally {
+      // restore the interrupt status only after the filesystem clean-up, so
+      // the pid file removal is not attempted on an interrupted thread
+      if (interrupted) {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  /**
+   * Execute an on-demand cleaner task.
+   */
+  protected void runCleanerTask() {
+    Runnable task =
+        CleanerTask.create(conf, store, metrics, cleanerTaskLock);
+    // this is a non-blocking call (it simply submits the task to the executor
+    // queue and returns)
+    this.scheduledExecutor.execute(task);
+  }
+
+  /**
+   * To ensure there are not multiple instances of the SCM running on a given
+   * cluster, a global pid file is used. This file contains the name of the
+   * running JVM as reported by the runtime MXBean (typically the process id
+   * and the hostname), which identifies the owner of the pid file.
+   *
+   * @return true if the pid file was written, false if it already exists
+   * @throws YarnException if the filesystem could not be accessed or the file
+   *           could not be created or written
+   */
+  private boolean writeGlobalCleanerPidFile() throws YarnException {
+    String root =
+        conf.get(YarnConfiguration.SHARED_CACHE_ROOT,
+            YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT);
+    Path pidPath = new Path(root, GLOBAL_CLEANER_PID);
+    try {
+      FileSystem fs = FileSystem.get(this.conf);
+
+      if (fs.exists(pidPath)) {
+        return false;
+      }
+
+      // overwrite is disabled so an existing file is never replaced
+      FSDataOutputStream os = fs.create(pidPath, false);
+      // from here on the file is ours, even if writing its content fails
+      pidFileOwned = true;
+      try {
+        // write the hostname and the process id in the global cleaner pid file
+        final String id = ManagementFactory.getRuntimeMXBean().getName();
+        os.writeUTF(id);
+      } finally {
+        // always release the stream, including when the write fails
+        os.close();
+      }
+      // add it to the delete-on-exit to ensure it gets deleted when the JVM
+      // exits
+      fs.deleteOnExit(pidPath);
+    } catch (IOException e) {
+      throw new YarnException(e);
+    }
+    LOG.info("Created the global cleaner pid file at " + pidPath.toString());
+    return true;
+  }
+
+  /**
+   * Removes the global pid file if this instance created it. This is a no-op
+   * otherwise, because serviceStop() can run on an instance that never
+   * acquired the file (for example after a failed start), and the file may
+   * then belong to another running instance.
+   */
+  private void removeGlobalCleanerPidFile() {
+    if (!pidFileOwned) {
+      return;
+    }
+    try {
+      FileSystem fs = FileSystem.get(this.conf);
+      String root =
+          conf.get(YarnConfiguration.SHARED_CACHE_ROOT,
+              YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT);
+
+      Path pidPath = new Path(root, GLOBAL_CLEANER_PID);
+
+      if (fs.delete(pidPath, false)) {
+        LOG.info("Removed the global cleaner pid file at " + pidPath.toString());
+      } else {
+        LOG.warn("The global cleaner pid file at " + pidPath.toString()
+            + " was not deleted. It may have been removed already.");
+      }
+      pidFileOwned = false;
+    } catch (IOException e) {
+      LOG.error(
+          "Unable to remove the global cleaner pid file! The file may need "
+              + "to be removed manually.", e);
+    }
+  }
+
+  /**
+   * Reads the initial delay of the cleaner schedule.
+   *
+   * @param conf the configuration to read from
+   * @return the initial delay in minutes (zero or more)
+   * @throws HadoopIllegalArgumentException if the configured value is negative
+   */
+  private static int getInitialDelay(Configuration conf) {
+    int initialDelayInMinutes =
+        conf.getInt(YarnConfiguration.SCM_CLEANER_INITIAL_DELAY_MINS,
+            YarnConfiguration.DEFAULT_SCM_CLEANER_INITIAL_DELAY_MINS);
+    // negative value is invalid; reject it
+    if (initialDelayInMinutes < 0) {
+      throw new HadoopIllegalArgumentException("Negative initial delay value: "
+          + initialDelayInMinutes
+          + ". The initial delay must be greater than or equal to zero.");
+    }
+    return initialDelayInMinutes;
+  }
+
+  /**
+   * Reads the period of the cleaner schedule.
+   *
+   * @param conf the configuration to read from
+   * @return the period in minutes (strictly positive)
+   * @throws HadoopIllegalArgumentException if the configured value is zero or
+   *           negative
+   */
+  private static int getPeriod(Configuration conf) {
+    int periodInMinutes =
+        conf.getInt(YarnConfiguration.SCM_CLEANER_PERIOD_MINS,
+            YarnConfiguration.DEFAULT_SCM_CLEANER_PERIOD_MINS);
+    // non-positive value is invalid; reject it
+    if (periodInMinutes <= 0) {
+      throw new HadoopIllegalArgumentException("Non-positive period value: "
+          + periodInMinutes
+          + ". The cleaner period must be greater than zero.");
+    }
+    return periodInMinutes;
+  }
+}

+ 308 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerTask.java

@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.sharedcachemanager;
+
+import java.io.IOException;
+import java.util.concurrent.locks.Lock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.sharedcache.SharedCacheUtil;
+import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.CleanerMetrics;
+import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore;
+
+/**
+ * The task that runs and cleans up the shared cache area for stale entries and
+ * orphaned files. It is expected that only one cleaner task runs at any given
+ * point in time.
+ */
+@Private
+@Evolving
+class CleanerTask implements Runnable {
+  private static final String RENAMED_SUFFIX = "-renamed";
+  private static final Log LOG = LogFactory.getLog(CleanerTask.class);
+
+  private final String location;
+  private final long sleepTime;
+  private final int nestedLevel;
+  private final Path root;
+  private final FileSystem fs;
+  private final SCMStore store;
+  private final CleanerMetrics metrics;
+  private final Lock cleanerTaskLock;
+
+  /**
+   * Convenience factory that assembles a cleaner task from configuration: the
+   * shared cache root, the per-resource sleep time, the cache directory depth
+   * and the filesystem are all looked up from {@code conf}.
+   *
+   * @param conf configuration to read the cleaner settings from
+   * @param store the SCM store the task consults and updates
+   * @param metrics sink for the cleaner statistics
+   * @param cleanerTaskLock lock that ensures a serial execution of cleaner
+   *                        task
+   * @return an instance of a CleanerTask
+   */
+  public static CleanerTask create(Configuration conf, SCMStore store,
+      CleanerMetrics metrics, Lock cleanerTaskLock) {
+    try {
+      // root directory of the shared cache
+      final String cacheRoot = conf.get(YarnConfiguration.SHARED_CACHE_ROOT,
+          YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT);
+      // pause between two resources, in milliseconds
+      final long pauseMs = conf.getLong(
+          YarnConfiguration.SCM_CLEANER_RESOURCE_SLEEP_MS,
+          YarnConfiguration.DEFAULT_SCM_CLEANER_RESOURCE_SLEEP_MS);
+      final int depth = SharedCacheUtil.getCacheDepth(conf);
+      final FileSystem fileSystem = FileSystem.get(conf);
+
+      return new CleanerTask(cacheRoot, pauseMs, depth, fileSystem, store,
+          metrics, cleanerTaskLock);
+    } catch (IOException ioe) {
+      LOG.error("Unable to obtain the filesystem for the cleaner service", ioe);
+      throw new ExceptionInInitializerError(ioe);
+    }
+  }
+
+  /**
+   * Builds a cleaner task from explicit collaborators rather than from
+   * configuration.
+   *
+   * @param location root directory of the shared cache, as a string
+   * @param sleepTime milliseconds to sleep after each processed resource
+   * @param nestedLevel depth passed to the cache entry glob pattern
+   * @param fs filesystem that holds the shared cache
+   * @param store the SCM store the task consults and updates
+   * @param metrics sink for the cleaner statistics
+   * @param cleanerTaskLock lock held for the duration of a run
+   */
+  CleanerTask(String location, long sleepTime, int nestedLevel, FileSystem fs,
+      SCMStore store, CleanerMetrics metrics, Lock cleanerTaskLock) {
+    // where to look
+    this.fs = fs;
+    this.location = location;
+    this.root = new Path(location);
+    this.nestedLevel = nestedLevel;
+    // how to pace the sweep
+    this.sleepTime = sleepTime;
+    // collaborators
+    this.store = store;
+    this.metrics = metrics;
+    this.cleanerTaskLock = cleanerTaskLock;
+  }
+
+  /**
+   * Runs one cleaning pass over the shared cache, unless another cleaner task
+   * currently holds the task lock or the shared cache root does not exist.
+   * Nothing is propagated to the caller: every failure is logged instead.
+   */
+  @Override
+  public void run() {
+    if (!this.cleanerTaskLock.tryLock()) {
+      // there is already another task running
+      LOG.warn("A cleaner task is already running. "
+          + "This scheduled cleaner task will do nothing.");
+      return;
+    }
+
+    try {
+      if (!fs.exists(root)) {
+        LOG.error("The shared cache root " + location + " was not found. "
+            + "The cleaner task will do nothing.");
+        return;
+      }
+
+      // we're now ready to process the shared cache area
+      process();
+    } catch (Throwable e) {
+      // this handler covers both the root check and the sweep itself, so the
+      // message must not claim the failure happened during initialization
+      LOG.error("Unexpected exception while running the cleaner task. "
+          + "The remainder of this run is skipped.", e);
+    } finally {
+      // the lock is released regardless of if it is a scheduled or on-demand
+      // task, and regardless of how the run ended
+      this.cleanerTaskLock.unlock();
+    }
+  }
+
+  /**
+   * Performs one sweep: lists every entry matching the cache entry glob under
+   * the root, hands each directory to
+   * {@link #processSingleResource(FileStatus)}, and logs how long the sweep
+   * took. The sweep stops early when the thread is interrupted.
+   */
+  void process() {
+    // mark the beginning of the run in the metrics
+    metrics.reportCleaningStart();
+    try {
+      // the directory layout is determined by the nested level
+      // (e.g. 9/c/d/<checksum>)
+      String globPattern =
+          SharedCacheUtil.getCacheEntryGlobPattern(nestedLevel);
+      FileStatus[] entries = fs.globStatus(new Path(root, globPattern));
+      if (entries == null) {
+        // treat "no listing" exactly like an empty listing
+        entries = new FileStatus[0];
+      }
+      final int total = entries.length;
+      LOG.info("Processing " + total + " resources in the shared cache");
+      final long startedAt = System.currentTimeMillis();
+      for (int i = 0; i < total; i++) {
+        // bail out promptly if we are being shut down
+        if (Thread.currentThread().isInterrupted()) {
+          LOG.warn("The cleaner task was interrupted. Aborting.");
+          break;
+        }
+
+        FileStatus entry = entries[i];
+        if (!entry.isDirectory()) {
+          LOG.warn("Invalid file at path " + entry.getPath().toString()
+              + " when a directory was expected");
+        } else {
+          processSingleResource(entry);
+        }
+        // optional pause between entries
+        if (sleepTime > 0) {
+          Thread.sleep(sleepTime);
+        }
+      }
+      long elapsedMs = System.currentTimeMillis() - startedAt;
+      LOG.info("Processed " + total + " resource(s) in " + elapsedMs +
+          " ms.");
+    } catch (IOException ioe) {
+      LOG.error("Unable to complete the cleaner task", ioe);
+    } catch (InterruptedException ie) {
+      // interrupted while sleeping: keep the flag set for the caller
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * @return the shared cache root directory this task sweeps
+   */
+  Path getRootPath() {
+    return this.root;
+  }
+
+  /**
+   * Processes a single shared cache resource directory and records the outcome
+   * in the metrics.
+   *
+   * A directory whose path ends with the renamed suffix is deleted outright.
+   * Any other directory is treated as a cache resource keyed by its directory
+   * name: its references are cleaned in the store and, if the store reports it
+   * as evictable and agrees to remove it, the directory is removed from the
+   * filesystem.
+   *
+   * @param resource status of the directory to process
+   */
+  void processSingleResource(FileStatus resource) {
+    Path path = resource.getPath();
+    // indicates the processing status of the resource; stays INIT if none of
+    // the branches below records an outcome
+    ResourceStatus resourceStatus = ResourceStatus.INIT;
+
+    // first, if the path ends with the renamed suffix, it indicates the
+    // directory was moved (as stale) but somehow not deleted (probably due to
+    // SCM failure); delete the directory
+    if (path.toString().endsWith(RENAMED_SUFFIX)) {
+      LOG.info("Found a renamed directory that was left undeleted at " +
+          path.toString() + ". Deleting.");
+      try {
+        if (fs.delete(path, true)) {
+          resourceStatus = ResourceStatus.DELETED;
+        }
+        // note: a false return leaves the status at INIT
+      } catch (IOException e) {
+        // note: the status also stays INIT here, not ERROR
+        LOG.error("Error while processing a shared cache resource: " + path, e);
+      }
+    } else {
+      // this is the path to the cache resource directory
+      // the directory name is the resource key (i.e. a unique identifier)
+      String key = path.getName();
+
+      try {
+        store.cleanResourceReferences(key);
+      } catch (YarnException e) {
+        // a failure here is logged only; the eviction check below still runs
+        LOG.error("Exception thrown while removing dead appIds.", e);
+      }
+
+      if (store.isResourceEvictable(key, resource)) {
+        try {
+          /*
+           * TODO See YARN-2663: There is a race condition between
+           * store.removeResource(key) and
+           * removeResourceFromCacheFileSystem(path) operations because they do
+           * not happen atomically and resources can be uploaded with different
+           * file names by the node managers.
+           */
+          // remove the resource from scm (checks for appIds as well)
+          if (store.removeResource(key)) {
+            // remove the resource from the file system
+            boolean deleted = removeResourceFromCacheFileSystem(path);
+            if (deleted) {
+              resourceStatus = ResourceStatus.DELETED;
+            } else {
+              LOG.error("Failed to remove path from the file system."
+                  + " Skipping this resource: " + path);
+              resourceStatus = ResourceStatus.ERROR;
+            }
+          } else {
+            // we did not delete the resource because it contained application
+            // ids
+            resourceStatus = ResourceStatus.PROCESSED;
+          }
+        } catch (IOException e) {
+          LOG.error(
+              "Failed to remove path from the file system. Skipping this resource: "
+                  + path, e);
+          resourceStatus = ResourceStatus.ERROR;
+        }
+      } else {
+        // not evictable: counted as processed and left in place
+        resourceStatus = ResourceStatus.PROCESSED;
+      }
+    }
+
+    // record the processing
+    switch (resourceStatus) {
+    case DELETED:
+      metrics.reportAFileDelete();
+      break;
+    case PROCESSED:
+      metrics.reportAFileProcess();
+      break;
+    case ERROR:
+      metrics.reportAFileError();
+      break;
+    default:
+      // only INIT reaches this branch; no metric is reported for it
+      LOG.error("Cleaner encountered an invalid status (" + resourceStatus
+          + ") while processing resource: " + path.getName());
+    }
+  }
+
+  /**
+   * Removes a resource directory in two steps: rename it by appending the
+   * renamed suffix, then recursively delete the renamed directory.
+   *
+   * @param path the resource directory to remove
+   * @return the result of the delete, or false if the rename did not succeed
+   * @throws IOException if the rename or the delete fails with an I/O error
+   */
+  private boolean removeResourceFromCacheFileSystem(Path path)
+      throws IOException {
+    // rename the directory to make the delete atomic
+    final Path staleDir = new Path(path.toString() + RENAMED_SUFFIX);
+    final boolean renamed = fs.rename(path, staleDir);
+    if (!renamed) {
+      // the rename did not go through for some reason: leave the directory
+      // as it is
+      LOG.error("We were not able to rename the directory to "
+          + staleDir.toString() + ". We will leave it intact.");
+      return false;
+    }
+
+    // the directory can be removed safely now; the log shows the original
+    // path rather than the renamed one
+    LOG.info("Deleting " + path.toString());
+    return fs.delete(staleDir, true);
+  }
+
+  /**
+   * A status indicating what happened with the processing of a given cache
+   * resource. It selects which metric, if any, is reported for the resource.
+   */
+  private enum ResourceStatus {
+    /**
+     * Starting value before any outcome is recorded. A resource that still has
+     * this status when its processing ends is logged as an invalid status.
+     **/
+    INIT,
+    /** Resource was successfully processed, but not deleted **/
+    PROCESSED,
+    /** Resource was successfully deleted **/
+    DELETED,
+    /** The cleaner task ran into an error while processing the resource **/
+    ERROR
+  }
+}

+ 7 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java

@@ -64,6 +64,9 @@ public class SharedCacheManager extends CompositeService {
     this.store = createSCMStoreService(conf);
     addService(store);
 
+    CleanerService cs = createCleanerService(store);
+    addService(cs);
+
     // init metrics
     DefaultMetricsSystem.initialize("SharedCacheManager");
     JvmMetrics.initSingleton("SharedCacheManager", null);
@@ -90,6 +93,10 @@ public class SharedCacheManager extends CompositeService {
     return store;
   }
 
+  private CleanerService createCleanerService(SCMStore store) {
+    return new CleanerService(store);
+  }
+
   @Override
   protected void serviceStop() throws Exception {
 

+ 172 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/CleanerMetrics.java

@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.sharedcachemanager.metrics;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsAnnotations;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MetricsSourceBuilder;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+
+/**
+ * Maintains the Cleaner activity statistics and publishes them through the
+ * metrics interfaces.
+ *
+ * Every statistic exists in two flavours: a counter that accumulates over all
+ * cleaner runs ("total...") and a gauge that covers only the most recent run
+ * and is reset by {@link #reportCleaningStart()}.
+ */
+@Private
+@Evolving
+@Metrics(name = "CleanerActivity", about = "Cleaner service metrics", context = "yarn")
+public class CleanerMetrics {
+  public static final Log LOG = LogFactory.getLog(CleanerMetrics.class);
+  // NOTE(review): never referenced directly in this class; presumably picked
+  // up reflectively by the metrics source builder -- confirm before removing
+  private final MetricsRegistry registry = new MetricsRegistry("cleaner");
+
+  /**
+   * Holder of the process-wide CleanerMetrics instance.
+   */
+  enum Singleton {
+    INSTANCE;
+
+    // volatile because it is written while holding the lock in init() but
+    // read without any synchronization by getInstance()
+    volatile CleanerMetrics impl;
+
+    /**
+     * Creates the instance on the first call and returns the same instance
+     * on every later call.
+     */
+    synchronized CleanerMetrics init(Configuration conf) {
+      if (impl == null) {
+        impl = create(conf);
+      }
+      return impl;
+    }
+  }
+
+  /**
+   * Initializes the singleton instance if that has not happened yet.
+   *
+   * @param conf the configuration handed to {@link #create(Configuration)}
+   * @return the singleton instance
+   */
+  public static CleanerMetrics initSingleton(Configuration conf) {
+    return Singleton.INSTANCE.init(conf);
+  }
+
+  /**
+   * Returns the singleton instance.
+   *
+   * @return the singleton instance, never null
+   * @throws IllegalStateException if the singleton has not been initialized
+   */
+  public static CleanerMetrics getInstance() {
+    CleanerMetrics topMetrics = Singleton.INSTANCE.impl;
+    if (topMetrics == null) {
+      throw new IllegalStateException(
+          "The CleanerMetrics singleton instance is not initialized."
+              + " Have you called initSingleton first?");
+    }
+    return topMetrics;
+  }
+
+  @Metric("number of deleted files over all runs")
+  private MutableCounterLong totalDeletedFiles;
+
+  public long getTotalDeletedFiles() {
+    return totalDeletedFiles.value();
+  }
+
+  @Metric("number of deleted files in the last run")
+  private MutableGaugeLong deletedFiles;
+
+  public long getDeletedFiles() {
+    return deletedFiles.value();
+  }
+
+  @Metric("number of processed files over all runs")
+  private MutableCounterLong totalProcessedFiles;
+
+  public long getTotalProcessedFiles() {
+    return totalProcessedFiles.value();
+  }
+
+  @Metric("number of processed files in the last run")
+  private MutableGaugeLong processedFiles;
+
+  public long getProcessedFiles() {
+    return processedFiles.value();
+  }
+
+  @Metric("number of file errors over all runs")
+  private MutableCounterLong totalFileErrors;
+
+  public long getTotalFileErrors() {
+    return totalFileErrors.value();
+  }
+
+  @Metric("number of file errors in the last run")
+  private MutableGaugeLong fileErrors;
+
+  public long getFileErrors() {
+    return fileErrors.value();
+  }
+
+  private CleanerMetrics() {
+  }
+
+  /**
+   * The metric source obtained after parsing the annotations
+   */
+  MetricsSource metricSource;
+
+  /**
+   * Builds a new metrics object, derives its metrics source from the
+   * annotations and registers that source with the default metrics system
+   * under the name "cleaner".
+   *
+   * @param conf currently not read by this method
+   * @return the newly created and registered metrics object
+   */
+  static CleanerMetrics create(Configuration conf) {
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+
+    CleanerMetrics metricObject = new CleanerMetrics();
+    MetricsSourceBuilder sb = MetricsAnnotations.newSourceBuilder(metricObject);
+    final MetricsSource s = sb.build();
+    ms.register("cleaner", "The cleaner service of truly shared cache", s);
+    metricObject.metricSource = s;
+    return metricObject;
+  }
+
+  /**
+   * Report a file delete operation. The file is counted both as processed
+   * and as deleted, in the last-run gauges as well as the all-run counters.
+   */
+  public void reportAFileDelete() {
+    totalProcessedFiles.incr();
+    processedFiles.incr();
+    totalDeletedFiles.incr();
+    deletedFiles.incr();
+  }
+
+  /**
+   * Report a file process operation that did not delete the file. Only the
+   * processed-file gauge and counter are incremented.
+   */
+  public void reportAFileProcess() {
+    totalProcessedFiles.incr();
+    processedFiles.incr();
+  }
+
+  /**
+   * Report a file process operation that ended in an error. The file is
+   * counted both as processed and as an error, in the last-run gauges as
+   * well as the all-run counters.
+   */
+  public void reportAFileError() {
+    totalProcessedFiles.incr();
+    processedFiles.incr();
+    totalFileErrors.incr();
+    fileErrors.incr();
+  }
+
+  /**
+   * Report the start of a new run of the cleaner. Resets the last-run gauges
+   * to zero; the all-run counters are left untouched.
+   */
+  public void reportCleaningStart() {
+    processedFiles.set(0);
+    deletedFiles.set(0);
+    fileErrors.set(0);
+  }
+
+}

+ 21 - 24
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/InMemorySCMStore.java

@@ -48,7 +48,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.sharedcache.SharedCacheUtil;
 import org.apache.hadoop.yarn.server.sharedcachemanager.AppChecker;
-import org.apache.hadoop.yarn.server.sharedcachemanager.SharedCacheManager;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -83,13 +82,12 @@ public class InMemorySCMStore extends SCMStore {
   private final Object initialAppsLock = new Object();
   private long startTime;
   private int stalenessMinutes;
-  private AppChecker appChecker;
   private ScheduledExecutorService scheduler;
   private int initialDelayMin;
   private int checkPeriodMin;
 
-  public InMemorySCMStore() {
-    super(InMemorySCMStore.class.getName());
+  public InMemorySCMStore(AppChecker appChecker) {
+    super(InMemorySCMStore.class.getName(), appChecker);
   }
 
   private String intern(String key) {
@@ -108,9 +106,6 @@ public class InMemorySCMStore extends SCMStore {
     this.checkPeriodMin = getCheckPeriod(conf);
     this.stalenessMinutes = getStalenessPeriod(conf);
 
-    appChecker = createAppCheckerService(conf);
-    addService(appChecker);
-
     bootstrap(conf);
 
     ThreadFactory tf =
@@ -157,11 +152,6 @@ public class InMemorySCMStore extends SCMStore {
     super.serviceStop();
   }
 
-  @VisibleForTesting
-  AppChecker createAppCheckerService(Configuration conf) {
-    return SharedCacheManager.createAppCheckerService(conf);
-  }
-
   private void bootstrap(Configuration conf) throws IOException {
     Map<String, String> initialCachedResources =
         getInitialCachedResources(FileSystem.get(conf), conf);
@@ -201,14 +191,10 @@ public class InMemorySCMStore extends SCMStore {
     // now traverse individual directories and process them
     // the directory structure is specified by the nested level parameter
     // (e.g. 9/c/d/<checksum>/file)
-    StringBuilder pattern = new StringBuilder();
-    for (int i = 0; i < nestedLevel + 1; i++) {
-      pattern.append("*/");
-    }
-    pattern.append("*");
+    String pattern = SharedCacheUtil.getCacheEntryGlobPattern(nestedLevel+1);
 
     LOG.info("Querying for all individual cached resource files");
-    FileStatus[] entries = fs.globStatus(new Path(root, pattern.toString()));
+    FileStatus[] entries = fs.globStatus(new Path(root, pattern));
     int numEntries = entries == null ? 0 : entries.length;
     LOG.info("Found " + numEntries + " files: processing for one resource per "
         + "key");
@@ -359,6 +345,17 @@ public class InMemorySCMStore extends SCMStore {
     }
   }
 
+  /**
+   * Runs the inherited reference clean-up while holding the monitor of the
+   * interned key, which makes the whole operation atomic with respect to
+   * other code synchronizing on that same interned key.
+   */
+  @Override
+  public void cleanResourceReferences(String key) throws YarnException {
+    final String keyLock = intern(key);
+    synchronized (keyLock) {
+      super.cleanResourceReferences(key);
+    }
+  }
+
   /**
    * Removes the given resource from the store. Returns true if the resource is
    * found and removed or if the resource is not found. Returns false if it was
@@ -427,8 +424,8 @@ public class InMemorySCMStore extends SCMStore {
 
   private static int getStalenessPeriod(Configuration conf) {
     int stalenessMinutes =
-        conf.getInt(YarnConfiguration.IN_MEMORY_STALENESS_PERIOD,
-            YarnConfiguration.DEFAULT_IN_MEMORY_STALENESS_PERIOD);
+        conf.getInt(YarnConfiguration.IN_MEMORY_STALENESS_PERIOD_MINS,
+            YarnConfiguration.DEFAULT_IN_MEMORY_STALENESS_PERIOD_MINS);
     // non-positive value is invalid; fail fast instead of using the default
     if (stalenessMinutes <= 0) {
       throw new HadoopIllegalArgumentException("Non-positive staleness value: "
@@ -440,8 +437,8 @@ public class InMemorySCMStore extends SCMStore {
 
   private static int getInitialDelay(Configuration conf) {
     int initialMinutes =
-        conf.getInt(YarnConfiguration.IN_MEMORY_INITIAL_DELAY,
-            YarnConfiguration.DEFAULT_IN_MEMORY_INITIAL_DELAY);
+        conf.getInt(YarnConfiguration.IN_MEMORY_INITIAL_DELAY_MINS,
+            YarnConfiguration.DEFAULT_IN_MEMORY_INITIAL_DELAY_MINS);
     // non-positive value is invalid; fail fast instead of using the default
     if (initialMinutes <= 0) {
       throw new HadoopIllegalArgumentException(
@@ -453,8 +450,8 @@ public class InMemorySCMStore extends SCMStore {
 
   private static int getCheckPeriod(Configuration conf) {
     int checkMinutes =
-        conf.getInt(YarnConfiguration.IN_MEMORY_CHECK_PERIOD,
-            YarnConfiguration.DEFAULT_IN_MEMORY_CHECK_PERIOD);
+        conf.getInt(YarnConfiguration.IN_MEMORY_CHECK_PERIOD_MINS,
+            YarnConfiguration.DEFAULT_IN_MEMORY_CHECK_PERIOD_MINS);
     // non-positive value is invalid; fail fast instead of using the default
     if (checkMinutes <= 0) {
       throw new HadoopIllegalArgumentException(

+ 35 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SCMStore.java

@@ -19,11 +19,15 @@
 package org.apache.hadoop.yarn.server.sharedcachemanager.store;
 
 import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.sharedcachemanager.AppChecker;
 
 
 /**
@@ -35,8 +39,11 @@ import org.apache.hadoop.service.CompositeService;
 @Evolving
 public abstract class SCMStore extends CompositeService {
 
-  protected SCMStore(String name) {
+  protected final AppChecker appChecker;
+
+  protected SCMStore(String name, AppChecker appChecker) {
     super(name);
+    this.appChecker = appChecker;
   }
 
   /**
@@ -118,6 +125,33 @@ public abstract class SCMStore extends CompositeService {
   public abstract void removeResourceReferences(String key,
       Collection<SharedCacheResourceReference> refs, boolean updateAccessTime);
 
+  /**
+   * Clean all resource references to a cache resource that contain application
+   * ids pointing to finished applications. If the resource key does not exist,
+   * do nothing.
+   *
+   * @param key a unique identifier for a resource
+   * @throws YarnException
+   */
+  @Private
+  public void cleanResourceReferences(String key) throws YarnException {
+    Collection<SharedCacheResourceReference> refs = getResourceReferences(key);
+    if (!refs.isEmpty()) {
+      Set<SharedCacheResourceReference> refsToRemove =
+          new HashSet<SharedCacheResourceReference>();
+      for (SharedCacheResourceReference r : refs) {
+        if (!appChecker.isApplicationActive(r.getAppId())) {
+          // application in resource reference is dead, it is safe to remove the
+          // reference
+          refsToRemove.add(r);
+        }
+      }
+      if (refsToRemove.size() > 0) {
+        removeResourceReferences(key, refsToRemove, false);
+      }
+    }
+  }
+
   /**
    * Check if a specific resource is evictable according to the store's enabled
    * cache eviction policies.

+ 152 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestCleanerTask.java

@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.sharedcachemanager;
+
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.CleanerMetrics;
+import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore;
+import org.junit.Test;
+
+public class TestCleanerTask {
+  private static final String ROOT =
+      YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT;
+  private static final long SLEEP_TIME =
+      YarnConfiguration.DEFAULT_SCM_CLEANER_RESOURCE_SLEEP_MS;
+  private static final int NESTED_LEVEL =
+      YarnConfiguration.DEFAULT_SHARED_CACHE_NESTED_LEVEL;
+
+  @Test
+  public void testNonExistentRoot() throws Exception {
+    FileSystem fs = mock(FileSystem.class);
+    CleanerMetrics metrics = mock(CleanerMetrics.class);
+    SCMStore store = mock(SCMStore.class);
+
+    CleanerTask task =
+        createSpiedTask(fs, store, metrics, new ReentrantLock());
+    // the shared cache root does not exist
+    when(fs.exists(task.getRootPath())).thenReturn(false);
+
+    task.run();
+
+    // process() should not be called
+    verify(task, never()).process();
+  }
+
+  @Test
+  public void testProcessFreshResource() throws Exception {
+    FileSystem fs = mock(FileSystem.class);
+    CleanerMetrics metrics = mock(CleanerMetrics.class);
+    SCMStore store = mock(SCMStore.class);
+
+    CleanerTask task =
+        createSpiedTask(fs, store, metrics, new ReentrantLock());
+
+    // mock a resource that is not evictable
+    when(store.isResourceEvictable(isA(String.class), isA(FileStatus.class)))
+        .thenReturn(false);
+    FileStatus status = mock(FileStatus.class);
+    when(status.getPath()).thenReturn(new Path(ROOT + "/a/b/c/abc"));
+
+    // process the resource
+    task.processSingleResource(status);
+
+    // the directory should not be renamed
+    verify(fs, never()).rename(eq(status.getPath()), isA(Path.class));
+    // metrics should record a processed file (but not delete)
+    verify(metrics).reportAFileProcess();
+    verify(metrics, never()).reportAFileDelete();
+  }
+
+  @Test
+  public void testProcessEvictableResource() throws Exception {
+    FileSystem fs = mock(FileSystem.class);
+    CleanerMetrics metrics = mock(CleanerMetrics.class);
+    SCMStore store = mock(SCMStore.class);
+
+    CleanerTask task =
+        createSpiedTask(fs, store, metrics, new ReentrantLock());
+
+    // mock an evictable resource
+    when(store.isResourceEvictable(isA(String.class), isA(FileStatus.class)))
+        .thenReturn(true);
+    FileStatus status = mock(FileStatus.class);
+    when(status.getPath()).thenReturn(new Path(ROOT + "/a/b/c/abc"));
+    when(store.removeResource(isA(String.class))).thenReturn(true);
+    // rename succeeds
+    when(fs.rename(isA(Path.class), isA(Path.class))).thenReturn(true);
+    // delete returns true
+    when(fs.delete(isA(Path.class), anyBoolean())).thenReturn(true);
+
+    // process the resource
+    task.processSingleResource(status);
+
+    // the directory should be renamed
+    verify(fs).rename(eq(status.getPath()), isA(Path.class));
+    // metrics should record a deleted file
+    verify(metrics).reportAFileDelete();
+    verify(metrics, never()).reportAFileProcess();
+  }
+
+  private CleanerTask createSpiedTask(FileSystem fs, SCMStore store,
+      CleanerMetrics metrics, Lock isCleanerRunning) {
+    return spy(new CleanerTask(ROOT, SLEEP_TIME, NESTED_LEVEL, fs, store,
+        metrics, isCleanerRunning));
+  }
+
+  @Test
+  public void testResourceIsInUseHasAnActiveApp() throws Exception {
+    FileSystem fs = mock(FileSystem.class);
+    CleanerMetrics metrics = mock(CleanerMetrics.class);
+    SCMStore store = mock(SCMStore.class);
+
+    FileStatus resource = mock(FileStatus.class);
+    when(resource.getPath()).thenReturn(new Path(ROOT + "/a/b/c/abc"));
+    // resource is stale
+    when(store.isResourceEvictable(isA(String.class), isA(FileStatus.class)))
+        .thenReturn(true);
+    // but still has appIds
+    when(store.removeResource(isA(String.class))).thenReturn(false);
+
+    CleanerTask task =
+        createSpiedTask(fs, store, metrics, new ReentrantLock());
+
+    // process the resource
+    task.processSingleResource(resource);
+
+    // metrics should record a processed file (but not delete)
+    verify(metrics).reportAFileProcess();
+    verify(metrics, never()).reportAFileDelete();
+  }
+}

+ 65 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/TestCleanerMetrics.java

@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.sharedcachemanager.metrics;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestCleanerMetrics {
+
+  Configuration conf = new Configuration();
+  CleanerMetrics cleanerMetrics;
+
+  @Before
+  public void init() {
+    CleanerMetrics.initSingleton(conf);
+    cleanerMetrics = CleanerMetrics.getInstance();
+  }
+
+  @Test
+  public void testMetricsOverMultiplePeriods() {
+    simulateACleanerRun();
+    assertMetrics(4, 4, 1, 1);
+    simulateACleanerRun();
+    assertMetrics(4, 8, 1, 2);
+  }
+
+  public void simulateACleanerRun() {
+    cleanerMetrics.reportCleaningStart();
+    cleanerMetrics.reportAFileProcess();
+    cleanerMetrics.reportAFileDelete();
+    cleanerMetrics.reportAFileProcess();
+    cleanerMetrics.reportAFileProcess();
+  }
+
+  void assertMetrics(int proc, int totalProc, int del, int totalDel) {
+    assertEquals(
+        "Processed files in the last period are not measured correctly", proc,
+        cleanerMetrics.getProcessedFiles());
+    assertEquals("Total processed files are not measured correctly",
+        totalProc, cleanerMetrics.getTotalProcessedFiles());
+    assertEquals(
+        "Deleted files in the last period are not measured correctly", del,
+        cleanerMetrics.getDeletedFiles());
+    assertEquals("Total deleted files are not measured correctly",
+        totalDel, cleanerMetrics.getTotalDeletedFiles());
+  }
+}

+ 1 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/TestInMemorySCMStore.java

@@ -60,10 +60,8 @@ public class TestInMemorySCMStore {
 
   @Before
   public void setup() {
-    this.store = spy(new InMemorySCMStore());
     this.checker = spy(new DummyAppChecker());
-    doReturn(checker).when(store).createAppCheckerService(
-        isA(Configuration.class));
+    this.store = spy(new InMemorySCMStore(checker));
   }
 
   @After