@@ -59,7 +59,11 @@ import org.apache.hadoop.mapred.FileAlreadyExistsException;
 import org.apache.hadoop.mapred.InvalidJobConfException;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.KeyValueTextInputFormat;
 import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileAsTextInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
 import org.apache.hadoop.filecache.*;
 import org.apache.hadoop.util.*;
 import org.apache.log4j.helpers.OptionConverter;
@@ -235,6 +239,10 @@ public class StreamJob {
       userJobConfProps_.put("fs.default.name", jt);
     }
 
+    additionalConfSpec_ = (String)cmdLine.getValue("-additionalconfspec");
+    inputFormatSpec_ = (String)cmdLine.getValue("-inputformat");
+    outputFormatSpec_ = (String)cmdLine.getValue("-outputformat");
+    partitionerSpec_ = (String)cmdLine.getValue("-partitioner");
     inReaderSpec_ = (String)cmdLine.getValue("-inputreader");
 
     List<String> car = cmdLine.getValues("-cacheArchive");
@@ -381,6 +389,14 @@ public class StreamJob {
                          "Optional. Override DFS configuration", "<h:p>|local", 1, false);
     Option jt = createOption("jt",
                          "Optional. Override JobTracker configuration", "<h:p>|local",1, false);
+    Option additionalconfspec = createOption("additionalconfspec",
+                         "Optional.", "spec",1, false );
+    Option inputformat = createOption("inputformat",
+                         "Optional.", "spec",1, false );
+    Option outputformat = createOption("outputformat",
+                         "Optional.", "spec",1, false );
+    Option partitioner = createOption("partitioner",
+                         "Optional.", "spec",1, false );
     Option inputreader = createOption("inputreader",
                          "Optional.", "spec",1, false );
     Option cacheFile = createOption("cacheFile",
@@ -405,6 +421,10 @@ public class StreamJob {
       withOption(file).
       withOption(dfs).
       withOption(jt).
+      withOption(additionalconfspec).
+      withOption(inputformat).
+      withOption(outputformat).
+      withOption(partitioner).
       withOption(inputreader).
       withOption(jobconf).
       withOption(cmdenv).
@@ -438,6 +458,10 @@ public class StreamJob {
     //System.out.println(" -config <file> Optional. One or more paths to xml config files");
     System.out.println(" -dfs <h:p>|local Optional. Override DFS configuration");
     System.out.println(" -jt <h:p>|local Optional. Override JobTracker configuration");
+    System.out.println(" -additionalconfspec specfile Optional.");
+    System.out.println(" -inputformat KeyValueTextInputFormat(default)|SequenceFileInputFormat|XmlTextInputFormat Optional.");
+    System.out.println(" -outputformat specfile Optional.");
+    System.out.println(" -partitioner specfile Optional.");
     System.out.println(" -inputreader <spec> Optional.");
     System.out.println(" -jobconf <n>=<v> Optional. Add or override a JobConf property");
     System.out.println(" -cmdenv <n>=<v> Optional. Pass env.var to streaming commands");
@@ -645,6 +669,10 @@ public class StreamJob {
     } else {
       // use only defaults: hadoop-default.xml and hadoop-site.xml
     }
+    System.out.println("additionalConfSpec_:" + additionalConfSpec_);
+    if (additionalConfSpec_ != null) {
+      config_.addDefaultResource(new Path(additionalConfSpec_));
+    }
     Iterator it = configPath_.iterator();
     while (it.hasNext()) {
       String pathName = (String) it.next();
@@ -670,29 +698,53 @@ public class StreamJob {
     jobConf_.setBoolean("stream.inputtagged", inputTagged_);
     jobConf_.set("stream.numinputspecs", "" + inputSpecs_.size());
 
-    Class fmt;
-    if (testMerge_ && false == hasSimpleInputSpecs_) {
-      // this ignores -inputreader
-      fmt = MergerInputFormat.class;
-    } else {
-      // need to keep this case to support custom -inputreader
-      // and their parameters ,n=v,n=v
-      fmt = StreamInputFormat.class;
+    String defaultPackage = this.getClass().getPackage().getName();
+    Class c;
+    Class fmt = null;
+    if (inReaderSpec_ == null && inputFormatSpec_ == null) {
+      fmt = KeyValueTextInputFormat.class;
+    } else if (inputFormatSpec_ != null) {
+      if ((inputFormatSpec_.compareToIgnoreCase("KeyValueTextInputFormat") == 0)
+          || (inputFormatSpec_
+              .compareToIgnoreCase("org.apache.hadoop.mapred.KeyValueTextInputFormat") == 0)) {
+        fmt = KeyValueTextInputFormat.class;
+      } else if ((inputFormatSpec_
+          .compareToIgnoreCase("SequenceFileInputFormat") == 0)
+          || (inputFormatSpec_
+              .compareToIgnoreCase("org.apache.hadoop.mapred.SequenceFileInputFormat") == 0)) {
+        fmt = SequenceFileInputFormat.class;
+      } else if ((inputFormatSpec_
+          .compareToIgnoreCase("SequenceFileToLineInputFormat") == 0)
+          || (inputFormatSpec_
+              .compareToIgnoreCase("org.apache.hadoop.mapred.SequenceFileToLineInputFormat") == 0)) {
+        fmt = SequenceFileAsTextInputFormat.class;
+      } else {
+        c = StreamUtil.goodClassOrNull(inputFormatSpec_, defaultPackage);
+        if (c != null) {
+          fmt = c;
+        } else {
+          // class not found: fmt stays null and the default below applies
+        }
+      }
+    }
+    if (fmt == null) {
+      if (testMerge_ && false == hasSimpleInputSpecs_) {
+        // this ignores -inputreader
+        fmt = MergerInputFormat.class;
+      } else {
+        // need to keep this case to support custom -inputreader
+        // and their parameters ,n=v,n=v
+        fmt = StreamInputFormat.class;
+      }
     }
-    jobConf_.setInputFormat(fmt);
 
-    // for SequenceFile, input classes may be overriden in getRecordReader
-    jobConf_.setInputKeyClass(Text.class);
-    jobConf_.setInputValueClass(Text.class);
+    jobConf_.setInputFormat(fmt);
 
     jobConf_.setOutputKeyClass(Text.class);
     jobConf_.setOutputValueClass(Text.class);
 
     jobConf_.set("stream.addenvironment", addTaskEnvironment_);
 
-    String defaultPackage = this.getClass().getPackage().getName();
-
-    Class c;
     if (mapCmd_ != null) {
       c = StreamUtil.goodClassOrNull(mapCmd_, defaultPackage);
       if (c != null) {
@@ -748,13 +800,29 @@ public class StreamJob {
     // output setup is done late so we can customize for reducerNone_
     //jobConf_.setOutputDir(new File(output_));
     setOutputSpec();
-    if (testMerge_) {
-      fmt = MuxOutputFormat.class;
-    } else {
-      fmt = StreamOutputFormat.class;
+    fmt = null;
+    if (outputFormatSpec_ != null) {
+      c = StreamUtil.goodClassOrNull(outputFormatSpec_, defaultPackage);
+      if (c != null) {
+        fmt = c;
+      }
+    }
+    if (fmt == null) {
+      if (testMerge_) {
+        fmt = MuxOutputFormat.class;
+      } else {
+        fmt = TextOutputFormat.class;
+      }
     }
     jobConf_.setOutputFormat(fmt);
 
+    if (partitionerSpec_ != null) {
+      c = StreamUtil.goodClassOrNull(partitionerSpec_, defaultPackage);
+      if (c != null) {
+        jobConf_.setPartitionerClass(c);
+      }
+    }
+
     // last, allow user to override anything
     // (although typically used with properties we didn't touch)
 
@@ -1042,6 +1110,10 @@ public class StreamJob {
   protected ArrayList configPath_ = new ArrayList(); // <String>
   protected String hadoopAliasConf_;
   protected String inReaderSpec_;
+  protected String inputFormatSpec_;
+  protected String outputFormatSpec_;
+  protected String partitionerSpec_;
+  protected String additionalConfSpec_;
 
   protected boolean testMerge_;
 
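
Note: the hunks above route the new user-facing options into the job configuration: -inputformat is matched case-insensitively against the built-in formats and otherwise resolved via StreamUtil.goodClassOrNull, -outputformat and -partitioner are resolved the same way, and -additionalconfspec adds an extra config file as a default resource. Below is a minimal sketch of the equivalent direct JobConf wiring, using the old org.apache.hadoop.mapred API classes the patch itself imports, plus HashPartitioner as an illustrative stand-in for a user-supplied partitioner; the class and method names are assumptions for illustration and are not part of the patch.

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.SequenceFileAsTextInputFormat;
    import org.apache.hadoop.mapred.TextOutputFormat;
    import org.apache.hadoop.mapred.lib.HashPartitioner;

    public class StreamJobFormatExample {
      public static void configure(JobConf jobConf) {
        // -inputformat SequenceFileToLineInputFormat selects this class:
        // SequenceFile records are presented to the streaming mapper as text lines.
        jobConf.setInputFormat(SequenceFileAsTextInputFormat.class);

        // With this patch the non-merge default output format becomes
        // TextOutputFormat (previously StreamOutputFormat), unless
        // -outputformat overrides it.
        jobConf.setOutputFormat(TextOutputFormat.class);
        jobConf.setOutputKeyClass(Text.class);
        jobConf.setOutputValueClass(Text.class);

        // -partitioner <class> ends up here; HashPartitioner is only an example.
        jobConf.setPartitionerClass(HashPartitioner.class);
      }
    }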