Procházet zdrojové kódy

Revert commit 1137690 (HADOOP-7206)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1139387 13f79535-47bb-0310-9956-ffa450edef68
Eli Collins před 14 roky
rodič
revize
75de23c0d3

+ 0 - 2
common/CHANGES.txt

@@ -47,8 +47,6 @@ Trunk (unreleased changes)
     HADOOP-7379. Add the ability to serialize and deserialize protocol buffers
     in ObjectWritable. (todd)
 
-    HADOOP-7206. Integrate Snappy compression. (T Jake Luciani via tomwhite)
-
   IMPROVEMENTS
 
     HADOOP-7042. Updates to test-patch.sh to include failed test names and

+ 0 - 4
common/ivy.xml

@@ -327,9 +327,5 @@
       name="protobuf-java"
       rev="${protobuf.version}"
       conf="common->default"/>
-    <dependency org="org.xerial.snappy"
-      name="snappy-java"
-      rev="${snappy-java.version}"
-      conf="common->default"/>
   </dependencies>
 </ivy-module>

+ 0 - 5
common/ivy/hadoop-common-template.xml

@@ -155,10 +155,5 @@
       <artifactId>protobuf-java</artifactId>
       <version>2.4.0a</version>
     </dependency>
-    <dependency>
-      <groupId>org.xerial.snappy</groupId>
-      <artifactId>java-snappy</artifactId>
-      <version>1.0.3-rc2</version>
-    </dependency>
   </dependencies>
 </project>

+ 0 - 1
common/ivy/libraries.properties

@@ -74,7 +74,6 @@ rats-lib.version=0.6
 servlet.version=4.0.6
 servlet-api-2.5.version=6.1.14
 servlet-api.version=2.5
-snappy-java.version=1.0.3-rc2
 slf4j-api.version=1.5.11
 slf4j-log4j12.version=1.5.11
 

+ 1 - 1
common/src/java/core-default.xml

@@ -174,7 +174,7 @@
 
 <property>
   <name>io.compression.codecs</name>
-  <value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.SnappyCodec,org.apache.hadoop.io.compress.DeflateCodec</value>
+  <value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.DeflateCodec</value>
   <description>A list of the compression codec classes that can be used 
                for compression/decompression.</description>
 </property>

+ 0 - 150
common/src/java/org/apache/hadoop/io/compress/SnappyCodec.java

