Browse Source

HADOOP-18400. Fix file split duplicating records from a succeeding split when reading BZip2 text files (#4732)

Co-authored-by: Ashutosh Gupta <ashugpt@amazon.com>
Signed-off-by: Akira Ajisaka <aajisaka@apache.org>
(cherry picked from commit 30c36ef25a335bc123fdae90b3366e582ad1b37a)
Ashutosh Gupta 2 years ago
parent
commit
3af155ceeb

+ 24 - 4
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java

@@ -335,6 +335,7 @@ public class BZip2Codec implements Configurable, SplittableCompressionCodec {
     private boolean isSubHeaderStripped = false;
     private boolean isSubHeaderStripped = false;
     private READ_MODE readMode = READ_MODE.CONTINUOUS;
     private READ_MODE readMode = READ_MODE.CONTINUOUS;
     private long startingPos = 0L;
     private long startingPos = 0L;
+    private boolean didInitialRead;
 
 
     // Following state machine handles different states of compressed stream
     // Following state machine handles different states of compressed stream
     // position
     // position
@@ -480,24 +481,42 @@ public class BZip2Codec implements Configurable, SplittableCompressionCodec {
     */
     */
 
 
     public int read(byte[] b, int off, int len) throws IOException {
     public int read(byte[] b, int off, int len) throws IOException {
+      if (b == null) {
+        throw new NullPointerException();
+      }
+      if (off < 0 || len < 0 || len > b.length - off) {
+        throw new IndexOutOfBoundsException();
+      }
+      if (len == 0) {
+        return 0;
+      }
       if (needsReset) {
       if (needsReset) {
         internalReset();
         internalReset();
       }
       }
-
-      int result = 0;
-      result = this.input.read(b, off, len);
+      // When startingPos > 0, the stream should be initialized at the end of
+      // one block (which would correspond to be the start of another block).
+      // Thus, the initial read would technically be reading one byte passed a
+      // BZip2 end of block marker. To be consistent, we should also be
+      // updating the position to be one byte after the end of an block on the
+      // initial read.
+      boolean initializedAtEndOfBlock =
+          !didInitialRead && startingPos > 0 && readMode == READ_MODE.BYBLOCK;
+      int result = initializedAtEndOfBlock
+          ? BZip2Constants.END_OF_BLOCK
+          : this.input.read(b, off, len);
       if (result == BZip2Constants.END_OF_BLOCK) {
       if (result == BZip2Constants.END_OF_BLOCK) {
         this.posSM = POS_ADVERTISEMENT_STATE_MACHINE.ADVERTISE;
         this.posSM = POS_ADVERTISEMENT_STATE_MACHINE.ADVERTISE;
       }
       }
 
 
       if (this.posSM == POS_ADVERTISEMENT_STATE_MACHINE.ADVERTISE) {
       if (this.posSM == POS_ADVERTISEMENT_STATE_MACHINE.ADVERTISE) {
-        result = this.input.read(b, off, off + 1);
+        result = this.input.read(b, off, 1);
         // This is the precise time to update compressed stream position
         // This is the precise time to update compressed stream position
         // to the client of this code.
         // to the client of this code.
         this.updatePos(true);
         this.updatePos(true);
         this.posSM = POS_ADVERTISEMENT_STATE_MACHINE.HOLD;
         this.posSM = POS_ADVERTISEMENT_STATE_MACHINE.HOLD;
       }
       }
 
 
+      didInitialRead = true;
       return result;
       return result;
 
 
     }
     }
@@ -513,6 +532,7 @@ public class BZip2Codec implements Configurable, SplittableCompressionCodec {
         needsReset = false;
         needsReset = false;
         BufferedInputStream bufferedIn = readStreamHeader();
         BufferedInputStream bufferedIn = readStreamHeader();
         input = new CBZip2InputStream(bufferedIn, this.readMode);
         input = new CBZip2InputStream(bufferedIn, this.readMode);
+        didInitialRead = false;
       }
       }
     }    
     }    
     
     

+ 203 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestBZip2Codec.java

