Browse Source

Fix for HADOOP-103. Add a base class for Mapper and Reducer implementations that implements Closeable and JobConfigurable. Use it in supplied Mappers & Reducers. Also some minor improvements to demos. Contributed by Owen O'Malley.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@390258 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 19 years ago
parent
commit
eeea2a252d

+ 7 - 19
src/examples/org/apache/hadoop/examples/WordCount.java

@@ -30,6 +30,7 @@ import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.MapReduceBase;
 
 /**
  * This is an example Hadoop Map/Reduce application.
@@ -49,9 +50,10 @@ public class WordCount {
    * For each line of input, break the line into words and emit them as
    * (<b>word</b>, <b>1</b>).
    */
-  public static class MapClass implements Mapper {
+  public static class MapClass extends MapReduceBase implements Mapper {
     
     private final static IntWritable one = new IntWritable(1);
+    private UTF8 word = new UTF8();
     
     public void map(WritableComparable key, Writable value, 
         OutputCollector output, 
@@ -59,23 +61,16 @@ public class WordCount {
       String line = ((UTF8)value).toString();
       StringTokenizer itr = new StringTokenizer(line);
       while (itr.hasMoreTokens()) {
-        String word = itr.nextToken();
-        output.collect(new UTF8(word), one);
+        word.set(itr.nextToken());
+        output.collect(word, one);
       }
     }
-    
-    public void configure(JobConf job) {
-    }
-    
-    public void close() {
-    }
-
   }
   
   /**
    * A reducer class that just emits the sum of the input values.
    */
-  public static class Reduce implements Reducer {
+  public static class Reduce extends MapReduceBase implements Reducer {
     
     public void reduce(WritableComparable key, Iterator values,
         OutputCollector output, 
@@ -86,13 +81,6 @@ public class WordCount {
       }
       output.collect(key, new IntWritable(sum));
     }
-    
-    public void configure(JobConf job) {
-    }
-    
-    public void close() {
-    }
-    
   }
   
   static void printUsage() {
@@ -150,7 +138,7 @@ public class WordCount {
     conf.setOutputDir(new File((String) other_args.get(1)));
     
     // Uncomment to run locally in a single process
-    // countJob.set("mapred.job.tracker", "local");
+    // conf.set("mapred.job.tracker", "local");
     
     JobClient.runJob(conf);
   }

+ 2 - 5
src/java/org/apache/hadoop/mapred/lib/IdentityMapper.java

@@ -20,16 +20,14 @@ import java.io.IOException;
 
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.MapReduceBase;
 
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
 /** Implements the identity function, mapping inputs directly to outputs. */
-public class IdentityMapper implements Mapper {
-
-  public void configure(JobConf job) {}
+public class IdentityMapper extends MapReduceBase implements Mapper {
 
  /** The identity function.  Input key/value pair is written directly to
   * output.*/
@@ -38,5 +36,4 @@ public class IdentityMapper implements Mapper {
     throws IOException {
     output.collect(key, val);
   }
-	public void close() {}
 }

+ 2 - 6
src/java/org/apache/hadoop/mapred/lib/IdentityReducer.java

@@ -22,16 +22,14 @@ import java.util.Iterator;
 
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.MapReduceBase;
 
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
 /** Performs no reduction, writing all input values directly to the output. */
-public class IdentityReducer implements Reducer {
-
-  public void configure(JobConf job) {}
+public class IdentityReducer extends MapReduceBase implements Reducer {
 
   /** Writes all keys and values directly to output. */
   public void reduce(WritableComparable key, Iterator values,
@@ -42,6 +40,4 @@ public class IdentityReducer implements Reducer {
     }
   }
 	
-	public void close() {}
-	
 }

+ 2 - 6
src/java/org/apache/hadoop/mapred/lib/InverseMapper.java

@@ -20,17 +20,15 @@ import java.io.IOException;
 
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.MapReduceBase;
 
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.Writable;
 
 
 /** A {@link Mapper} that swaps keys and values. */
-public class InverseMapper implements Mapper {
-
-  public void configure(JobConf job) {}
+public class InverseMapper extends MapReduceBase implements Mapper {
 
   /** The inverse function.  Input keys and values are swapped.*/
   public void map(WritableComparable key, Writable value,
@@ -39,6 +37,4 @@ public class InverseMapper implements Mapper {
     output.collect((WritableComparable)value, key);
   }
   
-  public void close() {}
-  
 }

+ 3 - 7
src/java/org/apache/hadoop/mapred/lib/LongSumReducer.java

@@ -21,16 +21,14 @@ import java.util.Iterator;
 
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.MapReduceBase;
 
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.LongWritable;
 
 /** A {@link Reducer} that sums long values. */
-public class LongSumReducer implements Reducer {
-
-  public void configure(JobConf job) {}
+public class LongSumReducer extends MapReduceBase implements Reducer {
 
   public void reduce(WritableComparable key, Iterator values,
                      OutputCollector output, Reporter reporter)
@@ -45,7 +43,5 @@ public class LongSumReducer implements Reducer {
     // output sum
     output.collect(key, new LongWritable(sum));
   }
-  
-  public void close() {}
-  
+
 }

+ 3 - 4
src/java/org/apache/hadoop/mapred/lib/RegexMapper.java

@@ -22,6 +22,7 @@ import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.MapReduceBase;
 
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.Writable;
@@ -34,7 +35,7 @@ import java.util.regex.Matcher;
 
 
 /** A {@link Mapper} that extracts text matching a regular expression. */
-public class RegexMapper implements Mapper {
+public class RegexMapper extends MapReduceBase implements Mapper {
 
   private Pattern pattern;
   private int group;
@@ -53,7 +54,5 @@ public class RegexMapper implements Mapper {
       output.collect(new UTF8(matcher.group(group)), new LongWritable(1));
     }
   }
-  
-  public void close() {}
-  
+
 }

+ 2 - 7
src/java/org/apache/hadoop/mapred/lib/TokenCountMapper.java

@@ -21,8 +21,8 @@ import java.util.StringTokenizer;
 
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.MapReduceBase;
 
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.Writable;
@@ -32,15 +32,12 @@ import org.apache.hadoop.io.UTF8;
 
 /** A {@link Mapper} that maps text values into <token,freq> pairs.  Uses
  * {@link StringTokenizer} to break text into tokens. */
-public class TokenCountMapper implements Mapper {
-
-  public void configure(JobConf job) {}
+public class TokenCountMapper extends MapReduceBase implements Mapper {
 
   public void map(WritableComparable key, Writable value,
                   OutputCollector output, Reporter reporter)
     throws IOException {
     // get input text
-    long position = ((LongWritable)key).get();    // key is position in file
     String text = ((UTF8)value).toString();       // value is line of text
 
     // tokenize the value
@@ -51,6 +48,4 @@ public class TokenCountMapper implements Mapper {
     }  
   }
   
-  public void close() {}
-  
 }