@@ -1,150 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.io.compress;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.compress.snappy.SnappyCompressor;
-import org.apache.hadoop.io.compress.snappy.SnappyDecompressor;
-import org.xerial.snappy.Snappy;
-import org.xerial.snappy.SnappyError;
-
-public class SnappyCodec implements Configurable, CompressionCodec {
-  private static final Log logger = LogFactory.getLog(SnappyCodec.class
-      .getName());
-  private static boolean nativeSnappyLoaded = false;
-  private Configuration conf;
-
-  public static final String SNAPPY_BUFFER_SIZE_KEY = "io.compression.codec.snappy.buffersize";
-  public static final int DEFAULT_SNAPPY_BUFFER_SIZE = 256 * 1024;
-
-  public SnappyCodec() {
-
-  }
-
-  public SnappyCodec(Configuration conf) {
-    setConf(conf);
-  }
-
-  public void setConf(Configuration conf) {
-    this.conf = conf;
-  }
-
-  public Configuration getConf() {
-    return conf;
-  }
-
-  static {
-    try {
-      if (Snappy.getNativeLibraryVersion() != null) {
-        logger
-            .info("Successfully loaded & initialized native-snappy library [snappy-java rev "
-                + Snappy.getNativeLibraryVersion() + "]");
-
-        nativeSnappyLoaded = true;
-      } else {
-        logger.info("Failed to load native-snappy library");
-      }
-
-    } catch (SnappyError e) {
-      logger.error("Native Snappy load error: ", e);
-    }
-  }
-
-  public static boolean isNativeSnappyLoaded(Configuration conf) {
-    return nativeSnappyLoaded;
-  }
-
-  public CompressionOutputStream createOutputStream(OutputStream out)
-      throws IOException {
-    return createOutputStream(out, createCompressor());
-  }
-
-  public CompressionOutputStream createOutputStream(OutputStream out,
-      Compressor compressor) throws IOException {
-
-    if (!isNativeSnappyLoaded(conf)) {
-      throw new RuntimeException("native-snappy library not available");
-    }
-
-    int bufferSize = conf.getInt(SNAPPY_BUFFER_SIZE_KEY,
-        DEFAULT_SNAPPY_BUFFER_SIZE);
-
-    int compressionOverhead = Snappy.maxCompressedLength(bufferSize) - bufferSize; 
-
-    return new BlockCompressorStream(out, compressor, bufferSize,
-        compressionOverhead);
-  }
-
-  public Class<? extends Compressor> getCompressorType() {
-    if (!isNativeSnappyLoaded(conf)) {
-      throw new RuntimeException("native-snappy library not available");
-    }
-    return SnappyCompressor.class;
-  }
-
-  public Compressor createCompressor() {
-    if (!isNativeSnappyLoaded(conf)) {
-      throw new RuntimeException("native-snappy library not available");
-    }
-
-    return new SnappyCompressor(conf.getInt(SNAPPY_BUFFER_SIZE_KEY,
-        DEFAULT_SNAPPY_BUFFER_SIZE));
-  }
-
-  public CompressionInputStream createInputStream(InputStream in)
-      throws IOException {
-    return createInputStream(in, createDecompressor());
-  }
-
-  public CompressionInputStream createInputStream(InputStream in,
-      Decompressor decompressor) throws IOException {
-    if (!isNativeSnappyLoaded(conf)) {
-      throw new RuntimeException("native-snappy library not available");
-    }
-    return new BlockDecompressorStream(in, decompressor, conf.getInt(
-        SNAPPY_BUFFER_SIZE_KEY, DEFAULT_SNAPPY_BUFFER_SIZE));
-  }
-
-  public Class<? extends Decompressor> getDecompressorType() {
-    if (!isNativeSnappyLoaded(conf)) {
-      throw new RuntimeException("native-snappy library not available");
-    }
-    return SnappyDecompressor.class;
-  }
-
-  public Decompressor createDecompressor() {
-    if (!isNativeSnappyLoaded(conf)) {
-      throw new RuntimeException("native-snappy library not available");
-    }
-
-    return new SnappyDecompressor(conf.getInt(SNAPPY_BUFFER_SIZE_KEY,
-        DEFAULT_SNAPPY_BUFFER_SIZE));
-  }
-
-  public String getDefaultExtension() {
-    return ".snappy";
-  }
-}

+ 0 - 177
common/src/java/org/apache/hadoop/io/compress/snappy/SnappyCompressor.java

