Browse Source

HADOOP-8462. Native-code implementation of bzip2 codec. Contributed by Govind Kamat

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1453608 13f79535-47bb-0310-9956-ffa450edef68
Jason Darrell Lowe 12 years ago
parent
commit
85470f0a33

+ 3 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -1508,6 +1508,9 @@ Release 0.23.7 - UNRELEASED
 
   OPTIMIZATIONS
 
+    HADOOP-8462. Native-code implementation of bzip2 codec. (Govind Kamat via
+    jlowe)
+
   BUG FIXES
 
     HADOOP-9302. HDFS docs not linked from top level (Andy Isaacson via

+ 4 - 1
hadoop-common-project/hadoop-common/pom.xml

@@ -464,6 +464,7 @@
         <activeByDefault>false</activeByDefault>
       </activation>
       <properties>
+        <require.bzip2>false</require.bzip2>
         <snappy.prefix></snappy.prefix>
         <snappy.lib></snappy.lib>
         <snappy.include></snappy.include>
@@ -507,6 +508,8 @@
                   <javahClassNames>
                     <javahClassName>org.apache.hadoop.io.compress.zlib.ZlibCompressor</javahClassName>
                     <javahClassName>org.apache.hadoop.io.compress.zlib.ZlibDecompressor</javahClassName>
+                    <javahClassName>org.apache.hadoop.io.compress.bzip2.Bzip2Compressor</javahClassName>
+                    <javahClassName>org.apache.hadoop.io.compress.bzip2.Bzip2Decompressor</javahClassName>
                     <javahClassName>org.apache.hadoop.security.JniBasedUnixGroupsMapping</javahClassName>
                     <javahClassName>org.apache.hadoop.io.nativeio.NativeIO</javahClassName>
                     <javahClassName>org.apache.hadoop.security.JniBasedUnixGroupsNetgroupMapping</javahClassName>
@@ -532,7 +535,7 @@
                 <configuration>
                   <target>
                     <exec executable="cmake" dir="${project.build.directory}/native" failonerror="true">
-                      <arg line="${basedir}/src/ -DGENERATED_JAVAH=${project.build.directory}/native/javah -DJVM_ARCH_DATA_MODEL=${sun.arch.data.model} -DREQUIRE_SNAPPY=${require.snappy} -DCUSTOM_SNAPPY_PREFIX=${snappy.prefix} -DCUSTOM_SNAPPY_LIB=${snappy.lib} -DCUSTOM_SNAPPY_INCLUDE=${snappy.include}"/>
+                      <arg line="${basedir}/src/ -DGENERATED_JAVAH=${project.build.directory}/native/javah -DJVM_ARCH_DATA_MODEL=${sun.arch.data.model} -DREQUIRE_BZIP2=${require.bzip2} -DREQUIRE_SNAPPY=${require.snappy} -DCUSTOM_SNAPPY_PREFIX=${snappy.prefix} -DCUSTOM_SNAPPY_LIB=${snappy.lib} -DCUSTOM_SNAPPY_INCLUDE=${snappy.include}"/>
                     </exec>
                     <exec executable="make" dir="${project.build.directory}/native" failonerror="true">
                       <arg line="VERBOSE=1"/>

+ 19 - 0
hadoop-common-project/hadoop-common/src/CMakeLists.txt

@@ -97,6 +97,23 @@ set(T main/native/src/test/org/apache/hadoop)
 
 GET_FILENAME_COMPONENT(HADOOP_ZLIB_LIBRARY ${ZLIB_LIBRARIES} NAME)
 
+SET(STORED_CMAKE_FIND_LIBRARY_SUFFIXES CMAKE_FIND_LIBRARY_SUFFIXES)
+set_find_shared_library_version("1")
+find_package(BZip2 QUIET)
+if (BZIP2_INCLUDE_DIR AND BZIP2_LIBRARIES)
+    GET_FILENAME_COMPONENT(HADOOP_BZIP2_LIBRARY ${BZIP2_LIBRARIES} NAME)
+    set(BZIP2_SOURCE_FILES
+          "${D}/io/compress/bzip2/Bzip2Compressor.c"
+          "${D}/io/compress/bzip2/Bzip2Decompressor.c")
+else (BZIP2_INCLUDE_DIR AND BZIP2_LIBRARIES)
+    set(BZIP2_SOURCE_FILES "")
+    set(BZIP2_INCLUDE_DIR "")
+    IF(REQUIRE_BZIP2)
+        MESSAGE(FATAL_ERROR "Required bzip2 library and/or header files could not be found.")
+    ENDIF(REQUIRE_BZIP2)
+endif (BZIP2_INCLUDE_DIR AND BZIP2_LIBRARIES)
+SET(CMAKE_FIND_LIBRARY_SUFFIXES STORED_CMAKE_FIND_LIBRARY_SUFFIXES)
+
 INCLUDE(CheckFunctionExists)
 INCLUDE(CheckCSourceCompiles)
 INCLUDE(CheckLibraryExists)
@@ -136,6 +153,7 @@ include_directories(
     ${CMAKE_BINARY_DIR}
     ${JNI_INCLUDE_DIRS}
     ${ZLIB_INCLUDE_DIRS}
+    ${BZIP2_INCLUDE_DIR}
     ${SNAPPY_INCLUDE_DIR}
     ${D}/util
 )
@@ -155,6 +173,7 @@ add_dual_library(hadoop
     ${SNAPPY_SOURCE_FILES}
     ${D}/io/compress/zlib/ZlibCompressor.c
     ${D}/io/compress/zlib/ZlibDecompressor.c
+    ${BZIP2_SOURCE_FILES}
     ${D}/io/nativeio/NativeIO.c
     ${D}/io/nativeio/errno_enum.c
     ${D}/io/nativeio/file_descriptor.c

+ 1 - 0
hadoop-common-project/hadoop-common/src/config.h.cmake

@@ -19,6 +19,7 @@
 #define CONFIG_H
 
 #cmakedefine HADOOP_ZLIB_LIBRARY "@HADOOP_ZLIB_LIBRARY@"
+#cmakedefine HADOOP_BZIP2_LIBRARY "@HADOOP_BZIP2_LIBRARY@"
 #cmakedefine HADOOP_SNAPPY_LIBRARY "@HADOOP_SNAPPY_LIBRARY@"
 #cmakedefine HAVE_SYNC_FILE_RANGE
 #cmakedefine HAVE_POSIX_FADVISE

+ 106 - 69
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java

@@ -23,108 +23,156 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.io.compress.bzip2.BZip2Constants;
-import org.apache.hadoop.io.compress.bzip2.BZip2DummyCompressor;
-import org.apache.hadoop.io.compress.bzip2.BZip2DummyDecompressor;
 import org.apache.hadoop.io.compress.bzip2.CBZip2InputStream;
 import org.apache.hadoop.io.compress.bzip2.CBZip2OutputStream;
+import org.apache.hadoop.io.compress.bzip2.Bzip2Factory;
 
 /**
- * This class provides CompressionOutputStream and CompressionInputStream for
- * compression and decompression. Currently we dont have an implementation of
- * the Compressor and Decompressor interfaces, so those methods of
- * CompressionCodec which have a Compressor or Decompressor type argument, throw
- * UnsupportedOperationException.
+ * This class provides output and input streams for bzip2 compression
+ * and decompression.  It uses the native bzip2 library on the system
+ * if possible, else it uses a pure-Java implementation of the bzip2
+ * algorithm.  The configuration parameter
+ * io.compression.codec.bzip2.library can be used to control this
+ * behavior.
+ *
+ * In the pure-Java mode, the Compressor and Decompressor interfaces
+ * are not implemented.  Therefore, in that mode, those methods of
+ * CompressionCodec which have a Compressor or Decompressor type
+ * argument, throw UnsupportedOperationException.
+ *
+ * Currently, support for splittability is available only in the
+ * pure-Java mode; therefore, if a SplitCompressionInputStream is
+ * requested, the pure-Java implementation is used, regardless of the
+ * setting of the configuration parameter mentioned above.
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
-public class BZip2Codec implements SplittableCompressionCodec {
+public class BZip2Codec implements Configurable, SplittableCompressionCodec {
 
   private static final String HEADER = "BZ";
   private static final int HEADER_LEN = HEADER.length();
   private static final String SUB_HEADER = "h9";
   private static final int SUB_HEADER_LEN = SUB_HEADER.length();
 
+  private Configuration conf;
+  
   /**
-  * Creates a new instance of BZip2Codec
+   * Set the configuration to be used by this object.
+   *
+   * @param conf the configuration object.
+   */
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+  
+  /**
+   * Return the configuration used by this object.
+   *
+   * @return the configuration object used by this objec.
+   */
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+  
+  /**
+  * Creates a new instance of BZip2Codec.
   */
   public BZip2Codec() { }
 
   /**
-  * Creates CompressionOutputStream for BZip2
-  *
-  * @param out
-  *            The output Stream
-  * @return The BZip2 CompressionOutputStream
-  * @throws java.io.IOException
-  *             Throws IO exception
-  */
+   * Create a {@link CompressionOutputStream} that will write to the given
+   * {@link OutputStream}.
+   *
+   * @param out        the location for the final output stream
+   * @return a stream the user can write uncompressed data to, to have it 
+   *         compressed
+   * @throws IOException
+   */
   @Override
   public CompressionOutputStream createOutputStream(OutputStream out)
       throws IOException {
-    return new BZip2CompressionOutputStream(out);
+    return createOutputStream(out, createCompressor());
   }
 
   /**
-  * Creates a compressor using given OutputStream.
+   * Create a {@link CompressionOutputStream} that will write to the given
+   * {@link OutputStream} with the given {@link Compressor}.
    *
-  * @return CompressionOutputStream
-    @throws java.io.IOException
+   * @param out        the location for the final output stream
+   * @param compressor compressor to use
+   * @return a stream the user can write uncompressed data to, to have it 
+   *         compressed
+   * @throws IOException
    */
   @Override
   public CompressionOutputStream createOutputStream(OutputStream out,
       Compressor compressor) throws IOException {
-    return createOutputStream(out);
+    return Bzip2Factory.isNativeBzip2Loaded(conf) ?
+      new CompressorStream(out, compressor, 
+                           conf.getInt("io.file.buffer.size", 4*1024)) :
+      new BZip2CompressionOutputStream(out);
   }
 
   /**
-  * This functionality is currently not supported.
-  *
-  * @return BZip2DummyCompressor.class
-  */
+   * Get the type of {@link Compressor} needed by this {@link CompressionCodec}.
+   *
+   * @return the type of compressor needed by this codec.
+   */
   @Override
-  public Class<? extends org.apache.hadoop.io.compress.Compressor> getCompressorType() {
-    return BZip2DummyCompressor.class;
+  public Class<? extends Compressor> getCompressorType() {
+    return Bzip2Factory.getBzip2CompressorType(conf);
   }
 
   /**
-  * This functionality is currently not supported.
-  *
-  * @return Compressor
-  */
+   * Create a new {@link Compressor} for use by this {@link CompressionCodec}.
+   *
+   * @return a new compressor for use by this codec
+   */
   @Override
   public Compressor createCompressor() {
-    return new BZip2DummyCompressor();
+    return Bzip2Factory.getBzip2Compressor(conf);
   }
 
   /**
-  * Creates CompressionInputStream to be used to read off uncompressed data.
-  *
-  * @param in
-  *            The InputStream
-  * @return Returns CompressionInputStream for BZip2
-  * @throws java.io.IOException
-  *             Throws IOException
-  */
+   * Create a {@link CompressionInputStream} that will read from the given
+   * input stream and return a stream for uncompressed data.
+   *
+   * @param in the stream to read compressed bytes from
+   * @return a stream to read uncompressed bytes from
+   * @throws IOException
+   */
   @Override
   public CompressionInputStream createInputStream(InputStream in)
       throws IOException {
-    return new BZip2CompressionInputStream(in);
+    return createInputStream(in, createDecompressor());
   }
 
   /**
-  * This functionality is currently not supported.
-  *
-  * @return CompressionInputStream
-  */
+   * Create a {@link CompressionInputStream} that will read from the given
+   * {@link InputStream} with the given {@link Decompressor}, and return a 
+   * stream for uncompressed data.
+   *
+   * @param in           the stream to read compressed bytes from
+   * @param decompressor decompressor to use
+   * @return a stream to read uncompressed bytes from
+   * @throws IOException
+   */
   @Override
   public CompressionInputStream createInputStream(InputStream in,
       Decompressor decompressor) throws IOException {
-    return createInputStream(in);
+    return Bzip2Factory.isNativeBzip2Loaded(conf) ? 
+      new DecompressorStream(in, decompressor,
+                             conf.getInt("io.file.buffer.size", 4*1024)) :
+      new BZip2CompressionInputStream(in);
   }
 
   /**
@@ -139,7 +187,6 @@ public class BZip2Codec implements SplittableCompressionCodec {
    *
    * @return CompressionInputStream for BZip2 aligned at block boundaries
    */
-  @Override
   public SplitCompressionInputStream createInputStream(InputStream seekableIn,
       Decompressor decompressor, long start, long end, READ_MODE readMode)
       throws IOException {
@@ -184,23 +231,23 @@ public class BZip2Codec implements SplittableCompressionCodec {
   }
 
   /**
-  * This functionality is currently not supported.
-  *
-  * @return BZip2DummyDecompressor.class
-  */
+   * Get the type of {@link Decompressor} needed by this {@link CompressionCodec}.
+   *
+   * @return the type of decompressor needed by this codec.
+   */
   @Override
-  public Class<? extends org.apache.hadoop.io.compress.Decompressor> getDecompressorType() {
-    return BZip2DummyDecompressor.class;
+  public Class<? extends Decompressor> getDecompressorType() {
+    return Bzip2Factory.getBzip2DecompressorType(conf);
   }
 
   /**
-  * This functionality is currently not supported.
-  *
-  * @return Decompressor
-  */
+   * Create a new {@link Decompressor} for use by this {@link CompressionCodec}.
+   *
+   * @return a new decompressor for use by this codec
+   */
   @Override
   public Decompressor createDecompressor() {
-    return new BZip2DummyDecompressor();
+    return Bzip2Factory.getBzip2Decompressor(conf);
   }
 
   /**
@@ -236,7 +283,6 @@ public class BZip2Codec implements SplittableCompressionCodec {
       }
     }
 
-    @Override
     public void finish() throws IOException {
       if (needsReset) {
         // In the case that nothing is written to this stream, we still need to
@@ -256,14 +302,12 @@ public class BZip2Codec implements SplittableCompressionCodec {
       }
     }    
     
-    @Override
     public void resetState() throws IOException {
       // Cannot write to out at this point because out might not be ready
       // yet, as in SequenceFile.Writer implementation.
       needsReset = true;
     }
 
-    @Override
     public void write(int b) throws IOException {
       if (needsReset) {
         internalReset();
@@ -271,7 +315,6 @@ public class BZip2Codec implements SplittableCompressionCodec {
       this.output.write(b);
     }
 
-    @Override
     public void write(byte[] b, int off, int len) throws IOException {
       if (needsReset) {
         internalReset();
@@ -279,7 +322,6 @@ public class BZip2Codec implements SplittableCompressionCodec {
       this.output.write(b, off, len);
     }
 
-    @Override
     public void close() throws IOException {
       if (needsReset) {
         // In the case that nothing is written to this stream, we still need to
@@ -397,7 +439,6 @@ public class BZip2Codec implements SplittableCompressionCodec {
 
     }// end of method
 
-    @Override
     public void close() throws IOException {
       if (!needsReset) {
         input.close();
@@ -433,7 +474,6 @@ public class BZip2Codec implements SplittableCompressionCodec {
     *
     */
 
-    @Override
     public int read(byte[] b, int off, int len) throws IOException {
       if (needsReset) {
         internalReset();
@@ -457,7 +497,6 @@ public class BZip2Codec implements SplittableCompressionCodec {
 
     }
 
-    @Override
     public int read() throws IOException {
       byte b[] = new byte[1];
       int result = this.read(b, 0, 1);
@@ -472,7 +511,6 @@ public class BZip2Codec implements SplittableCompressionCodec {
       }
     }    
     
-    @Override
     public void resetState() throws IOException {
       // Cannot read from bufferedIn at this point because bufferedIn
       // might not be ready
@@ -480,7 +518,6 @@ public class BZip2Codec implements SplittableCompressionCodec {
       needsReset = true;
     }
 
-    @Override
     public long getPos() {
       return this.compressedStreamPosition;
       }

+ 301 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/Bzip2Compressor.java

@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress.bzip2;
+
+import java.io.IOException;
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.Compressor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A {@link Compressor} based on the popular 
+ * bzip2 compression algorithm.
+ * http://www.bzip2.org/
+ * 
+ */
+public class Bzip2Compressor implements Compressor {
+  private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64*1024;
+
+  // The default values for the block size and work factor are the same 
+  // those in Julian Seward's original bzip2 implementation.
+  static final int DEFAULT_BLOCK_SIZE = 9;
+  static final int DEFAULT_WORK_FACTOR = 30;
+
+  private static final Log LOG = LogFactory.getLog(Bzip2Compressor.class);
+
+  // HACK - Use this as a global lock in the JNI layer.
+  private static Class<Bzip2Compressor> clazz = Bzip2Compressor.class;
+
+  private long stream;
+  private int blockSize;
+  private int workFactor;
+  private int directBufferSize;
+  private byte[] userBuf = null;
+  private int userBufOff = 0, userBufLen = 0;
+  private Buffer uncompressedDirectBuf = null;
+  private int uncompressedDirectBufOff = 0, uncompressedDirectBufLen = 0;
+  private boolean keepUncompressedBuf = false;
+  private Buffer compressedDirectBuf = null;
+  private boolean finish, finished;
+
+  /**
+   * Creates a new compressor with a default values for the
+   * compression block size and work factor.  Compressed data will be
+   * generated in bzip2 format.
+   */
+  public Bzip2Compressor() {
+    this(DEFAULT_BLOCK_SIZE, DEFAULT_WORK_FACTOR, DEFAULT_DIRECT_BUFFER_SIZE);
+  }
+
+  /**
+   * Creates a new compressor, taking settings from the configuration.
+   */
+  public Bzip2Compressor(Configuration conf) {
+    this(Bzip2Factory.getBlockSize(conf),
+         Bzip2Factory.getWorkFactor(conf),
+         DEFAULT_DIRECT_BUFFER_SIZE);
+  }
+
+  /** 
+   * Creates a new compressor using the specified block size.
+   * Compressed data will be generated in bzip2 format.
+   * 
+   * @param blockSize The block size to be used for compression.  This is
+   *        an integer from 1 through 9, which is multiplied by 100,000 to 
+   *        obtain the actual block size in bytes.
+   * @param workFactor This parameter is a threshold that determines when a 
+   *        fallback algorithm is used for pathological data.  It ranges from
+   *        0 to 250.
+   * @param directBufferSize Size of the direct buffer to be used.
+   */
+  public Bzip2Compressor(int blockSize, int workFactor, 
+                         int directBufferSize) {
+    this.blockSize = blockSize;
+    this.workFactor = workFactor;
+    this.directBufferSize = directBufferSize;
+    stream = init(blockSize, workFactor);
+    uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
+    compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
+    compressedDirectBuf.position(directBufferSize);
+  }
+
+  /**
+   * Prepare the compressor to be used in a new stream with settings defined in
+   * the given Configuration. It will reset the compressor's block size and
+   * and work factor.
+   * 
+   * @param conf Configuration storing new settings
+   */
+  @Override
+  public synchronized void reinit(Configuration conf) {
+    reset();
+    end(stream);
+    if (conf == null) {
+      stream = init(blockSize, workFactor);
+      return;
+    }
+    blockSize = Bzip2Factory.getBlockSize(conf);
+    workFactor = Bzip2Factory.getWorkFactor(conf);
+    stream = init(blockSize, workFactor);
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("Reinit compressor with new compression configuration");
+    }
+  }
+
+  @Override
+  public synchronized void setInput(byte[] b, int off, int len) {
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || off > b.length - len) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+    
+    this.userBuf = b;
+    this.userBufOff = off;
+    this.userBufLen = len;
+    uncompressedDirectBufOff = 0;
+    setInputFromSavedData();
+    
+    // Reinitialize bzip2's output direct buffer.
+    compressedDirectBuf.limit(directBufferSize);
+    compressedDirectBuf.position(directBufferSize);
+  }
+  
+  // Copy enough data from userBuf to uncompressedDirectBuf.
+  synchronized void setInputFromSavedData() {
+    int len = Math.min(userBufLen, uncompressedDirectBuf.remaining());
+    ((ByteBuffer)uncompressedDirectBuf).put(userBuf, userBufOff, len);
+    userBufLen -= len;
+    userBufOff += len;
+    uncompressedDirectBufLen = uncompressedDirectBuf.position();
+  }
+
+  @Override
+  public synchronized void setDictionary(byte[] b, int off, int len) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public synchronized boolean needsInput() {
+    // Compressed data still available?
+    if (compressedDirectBuf.remaining() > 0) {
+      return false;
+    }
+
+    // Uncompressed data available in either the direct buffer or user buffer?
+    if (keepUncompressedBuf && uncompressedDirectBufLen > 0)
+      return false;
+    
+    if (uncompressedDirectBuf.remaining() > 0) {
+      // Check if we have consumed all data in the user buffer.
+      if (userBufLen <= 0) {
+        return true;
+      } else {
+        // Copy enough data from userBuf to uncompressedDirectBuf.
+        setInputFromSavedData();
+        return uncompressedDirectBuf.remaining() > 0;
+      }
+    }
+    
+    return false;
+  }
+  
+  @Override
+  public synchronized void finish() {
+    finish = true;
+  }
+  
+  @Override
+  public synchronized boolean finished() {
+    // Check if bzip2 says it has finished and
+    // all compressed data has been consumed.
+    return (finished && compressedDirectBuf.remaining() == 0);
+  }
+
+  @Override
+  public synchronized int compress(byte[] b, int off, int len) 
+    throws IOException {
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || off > b.length - len) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+    
+    // Check if there is compressed data.
+    int n = compressedDirectBuf.remaining();
+    if (n > 0) {
+      n = Math.min(n, len);
+      ((ByteBuffer)compressedDirectBuf).get(b, off, n);
+      return n;
+    }
+
+    // Re-initialize bzip2's output direct buffer.
+    compressedDirectBuf.rewind();
+    compressedDirectBuf.limit(directBufferSize);
+
+    // Compress the data.
+    n = deflateBytesDirect();
+    compressedDirectBuf.limit(n);
+    
+    // Check if bzip2 has consumed the entire input buffer.
+    // Set keepUncompressedBuf properly.
+    if (uncompressedDirectBufLen <= 0) { // bzip2 consumed all input
+      keepUncompressedBuf = false;
+      uncompressedDirectBuf.clear();
+      uncompressedDirectBufOff = 0;
+      uncompressedDirectBufLen = 0;
+    } else {
+      keepUncompressedBuf = true;
+    }
+
+    // Get at most 'len' bytes.
+    n = Math.min(n, len);
+    ((ByteBuffer)compressedDirectBuf).get(b, off, n);
+
+    return n;
+  }
+
+  /**
+   * Returns the total number of compressed bytes output so far.
+   *
+   * @return the total (non-negative) number of compressed bytes output so far
+   */
+  @Override
+  public synchronized long getBytesWritten() {
+    checkStream();
+    return getBytesWritten(stream);
+  }
+
+  /**
+   * Returns the total number of uncompressed bytes input so far.</p>
+   *
+   * @return the total (non-negative) number of uncompressed bytes input so far
+   */
+  @Override
+  public synchronized long getBytesRead() {
+    checkStream();
+    return getBytesRead(stream);
+  }
+
+  @Override
+  public synchronized void reset() {
+    checkStream();
+    end(stream);
+    stream = init(blockSize, workFactor);
+    finish = false;
+    finished = false;
+    uncompressedDirectBuf.rewind();
+    uncompressedDirectBufOff = uncompressedDirectBufLen = 0;
+    keepUncompressedBuf = false;
+    compressedDirectBuf.limit(directBufferSize);
+    compressedDirectBuf.position(directBufferSize);
+    userBufOff = userBufLen = 0;
+  }
+  
+  @Override
+  public synchronized void end() {
+    if (stream != 0) {
+      end(stream);
+      stream = 0;
+    }
+  }
+  
+  static void initSymbols(String libname) {
+    initIDs(libname);
+  }
+
+  private void checkStream() {
+    if (stream == 0)
+      throw new NullPointerException();
+  }
+  
+  private native static void initIDs(String libname);
+  private native static long init(int blockSize, int workFactor);
+  private native int deflateBytesDirect();
+  private native static long getBytesRead(long strm);
+  private native static long getBytesWritten(long strm);
+  private native static void end(long strm);
+}

+ 250 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/Bzip2Decompressor.java

@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress.bzip2;
+
+import java.io.IOException;
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.io.compress.Decompressor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A {@link Decompressor} based on the popular 
+ * bzip2 compression algorithm.
+ * http://www.bzip2.org/
+ * 
+ */
+public class Bzip2Decompressor implements Decompressor {
+  private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64*1024;
+  
+  private static final Log LOG = LogFactory.getLog(Bzip2Decompressor.class);
+
+  // HACK - Use this as a global lock in the JNI layer.
+  private static Class<Bzip2Decompressor> clazz = Bzip2Decompressor.class;
+  
+  private long stream;
+  private boolean conserveMemory;
+  private int directBufferSize;
+  private Buffer compressedDirectBuf = null;
+  private int compressedDirectBufOff, compressedDirectBufLen;
+  private Buffer uncompressedDirectBuf = null;
+  private byte[] userBuf = null;
+  private int userBufOff = 0, userBufLen = 0;
+  private boolean finished;
+
+  /**
+   * Creates a new decompressor.
+   */
+  public Bzip2Decompressor(boolean conserveMemory, int directBufferSize) {
+    this.conserveMemory = conserveMemory;
+    this.directBufferSize = directBufferSize;
+    compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
+    uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
+    uncompressedDirectBuf.position(directBufferSize);
+    
+    stream = init(conserveMemory ? 1 : 0);
+  }
+  
+  public Bzip2Decompressor() {
+    this(false, DEFAULT_DIRECT_BUFFER_SIZE);
+  }
+
+  @Override
+  public synchronized void setInput(byte[] b, int off, int len) {
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || off > b.length - len) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+  
+    this.userBuf = b;
+    this.userBufOff = off;
+    this.userBufLen = len;
+    
+    setInputFromSavedData();
+    
+    // Reinitialize bzip2's output direct buffer.
+    uncompressedDirectBuf.limit(directBufferSize);
+    uncompressedDirectBuf.position(directBufferSize);
+  }
+  
+  synchronized void setInputFromSavedData() {
+    compressedDirectBufOff = 0;
+    compressedDirectBufLen = userBufLen;
+    if (compressedDirectBufLen > directBufferSize) {
+      compressedDirectBufLen = directBufferSize;
+    }
+
+    // Reinitialize bzip2's input direct buffer.
+    compressedDirectBuf.rewind();
+    ((ByteBuffer)compressedDirectBuf).put(userBuf, userBufOff, 
+                                          compressedDirectBufLen);
+    
+    // Note how much data is being fed to bzip2.
+    userBufOff += compressedDirectBufLen;
+    userBufLen -= compressedDirectBufLen;
+  }
+
+  @Override
+  public synchronized void setDictionary(byte[] b, int off, int len) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public synchronized boolean needsInput() {
+    // Consume remaining compressed data?
+    if (uncompressedDirectBuf.remaining() > 0) {
+      return false;
+    }
+    
+    // Check if bzip2 has consumed all input.
+    if (compressedDirectBufLen <= 0) {
+      // Check if we have consumed all user-input.
+      if (userBufLen <= 0) {
+        return true;
+      } else {
+        setInputFromSavedData();
+      }
+    }
+    
+    return false;
+  }
+
+  @Override
+  public synchronized boolean needsDictionary() {
+    return false;
+  }
+
+  @Override
+  public synchronized boolean finished() {
+    // Check if bzip2 says it has finished and
+    // all compressed data has been consumed.
+    return (finished && uncompressedDirectBuf.remaining() == 0);
+  }
+
+  @Override
+  public synchronized int decompress(byte[] b, int off, int len) 
+    throws IOException {
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || off > b.length - len) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+    
+    // Check if there is uncompressed data.
+    int n = uncompressedDirectBuf.remaining();
+    if (n > 0) {
+      n = Math.min(n, len);
+      ((ByteBuffer)uncompressedDirectBuf).get(b, off, n);
+      return n;
+    }
+    
+    // Re-initialize bzip2's output direct buffer.
+    uncompressedDirectBuf.rewind();
+    uncompressedDirectBuf.limit(directBufferSize);
+
+    // Decompress the data.
+    n = finished ? 0 : inflateBytesDirect();
+    uncompressedDirectBuf.limit(n);
+
+    // Get at most 'len' bytes.
+    n = Math.min(n, len);
+    ((ByteBuffer)uncompressedDirectBuf).get(b, off, n);
+
+    return n;
+  }
+  
+  /**
+   * Returns the total number of uncompressed bytes output so far.
+   *
+   * @return the total (non-negative) number of uncompressed bytes output so far
+   */
+  public synchronized long getBytesWritten() {
+    checkStream();
+    return getBytesWritten(stream);
+  }
+
+  /**
+   * Returns the total number of compressed bytes input so far.</p>
+   *
+   * @return the total (non-negative) number of compressed bytes input so far
+   */
+  public synchronized long getBytesRead() {
+    checkStream();
+    return getBytesRead(stream);
+  }
+
+  /**
+   * Returns the number of bytes remaining in the input buffers; normally
+   * called when finished() is true to determine amount of post-gzip-stream
+   * data.</p>
+   *
+   * @return the total (non-negative) number of unprocessed bytes in input
+   */
+  @Override
+  public synchronized int getRemaining() {
+    checkStream();
+    return userBufLen + getRemaining(stream);  // userBuf + compressedDirectBuf
+  }
+
+  /**
+   * Resets everything including the input buffers (user and direct).</p>
+   */
+  @Override
+  public synchronized void reset() {
+    checkStream();
+    end(stream);
+    stream = init(conserveMemory ? 1 : 0);
+    finished = false;
+    compressedDirectBufOff = compressedDirectBufLen = 0;
+    uncompressedDirectBuf.limit(directBufferSize);
+    uncompressedDirectBuf.position(directBufferSize);
+    userBufOff = userBufLen = 0;
+  }
+
+  @Override
+  public synchronized void end() {
+    if (stream != 0) {
+      end(stream);
+      stream = 0;
+    }
+  }
+
+  static void initSymbols(String libname) {
+    initIDs(libname);
+  }
+
+  private void checkStream() {
+    if (stream == 0)
+      throw new NullPointerException();
+  }
+  
+  private native static void initIDs(String libname);
+  private native static long init(int conserveMemory);
+  private native int inflateBytesDirect();
+  private native static long getBytesRead(long strm);
+  private native static long getBytesWritten(long strm);
+  private native static int getRemaining(long strm);
+  private native static void end(long strm);
+}

+ 145 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/Bzip2Factory.java

@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress.bzip2;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.util.NativeCodeLoader;
+
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.bzip2.Bzip2Compressor;
+import org.apache.hadoop.io.compress.bzip2.Bzip2Decompressor;
+import org.apache.hadoop.io.compress.bzip2.BZip2DummyCompressor;
+import org.apache.hadoop.io.compress.bzip2.BZip2DummyDecompressor;
+
+/**
+ * A collection of factories to create the right 
+ * bzip2 compressor/decompressor instances.
+ * 
+ */
+public class Bzip2Factory {
+  private static final Log LOG = LogFactory.getLog(Bzip2Factory.class);
+
+  private static String bzip2LibraryName = "";
+  private static boolean nativeBzip2Loaded;
+  
+  /**
+   * Check if native-bzip2 code is loaded & initialized correctly and 
+   * can be loaded for this job.
+   * 
+   * @param conf configuration
+   * @return <code>true</code> if native-bzip2 is loaded & initialized 
+   *         and can be loaded for this job, else <code>false</code>
+   */
+  public static boolean isNativeBzip2Loaded(Configuration conf) {
+    String libname = conf.get("io.compression.codec.bzip2.library", 
+                              "system-native");
+    if (!bzip2LibraryName.equals(libname)) {
+      nativeBzip2Loaded = false;
+      bzip2LibraryName = libname;
+      if (libname.equals("java-builtin")) {
+        LOG.info("Using pure-Java version of bzip2 library");
+      } else if (conf.getBoolean(
+                CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY, 
+                CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_DEFAULT) &&
+          NativeCodeLoader.isNativeCodeLoaded()) {
+        try {
+          // Initialize the native library.
+          Bzip2Compressor.initSymbols(libname);
+          Bzip2Decompressor.initSymbols(libname);
+          nativeBzip2Loaded = true;
+          LOG.info("Successfully loaded & initialized native-bzip2 library " +
+                   libname);
+        } catch (Throwable t) {
+          LOG.warn("Failed to load/initialize native-bzip2 library " + 
+                   libname + ", will use pure-Java version");
+        }
+      }
+    }
+    return nativeBzip2Loaded;
+  }
+
+  /**
+   * Return the appropriate type of the bzip2 compressor. 
+   * 
+   * @param conf configuration
+   * @return the appropriate type of the bzip2 compressor.
+   */
+  public static Class<? extends Compressor> 
+  getBzip2CompressorType(Configuration conf) {
+    return isNativeBzip2Loaded(conf) ? 
+      Bzip2Compressor.class : BZip2DummyCompressor.class;
+  }
+  
+  /**
+   * Return the appropriate implementation of the bzip2 compressor. 
+   * 
+   * @param conf configuration
+   * @return the appropriate implementation of the bzip2 compressor.
+   */
+  public static Compressor getBzip2Compressor(Configuration conf) {
+    return isNativeBzip2Loaded(conf)? 
+      new Bzip2Compressor(conf) : new BZip2DummyCompressor();
+  }
+
+  /**
+   * Return the appropriate type of the bzip2 decompressor. 
+   * 
+   * @param conf configuration
+   * @return the appropriate type of the bzip2 decompressor.
+   */
+  public static Class<? extends Decompressor> 
+  getBzip2DecompressorType(Configuration conf) {
+    return  isNativeBzip2Loaded(conf) ? 
+      Bzip2Decompressor.class : BZip2DummyDecompressor.class;
+  }
+  
+  /**
+   * Return the appropriate implementation of the bzip2 decompressor. 
+   * 
+   * @param conf configuration
+   * @return the appropriate implementation of the bzip2 decompressor.
+   */
+  public static Decompressor getBzip2Decompressor(Configuration conf) {
+    return isNativeBzip2Loaded(conf) ? 
+      new Bzip2Decompressor() : new BZip2DummyDecompressor();
+  }
+
+  public static void setBlockSize(Configuration conf, int blockSize) {
+    conf.setInt("bzip2.compress.blocksize", blockSize);
+  }
+
+  public static int getBlockSize(Configuration conf) {
+    return conf.getInt("bzip2.compress.blocksize", 
+                       Bzip2Compressor.DEFAULT_BLOCK_SIZE);
+  }
+
+  public static void setWorkFactor(Configuration conf, int workFactor) {
+    conf.setInt("bzip2.compress.workfactor", workFactor);
+  }
+
+  public static int getWorkFactor(Configuration conf) {
+    return conf.getInt("bzip2.compress.workfactor", 
+                       Bzip2Compressor.DEFAULT_WORK_FACTOR);
+  }
+
+}

+ 245 - 0
hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/bzip2/Bzip2Compressor.c

@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <dlfcn.h>
+
+#include "org_apache_hadoop_io_compress_bzip2.h"
+#include "org_apache_hadoop_io_compress_bzip2_Bzip2Compressor.h"
+
+static jfieldID Bzip2Compressor_clazz;
+static jfieldID Bzip2Compressor_stream;
+static jfieldID Bzip2Compressor_uncompressedDirectBuf;
+static jfieldID Bzip2Compressor_uncompressedDirectBufOff;
+static jfieldID Bzip2Compressor_uncompressedDirectBufLen;
+static jfieldID Bzip2Compressor_compressedDirectBuf;
+static jfieldID Bzip2Compressor_directBufferSize;
+static jfieldID Bzip2Compressor_finish;
+static jfieldID Bzip2Compressor_finished;
+
+static int (*dlsym_BZ2_bzCompressInit)(bz_stream*, int, int, int);
+static int (*dlsym_BZ2_bzCompress)(bz_stream*, int);
+static int (*dlsym_BZ2_bzCompressEnd)(bz_stream*);
+
+JNIEXPORT void JNICALL
+Java_org_apache_hadoop_io_compress_bzip2_Bzip2Compressor_initIDs(
+                                 JNIEnv *env, jclass class, jstring libname)
+{
+    const char* bzlib_name = (*env)->GetStringUTFChars(env, libname, NULL);
+    if (strcmp(bzlib_name, "system-native") == 0)
+      bzlib_name = HADOOP_BZIP2_LIBRARY;
+    // Load the native library.
+    void *libbz2 = dlopen(bzlib_name, RTLD_LAZY | RTLD_GLOBAL);
+    if (!libbz2) {
+        THROW(env, "java/lang/UnsatisfiedLinkError",
+              "Cannot load bzip2 native library");
+        return;
+    }
+
+    // Locate the requisite symbols from libbz2.so.
+    dlerror();                                 // Clear any existing error.
+    LOAD_DYNAMIC_SYMBOL(dlsym_BZ2_bzCompressInit, env, libbz2,
+                        "BZ2_bzCompressInit");
+    LOAD_DYNAMIC_SYMBOL(dlsym_BZ2_bzCompress, env, libbz2,
+                        "BZ2_bzCompress");
+    LOAD_DYNAMIC_SYMBOL(dlsym_BZ2_bzCompressEnd, env, libbz2,
+                        "BZ2_bzCompressEnd");
+
+    // Initialize the requisite fieldIds.
+    Bzip2Compressor_clazz = (*env)->GetStaticFieldID(env, class, "clazz", 
+                                                     "Ljava/lang/Class;");
+    Bzip2Compressor_stream = (*env)->GetFieldID(env, class, "stream", "J");
+    Bzip2Compressor_finish = (*env)->GetFieldID(env, class, "finish", "Z");
+    Bzip2Compressor_finished = (*env)->GetFieldID(env, class, "finished", "Z");
+    Bzip2Compressor_uncompressedDirectBuf = (*env)->GetFieldID(env, class, 
+                                                       "uncompressedDirectBuf",
+                                                       "Ljava/nio/Buffer;");
+    Bzip2Compressor_uncompressedDirectBufOff = (*env)->GetFieldID(env, class, 
+                                                  "uncompressedDirectBufOff",
+                                                  "I");
+    Bzip2Compressor_uncompressedDirectBufLen = (*env)->GetFieldID(env, class, 
+                                                  "uncompressedDirectBufLen",
+                                                  "I");
+    Bzip2Compressor_compressedDirectBuf = (*env)->GetFieldID(env, class, 
+                                                     "compressedDirectBuf", 
+                                                     "Ljava/nio/Buffer;");
+    Bzip2Compressor_directBufferSize = (*env)->GetFieldID(env, class, 
+                                                  "directBufferSize", "I");
+}
+
+JNIEXPORT jlong JNICALL
+Java_org_apache_hadoop_io_compress_bzip2_Bzip2Compressor_init(
+            JNIEnv *env, jclass class, jint blockSize, jint workFactor)
+{
+    // Create a bz_stream.
+    bz_stream *stream = malloc(sizeof(bz_stream));
+    if (!stream) {
+        THROW(env, "java/lang/OutOfMemoryError", NULL);
+        return (jlong)0;
+    }
+    memset((void*)stream, 0, sizeof(bz_stream));
+
+    // Initialize stream.
+    int rv = (*dlsym_BZ2_bzCompressInit)(stream, blockSize, 0, workFactor);
+    if (rv != BZ_OK) {
+        // Contingency - Report error by throwing appropriate exceptions.
+        free(stream);
+        stream = NULL;
+        
+        switch (rv) {
+        case BZ_MEM_ERROR: 
+            {
+                THROW(env, "java/lang/OutOfMemoryError", NULL);
+            }
+            break;
+        case BZ_PARAM_ERROR:
+            {
+                THROW(env,
+                      "java/lang/IllegalArgumentException",
+                      NULL);
+            }
+            break;
+        default:
+            {
+                THROW(env, "java/lang/InternalError", NULL);
+            }
+            break;
+        }
+    }
+        
+    return JLONG(stream);
+}
+
+JNIEXPORT jint JNICALL
+Java_org_apache_hadoop_io_compress_bzip2_Bzip2Compressor_deflateBytesDirect(
+        JNIEnv *env, jobject this)
+{
+    // Get members of Bzip2Compressor.
+    bz_stream *stream = BZSTREAM((*env)->GetLongField(env, this, 
+                                              Bzip2Compressor_stream));
+    if (!stream) {
+        THROW(env, "java/lang/NullPointerException", NULL);
+        return (jint)0;
+    } 
+
+    jobject clazz = (*env)->GetStaticObjectField(env, this, 
+                                                 Bzip2Compressor_clazz);
+    jobject uncompressed_direct_buf = (*env)->GetObjectField(env, this, 
+                                     Bzip2Compressor_uncompressedDirectBuf);
+    jint uncompressed_direct_buf_off = (*env)->GetIntField(env, this, 
+                                   Bzip2Compressor_uncompressedDirectBufOff);
+    jint uncompressed_direct_buf_len = (*env)->GetIntField(env, this, 
+                                   Bzip2Compressor_uncompressedDirectBufLen);
+
+    jobject compressed_direct_buf = (*env)->GetObjectField(env, this, 
+                                   Bzip2Compressor_compressedDirectBuf);
+    jint compressed_direct_buf_len = (*env)->GetIntField(env, this, 
+                                 Bzip2Compressor_directBufferSize);
+
+    jboolean finish = (*env)->GetBooleanField(env, this,
+                                              Bzip2Compressor_finish);
+
+    // Get the input and output direct buffers.
+    LOCK_CLASS(env, clazz, "Bzip2Compressor");
+    char* uncompressed_bytes = (*env)->GetDirectBufferAddress(env, 
+                                                uncompressed_direct_buf);
+    char* compressed_bytes = (*env)->GetDirectBufferAddress(env, 
+                                                compressed_direct_buf);
+    UNLOCK_CLASS(env, clazz, "Bzip2Compressor");
+
+    if (!uncompressed_bytes || !compressed_bytes) {
+        return (jint)0;
+    }
+        
+    // Re-calibrate the bz_stream.
+    stream->next_in = uncompressed_bytes + uncompressed_direct_buf_off;
+    stream->avail_in = uncompressed_direct_buf_len;
+    stream->next_out = compressed_bytes;
+    stream->avail_out = compressed_direct_buf_len;
+        
+    // Compress.
+    int rv = dlsym_BZ2_bzCompress(stream, finish ? BZ_FINISH : BZ_RUN);
+
+    jint no_compressed_bytes = 0;
+    switch (rv) {
+        // Contingency? - Report error by throwing appropriate exceptions.
+    case BZ_STREAM_END:
+        {
+            (*env)->SetBooleanField(env, this,
+                                    Bzip2Compressor_finished,
+                                    JNI_TRUE);
+        } // cascade
+    case BZ_RUN_OK:
+    case BZ_FINISH_OK:
+        {
+            uncompressed_direct_buf_off +=
+                uncompressed_direct_buf_len - stream->avail_in;
+            (*env)->SetIntField(env, this, 
+                                Bzip2Compressor_uncompressedDirectBufOff,
+                                uncompressed_direct_buf_off);
+            (*env)->SetIntField(env, this, 
+                                Bzip2Compressor_uncompressedDirectBufLen,
+                                stream->avail_in);
+            no_compressed_bytes =
+                compressed_direct_buf_len - stream->avail_out;
+        }
+        break;
+    default:
+        {
+            THROW(env, "java/lang/InternalError", NULL);
+        }
+        break;
+    }
+        
+    return no_compressed_bytes;
+}
+
+JNIEXPORT jlong JNICALL
+Java_org_apache_hadoop_io_compress_bzip2_Bzip2Compressor_getBytesRead(
+                            JNIEnv *env, jclass class, jlong stream)
+{
+    const bz_stream* strm = BZSTREAM(stream);
+    return ((jlong)strm->total_in_hi32 << 32) | strm->total_in_lo32;
+}
+
+JNIEXPORT jlong JNICALL
+Java_org_apache_hadoop_io_compress_bzip2_Bzip2Compressor_getBytesWritten(
+                                JNIEnv *env, jclass class, jlong stream)
+{
+    const bz_stream* strm = BZSTREAM(stream);
+    return ((jlong)strm->total_out_hi32 << 32) | strm->total_out_lo32;
+}
+
+JNIEXPORT void JNICALL
+Java_org_apache_hadoop_io_compress_bzip2_Bzip2Compressor_end(
+                                JNIEnv *env, jclass class, jlong stream)
+{
+    if (dlsym_BZ2_bzCompressEnd(BZSTREAM(stream)) != BZ_OK) {
+        THROW(env, "java/lang/InternalError", NULL);
+    } else {
+        free(BZSTREAM(stream));
+    }
+}
+
+/**
+ * vim: sw=2: ts=2: et:
+ */
+

+ 248 - 0
hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/bzip2/Bzip2Decompressor.c

@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <dlfcn.h>
+
+#include "org_apache_hadoop_io_compress_bzip2.h"
+#include "org_apache_hadoop_io_compress_bzip2_Bzip2Decompressor.h"
+
+static jfieldID Bzip2Decompressor_clazz;
+static jfieldID Bzip2Decompressor_stream;
+static jfieldID Bzip2Decompressor_compressedDirectBuf;
+static jfieldID Bzip2Decompressor_compressedDirectBufOff;
+static jfieldID Bzip2Decompressor_compressedDirectBufLen;
+static jfieldID Bzip2Decompressor_uncompressedDirectBuf;
+static jfieldID Bzip2Decompressor_directBufferSize;
+static jfieldID Bzip2Decompressor_finished;
+
+static int (*dlsym_BZ2_bzDecompressInit)(bz_stream*, int, int);
+static int (*dlsym_BZ2_bzDecompress)(bz_stream*);
+static int (*dlsym_BZ2_bzDecompressEnd)(bz_stream*);
+
+JNIEXPORT void JNICALL
+Java_org_apache_hadoop_io_compress_bzip2_Bzip2Decompressor_initIDs(
+                                 JNIEnv *env, jclass class, jstring libname)
+{
+    const char* bzlib_name = (*env)->GetStringUTFChars(env, libname, NULL);
+    if (strcmp(bzlib_name, "system-native") == 0)
+      bzlib_name = HADOOP_BZIP2_LIBRARY;
+    // Load the native library.
+    void *libbz2 = dlopen(bzlib_name, RTLD_LAZY | RTLD_GLOBAL);
+    if (!libbz2) {
+        THROW(env, "java/lang/UnsatisfiedLinkError",
+              "Cannot load bzip2 native library");
+        return;
+    }
+
+    // Locate the requisite symbols from libbz2.so.
+    dlerror();                                 // Clear any existing error.
+    LOAD_DYNAMIC_SYMBOL(dlsym_BZ2_bzDecompressInit, env, libbz2,
+                        "BZ2_bzDecompressInit");
+    LOAD_DYNAMIC_SYMBOL(dlsym_BZ2_bzDecompress, env, libbz2,
+                        "BZ2_bzDecompress");
+    LOAD_DYNAMIC_SYMBOL(dlsym_BZ2_bzDecompressEnd, env, libbz2,
+                        "BZ2_bzDecompressEnd");
+
+    // Initialize the requisite fieldIds.
+    Bzip2Decompressor_clazz = (*env)->GetStaticFieldID(env, class, "clazz", 
+                                                       "Ljava/lang/Class;");
+    Bzip2Decompressor_stream = (*env)->GetFieldID(env, class, "stream", "J");
+    Bzip2Decompressor_finished = (*env)->GetFieldID(env, class,
+                                                    "finished", "Z");
+    Bzip2Decompressor_compressedDirectBuf = (*env)->GetFieldID(env, class, 
+                                                        "compressedDirectBuf", 
+                                                        "Ljava/nio/Buffer;");
+    Bzip2Decompressor_compressedDirectBufOff = (*env)->GetFieldID(env, class, 
+                                                "compressedDirectBufOff", "I");
+    Bzip2Decompressor_compressedDirectBufLen = (*env)->GetFieldID(env, class, 
+                                                "compressedDirectBufLen", "I");
+    Bzip2Decompressor_uncompressedDirectBuf = (*env)->GetFieldID(env, class, 
+                                                "uncompressedDirectBuf", 
+                                                "Ljava/nio/Buffer;");
+    Bzip2Decompressor_directBufferSize = (*env)->GetFieldID(env, class, 
+                                                "directBufferSize", "I");
+}
+
+JNIEXPORT jlong JNICALL
+Java_org_apache_hadoop_io_compress_bzip2_Bzip2Decompressor_init(
+                                JNIEnv *env, jclass cls, jint conserveMemory)
+{
+    bz_stream *stream = malloc(sizeof(bz_stream));
+    if (stream == 0) {
+        THROW(env, "java/lang/OutOfMemoryError", NULL);
+        return (jlong)0;
+    } 
+    memset((void*)stream, 0, sizeof(bz_stream));
+    
+    int rv = dlsym_BZ2_bzDecompressInit(stream, 0, conserveMemory);
+
+    if (rv != BZ_OK) {
+        // Contingency - Report error by throwing appropriate exceptions.
+        free(stream);
+        stream = NULL;
+
+        switch (rv) {
+        case BZ_MEM_ERROR:
+            {
+                THROW(env, "java/lang/OutOfMemoryError", NULL);
+            }
+            break;
+        default:
+            {
+                THROW(env, "java/lang/InternalError", NULL);
+            }
+            break;
+        }
+    }
+
+    return JLONG(stream);
+}
+
+JNIEXPORT jint JNICALL
+Java_org_apache_hadoop_io_compress_bzip2_Bzip2Decompressor_inflateBytesDirect(
+                        JNIEnv *env, jobject this)
+{
+    // Get members of Bzip2Decompressor.
+    bz_stream *stream = BZSTREAM((*env)->GetLongField(env, this,
+                                                Bzip2Decompressor_stream));
+    if (!stream) {
+        THROW(env, "java/lang/NullPointerException", NULL);
+        return (jint)0;
+    } 
+
+    jobject clazz = (*env)->GetStaticObjectField(env, this, 
+                                                 Bzip2Decompressor_clazz);
+    jarray compressed_direct_buf = (jarray)(*env)->GetObjectField(env,
+                                this, Bzip2Decompressor_compressedDirectBuf);
+    jint compressed_direct_buf_off = (*env)->GetIntField(env, this, 
+                                Bzip2Decompressor_compressedDirectBufOff);
+    jint compressed_direct_buf_len = (*env)->GetIntField(env, this, 
+                                Bzip2Decompressor_compressedDirectBufLen);
+
+    jarray uncompressed_direct_buf = (jarray)(*env)->GetObjectField(env,
+                                this, Bzip2Decompressor_uncompressedDirectBuf);
+    jint uncompressed_direct_buf_len = (*env)->GetIntField(env, this, 
+                                Bzip2Decompressor_directBufferSize);
+
+    // Get the input and output direct buffers.
+    LOCK_CLASS(env, clazz, "Bzip2Decompressor");
+    char* compressed_bytes = (*env)->GetDirectBufferAddress(env, 
+                                                compressed_direct_buf);
+    char* uncompressed_bytes = (*env)->GetDirectBufferAddress(env, 
+                                                uncompressed_direct_buf);
+    UNLOCK_CLASS(env, clazz, "Bzip2Decompressor");
+
+    if (!compressed_bytes || !uncompressed_bytes) {
+        return (jint)0;
+    }
+        
+    // Re-calibrate the bz_stream.
+    stream->next_in  = compressed_bytes + compressed_direct_buf_off;
+    stream->avail_in  = compressed_direct_buf_len;
+    stream->next_out = uncompressed_bytes;
+    stream->avail_out = uncompressed_direct_buf_len;
+        
+    // Decompress.
+    int rv = dlsym_BZ2_bzDecompress(stream);
+
+    // Contingency? - Report error by throwing appropriate exceptions.
+    int no_decompressed_bytes = 0;      
+    switch (rv) {
+    case BZ_STREAM_END:
+        {
+            (*env)->SetBooleanField(env, this,
+                                    Bzip2Decompressor_finished,
+                                    JNI_TRUE);
+        } // cascade down
+    case BZ_OK:
+        {
+            compressed_direct_buf_off +=
+                compressed_direct_buf_len - stream->avail_in;
+            (*env)->SetIntField(env, this,
+                                Bzip2Decompressor_compressedDirectBufOff, 
+                                compressed_direct_buf_off);
+            (*env)->SetIntField(env, this,
+                                Bzip2Decompressor_compressedDirectBufLen, 
+                                stream->avail_in);
+            no_decompressed_bytes =
+                uncompressed_direct_buf_len - stream->avail_out;
+        }
+        break;
+    case BZ_DATA_ERROR:
+    case BZ_DATA_ERROR_MAGIC:
+        {
+            THROW(env, "java/io/IOException", NULL);
+        }
+        break;
+    case BZ_MEM_ERROR:
+        {
+            THROW(env, "java/lang/OutOfMemoryError", NULL);
+        }
+        break;
+    default:
+        {
+            THROW(env, "java/lang/InternalError", NULL);
+        }
+        break;
+    }
+    
+    return no_decompressed_bytes;
+}
+
+JNIEXPORT jlong JNICALL
+Java_org_apache_hadoop_io_compress_bzip2_Bzip2Decompressor_getBytesRead(
+                                JNIEnv *env, jclass cls, jlong stream)
+{
+    const bz_stream* strm = BZSTREAM(stream);
+    return ((jlong)strm->total_in_hi32 << 32) | strm->total_in_lo32;
+}
+
+JNIEXPORT jlong JNICALL
+Java_org_apache_hadoop_io_compress_bzip2_Bzip2Decompressor_getBytesWritten(
+                                JNIEnv *env, jclass cls, jlong stream)
+{
+    const bz_stream* strm = BZSTREAM(stream);
+    return ((jlong)strm->total_out_hi32 << 32) | strm->total_out_lo32;
+}
+
+JNIEXPORT jint JNICALL
+Java_org_apache_hadoop_io_compress_bzip2_Bzip2Decompressor_getRemaining(
+                                JNIEnv *env, jclass cls, jlong stream)
+{
+    return (BZSTREAM(stream))->avail_in;
+}
+
+JNIEXPORT void JNICALL
+Java_org_apache_hadoop_io_compress_bzip2_Bzip2Decompressor_end(
+                                JNIEnv *env, jclass cls, jlong stream)
+{
+    if (dlsym_BZ2_bzDecompressEnd(BZSTREAM(stream)) != BZ_OK) {
+        THROW(env, "java/lang/InternalError", 0);
+    } else {
+        free(BZSTREAM(stream));
+    }
+}
+
+/**
+ * vim: sw=2: ts=2: et:
+ */
+

+ 39 - 0
hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/bzip2/org_apache_hadoop_io_compress_bzip2.h

@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#if !defined ORG_APACHE_HADOOP_IO_COMPRESS_BZIP2_BZIP2_H
+#define ORG_APACHE_HADOOP_IO_COMPRESS_BZIP2_BZIP2_H
+
+#include <config.h>
+#include <stddef.h>
+#include <bzlib.h>
+#include <dlfcn.h>
+#include <jni.h>
+
+#include "org_apache_hadoop.h"
+
+#define HADOOP_BZIP2_LIBRARY "libbz2.so.1"
+
+
+/* A helper macro to convert the java 'stream-handle' to a bz_stream pointer. */
+#define BZSTREAM(stream) ((bz_stream*)((ptrdiff_t)(stream)))
+
+/* A helper macro to convert the bz_stream pointer to the java 'stream-handle'. */
+#define JLONG(stream) ((jlong)((ptrdiff_t)(stream)))
+
+#endif //ORG_APACHE_HADOOP_IO_COMPRESS_BZIP2_BZIP2_H

+ 14 - 0
hadoop-common-project/hadoop-common/src/main/resources/core-default.xml

@@ -317,6 +317,20 @@
   are discovered using a Java ServiceLoader.</description>
 </property>
 
+<property>
+  <name>io.compression.codec.bzip2.library</name>
+  <value>system-native</value>
+  <description>The native-code library to be used for compression and
+  decompression by the bzip2 codec.  This library could be specified
+  either by by name or the full pathname.  In the former case, the
+  library is located by the dynamic linker, usually searching the
+  directories specified in the environment variable LD_LIBRARY_PATH.
+  
+  The value of "system-native" indicates that the default system
+  library should be used.  To indicate that the algorithm should
+  operate entirely in Java, specify "java-builtin".</description>
+</property>
+
 <property>
   <name>io.serializations</name>
   <value>org.apache.hadoop.io.serializer.WritableSerialization,org.apache.hadoop.io.serializer.avro.AvroSpecificSerialization,org.apache.hadoop.io.serializer.avro.AvroReflectSerialization</value>

+ 47 - 2
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java

@@ -61,6 +61,7 @@ import org.apache.hadoop.io.compress.zlib.ZlibCompressor;
 import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel;
 import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionStrategy;
 import org.apache.hadoop.io.compress.zlib.ZlibFactory;
+import org.apache.hadoop.io.compress.bzip2.Bzip2Factory;
 import org.apache.hadoop.util.LineReader;
 import org.apache.hadoop.util.NativeCodeLoader;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -94,12 +95,33 @@ public class TestCodec {
     codecTest(conf, seed, count, "org.apache.hadoop.io.compress.GzipCodec");
   }
 
-  @Test
+  @Test(timeout=20000)
   public void testBZip2Codec() throws IOException {
+    Configuration conf = new Configuration();
+    conf.set("io.compression.codec.bzip2.library", "java-builtin");
     codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.BZip2Codec");
     codecTest(conf, seed, count, "org.apache.hadoop.io.compress.BZip2Codec");
   }
   
+  @Test(timeout=20000)
+  public void testBZip2NativeCodec() throws IOException {
+    Configuration conf = new Configuration();
+    conf.set("io.compression.codec.bzip2.library", "system-native");
+    if (NativeCodeLoader.isNativeCodeLoaded()) {
+      if (Bzip2Factory.isNativeBzip2Loaded(conf)) {
+        codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.BZip2Codec");
+        codecTest(conf, seed, count, 
+                  "org.apache.hadoop.io.compress.BZip2Codec");
+        conf.set("io.compression.codec.bzip2.library", "java-builtin");
+        codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.BZip2Codec");
+        codecTest(conf, seed, count, 
+                  "org.apache.hadoop.io.compress.BZip2Codec");
+      } else {
+        LOG.warn("Native hadoop library available but native bzip2 is not");
+      }
+    }
+  }
+  
   @Test
   public void testSnappyCodec() throws IOException {
     if (SnappyCodec.isNativeCodeLoaded()) {
@@ -457,14 +479,37 @@ public class TestCodec {
     sequenceFileCodecTest(conf, 200000, "org.apache.hadoop.io.compress.DefaultCodec", 1000000);
   }
 
-  @Test
+  @Test(timeout=20000)
   public void testSequenceFileBZip2Codec() throws IOException, ClassNotFoundException,
       InstantiationException, IllegalAccessException {
+    Configuration conf = new Configuration();
+    conf.set("io.compression.codec.bzip2.library", "java-builtin");
     sequenceFileCodecTest(conf, 0, "org.apache.hadoop.io.compress.BZip2Codec", 100);
     sequenceFileCodecTest(conf, 100, "org.apache.hadoop.io.compress.BZip2Codec", 100);
     sequenceFileCodecTest(conf, 200000, "org.apache.hadoop.io.compress.BZip2Codec", 1000000);
   }
 
+  @Test(timeout=20000)
+  public void testSequenceFileBZip2NativeCodec() throws IOException, 
+                        ClassNotFoundException, InstantiationException, 
+                        IllegalAccessException {
+    Configuration conf = new Configuration();
+    conf.set("io.compression.codec.bzip2.library", "system-native");
+    if (NativeCodeLoader.isNativeCodeLoaded()) {
+      if (Bzip2Factory.isNativeBzip2Loaded(conf)) {
+        sequenceFileCodecTest(conf, 0, 
+                              "org.apache.hadoop.io.compress.BZip2Codec", 100);
+        sequenceFileCodecTest(conf, 100, 
+                              "org.apache.hadoop.io.compress.BZip2Codec", 100);
+        sequenceFileCodecTest(conf, 200000, 
+                              "org.apache.hadoop.io.compress.BZip2Codec", 
+                              1000000);
+      } else {
+        LOG.warn("Native hadoop library available but native bzip2 is not");
+      }
+    }
+  }
+
   @Test
   public void testSequenceFileDeflateCodec() throws IOException, ClassNotFoundException,
       InstantiationException, IllegalAccessException {