Browse Source

YARN-9477. Implement VE discovery using libudev. Contributed by Peter Bacsko.

Zhankun Tang 5 years ago
parent
commit
062eb605ac

+ 6 - 0
hadoop-project/pom.xml

@@ -108,6 +108,7 @@
     <hikari.version>2.4.12</hikari.version>
     <mssql.version>6.2.1.jre7</mssql.version>
     <okhttp.version>2.7.5</okhttp.version>
+    <jna.version>5.2.0</jna.version>
 
     <!-- Maven protoc compiler -->
     <protobuf-maven-plugin.version>0.5.1</protobuf-maven-plugin.version>
@@ -1537,6 +1538,11 @@
         <artifactId>javax.annotation-api</artifactId>
         <version>1.3.2</version>
       </dependency>
+      <dependency>
+        <groupId>net.java.dev.jna</groupId>
+        <artifactId>jna</artifactId>
+        <version>${jna.version}</version>
+      </dependency>
     </dependencies>
   </dependencyManagement>
 

+ 6 - 0
hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml

@@ -695,4 +695,10 @@
     <Bug pattern="NP_NONNULL_PARAM_VIOLATION"/>
   </Match>
 
+  <!-- Device files are always under /dev in production, but it's necessary to override in tests -->
+  <Match>
+    <Class name="org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.com.nec.NECVEPlugin"/>
+    <Method name="getDevices" />
+    <Bug pattern="DMI_HARDCODED_ABSOLUTE_FILENAME" />
+  </Match>
 </FindBugsFilter>

+ 4 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml

@@ -169,6 +169,10 @@
       <groupId>org.fusesource.leveldbjni</groupId>
       <artifactId>leveldbjni-all</artifactId>
     </dependency>
+    <dependency>
+      <groupId>net.java.dev.jna</groupId>
+      <artifactId>jna</artifactId>
+    </dependency>
 
     <dependency>
       <groupId>org.apache.hadoop</groupId>

+ 45 - 11
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/com/nec/NECVEPlugin.java

