Browse Source

MAPREDUCE-6846. Fragments specified for libjar paths are not handled correctly
(Contributed by Chris Trezzo via Daniel Templeton)

(cherry picked from commit a92bc8a6c1c50d34d8229a67dc9d197823001551)

Daniel Templeton 8 years ago
parent
commit
2f32accaaf

+ 79 - 24
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java

@@ -23,6 +23,7 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URISyntaxException;
 import java.util.Collection;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.Map;
 import java.util.Map;
 
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.Log;
@@ -91,7 +92,7 @@ class JobResourceUploader {
     submitJobDir = new Path(submitJobDir.toUri().getPath());
     submitJobDir = new Path(submitJobDir.toUri().getPath());
     FsPermission mapredSysPerms =
     FsPermission mapredSysPerms =
         new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);
         new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);
-    FileSystem.mkdirs(jtFs, submitJobDir, mapredSysPerms);
+    mkdirs(jtFs, submitJobDir, mapredSysPerms);
 
 
     Collection<String> files = conf.getStringCollection("tmpfiles");
     Collection<String> files = conf.getStringCollection("tmpfiles");
     Collection<String> libjars = conf.getStringCollection("tmpjars");
     Collection<String> libjars = conf.getStringCollection("tmpjars");
@@ -116,18 +117,20 @@ class JobResourceUploader {
         job.getCredentials());
         job.getCredentials());
   }
   }
 
 
-  private void uploadFiles(Configuration conf, Collection<String> files,
+  @VisibleForTesting
+  void uploadFiles(Configuration conf, Collection<String> files,
       Path submitJobDir, FsPermission mapredSysPerms, short submitReplication)
       Path submitJobDir, FsPermission mapredSysPerms, short submitReplication)
       throws IOException {
       throws IOException {
     Path filesDir = JobSubmissionFiles.getJobDistCacheFiles(submitJobDir);
     Path filesDir = JobSubmissionFiles.getJobDistCacheFiles(submitJobDir);
     if (!files.isEmpty()) {
     if (!files.isEmpty()) {
-      FileSystem.mkdirs(jtFs, filesDir, mapredSysPerms);
+      mkdirs(jtFs, filesDir, mapredSysPerms);
       for (String tmpFile : files) {
       for (String tmpFile : files) {
         URI tmpURI = null;
         URI tmpURI = null;
         try {
         try {
           tmpURI = new URI(tmpFile);
           tmpURI = new URI(tmpFile);
         } catch (URISyntaxException e) {
         } catch (URISyntaxException e) {
-          throw new IllegalArgumentException(e);
+          throw new IllegalArgumentException("Error parsing files argument."
+              + " Argument must be a valid URI: " + tmpFile, e);
         }
         }
         Path tmp = new Path(tmpURI);
         Path tmp = new Path(tmpURI);
         Path newPath = copyRemoteFiles(filesDir, tmp, conf, submitReplication);
         Path newPath = copyRemoteFiles(filesDir, tmp, conf, submitReplication);
@@ -136,50 +139,83 @@ class JobResourceUploader {
           DistributedCache.addCacheFile(pathURI, conf);
           DistributedCache.addCacheFile(pathURI, conf);
         } catch (URISyntaxException ue) {
         } catch (URISyntaxException ue) {
           // should not throw a uri exception
           // should not throw a uri exception
-          throw new IOException("Failed to create uri for " + tmpFile, ue);
+          throw new IOException(
+              "Failed to create a URI (URISyntaxException) for the remote path "
+                  + newPath + ". This was based on the files parameter: "
+                  + tmpFile,
+              ue);
         }
         }
       }
       }
     }
     }
   }
   }
 
 
-  private void uploadLibJars(Configuration conf, Collection<String> libjars,
+  // Suppress warning for use of DistributedCache (it is everywhere).
+  @SuppressWarnings("deprecation")
+  @VisibleForTesting
+  void uploadLibJars(Configuration conf, Collection<String> libjars,
       Path submitJobDir, FsPermission mapredSysPerms, short submitReplication)
       Path submitJobDir, FsPermission mapredSysPerms, short submitReplication)
       throws IOException {
       throws IOException {
     Path libjarsDir = JobSubmissionFiles.getJobDistCacheLibjars(submitJobDir);
     Path libjarsDir = JobSubmissionFiles.getJobDistCacheLibjars(submitJobDir);
     if (!libjars.isEmpty()) {
     if (!libjars.isEmpty()) {
-      FileSystem.mkdirs(jtFs, libjarsDir, mapredSysPerms);
+      mkdirs(jtFs, libjarsDir, mapredSysPerms);
+      Collection<URI> libjarURIs = new LinkedList<>();
+      boolean foundFragment = false;
       for (String tmpjars : libjars) {
       for (String tmpjars : libjars) {
-        Path tmp = new Path(tmpjars);
+        URI tmpURI = null;
+        try {
+          tmpURI = new URI(tmpjars);
+        } catch (URISyntaxException e) {
+          throw new IllegalArgumentException("Error parsing libjars argument."
+              + " Argument must be a valid URI: " + tmpjars, e);
+        }
+        Path tmp = new Path(tmpURI);
         Path newPath =
         Path newPath =
             copyRemoteFiles(libjarsDir, tmp, conf, submitReplication);
             copyRemoteFiles(libjarsDir, tmp, conf, submitReplication);
-
-        // Add each file to the classpath
-        DistributedCache.addFileToClassPath(
-            new Path(newPath.toUri().getPath()), conf, jtFs, !useWildcard);
+        try {
+          URI pathURI = getPathURI(newPath, tmpURI.getFragment());
+          if (!foundFragment) {
+            foundFragment = pathURI.getFragment() != null;
+          }
+          DistributedCache.addFileToClassPath(new Path(pathURI.getPath()), conf,
+              jtFs, false);
+          libjarURIs.add(pathURI);
+        } catch (URISyntaxException ue) {
+          // should not throw a uri exception
+          throw new IOException(
+              "Failed to create a URI (URISyntaxException) for the remote path "
+                  + newPath + ". This was based on the libjar parameter: "
+                  + tmpjars,
+              ue);
+        }
       }
       }
 
 
-      if (useWildcard) {
-        // Add the whole directory to the cache
+      if (useWildcard && !foundFragment) {
+        // Add the whole directory to the cache using a wild card
         Path libJarsDirWildcard =
         Path libJarsDirWildcard =
             jtFs.makeQualified(new Path(libjarsDir, DistributedCache.WILDCARD));
             jtFs.makeQualified(new Path(libjarsDir, DistributedCache.WILDCARD));
-
         DistributedCache.addCacheFile(libJarsDirWildcard.toUri(), conf);
         DistributedCache.addCacheFile(libJarsDirWildcard.toUri(), conf);
+      } else {
+        for (URI uri : libjarURIs) {
+          DistributedCache.addCacheFile(uri, conf);
+        }
       }
       }
     }
     }
   }
   }
 
 
