1
0
فهرست منبع

HADOOP-1040. Update RandomWriter example to use counters and user-defined input and output formats.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@511985 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 سال پیش
والد
کامیت
c173365443
2فایلهای تغییر یافته به همراه118 افزوده شده و 81 حذف شده
  1. 3 0
      CHANGES.txt
  2. 115 81
      src/examples/org/apache/hadoop/examples/RandomWriter.java

+ 3 - 0
CHANGES.txt

@@ -138,6 +138,9 @@ Trunk (unreleased changes)
 40. HADOOP-1039.  In HDFS's TestCheckpoint, avoid restarting
 40. HADOOP-1039.  In HDFS's TestCheckpoint, avoid restarting
     MiniDFSCluster so often, speeding this test.  (Dhruba Borthakur via cutting)
     MiniDFSCluster so often, speeding this test.  (Dhruba Borthakur via cutting)
 
 
+41. HADOOP-1040.  Update RandomWriter example to use counters and
+    user-defined input and output formats.  (omalley via cutting)
+
 
 
 Release 0.11.2 - 2007-02-16
 Release 0.11.2 - 2007-02-16
 
 

+ 115 - 81
src/examples/org/apache/hadoop/examples/RandomWriter.java

@@ -19,29 +19,16 @@
 package org.apache.hadoop.examples;
 package org.apache.hadoop.examples;
 
 
 import java.io.IOException;
 import java.io.IOException;
-import java.text.NumberFormat;
 import java.util.Date;
 import java.util.Date;
-import java.util.Iterator;
 import java.util.Random;
 import java.util.Random;
 
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.*;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.mapred.ClusterStatus;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.util.Progressable;
 
 
 /**
 /**
  * This program uses map/reduce to just run a distributed job where there is
  * This program uses map/reduce to just run a distributed job where there is
@@ -50,9 +37,91 @@ import org.apache.hadoop.mapred.Reporter;
  * 
  * 
  * @author Owen O'Malley
  * @author Owen O'Malley
  */
  */
