Procházet zdrojové kódy

HADOOP-5681. Change examples RandomWriter and RandomTextWriter to use new mapreduce API. Contributed by Amareshwari Sriramadasu.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@766670 13f79535-47bb-0310-9956-ffa450edef68
Sharad Agarwal před 16 roky
rodič
revize
b451c62bc1

+ 3 - 0
CHANGES.txt

@@ -240,6 +240,9 @@ Trunk (unreleased changes)
     HADOOP-4490. Provide ability to run tasks as job owners.
     (Sreekanth Ramakrishnan via yhemanth)
 
+    HADOOP-5681. Change examples RandomWriter and RandomTextWriter to 
+    use new mapreduce API. (Amareshwari Sriramadasu via sharad)
+
   OPTIMIZATIONS
 
     HADOOP-5595. NameNode does not need to run a replicator to choose a

+ 40 - 45
src/examples/org/apache/hadoop/examples/RandomTextWriter.java

@@ -29,15 +29,10 @@ import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.ClusterStatus;
-import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.OutputFormat;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -86,7 +81,7 @@ public class RandomTextWriter extends Configured implements Tool {
                        "[-outFormat <output format class>] " + 
                        "<output>");
     ToolRunner.printGenericCommandUsage(System.out);
-    return -1;
+    return 2;
   }
   
   /**
@@ -94,8 +89,7 @@ public class RandomTextWriter extends Configured implements Tool {
    */
   static enum Counters { RECORDS_WRITTEN, BYTES_WRITTEN }
 
