
HADOOP-7206. Support for Snappy compression. Contributed by Issei Yoshida and Alejandro Abdelnur. Backported to branch-1 from trunk/0.23.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1@1296505 13f79535-47bb-0310-9956-ffa450edef68
Vinod Kumar Vavilapalli, 13 years ago
Commit 019dd34d4c

+ 3 - 0
CHANGES.txt

@@ -161,6 +161,9 @@ Release 1.0.2 - unreleased
 
   NEW FEATURES
 
+    HADOOP-7206. Support Snappy compression. (Issei Yoshida and
+    Alejandro Abdelnur via vinodkv).
+
   IMPROVEMENTS
 
   BUG FIXES

+ 26 - 2
build.xml

@@ -207,6 +207,14 @@
   <property name="package.buildroot" value="/tmp/hadoop_package_build_${user.name}"/>
   <property name="package.build.dir" value="/tmp/hadoop_package_build_${user.name}/BUILD"/>
 
+  <!-- Indicate if the Snappy native library should be bundled with Hadoop or not -->
+  <property name="bundle.snappy" value="false"/>
+
+  <!-- Snappy native library location -->
+  <property name="snappy.prefix" value="/usr/local"/>
+  <property name="snappy.lib" value="${snappy.prefix}/lib"/>
+  <property name="snappy.include" value="${snappy.prefix}/include"/>
+
   <!-- IVY properteis set here -->
   <property name="ivy.dir" location="ivy" />
   <loadproperties srcfile="${ivy.dir}/libraries.properties"/>
@@ -298,6 +306,9 @@
   <property name="build.dir.eclipse-test-resources" value="${build.dir.eclipse}/test-resources/"/>
   <property name="build.dir.eclipse-test-resources-webapps" value="${build.dir.eclipse}/test-resources/webapps"/>
 
+  <!-- Use environment -->
+  <property environment="env" />
+
   <!-- check if clover reports should be generated -->
   <condition property="clover.enabled">
     <and>
@@ -603,6 +614,7 @@
   	
     <mkdir dir="${build.native}/lib"/>
     <mkdir dir="${build.native}/src/org/apache/hadoop/io/compress/zlib"/>
+    <mkdir dir="${build.native}/src/org/apache/hadoop/io/compress/snappy"/>
     <mkdir dir="${build.native}/src/org/apache/hadoop/io/nativeio"/>
     <mkdir dir="${build.native}/src/org/apache/hadoop/security"/>
 
@@ -616,6 +628,16 @@
       <class name="org.apache.hadoop.io.compress.zlib.ZlibDecompressor" />
   	</javah>
 
+    <javah
+      classpath="${build.classes}"
+      destdir="${build.native}/src/org/apache/hadoop/io/compress/snappy"
+      force="yes"
+      verbose="yes"
+      >
+      <class name="org.apache.hadoop.io.compress.snappy.SnappyCompressor"/>
+      <class name="org.apache.hadoop.io.compress.snappy.SnappyDecompressor"/>
+    </javah>
+
         <javah
           classpath="${build.classes}"
           destdir="${build.native}/src/org/apache/hadoop/io/nativeio"
@@ -647,7 +669,7 @@
 	  <env key="OS_ARCH" value="${os.arch}"/>
 	  <env key="JVM_DATA_MODEL" value="${sun.arch.data.model}"/>
 	  <env key="HADOOP_NATIVE_SRCDIR" value="${native.src.dir}"/>
-	  <arg line="${native.src.dir}/configure"/>
+      <arg line="${native.src.dir}/configure"/>
     </exec>
 
     <exec dir="${build.native}" executable="${make.cmd}" failonerror="true">
@@ -1043,7 +1065,7 @@
                      value="@{test.krb5.conf.filename}"/>
         <sysproperty key="hadoop.policy.file" value="hadoop-policy.xml" />
         <sysproperty key="java.library.path"
-                     value="${build.native}/lib:${lib.dir}/native/${build.platform}:${lib.file.path}" />
+                     value="${build.native}/lib:${lib.dir}/native/${build.platform}:${lib.file.path}:${snappy.lib}" />
         <sysproperty key="install.c++.examples"
                      value="${install.c++.examples}" />
         <sysproperty key="testjar"
@@ -1498,6 +1520,8 @@
 	  <env key="BASE_NATIVE_LIB_DIR" value="${lib.dir}/native"/>
 	  <env key="BUILD_NATIVE_DIR" value="${build.dir}/native"/>
 	  <env key="DIST_LIB_DIR" value="${dist.dir}/lib/native"/>
+      <env key="BUNDLE_SNAPPY_LIB" value="${bundle.snappy}"/>
+      <env key="SNAPPY_LIB_DIR" value="${snappy.prefix}/lib"/>
 	  <arg line="${native.src.dir}/packageNativeHadoop.sh"/>
     </exec>
 

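These build properties follow normal Ant conventions, so a build against a custom Snappy install that also bundles the library can override them on the command line, e.g. ant -Dsnappy.prefix=/opt/snappy -Dbundle.snappy=true; the exact native build target depends on the branch, so the property names, not a target name, are what this patch defines.
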
+ 1 - 1
src/core/core-default.xml

@@ -114,7 +114,7 @@
 
 <property>
   <name>io.compression.codecs</name>
-  <value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value>
+  <value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.SnappyCodec</value>
   <description>A list of the compression codec classes that can be used 
                for compression/decompression.</description>
 </property>

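Because SnappyCodec is now in the default codec list, the standard codec factory resolves it from the .snappy extension declared by getDefaultExtension(). A minimal lookup sketch, assuming core-default.xml is on the classpath and using an arbitrary example path:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionCodecFactory;

    public class SnappyLookup {
      public static void main(String[] args) {
        Configuration conf = new Configuration();          // loads core-default.xml
        CompressionCodecFactory factory = new CompressionCodecFactory(conf);
        // Matched on the ".snappy" suffix registered by SnappyCodec
        CompressionCodec codec = factory.getCodec(new Path("/tmp/part-00000.snappy"));
        System.out.println(codec == null ? "no codec found" : codec.getClass().getName());
      }
    }
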
+ 12 - 0
src/core/org/apache/hadoop/fs/CommonConfigurationKeys.java

@@ -52,5 +52,17 @@ public class CommonConfigurationKeys {
                                         "ipc.server.read.threadpool.size";
   public static final int IPC_SERVER_RPC_READ_THREADS_DEFAULT = 1;
 
+  public static final String  IO_NATIVE_LIB_AVAILABLE_KEY =
+      "hadoop.native.lib";
+  /** Default value for IO_NATIVE_LIB_AVAILABLE_KEY */
+  public static final boolean IO_NATIVE_LIB_AVAILABLE_DEFAULT = true;
+
+  /** Internal buffer size for Snappy compressor/decompressors */
+  public static final String IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY =
+      "io.compression.codec.snappy.buffersize";
+
+  /** Default value for IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY */
+  public static final int IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT =
+      256 * 1024;
 }
 

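The buffer-size key is read every time the codec creates a compressor, decompressor, or stream, so it can be tuned per Configuration. A small sketch, assuming a job that wants 128 KB buffers instead of the 256 KB default:

    import org.apache.hadoop.conf.Configuration;

    public class SnappyBufferTuning {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Key and default introduced by this patch (default is 256 * 1024 bytes)
        conf.setInt("io.compression.codec.snappy.buffersize", 128 * 1024);
        System.out.println(
            conf.getInt("io.compression.codec.snappy.buffersize", 256 * 1024));
      }
    }
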
+ 220 - 0
src/core/org/apache/hadoop/io/compress/SnappyCodec.java

@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.snappy.LoadSnappy;
+import org.apache.hadoop.io.compress.snappy.SnappyCompressor;
+import org.apache.hadoop.io.compress.snappy.SnappyDecompressor;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+
+/**
+ * This class creates snappy compressors/decompressors.
+ */
+public class SnappyCodec implements Configurable, CompressionCodec {
+
+  static {
+    LoadSnappy.isLoaded();
+  }
+
+  Configuration conf;
+
+  /**
+   * Set the configuration to be used by this object.
+   *
+   * @param conf the configuration object.
+   */
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Return the configuration used by this object.
+   *
+   * @return the configuration object used by this object.
+   */
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /**
+   * Are the native snappy libraries loaded & initialized?
+   *
+   * @param conf configuration
+   * @return true if loaded & initialized, otherwise false
+   */
+  public static boolean isNativeSnappyLoaded(Configuration conf) {
+    return LoadSnappy.isLoaded() && conf.getBoolean(
+        CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY,
+        CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_DEFAULT);
+  }
+
+  /**
+   * Create a {@link CompressionOutputStream} that will write to the given
+   * {@link OutputStream}.
+   *
+   * @param out the location for the final output stream
+   * @return a stream the user can write uncompressed data to have it compressed
+   * @throws IOException
+   */
+  @Override
+  public CompressionOutputStream createOutputStream(OutputStream out)
+      throws IOException {
+    return createOutputStream(out, createCompressor());
+  }
+
+  /**
+   * Create a {@link CompressionOutputStream} that will write to the given
+   * {@link OutputStream} with the given {@link Compressor}.
+   *
+   * @param out        the location for the final output stream
+   * @param compressor compressor to use
+   * @return a stream the user can write uncompressed data to have it compressed
+   * @throws IOException
+   */
+  @Override
+  public CompressionOutputStream createOutputStream(OutputStream out,
+                                                    Compressor compressor)
+      throws IOException {
+    if (!isNativeSnappyLoaded(conf)) {
+      throw new RuntimeException("native snappy library not available");
+    }
+    int bufferSize = conf.getInt(
+        CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY,
+        CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT);
+
+    int compressionOverhead = (bufferSize / 6) + 32;
+
+    return new BlockCompressorStream(out, compressor, bufferSize,
+        compressionOverhead);
+  }
+
+  /**
+   * Get the type of {@link Compressor} needed by this {@link CompressionCodec}.
+   *
+   * @return the type of compressor needed by this codec.
+   */
+  @Override
+  public Class<? extends Compressor> getCompressorType() {
+    if (!isNativeSnappyLoaded(conf)) {
+      throw new RuntimeException("native snappy library not available");
+    }
+
+    return SnappyCompressor.class;
+  }
+
+  /**
+   * Create a new {@link Compressor} for use by this {@link CompressionCodec}.
+   *
+   * @return a new compressor for use by this codec
+   */
+  @Override
+  public Compressor createCompressor() {
+    if (!isNativeSnappyLoaded(conf)) {
+      throw new RuntimeException("native snappy library not available");
+    }
+    int bufferSize = conf.getInt(
+        CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY,
+        CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT);
+    return new SnappyCompressor(bufferSize);
+  }
+
+  /**
+   * Create a {@link CompressionInputStream} that will read from the given
+   * input stream.
+   *
+   * @param in the stream to read compressed bytes from
+   * @return a stream to read uncompressed bytes from
+   * @throws IOException
+   */
+  @Override
+  public CompressionInputStream createInputStream(InputStream in)
+      throws IOException {
+    return createInputStream(in, createDecompressor());
+  }
+
+  /**
+   * Create a {@link CompressionInputStream} that will read from the given
+   * {@link InputStream} with the given {@link Decompressor}.
+   *
+   * @param in           the stream to read compressed bytes from
+   * @param decompressor decompressor to use
+   * @return a stream to read uncompressed bytes from
+   * @throws IOException
+   */
+  @Override
+  public CompressionInputStream createInputStream(InputStream in,
+                                                  Decompressor decompressor)
+      throws IOException {
+    if (!isNativeSnappyLoaded(conf)) {
+      throw new RuntimeException("native snappy library not available");
+    }
+
+    return new BlockDecompressorStream(in, decompressor, conf.getInt(
+        CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY,
+        CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT));
+  }
+
+  /**
+   * Get the type of {@link Decompressor} needed by this {@link CompressionCodec}.
+   *
+   * @return the type of decompressor needed by this codec.
+   */
+  @Override
+  public Class<? extends Decompressor> getDecompressorType() {
+    if (!isNativeSnappyLoaded(conf)) {
+      throw new RuntimeException("native snappy library not available");
+    }
+
+    return SnappyDecompressor.class;
+  }
+
+  /**
+   * Create a new {@link Decompressor} for use by this {@link CompressionCodec}.
+   *
+   * @return a new decompressor for use by this codec
+   */
+  @Override
+  public Decompressor createDecompressor() {
+    if (!isNativeSnappyLoaded(conf)) {
+      throw new RuntimeException("native snappy library not available");
+    }
+    int bufferSize = conf.getInt(
+        CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY,
+        CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT);
+    return new SnappyDecompressor(bufferSize);
+  }
+
+  /**
+   * Get the default filename extension for this kind of compression.
+   *
+   * @return <code>.snappy</code>.
+   */
+  @Override
+  public String getDefaultExtension() {
+    return ".snappy";
+  }
+}

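A round-trip sketch through the codec's stream API; this assumes libsnappy and libhadoop are on java.library.path, otherwise createOutputStream throws the RuntimeException shown above:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.util.Arrays;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.compress.CompressionInputStream;
    import org.apache.hadoop.io.compress.CompressionOutputStream;
    import org.apache.hadoop.io.compress.SnappyCodec;

    public class SnappyRoundTrip {
      public static void main(String[] args) throws Exception {
        SnappyCodec codec = new SnappyCodec();
        codec.setConf(new Configuration());   // Configurable: a conf is required

        byte[] input = "hello snappy, hello snappy".getBytes("UTF-8");

        ByteArrayOutputStream compressed = new ByteArrayOutputStream();
        CompressionOutputStream out = codec.createOutputStream(compressed);
        out.write(input);
        out.close();                          // flushes and finishes the last block

        CompressionInputStream in = codec.createInputStream(
            new ByteArrayInputStream(compressed.toByteArray()));
        ByteArrayOutputStream restored = new ByteArrayOutputStream();
        IOUtils.copyBytes(in, restored, 4096, true);

        System.out.println(Arrays.equals(input, restored.toByteArray())); // true
      }
    }
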
+ 70 - 0
src/core/org/apache/hadoop/io/compress/snappy/LoadSnappy.java

@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.compress.snappy;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.NativeCodeLoader;
+
+/**
+ * Determines if Snappy native library is available and loads it if available.
+ */
+public class LoadSnappy {
+  private static final Log LOG = LogFactory.getLog(LoadSnappy.class.getName());
+
+  private static boolean AVAILABLE = false;
+  private static boolean LOADED = false;
+
+  static {
+    try {
+      System.loadLibrary("snappy");
+      LOG.warn("Snappy native library is available");
+      AVAILABLE = true;
+    } catch (UnsatisfiedLinkError ex) {
+      //NOP
+    }
+    boolean hadoopNativeAvailable = NativeCodeLoader.isNativeCodeLoaded();
+    LOADED = AVAILABLE && hadoopNativeAvailable;
+    if (LOADED) {
+      LOG.info("Snappy native library loaded");
+    } else {
+      LOG.warn("Snappy native library not loaded");
+    }
+  }
+
+  /**
+   * Returns if the Snappy native library is available.
+   *
+   * @return <code>true</code> if the Snappy native library is available,
+   * <code>false</code> if not.
+   */
+  public static boolean isAvailable() {
+    return AVAILABLE;
+  }
+
+  /**
+   * Returns if Snappy native library is loaded.
+   *
+   * @return <code>true</code> if Snappy native library is loaded,
+   * <code>false</code> if not.
+   */
+  public static boolean isLoaded() {
+    return LOADED;
+  }
+
+}

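The two accessors answer different questions: isAvailable() only says libsnappy was found by System.loadLibrary, while isLoaded() additionally requires the Hadoop native code (the same condition TestCodec checks below). A small diagnostic sketch:

    import org.apache.hadoop.io.compress.snappy.LoadSnappy;

    public class SnappyStatus {
      public static void main(String[] args) {
        if (LoadSnappy.isLoaded()) {
          System.out.println("libsnappy and libhadoop loaded: SnappyCodec is usable");
        } else if (LoadSnappy.isAvailable()) {
          System.out.println("libsnappy found, but the Hadoop native library is missing");
        } else {
          System.out.println("libsnappy is not on java.library.path");
        }
      }
    }
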
+ 298 - 0
src/core/org/apache/hadoop/io/compress/snappy/SnappyCompressor.java

@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress.snappy;
+
+import java.io.IOException;
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.Compressor;
+
+/**
+ * A {@link Compressor} based on the snappy compression algorithm.
+ * http://code.google.com/p/snappy/
+ */
+public class SnappyCompressor implements Compressor {
+  private static final Log LOG =
+      LogFactory.getLog(SnappyCompressor.class.getName());
+  private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64 * 1024;
+
+  // HACK - Use this as a global lock in the JNI layer
+  @SuppressWarnings({"unchecked", "unused"})
+  private static Class clazz = SnappyCompressor.class;
+
+  private int directBufferSize;
+  private Buffer compressedDirectBuf = null;
+  private int uncompressedDirectBufLen;
+  private Buffer uncompressedDirectBuf = null;
+  private byte[] userBuf = null;
+  private int userBufOff = 0, userBufLen = 0;
+  private boolean finish, finished;
+
+  private long bytesRead = 0L;
+  private long bytesWritten = 0L;
+
+
+  static {
+    if (LoadSnappy.isLoaded()) {
+      // Initialize the native library
+      try {
+        initIDs();
+      } catch (Throwable t) {
+        // Ignore failure to load/initialize snappy
+        LOG.warn(t.toString());
+      }
+    } else {
+      LOG.error("Cannot load " + SnappyCompressor.class.getName() +
+          " without snappy library!");
+    }
+  }
+
+  /**
+   * Creates a new compressor.
+   *
+   * @param directBufferSize size of the direct buffer to be used.
+   */
+  public SnappyCompressor(int directBufferSize) {
+    this.directBufferSize = directBufferSize;
+
+    uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
+    compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
+    compressedDirectBuf.position(directBufferSize);
+  }
+
+  /**
+   * Creates a new compressor with the default buffer size.
+   */
+  public SnappyCompressor() {
+    this(DEFAULT_DIRECT_BUFFER_SIZE);
+  }
+
+  /**
+   * Sets input data for compression.
+   * This should be called whenever #needsInput() returns
+   * <code>true</code> indicating that more input data is required.
+   *
+   * @param b   Input data
+   * @param off Start offset
+   * @param len Length
+   */
+  @Override
+  public synchronized void setInput(byte[] b, int off, int len) {
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || off > b.length - len) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+    finished = false;
+
+    if (len > uncompressedDirectBuf.remaining()) {
+      // save data; now !needsInput
+      this.userBuf = b;
+      this.userBufOff = off;
+      this.userBufLen = len;
+    } else {
+      ((ByteBuffer) uncompressedDirectBuf).put(b, off, len);
+      uncompressedDirectBufLen = uncompressedDirectBuf.position();
+    }
+
+    bytesRead += len;
+  }
+
+  /**
+   * If a write would exceed the capacity of the direct buffers, it is set
+   * aside to be loaded by this function while the compressed data are
+   * consumed.
+   */
+  synchronized void setInputFromSavedData() {
+    if (0 >= userBufLen) {
+      return;
+    }
+    finished = false;
+
+    uncompressedDirectBufLen = Math.min(userBufLen, directBufferSize);
+    ((ByteBuffer) uncompressedDirectBuf).put(userBuf, userBufOff,
+        uncompressedDirectBufLen);
+
+    // Note how much data is being fed to snappy
+    userBufOff += uncompressedDirectBufLen;
+    userBufLen -= uncompressedDirectBufLen;
+  }
+
+  /**
+   * Does nothing.
+   */
+  @Override
+  public synchronized void setDictionary(byte[] b, int off, int len) {
+    // do nothing
+  }
+
+  /**
+   * Returns true if the input data buffer is empty and
+   * #setInput() should be called to provide more input.
+   *
+   * @return <code>true</code> if the input data buffer is empty and
+   *         #setInput() should be called in order to provide more input.
+   */
+  @Override
+  public synchronized boolean needsInput() {
+    return !(compressedDirectBuf.remaining() > 0
+        || uncompressedDirectBuf.remaining() == 0 || userBufLen > 0);
+  }
+
+  /**
+   * When called, indicates that compression should end
+   * with the current contents of the input buffer.
+   */
+  @Override
+  public synchronized void finish() {
+    finish = true;
+  }
+
+  /**
+   * Returns true if the end of the compressed
+   * data output stream has been reached.
+   *
+   * @return <code>true</code> if the end of the compressed
+   *         data output stream has been reached.
+   */
+  @Override
+  public synchronized boolean finished() {
+    // Check if all uncompressed data has been consumed
+    return (finish && finished && compressedDirectBuf.remaining() == 0);
+  }
+
+  /**
+   * Fills specified buffer with compressed data. Returns actual number
+   * of bytes of compressed data. A return value of 0 indicates that
+   * needsInput() should be called in order to determine if more input
+   * data is required.
+   *
+   * @param b   Buffer for the compressed data
+   * @param off Start offset of the data
+   * @param len Size of the buffer
+   * @return The actual number of bytes of compressed data.
+   */
+  @Override
+  public synchronized int compress(byte[] b, int off, int len)
+      throws IOException {
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || off > b.length - len) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+
+    // Check if there is compressed data
+    int n = compressedDirectBuf.remaining();
+    if (n > 0) {
+      n = Math.min(n, len);
+      ((ByteBuffer) compressedDirectBuf).get(b, off, n);
+      bytesWritten += n;
+      return n;
+    }
+
+    // Re-initialize the snappy's output direct-buffer
+    compressedDirectBuf.clear();
+    compressedDirectBuf.limit(0);
+    if (0 == uncompressedDirectBuf.position()) {
+      // No compressed data, so we should have !needsInput or !finished
+      setInputFromSavedData();
+      if (0 == uncompressedDirectBuf.position()) {
+        // Called without data; write nothing
+        finished = true;
+        return 0;
+      }
+    }
+
+    // Compress data
+    n = compressBytesDirect();
+    compressedDirectBuf.limit(n);
+    uncompressedDirectBuf.clear(); // snappy consumes all buffer input
+
+    // Set 'finished' if snappy has consumed all user-data
+    if (0 == userBufLen) {
+      finished = true;
+    }
+
+    // Get at most 'len' bytes
+    n = Math.min(n, len);
+    bytesWritten += n;
+    ((ByteBuffer) compressedDirectBuf).get(b, off, n);
+
+    return n;
+  }
+
+  /**
+   * Resets compressor so that a new set of input data can be processed.
+   */
+  @Override
+  public synchronized void reset() {
+    finish = false;
+    finished = false;
+    uncompressedDirectBuf.clear();
+    uncompressedDirectBufLen = 0;
+    compressedDirectBuf.clear();
+    compressedDirectBuf.limit(0);
+    userBufOff = userBufLen = 0;
+    bytesRead = bytesWritten = 0L;
+  }
+
+  /**
+   * Prepare the compressor to be used in a new stream with settings defined in
+   * the given Configuration
+   *
+   * @param conf Configuration from which new setting are fetched
+   */
+  @Override
+  public synchronized void reinit(Configuration conf) {
+    reset();
+  }
+
+  /**
+   * Return number of bytes given to this compressor since last reset.
+   */
+  @Override
+  public synchronized long getBytesRead() {
+    return bytesRead;
+  }
+
+  /**
+   * Return number of bytes consumed by callers of compress since last reset.
+   */
+  @Override
+  public synchronized long getBytesWritten() {
+    return bytesWritten;
+  }
+
+  /**
+   * Closes the compressor and discards any unprocessed input.
+   */
+  @Override
+  public synchronized void end() {
+  }
+
+  private native static void initIDs();
+
+  private native int compressBytesDirect();
+}

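Outside the stream wrappers, the Compressor contract is a pull loop: setInput(), finish(), then drain compress() until finished(). A sketch of driving this class directly, assuming the native library is loaded; note that BlockCompressorStream normally caps each chunk below the buffer size so a compressed block (input plus the bufferSize/6 + 32 overhead) still fits in the direct buffer:

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.io.compress.snappy.SnappyCompressor;

    public class RawSnappyCompress {
      public static byte[] compress(byte[] input) throws IOException {
        SnappyCompressor compressor = new SnappyCompressor(64 * 1024);
        compressor.setInput(input, 0, input.length);
        compressor.finish();                       // no more input will follow

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[64 * 1024];
        while (!compressor.finished()) {
          int n = compressor.compress(buf, 0, buf.length);
          out.write(buf, 0, n);
        }
        compressor.end();
        // Inputs larger than the direct buffer come out as independent snappy
        // blocks with no length framing; the stream classes add that framing.
        return out.toByteArray();
      }
    }
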
+ 280 - 0
src/core/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.java

@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress.snappy;
+
+import java.io.IOException;
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.compress.Decompressor;
+
+/**
+ * A {@link Decompressor} based on the snappy compression algorithm.
+ * http://code.google.com/p/snappy/
+ */
+public class SnappyDecompressor implements Decompressor {
+  private static final Log LOG =
+      LogFactory.getLog(SnappyDecompressor.class.getName());
+  private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64 * 1024;
+
+  // HACK - Use this as a global lock in the JNI layer
+  @SuppressWarnings({"unchecked", "unused"})
+  private static Class clazz = SnappyDecompressor.class;
+
+  private int directBufferSize;
+  private Buffer compressedDirectBuf = null;
+  private int compressedDirectBufLen;
+  private Buffer uncompressedDirectBuf = null;
+  private byte[] userBuf = null;
+  private int userBufOff = 0, userBufLen = 0;
+  private boolean finished;
+
+  static {
+    if (LoadSnappy.isLoaded()) {
+      // Initialize the native library
+      try {
+        initIDs();
+      } catch (Throwable t) {
+        // Ignore failure to load/initialize snappy
+        LOG.warn(t.toString());
+      }
+    } else {
+      LOG.error("Cannot load " + SnappyDecompressor.class.getName() +
+          " without snappy library!");
+    }
+  }
+
+  /**
+   * Creates a new decompressor.
+   *
+   * @param directBufferSize size of the direct buffer to be used.
+   */
+  public SnappyDecompressor(int directBufferSize) {
+    this.directBufferSize = directBufferSize;
+
+    compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
+    uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
+    uncompressedDirectBuf.position(directBufferSize);
+
+  }
+
+  /**
+   * Creates a new decompressor with the default buffer size.
+   */
+  public SnappyDecompressor() {
+    this(DEFAULT_DIRECT_BUFFER_SIZE);
+  }
+
+  /**
+   * Sets input data for decompression.
+   * This should be called if and only if {@link #needsInput()} returns
+   * <code>true</code> indicating that more input data is required.
+   * (Both native and non-native versions of various Decompressors require
+   * that the data passed in via <code>b[]</code> remain unmodified until
+   * the caller is explicitly notified--via {@link #needsInput()}--that the
+   * buffer may be safely modified.  With this requirement, an extra
+   * buffer-copy can be avoided.)
+   *
+   * @param b   Input data
+   * @param off Start offset
+   * @param len Length
+   */
+  @Override
+  public synchronized void setInput(byte[] b, int off, int len) {
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || off > b.length - len) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+
+    this.userBuf = b;
+    this.userBufOff = off;
+    this.userBufLen = len;
+
+    setInputFromSavedData();
+
+    // Reinitialize snappy's output direct-buffer
+    uncompressedDirectBuf.limit(directBufferSize);
+    uncompressedDirectBuf.position(directBufferSize);
+  }
+
+  /**
+   * If a write would exceed the capacity of the direct buffers, it is set
+   * aside to be loaded by this function while the compressed data are
+   * consumed.
+   */
+  synchronized void setInputFromSavedData() {
+    compressedDirectBufLen = Math.min(userBufLen, directBufferSize);
+
+    // Reinitialize snappy's input direct buffer
+    compressedDirectBuf.rewind();
+    ((ByteBuffer) compressedDirectBuf).put(userBuf, userBufOff,
+        compressedDirectBufLen);
+
+    // Note how much data is being fed to snappy
+    userBufOff += compressedDirectBufLen;
+    userBufLen -= compressedDirectBufLen;
+  }
+
+  /**
+   * Does nothing.
+   */
+  @Override
+  public synchronized void setDictionary(byte[] b, int off, int len) {
+    // do nothing
+  }
+
+  /**
+   * Returns true if the input data buffer is empty and
+   * {@link #setInput(byte[], int, int)} should be called to
+   * provide more input.
+   *
+   * @return <code>true</code> if the input data buffer is empty and
+   *         {@link #setInput(byte[], int, int)} should be called in
+   *         order to provide more input.
+   */
+  @Override
+  public synchronized boolean needsInput() {
+    // Consume remaining compressed data?
+    if (uncompressedDirectBuf.remaining() > 0) {
+      return false;
+    }
+
+    // Check if snappy has consumed all input
+    if (compressedDirectBufLen <= 0) {
+      // Check if we have consumed all user-input
+      if (userBufLen <= 0) {
+        return true;
+      } else {
+        setInputFromSavedData();
+      }
+    }
+
+    return false;
+  }
+
+  /**
+   * Returns <code>false</code>.
+   *
+   * @return <code>false</code>.
+   */
+  @Override
+  public synchronized boolean needsDictionary() {
+    return false;
+  }
+
+  /**
+   * Returns true if the end of the decompressed
+   * data output stream has been reached.
+   *
+   * @return <code>true</code> if the end of the decompressed
+   *         data output stream has been reached.
+   */
+  @Override
+  public synchronized boolean finished() {
+    return (finished && uncompressedDirectBuf.remaining() == 0);
+  }
+
+  /**
+   * Fills specified buffer with uncompressed data. Returns actual number
+   * of bytes of uncompressed data. A return value of 0 indicates that
+   * {@link #needsInput()} should be called in order to determine if more
+   * input data is required.
+   *
+   * @param b   Buffer for the uncompressed data
+   * @param off Start offset of the data
+   * @param len Size of the buffer
+   * @return The actual number of bytes of uncompressed data.
+   * @throws IOException
+   */
+  @Override
+  public synchronized int decompress(byte[] b, int off, int len)
+      throws IOException {
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || off > b.length - len) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+
+    int n = 0;
+
+    // Check if there is uncompressed data
+    n = uncompressedDirectBuf.remaining();
+    if (n > 0) {
+      n = Math.min(n, len);
+      ((ByteBuffer) uncompressedDirectBuf).get(b, off, n);
+      return n;
+    }
+    if (compressedDirectBufLen > 0) {
+      // Re-initialize the snappy's output direct buffer
+      uncompressedDirectBuf.rewind();
+      uncompressedDirectBuf.limit(directBufferSize);
+
+      // Decompress data
+      n = decompressBytesDirect();
+      uncompressedDirectBuf.limit(n);
+
+      if (userBufLen <= 0) {
+        finished = true;
+      }
+
+      // Get at most 'len' bytes
+      n = Math.min(n, len);
+      ((ByteBuffer) uncompressedDirectBuf).get(b, off, n);
+    }
+
+    return n;
+  }
+
+  /**
+   * Returns <code>0</code>.
+   *
+   * @return <code>0</code>.
+   */
+  @Override
+  public synchronized int getRemaining() {
+    // Never use this function in BlockDecompressorStream.
+    return 0;
+  }
+
+  public synchronized void reset() {
+    finished = false;
+    compressedDirectBufLen = 0;
+    uncompressedDirectBuf.limit(directBufferSize);
+    uncompressedDirectBuf.position(directBufferSize);
+    userBufOff = userBufLen = 0;
+  }
+
+  /**
+   * Resets decompressor and input and output buffers so that a new set of
+   * input data can be processed.
+   */
+  @Override
+  public synchronized void end() {
+    // do nothing
+  }
+
+  private native static void initIDs();
+
+  private native int decompressBytesDirect();
+}

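The decompressor side mirrors that loop: setInput() with a compressed block, then drain decompress() until finished(). A sketch for a single raw block, e.g. one produced by the compressor sketch above (framed multi-block streams are handled by BlockDecompressorStream):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.io.compress.snappy.SnappyDecompressor;

    public class RawSnappyDecompress {
      public static byte[] decompress(byte[] block) throws IOException {
        SnappyDecompressor decompressor = new SnappyDecompressor(64 * 1024);
        decompressor.setInput(block, 0, block.length);

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[64 * 1024];
        while (!decompressor.finished()) {
          int n = decompressor.decompress(buf, 0, buf.length);
          out.write(buf, 0, n);
        }
        return out.toByteArray();
      }
    }
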
+ 3 - 0
src/native/Makefile.am

@@ -33,6 +33,7 @@ export PLATFORM = $(shell echo $$OS_NAME | tr [A-Z] [a-z])
 
 AM_CPPFLAGS = @JNI_CPPFLAGS@ -I$(HADOOP_NATIVE_SRCDIR)/src \
               -Isrc/org/apache/hadoop/io/compress/zlib \
+              -Isrc/org/apache/hadoop/io/compress/snappy \
               -Isrc/org/apache/hadoop/io/nativeio \
               -Isrc/org/apache/hadoop/security
 AM_LDFLAGS = @JNI_LDFLAGS@ -m$(JVM_DATA_MODEL)
@@ -41,6 +42,8 @@ AM_CFLAGS = -g -Wall -fPIC -O2 -m$(JVM_DATA_MODEL)
 lib_LTLIBRARIES = libhadoop.la
 libhadoop_la_SOURCES = src/org/apache/hadoop/io/compress/zlib/ZlibCompressor.c \
                        src/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.c \
+                       src/org/apache/hadoop/io/compress/snappy/SnappyCompressor.c \
+                       src/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.c \
                        src/org/apache/hadoop/security/getGroup.c \
                        src/org/apache/hadoop/security/JniBasedUnixGroupsMapping.c \
                        src/org/apache/hadoop/security/JniBasedUnixGroupsNetgroupMapping.c \

+ 3 - 0
src/native/configure.ac

@@ -87,6 +87,9 @@ AC_SUBST([JNI_CPPFLAGS])
 dnl Check for zlib headers
 AC_CHECK_HEADERS([zlib.h zconf.h], AC_COMPUTE_NEEDED_DSO(z,HADOOP_ZLIB_LIBRARY), AC_MSG_ERROR(Zlib headers were not found... native-hadoop library needs zlib to build. Please install the requisite zlib development package.))
 
+dnl Check for snappy headers
+AC_CHECK_HEADERS([snappy-c.h], AC_COMPUTE_NEEDED_DSO(snappy,HADOOP_SNAPPY_LIBRARY), AC_MSG_WARN(Snappy headers were not found... building without snappy.))
+
 dnl Check for headers needed by the native Group resolution implementation
 AC_CHECK_HEADERS([fcntl.h stdlib.h string.h unistd.h], [], AC_MSG_ERROR(Some system headers not found... please ensure their presence on your platform.))
 

+ 13 - 0
src/native/packageNativeHadoop.sh

@@ -62,4 +62,17 @@ then
   done  
 fi
 
+if [ "${BUNDLE_SNAPPY_LIB}" = "true" ]
+then
+ if [ -d ${SNAPPY_LIB_DIR} ]
+ then
+   echo "Copying Snappy library in ${SNAPPY_LIB_DIR} to $DIST_LIB_DIR/"
+   cd ${SNAPPY_LIB_DIR}
+   $TAR . | (cd $DIST_LIB_DIR/; $UNTAR)
+ else
+   echo "Snappy lib directory ${SNAPPY_LIB_DIR} does not exist"
+   exit 1
+ fi
+fi
+
 #vim: ts=2: sw=2: et

+ 127 - 0
src/native/src/org/apache/hadoop/io/compress/snappy/SnappyCompressor.c

@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#if defined HAVE_CONFIG_H
+  #include <config.h>
+#endif
+
+#if defined HADOOP_SNAPPY_LIBRARY
+
+#if defined HAVE_STDIO_H
+  #include <stdio.h>
+#else
+  #error 'stdio.h not found'
+#endif
+
+#if defined HAVE_STDLIB_H
+  #include <stdlib.h>
+#else
+  #error 'stdlib.h not found'
+#endif
+
+#if defined HAVE_STRING_H
+  #include <string.h>
+#else
+  #error 'string.h not found'
+#endif
+
+#if defined HAVE_DLFCN_H
+  #include <dlfcn.h>
+#else
+  #error 'dlfcn.h not found'
+#endif
+
+#include "org_apache_hadoop_io_compress_snappy.h"
+#include "org_apache_hadoop_io_compress_snappy_SnappyCompressor.h"
+
+static jfieldID SnappyCompressor_clazz;
+static jfieldID SnappyCompressor_uncompressedDirectBuf;
+static jfieldID SnappyCompressor_uncompressedDirectBufLen;
+static jfieldID SnappyCompressor_compressedDirectBuf;
+static jfieldID SnappyCompressor_directBufferSize;
+
+static snappy_status (*dlsym_snappy_compress)(const char*, size_t, char*, size_t*);
+
+JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_snappy_SnappyCompressor_initIDs
+(JNIEnv *env, jclass clazz){
+
+  // Load libsnappy.so
+  void *libsnappy = dlopen(HADOOP_SNAPPY_LIBRARY, RTLD_LAZY | RTLD_GLOBAL);
+  if (!libsnappy) {
+    char* msg = (char*)malloc(1000);
+    snprintf(msg, 1000, "%s (%s)!", "Cannot load " HADOOP_SNAPPY_LIBRARY, dlerror());
+    THROW(env, "java/lang/UnsatisfiedLinkError", msg);
+    return;
+  }
+
+  // Locate the requisite symbols from libsnappy.so
+  dlerror();                                 // Clear any existing error
+  LOAD_DYNAMIC_SYMBOL(dlsym_snappy_compress, env, libsnappy, "snappy_compress");
+
+  SnappyCompressor_clazz = (*env)->GetStaticFieldID(env, clazz, "clazz",
+                                                 "Ljava/lang/Class;");
+  SnappyCompressor_uncompressedDirectBuf = (*env)->GetFieldID(env, clazz,
+                                                           "uncompressedDirectBuf",
+                                                           "Ljava/nio/Buffer;");
+  SnappyCompressor_uncompressedDirectBufLen = (*env)->GetFieldID(env, clazz,
+                                                              "uncompressedDirectBufLen", "I");
+  SnappyCompressor_compressedDirectBuf = (*env)->GetFieldID(env, clazz,
+                                                         "compressedDirectBuf",
+                                                         "Ljava/nio/Buffer;");
+  SnappyCompressor_directBufferSize = (*env)->GetFieldID(env, clazz,
+                                                       "directBufferSize", "I");
+}
+
+JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_snappy_SnappyCompressor_compressBytesDirect
+(JNIEnv *env, jobject thisj){
+  // Get members of SnappyCompressor
+  jobject clazz = (*env)->GetStaticObjectField(env, thisj, SnappyCompressor_clazz);
+  jobject uncompressed_direct_buf = (*env)->GetObjectField(env, thisj, SnappyCompressor_uncompressedDirectBuf);
+  jint uncompressed_direct_buf_len = (*env)->GetIntField(env, thisj, SnappyCompressor_uncompressedDirectBufLen);
+  jobject compressed_direct_buf = (*env)->GetObjectField(env, thisj, SnappyCompressor_compressedDirectBuf);
+  jint compressed_direct_buf_len = (*env)->GetIntField(env, thisj, SnappyCompressor_directBufferSize);
+
+  // Get the input direct buffer
+  LOCK_CLASS(env, clazz, "SnappyCompressor");
+  const char* uncompressed_bytes = (const char*)(*env)->GetDirectBufferAddress(env, uncompressed_direct_buf);
+  UNLOCK_CLASS(env, clazz, "SnappyCompressor");
+
+  if (uncompressed_bytes == 0) {
+    return (jint)0;
+  }
+
+  // Get the output direct buffer
+  LOCK_CLASS(env, clazz, "SnappyCompressor");
+  char* compressed_bytes = (char *)(*env)->GetDirectBufferAddress(env, compressed_direct_buf);
+  UNLOCK_CLASS(env, clazz, "SnappyCompressor");
+
+  if (compressed_bytes == 0) {
+    return (jint)0;
+  }
+
+  snappy_status ret = dlsym_snappy_compress(uncompressed_bytes, uncompressed_direct_buf_len, compressed_bytes, &compressed_direct_buf_len);
+  if (ret != SNAPPY_OK){
+    THROW(env, "Ljava/lang/InternalError", "Could not compress data. Buffer length is too small.");
+  }
+
+  (*env)->SetIntField(env, thisj, SnappyCompressor_uncompressedDirectBufLen, 0);
+
+  return (jint)compressed_direct_buf_len;
+}
+
+#endif //define HADOOP_SNAPPY_LIBRARY

+ 131 - 0
src/native/src/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.c

@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#if defined HAVE_CONFIG_H
+  #include <config.h>
+#endif
+
+#if defined HADOOP_SNAPPY_LIBRARY
+
+#if defined HAVE_STDIO_H
+  #include <stdio.h>
+#else
+  #error 'stdio.h not found'
+#endif
+
+#if defined HAVE_STDLIB_H
+  #include <stdlib.h>
+#else
+  #error 'stdlib.h not found'
+#endif
+
+#if defined HAVE_STRING_H
+  #include <string.h>
+#else
+  #error 'string.h not found'
+#endif
+
+#if defined HAVE_DLFCN_H
+  #include <dlfcn.h>
+#else
+  #error 'dlfcn.h not found'
+#endif
+
+#include "org_apache_hadoop_io_compress_snappy.h"
+#include "org_apache_hadoop_io_compress_snappy_SnappyDecompressor.h"
+
+static jfieldID SnappyDecompressor_clazz;
+static jfieldID SnappyDecompressor_compressedDirectBuf;
+static jfieldID SnappyDecompressor_compressedDirectBufLen;
+static jfieldID SnappyDecompressor_uncompressedDirectBuf;
+static jfieldID SnappyDecompressor_directBufferSize;
+
+static snappy_status (*dlsym_snappy_uncompress)(const char*, size_t, char*, size_t*);
+
+JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_snappy_SnappyDecompressor_initIDs
+(JNIEnv *env, jclass clazz){
+
+  // Load libsnappy.so
+  void *libsnappy = dlopen(HADOOP_SNAPPY_LIBRARY, RTLD_LAZY | RTLD_GLOBAL);
+  if (!libsnappy) {
+    char* msg = (char*)malloc(1000);
+    snprintf(msg, 1000, "%s (%s)!", "Cannot load " HADOOP_SNAPPY_LIBRARY, dlerror());
+    THROW(env, "java/lang/UnsatisfiedLinkError", msg);
+    return;
+  }
+
+  // Locate the requisite symbols from libsnappy.so
+  dlerror();                                 // Clear any existing error
+  LOAD_DYNAMIC_SYMBOL(dlsym_snappy_uncompress, env, libsnappy, "snappy_uncompress");
+
+  SnappyDecompressor_clazz = (*env)->GetStaticFieldID(env, clazz, "clazz",
+                                                   "Ljava/lang/Class;");
+  SnappyDecompressor_compressedDirectBuf = (*env)->GetFieldID(env,clazz,
+                                                           "compressedDirectBuf",
+                                                           "Ljava/nio/Buffer;");
+  SnappyDecompressor_compressedDirectBufLen = (*env)->GetFieldID(env,clazz,
+                                                              "compressedDirectBufLen", "I");
+  SnappyDecompressor_uncompressedDirectBuf = (*env)->GetFieldID(env,clazz,
+                                                             "uncompressedDirectBuf",
+                                                             "Ljava/nio/Buffer;");
+  SnappyDecompressor_directBufferSize = (*env)->GetFieldID(env, clazz,
+                                                         "directBufferSize", "I");
+}
+
+JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_snappy_SnappyDecompressor_decompressBytesDirect
+(JNIEnv *env, jobject thisj){
+  // Get members of SnappyDecompressor
+  jobject clazz = (*env)->GetStaticObjectField(env,thisj, SnappyDecompressor_clazz);
+  jobject compressed_direct_buf = (*env)->GetObjectField(env,thisj, SnappyDecompressor_compressedDirectBuf);
+  jint compressed_direct_buf_len = (*env)->GetIntField(env,thisj, SnappyDecompressor_compressedDirectBufLen);
+  jobject uncompressed_direct_buf = (*env)->GetObjectField(env,thisj, SnappyDecompressor_uncompressedDirectBuf);
+  size_t uncompressed_direct_buf_len = (*env)->GetIntField(env, thisj, SnappyDecompressor_directBufferSize);
+
+  // Get the input direct buffer
+  LOCK_CLASS(env, clazz, "SnappyDecompressor");
+  const char* compressed_bytes = (const char*)(*env)->GetDirectBufferAddress(env, compressed_direct_buf);
+  UNLOCK_CLASS(env, clazz, "SnappyDecompressor");
+
+  if (compressed_bytes == 0) {
+    return (jint)0;
+  }
+
+  // Get the output direct buffer
+  LOCK_CLASS(env, clazz, "SnappyDecompressor");
+  char* uncompressed_bytes = (char *)(*env)->GetDirectBufferAddress(env, uncompressed_direct_buf);
+  UNLOCK_CLASS(env, clazz, "SnappyDecompressor");
+
+  if (uncompressed_bytes == 0) {
+    return (jint)0;
+  }
+
+  snappy_status ret = dlsym_snappy_uncompress(compressed_bytes, compressed_direct_buf_len, uncompressed_bytes, &uncompressed_direct_buf_len);
+  if (ret == SNAPPY_BUFFER_TOO_SMALL){
+    THROW(env, "Ljava/lang/InternalError", "Could not decompress data. Buffer length is too small.");
+  } else if (ret == SNAPPY_INVALID_INPUT){
+    THROW(env, "Ljava/lang/InternalError", "Could not decompress data. Input is invalid.");
+  } else if (ret != SNAPPY_OK){
+    THROW(env, "Ljava/lang/InternalError", "Could not decompress data.");
+  }
+
+  (*env)->SetIntField(env, thisj, SnappyDecompressor_compressedDirectBufLen, 0);
+
+  return (jint)uncompressed_direct_buf_len;
+}
+
+#endif //define HADOOP_SNAPPY_LIBRARY

+ 58 - 0
src/native/src/org/apache/hadoop/io/compress/snappy/org_apache_hadoop_io_compress_snappy.h

@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#if !defined ORG_APACHE_HADOOP_IO_COMPRESS_SNAPPY_SNAPPY_H
+#define ORG_APACHE_HADOOP_IO_COMPRESS_SNAPPY_SNAPPY_H
+
+
+#if defined HAVE_CONFIG_H
+  #include <config.h>
+#endif
+
+#if defined HADOOP_SNAPPY_LIBRARY
+
+  #if defined HAVE_STDDEF_H
+    #include <stddef.h>
+  #else
+    #error 'stddef.h not found'
+  #endif
+
+  #if defined HAVE_SNAPPY_C_H
+    #include <snappy-c.h>
+  #else
+    #error 'Please install snappy-development packages for your platform.'
+  #endif
+
+  #if defined HAVE_DLFCN_H
+    #include <dlfcn.h>
+  #else
+    #error "dlfcn.h not found"
+  #endif
+
+  #if defined HAVE_JNI_H
+    #include <jni.h>
+  #else
+    #error 'jni.h not found'
+  #endif
+
+  #include "org_apache_hadoop.h"
+
+#endif //define HADOOP_SNAPPY_LIBRARY
+
+#endif //ORG_APACHE_HADOOP_IO_COMPRESS_SNAPPY_SNAPPY_H

+ 16 - 5
src/test/org/apache/hadoop/io/compress/TestCodec.java

@@ -42,7 +42,6 @@ import junit.framework.TestCase;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
@@ -50,18 +49,17 @@ import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.RandomDatum;
 import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.io.compress.CompressorStream;
-import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.snappy.LoadSnappy;
 import org.apache.hadoop.io.compress.zlib.BuiltInGzipDecompressor;
 import org.apache.hadoop.io.compress.zlib.BuiltInZlibDeflater;
 import org.apache.hadoop.io.compress.zlib.BuiltInZlibInflater;
 import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel;
 import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionStrategy;
 import org.apache.hadoop.io.compress.zlib.ZlibFactory;
+import org.apache.hadoop.util.ReflectionUtils;
 
 public class TestCodec extends TestCase {
 
@@ -86,6 +84,19 @@ public class TestCodec extends TestCase {
     codecTest(conf, seed, count, "org.apache.hadoop.io.compress.BZip2Codec");
   }
 
+
+  public void testSnappyCodec() throws IOException {
+    if (LoadSnappy.isAvailable()) {
+      if (LoadSnappy.isLoaded()) {
+        codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.SnappyCodec");
+        codecTest(conf, seed, count, "org.apache.hadoop.io.compress.SnappyCodec");
+      }
+      else {
+        fail("Snappy native available but Hadoop native not");
+      }
+    }
+  }
+
   public void testGzipCodecWithParam() throws IOException {
     Configuration conf = new Configuration(this.conf);
     ZlibFactory.setCompressionLevel(conf, CompressionLevel.BEST_COMPRESSION);