Browse Source

MAPREDUCE-4421. Run MapReduce framework via the distributed cache. Contributed by Jason Lowe

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1528237 13f79535-47bb-0310-9956-ffa450edef68
Jason Darrell Lowe 11 years ago
parent
commit
db06f1bcb9

+ 2 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -162,6 +162,8 @@ Release 2.3.0 - UNRELEASED
     MAPREDUCE-434. LocalJobRunner limited to single reducer (Sandy Ryza and
     Aaron Kimball via Sandy Ryza)
 
+    MAPREDUCE-4421. Run MapReduce framework via the distributed cache (jlowe)
+
   OPTIMIZATIONS
 
     MAPREDUCE-5484. YarnChild unnecessarily loads job conf twice (Sandy Ryza)

+ 46 - 6
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.mapreduce.v2.util;
 import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.security.AccessController;
 import java.security.PrivilegedActionException;
 import java.security.PrivilegedExceptionAction;
@@ -133,6 +134,30 @@ public class MRApps extends Apps {
     return TaskAttemptStateUI.valueOf(attemptStateStr);
   }
 
+  // gets the base name of the MapReduce framework or null if no
+  // framework was configured
+  private static String getMRFrameworkName(Configuration conf) {
+    String frameworkName = null;
+    String framework =
+        conf.get(MRJobConfig.MAPREDUCE_APPLICATION_FRAMEWORK_PATH, "");
+    if (!framework.isEmpty()) {
+      URI uri;
+      try {
+        uri = new URI(framework);
+      } catch (URISyntaxException e) {
+        throw new IllegalArgumentException("Unable to parse '" + framework
+            + "' as a URI, check the setting for "
+            + MRJobConfig.MAPREDUCE_APPLICATION_FRAMEWORK_PATH, e);
+      }
+
+      frameworkName = uri.getFragment();
+      if (frameworkName == null) {
+        frameworkName = new Path(uri).getName();
+      }
+    }
+    return frameworkName;
+  }
+
   private static void setMRFrameworkClasspath(
       Map<String, String> environment, Configuration conf) throws IOException {
     // Propagate the system classpath when using the mini cluster
@@ -141,18 +166,33 @@ public class MRApps extends Apps {
           System.getProperty("java.class.path"));
     }
 
