
HADOOP-1230. Partial update:
1. made Partitioner and OutputFormat abstract classes.
2. fixed reference to WritableComparable
3. made Job set methods throw if they are in the wrong (i.e. submitted) state
4. removed the Writable interface from InputSplit, although it remains on
FileSplit


git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@681202 13f79535-47bb-0310-9956-ffa450edef68
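The user-facing effect of items 1 and 4 above: partitioners and output formats must now extend the new abstract base classes rather than implement interfaces. A minimal migration sketch, using a hypothetical WordPartitioner that is not part of this commit:

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;

    // Before: public class WordPartitioner<V> implements Partitioner<Text, V>
    // After:  the same class extends the new abstract base.
    public class WordPartitioner<V> extends Partitioner<Text, V> {
      @Override
      public int getPartition(Text key, V value, int numPartitions) {
        if (key.getLength() == 0) {
          return 0;
        }
        // Mask the sign bit so the modulus cannot produce a
        // negative partition index.
        return (key.charAt(0) & Integer.MAX_VALUE) % numPartitions;
      }
    }

Abstract classes let the framework add methods with default behaviour later without breaking existing subclasses, which is the usual motivation for this kind of change in an evolving API.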

Owen O'Malley 17 years ago
parent
commit
43e90282be

+ 3 - 5
src/mapred/org/apache/hadoop/mapreduce/InputSplit.java

@@ -20,7 +20,6 @@ package org.apache.hadoop.mapreduce;
 
 import java.io.IOException;
 
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.RecordReader;
@@ -31,12 +30,12 @@ import org.apache.hadoop.mapreduce.RecordReader;
  *
 * <p>Typically, it presents a byte-oriented view of the input, and it is the
 * responsibility of the {@link RecordReader} of the job to process this and present
- * a record-oriented view. Although the type is Writable, only the InputFormat
+ * a record-oriented view.
  * 
  * @see InputFormat
  * @see RecordReader
  */
-public abstract class InputSplit implements Writable {
+public abstract class InputSplit {
   /**
    * Get the size of the split, so that the input splits can be sorted by size.
    * @return the number of bytes in the split
@@ -47,8 +46,7 @@ public abstract class InputSplit implements Writable {
 
   /**
    * Get the list of nodes by name where the data for the split would be local.
-   * The locations do not need to be serialized by calls to 
-   * {@link Writable#write(java.io.DataOutput)}
+   * The locations do not need to be serialized.
   * @return a new array of the node names.
    * @throws IOException
    * @throws InterruptedException
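With Writable gone from the base type, a custom split only has to report its size and preferred hosts; how it is serialized is no longer dictated by InputSplit itself (FileSplit opts back in further down). A purely illustrative subclass, not part of this commit:

    import java.io.IOException;
    import org.apache.hadoop.mapreduce.InputSplit;

    // Hypothetical split covering a row range of some external table.
    // Note there is no write()/readFields() pair left to implement.
    public class RowRangeSplit extends InputSplit {
      private final long firstRow;
      private final long rowCount;
      private final String[] hosts;

      public RowRangeSplit(long firstRow, long rowCount, String[] hosts) {
        this.firstRow = firstRow;
        this.rowCount = rowCount;
        this.hosts = hosts;
      }

      @Override
      public long getLength() throws IOException, InterruptedException {
        return rowCount;  // only used to sort splits by size
      }

      @Override
      public String[] getLocations() throws IOException, InterruptedException {
        return hosts;     // need not survive serialization
      }
    }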

+ 44 - 15
src/mapred/org/apache/hadoop/mapreduce/Job.java

@@ -29,7 +29,9 @@ import org.apache.hadoop.mapred.TaskCompletionEvent;
 
 /**
  * The job submitter's view of the Job. It allows the user to configure the
- * job, submit it, control its execution, and query the state.
+ * job, submit it, control its execution, and query the state. The set methods
+ * only work until the job is submitted; afterwards they will throw an 
+ * IllegalStateException.
  */
 public class Job extends JobContext {  
   
@@ -49,8 +51,9 @@ public class Job extends JobContext {
   /**
    * Set the number of reduce tasks for the job.
    * @param tasks the number of reduce tasks
+   * @throws IllegalStateException if the job is submitted
    */
-  public void setNumReduceTasks(int tasks) {
+  public void setNumReduceTasks(int tasks) throws IllegalStateException {
     conf.setInt(NUM_REDUCES_ATTR, tasks);
   }
 
@@ -58,6 +61,7 @@ public class Job extends JobContext {
    * Set the current working directory for the default file system.
    * 
    * @param dir the new current working directory.
+   * @throws IllegalStateException if the job is submitted
    */
   public void setWorkingDirectory(Path dir) throws IOException {
     dir = dir.makeQualified(FileSystem.get(conf));
@@ -67,48 +71,60 @@ public class Job extends JobContext {
   /**
    * Set the {@link InputFormat} for the job.
    * @param cls the <code>InputFormat</code> to use
+   * @throws IllegalStateException if the job is submitted
    */
-  public void setInputFormatClass(Class<? extends InputFormat<?,?>> cls) {
+  public void setInputFormatClass(Class<? extends InputFormat<?,?>> cls
+                                  ) throws IllegalStateException {
     conf.setClass(INPUT_FORMAT_CLASS_ATTR, cls, InputFormat.class);
   }
 
   /**
    * Set the {@link OutputFormat} for the job.
    * @param cls the <code>OutputFormat</code> to use
+   * @throws IllegalStateException if the job is submitted
    */
-  public void setOutputFormatClass(Class<? extends OutputFormat<?,?>> cls) {
+  public void setOutputFormatClass(Class<? extends OutputFormat<?,?>> cls
+                                   ) throws IllegalStateException {
     conf.setClass(OUTPUT_FORMAT_CLASS_ATTR, cls, OutputFormat.class);
   }
 
   /**
    * Set the {@link Mapper} for the job.
    * @param cls the <code>Mapper</code> to use
+   * @throws IllegalStateException if the job is submitted
    */
-  public void setMapperClass(Class<? extends Mapper<?,?,?,?>> cls) {
+  public void setMapperClass(Class<? extends Mapper<?,?,?,?>> cls
+                             ) throws IllegalStateException {
     conf.setClass(MAP_CLASS_ATTR, cls, Mapper.class);
   }
 
   /**
    * Set the combiner class for the job.
    * @param cls the combiner to use
+   * @throws IllegalStateException if the job is submitted
    */
-  public void setCombinerClass(Class<? extends Reducer<?,?,?,?>> cls) {
+  public void setCombinerClass(Class<? extends Reducer<?,?,?,?>> cls
+                               ) throws IllegalStateException {
     conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class);
   }
 
   /**
    * Set the {@link Reducer} for the job.
    * @param cls the <code>Reducer</code> to use
+   * @throws IllegalStateException if the job is submitted
    */
-  public void setReducerClass(Class<? extends Reducer<?,?,?,?>> cls) {
+  public void setReducerClass(Class<? extends Reducer<?,?,?,?>> cls
+                              ) throws IllegalStateException {
     conf.setClass(REDUCE_CLASS_ATTR, cls, Reducer.class);
   }
 
   /**
    * Set the {@link Partitioner} for the job.
    * @param cls the <code>Partitioner</code> to use
+   * @throws IllegalStateException if the job is submitted
    */
-  public void setPartitionerClass(Class<? extends Partitioner<?,?>> cls) {
+  public void setPartitionerClass(Class<? extends Partitioner<?,?>> cls
+                                  ) throws IllegalStateException {
     conf.setClass(PARTITIONER_CLASS_ATTR, cls, Partitioner.class);
   }
 
@@ -118,8 +134,10 @@ public class Job extends JobContext {
    * value class.
    * 
    * @param theClass the map output key class.
+   * @throws IllegalStateException if the job is submitted
    */
-  public void setMapOutputKeyClass(Class<?> theClass) {
+  public void setMapOutputKeyClass(Class<?> theClass
+                                   ) throws IllegalStateException {
     conf.setClass(MAP_OUTPUT_KEY_CLASS_ATTR, theClass, Object.class);
   }
 
@@ -129,8 +147,10 @@ public class Job extends JobContext {
    * value class.
    * 
    * @param theClass the map output value class.
+   * @throws IllegalStateException if the job is submitted
    */
-  public void setMapOutputValueClass(Class<?> theClass) {
+  public void setMapOutputValueClass(Class<?> theClass
+                                     ) throws IllegalStateException {
     conf.setClass(MAP_OUTPUT_VALUE_CLASS_ATTR, theClass, Object.class);
   }
 
@@ -138,8 +158,10 @@ public class Job extends JobContext {
    * Set the key class for the job output data.
    * 
    * @param theClass the key class for the job output data.
+   * @throws IllegalStateException if the job is submitted
    */
-  public void setOutputKeyClass(Class<?> theClass) {
+  public void setOutputKeyClass(Class<?> theClass
+                                ) throws IllegalStateException {
     conf.setClass(OUTPUT_KEY_CLASS_ATTR, theClass, Object.class);
   }
 
@@ -147,8 +169,10 @@ public class Job extends JobContext {
    * Set the value class for job outputs.
    * 
    * @param theClass the value class for job outputs.
+   * @throws IllegalStateException if the job is submitted
    */
-  public void setOutputValueClass(Class<?> theClass) {
+  public void setOutputValueClass(Class<?> theClass
+                                  ) throws IllegalStateException {
     conf.setClass(OUTPUT_VALUE_CLASS_ATTR, theClass, Object.class);
   }
 
@@ -156,8 +180,10 @@ public class Job extends JobContext {
    * Define the comparator that controls how the keys are sorted before they
    * are passed to the {@link Reducer}.
    * @param cls the raw comparator
+   * @throws IllegalStateException if the job is submitted
    */
-  public void setSortComparatorClass(Class<? extends RawComparator<?>> cls) {
+  public void setSortComparatorClass(Class<? extends RawComparator<?>> cls
+                                     ) throws IllegalStateException {
     conf.setClass(SORT_COMPARATOR_ATTR, cls, RawComparator.class);
   }
 
@@ -166,8 +192,10 @@ public class Job extends JobContext {
    * for a single call to 
    * {@link Reducer#reduce(Object, Iterable, org.apache.hadoop.mapreduce.Reducer.Context)}
    * @param cls the raw comparator to use
+   * @throws IllegalStateException if the job is submitted
    */
-  public void setGroupingComparatorClass(Class<? extends RawComparator<?>> cls){
+  public void setGroupingComparatorClass(Class<? extends RawComparator<?>> cls
+                                         ) throws IllegalStateException {
     conf.setClass(GROUPING_COMPARATOR_ATTR, cls, RawComparator.class);
   }
 
@@ -175,8 +203,9 @@ public class Job extends JobContext {
    * Set the user-specified job name.
    * 
    * @param name the job's new name.
+   * @throws IllegalStateException if the job is submitted
    */
-  public void setJobName(String name) {
+  public void setJobName(String name) throws IllegalStateException {
     conf.set(JOB_NAME_ATTR, name);
   }
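The javadoc above states the contract; the enforcement itself is not part of this partial update. A usage sketch of the intended behaviour, assuming the Job(Configuration) constructor and submit() method of the completed new API:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;

    public class SubmitOnce {
      public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration());  // assumed constructor
        job.setJobName("word count");            // fine: not yet submitted
        job.setNumReduceTasks(4);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.submit();                            // assumed submission method

        try {
          job.setNumReduceTasks(8);              // too late
        } catch (IllegalStateException expected) {
          // All set methods are documented to fail once the job is submitted.
        }
      }
    }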
 

+ 1 - 2
src/mapred/org/apache/hadoop/mapreduce/JobContext.java

@@ -26,7 +26,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -255,7 +254,7 @@ public class JobContext {
   }
 
   /** 
-   * Get the user defined {@link WritableComparable} comparator for 
+   * Get the user defined {@link RawComparator} comparator for 
    * grouping keys of inputs to the reduce.
    * 
    * @return comparator set by the user for grouping values.

+ 7 - 5
src/mapred/org/apache/hadoop/mapreduce/OutputFormat.java

@@ -41,7 +41,7 @@ import org.apache.hadoop.fs.FileSystem;
  * 
  * @see RecordWriter
  */
-public interface OutputFormat<K, V> {
+public abstract class OutputFormat<K, V> {
 
   /** 
    * Get the {@link RecordWriter} for the given task.
@@ -50,8 +50,9 @@ public interface OutputFormat<K, V> {
    * @return a {@link RecordWriter} to write the output for the job.
    * @throws IOException
    */
-  RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
-  throws IOException, InterruptedException;
+  public abstract RecordWriter<K, V> 
+    getRecordWriter(TaskAttemptContext context
+                    ) throws IOException, InterruptedException;
 
   /** 
    * Check for validity of the output-specification for the job.
@@ -64,7 +65,8 @@ public interface OutputFormat<K, V> {
    * @param context information about the job
    * @throws IOException when output should not be attempted
    */
-  void checkOutputSpecs(JobContext context
-                        ) throws IOException, InterruptedException;
+  public abstract void checkOutputSpecs(JobContext context
+                                        ) throws IOException, 
+                                                 InterruptedException;
 }
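Concrete formats now extend OutputFormat and override both abstract methods. A hypothetical discard-everything subclass, in the same spirit as NullOutputFormat further down:

    import java.io.IOException;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.OutputFormat;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    public class DiscardOutputFormat<K, V> extends OutputFormat<K, V> {

      @Override
      public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
          throws IOException, InterruptedException {
        return new RecordWriter<K, V>() {
          public void write(K key, V value) { /* drop the record */ }
          public void close(TaskAttemptContext context) { }
        };
      }

      @Override
      public void checkOutputSpecs(JobContext context)
          throws IOException, InterruptedException {
        // Nothing to validate: there is no output location to collide with.
      }
    }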
 

+ 2 - 2
src/mapred/org/apache/hadoop/mapreduce/Partitioner.java

@@ -30,7 +30,7 @@ package org.apache.hadoop.mapreduce;
  * 
  * @see Reducer
  */
-public interface Partitioner<KEY, VALUE> {
+public abstract class Partitioner<KEY, VALUE> {
   
   /** 
    * Get the partition number for a given key (hence record) given the total 
@@ -43,5 +43,5 @@ public interface Partitioner<KEY, VALUE> {
    * @param numPartitions the total number of partitions.
    * @return the partition number for the <code>key</code>.
    */
-  int getPartition(KEY key, VALUE value, int numPartitions);
+  public abstract int getPartition(KEY key, VALUE value, int numPartitions);
 }

+ 3 - 1
src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java

@@ -37,7 +37,9 @@ public abstract class ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
   }
 
   /**
-   * Iterate through the values for the current key.
+   * Iterate through the values for the current key, reusing the same value 
+   * object, which is stored in the context.
+   * @return the series of values associated with the current key
    */
   public abstract 
   Iterable<VALUEIN> getValues() throws IOException, InterruptedException;
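The object reuse documented here has a practical consequence: a reducer must copy any value it keeps past the current iteration. A small sketch of the pitfall, assuming Text values:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.io.Text;

    public class ValueReuse {
      // The Iterable from getValues() hands back the same Text instance
      // on every step, so callers must copy values they want to keep.
      static List<Text> keepAll(Iterable<Text> values) {
        List<Text> kept = new ArrayList<Text>();
        for (Text value : values) {
          kept.add(new Text(value));  // defensive copy
          // kept.add(value);         // wrong: all elements alias one object
        }
        return kept;
      }
    }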

+ 7 - 1
src/mapred/org/apache/hadoop/mapreduce/lib/input/FileSplit.java

@@ -27,11 +27,12 @@ import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
 
 /** A section of an input file.  Returned by {@link
  * InputFormat#getSplits(JobContext)} and passed to
  * {@link InputFormat#createRecordReader(InputSplit,TaskAttemptContext)}. */
-public class FileSplit extends InputSplit {
+public class FileSplit extends InputSplit implements Writable {
   private Path file;
   private long start;
   private long length;
@@ -60,20 +61,24 @@ public class FileSplit extends InputSplit {
   public long getStart() { return start; }
   
   /** The number of bytes in the file to process. */
+  @Override
   public long getLength() { return length; }
 
+  @Override
   public String toString() { return file + ":" + start + "+" + length; }
 
   ////////////////////////////////////////////
   // Writable methods
   ////////////////////////////////////////////
 
+  @Override
   public void write(DataOutput out) throws IOException {
     Text.writeString(out, file.toString());
     out.writeLong(start);
     out.writeLong(length);
   }
 
+  @Override
   public void readFields(DataInput in) throws IOException {
     file = new Path(Text.readString(in));
     start = in.readLong();
@@ -81,6 +86,7 @@ public class FileSplit extends InputSplit {
     hosts = null;
   }
 
+  @Override
   public String[] getLocations() throws IOException {
     if (this.hosts == null) {
       return new String[]{};
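Because FileSplit keeps Writable, it can still be serialized by hand. A round-trip sketch using Hadoop's buffer classes; the four-argument constructor is assumed from the rest of this file, since the hunk does not show it:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;

    public class SplitRoundTrip {
      public static void main(String[] args) throws Exception {
        FileSplit original =
            new FileSplit(new Path("/data/part-0"), 0L, 1024L, null);

        DataOutputBuffer out = new DataOutputBuffer();
        original.write(out);                      // Writable.write

        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());

        FileSplit copy = new FileSplit(null, 0L, 0L, null);
        copy.readFields(in);                      // Writable.readFields
        System.out.println(copy);                 // prints /data/part-0:0+1024
      }
    }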

+ 1 - 1
src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java

@@ -33,7 +33,7 @@ import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
 /** A base class for {@link OutputFormat}s that write to {@link FileSystem}s.*/
-public abstract class FileOutputFormat<K, V> implements OutputFormat<K, V> {
+public abstract class FileOutputFormat<K, V> extends OutputFormat<K, V> {
 
   private static final String TEMP_DIR_NAME = "_temp";
   /**

+ 1 - 1
src/mapred/org/apache/hadoop/mapreduce/lib/output/NullOutputFormat.java

@@ -26,7 +26,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 /**
  * Consume all outputs and put them in /dev/null. 
  */
-public class NullOutputFormat<K, V> implements OutputFormat<K, V> {
+public class NullOutputFormat<K, V> extends OutputFormat<K, V> {
   
   public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) {
     return new RecordWriter<K, V>(){

+ 1 - 1
src/mapred/org/apache/hadoop/mapreduce/lib/partition/HashPartitioner.java

@@ -21,7 +21,7 @@ package org.apache.hadoop.mapreduce.lib.partition;
 import org.apache.hadoop.mapreduce.Partitioner;
 
 /** Partition keys by their {@link Object#hashCode()}. */
-public class HashPartitioner<K, V> implements Partitioner<K, V> {
+public class HashPartitioner<K, V> extends Partitioner<K, V> {
 
   /** Use {@link Object#hashCode()} to partition. */
   public int getPartition(K key, V value,
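The display truncates the signature here. For reference, the conventional body of such a hash partitioner, sketched after the long-standing Hadoop idiom (not necessarily the exact code at this revision):

    import org.apache.hadoop.mapreduce.Partitioner;

    public class SketchHashPartitioner<K, V> extends Partitioner<K, V> {
      @Override
      public int getPartition(K key, V value, int numPartitions) {
        // Mask off the sign bit so a negative hashCode() cannot
        // produce a negative partition index.
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
      }
    }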