@@ -18,11 +18,14 @@ package org.apache.hadoop.streaming;

import java.io.File;
import java.io.IOException;
-import java.net.URL;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
+import java.util.TreeMap;
+import java.util.TreeSet;

import org.apache.commons.logging.*;

@@ -39,20 +42,19 @@ import org.apache.hadoop.mapred.RunningJob;
 * (Jar packaging, MapRed job submission and monitoring)
 * @author Michel Tourn
 */
-public class StreamJob
-{
-  protected static final Log LOG = LogFactory.getLog(StreamJob.class.getName());
+public class StreamJob {

+  protected static final Log LOG = LogFactory.getLog(StreamJob.class.getName());
  final static String REDUCE_NONE = "NONE";

-  public StreamJob(String[] argv, boolean mayExit)
-  {
+  private boolean reducerNone_;
+
+  public StreamJob(String[] argv, boolean mayExit) {
    argv_ = argv;
    mayExit_ = mayExit;
  }

-  public void go() throws IOException
-  {
+  public void go() throws IOException {
    init();

    preProcessArgs();
@@ -63,40 +65,37 @@ public class StreamJob
    submitAndMonitorJob();
  }

-  protected void init()
-  {
-    try {
-      env_ = new Environment();
-    } catch(IOException io) {
-      throw new RuntimeException(io);
-    }
+  protected void init() {
+    try {
+      env_ = new Environment();
+    } catch (IOException io) {
+      throw new RuntimeException(io);
+    }
  }

-  void preProcessArgs()
-  {
+  void preProcessArgs() {
    verbose_ = false;
    addTaskEnvironment_ = "";
  }

-  void postProcessArgs() throws IOException
-  {
-    if(cluster_ == null) {
-      // hadoop-default.xml is standard, hadoop-local.xml is not.
-      cluster_ = "default";
+  void postProcessArgs() throws IOException {
+    if (cluster_ == null) {
+      // hadoop-default.xml is standard, hadoop-local.xml is not.
+      cluster_ = "default";
    }
    hadoopAliasConf_ = "hadoop-" + getClusterNick() + ".xml";
-    if(inputGlobs_.size() == 0) {
-      fail("Required argument: -input <name>");
+    if (inputSpecs_.size() == 0) {
+      fail("Required argument: -input <name>");
    }
-    if(output_ == null) {
-      fail("Required argument: -output ");
+    if (output_ == null) {
+      fail("Required argument: -output ");
    }
    msg("addTaskEnvironment=" + addTaskEnvironment_);

    Iterator it = packageFiles_.iterator();
-    while(it.hasNext()) {
-      File f = new File((String)it.next());
-      if(f.isFile()) {
+    while (it.hasNext()) {
+      File f = new File((String) it.next());
+      if (f.isFile()) {
        shippedCanonFiles_.add(f.getCanonicalPath());
      }
    }
@@ -108,37 +107,40 @@ public class StreamJob
    redCmd_ = unqualifyIfLocalPath(redCmd_);
  }

-  void validateNameEqValue(String neqv)
-  {
+  void validateNameEqValue(String neqv) {
    String[] nv = neqv.split("=", 2);
-    if(nv.length < 2) {
-      fail("Invalid name=value spec: " + neqv);
+    if (nv.length < 2) {
+      fail("Invalid name=value spec: " + neqv);
    }
    msg("Recording name=value: name=" + nv[0] + " value=" + nv[1]);
  }

-  String unqualifyIfLocalPath(String cmd) throws IOException
-  {
-    if(cmd == null) {
+  String unqualifyIfLocalPath(String cmd) throws IOException {
+    if (cmd == null) {
      //
    } else {
      String prog = cmd;
      String args = "";
      int s = cmd.indexOf(" ");
-      if(s != -1) {
+      if (s != -1) {
        prog = cmd.substring(0, s);
-        args = cmd.substring(s+1);
+        args = cmd.substring(s + 1);
+      }
+      String progCanon;
+      try {
+        progCanon = new File(prog).getCanonicalPath();
+      } catch (IOException io) {
+        progCanon = prog;
      }
-      String progCanon = new File(prog).getCanonicalPath();
      boolean shipped = shippedCanonFiles_.contains(progCanon);
      msg("shipped: " + shipped + " " + progCanon);
-      if(shipped) {
+      if (shipped) {
        // Change path to simple filename.
        // That way when PipeMapRed calls Runtime.exec(),
        // it will look for the excutable in Task's working dir.
        // And this is where TaskRunner unjars our job jar.
        prog = new File(prog).getName();
-        if(args.length() > 0) {
+        if (args.length() > 0) {
          cmd = prog + " " + args;
        } else {
          cmd = prog;
@@ -149,68 +151,70 @@ public class StreamJob
    return cmd;
  }

-  String getHadoopAliasConfFile()
-  {
+  String getHadoopAliasConfFile() {
    return new File(getHadoopClientHome() + "/conf", hadoopAliasConf_).getAbsolutePath();
  }

-
-  void parseArgv()
-  {
-    if(argv_.length==0) {
+  void parseArgv() {
+    if (argv_.length == 0) {
      exitUsage(false);
    }
-    int i=0;
-    while(i < argv_.length) {
+    int i = 0;
+    while (i < argv_.length) {
      String s;
-      if(argv_[i].equals("-verbose")) {
+      if (argv_[i].equals("-verbose")) {
        verbose_ = true;
-      } else if(argv_[i].equals("-info")) {
+      } else if (argv_[i].equals("-info")) {
        detailedUsage_ = true;
-      } else if(argv_[i].equals("-debug")) {
+      } else if (argv_[i].equals("-debug")) {
        debug_++;
-      } else if((s = optionArg(argv_, i, "-input", false)) != null) {
+      } else if ((s = optionArg(argv_, i, "-input", false)) != null) {
        i++;
-        inputGlobs_.add(s);
-      } else if((s = optionArg(argv_, i, "-output", output_ != null)) != null) {
+        inputSpecs_.add(s);
+      } else if (argv_[i].equals("-inputtagged")) {
+        inputTagged_ = true;
+      } else if ((s = optionArg(argv_, i, "-output", output_ != null)) != null) {
        i++;
        output_ = s;
-      } else if((s = optionArg(argv_, i, "-mapper", mapCmd_ != null)) != null) {
+      } else if ((s = optionArg(argv_, i, "-mapsideoutput", mapsideoutURI_ != null)) != null) {
+        i++;
+        mapsideoutURI_ = s;
+      } else if ((s = optionArg(argv_, i, "-mapper", mapCmd_ != null)) != null) {
        i++;
        mapCmd_ = s;
-      } else if((s = optionArg(argv_, i, "-combiner", comCmd_ != null)) != null) {
+      } else if ((s = optionArg(argv_, i, "-combiner", comCmd_ != null)) != null) {
        i++;
        comCmd_ = s;
-      } else if((s = optionArg(argv_, i, "-reducer", redCmd_ != null)) != null) {
+      } else if ((s = optionArg(argv_, i, "-reducer", redCmd_ != null)) != null) {
        i++;
        redCmd_ = s;
-      } else if((s = optionArg(argv_, i, "-file", false)) != null) {
+      } else if ((s = optionArg(argv_, i, "-file", false)) != null) {
        i++;
        packageFiles_.add(s);
-      } else if((s = optionArg(argv_, i, "-cluster", cluster_ != null)) != null) {
+      } else if ((s = optionArg(argv_, i, "-cluster", cluster_ != null)) != null) {
        i++;
        cluster_ = s;
-      } else if((s = optionArg(argv_, i, "-config", false)) != null) {
+      } else if ((s = optionArg(argv_, i, "-config", false)) != null) {
        i++;
        configPath_.add(s);
-      } else if((s = optionArg(argv_, i, "-dfs", false)) != null) {
+      } else if ((s = optionArg(argv_, i, "-dfs", false)) != null) {
        i++;
-        userJobConfProps_.add("fs.default.name="+s);
-      } else if((s = optionArg(argv_, i, "-jt", false)) != null) {
+        userJobConfProps_.add("fs.default.name=" + s);
+      } else if ((s = optionArg(argv_, i, "-jt", false)) != null) {
        i++;
-        userJobConfProps_.add("mapred.job.tracker="+s);
-      } else if((s = optionArg(argv_, i, "-jobconf", false)) != null) {
+        userJobConfProps_.add("mapred.job.tracker=" + s);
+      } else if ((s = optionArg(argv_, i, "-jobconf", false)) != null) {
        i++;
        validateNameEqValue(s);
        userJobConfProps_.add(s);
-      } else if((s = optionArg(argv_, i, "-cmdenv", false)) != null) {
+      } else if ((s = optionArg(argv_, i, "-cmdenv", false)) != null) {
        i++;
        validateNameEqValue(s);
-        if(addTaskEnvironment_.length() > 0) {
-          addTaskEnvironment_ += " ";
+        if (addTaskEnvironment_.length() > 0) {
+          addTaskEnvironment_ += " ";
        }
        addTaskEnvironment_ += s;
-      } else if((s = optionArg(argv_, i, "-inputreader", inReaderSpec_ != null)) != null) {
+      } else if ((s = optionArg(argv_, i, "-inputreader", inReaderSpec_ != null)) != null) {
        i++;
        inReaderSpec_ = s;
      } else {
@@ -219,37 +223,35 @@ public class StreamJob
      }
      i++;
    }
-    if(detailedUsage_) {
-      exitUsage(true);
+    if (detailedUsage_) {
+      exitUsage(true);
    }
  }

-  String optionArg(String[] args, int index, String arg, boolean argSet)
-  {
-    if(index >= args.length || ! args[index].equals(arg)) {
+  String optionArg(String[] args, int index, String arg, boolean argSet) {
+    if (index >= args.length || !args[index].equals(arg)) {
      return null;
    }
-    if(argSet) {
+    if (argSet) {
      throw new IllegalArgumentException("Can only have one " + arg + " option");
    }
-    if(index >= args.length-1) {
+    if (index >= args.length - 1) {
      throw new IllegalArgumentException("Expected argument after option " + args[index]);
    }
-    return args[index+1];
+    return args[index + 1];
  }

-  protected void msg(String msg)
-  {
-    if(verbose_) {
+  protected void msg(String msg) {
+    if (verbose_) {
      System.out.println("STREAM: " + msg);
    }
  }

-  public void exitUsage(boolean detailed)
-  {
-    //         1         2         3         4         5         6         7
-    //1234567890123456789012345678901234567890123456789012345678901234567890123456789
-    System.out.println("Usage: $HADOOP_HOME/bin/hadoop jar build/hadoop-streaming.jar [options]");
+  public void exitUsage(boolean detailed) {
+    //         1         2         3         4         5         6         7
+    //1234567890123456789012345678901234567890123456789012345678901234567890123456789
+    System.out.println("Usage: $HADOOP_HOME/bin/hadoop [--config dir] jar \\");
+    System.out.println("          $HADOOP_HOME/hadoop-streaming.jar [options]");
    System.out.println("Options:");
    System.out.println("  -input    <path>     DFS input file(s) for the Map step");
    System.out.println("  -output   <path>     DFS output directory for the Reduce step");
@@ -257,58 +259,82 @@ public class StreamJob
    System.out.println("  -combiner <cmd>      The streaming command to run");
    System.out.println("  -reducer  <cmd>      The streaming command to run");
    System.out.println("  -file     <file>     File/dir to be shipped in the Job jar file");
-    System.out.println("  -cluster  <name>     Default uses hadoop-default.xml and hadoop-site.xml");
-    System.out.println("  -config   <file>     Optional. One or more paths to xml config files");
-    System.out.println("  -dfs      <h:p>      Optional. Override DFS configuration");
-    System.out.println("  -jt       <h:p>      Optional. Override JobTracker configuration");
+    //Only advertise the standard way: [--config dir] in our launcher
+    //System.out.println("  -cluster  <name>     Default uses hadoop-default.xml and hadoop-site.xml");
+    //System.out.println("  -config   <file>     Optional. One or more paths to xml config files");
+    System.out.println("  -dfs    <h:p>|local  Optional. Override DFS configuration");
+    System.out.println("  -jt     <h:p>|local  Optional. Override JobTracker configuration");
    System.out.println("  -inputreader <spec>  Optional.");
-    System.out.println("  -jobconf  <n>=<v>    Optional.");
+    System.out.println("  -jobconf  <n>=<v>    Optional. Add or override a JobConf property");
    System.out.println("  -cmdenv   <n>=<v>    Optional. Pass env.var to streaming commands");
    System.out.println("  -verbose");
    System.out.println();
-    if(!detailed) {
-      System.out.println("For more details about these options:");
-      System.out.println("Use $HADOOP_HOME/bin/hadoop jar build/hadoop-streaming.jar -info");
-      fail("");
+    if (!detailed) {
+      System.out.println("For more details about these options:");
+      System.out.println("Use $HADOOP_HOME/bin/hadoop jar build/hadoop-streaming.jar -info");
+      fail("");
    }
    System.out.println("In -input: globbing on <path> is supported and can have multiple -input");
    System.out.println("Default Map input format: a line is a record in UTF-8");
    System.out.println("  the key part ends at first TAB, the rest of the line is the value");
    System.out.println("Custom Map input format: -inputreader package.MyRecordReader,n=v,n=v ");
-    System.out.println("  comma-separated name-values can be specified to configure the InputFormat");
+    System.out
+        .println("  comma-separated name-values can be specified to configure the InputFormat");
    System.out.println("  Ex: -inputreader 'StreamXmlRecordReader,begin=<doc>,end=</doc>'");
    System.out.println("Map output format, reduce input/output format:");
-    System.out.println("  Format defined by what mapper command outputs. Line-oriented");
+    System.out.println("  Format defined by what the mapper command outputs. Line-oriented");
    System.out.println();
-    System.out.println("Use -cluster <name> to switch between \"local\" Hadoop and one or more remote ");
-    System.out.println("  Hadoop clusters. ");
-    System.out.println("  The default is to use the normal hadoop-default.xml and hadoop-site.xml");
-    System.out.println("  Else configuration will use $HADOOP_HOME/conf/hadoop-<name>.xml");
+    System.out.println("The files or directories named in the -file argument[s] end up in the");
+    System.out.println("  working directory when the mapper and reducer are run.");
+    System.out.println("  The location of this working directory is unspecified.");
    System.out.println();
-    System.out.println("To skip the shuffle/sort/reduce step:" );
+    //System.out.println("Use -cluster <name> to switch between \"local\" Hadoop and one or more remote ");
+    //System.out.println("  Hadoop clusters. ");
+    //System.out.println("  The default is to use the normal hadoop-default.xml and hadoop-site.xml");
+    //System.out.println("  Else configuration will use $HADOOP_HOME/conf/hadoop-<name>.xml");
+    //System.out.println();
+    System.out.println("To skip the sort/combine/shuffle/sort/reduce step:");
    System.out.println("  Use -reducer " + REDUCE_NONE);
-    System.out.println("  This preserves the map input order and speeds up processing");
+    System.out
+        .println("  A Task's Map output then becomes a 'side-effect output' rather than a reduce input");
+    System.out
+        .println("  This speeds up processing, This also feels more like \"in-place\" processing");
+    System.out.println("  because the input filename and the map input order are preserved");
+    System.out.println("To specify a single side-effect output file");
+    System.out.println("    -mapsideoutput [file:/C:/win|file:/unix/|socket://host:port]");//-output for side-effects will be soon deprecated
+    System.out.println("    If the jobtracker is local this is a local file");
+    System.out.println("    This currently requires -reducer NONE");
    System.out.println();
    System.out.println("To set the number of reduce tasks (num. of output files):");
    System.out.println("  -jobconf mapred.reduce.tasks=10");
-    System.out.println("To name the job (appears in the JobTrack Web UI):");
+    System.out.println("To speed up the last reduces:");
+    System.out.println("  -jobconf mapred.speculative.execution=true");
+    System.out.println("  Do not use this along -reducer " + REDUCE_NONE);
+    System.out.println("To name the job (appears in the JobTracker Web UI):");
    System.out.println("  -jobconf mapred.job.name='My Job' ");
    System.out.println("To specify that line-oriented input is in gzip format:");
-    System.out.println("(at this time ALL input files must be gzipped and this is not recognized based on file extension)");
+    System.out
+        .println("(at this time ALL input files must be gzipped and this is not recognized based on file extension)");
    System.out.println("  -jobconf stream.recordreader.compression=gzip ");
    System.out.println("To change the local temp directory:");
-    System.out.println("  -jobconf dfs.data.dir=/tmp");
+    System.out.println("  -jobconf dfs.data.dir=/tmp/dfs");
+    System.out.println("  -jobconf stream.tmpdir=/tmp/streaming");
    System.out.println("Additional local temp directories with -cluster local:");
    System.out.println("  -jobconf mapred.local.dir=/tmp/local");
    System.out.println("  -jobconf mapred.system.dir=/tmp/system");
    System.out.println("  -jobconf mapred.temp.dir=/tmp/temp");
+    System.out.println("Use a custom hadoopStreaming build along a standard hadoop install:");
+    System.out.println("  $HADOOP_HOME/bin/hadoop jar /path/my-hadoop-streaming.jar [...]\\");
+    System.out
+        .println("    [...] -jobconf stream.shipped.hadoopstreaming=/path/my-hadoop-streaming.jar");
    System.out.println("For more details about jobconf parameters see:");
    System.out.println("  http://wiki.apache.org/lucene-hadoop/JobConfFile");
    System.out.println("To set an environement variable in a streaming command:");
    System.out.println("   -cmdenv EXAMPLE_DIR=/home/example/dictionaries/");
    System.out.println();
-    System.out.println("Shortcut to run from any directory:");
-    System.out.println("   setenv HSTREAMING \"$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/build/hadoop-streaming.jar\"");
+    System.out.println("Shortcut:");
+    System.out
+        .println("   setenv HSTREAMING \"$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar\"");
    System.out.println();
    System.out.println("Example: $HSTREAMING -mapper \"/usr/local/bin/perl5 filter.pl\"");
    System.out.println("           -file /local/filter.pl -input \"/logs/0604*/*\" [...]");
@@ -318,81 +344,87 @@ public class StreamJob
    fail("");
  }

-  public void fail(String message)
-  {
-    if(mayExit_) {
-      System.err.println(message);
-      System.exit(1);
+  public void fail(String message) {
+    if (mayExit_) {
+      System.err.println(message);
+      System.exit(1);
    } else {
-      throw new IllegalArgumentException(message);
+      throw new IllegalArgumentException(message);
    }
  }

  // --------------------------------------------

-
-  protected String getHadoopClientHome()
-  {
+  protected String getHadoopClientHome() {
    String h = env_.getProperty("HADOOP_HOME"); // standard Hadoop
-    if(h == null) {
+    if (h == null) {
      //fail("Missing required environment variable: HADOOP_HOME");
      h = "UNDEF";
    }
    return h;
  }

-
-  protected boolean isLocalHadoop()
-  {
+  protected boolean isLocalHadoop() {
    boolean local;
-    if(jobConf_ == null) {
-      local = getClusterNick().equals("local");
+    if (jobConf_ == null) {
+      local = getClusterNick().equals("local");
    } else {
-      local = jobConf_.get("mapred.job.tracker", "").equals("local");
+      local = StreamUtil.isLocalJobTracker(jobConf_);
    }
    return local;
  }
-  protected String getClusterNick()
-  {
+
+  protected String getClusterNick() {
    return cluster_;
  }

  /** @return path to the created Jar file or null if no files are necessary.
-  */
-  protected String packageJobJar() throws IOException
-  {
+   */
+  protected String packageJobJar() throws IOException {
    ArrayList unjarFiles = new ArrayList();

    // Runtime code: ship same version of code as self (job submitter code)
    // usually found in: build/contrib or build/hadoop-<version>-dev-streaming.jar
-    String runtimeClasses = StreamUtil.findInClasspath(StreamJob.class.getName());
-    if(runtimeClasses == null) {
-      throw new IOException("runtime classes not found: " + getClass().getPackage());
+
+    // First try an explicit spec: it's too hard to find our own location in this case:
+    // $HADOOP_HOME/bin/hadoop jar /not/first/on/classpath/custom-hadoop-streaming.jar
+    // where findInClasspath() would find the version of hadoop-streaming.jar in $HADOOP_HOME
+    String runtimeClasses = jobConf_.get("stream.shipped.hadoopstreaming"); // jar or class dir
+
+    if (runtimeClasses == null) {
+      runtimeClasses = StreamUtil.findInClasspath(StreamJob.class.getName());
+    }
+    if (runtimeClasses == null) {
+      throw new IOException("runtime classes not found: " + getClass().getPackage());
    } else {
-      msg("Found runtime classes in: " + runtimeClasses);
+      msg("Found runtime classes in: " + runtimeClasses);
    }
-    if(isLocalHadoop()) {
+    if (isLocalHadoop()) {
      // don't package class files (they might get unpackaged in "." and then
      //  hide the intended CLASSPATH entry)
      // we still package everything else (so that scripts and executable are found in
      //  Task workdir like distributed Hadoop)
    } else {
-      if(new File(runtimeClasses).isDirectory()) {
-        packageFiles_.add(runtimeClasses);
+      if (new File(runtimeClasses).isDirectory()) {
+        packageFiles_.add(runtimeClasses);
      } else {
-        unjarFiles.add(runtimeClasses);
+        unjarFiles.add(runtimeClasses);
      }
    }
-    if(packageFiles_.size() + unjarFiles.size()==0) {
+    if (packageFiles_.size() + unjarFiles.size() == 0) {
      return null;
    }
-    File jobJar = File.createTempFile("streamjob", ".jar");
-    System.out.println("packageJobJar: " + packageFiles_ + " " + unjarFiles + " " + jobJar);
-    if(debug_ == 0) {
+    String tmp = jobConf_.get("stream.tmpdir"); //, "/tmp/${user.name}/"
+    File tmpDir = (tmp == null) ? null : new File(tmp);
+    // tmpDir=null means OS default tmp dir
+    File jobJar = File.createTempFile("streamjob", ".jar", tmpDir);
+    System.out.println("packageJobJar: " + packageFiles_ + " " + unjarFiles + " " + jobJar
+        + " tmpDir=" + tmpDir);
+    if (debug_ == 0) {
      jobJar.deleteOnExit();
    }
    JarBuilder builder = new JarBuilder();
-    if(verbose_) {
+    if (verbose_) {
      builder.setVerbose(true);
    }
    String jobJarName = jobJar.getAbsolutePath();
@@ -400,53 +432,81 @@ public class StreamJob
    return jobJarName;
  }

-  protected void setJobConf() throws IOException
-  {
+  protected void setUserJobConfProps(boolean doEarlyProps) {
+    Iterator it = userJobConfProps_.iterator();
+    while (it.hasNext()) {
+      String prop = (String) it.next();
+      String[] nv = prop.split("=", 2);
+      if (doEarlyProps == nv[0].equals("fs.default.name")) {
+        msg("xxxJobConf: set(" + nv[0] + ", " + nv[1] + ") early=" + doEarlyProps);
+        jobConf_.set(nv[0], nv[1]);
+      }
+    }
+  }
+
+  protected void setJobConf() throws IOException {
    msg("hadoopAliasConf_ = " + hadoopAliasConf_);
    config_ = new Configuration();
-    if(!cluster_.equals("default")) {
-      config_.addFinalResource(new Path(getHadoopAliasConfFile()));
+    if (!cluster_.equals("default")) {
+      config_.addFinalResource(new Path(getHadoopAliasConfFile()));
    } else {
      // use only defaults: hadoop-default.xml and hadoop-site.xml
    }
    Iterator it = configPath_.iterator();
-    while(it.hasNext()) {
-      String pathName = (String)it.next();
-      config_.addFinalResource(new Path(pathName));
+    while (it.hasNext()) {
+      String pathName = (String) it.next();
+      config_.addFinalResource(new Path(pathName));
    }
+
+    testMerge_ = (-1 != userJobConfProps_.toString().indexOf("stream.testmerge"));
+
    // general MapRed job properties
    jobConf_ = new JobConf(config_);
-    for(int i=0; i<inputGlobs_.size(); i++) {
-      jobConf_.addInputPath(new Path((String)inputGlobs_.get(i)));
+
+    setUserJobConfProps(true);
+
+    // The correct FS must be set before this is called!
+    // (to resolve local vs. dfs drive letter differences)
+    // (mapred.working.dir will be lazily initialized ONCE and depends on FS)
+    for (int i = 0; i < inputSpecs_.size(); i++) {
+      addInputSpec((String) inputSpecs_.get(i), i);
+    }
+    jobConf_.setBoolean("stream.inputtagged", inputTagged_);
+    jobConf_.set("stream.numinputspecs", "" + inputSpecs_.size());
+
+    Class fmt;
+    if (testMerge_ && false == hasSimpleInputSpecs_) {
+      // this ignores -inputreader
+      fmt = MergerInputFormat.class;
+    } else {
+      // need to keep this case to support custom -inputreader
+      // and their parameters ,n=v,n=v
+      fmt = StreamInputFormat.class;
    }
+    jobConf_.setInputFormat(fmt);

-    jobConf_.setInputFormat(StreamInputFormat.class);
    // for SequenceFile, input classes may be overriden in getRecordReader
    jobConf_.setInputKeyClass(Text.class);
    jobConf_.setInputValueClass(Text.class);

    jobConf_.setOutputKeyClass(Text.class);
    jobConf_.setOutputValueClass(Text.class);
-    //jobConf_.setCombinerClass();
-
-    jobConf_.setOutputPath(new Path(output_));
-    jobConf_.setOutputFormat(StreamOutputFormat.class);

    jobConf_.set("stream.addenvironment", addTaskEnvironment_);

    String defaultPackage = this.getClass().getPackage().getName();

    Class c = StreamUtil.goodClassOrNull(mapCmd_, defaultPackage);
-    if(c != null) {
+    if (c != null) {
      jobConf_.setMapperClass(c);
    } else {
      jobConf_.setMapperClass(PipeMapper.class);
      jobConf_.set("stream.map.streamprocessor", mapCmd_);
    }

-    if(comCmd_ != null) {
+    if (comCmd_ != null) {
      c = StreamUtil.goodClassOrNull(comCmd_, defaultPackage);
-      if(c != null) {
+      if (c != null) {
        jobConf_.setCombinerClass(c);
      } else {
        jobConf_.setCombinerClass(PipeCombiner.class);
@@ -454,9 +514,11 @@ public class StreamJob
      }
    }

-    if(redCmd_ != null) {
+    reducerNone_ = false;
+    if (redCmd_ != null) {
+      reducerNone_ = redCmd_.equals(REDUCE_NONE);
      c = StreamUtil.goodClassOrNull(redCmd_, defaultPackage);
-      if(c != null) {
+      if (c != null) {
        jobConf_.setReducerClass(c);
      } else {
        jobConf_.setReducerClass(PipeReducer.class);
@@ -464,66 +526,165 @@ public class StreamJob
      }
    }

-    if(inReaderSpec_ != null) {
-      String[] args = inReaderSpec_.split(",");
-      String readerClass = args[0];
-      // this argument can only be a Java class
-      c = StreamUtil.goodClassOrNull(readerClass, defaultPackage);
-      if(c != null) {
-        jobConf_.set("stream.recordreader.class", c.getName());
-      } else {
-        fail("-inputreader: class not found: " + readerClass);
-      }
-      for(int i=1; i<args.length; i++) {
-        String[] nv = args[i].split("=", 2);
-        String k = "stream.recordreader." + nv[0];
-        String v = (nv.length>1) ? nv[1] : "";
-        jobConf_.set(k, v);
-      }
+    if (inReaderSpec_ != null) {
+      String[] args = inReaderSpec_.split(",");
+      String readerClass = args[0];
+      // this argument can only be a Java class
+      c = StreamUtil.goodClassOrNull(readerClass, defaultPackage);
+      if (c != null) {
+        jobConf_.set("stream.recordreader.class", c.getName());
+      } else {
+        fail("-inputreader: class not found: " + readerClass);
+      }
+      for (int i = 1; i < args.length; i++) {
+        String[] nv = args[i].split("=", 2);
+        String k = "stream.recordreader." + nv[0];
+        String v = (nv.length > 1) ? nv[1] : "";
+        jobConf_.set(k, v);
+      }
    }

-    jar_ = packageJobJar();
-    if(jar_ != null) {
-      jobConf_.setJar(jar_);
+    // output setup is done late so we can customize for reducerNone_
+    //jobConf_.setOutputDir(new File(output_));
+    setOutputSpec();
+    if (testMerge_) {
+      fmt = MuxOutputFormat.class;
+    } else {
+      fmt = StreamOutputFormat.class;
    }
+    jobConf_.setOutputFormat(fmt);

    // last, allow user to override anything
    // (although typically used with properties we didn't touch)
-    it = userJobConfProps_.iterator();
-    while(it.hasNext()) {
-      String prop = (String)it.next();
-      String[] nv = prop.split("=", 2);
-      msg("xxxJobConf: set(" + nv[0] + ", " + nv[1]+")");
-      jobConf_.set(nv[0], nv[1]);
+    setUserJobConfProps(false);
+
+    jar_ = packageJobJar();
+    if (jar_ != null) {
+      jobConf_.setJar(jar_);
    }
+
+    if(verbose_) {
+      listJobConfProperties();
+    }
+
    msg("submitting to jobconf: " + getJobTrackerHostPort());
  }

-  protected String getJobTrackerHostPort()
+  protected void listJobConfProperties()
  {
+    msg("==== JobConf properties:");
+    Iterator it = jobConf_.entries();
+    TreeMap sorted = new TreeMap();
+    while(it.hasNext()) {
+      Map.Entry en = (Map.Entry)it.next();
+      sorted.put(en.getKey(), en.getValue());
+    }
+    it = sorted.entrySet().iterator();
+    while(it.hasNext()) {
+      Map.Entry en = (Map.Entry)it.next();
+      msg(en.getKey() + "=" + en.getValue());
+    }
+    msg("====");
+  }
+
+  /** InputSpec-s encode: a glob pattern x additional column files x additional joins */
+  protected void addInputSpec(String inSpec, int index) {
+    if (!testMerge_) {
+      jobConf_.addInputPath(new Path(inSpec));
+    } else {
+      CompoundDirSpec spec = new CompoundDirSpec(inSpec, true);
+      msg("Parsed -input:\n" + spec.toTableString());
+      if (index == 0) {
+        hasSimpleInputSpecs_ = (spec.paths_.length == 0);
+        msg("hasSimpleInputSpecs_=" + hasSimpleInputSpecs_);
+      }
+      String primary = spec.primarySpec();
+      if (!seenPrimary_.add(primary)) {
+        // this won't detect glob overlaps and noncanonical path variations
+        fail("Primary used in multiple -input spec: " + primary);
+      }
+      jobConf_.addInputPath(new Path(primary));
+      // during Job execution, will reparse into a CompoundDirSpec
+      jobConf_.set("stream.inputspecs." + index, inSpec);
+    }
+  }
+
+  /** uses output_ and mapsideoutURI_ */
+  protected void setOutputSpec() throws IOException {
+    CompoundDirSpec spec = new CompoundDirSpec(output_, false);
+    msg("Parsed -output:\n" + spec.toTableString());
+    String primary = spec.primarySpec();
+    String channel0;
+    // TODO simplify cases, encapsulate in a StreamJobConf
+    if (!reducerNone_) {
+      channel0 = primary;
+    } else {
+      if (mapsideoutURI_ != null) {
+        // user can override in case this is in a difft filesystem..
+        try {
+          URI uri = new URI(mapsideoutURI_);
+          if (uri.getScheme() == null || uri.getScheme().equals("file")) { // || uri.getScheme().equals("hdfs")
+            if (!new Path(uri.getSchemeSpecificPart()).isAbsolute()) {
+              fail("Must be absolute: " + mapsideoutURI_);
+            }
+          } else if (uri.getScheme().equals("socket")) {
+            // ok
+          } else {
+            fail("Invalid scheme: " + uri.getScheme() + " for -mapsideoutput " + mapsideoutURI_);
+          }
+        } catch (URISyntaxException e) {
+          throw (IOException) new IOException().initCause(e);
+        }
+      } else {
+        mapsideoutURI_ = primary;
+      }
+      // an empty reduce output named "part-00002" will go here and not collide.
+      channel0 = primary + ".NONE";
+      // the side-effect of the first split of an input named "part-00002"
+      // will go in this directory
+      jobConf_.set("stream.sideoutput.dir", primary);
+      // oops if user overrides low-level this isn't set yet :-(
+      boolean localjt = StreamUtil.isLocalJobTracker(jobConf_);
+      // just a guess user may prefer remote..
+      jobConf_.setBoolean("stream.sideoutput.localfs", localjt);
+    }
+    // a path in fs.name.default filesystem
+    System.out.println(channel0);
+    System.out.println(new Path(channel0));
+    jobConf_.setOutputPath(new Path(channel0));
+    // will reparse remotely
+    jobConf_.set("stream.outputspec", output_);
+    if (null != mapsideoutURI_) {
+      // a path in "jobtracker's filesystem"
+      // overrides sideoutput.dir
+      jobConf_.set("stream.sideoutput.uri", mapsideoutURI_);
+    }
+  }
+
+  protected String getJobTrackerHostPort() {
    return jobConf_.get("mapred.job.tracker");
  }

-  protected void jobInfo()
-  {
-    if(isLocalHadoop()) {
+  protected void jobInfo() {
+    if (isLocalHadoop()) {
      LOG.info("Job running in-process (local Hadoop)");
    } else {
      String hp = getJobTrackerHostPort();
      LOG.info("To kill this job, run:");
-      LOG.info(getHadoopClientHome() + "/bin/hadoop job -Dmapred.job.tracker=" + hp + " -kill " + jobId_);
+      LOG.info(getHadoopClientHome() + "/bin/hadoop job -Dmapred.job.tracker=" + hp + " -kill "
+          + jobId_);
      //LOG.info("Job file: " + running_.getJobFile() );
-      LOG.info("Tracking URL: " + StreamUtil.qualifyHost(running_.getTrackingURL()));
+      LOG.info("Tracking URL: " + StreamUtil.qualifyHost(running_.getTrackingURL()));
    }
  }

  // Based on JobClient
  public void submitAndMonitorJob() throws IOException {

-    if(jar_ != null && isLocalHadoop()) {
-      // getAbs became required when shell and subvm have different working dirs...
-      File wd = new File(".").getAbsoluteFile();
-      StreamUtil.unJar(new File(jar_), wd);
+    if (jar_ != null && isLocalHadoop()) {
+      // getAbs became required when shell and subvm have different working dirs...
+      File wd = new File(".").getAbsoluteFile();
+      StreamUtil.unJar(new File(jar_), wd);
    }

    // if jobConf_ changes must recreate a JobClient
@@ -542,11 +703,12 @@ public class StreamJob
    while (!running_.isComplete()) {
      try {
        Thread.sleep(1000);
-      } catch (InterruptedException e) {}
+      } catch (InterruptedException e) {
+      }
      running_ = jc_.getJob(jobId_);
      String report = null;
-      report = " map "+Math.round(running_.mapProgress()*100)
-        +"% reduce " + Math.round(running_.reduceProgress()*100)+"%";
+      report = " map " + Math.round(running_.mapProgress() * 100) + "% reduce "
+          + Math.round(running_.reduceProgress() * 100) + "%";

      if (!report.equals(lastReport)) {
        LOG.info(report);
@@ -569,7 +731,6 @@ public class StreamJob
    }
  }

-
  protected boolean mayExit_;
  protected String[] argv_;
  protected boolean verbose_;
@@ -585,11 +746,15 @@ public class StreamJob
  protected JobClient jc_;

  // command-line arguments
-  protected ArrayList inputGlobs_ = new ArrayList(); // <String>
-  protected ArrayList packageFiles_ = new ArrayList(); // <String>
-  protected ArrayList shippedCanonFiles_= new ArrayList(); // <String>
-  protected ArrayList userJobConfProps_ = new ArrayList(); // <String>
+  protected ArrayList inputSpecs_ = new ArrayList(); // <String>
+  protected boolean inputTagged_ = false;
+  protected TreeSet seenPrimary_ = new TreeSet(); // <String>
+  protected boolean hasSimpleInputSpecs_;
+  protected ArrayList packageFiles_ = new ArrayList(); // <String>
+  protected ArrayList shippedCanonFiles_ = new ArrayList(); // <String>
+  protected ArrayList userJobConfProps_ = new ArrayList(); // <String> name=value
  protected String output_;
+  protected String mapsideoutURI_;
  protected String mapCmd_;
  protected String comCmd_;
  protected String redCmd_;
@@ -598,6 +763,7 @@ public class StreamJob
  protected String hadoopAliasConf_;
  protected String inReaderSpec_;

+  protected boolean testMerge_;

  // Use to communicate config to the external processes (ex env.var.HADOOP_USER)
  // encoding "a=b c=d"
@@ -609,6 +775,4 @@ public class StreamJob
  protected RunningJob running_;
  protected String jobId_;

-
}
-