-    // Add standard Hadoop classes
-    for (String c : conf.getStrings(
-        YarnConfiguration.YARN_APPLICATION_CLASSPATH,
-        YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
-      Apps.addToEnvironment(environment, Environment.CLASSPATH.name(), c
-          .trim());
+    // if the framework is specified then only use the MR classpath
+    String frameworkName = getMRFrameworkName(conf);
+    if (frameworkName == null) {
+      // Add standard Hadoop classes
+      for (String c : conf.getStrings(
+          YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+          YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
+        Apps.addToEnvironment(environment, Environment.CLASSPATH.name(), c
+            .trim());
+      }
     }
+
+    boolean foundFrameworkInClasspath = (frameworkName == null);
     for (String c : conf.getStrings(
         MRJobConfig.MAPREDUCE_APPLICATION_CLASSPATH,
         MRJobConfig.DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH)) {
       Apps.addToEnvironment(environment, Environment.CLASSPATH.name(), c
           .trim());
+      if (!foundFrameworkInClasspath) {
+        foundFrameworkInClasspath = c.contains(frameworkName);
+      }
+    }
+
+    if (!foundFrameworkInClasspath) {
+      throw new IllegalArgumentException(
+          "Could not locate MapReduce framework name '" + frameworkName
+          + "' in " + MRJobConfig.MAPREDUCE_APPLICATION_CLASSPATH);
     }
     // TODO: Remove duplicates.
   }

+ 40 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java

@@ -282,7 +282,46 @@ public class TestMRApps {
     assertEquals("MAPREDUCE_JOB_CLASSLOADER true, but job.jar is not in the app"
       + " classpath!", expectedAppClasspath, appCp);
   }
-  
+
+  @Test (timeout = 3000000)
+  public void testSetClasspathWithFramework() throws IOException {
+    final String FRAMEWORK_NAME = "some-framework-name";
+    final String FRAMEWORK_PATH = "some-framework-path#" + FRAMEWORK_NAME;
+    Configuration conf = new Configuration();
+    conf.set(MRJobConfig.MAPREDUCE_APPLICATION_FRAMEWORK_PATH, FRAMEWORK_PATH);
+    Map<String, String> env = new HashMap<String, String>();
+    try {
+      MRApps.setClasspath(env, conf);
+      fail("Failed to catch framework path set without classpath change");
+    } catch (IllegalArgumentException e) {
+      assertTrue("Unexpected IllegalArgumentException",
+          e.getMessage().contains("Could not locate MapReduce framework name '"
+              + FRAMEWORK_NAME + "'"));
+    }
+
+    env.clear();
+    final String FRAMEWORK_CLASSPATH = FRAMEWORK_NAME + "/*.jar";
+    conf.set(MRJobConfig.MAPREDUCE_APPLICATION_CLASSPATH, FRAMEWORK_CLASSPATH);
+    MRApps.setClasspath(env, conf);
+    final String stdClasspath = StringUtils.join(File.pathSeparator,
+        Arrays.asList("job.jar/job.jar", "job.jar/classes/", "job.jar/lib/*",
+            ApplicationConstants.Environment.PWD.$() + "/*"));
+    String expectedClasspath = StringUtils.join(File.pathSeparator,
+        Arrays.asList(ApplicationConstants.Environment.PWD.$(),
+            FRAMEWORK_CLASSPATH, stdClasspath));
+    assertEquals("Incorrect classpath with framework and no user precedence",
+        expectedClasspath, env.get("CLASSPATH"));
+
+    env.clear();
+    conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);
+    MRApps.setClasspath(env, conf);
+    expectedClasspath = StringUtils.join(File.pathSeparator,
+        Arrays.asList(ApplicationConstants.Environment.PWD.$(),
+            stdClasspath, FRAMEWORK_CLASSPATH));
+    assertEquals("Incorrect classpath with framework and user precedence",
+        expectedClasspath, env.get("CLASSPATH"));
+  }
+
   @Test (timeout = 30000)
   public void testSetupDistributedCacheEmpty() throws IOException {
     Configuration conf = new Configuration();

+ 43 - 5
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java

@@ -39,6 +39,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
@@ -340,11 +341,12 @@ class JobSubmitter {
 
     //validate the jobs output specs 
     checkSpecs(job);
-    
-    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, 
-                                                     job.getConfiguration());
-    //configure the command line options correctly on the submitting dfs
+
     Configuration conf = job.getConfiguration();
+    addMRFrameworkToDistributedCache(conf);
+
+    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
+    //configure the command line options correctly on the submitting dfs
     InetAddress ip = InetAddress.getLocalHost();
     if (ip != null) {
       submitHostAddress = ip.getHostAddress();
@@ -602,7 +604,6 @@ class JobSubmitter {
   }
 
   //get secret keys and tokens and store them into TokenCache
-  @SuppressWarnings("unchecked")
   private void populateTokenCache(Configuration conf, Credentials credentials) 
   throws IOException{
     readTokensFromFiles(conf, credentials);
@@ -618,4 +619,41 @@ class JobSubmitter {
       TokenCache.obtainTokensForNamenodes(credentials, ps, conf);
     }
   }
+
+  @SuppressWarnings("deprecation")
+  private static void addMRFrameworkToDistributedCache(Configuration conf)
+      throws IOException {
+    String framework =
+        conf.get(MRJobConfig.MAPREDUCE_APPLICATION_FRAMEWORK_PATH, "");
+    if (!framework.isEmpty()) {
+      URI uri;
+      try {
+        uri = new URI(framework);
+      } catch (URISyntaxException e) {
+        throw new IllegalArgumentException("Unable to parse '" + framework
+            + "' as a URI, check the setting for "
+            + MRJobConfig.MAPREDUCE_APPLICATION_FRAMEWORK_PATH, e);
+      }
+
+      String linkedName = uri.getFragment();
+
+      // resolve any symlinks in the URI path so using a "current" symlink
+      // to point to a specific version shows the specific version
+      // in the distributed cache configuration
+      FileSystem fs = FileSystem.get(conf);
+      Path frameworkPath = fs.makeQualified(
+          new Path(uri.getScheme(), uri.getAuthority(), uri.getPath()));
+      FileContext fc = FileContext.getFileContext(frameworkPath.toUri(), conf);
+      frameworkPath = fc.resolvePath(frameworkPath);
+      uri = frameworkPath.toUri();
+      try {
+        uri = new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(),
+            null, linkedName);
+      } catch (URISyntaxException e) {
+        throw new IllegalArgumentException(e);
+      }
+
+      DistributedCache.addCacheArchive(uri, conf);
+    }
+  }
 }

+ 6 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java

@@ -649,6 +649,12 @@ public interface MRJobConfig {
   public static final String MAPREDUCE_APPLICATION_CLASSPATH = 
       "mapreduce.application.classpath";
 
+  /**
+   * Path to MapReduce framework archive
+   */
+  public static final String MAPREDUCE_APPLICATION_FRAMEWORK_PATH =
+      "mapreduce.application.framework.path";
+
   /**
    * Default CLASSPATH for all YARN MapReduce applications.
    */

+ 21 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml

@@ -1024,11 +1024,31 @@
 
 <property>
   <description>CLASSPATH for MR applications. A comma-separated list
-  of CLASSPATH entries</description>
+  of CLASSPATH entries. If mapreduce.application.framework is set then this
+  must specify the appropriate classpath for that archive, and the name of
+  the archive must be present in the classpath.</description>
    <name>mapreduce.application.classpath</name>
    <value>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*,$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*</value>
 </property>
 
+<property>
+  <description>Path to the MapReduce framework archive. If set, the framework
+    archive will automatically be distributed along with the job, and this
+    path would normally reside in a public location in an HDFS filesystem. As
+    with distributed cache files, this can be a URL with a fragment specifying
+    the alias to use for the archive name. For example,
+    hdfs:/mapred/framework/hadoop-mapreduce-2.1.1.tar.gz#mrframework would
+    alias the localized archive as "mrframework".
+
+    Note that mapreduce.application.classpath must include the appropriate
+    classpath for the specified framework. The base name of the archive, or
+    alias of the archive if an alias is used, must appear in the specified
+    classpath.
+  </description>
+   <name>mapreduce.application.framework.path</name>
+   <value></value>
+</property>
+
 <property>
    <name>mapreduce.job.classloader</name>
    <value>false</value>

+ 120 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/DistributedCacheDeploy.apt.vm

@@ -0,0 +1,120 @@
+~~ Licensed under the Apache License, Version 2.0 (the "License");
+~~ you may not use this file except in compliance with the License.
+~~ You may obtain a copy of the License at
+~~
+~~   http://www.apache.org/licenses/LICENSE-2.0
+~~
+~~ Unless required by applicable law or agreed to in writing, software
+~~ distributed under the License is distributed on an "AS IS" BASIS,
+~~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~~ See the License for the specific language governing permissions and
+~~ limitations under the License. See accompanying LICENSE file.
+
+  ---
+  Hadoop Map Reduce Next Generation-${project.version} - Distributed Cache Deploy
+  ---
+  ---
+  ${maven.build.timestamp}
+
+Hadoop MapReduce Next Generation - Distributed Cache Deploy
+
+  \[ {{{./index.html}Go Back}} \]
+
+* Introduction
+
+  The MapReduce application framework has rudimentary support for deploying a
+  new version of the MapReduce framework via the distributed cache. By setting
+  the appropriate configuration properties, users can run a different version
+  of MapReduce than the one initially deployed to the cluster. For example,
+  cluster administrators can place multiple versions of MapReduce in HDFS and
+  configure <<<mapred-site.xml>>> to specify which version jobs will use by
+  default. This allows the administrators to perform a rolling upgrade of the
+  MapReduce framework under certain conditions.
+
+* Preconditions and Limitations
+
+  The support for deploying the MapReduce framework via the distributed cache
+  currently does not address the job client code used to submit and query
+  jobs. It also does not address the <<<ShuffleHandler>>> code that runs as an
+  auxilliary service within each NodeManager. As a result the following
+  limitations apply to MapReduce versions that can be successfully deployed via
+  the distributed cache in a rolling upgrade fashion:
+
+  * The MapReduce version must be compatible with the job client code used to
+    submit and query jobs. If it is incompatible then the job client must be
+    upgraded separately on any node from which jobs using the new MapReduce
+    version will be submitted or queried.
+
+  * The MapReduce version must be compatible with the configuration files used
+    by the job client submitting the jobs. If it is incompatible with that
+    configuration (e.g.: a new property must be set or an existing property
+    value changed) then the configuration must be updated first.
+
+  * The MapReduce version must be compatible with the <<<ShuffleHandler>>>
+    version running on the nodes in the cluster. If it is incompatible then the
+    new <<<ShuffleHandler>>> code must be deployed to all the nodes in the
+    cluster, and the NodeManagers must be restarted to pick up the new
+    <<<ShuffleHandler>>> code.
+
+* Deploying a New MapReduce Version via the Distributed Cache
+
+  Deploying a new MapReduce version consists of three steps:
+
+  [[1]] Upload the MapReduce archive to a location that can be accessed by the
+  job submission client. Ideally the archive should be on the cluster's default
+  filesystem at a publicly-readable path. See the archive location discussion
+  below for more details.
+
+  [[2]] Configure <<<mapreduce.application.framework.path>>> to point to the
+  location where the archive is located. As when specifying distributed cache
+  files for a job, this is a URL that also supports creating an alias for the
+  archive if a URL fragment is specified. For example,
+  <<<hdfs:/mapred/framework/hadoop-mapreduce-2.1.1.tar.gz#mrframework>>> will
+  be localized as <<<mrframework>>> rather than
+  <<<hadoop-mapreduce-2.1.1.tar.gz>>>.
+
+  [[3]] Configure <<<mapreduce.application.classpath>>> to set the proper
+  classpath to use with the MapReduce archive configured above. NOTE: An error
+  occurs if <<<mapreduce.application.framework.path>>> is configured but
+  <<<mapreduce.application.classpath>>> does not reference the base name of the
+  archive path or the alias if an alias was specified.
+
+** Location of the MapReduce Archive and How It Affects Job Performance
+
+  Note that the location of the MapReduce archive can be critical to job
+  submission and job startup performance. If the archive is not located on the
+  cluster's default filesystem then it will be copied to the job staging
+  directory for each job and localized to each node where the job's tasks
+  run. This will slow down job submission and task startup performance.
+
+  If the archive is located on the default filesystem then the job client will
+  not upload the archive to the job staging directory for each job
+  submission. However if the archive path is not readable by all cluster users
+  then the archive will be localized separately for each user on each node
+  where tasks execute. This can cause unnecessary duplication in the
+  distributed cache.
+
+  When working with a large cluster it can be important to increase the
+  replication factor of the archive to increase its availability. This will
+  spread the load when the nodes in the cluster localize the archive for the
+  first time.
+
+* MapReduce Archives and Classpath Configuration
+
+  Setting a proper classpath for the MapReduce archive depends upon the
+  composition of the archive and whether it has any additional dependencies.
+  For example, the archive can contain not only the MapReduce jars but also the
+  necessary YARN, HDFS, and Hadoop Common jars and all other dependencies. In
+  that case, <<<mapreduce.application.classpath>>> would be configured to
+  something like the following example, where the archive basename is
+  hadoop-mapreduce-2.1.1.tar.gz and the archive is organized internally similar
+  to the standard Hadoop distribution archive:
+
+    <<<$HADOOP_CONF_DIR,$PWD/hadoop-mapreduce-2.1.1.tar.gz/hadoop-mapreduce-2.1.1/share/hadoop/mapreduce/*,$PWD/hadoop-mapreduce-2.1.1.tar.gz/hadoop-mapreduce-2.1.1/share/hadoop/mapreduce/lib/*,$PWD/hadoop-mapreduce-2.1.1.tar.gz/hadoop-mapreduce-2.1.1/share/hadoop/common/*,$PWD/hadoop-mapreduce-2.1.1.tar.gz/hadoop-mapreduce-2.1.1/share/hadoop/common/lib/*,$PWD/hadoop-mapreduce-2.1.1.tar.gz/hadoop-mapreduce-2.1.1/share/hadoop/yarn/*,$PWD/hadoop-mapreduce-2.1.1.tar.gz/hadoop-mapreduce-2.1.1/share/hadoop/yarn/lib/*,$PWD/hadoop-mapreduce-2.1.1.tar.gz/hadoop-mapreduce-2.1.1/share/hadoop/hdfs/*,$PWD/hadoop-mapreduce-2.1.1.tar.gz/hadoop-mapreduce-2.1.1/share/hadoop/hdfs/lib/*>>>
+
+  Another possible approach is to have the archive consist of just the
+  MapReduce jars and have the remaining dependencies picked up from the Hadoop
+  distribution installed on the nodes.  In that case, the above example would
+  change to something like the following:
+
+    <<<$HADOOP_CONF_DIR,$PWD/hadoop-mapreduce-2.1.1.tar.gz/hadoop-mapreduce-2.1.1/share/hadoop/mapreduce/*,$PWD/hadoop-mapreduce-2.1.1.tar.gz/hadoop-mapreduce-2.1.1/share/hadoop/mapreduce/lib/*,$HADOOP_COMMON_HOME/share/hadoop/common/*,$HADOOP_COMMON_HOME/share/hadoop/common/lib/*,$HADOOP_HDFS_HOME/share/hadoop/hdfs/*,$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*,$HADOOP_YARN_HOME/share/hadoop/yarn/*,$HADOOP_YARN_HOME/share/hadoop/yarn/lib/*>>>

+ 1 - 0
hadoop-project/src/site/site.xml

@@ -86,6 +86,7 @@
       <item name="Compatibilty between Hadoop 1.x and Hadoop 2.x" href="hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduce_Compatibility_Hadoop1_Hadoop2.html"/>
       <item name="Encrypted Shuffle" href="hadoop-mapreduce-client/hadoop-mapreduce-client-core/EncryptedShuffle.html"/>
       <item name="Pluggable Shuffle/Sort" href="hadoop-mapreduce-client/hadoop-mapreduce-client-core/PluggableShuffleAndPluggableSort.html"/>
+      <item name="Distributed Cache Deploy" href="hadoop-mapreduce-client/hadoop-mapreduce-client-core/DistributedCacheDeploy.html"/>
     </menu>
 
     <menu name="YARN" inherit="top">