@@ -0,0 +1,203 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.hadoop.io.compress;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+
+import org.apache.hadoop.thirdparty.com.google.common.primitives.Bytes;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.SplittableCompressionCodec.READ_MODE;
+import org.apache.hadoop.io.compress.bzip2.BZip2TextFileWriter;
+import org.apache.hadoop.io.compress.bzip2.BZip2Utils;
+
+import static org.apache.hadoop.io.compress.SplittableCompressionCodec.READ_MODE.BYBLOCK;
+import static org.apache.hadoop.io.compress.SplittableCompressionCodec.READ_MODE.CONTINUOUS;
+import static org.apache.hadoop.io.compress.bzip2.BZip2TextFileWriter.BLOCK_SIZE;
+import static org.apache.hadoop.util.Preconditions.checkArgument;
+import static org.assertj.core.api.Assertions.assertThatNullPointerException;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+public final class TestBZip2Codec {
+
+  private static final long HEADER_LEN = 2;
+
+  private Configuration conf;
+  private FileSystem fs;
+  private BZip2Codec codec;
+  private Decompressor decompressor;
+  private Path tempFile;
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new Configuration();
+
+    Path workDir = new Path(System.getProperty("test.build.data", "target"),
+        "data/" + getClass().getSimpleName());
+
+    Path inputDir = new Path(workDir, "input");
+    tempFile = new Path(inputDir, "test.txt.bz2");
+
+    fs = workDir.getFileSystem(conf);
+
+    codec = new BZip2Codec();
+    codec.setConf(new Configuration(/* loadDefaults */ false));
+    decompressor = CodecPool.getDecompressor(codec);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    CodecPool.returnDecompressor(decompressor);
+    fs.delete(tempFile, /* recursive */ false);
+  }
+
+  @Test
+  public void createInputStreamWithStartAndEnd() throws Exception {
+    byte[] data1 = newAlternatingByteArray(BLOCK_SIZE, 'a', 'b');
+    byte[] data2 = newAlternatingByteArray(BLOCK_SIZE, 'c', 'd');
+    byte[] data3 = newAlternatingByteArray(BLOCK_SIZE, 'e', 'f');
+
+    try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) {
+      writer.write(data1);
+      writer.write(data2);
+      writer.write(data3);
+    }
+    long fileSize = fs.getFileStatus(tempFile).getLen();
+
+    List<Long> nextBlockOffsets = BZip2Utils.getNextBlockMarkerOffsets(tempFile, conf);
+    long block2Start = nextBlockOffsets.get(0);
+    long block3Start = nextBlockOffsets.get(1);
+
+    try (SplitCompressionInputStream stream = newCompressionStream(tempFile, 0, fileSize,
+        BYBLOCK)) {
+      assertEquals(0, stream.getPos());
+      assertCasesWhereReadDoesNotAdvanceStream(stream);
+      assertReadingAtPositionZero(stream, data1);
+      assertCasesWhereReadDoesNotAdvanceStream(stream);
+      assertReadingPastEndOfBlock(stream, block2Start, data2);
+      assertReadingPastEndOfBlock(stream, block3Start, data3);
+      assertEquals(-1, stream.read());
+    }
+
+    try (SplitCompressionInputStream stream = newCompressionStream(tempFile, 1, fileSize - 1,
+        BYBLOCK)) {
+      assertEquals(block2Start, stream.getPos());
+      assertCasesWhereReadDoesNotAdvanceStream(stream);
+      assertReadingPastEndOfBlock(stream, block2Start, data2);
+      assertCasesWhereReadDoesNotAdvanceStream(stream);
+      assertReadingPastEndOfBlock(stream, block3Start, data3);
+      assertEquals(-1, stream.read());
+    }
+
+    // With continuous mode, only starting at or after the stream header is
+    // supported.
+    byte[] allData = Bytes.concat(data1, data2, data3);
+    assertReadingWithContinuousMode(tempFile, 0, fileSize, allData);
+    assertReadingWithContinuousMode(tempFile, HEADER_LEN, fileSize - HEADER_LEN, allData);
+  }
+
+  private void assertReadingWithContinuousMode(Path file, long start, long length,
+      byte[] expectedData) throws IOException {
+    try (SplitCompressionInputStream stream = newCompressionStream(file, start, length,
+        CONTINUOUS)) {
+      assertEquals(HEADER_LEN, stream.getPos());
+
+      assertRead(stream, expectedData);
+      assertEquals(-1, stream.read());
+
+      // When specifying CONTINUOUS read mode, the position ends up not being
+      // updated at all.
+      assertEquals(HEADER_LEN, stream.getPos());
+    }
+  }
+
+  private SplitCompressionInputStream newCompressionStream(Path file, long start, long length,
+      READ_MODE readMode) throws IOException {
+    FSDataInputStream rawIn = fs.open(file);
+    rawIn.seek(start);
+    long end = start + length;
+    return codec.createInputStream(rawIn, decompressor, start, end, readMode);
+  }
+
+  private static byte[] newAlternatingByteArray(int size, int... choices) {
+    checkArgument(choices.length > 1);
+    byte[] result = new byte[size];
+    for (int i = 0; i < size; i++) {
+      result[i] = (byte) choices[i % choices.length];
+    }
+    return result;
+  }
+
+  private static void assertCasesWhereReadDoesNotAdvanceStream(SplitCompressionInputStream in)
+      throws IOException {
+    long initialPos = in.getPos();
+
+    assertEquals(0, in.read(new byte[0]));
+
+    assertThatNullPointerException().isThrownBy(() -> in.read(null, 0, 1));
+    assertThatExceptionOfType(IndexOutOfBoundsException.class).isThrownBy(
+        () -> in.read(new byte[5], -1, 2));
+    assertThatExceptionOfType(IndexOutOfBoundsException.class).isThrownBy(
+        () -> in.read(new byte[5], 0, -1));
+    assertThatExceptionOfType(IndexOutOfBoundsException.class).isThrownBy(
+        () -> in.read(new byte[5], 1, 5));
+
+    assertEquals(initialPos, in.getPos());
+  }
+
+  private static void assertReadingAtPositionZero(SplitCompressionInputStream in,
+      byte[] expectedData) throws IOException {
+    byte[] buffer = new byte[expectedData.length];
+    assertEquals(1, in.read(buffer, 0, 1));
+    assertEquals(expectedData[0], buffer[0]);
+    assertEquals(0, in.getPos());
+
+    IOUtils.readFully(in, buffer, 1, expectedData.length - 1);
+    assertArrayEquals(expectedData, buffer);
+    assertEquals(0, in.getPos());
+  }
+
+  private static void assertReadingPastEndOfBlock(SplitCompressionInputStream in,
+      long endOfBlockPos, byte[] expectedData) throws IOException {
+    byte[] buffer = new byte[expectedData.length];
+    assertEquals(1, in.read(buffer));
+    assertEquals(expectedData[0], buffer[0]);
+    assertEquals(endOfBlockPos + 1, in.getPos());
+
+    IOUtils.readFully(in, buffer, 1, expectedData.length - 1);
+    assertArrayEquals(expectedData, buffer);
+    assertEquals(endOfBlockPos + 1, in.getPos());
+  }
+
+  private static void assertRead(InputStream in, byte[] expectedData) throws IOException {
+    byte[] buffer = new byte[expectedData.length];
+    IOUtils.readFully(in, buffer);
+    assertArrayEquals(expectedData, buffer);
+  }
+}

