|
@@ -110,6 +110,43 @@ public class TestLineRecordReader {
|
|
|
numRecordsNoSplits, numRecordsFirstSplit + numRecordsRemainingSplits);
|
|
|
}
|
|
|
|
|
|
+ private void testLargeSplitRecordForFile(Configuration conf,
|
|
|
+ long firstSplitLength, long testFileSize, Path testFilePath)
|
|
|
+ throws IOException {
|
|
|
+ conf.setInt(org.apache.hadoop.mapreduce.lib.input.
|
|
|
+ LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
|
|
|
+ assertTrue("unexpected firstSplitLength:" + firstSplitLength,
|
|
|
+ testFileSize < firstSplitLength);
|
|
|
+ String delimiter = conf.get("textinputformat.record.delimiter");
|
|
|
+ byte[] recordDelimiterBytes = null;
|
|
|
+ if (null != delimiter) {
|
|
|
+ recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
|
|
|
+ }
|
|
|
+ // read the data without splitting to count the records
|
|
|
+ FileSplit split = new FileSplit(testFilePath, 0, testFileSize,
|
|
|
+ (String[])null);
|
|
|
+ LineRecordReader reader = new LineRecordReader(conf, split,
|
|
|
+ recordDelimiterBytes);
|
|
|
+ LongWritable key = new LongWritable();
|
|
|
+ Text value = new Text();
|
|
|
+ int numRecordsNoSplits = 0;
|
|
|
+ while (reader.next(key, value)) {
|
|
|
+ ++numRecordsNoSplits;
|
|
|
+ }
|
|
|
+ reader.close();
|
|
|
+
|
|
|
+ // count the records in the first split
|
|
|
+ split = new FileSplit(testFilePath, 0, firstSplitLength, (String[])null);
|
|
|
+ reader = new LineRecordReader(conf, split, recordDelimiterBytes);
|
|
|
+ int numRecordsFirstSplit = 0;
|
|
|
+ while (reader.next(key, value)) {
|
|
|
+ ++numRecordsFirstSplit;
|
|
|
+ }
|
|
|
+ reader.close();
|
|
|
+ assertEquals("Unexpected number of records in split",
|
|
|
+ numRecordsNoSplits, numRecordsFirstSplit);
|
|
|
+ }
|
|
|
+
|
|
|
@Test
|
|
|
public void testBzip2SplitEndsAtCR() throws IOException {
|
|
|
// the test data contains a carriage-return at the end of the first
|
|
@@ -331,6 +368,22 @@ public class TestLineRecordReader {
|
|
|
return file;
|
|
|
}
|
|
|
|
|
|
+ @Test
|
|
|
+ public void testUncompressedInputWithLargeSplitSize() throws Exception {
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ // single char delimiter
|
|
|
+ String inputData = "abcde +fghij+ klmno+pqrst+uvwxyz";
|
|
|
+ Path inputFile = createInputFile(conf, inputData);
|
|
|
+ conf.set("textinputformat.record.delimiter", "+");
|
|
|
+ // split size over max value of integer
|
|
|
+ long longSplitSize = (long)Integer.MAX_VALUE + 1;
|
|
|
+ for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
|
|
|
+ conf.setInt("io.file.buffer.size", bufferSize);
|
|
|
+ testLargeSplitRecordForFile(conf, longSplitSize, inputData.length(),
|
|
|
+ inputFile);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
@Test
|
|
|
public void testUncompressedInput() throws Exception {
|
|
|
Configuration conf = new Configuration();
|