瀏覽代碼

HADOOP-19167 Bug Fix: Change of Codec configuration does not work (#6807)

skyskyhu 11 月之前
父節點
當前提交
3c00093cb5

+ 4 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java

@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.ReflectionUtils;
 
@@ -152,6 +153,9 @@ public class CodecPool {
       compressor = codec.createCompressor();
       LOG.info("Got brand-new compressor ["+codec.getDefaultExtension()+"]");
     } else {
+      if (conf == null && codec instanceof Configurable) {
+        conf = ((Configurable)codec).getConf();
+      }
       compressor.reinit(conf);
       if(LOG.isDebugEnabled()) {
         LOG.debug("Got recycled compressor");

+ 35 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java

@@ -22,6 +22,8 @@ import static org.junit.Assert.assertEquals;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.OutputStream;
+import java.lang.reflect.Field;
+import java.util.List;
 import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
@@ -32,7 +34,10 @@ import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.compress.zlib.BuiltInGzipCompressor;
 import org.apache.hadoop.io.compress.zlib.BuiltInGzipDecompressor;
+import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel;
+import org.apache.hadoop.io.compress.zlib.ZlibFactory;
 import org.apache.hadoop.test.LambdaTestUtils;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -86,6 +91,36 @@ public class TestCodecPool {
     }
   }
 
+  @Test(timeout = 10000)
+  public void testCompressorConf() throws Exception {
+    DefaultCodec codec1 = new DefaultCodec();
+    Configuration conf = new Configuration();
+    ZlibFactory.setCompressionLevel(conf, CompressionLevel.TWO);
+    codec1.setConf(conf);
+    Compressor comp1 = CodecPool.getCompressor(codec1);
+    CodecPool.returnCompressor(comp1);
+
+    DefaultCodec codec2 = new DefaultCodec();
+    Configuration conf2 = new Configuration();
+    CompressionLevel newCompressionLevel = CompressionLevel.THREE;
+    ZlibFactory.setCompressionLevel(conf2, newCompressionLevel);
+    codec2.setConf(conf2);
+    Compressor comp2 = CodecPool.getCompressor(codec2);
+    List<Field> fields = ReflectionUtils.getDeclaredFieldsIncludingInherited(comp2.getClass());
+    for (Field field : fields) {
+      if (field.getName().equals("level")) {
+        field.setAccessible(true);
+        Object levelValue = field.get(comp2);
+        if (levelValue instanceof CompressionLevel) {
+          assertEquals(newCompressionLevel, levelValue);
+        } else {
+          assertEquals(3, levelValue);
+        }
+      }
+    }
+    CodecPool.returnCompressor(comp2);
+  }
+
   @Test(timeout = 10000)
   public void testDecompressorPoolCounts() {
     // Get two decompressors and return them