
HADOOP-10047. Add a direct-buffer based apis for compression. Contributed by Gopal V.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1543542 13f79535-47bb-0310-9956-ffa450edef68
Arun Murthy committed 11 years ago
commit d9ba056bdb

+ 3 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -387,6 +387,9 @@ Release 2.3.0 - UNRELEASED
 
     HADOOP-9748. Reduce blocking on UGI.ensureInitialized (daryn)
 
+   HADOOP-10047. Add a direct-buffer based apis for compression. (Gopal V
+   via acmurthy)
+
   BUG FIXES
 
     HADOOP-9964. Fix deadlocks in TestHttpServer by synchronize

+ 11 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DefaultCodec.java

@@ -28,11 +28,12 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.zlib.ZlibDecompressor;
 import org.apache.hadoop.io.compress.zlib.ZlibFactory;
 
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
-public class DefaultCodec implements Configurable, CompressionCodec {
+public class DefaultCodec implements Configurable, CompressionCodec, DirectDecompressionCodec {
   private static final Log LOG = LogFactory.getLog(DefaultCodec.class);
   
   Configuration conf;
@@ -103,6 +104,15 @@ public class DefaultCodec implements Configurable, CompressionCodec {
     return ZlibFactory.getZlibDecompressor(conf);
   }
   
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public DirectDecompressor createDirectDecompressor() {
+    return ZlibFactory.getZlibDirectDecompressor(conf);
+  }
+  
+  
   @Override
   public String getDefaultExtension() {
     return ".deflate";

+ 35 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DirectDecompressionCodec.java

@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.compress;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * This class encapsulates a codec which can decompress direct bytebuffers.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface DirectDecompressionCodec extends CompressionCodec {
+  /**
+   * Create a new {@link DirectDecompressor} for use by this {@link DirectDecompressionCodec}.
+   * 
+   * @return a new direct decompressor for use by this codec
+   */
+  DirectDecompressor createDirectDecompressor();
+}
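
Client code that only holds a CompressionCodec reference can probe for this capability with an instanceof check before falling back to the stream-based API. A minimal sketch of that probe (the helper class and method name are illustrative, not part of the patch), keeping in mind that even a DirectDecompressionCodec may return null when its native library is not loaded:

import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DirectDecompressionCodec;
import org.apache.hadoop.io.compress.DirectDecompressor;

public class DirectCodecProbe {
  // Returns a DirectDecompressor if the codec supports the direct-buffer API
  // and its native backing library is loaded; null otherwise.
  public static DirectDecompressor tryCreateDirect(CompressionCodec codec) {
    if (codec instanceof DirectDecompressionCodec) {
      return ((DirectDecompressionCodec) codec).createDirectDecompressor();
    }
    return null;
  }
}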

+ 59 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DirectDecompressor.java

@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.compress;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Specification of a direct ByteBuffer 'de-compressor'. 
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface DirectDecompressor {
+  /*
+   * This exposes a direct interface for record decompression with direct byte
+   * buffers.
+   * 
+   * The decompress() function need not always consume the buffers provided,
+   * it will need to be called multiple times to decompress an entire buffer 
+   * and the object will hold the compression context internally.
+   * 
+   * Codecs such as {@link SnappyCodec} may or may not support partial
+   * decompression of buffers and will need enough space in the destination
+   * buffer to decompress an entire block.
+   * 
+   * The operation is modelled around dst.put(src);
+   * 
+   * The end result will move src.position() by the bytes-read and
+   * dst.position() by the bytes-written. It should not modify the src.limit()
+   * or dst.limit() to maintain consistency of operation between codecs.
+   * 
+   * @param src Source direct {@link ByteBuffer} for reading from. Requires src
+   * != null and src.remaining() > 0
+   * 
+   * @param dst Destination direct {@link ByteBuffer} for storing the results
+   * into. Requires dst != null and dst.remaining() to be > 0
+   * 
+   * @throws IOException if compression fails
+   */
+  public void decompress(ByteBuffer src, ByteBuffer dst) throws IOException;
+}
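
In other words, the call is modelled on dst.put(src): each invocation advances src.position() by the bytes consumed and dst.position() by the bytes produced, and, depending on the codec, may have to be repeated until the input is drained. A minimal sketch of that loop under the documented contract (class and method names are illustrative):

import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.io.compress.DirectDecompressor;

public class DirectDecompressLoop {
  // Feeds 'src' to the decompressor until it is fully consumed or 'dst' stops
  // accepting output. Both buffers are expected to be direct ByteBuffers.
  public static void drain(DirectDecompressor d, ByteBuffer src, ByteBuffer dst)
      throws IOException {
    while (src.hasRemaining() && dst.hasRemaining()) {
      int srcBefore = src.position();
      int dstBefore = dst.position();
      d.decompress(src, dst); // moves both positions, never touches the limits
      if (src.position() == srcBefore && dst.position() == dstBefore) {
        break; // no progress: caller must supply a larger destination buffer
      }
    }
  }
}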

+ 9 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/GzipCodec.java

@@ -25,6 +25,8 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.io.compress.zlib.*;
+import org.apache.hadoop.io.compress.zlib.ZlibDecompressor.ZlibDirectDecompressor;
+
 import static org.apache.hadoop.util.PlatformName.IBM_JAVA;
 
 /**
@@ -218,6 +220,13 @@ public class GzipCodec extends DefaultCodec {
       ? GzipZlibDecompressor.class
       : BuiltInGzipDecompressor.class;
   }
+    
+  @Override
+  public DirectDecompressor createDirectDecompressor() {
+    return ZlibFactory.isNativeZlibLoaded(conf) 
+        ? new ZlibDecompressor.ZlibDirectDecompressor(
+          ZlibDecompressor.CompressionHeader.AUTODETECT_GZIP_ZLIB, 0) : null;
+  }
 
   @Override
   public String getDefaultExtension() {
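
The override above hands back a ZlibDirectDecompressor configured with AUTODETECT_GZIP_ZLIB, so the same instance can inflate either gzip-framed or raw zlib input, and it returns null when native zlib is not loaded. A hedged sketch of the equivalent construction done outside the codec (the wrapper class is illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.DirectDecompressor;
import org.apache.hadoop.io.compress.zlib.ZlibDecompressor;
import org.apache.hadoop.io.compress.zlib.ZlibFactory;

public class GzipDirectDecompressorSketch {
  // Mirrors GzipCodec.createDirectDecompressor(): header auto-detection plus
  // a zero-sized scratch buffer, as GzipCodec passes above, with null when
  // the native zlib library is absent.
  public static DirectDecompressor create(Configuration conf) {
    return ZlibFactory.isNativeZlibLoaded(conf)
        ? new ZlibDecompressor.ZlibDirectDecompressor(
              ZlibDecompressor.CompressionHeader.AUTODETECT_GZIP_ZLIB, 0)
        : null;
  }
}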

+ 10 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/SnappyCodec.java

@@ -26,13 +26,14 @@ import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.compress.snappy.SnappyCompressor;
 import org.apache.hadoop.io.compress.snappy.SnappyDecompressor;
+import org.apache.hadoop.io.compress.snappy.SnappyDecompressor.SnappyDirectDecompressor;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.util.NativeCodeLoader;
 
 /**
  * This class creates snappy compressors/decompressors.
  */
-public class SnappyCodec implements Configurable, CompressionCodec {
+public class SnappyCodec implements Configurable, CompressionCodec, DirectDecompressionCodec {
   Configuration conf;
 
   /**
@@ -203,6 +204,14 @@ public class SnappyCodec implements Configurable, CompressionCodec {
         CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT);
     return new SnappyDecompressor(bufferSize);
   }
+  
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public DirectDecompressor createDirectDecompressor() {
+    return isNativeCodeLoaded() ? new SnappyDirectDecompressor() : null;
+  }
 
   /**
    * Get the default filename extension for this kind of compression.

+ 72 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.java

@@ -25,6 +25,7 @@ import java.nio.ByteBuffer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.DirectDecompressor;
 import org.apache.hadoop.util.NativeCodeLoader;
 
 /**
@@ -282,4 +283,75 @@ public class SnappyDecompressor implements Decompressor {
   private native static void initIDs();
 
   private native int decompressBytesDirect();
+  
+  int decompressDirect(ByteBuffer src, ByteBuffer dst) throws IOException {
+    assert (this instanceof SnappyDirectDecompressor);
+    
+    ByteBuffer presliced = dst;
+    if (dst.position() > 0) {
+      presliced = dst;
+      dst = dst.slice();
+    }
+
+    Buffer originalCompressed = compressedDirectBuf;
+    Buffer originalUncompressed = uncompressedDirectBuf;
+    int originalBufferSize = directBufferSize;
+    compressedDirectBuf = src.slice();
+    compressedDirectBufLen = src.remaining();
+    uncompressedDirectBuf = dst;
+    directBufferSize = dst.remaining();
+    int n = 0;
+    try {
+      n = decompressBytesDirect();
+      presliced.position(presliced.position() + n);
+      // SNAPPY always consumes the whole buffer or throws an exception
+      src.position(src.limit());
+      finished = true;
+    } finally {
+      compressedDirectBuf = originalCompressed;
+      uncompressedDirectBuf = originalUncompressed;
+      compressedDirectBufLen = 0;
+      directBufferSize = originalBufferSize;
+    }
+    return n;
+  }
+  
+  public static class SnappyDirectDecompressor extends SnappyDecompressor implements
+      DirectDecompressor {
+    
+    @Override
+    public boolean finished() {
+      return (endOfInput && super.finished());
+    }
+
+    @Override
+    public void reset() {
+      super.reset();
+      endOfInput = true;
+    }
+
+    private boolean endOfInput;
+
+    @Override
+    public synchronized void decompress(ByteBuffer src, ByteBuffer dst)
+        throws IOException {
+      assert dst.isDirect() : "dst.isDirect()";
+      assert src.isDirect() : "src.isDirect()";
+      assert dst.remaining() > 0 : "dst.remaining() > 0";
+      this.decompressDirect(src, dst);
+      endOfInput = !src.hasRemaining();
+    }
+
+    @Override
+    public synchronized void setDictionary(byte[] b, int off, int len) {
+      throw new UnsupportedOperationException(
+          "byte[] arrays are not supported for DirectDecompressor");
+    }
+
+    @Override
+    public synchronized int decompress(byte[] b, int off, int len) {
+      throw new UnsupportedOperationException(
+          "byte[] arrays are not supported for DirectDecompressor");
+    }
+  }
 }
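
As the comment in decompressDirect notes, the native Snappy path consumes the whole source buffer in a single call (or throws), so the destination must have room for an entire decompressed block up front; there is no partial-output mode. A small usage sketch under that assumption (the size bound is supplied by the caller, not computed here, and the class name is illustrative):

import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.io.compress.snappy.SnappyDecompressor.SnappyDirectDecompressor;

public class SnappyDirectSketch {
  // 'compressed' must be a direct ByteBuffer holding one complete snappy block;
  // 'maxUncompressedSize' is an upper bound on its decompressed length.
  public static ByteBuffer decompressBlock(ByteBuffer compressed, int maxUncompressedSize)
      throws IOException {
    SnappyDirectDecompressor d = new SnappyDirectDecompressor();
    ByteBuffer out = ByteBuffer.allocateDirect(maxUncompressedSize);
    d.decompress(compressed, out); // consumes all of 'compressed' or throws
    out.flip();                    // position 0 .. bytes written, ready to read
    return out;
  }
}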

+ 84 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java

@@ -23,6 +23,7 @@ import java.nio.Buffer;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.DirectDecompressor;
 import org.apache.hadoop.util.NativeCodeLoader;
 
 /**
@@ -106,7 +107,7 @@ public class ZlibDecompressor implements Decompressor {
    */
   public ZlibDecompressor(CompressionHeader header, int directBufferSize) {
     this.header = header;
-    this.directBufferSize = directBufferSize;
+    this.directBufferSize = directBufferSize;    
     compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
     uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
     uncompressedDirectBuf.position(directBufferSize);
@@ -310,4 +311,86 @@ public class ZlibDecompressor implements Decompressor {
   private native static int getRemaining(long strm);
   private native static void reset(long strm);
   private native static void end(long strm);
+    
+  int inflateDirect(ByteBuffer src, ByteBuffer dst) throws IOException {
+    assert (this instanceof ZlibDirectDecompressor);
+    
+    ByteBuffer presliced = dst;
+    if (dst.position() > 0) {
+      presliced = dst;
+      dst = dst.slice();
+    }
+
+    Buffer originalCompressed = compressedDirectBuf;
+    Buffer originalUncompressed = uncompressedDirectBuf;
+    int originalBufferSize = directBufferSize;
+    compressedDirectBuf = src;
+    compressedDirectBufOff = src.position();
+    compressedDirectBufLen = src.remaining();
+    uncompressedDirectBuf = dst;
+    directBufferSize = dst.remaining();
+    int n = 0;
+    try {
+      n = inflateBytesDirect();
+      presliced.position(presliced.position() + n);
+      if (compressedDirectBufLen > 0) {
+        src.position(compressedDirectBufOff);
+      } else {
+        src.position(src.limit());
+      }
+    } finally {
+      compressedDirectBuf = originalCompressed;
+      uncompressedDirectBuf = originalUncompressed;
+      compressedDirectBufOff = 0;
+      compressedDirectBufLen = 0;
+      directBufferSize = originalBufferSize;
+    }
+    return n;
+  }
+  
+  public static class ZlibDirectDecompressor 
+      extends ZlibDecompressor implements DirectDecompressor {
+    public ZlibDirectDecompressor() {
+      super(CompressionHeader.DEFAULT_HEADER, 0);
+    }
+
+    public ZlibDirectDecompressor(CompressionHeader header, int directBufferSize) {
+      super(header, directBufferSize);
+    }
+    
+    @Override
+    public boolean finished() {
+      return (endOfInput && super.finished());
+    }
+    
+    @Override
+    public void reset() {
+      super.reset();
+      endOfInput = true;
+    }
+    
+    private boolean endOfInput;
+
+    @Override
+    public synchronized void decompress(ByteBuffer src, ByteBuffer dst)
+        throws IOException {
+      assert dst.isDirect() : "dst.isDirect()";
+      assert src.isDirect() : "src.isDirect()";
+      assert dst.remaining() > 0 : "dst.remaining() > 0";      
+      this.inflateDirect(src, dst);
+      endOfInput = !src.hasRemaining();
+    }
+
+    @Override
+    public synchronized void setDictionary(byte[] b, int off, int len) {
+      throw new UnsupportedOperationException(
+          "byte[] arrays are not supported for DirectDecompressor");
+    }
+
+    @Override
+    public synchronized int decompress(byte[] b, int off, int len) {
+      throw new UnsupportedOperationException(
+          "byte[] arrays are not supported for DirectDecompressor");
+    }
+  }
 }

+ 12 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibFactory.java

@@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.compress.Compressor;
 import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.DirectDecompressor;
 import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel;
 import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionStrategy;
 import org.apache.hadoop.util.NativeCodeLoader;
@@ -116,6 +117,17 @@ public class ZlibFactory {
     return (isNativeZlibLoaded(conf)) ? 
       new ZlibDecompressor() : new BuiltInZlibInflater(); 
   }
+  
+  /**
+   * Return the appropriate implementation of the zlib direct decompressor. 
+   * 
+   * @param conf configuration
+   * @return the appropriate implementation of the zlib decompressor.
+   */
+  public static DirectDecompressor getZlibDirectDecompressor(Configuration conf) {
+    return (isNativeZlibLoaded(conf)) ? 
+      new ZlibDecompressor.ZlibDirectDecompressor() : null; 
+  }
 
   public static void setCompressionStrategy(Configuration conf,
       CompressionStrategy strategy) {
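
Unlike getZlibDecompressor, which falls back to the pure-Java BuiltInZlibInflater, the direct variant has no Java fallback and simply returns null when native zlib is unavailable, so callers should degrade to the streaming Decompressor path. A minimal sketch of that check (class and method names are illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DirectDecompressor;
import org.apache.hadoop.io.compress.zlib.ZlibFactory;

public class ZlibDirectFallbackSketch {
  public static void chooseDecompressor(Configuration conf) {
    DirectDecompressor direct = ZlibFactory.getZlibDirectDecompressor(conf);
    if (direct != null) {
      // Native zlib is loaded: ByteBuffer-to-ByteBuffer decompression is usable.
    } else {
      // No native zlib: only the byte[]-based streaming API is available.
      Decompressor streaming = ZlibFactory.getZlibDecompressor(conf);
    }
  }
}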

+ 53 - 1
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/snappy/TestSnappyCompressorDecompressor.java

@@ -29,6 +29,7 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.lang.reflect.Array;
+import java.nio.ByteBuffer;
 import java.util.Random;
 
 import org.apache.hadoop.io.DataInputBuffer;
@@ -38,6 +39,7 @@ import org.apache.hadoop.io.compress.BlockDecompressorStream;
 import org.apache.hadoop.io.compress.CompressionInputStream;
 import org.apache.hadoop.io.compress.CompressionOutputStream;
 import org.apache.hadoop.io.compress.SnappyCodec;
+import org.apache.hadoop.io.compress.snappy.SnappyDecompressor.SnappyDirectDecompressor;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -147,7 +149,7 @@ public class TestSnappyCompressorDecompressor {
       fail("testSnappyCompressorCompressAIOBException ex error !!!");
     }
   }
-
+  
   @Test
   public void testSnappyDecompressorCompressAIOBException() {
     try {
@@ -275,6 +277,56 @@ public class TestSnappyCompressorDecompressor {
       fail("testSnappyBlockCompression ex error !!!");
     }
   }
+  
+  private void compressDecompressLoop(int rawDataSize) throws IOException {
+    byte[] rawData = BytesGenerator.get(rawDataSize);    
+    byte[] compressedResult = new byte[rawDataSize+20];
+    int directBufferSize = Math.max(rawDataSize*2, 64*1024);    
+    SnappyCompressor compressor = new SnappyCompressor(directBufferSize);
+    compressor.setInput(rawData, 0, rawDataSize);
+    int compressedSize = compressor.compress(compressedResult, 0, compressedResult.length);
+    SnappyDirectDecompressor decompressor = new SnappyDirectDecompressor();
+   
+    ByteBuffer inBuf = ByteBuffer.allocateDirect(compressedSize);
+    ByteBuffer outBuf = ByteBuffer.allocateDirect(rawDataSize);
+
+    inBuf.put(compressedResult, 0, compressedSize);
+    inBuf.flip();    
+
+    ByteBuffer expected = ByteBuffer.wrap(rawData);
+    
+    outBuf.clear();
+    while(!decompressor.finished()) {
+      decompressor.decompress(inBuf, outBuf);
+      if (outBuf.remaining() == 0) {
+        outBuf.flip();
+        while (outBuf.remaining() > 0) {        
+          assertEquals(expected.get(), outBuf.get());
+        }
+        outBuf.clear();
+      }
+    }
+    outBuf.flip();
+    while (outBuf.remaining() > 0) {        
+      assertEquals(expected.get(), outBuf.get());
+    }
+    outBuf.clear();
+    
+    assertEquals(0, expected.remaining());
+  }
+  
+  @Test
+  public void testSnappyDirectBlockCompression() {
+    int[] size = { 4 * 1024, 64 * 1024, 128 * 1024, 1024 * 1024 };    
+    assumeTrue(SnappyCodec.isNativeCodeLoaded());
+    try {
+      for (int i = 0; i < size.length; i++) {
+        compressDecompressLoop(size[i]);
+      }
+    } catch (IOException ex) {
+      fail("testSnappyDirectBlockCompression ex !!!" + ex);
+    }
+  }
 
   @Test
   public void testSnappyCompressorDecopressorLogicWithCompressionStreams() {

+ 60 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zlib/TestZlibCompressorDecompressor.java

@@ -19,9 +19,13 @@ package org.apache.hadoop.io.compress.zlib;
 
 import static org.junit.Assert.*;
 import static org.junit.Assume.*;
+
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.ByteBuffer;
 import java.util.Random;
+import java.util.zip.DeflaterOutputStream;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
@@ -33,6 +37,8 @@ import org.apache.hadoop.io.compress.DecompressorStream;
 import org.apache.hadoop.io.compress.CompressDecompressTester.CompressionTestStrategy;
 import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel;
 import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionStrategy;
+import org.apache.hadoop.io.compress.zlib.ZlibDecompressor.ZlibDirectDecompressor;
+import org.apache.hadoop.util.NativeCodeLoader;
 import org.junit.Before;
 import org.junit.Test;
 import com.google.common.collect.ImmutableSet;
@@ -150,6 +156,60 @@ public class TestZlibCompressorDecompressor {
     }
   }
   
+  
+  private void compressDecompressLoop(int rawDataSize) throws IOException {
+    byte[] rawData = null;
+    rawData = generate(rawDataSize);
+    ByteArrayOutputStream baos = new ByteArrayOutputStream(rawDataSize+12);
+    DeflaterOutputStream dos = new DeflaterOutputStream(baos);
+    dos.write(rawData);
+    dos.flush();
+    dos.close();
+    byte[] compressedResult = baos.toByteArray();
+    int compressedSize = compressedResult.length;
+    ZlibDirectDecompressor decompressor = new ZlibDirectDecompressor();
+   
+    ByteBuffer inBuf = ByteBuffer.allocateDirect(compressedSize);
+    ByteBuffer outBuf = ByteBuffer.allocateDirect(rawDataSize);
+
+    inBuf.put(compressedResult, 0, compressedSize);
+    inBuf.flip();    
+
+    ByteBuffer expected = ByteBuffer.wrap(rawData);
+    
+    outBuf.clear();
+    while(!decompressor.finished()) {
+      decompressor.decompress(inBuf, outBuf);
+      if (outBuf.remaining() == 0) {
+        outBuf.flip();
+        while (outBuf.remaining() > 0) {        
+          assertEquals(expected.get(), outBuf.get());
+        }
+        outBuf.clear();
+      }
+    }
+    outBuf.flip();
+    while (outBuf.remaining() > 0) {        
+      assertEquals(expected.get(), outBuf.get());
+    }
+    outBuf.clear();
+    
+    assertEquals(0, expected.remaining());
+  }
+
+  @Test
+  public void testZlibDirectCompressDecompress() {
+    int[] size = { 1, 4, 16, 4 * 1024, 64 * 1024, 128 * 1024, 1024 * 1024 };
+    assumeTrue(NativeCodeLoader.isNativeCodeLoaded());
+    try {
+      for (int i = 0; i < size.length; i++) {
+        compressDecompressLoop(size[i]);
+      }
+    } catch (IOException ex) {
+      fail("testZlibDirectCompressDecompress ex !!!" + ex);
+    }
+  }
+  
   @Test
   public void testZlibCompressorDecompressorSetDictionary() {
     Configuration conf = new Configuration();