
HADOOP-3402. Add terasort example program.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@673517 13f79535-47bb-0310-9956-ffa450edef68
Owen O'Malley, 17 years ago
parent
commit bd433e2c57

+ 2 - 0
CHANGES.txt

@@ -48,6 +48,8 @@ Trunk (unreleased changes)
     HADOOP-3587. Add a unit test for the contrib/data_join framework.
     (cdouglas)
 
+    HADOOP-3402. Add terasort example program (omalley)
+
   OPTIMIZATIONS
 
     HADOOP-3556. Removed lock contention in MD5Hash by changing the 

+ 6 - 0
src/examples/org/apache/hadoop/examples/ExampleDriver.java

@@ -19,6 +19,9 @@
 package org.apache.hadoop.examples;
 import org.apache.hadoop.examples.dancing.DistributedPentomino;
 import org.apache.hadoop.examples.dancing.Sudoku;
+import org.apache.hadoop.examples.terasort.TeraGen;
+import org.apache.hadoop.examples.terasort.TeraSort;
+import org.apache.hadoop.examples.terasort.TeraValidate;
 import org.apache.hadoop.util.ProgramDriver;
 
 /**
@@ -50,6 +53,9 @@ public class ExampleDriver {
       pgd.addClass("sleep", SleepJob.class, "A job that sleeps at each map and reduce task.");
       pgd.addClass("join", Join.class, "A job that effects a join over sorted, equally partitioned datasets");
       pgd.addClass("multifilewc", MultiFileWordCount.class, "A job that counts words from several files.");
+      pgd.addClass("teragen", TeraGen.class, "Generate data for the terasort");
+      pgd.addClass("terasort", TeraSort.class, "Run the terasort");
+      pgd.addClass("teravalidate", TeraValidate.class, "Checking results of terasort");
       pgd.driver(argv);
     }
     catch(Throwable e){

+ 364 - 0
src/examples/org/apache/hadoop/examples/terasort/TeraGen.java

@@ -0,0 +1,364 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.examples.terasort;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Generate the official terasort input data set.
+ * The user specifies the number of rows and the output directory and this
+ * class runs a map/reduce program to generate the data.
+ * The format of the data is:
+ * <ul>
+ * <li>(10 bytes key) (10 bytes rowid) (78 bytes filler) \r \n
+ * <li>The keys are random characters from the set ' ' .. '~'.
+ * <li>The rowid is the right-justified row id as an int.
+ * <li>The filler consists of 7 runs of 10 characters from 'A' to 'Z'.
+ * </ul>
+ *
+ * <p>
+ * To run the program: 
+ * <b>bin/hadoop jar hadoop-*-examples.jar teragen 10000000000 in-dir</b>
+ */
+public class TeraGen extends Configured implements Tool {
+
+  /**
+   * An input format that assigns ranges of longs to each mapper.
+   */
+  static class RangeInputFormat 
+       implements InputFormat<LongWritable, NullWritable> {
+    
+    /**
+     * An input split consisting of a range of numbers.
+     */
+    static class RangeInputSplit implements InputSplit {
+      long firstRow;
+      long rowCount;
+
+      public RangeInputSplit() { }
+
+      public RangeInputSplit(long offset, long length) {
+        firstRow = offset;
+        rowCount = length;
+      }
+
+      public long getLength() throws IOException {
+        return 0;
+      }
+
+      public String[] getLocations() throws IOException {
+        return new String[]{};
+      }
+
+      public void readFields(DataInput in) throws IOException {
+        firstRow = WritableUtils.readVLong(in);
+        rowCount = WritableUtils.readVLong(in);
+      }
+
+      public void write(DataOutput out) throws IOException {
+        WritableUtils.writeVLong(out, firstRow);
+        WritableUtils.writeVLong(out, rowCount);
+      }
+    }
+    
+    /**
+     * A record reader that will generate a range of numbers.
+     */
+    static class RangeRecordReader 
+          implements RecordReader<LongWritable, NullWritable> {
+      long startRow;
+      long finishedRows;
+      long totalRows;
+
+      public RangeRecordReader(RangeInputSplit split) {
+        startRow = split.firstRow;
+        finishedRows = 0;
+        totalRows = split.rowCount;
+      }
+
+      public void close() throws IOException {
+        // NOTHING
+      }
+
+      public LongWritable createKey() {
+        return new LongWritable();
+      }
+
+      public NullWritable createValue() {
+        return NullWritable.get();
+      }
+
+      public long getPos() throws IOException {
+        return finishedRows;
+      }
+
+      public float getProgress() throws IOException {
+        return finishedRows / (float) totalRows;
+      }
+
+      public boolean next(LongWritable key, 
+                          NullWritable value) {
+        if (finishedRows < totalRows) {
+          key.set(startRow + finishedRows);
+          finishedRows += 1;
+          return true;
+        } else {
+          return false;
+        }
+      }
+      
+    }
+
+    public RecordReader<LongWritable, NullWritable> 
+      getRecordReader(InputSplit split, JobConf job,
+                      Reporter reporter) throws IOException {
+      return new RangeRecordReader((RangeInputSplit) split);
+    }
+
+    /**
+     * Create the desired number of splits, dividing the number of rows
+     * between the mappers.
+     */
+    public InputSplit[] getSplits(JobConf job, 
+                                  int numSplits) {
+      long totalRows = getNumberOfRows(job);
+      long rowsPerSplit = totalRows / numSplits;
+      System.out.println("Generating " + totalRows + " using " + numSplits + 
+                         " maps with step of " + rowsPerSplit);
+      InputSplit[] splits = new InputSplit[numSplits];
+      long currentRow = 0;
+      for(int split=0; split < numSplits-1; ++split) {
+        splits[split] = new RangeInputSplit(currentRow, rowsPerSplit);
+        currentRow += rowsPerSplit;
+      }
+      splits[numSplits-1] = new RangeInputSplit(currentRow, 
+                                                totalRows - currentRow);
+      return splits;
+    }
+
+    public void validateInput(JobConf job) throws IOException {
+      // NOTHING
+    }
+  }
+  
+  static long getNumberOfRows(JobConf job) {
+    return job.getLong("terasort.num-rows", 0);
+  }
+  
+  static void setNumberOfRows(JobConf job, long numRows) {
+    job.setLong("terasort.num-rows", numRows);
+  }
+
+  static class RandomGenerator {
+    private long seed = 0;
+    private static final long mask32 = (1l<<32) - 1;
+    /**
+     * The number of iterations separating the precomputed seeds.
+     */
+    private static final int seedSkip = 128 * 1024 * 1024;
+    /**
+     * The precomputed seed values after every seedSkip iterations.
+     * There should be enough values so that 2**32 iterations are 
+     * covered.
+     */
+    private static final long[] seeds = new long[]{0L,
+                                                   4160749568L,
+                                                   4026531840L,
+                                                   3892314112L,
+                                                   3758096384L,
+                                                   3623878656L,
+                                                   3489660928L,
+                                                   3355443200L,
+                                                   3221225472L,
+                                                   3087007744L,
+                                                   2952790016L,
+                                                   2818572288L,
+                                                   2684354560L,
+                                                   2550136832L,
+                                                   2415919104L,
+                                                   2281701376L,
+                                                   2147483648L,
+                                                   2013265920L,
+                                                   1879048192L,
+                                                   1744830464L,
+                                                   1610612736L,
+                                                   1476395008L,
+                                                   1342177280L,
+                                                   1207959552L,
+                                                   1073741824L,
+                                                   939524096L,
+                                                   805306368L,
+                                                   671088640L,
+                                                   536870912L,
+                                                   402653184L,
+                                                   268435456L,
+                                                   134217728L,
+                                                  };
+
+    /**
+     * Start the random number generator on the given iteration.
+     * @param initalIteration the iteration number to start on
+     */
+    RandomGenerator(long initalIteration) {
+      int baseIndex = (int) ((initalIteration & mask32) / seedSkip);
+      seed = seeds[baseIndex];
+      for(int i=0; i < initalIteration % seedSkip; ++i) {
+        next();
+      }
+    }
+
+    RandomGenerator() {
+      this(0);
+    }
+
+    long next() {
+      seed = (seed * 3141592621l + 663896637) & mask32;
+      return seed;
+    }
+  }
+
+  /**
+   * The Mapper class that, given a row number, will generate the appropriate 
+   * output line.
+   */
+  public static class SortGenMapper extends MapReduceBase 
+      implements Mapper<LongWritable, NullWritable, Text, Text> {
+
+    private Text key = new Text();
+    private Text value = new Text();
+    private RandomGenerator rand;
+    private byte[] keyBytes = new byte[12];
+    private byte[] spaces = "          ".getBytes();
+    private byte[][] filler = new byte[26][];
+    {
+      for(int i=0; i < 26; ++i) {
+        filler[i] = new byte[10];
+        for(int j=0; j<10; ++j) {
+          filler[i][j] = (byte) ('A' + i);
+        }
+      }
+    }
+    
+    /**
+     * Add a random key to the text
+     * @param rowId
+     */
+    private void addKey() {
+      for(int i=0; i<3; i++) {
+        long temp = rand.next() / 52;
+        keyBytes[3 + 4*i] = (byte) (' ' + (temp % 95));
+        temp /= 95;
+        keyBytes[2 + 4*i] = (byte) (' ' + (temp % 95));
+        temp /= 95;
+        keyBytes[1 + 4*i] = (byte) (' ' + (temp % 95));
+        temp /= 95;
+        keyBytes[4*i] = (byte) (' ' + (temp % 95));
+      }
+      key.set(keyBytes, 0, 10);
+    }
+    
+    /**
+     * Add the rowid to the row.
+     * @param rowId
+     */
+    private void addRowId(long rowId) {
+      byte[] rowid = Integer.toString((int) rowId).getBytes();
+      int padSpace = 10 - rowid.length;
+      if (padSpace > 0) {
+        value.append(spaces, 0, 10 - rowid.length);
+      }
+      value.append(rowid, 0, Math.min(rowid.length, 10));
+    }
+
+    /**
+     * Add the required filler bytes. Each row consists of 7 blocks of
+     * 10 characters and 1 block of 8 characters.
+     * @param rowId the current row number
+     */
+    private void addFiller(long rowId) {
+      int base = (int) ((rowId * 8) % 26);
+      for(int i=0; i<7; ++i) {
+        value.append(filler[(base+i) % 26], 0, 10);
+      }
+      value.append(filler[(base+7) % 26], 0, 8);
+    }
+
+    public void map(LongWritable row, NullWritable ignored,
+                    OutputCollector<Text, Text> output,
+                    Reporter reporter) throws IOException {
+      long rowId = row.get();
+      if (rand == null) {
+        // we use 3 random numbers per row
+        rand = new RandomGenerator(rowId*3);
+      }
+      addKey();
+      value.clear();
+      addRowId(rowId);
+      addFiller(rowId);
+      output.collect(key, value);
+    }
+
+  }
+
+  /**
+   * @param args the cli arguments
+   */
+  public int run(String[] args) throws IOException {
+    JobConf job = (JobConf) getConf();
+    setNumberOfRows(job, Long.parseLong(args[0]));
+    FileOutputFormat.setOutputPath(job, new Path(args[1]));
+    job.setJobName("TeraGen");
+    job.setJarByClass(TeraGen.class);
+    job.setMapperClass(SortGenMapper.class);
+    job.setNumReduceTasks(0);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Text.class);
+    job.setInputFormat(RangeInputFormat.class);
+    job.setOutputFormat(TeraOutputFormat.class);
+    JobClient.runJob(job);
+    return 0;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new JobConf(), new TeraGen(), args);
+    System.exit(res);
+  }
+
+}
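
Editor's aside (illustrative, not part of this commit): the pieces above fit together as "RangeInputFormat hands each map a range of row numbers; RandomGenerator is a 32-bit linear congruential generator that can be positioned at any iteration; SortGenMapper.addKey() turns three random values into a 10-byte printable key." The sketch below mirrors that key generation outside of a map/reduce job. The class name and the naive step-from-zero seek are assumptions for illustration only; TeraGen's own constructor jumps to the nearest precomputed seed (one every 2^27 iterations) and only steps the remainder, so this sketch is practical only for small row ids.

    // Standalone sketch: reproduce the key TeraGen would generate for one row.
    public class TeraGenKeySketch {
      private static final long MASK32 = (1L << 32) - 1;
      private long seed;

      // Seek the generator by stepping from iteration zero (TeraGen jumps via
      // its precomputed seed table instead).
      TeraGenKeySketch(long iteration) {
        seed = 0;
        for (long i = 0; i < iteration; ++i) {
          next();
        }
      }

      long next() {
        // Same linear congruential step as RandomGenerator.next() above.
        seed = (seed * 3141592621L + 663896637L) & MASK32;
        return seed;
      }

      // Build the 10-byte key for one row: three random values, each expanded
      // into four base-95 printable characters (' ' .. '~'); only the first 10
      // of the 12 bytes are kept, exactly as in SortGenMapper.addKey().
      byte[] makeKey() {
        byte[] keyBytes = new byte[12];
        for (int i = 0; i < 3; i++) {
          long temp = next() / 52;
          keyBytes[3 + 4 * i] = (byte) (' ' + (temp % 95));
          temp /= 95;
          keyBytes[2 + 4 * i] = (byte) (' ' + (temp % 95));
          temp /= 95;
          keyBytes[1 + 4 * i] = (byte) (' ' + (temp % 95));
          temp /= 95;
          keyBytes[4 * i] = (byte) (' ' + (temp % 95));
        }
        byte[] key = new byte[10];
        System.arraycopy(keyBytes, 0, key, 0, 10);
        return key;
      }

      public static void main(String[] args) {
        // Each row consumes 3 random numbers, so row N starts at iteration N*3.
        long rowId = Long.parseLong(args.length > 0 ? args[0] : "0");
        TeraGenKeySketch gen = new TeraGenKeySketch(rowId * 3);
        System.out.println(new String(gen.makeKey()));
      }
    }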

+ 212 - 0
src/examples/org/apache/hadoop/examples/terasort/TeraInputFormat.java

@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.examples.terasort;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.LineRecordReader;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.IndexedSortable;
+import org.apache.hadoop.util.QuickSort;
+
+/**
+ * An input format that reads the first 10 characters of each line as the key
+ * and the rest of the line as the value. Both key and value are represented
+ * as Text.
+ */
+public class TeraInputFormat extends FileInputFormat<Text,Text> {
+
+  static final String PARTITION_FILENAME = "_partition.lst";
+  static final String SAMPLE_SIZE = "terasort.partitions.sample";
+  private static JobConf lastConf = null;
+  private static InputSplit[] lastResult = null;
+
+  static class TextSampler implements IndexedSortable {
+    private ArrayList<Text> records = new ArrayList<Text>();
+
+    public int compare(int i, int j) {
+      Text left = records.get(i);
+      Text right = records.get(j);
+      return left.compareTo(right);
+    }
+
+    public void swap(int i, int j) {
+      Text left = records.get(i);
+      Text right = records.get(j);
+      records.set(j, left);
+      records.set(i, right);
+    }
+
+    public void addKey(Text key) {
+      records.add(new Text(key));
+    }
+
+    /**
+     * Find the split points for a given sample. The sample keys are sorted
+     * and down sampled to find even split points for the partitions. The
+     * returned keys should be the start of their respective partitions.
+     * @param numPartitions the desired number of partitions
+     * @return an array of size numPartitions - 1 that holds the split points
+     */
+    Text[] createPartitions(int numPartitions) {
+      int numRecords = records.size();
+      System.out.println("Making " + numPartitions + " from " + numRecords + 
+                         " records");
+      if (numPartitions > numRecords) {
+        throw new IllegalArgumentException
+          ("Requested more partitions than input keys (" + numPartitions +
+           " > " + numRecords + ")");
+      }
+      new QuickSort().sort(this, 0, records.size());
+      float stepSize = numRecords / (float) numPartitions;
+      System.out.println("Step size is " + stepSize);
+      Text[] result = new Text[numPartitions-1];
+      for(int i=1; i < numPartitions; ++i) {
+        result[i-1] = records.get(Math.round(stepSize * i));
+      }
+      return result;
+    }
+  }
+  
+  /**
+   * Use the input splits to take samples of the input and generate sample
+   * keys. By default reads 100,000 keys from 10 locations in the input, sorts
+   * them and picks N-1 keys to generate N equally sized partitions.
+   * @param conf the job to sample
+   * @param partFile where to write the output file to
+   * @throws IOException if something goes wrong
+   */
+  public static void writePartitionFile(JobConf conf, 
+                                        Path partFile) throws IOException {
+    TeraInputFormat inFormat = new TeraInputFormat();
+    TextSampler sampler = new TextSampler();
+    Text key = new Text();
+    Text value = new Text();
+    int partitions = conf.getNumReduceTasks();
+    long sampleSize = conf.getLong(SAMPLE_SIZE, 100000);
+    InputSplit[] splits = inFormat.getSplits(conf, conf.getNumMapTasks());
+    int samples = Math.min(10, splits.length);
+    long recordsPerSample = sampleSize / samples;
+    int sampleStep = splits.length / samples;
+    long records = 0;
+    // take N samples from different parts of the input
+    for(int i=0; i < samples; ++i) {
+      RecordReader<Text,Text> reader = 
+        inFormat.getRecordReader(splits[sampleStep * i], conf, null);
+      while (reader.next(key, value)) {
+        sampler.addKey(key);
+        records += 1;
+        if ((i+1) * recordsPerSample <= records) {
+          break;
+        }
+      }
+    }
+    FileSystem outFs = partFile.getFileSystem(conf);
+    if (outFs.exists(partFile)) {
+      outFs.delete(partFile, false);
+    }
+    SequenceFile.Writer writer = 
+      SequenceFile.createWriter(outFs, conf, partFile, Text.class, 
+                                NullWritable.class);
+    NullWritable nullValue = NullWritable.get();
+    for(Text split : sampler.createPartitions(partitions)) {
+      writer.append(split, nullValue);
+    }
+    writer.close();
+  }
+
+  static class TeraRecordReader implements RecordReader<Text,Text> {
+    private LineRecordReader in;
+    private LongWritable junk = new LongWritable();
+    private Text line = new Text();
+    private static int KEY_LENGTH = 10;
+
+    public TeraRecordReader(Configuration job, 
+                            FileSplit split) throws IOException {
+      in = new LineRecordReader(job, split);
+    }
+
+    public void close() throws IOException {
+      in.close();
+    }
+
+    public Text createKey() {
+      return new Text();
+    }
+
+    public Text createValue() {
+      return new Text();
+    }
+
+    public long getPos() throws IOException {
+      return in.getPos();
+    }
+
+    public float getProgress() throws IOException {
+      return in.getProgress();
+    }
+
+    public boolean next(Text key, Text value) throws IOException {
+      if (in.next(junk, line)) {
+        if (line.getLength() < KEY_LENGTH) {
+          key.set(line);
+          value.clear();
+        } else {
+          byte[] bytes = line.getBytes();
+          key.set(bytes, 0, KEY_LENGTH);
+          value.set(bytes, KEY_LENGTH, line.getLength() - KEY_LENGTH);
+        }
+        return true;
+      } else {
+        return false;
+      }
+    }
+  }
+
+  @Override
+  public RecordReader<Text, Text> 
+      getRecordReader(InputSplit split,
+                      JobConf job, 
+                      Reporter reporter) throws IOException {
+    return new TeraRecordReader(job, (FileSplit) split);
+  }
+
+  @Override
+  public InputSplit[] getSplits(JobConf conf, int splits) throws IOException {
+    if (conf == lastConf) {
+      return lastResult;
+    }
+    lastConf = conf;
+    lastResult = super.getSplits(conf, splits);
+    return lastResult;
+  }
+}
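
Editor's aside (illustrative, not part of this commit): after TextSampler sorts the sampled keys with QuickSort, createPartitions() just downsamples them to numPartitions - 1 evenly spaced split points. A minimal standalone sketch of that step, using plain strings instead of Text and hypothetical names:

    import java.util.Arrays;
    import java.util.List;

    class SplitPointSketch {
      // sortedSample must already be sorted and contain at least numPartitions keys.
      static String[] pickSplitPoints(List<String> sortedSample, int numPartitions) {
        int numRecords = sortedSample.size();
        float stepSize = numRecords / (float) numPartitions;
        String[] result = new String[numPartitions - 1];
        for (int i = 1; i < numPartitions; ++i) {
          // Split point i-1 is the sampled key that begins partition i.
          result[i - 1] = sortedSample.get(Math.round(stepSize * i));
        }
        return result;
      }

      public static void main(String[] args) {
        List<String> sample = Arrays.asList(
            "apple", "bird", "cat", "dog", "egg", "fish", "goat", "hen");
        // 4 partitions over 8 sorted samples -> split points at indexes 2, 4, 6:
        // [cat, egg, goat]
        System.out.println(Arrays.toString(pickSplitPoints(sample, 4)));
      }
    }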

+ 88 - 0
src/examples/org/apache/hadoop/examples/terasort/TeraOutputFormat.java

@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.examples.terasort;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * A streamlined text output format that writes key, value, and "\r\n".
+ */
+public class TeraOutputFormat extends TextOutputFormat<Text,Text> {
+  static final String FINAL_SYNC_ATTRIBUTE = "terasort.final.sync";
+
+  /**
+   * Set the requirement for a final sync before the stream is closed.
+   */
+  public static void setFinalSync(JobConf conf, boolean newValue) {
+    conf.setBoolean(FINAL_SYNC_ATTRIBUTE, newValue);
+  }
+
+  /**
+   * Does the user want a final sync at close?
+   */
+  public static boolean getFinalSync(JobConf conf) {
+    return conf.getBoolean(FINAL_SYNC_ATTRIBUTE, false);
+  }
+
+  static class TeraRecordWriter extends LineRecordWriter<Text,Text> {
+    private static final byte[] newLine = "\r\n".getBytes();
+    private boolean finalSync = false;
+
+    public TeraRecordWriter(DataOutputStream out,
+                            JobConf conf) {
+      super(out);
+      finalSync = getFinalSync(conf);
+    }
+
+    public synchronized void write(Text key, 
+                                   Text value) throws IOException {
+      out.write(key.getBytes(), 0, key.getLength());
+      out.write(value.getBytes(), 0, value.getLength());
+      out.write(newLine, 0, newLine.length);
+    }
+    
+    public void close() throws IOException {
+      if (finalSync) {
+        ((FSDataOutputStream) out).sync();
+      }
+      super.close(null);
+    }
+  }
+
+  public RecordWriter<Text,Text> getRecordWriter(FileSystem ignored,
+                                                 JobConf job,
+                                                 String name,
+                                                 Progressable progress
+                                                 ) throws IOException {
+    Path dir = getWorkOutputPath(job);
+    FileSystem fs = dir.getFileSystem(job);
+    FSDataOutputStream fileOut = fs.create(new Path(dir, name), progress);
+    return new TeraRecordWriter(fileOut, job);
+  }
+}

+ 261 - 0
src/examples/org/apache/hadoop/examples/terasort/TeraSort.java

@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.examples.terasort;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Partitioner;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Generates the sampled split points, launches the job, and waits for it to
+ * finish. 
+ * <p>
+ * To run the program: 
+ * <b>bin/hadoop jar hadoop-*-examples.jar terasort in-dir out-dir</b>
+ */
+public class TeraSort extends Configured implements Tool {
+  private static final Log LOG = LogFactory.getLog(TeraSort.class);
+
+  /**
+   * A partitioner that splits text keys into roughly equal partitions
+   * in a globally sorted order.
+   */
+  static class TotalOrderPartitioner implements Partitioner<Text,Text>{
+    private TrieNode trie;
+    private Text[] splitPoints;
+
+    /**
+     * A generic trie node
+     */
+    static abstract class TrieNode {
+      private int level;
+      TrieNode(int level) {
+        this.level = level;
+      }
+      abstract int findPartition(Text key);
+      abstract void print(PrintStream strm) throws IOException;
+      int getLevel() {
+        return level;
+      }
+    }
+
+    /**
+     * An inner trie node that contains 256 children based on the next
+     * character.
+     */
+    static class InnerTrieNode extends TrieNode {
+      private TrieNode[] child = new TrieNode[256];
+      
+      InnerTrieNode(int level) {
+        super(level);
+      }
+      int findPartition(Text key) {
+        int level = getLevel();
+        if (key.getLength() <= level) {
+          return child[0].findPartition(key);
+        }
+        return child[key.getBytes()[level]].findPartition(key);
+      }
+      void setChild(int idx, TrieNode child) {
+        this.child[idx] = child;
+      }
+      void print(PrintStream strm) throws IOException {
+        for(int ch=0; ch < 255; ++ch) {
+          for(int i = 0; i < 2*getLevel(); ++i) {
+            strm.print(' ');
+          }
+          strm.print(ch);
+          strm.println(" ->");
+          if (child[ch] != null) {
+            child[ch].print(strm);
+          }
+        }
+      }
+    }
+
+    /**
+     * A leaf trie node that does string compares to figure out where the given
+     * key belongs between lower..upper.
+     */
+    static class LeafTrieNode extends TrieNode {
+      int lower;
+      int upper;
+      Text[] splitPoints;
+      LeafTrieNode(int level, Text[] splitPoints, int lower, int upper) {
+        super(level);
+        this.splitPoints = splitPoints;
+        this.lower = lower;
+        this.upper = upper;
+      }
+      int findPartition(Text key) {
+        for(int i=lower; i<upper; ++i) {
+          if (splitPoints[i].compareTo(key) >= 0) {
+            return i;
+          }
+        }
+        return upper;
+      }
+      void print(PrintStream strm) throws IOException {
+        for(int i = 0; i < 2*getLevel(); ++i) {
+          strm.print(' ');
+        }
+        strm.print(lower);
+        strm.print(", ");
+        strm.println(upper);
+      }
+    }
+
+
+    /**
+     * Read the cut points from the given sequence file.
+     * @param fs the file system
+     * @param p the path to read
+     * @param job the job config
+     * @return the strings to split the partitions on
+     * @throws IOException
+     */
+    private static Text[] readPartitions(FileSystem fs, Path p, 
+                                         JobConf job) throws IOException {
+      SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, job);
+      List<Text> parts = new ArrayList<Text>();
+      Text key = new Text();
+      NullWritable value = NullWritable.get();
+      while (reader.next(key, value)) {
+        parts.add(key);
+        key = new Text();
+      }
+      reader.close();
+      return parts.toArray(new Text[parts.size()]);  
+    }
+
+    /**
+     * Given a sorted set of cut points, build a trie that will find the correct
+     * partition quickly.
+     * @param splits the list of cut points
+     * @param lower the lower bound of partitions 0..numPartitions-1
+     * @param upper the upper bound of partitions 0..numPartitions-1
+     * @param prefix the prefix that we have already checked against
+     * @param maxDepth the maximum depth we will build a trie for
+     * @return the trie node that will divide the splits correctly
+     */
+    private static TrieNode buildTrie(Text[] splits, int lower, int upper, 
+                                      Text prefix, int maxDepth) {
+      int depth = prefix.getLength();
+      if (depth >= maxDepth || lower == upper) {
+        return new LeafTrieNode(depth, splits, lower, upper);
+      }
+      InnerTrieNode result = new InnerTrieNode(depth);
+      Text trial = new Text(prefix);
+      // append an extra byte on to the prefix
+      trial.append(new byte[1], 0, 1);
+      int currentBound = lower;
+      for(int ch = 0; ch < 255; ++ch) {
+        trial.getBytes()[depth] = (byte) (ch + 1);
+        lower = currentBound;
+        while (currentBound < upper) {
+          if (splits[currentBound].compareTo(trial) >= 0) {
+            break;
+          }
+          currentBound += 1;
+        }
+        trial.getBytes()[depth] = (byte) ch;
+        result.child[ch] = buildTrie(splits, lower, currentBound, trial, 
+                                     maxDepth);
+      }
+      // pick up the rest
+      trial.getBytes()[depth] = 127;
+      result.child[255] = buildTrie(splits, currentBound, upper, trial,
+                                    maxDepth);
+      return result;
+    }
+
+    public void configure(JobConf job) {
+      try {
+        FileSystem fs = FileSystem.getLocal(job);
+        Path partFile = new Path(TeraInputFormat.PARTITION_FILENAME);
+        splitPoints = readPartitions(fs, partFile, job);
+        trie = buildTrie(splitPoints, 0, splitPoints.length, new Text(), 2);
+      } catch (IOException ie) {
+        throw new IllegalArgumentException("can't read paritions file", ie);
+      }
+    }
+
+    public TotalOrderPartitioner() {
+    }
+
+    public int getPartition(Text key, Text value, int numPartitions) {
+      return trie.findPartition(key);
+    }
+    
+  }
+  
+  public int run(String[] args) throws Exception {
+    LOG.info("starting");
+    JobConf job = (JobConf) getConf();
+    Path inputDir = new Path(args[0]);
+    inputDir = inputDir.makeQualified(inputDir.getFileSystem(job));
+    Path partitionFile = new Path(inputDir, TeraInputFormat.PARTITION_FILENAME);
+    URI partitionUri = new URI(partitionFile.toString() +
+                               "#" + TeraInputFormat.PARTITION_FILENAME);
+    TeraInputFormat.setInputPaths(job, new Path(args[0]));
+    FileOutputFormat.setOutputPath(job, new Path(args[1]));
+    job.setJobName("TeraSort");
+    job.setJarByClass(TeraSort.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Text.class);
+    job.setInputFormat(TeraInputFormat.class);
+    job.setOutputFormat(TeraOutputFormat.class);
+    job.setPartitionerClass(TotalOrderPartitioner.class);
+    TeraInputFormat.writePartitionFile(job, partitionFile);
+    DistributedCache.addCacheFile(partitionUri, job);
+    DistributedCache.createSymlink(job);
+    job.setInt("dfs.replication", 1);
+    TeraOutputFormat.setFinalSync(job, true);
+    JobClient.runJob(job);
+    LOG.info("done");
+    return 0;
+  }
+
+  /**
+   * @param args
+   */
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new JobConf(), new TeraSort(), args);
+    System.exit(res);
+  }
+
+}
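
Editor's aside (illustrative, not part of this commit): stripped of the two-level trie, TotalOrderPartitioner's lookup is "the partition for a key is the index of the first split point that is >= the key, or the last partition if none is". The trie only short-circuits that search using the first two bytes of the key. A minimal binary-search sketch of the same rule, with hypothetical names and strings in place of Text:

    import java.util.Arrays;

    class PartitionLookupSketch {
      // splitPoints has numPartitions - 1 entries and is sorted ascending.
      static int findPartition(String key, String[] splitPoints) {
        int pos = Arrays.binarySearch(splitPoints, key);
        if (pos >= 0) {
          return pos;        // key equals a split point: boundary key goes to the lower partition
        }
        return -(pos + 1);   // insertion point = index of the first split point > key
      }

      public static void main(String[] args) {
        String[] splits = {"dog", "hen"};                    // defines 3 partitions
        System.out.println(findPartition("cat", splits));    // 0
        System.out.println(findPartition("dog", splits));    // 0 (boundary key)
        System.out.println(findPartition("egg", splits));    // 1
        System.out.println(findPartition("zebra", splits));  // 2 (last partition)
      }
    }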

+ 157 - 0
src/examples/org/apache/hadoop/examples/terasort/TeraValidate.java

@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.examples.terasort;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Generate one mapper per file that checks to make sure the keys
+ * are sorted within each file. The mapper also generates 
+ * "$file:begin", first key and "$file:end", last key. The reduce verifies that
+ * all of the start/end items are in order.
+ * Any output from the reduce is a problem report.
+ * <p>
+ * To run the program: 
+ * <b>bin/hadoop jar hadoop-*-examples.jar teravalidate out-dir report-dir</b>
+ * <p>
+ * If there is any output, something is wrong and the output of the reduce
+ * will have the problem report.
+ */
+public class TeraValidate extends Configured implements Tool {
+  private static final Text error = new Text("error");
+
+  static class ValidateMapper extends MapReduceBase 
+      implements Mapper<Text,Text,Text,Text> {
+    private Text lastKey;
+    private OutputCollector<Text,Text> output;
+    private String filename;
+    
+    /**
+     * Get the final part of the input name
+     * @param split the input split
+     * @return the "part-00000" for the input
+     */
+    private String getFilename(FileSplit split) {
+      return split.getPath().getName();
+    }
+
+    public void map(Text key, Text value, OutputCollector<Text,Text> output,
+                    Reporter reporter) throws IOException {
+      if (lastKey == null) {
+        filename = getFilename((FileSplit) reporter.getInputSplit());
+        output.collect(new Text(filename + ":begin"), key);
+        lastKey = new Text();
+        this.output = output;
+      } else {
+        if (key.compareTo(lastKey) < 0) {
+          output.collect(error, new Text("misorder in " + filename + 
+                                         " last: '" + lastKey + 
+                                         "' current: '" + key + "'"));
+        }
+      }
+      lastKey.set(key);
+    }
+    
+    public void close() throws IOException {
+      if (lastKey != null) {
+        output.collect(new Text(filename + ":end"), lastKey);
+      }
+    }
+  }
+
+  /**
+   * Check the boundaries between the output files by making sure that the
+   * boundary keys are always increasing.
+   * Also passes any error reports along intact.
+   */
+  static class ValidateReducer extends MapReduceBase 
+      implements Reducer<Text,Text,Text,Text> {
+    private boolean firstKey = true;
+    private Text lastKey = new Text();
+    private Text lastValue = new Text();
+    public void reduce(Text key, Iterator<Text> values,
+                       OutputCollector<Text, Text> output, 
+                       Reporter reporter) throws IOException {
+      if (error.equals(key)) {
+        while(values.hasNext()) {
+          output.collect(key, values.next());
+        }
+      } else {
+        Text value = values.next();
+        if (firstKey) {
+          firstKey = false;
+        } else {
+          if (value.compareTo(lastValue) < 0) {
+            output.collect(error, 
+                           new Text("misordered keys last: " + 
+                                    lastKey + " '" + lastValue +
+                                    "' current: " + key + " '" + value + "'"));
+          }
+        }
+        lastKey.set(key);
+        lastValue.set(value);
+      }
+    }
+    
+  }
+
+  public int run(String[] args) throws Exception {
+    JobConf job = (JobConf) getConf();
+    TeraInputFormat.setInputPaths(job, new Path(args[0]));
+    FileOutputFormat.setOutputPath(job, new Path(args[1]));
+    job.setJobName("TeraValidate");
+    job.setJarByClass(TeraValidate.class);
+    job.setMapperClass(ValidateMapper.class);
+    job.setReducerClass(ValidateReducer.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Text.class);
+    // force a single reducer
+    job.setNumReduceTasks(1);
+    // force a single split 
+    job.setLong("mapred.min.split.size", Long.MAX_VALUE);
+    job.setInputFormat(TeraInputFormat.class);
+    JobClient.runJob(job);
+    return 0;
+  }
+
+  /**
+   * @param args
+   */
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new JobConf(), new TeraValidate(), args);
+    System.exit(res);
+  }
+
+}

+ 100 - 0
src/examples/org/apache/hadoop/examples/terasort/job_history_summary.py

@@ -0,0 +1,100 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import re
+import sys
+
+pat = re.compile('(?P<name>[^=]+)="(?P<value>[^"]*)" *')
+counterPat = re.compile('(?P<name>[^:]+):(?P<value>[^,]*),?')
+
+def parse(tail):
+  result = {}
+  for n,v in re.findall(pat, tail):
+    result[n] = v
+  return result
+
+mapStartTime = {}
+mapEndTime = {}
+reduceStartTime = {}
+reduceShuffleTime = {}
+reduceSortTime = {}
+reduceEndTime = {}
+reduceBytes = {}
+
+for line in sys.stdin:
+  words = line.split(" ",1)
+  event = words[0]
+  attrs = parse(words[1])
+  if event == 'MapAttempt':
+    if attrs.has_key("START_TIME"):
+      mapStartTime[attrs["TASKID"]] = int(attrs["START_TIME"])/1000
+    elif attrs.has_key("FINISH_TIME"):
+      mapEndTime[attrs["TASKID"]] = int(attrs["FINISH_TIME"])/1000
+  elif event == 'ReduceAttempt':
+    if attrs.has_key("START_TIME"):
+      reduceStartTime[attrs["TASKID"]] = int(attrs["START_TIME"]) / 1000
+    elif attrs.has_key("FINISH_TIME"):
+      reduceShuffleTime[attrs["TASKID"]] = int(attrs["SHUFFLE_FINISHED"])/1000
+      reduceSortTime[attrs["TASKID"]] = int(attrs["SORT_FINISHED"])/1000
+      reduceEndTime[attrs["TASKID"]] = int(attrs["FINISH_TIME"])/1000
+  elif event == 'Task':
+    if attrs["TASK_TYPE"] == "REDUCE" and attrs.has_key("COUNTERS"):
+      for n,v in re.findall(counterPat, attrs["COUNTERS"]):
+        if n == "File Systems.HDFS bytes written":
+          reduceBytes[attrs["TASKID"]] = int(v)
+
+runningMaps = {}
+shufflingReduces = {}
+sortingReduces = {}
+runningReduces = {}
+startTime = min(reduce(min, mapStartTime.values()),
+                reduce(min, reduceStartTime.values()))
+endTime = max(reduce(max, mapEndTime.values()),
+              reduce(max, reduceEndTime.values()))
+
+reduces = reduceBytes.keys()
+reduces.sort()
+
+print "Name reduce-output-bytes shuffle-finish reduce-finish"
+for r in reduces:
+  print r, reduceBytes[r], reduceShuffleTime[r] - startTime,
+  print reduceEndTime[r] - startTime
+
+print
+
+for t in range(startTime, endTime):
+  runningMaps[t] = 0
+  shufflingReduces[t] = 0
+  sortingReduces[t] = 0
+  runningReduces[t] = 0
+
+for map in mapStartTime.keys():
+  for t in range(mapStartTime[map], mapEndTime[map]):
+    runningMaps[t] += 1
+for reduce in reduceStartTime.keys():
+  for t in range(reduceStartTime[reduce], reduceShuffleTime[reduce]):
+    shufflingReduces[t] += 1
+  for t in range(reduceShuffleTime[reduce], reduceSortTime[reduce]):
+    sortingReduces[t] += 1
+  for t in range(reduceSortTime[reduce], reduceEndTime[reduce]):
+    runningReduces[t] += 1
+
+print "time maps shuffle merge reduce"
+for t in range(startTime, endTime):
+  print t - startTime, runningMaps[t], shufflingReduces[t], sortingReduces[t], 
+  print runningReduces[t]

+ 113 - 0
src/examples/org/apache/hadoop/examples/terasort/package.html

@@ -0,0 +1,113 @@
+<html>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<body>
+This package consists of 3 map/reduce applications for Hadoop to
+compete in the annual <a
+href="http://www.hpl.hp.com/hosted/sortbenchmark" target="_top">terabyte sort</a>
+competition.
+
+<ul>
+<li><b>TeraGen</b> is a map/reduce program to generate the data.
+<li><b>TeraSort</b> samples the input data and uses map/reduce to
+    sort the data into a total order.
+<li><b>TeraValidate</b> is a map/reduce program that validates the
+    output is sorted.
+</ul>
+
+<p>
+
+<b>TeraGen</b> generates output data that is byte-for-byte
+equivalent to the C version, including the newlines and specific
+keys. It divides the desired number of rows by the desired number of
+tasks and assigns ranges of rows to each map. The map jumps the random
+number generator to the correct value for the first row and generates
+the following rows.
+
+<p>
+
+<b>TeraSort</b> is a standard map/reduce sort, except for a custom
+partitioner that uses a sorted list of <i>N-1</i> sampled keys that define
+the key range for each reduce. In particular, all keys such that
+<i>sample[i-1] &lt;= key &lt; sample[i]</i> are sent to reduce
+<i>i</i>. This guarantees that the outputs of reduce <i>i</i> are all
+less than the outputs of reduce <i>i+1</i>. To speed up the
+partitioning, the partitioner builds a two level trie that quickly
+indexes into the list of sample keys based on the first two bytes of
+the key. TeraSort generates the sample keys by sampling the input
+before the job is submitted and writing the list of keys into HDFS.
+The input and output formats, which are used by all 3 applications,
+read and write the text files in the right format. The output of the
+reduce has replication set to 1, instead of the default 3, because the
+contest does not require that the output data be replicated onto multiple
+nodes.  
+
+<p>
+
+<b>TeraValidate</b> ensures that the output is globally sorted. It
+creates one map per file in the output directory, and each map ensures that
+each key is greater than or equal to the previous one. The map also generates
+records with the first and last keys of the file, and the reduce
+ensures that the first key of file <i>i</i> is greater than the last key of
+file <i>i-1</i>. Any problems are reported as output of the reduce with the
+keys that are out of order.
+
+<p>
+
+In May 2008, Owen O'Malley ran this code on a 910 node cluster and
+sorted the 10 billion records (1 TB) in 209 seconds (3.48 minutes) to
+win the annual general purpose (daytona)
+<a href="http://www.hpl.hp.com/hosted/sortbenchmark/">terabyte sort
+benchmark</a>.
+
+<p>
+
+The cluster statistics were:
+<ul>
+<li> 910 nodes
+<li> 4 dual-core Xeons @ 2.0 GHz per node
+<li> 4 SATA disks per node
+<li> 8 GB RAM per node
+<li> 1 gigabit ethernet on each node
+<li> 40 nodes per rack
+<li> 8 gigabit ethernet uplinks from each rack to the core
+<li> Red Hat Enterprise Linux Server Release 5.1 (kernel 2.6.18)
+<li> Sun Java JDK 1.6.0_05-b13
+</ul>
+
+<p>
+
+The test was on Hadoop trunk (pre-0.18) patched with <a
+href="http://issues.apache.org/jira/browse/HADOOP-3443">HADOOP-3443</a>
+and <a
+href="http://issues.apache.org/jira/browse/HADOOP-3446">HADOOP-3446</a>,
+which were required to remove intermediate writes to disk.
+TeraGen used
+1800 tasks to generate a total of 10 billion rows in HDFS, with a
+block size of 1024 MB.
+TeraSort was configured with 1800 maps and 1800 reduces, and
+<i>io.sort.mb</i>,
+<i>io.sort.factor</i>, <i>fs.inmemory.size.mb</i>, and task heap size
+sufficient that transient data was never spilled to disk, other than at the
+end of the map. The sampler looked at 100,000 keys to determine the
+reduce boundaries, which led to imperfect balancing, with reduce
+outputs ranging from 337 MB to 872 MB.
+
+</body>
+</html>

+ 1 - 1
src/mapred/org/apache/hadoop/mapred/TextOutputFormat.java

@@ -47,7 +47,7 @@ public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> {
       }
     }
 
-    private DataOutputStream out;
+    protected DataOutputStream out;
     private final byte[] keyValueSeparator;
 
     public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {