
HADOOP-3665. Modify WritableComparator so that it only creates instances of the
key type if the type does not define its own WritableComparator. For registered
comparators no key instances are allocated, so calling the superclass compare
(which deserializes into those instances) will throw a NullPointerException.
Also define a RawComparator for NullWritable and permit it to be written as a
key to SequenceFiles.


git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@673869 13f79535-47bb-0310-9956-ffa450edef68

Christopher Douglas · 17 years ago · commit 160fce92d6
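
The practical effect of the lookup change, as a minimal sketch (assumes Hadoop
core on the classpath after this commit; the class name is illustrative):

    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.RawComparator;
    import org.apache.hadoop.io.WritableComparator;

    public class NullKeyCompareSketch {
      public static void main(String[] args) {
        // NullWritable now registers its own raw comparator in a static
        // block, so get() returns that instance rather than building a
        // generic WritableComparator, which would have tried to instantiate
        // the key type (NullWritable only exposes the NullWritable.get()
        // singleton).
        RawComparator c = WritableComparator.get(NullWritable.class);

        // NullWritable serializes to zero bytes; the raw comparator treats
        // all instances as equal.
        byte[] none = new byte[0];
        System.out.println(c.compare(none, 0, 0, none, 0, 0));  // prints 0
      }
    }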

+ 6 - 0
CHANGES.txt

@@ -160,6 +160,12 @@ Release 0.18.0 - Unreleased
     if they are not necessary e.g. for Maps with no side-effect files.
     (acmurthy)
 
+    HADOOP-3665. Modify WritableComparator so that it only creates instances
+    of the keytype if the type does not define a WritableComparator. Calling
+    the superclass compare will throw a NullPointerException. Also define
+    a RawComparator for NullWritable and permit it to be written as a key
+    to SequenceFiles. (cdouglas)
+
   NEW FEATURES
 
     HADOOP-3074. Provides a UrlStreamHandler for DFS and other FS,

+ 21 - 0
src/core/org/apache/hadoop/io/NullWritable.java

@@ -45,5 +45,26 @@ public class NullWritable implements WritableComparable {
   public boolean equals(Object other) { return other instanceof NullWritable; }
   public void readFields(DataInput in) throws IOException {}
   public void write(DataOutput out) throws IOException {}
+
+  /** A Comparator "optimized" for NullWritable. */
+  public static class Comparator extends WritableComparator {
+    public Comparator() {
+      super(NullWritable.class);
+    }
+
+    /**
+     * Compare the buffers in serialized form.
+     */
+    public int compare(byte[] b1, int s1, int l1,
+                       byte[] b2, int s2, int l2) {
+      assert 0 == l1;
+      assert 0 == l2;
+      return 0;
+    }
+  }
+
+  static {                                        // register this comparator
+    WritableComparator.define(NullWritable.class, new Comparator());
+  }
 }
 
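With the comparator registered and the zero-length-key restriction lifted (see
the SequenceFile hunks below), NullWritable can serve directly as a
SequenceFile key. A minimal local-filesystem sketch (the path is hypothetical):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;

    public class NullKeySequenceFileSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.getLocal(conf);
        Path file = new Path("/tmp/nullkeys.seq");  // hypothetical path

        // NullWritable serializes to zero bytes, so each record has key
        // length 0; the relaxed check (keyLength < 0) makes this legal.
        SequenceFile.Writer w = SequenceFile.createWriter(
            fs, conf, file, NullWritable.class, Text.class);
        w.append(NullWritable.get(), new Text("value-only record"));
        w.close();

        SequenceFile.Reader r = new SequenceFile.Reader(fs, file, conf);
        Text v = new Text();
        while (r.next(NullWritable.get(), v)) {
          System.out.println(v);
        }
        r.close();
      }
    }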

+ 13 - 13
src/core/org/apache/hadoop/io/SequenceFile.java

@@ -992,8 +992,8 @@ public class SequenceFile {
       // Append the 'key'
       keySerializer.serialize(key);
       int keyLength = buffer.getLength();
-      if (keyLength == 0)
-        throw new IOException("zero length keys not allowed: " + key);
+      if (keyLength < 0)
+        throw new IOException("negative length keys not allowed: " + key);
 
       // Append the 'value'
       if (compress) {
@@ -1014,8 +1014,8 @@ public class SequenceFile {
 
     public synchronized void appendRaw(byte[] keyData, int keyOffset,
         int keyLength, ValueBytes val) throws IOException {
-      if (keyLength == 0)
-        throw new IOException("zero length keys not allowed: " + keyLength);
+      if (keyLength < 0)
+        throw new IOException("negative length keys not allowed: " + keyLength);
 
       int valLength = val.getSize();
 
@@ -1119,8 +1119,8 @@ public class SequenceFile {
       // Append the 'key'
       keySerializer.serialize(key);
       int keyLength = buffer.getLength();
-      if (keyLength == 0)
-        throw new IOException("zero length keys not allowed: " + key);
+      if (keyLength < 0)
+        throw new IOException("negative length keys not allowed: " + key);
 
       // Compress 'value' and append it
       deflateFilter.resetState();
@@ -1139,8 +1139,8 @@ public class SequenceFile {
     public synchronized void appendRaw(byte[] keyData, int keyOffset,
         int keyLength, ValueBytes val) throws IOException {
 
-      if (keyLength == 0)
-        throw new IOException("zero length keys not allowed");
+      if (keyLength < 0)
+        throw new IOException("negative length keys not allowed: " + keyLength);
 
       int valLength = val.getSize();
       
@@ -1302,8 +1302,8 @@ public class SequenceFile {
       int oldKeyLength = keyBuffer.getLength();
       keySerializer.serialize(key);
       int keyLength = keyBuffer.getLength() - oldKeyLength;
-      if (keyLength == 0)
-        throw new IOException("zero length keys not allowed: " + key);
+      if (keyLength < 0)
+        throw new IOException("negative length keys not allowed: " + key);
       WritableUtils.writeVInt(keyLenBuffer, keyLength);
 
       int oldValLength = valBuffer.getLength();
@@ -1325,8 +1325,8 @@ public class SequenceFile {
     public synchronized void appendRaw(byte[] keyData, int keyOffset,
         int keyLength, ValueBytes val) throws IOException {
       
-      if (keyLength == 0)
-        throw new IOException("zero length keys not allowed");
+      if (keyLength < 0)
+        throw new IOException("negative length keys not allowed");
 
       int valLength = val.getSize();
       
@@ -2229,7 +2229,7 @@ public class SequenceFile {
 
     /** Sort and merge files containing the named classes. */
     public Sorter(FileSystem fs, Class keyClass, Class valClass, Configuration conf)  {
-      this(fs, new WritableComparator(keyClass), keyClass, valClass, conf);
+      this(fs, WritableComparator.get(keyClass), keyClass, valClass, conf);
     }
 
     /** Sort and merge using an arbitrary {@link RawComparator}. */

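The Sorter constructor change matters for key types that cannot be instantiated
reflectively. A sketch of sorting the file written above (paths hypothetical;
assumes the SequenceFile.Sorter.sort(Path, Path) overload in this release):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;

    public class NullKeySorterSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.getLocal(conf);

        // Previously this constructor called new WritableComparator(keyClass),
        // which eagerly instantiated two keys via newInstance(); for
        // NullWritable, which has no public constructor, that threw.
        // WritableComparator.get() consults the registry first, so the
        // registered raw comparator is used instead.
        SequenceFile.Sorter sorter = new SequenceFile.Sorter(
            fs, NullWritable.class, Text.class, conf);
        sorter.sort(new Path("/tmp/nullkeys.seq"),
                    new Path("/tmp/nullkeys.sorted"));  // hypothetical paths
      }
    }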
+ 21 - 15
src/core/org/apache/hadoop/io/WritableComparator.java

@@ -21,6 +21,8 @@ package org.apache.hadoop.io;
 import java.io.*;
 import java.util.*;
 
+import org.apache.hadoop.util.ReflectionUtils;
+
 /** A Comparator for {@link WritableComparable}s.
  *
 * <p>This base implementation uses the natural ordering.  To define alternate
@@ -39,7 +41,7 @@ public class WritableComparator implements RawComparator {
   public static synchronized WritableComparator get(Class c) {
     WritableComparator comparator = comparators.get(c);
     if (comparator == null)
-      comparator = new WritableComparator(c);
+      comparator = new WritableComparator(c, true);
     return comparator;
   }
 
@@ -51,17 +53,26 @@ public class WritableComparator implements RawComparator {
   }
 
 
-  private DataInputBuffer buffer = new DataInputBuffer();
-
-  private Class keyClass;
-  private WritableComparable key1;
-  private WritableComparable key2;
+  private final Class keyClass;
+  private final WritableComparable key1;
+  private final WritableComparable key2;
+  private final DataInputBuffer buffer;
 
   /** Construct for a {@link WritableComparable} implementation. */
   protected WritableComparator(Class keyClass) {
+    this(keyClass, false);
+  }
+
+  private WritableComparator(Class keyClass, boolean createInstances) {
     this.keyClass = keyClass;
-    this.key1 = newKey();
-    this.key2 = newKey();
+    if (createInstances) {
+      key1 = newKey();
+      key2 = newKey();
+      buffer = new DataInputBuffer();
+    } else {
+      key1 = key2 = null;
+      buffer = null;
+    }
   }
 
   /** Returns the WritableComparable implementation class. */
@@ -69,13 +80,8 @@ public class WritableComparator implements RawComparator {
 
   /** Construct a new {@link WritableComparable} instance. */
   public WritableComparable newKey() {
-    try {
-      return (WritableComparable)keyClass.newInstance();
-    } catch (InstantiationException e) {
-      throw new RuntimeException(e);
-    } catch (IllegalAccessException e) {
-      throw new RuntimeException(e);
-    }
+    return (WritableComparable)
+      ReflectionUtils.newInstance(keyClass, null);
   }
 
   /** Optimization hook.  Override this to make SequenceFile.Sorters scream.

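The "optimization hook" in the context above is the raw byte-level compare. The
full registration pattern, sketched for a hypothetical int-valued key (names
are illustrative; the structure mirrors the NullWritable comparator added in
this commit and Hadoop's existing IntWritable comparator):

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;

    /** Hypothetical key type illustrating the define()/get() contract. */
    public class MyKey implements WritableComparable {
      private int value;

      public void write(DataOutput out) throws IOException {
        out.writeInt(value);
      }
      public void readFields(DataInput in) throws IOException {
        value = in.readInt();
      }
      public int compareTo(Object o) {
        int that = ((MyKey) o).value;
        return value < that ? -1 : (value == that ? 0 : 1);
      }

      /** Raw comparator: orders serialized keys without deserializing. */
      public static class Comparator extends WritableComparator {
        public Comparator() { super(MyKey.class); }
        public int compare(byte[] b1, int s1, int l1,
                           byte[] b2, int s2, int l2) {
          int v1 = readInt(b1, s1);  // static helper on WritableComparator
          int v2 = readInt(b2, s2);
          return v1 < v2 ? -1 : (v1 == v2 ? 0 : 1);
        }
      }

      static {  // once registered, get(MyKey.class) never instantiates MyKey
        WritableComparator.define(MyKey.class, new Comparator());
      }
    }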
+ 53 - 1
src/test/org/apache/hadoop/mapred/TestMapRed.java

@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableComparable;
@@ -346,7 +347,58 @@ public class TestMapRed extends TestCase {
     }
     assertFalse("should fail for partition >= numPartitions", pass);
   }
-    
+
+  public static class NullMapper
+      implements Mapper<NullWritable,Text,NullWritable,Text> {
+    public void map(NullWritable key, Text val,
+        OutputCollector<NullWritable,Text> output, Reporter reporter)
+        throws IOException {
+      output.collect(NullWritable.get(), val);
+    }
+    public void configure(JobConf conf) { }
+    public void close() { }
+  }
+
+  public void testNullKeys() throws Exception {
+    JobConf conf = new JobConf(TestMapRed.class);
+    FileSystem fs = FileSystem.getLocal(conf);
+    Path testdir = new Path(
+        System.getProperty("test.build.data","/tmp")).makeQualified(fs);
+    fs.delete(testdir, true);
+    Path inFile = new Path(testdir, "nullin/blah");
+    SequenceFile.Writer w = SequenceFile.createWriter(fs, conf, inFile,
+        NullWritable.class, Text.class, SequenceFile.CompressionType.NONE);
+    Text t = new Text();
+    t.set("AAAAAAAAAAAAAA"); w.append(NullWritable.get(), t);
+    t.set("BBBBBBBBBBBBBB"); w.append(NullWritable.get(), t);
+    t.set("CCCCCCCCCCCCCC"); w.append(NullWritable.get(), t);
+    t.set("DDDDDDDDDDDDDD"); w.append(NullWritable.get(), t);
+    t.set("EEEEEEEEEEEEEE"); w.append(NullWritable.get(), t);
+    t.set("FFFFFFFFFFFFFF"); w.append(NullWritable.get(), t);
+    t.set("GGGGGGGGGGGGGG"); w.append(NullWritable.get(), t);
+    t.set("HHHHHHHHHHHHHH"); w.append(NullWritable.get(), t);
+    w.close();
+    FileInputFormat.setInputPaths(conf, inFile);
+    FileOutputFormat.setOutputPath(conf, new Path(testdir, "nullout"));
+    conf.setMapperClass(NullMapper.class);
+    conf.setReducerClass(IdentityReducer.class);
+    conf.setOutputKeyClass(NullWritable.class);
+    conf.setOutputValueClass(Text.class);
+    conf.setInputFormat(SequenceFileInputFormat.class);
+    conf.setOutputFormat(SequenceFileOutputFormat.class);
+    conf.setNumReduceTasks(1);
+
+    JobClient.runJob(conf);
+
+    SequenceFile.Reader r = new SequenceFile.Reader(fs,
+        new Path(testdir, "nullout/part-00000"), conf);
+    String m = "AAAAAAAAAAAAAA";
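+    // Expect the eight values back in input order; each pass advances the
+    // expected string from AAAAAAAAAAAAAA to BBBBBBBBBBBBBB and so on.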
+    for (int i = 1; r.next(NullWritable.get(), t); ++i) {
+      assertTrue(t.toString() + " doesn't match " + m, m.equals(t.toString()));
+      m = m.replace((char)('A' + i - 1), (char)('A' + i));
+    }
+  }
+
   private void checkCompression(boolean compressMapOutputs,
                                 CompressionType redCompression,
                                 boolean includeCombine