
MAPREDUCE-5951. Add support for the YARN Shared Cache.

(cherry picked from commit e46d5bb962b0c942f993afc505b165b1cd96e51b)
Chris Trezzo, 7 years ago
Parent
Commit 3e7c06af41
19 files changed, with 1711 insertions and 240 deletions
  1. +17 -0    hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
  2. +44 -8    hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
  3. +180 -0   hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/LocalResourceBuilder.java
  4. +29 -117  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
  5. +9 -0     hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java
  6. +4 -4     hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java
  7. +6 -0     hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml
  8. +226 -0   hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
  9. +353 -63  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java
  10. +71 -0   hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
  11. +102 -0  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/SharedCacheConfig.java
  12. +11 -0   hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
  13. +100 -0  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/SharedCacheSupport.md
  14. +36 -40  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploader.java
  15. +365 -0  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploaderWithSharedCache.java
  16. +46 -8   hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
  17. +52 -0   hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLocalJobSubmission.java
  18. +59 -0   hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
  19. +1 -0    hadoop-project/src/site/site.xml

+ 17 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java

@@ -50,6 +50,7 @@ import org.apache.hadoop.mapred.JobACLsManager;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.TaskCompletionEvent;
 import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -1412,6 +1413,20 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
         new char[] {'"', '=', '.'});
   }
 
+  /*
+   * The goal is to make sure only the NM that hosts MRAppMaster will upload
+   * resources to shared cache. Clean up the shared cache policies for all
+   * resources so that later when TaskAttemptImpl creates
+   * ContainerLaunchContext, LocalResource.setShouldBeUploadedToSharedCache
+   * will be set to false. That way, the NMs that host the task containers
+   * won't try to upload the resources to the shared cache.
+   */
+  private static void cleanupSharedCacheUploadPolicies(Configuration conf) {
+    Map<String, Boolean> emap = Collections.emptyMap();
+    Job.setArchiveSharedCacheUploadPolicies(conf, emap);
+    Job.setFileSharedCacheUploadPolicies(conf, emap);
+  }
+
   public static class InitTransition 
       implements MultipleArcTransition<JobImpl, JobEvent, JobStateInternal> {
 
@@ -1490,6 +1505,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
         job.allowedReduceFailuresPercent =
             job.conf.getInt(MRJobConfig.REDUCE_FAILURES_MAXPERCENT, 0);
 
+        cleanupSharedCacheUploadPolicies(job.conf);
+
         // create the Tasks but don't start them yet
         createMapTasks(job, inputLength, taskSplitMetaInfo);
         createReduceTasks(job);
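
For illustration only: a minimal sketch of the per-resource flag that the comment above refers to. The URL, size, and timestamp are placeholders; the point is that the AM keeps the flag true only in its own ContainerLaunchContext, while resources built for task containers carry false once the policies have been cleared.

    import java.net.URI;
    import org.apache.hadoop.yarn.api.records.LocalResource;
    import org.apache.hadoop.yarn.api.records.LocalResourceType;
    import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
    import org.apache.hadoop.yarn.api.records.URL;

    public class SharedCacheFlagSketch {
      public static void main(String[] args) throws Exception {
        // Placeholder resource; in the AM this would come from the job's
        // staging directory or the shared cache.
        URL url = URL.fromURI(new URI("hdfs://nn:8020/user/alice/libfoo.jar"));
        LocalResource rc = LocalResource.newInstance(url, LocalResourceType.FILE,
            LocalResourceVisibility.APPLICATION, 1234L, 5678L);
        // Task containers get false, so their NMs never attempt an upload.
        rc.setShouldBeUploadedToSharedCache(false);
        System.out.println(rc.getShouldBeUploadedToSharedCache());
      }
    }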

+ 44 - 8
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java

@@ -21,6 +21,8 @@ package org.apache.hadoop.mapreduce.v2.app.job.impl;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -715,17 +717,38 @@ public abstract class TaskAttemptImpl implements
 
   /**
    * Create a {@link LocalResource} record with all the given parameters.
+   * The NM that hosts the AM container will upload resources to the shared
+   * cache, so there is no need to ask the task container's NM to do it.
+   * Accordingly, the shared cache upload policy for these resources is set
+   * to false.
    */
   private static LocalResource createLocalResource(FileSystem fc, Path file,
-      LocalResourceType type, LocalResourceVisibility visibility)
-      throws IOException {
+      String fileSymlink, LocalResourceType type,
+      LocalResourceVisibility visibility) throws IOException {
     FileStatus fstat = fc.getFileStatus(file);
-    URL resourceURL = URL.fromPath(fc.resolvePath(fstat.getPath()));
+    // We need to be careful when converting from path to URL to add a fragment
+    // so that the symlink name when localized will be correct.
+    Path qualifiedPath = fc.resolvePath(fstat.getPath());
+    URI uriWithFragment = null;
+    boolean useFragment = fileSymlink != null && !fileSymlink.equals("");
+    try {
+      if (useFragment) {
+        uriWithFragment = new URI(qualifiedPath.toUri() + "#" + fileSymlink);
+      } else {
+        uriWithFragment = qualifiedPath.toUri();
+      }
+    } catch (URISyntaxException e) {
+      throw new IOException(
+          "Error parsing local resource path."
+              + " Path was not able to be converted to a URI: " + qualifiedPath,
+          e);
+    }
+    URL resourceURL = URL.fromURI(uriWithFragment);
     long resourceSize = fstat.getLen();
     long resourceModificationTime = fstat.getModificationTime();
 
     return LocalResource.newInstance(resourceURL, type, visibility,
-      resourceSize, resourceModificationTime);
+        resourceSize, resourceModificationTime, false);
   }
 
   /**
@@ -842,8 +865,18 @@ public abstract class TaskAttemptImpl implements
       final FileSystem jobJarFs = FileSystem.get(jobJarPath.toUri(), conf);
       Path remoteJobJar = jobJarPath.makeQualified(jobJarFs.getUri(),
           jobJarFs.getWorkingDirectory());
-      LocalResource rc = createLocalResource(jobJarFs, remoteJobJar,
-          LocalResourceType.PATTERN, LocalResourceVisibility.APPLICATION);
+      LocalResourceVisibility jobJarViz =
+          conf.getBoolean(MRJobConfig.JOBJAR_VISIBILITY,
+              MRJobConfig.JOBJAR_VISIBILITY_DEFAULT)
+                  ? LocalResourceVisibility.PUBLIC
+                  : LocalResourceVisibility.APPLICATION;
+      // We hard code the job.jar localized symlink in the container directory.
+      // This is because the mapreduce app expects the job.jar to be named
+      // accordingly. Additionally we set the shared cache upload policy to
+      // false. Resources are uploaded by the AM if necessary.
+      LocalResource rc =
+          createLocalResource(jobJarFs, remoteJobJar, MRJobConfig.JOB_JAR,
+              LocalResourceType.PATTERN, jobJarViz);
       String pattern = conf.getPattern(JobContext.JAR_UNPACK_PATTERN,
           JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern();
       rc.setPattern(pattern);
@@ -868,9 +901,12 @@ public abstract class TaskAttemptImpl implements
     Path remoteJobConfPath =
         new Path(remoteJobSubmitDir, MRJobConfig.JOB_CONF_FILE);
     FileSystem remoteFS = FileSystem.get(conf);
+    // There is no point in asking the task container's NM to upload the
+    // resource to the shared cache (the job conf is not shared). Therefore,
+    // createLocalResource sets the shared cache upload policy to false.
     localResources.put(MRJobConfig.JOB_CONF_FILE,
-        createLocalResource(remoteFS, remoteJobConfPath, LocalResourceType.FILE,
-            LocalResourceVisibility.APPLICATION));
+        createLocalResource(remoteFS, remoteJobConfPath, null,
+            LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
     LOG.info("The job-conf file on the remote FS is "
         + remoteJobConfPath.toUri().toASCIIString());
   }

+ 180 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/LocalResourceBuilder.java

@@ -0,0 +1,180 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.util;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.URL;
+
+/**
+ * Helper class for MR applications that parses distributed cache artifacts and
+ * creates a map of LocalResources.
+ */
+@SuppressWarnings("deprecation")
+@Private
+@Unstable
+class LocalResourceBuilder {
+  public static final Log LOG = LogFactory.getLog(LocalResourceBuilder.class);
+
+  private Configuration conf;
+  private LocalResourceType type;
+  private URI[] uris;
+  private long[] timestamps;
+  private long[] sizes;
+  private boolean[] visibilities;
+  private Map<String, Boolean> sharedCacheUploadPolicies;
+
+  LocalResourceBuilder() {
+  }
+
+  void setConf(Configuration c) {
+    this.conf = c;
+  }
+
+  void setType(LocalResourceType t) {
+    this.type = t;
+  }
+
+  void setUris(URI[] u) {
+    this.uris = u;
+  }
+
+  void setTimestamps(long[] t) {
+    this.timestamps = t;
+  }
+
+  void setSizes(long[] s) {
+    this.sizes = s;
+  }
+
+  void setVisibilities(boolean[] v) {
+    this.visibilities = v;
+  }
+
+  void setSharedCacheUploadPolicies(Map<String, Boolean> policies) {
+    this.sharedCacheUploadPolicies = policies;
+  }
+
+  void createLocalResources(Map<String, LocalResource> localResources)
+      throws IOException {
+
+    if (uris != null) {
+      // Sanity check
+      if ((uris.length != timestamps.length) || (uris.length != sizes.length) ||
+          (uris.length != visibilities.length)) {
+        throw new IllegalArgumentException("Invalid specification for " +
+            "distributed-cache artifacts of type " + type + " :" +
+            " #uris=" + uris.length +
+            " #timestamps=" + timestamps.length +
+            " #visibilities=" + visibilities.length
+            );
+      }
+
+      for (int i = 0; i < uris.length; ++i) {
+        URI u = uris[i];
+        Path p = new Path(u);
+        FileSystem remoteFS = p.getFileSystem(conf);
+        String linkName = null;
+
+        if (p.getName().equals(DistributedCache.WILDCARD)) {
+          p = p.getParent();
+          linkName = p.getName() + Path.SEPARATOR + DistributedCache.WILDCARD;
+        }
+
+        p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
+            remoteFS.getWorkingDirectory()));
+
+        // If there's no wildcard, try using the fragment for the link
+        if (linkName == null) {
+          linkName = u.getFragment();
+
+          // Because we don't know what's in the fragment, we have to handle
+          // it with care.
+          if (linkName != null) {
+            Path linkPath = new Path(linkName);
+
+            if (linkPath.isAbsolute()) {
+              throw new IllegalArgumentException("Resource name must be "
+                  + "relative");
+            }
+
+            linkName = linkPath.toUri().getPath();
+          }
+        } else if (u.getFragment() != null) {
+          throw new IllegalArgumentException("Invalid path URI: " + p +
+              " - cannot contain both a URI fragment and a wildcard");
+        }
+
+        // If there's no wildcard or fragment, just link to the file name
+        if (linkName == null) {
+          linkName = p.getName();
+        }
+
+        LocalResource orig = localResources.get(linkName);
+        URL url = URL.fromURI(p.toUri());
+        if (orig != null && !orig.getResource().equals(url)) {
+          LOG.warn(getResourceDescription(orig.getType())
+              + toString(orig.getResource()) + " conflicts with "
+              + getResourceDescription(type) + toString(url)
+              + " This will be an error in Hadoop 2.0");
+          continue;
+        }
+        Boolean sharedCachePolicy = sharedCacheUploadPolicies.get(u.toString());
+        sharedCachePolicy =
+            sharedCachePolicy == null ? Boolean.FALSE : sharedCachePolicy;
+        localResources.put(linkName, LocalResource.newInstance(URL.fromURI(p
+            .toUri()), type, visibilities[i] ? LocalResourceVisibility.PUBLIC
+                : LocalResourceVisibility.PRIVATE,
+            sizes[i], timestamps[i], sharedCachePolicy));
+      }
+    }
+  }
+
+  private static String getResourceDescription(LocalResourceType type) {
+    if (type == LocalResourceType.ARCHIVE
+        || type == LocalResourceType.PATTERN) {
+      return "cache archive (" + MRJobConfig.CACHE_ARCHIVES + ") ";
+    }
+    return "cache file (" + MRJobConfig.CACHE_FILES + ") ";
+  }
+
+  private static String toString(org.apache.hadoop.yarn.api.records.URL url) {
+    StringBuffer b = new StringBuffer();
+    b.append(url.getScheme()).append("://").append(url.getHost());
+    if (url.getPort() >= 0) {
+      b.append(":").append(url.getPort());
+    }
+    b.append(url.getFile());
+    return b.toString();
+  }
+}
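
The link-name rules in createLocalResources can be seen concretely with a few illustrative URIs. The sketch below mirrors the naming logic in a simplified, FileSystem-free form; the paths are hypothetical and the literal "*" stands in for DistributedCache.WILDCARD.

    import java.net.URI;
    import org.apache.hadoop.fs.Path;

    public class LinkNameSketch {
      /** Simplified mirror of LocalResourceBuilder's symlink naming rules. */
      static String linkNameFor(URI u) {
        Path p = new Path(u);
        if (p.getName().equals("*")) {            // wildcard: link the directory
          return p.getParent().getName() + Path.SEPARATOR + "*";
        }
        if (u.getFragment() != null) {            // fragment names the symlink
          return new Path(u.getFragment()).toUri().getPath();
        }
        return p.getName();                       // default: the file name
      }

      public static void main(String[] args) throws Exception {
        System.out.println(linkNameFor(new URI("hdfs://nn:8020/cache/dict.txt")));        // dict.txt
        System.out.println(linkNameFor(new URI("hdfs://nn:8020/cache/dict.txt#mydict"))); // mydict
        System.out.println(linkNameFor(new URI("hdfs://nn:8020/cache/jars/*")));          // jars/*
      }
    }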

+ 29 - 117
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java

@@ -42,7 +42,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.InvalidJobConfException;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Task;
 import org.apache.hadoop.mapred.TaskLog;
@@ -67,12 +67,9 @@ import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
-import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.util.Apps;
-import org.apache.hadoop.yarn.util.ConverterUtils;
 
 /**
  * Helper class for MR applications
@@ -274,10 +271,17 @@ public class MRApps extends Apps {
   @SuppressWarnings("deprecation")
   public static void addClasspathToEnv(Map<String, String> environment,
       String classpathEnvVar, Configuration conf) throws IOException {
+
+    /*
+     * We use "*" for the name of the JOB_JAR instead of MRJobConfig.JOB_JAR for
+     * the case where the job jar is not necessarily named "job.jar". This can
+     * happen, for example, when the job is leveraging a resource from the YARN
+     * shared cache.
+     */
     MRApps.addToEnvironment(
         environment,
         classpathEnvVar,
-        MRJobConfig.JOB_JAR + Path.SEPARATOR + MRJobConfig.JOB_JAR, conf);
+        MRJobConfig.JOB_JAR + Path.SEPARATOR + "*", conf);
     MRApps.addToEnvironment(
         environment,
         classpathEnvVar,
@@ -502,23 +506,29 @@ public class MRApps extends Apps {
       Configuration conf, 
       Map<String, LocalResource> localResources) 
   throws IOException {
-    
+
+    LocalResourceBuilder lrb = new LocalResourceBuilder();
+    lrb.setConf(conf);
+
     // Cache archives
-    parseDistributedCacheArtifacts(conf, localResources,  
-        LocalResourceType.ARCHIVE, 
-        DistributedCache.getCacheArchives(conf), 
-        DistributedCache.getArchiveTimestamps(conf),
-        getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES), 
-        DistributedCache.getArchiveVisibilities(conf));
+    lrb.setType(LocalResourceType.ARCHIVE);
+    lrb.setUris(DistributedCache.getCacheArchives(conf));
+    lrb.setTimestamps(DistributedCache.getArchiveTimestamps(conf));
+    lrb.setSizes(getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES));
+    lrb.setVisibilities(DistributedCache.getArchiveVisibilities(conf));
+    lrb.setSharedCacheUploadPolicies(
+        Job.getArchiveSharedCacheUploadPolicies(conf));
+    lrb.createLocalResources(localResources);
     
     // Cache files
-    parseDistributedCacheArtifacts(conf, 
-        localResources,  
-        LocalResourceType.FILE, 
-        DistributedCache.getCacheFiles(conf),
-        DistributedCache.getFileTimestamps(conf),
-        getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES),
-        DistributedCache.getFileVisibilities(conf));
+    lrb.setType(LocalResourceType.FILE);
+    lrb.setUris(DistributedCache.getCacheFiles(conf));
+    lrb.setTimestamps(DistributedCache.getFileTimestamps(conf));
+    lrb.setSizes(getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES));
+    lrb.setVisibilities(DistributedCache.getFileVisibilities(conf));
+    lrb.setSharedCacheUploadPolicies(
+        Job.getFileSharedCacheUploadPolicies(conf));
+    lrb.createLocalResources(localResources);
   }
 
   /**
@@ -578,104 +588,6 @@ public class MRApps extends Apps {
     }
   }
 
-  private static String getResourceDescription(LocalResourceType type) {
-    if(type == LocalResourceType.ARCHIVE || type == LocalResourceType.PATTERN) {
-      return "cache archive (" + MRJobConfig.CACHE_ARCHIVES + ") ";
-    }
-    return "cache file (" + MRJobConfig.CACHE_FILES + ") ";
-  }
-  
-  private static String toString(org.apache.hadoop.yarn.api.records.URL url) {
-    StringBuffer b = new StringBuffer();
-    b.append(url.getScheme()).append("://").append(url.getHost());
-    if(url.getPort() >= 0) {
-      b.append(":").append(url.getPort());
-    }
-    b.append(url.getFile());
-    return b.toString();
-  }
-  
-  // TODO - Move this to MR!
-  // Use TaskDistributedCacheManager.CacheFiles.makeCacheFiles(URI[], 
-  // long[], boolean[], Path[], FileType)
-  @SuppressWarnings("deprecation")
-  private static void parseDistributedCacheArtifacts(
-      Configuration conf,
-      Map<String, LocalResource> localResources,
-      LocalResourceType type,
-      URI[] uris, long[] timestamps, long[] sizes, boolean visibilities[])
-  throws IOException {
-
-    if (uris != null) {
-      // Sanity check
-      if ((uris.length != timestamps.length) || (uris.length != sizes.length) ||
-          (uris.length != visibilities.length)) {
-        throw new IllegalArgumentException("Invalid specification for " +
-            "distributed-cache artifacts of type " + type + " :" +
-            " #uris=" + uris.length +
-            " #timestamps=" + timestamps.length +
-            " #visibilities=" + visibilities.length
-            );
-      }
-      
-      for (int i = 0; i < uris.length; ++i) {
-        URI u = uris[i];
-        Path p = new Path(u);
-        FileSystem remoteFS = p.getFileSystem(conf);
-        String linkName = null;
-
-        if (p.getName().equals(DistributedCache.WILDCARD)) {
-          p = p.getParent();
-          linkName = p.getName() + Path.SEPARATOR + DistributedCache.WILDCARD;
-        }
-
-        p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
-            remoteFS.getWorkingDirectory()));
-
-        // If there's no wildcard, try using the fragment for the link
-        if (linkName == null) {
-          linkName = u.getFragment();
-
-          // Because we don't know what's in the fragment, we have to handle
-          // it with care.
-          if (linkName != null) {
-            Path linkPath = new Path(linkName);
-
-            if (linkPath.isAbsolute()) {
-              throw new IllegalArgumentException("Resource name must be "
-                  + "relative");
-            }
-
-            linkName = linkPath.toUri().getPath();
-          }
-        } else if (u.getFragment() != null) {
-          throw new IllegalArgumentException("Invalid path URI: " + p +
-              " - cannot contain both a URI fragment and a wildcard");
-        }
-
-        // If there's no wildcard or fragment, just link to the file name
-        if (linkName == null) {
-          linkName = p.getName();
-        }
-
-        LocalResource orig = localResources.get(linkName);
-        URL url = URL.fromURI(p.toUri());
-        if(orig != null && !orig.getResource().equals(url)) {
-          LOG.warn(
-              getResourceDescription(orig.getType()) + 
-              toString(orig.getResource()) + " conflicts with " + 
-              getResourceDescription(type) + toString(url) + 
-              " This will be an error in Hadoop 2.0");
-          continue;
-        }
-        localResources.put(linkName, LocalResource
-            .newInstance(URL.fromURI(p.toUri()), type, visibilities[i]
-            ? LocalResourceVisibility.PUBLIC : LocalResourceVisibility.PRIVATE,
-          sizes[i], timestamps[i]));
-      }
-    }
-  }
-  
   // TODO - Move this to MR!
   private static long[] getFileSizes(Configuration conf, String key) {
     String[] strs = conf.getStrings(key);
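
For an end-to-end view of the new plumbing, here is a small sketch that drives MRApps.setupDistributedCache with one locally created cache file and an upload policy. The timestamp, size, and visibility values stand in for the metadata the job submitter normally records.

    import java.io.File;
    import java.net.URI;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.MRJobConfig;
    import org.apache.hadoop.mapreduce.filecache.DistributedCache;
    import org.apache.hadoop.mapreduce.v2.util.MRApps;
    import org.apache.hadoop.yarn.api.records.LocalResource;

    public class SetupDistributedCacheSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // A real local file so the builder can stat and resolve it.
        File f = File.createTempFile("lookup", ".dat");
        URI cacheFile = f.toURI();

        DistributedCache.addCacheFile(cacheFile, conf);
        conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "101");  // placeholder metadata
        conf.set(MRJobConfig.CACHE_FILES_SIZES, "201");
        conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "false");

        // Ask the AM's NM to upload this file to the shared cache.
        Map<String, Boolean> policies = new HashMap<String, Boolean>();
        policies.put(cacheFile.toString(), true);
        Job.setFileSharedCacheUploadPolicies(conf, policies);

        Map<String, LocalResource> localResources =
            new HashMap<String, LocalResource>();
        MRApps.setupDistributedCache(conf, localResources);
        LocalResource rc = localResources.get(f.getName());
        System.out.println(f.getName() + " -> shouldBeUploadedToSharedCache="
            + rc.getShouldBeUploadedToSharedCache());
      }
    }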

+ 9 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java

@@ -30,6 +30,8 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -39,6 +41,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PositionedReadable;
 import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
@@ -164,6 +167,9 @@ public class TestLocalDistributedCacheManager {
     });
 
     DistributedCache.addCacheFile(file, conf);
+    Map<String, Boolean> policies = new HashMap<String, Boolean>();
+    policies.put(file.toString(), true);
+    Job.setFileSharedCacheUploadPolicies(conf, policies);
     conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "101");
     conf.set(MRJobConfig.CACHE_FILES_SIZES, "201");
     conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "false");
@@ -272,6 +278,9 @@ public class TestLocalDistributedCacheManager {
 
     DistributedCache.addCacheFile(file, conf);
     DistributedCache.addCacheFile(file, conf);
+    Map<String, Boolean> policies = new HashMap<String, Boolean>();
+    policies.put(file.toString(), true);
+    Job.setFileSharedCacheUploadPolicies(conf, policies);
     conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "101,101");
     conf.set(MRJobConfig.CACHE_FILES_SIZES, "201,201");
     conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "false,false");

+ 4 - 4
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java

@@ -339,7 +339,7 @@ public class TestMRApps {
     }
     String env_str = env.get(ApplicationConstants.Environment.CLASSPATH.name());
     String expectedClasspath = StringUtils.join(ApplicationConstants.CLASS_PATH_SEPARATOR,
-      Arrays.asList(ApplicationConstants.Environment.PWD.$$(), "job.jar/job.jar",
+        Arrays.asList(ApplicationConstants.Environment.PWD.$$(), "job.jar/*",
         "job.jar/classes/", "job.jar/lib/*",
         ApplicationConstants.Environment.PWD.$$() + "/*"));
     assertTrue("MAPREDUCE_JOB_USER_CLASSPATH_FIRST set, but not taking effect!",
@@ -359,7 +359,7 @@ public class TestMRApps {
     }
     String env_str = env.get(ApplicationConstants.Environment.CLASSPATH.name());
     String expectedClasspath = StringUtils.join(ApplicationConstants.CLASS_PATH_SEPARATOR,
-      Arrays.asList("job.jar/job.jar", "job.jar/classes/", "job.jar/lib/*",
+        Arrays.asList("job.jar/*", "job.jar/classes/", "job.jar/lib/*",
         ApplicationConstants.Environment.PWD.$$() + "/*"));
     assertTrue("MAPREDUCE_JOB_USER_CLASSPATH_FIRST false, and job.jar is not in"
       + " the classpath!", env_str.contains(expectedClasspath));
@@ -382,7 +382,7 @@ public class TestMRApps {
     assertFalse("MAPREDUCE_JOB_CLASSLOADER true, but PWD is in the classpath!",
       cp.contains("PWD"));
     String expectedAppClasspath = StringUtils.join(ApplicationConstants.CLASS_PATH_SEPARATOR,
-      Arrays.asList(ApplicationConstants.Environment.PWD.$$(), "job.jar/job.jar",
+        Arrays.asList(ApplicationConstants.Environment.PWD.$$(), "job.jar/*",
         "job.jar/classes/", "job.jar/lib/*",
         ApplicationConstants.Environment.PWD.$$() + "/*"));
     assertEquals("MAPREDUCE_JOB_CLASSLOADER true, but job.jar is not in the app"
@@ -411,7 +411,7 @@ public class TestMRApps {
     conf.set(MRJobConfig.MAPREDUCE_APPLICATION_CLASSPATH, FRAMEWORK_CLASSPATH);
     MRApps.setClasspath(env, conf);
     final String stdClasspath = StringUtils.join(ApplicationConstants.CLASS_PATH_SEPARATOR,
-        Arrays.asList("job.jar/job.jar", "job.jar/classes/", "job.jar/lib/*",
+        Arrays.asList("job.jar/*", "job.jar/classes/", "job.jar/lib/*",
             ApplicationConstants.Environment.PWD.$$() + "/*"));
     String expectedClasspath = StringUtils.join(ApplicationConstants.CLASS_PATH_SEPARATOR,
         Arrays.asList(ApplicationConstants.Environment.PWD.$$(),

+ 6 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml

@@ -48,6 +48,12 @@
       <artifactId>hadoop-hdfs</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.skyscreamer</groupId>
       <artifactId>jsonassert</artifactId>

+ 226 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java

@@ -21,12 +21,17 @@ package org.apache.hadoop.mapreduce;
 import java.io.IOException;
 import java.net.URI;
 import java.security.PrivilegedExceptionAction;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration.IntegerRanges;
 import org.apache.hadoop.fs.FileSystem;
@@ -1303,6 +1308,227 @@ public class Job extends JobContextImpl implements JobContext, AutoCloseable {
     }   
   }
 
+  /**
+   * Add a file to job config for shared cache processing. If shared cache is
+   * enabled, it will return true, otherwise, return false. We don't check with
+   * SCM here given application might not be able to provide the job id;
+   * ClientSCMProtocol.use requires the application id. Job Submitter will read
+   * the files from job config and take care of things.
+   *
+   * @param resource The resource that Job Submitter will process later using
+   *          shared cache.
+   * @param conf Configuration to add the resource to
+   * @return whether the resource has been added to the configuration
+   */
+  @Unstable
+  public static boolean addFileToSharedCache(URI resource, Configuration conf) {
+    SharedCacheConfig scConfig = new SharedCacheConfig();
+    scConfig.init(conf);
+    if (scConfig.isSharedCacheFilesEnabled()) {
+      String files = conf.get(MRJobConfig.FILES_FOR_SHARED_CACHE);
+      conf.set(
+          MRJobConfig.FILES_FOR_SHARED_CACHE,
+          files == null ? resource.toString() : files + ","
+              + resource.toString());
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * Add a file to job config for shared cache processing. If shared cache is
+   * enabled, it will return true, otherwise, return false. We don't check with
+   * SCM here given application might not be able to provide the job id;
+   * ClientSCMProtocol.use requires the application id. Job Submitter will read
+   * the files from job config and take care of things. Job Submitter will also
+   * add the file to classpath. Intended to be used by user code.
+   *
+   * @param resource The resource that Job Submitter will process later using
+   *          shared cache.
+   * @param conf Configuration to add the resource to
+   * @return whether the resource has been added to the configuration
+   */
+  @Unstable
+  public static boolean addFileToSharedCacheAndClasspath(URI resource,
+      Configuration conf) {
+    SharedCacheConfig scConfig = new SharedCacheConfig();
+    scConfig.init(conf);
+    if (scConfig.isSharedCacheLibjarsEnabled()) {
+      String files =
+          conf.get(MRJobConfig.FILES_FOR_CLASSPATH_AND_SHARED_CACHE);
+      conf.set(
+          MRJobConfig.FILES_FOR_CLASSPATH_AND_SHARED_CACHE,
+          files == null ? resource.toString() : files + ","
+              + resource.toString());
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * Add an archive to job config for shared cache processing. If shared cache
+   * is enabled, it will return true, otherwise, return false. We don't check
+   * with SCM here given application might not be able to provide the job id;
+   * ClientSCMProtocol.use requires the application id. Job Submitter will read
+   * the files from job config and take care of things. Intended to be used by
+   * user code.
+   *
+   * @param resource The resource that Job Submitter will process later using
+   *          shared cache.
+   * @param conf Configuration to add the resource to
+   * @return whether the resource has been added to the configuration
+   */
+  @Unstable
+  public static boolean addArchiveToSharedCache(URI resource,
+      Configuration conf) {
+    SharedCacheConfig scConfig = new SharedCacheConfig();
+    scConfig.init(conf);
+    if (scConfig.isSharedCacheArchivesEnabled()) {
+      String files = conf.get(MRJobConfig.ARCHIVES_FOR_SHARED_CACHE);
+      conf.set(
+          MRJobConfig.ARCHIVES_FOR_SHARED_CACHE,
+          files == null ? resource.toString() : files + ","
+              + resource.toString());
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * This is to set the shared cache upload policies for files. If the parameter
+   * was previously set, this method will replace the old value with the new
+   * provided map.
+   *
+   * @param conf Configuration which stores the shared cache upload policies
+   * @param policies A map containing the shared cache upload policies for a set
+   *          of resources. The key is the url of the resource and the value is
+   *          the upload policy. True if it should be uploaded, false otherwise.
+   */
+  @Unstable
+  public static void setFileSharedCacheUploadPolicies(Configuration conf,
+      Map<String, Boolean> policies) {
+    setSharedCacheUploadPolicies(conf, policies, true);
+  }
+
+  /**
+   * This is to set the shared cache upload policies for archives. If the
+   * parameter was previously set, this method will replace the old value with
+   * the new provided map.
+   *
+   * @param conf Configuration which stores the shared cache upload policies
+   * @param policies A map containing the shared cache upload policies for a set
+   *          of resources. The key is the url of the resource and the value is
+   *          the upload policy. True if it should be uploaded, false otherwise.
+   */
+  @Unstable
+  public static void setArchiveSharedCacheUploadPolicies(Configuration conf,
+      Map<String, Boolean> policies) {
+    setSharedCacheUploadPolicies(conf, policies, false);
+  }
+
+  // We use a double colon because a colon is a reserved character in a URI and
+  // there should not be two colons next to each other.
+  private static final String DELIM = "::";
+
+  /**
+   * Set the shared cache upload policies config parameter. This is done by
+   * serializing the provided map of shared cache upload policies into a config
+   * parameter. If the parameter was previously set, this method will replace
+   * the old value with the new provided map.
+   *
+   * @param conf Configuration which stores the shared cache upload policies
+   * @param policies A map containing the shared cache upload policies for a set
+   *          of resources. The key is the url of the resource and the value is
+   *          the upload policy. True if it should be uploaded, false otherwise.
+   * @param areFiles True if these policies are for files, false if they are for
+   *          archives.
+   */
+  private static void setSharedCacheUploadPolicies(Configuration conf,
+      Map<String, Boolean> policies, boolean areFiles) {
+    if (policies != null) {
+      StringBuilder sb = new StringBuilder();
+      Iterator<Map.Entry<String, Boolean>> it = policies.entrySet().iterator();
+      Map.Entry<String, Boolean> e;
+      if (it.hasNext()) {
+        e = it.next();
+        sb.append(e.getKey() + DELIM + e.getValue());
+      } else {
+        // policies is an empty map, just skip setting the parameter
+        return;
+      }
+      while (it.hasNext()) {
+        e = it.next();
+        sb.append("," + e.getKey() + DELIM + e.getValue());
+      }
+      String confParam =
+          areFiles ? MRJobConfig.CACHE_FILES_SHARED_CACHE_UPLOAD_POLICIES
+              : MRJobConfig.CACHE_ARCHIVES_SHARED_CACHE_UPLOAD_POLICIES;
+      conf.set(confParam, sb.toString());
+    }
+  }
+
+  /**
+   * Deserialize a map of shared cache upload policies from a config parameter.
+   *
+   * @param conf Configuration which stores the shared cache upload policies
+   * @param areFiles True if these policies are for files, false if they are for
+   *          archives.
+   * @return A map containing the shared cache upload policies for a set of
+   *         resources. The key is the url of the resource and the value is the
+   *         upload policy. True if it should be uploaded, false otherwise.
+   */
+  private static Map<String, Boolean> getSharedCacheUploadPolicies(
+      Configuration conf, boolean areFiles) {
+    String confParam =
+        areFiles ? MRJobConfig.CACHE_FILES_SHARED_CACHE_UPLOAD_POLICIES
+            : MRJobConfig.CACHE_ARCHIVES_SHARED_CACHE_UPLOAD_POLICIES;
+    Collection<String> policies = conf.getStringCollection(confParam);
+    String[] policy;
+    Map<String, Boolean> policyMap = new LinkedHashMap<String, Boolean>();
+    for (String s : policies) {
+      policy = s.split(DELIM);
+      if (policy.length != 2) {
+        LOG.error(confParam
+            + " is mis-formatted, returning empty shared cache upload policies."
+            + " Error on [" + s + "]");
+        return new LinkedHashMap<String, Boolean>();
+      }
+      policyMap.put(policy[0], Boolean.parseBoolean(policy[1]));
+    }
+    return policyMap;
+  }
+
+  /**
+   * This is to get the shared cache upload policies for files.
+   *
+   * @param conf Configuration which stores the shared cache upload policies
+   * @return A map containing the shared cache upload policies for a set of
+   *         resources. The key is the url of the resource and the value is the
+   *         upload policy. True if it should be uploaded, false otherwise.
+   */
+  @Unstable
+  public static Map<String, Boolean> getFileSharedCacheUploadPolicies(
+      Configuration conf) {
+    return getSharedCacheUploadPolicies(conf, true);
+  }
+
+  /**
+   * This is to get the shared cache upload policies for archives.
+   *
+   * @param conf Configuration which stores the shared cache upload policies
+   * @return A map containing the shared cache upload policies for a set of
+   *         resources. The key is the url of the resource and the value is the
+   *         upload policy. True if it should be uploaded, false otherwise.
+   */
+  @Unstable
+  public static Map<String, Boolean> getArchiveSharedCacheUploadPolicies(
+      Configuration conf) {
+    return getSharedCacheUploadPolicies(conf, false);
+  }
+
   private synchronized void connect()
           throws IOException, InterruptedException, ClassNotFoundException {
     if (cluster == null) {
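
A usage sketch of the new client-facing API. The paths are hypothetical, and it assumes the default configuration leaves the shared cache disabled, so addFileToSharedCache is a no-op that returns false.

    import java.net.URI;
    import java.util.LinkedHashMap;
    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class SharedCachePolicySketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Disabled by default (assumption), so this returns false; when
        // enabled, the job submitter later reads the recorded URI and tries
        // the shared cache before falling back to the staging-directory upload.
        boolean added = Job.addFileToSharedCache(
            new URI("hdfs://nn:8020/user/alice/libfoo.jar"), conf);
        System.out.println("file recorded for shared cache: " + added);

        // Upload policies: the key is the resource URL, the value says whether
        // the AM's NM should upload it. They are serialized into the config as
        // a comma-separated list of url::boolean pairs (hence the "::" delimiter).
        Map<String, Boolean> policies = new LinkedHashMap<String, Boolean>();
        policies.put("hdfs://nn:8020/user/alice/libfoo.jar", true);
        policies.put("hdfs://nn:8020/user/alice/lookup.dat", false);
        Job.setFileSharedCacheUploadPolicies(conf, policies);
        System.out.println(Job.getFileSharedCacheUploadPolicies(conf));
      }
    }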

+ 353 - 63
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java

@@ -24,12 +24,13 @@ import java.net.URISyntaxException;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.LinkedList;
+import java.util.LinkedHashMap;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -38,30 +39,100 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.client.api.SharedCacheClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 
 import com.google.common.annotations.VisibleForTesting;
 
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
+/**
+ * This class is responsible for uploading resources from the client to HDFS
+ * that are associated with a MapReduce job.
+ */
+@Private
+@Unstable
 class JobResourceUploader {
   protected static final Log LOG = LogFactory.getLog(JobResourceUploader.class);
   private final boolean useWildcard;
   private final FileSystem jtFs;
+  private SharedCacheClient scClient = null;
+  private SharedCacheConfig scConfig = new SharedCacheConfig();
+  private ApplicationId appId = null;
 
   JobResourceUploader(FileSystem submitFs, boolean useWildcard) {
     this.jtFs = submitFs;
     this.useWildcard = useWildcard;
   }
 
+  private void initSharedCache(JobID jobid, Configuration conf) {
+    this.scConfig.init(conf);
+    if (this.scConfig.isSharedCacheEnabled()) {
+      this.scClient = createSharedCacheClient(conf);
+      appId = jobIDToAppId(jobid);
+    }
+  }
+
+  /*
+   * We added this method so that we could do the conversion between JobId and
+   * ApplicationId for the shared cache client. This logic is very similar to
+   * the org.apache.hadoop.mapreduce.TypeConverter#toYarn method. We don't use
+   * that because mapreduce-client-core can not depend on
+   * mapreduce-client-common.
+   */
+  private ApplicationId jobIDToAppId(JobID jobId) {
+    return ApplicationId.newInstance(Long.parseLong(jobId.getJtIdentifier()),
+        jobId.getId());
+  }
+
+  private void stopSharedCache() {
+    if (scClient != null) {
+      scClient.stop();
+      scClient = null;
+    }
+  }
+
+  /**
+   * Create, initialize and start a new shared cache client.
+   */
+  @VisibleForTesting
+  protected SharedCacheClient createSharedCacheClient(Configuration conf) {
+    SharedCacheClient scc = SharedCacheClient.createSharedCacheClient();
+    scc.init(conf);
+    scc.start();
+    return scc;
+  }
+
   /**
    * Upload and configure files, libjars, jobjars, and archives pertaining to
    * the passed job.
-   * 
+   * <p>
+   * This client will use the shared cache for libjars, files, archives and
+   * jobjars if it is enabled. When shared cache is enabled, it will try to use
+   * the shared cache and fall back to the default behavior when the scm isn't
+   * available.
+   * <p>
+   * 1. For the resources that have been successfully shared, we will continue
+   * to use them in a shared fashion.
+   * <p>
+   * 2. For the resources that weren't in the cache and need to be uploaded by
+   * NM, we won't ask NM to upload them.
+   *
    * @param job the job containing the files to be uploaded
    * @param submitJobDir the submission directory of the job
    * @throws IOException
    */
   public void uploadResources(Job job, Path submitJobDir) throws IOException {
+    try {
+      initSharedCache(job.getJobID(), job.getConfiguration());
+      uploadResourcesInternal(job, submitJobDir);
+    } finally {
+      stopSharedCache();
+    }
+  }
+
+  private void uploadResourcesInternal(Job job, Path submitJobDir)
+      throws IOException {
     Configuration conf = job.getConfiguration();
     short replication =
         (short) conf.getInt(Job.SUBMIT_REPLICATION,
@@ -88,26 +159,52 @@ class JobResourceUploader {
           + " already exists!! This is unexpected.Please check what's there in"
           + " that directory");
     }
+    // Create the submission directory for the MapReduce job.
     submitJobDir = jtFs.makeQualified(submitJobDir);
     submitJobDir = new Path(submitJobDir.toUri().getPath());
     FsPermission mapredSysPerms =
         new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);
     mkdirs(jtFs, submitJobDir, mapredSysPerms);
 
+    // Get the resources that have been added via command line arguments in the
+    // GenericOptionsParser (i.e. files, libjars, archives).
     Collection<String> files = conf.getStringCollection("tmpfiles");
     Collection<String> libjars = conf.getStringCollection("tmpjars");
     Collection<String> archives = conf.getStringCollection("tmparchives");
     String jobJar = job.getJar();
 
+    // Merge resources that have been programmatically specified for the shared
+    // cache via the Job API.
+    files.addAll(conf.getStringCollection(MRJobConfig.FILES_FOR_SHARED_CACHE));
+    libjars.addAll(conf.getStringCollection(
+            MRJobConfig.FILES_FOR_CLASSPATH_AND_SHARED_CACHE));
+    archives.addAll(conf
+        .getStringCollection(MRJobConfig.ARCHIVES_FOR_SHARED_CACHE));
+
+
     Map<URI, FileStatus> statCache = new HashMap<URI, FileStatus>();
     checkLocalizationLimits(conf, files, libjars, archives, jobJar, statCache);
 
-    uploadFiles(conf, files, submitJobDir, mapredSysPerms, replication);
-    uploadLibJars(conf, libjars, submitJobDir, mapredSysPerms, replication);
-    uploadArchives(conf, archives, submitJobDir, mapredSysPerms, replication);
-    uploadJobJar(job, jobJar, submitJobDir, replication);
+    Map<String, Boolean> fileSCUploadPolicies =
+        new LinkedHashMap<String, Boolean>();
+    Map<String, Boolean> archiveSCUploadPolicies =
+        new LinkedHashMap<String, Boolean>();
+
+    uploadFiles(job, files, submitJobDir, mapredSysPerms, replication,
+        fileSCUploadPolicies, statCache);
+    uploadLibJars(job, libjars, submitJobDir, mapredSysPerms, replication,
+        fileSCUploadPolicies, statCache);
+    uploadArchives(job, archives, submitJobDir, mapredSysPerms, replication,
+        archiveSCUploadPolicies, statCache);
+    uploadJobJar(job, jobJar, submitJobDir, replication, statCache);
     addLog4jToDistributedCache(job, submitJobDir);
 
+    // Note, we do not consider resources in the distributed cache for the
+    // shared cache at this time. Only resources specified via the
+    // GenericOptionsParser or the jobjar.
+    Job.setFileSharedCacheUploadPolicies(conf, fileSCUploadPolicies);
+    Job.setArchiveSharedCacheUploadPolicies(conf, archiveSCUploadPolicies);
+
     // set the timestamps of the archives and files
     // set the public/private visibility of the archives and files
     ClientDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf,
@@ -118,9 +215,11 @@ class JobResourceUploader {
   }
 
   @VisibleForTesting
-  void uploadFiles(Configuration conf, Collection<String> files,
-      Path submitJobDir, FsPermission mapredSysPerms, short submitReplication)
+  void uploadFiles(Job job, Collection<String> files,
+      Path submitJobDir, FsPermission mapredSysPerms, short submitReplication,
+      Map<String, Boolean> fileSCUploadPolicies, Map<URI, FileStatus> statCache)
       throws IOException {
+    Configuration conf = job.getConfiguration();
     Path filesDir = JobSubmissionFiles.getJobDistCacheFiles(submitJobDir);
     if (!files.isEmpty()) {
       mkdirs(jtFs, filesDir, mapredSysPerms);
@@ -133,17 +232,33 @@ class JobResourceUploader {
               + " Argument must be a valid URI: " + tmpFile, e);
         }
         Path tmp = new Path(tmpURI);
-        Path newPath = copyRemoteFiles(filesDir, tmp, conf, submitReplication);
-        try {
-          URI pathURI = getPathURI(newPath, tmpURI.getFragment());
-          DistributedCache.addCacheFile(pathURI, conf);
-        } catch (URISyntaxException ue) {
-          // should not throw a uri exception
-          throw new IOException(
-              "Failed to create a URI (URISyntaxException) for the remote path "
-                  + newPath + ". This was based on the files parameter: "
-                  + tmpFile,
-              ue);
+        URI newURI = null;
+        boolean uploadToSharedCache = false;
+        if (scConfig.isSharedCacheFilesEnabled()) {
+          newURI = useSharedCache(tmpURI, tmp.getName(), statCache, conf, true);
+          if (newURI == null) {
+            uploadToSharedCache = true;
+          }
+        }
+
+        if (newURI == null) {
+          Path newPath =
+              copyRemoteFiles(filesDir, tmp, conf, submitReplication);
+          try {
+            newURI = getPathURI(newPath, tmpURI.getFragment());
+          } catch (URISyntaxException ue) {
+            // should not throw a uri exception
+            throw new IOException(
+                "Failed to create a URI (URISyntaxException) for the"
+                    + " remote path " + newPath
+                    + ". This was based on the files parameter: " + tmpFile,
+                ue);
+          }
+        }
+
+        job.addCacheFile(newURI);
+        if (scConfig.isSharedCacheFilesEnabled()) {
+          fileSCUploadPolicies.put(newURI.toString(), uploadToSharedCache);
         }
       }
     }
@@ -152,9 +267,11 @@ class JobResourceUploader {
   // Suppress warning for use of DistributedCache (it is everywhere).
   @SuppressWarnings("deprecation")
   @VisibleForTesting
-  void uploadLibJars(Configuration conf, Collection<String> libjars,
-      Path submitJobDir, FsPermission mapredSysPerms, short submitReplication)
+  void uploadLibJars(Job job, Collection<String> libjars, Path submitJobDir,
+      FsPermission mapredSysPerms, short submitReplication,
+      Map<String, Boolean> fileSCUploadPolicies, Map<URI, FileStatus> statCache)
       throws IOException {
+    Configuration conf = job.getConfiguration();
     Path libjarsDir = JobSubmissionFiles.getJobDistCacheLibjars(submitJobDir);
     if (!libjars.isEmpty()) {
       mkdirs(jtFs, libjarsDir, mapredSysPerms);
@@ -169,23 +286,53 @@ class JobResourceUploader {
               + " Argument must be a valid URI: " + tmpjars, e);
         }
         Path tmp = new Path(tmpURI);
-        Path newPath =
-            copyRemoteFiles(libjarsDir, tmp, conf, submitReplication);
-        try {
-          URI pathURI = getPathURI(newPath, tmpURI.getFragment());
-          if (!foundFragment) {
-            foundFragment = pathURI.getFragment() != null;
+        URI newURI = null;
+        boolean uploadToSharedCache = false;
+        boolean fromSharedCache = false;
+        if (scConfig.isSharedCacheLibjarsEnabled()) {
+          newURI = useSharedCache(tmpURI, tmp.getName(), statCache, conf, true);
+          if (newURI == null) {
+            uploadToSharedCache = true;
+          } else {
+            fromSharedCache = true;
+          }
+        }
+
+        if (newURI == null) {
+          Path newPath =
+              copyRemoteFiles(libjarsDir, tmp, conf, submitReplication);
+          try {
+            newURI = getPathURI(newPath, tmpURI.getFragment());
+          } catch (URISyntaxException ue) {
+            // should not throw a uri exception
+            throw new IOException(
+                "Failed to create a URI (URISyntaxException) for the"
+                    + " remote path " + newPath
+                    + ". This was based on the libjar parameter: " + tmpjars,
+                ue);
           }
-          DistributedCache.addFileToClassPath(new Path(pathURI.getPath()), conf,
-              jtFs, false);
-          libjarURIs.add(pathURI);
-        } catch (URISyntaxException ue) {
-          // should not throw a uri exception
-          throw new IOException(
-              "Failed to create a URI (URISyntaxException) for the remote path "
-                  + newPath + ". This was based on the libjar parameter: "
-                  + tmpjars,
-              ue);
+        }
+
+        if (!foundFragment) {
+          // We do not count shared cache paths containing fragments as a
+          // "foundFragment." This is because these resources are not in the
+          // staging directory and will be added to the distributed cache
+          // separately.
+          foundFragment = (newURI.getFragment() != null) && !fromSharedCache;
+        }
+        DistributedCache.addFileToClassPath(new Path(newURI.getPath()), conf,
+            jtFs, false);
+        if (fromSharedCache) {
+          // We simply add this URI to the distributed cache. It will not come
+          // from the staging directory (it is in the shared cache), so we
+          // must add it to the cache regardless of the wildcard feature.
+          DistributedCache.addCacheFile(newURI, conf);
+        } else {
+          libjarURIs.add(newURI);
+        }
+
+        if (scConfig.isSharedCacheLibjarsEnabled()) {
+          fileSCUploadPolicies.put(newURI.toString(), uploadToSharedCache);
         }
       }
 
@@ -203,9 +350,11 @@ class JobResourceUploader {
   }
 
   @VisibleForTesting
-  void uploadArchives(Configuration conf, Collection<String> archives,
-      Path submitJobDir, FsPermission mapredSysPerms, short submitReplication)
-      throws IOException {
+  void uploadArchives(Job job, Collection<String> archives,
+      Path submitJobDir, FsPermission mapredSysPerms, short submitReplication,
+      Map<String, Boolean> archiveSCUploadPolicies,
+      Map<URI, FileStatus> statCache) throws IOException {
+    Configuration conf = job.getConfiguration();
     Path archivesDir = JobSubmissionFiles.getJobDistCacheArchives(submitJobDir);
     if (!archives.isEmpty()) {
       mkdirs(jtFs, archivesDir, mapredSysPerms);
@@ -218,18 +367,34 @@ class JobResourceUploader {
               + " Argument must be a valid URI: " + tmpArchives, e);
         }
         Path tmp = new Path(tmpURI);
-        Path newPath =
-            copyRemoteFiles(archivesDir, tmp, conf, submitReplication);
-        try {
-          URI pathURI = getPathURI(newPath, tmpURI.getFragment());
-          DistributedCache.addCacheArchive(pathURI, conf);
-        } catch (URISyntaxException ue) {
-          // should not throw an uri excpetion
-          throw new IOException(
-              "Failed to create a URI (URISyntaxException) for the remote path"
-                  + newPath + ". This was based on the archive parameter: "
-                  + tmpArchives,
-              ue);
+        URI newURI = null;
+        boolean uploadToSharedCache = false;
+        if (scConfig.isSharedCacheArchivesEnabled()) {
+          newURI = useSharedCache(tmpURI, tmp.getName(), statCache, conf, true);
+          if (newURI == null) {
+            uploadToSharedCache = true;
+          }
+        }
+
+        if (newURI == null) {
+          Path newPath =
+              copyRemoteFiles(archivesDir, tmp, conf, submitReplication);
+          try {
+            newURI = getPathURI(newPath, tmpURI.getFragment());
+          } catch (URISyntaxException ue) {
+            // should not throw a uri exception
+            throw new IOException(
+                "Failed to create a URI (URISyntaxException) for the"
+                    + " remote path " + newPath
+                    + ". This was based on the archive parameter: "
+                    + tmpArchives,
+                ue);
+          }
+        }
+
+        job.addCacheArchive(newURI);
+        if (scConfig.isSharedCacheArchivesEnabled()) {
+          archiveSCUploadPolicies.put(newURI.toString(), uploadToSharedCache);
         }
       }
     }
@@ -237,7 +402,9 @@ class JobResourceUploader {
 
   @VisibleForTesting
   void uploadJobJar(Job job, String jobJar, Path submitJobDir,
-      short submitReplication) throws IOException {
+      short submitReplication, Map<URI, FileStatus> statCache)
+      throws IOException {
+    Configuration conf = job.getConfiguration();
     if (jobJar != null) { // copy jar to JobTracker's fs
       // use jar name if job is not named.
       if ("".equals(job.getJobName())) {
@@ -245,12 +412,59 @@ class JobResourceUploader {
       }
       Path jobJarPath = new Path(jobJar);
       URI jobJarURI = jobJarPath.toUri();
-      // If the job jar is already in a global fs,
-      // we don't need to copy it from local fs
-      if (jobJarURI.getScheme() == null || jobJarURI.getScheme().equals("file")) {
-        copyJar(jobJarPath, JobSubmissionFiles.getJobJar(submitJobDir),
-            submitReplication);
-        job.setJar(JobSubmissionFiles.getJobJar(submitJobDir).toString());
+      Path newJarPath = null;
+      boolean uploadToSharedCache = false;
+      if (jobJarURI.getScheme() == null ||
+          jobJarURI.getScheme().equals("file")) {
+        // job jar is on the local file system
+        if (scConfig.isSharedCacheJobjarEnabled()) {
+          // We must have a qualified path for the shared cache client. We can
+          // assume this is for the local filesystem
+          jobJarPath = FileSystem.getLocal(conf).makeQualified(jobJarPath);
+          // Don't add a resource name here because the resource name (i.e.
+          // the job.jar directory symlink) will always be hard coded to
+          // job.jar.
+          URI newURI =
+              useSharedCache(jobJarPath.toUri(), null, statCache, conf, false);
+          if (newURI == null) {
+            uploadToSharedCache = true;
+          } else {
+            newJarPath = stringToPath(newURI.toString());
+            // The job jar is coming from the shared cache (i.e. a public
+            // place), so we want the job.jar to have a public visibility.
+            conf.setBoolean(MRJobConfig.JOBJAR_VISIBILITY, true);
+          }
+        }
+        if (newJarPath == null) {
+          newJarPath = JobSubmissionFiles.getJobJar(submitJobDir);
+          copyJar(jobJarPath, newJarPath, submitReplication);
+        }
+      } else {
+        // job jar is in a remote file system
+        if (scConfig.isSharedCacheJobjarEnabled()) {
+          // Don't add a resource name here because the resource name (i.e.
+          // the job.jar directory symlink) will always be hard coded to
+          // job.jar.
+          URI newURI = useSharedCache(jobJarURI, null, statCache, conf, false);
+          if (newURI == null) {
+            uploadToSharedCache = true;
+            newJarPath = jobJarPath;
+          } else {
+            newJarPath = stringToPath(newURI.toString());
+            // The job jar is coming from the shared cache (i.e. a public
+            // place), so we want the job.jar to have a public visibility.
+            conf.setBoolean(MRJobConfig.JOBJAR_VISIBILITY, true);
+          }
+        } else {
+          // we don't need to upload the jobjar to the staging directory because
+          // it is already in an accessible place
+          newJarPath = jobJarPath;
+        }
+      }
+      job.setJar(newJarPath.toString());
+      if (scConfig.isSharedCacheJobjarEnabled()) {
+        conf.setBoolean(MRJobConfig.JOBJAR_SHARED_CACHE_UPLOAD_POLICY,
+            uploadToSharedCache);
       }
     } else {
       LOG.warn("No job jar file set.  User classes may not be found. "
@@ -260,7 +474,9 @@ class JobResourceUploader {
 
   /**
    * Verify that the resources this job is going to localize are within the
-   * localization limits.
+   * localization limits. We count all resources towards these limits regardless
+   * of where they are coming from (i.e. local, distributed cache, or shared
+   * cache).
    */
   @VisibleForTesting
   void checkLocalizationLimits(Configuration conf, Collection<String> files,
@@ -457,6 +673,80 @@ class JobResourceUploader {
     return newPath;
   }
 
+  /**
+   * Checksum a local resource file and call use for that resource with the
+   * SCM (shared cache manager).
+   */
+  private URI useSharedCache(URI sourceFile, String resourceName,
+      Map<URI, FileStatus> statCache, Configuration conf, boolean honorFragment)
+      throws IOException {
+    if (scClient == null) {
+      return null;
+    }
+    Path filePath = new Path(sourceFile);
+    if (getFileStatus(statCache, conf, filePath).isDirectory()) {
+      LOG.warn("Shared cache does not support directories"
+          + " (see YARN-6097)." + " Will not upload " + filePath
+          + " to the shared cache.");
+      return null;
+    }
+
+    String rn = resourceName;
+    if (honorFragment) {
+      if (sourceFile.getFragment() != null) {
+        rn = sourceFile.getFragment();
+      }
+    }
+
+    // If, for whatever reason, we can't even calculate the checksum for
+    // a resource, something is really wrong with the file system;
+    // even the non-SCM approach won't work. Just let the exception propagate.
+    String checksum = scClient.getFileChecksum(filePath);
+    URL url = null;
+    try {
+      url = scClient.use(this.appId, checksum);
+    } catch (YarnException e) {
+      LOG.warn("Error trying to contact the shared cache manager,"
+          + " disabling the SCMClient for the rest of this job submission", e);
+      /*
+       * If we fail to contact the SCM, we do not use it for the rest of this
+       * JobResourceUploader's life. This prevents us from having to timeout
+       * each time we try to upload a file while the SCM is unavailable. Instead
+       * we timeout/error the first time and quickly revert to the default
+       * behavior without the shared cache. We do this by stopping the shared
+       * cache client and setting it to null.
+       */
+      stopSharedCache();
+    }
+
+    if (url != null) {
+      // Because we deal with URI's in mapreduce, we need to convert the URL to
+      // a URI and add a fragment if necessary.
+      URI uri = null;
+      try {
+        String name = new Path(url.getFile()).getName();
+        if (rn != null && !name.equals(rn)) {
+          // A name was specified that is different from the name in the shared
+          // cache. Therefore, we need to set the fragment portion of the URI to
+          // preserve the user's desired name. We assume that there is no
+          // existing fragment in the URL since the shared cache manager does
+          // not use fragments.
+          uri = new URI(url.getScheme(), url.getUserInfo(), url.getHost(),
+              url.getPort(), url.getFile(), null, rn);
+        } else {
+          uri = new URI(url.getScheme(), url.getUserInfo(), url.getHost(),
+              url.getPort(), url.getFile(), null, null);
+        }
+        return uri;
+      } catch (URISyntaxException e) {
+        LOG.warn("Error trying to convert URL received from shared cache to"
+            + " a URI: " + url.toString());
+        return null;
+      }
+    } else {
+      return null;
+    }
+  }
+
   @VisibleForTesting
   void copyJar(Path originalJarPath, Path submitJarFile,
       short replication) throws IOException {

+ 71 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java

@@ -190,6 +190,77 @@ public interface MRJobConfig {
 
   public static final String CACHE_ARCHIVES_VISIBILITIES = "mapreduce.job.cache.archives.visibilities";
 
+  /**
+   * This parameter controls the visibility of the localized job jar on the node
+   * manager. If set to true, the visibility will be set to
+   * LocalResourceVisibility.PUBLIC. If set to false, the visibility will be set
+   * to LocalResourceVisibility.APPLICATION. This is a generated parameter and
+   * should not be set manually via config files.
+   */
+  String JOBJAR_VISIBILITY = "mapreduce.job.jobjar.visibility";
+  boolean JOBJAR_VISIBILITY_DEFAULT = false;
+
+  /**
+   * This is a generated parameter and should not be set manually via config
+   * files.
+   */
+  String JOBJAR_SHARED_CACHE_UPLOAD_POLICY =
+      "mapreduce.job.jobjar.sharedcache.uploadpolicy";
+  boolean JOBJAR_SHARED_CACHE_UPLOAD_POLICY_DEFAULT = false;
+
+  /**
+   * This is a generated parameter and should not be set manually via config
+   * files.
+   */
+  String CACHE_FILES_SHARED_CACHE_UPLOAD_POLICIES =
+      "mapreduce.job.cache.files.sharedcache.uploadpolicies";
+
+  /**
+   * This is a generated parameter and should not be set manually via config
+   * files.
+   */
+  String CACHE_ARCHIVES_SHARED_CACHE_UPLOAD_POLICIES =
+      "mapreduce.job.cache.archives.sharedcache.uploadpolicies";
+
+  /**
+   * A comma delimited list of file resources that are needed for this MapReduce
+   * job. These resources, if the files resource type is enabled, should either
+   * use the shared cache or be added to the shared cache. This parameter can be
+   * modified programmatically using the MapReduce Job api.
+   */
+  String FILES_FOR_SHARED_CACHE = "mapreduce.job.cache.sharedcache.files";
+
+  /**
+   * A comma delimited list of libjar resources that are needed for this
+   * MapReduce job. These resources, if the libjars resource type is enabled,
+   * should either use the shared cache or be added to the shared cache. These
+   * resources will also be added to the classpath of all tasks for this
+   * MapReduce job. This parameter can be modified programmatically using the
+   * MapReduce Job api.
+   */
+  String FILES_FOR_CLASSPATH_AND_SHARED_CACHE =
+      "mapreduce.job.cache.sharedcache.files.addtoclasspath";
+
+  /**
+   * A comma delimited list of archive resources that are needed for this
+   * MapReduce job. These resources, if the archives resource type is enabled,
+   * should either use the shared cache or be added to the shared cache. This
+   * parameter can be modified programmatically using the MapReduce Job api.
+   */
+  String ARCHIVES_FOR_SHARED_CACHE =
+      "mapreduce.job.cache.sharedcache.archives";
+
+  /**
+   * A comma delimited list of resource categories that are enabled for the
+   * shared cache. If a category is enabled, resources in that category will be
+   * uploaded to the shared cache. The valid categories are: jobjar, libjars,
+   * files, archives. If "disabled" is specified then all categories are
+   * disabled. If "enabled" is specified then all categories are enabled.
+   */
+  String SHARED_CACHE_MODE = "mapreduce.job.sharedcache.mode";
+
+  String SHARED_CACHE_MODE_DEFAULT = "disabled";
+
   /**
    * @deprecated Symlinks are always on and cannot be disabled.
    */

+ 102 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/SharedCacheConfig.java

@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce;
+
+import java.util.Collection;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+/**
+ * A class for parsing configuration parameters associated with the shared
+ * cache.
+ */
+@Private
+@Unstable
+public class SharedCacheConfig {
+  protected static final Log LOG = LogFactory.getLog(SharedCacheConfig.class);
+
+  private boolean sharedCacheFilesEnabled = false;
+  private boolean sharedCacheLibjarsEnabled = false;
+  private boolean sharedCacheArchivesEnabled = false;
+  private boolean sharedCacheJobjarEnabled = false;
+
+  public void init(Configuration conf) {
+    if (!MRConfig.YARN_FRAMEWORK_NAME.equals(conf.get(
+        MRConfig.FRAMEWORK_NAME))) {
+      // Shared cache is only valid if the job runs on yarn
+      return;
+    }
+
+    if (!conf.getBoolean(YarnConfiguration.SHARED_CACHE_ENABLED,
+        YarnConfiguration.DEFAULT_SHARED_CACHE_ENABLED)) {
+      return;
+    }
+
+
+    Collection<String> configs = StringUtils.getTrimmedStringCollection(
+        conf.get(MRJobConfig.SHARED_CACHE_MODE,
+            MRJobConfig.SHARED_CACHE_MODE_DEFAULT));
+    if (configs.contains("files")) {
+      this.sharedCacheFilesEnabled = true;
+    }
+    if (configs.contains("libjars")) {
+      this.sharedCacheLibjarsEnabled = true;
+    }
+    if (configs.contains("archives")) {
+      this.sharedCacheArchivesEnabled = true;
+    }
+    if (configs.contains("jobjar")) {
+      this.sharedCacheJobjarEnabled = true;
+    }
+    if (configs.contains("enabled")) {
+      this.sharedCacheFilesEnabled = true;
+      this.sharedCacheLibjarsEnabled = true;
+      this.sharedCacheArchivesEnabled = true;
+      this.sharedCacheJobjarEnabled = true;
+    }
+    if (configs.contains("disabled")) {
+      this.sharedCacheFilesEnabled = false;
+      this.sharedCacheLibjarsEnabled = false;
+      this.sharedCacheArchivesEnabled = false;
+      this.sharedCacheJobjarEnabled = false;
+    }
+  }
+
+  public boolean isSharedCacheFilesEnabled() {
+    return sharedCacheFilesEnabled;
+  }
+  public boolean isSharedCacheLibjarsEnabled() {
+    return sharedCacheLibjarsEnabled;
+  }
+  public boolean isSharedCacheArchivesEnabled() {
+    return sharedCacheArchivesEnabled;
+  }
+  public boolean isSharedCacheJobjarEnabled() {
+    return sharedCacheJobjarEnabled;
+  }
+  public boolean isSharedCacheEnabled() {
+    return (sharedCacheFilesEnabled || sharedCacheLibjarsEnabled ||
+        sharedCacheArchivesEnabled || sharedCacheJobjarEnabled);
+  }
+}

+ 11 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml

@@ -676,6 +676,17 @@
     </description>
 </property>
 
+<property>
+  <name>mapreduce.job.sharedcache.mode</name>
+  <value>disabled</value>
+  <description>
+    A comma delimited list of resource categories to submit to the shared cache.
+    The valid categories are: jobjar, libjars, files, archives.
+    If "disabled" is specified then the job submission code will not use
+    the shared cache.
+  </description>
+</property>
+
 <property>
   <name>mapreduce.input.fileinputformat.split.minsize</name>
   <value>0</value>

+ 100 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/SharedCacheSupport.md

@@ -0,0 +1,100 @@
+<!---
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+MR Support for YARN Shared Cache
+==================
+
+<!-- MACRO{toc|fromDepth=0|toDepth=2} -->
+
+Overview
+-------
+
+MapReduce support for the YARN shared cache allows MapReduce jobs to take advantage
+of additional resource caching. This saves network bandwidth between the job
+submission client and the cluster, as well as within the YARN cluster itself, which
+reduces job submission time and overall job runtime.
+
+
+Enabling/Disabling the shared cache
+-------
+
+First, your YARN cluster must have the shared cache service running. Please see YARN documentation
+for information on how to set up the shared cache service.
+
+A MapReduce user can specify what resources are eligible to be uploaded to the shared cache
+based on resource type. This is done using a configuration parameter in mapred-site.xml:
+
+```
+<property>
+    <name>mapreduce.job.sharedcache.mode</name>
+    <value>disabled</value>
+    <description>
+       A comma delimited list of resource categories to submit to the
+       shared cache. The valid categories are: jobjar, libjars, files,
+       archives. If "disabled" is specified then the job submission code
+       will not use the shared cache.
+    </description>
+</property>
+```
+
+If a resource type is listed, the job submission code checks the shared cache to see if the
+resource is already in the cache. If it is, the cached copy is used; if not, the resource is
+marked to be uploaded to the shared cache asynchronously.
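+
+For example, to enable the shared cache for only the job jar and libjars (one illustrative
+combination of the valid categories), a user could set:
+
+```
+<property>
+    <name>mapreduce.job.sharedcache.mode</name>
+    <value>jobjar,libjars</value>
+</property>
+```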
+
+Specifying resources for the cache
+-------
+
+A MapReduce user has three ways to specify resources for a MapReduce job:
+
+1. **The command line via the generic options parser (i.e. -files, -archives, -libjars):** If a
+resource is specified via the command line and the resource type is enabled for the
+shared cache, that resource will use the shared cache.
+2. **The distributed cache api:** If a resource is specified via the distributed cache api, the
+resource will not use the shared cache, regardless of whether the resource type is enabled for
+the shared cache.
+3. **The shared cache api:** This is a new set of methods added to the
+org.apache.hadoop.mapreduce.Job api. It allows users to add a file to the shared cache, add a
+file to the shared cache and the classpath, and add an archive to the shared cache. These
+resources will be placed in the distributed cache and, if their resource type is enabled, the
+client will use the shared cache as well (see the sketch after this list).
+
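+The following is a minimal sketch of the shared cache api described above (the file paths are
+placeholders):
+
+```
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+
+// Enable every resource category for the shared cache. This only takes effect
+// when the job runs on YARN and the YARN shared cache service is enabled
+// (yarn.sharedcache.enabled).
+Configuration conf = new Configuration();
+conf.set(MRJobConfig.SHARED_CACHE_MODE, "enabled");
+
+// Each helper returns false if its resource category is not enabled for the
+// shared cache, in which case the caller should fall back to the distributed
+// cache api.
+boolean fileAdded = Job.addFileToSharedCache(
+    new Path("/local/path/dictionary.txt").toUri(), conf);
+boolean libjarAdded = Job.addFileToSharedCacheAndClasspath(
+    new Path("/local/path/dep.jar").toUri(), conf);
+boolean archiveAdded = Job.addArchiveToSharedCache(
+    new Path("/local/path/bundle.zip").toUri(), conf);
+
+Job job = Job.getInstance(conf);
+// ... set the mapper, reducer, input and output, then submit as usual.
+```
+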
+Resource naming
+-------
+
+It is important to ensure that each resource for a MapReduce job has a unique file name.
+This prevents symlink clobbering when resources are localized for the YARN containers running
+MapReduce tasks during container launch. A user can specify their own resource name by using the
+fragment portion of a URI. For example, for file resources specified on the command line, it could look
+like this:
+```
+-files /local/path/file1.txt#foo.txt,/local/path2/file1.txt#bar.txt
+```
+In the above example, two files both named file1.txt will be localized under two different names:
+foo.txt and bar.txt.
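+
+The same naming can be done programmatically by including a fragment in the URI handed to the
+cache apis. A small sketch (the path is a placeholder; `job` is the Job instance from the sketch
+above):
+
+```
+// Localize /local/path/file1.txt under the symlink name foo.txt.
+job.addCacheFile(URI.create("/local/path/file1.txt#foo.txt"));
+```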
+
+Resource Visibility
+-------
+
+All resources in the shared cache have PUBLIC visibility.
+
+
+MapReduce client behavior while the shared cache is unavailable
+-------
+
+In the event that the shared cache manager is unavailable, the MapReduce client uses a fail-fast
+mechanism. If the MapReduce client fails to contact the shared cache manager, the client will
+no longer use the shared cache for the rest of that job submission. This
+prevents the MapReduce client from timing out each time it tries to check for a resource
+in the shared cache. The MapReduce client quickly reverts to the default behavior and submits the
+job as if the shared cache had never been enabled in the first place.

+ 36 - 40
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploader.java

@@ -212,7 +212,7 @@ public class TestJobResourceUploader {
           destinationPathPrefix + "tmpArchives1.tgz#tmpArchivesfragment1.tgz" };
 
   private String jobjarSubmitDir = "/jobjar-submit-dir";
-  private String expectedJobJar = jobjarSubmitDir + "/job.jar";
+  private String basicExpectedJobJar = jobjarSubmitDir + "/job.jar";
 
   @Test
   public void testPathsWithNoFragNoSchemeRelative() throws IOException {
@@ -228,7 +228,7 @@ public class TestJobResourceUploader {
     JobResourceUploader uploader = new StubedUploader(jConf);
 
     runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesNoFrags,
-        expectedArchivesNoFrags, expectedJobJar);
+        expectedArchivesNoFrags, basicExpectedJobJar);
   }
 
   @Test
@@ -246,7 +246,7 @@ public class TestJobResourceUploader {
     JobResourceUploader uploader = new StubedUploader(jConf);
 
     runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesNoFrags,
-        expectedArchivesNoFrags, expectedJobJar);
+        expectedArchivesNoFrags, basicExpectedJobJar);
   }
 
   @Test
@@ -264,7 +264,7 @@ public class TestJobResourceUploader {
     JobResourceUploader uploader = new StubedUploader(jConf);
 
     runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesWithFrags,
-        expectedArchivesWithFrags, expectedJobJar);
+        expectedArchivesWithFrags, basicExpectedJobJar);
   }
 
   @Test
@@ -282,7 +282,7 @@ public class TestJobResourceUploader {
     JobResourceUploader uploader = new StubedUploader(jConf);
 
     runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesWithFrags,
-        expectedArchivesWithFrags, expectedJobJar);
+        expectedArchivesWithFrags, basicExpectedJobJar);
   }
 
   @Test
@@ -300,7 +300,7 @@ public class TestJobResourceUploader {
     JobResourceUploader uploader = new StubedUploader(jConf);
 
     runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesWithFrags,
-        expectedArchivesWithFrags, expectedJobJar);
+        expectedArchivesWithFrags, basicExpectedJobJar);
   }
 
   @Test
@@ -318,7 +318,7 @@ public class TestJobResourceUploader {
     JobResourceUploader uploader = new StubedUploader(jConf);
 
     runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesNoFrags,
-        expectedArchivesNoFrags, expectedJobJar);
+        expectedArchivesNoFrags, basicExpectedJobJar);
   }
 
   @Test
@@ -336,7 +336,7 @@ public class TestJobResourceUploader {
     JobResourceUploader uploader = new StubedUploader(jConf, true);
 
     runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesWithWildcard,
-        expectedArchivesNoFrags, expectedJobJar);
+        expectedArchivesNoFrags, basicExpectedJobJar);
   }
 
   @Test
@@ -354,50 +354,45 @@ public class TestJobResourceUploader {
     JobResourceUploader uploader = new StubedUploader(jConf, true);
 
     runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesWithFrags,
-        expectedArchivesWithFrags, expectedJobJar);
+        expectedArchivesWithFrags, basicExpectedJobJar);
   }
 
   private void runTmpResourcePathTest(JobResourceUploader uploader,
       ResourceConf rConf, JobConf jConf, String[] expectedFiles,
       String[] expectedArchives, String expectedJobJar) throws IOException {
-    rConf.setupJobConf(jConf);
-    // We use a pre and post job object here because we need the post job object
-    // to get the new values set during uploadResources, but we need the pre job
-    // to set the job jar because JobResourceUploader#uploadJobJar uses the Job
-    // interface not the JobConf. The post job is automatically created in
-    // validateResourcePaths.
-    Job jobPre = Job.getInstance(jConf);
-    uploadResources(uploader, jConf, jobPre);
-
-    validateResourcePaths(jConf, expectedFiles, expectedArchives,
-        expectedJobJar, jobPre);
+    Job job = rConf.setupJobConf(jConf);
+    uploadResources(uploader, job);
+    validateResourcePaths(job, expectedFiles, expectedArchives, expectedJobJar);
   }
 
-  private void uploadResources(JobResourceUploader uploader, JobConf jConf,
-      Job job) throws IOException {
-    Collection<String> files = jConf.getStringCollection("tmpfiles");
-    Collection<String> libjars = jConf.getStringCollection("tmpjars");
-    Collection<String> archives = jConf.getStringCollection("tmparchives");
-    String jobJar = jConf.getJar();
-    uploader.uploadFiles(jConf, files, new Path("/files-submit-dir"), null,
-        (short) 3);
-    uploader.uploadArchives(jConf, archives, new Path("/archives-submit-dir"),
-        null, (short) 3);
-    uploader.uploadLibJars(jConf, libjars, new Path("/libjars-submit-dir"),
-        null, (short) 3);
-    uploader.uploadJobJar(job, jobJar, new Path(jobjarSubmitDir), (short) 3);
+  private void uploadResources(JobResourceUploader uploader, Job job)
+      throws IOException {
+    Configuration conf = job.getConfiguration();
+    Collection<String> files = conf.getStringCollection("tmpfiles");
+    Collection<String> libjars = conf.getStringCollection("tmpjars");
+    Collection<String> archives = conf.getStringCollection("tmparchives");
+    Map<URI, FileStatus> statCache = new HashMap<>();
+    Map<String, Boolean> fileSCUploadPolicies = new HashMap<>();
+    String jobJar = job.getJar();
+    uploader.uploadFiles(job, files, new Path("/files-submit-dir"), null,
+        (short) 3, fileSCUploadPolicies, statCache);
+    uploader.uploadArchives(job, archives, new Path("/archives-submit-dir"),
+        null, (short) 3, fileSCUploadPolicies, statCache);
+    uploader.uploadLibJars(job, libjars, new Path("/libjars-submit-dir"), null,
+        (short) 3, fileSCUploadPolicies, statCache);
+    uploader.uploadJobJar(job, jobJar, new Path(jobjarSubmitDir), (short) 3,
+        statCache);
   }
 
-  private void validateResourcePaths(JobConf jConf, String[] expectedFiles,
-      String[] expectedArchives, String expectedJobJar, Job preJob)
+  private void validateResourcePaths(Job job, String[] expectedFiles,
+      String[] expectedArchives, String expectedJobJar)
       throws IOException {
-    Job j = Job.getInstance(jConf);
-    validateResourcePathsSub(j.getCacheFiles(), expectedFiles);
-    validateResourcePathsSub(j.getCacheArchives(), expectedArchives);
+    validateResourcePathsSub(job.getCacheFiles(), expectedFiles);
+    validateResourcePathsSub(job.getCacheArchives(), expectedArchives);
     // We use a different job object here because the jobjar was set on a
     // different job object
     Assert.assertEquals("Job jar path is different than expected!",
-        expectedJobJar, preJob.getJar());
+        expectedJobJar, job.getJar());
   }
 
   private void validateResourcePathsSub(URI[] actualURIs,
@@ -603,7 +598,7 @@ public class TestJobResourceUploader {
       }
     }
 
-    private void setupJobConf(JobConf conf) {
+    private Job setupJobConf(JobConf conf) throws IOException {
       conf.set("tmpfiles",
           buildPathString("tmpFiles", this.numOfTmpFiles, ".txt"));
       conf.set("tmpjars",
@@ -633,6 +628,7 @@ public class TestJobResourceUploader {
       conf.setLong(MRJobConfig.MAX_RESOURCES_MB, this.maxResourcesMB);
       conf.setLong(MRJobConfig.MAX_SINGLE_RESOURCE_MB,
           this.maxSingleResourceMB);
+      return new Job(conf);
     }
 
     // We always want absolute paths with a scheme in the DistributedCache, so

+ 365 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploaderWithSharedCache.java

@@ -0,0 +1,365 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Map;
+import java.util.jar.JarOutputStream;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.client.api.SharedCacheClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Tests the JobResourceUploader class with the shared cache.
+ */
+public class TestJobResourceUploaderWithSharedCache {
+  protected static final Log LOG = LogFactory
+      .getLog(TestJobResourceUploaderWithSharedCache.class);
+  private static MiniDFSCluster dfs;
+  private static FileSystem localFs;
+  private static FileSystem remoteFs;
+  private static Configuration conf = new Configuration();
+  private static Path testRootDir;
+  private static Path remoteStagingDir =
+      new Path(MRJobConfig.DEFAULT_MR_AM_STAGING_DIR);
+  private String input = "roses.are.red\nviolets.are.blue\nbunnies.are.pink\n";
+
+  @Before
+  public void cleanup() throws Exception {
+    remoteFs.delete(remoteStagingDir, true);
+  }
+
+  @BeforeClass
+  public static void setup() throws IOException {
+    // create configuration, dfs, file system
+    localFs = FileSystem.getLocal(conf);
+    testRootDir =
+        new Path("target",
+            TestJobResourceUploaderWithSharedCache.class.getName() + "-tmpDir")
+            .makeQualified(localFs.getUri(), localFs.getWorkingDirectory());
+    dfs = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
+    remoteFs = dfs.getFileSystem();
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    try {
+      if (localFs != null) {
+        localFs.close();
+      }
+      if (remoteFs != null) {
+        remoteFs.close();
+      }
+      if (dfs != null) {
+        dfs.shutdown();
+      }
+    } catch (IOException ioe) {
+      LOG.info("IO exception in closing file system");
+      ioe.printStackTrace();
+    }
+  }
+
+  private class MyFileUploader extends JobResourceUploader {
+    // The mocked SharedCacheClient that will be fed into the FileUploader
+    private SharedCacheClient mockscClient = mock(SharedCacheClient.class);
+    // A real client for checksum calculation
+    private SharedCacheClient scClient = SharedCacheClient
+        .createSharedCacheClient();
+
+    MyFileUploader(FileSystem submitFs, Configuration conf)
+        throws IOException {
+      super(submitFs, false);
+      // Initialize the real client, but don't start it. We don't need or want
+      // to create an actual proxy because we only use this for mocking out the
+      // getFileChecksum method.
+      scClient.init(conf);
+      when(mockscClient.getFileChecksum(any(Path.class))).thenAnswer(
+          new Answer<String>() {
+            @Override
+            public String answer(InvocationOnMock invocation) throws Throwable {
+              Path file = (Path) invocation.getArguments()[0];
+              // Use the real scClient to generate the checksum. We use an
+              // answer/mock combination to avoid having to spy on a real
+              // SharedCacheClient object.
+              return scClient.getFileChecksum(file);
+            }
+          });
+    }
+
+    // This method is to prime the mock client with the correct checksum, so it
+    // looks like a given resource is present in the shared cache.
+    public void mockFileInSharedCache(Path localFile, URL remoteFile)
+        throws YarnException, IOException {
+      // when the resource is referenced, simply return the remote path to the
+      // caller
+      when(mockscClient.use(any(ApplicationId.class),
+          eq(scClient.getFileChecksum(localFile)))).thenReturn(remoteFile);
+    }
+
+    @Override
+    protected SharedCacheClient createSharedCacheClient(Configuration c) {
+      // Feed the mocked SharedCacheClient into the FileUploader logic
+      return mockscClient;
+    }
+  }
+
+  @Test
+  public void testSharedCacheDisabled() throws Exception {
+    JobConf jobConf = createJobConf();
+    Job job = new Job(jobConf);
+    job.setJobID(new JobID("567789", 1));
+
+    // shared cache is disabled by default
+    uploadFilesToRemoteFS(job, jobConf, 0, 0, 0, false);
+
+  }
+
+  @Test
+  public void testSharedCacheEnabled() throws Exception {
+    JobConf jobConf = createJobConf();
+    jobConf.set(MRJobConfig.SHARED_CACHE_MODE, "enabled");
+    Job job = new Job(jobConf);
+    job.setJobID(new JobID("567789", 1));
+
+    // shared cache is enabled for every file type
+    // the # of times SharedCacheClient.use is called should ==
+    // total # of files/libjars/archive/jobjar
+    uploadFilesToRemoteFS(job, jobConf, 8, 3, 2, false);
+  }
+
+  @Test
+  public void testSharedCacheEnabledWithJobJarInSharedCache()
+      throws Exception {
+    JobConf jobConf = createJobConf();
+    jobConf.set(MRJobConfig.SHARED_CACHE_MODE, "enabled");
+    Job job = new Job(jobConf);
+    job.setJobID(new JobID("567789", 1));
+
+    // shared cache is enabled for every file type
+    // the # of times SharedCacheClient.use is called should ==
+    // total # of files/libjars/archive/jobjar
+    uploadFilesToRemoteFS(job, jobConf, 8, 3, 2, true);
+  }
+
+  @Test
+  public void testSharedCacheArchivesAndLibjarsEnabled() throws Exception {
+    JobConf jobConf = createJobConf();
+    jobConf.set(MRJobConfig.SHARED_CACHE_MODE, "archives,libjars");
+    Job job = new Job(jobConf);
+    job.setJobID(new JobID("567789", 1));
+
+    // shared cache is enabled for archives and libjars type
+    // the # of times SharedCacheClient.use is called should ==
+    // total # of libjars and archives
+    uploadFilesToRemoteFS(job, jobConf, 5, 1, 2, true);
+  }
+
+  private JobConf createJobConf() {
+    JobConf jobConf = new JobConf();
+    jobConf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
+    jobConf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, true);
+
+    jobConf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, remoteFs.getUri()
+        .toString());
+    return jobConf;
+  }
+
+  private Path copyToRemote(Path jar) throws IOException {
+    Path remoteFile = new Path("/tmp", jar.getName());
+    remoteFs.copyFromLocalFile(jar, remoteFile);
+    return remoteFile;
+  }
+
+  private void makeJarAvailableInSharedCache(Path jar,
+      MyFileUploader fileUploader) throws YarnException, IOException {
+    // copy file to remote file system
+    Path remoteFile = copyToRemote(jar);
+    // prime mocking so that it looks like this file is in the shared cache
+    fileUploader.mockFileInSharedCache(jar, URL.fromPath(remoteFile));
+  }
+
+  private void uploadFilesToRemoteFS(Job job, JobConf jobConf,
+      int useCallCountExpected,
+      int numOfFilesShouldBeUploadedToSharedCacheExpected,
+      int numOfArchivesShouldBeUploadedToSharedCacheExpected,
+      boolean jobJarInSharedCacheBeforeUpload) throws Exception {
+    MyFileUploader fileUploader = new MyFileUploader(remoteFs, jobConf);
+    SharedCacheConfig sharedCacheConfig = new SharedCacheConfig();
+    sharedCacheConfig.init(jobConf);
+
+    Path firstFile = createTempFile("first-input-file", "x");
+    Path secondFile = createTempFile("second-input-file", "xx");
+
+    // Add files to job conf via distributed cache API as well as command line
+    boolean fileAdded = Job.addFileToSharedCache(firstFile.toUri(), jobConf);
+    assertEquals(sharedCacheConfig.isSharedCacheFilesEnabled(), fileAdded);
+    if (!fileAdded) {
+      Path remoteFile = copyToRemote(firstFile);
+      job.addCacheFile(remoteFile.toUri());
+    }
+    jobConf.set("tmpfiles", secondFile.toString());
+
+    // Create jars with a single file inside them.
+    Path firstJar = makeJar(new Path(testRootDir, "distributed.first.jar"), 1);
+    Path secondJar =
+        makeJar(new Path(testRootDir, "distributed.second.jar"), 2);
+
+    // Verify duplicated contents can be handled properly.
+    Path thirdJar = new Path(testRootDir, "distributed.third.jar");
+    localFs.copyFromLocalFile(secondJar, thirdJar);
+
+    // make secondJar cache available
+    makeJarAvailableInSharedCache(secondJar, fileUploader);
+
+    // Add libjars to job conf via distributed cache API as well as command
+    // line
+    boolean libjarAdded =
+        Job.addFileToSharedCacheAndClasspath(firstJar.toUri(), jobConf);
+    assertEquals(sharedCacheConfig.isSharedCacheLibjarsEnabled(), libjarAdded);
+    if (!libjarAdded) {
+      Path remoteJar = copyToRemote(firstJar);
+      job.addFileToClassPath(remoteJar);
+    }
+
+    jobConf.set("tmpjars", secondJar.toString() + "," + thirdJar.toString());
+
+    Path firstArchive = makeArchive("first-archive.zip", "first-file");
+    Path secondArchive = makeArchive("second-archive.zip", "second-file");
+
+    // Add archives to job conf via distributed cache API as well as command
+    // line
+    boolean archiveAdded =
+        Job.addArchiveToSharedCache(firstArchive.toUri(), jobConf);
+    assertEquals(sharedCacheConfig.isSharedCacheArchivesEnabled(),
+        archiveAdded);
+    if (!archiveAdded) {
+      Path remoteArchive = copyToRemote(firstArchive);
+      job.addCacheArchive(remoteArchive.toUri());
+    }
+
+    jobConf.set("tmparchives", secondArchive.toString());
+
+    // Add job jar to job conf
+    Path jobJar = makeJar(new Path(testRootDir, "test-job.jar"), 4);
+    if (jobJarInSharedCacheBeforeUpload) {
+      makeJarAvailableInSharedCache(jobJar, fileUploader);
+    }
+    jobConf.setJar(jobJar.toString());
+
+    fileUploader.uploadResources(job, remoteStagingDir);
+
+    verify(fileUploader.mockscClient, times(useCallCountExpected)).use(
+        any(ApplicationId.class), anyString());
+
+    int numOfFilesShouldBeUploadedToSharedCache = 0;
+    Map<String, Boolean> filesSharedCacheUploadPolicies =
+        Job.getFileSharedCacheUploadPolicies(jobConf);
+    for (Boolean policy : filesSharedCacheUploadPolicies.values()) {
+      if (policy) {
+        numOfFilesShouldBeUploadedToSharedCache++;
+      }
+    }
+    assertEquals(numOfFilesShouldBeUploadedToSharedCacheExpected,
+        numOfFilesShouldBeUploadedToSharedCache);
+
+    int numOfArchivesShouldBeUploadedToSharedCache = 0;
+    Map<String, Boolean> archivesSharedCacheUploadPolicies =
+        Job.getArchiveSharedCacheUploadPolicies(jobConf);
+    for (Boolean policy : archivesSharedCacheUploadPolicies.values()) {
+      if (policy) {
+        numOfArchivesShouldBeUploadedToSharedCache++;
+      }
+    }
+    assertEquals(numOfArchivesShouldBeUploadedToSharedCacheExpected,
+        numOfArchivesShouldBeUploadedToSharedCache);
+  }
+
+
+  private Path createTempFile(String filename, String contents)
+      throws IOException {
+    Path path = new Path(testRootDir, filename);
+    FSDataOutputStream os = localFs.create(path);
+    os.writeBytes(contents);
+    os.close();
+    localFs.setPermission(path, new FsPermission("700"));
+    return path;
+  }
+
+  private Path makeJar(Path p, int index) throws FileNotFoundException,
+      IOException {
+    FileOutputStream fos =
+        new FileOutputStream(new File(p.toUri().getPath()));
+    JarOutputStream jos = new JarOutputStream(fos);
+    ZipEntry ze = new ZipEntry("distributed.jar.inside" + index);
+    jos.putNextEntry(ze);
+    jos.write(("inside the jar!" + index).getBytes());
+    jos.closeEntry();
+    jos.close();
+    localFs.setPermission(p, new FsPermission("700"));
+    return p;
+  }
+
+  private Path makeArchive(String archiveFile, String filename)
+      throws Exception {
+    Path archive = new Path(testRootDir, archiveFile);
+    Path file = new Path(testRootDir, filename);
+    DataOutputStream out = localFs.create(archive);
+    ZipOutputStream zos = new ZipOutputStream(out);
+    ZipEntry ze = new ZipEntry(file.toString());
+    zos.putNextEntry(ze);
+    zos.write(input.getBytes("UTF-8"));
+    zos.closeEntry();
+    zos.close();
+    return archive;
+  }
+}

+ 46 - 8
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java

@@ -19,6 +19,8 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -338,16 +340,41 @@ public class YARNRunner implements ClientProtocol {
     }
   }
 
-  private LocalResource createApplicationResource(FileContext fs, Path p, LocalResourceType type)
-      throws IOException {
+  private LocalResource createApplicationResource(FileContext fs, Path p,
+      LocalResourceType type) throws IOException {
+    return createApplicationResource(fs, p, null, type,
+        LocalResourceVisibility.APPLICATION, false);
+  }
+
+  private LocalResource createApplicationResource(FileContext fs, Path p,
+      String fileSymlink, LocalResourceType type, LocalResourceVisibility viz,
+      Boolean uploadToSharedCache) throws IOException {
     LocalResource rsrc = recordFactory.newRecordInstance(LocalResource.class);
     FileStatus rsrcStat = fs.getFileStatus(p);
-    rsrc.setResource(URL.fromPath(fs
-        .getDefaultFileSystem().resolvePath(rsrcStat.getPath())));
+    // We need to be careful when converting from a path to a URL: add a
+    // fragment so that the symlink name will be correct when the resource is
+    // localized.
+    Path qualifiedPath =
+        fs.getDefaultFileSystem().resolvePath(rsrcStat.getPath());
+    URI uriWithFragment = null;
+    boolean useFragment = fileSymlink != null && !fileSymlink.equals("");
+    try {
+      if (useFragment) {
+        uriWithFragment = new URI(qualifiedPath.toUri() + "#" + fileSymlink);
+      } else {
+        uriWithFragment = qualifiedPath.toUri();
+      }
+    } catch (URISyntaxException e) {
+      throw new IOException(
+          "Error parsing local resource path."
+              + " Path was not able to be converted to a URI: " + qualifiedPath,
+          e);
+    }
+    rsrc.setResource(URL.fromURI(uriWithFragment));
     rsrc.setSize(rsrcStat.getLen());
     rsrc.setTimestamp(rsrcStat.getModificationTime());
     rsrc.setType(type);
-    rsrc.setVisibility(LocalResourceVisibility.APPLICATION);
+    rsrc.setVisibility(viz);
+    rsrc.setShouldBeUploadedToSharedCache(uploadToSharedCache);
     return rsrc;
   }
 
@@ -368,10 +395,21 @@ public class YARNRunner implements ClientProtocol {
             jobConfPath, LocalResourceType.FILE));
     if (jobConf.get(MRJobConfig.JAR) != null) {
       Path jobJarPath = new Path(jobConf.get(MRJobConfig.JAR));
+      // We hard code the job.jar symlink because mapreduce code expects the
+      // job.jar to be named that way.
+      FileContext jobJarFc =
+          FileContext.getFileContext(jobJarPath.toUri(), jobConf);
+      LocalResourceVisibility jobJarViz =
+          jobConf.getBoolean(MRJobConfig.JOBJAR_VISIBILITY,
+              MRJobConfig.JOBJAR_VISIBILITY_DEFAULT)
+                  ? LocalResourceVisibility.PUBLIC
+                  : LocalResourceVisibility.APPLICATION;
       LocalResource rc = createApplicationResource(
-          FileContext.getFileContext(jobJarPath.toUri(), jobConf),
-          jobJarPath,
-          LocalResourceType.PATTERN);
+          jobJarFc, jobJarPath,
+          MRJobConfig.JOB_JAR, LocalResourceType.PATTERN, jobJarViz,
+          jobConf.getBoolean(
+                  MRJobConfig.JOBJAR_SHARED_CACHE_UPLOAD_POLICY,
+                  MRJobConfig.JOBJAR_SHARED_CACHE_UPLOAD_POLICY_DEFAULT));
       String pattern = conf.getPattern(JobContext.JAR_UNPACK_PATTERN,
           JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern();
       rc.setPattern(pattern);

+ 52 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLocalJobSubmission.java

@@ -132,6 +132,58 @@ public class TestLocalJobSubmission {
     }
   }
 
+  /**
+   * Test local job submission with a file option.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testLocalJobFilesOption() throws IOException {
+    Path jarPath = makeJar(new Path(TEST_ROOT_DIR, "test.jar"));
+
+    Configuration conf = new Configuration();
+    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost:9000");
+    conf.set(MRConfig.FRAMEWORK_NAME, "local");
+    final String[] args =
+        {"-jt", "local", "-files", jarPath.toString(), "-m", "1", "-r", "1",
+            "-mt", "1", "-rt", "1"};
+    int res = -1;
+    try {
+      res = ToolRunner.run(conf, new SleepJob(), args);
+    } catch (Exception e) {
+      System.out.println("Job failed with " + e.getLocalizedMessage());
+      e.printStackTrace(System.out);
+      fail("Job failed");
+    }
+    assertEquals("dist job res is not 0:", 0, res);
+  }
+
+  /**
+   * Test local job submission with an archive option.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testLocalJobArchivesOption() throws IOException {
+    Path jarPath = makeJar(new Path(TEST_ROOT_DIR, "test.jar"));
+
+    Configuration conf = new Configuration();
+    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost:9000");
+    conf.set(MRConfig.FRAMEWORK_NAME, "local");
+    final String[] args =
+        {"-jt", "local", "-archives", jarPath.toString(), "-m", "1", "-r",
+            "1", "-mt", "1", "-rt", "1"};
+    int res = -1;
+    try {
+      res = ToolRunner.run(conf, new SleepJob(), args);
+    } catch (Exception e) {
+      System.out.println("Job failed with " + e.getLocalizedMessage());
+      e.printStackTrace(System.out);
+      fail("Job failed");
+    }
+    assertEquals("dist job res is not 0:", 0, res);
+  }
+
   private Path makeJar(Path p) throws IOException {
     FileOutputStream fos = new FileOutputStream(new File(p.toString()));
     JarOutputStream jos = new JarOutputStream(fos);

+ 59 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java

@@ -1298,6 +1298,65 @@ public class TestMRJobs {
     jarFile.delete();
   }
 
+  @Test
+  public void testSharedCache() throws Exception {
+    Path localJobJarPath = makeJobJarWithLib(TEST_ROOT_DIR.toUri().toString());
+
+    if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
+      LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+          + " not found. Not running test.");
+      return;
+    }
+
+    Job job = Job.getInstance(mrCluster.getConfig());
+
+    Configuration jobConf = job.getConfiguration();
+    jobConf.set(MRJobConfig.SHARED_CACHE_MODE, "enabled");
+
+    Path inputFile = createTempFile("input-file", "x");
+
+    // Create jars with a single file inside them.
+    Path second = makeJar(new Path(TEST_ROOT_DIR, "distributed.second.jar"), 2);
+    Path third = makeJar(new Path(TEST_ROOT_DIR, "distributed.third.jar"), 3);
+    Path fourth = makeJar(new Path(TEST_ROOT_DIR, "distributed.fourth.jar"), 4);
+
+    // Add libjars to job conf
+    jobConf.set("tmpjars", second.toString() + "," + third.toString() + ","
+        + fourth.toString());
+
+    // Because the job jar is a "dummy" jar, we need to include the jar with
+    // SharedCacheChecker or it won't be able to find it
+    Path distributedCacheCheckerJar =
+        new Path(JarFinder.getJar(SharedCacheChecker.class));
+    job.addFileToClassPath(distributedCacheCheckerJar.makeQualified(
+        localFs.getUri(), distributedCacheCheckerJar.getParent()));
+
+    job.setMapperClass(SharedCacheChecker.class);
+    job.setOutputFormatClass(NullOutputFormat.class);
+
+    FileInputFormat.setInputPaths(job, inputFile);
+
+    job.setMaxMapAttempts(1); // speed up failures
+
+    job.submit();
+    String trackingUrl = job.getTrackingURL();
+    String jobId = job.getJobID().toString();
+    Assert.assertTrue(job.waitForCompletion(true));
+    Assert.assertTrue("Tracking URL was " + trackingUrl
+        + " but didn't Match Job ID " + jobId,
+        trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));
+  }
+
+  /**
+   * An identity mapper for testing the shared cache.
+   */
+  public static class SharedCacheChecker extends
+      Mapper<LongWritable, Text, NullWritable, NullWritable> {
+    @Override
+    public void setup(Context context) throws IOException {
+    }
+  }
+
   public static class ConfVerificationMapper extends SleepMapper {
     @Override
     protected void setup(Context context)

+ 1 - 0
hadoop-project/src/site/site.xml

@@ -109,6 +109,7 @@
       <item name="Encrypted Shuffle" href="hadoop-mapreduce-client/hadoop-mapreduce-client-core/EncryptedShuffle.html"/>
       <item name="Pluggable Shuffle/Sort" href="hadoop-mapreduce-client/hadoop-mapreduce-client-core/PluggableShuffleAndPluggableSort.html"/>
       <item name="Distributed Cache Deploy" href="hadoop-mapreduce-client/hadoop-mapreduce-client-core/DistributedCacheDeploy.html"/>
+      <item name="Support for YARN Shared Cache" href="hadoop-mapreduce-client/hadoop-mapreduce-client-core/SharedCacheSupport.html"/>
     </menu>
 
     <menu name="MapReduce REST APIs" inherit="top">