Browse Source

HADOOP-15206. BZip2 drops and duplicates records when input split size is small. Contributed by Aki Tanaka

(cherry picked from commit 0898ff42e9e5c53f2fce7ccdeb4e1cd7d0f123b3)
Jason Lowe 7 years ago
parent
commit
75a303b548

+ 3 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -46,6 +46,9 @@ Release 2.7.6 - UNRELEASED
     HADOOP-13508. FsPermission string constructor does not recognize
     sticky bit. (Atul Sikaria via Wei-Chiu Chuang, shv)
 
+    HADOOP-15206. BZip2 drops and duplicates records when input split size
+    is small. (Aki Tanaka via jlowe)
+
 Release 2.7.5 - 2017-12-14
 
   INCOMPATIBLE CHANGES

+ 29 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java

@@ -357,9 +357,29 @@ public class BZip2Codec implements Configurable, SplittableCompressionCodec {
       bufferedIn = new BufferedInputStream(super.in);
       this.startingPos = super.getPos();
       this.readMode = readMode;
+      long numSkipped = 0;
       if (this.startingPos == 0) {
         // We only strip header if it is start of file
         bufferedIn = readStreamHeader();
+      } else if (this.readMode == READ_MODE.BYBLOCK  &&
+          this.startingPos <= HEADER_LEN + SUB_HEADER_LEN) {
+        // When we're in BYBLOCK mode and the start position is >=0
+        // and < HEADER_LEN + SUB_HEADER_LEN, we should skip to after
+        // start of the first bz2 block to avoid duplicated records
+        numSkipped = HEADER_LEN + SUB_HEADER_LEN + 1 - this.startingPos;
+        long skipBytes = numSkipped;
+        while (skipBytes > 0) {
+          long s = bufferedIn.skip(skipBytes);
+          if (s > 0) {
+            skipBytes -= s;
+          } else {
+            if (bufferedIn.read() == -1) {
+              break; // end of the split
+            } else {
+              skipBytes--;
+            }
+          }
+        }
       }
       input = new CBZip2InputStream(bufferedIn, readMode);
       if (this.isHeaderStripped) {
@@ -370,7 +390,15 @@ public class BZip2Codec implements Configurable, SplittableCompressionCodec {
         input.updateReportedByteCount(SUB_HEADER_LEN);
       }
 
-      this.updatePos(false);
+      if (numSkipped > 0) {
+        input.updateReportedByteCount((int) numSkipped);
+      }
+
+      // To avoid dropped records, not advertising a new byte position
+      // when we are in BYBLOCK mode and the start position is 0
+      if (!(this.readMode == READ_MODE.BYBLOCK && this.startingPos == 0)) {
+        this.updatePos(false);
+      }
     }
 
     private BufferedInputStream readStreamHeader() throws IOException {

+ 8 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTextInputFormat.java

@@ -184,6 +184,14 @@ public class TestTextInputFormat {
     // corner case when we have byte alignment and position of stream are same
     verifyPartitions(471507, 218, file, codec, conf);
     verifyPartitions(473608, 110, file, codec, conf);
+
+    // corner case when split size is small and position of stream is before
+    // the first BZip2 block
+    verifyPartitions(100, 20, file, codec, conf);
+    verifyPartitions(100, 25, file, codec, conf);
+    verifyPartitions(100, 30, file, codec, conf);
+    verifyPartitions(100, 50, file, codec, conf);
+    verifyPartitions(100, 100, file, codec, conf);
   }
 
   // Test a corner case when position of stream is right after BZip2 marker