Просмотр исходного кода

MAPREDUCE-5960. JobSubmitter's check whether job.jar is local is incorrect with no authority in job jar path. Contributed by Gera Shegalov
(cherry picked from commit 10f9f5101c44be7c675a44ded4aad212627ecdee)

Jason Lowe 10 лет назад
Родитель
Сommit
7be74ea09a

+ 3 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java

@@ -440,6 +440,9 @@ public class FileContext {
       final Configuration aConf) throws UnsupportedFileSystemException {
     UserGroupInformation currentUser = null;
     AbstractFileSystem defaultAfs = null;
+    if (defaultFsUri.getScheme() == null) {
+      return getFileContext(aConf);
+    }
     try {
       currentUser = UserGroupInformation.getCurrentUser();
       defaultAfs = getAbstractFileSystem(currentUser, defaultFsUri, aConf);

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -225,6 +225,9 @@ Release 2.6.0 - UNRELEASED
     MAPREDUCE-6048. Fixed TestJavaSerialization failure. (Varun Vasudev via
     jianhe)
 
+    MAPREDUCE-5960. JobSubmitter's check whether job.jar is local is incorrect
+    with no authority in job jar path. (Gera Shegalov via jlowe)
+
 Release 2.5.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 5 - 3
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java

@@ -653,9 +653,11 @@ public abstract class TaskAttemptImpl implements
       // //////////// Set up JobJar to be localized properly on the remote NM.
       String jobJar = conf.get(MRJobConfig.JAR);
       if (jobJar != null) {
-        Path remoteJobJar = (new Path(jobJar)).makeQualified(remoteFS
-            .getUri(), remoteFS.getWorkingDirectory());
-        LocalResource rc = createLocalResource(remoteFS, remoteJobJar,
+        final Path jobJarPath = new Path(jobJar);
+        final FileSystem jobJarFs = FileSystem.get(jobJarPath.toUri(), conf);
+        Path remoteJobJar = jobJarPath.makeQualified(jobJarFs.getUri(),
+            jobJarFs.getWorkingDirectory());
+        LocalResource rc = createLocalResource(jobJarFs, remoteJobJar,
             LocalResourceType.PATTERN, LocalResourceVisibility.APPLICATION);
         String pattern = conf.getPattern(JobContext.JAR_UNPACK_PATTERN, 
             JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern();

+ 4 - 5
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java

@@ -249,11 +249,10 @@ class JobSubmitter {
       }
       Path jobJarPath = new Path(jobJar);
       URI jobJarURI = jobJarPath.toUri();
-      // If the job jar is already in fs, we don't need to copy it from local fs
-      if (jobJarURI.getScheme() == null || jobJarURI.getAuthority() == null
-              || !(jobJarURI.getScheme().equals(jtFs.getUri().getScheme()) 
-                  && jobJarURI.getAuthority().equals(
-                                            jtFs.getUri().getAuthority()))) {
+      // If the job jar is already in a global fs,
+      // we don't need to copy it from local fs
+      if (     jobJarURI.getScheme() == null
+            || jobJarURI.getScheme().equals("file")) {
         copyJar(jobJarPath, JobSubmissionFiles.getJobJar(submitJobDir), 
             replication);
         job.setJar(JobSubmissionFiles.getJobJar(submitJobDir).toString());

+ 3 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java

@@ -357,8 +357,9 @@ public class YARNRunner implements ClientProtocol {
             jobConfPath, LocalResourceType.FILE));
     if (jobConf.get(MRJobConfig.JAR) != null) {
       Path jobJarPath = new Path(jobConf.get(MRJobConfig.JAR));
-      LocalResource rc = createApplicationResource(defaultFileContext,
-          jobJarPath, 
+      LocalResource rc = createApplicationResource(
+          FileContext.getFileContext(jobJarPath.toUri(), jobConf),
+          jobJarPath,
           LocalResourceType.PATTERN);
       String pattern = conf.getPattern(JobContext.JAR_UNPACK_PATTERN, 
           JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern();

+ 33 - 8
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java

@@ -40,6 +40,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.FailingMapper;
 import org.apache.hadoop.RandomTextWriterJob;
 import org.apache.hadoop.RandomTextWriterJob.RandomInputFormat;
+import org.apache.hadoop.fs.viewfs.ConfigUtil;
 import org.apache.hadoop.mapreduce.SleepJob;
 import org.apache.hadoop.mapreduce.SleepJob.SleepMapper;
 import org.apache.hadoop.conf.Configuration;
@@ -85,6 +86,7 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.ApplicationClassLoader;
+import org.apache.hadoop.util.ClassUtil;
 import org.apache.hadoop.util.JarFinder;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -93,6 +95,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.log4j.Level;
+import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -106,6 +109,9 @@ public class TestMRJobs {
   private static final int NUM_NODE_MGRS = 3;
   private static final String TEST_IO_SORT_MB = "11";
 
+  private static final int DEFAULT_REDUCES = 2;
+  protected int numSleepReducers = DEFAULT_REDUCES;
+
   protected static MiniMRYarnCluster mrCluster;
   protected static MiniDFSCluster dfsCluster;
 
@@ -170,10 +176,23 @@ public class TestMRJobs {
     }
   }
 
+  @After
+  public void resetInit() {
+    numSleepReducers = DEFAULT_REDUCES;
+  }
+
+  @Test (timeout = 300000)
+  public void testSleepJob() throws Exception {
+    testSleepJobInternal(false);
+  }
+
   @Test (timeout = 300000)
-  public void testSleepJob() throws IOException, InterruptedException,
-      ClassNotFoundException { 
-    LOG.info("\n\n\nStarting testSleepJob().");
+  public void testSleepJobWithRemoteJar() throws Exception {
+    testSleepJobInternal(true);
+  }
+
+  private void testSleepJobInternal(boolean useRemoteJar) throws Exception {
+    LOG.info("\n\n\nStarting testSleepJob: useRemoteJar=" + useRemoteJar);
 
     if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
       LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
@@ -187,14 +206,20 @@ public class TestMRJobs {
     
     SleepJob sleepJob = new SleepJob();
     sleepJob.setConf(sleepConf);
-
-    int numReduces = sleepConf.getInt("TestMRJobs.testSleepJob.reduces", 2); // or sleepConf.getConfig().getInt(MRJobConfig.NUM_REDUCES, 2);
    
     // job with 3 maps (10s) and numReduces reduces (5s), 1 "record" each:
-    Job job = sleepJob.createJob(3, numReduces, 10000, 1, 5000, 1);
+    Job job = sleepJob.createJob(3, numSleepReducers, 10000, 1, 5000, 1);
 
     job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
-    job.setJarByClass(SleepJob.class);
+    if (useRemoteJar) {
+      final Path localJar = new Path(
+          ClassUtil.findContainingJar(SleepJob.class));
+      ConfigUtil.addLink(job.getConfiguration(), "/jobjars",
+          localFs.makeQualified(localJar.getParent()).toUri());
+      job.setJar("viewfs:///jobjars/" + localJar.getName());
+    } else {
+      job.setJarByClass(SleepJob.class);
+    }
     job.setMaxMapAttempts(1); // speed up failures
     job.submit();
     String trackingUrl = job.getTrackingURL();
@@ -329,7 +354,7 @@ public class TestMRJobs {
         .getValue());
     Assert.assertEquals(3, counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS)
         .getValue());
-    Assert.assertEquals(2,
+    Assert.assertEquals(numSleepReducers,
         counters.findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES).getValue());
     Assert
         .assertTrue(counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS) != null

+ 5 - 35
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java

@@ -20,7 +20,6 @@ package org.apache.hadoop.mapreduce.v2;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.Arrays;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -40,8 +39,7 @@ import org.junit.Test;
 public class TestUberAM extends TestMRJobs {
 
   private static final Log LOG = LogFactory.getLog(TestUberAM.class);
-  private int numSleepReducers;
-  
+
   @BeforeClass
   public static void setup() throws IOException {
     TestMRJobs.setup();
@@ -54,21 +52,15 @@ public class TestUberAM extends TestMRJobs {
   @Override
   @Test
   public void testSleepJob()
-  throws IOException, InterruptedException, ClassNotFoundException {
+  throws Exception {
     numSleepReducers = 1;
-    if (mrCluster != null) {
-    	mrCluster.getConfig().setInt("TestMRJobs.testSleepJob.reduces", numSleepReducers);
-    }
     super.testSleepJob();
   }
   
   @Test
   public void testSleepJobWithMultipleReducers()
-  throws IOException, InterruptedException, ClassNotFoundException {
+  throws Exception {
     numSleepReducers = 3;
-    if (mrCluster != null) {
-      mrCluster.getConfig().setInt("TestMRJobs.testSleepJob.reduces", numSleepReducers);
-    }
     super.testSleepJob();
   }
   
@@ -76,20 +68,7 @@ public class TestUberAM extends TestMRJobs {
   protected void verifySleepJobCounters(Job job) throws InterruptedException,
       IOException {
     Counters counters = job.getCounters();
-
-    Assert.assertEquals(3, counters.findCounter(JobCounter.OTHER_LOCAL_MAPS)
-        .getValue());
-    Assert.assertEquals(3, counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS)
-        .getValue());
-    Assert.assertEquals(numSleepReducers,
-        counters.findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES).getValue());
-    Assert
-        .assertTrue(counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS) != null
-            && counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue() != 0);
-    Assert
-        .assertTrue(counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS) != null
-            && counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue() != 0);
-
+    super.verifySleepJobCounters(job);
     Assert.assertEquals(3,
         counters.findCounter(JobCounter.NUM_UBER_SUBMAPS).getValue());
     Assert.assertEquals(numSleepReducers,
@@ -168,16 +147,7 @@ public class TestUberAM extends TestMRJobs {
   protected void verifyFailingMapperCounters(Job job)
       throws InterruptedException, IOException {
     Counters counters = job.getCounters();
-    Assert.assertEquals(2, counters.findCounter(JobCounter.OTHER_LOCAL_MAPS)
-        .getValue());
-    Assert.assertEquals(2, counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS)
-        .getValue());
-    Assert.assertEquals(2, counters.findCounter(JobCounter.NUM_FAILED_MAPS)
-        .getValue());
-    Assert
-        .assertTrue(counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS) != null
-            && counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue() != 0);
-
+    super.verifyFailingMapperCounters(job);
     Assert.assertEquals(2,
         counters.findCounter(JobCounter.TOTAL_LAUNCHED_UBERTASKS).getValue());
     Assert.assertEquals(2, counters.findCounter(JobCounter.NUM_UBER_SUBMAPS)