فهرست منبع

HADOOP-5281. Prevent sharing incompatible ZlibCompressor instances between GzipCodec and DefaultCodec.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.20@755346 13f79535-47bb-0310-9956-ffa450edef68
Christopher Douglas 16 سال پیش
والد
کامیت
6b4ebe9a47
3فایلهای تغییر یافته به همراه45 افزوده شده و 12 حذف شده
  1. 3 0
      CHANGES.txt
  2. 26 12
      src/core/org/apache/hadoop/io/compress/GzipCodec.java
  3. 16 0
      src/test/org/apache/hadoop/io/compress/TestCodec.java

+ 3 - 0
CHANGES.txt

@@ -742,6 +742,9 @@ Release 0.20.0 - Unreleased
     HADOOP-5483. Fixes a problem in the Directory Cleanup Thread due to which
     TestMiniMRWithDFS sometimes used to fail. (ddas) 
 
+    HADOOP-5281. Prevent sharing incompatible ZlibCompressor instances between
+    GzipCodec and DefaultCodec. (cdouglas)
+
 Release 0.19.2 - Unreleased
 
   BUG FIXES

+ 26 - 12
src/core/org/apache/hadoop/io/compress/GzipCodec.java

@@ -153,16 +153,15 @@ public class GzipCodec extends DefaultCodec {
   }
 
   public Compressor createCompressor() {
-    return (ZlibFactory.isNativeZlibLoaded(conf)) ?
-               new ZlibCompressor(ZlibCompressor.CompressionLevel.DEFAULT_COMPRESSION,
-                                  ZlibCompressor.CompressionStrategy.DEFAULT_STRATEGY,
-                                  ZlibCompressor.CompressionHeader.GZIP_FORMAT,
-                                  64*1024) :
-               null;
+    return (ZlibFactory.isNativeZlibLoaded(conf))
+      ? new GzipZlibCompressor()
+      : null;
   }
 
   public Class<? extends Compressor> getCompressorType() {
-    return ZlibFactory.getZlibCompressorType(conf);
+    return ZlibFactory.isNativeZlibLoaded(conf)
+      ? GzipZlibCompressor.class
+      : BuiltInZlibDeflater.class;
   }
 
   public CompressionInputStream createInputStream(InputStream in) 
@@ -185,18 +184,33 @@ public class GzipCodec extends DefaultCodec {
   }
 
   public Decompressor createDecompressor() {
-    return (ZlibFactory.isNativeZlibLoaded(conf)) ?
-               new ZlibDecompressor(ZlibDecompressor.CompressionHeader.AUTODETECT_GZIP_ZLIB,
-                                    64*1024) :
-               null;                               
+    return (ZlibFactory.isNativeZlibLoaded(conf))
+      ? new GzipZlibDecompressor()
+      : null;
   }
 
   public Class<? extends Decompressor> getDecompressorType() {
-    return ZlibFactory.getZlibDecompressorType(conf);
+    return ZlibFactory.isNativeZlibLoaded(conf)
+      ? GzipZlibDecompressor.class
+      : BuiltInZlibInflater.class;
   }
 
   public String getDefaultExtension() {
     return ".gz";
   }
 
+  static final class GzipZlibCompressor extends ZlibCompressor {
+    public GzipZlibCompressor() {
+      super(ZlibCompressor.CompressionLevel.DEFAULT_COMPRESSION,
+          ZlibCompressor.CompressionStrategy.DEFAULT_STRATEGY,
+          ZlibCompressor.CompressionHeader.GZIP_FORMAT, 64*1024);
+    }
+  }
+
+  static final class GzipZlibDecompressor extends ZlibDecompressor {
+    public GzipZlibDecompressor() {
+      super(ZlibDecompressor.CompressionHeader.AUTODETECT_GZIP_ZLIB, 64*1024);
+    }
+  }
+
 }

+ 16 - 0
src/test/org/apache/hadoop/io/compress/TestCodec.java

@@ -41,6 +41,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.zlib.ZlibFactory;
 
 public class TestCodec extends TestCase {
 
@@ -129,6 +130,21 @@ public class TestCodec extends TestCase {
     LOG.info("SUCCESS! Completed checking " + count + " records");
   }
 
+  public void testCodecPoolGzipReuse() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setBoolean("hadoop.native.lib", true);
+    if (!ZlibFactory.isNativeZlibLoaded(conf)) {
+      LOG.warn("testCodecPoolGzipReuse skipped: native libs not loaded");
+      return;
+    }
+    GzipCodec gzc = ReflectionUtils.newInstance(GzipCodec.class, conf);
+    DefaultCodec dfc = ReflectionUtils.newInstance(DefaultCodec.class, conf);
+    Compressor c1 = CodecPool.getCompressor(gzc);
+    Compressor c2 = CodecPool.getCompressor(dfc);
+    CodecPool.returnCompressor(c1);
+    CodecPool.returnCompressor(c2);
+    assertTrue("Got mismatched ZlibCompressor", c2 != CodecPool.getCompressor(gzc));
+  }
 
   public void testSequenceFileDefaultCodec() throws IOException, ClassNotFoundException, 
       InstantiationException, IllegalAccessException {