
MAPREDUCE-5777. Support utf-8 text with BOM (byte order marker) (Zhihai Xu via kasha)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1@1603680 13f79535-47bb-0310-9956-ffa450edef68
Karthik Kambatla 11 years ago
parent
commit
a13e06776b
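
Background for this change: the UTF-8 BOM is the three-byte sequence 0xEF 0xBB 0xBF, which is how U+FEFF is encoded in UTF-8. Before this patch, LineRecordReader passed those bytes through to the first record, so a BOM-prefixed input file produced a first record value that silently started with an invisible U+FEFF character. The short sketch below is illustrative only (it is not part of the patch) and reproduces the symptom with plain Java I/O.

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;

public class BomSymptomDemo {
  public static void main(String[] args) throws IOException {
    // Build an in-memory "file": a UTF-8 BOM followed by one line of text.
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    out.write(new byte[] {(byte) 0xEF, (byte) 0xBB, (byte) 0xBF});
    out.write("key\tvalue\n".getBytes("UTF-8"));

    // Read it back the way a naive line reader would.
    BufferedReader reader = new BufferedReader(new InputStreamReader(
        new ByteArrayInputStream(out.toByteArray()), "UTF-8"));
    String firstLine = reader.readLine();
    reader.close();

    // The BOM survives as an invisible U+FEFF prefix on the first line.
    System.out.println(firstLine.startsWith("\uFEFF"));  // prints "true"
  }
}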

+ 3 - 0
CHANGES.txt

@@ -224,6 +224,9 @@ Release 1.3.0 - unreleased
     MAPREDUCE-5877. Inconsistency between JT/TT for tasks taking a long time to
     launch (Karthik Kambatla via Sandy Ryza)
 
+    MAPREDUCE-5777. Support utf-8 text with BOM (byte order marker) 
+    (Zhihai Xu via kasha)
+
 Release 1.2.2 - unreleased
 
   INCOMPATIBLE CHANGES

+ 1 - 0
build.xml

@@ -929,6 +929,7 @@
     <delete dir="${test.debug.data}"/>
     <mkdir dir="${test.debug.data}"/>
     <copy file="${test.src.dir}/org/apache/hadoop/mapred/testscript.txt" todir="${test.debug.data}"/>
+    <copy file="${test.src.dir}/org/apache/hadoop/mapred/testBOM.txt" todir="${test.cache.data}"/>
     <copy file="${test.src.dir}/org/apache/hadoop/mapred/test.txt" todir="${test.cache.data}"/>
     <copy file="${test.src.dir}/org/apache/hadoop/mapred/test.jar" todir="${test.cache.data}"/>
     <copy file="${test.src.dir}/org/apache/hadoop/mapred/test.zip" todir="${test.cache.data}"/>

+ 44 - 3
src/mapred/org/apache/hadoop/mapred/LineRecordReader.java

@@ -134,6 +134,41 @@ public class LineRecordReader implements RecordReader<LongWritable, Text> {
     return retVal;
   }
 
+  private int skipUtfByteOrderMark(Text value) throws IOException {
+    // Strip the BOM (Byte Order Mark).
+    // Text only supports UTF-8, so we only need to check for the UTF-8 BOM
+    // (0xEF,0xBB,0xBF) at the start of the text stream.
+    int newMaxLineLength = (int) Math.min(3L + (long) maxLineLength,
+        Integer.MAX_VALUE);
+    int newSize = in.readLine(value, newMaxLineLength,
+        Math.max(maxBytesToConsume(pos), newMaxLineLength));
+    // Even though we read 3 extra bytes for the first line, existing
+    // behavior is unchanged (no backwards-incompatibility issue), because
+    // newSize stays less than maxLineLength and the number of bytes
+    // copied into Text is never more than newSize.
+    // If readLine returns a size that is not less than maxLineLength,
+    // the current line is discarded and the next line is read.
+    pos += newSize;
+    int textLength = value.getLength();
+    byte[] textBytes = value.getBytes();
+    if ((textLength >= 3) && (textBytes[0] == (byte)0xEF) &&
+        (textBytes[1] == (byte)0xBB) && (textBytes[2] == (byte)0xBF)) {
+      // Found a UTF-8 BOM; strip it.
+      LOG.info("Found UTF-8 BOM and skipped it");
+      textLength -= 3;
+      newSize -= 3;
+      if (textLength > 0) {
+        // It might be possible to reuse the same buffer and avoid this copy.
+        byte[] result = new byte[textLength + 3];
+        System.arraycopy(textBytes, 0, result, 0, textLength + 3);
+        value.set(result, 3, textLength);
+      } else {
+        value.clear();
+      }
+    }
+    return newSize;
+  }
+
   public LineRecordReader(InputStream in, long offset, long endOffset,
                           int maxLineLength) {
     this.maxLineLength = maxLineLength;
@@ -173,12 +208,18 @@ public class LineRecordReader implements RecordReader<LongWritable, Text> {
     while (getFilePosition() <= end) {
       key.set(pos);
 
-      int newSize = in.readLine(value, maxLineLength,
-          Math.max(maxBytesToConsume(pos), maxLineLength));
+      int newSize = 0;
+      if (pos == 0) {
+        newSize = skipUtfByteOrderMark(value);
+      } else {
+        newSize = in.readLine(value, maxLineLength,
+            Math.max(maxBytesToConsume(pos), maxLineLength));
+        pos += newSize;
+      }
+
       if (newSize == 0) {
         return false;
       }
-      pos += newSize;
       if (newSize < maxLineLength) {
         return true;
       }
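
As a standalone illustration of the stripping step above, the sketch below (not code from the patch) exercises the same buffer handling directly against org.apache.hadoop.io.Text. Note that in the patch, pos is advanced by the full number of bytes read, BOM included, before newSize is reduced by 3, so file offsets stay consistent even though the returned record is three bytes shorter.

import org.apache.hadoop.io.Text;

public class StripBomSketch {
  // Mirrors the buffer handling in skipUtfByteOrderMark(); illustrative only.
  static int stripBom(Text value, int newSize) {
    int textLength = value.getLength();
    byte[] textBytes = value.getBytes();
    if (textLength >= 3 && textBytes[0] == (byte) 0xEF
        && textBytes[1] == (byte) 0xBB && textBytes[2] == (byte) 0xBF) {
      textLength -= 3;
      newSize -= 3;
      if (textLength > 0) {
        byte[] result = new byte[textLength + 3];
        System.arraycopy(textBytes, 0, result, 0, textLength + 3);
        value.set(result, 3, textLength);   // keep everything after the BOM
      } else {
        value.clear();                      // the line was only a BOM
      }
    }
    return newSize;                         // record size with the BOM removed
  }

  public static void main(String[] args) throws Exception {
    Text value = new Text();
    byte[] line = ("\uFEFF" + "hello").getBytes("UTF-8");  // BOM + payload
    value.set(line);
    int newSize = stripBom(value, line.length);
    System.out.println(value);              // prints "hello"
    System.out.println(newSize);            // prints 5
  }
}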

+ 43 - 6
src/mapred/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java

@@ -124,6 +124,41 @@ public class LineRecordReader extends RecordReader<LongWritable, Text> {
     return retVal;
   }
 
+  private int skipUtfByteOrderMark() throws IOException {
+    // Strip the BOM (Byte Order Mark).
+    // Text only supports UTF-8, so we only need to check for the UTF-8 BOM
+    // (0xEF,0xBB,0xBF) at the start of the text stream.
+    int newMaxLineLength = (int) Math.min(3L + (long) maxLineLength,
+        Integer.MAX_VALUE);
+    int newSize = in.readLine(value, newMaxLineLength,
+        Math.max(maxBytesToConsume(pos), newMaxLineLength));
+    // Even though we read 3 extra bytes for the first line, existing
+    // behavior is unchanged (no backwards-incompatibility issue), because
+    // newSize stays less than maxLineLength and the number of bytes
+    // copied into Text is never more than newSize.
+    // If readLine returns a size that is not less than maxLineLength,
+    // the current line is discarded and the next line is read.
+    pos += newSize;
+    int textLength = value.getLength();
+    byte[] textBytes = value.getBytes();
+    if ((textLength >= 3) && (textBytes[0] == (byte)0xEF) &&
+        (textBytes[1] == (byte)0xBB) && (textBytes[2] == (byte)0xBF)) {
+      // Found a UTF-8 BOM; strip it.
+      LOG.info("Found UTF-8 BOM and skipped it");
+      textLength -= 3;
+      newSize -= 3;
+      if (textLength > 0) {
+        // It might be possible to reuse the same buffer and avoid this copy.
+        byte[] result = new byte[textLength + 3];
+        System.arraycopy(textBytes, 0, result, 0, textLength + 3);
+        value.set(result, 3, textLength);
+      } else {
+        value.clear();
+      }
+    }
+    return newSize;
+  }
+
   public boolean nextKeyValue() throws IOException {
     if (key == null) {
       key = new LongWritable();
@@ -136,13 +171,15 @@ public class LineRecordReader extends RecordReader<LongWritable, Text> {
     // We always read one extra line, which lies outside the upper
     // split limit i.e. (end - 1)
     while (getFilePosition() <= end) {
-      newSize = in.readLine(value, maxLineLength,
-          Math.max(maxBytesToConsume(pos), maxLineLength));
-      if (newSize == 0) {
-        break;
+      if (pos == 0) {
+        newSize = skipUtfByteOrderMark();
+      } else {
+        newSize = in.readLine(value, maxLineLength,
+            Math.max(maxBytesToConsume(pos), maxLineLength));
+        pos += newSize;
       }
-      pos += newSize;
-      if (newSize < maxLineLength) {
+
+      if ((newSize == 0) || (newSize < maxLineLength)) {
         break;
       }
 

+ 79 - 0
src/test/org/apache/hadoop/mapred/TestLineRecordReader.java

@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.IOException;
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+
+public class TestLineRecordReader extends TestCase {
+  private static final Log LOG =
+    LogFactory.getLog(TestLineRecordReader.class.getName());
+
+  public void testStripBOM() throws IOException {
+    LOG.info("testStripBOM");
+    // The test data contains a BOM at the start of the file;
+    // confirm that the BOM is skipped by LineRecordReader.
+    Path localCachePath = new Path(System.getProperty("test.cache.data"));
+    Path txtPath = new Path(localCachePath, new Path("testBOM.txt"));
+    String UTF8_BOM = "\uFEFF";
+    LOG.info(txtPath.toString());
+    File testFile = new File(txtPath.toString());
+    long testFileSize = testFile.length();
+    Configuration conf = new Configuration();
+    conf.setInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);
+
+    // read the data and check whether BOM is skipped
+    FileSplit split = new FileSplit(txtPath, 0, testFileSize,
+        (String[])null);
+    LineRecordReader reader = new LineRecordReader(conf, split);
+    LongWritable key = new LongWritable();
+    Text value = new Text();
+    int numRecords = 0;
+    boolean firstLine = true;
+    boolean skipBOM = true;
+    String prevVal = null;
+    while (reader.next(key, value)) {
+      if (firstLine) {
+        firstLine = false;
+        if (value.toString().startsWith(UTF8_BOM)) {
+          skipBOM = false;
+        }
+      } else {
+        assertEquals("not same text", prevVal, value.toString());
+      }
+      prevVal = value.toString();
+      ++numRecords;
+    }
+    reader.close();
+
+    assertTrue("BOM is not skipped", skipBOM);
+  }
+
+  public static void main(String[] args) throws Exception {
+    new TestLineRecordReader().testStripBOM();
+  }
+}
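
A small detail behind the test configuration above: mapred.linerecordreader.maxlength is set to Integer.MAX_VALUE, so skipUtfByteOrderMark() cannot simply compute maxLineLength + 3 in int arithmetic, which would overflow to a negative limit. The sketch below (illustrative only) shows why the widening to long followed by the clamp back to int is needed.

public class OverflowGuardSketch {
  public static void main(String[] args) {
    int maxLineLength = Integer.MAX_VALUE;   // as configured by the tests

    // Naive int arithmetic wraps around to a large negative number.
    int naive = maxLineLength + 3;
    System.out.println(naive);               // prints "-2147483646"

    // The patch widens to long first, then clamps back to int.
    int guarded = (int) Math.min(3L + (long) maxLineLength, Integer.MAX_VALUE);
    System.out.println(guarded);             // prints "2147483647"
  }
}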

+ 2 - 0
src/test/org/apache/hadoop/mapred/testBOM.txt

@@ -0,0 +1,2 @@
+BOM(Byte Order Mark) test file
+BOM(Byte Order Mark) test file
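
The BOM itself is invisible in the plain-text view of testBOM.txt above. A fixture of this shape could be generated with something like the sketch below (illustrative only; the output path is an assumption, and the actual file was added directly by the patch): a UTF-8 BOM followed by two identical lines, which is what lets the tests compare the first record against the second.

import java.io.FileOutputStream;
import java.io.IOException;

public class WriteBomFixture {
  public static void main(String[] args) throws IOException {
    // Hypothetical output path; the committed fixture lives under src/test.
    FileOutputStream out = new FileOutputStream("testBOM.txt");
    try {
      // UTF-8 BOM, then the two identical lines the tests compare.
      out.write(new byte[] {(byte) 0xEF, (byte) 0xBB, (byte) 0xBF});
      out.write("BOM(Byte Order Mark) test file\n".getBytes("UTF-8"));
      out.write("BOM(Byte Order Mark) test file\n".getBytes("UTF-8"));
    } finally {
      out.close();
    }
  }
}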

+ 81 - 0
src/test/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java

@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.File;
+import java.io.IOException;
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+public class TestLineRecordReader extends TestCase {
+  private static final Log LOG =
+    LogFactory.getLog(TestLineRecordReader.class.getName());
+
+  public void testStripBOM() throws IOException {
+    LOG.info("testStripBOM");
+    // The test data contains a BOM at the start of the file;
+    // confirm that the BOM is skipped by LineRecordReader.
+    String UTF8_BOM = "\uFEFF";
+    Path localCachePath = new Path(System.getProperty("test.cache.data"));
+    Path txtPath = new Path(localCachePath, new Path("testBOM.txt"));
+    LOG.info(txtPath.toString());
+    File testFile = new File(txtPath.toString());
+    long testFileSize = testFile.length();
+    Configuration conf = new Configuration();
+    conf.setInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);
+    TaskAttemptContext context = new TaskAttemptContext(conf,
+        new TaskAttemptID());
+
+    // read the data and check whether BOM is skipped
+    FileSplit split = new FileSplit(txtPath, 0, testFileSize,
+        (String[])null);
+    LineRecordReader reader = new LineRecordReader();
+    reader.initialize(split, context);
+    int numRecords = 0;
+    boolean firstLine = true;
+    boolean skipBOM = true;
+    String prevVal = null;
+    while (reader.nextKeyValue()) {
+      if (firstLine) {
+        firstLine = false;
+        if (reader.getCurrentValue().toString().startsWith(UTF8_BOM)) {
+          skipBOM = false;
+        }
+      } else {
+        assertEquals("not same text", prevVal,
+            reader.getCurrentValue().toString());
+      }
+      prevVal = reader.getCurrentValue().toString();
+      ++numRecords;
+    }
+    reader.close();
+
+    assertTrue("BOM is not skipped", skipBOM);
+  }
+
+  public static void main(String[] args) throws Exception {
+    new TestLineRecordReader().testStripBOM();
+  }
+}
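
One closing observation tying the readers and the tests together: the readers check for the BOM at the byte level (0xEF,0xBB,0xBF), while the tests check at the character level ("\uFEFF"). These are the same thing, as the short sketch below (illustrative only) confirms.

import java.util.Arrays;

public class BomEncodingSketch {
  public static void main(String[] args) throws Exception {
    // U+FEFF encoded as UTF-8 is exactly the three bytes the readers check for.
    byte[] bom = "\uFEFF".getBytes("UTF-8");
    byte[] expected = {(byte) 0xEF, (byte) 0xBB, (byte) 0xBF};
    System.out.println(Arrays.equals(bom, expected));  // prints "true"
  }
}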