
HADOOP-7206. Integrate Snappy compression. Contributed by T Jake Luciani.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1137690 13f79535-47bb-0310-9956-ffa450edef68
Thomas White 14 years ago
parent
commit
7e1e4bf50f

+ 2 - 0
common/CHANGES.txt

@@ -47,6 +47,8 @@ Trunk (unreleased changes)
     HADOOP-7379. Add the ability to serialize and deserialize protocol buffers
     in ObjectWritable. (todd)
 
+    HADOOP-7206. Integrate Snappy compression. (T Jake Luciani via tomwhite)
+
   IMPROVEMENTS
 
     HADOOP-7042. Updates to test-patch.sh to include failed test names and

+ 4 - 0
common/ivy.xml

@@ -327,5 +327,9 @@
       name="protobuf-java"
       rev="${protobuf.version}"
       conf="common->default"/>
+    <dependency org="org.xerial.snappy"
+      name="snappy-java"
+      rev="${snappy-java.version}"
+      conf="common->default"/>
   </dependencies>
 </ivy-module>

+ 5 - 0
common/ivy/hadoop-common-template.xml

@@ -155,5 +155,10 @@
       <artifactId>protobuf-java</artifactId>
       <version>2.4.0a</version>
     </dependency>
+    <dependency>
+      <groupId>org.xerial.snappy</groupId>
+      <artifactId>snappy-java</artifactId>
+      <version>1.0.3-rc2</version>
+    </dependency>
   </dependencies>
 </project>

+ 1 - 0
common/ivy/libraries.properties

@@ -74,6 +74,7 @@ rats-lib.version=0.6
 servlet.version=4.0.6
 servlet-api-2.5.version=6.1.14
 servlet-api.version=2.5
+snappy-java.version=1.0.3-rc2
 slf4j-api.version=1.5.11
 slf4j-log4j12.version=1.5.11
 

+ 1 - 1
common/src/java/core-default.xml

@@ -174,7 +174,7 @@
 
 <property>
   <name>io.compression.codecs</name>
-  <value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.DeflateCodec</value>
+  <value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.SnappyCodec,org.apache.hadoop.io.compress.DeflateCodec</value>
   <description>A list of the compression codec classes that can be used 
                for compression/decompression.</description>
 </property>
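
With SnappyCodec added to io.compression.codecs, CompressionCodecFactory can resolve the codec from a file's extension. A minimal sketch (not part of this patch), assuming the native snappy library is loadable at runtime:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class CodecLookupExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    CompressionCodecFactory factory = new CompressionCodecFactory(conf);
    // ".snappy" matches SnappyCodec.getDefaultExtension()
    CompressionCodec codec = factory.getCodec(new Path("data.snappy"));
    System.out.println(codec.getClass().getName());
  }
}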

+ 150 - 0
common/src/java/org/apache/hadoop/io/compress/SnappyCodec.java

@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.compress;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.snappy.SnappyCompressor;
+import org.apache.hadoop.io.compress.snappy.SnappyDecompressor;
+import org.xerial.snappy.Snappy;
+import org.xerial.snappy.SnappyError;
+
+public class SnappyCodec implements Configurable, CompressionCodec {
+  private static final Log logger = LogFactory.getLog(SnappyCodec.class
+      .getName());
+  private static boolean nativeSnappyLoaded = false;
+  private Configuration conf;
+
+  public static final String SNAPPY_BUFFER_SIZE_KEY = "io.compression.codec.snappy.buffersize";
+  public static final int DEFAULT_SNAPPY_BUFFER_SIZE = 256 * 1024;
+
+  public SnappyCodec() {
+
+  }
+
+  public SnappyCodec(Configuration conf) {
+    setConf(conf);
+  }
+
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  public Configuration getConf() {
+    return conf;
+  }
+
+  static {
+    try {
+      if (Snappy.getNativeLibraryVersion() != null) {
+        logger
+            .info("Successfully loaded & initialized native-snappy library [snappy-java rev "
+                + Snappy.getNativeLibraryVersion() + "]");
+
+        nativeSnappyLoaded = true;
+      } else {
+        logger.info("Failed to load native-snappy library");
+      }
+
+    } catch (SnappyError e) {
+      logger.error("Native Snappy load error: ", e);
+    }
+  }
+
+  public static boolean isNativeSnappyLoaded(Configuration conf) {
+    return nativeSnappyLoaded;
+  }
+
+  public CompressionOutputStream createOutputStream(OutputStream out)
+      throws IOException {
+    return createOutputStream(out, createCompressor());
+  }
+
+  public CompressionOutputStream createOutputStream(OutputStream out,
+      Compressor compressor) throws IOException {
+
+    if (!isNativeSnappyLoaded(conf)) {
+      throw new RuntimeException("native-snappy library not available");
+    }
+
+    int bufferSize = conf.getInt(SNAPPY_BUFFER_SIZE_KEY,
+        DEFAULT_SNAPPY_BUFFER_SIZE);
+
+    int compressionOverhead = Snappy.maxCompressedLength(bufferSize) - bufferSize; 
+
+    return new BlockCompressorStream(out, compressor, bufferSize,
+        compressionOverhead);
+  }
+
+  public Class<? extends Compressor> getCompressorType() {
+    if (!isNativeSnappyLoaded(conf)) {
+      throw new RuntimeException("native-snappy library not available");
+    }
+    return SnappyCompressor.class;
+  }
+
+  public Compressor createCompressor() {
+    if (!isNativeSnappyLoaded(conf)) {
+      throw new RuntimeException("native-snappy library not available");
+    }
+
+    return new SnappyCompressor(conf.getInt(SNAPPY_BUFFER_SIZE_KEY,
+        DEFAULT_SNAPPY_BUFFER_SIZE));
+  }
+
+  public CompressionInputStream createInputStream(InputStream in)
+      throws IOException {
+    return createInputStream(in, createDecompressor());
+  }
+
+  public CompressionInputStream createInputStream(InputStream in,
+      Decompressor decompressor) throws IOException {
+    if (!isNativeSnappyLoaded(conf)) {
+      throw new RuntimeException("native-snappy library not available");
+    }
+    return new BlockDecompressorStream(in, decompressor, conf.getInt(
+        SNAPPY_BUFFER_SIZE_KEY, DEFAULT_SNAPPY_BUFFER_SIZE));
+  }
+
+  public Class<? extends Decompressor> getDecompressorType() {
+    if (!isNativeSnappyLoaded(conf)) {
+      throw new RuntimeException("native-snappy library not available");
+    }
+    return SnappyDecompressor.class;
+  }
+
+  public Decompressor createDecompressor() {
+    if (!isNativeSnappyLoaded(conf)) {
+      throw new RuntimeException("native-snappy library not available");
+    }
+
+    return new SnappyDecompressor(conf.getInt(SNAPPY_BUFFER_SIZE_KEY,
+        DEFAULT_SNAPPY_BUFFER_SIZE));
+  }
+
+  public String getDefaultExtension() {
+    return ".snappy";
+  }
+}
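
A hedged usage sketch (not from the patch) that round-trips bytes through the codec's block streams; the internal block size can be tuned via io.compression.codec.snappy.buffersize (default 256 KB):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.SnappyCodec;

public class SnappyRoundTrip {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    SnappyCodec codec = new SnappyCodec(conf);

    byte[] input = "hello snappy".getBytes("UTF-8");

    // compress into an in-memory buffer
    ByteArrayOutputStream compressed = new ByteArrayOutputStream();
    CompressionOutputStream out = codec.createOutputStream(compressed);
    out.write(input);
    out.close(); // flushes and finishes the final block

    // decompress it back
    CompressionInputStream in = codec.createInputStream(
        new ByteArrayInputStream(compressed.toByteArray()));
    ByteArrayOutputStream restored = new ByteArrayOutputStream();
    IOUtils.copyBytes(in, restored, 4096, true);
  }
}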

+ 177 - 0
common/src/java/org/apache/hadoop/io/compress/snappy/SnappyCompressor.java

@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.compress.snappy;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.Compressor;
+import org.xerial.snappy.Snappy;
+import org.xerial.snappy.SnappyException;
+
+public class SnappyCompressor implements Compressor {
+  private static final Log logger = LogFactory.getLog(SnappyCompressor.class
+      .getName());
+
+  private boolean finish, finished;
+  private ByteBuffer outBuf;
+  private ByteBuffer compressedBuf;
+
+  private long bytesRead = 0L;
+  private long bytesWritten = 0L;
+
+  public SnappyCompressor(int bufferSize) {
+    outBuf = ByteBuffer.allocateDirect(bufferSize);
+    compressedBuf = ByteBuffer.allocateDirect(Snappy
+        .maxCompressedLength(bufferSize));
+
+    reset();
+  }
+
+  public synchronized void setInput(byte[] b, int off, int len) {
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || off > b.length - len) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+    finished = false;
+
+    outBuf.put(b, off, len);
+
+    bytesRead += len;
+  }
+
+  public synchronized void setDictionary(byte[] b, int off, int len) {
+    // do nothing
+  }
+
+  public synchronized boolean needsInput() {
+  // input is needed only once all previously compressed data has been consumed
+    if (compressedBuf.position() > 0
+        && compressedBuf.limit() > compressedBuf.position())
+      return false;
+
+    return true;
+  }
+
+  public synchronized void finish() {
+    finish = true;
+  }
+
+  public synchronized boolean finished() {
+    // Check if all compressed data has been consumed
+    return (finish && finished);
+  }
+
+  public synchronized int compress(byte[] b, int off, int len)
+      throws IOException {
+
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || off > b.length - len) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+
+    if (finished || outBuf.position() == 0) {
+      finished = true;
+      return 0;
+    }
+
+    // Only need to do this once
+    if (compressedBuf.position() == 0) {
+      try {
+        outBuf.limit(outBuf.position());
+        outBuf.rewind();
+
+        int lim = Snappy.compress(outBuf, compressedBuf);
+
+        compressedBuf.limit(lim);
+        compressedBuf.rewind();
+      } catch (SnappyException e) {
+        throw new IOException(e);
+      }
+    }
+
+    int n = (compressedBuf.limit() - compressedBuf.position()) > len ? len
+        : (compressedBuf.limit() - compressedBuf.position());
+
+    if (n == 0) {
+      finished = true;
+      return 0;
+    }
+
+    compressedBuf.get(b, off, n);
+
+    bytesWritten += n;
+
+    // Set 'finished' if snappy has consumed all user-data
+    if (compressedBuf.position() == compressedBuf.limit()) {
+      finished = true;
+
+      outBuf.limit(outBuf.capacity());
+      outBuf.rewind();
+
+      compressedBuf.limit(compressedBuf.capacity());
+      compressedBuf.rewind();
+
+    }
+
+    return n;
+  }
+
+  public synchronized void reset() {
+    finish = false;
+    finished = false;
+
+    outBuf.limit(outBuf.capacity());
+    outBuf.rewind();
+
+    compressedBuf.limit(compressedBuf.capacity());
+    compressedBuf.rewind();
+
+    bytesRead = bytesWritten = 0L;
+  }
+
+  public synchronized void reinit(Configuration conf) {
+    reset();
+  }
+
+  /**
+   * Return number of bytes given to this compressor since last reset.
+   */
+  public synchronized long getBytesRead() {
+    return bytesRead;
+  }
+
+  /**
+   * Return number of bytes consumed by callers of compress since last reset.
+   */
+  public synchronized long getBytesWritten() {
+    return bytesWritten;
+  }
+
+  public synchronized void end() {
+  }
+
+}
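
In practice the compressor is driven by BlockCompressorStream, but its contract can be exercised directly: feed one block with setInput(), call finish(), then drain compress() until finished(). An illustrative sketch under that assumption (not part of the patch):

import java.io.IOException;
import org.apache.hadoop.io.compress.snappy.SnappyCompressor;

public class CompressorLoop {
  public static byte[] compressBlock(byte[] data) throws IOException {
    SnappyCompressor compressor = new SnappyCompressor(256 * 1024);
    compressor.setInput(data, 0, data.length);
    compressor.finish();

    // generous scratch buffer; snappy's worst case is well under 2x + 64
    byte[] out = new byte[data.length * 2 + 64];
    int total = 0;
    while (!compressor.finished()) {
      total += compressor.compress(out, total, out.length - total);
    }
    byte[] result = new byte[total];
    System.arraycopy(out, 0, result, 0, total);
    return result;
  }
}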

+ 171 - 0
common/src/java/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.java

@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.compress.snappy;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.io.compress.Decompressor;
+import org.xerial.snappy.Snappy;
+import org.xerial.snappy.SnappyException;
+
+public class SnappyDecompressor implements Decompressor {
+
+  private static final Log logger = LogFactory.getLog(SnappyDecompressor.class
+      .getName());
+
+  private boolean finished;
+  private ByteBuffer outBuf;
+  private ByteBuffer uncompressedBuf;
+
+  private long bytesRead = 0L;
+  private long bytesWritten = 0L;
+
+  public SnappyDecompressor(int bufferSize) {
+    outBuf = ByteBuffer.allocateDirect(bufferSize);
+    uncompressedBuf = ByteBuffer.allocateDirect(bufferSize);
+
+    reset();
+  }
+
+  public synchronized void setInput(byte[] b, int off, int len) {
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || off > b.length - len) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+
+    finished = false;
+
+    outBuf.put(b, off, len);
+
+    bytesRead += len;
+  }
+
+  public synchronized void setDictionary(byte[] b, int off, int len) {
+    // do nothing
+  }
+
+  public synchronized boolean needsInput() {
+  // input is needed only once all previously decompressed data has been consumed
+    if (uncompressedBuf.position() > 0
+        && uncompressedBuf.limit() > uncompressedBuf.position())
+      return false;
+
+    return true;
+  }
+
+  public synchronized boolean needsDictionary() {
+    return false;
+  }
+
+  public synchronized boolean finished() {
+    return finished;
+  }
+
+  public synchronized int decompress(byte[] b, int off, int len)
+      throws IOException {
+
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || off > b.length - len) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+
+    // nothing to decompress
+    if ((outBuf.position() == 0 && uncompressedBuf.position() == 0) || finished) {
+      reset();
+      finished = true;
+
+      return 0;
+    }
+
+    // only needs to do this once per input
+    if (uncompressedBuf.position() == 0) {
+      try {
+        outBuf.limit(outBuf.position());
+        outBuf.rewind();
+
+        int neededLen = Snappy.uncompressedLength(outBuf);
+        outBuf.rewind();
+
+        if (neededLen > uncompressedBuf.capacity())
+          uncompressedBuf = ByteBuffer.allocateDirect(neededLen);
+
+        int lim = Snappy.uncompress(outBuf, uncompressedBuf);
+
+        uncompressedBuf.limit(lim);
+        uncompressedBuf.rewind();
+      } catch (SnappyException e) {
+        throw new IOException(e);
+      }
+    }
+
+    int n = (uncompressedBuf.limit() - uncompressedBuf.position()) > len ? len
+        : (uncompressedBuf.limit() - uncompressedBuf.position());
+
+    if (n == 0) {
+      reset();
+      finished = true;
+      return 0;
+    }
+
+    uncompressedBuf.get(b, off, n);
+
+    bytesWritten += n;
+
+    // Set 'finished' if snappy has consumed all user-data
+    if (uncompressedBuf.position() == uncompressedBuf.limit()) {
+      reset();
+      finished = true;
+    }
+
+    return n;
+  }
+
+  public synchronized int getRemaining() {
+    // Not used by BlockDecompressorStream.
+    return 0;
+  }
+
+  public synchronized void reset() {
+    finished = false;
+
+    uncompressedBuf.limit(uncompressedBuf.capacity());
+    uncompressedBuf.rewind();
+
+    outBuf.limit(outBuf.capacity());
+    outBuf.rewind();
+
+    bytesRead = bytesWritten = 0L;
+  }
+
+  public synchronized void end() {
+    // do nothing
+  }
+
+  protected void finalize() {
+    end();
+  }
+
+}
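
A companion sketch for the decompressor's contract (again an assumed direct-driving usage, not from the patch): feed a compressed block, then drain decompress() until finished(). The destination buffer must be large enough to hold the whole uncompressed block, since a zero-length drain resets the decompressor:

import java.io.IOException;
import org.apache.hadoop.io.compress.snappy.SnappyDecompressor;

public class DecompressorLoop {
  public static int decompressBlock(byte[] compressed, byte[] dest)
      throws IOException {
    SnappyDecompressor decompressor = new SnappyDecompressor(256 * 1024);
    decompressor.setInput(compressed, 0, compressed.length);

    int total = 0;
    while (!decompressor.finished()) {
      total += decompressor.decompress(dest, total, dest.length - total);
    }
    return total;
  }
}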

+ 6 - 0
common/src/test/core/org/apache/hadoop/io/compress/TestCodec.java

@@ -102,6 +102,12 @@ public class TestCodec {
     codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.DeflateCodec");
     codecTest(conf, seed, count, "org.apache.hadoop.io.compress.DeflateCodec");
   }
+  
+  @Test
+  public void testSnappyCodec() throws IOException {
+    codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.SnappyCodec");
+    codecTest(conf, seed, count, "org.apache.hadoop.io.compress.SnappyCodec");
+  }
 
   @Test
   public void testGzipCodecWithParam() throws IOException {
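
Since SnappyCodec throws RuntimeException when the native library is missing, testSnappyCodec fails rather than skips on machines without it. A guard along these lines (a suggested variant, not in the patch) would skip the test instead:

@Test
public void testSnappyCodec() throws IOException {
  // skip, rather than fail, when native snappy is unavailable
  org.junit.Assume.assumeTrue(SnappyCodec.isNativeSnappyLoaded(conf));
  codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.SnappyCodec");
  codecTest(conf, seed, count, "org.apache.hadoop.io.compress.SnappyCodec");
}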