
HADOOP-6165. Add metadata to Serializations.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@810756 13f79535-47bb-0310-9956-ffa450edef68
Thomas White, 16 years ago
commit d6428581ff
23 changed files with 707 additions and 104 deletions
  1. CHANGES.txt (+2 -0)
  2. src/java/core-default.xml (+1 -1)
  3. src/java/org/apache/hadoop/io/DefaultStringifier.java (+15 -13)
  4. src/java/org/apache/hadoop/io/SequenceFile.java (+36 -14)
  5. src/java/org/apache/hadoop/io/serializer/Deserializer.java (+1 -0)
  6. src/java/org/apache/hadoop/io/serializer/DeserializerBase.java (+46 -0)
  7. src/java/org/apache/hadoop/io/serializer/DeserializerComparator.java (+7 -0)
  8. src/java/org/apache/hadoop/io/serializer/LegacyDeserializer.java (+47 -0)
  9. src/java/org/apache/hadoop/io/serializer/LegacySerialization.java (+85 -0)
  10. src/java/org/apache/hadoop/io/serializer/LegacySerializer.java (+54 -0)
  11. src/java/org/apache/hadoop/io/serializer/Serialization.java (+1 -0)
  12. src/java/org/apache/hadoop/io/serializer/SerializationBase.java (+91 -0)
  13. src/java/org/apache/hadoop/io/serializer/SerializationFactory.java (+51 -13)
  14. src/java/org/apache/hadoop/io/serializer/Serializer.java (+1 -0)
  15. src/java/org/apache/hadoop/io/serializer/SerializerBase.java (+42 -0)
  16. src/java/org/apache/hadoop/io/serializer/WritableSerialization.java (+37 -15)
  17. src/java/org/apache/hadoop/io/serializer/avro/AvroGenericSerialization.java (+63 -0)
  18. src/java/org/apache/hadoop/io/serializer/avro/AvroReflectSerialization.java (+19 -4)
  19. src/java/org/apache/hadoop/io/serializer/avro/AvroSerialization.java (+37 -24)
  20. src/java/org/apache/hadoop/io/serializer/avro/AvroSpecificSerialization.java (+18 -5)
  21. src/java/org/apache/hadoop/util/ReflectionUtils.java (+14 -7)
  22. src/test/core/org/apache/hadoop/io/serializer/SerializationTestUtil.java (+24 -8)
  23. src/test/core/org/apache/hadoop/io/serializer/avro/TestAvroSerialization.java (+15 -0)
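
Taken together, these diffs move serializer lookup from Class<?> keys to Map<String, String> metadata. A minimal round-trip sketch of the resulting API, closely mirroring the updated SerializationTestUtil near the end of this commit (Text stands in for any Writable):

    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.serializer.DeserializerBase;
    import org.apache.hadoop.io.serializer.SerializationBase;
    import org.apache.hadoop.io.serializer.SerializationFactory;
    import org.apache.hadoop.io.serializer.SerializerBase;

    public class MetadataRoundTrip {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        SerializationFactory factory = new SerializationFactory(conf);

        // The metadata map replaces the bare Class<?> of the old API;
        // getMetadataFromClass fills in the "Serialized-Class" key.
        Map<String, String> metadata =
            SerializationBase.getMetadataFromClass(Text.class);

        SerializerBase<Text> serializer = factory.getSerializer(metadata);
        DataOutputBuffer out = new DataOutputBuffer();
        serializer.open(out);
        serializer.serialize(new Text("hadoop"));
        serializer.close();

        DeserializerBase<Text> deserializer = factory.getDeserializer(metadata);
        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        Text after = deserializer.deserialize(null);
        deserializer.close();
      }
    }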

+ 2 - 0
CHANGES.txt

@@ -166,6 +166,8 @@ Trunk (unreleased changes)
     the io package and makes it available to other users (MAPREDUCE-318). 
     (Jothi Padmanabhan via ddas)
 
+    HADOOP-6165. Add metadata to Serializations. (tomwhite)
+
   IMPROVEMENTS
 
     HADOOP-4565. Added CombineFileInputFormat to use data locality information

+ 1 - 1
src/java/core-default.xml

@@ -101,7 +101,7 @@
 
 <property>
   <name>io.serializations</name>
-  <value>org.apache.hadoop.io.serializer.WritableSerialization,org.apache.hadoop.io.serializer.avro.AvroSpecificSerialization,org.apache.hadoop.io.serializer.avro.AvroReflectSerialization</value>
+  <value>org.apache.hadoop.io.serializer.WritableSerialization,org.apache.hadoop.io.serializer.avro.AvroSpecificSerialization,org.apache.hadoop.io.serializer.avro.AvroReflectSerialization,org.apache.hadoop.io.serializer.avro.AvroGenericSerialization</value>
   <description>A list of serialization classes that can be used for
   obtaining serializers and deserializers.</description>
 </property>

+ 15 - 13
src/java/org/apache/hadoop/io/DefaultStringifier.java

@@ -21,20 +21,21 @@ package org.apache.hadoop.io;
 import java.io.IOException;
 import java.nio.charset.UnsupportedCharsetException;
 import java.util.ArrayList;
+import java.util.Map;
 
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.serializer.Deserializer;
-import org.apache.hadoop.io.serializer.Serialization;
+import org.apache.hadoop.io.serializer.DeserializerBase;
+import org.apache.hadoop.io.serializer.SerializationBase;
 import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.io.serializer.SerializerBase;
 import org.apache.hadoop.util.GenericsUtil;
 
 /**
  * DefaultStringifier is the default implementation of the {@link Stringifier}
  * interface which stringifies the objects using base64 encoding of the
- * serialized version of the objects. The {@link Serializer} and
- * {@link Deserializer} are obtained from the {@link SerializationFactory}.
+ * serialized version of the objects. The {@link SerializerBase} and
+ * {@link DeserializerBase} are obtained from the {@link SerializationFactory}.
  * <br>
  * DefaultStringifier offers convenience methods to store/load objects to/from
  * the configuration.
@@ -45,9 +46,9 @@ public class DefaultStringifier<T> implements Stringifier<T> {
 
   private static final String SEPARATOR = ",";
 
-  private Serializer<T> serializer;
+  private SerializerBase<T> serializer;
 
-  private Deserializer<T> deserializer;
+  private DeserializerBase<T> deserializer;
 
   private DataInputBuffer inBuf;
 
@@ -56,8 +57,9 @@ public class DefaultStringifier<T> implements Stringifier<T> {
   public DefaultStringifier(Configuration conf, Class<T> c) {
 
     SerializationFactory factory = new SerializationFactory(conf);
-    this.serializer = factory.getSerializer(c);
-    this.deserializer = factory.getDeserializer(c);
+    Map<String, String> metadata = SerializationBase.getMetadataFromClass(c);
+    this.serializer = factory.getSerializer(metadata);
+    this.deserializer = factory.getDeserializer(metadata);
     this.inBuf = new DataInputBuffer();
     this.outBuf = new DataOutputBuffer();
     try {
@@ -102,7 +104,7 @@ public class DefaultStringifier<T> implements Stringifier<T> {
    * @param item the object to be stored
    * @param keyName the name of the key to use
    * @throws IOException : forwards Exceptions from the underlying 
-   * {@link Serialization} classes. 
+   * {@link SerializationBase} classes. 
    */
   public static <K> void store(Configuration conf, K item, String keyName)
   throws IOException {
@@ -122,7 +124,7 @@ public class DefaultStringifier<T> implements Stringifier<T> {
    * @param itemClass the class of the item
    * @return restored object
    * @throws IOException : forwards Exceptions from the underlying 
-   * {@link Serialization} classes.
+   * {@link SerializationBase} classes.
    */
   public static <K> K load(Configuration conf, String keyName,
       Class<K> itemClass) throws IOException {
@@ -145,7 +147,7 @@ public class DefaultStringifier<T> implements Stringifier<T> {
    * @param keyName the name of the key to use
    * @throws IndexOutOfBoundsException if the items array is empty
    * @throws IOException : forwards Exceptions from the underlying 
-   * {@link Serialization} classes.         
+   * {@link SerializationBase} classes.         
    */
   public static <K> void storeArray(Configuration conf, K[] items,
       String keyName) throws IOException {
@@ -173,7 +175,7 @@ public class DefaultStringifier<T> implements Stringifier<T> {
    * @param itemClass the class of the item
    * @return restored object
    * @throws IOException : forwards Exceptions from the underlying 
-   * {@link Serialization} classes.
+   * {@link SerializationBase} classes.
    */
   public static <K> K[] loadArray(Configuration conf, String keyName,
       Class<K> itemClass) throws IOException {

+ 36 - 14
src/java/org/apache/hadoop/io/SequenceFile.java

@@ -33,9 +33,10 @@ import org.apache.hadoop.io.compress.Decompressor;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.io.compress.zlib.ZlibFactory;
-import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.DeserializerBase;
+import org.apache.hadoop.io.serializer.SerializationBase;
+import org.apache.hadoop.io.serializer.SerializerBase;
 import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.hadoop.io.serializer.Serializer;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.Progress;
@@ -705,6 +706,14 @@ public class SequenceFile {
       return new TreeMap<Text, Text>(this.theMetadata);
     }
     
+    public Map<String, String> getMetadataAsStringMap() {
+      Map<String, String> map = new HashMap<String, String>();
+      for (Map.Entry<Text, Text> entry : theMetadata.entrySet()) {
+        map.put(entry.getKey().toString(), entry.getValue().toString());
+      }
+      return map;
+    }
+    
     public void write(DataOutput out) throws IOException {
       out.writeInt(this.theMetadata.size());
       Iterator<Map.Entry<Text, Text>> iter =
@@ -801,9 +810,9 @@ public class SequenceFile {
     Metadata metadata = null;
     Compressor compressor = null;
     
-    protected Serializer keySerializer;
-    protected Serializer uncompressedValSerializer;
-    protected Serializer compressedValSerializer;
+    protected SerializerBase keySerializer;
+    protected SerializerBase uncompressedValSerializer;
+    protected SerializerBase compressedValSerializer;
     
     // Insert a globally unique 16-byte value every few entries, so that one
     // can seek into the middle of a file and then synchronize with record
@@ -914,9 +923,10 @@ public class SequenceFile {
       this.codec = codec;
       this.metadata = metadata;
       SerializationFactory serializationFactory = new SerializationFactory(conf);
-      this.keySerializer = serializationFactory.getSerializer(keyClass);
+      this.keySerializer = getSerializer(serializationFactory, keyClass, metadata);
       this.keySerializer.open(buffer);
-      this.uncompressedValSerializer = serializationFactory.getSerializer(valClass);
+      this.uncompressedValSerializer = getSerializer(serializationFactory,
+        valClass, metadata);
       this.uncompressedValSerializer.open(buffer);
       if (this.codec != null) {
         ReflectionUtils.setConf(this.codec, this.conf);
@@ -924,11 +934,20 @@ public class SequenceFile {
         this.deflateFilter = this.codec.createOutputStream(buffer, compressor);
         this.deflateOut = 
           new DataOutputStream(new BufferedOutputStream(deflateFilter));
-        this.compressedValSerializer = serializationFactory.getSerializer(valClass);
+        this.compressedValSerializer = getSerializer(serializationFactory,
+          valClass, metadata);
         this.compressedValSerializer.open(deflateOut);
       }
     }
     
+    @SuppressWarnings("unchecked")
+    private SerializerBase getSerializer(SerializationFactory sf, Class c,
+        Metadata metadata) {
+      Map<String, String> stringMetadata = metadata.getMetadataAsStringMap();
+      stringMetadata.put(SerializationBase.CLASS_KEY, c.getName());
+      return sf.getSerializer(stringMetadata);
+    }
+    
     /** Returns the class of keys in this file. */
     public Class getKeyClass() { return keyClass; }
 
@@ -1412,8 +1431,8 @@ public class SequenceFile {
     private DataInputStream valIn = null;
     private Decompressor valDecompressor = null;
     
-    private Deserializer keyDeserializer;
-    private Deserializer valDeserializer;
+    private DeserializerBase keyDeserializer;
+    private DeserializerBase valDeserializer;
 
     /** Open the named file. */
     public Reader(FileSystem fs, Path file, Configuration conf)
@@ -1563,21 +1582,24 @@ public class SequenceFile {
         SerializationFactory serializationFactory =
           new SerializationFactory(conf);
         this.keyDeserializer =
-          getDeserializer(serializationFactory, getKeyClass());
+          getDeserializer(serializationFactory, getKeyClass(), metadata);
         if (!blockCompressed) {
           this.keyDeserializer.open(valBuffer);
         } else {
           this.keyDeserializer.open(keyIn);
         }
         this.valDeserializer =
-          getDeserializer(serializationFactory, getValueClass());
+          getDeserializer(serializationFactory, getValueClass(), metadata);
         this.valDeserializer.open(valIn);
       }
     }
     
     @SuppressWarnings("unchecked")
-    private Deserializer getDeserializer(SerializationFactory sf, Class c) {
-      return sf.getDeserializer(c);
+    private DeserializerBase getDeserializer(SerializationFactory sf, Class c,
+        Metadata metadata) {
+      Map<String, String> stringMetadata = metadata.getMetadataAsStringMap();
+      stringMetadata.put(SerializationBase.CLASS_KEY, c.getName());
+      return sf.getDeserializer(stringMetadata);
     }
     
     /** Close the file. */
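
The private getSerializer/getDeserializer helpers added above thread a SequenceFile's own Text-based Metadata into the string map the factory expects. The same conversion in isolation; the "Created-By" entry is a made-up example of file-level metadata:

    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.serializer.SerializationBase;
    import org.apache.hadoop.io.serializer.SerializationFactory;
    import org.apache.hadoop.io.serializer.SerializerBase;

    public class SequenceFileMetadataSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();

        // File-level metadata as a writer would set it; the key/value
        // pair here is illustrative only.
        SequenceFile.Metadata fileMetadata = new SequenceFile.Metadata();
        fileMetadata.set(new Text("Created-By"), new Text("example"));

        // Same steps as the private helpers above: flatten the Text pairs
        // to Strings, then record the class under CLASS_KEY for the lookup.
        Map<String, String> stringMetadata = fileMetadata.getMetadataAsStringMap();
        stringMetadata.put(SerializationBase.CLASS_KEY, Text.class.getName());

        SerializerBase<Text> keySerializer =
            new SerializationFactory(conf).getSerializer(stringMetadata);
      }
    }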

+ 1 - 0
src/java/org/apache/hadoop/io/serializer/Deserializer.java

@@ -34,6 +34,7 @@ import java.io.InputStream;
  * </p>
  * @param <T>
  */
+@Deprecated
 public interface Deserializer<T> {
   /**
    * <p>Prepare the deserializer for reading.</p>

+ 46 - 0
src/java/org/apache/hadoop/io/serializer/DeserializerBase.java

@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.serializer;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configured;
+
+public abstract class DeserializerBase<T> extends Configured
+  implements Closeable, Deserializer<T> {
+  
+  /**
+   * <p>Prepare the deserializer for reading.</p>
+   */
+  public abstract void open(InputStream in) throws IOException;
+  
+  /**
+   * <p>
+   * Deserialize the next object from the underlying input stream.
+   * If the object <code>t</code> is non-null then this deserializer
+   * <i>may</i> set its internal state to the next object read from the input
+   * stream. Otherwise, if the object <code>t</code> is null a new
+   * deserialized object will be created.
+   * </p>
+   * @return the deserialized object
+   */
+  public abstract T deserialize(T t) throws IOException;
+  
+}

+ 7 - 0
src/java/org/apache/hadoop/io/serializer/DeserializerComparator.java

@@ -52,6 +52,13 @@ public abstract class DeserializerComparator<T> implements RawComparator<T> {
     this.deserializer.open(buffer);
   }
 
+  protected DeserializerComparator(DeserializerBase<T> deserializer)
+    throws IOException {
+    
+    this.deserializer = deserializer;
+    this.deserializer.open(buffer);
+  }
+
   public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
     try {
       

+ 47 - 0
src/java/org/apache/hadoop/io/serializer/LegacyDeserializer.java

@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.serializer;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+@SuppressWarnings("deprecation")
+class LegacyDeserializer<T> extends DeserializerBase<T> {
+  
+  private Deserializer<T> deserializer;
+
+  public LegacyDeserializer(Deserializer<T> deserializer) {
+    this.deserializer = deserializer;
+  }
+
+  @Override
+  public void open(InputStream in) throws IOException {
+    deserializer.open(in);
+  }
+  
+  @Override
+  public T deserialize(T t) throws IOException {
+    return deserializer.deserialize(t);
+  }
+
+  @Override
+  public void close() throws IOException {
+    deserializer.close();
+  }
+
+}

+ 85 - 0
src/java/org/apache/hadoop/io/serializer/LegacySerialization.java

@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.serializer;
+
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * <p>
+ * Wraps a legacy {@link Serialization} as a {@link SerializationBase}.
+ * </p>
+ * 
+ * @param <T>
+ */
+@SuppressWarnings("deprecation")
+class LegacySerialization<T> extends SerializationBase<T> {
+
+  private Serialization<T> serialization;
+
+  public LegacySerialization(Serialization<T> serialization,
+      Configuration conf) {
+    this.serialization = serialization;
+    setConf(conf);
+  }
+  
+  Serialization<T> getUnderlyingSerialization() {
+    return serialization;
+  }
+
+  @Deprecated
+  @Override
+  public boolean accept(Class<?> c) {
+    return serialization.accept(c);
+  }
+
+  @Deprecated
+  @Override
+  public Deserializer<T> getDeserializer(Class<T> c) {
+    return serialization.getDeserializer(c);
+  }
+
+  @Deprecated
+  @Override
+  public Serializer<T> getSerializer(Class<T> c) {
+    return serialization.getSerializer(c);
+  }
+  
+  @Override
+  public boolean accept(Map<String, String> metadata) {
+    Class<?> c = getClassFromMetadata(metadata);
+    return accept(c);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public SerializerBase<T> getSerializer(Map<String, String> metadata) {
+    Class<T> c = (Class<T>) getClassFromMetadata(metadata);
+    return new LegacySerializer<T>(getSerializer(c));
+  }
+  
+  @SuppressWarnings("unchecked")
+  @Override
+  public DeserializerBase<T> getDeserializer(Map<String, String> metadata) {
+    Class<T> c = (Class<T>) getClassFromMetadata(metadata);
+    return new LegacyDeserializer<T>(getDeserializer(c));
+  }
+
+}

+ 54 - 0
src/java/org/apache/hadoop/io/serializer/LegacySerializer.java

@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.serializer;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.Map;
+
+@SuppressWarnings("deprecation")
+class LegacySerializer<T> extends SerializerBase<T> {
+  
+  private Serializer<T> serializer;
+
+  public LegacySerializer(Serializer<T> serializer) {
+    this.serializer = serializer;
+  }
+
+  @Override
+  public void open(OutputStream out) throws IOException {
+    serializer.open(out);
+  }
+
+  @Override
+  public void serialize(T t) throws IOException {
+    serializer.serialize(t);
+  }
+
+  @Override
+  public void close() throws IOException {
+    serializer.close();
+  }
+
+  @Override
+  public Map<String, String> getMetadata() throws IOException {
+    return Collections.<String, String>emptyMap();
+  }
+
+}

+ 1 - 0
src/java/org/apache/hadoop/io/serializer/Serialization.java

@@ -24,6 +24,7 @@ package org.apache.hadoop.io.serializer;
  * </p>
  * @param <T>
  */
+@Deprecated
 public interface Serialization<T> {
   
   /**

+ 91 - 0
src/java/org/apache/hadoop/io/serializer/SerializationBase.java

@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.serializer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configured;
+
+/**
+ * <p>
+ * Encapsulates a {@link SerializerBase}/{@link DeserializerBase} pair.
+ * </p>
+ * 
+ * @param <T>
+ */
+public abstract class SerializationBase<T> extends Configured
+  implements Serialization<T> {
+    
+  public static final String SERIALIZATION_KEY = "Serialization-Class";
+  public static final String CLASS_KEY = "Serialized-Class";
+  
+  public static Map<String, String> getMetadataFromClass(Class<?> c) {
+    Map<String, String> metadata = new HashMap<String, String>();
+    metadata.put(CLASS_KEY, c.getName());
+    return metadata;
+  }
+  
+  @Deprecated
+  @Override
+  public boolean accept(Class<?> c) {
+    return accept(getMetadataFromClass(c));
+  }
+
+  @Deprecated
+  @Override
+  public Deserializer<T> getDeserializer(Class<T> c) {
+    return getDeserializer(getMetadataFromClass(c));
+  }
+
+  @Deprecated
+  @Override
+  public Serializer<T> getSerializer(Class<T> c) {
+    return getSerializer(getMetadataFromClass(c));
+  }
+
+  /**
+   * Allows clients to test whether this {@link SerializationBase} supports the
+   * given metadata.
+   */
+  public abstract boolean accept(Map<String, String> metadata);
+
+  /**
+   * @return a {@link SerializerBase} for the given metadata.
+   */
+  public abstract SerializerBase<T> getSerializer(Map<String, String> metadata);
+
+  /**
+   * @return a {@link DeserializerBase} for the given metadata.
+   */
+  public abstract DeserializerBase<T> getDeserializer(
+      Map<String, String> metadata);
+  
+  protected Class<?> getClassFromMetadata(Map<String, String> metadata) {
+    String classname = metadata.get(CLASS_KEY);
+    if (classname == null) {
+      return null;
+    }
+    try {
+      return getConf().getClassByName(classname);
+    } catch (ClassNotFoundException e) {
+      throw new IllegalArgumentException(e);
+    }
+  }
+}
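
A sketch of how the two keys defined here are meant to be used, assuming the default io.serializations configuration: CLASS_KEY drives the accept() checks, while SERIALIZATION_KEY names a serialization explicitly.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.serializer.SerializationBase;
    import org.apache.hadoop.io.serializer.SerializationFactory;
    import org.apache.hadoop.io.serializer.WritableSerialization;

    public class MetadataKeysSketch {
      public static void main(String[] args) {
        SerializationFactory factory =
            new SerializationFactory(new Configuration());

        // Selection by serialized class: each serialization's accept()
        // inspects CLASS_KEY (e.g. "is it a Writable?").
        Map<String, String> byClass =
            SerializationBase.getMetadataFromClass(IntWritable.class);

        // Explicit selection: SERIALIZATION_KEY names the serialization
        // outright; CLASS_KEY is still needed to construct instances.
        Map<String, String> explicit = new HashMap<String, String>();
        explicit.put(SerializationBase.SERIALIZATION_KEY,
            WritableSerialization.class.getName());
        explicit.put(SerializationBase.CLASS_KEY, IntWritable.class.getName());

        // Both maps resolve to WritableSerialization here.
        SerializationBase<IntWritable> s1 = factory.getSerialization(byClass);
        SerializationBase<IntWritable> s2 = factory.getSerialization(explicit);
      }
    }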

+ 51 - 13
src/java/org/apache/hadoop/io/serializer/SerializationFactory.java

@@ -20,11 +20,13 @@ package org.apache.hadoop.io.serializer;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.serializer.avro.AvroGenericSerialization;
 import org.apache.hadoop.io.serializer.avro.AvroReflectSerialization;
 import org.apache.hadoop.io.serializer.avro.AvroSpecificSerialization;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -32,7 +34,7 @@ import org.apache.hadoop.util.StringUtils;
 
 /**
  * <p>
- * A factory for {@link Serialization}s.
+ * A factory for {@link SerializationBase}s.
  * </p>
  */
 public class SerializationFactory extends Configured {
@@ -40,7 +42,10 @@ public class SerializationFactory extends Configured {
   private static final Log LOG =
     LogFactory.getLog(SerializationFactory.class.getName());
 
-  private List<Serialization<?>> serializations = new ArrayList<Serialization<?>>();
+  private List<SerializationBase<?>> serializations =
+    new ArrayList<SerializationBase<?>>();
+  private List<SerializationBase<?>> legacySerializations =
+    new ArrayList<SerializationBase<?>>();
   
   /**
    * <p>
@@ -54,7 +59,8 @@ public class SerializationFactory extends Configured {
     for (String serializerName : conf.getStrings("io.serializations", 
       new String[]{WritableSerialization.class.getName(), 
         AvroSpecificSerialization.class.getName(), 
-        AvroReflectSerialization.class.getName()})) {
+        AvroReflectSerialization.class.getName(),
+        AvroGenericSerialization.class.getName()})) {
       add(conf, serializerName);
     }
   }
@@ -62,30 +68,62 @@ public class SerializationFactory extends Configured {
   @SuppressWarnings("unchecked")
   private void add(Configuration conf, String serializationName) {
     try {
-      
-      Class<? extends Serialization> serializionClass =
-        (Class<? extends Serialization>) conf.getClassByName(serializationName);
-      serializations.add((Serialization)
-          ReflectionUtils.newInstance(serializionClass, getConf()));
+      Class<?> serializationClass = conf.getClassByName(serializationName);
+      if (SerializationBase.class.isAssignableFrom(serializationClass)) {
+        serializations.add((SerializationBase)
+            ReflectionUtils.newInstance(serializationClass, getConf()));
+      } else if (Serialization.class.isAssignableFrom(serializationClass)) {
+        Serialization serialization = (Serialization)
+            ReflectionUtils.newInstance(serializationClass, getConf());
+        legacySerializations.add(new LegacySerialization(serialization,
+            getConf()));
+      } else {
+        LOG.warn("Serialization class " + serializationName + " is not an " +
+            "instance of Serialization or SerializationBase.");
+      }
     } catch (ClassNotFoundException e) {
-      LOG.warn("Serilization class not found: " +
+      LOG.warn("Serialization class not found: " +
           StringUtils.stringifyException(e));
     }
   }
 
+  @Deprecated
   public <T> Serializer<T> getSerializer(Class<T> c) {
     return getSerialization(c).getSerializer(c);
   }
 
+  @Deprecated
   public <T> Deserializer<T> getDeserializer(Class<T> c) {
     return getSerialization(c).getDeserializer(c);
   }
 
-  @SuppressWarnings("unchecked")
+  @Deprecated
   public <T> Serialization<T> getSerialization(Class<T> c) {
-    for (Serialization serialization : serializations) {
-      if (serialization.accept(c)) {
-        return (Serialization<T>) serialization;
+    return getSerialization(SerializationBase.getMetadataFromClass(c));
+  }
+  
+  public <T> SerializerBase<T> getSerializer(Map<String, String> metadata) {
+    SerializationBase<T> serialization = getSerialization(metadata);
+    return serialization.getSerializer(metadata);
+  }
+    
+  public <T> DeserializerBase<T> getDeserializer(Map<String, String> metadata) {
+    SerializationBase<T> serialization = getSerialization(metadata);
+    return serialization.getDeserializer(metadata);
+  }
+    
+  @SuppressWarnings("unchecked")
+  public <T> SerializationBase<T> getSerialization(Map<String, String> metadata) {
+    for (SerializationBase serialization : serializations) {
+      if (serialization.accept(metadata)) {
+        return (SerializationBase<T>) serialization;
+      }
+    }
+    // Look in the legacy serializations last, since they ignore
+    // non-class metadata
+    for (SerializationBase serialization : legacySerializations) {
+      if (serialization.accept(metadata)) {
+        return (SerializationBase<T>) serialization;
       }
     }
     return null;
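
One consequence of the two-list design: classes that still implement the deprecated Serialization interface keep working unmodified, because add() wraps them in LegacySerialization and the lookup falls back to them last. A sketch, with com.example.OldSerialization as a hypothetical legacy implementation:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.serializer.SerializationFactory;

    public class LegacyRegistrationSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // com.example.OldSerialization is hypothetical: any entry that
        // implements the deprecated Serialization interface is wrapped in
        // LegacySerialization by add() above.
        conf.set("io.serializations",
            "com.example.OldSerialization,"
            + "org.apache.hadoop.io.serializer.WritableSerialization");

        // getSerialization(metadata) tries metadata-aware serializations
        // first and legacy wrappers last, since legacy accept() can only
        // ever look at the class name.
        SerializationFactory factory = new SerializationFactory(conf);
      }
    }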

+ 1 - 0
src/java/org/apache/hadoop/io/serializer/Serializer.java

@@ -34,6 +34,7 @@ import java.io.OutputStream;
  * </p>
  * @param <T>
  */
+@Deprecated
 public interface Serializer<T> {
   /**
    * <p>Prepare the serializer for writing.</p>

+ 42 - 0
src/java/org/apache/hadoop/io/serializer/SerializerBase.java

@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.serializer;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configured;
+
+public abstract class SerializerBase<T> extends Configured
+  implements Closeable, Serializer<T> {
+
+  /**
+   * <p>Prepare the serializer for writing.</p>
+   */
+  public abstract void open(OutputStream out) throws IOException;
+  
+  /**
+   * <p>Serialize <code>t</code> to the underlying output stream.</p>
+   */
+  public abstract void serialize(T t) throws IOException;
+  
+  public abstract Map<String, String> getMetadata() throws IOException;
+  
+}

+ 37 - 15
src/java/org/apache/hadoop/io/serializer/WritableSerialization.java

@@ -23,22 +23,20 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.ReflectionUtils;
 
 /**
- * A {@link Serialization} for {@link Writable}s that delegates to
+ * A {@link SerializationBase} for {@link Writable}s that delegates to
  * {@link Writable#write(java.io.DataOutput)} and
  * {@link Writable#readFields(java.io.DataInput)}.
  */
-public class WritableSerialization extends Configured 
-  implements Serialization<Writable> {
+public class WritableSerialization extends SerializationBase<Writable> {
   
-  static class WritableDeserializer extends Configured 
-    implements Deserializer<Writable> {
+  static class WritableDeserializer extends DeserializerBase<Writable> {
 
     private Class<?> writableClass;
     private DataInputStream dataIn;
@@ -48,6 +46,7 @@ public class WritableSerialization extends Configured
       this.writableClass = c;
     }
     
+    @Override
     public void open(InputStream in) {
       if (in instanceof DataInputStream) {
         dataIn = (DataInputStream) in;
@@ -56,6 +55,7 @@ public class WritableSerialization extends Configured
       }
     }
     
+    @Override
     public Writable deserialize(Writable w) throws IOException {
       Writable writable;
       if (w == null) {
@@ -68,16 +68,23 @@ public class WritableSerialization extends Configured
       return writable;
     }
 
+    @Override
     public void close() throws IOException {
       dataIn.close();
     }
     
   }
   
-  static class WritableSerializer implements Serializer<Writable> {
-
+  static class WritableSerializer extends SerializerBase<Writable> {
+    
+    private Map<String, String> metadata;
     private DataOutputStream dataOut;
     
+    public WritableSerializer(Map<String, String> metadata) {
+      this.metadata = metadata;
+    }
+    
+    @Override
     public void open(OutputStream out) {
       if (out instanceof DataOutputStream) {
         dataOut = (DataOutputStream) out;
@@ -86,26 +93,41 @@ public class WritableSerialization extends Configured
       }
     }
 
+    @Override
     public void serialize(Writable w) throws IOException {
       w.write(dataOut);
     }
 
+    @Override
     public void close() throws IOException {
       dataOut.close();
     }
 
-  }
+    @Override
+    public Map<String, String> getMetadata() throws IOException {
+      return metadata;
+    }
 
-  public boolean accept(Class<?> c) {
-    return Writable.class.isAssignableFrom(c);
   }
 
-  public Deserializer<Writable> getDeserializer(Class<Writable> c) {
-    return new WritableDeserializer(getConf(), c);
+  @Override
+  public boolean accept(Map<String, String> metadata) {
+    if (getClass().getName().equals(metadata.get(SERIALIZATION_KEY))) {
+      return true;
+    }
+    Class<?> c = getClassFromMetadata(metadata);
+    return c == null ? false : Writable.class.isAssignableFrom(c);
   }
 
-  public Serializer<Writable> getSerializer(Class<Writable> c) {
-    return new WritableSerializer();
+  @Override
+  public SerializerBase<Writable> getSerializer(Map<String, String> metadata) {
+    return new WritableSerializer(metadata);
+  }
+  
+  @Override
+  public DeserializerBase<Writable> getDeserializer(Map<String, String> metadata) {
+    Class<?> c = getClassFromMetadata(metadata);
+    return new WritableDeserializer(getConf(), c);
   }
 
 }

+ 63 - 0
src/java/org/apache/hadoop/io/serializer/avro/AvroGenericSerialization.java

@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.serializer.avro;
+
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.hadoop.io.serializer.SerializationBase;
+
+/**
+ * Serialization for Avro Generic classes. For a class to be accepted by this 
+ * serialization it must have metadata with key
+ * {@link SerializationBase#SERIALIZATION_KEY} set to {@link AvroGenericSerialization}'s
+ * fully-qualified classname.
+ * The schema used is the one set by {@link AvroSerialization#AVRO_SCHEMA_KEY}.
+ */
+@SuppressWarnings("unchecked")
+public class AvroGenericSerialization extends AvroSerialization<Object> {
+  
+  @Override
+  public boolean accept(Map<String, String> metadata) {
+    return metadata.get(AVRO_SCHEMA_KEY) != null;
+  }
+
+  @Override
+  protected DatumReader getReader(Map<String, String> metadata) {
+    Schema schema = Schema.parse(metadata.get(AVRO_SCHEMA_KEY));
+    return new GenericDatumReader<Object>(schema);
+  }
+
+  @Override
+  protected Schema getSchema(Object t, Map<String, String> metadata) {
+    String jsonSchema = metadata.get(AVRO_SCHEMA_KEY);
+    return jsonSchema != null ? Schema.parse(jsonSchema) : GenericData.induce(t);
+  }
+
+  @Override
+  protected DatumWriter getWriter(Map<String, String> metadata) {
+    return new GenericDatumWriter<Object>();
+  }
+
+}

+ 19 - 4
src/java/org/apache/hadoop/io/serializer/avro/AvroReflectSerialization.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.io.serializer.avro;
 
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.avro.Schema;
@@ -27,6 +28,7 @@ import org.apache.avro.io.DatumWriter;
 import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.avro.specific.SpecificRecord;
 
 /**
  * Serialization for Avro Reflect classes. For a class to be accepted by this 
@@ -47,10 +49,18 @@ public class AvroReflectSerialization extends AvroSerialization<Object>{
 
   private Set<String> packages; 
 
-  public synchronized boolean accept(Class<?> c) {
+  @Override
+  public synchronized boolean accept(Map<String, String> metadata) {
     if (packages == null) {
       getPackages();
     }
+    if (getClass().getName().equals(metadata.get(SERIALIZATION_KEY))) {
+      return true;
+    }
+    Class<?> c = getClassFromMetadata(metadata);
+    if (c == null) {
+      return false;
+    }
     return AvroReflectSerializable.class.isAssignableFrom(c) || 
       packages.contains(c.getPackage().getName());
   }
@@ -65,8 +75,11 @@ public class AvroReflectSerialization extends AvroSerialization<Object>{
     }
   }
 
-  protected DatumReader getReader(Class<Object> clazz) {
+  @Override
+  protected DatumReader getReader(Map<String, String> metadata) {
     try {
+      Class<SpecificRecord> clazz = (Class<SpecificRecord>)
+        getClassFromMetadata(metadata);
       String prefix =  
         ((clazz.getEnclosingClass() == null 
             || "null".equals(clazz.getEnclosingClass().getName())) ? 
@@ -78,11 +91,13 @@ public class AvroReflectSerialization extends AvroSerialization<Object>{
     }
   }
 
-  protected Schema getSchema(Object t) {
+  @Override
+  protected Schema getSchema(Object t, Map<String, String> metadata) {
     return ReflectData.getSchema(t.getClass());
   }
 
-  protected DatumWriter getWriter(Class<Object> clazz) {
+  @Override
+  protected DatumWriter getWriter(Map<String, String> metadata) {
     return new ReflectDatumWriter();
   }
 

+ 37 - 24
src/java/org/apache/hadoop/io/serializer/avro/AvroSerialization.java

@@ -21,92 +21,105 @@ package org.apache.hadoop.io.serializer.avro;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.Map;
 
 import org.apache.avro.Schema;
 import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.DatumWriter;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.io.serializer.Deserializer;
-import org.apache.hadoop.io.serializer.Serialization;
-import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.io.serializer.DeserializerBase;
+import org.apache.hadoop.io.serializer.SerializationBase;
+import org.apache.hadoop.io.serializer.SerializerBase;
 
 /**
  * Base class for providing serialization to Avro types.
  */
-public abstract class AvroSerialization<T> extends Configured 
-                                        implements Serialization<T>{
+public abstract class AvroSerialization<T> extends SerializationBase<T> {
+  
+  public static final String AVRO_SCHEMA_KEY = "Avro-Schema";
 
-  public Deserializer<T> getDeserializer(Class<T> c) {
-    return new AvroDeserializer(c);
+  public DeserializerBase<T> getDeserializer(Map<String, String> metadata) {
+    return new AvroDeserializer(metadata);
   }
 
-  public Serializer<T> getSerializer(Class<T> c) {
-    return new AvroSerializer(c);
+  public SerializerBase<T> getSerializer(Map<String, String> metadata) {
+    return new AvroSerializer(metadata);
   }
 
   /**
-   * Return an Avro Schema instance for the given class.
+   * Return an Avro Schema instance for the given class and metadata.
    */
-  protected abstract Schema getSchema(T t);
+  protected abstract Schema getSchema(T t, Map<String, String> metadata);
 
   /**
-   * Create and return Avro DatumWriter for the given class.
+   * Create and return Avro DatumWriter for the given metadata.
    */
-  protected abstract DatumWriter<T> getWriter(Class<T> clazz);
+  protected abstract DatumWriter<T> getWriter(Map<String, String> metadata);
 
   /**
-   * Create and return Avro DatumReader for the given class.
+   * Create and return Avro DatumReader for the given metadata.
    */
-  protected abstract DatumReader<T> getReader(Class<T> clazz);
+  protected abstract DatumReader<T> getReader(Map<String, String> metadata);
 
-  class AvroSerializer implements Serializer<T> {
+  class AvroSerializer extends SerializerBase<T> {
 
+    private Map<String, String> metadata;
     private DatumWriter<T> writer;
     private BinaryEncoder encoder;
     private OutputStream outStream;
-    protected Class<T> clazz;
 
-    AvroSerializer(Class<T> clazz) {
-      writer = getWriter(clazz);
+    AvroSerializer(Map<String, String> metadata) {
+      this.metadata = metadata;
+      writer = getWriter(metadata);
     }
 
+    @Override
     public void close() throws IOException {
       encoder.flush();
       outStream.close();
     }
 
+    @Override
     public void open(OutputStream out) throws IOException {
       outStream = out;
       encoder = new BinaryEncoder(out);
     }
 
+    @Override
     public void serialize(T t) throws IOException {
-      writer.setSchema(getSchema(t));
+      writer.setSchema(getSchema(t, metadata));
       writer.write(t, encoder);
     }
 
+    @Override
+    public Map<String, String> getMetadata() throws IOException {
+      return metadata;
+    }
+
   }
 
-  class AvroDeserializer implements Deserializer<T> {
+  class AvroDeserializer extends DeserializerBase<T> {
 
     private DatumReader<T> reader;
     private BinaryDecoder decoder;
     private InputStream inStream;
 
-    AvroDeserializer(Class<T> clazz) {
-      this.reader = getReader(clazz);
+    AvroDeserializer(Map<String, String> metadata) {
+      this.reader = getReader(metadata);
     }
 
+    @Override
     public void close() throws IOException {
       inStream.close();
     }
 
+    @Override
     public T deserialize(T t) throws IOException {
       return reader.read(t, decoder);
     }
 
+    @Override
     public void open(InputStream in) throws IOException {
       inStream = in;
       decoder = new BinaryDecoder(in);

+ 18 - 5
src/java/org/apache/hadoop/io/serializer/avro/AvroSpecificSerialization.java

@@ -18,7 +18,10 @@
 
 package org.apache.hadoop.io.serializer.avro;
 
+import java.util.Map;
+
 import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.specific.SpecificDatumReader;
@@ -33,23 +36,33 @@ import org.apache.avro.specific.SpecificRecord;
 public class AvroSpecificSerialization 
                           extends AvroSerialization<SpecificRecord>{
 
-  public boolean accept(Class<?> c) {
-    return SpecificRecord.class.isAssignableFrom(c);
+  @Override
+  public boolean accept(Map<String, String> metadata) {
+    if (getClass().getName().equals(metadata.get(SERIALIZATION_KEY))) {
+      return true;
+    }
+    Class<?> c = getClassFromMetadata(metadata);
+    return c == null ? false : SpecificRecord.class.isAssignableFrom(c);
   }
 
-  protected DatumReader getReader(Class<SpecificRecord> clazz) {
+  @Override
+  protected DatumReader getReader(Map<String, String> metadata) {
     try {
+      Class<SpecificRecord> clazz = (Class<SpecificRecord>)
+        getClassFromMetadata(metadata);
       return new SpecificDatumReader(clazz.newInstance().schema());
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
   }
 
-  protected Schema getSchema(SpecificRecord t) {
+  @Override
+  protected Schema getSchema(SpecificRecord t, Map<String, String> metadata) {
     return t.schema();
   }
 
-  protected DatumWriter getWriter(Class<SpecificRecord> clazz) {
+  @Override
+  protected DatumWriter getWriter(Map<String, String> metadata) {
     return new SpecificDatumWriter();
   }
 

+ 14 - 7
src/java/org/apache/hadoop/util/ReflectionUtils.java

@@ -18,21 +18,27 @@
 
 package org.apache.hadoop.util;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.Method;
-import java.io.*;
-import java.lang.management.*;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.logging.Log;
-import org.apache.hadoop.conf.*;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.DeserializerBase;
+import org.apache.hadoop.io.serializer.SerializationBase;
 import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.io.serializer.SerializerBase;
 
 /**
  * General reflection utils
@@ -269,11 +275,12 @@ public class ReflectionUtils {
     buffer.outBuffer.reset();
     SerializationFactory factory = getFactory(conf);
     Class<T> cls = (Class<T>) src.getClass();
-    Serializer<T> serializer = factory.getSerializer(cls);
+    Map<String, String> metadata = SerializationBase.getMetadataFromClass(cls);
+    SerializerBase<T> serializer = factory.getSerializer(metadata);
     serializer.open(buffer.outBuffer);
     serializer.serialize(src);
     buffer.moveData();
-    Deserializer<T> deserializer = factory.getDeserializer(cls);
+    DeserializerBase<T> deserializer = factory.getDeserializer(metadata);
     deserializer.open(buffer.inBuffer);
     dst = deserializer.deserialize(dst);
     return dst;
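
From a caller's perspective ReflectionUtils.copy is unchanged; only the internal serializer lookup now goes through metadata. For example:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.util.ReflectionUtils;

    public class CopySketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Text src = new Text("original");
        // copy() now derives its serializer from class metadata rather than
        // the bare class, but the public signature is unchanged.
        Text dst = ReflectionUtils.copy(conf, src, new Text());
      }
    }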

+ 24 - 8
src/test/core/org/apache/hadoop/io/serializer/SerializationTestUtil.java

@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.io.serializer;
 
+import java.util.Map;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -33,19 +35,33 @@ public class SerializationTestUtil {
    * @return deserialized item
    */
   public static<K> K testSerialization(Configuration conf, K before) 
-    throws Exception {
-    
+      throws Exception {
+    Map<String, String> metadata =
+      SerializationBase.getMetadataFromClass(GenericsUtil.getClass(before));
+    return testSerialization(conf, metadata, before);
+  }
+  
+  /**
+   * A utility that tests serialization/deserialization. 
+   * @param conf configuration to use, "io.serializations" is read to 
+   * determine the serialization
+   * @param metadata the metadata to pass to the serializer/deserializer
+   * @param <K> the class of the item
+   * @param before item to (de)serialize
+   * @return deserialized item
+   */
+  public static <K> K testSerialization(Configuration conf, 
+      Map<String, String> metadata, K before) throws Exception {
+
     SerializationFactory factory = new SerializationFactory(conf);
-    Serializer<K> serializer 
-      = factory.getSerializer(GenericsUtil.getClass(before));
-    Deserializer<K> deserializer 
-      = factory.getDeserializer(GenericsUtil.getClass(before));
-   
+    SerializerBase<K> serializer = factory.getSerializer(metadata);
+    DeserializerBase<K> deserializer = factory.getDeserializer(metadata);
+
     DataOutputBuffer out = new DataOutputBuffer();
     serializer.open(out);
     serializer.serialize(before);
     serializer.close();
-    
+
     DataInputBuffer in = new DataInputBuffer();
     in.reset(out.getData(), out.getLength());
     deserializer.open(in);

+ 15 - 0
src/test/core/org/apache/hadoop/io/serializer/avro/TestAvroSerialization.java

@@ -18,9 +18,14 @@
 
 package org.apache.hadoop.io.serializer.avro;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import junit.framework.TestCase;
 
+import org.apache.avro.util.Utf8;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.serializer.SerializationBase;
 import org.apache.hadoop.io.serializer.SerializationTestUtil;
 
 public class TestAvroSerialization extends TestCase {
@@ -59,6 +64,16 @@ public class TestAvroSerialization extends TestCase {
       SerializationTestUtil.testSerialization(conf, before);
     assertEquals(before, after);
   }
+  
+  public void testGeneric() throws Exception {
+    Utf8 before = new Utf8("hadoop");
+    Map<String, String> metadata = new HashMap<String, String>();
+    metadata.put(SerializationBase.SERIALIZATION_KEY,
+      AvroGenericSerialization.class.getName());
+    metadata.put(AvroSerialization.AVRO_SCHEMA_KEY, "\"string\"");
+    Utf8 after = SerializationTestUtil.testSerialization(conf, metadata, before);
+    assertEquals(before, after);
+  }
 
   public static class InnerRecord {
     public int x = 7;