Browse Source

HADOOP-1290. Move contrib/abacus into mapred/lib/aggregate. Contributed by Runping.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@533233 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 years ago
parent
commit
fe12deb92b
36 changed files with 2,144 additions and 4 deletions
  1. CHANGES.txt (+3, -0)
  2. build.xml (+0, -2)
  3. src/contrib/abacus/src/java/org/apache/hadoop/abacus/DoubleValueSum.java (+2, -0)
  4. src/contrib/abacus/src/java/org/apache/hadoop/abacus/JobBase.java (+2, -0)
  5. src/contrib/abacus/src/java/org/apache/hadoop/abacus/LongValueSum.java (+2, -0)
  6. src/contrib/abacus/src/java/org/apache/hadoop/abacus/UniqValueCount.java (+2, -0)
  7. src/contrib/abacus/src/java/org/apache/hadoop/abacus/UserDefinedValueAggregatorDescriptor.java (+2, -0)
  8. src/contrib/abacus/src/java/org/apache/hadoop/abacus/ValueAggregator.java (+2, -0)
  9. src/contrib/abacus/src/java/org/apache/hadoop/abacus/ValueAggregatorBaseDescriptor.java (+2, -0)
  10. src/contrib/abacus/src/java/org/apache/hadoop/abacus/ValueAggregatorCombiner.java (+2, -0)
  11. src/contrib/abacus/src/java/org/apache/hadoop/abacus/ValueAggregatorDescriptor.java (+2, -0)
  12. src/contrib/abacus/src/java/org/apache/hadoop/abacus/ValueAggregatorJob.java (+3, -0)
  13. src/contrib/abacus/src/java/org/apache/hadoop/abacus/ValueAggregatorJobBase.java (+2, -0)
  14. src/contrib/abacus/src/java/org/apache/hadoop/abacus/ValueAggregatorMapper.java (+2, -0)
  15. src/contrib/abacus/src/java/org/apache/hadoop/abacus/ValueAggregatorReducer.java (+2, -2)
  16. src/contrib/abacus/src/java/org/apache/hadoop/abacus/ValueHistogram.java (+2, -0)
  17. src/java/org/apache/hadoop/mapred/lib/aggregate/DoubleValueSum.java (+95, -0)
  18. src/java/org/apache/hadoop/mapred/lib/aggregate/LongValueMax.java (+98, -0)
  19. src/java/org/apache/hadoop/mapred/lib/aggregate/LongValueMin.java (+98, -0)
  20. src/java/org/apache/hadoop/mapred/lib/aggregate/LongValueSum.java (+95, -0)
  21. src/java/org/apache/hadoop/mapred/lib/aggregate/StringValueMax.java (+86, -0)
  22. src/java/org/apache/hadoop/mapred/lib/aggregate/StringValueMin.java (+86, -0)
  23. src/java/org/apache/hadoop/mapred/lib/aggregate/UniqValueCount.java (+91, -0)
  24. src/java/org/apache/hadoop/mapred/lib/aggregate/UserDefinedValueAggregatorDescriptor.java (+113, -0)
  25. src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregator.java (+53, -0)
  26. src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorBaseDescriptor.java (+157, -0)
  27. src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorCombiner.java (+87, -0)
  28. src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorDescriptor.java (+67, -0)
  29. src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorJob.java (+202, -0)
  30. src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorJobBase.java (+93, -0)
  31. src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorMapper.java (+61, -0)
  32. src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorReducer.java (+72, -0)
  33. src/java/org/apache/hadoop/mapred/lib/aggregate/ValueHistogram.java (+180, -0)
  34. src/java/org/apache/hadoop/mapred/lib/aggregate/package.html (+168, -0)
  35. src/test/org/apache/hadoop/mapred/TestAggregates.java (+122, -0)
  36. src/test/org/apache/hadoop/mapred/lib/aggregate/AggregatorTests.java (+88, -0)

+ 3 - 0
CHANGES.txt

@@ -291,6 +291,9 @@ Trunk (unreleased changes)
 86. HADOOP-1278.  Improve blacklisting of TaskTrackers by JobTracker,
     to reduce false positives.  (Arun C Murthy via cutting)
 
+87. HADOOP-1290.  Move contrib/abacus into mapred/lib/aggregate.
+    (Runping Qi via cutting)
+
 
 Release 0.12.3 - 2007-04-06
 

+ 0 - 2
build.xml

@@ -553,7 +553,6 @@
     	<packageset dir="${examples.dir}"/>
 
     	<packageset dir="src/contrib/streaming/src/java"/>
-    	<packageset dir="src/contrib/abacus/src/java"/>
     	<packageset dir="src/contrib/data_join/src/java"/>
 
         <link href="${javadoc.link.java}"/>
@@ -563,7 +562,6 @@
     	<group title="Examples" packages="org.apache.hadoop.examples*"/>
 
        <group title="contrib: Streaming" packages="org.apache.hadoop.streaming*"/>
-       <group title="contrib: Abacus" packages="org.apache.hadoop.abacus*"/>
        <group title="contrib: DataJoin" packages="org.apache.hadoop.contrib/join*"/>
 
     </javadoc>

+ 2 - 0
src/contrib/abacus/src/java/org/apache/hadoop/abacus/DoubleValueSum.java

@@ -22,6 +22,8 @@ import java.util.ArrayList;
 
 
 /**
+ * @deprecated
+ * 
  * This class implements a value aggregator that sums up a sequence of double
  * values.
  * 

+ 2 - 0
src/contrib/abacus/src/java/org/apache/hadoop/abacus/JobBase.java

@@ -30,6 +30,8 @@ import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.Reducer;
 
 /**
+ * @deprecated
+ * 
 * A common base implementing some statistics collecting mechanisms that are
  * commonly used in a typical map/reduce job.
  * 

+ 2 - 0
src/contrib/abacus/src/java/org/apache/hadoop/abacus/LongValueSum.java

@@ -21,6 +21,8 @@ package org.apache.hadoop.abacus;
 import java.util.ArrayList;
 
 /**
+ * @deprecated
+ * 
  * This class implements a value aggregator that sums up 
  * a sequence of long values.
  * 

+ 2 - 0
src/contrib/abacus/src/java/org/apache/hadoop/abacus/UniqValueCount.java

@@ -24,6 +24,8 @@ import java.util.Set;
 import java.util.TreeMap;
 
 /**
+ * @deprecated
+ * 
  * This class implements a value aggregator that dedupes a sequence of objects.
  * 
  */

+ 2 - 0
src/contrib/abacus/src/java/org/apache/hadoop/abacus/UserDefinedValueAggregatorDescriptor.java

@@ -25,6 +25,8 @@ import java.util.Map.Entry;
 import org.apache.hadoop.mapred.JobConf;
 
 /**
+ * @deprecated
+ * 
  * This class implements a wrapper for a user defined value aggregator descriptor.
 * It serves two functions: One is to create an object of ValueAggregatorDescriptor from the
  * name of a user defined class that may be dynamically loaded. The other is to

+ 2 - 0
src/contrib/abacus/src/java/org/apache/hadoop/abacus/ValueAggregator.java

@@ -21,6 +21,8 @@ package org.apache.hadoop.abacus;
 import java.util.ArrayList;
 
 /**
+ * @deprecated
+ * 
  * This interface defines the minimal protocol for value aggregators.
  * 
  */

+ 2 - 0
src/contrib/abacus/src/java/org/apache/hadoop/abacus/ValueAggregatorBaseDescriptor.java

@@ -24,6 +24,8 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 
 /** 
+ * @deprecated
+ * 
  * This class implements the common functionalities of 
  * the subclasses of ValueAggregatorDescriptor class.
  *

+ 2 - 0
src/contrib/abacus/src/java/org/apache/hadoop/abacus/ValueAggregatorCombiner.java

@@ -29,6 +29,8 @@ import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 
 /**
+ * @deprecated
+ * 
  * This class implements the generic combiner of Abacus.
  */
 public class ValueAggregatorCombiner extends ValueAggregatorJobBase {

+ 2 - 0
src/contrib/abacus/src/java/org/apache/hadoop/abacus/ValueAggregatorDescriptor.java

@@ -25,6 +25,8 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 
 /**
+ * @deprecated
+ * 
  * This interface defines the contract a value aggregator descriptor must
  * support. Such a descriptor can be configured with a JobConf object. Its main
  * function is to generate a list of aggregation-id/value pairs. An aggregation

+ 3 - 0
src/contrib/abacus/src/java/org/apache/hadoop/abacus/ValueAggregatorJob.java

@@ -34,6 +34,9 @@ import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.RunningJob;
 
 /**
+ * 
+ * @deprecated
+ * 
  * This is the main class for creating a map/reduce job using Abacus framework.
 * The Abacus is a specialization of the map/reduce framework, specializing in
  * performing various simple aggregations.

+ 2 - 0
src/contrib/abacus/src/java/org/apache/hadoop/abacus/ValueAggregatorJobBase.java

@@ -25,6 +25,8 @@ import org.apache.hadoop.mapred.JobConf;
 
 
 /**
+ * @deprecated
+ * 
 * This abstract class implements some common functionalities of
  * the generic mapper, reducer and combiner classes of Abacus.
  *

+ 2 - 0
src/contrib/abacus/src/java/org/apache/hadoop/abacus/ValueAggregatorMapper.java

@@ -28,6 +28,8 @@ import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 
 /**
+ * @deprecated
+ * 
  * This class implements the generic mapper of Abacus.
  */
 public class ValueAggregatorMapper extends ValueAggregatorJobBase {

+ 2 - 2
src/contrib/abacus/src/java/org/apache/hadoop/abacus/ValueAggregatorReducer.java

@@ -28,9 +28,9 @@ import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 
 /**
- * This class implements the generic reducer of Abacus.
+ * @deprecated
  * 
- * @author runping
+ * This class implements the generic reducer of Abacus.
  * 
  */
 public class ValueAggregatorReducer extends ValueAggregatorJobBase {

+ 2 - 0
src/contrib/abacus/src/java/org/apache/hadoop/abacus/ValueHistogram.java

@@ -27,6 +27,8 @@ import java.util.Arrays;
 import org.apache.hadoop.io.Text;
 
 /**
+ * @deprecated
+ * 
  * This class implements a value aggregator that computes the 
  * histogram of a sequence of strings.
  * 

+ 95 - 0
src/java/org/apache/hadoop/mapred/lib/aggregate/DoubleValueSum.java

@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib.aggregate;
+
+import java.util.ArrayList;
+
+
+/**
+ * This class implements a value aggregator that sums up a sequence of double
+ * values.
+ * 
+ */
+public class DoubleValueSum implements ValueAggregator {
+
+  double sum = 0;
+
+  /**
+   * The default constructor
+   * 
+   */
+  public DoubleValueSum() {
+    reset();
+  }
+
+  /**
+   * add a value to the aggregator
+   * 
+   * @param val
+   *          an object whose string representation represents a double value.
+   * 
+   */
+  public void addNextValue(Object val) {
+    this.sum += Double.parseDouble(val.toString());
+  }
+
+  /**
+   * add a value to the aggregator
+   * 
+   * @param val
+   *          a double value.
+   * 
+   */
+  public void addNextValue(double val) {
+    this.sum += val;
+  }
+
+  /**
+   * @return the string representation of the aggregated value
+   */
+  public String getReport() {
+    return "" + sum;
+  }
+
+  /**
+   * @return the aggregated value
+   */
+  public double getSum() {
+    return this.sum;
+  }
+
+  /**
+   * reset the aggregator
+   */
+  public void reset() {
+    sum = 0;
+  }
+
+  /**
+   * @return an array of one element. The element is a string
+   *         representation of the aggregated value. The return value is
+   *         expected to be used by a combiner.
+   */
+  public ArrayList<String> getCombinerOutput() {
+    ArrayList<String> retv = new ArrayList<String>(1);
+    retv.add("" + sum);
+    return retv;
+  }
+
+}
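
For readers skimming the diff, the class above is the simplest illustration of the ValueAggregator protocol (addNextValue, getReport, reset, getCombinerOutput). A minimal usage sketch, not part of this commit; the demo class name is hypothetical:

    import org.apache.hadoop.mapred.lib.aggregate.DoubleValueSum;

    public class DoubleValueSumDemo {
      public static void main(String[] args) {
        DoubleValueSum sum = new DoubleValueSum();
        sum.addNextValue(1.5);             // primitive double overload
        sum.addNextValue("2.5");           // string form is parsed with Double.parseDouble
        System.out.println(sum.getSum());            // 4.0
        System.out.println(sum.getReport());         // "4.0"
        System.out.println(sum.getCombinerOutput()); // one-element list: ["4.0"]
        sum.reset();                                 // sum is 0 again
      }
    }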

+ 98 - 0
src/java/org/apache/hadoop/mapred/lib/aggregate/LongValueMax.java

@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib.aggregate;
+
+import java.util.ArrayList;
+
+/**
+ * This class implements a value aggregator that maintains the maximum of 
+ * a sequence of long values.
+ * 
+ */
+public class LongValueMax implements ValueAggregator {
+
+  long maxVal = Long.MIN_VALUE;
+    
+  /**
+   *  the default constructor
+   *
+   */
+  public LongValueMax() {
+    reset();
+  }
+
+  /**
+   * add a value to the aggregator
+   * 
+   * @param val
+   *          an object whose string representation represents a long value.
+   * 
+   */
+  public void addNextValue(Object val) {
+    long newVal = Long.parseLong(val.toString());
+    if (this.maxVal < newVal) {
+      this.maxVal = newVal;
+    }
+  }
+    
+  /**
+   * add a value to the aggregator
+   * 
+   * @param newVal
+   *          a long value.
+   * 
+   */
+  public void addNextValue(long newVal) {
+    if (this.maxVal < newVal) {
+      this.maxVal = newVal;
+    }
+  }
+    
+  /**
+   * @return the aggregated value
+   */
+  public long getVal() {
+    return this.maxVal;
+  }
+    
+  /**
+   * @return the string representation of the aggregated value
+   */
+  public String getReport() {
+    return ""+maxVal;
+  }
+
+  /**
+   * reset the aggregator
+   */
+  public void reset() {
+    maxVal = Long.MIN_VALUE;
+  }
+
+  /**
+   * @return an array of one element. The element is a string
+   *         representation of the aggregated value. The return value is
+   *         expected to be used by a combiner.
+   */
+  public ArrayList<String> getCombinerOutput() {
+    ArrayList<String> retv = new ArrayList<String>(1);
+    retv.add(""+maxVal);
+    return retv;
+  }
+}

+ 98 - 0
src/java/org/apache/hadoop/mapred/lib/aggregate/LongValueMin.java

@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib.aggregate;
+
+import java.util.ArrayList;
+
+/**
+ * This class implements a value aggregator that maintains the minimum of 
+ * a sequence of long values.
+ * 
+ */
+public class LongValueMin implements ValueAggregator {
+
+  long minVal = Long.MAX_VALUE;
+    
+  /**
+   *  the default constructor
+   *
+   */
+  public LongValueMin() {
+    reset();
+  }
+
+  /**
+   * add a value to the aggregator
+   * 
+   * @param val
+   *          an object whose string representation represents a long value.
+   * 
+   */
+  public void addNextValue(Object val) {
+    long newVal = Long.parseLong(val.toString());
+    if (this.minVal > newVal) {
+      this.minVal = newVal;
+    }
+  }
+    
+  /**
+   * add a value to the aggregator
+   * 
+   * @param newVal
+   *          a long value.
+   * 
+   */
+  public void addNextValue(long newVal) {
+    if (this.minVal > newVal) {
+      this.minVal = newVal;
+    }
+  }
+    
+  /**
+   * @return the aggregated value
+   */
+  public long getVal() {
+    return this.minVal;
+  }
+    
+  /**
+   * @return the string representation of the aggregated value
+   */
+  public String getReport() {
+    return ""+minVal;
+  }
+
+  /**
+   * reset the aggregator
+   */
+  public void reset() {
+    minVal = Long.MAX_VALUE;
+  }
+
+  /**
+   * @return an array of one element. The element is a string
+   *         representation of the aggregated value. The return value is
+   *         expected to be used by a combiner.
+   */
+  public ArrayList<String> getCombinerOutput() {
+    ArrayList<String> retv = new ArrayList<String>(1);
+    retv.add(""+minVal);
+    return retv;
+  }
+}

+ 95 - 0
src/java/org/apache/hadoop/mapred/lib/aggregate/LongValueSum.java

@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib.aggregate;
+
+import java.util.ArrayList;
+
+/**
+ * This class implements a value aggregator that sums up 
+ * a sequence of long values.
+ * 
+ */
+public class LongValueSum implements ValueAggregator {
+
+  long sum = 0;
+    
+  /**
+   *  the default constructor
+   *
+   */
+  public LongValueSum() {
+    reset();
+  }
+
+  /**
+   * add a value to the aggregator
+   * 
+   * @param val
+   *          an object whose string representation represents a long value.
+   * 
+   */
+  public void addNextValue(Object val) {
+    this.sum += Long.parseLong(val.toString());
+  }
+    
+  /**
+   * add a value to the aggregator
+   * 
+   * @param val
+   *          a long value.
+   * 
+   */
+  public void addNextValue(long val) {
+    this.sum += val;
+  }
+    
+  /**
+   * @return the aggregated value
+   */
+  public long getSum() {
+    return this.sum;
+  }
+    
+  /**
+   * @return the string representation of the aggregated value
+   */
+  public String getReport() {
+    return ""+sum;
+  }
+
+  /**
+   * reset the aggregator
+   */
+  public void reset() {
+    sum = 0;
+  }
+
+  /**
+   * @return an array of one element. The element is a string
+   *         representation of the aggregated value. The return value is
+   *         expected to be used by a combiner.
+   */
+  public ArrayList<String> getCombinerOutput() {
+    ArrayList<String> retv = new ArrayList<String>(1);
+    retv.add(""+sum);
+    return retv;
+  }
+}
+
+

+ 86 - 0
src/java/org/apache/hadoop/mapred/lib/aggregate/StringValueMax.java

@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib.aggregate;
+
+import java.util.ArrayList;
+
+/**
+ * This class implements a value aggregator that maintains the largest of 
+ * a sequence of strings.
+ * 
+ */
+public class StringValueMax implements ValueAggregator {
+
+  String maxVal = null;
+    
+  /**
+   *  the default constructor
+   *
+   */
+  public StringValueMax() {
+    reset();
+  }
+
+  /**
+   * add a value to the aggregator
+   * 
+   * @param val
+   *          a string.
+   * 
+   */
+  public void addNextValue(Object val) {
+    String newVal = val.toString();
+    if (this.maxVal == null || this.maxVal.compareTo(newVal) < 0) {
+      this.maxVal = newVal;
+    }
+  }
+    
+    
+  /**
+   * @return the aggregated value
+   */
+  public String getVal() {
+    return this.maxVal;
+  }
+    
+  /**
+   * @return the string representation of the aggregated value
+   */
+  public String getReport() {
+    return maxVal;
+  }
+
+  /**
+   * reset the aggregator
+   */
+  public void reset() {
+    maxVal = null;
+  }
+
+  /**
+   * @return an array of one element. The element is a string
+   *         representation of the aggregated value. The return value is
+   *         expected to be used by a combiner.
+   */
+  public ArrayList<String> getCombinerOutput() {
+    ArrayList<String> retv = new ArrayList<String>(1);
+    retv.add(maxVal);
+    return retv;
+  }
+}

+ 86 - 0
src/java/org/apache/hadoop/mapred/lib/aggregate/StringValueMin.java

@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib.aggregate;
+
+import java.util.ArrayList;
+
+/**
+ * This class implements a value aggregator that maintains the smallest of 
+ * a sequence of strings.
+ * 
+ */
+public class StringValueMin implements ValueAggregator {
+
+  String minVal = null;
+    
+  /**
+   *  the default constructor
+   *
+   */
+  public StringValueMin() {
+    reset();
+  }
+
+  /**
+   * add a value to the aggregator
+   * 
+   * @param val
+   *          a string.
+   * 
+   */
+  public void addNextValue(Object val) {
+    String newVal = val.toString();
+    if (this.minVal == null || this.minVal.compareTo(newVal) > 0) {
+      this.minVal = newVal;
+    }
+  }
+    
+    
+  /**
+   * @return the aggregated value
+   */
+  public String getVal() {
+    return this.minVal;
+  }
+    
+  /**
+   * @return the string representation of the aggregated value
+   */
+  public String getReport() {
+    return minVal;
+  }
+
+  /**
+   * reset the aggregator
+   */
+  public void reset() {
+    minVal = null;
+  }
+
+  /**
+   * @return an array of one element. The element is a string
+   *         representation of the aggregated value. The return value is
+   *         expected to be used by a combiner.
+   */
+  public ArrayList<String> getCombinerOutput() {
+    ArrayList<String> retv = new ArrayList<String>(1);
+    retv.add(minVal);
+    return retv;
+  }
+}

+ 91 - 0
src/java/org/apache/hadoop/mapred/lib/aggregate/UniqValueCount.java

@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib.aggregate;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.TreeMap;
+
+/**
+ * This class implements a value aggregator that dedupes a sequence of objects.
+ * 
+ */
+public class UniqValueCount implements ValueAggregator {
+
+  TreeMap uniqItems = null;
+
+  /**
+   * the default constructor
+   * 
+   */
+  public UniqValueCount() {
+    uniqItems = new TreeMap();
+  }
+
+  /**
+   * add a value to the aggregator
+   * 
+   * @param val
+   *          an object.
+   * 
+   */
+  public void addNextValue(Object val) {
+    uniqItems.put(val, "1");
+
+  }
+
+  /**
+   * @return the number of unique objects aggregated
+   */
+  public String getReport() {
+    return "" + uniqItems.size();
+  }
+
+  /**
+   * 
+   * @return the set of the unique objects
+   */
+  public Set getUniqueItems() {
+    return uniqItems.keySet();
+  }
+
+  /**
+   * reset the aggregator
+   */
+  public void reset() {
+    uniqItems = new TreeMap();
+  }
+
+  /**
+   * @return an array of the unique objects. The return value is
+   *         expected to be used by a combiner.
+   */
+  public ArrayList getCombinerOutput() {
+    Object key = null;
+    Iterator iter = uniqItems.keySet().iterator();
+    ArrayList retv = new ArrayList();
+
+    while (iter.hasNext()) {
+      key = iter.next();
+      retv.add(key);
+    }
+    return retv;
+  }
+}

+ 113 - 0
src/java/org/apache/hadoop/mapred/lib/aggregate/UserDefinedValueAggregatorDescriptor.java

@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib.aggregate;
+
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * This class implements a wrapper for a user defined value aggregator
+ * descriptor. It serves two functions: one is to create an object of
+ * ValueAggregatorDescriptor from the name of a user defined class that may be
+ * dynamically loaded; the other is to delegate invocations of the
+ * generateKeyValPairs function to the created object.
+ * 
+ */
+public class UserDefinedValueAggregatorDescriptor implements
+    ValueAggregatorDescriptor {
+  private String className;
+
+  private ValueAggregatorDescriptor theAggregatorDescriptor = null;
+
+  private static final Class[] argArray = new Class[] {};
+
+  /**
+   * Create an instance of the given class
+   * @param className the name of the class
+   * @return a dynamically created instance of the given class 
+   */
+  public static Object createInstance(String className) {
+    Object retv = null;
+    try {
+      ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+      Class theFilterClass = Class.forName(className, true, classLoader);
+      Constructor meth = theFilterClass.getDeclaredConstructor(argArray);
+      meth.setAccessible(true);
+      retv = meth.newInstance();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    return retv;
+  }
+
+  private void createAggregator(JobConf job) {
+    if (theAggregatorDescriptor == null) {
+      theAggregatorDescriptor = (ValueAggregatorDescriptor) createInstance(this.className);
+      theAggregatorDescriptor.configure(job);
+    }
+  }
+
+  /**
+   * 
+   * @param className the class name of the user defined descriptor class
+   * @param job a configuration object used for descriptor configuration
+   */
+  public UserDefinedValueAggregatorDescriptor(String className, JobConf job) {
+    this.className = className;
+    this.createAggregator(job);
+  }
+
+  /**
+   *   Generate a list of aggregation-id/value pairs for the given key/value pairs
+   *   by delegating the invocation to the real object.
+   *   
+   * @param key
+   *          input key
+   * @param val
+   *          input value
+   * @return a list of aggregation id/value pairs. An aggregation id encodes an
+   *         aggregation type which is used to guide the way to aggregate the
+   *         value in the reduce/combiner phase of an Abacus based job.
+   */
+  public ArrayList<Entry> generateKeyValPairs(Object key, Object val) {
+    ArrayList<Entry> retv = new ArrayList<Entry>();
+    if (this.theAggregatorDescriptor != null) {
+      retv = this.theAggregatorDescriptor.generateKeyValPairs(key, val);
+    }
+    return retv;
+  }
+
+  /**
+   * @return the string representation of this object.
+   */
+  public String toString() {
+    return "UserDefinedValueAggregatorDescriptor with class name:" + "\t"
+      + this.className;
+  }
+
+  /**
+   *  Do nothing.
+   */
+  public void configure(JobConf job) {
+
+  }
+
+}
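
A hedged sketch of how this wrapper is used; the descriptor class name org.example.MyDescriptor is a placeholder, not a class in this commit:

    // Load a user defined descriptor by name and delegate to it. The
    // constructor calls createInstance() and configure() internally.
    JobConf job = new JobConf();
    ValueAggregatorDescriptor d =
        new UserDefinedValueAggregatorDescriptor("org.example.MyDescriptor", job);
    ArrayList<Entry> pairs = d.generateKeyValPairs(new Text("key"), new Text("value"));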

+ 53 - 0
src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregator.java

@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib.aggregate;
+
+import java.util.ArrayList;
+
+/**
+ * This interface defines the minimal protocol for value aggregators.
+ * 
+ */
+public interface ValueAggregator {
+
+  /**
+   * add a value to the aggregator
+   * 
+   * @param val the value to be added
+   */
+  public void addNextValue(Object val);
+
+  /**
+   * reset the aggregator
+   *
+   */
+  public void reset();
+
+  /**
+   * @return the string representation of the aggregator
+   */
+  public String getReport();
+
+  /**
+   * 
+   * @return an array of values as the outputs of the combiner.
+   */
+  public ArrayList getCombinerOutput();
+
+}

+ 157 - 0
src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorBaseDescriptor.java

@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib.aggregate;
+
+import java.util.ArrayList;
+import java.util.Map.Entry;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+
+/** 
+ * This class implements the common functionalities of 
+ * the subclasses of ValueAggregatorDescriptor class.
+ *
+ */
+public class ValueAggregatorBaseDescriptor implements ValueAggregatorDescriptor {
+
+  static public final String UNIQ_VALUE_COUNT = "UniqValueCount";
+
+  static public final String LONG_VALUE_SUM = "LongValueSum";
+
+  static public final String DOUBLE_VALUE_SUM = "DoubleValueSum";
+
+  static public final String VALUE_HISTOGRAM = "ValueHistogram";
+  
+  static public final String LONG_VALUE_MAX = "LongValueMax";
+  
+  static public final String LONG_VALUE_MIN = "LongValueMin";
+  
+  static public final String STRING_VALUE_MAX = "StringValueMax";
+  
+  static public final String STRING_VALUE_MIN = "StringValueMin";
+  
+
+  public String inputFile = null;
+
+  private static class MyEntry implements Entry {
+    Object key;
+
+    Object val;
+
+    public Object getKey() {
+      return key;
+    }
+
+    public Object getValue() {
+      return val;
+    }
+
+    public Object setValue(Object val) {
+      this.val = val;
+      return val;
+    }
+
+    public MyEntry(Object key, Object val) {
+      this.key = key;
+      this.val = val;
+    }
+  }
+
+  /**
+   * 
+   * @param type the aggregation type
+   * @param id the aggregation id
+   * @param val the val associated with the id to be aggregated
+   * @return an Entry whose key is the aggregation id prefixed with 
+   * the aggregation type.
+   */
+  public static Entry generateEntry(String type, String id, Object val) {
+    Text key = new Text(type + TYPE_SEPARATOR + id);
+    return new MyEntry(key, val);
+  }
+
+  /**
+   * 
+   * @param type the aggregation type
+   * @return a value aggregator of the given type.
+   */
+  static public ValueAggregator generateValueAggregator(String type) {
+    ValueAggregator retv = null;
+    if (type.compareToIgnoreCase(LONG_VALUE_SUM) == 0) {
+      retv = new LongValueSum();
+    } else if (type.compareToIgnoreCase(LONG_VALUE_MAX) == 0) {
+      retv = new LongValueMax();
+    } else if (type.compareToIgnoreCase(LONG_VALUE_MIN) == 0) {
+      retv = new LongValueMin();
+    } else if (type.compareToIgnoreCase(STRING_VALUE_MAX) == 0) {
+      retv = new StringValueMax();
+    } else if (type.compareToIgnoreCase(STRING_VALUE_MIN) == 0) {
+      retv = new StringValueMin();
+    } else if (type.compareToIgnoreCase(DOUBLE_VALUE_SUM) == 0) {
+      retv = new DoubleValueSum();
+    } else if (type.compareToIgnoreCase(UNIQ_VALUE_COUNT) == 0) {
+      retv = new UniqValueCount();
+    } else if (type.compareToIgnoreCase(VALUE_HISTOGRAM) == 0) {
+      retv = new ValueHistogram();
+    }
+    return retv;
+  }
+
+  /**
+   * Generate 1 or 2 aggregation-id/value pairs for the given key/value pair.
+   * The first id will be of type LONG_VALUE_SUM, with "record_count" as
+   * its aggregation id. If the input is a file split,
+   * the second id of the same type will be generated too, with the file name 
+   * as its aggregation id. This achieves the behavior of counting the total number
+   * of records in the input data, and the number of records in each input file.
+   * 
+   * @param key
+   *          input key
+   * @param val
+   *          input value
+   * @return a list of aggregation id/value pairs. An aggregation id encodes an
+   *         aggregation type which is used to guide the way to aggregate the
+   *         value in the reduce/combiner phase of an Abacus based job.
+   */
+  public ArrayList<Entry> generateKeyValPairs(Object key, Object val) {
+    ArrayList<Entry> retv = new ArrayList<Entry>();
+    String countType = LONG_VALUE_SUM;
+    String id = "record_count";
+    Entry e = generateEntry(countType, id, ONE);
+    if (e != null) {
+      retv.add(e);
+    }
+    if (this.inputFile != null) {
+      e = generateEntry(countType, this.inputFile, ONE);
+      if (e != null) {
+        retv.add(e);
+      }
+    }
+    return retv;
+  }
+
+  /**
+   * get the input file name.
+   * 
+   * @param job a job configuration object
+   */
+  public void configure(JobConf job) {
+    this.inputFile = job.get("map.input.file");
+  }
+}
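
To make the plugin contract concrete, here is a hedged sketch of a user defined descriptor built on the base class above; the class name WordCountDescriptor and the word-count use case are illustrative, not part of this commit:

    package org.example;

    import java.util.ArrayList;
    import java.util.Map.Entry;
    import org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorBaseDescriptor;

    public class WordCountDescriptor extends ValueAggregatorBaseDescriptor {
      // Emit one "LongValueSum:<word>" pair per word, so the framework's
      // generic combiner/reducer produces a count for each distinct word.
      public ArrayList<Entry> generateKeyValPairs(Object key, Object val) {
        ArrayList<Entry> retv = new ArrayList<Entry>();
        String[] words = val.toString().split("\\s+");
        for (int i = 0; i < words.length; i++) {
          retv.add(generateEntry(LONG_VALUE_SUM, words[i], ONE));
        }
        return retv;
      }
    }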

+ 87 - 0
src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorCombiner.java

@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib.aggregate;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * This class implements the generic combiner of Abacus.
+ */
+public class ValueAggregatorCombiner extends ValueAggregatorJobBase {
+
+  /**
+   * Combiner does not need any configuration.
+   */
+  public void configure(JobConf job) {
+
+  }
+
+  /** Combines values for a given key.  
+   * @param key the key is expected to be a Text object, whose prefix indicates
+   * the type of aggregation to aggregate the values. 
+   * @param values the values to combine
+   * @param output to collect combined values
+   */
+  public void reduce(WritableComparable key, Iterator values,
+                     OutputCollector output, Reporter reporter) throws IOException {
+    String keyStr = key.toString();
+    int pos = keyStr.indexOf(ValueAggregatorDescriptor.TYPE_SEPARATOR);
+    String type = keyStr.substring(0, pos);
+    ValueAggregator aggregator = ValueAggregatorBaseDescriptor
+      .generateValueAggregator(type);
+    while (values.hasNext()) {
+      aggregator.addNextValue(values.next());
+    }
+    Iterator outputs = aggregator.getCombinerOutput().iterator();
+
+    while (outputs.hasNext()) {
+      Object v = outputs.next();
+      if (v instanceof Text) {
+        output.collect(key, (Text)v);
+      } else {
+        output.collect(key, new Text(v.toString()));
+      }
+    }
+  }
+
+  /** 
+   * Do nothing. 
+   *
+   */
+  public void close() throws IOException {
+
+  }
+
+  /** 
+   * Do nothing. Should not be called. 
+   *
+   */
+  public void map(WritableComparable arg0, Writable arg1, OutputCollector arg2,
+                  Reporter arg3) throws IOException {
+    throw new IOException ("should not be called\n");
+  }
+}

+ 67 - 0
src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorDescriptor.java

@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib.aggregate;
+
+import java.util.ArrayList;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * This interface defines the contract a value aggregator descriptor must
+ * support. Such a descriptor can be configured with a JobConf object. Its main
+ * function is to generate a list of aggregation-id/value pairs. An aggregation
+ * id encodes an aggregation type which is used to guide the way to aggregate
+ * the value in the reduce/combiner phase of an Abacus based job. The mapper in
+ * an Abacus based map/reduce job may create one or more of
+ * ValueAggregatorDescriptor objects at configuration time. For each input
+ * key/value pair, the mapper will use those objects to create aggregation
+ * id/value pairs.
+ * 
+ */
+public interface ValueAggregatorDescriptor {
+
+  public static final String TYPE_SEPARATOR = ":";
+
+  public static final Text ONE = new Text("1");
+
+  /**
+   * Generate a list of aggregation-id/value pairs for the given key/value pair.
+   * This function is usually called by the mapper of an Abacus based job.
+   * 
+   * @param key
+   *          input key
+   * @param val
+   *          input value
+   * @return a list of aggregation id/value pairs. An aggregation id encodes an
+   *         aggregation type which is used to guide the way to aggregate the
+   *         value in the reduce/combiner phase of an Abacus based job.
+   */
+  public ArrayList<Entry> generateKeyValPairs(Object key, Object val);
+
+  /**
+   * Configure the object
+   * 
+   * @param job
+   *          a JobConf object that may contain the information that can be used
+   *          to configure the object.
+   */
+  public void configure(JobConf job);
+}
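
Concretely, an aggregation key is the aggregation type, then TYPE_SEPARATOR, then the aggregation id. A small sketch of the encoding (the record_count id mirrors ValueAggregatorBaseDescriptor above):

    // A combiner/reducer splits this key at TYPE_SEPARATOR: the prefix
    // "LongValueSum" selects the aggregator type, and "record_count"
    // becomes the final output key.
    Text key = new Text("LongValueSum" + ValueAggregatorDescriptor.TYPE_SEPARATOR
        + "record_count");                        // "LongValueSum:record_count"
    Text val = ValueAggregatorDescriptor.ONE;     // each record contributes "1"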

+ 202 - 0
src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorJob.java

@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib.aggregate;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.mapred.jobcontrol.Job;
+import org.apache.hadoop.mapred.jobcontrol.JobControl;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.RunningJob;
+
+/**
+ * This is the main class for creating a map/reduce job using the Abacus
+ * framework. Abacus is a specialization of the map/reduce framework for
+ * performing various simple aggregations.
+ * 
+ * Generally speaking, in order to implement an application using the
+ * Map/Reduce model, the developer implements Map and Reduce functions (and
+ * possibly a combine function). However, many applications related to
+ * counting and statistics computing have very similar characteristics.
+ * Abacus abstracts out the general patterns of these functions and
+ * implements those patterns. In particular, the package provides generic
+ * mapper/reducer/combiner classes, a set of built-in value aggregators, and
+ * a generic utility class that helps users create map/reduce jobs using the
+ * generic classes. The built-in aggregators include:
+ * 
+ *      sum over numeric values 
+ *      count the number of distinct values 
+ *      compute the histogram of values 
+ *      compute the minimum, maximum, median, average, and standard
+ *      deviation of numeric values
+ * 
+ * A developer using Abacus need only provide a plugin class conforming to
+ * the following interface:
+ * 
+ *      public interface ValueAggregatorDescriptor { 
+ *          public ArrayList<Entry> generateKeyValPairs(Object key, Object value); 
+ *          public void configure(JobConf job); 
+ *      } 
+ * 
+ * The package also provides a base class, ValueAggregatorBaseDescriptor,
+ * implementing the above interface. The user can extend the base class and
+ * implement generateKeyValPairs accordingly.
+ * 
+ * The primary work of generateKeyValPairs is to emit one or more key/value
+ * pairs based on the input key/value pair. The key in an output key/value
+ * pair encodes two pieces of information: an aggregation type and an
+ * aggregation id. The value will be aggregated onto the aggregation id
+ * according to the aggregation type.
+ * 
+ * This class offers a function to generate a map/reduce job using the Abacus
+ * framework. The function takes the following parameters: an input directory
+ * spec, an input format (text or sequence file), an output directory, and a
+ * file specifying the user plugin class.
+ * 
+ */
+public class ValueAggregatorJob {
+
+  public static JobControl createValueAggregatorJobs(String args[])
+    throws IOException {
+    JobControl theControl = new JobControl("ValueAggregatorJobs");
+    ArrayList dependingJobs = new ArrayList();
+    JobConf aJobConf = createValueAggregatorJob(args);
+    Job aJob = new Job(aJobConf, dependingJobs);
+    theControl.addJob(aJob);
+    return theControl;
+  }
+
+  /**
+   * Create an Abacus based map/reduce job.
+   * 
+   * @param args the arguments used for job creation
+   * @return a JobConf object ready for submission.
+   * 
+   * @throws IOException
+   */
+  public static JobConf createValueAggregatorJob(String args[])
+    throws IOException {
+
+    if (args.length < 2) {
+      System.out.println("usage: inputDirs outDir [numOfReducer [textinputformat|seq [specfile [jobName]]]]");
+      System.exit(1);
+    }
+    String inputDir = args[0];
+    String outputDir = args[1];
+    int numOfReducers = 1;
+    if (args.length > 2) {
+      numOfReducers = Integer.parseInt(args[2]);
+    }
+
+    Class theInputFormat = SequenceFileInputFormat.class;
+    if (args.length > 3 && args[3].compareToIgnoreCase("textinputformat") == 0) {
+      theInputFormat = TextInputFormat.class;
+    }
+
+    Path specFile = null;
+
+    if (args.length > 4) {
+      specFile = new Path(args[4]);
+    }
+
+    String jobName = "";
+    
+    if (args.length > 5) {
+      jobName = args[5];
+    }
+    
+    JobConf theJob = new JobConf(ValueAggregatorJob.class);
+    if (specFile != null) {
+      theJob.addDefaultResource(specFile);
+    }
+    FileSystem fs = FileSystem.get(theJob);
+    theJob.setJobName("ValueAggregatorJob: " + jobName);
+
+    String[] inputDirsSpecs = inputDir.split(",");
+    for (int i = 0; i < inputDirsSpecs.length; i++) {
+      theJob.addInputPath(new Path(inputDirsSpecs[i]));
+    }
+
+    theJob.setInputFormat(theInputFormat);
+    
+    theJob.setMapperClass(ValueAggregatorMapper.class);
+    theJob.setOutputPath(new Path(outputDir));
+    theJob.setOutputFormat(TextOutputFormat.class);
+    theJob.setMapOutputKeyClass(Text.class);
+    theJob.setMapOutputValueClass(Text.class);
+    theJob.setOutputKeyClass(Text.class);
+    theJob.setOutputValueClass(Text.class);
+    theJob.setReducerClass(ValueAggregatorReducer.class);
+    theJob.setCombinerClass(ValueAggregatorCombiner.class);
+    theJob.setNumMapTasks(1);
+    theJob.setNumReduceTasks(numOfReducers);
+    return theJob;
+  }
+
+  /**
+   * Submit/run a map/reduce job.
+   * 
+   * @param job
+   * @return true for success
+   * @throws IOException
+   */
+  public static boolean runJob(JobConf job) throws IOException {
+    JobClient jc = new JobClient(job);
+    boolean success = true;
+    RunningJob running = null;
+    try {
+      running = jc.submitJob(job);
+      String jobId = running.getJobID();
+      System.out.println("Job " + jobId + " is submitted");
+      while (!running.isComplete()) {
+        System.out.println("Job " + jobId + " is still running.");
+        try {
+          Thread.sleep(60000);
+        } catch (InterruptedException e) {
+        }
+        running = jc.getJob(jobId);
+      }
+      success = running.isSuccessful();
+    } finally {
+      if (!success && (running != null)) {
+        running.killJob();
+      }
+      jc.close();
+    }
+    return success;
+  }
+
+  /**
+   * create and run an Abacus based map/reduce job.
+   * 
+   * @param args the arguments used for job creation
+   * @throws IOException
+   */
+  public static void main(String args[]) throws IOException {
+    JobConf job = ValueAggregatorJob.createValueAggregatorJob(args);
+    runJob(job);
+  }
+}
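
A usage sketch for the driver above; the input/output paths and reducer count are illustrative only:

    // Equivalent to invoking main() with these arguments. Multiple input
    // directories are comma-separated, per the split(",") above.
    String[] args = { "indir1,indir2", "outdir", "2", "textinputformat" };
    JobConf job = ValueAggregatorJob.createValueAggregatorJob(args);
    ValueAggregatorJob.runJob(job);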

+ 93 - 0
src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorJobBase.java

@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib.aggregate;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.Reducer;
+
+/**
+ * This abstract class implements some common functionalities of
+ * the generic mapper, reducer and combiner classes of Abacus.
+ *
+ */
+public abstract class ValueAggregatorJobBase implements Mapper, Reducer {
+
+  protected ArrayList<ValueAggregatorDescriptor> aggregatorDescriptorList = null;
+
+  public void configure(JobConf job) {
+    this.initializeMySpec(job);
+    this.logSpec();
+  }
+
+  private static ValueAggregatorDescriptor getValueAggregatorDescriptor(
+      String spec, JobConf job) {
+    if (spec == null)
+      return null;
+    String[] segments = spec.split(",", -1);
+    String type = segments[0];
+    if (type.compareToIgnoreCase("UserDefined") == 0) {
+      String className = segments[1];
+      return new UserDefinedValueAggregatorDescriptor(className, job);
+    }
+    return null;
+  }
+
+  private static ArrayList<ValueAggregatorDescriptor> getAggregatorDescriptors(JobConf job) {
+    String advn = "aggregator.descriptor";
+    int num = job.getInt(advn + ".num", 0);
+    ArrayList<ValueAggregatorDescriptor> retv = new ArrayList<ValueAggregatorDescriptor>(num);
+    for (int i = 0; i < num; i++) {
+      String spec = job.get(advn + "." + i);
+      ValueAggregatorDescriptor ad = getValueAggregatorDescriptor(spec, job);
+      if (ad != null) {
+        retv.add(ad);
+      }
+    }
+    return retv;
+  }
+
+  private void initializeMySpec(JobConf job) {
+    this.aggregatorDescriptorList = getAggregatorDescriptors(job);
+    if (this.aggregatorDescriptorList.size() == 0) {
+      this.aggregatorDescriptorList
+          .add(new UserDefinedValueAggregatorDescriptor(
+              ValueAggregatorBaseDescriptor.class.getCanonicalName(), job));
+    }
+  }
+
+  protected void logSpec() {
+    StringBuffer sb = new StringBuffer();
+    sb.append("\n");
+    if (aggregatorDescriptorList == null) {
+      sb.append(" aggregatorDescriptorList: null");
+    } else {
+      sb.append(" aggregatorDescriptorList: ");
+      for (int i = 0; i < aggregatorDescriptorList.size(); i++) {
+        sb.append(" ").append(aggregatorDescriptorList.get(i).toString());
+      }
+    }
+  }
+
+  public void close() throws IOException {
+  }
+}
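
The descriptor list above is driven entirely by job configuration. A hedged sketch of the expected keys (the descriptor class name is a placeholder):

    // Register one user defined descriptor, matching the spec format read by
    // getAggregatorDescriptors(): "UserDefined,<descriptor class name>".
    // If no descriptors are configured, initializeMySpec() falls back to
    // ValueAggregatorBaseDescriptor.
    JobConf job = new JobConf(ValueAggregatorJob.class);
    job.setInt("aggregator.descriptor.num", 1);
    job.set("aggregator.descriptor.0", "UserDefined,org.example.WordCountDescriptor");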

+ 61 - 0
src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorMapper.java

@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib.aggregate;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * This class implements the generic mapper of Abacus.
+ */
+public class ValueAggregatorMapper extends ValueAggregatorJobBase {
+
+  /**
+   *  the map function. It iterates through the value aggregator descriptor 
+   *  list to generate aggregation id/value pairs and emit them.
+   */
+  public void map(WritableComparable key, Writable value,
+                  OutputCollector output, Reporter reporter) throws IOException {
+
+    Iterator iter = this.aggregatorDescriptorList.iterator();
+    while (iter.hasNext()) {
+      ValueAggregatorDescriptor ad = (ValueAggregatorDescriptor) iter.next();
+      Iterator<Entry> ens = ad.generateKeyValPairs(key, value).iterator();
+      while (ens.hasNext()) {
+        Entry en = ens.next();
+        output.collect((WritableComparable) en.getKey(), (Writable) en
+                       .getValue());
+      }
+    }
+  }
+
+  /**
+   * Do nothing. Should not be called.
+   */
+  public void reduce(WritableComparable arg0, Iterator arg1,
+                     OutputCollector arg2, Reporter arg3) throws IOException {
+    throw new IOException ("should not be called\n");
+  }
+}

+ 72 - 0
src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorReducer.java

@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib.aggregate;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * This class implements the generic reducer of Abacus.
+ * 
+ * 
+ */
+public class ValueAggregatorReducer extends ValueAggregatorJobBase {
+
+  /**
+   * @param key
+   *          the key is expected to be a Text object, whose prefix indicates
+   *          the type of aggregation to aggregate the values. In effect, data
+   *          driven computing is achieved. It is assumed that each aggregator's
+   *          getReport method emits appropriate output for the aggregator. This
+   *          may be further customized.
+   * @param values the values to be aggregated
+   */
+  public void reduce(WritableComparable key, Iterator values,
+                     OutputCollector output, Reporter reporter) throws IOException {
+    String keyStr = key.toString();
+    int pos = keyStr.indexOf(ValueAggregatorDescriptor.TYPE_SEPARATOR);
+    String type = keyStr.substring(0, pos);
+    keyStr = keyStr.substring(pos
+                              + ValueAggregatorDescriptor.TYPE_SEPARATOR.length());
+
+    ValueAggregator aggregator = ValueAggregatorBaseDescriptor
+      .generateValueAggregator(type);
+    while (values.hasNext()) {
+      aggregator.addNextValue(values.next());
+    }
+
+    String val = aggregator.getReport();
+    key = new Text(keyStr);
+    output.collect(key, new Text(val));
+  }
+
+  /**
+   * Should not be called; throws an IOException if invoked.
+   */
+  public void map(WritableComparable arg0, Writable arg1, OutputCollector arg2,
+                  Reporter arg3) throws IOException {
+    throw new IOException ("should not be called\n");
+  }
+}

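The key format that reduce() parses above is concrete enough to sketch standalone (assuming TYPE_SEPARATOR is the ":" constant defined in ValueAggregatorDescriptor, as the "LongValueSum:xxxx" keys in the package documentation below suggest):

    public class KeyParsingSketch {
      public static void main(String[] args) {
        String keyStr = "LongValueSum:hello";    // a mapper/combiner output key
        int pos = keyStr.indexOf(":");
        String type = keyStr.substring(0, pos);  // "LongValueSum": selects the aggregator
        String id = keyStr.substring(pos + 1);   // "hello": becomes the emitted output key
        System.out.println(type + " / " + id);
      }
    }
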
+ 180 - 0
src/java/org/apache/hadoop/mapred/lib/aggregate/ValueHistogram.java

@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib.aggregate;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.TreeMap;
+import java.util.Map.Entry;
+import java.util.Arrays;
+
+import org.apache.hadoop.io.Text;
+
+/**
+ * This class implements a value aggregator that computes the 
+ * histogram of a sequence of strings.
+ * 
+ */
+public class ValueHistogram implements ValueAggregator {
+
+  TreeMap items = null;
+
+  public ValueHistogram() {
+    items = new TreeMap();
+  }
+
+  /**
+   * add the given val to the aggregator.
+   * 
+   * @param val the value to be added. It is expected to be a string
+   * in the form of xxxx\tnum, meaning xxxx has num occurrences.
+   */
+  public void addNextValue(Object val) {
+    String valCountStr = val.toString();
+    int pos = valCountStr.lastIndexOf("\t");
+    String valStr = valCountStr;
+    String countStr = "1";
+    if (pos >= 0) {
+      valStr = valCountStr.substring(0, pos);
+      countStr = valCountStr.substring(pos + 1);
+    }
+    
+    Long count = (Long) this.items.get(valStr);
+    long inc = Long.parseLong(countStr);
+
+    if (count == null) {
+      count = new Long(inc);
+    } else {
+      count = new Long(count.longValue() + inc);
+    }
+    items.put(valStr, count);
+  }
+
+  /**
+   * @return the string representation of this aggregator.
+   * It includes the following basic statistics of the histogram:
+   *    the number of unique values
+   *    the minimum value
+   *    the median value
+   *    the maximum value
+   *    the average value
+   *    the standard deviation
+   */
+  public String getReport() {
+    long[] counts = new long[items.size()];
+
+    StringBuffer sb = new StringBuffer();
+    Iterator iter = items.values().iterator();
+    int i = 0;
+    while (iter.hasNext()) {
+      Long count = (Long) iter.next();
+      counts[i] = count.longValue();
+      i += 1;
+    }
+    Arrays.sort(counts);
+    sb.append(counts.length);
+    i = 0;
+    long acc = 0;
+    while (i < counts.length) {
+      long nextVal = counts[i];
+      int j = i + 1;
+      while (j < counts.length && counts[j] == nextVal) {
+        j++;
+      }
+      acc += nextVal * (j - i);
+      //sbVal.append("\t").append(nextVal).append("\t").append(j - i)
+      //.append("\n");
+      i = j;
+    }
+    double average = 0.0;
+    double sd = 0.0;
+    if (counts.length > 0) {
+      sb.append("\t").append(counts[0]);
+      sb.append("\t").append(counts[counts.length / 2]);
+      sb.append("\t").append(counts[counts.length - 1]);
+
+      average = acc * 1.0 / counts.length;
+      sb.append("\t").append(average);
+
+      i = 0;
+      while (i < counts.length) {
+        double nextDiff = counts[i] - average;
+        sd += nextDiff * nextDiff;
+        i += 1;
+      }
+      sd = Math.sqrt(sd / counts.length);
+
+      sb.append("\t").append(sd);
+
+    }
+    //sb.append("\n").append(sbVal.toString());
+    return sb.toString();
+  }
+
+  /** 
+   * 
+   * @return a string representation of the list of value/frequency pairs of 
+   * the histogram
+   */
+  public String getReportDetails() {
+    StringBuffer sb = new StringBuffer();
+    Iterator iter = items.entrySet().iterator();
+    while (iter.hasNext()) {
+      Entry en = (Entry) iter.next();
+      Object val = en.getKey();
+      Long count = (Long) en.getValue();
+      sb.append("\t").append(val.toString()).append("\t")
+        .append(count.longValue()).append("\n");
+    }
+    return sb.toString();
+  }
+
+  /**
+   *  @return a list of value/frequency pairs.
+   *  The return value is expected to be used by the reducer.
+   */
+  public ArrayList getCombinerOutput() {
+    ArrayList retv = new ArrayList();
+    Iterator iter = items.entrySet().iterator();
+
+    while (iter.hasNext()) {
+      Entry en = (Entry) iter.next();
+      Object val = en.getKey();
+      Long count = (Long) en.getValue();
+      retv.add(val.toString() + "\t" + count.longValue());
+    }
+    return retv;
+  }
+
+  /** 
+   * 
+   * @return a TreeMap representation of the histogram
+   */
+  public TreeMap getReportItems() {
+    return items;
+  }
+
+  /** 
+   * reset the aggregator
+   */
+  public void reset() {
+    items = new TreeMap();
+  }
+
+}

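A minimal standalone sketch (not part of this patch) of how ValueHistogram accumulates the "value\tcount" strings described in addNextValue above (with the substring fix applied), and what its reports then contain:

    import org.apache.hadoop.mapred.lib.aggregate.ValueHistogram;

    public class HistogramSketch {
      public static void main(String[] args) {
        ValueHistogram hist = new ValueHistogram();
        hist.addNextValue("apple\t3");  // "apple" occurred 3 times
        hist.addNextValue("apple\t2");  // now 5 occurrences
        hist.addNextValue("banana");    // no tab: counts as one occurrence
        // getReport(): number of unique values, then min/median/max/average/
        // standard deviation of the per-value counts: "2\t1\t5\t5\t3.0\t2.0"
        System.out.println(hist.getReport());
        // getReportDetails(): one "\tvalue\tcount" line per distinct value
        System.out.print(hist.getReportDetails());
      }
    }
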
+ 168 - 0
src/java/org/apache/hadoop/mapred/lib/aggregate/package.html

@@ -0,0 +1,168 @@
+<html>
+<body>
+
+Classes for performing various counting and aggregation operations.
+<p />
+<h2><a name="Aggregate"></a>Aggregate framework </h2>
+<p />
+Generally speaking, in order to implement an application using the Map/Reduce
+model, the developer needs to implement Map and Reduce functions (and possibly
+a Combine function). However, for a lot of applications related to counting and
+statistics computing, these functions have very similar
+characteristics. This package provides implementations of
+those patterns. In particular, the package provides a generic mapper class,
+a reducer class and a combiner class, and a set of built-in value aggregators.
+It also provides a generic utility class, ValueAggregatorJob, that offers a static function that 
+creates map/reduce jobs:
+<blockquote>
+<pre>
+public static JobConf createValueAggregatorJob(String args&#91;]) throws IOException;
+</pre>
+</blockquote>
+To call this function, the user needs to pass in arguments specifying the input directories, the output directory, 
+the number of reducers, the input data format (textinputformat or sequencefileinputformat), and a file specifying the user plugin class(es) to be loaded by the mapper. 
+A user plugin class is responsible for specifying which 
+aggregators to use and which values go to which aggregators. 
+A plugin class must implement the following interface:
+<blockquote>
+<pre>
+ public interface ValueAggregatorDescriptor { 
+     public ArrayList&#60;Entry&#62; generateKeyValPairs(Object key, Object value); 
+     public void configure(JobConf job); 
+} 
+</pre>
+</blockquote>
+Function generateKeyValPairs will generate aggregation key/value pairs for the
+input key/value pair. Each aggregation key encodes two pieces of information: the aggregation type and aggregation ID.
+The value is the value to be aggregated onto the aggregation ID according to the aggregation type. Here 
+is a simple example user plugin class for counting the words in the input texts:
+<blockquote>
+<pre>
+public class WordCountAggregatorDescriptor extends ValueAggregatorBaseDescriptor { 
+    public ArrayList&#60;Entry&#62; generateKeyValPairs(Object key, Object val) {
+        String words &#91;] &#61; val.toString().split(&#34; &#124;\t&#34;);
+        ArrayList&#60;Entry&#62; retv &#61; new ArrayList&#60;Entry&#62;();
+        for (int i &#61; 0; i &#60; words.length; i++) {
+            retv.add(generateEntry(LONG&#95;VALUE&#95;SUM, words&#91;i], ONE));
+        }
+        return retv;
+    }
+    public void configure(JobConf job) {}
+} 
+</pre>
+</blockquote>
+In the above code, LONG_VALUE_SUM is a string denoting the aggregation type LongValueSum, which sums over long values.
+ONE denotes the string "1". Function generateEntry(LONG_VALUE_SUM, words[i], ONE) will interpret the first argument as an aggregation type, the second as an aggregation ID, and the third argument as the value to be aggregated. The output will look like: "LongValueSum:xxxx", where xxxx is the string value of words[i]. The value will be "1". The mapper will call generateKeyValPairs(Object key, Object val) for each input key/value pair to generate the desired aggregation id/value pairs. 
+The downstream combiner/reducer will interpret these pairs as adding one to the aggregator xxxx.
+<p />
+Class ValueAggregatorBaseDescriptor is a base class that user plugin classes can extend. Here is the XML fragment specifying the user plugin class:
+<blockquote>
+<pre>
+&#60;property&#62;
+    &#60;name&#62;aggregator.descriptor.num&#60;/name&#62;
+    &#60;value&#62;1&#60;/value&#62;
+&#60;/property&#62;
+&#60;property&#62;
+   &#60;name&#62;aggregator.descriptor.0&#60;/name&#62;
+   &#60;value&#62;UserDefined,org.apache.hadoop.mapred.lib.aggregate.examples.WordCountAggregatorDescriptor&#60;/value&#62;
+&#60;/property&#62; 
+</pre>
+</blockquote>
+Class ValueAggregatorBaseDescriptor itself provides a default implementation for  generateKeyValPairs:
+<blockquote>
+<pre>
+public ArrayList&#60;Entry&#62; generateKeyValPairs(Object key, Object val) {
+   ArrayList&#60;Entry&#62; retv &#61; new ArrayList&#60;Entry&#62;();     
+   String countType &#61; LONG&#95;VALUE&#95;SUM;
+   String id &#61; &#34;record&#95;count&#34;;
+   retv.add(generateEntry(countType, id, ONE));
+   return retv;
+}
+</pre>
+</blockquote>
+Thus, if no user plugin class is specified, the default behavior of the map/reduce job is to count the number of records (lines) in the input files.
+<p />
+During runtime, the mapper will invoke the generateKeyValPairs function for each input key/value pair, and emit the generated 
+key/value pairs:
+<blockquote>
+<pre>
+public void map(WritableComparable key, Writable value,
+            OutputCollector output, Reporter reporter) throws IOException {
+   Iterator iter &#61; this.aggregatorDescriptorList.iterator();
+   while (iter.hasNext()) {
+       ValueAggregatorDescriptor ad &#61; (ValueAggregatorDescriptor) iter.next();
+       Iterator&#60;Entry&#62; ens &#61; ad.generateKeyValPairs(key, value).iterator();
+       while (ens.hasNext()) {
+           Entry en &#61; ens.next();
+           output.collect((WritableComparable)en.getKey(), (Writable)en.getValue());
+       }
+   }
+}
+</pre>
+</blockquote>
+The reducer will create an aggregator object for each key/value list pair, and perform the appropriate aggregation.
+At the end, it will emit the aggregator's results:
+<blockquote>
+<pre>
+public void reduce(WritableComparable key, Iterator values,
+            OutputCollector output, Reporter reporter) throws IOException {
+   String keyStr &#61; key.toString();
+   int pos &#61; keyStr.indexOf(ValueAggregatorDescriptor.TYPE&#95;SEPARATOR);
+   String type &#61; keyStr.substring(0,pos);
+   keyStr &#61; keyStr.substring(pos+ValueAggregatorDescriptor.TYPE&#95;SEPARATOR.length());       
+   ValueAggregator aggregator &#61; 
+       ValueAggregatorBaseDescriptor.generateValueAggregator(type);
+   while (values.hasNext()) {
+       aggregator.addNextValue(values.next());
+   }         
+   String val &#61; aggregator.getReport();
+   key &#61; new Text(keyStr);
+   output.collect(key, new Text(val)); 
+}
+</pre>
+</blockquote>
+In order to be able to use a combiner, all the aggregators must be associative and commutative.
+The following types are supported: <ul>
+<li> LongValueSum: sum over long values 
+</li> <li> DoubleValueSum: sum over float/double values 
+</li> <li> UniqValueCount: count the number of distinct values 
+</li> <li> ValueHistogram: compute the histogram of values, along with the minimum, maximum, median, average and standard deviation
+</li></ul> 
+<p />
+<h2><a name="Create_and_run"></a> Create and run an application </h2>
+<p />
+To create an application, the user needs to do the following things:
+<p />
+1. Implement a user plugin:
+<blockquote>
+<pre>
+import org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorBaseDescriptor;
+import org.apache.hadoop.mapred.JobConf;
+
+public class WordCountAggregatorDescriptor extends ValueAggregatorBaseDescriptor {
+   public ArrayList&#60;Entry&#62; generateKeyValPairs(Object key, Object val) {
+      // return the aggregation id/value pairs, as in the word-count
+      // example above
+      return null;
+   }
+   public void configure(JobConf job) {
+   } 
+}
+</pre>
+</blockquote>
+
+2. Create an xml file specifying the user plugin (for example, the property fragment shown above wrapped in a &#60;configuration&#62; element).
+<p />
+3. Compile your java class and create a jar file, say wc.jar.
+
+<p />
+Finally, run the job:
+<blockquote>
+<pre>
+        hadoop jar wc.jar org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorJob indirs outdir numofreducers textinputformat|sequencefileinputformat spec_file
+</pre>
+</blockquote>
+<p />
+
+
+</body>
+</html>

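The built-in aggregator types listed in the package documentation can also be exercised directly outside a job; a minimal sketch (not part of this patch) using LongValueSum:

    import org.apache.hadoop.mapred.lib.aggregate.LongValueSum;

    public class LongValueSumSketch {
      public static void main(String[] args) {
        LongValueSum sum = new LongValueSum();
        sum.addNextValue("1");               // values arrive as strings parsed to long
        sum.addNextValue("41");
        System.out.println(sum.getReport()); // "42"
        sum.reset();                         // aggregators are reusable after reset()
      }
    }
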
+ 122 - 0
src/test/org/apache/hadoop/mapred/TestAggregates.java

@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapred.lib.*;
+import org.apache.hadoop.mapred.lib.aggregate.*;
+import junit.framework.TestCase;
+import java.io.*;
+import java.util.*;
+import java.text.NumberFormat;
+
+public class TestAggregates extends TestCase {
+
+  private static NumberFormat idFormat = NumberFormat.getInstance();
+  static {
+    idFormat.setMinimumIntegerDigits(4);
+    idFormat.setGroupingUsed(false);
+  }
+
+
+  public void testAggregates() throws Exception {
+    launch();
+  }
+
+  public static void launch() throws Exception {
+    JobConf conf = new JobConf(TestAggregates.class);
+    FileSystem fs = FileSystem.get(conf);
+    int numOfInputLines = 20;
+
+    Path OUTPUT_DIR = new Path("build/test/output_for_aggregates_test");
+    Path INPUT_DIR = new Path("build/test/input_for_aggregates_test");
+    String inputFile = "input.txt";
+    fs.delete(INPUT_DIR);
+    fs.mkdirs(INPUT_DIR);
+    fs.delete(OUTPUT_DIR);
+
+    StringBuffer inputData = new StringBuffer();
+    StringBuffer expectedOutput = new StringBuffer();
+    expectedOutput.append("max\t19\n");
+    expectedOutput.append("min\t1\n"); 
+
+    FSDataOutputStream fileOut = fs.create(new Path(INPUT_DIR, inputFile));
+    for (int i = 1; i < numOfInputLines; i++) {
+      expectedOutput.append("count_").append(idFormat.format(i));
+      expectedOutput.append("\t").append(i).append("\n");
+
+      inputData.append(idFormat.format(i));
+      for (int j = 1; j < i; j++) {
+        inputData.append(" ").append(idFormat.format(i));
+      }
+      inputData.append("\n");
+    }
+    expectedOutput.append("value_as_string_max\t9\n");
+    expectedOutput.append("value_as_string_min\t1\n");
+    expectedOutput.append("uniq_count\t19\n");
+
+
+    fileOut.write(inputData.toString().getBytes("utf-8"));
+    fileOut.close();
+
+    System.out.println("inputData:");
+    System.out.println(inputData.toString());
+    JobConf job = new JobConf(conf, TestAggregates.class);
+    job.setInputPath(INPUT_DIR);
+    job.setInputFormat(TextInputFormat.class);
+
+    job.setOutputPath(OUTPUT_DIR);
+    job.setOutputFormat(TextOutputFormat.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(Text.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Text.class);
+    job.setNumReduceTasks(1);
+
+    job.setMapperClass(ValueAggregatorMapper.class);
+    job.setReducerClass(ValueAggregatorReducer.class);
+    job.setCombinerClass(ValueAggregatorCombiner.class);
+
+    job.setInt("aggregator.descriptor.num", 1);
+    job.set("aggregator.descriptor.0", "UserDefined,org.apache.hadoop.mapred.lib.aggregate.AggregatorTests");
+
+    JobClient.runJob(job);
+
+    //
+    // Finally, we compare the reconstructed answer key with the
+    // original one.  Remember, we need to ignore zero-count items
+    // in the original key.
+    //
+    boolean success = true;
+    Path outPath = new Path(OUTPUT_DIR, "part-00000");
+    String outdata = TestMiniMRWithDFS.readOutput(outPath, job);
+    outdata = outdata.substring(0, expectedOutput.toString().length());
+
+    assertEquals(expectedOutput.toString(), outdata);
+    //fs.delete(OUTPUT_DIR);
+    fs.delete(INPUT_DIR);
+  }
+
+  /**
+   * Launches all the tasks in order.
+   */
+  public static void main(String[] argv) throws Exception {
+    launch();
+  }
+}

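To see why the expected output above contains lines like "count_0003\t3", here is a sketch (not part of the test) of the input line the loop generates for i = 3:

    import java.text.NumberFormat;

    public class InputLineSketch {
      public static void main(String[] args) {
        NumberFormat idFormat = NumberFormat.getInstance();
        idFormat.setMinimumIntegerDigits(4);
        idFormat.setGroupingUsed(false);
        // Line i repeats the token idFormat.format(i) exactly i times, so the
        // descriptor emits "count_0003" three times and LongValueSum reports 3.
        StringBuilder line = new StringBuilder(idFormat.format(3));
        for (int j = 1; j < 3; j++) {
          line.append(" ").append(idFormat.format(3));
        }
        System.out.println(line);            // 0003 0003 0003
      }
    }
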
+ 88 - 0
src/test/org/apache/hadoop/mapred/lib/aggregate/AggregatorTests.java

@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib.aggregate;
+
+import org.apache.hadoop.io.Text;
+import java.util.ArrayList;
+import java.util.Map.Entry;
+
+public class AggregatorTests extends ValueAggregatorBaseDescriptor {
+  
+  public ArrayList<Entry> generateKeyValPairs(Object key, Object val) {
+    ArrayList<Entry> retv = new ArrayList<Entry>();
+    String [] words = val.toString().split(" ");
+    
+    String countType;
+    String id;
+    Entry e;
+    
+    for (String word: words) {
+      long numVal = Long.parseLong(word);
+      countType = LONG_VALUE_SUM;
+      id = "count_" + word;
+      e = generateEntry(countType, id, ONE);
+      if (e != null) {
+        retv.add(e);
+      }
+      countType = LONG_VALUE_MAX;
+      id = "max";
+      e = generateEntry(countType, id, new Text(word));
+      if (e != null) {
+        retv.add(e);
+      }
+      
+      countType = LONG_VALUE_MIN;
+      id = "min";
+      e = generateEntry(countType, id, new Text(word));
+      if (e != null) {
+        retv.add(e);
+      }
+      
+      countType = STRING_VALUE_MAX;
+      id = "value_as_string_max";
+      e = generateEntry(countType, id, new Text(""+numVal));
+      if (e != null) {
+        retv.add(e);
+      }
+      
+      countType = STRING_VALUE_MIN;
+      id = "value_as_string_min";
+      e = generateEntry(countType, id, new Text(""+numVal));
+      if (e != null) {
+        retv.add(e);
+      }
+      
+      countType = UNIQ_VALUE_COUNT;
+      id = "uniq_count";
+      e = generateEntry(countType, id, new Text(word));
+      if (e != null) {
+        retv.add(e);
+      }
+      
+      countType = VALUE_HISTOGRAM;
+      id = "histogram";
+      e = generateEntry(countType, id, new Text(word));
+      if (e != null) {
+        retv.add(e);
+      }
+    }
+    return retv;
+  }
+
+}