Browse Source

HADOOP-3221. Adds org.apache.hadoop.mapred.lib.NLineInputFormat, which splits files into splits each of N lines. N can be specified by configuration property mapred.line.input.format.linespermap, which defaults to 1. Contributed by Amareshwari Sriramadasu.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@656585 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das 17 years ago
parent
commit
e013d15ab7

+ 5 - 0
CHANGES.txt

@@ -86,6 +86,11 @@ Trunk (unreleased changes)
     be corrupt, retain all copies and mark the block as corrupt.
     be corrupt, retain all copies and mark the block as corrupt.
     (Lohit Vjayarenu via rangadi)
     (Lohit Vjayarenu via rangadi)
 
 
+    HADOOP-3221. Adds org.apache.hadoop.mapred.lib.NLineInputFormat, which 
+    splits files into splits each of N lines. N can be specified by 
+    configuration property "mapred.line.input.format.linespermap", which
+    defaults to 1. (Amareshwari Sriramadasu via ddas) 
+
   IMPROVEMENTS
   IMPROVEMENTS
    
    
     HADOOP-2928. Remove deprecated FileSystem.getContentLength().
     HADOOP-2928. Remove deprecated FileSystem.getContentLength().

+ 7 - 0
conf/hadoop-default.xml

@@ -1057,6 +1057,13 @@ creations/deletions), or "all".</description>
     </description>
     </description>
   </property>
   </property>
 
 
+  <property>
+    <name>mapred.line.input.format.linespermap</name>
+    <value>1</value>
+    <description> Number of lines per split in NLineInputFormat.
+    </description>
+  </property>
+
 <!-- ipc properties -->
 <!-- ipc properties -->
 
 
 <property>
 <property>

+ 124 - 0
src/java/org/apache/hadoop/mapred/lib/NLineInputFormat.java

@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.LineRecordReader;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.LineRecordReader.LineReader;
+
+/**
+ * NLineInputFormat which splits N lines of input as one split.
+ *
+ * In many "pleasantly" parallel applications, each process/mapper 
+ * processes the same input file (s), but with computations are 
+ * controlled by different parameters.(Referred to as "parameter sweeps").
+ * One way to achieve this, is to specify a set of parameters 
+ * (one set per line) as input in a control file 
+ * (which is the input path to the map-reduce application,
+ * where as the input dataset is specified 
+ * via a config variable in JobConf.).
+ * 
+ * The NLineInputFormat can be used in such applications, that splits 
+ * the input file such that by default, one line is fed as
+ * a value to one map task, and key is the offset.
+ * i.e. (k,v) is (LongWritable, Text).
+ * The location hints will span the whole mapred cluster.
+ */
+
+public class NLineInputFormat extends FileInputFormat<LongWritable, Text> 
+                              implements JobConfigurable { 
+  private int N = 1;
+
+  public RecordReader<LongWritable, Text> getRecordReader(
+                                            InputSplit genericSplit,
+                                            JobConf job,
+                                            Reporter reporter) 
+  throws IOException {
+    reporter.setStatus(genericSplit.toString());
+    return new LineRecordReader(job, (FileSplit) genericSplit);
+  }
+
+  /** 
+   * Logically splits the set of input files for the job, splits N lines
+   * of the input as one split.
+   * 
+   * @see org.apache.hadoop.mapred.FileInputFormat#getSplits(JobConf, int)
+   */
+  public InputSplit[] getSplits(JobConf job, int numSplits)
+  throws IOException {
+    ArrayList<FileSplit> splits = new ArrayList<FileSplit>();
+    Path[] files = listPaths(job);
+    for (int i=0; i < files.length; i++) {
+      Path fileName = files[i];
+      FileSystem  fs = fileName.getFileSystem(job);
+      FileStatus status = fs.getFileStatus(fileName);
+      if (status.isDir() || !fs.exists(fileName)) {
+        throw new IOException("Not a file: " + fileName);
+      }
+      LineReader lr = null;
+      try {
+        FSDataInputStream in  = fs.open(fileName);
+        lr = new LineReader(in, job);
+        Text line = new Text();
+        int numLines = 0;
+        long begin = 0;
+        long length = 0;
+        int num = -1;
+        while ((num = lr.readLine(line)) > 0) {
+          numLines++;
+          length += num;
+          if (numLines == N) {
+            splits.add(new FileSplit(fileName, begin, length, new String[]{}));
+            begin += length;
+            length = 0;
+            numLines = 0;
+          }
+        }
+        if (numLines != 0) {
+          splits.add(new FileSplit(fileName, begin, length, new String[]{}));
+        }
+   
+      } finally {
+        if (lr != null) {
+          lr.close();
+        }
+      }
+    }
+    return splits.toArray(new FileSplit[splits.size()]);
+  }
+
+  public void configure(JobConf conf) {
+    N = conf.getInt("mapred.line.input.format.linespermap", 1);
+  }
+}

