
Add a new contrib package, Abacus, that simplifies counting and aggregation, built on MapReduce. Contributed by Runping.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@497927 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 years ago
parent
commit
eb49d678a2
22 changed files with 1876 additions and 0 deletions
  1. 3 0
      CHANGES.txt
  2. 2 0
      build.xml
  3. 23 0
      src/contrib/abacus/build.xml
  4. 58 0
      src/contrib/abacus/src/examples/org/apache/hadoop/abacus/examples/WordCountAggregatorDescriptor.java
  5. 59 0
      src/contrib/abacus/src/examples/org/apache/hadoop/abacus/examples/WordHistogramCountDescriptor.java
  6. 15 0
      src/contrib/abacus/src/examples/wordcountaggregator.spec
  7. 17 0
      src/contrib/abacus/src/examples/wordhistogramaggregator.spec
  8. 95 0
      src/contrib/abacus/src/java/org/apache/hadoop/abacus/DoubleValueSum.java
  9. 160 0
      src/contrib/abacus/src/java/org/apache/hadoop/abacus/JobBase.java
  10. 95 0
      src/contrib/abacus/src/java/org/apache/hadoop/abacus/LongValueSum.java
  11. 91 0
      src/contrib/abacus/src/java/org/apache/hadoop/abacus/UniqValueCount.java
  12. 113 0
      src/contrib/abacus/src/java/org/apache/hadoop/abacus/UserDefinedValueAggregatorDescriptor.java
  13. 53 0
      src/contrib/abacus/src/java/org/apache/hadoop/abacus/ValueAggregator.java
  14. 140 0
      src/contrib/abacus/src/java/org/apache/hadoop/abacus/ValueAggregatorBaseDescriptor.java
  15. 90 0
      src/contrib/abacus/src/java/org/apache/hadoop/abacus/ValueAggregatorCombiner.java
  16. 67 0
      src/contrib/abacus/src/java/org/apache/hadoop/abacus/ValueAggregatorDescriptor.java
  17. 201 0
      src/contrib/abacus/src/java/org/apache/hadoop/abacus/ValueAggregatorJob.java
  18. 100 0
      src/contrib/abacus/src/java/org/apache/hadoop/abacus/ValueAggregatorJobBase.java
  19. 67 0
      src/contrib/abacus/src/java/org/apache/hadoop/abacus/ValueAggregatorMapper.java
  20. 79 0
      src/contrib/abacus/src/java/org/apache/hadoop/abacus/ValueAggregatorReducer.java
  21. 177 0
      src/contrib/abacus/src/java/org/apache/hadoop/abacus/ValueHistogram.java
  22. 171 0
      src/contrib/abacus/src/java/org/apache/hadoop/abacus/package.html

+ 3 - 0
CHANGES.txt

@@ -51,6 +51,9 @@ Trunk (unreleased changes)
     merging multiple map outputs as they arrive at reduce nodes before
     they're written to disk.  (Devaraj Das via cutting)
 
+16. HADOOP-908.  Add a new contrib package, Abacus, that simplifies
+    counting and aggregation, built on MapReduce.  (Runping Qi via cutting)
+
 
 Release 0.10.1 - 2007-01-10
 

+ 2 - 0
build.xml

@@ -488,6 +488,7 @@
 
     	<packageset dir="src/contrib/streaming/src/java"/>
     	<packageset dir="src/contrib/smallJobsBenchmark/src/java"/>
+    	<packageset dir="src/contrib/abacus/src/java"/>
 
         <link href="${javadoc.link.java}"/>
         <classpath refid="classpath"/>
@@ -497,6 +498,7 @@
 
        <group title="contrib: Streaming" packages="org.apache.hadoop.streaming*"/>
        <group title="contrib: Small Jobs Benchmark" packages="org.apache.hadoop.benchmarks.mapred*"/>
+       <group title="contrib: Abacus" packages="org.apache.hadoop.abacus*"/>
 
     </javadoc>
   </target>	

+ 23 - 0
src/contrib/abacus/build.xml

@@ -0,0 +1,23 @@
+<?xml version="1.0"?>
+
+<!-- 
+Before you can run these subtargets directly, you need 
+to call at top-level: ant deploy-contrib compile-core-test
+-->
+<project name="abacus" default="jar">
+
+  <import file="../build-contrib.xml"/>
+
+  <!-- Override jar target to specify main class -->
+  <target name="jar" depends="compile">
+    <jar
+      jarfile="${build.dir}/hadoop-${name}.jar"
+      basedir="${build.classes}"      
+    >
+  	<manifest>
+	    <attribute name="Main-Class" value="org.apache.hadoop.abacus.ValueAggregatorJob"/>
+	</manifest>
+    </jar>
+  </target>
+
+</project>

+ 58 - 0
src/contrib/abacus/src/examples/org/apache/hadoop/abacus/examples/WordCountAggregatorDescriptor.java

@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.abacus.examples;
+
+import java.util.ArrayList;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.abacus.ValueAggregatorBaseDescriptor;
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * This class implements a user defined aggregator descriptor that is used
+ * for counting the words in the input data
+ *
+ */
+public class WordCountAggregatorDescriptor extends
+    ValueAggregatorBaseDescriptor {
+
+  /**
+   * Parse the given value, generate an aggregation-id/value pair per word.
+   * The ID is of type LONG_VALUE_SUM, with WORD as the real id. The value is 1.
+   * 
+   * @return a list of the generated pairs.
+   */
+  public ArrayList<Entry> generateKeyValPairs(Object key, Object val) {
+    String words[] = val.toString().split(" |\t");
+    ArrayList<Entry> retv = new ArrayList<Entry>();
+    for (int i = 0; i < words.length; i++) {
+      Entry en = generateEntry(LONG_VALUE_SUM, words[i], ONE);
+      retv.add(en);
+    }
+    return retv;
+  }
+
+  /** 
+   * Do nothing.
+   */
+  public void configure(JobConf job) {
+
+  }
+
+}

+ 59 - 0
src/contrib/abacus/src/examples/org/apache/hadoop/abacus/examples/WordHistogramCountDescriptor.java

@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.abacus.examples;
+
+import java.util.ArrayList;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.abacus.ValueAggregatorBaseDescriptor;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * This class implements a user defined aggregator descriptor that is used for
+ * computing the histogram of the words in the input texts.
+ * 
+ */
+public class WordHistogramCountDescriptor extends ValueAggregatorBaseDescriptor {
+
+  /**
+   * Parse the given value, generate an aggregation-id/value pair per word.
+   * The ID is of type VALUE_HISTOGRAM, with WORD_HISTOGRAM as the real id. 
+   * The value is WORD\t1.
+   * 
+   * @return a list of the generated pairs.
+   */
+    public ArrayList<Entry> generateKeyValPairs(Object key, Object val) {
+        String words[] = val.toString().split(" |\t");
+        ArrayList<Entry> retv = new ArrayList<Entry>();
+        for (int i = 0; i < words.length; i++) {
+            Text valCount = new Text(words[i] + "\t" + "1");
+            Entry en = generateEntry(VALUE_HISTOGRAM, "WORD_HISTOGRAM",
+                    valCount);
+            retv.add(en);
+        }
+        return retv;
+    }
+
+    public void configure(JobConf job) {
+
+    }
+
+
+}

+ 15 - 0
src/contrib/abacus/src/examples/wordcountaggregator.spec

@@ -0,0 +1,15 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<configuration>
+
+<property>
+  <name>aggregator.descriptor.num</name>
+  <value>1</value>
+</property>
+
+<property>
+   <name>aggregator.descriptor.0</name>
+   <value>UserDefined,org.apache.hadoop.abacus.examples.WordCountAggregatorDescriptor</value>
+</property>
+</configuration>

+ 17 - 0
src/contrib/abacus/src/examples/wordhistogramaggregator.spec

@@ -0,0 +1,17 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- word histogram job specific specification -->
+
+<configuration>
+
+<property>
+  <name>aggregator.descriptor.num</name>
+  <value>1</value>
+</property>
+
+<property>
+   <name>aggregator.descriptor.0</name>
+   <value>UserDefined,org.apache.hadoop.abacus.examples.WordHistogramCountDescriptor</value>
+</property>
+</configuration>

+ 95 - 0
src/contrib/abacus/src/java/org/apache/hadoop/abacus/DoubleValueSum.java

@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.abacus;
+
+import java.util.ArrayList;
+
+
+/**
+ * This class implements a value aggregator that sums up a sequence of double
+ * values.
+ * 
+ */
+public class DoubleValueSum implements ValueAggregator {
+
+  double sum = 0;
+
+  /**
+   * The default constructor
+   * 
+   */
+  public DoubleValueSum() {
+    reset();
+  }
+
+  /**
+   * add a value to the aggregator
+   * 
+   * @param val
+   *          an object whose string representation represents a double value.
+   * 
+   */
+  public void addNextValue(Object val) {
+    this.sum += Double.parseDouble(val.toString());
+  }
+
+  /**
+   * add a value to the aggregator
+   * 
+   * @param val
+   *          a double value.
+   * 
+   */
+  public void addNextValue(double val) {
+    this.sum += val;
+  }
+
+  /**
+   * @return the string representation of the aggregated value
+   */
+  public String getReport() {
+    return "" + sum;
+  }
+
+  /**
+   * @return the aggregated value
+   */
+  public double getSum() {
+    return this.sum;
+  }
+
+  /**
+   * reset the aggregator
+   */
+  public void reset() {
+    sum = 0;
+  }
+
+  /**
+   * @return an array of one element. The element is a string
+   *         representation of the aggregated value. The return value is
+   *         expected to be used by a combiner.
+   */
+  public ArrayList getCombinerOutput() {
+    ArrayList retv = new ArrayList(1);
+    retv.add(getReport());
+    return retv;
+  }
+
+}

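As an aside (not part of this commit; the demo class name is hypothetical), a minimal sketch of how DoubleValueSum behaves on its own:

    import org.apache.hadoop.abacus.DoubleValueSum;

    public class DoubleValueSumDemo {
      public static void main(String[] args) {
        DoubleValueSum sum = new DoubleValueSum();
        sum.addNextValue("1.5");  // string form, parsed with Double.parseDouble
        sum.addNextValue(2.5);    // the double overload adds directly
        System.out.println(sum.getReport());          // prints 4.0
        System.out.println(sum.getCombinerOutput());  // prints [4.0]
      }
    }

LongValueSum below follows the same protocol for long values.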
+ 160 - 0
src/contrib/abacus/src/java/org/apache/hadoop/abacus/JobBase.java

@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.abacus;
+
+import java.util.TreeMap;
+import java.util.Map.Entry;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.Reducer;
+
+/**
+ * A common base class implementing some statistics-collecting mechanisms that
+ * are commonly used in a typical map/reduce job.
+ * 
+ */
+public abstract class JobBase implements Mapper, Reducer {
+
+  public static final Log LOG = LogFactory.getLog("abacus.job");
+
+  private TreeMap<Object, Long> longCounters = null;
+
+  private TreeMap<Object, Double> doubleCounters = null;
+
+  /**
+   * Set the given counter to the given value
+   * @param name the counter name
+   * @param value the value for the counter
+   */
+  protected void setLongValue(Object name, long value) {
+    this.longCounters.put(name, new Long(value));
+  }
+
+  /**
+   * Set the given counter to the given value
+   * @param name the counter name
+   * @param value the value for the counter
+   */
+  protected void setDoubleValue(Object name, double value) {
+    this.doubleCounters.put(name, new Double(value));
+  }
+
+  /**
+   * 
+   * @param name the counter name
+   * @return the value of the given counter.
+   */
+  protected Long getLongValue(Object name) {
+    return this.longCounters.get(name);
+  }
+
+  /**
+   * 
+   * @param name the counter name
+   * @return the value of the given counter.
+   */
+  protected Double getDoubleValue(Object name) {
+    return this.doubleCounters.get(name);
+  }
+
+  /**
+   * Increment the given counter by the given incremental value.
+   * If the counter does not exist, one is created with value 0.
+   * 
+   * @param name the counter name
+   * @param inc the incremental value
+   * @return the updated value.
+   */
+  protected Long addLongValue(Object name, long inc) {
+    Long val = this.longCounters.get(name);
+    Long retv = null;
+    if (val == null) {
+      retv = new Long(inc);
+    } else {
+      retv = new Long(val.longValue() + inc);
+    }
+    this.longCounters.put(name, retv);
+    return retv;
+  }
+
+  /**
+   * Increment the given counter by the given incremental value.
+   * If the counter does not exist, one is created with value 0.
+   * 
+   * @param name the counter name
+   * @param inc the incremental value
+   * @return the updated value.
+   */
+  protected Double addDoubleValue(Object name, double inc) {
+    Double val = this.doubleCounters.get(name);
+    Double retv = null;
+    if (val == null) {
+      retv = new Double(inc);
+    } else {
+      retv = new Double(val.doubleValue() + inc);
+    }
+    this.doubleCounters.put(name, retv);
+    return retv;
+  }
+
+  /**
+   * log the counters
+   * 
+   */
+  protected void report() {
+    LOG.info(getReport());
+  }
+
+  /**
+   * get the values of the counters as a string
+   * 
+   */
+  protected String getReport() {
+    StringBuffer sb = new StringBuffer();
+
+    Iterator iter = this.longCounters.entrySet().iterator();
+    while (iter.hasNext()) {
+      Entry e = (Entry) iter.next();
+      sb.append(e.getKey().toString()).append("\t").append(e.getValue())
+          .append("\n");
+    }
+    iter = this.doubleCounters.entrySet().iterator();
+    while (iter.hasNext()) {
+      Entry e = (Entry) iter.next();
+      sb.append(e.getKey().toString()).append("\t").append(e.getValue())
+          .append("\n");
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Initializes a new instance from a {@link JobConf}.
+   * 
+   * @param job
+   *          the configuration
+   */
+  public void configure(JobConf job) {
+    this.longCounters = new TreeMap();
+    this.doubleCounters = new TreeMap();
+  }
+}

+ 95 - 0
src/contrib/abacus/src/java/org/apache/hadoop/abacus/LongValueSum.java

@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.abacus;
+
+import java.util.ArrayList;
+
+/**
+ * This class implements a value aggregator that sums up 
+ * a sequence of long values.
+ * 
+ */
+public class LongValueSum implements ValueAggregator {
+
+    long sum = 0;
+    
+    /**
+     *  the default constructor
+     *
+     */
+    public LongValueSum() {
+        reset();
+    }
+
+    /**
+     * add a value to the aggregator
+     * 
+     * @param val
+     *          an object whose string representation represents a long value.
+     * 
+     */
+    public void addNextValue(Object val) {
+        this.sum += Long.parseLong(val.toString());
+    }
+    
+    /**
+     * add a value to the aggregator
+     * 
+     * @param val
+     *          a long value.
+     * 
+     */
+    public void addNextValue(long val) {
+        this.sum += val;
+    }
+    
+    /**
+     * @return the aggregated value
+     */
+    public long getSum() {
+        return this.sum;
+    }
+    
+    /**
+     * @return the string representation of the aggregated value
+     */
+    public String getReport() {
+        return ""+sum;
+    }
+
+    /**
+     * reset the aggregator
+     */
+    public void reset() {
+        sum = 0;
+    }
+
+    /**
+     * @return an array of one element. The element is a string
+     *         representation of the aggregated value. The return value is
+     *         expected to be used by a combiner.
+     */
+    public ArrayList getCombinerOutput() {
+        ArrayList retv = new ArrayList(1);
+        retv.add(getReport());
+        return retv;
+    }
+}
+
+

+ 91 - 0
src/contrib/abacus/src/java/org/apache/hadoop/abacus/UniqValueCount.java

@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.abacus;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.TreeMap;
+
+/**
+ * This class implements a value aggregator that dedupes a sequence of objects.
+ * 
+ */
+public class UniqValueCount implements ValueAggregator {
+
+  TreeMap uniqItems = null;
+
+  /**
+   * the default constructor
+   * 
+   */
+  public UniqValueCount() {
+    uniqItems = new TreeMap();
+  }
+
+  /**
+   * add a value to the aggregator
+   * 
+   * @param val
+   *          an object.
+   * 
+   */
+  public void addNextValue(Object val) {
+    uniqItems.put(val, "1");
+
+  }
+
+  /**
+   * @return the number of unique objects aggregated
+   */
+  public String getReport() {
+    return "" + uniqItems.size();
+  }
+
+  /**
+   * 
+   * @return the set of the unique objects
+   */
+  public Set getUniqueItems() {
+    return uniqItems.keySet();
+  }
+
+  /**
+   * reset the aggregator
+   */
+  public void reset() {
+    uniqItems = new TreeMap();
+  }
+
+  /**
+   * @return an array of the unique objects. The return value is
+   *         expected to be used by a combiner.
+   */
+  public ArrayList getCombinerOutput() {
+    Object key = null;
+    Iterator iter = uniqItems.keySet().iterator();
+    ArrayList retv = new ArrayList();
+
+    while (iter.hasNext()) {
+      key = iter.next();
+      retv.add(key);
+    }
+    return retv;
+  }
+}

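For illustration (not in this commit; the demo class is hypothetical), a sketch of UniqValueCount. Note that, unlike the sum aggregators, its combiner output is the unique items themselves rather than the count, so partial results from different map tasks can be merged without double counting:

    import org.apache.hadoop.abacus.UniqValueCount;

    public class UniqValueCountDemo {
      public static void main(String[] args) {
        UniqValueCount uniq = new UniqValueCount();
        uniq.addNextValue("a");
        uniq.addNextValue("b");
        uniq.addNextValue("a");  // duplicate; the backing TreeMap keeps one copy
        System.out.println(uniq.getReport());         // prints 2
        // the combiner emits the items, not the count, so a downstream
        // reducer can dedupe again across map tasks:
        System.out.println(uniq.getCombinerOutput()); // prints [a, b]
      }
    }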
+ 113 - 0
src/contrib/abacus/src/java/org/apache/hadoop/abacus/UserDefinedValueAggregatorDescriptor.java

@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.abacus;
+
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * This class implements a wrapper for a user defined value aggregator descriptor.
+ * It servs two functions: One is to create an object of ValueAggregatorDescriptor from the
+ * name of a user defined class that may be dynamically loaded. The other is to
+ * deligate inviokations of generateKeyValPairs function to the created object.
+ * 
+ */
+public class UserDefinedValueAggregatorDescriptor implements
+    ValueAggregatorDescriptor {
+  private String className;
+
+  private ValueAggregatorDescriptor theAggregatorDescriptor = null;
+
+  private static final Class[] argArray = new Class[] {};
+
+  /**
+   * Create an instance of the given class
+   * @param className the name of the class
+   * @return a dynamically created instance of the given class 
+   */
+  public static Object createInstance(String className) {
+    Object retv = null;
+    try {
+      ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+      Class theFilterClass = Class.forName(className, true, classLoader);
+      Constructor meth = theFilterClass.getDeclaredConstructor(argArray);
+      meth.setAccessible(true);
+      retv = meth.newInstance();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    return retv;
+  }
+
+  private void createAggregator(JobConf job) {
+    if (theAggregatorDescriptor == null) {
+      theAggregatorDescriptor = (ValueAggregatorDescriptor) createInstance(this.className);
+      theAggregatorDescriptor.configure(job);
+    }
+  }
+
+  /**
+   * 
+   * @param className the class name of the user defined descriptor class
+   * @param job a configuration object used for descriptor configuration
+   */
+  public UserDefinedValueAggregatorDescriptor(String className, JobConf job) {
+    this.className = className;
+    this.createAggregator(job);
+  }
+
+  /**
+   *   Generate a list of aggregation-id/value pairs for the given key/value pairs
+   *   by delegating the invocation to the real object.
+   *   
+   * @param key
+   *          input key
+   * @param val
+   *          input value
+   * @return a list of aggregation id/value pairs. An aggregation id encodes an
+   *         aggregation type which is used to guide the way to aggregate the
+   *         value in the reduce/combiner phase of an Abacus based job.
+   */
+  public ArrayList<Entry> generateKeyValPairs(Object key, Object val) {
+    ArrayList<Entry> retv = new ArrayList<Entry>();
+    if (this.theAggregatorDescriptor != null) {
+      retv = this.theAggregatorDescriptor.generateKeyValPairs(key, val);
+    }
+    return retv;
+  }
+
+  /**
+   * @return the string representation of this object.
+   */
+  public String toString() {
+    return "UserDefinedValueAggregatorDescriptor with class name:" + "\t"
+        + this.className;
+  }
+
+  /**
+   *  Do nothing.
+   */
+  public void configure(JobConf job) {
+
+  }
+
+}

+ 53 - 0
src/contrib/abacus/src/java/org/apache/hadoop/abacus/ValueAggregator.java

@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.abacus;
+
+import java.util.ArrayList;
+
+/**
+ * This interface defines the minimal protocol for value aggregators.
+ * 
+ */
+public interface ValueAggregator {
+
+  /**
+   * add a value to the aggregator
+   * 
+   * @param val the value to be added
+   */
+  public void addNextValue(Object val);
+
+  /**
+   * reset the aggregator
+   *
+   */
+  public void reset();
+
+  /**
+   * @return the string representation of the agregator
+   */
+  public String getReport();
+
+  /**
+   * 
+   * @return an array of values as the outputs of the combiner.
+   */
+  public ArrayList getCombinerOutput();
+
+}

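To make the protocol concrete, here is a hypothetical aggregator (not part of this commit) that tracks the maximum of a sequence of long values. Note that ValueAggregatorBaseDescriptor.generateValueAggregator (below) only dispatches on the four built-in type names, so a custom aggregator like this would also need to be wired in there:

    import java.util.ArrayList;

    import org.apache.hadoop.abacus.ValueAggregator;

    /** Hypothetical example: tracks the maximum of a sequence of long values. */
    public class LongValueMax implements ValueAggregator {

      long max = Long.MIN_VALUE;

      public void addNextValue(Object val) {
        long next = Long.parseLong(val.toString());
        if (next > max) {
          max = next;
        }
      }

      public void reset() {
        max = Long.MIN_VALUE;
      }

      public String getReport() {
        return "" + max;
      }

      public ArrayList getCombinerOutput() {
        // a maximum of partial maxima is still the overall maximum,
        // so emitting the current report is safe for a combiner
        ArrayList retv = new ArrayList(1);
        retv.add(getReport());
        return retv;
      }
    }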
+ 140 - 0
src/contrib/abacus/src/java/org/apache/hadoop/abacus/ValueAggregatorBaseDescriptor.java

@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.abacus;
+
+import java.util.ArrayList;
+import java.util.Map.Entry;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+
+/** 
+ * This class implements the common functionality of 
+ * implementations of the ValueAggregatorDescriptor interface.
+ *
+ */
+public class ValueAggregatorBaseDescriptor implements ValueAggregatorDescriptor {
+
+  static public final String UNIQ_VALUE_COUNT = "UniqValueCount";
+
+  static public final String LONG_VALUE_SUM = "LongValueSum";
+
+  static public final String DOUBLE_VALUE_SUM = "DoubleValueSum";
+
+  static public final String VALUE_HISTOGRAM = "ValueHistogram";
+
+  public String inputFile = null;
+
+  private static class MyEntry implements Entry {
+    Object key;
+
+    Object val;
+
+    public Object getKey() {
+      return key;
+    }
+
+    public Object getValue() {
+      return val;
+    }
+
+    public Object setValue(Object val) {
+      this.val = val;
+      return val;
+    }
+
+    public MyEntry(Object key, Object val) {
+      this.key = key;
+      this.val = val;
+    }
+  }
+
+  /**
+   * 
+   * @param type the aggregation type
+   * @param id the aggregation id
+   * @param val the val associated with the id to be aggregated
+   * @return an Entry whose key is the aggregation id prefixed with 
+   * the aggregation type.
+   */
+  public static Entry generateEntry(String type, String id, Object val) {
+    Text key = new Text(type + TYPE_SEPARATOR + id);
+    return new MyEntry(key, val);
+  }
+
+  /**
+   * 
+   * @param type the aggregation type
+   * @return a value aggregator of the given type.
+   */
+  static public ValueAggregator generateValueAggregator(String type) {
+    ValueAggregator retv = null;
+    if (type.compareToIgnoreCase(UNIQ_VALUE_COUNT) == 0) {
+      retv = new UniqValueCount();
+    } else if (type.compareToIgnoreCase(LONG_VALUE_SUM) == 0) {
+      retv = new LongValueSum();
+    } else if (type.compareToIgnoreCase(DOUBLE_VALUE_SUM) == 0) {
+      retv = new DoubleValueSum();
+    } else if (type.compareToIgnoreCase(VALUE_HISTOGRAM) == 0) {
+      retv = new ValueHistogram();
+    }
+    return retv;
+  }
+
+  /**
+   * Generate 1 or 2 aggregation-id/value pairs for the given key/value pair.
+   * The first id will be of type LONG_VALUE_SUM, with "record_count" as
+   * its aggregation id. If the input is a file split,
+   * the second id of the same type will be generated too, with the file name 
+   * as its aggregation id. This achieves the behavior of counting the total number
+   * of records in the input data, and the number of records in each input file.
+   * 
+   * @param key
+   *          input key
+   * @param val
+   *          input value
+   * @return a list of aggregation id/value pairs. An aggregation id encodes an
+   *         aggregation type which is used to guide the way to aggregate the
+   *         value in the reduce/combiner phase of an Abacus based job.
+   */
+  public ArrayList<Entry> generateKeyValPairs(Object key, Object val) {
+    ArrayList<Entry> retv = new ArrayList<Entry>();
+    String countType = LONG_VALUE_SUM;
+    String id = "record_count";
+    Entry e = generateEntry(countType, id, ONE);
+    if (e != null) {
+      retv.add(e);
+    }
+    if (this.inputFile != null) {
+      e = generateEntry(countType, this.inputFile, ONE);
+      if (e != null) {
+        retv.add(e);
+      }
+    }
+    return retv;
+  }
+
+  /**
+   * get the input file name.
+   * 
+   * @param job a job configuration object
+   */
+  public void configure(JobConf job) {
+    this.inputFile = job.get("map.input.file");
+  }
+}

+ 90 - 0
src/contrib/abacus/src/java/org/apache/hadoop/abacus/ValueAggregatorCombiner.java

@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.abacus;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * This class implements the generic combiner of Abacus.
+ */
+public class ValueAggregatorCombiner extends ValueAggregatorJobBase {
+
+  /**
+   * The combiner does not need any configuration.
+   */
+  public void configure(JobConf job) {
+
+  }
+
+  /** Combines values for a given key.  
+   * @param key the key is expected to be a Text object, whose prefix indicates
+   * the type of aggregation to aggregate the values. 
+   * @param values the values to combine
+   * @param output to collect combined values
+   */
+  public void reduce(WritableComparable key, Iterator values,
+      OutputCollector output, Reporter reporter) throws IOException {
+    String keyStr = key.toString();
+    int pos = keyStr.indexOf(ValueAggregatorDescriptor.TYPE_SEPARATOR);
+    String type = keyStr.substring(0, pos);
+    ValueAggregator aggregator = ValueAggregatorBaseDescriptor
+        .generateValueAggregator(type);
+    if (aggregator == null) {
+      // unknown aggregation type: log the key and skip this group
+      // rather than fall through to a NullPointerException below
+      LOG.info("unrecognized aggregation type in key: " + key.toString());
+      return;
+    }
+    while (values.hasNext()) {
+      aggregator.addNextValue(values.next());
+    }
+    Iterator outputs = aggregator.getCombinerOutput().iterator();
+
+    while (outputs.hasNext()) {
+      Object v = outputs.next();
+      if (v instanceof Text) {
+        output.collect(key, (Text)v);
+      } else {
+        output.collect(key, new Text(v.toString()));
+      }
+    }
+  }
+
+  /** 
+   * Do nothing. 
+   *
+   */
+  public void close() throws IOException {
+
+  }
+
+  /** 
+   * Should never be called; this class is used only as a combiner. 
+   *
+   */
+  public void map(WritableComparable arg0, Writable arg1, OutputCollector arg2,
+      Reporter arg3) throws IOException {
+    throw new IOException ("should not be called\n");
+  }
+}

+ 67 - 0
src/contrib/abacus/src/java/org/apache/hadoop/abacus/ValueAggregatorDescriptor.java

@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.abacus;
+
+import java.util.ArrayList;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * This interface defines the contract a value aggregator descriptor must
+ * support. Such a descriptor can be configured with a JobConf object. Its main
+ * function is to generate a list of aggregation-id/value pairs. An aggregation
+ * id encodes an aggregation type which is used to guide the way to aggregate
+ * the value in the reduce/combiner phase of an Abacus based job. The mapper in
+ * an Abacus based map/reduce job may create one or more
+ * ValueAggregatorDescriptor objects at configuration time. For each input
+ * key/value pair, the mapper will use those objects to create aggregation
+ * id/value pairs.
+ * 
+ */
+public interface ValueAggregatorDescriptor {
+
+  public static final String TYPE_SEPARATOR = ":";
+
+  public static final Text ONE = new Text("1");
+
+  /**
+   * Generate a list of aggregation-id/value pairs for the given key/value pair.
+   * This function is usually called by the mapper of an Abacus based job.
+   * 
+   * @param key
+   *          input key
+   * @param val
+   *          input value
+   * @return a list of aggregation id/value pairs. An aggregation id encodes an
+   *         aggregation type which is used to guide the way to aggregate the
+   *         value in the reduce/combiner phase of an Abacus based job.
+   */
+  public ArrayList<Entry> generateKeyValPairs(Object key, Object val);
+
+  /**
+   * Configure the object
+   * 
+   * @param job
+   *          a JobConf object that may contain the information that can be used
+   *          to configure the object.
+   */
+  public void configure(JobConf job);
+}

+ 201 - 0
src/contrib/abacus/src/java/org/apache/hadoop/abacus/ValueAggregatorJob.java

@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.abacus;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.mapred.jobcontrol.Job;
+import org.apache.hadoop.mapred.jobcontrol.JobControl;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.RunningJob;
+
+/**
+ * This is the main class for creating a map/reduce job using the Abacus
+ * framework. Abacus is a specialization of the map/reduce framework for
+ * performing various simple aggregations.
+ * 
+ * Generally speaking, in order to implement an application using the Map/Reduce
+ * model, the developer needs to implement Map and Reduce functions (and possibly
+ * a Combine function). However, a lot of applications related to counting and
+ * statistics computing have very similar characteristics. Abacus abstracts out
+ * the general patterns of these functions and implements those patterns. In
+ * particular, the package provides generic mapper/reducer/combiner classes,
+ * a set of built-in value aggregators, and a generic utility class that
+ * helps users create map/reduce jobs using the generic classes. The built-in
+ * aggregators include:
+ * 
+ *      sum over numeric values 
+ *      count of the number of distinct values 
+ *      histogram of values 
+ *      minimum, maximum, median, average, and standard deviation of numeric values
+ * 
+ * The developer using Abacus need only provide a plugin class
+ * conforming to the following interface:
+ * 
+ *      public interface ValueAggregatorDescriptor { 
+ *          public ArrayList<Entry> generateKeyValPairs(Object key, Object value); 
+ *          public void configure(JobConf job); 
+ *     } 
+ * 
+ * The package also provides a base class,
+ * ValueAggregatorBaseDescriptor, implementing the above interface. The user can
+ * extend the base class and implement generateKeyValPairs accordingly.
+ * 
+ * The primary work of generateKeyValPairs is to emit one or more key/value
+ * pairs based on the input key/value pair. The key in an output key/value pair
+ * encodes two pieces of information: the aggregation type and the aggregation id.
+ * The value will be aggregated onto the aggregation id according to the
+ * aggregation type.
+ * 
+ * This class offers a function to generate a map/reduce job using the Abacus
+ * framework. The function takes the following parameters: the input directory
+ * spec, the output directory, the number of reducers, the input format (text or
+ * sequence file), and a file specifying the user plugin class(es).
+ * 
+ */
+public class ValueAggregatorJob {
+
+  public static JobControl createValueAggregatorJobs(String args[])
+      throws IOException {
+    JobControl theControl = new JobControl("ValueAggregatorJobs");
+    ArrayList dependingJobs = new ArrayList();
+    JobConf aJobConf = createValueAggregatorJob(args);
+    Job aJob = new Job(aJobConf, dependingJobs);
+    theControl.addJob(aJob);
+    return theControl;
+  }
+
+  /**
+   * Create an Abacus based map/reduce job.
+   * 
+   * @param args the arguments used for job creation
+   * @return a JobConf object ready for submission.
+   * 
+   * @throws IOException
+   */
+  public static JobConf createValueAggregatorJob(String args[])
+      throws IOException {
+
+    String inputDir = args[0];
+    String outputDir = args[1];
+    int numOfReducers = 1;
+    if (args.length > 2) {
+      numOfReducers = Integer.parseInt(args[2]);
+    }
+
+    Class theInputFormat = SequenceFileInputFormat.class;
+    if (args.length > 3 && args[3].compareToIgnoreCase("textinputformat") == 0) {
+      theInputFormat = TextInputFormat.class;
+    }
+
+    Path specFile = null;
+
+    if (args.length > 4) {
+      specFile = new Path(args[4]);
+    }
+
+    JobConf theJob = new JobConf(ValueAggregatorJob.class);
+    if (specFile != null) {
+      theJob.addDefaultResource(specFile);
+    }
+    FileSystem fs = FileSystem.get(theJob);
+    theJob.setJobName("ValueAggregatorJob");
+
+    String[] inputDirsSpecs = inputDir.split(",");
+    for (int i = 0; i < inputDirsSpecs.length; i++) {
+      String spec = inputDirsSpecs[i];
+      Path[] dirs = fs.globPaths(new Path(spec));
+      for (int j = 0; j < dirs.length; j++) {
+        System.out.println("Adding dir: " + dirs[j].toString());
+        theJob.addInputPath(dirs[j]);
+      }
+    }
+
+    theJob.setInputFormat(theInputFormat);
+    fs.delete(new Path(outputDir));
+
+    theJob.setMapperClass(ValueAggregatorMapper.class);
+    theJob.setOutputPath(new Path(outputDir));
+    theJob.setOutputFormat(TextOutputFormat.class);
+    theJob.setMapOutputKeyClass(Text.class);
+    theJob.setMapOutputValueClass(Text.class);
+    theJob.setOutputKeyClass(Text.class);
+    theJob.setOutputValueClass(Text.class);
+    theJob.setReducerClass(ValueAggregatorReducer.class);
+    theJob.setCombinerClass(ValueAggregatorCombiner.class);
+    theJob.setNumMapTasks(1);
+    theJob.setNumReduceTasks(numOfReducers);
+    theJob.set("mapred.sds.data.serialization.format", "csv");
+    theJob.set("mapred.child.java.opts", "-Xmx1024m");
+    // aggregator.setKeepFailedTaskFiles(true);
+    return theJob;
+  }
+
+  /**
+   * Submit/run a map/reduce job.
+   * 
+   * @param job
+   * @return true for success
+   * @throws IOException
+   */
+  public static boolean runJob(JobConf job) throws IOException {
+    JobClient jc = new JobClient(job);
+    boolean success = true;
+    RunningJob running = null;
+    try {
+      running = jc.submitJob(job);
+      String jobId = running.getJobID();
+      System.out.println("Job " + jobId + " is submitted");
+      while (!running.isComplete()) {
+        System.out.println("Job " + jobId + " is still running.");
+        try {
+          Thread.sleep(60000);
+        } catch (InterruptedException e) {
+        }
+        running = jc.getJob(jobId);
+      }
+      success = running.isSuccessful();
+    } finally {
+      if (!success && (running != null)) {
+        running.killJob();
+      }
+      jc.close();
+    }
+    return success;
+  }
+
+  /**
+   * create and run an Abacus based map/reduce job.
+   * 
+   * @param args the arguments used for job creation
+   * @throws IOException
+   */
+  public static void main(String args[]) throws IOException {
+    JobConf job = ValueAggregatorJob.createValueAggregatorJob(args);
+    runJob(job);
+  }
+}

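Since the contrib build.xml above sets ValueAggregatorJob as the jar's Main-Class, a job could be launched from the command line roughly as follows; the jar path is an assumption about the contrib build layout, and the arguments follow the order parsed by createValueAggregatorJob (input dirs, output dir, number of reducers, input format, spec file):

    bin/hadoop jar build/contrib/abacus/hadoop-abacus.jar \
        input1,input2 outdir 2 textinputformat wordcountaggregator.spec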
+ 100 - 0
src/contrib/abacus/src/java/org/apache/hadoop/abacus/ValueAggregatorJobBase.java

@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.abacus;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.mapred.JobConf;
+
+
+/**
+ * This abstract class implements some common functionalities of
+ * the generic mapper, reducer and combiner classes of Abacus.
+ *
+ */
+public abstract class ValueAggregatorJobBase extends JobBase {
+ 
+    protected ArrayList aggregatorDescriptorList = null;
+        
+    public void configure(JobConf job) {
+        super.configure(job);
+        
+        setLongValue("totalCount", 0);
+        setLongValue("errorCount", 0);
+        setLongValue("collectedCount", 0);
+        setLongValue("groupCount", 0);
+        
+        this.initializeMySpec(job);
+        this.logSpec();
+    }
+
+    private static ValueAggregatorDescriptor getValueAggregatorDescriptor(
+            String spec, JobConf job) {
+        if (spec == null)
+            return null;
+        String[] segments = spec.split(",", -1);
+        String type = segments[0];
+        if (type.compareToIgnoreCase("UserDefined") == 0) {
+            String className = segments[1];
+            return new UserDefinedValueAggregatorDescriptor(className, job);
+        } 
+        return null;
+    }
+
+    private static ArrayList getAggregatorDescriptors(JobConf job) {
+        String advn = "aggregator.descriptor";
+        int num = job.getInt(advn + ".num", 0);
+        ArrayList retv = new ArrayList(num);
+        for (int i = 0; i < num; i++) {
+            String spec = job.get(advn + "." + i);
+            ValueAggregatorDescriptor ad = getValueAggregatorDescriptor(spec, job);
+            if (ad != null) {
+                retv.add(ad);
+            }
+        }
+        return retv;
+    }
+    
+    private void initializeMySpec(JobConf job) {
+        this.aggregatorDescriptorList = getAggregatorDescriptors(job);
+        if (this.aggregatorDescriptorList.size() == 0) {
+            this.aggregatorDescriptorList.add(new UserDefinedValueAggregatorDescriptor(
+                    ValueAggregatorBaseDescriptor.class.getCanonicalName(), job));
+        }
+    }
+    
+    protected void logSpec() {
+        StringBuffer sb = new StringBuffer();
+        sb.append("\n");
+        if (aggregatorDescriptorList == null) {
+            sb.append(" aggregatorDescriptorList: null");
+        } else {
+            sb.append(" aggregatorDescriptorList: ");
+            for (int i = 0; i < aggregatorDescriptorList.size(); i++) {
+                sb.append(" ").append(aggregatorDescriptorList.get(i).toString());
+            }
+        }      
+        LOG.info(sb.toString());
+    }
+
+    public void close() throws IOException {
+        report();
+    }
+}

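The getAggregatorDescriptors method above reads the descriptor list from the job configuration; the spec files under src/contrib/abacus/src/examples set the same properties in XML. A sketch of setting them programmatically instead, using the example descriptor class from this commit:

    JobConf job = new JobConf(ValueAggregatorJob.class);
    job.setInt("aggregator.descriptor.num", 1);
    job.set("aggregator.descriptor.0",
        "UserDefined,org.apache.hadoop.abacus.examples.WordCountAggregatorDescriptor");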
+ 67 - 0
src/contrib/abacus/src/java/org/apache/hadoop/abacus/ValueAggregatorMapper.java

@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.abacus;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * This class implements the generic mapper of Abacus.
+ */
+public class ValueAggregatorMapper extends ValueAggregatorJobBase {
+
+  /**
+   *  the map function. It iterates through the value aggregator descriptor 
+   *  list to generate aggregation id/value pairs and emit them.
+   */
+  public void map(WritableComparable key, Writable value,
+      OutputCollector output, Reporter reporter) throws IOException {
+
+    addLongValue("groupCount", 1);
+    Iterator iter = this.aggregatorDescriptorList.iterator();
+    while (iter.hasNext()) {
+      ValueAggregatorDescriptor ad = (ValueAggregatorDescriptor) iter.next();
+      Iterator<Entry> ens = ad.generateKeyValPairs(key, value).iterator();
+      while (ens.hasNext()) {
+        Entry en = ens.next();
+        output.collect((WritableComparable) en.getKey(), (Writable) en
+            .getValue());
+        addLongValue("collectedCount", 1);
+      }
+    }
+
+    if (getLongValue("groupCount").longValue() % 10000 == 0) {
+      report();
+    }
+  }
+
+  /**
+   * Should never be called; this class is used only as a mapper.
+   */
+  public void reduce(WritableComparable arg0, Iterator arg1,
+      OutputCollector arg2, Reporter arg3) throws IOException {
+    throw new IOException ("should not be called\n");
+  }
+}

+ 79 - 0
src/contrib/abacus/src/java/org/apache/hadoop/abacus/ValueAggregatorReducer.java

@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.abacus;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * This class implements the generic reducer of Abacus.
+ * 
+ * @author runping
+ * 
+ */
+public class ValueAggregatorReducer extends ValueAggregatorJobBase {
+
+  /**
+   * @param key
+   *          the key is expected to be a Text object, whose prefix indicates
+   *          the type of aggregation to aggregate the values. In effect, data
+   *          driven computing is achieved. It is assumed that each aggregator's
+   *          getReport method emits appropriate output for the aggregator. This
+   *          may be further customized.
+   * @param values the values to be aggregated
+   */
+  public void reduce(WritableComparable key, Iterator values,
+      OutputCollector output, Reporter reporter) throws IOException {
+    addLongValue("groupCount", 1);
+    String keyStr = key.toString();
+    int pos = keyStr.indexOf(ValueAggregatorDescriptor.TYPE_SEPARATOR);
+    String type = keyStr.substring(0, pos);
+    keyStr = keyStr.substring(pos
+        + ValueAggregatorDescriptor.TYPE_SEPARATOR.length());
+
+    ValueAggregator aggregator = ValueAggregatorBaseDescriptor
+        .generateValueAggregator(type);
+    while (values.hasNext()) {
+      addLongValue("totalCount", 1);
+      aggregator.addNextValue(values.next());
+    }
+
+    String val = aggregator.getReport();
+    key = new Text(keyStr);
+    output.collect(key, new Text(val));
+    addLongValue("collectedCount", 1);
+    if (getLongValue("collectedCount").longValue() % 10000 == 0) {
+      report();
+    }
+  }
+
+  /**
+   * Should never be called; this class is used only as a reducer.
+   */
+  public void map(WritableComparable arg0, Writable arg1, OutputCollector arg2,
+      Reporter arg3) throws IOException {
+    throw new IOException ("should not be called\n");
+  }
+}

+ 177 - 0
src/contrib/abacus/src/java/org/apache/hadoop/abacus/ValueHistogram.java

@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.abacus;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.TreeMap;
+import java.util.Map.Entry;
+import java.util.Arrays;
+
+import org.apache.hadoop.io.Text;
+
+/**
+ * This class implements a value aggregator that computes the 
+ * histogram of a sequence of strings.
+ * 
+ */
+public class ValueHistogram implements ValueAggregator {
+
+  TreeMap items = null;
+
+  public ValueHistogram() {
+    items = new TreeMap();
+  }
+
+  /**
+   * add the given val to the aggregator.
+   * 
+   * @param val the value to be added. It is expected to be a string
+   * in the form of xxxx\tnum, meaning xxxx has num occurrences.
+   */
+  public void addNextValue(Object val) {
+    String valCountStr = val.toString();
+    int pos = valCountStr.lastIndexOf("\t");
+    String valStr = valCountStr.substring(0, pos);
+    String countStr = valCountStr.substring(pos + 1);
+
+    Long count = (Long) this.items.get(valStr);
+    long inc = Long.parseLong(countStr);
+
+    if (count == null) {
+      count = new Long(inc);
+    } else {
+      count = new Long(count.longValue() + inc);
+    }
+    items.put(valStr, count);
+  }
+
+  /**
+   * @return the string representation of this aggregator.
+   * It includes the following basic statistics of the histogram counts:
+   *    the number of unique values
+   *    the minimum count
+   *    the median count
+   *    the maximum count
+   *    the average count
+   *    the standard deviation of the counts
+   */
+  public String getReport() {
+    long[] counts = new long[items.size()];
+
+    StringBuffer sb = new StringBuffer();
+    Iterator iter = items.values().iterator();
+    int i = 0;
+    while (iter.hasNext()) {
+      Long count = (Long) iter.next();
+      counts[i] = count.longValue();
+      i += 1;
+    }
+    Arrays.sort(counts);
+    sb.append(counts.length);
+    StringBuffer sbVal = new StringBuffer();
+    i = 0;
+    long acc = 0;
+    while (i < counts.length) {
+      long nextVal = counts[i];
+      int j = i + 1;
+      while (j < counts.length && counts[j] == nextVal) {
+        j++;
+      }
+      acc += nextVal * (j - i);
+      i = j;
+    }
+    double average = 0.0;
+    double sd = 0.0;
+    if (counts.length > 0) {
+      sb.append("\t").append(counts[0]);
+      sb.append("\t").append(counts[counts.length / 2]);
+      sb.append("\t").append(counts[counts.length - 1]);
+
+      average = acc * 1.0 / counts.length;
+      sb.append("\t").append(average);
+
+      i = 0;
+      while (i < counts.length) {
+        double nextDiff = counts[i] - average;
+        sd += nextDiff * nextDiff;
+        i += 1;
+      }
+      sd = Math.sqrt(sd / counts.length);
+
+      sb.append("\t").append(sd);
+
+    }
+    return sb.toString();
+  }
+
+  /** 
+   * 
+   * @return a string representation of the list of value/frequency pairs of 
+   * the histogram
+   */
+  public String getReportDetails() {
+    StringBuffer sb = new StringBuffer();
+    Iterator iter = items.entrySet().iterator();
+    while (iter.hasNext()) {
+      Entry en = (Entry) iter.next();
+      Object val = en.getKey();
+      Long count = (Long) en.getValue();
+      sb.append("\t").append(val.toString()).append("\t").append(
+          count.longValue()).append("\n");
+    }
+    return sb.toString();
+  }
+
+  /**
+   *  @return a list of value/frequency pairs.
+   *  The return value is expected to be used by the reducer.
+   */
+  public ArrayList getCombinerOutput() {
+    ArrayList retv = new ArrayList();
+    Iterator iter = items.entrySet().iterator();
+
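+    // re-encode each distinct value and its accumulated count as a "value\tcount" Text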
+    while (iter.hasNext()) {
+      Entry en = (Entry) iter.next();
+      Object val = en.getKey();
+      Long count = (Long) en.getValue();
+      retv.add(new Text(val.toString() + "\t" + count.longValue()));
+    }
+    return retv;
+  }
+
+  /** 
+   * 
+   * @return a TreeMap representation of the histogram
+   */
+  public TreeMap getReportItems() {
+    return items;
+  }
+
+  /** 
+   * reset the aggregator
+   */
+  public void reset() {
+    items = new TreeMap();
+  }
+
+}

+ 171 - 0
src/contrib/abacus/src/java/org/apache/hadoop/abacus/package.html

@@ -0,0 +1,171 @@
+<html>
+<body>
+
+Hadoop based Abacus is a specialization of the map/reduce framework for
+performing various kinds of counting and aggregation. It offers functionality similar to Google's Sawzall.
+<p />
+<h2><a name="Abacus_framework"></a> Abacus framework </h2>
+<p />
+Generally speaking, in order to implement an application using the Map/Reduce
+model, the developer needs to implement Map and Reduce functions (and possibly
+a Combine function). However, for many applications that compute counts and
+statistics, these functions have very similar characteristics. Abacus abstracts out
+the general patterns and provides a package implementing 
+those patterns. In particular, the package provides a generic mapper class,
+a reducer class and a combiner class, and a set of built-in value aggregators.
+It also provides a generic utility class, ValueAggregatorJob, that offers a static function that 
+creates Abacus map/reduce jobs:
+<blockquote>
+<pre>
+public static JobConf createValueAggregatorJob(String args&#91;]) throws IOException;
+</pre>
+</blockquote>
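+As a minimal driver sketch (the AbacusDriver class name is illustrative, not part of the package),
+a program that creates and submits such a job could look like this:
+<blockquote>
+<pre>
+public class AbacusDriver {
+  public static void main(String args&#91;]) throws IOException {
+    // args: input dirs, output dir, number of reducers, input format, spec file
+    JobConf job &#61; ValueAggregatorJob.createValueAggregatorJob(args);
+    JobClient.runJob(job);  // submit the job and block until it completes
+  }
+}
+</pre>
+</blockquote>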
+To call this function, the user needs to pass in arguments specifying the input directories, the output directory, 
+the number of reducers, the input data format (textinputformat or sequencefileinputformat), and a file specifying the user plugin class(es) for the mapper to load. 
+A user plugin class is responsible for specifying what 
+aggregators to use and which values go to which aggregators. 
+A plugin class must implement the following interface:
+<blockquote>
+<pre>
+ public interface ValueAggregatorDescriptor { 
+     public ArrayList&#60;Entry&#62; generateKeyValPairs(Object key, Object value); 
+     public void configure(JobConf job); 
+} 
+</pre>
+</blockquote>
+Function generateKeyValPairs will generate aggregation key/value pairs for the
+input key/value pair. Each aggregation key encodes two pieces of information: the aggregation type and aggregation ID.
+The value is the value to be aggregated onto the aggregation ID according to the aggregation type. Here 
+is a simple example user plugin class for counting the words in the input texts:
+<blockquote>
+<pre>
+public class WordCountAggregatorDescriptor extends ValueAggregatorBaseDescriptor { 
+    public ArrayList&#60;Entry&#62; generateKeyValPairs(Object key, Object val) {
+        String words &#91;] &#61; val.toString().split(&#34; &#124;\t&#34;);
+        ArrayList&#60;Entry&#62; retv &#61; new ArrayList&#60;Entry&#62;();
+        for (int i &#61; 0; i &#60; words.length; i++) {
+            retv.add(generateEntry(LONG&#95;VALUE&#95;SUM, words&#91;i], ONE));
+        }
+        return retv;
+    }
+    public void configure(JobConf job) {}
+} 
+</pre>
+</blockquote>
+In the above code, LONG_VALUE_SUM is a string denoting the aggregation type LongValueSum, which sums over long values.
+ONE denotes the string "1". Function generateEntry(LONG_VALUE_SUM, words[i], ONE) will interpret the first argument as an aggregation type, the second as an aggregation ID, and the third argument as the value to be aggregated. The output key will look like "LongValueSum:xxxx", where xxxx is the string value of words[i], and the output value will be "1". The mapper will call generateKeyValPairs(Object key, Object val) for each input key/value pair to generate the desired aggregation id/value pairs. 
+The downstream combiner/reducer will interpret these pairs as adding one to the aggregator xxxx.
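+For example, given the input value "hello world hello", generateKeyValPairs would produce
+the pairs:
+<blockquote>
+<pre>
+LongValueSum:hello   1
+LongValueSum:world   1
+LongValueSum:hello   1
+</pre>
+</blockquote>
+After aggregation, the reducer strips the type prefix and emits "hello 2" and "world 1"
+as its output key/value pairs.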
+<p />
+Class ValueAggregatorBaseDescriptor is a base class that user plugin classes can extend. Here is the XML fragment specifying the user plugin class:
+<blockquote>
+<pre>
+&#60;property&#62;
+    &#60;name&#62;aggregator.descriptor.num&#60;/name&#62;
+    &#60;value&#62;1&#60;/value&#62;
+&#60;/property&#62;
+&#60;property&#62;
+   &#60;name&#62;aggregator.descriptor.0&#60;/name&#62;
+   &#60;value&#62;UserDefined,org.apache.hadoop.abacus.examples.WordCountAggregatorDescriptor&#60;/value&#62;
+&#60;/property&#62; 
+</pre>
+</blockquote>
+Class ValueAggregatorBaseDescriptor itself provides a default implementation for  generateKeyValPairs:
+<blockquote>
+<pre>
+public ArrayList&#60;Entry&#62; generateKeyValPairs(Object key, Object val) {
+   ArrayList&#60;Entry&#62; retv &#61; new ArrayList&#60;Entry&#62;();     
+   String countType &#61; LONG&#95;VALUE&#95;SUM;
+   String id &#61; &#34;record&#95;count&#34;;
+   retv.add(generateEntry(countType, id, ONE));
+   return retv;
+}
+</pre>
+</blockquote>
+Thus, if no user plugin class is specified, the default behavior of the map/reduce job is to count the number of records (lines) in the input files, emitting a single output record with key "record_count" and the total count as its value.
+<p />
+During runtime, the mapper will invoke the generateKeyValPairs function for each input key/value pair, and emit the generated 
+key/value pairs:
+<blockquote>
+<pre>
+public void map(WritableComparable key, Writable value,
+            OutputCollector output, Reporter reporter) throws IOException {
+   Iterator iter &#61; this.aggregatorDescriptorList.iterator();
+   while (iter.hasNext()) {
+       ValueAggregatorDescriptor ad &#61; (ValueAggregatorDescriptor) iter.next();
+       Iterator&#60;Entry&#62; ens &#61; ad.generateKeyValPairs(key, value).iterator();
+       while (ens.hasNext()) {
+           Entry en &#61; ens.next();
+           output.collect((WritableComparable)en.getKey(), (Writable)en.getValue());
+       }
+   }
+}
+</pre>
+</blockquote>
+The reducer will create an aggregator object for each key/value list pair, and perform the appropriate aggregation.
+At the end, it will emit the aggregator's results:
+<blockquote>
+<pre>
+public void reduce(WritableComparable key, Iterator values,
+            OutputCollector output, Reporter reporter) throws IOException {
+   String keyStr &#61; key.toString();
+   int pos &#61; keyStr.indexOf(ValueAggregatorDescriptor.TYPE&#95;SEPARATOR);
+   String type &#61; keyStr.substring(0,pos);
+   keyStr &#61; keyStr.substring(pos+ValueAggregatorDescriptor.TYPE&#95;SEPARATOR.length());       
+   ValueAggregator aggregator &#61; 
+       ValueAggregatorBaseDescriptor.generateValueAggregator(type);
+   while (values.hasNext()) {
+       aggregator.addNextValue(values.next());
+   }         
+   String val &#61; aggregator.getReport();
+   key &#61; new Text(keyStr);
+   output.collect(key, new Text(val)); 
+}
+</pre>
+</blockquote>
+In order to be able to use a combiner, all the aggregators must be associative and commutative.
+The following are the types Abacus supports now: <ul>
+<li> LongValueSum: sum over long values 
+</li> <li> DoubleValueSum: sum over float/double values 
+</li> <li> UniqValueCount: count the number of distinct values 
+</li> <li> ValueHistogram: compute the histogram of the values, as well as the minimum, maximum, median, average, and standard deviation of numeric values (see the sketch below)
+</li></ul> 
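+<p />
+As a sketch of a plugin driving the ValueHistogram aggregator (the WordHistogramDescriptor class name and the
+"word_histogram" aggregation ID are illustrative; VALUE_HISTOGRAM is assumed to be the type constant analogous
+to LONG_VALUE_SUM, and generateEntry is assumed to accept a Text value as in the word count example above):
+<blockquote>
+<pre>
+public class WordHistogramDescriptor extends ValueAggregatorBaseDescriptor {
+    public ArrayList&#60;Entry&#62; generateKeyValPairs(Object key, Object val) {
+        ArrayList&#60;Entry&#62; retv &#61; new ArrayList&#60;Entry&#62;();
+        String words&#91;] &#61; val.toString().split(&#34; &#34;);
+        for (int i &#61; 0; i &#60; words.length; i++) {
+            // ValueHistogram expects values of the form &#34;value\tcount&#34;
+            retv.add(generateEntry(VALUE&#95;HISTOGRAM, &#34;word&#95;histogram&#34;, new Text(words&#91;i] + &#34;\t1&#34;)));
+        }
+        return retv;
+    }
+    public void configure(JobConf job) {}
+}
+</pre>
+</blockquote>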
+<p />
+<h2><a name="Create_and_run_an_Abacus_based_a"></a> Create and run an Abacus based application </h2>
+<p />
+To create an Abacus based application, the user needs to do the following things:
+<p />
+1. Implement a user plugin:
+<blockquote>
+<pre>
+import java.util.ArrayList;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.abacus.ValueAggregatorBaseDescriptor;
+import org.apache.hadoop.mapred.JobConf;
+
+public class WordCountAggregatorDescriptor extends ValueAggregatorBaseDescriptor {
+   public ArrayList&#60;Entry&#62; generateKeyValPairs(Object key, Object val) {
+      ArrayList&#60;Entry&#62; retv &#61; new ArrayList&#60;Entry&#62;();
+      // add the desired aggregation type/id/value entries to retv,
+      // as in the word count example shown earlier
+      return retv;
+   }
+   public void configure(JobConf job) {
+   } 
+}
+</pre>
+</blockquote>
+Examples can be found in the src/contrib/abacus/src/examples directory.
+<p />
+
+2. Create an XML file specifying the user plugin (see the XML fragment above).
+<p />
+3. Compile your Java class and create a jar file, say wc.jar.
+
+<p />
+Finally, run the job:
+<blockquote>
+<pre>
+        hadoop jar wc.jar org.apache.hadoop.abacus.ValueAggregatorJob indirs outdir numofreducers textinputformat|sequencefileinputformat spec_file
+</pre>
+</blockquote>
+<p />
+
+
+</body>
+</html>