-public class RandomWriter extends MapReduceBase implements Reducer {
+public class RandomWriter {
   
   
-  public static class Map extends MapReduceBase implements Mapper {
+  /**
+   * User counters
+   */
+  static enum Counters { RECORDS_WRITTEN, BYTES_WRITTEN }
+  
+  /**
+   * A custom input format that creates virtual inputs of a single string
+   * for each map.
+   */
+  static class RandomInputFormat implements InputFormat {
+    
+    /** Accept all job confs */
+    public void validateInput(JobConf job) throws IOException {
+    }
+
+    /** 
+     * Generate the requested number of file splits, with the filename
+     * set to the filename of the output file.
+     */
+    public InputSplit[] getSplits(JobConf job, 
+                                  int numSplits) throws IOException {
+      InputSplit[] result = new InputSplit[numSplits];
+      Path outDir = job.getOutputPath();
+      for(int i=0; i < result.length; ++i) {
+        result[i] = new FileSplit(new Path(outDir, "part-" + i), 0, 1, job);
+      }
+      return result;
+    }
+
+    /**
+     * Return a single record (filename, "") where the filename is taken from
+     * the file split.
+     */
+    static class RandomRecordReader implements RecordReader {
+      Path name;
+      public RandomRecordReader(Path p) {
+        name = p;
+      }
+      public boolean next(Writable key, Writable value) {
+        if (name != null) {
+          ((Text) key).set(name.toString());
+          name = null;
+          return true;
+        }
+        return false;
+      }
+      public WritableComparable createKey() {
+        return new Text();
+      }
+      public Writable createValue() {
+        return new Text();
+      }
+      public long getPos() {
+        return 0;
+      }
+      public void close() {}
+      public float getProgress() {
+        return 0.0f;
+      }
+    }
+
+    public RecordReader getRecordReader(InputSplit split,
+                                        JobConf job, 
+                                        Reporter reporter) throws IOException {
+      return new RandomRecordReader(((FileSplit) split).getPath());
+    }
+  }
+
+  /**
+   * Consume all outputs and put them in /dev/null. 
+   */
+  static class DataSink implements OutputFormat {
+    public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, 
+                                        String name, Progressable progress) {
+      return new RecordWriter(){
+        public void write(WritableComparable key, Writable value) { }
+        public void close(Reporter reporter) { }
+      };
+    }
+    public void checkOutputSpecs(FileSystem ignored, JobConf job) { }
+  }
+
+  static class Map extends MapReduceBase implements Mapper {
     private FileSystem fileSys = null;
     private FileSystem fileSys = null;
     private JobConf jobConf = null;
     private JobConf jobConf = null;
     private long numBytesToWrite;
     private long numBytesToWrite;
@@ -77,7 +146,7 @@ public class RandomWriter extends MapReduceBase implements Reducer {
                     Writable value,
                     Writable value,
                     OutputCollector output, 
                     OutputCollector output, 
                     Reporter reporter) throws IOException {
                     Reporter reporter) throws IOException {
-      String filename = ((Text) value).toString();
+      String filename = ((Text) key).toString();
       SequenceFile.Writer writer = 
       SequenceFile.Writer writer = 
         SequenceFile.createWriter(fileSys, jobConf, new Path(filename), 
         SequenceFile.createWriter(fileSys, jobConf, new Path(filename), 
                                 BytesWritable.class, BytesWritable.class,
                                 BytesWritable.class, BytesWritable.class,
@@ -94,6 +163,8 @@ public class RandomWriter extends MapReduceBase implements Reducer {
         randomizeBytes(randomValue.get(), 0, randomValue.getSize());
         randomizeBytes(randomValue.get(), 0, randomValue.getSize());
         writer.append(randomKey, randomValue);
         writer.append(randomKey, randomValue);
         numBytesToWrite -= keyLength + valueLength;
         numBytesToWrite -= keyLength + valueLength;
+        reporter.incrCounter(Counters.BYTES_WRITTEN, keyLength + valueLength);
+        reporter.incrCounter(Counters.RECORDS_WRITTEN, 1);
         if (++itemCount % 200 == 0) {
         if (++itemCount % 200 == 0) {
           reporter.setStatus("wrote record " + itemCount + ". " + 
           reporter.setStatus("wrote record " + itemCount + ". " + 
                              numBytesToWrite + " bytes left.");
                              numBytesToWrite + " bytes left.");
@@ -126,13 +197,6 @@ public class RandomWriter extends MapReduceBase implements Reducer {
     
     
   }
   }
   
   
-  public void reduce(WritableComparable key, 
-                     Iterator values,
-                     OutputCollector output, 
-                     Reporter reporter) throws IOException {
-    // nothing
-  }
-  
   /**
   /**
    * This is the main routine for launching a distributed random write job.
    * This is the main routine for launching a distributed random write job.
    * It runs 10 maps/node and each node writes 1 gig of data to a DFS file.
    * It runs 10 maps/node and each node writes 1 gig of data to a DFS file.
@@ -149,78 +213,48 @@ public class RandomWriter extends MapReduceBase implements Reducer {
    * @throws IOException 
    * @throws IOException 
    */
    */
   public static void main(String[] args) throws IOException {
   public static void main(String[] args) throws IOException {
-    Configuration defaults = new Configuration();
     if (args.length == 0) {
     if (args.length == 0) {
       System.out.println("Usage: writer <out-dir> [<config>]");
       System.out.println("Usage: writer <out-dir> [<config>]");
       return;
       return;
     }
     }
     Path outDir = new Path(args[0]);
     Path outDir = new Path(args[0]);
+    JobConf job;
     if (args.length >= 2) {
     if (args.length >= 2) {
-      defaults.addFinalResource(new Path(args[1]));
+      job = new JobConf(new Path(args[1]));
+    } else {
+      job = new JobConf();
     }
     }
-    
-    JobConf jobConf = new JobConf(defaults, RandomWriter.class);
-    jobConf.setJobName("random-writer");
+    job.setJarByClass(RandomWriter.class);
+    job.setJobName("random-writer");
+    job.setOutputPath(outDir);
     
     
     // turn off speculative execution, because DFS doesn't handle
     // turn off speculative execution, because DFS doesn't handle
     // multiple writers to the same file.
     // multiple writers to the same file.
-    jobConf.setSpeculativeExecution(false);
-    jobConf.setOutputKeyClass(BytesWritable.class);
-    jobConf.setOutputValueClass(BytesWritable.class);
+    job.setSpeculativeExecution(false);
+    job.setOutputKeyClass(BytesWritable.class);
+    job.setOutputValueClass(BytesWritable.class);
     
     
-    jobConf.setMapperClass(Map.class);        
-    jobConf.setReducerClass(RandomWriter.class);
+    job.setInputFormat(RandomInputFormat.class);
+    job.setMapperClass(Map.class);        
+    job.setReducerClass(IdentityReducer.class);
+    job.setOutputFormat(DataSink.class);
     
     
-    JobClient client = new JobClient(jobConf);
+    JobClient client = new JobClient(job);
     ClusterStatus cluster = client.getClusterStatus();
     ClusterStatus cluster = client.getClusterStatus();
     int numMaps = cluster.getTaskTrackers() * 
     int numMaps = cluster.getTaskTrackers() * 
-         jobConf.getInt("test.randomwriter.maps_per_host", 10);
-    jobConf.setNumMapTasks(numMaps);
+         job.getInt("test.randomwriter.maps_per_host", 10);
+    job.setNumMapTasks(numMaps);
     System.out.println("Running " + numMaps + " maps.");
     System.out.println("Running " + numMaps + " maps.");
-    jobConf.setNumReduceTasks(1);
-    
-    Path tmpDir = new Path("random-work");
-    Path inDir = new Path(tmpDir, "in");
-    Path fakeOutDir = new Path(tmpDir, "out");
-    FileSystem fileSys = FileSystem.get(jobConf);
-    if (fileSys.exists(outDir)) {
-      System.out.println("Error: Output directory " + outDir + 
-                         " already exists.");
-      return;
-    }
-    fileSys.delete(tmpDir);
-    if (!fileSys.mkdirs(inDir)) {
-      System.out.println("Error: Mkdirs failed to create " + 
-                         inDir.toString());
-      return;
-    }
-    NumberFormat numberFormat = NumberFormat.getInstance();
-    numberFormat.setMinimumIntegerDigits(6);
-    numberFormat.setGroupingUsed(false);
-
-    for(int i=0; i < numMaps; ++i) {
-      Path file = new Path(inDir, "part"+i);
-      FSDataOutputStream writer = fileSys.create(file);
-      writer.writeBytes(outDir + "/part" + numberFormat.format(i)+ "\n");
-      writer.close();
-    }
-    jobConf.setInputPath(inDir);
-    jobConf.setOutputPath(fakeOutDir);
-    
-    // Uncomment to run locally in a single process
-    //job_conf.set("mapred.job.tracker", "local");
+    job.setNumReduceTasks(1);
     
     
     Date startTime = new Date();
     Date startTime = new Date();
     System.out.println("Job started: " + startTime);
     System.out.println("Job started: " + startTime);
-    try {
-      JobClient.runJob(jobConf);
-      Date endTime = new Date();
-      System.out.println("Job ended: " + endTime);
-      System.out.println("The job took " + 
-         (endTime.getTime() - startTime.getTime()) /1000 + " seconds.");
-    } finally {
-      fileSys.delete(tmpDir);
-    }
+    JobClient.runJob(job);
+    Date endTime = new Date();
+    System.out.println("Job ended: " + endTime);
+    System.out.println("The job took " + 
+                       (endTime.getTime() - startTime.getTime()) /1000 + 
+                       " seconds.");
   }
   }
   
   
 }
 }