|
@@ -32,21 +32,16 @@ import java.util.Map;
|
|
|
import java.util.TreeMap;
|
|
|
import java.util.TreeSet;
|
|
|
|
|
|
-import org.apache.commons.cli2.Argument;
|
|
|
-import org.apache.commons.cli2.CommandLine;
|
|
|
-import org.apache.commons.cli2.Group;
|
|
|
-import org.apache.commons.cli2.Option;
|
|
|
-import org.apache.commons.cli2.OptionException;
|
|
|
-import org.apache.commons.cli2.WriteableCommandLine;
|
|
|
-import org.apache.commons.cli2.builder.ArgumentBuilder;
|
|
|
-import org.apache.commons.cli2.builder.DefaultOptionBuilder;
|
|
|
-import org.apache.commons.cli2.builder.GroupBuilder;
|
|
|
-import org.apache.commons.cli2.commandline.Parser;
|
|
|
-import org.apache.commons.cli2.option.PropertyOption;
|
|
|
-import org.apache.commons.cli2.resource.ResourceConstants;
|
|
|
-import org.apache.commons.cli2.util.HelpFormatter;
|
|
|
-import org.apache.commons.cli2.validation.InvalidArgumentException;
|
|
|
-import org.apache.commons.cli2.validation.Validator;
|
|
|
+import org.apache.commons.cli.BasicParser;
|
|
|
+import org.apache.commons.cli.CommandLine;
|
|
|
+import org.apache.commons.cli.CommandLineParser;
|
|
|
+import org.apache.commons.cli.GnuParser;
|
|
|
+import org.apache.commons.cli.HelpFormatter;
|
|
|
+import org.apache.commons.cli.Option;
|
|
|
+import org.apache.commons.cli.OptionBuilder;
|
|
|
+import org.apache.commons.cli.OptionGroup;
|
|
|
+import org.apache.commons.cli.Options;
|
|
|
+import org.apache.commons.cli.Parser;
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
@@ -81,18 +76,8 @@ public class StreamJob implements Tool {
|
|
|
final static String REDUCE_NONE = "NONE";
|
|
|
|
|
|
/** -----------Streaming CLI Implementation **/
|
|
|
- private DefaultOptionBuilder builder =
|
|
|
- new DefaultOptionBuilder("-","-", false);
|
|
|
- private ArgumentBuilder argBuilder = new ArgumentBuilder();
|
|
|
- private Parser parser = new Parser();
|
|
|
- private Group allOptions;
|
|
|
- HelpFormatter helpFormatter = new HelpFormatter(" ", " ", " ", 900);
|
|
|
- // need these two at class level to extract values later from
|
|
|
- // commons-cli command line
|
|
|
- private MultiPropertyOption jobconf = new MultiPropertyOption(
|
|
|
- "-jobconf", "(n=v) Optional. Add or override a JobConf property.", 'D');
|
|
|
- private MultiPropertyOption cmdenv = new MultiPropertyOption(
|
|
|
- "-cmdenv", "(n=v) Pass env.var to streaming commands.", 'E');
|
|
|
+ private CommandLineParser parser = new BasicParser();
|
|
|
+ private Options allOptions;
|
|
|
/**@deprecated use StreamJob() with ToolRunner or set the
|
|
|
* Configuration using {@link #setConf(Configuration)} and
|
|
|
* run with {@link #run(String[])}.
|
|
@@ -250,71 +235,82 @@ public class StreamJob implements Tool {
|
|
|
void parseArgv(){
|
|
|
CommandLine cmdLine = null;
|
|
|
try{
|
|
|
- cmdLine = parser.parse(argv_);
|
|
|
+ cmdLine = parser.parse(allOptions, argv_);
|
|
|
}catch(Exception oe){
|
|
|
LOG.error(oe.getMessage());
|
|
|
exitUsage(argv_.length > 0 && "-info".equals(argv_[0]));
|
|
|
}
|
|
|
|
|
|
if (cmdLine != null){
|
|
|
- verbose_ = cmdLine.hasOption("-verbose");
|
|
|
- detailedUsage_ = cmdLine.hasOption("-info");
|
|
|
- debug_ = cmdLine.hasOption("-debug")? debug_ + 1 : debug_;
|
|
|
+ verbose_ = cmdLine.hasOption("verbose");
|
|
|
+ detailedUsage_ = cmdLine.hasOption("info");
|
|
|
+ debug_ = cmdLine.hasOption("debug")? debug_ + 1 : debug_;
|
|
|
|
|
|
- inputSpecs_.addAll(cmdLine.getValues("-input"));
|
|
|
- output_ = (String) cmdLine.getValue("-output");
|
|
|
+ String[] values = cmdLine.getOptionValues("input");
|
|
|
+ if (values != null && values.length > 0) {
|
|
|
+ for (String input : values) {
|
|
|
+ inputSpecs_.add(input);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ output_ = (String) cmdLine.getOptionValue("output");
|
|
|
+
|
|
|
|
|
|
- mapCmd_ = (String)cmdLine.getValue("-mapper");
|
|
|
- comCmd_ = (String)cmdLine.getValue("-combiner");
|
|
|
- redCmd_ = (String)cmdLine.getValue("-reducer");
|
|
|
+ mapCmd_ = (String)cmdLine.getOptionValue("mapper");
|
|
|
+ comCmd_ = (String)cmdLine.getOptionValue("combiner");
|
|
|
+ redCmd_ = (String)cmdLine.getOptionValue("reducer");
|
|
|
|
|
|
- if(!cmdLine.getValues("-file").isEmpty()) {
|
|
|
- packageFiles_.addAll(cmdLine.getValues("-file"));
|
|
|
+ values = cmdLine.getOptionValues("file");
|
|
|
+ if (values != null && values.length > 0) {
|
|
|
+ for (String file : values) {
|
|
|
+ packageFiles_.add(file);
|
|
|
+ }
|
|
|
+ validate(packageFiles_);
|
|
|
}
|
|
|
+
|
|
|
|
|
|
- String fsName = (String)cmdLine.getValue("-dfs");
|
|
|
+ String fsName = (String)cmdLine.getOptionValue("dfs");
|
|
|
if (null != fsName){
|
|
|
LOG.warn("-dfs option is deprecated, please use -fs instead.");
|
|
|
config_.set("fs.default.name", fsName);
|
|
|
}
|
|
|
|
|
|
- additionalConfSpec_ = (String)cmdLine.getValue("-additionalconfspec");
|
|
|
- inputFormatSpec_ = (String)cmdLine.getValue("-inputformat");
|
|
|
- outputFormatSpec_ = (String)cmdLine.getValue("-outputformat");
|
|
|
- numReduceTasksSpec_ = (String)cmdLine.getValue("-numReduceTasks");
|
|
|
- partitionerSpec_ = (String)cmdLine.getValue("-partitioner");
|
|
|
- inReaderSpec_ = (String)cmdLine.getValue("-inputreader");
|
|
|
- mapDebugSpec_ = (String)cmdLine.getValue("-mapdebug");
|
|
|
- reduceDebugSpec_ = (String)cmdLine.getValue("-reducedebug");
|
|
|
+ additionalConfSpec_ = (String)cmdLine.getOptionValue("additionalconfspec");
|
|
|
+ inputFormatSpec_ = (String)cmdLine.getOptionValue("inputformat");
|
|
|
+ outputFormatSpec_ = (String)cmdLine.getOptionValue("outputformat");
|
|
|
+ numReduceTasksSpec_ = (String)cmdLine.getOptionValue("numReduceTasks");
|
|
|
+ partitionerSpec_ = (String)cmdLine.getOptionValue("partitioner");
|
|
|
+ inReaderSpec_ = (String)cmdLine.getOptionValue("inputreader");
|
|
|
+ mapDebugSpec_ = (String)cmdLine.getOptionValue("mapdebug");
|
|
|
+ reduceDebugSpec_ = (String)cmdLine.getOptionValue("reducedebug");
|
|
|
|
|
|
- List<String> car = cmdLine.getValues("-cacheArchive");
|
|
|
- if (null != car && !car.isEmpty()){
|
|
|
+ String[] car = cmdLine.getOptionValues("cacheArchive");
|
|
|
+ if (null != car && car.length > 0){
|
|
|
LOG.warn("-cacheArchive option is deprecated, please use -archives instead.");
|
|
|
for(String s : car){
|
|
|
cacheArchives = (cacheArchives == null)?s :cacheArchives + "," + s;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- List<String> caf = cmdLine.getValues("-cacheFile");
|
|
|
- if (null != caf && !caf.isEmpty()){
|
|
|
+ String[] caf = cmdLine.getOptionValues("cacheFile");
|
|
|
+ if (null != caf && caf.length > 0){
|
|
|
LOG.warn("-cacheFile option is deprecated, please use -files instead.");
|
|
|
for(String s : caf){
|
|
|
cacheFiles = (cacheFiles == null)?s :cacheFiles + "," + s;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- List<String> jobConfArgs = (List<String>)cmdLine.getValue(jobconf);
|
|
|
- List<String> envArgs = (List<String>)cmdLine.getValue(cmdenv);
|
|
|
-
|
|
|
- if (null != jobConfArgs && !jobConfArgs.isEmpty()){
|
|
|
+ String[] jobconf = cmdLine.getOptionValues("jobconf");
|
|
|
+ if (null != jobconf && jobconf.length > 0){
|
|
|
LOG.warn("-jobconf option is deprecated, please use -D instead.");
|
|
|
- for(String s : jobConfArgs){
|
|
|
+ for(String s : jobconf){
|
|
|
String []parts = s.split("=", 2);
|
|
|
config_.set(parts[0], parts[1]);
|
|
|
}
|
|
|
}
|
|
|
- if (null != envArgs){
|
|
|
- for(String s : envArgs){
|
|
|
+
|
|
|
+ String[] cmd = cmdLine.getOptionValues("cmdenv");
|
|
|
+ if (null != cmd && cmd.length > 0){
|
|
|
+ for(String s : cmd) {
|
|
|
if (addTaskEnvironment_.length() > 0) {
|
|
|
addTaskEnvironment_ += " ";
|
|
|
}
|
|
@@ -334,83 +330,31 @@ public class StreamJob implements Tool {
|
|
|
|
|
|
private Option createOption(String name, String desc,
|
|
|
String argName, int max, boolean required){
|
|
|
- Argument argument = argBuilder.
|
|
|
- withName(argName).
|
|
|
- withMinimum(1).
|
|
|
- withMaximum(max).
|
|
|
- create();
|
|
|
- return builder.
|
|
|
- withLongName(name).
|
|
|
- withArgument(argument).
|
|
|
- withDescription(desc).
|
|
|
- withRequired(required).
|
|
|
- create();
|
|
|
+ return OptionBuilder
|
|
|
+ .withArgName(argName)
|
|
|
+ .hasArgs(max)
|
|
|
+ .withDescription(desc)
|
|
|
+ .isRequired(required)
|
|
|
+ .create(name);
|
|
|
}
|
|
|
|
|
|
- private Option createOption(String name, String desc,
|
|
|
- String argName, int max, boolean required, Validator validator){
|
|
|
-
|
|
|
- Argument argument = argBuilder.
|
|
|
- withName(argName).
|
|
|
- withMinimum(1).
|
|
|
- withMaximum(max).
|
|
|
- withValidator(validator).
|
|
|
- create();
|
|
|
-
|
|
|
- return builder.
|
|
|
- withLongName(name).
|
|
|
- withArgument(argument).
|
|
|
- withDescription(desc).
|
|
|
- withRequired(required).
|
|
|
- create();
|
|
|
- }
|
|
|
-
|
|
|
private Option createBoolOption(String name, String desc){
|
|
|
- return builder.withLongName(name).withDescription(desc).create();
|
|
|
+ return OptionBuilder.withDescription(desc).create(name);
|
|
|
+ }
|
|
|
+
|
|
|
+ private static void validate(final List<String> values)
|
|
|
+ throws IllegalArgumentException {
|
|
|
+ for (String file : values) {
|
|
|
+ File f = new File(file);
|
|
|
+ if (!f.canRead()) {
|
|
|
+ throw new IllegalArgumentException("File : " + f.getAbsolutePath()
|
|
|
+ + " is not readable.");
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private void setupOptions(){
|
|
|
|
|
|
- final Validator fileValidator = new Validator(){
|
|
|
- public void validate(final List values) throws InvalidArgumentException {
|
|
|
- // Note : This code doesnt belong here, it should be changed to
|
|
|
- // an can exec check in java 6
|
|
|
- for (String file : (List<String>)values) {
|
|
|
- File f = new File(file);
|
|
|
- if (!f.exists()) {
|
|
|
- throw new InvalidArgumentException("Argument : " +
|
|
|
- f.getAbsolutePath() + " doesn't exist.");
|
|
|
- }
|
|
|
- if (!f.isFile()) {
|
|
|
- throw new InvalidArgumentException("Argument : " +
|
|
|
- f.getAbsolutePath() + " is not a file.");
|
|
|
- }
|
|
|
- if (!f.canRead()) {
|
|
|
- throw new InvalidArgumentException("Argument : " +
|
|
|
- f.getAbsolutePath() + " is not accessible");
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- };
|
|
|
-
|
|
|
- // Note: not extending CLI2's FileValidator, that overwrites
|
|
|
- // the String arg into File and causes ClassCastException
|
|
|
- // in inheritance tree.
|
|
|
- final Validator execValidator = new Validator(){
|
|
|
- public void validate(final List values) throws InvalidArgumentException {
|
|
|
- // Note : This code doesnt belong here, it should be changed to
|
|
|
- // an can exec check in java 6
|
|
|
- for (String file : (List<String>)values) {
|
|
|
- try{
|
|
|
- Runtime.getRuntime().exec("chmod 0777 " + (new File(file)).getAbsolutePath());
|
|
|
- }catch(IOException ioe){
|
|
|
- // ignore
|
|
|
- }
|
|
|
- }
|
|
|
- fileValidator.validate(values);
|
|
|
- }
|
|
|
- };
|
|
|
-
|
|
|
Option input = createOption("input",
|
|
|
"DFS input file(s) for the Map step",
|
|
|
"path",
|
|
@@ -428,8 +372,8 @@ public class StreamJob implements Tool {
|
|
|
Option reducer = createOption("reducer",
|
|
|
"The streaming command to run", "cmd", 1, false);
|
|
|
Option file = createOption("file",
|
|
|
- "File/dir to be shipped in the Job jar file",
|
|
|
- "file", Integer.MAX_VALUE, false, execValidator);
|
|
|
+ "File to be shipped in the Job jar file",
|
|
|
+ "file", Integer.MAX_VALUE, false);
|
|
|
Option dfs = createOption("dfs",
|
|
|
"Optional. Override DFS configuration", "<h:p>|local", 1, false);
|
|
|
Option jt = createOption("jt",
|
|
@@ -450,6 +394,14 @@ public class StreamJob implements Tool {
|
|
|
"Optional.", "spec", 1, false);
|
|
|
Option reduceDebug = createOption("reducedebug",
|
|
|
"Optional", "spec",1, false);
|
|
|
+ Option jobconf =
|
|
|
+ createOption("jobconf",
|
|
|
+ "(n=v) Optional. Add or override a JobConf property.",
|
|
|
+ "spec", 1, false);
|
|
|
+
|
|
|
+ Option cmdenv =
|
|
|
+ createOption("cmdenv", "(n=v) Pass env.var to streaming commands.",
|
|
|
+ "spec", 1, false);
|
|
|
Option cacheFile = createOption("cacheFile",
|
|
|
"File name URI", "fileNameURI", Integer.MAX_VALUE, false);
|
|
|
Option cacheArchive = createOption("cacheArchive",
|
|
@@ -463,35 +415,32 @@ public class StreamJob implements Tool {
|
|
|
Option debug = createBoolOption("debug", "print debug output");
|
|
|
Option inputtagged = createBoolOption("inputtagged", "inputtagged");
|
|
|
|
|
|
- allOptions = new GroupBuilder().
|
|
|
- withOption(input).
|
|
|
- withOption(output).
|
|
|
- withOption(mapper).
|
|
|
- withOption(combiner).
|
|
|
- withOption(reducer).
|
|
|
- withOption(file).
|
|
|
- withOption(dfs).
|
|
|
- withOption(jt).
|
|
|
- withOption(additionalconfspec).
|
|
|
- withOption(inputformat).
|
|
|
- withOption(outputformat).
|
|
|
- withOption(partitioner).
|
|
|
- withOption(numReduceTasks).
|
|
|
- withOption(inputreader).
|
|
|
- withOption(mapDebug).
|
|
|
- withOption(reduceDebug).
|
|
|
- withOption(jobconf).
|
|
|
- withOption(cmdenv).
|
|
|
- withOption(cacheFile).
|
|
|
- withOption(cacheArchive).
|
|
|
- withOption(verbose).
|
|
|
- withOption(info).
|
|
|
- withOption(debug).
|
|
|
- withOption(inputtagged).
|
|
|
- withOption(help).
|
|
|
- create();
|
|
|
- parser.setGroup(allOptions);
|
|
|
-
|
|
|
+ allOptions = new Options().
|
|
|
+ addOption(input).
|
|
|
+ addOption(output).
|
|
|
+ addOption(mapper).
|
|
|
+ addOption(combiner).
|
|
|
+ addOption(reducer).
|
|
|
+ addOption(file).
|
|
|
+ addOption(dfs).
|
|
|
+ addOption(jt).
|
|
|
+ addOption(additionalconfspec).
|
|
|
+ addOption(inputformat).
|
|
|
+ addOption(outputformat).
|
|
|
+ addOption(partitioner).
|
|
|
+ addOption(numReduceTasks).
|
|
|
+ addOption(inputreader).
|
|
|
+ addOption(mapDebug).
|
|
|
+ addOption(reduceDebug).
|
|
|
+ addOption(jobconf).
|
|
|
+ addOption(cmdenv).
|
|
|
+ addOption(cacheFile).
|
|
|
+ addOption(cacheArchive).
|
|
|
+ addOption(verbose).
|
|
|
+ addOption(info).
|
|
|
+ addOption(debug).
|
|
|
+ addOption(inputtagged).
|
|
|
+ addOption(help);
|
|
|
}
|
|
|
|
|
|
public void exitUsage(boolean detailed) {
|
|
@@ -535,7 +484,7 @@ public class StreamJob implements Tool {
|
|
|
System.out.println("Map output format, reduce input/output format:");
|
|
|
System.out.println(" Format defined by what the mapper command outputs. Line-oriented");
|
|
|
System.out.println();
|
|
|
- System.out.println("The files or directories named in the -file argument[s] end up in the");
|
|
|
+ System.out.println("The files named in the -file argument[s] end up in the");
|
|
|
System.out.println(" working directory when the mapper and reducer are run.");
|
|
|
System.out.println(" The location of this working directory is unspecified.");
|
|
|
System.out.println();
|
|
@@ -973,58 +922,6 @@ public class StreamJob implements Tool {
|
|
|
}
|
|
|
return 0;
|
|
|
}
|
|
|
- /** Support -jobconf x=y x1=y1 type options **/
|
|
|
- static class MultiPropertyOption extends PropertyOption{
|
|
|
- private String optionString;
|
|
|
- MultiPropertyOption(){
|
|
|
- super();
|
|
|
- }
|
|
|
-
|
|
|
- MultiPropertyOption(final String optionString,
|
|
|
- final String description,
|
|
|
- final int id){
|
|
|
- super(optionString, description, id);
|
|
|
- this.optionString = optionString;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public boolean canProcess(final WriteableCommandLine commandLine,
|
|
|
- final String argument) {
|
|
|
- boolean ret = (argument != null) && argument.startsWith(optionString);
|
|
|
-
|
|
|
- return ret;
|
|
|
- }
|
|
|
- @Override
|
|
|
- public void process(final WriteableCommandLine commandLine,
|
|
|
- final ListIterator arguments) throws OptionException {
|
|
|
- final String arg = (String) arguments.next();
|
|
|
-
|
|
|
- if (!canProcess(commandLine, arg)) {
|
|
|
- throw new OptionException(this,
|
|
|
- ResourceConstants.UNEXPECTED_TOKEN, arg);
|
|
|
- }
|
|
|
-
|
|
|
- ArrayList properties = new ArrayList();
|
|
|
- String next = "";
|
|
|
- while(arguments.hasNext()){
|
|
|
- next = (String) arguments.next();
|
|
|
- if (!next.startsWith("-")){
|
|
|
- properties.add(next);
|
|
|
- }else{
|
|
|
- arguments.previous();
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- // add to any existing values (support specifying args multiple times)
|
|
|
- List<String> oldVal = (List<String>)commandLine.getValue(this);
|
|
|
- if (oldVal == null){
|
|
|
- commandLine.addValue(this, properties);
|
|
|
- }else{
|
|
|
- oldVal.addAll(properties);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
|
|
|
protected String[] argv_;
|
|
|
protected boolean verbose_;
|