Browse Source

YARN-7487. Ensure volume to include GPU base libraries after created by plugin. Contributed by Wangda Tan.

Sunil G 7 years ago
parent
commit
556aea3f36

+ 61 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/DockerLinuxContainerRuntime.java

@@ -337,7 +337,7 @@ public class DockerLinuxContainerRuntime implements LinuxContainerRuntime {
     return false;
   }
 
-  private void runDockerVolumeCommand(DockerVolumeCommand dockerVolumeCommand,
+  private String runDockerVolumeCommand(DockerVolumeCommand dockerVolumeCommand,
       Container container) throws ContainerExecutionException {
     try {
       String commandFile = dockerClient.writeCommandToTempFile(
@@ -351,6 +351,7 @@ public class DockerLinuxContainerRuntime implements LinuxContainerRuntime {
       LOG.info("ContainerId=" + container.getContainerId()
           + ", docker volume output for " + dockerVolumeCommand + ": "
           + output);
+      return output;
     } catch (ContainerExecutionException e) {
       LOG.error("Error when writing command to temp file, command="
               + dockerVolumeCommand,
@@ -378,15 +379,73 @@ public class DockerLinuxContainerRuntime implements LinuxContainerRuntime {
             plugin.getDockerCommandPluginInstance();
         if (dockerCommandPlugin != null) {
           DockerVolumeCommand dockerVolumeCommand =
-              dockerCommandPlugin.getCreateDockerVolumeCommand(ctx.getContainer());
+              dockerCommandPlugin.getCreateDockerVolumeCommand(
+                  ctx.getContainer());
           if (dockerVolumeCommand != null) {
             runDockerVolumeCommand(dockerVolumeCommand, container);
+
+            // After volume created, run inspect to make sure volume properly
+            // created.
+            if (dockerVolumeCommand.getSubCommand().equals(
+                DockerVolumeCommand.VOLUME_CREATE_SUB_COMMAND)) {
+              checkDockerVolumeCreated(dockerVolumeCommand, container);
+            }
           }
         }
       }
     }
   }
 
+  private void checkDockerVolumeCreated(
+      DockerVolumeCommand dockerVolumeCreationCommand, Container container)
+      throws ContainerExecutionException {
+    DockerVolumeCommand dockerVolumeInspectCommand = new DockerVolumeCommand(
+        DockerVolumeCommand.VOLUME_LS_SUB_COMMAND);
+    dockerVolumeInspectCommand.setFormat("{{.Name}},{{.Driver}}");
+    String output = runDockerVolumeCommand(dockerVolumeInspectCommand,
+        container);
+
+    // Parse output line by line and check if it matches
+    String volumeName = dockerVolumeCreationCommand.getVolumeName();
+    String driverName = dockerVolumeCreationCommand.getDriverName();
+    if (driverName == null) {
+      driverName = "local";
+    }
+
+    for (String line : output.split("\n")) {
+      line = line.trim();
+      String[] arr = line.split(",");
+      String v = arr[0].trim();
+      String d = null;
+      if (arr.length > 1) {
+        d = arr[1].trim();
+      }
+      if (d != null && volumeName.equals(v) && driverName.equals(d)) {
+        // Good we found it.
+        LOG.info(
+            "Docker volume-name=" + volumeName + " driver-name=" + driverName
+                + " already exists for container=" + container
+                .getContainerId() + ", continue...");
+        return;
+      }
+    }
+
+    // Couldn't find the volume
+    String message =
+        " Couldn't find volume=" + volumeName + " driver=" + driverName
+            + " for container=" + container.getContainerId()
+            + ", please check error message in log to understand "
+            + "why this happens.";
+    LOG.error(message);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("All docker volumes in the system, command="
+          + dockerVolumeInspectCommand.toString());
+    }
+
+    throw new ContainerExecutionException(message);
+  }
+
   private void validateContainerNetworkType(String network)
       throws ContainerExecutionException {
     if (allowedNetworks.contains(network)) {

+ 28 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/docker/DockerVolumeCommand.java

@@ -27,23 +27,50 @@ import java.util.regex.Pattern;
  */
 public class DockerVolumeCommand extends DockerCommand {
   public static final String VOLUME_COMMAND = "volume";
-  public static final String VOLUME_CREATE_COMMAND = "create";
+  public static final String VOLUME_CREATE_SUB_COMMAND = "create";
+  public static final String VOLUME_LS_SUB_COMMAND = "ls";
+
   // Regex pattern for volume name
   public static final Pattern VOLUME_NAME_PATTERN = Pattern.compile(
       "[a-zA-Z0-9][a-zA-Z0-9_.-]*");
 
+  private String volumeName;
+  private String driverName;
+  private String subCommand;
+
   public DockerVolumeCommand(String subCommand) {
     super(VOLUME_COMMAND);
+    this.subCommand = subCommand;
     super.addCommandArguments("sub-command", subCommand);
   }
 
   public DockerVolumeCommand setVolumeName(String volumeName) {
     super.addCommandArguments("volume", volumeName);
+    this.volumeName = volumeName;
     return this;
   }
 
   public DockerVolumeCommand setDriverName(String driverName) {
     super.addCommandArguments("driver", driverName);
+    this.driverName = driverName;
+    return this;
+  }
+
+  public String getVolumeName() {
+    return volumeName;
+  }
+
+  public String getDriverName() {
+    return driverName;
+  }
+
+  public String getSubCommand() {
+    return subCommand;
+  }
+
+  public DockerVolumeCommand setFormat(String format) {
+    super.addCommandArguments("format", format);
     return this;
   }
+
 }

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/gpu/NvidiaDockerV1CommandPlugin.java

@@ -301,7 +301,7 @@ public class NvidiaDockerV1CommandPlugin implements DockerCommandPlugin {
 
     if (newVolumeName != null) {
       DockerVolumeCommand command = new DockerVolumeCommand(
-          DockerVolumeCommand.VOLUME_CREATE_COMMAND);
+          DockerVolumeCommand.VOLUME_CREATE_SUB_COMMAND);
       command.setDriverName(volumeDriver);
       command.setVolumeName(newVolumeName);
       return command;

+ 65 - 41
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/utils/docker-util.c

@@ -299,29 +299,19 @@ static int value_permitted(const struct configuration* executor_cfg,
 int get_docker_volume_command(const char *command_file, const struct configuration *conf, char *out,
                                const size_t outlen) {
   int ret = 0;
-  char *driver = NULL, *volume_name = NULL, *sub_command = NULL;
+  char *driver = NULL, *volume_name = NULL, *sub_command = NULL, *format = NULL;
   struct configuration command_config = {0, NULL};
   ret = read_and_verify_command_file(command_file, DOCKER_VOLUME_COMMAND, &command_config);
   if (ret != 0) {
     return ret;
   }
   sub_command = get_configuration_value("sub-command", DOCKER_COMMAND_FILE_SECTION, &command_config);
-  if (sub_command == NULL || 0 != strcmp(sub_command, "create")) {
-    fprintf(ERRORFILE, "\"create\" is the only acceptable sub-command of volume.\n");
-    ret = INVALID_DOCKER_VOLUME_COMMAND;
-    goto cleanup;
-  }
-
-  volume_name = get_configuration_value("volume", DOCKER_COMMAND_FILE_SECTION, &command_config);
-  if (volume_name == NULL || validate_volume_name(volume_name) != 0) {
-    fprintf(ERRORFILE, "%s is not a valid volume name.\n", volume_name);
-    ret = INVALID_DOCKER_VOLUME_NAME;
-    goto cleanup;
-  }
 
-  driver = get_configuration_value("driver", DOCKER_COMMAND_FILE_SECTION, &command_config);
-  if (driver == NULL) {
-    ret = INVALID_DOCKER_VOLUME_DRIVER;
+  if ((sub_command == NULL) || ((0 != strcmp(sub_command, "create")) &&
+      (0 != strcmp(sub_command, "ls")))) {
+    fprintf(ERRORFILE, "\"create/ls\" are the only acceptable sub-command of volume, input sub_command=\"%s\"\n",
+       sub_command);
+    ret = INVALID_DOCKER_VOLUME_COMMAND;
     goto cleanup;
   }
 
@@ -338,42 +328,76 @@ int get_docker_volume_command(const char *command_file, const struct configurati
     goto cleanup;
   }
 
-  ret = add_to_buffer(out, outlen, " create");
-  if (ret != 0) {
-    goto cleanup;
-  }
+  if (0 == strcmp(sub_command, "create")) {
+    volume_name = get_configuration_value("volume", DOCKER_COMMAND_FILE_SECTION, &command_config);
+    if (volume_name == NULL || validate_volume_name(volume_name) != 0) {
+      fprintf(ERRORFILE, "%s is not a valid volume name.\n", volume_name);
+      ret = INVALID_DOCKER_VOLUME_NAME;
+      goto cleanup;
+    }
 
-  ret = add_to_buffer(out, outlen, " --name=");
-  if (ret != 0) {
-    goto cleanup;
-  }
+    driver = get_configuration_value("driver", DOCKER_COMMAND_FILE_SECTION, &command_config);
+    if (driver == NULL) {
+      ret = INVALID_DOCKER_VOLUME_DRIVER;
+      goto cleanup;
+    }
 
-  ret = add_to_buffer(out, outlen, volume_name);
-  if (ret != 0) {
-    goto cleanup;
-  }
+    ret = add_to_buffer(out, outlen, " create");
+    if (ret != 0) {
+      goto cleanup;
+    }
 
-  if (!value_permitted(conf, "docker.allowed.volume-drivers", driver)) {
-    fprintf(ERRORFILE, "%s is not permitted docker.allowed.volume-drivers\n",
-      driver);
-    ret = INVALID_DOCKER_VOLUME_DRIVER;
-    goto cleanup;
-  }
+    ret = add_to_buffer(out, outlen, " --name=");
+    if (ret != 0) {
+      goto cleanup;
+    }
 
-  ret = add_to_buffer(out, outlen, " --driver=");
-  if (ret != 0) {
-    goto cleanup;
-  }
+    ret = add_to_buffer(out, outlen, volume_name);
+    if (ret != 0) {
+      goto cleanup;
+    }
 
-  ret = add_to_buffer(out, outlen, driver);
-  if (ret != 0) {
-    goto cleanup;
+    if (!value_permitted(conf, "docker.allowed.volume-drivers", driver)) {
+      fprintf(ERRORFILE, "%s is not permitted docker.allowed.volume-drivers\n",
+        driver);
+      ret = INVALID_DOCKER_VOLUME_DRIVER;
+      goto cleanup;
+    }
+
+    ret = add_to_buffer(out, outlen, " --driver=");
+    if (ret != 0) {
+      goto cleanup;
+    }
+
+    ret = add_to_buffer(out, outlen, driver);
+    if (ret != 0) {
+      goto cleanup;
+    }
+  } else if (0 == strcmp(sub_command, "ls")) {
+    format = get_configuration_value("format", DOCKER_COMMAND_FILE_SECTION, &command_config);
+
+    ret = add_to_buffer(out, outlen, " ls");
+    if (ret != 0) {
+      goto cleanup;
+    }
+
+    if (format) {
+      ret = add_to_buffer(out, outlen, " --format=");
+      if (ret != 0) {
+        goto cleanup;
+      }
+      ret = add_to_buffer(out, outlen, format);
+      if (ret != 0) {
+        goto cleanup;
+      }
+    }
   }
 
 cleanup:
   free(driver);
   free(volume_name);
   free(sub_command);
+  free(format);
 
   // clean up out buffer
   if (ret != 0) {

+ 4 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/test/utils/test_docker_util.cc

@@ -1132,12 +1132,15 @@ namespace ContainerExecutor {
     file_cmd_vec.push_back(std::make_pair<std::string, std::string>(
         "[docker-command-execution]\n  docker-command=volume\n  sub-command=create\n  volume=volume1 \n driver=driver1",
         "volume create --name=volume1 --driver=driver1"));
+    file_cmd_vec.push_back(std::make_pair<std::string, std::string>(
+       "[docker-command-execution]\n  docker-command=volume\n  format={{.Name}},{{.Driver}}\n  sub-command=ls",
+       "volume ls --format={{.Name}},{{.Driver}}"));
 
     std::vector<std::pair<std::string, int> > bad_file_cmd_vec;
 
     // Wrong subcommand
     bad_file_cmd_vec.push_back(std::make_pair<std::string, int>(
-        "[docker-command-execution]\n  docker-command=volume\n  sub-command=ls\n  volume=volume1 \n driver=driver1",
+        "[docker-command-execution]\n  docker-command=volume\n  sub-command=inspect\n  volume=volume1 \n driver=driver1",
         static_cast<int>(INVALID_DOCKER_VOLUME_COMMAND)));
 
     // Volume not specified

+ 145 - 25
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/TestDockerContainerRuntime.java

@@ -1301,7 +1301,7 @@ public class TestDockerContainerRuntime {
     //single invocation expected
     //due to type erasure + mocking, this verification requires a suppress
     // warning annotation on the entire method
-    verify(mockExecutor, times(1))
+    verify(mockExecutor, times(2))
         .executePrivilegedOperation(anyList(), opCaptor.capture(), any(
             File.class), anyMap(), anyBoolean(), anyBoolean());
 
@@ -1309,7 +1309,9 @@ public class TestDockerContainerRuntime {
     // hence, reset mock here
     Mockito.reset(mockExecutor);
 
-    PrivilegedOperation op = opCaptor.getValue();
+    List<PrivilegedOperation> allCaptures = opCaptor.getAllValues();
+
+    PrivilegedOperation op = allCaptures.get(0);
     Assert.assertEquals(PrivilegedOperation.OperationType
         .RUN_DOCKER_CMD, op.getOperationType());
 
@@ -1317,14 +1319,151 @@ public class TestDockerContainerRuntime {
     FileInputStream fileInputStream = new FileInputStream(commandFile);
     String fileContent = new String(IOUtils.toByteArray(fileInputStream));
     Assert.assertEquals("[docker-command-execution]\n"
-        + "  docker-command=volume\n" + "  sub-command=create\n"
-        + "  volume=volume1\n", fileContent);
+        + "  docker-command=volume\n" + "  driver=local\n"
+        + "  sub-command=create\n" + "  volume=volume1\n", fileContent);
+    fileInputStream.close();
+
+    op = allCaptures.get(1);
+    Assert.assertEquals(PrivilegedOperation.OperationType
+        .RUN_DOCKER_CMD, op.getOperationType());
+
+    commandFile = new File(StringUtils.join(",", op.getArguments()));
+    fileInputStream = new FileInputStream(commandFile);
+    fileContent = new String(IOUtils.toByteArray(fileInputStream));
+    Assert.assertEquals("[docker-command-execution]\n"
+        + "  docker-command=volume\n" + "  format={{.Name}},{{.Driver}}\n"
+        + "  sub-command=ls\n", fileContent);
+    fileInputStream.close();
+  }
+
+  private static class MockDockerCommandPlugin implements DockerCommandPlugin {
+    private final String volume;
+    private final String driver;
+
+    public MockDockerCommandPlugin(String volume, String driver) {
+      this.volume = volume;
+      this.driver = driver;
+    }
+
+    @Override
+    public void updateDockerRunCommand(DockerRunCommand dockerRunCommand,
+        Container container) throws ContainerExecutionException {
+      dockerRunCommand.setVolumeDriver("driver-1");
+      dockerRunCommand.addReadOnlyMountLocation("/source/path",
+          "/destination/path", true);
+    }
+
+    @Override
+    public DockerVolumeCommand getCreateDockerVolumeCommand(Container container)
+        throws ContainerExecutionException {
+      return new DockerVolumeCommand("create").setVolumeName(volume)
+          .setDriverName(driver);
+    }
+
+    @Override
+    public DockerVolumeCommand getCleanupDockerVolumesCommand(
+        Container container) throws ContainerExecutionException {
+      return null;
+    }
+  }
+
+  private void testDockerCommandPluginWithVolumesOutput(
+      String dockerVolumeListOutput, boolean expectFail)
+      throws PrivilegedOperationException, ContainerExecutionException,
+      IOException {
+    mockExecutor = Mockito
+        .mock(PrivilegedOperationExecutor.class);
+
+    DockerLinuxContainerRuntime runtime = new DockerLinuxContainerRuntime(
+        mockExecutor, mockCGroupsHandler);
+    when(mockExecutor
+        .executePrivilegedOperation(anyList(), any(PrivilegedOperation.class),
+            any(File.class), anyMap(), anyBoolean(), anyBoolean())).thenReturn(
+        null);
+    when(mockExecutor
+        .executePrivilegedOperation(anyList(), any(PrivilegedOperation.class),
+            any(File.class), anyMap(), anyBoolean(), anyBoolean())).thenReturn(
+        dockerVolumeListOutput);
+
+    Context nmContext = mock(Context.class);
+    ResourcePluginManager rpm = mock(ResourcePluginManager.class);
+    Map<String, ResourcePlugin> pluginsMap = new HashMap<>();
+    ResourcePlugin plugin1 = mock(ResourcePlugin.class);
+
+    // Create the docker command plugin logic, which will set volume driver
+    DockerCommandPlugin dockerCommandPlugin = new MockDockerCommandPlugin(
+        "volume1", "local");
+
+    when(plugin1.getDockerCommandPluginInstance()).thenReturn(
+        dockerCommandPlugin);
+    ResourcePlugin plugin2 = mock(ResourcePlugin.class);
+    pluginsMap.put("plugin1", plugin1);
+    pluginsMap.put("plugin2", plugin2);
+
+    when(rpm.getNameToPlugins()).thenReturn(pluginsMap);
+
+    when(nmContext.getResourcePluginManager()).thenReturn(rpm);
+
+    runtime.initialize(conf, nmContext);
+
+    ContainerRuntimeContext containerRuntimeContext = builder.build();
+
+    try {
+      runtime.prepareContainer(containerRuntimeContext);
+
+      checkVolumeCreateCommand();
+
+      runtime.launchContainer(containerRuntimeContext);
+    } catch (ContainerExecutionException e) {
+      if (expectFail) {
+        // Expected
+        return;
+      } else{
+        Assert.fail("Should successfully prepareContainers" + e);
+      }
+    }
+    if (expectFail) {
+      Assert.fail(
+          "Should fail because output is illegal");
+    }
+  }
+
+  @Test
+  public void testDockerCommandPluginCheckVolumeAfterCreation()
+      throws Exception {
+    // For following tests, we expect to have volume1,local in output
+
+    // Failure cases
+    testDockerCommandPluginWithVolumesOutput("", true);
+    testDockerCommandPluginWithVolumesOutput("volume1", true);
+    testDockerCommandPluginWithVolumesOutput("local", true);
+    testDockerCommandPluginWithVolumesOutput("volume2,local", true);
+    testDockerCommandPluginWithVolumesOutput("volum1,something", true);
+    testDockerCommandPluginWithVolumesOutput("volum1,something\nvolum2,local",
+        true);
+
+    // Success case
+    testDockerCommandPluginWithVolumesOutput("volume1,local\n", false);
+    testDockerCommandPluginWithVolumesOutput(
+        "volume_xyz,nvidia\nvolume1,local\n\n", false);
+    testDockerCommandPluginWithVolumesOutput(" volume1,  local \n", false);
+    testDockerCommandPluginWithVolumesOutput(
+        "volume_xyz,\tnvidia\n   volume1,\tlocal\n\n", false);
   }
 
+
   @Test
   public void testDockerCommandPlugin() throws Exception {
     DockerLinuxContainerRuntime runtime =
         new DockerLinuxContainerRuntime(mockExecutor, mockCGroupsHandler);
+    when(mockExecutor
+        .executePrivilegedOperation(anyList(), any(PrivilegedOperation.class),
+            any(File.class), anyMap(), anyBoolean(), anyBoolean())).thenReturn(
+        null);
+    when(mockExecutor
+        .executePrivilegedOperation(anyList(), any(PrivilegedOperation.class),
+            any(File.class), anyMap(), anyBoolean(), anyBoolean())).thenReturn(
+        "volume1,local");
 
     Context nmContext = mock(Context.class);
     ResourcePluginManager rpm = mock(ResourcePluginManager.class);
@@ -1332,27 +1471,8 @@ public class TestDockerContainerRuntime {
     ResourcePlugin plugin1 = mock(ResourcePlugin.class);
 
     // Create the docker command plugin logic, which will set volume driver
-    DockerCommandPlugin dockerCommandPlugin = new DockerCommandPlugin() {
-      @Override
-      public void updateDockerRunCommand(DockerRunCommand dockerRunCommand,
-          Container container) throws ContainerExecutionException {
-        dockerRunCommand.setVolumeDriver("driver-1");
-        dockerRunCommand.addReadOnlyMountLocation("/source/path",
-            "/destination/path", true);
-      }
-
-      @Override
-      public DockerVolumeCommand getCreateDockerVolumeCommand(Container container)
-          throws ContainerExecutionException {
-        return new DockerVolumeCommand("create").setVolumeName("volume1");
-      }
-
-      @Override
-      public DockerVolumeCommand getCleanupDockerVolumesCommand(Container container)
-          throws ContainerExecutionException {
-        return null;
-      }
-    };
+    DockerCommandPlugin dockerCommandPlugin = new MockDockerCommandPlugin(
+        "volume1", "local");
 
     when(plugin1.getDockerCommandPluginInstance()).thenReturn(
         dockerCommandPlugin);