
HADOOP-3791. Introduce generics into ReflectionUtils. Contributed by Chris Smith.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@681235 13f79535-47bb-0310-9956-ffa450edef68
Christopher Douglas, 17 years ago
commit 6112b495f0
53 files changed, 203 insertions(+), 206 deletions(-)
  1. CHANGES.txt (+3 -0)
  2. src/core/org/apache/hadoop/fs/FsShell.java (+4 -4)
  3. src/core/org/apache/hadoop/io/AbstractMapWritable.java (+0 -6)
  4. src/core/org/apache/hadoop/io/ArrayFile.java (+2 -2)
  5. src/core/org/apache/hadoop/io/ArrayWritable.java (+3 -3)
  6. src/core/org/apache/hadoop/io/GenericWritable.java (+1 -1)
  7. src/core/org/apache/hadoop/io/MapFile.java (+19 -13)
  8. src/core/org/apache/hadoop/io/MapWritable.java (+1 -0)
  9. src/core/org/apache/hadoop/io/SequenceFile.java (+8 -6)
  10. src/core/org/apache/hadoop/io/SetFile.java (+4 -3)
  11. src/core/org/apache/hadoop/io/SortedMapWritable.java (+1 -0)
  12. src/core/org/apache/hadoop/io/WritableComparator.java (+7 -7)
  13. src/core/org/apache/hadoop/io/WritableFactories.java (+3 -3)
  14. src/core/org/apache/hadoop/io/WritableName.java (+7 -8)
  15. src/core/org/apache/hadoop/io/WritableUtils.java (+3 -3)
  16. src/core/org/apache/hadoop/io/compress/CompressionCodecFactory.java (+8 -8)
  17. src/core/org/apache/hadoop/ipc/Client.java (+4 -4)
  18. src/core/org/apache/hadoop/ipc/Server.java (+8 -5)
  19. src/core/org/apache/hadoop/record/RecordComparator.java (+2 -1)
  20. src/core/org/apache/hadoop/util/PriorityQueue.java (+11 -10)
  21. src/core/org/apache/hadoop/util/ReflectionUtils.java (+4 -3)
  22. src/examples/org/apache/hadoop/examples/dancing/DistributedPentomino.java (+3 -3)
  23. src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (+1 -1)
  24. src/mapred/org/apache/hadoop/mapred/Counters.java (+0 -5)
  25. src/mapred/org/apache/hadoop/mapred/FileInputFormat.java (+3 -3)
  26. src/mapred/org/apache/hadoop/mapred/IsolationRunner.java (+6 -2)
  27. src/mapred/org/apache/hadoop/mapred/JobConf.java (+8 -8)
  28. src/mapred/org/apache/hadoop/mapred/JobTracker.java (+1 -1)
  29. src/mapred/org/apache/hadoop/mapred/MapFileOutputFormat.java (+5 -5)
  30. src/mapred/org/apache/hadoop/mapred/MapRunner.java (+1 -2)
  31. src/mapred/org/apache/hadoop/mapred/MapTask.java (+10 -16)
  32. src/mapred/org/apache/hadoop/mapred/Merger.java (+3 -5)
  33. src/mapred/org/apache/hadoop/mapred/ReduceTask.java (+4 -11)
  34. src/mapred/org/apache/hadoop/mapred/SequenceFileAsBinaryOutputFormat.java (+9 -10)
  35. src/mapred/org/apache/hadoop/mapred/SequenceFileOutputFormat.java (+7 -5)
  36. src/mapred/org/apache/hadoop/mapred/SequenceFileRecordReader.java (+1 -2)
  37. src/mapred/org/apache/hadoop/mapred/Task.java (+0 -1)
  38. src/mapred/org/apache/hadoop/mapred/TextOutputFormat.java (+1 -2)
  39. src/mapred/org/apache/hadoop/mapred/join/ArrayListBackedIterator.java (+2 -5)
  40. src/mapred/org/apache/hadoop/mapred/join/CompositeInputFormat.java (+0 -1)
  41. src/mapred/org/apache/hadoop/mapred/join/CompositeInputSplit.java (+2 -2)
  42. src/mapred/org/apache/hadoop/mapred/join/CompositeRecordReader.java (+1 -1)
  43. src/mapred/org/apache/hadoop/mapred/lib/MultipleOutputs.java (+6 -4)
  44. src/mapred/org/apache/hadoop/mapred/lib/MultipleSequenceFileOutputFormat.java (+0 -3)
  45. src/mapred/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java (+1 -1)
  46. src/mapred/org/apache/hadoop/mapred/pipes/Application.java (+2 -2)
  47. src/mapred/org/apache/hadoop/mapred/pipes/PipesMapRunner.java (+3 -2)
  48. src/mapred/org/apache/hadoop/mapred/pipes/PipesPartitioner.java (+1 -1)
  49. src/mapred/org/apache/hadoop/mapred/pipes/PipesReducer.java (+3 -2)
  50. src/mapred/org/apache/hadoop/mapred/pipes/Submitter.java (+1 -1)
  51. src/mapred/org/apache/hadoop/mapreduce/JobContext.java (+4 -1)
  52. src/test/org/apache/hadoop/mapred/join/FakeIF.java (+9 -8)
  53. src/test/org/apache/hadoop/util/TestReflectionUtils.java (+2 -0)

+ 3 - 0
CHANGES.txt

@@ -124,6 +124,9 @@ Trunk (unreleased changes)
     HADOOP-3861. MapFile.Reader and Writer should implement Closeable.
     (tomwhite via omalley)
 
+    HADOOP-3791. Introduce generics into ReflectionUtils. (Chris Smith via
+    cdouglas)
+
   OPTIMIZATIONS
 
     HADOOP-3556. Removed lock contention in MD5Hash by changing the 

+ 4 - 4
src/core/org/apache/hadoop/fs/FsShell.java

@@ -363,10 +363,10 @@ public class FsShell extends Configured implements Tool {
 
     public TextRecordInputStream(FileStatus f) throws IOException {
       r = new SequenceFile.Reader(fs, f.getPath(), getConf());
-      key = (WritableComparable)ReflectionUtils.newInstance(
-          r.getKeyClass(), getConf());
-      val = (Writable)ReflectionUtils.newInstance(
-          r.getValueClass(), getConf());
+      key = ReflectionUtils.newInstance(r.getKeyClass().asSubclass(WritableComparable.class),
+                                        getConf());
+      val = ReflectionUtils.newInstance(r.getValueClass().asSubclass(Writable.class),
+                                        getConf());
       inbuf = new DataInputBuffer();
       outbuf = new DataOutputBuffer();
     }
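
This hunk shows the cast-replacement pattern used throughout the patch: `Class.asSubclass` performs a checked conversion to `Class<? extends WritableComparable>`, so the generic `newInstance` can infer its return type, and a mismatched key class fails immediately with a `ClassCastException` instead of later at a raw cast. A minimal standalone sketch of the idiom (the `Animal`/`Dog` types are illustrative, not Hadoop classes):

// AsSubclassDemo.java -- a minimal sketch of the Class.asSubclass idiom
// applied across this patch. Animal/Dog are hypothetical example types.
public class AsSubclassDemo {
  interface Animal {}
  static class Dog implements Animal {}

  public static void main(String[] args) throws Exception {
    Class<?> raw = Dog.class;  // untyped, as returned by a file reader
    // Checked conversion: throws ClassCastException right here if raw
    // does not implement Animal, rather than at a later unchecked cast.
    Class<? extends Animal> typed = raw.asSubclass(Animal.class);
    Animal a = typed.getDeclaredConstructor().newInstance();  // no cast needed
    System.out.println(a.getClass().getSimpleName());         // prints "Dog"
  }
}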

+ 0 - 6
src/core/org/apache/hadoop/io/AbstractMapWritable.java

@@ -43,11 +43,9 @@ public abstract class AbstractMapWritable implements Writable, Configurable {
   private AtomicReference<Configuration> conf;
   
   /* Class to id mappings */
-  @SuppressWarnings("unchecked")
   private Map<Class, Byte> classToIdMap = new ConcurrentHashMap<Class, Byte>();
   
   /* Id to Class mappings */
-  @SuppressWarnings("unchecked")
   private Map<Byte, Class> idToClassMap = new ConcurrentHashMap<Byte, Class>();
   
   /* The number of new classes (those not established by the constructor) */
@@ -61,7 +59,6 @@ public abstract class AbstractMapWritable implements Writable, Configurable {
   /**
    * Used to add "predefined" classes and by Writable to copy "new" classes.
    */
-  @SuppressWarnings("unchecked")
   private synchronized void addToMap(Class clazz, byte id) {
     if (classToIdMap.containsKey(clazz)) {
       byte b = classToIdMap.get(clazz);
@@ -82,7 +79,6 @@ public abstract class AbstractMapWritable implements Writable, Configurable {
   }
   
   /** Add a Class to the maps if it is not already present. */ 
-  @SuppressWarnings("unchecked")
   protected synchronized void addToMap(Class clazz) {
     if (classToIdMap.containsKey(clazz)) {
       return;
@@ -96,13 +92,11 @@ public abstract class AbstractMapWritable implements Writable, Configurable {
   }
 
   /** @return the Class class for the specified id */
-  @SuppressWarnings("unchecked")
   protected Class getClass(byte id) {
     return idToClassMap.get(id);
   }
 
   /** @return the id for the specified Class */
-  @SuppressWarnings("unchecked")
   protected byte getId(Class clazz) {
     return classToIdMap.containsKey(clazz) ? classToIdMap.get(clazz) : -1;
   }

+ 2 - 2
src/core/org/apache/hadoop/io/ArrayFile.java

@@ -36,14 +36,14 @@ public class ArrayFile extends MapFile {
 
     /** Create the named file for values of the named class. */
     public Writer(Configuration conf, FileSystem fs,
-                  String file, Class valClass)
+                  String file, Class<? extends Writable> valClass)
       throws IOException {
       super(conf, fs, file, LongWritable.class, valClass);
     }
 
     /** Create the named file for values of the named class. */
     public Writer(Configuration conf, FileSystem fs,
-                  String file, Class valClass,
+                  String file, Class<? extends Writable> valClass,
                   CompressionType compress, Progressable progress)
       throws IOException {
       super(conf, fs, file, LongWritable.class, valClass, compress, progress);

+ 3 - 3
src/core/org/apache/hadoop/io/ArrayWritable.java

@@ -37,17 +37,17 @@ import java.lang.reflect.Array;
  * </code>
  */
 public class ArrayWritable implements Writable {
-  private Class valueClass;
+  private Class<? extends Writable> valueClass;
   private Writable[] values;
 
-  public ArrayWritable(Class valueClass) {
+  public ArrayWritable(Class<? extends Writable> valueClass) {
     if (valueClass == null) { 
       throw new IllegalArgumentException("null valueClass"); 
     }    
     this.valueClass = valueClass;
   }
 
-  public ArrayWritable(Class valueClass, Writable[] values) {
+  public ArrayWritable(Class<? extends Writable> valueClass, Writable[] values) {
     this(valueClass);
     this.values = values;
   }

+ 1 - 1
src/core/org/apache/hadoop/io/GenericWritable.java

@@ -119,7 +119,7 @@ public abstract class GenericWritable implements Writable, Configurable {
     type = in.readByte();
     Class<? extends Writable> clazz = getTypes()[type & 0xff];
     try {
-      instance = (Writable)ReflectionUtils.newInstance(clazz, conf);
+      instance = ReflectionUtils.newInstance(clazz, conf);
     } catch (Exception e) {
       e.printStackTrace();
       throw new IOException("Cannot initialize the class: " + clazz);

+ 19 - 13
src/core/org/apache/hadoop/io/MapFile.java

@@ -77,7 +77,7 @@ public class MapFile {
 
     /** Create the named map for keys of the named class. */
     public Writer(Configuration conf, FileSystem fs, String dirName,
-                  Class keyClass, Class valClass)
+                  Class<? extends WritableComparable> keyClass, Class valClass)
       throws IOException {
       this(conf, fs, dirName,
            WritableComparator.get(keyClass), valClass,
@@ -86,7 +86,7 @@ public class MapFile {
 
     /** Create the named map for keys of the named class. */
     public Writer(Configuration conf, FileSystem fs, String dirName,
-                  Class keyClass, Class valClass,
+                  Class<? extends WritableComparable> keyClass, Class valClass,
                   CompressionType compress, Progressable progress)
       throws IOException {
       this(conf, fs, dirName, WritableComparator.get(keyClass), valClass,
@@ -95,7 +95,7 @@ public class MapFile {
 
     /** Create the named map for keys of the named class. */
     public Writer(Configuration conf, FileSystem fs, String dirName,
-                  Class keyClass, Class valClass,
+                  Class<? extends WritableComparable> keyClass, Class valClass,
                   CompressionType compress, CompressionCodec codec,
                   Progressable progress)
       throws IOException {
@@ -105,7 +105,8 @@ public class MapFile {
 
     /** Create the named map for keys of the named class. */
     public Writer(Configuration conf, FileSystem fs, String dirName,
-                  Class keyClass, Class valClass, CompressionType compress)
+                  Class<? extends WritableComparable> keyClass, Class valClass,
+                  CompressionType compress)
       throws IOException {
       this(conf, fs, dirName, WritableComparator.get(keyClass), valClass, compress);
     }
@@ -242,10 +243,10 @@ public class MapFile {
     private long[] positions;
 
     /** Returns the class of keys in this file. */
-    public Class getKeyClass() { return data.getKeyClass(); }
+    public Class<?> getKeyClass() { return data.getKeyClass(); }
 
     /** Returns the class of values in this file. */
-    public Class getValueClass() { return data.getValueClass(); }
+    public Class<?> getValueClass() { return data.getValueClass(); }
 
     /** Construct a map reader for the named map.*/
     public Reader(FileSystem fs, String dirName, Configuration conf) throws IOException {
@@ -284,7 +285,7 @@ public class MapFile {
       this.firstPosition = data.getPosition();
 
       if (comparator == null)
-        this.comparator = WritableComparator.get(data.getKeyClass());
+        this.comparator = WritableComparator.get(data.getKeyClass().asSubclass(WritableComparable.class));
       else
         this.comparator = comparator;
 
@@ -611,7 +612,9 @@ public class MapFile {
    * @throws Exception
    */
   public static long fix(FileSystem fs, Path dir,
-                         Class keyClass, Class valueClass, boolean dryrun, Configuration conf) throws Exception {
+                         Class<? extends Writable> keyClass,
+                         Class<? extends Writable> valueClass, boolean dryrun,
+                         Configuration conf) throws Exception {
     String dr = (dryrun ? "[DRY RUN ] " : "");
     Path data = new Path(dir, DATA_FILE_NAME);
     Path index = new Path(dir, INDEX_FILE_NAME);
@@ -634,8 +637,8 @@ public class MapFile {
                           ", got " + dataReader.getValueClass().getName());
     }
     long cnt = 0L;
-    Writable key = (Writable)ReflectionUtils.newInstance(keyClass, conf);
-    Writable value = (Writable)ReflectionUtils.newInstance(valueClass, conf);
+    Writable key = ReflectionUtils.newInstance(keyClass, conf);
+    Writable value = ReflectionUtils.newInstance(valueClass, conf);
     SequenceFile.Writer indexWriter = null;
     if (!dryrun) indexWriter = SequenceFile.createWriter(fs, conf, index, keyClass, LongWritable.class);
     try {
@@ -673,11 +676,14 @@ public class MapFile {
     FileSystem fs = FileSystem.getLocal(conf);
     MapFile.Reader reader = new MapFile.Reader(fs, in, conf);
     MapFile.Writer writer =
-      new MapFile.Writer(conf, fs, out, reader.getKeyClass(), reader.getValueClass());
+      new MapFile.Writer(conf, fs, out,
+          reader.getKeyClass().asSubclass(WritableComparable.class),
+          reader.getValueClass());
 
     WritableComparable key =
-      (WritableComparable)ReflectionUtils.newInstance(reader.getKeyClass(), conf);
-    Writable value = (Writable)ReflectionUtils.newInstance(reader.getValueClass(), conf);
+      ReflectionUtils.newInstance(reader.getKeyClass().asSubclass(WritableComparable.class), conf);
+    Writable value =
+      ReflectionUtils.newInstance(reader.getValueClass().asSubclass(Writable.class), conf);
 
     while (reader.next(key, value))               // copy all entries
       writer.append(key, value);

+ 1 - 0
src/core/org/apache/hadoop/io/MapWritable.java

@@ -140,6 +140,7 @@ public class MapWritable extends AbstractMapWritable
   }
 
   /** {@inheritDoc} */
+  @SuppressWarnings("unchecked")
   @Override
   public void readFields(DataInput in) throws IOException {
     super.readFields(in);

+ 8 - 6
src/core/org/apache/hadoop/io/SequenceFile.java

@@ -1489,9 +1489,9 @@ public class SequenceFile {
         if (version >= CUSTOM_COMPRESS_VERSION) {
           String codecClassname = Text.readString(in);
           try {
-            Class codecClass = conf.getClassByName(codecClassname);
-            this.codec = (CompressionCodec)
-              ReflectionUtils.newInstance(codecClass, conf);
+            Class<? extends CompressionCodec> codecClass
+              = conf.getClassByName(codecClassname).asSubclass(CompressionCodec.class);
+            this.codec = ReflectionUtils.newInstance(codecClass, conf);
           } catch (ClassNotFoundException cnfe) {
             throw new IllegalArgumentException("Unknown codec: " + 
                                                codecClassname, cnfe);
@@ -1587,7 +1587,7 @@ public class SequenceFile {
     }
 
     /** Returns the class of keys in this file. */
-    public synchronized Class getKeyClass() {
+    public synchronized Class<?> getKeyClass() {
       if (null == keyClass) {
         try {
           keyClass = WritableName.getClass(getKeyClassName(), conf);
@@ -1604,7 +1604,7 @@ public class SequenceFile {
     }
 
     /** Returns the class of values in this file. */
-    public synchronized Class getValueClass() {
+    public synchronized Class<?> getValueClass() {
       if (null == valClass) {
         try {
           valClass = WritableName.getClass(getValueClassName(), conf);
@@ -2228,7 +2228,8 @@ public class SequenceFile {
     private Progressable progressable = null;
 
     /** Sort and merge files containing the named classes. */
-    public Sorter(FileSystem fs, Class keyClass, Class valClass, Configuration conf)  {
+    public Sorter(FileSystem fs, Class<? extends WritableComparable> keyClass,
+                  Class valClass, Configuration conf)  {
       this(fs, WritableComparator.get(keyClass), keyClass, valClass, conf);
     }
 
@@ -2753,6 +2754,7 @@ public class SequenceFile {
       private Map<SegmentDescriptor, Void> sortedSegmentSizes =
         new TreeMap<SegmentDescriptor, Void>();
             
+      @SuppressWarnings("unchecked")
       public void put(SegmentDescriptor stream) throws IOException {
         if (size() == 0) {
           compress = stream.in.isCompressed();

+ 4 - 3
src/core/org/apache/hadoop/io/SetFile.java

@@ -22,7 +22,6 @@ import java.io.*;
 
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.conf.*;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
 
 /** A file-based set of keys. */
 public class SetFile extends MapFile {
@@ -37,13 +36,15 @@ public class SetFile extends MapFile {
     /** Create the named set for keys of the named class. 
      *  @deprecated pass a Configuration too
      */
-    public Writer(FileSystem fs, String dirName, Class keyClass) throws IOException {
+    public Writer(FileSystem fs, String dirName,
+	Class<? extends WritableComparable> keyClass) throws IOException {
       super(new Configuration(), fs, dirName, keyClass, NullWritable.class);
     }
 
     /** Create a set naming the element class and compression type. */
     public Writer(Configuration conf, FileSystem fs, String dirName,
-                  Class keyClass, SequenceFile.CompressionType compress)
+                  Class<? extends WritableComparable> keyClass,
+                  SequenceFile.CompressionType compress)
       throws IOException {
       this(conf, fs, dirName, WritableComparator.get(keyClass), compress);
     }

+ 1 - 0
src/core/org/apache/hadoop/io/SortedMapWritable.java

@@ -159,6 +159,7 @@ public class SortedMapWritable extends AbstractMapWritable
   }
 
   /** {@inheritDoc} */
+  @SuppressWarnings("unchecked")
   @Override
   public void readFields(DataInput in) throws IOException {
     super.readFields(in);

+ 7 - 7
src/core/org/apache/hadoop/io/WritableComparator.java

@@ -38,7 +38,7 @@ public class WritableComparator implements RawComparator {
     new HashMap<Class, WritableComparator>(); // registry
 
   /** Get a comparator for a {@link WritableComparable} implementation. */
-  public static synchronized WritableComparator get(Class c) {
+  public static synchronized WritableComparator get(Class<? extends WritableComparable> c) {
     WritableComparator comparator = comparators.get(c);
     if (comparator == null)
       comparator = new WritableComparator(c, true);
@@ -53,17 +53,18 @@ public class WritableComparator implements RawComparator {
   }
 
 
-  private final Class keyClass;
+  private final Class<? extends WritableComparable> keyClass;
   private final WritableComparable key1;
   private final WritableComparable key2;
   private final DataInputBuffer buffer;
 
   /** Construct for a {@link WritableComparable} implementation. */
-  protected WritableComparator(Class keyClass) {
+  protected WritableComparator(Class<? extends WritableComparable> keyClass) {
     this(keyClass, false);
   }
 
-  private WritableComparator(Class keyClass, boolean createInstances) {
+  private WritableComparator(Class<? extends WritableComparable> keyClass,
+      boolean createInstances) {
     this.keyClass = keyClass;
     if (createInstances) {
       key1 = newKey();
@@ -76,12 +77,11 @@ public class WritableComparator implements RawComparator {
   }
 
   /** Returns the WritableComparable implementation class. */
-  public Class getKeyClass() { return keyClass; }
+  public Class<? extends WritableComparable> getKeyClass() { return keyClass; }
 
   /** Construct a new {@link WritableComparable} instance. */
   public WritableComparable newKey() {
-    return (WritableComparable)
-      ReflectionUtils.newInstance(keyClass, null);
+    return ReflectionUtils.newInstance(keyClass, null);
   }
 
   /** Optimization hook.  Override this to make SequenceFile.Sorter's scream.

+ 3 - 3
src/core/org/apache/hadoop/io/WritableFactories.java

@@ -41,7 +41,7 @@ public class WritableFactories {
   }
 
   /** Create a new instance of a class with a defined factory. */
-  public static Writable newInstance(Class c, Configuration conf) {
+  public static Writable newInstance(Class<? extends Writable> c, Configuration conf) {
     WritableFactory factory = WritableFactories.getFactory(c);
     if (factory != null) {
       Writable result = factory.newInstance();
@@ -50,12 +50,12 @@ public class WritableFactories {
       }
       return result;
     } else {
-      return (Writable)ReflectionUtils.newInstance(c, conf);
+      return ReflectionUtils.newInstance(c, conf);
     }
   }
   
   /** Create a new instance of a class with a defined factory. */
-  public static Writable newInstance(Class c) {
+  public static Writable newInstance(Class<? extends Writable> c) {
     return newInstance(c, null);
   }
 

+ 7 - 8
src/core/org/apache/hadoop/io/WritableName.java

@@ -27,10 +27,10 @@ import org.apache.hadoop.conf.Configuration;
 * invalidating files that contain their class name.
  */
 public class WritableName {
-  private static HashMap<String, Class> NAME_TO_CLASS =
-    new HashMap<String, Class>();
-  private static HashMap<Class, String> CLASS_TO_NAME =
-    new HashMap<Class, String>();
+  private static HashMap<String, Class<?>> NAME_TO_CLASS =
+    new HashMap<String, Class<?>>();
+  private static HashMap<Class<?>, String> CLASS_TO_NAME =
+    new HashMap<Class<?>, String>();
 
   static {                                        // define important types
     WritableName.setName(NullWritable.class, "null");
@@ -62,12 +62,11 @@ public class WritableName {
   }
 
   /** Return the class for a name.  Default is {@link Class#forName(String)}.*/
-  public static synchronized Class getClass(String name,
-                                            Configuration conf
+  public static synchronized Class<?> getClass(String name, Configuration conf
                                             ) throws IOException {
-    Class writableClass = NAME_TO_CLASS.get(name);
+    Class<?> writableClass = NAME_TO_CLASS.get(name);
     if (writableClass != null)
-      return writableClass;
+      return writableClass.asSubclass(Writable.class);
     try {
       return conf.getClassByName(name);
     } catch (ClassNotFoundException e) {

+ 3 - 3
src/core/org/apache/hadoop/io/WritableUtils.java

@@ -224,10 +224,10 @@ public final class WritableUtils  {
    * @param orig The object to copy
    * @return The copied object
    */
-  public static Writable clone(Writable orig, Configuration conf) {
+  public static <T extends Writable> T clone(T orig, Configuration conf) {
     try {
-      Writable newInst =
-        (Writable)ReflectionUtils.newInstance(orig.getClass(), conf);
+      @SuppressWarnings("unchecked") // Unchecked cast from Class to Class<T>
+      T newInst = ReflectionUtils.newInstance((Class<T>) orig.getClass(), conf);
       cloneInto(newInst, orig);
       return newInst;
     } catch (IOException e) {
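
Making `clone` generic lets the compiler infer the concrete `Writable` subtype from the argument, so callers can drop both the cast and their `@SuppressWarnings("unchecked")` annotation (see the `ArrayListBackedIterator` hunk below). A small usage sketch, assuming a `Text` value:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableUtils;

public class CloneDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    Text orig = new Text("payload");
    // Before this patch: Text copy = (Text) WritableUtils.clone(orig, conf);
    // After: T is inferred as Text from the argument, so no cast is needed.
    Text copy = WritableUtils.clone(orig, conf);
    System.out.println(copy);  // prints "payload"
  }
}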

+ 8 - 8
src/core/org/apache/hadoop/io/compress/CompressionCodecFactory.java

@@ -76,21 +76,22 @@ public class CompressionCodecFactory {
    * @return a list of the Configuration classes or null if the attribute
    *         was not set
    */
-  public static List<Class> getCodecClasses(Configuration conf) {
+  public static List<Class<? extends CompressionCodec>> getCodecClasses(Configuration conf) {
     String codecsString = conf.get("io.compression.codecs");
     if (codecsString != null) {
-      List<Class> result = new ArrayList<Class>();
+      List<Class<? extends CompressionCodec>> result
+        = new ArrayList<Class<? extends CompressionCodec>>();
       StringTokenizer codecSplit = new StringTokenizer(codecsString, ",");
       while (codecSplit.hasMoreElements()) {
         String codecSubstring = codecSplit.nextToken();
         if (codecSubstring.length() != 0) {
           try {
-            Class cls = conf.getClassByName(codecSubstring);
+            Class<?> cls = conf.getClassByName(codecSubstring);
             if (!CompressionCodec.class.isAssignableFrom(cls)) {
               throw new IllegalArgumentException("Class " + codecSubstring +
                                                  " is not a CompressionCodec");
             }
-            result.add(cls);
+            result.add(cls.asSubclass(CompressionCodec.class));
           } catch (ClassNotFoundException ex) {
             throw new IllegalArgumentException("Compression codec " + 
                                                codecSubstring + " not found.",
@@ -130,15 +131,14 @@ public class CompressionCodecFactory {
    */
   public CompressionCodecFactory(Configuration conf) {
     codecs = new TreeMap<String, CompressionCodec>();
-    List<Class> codecClasses = getCodecClasses(conf);
+    List<Class<? extends CompressionCodec>> codecClasses = getCodecClasses(conf);
     if (codecClasses == null) {
       addCodec(new GzipCodec());
       addCodec(new DefaultCodec());      
     } else {
-      Iterator<Class> itr = codecClasses.iterator();
+      Iterator<Class<? extends CompressionCodec>> itr = codecClasses.iterator();
       while (itr.hasNext()) {
-        CompressionCodec codec = 
-          (CompressionCodec) ReflectionUtils.newInstance(itr.next(), conf);
+        CompressionCodec codec = ReflectionUtils.newInstance(itr.next(), conf);
         addCodec(codec);     
       }
     }

+ 4 - 4
src/core/org/apache/hadoop/ipc/Client.java

@@ -65,7 +65,7 @@ public class Client {
   private Hashtable<ConnectionId, Connection> connections =
     new Hashtable<ConnectionId, Connection>();
 
-  private Class<?> valueClass;                       // class of call values
+  private Class<? extends Writable> valueClass;   // class of call values
   private int counter;                            // counter for call ids
   private AtomicBoolean running = new AtomicBoolean(true); // if client runs
   final private Configuration conf;
@@ -508,7 +508,7 @@ public class Client {
           call.setException(new RemoteException( WritableUtils.readString(in),
               WritableUtils.readString(in)));
         } else {
-          Writable value = (Writable)ReflectionUtils.newInstance(valueClass, conf);
+          Writable value = ReflectionUtils.newInstance(valueClass, conf);
           value.readFields(in);                 // read value
           call.setValue(value);
         }
@@ -622,7 +622,7 @@ public class Client {
 
   /** Construct an IPC client whose values are of the given {@link Writable}
    * class. */
-  public Client(Class valueClass, Configuration conf, 
+  public Client(Class<? extends Writable> valueClass, Configuration conf, 
       SocketFactory factory) {
     this.valueClass = valueClass;
     this.maxIdleTime = 
@@ -642,7 +642,7 @@ public class Client {
    * @param valueClass
    * @param conf
    */
-  public Client(Class<?> valueClass, Configuration conf) {
+  public Client(Class<? extends Writable> valueClass, Configuration conf) {
     this(valueClass, conf, NetUtils.getDefaultSocketFactory(conf));
   }
  

+ 8 - 5
src/core/org/apache/hadoop/ipc/Server.java

@@ -119,7 +119,7 @@ public abstract class Server {
   private String bindAddress; 
   private int port;                               // port we listen on
   private int handlerCount;                       // number of handler threads
-  private Class<?> paramClass;                    // class of call parameters
+  private Class<? extends Writable> paramClass;   // class of call parameters
   private int maxIdleTime;                        // the maximum idle time after 
                                                   // which a client may be disconnected
   private int thresholdIdleConnections;           // the number of idle connections
@@ -837,7 +837,7 @@ public abstract class Server {
       if (LOG.isDebugEnabled())
         LOG.debug(" got #" + id);
             
-      Writable param = (Writable)ReflectionUtils.newInstance(paramClass, conf);           // read param
+      Writable param = ReflectionUtils.newInstance(paramClass, conf);           // read param
       param.readFields(dis);        
         
       Call call = new Call(id, param, this);
@@ -922,7 +922,9 @@ public abstract class Server {
 
   }
   
-  protected Server(String bindAddress, int port, Class paramClass, int handlerCount, Configuration conf)
+  protected Server(String bindAddress, int port,
+                  Class<? extends Writable> paramClass, int handlerCount, 
+                  Configuration conf)
     throws IOException 
   {
     this(bindAddress, port, paramClass, handlerCount,  conf, Integer.toString(port));
@@ -932,8 +934,9 @@ public abstract class Server {
    * the number of handler threads that will be used to process calls.
    * 
    */
-  protected Server(String bindAddress, int port, Class<?> paramClass, int handlerCount, Configuration conf,
-                  String serverName) 
+  protected Server(String bindAddress, int port, 
+                  Class<? extends Writable> paramClass, int handlerCount, 
+                  Configuration conf, String serverName) 
     throws IOException {
     this.bindAddress = bindAddress;
     this.conf = conf;

+ 2 - 1
src/core/org/apache/hadoop/record/RecordComparator.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.record;
 
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
 
 /**
@@ -27,7 +28,7 @@ public abstract class RecordComparator extends WritableComparator {
   
   /**
    * Construct a raw {@link Record} comparison implementation. */
-  protected RecordComparator(Class recordClass) {
+  protected RecordComparator(Class<? extends WritableComparable> recordClass) {
     super(recordClass);
   }
   

+ 11 - 10
src/core/org/apache/hadoop/util/PriorityQueue.java

@@ -21,8 +21,8 @@ package org.apache.hadoop.util;
 /** A PriorityQueue maintains a partial ordering of its elements such that the
   least element can always be found in constant time.  Put()'s and pop()'s
   require log(size) time. */
-public abstract class PriorityQueue {
-  private Object[] heap;
+public abstract class PriorityQueue<T> {
+  private T[] heap;
   private int size;
   private int maxSize;
 
@@ -31,10 +31,11 @@ public abstract class PriorityQueue {
   protected abstract boolean lessThan(Object a, Object b);
 
   /** Subclass constructors must call this. */
+  @SuppressWarnings("unchecked")
   protected final void initialize(int maxSize) {
     size = 0;
     int heapSize = maxSize + 1;
-    heap = new Object[heapSize];
+    heap = (T[]) new Object[heapSize];
     this.maxSize = maxSize;
   }
 
@@ -43,7 +44,7 @@ public abstract class PriorityQueue {
    * If one tries to add more objects than maxSize from initialize
    * a RuntimeException (ArrayIndexOutOfBound) is thrown.
    */
-  public final void put(Object element) {
+  public final void put(T element) {
     size++;
     heap[size] = element;
     upHeap();
@@ -55,7 +56,7 @@ public abstract class PriorityQueue {
    * @param element
    * @return true if element is added, false otherwise.
    */
-  public boolean insert(Object element){
+  public boolean insert(T element){
     if (size < maxSize){
       put(element);
       return true;
@@ -70,7 +71,7 @@ public abstract class PriorityQueue {
   }
 
   /** Returns the least element of the PriorityQueue in constant time. */
-  public final Object top() {
+  public final T top() {
     if (size > 0)
       return heap[1];
     else
@@ -79,9 +80,9 @@ public abstract class PriorityQueue {
 
   /** Removes and returns the least element of the PriorityQueue in log(size)
       time. */
-  public final Object pop() {
+  public final T pop() {
     if (size > 0) {
-      Object result = heap[1];			  // save first value
+      T result = heap[1];			  // save first value
       heap[1] = heap[size];			  // move last to first
       heap[size] = null;			  // permit GC of objects
       size--;
@@ -117,7 +118,7 @@ public abstract class PriorityQueue {
 
   private final void upHeap() {
     int i = size;
-    Object node = heap[i];			  // save bottom node
+    T node = heap[i];			  // save bottom node
     int j = i >>> 1;
     while (j > 0 && lessThan(node, heap[j])) {
       heap[i] = heap[j];			  // shift parents down
@@ -129,7 +130,7 @@ public abstract class PriorityQueue {
 
   private final void downHeap() {
     int i = 1;
-    Object node = heap[i];			  // save top node
+    T node = heap[i];			  // save top node
     int j = i << 1;				  // find smaller child
     int k = j + 1;
     if (k <= size && lessThan(heap[k], heap[j])) {
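
Because Java erases type parameters, `new T[heapSize]` does not compile; the patch instead allocates an `Object[]` and casts it, confining the one unavoidable unchecked warning to `initialize`. The cast is safe here because `heap` never escapes the class as a `T[]`. A standalone sketch of the same idiom (the `GenericStack` class is illustrative only):

// GenericStack.java -- a minimal sketch of the (T[]) new Object[n] idiom
// that this patch applies to PriorityQueue.
public class GenericStack<T> {
  private T[] items;
  private int size;

  @SuppressWarnings("unchecked")  // safe: the array never leaks as T[]
  public GenericStack(int capacity) {
    items = (T[]) new Object[capacity];  // "new T[capacity]" is illegal in Java
  }

  public void push(T item) { items[size++] = item; }

  public T pop() {
    T top = items[--size];
    items[size] = null;  // permit GC, as PriorityQueue.pop() does
    return top;
  }

  public static void main(String[] args) {
    GenericStack<String> s = new GenericStack<String>(4);
    s.push("a"); s.push("b");
    System.out.println(s.pop() + s.pop());  // prints "ba"
  }
}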

+ 4 - 3
src/core/org/apache/hadoop/util/ReflectionUtils.java

@@ -66,10 +66,11 @@ public class ReflectionUtils {
    * @param conf Configuration
    * @return a new object
    */
-  public static Object newInstance(Class<?> theClass, Configuration conf) {
-    Object result;
+  @SuppressWarnings("unchecked")
+  public static <T> T newInstance(Class<T> theClass, Configuration conf) {
+    T result;
     try {
-      Constructor meth = CONSTRUCTOR_CACHE.get(theClass);
+      Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
       if (meth == null) {
         meth = theClass.getDeclaredConstructor(emptyArray);
         meth.setAccessible(true);
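
The hunk is truncated, but it contains the core of the change: `newInstance` gains a type parameter so every call site receives a typed instance without casting, and the one unchecked cast is confined to the lookup in the shared constructor cache, where it is safe because entries are only ever stored under their own class. A minimal sketch of the pattern, assuming a simplified cache and omitting the `Configuration` wiring the real method performs:

import java.lang.reflect.Constructor;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// TypedFactory -- a sketch of the generic reflective-factory pattern this
// patch introduces; it is not Hadoop's actual implementation. The cache maps
// Class -> Constructor; since each entry is stored under its own class, the
// cast back to Constructor<T> is safe even though erasure hides it.
public class TypedFactory {
  private static final Map<Class<?>, Constructor<?>> CACHE =
      new ConcurrentHashMap<Class<?>, Constructor<?>>();

  @SuppressWarnings("unchecked")
  public static <T> T newInstance(Class<T> theClass) {
    try {
      Constructor<T> ctor = (Constructor<T>) CACHE.get(theClass);
      if (ctor == null) {
        ctor = theClass.getDeclaredConstructor();
        ctor.setAccessible(true);  // allow private no-arg constructors
        CACHE.put(theClass, ctor);
      }
      return ctor.newInstance();  // typed: callers need no cast
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  public static void main(String[] args) {
    StringBuilder sb = TypedFactory.newInstance(StringBuilder.class);
    System.out.println(sb.append("ok"));  // prints "ok"
  }
}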

+ 3 - 3
src/examples/org/apache/hadoop/examples/dancing/DistributedPentomino.java

@@ -160,7 +160,7 @@ public class DistributedPentomino extends Configured implements Tool {
     int depth = 5;
     int width = 9;
     int height = 10;
-    Class pentClass;
+    Class<? extends Pentomino> pentClass;
     if (args.length == 0) {
       System.out.println("pentomino <output>");
       ToolRunner.printGenericCommandUsage(System.out);
@@ -171,7 +171,7 @@ public class DistributedPentomino extends Configured implements Tool {
     width = conf.getInt("pent.width", width);
     height = conf.getInt("pent.height", height);
     depth = conf.getInt("pent.depth", depth);
-    pentClass = conf.getClass("pent.class", OneSidedPentomino.class);
+    pentClass = conf.getClass("pent.class", OneSidedPentomino.class, Pentomino.class);
     
     Path output = new Path(args[0]);
     Path input = new Path(output + "_input");
@@ -182,7 +182,7 @@ public class DistributedPentomino extends Configured implements Tool {
       conf.setJarByClass(PentMap.class);
       
       conf.setJobName("dancingElephant");
-      Pentomino pent = (Pentomino) ReflectionUtils.newInstance(pentClass, conf);
+      Pentomino pent = ReflectionUtils.newInstance(pentClass, conf);
       pent.initialize(width, height);
       createInputDirectory(fileSys, input, pent, depth);
    

+ 1 - 1
src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -329,7 +329,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
     this.dnthread = new Daemon(new DecommissionedMonitor());
     dnthread.start();
 
-    this.dnsToSwitchMapping = (DNSToSwitchMapping)ReflectionUtils.newInstance(
+    this.dnsToSwitchMapping = ReflectionUtils.newInstance(
         conf.getClass("topology.node.switch.mapping.impl", ScriptBasedMapping.class,
             DNSToSwitchMapping.class), conf);
 

+ 0 - 5
src/mapred/org/apache/hadoop/mapred/Counters.java

@@ -275,7 +275,6 @@ public class Counters implements Writable, Iterable<Counters.Group> {
    * A cache from enum values to the associated counter. Dramatically speeds up
    * typical usage.
    */
-  @SuppressWarnings("unchecked")
   private Map<Enum, Counter> cache = new IdentityHashMap<Enum, Counter>();
   
   /**
@@ -309,7 +308,6 @@ public class Counters implements Writable, Iterable<Counters.Group> {
    * @param key the counter key
    * @return the matching counter object
    */
-  @SuppressWarnings("unchecked")
   public synchronized Counter findCounter(Enum key) {
     Counter counter = cache.get(key);
     if (counter == null) {
@@ -339,7 +337,6 @@ public class Counters implements Writable, Iterable<Counters.Group> {
    * @param key identifies a counter
    * @param amount amount by which counter is to be incremented
    */
-  @SuppressWarnings("unchecked")
   public synchronized void incrCounter(Enum key, long amount) {
     findCounter(key).value += amount;
   }
@@ -351,7 +348,6 @@ public class Counters implements Writable, Iterable<Counters.Group> {
    * @param counter the internal name of the counter
    * @param amount amount by which counter is to be incremented
    */
-  @SuppressWarnings("unchecked")
   public synchronized void incrCounter(String group, String counter, long amount) {
     getGroup(group).getCounterForName(counter).value += amount;
   }
@@ -360,7 +356,6 @@ public class Counters implements Writable, Iterable<Counters.Group> {
    * Returns current value of the specified counter, or 0 if the counter
    * does not exist.
    */
-  @SuppressWarnings("unchecked")
   public synchronized long getCounter(Enum key) {
     return findCounter(key).value;
   }

+ 3 - 3
src/mapred/org/apache/hadoop/mapred/FileInputFormat.java

@@ -124,10 +124,10 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
    * @return the PathFilter instance set for the job, NULL if none has been set.
    */
   public static PathFilter getInputPathFilter(JobConf conf) {
-    Class<?> filterClass = conf.getClass("mapred.input.pathFilter.class", null,
-        PathFilter.class);
+    Class<? extends PathFilter> filterClass = conf.getClass(
+	"mapred.input.pathFilter.class", null, PathFilter.class);
     return (filterClass != null) ?
-        (PathFilter) ReflectionUtils.newInstance(filterClass, conf) : null;
+        ReflectionUtils.newInstance(filterClass, conf) : null;
   }
 
   /** List input directories.

+ 6 - 2
src/mapred/org/apache/hadoop/mapred/IsolationRunner.java

@@ -33,6 +33,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 
 public class IsolationRunner {
   private static final Log LOG = 
@@ -120,8 +122,10 @@ public class IsolationRunner {
                                               TaskAttemptID taskId,
                                               int numMaps,
                                               JobConf conf) throws IOException {
-    Class keyClass = conf.getMapOutputKeyClass();
-    Class valueClass = conf.getMapOutputValueClass();
+    Class<? extends WritableComparable> keyClass
+        = conf.getMapOutputKeyClass().asSubclass(WritableComparable.class);
+    Class<? extends Writable> valueClass
+        = conf.getMapOutputValueClass().asSubclass(Writable.class);
     MapOutputFile namer = new MapOutputFile(taskId.getJobID());
     namer.setConf(conf);
     for(int i=0; i<numMaps; i++) {

+ 8 - 8
src/mapred/org/apache/hadoop/mapred/JobConf.java

@@ -318,7 +318,7 @@ public class JobConf extends Configuration {
    * @return the {@link InputFormat} implementation for the map-reduce job.
    */
   public InputFormat getInputFormat() {
-    return (InputFormat)ReflectionUtils.newInstance(getClass("mapred.input.format.class",
+    return ReflectionUtils.newInstance(getClass("mapred.input.format.class",
                                                              TextInputFormat.class,
                                                              InputFormat.class),
                                                     this);
@@ -341,7 +341,7 @@ public class JobConf extends Configuration {
    * @return the {@link OutputFormat} implementation for the map-reduce job.
    */
   public OutputFormat getOutputFormat() {
-    return (OutputFormat)ReflectionUtils.newInstance(getClass("mapred.output.format.class",
+    return ReflectionUtils.newInstance(getClass("mapred.output.format.class",
                                                               TextOutputFormat.class,
                                                               OutputFormat.class),
                                                      this);
@@ -491,11 +491,11 @@ public class JobConf extends Configuration {
    * @return the {@link RawComparator} comparator used to compare keys.
    */
   public RawComparator getOutputKeyComparator() {
-    Class theClass = getClass("mapred.output.key.comparator.class", null,
-    		RawComparator.class);
+    Class<? extends RawComparator> theClass = getClass("mapred.output.key.comparator.class",
+	        null, RawComparator.class);
     if (theClass != null)
-      return (RawComparator)ReflectionUtils.newInstance(theClass, this);
-    return WritableComparator.get(getMapOutputKeyClass());
+      return ReflectionUtils.newInstance(theClass, this);
+    return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class));
   }
 
   /**
@@ -518,13 +518,13 @@ public class JobConf extends Configuration {
    * @see #setOutputValueGroupingComparator(Class) for details.  
    */
   public RawComparator getOutputValueGroupingComparator() {
-    Class theClass = getClass("mapred.output.value.groupfn.class", null,
+    Class<? extends RawComparator> theClass = getClass("mapred.output.value.groupfn.class", null,
         RawComparator.class);
     if (theClass == null) {
       return getOutputKeyComparator();
     }
     
-    return (RawComparator)ReflectionUtils.newInstance(theClass, this);
+    return ReflectionUtils.newInstance(theClass, this);
   }
 
   /** 

+ 1 - 1
src/mapred/org/apache/hadoop/mapred/JobTracker.java

@@ -713,7 +713,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
       infoServer.setAttribute("fileSys", historyFS);
     }
 
-    this.dnsToSwitchMapping = (DNSToSwitchMapping)ReflectionUtils.newInstance(
+    this.dnsToSwitchMapping = ReflectionUtils.newInstance(
         conf.getClass("topology.node.switch.mapping.impl", ScriptBasedMapping.class,
             DNSToSwitchMapping.class), conf);
     this.numTaskCacheLevels = conf.getInt("mapred.task.cache.levels", 

+ 5 - 5
src/mapred/org/apache/hadoop/mapred/MapFileOutputFormat.java

@@ -53,16 +53,16 @@ extends FileOutputFormat<WritableComparable, Writable> {
       compressionType = SequenceFileOutputFormat.getOutputCompressionType(job);
 
       // find the right codec
-      Class codecClass = getOutputCompressorClass(job, DefaultCodec.class);
-      codec = (CompressionCodec) 
-        ReflectionUtils.newInstance(codecClass, job);
+      Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job,
+	  DefaultCodec.class);
+      codec = ReflectionUtils.newInstance(codecClass, job);
     }
     
     // ignore the progress parameter, since MapFile is local
     final MapFile.Writer out =
       new MapFile.Writer(job, fs, file.toString(),
-                         job.getOutputKeyClass(),
-                         job.getOutputValueClass(),
+                         job.getOutputKeyClass().asSubclass(WritableComparable.class),
+                         job.getOutputValueClass().asSubclass(Writable.class),
                          compressionType, codec,
                          progress);
 

+ 1 - 2
src/mapred/org/apache/hadoop/mapred/MapRunner.java

@@ -30,8 +30,7 @@ public class MapRunner<K1, V1, K2, V2>
 
   @SuppressWarnings("unchecked")
   public void configure(JobConf job) {
-    this.mapper = (Mapper)ReflectionUtils.newInstance(job.getMapperClass(),
-                                                      job);
+    this.mapper = ReflectionUtils.newInstance(job.getMapperClass(), job);
   }
 
   public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,

+ 10 - 16
src/mapred/org/apache/hadoop/mapred/MapTask.java

@@ -221,7 +221,7 @@ class MapTask extends Task {
     RecordReader in = new TrackedRecordReader(rawIn, getCounters());
 
     MapRunnable runner =
-      (MapRunnable)ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
+      ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
 
     try {
       runner.run(in, collector, reporter);      
@@ -349,8 +349,7 @@ class MapTask extends Task {
       this.reporter = reporter;
       localFs = FileSystem.getLocal(job);
       partitions = job.getNumReduceTasks();
-      partitioner = (Partitioner)
-        ReflectionUtils.newInstance(job.getPartitionerClass(), job);
+      partitioner = ReflectionUtils.newInstance(job.getPartitionerClass(), job);
       // sanity checks
       final float spillper = job.getFloat("io.sort.spill.percent",(float)0.8);
       final float recper = job.getFloat("io.sort.record.percent",(float)0.05);
@@ -364,9 +363,8 @@ class MapTask extends Task {
       if ((sortmb & 0x7FF) != sortmb) {
         throw new IOException("Invalid \"io.sort.mb\": " + sortmb);
       }
-      sorter = (IndexedSorter)
-        ReflectionUtils.newInstance(
-            job.getClass("map.sort.class", QuickSort.class), job);
+      sorter = ReflectionUtils.newInstance(
+            job.getClass("map.sort.class", QuickSort.class, IndexedSorter.class), job);
       LOG.info("io.sort.mb = " + sortmb);
       // buffers and accounting
       int maxMemUsage = sortmb << 20;
@@ -400,13 +398,12 @@ class MapTask extends Task {
       if (job.getCompressMapOutput()) {
         Class<? extends CompressionCodec> codecClass =
           job.getMapOutputCompressorClass(DefaultCodec.class);
-        codec = (CompressionCodec)
-          ReflectionUtils.newInstance(codecClass, job);
+        codec = ReflectionUtils.newInstance(codecClass, job);
       }
       // combiner
       combinerClass = job.getCombinerClass();
       combineCollector = (null != combinerClass)
-        ? new CombineOutputCollector(combineOutputCounter)
+        ? new CombineOutputCollector<K,V>(combineOutputCounter)
         : null;
       minSpillsForCombine = job.getInt("min.num.spills.for.combine", 3);
       spillThread.setDaemon(true);
@@ -429,7 +426,6 @@ class MapTask extends Task {
       }
     }
 
-    @SuppressWarnings("unchecked")
     public synchronized void collect(K key, V value)
         throws IOException {
       reporter.progress();
@@ -877,7 +873,6 @@ class MapTask extends Task {
      * the in-memory buffer, so we must spill the record from collect
      * directly to a spill file. Consider this "losing".
      */
-    @SuppressWarnings("unchecked")
     private void spillSingleRecord(final K key, final V value) 
         throws IOException {
       long size = kvbuffer.length + partitions * APPROX_HEADER_LENGTH;
@@ -896,11 +891,11 @@ class MapTask extends Task {
         indexOut = localFs.create(indexFilename);
         // we don't run the combiner for a single record
         for (int i = 0; i < partitions; ++i) {
-          IFile.Writer writer = null;
+          IFile.Writer<K, V> writer = null;
           try {
             long segmentStart = out.getPos();
             // Create a new codec, don't care!
-            writer = new IFile.Writer(job, out, keyClass, valClass, codec);
+            writer = new IFile.Writer<K, V>(job, out, keyClass, valClass, codec);
 
             if (i == partition) {
               final long recordStart = out.getPos();
@@ -943,10 +938,9 @@ class MapTask extends Task {
     @SuppressWarnings("unchecked")
     private void combineAndSpill(RawKeyValueIterator kvIter,
         Counters.Counter inCounter) throws IOException {
-      Reducer combiner =
-        (Reducer)ReflectionUtils.newInstance(combinerClass, job);
+      Reducer combiner = ReflectionUtils.newInstance(combinerClass, job);
       try {
-        CombineValuesIterator values = new CombineValuesIterator(
+        CombineValuesIterator<K, V> values = new CombineValuesIterator<K, V>(
             kvIter, comparator, keyClass, valClass, job, reporter,
             inCounter);
         while (values.more()) {

+ 3 - 5
src/mapred/org/apache/hadoop/mapred/Merger.java

@@ -144,7 +144,7 @@ class Merger {
   }
   
   private static class MergeQueue<K extends Object, V extends Object> 
-  extends PriorityQueue implements RawKeyValueIterator {
+  extends PriorityQueue<Segment<K, V>> implements RawKeyValueIterator {
     Configuration conf;
     FileSystem fs;
     CompressionCodec codec;
@@ -205,10 +205,9 @@ class Merger {
       this.reporter = reporter;
     }
 
-    @SuppressWarnings("unchecked")
     public void close() throws IOException {
       Segment<K, V> segment;
-      while((segment = (Segment<K, V>)pop()) != null) {
+      while((segment = pop()) != null) {
         segment.close();
       }
     }
@@ -230,7 +229,6 @@ class Merger {
       }
     }
 
-    @SuppressWarnings("unchecked")
     public boolean next() throws IOException {
       if (size() == 0)
         return false;
@@ -245,7 +243,7 @@ class Merger {
           return false;
         }
       }
-      minSegment = (Segment<K, V>)top();
+      minSegment = top();
       
       key = minSegment.getKey();
       value = minSegment.getValue();

+ 4 - 11
src/mapred/org/apache/hadoop/mapred/ReduceTask.java

@@ -53,7 +53,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.RawComparator;
@@ -160,7 +159,7 @@ class ReduceTask extends Task {
     if (conf.getCompressMapOutput()) {
       Class<? extends CompressionCodec> codecClass =
         conf.getMapOutputCompressorClass(DefaultCodec.class);
-      return (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
+      return ReflectionUtils.newInstance(codecClass, conf);
     } 
 
     return null;
@@ -246,8 +245,7 @@ class ReduceTask extends Task {
   @SuppressWarnings("unchecked")
   public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
     throws IOException {
-    Reducer reducer = (Reducer)ReflectionUtils.newInstance(
-                                                           job.getReducerClass(), job);
+    Reducer reducer = ReflectionUtils.newInstance(job.getReducerClass(), job);
 
     // start thread that will handle communication with parent
     startCommunicationThread(umbilical);
@@ -303,7 +301,6 @@ class ReduceTask extends Task {
       job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter);  
     
     OutputCollector collector = new OutputCollector() {
-        @SuppressWarnings("unchecked")
         public void collect(Object key, Object value)
           throws IOException {
           out.write(key, value);
@@ -892,8 +889,7 @@ class ReduceTask extends Task {
         if (job.getCompressMapOutput()) {
           Class<? extends CompressionCodec> codecClass =
             job.getMapOutputCompressorClass(DefaultCodec.class);
-          codec = (CompressionCodec)
-            ReflectionUtils.newInstance(codecClass, job);
+          codec = ReflectionUtils.newInstance(codecClass, job);
           decompressor = CodecPool.getDecompressor(codec);
         }
       }
@@ -1461,7 +1457,6 @@ class ReduceTask extends Task {
       return numInFlight > maxInFlight;
     }
     
-    @SuppressWarnings("unchecked")
     public boolean fetchOutputs() throws IOException {
       //The map for (Hosts, List of MapIds from this Host)
       HashMap<String, List<MapOutputLocation>> mapLocations = 
@@ -2136,7 +2131,6 @@ class ReduceTask extends Task {
         setDaemon(true);
       }
       
-      @SuppressWarnings("unchecked")
       public void run() {
         LOG.info(reduceTask.getTaskID() + " Thread started: " + getName());
         try {
@@ -2229,8 +2223,7 @@ class ReduceTask extends Task {
         RawKeyValueIterator kvIter,
         Counters.Counter inCounter) throws IOException {
       JobConf job = (JobConf)getConf();
-      Reducer combiner =
-        (Reducer)ReflectionUtils.newInstance(combinerClass, job);
+      Reducer combiner = ReflectionUtils.newInstance(combinerClass, job);
       Class keyClass = job.getMapOutputKeyClass();
       Class valClass = job.getMapOutputValueClass();
       RawComparator comparator = job.getOutputKeyComparator();

+ 9 - 10
src/mapred/org/apache/hadoop/mapred/SequenceFileAsBinaryOutputFormat.java

@@ -107,9 +107,10 @@ public class SequenceFileAsBinaryOutputFormat
    * 
    * @return the key class of the {@link SequenceFile}
    */
-  static public Class<?> getSequenceFileOutputKeyClass(JobConf conf) { 
+  static public Class<? extends WritableComparable> getSequenceFileOutputKeyClass(JobConf conf) { 
     return conf.getClass("mapred.seqbinary.output.key.class", 
-                         conf.getOutputKeyClass(), Object.class);
+                         conf.getOutputKeyClass().asSubclass(WritableComparable.class),
+                         WritableComparable.class);
   }
 
   /**
@@ -117,13 +118,11 @@ public class SequenceFileAsBinaryOutputFormat
    * 
    * @return the value class of the {@link SequenceFile}
    */
-  static public Class<?> getSequenceFileOutputValueClass(JobConf conf) { 
+  static public Class<? extends Writable> getSequenceFileOutputValueClass(JobConf conf) { 
     return conf.getClass("mapred.seqbinary.output.value.class", 
-                         conf.getOutputValueClass(), Object.class);
+                         conf.getOutputValueClass().asSubclass(Writable.class),
+                         Writable.class);
   }
-
-
-
   
   @Override 
   public RecordWriter <BytesWritable, BytesWritable> 
@@ -141,9 +140,9 @@ public class SequenceFileAsBinaryOutputFormat
       compressionType = getOutputCompressionType(job);
 
       // find the right codec
-      Class codecClass = getOutputCompressorClass(job, DefaultCodec.class);
-      codec = (CompressionCodec) 
-        ReflectionUtils.newInstance(codecClass, job);
+      Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job,
+	  DefaultCodec.class);
+      codec = ReflectionUtils.newInstance(codecClass, job);
     }
     final SequenceFile.Writer out = 
       SequenceFile.createWriter(fs, job, file,

+ 7 - 5
src/mapred/org/apache/hadoop/mapred/SequenceFileOutputFormat.java

@@ -26,6 +26,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileUtil;
 
 import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
@@ -50,14 +52,14 @@ public class SequenceFileOutputFormat <K,V> extends FileOutputFormat<K, V> {
       compressionType = getOutputCompressionType(job);
 
       // find the right codec
-      Class codecClass = getOutputCompressorClass(job, DefaultCodec.class);
-      codec = (CompressionCodec) 
-        ReflectionUtils.newInstance(codecClass, job);
+      Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job,
+	  DefaultCodec.class);
+      codec = ReflectionUtils.newInstance(codecClass, job);
     }
     final SequenceFile.Writer out = 
       SequenceFile.createWriter(fs, job, file,
-                                job.getOutputKeyClass(),
-                                job.getOutputValueClass(),
+                                job.getOutputKeyClass().asSubclass(WritableComparable.class),
+                                job.getOutputValueClass().asSubclass(Writable.class),
                                 compressionType,
                                 codec,
                                 progress);

+ 1 - 2
src/mapred/org/apache/hadoop/mapred/SequenceFileRecordReader.java

@@ -62,8 +62,7 @@ public class SequenceFileRecordReader<K, V> implements RecordReader<K, V> {
   
   @SuppressWarnings("unchecked")
   public K createKey() {
-    return (K) ReflectionUtils.newInstance(getKeyClass(), 
-                                                            conf);
+    return (K) ReflectionUtils.newInstance(getKeyClass(), conf);
   }
   
   @SuppressWarnings("unchecked")

+ 0 - 1
src/mapred/org/apache/hadoop/mapred/Task.java

@@ -642,7 +642,6 @@ abstract class Task implements Writable, Configurable {
     private DataInputBuffer keyIn = new DataInputBuffer();
     private DataInputBuffer valueIn = new DataInputBuffer();
     
-    @SuppressWarnings("unchecked")
     public ValuesIterator (RawKeyValueIterator in, 
                            RawComparator<KEY> comparator, 
                            Class<KEY> keyClass,

+ 1 - 2
src/mapred/org/apache/hadoop/mapred/TextOutputFormat.java

@@ -120,8 +120,7 @@ public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> {
       Class<? extends CompressionCodec> codecClass =
         getOutputCompressorClass(job, GzipCodec.class);
       // create the named codec
-      CompressionCodec codec = (CompressionCodec)
-        ReflectionUtils.newInstance(codecClass, job);
+      CompressionCodec codec = ReflectionUtils.newInstance(codecClass, job);
       // build the filename including the extension
       Path file = 
         FileOutputFormat.getTaskOutputPath(job, 

+ 2 - 5
src/mapred/org/apache/hadoop/mapred/join/ArrayListBackedIterator.java

@@ -23,7 +23,6 @@ import java.util.Iterator;
 
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.mapred.JobConf;
 
 /**
  * This class provides an implementation of ResetableIterator. The
@@ -51,12 +50,11 @@ public class ArrayListBackedIterator<X extends Writable>
     return iter.hasNext();
   }
 
-  @SuppressWarnings("unchecked")
   public boolean next(X val) throws IOException {
     if (iter.hasNext()) {
       WritableUtils.cloneInto(val, iter.next());
       if (null == hold) {
-        hold = (X) WritableUtils.clone(val, null);
+        hold = WritableUtils.clone(val, null);
       } else {
         WritableUtils.cloneInto(hold, val);
       }
@@ -74,9 +72,8 @@ public class ArrayListBackedIterator<X extends Writable>
     iter = data.iterator();
   }
 
-  @SuppressWarnings("unchecked")
   public void add(X item) throws IOException {
-    data.add((X) WritableUtils.clone(item, null));
+    data.add(WritableUtils.clone(item, null));
   }
 
   public void close() throws IOException {

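The dropped (X) casts here suggest WritableUtils.clone was generified in the same sweep (it is among the modified files); assuming a signature along the lines of static <T extends Writable> T clone(T orig, Configuration conf), T is inferred from the argument. A hedged usage sketch:

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableUtils;

    public class CloneDemo {
      public static void main(String[] args) {
        Text original = new Text("hello");
        // T infers to Text from the argument, so no cast at the call site;
        // the null conf mirrors the calls in ArrayListBackedIterator above.
        Text copy = WritableUtils.clone(original, null);
        System.out.println(copy);
      }
    }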
+ 0 - 1
src/mapred/org/apache/hadoop/mapred/join/CompositeInputFormat.java

@@ -26,7 +26,6 @@ import java.util.regex.Pattern;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;

+ 2 - 2
src/mapred/org/apache/hadoop/mapred/join/CompositeInputSplit.java

@@ -125,7 +125,7 @@ public class CompositeInputSplit implements InputSplit {
    * @throws IOException If the child InputSplit cannot be read, typically
    *                     for failing access checks.
    */
-  @SuppressWarnings("unchecked")  // Explicit check for split class agreement
+  @SuppressWarnings("unchecked")  // Generic array assignment
   public void readFields(DataInput in) throws IOException {
     int card = WritableUtils.readVInt(in);
     if (splits == null || splits.length != card) {
@@ -138,7 +138,7 @@ public class CompositeInputSplit implements InputSplit {
           Class.forName(Text.readString(in)).asSubclass(InputSplit.class);
       }
       for (int i = 0; i < card; ++i) {
-        splits[i] = (InputSplit) ReflectionUtils.newInstance(cls[i], null);
+        splits[i] = ReflectionUtils.newInstance(cls[i], null);
         splits[i].readFields(in);
       }
     } catch (ClassNotFoundException e) {

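The reworded suppression points at a genuine language limit: Java forbids creating arrays of a parameterized type, so populating a Class<? extends InputSplit>[] is inherently unchecked. A self-contained illustration of the same pattern with JDK-only types:

    public class GenericArrayDemo {
      @SuppressWarnings("unchecked")  // generic array creation is unchecked by design
      static <T> Class<? extends T>[] makeClassArray(int n) {
        return (Class<? extends T>[]) new Class<?>[n];
      }

      public static void main(String[] args) {
        Class<? extends CharSequence>[] cls =
            GenericArrayDemo.<CharSequence>makeClassArray(2);
        cls[0] = String.class;
        cls[1] = StringBuilder.class;
        System.out.println(cls[0].getName() + ", " + cls[1].getName());
      }
    }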
+ 1 - 1
src/mapred/org/apache/hadoop/mapred/join/CompositeRecordReader.java

@@ -68,7 +68,7 @@ public abstract class CompositeRecordReader<
     assert capacity > 0 : "Invalid capacity";
     this.id = id;
     if (null != cmpcl) {
-      cmp = (WritableComparator)ReflectionUtils.newInstance(cmpcl, null);
+      cmp = ReflectionUtils.newInstance(cmpcl, null);
       q = new PriorityQueue<ComposableRecordReader<K,?>>(3,
           new Comparator<ComposableRecordReader<K,?>>() {
             public int compare(ComposableRecordReader<K,?> o1,

+ 6 - 4
src/mapred/org/apache/hadoop/mapred/lib/MultipleOutputs.java

@@ -18,6 +18,8 @@
 package org.apache.hadoop.mapred.lib;
 
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.*;
 import org.apache.hadoop.util.Progressable;
 
@@ -244,11 +246,11 @@ public class MultipleOutputs {
    * @param namedOutput named output
    * @return class for the named output key
    */
-  public static Class<?> getNamedOutputKeyClass(JobConf conf,
+  public static Class<? extends WritableComparable> getNamedOutputKeyClass(JobConf conf,
                                                 String namedOutput) {
     checkNamedOutput(conf, namedOutput, false);
     return conf.getClass(MO_PREFIX + namedOutput + KEY, null,
-      Object.class);
+      WritableComparable.class);
   }
 
   /**
@@ -258,11 +260,11 @@ public class MultipleOutputs {
    * @param namedOutput named output
    * @return class of named output value
    */
-  public static Class<?> getNamedOutputValueClass(JobConf conf,
+  public static Class<? extends Writable> getNamedOutputValueClass(JobConf conf,
                                                   String namedOutput) {
     checkNamedOutput(conf, namedOutput, false);
     return conf.getClass(MO_PREFIX + namedOutput + VALUE, null,
-      Object.class);
+      Writable.class);
   }
 
   /**

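These two accessors now document their contract in the type system; assuming the generified Configuration.getClass(String, Class<? extends U>, Class<U>) that the rest of the patch relies on, the interface token fixes U and the result comes back already bounded. A hedged sketch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;

    public class GetClassDemo {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setClass("demo.key.class", Text.class, WritableComparable.class);
        // U is fixed by the xface token, so the result is already bounded:
        Class<? extends WritableComparable> keyClass =
            conf.getClass("demo.key.class", null, WritableComparable.class);
        System.out.println(keyClass.getName());
      }
    }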
+ 0 - 3
src/mapred/org/apache/hadoop/mapred/lib/MultipleSequenceFileOutputFormat.java

@@ -21,8 +21,6 @@ package org.apache.hadoop.mapred.lib;
 import java.io.IOException;
 
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordWriter;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
@@ -38,7 +36,6 @@ extends MultipleOutputFormat<K, V> {
     private SequenceFileOutputFormat<K,V> theSequenceFileOutputFormat = null;
   
   @Override
-  @SuppressWarnings("unchecked") 
   protected RecordWriter<K, V> getBaseRecordWriter(FileSystem fs,
                                                    JobConf job,
                                                    String name,

+ 1 - 1
src/mapred/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java

@@ -69,7 +69,7 @@ public class MultithreadedMapRunner<K1, V1, K2, V2>
     }
 
     this.job = jobConf;
-    this.mapper = (Mapper)ReflectionUtils.newInstance(jobConf.getMapperClass(),
+    this.mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(),
         jobConf);
 
     // Creating a threadpool of the configured size to execute the Mapper

+ 2 - 2
src/mapred/org/apache/hadoop/mapred/pipes/Application.java

@@ -66,9 +66,9 @@ class Application<K1 extends WritableComparable, V1 extends Writable,
    * @throws IOException
    * @throws InterruptedException
    */
-  @SuppressWarnings("unchecked")
   Application(JobConf conf, OutputCollector<K2, V2> output, Reporter reporter,
-              Class outputKeyClass, Class outputValueClass
+              Class<? extends K2> outputKeyClass,
+              Class<? extends V2> outputValueClass
               ) throws IOException, InterruptedException {
     serverSocket = new ServerSocket(0);
     Map<String, String> env = new HashMap<String,String>();

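Taking Class<? extends K2> and Class<? extends V2> instead of raw Class moves the one unavoidable unchecked cast out to the call sites (see PipesMapRunner and PipesReducer below) and lets the constructor body instantiate its types cleanly. The shape, sketched with a hypothetical Factory class:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ReflectionUtils;

    class Factory<T> {
      private final Class<? extends T> cls;
      Factory(Class<? extends T> cls) { this.cls = cls; }

      T create(Configuration conf) {
        // no cast: newInstance returns a subtype of T by construction
        return ReflectionUtils.newInstance(cls, conf);
      }
    }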
+ 3 - 2
src/mapred/org/apache/hadoop/mapred/pipes/PipesMapRunner.java

@@ -50,13 +50,14 @@ class PipesMapRunner<K1 extends WritableComparable, V1 extends Writable,
    * @param output the object to collect the outputs of the map
    * @param reporter the object to update with status
    */
+  @SuppressWarnings("unchecked")
   public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
                   Reporter reporter) throws IOException {
     Application<K1, V1, K2, V2> application = null;
     try {
       application = new Application<K1, V1, K2, V2>(job, output, reporter,
-                                    job.getMapOutputKeyClass(),
-                                    job.getMapOutputValueClass());
+          (Class<? extends K2>) job.getOutputKeyClass(), 
+          (Class<? extends V2>) job.getOutputValueClass());
     } catch (InterruptedException ie) {
       throw new RuntimeException("interrupted", ie);
     }

+ 1 - 1
src/mapred/org/apache/hadoop/mapred/pipes/PipesPartitioner.java

@@ -37,7 +37,7 @@ class PipesPartitioner<K extends WritableComparable,
   
   @SuppressWarnings("unchecked")
   public void configure(JobConf conf) {
-    part = (Partitioner) 
+    part =
       ReflectionUtils.newInstance(Submitter.getJavaPartitioner(conf), conf);
   }
 

+ 3 - 2
src/mapred/org/apache/hadoop/mapred/pipes/PipesReducer.java

@@ -62,13 +62,14 @@ class PipesReducer<K2 extends WritableComparable, V2 extends Writable,
     isOk = true;
   }
 
+  @SuppressWarnings("unchecked")
   private void startApplication(OutputCollector<K3, V3> output, Reporter reporter) throws IOException {
     if (application == null) {
       try {
         LOG.info("starting application");
         application = new Application<K2, V2, K3, V3>(job, output, reporter, 
-                                      job.getOutputKeyClass(), 
-                                      job.getOutputValueClass());
+                                      (Class<? extends K3>) job.getOutputKeyClass(), 
+                                      (Class<? extends V3>) job.getOutputValueClass());
         downlink = application.getDownlink();
       } catch (InterruptedException ie) {
         throw new RuntimeException("interrupted", ie);

+ 1 - 1
src/mapred/org/apache/hadoop/mapred/pipes/Submitter.java

@@ -173,7 +173,7 @@ public class Submitter {
    * @param conf the configuration to look in
    * @return the class that the user submitted
    */
-  static Class getJavaPartitioner(JobConf conf) {
+  static Class<? extends Partitioner> getJavaPartitioner(JobConf conf) {
     return conf.getClass("hadoop.pipes.partitioner", 
                          HashPartitioner.class,
                          Partitioner.class);

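The raw Partitioner bound is as precise as this lookup can get: the configured class's own key/value parameters are erased in the config string, so only the raw interface can be promised. A hedged sketch of the same fallback lookup:

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.Partitioner;
    import org.apache.hadoop.mapred.lib.HashPartitioner;

    public class PartitionerLookupDemo {
      public static void main(String[] args) {
        JobConf conf = new JobConf();
        // Unset key: falls back to HashPartitioner, already bounded by the token.
        Class<? extends Partitioner> cls = conf.getClass(
            "hadoop.pipes.partitioner", HashPartitioner.class, Partitioner.class);
        System.out.println(cls.getName());
      }
    }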
+ 4 - 1
src/mapred/org/apache/hadoop/mapreduce/JobContext.java

@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -245,12 +246,14 @@ public class JobContext {
    * 
    * @return the {@link RawComparator} comparator used to compare keys.
    */
+  @SuppressWarnings("unchecked")
   public RawComparator<?> getSortComparator() {
     Class<?> theClass = conf.getClass(SORT_COMPARATOR_ATTR, null,
                                    RawComparator.class);
     if (theClass != null)
       return (RawComparator<?>) ReflectionUtils.newInstance(theClass, conf);
-    return WritableComparator.get(getMapOutputKeyClass());
+    return WritableComparator.get(
+        (Class<? extends WritableComparable>)getMapOutputKeyClass());
   }
 
   /** 

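The new suppression exists because getMapOutputKeyClass() returns an unbounded Class<?> while WritableComparator.get needs a Class<? extends WritableComparable>. An asSubclass-based alternative would trade the warning for a runtime check; a sketch, with Text standing in for the configured key class:

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;

    public class ComparatorLookupDemo {
      public static void main(String[] args) {
        Class<?> keyClass = Text.class;  // stand-in for getMapOutputKeyClass()
        // asSubclass verifies the bound at runtime instead of suppressing it:
        WritableComparator cmp =
            WritableComparator.get(keyClass.asSubclass(WritableComparable.class));
        System.out.println(cmp.getClass().getName());
      }
    }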
+ 9 - 8
src/test/org/apache/hadoop/mapred/join/FakeIF.java

@@ -50,12 +50,15 @@ public class FakeIF<K,V>
     job.setClass("test.fakeif.valclass", v, Writable.class);
   }
 
-  private Class<?> keyclass;
-  private Class<?> valclass;
+  private Class<? extends K> keyclass;
+  private Class<? extends V> valclass;
 
+  @SuppressWarnings("unchecked")
   public void configure(JobConf job) {
-    keyclass = job.getClass("test.fakeif.keyclass", IncomparableKey.class, WritableComparable.class);
-    valclass = job.getClass("test.fakeif.valclass", NullWritable.class, WritableComparable.class);
+    keyclass = (Class<? extends K>) job.getClass("test.fakeif.keyclass",
+        IncomparableKey.class, WritableComparable.class);
+    valclass = (Class<? extends V>) job.getClass("test.fakeif.valclass",
+        NullWritable.class, WritableComparable.class);
   }
 
   public FakeIF() { }
@@ -70,13 +73,11 @@ public class FakeIF<K,V>
       InputSplit ignored, JobConf conf, Reporter reporter) {
     return new RecordReader<K,V>() {
       public boolean next(K key, V value) throws IOException { return false; }
-      @SuppressWarnings("unchecked")
       public K createKey() {
-        return (K)ReflectionUtils.newInstance(keyclass, null);
+        return ReflectionUtils.newInstance(keyclass, null);
       }
-      @SuppressWarnings("unchecked")
       public V createValue() {
-        return (V)ReflectionUtils.newInstance(valclass, null);
+        return ReflectionUtils.newInstance(valclass, null);
       }
       public long getPos() throws IOException { return 0L; }
       public void close() throws IOException { }

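The test now carries a single unchecked cast in configure() so both factory methods stay cast-free, the inverse of the Holder pattern sketched earlier. Its general shape, with a hypothetical TypedFactory:

    import org.apache.hadoop.util.ReflectionUtils;

    class TypedFactory<K> {
      private Class<? extends K> keyClass;

      @SuppressWarnings("unchecked")  // one cast at the configuration boundary
      void configure(Class<?> raw) {
        keyClass = (Class<? extends K>) raw;
      }

      K createKey() {  // the per-record path needs no suppression
        return ReflectionUtils.newInstance(keyClass, null);
      }
    }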
+ 2 - 0
src/test/org/apache/hadoop/util/TestReflectionUtils.java

@@ -42,6 +42,7 @@ public class TestReflectionUtils extends TestCase {
   }
     
     
+  @SuppressWarnings("unchecked")
   private void doTestCache() {
     for (int i=0; i<toConstruct.length; i++) {
       Class cl = toConstruct[i];
@@ -88,6 +89,7 @@ public class TestReflectionUtils extends TestCase {
     }
   }
     
+  @SuppressWarnings("unchecked")
   public void testCacheDoesntLeak() throws Exception {
     int iterations=9999; // very fast, but a bit less reliable - bigger numbers force GC
     for (int i=0; i<iterations; i++) {