@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.FileInputFormat;
@@ -90,4 +91,21 @@ public class NLineInputFormat extends FileInputFormat<LongWritable, Text>
   public void configure(JobConf conf) {
     N = conf.getInt("mapreduce.input.lineinputformat.linespermap", 1);
   }
+
+  /**
+   * NLineInputFormat uses LineRecordReader, which always reads
+   * (and consumes) at least one character out of its upper split
+   * boundary. So to make sure that each mapper gets N lines, we
+   * move back the upper split limits of each split
+   * by one character here.
+   * @param fileName Path of file
+   * @param begin the position of the first byte in the file to process
+   * @param length number of bytes in InputSplit
+   * @return FileSplit
+   */
+  protected static FileSplit createFileSplit(Path fileName, long begin, long length) {
+    return (begin == 0)
+      ? new FileSplit(fileName, begin, length - 1, new String[] {})
+      : new FileSplit(fileName, begin - 1, length, new String[] {});
+  }
 }
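
For reference, below is a rough sketch (not part of the patch above) of how a getSplits implementation could drive the new createFileSplit helper: it counts N lines at a time with a LineReader, remembers the byte range they cover, and lets createFileSplit apply the one-character boundary shift described in the Javadoc. The loop structure, variable names, and the extra imports it would need (java.io.IOException, org.apache.hadoop.fs.FSDataInputStream, org.apache.hadoop.util.LineReader) are illustrative assumptions, not something this hunk adds.

  // Sketch only: assumes the class-level field N set in configure(), plus
  // imports for IOException, FSDataInputStream, and LineReader.
  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
    ArrayList<FileSplit> splits = new ArrayList<FileSplit>();
    for (FileStatus status : listStatus(job)) {
      Path fileName = status.getPath();
      FSDataInputStream in = fileName.getFileSystem(job).open(fileName);
      LineReader lr = null;
      try {
        lr = new LineReader(in, job);
        Text line = new Text();
        int numLines = 0;
        long begin = 0;
        long length = 0;
        int num;
        // Accumulate N lines per split, tracking the byte range they cover;
        // createFileSplit shifts the boundary back by one character.
        while ((num = lr.readLine(line)) > 0) {
          numLines++;
          length += num;
          if (numLines == N) {
            splits.add(createFileSplit(fileName, begin, length));
            begin += length;
            length = 0;
            numLines = 0;
          }
        }
        // Any trailing lines that did not fill a whole group still get a split.
        if (numLines != 0) {
          splits.add(createFileSplit(fileName, begin, length));
        }
      } finally {
        if (lr != null) {
          lr.close();
        }
      }
    }
    return splits.toArray(new FileSplit[splits.size()]);
  }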