Bladeren bron

HADOOP-13270. BZip2CompressionInputStream finds the same compression marker twice in corner case, causing duplicate data blocks. Contributed by Kai Sasaki.

(cherry picked from commit e3ba9ad3f116306910f74645ded91506345b9f6e)
(cherry picked from commit d219550e8bf4c24be68a74f7b8032b6a8b8af0fc)
Akira Ajisaka 9 jaren geleden
bovenliggende
commit
d44873a2a0

+ 4 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -98,6 +98,10 @@ Release 2.7.3 - UNRELEASED
     printAmazonServiceException/printAmazonClientException appear copy & paste
     of AWS examples. (Steve Loughran via cnauroth)
 
+    HADOOP-13270. BZip2CompressionInputStream finds the same compression marker
+    twice in corner case, causing duplicate data blocks.
+    (Kai Sasaki via aajisaka)
+
 Release 2.7.2 - 2016-01-25
 
   INCOMPATIBLE CHANGES

+ 6 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java

@@ -207,7 +207,12 @@ public class BZip2Codec implements Configurable, SplittableCompressionCodec {
     // time stream might start without a leading BZ.
     final long FIRST_BZIP2_BLOCK_MARKER_POSITION =
       CBZip2InputStream.numberOfBytesTillNextMarker(seekableIn);
-    long adjStart = Math.max(0L, start - FIRST_BZIP2_BLOCK_MARKER_POSITION);
+    long adjStart = 0L;
+    if (start != 0) {
+      // Other than the first of file, the marker size is 6 bytes.
+      adjStart = Math.max(0L, start - (FIRST_BZIP2_BLOCK_MARKER_POSITION
+          - (HEADER_LEN + SUB_HEADER_LEN)));
+    }
 
     ((Seekable)seekableIn).seek(adjStart);
     SplitCompressionInputStream in =

+ 57 - 51
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTextInputFormat.java

@@ -175,69 +175,75 @@ public class TestTextInputFormat {
     for (int length = MAX_LENGTH / 2; length < MAX_LENGTH;
         length += random.nextInt(MAX_LENGTH / 4)+1) {
 
-      LOG.info("creating; entries = " + length);
-
-
-      // create a file with length entries
-      Writer writer =
-        new OutputStreamWriter(codec.createOutputStream(localFs.create(file)));
-      try {
-        for (int i = 0; i < length; i++) {
-          writer.write(Integer.toString(i));
-          writer.write("\n");
-        }
-      } finally {
-        writer.close();
+      for (int i = 0; i < 3; i++) {
+        int numSplits = random.nextInt(MAX_LENGTH / 2000) + 1;
+        verifyPartitions(length, numSplits, file, codec, conf);
       }
+    }
 
-      // try splitting the file in a variety of sizes
-      TextInputFormat format = new TextInputFormat();
-      format.configure(conf);
-      LongWritable key = new LongWritable();
-      Text value = new Text();
-      for (int i = 0; i < 3; i++) {
-        int numSplits = random.nextInt(MAX_LENGTH/2000)+1;
-        LOG.info("splitting: requesting = " + numSplits);
-        InputSplit[] splits = format.getSplits(conf, numSplits);
-        LOG.info("splitting: got =        " + splits.length);
+    // corner case when we have byte alignment and position of stream are same
+    verifyPartitions(471507, 218, file, codec, conf);
+    verifyPartitions(473608, 110, file, codec, conf);
+  }
 
+  private void verifyPartitions(int length, int numSplits, Path file,
+      CompressionCodec codec, JobConf conf) throws IOException {
 
+    LOG.info("creating; entries = " + length);
 
-        // check each split
-        BitSet bits = new BitSet(length);
-        for (int j = 0; j < splits.length; j++) {
-          LOG.debug("split["+j+"]= " + splits[j]);
-          RecordReader<LongWritable, Text> reader =
-            format.getRecordReader(splits[j], conf, reporter);
-          try {
-            int counter = 0;
-            while (reader.next(key, value)) {
-              int v = Integer.parseInt(value.toString());
-              LOG.debug("read " + v);
 
-              if (bits.get(v)) {
-                LOG.warn("conflict with " + v +
+    // create a file with length entries
+    Writer writer =
+        new OutputStreamWriter(codec.createOutputStream(localFs.create(file)));
+    try {
+      for (int i = 0; i < length; i++) {
+        writer.write(Integer.toString(i));
+        writer.write("\n");
+      }
+    } finally {
+      writer.close();
+    }
+
+    // try splitting the file in a variety of sizes
+    TextInputFormat format = new TextInputFormat();
+    format.configure(conf);
+    LongWritable key = new LongWritable();
+    Text value = new Text();
+    LOG.info("splitting: requesting = " + numSplits);
+    InputSplit[] splits = format.getSplits(conf, numSplits);
+    LOG.info("splitting: got =        " + splits.length);
+
+
+    // check each split
+    BitSet bits = new BitSet(length);
+    for (int j = 0; j < splits.length; j++) {
+      LOG.debug("split["+j+"]= " + splits[j]);
+      RecordReader<LongWritable, Text> reader =
+              format.getRecordReader(splits[j], conf, Reporter.NULL);
+      try {
+        int counter = 0;
+        while (reader.next(key, value)) {
+          int v = Integer.parseInt(value.toString());
+          LOG.debug("read " + v);
+          if (bits.get(v)) {
+            LOG.warn("conflict with " + v +
                     " in split " + j +
                     " at position "+reader.getPos());
-              }
-              assertFalse("Key in multiple partitions.", bits.get(v));
-              bits.set(v);
-              counter++;
-            }
-            if (counter > 0) {
-              LOG.info("splits["+j+"]="+splits[j]+" count=" + counter);
-            } else {
-              LOG.debug("splits["+j+"]="+splits[j]+" count=" + counter);
-            }
-          } finally {
-            reader.close();
           }
+          assertFalse("Key in multiple partitions.", bits.get(v));
+          bits.set(v);
+          counter++;
         }
-        assertEquals("Some keys in no partition.", length, bits.cardinality());
+        if (counter > 0) {
+          LOG.info("splits["+j+"]="+splits[j]+" count=" + counter);
+        } else {
+          LOG.debug("splits["+j+"]="+splits[j]+" count=" + counter);
+        }
+      } finally {
+        reader.close();
       }
-
     }
-
+    assertEquals("Some keys in no partition.", length, bits.cardinality());
   }
 
   private static LineReader makeStream(String str) throws IOException {