@@ -1,177 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.io.compress.snappy;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.compress.Compressor;
-import org.xerial.snappy.Snappy;
-import org.xerial.snappy.SnappyException;
-
-public class SnappyCompressor implements Compressor {
-  private static final Log logger = LogFactory.getLog(SnappyCompressor.class
-      .getName());
-
-  private boolean finish, finished;
-  private ByteBuffer outBuf;
-  private ByteBuffer compressedBuf;
-
-  private long bytesRead = 0L;
-  private long bytesWritten = 0L;
-
-  public SnappyCompressor(int bufferSize) {
-    outBuf = ByteBuffer.allocateDirect(bufferSize);
-    compressedBuf = ByteBuffer.allocateDirect(Snappy
-        .maxCompressedLength(bufferSize));
-
-    reset();
-  }
-
-  public synchronized void setInput(byte[] b, int off, int len) {
-    if (b == null) {
-      throw new NullPointerException();
-    }
-    if (off < 0 || len < 0 || off > b.length - len) {
-      throw new ArrayIndexOutOfBoundsException();
-    }
-    finished = false;
-
-    outBuf.put(b, off, len);
-
-    bytesRead += len;
-  }
-
-  public synchronized void setDictionary(byte[] b, int off, int len) {
-    // do nothing
-  }
-
-  public synchronized boolean needsInput() {
-    // needs input if compressed data was consumed
-    if (compressedBuf.position() > 0
-        && compressedBuf.limit() > compressedBuf.position())
-      return false;
-
-    return true;
-  }
-
-  public synchronized void finish() {
-    finish = true;
-  }
-
-  public synchronized boolean finished() {
-    // Check if all compressed data has been consumed
-    return (finish && finished);
-  }
-
-  public synchronized int compress(byte[] b, int off, int len)
-      throws IOException {
-
-    if (b == null) {
-      throw new NullPointerException();
-    }
-    if (off < 0 || len < 0 || off > b.length - len) {
-      throw new ArrayIndexOutOfBoundsException();
-    }
-
-    if (finished || outBuf.position() == 0) {
-      finished = true;
-      return 0;
-    }
-
-    // Only need todo this once
-    if (compressedBuf.position() == 0) {
-      try {
-        outBuf.limit(outBuf.position());
-        outBuf.rewind();
-
-        int lim = Snappy.compress(outBuf, compressedBuf);
-
-        compressedBuf.limit(lim);
-        compressedBuf.rewind();
-      } catch (SnappyException e) {
-        throw new IOException(e);
-      }
-    }
-
-    int n = (compressedBuf.limit() - compressedBuf.position()) > len ? len
-        : (compressedBuf.limit() - compressedBuf.position());
-
-    if (n == 0) {
-      finished = true;
-      return 0;
-    }
-
-    compressedBuf.get(b, off, n);
-
-    bytesWritten += n;
-
-    // Set 'finished' if snappy has consumed all user-data
-    if (compressedBuf.position() == compressedBuf.limit()) {
-      finished = true;
-
-      outBuf.limit(outBuf.capacity());
-      outBuf.rewind();
-
-      compressedBuf.limit(compressedBuf.capacity());
-      compressedBuf.rewind();
-
-    }
-
-    return n;
-  }
-
-  public synchronized void reset() {
-    finish = false;
-    finished = false;
-
-    outBuf.limit(outBuf.capacity());
-    outBuf.rewind();
-
-    compressedBuf.limit(compressedBuf.capacity());
-    compressedBuf.rewind();
-
-    bytesRead = bytesWritten = 0L;
-  }
-
-  public synchronized void reinit(Configuration conf) {
-    reset();
-  }
-
-  /**
-   * Return number of bytes given to this compressor since last reset.
-   */
-  public synchronized long getBytesRead() {
-    return bytesRead;
-  }
-
-  /**
-   * Return number of bytes consumed by callers of compress since last reset.
-   */
-  public synchronized long getBytesWritten() {
-    return bytesWritten;
-  }
-
-  public synchronized void end() {
-  }
-
-}

+ 0 - 171
common/src/java/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.java

