
HADOOP-1535. Fix the comparator used to merge in reduce phase.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@555690 13f79535-47bb-0310-9956-ffa450edef68
Owen O'Malley 18 years ago
parent commit 0ec3db23d4
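
For context: before this patch, ReduceTask used a single comparator (the value-grouping one) both to merge map outputs and to group values for reduce; the patch separates the two roles. A minimal sketch of how a job wires up both, reusing the comparator classes added in the new test below (the driver class itself is hypothetical):

```java
package org.apache.hadoop.mapred;

// A minimal sketch, not part of the patch: configuring the two comparator
// roles this commit separates. The comparator classes are defined in the
// new TestComparators below; SecondarySortDriver is hypothetical.
public class SecondarySortDriver {
  public static JobConf configure() {
    JobConf job = new JobConf(SecondarySortDriver.class);
    // Sorts/merges intermediate keys in both the map and reduce phases.
    job.setOutputKeyComparatorClass(
        TestComparators.DecreasingIntComparator.class);
    // Decides only which adjacent sorted keys share one reduce() call.
    job.setOutputValueGroupingComparator(
        TestComparators.CompositeIntGroupFn.class);
    return job;
  }
}
```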

+ 3 - 0
CHANGES.txt

@@ -328,6 +328,9 @@ Trunk (unreleased changes)
 101. HADOOP-1596.  Fix the parsing of taskids by streaming and improve the
      error reporting. (omalley)
 
+102. HADOOP-1535.  Fix the user-controlled grouping of values passed to the
+     reduce function. (Vivek Ratan via omalley)
+
 Release 0.13.0 - 2007-06-08
 
  1. HADOOP-1047.  Fix TestReplication to succeed more reliably.

+ 1 - 1
src/java/org/apache/hadoop/mapred/MapTask.java

@@ -518,7 +518,7 @@ class MapTask extends Task {
       {
         //create a sorter object as we need access to the SegmentDescriptor
         //class and merge methods
-        Sorter sorter = new Sorter(localFs, keyClass, valClass, job);
+        Sorter sorter = new Sorter(localFs, job.getOutputKeyComparator(), valClass, job);
         sorter.setProgressable(reporter);
         
         for (int parts = 0; parts < partitions; parts++){
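
The one-line change works because JobConf.getOutputKeyComparator() already falls back to the comparator registered for the map output key class when the user has not set one. Roughly, and only as a sketch of the era's JobConf rather than part of this patch:

```java
// Sketch (assumption: mirrors the JobConf of this era) of the lookup the
// patched MapTask now relies on.
public WritableComparator getOutputKeyComparator() {
  Class theClass = getClass("mapred.output.key.comparator.class",
                            null, WritableComparator.class);
  if (theClass != null) {
    // User-supplied, e.g. via setOutputKeyComparatorClass().
    return (WritableComparator) ReflectionUtils.newInstance(theClass, this);
  }
  // Default: the comparator registered for the map output key class.
  return WritableComparator.get(getMapOutputKeyClass());
}
```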

+ 6 - 7
src/java/org/apache/hadoop/mapred/ReduceTask.java

@@ -237,7 +237,6 @@ class ReduceTask extends Task {
 
   public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
     throws IOException {
-    Class valueClass = job.getMapOutputValueClass();
     Reducer reducer = (Reducer)ReflectionUtils.newInstance(
                                                            job.getReducerClass(), job);
 
@@ -276,8 +275,6 @@ class ReduceTask extends Task {
     
     Path tempDir = new Path(getTaskId()); 
 
-    WritableComparator comparator = job.getOutputValueGroupingComparator();
-    
     SequenceFile.Sorter.RawKeyValueIterator rIter;
  
     setPhase(TaskStatus.Phase.SORT); 
@@ -285,8 +282,8 @@ class ReduceTask extends Task {
     final Reporter reporter = getReporter(umbilical);
     
     // sort the input file
-    SequenceFile.Sorter sorter =
-      new SequenceFile.Sorter(lfs, comparator, valueClass, job);
+    SequenceFile.Sorter sorter = new SequenceFile.Sorter(lfs, 
+        job.getOutputKeyComparator(), job.getMapOutputValueClass(), job);
     sorter.setProgressable(reporter);
     rIter = sorter.merge(mapFiles, tempDir, 
         !conf.getKeepFailedTaskFiles()); // sort
@@ -315,8 +312,10 @@ class ReduceTask extends Task {
     try {
       Class keyClass = job.getMapOutputKeyClass();
       Class valClass = job.getMapOutputValueClass();
-      ReduceValuesIterator values = new ReduceValuesIterator(rIter, comparator, 
-                                                             keyClass, valClass, job, reporter);
+      
+      ReduceValuesIterator values = new ReduceValuesIterator(rIter, 
+          job.getOutputValueGroupingComparator(), keyClass, valClass, 
+          job, reporter);
       values.informReduceProgress();
       while (values.more()) {
         reporter.incrCounter(REDUCE_INPUT_GROUPS, 1);
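
The split matters because the grouping comparator is consulted only for equality: the values iterator asks whether the next key still belongs to the current group, never how keys should be ordered. An illustrative loop (simplified, not the literal ReduceValuesIterator):

```java
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparator;

// Illustrative only: how a grouping comparator delineates reduce groups
// over an already-sorted key stream.
class GroupingSketch {
  static int countGroups(List<IntWritable> sortedKeys,
                         WritableComparator grouping) {
    int groups = 0;
    IntWritable prev = null;
    for (IntWritable key : sortedKeys) {
      // Only compare(...) == 0 matters; the sign of the result is ignored.
      if (prev == null || grouping.compare(prev, key) != 0) {
        groups++;                   // a new reduce() call starts here
      }
      prev = key;
    }
    return groups;
  }
}
```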

+ 411 - 0
src/test/org/apache/hadoop/mapred/TestComparators.java

@@ -0,0 +1,411 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapred.lib.*;
+import junit.framework.TestCase;
+import java.io.*;
+import java.util.*;
+
+/**
+ * Two different types of comparators can be used in MapReduce. One is used
+ * during the Map and Reduce phases, to sort/merge key-value pairs. Another
+ * is used to group values for a particular key, when calling the user's 
+ * reducer. A user can override both.
+ * This class has tests for making sure we use the right comparators at the
+ * right places. See HADOOP-485 and HADOOP-1535. Our tests:
+ * 1. Test that the same comparator is used for all sort/merge operations 
+ * during the Map and Reduce phases.  
+ * 2. Test the common use case where values are grouped by keys but values 
+ * within each key are grouped by a secondary key (a timestamp, for example). 
+ */
+public class TestComparators extends TestCase 
+{
+  JobConf conf = new JobConf(TestComparators.class);
+  JobClient jc;
+  static Random rng = new Random();
+
+  /** 
+   * RandomGen is a mapper that generates 5 random values for each key
+   * in the input. The values are in the range [0-4]. The mapper also
+   * generates a composite key. If the input key is x and the generated
+ * value is y, the composite key is x0y (x-zero-y). Therefore, the
+ * intermediate key-value pairs are ordered by {input key, value}.
+   * Think of the random value as a timestamp associated with the record. 
+   */
+  static class RandomGenMapper implements Mapper {
+    public void configure(JobConf job) {
+    }
+    
+    public void map(WritableComparable key, Writable value,
+                    OutputCollector out, Reporter reporter) throws IOException {
+      int num_values = 5;
+      for(int i = 0; i < num_values; ++i) {
+        int val = rng.nextInt(num_values);
+        int compositeKey = ((IntWritable)(key)).get() * 100 + val;
+        out.collect(new IntWritable(compositeKey), new IntWritable(val));
+      }
+    }
+    
+    public void close() {
+    }
+  }
+  
+  /** 
+   * Your basic identity mapper. 
+   */
+  static class IdentityMapper implements Mapper {
+    public void configure(JobConf job) {
+    }
+    
+    public void map(WritableComparable key, Writable value,
+                    OutputCollector out, Reporter reporter) throws IOException {
+      out.collect(key, value);
+    }
+    
+    public void close() {
+    }
+  }
+  
+  /** 
+   * Checks whether keys are in ascending order.  
+   */
+  static class AscendingKeysReducer implements Reducer {
+    public void configure(JobConf job) {}
+
+    // keep track of the last key we've seen
+    private int lastKey = Integer.MIN_VALUE;
+    public void reduce(WritableComparable key, Iterator values, 
+        OutputCollector out, Reporter reporter) throws IOException {
+      int currentKey = ((IntWritable)(key)).get();
+      // keys should be in ascending order
+      if (currentKey < lastKey) {
+        fail("Keys not in sorted ascending order");
+      }
+      lastKey = currentKey;
+      out.collect(key, new Text("success"));
+    }
+    
+    public void close() {}
+  }
+  
+  /** 
+   * Checks whether keys are in descending order.
+   */
+  static class DescendingKeysReducer implements Reducer {
+    public void configure(JobConf job) {}
+
+    // keep track of the last key we've seen
+    private int lastKey = Integer.MAX_VALUE;
+    public void reduce(WritableComparable key, Iterator values, 
+        OutputCollector out, Reporter reporter) throws IOException {
+      int currentKey = ((IntWritable)(key)).get();
+      // keys should be in descending order
+      if (currentKey > lastKey) {
+        fail("Keys not in sorted descending order");
+      }
+      lastKey = currentKey;
+      out.collect(key, new Text("success"));
+    }
+    
+    public void close() {}
+  }
+  
+  /** The reducer checks whether the input values are in ascending order and
+   * whether they are correctly grouped by key (i.e. each call to reduce
+   * should have 5 values if the grouping is correct). It also checks whether
+   * the keys themselves are in ascending order.
+   */
+  static class AscendingGroupReducer implements Reducer {
+    
+    public void configure(JobConf job) {
+    }
+
+    // keep track of the last key we've seen
+    private int lastKey = Integer.MIN_VALUE;
+    public void reduce(WritableComparable key,
+                       Iterator values,
+                       OutputCollector out,
+                       Reporter reporter) throws IOException {
+      // check key order
+      int currentKey = ((IntWritable)(key)).get();
+      if (currentKey < lastKey) {
+        fail("Keys not in sorted ascending order");
+      }
+      lastKey = currentKey;
+      // check order of values
+      IntWritable previous = new IntWritable(Integer.MIN_VALUE);
+      int valueCount = 0;
+      while (values.hasNext()) {
+        IntWritable current = (IntWritable) values.next();
+        
+        // Check that the values are sorted
+        if (current.compareTo(previous) < 0)
+          fail("Values generated by Mapper not in order");
+        previous = current;
+        ++valueCount;
+      }
+      if (valueCount != 5) {
+        fail("Values not grouped by primary key");
+      }
+      out.collect(key, new Text("success"));
+    }
+
+    public void close() {
+    }
+  }
+  
+  /** The reducer checks whether the input values are in descending order and
+   * whether they are correctly grouped by key (i.e. each call to reduce
+   * should have 5 values if the grouping is correct). 
+   */
+  static class DescendingGroupReducer implements Reducer {
+    
+    public void configure(JobConf job) {
+    }
+
+    // keep track of the last key we've seen
+    private int lastKey = Integer.MAX_VALUE;
+    public void reduce(WritableComparable key,
+                       Iterator values,
+                       OutputCollector out,
+                       Reporter reporter) throws IOException {
+      // check key order
+      int currentKey = ((IntWritable)(key)).get();
+      if (currentKey > lastKey) {
+        fail("Keys not in sorted descending order");
+      }
+      lastKey = currentKey;
+      // check order of values
+      IntWritable previous = new IntWritable(Integer.MAX_VALUE);
+      int valueCount = 0;
+      while (values.hasNext()) {
+        IntWritable current = (IntWritable) values.next();
+        
+        // Check that the values are sorted
+        if (current.compareTo(previous) > 0)
+          fail("Values generated by Mapper not in order");
+        previous = current;
+        ++valueCount;
+      }
+      if (valueCount != 5) {
+        fail("Values not grouped by primary key");
+      }
+      out.collect(key, new Text("success"));
+    }
+
+    public void close() {
+    }
+  }
+  
+  /** 
+   * A decreasing Comparator for IntWritable 
+   */ 
+  public static class DecreasingIntComparator extends IntWritable.Comparator {
+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+      return -super.compare(b1, s1, l1, b2, s2, l2);
+    }
+    static {                    // register this comparator
+      WritableComparator.define(DecreasingIntComparator.class,
+          new DecreasingIntComparator());
+    }
+  }
+
+  /** Grouping function for values based on the composite key. This
+   * comparator strips off the secondary key part from the x0y composite
+   * and only compares the primary key value (x).
+   */
+  public static class CompositeIntGroupFn extends WritableComparator {
+    public CompositeIntGroupFn() {
+      super(IntWritable.class);
+    }
+    public int compare (WritableComparable v1, WritableComparable v2) {
+      int val1 = ((IntWritable)(v1)).get() / 100;
+      int val2 = ((IntWritable)(v2)).get() / 100;
+      if (val1 < val2)
+        return -1;
+      else if (val1 > val2)
+        return 1;
+      else
+        return 0;
+    }
+    
+    public boolean equals (IntWritable v1, IntWritable v2) {
+      int val1 = v1.get();
+      int val2 = v2.get();
+      
+      return (val1/100) == (val2/100);
+    }
+    
+    static {
+      WritableComparator.define(CompositeIntGroupFn.class,
+          new CompositeIntGroupFn());
+    }
+  }
+
+  /** Reverse grouping function for values based on the composite key. 
+   */
+  public static class CompositeIntReverseGroupFn extends CompositeIntGroupFn {
+    public int compare (WritableComparable v1, WritableComparable v2) {
+      return -super.compare(v1, v2);
+    }
+    
+    static {
+      WritableComparator.define(CompositeIntReverseGroupFn.class,
+          new CompositeIntReverseGroupFn());
+    }
+  }
+
+
+  public void configure() throws Exception {
+    Path testdir = new Path("build/test/test.mapred.spill");
+    Path inDir = new Path(testdir, "in");
+    Path outDir = new Path(testdir, "out");
+    FileSystem fs = FileSystem.get(conf);
+    fs.delete(testdir);
+    conf.setInputFormat(SequenceFileInputFormat.class);
+    conf.setInputPath(inDir);
+    conf.setOutputPath(outDir);
+    conf.setOutputKeyClass(IntWritable.class);
+    conf.setOutputValueClass(Text.class);
+    conf.setMapOutputValueClass(IntWritable.class);
+    // set up two map jobs, so we can test merge phase in Reduce also
+    conf.setNumMapTasks(2);
+    
+    conf.setOutputFormat(SequenceFileOutputFormat.class);
+    if (!fs.mkdirs(testdir)) {
+      throw new IOException("Mkdirs failed to create " + testdir.toString());
+    }
+    if (!fs.mkdirs(inDir)) {
+      throw new IOException("Mkdirs failed to create " + inDir.toString());
+    }
+    // set up input data in 2 files 
+    Path inFile = new Path(inDir, "part0");
+    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, inFile, 
+        IntWritable.class, IntWritable.class);
+    writer.append(new IntWritable(11), new IntWritable(999));
+    writer.append(new IntWritable(23), new IntWritable(456));
+    writer.append(new IntWritable(10), new IntWritable(780));
+    writer.close();
+    inFile = new Path(inDir, "part1");
+    writer = SequenceFile.createWriter(fs, conf, inFile, 
+        IntWritable.class, IntWritable.class);
+    writer.append(new IntWritable(45), new IntWritable(100));
+    writer.append(new IntWritable(18), new IntWritable(200));
+    writer.append(new IntWritable(27), new IntWritable(300));
+    writer.close();
+    
+    jc = new JobClient(conf);
+  }
+  
+  /**
+   * Test the default comparator for Map/Reduce. 
+   * Use the identity mapper and see if the keys are sorted at the end
+   * @throws Exception
+   */
+  public void testDefaultMRComparator() throws Exception { 
+    configure();
+    conf.setMapperClass(IdentityMapper.class);
+    conf.setReducerClass(AscendingKeysReducer.class);
+    
+    RunningJob r_job = jc.submitJob(conf);
+    while (!r_job.isComplete()) {
+      Thread.sleep(1000);
+    }
+    
+    if (!r_job.isSuccessful()) {
+      fail("Oops! The job broke due to an unexpected error");
+    }
+  }
+  
+  /**
+   * Test user-defined comparator for Map/Reduce.
+   * We provide our own comparator that is the reverse of the default int 
+   * comparator. Keys should be sorted in reverse order in the reducer. 
+   * @throws Exception
+   */
+  public void testUserMRComparator() throws Exception { 
+    configure();
+    conf.setMapperClass(IdentityMapper.class);
+    conf.setReducerClass(DescendingKeysReducer.class);
+    conf.setOutputKeyComparatorClass(DecreasingIntComparator.class);
+    
+    RunningJob r_job = jc.submitJob(conf);
+    while (!r_job.isComplete()) {
+      Thread.sleep(1000);
+    }
+    
+    if (!r_job.isSuccessful()) {
+      fail("Oops! The job broke due to an unexpected error");
+    }
+  }
+  
+  /**
+   * Test user-defined grouping comparator for grouping values in Reduce.
+   * We generate composite keys that contain a random number, which acts
+   * as a timestamp associated with the record. In our Reduce function, 
+   * values for a key should be sorted by the 'timestamp'. 
+   * @throws Exception
+   */
+  public void testUserValueGroupingComparator() throws Exception { 
+    configure();
+    conf.setMapperClass(RandomGenMapper.class);
+    conf.setReducerClass(AscendingGroupReducer.class);
+    conf.setOutputValueGroupingComparator(CompositeIntGroupFn.class);
+    
+    RunningJob r_job = jc.submitJob(conf);
+    while (!r_job.isComplete()) {
+      Thread.sleep(1000);
+    }
+    
+    if (!r_job.isSuccessful()) {
+      fail("Oops! The job broke due to an unexpected error");
+    }
+  }
+  
+  /**
+   * Test all user comparators. Super-test of all tests here. 
+   * We generate composite keys that contain a random number, which acts
+   * as a timestamp associated with the record. In our Reduce function, 
+   * values for a key should be sorted by the 'timestamp'.
+   * We also provide our own comparators that reverse the default sorting 
+   * order. This lets us make sure that the right comparators are used. 
+   * @throws Exception
+   */
+  public void testAllUserComparators() throws Exception { 
+    configure();
+    conf.setMapperClass(RandomGenMapper.class);
+    // use a decreasing comparator so keys are sorted in reverse order
+    conf.setOutputKeyComparatorClass(DecreasingIntComparator.class);
+    conf.setReducerClass(DescendingGroupReducer.class);
+    conf.setOutputValueGroupingComparator(CompositeIntReverseGroupFn.class);
+    RunningJob r_job = jc.submitJob(conf);
+    while (!r_job.isComplete()) {
+      Thread.sleep(1000);
+    }
+    
+    if (!r_job.isSuccessful()) {
+      fail("Oops! The job broke due to an unexpected error");
+    }
+  }
+  
+}
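
Taken together, the new test exercises the classic secondary-sort pattern: the key comparator orders by {primary key, timestamp}, while the grouping comparator collapses everything with the same primary key into a single reduce() call. A tiny standalone illustration of the test's x*100+y encoding (not part of the patch):

```java
// Standalone illustration of the composite-key grouping arithmetic
// used by TestComparators.
public class CompositeKeyDemo {
  public static void main(String[] args) {
    int primary = 11;                      // an input key from the test data
    for (int ts = 0; ts < 5; ts++) {       // the five random "timestamps"
      int composite = primary * 100 + ts;  // the x0y encoding: 1100..1104
      // CompositeIntGroupFn compares composite / 100, so all five land in
      // one reduce group while staying sorted by timestamp.
      System.out.println(composite + " -> group " + (composite / 100));
    }
  }
}
```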

+ 0 - 173
src/test/org/apache/hadoop/mapred/TestUserValueGrouping.java

@@ -1,173 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.mapred;
-
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.io.BooleanWritable.Comparator;
-import org.apache.hadoop.mapred.lib.*;
-import junit.framework.TestCase;
-import java.io.*;
-import java.util.*;
-
-/** 
- */
-public class TestUserValueGrouping extends TestCase 
-{
-  JobConf conf = new JobConf(TestMapOutputType.class);
-  JobClient jc;
-  static Random rng = new Random();
-  /** 
-   * RandomGen is a mapper that generates 5 random values for each key
-   * in the input. The values are in the range [0-4]. The mapper also
-   * generates a composite key. If the input key is x and the generated
-   * value is y, the composite key is x0y (x-zero-y). Therefore, the inter-
-   * mediate key value pairs are ordered by {input key, value}. 
-   */
-   
-  static class RandomGen implements Mapper {
-    public void configure(JobConf job) {
-    }
-    
-    public void map(WritableComparable key, Writable value,
-                    OutputCollector out, Reporter reporter) throws IOException {
-      int num_values = 5;
-      for(int i = 0; i < num_values; ++i) {
-        int val = rng.nextInt(num_values);
-        int compositeKey = ((IntWritable)(key)).get() * 100 + val;
-        out.collect(new IntWritable(compositeKey), new IntWritable(val));
-      }
-    }
-    
-    public void close() {
-    }
-  }
-  
-  /** The reducer checks whether the input values are in sorted order and
-   * whether they are correctly grouped by key (i.e. each call to reduce
-   * should have 5 values if the grouping is correct). 
-   */
-  static class AscendingReduce implements Reducer {
-    
-    public void configure(JobConf job) {
-    }
-
-    public void reduce(WritableComparable key,
-                       Iterator values,
-                       OutputCollector out,
-                       Reporter reporter) throws IOException {
-      IntWritable previous = new IntWritable(-1);
-      int valueCount = 0;
-      while (values.hasNext()) {
-        IntWritable current = (IntWritable) values.next();
-        
-        // Check that the values are sorted
-        if (current.compareTo(previous) < 0)
-          fail("Values generated by Mapper not in order");
-        previous = current;
-        ++valueCount;
-      }
-      if (valueCount != 5) {
-        fail("Values not grouped by primary key");
-      }
-      out.collect(key, new Text("success"));
-    }
-
-    public void close() {
-    }
-  }
-  
-  /** Grouping function for values based on the composite key. This
-   * comparator strips off the secondary key part from the x0y composite
-   * and only compares the primary key value (x).
-   */
-  public static class CompositeIntGroupFn extends WritableComparator {
-    public CompositeIntGroupFn() {
-      super(IntWritable.class);
-    }
-    public int compare (WritableComparable v1, WritableComparable v2) {
-      int val1 = ((IntWritable)(v1)).get() / 100;
-      int val2 = ((IntWritable)(v2)).get() / 100;
-      if (val1 > val2)
-        return 1;
-      else if (val2 > val1)
-        return -1;
-      else
-        return 0;
-    }
-    
-    public boolean equals (IntWritable v1, IntWritable v2) {
-      int val1 = v1.get();
-      int val2 = v2.get();
-      
-      return (val1/100) == (val2/100);
-    }
-    
-    static {
-      WritableComparator.define(CompositeIntGroupFn.class, new Comparator());
-    }
-  }
-
-
-  public void configure() throws Exception {
-    Path testdir = new Path("build/test/test.mapred.spill");
-    Path inDir = new Path(testdir, "in");
-    Path outDir = new Path(testdir, "out");
-    FileSystem fs = FileSystem.get(conf);
-    fs.delete(testdir);
-    conf.setInputFormat(SequenceFileInputFormat.class);
-    conf.setInputPath(inDir);
-    conf.setOutputPath(outDir);
-    conf.setMapperClass(RandomGen.class);
-    conf.setReducerClass(AscendingReduce.class);
-    conf.setOutputKeyClass(IntWritable.class);
-    conf.setOutputValueClass(Text.class);
-    conf.setMapOutputValueClass(IntWritable.class);
-    conf.setOutputValueGroupingComparator(CompositeIntGroupFn.class);
-    
-    conf.setOutputFormat(SequenceFileOutputFormat.class);
-    if (!fs.mkdirs(testdir)) {
-      throw new IOException("Mkdirs failed to create " + testdir.toString());
-    }
-    if (!fs.mkdirs(inDir)) {
-      throw new IOException("Mkdirs failed to create " + inDir.toString());
-    }
-    Path inFile = new Path(inDir, "part0");
-    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, inFile, 
-        IntWritable.class, IntWritable.class);
-    writer.append(new IntWritable(11), new IntWritable(999));
-    writer.append(new IntWritable(23), new IntWritable(456));
-    writer.append(new IntWritable(10), new IntWritable(780));
-    writer.close();
-    
-    jc = new JobClient(conf);
-  }
-  
-  public void testUserValueGrouping() throws Exception { 
-    configure();
-    
-    RunningJob r_job = jc.submitJob(conf);
-    while (!r_job.isComplete()) {
-      Thread.sleep(1000);
-    }
-    
-    if (!r_job.isSuccessful()) {
-      fail("Oops! The job broke due to an unexpected error");
-    }
-  }
-}