@@ -51,23 +51,38 @@ public class NECVEPlugin implements DevicePlugin, DevicePluginScheduler {
   private static final String HADOOP_COMMON_HOME = "HADOOP_COMMON_HOME";
   private static final String ENV_SCRIPT_PATH = "NEC_VE_GET_SCRIPT_PATH";
   private static final String ENV_SCRIPT_NAME = "NEC_VE_GET_SCRIPT_NAME";
+  private static final String ENV_USE_UDEV = "NEC_USE_UDEV";
   private static final String DEFAULT_SCRIPT_NAME = "nec-ve-get.py";
   private static final Logger LOG = LoggerFactory.getLogger(NECVEPlugin.class);
   private static final String[] DEFAULT_BINARY_SEARCH_DIRS = new String[]{
       "/usr/bin", "/bin", "/opt/nec/ve/bin"};
 
   private String binaryPath;
+  private boolean useUdev;
+  private VEDeviceDiscoverer discoverer;
 
   private Function<String[], CommandExecutor>
       commandExecutorProvider = this::createCommandExecutor;
 
   public NECVEPlugin() throws ResourceHandlerException {
-    this(System::getenv, DEFAULT_BINARY_SEARCH_DIRS);
+    this(System::getenv, DEFAULT_BINARY_SEARCH_DIRS, new UdevUtil());
   }
 
   @VisibleForTesting
-  NECVEPlugin(Function<String, String> envProvider, String[] scriptPaths)
-      throws ResourceHandlerException {
+  NECVEPlugin(Function<String, String> envProvider, String[] scriptPaths,
+      UdevUtil udev) throws ResourceHandlerException {
+    if (Boolean.parseBoolean(envProvider.apply(ENV_USE_UDEV))) {
+      LOG.info("Using libudev to retrieve syspath & device status");
+      useUdev = true;
+      udev.init();
+      discoverer = new VEDeviceDiscoverer(udev);
+    } else {
+      scriptBasedInit(envProvider, scriptPaths);
+    }
+  }
+
+  private void scriptBasedInit(Function<String, String> envProvider,
+      String[] scriptPaths) throws ResourceHandlerException {
     String binaryName = DEFAULT_SCRIPT_NAME;
 
     String envScriptName = envProvider.apply(ENV_SCRIPT_NAME);
@@ -125,15 +140,29 @@ public class NECVEPlugin implements DevicePlugin, DevicePluginScheduler {
   public Set<Device> getDevices() {
     Set<Device> devices = null;
 
-    CommandExecutor executor =
-        commandExecutorProvider.apply(new String[]{this.binaryPath});
-    try {
-      executor.execute();
-      String output = executor.getOutput();
-      devices = parseOutput(output);
-    } catch (IOException e) {
-      LOG.warn(e.toString());
+    if (useUdev) {
+      try {
+        devices = discoverer.getDevicesFromPath("/dev");
+      } catch (IOException e) {
+        LOG.error("Error during scanning devices", e);
+      }
+    } else {
+      CommandExecutor executor =
+          commandExecutorProvider.apply(new String[]{this.binaryPath});
+      try {
+        executor.execute();
+        String output = executor.getOutput();
+        devices = parseOutput(output);
+      } catch (IOException e) {
+        LOG.error("Error during executing external binary", e);
+      }
     }
+
+    if (devices != null) {
+      LOG.info("Found devices:");
+      devices.forEach(dev -> LOG.info("{}", dev));
+    }
+
     return devices;
   }
 
@@ -303,6 +332,11 @@ public class NECVEPlugin implements DevicePlugin, DevicePluginScheduler {
     this.commandExecutorProvider = provider;
   }
 
+  @VisibleForTesting
+  void setVeDeviceDiscoverer(VEDeviceDiscoverer veDeviceDiscoverer) {
+    this.discoverer = veDeviceDiscoverer;
+  }
+
   @VisibleForTesting
   String getBinaryPath() {
     return binaryPath;

+ 100 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/com/nec/UdevUtil.java

@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.com.nec;
+
+import com.sun.jna.Native;
+import com.sun.jna.Pointer;
+
+/**
+ * Thin wrapper around the native libudev library, bound through JNA
+ * ({@code Native.register} plus {@code native} methods). Its only job is to
+ * resolve the sysfs path of a device from its device number and type.
+ *
+ * {@link #init()} must be called before {@link #getSysPath(int, char)}.
+ */
+class UdevUtil {
+  private static LibUdev libUdev;
+
+  /**
+   * Registers the native libudev bindings (at most once) and makes them
+   * available to {@link #getSysPath(int, char)}.
+   */
+  public synchronized void init() {
+    LibUdev.init();
+    libUdev = LibUdev.instance;
+  }
+
+  /**
+   * Looks up the sysfs path of a device.
+   *
+   * @param deviceNo device number passed to
+   *        {@code udev_device_new_from_devnum}
+   * @param devType device type character; the caller in this package passes
+   *        'c' or 'b'
+   * @return the syspath reported by libudev
+   * @throws IllegalStateException if {@link #init()} has not been called or
+   *         a udev context cannot be created
+   * @throws IllegalArgumentException if the device or its syspath cannot be
+   *         found
+   */
+  public String getSysPath(int deviceNo, char devType) {
+    LibUdev lib = libUdev;
+    if (lib == null) {
+      // Fail with a clear message instead of a bare NullPointerException.
+      throw new IllegalStateException(
+          "Udev: init() must be called before getSysPath()");
+    }
+
+    Pointer udev = null;
+    Pointer device = null;
+
+    try {
+      udev = lib.udev_new();
+      if (udev == null) {
+        // Without this check a failed context creation would be reported
+        // as a misleading "device not found" error below.
+        throw new IllegalStateException(
+            "Udev: cannot create udev context");
+      }
+      // NOTE(review): the native parameter is a dev_t; confirm that mapping
+      // it as a Java int is adequate on all supported platforms.
+      device = lib.udev_device_new_from_devnum(
+          udev, (byte) devType, deviceNo);
+      if (device == null) {
+        throw new IllegalArgumentException("Udev: device not found"
+            + " (devnum: " + deviceNo + ", type: " + devType + ")");
+      }
+      Pointer sysPathPtr = lib.udev_device_get_syspath(device);
+      if (sysPathPtr == null) {
+        throw new IllegalArgumentException(
+            "Udev: syspath not found for device"
+            + " (devnum: " + deviceNo + ", type: " + devType + ")");
+      }
+      // Copy the string into a Java String before the device is released
+      // in the finally block.
+      return sysPathPtr.getString(0);
+    } finally {
+      // Release in reverse order of acquisition.
+      if (device != null) {
+        lib.udev_device_unref(device);
+      }
+
+      if (udev != null) {
+        lib.udev_unref(udev);
+      }
+    }
+  }
+
+  /**
+   * JNA binding class: the native methods are bound to libudev by
+   * {@code Native.register("udev")} in {@link #init()}.
+   */
+  @SuppressWarnings({"checkstyle:staticvariablename", "checkstyle:methodname",
+      "checkstyle:parametername"})
+  private static class LibUdev implements LibUdevMapping {
+    private static LibUdev instance;
+
+    /**
+     * Registers the native library on first use. Synchronized on the class
+     * because the guarded state is static; the per-instance lock taken by
+     * {@code UdevUtil.init()} does not protect it across instances.
+     */
+    public static synchronized void init() {
+      if (instance == null) {
+        Native.register("udev");
+        instance = new LibUdev();
+      }
+    }
+
+    public native Pointer udev_new();
+
+    public native Pointer udev_unref(Pointer udev);
+
+    public native Pointer udev_device_new_from_devnum(Pointer udev,
+        byte type,
+        int devnum);
+
+    public native Pointer udev_device_get_syspath(Pointer udev_device);
+
+    public native Pointer udev_device_unref(Pointer udev_device);
+  }
+
+  /**
+   * The subset of libudev functions used by this class.
+   */
+  @SuppressWarnings({"checkstyle:staticvariablename", "checkstyle:methodname",
+      "checkstyle:parametername"})
+  interface LibUdevMapping {
+    Pointer udev_new();
+
+    Pointer udev_unref(Pointer udev);
+
+    Pointer udev_device_new_from_devnum(Pointer udev,
+        byte type,
+        int devnum);
+
+    Pointer udev_device_get_syspath(Pointer udev_device);
+
+    Pointer udev_device_unref(Pointer udev_device);
+  }
+}

+ 143 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/com/nec/VEDeviceDiscoverer.java

@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.com.nec;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.Shell.CommandExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.Device;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+class VEDeviceDiscoverer {
+  private static final String STATE_TERMINATING = "TERMINATING";
+  private static final String STATE_INITIALIZING = "INITIALIZING";
+  private static final String STATE_OFFLINE = "OFFLINE";
+  private static final String STATE_ONLINE = "ONLINE";
+  private static final Logger LOG =
+      LoggerFactory.getLogger(VEDeviceDiscoverer.class);
+
+  private static final String[] DEVICE_STATE = {STATE_ONLINE, STATE_OFFLINE,
+      STATE_INITIALIZING, STATE_TERMINATING};
+
+  private UdevUtil udev;
+  private Function<String[], CommandExecutor>
+      commandExecutorProvider = this::createCommandExecutor;
+
+  VEDeviceDiscoverer(UdevUtil udevUtil) {
+    udev = udevUtil;
+  }
+
+  public Set<Device> getDevicesFromPath(String path) throws IOException {
+    MutableInt counter = new MutableInt(0);
+
+    return Files.walk(Paths.get(path), 1)
+      .filter(p -> p.toFile().getName().startsWith("veslot"))
+      .map(p -> toDevice(p, counter))
+      .collect(Collectors.toSet());
+  }
+
+  private Device toDevice(Path p, MutableInt counter) {
+    CommandExecutor executor =
+        commandExecutorProvider.apply(
+            new String[]{"stat", "-L", "-c", "%t:%T:%F", p.toString()});
+
+    try {
+      LOG.info("Checking device file: {}", p);
+      executor.execute();
+      String statOutput = executor.getOutput();
+      String[] stat = statOutput.trim().split(":");
+
+      int major = Integer.parseInt(stat[0], 16);
+      int minor = Integer.parseInt(stat[1], 16);
+      char devType = getDevType(p, stat[2]);
+      int deviceNumber = makeDev(major, minor);
+      LOG.info("Device: major: {}, minor: {}, devNo: {}, type: {}",
+          major, minor, deviceNumber, devType);
+      String sysPath = udev.getSysPath(deviceNumber, devType);
+      LOG.info("Device syspath: {}", sysPath);
+      String deviceState = getDeviceState(sysPath);
+
+      Device.Builder builder = Device.Builder.newInstance();
+      builder.setId(counter.getAndIncrement())
+        .setMajorNumber(major)
+        .setMinorNumber(minor)
+        .setHealthy(STATE_ONLINE.equalsIgnoreCase(deviceState))
+        .setStatus(deviceState)
+        .setDevPath(p.toAbsolutePath().toString());
+
+      return builder.build();
+    } catch (IOException e) {
+      throw new UncheckedIOException("Cannot execute stat command", e);
+    }
+  }
+
+  private int makeDev(int major, int minor) {
+    return major * 256 + minor;
+  }
+
+  private char getDevType(Path p, String fromStat) {
+    if (fromStat.contains("character")) {
+      return 'c';
+    } else if (fromStat.contains("block")) {
+      return 'b';
+    } else {
+      throw new IllegalArgumentException(
+          "File is neither a char nor block device: " + p);
+    }
+  }
+
+  private String getDeviceState(String sysPath) throws IOException {
+    Path statePath = Paths.get(sysPath, "os_state");
+
+    try (FileInputStream fis =
+        new FileInputStream(statePath.toString())) {
+      byte state = (byte) fis.read();
+
+      if (state < 0 || DEVICE_STATE.length <= state) {
+        return String.format("Unknown (%d)", state);
+      } else {
+        return DEVICE_STATE[state];
+      }
+    }
+  }
+
+  private CommandExecutor createCommandExecutor(String[] command) {
+    return new Shell.ShellCommandExecutor(
+        command);
+  }
+
+  @VisibleForTesting
+  void setCommandExecutorProvider(
+      Function<String[], CommandExecutor> provider) {
+    this.commandExecutorProvider = provider;
+  }
+}

+ 46 - 12
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/com/nec/TestNECVEPlugin.java

@@ -23,6 +23,12 @@ import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertFalse;
 import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.anyString;
 
 import java.io.File;
 import java.io.IOException;
@@ -38,6 +44,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
 
+import org.apache.commons.compress.utils.Sets;
 import org.apache.commons.io.FileUtils;
 import org.apache.curator.shaded.com.google.common.collect.Lists;
 import org.apache.hadoop.util.Shell.CommandExecutor;
@@ -70,6 +77,9 @@ public class TestNECVEPlugin {
   @Mock
   private CommandExecutor mockCommandExecutor;
 
+  @Mock
+  private UdevUtil udevUtil;
+
   private String defaultScriptOutput;
 
   private NECVEPlugin plugin;
@@ -104,7 +114,7 @@ public class TestNECVEPlugin {
       throws ResourceHandlerException, IOException {
     setupTestDirectoryWithScript();
 
-    plugin = new NECVEPlugin(envProvider, defaultSearchDirs);
+    plugin = new NECVEPlugin(envProvider, defaultSearchDirs, udevUtil);
     plugin.setCommandExecutorProvider(commandExecutorProvider);
     when(mockCommandExecutor.getOutput()).thenReturn(defaultScriptOutput);
 
@@ -125,7 +135,7 @@ public class TestNECVEPlugin {
       throws ResourceHandlerException, IOException {
     setupTestDirectoryWithScript();
 
-    plugin = new NECVEPlugin(envProvider, defaultSearchDirs);
+    plugin = new NECVEPlugin(envProvider, defaultSearchDirs, udevUtil);
     plugin.setCommandExecutorProvider(commandExecutorProvider);
 
     defaultScriptOutput += "\n";
@@ -183,7 +193,7 @@ public class TestNECVEPlugin {
       throws ResourceHandlerException, IOException {
     setupTestDirectoryWithScript();
 
-    plugin = new NECVEPlugin(envProvider, defaultSearchDirs);
+    plugin = new NECVEPlugin(envProvider, defaultSearchDirs, udevUtil);
     plugin.setCommandExecutorProvider(commandExecutorProvider);
     defaultScriptOutput = getOutputForDevice(
         0,
@@ -204,7 +214,7 @@ public class TestNECVEPlugin {
       throws ResourceHandlerException, IOException {
     setupTestDirectoryWithScript();
 
-    plugin = new NECVEPlugin(envProvider, defaultSearchDirs);
+    plugin = new NECVEPlugin(envProvider, defaultSearchDirs, udevUtil);
     plugin.setCommandExecutorProvider(commandExecutorProvider);
 
     defaultScriptOutput += "\n";
@@ -254,7 +264,7 @@ public class TestNECVEPlugin {
     Files.delete(Paths.get(testFolder, DEFAULT_SCRIPT_NAME));
     env.put("NEC_VE_GET_SCRIPT_NAME", dummyScriptName);
 
-    plugin = new NECVEPlugin(envProvider, defaultSearchDirs);
+    plugin = new NECVEPlugin(envProvider, defaultSearchDirs, udevUtil);
 
     verifyBinaryPathSet(scriptPath);
   }
@@ -272,7 +282,7 @@ public class TestNECVEPlugin {
     env.put("NEC_VE_GET_SCRIPT_PATH",
         testFolder + "/" + DEFAULT_SCRIPT_NAME);
 
-    plugin = new NECVEPlugin(envProvider, EMPTY_SEARCH_DIRS);
+    plugin = new NECVEPlugin(envProvider, EMPTY_SEARCH_DIRS, udevUtil);
 
     verifyBinaryPathSet(scriptPath);
   }
@@ -284,7 +294,7 @@ public class TestNECVEPlugin {
 
     env.put("NEC_VE_GET_SCRIPT_PATH", testFolder);
 
-    plugin = new NECVEPlugin(envProvider, EMPTY_SEARCH_DIRS);
+    plugin = new NECVEPlugin(envProvider, EMPTY_SEARCH_DIRS, udevUtil);
   }
 
   @Test(expected = ResourceHandlerException.class)
@@ -300,7 +310,7 @@ public class TestNECVEPlugin {
     env.put("NEC_VE_GET_SCRIPT_PATH",
         testFolder + "/" + DEFAULT_SCRIPT_NAME);
 
-    plugin = new NECVEPlugin(envProvider, EMPTY_SEARCH_DIRS);
+    plugin = new NECVEPlugin(envProvider, EMPTY_SEARCH_DIRS, udevUtil);
   }
 
   @Test
@@ -317,7 +327,7 @@ public class TestNECVEPlugin {
 
     env.put("HADOOP_COMMON_HOME", testFolder);
 
-    plugin = new NECVEPlugin(envProvider, EMPTY_SEARCH_DIRS);
+    plugin = new NECVEPlugin(envProvider, EMPTY_SEARCH_DIRS, udevUtil);
     verifyBinaryPathSet(scriptPath);
   }
 
@@ -326,7 +336,7 @@ public class TestNECVEPlugin {
       throws ResourceHandlerException, IOException {
     setupTestDirectoryWithScript();
 
-    plugin = new NECVEPlugin(envProvider, defaultSearchDirs);
+    plugin = new NECVEPlugin(envProvider, defaultSearchDirs, udevUtil);
 
     Path scriptPath = Paths.get(testFolder, DEFAULT_SCRIPT_NAME);
     verifyBinaryPathSet(scriptPath);
@@ -336,7 +346,7 @@ public class TestNECVEPlugin {
   public void testAllocateSingleDevice()
       throws ResourceHandlerException, IOException {
     setupTestDirectoryWithScript();
-    plugin = new NECVEPlugin(envProvider, defaultSearchDirs);
+    plugin = new NECVEPlugin(envProvider, defaultSearchDirs, udevUtil);
     Set<Device> available = new HashSet<>();
     Device device = getTestDevice(0);
     available.add(device);
@@ -352,7 +362,7 @@ public class TestNECVEPlugin {
   public void testAllocateMultipleDevices()
       throws ResourceHandlerException, IOException {
     setupTestDirectoryWithScript();
-    plugin = new NECVEPlugin(envProvider, defaultSearchDirs);
+    plugin = new NECVEPlugin(envProvider, defaultSearchDirs, udevUtil);
     Set<Device> available = new HashSet<>();
     Device device0 = getTestDevice(0);
     Device device1 = getTestDevice(1);
@@ -366,6 +376,29 @@ public class TestNECVEPlugin {
     assertTrue("Device missing", allocated.contains(device1));
   }
 
+  @Test
+  public void testFindDevicesWithUdev()
+      throws ResourceHandlerException, IOException {
+    @SuppressWarnings("unchecked")
+    Function<String, String> mockEnvProvider = mock(Function.class);
+    VEDeviceDiscoverer veDeviceDiscoverer = mock(VEDeviceDiscoverer.class);
+    when(mockEnvProvider.apply(eq("NEC_USE_UDEV"))).thenReturn("true");
+    Device testDevice = getTestDevice(0);
+    when(veDeviceDiscoverer.getDevicesFromPath(anyString()))
+      .thenReturn(Sets.newHashSet(testDevice));
+    plugin = new NECVEPlugin(mockEnvProvider, defaultSearchDirs, udevUtil);
+    plugin.setVeDeviceDiscoverer(veDeviceDiscoverer);
+
+    Set<Device> devices = plugin.getDevices();
+
+    assertEquals("No. of devices", 1, devices.size());
+    Device device = devices.iterator().next();
+    assertSame("Device", device, testDevice);
+    verifyZeroInteractions(mockCommandExecutor);
+    verify(mockEnvProvider).apply(eq("NEC_USE_UDEV"));
+    verifyNoMoreInteractions(mockEnvProvider);
+  }
+
   private void setupTestDirectoryWithScript() throws IOException {
     setupTestDirectory(null);
 
@@ -409,5 +442,6 @@ public class TestNECVEPlugin {
   private void verifyBinaryPathSet(Path expectedPath) {
     assertEquals("Binary path", expectedPath.toString(),
         plugin.getBinaryPath());
+    verifyZeroInteractions(udevUtil);
   }
 }

+ 283 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/com/nec/TestVEDeviceDiscoverer.java

@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.com.nec;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Mockito.when;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyChar;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Function;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.util.Shell.CommandExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.Device;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Unit tests for VEDeviceDiscoverer class.
+ *
+ * The discoverer under test is created with a mocked UdevUtil and a
+ * command executor provider that always returns a mocked CommandExecutor,
+ * so the output of the "stat" command is whatever the test stubs.
+ * Device files and the os_state file are plain files created in a scratch
+ * directory under target/temp; the same directory is returned by the
+ * mocked UdevUtil as the syspath.
+ *
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class TestVEDeviceDiscoverer {
+  private static final Comparator<Device> DEVICE_COMPARATOR =
+      Comparator.comparingInt(Device::getId); // orders devices by ID
+
+  @Rule
+  public ExpectedException expected = ExpectedException.none();
+
+  @Mock
+  private UdevUtil udevUtil;
+
+  @Mock
+  private CommandExecutor mockCommandExecutor;
+
+  private String testFolder; // absolute path of the scratch directory
+  private VEDeviceDiscoverer discoverer;
+
+  @Before
+  public void setup() throws IOException {
+    Function<String[], CommandExecutor> commandExecutorProvider =
+        (String[] cmd) -> mockCommandExecutor; // same mock for any command
+    discoverer = new VEDeviceDiscoverer(udevUtil);
+    discoverer.setCommandExecutorProvider(commandExecutorProvider);
+    setupTestDirectory();
+  }
+
+  @After
+  public void teardown() throws IOException {
+    if (testFolder != null) {
+      File f = new File(testFolder);
+      FileUtils.deleteDirectory(f);
+    }
+  }
+
+  @Test // one veslot file, os_state byte 0 -> one healthy ONLINE device
+  public void testDetectSingleOnlineDevice() throws IOException {
+    createVeSlotFile(0);
+    createOsStateFile(0);
+    when(mockCommandExecutor.getOutput())
+      .thenReturn("8:1:character special file");
+    when(udevUtil.getSysPath(anyInt(), anyChar())).thenReturn(testFolder);
+
+    Set<Device> devices = discoverer.getDevicesFromPath(testFolder);
+
+    assertEquals("Number of devices", 1, devices.size());
+    Device device = devices.iterator().next();
+    assertEquals("Device ID", 0, device.getId());
+    assertEquals("Major number", 8, device.getMajorNumber());
+    assertEquals("Minor number", 1, device.getMinorNumber());
+    assertEquals("Status", "ONLINE", device.getStatus());
+    assertTrue("Device is not healthy", device.isHealthy());
+  }
+
+  @Test // three veslot files share one os_state file -> all ONLINE
+  public void testDetectMultipleOnlineDevices() throws IOException {
+    createVeSlotFile(0);
+    createVeSlotFile(1);
+    createVeSlotFile(2);
+    createOsStateFile(0);
+    when(mockCommandExecutor.getOutput()).thenReturn(
+        "8:1:character special file",
+        "9:1:character special file",
+        "a:1:character special file"); // consecutive outputs; major is hex
+    when(udevUtil.getSysPath(anyInt(), anyChar())).thenReturn(testFolder);
+
+    Set<Device> devices = discoverer.getDevicesFromPath(testFolder);
+
+    assertEquals("Number of devices", 3, devices.size());
+    List<Device> devicesList = Lists.newArrayList(devices);
+    devicesList.sort(DEVICE_COMPARATOR);
+
+    Device device0 = devicesList.get(0);
+    assertEquals("Device ID", 0, device0.getId());
+    assertEquals("Major number", 8, device0.getMajorNumber());
+    assertEquals("Minor number", 1, device0.getMinorNumber());
+    assertEquals("Status", "ONLINE", device0.getStatus());
+    assertTrue("Device is not healthy", device0.isHealthy());
+
+    Device device1 = devicesList.get(1);
+    assertEquals("Device ID", 1, device1.getId());
+    assertEquals("Major number", 9, device1.getMajorNumber());
+    assertEquals("Minor number", 1, device1.getMinorNumber());
+    assertEquals("Status", "ONLINE", device1.getStatus());
+    assertTrue("Device is not healthy", device1.isHealthy());
+
+    Device device2 = devicesList.get(2);
+    assertEquals("Device ID", 2, device2.getId());
+    assertEquals("Major number", 10, device2.getMajorNumber());
+    assertEquals("Minor number", 1, device2.getMinorNumber());
+    assertEquals("Status", "ONLINE", device2.getStatus());
+    assertTrue("Device is not healthy", device2.isHealthy());
+  }
+
+  @Test // os_state byte -1 -> status "Unknown (-1)", device unhealthy
+  public void testNegativeDeviceStateNumber() throws IOException {
+    createVeSlotFile(0);
+    createOsStateFile(-1);
+    when(mockCommandExecutor.getOutput())
+      .thenReturn("8:1:character special file");
+    when(udevUtil.getSysPath(anyInt(), anyChar())).thenReturn(testFolder);
+
+    Set<Device> devices = discoverer.getDevicesFromPath(testFolder);
+
+    assertEquals("Number of devices", 1, devices.size());
+    Device device = devices.iterator().next();
+    assertEquals("Device ID", 0, device.getId());
+    assertEquals("Major number", 8, device.getMajorNumber());
+    assertEquals("Minor number", 1, device.getMinorNumber());
+    assertEquals("Status", "Unknown (-1)", device.getStatus());
+    assertFalse("Device should not be healthy", device.isHealthy());
+  }
+
+  @Test // os_state byte 5 -> status "Unknown (5)", device unhealthy
+  public void testDeviceStateNumberTooHigh() throws IOException {
+    createVeSlotFile(0);
+    createOsStateFile(5);
+    when(mockCommandExecutor.getOutput())
+      .thenReturn("8:1:character special file");
+    when(udevUtil.getSysPath(anyInt(), anyChar())).thenReturn(testFolder);
+
+    Set<Device> devices = discoverer.getDevicesFromPath(testFolder);
+
+    assertEquals("Number of devices", 1, devices.size());
+    Device device = devices.iterator().next();
+    assertEquals("Device ID", 0, device.getId());
+    assertEquals("Major number", 8, device.getMajorNumber());
+    assertEquals("Minor number", 1, device.getMinorNumber());
+    assertEquals("Status", "Unknown (5)", device.getStatus());
+    assertFalse("Device should not be healthy", device.isHealthy());
+  }
+
+  @Test // hex major/minor values from stat are reported as decimal numbers
+  public void testDeviceNumberFromMajorAndMinor() throws IOException {
+    createVeSlotFile(0);
+    createVeSlotFile(1);
+    createVeSlotFile(2);
+    createOsStateFile(0);
+    when(mockCommandExecutor.getOutput()).thenReturn(
+        "10:1:character special file",
+        "1d:2:character special file",
+        "4:3c:character special file");
+    when(udevUtil.getSysPath(anyInt(), anyChar())).thenReturn(testFolder);
+
+    Set<Device> devices = discoverer.getDevicesFromPath(testFolder);
+
+    List<Device> devicesList = Lists.newArrayList(devices);
+    devicesList.sort(DEVICE_COMPARATOR);
+
+    Device device0 = devicesList.get(0);
+    assertEquals("Major number", 16, device0.getMajorNumber());
+    assertEquals("Minor number", 1, device0.getMinorNumber());
+
+    Device device1 = devicesList.get(1);
+    assertEquals("Major number", 29, device1.getMajorNumber());
+    assertEquals("Minor number", 2, device1.getMinorNumber());
+
+    Device device2 = devicesList.get(2);
+    assertEquals("Major number", 4, device2.getMajorNumber());
+    assertEquals("Minor number", 60, device2.getMinorNumber());
+  }
+
+  @Test // only the file whose name starts with "veslot" becomes a device
+  public void testNonVESlotFilesAreSkipped() throws IOException {
+    createVeSlotFile(0);
+    createOsStateFile(0);
+    createFile("abcde");
+    createFile("vexlot");
+    createFile("xyzveslot");
+
+    when(mockCommandExecutor.getOutput()).thenReturn(
+        "8:1:character special file",
+        "9:1:character special file",
+        "10:1:character special file",
+        "11:1:character special file",
+        "12:1:character special file");
+    when(udevUtil.getSysPath(anyInt(), anyChar())).thenReturn(testFolder);
+
+    Set<Device> devices = discoverer.getDevicesFromPath(testFolder);
+
+    assertEquals("Number of devices", 1, devices.size());
+    Device device = devices.iterator().next();
+    assertEquals("Device ID", 0, device.getId());
+    assertEquals("Major number", 8, device.getMajorNumber());
+    assertEquals("Minor number", 1, device.getMinorNumber());
+    assertEquals("Status", "ONLINE", device.getStatus());
+    assertTrue("Device is not healthy", device.isHealthy());
+  }
+
+  @Test // stat reporting "regular file" must raise IllegalArgumentException
+  public void testNonBlockOrCharFilesAreRejected() throws IOException {
+    expected.expect(IllegalArgumentException.class);
+    expected.expectMessage("File is neither a char nor block device");
+    createVeSlotFile(0);
+    when(mockCommandExecutor.getOutput()).thenReturn(
+        "0:0:regular file");
+
+    discoverer.getDevicesFromPath(testFolder);
+  }
+
+  private void setupTestDirectory() throws IOException {
+    String path = "target/temp/" +
+        TestVEDeviceDiscoverer.class.getName();
+
+    testFolder = new File(path).getAbsolutePath();
+    File f = new File(testFolder);
+    FileUtils.deleteDirectory(f); // start from an empty directory
+
+    if (!f.mkdirs()) {
+      throw new RuntimeException("Could not create directory: " +
+          f.getAbsolutePath());
+    }
+  }
+
+  private void createVeSlotFile(int slot) throws IOException {
+    Files.createFile(Paths.get(testFolder, "veslot" + String.valueOf(slot)));
+  }
+
+  private void createFile(String name) throws IOException {
+    Files.createFile(Paths.get(testFolder, name));
+  }
+
+  private void createOsStateFile(int state) throws IOException {
+    Path path = Paths.get(testFolder, "os_state");
+    Files.createFile(path);
+
+    Files.write(path, new byte[]{(byte) state}); // single raw state byte
+  }
+}