@@ -26,13 +26,10 @@ import java.net.URISyntaxException;
 import java.net.URLEncoder;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
-import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
 
@@ -49,10 +46,11 @@ import org.apache.commons.cli2.validation.InvalidArgumentException;
 import org.apache.commons.cli2.validation.Validator;
 import org.apache.commons.logging.*;
 
+import org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorCombiner;
+import org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorReducer;
 import org.apache.hadoop.conf.Configuration;
 
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import org.apache.hadoop.mapred.FileAlreadyExistsException;
@@ -66,7 +64,7 @@ import org.apache.hadoop.mapred.SequenceFileAsTextInputFormat;
 import org.apache.hadoop.mapred.TextOutputFormat;
 import org.apache.hadoop.filecache.*;
 import org.apache.hadoop.util.*;
-import org.apache.log4j.helpers.OptionConverter;
+
 /** All the client-side work happens here.
  * (Jar packaging, MapRed job submission and monitoring)
  * @author Michel Tourn
@@ -213,7 +211,6 @@ public class StreamJob {
     verbose_ = cmdLine.hasOption("-verbose");
     detailedUsage_ = cmdLine.hasOption("-info");
     debug_ = cmdLine.hasOption("-debug")? debug_ + 1 : debug_;
-    inputTagged_ = cmdLine.hasOption("-inputtagged");
 
     inputSpecs_.addAll(cmdLine.getValues("-input"));
     output_ = (String) cmdLine.getValue("-output");
@@ -709,19 +706,14 @@ public class StreamJob {
     if (inReaderSpec_ == null && inputFormatSpec_ == null) {
       fmt = KeyValueTextInputFormat.class;
     } else if (inputFormatSpec_ != null) {
-      if ((inputFormatSpec_.compareToIgnoreCase("KeyValueTextInputFormat") == 0)
-          || (inputFormatSpec_
-              .compareToIgnoreCase("org.apache.hadoop.mapred.KeyValueTextInputFormat") == 0)) {
+      if (inputFormatSpec_.equals(KeyValueTextInputFormat.class.getName())
+          || inputFormatSpec_.equals(KeyValueTextInputFormat.class.getCanonicalName())) {
         fmt = KeyValueTextInputFormat.class;
-      } else if ((inputFormatSpec_
-          .compareToIgnoreCase("SequenceFileInputFormat") == 0)
-          || (inputFormatSpec_
-              .compareToIgnoreCase("org.apache.hadoop.mapred.SequenceFileInputFormat") == 0)) {
+      } else if (inputFormatSpec_.equals(SequenceFileInputFormat.class.getName())
+          || inputFormatSpec_.equals(SequenceFileInputFormat.class.getCanonicalName())) {
         fmt = SequenceFileInputFormat.class;
-      } else if ((inputFormatSpec_
-          .compareToIgnoreCase("SequenceFileToLineInputFormat") == 0)
-          || (inputFormatSpec_
-              .compareToIgnoreCase("org.apache.hadoop.mapred.SequenceFileToLineInputFormat") == 0)) {
+      } else if (inputFormatSpec_.equals(SequenceFileAsTextInputFormat.class.getName())
+          || inputFormatSpec_.equals(SequenceFileAsTextInputFormat.class.getCanonicalName())) {
         fmt = SequenceFileAsTextInputFormat.class;
       } else {
         c = StreamUtil.goodClassOrNull(inputFormatSpec_, defaultPackage);
@@ -774,12 +766,19 @@ public class StreamJob {
     reducerNone_ = false;
     if (redCmd_ != null) {
       reducerNone_ = redCmd_.equals(REDUCE_NONE);
-      c = StreamUtil.goodClassOrNull(redCmd_, defaultPackage);
-      if (c != null) {
-        jobConf_.setReducerClass(c);
+      if (redCmd_.compareToIgnoreCase("aggregate") == 0) {
+        jobConf_.setReducerClass(ValueAggregatorReducer.class);
+        jobConf_.setCombinerClass(ValueAggregatorCombiner.class);
       } else {
-        jobConf_.setReducerClass(PipeReducer.class);
-        jobConf_.set("stream.reduce.streamprocessor", URLEncoder.encode(redCmd_, "UTF-8"));
+
+        c = StreamUtil.goodClassOrNull(redCmd_, defaultPackage);
+        if (c != null) {
+          jobConf_.setReducerClass(c);
+        } else {
+          jobConf_.setReducerClass(PipeReducer.class);
+          jobConf_.set("stream.reduce.streamprocessor", URLEncoder.encode(
+              redCmd_, "UTF-8"));
+        }
       }
     }
 
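Usage note (not part of the patch): with this change, passing "-reducer aggregate" on the
streaming command line wires in the org.apache.hadoop.mapred.lib.aggregate framework instead
of a PipeReducer. A minimal sketch of the equivalent JobConf wiring, assuming a jobConf built
the way StreamJob.setJobConf() builds one (names here are illustrative only):

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorCombiner;
    import org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorReducer;

    // "-reducer aggregate" now maps to the aggregate library's reducer;
    // the combiner is set as well, since the value aggregators combine safely.
    JobConf jobConf = new JobConf(StreamJob.class);
    jobConf.setReducerClass(ValueAggregatorReducer.class);
    jobConf.setCombinerClass(ValueAggregatorCombiner.class);

The streaming mapper then selects the aggregation function through its output keys, e.g.
emitting "LongValueSum:<word>\t1" per word. One side effect of the case-insensitive match:
a user reducer class literally named "aggregate" can no longer be selected by that short name.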