Просмотр исходного кода

YARN-9265. FPGA plugin fails to recognize Intel Processing Accelerator Card. Contributed by Peter Bacsko.

Sunil G 6 лет назад
Родитель
Сommit
de15a66d78
12 измененных файлов с 699 добавлено и 87 удалено
  1. 9 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
  2. 27 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
  3. 107 43
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/FpgaDiscoverer.java
  4. 6 7
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/FpgaNodeResourceUpdateHandler.java
  5. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/IntelFpgaOpenclPlugin.java
  6. 52 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/discovery/AoclOutputBasedDiscoveryStrategy.java
  7. 87 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/discovery/DeviceSpecParser.java
  8. 32 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/discovery/FPGADiscoveryStrategy.java
  9. 66 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/discovery/ScriptBasedFPGADiscoveryStrategy.java
  10. 54 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/discovery/SettingsBasedFPGADiscoveryStrategy.java
  11. 19 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/discovery/package-info.java
  12. 239 37
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/TestFpgaDiscoverer.java

+ 9 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -1715,6 +1715,15 @@ public class YarnConfiguration extends Configuration {
   public static final String DEFAULT_NM_FPGA_VENDOR_PLUGIN =
       "org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.IntelFpgaOpenclPlugin";
 
+  @Private
+  public static final String NM_FPGA_DEVICE_DISCOVERY_SCRIPT =
+      NM_FPGA_RESOURCE_PREFIX + "device-discovery-script";
+
+  @Private
+  public static final String NM_FPGA_AVAILABLE_DEVICES =
+      NM_FPGA_RESOURCE_PREFIX + "available-devices";
+
+
   public static final String NM_NETWORK_TAG_PREFIX = NM_PREFIX
       + "network-tagging";
 

+ 27 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

@@ -3960,6 +3960,33 @@
     <value>auto</value>
   </property>
 
+  <property>
+    <description>
+      Absolute path to a script or executable that returns the available FPGA cards.
+      The returned string must be a single line and follow the format:
+      "deviceA/N:M,deviceB/X:Y". Example: "acl0/243:0,acl1/243:1". The numbers after
+      the "/" character are the device major and minor numbers.
+
+      When the script is enabled, auto-discovery is disabled and the "aocl" command
+      is not invoked to verify the available cards.
+    </description>
+    <name>yarn.nodemanager.resource-plugins.fpga.device-discovery-script</name>
+    <value></value>
+  </property>
+
+  <property>
+    <description>
+      List of available FPGA devices on the given node.
+      The value must follow the format: "deviceA/N:M,deviceB/X:Y".
+      Example: "acl0/243:0,acl1/243:1". The numbers after
+      the "/" character are the device major and minor numbers.
+
+      When this property is used, both auto-discovery and external script are ignored.
+    </description>
+    <name>yarn.nodemanager.resource-plugins.fpga.available-devices</name>
+    <value></value>
+  </property>
+
   <property>
     <description>The http address of the timeline reader web application.</description>
     <name>yarn.timeline-service.reader.webapp.address</name>

+ 107 - 43
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/FpgaDiscoverer.java

@@ -19,21 +19,34 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga;
 
-import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga.FpgaResourceAllocator;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.discovery.AoclOutputBasedDiscoveryStrategy;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.discovery.FPGADiscoveryStrategy;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.discovery.ScriptBasedFPGADiscoveryStrategy;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.discovery.SettingsBasedFPGADiscoveryStrategy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Iterator;
-import java.util.List;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
 
 public class FpgaDiscoverer {
-
-  public static final Logger LOG = LoggerFactory.getLogger(
+  private static final Logger LOG = LoggerFactory.getLogger(
       FpgaDiscoverer.class);
 
   private static FpgaDiscoverer instance;
@@ -44,8 +57,10 @@ public class FpgaDiscoverer {
 
   private List<FpgaResourceAllocator.FpgaDevice> currentFpgaInfo = null;
 
+  private Function<String, Optional<String>> scriptRunner = this::runScript;
+
   // shell command timeout
-  private static final int MAX_EXEC_TIMEOUT_MS = 10 * 1000;
+  public static final int MAX_EXEC_TIMEOUT_MS = 10 * 1000;
 
   static {
     instance = new FpgaDiscoverer();
@@ -56,31 +71,41 @@ public class FpgaDiscoverer {
   }
 
   @VisibleForTesting
-  public synchronized static FpgaDiscoverer setInstance(FpgaDiscoverer newInstance) {
+  void setScriptRunner(Function<String, Optional<String>> scriptRunner) {
+    this.scriptRunner = scriptRunner;
+  }
+
+  @VisibleForTesting
+  static void reset() {
+    instance = new FpgaDiscoverer();
+  }
+
+  @VisibleForTesting
+  public static FpgaDiscoverer setInstance(FpgaDiscoverer newInstance) {
     instance = newInstance;
     return instance;
   }
 
   @VisibleForTesting
-  public synchronized void setConf(Configuration conf) {
-    this.conf = conf;
+  public void setConf(Configuration configuration) {
+    this.conf = configuration;
   }
 
   public List<FpgaResourceAllocator.FpgaDevice> getCurrentFpgaInfo() {
     return currentFpgaInfo;
   }
 
-  public synchronized void setResourceHanderPlugin(AbstractFpgaVendorPlugin plugin) {
-    this.plugin = plugin;
+  public void setResourceHanderPlugin(AbstractFpgaVendorPlugin vendorPlugin) {
+    this.plugin = vendorPlugin;
   }
 
-  public synchronized boolean diagnose() {
+  public boolean diagnose() {
     return this.plugin.diagnose(MAX_EXEC_TIMEOUT_MS);
   }
 
-  public synchronized void initialize(Configuration conf) throws YarnException {
-    this.conf = conf;
-    this.plugin.initPlugin(conf);
+  public void initialize(Configuration config) throws YarnException {
+    this.conf = config;
+    this.plugin.initPlugin(config);
     // Try to diagnose FPGA
     LOG.info("Trying to diagnose FPGA information ...");
     if (!diagnose()) {
@@ -91,40 +116,45 @@ public class FpgaDiscoverer {
   /**
    * get available devices minor numbers from toolchain or static configuration
    * */
-  public synchronized List<FpgaResourceAllocator.FpgaDevice> discover() throws ResourceHandlerException {
+  public List<FpgaResourceAllocator.FpgaDevice> discover()
+      throws ResourceHandlerException {
     List<FpgaResourceAllocator.FpgaDevice> list;
     String allowed = this.conf.get(YarnConfiguration.NM_FPGA_ALLOWED_DEVICES);
-    // whatever static or auto discover, we always needs
-    // the vendor plugin to discover. For instance, IntelFpgaOpenclPlugin need to
-    // setup a mapping of <major:minor> to <aliasDevName>
-    list = this.plugin.discover(MAX_EXEC_TIMEOUT_MS);
-    if (0 == list.size()) {
-      throw new ResourceHandlerException("No FPGA devices detected!");
+
+    String availableDevices = conf.get(
+        YarnConfiguration.NM_FPGA_AVAILABLE_DEVICES);
+    String discoveryScript = conf.get(
+        YarnConfiguration.NM_FPGA_DEVICE_DISCOVERY_SCRIPT);
+
+    FPGADiscoveryStrategy discoveryStrategy;
+    if (availableDevices != null) {
+      discoveryStrategy =
+          new SettingsBasedFPGADiscoveryStrategy(
+              plugin.getFpgaType(), availableDevices);
+    } else if (discoveryScript != null) {
+      discoveryStrategy =
+          new ScriptBasedFPGADiscoveryStrategy(
+              plugin.getFpgaType(), scriptRunner, discoveryScript);
+    } else {
+      discoveryStrategy = new AoclOutputBasedDiscoveryStrategy(plugin);
     }
-    currentFpgaInfo = list;
-    if (allowed.equalsIgnoreCase(
+
+    list = discoveryStrategy.discover();
+
+    if (allowed == null || allowed.equalsIgnoreCase(
         YarnConfiguration.AUTOMATICALLY_DISCOVER_GPU_DEVICES)) {
-        return list;
+      return list;
     } else if (allowed.matches("(\\d,)*\\d")){
-      String[] minors = allowed.split(",");
-      Iterator<FpgaResourceAllocator.FpgaDevice> iterator = list.iterator();
-      // remove the non-configured minor numbers
-      FpgaResourceAllocator.FpgaDevice t;
-      while (iterator.hasNext()) {
-        boolean valid = false;
-        t = iterator.next();
-        for (String minorNumber : minors) {
-          if (t.getMinor().toString().equals(minorNumber)) {
-            valid = true;
-            break;
-          }
-        }
-        if (!valid) {
-          iterator.remove();
-        }
-      }
+      Set<String> minors = Sets.newHashSet(allowed.split(","));
+
+      // Replace list with a filtered one
+      list = list
+        .stream()
+        .filter(dev -> minors.contains(dev.getMinor().toString()))
+        .collect(Collectors.toList());
+
       // if the count of user configured is still larger than actual
-      if (list.size() != minors.length) {
+      if (list.size() != minors.size()) {
         LOG.warn("We continue although there're mistakes in user's configuration " +
             YarnConfiguration.NM_FPGA_ALLOWED_DEVICES +
             "user configured:" + allowed + ", while the real:" + list.toString());
@@ -133,7 +163,41 @@ public class FpgaDiscoverer {
       throw new ResourceHandlerException("Invalid value configured for " +
           YarnConfiguration.NM_FPGA_ALLOWED_DEVICES + ":\"" + allowed + "\"");
     }
+
+    currentFpgaInfo = ImmutableList.copyOf(list);
+
     return list;
   }
 
+  /**
+   * Runs the executable located at the given path and hands back whatever
+   * it wrote to its output.
+   *
+   * The path is rejected (an error is logged and an empty Optional is
+   * returned) when it is null or blank, when no file exists at that
+   * location, or when the file is not executable. The same happens when
+   * launching the command raises an IOException.
+   *
+   * @param path location of the script to execute
+   * @return the captured output of the script, or an empty Optional if the
+   *         script could not be run
+   */
+  private Optional<String> runScript(String path) {
+    final boolean pathMissing = (path == null) || path.trim().isEmpty();
+    if (pathMissing) {
+      LOG.error("Undefined script");
+      return Optional.empty();
+    }
+
+    final File script = new File(path);
+    if (!script.exists()) {
+      LOG.error("Script does not exist");
+      return Optional.empty();
+    } else if (!FileUtil.canExecute(script)) {
+      LOG.error("Script is not executable");
+      return Optional.empty();
+    }
+
+    // No working directory and no extra environment are supplied; the last
+    // argument is the shell command timeout.
+    final String[] command = new String[] {path};
+    final ShellCommandExecutor executor =
+        new ShellCommandExecutor(command, null, null, MAX_EXEC_TIMEOUT_MS);
+    try {
+      executor.execute();
+      return Optional.of(executor.getOutput());
+    } catch (IOException e) {
+      LOG.error("Cannot execute script", e);
+      return Optional.empty();
+    }
+  }
 }

+ 6 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/FpgaNodeResourceUpdateHandler.java

@@ -20,6 +20,12 @@
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga;
 
 
+import static org.apache.hadoop.yarn.api.records.ResourceInformation.FPGA_URI;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceInformation;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -30,13 +36,6 @@ import org.apache.hadoop.yarn.util.resource.ResourceUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.hadoop.yarn.api.records.ResourceInformation.FPGA_URI;
-
 public class FpgaNodeResourceUpdateHandler extends NodeResourceUpdaterPlugin {
   private static final Logger LOG = LoggerFactory.getLogger(
       FpgaNodeResourceUpdateHandler.class);

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/IntelFpgaOpenclPlugin.java

@@ -100,6 +100,7 @@ public class IntelFpgaOpenclPlugin implements AbstractFpgaVendorPlugin {
     if (this.initialized) {
       return true;
     }
+
     // Find the proper toolchain, mainly aocl
     String pluginDefaultBinaryName = getDefaultBinaryName();
     String pathToExecutable = conf.get(YarnConfiguration.NM_FPGA_PATH_TO_EXEC,

+ 52 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/discovery/AoclOutputBasedDiscoveryStrategy.java

@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.discovery;
+
+import java.util.List;
+
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga.FpgaResourceAllocator.FpgaDevice;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.AbstractFpgaVendorPlugin;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.FpgaDiscoverer;
+
+/**
+ * Discovery strategy that relies on the FPGA vendor plugin, which invokes
+ * the "aocl" SDK command, to obtain the list of available FPGA cards.
+ */
+public class AoclOutputBasedDiscoveryStrategy
+    implements FPGADiscoveryStrategy {
+
+  private final AbstractFpgaVendorPlugin plugin;
+
+  /**
+   * @param fpgaPlugin vendor plugin whose discover() call supplies the
+   *                   devices
+   */
+  public AoclOutputBasedDiscoveryStrategy(AbstractFpgaVendorPlugin fpgaPlugin) {
+    this.plugin = fpgaPlugin;
+  }
+
+  /**
+   * Asks the vendor plugin for the devices it can see.
+   *
+   * @return the non-empty list reported by the plugin
+   * @throws ResourceHandlerException if the plugin reports no device
+   */
+  @Override
+  public List<FpgaDevice> discover() throws ResourceHandlerException {
+    final List<FpgaDevice> detected =
+        plugin.discover(FpgaDiscoverer.MAX_EXEC_TIMEOUT_MS);
+    if (!detected.isEmpty()) {
+      return detected;
+    }
+
+    throw new ResourceHandlerException("No FPGA devices detected!");
+  }
+}

+ 87 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/discovery/DeviceSpecParser.java

@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.discovery;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga.FpgaResourceAllocator.FpgaDevice;
+
+/**
+ * Parses a string which specifies FPGA devices. Multiple devices should be
+ * separated by a comma. A device specification should contain the
+ * symbolic name of the device, major and minor device numbers.
+ *
+ * Example: "acl0/243:0,acl1/243:1".
+ *
+ * Whitespace surrounding the whole string or an individual device
+ * specification (for example a trailing line terminator) is ignored.
+ */
+public final class DeviceSpecParser {
+  // Group 1: symbolic device name, which must end with at least one digit;
+  // group 3: major number; group 5: minor number.
+  // The numeric suffix is expressed as "\\d+" on purpose: a character class
+  // such as "[0-31]" would only accept the single digits 0-3 and would
+  // reject names like "acl4".
+  private static final String DEVICE_SPEC_REGEX =
+      "(\\w+\\d+)(\\/)(\\d+)(\\:)(\\d+)";
+
+  private static final Pattern DEVICE_PATTERN =
+      Pattern.compile(DEVICE_SPEC_REGEX);
+
+  private DeviceSpecParser() {
+    // no instances
+  }
+
+  /**
+   * Converts a comma separated device specification string into a list of
+   * devices.
+   *
+   * @param type FPGA type passed to every created device
+   * @param devices the specification string, e.g. "acl0/243:0,acl1/243:1"
+   * @return the parsed devices in the order they were listed; an empty list
+   *         if the string is null or blank
+   * @throws ResourceHandlerException if a device specification does not
+   *         follow the expected format or its major/minor number does not
+   *         fit into an int
+   */
+  static List<FpgaDevice> getDevicesFromString(String type, String devices)
+      throws ResourceHandlerException {
+    if (devices == null) {
+      return Collections.emptyList();
+    }
+
+    // Strip the surrounding whitespace first so that a trailing newline
+    // does not end up inside the last device specification.
+    String trimmedDevices = devices.trim();
+    if (trimmedDevices.isEmpty()) {
+      return Collections.emptyList();
+    }
+
+    List<FpgaDevice> fpgaDevices = new ArrayList<>();
+
+    for (final String rawSpec : trimmedDevices.split(",")) {
+      // Tolerate blanks around the separator, e.g. "acl0/243:0, acl1/243:1"
+      final String deviceSpec = rawSpec.trim();
+      Matcher matcher = DEVICE_PATTERN.matcher(deviceSpec);
+      if (!matcher.matches()) {
+        throw new ResourceHandlerException(
+            "Illegal device specification string: " + deviceSpec);
+      }
+
+      try {
+        String devName = matcher.group(1);
+        int major = Integer.parseInt(matcher.group(3));
+        int minor = Integer.parseInt(matcher.group(5));
+        fpgaDevices.add(new FpgaDevice(type,
+            major,
+            minor,
+            null,
+            null,
+            devName,
+            null,
+            null,
+            null));
+      } catch (NumberFormatException e) {
+        // The pattern only guarantees digits, not that the value fits
+        // into an int.
+        throw new ResourceHandlerException(
+            "Cannot parse major/minor number: " + deviceSpec);
+      }
+    }
+
+    return fpgaDevices;
+  }
+}

+ 32 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/discovery/FPGADiscoveryStrategy.java

@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.discovery;
+
+import java.util.List;
+
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga.FpgaResourceAllocator.FpgaDevice;
+
+/**
+ * Interface for an FPGA device discovery strategy.
+ */
+public interface FPGADiscoveryStrategy {
+  /**
+   * Determines the FPGA devices that are available.
+   *
+   * @return the list of discovered FPGA devices
+   * @throws ResourceHandlerException if the discovery fails
+   */
+  List<FpgaDevice> discover() throws ResourceHandlerException;
+}

+ 66 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/discovery/ScriptBasedFPGADiscoveryStrategy.java

@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.discovery;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga.FpgaResourceAllocator.FpgaDevice;
+
+/**
+ * Discovery strategy that obtains the device list from an external script.
+ * The output of the script is expected to be a single line in the format
+ * understood by DeviceSpecParser.
+ *
+ * See DeviceSpecParser for details.
+ */
+public class ScriptBasedFPGADiscoveryStrategy
+    implements FPGADiscoveryStrategy {
+
+  private final Function<String, Optional<String>> scriptRunner;
+  private final String discoveryScript;
+  private final String type;
+
+  /**
+   * @param fpgaType FPGA type handed to the parser for the created devices
+   * @param scriptRunner function that is applied to the script setting and
+   *                     yields the script output, or an empty Optional
+   * @param propValue the configured discovery script
+   */
+  public ScriptBasedFPGADiscoveryStrategy(
+      String fpgaType,
+      Function<String, Optional<String>> scriptRunner,
+      String propValue) {
+    this.type = fpgaType;
+    this.discoveryScript = propValue;
+    this.scriptRunner = scriptRunner;
+  }
+
+  /**
+   * Runs the script through the script runner and parses its output.
+   *
+   * @return the non-empty list of devices described by the script output
+   * @throws ResourceHandlerException if the runner yields no output, the
+   *         output cannot be parsed, or it describes no device
+   */
+  @Override
+  public List<FpgaDevice> discover() throws ResourceHandlerException {
+    final Optional<String> output = scriptRunner.apply(discoveryScript);
+    if (!output.isPresent()) {
+      throw new ResourceHandlerException("Unable to run external script");
+    }
+
+    final List<FpgaDevice> devices =
+        DeviceSpecParser.getDevicesFromString(type, output.get());
+    if (devices.isEmpty()) {
+      throw new ResourceHandlerException("No FPGA devices were specified");
+    }
+
+    return devices;
+  }
+}

+ 54 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/discovery/SettingsBasedFPGADiscoveryStrategy.java

@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.discovery;
+
+import java.util.List;
+
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga.FpgaResourceAllocator.FpgaDevice;
+
+/**
+ * Discovery strategy that takes the device list from a string.
+ * The string must be a single line in the format understood by
+ * DeviceSpecParser.
+ *
+ * See DeviceSpecParser for details.
+ */
+public class SettingsBasedFPGADiscoveryStrategy
+    implements FPGADiscoveryStrategy {
+
+  private final String type;
+  private final String availableDevices;
+
+  /**
+   * @param fpgaType FPGA type handed to the parser for the created devices
+   * @param devices the device specification string to parse
+   */
+  public SettingsBasedFPGADiscoveryStrategy(
+      String fpgaType, String devices) {
+    this.availableDevices = devices;
+    this.type = fpgaType;
+  }
+
+  /**
+   * Parses the device specification string given at construction time.
+   *
+   * @return the non-empty list of devices described by the string
+   * @throws ResourceHandlerException if the string cannot be parsed or
+   *         describes no device
+   */
+  @Override
+  public List<FpgaDevice> discover() throws ResourceHandlerException {
+    final List<FpgaDevice> devices =
+        DeviceSpecParser.getDevicesFromString(type, availableDevices);
+    if (!devices.isEmpty()) {
+      return devices;
+    }
+
+    throw new ResourceHandlerException("No FPGA devices were specified");
+  }
+}

+ 19 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/discovery/package-info.java

@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.discovery;

+ 239 - 37
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/TestFpgaDiscoverer.java

@@ -24,23 +24,34 @@ import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga.FpgaResourceAllocator;
-import org.junit.Assert;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga.FpgaResourceAllocator.FpgaDevice;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.lang.reflect.Field;
-import java.util.*;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 
+import static org.junit.Assert.assertEquals;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class TestFpgaDiscoverer {
+  @Rule
+  public ExpectedException expected = ExpectedException.none();
 
   private String getTestParentFolder() {
     File f = new File("target/temp/" + TestFpgaDiscoverer.class.getName());
@@ -57,6 +68,7 @@ public class TestFpgaDiscoverer {
     File f = new File(folder);
     FileUtils.deleteDirectory(f);
     f.mkdirs();
+    FpgaDiscoverer.reset();
   }
 
   // A dirty hack to modify the env of the current JVM itself - Dirty, but
@@ -106,7 +118,7 @@ public class TestFpgaDiscoverer {
 
     discoverer.initialize(conf);
     // Case 1. No configuration set for binary(no environment "ALTERAOCLSDKROOT" set)
-    Assert.assertEquals("No configuration(no environment ALTERAOCLSDKROOT set)" +
+    assertEquals("No configuration(no environment ALTERAOCLSDKROOT set)" +
             "should return just a single binary name",
         "aocl", openclPlugin.getPathToExecutable());
 
@@ -115,19 +127,19 @@ public class TestFpgaDiscoverer {
     conf.set(YarnConfiguration.NM_FPGA_PATH_TO_EXEC, getTestParentFolder() + "/aocl");
     touchFile(fakeBinary);
     discoverer.initialize(conf);
-    Assert.assertEquals("Correct configuration should return user setting",
+    assertEquals("Correct configuration should return user setting",
         getTestParentFolder() + "/aocl", openclPlugin.getPathToExecutable());
 
     // Case 3. With correct configuration but file doesn't exists. Use default
     fakeBinary.delete();
     discoverer.initialize(conf);
-    Assert.assertEquals("Correct configuration but file doesn't exists should return just a single binary name",
+    assertEquals("Should return just a single binary name",
         "aocl", openclPlugin.getPathToExecutable());
 
     // Case 4. Set a empty value
     conf.set(YarnConfiguration.NM_FPGA_PATH_TO_EXEC, "");
     discoverer.initialize(conf);
-    Assert.assertEquals("configuration with empty string value, should use aocl",
+    assertEquals("configuration with empty string value, should use aocl",
         "aocl", openclPlugin.getPathToExecutable());
 
     // Case 5. No configuration set for binary, but set environment "ALTERAOCLSDKROOT"
@@ -140,7 +152,7 @@ public class TestFpgaDiscoverer {
     newEnv.put("ALTERAOCLSDKROOT", getTestParentFolder());
     setNewEnvironmentHack(newEnv);
     discoverer.initialize(conf);
-    Assert.assertEquals("No configuration but with environment ALTERAOCLSDKROOT set",
+    assertEquals("No configuration but with environment ALTERAOCLSDKROOT set",
         getTestParentFolder() + "/bin/aocl", openclPlugin.getPathToExecutable());
 
   }
@@ -193,39 +205,229 @@ public class TestFpgaDiscoverer {
 
     // Case 1. core parsing
     openclPlugin.parseDiagnoseInfo(output, list);
-    Assert.assertEquals(3, list.size());
-    Assert.assertEquals("IntelOpenCL", list.get(0).getType());
-    Assert.assertEquals("247", list.get(0).getMajor().toString());
-    Assert.assertEquals("0", list.get(0).getMinor().toString());
-    Assert.assertEquals("acl0", list.get(0).getAliasDevName());
-    Assert.assertEquals("aclnalla_pcie0", list.get(0).getDevName());
-    Assert.assertEquals("02:00.00", list.get(0).getBusNum());
-    Assert.assertEquals("53.1 degrees C", list.get(0).getTemperature());
-    Assert.assertEquals("31.7 Watts", list.get(0).getCardPowerUsage());
-
-    Assert.assertEquals("IntelOpenCL", list.get(1).getType());
-    Assert.assertEquals("247", list.get(1).getMajor().toString());
-    Assert.assertEquals("1", list.get(1).getMinor().toString());
-    Assert.assertEquals("acl1", list.get(1).getAliasDevName());
-    Assert.assertEquals("aclnalla_pcie1", list.get(1).getDevName());
-    Assert.assertEquals("03:00.00", list.get(1).getBusNum());
-    Assert.assertEquals("43.1 degrees C", list.get(1).getTemperature());
-    Assert.assertEquals("11.7 Watts", list.get(1).getCardPowerUsage());
-
-    Assert.assertEquals("IntelOpenCL", list.get(2).getType());
-    Assert.assertEquals("246", list.get(2).getMajor().toString());
-    Assert.assertEquals("0", list.get(2).getMinor().toString());
-    Assert.assertEquals("acl2", list.get(2).getAliasDevName());
-    Assert.assertEquals("acla10_ref0", list.get(2).getDevName());
-    Assert.assertEquals("09:00.00", list.get(2).getBusNum());
-    Assert.assertEquals("50.5781 degrees C", list.get(2).getTemperature());
-    Assert.assertEquals("", list.get(2).getCardPowerUsage());
+    assertEquals(3, list.size());
+    assertEquals("IntelOpenCL", list.get(0).getType());
+    assertEquals("247", list.get(0).getMajor().toString());
+    assertEquals("0", list.get(0).getMinor().toString());
+    assertEquals("acl0", list.get(0).getAliasDevName());
+    assertEquals("aclnalla_pcie0", list.get(0).getDevName());
+    assertEquals("02:00.00", list.get(0).getBusNum());
+    assertEquals("53.1 degrees C", list.get(0).getTemperature());
+    assertEquals("31.7 Watts", list.get(0).getCardPowerUsage());
+
+    assertEquals("IntelOpenCL", list.get(1).getType());
+    assertEquals("247", list.get(1).getMajor().toString());
+    assertEquals("1", list.get(1).getMinor().toString());
+    assertEquals("acl1", list.get(1).getAliasDevName());
+    assertEquals("aclnalla_pcie1", list.get(1).getDevName());
+    assertEquals("03:00.00", list.get(1).getBusNum());
+    assertEquals("43.1 degrees C", list.get(1).getTemperature());
+    assertEquals("11.7 Watts", list.get(1).getCardPowerUsage());
+
+    assertEquals("IntelOpenCL", list.get(2).getType());
+    assertEquals("246", list.get(2).getMajor().toString());
+    assertEquals("0", list.get(2).getMinor().toString());
+    assertEquals("acl2", list.get(2).getAliasDevName());
+    assertEquals("acla10_ref0", list.get(2).getDevName());
+    assertEquals("09:00.00", list.get(2).getBusNum());
+    assertEquals("50.5781 degrees C", list.get(2).getTemperature());
+    assertEquals("", list.get(2).getCardPowerUsage());
 
     // Case 2. check alias map
     Map<String, String> aliasMap = openclPlugin.getAliasMap();
-    Assert.assertEquals("acl0", aliasMap.get("247:0"));
-    Assert.assertEquals("acl1", aliasMap.get("247:1"));
-    Assert.assertEquals("acl2", aliasMap.get("246:0"));
+    assertEquals("acl0", aliasMap.get("247:0"));
+    assertEquals("acl1", aliasMap.get("247:1"));
+    assertEquals("acl2", aliasMap.get("246:0"));
+  }
+
+  /**
+   * Configures two devices through
+   * {@link YarnConfiguration#NM_FPGA_AVAILABLE_DEVICES} and checks that
+   * {@code discover()} returns both of them, in order, with the alias name,
+   * major and minor numbers taken from the configured string.
+   *
+   * @throws YarnException if plugin initialization or discovery fails
+   */
+  @Test
+  public void testDiscoveryWhenAvailableDevicesDefined()
+      throws YarnException {
+    Configuration conf = new Configuration(false);
+    conf.set(YarnConfiguration.NM_FPGA_AVAILABLE_DEVICES,
+        "acl0/243:0,acl1/244:1");
+    FpgaDiscoverer discoverer = FpgaDiscoverer.getInstance();
+
+    IntelFpgaOpenclPlugin openclPlugin = new IntelFpgaOpenclPlugin();
+    discoverer.setResourceHanderPlugin(openclPlugin);
+    openclPlugin.initPlugin(conf);
+    openclPlugin.setShell(mockPuginShell());
+
+    discoverer.initialize(conf);
+    List<FpgaDevice> devices = discoverer.discover();
+    assertEquals("Number of devices", 2, devices.size());
+    FpgaDevice device0 = devices.get(0);
+    FpgaDevice device1 = devices.get(1);
+
+    // Integer.valueOf is used instead of the deprecated Integer(int)
+    // constructor; the boxed values still compare equal via equals().
+    assertEquals("Device id", "acl0", device0.getAliasDevName());
+    assertEquals("Minor number", Integer.valueOf(0), device0.getMinor());
+    assertEquals("Major", Integer.valueOf(243), device0.getMajor());
+
+    assertEquals("Device id", "acl1", device1.getAliasDevName());
+    assertEquals("Minor number", Integer.valueOf(1), device1.getMinor());
+    assertEquals("Major", Integer.valueOf(244), device1.getMajor());
+  }
+
+  /**
+   * Sets {@link YarnConfiguration#NM_FPGA_AVAILABLE_DEVICES} to an empty
+   * string and expects initialization/discovery to fail with a
+   * {@link ResourceHandlerException} reporting
+   * "No FPGA devices were specified".
+   *
+   * @throws YarnException the expected failure, checked by the rule
+   */
+  @Test
+  public void testDiscoveryWhenAvailableDevicesEmpty()
+      throws YarnException {
+    expected.expect(ResourceHandlerException.class);
+    expected.expectMessage("No FPGA devices were specified");
+
+    FpgaDiscoverer fpgaDiscoverer = FpgaDiscoverer.getInstance();
+    Configuration emptyDevicesConf = new Configuration(false);
+    emptyDevicesConf.set(YarnConfiguration.NM_FPGA_AVAILABLE_DEVICES, "");
+
+    IntelFpgaOpenclPlugin plugin = new IntelFpgaOpenclPlugin();
+    fpgaDiscoverer.setResourceHanderPlugin(plugin);
+    plugin.initPlugin(emptyDevicesConf);
+    plugin.setShell(mockPuginShell());
+
+    fpgaDiscoverer.initialize(emptyDevicesConf);
+    fpgaDiscoverer.discover();
+  }
+
+  /**
+   * Supplies a malformed device list through
+   * {@link YarnConfiguration#NM_FPGA_AVAILABLE_DEVICES} and expects a
+   * {@link ResourceHandlerException} reporting
+   * "Illegal device specification string".
+   *
+   * @throws YarnException the expected failure, checked by the rule
+   */
+  @Test
+  public void testDiscoveryWhenAvailableDevicesAreIllegalString()
+      throws YarnException {
+    expected.expect(ResourceHandlerException.class);
+    expected.expectMessage("Illegal device specification string");
+
+    FpgaDiscoverer fpgaDiscoverer = FpgaDiscoverer.getInstance();
+    Configuration illegalSpecConf = new Configuration(false);
+    illegalSpecConf.set(YarnConfiguration.NM_FPGA_AVAILABLE_DEVICES,
+        "illegal/243:0,acl1/244=1");
+
+    IntelFpgaOpenclPlugin plugin = new IntelFpgaOpenclPlugin();
+    fpgaDiscoverer.setResourceHanderPlugin(plugin);
+    plugin.initPlugin(illegalSpecConf);
+    plugin.setShell(mockPuginShell());
+
+    fpgaDiscoverer.initialize(illegalSpecConf);
+    fpgaDiscoverer.discover();
+  }
+
+  /**
+   * Configures a discovery script through
+   * {@link YarnConfiguration#NM_FPGA_DEVICE_DISCOVERY_SCRIPT} and replaces
+   * the script runner with a stub that returns a two-device specification.
+   * Checks that {@code discover()} returns both devices, in order, with the
+   * alias name, major and minor numbers taken from the stubbed output.
+   *
+   * @throws YarnException if plugin initialization or discovery fails
+   */
+  @Test
+  public void testDiscoveryWhenExternalScriptDefined()
+      throws YarnException {
+    Configuration conf = new Configuration(false);
+    conf.set(YarnConfiguration.NM_FPGA_DEVICE_DISCOVERY_SCRIPT,
+        "/dummy/script");
+    FpgaDiscoverer discoverer = FpgaDiscoverer.getInstance();
+
+    IntelFpgaOpenclPlugin openclPlugin = new IntelFpgaOpenclPlugin();
+    discoverer.setResourceHanderPlugin(openclPlugin);
+    openclPlugin.initPlugin(conf);
+    openclPlugin.setShell(mockPuginShell());
+    // The stub ignores the script path, so "/dummy/script" is never executed.
+    discoverer.setScriptRunner(s -> Optional.of("acl0/243:0,acl1/244:1"));
+
+    discoverer.initialize(conf);
+    List<FpgaDevice> devices = discoverer.discover();
+    assertEquals("Number of devices", 2, devices.size());
+    FpgaDevice device0 = devices.get(0);
+    FpgaDevice device1 = devices.get(1);
+
+    // Integer.valueOf is used instead of the deprecated Integer(int)
+    // constructor; the boxed values still compare equal via equals().
+    assertEquals("Device id", "acl0", device0.getAliasDevName());
+    assertEquals("Minor number", Integer.valueOf(0), device0.getMinor());
+    assertEquals("Major", Integer.valueOf(243), device0.getMajor());
+
+    assertEquals("Device id", "acl1", device1.getAliasDevName());
+    assertEquals("Minor number", Integer.valueOf(1), device1.getMinor());
+    assertEquals("Major", Integer.valueOf(244), device1.getMajor());
+  }
+
+  /**
+   * Stubs the script runner so that it yields an empty string and expects a
+   * {@link ResourceHandlerException} reporting
+   * "No FPGA devices were specified".
+   *
+   * @throws YarnException the expected failure, checked by the rule
+   */
+  @Test
+  public void testDiscoveryWhenExternalScriptReturnsEmptyString()
+      throws YarnException {
+    expected.expect(ResourceHandlerException.class);
+    expected.expectMessage("No FPGA devices were specified");
+
+    FpgaDiscoverer fpgaDiscoverer = FpgaDiscoverer.getInstance();
+    Configuration scriptConf = new Configuration(false);
+    scriptConf.set(YarnConfiguration.NM_FPGA_DEVICE_DISCOVERY_SCRIPT,
+        "/dummy/script");
+
+    IntelFpgaOpenclPlugin plugin = new IntelFpgaOpenclPlugin();
+    fpgaDiscoverer.setResourceHanderPlugin(plugin);
+    plugin.initPlugin(scriptConf);
+    plugin.setShell(mockPuginShell());
+    // Stubbed runner: the script "succeeds" but prints nothing.
+    fpgaDiscoverer.setScriptRunner(path -> Optional.of(""));
+
+    fpgaDiscoverer.initialize(scriptConf);
+    fpgaDiscoverer.discover();
+  }
+
+  /**
+   * Stubs the script runner so that it yields {@code Optional.empty()} and
+   * expects a {@link ResourceHandlerException} reporting
+   * "Unable to run external script".
+   *
+   * @throws YarnException the expected failure, checked by the rule
+   */
+  @Test
+  public void testDiscoveryWhenExternalScriptFails()
+      throws YarnException {
+    expected.expect(ResourceHandlerException.class);
+    expected.expectMessage("Unable to run external script");
+
+    FpgaDiscoverer fpgaDiscoverer = FpgaDiscoverer.getInstance();
+    Configuration scriptConf = new Configuration(false);
+    scriptConf.set(YarnConfiguration.NM_FPGA_DEVICE_DISCOVERY_SCRIPT,
+        "/dummy/script");
+
+    IntelFpgaOpenclPlugin plugin = new IntelFpgaOpenclPlugin();
+    fpgaDiscoverer.setResourceHanderPlugin(plugin);
+    plugin.initPlugin(scriptConf);
+    plugin.setShell(mockPuginShell());
+    // Stubbed runner: no output at all is produced for the script.
+    fpgaDiscoverer.setScriptRunner(path -> Optional.empty());
+
+    fpgaDiscoverer.initialize(scriptConf);
+    fpgaDiscoverer.discover();
+  }
+
+  /**
+   * Sets {@link YarnConfiguration#NM_FPGA_DEVICE_DISCOVERY_SCRIPT} to an
+   * empty string, leaves the script runner untouched, and expects a
+   * {@link ResourceHandlerException} reporting
+   * "Unable to run external script".
+   *
+   * @throws YarnException the expected failure, checked by the rule
+   */
+  @Test
+  public void testDiscoveryWhenExternalScriptUndefined()
+      throws YarnException {
+    expected.expect(ResourceHandlerException.class);
+    expected.expectMessage("Unable to run external script");
+
+    FpgaDiscoverer fpgaDiscoverer = FpgaDiscoverer.getInstance();
+    Configuration emptyScriptConf = new Configuration(false);
+    emptyScriptConf.set(YarnConfiguration.NM_FPGA_DEVICE_DISCOVERY_SCRIPT, "");
+
+    IntelFpgaOpenclPlugin plugin = new IntelFpgaOpenclPlugin();
+    fpgaDiscoverer.setResourceHanderPlugin(plugin);
+    plugin.initPlugin(emptyScriptConf);
+    plugin.setShell(mockPuginShell());
+
+    fpgaDiscoverer.initialize(emptyScriptConf);
+    fpgaDiscoverer.discover();
+  }
+
+  /**
+   * Creates a file under the test folder, clears its executable flag, and
+   * configures it as the discovery script through
+   * {@link YarnConfiguration#NM_FPGA_DEVICE_DISCOVERY_SCRIPT}. Expects a
+   * {@link ResourceHandlerException} reporting
+   * "Unable to run external script". The file is removed afterwards
+   * regardless of the outcome.
+   *
+   * @throws YarnException the expected failure, checked by the rule
+   * @throws IOException if the fake script file cannot be created
+   */
+  @Test
+  public void testDiscoveryWhenExternalScriptCannotBeExecuted()
+      throws YarnException, IOException {
+    // Built once; the original re-created the same File inside the try.
+    final File fakeScript = new File(getTestParentFolder() + "/fakeScript");
+    try {
+      expected.expect(ResourceHandlerException.class);
+      expected.expectMessage("Unable to run external script");
+
+      Configuration conf = new Configuration(false);
+      touchFile(fakeScript);
+      fakeScript.setExecutable(false);
+      conf.set(YarnConfiguration.NM_FPGA_DEVICE_DISCOVERY_SCRIPT,
+          fakeScript.getAbsolutePath());
+      FpgaDiscoverer discoverer = FpgaDiscoverer.getInstance();
+
+      IntelFpgaOpenclPlugin openclPlugin = new IntelFpgaOpenclPlugin();
+      discoverer.setResourceHanderPlugin(openclPlugin);
+      openclPlugin.initPlugin(conf);
+      openclPlugin.setShell(mockPuginShell());
+
+      discoverer.initialize(conf);
+      discoverer.discover();
+    } finally {
+      // Best-effort cleanup so later tests do not see the fake script.
+      fakeScript.delete();
+    }
+  }
 
   private IntelFpgaOpenclPlugin.InnerShellExecutor mockPuginShell() {