|
@@ -25,11 +25,15 @@ import static org.junit.Assert.assertTrue;
|
|
|
import java.io.File;
|
|
|
import java.io.FileInputStream;
|
|
|
import java.io.IOException;
|
|
|
+import java.io.OutputStreamWriter;
|
|
|
+import java.io.Writer;
|
|
|
import java.net.URL;
|
|
|
import java.util.ArrayList;
|
|
|
|
|
|
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
|
|
|
+import org.apache.commons.io.Charsets;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.mapreduce.TaskAttemptContext;
|
|
|
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
|
@@ -37,6 +41,9 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
|
|
|
import org.junit.Test;
|
|
|
|
|
|
public class TestLineRecordReader {
|
|
|
+ private static Path workDir = new Path(new Path(System.getProperty(
|
|
|
+ "test.build.data", "target"), "data"), "TestTextInputFormat");
|
|
|
+ private static Path inputDir = new Path(workDir, "input");
|
|
|
|
|
|
private void testSplitRecords(String testFileName, long firstSplitLength)
|
|
|
throws IOException {
|
|
@@ -46,17 +53,28 @@ public class TestLineRecordReader {
|
|
|
long testFileSize = testFile.length();
|
|
|
Path testFilePath = new Path(testFile.getAbsolutePath());
|
|
|
Configuration conf = new Configuration();
|
|
|
+ testSplitRecordsForFile(conf, firstSplitLength, testFileSize, testFilePath);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void testSplitRecordsForFile(Configuration conf,
|
|
|
+ long firstSplitLength, long testFileSize, Path testFilePath)
|
|
|
+ throws IOException {
|
|
|
conf.setInt(org.apache.hadoop.mapreduce.lib.input.
|
|
|
LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
|
|
|
- assertTrue("unexpected test data at " + testFile,
|
|
|
+ assertTrue("unexpected test data at " + testFilePath,
|
|
|
testFileSize > firstSplitLength);
|
|
|
|
|
|
+ String delimiter = conf.get("textinputformat.record.delimiter");
|
|
|
+ byte[] recordDelimiterBytes = null;
|
|
|
+ if (null != delimiter) {
|
|
|
+ recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
|
|
|
+ }
|
|
|
TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
|
|
|
|
|
|
// read the data without splitting to count the records
|
|
|
FileSplit split = new FileSplit(testFilePath, 0, testFileSize,
|
|
|
(String[])null);
|
|
|
- LineRecordReader reader = new LineRecordReader();
|
|
|
+ LineRecordReader reader = new LineRecordReader(recordDelimiterBytes);
|
|
|
reader.initialize(split, context);
|
|
|
int numRecordsNoSplits = 0;
|
|
|
while (reader.nextKeyValue()) {
|
|
@@ -66,7 +84,7 @@ public class TestLineRecordReader {
|
|
|
|
|
|
// count the records in the first split
|
|
|
split = new FileSplit(testFilePath, 0, firstSplitLength, (String[])null);
|
|
|
- reader = new LineRecordReader();
|
|
|
+ reader = new LineRecordReader(recordDelimiterBytes);
|
|
|
reader.initialize(split, context);
|
|
|
int numRecordsFirstSplit = 0;
|
|
|
while (reader.nextKeyValue()) {
|
|
@@ -77,16 +95,15 @@ public class TestLineRecordReader {
|
|
|
// count the records in the second split
|
|
|
split = new FileSplit(testFilePath, firstSplitLength,
|
|
|
testFileSize - firstSplitLength, (String[])null);
|
|
|
- reader = new LineRecordReader();
|
|
|
+ reader = new LineRecordReader(recordDelimiterBytes);
|
|
|
reader.initialize(split, context);
|
|
|
int numRecordsRemainingSplits = 0;
|
|
|
while (reader.nextKeyValue()) {
|
|
|
++numRecordsRemainingSplits;
|
|
|
}
|
|
|
reader.close();
|
|
|
-
|
|
|
- assertEquals("Unexpected number of records in bzip2 compressed split",
|
|
|
- numRecordsNoSplits, numRecordsFirstSplit + numRecordsRemainingSplits);
|
|
|
+ assertEquals("Unexpected number of records in split ", numRecordsNoSplits,
|
|
|
+ numRecordsFirstSplit + numRecordsRemainingSplits);
|
|
|
}
|
|
|
|
|
|
@Test
|
|
@@ -231,4 +248,52 @@ public class TestLineRecordReader {
|
|
|
|
|
|
assertTrue("BOM is not skipped", skipBOM);
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Writes the input test file
|
|
|
+ *
|
|
|
+ * @param conf
|
|
|
+ * @return Path of the file created
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ private Path createInputFile(Configuration conf, String data)
|
|
|
+ throws IOException {
|
|
|
+ FileSystem localFs = FileSystem.getLocal(conf);
|
|
|
+ Path file = new Path(inputDir, "test.txt");
|
|
|
+ Writer writer = new OutputStreamWriter(localFs.create(file));
|
|
|
+ try {
|
|
|
+ writer.write(data);
|
|
|
+ } finally {
|
|
|
+ writer.close();
|
|
|
+ }
|
|
|
+ return file;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testUncompressedInput() throws Exception {
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ String inputData = "abc+++def+++ghi+++"
|
|
|
+ + "jkl+++mno+++pqr+++stu+++vw +++xyz";
|
|
|
+ Path inputFile = createInputFile(conf, inputData);
|
|
|
+ conf.set("textinputformat.record.delimiter", "+++");
|
|
|
+ for(int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
|
|
|
+ for(int splitSize = 1; splitSize < inputData.length(); splitSize++) {
|
|
|
+ conf.setInt("io.file.buffer.size", bufferSize);
|
|
|
+ testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testUncompressedInputContainingCRLF() throws Exception {
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ String inputData = "a\r\nb\rc\nd\r\n";
|
|
|
+ Path inputFile = createInputFile(conf, inputData);
|
|
|
+ for(int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
|
|
|
+ for(int splitSize = 1; splitSize < inputData.length(); splitSize++) {
|
|
|
+ conf.setInt("io.file.buffer.size", bufferSize);
|
|
|
+ testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|