-  static class Map extends MapReduceBase 
-    implements Mapper<Text, Text, Text, Text> {
+  static class RandomTextMapper extends Mapper<Text, Text, Text, Text> {
     
     private long numBytesToWrite;
     private int minWordsInKey;
@@ -107,18 +101,19 @@ public class RandomTextWriter extends Configured implements Tool {
     /**
      * Save the configuration value that we need to write the data.
      */
-    public void configure(JobConf job) {
-      numBytesToWrite = job.getLong("test.randomtextwrite.bytes_per_map",
+    public void setup(Context context) {
+      Configuration conf = context.getConfiguration();
+      numBytesToWrite = conf.getLong("test.randomtextwrite.bytes_per_map",
                                     1*1024*1024*1024);
       minWordsInKey = 
-        job.getInt("test.randomtextwrite.min_words_key", 5);
+        conf.getInt("test.randomtextwrite.min_words_key", 5);
       wordsInKeyRange = 
-        (job.getInt("test.randomtextwrite.max_words_key", 10) - 
+        (conf.getInt("test.randomtextwrite.max_words_key", 10) - 
          minWordsInKey);
       minWordsInValue = 
-        job.getInt("test.randomtextwrite.min_words_value", 10);
+        conf.getInt("test.randomtextwrite.min_words_value", 10);
       wordsInValueRange = 
-        (job.getInt("test.randomtextwrite.max_words_value", 100) - 
+        (conf.getInt("test.randomtextwrite.max_words_value", 100) - 
          minWordsInValue);
     }
     
@@ -126,8 +121,7 @@ public class RandomTextWriter extends Configured implements Tool {
      * Given an output filename, write a bunch of random records to it.
      */
     public void map(Text key, Text value,
-                    OutputCollector<Text, Text> output, 
-                    Reporter reporter) throws IOException {
+                    Context context) throws IOException,InterruptedException {
       int itemCount = 0;
       while (numBytesToWrite > 0) {
         // Generate the key/value 
@@ -139,20 +133,20 @@ public class RandomTextWriter extends Configured implements Tool {
         Text valueWords = generateSentence(noWordsValue);
         
         // Write the sentence 
-        output.collect(keyWords, valueWords);
+        context.write(keyWords, valueWords);
         
         numBytesToWrite -= (keyWords.getLength() + valueWords.getLength());
         
         // Update counters, progress etc.
-        reporter.incrCounter(Counters.BYTES_WRITTEN, 
-                             (keyWords.getLength()+valueWords.getLength()));
-        reporter.incrCounter(Counters.RECORDS_WRITTEN, 1);
+        context.getCounter(Counters.BYTES_WRITTEN).increment(
+                  keyWords.getLength() + valueWords.getLength());
+        context.getCounter(Counters.RECORDS_WRITTEN).increment(1);
         if (++itemCount % 200 == 0) {
-          reporter.setStatus("wrote record " + itemCount + ". " + 
+          context.setStatus("wrote record " + itemCount + ". " + 
                              numBytesToWrite + " bytes left.");
         }
       }
-      reporter.setStatus("done with " + itemCount + " records.");
+      context.setStatus("done with " + itemCount + " records.");
     }
     
     private Text generateSentence(int noWords) {
@@ -178,33 +172,35 @@ public class RandomTextWriter extends Configured implements Tool {
       return printUsage();    
     }
     
-    JobConf job = new JobConf(getConf());
-    
-    job.setJarByClass(RandomTextWriter.class);
-    job.setJobName("random-text-writer");
-    
-    job.setOutputKeyClass(Text.class);
-    job.setOutputValueClass(Text.class);
-    
-    job.setInputFormat(RandomWriter.RandomInputFormat.class);
-    job.setMapperClass(Map.class);        
-    
-    JobClient client = new JobClient(job);
+    Configuration conf = getConf();
+    JobClient client = new JobClient(conf);
     ClusterStatus cluster = client.getClusterStatus();
-    int numMapsPerHost = job.getInt("test.randomtextwrite.maps_per_host", 10);
-    long numBytesToWritePerMap = job.getLong("test.randomtextwrite.bytes_per_map",
+    int numMapsPerHost = conf.getInt("test.randomtextwrite.maps_per_host", 10);
+    long numBytesToWritePerMap = conf.getLong("test.randomtextwrite.bytes_per_map",
                                              1*1024*1024*1024);
     if (numBytesToWritePerMap == 0) {
       System.err.println("Cannot have test.randomtextwrite.bytes_per_map set to 0");
       return -2;
     }
-    long totalBytesToWrite = job.getLong("test.randomtextwrite.total_bytes", 
+    long totalBytesToWrite = conf.getLong("test.randomtextwrite.total_bytes", 
          numMapsPerHost*numBytesToWritePerMap*cluster.getTaskTrackers());
     int numMaps = (int) (totalBytesToWrite / numBytesToWritePerMap);
     if (numMaps == 0 && totalBytesToWrite > 0) {
       numMaps = 1;
-      job.setLong("test.randomtextwrite.bytes_per_map", totalBytesToWrite);
+      conf.setLong("test.randomtextwrite.bytes_per_map", totalBytesToWrite);
     }
+    conf.setInt("mapred.map.tasks", numMaps);
+    
+    Job job = new Job(conf);
+    
+    job.setJarByClass(RandomTextWriter.class);
+    job.setJobName("random-text-writer");
+    
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Text.class);
+    
+    job.setInputFormatClass(RandomWriter.RandomInputFormat.class);
+    job.setMapperClass(RandomTextMapper.class);        
     
     Class<? extends OutputFormat> outputFormatClass = 
       SequenceFileOutputFormat.class;
@@ -224,10 +220,9 @@ public class RandomTextWriter extends Configured implements Tool {
       }
     }
 
-    job.setOutputFormat(outputFormatClass);
+    job.setOutputFormatClass(outputFormatClass);
     FileOutputFormat.setOutputPath(job, new Path(otherArgs.get(0)));
     
-    job.setNumMapTasks(numMaps);
     System.out.println("Running " + numMaps + " maps.");
     
     // reducer NONE
@@ -235,14 +230,14 @@ public class RandomTextWriter extends Configured implements Tool {
     
     Date startTime = new Date();
     System.out.println("Job started: " + startTime);
-    JobClient.runJob(job);
+    int ret = job.waitForCompletion(true) ? 0 : 1;
     Date endTime = new Date();
     System.out.println("Job ended: " + endTime);
     System.out.println("The job took " + 
                        (endTime.getTime() - startTime.getTime()) /1000 + 
                        " seconds.");
     
-    return 0;
+    return ret;
   }
   
   public static void main(String[] args) throws Exception {

+ 72 - 70
src/examples/org/apache/hadoop/examples/RandomWriter.java

@@ -19,7 +19,9 @@
 package org.apache.hadoop.examples;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Date;
+import java.util.List;
 import java.util.Random;
 
 import org.apache.hadoop.conf.Configuration;
@@ -30,19 +32,11 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.ClusterStatus;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -93,19 +87,20 @@ public class RandomWriter extends Configured implements Tool {
    * A custom input format that creates virtual inputs of a single string
    * for each map.
    */
-  static class RandomInputFormat implements InputFormat<Text, Text> {
+  static class RandomInputFormat extends InputFormat<Text, Text> {
 
     /** 
      * Generate the requested number of file splits, with the filename
      * set to the filename of the output file.
      */
-    public InputSplit[] getSplits(JobConf job, 
-                                  int numSplits) throws IOException {
-      InputSplit[] result = new InputSplit[numSplits];
+    public List<InputSplit> getSplits(JobContext job) throws IOException {
+      List<InputSplit> result = new ArrayList<InputSplit>();
       Path outDir = FileOutputFormat.getOutputPath(job);
-      for(int i=0; i < result.length; ++i) {
-        result[i] = new FileSplit(new Path(outDir, "dummy-split-" + i), 0, 1, 
-                                  (String[])null);
+      int numSplits = 
+            job.getConfiguration().getInt("mapred.map.tasks", 1);
+      for(int i=0; i < numSplits; ++i) {
+        result.add(new FileSplit(new Path(outDir, "dummy-split-" + i), 0, 1, 
+                                  (String[])null));
       }
       return result;
     }
@@ -114,43 +109,52 @@ public class RandomWriter extends Configured implements Tool {
      * Return a single record (filename, "") where the filename is taken from
      * the file split.
      */
-    static class RandomRecordReader implements RecordReader<Text, Text> {
+    static class RandomRecordReader extends RecordReader<Text, Text> {
       Path name;
+      Text key = null;
+      Text value = new Text();
       public RandomRecordReader(Path p) {
         name = p;
       }
-      public boolean next(Text key, Text value) {
+      
+      public void initialize(InputSplit split,
+                             TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    	  
+      }
+      
+      public boolean nextKeyValue() {
         if (name != null) {
+          key = new Text();
           key.set(name.getName());
           name = null;
           return true;
         }
         return false;
       }
-      public Text createKey() {
-        return new Text();
-      }
-      public Text createValue() {
-        return new Text();
+      
+      public Text getCurrentKey() {
+        return key;
       }
-      public long getPos() {
-        return 0;
+      
+      public Text getCurrentValue() {
+        return value;
       }
+      
       public void close() {}
+
       public float getProgress() {
         return 0.0f;
       }
     }
 
-    public RecordReader<Text, Text> getRecordReader(InputSplit split,
-                                        JobConf job, 
-                                        Reporter reporter) throws IOException {
+    public RecordReader<Text, Text> createRecordReader(InputSplit split,
+        TaskAttemptContext context) throws IOException, InterruptedException {
       return new RandomRecordReader(((FileSplit) split).getPath());
     }
   }
 
-  static class Map extends MapReduceBase
-    implements Mapper<WritableComparable, Writable,
+  static class RandomMapper extends Mapper<WritableComparable, Writable,
                       BytesWritable, BytesWritable> {
     
     private long numBytesToWrite;
@@ -173,8 +177,7 @@ public class RandomWriter extends Configured implements Tool {
      */
     public void map(WritableComparable key, 
                     Writable value,
-                    OutputCollector<BytesWritable, BytesWritable> output, 
-                    Reporter reporter) throws IOException {
+                    Context context) throws IOException,InterruptedException {
       int itemCount = 0;
       while (numBytesToWrite > 0) {
         int keyLength = minKeySize + 
@@ -185,16 +188,16 @@ public class RandomWriter extends Configured implements Tool {
           (valueSizeRange != 0 ? random.nextInt(valueSizeRange) : 0);
         randomValue.setSize(valueLength);
         randomizeBytes(randomValue.getBytes(), 0, randomValue.getLength());
-        output.collect(randomKey, randomValue);
+        context.write(randomKey, randomValue);
         numBytesToWrite -= keyLength + valueLength;
-        reporter.incrCounter(Counters.BYTES_WRITTEN, keyLength + valueLength);
-        reporter.incrCounter(Counters.RECORDS_WRITTEN, 1);
+        context.getCounter(Counters.BYTES_WRITTEN).increment(keyLength + valueLength);
+        context.getCounter(Counters.RECORDS_WRITTEN).increment(1);
         if (++itemCount % 200 == 0) {
-          reporter.setStatus("wrote record " + itemCount + ". " + 
+          context.setStatus("wrote record " + itemCount + ". " + 
                              numBytesToWrite + " bytes left.");
         }
       }
-      reporter.setStatus("done with " + itemCount + " records.");
+      context.setStatus("done with " + itemCount + " records.");
     }
     
     /**
@@ -202,17 +205,17 @@ public class RandomWriter extends Configured implements Tool {
      * the data.
      */
     @Override
-    public void configure(JobConf job) {
-      numBytesToWrite = job.getLong("test.randomwrite.bytes_per_map",
+    public void setup(Context context) {
+      Configuration conf = context.getConfiguration();
+      numBytesToWrite = conf.getLong("test.randomwrite.bytes_per_map",
                                     1*1024*1024*1024);
-      minKeySize = job.getInt("test.randomwrite.min_key", 10);
+      minKeySize = conf.getInt("test.randomwrite.min_key", 10);
       keySizeRange = 
-        job.getInt("test.randomwrite.max_key", 1000) - minKeySize;
-      minValueSize = job.getInt("test.randomwrite.min_value", 0);
+        conf.getInt("test.randomwrite.max_key", 1000) - minKeySize;
+      minValueSize = conf.getInt("test.randomwrite.min_value", 0);
       valueSizeRange = 
-        job.getInt("test.randomwrite.max_value", 20000) - minValueSize;
+        conf.getInt("test.randomwrite.max_value", 20000) - minValueSize;
     }
-    
   }
   
   /**
@@ -226,42 +229,41 @@ public class RandomWriter extends Configured implements Tool {
     if (args.length == 0) {
       System.out.println("Usage: writer <out-dir>");
       ToolRunner.printGenericCommandUsage(System.out);
-      return -1;
+      return 2;
     }
     
     Path outDir = new Path(args[0]);
-    JobConf job = new JobConf(getConf());
-    
-    job.setJarByClass(RandomWriter.class);
-    job.setJobName("random-writer");
-    FileOutputFormat.setOutputPath(job, outDir);
-    
-    job.setOutputKeyClass(BytesWritable.class);
-    job.setOutputValueClass(BytesWritable.class);
-    
-    job.setInputFormat(RandomInputFormat.class);
-    job.setMapperClass(Map.class);        
-    job.setReducerClass(IdentityReducer.class);
-    job.setOutputFormat(SequenceFileOutputFormat.class);
-    
-    JobClient client = new JobClient(job);
+    Configuration conf = getConf();
+    JobClient client = new JobClient(conf);
     ClusterStatus cluster = client.getClusterStatus();
-    int numMapsPerHost = job.getInt("test.randomwriter.maps_per_host", 10);
-    long numBytesToWritePerMap = job.getLong("test.randomwrite.bytes_per_map",
+    int numMapsPerHost = conf.getInt("test.randomwriter.maps_per_host", 10);
+    long numBytesToWritePerMap = conf.getLong("test.randomwrite.bytes_per_map",
                                              1*1024*1024*1024);
     if (numBytesToWritePerMap == 0) {
       System.err.println("Cannot have test.randomwrite.bytes_per_map set to 0");
       return -2;
     }
-    long totalBytesToWrite = job.getLong("test.randomwrite.total_bytes", 
+    long totalBytesToWrite = conf.getLong("test.randomwrite.total_bytes", 
          numMapsPerHost*numBytesToWritePerMap*cluster.getTaskTrackers());
     int numMaps = (int) (totalBytesToWrite / numBytesToWritePerMap);
     if (numMaps == 0 && totalBytesToWrite > 0) {
       numMaps = 1;
-      job.setLong("test.randomwrite.bytes_per_map", totalBytesToWrite);
+      conf.setLong("test.randomwrite.bytes_per_map", totalBytesToWrite);
     }
+    conf.setInt("mapred.map.tasks", numMaps);
+
+    Job job = new Job(conf);
+    
+    job.setJarByClass(RandomWriter.class);
+    job.setJobName("random-writer");
+    FileOutputFormat.setOutputPath(job, outDir);
+    job.setOutputKeyClass(BytesWritable.class);
+    job.setOutputValueClass(BytesWritable.class);
+    job.setInputFormatClass(RandomInputFormat.class);
+    job.setMapperClass(RandomMapper.class);        
+    job.setReducerClass(Reducer.class);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
     
-    job.setNumMapTasks(numMaps);
     System.out.println("Running " + numMaps + " maps.");
     
     // reducer NONE
@@ -269,14 +271,14 @@ public class RandomWriter extends Configured implements Tool {
     
     Date startTime = new Date();
     System.out.println("Job started: " + startTime);
-    JobClient.runJob(job);
+    int ret = job.waitForCompletion(true) ? 0 : 1;
     Date endTime = new Date();
     System.out.println("Job ended: " + endTime);
     System.out.println("The job took " + 
                        (endTime.getTime() - startTime.getTime()) /1000 + 
                        " seconds.");
     
-    return 0;
+    return ret;
   }
   
   public static void main(String[] args) throws Exception {

+ 12 - 0
src/mapred/org/apache/hadoop/mapred/JobClient.java

@@ -410,6 +410,18 @@ public class JobClient extends Configured implements MRConstants, Tool  {
     init(conf);
   }
 
+  /**
+   * Build a job client with the given {@link Configuration}, 
+   * and connect to the default {@link JobTracker}.
+   * 
+   * @param conf the configuration.
+   * @throws IOException
+   */
+  public JobClient(Configuration conf) throws IOException {
+    setConf(conf);
+    init(new JobConf(conf));
+  }
+
   /**
    * Connect to the default {@link JobTracker}.
    * @param conf the job configuration.

+ 1 - 1
src/test/org/apache/hadoop/mapred/UtilsForTests.java

@@ -491,7 +491,7 @@ public class UtilsForTests {
   // Input formats
   /**
    * A custom input format that creates virtual inputs of a single string
-   * for each map. Using {@link RandomWriter} code. 
+   * for each map. 
    */
   public static class RandomInputFormat implements InputFormat<Text, Text> {