Browse Source

YARN-9008. Extend YARN distributed shell with file localization feature. (Contributed by Peter Bacsko)

Haibo Chen 6 years ago
parent
commit
fb55e5201e

+ 58 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java

@@ -24,6 +24,7 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.StringReader;
+import java.io.UncheckedIOException;
 import java.lang.reflect.UndeclaredThrowableException;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -57,6 +58,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -77,6 +79,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -232,6 +235,9 @@ public class ApplicationMaster {
   @VisibleForTesting
   protected ApplicationAttemptId appAttemptID;
 
+  private ApplicationId appId;
+  private String appName;
+
   // TODO
   // For status update for clients - yet to be implemented
   // Hostname of the container
@@ -316,6 +322,8 @@ public class ApplicationMaster {
   private int containrRetryInterval = 0;
   private long containerFailuresValidityInterval = -1;
 
+  private List<String> localizableFiles = new ArrayList<>();
+
   // Timeline domain ID
   private String domainId = null;
 
@@ -447,6 +455,8 @@ public class ApplicationMaster {
    */
   public boolean init(String[] args) throws ParseException, IOException {
     Options opts = new Options();
+    opts.addOption("appname", true,
+        "Application Name. Default value - DistributedShell");
     opts.addOption("app_attempt_id", true,
         "App Attempt ID. Not to be used unless for testing purposes");
     opts.addOption("shell_env", true,
@@ -493,6 +503,7 @@ public class ApplicationMaster {
             + " application attempt fails and these containers will be "
             + "retrieved by"
             + " the new application attempt ");
+    opts.addOption("localized_files", true, "List of localized files");
 
     opts.addOption("help", false, "Print usage");
     CommandLine cliParser = new GnuParser().parse(opts, args);
@@ -513,6 +524,8 @@ public class ApplicationMaster {
       }
     }
 
+    appName = cliParser.getOptionValue("appname", "DistributedShell");
+
     if (cliParser.hasOption("help")) {
       printUsage(opts);
       return false;
@@ -553,6 +566,7 @@ public class ApplicationMaster {
       ContainerId containerId = ContainerId.fromString(envs
           .get(Environment.CONTAINER_ID.name()));
       appAttemptID = containerId.getApplicationAttemptId();
+      appId = appAttemptID.getApplicationId();
     }
 
     if (!envs.containsKey(ApplicationConstants.APP_SUBMIT_TIME_ENV)) {
@@ -698,6 +712,16 @@ public class ApplicationMaster {
       LOG.warn("Timeline service is not enabled");
     }
 
+    if (cliParser.hasOption("localized_files")) {
+      String localizedFilesArg = cliParser.getOptionValue("localized_files");
+      if (localizedFilesArg.contains(",")) {
+        String[] files = localizedFilesArg.split(",");
+        localizableFiles = Arrays.asList(files);
+      } else {
+        localizableFiles.add(localizedFilesArg);
+      }
+    }
+
     return true;
   }
 
@@ -1002,6 +1026,11 @@ public class ApplicationMaster {
     return success;
   }
 
+  /**
+   * Builds the application-specific relative path
+   * {@code <appName>/<appId>/<fileDstPath>}. Callers resolve the result
+   * against the file system's home directory.
+   *
+   * @param appName name of the application
+   * @param appId application id in its string form
+   * @param fileDstPath file name or path below the application directory
+   * @return the three components joined with "/" separators
+   */
+  public static String getRelativePath(String appName,
+      String appId, String fileDstPath) {
+    return String.join("/", appName, appId, fileDstPath);
+  }
+
   @VisibleForTesting
   class RMCallbackHandler extends AMRMClientAsync.AbstractCallbackHandler {
     @SuppressWarnings("unchecked")
@@ -1422,6 +1451,35 @@ public class ApplicationMaster {
         shellCommand = Shell.WINDOWS ? windows_command : linux_bash_command;
       }
 
+      // Set up localization for the container which runs the command
+      if (localizableFiles.size() > 0) {
+        FileSystem fs;
+        try {
+          fs = FileSystem.get(conf);
+        } catch (IOException e) {
+          throw new UncheckedIOException("Cannot get FileSystem", e);
+        }
+
+        localizableFiles.stream().forEach(fileName -> {
+          try {
+            String relativePath =
+                getRelativePath(appName, appId.toString(), fileName);
+            Path dst =
+                new Path(fs.getHomeDirectory(), relativePath);
+            FileStatus fileStatus = fs.getFileStatus(dst);
+            LocalResource localRes = LocalResource.newInstance(
+                URL.fromURI(dst.toUri()),
+                LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
+                fileStatus.getLen(), fileStatus.getModificationTime());
+            LOG.info("Setting up file for localization: " + dst);
+            localResources.put(fileName, localRes);
+          } catch (IOException e) {
+            throw new UncheckedIOException(
+                "Error during localization setup", e);
+          }
+        });
+      }
+
       // Set the necessary command to execute on the allocated container
       Vector<CharSequence> vargs = new Vector<CharSequence>(5);
 

+ 73 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java

@@ -18,7 +18,9 @@
 
 package org.apache.hadoop.yarn.applications.distributedshell;
 
+import java.io.File;
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
@@ -32,6 +34,7 @@ import java.util.Arrays;
 import java.util.Base64;
 
 import com.google.common.base.Joiner;
+
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.HelpFormatter;
@@ -235,6 +238,8 @@ public class Client {
   // Application tags
   private Set<String> applicationTags = new HashSet<>();
 
+  private List<String> filesToLocalize = new ArrayList<>();
+
   // Command line options
   private Options opts;
 
@@ -392,6 +397,8 @@ public class Client {
             + " The \"num_containers\" option will be ignored. All requested"
             + " containers will be of type GUARANTEED" );
     opts.addOption("application_tags", true, "Application tags.");
+    opts.addOption("localize_files", true, "List of files, separated by comma"
+        + " to be localized for the command");
   }
 
   /**
@@ -621,6 +628,17 @@ public class Client {
         this.applicationTags.add(appTag.trim());
       }
     }
+
+    if (cliParser.hasOption("localize_files")) {
+      String filesStr = cliParser.getOptionValue("localize_files");
+      if (filesStr.contains(",")) {
+        String[] files = filesStr.split(",");
+        filesToLocalize = Arrays.asList(files);
+      } else {
+        filesToLocalize.add(filesStr);
+      }
+    }
+
     return true;
   }
 
@@ -714,7 +732,7 @@ public class Client {
           + ", specified=" + amMemory
           + ", max=" + maxMem);
       amMemory = maxMem;
-    }				
+    }
 
     int maxVCores = appResponse.getMaximumResourceCapability().getVirtualCores();
     LOG.info("Max virtual cores capability of resources in this cluster " + maxVCores);
@@ -776,7 +794,42 @@ public class Client {
     if (!log4jPropFile.isEmpty()) {
       addToLocalResources(fs, log4jPropFile, log4jPath, appId.toString(),
           localResources, null);
-    }			
+    }
+
+    // Process local files for localization
+    // Here we just upload the files, the AM
+    // will set up localization later.
+    StringBuilder localizableFiles = new StringBuilder();
+    filesToLocalize.stream().forEach(path -> {
+      File f = new File(path);
+
+      if (!f.exists()) {
+        throw new UncheckedIOException(
+            new IOException(path + " does not exist"));
+      }
+
+      if (!f.canRead()) {
+        throw new UncheckedIOException(
+            new IOException(path + " cannot be read"));
+      }
+
+      if (f.isDirectory()) {
+        throw new UncheckedIOException(
+          new IOException(path + " is a directory"));
+      }
+
+      try {
+        String fileName = f.getName();
+        uploadFile(fs, path, fileName, appId.toString());
+        if (localizableFiles.length() == 0) {
+          localizableFiles.append(fileName);
+        } else {
+          localizableFiles.append(",").append(fileName);
+        }
+      } catch (IOException e) {
+        throw new UncheckedIOException("Cannot upload file: " + path, e);
+      }
+    });
 
     // The shell script has to be made available on the final container(s)
     // where it will be executed. 
@@ -790,7 +843,9 @@ public class Client {
     if (!shellScriptPath.isEmpty()) {
       Path shellSrc = new Path(shellScriptPath);
       String shellPathSuffix =
-          appName + "/" + appId.toString() + "/" + SCRIPT_PATH;
+          ApplicationMaster.getRelativePath(appName,
+              appId.toString(),
+              SCRIPT_PATH);
       Path shellDst =
           new Path(fs.getHomeDirectory(), shellPathSuffix);
       fs.copyFromLocalFile(false, true, shellSrc, shellDst);
@@ -908,6 +963,10 @@ public class Client {
     if (debugFlag) {
       vargs.add("--debug");
     }
+    if (localizableFiles.length() > 0) {
+      vargs.add("--localized_files " + localizableFiles.toString());
+    }
+    vargs.add("--appname " + appName);
 
     vargs.addAll(containerRetryOptions);
 
@@ -1088,7 +1147,7 @@ public class Client {
       String fileDstPath, String appId, Map<String, LocalResource> localResources,
       String resources) throws IOException {
     String suffix =
-        appName + "/" + appId + "/" + fileDstPath;
+        ApplicationMaster.getRelativePath(appName, appId, fileDstPath);
     Path dst =
         new Path(fs.getHomeDirectory(), suffix);
     if (fileSrcPath == null) {
@@ -1112,6 +1171,16 @@ public class Client {
     localResources.put(fileDstPath, scRsrc);
   }
 
+  /**
+   * Copies a local file into this application's directory below the
+   * home directory of the given file system. The file is only uploaded;
+   * no LocalResource entry is created here.
+   *
+   * @param fs file system that receives the file
+   * @param fileSrcPath path of the local source file
+   * @param fileDstPath name of the file below the application directory
+   * @param appId application id in its string form
+   * @throws IOException if the copy fails
+   */
+  private void uploadFile(FileSystem fs, String fileSrcPath,
+      String fileDstPath, String appId) throws IOException {
+    Path target = new Path(fs.getHomeDirectory(),
+        ApplicationMaster.getRelativePath(appName, appId, fileDstPath));
+    LOG.info("Uploading file: " + fileSrcPath + " to " + target);
+    Path source = new Path(fileSrcPath);
+    fs.copyFromLocalFile(source, target);
+  }
+
   private void prepareTimelineDomain() {
     TimelineClient timelineClient = null;
     if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,

+ 65 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java

@@ -17,7 +17,7 @@
  */
 
 package org.apache.hadoop.yarn.applications.distributedshell;
-
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
@@ -30,6 +30,7 @@ import java.io.FileReader;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.PrintWriter;
+import java.io.UncheckedIOException;
 import java.net.InetAddress;
 import java.net.URI;
 import java.net.URL;
@@ -1630,4 +1631,67 @@ public class TestDistributedShell {
     client.init(args);
     client.run();
   }
+
+  /**
+   * Runs the distributed shell with one file passed via
+   * {@code --localize_files} and prints it by name from the container;
+   * the client run must report success.
+   */
+  @Test
+  public void testDistributedShellWithSingleFileLocalization()
+      throws Exception {
+    Configuration clientConf = new Configuration(yarnCluster.getConfig());
+    Client dsClient = new Client(clientConf);
+
+    dsClient.init(new String[] {
+        "--jar", APPMASTER_JAR,
+        "--num_containers", "1",
+        "--shell_command", Shell.WINDOWS ? "type" : "cat",
+        "--localize_files", "./src/test/resources/a.txt",
+        "--shell_args", "a.txt"
+    });
+
+    boolean succeeded = dsClient.run();
+    assertTrue("Client exited with an error", succeeded);
+  }
+
+  /**
+   * Runs the distributed shell with two comma-separated files passed via
+   * {@code --localize_files} and prints both by name from the container;
+   * the client run must report success.
+   */
+  @Test
+  public void testDistributedShellWithMultiFileLocalization()
+      throws Exception {
+    Configuration clientConf = new Configuration(yarnCluster.getConfig());
+    Client dsClient = new Client(clientConf);
+
+    dsClient.init(new String[] {
+        "--jar", APPMASTER_JAR,
+        "--num_containers", "1",
+        "--shell_command", Shell.WINDOWS ? "type" : "cat",
+        "--localize_files",
+        "./src/test/resources/a.txt,./src/test/resources/b.txt",
+        "--shell_args", "a.txt b.txt"
+    });
+
+    boolean succeeded = dsClient.run();
+    assertTrue("Client exited with an error", succeeded);
+  }
+
+  /**
+   * Passes a path that does not exist via {@code --localize_files} and
+   * expects initialising and running the client to end with an
+   * {@link UncheckedIOException}.
+   */
+  @Test(expected=UncheckedIOException.class)
+  public void testDistributedShellWithNonExistentFileLocalization()
+      throws Exception {
+    Configuration clientConf = new Configuration(yarnCluster.getConfig());
+    Client dsClient = new Client(clientConf);
+
+    dsClient.init(new String[] {
+        "--jar", APPMASTER_JAR,
+        "--num_containers", "1",
+        "--shell_command", Shell.WINDOWS ? "type" : "cat",
+        "--localize_files", "/non/existing/path/file.txt",
+        "--shell_args", "file.txt"
+    });
+
+    dsClient.run();
+  }
 }

+ 15 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/resources/a.txt

@@ -0,0 +1,15 @@
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+# Sample file for testing
+
+aaaaaaaaaaaaaaaaaa

+ 15 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/resources/b.txt

@@ -0,0 +1,15 @@
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+# Sample file for testing
+
+bbbbbbbbbbbbbbbbbbbb