Quellcode durchsuchen

MAPREDUCE-3310. Custom grouping comparator cannot be set for Combiners (tucu)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1555969 13f79535-47bb-0310-9956-ffa450edef68
Alejandro Abdelnur vor 11 Jahren
Ursprung
Commit
47469c464c
14 geänderte Dateien mit 494 neuen und 10 gelöschten Zeilen
  1. 2 0
      hadoop-mapreduce-project/CHANGES.txt
  2. 51 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java
  3. 3 2
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
  4. 18 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
  5. 17 6
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobContext.java
  6. 2 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
  7. 5 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java
  8. 5 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java
  9. 5 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java
  10. 5 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java
  11. 11 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java
  12. 1 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
  13. 191 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestOldCombinerGrouping.java
  14. 178 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestNewCombinerGrouping.java

+ 2 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -50,6 +50,8 @@ Release 2.4.0 - UNRELEASED
     MAPREDUCE-5550. Task Status message (reporter.setStatus) not shown in UI
     with Hadoop 2.0 (Gera Shegalov via Sandy Ryza)
 
+    MAPREDUCE-3310. Custom grouping comparator cannot be set for Combiners (tucu)
+
   OPTIMIZATIONS
 
     MAPREDUCE-5484. YarnChild unnecessarily loads job conf twice (Sandy Ryza)

+ 51 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java

@@ -949,6 +949,23 @@ public class JobConf extends Configuration {
     return get(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS);
   }
 
