Browse source

Merge -r 647057:647058 from trunk to branch-0.17 to fix HADOOP-3174

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.17@647060 13f79535-47bb-0310-9956-ffa450edef68
Arun Murthy 17 years ago
parent
commit
343b0d6d65

+ 3 - 0
CHANGES.txt

@@ -216,6 +216,9 @@ Release 0.17.0 - Unreleased
     as Hudson generates false negatives under the current load.
     (Nigel Daley via cdouglas)
 
+    HADOOP-3174. Illustrative example for MultiFileInputFormat. (Enis
+    Soztutar via acmurthy)
+
   OPTIMIZATIONS
 
     HADOOP-2790.  Fixed inefficient method hasSpeculativeTask by removing

+ 1 - 0
src/examples/org/apache/hadoop/examples/ExampleDriver.java

@@ -49,6 +49,7 @@ public class ExampleDriver {
       pgd.addClass("sudoku", Sudoku.class, "A sudoku solver.");
       pgd.addClass("sleep", SleepJob.class, "A job that sleeps at each map and reduce task.");
       pgd.addClass("join", Join.class, "A job that effects a join over sorted, equally partitioned datasets");
+      pgd.addClass("multifilewc", MultiFileWordCount.class, "A job that counts words from several files.");
       pgd.driver(argv);
     }
     catch(Throwable e){
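
For context, ProgramDriver dispatches on the first command-line argument, so
the registration above is what makes "bin/hadoop jar ... multifilewc" reach
MultiFileWordCount. A minimal standalone sketch of the same mechanism (the
MiniDriver class name is hypothetical; the registration line is copied from
the hunk):

    import org.apache.hadoop.examples.MultiFileWordCount;
    import org.apache.hadoop.util.ProgramDriver;

    // Hypothetical mini-driver; ExampleDriver in this hunk does the same
    // thing among its other addClass entries.
    public class MiniDriver {
      public static void main(String[] argv) {
        try {
          ProgramDriver pgd = new ProgramDriver();
          pgd.addClass("multifilewc", MultiFileWordCount.class,
                       "A job that counts words from several files.");
          pgd.driver(argv); // runs the class registered under argv[0]
        } catch (Throwable e) {
          e.printStackTrace();
        }
      }
    }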

+ 271 - 0
src/examples/org/apache/hadoop/examples/MultiFileWordCount.java

@@ -0,0 +1,271 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.examples;
+
+import java.io.BufferedReader;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.MultiFileInputFormat;
+import org.apache.hadoop.mapred.MultiFileSplit;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+
+/**
+ * MultiFileWordCount is an example to demonstrate the usage of 
+ * MultiFileInputFormat. This example counts the occurrences of
+ * words in the text files under the given input directory.
+ */
+public class MultiFileWordCount extends Configured implements Tool {
+
+  /**
+   * This record keeps &lt;filename, offset&gt; pairs.
+   */
+  public static class WordOffset implements WritableComparable {
+
+    private long offset;
+    private String fileName;
+
+    public void readFields(DataInput in) throws IOException {
+      this.offset = in.readLong();
+      this.fileName = Text.readString(in);
+    }
+
+    public void write(DataOutput out) throws IOException {
+      out.writeLong(offset);
+      Text.writeString(out, fileName);
+    }
+
+    public int compareTo(Object o) {
+      WordOffset that = (WordOffset)o;
+
+      int f = this.fileName.compareTo(that.fileName);
+      if(f == 0) {
+        return (int)Math.signum((double)(this.offset - that.offset));
+      }
+      return f;
+    }
+    @Override
+    public boolean equals(Object obj) {
+      if(obj instanceof WordOffset)
+        return this.compareTo(obj) == 0;
+      return false;
+    }
+    @Override
+    public int hashCode() {
+      assert false : "hashCode not designed";
+      return 42; //an arbitrary constant
+    }
+  }
+
+
+  /**
+   * To use {@link MultiFileInputFormat}, one should extend it, to return a 
+   * (custom) {@link RecordReader}. MultiFileInputFormat uses 
+   * {@link MultiFileSplit}s. 
+   */
+  public static class MyInputFormat 
+    extends MultiFileInputFormat<WordOffset, Text>  {
+
+    @Override
+    public RecordReader<WordOffset,Text> getRecordReader(InputSplit split
+        , JobConf job, Reporter reporter) throws IOException {
+      return new MultiFileLineRecordReader(job, (MultiFileSplit)split);
+    }
+  }
+
+  /**
+   * A RecordReader is responsible for extracting records from the InputSplit.
+   * This record reader accepts a {@link MultiFileSplit}, which encapsulates
+   * several whole files; individual files are not split.
+   */
+  public static class MultiFileLineRecordReader 
+    implements RecordReader<WordOffset, Text> {
+
+    private MultiFileSplit split;
+    private long offset; // total offset read so far
+    private long totLength;
+    private FileSystem fs;
+    private int count = 0;
+    private Path[] paths;
+    
+    private FSDataInputStream currentStream;
+    private BufferedReader currentReader;
+    
+    public MultiFileLineRecordReader(Configuration conf, MultiFileSplit split)
+      throws IOException {
+      
+      this.split = split;
+      fs = FileSystem.get(conf);
+      this.paths = split.getPaths();
+      this.totLength = split.getLength();
+      this.offset = 0;
+      
+      //open the first file
+      Path file = paths[count];
+      currentStream = fs.open(file);
+      currentReader = new BufferedReader(new InputStreamReader(currentStream));
+    }
+
+    public void close() throws IOException {
+      if (currentReader != null) {
+        currentReader.close(); // also closes the wrapped stream
+      }
+    }
+
+    public long getPos() throws IOException {
+      long currentOffset = currentStream == null ? 0 : currentStream.getPos();
+      return offset + currentOffset;
+    }
+
+    public float getProgress() throws IOException {
+      return ((float)getPos()) / totLength;
+    }
+
+    public boolean next(WordOffset key, Text value) throws IOException {
+      if(count >= split.getNumPaths())
+        return false;
+
+      /* Read from file, fill in key and value, if we reach the end of file,
+       * then open the next file and continue from there until all files are
+       * consumed.  
+       */
+      String line;
+      do {
+        line = currentReader.readLine();
+        if(line == null) {
+          //close the file
+          currentReader.close();
+          offset += split.getLength(count);
+          
+          if(++count >= split.getNumPaths()) //if we are done
+            return false;
+          
+          //open a new file
+          Path file = paths[count];
+          currentStream = fs.open(file);
+          currentReader = new BufferedReader(new InputStreamReader(currentStream));
+          key.fileName = file.getName();
+        }
+      } while(line == null);
+      //update the key and value; getPos() is the position of the underlying
+      //stream, which the BufferedReader may have read past the line's start
+      key.offset = currentStream.getPos();
+      value.set(line);
+      
+      return true;
+    }
+
+    public WordOffset createKey() {
+      WordOffset wo = new WordOffset();
+      wo.fileName = paths[0].getName(); //first file, consistent with next()
+      return wo;
+    }
+
+    public Text createValue() {
+      return new Text();
+    }
+  }
+
+  /**
+   * This Mapper is similar to the one in {@link WordCount.MapClass}.
+   */
+  public static class MapClass extends MapReduceBase
+    implements Mapper<WordOffset, Text, Text, IntWritable> {
+
+    private final static IntWritable one = new IntWritable(1);
+    private Text word = new Text();
+    
+    public void map(WordOffset key, Text value,
+        OutputCollector<Text, IntWritable> output, Reporter reporter)
+        throws IOException {
+      
+      String line = value.toString();
+      StringTokenizer itr = new StringTokenizer(line);
+      while (itr.hasMoreTokens()) {
+        word.set(itr.nextToken());
+        output.collect(word, one);
+      }
+    }
+  }
+  
+  
+  private void printUsage() {
+    System.out.println("Usage : multifilewc <input_dir> <output>" );
+  }
+
+  public int run(String[] args) throws Exception {
+
+    if(args.length < 2) {
+      printUsage();
+      return 1;
+    }
+
+    JobConf job = new JobConf(getConf(), MultiFileWordCount.class);
+    job.setJobName("MultiFileWordCount");
+
+    //set the InputFormat of the job to our InputFormat
+    job.setInputFormat(MyInputFormat.class);
+    
+    // the keys are words (strings)
+    job.setOutputKeyClass(Text.class);
+    // the values are counts (ints)
+    job.setOutputValueClass(IntWritable.class);
+
+    //use the defined mapper
+    job.setMapperClass(MapClass.class);
+    //use the WordCount Reducer
+    job.setCombinerClass(WordCount.Reduce.class);
+    job.setReducerClass(WordCount.Reduce.class);
+
+    job.addInputPath(new Path(args[0]));
+    FileOutputFormat.setOutputPath(job, new Path(args[1]));
+
+    JobClient.runJob(job);
+    
+    return 0;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int ret = ToolRunner.run(new MultiFileWordCount(), args);
+    System.exit(ret);
+  }
+
+}
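
A note on WordOffset above: its hashCode() deliberately trips an assertion
because the example never uses WordOffset as a hash key. If it ever were used
that way, hashCode() would have to be consistent with equals(), which compares
fileName and offset. A minimal sketch, not part of this commit:

    // Hedged sketch: combines the same two fields that equals() and
    // compareTo() already use.
    @Override
    public int hashCode() {
      int result = (fileName == null) ? 0 : fileName.hashCode();
      result = 31 * result + (int) (offset ^ (offset >>> 32));
      return result;
    }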

+ 8 - 7
src/test/org/apache/hadoop/mapred/TestMultiFileInputFormat.java

@@ -22,13 +22,14 @@ import java.util.BitSet;
 import java.util.HashMap;
 import java.util.Random;
 
+import junit.framework.TestCase;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-
-import junit.framework.TestCase;
+import org.apache.hadoop.io.Text;
 
 public class TestMultiFileInputFormat extends TestCase{
 
@@ -46,9 +47,9 @@ public class TestMultiFileInputFormat extends TestCase{
   private HashMap<String, Long> lengths = new HashMap<String, Long>();
   
   /** Dummy class to extend MultiFileInputFormat*/
-  private class DummyMultiFileInputFormat extends MultiFileInputFormat {
+  private class DummyMultiFileInputFormat extends MultiFileInputFormat<Text, Text> {
     @Override
-    public RecordReader getRecordReader(InputSplit split, JobConf job
+    public RecordReader<Text,Text> getRecordReader(InputSplit split, JobConf job
         , Reporter reporter) throws IOException {
       return null;
     }
@@ -89,7 +90,7 @@ public class TestMultiFileInputFormat extends TestCase{
       LOG.info("Number of files increment = " + NUM_FILES_INCR);
     }
     
-    MultiFileInputFormat format = new DummyMultiFileInputFormat();
+    MultiFileInputFormat<Text,Text> format = new DummyMultiFileInputFormat();
     FileSystem fs = FileSystem.getLocal(job);
     
     for(int numFiles = 1; numFiles< MAX_NUM_FILES ; 
@@ -106,7 +107,7 @@ public class TestMultiFileInputFormat extends TestCase{
         for(MultiFileSplit split : splits) {
           long splitLength = 0;
           for(Path p : split.getPaths()) {
-            long length = fs.getContentLength(p);
+            long length = fs.getContentSummary(p).getLength();
             assertEquals(length, lengths.get(p.getName()).longValue());
             splitLength += length;
             String name = p.getName();
@@ -125,7 +126,7 @@ public class TestMultiFileInputFormat extends TestCase{
   }
   
   public void testFormatWithLessPathsThanSplits() throws Exception {
-    MultiFileInputFormat format = new DummyMultiFileInputFormat();
+    MultiFileInputFormat<Text,Text> format = new DummyMultiFileInputFormat();
     FileSystem fs = FileSystem.getLocal(job);     
     
     // Test with no path
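
Among the test changes above, fs.getContentLength(p) becomes
fs.getContentSummary(p).getLength(). For a single file the same value is also
available through FileStatus, which avoids the directory aggregation that
getContentSummary performs; a minimal equivalent sketch, assuming p names a
regular file:

    // Length of a single file via FileStatus; not part of this commit.
    long length = fs.getFileStatus(p).getLen();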