Browse Source

YARN-8714. [Submarine] Support files/tarballs to be localized for a training job. (Zhankun Tang via wangda)

Change-Id: I845131273e52a9d81dbc813ea6d4af06b205e334
Wangda Tan 6 years ago
parent
commit
c771fe6e10
14 changed files with 1433 additions and 42 deletions
  1. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliConstants.java
  2. 18 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/RunJobCli.java
  3. 133 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/Localization.java
  4. 15 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/RunJobParameters.java
  5. 19 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/package-info.java
  6. 15 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/conf/SubmarineConfiguration.java
  7. 66 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/fs/DefaultRemoteDirectoryManager.java
  8. 17 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/fs/RemoteDirectoryManager.java
  9. 4 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/FSBasedSubmarineStorageImpl.java
  10. 255 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java
  11. 26 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/site/markdown/QuickStart.md
  12. 772 23
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/yarnservice/TestYarnServiceRunJobCli.java
  13. 7 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/common/MockClientContext.java
  14. 85 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/common/fs/MockRemoteDirectoryManager.java

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliConstants.java

@@ -52,6 +52,7 @@ public class CliConstants {
   public static final String QUICKLINK = "quicklink";
   public static final String TENSORBOARD_DOCKER_IMAGE =
       "tensorboard_docker_image";
+  public static final String LOCALIZATION = "localization";
   public static final String KEYTAB = "keytab";
   public static final String PRINCIPAL = "principal";
   public static final String DISTRIBUTE_KEYTAB = "distribute_keytab";

+ 18 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/RunJobCli.java

@@ -125,6 +125,24 @@ public class RunJobCli extends AbstractCli {
         + "if want to link to first worker's 7070 port, and text of quicklink "
         + "is Notebook_UI, user need to specify --quicklink "
         + "Notebook_UI=https://master-0:7070");
+    options.addOption(CliConstants.LOCALIZATION, true, "Specify"
+        + " localization to make remote/local file/directory available to"
+        + " all container(Docker)."
+        + " Argument format is \"RemoteUri:LocalFilePath[:rw] \" (ro"
+        + " permission is not supported yet)"
+        + " The RemoteUri can be a file or directory in local or"
+        + " HDFS or s3 or abfs or http .etc."
+        + " The LocalFilePath can be absolute or relative."
+        + " If it's a relative path, it'll be"
+        + " under container's implied working directory"
+        + " but sub directory is not supported yet."
+        + " This option can be set mutiple times."
+        + " Examples are \n"
+        + "-localization \"hdfs:///user/yarn/mydir2:/opt/data\"\n"
+        + "-localization \"s3a:///a/b/myfile1:./\"\n"
+        + "-localization \"https:///a/b/myfile2:./myfile\"\n"
+        + "-localization \"/user/yarn/mydir3:/opt/mydir3\"\n"
+        + "-localization \"./mydir1:.\"\n");
     options.addOption(CliConstants.KEYTAB, true, "Specify keytab used by the " +
         "job under security environment");
     options.addOption(CliConstants.PRINCIPAL, true, "Specify principal used " +

+ 133 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/Localization.java

@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.submarine.client.cli.param;
+
+import org.apache.commons.cli.ParseException;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Localization parameter.
+ * */
+public class Localization {
+
+  private String mountPermissionPattern = "(wr|rw)$";
+  /**
+   * Regex for directory/file path in container.
+   * YARN only support absolute path for mount, but we can
+   * support some relative path.
+   * For relative path, we only allow ".", "./","./name".
+   * relative path like "./a/b" is not allowed.
+   * "." and "./" means original dir/file name in container working directory
+   * "./name" means use same or new "name" in container working directory
+   * A absolute path means same path in container filesystem
+   */
+  private String localPathPattern = "((^\\.$)|(^\\./$)|(^\\./[^/]+)|(^/.*))";
+  private String remoteUri;
+  private String localPath;
+
+  // Read write by default
+  private String mountPermission = "rw";
+
+  private static final List<String> SUPPORTED_SCHEME = Arrays.asList(
+      "hdfs", "oss", "s3a", "s3n", "wasb",
+      "wasbs", "abfs", "abfss", "adl", "har",
+      "ftp", "http", "https", "viewfs", "swebhdfs",
+      "webhdfs", "swift");
+
+  public void parse(String arg) throws ParseException {
+    String[] tokens = arg.split(":");
+    int minimum = "a:b".split(":").length;
+    int minimumWithPermission = "a:b:rw".split(":").length;
+    int minimumParts = minimum;
+    int miniPartsWithRemoteScheme = "scheme://a:b".split(":").length;
+    int maximumParts = "scheme://a:b:rw".split(":").length;
+    // If remote uri starts with a remote scheme
+    if (isSupportedScheme(tokens[0])) {
+      minimumParts = miniPartsWithRemoteScheme;
+    }
+    if (tokens.length < minimumParts
+        || tokens.length > maximumParts) {
+      throw new ParseException("Invalid parameter,"
+          + "should be \"remoteUri:localPath[:rw|:wr]\" "
+          + "format for --localizations");
+    }
+
+    /**
+     * RemoteUri starts with remote scheme.
+     * Merge part 0 and 1 to build a hdfs path in token[0].
+     * toke[1] will be localPath to ease following logic
+     * */
+    if (minimumParts == miniPartsWithRemoteScheme) {
+      tokens[0] = tokens[0] + ":" + tokens[1];
+      tokens[1] = tokens[2];
+      if (tokens.length == maximumParts) {
+        // Has permission part
+        mountPermission = tokens[maximumParts - 1];
+      }
+    }
+    // RemoteUri starts with linux file path
+    if (minimumParts == minimum
+        && tokens.length == minimumWithPermission) {
+      // Has permission part
+      mountPermission = tokens[minimumWithPermission - 1];
+    }
+    remoteUri = tokens[0];
+    localPath = tokens[1];
+    if (!localPath.matches(localPathPattern)) {
+      throw new ParseException("Invalid local file path:"
+          + localPath
+          + ", it only support \".\", \"./\", \"./name\" and "
+          + "absolute path.");
+    }
+    if (!mountPermission.matches(mountPermissionPattern)) {
+      throw new ParseException("Invalid mount permission (ro is not "
+          + "supported yet), " + mountPermission);
+    }
+  }
+
+  public String getRemoteUri() {
+    return remoteUri;
+  }
+
+  public void setRemoteUri(String rUti) {
+    this.remoteUri = rUti;
+  }
+
+  public String getLocalPath() {
+    return localPath;
+  }
+
+  public void setLocalPath(String lPath) {
+    this.localPath = lPath;
+  }
+
+  public String getMountPermission() {
+    return mountPermission;
+  }
+
+  public void setMountPermission(String mPermission) {
+    this.mountPermission = mPermission;
+  }
+
+  private boolean isSupportedScheme(String scheme) {
+    return SUPPORTED_SCHEME.contains(scheme);
+  }
+}

+ 15 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/RunJobParameters.java

@@ -44,6 +44,7 @@ public class RunJobParameters extends RunParameters {
   private String workerLaunchCmd;
   private String psLaunchCmd;
   private List<Quicklink> quicklinks = new ArrayList<>();
+  private List<Localization> localizations = new ArrayList<>();
 
   private String psDockerImage = null;
   private String workerDockerImage = null;
@@ -159,6 +160,16 @@ public class RunJobParameters extends RunParameters {
     String psLaunchCommand = parsedCommandLine.getOptionValue(
         CliConstants.PS_LAUNCH_CMD);
 
+    // Localizations
+    String[] localizationsStr = parsedCommandLine.getOptionValues(
+        CliConstants.LOCALIZATION);
+    if (null != localizationsStr) {
+      for (String loc : localizationsStr) {
+        Localization localization = new Localization();
+        localization.parse(loc);
+        localizations.add(localization);
+      }
+    }
     boolean distributeKerberosKeytab = parsedCommandLine.hasOption(CliConstants
         .DISTRIBUTE_KEYTAB);
 
@@ -288,6 +299,10 @@ public class RunJobParameters extends RunParameters {
     return quicklinks;
   }
 
+  public List<Localization> getLocalizations() {
+    return localizations;
+  }
+
   public String getKeytab() {
     return keytab;
   }

+ 19 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/package-info.java

@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.submarine.client.cli.param;

+ 15 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/conf/SubmarineConfiguration.java

@@ -19,6 +19,21 @@ import org.apache.hadoop.conf.Configuration;
 public class SubmarineConfiguration extends Configuration {
   private static final String SUBMARINE_CONFIGURATION_FILE = "submarine.xml";
 
+  public static final String SUBMARINE_CONFIGURATION_PREFIX = "submarine.";
+
+  public static final String SUBMARINE_LOCALIZATION_PREFIX =
+      SUBMARINE_CONFIGURATION_PREFIX + "localization.";
+  /**
+   * Limit the size of directory/file to be localized.
+   * To avoid exhausting local disk space,
+   * this limit both remote and local file to be localized
+   */
+  public static final String LOCALIZATION_MAX_ALLOWED_FILE_SIZE_MB =
+      SUBMARINE_LOCALIZATION_PREFIX + "max-allowed-file-size-mb";
+
+  // Default 2GB
+  public static final long DEFAULT_MAX_ALLOWED_REMOTE_URI_SIZE_MB = 2048;
+
   public SubmarineConfiguration() {
     this(new Configuration(false), true);
   }

+ 66 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/fs/DefaultRemoteDirectoryManager.java

@@ -14,22 +14,28 @@
 
 package org.apache.hadoop.yarn.submarine.common.fs;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.submarine.client.cli.CliConstants;
 import org.apache.hadoop.yarn.submarine.common.ClientContext;
 
+import java.io.File;
 import java.io.IOException;
+import java.net.URI;
 
 /**
  * Manages remote directories for staging, log, etc.
  * TODO, need to properly handle permission / name validation, etc.
  */
 public class DefaultRemoteDirectoryManager implements RemoteDirectoryManager {
-  FileSystem fs;
+  private FileSystem fs;
+  private Configuration conf;
 
   public DefaultRemoteDirectoryManager(ClientContext context) {
+    this.conf = context.getYarnConfig();
     try {
       this.fs = FileSystem.get(context.getYarnConfig());
     } catch (IOException e) {
@@ -38,7 +44,8 @@ public class DefaultRemoteDirectoryManager implements RemoteDirectoryManager {
   }
 
   @Override
-  public Path getJobStagingArea(String jobName, boolean create) throws IOException {
+  public Path getJobStagingArea(String jobName, boolean create)
+      throws IOException {
     Path staging = new Path(getJobRootFolder(jobName), "staging");
     if (create) {
       createFolderIfNotExist(staging);
@@ -61,7 +68,8 @@ public class DefaultRemoteDirectoryManager implements RemoteDirectoryManager {
   }
 
   @Override
-  public Path getModelDir(String modelName, boolean create) throws IOException {
+  public Path getModelDir(String modelName, boolean create)
+      throws IOException {
     Path modelDir = new Path(new Path("submarine", "models"), modelName);
     if (create) {
       createFolderIfNotExist(modelDir);
@@ -70,10 +78,15 @@ public class DefaultRemoteDirectoryManager implements RemoteDirectoryManager {
   }
 
   @Override
-  public FileSystem getFileSystem() {
+  public FileSystem getDefaultFileSystem() {
     return fs;
   }
 
+  @Override
+  public FileSystem getFileSystemByUri(String uri) throws IOException {
+    return FileSystem.get(URI.create(uri), conf);
+  }
+
   @Override
   public Path getUserRootFolder() throws IOException {
     Path rootPath = new Path("submarine", "jobs");
@@ -83,6 +96,55 @@ public class DefaultRemoteDirectoryManager implements RemoteDirectoryManager {
     return fStatus.getPath();
   }
 
+  @Override
+  public boolean isDir(String uri) throws IOException {
+    if (isRemote(uri)) {
+      return getFileSystemByUri(uri).getFileStatus(new Path(uri)).isDirectory();
+    }
+    return new File(uri).isDirectory();
+  }
+
+  @Override
+  public boolean isRemote(String uri) {
+    String scheme = new Path(uri).toUri().getScheme();
+    if (null == scheme) {
+      return false;
+    }
+    return !scheme.startsWith("file://");
+  }
+
+  @Override
+  public boolean copyRemoteToLocal(String remoteUri, String localUri)
+      throws IOException {
+    // Delete old to avoid failure in FileUtil.copy
+    File old = new File(localUri);
+    if (old.exists()) {
+      if (!FileUtil.fullyDelete(old)) {
+        throw new IOException("Failed to delete dir:"
+            + old.getAbsolutePath());
+      }
+    }
+    return FileUtil.copy(getFileSystemByUri(remoteUri), new Path(remoteUri),
+        new File(localUri), false,
+        conf);
+  }
+
+  @Override
+  public boolean existsRemoteFile(Path url) throws IOException {
+    return getFileSystemByUri(url.toUri().toString()).exists(url);
+  }
+
+  @Override
+  public FileStatus getRemoteFileStatus(Path url) throws IOException {
+    return getFileSystemByUri(url.toUri().toString()).getFileStatus(url);
+  }
+
+  @Override
+  public long getRemoteFileSize(String uri) throws IOException {
+    return getFileSystemByUri(uri)
+        .getContentSummary(new Path(uri)).getSpaceConsumed();
+  }
+
   private Path getJobRootFolder(String jobName) throws IOException {
     Path userRoot = getUserRootFolder();
     Path jobRootPath = new Path(userRoot, jobName);

+ 17 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/fs/RemoteDirectoryManager.java

@@ -14,6 +14,7 @@
 
 package org.apache.hadoop.yarn.submarine.common.fs;
 
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
@@ -26,7 +27,22 @@ public interface RemoteDirectoryManager {
 
   Path getModelDir(String modelName, boolean create) throws IOException;
 
-  FileSystem getFileSystem() throws IOException;
+  FileSystem getDefaultFileSystem() throws IOException;
+
+  FileSystem getFileSystemByUri(String uri) throws IOException;
 
   Path getUserRootFolder() throws IOException;
+
+  boolean isDir(String uri) throws IOException;
+
+  boolean isRemote(String uri) throws IOException;
+
+  boolean copyRemoteToLocal(String remoteUri, String localUri)
+      throws IOException;
+
+  boolean existsRemoteFile(Path uri) throws IOException;
+
+  FileStatus getRemoteFileStatus(Path uri) throws IOException;
+
+  long getRemoteFileSize(String uri) throws IOException;
 }

+ 4 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/FSBasedSubmarineStorageImpl.java

@@ -42,7 +42,7 @@ public class FSBasedSubmarineStorageImpl extends SubmarineStorage {
   public void addNewJob(String jobName, Map<String, String> jobInfo)
       throws IOException {
     Path jobInfoPath = getJobInfoPath(jobName, true);
-    FSDataOutputStream fos = rdm.getFileSystem().create(jobInfoPath);
+    FSDataOutputStream fos = rdm.getDefaultFileSystem().create(jobInfoPath);
     serializeMap(fos, jobInfo);
   }
 
@@ -50,7 +50,7 @@ public class FSBasedSubmarineStorageImpl extends SubmarineStorage {
   public Map<String, String> getJobInfoByName(String jobName)
       throws IOException {
     Path jobInfoPath = getJobInfoPath(jobName, false);
-    FSDataInputStream fis = rdm.getFileSystem().open(jobInfoPath);
+    FSDataInputStream fis = rdm.getDefaultFileSystem().open(jobInfoPath);
     return deserializeMap(fis);
   }
 
@@ -58,7 +58,7 @@ public class FSBasedSubmarineStorageImpl extends SubmarineStorage {
   public void addNewModel(String modelName, String version,
       Map<String, String> modelInfo) throws IOException {
     Path modelInfoPath = getModelInfoPath(modelName, version, true);
-    FSDataOutputStream fos = rdm.getFileSystem().create(modelInfoPath);
+    FSDataOutputStream fos = rdm.getDefaultFileSystem().create(modelInfoPath);
     serializeMap(fos, modelInfo);
   }
 
@@ -66,7 +66,7 @@ public class FSBasedSubmarineStorageImpl extends SubmarineStorage {
   public Map<String, String> getModelInfoByName(String modelName,
       String version) throws IOException {
     Path modelInfoPath = getModelInfoPath(modelName, version, false);
-    FSDataInputStream fis = rdm.getFileSystem().open(modelInfoPath);
+    FSDataInputStream fis = rdm.getDefaultFileSystem().open(modelInfoPath);
     return deserializeMap(fis);
   }
 

+ 255 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java

@@ -18,6 +18,7 @@ import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -32,17 +33,21 @@ import org.apache.hadoop.yarn.service.api.records.ResourceInformation;
 import org.apache.hadoop.yarn.service.api.records.Service;
 import org.apache.hadoop.yarn.service.api.records.KerberosPrincipal;
 import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
+import org.apache.hadoop.yarn.submarine.client.cli.param.Localization;
 import org.apache.hadoop.yarn.submarine.client.cli.param.Quicklink;
 import org.apache.hadoop.yarn.submarine.client.cli.param.RunJobParameters;
 import org.apache.hadoop.yarn.submarine.common.ClientContext;
 import org.apache.hadoop.yarn.submarine.common.Envs;
 import org.apache.hadoop.yarn.submarine.common.api.TaskType;
+import org.apache.hadoop.yarn.submarine.common.conf.SubmarineConfiguration;
 import org.apache.hadoop.yarn.submarine.common.conf.SubmarineLogs;
+import org.apache.hadoop.yarn.submarine.common.fs.RemoteDirectoryManager;
 import org.apache.hadoop.yarn.submarine.runtimes.common.JobSubmitter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -56,6 +61,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.StringTokenizer;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
 import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes.EXIT_SUCCESS;
@@ -307,7 +314,8 @@ public class YarnServiceJobSubmitter implements JobSubmitter {
   }
 
   private void locateRemoteFileToContainerWorkDir(String destFilename,
-      Component comp, Path uploadedFilePath) throws IOException {
+      Component comp, Path uploadedFilePath)
+      throws IOException {
     FileSystem fs = FileSystem.get(clientContext.getYarnConfig());
 
     FileStatus fileStatus = fs.getFileStatus(uploadedFilePath);
@@ -321,7 +329,9 @@ public class YarnServiceJobSubmitter implements JobSubmitter {
 
   private Path uploadToRemoteFile(Path stagingDir, String fileToUpload) throws
       IOException {
-    FileSystem fs = FileSystem.get(clientContext.getYarnConfig());
+    FileSystem fs = clientContext.getRemoteDirectoryManager()
+        .getDefaultFileSystem();
+
     // Upload to remote FS under staging area
     File localFile = new File(fileToUpload);
     if (!localFile.exists()) {
@@ -368,6 +378,111 @@ public class YarnServiceJobSubmitter implements JobSubmitter {
         localScriptFile);
   }
 
+  private String getLastNameFromPath(String srcFileStr) {
+    return new Path(srcFileStr).getName();
+  }
+
+  /**
+   * May download a remote uri(file/dir) and zip.
+   * Skip download if local dir
+   * Remote uri can be a local dir(won't download)
+   * or remote HDFS dir, s3 dir/file .etc
+   * */
+  private String mayDownloadAndZipIt(String remoteDir, String zipFileName,
+      boolean doZip)
+      throws IOException {
+    RemoteDirectoryManager rdm = clientContext.getRemoteDirectoryManager();
+    //Append original modification time and size to zip file name
+    String suffix;
+    String srcDir = remoteDir;
+    String zipDirPath =
+        System.getProperty("java.io.tmpdir") + "/" + zipFileName;
+    boolean needDeleteTempDir = false;
+    if (rdm.isRemote(remoteDir)) {
+      //Append original modification time and size to zip file name
+      FileStatus status = rdm.getRemoteFileStatus(new Path(remoteDir));
+      suffix = "_" + status.getModificationTime()
+          + "-" + rdm.getRemoteFileSize(remoteDir);
+      // Download them to temp dir
+      boolean downloaded = rdm.copyRemoteToLocal(remoteDir, zipDirPath);
+      if (!downloaded) {
+        throw new IOException("Failed to download files from "
+            + remoteDir);
+      }
+      LOG.info("Downloaded remote: {} to local: {}", remoteDir, zipDirPath);
+      srcDir = zipDirPath;
+      needDeleteTempDir = true;
+    } else {
+      File localDir = new File(remoteDir);
+      suffix = "_" + localDir.lastModified()
+          + "-" + localDir.length();
+    }
+    if (!doZip) {
+      return srcDir;
+    }
+    // zip a local dir
+    String zipFileUri = zipDir(srcDir, zipDirPath + suffix + ".zip");
+    // delete downloaded temp dir
+    if (needDeleteTempDir) {
+      deleteFiles(srcDir);
+    }
+    return zipFileUri;
+  }
+
+  @VisibleForTesting
+  public String zipDir(String srcDir, String dstFile) throws IOException {
+    FileOutputStream fos = new FileOutputStream(dstFile);
+    ZipOutputStream zos = new ZipOutputStream(fos);
+    File srcFile = new File(srcDir);
+    LOG.info("Compressing {}", srcDir);
+    addDirToZip(zos, srcFile, srcFile);
+    // close the ZipOutputStream
+    zos.close();
+    LOG.info("Compressed {} to {}", srcDir, dstFile);
+    return dstFile;
+  }
+
+  private void deleteFiles(String localUri) {
+    boolean success = FileUtil.fullyDelete(new File(localUri));
+    if (!success) {
+      LOG.warn("Fail to delete {}", localUri);
+    }
+    LOG.info("Deleted {}", localUri);
+  }
+
+  private void addDirToZip(ZipOutputStream zos, File srcFile, File base)
+      throws IOException {
+    File[] files = srcFile.listFiles();
+    if (null == files) {
+      return;
+    }
+    FileInputStream fis = null;
+    for (int i = 0; i < files.length; i++) {
+      // if it's directory, add recursively
+      if (files[i].isDirectory()) {
+        addDirToZip(zos, files[i], base);
+        continue;
+      }
+      byte[] buffer = new byte[1024];
+      try {
+        fis = new FileInputStream(files[i]);
+        String name =  base.toURI().relativize(files[i].toURI()).getPath();
+        LOG.info(" Zip adding: " + name);
+        zos.putNextEntry(new ZipEntry(name));
+        int length;
+        while ((length = fis.read(buffer)) > 0) {
+          zos.write(buffer, 0, length);
+        }
+        zos.flush();
+      } finally {
+        if (fis != null) {
+          fis.close();
+        }
+        zos.closeEntry();
+      }
+    }
+  }
+
   private void addWorkerComponent(Service service,
       RunJobParameters parameters, TaskType taskType) throws IOException {
     Component workerComponent = new Component();
@@ -498,6 +613,8 @@ public class YarnServiceJobSubmitter implements JobSubmitter {
 
     handleServiceEnvs(serviceSpec, parameters);
 
+    handleLocalizations(parameters);
+
     if (parameters.getNumWorkers() > 0) {
       addWorkerComponents(serviceSpec, parameters);
     }
@@ -553,6 +670,142 @@ public class YarnServiceJobSubmitter implements JobSubmitter {
     return serviceSpec;
   }
 
+  /**
+   * Localize dependencies for all containers.
+   * If remoteUri is a local directory,
+   * we'll zip it, upload to HDFS staging dir HDFS.
+   * If remoteUri is directory, we'll download it, zip it and upload
+   * to HDFS.
+   * If localFilePath is ".", we'll use remoteUri's file/dir name
+   * */
+  private void handleLocalizations(RunJobParameters parameters)
+      throws IOException {
+    // Handle localizations
+    Path stagingDir =
+        clientContext.getRemoteDirectoryManager().getJobStagingArea(
+            parameters.getName(), true);
+    List<Localization> locs = parameters.getLocalizations();
+    String remoteUri;
+    String containerLocalPath;
+    RemoteDirectoryManager rdm = clientContext.getRemoteDirectoryManager();
+
+    // Check to fail fast
+    for (Localization loc : locs) {
+      remoteUri = loc.getRemoteUri();
+      Path resourceToLocalize = new Path(remoteUri);
+      // Check if remoteUri exists
+      if (rdm.isRemote(remoteUri)) {
+        // check if exists
+        if (!rdm.existsRemoteFile(resourceToLocalize)) {
+          throw new FileNotFoundException(
+              "File " + remoteUri + " doesn't exists.");
+        }
+      } else {
+        // Check if exists
+        File localFile = new File(remoteUri);
+        if (!localFile.exists()) {
+          throw new FileNotFoundException(
+              "File " + remoteUri + " doesn't exists.");
+        }
+      }
+      // check remote file size
+      validFileSize(remoteUri);
+    }
+    // Start download remote if needed and upload to HDFS
+    for (Localization loc : locs) {
+      remoteUri = loc.getRemoteUri();
+      containerLocalPath = loc.getLocalPath();
+      String srcFileStr = remoteUri;
+      ConfigFile.TypeEnum destFileType = ConfigFile.TypeEnum.STATIC;
+      Path resourceToLocalize = new Path(remoteUri);
+      boolean needUploadToHDFS = true;
+
+      /**
+       * Special handling for remoteUri directory.
+       * */
+      boolean needDeleteTempFile = false;
+      if (rdm.isDir(remoteUri)) {
+        destFileType = ConfigFile.TypeEnum.ARCHIVE;
+        srcFileStr = mayDownloadAndZipIt(
+            remoteUri, getLastNameFromPath(srcFileStr), true);
+      } else if (rdm.isRemote(remoteUri)) {
+        if (!needHdfs(remoteUri)) {
+          // Non HDFS remote uri. Non directory, no need to zip
+          srcFileStr = mayDownloadAndZipIt(
+              remoteUri, getLastNameFromPath(srcFileStr), false);
+          needDeleteTempFile = true;
+        } else {
+          // HDFS file, no need to upload
+          needUploadToHDFS = false;
+        }
+      }
+
+      // Upload file to HDFS
+      if (needUploadToHDFS) {
+        resourceToLocalize = uploadToRemoteFile(stagingDir, srcFileStr);
+      }
+      if (needDeleteTempFile) {
+        deleteFiles(srcFileStr);
+      }
+      // Remove .zip from zipped dir name
+      if (destFileType == ConfigFile.TypeEnum.ARCHIVE
+          && srcFileStr.endsWith(".zip")) {
+        // Delete local zip file
+        deleteFiles(srcFileStr);
+        int suffixIndex = srcFileStr.lastIndexOf('_');
+        srcFileStr = srcFileStr.substring(0, suffixIndex);
+      }
+      // If provided, use the name of local uri
+      if (!containerLocalPath.equals(".")
+          && !containerLocalPath.equals("./")) {
+        // Change the YARN localized file name to what'll used in container
+        srcFileStr = getLastNameFromPath(containerLocalPath);
+      }
+      String localizedName = getLastNameFromPath(srcFileStr);
+      LOG.info("The file/dir to be localized is {}",
+          resourceToLocalize.toString());
+      LOG.info("Its localized file name will be {}", localizedName);
+      serviceSpec.getConfiguration().getFiles().add(new ConfigFile().srcFile(
+          resourceToLocalize.toUri().toString()).destFile(localizedName)
+          .type(destFileType));
+      // set mounts
+      // if mount path is absolute, just use it.
+      // if relative, no need to mount explicitly
+      if (containerLocalPath.startsWith("/")) {
+        String mountStr = getLastNameFromPath(srcFileStr) + ":"
+            + containerLocalPath + ":" + loc.getMountPermission();
+        LOG.info("Add bind-mount string {}", mountStr);
+        appendToEnv(serviceSpec, "YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS",
+            mountStr, ",");
+      }
+    }
+  }
+
+  private void validFileSize(String uri) throws IOException {
+    RemoteDirectoryManager rdm = clientContext.getRemoteDirectoryManager();
+    long actualSizeByte;
+    String locationType = "Local";
+    if (rdm.isRemote(uri)) {
+      actualSizeByte = clientContext.getRemoteDirectoryManager()
+          .getRemoteFileSize(uri);
+      locationType = "Remote";
+    } else {
+      actualSizeByte = FileUtil.getDU(new File(uri));
+    }
+    long maxFileSizeMB = clientContext.getSubmarineConfig()
+        .getLong(SubmarineConfiguration.LOCALIZATION_MAX_ALLOWED_FILE_SIZE_MB,
+            SubmarineConfiguration.DEFAULT_MAX_ALLOWED_REMOTE_URI_SIZE_MB);
+    LOG.info("{} fie/dir: {}, size(Byte):{},"
+        + " Allowed max file/dir size: {}",
+        locationType, uri, actualSizeByte, maxFileSizeMB * 1024 * 1024);
+
+    if (actualSizeByte > maxFileSizeMB * 1024 * 1024) {
+      throw new IOException(uri + " size(Byte): "
+          + actualSizeByte + " exceeds configured max size:"
+          + maxFileSizeMB * 1024 * 1024);
+    }
+  }
+
   private String generateServiceSpecFile(Service service) throws IOException {
     File serviceSpecFile = File.createTempFile(service.getName(), ".json");
     String buffer = jsonSerDeser.toJson(service);
@@ -622,8 +875,6 @@ public class YarnServiceJobSubmitter implements JobSubmitter {
     return appid;
   }
 
-
-
   @VisibleForTesting
   public Service getServiceSpec() {
     return serviceSpec;

+ 26 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/site/markdown/QuickStart.md

@@ -70,7 +70,33 @@ usage: job run
                               directly used to launch the worker
  -worker_resources <arg>      Resource of each worker, for example
                               memory-mb=2048,vcores=2,yarn.io/gpu=2
+ -localization <arg>          Specify localization to remote/local
+                              file/directory available to all container(Docker).
+                              Argument format is "RemoteUri:LocalFilePath[:rw]"
+                              (ro permission is not supported yet).
+                              The RemoteUri can be a file or directory in local
+                              or HDFS or s3 or abfs or http .etc.
+                              The LocalFilePath can be absolute or relative.
+                              If relative, it'll be under container's implied
+                              working directory.
+                              This option can be set mutiple times.
+                              Examples are
+                              -localization "hdfs:///user/yarn/mydir2:/opt/data"
+                              -localization "s3a:///a/b/myfile1:./"
+                              -localization "https:///a/b/myfile2:./myfile"
+                              -localization "/user/yarn/mydir3:/opt/mydir3"
+                              -localization "./mydir1:."
 ```
+### Submarine Configuration
+
+For submarine internal configuration, please create a `submarine.xml` which should be placed under `$HADOOP_CONF_DIR`.
+
+|Configuration Name | Description |
+|:---- |:---- |
+| `submarine.runtime.class` | Optional. Full qualified class name for your runtime factory. |
+| `submarine.localization.max-allowed-file-size-mb` | Optional. This sets a size limit to the file/directory to be localized in "-localization" CLI option. 2GB by default. |
+
+
 
 ### Launch Standalone Tensorflow Application:
 

+ 772 - 23
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/yarnservice/TestYarnServiceRunJobCli.java

@@ -19,15 +19,20 @@
 package org.apache.hadoop.yarn.submarine.client.cli.yarnservice;
 
 import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.client.api.AppAdminClient;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.service.api.records.Component;
+import org.apache.hadoop.yarn.service.api.records.ConfigFile;
 import org.apache.hadoop.yarn.service.api.records.Service;
 import org.apache.hadoop.yarn.submarine.client.cli.RunJobCli;
 import org.apache.hadoop.yarn.submarine.common.MockClientContext;
 import org.apache.hadoop.yarn.submarine.common.api.TaskType;
+import org.apache.hadoop.yarn.submarine.common.conf.SubmarineConfiguration;
 import org.apache.hadoop.yarn.submarine.common.conf.SubmarineLogs;
+import org.apache.hadoop.yarn.submarine.common.fs.RemoteDirectoryManager;
 import org.apache.hadoop.yarn.submarine.runtimes.common.JobSubmitter;
 import org.apache.hadoop.yarn.submarine.runtimes.common.StorageKeyConstants;
 import org.apache.hadoop.yarn.submarine.runtimes.common.SubmarineStorage;
@@ -38,15 +43,22 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.File;
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.util.List;
 import java.util.Map;
 
 import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes.EXIT_SUCCESS;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class TestYarnServiceRunJobCli {
@@ -137,13 +149,13 @@ public class TestYarnServiceRunJobCli {
     Assert.assertFalse(SubmarineLogs.isVerbose());
 
     runJobCli.run(
-        new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
+        new String[]{"--name", "my-job", "--docker_image", "tf-docker:1.1.0",
             "--input_path", "s3://input", "--checkpoint_path", "s3://output",
             "--num_workers", "3", "--num_ps", "2", "--worker_launch_cmd",
             "python run-job.py", "--worker_resources", "memory=2048M,vcores=2",
             "--ps_resources", "memory=4096M,vcores=4", "--ps_docker_image",
             "ps.image", "--worker_docker_image", "worker.image",
-            "--ps_launch_cmd", "python run-ps.py", "--verbose" });
+            "--ps_launch_cmd", "python run-ps.py", "--verbose"});
     Service serviceSpec = getServiceSpecFromJobSubmitter(
         runJobCli.getJobSubmitter());
     Assert.assertEquals(3, serviceSpec.getComponents().size());
@@ -162,14 +174,14 @@ public class TestYarnServiceRunJobCli {
     Assert.assertFalse(SubmarineLogs.isVerbose());
 
     runJobCli.run(
-        new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
+        new String[]{"--name", "my-job", "--docker_image", "tf-docker:1.1.0",
             "--input_path", "s3://input", "--checkpoint_path", "s3://output",
             "--num_workers", "3", "--num_ps", "2", "--worker_launch_cmd",
             "python run-job.py", "--worker_resources", "memory=2048M,vcores=2",
             "--ps_resources", "memory=4096M,vcores=4", "--ps_docker_image",
             "ps.image", "--worker_docker_image", "worker.image",
             "--tensorboard", "--ps_launch_cmd", "python run-ps.py",
-            "--verbose" });
+            "--verbose"});
     Service serviceSpec = getServiceSpecFromJobSubmitter(
         runJobCli.getJobSubmitter());
     Assert.assertEquals(4, serviceSpec.getComponents().size());
@@ -192,10 +204,10 @@ public class TestYarnServiceRunJobCli {
     Assert.assertFalse(SubmarineLogs.isVerbose());
 
     runJobCli.run(
-        new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
+        new String[]{"--name", "my-job", "--docker_image", "tf-docker:1.1.0",
             "--input_path", "s3://input", "--checkpoint_path", "s3://output",
             "--num_workers", "1", "--worker_launch_cmd", "python run-job.py",
-            "--worker_resources", "memory=2G,vcores=2", "--verbose" });
+            "--worker_resources", "memory=2G,vcores=2", "--verbose"});
 
     Service serviceSpec = getServiceSpecFromJobSubmitter(
         runJobCli.getJobSubmitter());
@@ -212,9 +224,9 @@ public class TestYarnServiceRunJobCli {
     Assert.assertFalse(SubmarineLogs.isVerbose());
 
     runJobCli.run(
-        new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
+        new String[]{"--name", "my-job", "--docker_image", "tf-docker:1.1.0",
             "--input_path", "s3://input", "--checkpoint_path", "s3://output",
-            "--num_workers", "0", "--tensorboard", "--verbose" });
+            "--num_workers", "0", "--tensorboard", "--verbose"});
 
     Service serviceSpec = getServiceSpecFromJobSubmitter(
         runJobCli.getJobSubmitter());
@@ -233,11 +245,11 @@ public class TestYarnServiceRunJobCli {
     Assert.assertFalse(SubmarineLogs.isVerbose());
 
     runJobCli.run(
-        new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
+        new String[]{"--name", "my-job", "--docker_image", "tf-docker:1.1.0",
             "--input_path", "s3://input", "--checkpoint_path", "s3://output",
             "--num_workers", "0", "--tensorboard", "--verbose",
             "--tensorboard_resources", "memory=2G,vcores=2",
-            "--tensorboard_docker_image", "tb_docker_image:001" });
+            "--tensorboard_docker_image", "tb_docker_image:001"});
 
     Service serviceSpec = getServiceSpecFromJobSubmitter(
         runJobCli.getJobSubmitter());
@@ -256,10 +268,10 @@ public class TestYarnServiceRunJobCli {
     Assert.assertFalse(SubmarineLogs.isVerbose());
 
     runJobCli.run(
-        new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
+        new String[]{"--name", "my-job", "--docker_image", "tf-docker:1.1.0",
             "--num_workers", "0", "--tensorboard", "--verbose",
             "--tensorboard_resources", "memory=2G,vcores=2",
-            "--tensorboard_docker_image", "tb_docker_image:001" });
+            "--tensorboard_docker_image", "tb_docker_image:001"});
 
     Service serviceSpec = getServiceSpecFromJobSubmitter(
         runJobCli.getJobSubmitter());
@@ -307,7 +319,7 @@ public class TestYarnServiceRunJobCli {
       Assert.assertEquals(
           runJobCli.getRunJobParameters().getTensorboardDockerImage(),
           tensorboardComp.getArtifact().getId());
-    } else{
+    } else {
       Assert.assertNull(tensorboardComp.getArtifact());
     }
 
@@ -352,11 +364,11 @@ public class TestYarnServiceRunJobCli {
     Assert.assertFalse(SubmarineLogs.isVerbose());
 
     runJobCli.run(
-        new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
+        new String[]{"--name", "my-job", "--docker_image", "tf-docker:1.1.0",
             "--input_path", "s3://input", "--checkpoint_path", "s3://output",
             "--num_workers", "1", "--worker_launch_cmd", "python run-job.py",
             "--worker_resources", "memory=2G,vcores=2", "--tensorboard",
-            "--verbose" });
+            "--verbose"});
     Service serviceSpec = getServiceSpecFromJobSubmitter(
         runJobCli.getJobSubmitter());
 
@@ -376,10 +388,10 @@ public class TestYarnServiceRunJobCli {
     Assert.assertFalse(SubmarineLogs.isVerbose());
 
     runJobCli.run(
-        new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
+        new String[]{"--name", "my-job", "--docker_image", "tf-docker:1.1.0",
             "--input_path", "s3://input", "--num_workers", "1",
             "--worker_launch_cmd", "python run-job.py", "--worker_resources",
-            "memory=2G,vcores=2", "--tensorboard", "--verbose" });
+            "memory=2G,vcores=2", "--tensorboard", "--verbose"});
     Service serviceSpec = getServiceSpecFromJobSubmitter(
         runJobCli.getJobSubmitter());
 
@@ -398,11 +410,11 @@ public class TestYarnServiceRunJobCli {
     Assert.assertFalse(SubmarineLogs.isVerbose());
 
     runJobCli.run(
-        new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
+        new String[]{"--name", "my-job", "--docker_image", "tf-docker:1.1.0",
             "--input_path", "s3://input", "--checkpoint_path", "s3://output",
             "--num_workers", "1", "--worker_launch_cmd", "python run-job.py",
             "--worker_resources", "memory=2G,vcores=2", "--tensorboard", "true",
-            "--verbose" });
+            "--verbose"});
     SubmarineStorage storage =
         mockClientContext.getRuntimeFactory().getSubmarineStorage();
     Map<String, String> jobInfo = storage.getJobInfoByName("my-job");
@@ -419,7 +431,7 @@ public class TestYarnServiceRunJobCli {
     Assert.assertFalse(SubmarineLogs.isVerbose());
 
     runJobCli.run(
-        new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
+        new String[]{"--name", "my-job", "--docker_image", "tf-docker:1.1.0",
             "--input_path", "s3://input", "--checkpoint_path", "s3://output",
             "--num_workers", "3", "--num_ps", "2", "--worker_launch_cmd",
             "python run-job.py", "--worker_resources", "memory=2048M,vcores=2",
@@ -427,7 +439,7 @@ public class TestYarnServiceRunJobCli {
             "ps.image", "--worker_docker_image", "worker.image",
             "--ps_launch_cmd", "python run-ps.py", "--verbose", "--quicklink",
             "AAA=http://master-0:8321", "--quicklink",
-            "BBB=http://worker-0:1234" });
+            "BBB=http://worker-0:1234"});
     Service serviceSpec = getServiceSpecFromJobSubmitter(
         runJobCli.getJobSubmitter());
     Assert.assertEquals(3, serviceSpec.getComponents().size());
@@ -447,7 +459,7 @@ public class TestYarnServiceRunJobCli {
     Assert.assertFalse(SubmarineLogs.isVerbose());
 
     runJobCli.run(
-        new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
+        new String[]{"--name", "my-job", "--docker_image", "tf-docker:1.1.0",
             "--input_path", "s3://input", "--checkpoint_path", "s3://output",
             "--num_workers", "3", "--num_ps", "2", "--worker_launch_cmd",
             "python run-job.py", "--worker_resources", "memory=2048M,vcores=2",
@@ -455,7 +467,7 @@ public class TestYarnServiceRunJobCli {
             "ps.image", "--worker_docker_image", "worker.image",
             "--ps_launch_cmd", "python run-ps.py", "--verbose", "--quicklink",
             "AAA=http://master-0:8321", "--quicklink",
-            "BBB=http://worker-0:1234", "--tensorboard" });
+            "BBB=http://worker-0:1234", "--tensorboard"});
     Service serviceSpec = getServiceSpecFromJobSubmitter(
         runJobCli.getJobSubmitter());
     Assert.assertEquals(4, serviceSpec.getComponents().size());
@@ -468,4 +480,741 @@ public class TestYarnServiceRunJobCli {
             YarnServiceJobSubmitter.TENSORBOARD_QUICKLINK_LABEL,
             "http://tensorboard-0.my-job.username.null:6006"));
   }
+
+  /**
+   * Basic test.
+   * In one hand, create local temp file/dir for hdfs URI in
+   * local staging dir.
+   * In the other hand, use MockRemoteDirectoryManager mock
+   * implementation when check FileStatus or exists of HDFS file/dir
+   * --localization hdfs:///user/yarn/script1.py:.
+   * --localization /temp/script2.py:./
+   * --localization /temp/script2.py:/opt/script.py
+   */
+  @Test
+  public void testRunJobWithBasicLocalization() throws Exception {
+    String remoteUrl = "hdfs:///user/yarn/script1.py";
+    String containerLocal1 = ".";
+    String localUrl = "/temp/script2.py";
+    String containerLocal2 = "./";
+    String containerLocal3 = "/opt/script.py";
+    String fakeLocalDir = System.getProperty("java.io.tmpdir");
+    // create local file, we need to put it under local temp dir
+    File localFile1 = new File(fakeLocalDir,
+        new Path(localUrl).getName());
+    localFile1.createNewFile();
+
+
+    MockClientContext mockClientContext =
+        YarnServiceCliTestUtils.getMockClientContext();
+    RunJobCli runJobCli = new RunJobCli(mockClientContext);
+    Assert.assertFalse(SubmarineLogs.isVerbose());
+
+    RemoteDirectoryManager spyRdm =
+        spy(mockClientContext.getRemoteDirectoryManager());
+    mockClientContext.setRemoteDirectoryMgr(spyRdm);
+
+    // create remote file in local staging dir to simulate HDFS
+    Path stagingDir = mockClientContext.getRemoteDirectoryManager()
+        .getJobStagingArea("my-job", true);
+    File remoteFile1 = new File(stagingDir.toUri().getPath()
+        + "/" + new Path(remoteUrl).getName());
+    remoteFile1.createNewFile();
+
+    Assert.assertTrue(localFile1.exists());
+    Assert.assertTrue(remoteFile1.exists());
+
+    runJobCli.run(
+        new String[]{"--name", "my-job", "--docker_image", "tf-docker:1.1.0",
+            "--input_path", "s3://input", "--checkpoint_path", "s3://output",
+            "--num_workers", "3", "--num_ps", "2", "--worker_launch_cmd",
+            "python run-job.py", "--worker_resources", "memory=2048M,vcores=2",
+            "--ps_resources", "memory=4096M,vcores=4", "--ps_docker_image",
+            "ps.image", "--worker_docker_image", "worker.image",
+            "--ps_launch_cmd", "python run-ps.py", "--verbose",
+            "--localization",
+            remoteUrl + ":" + containerLocal1,
+            "--localization",
+            localFile1.getAbsolutePath() + ":" + containerLocal2,
+            "--localization",
+            localFile1.getAbsolutePath() + ":" + containerLocal3});
+    Service serviceSpec = getServiceSpecFromJobSubmitter(
+        runJobCli.getJobSubmitter());
+    Assert.assertEquals(3, serviceSpec.getComponents().size());
+
+    // No remote dir and hdfs file exists. Ensure download 0 times
+    verify(spyRdm, times(0)).copyRemoteToLocal(
+        anyString(), anyString());
+    // Ensure local original files are not deleted
+    Assert.assertTrue(localFile1.exists());
+
+    List<ConfigFile> files = serviceSpec.getConfiguration().getFiles();
+    Assert.assertEquals(3, files.size());
+    ConfigFile file = files.get(0);
+    Assert.assertEquals(ConfigFile.TypeEnum.STATIC, file.getType());
+    String expectedSrcLocalization = remoteUrl;
+    Assert.assertEquals(expectedSrcLocalization,
+        file.getSrcFile());
+    String expectedDstFileName = new Path(remoteUrl).getName();
+    Assert.assertEquals(expectedDstFileName, file.getDestFile());
+
+    file = files.get(1);
+    Assert.assertEquals(ConfigFile.TypeEnum.STATIC, file.getType());
+    expectedSrcLocalization = stagingDir.toUri().getPath()
+        + "/" + new Path(localUrl).getName();
+    Assert.assertEquals(expectedSrcLocalization,
+        new Path(file.getSrcFile()).toUri().getPath());
+    expectedDstFileName = new Path(localUrl).getName();
+    Assert.assertEquals(expectedSrcLocalization,
+        new Path(file.getSrcFile()).toUri().getPath());
+
+    file = files.get(2);
+    Assert.assertEquals(ConfigFile.TypeEnum.STATIC, file.getType());
+    expectedSrcLocalization = stagingDir.toUri().getPath()
+        + "/" + new Path(localUrl).getName();
+    Assert.assertEquals(expectedSrcLocalization,
+        new Path(file.getSrcFile()).toUri().getPath());
+    expectedDstFileName = new Path(localUrl).getName();
+    Assert.assertEquals(expectedSrcLocalization,
+        new Path(file.getSrcFile()).toUri().getPath());
+
+    // Ensure env value is correct
+    String env = serviceSpec.getConfiguration().getEnv()
+        .get("YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS");
+    String expectedMounts = new Path(containerLocal3).getName()
+        + ":" + containerLocal3 + ":rw";
+    Assert.assertTrue(env.contains(expectedMounts));
+
+    remoteFile1.delete();
+    localFile1.delete();
+  }
+
+  /**
+   * Non HDFS remote URI test.
+   * --localization https://a/b/1.patch:.
+   * --localization s3a://a/dir:/opt/mys3dir
+   */
+  @Test
+  public void testRunJobWithNonHDFSRemoteLocalization() throws Exception {
+    String remoteUri1 = "https://a/b/1.patch";
+    String containerLocal1 = ".";
+    String remoteUri2 = "s3a://a/s3dir";
+    String containerLocal2 = "/opt/mys3dir";
+
+    MockClientContext mockClientContext =
+        YarnServiceCliTestUtils.getMockClientContext();
+    RunJobCli runJobCli = new RunJobCli(mockClientContext);
+    Assert.assertFalse(SubmarineLogs.isVerbose());
+
+    RemoteDirectoryManager spyRdm =
+        spy(mockClientContext.getRemoteDirectoryManager());
+    mockClientContext.setRemoteDirectoryMgr(spyRdm);
+
+    // create remote file in local staging dir to simulate HDFS
+    Path stagingDir = mockClientContext.getRemoteDirectoryManager()
+        .getJobStagingArea("my-job", true);
+    File remoteFile1 = new File(stagingDir.toUri().getPath()
+        + "/" + new Path(remoteUri1).getName());
+    remoteFile1.createNewFile();
+
+    File remoteDir1 = new File(stagingDir.toUri().getPath()
+        + "/" + new Path(remoteUri2).getName());
+    remoteDir1.mkdir();
+    File remoteDir1File1 = new File(remoteDir1, "afile");
+    remoteDir1File1.createNewFile();
+
+    Assert.assertTrue(remoteFile1.exists());
+    Assert.assertTrue(remoteDir1.exists());
+    Assert.assertTrue(remoteDir1File1.exists());
+
+    String suffix1 = "_" + remoteDir1.lastModified()
+        + "-" + mockClientContext.getRemoteDirectoryManager()
+        .getRemoteFileSize(remoteUri2);
+    runJobCli.run(
+        new String[]{"--name", "my-job", "--docker_image", "tf-docker:1.1.0",
+            "--input_path", "s3://input", "--checkpoint_path", "s3://output",
+            "--num_workers", "3", "--num_ps", "2", "--worker_launch_cmd",
+            "python run-job.py", "--worker_resources", "memory=2048M,vcores=2",
+            "--ps_resources", "memory=4096M,vcores=4", "--ps_docker_image",
+            "ps.image", "--worker_docker_image", "worker.image",
+            "--ps_launch_cmd", "python run-ps.py", "--verbose",
+            "--localization",
+            remoteUri1 + ":" + containerLocal1,
+            "--localization",
+            remoteUri2 + ":" + containerLocal2});
+    Service serviceSpec = getServiceSpecFromJobSubmitter(
+        runJobCli.getJobSubmitter());
+    Assert.assertEquals(3, serviceSpec.getComponents().size());
+
+    // Ensure download remote dir 2 times
+    verify(spyRdm, times(2)).copyRemoteToLocal(
+        anyString(), anyString());
+
+    // Ensure downloaded temp files are deleted
+    Assert.assertFalse(new File(System.getProperty("java.io.tmpdir")
+        + "/" + new Path(remoteUri1).getName()).exists());
+    Assert.assertFalse(new File(System.getProperty("java.io.tmpdir")
+        + "/" + new Path(remoteUri2).getName()).exists());
+
+    // Ensure zip file are deleted
+    Assert.assertFalse(new File(System.getProperty("java.io.tmpdir")
+        + "/" + new Path(remoteUri2).getName()
+        + "_" + suffix1 + ".zip").exists());
+
+    List<ConfigFile> files = serviceSpec.getConfiguration().getFiles();
+    Assert.assertEquals(2, files.size());
+    ConfigFile file = files.get(0);
+    Assert.assertEquals(ConfigFile.TypeEnum.STATIC, file.getType());
+    String expectedSrcLocalization = stagingDir.toUri().getPath()
+        + "/" + new Path(remoteUri1).getName();
+    Assert.assertEquals(expectedSrcLocalization,
+        new Path(file.getSrcFile()).toUri().getPath());
+    String expectedDstFileName = new Path(remoteUri1).getName();
+    Assert.assertEquals(expectedDstFileName, file.getDestFile());
+
+    file = files.get(1);
+    Assert.assertEquals(ConfigFile.TypeEnum.ARCHIVE, file.getType());
+    expectedSrcLocalization = stagingDir.toUri().getPath()
+        + "/" + new Path(remoteUri2).getName() + suffix1 + ".zip";
+    Assert.assertEquals(expectedSrcLocalization,
+        new Path(file.getSrcFile()).toUri().getPath());
+
+    expectedDstFileName = new Path(containerLocal2).getName();
+    Assert.assertEquals(expectedSrcLocalization,
+        new Path(file.getSrcFile()).toUri().getPath());
+
+    // Ensure env value is correct
+    String env = serviceSpec.getConfiguration().getEnv()
+        .get("YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS");
+    String expectedMounts = new Path(remoteUri2).getName()
+        + ":" + containerLocal2 + ":rw";
+    Assert.assertTrue(env.contains(expectedMounts));
+
+    remoteDir1File1.delete();
+    remoteFile1.delete();
+    remoteDir1.delete();
+  }
+
+  /**
+   * Test HDFS dir localization.
+   * --localization hdfs:///user/yarn/mydir:./mydir1
+   * --localization hdfs:///user/yarn/mydir2:/opt/dir2:rw
+   * --localization hdfs:///user/yarn/mydir:.
+   * --localization hdfs:///user/yarn/mydir2:./
+   */
+  @Test
+  public void testRunJobWithHdfsDirLocalization() throws Exception {
+    String remoteUrl = "hdfs:///user/yarn/mydir";
+    String containerPath = "./mydir1";
+    String remoteUrl2 = "hdfs:///user/yarn/mydir2";
+    String containPath2 = "/opt/dir2";
+    String containerPath3 = ".";
+    String containerPath4 = "./";
+    MockClientContext mockClientContext =
+        YarnServiceCliTestUtils.getMockClientContext();
+    RunJobCli runJobCli = new RunJobCli(mockClientContext);
+    Assert.assertFalse(SubmarineLogs.isVerbose());
+
+    RemoteDirectoryManager spyRdm =
+        spy(mockClientContext.getRemoteDirectoryManager());
+    mockClientContext.setRemoteDirectoryMgr(spyRdm);
+    // create remote file in local staging dir to simulate HDFS
+    Path stagingDir = mockClientContext.getRemoteDirectoryManager()
+        .getJobStagingArea("my-job", true);
+    File remoteDir1 = new File(stagingDir.toUri().getPath().toString()
+        + "/" + new Path(remoteUrl).getName());
+    remoteDir1.mkdir();
+    File remoteFile1 = new File(remoteDir1.getAbsolutePath() + "/1.py");
+    File remoteFile2 = new File(remoteDir1.getAbsolutePath() + "/2.py");
+    remoteFile1.createNewFile();
+    remoteFile2.createNewFile();
+
+    File remoteDir2 = new File(stagingDir.toUri().getPath().toString()
+        + "/" + new Path(remoteUrl2).getName());
+    remoteDir2.mkdir();
+    File remoteFile3 = new File(remoteDir1.getAbsolutePath() + "/3.py");
+    File remoteFile4 = new File(remoteDir1.getAbsolutePath() + "/4.py");
+    remoteFile3.createNewFile();
+    remoteFile4.createNewFile();
+
+    Assert.assertTrue(remoteDir1.exists());
+    Assert.assertTrue(remoteDir2.exists());
+
+    String suffix1 = "_" + remoteDir1.lastModified()
+        + "-" + mockClientContext.getRemoteDirectoryManager()
+        .getRemoteFileSize(remoteUrl);
+    String suffix2 = "_" + remoteDir2.lastModified()
+        + "-" + mockClientContext.getRemoteDirectoryManager()
+        .getRemoteFileSize(remoteUrl2);
+    runJobCli.run(
+        new String[]{"--name", "my-job", "--docker_image", "tf-docker:1.1.0",
+            "--input_path", "s3://input", "--checkpoint_path", "s3://output",
+            "--num_workers", "3", "--num_ps", "2", "--worker_launch_cmd",
+            "python run-job.py", "--worker_resources", "memory=2048M,vcores=2",
+            "--ps_resources", "memory=4096M,vcores=4", "--ps_docker_image",
+            "ps.image", "--worker_docker_image", "worker.image",
+            "--ps_launch_cmd", "python run-ps.py", "--verbose",
+            "--localization",
+            remoteUrl + ":" + containerPath,
+            "--localization",
+            remoteUrl2 + ":" + containPath2 + ":rw",
+            "--localization",
+            remoteUrl + ":" + containerPath3,
+            "--localization",
+            remoteUrl2 + ":" + containerPath4});
+    Service serviceSpec = getServiceSpecFromJobSubmitter(
+        runJobCli.getJobSubmitter());
+    Assert.assertEquals(3, serviceSpec.getComponents().size());
+
+    // Ensure download remote dir 4 times
+    verify(spyRdm, times(4)).copyRemoteToLocal(
+        anyString(), anyString());
+
+    // Ensure downloaded temp files are deleted
+    Assert.assertFalse(new File(System.getProperty("java.io.tmpdir")
+        + "/" + new Path(remoteUrl).getName()).exists());
+    Assert.assertFalse(new File(System.getProperty("java.io.tmpdir")
+        + "/" + new Path(remoteUrl2).getName()).exists());
+    // Ensure zip file are deleted
+    Assert.assertFalse(new File(System.getProperty("java.io.tmpdir")
+        + "/" + new Path(remoteUrl).getName()
+        + suffix1 + ".zip").exists());
+    Assert.assertFalse(new File(System.getProperty("java.io.tmpdir")
+        + "/" + new Path(remoteUrl2).getName()
+        + suffix2 + ".zip").exists());
+
+    // Ensure files will be localized
+    List<ConfigFile> files = serviceSpec.getConfiguration().getFiles();
+    Assert.assertEquals(4, files.size());
+    ConfigFile file = files.get(0);
+    // The hdfs dir should be download and compress and let YARN to uncompress
+    Assert.assertEquals(ConfigFile.TypeEnum.ARCHIVE, file.getType());
+    String expectedSrcLocalization = stagingDir.toUri().getPath()
+        + "/" + new Path(remoteUrl).getName() + suffix1 + ".zip";
+    Assert.assertEquals(expectedSrcLocalization,
+        new Path(file.getSrcFile()).toUri().getPath());
+
+    // Relative path in container, but not "." or "./". Use its own name
+    String expectedDstFileName = new Path(containerPath).getName();
+    Assert.assertEquals(expectedDstFileName, file.getDestFile());
+
+    file = files.get(1);
+    Assert.assertEquals(ConfigFile.TypeEnum.ARCHIVE, file.getType());
+    expectedSrcLocalization = stagingDir.toUri().getPath()
+        + "/" + new Path(remoteUrl2).getName() + suffix2 + ".zip";
+    Assert.assertEquals(expectedSrcLocalization,
+        new Path(file.getSrcFile()).toUri().getPath());
+
+    expectedDstFileName = new Path(containPath2).getName();
+    Assert.assertEquals(expectedDstFileName, file.getDestFile());
+
+    file = files.get(2);
+    Assert.assertEquals(ConfigFile.TypeEnum.ARCHIVE, file.getType());
+    expectedSrcLocalization = stagingDir.toUri().getPath()
+        + "/" + new Path(remoteUrl).getName() + suffix1 + ".zip";
+    Assert.assertEquals(expectedSrcLocalization,
+        new Path(file.getSrcFile()).toUri().getPath());
+    // Relative path in container ".", use remote path name
+    expectedDstFileName = new Path(remoteUrl).getName();
+    Assert.assertEquals(expectedDstFileName, file.getDestFile());
+
+    file = files.get(3);
+    Assert.assertEquals(ConfigFile.TypeEnum.ARCHIVE, file.getType());
+    expectedSrcLocalization = stagingDir.toUri().getPath()
+        + "/" + new Path(remoteUrl2).getName() + suffix2 + ".zip";
+    Assert.assertEquals(expectedSrcLocalization,
+        new Path(file.getSrcFile()).toUri().getPath());
+    // Relative path in container "./", use remote path name
+    expectedDstFileName = new Path(remoteUrl2).getName();
+    Assert.assertEquals(expectedDstFileName, file.getDestFile());
+
+    // Ensure mounts env value is correct. Add one mount string
+    String env = serviceSpec.getConfiguration().getEnv()
+        .get("YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS");
+
+    String expectedMounts =
+        new Path(containPath2).getName() + ":" + containPath2 + ":rw";
+    Assert.assertTrue(env.contains(expectedMounts));
+
+    remoteFile1.delete();
+    remoteFile2.delete();
+    remoteFile3.delete();
+    remoteFile4.delete();
+    remoteDir1.delete();
+    remoteDir2.delete();
+  }
+
+  /**
+   * Test if file/dir to be localized whose size exceeds limit.
+   * Max 10MB in configuration, mock remote will
+   * always return file size 100MB.
+   * This configuration will fail the job which has remoteUri
+   * But don't impact local dir/file
+   *
+   * --localization https://a/b/1.patch:.
+   * --localization s3a://a/dir:/opt/mys3dir
+   * --localization /temp/script2.py:./
+   */
+  @Test
+  public void testRunJobRemoteUriExceedLocalizationSize() throws Exception {
+    String remoteUri1 = "https://a/b/1.patch";
+    String containerLocal1 = ".";
+    String remoteUri2 = "s3a://a/s3dir";
+    String containerLocal2 = "/opt/mys3dir";
+    String localUri1 = "/temp/script2";
+    String containerLocal3 = "./";
+
+    MockClientContext mockClientContext =
+        YarnServiceCliTestUtils.getMockClientContext();
+    SubmarineConfiguration submarineConf = new SubmarineConfiguration();
+    RemoteDirectoryManager spyRdm =
+        spy(mockClientContext.getRemoteDirectoryManager());
+    mockClientContext.setRemoteDirectoryMgr(spyRdm);
+    /**
+     * Max 10MB, mock remote will always return file size 100MB.
+     * */
+    submarineConf.set(
+        SubmarineConfiguration.LOCALIZATION_MAX_ALLOWED_FILE_SIZE_MB,
+        "10");
+    mockClientContext.setSubmarineConfig(submarineConf);
+
+    RunJobCli runJobCli = new RunJobCli(mockClientContext);
+    Assert.assertFalse(SubmarineLogs.isVerbose());
+
+    // create remote file in local staging dir to simulate
+    Path stagingDir = mockClientContext.getRemoteDirectoryManager()
+        .getJobStagingArea("my-job", true);
+    File remoteFile1 = new File(stagingDir.toUri().getPath()
+        + "/" + new Path(remoteUri1).getName());
+    remoteFile1.createNewFile();
+    File remoteDir1 = new File(stagingDir.toUri().getPath()
+        + "/" + new Path(remoteUri2).getName());
+    remoteDir1.mkdir();
+
+    File remoteDir1File1 = new File(remoteDir1, "afile");
+    remoteDir1File1.createNewFile();
+
+    String fakeLocalDir = System.getProperty("java.io.tmpdir");
+    // create local file, we need to put it under local temp dir
+    File localFile1 = new File(fakeLocalDir,
+        new Path(localUri1).getName());
+    localFile1.createNewFile();
+
+    Assert.assertTrue(remoteFile1.exists());
+    Assert.assertTrue(remoteDir1.exists());
+    Assert.assertTrue(remoteDir1File1.exists());
+
+    String suffix1 = "_" + remoteDir1.lastModified()
+        + "-" + remoteDir1.length();
+    try {
+      runJobCli = new RunJobCli(mockClientContext);
+      runJobCli.run(
+          new String[]{"--name", "my-job", "--docker_image", "tf-docker:1.1.0",
+              "--input_path", "s3://input", "--checkpoint_path", "s3://output",
+              "--num_workers", "3", "--num_ps", "2", "--worker_launch_cmd",
+              "python run-job.py", "--worker_resources",
+              "memory=2048M,vcores=2",
+              "--ps_resources", "memory=4096M,vcores=4", "--ps_docker_image",
+              "ps.image", "--worker_docker_image", "worker.image",
+              "--ps_launch_cmd", "python run-ps.py", "--verbose",
+              "--localization",
+              remoteUri1 + ":" + containerLocal1});
+    } catch (IOException e) {
+      // Shouldn't have exception because it's within file size limit
+      Assert.assertFalse(true);
+    }
+    // we should download because fail fast
+    verify(spyRdm, times(1)).copyRemoteToLocal(
+        anyString(), anyString());
+    try {
+      // reset
+      reset(spyRdm);
+      runJobCli = new RunJobCli(mockClientContext);
+      runJobCli.run(
+          new String[]{"--name", "my-job", "--docker_image", "tf-docker:1.1.0",
+              "--input_path", "s3://input", "--checkpoint_path", "s3://output",
+              "--num_workers", "3", "--num_ps", "2", "--worker_launch_cmd",
+              "python run-job.py", "--worker_resources",
+              "memory=2048M,vcores=2",
+              "--ps_resources", "memory=4096M,vcores=4", "--ps_docker_image",
+              "ps.image", "--worker_docker_image", "worker.image",
+              "--ps_launch_cmd", "python run-ps.py", "--verbose",
+              "--localization",
+              remoteUri1 + ":" + containerLocal1,
+              "--localization",
+              remoteUri2 + ":" + containerLocal2,
+              "--localization",
+              localFile1.getAbsolutePath() + ":" + containerLocal3});
+    } catch (IOException e) {
+      Assert.assertTrue(e.getMessage()
+          .contains("104857600 exceeds configured max size:10485760"));
+      // we shouldn't do any download because fail fast
+      verify(spyRdm, times(0)).copyRemoteToLocal(
+          anyString(), anyString());
+    }
+
+    try {
+      runJobCli = new RunJobCli(mockClientContext);
+      runJobCli.run(
+          new String[]{"--name", "my-job", "--docker_image", "tf-docker:1.1.0",
+              "--input_path", "s3://input", "--checkpoint_path", "s3://output",
+              "--num_workers", "3", "--num_ps", "2", "--worker_launch_cmd",
+              "python run-job.py", "--worker_resources",
+              "memory=2048M,vcores=2",
+              "--ps_resources", "memory=4096M,vcores=4", "--ps_docker_image",
+              "ps.image", "--worker_docker_image", "worker.image",
+              "--ps_launch_cmd", "python run-ps.py", "--verbose",
+              "--localization",
+              localFile1.getAbsolutePath() + ":" + containerLocal3});
+    } catch (IOException e) {
+      Assert.assertTrue(e.getMessage()
+          .contains("104857600 exceeds configured max size:10485760"));
+      // we shouldn't do any download because fail fast
+      verify(spyRdm, times(0)).copyRemoteToLocal(
+          anyString(), anyString());
+    }
+
+    localFile1.delete();
+    remoteDir1File1.delete();
+    remoteFile1.delete();
+    remoteDir1.delete();
+  }
+
+  /**
+   * Test remote Uri doesn't exist.
+   * */
+  @Test
+  public void testRunJobWithNonExistRemoteUri() throws Exception {
+    String remoteUri1 = "hdfs:///a/b/1.patch";
+    String containerLocal1 = ".";
+    String localUri1 = "/a/b/c";
+    String containerLocal2 = "./";
+    MockClientContext mockClientContext =
+        YarnServiceCliTestUtils.getMockClientContext();
+
+    RunJobCli runJobCli = new RunJobCli(mockClientContext);
+    Assert.assertFalse(SubmarineLogs.isVerbose());
+
+    try {
+      runJobCli.run(
+          new String[]{"--name", "my-job", "--docker_image", "tf-docker:1.1.0",
+              "--input_path", "s3://input", "--checkpoint_path", "s3://output",
+              "--num_workers", "3", "--num_ps", "2", "--worker_launch_cmd",
+              "python run-job.py", "--worker_resources",
+              "memory=2048M,vcores=2",
+              "--ps_resources", "memory=4096M,vcores=4", "--ps_docker_image",
+              "ps.image", "--worker_docker_image", "worker.image",
+              "--ps_launch_cmd", "python run-ps.py", "--verbose",
+              "--localization",
+              remoteUri1 + ":" + containerLocal1});
+    } catch (IOException e) {
+      Assert.assertTrue(e.getMessage()
+          .contains("doesn't exists"));
+    }
+
+    try {
+      runJobCli = new RunJobCli(mockClientContext);
+      runJobCli.run(
+          new String[]{"--name", "my-job", "--docker_image", "tf-docker:1.1.0",
+              "--input_path", "s3://input", "--checkpoint_path", "s3://output",
+              "--num_workers", "3", "--num_ps", "2", "--worker_launch_cmd",
+              "python run-job.py", "--worker_resources",
+              "memory=2048M,vcores=2",
+              "--ps_resources", "memory=4096M,vcores=4", "--ps_docker_image",
+              "ps.image", "--worker_docker_image", "worker.image",
+              "--ps_launch_cmd", "python run-ps.py", "--verbose",
+              "--localization",
+              localUri1 + ":" + containerLocal2});
+    } catch (IOException e) {
+      Assert.assertTrue(e.getMessage()
+          .contains("doesn't exists"));
+    }
+  }
+
+  /**
+   * Test local dir
+   * --localization /user/yarn/mydir:./mydir1
+   * --localization /user/yarn/mydir2:/opt/dir2:rw
+   * --localization /user/yarn/mydir2:.
+   */
+  @Test
+  public void testRunJobWithLocalDirLocalization() throws Exception {
+    String fakeLocalDir = System.getProperty("java.io.tmpdir");
+    String localUrl = "/user/yarn/mydir";
+    String containerPath = "./mydir1";
+    String localUrl2 = "/user/yarn/mydir2";
+    String containPath2 = "/opt/dir2";
+    String containerPath3 = ".";
+
+    MockClientContext mockClientContext =
+        YarnServiceCliTestUtils.getMockClientContext();
+    RunJobCli runJobCli = new RunJobCli(mockClientContext);
+    Assert.assertFalse(SubmarineLogs.isVerbose());
+
+    RemoteDirectoryManager spyRdm =
+        spy(mockClientContext.getRemoteDirectoryManager());
+    mockClientContext.setRemoteDirectoryMgr(spyRdm);
+    // create local file
+    File localDir1 = new File(fakeLocalDir,
+        localUrl);
+    localDir1.mkdirs();
+    File temp1 = new File(localDir1.getAbsolutePath() + "/1.py");
+    File temp2 = new File(localDir1.getAbsolutePath() + "/2.py");
+    temp1.createNewFile();
+    temp2.createNewFile();
+
+    File localDir2 = new File(fakeLocalDir,
+        localUrl2);
+    localDir2.mkdirs();
+    File temp3 = new File(localDir1.getAbsolutePath() + "/3.py");
+    File temp4 = new File(localDir1.getAbsolutePath() + "/4.py");
+    temp3.createNewFile();
+    temp4.createNewFile();
+
+    Assert.assertTrue(localDir1.exists());
+    Assert.assertTrue(localDir2.exists());
+
+    String suffix1 = "_" + localDir1.lastModified()
+        + "-" + localDir1.length();
+    String suffix2 = "_" + localDir2.lastModified()
+        + "-" + localDir2.length();
+
+    runJobCli.run(
+        new String[]{"--name", "my-job", "--docker_image", "tf-docker:1.1.0",
+            "--input_path", "s3://input", "--checkpoint_path", "s3://output",
+            "--num_workers", "3", "--num_ps", "2", "--worker_launch_cmd",
+            "python run-job.py", "--worker_resources", "memory=2048M,vcores=2",
+            "--ps_resources", "memory=4096M,vcores=4", "--ps_docker_image",
+            "ps.image", "--worker_docker_image", "worker.image",
+            "--ps_launch_cmd", "python run-ps.py", "--verbose",
+            "--localization",
+            fakeLocalDir + localUrl + ":" + containerPath,
+            "--localization",
+            fakeLocalDir + localUrl2 + ":" + containPath2 + ":rw",
+            "--localization",
+            fakeLocalDir + localUrl2 + ":" + containerPath3});
+
+    Service serviceSpec = getServiceSpecFromJobSubmitter(
+        runJobCli.getJobSubmitter());
+    Assert.assertEquals(3, serviceSpec.getComponents().size());
+
+    // we shouldn't do any download
+    verify(spyRdm, times(0)).copyRemoteToLocal(
+        anyString(), anyString());
+
+    // Ensure local original files are not deleted
+    Assert.assertTrue(localDir1.exists());
+    Assert.assertTrue(localDir2.exists());
+
+    // Ensure zip file are deleted
+    Assert.assertFalse(new File(System.getProperty("java.io.tmpdir")
+        + "/" + new Path(localUrl).getName()
+        + suffix1 + ".zip").exists());
+    Assert.assertFalse(new File(System.getProperty("java.io.tmpdir")
+        + "/" + new Path(localUrl2).getName()
+        + suffix2 + ".zip").exists());
+
+    // Ensure dirs will be zipped and localized
+    List<ConfigFile> files = serviceSpec.getConfiguration().getFiles();
+    Assert.assertEquals(3, files.size());
+    ConfigFile file = files.get(0);
+    Path stagingDir = mockClientContext.getRemoteDirectoryManager()
+        .getJobStagingArea("my-job", true);
+    Assert.assertEquals(ConfigFile.TypeEnum.ARCHIVE, file.getType());
+    String expectedSrcLocalization = stagingDir.toUri().getPath()
+        + "/" + new Path(localUrl).getName() + suffix1 + ".zip";
+    Assert.assertEquals(expectedSrcLocalization,
+        new Path(file.getSrcFile()).toUri().getPath());
+    String expectedDstFileName = new Path(containerPath).getName();
+    Assert.assertEquals(expectedDstFileName, file.getDestFile());
+
+    file = files.get(1);
+    Assert.assertEquals(ConfigFile.TypeEnum.ARCHIVE, file.getType());
+    expectedSrcLocalization = stagingDir.toUri().getPath()
+        + "/" + new Path(localUrl2).getName() + suffix2 + ".zip";
+    Assert.assertEquals(expectedSrcLocalization,
+        new Path(file.getSrcFile()).toUri().getPath());
+    expectedDstFileName = new Path(containPath2).getName();
+    Assert.assertEquals(expectedDstFileName, file.getDestFile());
+
+    file = files.get(2);
+    Assert.assertEquals(ConfigFile.TypeEnum.ARCHIVE, file.getType());
+    expectedSrcLocalization = stagingDir.toUri().getPath()
+        + "/" + new Path(localUrl2).getName() + suffix2 + ".zip";
+    Assert.assertEquals(expectedSrcLocalization,
+        new Path(file.getSrcFile()).toUri().getPath());
+    expectedDstFileName = new Path(localUrl2).getName();
+    Assert.assertEquals(expectedDstFileName, file.getDestFile());
+
+    // Ensure mounts env value is correct
+    String env = serviceSpec.getConfiguration().getEnv()
+        .get("YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS");
+    String expectedMounts = new Path(containPath2).getName()
+        + ":" + containPath2 + ":rw";
+
+    Assert.assertTrue(env.contains(expectedMounts));
+
+    temp1.delete();
+    temp2.delete();
+    temp3.delete();
+    temp4.delete();
+    localDir2.delete();
+    localDir1.delete();
+  }
+
+  /**
+   * Test zip function.
+   * A dir "/user/yarn/mydir" has two files and one subdir
+   * */
+  @Test
+  public void testYarnServiceSubmitterZipFunction()
+      throws Exception {
+    MockClientContext mockClientContext =
+        YarnServiceCliTestUtils.getMockClientContext();
+    RunJobCli runJobCli = new RunJobCli(mockClientContext);
+    YarnServiceJobSubmitter submitter =
+        (YarnServiceJobSubmitter)mockClientContext
+            .getRuntimeFactory().getJobSubmitterInstance();
+    String fakeLocalDir = System.getProperty("java.io.tmpdir");
+    String localUrl = "/user/yarn/mydir";
+    String localSubDirName = "subdir1";
+    // create local file
+    File localDir1 = new File(fakeLocalDir,
+        localUrl);
+    localDir1.mkdirs();
+    File temp1 = new File(localDir1.getAbsolutePath() + "/1.py");
+    File temp2 = new File(localDir1.getAbsolutePath() + "/2.py");
+    temp1.createNewFile();
+    temp2.createNewFile();
+
+
+    File localSubDir = new File(localDir1.getAbsolutePath(), localSubDirName);
+    localSubDir.mkdir();
+    File temp3 = new File(localSubDir.getAbsolutePath(), "3.py");
+    temp3.createNewFile();
+
+
+    String zipFilePath = submitter.zipDir(localDir1.getAbsolutePath(),
+        fakeLocalDir + "/user/yarn/mydir.zip");
+    File zipFile = new File(zipFilePath);
+    File unzipTargetDir = new File(fakeLocalDir, "unzipDir");
+    FileUtil.unZip(zipFile, unzipTargetDir);
+    Assert.assertTrue(
+        new File(fakeLocalDir + "/unzipDir/1.py").exists());
+    Assert.assertTrue(
+        new File(fakeLocalDir + "/unzipDir/2.py").exists());
+    Assert.assertTrue(
+        new File(fakeLocalDir + "/unzipDir/subdir1").exists());
+    Assert.assertTrue(
+        new File(fakeLocalDir + "/unzipDir/subdir1/3.py").exists());
+
+    zipFile.delete();
+    unzipTargetDir.delete();
+    temp1.delete();
+    temp2.delete();
+    temp3.delete();
+    localSubDir.delete();
+    localDir1.delete();
+  }
+
 }

+ 7 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/common/MockClientContext.java

@@ -31,7 +31,8 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class MockClientContext extends ClientContext {
-  private MockRemoteDirectoryManager remoteDirectoryMgr =
+
+  private RemoteDirectoryManager remoteDirectoryMgr =
       new MockRemoteDirectoryManager();
 
   @Override
@@ -39,6 +40,11 @@ public class MockClientContext extends ClientContext {
     return remoteDirectoryMgr;
   }
 
+  public void setRemoteDirectoryMgr(
+      RemoteDirectoryManager remoteDirectoryMgr) {
+    this.remoteDirectoryMgr = remoteDirectoryMgr;
+  }
+
   @Override
   public synchronized YarnClient getOrCreateYarnClient() {
     YarnClient client = mock(YarnClient.class);

+ 85 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/common/fs/MockRemoteDirectoryManager.java

@@ -19,7 +19,9 @@
 package org.apache.hadoop.yarn.submarine.common.fs;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 
 import java.io.File;
@@ -29,6 +31,7 @@ public class MockRemoteDirectoryManager implements RemoteDirectoryManager {
   private File jobsParentDir = null;
   private File modelParentDir = null;
 
+  private File jobDir = null;
   @Override
   public Path getJobStagingArea(String jobName, boolean create)
       throws IOException {
@@ -41,10 +44,11 @@ public class MockRemoteDirectoryManager implements RemoteDirectoryManager {
       }
     }
 
-    File jobDir = new File(jobsParentDir.getAbsolutePath(), jobName);
+    this.jobDir = new File(jobsParentDir.getAbsolutePath(), jobName);
     if (create && !jobDir.exists()) {
       if (!jobDir.mkdirs()) {
-        throw new IOException("Failed to mkdirs for " + jobDir.getAbsolutePath());
+        throw new IOException("Failed to mkdirs for "
+            + jobDir.getAbsolutePath());
       }
     }
     return new Path(jobDir.getAbsolutePath());
@@ -57,7 +61,8 @@ public class MockRemoteDirectoryManager implements RemoteDirectoryManager {
   }
 
   @Override
-  public Path getModelDir(String modelName, boolean create) throws IOException {
+  public Path getModelDir(String modelName, boolean create)
+      throws IOException {
     if (modelParentDir == null && create) {
       modelParentDir = new File(
           "target/_models_" + System.currentTimeMillis());
@@ -70,19 +75,94 @@ public class MockRemoteDirectoryManager implements RemoteDirectoryManager {
     File modelDir = new File(modelParentDir.getAbsolutePath(), modelName);
     if (create) {
       if (!modelDir.exists() && !modelDir.mkdirs()) {
-        throw new IOException("Failed to mkdirs for " + modelDir.getAbsolutePath());
+        throw new IOException("Failed to mkdirs for "
+            + modelDir.getAbsolutePath());
       }
     }
     return new Path(modelDir.getAbsolutePath());
   }
 
   @Override
-  public FileSystem getFileSystem() throws IOException {
+  public FileSystem getDefaultFileSystem() throws IOException {
     return FileSystem.getLocal(new Configuration());
   }
 
+  @Override
+  public FileSystem getFileSystemByUri(String uri) throws IOException {
+    return getDefaultFileSystem();
+  }
+
   @Override
   public Path getUserRootFolder() throws IOException {
     return new Path("s3://generated_root_dir");
   }
+
+  @Override
+  public boolean isDir(String uri) throws IOException {
+    return getDefaultFileSystem().getFileStatus(
+        new Path(convertToStagingPath(uri))).isDirectory();
+
+  }
+
+  @Override
+  public boolean isRemote(String uri) throws IOException {
+    String scheme = new Path(uri).toUri().getScheme();
+    if (null == scheme) {
+      return false;
+    }
+    return !scheme.startsWith("file://");
+  }
+
+  private String convertToStagingPath(String uri) throws IOException {
+    String ret = uri;
+    if (isRemote(uri)) {
+      String dirName = new Path(uri).getName();
+      ret = this.jobDir.getAbsolutePath()
+          + "/" + dirName;
+    }
+    return ret;
+  }
+
+  /**
+   * We use staging dir as mock HDFS dir.
+   * */
+  @Override
+  public boolean copyRemoteToLocal(String remoteUri, String localUri)
+      throws IOException {
+    // mock the copy from HDFS into a local copy
+    Path remoteToLocalDir = new Path(convertToStagingPath(remoteUri));
+    File old = new File(convertToStagingPath(localUri));
+    if (old.isDirectory() && old.exists()) {
+      if (!FileUtil.fullyDelete(old)) {
+        throw new IOException("Cannot delete temp dir:"
+            + old.getAbsolutePath());
+      }
+    }
+    return FileUtil.copy(getDefaultFileSystem(), remoteToLocalDir,
+        new File(localUri), false,
+        getDefaultFileSystem().getConf());
+  }
+
+  @Override
+  public boolean existsRemoteFile(Path uri) throws IOException {
+    String fakeLocalFilePath = this.jobDir.getAbsolutePath()
+        + "/" + uri.getName();
+    return new File(fakeLocalFilePath).exists();
+  }
+
+  @Override
+  public FileStatus getRemoteFileStatus(Path p) throws IOException {
+    return getDefaultFileSystem().getFileStatus(new Path(
+        convertToStagingPath(p.toUri().toString())));
+  }
+
+  @Override
+  public long getRemoteFileSize(String uri) throws IOException {
+    // 5 byte for this file to test
+    if (uri.equals("https://a/b/1.patch")) {
+      return 5;
+    }
+    return 100 * 1024 * 1024;
+  }
+
 }