+ 123 - 10
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/BaseTestLineRecordReaderBZip2.java

@@ -19,8 +19,10 @@ package org.apache.hadoop.mapreduce.lib.input;
 
 
 import java.io.IOException;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Arrays;
 import java.util.List;
 import java.util.List;
+import java.util.StringJoiner;
 
 
 import org.junit.After;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Before;
@@ -34,6 +36,7 @@ import org.apache.hadoop.io.compress.bzip2.BZip2Utils;
 
 
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
 import static org.apache.hadoop.io.compress.bzip2.BZip2TextFileWriter.BLOCK_SIZE;
 import static org.apache.hadoop.io.compress.bzip2.BZip2TextFileWriter.BLOCK_SIZE;
+import static org.apache.hadoop.util.Preconditions.checkArgument;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertEquals;
 
 
 public abstract class BaseTestLineRecordReaderBZip2 {
 public abstract class BaseTestLineRecordReaderBZip2 {
@@ -306,6 +309,8 @@ public abstract class BaseTestLineRecordReaderBZip2 {
     countAssert.assertSingleSplit();
     countAssert.assertSingleSplit();
     countAssert.assertSplittingAtBlocks();
     countAssert.assertSplittingAtBlocks();
     countAssert.assertSplittingJustAfterSecondBlockStarts();
     countAssert.assertSplittingJustAfterSecondBlockStarts();
+    countAssert.assertSplittingEachBlockRangeInThreeParts();
+    countAssert.assertSplitsAroundBlockStartOffsets();
   }
   }
 
 
   private class RecordCountAssert {
   private class RecordCountAssert {
@@ -334,16 +339,7 @@ public abstract class BaseTestLineRecordReaderBZip2 {
     }
     }
 
 
     private void assertSplittingAtBlocks() throws IOException {
     private void assertSplittingAtBlocks() throws IOException {
-      for (int i = 0; i < numBlocks; i++) {
-        long start = i == 0 ? 0 : nextBlockOffsets.get(i - 1);
-        long end = i == numBlocks - 1 ? fileSize : nextBlockOffsets.get(i);
-        long length = end - start;
-
-        String message = "At i=" + i;
-        long expectedCount = countsIfSplitAtBlocks[i];
-        assertEquals(
-            message, expectedCount, reader.countRecords(start, length));
-      }
+      assertSplits(getSplitsAtBlocks());
     }
     }
 
 
     private void assertSplittingJustAfterSecondBlockStarts()
     private void assertSplittingJustAfterSecondBlockStarts()
@@ -363,6 +359,123 @@ public abstract class BaseTestLineRecordReaderBZip2 {
           remainingRecords,
           remainingRecords,
           reader.countRecords(firstSplitSize, fileSize - firstSplitSize));
           reader.countRecords(firstSplitSize, fileSize - firstSplitSize));
     }
     }