@@ -1,171 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.io.compress.snappy;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.io.compress.Decompressor;
-import org.xerial.snappy.Snappy;
-import org.xerial.snappy.SnappyException;
-
-public class SnappyDecompressor implements Decompressor {
-
-  private static final Log logger = LogFactory.getLog(SnappyDecompressor.class
-      .getName());
-
-  private boolean finished;
-  private ByteBuffer outBuf;
-  private ByteBuffer uncompressedBuf;
-
-  private long bytesRead = 0L;
-  private long bytesWritten = 0L;
-
-  public SnappyDecompressor(int bufferSize) {
-    outBuf = ByteBuffer.allocateDirect(bufferSize);
-    uncompressedBuf = ByteBuffer.allocateDirect(bufferSize);
-
-    reset();
-  }
-
-  public synchronized void setInput(byte[] b, int off, int len) {
-    if (b == null) {
-      throw new NullPointerException();
-    }
-    if (off < 0 || len < 0 || off > b.length - len) {
-      throw new ArrayIndexOutOfBoundsException();
-    }
-
-    finished = false;
-
-    outBuf.put(b, off, len);
-
-    bytesRead += len;
-  }
-
-  public synchronized void setDictionary(byte[] b, int off, int len) {
-    // do nothing
-  }
-
-  public synchronized boolean needsInput() {
-    // needs input if the uncompressed data was consumed
-    if (uncompressedBuf.position() > 0
-        && uncompressedBuf.limit() > uncompressedBuf.position())
-      return false;
-
-    return true;
-  }
-
-  public synchronized boolean needsDictionary() {
-    return false;
-  }
-
-  public synchronized boolean finished() {
-    return finished;
-  }
-
-  public synchronized int decompress(byte[] b, int off, int len)
-      throws IOException {
-
-    if (b == null) {
-      throw new NullPointerException();
-    }
-    if (off < 0 || len < 0 || off > b.length - len) {
-      throw new ArrayIndexOutOfBoundsException();
-    }
-
-    // nothing to decompress
-    if ((outBuf.position() == 0 && uncompressedBuf.position() == 0) || finished) {
-      reset();
-      finished = true;
-
-      return 0;
-    }
-
-    // only needs to do this once per input
-    if (uncompressedBuf.position() == 0) {
-      try {
-        outBuf.limit(outBuf.position());
-        outBuf.rewind();
-
-        int neededLen = Snappy.uncompressedLength(outBuf);
-        outBuf.rewind();
-
-        if (neededLen > uncompressedBuf.capacity())
-          uncompressedBuf = ByteBuffer.allocateDirect(neededLen);
-
-        int lim = Snappy.uncompress(outBuf, uncompressedBuf);
-
-        uncompressedBuf.limit(lim);
-        uncompressedBuf.rewind();
-      } catch (SnappyException e) {
-        throw new IOException(e);
-      }
-    }
-
-    int n = (uncompressedBuf.limit() - uncompressedBuf.position()) > len ? len
-        : (uncompressedBuf.limit() - uncompressedBuf.position());
-
-    if (n == 0) {
-      reset();
-      finished = true;
-      return 0;
-    }
-
-    uncompressedBuf.get(b, off, n);
-
-    bytesWritten += n;
-
-    // Set 'finished' if snappy has consumed all user-data
-    if (uncompressedBuf.position() == uncompressedBuf.limit()) {
-      reset();
-      finished = true;
-    }
-
-    return n;
-  }
-
-  public synchronized int getRemaining() {
-    // Never use this function in BlockDecompressorStream.
-    return 0;
-  }
-
-  public synchronized void reset() {
-    finished = false;
-
-    uncompressedBuf.limit(uncompressedBuf.capacity());
-    uncompressedBuf.rewind();
-
-    outBuf.limit(outBuf.capacity());
-    outBuf.rewind();
-
-    bytesRead = bytesWritten = 0L;
-  }
-
-  public synchronized void end() {
-    // do nothing
-  }
-
-  protected void finalize() {
-    end();
-  }
-
-}

+ 0 - 6
common/src/test/core/org/apache/hadoop/io/compress/TestCodec.java

@@ -102,12 +102,6 @@ public class TestCodec {
     codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.DeflateCodec");
     codecTest(conf, seed, count, "org.apache.hadoop.io.compress.DeflateCodec");
   }
-  
-  @Test
-  public void testSnappyCodec() throws IOException {
-    codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.SnappyCodec");
-    codecTest(conf, seed, count, "org.apache.hadoop.io.compress.SnappyCodec");
-  }
 
   @Test
   public void testGzipCodecWithParam() throws IOException {