
MAPREDUCE-5948. org.apache.hadoop.mapred.LineRecordReader does not handle multibyte record delimiters well. Contributed by Vinayakumar B, Rushabh Shah, and Akira AJISAKA
(cherry picked from commit 077250d8d7b4b757543a39a6ce8bb6e3be356c6f)

Jason Lowe, 10 years ago
parent commit 3bf44b862b
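For context: the bug this commit fixes occurs when an input split boundary lands inside a multibyte record delimiter. Before this change, the reader of the first split could stop mid-delimiter while the reader of the next split failed to recognize the leftover delimiter bytes, so a record could be dropped or read twice. A minimal sketch of reading custom-delimited records with org.apache.hadoop.util.LineReader (the file name and contents are hypothetical; imports match those used in the tests below):

    // Sketch: read "+++"-delimited records from a local file.
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.getLocal(conf);
    FSDataInputStream in = fs.open(new Path("demo.txt")); // hypothetical: "abc+++def+++ghi"
    LineReader reader = new LineReader(in, conf, "+++".getBytes(Charsets.UTF_8));
    Text record = new Text();
    while (reader.readLine(record) > 0) {
      System.out.println(record); // abc, def, ghi
    }
    reader.close();

If that stream is split at byte offset 4, the boundary falls between the first and second '+' of a delimiter, which is exactly the case the new UncompressedSplitLineReader below is built to handle.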

+ 8 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java

@@ -369,4 +369,12 @@ public class LineReader implements Closeable {
   public int readLine(Text str) throws IOException {
     return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
   }
+
+  protected int getBufferPosn() {
+    return bufferPosn;
+  }
+
+  protected int getBufferSize() {
+    return bufferSize;
+  }
 }
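These two protected accessors expose internal buffer state to subclasses: the configured buffer capacity (bufferSize) and the current read position within the buffer (bufferPosn). A hedged sketch of the bookkeeping they enable; the subclass name is illustrative:

    import java.io.IOException;
    import java.io.InputStream;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.LineReader;

    // Sketch: a subclass measuring how far the read position is from the buffer end.
    public class PositionAwareLineReader extends LineReader {
      public PositionAwareLineReader(InputStream in, Configuration conf)
          throws IOException {
        super(in, conf);
      }

      /** Distance from the current read position to the end of the buffer. */
      int bytesToBufferEnd() {
        return getBufferSize() - getBufferPosn();
      }
    }

UncompressedSplitLineReader below uses this same difference as its unusedBytes term when adjusting the byte count returned by readLine.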

+ 4 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -237,6 +237,10 @@ Release 2.8.0 - UNRELEASED
     MAPREDUCE-6403. Fix typo in the usage of NNBench.
     (Jagadesh Kiran N via aajisaka)
 
+    MAPREDUCE-5948. org.apache.hadoop.mapred.LineRecordReader does not handle
+    multibyte record delimiters well (Vinayakumar B, Rushabh Shah, and Akira
+    AJISAKA via jlowe)
+
 Release 2.7.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 3 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java

@@ -38,6 +38,7 @@ import org.apache.hadoop.io.compress.SplitCompressionInputStream;
 import org.apache.hadoop.io.compress.SplittableCompressionCodec;
 import org.apache.hadoop.mapreduce.lib.input.CompressedSplitLineReader;
 import org.apache.hadoop.mapreduce.lib.input.SplitLineReader;
+import org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.Log;
 
@@ -131,7 +132,8 @@ public class LineRecordReader implements RecordReader<LongWritable, Text> {
       }
     } else {
       fileIn.seek(start);
-      in = new SplitLineReader(fileIn, job, recordDelimiter);
+      in = new UncompressedSplitLineReader(
+          fileIn, job, recordDelimiter, split.getLength());
       filePosition = fileIn;
     }
     // If this is not the first split, we always throw away first record

+ 2 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java

@@ -112,7 +112,8 @@ public class LineRecordReader extends RecordReader<LongWritable, Text> {
       }
     } else {
       fileIn.seek(start);
-      in = new SplitLineReader(fileIn, job, this.recordDelimiterBytes);
+      in = new UncompressedSplitLineReader(
+          fileIn, job, this.recordDelimiterBytes, split.getLength());
       filePosition = fileIn;
     }
     // If this is not the first split, we always throw away first record
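Both the mapred and mapreduce readers make the same substitution: uncompressed input now flows through the split-length-aware reader. A hedged sketch of the equivalent wiring, reusing the names that appear in the diff (fs, job, split, and recordDelimiterBytes are assumed to be in scope):

    // Sketch: how an uncompressed split is now opened (names from the diff).
    FSDataInputStream fileIn = fs.open(split.getPath());
    fileIn.seek(split.getStart());
    SplitLineReader in = new UncompressedSplitLineReader(
        fileIn, job, recordDelimiterBytes, split.getLength());

Passing split.getLength() is the essential difference: the reader can now detect when a read crosses the split boundary, which the plain SplitLineReader could not.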

+ 125 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java

@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.io.Text;
+
+/**
+ * SplitLineReader for uncompressed files.
+ * This class can split the file correctly even if the delimiter is multiple bytes long.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class UncompressedSplitLineReader extends SplitLineReader {
+  private boolean needAdditionalRecord = false;
+  private long splitLength;
+  /** Total bytes read from the input stream. */
+  private long totalBytesRead = 0;
+  private boolean finished = false;
+  private boolean usingCRLF;
+  private int unusedBytes = 0;
+  private int lastBytesRead = 0;
+
+  public UncompressedSplitLineReader(FSDataInputStream in, Configuration conf,
+      byte[] recordDelimiterBytes, long splitLength) throws IOException {
+    super(in, conf, recordDelimiterBytes);
+    this.splitLength = splitLength;
+    usingCRLF = (recordDelimiterBytes == null);
+  }
+
+  @Override
+  protected int fillBuffer(InputStream in, byte[] buffer, boolean inDelimiter)
+      throws IOException {
+    int maxBytesToRead = buffer.length;
+    if (totalBytesRead < splitLength) {
+      maxBytesToRead = Math.min(maxBytesToRead,
+                                (int)(splitLength - totalBytesRead));
+    }
+    int bytesRead = in.read(buffer, 0, maxBytesToRead);
+    lastBytesRead = bytesRead;
+
+    // If the split ended in the middle of a record delimiter then we need
+    // to read one additional record, as the consumer of the next split will
+    // not recognize the partial delimiter as a record.
+    // However if using the default delimiter and the next character is a
+    // linefeed then next split will treat it as a delimiter all by itself
+    // and the additional record read should not be performed.
+    if (totalBytesRead == splitLength && inDelimiter && bytesRead > 0) {
+      if (usingCRLF) {
+        needAdditionalRecord = (buffer[0] != '\n');
+      } else {
+        needAdditionalRecord = true;
+      }
+    }
+    if (bytesRead > 0) {
+      totalBytesRead += bytesRead;
+    }
+    return bytesRead;
+  }
+
+  @Override
+  public int readLine(Text str, int maxLineLength, int maxBytesToConsume)
+      throws IOException {
+    long bytesRead = 0;
+    if (!finished) {
+      // only allow at most one more record to be read after the stream
+      // reports the split ended
+      if (totalBytesRead > splitLength) {
+        finished = true;
+      }
+      bytesRead = totalBytesRead;
+      int bytesConsumed = super.readLine(str, maxLineLength, maxBytesToConsume);
+      bytesRead = totalBytesRead - bytesRead;
+
+      // No records left.
+      if (bytesConsumed == 0 && bytesRead == 0) {
+        return 0;
+      }
+
+      int bufferSize = getBufferSize();
+
+      // Add the remaining buffer size not used for the last call
+      // of fillBuffer method.
+      if (lastBytesRead <= 0) {
+        bytesRead += bufferSize;
+      } else if (bytesRead > 0) {
+        bytesRead += bufferSize - lastBytesRead;
+      }
+
+      // Adjust the size of the buffer not used for this record.
+      // The size is carried over for the next calculation.
+      bytesRead += unusedBytes;
+      unusedBytes = bufferSize - getBufferPosn();
+      bytesRead -= unusedBytes;
+    }
+    return (int) bytesRead;
+  }
+
+  @Override
+  public boolean needAdditionalRecordAfterSplit() {
+    return !finished && needAdditionalRecord;
+  }
+}
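A worked example of the boundary rule, under stated assumptions (local file system, a hypothetical file containing "abc+++def", delimiter "+++", splitLength 4, so the split ends between the first and second '+'):

    // Sketch: exercising UncompressedSplitLineReader at a mid-delimiter boundary.
    Configuration conf = new Configuration();
    FSDataInputStream in = FileSystem.getLocal(conf).open(new Path("demo.txt"));
    UncompressedSplitLineReader reader = new UncompressedSplitLineReader(
        in, conf, "+++".getBytes(Charsets.UTF_8), 4 /* splitLength */);
    Text record = new Text();
    reader.readLine(record, Integer.MAX_VALUE, Integer.MAX_VALUE); // record = "abc"
    // fillBuffer hit the split end while matching "+++", so this reader owns
    // one extra record and the next split's reader will discard its partial
    // first record -- no record is lost or duplicated:
    boolean more = reader.needAdditionalRecordAfterSplit(); // true
    reader.readLine(record, Integer.MAX_VALUE, Integer.MAX_VALUE); // record = "def"
    reader.close();

With the default CRLF delimiter, the extra read is suppressed when the byte just past the boundary is '\n' (the usingCRLF branch), because the next split will treat that lone linefeed as a delimiter by itself. The adjusted value returned by readLine, meanwhile, carries unconsumed buffer space across calls via unusedBytes so that the caller's file-position accounting stays consistent even though fillBuffer may read past the split end.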

+ 72 - 5
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java

@@ -25,13 +25,17 @@ import static org.junit.Assert.assertTrue;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Set;
 
 import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
+import org.apache.commons.io.Charsets;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
@@ -41,6 +45,9 @@ import org.apache.hadoop.io.compress.Decompressor;
 import org.junit.Test;
 
 public class TestLineRecordReader {
+  private static Path workDir = new Path(new Path(System.getProperty(
+      "test.build.data", "target"), "data"), "TestTextInputFormat");
+  private static Path inputDir = new Path(workDir, "input");
 
   private void testSplitRecords(String testFileName, long firstSplitLength)
       throws IOException {
@@ -50,15 +57,27 @@ public class TestLineRecordReader {
     long testFileSize = testFile.length();
     Path testFilePath = new Path(testFile.getAbsolutePath());
     Configuration conf = new Configuration();
+    testSplitRecordsForFile(conf, firstSplitLength, testFileSize, testFilePath);
+  }
+
+  private void testSplitRecordsForFile(Configuration conf,
+      long firstSplitLength, long testFileSize, Path testFilePath)
+      throws IOException {
     conf.setInt(org.apache.hadoop.mapreduce.lib.input.
         LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
-    assertTrue("unexpected test data at " + testFile,
+    assertTrue("unexpected test data at " + testFilePath,
         testFileSize > firstSplitLength);
 
+    String delimiter = conf.get("textinputformat.record.delimiter");
+    byte[] recordDelimiterBytes = null;
+    if (null != delimiter) {
+      recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
+    }
     // read the data without splitting to count the records
     FileSplit split = new FileSplit(testFilePath, 0, testFileSize,
         (String[])null);
-    LineRecordReader reader = new LineRecordReader(conf, split);
+    LineRecordReader reader = new LineRecordReader(conf, split,
+        recordDelimiterBytes);
     LongWritable key = new LongWritable();
     Text value = new Text();
     int numRecordsNoSplits = 0;
@@ -69,7 +88,7 @@ public class TestLineRecordReader {
 
     // count the records in the first split
     split = new FileSplit(testFilePath, 0, firstSplitLength, (String[])null);
-    reader = new LineRecordReader(conf, split);
+    reader = new LineRecordReader(conf, split, recordDelimiterBytes);
     int numRecordsFirstSplit = 0;
     while (reader.next(key,  value)) {
       ++numRecordsFirstSplit;
@@ -79,14 +98,14 @@ public class TestLineRecordReader {
     // count the records in the second split
     split = new FileSplit(testFilePath, firstSplitLength,
         testFileSize - firstSplitLength, (String[])null);
-    reader = new LineRecordReader(conf, split);
+    reader = new LineRecordReader(conf, split, recordDelimiterBytes);
     int numRecordsRemainingSplits = 0;
     while (reader.next(key,  value)) {
       ++numRecordsRemainingSplits;
     }
     reader.close();
 
-    assertEquals("Unexpected number of records in bzip2 compressed split",
+    assertEquals("Unexpected number of records in split",
         numRecordsNoSplits, numRecordsFirstSplit + numRecordsRemainingSplits);
   }
 
@@ -290,4 +309,52 @@ public class TestLineRecordReader {
     }
     assertEquals(10, decompressors.size());
   }
+
+  /**
+   * Writes the input test file.
+   *
+   * @param conf configuration used to obtain the local file system
+   * @param data contents to write to the file
+   * @return Path of the file created
+   * @throws IOException
+   */
+  private Path createInputFile(Configuration conf, String data)
+      throws IOException {
+    FileSystem localFs = FileSystem.getLocal(conf);
+    Path file = new Path(inputDir, "test.txt");
+    Writer writer = new OutputStreamWriter(localFs.create(file));
+    try {
+      writer.write(data);
+    } finally {
+      writer.close();
+    }
+    return file;
+  }
+
+  @Test
+  public void testUncompressedInput() throws Exception {
+    Configuration conf = new Configuration();
+    String inputData = "abc+++def+++ghi+++"
+        + "jkl+++mno+++pqr+++stu+++vw +++xyz";
+    Path inputFile = createInputFile(conf, inputData);
+    conf.set("textinputformat.record.delimiter", "+++");
+    for(int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
+      for(int splitSize = 1; splitSize < inputData.length(); splitSize++) {
+        conf.setInt("io.file.buffer.size", bufferSize);
+        testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
+      }
+    }
+  }
+
+  @Test
+  public void testUncompressedInputContainingCRLF() throws Exception {
+    Configuration conf = new Configuration();
+    String inputData = "a\r\nb\rc\nd\r\n";
+    Path inputFile = createInputFile(conf, inputData);
+    for(int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
+      for(int splitSize = 1; splitSize < inputData.length(); splitSize++) {
+        conf.setInt("io.file.buffer.size", bufferSize);
+        testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
+      }
+    }
+  }
 }
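The two tests above brute-force the fix: the nested loops sweep every combination of io.file.buffer.size and split size up to the input length, forcing the "+++" delimiter (and, in the CRLF variant, the \r\n, bare \r, and bare \n delimiters) to straddle every possible buffer and split boundary. testSplitRecordsForFile then asserts the invariant that matters: the record count with splitting must equal the count without it, for every boundary placement.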

+ 72 - 7
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java

@@ -25,13 +25,17 @@ import static org.junit.Assert.assertTrue;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Set;
 
 import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
+import org.apache.commons.io.Charsets;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.compress.BZip2Codec;
 import org.apache.hadoop.io.compress.CodecPool;
@@ -42,6 +46,9 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.junit.Test;
 
 public class TestLineRecordReader {
+  private static Path workDir = new Path(new Path(System.getProperty(
+      "test.build.data", "target"), "data"), "TestTextInputFormat");
+  private static Path inputDir = new Path(workDir, "input");
 
   private void testSplitRecords(String testFileName, long firstSplitLength)
       throws IOException {
@@ -51,17 +58,28 @@ public class TestLineRecordReader {
     long testFileSize = testFile.length();
     Path testFilePath = new Path(testFile.getAbsolutePath());
     Configuration conf = new Configuration();
+    testSplitRecordsForFile(conf, firstSplitLength, testFileSize, testFilePath);
+  }
+
+  private void testSplitRecordsForFile(Configuration conf,
+      long firstSplitLength, long testFileSize, Path testFilePath)
+      throws IOException {
     conf.setInt(org.apache.hadoop.mapreduce.lib.input.
         LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
-    assertTrue("unexpected test data at " + testFile,
+    assertTrue("unexpected test data at " + testFilePath,
         testFileSize > firstSplitLength);
 
+    String delimiter = conf.get("textinputformat.record.delimiter");
+    byte[] recordDelimiterBytes = null;
+    if (null != delimiter) {
+      recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
+    }
     TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
 
     // read the data without splitting to count the records
     FileSplit split = new FileSplit(testFilePath, 0, testFileSize,
         (String[])null);
-    LineRecordReader reader = new LineRecordReader();
+    LineRecordReader reader = new LineRecordReader(recordDelimiterBytes);
     reader.initialize(split, context);
     int numRecordsNoSplits = 0;
     while (reader.nextKeyValue()) {
@@ -71,7 +89,7 @@ public class TestLineRecordReader {
 
     // count the records in the first split
     split = new FileSplit(testFilePath, 0, firstSplitLength, (String[])null);
-    reader = new LineRecordReader();
+    reader = new LineRecordReader(recordDelimiterBytes);
     reader.initialize(split, context);
     int numRecordsFirstSplit = 0;
     while (reader.nextKeyValue()) {
@@ -82,16 +100,15 @@ public class TestLineRecordReader {
     // count the records in the second split
     split = new FileSplit(testFilePath, firstSplitLength,
         testFileSize - firstSplitLength, (String[])null);
-    reader = new LineRecordReader();
+    reader = new LineRecordReader(recordDelimiterBytes);
     reader.initialize(split, context);
     int numRecordsRemainingSplits = 0;
     while (reader.nextKeyValue()) {
       ++numRecordsRemainingSplits;
     }
     reader.close();
-
-    assertEquals("Unexpected number of records in bzip2 compressed split",
-        numRecordsNoSplits, numRecordsFirstSplit + numRecordsRemainingSplits);
+    assertEquals("Unexpected number of records in split ", numRecordsNoSplits,
+        numRecordsFirstSplit + numRecordsRemainingSplits);
   }
 
   @Test
@@ -276,4 +293,52 @@ public class TestLineRecordReader {
     }
     assertEquals(10, decompressors.size());
   }
+
+  /**
+   * Writes the input test file.
+   *
+   * @param conf configuration used to obtain the local file system
+   * @param data contents to write to the file
+   * @return Path of the file created
+   * @throws IOException
+   */
+  private Path createInputFile(Configuration conf, String data)
+      throws IOException {
+    FileSystem localFs = FileSystem.getLocal(conf);
+    Path file = new Path(inputDir, "test.txt");
+    Writer writer = new OutputStreamWriter(localFs.create(file));
+    try {
+      writer.write(data);
+    } finally {
+      writer.close();
+    }
+    return file;
+  }
+
+  @Test
+  public void testUncompressedInput() throws Exception {
+    Configuration conf = new Configuration();
+    String inputData = "abc+++def+++ghi+++"
+        + "jkl+++mno+++pqr+++stu+++vw +++xyz";
+    Path inputFile = createInputFile(conf, inputData);
+    conf.set("textinputformat.record.delimiter", "+++");
+    for(int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
+      for(int splitSize = 1; splitSize < inputData.length(); splitSize++) {
+        conf.setInt("io.file.buffer.size", bufferSize);
+        testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
+      }
+    }
+  }
+
+  @Test
+  public void testUncompressedInputContainingCRLF() throws Exception {
+    Configuration conf = new Configuration();
+    String inputData = "a\r\nb\rc\nd\r\n";
+    Path inputFile = createInputFile(conf, inputData);
+    for(int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
+      for(int splitSize = 1; splitSize < inputData.length(); splitSize++) {
+        conf.setInt("io.file.buffer.size", bufferSize);
+        testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
+      }
+    }
+  }
 }