Bladeren bron

YARN-90. NodeManager should identify failed disks becoming good again. Contributed by Varun Vasudev

Jason Lowe 10 jaren geleden
bovenliggende
commit
6f2028bd15
15 gewijzigde bestanden met toevoegingen van 1441 en 289 verwijderingen
  1. 3 0
      hadoop-yarn-project/CHANGES.txt
  2. 175 44
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java
  3. 150 26
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java
  4. 2 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeHealthCheckerService.java
  5. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
  6. 220 50
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
  7. 25 10
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
  8. 12 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
  9. 35 12
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
  10. 64 11
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDirectoryCollection.java
  11. 42 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLocalDirsHandlerService.java
  12. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeHealthService.java
  13. 309 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
  14. 102 45
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
  15. 300 82
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -379,6 +379,9 @@ Release 2.6.0 - UNRELEASED
     YARN-2582. Fixed Log CLI and Web UI for showing aggregated logs of LRS. (Xuan
     Gong via zjshen)
 
+    YARN-90. NodeManager should identify failed disks becoming good again
+    (Varun Vasudev via jlowe)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 175 - 44
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java

@@ -21,18 +21,23 @@ package org.apache.hadoop.yarn.server.nodemanager;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.RandomStringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.util.DiskChecker;
-import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 
 /**
  * Manages a list of local storage directories.
@@ -40,9 +45,38 @@ import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 class DirectoryCollection {
   private static final Log LOG = LogFactory.getLog(DirectoryCollection.class);
 
+  public enum DiskErrorCause {
+    DISK_FULL, OTHER
+  }
+
+  static class DiskErrorInformation {
+    DiskErrorCause cause;
+    String message;
+
+    DiskErrorInformation(DiskErrorCause cause, String message) {
+      this.cause = cause;
+      this.message = message;
+    }
+  }
+
+  /**
+   * Returns a merged list which contains all the elements of l1 and l2
+   * @param l1 the first list to be included
+   * @param l2 the second list to be included
+   * @return a new list containing all the elements of the first and second list
+   */
+  static List<String> concat(List<String> l1, List<String> l2) {
+    List<String> ret = new ArrayList<String>(l1.size() + l2.size());
+    ret.addAll(l1);
+    ret.addAll(l2);
+    return ret;
+  }
+
   // Good local storage directories
   private List<String> localDirs;
-  private List<String> failedDirs;
+  private List<String> errorDirs;
+  private List<String> fullDirs;
+
   private int numFailures;
   
   private float diskUtilizationPercentageCutoff;
