|
@@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.server.nodemanager;
|
|
|
import java.io.File;
|
|
|
import java.io.FileNotFoundException;
|
|
|
import java.io.IOException;
|
|
|
+import java.io.UncheckedIOException;
|
|
|
+import java.nio.file.Files;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Arrays;
|
|
|
import java.util.Collections;
|
|
@@ -28,22 +30,27 @@ import java.util.HashMap;
|
|
|
import java.util.HashSet;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
+import java.util.Objects;
|
|
|
import java.util.Set;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|
|
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
|
|
|
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
|
|
|
+import java.util.stream.Collectors;
|
|
|
+import java.util.stream.Stream;
|
|
|
+
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
import org.apache.hadoop.classification.InterfaceStability;
|
|
|
+import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
|
|
import org.apache.hadoop.fs.FileContext;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
+import org.apache.hadoop.util.DiskChecker;
|
|
|
import org.apache.hadoop.util.DiskValidator;
|
|
|
import org.apache.hadoop.util.DiskValidatorFactory;
|
|
|
-import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|
|
|
|
@@ -62,6 +69,7 @@ public class DirectoryCollection {
|
|
|
|
|
|
private boolean diskUtilizationThresholdEnabled;
|
|
|
private boolean diskFreeSpaceThresholdEnabled;
|
|
|
+ private boolean subAccessibilityValidationEnabled;
|
|
|
/**
|
|
|
* The enum defines disk failure type.
|
|
|
*/
|
|
@@ -242,16 +250,15 @@ public class DirectoryCollection {
|
|
|
throw new YarnRuntimeException(e);
|
|
|
}
|
|
|
|
|
|
- diskUtilizationThresholdEnabled = conf.
|
|
|
- getBoolean(YarnConfiguration.
|
|
|
- NM_DISK_UTILIZATION_THRESHOLD_ENABLED,
|
|
|
- YarnConfiguration.
|
|
|
- DEFAULT_NM_DISK_UTILIZATION_THRESHOLD_ENABLED);
|
|
|
- diskFreeSpaceThresholdEnabled = conf.
|
|
|
- getBoolean(YarnConfiguration.
|
|
|
- NM_DISK_FREE_SPACE_THRESHOLD_ENABLED,
|
|
|
- YarnConfiguration.
|
|
|
- DEFAULT_NM_DISK_FREE_SPACE_THRESHOLD_ENABLED);
|
|
|
+ diskUtilizationThresholdEnabled = conf.getBoolean(
|
|
|
+ YarnConfiguration.NM_DISK_UTILIZATION_THRESHOLD_ENABLED,
|
|
|
+ YarnConfiguration.DEFAULT_NM_DISK_UTILIZATION_THRESHOLD_ENABLED);
|
|
|
+ diskFreeSpaceThresholdEnabled = conf.getBoolean(
|
|
|
+ YarnConfiguration.NM_DISK_FREE_SPACE_THRESHOLD_ENABLED,
|
|
|
+ YarnConfiguration.DEFAULT_NM_DISK_FREE_SPACE_THRESHOLD_ENABLED);
|
|
|
+ subAccessibilityValidationEnabled = conf.getBoolean(
|
|
|
+ YarnConfiguration.NM_WORKING_DIR_CONTENT_ACCESSIBILITY_VALIDATION_ENABLED,
|
|
|
+ YarnConfiguration.DEFAULT_NM_WORKING_DIR_CONTENT_ACCESSIBILITY_VALIDATION_ENABLED);
|
|
|
|
|
|
localDirs = new ArrayList<>(Arrays.asList(dirs));
|
|
|
errorDirs = new ArrayList<>();
|
|
@@ -448,8 +455,7 @@ public class DirectoryCollection {
|
|
|
|
|
|
// move testDirs out of any lock as it could wait for very long time in
|
|
|
// case of busy IO
|
|
|
- Map<String, DiskErrorInformation> dirsFailedCheck = testDirs(allLocalDirs,
|
|
|
- preCheckGoodDirs);
|
|
|
+ Map<String, DiskErrorInformation> dirsFailedCheck = testDirs(allLocalDirs, preCheckGoodDirs);
|
|
|
|
|
|
this.writeLock.lock();
|
|
|
try {
|
|
@@ -521,60 +527,89 @@ public class DirectoryCollection {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- Map<String, DiskErrorInformation> testDirs(List<String> dirs,
|
|
|
- Set<String> goodDirs) {
|
|
|
- HashMap<String, DiskErrorInformation> ret =
|
|
|
- new HashMap<String, DiskErrorInformation>();
|
|
|
- for (final String dir : dirs) {
|
|
|
- String msg;
|
|
|
- try {
|
|
|
- File testDir = new File(dir);
|
|
|
- diskValidator.checkStatus(testDir);
|
|
|
- float diskUtilizationPercentageCutoff = goodDirs.contains(dir) ?
|
|
|
- diskUtilizationPercentageCutoffHigh : diskUtilizationPercentageCutoffLow;
|
|
|
- long diskFreeSpaceCutoff = goodDirs.contains(dir) ?
|
|
|
- diskFreeSpaceCutoffLow : diskFreeSpaceCutoffHigh;
|
|
|
-
|
|
|
- if (diskUtilizationThresholdEnabled
|
|
|
- && isDiskUsageOverPercentageLimit(testDir,
|
|
|
- diskUtilizationPercentageCutoff)) {
|
|
|
- msg =
|
|
|
- "used space above threshold of "
|
|
|
- + diskUtilizationPercentageCutoff
|
|
|
- + "%";
|
|
|
- ret.put(dir,
|
|
|
- new DiskErrorInformation(DiskErrorCause.DISK_FULL, msg));
|
|
|
- continue;
|
|
|
- } else if (diskFreeSpaceThresholdEnabled
|
|
|
- && isDiskFreeSpaceUnderLimit(testDir, diskFreeSpaceCutoff)) {
|
|
|
- msg =
|
|
|
- "free space below limit of " + diskFreeSpaceCutoff
|
|
|
- + "MB";
|
|
|
- ret.put(dir,
|
|
|
- new DiskErrorInformation(DiskErrorCause.DISK_FULL, msg));
|
|
|
- continue;
|
|
|
- }
|
|
|
- } catch (IOException ie) {
|
|
|
- ret.put(dir,
|
|
|
- new DiskErrorInformation(DiskErrorCause.OTHER, ie.getMessage()));
|
|
|
- }
|
|
|
+ Map<String, DiskErrorInformation> testDirs(List<String> dirs, Set<String> goodDirs) {
|
|
|
+ final Map<String, DiskErrorInformation> ret = new HashMap<>(0);
|
|
|
+ for (String dir : dirs) {
|
|
|
+ LOG.debug("Start testing dir accessibility: {}", dir);
|
|
|
+ File testDir = new File(dir);
|
|
|
+ boolean goodDir = goodDirs.contains(dir);
|
|
|
+ Stream.of(
|
|
|
+ validateDisk(testDir),
|
|
|
+ validateUsageOverPercentageLimit(testDir, goodDir),
|
|
|
+ validateDiskFreeSpaceUnderLimit(testDir, goodDir),
|
|
|
+ validateSubsAccessibility(testDir)
|
|
|
+ )
|
|
|
+ .filter(Objects::nonNull)
|
|
|
+ .findFirst()
|
|
|
+ .ifPresent(diskErrorInformation -> ret.put(dir, diskErrorInformation));
|
|
|
}
|
|
|
return ret;
|
|
|
}
|
|
|
|
|
|
- private boolean isDiskUsageOverPercentageLimit(File dir,
|
|
|
- float diskUtilizationPercentageCutoff) {
|
|
|
- float freePercentage =
|
|
|
- 100 * (dir.getUsableSpace() / (float) dir.getTotalSpace());
|
|
|
+ private DiskErrorInformation validateDisk(File dir) {
|
|
|
+ try {
|
|
|
+ diskValidator.checkStatus(dir);
|
|
|
+ LOG.debug("Dir {} pass throw the disk validation", dir);
|
|
|
+ return null;
|
|
|
+ } catch (IOException | UncheckedIOException | SecurityException e) {
|
|
|
+ return new DiskErrorInformation(DiskErrorCause.OTHER, e.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private DiskErrorInformation validateUsageOverPercentageLimit(File dir, boolean isGoodDir) {
|
|
|
+ if (!diskUtilizationThresholdEnabled) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ float diskUtilizationPercentageCutoff = isGoodDir
|
|
|
+ ? diskUtilizationPercentageCutoffHigh
|
|
|
+ : diskUtilizationPercentageCutoffLow;
|
|
|
+ float freePercentage = 100 * (dir.getUsableSpace() / (float) dir.getTotalSpace());
|
|
|
float usedPercentage = 100.0F - freePercentage;
|
|
|
- return (usedPercentage > diskUtilizationPercentageCutoff
|
|
|
- || usedPercentage >= 100.0F);
|
|
|
+ if (usedPercentage > diskUtilizationPercentageCutoff || usedPercentage >= 100.0F) {
|
|
|
+ return new DiskErrorInformation(DiskErrorCause.DISK_FULL,
|
|
|
+ "used space above threshold of " + diskUtilizationPercentageCutoff + "%");
|
|
|
+ } else {
|
|
|
+ LOG.debug("Dir {} pass throw the usage over percentage validation", dir);
|
|
|
+ return null;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- private boolean isDiskFreeSpaceUnderLimit(File dir,
|
|
|
- long freeSpaceCutoff) {
|
|
|
+ private DiskErrorInformation validateDiskFreeSpaceUnderLimit(File dir, boolean isGoodDir) {
|
|
|
+ if (!diskFreeSpaceThresholdEnabled) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ long freeSpaceCutoff = isGoodDir ? diskFreeSpaceCutoffLow : diskFreeSpaceCutoffHigh;
|
|
|
long freeSpace = dir.getUsableSpace() / (1024 * 1024);
|
|
|
- return freeSpace < freeSpaceCutoff;
|
|
|
+ if (freeSpace < freeSpaceCutoff) {
|
|
|
+ return new DiskErrorInformation(DiskErrorCause.DISK_FULL,
|
|
|
+ "free space below limit of " + freeSpaceCutoff + "MB");
|
|
|
+ } else {
|
|
|
+ LOG.debug("Dir {} pass throw the free space validation", dir);
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private DiskErrorInformation validateSubsAccessibility(File dir) {
|
|
|
+ if (!subAccessibilityValidationEnabled) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ try (Stream<java.nio.file.Path> walk = Files.walk(dir.toPath())) {
|
|
|
+ List<File> subs = walk
|
|
|
+ .map(java.nio.file.Path::toFile)
|
|
|
+ .collect(Collectors.toList());
|
|
|
+ for (File sub : subs) {
|
|
|
+ if (sub.isDirectory()) {
|
|
|
+ DiskChecker.checkDir(sub);
|
|
|
+ } else if (!Files.isReadable(sub.toPath())) {
|
|
|
+ return new DiskErrorInformation(DiskErrorCause.OTHER, "Can not read " + sub);
|
|
|
+ } else {
|
|
|
+ LOG.debug("{} under {} is accessible", sub, dir);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (IOException | UncheckedIOException | SecurityException e) {
|
|
|
+ return new DiskErrorInformation(DiskErrorCause.OTHER, e.getMessage());
|
|
|
+ }
|
|
|
+ return null;
|
|
|
}
|
|
|
|
|
|
private void createDir(FileContext localFs, Path dir, FsPermission perm)
|