Browse Source

MAPREDUCE-6238. MR2 can't run local jobs with -libjars command options which is a regression from MR1 (zxu via rkanter)

(cherry picked from commit d50e8f09287deeb51012d08e326a2ed71a6da869)
(cherry picked from commit 756c2542930756fef1cbff82056b418070f8d55f)
(cherry picked from commit 4f0fd74880e58958d89ca1cd4b7f665520595b2c)
Robert Kanter 10 năm trước cách đây
mục cha
commit
c253ac68eb

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -29,6 +29,9 @@ Release 2.6.1 - UNRELEASED
     MAPREDUCE-6267. Refactor JobSubmitter#copyAndConfigureFiles into it's own 
     class. (Chris Trezzo via kasha)
 
+    MAPREDUCE-6238. MR2 can't run local jobs with -libjars command options
+    which is a regression from MR1 (zxu via rkanter)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

+ 0 - 6
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java

@@ -100,18 +100,12 @@ class LocalDistributedCacheManager {
     Path[] archiveClassPaths = DistributedCache.getArchiveClassPaths(conf);
     if (archiveClassPaths != null) {
       for (Path p : archiveClassPaths) {
-        FileSystem remoteFS = p.getFileSystem(conf);
-        p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
-            remoteFS.getWorkingDirectory()));
         classpaths.put(p.toUri().getPath().toString(), p);
       }
     }
     Path[] fileClassPaths = DistributedCache.getFileClassPaths(conf);
     if (fileClassPaths != null) {
       for (Path p : fileClassPaths) {
-        FileSystem remoteFS = p.getFileSystem(conf);
-        p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
-            remoteFS.getWorkingDirectory()));
         classpaths.put(p.toUri().getPath().toString(), p);
       }
     }

+ 1 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java

@@ -127,7 +127,7 @@ class JobResourceUploader {
         Path tmp = new Path(tmpjars);
         Path newPath = copyRemoteFiles(libjarsDir, tmp, conf, replication);
         DistributedCache.addFileToClassPath(
-            new Path(newPath.toUri().getPath()), conf);
+            new Path(newPath.toUri().getPath()), conf, jtFs);
       }
     }
 

+ 92 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLocalJobSubmission.java

@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URL;
+import java.util.jar.JarOutputStream;
+import java.util.zip.ZipEntry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.hadoop.mapreduce.SleepJob;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * check for the job submission options of
+ * -jt local -libjars
+ */
+public class TestLocalJobSubmission {
+  private static Path TEST_ROOT_DIR =
+      new Path(System.getProperty("test.build.data","/tmp"));
+
+  @Before
+  public void configure() throws Exception {
+  }
+
+  @After
+  public void cleanup() {
+  }
+
+  /**
+   * test the local job submission options of
+   * -jt local -libjars
+   * @throws IOException
+   */
+  @Test
+  public void testLocalJobLibjarsOption() throws IOException {
+    Path jarPath = makeJar(new Path(TEST_ROOT_DIR, "test.jar"));
+
+    Configuration conf = new Configuration();
+    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://testcluster");
+    final String[] args = {
+        "-jt" , "local", "-libjars", jarPath.toString(),
+        "-m", "1", "-r", "1", "-mt", "1", "-rt", "1"
+    };
+    int res = -1;
+    try {
+      res = ToolRunner.run(conf, new SleepJob(), args);
+    } catch (Exception e) {
+      System.out.println("Job failed with " + e.getLocalizedMessage());
+      e.printStackTrace(System.out);
+      fail("Job failed");
+    }
+    assertEquals("dist job res is not 0:", 0, res);
+  }
+
+  private Path makeJar(Path p) throws IOException {
+    FileOutputStream fos = new FileOutputStream(new File(p.toString()));
+    JarOutputStream jos = new JarOutputStream(fos);
+    ZipEntry ze = new ZipEntry("test.jar.inside");
+    jos.putNextEntry(ze);
+    jos.write(("inside the jar!").getBytes());
+    jos.closeEntry();
+    jos.close();
+    return p;
+  }
+}