-  private void uploadArchives(Configuration conf, Collection<String> archives,
+  @VisibleForTesting
+  void uploadArchives(Configuration conf, Collection<String> archives,
       Path submitJobDir, FsPermission mapredSysPerms, short submitReplication)
       Path submitJobDir, FsPermission mapredSysPerms, short submitReplication)
       throws IOException {
       throws IOException {
     Path archivesDir = JobSubmissionFiles.getJobDistCacheArchives(submitJobDir);
     Path archivesDir = JobSubmissionFiles.getJobDistCacheArchives(submitJobDir);
     if (!archives.isEmpty()) {
     if (!archives.isEmpty()) {
-      FileSystem.mkdirs(jtFs, archivesDir, mapredSysPerms);
+      mkdirs(jtFs, archivesDir, mapredSysPerms);
       for (String tmpArchives : archives) {
       for (String tmpArchives : archives) {
         URI tmpURI;
         URI tmpURI;
         try {
         try {
           tmpURI = new URI(tmpArchives);
           tmpURI = new URI(tmpArchives);
         } catch (URISyntaxException e) {
         } catch (URISyntaxException e) {
-          throw new IllegalArgumentException(e);
+          throw new IllegalArgumentException("Error parsing archives argument."
+              + " Argument must be a valid URI: " + tmpArchives, e);
         }
         }
         Path tmp = new Path(tmpURI);
         Path tmp = new Path(tmpURI);
         Path newPath =
         Path newPath =
@@ -189,13 +225,18 @@ class JobResourceUploader {
           DistributedCache.addCacheArchive(pathURI, conf);
           DistributedCache.addCacheArchive(pathURI, conf);
         } catch (URISyntaxException ue) {
         } catch (URISyntaxException ue) {
           // should not throw an uri excpetion
           // should not throw an uri excpetion
-          throw new IOException("Failed to create uri for " + tmpArchives, ue);
+          throw new IOException(
+              "Failed to create a URI (URISyntaxException) for the remote path"
+                  + newPath + ". This was based on the archive parameter: "
+                  + tmpArchives,
+              ue);
         }
         }
       }
       }
     }
     }
   }
   }
 
 
-  private void uploadJobJar(Job job, String jobJar, Path submitJobDir,
+  @VisibleForTesting
+  void uploadJobJar(Job job, String jobJar, Path submitJobDir,
       short submitReplication) throws IOException {
       short submitReplication) throws IOException {
     if (jobJar != null) { // copy jar to JobTracker's fs
     if (jobJar != null) { // copy jar to JobTracker's fs
       // use jar name if job is not named.
       // use jar name if job is not named.
@@ -273,7 +314,8 @@ class JobResourceUploader {
       URI uri = new URI(s);
       URI uri = new URI(s);
       return new Path(uri.getScheme(), uri.getAuthority(), uri.getPath());
       return new Path(uri.getScheme(), uri.getAuthority(), uri.getPath());
     } catch (URISyntaxException e) {
     } catch (URISyntaxException e) {
-      throw new IllegalArgumentException(e);
+      throw new IllegalArgumentException(
+          "Error parsing argument." + " Argument must be a valid URI: " + s, e);
     }
     }
   }
   }
 
 
@@ -380,9 +422,20 @@ class JobResourceUploader {
     return status;
     return status;
   }
   }
 
 
+  /**
+   * Create a new directory in the passed filesystem. This wrapper method exists
+   * so that it can be overridden/stubbed during testing.
+   */
+  @VisibleForTesting
+  boolean mkdirs(FileSystem fs, Path dir, FsPermission permission)
+      throws IOException {
+    return FileSystem.mkdirs(fs, dir, permission);
+  }
+
   // copies a file to the jobtracker filesystem and returns the path where it
   // copies a file to the jobtracker filesystem and returns the path where it
   // was copied to
   // was copied to
