
Merge trunk into HA branch.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1220616 13f79535-47bb-0310-9956-ffa450edef68
Todd Lipcon, 13 years ago
commit 8ff28f4549
25 changed files with 1820 additions and 53 deletions
  1. +2 -0    hadoop-common-project/hadoop-common/CHANGES.txt
  2. +31 -0   hadoop-common-project/hadoop-common/LICENSE.txt
  3. +2 -0    hadoop-common-project/hadoop-common/pom.xml
  4. +9 -1    hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
  5. +217 -0  hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Lz4Codec.java
  6. +299 -0  hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lz4/Lz4Compressor.java
  7. +281 -0  hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lz4/Lz4Decompressor.java
  8. +23 -0   hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lz4/package-info.java
  9. +3 -0    hadoop-common-project/hadoop-common/src/main/native/Makefile.am
  10. +101 -0 hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/Lz4Compressor.c
  11. +97 -0  hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/Lz4Decompressor.c
  12. +645 -0 hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/lz4.c
  13. +13 -0  hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java
  14. +12 -0  hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
  15. +14 -1  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
  16. +12 -21 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
  17. +0 -1   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
  18. +2 -2   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
  19. +5 -5   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
  20. +1 -2   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
  21. +1 -9   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
  22. +1 -9   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DataNodeCluster.java
  23. +2 -0   hadoop-mapreduce-project/CHANGES.txt
  24. +9 -2   hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java
  25. +38 -0  hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestConverterUtils.java

+ 2 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -159,6 +159,8 @@ Release 0.23.1 - Unreleased
     HADOOP-7777 Implement a base class for DNSToSwitchMapping implementations 
     that can offer extra topology information. (stevel)
 
+    HADOOP-7657. Add support for LZ4 compression. (Binglin Chang via todd)
+
   IMPROVEMENTS
 
     HADOOP-7801. HADOOP_PREFIX cannot be overriden. (Bruno Mahé via tomwhite)

+ 31 - 0
hadoop-common-project/hadoop-common/LICENSE.txt

@@ -251,3 +251,34 @@ in src/main/native/src/org/apache/hadoop/util:
  *   All rights reserved. Use of this source code is governed by a
  *   BSD-style license that can be found in the LICENSE file.
  */
+
+ For src/main/native/src/org/apache/hadoop/io/compress/lz4/lz4.c:
+
+/*
+   LZ4 - Fast LZ compression algorithm
+   Copyright (C) 2011, Yann Collet.
+   BSD License
+
+   Redistribution and use in source and binary forms, with or without
+   modification, are permitted provided that the following conditions are
+   met:
+  
+       * Redistributions of source code must retain the above copyright
+   notice, this list of conditions and the following disclaimer.
+       * Redistributions in binary form must reproduce the above
+   copyright notice, this list of conditions and the following disclaimer
+   in the documentation and/or other materials provided with the
+   distribution.
+  
+   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+*/

+ 2 - 0
hadoop-common-project/hadoop-common/pom.xml

@@ -536,6 +536,8 @@
                     <javahClassName>org.apache.hadoop.security.JniBasedUnixGroupsNetgroupMapping</javahClassName>
                     <javahClassName>org.apache.hadoop.io.compress.snappy.SnappyCompressor</javahClassName>
                     <javahClassName>org.apache.hadoop.io.compress.snappy.SnappyDecompressor</javahClassName>
+                    <javahClassName>org.apache.hadoop.io.compress.lz4.Lz4Compressor</javahClassName>
+                    <javahClassName>org.apache.hadoop.io.compress.lz4.Lz4Decompressor</javahClassName>
                     <javahClassName>org.apache.hadoop.util.NativeCrc32</javahClassName>
                   </javahClassNames>
                   <javahOutputDirectory>${project.build.directory}/native/javah</javahOutputDirectory>

+ 9 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java

@@ -93,7 +93,15 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
   /** Default value for IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY */
   public static final int IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT =
       256 * 1024;
-  
+
+  /** Internal buffer size for lz4 compressor/decompressors */
+  public static final String IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY =
+      "io.compression.codec.lz4.buffersize";
+
+  /** Default value for IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY */
+  public static final int IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT =
+      256 * 1024;
+
   /**
    * Service Authorization
    */
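
Not part of this commit: a minimal sketch of how the new io.compression.codec.lz4.buffersize key above might be overridden from client code, using only the CommonConfigurationKeys constants added in this diff. The 512 KB value and the class name Lz4BufferSizeExample are purely illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;

public class Lz4BufferSizeExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Override the default 256 KB internal buffer for the LZ4 codec
    // (io.compression.codec.lz4.buffersize); 512 KB is only an example value.
    conf.setInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY,
        512 * 1024);
    // Read it back, falling back to the shipped default if unset.
    System.out.println(conf.getInt(
        CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY,
        CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT));
  }
}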

+ 217 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Lz4Codec.java

@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.lz4.Lz4Compressor;
+import org.apache.hadoop.io.compress.lz4.Lz4Decompressor;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.util.NativeCodeLoader;
+
+/**
+ * This class creates lz4 compressors/decompressors.
+ */
+public class Lz4Codec implements Configurable, CompressionCodec {
+
+  static {
+    NativeCodeLoader.isNativeCodeLoaded();
+  }
+
+  Configuration conf;
+
+  /**
+   * Set the configuration to be used by this object.
+   *
+   * @param conf the configuration object.
+   */
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Return the configuration used by this object.
+   *
+   * @return the configuration object used by this object.
+   */
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /**
+   * Are the native lz4 libraries loaded & initialized?
+   *
+   * @return true if loaded & initialized, otherwise false
+   */
+  public static boolean isNativeCodeLoaded() {
+    return NativeCodeLoader.isNativeCodeLoaded();
+  }
+
+  /**
+   * Create a {@link CompressionOutputStream} that will write to the given
+   * {@link OutputStream}.
+   *
+   * @param out the location for the final output stream
+   * @return a stream the user can write uncompressed data to have it compressed
+   * @throws IOException
+   */
+  @Override
+  public CompressionOutputStream createOutputStream(OutputStream out)
+      throws IOException {
+    return createOutputStream(out, createCompressor());
+  }
+
+  /**
+   * Create a {@link CompressionOutputStream} that will write to the given
+   * {@link OutputStream} with the given {@link Compressor}.
+   *
+   * @param out        the location for the final output stream
+   * @param compressor compressor to use
+   * @return a stream the user can write uncompressed data to have it compressed
+   * @throws IOException
+   */
+  @Override
+  public CompressionOutputStream createOutputStream(OutputStream out,
+                                                    Compressor compressor)
+      throws IOException {
+    if (!isNativeCodeLoaded()) {
+      throw new RuntimeException("native lz4 library not available");
+    }
+    int bufferSize = conf.getInt(
+        CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY,
+        CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT);
+
+    int compressionOverhead = Math.max((int)(bufferSize * 0.01), 10);
+
+    return new BlockCompressorStream(out, compressor, bufferSize,
+        compressionOverhead);
+  }
+
+  /**
+   * Get the type of {@link Compressor} needed by this {@link CompressionCodec}.
+   *
+   * @return the type of compressor needed by this codec.
+   */
+  @Override
+  public Class<? extends Compressor> getCompressorType() {
+    if (!isNativeCodeLoaded()) {
+      throw new RuntimeException("native lz4 library not available");
+    }
+
+    return Lz4Compressor.class;
+  }
+
+  /**
+   * Create a new {@link Compressor} for use by this {@link CompressionCodec}.
+   *
+   * @return a new compressor for use by this codec
+   */
+  @Override
+  public Compressor createCompressor() {
+    if (!isNativeCodeLoaded()) {
+      throw new RuntimeException("native lz4 library not available");
+    }
+    int bufferSize = conf.getInt(
+        CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY,
+        CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT);
+    return new Lz4Compressor(bufferSize);
+  }
+
+  /**
+   * Create a {@link CompressionInputStream} that will read from the given
+   * input stream.
+   *
+   * @param in the stream to read compressed bytes from
+   * @return a stream to read uncompressed bytes from
+   * @throws IOException
+   */
+  @Override
+  public CompressionInputStream createInputStream(InputStream in)
+      throws IOException {
+    return createInputStream(in, createDecompressor());
+  }
+
+  /**
+   * Create a {@link CompressionInputStream} that will read from the given
+   * {@link InputStream} with the given {@link Decompressor}.
+   *
+   * @param in           the stream to read compressed bytes from
+   * @param decompressor decompressor to use
+   * @return a stream to read uncompressed bytes from
+   * @throws IOException
+   */
+  @Override
+  public CompressionInputStream createInputStream(InputStream in,
+                                                  Decompressor decompressor)
+      throws IOException {
+    if (!isNativeCodeLoaded()) {
+      throw new RuntimeException("native lz4 library not available");
+    }
+
+    return new BlockDecompressorStream(in, decompressor, conf.getInt(
+        CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY,
+        CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT));
+  }
+
+  /**
+   * Get the type of {@link Decompressor} needed by this {@link CompressionCodec}.
+   *
+   * @return the type of decompressor needed by this codec.
+   */
+  @Override
+  public Class<? extends Decompressor> getDecompressorType() {
+    if (!isNativeCodeLoaded()) {
+      throw new RuntimeException("native lz4 library not available");
+    }
+
+    return Lz4Decompressor.class;
+  }
+
+  /**
+   * Create a new {@link Decompressor} for use by this {@link CompressionCodec}.
+   *
+   * @return a new decompressor for use by this codec
+   */
+  @Override
+  public Decompressor createDecompressor() {
+    if (!isNativeCodeLoaded()) {
+      throw new RuntimeException("native lz4 library not available");
+    }
+    int bufferSize = conf.getInt(
+        CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY,
+        CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT);
+    return new Lz4Decompressor(bufferSize);
+  }
+
+  /**
+   * Get the default filename extension for this kind of compression.
+   *
+   * @return <code>.lz4</code>.
+   */
+  @Override
+  public String getDefaultExtension() {
+    return ".lz4";
+  }
+}
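
Not part of this commit: a rough round-trip sketch of the Lz4Codec added above, assuming the native hadoop library with lz4 support is loadable (otherwise the codec methods throw RuntimeException, as shown in the diff). The class name Lz4CodecRoundTrip and the sample payload are illustrative only.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Lz4Codec;
import org.apache.hadoop.util.ReflectionUtils;

public class Lz4CodecRoundTrip {
  public static void main(String[] args) throws Exception {
    // ReflectionUtils wires the Configuration into the Configurable codec.
    Configuration conf = new Configuration();
    Lz4Codec codec = ReflectionUtils.newInstance(Lz4Codec.class, conf);

    byte[] input = "hello lz4 hello lz4 hello lz4".getBytes("UTF-8");

    // Compress into an in-memory buffer via BlockCompressorStream.
    ByteArrayOutputStream compressed = new ByteArrayOutputStream();
    CompressionOutputStream out = codec.createOutputStream(compressed);
    out.write(input);
    out.finish();
    out.close();

    // Decompress the same bytes back.
    CompressionInputStream in = codec.createInputStream(
        new ByteArrayInputStream(compressed.toByteArray()));
    ByteArrayOutputStream restored = new ByteArrayOutputStream();
    IOUtils.copyBytes(in, restored, 4096, true);

    System.out.println(new String(restored.toByteArray(), "UTF-8"));
  }
}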

+ 299 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lz4/Lz4Compressor.java

@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress.lz4;
+
+import java.io.IOException;
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.util.NativeCodeLoader;
+
+/**
+ * A {@link Compressor} based on the lz4 compression algorithm.
+ * http://code.google.com/p/lz4/
+ */
+public class Lz4Compressor implements Compressor {
+  private static final Log LOG =
+      LogFactory.getLog(Lz4Compressor.class.getName());
+  private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64 * 1024;
+
+  // HACK - Use this as a global lock in the JNI layer
+  @SuppressWarnings({"unchecked", "unused"})
+  private static Class clazz = Lz4Compressor.class;
+
+  private int directBufferSize;
+  private Buffer compressedDirectBuf = null;
+  private int uncompressedDirectBufLen;
+  private Buffer uncompressedDirectBuf = null;
+  private byte[] userBuf = null;
+  private int userBufOff = 0, userBufLen = 0;
+  private boolean finish, finished;
+
+  private long bytesRead = 0L;
+  private long bytesWritten = 0L;
+
+
+  static {
+    if (NativeCodeLoader.isNativeCodeLoaded()) {
+      // Initialize the native library
+      try {
+        initIDs();
+      } catch (Throwable t) {
+        // Ignore failure to load/initialize lz4
+        LOG.warn(t.toString());
+      }
+    } else {
+      LOG.error("Cannot load " + Lz4Compressor.class.getName() +
+          " without native hadoop library!");
+    }
+  }
+
+  /**
+   * Creates a new compressor.
+   *
+   * @param directBufferSize size of the direct buffer to be used.
+   */
+  public Lz4Compressor(int directBufferSize) {
+    this.directBufferSize = directBufferSize;
+
+    uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
+    compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
+    compressedDirectBuf.position(directBufferSize);
+  }
+
+  /**
+   * Creates a new compressor with the default buffer size.
+   */
+  public Lz4Compressor() {
+    this(DEFAULT_DIRECT_BUFFER_SIZE);
+  }
+
+  /**
+   * Sets input data for compression.
+   * This should be called whenever #needsInput() returns
+   * <code>true</code> indicating that more input data is required.
+   *
+   * @param b   Input data
+   * @param off Start offset
+   * @param len Length
+   */
+  @Override
+  public synchronized void setInput(byte[] b, int off, int len) {
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || off > b.length - len) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+    finished = false;
+
+    if (len > uncompressedDirectBuf.remaining()) {
+      // save data; now !needsInput
+      this.userBuf = b;
+      this.userBufOff = off;
+      this.userBufLen = len;
+    } else {
+      ((ByteBuffer) uncompressedDirectBuf).put(b, off, len);
+      uncompressedDirectBufLen = uncompressedDirectBuf.position();
+    }
+
+    bytesRead += len;
+  }
+
+  /**
+   * If a write would exceed the capacity of the direct buffers, it is set
+   * aside to be loaded by this function while the compressed data are
+   * consumed.
+   */
+  synchronized void setInputFromSavedData() {
+    if (0 >= userBufLen) {
+      return;
+    }
+    finished = false;
+
+    uncompressedDirectBufLen = Math.min(userBufLen, directBufferSize);
+    ((ByteBuffer) uncompressedDirectBuf).put(userBuf, userBufOff,
+        uncompressedDirectBufLen);
+
+    // Note how much data is being fed to lz4
+    userBufOff += uncompressedDirectBufLen;
+    userBufLen -= uncompressedDirectBufLen;
+  }
+
+  /**
+   * Does nothing.
+   */
+  @Override
+  public synchronized void setDictionary(byte[] b, int off, int len) {
+    // do nothing
+  }
+
+  /**
+   * Returns true if the input data buffer is empty and
+   * #setInput() should be called to provide more input.
+   *
+   * @return <code>true</code> if the input data buffer is empty and
+   *         #setInput() should be called in order to provide more input.
+   */
+  @Override
+  public synchronized boolean needsInput() {
+    return !(compressedDirectBuf.remaining() > 0
+        || uncompressedDirectBuf.remaining() == 0 || userBufLen > 0);
+  }
+
+  /**
+   * When called, indicates that compression should end
+   * with the current contents of the input buffer.
+   */
+  @Override
+  public synchronized void finish() {
+    finish = true;
+  }
+
+  /**
+   * Returns true if the end of the compressed
+   * data output stream has been reached.
+   *
+   * @return <code>true</code> if the end of the compressed
+   *         data output stream has been reached.
+   */
+  @Override
+  public synchronized boolean finished() {
+    // Check if all uncompressed data has been consumed
+    return (finish && finished && compressedDirectBuf.remaining() == 0);
+  }
+
+  /**
+   * Fills specified buffer with compressed data. Returns actual number
+   * of bytes of compressed data. A return value of 0 indicates that
+   * needsInput() should be called in order to determine if more input
+   * data is required.
+   *
+   * @param b   Buffer for the compressed data
+   * @param off Start offset of the data
+   * @param len Size of the buffer
+   * @return The actual number of bytes of compressed data.
+   */
+  @Override
+  public synchronized int compress(byte[] b, int off, int len)
+      throws IOException {
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || off > b.length - len) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+
+    // Check if there is compressed data
+    int n = compressedDirectBuf.remaining();
+    if (n > 0) {
+      n = Math.min(n, len);
+      ((ByteBuffer) compressedDirectBuf).get(b, off, n);
+      bytesWritten += n;
+      return n;
+    }
+
+    // Re-initialize the lz4's output direct-buffer
+    compressedDirectBuf.clear();
+    compressedDirectBuf.limit(0);
+    if (0 == uncompressedDirectBuf.position()) {
+      // No compressed data, so we should have !needsInput or !finished
+      setInputFromSavedData();
+      if (0 == uncompressedDirectBuf.position()) {
+        // Called without data; write nothing
+        finished = true;
+        return 0;
+      }
+    }
+
+    // Compress data
+    n = compressBytesDirect();
+    compressedDirectBuf.limit(n);
+    uncompressedDirectBuf.clear(); // lz4 consumes all buffer input
+
+    // Set 'finished' if lz4 has consumed all user-data
+    if (0 == userBufLen) {
+      finished = true;
+    }
+
+    // Get at most 'len' bytes
+    n = Math.min(n, len);
+    bytesWritten += n;
+    ((ByteBuffer) compressedDirectBuf).get(b, off, n);
+
+    return n;
+  }
+
+  /**
+   * Resets compressor so that a new set of input data can be processed.
+   */
+  @Override
+  public synchronized void reset() {
+    finish = false;
+    finished = false;
+    uncompressedDirectBuf.clear();
+    uncompressedDirectBufLen = 0;
+    compressedDirectBuf.clear();
+    compressedDirectBuf.limit(0);
+    userBufOff = userBufLen = 0;
+    bytesRead = bytesWritten = 0L;
+  }
+
+  /**
+   * Prepare the compressor to be used in a new stream with settings defined in
+   * the given Configuration
+   *
+   * @param conf Configuration from which new setting are fetched
+   */
+  @Override
+  public synchronized void reinit(Configuration conf) {
+    reset();
+  }
+
+  /**
+   * Return number of bytes given to this compressor since last reset.
+   */
+  @Override
+  public synchronized long getBytesRead() {
+    return bytesRead;
+  }
+
+  /**
+   * Return number of bytes consumed by callers of compress since last reset.
+   */
+  @Override
+  public synchronized long getBytesWritten() {
+    return bytesWritten;
+  }
+
+  /**
+   * Closes the compressor and discards any unprocessed input.
+   */
+  @Override
+  public synchronized void end() {
+  }
+
+  private native static void initIDs();
+
+  private native int compressBytesDirect();
+}
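
Not part of this commit: a hedged sketch of driving the Lz4Compressor above through its push/pull protocol directly (in practice BlockCompressorStream does this for you), again assuming the native library with the new lz4 JNI bindings is available. Buffer sizes and the class name Lz4CompressorDirectUse are illustrative.

import org.apache.hadoop.io.compress.lz4.Lz4Compressor;

public class Lz4CompressorDirectUse {
  public static void main(String[] args) throws Exception {
    // 64 KB direct buffer, matching the compressor's default size.
    Lz4Compressor compressor = new Lz4Compressor(64 * 1024);
    byte[] input = new byte[8 * 1024];           // pretend payload (fits the buffer)
    byte[] output = new byte[64 * 1024 + 1024];  // slack for incompressible data

    compressor.setInput(input, 0, input.length);
    compressor.finish();

    int total = 0;
    while (!compressor.finished()) {
      // compress() returns 0 when needsInput() is true and more data is wanted.
      total += compressor.compress(output, total, output.length - total);
    }
    System.out.println("compressed " + total + " bytes from " + input.length);
  }
}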

+ 281 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lz4/Lz4Decompressor.java

@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress.lz4;
+
+import java.io.IOException;
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.util.NativeCodeLoader;
+
+/**
+ * A {@link Decompressor} based on the lz4 compression algorithm.
+ * http://code.google.com/p/lz4/
+ */
+public class Lz4Decompressor implements Decompressor {
+  private static final Log LOG =
+      LogFactory.getLog(Lz4Decompressor.class.getName());
+  private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64 * 1024;
+
+  // HACK - Use this as a global lock in the JNI layer
+  @SuppressWarnings({"unchecked", "unused"})
+  private static Class clazz = Lz4Decompressor.class;
+
+  private int directBufferSize;
+  private Buffer compressedDirectBuf = null;
+  private int compressedDirectBufLen;
+  private Buffer uncompressedDirectBuf = null;
+  private byte[] userBuf = null;
+  private int userBufOff = 0, userBufLen = 0;
+  private boolean finished;
+
+  static {
+    if (NativeCodeLoader.isNativeCodeLoaded()) {
+      // Initialize the native library
+      try {
+        initIDs();
+      } catch (Throwable t) {
+        // Ignore failure to load/initialize lz4
+        LOG.warn(t.toString());
+      }
+    } else {
+      LOG.error("Cannot load " + Lz4Compressor.class.getName() +
+          " without native hadoop library!");
+    }
+  }
+
+  /**
+   * Creates a new decompressor.
+   *
+   * @param directBufferSize size of the direct buffer to be used.
+   */
+  public Lz4Decompressor(int directBufferSize) {
+    this.directBufferSize = directBufferSize;
+
+    compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
+    uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
+    uncompressedDirectBuf.position(directBufferSize);
+
+  }
+
+  /**
+   * Creates a new decompressor with the default buffer size.
+   */
+  public Lz4Decompressor() {
+    this(DEFAULT_DIRECT_BUFFER_SIZE);
+  }
+
+  /**
+   * Sets input data for decompression.
+   * This should be called if and only if {@link #needsInput()} returns
+   * <code>true</code> indicating that more input data is required.
+   * (Both native and non-native versions of various Decompressors require
+   * that the data passed in via <code>b[]</code> remain unmodified until
+   * the caller is explicitly notified--via {@link #needsInput()}--that the
+   * buffer may be safely modified.  With this requirement, an extra
+   * buffer-copy can be avoided.)
+   *
+   * @param b   Input data
+   * @param off Start offset
+   * @param len Length
+   */
+  @Override
+  public synchronized void setInput(byte[] b, int off, int len) {
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || off > b.length - len) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+
+    this.userBuf = b;
+    this.userBufOff = off;
+    this.userBufLen = len;
+
+    setInputFromSavedData();
+
+    // Reinitialize lz4's output direct-buffer
+    uncompressedDirectBuf.limit(directBufferSize);
+    uncompressedDirectBuf.position(directBufferSize);
+  }
+
+  /**
+   * If a write would exceed the capacity of the direct buffers, it is set
+   * aside to be loaded by this function while the compressed data are
+   * consumed.
+   */
+  synchronized void setInputFromSavedData() {
+    compressedDirectBufLen = Math.min(userBufLen, directBufferSize);
+
+    // Reinitialize lz4's input direct buffer
+    compressedDirectBuf.rewind();
+    ((ByteBuffer) compressedDirectBuf).put(userBuf, userBufOff,
+        compressedDirectBufLen);
+
+    // Note how much data is being fed to lz4
+    userBufOff += compressedDirectBufLen;
+    userBufLen -= compressedDirectBufLen;
+  }
+
+  /**
+   * Does nothing.
+   */
+  @Override
+  public synchronized void setDictionary(byte[] b, int off, int len) {
+    // do nothing
+  }
+
+  /**
+   * Returns true if the input data buffer is empty and
+   * {@link #setInput(byte[], int, int)} should be called to
+   * provide more input.
+   *
+   * @return <code>true</code> if the input data buffer is empty and
+   *         {@link #setInput(byte[], int, int)} should be called in
+   *         order to provide more input.
+   */
+  @Override
+  public synchronized boolean needsInput() {
+    // Consume remaining compressed data?
+    if (uncompressedDirectBuf.remaining() > 0) {
+      return false;
+    }
+
+    // Check if lz4 has consumed all input
+    if (compressedDirectBufLen <= 0) {
+      // Check if we have consumed all user-input
+      if (userBufLen <= 0) {
+        return true;
+      } else {
+        setInputFromSavedData();
+      }
+    }
+
+    return false;
+  }
+
+  /**
+   * Returns <code>false</code>.
+   *
+   * @return <code>false</code>.
+   */
+  @Override
+  public synchronized boolean needsDictionary() {
+    return false;
+  }
+
+  /**
+   * Returns true if the end of the decompressed
+   * data output stream has been reached.
+   *
+   * @return <code>true</code> if the end of the decompressed
+   *         data output stream has been reached.
+   */
+  @Override
+  public synchronized boolean finished() {
+    return (finished && uncompressedDirectBuf.remaining() == 0);
+  }
+
+  /**
+   * Fills specified buffer with uncompressed data. Returns actual number
+   * of bytes of uncompressed data. A return value of 0 indicates that
+   * {@link #needsInput()} should be called in order to determine if more
+   * input data is required.
+   *
+   * @param b   Buffer for the uncompressed data
+   * @param off Start offset of the data
+   * @param len Size of the buffer
+   * @return The actual number of bytes of uncompressed data.
+   * @throws IOException
+   */
+  @Override
+  public synchronized int decompress(byte[] b, int off, int len)
+      throws IOException {
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || off > b.length - len) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+
+    int n = 0;
+
+    // Check if there is uncompressed data
+    n = uncompressedDirectBuf.remaining();
+    if (n > 0) {
+      n = Math.min(n, len);
+      ((ByteBuffer) uncompressedDirectBuf).get(b, off, n);
+      return n;
+    }
+    if (compressedDirectBufLen > 0) {
+      // Re-initialize the lz4's output direct buffer
+      uncompressedDirectBuf.rewind();
+      uncompressedDirectBuf.limit(directBufferSize);
+
+      // Decompress data
+      n = decompressBytesDirect();
+      uncompressedDirectBuf.limit(n);
+
+      if (userBufLen <= 0) {
+        finished = true;
+      }
+
+      // Get at most 'len' bytes
+      n = Math.min(n, len);
+      ((ByteBuffer) uncompressedDirectBuf).get(b, off, n);
+    }
+
+    return n;
+  }
+
+  /**
+   * Returns <code>0</code>.
+   *
+   * @return <code>0</code>.
+   */
+  @Override
+  public synchronized int getRemaining() {
+    // Never use this function in BlockDecompressorStream.
+    return 0;
+  }
+
+  /**
+   * Resets decompressor and input and output buffers so that a new set of
+   * input data can be processed.
+   */
+  @Override
+  public synchronized void reset() {
+    finished = false;
+    compressedDirectBufLen = 0;
+    uncompressedDirectBuf.limit(directBufferSize);
+    uncompressedDirectBuf.position(directBufferSize);
+    userBufOff = userBufLen = 0;
+  }
+
+  /**
+   * Closes the decompressor and discards any unprocessed input.
+   */
+  @Override
+  public synchronized void end() {
+    // do nothing
+  }
+
+  private native static void initIDs();
+
+  private native int decompressBytesDirect();
+}

+ 23 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lz4/package-info.java

@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.io.compress.lz4;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+

+ 3 - 0
hadoop-common-project/hadoop-common/src/main/native/Makefile.am

@@ -46,6 +46,9 @@ libhadoop_la_SOURCES = src/org/apache/hadoop/io/compress/zlib/ZlibCompressor.c \
                        src/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.c \
                        src/org/apache/hadoop/io/compress/snappy/SnappyCompressor.c \
                        src/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.c \
+                       src/org/apache/hadoop/io/compress/lz4/lz4.c \
+                       src/org/apache/hadoop/io/compress/lz4/Lz4Compressor.c \
+                       src/org/apache/hadoop/io/compress/lz4/Lz4Decompressor.c \
                        src/org/apache/hadoop/security/getGroup.c \
                        src/org/apache/hadoop/security/JniBasedUnixGroupsMapping.c \
                        src/org/apache/hadoop/security/JniBasedUnixGroupsNetgroupMapping.c \

+ 101 - 0
hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/Lz4Compressor.c

@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#if defined HAVE_CONFIG_H
+  #include <config.h>
+#endif
+
+#include "org_apache_hadoop.h"
+#include "org_apache_hadoop_io_compress_lz4_Lz4Compressor.h"
+
+//****************************
+// Simple Functions
+//****************************
+
+extern int LZ4_compress   (char* source, char* dest, int isize);
+
+/*
+LZ4_compress() :
+ return : the number of bytes in compressed buffer dest
+ note : destination buffer must be already allocated.
+  To avoid any problem, size it to handle worst cases situations (input data not compressible)
+  Worst case size is : "inputsize + 0.4%", with "0.4%" being at least 8 bytes.
+
+*/
+
+static jfieldID Lz4Compressor_clazz;
+static jfieldID Lz4Compressor_uncompressedDirectBuf;
+static jfieldID Lz4Compressor_uncompressedDirectBufLen;
+static jfieldID Lz4Compressor_compressedDirectBuf;
+static jfieldID Lz4Compressor_directBufferSize;
+
+
+JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_lz4_Lz4Compressor_initIDs
+(JNIEnv *env, jclass clazz){
+
+  Lz4Compressor_clazz = (*env)->GetStaticFieldID(env, clazz, "clazz",
+                                                 "Ljava/lang/Class;");
+  Lz4Compressor_uncompressedDirectBuf = (*env)->GetFieldID(env, clazz,
+                                                           "uncompressedDirectBuf",
+                                                           "Ljava/nio/Buffer;");
+  Lz4Compressor_uncompressedDirectBufLen = (*env)->GetFieldID(env, clazz,
+                                                              "uncompressedDirectBufLen", "I");
+  Lz4Compressor_compressedDirectBuf = (*env)->GetFieldID(env, clazz,
+                                                         "compressedDirectBuf",
+                                                         "Ljava/nio/Buffer;");
+  Lz4Compressor_directBufferSize = (*env)->GetFieldID(env, clazz,
+                                                       "directBufferSize", "I");
+}
+
+JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_lz4_Lz4Compressor_compressBytesDirect
+(JNIEnv *env, jobject thisj){
+  // Get members of Lz4Compressor
+  jobject clazz = (*env)->GetStaticObjectField(env, thisj, Lz4Compressor_clazz);
+  jobject uncompressed_direct_buf = (*env)->GetObjectField(env, thisj, Lz4Compressor_uncompressedDirectBuf);
+  jint uncompressed_direct_buf_len = (*env)->GetIntField(env, thisj, Lz4Compressor_uncompressedDirectBufLen);
+  jobject compressed_direct_buf = (*env)->GetObjectField(env, thisj, Lz4Compressor_compressedDirectBuf);
+  jint compressed_direct_buf_len = (*env)->GetIntField(env, thisj, Lz4Compressor_directBufferSize);
+
+  // Get the input direct buffer
+  LOCK_CLASS(env, clazz, "Lz4Compressor");
+  const char* uncompressed_bytes = (const char*)(*env)->GetDirectBufferAddress(env, uncompressed_direct_buf);
+  UNLOCK_CLASS(env, clazz, "Lz4Compressor");
+
+  if (uncompressed_bytes == 0) {
+    return (jint)0;
+  }
+
+  // Get the output direct buffer
+  LOCK_CLASS(env, clazz, "Lz4Compressor");
+  char* compressed_bytes = (char *)(*env)->GetDirectBufferAddress(env, compressed_direct_buf);
+  UNLOCK_CLASS(env, clazz, "Lz4Compressor");
+
+  if (compressed_bytes == 0) {
+    return (jint)0;
+  }
+
+  compressed_direct_buf_len = LZ4_compress(uncompressed_bytes, compressed_bytes, uncompressed_direct_buf_len);
+  if (compressed_direct_buf_len < 0){
+    THROW(env, "Ljava/lang/InternalError", "LZ4_compress failed");
+  }
+
+  (*env)->SetIntField(env, thisj, Lz4Compressor_uncompressedDirectBufLen, 0);
+
+  return (jint)compressed_direct_buf_len;
+}
+

+ 97 - 0
hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/Lz4Decompressor.c

@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#if defined HAVE_CONFIG_H
+  #include <config.h>
+#endif
+
+#include "org_apache_hadoop.h"
+#include "org_apache_hadoop_io_compress_lz4_Lz4Decompressor.h"
+
+int LZ4_uncompress_unknownOutputSize (char* source, char* dest, int isize, int maxOutputSize);
+
+/*
+LZ4_uncompress_unknownOutputSize() :
+ isize  : is the input size, therefore the compressed size
+ maxOutputSize : is the size of the destination buffer (which must be already allocated)
+ return : the number of bytes decoded in the destination buffer (necessarily <= maxOutputSize)
+    If the source stream is malformed, the function will stop decoding and return a negative result, indicating the byte position of the faulty instruction
+    This version never writes beyond dest + maxOutputSize, and is therefore protected against malicious data packets
+ note   : This version is a bit slower than LZ4_uncompress
+*/
+
+
+static jfieldID Lz4Decompressor_clazz;
+static jfieldID Lz4Decompressor_compressedDirectBuf;
+static jfieldID Lz4Decompressor_compressedDirectBufLen;
+static jfieldID Lz4Decompressor_uncompressedDirectBuf;
+static jfieldID Lz4Decompressor_directBufferSize;
+
+JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_lz4_Lz4Decompressor_initIDs
+(JNIEnv *env, jclass clazz){
+
+  Lz4Decompressor_clazz = (*env)->GetStaticFieldID(env, clazz, "clazz",
+                                                   "Ljava/lang/Class;");
+  Lz4Decompressor_compressedDirectBuf = (*env)->GetFieldID(env,clazz,
+                                                           "compressedDirectBuf",
+                                                           "Ljava/nio/Buffer;");
+  Lz4Decompressor_compressedDirectBufLen = (*env)->GetFieldID(env,clazz,
+                                                              "compressedDirectBufLen", "I");
+  Lz4Decompressor_uncompressedDirectBuf = (*env)->GetFieldID(env,clazz,
+                                                             "uncompressedDirectBuf",
+                                                             "Ljava/nio/Buffer;");
+  Lz4Decompressor_directBufferSize = (*env)->GetFieldID(env, clazz,
+                                                         "directBufferSize", "I");
+}
+
+JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_lz4_Lz4Decompressor_decompressBytesDirect
+(JNIEnv *env, jobject thisj){
+  // Get members of Lz4Decompressor
+  jobject clazz = (*env)->GetStaticObjectField(env,thisj, Lz4Decompressor_clazz);
+  jobject compressed_direct_buf = (*env)->GetObjectField(env,thisj, Lz4Decompressor_compressedDirectBuf);
+  jint compressed_direct_buf_len = (*env)->GetIntField(env,thisj, Lz4Decompressor_compressedDirectBufLen);
+  jobject uncompressed_direct_buf = (*env)->GetObjectField(env,thisj, Lz4Decompressor_uncompressedDirectBuf);
+  jint uncompressed_direct_buf_len = (*env)->GetIntField(env, thisj, Lz4Decompressor_directBufferSize);
+
+  // Get the input direct buffer
+  LOCK_CLASS(env, clazz, "Lz4Decompressor");
+  const char* compressed_bytes = (const char*)(*env)->GetDirectBufferAddress(env, compressed_direct_buf);
+  UNLOCK_CLASS(env, clazz, "Lz4Decompressor");
+
+  if (compressed_bytes == 0) {
+    return (jint)0;
+  }
+
+  // Get the output direct buffer
+  LOCK_CLASS(env, clazz, "Lz4Decompressor");
+  char* uncompressed_bytes = (char *)(*env)->GetDirectBufferAddress(env, uncompressed_direct_buf);
+  UNLOCK_CLASS(env, clazz, "Lz4Decompressor");
+
+  if (uncompressed_bytes == 0) {
+    return (jint)0;
+  }
+
+  uncompressed_direct_buf_len = LZ4_uncompress_unknownOutputSize(compressed_bytes, uncompressed_bytes, compressed_direct_buf_len, uncompressed_direct_buf_len);
+  if (uncompressed_direct_buf_len < 0) {
+    THROW(env, "Ljava/lang/InternalError", "LZ4_uncompress_unknownOutputSize failed.");
+  }
+
+  (*env)->SetIntField(env, thisj, Lz4Decompressor_compressedDirectBufLen, 0);
+
+  return (jint)uncompressed_direct_buf_len;
+}

+ 645 - 0
hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/lz4.c

@@ -0,0 +1,645 @@
+/*
+   LZ4 - Fast LZ compression algorithm
+   Copyright (C) 2011, Yann Collet.
+   BSD License
+
+   Redistribution and use in source and binary forms, with or without
+   modification, are permitted provided that the following conditions are
+   met:
+  
+       * Redistributions of source code must retain the above copyright
+   notice, this list of conditions and the following disclaimer.
+       * Redistributions in binary form must reproduce the above
+   copyright notice, this list of conditions and the following disclaimer
+   in the documentation and/or other materials provided with the
+   distribution.
+  
+   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+*/
+
+//**************************************
+//  Copy from:
+// URL: http://lz4.googlecode.com/svn/trunk/lz4.c
+// Repository Root: http://lz4.googlecode.com/svn
+// Repository UUID: 650e7d94-2a16-8b24-b05c-7c0b3f6821cd
+// Revision: 43
+// Node Kind: file
+// Last Changed Author: yann.collet.73@gmail.com
+// Last Changed Rev: 43
+// Last Changed Date: 2011-12-16 15:41:46 -0800 (Fri, 16 Dec 2011)
+// Sha1: 9db7b2c57698c528d79572e6bce2e7dc33fa5998
+//**************************************
+
+//**************************************
+// Compilation Directives
+//**************************************
+#if __STDC_VERSION__ >= 199901L
+  /* "restrict" is a known keyword */
+#else
+#define restrict  // Disable restrict
+#endif
+
+
+//**************************************
+// Includes
+//**************************************
+#include <stdlib.h>   // for malloc
+#include <string.h>   // for memset
+
+
+//**************************************
+// Performance parameter               
+//**************************************
+// Increasing this value improves compression ratio
+// Lowering this value reduces memory usage
+// Lowering may also improve speed, typically on reaching cache size limits (L1 32KB for Intel, 64KB for AMD)
+// Memory usage formula for 32 bits systems : N->2^(N+2) Bytes (examples : 17 -> 512KB ; 12 -> 16KB)
+#define HASH_LOG 12
+
+
+//**************************************
+// Basic Types
+//**************************************
+#if defined(_MSC_VER)    // Visual Studio does not support 'stdint' natively
+#define BYTE	unsigned __int8
+#define U16		unsigned __int16
+#define U32		unsigned __int32
+#define S32		__int32
+#else
+#include <stdint.h>
+#define BYTE	uint8_t
+#define U16		uint16_t
+#define U32		uint32_t
+#define S32		int32_t
+#endif
+
+
+//**************************************
+// Constants
+//**************************************
+#define MINMATCH 4
+#define SKIPSTRENGTH 6
+#define STACKLIMIT 13
+#define HEAPMODE (HASH_LOG>STACKLIMIT)  // Defines if memory is allocated into the stack (local variable), or into the heap (malloc()).
+#define COPYTOKEN 4
+#define COPYLENGTH 8
+#define LASTLITERALS 5
+#define MFLIMIT (COPYLENGTH+MINMATCH)
+#define MINLENGTH (MFLIMIT+1)
+
+#define MAXD_LOG 16
+#define MAX_DISTANCE ((1 << MAXD_LOG) - 1)
+
+#define HASHTABLESIZE (1 << HASH_LOG)
+#define HASH_MASK (HASHTABLESIZE - 1)
+
+#define ML_BITS 4
+#define ML_MASK ((1U<<ML_BITS)-1)
+#define RUN_BITS (8-ML_BITS)
+#define RUN_MASK ((1U<<RUN_BITS)-1)
+
+
+//**************************************
+// Local structures
+//**************************************
+struct refTables
+{
+	const BYTE* hashTable[HASHTABLESIZE];
+};
+
+#ifdef __GNUC__
+#  define _PACKED __attribute__ ((packed))
+#else
+#  define _PACKED
+#endif
+
+typedef struct _U32_S
+{
+	U32 v;
+} _PACKED U32_S;
+
+typedef struct _U16_S
+{
+	U16 v;
+} _PACKED U16_S;
+
+#define A32(x) (((U32_S *)(x))->v)
+#define A16(x) (((U16_S *)(x))->v)
+
+
+//**************************************
+// Macros
+//**************************************
+#define LZ4_HASH_FUNCTION(i)	(((i) * 2654435761U) >> ((MINMATCH*8)-HASH_LOG))
+#define LZ4_HASH_VALUE(p)		LZ4_HASH_FUNCTION(A32(p))
+#define LZ4_COPYPACKET(s,d)		A32(d) = A32(s); d+=4; s+=4; A32(d) = A32(s); d+=4; s+=4;
+#define LZ4_WILDCOPY(s,d,e)		do { LZ4_COPYPACKET(s,d) } while (d<e);
+#define LZ4_BLINDCOPY(s,d,l)	{ BYTE* e=d+l; LZ4_WILDCOPY(s,d,e); d=e; }
+
+
+
+//****************************
+// Compression CODE
+//****************************
+
+int LZ4_compressCtx(void** ctx,
+				 char* source, 
+				 char* dest,
+				 int isize)
+{	
+#if HEAPMODE
+	struct refTables *srt = (struct refTables *) (*ctx);
+	const BYTE** HashTable;
+#else
+	const BYTE* HashTable[HASHTABLESIZE] = {0};
+#endif
+
+	const BYTE* ip = (BYTE*) source;       
+	const BYTE* anchor = ip;
+	const BYTE* const iend = ip + isize;
+	const BYTE* const mflimit = iend - MFLIMIT;
+#define matchlimit (iend - LASTLITERALS)
+
+	BYTE* op = (BYTE*) dest;
+	
+#if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
+	const size_t DeBruijnBytePos[32] = { 0, 0, 3, 0, 3, 1, 3, 0, 3, 2, 2, 1, 3, 2, 0, 1, 3, 3, 1, 2, 2, 2, 2, 0, 3, 1, 2, 0, 1, 0, 1, 1 };
+#endif
+	int len, length;
+	const int skipStrength = SKIPSTRENGTH;
+	U32 forwardH;
+
+
+	// Init 
+	if (isize<MINLENGTH) goto _last_literals;
+#if HEAPMODE
+	if (*ctx == NULL) 
+	{
+		srt = (struct refTables *) malloc ( sizeof(struct refTables) );
+		*ctx = (void*) srt;
+	}
+	HashTable = srt->hashTable;
+	memset((void*)HashTable, 0, sizeof(srt->hashTable));
+#else
+	(void) ctx;
+#endif
+
+
+	// First Byte
+	HashTable[LZ4_HASH_VALUE(ip)] = ip;
+	ip++; forwardH = LZ4_HASH_VALUE(ip);
+	
+	// Main Loop
+    for ( ; ; ) 
+	{
+		int findMatchAttempts = (1U << skipStrength) + 3;
+		const BYTE* forwardIp = ip;
+		const BYTE* ref;
+		BYTE* token;
+
+		// Find a match
+		do {
+			U32 h = forwardH;
+			int step = findMatchAttempts++ >> skipStrength;
+			ip = forwardIp;
+			forwardIp = ip + step;
+
+			if (forwardIp > mflimit) { goto _last_literals; }
+
+			forwardH = LZ4_HASH_VALUE(forwardIp);
+			ref = HashTable[h];
+			HashTable[h] = ip;
+
+		} while ((ref < ip - MAX_DISTANCE) || (A32(ref) != A32(ip)));
+
+		// Catch up
+		while ((ip>anchor) && (ref>(BYTE*)source) && (ip[-1]==ref[-1])) { ip--; ref--; }  
+
+		// Encode Literal length
+		length = ip - anchor;
+		token = op++;
+		if (length>=(int)RUN_MASK) { *token=(RUN_MASK<<ML_BITS); len = length-RUN_MASK; for(; len > 254 ; len-=255) *op++ = 255; *op++ = (BYTE)len; } 
+		else *token = (length<<ML_BITS);
+
+		// Copy Literals
+		LZ4_BLINDCOPY(anchor, op, length);
+
+
+_next_match:
+		// Encode Offset
+#if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
+		A16(op) = (ip-ref); op+=2;
+#else
+		{ int delta = ip-ref; *op++ = delta; *op++ = delta>>8; }
+#endif
+
+		// Start Counting
+		ip+=MINMATCH; ref+=MINMATCH;   // MinMatch verified
+		anchor = ip;
+		while (ip<matchlimit-3)
+		{
+#if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
+			int diff = A32(ref) ^ A32(ip);
+			if (!diff) { ip+=4; ref+=4; continue; }
+			ip += DeBruijnBytePos[((U32)((diff & -diff) * 0x077CB531U)) >> 27];
+#else
+			if (A32(ref) == A32(ip)) { ip+=4; ref+=4; continue; }
+			if (A16(ref) == A16(ip)) { ip+=2; ref+=2; }
+			if (*ref == *ip) ip++;
+#endif
+			goto _endCount;
+		}
+		if ((ip<(matchlimit-1)) && (A16(ref) == A16(ip))) { ip+=2; ref+=2; }
+		if ((ip<matchlimit) && (*ref == *ip)) ip++;
+_endCount:
+		len = (ip - anchor);
+		
+		// Encode MatchLength
+		if (len>=(int)ML_MASK) { *token+=ML_MASK; len-=ML_MASK; for(; len > 509 ; len-=510) { *op++ = 255; *op++ = 255; } if (len > 254) { len-=255; *op++ = 255; } *op++ = (BYTE)len; } 
+		else *token += len;	
+
+		// Test end of chunk
+		if (ip > mflimit) { anchor = ip;  break; }
+
+		// Fill table
+		HashTable[LZ4_HASH_VALUE(ip-2)] = ip-2;
+
+		// Test next position
+		ref = HashTable[LZ4_HASH_VALUE(ip)];
+		HashTable[LZ4_HASH_VALUE(ip)] = ip;
+		if ((ref > ip - (MAX_DISTANCE + 1)) && (A32(ref) == A32(ip))) { token = op++; *token=0; goto _next_match; }
+
+		// Prepare next loop
+		anchor = ip++; 
+		forwardH = LZ4_HASH_VALUE(ip);
+	}
+
+_last_literals:
+	// Encode Last Literals
+	{
+		int lastRun = iend - anchor;
+		if (lastRun>=(int)RUN_MASK) { *op++=(RUN_MASK<<ML_BITS); lastRun-=RUN_MASK; for(; lastRun > 254 ; lastRun-=255) *op++ = 255; *op++ = (BYTE) lastRun; } 
+		else *op++ = (lastRun<<ML_BITS);
+		memcpy(op, anchor, iend - anchor);
+		op += iend-anchor;
+	} 
+
+	// End
+	return (int) (((char*)op)-dest);
+}
+
+
+
+// Note : this function is valid only if isize < LZ4_64KLIMIT
+#define LZ4_64KLIMIT ((1U<<16) + (MFLIMIT-1))
+#define HASHLOG64K (HASH_LOG+1)
+#define LZ4_HASH64K_FUNCTION(i)	(((i) * 2654435761U) >> ((MINMATCH*8)-HASHLOG64K))
+#define LZ4_HASH64K_VALUE(p)	LZ4_HASH64K_FUNCTION(A32(p))
+int LZ4_compress64kCtx(void** ctx,
+				 char* source, 
+				 char* dest,
+				 int isize)
+{	
+#if HEAPMODE
+	struct refTables *srt = (struct refTables *) (*ctx);
+	U16* HashTable;
+#else
+	U16 HashTable[HASHTABLESIZE<<1] = {0};
+#endif
+
+	const BYTE* ip = (BYTE*) source;       
+	const BYTE* anchor = ip;
+	const BYTE* const base = ip;
+	const BYTE* const iend = ip + isize;
+	const BYTE* const mflimit = iend - MFLIMIT;
+#define matchlimit (iend - LASTLITERALS)
+
+	BYTE* op = (BYTE*) dest;
+	
+#if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
+	const size_t DeBruijnBytePos[32] = { 0, 0, 3, 0, 3, 1, 3, 0, 3, 2, 2, 1, 3, 2, 0, 1, 3, 3, 1, 2, 2, 2, 2, 0, 3, 1, 2, 0, 1, 0, 1, 1 };
+#endif
+	int len, length;
+	const int skipStrength = SKIPSTRENGTH;
+	U32 forwardH;
+
+
+	// Init 
+	if (isize<MINLENGTH) goto _last_literals;
+#if HEAPMODE
+	if (*ctx == NULL) 
+	{
+		srt = (struct refTables *) malloc ( sizeof(struct refTables) );
+		*ctx = (void*) srt;
+	}
+	HashTable = (U16*)(srt->hashTable);
+	memset((void*)HashTable, 0, sizeof(srt->hashTable));
+#else
+	(void) ctx;
+#endif
+
+
+	// First Byte
+	ip++; forwardH = LZ4_HASH64K_VALUE(ip);
+	
+	// Main Loop
+    for ( ; ; ) 
+	{
+		int findMatchAttempts = (1U << skipStrength) + 3;
+		const BYTE* forwardIp = ip;
+		const BYTE* ref;
+		BYTE* token;
+
+		// Find a match
+		do {
+			U32 h = forwardH;
+			int step = findMatchAttempts++ >> skipStrength;
+			ip = forwardIp;
+			forwardIp = ip + step;
+
+			if (forwardIp > mflimit) { goto _last_literals; }
+
+			forwardH = LZ4_HASH64K_VALUE(forwardIp);
+			ref = base + HashTable[h];
+			HashTable[h] = ip - base;
+
+		} while (A32(ref) != A32(ip));
+
+		// Catch up
+		while ((ip>anchor) && (ref>(BYTE*)source) && (ip[-1]==ref[-1])) { ip--; ref--; }  
+
+		// Encode Literal length
+		length = ip - anchor;
+		token = op++;
+		if (length>=(int)RUN_MASK) { *token=(RUN_MASK<<ML_BITS); len = length-RUN_MASK; for(; len > 254 ; len-=255) *op++ = 255; *op++ = (BYTE)len; } 
+		else *token = (length<<ML_BITS);
+
+		// Copy Literals
+		LZ4_BLINDCOPY(anchor, op, length);
+
+
+_next_match:
+		// Encode Offset
+#if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
+		A16(op) = (ip-ref); op+=2;
+#else
+		{ int delta = ip-ref; *op++ = delta; *op++ = delta>>8; }
+#endif
+
+		// Start Counting
+		ip+=MINMATCH; ref+=MINMATCH;   // MinMatch verified
+		anchor = ip;
+		while (ip<matchlimit-3)
+		{
+#if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
+			int diff = A32(ref) ^ A32(ip);
+			if (!diff) { ip+=4; ref+=4; continue; }
+			ip += DeBruijnBytePos[((U32)((diff & -diff) * 0x077CB531U)) >> 27];
+#else
+			if (A32(ref) == A32(ip)) { ip+=4; ref+=4; continue; }
+			if (A16(ref) == A16(ip)) { ip+=2; ref+=2; }
+			if (*ref == *ip) ip++;
+#endif
+			goto _endCount;
+		}
+		if ((ip<(matchlimit-1)) && (A16(ref) == A16(ip))) { ip+=2; ref+=2; }
+		if ((ip<matchlimit) && (*ref == *ip)) ip++;
+_endCount:
+		len = (ip - anchor);
+		
+		// Encode MatchLength
+		if (len>=(int)ML_MASK) { *token+=ML_MASK; len-=ML_MASK; for(; len > 509 ; len-=510) { *op++ = 255; *op++ = 255; } if (len > 254) { len-=255; *op++ = 255; } *op++ = (BYTE)len; } 
+		else *token += len;	
+
+		// Test end of chunk
+		if (ip > mflimit) { anchor = ip;  break; }
+
+		// Test next position
+		ref = base + HashTable[LZ4_HASH64K_VALUE(ip)];
+		HashTable[LZ4_HASH64K_VALUE(ip)] = ip - base;
+		if (A32(ref) == A32(ip)) { token = op++; *token=0; goto _next_match; }
+
+		// Prepare next loop
+		anchor = ip++; 
+		forwardH = LZ4_HASH64K_VALUE(ip);
+	}
+
+_last_literals:
+	// Encode Last Literals
+	{
+		int lastRun = iend - anchor;
+		if (lastRun>=(int)RUN_MASK) { *op++=(RUN_MASK<<ML_BITS); lastRun-=RUN_MASK; for(; lastRun > 254 ; lastRun-=255) *op++ = 255; *op++ = (BYTE) lastRun; } 
+		else *op++ = (lastRun<<ML_BITS);
+		memcpy(op, anchor, iend - anchor);
+		op += iend-anchor;
+	} 
+
+	// End
+	return (int) (((char*)op)-dest);
+}
+
+
+
+int LZ4_compress(char* source, 
+				 char* dest,
+				 int isize)
+{
+#if HEAPMODE
+	void* ctx = malloc(sizeof(struct refTables));
+	int result;
+	if (isize < LZ4_64KLIMIT)
+		result = LZ4_compress64kCtx(&ctx, source, dest, isize);
+	else result = LZ4_compressCtx(&ctx, source, dest, isize);
+	free(ctx);
+	return result;
+#else
+	if (isize < (int)LZ4_64KLIMIT) return LZ4_compress64kCtx(NULL, source, dest, isize);
+	return LZ4_compressCtx(NULL, source, dest, isize);
+#endif
+}
+
+
+
+
+//****************************
+// Decompression CODE
+//****************************
+
+// Note : The decoding functions LZ4_uncompress() and LZ4_uncompress_unknownOutputSize() 
+//		are safe against "buffer overflow" attack type
+//		since they will *never* write outside of the provided output buffer :
+//		they both check this condition *before* writing anything.
+//		A corrupted packet however can make them *read* within the first 64K before the output buffer.
+
+int LZ4_uncompress(char* source, 
+				 char* dest,
+				 int osize)
+{	
+	// Local Variables
+	const BYTE* restrict ip = (const BYTE*) source;
+	const BYTE* restrict ref;
+
+	BYTE* restrict op = (BYTE*) dest;
+	BYTE* const oend = op + osize;
+	BYTE* cpy;
+
+	BYTE token;
+	
+	U32	dec[4]={0, 3, 2, 3};
+	int	len, length;
+
+
+	// Main Loop
+	while (1)
+	{
+		// get runlength
+		token = *ip++;
+		if ((length=(token>>ML_BITS)) == RUN_MASK)  { for (;(len=*ip++)==255;length+=255){} length += len; } 
+
+		// copy literals
+		cpy = op+length;
+		if (cpy>oend-COPYLENGTH) 
+		{ 
+			if (cpy > oend) goto _output_error;
+			memcpy(op, ip, length);
+			ip += length;
+			break;    // Necessarily EOF
+		}
+		LZ4_WILDCOPY(ip, op, cpy); ip -= (op-cpy); op = cpy;
+
+
+		// get offset
+#if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
+		ref = cpy - A16(ip); ip+=2;
+#else
+		{ int delta = *ip++; delta += *ip++ << 8; ref = cpy - delta; }
+#endif
+
+		// get matchlength
+		if ((length=(token&ML_MASK)) == ML_MASK) { for (;*ip==255;length+=255) {ip++;} length += *ip++; } 
+
+		// copy repeated sequence
+		if (op-ref<COPYTOKEN)
+		{
+			*op++ = *ref++;
+			*op++ = *ref++;
+			*op++ = *ref++;
+			*op++ = *ref++;
+			ref -= dec[op-ref];
+			A32(op)=A32(ref); 
+		} else { A32(op)=A32(ref); op+=4; ref+=4; }
+		cpy = op + length;
+		if (cpy > oend-COPYLENGTH)
+		{
+			if (cpy > oend) goto _output_error;	
+			LZ4_WILDCOPY(ref, op, (oend-COPYLENGTH));
+			while(op<cpy) *op++=*ref++;
+			op=cpy;
+			if (op == oend) break;    // Check EOF (should never happen, since last 5 bytes are supposed to be literals)
+			continue;
+		}
+		LZ4_WILDCOPY(ref, op, cpy);
+		op=cpy;		// correction
+	}
+
+	// end of decoding
+	return (int) (((char*)ip)-source);
+
+	// write overflow error detected
+_output_error:
+	return (int) (-(((char*)ip)-source));
+}
+
+
+int LZ4_uncompress_unknownOutputSize(
+				char* source, 
+				char* dest,
+				int isize,
+				int maxOutputSize)
+{	
+	// Local Variables
+	const BYTE* restrict ip = (const BYTE*) source;
+	const BYTE* const iend = ip + isize;
+	const BYTE* restrict ref;
+
+	BYTE* restrict op = (BYTE*) dest;
+	BYTE* const oend = op + maxOutputSize;
+	BYTE* cpy;
+
+	BYTE token;
+	
+	U32	dec[4]={0, 3, 2, 3};
+	int	len, length;
+
+
+	// Main Loop
+	while (ip<iend)
+	{
+		// get runlength
+		token = *ip++;
+		if ((length=(token>>ML_BITS)) == RUN_MASK)  { for (;(len=*ip++)==255;length+=255){} length += len; } 
+
+		// copy literals
+		cpy = op+length;
+		if (cpy>oend-COPYLENGTH) 
+		{ 
+			if (cpy > oend) goto _output_error;
+			memcpy(op, ip, length);
+			op += length;
+			break;    // Necessarily EOF
+		}
+		LZ4_WILDCOPY(ip, op, cpy); ip -= (op-cpy); op = cpy;
+		if (ip>=iend) break;    // check EOF
+
+		// get offset
+#if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
+		ref = cpy - A16(ip); ip+=2;
+#else
+		{ int delta = *ip++; delta += *ip++ << 8; ref = cpy - delta; }
+#endif
+
+		// get matchlength
+		if ((length=(token&ML_MASK)) == ML_MASK) { for (;(len=*ip++)==255;length+=255){} length += len; }
+
+		// copy repeated sequence
+		if (op-ref<COPYTOKEN)
+		{
+			*op++ = *ref++;
+			*op++ = *ref++;
+			*op++ = *ref++;
+			*op++ = *ref++;
+			ref -= dec[op-ref];
+			A32(op)=A32(ref); 
+		} else { A32(op)=A32(ref); op+=4; ref+=4; }
+		cpy = op + length;
+		if (cpy>oend-COPYLENGTH)
+		{
+			if (cpy > oend) goto _output_error;	
+			LZ4_WILDCOPY(ref, op, (oend-COPYLENGTH));
+			while(op<cpy) *op++=*ref++;
+			op=cpy;
+			if (op == oend) break;    // Check EOF (should never happen, since last 5 bytes are supposed to be literals)
+			continue;
+		}
+		LZ4_WILDCOPY(ref, op, cpy);
+		op=cpy;		// correction
+	}
+
+	// end of decoding
+	return (int) (((char*)op)-dest);
+
+	// write overflow error detected
+_output_error:
+	return (int) (-(((char*)ip)-source));
+}
+
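For context, a minimal sketch of how the native block above is reached from Java: through the new Lz4Codec and the generic CompressionCodec API that the TestCodec case below exercises. This is illustrative only and not part of the commit; the Lz4Codec, NativeCodeLoader, ReflectionUtils and IOUtils calls are the standard Hadoop APIs, while the helper class itself is hypothetical.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Lz4Codec;
import org.apache.hadoop.util.NativeCodeLoader;
import org.apache.hadoop.util.ReflectionUtils;

// Hypothetical helper class; only Lz4Codec and the generic codec API are from the commit.
public class Lz4RoundTripSketch {
  public static byte[] roundTrip(byte[] data) throws Exception {
    // The codec delegates to the native lz4.c above via JNI, so bail out when it is absent.
    if (!NativeCodeLoader.isNativeCodeLoaded() || !Lz4Codec.isNativeCodeLoaded()) {
      return data;
    }
    Configuration conf = new Configuration();
    CompressionCodec codec = ReflectionUtils.newInstance(Lz4Codec.class, conf);

    // Compress through the codec's output stream.
    ByteArrayOutputStream compressed = new ByteArrayOutputStream();
    CompressionOutputStream out = codec.createOutputStream(compressed);
    out.write(data);
    out.close();

    // Decompress through the codec's input stream and collect the restored bytes.
    CompressionInputStream in =
        codec.createInputStream(new ByteArrayInputStream(compressed.toByteArray()));
    ByteArrayOutputStream restored = new ByteArrayOutputStream();
    IOUtils.copyBytes(in, restored, 4096, true);
    return restored.toByteArray();
  }
}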

+ 13 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java

@@ -60,6 +60,7 @@ import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel;
 import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionStrategy;
 import org.apache.hadoop.io.compress.zlib.ZlibFactory;
 import org.apache.hadoop.util.LineReader;
+import org.apache.hadoop.util.NativeCodeLoader;
 import org.apache.hadoop.util.ReflectionUtils;
 
 import org.apache.commons.codec.binary.Base64;
@@ -108,6 +109,18 @@ public class TestCodec {
       }
     }
   }
+  
+  @Test
+  public void testLz4Codec() throws IOException {
+    if (NativeCodeLoader.isNativeCodeLoaded()) {
+      if (Lz4Codec.isNativeCodeLoaded()) {
+        codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.Lz4Codec");
+        codecTest(conf, seed, count, "org.apache.hadoop.io.compress.Lz4Codec");
+      } else {
+        Assert.fail("Native hadoop library available but lz4 not");
+      }
+    }
+  }
 
   @Test
   public void testDeflateCodec() throws IOException {

+ 12 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -172,6 +172,12 @@ Trunk (unreleased changes)
 
     HDFS-2694. Removal of Avro broke non-PB NN services. (atm)
 
+    HDFS-2687. Tests failing with ClassCastException post protobuf RPC
+    changes. (suresh)
+
+    HDFS-2700. Fix failing TestDataNodeMultipleRegistrations in trunk
+    (Uma Maheswara Rao G via todd)
+
 Release 0.23.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES
@@ -228,6 +234,9 @@ Release 0.23.1 - UNRELEASED
     HDFS-2675. Reduce warning verbosity when double-closing edit logs
     (todd)
 
+    HDFS-2335. DataNodeCluster and NNStorage always pull fresh entropy.
+    (Uma Maheswara Rao G via eli)
+
   OPTIMIZATIONS
 
     HDFS-2130. Switch default checksum to CRC32C. (todd)
@@ -269,6 +278,9 @@ Release 0.23.1 - UNRELEASED
 
     HDFS-2640. Javadoc generation hangs. (tomwhite)
 
+    HDFS-2553. Fix BlockPoolSliceScanner spinning in a tight loop (Uma
+    Maheswara Rao G via todd)
+
 Release 0.23.0 - 2011-11-01 
 
   INCOMPATIBLE CHANGES

+ 14 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java

@@ -24,6 +24,7 @@ import java.io.UnsupportedEncodingException;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.security.SecureRandom;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -68,11 +69,23 @@ public class DFSUtil {
       return new Random();
     }
   };
+  
+  private static final ThreadLocal<SecureRandom> SECURE_RANDOM = new ThreadLocal<SecureRandom>() {
+    @Override
+    protected SecureRandom initialValue() {
+      return new SecureRandom();
+    }
+  };
 
-  /** @return a pseudorandom number generator. */
+  /** @return a pseudo random number generator. */
   public static Random getRandom() {
     return RANDOM.get();
   }
+  
+  /** @return a pseudo secure random number generator. */
+  public static SecureRandom getSecureRandom() {
+    return SECURE_RANDOM.get();
+  }
 
   /**
    * Compartor for sorting DataNodeInfo[] based on decommissioned states.
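A minimal, hypothetical caller of the new helper, mirroring the pattern the DataNode, NNStorage and DataNodeCluster hunks below switch to (the class and method here are illustrative only, not part of the commit):

import org.apache.hadoop.hdfs.DFSUtil;

// Hypothetical example: reuse the per-thread SecureRandom held by DFSUtil instead of
// constructing (and re-seeding) a fresh SecureRandom on every call.
public class StorageIdSketch {
  static String newStorageIdSuffix(String ip, int port) {
    int rand = DFSUtil.getSecureRandom().nextInt(Integer.MAX_VALUE);
    return "DS-" + rand + "-" + ip + "-" + port + "-" + System.currentTimeMillis();
  }
}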

+ 12 - 21
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java

@@ -990,25 +990,15 @@ public class PBHelper {
   public static HdfsFileStatus convert(HdfsFileStatusProto fs) {
     if (fs == null)
       return null;
-    if (fs.hasLocations()) {
-      return new HdfsLocatedFileStatus(
-          fs.getLength(), fs.getFileType().equals(FileType.IS_DIR), 
-          fs.getBlockReplication(), fs.getBlocksize(),
-          fs.getModificationTime(), fs.getAccessTime(),
-          PBHelper.convert(fs.getPermission()), fs.getOwner(), fs.getGroup(), 
-          fs.getFileType().equals(FileType.IS_SYMLINK) ? 
-              fs.getSymlink().toByteArray() : null,
-          fs.getPath().toByteArray(),
-          PBHelper.convert(fs.hasLocations() ? fs.getLocations() : null));
-    }
-    return new HdfsFileStatus(
-      fs.getLength(), fs.getFileType().equals(FileType.IS_DIR), 
-      fs.getBlockReplication(), fs.getBlocksize(),
-      fs.getModificationTime(), fs.getAccessTime(),
-      PBHelper.convert(fs.getPermission()), fs.getOwner(), fs.getGroup(), 
-      fs.getFileType().equals(FileType.IS_SYMLINK) ? 
-          fs.getSymlink().toByteArray() : null,
-      fs.getPath().toByteArray());
+    return new HdfsLocatedFileStatus(
+        fs.getLength(), fs.getFileType().equals(FileType.IS_DIR), 
+        fs.getBlockReplication(), fs.getBlocksize(),
+        fs.getModificationTime(), fs.getAccessTime(),
+        PBHelper.convert(fs.getPermission()), fs.getOwner(), fs.getGroup(), 
+        fs.getFileType().equals(FileType.IS_SYMLINK) ? 
+            fs.getSymlink().toByteArray() : null,
+        fs.getPath().toByteArray(),
+        fs.hasLocations() ? PBHelper.convert(fs.getLocations()) : null);
   }
 
   public static HdfsFileStatusProto convert(HdfsFileStatus fs) {
@@ -1070,7 +1060,7 @@ public class PBHelper {
       return null;
     List<HdfsFileStatusProto> partList =  dl.getPartialListingList();
     return new DirectoryListing( 
-        partList.isEmpty() ? new HdfsFileStatus[0] 
+        partList.isEmpty() ? new HdfsLocatedFileStatus[0] 
           : PBHelper.convert(
              partList.toArray(new HdfsFileStatusProto[partList.size()])),
         dl.getRemainingEntries());
@@ -1216,7 +1206,8 @@ public class PBHelper {
   public static CorruptFileBlocks convert(CorruptFileBlocksProto c) {
     if (c == null)
       return null;
-    return new CorruptFileBlocks((String[]) c.getFilesList().toArray(),
+    List<String> fileList = c.getFilesList();
+    return new CorruptFileBlocks(fileList.toArray(new String[fileList.size()]),
         c.getCookie());
   }
 

+ 0 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java

@@ -48,7 +48,6 @@ import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
-import org.apache.hadoop.ipc.RPC;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java

@@ -45,7 +45,7 @@ import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
-import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.util.StringUtils;
 
@@ -392,7 +392,7 @@ class BPServiceActor implements Runnable {
   private synchronized void cleanUp() {
     
     shouldServiceRun = false;
-    RPC.stopProxy(bpNamenode);
+    IOUtils.cleanup(LOG, bpNamenode);
    bpos.shutdownActor(this);
   }
 

+ 5 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java

@@ -450,14 +450,14 @@ class BlockPoolSliceScanner {
   }
   
   private synchronized long getEarliestScanTime() {
-    if ( blockInfoSet.size() > 0 ) {
+    if (!blockInfoSet.isEmpty()) {
       return blockInfoSet.first().lastScanTime;
     }
     return Long.MAX_VALUE; 
   }
   
   private synchronized boolean isFirstBlockProcessed() {
-    if (blockInfoSet.size() > 0 ) {
+    if (!blockInfoSet.isEmpty()) {
       long blockId = blockInfoSet.first().block.getBlockId();
       if ((processedBlocks.get(blockId) != null)
           && (processedBlocks.get(blockId) == 1)) {
@@ -471,7 +471,7 @@ class BlockPoolSliceScanner {
   private void verifyFirstBlock() {
     Block block = null;
     synchronized (this) {
-      if ( blockInfoSet.size() > 0 ) {
+      if (!blockInfoSet.isEmpty()) {
         block = blockInfoSet.first().block;
       }
     }
@@ -560,7 +560,7 @@ class BlockPoolSliceScanner {
      * lastModificationTime > 0.
      */    
     synchronized (this) {
-      if (blockInfoSet.size() > 0 ) {
+      if (!blockInfoSet.isEmpty()) {
         BlockScanInfo info;
         while ((info =  blockInfoSet.first()).lastScanTime < 0) {
           delBlockInfo(info);        
@@ -630,7 +630,7 @@ class BlockPoolSliceScanner {
           }
         }
         if (((now - getEarliestScanTime()) >= scanPeriod)
-            || (!(this.isFirstBlockProcessed()))) {
+            || ((!blockInfoSet.isEmpty()) && !(this.isFirstBlockProcessed()))) {
           verifyFirstBlock();
         } else {
           if (LOG.isDebugEnabled()) {

+ 1 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -65,7 +65,6 @@ import java.net.UnknownHostException;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
 import java.security.PrivilegedExceptionAction;
-import java.security.SecureRandom;
 import java.util.AbstractList;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -988,7 +987,7 @@ public class DataNode extends Configured
       LOG.warn("Could not find ip address of \"default\" inteface.");
     }
     
-    int rand = new SecureRandom().nextInt(Integer.MAX_VALUE);
+    int rand = DFSUtil.getSecureRandom().nextInt(Integer.MAX_VALUE);
     return "DS-" + rand + "-" + ip + "-" + port + "-"
         + System.currentTimeMillis();
   }

+ 1 - 9
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java

@@ -26,8 +26,6 @@ import java.io.RandomAccessFile;
 import java.io.OutputStream;
 import java.net.URI;
 import java.net.UnknownHostException;
-import java.security.NoSuchAlgorithmException;
-import java.security.SecureRandom;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -992,13 +990,7 @@ public class NNStorage extends Storage implements Closeable {
       throw e;
     }
     
-    int rand = 0;
-    try {
-      rand = SecureRandom.getInstance("SHA1PRNG").nextInt(Integer.MAX_VALUE);
-    } catch (NoSuchAlgorithmException e) {
-      LOG.warn("Could not use SecureRandom");
-      rand = DFSUtil.getRandom().nextInt(Integer.MAX_VALUE);
-    }
+    int rand = DFSUtil.getSecureRandom().nextInt(Integer.MAX_VALUE);
     String bpid = "BP-" + rand + "-"+ ip + "-" + System.currentTimeMillis();
     return bpid;
   }

+ 1 - 9
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DataNodeCluster.java

@@ -19,10 +19,7 @@ package org.apache.hadoop.hdfs;
 
 import java.io.IOException;
 import java.net.UnknownHostException;
-import java.security.NoSuchAlgorithmException;
-import java.security.SecureRandom;
 import java.util.Arrays;
-import java.util.Random;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -234,12 +231,7 @@ public class DataNodeCluster {
       System.out.println("Could not find ip address of \"default\" inteface.");
     }
     
-    int rand = 0;
-    try {
-      rand = SecureRandom.getInstance("SHA1PRNG").nextInt(Integer.MAX_VALUE);
-    } catch (NoSuchAlgorithmException e) {
-      rand = (new Random()).nextInt(Integer.MAX_VALUE);
-    }
+    int rand = DFSUtil.getSecureRandom().nextInt(Integer.MAX_VALUE);
     return "/Rack-" + rand + "-"+ ip  + "-" + 
                       System.currentTimeMillis();
   }

+ 2 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -331,6 +331,8 @@ Release 0.23.1 - Unreleased
     before the job started, so that it works properly with oozie throughout
     the job execution. (Robert Joseph Evans via vinodkv)
 
+    MAPREDUCE-3579. ConverterUtils shouldn't include a port in a path from a url without a port. (atm via harsh)
+
 Release 0.23.0 - 2011-11-01 
 
   INCOMPATIBLE CHANGES

+ 9 - 2
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java

@@ -58,8 +58,15 @@ public class ConverterUtils {
    */
   public static Path getPathFromYarnURL(URL url) throws URISyntaxException {
     String scheme = url.getScheme() == null ? "" : url.getScheme();
-    String authority = url.getHost() != null ? url.getHost() + ":" + url.getPort()
-        : "";
+    
+    String authority = "";
+    if (url.getHost() != null) {
+      authority = url.getHost();
+      if (url.getPort() > 0) {
+        authority += ":" + url.getPort();
+      }
+    }
+    
     return new Path(
         (new URI(scheme, authority, url.getFile(), null, null)).normalize());
   }

+ 38 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestConverterUtils.java

@@ -0,0 +1,38 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.yarn.util;
+
+import static org.junit.Assert.*;
+
+import java.net.URISyntaxException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.junit.Test;
+
+public class TestConverterUtils {
+  
+  @Test
+  public void testConvertUrlWithNoPort() throws URISyntaxException {
+    Path expectedPath = new Path("hdfs://foo.com");
+    URL url = ConverterUtils.getYarnUrlFromPath(expectedPath);
+    Path actualPath = ConverterUtils.getPathFromYarnURL(url);
+    assertEquals(expectedPath, actualPath);
+  }
+
+}
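
A hypothetical companion test (not part of this commit, method name and path assumed) sketching the other half of the MAPREDUCE-3579 behaviour: when the Path does carry a port, the rebuilt authority should preserve it through the round trip.

  @Test
  public void testConvertUrlWithPort() throws URISyntaxException {
    // Port is present, so getPathFromYarnURL should keep "foo.com:8020" in the authority.
    Path expectedPath = new Path("hdfs://foo.com:8020/tmp/bar");
    URL url = ConverterUtils.getYarnUrlFromPath(expectedPath);
    Path actualPath = ConverterUtils.getPathFromYarnURL(url);
    assertEquals(expectedPath, actualPath);
  }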