Browse source code

HADOOP-1204. Rename InputFormatBase to be FileInputFormat.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@527690 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 years ago
parent
commit
caec916d45

+ 4 - 0
CHANGES.txt

@@ -169,6 +169,10 @@ Trunk (unreleased changes)
     key and value when either is null, and to print nothing when both
     are null.  (Runping Qi via cutting)
 
+52. HADOOP-1204.  Rename InputFormatBase to be FileInputFormat, and
+    deprecate InputFormatBase.  Also make LineRecordReader easier to
+    extend.  (Runping Qi via cutting)
+
 
 Release 0.12.3 - 2007-04-06
 
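The changelog entry above captures the whole migration story: existing subclasses of InputFormatBase keep compiling (the old class now simply extends FileInputFormat and is deprecated), but new code should subclass FileInputFormat directly and can lean on the new LineRecordReader(Configuration, FileSplit) constructor. A minimal sketch of a custom format written against the renamed base class, assuming it lives in the org.apache.hadoop.mapred package; the class name MyLineFormat is hypothetical and not part of this patch:

package org.apache.hadoop.mapred;

import java.io.IOException;

// Hypothetical format written against the new base class introduced by HADOOP-1204.
// Before this patch the declaration would have read "extends InputFormatBase".
public class MyLineFormat extends FileInputFormat {

  public RecordReader getRecordReader(InputSplit split, JobConf job,
                                      Reporter reporter) throws IOException {
    reporter.setStatus(split.toString());
    // Delegates to the (Configuration, FileSplit) constructor added to
    // LineRecordReader in this same commit.
    return new LineRecordReader(job, (FileSplit) split);
  }
}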

+ 198 - 0
src/java/org/apache/hadoop/mapred/FileInputFormat.java

@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+
+/** 
+ * A base class for {@link InputFormat}. 
+ * 
+ */
+public abstract class FileInputFormat implements InputFormat {
+
+  public static final Log LOG =
+    LogFactory.getLog("org.apache.hadoop.mapred.FileInputFormat");
+
+  private static final double SPLIT_SLOP = 1.1;   // 10% slop
+
+  private long minSplitSize = 1;
+  private static final PathFilter hiddenFileFilter = new PathFilter(){
+    public boolean accept( Path p ){
+      String name = p.getName(); 
+      return !name.startsWith("_") && !name.startsWith("."); 
+    }
+  }; 
+  protected void setMinSplitSize(long minSplitSize) {
+    this.minSplitSize = minSplitSize;
+  }
+
+  /**
+   * Is the given filename splitable? Usually, true, but if the file is
+   * stream compressed, it will not be.
+   * @param fs the file system that the file is on
+   * @param filename the file name to check
+   * @return is this file splitable?
+   */
+  protected boolean isSplitable(FileSystem fs, Path filename) {
+    return true;
+  }
+  
+  public abstract RecordReader getRecordReader(InputSplit split,
+                                               JobConf job,
+                                               Reporter reporter)
+    throws IOException;
+
+  /** List input directories.
+   * Subclasses may override to, e.g., select only files matching a regular
+   * expression. 
+   * 
+   * @param job the job to list input paths for
+   * @return array of Path objects
+   * @throws IOException if zero items.
+   */
+  protected Path[] listPaths(JobConf job)
+    throws IOException {
+    Path[] dirs = job.getInputPaths();
+    if (dirs.length == 0) {
+      throw new IOException("No input paths specified in job");
+    }
+    List<Path> result = new ArrayList(); 
+    for (Path p: dirs) {
+      FileSystem fs = p.getFileSystem(job); 
+      Path[] matches =
+        fs.listPaths(fs.globPaths(p, hiddenFileFilter),hiddenFileFilter);
+      for (Path match: matches) {
+        result.add(fs.makeQualified(match));
+      }
+    }
+
+    return (Path[])result.toArray(new Path[result.size()]);
+  }
+
+  public void validateInput(JobConf job) throws IOException {
+    Path[] inputDirs = job.getInputPaths();
+    if (inputDirs.length == 0) {
+      throw new IOException("No input paths specified in input"); 
+    }
+    
+    List<IOException> result = new ArrayList();
+    int totalFiles = 0; 
+    for (Path p: inputDirs) {
+      FileSystem fs = p.getFileSystem(job);
+      if (fs.exists(p)) {
+        // make sure all paths are files to avoid exception
+        // while generating splits
+        for (Path subPath : fs.listPaths(p, hiddenFileFilter)) {
+          FileSystem subFS = subPath.getFileSystem(job); 
+          if (!subFS.exists(subPath)) {
+            result.add(new IOException(
+                "Input path does not exist: " + subPath)); 
+          } else {
+            totalFiles++; 
+          }
+        }
+      } else {
+        Path [] paths = fs.globPaths(p, hiddenFileFilter); 
+        if (paths.length == 0) {
+          result.add(
+            new IOException("Input Pattern " + p + " matches 0 files")); 
+        } else {
+          // validate globbed paths 
+          for (Path gPath : paths) {
+            FileSystem gPathFS = gPath.getFileSystem(job); 
+            if (!gPathFS.exists(gPath)) {
+              result.add(
+                new FileNotFoundException(
+                    "Input path doesnt exist : " + gPath)); 
+            }
+          }
+          totalFiles += paths.length ; 
+        }
+      }
+    }
+    if (!result.isEmpty()) {
+      throw new InvalidInputException(result);
+    }
+    // send output to client. 
+    LOG.info("Total input paths to process : " + totalFiles); 
+  }
+
+  /** Splits files returned by {@link #listPaths(JobConf)} when
+   * they're too big.*/ 
+  public InputSplit[] getSplits(JobConf job, int numSplits)
+    throws IOException {
+    Path[] files = listPaths(job);
+    long totalSize = 0;                           // compute total size
+    for (int i = 0; i < files.length; i++) {      // check we have valid files
+      Path file = files[i];
+      FileSystem fs = file.getFileSystem(job);
+      if (fs.isDirectory(file) || !fs.exists(file)) {
+        throw new IOException("Not a file: "+files[i]);
+      }
+      totalSize += fs.getLength(files[i]);
+    }
+
+    long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
+    long minSize = Math.max(job.getLong("mapred.min.split.size", 1),
+                            minSplitSize);
+
+    ArrayList splits = new ArrayList(numSplits);  // generate splits
+    for (int i = 0; i < files.length; i++) {
+      Path file = files[i];
+      FileSystem fs = file.getFileSystem(job);
+      long length = fs.getLength(file);
+      if (isSplitable(fs, file)) { 
+        long blockSize = fs.getBlockSize(file);
+        long splitSize = computeSplitSize(goalSize, minSize, blockSize);
+
+        long bytesRemaining = length;
+        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
+          splits.add(new FileSplit(file, length-bytesRemaining, splitSize,
+                                   job));
+          bytesRemaining -= splitSize;
+        }
+        
+        if (bytesRemaining != 0) {
+          splits.add(new FileSplit(file, length-bytesRemaining, 
+                                   bytesRemaining, job));
+        }
+      } else {
+        if (length != 0) {
+          splits.add(new FileSplit(file, 0, length, job));
+        }
+      }
+    }
+    LOG.debug( "Total # of splits: " + splits.size() );
+    return (FileSplit[])splits.toArray(new FileSplit[splits.size()]);
+  }
+
+  private static long computeSplitSize(long goalSize, long minSize,
+                                       long blockSize) {
+    return Math.max(minSize, Math.min(goalSize, blockSize));
+  }
+}
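getSplits() above sizes each chunk with computeSplitSize(goalSize, minSize, blockSize) = max(minSize, min(goalSize, blockSize)) and only emits a final short split once the remainder drops to 10% over the split size (SPLIT_SLOP = 1.1). A standalone sketch of that arithmetic; the concrete numbers (1 GB of input, 10 requested splits, 64 MB blocks) are illustrative assumptions, not values taken from the patch:

public class SplitSizeDemo {

  // Same formula as FileInputFormat.computeSplitSize().
  static long computeSplitSize(long goalSize, long minSize, long blockSize) {
    return Math.max(minSize, Math.min(goalSize, blockSize));
  }

  public static void main(String[] args) {
    long totalSize = 1024L * 1024 * 1024;     // assume 1 GB of input data
    int  numSplits = 10;                      // hint passed to getSplits()
    long goalSize  = totalSize / numSplits;   // ~102 MB requested per split
    long minSize   = 1;                       // default of mapred.min.split.size
    long blockSize = 64L * 1024 * 1024;       // assumed 64 MB block size

    // The block size wins here: max(1, min(~102 MB, 64 MB)) = 64 MB,
    // so a 1 GB file would be carved into sixteen 64 MB splits.
    System.out.println(computeSplitSize(goalSize, minSize, blockSize));
  }
}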

+ 4 - 175
src/java/org/apache/hadoop/mapred/InputFormatBase.java

@@ -18,181 +18,10 @@
 
 package org.apache.hadoop.mapred;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.commons.logging.*;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-
-/** A base class for {@link InputFormat}. */
-public abstract class InputFormatBase implements InputFormat {
-
-  public static final Log LOG =
-    LogFactory.getLog("org.apache.hadoop.mapred.InputFormatBase");
-
-  private static final double SPLIT_SLOP = 1.1;   // 10% slop
-
-  private long minSplitSize = 1;
-  private static final PathFilter hiddenFileFilter = new PathFilter(){
-    public boolean accept( Path p ){
-      String name = p.getName(); 
-      return !name.startsWith("_") && !name.startsWith("."); 
-    }
-  }; 
-  protected void setMinSplitSize(long minSplitSize) {
-    this.minSplitSize = minSplitSize;
-  }
-
-  /**
-   * Is the given filename splitable? Usually, true, but if the file is
-   * stream compressed, it will not be.
-   * @param fs the file system that the file is on
-   * @param filename the file name to check
-   * @return is this file splitable?
-   */
-  protected boolean isSplitable(FileSystem fs, Path filename) {
-    return true;
-  }
-  
-  public abstract RecordReader getRecordReader(InputSplit split,
-                                               JobConf job,
-                                               Reporter reporter)
-    throws IOException;
-
-  /** List input directories.
-   * Subclasses may override to, e.g., select only files matching a regular
-   * expression. 
-   * 
-   * @param job the job to list input paths for
-   * @return array of Path objects
-   * @throws IOException if zero items.
-   */
-  protected Path[] listPaths(JobConf job)
-    throws IOException {
-    Path[] dirs = job.getInputPaths();
-    if (dirs.length == 0) {
-      throw new IOException("No input paths specified in job");
-    }
-    List<Path> result = new ArrayList(); 
-    for (Path p: dirs) {
-      FileSystem fs = p.getFileSystem(job); 
-      Path[] matches =
-        fs.listPaths(fs.globPaths(p, hiddenFileFilter),hiddenFileFilter);
-      for (Path match: matches) {
-        result.add(fs.makeQualified(match));
-      }
-    }
-
-    return (Path[])result.toArray(new Path[result.size()]);
-  }
-
-  public void validateInput(JobConf job) throws IOException {
-    Path[] inputDirs = job.getInputPaths();
-    if (inputDirs.length == 0) {
-      throw new IOException("No input paths specified in input"); 
-    }
-    
-    List<IOException> result = new ArrayList();
-    int totalFiles = 0; 
-    for (Path p: inputDirs) {
-      FileSystem fs = p.getFileSystem(job);
-      if (fs.exists(p)) {
-        // make sure all paths are files to avoid exception
-        // while generating splits
-        for (Path subPath : fs.listPaths(p, hiddenFileFilter)) {
-          FileSystem subFS = subPath.getFileSystem(job); 
-          if (!subFS.exists(subPath)) {
-            result.add(new IOException(
-                "Input path does not exist: " + subPath)); 
-          } else {
-            totalFiles++; 
-          }
-        }
-      } else {
-        Path [] paths = fs.globPaths(p, hiddenFileFilter); 
-        if (paths.length == 0) {
-          result.add(
-            new IOException("Input Pattern " + p + " matches 0 files")); 
-        } else {
-          // validate globbed paths 
-          for (Path gPath : paths) {
-            FileSystem gPathFS = gPath.getFileSystem(job); 
-            if (!gPathFS.exists(gPath)) {
-              result.add(
-                new FileNotFoundException(
-                    "Input path doesnt exist : " + gPath)); 
-            }
-          }
-          totalFiles += paths.length ; 
-        }
-      }
-    }
-    if (!result.isEmpty()) {
-      throw new InvalidInputException(result);
-    }
-    // send output to client. 
-    LOG.info("Total input paths to process : " + totalFiles); 
-  }
-
-  /** Splits files returned by {@link #listPaths(JobConf)} when
-   * they're too big.*/ 
-  public InputSplit[] getSplits(JobConf job, int numSplits)
-    throws IOException {
-    Path[] files = listPaths(job);
-    long totalSize = 0;                           // compute total size
-    for (int i = 0; i < files.length; i++) {      // check we have valid files
-      Path file = files[i];
-      FileSystem fs = file.getFileSystem(job);
-      if (fs.isDirectory(file) || !fs.exists(file)) {
-        throw new IOException("Not a file: "+files[i]);
-      }
-      totalSize += fs.getLength(files[i]);
-    }
-
-    long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
-    long minSize = Math.max(job.getLong("mapred.min.split.size", 1),
-                            minSplitSize);
-
-    ArrayList splits = new ArrayList(numSplits);  // generate splits
-    for (int i = 0; i < files.length; i++) {
-      Path file = files[i];
-      FileSystem fs = file.getFileSystem(job);
-      long length = fs.getLength(file);
-      if (isSplitable(fs, file)) { 
-        long blockSize = fs.getBlockSize(file);
-        long splitSize = computeSplitSize(goalSize, minSize, blockSize);
-
-        long bytesRemaining = length;
-        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
-          splits.add(new FileSplit(file, length-bytesRemaining, splitSize,
-                                   job));
-          bytesRemaining -= splitSize;
-        }
-        
-        if (bytesRemaining != 0) {
-          splits.add(new FileSplit(file, length-bytesRemaining, 
-                                   bytesRemaining, job));
-        }
-      } else {
-        if (length != 0) {
-          splits.add(new FileSplit(file, 0, length, job));
-        }
-      }
-    }
-    LOG.debug( "Total # of splits: " + splits.size() );
-    return (FileSplit[])splits.toArray(new FileSplit[splits.size()]);
-  }
+/** A base class for {@link InputFormat}. 
+ *  @deprecated replaced by {@link FileInputFormat}
+ */
+public abstract class InputFormatBase extends FileInputFormat {
 
-  private static long computeSplitSize(long goalSize, long minSize,
-                                       long blockSize) {
-    return Math.max(minSize, Math.min(goalSize, blockSize));
-  }
 }
 

+ 39 - 1
src/java/org/apache/hadoop/mapred/LineRecordReader.java

@@ -6,10 +6,16 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
 
 /**
  * Treats keys as offset in file and value as line. 
@@ -17,6 +23,7 @@ import org.apache.hadoop.io.WritableComparable;
  *
  */
 public class LineRecordReader implements RecordReader {
+  private CompressionCodecFactory compressionCodecs = null;
   private long start; 
   private long pos;
   private long end;
@@ -37,6 +44,33 @@ public class LineRecordReader implements RecordReader {
   }
   private TextStuffer bridge = new TextStuffer();
 
+  public LineRecordReader(Configuration job, FileSplit split)
+      throws IOException {
+    long start = split.getStart();
+    long end = start + split.getLength();
+    final Path file = split.getPath();
+    compressionCodecs = new CompressionCodecFactory(job);
+    final CompressionCodec codec = compressionCodecs.getCodec(file);
+
+    // open the file and seek to the start of the split
+    FileSystem fs = FileSystem.get(job);
+    FSDataInputStream fileIn = fs.open(split.getPath());
+    InputStream in = fileIn;
+    if (codec != null) {
+      in = codec.createInputStream(fileIn);
+      end = Long.MAX_VALUE;
+    } else if (start != 0) {
+      fileIn.seek(start - 1);
+      LineRecordReader.readLine(fileIn, null);
+      start = fileIn.getPos();
+    }
+
+    this.in = new BufferedInputStream(in);
+    this.start = start;
+    this.pos = start;
+    this.end = end;
+  }
+  
   public LineRecordReader(InputStream in, long offset, long endOffset) 
     throws IOException{
     this.in = new BufferedInputStream(in);
@@ -62,7 +96,7 @@ public class LineRecordReader implements RecordReader {
 
     ((LongWritable)key).set(pos);           // key is position
     buffer.reset();
-    long bytesRead = readLine(in, buffer);
+    long bytesRead = readLine();
     if (bytesRead == 0) {
       return false;
     }
@@ -71,6 +105,10 @@ public class LineRecordReader implements RecordReader {
     buffer.writeTo(bridge);
     return true;
   }
+  
+  protected long readLine() throws IOException {
+    return LineRecordReader.readLine(in, buffer);
+  }
 
   public static long readLine(InputStream in, 
       OutputStream out) throws IOException {
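The hunks above are what the changelog means by making LineRecordReader easier to extend: the reader gains a (Configuration, FileSplit) constructor that handles seeking and compression codecs itself, and next() now reads through a protected readLine() hook instead of calling the static helper directly. A hedged sketch of a subclass built on that hook; the class name and the line-counting behavior are illustrative only:

package org.apache.hadoop.mapred;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

// Hypothetical reader that layers extra bookkeeping on the new protected hook.
public class CountingLineRecordReader extends LineRecordReader {

  private long linesRead = 0;

  public CountingLineRecordReader(Configuration job, FileSplit split)
      throws IOException {
    super(job, split);                     // constructor added in this commit
  }

  protected long readLine() throws IOException {
    long bytesRead = super.readLine();     // default line-reading behavior
    if (bytesRead > 0) {
      linesRead++;                         // per-line accounting on top
    }
    return bytesRead;
  }

  public long getLinesRead() {
    return linesRead;
  }
}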

+ 1 - 2
src/java/org/apache/hadoop/mapred/SequenceFileInputFormat.java

@@ -20,14 +20,13 @@ package org.apache.hadoop.mapred;
 
 import java.io.IOException;
 
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.MapFile;
 
 /** An {@link InputFormat} for {@link SequenceFile}s. */
-public class SequenceFileInputFormat extends InputFormatBase {
+public class SequenceFileInputFormat extends FileInputFormat {
 
   public SequenceFileInputFormat() {
     setMinSplitSize(SequenceFile.SYNC_INTERVAL);

+ 4 - 26
src/java/org/apache/hadoop/mapred/TextInputFormat.java

@@ -26,7 +26,7 @@ import org.apache.hadoop.io.compress.*;
 /** An {@link InputFormat} for plain text files.  Files are broken into lines.
  * Either linefeed or carriage-return are used to signal end of line.  Keys are
  * the position in the file, and values are the line of text.. */
-public class TextInputFormat extends InputFormatBase implements JobConfigurable {
+public class TextInputFormat extends FileInputFormat implements JobConfigurable {
 
   private CompressionCodecFactory compressionCodecs = null;
   
@@ -37,32 +37,10 @@ public class TextInputFormat extends InputFormatBase implements JobConfigurable
   protected boolean isSplitable(FileSystem fs, Path file) {
     return compressionCodecs.getCodec(file) == null;
   }
-  
-  public RecordReader getRecordReader(InputSplit genericSplit,
-                                      JobConf job, Reporter reporter)
-    throws IOException {
 
+  public RecordReader getRecordReader(InputSplit genericSplit, JobConf job,
+      Reporter reporter) throws IOException {
     reporter.setStatus(genericSplit.toString());
-    FileSplit split = (FileSplit) genericSplit;
-    long start = split.getStart();
-    long end = start + split.getLength();
-    final Path file = split.getPath();
-    final CompressionCodec codec = compressionCodecs.getCodec(file);
-
-    // open the file and seek to the start of the split
-    FileSystem fs = FileSystem.get(job);
-    FSDataInputStream fileIn = fs.open(split.getPath());
-    InputStream in = fileIn;
-    if (codec != null) {
-      in = codec.createInputStream(fileIn);
-      end = Long.MAX_VALUE;
-    } else if (start != 0) {
-      fileIn.seek(start-1);
-      LineRecordReader.readLine(fileIn, null);
-      start = fileIn.getPos();
-    }
-    
-    return new LineRecordReader(in, start, end);
+    return new LineRecordReader(job, (FileSplit) genericSplit);
   }
 }
-
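With the codec and seek logic moved into LineRecordReader, TextInputFormat.getRecordReader() collapses to the two lines shown above. A hedged sketch of how a job of this era would wire the format in through JobConf; the job name and input path are made up, and the methods used (setInputFormat, addInputPath, JobClient.runJob) are assumed to be available in the pre-0.20 API:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;

public class TextJobSetup {
  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf(TextJobSetup.class);
    job.setJobName("line-processing");             // hypothetical job name
    job.setInputFormat(TextInputFormat.class);     // uses the simplified reader above
    job.addInputPath(new Path("/data/logs"));      // hypothetical input directory
    // Mapper, reducer and output path configuration would go here as well.
    JobClient.runJob(job);
  }
}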