+ 118 - 0
src/test/org/apache/hadoop/mapred/lib/TestLineInputFormat.java

@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib;
+
+import java.io.*;
+import java.util.*;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapred.*;
+
+public class TestLineInputFormat extends TestCase {
+  private static int MAX_LENGTH = 200;
+  
+  private static JobConf defaultConf = new JobConf();
+  private static FileSystem localFs = null; 
+
+  static {
+    try {
+      localFs = FileSystem.getLocal(defaultConf);
+    } catch (IOException e) {
+      throw new RuntimeException("init failure", e);
+    }
+  }
+
+  private static Path workDir = 
+    new Path(new Path(System.getProperty("test.build.data", "."), "data"),
+             "TestLineInputFormat");
+  
+  public void testFormat() throws Exception {
+    JobConf job = new JobConf();
+    Path file = new Path(workDir, "test.txt");
+
+    int seed = new Random().nextInt();
+    Random random = new Random(seed);
+
+    localFs.delete(workDir, true);
+    FileInputFormat.setInputPaths(job, workDir);
+    int numLinesPerMap = 5;
+    job.setInt("mapred.line.input.format.linespermap", numLinesPerMap);
+
+    // for a variety of lengths
+    for (int length = 0; length < MAX_LENGTH;
+         length += random.nextInt(MAX_LENGTH/10) + 1) {
+      // create a file with length entries
+      Writer writer = new OutputStreamWriter(localFs.create(file));
+      try {
+        for (int i = 0; i < length; i++) {
+          writer.write(Integer.toString(i));
+          writer.write("\n");
+        }
+      } finally {
+        writer.close();
+      }
+      checkFormat(job, numLinesPerMap);
+    }
+  }
+
+  // A reporter that does nothing
+  private static final Reporter voidReporter = Reporter.NULL;
+  
+  void checkFormat(JobConf job, int expectedN) throws IOException{
+    NLineInputFormat format = new NLineInputFormat();
+    format.configure(job);
+    int ignoredNumSplits = 1;
+    InputSplit[] splits = format.getSplits(job, ignoredNumSplits);
+
+    // check all splits except last one
+    int count = 0;
+    for (int j = 0; j < splits.length -1; j++) {
+      assertEquals("There are no split locations", 0,
+                   splits[j].getLocations().length);
+      RecordReader<LongWritable, Text> reader =
+        format.getRecordReader(splits[j], job, voidReporter);
+      Class readerClass = reader.getClass();
+      assertEquals("reader class is LineRecordReader.",
+                   LineRecordReader.class, readerClass);        
+      LongWritable key = reader.createKey();
+      Class keyClass = key.getClass();
+      assertEquals("Key class is LongWritable.", LongWritable.class, keyClass);
+      Text value = reader.createValue();
+      Class valueClass = value.getClass();
+      assertEquals("Value class is Text.", Text.class, valueClass);
+         
+      try {
+        count = 0;
+        while (reader.next(key, value)) {
+          count++;
+        }
+      } finally {
+        reader.close();
+      }
+      assertEquals("number of lines in split is " + expectedN ,
+                   expectedN, count);
+    }
+  }
+  
+  public static void main(String[] args) throws Exception {
+    new TestLineInputFormat().testFormat();
+  }
+}