-  private Path copyRemoteFiles(Path parentDir, Path originalPath,
+  @VisibleForTesting
+  Path copyRemoteFiles(Path parentDir, Path originalPath,
       Configuration conf, short replication) throws IOException {
       Configuration conf, short replication) throws IOException {
     // check if we do not need to copy the files
     // check if we do not need to copy the files
     // is jt using the same file system.
     // is jt using the same file system.
@@ -400,10 +453,12 @@ class JobResourceUploader {
     Path newPath = new Path(parentDir, originalPath.getName());
     Path newPath = new Path(parentDir, originalPath.getName());
     FileUtil.copy(remoteFs, originalPath, jtFs, newPath, false, conf);
     FileUtil.copy(remoteFs, originalPath, jtFs, newPath, false, conf);
     jtFs.setReplication(newPath, replication);
     jtFs.setReplication(newPath, replication);
+    jtFs.makeQualified(newPath);
     return newPath;
     return newPath;
   }
   }
 
 
-  private void copyJar(Path originalJarPath, Path submitJarFile,
+  @VisibleForTesting
+  void copyJar(Path originalJarPath, Path submitJarFile,
       short replication) throws IOException {
       short replication) throws IOException {
     jtFs.copyFromLocalFile(originalJarPath, submitJarFile);
     jtFs.copyFromLocalFile(originalJarPath, submitJarFile);
     jtFs.setReplication(submitJarFile, replication);
     jtFs.setReplication(submitJarFile, replication);
@@ -427,7 +482,7 @@ class JobResourceUploader {
     URI pathURI = destPath.toUri();
     URI pathURI = destPath.toUri();
     if (pathURI.getFragment() == null) {
     if (pathURI.getFragment() == null) {
       if (fragment == null) {
       if (fragment == null) {
-        pathURI = new URI(pathURI.toString() + "#" + destPath.getName());
+        // no fragment, just return existing pathURI from destPath
       } else {
       } else {
         pathURI = new URI(pathURI.toString() + "#" + fragment);
         pathURI = new URI(pathURI.toString() + "#" + fragment);
       }
       }

+ 413 - 68
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploader.java

@@ -23,13 +23,19 @@ import static org.mockito.Mockito.when;
 
 
 import java.io.IOException;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URI;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Map;
+import java.util.Set;
 
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobConf;
 import org.junit.Assert;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.Test;
@@ -69,13 +75,13 @@ public class TestJobResourceUploader {
 
 
   @Test
   @Test
   public void testAllDefaults() throws IOException {
   public void testAllDefaults() throws IOException {
-    ResourceLimitsConf.Builder b = new ResourceLimitsConf.Builder();
+    ResourceConf.Builder b = new ResourceConf.Builder();
     runLimitsTest(b.build(), true, null);
     runLimitsTest(b.build(), true, null);
   }
   }
 
 
   @Test
   @Test
   public void testNoLimitsWithResources() throws IOException {
   public void testNoLimitsWithResources() throws IOException {
-    ResourceLimitsConf.Builder b = new ResourceLimitsConf.Builder();
+    ResourceConf.Builder b = new ResourceConf.Builder();
     b.setNumOfDCArchives(1);
     b.setNumOfDCArchives(1);
     b.setNumOfDCFiles(1);
     b.setNumOfDCFiles(1);
     b.setNumOfTmpArchives(10);
     b.setNumOfTmpArchives(10);
@@ -88,7 +94,7 @@ public class TestJobResourceUploader {
 
 
   @Test
   @Test
   public void testAtResourceLimit() throws IOException {
   public void testAtResourceLimit() throws IOException {
-    ResourceLimitsConf.Builder b = new ResourceLimitsConf.Builder();
+    ResourceConf.Builder b = new ResourceConf.Builder();
     b.setNumOfDCArchives(1);
     b.setNumOfDCArchives(1);
     b.setNumOfDCFiles(1);
     b.setNumOfDCFiles(1);
     b.setNumOfTmpArchives(1);
     b.setNumOfTmpArchives(1);
@@ -101,7 +107,7 @@ public class TestJobResourceUploader {
 
 
   @Test
   @Test
   public void testOverResourceLimit() throws IOException {
   public void testOverResourceLimit() throws IOException {
-    ResourceLimitsConf.Builder b = new ResourceLimitsConf.Builder();
+    ResourceConf.Builder b = new ResourceConf.Builder();
     b.setNumOfDCArchives(1);
     b.setNumOfDCArchives(1);
     b.setNumOfDCFiles(1);
     b.setNumOfDCFiles(1);
     b.setNumOfTmpArchives(1);
     b.setNumOfTmpArchives(1);
@@ -114,7 +120,7 @@ public class TestJobResourceUploader {
 
 
   @Test
   @Test
   public void testAtResourcesMBLimit() throws IOException {
   public void testAtResourcesMBLimit() throws IOException {
-    ResourceLimitsConf.Builder b = new ResourceLimitsConf.Builder();
+    ResourceConf.Builder b = new ResourceConf.Builder();
     b.setNumOfDCArchives(1);
     b.setNumOfDCArchives(1);
     b.setNumOfDCFiles(1);
     b.setNumOfDCFiles(1);
     b.setNumOfTmpArchives(1);
     b.setNumOfTmpArchives(1);
@@ -128,7 +134,7 @@ public class TestJobResourceUploader {
 
 
   @Test
   @Test
   public void testOverResourcesMBLimit() throws IOException {
   public void testOverResourcesMBLimit() throws IOException {
-    ResourceLimitsConf.Builder b = new ResourceLimitsConf.Builder();
+    ResourceConf.Builder b = new ResourceConf.Builder();
     b.setNumOfDCArchives(1);
     b.setNumOfDCArchives(1);
     b.setNumOfDCFiles(2);
     b.setNumOfDCFiles(2);
     b.setNumOfTmpArchives(1);
     b.setNumOfTmpArchives(1);
@@ -142,7 +148,7 @@ public class TestJobResourceUploader {
 
 
   @Test
   @Test
   public void testAtSingleResourceMBLimit() throws IOException {
   public void testAtSingleResourceMBLimit() throws IOException {
-    ResourceLimitsConf.Builder b = new ResourceLimitsConf.Builder();
+    ResourceConf.Builder b = new ResourceConf.Builder();
     b.setNumOfDCArchives(1);
     b.setNumOfDCArchives(1);
     b.setNumOfDCFiles(2);
     b.setNumOfDCFiles(2);
     b.setNumOfTmpArchives(1);
     b.setNumOfTmpArchives(1);
@@ -156,7 +162,7 @@ public class TestJobResourceUploader {
 
 
   @Test
   @Test
   public void testOverSingleResourceMBLimit() throws IOException {
   public void testOverSingleResourceMBLimit() throws IOException {
-    ResourceLimitsConf.Builder b = new ResourceLimitsConf.Builder();
+    ResourceConf.Builder b = new ResourceConf.Builder();
     b.setNumOfDCArchives(1);
     b.setNumOfDCArchives(1);
     b.setNumOfDCFiles(2);
     b.setNumOfDCFiles(2);
     b.setNumOfTmpArchives(1);
     b.setNumOfTmpArchives(1);
@@ -168,20 +174,263 @@ public class TestJobResourceUploader {
     runLimitsTest(b.build(), false, ResourceViolation.SINGLE_RESOURCE_SIZE);
     runLimitsTest(b.build(), false, ResourceViolation.SINGLE_RESOURCE_SIZE);
   }
   }
 
 
+  private String destinationPathPrefix = "hdfs:///destinationPath/";
+  private String[] expectedFilesNoFrags =
+      { destinationPathPrefix + "tmpFiles0.txt",
+          destinationPathPrefix + "tmpFiles1.txt",
+          destinationPathPrefix + "tmpFiles2.txt",
+          destinationPathPrefix + "tmpFiles3.txt",
+          destinationPathPrefix + "tmpFiles4.txt",
+          destinationPathPrefix + "tmpjars0.jar",
+          destinationPathPrefix + "tmpjars1.jar" };
+
+  private String[] expectedFilesWithFrags =
+      { destinationPathPrefix + "tmpFiles0.txt#tmpFilesfragment0.txt",
+          destinationPathPrefix + "tmpFiles1.txt#tmpFilesfragment1.txt",
+          destinationPathPrefix + "tmpFiles2.txt#tmpFilesfragment2.txt",
+          destinationPathPrefix + "tmpFiles3.txt#tmpFilesfragment3.txt",
+          destinationPathPrefix + "tmpFiles4.txt#tmpFilesfragment4.txt",
+          destinationPathPrefix + "tmpjars0.jar#tmpjarsfragment0.jar",
+          destinationPathPrefix + "tmpjars1.jar#tmpjarsfragment1.jar" };
+
+  // We use the local fs for the submitFS in the StubedUploader, so libjars
+  // should be replaced with a single path.
+  private String[] expectedFilesWithWildcard =
+      { destinationPathPrefix + "tmpFiles0.txt",
+          destinationPathPrefix + "tmpFiles1.txt",
+          destinationPathPrefix + "tmpFiles2.txt",
+          destinationPathPrefix + "tmpFiles3.txt",
+          destinationPathPrefix + "tmpFiles4.txt",
+          "file:///libjars-submit-dir/libjars/*" };
+
+  private String[] expectedArchivesNoFrags =
+      { destinationPathPrefix + "tmpArchives0.tgz",
+          destinationPathPrefix + "tmpArchives1.tgz" };
+
+  private String[] expectedArchivesWithFrags =
+      { destinationPathPrefix + "tmpArchives0.tgz#tmpArchivesfragment0.tgz",
+          destinationPathPrefix + "tmpArchives1.tgz#tmpArchivesfragment1.tgz" };
+
+  private String jobjarSubmitDir = "/jobjar-submit-dir";
+  private String expectedJobJar = jobjarSubmitDir + "/job.jar";
+
+  @Test
+  public void testPathsWithNoFragNoSchemeRelative() throws IOException {
+    ResourceConf.Builder b = new ResourceConf.Builder();
+    b.setNumOfTmpFiles(5);
+    b.setNumOfTmpLibJars(2);
+    b.setNumOfTmpArchives(2);
+    b.setJobJar(true);
+    b.setPathsWithScheme(false);
+    b.setPathsWithFrags(false);
+    ResourceConf rConf = b.build();
+    JobConf jConf = new JobConf();
+    JobResourceUploader uploader = new StubedUploader(jConf);
+
+    runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesNoFrags,
+        expectedArchivesNoFrags, expectedJobJar);
+  }
+
+  @Test
+  public void testPathsWithNoFragNoSchemeAbsolute() throws IOException {
+    ResourceConf.Builder b = new ResourceConf.Builder();
+    b.setNumOfTmpFiles(5);
+    b.setNumOfTmpLibJars(2);
+    b.setNumOfTmpArchives(2);
+    b.setJobJar(true);
+    b.setPathsWithFrags(false);
+    b.setPathsWithScheme(false);
+    b.setAbsolutePaths(true);
+    ResourceConf rConf = b.build();
+    JobConf jConf = new JobConf();
+    JobResourceUploader uploader = new StubedUploader(jConf);
+
+    runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesNoFrags,
+        expectedArchivesNoFrags, expectedJobJar);
+  }
+
+  @Test
+  public void testPathsWithFragNoSchemeAbsolute() throws IOException {
+    ResourceConf.Builder b = new ResourceConf.Builder();
+    b.setNumOfTmpFiles(5);
+    b.setNumOfTmpLibJars(2);
+    b.setNumOfTmpArchives(2);
+    b.setJobJar(true);
+    b.setPathsWithFrags(true);
+    b.setPathsWithScheme(false);
+    b.setAbsolutePaths(true);
+    ResourceConf rConf = b.build();
+    JobConf jConf = new JobConf();
+    JobResourceUploader uploader = new StubedUploader(jConf);
+
+    runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesWithFrags,
+        expectedArchivesWithFrags, expectedJobJar);
+  }
+
+  @Test
+  public void testPathsWithFragNoSchemeRelative() throws IOException {
+    ResourceConf.Builder b = new ResourceConf.Builder();
+    b.setNumOfTmpFiles(5);
+    b.setNumOfTmpLibJars(2);
+    b.setNumOfTmpArchives(2);
+    b.setJobJar(true);
+    b.setPathsWithFrags(true);
+    b.setAbsolutePaths(false);
+    b.setPathsWithScheme(false);
+    ResourceConf rConf = b.build();
+    JobConf jConf = new JobConf();
+    JobResourceUploader uploader = new StubedUploader(jConf);
+
+    runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesWithFrags,
+        expectedArchivesWithFrags, expectedJobJar);
+  }
+
+  @Test
+  public void testPathsWithFragSchemeAbsolute() throws IOException {
+    ResourceConf.Builder b = new ResourceConf.Builder();
+    b.setNumOfTmpFiles(5);
+    b.setNumOfTmpLibJars(2);
+    b.setNumOfTmpArchives(2);
+    b.setJobJar(true);
+    b.setPathsWithFrags(true);
+    b.setAbsolutePaths(true);
+    b.setPathsWithScheme(true);
+    ResourceConf rConf = b.build();
+    JobConf jConf = new JobConf();
+    JobResourceUploader uploader = new StubedUploader(jConf);
+
+    runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesWithFrags,
+        expectedArchivesWithFrags, expectedJobJar);
+  }
+
+  @Test
+  public void testPathsWithNoFragWithSchemeAbsolute() throws IOException {
+    ResourceConf.Builder b = new ResourceConf.Builder();
+    b.setNumOfTmpFiles(5);
+    b.setNumOfTmpLibJars(2);
+    b.setNumOfTmpArchives(2);
+    b.setJobJar(true);
+    b.setPathsWithFrags(false);
+    b.setPathsWithScheme(true);
+    b.setAbsolutePaths(true);
+    ResourceConf rConf = b.build();
+    JobConf jConf = new JobConf();
+    JobResourceUploader uploader = new StubedUploader(jConf);
+
+    runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesNoFrags,
+        expectedArchivesNoFrags, expectedJobJar);
+  }
+
+  @Test
+  public void testPathsWithNoFragAndWildCard() throws IOException {
+    ResourceConf.Builder b = new ResourceConf.Builder();
+    b.setNumOfTmpFiles(5);
+    b.setNumOfTmpLibJars(4);
+    b.setNumOfTmpArchives(2);
+    b.setJobJar(true);
+    b.setPathsWithFrags(false);
+    b.setPathsWithScheme(true);
+    b.setAbsolutePaths(true);
+    ResourceConf rConf = b.build();
+    JobConf jConf = new JobConf();
+    JobResourceUploader uploader = new StubedUploader(jConf, true);
+
+    runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesWithWildcard,
+        expectedArchivesNoFrags, expectedJobJar);
+  }
+
+  @Test
+  public void testPathsWithFragsAndWildCard() throws IOException {
+    ResourceConf.Builder b = new ResourceConf.Builder();
+    b.setNumOfTmpFiles(5);
+    b.setNumOfTmpLibJars(2);
+    b.setNumOfTmpArchives(2);
+    b.setJobJar(true);
+    b.setPathsWithFrags(true);
+    b.setPathsWithScheme(true);
+    b.setAbsolutePaths(true);
+    ResourceConf rConf = b.build();
+    JobConf jConf = new JobConf();
+    JobResourceUploader uploader = new StubedUploader(jConf, true);
+
+    runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesWithFrags,
+        expectedArchivesWithFrags, expectedJobJar);
+  }
+
+  private void runTmpResourcePathTest(JobResourceUploader uploader,
+      ResourceConf rConf, JobConf jConf, String[] expectedFiles,
+      String[] expectedArchives, String expectedJobJar) throws IOException {
+    rConf.setupJobConf(jConf);
+    // We use a pre and post job object here because we need the post job object
+    // to get the new values set during uploadResources, but we need the pre job
+    // to set the job jar because JobResourceUploader#uploadJobJar uses the Job
+    // interface not the JobConf. The post job is automatically created in
+    // validateResourcePaths.
+    Job jobPre = Job.getInstance(jConf);
+    uploadResources(uploader, jConf, jobPre);
+
+    validateResourcePaths(jConf, expectedFiles, expectedArchives,
+        expectedJobJar, jobPre);
+  }
+
+  private void uploadResources(JobResourceUploader uploader, JobConf jConf,
+      Job job) throws IOException {
+    Collection<String> files = jConf.getStringCollection("tmpfiles");
+    Collection<String> libjars = jConf.getStringCollection("tmpjars");
+    Collection<String> archives = jConf.getStringCollection("tmparchives");
+    String jobJar = jConf.getJar();
+    uploader.uploadFiles(jConf, files, new Path("/files-submit-dir"), null,
+        (short) 3);
+    uploader.uploadArchives(jConf, archives, new Path("/archives-submit-dir"),
+        null, (short) 3);
+    uploader.uploadLibJars(jConf, libjars, new Path("/libjars-submit-dir"),
+        null, (short) 3);
+    uploader.uploadJobJar(job, jobJar, new Path(jobjarSubmitDir), (short) 3);
+  }
+
+  private void validateResourcePaths(JobConf jConf, String[] expectedFiles,
+      String[] expectedArchives, String expectedJobJar, Job preJob)
+      throws IOException {
+    Job j = Job.getInstance(jConf);
+    validateResourcePathsSub(j.getCacheFiles(), expectedFiles);
+    validateResourcePathsSub(j.getCacheArchives(), expectedArchives);
+    // We use a different job object here because the jobjar was set on a
+    // different job object
+    Assert.assertEquals("Job jar path is different than expected!",
+        expectedJobJar, preJob.getJar());
+  }
+
+  private void validateResourcePathsSub(URI[] actualURIs,
+      String[] expectedURIs) {
+    List<URI> actualList = Arrays.asList(actualURIs);
+    Set<String> expectedSet = new HashSet<>(Arrays.asList(expectedURIs));
+    if (actualList.size() != expectedSet.size()) {
+      Assert.fail("Expected list of resources (" + expectedSet.size()
+          + ") and actual list of resources (" + actualList.size()
+          + ") are different lengths!");
+    }
+
+    for (URI u : actualList) {
+      if (!expectedSet.contains(u.toString())) {
+        Assert.fail("Resource list contained unexpected path: " + u.toString());
+      }
+    }
+  }
+
   private enum ResourceViolation {
   private enum ResourceViolation {
     NUMBER_OF_RESOURCES, TOTAL_RESOURCE_SIZE, SINGLE_RESOURCE_SIZE;
     NUMBER_OF_RESOURCES, TOTAL_RESOURCE_SIZE, SINGLE_RESOURCE_SIZE;
   }
   }
 
 
-  private void runLimitsTest(ResourceLimitsConf rlConf,
-      boolean checkShouldSucceed, ResourceViolation violation)
-      throws IOException {
+  private void runLimitsTest(ResourceConf rlConf, boolean checkShouldSucceed,
+      ResourceViolation violation) throws IOException {
 
 
     if (!checkShouldSucceed && violation == null) {
     if (!checkShouldSucceed && violation == null) {
       Assert.fail("Test is misconfigured. checkShouldSucceed is set to false"
       Assert.fail("Test is misconfigured. checkShouldSucceed is set to false"
           + " and a ResourceViolation is not specified.");
           + " and a ResourceViolation is not specified.");
     }
     }
 
 
-    JobConf conf = setupJobConf(rlConf);
+    JobConf conf = new JobConf();
+    rlConf.setupJobConf(conf);
     JobResourceUploader uploader = new StubedUploader(conf);
     JobResourceUploader uploader = new StubedUploader(conf);
     long configuredSizeOfResourceBytes = rlConf.sizeOfResource * 1024 * 1024;
     long configuredSizeOfResourceBytes = rlConf.sizeOfResource * 1024 * 1024;
     when(mockedStatus.getLen()).thenReturn(configuredSizeOfResourceBytes);
     when(mockedStatus.getLen()).thenReturn(configuredSizeOfResourceBytes);
@@ -230,43 +479,7 @@ public class TestJobResourceUploader {
 
 
   private final FileStatus mockedStatus = mock(FileStatus.class);
   private final FileStatus mockedStatus = mock(FileStatus.class);
 
 
-  private JobConf setupJobConf(ResourceLimitsConf rlConf) {
-    JobConf conf = new JobConf();
-    conf.setInt(MRJobConfig.MAX_RESOURCES, rlConf.maxResources);
-    conf.setLong(MRJobConfig.MAX_RESOURCES_MB, rlConf.maxResourcesMB);
-    conf.setLong(MRJobConfig.MAX_SINGLE_RESOURCE_MB,
-        rlConf.maxSingleResourceMB);
-
-    conf.set("tmpfiles",
-        buildPathString("file:///tmpFiles", rlConf.numOfTmpFiles));
-    conf.set("tmpjars",
-        buildPathString("file:///tmpjars", rlConf.numOfTmpLibJars));
-    conf.set("tmparchives",
-        buildPathString("file:///tmpArchives", rlConf.numOfTmpArchives));
-    conf.set(MRJobConfig.CACHE_ARCHIVES,
-        buildPathString("file:///cacheArchives", rlConf.numOfDCArchives));
-    conf.set(MRJobConfig.CACHE_FILES,
-        buildPathString("file:///cacheFiles", rlConf.numOfDCFiles));
-    if (rlConf.jobJar) {
-      conf.setJar("file:///jobjar.jar");
-    }
-    return conf;
-  }
-
-  private String buildPathString(String pathPrefix, int numOfPaths) {
-    if (numOfPaths < 1) {
-      return "";
-    } else {
-      StringBuilder b = new StringBuilder();
-      b.append(pathPrefix + 0);
-      for (int i = 1; i < numOfPaths; i++) {
-        b.append("," + pathPrefix + i);
-      }
-      return b.toString();
-    }
-  }
-
-  final static class ResourceLimitsConf {
+  private static class ResourceConf {
     private final int maxResources;
     private final int maxResources;
     private final long maxResourcesMB;
     private final long maxResourcesMB;
     private final long maxSingleResourceMB;
     private final long maxSingleResourceMB;
@@ -277,14 +490,15 @@ public class TestJobResourceUploader {
     private final int numOfDCFiles;
     private final int numOfDCFiles;
     private final int numOfDCArchives;
     private final int numOfDCArchives;
     private final long sizeOfResource;
     private final long sizeOfResource;
+    private final boolean pathsWithFrags;
+    private final boolean pathsWithScheme;
+    private final boolean absolutePaths;
 
 
-    static final ResourceLimitsConf DEFAULT = new ResourceLimitsConf();
-
-    private ResourceLimitsConf() {
+    private ResourceConf() {
       this(new Builder());
       this(new Builder());
     }
     }
 
 
-    private ResourceLimitsConf(Builder builder) {
+    private ResourceConf(Builder builder) {
       this.maxResources = builder.maxResources;
       this.maxResources = builder.maxResources;
       this.maxResourcesMB = builder.maxResourcesMB;
       this.maxResourcesMB = builder.maxResourcesMB;
       this.maxSingleResourceMB = builder.maxSingleResourceMB;
       this.maxSingleResourceMB = builder.maxSingleResourceMB;
@@ -295,6 +509,9 @@ public class TestJobResourceUploader {
       this.numOfDCFiles = builder.numOfDCFiles;
       this.numOfDCFiles = builder.numOfDCFiles;
       this.numOfDCArchives = builder.numOfDCArchives;
       this.numOfDCArchives = builder.numOfDCArchives;
       this.sizeOfResource = builder.sizeOfResource;
       this.sizeOfResource = builder.sizeOfResource;
+      this.pathsWithFrags = builder.pathsWithFrags;
+      this.pathsWithScheme = builder.pathsWithScheme;
+      this.absolutePaths = builder.absolutePaths;
     }
     }
 
 
     static class Builder {
     static class Builder {
@@ -309,69 +526,176 @@ public class TestJobResourceUploader {
       private int numOfDCFiles = 0;
       private int numOfDCFiles = 0;
       private int numOfDCArchives = 0;
       private int numOfDCArchives = 0;
       private long sizeOfResource = 0;
       private long sizeOfResource = 0;
+      private boolean pathsWithFrags = false;
+      private boolean pathsWithScheme = false;
+      private boolean absolutePaths = true;
 
 
-      Builder() {
+      private Builder() {
       }
       }
 
 
-      Builder setMaxResources(int max) {
+      private Builder setMaxResources(int max) {
         this.maxResources = max;
         this.maxResources = max;
         return this;
         return this;
       }
       }
 
 
-      Builder setMaxResourcesMB(long max) {
+      private Builder setMaxResourcesMB(long max) {
         this.maxResourcesMB = max;
         this.maxResourcesMB = max;
         return this;
         return this;
       }
       }
 
 
-      Builder setMaxSingleResourceMB(long max) {
+      private Builder setMaxSingleResourceMB(long max) {
         this.maxSingleResourceMB = max;
         this.maxSingleResourceMB = max;
         return this;
         return this;
       }
       }
 
 
-      Builder setNumOfTmpFiles(int num) {
+      private Builder setNumOfTmpFiles(int num) {
         this.numOfTmpFiles = num;
         this.numOfTmpFiles = num;
         return this;
         return this;
       }
       }
 
 
-      Builder setNumOfTmpArchives(int num) {
+      private Builder setNumOfTmpArchives(int num) {
         this.numOfTmpArchives = num;
         this.numOfTmpArchives = num;
         return this;
         return this;
       }
       }
 
 
-      Builder setNumOfTmpLibJars(int num) {
+      private Builder setNumOfTmpLibJars(int num) {
         this.numOfTmpLibJars = num;
         this.numOfTmpLibJars = num;
         return this;
         return this;
       }
       }
 
 
-      Builder setJobJar(boolean jar) {
+      private Builder setJobJar(boolean jar) {
         this.jobJar = jar;
         this.jobJar = jar;
         return this;
         return this;
       }
       }
 
 
-      Builder setNumOfDCFiles(int num) {
+      private Builder setNumOfDCFiles(int num) {
         this.numOfDCFiles = num;
         this.numOfDCFiles = num;
         return this;
         return this;
       }
       }
 
 
-      Builder setNumOfDCArchives(int num) {
+      private Builder setNumOfDCArchives(int num) {
         this.numOfDCArchives = num;
         this.numOfDCArchives = num;
         return this;
         return this;
       }
       }
 
 
-      Builder setSizeOfResource(long sizeMB) {
+      private Builder setSizeOfResource(long sizeMB) {
         this.sizeOfResource = sizeMB;
         this.sizeOfResource = sizeMB;
         return this;
         return this;
       }
       }
 
 
-      ResourceLimitsConf build() {
-        return new ResourceLimitsConf(this);
+      private Builder setPathsWithFrags(boolean fragments) {
+        this.pathsWithFrags = fragments;
+        return this;
+      }
+
+      private Builder setPathsWithScheme(boolean scheme) {
+        this.pathsWithScheme = scheme;
+        return this;
+      }
+
+      private Builder setAbsolutePaths(boolean absolute) {
+        this.absolutePaths = absolute;
+        return this;
+      }
+
+      ResourceConf build() {
+        return new ResourceConf(this);
+      }
+    }
+
+    private void setupJobConf(JobConf conf) {
+      conf.set("tmpfiles",
+          buildPathString("tmpFiles", this.numOfTmpFiles, ".txt"));
+      conf.set("tmpjars",
+          buildPathString("tmpjars", this.numOfTmpLibJars, ".jar"));
+      conf.set("tmparchives",
+          buildPathString("tmpArchives", this.numOfTmpArchives, ".tgz"));
+      conf.set(MRJobConfig.CACHE_ARCHIVES, buildDistributedCachePathString(
+          "cacheArchives", this.numOfDCArchives, ".tgz"));
+      conf.set(MRJobConfig.CACHE_FILES, buildDistributedCachePathString(
+          "cacheFiles", this.numOfDCFiles, ".txt"));
+      if (this.jobJar) {
+        String fragment = "";
+        if (pathsWithFrags) {
+          fragment = "#jobjarfrag.jar";
+        }
+        if (pathsWithScheme) {
+          conf.setJar("file:///jobjar.jar" + fragment);
+        } else {
+          if (absolutePaths) {
+            conf.setJar("/jobjar.jar" + fragment);
+          } else {
+            conf.setJar("jobjar.jar" + fragment);
+          }
+        }
+      }
+      conf.setInt(MRJobConfig.MAX_RESOURCES, this.maxResources);
+      conf.setLong(MRJobConfig.MAX_RESOURCES_MB, this.maxResourcesMB);
+      conf.setLong(MRJobConfig.MAX_SINGLE_RESOURCE_MB,
+          this.maxSingleResourceMB);
+    }
+
+    // We always want absolute paths with a scheme in the DistributedCache, so
+    // we use a separate method to construct the path string.
+    private String buildDistributedCachePathString(String pathPrefix,
+        int numOfPaths, String extension) {
+      if (numOfPaths < 1) {
+        return "";
+      } else {
+        StringBuilder b = new StringBuilder();
+        b.append(buildPathStringSub(pathPrefix, "file:///" + pathPrefix,
+            extension, 0));
+        for (int i = 1; i < numOfPaths; i++) {
+          b.append("," + buildPathStringSub(pathPrefix, "file:///" + pathPrefix,
+              extension, i));
+        }
+        return b.toString();
+      }
+    }
+
+    private String buildPathString(String pathPrefix, int numOfPaths,
+        String extension) {
+      if (numOfPaths < 1) {
+        return "";
+      } else {
+        StringBuilder b = new StringBuilder();
+        String processedPath;
+        if (pathsWithScheme) {
+          processedPath = "file:///" + pathPrefix;
+        } else {
+          if (absolutePaths) {
+            processedPath = "/" + pathPrefix;
+          } else {
+            processedPath = pathPrefix;
+          }
+        }
+        b.append(buildPathStringSub(pathPrefix, processedPath, extension, 0));
+        for (int i = 1; i < numOfPaths; i++) {
+          b.append(","
+              + buildPathStringSub(pathPrefix, processedPath, extension, i));
+        }
+        return b.toString();
+      }
+    }
+
+    private String buildPathStringSub(String pathPrefix, String processedPath,
+        String extension, int num) {
+      if (pathsWithFrags) {
+        return processedPath + num + extension + "#" + pathPrefix + "fragment"
+            + num + extension;
+      } else {
+        return processedPath + num + extension;
       }
       }
     }
     }
   }
   }
 
 
-  class StubedUploader extends JobResourceUploader {
+  private class StubedUploader extends JobResourceUploader {
     StubedUploader(JobConf conf) throws IOException {
     StubedUploader(JobConf conf) throws IOException {
-      super(FileSystem.getLocal(conf), false);
+      this(conf, false);
+    }
+
+    StubedUploader(JobConf conf, boolean useWildcard) throws IOException {
+      super(FileSystem.getLocal(conf), useWildcard);
     }
     }
 
 
     @Override
     @Override
@@ -379,5 +703,26 @@ public class TestJobResourceUploader {
         Path p) throws IOException {
         Path p) throws IOException {
       return mockedStatus;
       return mockedStatus;
     }
     }
+
+    @Override
+    boolean mkdirs(FileSystem fs, Path dir, FsPermission permission)
+        throws IOException {
+      // Do nothing. Stubbed out to avoid side effects. We don't actually need
+      // to create submit dirs.
+      return true;
+    }
+
+    @Override
+    Path copyRemoteFiles(Path parentDir, Path originalPath, Configuration conf,
+        short replication) throws IOException {
+      return new Path(destinationPathPrefix + originalPath.getName());
+    }
+
+    @Override
+    void copyJar(Path originalJarPath, Path submitJarFile, short replication)
+        throws IOException {
+      // Do nothing. Stubbed out to avoid side effects. We don't actually need
+      // to copy the jar to the remote fs.
+    }
   }
   }
 }
 }