+  /**
+   * Get the user defined {@link WritableComparable} comparator for
+   * grouping keys of inputs to the combiner.
+   *
+   * @return comparator set by the user for grouping values.
+   * @see #setCombinerKeyGroupingComparator(Class) for details.
+   */
+  public RawComparator getCombinerKeyGroupingComparator() {
+    Class<? extends RawComparator> theClass = getClass(
+        JobContext.COMBINER_GROUP_COMPARATOR_CLASS, null, RawComparator.class);
+    if (theClass == null) {
+      return getOutputKeyComparator();
+    }
+
+    return ReflectionUtils.newInstance(theClass, this);
+  }
+
   /** 
    * Get the user defined {@link WritableComparable} comparator for 
    * grouping keys of inputs to the reduce.
@@ -966,6 +983,37 @@ public class JobConf extends Configuration {
     return ReflectionUtils.newInstance(theClass, this);
   }
 
+  /**
+   * Set the user defined {@link RawComparator} comparator for
+   * grouping keys in the input to the combiner.
+   * <p/>
+   * <p>This comparator should be provided if the equivalence rules for keys
+   * for sorting the intermediates are different from those for grouping keys
+   * before each call to
+   * {@link Reducer#reduce(Object, java.util.Iterator, OutputCollector, Reporter)}.</p>
+   * <p/>
+   * <p>For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed
+   * in a single call to the reduce function if K1 and K2 compare as equal.</p>
+   * <p/>
+   * <p>Since {@link #setOutputKeyComparatorClass(Class)} can be used to control
+   * how keys are sorted, this can be used in conjunction to simulate
+   * <i>secondary sort on values</i>.</p>
+   * <p/>
+   * <p><i>Note</i>: This is not a guarantee of the combiner sort being
+   * <i>stable</i> in any sense. (In any case, with the order of available
+   * map-outputs to the combiner being non-deterministic, it wouldn't make
+   * that much sense.)</p>
+   *
+   * @param theClass the comparator class to be used for grouping keys for the
+   * combiner. It should implement <code>RawComparator</code>.
+   * @see #setOutputKeyComparatorClass(Class)
+   */
+  public void setCombinerKeyGroupingComparator(
+      Class<? extends RawComparator> theClass) {
+    setClass(JobContext.COMBINER_GROUP_COMPARATOR_CLASS,
+        theClass, RawComparator.class);
+  }
+
   /** 
    * Set the user defined {@link RawComparator} comparator for 
    * grouping keys in the input to the reduce.
@@ -989,7 +1037,9 @@ public class JobConf extends Configuration {
    * 
    * @param theClass the comparator class to be used for grouping keys. 
    *                 It should implement <code>RawComparator</code>.
-   * @see #setOutputKeyComparatorClass(Class)                 
+   * @see #setOutputKeyComparatorClass(Class)
+   * @see {@link #setCombinerKeyGroupingComparator(Class)} for setting a 
+   * comparator for the combiner.
    */
   public void setOutputValueGroupingComparator(
       Class<? extends RawComparator> theClass) {

+ 3 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java

@@ -1553,7 +1553,8 @@ abstract public class Task implements Writable, Configurable {
       combinerClass = cls;
       keyClass = (Class<K>) job.getMapOutputKeyClass();
       valueClass = (Class<V>) job.getMapOutputValueClass();
-      comparator = (RawComparator<K>) job.getOutputKeyComparator();
+      comparator = (RawComparator<K>)
+          job.getCombinerKeyGroupingComparator();
     }
 
     @SuppressWarnings("unchecked")
@@ -1602,7 +1603,7 @@ abstract public class Task implements Writable, Configurable {
       this.taskId = taskId;
       keyClass = (Class<K>) context.getMapOutputKeyClass();
       valueClass = (Class<V>) context.getMapOutputValueClass();
-      comparator = (RawComparator<K>) context.getSortComparator();
+      comparator = (RawComparator<K>) context.getCombinerKeyGroupingComparator();
       this.committer = committer;
     }
 

+ 18 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java

@@ -948,11 +948,27 @@ public class Job extends JobContextImpl implements JobContext {
     conf.setOutputValueClass(theClass);
   }
 
+  /**
+   * Define the comparator that controls which keys are grouped together
+   * for a single call to combiner,
+   * {@link Reducer#reduce(Object, Iterable,
+   * org.apache.hadoop.mapreduce.Reducer.Context)}
+   *
+   * @param cls the raw comparator to use
+   * @throws IllegalStateException if the job is submitted
+   */
+  public void setCombinerKeyGroupingComparatorClass(
+      Class<? extends RawComparator> cls) throws IllegalStateException {
+    ensureState(JobState.DEFINE);
+    conf.setCombinerKeyGroupingComparator(cls);
+  }
+
   /**
    * Define the comparator that controls how the keys are sorted before they
    * are passed to the {@link Reducer}.
    * @param cls the raw comparator
    * @throws IllegalStateException if the job is submitted
+   * @see {@link #setCombinerKeyGroupingComparatorClass(Class)}
    */
   public void setSortComparatorClass(Class<? extends RawComparator> cls
                                      ) throws IllegalStateException {
@@ -967,6 +983,8 @@ public class Job extends JobContextImpl implements JobContext {
    *                       org.apache.hadoop.mapreduce.Reducer.Context)}
    * @param cls the raw comparator to use
    * @throws IllegalStateException if the job is submitted
+   * @see {@link #setCombinerKeyGroupingComparatorClass(Class)} for setting a 
+   * comparator for the combiner.
    */
   public void setGroupingComparatorClass(Class<? extends RawComparator> cls
                                          ) throws IllegalStateException {

+ 17 - 6
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobContext.java

@@ -167,13 +167,24 @@ public interface JobContext extends MRJobConfig {
    */
   public String getJar();
 
-  /** 
-   * Get the user defined {@link RawComparator} comparator for 
-   * grouping keys of inputs to the reduce.
-   * 
+  /**
+   * Get the user defined {@link RawComparator} comparator for
+   * grouping keys of inputs to the combiner.
+   *
    * @return comparator set by the user for grouping values.
-   * @see Job#setGroupingComparatorClass(Class) for details.  
-   */
+   * @see Job#setCombinerKeyGroupingComparatorClass(Class) for details.
+   */
+  public RawComparator<?> getCombinerKeyGroupingComparator();
+
+    /**
+     * Get the user defined {@link RawComparator} comparator for
+     * grouping keys of inputs to the reduce.
+     *
+     * @return comparator set by the user for grouping values.
+     * @see Job#setGroupingComparatorClass(Class) for details.
+     * @see {@link #getCombinerKeyGroupingComparator()} for setting a 
+     * comparator for the combiner.
+     */
   public RawComparator<?> getGroupingComparator();
   
   /**

+ 2 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java

@@ -93,6 +93,8 @@ public interface MRJobConfig {
 
   public static final String KEY_COMPARATOR = "mapreduce.job.output.key.comparator.class";
 
+  public static final String COMBINER_GROUP_COMPARATOR_CLASS = "mapreduce.job.combiner.group.comparator.class";
+
   public static final String GROUP_COMPARATOR_CLASS = "mapreduce.job.output.group.comparator.class";
 
   public static final String WORKING_DIR = "mapreduce.job.working.dir";

+ 5 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java

@@ -166,6 +166,11 @@ class ChainMapContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT> implements
     return base.getFileTimestamps();
   }
 
+  @Override
+  public RawComparator<?> getCombinerKeyGroupingComparator() {
+    return base.getCombinerKeyGroupingComparator();
+  }
+
   @Override
   public RawComparator<?> getGroupingComparator() {
     return base.getGroupingComparator();

+ 5 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java

@@ -159,6 +159,11 @@ class ChainReduceContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT> implements
     return base.getFileTimestamps();
   }
 
+  @Override
+  public RawComparator<?> getCombinerKeyGroupingComparator() {
+    return base.getCombinerKeyGroupingComparator();
+  }
+
   @Override
   public RawComparator<?> getGroupingComparator() {
     return base.getGroupingComparator();

+ 5 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java

@@ -168,6 +168,11 @@ public class WrappedMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
       return mapContext.getFileTimestamps();
     }
 
+    @Override
+    public RawComparator<?> getCombinerKeyGroupingComparator() {
+      return mapContext.getCombinerKeyGroupingComparator();
+    }
+
     @Override
     public RawComparator<?> getGroupingComparator() {
       return mapContext.getGroupingComparator();

+ 5 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java

@@ -161,6 +161,11 @@ public class WrappedReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
       return reduceContext.getFileTimestamps();
     }
 
+    @Override
+    public RawComparator<?> getCombinerKeyGroupingComparator() {
+      return reduceContext.getCombinerKeyGroupingComparator();
+    }
+
     @Override
     public RawComparator<?> getGroupingComparator() {
       return reduceContext.getGroupingComparator();

+ 11 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java

@@ -252,6 +252,17 @@ public class JobContextImpl implements JobContext {
     return conf.getJar();
   }
 
+  /**
+   * Get the user defined {@link RawComparator} comparator for
+   * grouping keys of inputs to the combiner.
+   *
+   * @return comparator set by the user for grouping values.
+   * @see Job#setCombinerKeyGroupingComparatorClass(Class) for details.
+   */
+  public RawComparator<?> getCombinerKeyGroupingComparator() {
+    return conf.getCombinerKeyGroupingComparator();
+  }
+
   /** 
    * Get the user defined {@link RawComparator} comparator for 
    * grouping keys of inputs to the reduce.

+ 1 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java

@@ -582,7 +582,7 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
     Class<K> keyClass = (Class<K>) job.getMapOutputKeyClass();
     Class<V> valClass = (Class<V>) job.getMapOutputValueClass();
     RawComparator<K> comparator = 
-      (RawComparator<K>)job.getOutputKeyComparator();
+      (RawComparator<K>)job.getCombinerKeyGroupingComparator();
     try {
       CombineValuesIterator values = new CombineValuesIterator(
           kvIter, comparator, keyClass, valClass, job, Reporter.NULL,

+ 191 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestOldCombinerGrouping.java

@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import junit.framework.Assert;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.UUID;
+
+public class TestOldCombinerGrouping {
+  private static String TEST_ROOT_DIR =
+      new File("build", UUID.randomUUID().toString()).getAbsolutePath();
+
+  public static class Map implements
+      Mapper<LongWritable, Text, Text, LongWritable> {
+    @Override
+    public void map(LongWritable key, Text value,
+        OutputCollector<Text, LongWritable> output, Reporter reporter)
+        throws IOException {
+      String v = value.toString();
+      String k = v.substring(0, v.indexOf(","));
+      v = v.substring(v.indexOf(",") + 1);
+      output.collect(new Text(k), new LongWritable(Long.parseLong(v)));
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    public void configure(JobConf job) {
+    }
+  }
+
+  public static class Reduce implements
+      Reducer<Text, LongWritable, Text, LongWritable> {
+
+    @Override
+    public void reduce(Text key, Iterator<LongWritable> values,
+        OutputCollector<Text, LongWritable> output, Reporter reporter)
+        throws IOException {
+      LongWritable maxValue = null;
+      while (values.hasNext()) {
+        LongWritable value = values.next();
+        if (maxValue == null) {
+          maxValue = value;
+        } else if (value.compareTo(maxValue) > 0) {
+          maxValue = value;
+        }
+      }
+      output.collect(key, maxValue);
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    public void configure(JobConf job) {
+    }
+  }
+
+  public static class Combiner extends Reduce {
+  }
+
+  public static class GroupComparator implements RawComparator<Text> {
+    @Override
+    public int compare(byte[] bytes, int i, int i2, byte[] bytes2, int i3,
+        int i4) {
+      byte[] b1 = new byte[i2];
+      System.arraycopy(bytes, i, b1, 0, i2);
+
+      byte[] b2 = new byte[i4];
+      System.arraycopy(bytes2, i3, b2, 0, i4);
+
+      return compare(new Text(new String(b1)), new Text(new String(b2)));
+    }
+
+    @Override
+    public int compare(Text o1, Text o2) {
+      String s1 = o1.toString();
+      String s2 = o2.toString();
+      s1 = s1.substring(0, s1.indexOf("|"));
+      s2 = s2.substring(0, s2.indexOf("|"));
+      return s1.compareTo(s2);
+    }
+
+  }
+
+  @Test
+  public void testCombiner() throws Exception {
+    if (!new File(TEST_ROOT_DIR).mkdirs()) {
+      throw new RuntimeException("Could not create test dir: " + TEST_ROOT_DIR);
+    }
+    File in = new File(TEST_ROOT_DIR, "input");
+    if (!in.mkdirs()) {
+      throw new RuntimeException("Could not create test dir: " + in);
+    }
+    File out = new File(TEST_ROOT_DIR, "output");
+    PrintWriter pw = new PrintWriter(new FileWriter(new File(in, "data.txt")));
+    pw.println("A|a,1");
+    pw.println("A|b,2");
+    pw.println("B|a,3");
+    pw.println("B|b,4");
+    pw.println("B|c,5");
+    pw.close();
+    JobConf job = new JobConf();
+    job.set("mapreduce.framework.name", "local");
+    TextInputFormat.setInputPaths(job, new Path(in.getPath()));
+    TextOutputFormat.setOutputPath(job, new Path(out.getPath()));
+    job.setMapperClass(Map.class);
+    job.setReducerClass(Reduce.class);
+    job.setInputFormat(TextInputFormat.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(LongWritable.class);
+    job.setOutputFormat(TextOutputFormat.class);
+    job.setOutputValueGroupingComparator(GroupComparator.class);
+
+    job.setCombinerClass(Combiner.class);
+    job.setCombinerKeyGroupingComparator(GroupComparator.class);
+    job.setInt("min.num.spills.for.combine", 0);
+
+    JobClient client = new JobClient(job);
+    RunningJob runningJob = client.submitJob(job);
+    runningJob.waitForCompletion();
+    if (runningJob.isSuccessful()) {
+      Counters counters = runningJob.getCounters();
+
+      long combinerInputRecords = counters.getGroup(
+          "org.apache.hadoop.mapreduce.TaskCounter").
+          getCounter("COMBINE_INPUT_RECORDS");
+      long combinerOutputRecords = counters.getGroup(
+          "org.apache.hadoop.mapreduce.TaskCounter").
+          getCounter("COMBINE_OUTPUT_RECORDS");
+      Assert.assertTrue(combinerInputRecords > 0);
+      Assert.assertTrue(combinerInputRecords > combinerOutputRecords);
+
+      BufferedReader br = new BufferedReader(new FileReader(
+          new File(out, "part-00000")));
+      Set<String> output = new HashSet<String>();
+      String line = br.readLine();
+      Assert.assertNotNull(line);
+      output.add(line.substring(0, 1) + line.substring(4, 5));
+      line = br.readLine();
+      Assert.assertNotNull(line);
+      output.add(line.substring(0, 1) + line.substring(4, 5));
+      line = br.readLine();
+      Assert.assertNull(line);
+      br.close();
+
+      Set<String> expected = new HashSet<String>();
+      expected.add("A2");
+      expected.add("B5");
+
+      Assert.assertEquals(expected, output);
+
+    } else {
+      Assert.fail("Job failed");
+    }
+  }
+
+}

+ 178 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestNewCombinerGrouping.java

@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import junit.framework.Assert;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+
+public class TestNewCombinerGrouping {
+  private static String TEST_ROOT_DIR =
+      new File("build", UUID.randomUUID().toString()).getAbsolutePath();
+
+  public static class Map extends
+      Mapper<LongWritable, Text, Text, LongWritable> {
+
+    @Override
+    protected void map(LongWritable key, Text value,
+        Context context)
+        throws IOException, InterruptedException {
+      String v = value.toString();
+      String k = v.substring(0, v.indexOf(","));
+      v = v.substring(v.indexOf(",") + 1);
+      context.write(new Text(k), new LongWritable(Long.parseLong(v)));
+    }
+  }
+
+  public static class Reduce extends
+      Reducer<Text, LongWritable, Text, LongWritable> {
+
+    @Override
+    protected void reduce(Text key, Iterable<LongWritable> values,
+        Context context)
+        throws IOException, InterruptedException {
+      LongWritable maxValue = null;
+      for (LongWritable value : values) {
+        if (maxValue == null) {
+          maxValue = value;
+        } else if (value.compareTo(maxValue) > 0) {
+          maxValue = value;
+        }
+      }
+      context.write(key, maxValue);
+    }
+  }
+
+  public static class Combiner extends Reduce {
+  }
+
+  public static class GroupComparator implements RawComparator<Text> {
+    @Override
+    public int compare(byte[] bytes, int i, int i2, byte[] bytes2, int i3,
+        int i4) {
+      byte[] b1 = new byte[i2];
+      System.arraycopy(bytes, i, b1, 0, i2);
+
+      byte[] b2 = new byte[i4];
+      System.arraycopy(bytes2, i3, b2, 0, i4);
+
+      return compare(new Text(new String(b1)), new Text(new String(b2)));
+    }
+
+    @Override
+    public int compare(Text o1, Text o2) {
+      String s1 = o1.toString();
+      String s2 = o2.toString();
+      s1 = s1.substring(0, s1.indexOf("|"));
+      s2 = s2.substring(0, s2.indexOf("|"));
+      return s1.compareTo(s2);
+    }
+
+  }
+
+  @Test
+  public void testCombiner() throws Exception {
+    if (!new File(TEST_ROOT_DIR).mkdirs()) {
+      throw new RuntimeException("Could not create test dir: " + TEST_ROOT_DIR);
+    }
+    File in = new File(TEST_ROOT_DIR, "input");
+    if (!in.mkdirs()) {
+      throw new RuntimeException("Could not create test dir: " + in);
+    }
+    File out = new File(TEST_ROOT_DIR, "output");
+    PrintWriter pw = new PrintWriter(new FileWriter(new File(in, "data.txt")));
+    pw.println("A|a,1");
+    pw.println("A|b,2");
+    pw.println("B|a,3");
+    pw.println("B|b,4");
+    pw.println("B|c,5");
+    pw.close();
+    JobConf conf = new JobConf();
+    conf.set("mapreduce.framework.name", "local");
+    Job job = new Job(conf);
+    TextInputFormat.setInputPaths(job, new Path(in.getPath()));
+    TextOutputFormat.setOutputPath(job, new Path(out.getPath()));
+
+    job.setMapperClass(Map.class);
+    job.setReducerClass(Reduce.class);
+    job.setInputFormatClass(TextInputFormat.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(LongWritable.class);
+    job.setOutputFormatClass(TextOutputFormat.class);
+    job.setGroupingComparatorClass(GroupComparator.class);
+
+    job.setCombinerKeyGroupingComparatorClass(GroupComparator.class);
+    job.setCombinerClass(Combiner.class);
+    job.getConfiguration().setInt("min.num.spills.for.combine", 0);
+
+    job.submit();
+    job.waitForCompletion(false);
+    if (job.isSuccessful()) {
+      Counters counters = job.getCounters();
+
+      long combinerInputRecords = counters.findCounter(
+          "org.apache.hadoop.mapreduce.TaskCounter",
+          "COMBINE_INPUT_RECORDS").getValue();
+      long combinerOutputRecords = counters.findCounter(
+          "org.apache.hadoop.mapreduce.TaskCounter",
+          "COMBINE_OUTPUT_RECORDS").getValue();
+      Assert.assertTrue(combinerInputRecords > 0);
+      Assert.assertTrue(combinerInputRecords > combinerOutputRecords);
+
+      BufferedReader br = new BufferedReader(new FileReader(
+          new File(out, "part-r-00000")));
+      Set<String> output = new HashSet<String>();
+      String line = br.readLine();
+      Assert.assertNotNull(line);
+      output.add(line.substring(0, 1) + line.substring(4, 5));
+      line = br.readLine();
+      Assert.assertNotNull(line);
+      output.add(line.substring(0, 1) + line.substring(4, 5));
+      line = br.readLine();
+      Assert.assertNull(line);
+      br.close();
+
+      Set<String> expected = new HashSet<String>();
+      expected.add("A2");
+      expected.add("B5");
+
+      Assert.assertEquals(expected, output);
+
+    } else {
+      Assert.fail("Job failed");
+    }
+  }
+
+}