View Source

MAPREDUCE-987. Exposing MiniDFS and MiniMR clusters as a single process command-line. (ahmed via tucu)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1364020 13f79535-47bb-0310-9956-ffa450edef68
Alejandro Abdelnur 13 years ago
parent
commit
9c9f29ac7e

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -132,6 +132,9 @@ Branch-2 ( Unreleased changes )
 
   NEW FEATURES
 
+    MAPREDUCE-987. Exposing MiniDFS and MiniMR clusters as a single process 
+    command-line. (ahmed via tucu)
+
   IMPROVEMENTS
 
     MAPREDUCE-4157. ResourceManager should not kill apps that are well behaved

+ 316 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/MiniHadoopClusterManager.java

@@ -0,0 +1,316 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRClientCluster;
+import org.apache.hadoop.mapred.MiniMRClientClusterFactory;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.mortbay.util.ajax.JSON;
+
+/**
+ * This class drives the creation of a mini-cluster on the local machine. By
+ * default, a MiniDFSCluster and MiniMRCluster are spawned on the first
+ * available ports that are found.
+ *
+ * A series of command line flags controls the startup cluster options.
+ *
+ * This class can dump a Hadoop configuration and some basic metadata (in JSON)
+ * into a text file.
+ *
+ * To shut down the cluster, kill the process.
+ */
+public class MiniHadoopClusterManager {
+  private static final Log LOG = LogFactory
+      .getLog(MiniHadoopClusterManager.class);
+
+  private MiniMRClientCluster mr;
+  private MiniDFSCluster dfs;
+  private String writeDetails;
+  private int numNodeManagers;
+  private int numDataNodes;
+  private int nnPort;
+  private int rmPort;
+  private int jhsPort;
+  private StartupOption dfsOpts;
+  private boolean noDFS;
+  private boolean noMR;
+  private String fs;
+  private String writeConfig;
+  private JobConf conf;
+
+  /**
+   * Creates configuration options object.
+   */
+  @SuppressWarnings("static-access")
+  private Options makeOptions() {
+    Options options = new Options();
+    options
+        .addOption("nodfs", false, "Don't start a mini DFS cluster")
+        .addOption("nomr", false, "Don't start a mini MR cluster")
+        .addOption("nodemanagers", true,
+            "How many nodemanagers to start (default 1)")
+        .addOption("datanodes", true, "How many datanodes to start (default 1)")
+        .addOption("format", false, "Format the DFS (default false)")
+        .addOption("nnport", true, "NameNode port (default 0--we choose)")
+        .addOption(
+            "namenode",
+            true,
+            "URL of the namenode (default "
+                + "is either the DFS cluster or a temporary dir)")
+        .addOption("rmport", true,
+            "ResourceManager port (default 0--we choose)")
+        .addOption("jhsport", true,
+            "JobHistoryServer port (default 0--we choose)")
+        .addOption(
+            OptionBuilder.hasArgs().withArgName("property=value")
+                .withDescription("Options to pass into configuration object")
+                .create("D"))
+        .addOption(
+            OptionBuilder.hasArg().withArgName("path").withDescription(
+                "Save configuration to this XML file.").create("writeConfig"))
+        .addOption(
+            OptionBuilder.hasArg().withArgName("path").withDescription(
+                "Write basic information to this JSON file.").create(
+                "writeDetails"))
+        .addOption(
+            OptionBuilder.withDescription("Prints option help.").create("help"));
+    return options;
+  }
+
+  /**
+   * Main entry-point.
+   *
+   * @throws URISyntaxException
+   */
+  public void run(String[] args) throws IOException, URISyntaxException {
+    if (!parseArguments(args)) {
+      return;
+    }
+    start();
+    sleepForever();
+  }
+
+  private void sleepForever() {
+    while (true) {
+      try {
+        Thread.sleep(1000 * 60);
+      } catch (InterruptedException ignored) {
+        // nothing
+      }
+    }
+  }
+
+  /**
+   * Starts DFS and MR clusters, as specified in member-variable options. Also
+   * writes out configuration and details, if requested.
+   *
+   * @throws IOException
+   * @throws FileNotFoundException
+   * @throws URISyntaxException
+   */
+  public void start() throws IOException, FileNotFoundException,
+      URISyntaxException {
+    if (!noDFS) {
+      dfs = new MiniDFSCluster(nnPort, conf, numDataNodes, true, true,
+          dfsOpts, null, null);
+      LOG.info("Started MiniDFSCluster -- namenode on port "
+          + dfs.getNameNodePort());
+    }
+    if (!noMR) {
+      if (fs == null && dfs != null) {
+        fs = dfs.getFileSystem().getUri().toString();
+      } else if (fs == null) {
+        fs = "file:///tmp/minimr-" + System.nanoTime();
+      }
+      FileSystem.setDefaultUri(conf, new URI(fs));
+      // Instruct the minicluster to use fixed ports, so the user will know
+      // which ports to use when communicating with the cluster.
+      conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
+      conf.setBoolean(JHAdminConfig.MR_HISTORY_MINICLUSTER_FIXED_PORTS, true);
+      conf.set(YarnConfiguration.RM_ADDRESS, MiniYARNCluster.getHostname()
+          + ":" + this.rmPort);
+      conf.set(JHAdminConfig.MR_HISTORY_ADDRESS, MiniYARNCluster.getHostname()
+          + ":" + this.jhsPort);
+      mr = MiniMRClientClusterFactory.create(this.getClass(), numNodeManagers,
+          conf);
+      LOG.info("Started MiniMRCluster");
+    }
+
+    if (writeConfig != null) {
+      FileOutputStream fos = new FileOutputStream(new File(writeConfig));
+      conf.writeXml(fos);
+      fos.close();
+    }
+
+    if (writeDetails != null) {
+      Map<String, Object> map = new TreeMap<String, Object>();
+      if (dfs != null) {
+        map.put("namenode_port", dfs.getNameNodePort());
+      }
+      if (mr != null) {
+        map.put("resourcemanager_port", mr.getConfig().get(
+            YarnConfiguration.RM_ADDRESS).split(":")[1]);
+      }
+      FileWriter fw = new FileWriter(new File(writeDetails));
+      fw.write(new JSON().toJSON(map));
+      fw.close();
+    }
+  }
+
+  /**
+   * Shuts down in-process clusters.
+   *
+   * @throws IOException
+   */
+  public void stop() throws IOException {
+    if (mr != null) {
+      mr.stop();
+    }
+    if (dfs != null) {
+      dfs.shutdown();
+    }
+  }
+
+  /**
+   * Parses arguments and fills out the member variables.
+   *
+   * @param args
+   *          Command-line arguments.
+   * @return true on successful parse; false to indicate that the program should
+   *         exit.
+   */
+  private boolean parseArguments(String[] args) {
+    Options options = makeOptions();
+    CommandLine cli;
+    try {
+      CommandLineParser parser = new GnuParser();
+      cli = parser.parse(options, args);
+    } catch (ParseException e) {
+      LOG.warn("options parsing failed:  " + e.getMessage());
+      new HelpFormatter().printHelp("...", options);
+      return false;
+    }
+
+    if (cli.hasOption("help")) {
+      new HelpFormatter().printHelp("...", options);
+      return false;
+    }
+    if (cli.getArgs().length > 0) {
+      for (String arg : cli.getArgs()) {
+        System.err.println("Unrecognized option: " + arg);
+        new HelpFormatter().printHelp("...", options);
+        return false;
+      }
+    }
+
+    // MR
+    noMR = cli.hasOption("nomr");
+    numNodeManagers = intArgument(cli, "nodemanagers", 1);
+    rmPort = intArgument(cli, "rmport", 0);
+    jhsPort = intArgument(cli, "jhsport", 0);
+    fs = cli.getOptionValue("namenode");
+
+    // HDFS
+    noDFS = cli.hasOption("nodfs");
+    numDataNodes = intArgument(cli, "datanodes", 1);
+    nnPort = intArgument(cli, "nnport", 0);
+    dfsOpts = cli.hasOption("format") ? StartupOption.FORMAT
+        : StartupOption.REGULAR;
+
+    // Runner
+    writeDetails = cli.getOptionValue("writeDetails");
+    writeConfig = cli.getOptionValue("writeConfig");
+
+    // General
+    conf = new JobConf();
+    updateConfiguration(conf, cli.getOptionValues("D"));
+
+    return true;
+  }
+
+  /**
+   * Updates configuration based on what's given on the command line.
+   *
+   * @param conf
+   *          The configuration object
+   * @param keyvalues
+   *          An array of interleaved key value pairs.
+   */
+  private void updateConfiguration(JobConf conf, String[] keyvalues) {
+    int numConfsUpdated = 0;
+    if (keyvalues != null) {
+      for (String prop : keyvalues) {
+        String[] keyval = prop.split("=", 2);
+        if (keyval.length == 2) {
+          conf.set(keyval[0], keyval[1]);
+          numConfsUpdated++;
+        } else {
+          LOG.warn("Ignoring -D option " + prop);
+        }
+      }
+    }
+    LOG.info("Updated " + numConfsUpdated
+        + " configuration settings from command line.");
+  }
+
+  /**
+   * Extracts an integer argument with specified default value.
+   */
+  private int intArgument(CommandLine cli, String argName, int defaultValue) {
+    String o = cli.getOptionValue(argName);
+    if (o == null) {
+      return defaultValue;
+    } else {
+      return Integer.parseInt(o);
+    }
+  }
+
+  /**
+   * Starts a MiniHadoopCluster.
+   *
+   * @throws URISyntaxException
+   */
+  public static void main(String[] args) throws IOException, URISyntaxException {
+    new MiniHadoopClusterManager().run(args);
+  }
+}
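
For context, here is a minimal usage sketch (not part of this commit; the class name, ports, and file path below are illustrative). The only supported entry point is run(String[]), which parses the flags, starts the requested clusters, and then sleeps until the process is killed, exactly as main() does:

import org.apache.hadoop.mapreduce.MiniHadoopClusterManager;

/** Hypothetical launcher; assumes the jobclient test jar is on the classpath. */
public class MiniClusterLauncher {
  public static void main(String[] args) throws Exception {
    // Same effect as "hadoop jar ... minicluster -nnport 8020 -rmport 8032":
    // parse flags, start DFS and MR, write the chosen ports as JSON, then
    // block forever (kill the process to shut the cluster down).
    new MiniHadoopClusterManager().run(new String[] {
        "-nnport", "8020",                        // fixed NameNode port
        "-rmport", "8032",                        // fixed ResourceManager port
        "-writeDetails", "/tmp/minicluster.json"  // dump ports for clients
    });
  }
}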

+ 3 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/test/MapredTestDriver.java

@@ -29,6 +29,7 @@ import org.apache.hadoop.mapred.TestSequenceFileInputFormat;
 import org.apache.hadoop.mapred.TestTextInputFormat;
 import org.apache.hadoop.mapred.ThreadedMapBenchmark;
 import org.apache.hadoop.mapreduce.FailJob;
+import org.apache.hadoop.mapreduce.MiniHadoopClusterManager;
 import org.apache.hadoop.mapreduce.SleepJob;
 import org.apache.hadoop.util.ProgramDriver;
 
@@ -101,6 +102,8 @@ public class MapredTestDriver {
           "Job History Log analyzer.");
       pgd.addClass(SliveTest.class.getSimpleName(), SliveTest.class, 
           "HDFS Stress Test and Live Data Verification.");
+      pgd.addClass("minicluster", MiniHadoopClusterManager.class,
+          "Single process HDFS and MR cluster.");
     } catch(Throwable e) {
       e.printStackTrace();
     }
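
To illustrate the dispatch above (a sketch under stated assumptions, not code from this commit): ProgramDriver maps the first command line argument to a registered class and invokes that class's main() with the remaining arguments, which is how the "minicluster" program name reaches MiniHadoopClusterManager.main:

import org.apache.hadoop.util.ProgramDriver;

/** Hypothetical stripped-down driver showing the registration pattern. */
public class TinyTestDriver {
  public static void main(String[] args) {
    ProgramDriver pgd = new ProgramDriver();
    try {
      pgd.addClass("minicluster",
          org.apache.hadoop.mapreduce.MiniHadoopClusterManager.class,
          "Single process HDFS and MR cluster.");
      // Look up args[0] ("minicluster") and call the registered class's
      // main() with the remaining arguments.
      pgd.driver(args);
    } catch (Throwable e) {
      e.printStackTrace();
    }
  }
}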

+ 84 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/CLIMiniCluster.apt.vm

@@ -0,0 +1,84 @@
+~~ Licensed under the Apache License, Version 2.0 (the "License");
+~~ you may not use this file except in compliance with the License.
+~~ You may obtain a copy of the License at
+~~
+~~   http://www.apache.org/licenses/LICENSE-2.0
+~~
+~~ Unless required by applicable law or agreed to in writing, software
+~~ distributed under the License is distributed on an "AS IS" BASIS,
+~~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~~ See the License for the specific language governing permissions and
+~~ limitations under the License. See accompanying LICENSE file.
+
+  ---
+  Hadoop MapReduce Next Generation ${project.version} - CLI MiniCluster.
+  ---
+  ---
+  ${maven.build.timestamp}
+
+Hadoop MapReduce Next Generation - CLI MiniCluster.
+
+  \[ {{{./index.html}Go Back}} \]
+
+%{toc|section=1|fromDepth=0}
+
+* {Purpose}
+
+  Using the CLI MiniCluster, users can start and stop a single-node Hadoop
+  cluster with a single command, without the need to set any environment
+  variables or manage configuration files. The CLI MiniCluster starts both
+  a <<<YARN>>>/<<<MapReduce>>> and an <<<HDFS>>> cluster.
+
+  This is useful for cases where users want to quickly experiment with a real
+  Hadoop cluster or test non-Java programs that rely on significant Hadoop
+  functionality.
+
+* {Hadoop Tarball}
+
+  You should be able to obtain the Hadoop tarball from the release.
+  Alternatively, you can create a tarball directly from the source:
+
++---+
+$ mvn clean install -DskipTests
+$ mvn package -Pdist -Dtar -DskipTests -Dmaven.javadoc.skip
++---+
+  <<NOTE:>> You will need protoc 2.4.1 or greater installed.
+
+  The tarball should be available in the <<<hadoop-dist/target/>>> directory.
+
+* {Running the MiniCluster}
+
+  From inside the root directory of the extracted tarball, you can start the CLI
+  MiniCluster using the following command:
+
++---+
+$ bin/hadoop jar ./share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-${project.version}-tests.jar minicluster -rmport RM_PORT -jhsport JHS_PORT
++---+
+
+  In the example command above, <<<RM_PORT>>> and <<<JHS_PORT>>> should be
+  replaced by the user's chosen port numbers. If not specified, random free
+  ports will be used.
+
+  There are a number of command line arguments that users can use to control
+  which services start and to pass other configuration properties. The
+  available command line arguments are:
+
++---+
+$ -D <property=value>    Options to pass into configuration object
+$ -datanodes <arg>       How many datanodes to start (default 1)
+$ -format                Format the DFS (default false)
+$ -help                  Prints option help.
+$ -jhsport <arg>         JobHistoryServer port (default 0--we choose)
+$ -namenode <arg>        URL of the namenode (default is either the DFS
+$                        cluster or a temporary dir)
+$ -nnport <arg>          NameNode port (default 0--we choose)
+$ -nodemanagers <arg>    How many nodemanagers to start (default 1)
+$ -nodfs                 Don't start a mini DFS cluster
+$ -nomr                  Don't start a mini MR cluster
+$ -rmport <arg>          ResourceManager port (default 0--we choose)
+$ -writeConfig <path>    Save configuration to this XML file.
+$ -writeDetails <path>   Write basic information to this JSON file.
++---+
+
+  To display the full list of available arguments, pass the <<<-help>>>
+  argument to the above command.
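
As a usage note (a hedged sketch, not part of the commit): a client or test can recover the ports the minicluster chose by reading the file written with <<<-writeDetails>>>. The keys below match what MiniHadoopClusterManager.start() writes; the file path is illustrative:

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Map;

import org.mortbay.util.ajax.JSON;

/** Hypothetical helper that reads the ports the minicluster wrote. */
public class MiniClusterDetails {
  public static void main(String[] args) throws Exception {
    StringBuilder json = new StringBuilder();
    BufferedReader reader =
        new BufferedReader(new FileReader("/tmp/minicluster.json"));
    try {
      String line;
      while ((line = reader.readLine()) != null) {
        json.append(line);
      }
    } finally {
      reader.close();
    }
    // JSON.parse returns a Map for a JSON object (same Jetty JSON utility
    // the manager uses to write the file).
    @SuppressWarnings("unchecked")
    Map<String, Object> details = (Map<String, Object>) JSON.parse(json.toString());
    System.out.println("namenode_port        = " + details.get("namenode_port"));
    System.out.println("resourcemanager_port = " + details.get("resourcemanager_port"));
  }
}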

+ 1 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/index.apt.vm

@@ -49,4 +49,5 @@ MapReduce NextGen aka YARN aka MRv2
 
   * {{{./WebApplicationProxy.html}Web Application Proxy}}
 
+  * {{{./CLIMiniCluster.html}CLI MiniCluster}}