Просмотр исходного кода

YARN-5137. Make DiskChecker pluggable in NodeManager. (Yufei Gu via rchiang)

(cherry picked from commit dbe9e70cc084220ea1f68da850cdb092281b5e96)
Ray Chiang 9 лет назад
Родитель
Commit
372f6f8456

+ 4 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -897,6 +897,10 @@ public class YarnConfiguration extends Configuration {
       NM_PREFIX + "resourcemanager.minimum.version";
   public static final String DEFAULT_NM_RESOURCEMANAGER_MINIMUM_VERSION = "NONE";
 
+  /** Name of the disk validator used by the NodeManager. */
+  public static final String DISK_VALIDATOR = NM_PREFIX + "disk-validator";
+  public static final String DEFAULT_DISK_VALIDATOR = "basic";
+
   /**
    * Maximum size of container's diagnostics to keep for relaunching container
    * case.

+ 8 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

@@ -2892,6 +2892,14 @@
 
   <property>
     <description>
+      The name of the disk validator used by the NodeManager.
+    </description>
+    <name>yarn.nodemanager.disk-validator</name>
+    <value>basic</value>
+  </property>
+
+  <property>
+  <description>
       Enable the CSRF filter for the timeline service web app
     </description>
     <name>yarn.timeline-service.webapp.rest-csrf.enabled</name>

+ 19 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java

@@ -42,7 +42,11 @@ import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.util.DiskChecker;
+import org.apache.hadoop.util.DiskValidator;
+import org.apache.hadoop.util.DiskValidatorFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -52,6 +56,8 @@ import com.google.common.annotations.VisibleForTesting;
 public class DirectoryCollection {
   private static final Log LOG = LogFactory.getLog(DirectoryCollection.class);
 
+  private final Configuration conf;
+  private final DiskValidator diskValidator;
   /**
    * The enum defines disk failure type.
    */
@@ -172,6 +178,16 @@ public class DirectoryCollection {
       float utilizationPercentageCutOffHigh,
       float utilizationPercentageCutOffLow,
       long utilizationSpaceCutOff) {
+    conf = new YarnConfiguration();
+    try {
+      diskValidator = DiskValidatorFactory.getInstance(
+          conf.get(YarnConfiguration.DISK_VALIDATOR));
+      LOG.info("Disk Validator: " + YarnConfiguration.DISK_VALIDATOR +
+          " is loaded.");
+    } catch (Exception e) {
+      throw new YarnRuntimeException(e);
+    }
+
     localDirs = new CopyOnWriteArrayList<>(dirs);
     errorDirs = new CopyOnWriteArrayList<>();
     fullDirs = new CopyOnWriteArrayList<>();
@@ -395,7 +411,7 @@ public class DirectoryCollection {
       String msg;
       try {
         File testDir = new File(dir);
-        DiskChecker.checkDir(testDir);
+        diskValidator.checkStatus(testDir);
         float diskUtilizationPercentageCutoff = goodDirs.contains(dir) ?
             diskUtilizationPercentageCutoffHigh : diskUtilizationPercentageCutoffLow;
         if (isDiskUsageOverPercentageLimit(testDir,
@@ -445,7 +461,7 @@ public class DirectoryCollection {
       i++;
     }
     try {
-      DiskChecker.checkDir(target);
+      diskValidator.checkStatus(target);
     } finally {
       FileUtils.deleteQuietly(target);
     }

+ 9 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java

@@ -51,7 +51,8 @@ import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.util.DiskChecker;
+import org.apache.hadoop.util.DiskValidator;
+import org.apache.hadoop.util.DiskValidatorFactory;
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
 import org.apache.hadoop.yarn.api.records.LocalResource;
@@ -69,7 +70,6 @@ import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHe
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.ResourceStatusType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenIdentifier;
-import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.FSDownload;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -99,6 +99,7 @@ public class ContainerLocalizer {
   private final RecordFactory recordFactory;
   private final Map<LocalResource,Future<Path>> pendingResources;
   private final String appCacheDirContextName;
+  private final DiskValidator diskValidator;
 
   public ContainerLocalizer(FileContext lfs, String user, String appId,
       String localizerId, List<Path> localDirs,
@@ -115,7 +116,11 @@ public class ContainerLocalizer {
     this.localDirs = localDirs;
     this.localizerId = localizerId;
     this.recordFactory = recordFactory;
-    this.conf = new Configuration();
+    this.conf = new YarnConfiguration();
+    this.diskValidator = DiskValidatorFactory.getInstance(
+        conf.get(YarnConfiguration.DISK_VALIDATOR));
+    LOG.info("Disk Validator: " + YarnConfiguration.DISK_VALIDATOR +
+        " is loaded.");
     this.appCacheDirContextName = String.format(APPCACHE_CTXT_FMT, appId);
     this.pendingResources = new HashMap<LocalResource,Future<Path>>();
   }
@@ -199,7 +204,7 @@ public class ContainerLocalizer {
 
   Callable<Path> download(Path path, LocalResource rsrc,
       UserGroupInformation ugi) throws IOException {
-    DiskChecker.checkDir(new File(path.toUri().getRawPath()));
+    diskValidator.checkStatus(new File(path.toUri().getRawPath()));
     return new FSDownload(lfs, ugi, conf, path, rsrc);
   }
 

+ 14 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java

@@ -72,6 +72,8 @@ import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.util.DiskChecker;
+import org.apache.hadoop.util.DiskValidator;
+import org.apache.hadoop.util.DiskValidatorFactory;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor;
@@ -168,6 +170,7 @@ public class ResourceLocalizationService extends CompositeService
   private DirsChangeListener localDirsChangeListener;
   private DirsChangeListener logDirsChangeListener;
   private Context nmContext;
+  private DiskValidator diskValidator;
 
   /**
    * Map of LocalResourceTrackers keyed by username, for private
@@ -247,6 +250,10 @@ public class ResourceLocalizationService extends CompositeService
         "Failed to initialize LocalizationService", e);
     }
 
+    diskValidator = DiskValidatorFactory.getInstance(
+        conf.get(YarnConfiguration.DISK_VALIDATOR));
+    LOG.info("Disk Validator: " + YarnConfiguration.DISK_VALIDATOR +
+        " is loaded.");
     cacheTargetSize =
       conf.getLong(YarnConfiguration.NM_LOCALIZER_CACHE_TARGET_SIZE_MB, YarnConfiguration.DEFAULT_NM_LOCALIZER_CACHE_TARGET_SIZE_MB) << 20;
     cacheCleanupPeriod =
@@ -835,7 +842,13 @@ public class ResourceLocalizationService extends CompositeService
                 publicRsrc.getPathForLocalization(key, publicRootPath,
                     delService);
             if (!publicDirDestPath.getParent().equals(publicRootPath)) {
-              DiskChecker.checkDir(new File(publicDirDestPath.toUri().getPath()));
+              if (diskValidator != null) {
+                diskValidator.checkStatus(
+                    new File(publicDirDestPath.toUri().getPath()));
+              } else {
+                throw new DiskChecker.DiskErrorException(
+                    "Disk Validator is null!");
+              }
             }
 
             // explicitly synchronize pending here to avoid future task