
HADOOP-485. Allow a different comparator for grouping keys in calls to reduce. Contributed by Tahir.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@534971 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 years ago
parent
commit
3c757ccded

+ 3 - 0
CHANGES.txt

@@ -319,6 +319,9 @@ Trunk (unreleased changes)
 94. HADOOP-1315.  Clean up contrib/streaming, switching it to use core
     classes more and removing unused code.  (Runping Qi via cutting)
 
+95. HADOOP-485.  Allow a different comparator for grouping keys in
+    calls to reduce.  (Tahir Hashmi via cutting)
+
 
 Release 0.12.3 - 2007-04-06
 

+ 34 - 0
src/java/org/apache/hadoop/mapred/JobConf.java

@@ -443,6 +443,40 @@ public class JobConf extends Configuration {
              theClass, WritableComparator.class);
   }
 
+  /** Get the user defined comparator for grouping values.
+   * 
+   * This call is used to get the comparator for grouping values by key.
+   * @see #setOutputValueGroupingComparator(Class) for details.
+   *  
+   * @return Comparator set by the user for grouping values.
+   */
+  public WritableComparator getOutputValueGroupingComparator() {
+    Class theClass = getClass("mapred.output.value.groupfn.class", null,
+                              WritableComparator.class);
+    if (theClass == null) {
+      return getOutputKeyComparator();
+    }
+    
+    return (WritableComparator)ReflectionUtils.newInstance(theClass, this);
+  }
+
+  /** Set the user defined comparator for grouping values.
+   * 
+   * For key-value pairs (K1,V1) and (K2,V2), the values are passed
+   * in a single call to the reduce function if K1 and K2 compare as equal.
+   * 
+   * This comparator should be provided if the equivalence rules for keys
+   * for sorting the intermediates are different from those for grouping 
+   * values.
+   * 
+   * @param theClass The Comparator class to be used for grouping. It should
+   * extend WritableComparator.
+   */
+  public void setOutputValueGroupingComparator(Class theClass) {
+    setClass("mapred.output.value.groupfn.class",
+             theClass, WritableComparator.class);
+  }
+
   public Class<? extends Writable> getOutputValueClass() {
     return getClass("mapred.output.value.class", Text.class, Writable.class);
   }
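
For reference, a minimal sketch of how a job could use the new method pair to get secondary-sort behaviour: the map output key carries a primary and a secondary part, the sort comparator orders the full composite key, and the grouping comparator decides which consecutive keys share a single reduce() call. The class names below (SecondarySortExample, PrimaryKeyGroupingComparator) are illustrative and are not part of this patch.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapred.JobConf;

public class SecondarySortExample {              // hypothetical driver class

  // Hypothetical comparator: composite IntWritable keys of the form
  // primary * 100 + secondary are grouped by their primary part only.
  public static class PrimaryKeyGroupingComparator extends WritableComparator {
    public PrimaryKeyGroupingComparator() {
      super(IntWritable.class);
    }
    public int compare(WritableComparable a, WritableComparable b) {
      int pa = ((IntWritable) a).get() / 100;
      int pb = ((IntWritable) b).get() / 100;
      return pa < pb ? -1 : (pa == pb ? 0 : 1);
    }
  }

  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf(SecondarySortExample.class);
    job.setMapOutputKeyClass(IntWritable.class);
    job.setMapOutputValueClass(IntWritable.class);
    // The default IntWritable comparator already sorts the full composite key;
    // the grouping comparator added by this patch only controls which
    // consecutive keys are fed to the same reduce() invocation.
    job.setOutputValueGroupingComparator(PrimaryKeyGroupingComparator.class);
    // ... set mapper, reducer, input/output paths, then submit via JobClient.
  }
}

If the grouping comparator is not set, getOutputValueGroupingComparator() falls back to getOutputKeyComparator(), so existing jobs keep their current behaviour.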

+ 2 - 2
src/java/org/apache/hadoop/mapred/ReduceTask.java

@@ -295,8 +295,8 @@ class ReduceTask extends Task {
 
     Path tempDir = job.getLocalPath(getTaskId()); 
 
-    WritableComparator comparator = job.getOutputKeyComparator();
-   
+    WritableComparator comparator = job.getOutputValueGroupingComparator();
+    
     SequenceFile.Sorter.RawKeyValueIterator rIter;
  
     try {

+ 174 - 0
src/test/org/apache/hadoop/mapred/TestUserValueGrouping.java

@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.BooleanWritable.Comparator;
+import org.apache.hadoop.mapred.lib.*;
+import junit.framework.TestCase;
+import java.io.*;
+import java.util.*;
+
+/** 
+ * Tests that a user-supplied grouping comparator, set via
+ * JobConf.setOutputValueGroupingComparator, groups values by their
+ * primary key in calls to reduce.
+ */
+public class TestUserValueGrouping extends TestCase 
+{
+  JobConf conf = new JobConf(TestMapOutputType.class);
+  JobClient jc;
+  static Random rng = new Random();
+  /** 
+   * RandomGen is a mapper that generates 5 random values for each key
+   * in the input. The values are in the range [0-4]. The mapper also
+   * generates a composite key. If the input key is x and the generated
+   * value is y, the composite key is x0y (x-zero-y). Therefore, the
+   * intermediate key-value pairs are ordered by {input key, value}. 
+   */
+   
+  static class RandomGen implements Mapper {
+    public void configure(JobConf job) {
+    }
+    
+    public void map(WritableComparable key, Writable value,
+                    OutputCollector out, Reporter reporter) throws IOException {
+      int num_values = 5;
+      for(int i = 0; i < num_values; ++i) {
+        int val = rng.nextInt(num_values);
+        int compositeKey = ((IntWritable)(key)).get() * 100 + val;
+        out.collect(new IntWritable(compositeKey), new IntWritable(val));
+      }
+    }
+    
+    public void close() {
+    }
+  }
+  
+  /** The reducer checks whether the input values are in sorted order and
+   * whether they are correctly grouped by key (i.e. each call to reduce
+   * should have 5 values if the grouping is correct). 
+   */
+  static class AscendingReduce implements Reducer {
+    
+    public void configure(JobConf job) {
+    }
+
+    public void reduce(WritableComparable key,
+                       Iterator values,
+                       OutputCollector out,
+                       Reporter reporter) throws IOException {
+      IntWritable previous = new IntWritable(-1);
+      int valueCount = 0;
+      while (values.hasNext()) {
+        IntWritable current = (IntWritable) values.next();
+        
+        // Check that the values are sorted
+        if (current.compareTo(previous) < 0)
+          fail("Values generated by Mapper not in order");
+        previous = current;
+        ++valueCount;
+      }
+      if (valueCount != 5) {
+        fail("Values not grouped by primary key");
+      }
+      out.collect(key, new Text("success"));
+    }
+
+    public void close() {
+    }
+  }
+  
+  /** Grouping function for values based on the composite key. This
+   * comparator strips off the secondary key part from the x0y composite
+   * and only compares the primary key value (x).
+   */
+  public static class CompositeIntGroupFn extends WritableComparator {
+    public CompositeIntGroupFn() {
+      super(IntWritable.class);
+    }
+    public int compare (WritableComparable v1, WritableComparable v2) {
+      int val1 = ((IntWritable)(v1)).get() / 100;
+      int val2 = ((IntWritable)(v2)).get() / 100;
+      if (val1 > val2)
+        return 1;
+      else if (val2 > val1)
+        return -1;
+      else
+        return 0;
+    }
+    
+    public boolean equals (IntWritable v1, IntWritable v2) {
+      int val1 = v1.get();
+      int val2 = v2.get();
+      
+      return (val1/100) == (val2/100);
+    }
+    
+    static {
+      WritableComparator.define(CompositeIntGroupFn.class, new Comparator());
+    }
+  }
+
+
+  public void configure() throws Exception {
+    Path testdir = new Path("build/test/test.mapred.spill");
+    Path inDir = new Path(testdir, "in");
+    Path outDir = new Path(testdir, "out");
+    FileSystem fs = FileSystem.get(conf);
+    fs.delete(testdir);
+    conf.setInputFormat(SequenceFileInputFormat.class);
+    conf.setInputPath(inDir);
+    conf.setOutputPath(outDir);
+    conf.setMapperClass(RandomGen.class);
+    conf.setReducerClass(AscendingReduce.class);
+    conf.setOutputKeyClass(IntWritable.class);
+    conf.setOutputValueClass(Text.class);
+    conf.setMapOutputValueClass(IntWritable.class);
+    conf.setOutputValueGroupingComparator(CompositeIntGroupFn.class);
+    
+    conf.setOutputFormat(SequenceFileOutputFormat.class);
+    if (!fs.mkdirs(testdir)) {
+      throw new IOException("Mkdirs failed to create " + testdir.toString());
+    }
+    if (!fs.mkdirs(inDir)) {
+      throw new IOException("Mkdirs failed to create " + inDir.toString());
+    }
+    Path inFile = new Path(inDir, "part0");
+    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, inFile, 
+        IntWritable.class, IntWritable.class);
+    writer.append(new IntWritable(11), new IntWritable(999));
+    writer.append(new IntWritable(23), new IntWritable(456));
+    writer.append(new IntWritable(10), new IntWritable(780));
+    writer.close();
+    
+    jc = new JobClient(conf);
+  }
+  
+  public void testUserValueGrouping() throws Exception { 
+    configure();
+    
+    RunningJob r_job = jc.submitJob(conf);
+    while (!r_job.isComplete()) {
+      Thread.sleep(1000);
+    }
+    
+    if (!r_job.isSuccessful()) {
+      fail("Oops! The job broke due to an unexpected error");
+    }
+  }
+}
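
A small illustrative check of the grouping rule exercised above (not part of the test, but it could sit inside a JUnit test method): composite keys built as primary * 100 + value compare equal under CompositeIntGroupFn whenever their primary parts match.

// Illustrative only: how CompositeIntGroupFn treats the test's composite keys.
CompositeIntGroupFn grp = new CompositeIntGroupFn();
// 1103 and 1104 share primary key 11 (1103 / 100 == 1104 / 100), so their
// values are delivered in the same call to reduce().
assert grp.compare(new IntWritable(1103), new IntWritable(1104)) == 0;
// 1104 and 2300 have different primary keys (11 vs. 23), so they start
// separate reduce groups.
assert grp.compare(new IntWritable(1104), new IntWritable(2300)) < 0;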