
HADOOP-1986. Add support for a general serialization mechanism for Map Reduce.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@632073 13f79535-47bb-0310-9956-ffa450edef68
Thomas White 17 years ago
parent commit 341d588503
61 changed files with 1059 additions and 283 deletions
  1. CHANGES.txt (+3 -0)
  2. conf/hadoop-default.xml (+7 -0)
  3. src/contrib/data_join/src/examples/org/apache/hadoop/contrib/utils/join/SampleDataJoinMapper.java (+1 -2)
  4. src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinMapperBase.java (+3 -5)
  5. src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinReducerBase.java (+6 -8)
  6. src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java (+2 -2)
  7. src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java (+1 -5)
  8. src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java (+1 -2)
  9. src/contrib/streaming/src/test/org/apache/hadoop/streaming/ValueCountReduce.java (+1 -1)
  10. src/java/org/apache/hadoop/io/InputBuffer.java (+89 -0)
  11. src/java/org/apache/hadoop/io/OutputBuffer.java (+92 -0)
  12. src/java/org/apache/hadoop/io/RawComparator.java (+37 -0)
  13. src/java/org/apache/hadoop/io/SequenceFile.java (+36 -9)
  14. src/java/org/apache/hadoop/io/WritableComparator.java (+1 -1)
  15. src/java/org/apache/hadoop/io/serializer/Deserializer.java (+59 -0)
  16. src/java/org/apache/hadoop/io/serializer/DeserializerComparator.java (+70 -0)
  17. src/java/org/apache/hadoop/io/serializer/JavaSerialization.java (+100 -0)
  18. src/java/org/apache/hadoop/io/serializer/JavaSerializationComparator.java (+46 -0)
  19. src/java/org/apache/hadoop/io/serializer/Serialization.java (+44 -0)
  20. src/java/org/apache/hadoop/io/serializer/SerializationFactory.java (+88 -0)
  21. src/java/org/apache/hadoop/io/serializer/Serializer.java (+52 -0)
  22. src/java/org/apache/hadoop/io/serializer/WritableSerialization.java (+105 -0)
  23. src/java/org/apache/hadoop/io/serializer/package.html (+37 -0)
  24. src/java/org/apache/hadoop/mapred/BasicTypeSorterBase.java (+8 -7)
  25. src/java/org/apache/hadoop/mapred/BufferSorter.java (+2 -2)
  26. src/java/org/apache/hadoop/mapred/FileInputFormat.java (+1 -5)
  27. src/java/org/apache/hadoop/mapred/InputFormat.java (+1 -4)
  28. src/java/org/apache/hadoop/mapred/JobConf.java (+35 -36)
  29. src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java (+1 -1)
  30. src/java/org/apache/hadoop/mapred/MapRunnable.java (+1 -5)
  31. src/java/org/apache/hadoop/mapred/MapRunner.java (+1 -4)
  32. src/java/org/apache/hadoop/mapred/MapTask.java (+51 -32)
  33. src/java/org/apache/hadoop/mapred/Mapper.java (+3 -7)
  34. src/java/org/apache/hadoop/mapred/MultiFileInputFormat.java (+1 -4)
  35. src/java/org/apache/hadoop/mapred/OutputCollector.java (+1 -6)
  36. src/java/org/apache/hadoop/mapred/OutputFormat.java (+1 -4)
  37. src/java/org/apache/hadoop/mapred/OutputFormatBase.java (+1 -5)
  38. src/java/org/apache/hadoop/mapred/Partitioner.java (+1 -6)
  39. src/java/org/apache/hadoop/mapred/RecordReader.java (+1 -7)
  40. src/java/org/apache/hadoop/mapred/RecordWriter.java (+1 -6)
  41. src/java/org/apache/hadoop/mapred/ReduceTask.java (+28 -28)
  42. src/java/org/apache/hadoop/mapred/Reducer.java (+4 -8)
  43. src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java (+1 -1)
  44. src/java/org/apache/hadoop/mapred/TextOutputFormat.java (+2 -7)
  45. src/java/org/apache/hadoop/mapred/lib/FieldSelectionMapReduce.java (+1 -4)
  46. src/java/org/apache/hadoop/mapred/lib/HashPartitioner.java (+1 -6)
  47. src/java/org/apache/hadoop/mapred/lib/IdentityMapper.java (+1 -4)
  48. src/java/org/apache/hadoop/mapred/lib/IdentityReducer.java (+1 -4)
  49. src/java/org/apache/hadoop/mapred/lib/InverseMapper.java (+1 -4)
  50. src/java/org/apache/hadoop/mapred/lib/KeyFieldBasedPartitioner.java (+1 -5)
  51. src/java/org/apache/hadoop/mapred/lib/LongSumReducer.java (+1 -3)
  52. src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java (+1 -4)
  53. src/java/org/apache/hadoop/mapred/lib/NullOutputFormat.java (+1 -5)
  54. src/java/org/apache/hadoop/mapred/lib/RegexMapper.java (+1 -3)
  55. src/java/org/apache/hadoop/mapred/lib/TokenCountMapper.java (+1 -3)
  56. src/java/org/apache/hadoop/mapred/package.html (+6 -6)
  57. src/java/org/apache/hadoop/util/CopyFiles.java (+1 -1)
  58. src/test/org/apache/hadoop/fs/TestFileSystem.java (+8 -5)
  59. src/test/org/apache/hadoop/io/FileBench.java (+2 -2)
  60. src/test/org/apache/hadoop/io/TestSequenceFile.java (+2 -1)
  61. src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java (+2 -3)

+ 3 - 0
CHANGES.txt

@@ -23,6 +23,9 @@ Trunk (unreleased changes)
     HADOOP-1985.  This addresses rack-awareness for Map tasks and for 
     HDFS in a uniform way. (ddas)
 
+    HADOOP-1986.  Add support for a general serialization mechanism for
+    Map Reduce. (tomwhite)
+
   NEW FEATURES
 
     HADOOP-1398.  Add HBase in-memory block cache.  (tomwhite)

+ 7 - 0
conf/hadoop-default.xml

@@ -118,6 +118,13 @@ creations/deletions), or "all".</description>
                for compression/decompression.</description>
 </property>
 
+<property>
+  <name>io.serializations</name>
+  <value>org.apache.hadoop.io.serializer.WritableSerialization</value>
+  <description>A list of serialization classes that can be used for
+  obtaining serializers and deserializers.</description>
+</property>
+
 <!-- file system properties -->
 
 <property>

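For illustration, the new property can also be overridden per job rather than cluster-wide. A minimal sketch (hypothetical job setup, not part of this commit; only the "io.serializations" key and the two class names come from the changes here):

    import org.apache.hadoop.mapred.JobConf;

    // Enable the experimental Java serialization added below, while
    // keeping the Writable default first in the list.
    JobConf conf = new JobConf();
    conf.set("io.serializations",
        "org.apache.hadoop.io.serializer.WritableSerialization,"
        + "org.apache.hadoop.io.serializer.JavaSerialization");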
+ 1 - 2
src/contrib/data_join/src/examples/org/apache/hadoop/contrib/utils/join/SampleDataJoinMapper.java

@@ -19,7 +19,6 @@
 package org.apache.hadoop.contrib.utils.join;
 
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
 
 import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
 import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
@@ -47,7 +46,7 @@ public class SampleDataJoinMapper extends DataJoinMapperBase {
     return new Text(groupKey);
   }
 
-  protected TaggedMapOutput generateTaggedMapOutput(Writable value) {
+  protected TaggedMapOutput generateTaggedMapOutput(Object value) {
     TaggedMapOutput retv = new SampleTaggedMapOutput((Text) value);
     retv.setTag(new Text(this.inputTag));
     return retv;

+ 3 - 5
src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinMapperBase.java

@@ -22,8 +22,6 @@ import java.io.IOException;
 import java.util.Iterator;
 
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
@@ -77,7 +75,7 @@ public abstract class DataJoinMapperBase extends JobBase {
    * @param value
    * @return an object of TaggedMapOutput computed from the given value.
    */
-  protected abstract TaggedMapOutput generateTaggedMapOutput(Writable value);
+  protected abstract TaggedMapOutput generateTaggedMapOutput(Object value);
 
   /**
    * Generate a map output key. The user code can compute the key
@@ -89,7 +87,7 @@ public abstract class DataJoinMapperBase extends JobBase {
    */
   protected abstract Text generateGroupKey(TaggedMapOutput aRecord);
 
-  public void map(WritableComparable key, Writable value,
+  public void map(Object key, Object value,
                   OutputCollector output, Reporter reporter) throws IOException {
     if (this.reporter == null) {
       this.reporter = reporter;
@@ -115,7 +113,7 @@ public abstract class DataJoinMapperBase extends JobBase {
     }
   }
 
-  public void reduce(WritableComparable arg0, Iterator arg1,
+  public void reduce(Object arg0, Iterator arg1,
                      OutputCollector arg2, Reporter arg3) throws IOException {
     // TODO Auto-generated method stub
 

+ 6 - 8
src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinReducerBase.java

@@ -24,8 +24,6 @@ import java.util.SortedMap;
 import java.util.TreeMap;
 
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
@@ -90,7 +88,7 @@ public abstract class DataJoinReducerBase extends JobBase {
    * @param arg1
    * @return
    */
-  private SortedMap<Object, ResetableIterator> regroup(Writable key,
+  private SortedMap<Object, ResetableIterator> regroup(Object key,
                                                        Iterator arg1, Reporter reporter) throws IOException {
     this.numOfValues = 0;
     SortedMap<Object, ResetableIterator> retv = new TreeMap<Object, ResetableIterator>();
@@ -121,7 +119,7 @@ public abstract class DataJoinReducerBase extends JobBase {
     return retv;
   }
 
-  public void reduce(WritableComparable key, Iterator values,
+  public void reduce(Object key, Iterator values,
                      OutputCollector output, Reporter reporter) throws IOException {
     if (this.reporter == null) {
       this.reporter = reporter;
@@ -150,7 +148,7 @@ public abstract class DataJoinReducerBase extends JobBase {
    * @param reporter
    * @throws IOException
    */
-  protected void collect(WritableComparable key, TaggedMapOutput aRecord,
+  protected void collect(Object key, TaggedMapOutput aRecord,
                          OutputCollector output, Reporter reporter) throws IOException {
     this.collected += 1;
     addLongValue("collectedCount", 1);
@@ -173,7 +171,7 @@ public abstract class DataJoinReducerBase extends JobBase {
    * @throws IOException
    */
   private void joinAndCollect(Object[] tags, ResetableIterator[] values,
-                              WritableComparable key, OutputCollector output, Reporter reporter)
+                              Object key, OutputCollector output, Reporter reporter)
     throws IOException {
     if (values.length < 1) {
       return;
@@ -198,7 +196,7 @@ public abstract class DataJoinReducerBase extends JobBase {
    * @throws IOException
    */
   private void joinAndCollect(Object[] tags, ResetableIterator[] values,
-                              int pos, Object[] partialList, WritableComparable key,
+                              int pos, Object[] partialList, Object key,
                               OutputCollector output, Reporter reporter) throws IOException {
 
     if (values.length == pos) {
@@ -230,7 +228,7 @@ public abstract class DataJoinReducerBase extends JobBase {
    */
   protected abstract TaggedMapOutput combine(Object[] tags, Object[] values);
 
-  public void map(WritableComparable arg0, Writable arg1, OutputCollector arg2,
+  public void map(Object arg0, Object arg1, OutputCollector arg2,
                   Reporter arg3) throws IOException {
     // TODO Auto-generated method stub
 

+ 2 - 2
src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java

@@ -497,11 +497,11 @@ public abstract class PipeMapRed {
   }
 
   /**
-   * Write a writable value to the output stream using UTF-8 encoding
+   * Write a value to the output stream using UTF-8 encoding
    * @param value output value
    * @throws IOException
    */
-  void write(Writable value) throws IOException {
+  void write(Object value) throws IOException {
     byte[] bval;
     int valSize;
     if (value instanceof BytesWritable) {

+ 1 - 5
src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java

@@ -28,10 +28,6 @@ import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.util.StringUtils;
 
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.Writable;
-
 /** A generic Mapper bridge.
  *  It delegates operations to an external program via stdin and stdout.
  */
@@ -66,7 +62,7 @@ public class PipeMapper extends PipeMapRed implements Mapper {
   // Do NOT declare default constructor
   // (MapRed creates it reflectively)
 
-  public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter) throws IOException {
+  public void map(Object key, Object value, OutputCollector output, Reporter reporter) throws IOException {
     // init
     if (outThread_ == null) {
       startOutputThreads(output, reporter);

+ 1 - 2
src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java

@@ -29,7 +29,6 @@ import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.util.StringUtils;
 
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.Writable;
 
 /** A generic Reducer bridge.
@@ -56,7 +55,7 @@ public class PipeReducer extends PipeMapRed implements Reducer {
     return (argv != null) && !StreamJob.REDUCE_NONE.equals(argv);
   }
 
-  public void reduce(WritableComparable key, Iterator values, OutputCollector output,
+  public void reduce(Object key, Iterator values, OutputCollector output,
                      Reporter reporter) throws IOException {
 
     // init

+ 1 - 1
src/contrib/streaming/src/test/org/apache/hadoop/streaming/ValueCountReduce.java

@@ -41,7 +41,7 @@ public class ValueCountReduce implements Reducer {
 
   }
 
-  public void reduce(WritableComparable arg0, Iterator arg1, OutputCollector arg2, Reporter arg3) throws IOException {
+  public void reduce(Object arg0, Iterator arg1, OutputCollector arg2, Reporter arg3) throws IOException {
     int count = 0;
     while (arg1.hasNext()) {
       count += 1;

+ 89 - 0
src/java/org/apache/hadoop/io/InputBuffer.java

@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.io.*;
+
+
+/** A reusable {@link InputStream} implementation that reads from an in-memory
+ * buffer.
+ *
+ * <p>This saves memory over creating a new InputStream and
+ * ByteArrayInputStream each time data is read.
+ *
+ * <p>Typical usage is something like the following:<pre>
+ *
+ * InputBuffer buffer = new InputBuffer();
+ * while (... loop condition ...) {
+ *   byte[] data = ... get data ...;
+ *   int dataLength = ... get data length ...;
+ *   buffer.reset(data, dataLength);
+ *   ... read buffer using InputStream methods ...
+ * }
+ * </pre>
+ * @see DataInputBuffer
+ * @see DataOutput
+ */
+public class InputBuffer extends FilterInputStream {
+
+  private static class Buffer extends ByteArrayInputStream {
+    public Buffer() {
+      super(new byte[] {});
+    }
+
+    public void reset(byte[] input, int start, int length) {
+      this.buf = input;
+      this.count = start+length;
+      this.mark = start;
+      this.pos = start;
+    }
+
+    public int getPosition() { return pos; }
+    public int getLength() { return count; }
+  }
+
+  private Buffer buffer;
+  
+  /** Constructs a new empty buffer. */
+  public InputBuffer() {
+    this(new Buffer());
+  }
+
+  private InputBuffer(Buffer buffer) {
+    super(buffer);
+    this.buffer = buffer;
+  }
+
+  /** Resets the data that the buffer reads. */
+  public void reset(byte[] input, int length) {
+    buffer.reset(input, 0, length);
+  }
+
+  /** Resets the data that the buffer reads. */
+  public void reset(byte[] input, int start, int length) {
+    buffer.reset(input, start, length);
+  }
+
+  /** Returns the current position in the input. */
+  public int getPosition() { return buffer.getPosition(); }
+
+  /** Returns the length of the input. */
+  public int getLength() { return buffer.getLength(); }
+
+}

+ 92 - 0
src/java/org/apache/hadoop/io/OutputBuffer.java

@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.io.*;
+
+/** A reusable {@link OutputStream} implementation that writes to an in-memory
+ * buffer.
+ *
+ * <p>This saves memory over creating a new OutputStream and
+ * ByteArrayOutputStream each time data is written.
+ *
+ * <p>Typical usage is something like the following:<pre>
+ *
+ * OutputBuffer buffer = new OutputBuffer();
+ * while (... loop condition ...) {
+ *   buffer.reset();
+ *   ... write buffer using OutputStream methods ...
+ *   byte[] data = buffer.getData();
+ *   int dataLength = buffer.getLength();
+ *   ... write data to its ultimate destination ...
+ * }
+ * </pre>
+ * @see DataOutputBuffer
+ * @see InputBuffer
+ */
+public class OutputBuffer extends FilterOutputStream {
+
+  private static class Buffer extends ByteArrayOutputStream {
+    public byte[] getData() { return buf; }
+    public int getLength() { return count; }
+    public void reset() { count = 0; }
+
+    public void write(InputStream in, int len) throws IOException {
+      int newcount = count + len;
+      if (newcount > buf.length) {
+        byte newbuf[] = new byte[Math.max(buf.length << 1, newcount)];
+        System.arraycopy(buf, 0, newbuf, 0, count);
+        buf = newbuf;
+      }
+      IOUtils.readFully(in, buf, count, len);
+      count = newcount;
+    }
+  }
+
+  private Buffer buffer;
+  
+  /** Constructs a new empty buffer. */
+  public OutputBuffer() {
+    this(new Buffer());
+  }
+  
+  private OutputBuffer(Buffer buffer) {
+    super(buffer);
+    this.buffer = buffer;
+  }
+
+  /** Returns the current contents of the buffer.
+   *  Data is only valid to {@link #getLength()}.
+   */
+  public byte[] getData() { return buffer.getData(); }
+
+  /** Returns the length of the valid data currently in the buffer. */
+  public int getLength() { return buffer.getLength(); }
+
+  /** Resets the buffer to empty. */
+  public OutputBuffer reset() {
+    buffer.reset();
+    return this;
+  }
+
+  /** Writes bytes from a InputStream directly into the buffer. */
+  public void write(InputStream in, int length) throws IOException {
+    buffer.write(in, length);
+  }
+}

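A quick illustration of how the two new buffer classes pair up. This is a sketch only (a hypothetical round trip of one Text record, not code from this commit); it uses only methods shown in the diffs above:

    import java.io.DataInputStream;
    import java.io.DataOutputStream;

    import org.apache.hadoop.io.InputBuffer;
    import org.apache.hadoop.io.OutputBuffer;
    import org.apache.hadoop.io.Text;

    public class BufferRoundTrip {
      public static void main(String[] args) throws Exception {
        // Fill the reusable output buffer with one serialized Text record.
        OutputBuffer out = new OutputBuffer();
        DataOutputStream dataOut = new DataOutputStream(out);
        new Text("hello").write(dataOut);
        dataOut.flush();

        // Point the reusable input buffer at the same bytes and read back.
        InputBuffer in = new InputBuffer();
        in.reset(out.getData(), out.getLength());
        Text copy = new Text();
        copy.readFields(new DataInputStream(in));
        System.out.println(copy);  // prints "hello"
      }
    }

In a loop, both buffers would be reset and reused rather than reallocated, which is the point of these classes.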
+ 37 - 0
src/java/org/apache/hadoop/io/RawComparator.java

@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.util.Comparator;
+
+import org.apache.hadoop.io.serializer.DeserializerComparator;
+
+/**
+ * <p>
+ * A {@link Comparator} that operates directly on byte representations of
+ * objects.
+ * </p>
+ * @param <T>
+ * @see DeserializerComparator
+ */
+public interface RawComparator<T> extends Comparator<T> {
+
+  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
+
+}

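As an illustration of what "operates directly on byte representations" buys, here is a hypothetical RawComparator for fixed-width 8-byte keys (e.g. serialized LongWritables) that never instantiates the key objects. It is not part of this commit, and it assumes WritableComparator's existing static readLong helper:

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.RawComparator;
    import org.apache.hadoop.io.WritableComparator;

    public class RawLongComparator implements RawComparator<LongWritable> {

      // Compare two serialized longs (8 bytes, big-endian) in place,
      // skipping deserialization entirely.
      public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        long v1 = WritableComparator.readLong(b1, s1);
        long v2 = WritableComparator.readLong(b2, s2);
        return v1 < v2 ? -1 : (v1 == v2 ? 0 : 1);
      }

      // Object form required by java.util.Comparator.
      public int compare(LongWritable o1, LongWritable o2) {
        return o1.compareTo(o2);
      }
    }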
+ 36 - 9
src/java/org/apache/hadoop/io/SequenceFile.java

@@ -32,6 +32,8 @@ import org.apache.hadoop.io.compress.Decompressor;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.io.compress.zlib.ZlibFactory;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.Progress;
@@ -780,6 +782,10 @@ public class SequenceFile {
     Metadata metadata = null;
     Compressor compressor = null;
     
+    private Serializer keySerializer;
+    private Serializer uncompressedValSerializer;
+    private Serializer compressedValSerializer;
+    
     // Insert a globally unique 16-byte value every few entries, so that one
     // can seek into the middle of a file and then synchronize with record
     // starts and ends by scanning for this value.
@@ -876,6 +882,7 @@ public class SequenceFile {
     }
     
     /** Initialize. */
+    @SuppressWarnings("unchecked")
     void init(Path name, Configuration conf, FSDataOutputStream out,
               Class keyClass, Class valClass,
               boolean compress, CompressionCodec codec, Metadata metadata) 
@@ -887,6 +894,11 @@ public class SequenceFile {
       this.compress = compress;
       this.codec = codec;
       this.metadata = metadata;
+      SerializationFactory serializationFactory = new SerializationFactory(conf);
+      this.keySerializer = serializationFactory.getSerializer(keyClass);
+      this.keySerializer.open(buffer);
+      this.uncompressedValSerializer = serializationFactory.getSerializer(valClass);
+      this.uncompressedValSerializer.open(buffer);
       if (this.codec != null) {
         ReflectionUtils.setConf(this.codec, this.conf);
         compressor = compressorPool.getCodec(this.codec.getCompressorType());
@@ -896,6 +908,8 @@ public class SequenceFile {
         this.deflateFilter = this.codec.createOutputStream(buffer, compressor);
         this.deflateOut = 
           new DataOutputStream(new BufferedOutputStream(deflateFilter));
+        this.compressedValSerializer = serializationFactory.getSerializer(valClass);
+        this.compressedValSerializer.open(deflateOut);
       }
     }
     
@@ -923,6 +937,12 @@ public class SequenceFile {
     /** Close the file. */
     public synchronized void close() throws IOException {
       compressorPool.returnCodec(compressor);
+      
+      keySerializer.close();
+      uncompressedValSerializer.close();
+      if (compressedValSerializer != null) {
+        compressedValSerializer.close();
+      }
 
       if (out != null) {
         out.flush();
@@ -944,6 +964,13 @@ public class SequenceFile {
 
     /** Append a key/value pair. */
     public synchronized void append(Writable key, Writable val)
+      throws IOException {
+      append((Object) key, (Object) val);
+    }
+
+    /** Append a key/value pair. */
+    @SuppressWarnings("unchecked")
+    public synchronized void append(Object key, Object val)
       throws IOException {
       if (key.getClass() != keyClass)
         throw new IOException("wrong key class: "+key.getClass().getName()
@@ -955,7 +982,7 @@ public class SequenceFile {
       buffer.reset();
 
       // Append the 'key'
-      key.write(buffer);
+      keySerializer.serialize(key);
       int keyLength = buffer.getLength();
       if (keyLength == 0)
         throw new IOException("zero length keys not allowed: " + key);
@@ -963,11 +990,11 @@ public class SequenceFile {
       // Append the 'value'
       if (compress) {
         deflateFilter.resetState();
-        val.write(deflateOut);
+        compressedValSerializer.serialize(val);
         deflateOut.flush();
         deflateFilter.finish();
       } else {
-        val.write(buffer);
+        uncompressedValSerializer.serialize(val);
       }
 
       // Write the record out
@@ -2107,7 +2134,7 @@ public class SequenceFile {
    */
   public static class Sorter {
 
-    private WritableComparator comparator;
+    private RawComparator comparator;
 
     private MergeSort mergeSort; //the implementation of merge sort
     
@@ -2129,15 +2156,15 @@ public class SequenceFile {
 
     /** Sort and merge files containing the named classes. */
     public Sorter(FileSystem fs, Class keyClass, Class valClass, Configuration conf)  {
-      this(fs, new WritableComparator(keyClass), valClass, conf);
+      this(fs, new WritableComparator(keyClass), keyClass, valClass, conf);
     }
 
-    /** Sort and merge using an arbitrary {@link WritableComparator}. */
-    public Sorter(FileSystem fs, WritableComparator comparator, Class valClass, 
-                  Configuration conf) {
+    /** Sort and merge using an arbitrary {@link RawComparator}. */
+    public Sorter(FileSystem fs, RawComparator comparator, Class keyClass, 
+                  Class valClass, Configuration conf) {
       this.fs = fs;
       this.comparator = comparator;
-      this.keyClass = comparator.getKeyClass();
+      this.keyClass = keyClass;
       this.valClass = valClass;
       this.memory = conf.getInt("io.sort.mb", 100) * 1024 * 1024;
       this.factor = conf.getInt("io.sort.factor", 100);

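The Sorter change is worth a note: a RawComparator, unlike a WritableComparator, carries no key class, so callers must now pass it explicitly. A sketch of the new five-argument form (fs and conf are assumed to be an existing FileSystem and Configuration; exception handling omitted; not code from this commit):

    // String satisfies JavaSerializationComparator's bound
    // (Serializable & Comparable<String>), added later in this commit.
    SequenceFile.Sorter sorter = new SequenceFile.Sorter(
        fs,                                         // assumed FileSystem
        new JavaSerializationComparator<String>(),  // any RawComparator
        String.class,                               // key class, now explicit
        String.class,                               // value class
        conf);                                      // assumed Configuration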
+ 1 - 1
src/java/org/apache/hadoop/io/WritableComparator.java

@@ -30,7 +30,7 @@ import java.util.*;
  * {@link #compare(byte[],int,int,byte[],int,int)}.  Static utility methods are
  * provided to assist in optimized implementations of this method.
  */
-public class WritableComparator implements Comparator {
+public class WritableComparator implements RawComparator {
 
   private static HashMap<Class, WritableComparator> comparators =
     new HashMap<Class, WritableComparator>(); // registry

+ 59 - 0
src/java/org/apache/hadoop/io/serializer/Deserializer.java

@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.serializer;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * <p>
+ * Provides a facility for deserializing objects of type <T> from an
+ * {@link InputStream}.
+ * </p>
+ * 
+ * <p>
+ * Deserializers are stateful, but must not buffer the input since
+ * other producers may read from the input between calls to
+ * {@link #deserialize(Object)}.
+ * </p>
+ * @param <T>
+ */
+public interface Deserializer<T> {
+  /**
+   * <p>Prepare the deserializer for reading.</p>
+   */
+  void open(InputStream in) throws IOException;
+  
+  /**
+   * <p>
+   * Deserialize the next object from the underlying input stream.
+   * If the object <code>t</code> is non-null then this deserializer
+   * <i>may</i> set its internal state to the next object read from the input
+   * stream. Otherwise, if the object <code>t</code> is null a new
+   * deserialized object will be created.
+   * </p>
+   * @return the deserialized object
+   */
+  T deserialize(T t) throws IOException;
+  
+  /**
+   * <p>Close the underlying input stream and clear up any resources.</p>
+   */
+  void close() throws IOException;
+}

+ 70 - 0
src/java/org/apache/hadoop/io/serializer/DeserializerComparator.java

@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.serializer;
+
+import java.io.IOException;
+import java.util.Comparator;
+
+import org.apache.hadoop.io.InputBuffer;
+import org.apache.hadoop.io.RawComparator;
+
+/**
+ * <p>
+ * A {@link RawComparator} that uses a {@link Deserializer} to deserialize
+ * the objects to be compared so that the standard {@link Comparator} can
+ * be used to compare them.
+ * </p>
+ * <p>
+ * One may optimize compare-intensive operations by using a custom
+ * implementation of {@link RawComparator} that operates directly
+ * on byte representations.
+ * </p>
+ * @param <T>
+ */
+public abstract class DeserializerComparator<T> implements RawComparator<T> {
+  
+  private InputBuffer buffer = new InputBuffer();
+  private Deserializer<T> deserializer;
+  
+  private T key1;
+  private T key2;
+
+  protected DeserializerComparator(Deserializer<T> deserializer)
+    throws IOException {
+    
+    this.deserializer = deserializer;
+    this.deserializer.open(buffer);
+  }
+
+  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+    try {
+      
+      buffer.reset(b1, s1, l1);
+      key1 = deserializer.deserialize(key1);
+      
+      buffer.reset(b2, s2, l2);
+      key2 = deserializer.deserialize(key2);
+      
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return compare(key1, key2);
+  }
+
+}

+ 100 - 0
src/java/org/apache/hadoop/io/serializer/JavaSerialization.java

@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.serializer;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+/**
+ * <p>
+ * An experimental {@link Serialization} for Java {@link Serializable} classes.
+ * </p>
+ * @see JavaSerializationComparator
+ */
+public class JavaSerialization implements Serialization<Serializable> {
+  
+  static class JavaSerializationDeserializer<T extends Serializable>
+    implements Deserializer<T> {
+
+    private ObjectInputStream ois;
+
+    public void open(InputStream in) throws IOException {
+      ois = new ObjectInputStream(in) {
+        @Override protected void readStreamHeader() {
+          // no header
+        }
+      };
+    }
+    
+    @SuppressWarnings("unchecked")
+    public T deserialize(T object) throws IOException {
+      try {
+        // ignore passed-in object
+        return (T) ois.readObject();
+      } catch (ClassNotFoundException e) {
+        throw new IOException(e.toString());
+      }
+    }
+
+    public void close() throws IOException {
+      ois.close();
+    }
+
+  }
+  
+  static class JavaSerializationSerializer
+    implements Serializer<Serializable> {
+
+    private ObjectOutputStream oos;
+
+    public void open(OutputStream out) throws IOException {
+      oos = new ObjectOutputStream(out) {
+        @Override protected void writeStreamHeader() {
+          // no header
+        }
+      };
+    }
+
+    public void serialize(Serializable object) throws IOException {
+      oos.writeObject(object);
+    }
+
+    public void close() throws IOException {
+      oos.close();
+    }
+
+  }
+
+  public boolean accept(Class<?> c) {
+    return Serializable.class.isAssignableFrom(c);
+  }
+
+  public Deserializer<Serializable> getDeserializer(Class<Serializable> c) {
+    return new JavaSerializationDeserializer<Serializable>();
+  }
+
+  public Serializer<Serializable> getSerializer(Class<Serializable> c) {
+    return new JavaSerializationSerializer();
+  }
+
+}

+ 46 - 0
src/java/org/apache/hadoop/io/serializer/JavaSerializationComparator.java

@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.serializer;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.hadoop.io.RawComparator;
+
+/**
+ * <p>
+ * A {@link RawComparator} that uses a {@link JavaSerialization}
+ * {@link Deserializer} to deserialize objects that are then compared via
+ * their {@link Comparable} interfaces.
+ * </p>
+ * @param <T>
+ * @see JavaSerialization
+ */
+public class JavaSerializationComparator<T extends Serializable&Comparable<T>>
+  extends DeserializerComparator<T> {
+
+  public JavaSerializationComparator() throws IOException {
+    super(new JavaSerialization.JavaSerializationDeserializer<T>());
+  }
+
+  public int compare(T o1, T o2) {
+    return o1.compareTo(o2);
+  }
+
+}

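Taken together with JavaSerialization, this comparator lets a job key on any Serializable, Comparable type. A sketch of the wiring (hypothetical, not code from this commit; it assumes the JobConf changes made elsewhere in this commit, which let the output key comparator be any RawComparator):

    // Enable Java serialization and sort String keys with the new comparator.
    JobConf conf = new JobConf();
    conf.set("io.serializations",
        "org.apache.hadoop.io.serializer.JavaSerialization");
    conf.setOutputKeyClass(String.class);
    conf.setOutputKeyComparatorClass(JavaSerializationComparator.class);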
+ 44 - 0
src/java/org/apache/hadoop/io/serializer/Serialization.java

@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.serializer;
+
+/**
+ * <p>
+ * Encapsulates a {@link Serializer}/{@link Deserializer} pair.
+ * </p>
+ * @param <T>
+ */
+public interface Serialization<T> {
+  
+  /**
+   * Allows clients to test whether this {@link Serialization}
+   * supports the given class.
+   */
+  boolean accept(Class<?> c);
+  
+  /**
+   * @return a {@link Serializer} for the given class.
+   */
+  Serializer<T> getSerializer(Class<T> c);
+
+  /**
+   * @return a {@link Deserializer} for the given class.
+   */
+  Deserializer<T> getDeserializer(Class<T> c);
+}

+ 88 - 0
src/java/org/apache/hadoop/io/serializer/SerializationFactory.java

@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.serializer;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * <p>
+ * A factory for {@link Serialization}s.
+ * </p>
+ */
+public class SerializationFactory extends Configured {
+  
+  private static final Log LOG =
+    LogFactory.getLog(SerializationFactory.class.getName());
+
+  private List<Serialization<?>> serializations = new ArrayList<Serialization<?>>();
+  
+  /**
+   * <p>
+   * Serializations are found by reading the <code>io.serializations</code>
+   * property from <code>conf</code>, which is a comma-delimited list of
+   * classnames. 
+   * </p>
+   */
+  public SerializationFactory(Configuration conf) {
+    super(conf);
+    for (String serializerName : conf.getStrings("io.serializations")) {
+      add(serializerName);
+    }
+  }
+  
+  @SuppressWarnings("unchecked")
+  private void add(String serializationName) {
+    try {
+      
+      Class<? extends Serialization> serializionClass =
+        (Class<? extends Serialization>) Class.forName(serializationName);
+      serializations.add((Serialization)
+          ReflectionUtils.newInstance(serializionClass, getConf()));
+    } catch (ClassNotFoundException e) {
+      LOG.warn("Serilization class not found: " +
+          StringUtils.stringifyException(e));
+    }
+  }
+
+  public <T> Serializer<T> getSerializer(Class<T> c) {
+    return getSerialization(c).getSerializer(c);
+  }
+
+  public <T> Deserializer<T> getDeserializer(Class<T> c) {
+    return getSerialization(c).getDeserializer(c);
+  }
+
+  @SuppressWarnings("unchecked")
+  public <T> Serialization<T> getSerialization(Class<T> c) {
+    for (Serialization serialization : serializations) {
+      if (serialization.accept(c)) {
+        return (Serialization<T>) serialization;
+      }
+    }
+    return null;
+  }
+}

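To make the factory's role concrete, a minimal round trip through the new interfaces (sketch only, not code from this commit; it reuses the InputBuffer/OutputBuffer classes added above, and exception handling is omitted):

    Configuration conf = new Configuration();  // picks up io.serializations
    SerializationFactory factory = new SerializationFactory(conf);

    // Serialize a Text into a reusable in-memory buffer...
    Serializer<Text> serializer = factory.getSerializer(Text.class);
    OutputBuffer out = new OutputBuffer();
    serializer.open(out);
    serializer.serialize(new Text("hello"));

    // ...and deserialize it back from the same bytes.
    Deserializer<Text> deserializer = factory.getDeserializer(Text.class);
    InputBuffer in = new InputBuffer();
    in.reset(out.getData(), out.getLength());
    deserializer.open(in);
    Text copy = deserializer.deserialize(null);  // null => allocate a new Text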
+ 52 - 0
src/java/org/apache/hadoop/io/serializer/Serializer.java

@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.serializer;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * <p>
+ * Provides a facility for serializing objects of type <T> to an
+ * {@link OutputStream}.
+ * </p>
+ * 
+ * <p>
+ * Serializers are stateful, but must not buffer the output since
+ * other producers may write to the output between calls to
+ * {@link #serialize(Object)}.
+ * </p>
+ * @param <T>
+ */
+public interface Serializer<T> {
+  /**
+   * <p>Prepare the serializer for writing.</p>
+   */
+  void open(OutputStream out) throws IOException;
+  
+  /**
+   * <p>Serialize <code>t</code> to the underlying output stream.</p>
+   */
+  void serialize(T t) throws IOException;
+  
+  /**
+   * <p>Close the underlying output stream and clear up any resources.</p>
+   */  
+  void close() throws IOException;
+}

+ 105 - 0
src/java/org/apache/hadoop/io/serializer/WritableSerialization.java

@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.serializer;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * A {@link Serialization} for {@link Writable}s that delegates to
+ * {@link Writable#write(java.io.DataOutput)} and
+ * {@link Writable#readFields(java.io.DataInput)}.
+ */
+public class WritableSerialization implements Serialization<Writable> {
+  
+  static class WritableDeserializer implements Deserializer<Writable> {
+
+    private Class<?> writableClass;
+    private DataInputStream dataIn;
+    
+    public WritableDeserializer(Class<?> c) {
+      this.writableClass = c;
+    }
+    
+    public void open(InputStream in) {
+      if (in instanceof DataInputStream) {
+        dataIn = (DataInputStream) in;
+      } else {
+        dataIn = new DataInputStream(in);
+      }
+    }
+    
+    public Writable deserialize(Writable w) throws IOException {
+      Writable writable;
+      if (w == null) {
+        writable = (Writable) ReflectionUtils.newInstance(writableClass, null);
+      } else {
+        writable = w;
+      }
+      writable.readFields(dataIn);
+      return writable;
+    }
+
+    public void close() throws IOException {
+      dataIn.close();
+    }
+    
+  }
+  
+  static class WritableSerializer implements Serializer<Writable> {
+
+    private DataOutputStream dataOut;
+    
+    public void open(OutputStream out) {
+      if (out instanceof DataOutputStream) {
+        dataOut = (DataOutputStream) out;
+      } else {
+        dataOut = new DataOutputStream(out);
+      }
+    }
+
+    public void serialize(Writable w) throws IOException {
+      w.write(dataOut);
+    }
+
+    public void close() throws IOException {
+      dataOut.close();
+    }
+
+  }
+
+  public boolean accept(Class<?> c) {
+    return Writable.class.isAssignableFrom(c);
+  }
+
+  public Deserializer<Writable> getDeserializer(Class<Writable> c) {
+    return new WritableDeserializer(c);
+  }
+
+  public Serializer<Writable> getSerializer(Class<Writable> c) {
+    return new WritableSerializer();
+  }
+
+}

+ 37 - 0
src/java/org/apache/hadoop/io/serializer/package.html

@@ -0,0 +1,37 @@
+<html>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<body>
+
+<p>
+This package provides a mechanism for using different serialization frameworks
+in Hadoop. The property "io.serializations" defines a list of
+{@link org.apache.hadoop.io.serializer.Serialization}s that know how to create
+{@link org.apache.hadoop.io.serializer.Serializer}s and
+{@link org.apache.hadoop.io.serializer.Deserializer}s.
+</p>
+
+<p>
+To add a new serialization framework write an implementation of
+{@link org.apache.hadoop.io.serializer.Serialization} and add its name to the
+"io.serializations" property.
+</p>
+
+</body>
+</html>

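To make the two steps in the package docs concrete, here is a hypothetical third-party framework in full: a Serialization for java.lang.String built on writeUTF/readUTF. None of these class names exist in the commit; the only integration point is appending the class name to the "io.serializations" property:

    import java.io.*;
    import org.apache.hadoop.io.serializer.*;

    public class StringSerialization implements Serialization<String> {

      public boolean accept(Class<?> c) {
        return String.class.equals(c);
      }

      public Serializer<String> getSerializer(Class<String> c) {
        return new Serializer<String>() {
          private DataOutputStream out;
          public void open(OutputStream os) { out = new DataOutputStream(os); }
          // Length-prefixed modified UTF-8; no per-record stream header.
          public void serialize(String s) throws IOException { out.writeUTF(s); }
          public void close() throws IOException { out.close(); }
        };
      }

      public Deserializer<String> getDeserializer(Class<String> c) {
        return new Deserializer<String>() {
          private DataInputStream in;
          public void open(InputStream is) { in = new DataInputStream(is); }
          // Strings are immutable, so the passed-in object is ignored.
          public String deserialize(String s) throws IOException { return in.readUTF(); }
          public void close() throws IOException { in.close(); }
        };
      }
    }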
+ 8 - 7
src/java/org/apache/hadoop/mapred/BasicTypeSorterBase.java

@@ -21,8 +21,9 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.OutputBuffer;
+import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.SequenceFile.ValueBytes;
-import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.io.SequenceFile.Sorter.RawKeyValueIterator;
@@ -33,7 +34,7 @@ import org.apache.hadoop.util.Progressable;
  */
 abstract class BasicTypeSorterBase implements BufferSorter {
   
-  protected DataOutputBuffer keyValBuffer; //the buffer used for storing
+  protected OutputBuffer keyValBuffer; //the buffer used for storing
                                            //key/values
   protected int[] startOffsets; //the array used to store the start offsets of
                                 //keys in keyValBuffer
@@ -43,7 +44,7 @@ abstract class BasicTypeSorterBase implements BufferSorter {
   protected int[] pointers; //the array of startOffsets's indices. This will
                             //be sorted at the end to contain a sorted array of
                             //indices to offsets
-  protected WritableComparator comparator; //the comparator for the map output
+  protected RawComparator comparator; //the comparator for the map output
   protected int count; //the number of key/values
   //the overhead of the arrays in memory 
   //12 => 4 for keyoffsets, 4 for keylengths, 4 for valueLengths, and
@@ -90,7 +91,7 @@ abstract class BasicTypeSorterBase implements BufferSorter {
     count++;
   }
 
-  public void setInputBuffer(DataOutputBuffer buffer) {
+  public void setInputBuffer(OutputBuffer buffer) {
     //store a reference to the keyValBuffer that we need to read during sort
     this.keyValBuffer = buffer;
   }
@@ -159,11 +160,11 @@ class MRSortResultIterator implements RawKeyValueIterator {
   private int[] valLengths;
   private int currStartOffsetIndex;
   private int currIndexInPointers;
-  private DataOutputBuffer keyValBuffer;
+  private OutputBuffer keyValBuffer;
   private DataOutputBuffer key = new DataOutputBuffer();
   private InMemUncompressedBytes value = new InMemUncompressedBytes();
   
-  public MRSortResultIterator(DataOutputBuffer keyValBuffer, 
+  public MRSortResultIterator(OutputBuffer keyValBuffer, 
                               int []pointers, int []startOffsets,
                               int []keyLengths, int []valLengths) {
     this.count = pointers.length;
@@ -214,7 +215,7 @@ class MRSortResultIterator implements RawKeyValueIterator {
     private byte[] data;
     int start;
     int dataSize;
-    private void reset(DataOutputBuffer d, int start, int length) 
+    private void reset(OutputBuffer d, int start, int length) 
       throws IOException {
       data = d.getData();
       this.start = start;

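The sorter now serializes into the new reusable OutputBuffer and re-reads via InputBuffer. As a minimal sketch of the round trip it relies on (mine, not part of the patch; it assumes only the methods visible in these hunks — open(), serialize(), deserialize(), getData(), getLength(), reset() — and uses Text with the default WritableSerialization; the generic signatures are illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.InputBuffer;
    import org.apache.hadoop.io.OutputBuffer;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.serializer.Deserializer;
    import org.apache.hadoop.io.serializer.SerializationFactory;
    import org.apache.hadoop.io.serializer.Serializer;

    public class BufferRoundTrip {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        SerializationFactory factory = new SerializationFactory(conf);

        // Serialize into the reusable in-memory buffer, as the map side does.
        OutputBuffer out = new OutputBuffer();
        Serializer<Text> serializer = factory.getSerializer(Text.class);
        serializer.open(out);                  // OutputBuffer is an OutputStream
        serializer.serialize(new Text("key")); // raw bytes appended to the buffer

        // Re-read the same byte range, as MRSortResultIterator does.
        InputBuffer in = new InputBuffer();
        in.reset(out.getData(), out.getLength());
        Deserializer<Text> deserializer = factory.getDeserializer(Text.class);
        deserializer.open(in);
        Text roundTripped = deserializer.deserialize(null); // null => new object
        System.out.println(roundTripped);
        serializer.close();
        deserializer.close();
      }
    }
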
+ 2 - 2
src/java/org/apache/hadoop/mapred/BufferSorter.java

@@ -17,7 +17,7 @@
  */
 package org.apache.hadoop.mapred;
 
-import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.OutputBuffer;
 import org.apache.hadoop.io.SequenceFile.Sorter.RawKeyValueIterator;
 import org.apache.hadoop.util.Progressable;
 
@@ -56,7 +56,7 @@ interface BufferSorter extends JobConfigurable {
    * buffer).
    * @param buffer the map output buffer
    */
-  public void setInputBuffer(DataOutputBuffer buffer);
+  public void setInputBuffer(OutputBuffer buffer);
   
   /** The framework invokes this method to get the memory consumed so far
    * by an implementation of this interface.

+ 1 - 5
src/java/org/apache/hadoop/mapred/FileInputFormat.java

@@ -28,8 +28,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 
 /** 
  * A base class for file-based {@link InputFormat}.
@@ -41,9 +39,7 @@ import org.apache.hadoop.io.WritableComparable;
 * {@link #isSplitable(FileSystem, Path)} method to ensure input-files are
 * not split-up and are processed as a whole by {@link Mapper}s.
 */
-public abstract class FileInputFormat<K extends WritableComparable,
-                                      V extends Writable>
-  implements InputFormat<K, V> {
+public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
 
   public static final Log LOG =
     LogFactory.getLog("org.apache.hadoop.mapred.FileInputFormat");

+ 1 - 4
src/java/org/apache/hadoop/mapred/InputFormat.java

@@ -21,8 +21,6 @@ package org.apache.hadoop.mapred;
 import java.io.IOException;
 
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 
 /** 
  * <code>InputFormat</code> describes the input-specification for a 
@@ -64,8 +62,7 @@ import org.apache.hadoop.io.WritableComparable;
  * @see JobClient
  * @see FileInputFormat
  */
-public interface InputFormat<K extends WritableComparable,
-                             V extends Writable> {
+public interface InputFormat<K, V> {
 
   /**
    * Check for validity of the input-specification for the job. 

+ 35 - 36
src/java/org/apache/hadoop/mapred/JobConf.java

@@ -562,9 +562,8 @@ public class JobConf extends Configuration {
    *  
    * @return the map output key class.
    */
-  public Class<? extends WritableComparable> getMapOutputKeyClass() {
-    Class<? extends WritableComparable> retv = getClass("mapred.mapoutput.key.class", null,
-			  WritableComparable.class);
+  public Class<?> getMapOutputKeyClass() {
+    Class<?> retv = getClass("mapred.mapoutput.key.class", null, Object.class);
     if (retv == null) {
       retv = getOutputKeyClass();
     }
@@ -578,9 +577,8 @@
    * 
    * @param theClass the map output key class.
    */
-  public void setMapOutputKeyClass(Class<? extends WritableComparable> theClass) {
-    setClass("mapred.mapoutput.key.class", theClass,
-             WritableComparable.class);
+  public void setMapOutputKeyClass(Class<?> theClass) {
+    setClass("mapred.mapoutput.key.class", theClass, Object.class);
   }
   
   /**
@@ -590,9 +588,9 @@
    *  
    * @return the map output value class.
    */
-  public Class<? extends Writable> getMapOutputValueClass() {
-    Class<? extends Writable> retv = getClass("mapred.mapoutput.value.class", null,
-			  Writable.class);
+  public Class<?> getMapOutputValueClass() {
+    Class<?> retv = getClass("mapred.mapoutput.value.class", null,
+        Object.class);
     if (retv == null) {
       retv = getOutputValueClass();
     }
@@ -606,8 +604,8 @@
    * 
    * @param theClass the map output value class.
    */
-  public void setMapOutputValueClass(Class<? extends Writable> theClass) {
-    setClass("mapred.mapoutput.value.class", theClass, Writable.class);
+  public void setMapOutputValueClass(Class<?> theClass) {
+    setClass("mapred.mapoutput.value.class", theClass, Object.class);
   }
   
   /**
@@ -615,9 +613,9 @@
    * 
    * @return the key class for the job output data.
    */
-  public Class<? extends WritableComparable> getOutputKeyClass() {
+  public Class<?> getOutputKeyClass() {
     return getClass("mapred.output.key.class",
-                    LongWritable.class, WritableComparable.class);
+                    LongWritable.class, Object.class);
   }
   
   /**
@@ -625,33 +623,33 @@
    * 
    * @param theClass the key class for the job output data.
    */
-  public void setOutputKeyClass(Class<? extends WritableComparable> theClass) {
-    setClass("mapred.output.key.class", theClass, WritableComparable.class);
+  public void setOutputKeyClass(Class<?> theClass) {
+    setClass("mapred.output.key.class", theClass, Object.class);
   }
 
   /**
-   * Get the {@link WritableComparable} comparator used to compare keys.
+   * Get the {@link RawComparator} comparator used to compare keys.
    * 
-   * @return the {@link WritableComparable} comparator used to compare keys.
+   * @return the {@link RawComparator} comparator used to compare keys.
    */
-  public WritableComparator getOutputKeyComparator() {
+  public RawComparator getOutputKeyComparator() {
     Class theClass = getClass("mapred.output.key.comparator.class", null,
-                              WritableComparator.class);
+    		RawComparator.class);
     if (theClass != null)
-      return (WritableComparator)ReflectionUtils.newInstance(theClass, this);
+      return (RawComparator)ReflectionUtils.newInstance(theClass, this);
     return WritableComparator.get(getMapOutputKeyClass());
   }
 
   /**
-   * Set the {@link WritableComparable} comparator used to compare keys.
+   * Set the {@link RawComparator} comparator used to compare keys.
    * 
-   * @param theClass the {@link WritableComparable} comparator used to 
+   * @param theClass the {@link RawComparator} comparator used to 
    *                 compare keys.
    * @see #setOutputValueGroupingComparator(Class)                 
    */
-  public void setOutputKeyComparatorClass(Class<? extends WritableComparator> theClass) {
+  public void setOutputKeyComparatorClass(Class<? extends RawComparator> theClass) {
     setClass("mapred.output.key.comparator.class",
-             theClass, WritableComparator.class);
+             theClass, RawComparator.class);
   }
 
   /** 
@@ -661,24 +659,24 @@
    * @return comparator set by the user for grouping values.
    * @see #setOutputValueGroupingComparator(Class) for details.  
    */
-  public WritableComparator getOutputValueGroupingComparator() {
+  public RawComparator getOutputValueGroupingComparator() {
     Class theClass = getClass("mapred.output.value.groupfn.class", null,
-                              WritableComparator.class);
+        RawComparator.class);
     if (theClass == null) {
       return getOutputKeyComparator();
     }
    
-    return (WritableComparator)ReflectionUtils.newInstance(theClass, this);
+    return (RawComparator)ReflectionUtils.newInstance(theClass, this);
   }
 
   /** 
-   * Set the user defined {@link WritableComparable} comparator for 
+   * Set the user defined {@link RawComparator} comparator for 
    * grouping keys in the input to the reduce.
    * 
    * <p>This comparator should be provided if the equivalence rules for keys
    * for sorting the intermediates are different from those for grouping keys
    * before each call to 
-   * {@link Reducer#reduce(WritableComparable, java.util.Iterator, OutputCollector, Reporter)}.</p>
+   * {@link Reducer#reduce(Object, java.util.Iterator, OutputCollector, Reporter)}.</p>
    *  
    * <p>For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed
    * in a single call to the reduce function if K1 and K2 compare as equal.</p>
@@ -693,12 +691,13 @@
    * that much sense.)</p>
    * 
    * @param theClass the comparator class to be used for grouping keys. 
-   *                 It should extend <code>WritableComparator</code>.
+   *                 It should implement <code>RawComparator</code>.
    * @see #setOutputKeyComparatorClass(Class)                 
    */
-  public void setOutputValueGroupingComparator(Class theClass) {
+  public void setOutputValueGroupingComparator(
+		  Class<? extends RawComparator> theClass) {
     setClass("mapred.output.value.groupfn.class",
-             theClass, WritableComparator.class);
+             theClass, RawComparator.class);
   }
 
   /**
@@ -706,8 +705,8 @@
    * 
    * @return the value class for job outputs.
    */
-  public Class<? extends Writable> getOutputValueClass() {
-    return getClass("mapred.output.value.class", Text.class, Writable.class);
+  public Class<?> getOutputValueClass() {
+    return getClass("mapred.output.value.class", Text.class, Object.class);
   }
   
   /**
@@ -715,8 +714,8 @@
    * 
    * @param theClass the value class for job outputs.
    */
-  public void setOutputValueClass(Class<? extends Writable> theClass) {
-    setClass("mapred.output.value.class", theClass, Writable.class);
+  public void setOutputValueClass(Class<?> theClass) {
+    setClass("mapred.output.value.class", theClass, Object.class);
   }
 
   /**

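With the bounds relaxed to Class<?>, a job can declare non-Writable intermediate types, provided a matching serialization and a RawComparator are configured. A hedged sketch of what this permits (mine, not from the patch; it leans on the new io.serializations property added to conf/hadoop-default.xml and on the new JavaSerializationComparator, which deserializes both keys and delegates to compareTo):

    import org.apache.hadoop.io.serializer.JavaSerializationComparator;
    import org.apache.hadoop.mapred.JobConf;

    public class SerializationConfig {
      public static JobConf configure() {
        JobConf job = new JobConf();
        // List the serializations to consult, Writable first, then Java.
        job.set("io.serializations",
                "org.apache.hadoop.io.serializer.WritableSerialization,"
                + "org.apache.hadoop.io.serializer.JavaSerialization");
        job.setMapOutputKeyClass(String.class);   // setters now accept any Class<?>
        job.setMapOutputValueClass(Long.class);
        // Non-WritableComparable keys need an explicit RawComparator.
        job.setOutputKeyComparatorClass(JavaSerializationComparator.class);
        return job;
      }
    }
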
+ 1 - 1
src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java

@@ -69,7 +69,7 @@ public class MapFileOutputFormat extends OutputFormatBase {
                          compressionType, codec,
                          progress);
 
-    return new RecordWriter() {
+    return new RecordWriter<WritableComparable, Writable>() {
 
         public void write(WritableComparable key, Writable value)
           throws IOException {

+ 1 - 5
src/java/org/apache/hadoop/mapred/MapRunnable.java

@@ -20,9 +20,6 @@ package org.apache.hadoop.mapred;
 
 import java.io.IOException;
 
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
 /**
  * Expert: Generic interface for {@link Mapper}s.
  * 
@@ -31,8 +28,7 @@ import org.apache.hadoop.io.WritableComparable;
  * 
  * @see Mapper
  */
-public interface MapRunnable<K1 extends WritableComparable, V1 extends Writable,
-                             K2 extends WritableComparable, V2 extends Writable>
+public interface MapRunnable<K1, V1, K2, V2>
     extends JobConfigurable {
   
   /** 

+ 1 - 4
src/java/org/apache/hadoop/mapred/MapRunner.java

@@ -20,13 +20,10 @@ package org.apache.hadoop.mapred;
 
 import java.io.IOException;
 
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.util.ReflectionUtils;
 
 /** Default {@link MapRunnable} implementation.*/
-public class MapRunner<K1 extends WritableComparable, V1 extends Writable,
-                       K2 extends WritableComparable, V2 extends Writable>
+public class MapRunner<K1, V1, K2, V2>
     implements MapRunnable<K1, V1, K2, V2> {
   
   private Mapper<K1, V1, K2, V2> mapper;

+ 51 - 32
src/java/org/apache/hadoop/mapred/MapTask.java

@@ -35,11 +35,11 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.InputBuffer;
+import org.apache.hadoop.io.OutputBuffer;
+import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.SequenceFile.Sorter;
 import org.apache.hadoop.io.SequenceFile.Writer;
@@ -47,6 +47,9 @@ import org.apache.hadoop.io.SequenceFile.Sorter.RawKeyValueIterator;
 import org.apache.hadoop.io.SequenceFile.Sorter.SegmentDescriptor;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
 import org.apache.hadoop.mapred.ReduceTask.ValuesIterator;
 import org.apache.hadoop.util.ReflectionUtils;
 
@@ -119,7 +122,7 @@ class MapTask extends Task {
    * @param <K>
    * @param <V>
    */
-  class TrackedRecordReader<K extends WritableComparable, V extends Writable> 
+  class TrackedRecordReader<K, V> 
       implements RecordReader<K,V> {
     private RecordReader<K,V> rawIn;
     private Counters.Counter inputByteCounter;
@@ -215,8 +218,7 @@ class MapTask extends Task {
     done(umbilical);
   }
 
-  interface MapOutputCollector<K extends WritableComparable,
-                               V extends Writable>
+  interface MapOutputCollector<K, V>
     extends OutputCollector<K, V> {
 
     public void close() throws IOException;
@@ -225,8 +227,7 @@ class MapTask extends Task {
         
   }
 
-  class DirectMapOutputCollector<K extends WritableComparable,
-                                 V extends Writable>
+  class DirectMapOutputCollector<K, V>
     implements MapOutputCollector<K, V> {
  
     private RecordWriter<K, V> out = null;
@@ -268,11 +269,11 @@ class MapTask extends Task {
     private JobConf job;
     private Reporter reporter;
 
-    private DataOutputBuffer keyValBuffer; //the buffer where key/val will
-                                           //be stored before they are 
-                                           //passed on to the pending buffer
-    private DataOutputBuffer pendingKeyvalBuffer; // the key value buffer used
-                                                  // while spilling
+    private OutputBuffer keyValBuffer; //the buffer where key/val will
+                                       //be stored before they are 
+                                       //passed on to the pending buffer
+    private OutputBuffer pendingKeyvalBuffer; // the key value buffer used
+                                              // while spilling
     // a lock used for sync sort-spill with collect
     private final Object pendingKeyvalBufferLock = new Object();
     // since sort-spill and collect are done concurrently, exceptions are 
@@ -287,7 +288,14 @@ class MapTask extends Task {
     private CompressionType compressionType;
     private Class keyClass;
     private Class valClass;
-    private WritableComparator comparator;
+    private RawComparator comparator;
+    private SerializationFactory serializationFactory;
+    private Serializer keySerializer;
+    private Serializer valSerializer;
+    private InputBuffer keyIn = new InputBuffer();
+    private InputBuffer valIn = new InputBuffer();
+    private Deserializer keyDeserializer;
+    private Deserializer valDeserializer;    
     private BufferSorter []sortImpl;
     private BufferSorter []pendingSortImpl; // sort impl for the pending buffer
     private SequenceFile.Writer writer;
@@ -299,6 +307,7 @@ class MapTask extends Task {
     private Counters.Counter combineInputCounter;
     private Counters.Counter combineOutputCounter;
     
+    @SuppressWarnings("unchecked")
     public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job, 
                            Reporter reporter) throws IOException {
       this.partitions = job.getNumReduceTasks();
@@ -306,13 +315,22 @@
                                                                  job.getPartitionerClass(), job);
       maxBufferSize = job.getInt("io.sort.mb", 100) * 1024 * 1024 / 2;
       this.sortSpillException = null;
-      keyValBuffer = new DataOutputBuffer();
+      keyValBuffer = new OutputBuffer();
 
       this.job = job;
       this.reporter = reporter;
       this.comparator = job.getOutputKeyComparator();
       this.keyClass = job.getMapOutputKeyClass();
       this.valClass = job.getMapOutputValueClass();
+      this.serializationFactory = new SerializationFactory(conf);
+      this.keySerializer = serializationFactory.getSerializer(keyClass);
+      this.keySerializer.open(keyValBuffer);
+      this.valSerializer = serializationFactory.getSerializer(valClass);
+      this.valSerializer.open(keyValBuffer);
+      this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
+      this.keyDeserializer.open(keyIn);
+      this.valDeserializer = serializationFactory.getDeserializer(valClass);
+      this.valDeserializer.open(valIn);
       this.localFs = FileSystem.getLocal(job);
       this.codec = null;
       this.compressionType = CompressionType.NONE;
@@ -357,8 +375,8 @@
     }
     
     @SuppressWarnings("unchecked")
-    public synchronized void collect(WritableComparable key,
-                                     Writable value) throws IOException {
+    public synchronized void collect(Object key,
+                                     Object value) throws IOException {
       
       if (key.getClass() != keyClass) {
         throw new IOException("Type mismatch in key from map: expected "
@@ -377,7 +395,9 @@
       }
       
       if (keyValBuffer == null) {
-        keyValBuffer = new DataOutputBuffer();
+        keyValBuffer = new OutputBuffer();
+        keySerializer.open(keyValBuffer);
+        valSerializer.open(keyValBuffer);
         sortImpl = new BufferSorter[partitions];
         for (int i = 0; i < partitions; i++)
          sortImpl[i] = (BufferSorter)ReflectionUtils.newInstance(
@@ -387,9 +407,9 @@
       
       //dump the key/value to buffer
       int keyOffset = keyValBuffer.getLength(); 
-      key.write(keyValBuffer);
+      keySerializer.serialize(key);
       int keyLength = keyValBuffer.getLength() - keyOffset;
-      value.write(keyValBuffer);
+      valSerializer.serialize(value);
       int valLength = keyValBuffer.getLength() - (keyOffset + keyLength);
       int partNumber = partitioner.getPartition(key, value, partitions);
       sortImpl[partNumber].addKeyValue(keyOffset, keyLength, valLength);
@@ -420,6 +440,8 @@
           // prepare for spilling
           pendingKeyvalBuffer = keyValBuffer;
           pendingSortImpl = sortImpl;
+          keySerializer.close();
+          valSerializer.close();
           keyValBuffer = null;
           sortImpl = null;
         }
@@ -483,7 +505,7 @@
                                                                       job.getCombinerClass(), job);
               // make collector
               OutputCollector combineCollector = new OutputCollector() {
-                  public void collect(WritableComparable key, Writable value)
+                  public void collect(Object key, Object value)
                     throws IOException {
                     synchronized (this) {
                       writer.append(key, value);
@@ -527,31 +549,27 @@
       }
     }
     
+    @SuppressWarnings("unchecked")
     private void spill(RawKeyValueIterator resultIter) throws IOException {
-      Writable key = null;
-      Writable value = null;
-
       try {
         // indicate progress, since constructor may take a while (because of 
         // user code) 
         reporter.progress();
-        key = (WritableComparable)ReflectionUtils.newInstance(keyClass, job);
-        value = (Writable)ReflectionUtils.newInstance(valClass, job);
       } catch (Exception e) {
         throw new RuntimeException(e);
       }
 
-      DataInputBuffer keyIn = new DataInputBuffer();
-      DataInputBuffer valIn = new DataInputBuffer();
+      Object key = null;
+      Object value = null;
       DataOutputBuffer valOut = new DataOutputBuffer();
       while (resultIter.next()) {
         keyIn.reset(resultIter.getKey().getData(), 
                     resultIter.getKey().getLength());
-        key.readFields(keyIn);
+        key = keyDeserializer.deserialize(key);
         valOut.reset();
         (resultIter.getValue()).writeUncompressedBytes(valOut);
         valIn.reset(valOut.getData(), valOut.getLength());
-        value.readFields(valIn);
+        value = valDeserializer.deserialize(value);
         writer.append(key, value);
         reporter.progress();
       }
@@ -613,7 +631,8 @@
       {
         //create a sorter object as we need access to the SegmentDescriptor
         //class and merge methods
-        Sorter sorter = new Sorter(localFs, job.getOutputKeyComparator(), valClass, job);
+        Sorter sorter = new Sorter(localFs, job.getOutputKeyComparator(),
+                                   keyClass, valClass, job);
         sorter.setProgressable(reporter);
         
         for (int parts = 0; parts < partitions; parts++){
@@ -665,7 +684,7 @@
     private class CombineValuesIterator extends ValuesIterator {
         
       public CombineValuesIterator(SequenceFile.Sorter.RawKeyValueIterator in, 
-                                   WritableComparator comparator, Class keyClass,
+                                   RawComparator comparator, Class keyClass,
                                    Class valClass, Configuration conf, Reporter reporter) 
         throws IOException {
         super(in, comparator, keyClass, valClass, conf, reporter);

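In summary, collect() now brackets each serializer call with buffer offsets instead of calling Writable.write() directly; those offsets are what the BufferSorter records and the RawComparator later compares. Condensed from the hunk above (the shape of the patch's own code, not new logic):

    // Inside MapOutputBuffer.collect(Object key, Object value):
    int keyOffset = keyValBuffer.getLength();      // where this key starts
    keySerializer.serialize(key);                  // was: key.write(keyValBuffer)
    int keyLength = keyValBuffer.getLength() - keyOffset;
    valSerializer.serialize(value);                // was: value.write(keyValBuffer)
    int valLength = keyValBuffer.getLength() - (keyOffset + keyLength);
    int partNumber = partitioner.getPartition(key, value, partitions);
    sortImpl[partNumber].addKeyValue(keyOffset, keyLength, valLength);
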
+ 3 - 7
src/java/org/apache/hadoop/mapred/Mapper.java

@@ -23,8 +23,6 @@ import java.io.IOException;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.Closeable;
 import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.compress.CompressionCodec;
 
 /** 
@@ -43,7 +41,7 @@ import org.apache.hadoop.io.compress.CompressionCodec;
 * de-initialization.</p>
 * 
 * <p>The framework then calls 
- * {@link #map(WritableComparable, Writable, OutputCollector, Reporter)} 
+ * {@link #map(Object, Object, OutputCollector, Reporter)} 
 * for each key/value pair in the <code>InputSplit</code> for that task.</p>
 * 
 * <p>All intermediate values associated with a given output key are 
@@ -130,9 +128,7 @@
 * @see MapRunnable
 * @see SequenceFile
 */
-public interface Mapper<K1 extends WritableComparable, V1 extends Writable,
-                        K2 extends WritableComparable, V2 extends Writable>
-  extends JobConfigurable, Closeable {
+public interface Mapper<K1, V1, K2, V2> extends JobConfigurable, Closeable {
  
   /** 
    * Maps a single input key/value pair into an intermediate key/value pair.
@@ -140,7 +136,7 @@
    * <p>Output pairs need not be of the same types as input pairs.  A given 
    * input pair may map to zero or many output pairs.  Output pairs are 
    * collected with calls to 
-   * {@link OutputCollector#collect(WritableComparable,Writable)}.</p>
+   * {@link OutputCollector#collect(Object,Object)}.</p>
    *
    * <p>Applications can use the {@link Reporter} provided to report progress 
    * or just indicate that they are alive. In scenarios where the application 

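A minimal sketch of what the relaxed bounds allow: a Mapper over plain Java types. The String/Integer choices here are hypothetical and only work when a serialization covering them (e.g. the new JavaSerialization) is registered via io.serializations:

    import java.io.IOException;

    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;

    public class LengthMapper extends MapReduceBase
        implements Mapper<String, String, String, Integer> {
      public void map(String key, String value,
                      OutputCollector<String, Integer> output, Reporter reporter)
          throws IOException {
        output.collect(key, value.length());  // no Writable wrappers needed
      }
    }
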
+ 1 - 4
src/java/org/apache/hadoop/mapred/MultiFileInputFormat.java

@@ -24,8 +24,6 @@ import java.util.List;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 
 /**
  * An abstract {@link InputFormat} that returns {@link MultiFileSplit}'s
@@ -36,8 +34,7 @@ import org.apache.hadoop.io.WritableComparable;
  * to construct <code>RecordReader</code>'s for <code>MultiFileSplit</code>'s.
  * @see MultiFileSplit
  */
-public abstract class MultiFileInputFormat<K extends WritableComparable,
-                                           V extends Writable>
+public abstract class MultiFileInputFormat<K, V>
   extends FileInputFormat<K, V> {
 
   @Override

+ 1 - 6
src/java/org/apache/hadoop/mapred/OutputCollector.java

@@ -20,10 +20,6 @@ package org.apache.hadoop.mapred;
 
 import java.io.IOException;
 
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-
 /**
  * Collects the <code>&lt;key, value&gt;</code> pairs output by {@link Mapper}s
  * and {@link Reducer}s.
@@ -33,8 +29,7 @@ import org.apache.hadoop.io.WritableComparable;
  * <code>Mapper</code> or the <code>Reducer</code> i.e. intermediate outputs 
  * or the output of the job.</p>  
  */
-public interface OutputCollector<K extends WritableComparable,
-                                 V extends Writable> {
+public interface OutputCollector<K, V> {
  
   /** Adds a key/value pair to the output.
    *

+ 1 - 4
src/java/org/apache/hadoop/mapred/OutputFormat.java

@@ -21,8 +21,6 @@ package org.apache.hadoop.mapred;
 import java.io.IOException;
 
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.util.Progressable;
 
 /** 
@@ -45,8 +43,7 @@ import org.apache.hadoop.util.Progressable;
  * @see RecordWriter
  * @see JobConf
  */
-public interface OutputFormat<K extends WritableComparable,
-                              V extends Writable> {
+public interface OutputFormat<K, V> {
 
   /** 
    * Get the {@link RecordWriter} for the given job.

+ 1 - 5
src/java/org/apache/hadoop/mapred/OutputFormatBase.java

@@ -22,15 +22,11 @@ import java.io.IOException;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.util.Progressable;
 
 /** A base class for {@link OutputFormat}. */
-public abstract class OutputFormatBase<K extends WritableComparable,
-                                       V extends Writable>
-  implements OutputFormat<K, V> {
+public abstract class OutputFormatBase<K, V> implements OutputFormat<K, V> {
 
   /**
    * Set whether the output of the job is compressed.

+ 1 - 6
src/java/org/apache/hadoop/mapred/Partitioner.java

@@ -18,9 +18,6 @@
 
 package org.apache.hadoop.mapred;
 
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
 /** 
  * Partitions the key space.
  * 
@@ -33,9 +30,7 @@ import org.apache.hadoop.io.WritableComparable;
  * 
  * @see Reducer
  */
-public interface Partitioner<K2 extends WritableComparable,
-                             V2 extends Writable>
-  extends JobConfigurable {
+public interface Partitioner<K2, V2> extends JobConfigurable {
  
   /** 
    * Get the partition number for a given key (hence record) given the total 

+ 1 - 7
src/java/org/apache/hadoop/mapred/RecordReader.java

@@ -21,9 +21,6 @@ package org.apache.hadoop.mapred;
 import java.io.IOException;
 import java.io.DataInput;
 
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
 /**
  * <code>RecordReader</code> reads &lt;key, value&gt; pairs from an 
  * {@link InputSplit}.
@@ -37,16 +34,13 @@ import org.apache.hadoop.io.WritableComparable;
  * @see InputSplit
  * @see InputFormat
  */
-public interface RecordReader<K extends WritableComparable,
-                              V extends Writable> {
+public interface RecordReader<K, V> {
   /** 
    * Reads the next key/value pair from the input for processing.
    *
    * @param key the key to read data into
    * @param value the value to read data into
    * @return true iff a key/value was read, false if at EOF
-   *
-   * @see Writable#readFields(DataInput)
    */      
  boolean next(K key, V value) throws IOException;
  

+ 1 - 6
src/java/org/apache/hadoop/mapred/RecordWriter.java

@@ -19,11 +19,8 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
-import java.io.DataOutput;
 
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.Writable;
 
 /**
  * <code>RecordWriter</code> writes the output &lt;key, value&gt; pairs 
@@ -34,15 +31,13 @@ import org.apache.hadoop.io.Writable;
  * 
  * @see OutputFormat
  */
-public interface RecordWriter<K extends WritableComparable,
-                              V extends Writable> {
+public interface RecordWriter<K, V> {
   /** 
    * Writes a key/value pair.
    *
    * @param key the key to write.
    * @param value the value to write.
    * @throws IOException
-   * @see Writable#write(DataOutput)
    */      
   void write(K key, V value) throws IOException;
 

+ 28 - 28
src/java/org/apache/hadoop/mapred/ReduceTask.java

@@ -49,13 +49,15 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.InputBuffer;
 import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.io.WritableFactory;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsRecord;
 import org.apache.hadoop.metrics.MetricsUtil;
@@ -140,30 +142,32 @@ class ReduceTask extends Task {
   /** Iterates values while keys match in sorted input. */
   static class ValuesIterator implements Iterator {
     private SequenceFile.Sorter.RawKeyValueIterator in; //input iterator
-    private WritableComparable key;               // current key
-    private Writable value;                       // current value
+    private Object key;               // current key
+    private Object value;                       // current value
     private boolean hasNext;                      // more w/ this key
     private boolean more;                         // more in file
-    private WritableComparator comparator;
-    private Class keyClass;
-    private Class valClass;
-    private Configuration conf;
+    private RawComparator comparator;
     private DataOutputBuffer valOut = new DataOutputBuffer();
-    private DataInputBuffer valIn = new DataInputBuffer();
-    private DataInputBuffer keyIn = new DataInputBuffer();
+    private InputBuffer valIn = new InputBuffer();
+    private InputBuffer keyIn = new InputBuffer();
     protected Reporter reporter;
+    private Deserializer keyDeserializer;
+    private Deserializer valDeserializer;
 
+    @SuppressWarnings("unchecked")
     public ValuesIterator (SequenceFile.Sorter.RawKeyValueIterator in, 
-                           WritableComparator comparator, Class keyClass,
+                           RawComparator comparator, Class keyClass,
                            Class valClass, Configuration conf, 
                           Reporter reporter)
       throws IOException {
       this.in = in;
-      this.conf = conf;
       this.comparator = comparator;
-      this.keyClass = keyClass;
-      this.valClass = valClass;
       this.reporter = reporter;
+      SerializationFactory serializationFactory = new SerializationFactory(conf);
+      this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
+      this.keyDeserializer.open(keyIn);
+      this.valDeserializer = serializationFactory.getDeserializer(valClass);
+      this.valDeserializer.open(valIn);
       getNext();
     }
 
@@ -196,25 +200,20 @@ class ReduceTask extends Task {
     public boolean more() { return more; }
 
     /** The current key. */
-    public WritableComparable getKey() { return key; }
+    public Object getKey() { return key; }
 
+    @SuppressWarnings("unchecked")
     private void getNext() throws IOException {
-      Writable lastKey = key;                     // save previous key
-      try {
-        key = (WritableComparable)ReflectionUtils.newInstance(keyClass, this.conf);
-        value = (Writable)ReflectionUtils.newInstance(valClass, this.conf);
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
+      Object lastKey = key;                     // save previous key
       more = in.next();
       if (more) {
         //de-serialize the raw key/value
         keyIn.reset(in.getKey().getData(), in.getKey().getLength());
-        key.readFields(keyIn);
+        key = keyDeserializer.deserialize(null); // force new object
         valOut.reset();
         (in.getValue()).writeUncompressedBytes(valOut);
         valIn.reset(valOut.getData(), valOut.getLength());
-        value.readFields(valIn);
+        value = valDeserializer.deserialize(null); // force new object
 
         if (lastKey == null) {
           hasNext = true;
@@ -228,7 +227,7 @@ class ReduceTask extends Task {
   }
   private class ReduceValuesIterator extends ValuesIterator {
     public ReduceValuesIterator (SequenceFile.Sorter.RawKeyValueIterator in,
-                                 WritableComparator comparator, Class keyClass,
+                                 RawComparator comparator, Class keyClass,
                                  Class valClass,
                                 Configuration conf, Reporter reporter)
       throws IOException {
@@ -293,7 +292,8 @@ class ReduceTask extends Task {
    
     // sort the input file
     SequenceFile.Sorter sorter = new SequenceFile.Sorter(lfs, 
-        job.getOutputKeyComparator(), job.getMapOutputValueClass(), job);
+        job.getOutputKeyComparator(), job.getMapOutputKeyClass(),
+        job.getMapOutputValueClass(), job);
     sorter.setProgressable(reporter);
     rIter = sorter.merge(mapFiles, tempDir, 
         !conf.getKeepFailedTaskFiles()); // sort
@@ -310,7 +310,7 @@ class ReduceTask extends Task {
    
     OutputCollector collector = new OutputCollector() {
         @SuppressWarnings("unchecked")
-        public void collect(WritableComparable key, Writable value)
+        public void collect(Object key, Object value)
           throws IOException {
           out.write(key, value);
           reduceOutputCounter.increment(1);
@@ -887,7 +887,7 @@ class ReduceTask extends Task {
       //create an instance of the sorter
       sorter =
         new SequenceFile.Sorter(inMemFileSys, conf.getOutputKeyComparator(), 
-                                conf.getMapOutputValueClass(), conf);
+            conf.getMapOutputKeyClass(), conf.getMapOutputValueClass(), conf);
       sorter.setProgressable(getReporter(umbilical));
      
       // hosts -> next contact time

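The iterator leans on the Deserializer contract implied by the calls above (my reading, not code from the patch): deserialize(t) may fill in and return the object passed to it, or ignore it and return a fresh one; passing null forces a new object, which is why getNext() can keep lastKey and key distinct for the grouping comparison. A hypothetical fragment (rawKey and previousValue are illustrative names):

    keyIn.reset(rawKey.getData(), rawKey.getLength());  // aim buffer at raw bytes
    Object freshKey = keyDeserializer.deserialize(null);             // always new
    Object maybeReused = valDeserializer.deserialize(previousValue); // may recycle
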
+ 4 - 8
src/java/org/apache/hadoop/mapred/Reducer.java

@@ -24,8 +24,6 @@ import java.util.Iterator;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.Closeable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 
 /** 
  * Reduces a set of intermediate values which share a key to a smaller set of
@@ -89,12 +87,12 @@ import org.apache.hadoop.io.WritableComparable;
 *   <h4 id="Reduce">Reduce</h4>
 *   
 *   <p>In this phase the 
- *   {@link #reduce(WritableComparable, Iterator, OutputCollector, Reporter)}
+ *   {@link #reduce(Object, Iterator, OutputCollector, Reporter)}
 *   method is called for each <code>&lt;key, (list of values)></code> pair in
 *   the grouped inputs.</p>
 *   <p>The output of the reduce task is typically written to the 
 *   {@link FileSystem} via 
- *   {@link OutputCollector#collect(WritableComparable, Writable)}.</p>
+ *   {@link OutputCollector#collect(Object, Object)}.</p>
 *   </li>
 * </ol>
 * 
@@ -163,9 +161,7 @@ import org.apache.hadoop.io.WritableComparable;
 * @see Reporter
 * @see MapReduceBase
 */
-public interface Reducer<K2 extends WritableComparable, V2 extends Writable,
-                         K3 extends WritableComparable, V3 extends Writable>
-    extends JobConfigurable, Closeable {
+public interface Reducer<K2, V2, K3, V3> extends JobConfigurable, Closeable {
  
   /** 
    * <i>Reduces</i> values for a given key.  
@@ -177,7 +173,7 @@ public interface Reducer<K2 extends WritableComparable, V2 extends Writable,
    * </p>
    *   
    * <p>Output pairs are collected with calls to  
-   * {@link OutputCollector#collect(WritableComparable,Writable)}.</p>
+   * {@link OutputCollector#collect(Object,Object)}.</p>
    *
    * <p>Applications can use the {@link Reporter} provided to report progress 
    * or just indicate that they are alive. In scenarios where the application 

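A companion sketch to the Mapper example earlier: a Reducer over unbounded types, summing the (hypothetical) Integer counts. Again valid only with a serialization configured for String and Integer:

    import java.io.IOException;
    import java.util.Iterator;

    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reducer;
    import org.apache.hadoop.mapred.Reporter;

    public class SumReducer extends MapReduceBase
        implements Reducer<String, Integer, String, Integer> {
      public void reduce(String key, Iterator<Integer> values,
                         OutputCollector<String, Integer> output,
                         Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
          sum += values.next();   // autoboxed ints, no IntWritable needed
        }
        output.collect(key, sum);
      }
    }
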
+ 1 - 1
src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java

@@ -65,7 +65,7 @@ public class SequenceFileOutputFormat extends OutputFormatBase {
                                 codec,
                                 progress);
 
-    return new RecordWriter() {
+    return new RecordWriter<WritableComparable, Writable>() {
 
         public void write(WritableComparable key, Writable value)
           throws IOException {

+ 2 - 7
src/java/org/apache/hadoop/mapred/TextOutputFormat.java

@@ -28,19 +28,14 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.util.*;
 
 /** An {@link OutputFormat} that writes plain text files. */
-public class TextOutputFormat<K extends WritableComparable,
-                              V extends Writable>
-  extends OutputFormatBase<K, V> {
+public class TextOutputFormat<K, V> extends OutputFormatBase<K, V> {
 
-  protected static class LineRecordWriter<K extends WritableComparable,
-                                          V extends Writable>
+  protected static class LineRecordWriter<K, V>
     implements RecordWriter<K, V> {
     private static final String utf8 = "UTF-8";
     private static final byte[] tab;

+ 1 - 4
src/java/org/apache/hadoop/mapred/lib/FieldSelectionMapReduce.java

@@ -25,8 +25,6 @@ import java.util.Iterator;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
@@ -63,8 +61,7 @@ import org.apache.hadoop.mapred.TextInputFormat;
 * the key is never ignored.
 * 
 */
-public class FieldSelectionMapReduce<K extends WritableComparable,
-                                     V extends Writable>
+public class FieldSelectionMapReduce<K, V>
     implements Mapper<K, V, Text, Text>, Reducer<Text, Text, Text, Text> {
 
   private String mapOutputKeyValueSpec;

+ 1 - 6
src/java/org/apache/hadoop/mapred/lib/HashPartitioner.java

@@ -21,13 +21,8 @@ package org.apache.hadoop.mapred.lib;
 import org.apache.hadoop.mapred.Partitioner;
 import org.apache.hadoop.mapred.JobConf;
 
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.Writable;
-
 /** Partition keys by their {@link Object#hashCode()}. */
-public class HashPartitioner<K2 extends WritableComparable,
-                             V2 extends Writable>
-    implements Partitioner<K2, V2> {
+public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> {
 
   public void configure(JobConf job) {}
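
The partition method itself, not shown in this hunk, needs nothing beyond Object.hashCode(), which is what lets the WritableComparable bound go. A sketch consistent with the javadoc above:

    public int getPartition(K2 key, V2 value, int numReduceTasks) {
      // mask the sign bit so the modulus is always non-negative
      return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }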
 

+ 1 - 4
src/java/org/apache/hadoop/mapred/lib/IdentityMapper.java

@@ -25,11 +25,8 @@ import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.MapReduceBase;
 
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
 /** Implements the identity function, mapping inputs directly to outputs. */
-public class IdentityMapper<K extends WritableComparable, V extends Writable>
+public class IdentityMapper<K, V>
     extends MapReduceBase implements Mapper<K, V, K, V> {
 
   /** The identity function.  Input key/value pair is written directly to

+ 1 - 4
src/java/org/apache/hadoop/mapred/lib/IdentityReducer.java

@@ -27,11 +27,8 @@ import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.MapReduceBase;
 
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
 /** Performs no reduction, writing all input values directly to the output. */
-public class IdentityReducer<K extends WritableComparable, V extends Writable>
+public class IdentityReducer<K, V>
     extends MapReduceBase implements Reducer<K, V, K, V> {
 
   /** Writes all keys and values directly to output. */

+ 1 - 4
src/java/org/apache/hadoop/mapred/lib/InverseMapper.java

@@ -20,16 +20,13 @@ package org.apache.hadoop.mapred.lib;
 
 import java.io.IOException;
 
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 
-
 /** A {@link Mapper} that swaps keys and values. */
-public class InverseMapper<K extends WritableComparable,
-                           V extends WritableComparable>
+public class InverseMapper<K, V>
     extends MapReduceBase implements Mapper<K, V, V, K> {
 
   /** The inverse function.  Input keys and values are swapped.*/
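
The entire mapper is one collect call with the arguments swapped; a sketch of the map method implied by the new Mapper<K, V, V, K> signature:

    public void map(K key, V value, OutputCollector<V, K> output,
                    Reporter reporter) throws IOException {
      output.collect(value, key);  // emit (value, key) instead of (key, value)
    }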

+ 1 - 5
src/java/org/apache/hadoop/mapred/lib/KeyFieldBasedPartitioner.java

@@ -18,14 +18,10 @@
 
 package org.apache.hadoop.mapred.lib;
 
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Partitioner;
 
-public class KeyFieldBasedPartitioner<K2 extends WritableComparable,
-                                      V2 extends Writable>
-    implements Partitioner<K2, V2> {
+public class KeyFieldBasedPartitioner<K2, V2> implements Partitioner<K2, V2> {
 
   private int numOfPartitionFields;
 

+ 1 - 3
src/java/org/apache/hadoop/mapred/lib/LongSumReducer.java

@@ -26,12 +26,10 @@ import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.MapReduceBase;
 
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.LongWritable;
 
 /** A {@link Reducer} that sums long values. */
-public class LongSumReducer<K extends WritableComparable>
-    extends MapReduceBase
+public class LongSumReducer<K> extends MapReduceBase
     implements Reducer<K, LongWritable, K, LongWritable> {
 
   public void reduce(K key, Iterator<LongWritable> values,
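
The reduce body, cut off by the hunk, just folds the iterator into a single total; a sketch matching the unbounded signature:

    public void reduce(K key, Iterator<LongWritable> values,
                       OutputCollector<K, LongWritable> output,
                       Reporter reporter) throws IOException {
      long sum = 0;
      while (values.hasNext()) {
        sum += values.next().get();  // accumulate every long for this key
      }
      output.collect(key, new LongWritable(sum));
    }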

+ 1 - 4
src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java

@@ -50,10 +50,7 @@ import java.util.concurrent.TimeUnit;
  * <b>mapred.map.multithreadedrunner.threads</b> property).
  * <p>
  */
-public class MultithreadedMapRunner<K1 extends WritableComparable,
-                                    V1 extends Writable,
-                                    K2 extends WritableComparable,
-                                    V2 extends Writable>
+public class MultithreadedMapRunner<K1, V1, K2, V2>
     implements MapRunnable<K1, V1, K2, V2> {
 
   private static final Log LOG =

+ 1 - 5
src/java/org/apache/hadoop/mapred/lib/NullOutputFormat.java

@@ -19,8 +19,6 @@
 package org.apache.hadoop.mapred.lib;
 
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.hadoop.mapred.RecordWriter;
@@ -30,9 +28,7 @@ import org.apache.hadoop.util.Progressable;
 /**
  * Consume all outputs and put them in /dev/null. 
  */
-public class NullOutputFormat<K extends WritableComparable,
-                              V extends Writable>
-  implements OutputFormat<K, V> {
+public class NullOutputFormat<K, V> implements OutputFormat<K, V> {
   
   public RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job, 
                                       String name, Progressable progress) {
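
The method body falls below the fold; presumably it hands back a writer that ignores everything. A sketch, assuming the close(Reporter) signature used by RecordWriter elsewhere in this commit:

      return new RecordWriter<K, V>() {
        public void write(K key, V value) { }      // discard the pair
        public void close(Reporter reporter) { }   // nothing to flush
      };
    }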

+ 1 - 3
src/java/org/apache/hadoop/mapred/lib/RegexMapper.java

@@ -24,7 +24,6 @@ import java.util.regex.Pattern;
 
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.Mapper;
@@ -33,8 +32,7 @@ import org.apache.hadoop.mapred.Reporter;
 
 
 /** A {@link Mapper} that extracts text matching a regular expression. */
-public class RegexMapper<K extends WritableComparable>
-    extends MapReduceBase
+public class RegexMapper<K> extends MapReduceBase
     implements Mapper<K, Text, Text, LongWritable> {
 
   private Pattern pattern;
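
Nothing in the mapper ever inspects the key, which is why the bound on K could be dropped. A sketch of the map method, assuming pattern and an int field group are populated from the JobConf in configure():

    public void map(K key, Text value,
                    OutputCollector<Text, LongWritable> output,
                    Reporter reporter) throws IOException {
      Matcher matcher = pattern.matcher(value.toString());
      while (matcher.find()) {
        // emit each match of the configured capture group with a count of one
        output.collect(new Text(matcher.group(group)), new LongWritable(1));
      }
    }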

+ 1 - 3
src/java/org/apache/hadoop/mapred/lib/TokenCountMapper.java

@@ -23,7 +23,6 @@ import java.util.StringTokenizer;
 
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
@@ -32,8 +31,7 @@ import org.apache.hadoop.mapred.Reporter;
 
 /** A {@link Mapper} that maps text values into <token,freq> pairs.  Uses
  * {@link StringTokenizer} to break text into tokens. */
-public class TokenCountMapper<K extends WritableComparable>
-    extends MapReduceBase
+public class TokenCountMapper<K> extends MapReduceBase
     implements Mapper<K, Text, Text, LongWritable> {
 
   public void map(K key, Text value,
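
The map body, cut off here, is the classic token loop over the Text value; a sketch:

    public void map(K key, Text value,
                    OutputCollector<Text, LongWritable> output,
                    Reporter reporter) throws IOException {
      StringTokenizer st = new StringTokenizer(value.toString());
      while (st.hasMoreTokens()) {
        output.collect(new Text(st.nextToken()), new LongWritable(1));
      }
    }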

+ 6 - 6
src/java/org/apache/hadoop/mapred/package.html

@@ -81,9 +81,9 @@ order to facilitate grouping by the framework.</p>
 </pre>
 
 <p>Applications typically implement 
-{@link org.apache.hadoop.mapred.Mapper#map(WritableComparable, Writable, OutputCollector, Reporter)} 
+{@link org.apache.hadoop.mapred.Mapper#map(Object, Object, OutputCollector, Reporter)} 
 and
-{@link org.apache.hadoop.mapred.Reducer#reduce(WritableComparable, Iterator, OutputCollector, Reporter)} 
+{@link org.apache.hadoop.mapred.Reducer#reduce(Object, Iterator, OutputCollector, Reporter)} 
 methods.  The application-writer also specifies various facets of the job such
 as input and output locations, the <tt>Partitioner</tt>, <tt>InputFormat</tt> 
 &amp; <tt>OutputFormat</tt> implementations to be used etc. as 
@@ -94,7 +94,7 @@ and optionally monitors it.</p>
 <p>The framework spawns one map task per 
 {@link org.apache.hadoop.mapred.InputSplit} generated by the 
 {@link org.apache.hadoop.mapred.InputFormat} of the job and calls 
-{@link org.apache.hadoop.mapred.Mapper#map(WritableComparable, Writable, OutputCollector, Reporter)} 
+{@link org.apache.hadoop.mapred.Mapper#map(Object, Object, OutputCollector, Reporter)} 
 with each &lt;key, value&gt; pair read by the 
 {@link org.apache.hadoop.mapred.RecordReader} from the <tt>InputSplit</tt> for 
 the task. The intermediate outputs of the maps are then grouped by <tt>key</tt>s
@@ -104,7 +104,7 @@ the number of partitions is exactly the number of reduce tasks for the job.</p>
 
 <p>The reduce tasks fetch the sorted intermediate outputs of the maps, via http, 
 merge the &lt;key, value&gt; pairs and call 
-{@link org.apache.hadoop.mapred.Reducer#reduce(WritableComparable, Iterator, OutputCollector, Reporter)} 
+{@link org.apache.hadoop.mapred.Reducer#reduce(Object, Iterator, OutputCollector, Reporter)} 
 for each &lt;key, list of values&gt; pair. The output of the reduce tasks is 
 stored on the <tt>FileSystem</tt> by the 
 {@link org.apache.hadoop.mapred.RecordWriter} provided by the
@@ -117,7 +117,7 @@ public class Grep extends Configured implements Tool {
   // <i>map: Search for the pattern specified by 'grep.mapper.regex' &amp;</i>
   //      <i>'grep.mapper.regex.group'</i>
 
-  class GrepMapper&lt;K extends WritableComparable, Text&gt; 
+  class GrepMapper&lt;K, Text&gt; 
   extends MapReduceBase  implements Mapper&lt;K, Text, Text, LongWritable&gt; {
 
     private Pattern pattern;
@@ -142,7 +142,7 @@ public class Grep extends Configured implements Tool {
 
   // <i>reduce: Count the number of occurrences of the pattern</i>
 
-  class GrepReducer&lt;K extends WritableComparable&gt; extends MapReduceBase
+  class GrepReducer&lt;K&gt; extends MapReduceBase
   implements Reducer&lt;K, LongWritable, K, LongWritable&gt; {
 
     public void reduce(K key, Iterator&lt;LongWritable&gt; values,
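
These javadoc changes are the user-visible half of the new serializer framework: the map and reduce contracts are now expressed over plain Objects, so key and value classes only need to be handled by some registered Serialization rather than implement Writable. A hedged sketch of how a job might opt in to plain Java serialization — the io.serializations key and the serializer classes come from this commit, but the exact property value and the job class are illustrative assumptions:

    JobConf job = new JobConf(MyJob.class);  // MyJob is a placeholder
    // register Java serialization alongside the default Writable support
    job.set("io.serializations",
            "org.apache.hadoop.io.serializer.WritableSerialization,"
            + "org.apache.hadoop.io.serializer.JavaSerialization");
    job.setOutputKeyClass(String.class);     // Serializable, but not a Writable
    job.setOutputValueClass(Long.class);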

+ 1 - 1
src/java/org/apache/hadoop/util/CopyFiles.java

@@ -818,7 +818,7 @@ public class CopyFiles implements Tool {
     SequenceFile.Reader in = null;
     try {
       SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs,
-        new Text.Comparator(), Text.class, conf);
+        new Text.Comparator(), Text.class, Text.class, conf);
       sorter.sort(file, sorted);
       in = new SequenceFile.Reader(fs, sorted, conf);
 

+ 8 - 5
src/test/org/apache/hadoop/fs/TestFileSystem.java

@@ -319,7 +319,9 @@ public class TestFileSystem extends TestCase {
   }
 
 
-  public static class SeekMapper extends Configured implements Mapper {
+  public static class SeekMapper<K> extends Configured
+    implements Mapper<WritableComparable, LongWritable, K, LongWritable> {
+    
     private Random random = new Random();
     private byte[] check  = new byte[BUFFER_SIZE];
     private FileSystem fs;
@@ -342,11 +344,12 @@ public class TestFileSystem extends TestCase {
       fastCheck = job.getBoolean("fs.test.fastCheck", false);
     }
 
-    public void map(WritableComparable key, Writable value,
-                    OutputCollector collector, Reporter reporter)
+    public void map(WritableComparable key, LongWritable value,
+                    OutputCollector<K, LongWritable> collector,
+                    Reporter reporter)
       throws IOException {
-      String name = ((UTF8)key).toString();
-      long size = ((LongWritable)value).get();
+      String name = key.toString();
+      long size = value.get();
       long seed = Long.parseLong(name);
 
       if (size == 0) return;

+ 2 - 2
src/test/org/apache/hadoop/io/FileBench.java

@@ -142,8 +142,8 @@ public class FileBench extends Configured implements Tool {
     RecordReader rr = inf.getRecordReader(
         new FileSplit(pin, 0, in.getLen(), conf), conf, Reporter.NULL);
     try {
-      WritableComparable key = rr.createKey();
-      Writable val = rr.createValue();
+      Object key = rr.createKey();
+      Object val = rr.createValue();
       Date start = new Date();
       while (rr.next(key, val));
       Date end = new Date();

+ 2 - 1
src/test/org/apache/hadoop/io/TestSequenceFile.java

@@ -312,7 +312,8 @@ public class TestSequenceFile extends TestCase {
                                                int megabytes, int factor) {
     SequenceFile.Sorter sorter = 
       fast
-      ? new SequenceFile.Sorter(fs, new RandomDatum.Comparator(), RandomDatum.class, conf)
+      ? new SequenceFile.Sorter(fs, new RandomDatum.Comparator(),
+                                RandomDatum.class, RandomDatum.class, conf)
       : new SequenceFile.Sorter(fs, RandomDatum.class, RandomDatum.class, conf);
     sorter.setMemory(megabytes * 1024*1024);
     sorter.setFactor(factor);

+ 2 - 3
src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java

@@ -255,15 +255,14 @@ public class TestMiniMRLocalFS extends TestCase {
   }
 
   static class MyOutputFormat implements OutputFormat {
-    static class MyRecordWriter implements RecordWriter {
+    static class MyRecordWriter implements RecordWriter<Object, Object> {
       private DataOutputStream out;
       
       public MyRecordWriter(Path outputFile, JobConf job) throws IOException {
         out = outputFile.getFileSystem(job).create(outputFile);
       }
       
-      public void write(WritableComparable key, 
-                        Writable value) throws IOException {
+      public void write(Object key, Object value) throws IOException {
         out.writeBytes(key.toString() + "\t" + value.toString() + "\n");
       }