+
+    private void assertSplittingEachBlockRangeInThreeParts()
+        throws IOException {
+      for (SplitRange splitRange : getSplitsAtBlocks()) {
+        long[] expectedNumRecordsPerPart = new long[] {
+            splitRange.expectedNumRecords, 0, 0
+        };
+        List<SplitRange> parts = splitRange.divide(expectedNumRecordsPerPart);
+        assertSplits(parts);
+      }
+    }
+
+    private void assertSplitsAroundBlockStartOffsets()
+        throws IOException {
+      for (SplitRange split : getSplitsAtBlocks()) {
+        assertSplit(split.withLength(1));
+        if (split.start > 0) {
+          assertSplit(split.moveBy(-2).withLength(3));
+          assertSplit(split.moveBy(-2).withLength(2).withExpectedNumRecords(0));
+          assertSplit(split.moveBy(-1).withLength(2));
+          assertSplit(split.moveBy(-1).withLength(1).withExpectedNumRecords(0));
+        }
+        assertSplit(split.moveBy(1).withLength(1).withExpectedNumRecords(0));
+        assertSplit(split.moveBy(2).withLength(1).withExpectedNumRecords(0));
+      }
+    }
+
+    private List<SplitRange> getSplitsAtBlocks() {
+      List<SplitRange> splits = new ArrayList<>();
+      for (int i = 0; i < numBlocks; i++) {
+        String name = "Block" + i;
+        long start = i == 0 ? 0 : nextBlockOffsets.get(i - 1);
+        long end = i == numBlocks - 1 ? fileSize : nextBlockOffsets.get(i);
+        long length = end - start;
+        long expectedNumRecords = countsIfSplitAtBlocks[i];
+        splits.add(new SplitRange(name, start, length, expectedNumRecords));
+      }
+      return splits;
+    }
+
+    private void assertSplits(Iterable<SplitRange> splitRanges)
+        throws IOException {
+      for (SplitRange splitRange : splitRanges) {
+        assertSplit(splitRange);
+      }
+    }
+
+    private void assertSplit(SplitRange splitRange) throws IOException {
+      String message = splitRange.toString();
+      long actual = reader.countRecords(splitRange.start, splitRange.length);
+      assertEquals(message, splitRange.expectedNumRecords, actual);
+    }
+  }
+
+  private static class SplitRange {
+    final private String name;
+    final private long start;
+    final private long length;
+    final private long expectedNumRecords;
+
+    SplitRange(
+        String name,
+        long start,
+        long length,
+        long expectedNumRecords) {
+      this.name = name;
+      this.start = start;
+      this.length = length;
+      this.expectedNumRecords = expectedNumRecords;
+    }
+
+    @Override
+    public String toString() {
+      return new StringJoiner(", ", SplitRange.class.getSimpleName() + "[", "]")
+          .add("name='" + name + "'")
+          .add("start=" + start)
+          .add("length=" + length)
+          .add("expectedNumRecords=" + expectedNumRecords)
+          .toString();
+    }
+
+    List<SplitRange> divide(long[] expectedNumRecordsPerPart) {
+      int numParts = expectedNumRecordsPerPart.length;
+      checkArgument(numParts > 0);
+
+      long minPartSize = length / numParts;
+      checkArgument(minPartSize > 0);
+      long lastPartExtraSize = length % numParts;
+
+      List<SplitRange> partRanges = new ArrayList<>();
+      long partStart = start;
+      for (int i = 0; i < numParts; i++) {
+        String partName = name + "_Part" + i;
+
+        long extraSize = i == numParts - 1 ? lastPartExtraSize : 0;
+        long partSize = minPartSize + extraSize;
+
+        long partExpectedNumRecords = expectedNumRecordsPerPart[i];
+
+        partRanges.add(new SplitRange(
+            partName, partStart, partSize, partExpectedNumRecords));
+        partStart += partSize;
+      }
+      return partRanges;
+    }
+
+    SplitRange withLength(long newLength) {
+      return new SplitRange(name, start, newLength, expectedNumRecords);
+    }
+
+    SplitRange withExpectedNumRecords(long newExpectedNumRecords) {
+      return new SplitRange(name, start, length, newExpectedNumRecords);
+    }
+
+    SplitRange moveBy(long delta) {
+      return new SplitRange(name, start + delta, length, expectedNumRecords);
+    }
   }
   }
 
 
   private long getFileSize(Path path) throws IOException {
   private long getFileSize(Path path) throws IOException {