@@ -109,7 +143,9 @@ class DirectoryCollection {
       float utilizationPercentageCutOff,
       long utilizationSpaceCutOff) {
     localDirs = new CopyOnWriteArrayList<String>(dirs);
-    failedDirs = new CopyOnWriteArrayList<String>();
+    errorDirs = new CopyOnWriteArrayList<String>();
+    fullDirs = new CopyOnWriteArrayList<String>();
+
     diskUtilizationPercentageCutoff = utilizationPercentageCutOff;
     diskUtilizationSpaceCutoff = utilizationSpaceCutOff;
     diskUtilizationPercentageCutoff =
@@ -131,7 +167,16 @@ class DirectoryCollection {
    * @return the failed directories
    */
   synchronized List<String> getFailedDirs() {
-    return Collections.unmodifiableList(failedDirs);
+    return Collections.unmodifiableList(
+        DirectoryCollection.concat(errorDirs, fullDirs));
+  }
+
+  /**
+   * @return the directories that have used all disk space
+   */
+
+  synchronized List<String> getFullDirs() {
+    return fullDirs;
   }
 
   /**
@@ -158,7 +203,7 @@ class DirectoryCollection {
         LOG.warn("Unable to create directory " + dir + " error " +
             e.getMessage() + ", removing from the list of valid directories.");
         localDirs.remove(dir);
-        failedDirs.add(dir);
+        errorDirs.add(dir);
         numFailures++;
         failed = true;
       }
@@ -167,61 +212,147 @@ class DirectoryCollection {
   }
 
   /**
-   * Check the health of current set of local directories, updating the list
-   * of valid directories if necessary.
-   * @return <em>true</em> if there is a new disk-failure identified in
-   *         this checking. <em>false</em> otherwise.
+   * Check the health of current set of local directories(good and failed),
+   * updating the list of valid directories if necessary.
+   *
+   * @return <em>true</em> if there is a new disk-failure identified in this
+   *         checking or a failed directory passes the disk check <em>false</em>
+   *         otherwise.
    */
   synchronized boolean checkDirs() {
-    int oldNumFailures = numFailures;
-    HashSet<String> checkFailedDirs = new HashSet<String>();
-    for (final String dir : localDirs) {
+    boolean setChanged = false;
+    Set<String> preCheckGoodDirs = new HashSet<String>(localDirs);
+    Set<String> preCheckFullDirs = new HashSet<String>(fullDirs);
+    Set<String> preCheckOtherErrorDirs = new HashSet<String>(errorDirs);
+    List<String> failedDirs = DirectoryCollection.concat(errorDirs, fullDirs);
+    List<String> allLocalDirs =
+        DirectoryCollection.concat(localDirs, failedDirs);
+
+    Map<String, DiskErrorInformation> dirsFailedCheck = testDirs(allLocalDirs);
+
+    localDirs.clear();
+    errorDirs.clear();
+    fullDirs.clear();
+
+    for (Map.Entry<String, DiskErrorInformation> entry : dirsFailedCheck
+      .entrySet()) {
+      String dir = entry.getKey();
+      DiskErrorInformation errorInformation = entry.getValue();
+      switch (entry.getValue().cause) {
+      case DISK_FULL:
+        fullDirs.add(entry.getKey());
+        break;
+      case OTHER:
+        errorDirs.add(entry.getKey());
+        break;
+      }
+      if (preCheckGoodDirs.contains(dir)) {
+        LOG.warn("Directory " + dir + " error, " + errorInformation.message
+            + ", removing from list of valid directories");
+        setChanged = true;
+        numFailures++;
+      }
+    }
+    for (String dir : allLocalDirs) {
+      if (!dirsFailedCheck.containsKey(dir)) {
+        localDirs.add(dir);
+        if (preCheckFullDirs.contains(dir)
+            || preCheckOtherErrorDirs.contains(dir)) {
+          setChanged = true;
+          LOG.info("Directory " + dir
+              + " passed disk check, adding to list of valid directories.");
+        }
+      }
+    }
+    Set<String> postCheckFullDirs = new HashSet<String>(fullDirs);
+    Set<String> postCheckOtherDirs = new HashSet<String>(errorDirs);
+    for (String dir : preCheckFullDirs) {
+      if (postCheckOtherDirs.contains(dir)) {
+        LOG.warn("Directory " + dir + " error "
+            + dirsFailedCheck.get(dir).message);
+      }
+    }
+
+    for (String dir : preCheckOtherErrorDirs) {
+      if (postCheckFullDirs.contains(dir)) {
+        LOG.warn("Directory " + dir + " error "
+            + dirsFailedCheck.get(dir).message);
+      }
+    }
+    return setChanged;
+  }
+
+  Map<String, DiskErrorInformation> testDirs(List<String> dirs) {
+    HashMap<String, DiskErrorInformation> ret =
+        new HashMap<String, DiskErrorInformation>();
+    for (final String dir : dirs) {
+      String msg;
       try {
         File testDir = new File(dir);
         DiskChecker.checkDir(testDir);
-        if (isDiskUsageUnderPercentageLimit(testDir)) {
-          LOG.warn("Directory " + dir
-              + " error, used space above threshold of "
-              + diskUtilizationPercentageCutoff
-              + "%, removing from the list of valid directories.");
-          checkFailedDirs.add(dir);
-        } else if (isDiskFreeSpaceWithinLimit(testDir)) {
-          LOG.warn("Directory " + dir + " error, free space below limit of "
-              + diskUtilizationSpaceCutoff
-              + "MB, removing from the list of valid directories.");
-          checkFailedDirs.add(dir);
+        if (isDiskUsageOverPercentageLimit(testDir)) {
+          msg =
+              "used space above threshold of "
+                  + diskUtilizationPercentageCutoff
+                  + "%";
+          ret.put(dir,
+            new DiskErrorInformation(DiskErrorCause.DISK_FULL, msg));
+          continue;
+        } else if (isDiskFreeSpaceUnderLimit(testDir)) {
+          msg =
+              "free space below limit of " + diskUtilizationSpaceCutoff
+                  + "MB";
+          ret.put(dir,
+            new DiskErrorInformation(DiskErrorCause.DISK_FULL, msg));
+          continue;
         }
-      } catch (DiskErrorException de) {
-        LOG.warn("Directory " + dir + " error " + de.getMessage()
-            + ", removing from the list of valid directories.");
-        checkFailedDirs.add(dir);
+
+        // create a random dir to make sure fs isn't in read-only mode
+        verifyDirUsingMkdir(testDir);
+      } catch (IOException ie) {
+        ret.put(dir,
+          new DiskErrorInformation(DiskErrorCause.OTHER, ie.getMessage()));
       }
     }
-    for (String dir : checkFailedDirs) {
-      localDirs.remove(dir);
-      failedDirs.add(dir);
-      numFailures++;
+    return ret;
+  }
+
+  /**
+   * Function to test whether a dir is working correctly by actually creating a
+   * random directory.
+   *
+   * @param dir
+   *          the dir to test
+   */
+  private void verifyDirUsingMkdir(File dir) throws IOException {
+
+    String randomDirName = RandomStringUtils.randomAlphanumeric(5);
+    File target = new File(dir, randomDirName);
+    int i = 0;
+    while (target.exists()) {
+
+      randomDirName = RandomStringUtils.randomAlphanumeric(5) + i;
+      target = new File(dir, randomDirName);
+      i++;
+    }
+    try {
+      DiskChecker.checkDir(target);
+    } finally {
+      FileUtils.deleteQuietly(target);
     }
-    return numFailures > oldNumFailures;
   }
-  
-  private boolean isDiskUsageUnderPercentageLimit(File dir) {
+
+  private boolean isDiskUsageOverPercentageLimit(File dir) {
     float freePercentage =
         100 * (dir.getUsableSpace() / (float) dir.getTotalSpace());
     float usedPercentage = 100.0F - freePercentage;
-    if (usedPercentage > diskUtilizationPercentageCutoff
-        || usedPercentage >= 100.0F) {
-      return true;
-    }
-    return false;
+    return (usedPercentage > diskUtilizationPercentageCutoff
+        || usedPercentage >= 100.0F);
   }
 
-  private boolean isDiskFreeSpaceWithinLimit(File dir) {
+  private boolean isDiskFreeSpaceUnderLimit(File dir) {
     long freeSpace = dir.getUsableSpace() / (1024 * 1024);
-    if (freeSpace < this.diskUtilizationSpaceCutoff) {
-      return true;
-    }
-    return false;
+    return freeSpace < this.diskUtilizationSpaceCutoff;
   }
 
   private void createDir(FileContext localFs, Path dir, FsPermission perm)

+ 150 - 26
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java

@@ -21,7 +21,9 @@ package org.apache.hadoop.yarn.server.nodemanager;
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 
@@ -150,7 +152,7 @@ public class LocalDirsHandlerService extends AbstractService {
     boolean createSucceeded = localDirs.createNonExistentDirs(localFs, perm);
     createSucceeded &= logDirs.createNonExistentDirs(localFs, perm);
     if (!createSucceeded) {
-      updateDirsAfterFailure();
+      updateDirsAfterTest();
     }
 
     // Check the disk health immediately to weed out bad directories
@@ -197,9 +199,52 @@ public class LocalDirsHandlerService extends AbstractService {
   }
 
   /**
+   * @return the local directories which have no disk space
+   */
+  public List<String> getDiskFullLocalDirs() {
+    return localDirs.getFullDirs();
+  }
+
+  /**
+   * @return the log directories that have no disk space
+   */
+  public List<String> getDiskFullLogDirs() {
+    return logDirs.getFullDirs();
+  }
+
+  /**
+   * Function to get the local dirs which should be considered when cleaning up
+   * resources. Contains the good local dirs and the local dirs that have reached
+   * the disk space limit
+   *
+   * @return the local dirs which should be considered for cleaning up
+   */
+  public List<String> getLocalDirsForCleanup() {
+    return DirectoryCollection.concat(localDirs.getGoodDirs(),
+        localDirs.getFullDirs());
+  }
+
+  /**
+   * Function to get the log dirs which should be considered when cleaning up
+   * resources. Contains the good log dirs and the log dirs that have reached
+   * the disk space limit
+   *
+   * @return the log dirs which should be considered for cleaning up
+   */
+  public List<String> getLogDirsForCleanup() {
+    return DirectoryCollection.concat(logDirs.getGoodDirs(),
+        logDirs.getFullDirs());
+  }
+
+  /**
+   * Function to generate a report on the state of the disks.
+   *
+   * @param listGoodDirs
+   *          flag to determine whether the report should report the state of
+   *          good dirs or failed dirs
    * @return the health report of nm-local-dirs and nm-log-dirs
    */
-  public String getDisksHealthReport() {
+  public String getDisksHealthReport(boolean listGoodDirs) {
     if (!isDiskHealthCheckerEnabled) {
       return "";
     }
@@ -207,20 +252,31 @@ public class LocalDirsHandlerService extends AbstractService {
     StringBuilder report = new StringBuilder();
     List<String> failedLocalDirsList = localDirs.getFailedDirs();
     List<String> failedLogDirsList = logDirs.getFailedDirs();
-    int numLocalDirs = localDirs.getGoodDirs().size()
-        + failedLocalDirsList.size();
-    int numLogDirs = logDirs.getGoodDirs().size() + failedLogDirsList.size();
-    if (!failedLocalDirsList.isEmpty()) {
-      report.append(failedLocalDirsList.size() + "/" + numLocalDirs
-          + " local-dirs turned bad: "
-          + StringUtils.join(",", failedLocalDirsList) + ";");
-    }
-    if (!failedLogDirsList.isEmpty()) {
-      report.append(failedLogDirsList.size() + "/" + numLogDirs
-          + " log-dirs turned bad: "
-          + StringUtils.join(",", failedLogDirsList));
+    List<String> goodLocalDirsList = localDirs.getGoodDirs();
+    List<String> goodLogDirsList = logDirs.getGoodDirs();
+    int numLocalDirs = goodLocalDirsList.size() + failedLocalDirsList.size();
+    int numLogDirs = goodLogDirsList.size() + failedLogDirsList.size();
+    if (!listGoodDirs) {
+      if (!failedLocalDirsList.isEmpty()) {
+        report.append(failedLocalDirsList.size() + "/" + numLocalDirs
+            + " local-dirs are bad: "
+            + StringUtils.join(",", failedLocalDirsList) + "; ");
+      }
+      if (!failedLogDirsList.isEmpty()) {
+        report.append(failedLogDirsList.size() + "/" + numLogDirs
+            + " log-dirs are bad: " + StringUtils.join(",", failedLogDirsList));
+      }
+    } else {
+      report.append(goodLocalDirsList.size() + "/" + numLocalDirs
+          + " local-dirs are good: " + StringUtils.join(",", goodLocalDirsList)
+          + "; ");
+      report.append(goodLogDirsList.size() + "/" + numLogDirs
+          + " log-dirs are good: " + StringUtils.join(",", goodLogDirsList));
+
     }
+
     return report.toString();
+
   }
 
   /**
@@ -262,8 +318,8 @@ public class LocalDirsHandlerService extends AbstractService {
    * Set good local dirs and good log dirs in the configuration so that the
    * LocalDirAllocator objects will use this updated configuration only.
    */
-  private void updateDirsAfterFailure() {
-    LOG.info("Disk(s) failed. " + getDisksHealthReport());
+  private void updateDirsAfterTest() {
+
     Configuration conf = getConfig();
     List<String> localDirs = getLocalDirs();
     conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS,
@@ -273,23 +329,91 @@ public class LocalDirsHandlerService extends AbstractService {
                       logDirs.toArray(new String[logDirs.size()]));
     if (!areDisksHealthy()) {
       // Just log.
-      LOG.error("Most of the disks failed. " + getDisksHealthReport());
+      LOG.error("Most of the disks failed. " + getDisksHealthReport(false));
     }
   }
 
+  private void logDiskStatus(boolean newDiskFailure, boolean diskTurnedGood) {
+    if (newDiskFailure) {
+      String report = getDisksHealthReport(false);
+      LOG.info("Disk(s) failed: " + report);
+    }
+    if (diskTurnedGood) {
+      String report = getDisksHealthReport(true);
+      LOG.info("Disk(s) turned good: " + report);
+    }
+
+  }
+
   private void checkDirs() {
-      boolean newFailure = false;
-      if (localDirs.checkDirs()) {
-        newFailure = true;
-      }
-      if (logDirs.checkDirs()) {
-        newFailure = true;
+    boolean disksStatusChange = false;
+    Set<String> failedLocalDirsPreCheck =
+        new HashSet<String>(localDirs.getFailedDirs());
+    Set<String> failedLogDirsPreCheck =
+        new HashSet<String>(logDirs.getFailedDirs());
+
+    if (localDirs.checkDirs()) {
+      disksStatusChange = true;
+    }
+    if (logDirs.checkDirs()) {
+      disksStatusChange = true;
+    }
+
+    Set<String> failedLocalDirsPostCheck =
+        new HashSet<String>(localDirs.getFailedDirs());
+    Set<String> failedLogDirsPostCheck =
+        new HashSet<String>(logDirs.getFailedDirs());
+
+    boolean disksFailed = false;
+    boolean disksTurnedGood = false;
+
+    disksFailed =
+        disksTurnedBad(failedLocalDirsPreCheck, failedLocalDirsPostCheck);
+    disksTurnedGood =
+        disksTurnedGood(failedLocalDirsPreCheck, failedLocalDirsPostCheck);
+
+    // skip check if we have new failed or good local dirs since we're going to
+    // log anyway
+    if (!disksFailed) {
+      disksFailed =
+          disksTurnedBad(failedLogDirsPreCheck, failedLogDirsPostCheck);
+    }
+    if (!disksTurnedGood) {
+      disksTurnedGood =
+          disksTurnedGood(failedLogDirsPreCheck, failedLogDirsPostCheck);
+    }
+
+    logDiskStatus(disksFailed, disksTurnedGood);
+
+    if (disksStatusChange) {
+      updateDirsAfterTest();
+    }
+
+    lastDisksCheckTime = System.currentTimeMillis();
+  }
+
+  private boolean disksTurnedBad(Set<String> preCheckFailedDirs,
+      Set<String> postCheckDirs) {
+    boolean disksFailed = false;
+    for (String dir : postCheckDirs) {
+      if (!preCheckFailedDirs.contains(dir)) {
+        disksFailed = true;
+        break;
       }
+    }
+    return disksFailed;
+  }
 
-      if (newFailure) {
-        updateDirsAfterFailure();
+  private boolean disksTurnedGood(Set<String> preCheckDirs,
+      Set<String> postCheckDirs) {
+    boolean disksTurnedGood = false;
+    for (String dir : preCheckDirs) {
+      if (!postCheckDirs.contains(dir)) {
+        disksTurnedGood = true;
+        break;
       }
-      lastDisksCheckTime = System.currentTimeMillis();
+    }
+    return disksTurnedGood;
   }
 
   public Path getLocalPathForWrite(String pathStr) throws IOException {

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeHealthCheckerService.java

@@ -55,9 +55,9 @@ public class NodeHealthCheckerService extends CompositeService {
     String scriptReport = (nodeHealthScriptRunner == null) ? ""
         : nodeHealthScriptRunner.getHealthReport();
     if (scriptReport.equals("")) {
-      return dirsHandler.getDisksHealthReport();
+      return dirsHandler.getDisksHealthReport(false);
     } else {
-      return scriptReport.concat(SEPARATOR + dirsHandler.getDisksHealthReport());
+      return scriptReport.concat(SEPARATOR + dirsHandler.getDisksHealthReport(false));
     }
   }
 

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java

@@ -240,7 +240,7 @@ public class ContainerLaunch implements Callable<Integer> {
       if (!dirsHandler.areDisksHealthy()) {
         ret = ContainerExitStatus.DISKS_FAILED;
         throw new IOException("Most of the disks failed. "
-            + dirsHandler.getDisksHealthReport());
+            + dirsHandler.getDisksHealthReport(false));
       }
 
       try {

+ 220 - 50
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java

@@ -55,11 +55,13 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.security.Credentials;
@@ -170,6 +172,8 @@ public class ResourceLocalizationService extends CompositeService
    */
   private final ConcurrentMap<String,LocalResourcesTracker> appRsrc =
     new ConcurrentHashMap<String,LocalResourcesTracker>();
+  
+  FileContext lfs;
 
   public ResourceLocalizationService(Dispatcher dispatcher,
       ContainerExecutor exec, DeletionService delService,
@@ -219,32 +223,17 @@ public class ResourceLocalizationService extends CompositeService
     this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);
 
     try {
-      FileContext lfs = getLocalFileContext(conf);
-      lfs.setUMask(new FsPermission((short)FsPermission.DEFAULT_UMASK));
-
-      if (!stateStore.canRecover() || stateStore.isNewlyCreated()) {
-        cleanUpLocalDir(lfs,delService);
-      }
-
-      List<String> localDirs = dirsHandler.getLocalDirs();
-      for (String localDir : localDirs) {
-        // $local/usercache
-        Path userDir = new Path(localDir, ContainerLocalizer.USERCACHE);
-        lfs.mkdir(userDir, null, true);
-        // $local/filecache
-        Path fileDir = new Path(localDir, ContainerLocalizer.FILECACHE);
-        lfs.mkdir(fileDir, null, true);
-        // $local/nmPrivate
-        Path sysDir = new Path(localDir, NM_PRIVATE_DIR);
-        lfs.mkdir(sysDir, NM_PRIVATE_PERM, true);
-      }
+      lfs = getLocalFileContext(conf);
+      lfs.setUMask(new FsPermission((short) FsPermission.DEFAULT_UMASK));
 
-      List<String> logDirs = dirsHandler.getLogDirs();
-      for (String logDir : logDirs) {
-        lfs.mkdir(new Path(logDir), null, true);
+      if (!stateStore.canRecover()|| stateStore.isNewlyCreated()) {
+        cleanUpLocalDirs(lfs, delService);
+        initializeLocalDirs(lfs);
+        initializeLogDirs(lfs);
       }
-    } catch (IOException e) {
-      throw new YarnRuntimeException("Failed to initialize LocalizationService", e);
+    } catch (Exception e) {
+      throw new YarnRuntimeException(
+        "Failed to initialize LocalizationService", e);
     }
 
     cacheTargetSize =
@@ -497,28 +486,45 @@ public class ResourceLocalizationService extends CompositeService
     String containerIDStr = c.toString();
     String appIDStr = ConverterUtils.toString(
         c.getContainerId().getApplicationAttemptId().getApplicationId());
-    for (String localDir : dirsHandler.getLocalDirs()) {
+    
+    // Try deleting from good local dirs and full local dirs because a dir might
+    // have gone bad while the app was running(disk full). In addition
+    // a dir might have become good while the app was running.
+    // Check if the container dir exists and if it does, try to delete it
 
+    for (String localDir : dirsHandler.getLocalDirsForCleanup()) {
       // Delete the user-owned container-dir
       Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE);
       Path userdir = new Path(usersdir, userName);
       Path allAppsdir = new Path(userdir, ContainerLocalizer.APPCACHE);
       Path appDir = new Path(allAppsdir, appIDStr);
       Path containerDir = new Path(appDir, containerIDStr);
-      delService.delete(userName, containerDir, new Path[] {});
+      submitDirForDeletion(userName, containerDir);
 
       // Delete the nmPrivate container-dir
-      
+
       Path sysDir = new Path(localDir, NM_PRIVATE_DIR);
       Path appSysDir = new Path(sysDir, appIDStr);
       Path containerSysDir = new Path(appSysDir, containerIDStr);
-      delService.delete(null, containerSysDir,  new Path[] {});
+      submitDirForDeletion(null, containerSysDir);
     }
 
     dispatcher.getEventHandler().handle(
         new ContainerEvent(c.getContainerId(),
             ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP));
   }
+  
+  private void submitDirForDeletion(String userName, Path dir) {
+    try {
+      lfs.getFileStatus(dir);
+      delService.delete(userName, dir, new Path[] {});
+    } catch (UnsupportedFileSystemException ue) {
+      LOG.warn("Local dir " + dir + " is an unsupported filesystem", ue);
+    } catch (IOException ie) {
+      // ignore
+      return;
+    }
+  }
 
 
   @SuppressWarnings({"unchecked"})
@@ -545,19 +551,22 @@ public class ResourceLocalizationService extends CompositeService
     }
 
     // Delete the application directories
-    for (String localDir : dirsHandler.getLocalDirs()) {
+    userName = application.getUser();
+    appIDStr = application.toString();
+
+    for (String localDir : dirsHandler.getLocalDirsForCleanup()) {
 
       // Delete the user-owned app-dir
       Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE);
       Path userdir = new Path(usersdir, userName);
       Path allAppsdir = new Path(userdir, ContainerLocalizer.APPCACHE);
       Path appDir = new Path(allAppsdir, appIDStr);
-      delService.delete(userName, appDir, new Path[] {});
+      submitDirForDeletion(userName, appDir);
 
       // Delete the nmPrivate app-dir
       Path sysDir = new Path(localDir, NM_PRIVATE_DIR);
       Path appSysDir = new Path(sysDir, appIDStr);
-      delService.delete(null, appSysDir, new Path[] {});
+      submitDirForDeletion(null, appSysDir);
     }
 
     // TODO: decrement reference counts of all resources associated with this
@@ -590,8 +599,8 @@ public class ResourceLocalizationService extends CompositeService
 
   private String getAppFileCachePath(String user, String appId) {
     return StringUtils.join(Path.SEPARATOR, Arrays.asList(".",
-      ContainerLocalizer.USERCACHE, user, ContainerLocalizer.APPCACHE, appId,
-      ContainerLocalizer.FILECACHE));
+        ContainerLocalizer.USERCACHE, user, ContainerLocalizer.APPCACHE, appId,
+        ContainerLocalizer.FILECACHE));
   }
   
   @VisibleForTesting
@@ -868,7 +877,7 @@ public class ResourceLocalizationService extends CompositeService
     /**
      * Find next resource to be given to a spawned localizer.
      * 
-     * @return
+     * @return the next resource to be localized
      */
     private LocalResource findNextResource() {
       synchronized (pending) {
@@ -1071,8 +1080,8 @@ public class ResourceLocalizationService extends CompositeService
         // 1) write credentials to private dir
         writeCredentials(nmPrivateCTokensPath);
         // 2) exec initApplication and wait
-        List<String> localDirs = dirsHandler.getLocalDirs();
-        List<String> logDirs = dirsHandler.getLogDirs();
+        List<String> localDirs = getInitializedLocalDirs();
+        List<String> logDirs = getInitializedLogDirs();
         if (dirsHandler.areDisksHealthy()) {
           exec.startLocalizer(nmPrivateCTokensPath, localizationServerAddress,
               context.getUser(),
@@ -1082,7 +1091,7 @@ public class ResourceLocalizationService extends CompositeService
               localizerId, localDirs, logDirs);
         } else {
           throw new IOException("All disks failed. "
-              + dirsHandler.getDisksHealthReport());
+              + dirsHandler.getDisksHealthReport(false));
         }
       // TODO handle ExitCodeException separately?
       } catch (Exception e) {
@@ -1151,24 +1160,95 @@ public class ResourceLocalizationService extends CompositeService
 
   }
 
-  private void cleanUpLocalDir(FileContext lfs, DeletionService del) {
-    long currentTimeStamp = System.currentTimeMillis();
-    for (String localDir : dirsHandler.getLocalDirs()) {
-      renameLocalDir(lfs, localDir, ContainerLocalizer.USERCACHE,
-          currentTimeStamp);
-      renameLocalDir(lfs, localDir, ContainerLocalizer.FILECACHE,
-          currentTimeStamp);
-      renameLocalDir(lfs, localDir, ResourceLocalizationService.NM_PRIVATE_DIR,
-          currentTimeStamp);
+  private void initializeLocalDirs(FileContext lfs) {
+    List<String> localDirs = dirsHandler.getLocalDirs();
+    for (String localDir : localDirs) {
+      initializeLocalDir(lfs, localDir);
+    }
+  }
+
+  private void initializeLocalDir(FileContext lfs, String localDir) {
+
+    Map<Path, FsPermission> pathPermissionMap = getLocalDirsPathPermissionsMap(localDir);
+    for (Map.Entry<Path, FsPermission> entry : pathPermissionMap.entrySet()) {
+      FileStatus status;
       try {
-        deleteLocalDir(lfs, del, localDir);
-      } catch (IOException e) {
-        // Do nothing, just give the warning
-        LOG.warn("Failed to delete localDir: " + localDir);
+        status = lfs.getFileStatus(entry.getKey());
+      }
+      catch(FileNotFoundException fs) {
+        status = null;
+      }
+      catch(IOException ie) {
+        String msg = "Could not get file status for local dir " + entry.getKey();
+        LOG.warn(msg, ie);
+        throw new YarnRuntimeException(msg, ie);
+      }
+      if(status == null) {
+        try {
+          lfs.mkdir(entry.getKey(), entry.getValue(), true);
+          status = lfs.getFileStatus(entry.getKey());
+        } catch (IOException e) {
+          String msg = "Could not initialize local dir " + entry.getKey();
+          LOG.warn(msg, e);
+          throw new YarnRuntimeException(msg, e);
+        }
+      }
+      FsPermission perms = status.getPermission();
+      if(!perms.equals(entry.getValue())) {
+        try {
+          lfs.setPermission(entry.getKey(), entry.getValue());
+        }
+        catch(IOException ie) {
+          String msg = "Could not set permissions for local dir " + entry.getKey();
+          LOG.warn(msg, ie);
+          throw new YarnRuntimeException(msg, ie);
+        }
       }
     }
   }
 
+  private void initializeLogDirs(FileContext lfs) {
+    List<String> logDirs = dirsHandler.getLogDirs();
+    for (String logDir : logDirs) {
+      initializeLogDir(lfs, logDir);
+    }
+  }
+
+  private void initializeLogDir(FileContext lfs, String logDir) {
+    try {
+      lfs.mkdir(new Path(logDir), null, true);
+    } catch (FileAlreadyExistsException fe) {
+      // do nothing
+    } catch (IOException e) {
+      String msg = "Could not initialize log dir " + logDir;
+      LOG.warn(msg, e);
+      throw new YarnRuntimeException(msg, e);
+    }
+  }
+
+  private void cleanUpLocalDirs(FileContext lfs, DeletionService del) {
+    for (String localDir : dirsHandler.getLocalDirs()) {
+      cleanUpLocalDir(lfs, del, localDir);
+    }
+  }
+
+  private void cleanUpLocalDir(FileContext lfs, DeletionService del,
+      String localDir) {
+    long currentTimeStamp = System.currentTimeMillis();
+    renameLocalDir(lfs, localDir, ContainerLocalizer.USERCACHE,
+      currentTimeStamp);
+    renameLocalDir(lfs, localDir, ContainerLocalizer.FILECACHE,
+      currentTimeStamp);
+    renameLocalDir(lfs, localDir, ResourceLocalizationService.NM_PRIVATE_DIR,
+      currentTimeStamp);
+    try {
+      deleteLocalDir(lfs, del, localDir);
+    } catch (IOException e) {
+      // Do nothing, just give the warning
+      LOG.warn("Failed to delete localDir: " + localDir);
+    }
+  }
+
   private void renameLocalDir(FileContext lfs, String localDir,
       String localSubDir, long currentTimeStamp) {
     try {
@@ -1234,5 +1314,95 @@ public class ResourceLocalizationService extends CompositeService
       del.scheduleFileDeletionTask(dependentDeletionTask);
     }
   }
+  
+  /**
+   * Synchronized method to get a list of initialized local dirs. Method will
+   * check each local dir to ensure it has been setup correctly and will attempt
+   * to fix any issues it finds.
+   * 
+   * @return list of initialized local dirs
+   */
+  synchronized private List<String> getInitializedLocalDirs() {
+    List<String> dirs = dirsHandler.getLocalDirs();
+    List<String> checkFailedDirs = new ArrayList<String>();
+    for (String dir : dirs) {
+      try {
+        checkLocalDir(dir);
+      } catch (YarnRuntimeException e) {
+        checkFailedDirs.add(dir);
+      }
+    }
+    for (String dir : checkFailedDirs) {
+      LOG.info("Attempting to initialize " + dir);
+      initializeLocalDir(lfs, dir);
+      try {
+        checkLocalDir(dir);
+      } catch (YarnRuntimeException e) {
+        String msg =
+            "Failed to setup local dir " + dir + ", which was marked as good.";
+        LOG.warn(msg, e);
+        throw new YarnRuntimeException(msg, e);
+      }
+    }
+    return dirs;
+  }
+
+  private boolean checkLocalDir(String localDir) {
+
+    Map<Path, FsPermission> pathPermissionMap = getLocalDirsPathPermissionsMap(localDir);
+
+    for (Map.Entry<Path, FsPermission> entry : pathPermissionMap.entrySet()) {
+      FileStatus status;
+      try {
+        status = lfs.getFileStatus(entry.getKey());
+      } catch (Exception e) {
+        String msg =
+            "Could not carry out resource dir checks for " + localDir
+                + ", which was marked as good";
+        LOG.warn(msg, e);
+        throw new YarnRuntimeException(msg, e);
+      }
+
+      if (!status.getPermission().equals(entry.getValue())) {
+        String msg =
+            "Permissions incorrectly set for dir " + entry.getKey()
+                + ", should be " + entry.getValue() + ", actual value = "
+                + status.getPermission();
+        LOG.warn(msg);
+        throw new YarnRuntimeException(msg);
+      }
+    }
+    return true;
+  }
+
+  private Map<Path, FsPermission> getLocalDirsPathPermissionsMap(String localDir) {
+    Map<Path, FsPermission> localDirPathFsPermissionsMap = new HashMap<Path, FsPermission>();
+
+    FsPermission defaultPermission =
+        FsPermission.getDirDefault().applyUMask(lfs.getUMask());
+    FsPermission nmPrivatePermission =
+        NM_PRIVATE_PERM.applyUMask(lfs.getUMask());
 
+    Path userDir = new Path(localDir, ContainerLocalizer.USERCACHE);
+    Path fileDir = new Path(localDir, ContainerLocalizer.FILECACHE);
+    Path sysDir = new Path(localDir, NM_PRIVATE_DIR);
+
+    localDirPathFsPermissionsMap.put(userDir, defaultPermission);
+    localDirPathFsPermissionsMap.put(fileDir, defaultPermission);
+    localDirPathFsPermissionsMap.put(sysDir, nmPrivatePermission);
+    return localDirPathFsPermissionsMap;
+  }
+  
+  /**
+   * Synchronized method to get a list of initialized log dirs. Method will
+   * check each local dir to ensure it has been setup correctly and will attempt
+   * to fix any issues it finds.
+   * 
+   * @return list of initialized log dirs
+   */
+  synchronized private List<String> getInitializedLogDirs() {
+    List<String> dirs = dirsHandler.getLogDirs();
+    initializeLogDirs(lfs);
+    return dirs;
+  }
 }

+ 25 - 10
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregatio
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
@@ -37,9 +38,11 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -107,6 +110,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
   private final AtomicBoolean appAggregationFinished = new AtomicBoolean();
   private final AtomicBoolean aborted = new AtomicBoolean();
   private final Map<ApplicationAccessType, String> appAcls;
+  private final FileContext lfs;
   private final LogAggregationContext logAggregationContext;
   private final Context context;
   private final int retentionSize;
@@ -122,8 +126,8 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
       LocalDirsHandlerService dirsHandler, Path remoteNodeLogFileForApp,
       ContainerLogsRetentionPolicy retentionPolicy,
       Map<ApplicationAccessType, String> appAcls,
-      LogAggregationContext logAggregationContext,
-      Context context) {
+      LogAggregationContext logAggregationContext, Context context,
+      FileContext lfs) {
     this.dispatcher = dispatcher;
     this.conf = conf;
     this.delService = deletionService;
@@ -136,6 +140,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
     this.retentionPolicy = retentionPolicy;
     this.pendingContainers = new LinkedBlockingQueue<ContainerId>();
     this.appAcls = appAcls;
+    this.lfs = lfs;
     this.logAggregationContext = logAggregationContext;
     this.context = context;
     this.nodeId = nodeId;
@@ -395,15 +400,25 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
     uploadLogsForContainers();
 
     // Remove the local app-log-dirs
-    List<String> rootLogDirs = dirsHandler.getLogDirs();
-    Path[] localAppLogDirs = new Path[rootLogDirs.size()];
-    int index = 0;
-    for (String rootLogDir : rootLogDirs) {
-      localAppLogDirs[index] = new Path(rootLogDir, this.applicationId);
-      index++;
+    List<Path> localAppLogDirs = new ArrayList<Path>();
+    for (String rootLogDir : dirsHandler.getLogDirsForCleanup()) {
+      Path logPath = new Path(rootLogDir, applicationId);
+      try {
+        // check if log dir exists
+        lfs.getFileStatus(logPath);
+        localAppLogDirs.add(logPath);
+      } catch (UnsupportedFileSystemException ue) {
+        LOG.warn("Log dir " + rootLogDir + "is an unsupported file system", ue);
+        continue;
+      } catch (IOException fe) {
+        continue;
+      }
+    }
+
+    if (localAppLogDirs.size() > 0) {
+      this.delService.delete(this.userUgi.getShortUserName(), null,
+        localAppLogDirs.toArray(new Path[localAppLogDirs.size()]));
     }
-    this.delService.delete(this.userUgi.getShortUserName(), null,
-        localAppLogDirs);
     
     this.dispatcher.getEventHandler().handle(
         new ApplicationEvent(this.appId,

+ 12 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java

@@ -32,6 +32,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -326,6 +327,15 @@ public class LogAggregationService extends AbstractService implements
     }
     this.dispatcher.getEventHandler().handle(eventResponse);
   }
+  
+  FileContext getLocalFileContext(Configuration conf) {
+    try {
+      return FileContext.getLocalFSFileContext(conf);
+    } catch (IOException e) {
+      throw new YarnRuntimeException("Failed to access local fs");
+    }
+  }
+
 
   protected void initAppAggregator(final ApplicationId appId, String user,
       Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy,
@@ -344,7 +354,8 @@ public class LogAggregationService extends AbstractService implements
         new AppLogAggregatorImpl(this.dispatcher, this.deletionService,
             getConfig(), appId, userUgi, this.nodeId, dirsHandler,
             getRemoteNodeLogFileForApp(appId, user), logRetentionPolicy,
-            appAcls, logAggregationContext, this.context);
+            appAcls, logAggregationContext, this.context,
+            getLocalFileContext(getConfig()));
     if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) {
       throw new YarnRuntimeException("Duplicate initApp for " + appId);
     }

+ 35 - 12
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java

@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler;
 
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -28,11 +30,14 @@ import java.util.concurrent.RejectedExecutionException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
@@ -96,6 +101,15 @@ public class NonAggregatingLogHandler extends AbstractService implements
     }
     super.serviceStop();
   }
+  
+  FileContext getLocalFileContext(Configuration conf) {
+    try {
+      return FileContext.getLocalFSFileContext(conf);
+    } catch (IOException e) {
+      throw new YarnRuntimeException("Failed to access local fs");
+    }
+  }
+
 
   @SuppressWarnings("unchecked")
   @Override
@@ -160,21 +174,30 @@ public class NonAggregatingLogHandler extends AbstractService implements
     @Override
     @SuppressWarnings("unchecked")
     public void run() {
-      List<String> rootLogDirs =
-          NonAggregatingLogHandler.this.dirsHandler.getLogDirs();
-      Path[] localAppLogDirs = new Path[rootLogDirs.size()];
-      int index = 0;
-      for (String rootLogDir : rootLogDirs) {
-        localAppLogDirs[index] = new Path(rootLogDir, applicationId.toString());
-        index++;
+      List<Path> localAppLogDirs = new ArrayList<Path>();
+      FileContext lfs = getLocalFileContext(getConfig());
+      for (String rootLogDir : dirsHandler.getLogDirsForCleanup()) {
+        Path logDir = new Path(rootLogDir, applicationId.toString());
+        try {
+          lfs.getFileStatus(logDir);
+          localAppLogDirs.add(logDir);
+        } catch (UnsupportedFileSystemException ue) {
+          LOG.warn("Unsupported file system used for log dir " + logDir, ue);
+          continue;
+        } catch (IOException ie) {
+          continue;
+        }
       }
+
       // Inform the application before the actual delete itself, so that links
-      // to logs will no longer be there on NM web-UI. 
+      // to logs will no longer be there on NM web-UI.
       NonAggregatingLogHandler.this.dispatcher.getEventHandler().handle(
-          new ApplicationEvent(this.applicationId,
-              ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED));
-      NonAggregatingLogHandler.this.delService.delete(user, null,
-          localAppLogDirs);
+        new ApplicationEvent(this.applicationId,
+          ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED));
+      if (localAppLogDirs.size() > 0) {
+        NonAggregatingLogHandler.this.delService.delete(user, null,
+          (Path[]) localAppLogDirs.toArray(new Path[localAppLogDirs.size()]));
+      }
     }
 
     @Override

+ 64 - 11
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDirectoryCollection.java

@@ -34,6 +34,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Before;
 import org.junit.Test;
 
 public class TestDirectoryCollection {
@@ -42,8 +44,13 @@ public class TestDirectoryCollection {
       TestDirectoryCollection.class.getName()).getAbsoluteFile();
   private static final File testFile = new File(testDir, "testfile");
 
+  private Configuration conf;
+  private FileContext localFs;
+
   @Before
-  public void setup() throws IOException {
+  public void setupForTests() throws IOException {
+    conf = new Configuration();
+    localFs = FileContext.getLocalFSFileContext(conf);
     testDir.mkdirs();
     testFile.createNewFile();
   }
@@ -56,11 +63,12 @@ public class TestDirectoryCollection {
   @Test
   public void testConcurrentAccess() throws IOException {
     // Initialize DirectoryCollection with a file instead of a directory
-    Configuration conf = new Configuration();
+    
     String[] dirs = {testFile.getPath()};
-    DirectoryCollection dc = new DirectoryCollection(dirs,
-      conf.getFloat(YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE, 
-        YarnConfiguration.DEFAULT_NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE));
+    DirectoryCollection dc =
+        new DirectoryCollection(dirs, conf.getFloat(
+          YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE,
+          YarnConfiguration.DEFAULT_NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE));
 
     // Create an iterator before checkDirs is called to reliable test case
     List<String> list = dc.getGoodDirs();
@@ -78,9 +86,8 @@ public class TestDirectoryCollection {
 
   @Test
   public void testCreateDirectories() throws IOException {
-    Configuration conf = new Configuration();
+    
     conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
-    FileContext localFs = FileContext.getLocalFSFileContext(conf);
 
     String dirA = new File(testDir, "dirA").getPath();
     String dirB = new File(dirA, "dirB").getPath();
@@ -92,9 +99,10 @@ public class TestDirectoryCollection {
     localFs.setPermission(pathC, permDirC);
 
     String[] dirs = { dirA, dirB, dirC };
-    DirectoryCollection dc = new DirectoryCollection(dirs,
-      conf.getFloat(YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE, 
-        YarnConfiguration.DEFAULT_NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE));
+    DirectoryCollection dc =
+        new DirectoryCollection(dirs, conf.getFloat(
+          YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE,
+          YarnConfiguration.DEFAULT_NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE));
     FsPermission defaultPerm = FsPermission.getDefault()
         .applyUMask(new FsPermission((short)FsPermission.DEFAULT_UMASK));
     boolean createResult = dc.createNonExistentDirs(localFs, defaultPerm);
@@ -120,25 +128,29 @@ public class TestDirectoryCollection {
     dc.checkDirs();
     Assert.assertEquals(0, dc.getGoodDirs().size());
     Assert.assertEquals(1, dc.getFailedDirs().size());
+    Assert.assertEquals(1, dc.getFullDirs().size());
 
     dc = new DirectoryCollection(dirs, 100.0F);
     dc.checkDirs();
     Assert.assertEquals(1, dc.getGoodDirs().size());
     Assert.assertEquals(0, dc.getFailedDirs().size());
+    Assert.assertEquals(0, dc.getFullDirs().size());
 
     dc = new DirectoryCollection(dirs, testDir.getTotalSpace() / (1024 * 1024));
     dc.checkDirs();
     Assert.assertEquals(0, dc.getGoodDirs().size());
     Assert.assertEquals(1, dc.getFailedDirs().size());
+    Assert.assertEquals(1, dc.getFullDirs().size());
 
     dc = new DirectoryCollection(dirs, 100.0F, 0);
     dc.checkDirs();
     Assert.assertEquals(1, dc.getGoodDirs().size());
     Assert.assertEquals(0, dc.getFailedDirs().size());
+    Assert.assertEquals(0, dc.getFullDirs().size());
   }
 
   @Test
-  public void testDiskLimitsCutoffSetters() {
+  public void testDiskLimitsCutoffSetters() throws IOException {
 
     String[] dirs = { "dir" };
     DirectoryCollection dc = new DirectoryCollection(dirs, 0.0F, 100);
@@ -162,6 +174,47 @@ public class TestDirectoryCollection {
     Assert.assertEquals(0, dc.getDiskUtilizationSpaceCutoff());
   }
 
+  @Test
+  public void testFailedDisksBecomingGoodAgain() throws Exception {
+
+    String dirA = new File(testDir, "dirA").getPath();
+    String[] dirs = { dirA };
+    DirectoryCollection dc = new DirectoryCollection(dirs, 0.0F);
+    dc.checkDirs();
+    Assert.assertEquals(0, dc.getGoodDirs().size());
+    Assert.assertEquals(1, dc.getFailedDirs().size());
+    Assert.assertEquals(1, dc.getFullDirs().size());
+
+    dc.setDiskUtilizationPercentageCutoff(100.0F);
+    dc.checkDirs();
+    Assert.assertEquals(1, dc.getGoodDirs().size());
+    Assert.assertEquals(0, dc.getFailedDirs().size());
+    Assert.assertEquals(0, dc.getFullDirs().size());
+
+    conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
+
+    String dirB = new File(testDir, "dirB").getPath();
+    Path pathB = new Path(dirB);
+    FsPermission permDirB = new FsPermission((short) 0400);
+
+    localFs.mkdir(pathB, null, true);
+    localFs.setPermission(pathB, permDirB);
+
+    String[] dirs2 = { dirB };
+
+    dc = new DirectoryCollection(dirs2, 100.0F);
+    dc.checkDirs();
+    Assert.assertEquals(0, dc.getGoodDirs().size());
+    Assert.assertEquals(1, dc.getFailedDirs().size());
+    Assert.assertEquals(0, dc.getFullDirs().size());
+    permDirB = new FsPermission((short) 0700);
+    localFs.setPermission(pathB, permDirB);
+    dc.checkDirs();
+    Assert.assertEquals(1, dc.getGoodDirs().size());
+    Assert.assertEquals(0, dc.getFailedDirs().size());
+    Assert.assertEquals(0, dc.getFullDirs().size());
+  }
+
   @Test
   public void testConstructors() {
 

+ 42 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLocalDirsHandlerService.java

@@ -21,8 +21,13 @@ package org.apache.hadoop.yarn.server.nodemanager;
 import java.io.File;
 import java.io.IOException;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.service.Service.STATE;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -57,10 +62,11 @@ public class TestLocalDirsHandlerService {
     LocalDirsHandlerService dirSvc = new LocalDirsHandlerService();
     dirSvc.init(conf);
     Assert.assertEquals(1, dirSvc.getLocalDirs().size());
+    dirSvc.close();
   }
 
   @Test
-  public void testValidPathsDirHandlerService() {
+  public void testValidPathsDirHandlerService() throws Exception {
     Configuration conf = new YarnConfiguration();
     String localDir1 = new File("file:///" + testDir, "localDir1").getPath();
     String localDir2 = new File("hdfs:///" + testDir, "localDir2").getPath();
@@ -76,5 +82,40 @@ public class TestLocalDirsHandlerService {
     Assert.assertEquals("Service should not be inited",
                         STATE.STOPPED,
                         dirSvc.getServiceState());
+    dirSvc.close();
+  }
+  
+  @Test
+  public void testGetFullDirs() throws Exception {
+    Configuration conf = new YarnConfiguration();
+
+    conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
+    FileContext localFs = FileContext.getLocalFSFileContext(conf);
+
+    String localDir1 = new File(testDir, "localDir1").getPath();
+    String localDir2 = new File(testDir, "localDir2").getPath();
+    String logDir1 = new File(testDir, "logDir1").getPath();
+    String logDir2 = new File(testDir, "logDir2").getPath();
+    Path localDir1Path = new Path(localDir1);
+    Path logDir1Path = new Path(logDir1);
+    FsPermission dirPermissions = new FsPermission((short) 0410);
+    localFs.mkdir(localDir1Path, dirPermissions, true);
+    localFs.mkdir(logDir1Path, dirPermissions, true);
+
+    conf.set(YarnConfiguration.NM_LOCAL_DIRS, localDir1 + "," + localDir2);
+    conf.set(YarnConfiguration.NM_LOG_DIRS, logDir1 + "," + logDir2);
+    conf.setFloat(YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE,
+      0.0f);
+    LocalDirsHandlerService dirSvc = new LocalDirsHandlerService();
+    dirSvc.init(conf);
+    Assert.assertEquals(0, dirSvc.getLocalDirs().size());
+    Assert.assertEquals(0, dirSvc.getLogDirs().size());
+    Assert.assertEquals(1, dirSvc.getDiskFullLocalDirs().size());
+    Assert.assertEquals(1, dirSvc.getDiskFullLogDirs().size());
+    FileUtils.deleteDirectory(new File(localDir1));
+    FileUtils.deleteDirectory(new File(localDir2));
+    FileUtils.deleteDirectory(new File(logDir1));
+    FileUtils.deleteDirectory(new File(logDir1));
+    dirSvc.close();
   }
 }

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeHealthService.java

@@ -196,7 +196,7 @@ public class TestNodeHealthService {
         healthStatus.getHealthReport().equals(
             NodeHealthScriptRunner.NODE_HEALTH_SCRIPT_TIMED_OUT_MSG
             + NodeHealthCheckerService.SEPARATOR
-            + nodeHealthChecker.getDiskHandler().getDisksHealthReport()));
+            + nodeHealthChecker.getDiskHandler().getDisksHealthReport(false)));
   }
 
 }

+ 309 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java

@@ -42,6 +42,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
+import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -68,13 +69,16 @@ import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.security.AccessControlException;
 import org.junit.Assert;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.AbstractFileSystem;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Text;
@@ -167,15 +171,15 @@ public class TestResourceLocalizationService {
     conf = new Configuration();
     spylfs = spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
     lfs = FileContext.getFileContext(spylfs, conf);
-    doNothing().when(spylfs).mkdir(
-        isA(Path.class), isA(FsPermission.class), anyBoolean());
+
     String logDir = lfs.makeQualified(new Path(basedir, "logdir ")).toString();
     conf.set(YarnConfiguration.NM_LOG_DIRS, logDir);
   }
 
   @After
-  public void cleanup() {
+  public void cleanup() throws IOException {
     conf = null;
+    FileUtils.deleteDirectory(new File(basedir.toString()));
   }
   
   @Test
@@ -752,6 +756,39 @@ public class TestResourceLocalizationService {
     ResourceLocalizationService spyService = spy(rawService);
     doReturn(mockServer).when(spyService).createServer();
     doReturn(lfs).when(spyService).getLocalFileContext(isA(Configuration.class));
+    FsPermission defaultPermission =
+        FsPermission.getDirDefault().applyUMask(lfs.getUMask());
+    FsPermission nmPermission =
+        ResourceLocalizationService.NM_PRIVATE_PERM.applyUMask(lfs.getUMask());
+    final Path userDir =
+        new Path(sDirs[0].substring("file:".length()),
+          ContainerLocalizer.USERCACHE);
+    final Path fileDir =
+        new Path(sDirs[0].substring("file:".length()),
+          ContainerLocalizer.FILECACHE);
+    final Path sysDir =
+        new Path(sDirs[0].substring("file:".length()),
+          ResourceLocalizationService.NM_PRIVATE_DIR);
+    final FileStatus fs =
+        new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0,
+          defaultPermission, "", "", new Path(sDirs[0]));
+    final FileStatus nmFs =
+        new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0,
+          nmPermission, "", "", sysDir);
+
+    doAnswer(new Answer<FileStatus>() {
+      @Override
+      public FileStatus answer(InvocationOnMock invocation) throws Throwable {
+        Object[] args = invocation.getArguments();
+        if (args.length > 0) {
+          if (args[0].equals(userDir) || args[0].equals(fileDir)) {
+            return fs;
+          }
+        }
+        return nmFs;
+      }
+    }).when(spylfs).getFileStatus(isA(Path.class));
+
     try {
       spyService.init(conf);
       spyService.start();
@@ -1775,5 +1812,274 @@ public class TestResourceLocalizationService {
     return new Token(("ident" + id).getBytes(), ("passwd" + id).getBytes(),
         new Text("kind" + id), new Text("service" + id));
   }
+  
+  /*
+   * Test to ensure ResourceLocalizationService can handle local dirs going bad.
+   * Test first sets up all the components required, then sends events to fetch
+   * a private, app and public resource. It then sends events to clean up the
+   * container and the app and ensures the right delete calls were made.
+   */
+  @Test
+  @SuppressWarnings("unchecked")
+  // mocked generics
+  public void testFailedDirsResourceRelease() throws Exception {
+    // setup components
+    File f = new File(basedir.toString());
+    String[] sDirs = new String[4];
+    List<Path> localDirs = new ArrayList<Path>(sDirs.length);
+    for (int i = 0; i < 4; ++i) {
+      sDirs[i] = f.getAbsolutePath() + i;
+      localDirs.add(new Path(sDirs[i]));
+    }
+    List<Path> containerLocalDirs = new ArrayList<Path>(localDirs.size());
+    List<Path> appLocalDirs = new ArrayList<Path>(localDirs.size());
+    List<Path> nmLocalContainerDirs = new ArrayList<Path>(localDirs.size());
+    List<Path> nmLocalAppDirs = new ArrayList<Path>(localDirs.size());
+    conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
+    conf.setLong(YarnConfiguration.NM_DISK_HEALTH_CHECK_INTERVAL_MS, 500);
+
+    LocalizerTracker mockLocallilzerTracker = mock(LocalizerTracker.class);
+    DrainDispatcher dispatcher = new DrainDispatcher();
+    dispatcher.init(conf);
+    dispatcher.start();
+    EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
+    dispatcher.register(ApplicationEventType.class, applicationBus);
+    EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
+    dispatcher.register(ContainerEventType.class, containerBus);
+    // Ignore actual localization
+    EventHandler<LocalizerEvent> localizerBus = mock(EventHandler.class);
+    dispatcher.register(LocalizerEventType.class, localizerBus);
+
+    ContainerExecutor exec = mock(ContainerExecutor.class);
+    LocalDirsHandlerService mockDirsHandler =
+        mock(LocalDirsHandlerService.class);
+    doReturn(new ArrayList<String>(Arrays.asList(sDirs))).when(
+        mockDirsHandler).getLocalDirsForCleanup();
+
+    DeletionService delService = mock(DeletionService.class);
+
+    // setup mocks
+    ResourceLocalizationService rawService =
+        new ResourceLocalizationService(dispatcher, exec, delService,
+          mockDirsHandler, new NMNullStateStoreService());
+    ResourceLocalizationService spyService = spy(rawService);
+    doReturn(mockServer).when(spyService).createServer();
+    doReturn(mockLocallilzerTracker).when(spyService).createLocalizerTracker(
+      isA(Configuration.class));
+    doReturn(lfs).when(spyService)
+      .getLocalFileContext(isA(Configuration.class));
+    FsPermission defaultPermission =
+        FsPermission.getDirDefault().applyUMask(lfs.getUMask());
+    FsPermission nmPermission =
+        ResourceLocalizationService.NM_PRIVATE_PERM.applyUMask(lfs.getUMask());
+    final FileStatus fs =
+        new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0,
+          defaultPermission, "", "", localDirs.get(0));
+    final FileStatus nmFs =
+        new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0,
+          nmPermission, "", "", localDirs.get(0));
+
+    final String user = "user0";
+    // init application
+    final Application app = mock(Application.class);
+    final ApplicationId appId =
+        BuilderUtils.newApplicationId(314159265358979L, 3);
+    when(app.getUser()).thenReturn(user);
+    when(app.getAppId()).thenReturn(appId);
+    when(app.toString()).thenReturn(ConverterUtils.toString(appId));
+
+    // init container.
+    final Container c = getMockContainer(appId, 42, user);
+
+    // setup local app dirs
+    List<String> tmpDirs = mockDirsHandler.getLocalDirs();
+    for (int i = 0; i < tmpDirs.size(); ++i) {
+      Path usersdir = new Path(tmpDirs.get(i), ContainerLocalizer.USERCACHE);
+      Path userdir = new Path(usersdir, user);
+      Path allAppsdir = new Path(userdir, ContainerLocalizer.APPCACHE);
+      Path appDir = new Path(allAppsdir, ConverterUtils.toString(appId));
+      Path containerDir =
+          new Path(appDir, ConverterUtils.toString(c.getContainerId()));
+      containerLocalDirs.add(containerDir);
+      appLocalDirs.add(appDir);
+
+      Path sysDir =
+          new Path(tmpDirs.get(i), ResourceLocalizationService.NM_PRIVATE_DIR);
+      Path appSysDir = new Path(sysDir, ConverterUtils.toString(appId));
+      Path containerSysDir =
+          new Path(appSysDir, ConverterUtils.toString(c.getContainerId()));
+
+      nmLocalContainerDirs.add(containerSysDir);
+      nmLocalAppDirs.add(appSysDir);
+    }
+
+    try {
+      spyService.init(conf);
+      spyService.start();
+
+      spyService.handle(new ApplicationLocalizationEvent(
+        LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
+      dispatcher.await();
+
+      // Get a handle on the trackers after they're setup with
+      // INIT_APP_RESOURCES
+      LocalResourcesTracker appTracker =
+          spyService.getLocalResourcesTracker(
+            LocalResourceVisibility.APPLICATION, user, appId);
+      LocalResourcesTracker privTracker =
+          spyService.getLocalResourcesTracker(LocalResourceVisibility.PRIVATE,
+            user, appId);
+      LocalResourcesTracker pubTracker =
+          spyService.getLocalResourcesTracker(LocalResourceVisibility.PUBLIC,
+            user, appId);
+
+      // init resources
+      Random r = new Random();
+      long seed = r.nextLong();
+      r.setSeed(seed);
+
+      // Send localization requests, one for each type of resource
+      final LocalResource privResource = getPrivateMockedResource(r);
+      final LocalResourceRequest privReq =
+          new LocalResourceRequest(privResource);
+
+      final LocalResource appResource = getAppMockedResource(r);
+      final LocalResourceRequest appReq = new LocalResourceRequest(appResource);
+
+      final LocalResource pubResource = getPublicMockedResource(r);
+      final LocalResourceRequest pubReq = new LocalResourceRequest(pubResource);
+
+      Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
+          new HashMap<LocalResourceVisibility, Collection<LocalResourceRequest>>();
+      req.put(LocalResourceVisibility.PRIVATE,
+        Collections.singletonList(privReq));
+      req.put(LocalResourceVisibility.APPLICATION,
+        Collections.singletonList(appReq));
+      req
+        .put(LocalResourceVisibility.PUBLIC, Collections.singletonList(pubReq));
+
+      Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req2 =
+          new HashMap<LocalResourceVisibility, Collection<LocalResourceRequest>>();
+      req2.put(LocalResourceVisibility.PRIVATE,
+        Collections.singletonList(privReq));
+
+      // Send Request event
+      spyService.handle(new ContainerLocalizationRequestEvent(c, req));
+      spyService.handle(new ContainerLocalizationRequestEvent(c, req2));
+      dispatcher.await();
+
+      int privRsrcCount = 0;
+      for (LocalizedResource lr : privTracker) {
+        privRsrcCount++;
+        Assert.assertEquals("Incorrect reference count", 2, lr.getRefCount());
+        Assert.assertEquals(privReq, lr.getRequest());
+      }
+      Assert.assertEquals(1, privRsrcCount);
+
+      int appRsrcCount = 0;
+      for (LocalizedResource lr : appTracker) {
+        appRsrcCount++;
+        Assert.assertEquals("Incorrect reference count", 1, lr.getRefCount());
+        Assert.assertEquals(appReq, lr.getRequest());
+      }
+      Assert.assertEquals(1, appRsrcCount);
+
+      int pubRsrcCount = 0;
+      for (LocalizedResource lr : pubTracker) {
+        pubRsrcCount++;
+        Assert.assertEquals("Incorrect reference count", 1, lr.getRefCount());
+        Assert.assertEquals(pubReq, lr.getRequest());
+      }
+      Assert.assertEquals(1, pubRsrcCount);
+
+      // setup mocks for test, a set of dirs with IOExceptions and let the rest
+      // go through
+      for (int i = 0; i < containerLocalDirs.size(); ++i) {
+        if (i == 2) {
+          Mockito.doThrow(new IOException()).when(spylfs)
+            .getFileStatus(eq(containerLocalDirs.get(i)));
+          Mockito.doThrow(new IOException()).when(spylfs)
+            .getFileStatus(eq(nmLocalContainerDirs.get(i)));
+        } else {
+          doReturn(fs).when(spylfs)
+            .getFileStatus(eq(containerLocalDirs.get(i)));
+          doReturn(nmFs).when(spylfs).getFileStatus(
+            eq(nmLocalContainerDirs.get(i)));
+        }
+      }
+
+      // Send Cleanup Event
+      spyService.handle(new ContainerLocalizationCleanupEvent(c, req));
+      verify(mockLocallilzerTracker).cleanupPrivLocalizers(
+        "container_314159265358979_0003_01_000042");
+
+      // match cleanup events with the mocks we setup earlier
+      for (int i = 0; i < containerLocalDirs.size(); ++i) {
+        if (i == 2) {
+          try {
+            verify(delService).delete(user, containerLocalDirs.get(i));
+            verify(delService).delete(null, nmLocalContainerDirs.get(i));
+            Assert.fail("deletion attempts for invalid dirs");
+          } catch (Throwable e) {
+            continue;
+          }
+        } else {
+          verify(delService).delete(user, containerLocalDirs.get(i));
+          verify(delService).delete(null, nmLocalContainerDirs.get(i));
+        }
+      }
+
+      ArgumentMatcher<ApplicationEvent> matchesAppDestroy =
+          new ArgumentMatcher<ApplicationEvent>() {
+            @Override
+            public boolean matches(Object o) {
+              ApplicationEvent evt = (ApplicationEvent) o;
+              return (evt.getType() == ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP)
+                  && appId == evt.getApplicationID();
+            }
+          };
+
+      dispatcher.await();
+
+      // setup mocks again, this time throw UnsupportedFileSystemException and
+      // IOExceptions
+      for (int i = 0; i < containerLocalDirs.size(); ++i) {
+        if (i == 3) {
+          Mockito.doThrow(new IOException()).when(spylfs)
+            .getFileStatus(eq(appLocalDirs.get(i)));
+          Mockito.doThrow(new UnsupportedFileSystemException("test"))
+            .when(spylfs).getFileStatus(eq(nmLocalAppDirs.get(i)));
+        } else {
+          doReturn(fs).when(spylfs).getFileStatus(eq(appLocalDirs.get(i)));
+          doReturn(nmFs).when(spylfs).getFileStatus(eq(nmLocalAppDirs.get(i)));
+        }
+      }
+      LocalizationEvent destroyApp =
+          new ApplicationLocalizationEvent(
+            LocalizationEventType.DESTROY_APPLICATION_RESOURCES, app);
+      spyService.handle(destroyApp);
+      verify(applicationBus).handle(argThat(matchesAppDestroy));
+
+      // verify we got the right delete calls
+      for (int i = 0; i < containerLocalDirs.size(); ++i) {
+        if (i == 3) {
+          try {
+            verify(delService).delete(user, containerLocalDirs.get(i));
+            verify(delService).delete(null, nmLocalContainerDirs.get(i));
+            Assert.fail("deletion attempts for invalid dirs");
+          } catch (Throwable e) {
+            continue;
+          }
+        } else {
+          verify(delService).delete(user, appLocalDirs.get(i));
+          verify(delService).delete(null, nmLocalAppDirs.get(i));
+        }
+      }
+
+    } finally {
+      dispatcher.stop();
+      delService.stop();
+    }
+  }
 
 }

+ 102 - 45
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java

@@ -25,6 +25,7 @@ import static org.mockito.Matchers.anyMap;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -58,6 +59,8 @@ import org.junit.Assert;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.AbstractFileSystem;
+import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -105,6 +108,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerM
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.TestNonAggregatingLogHandler;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
@@ -136,12 +140,19 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     super();
     this.remoteRootLogDir.mkdir();
   }
+  
+  DrainDispatcher dispatcher;
+  EventHandler<ApplicationEvent> appEventHandler;
 
   @Override
+  @SuppressWarnings("unchecked")
   public void setup() throws IOException {
     super.setup();
     NodeId nodeId = NodeId.newInstance("0.0.0.0", 5555);
     ((NMContext)context).setNodeId(nodeId);
+    dispatcher = createDispatcher();
+    appEventHandler = mock(EventHandler.class);
+    dispatcher.register(ApplicationEventType.class, appEventHandler);
   }
 
   @Override
@@ -149,10 +160,12 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     super.tearDown();
     createContainerExecutor().deleteAsUser(user,
         new Path(remoteRootLogDir.getAbsolutePath()), new Path[] {});
+    dispatcher.await();
+    dispatcher.stop();
+    dispatcher.close();
   }
 
   @Test
-  @SuppressWarnings("unchecked")
   public void testLocalFileDeletionAfterUpload() throws Exception {
     this.delSrvc = new DeletionService(createContainerExecutor());
     delSrvc = spy(delSrvc);
@@ -161,10 +174,6 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
         this.remoteRootLogDir.getAbsolutePath());
     
-    DrainDispatcher dispatcher = createDispatcher();
-    EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
-    dispatcher.register(ApplicationEventType.class, appEventHandler);
-    
     LogAggregationService logAggregationService = spy(
         new LogAggregationService(dispatcher, this.context, this.delSrvc,
                                   super.dirsHandler));
@@ -236,16 +245,11 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
   }
 
   @Test
-  @SuppressWarnings("unchecked")
   public void testNoContainerOnNode() throws Exception {
     this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
     this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
         this.remoteRootLogDir.getAbsolutePath());
     
-    DrainDispatcher dispatcher = createDispatcher();
-    EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
-    dispatcher.register(ApplicationEventType.class, appEventHandler);
-    
     LogAggregationService logAggregationService =
         new LogAggregationService(dispatcher, this.context, this.delSrvc,
                                   super.dirsHandler);
@@ -285,6 +289,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     };
     checkEvents(appEventHandler, expectedEvents, true, "getType", "getApplicationID");
     dispatcher.stop();
+    logAggregationService.close();
   }
 
   @Test
@@ -294,6 +299,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
     this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
         this.remoteRootLogDir.getAbsolutePath());
+    
     String[] fileNames = new String[] { "stdout", "stderr", "syslog" };
     DrainDispatcher dispatcher = createDispatcher();
     EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
@@ -432,17 +438,12 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
   }
   
   @Test
-  @SuppressWarnings("unchecked")
   public void testVerifyAndCreateRemoteDirsFailure()
       throws Exception {
     this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
     this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
         this.remoteRootLogDir.getAbsolutePath());
     
-    DrainDispatcher dispatcher = createDispatcher();
-    EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
-    dispatcher.register(ApplicationEventType.class, appEventHandler);
-    
     LogAggregationService logAggregationService = spy(
         new LogAggregationService(dispatcher, this.context, this.delSrvc,
                                   super.dirsHandler));
@@ -456,8 +457,9 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     logAggregationService.start();
     
     // Now try to start an application
-    ApplicationId appId = BuilderUtils.newApplicationId(
-        System.currentTimeMillis(), (int)Math.random());
+    ApplicationId appId =
+        BuilderUtils.newApplicationId(System.currentTimeMillis(),
+          (int) (Math.random() * 1000));
     logAggregationService.handle(new LogHandlerAppStartedEvent(appId,
         this.user, null,
         ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY,
@@ -475,8 +477,9 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     Mockito.reset(logAggregationService);
     
     // Now try to start another one
-    ApplicationId appId2 = BuilderUtils.newApplicationId(
-        System.currentTimeMillis(), (int)Math.random());
+    ApplicationId appId2 =
+        BuilderUtils.newApplicationId(System.currentTimeMillis(),
+          (int) (Math.random() * 1000));
     File appLogDir =
         new File(localLogDir, ConverterUtils.toString(appId2));
     appLogDir.mkdir();
@@ -578,6 +581,8 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     aggSvc.handle(new LogHandlerAppStartedEvent(appId3, this.user, null,
         ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
     verify(spyFs, never()).mkdirs(eq(appDir3), isA(FsPermission.class));
+    aggSvc.stop();
+    aggSvc.close();
   }
 
   @Test
@@ -588,19 +593,16 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
         localLogDir.getAbsolutePath());
     this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
         this.remoteRootLogDir.getAbsolutePath());
-
-    DrainDispatcher dispatcher = createDispatcher();
-    EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
-    dispatcher.register(ApplicationEventType.class, appEventHandler);
-
+    
     LogAggregationService logAggregationService = spy(
         new LogAggregationService(dispatcher, this.context, this.delSrvc,
                                   super.dirsHandler));
     logAggregationService.init(this.conf);
     logAggregationService.start();
 
-    ApplicationId appId = BuilderUtils.newApplicationId(
-        System.currentTimeMillis(), (int)Math.random());
+    ApplicationId appId =
+        BuilderUtils.newApplicationId(System.currentTimeMillis(),
+          (int) (Math.random() * 1000));
     doThrow(new YarnRuntimeException("KABOOM!"))
       .when(logAggregationService).initAppAggregator(
           eq(appId), eq(user), any(Credentials.class),
@@ -634,26 +636,22 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
   }
 
   @Test
-  @SuppressWarnings("unchecked")
   public void testLogAggregationCreateDirsFailsWithoutKillingNM()
       throws Exception {
     
     this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
     this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
         this.remoteRootLogDir.getAbsolutePath());
-    
-    DrainDispatcher dispatcher = createDispatcher();
-    EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
-    dispatcher.register(ApplicationEventType.class, appEventHandler);
-    
+        
     LogAggregationService logAggregationService = spy(
         new LogAggregationService(dispatcher, this.context, this.delSrvc,
                                   super.dirsHandler));
     logAggregationService.init(this.conf);
     logAggregationService.start();
     
-    ApplicationId appId = BuilderUtils.newApplicationId(
-        System.currentTimeMillis(), (int)Math.random());
+    ApplicationId appId =
+        BuilderUtils.newApplicationId(System.currentTimeMillis(),
+          (int) (Math.random() * 1000));
     Exception e = new RuntimeException("KABOOM!");
     doThrow(e)
       .when(logAggregationService).createAppDir(any(String.class),
@@ -905,18 +903,13 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
   }
 
   @Test(timeout=20000)
-  @SuppressWarnings("unchecked")
   public void testStopAfterError() throws Exception {
     DeletionService delSrvc = mock(DeletionService.class);
 
     // get the AppLogAggregationImpl thread to crash
     LocalDirsHandlerService mockedDirSvc = mock(LocalDirsHandlerService.class);
     when(mockedDirSvc.getLogDirs()).thenThrow(new RuntimeException());
-
-    DrainDispatcher dispatcher = createDispatcher();
-    EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
-    dispatcher.register(ApplicationEventType.class, appEventHandler);
-
+    
     LogAggregationService logAggregationService =
         new LogAggregationService(dispatcher, this.context, delSrvc,
                                   mockedDirSvc);
@@ -930,20 +923,16 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
 
     logAggregationService.stop();
     assertEquals(0, logAggregationService.getNumAggregators());
+    logAggregationService.close();
   }
 
   @Test
-  @SuppressWarnings("unchecked")
   public void testLogAggregatorCleanup() throws Exception {
     DeletionService delSrvc = mock(DeletionService.class);
 
     // get the AppLogAggregationImpl thread to crash
     LocalDirsHandlerService mockedDirSvc = mock(LocalDirsHandlerService.class);
 
-    DrainDispatcher dispatcher = createDispatcher();
-    EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
-    dispatcher.register(ApplicationEventType.class, appEventHandler);
-
     LogAggregationService logAggregationService =
         new LogAggregationService(dispatcher, this.context, delSrvc,
                                   mockedDirSvc);
@@ -964,6 +953,8 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     }
     Assert.assertEquals("Log aggregator failed to cleanup!", 0,
         logAggregationService.getNumAggregators());
+    logAggregationService.stop();
+    logAggregationService.close();
   }
   
   @SuppressWarnings("unchecked")
@@ -1039,6 +1030,72 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     return sb.toString();
   }
 
+  /*
+   * Test to make sure we handle cases where the directories we get back from
+   * the LocalDirsHandler may have issues including the log dir not being
+   * present as well as other issues. The test uses helper functions from
+   * TestNonAggregatingLogHandler.
+   */
+  @Test
+  public void testFailedDirsLocalFileDeletionAfterUpload() throws Exception {
+
+    // setup conf and services
+    DeletionService mockDelService = mock(DeletionService.class);
+    File[] localLogDirs =
+        TestNonAggregatingLogHandler.getLocalLogDirFiles(this.getClass()
+          .getName(), 7);
+    final List<String> localLogDirPaths =
+        new ArrayList<String>(localLogDirs.length);
+    for (int i = 0; i < localLogDirs.length; i++) {
+      localLogDirPaths.add(localLogDirs[i].getAbsolutePath());
+    }
+
+    String localLogDirsString = StringUtils.join(localLogDirPaths, ",");
+
+    this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDirsString);
+    this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+      this.remoteRootLogDir.getAbsolutePath());
+    this.conf.setLong(YarnConfiguration.NM_DISK_HEALTH_CHECK_INTERVAL_MS, 500);
+
+    ApplicationId application1 = BuilderUtils.newApplicationId(1234, 1);
+    ApplicationAttemptId appAttemptId =
+        BuilderUtils.newApplicationAttemptId(application1, 1);
+
+    this.dirsHandler = new LocalDirsHandlerService();
+    LocalDirsHandlerService mockDirsHandler = mock(LocalDirsHandlerService.class);
+
+    LogAggregationService logAggregationService =
+        spy(new LogAggregationService(dispatcher, this.context, mockDelService,
+          mockDirsHandler));
+    AbstractFileSystem spylfs =
+        spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
+    FileContext lfs = FileContext.getFileContext(spylfs, conf);
+    doReturn(lfs).when(logAggregationService).getLocalFileContext(
+      isA(Configuration.class));
+
+    logAggregationService.init(this.conf);
+    logAggregationService.start();
+
+    TestNonAggregatingLogHandler.runMockedFailedDirs(logAggregationService,
+      application1, user, mockDelService, mockDirsHandler, conf, spylfs, lfs,
+      localLogDirs);
+
+    logAggregationService.stop();
+    assertEquals(0, logAggregationService.getNumAggregators());
+    verify(logAggregationService).closeFileSystems(
+      any(UserGroupInformation.class));
+
+    ApplicationEvent expectedEvents[] =
+        new ApplicationEvent[] {
+            new ApplicationEvent(appAttemptId.getApplicationId(),
+              ApplicationEventType.APPLICATION_LOG_HANDLING_INITED),
+            new ApplicationEvent(appAttemptId.getApplicationId(),
+              ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED) };
+
+    checkEvents(appEventHandler, expectedEvents, true, "getType",
+      "getApplicationID");
+  }
+
   @Test (timeout = 50000)
   @SuppressWarnings("unchecked")
   public void testLogAggregationServiceWithPatterns() throws Exception {

+ 300 - 82
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java

@@ -19,15 +19,36 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 
 import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.NotSerializableException;
+import java.io.ObjectInputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.builder.EqualsBuilder;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.AbstractFileSystem;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -45,25 +66,52 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.eve
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Mockito;
 import org.mockito.exceptions.verification.WantedButNotInvoked;
+import org.mockito.internal.matchers.VarargMatcher;
 
 public class TestNonAggregatingLogHandler {
+  
+  DeletionService mockDelService;
+  Configuration conf;
+  DrainDispatcher dispatcher;
+  EventHandler<ApplicationEvent> appEventHandler;
+  String user = "testuser";
+  ApplicationId appId;
+  ApplicationAttemptId appAttemptId;
+  ContainerId container11;
+  LocalDirsHandlerService dirsHandler;
+
+  @Before
+  @SuppressWarnings("unchecked")
+  public void setup() {
+    mockDelService = mock(DeletionService.class);
+    conf = new YarnConfiguration();
+    dispatcher = createDispatcher(conf);
+    appEventHandler = mock(EventHandler.class);
+    dispatcher.register(ApplicationEventType.class, appEventHandler);
+    appId = BuilderUtils.newApplicationId(1234, 1);
+    appAttemptId = BuilderUtils.newApplicationAttemptId(appId, 1);
+    container11 = BuilderUtils.newContainerId(appAttemptId, 1);
+    dirsHandler = new LocalDirsHandlerService();
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    dirsHandler.stop();
+    dirsHandler.close();
+    dispatcher.await();
+    dispatcher.stop();
+    dispatcher.close();
+  }  
 
   @Test
-  @SuppressWarnings("unchecked")
-  public void testLogDeletion() {
-    DeletionService delService = mock(DeletionService.class);
-    Configuration conf = new YarnConfiguration();
-    String user = "testuser";
-
-    File[] localLogDirs = new File[2];
-    localLogDirs[0] =
-        new File("target", this.getClass().getName() + "-localLogDir0")
-            .getAbsoluteFile();
-    localLogDirs[1] =
-        new File("target", this.getClass().getName() + "-localLogDir1")
-            .getAbsoluteFile();
+  public void testLogDeletion() throws IOException {
+    File[] localLogDirs = getLocalLogDirFiles(this.getClass().getName(), 2);
     String localLogDirsString =
         localLogDirs[0].getAbsolutePath() + ","
             + localLogDirs[1].getAbsolutePath();
@@ -72,72 +120,50 @@ public class TestNonAggregatingLogHandler {
     conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false);
     conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 0l);
 
-    DrainDispatcher dispatcher = createDispatcher(conf);
-    EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
-    dispatcher.register(ApplicationEventType.class, appEventHandler);
-
-    LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
     dirsHandler.init(conf);
 
-    ApplicationId appId1 = BuilderUtils.newApplicationId(1234, 1);
-    ApplicationAttemptId appAttemptId1 =
-        BuilderUtils.newApplicationAttemptId(appId1, 1);
-    ContainerId container11 = BuilderUtils.newContainerId(appAttemptId1, 1);
+    NonAggregatingLogHandler rawLogHandler =
+        new NonAggregatingLogHandler(dispatcher, mockDelService, dirsHandler);
+    NonAggregatingLogHandler logHandler = spy(rawLogHandler);
+    AbstractFileSystem spylfs =
+        spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
+    FileContext lfs = FileContext.getFileContext(spylfs, conf);
+    doReturn(lfs).when(logHandler)
+      .getLocalFileContext(isA(Configuration.class));
+    FsPermission defaultPermission =
+        FsPermission.getDirDefault().applyUMask(lfs.getUMask());
+    final FileStatus fs =
+        new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0,
+          defaultPermission, "", "",
+          new Path(localLogDirs[0].getAbsolutePath()));
+    doReturn(fs).when(spylfs).getFileStatus(isA(Path.class));
 
-    NonAggregatingLogHandler logHandler =
-        new NonAggregatingLogHandler(dispatcher, delService, dirsHandler);
     logHandler.init(conf);
     logHandler.start();
 
-    logHandler.handle(new LogHandlerAppStartedEvent(appId1, user, null,
+    logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null,
         ContainerLogsRetentionPolicy.ALL_CONTAINERS, null));
 
     logHandler.handle(new LogHandlerContainerFinishedEvent(container11, 0));
 
-    logHandler.handle(new LogHandlerAppFinishedEvent(appId1));
+    logHandler.handle(new LogHandlerAppFinishedEvent(appId));
 
     Path[] localAppLogDirs = new Path[2];
     localAppLogDirs[0] =
-        new Path(localLogDirs[0].getAbsolutePath(), appId1.toString());
+        new Path(localLogDirs[0].getAbsolutePath(), appId.toString());
     localAppLogDirs[1] =
-        new Path(localLogDirs[1].getAbsolutePath(), appId1.toString());
+        new Path(localLogDirs[1].getAbsolutePath(), appId.toString());
 
-    // 5 seconds for the delete which is a separate thread.
-    long verifyStartTime = System.currentTimeMillis();
-    WantedButNotInvoked notInvokedException = null;
-    boolean matched = false;
-    while (!matched && System.currentTimeMillis() < verifyStartTime + 5000l) {
-      try {
-        verify(delService).delete(eq(user), (Path) eq(null),
-            eq(localAppLogDirs[0]), eq(localAppLogDirs[1]));
-        matched = true;
-      } catch (WantedButNotInvoked e) {
-        notInvokedException = e;
-        try {
-          Thread.sleep(50l);
-        } catch (InterruptedException i) {
-        }
-      }
-    }
-    if (!matched) {
-      throw notInvokedException;
+    testDeletionServiceCall(mockDelService, user, 5000, localAppLogDirs);
+    logHandler.close();
+    for (int i = 0; i < localLogDirs.length; i++) {
+      FileUtils.deleteDirectory(localLogDirs[i]);
     }
   }
 
   @Test
-  @SuppressWarnings("unchecked")
-  public void testDelayedDelete() {
-    DeletionService delService = mock(DeletionService.class);
-    Configuration conf = new YarnConfiguration();
-    String user = "testuser";
-
-    File[] localLogDirs = new File[2];
-    localLogDirs[0] =
-        new File("target", this.getClass().getName() + "-localLogDir0")
-            .getAbsoluteFile();
-    localLogDirs[1] =
-        new File("target", this.getClass().getName() + "-localLogDir1")
-            .getAbsoluteFile();
+  public void testDelayedDelete() throws IOException {
+    File[] localLogDirs = getLocalLogDirFiles(this.getClass().getName(), 2);
     String localLogDirsString =
         localLogDirs[0].getAbsolutePath() + ","
             + localLogDirs[1].getAbsolutePath();
@@ -148,42 +174,36 @@ public class TestNonAggregatingLogHandler {
     conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS,
             YarnConfiguration.DEFAULT_NM_LOG_RETAIN_SECONDS);
 
-    DrainDispatcher dispatcher = createDispatcher(conf);
-    EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
-    dispatcher.register(ApplicationEventType.class, appEventHandler);
-
-    LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
     dirsHandler.init(conf);
 
-    ApplicationId appId1 = BuilderUtils.newApplicationId(1234, 1);
-    ApplicationAttemptId appAttemptId1 =
-        BuilderUtils.newApplicationAttemptId(appId1, 1);
-    ContainerId container11 = BuilderUtils.newContainerId(appAttemptId1, 1);
-
     NonAggregatingLogHandler logHandler =
-        new NonAggregatingLogHandlerWithMockExecutor(dispatcher, delService,
+        new NonAggregatingLogHandlerWithMockExecutor(dispatcher, mockDelService,
                                                      dirsHandler);
     logHandler.init(conf);
     logHandler.start();
 
-    logHandler.handle(new LogHandlerAppStartedEvent(appId1, user, null,
+    logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null,
         ContainerLogsRetentionPolicy.ALL_CONTAINERS, null));
 
     logHandler.handle(new LogHandlerContainerFinishedEvent(container11, 0));
 
-    logHandler.handle(new LogHandlerAppFinishedEvent(appId1));
+    logHandler.handle(new LogHandlerAppFinishedEvent(appId));
 
     Path[] localAppLogDirs = new Path[2];
     localAppLogDirs[0] =
-        new Path(localLogDirs[0].getAbsolutePath(), appId1.toString());
+        new Path(localLogDirs[0].getAbsolutePath(), appId.toString());
     localAppLogDirs[1] =
-        new Path(localLogDirs[1].getAbsolutePath(), appId1.toString());
+        new Path(localLogDirs[1].getAbsolutePath(), appId.toString());
 
     ScheduledThreadPoolExecutor mockSched =
         ((NonAggregatingLogHandlerWithMockExecutor) logHandler).mockSched;
 
     verify(mockSched).schedule(any(Runnable.class), eq(10800l),
         eq(TimeUnit.SECONDS));
+    logHandler.close();
+    for (int i = 0; i < localLogDirs.length; i++) {
+      FileUtils.deleteDirectory(localLogDirs[i]);
+    }
   }
   
   @Test
@@ -202,25 +222,25 @@ public class TestNonAggregatingLogHandler {
     verify(logHandler.mockSched)
         .awaitTermination(eq(10l), eq(TimeUnit.SECONDS));
     verify(logHandler.mockSched).shutdownNow();
+    logHandler.close();
+    aggregatingLogHandler.close();
   }
 
   @Test
-  public void testHandlingApplicationFinishedEvent() {
-    Configuration conf = new Configuration();
-    LocalDirsHandlerService dirsService  = new LocalDirsHandlerService();
+  public void testHandlingApplicationFinishedEvent() throws IOException {
     DeletionService delService = new DeletionService(null);
     NonAggregatingLogHandler aggregatingLogHandler =
         new NonAggregatingLogHandler(new InlineDispatcher(),
             delService,
-            dirsService);
+            dirsHandler);
 
-    dirsService.init(conf);
-    dirsService.start();
+    dirsHandler.init(conf);
+    dirsHandler.start();
     delService.init(conf);
     delService.start();
     aggregatingLogHandler.init(conf);
     aggregatingLogHandler.start();
-    ApplicationId appId = BuilderUtils.newApplicationId(1234, 1);
+    
     // It should NOT throw RejectedExecutionException
     aggregatingLogHandler.handle(new LogHandlerAppFinishedEvent(appId));
     aggregatingLogHandler.stop();
@@ -228,6 +248,7 @@ public class TestNonAggregatingLogHandler {
     // It should NOT throw RejectedExecutionException after stopping
     // handler service.
     aggregatingLogHandler.handle(new LogHandlerAppFinishedEvent(appId));
+    aggregatingLogHandler.close();
   }
 
   private class NonAggregatingLogHandlerWithMockExecutor extends
@@ -255,4 +276,201 @@ public class TestNonAggregatingLogHandler {
     dispatcher.start();
     return dispatcher;
   }
+  
+  /*
+   * Test to ensure that we handle the cleanup of directories that may not have
+   * the application log dirs we're trying to delete or may have other problems.
+   * Test creates 7 log dirs, and fails the directory check for 4 of them and
+   * then checks to ensure we tried to delete only the ones that passed the
+   * check.
+   */
+  @Test
+  public void testFailedDirLogDeletion() throws Exception {
+
+    File[] localLogDirs = getLocalLogDirFiles(this.getClass().getName(), 7);
+    final List<String> localLogDirPaths =
+        new ArrayList<String>(localLogDirs.length);
+    for (int i = 0; i < localLogDirs.length; i++) {
+      localLogDirPaths.add(localLogDirs[i].getAbsolutePath());
+    }
+
+    String localLogDirsString = StringUtils.join(localLogDirPaths, ",");
+
+    conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDirsString);
+    conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false);
+    conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 0l);
+
+    LocalDirsHandlerService mockDirsHandler = mock(LocalDirsHandlerService.class);
+
+    NonAggregatingLogHandler rawLogHandler =
+        new NonAggregatingLogHandler(dispatcher, mockDelService, mockDirsHandler);
+    NonAggregatingLogHandler logHandler = spy(rawLogHandler);
+    AbstractFileSystem spylfs =
+        spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
+    FileContext lfs = FileContext.getFileContext(spylfs, conf);
+    doReturn(lfs).when(logHandler)
+      .getLocalFileContext(isA(Configuration.class));
+    logHandler.init(conf);
+    logHandler.start();
+    runMockedFailedDirs(logHandler, appId, user, mockDelService,
+      mockDirsHandler, conf, spylfs, lfs, localLogDirs);
+    logHandler.close();
+  }
+  
+  /**
+   * Function to run a log handler with directories failing the getFileStatus
+   * call. The function accepts the log handler, setup the mocks to fail with
+   * specific exceptions and ensures the deletion service has the correct calls.
+   * 
+   * @param logHandler the logHandler implementation to test
+   * 
+   * @param appId the application id that we wish when sending events to the log
+   * handler
+   * 
+   * @param user the user name to use
+   * 
+   * @param mockDelService a mock of the DeletionService which we will verify
+   * the delete calls against
+   * 
+   * @param dirsHandler a spy or mock on the LocalDirsHandler service used to
+   * when creating the logHandler. It needs to be a spy so that we can intercept
+   * the getAllLogDirs() call.
+   * 
+   * @param conf the configuration used
+   * 
+   * @param spylfs a spy on the AbstractFileSystem object used when creating lfs
+   * 
+   * @param lfs the FileContext object to be used to mock the getFileStatus()
+   * calls
+   * 
+   * @param localLogDirs list of the log dirs to run the test against, must have
+   * at least 7 entries
+   */
+  public static void runMockedFailedDirs(LogHandler logHandler,
+      ApplicationId appId, String user, DeletionService mockDelService,
+      LocalDirsHandlerService dirsHandler, Configuration conf,
+      AbstractFileSystem spylfs, FileContext lfs, File[] localLogDirs)
+      throws Exception {
+    Map<ApplicationAccessType, String> appAcls = new HashMap<ApplicationAccessType, String>();
+    if (localLogDirs.length < 7) {
+      throw new IllegalArgumentException(
+        "Argument localLogDirs must be at least of length 7");
+    }
+    Path[] localAppLogDirPaths = new Path[localLogDirs.length];
+    for (int i = 0; i < localAppLogDirPaths.length; i++) {
+      localAppLogDirPaths[i] =
+          new Path(localLogDirs[i].getAbsolutePath(), appId.toString());
+    }
+    final List<String> localLogDirPaths =
+        new ArrayList<String>(localLogDirs.length);
+    for (int i = 0; i < localLogDirs.length; i++) {
+      localLogDirPaths.add(localLogDirs[i].getAbsolutePath());
+    }
+
+    // setup mocks
+    FsPermission defaultPermission =
+        FsPermission.getDirDefault().applyUMask(lfs.getUMask());
+    final FileStatus fs =
+        new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0,
+          defaultPermission, "", "",
+          new Path(localLogDirs[0].getAbsolutePath()));
+    doReturn(fs).when(spylfs).getFileStatus(isA(Path.class));
+    doReturn(localLogDirPaths).when(dirsHandler).getLogDirsForCleanup();
+
+    logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null,
+      ContainerLogsRetentionPolicy.ALL_CONTAINERS, appAcls));
+
+    // test case where some dirs have the log dir to delete
+    // mock some dirs throwing various exceptions
+    // verify deletion happens only on the others
+    Mockito.doThrow(new FileNotFoundException()).when(spylfs)
+      .getFileStatus(eq(localAppLogDirPaths[0]));
+    doReturn(fs).when(spylfs).getFileStatus(eq(localAppLogDirPaths[1]));
+    Mockito.doThrow(new AccessControlException()).when(spylfs)
+      .getFileStatus(eq(localAppLogDirPaths[2]));
+    doReturn(fs).when(spylfs).getFileStatus(eq(localAppLogDirPaths[3]));
+    Mockito.doThrow(new IOException()).when(spylfs)
+      .getFileStatus(eq(localAppLogDirPaths[4]));
+    Mockito.doThrow(new UnsupportedFileSystemException("test")).when(spylfs)
+      .getFileStatus(eq(localAppLogDirPaths[5]));
+    doReturn(fs).when(spylfs).getFileStatus(eq(localAppLogDirPaths[6]));
+
+    logHandler.handle(new LogHandlerAppFinishedEvent(appId));
+
+    testDeletionServiceCall(mockDelService, user, 5000, localAppLogDirPaths[1],
+      localAppLogDirPaths[3], localAppLogDirPaths[6]);
+
+    return;
+  }
+
+  static class DeletePathsMatcher extends ArgumentMatcher<Path[]> implements
+      VarargMatcher {
+    
+    // to get rid of serialization warning
+    static final long serialVersionUID = 0;
+
+    private transient Path[] matchPaths;
+
+    DeletePathsMatcher(Path... matchPaths) {
+      this.matchPaths = matchPaths;
+    }
+
+    @Override
+    public boolean matches(Object varargs) {
+      return new EqualsBuilder().append(matchPaths, varargs).isEquals();
+    }
+
+    // function to get rid of FindBugs warning
+    private void readObject(ObjectInputStream os) throws NotSerializableException {
+      throw new NotSerializableException(this.getClass().getName());
+    }
+  }
+
+  /**
+   * Function to verify that the DeletionService object received the right
+   * requests.
+   * 
+   * @param delService the DeletionService mock which we verify against
+   * 
+   * @param user the user name to use when verifying the deletion
+   * 
+   * @param timeout amount in milliseconds to wait before we decide the calls
+   * didn't come through
+   * 
+   * @param matchPaths the paths to match in the delete calls
+   * 
+   * @throws WantedButNotInvoked if the calls could not be verified
+   */
+  static void testDeletionServiceCall(DeletionService delService, String user,
+      long timeout, Path... matchPaths) {
+
+    long verifyStartTime = System.currentTimeMillis();
+    WantedButNotInvoked notInvokedException = null;
+    boolean matched = false;
+    while (!matched && System.currentTimeMillis() < verifyStartTime + timeout) {
+      try {
+        verify(delService).delete(eq(user), (Path) eq(null),
+          Mockito.argThat(new DeletePathsMatcher(matchPaths)));
+        matched = true;
+      } catch (WantedButNotInvoked e) {
+        notInvokedException = e;
+        try {
+          Thread.sleep(50l);
+        } catch (InterruptedException i) {
+        }
+      }
+    }
+    if (!matched) {
+      throw notInvokedException;
+    }
+    return;
+  }
+
+  public static File[] getLocalLogDirFiles(String name, int number) {
+    File[] dirs = new File[number];
+    for (int i = 0; i < dirs.length; i++) {
+      dirs[i] = new File("target", name + "-localLogDir" + i).getAbsoluteFile();
+    }
+    return dirs;
+  }
 }