
Revert HADOOP-10047, wrong patch.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1543538 13f79535-47bb-0310-9956-ffa450edef68
Arun Murthy 11 years ago
parent
commit
a6acf72aaa

+ 0 - 3
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -387,9 +387,6 @@ Release 2.3.0 - UNRELEASED
 
     HADOOP-9748. Reduce blocking on UGI.ensureInitialized (daryn)
 
-    HADOOP-10047. Add direct-buffer based APIs for compression. (Gopal V via
-    acmurthy)
-
   BUG FIXES
 
     HADOOP-9964. Fix deadlocks in TestHttpServer by synchronize

+ 0 - 69
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DirectCompressor.java

@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.io.compress;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public interface DirectCompressor extends Compressor {
-  /**
-   * Example usage
-   * <pre> {@code
-   * private void compress(DirectCompressor comp, ByteBufferProducer in, ByteBufferConsumer out) throws IOException {
-   *    ByteBuffer outBB = ByteBuffer.allocateDirect(64*1024);
-   *    outBB.clear();
-   *    // in.get() returns a buffer with inBB.remaining() > 0, or null at EOF
-   *    // if you fill inBB with put(), remember to flip() it before use
-   *    ByteBuffer inBB = in.get();       
-   *    while(!comp.finished()) {
-   *      comp.compress(outBB, inBB);
-   *      if(outBB.remaining() == 0) {
-   *        // flush the buffer only when it is full
-   *        outBB.flip();          
-   *        // must consume the buffer fully, because it is reused
-   *        out.put(outBB);
-   *        outBB.clear();
-   *      }
-   *      if(inBB != null && inBB.remaining() == 0) {
-   *        inBB = in.get();
-   *        if(inBB == null) {
-   *          // EOF
-   *          comp.finish();
-   *        }
-   *      }
-   *    }
-   *    
-   *    if(outBB.position() > 0) {
-   *      outBB.flip();
-   *      out.put(outBB);
-   *      outBB.clear();
-   *    }
-   *  }
-   * } </pre>
-   * @param dst Destination {@link ByteBuffer} to store the results into; requires dst.remaining() > 0
-   * @param src Source {@link ByteBuffer} to read from; may be null, otherwise requires src.remaining() > 0
-   * @return number of bytes stored into dst
-   * @throws IOException if compression fails
-   */
-  public int compress(ByteBuffer dst, ByteBuffer src) throws IOException;
-}
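
The javadoc example above drives the compressor through two helper types, ByteBufferProducer and ByteBufferConsumer, which this patch never defines. Below is a minimal sketch of what they could look like, assuming the contract implied by the example's comments (get() hands out a readable buffer or null at EOF; put() must fully drain a buffer that will be reused); the names and signatures are assumptions, not Hadoop APIs.

// Hypothetical helper interfaces implied by the javadoc example above; they
// are not part of this patch or of Hadoop, and the names are assumptions.
import java.io.IOException;
import java.nio.ByteBuffer;

interface ByteBufferProducer {
  /** Returns a buffer with remaining() > 0, or null at end of input. */
  ByteBuffer get() throws IOException;
}

interface ByteBufferConsumer {
  /** Must consume the buffer fully; the caller clears and reuses it. */
  void put(ByteBuffer buf) throws IOException;
}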

+ 0 - 71
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DirectDecompressor.java

@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.io.compress;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
-
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public interface DirectDecompressor extends Decompressor {
-  /**
-   * Example usage
-   * 
-   * <pre>{@code
-   * private void decompress(DirectDecompressor decomp, ByteBufferProducer in, ByteBufferConsumer out) throws IOException {
-   *    ByteBuffer outBB = ByteBuffer.allocateDirect(64*1024);
-   *    outBB.clear();
-   *    // in.get() returns a buffer with inBB.remaining() > 0, or null at EOF
-   *    // if you fill inBB with put(), remember to flip() it before use
-   *    ByteBuffer inBB = in.get();
-   *    if(inBB == null) {
-   *      // no input at all; nothing to decompress
-   *    }
-   *    while(!decomp.finished()) {
-   *      decomp.decompress(outBB, inBB);
-   *      if(outBB.remaining() == 0) {
-   *        // flush when the buffer is full
-   *        outBB.flip();
-   *        // must consume the buffer fully, because it is reused
-   *        out.put(outBB);
-   *        outBB.clear();
-   *      }
-   *      if(inBB != null && inBB.remaining() == 0) {
-   *        // inBB = null for EOF
-   *        inBB = in.get();
-   *      }
-   *    }
-   *    
-   *    if(outBB.position() > 0) {
-   *      outBB.flip();
-   *      out.put(outBB);
-   *      outBB.clear();
-   *    }
-   *  }
-   * }</pre>
-   * @param dst Destination {@link ByteBuffer} to store the results into; requires dst.remaining() > 0
-   * @param src Source {@link ByteBuffer} to read from; may be null, otherwise requires src.remaining() > 0
-   * @return number of bytes stored into dst (dst.position() advances by this amount)
-   * @throws IOException if decompression fails
-   */
-  public int decompress(ByteBuffer dst, ByteBuffer src) throws IOException;
-}
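
To make the javadoc loops concrete, here is a hedged, in-memory sketch of the two helpers wired to byte arrays. It assumes the hypothetical ByteBufferProducer/ByteBufferConsumer interfaces sketched after DirectCompressor above; none of it is part of the patch.

// In-memory producer/consumer pair for driving the javadoc loops; the class
// names are assumptions, not Hadoop types.
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

class ByteArrayProducer implements ByteBufferProducer {
  private final ByteBuffer src;
  private final ByteBuffer chunk = ByteBuffer.allocateDirect(4 * 1024);

  ByteArrayProducer(byte[] data) {
    this.src = ByteBuffer.wrap(data);
  }

  @Override
  public ByteBuffer get() {
    if (!src.hasRemaining()) {
      return null;                      // null signals EOF, per the contract
    }
    chunk.clear();
    int n = Math.min(chunk.remaining(), src.remaining());
    ByteBuffer slice = src.slice();     // bounded copy, as in the put() helper
    slice.limit(n);
    chunk.put(slice);
    src.position(src.position() + n);
    chunk.flip();                       // hand out a readable buffer
    return chunk;                       // reuse is safe: callers drain it fully
  }
}

class ByteArrayConsumer implements ByteBufferConsumer {
  final ByteArrayOutputStream out = new ByteArrayOutputStream();

  @Override
  public void put(ByteBuffer buf) {
    while (buf.hasRemaining()) {        // drain fully; the buffer is reused
      out.write(buf.get());
    }
  }
}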

+ 1 - 107
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java

@@ -24,7 +24,6 @@ import java.nio.ByteBuffer;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.compress.Compressor;
-import org.apache.hadoop.io.compress.DirectCompressor;
 import org.apache.hadoop.util.NativeCodeLoader;
 
 import org.apache.commons.logging.Log;
@@ -36,7 +35,7 @@ import org.apache.commons.logging.LogFactory;
  * http://www.zlib.net/
  * 
  */
-public class ZlibCompressor implements Compressor,DirectCompressor {
+public class ZlibCompressor implements Compressor {
 
   private static final Log LOG = LogFactory.getLog(ZlibCompressor.class);
 
@@ -421,7 +420,6 @@ public class ZlibCompressor implements Compressor,DirectCompressor {
     compressedDirectBuf.limit(directBufferSize);
     compressedDirectBuf.position(directBufferSize);
     userBufOff = userBufLen = 0;
-    userBuf = null;
   }
   
   @Override
@@ -437,110 +435,6 @@ public class ZlibCompressor implements Compressor,DirectCompressor {
       throw new NullPointerException();
   }
   
-  private int put(ByteBuffer dst, ByteBuffer src) {
-    // copy as much of src[position:limit] as fits into dst[position:limit]
-    int l1 = src.remaining();
-    int l2 = dst.remaining();
-    int pos1 = src.position();
-    int pos2 = dst.position();
-    int len = Math.min(l1, l2);
-
-    if (len == 0) {
-      return 0;
-    }
-
-    ByteBuffer slice = src.slice();
-    slice.limit(len);
-    dst.put(slice);
-    src.position(pos1 + len);
-    return len;
-  }
-
-  public int compress(ByteBuffer dst, ByteBuffer src) throws IOException {
-    assert dst.remaining() > 0 : "dst.remaining() == 0";
-    int n = 0;
-    
-    /* fast path for clean state and direct buffers */
-    /* TODO: reset should free userBuf? */
-    if((src != null && src.isDirect()) && dst.isDirect() && userBuf == null) {
-      /*
-       * TODO: fix these assumptions in deflateBytesDirect(), eventually by
-       * allowing it to read position()/limit() directly
-       */
-      boolean cleanDst = (dst.position() == 0 && dst.remaining() == dst.capacity() && dst.capacity() >= directBufferSize);
-      boolean cleanState = (keepUncompressedBuf == false && uncompressedDirectBufLen == 0 && compressedDirectBuf.remaining() == 0);
-      /* use the buffers directly */
-      if(cleanDst && cleanState) {
-        Buffer originalCompressed = compressedDirectBuf;
-        Buffer originalUncompressed = uncompressedDirectBuf;
-        int originalBufferSize = directBufferSize;
-        uncompressedDirectBuf = src;
-        uncompressedDirectBufOff = src.position();
-        uncompressedDirectBufLen = src.remaining();
-        compressedDirectBuf = dst;
-        directBufferSize = dst.remaining();
-        // Compress data
-        n = deflateBytesDirect();
-        // we move dst.position() forward, not limit() 
-        // unlike the local buffer case, which moves it when we put() into the dst
-        dst.position(n);
-        if(uncompressedDirectBufLen > 0) {
-          src.position(uncompressedDirectBufOff);
-        } else {
-          src.position(src.limit());
-        }
-        compressedDirectBuf = originalCompressed;
-        uncompressedDirectBuf = originalUncompressed;
-        uncompressedDirectBufOff = 0;
-        uncompressedDirectBufLen = 0;
-        directBufferSize = originalBufferSize;
-        return n;
-      }
-    }
-    
-    // Check if there is compressed data
-    if (compressedDirectBuf.remaining() > 0) {
-      n = put(dst, (ByteBuffer) compressedDirectBuf);
-    }
-
-    if (dst.remaining() == 0) {
-      return n;
-    } else {
-      needsInput();
-
-      // if we have drained userBuf, read from src (ideally, do not mix buffer
-      // modes, but sometimes you can)
-      if (userBufLen == 0 && src != null && src.remaining() > 0) {
-        put((ByteBuffer) uncompressedDirectBuf, src);
-        uncompressedDirectBufLen = uncompressedDirectBuf.position();
-      }
-
-      // Re-initialize the zlib's output direct buffer
-      compressedDirectBuf.rewind();
-      compressedDirectBuf.limit(directBufferSize);
-
-      // Compress data
-      int more = deflateBytesDirect();
-
-      compressedDirectBuf.limit(more);
-
-      // Check if zlib consumed all input buffer
-      // set keepUncompressedBuf properly
-      if (uncompressedDirectBufLen <= 0) { // zlib consumed all input buffer
-        keepUncompressedBuf = false;
-        uncompressedDirectBuf.clear();
-        uncompressedDirectBufOff = 0;
-        uncompressedDirectBufLen = 0;
-      } else { // zlib did not consume all input buffer
-        keepUncompressedBuf = true;
-      }
-
-      // fill the dst buffer from compressedDirectBuf
-      int fill = put(dst, ((ByteBuffer) compressedDirectBuf));
-      return n + fill;
-    }
-  }
-  
   private native static void initIDs();
   private native static long init(int level, int strategy, int windowBits);
   private native static void setDictionary(long strm, byte[] b, int off,
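
The removed put(dst, src) helper above performs a bounded copy: it moves min(src.remaining(), dst.remaining()) bytes via a slice so that dst can never overflow. A standalone, JDK-only sketch of the same trick and its effect on buffer positions (the demo class is hypothetical):

// Standalone illustration of the slice-based bounded copy used by the
// removed put(dst, src) helper; plain JDK, no Hadoop types involved.
import java.nio.ByteBuffer;

public class BoundedCopyDemo {
  static int put(ByteBuffer dst, ByteBuffer src) {
    int len = Math.min(src.remaining(), dst.remaining());
    if (len == 0) {
      return 0;
    }
    ByteBuffer slice = src.slice();       // shares content, independent position
    slice.limit(len);                     // cap the copy so dst cannot overflow
    dst.put(slice);                       // advances dst.position() by len
    src.position(src.position() + len);   // manually advance the source
    return len;
  }

  public static void main(String[] args) {
    ByteBuffer src = ByteBuffer.wrap("hello world".getBytes());
    ByteBuffer dst = ByteBuffer.allocate(5);  // smaller than src.remaining()
    int copied = put(dst, src);
    System.out.println(copied);               // 5
    System.out.println(src.remaining());      // 6 bytes left in src
  }
}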

+ 1 - 105
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java

@@ -23,7 +23,6 @@ import java.nio.Buffer;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.io.compress.Decompressor;
-import org.apache.hadoop.io.compress.DirectDecompressor;
 import org.apache.hadoop.util.NativeCodeLoader;
 
 /**
@@ -32,7 +31,7 @@ import org.apache.hadoop.util.NativeCodeLoader;
  * http://www.zlib.net/
  * 
  */
-public class ZlibDecompressor implements Decompressor,DirectDecompressor {
+public class ZlibDecompressor implements Decompressor {
   private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64*1024;
   
   // HACK - Use this as a global lock in the JNI layer
@@ -281,7 +280,6 @@ public class ZlibDecompressor implements Decompressor,DirectDecompressor {
     uncompressedDirectBuf.limit(directBufferSize);
     uncompressedDirectBuf.position(directBufferSize);
     userBufOff = userBufLen = 0;
-    userBuf = null;
   }
 
   @Override
@@ -301,108 +299,6 @@ public class ZlibDecompressor implements Decompressor,DirectDecompressor {
     if (stream == 0)
       throw new NullPointerException();
   }
-    
-  private int put(ByteBuffer dst, ByteBuffer src) {
-    // copy as much of src[position:limit] as fits into dst[position:limit],
-    // bounded by the min() of both remaining()
-    int l1 = src.remaining();
-    int l2 = dst.remaining();
-    int pos1 = src.position();
-    int pos2 = dst.position();
-    int len = Math.min(l1, l2);
-
-    if (len == 0) {
-      return 0;
-    }
-
-    ByteBuffer slice = src.slice();
-    slice.limit(len);
-    dst.put(slice);
-    src.position(pos1 + len);
-    return len;
-  }
-
-  public int decompress(ByteBuffer dst, ByteBuffer src) throws IOException {
-    assert dst.remaining() > 0 : "dst.remaining() == 0";
-    int n = 0;
-    
-    /* fast path for clean state and direct buffers */
-    if((src != null && src.isDirect()) && dst.isDirect() && userBuf == null) {
-      /*
-       * TODO: fix these assumptions in inflateBytesDirect(), eventually by
-       * allowing it to read position()/limit() directly
-       */
-      boolean cleanDst = (dst.position() == 0 && dst.remaining() == dst.capacity() && dst.remaining() >= directBufferSize);
-      boolean cleanState = (compressedDirectBufLen == 0 && uncompressedDirectBuf.remaining() == 0);
-      /* use the buffers directly */
-      if(cleanDst && cleanState) {
-        Buffer originalCompressed = compressedDirectBuf;
-        Buffer originalUncompressed = uncompressedDirectBuf;
-        int originalBufferSize = directBufferSize;
-        compressedDirectBuf = src;
-        compressedDirectBufOff = src.position();
-        compressedDirectBufLen = src.remaining();
-        uncompressedDirectBuf = dst;
-        directBufferSize = dst.remaining();
-        // Decompress data
-        n = inflateBytesDirect();
-        dst.position(n);
-        if(compressedDirectBufLen > 0) {
-          src.position(compressedDirectBufOff);
-        } else {
-          src.position(src.limit());
-        }
-        compressedDirectBuf = originalCompressed;
-        uncompressedDirectBuf = originalUncompressed;        
-        compressedDirectBufOff = 0;
-        compressedDirectBufLen = 0;
-        directBufferSize = originalBufferSize;
-        return n;
-      }
-    }
-    
-    // Check if there is leftover uncompressed data from a previous call
-    if (uncompressedDirectBuf.remaining() > 0) {
-      n = put(dst, (ByteBuffer) uncompressedDirectBuf);
-    }
-
-    if (dst.remaining() == 0) {
-      return n;
-    } else {
-      if (needsInput()) {
-        // this does not update buffers if we have no userBuf
-        if (userBufLen <= 0) {
-          compressedDirectBufOff = 0;
-          compressedDirectBufLen = 0;
-          compressedDirectBuf.rewind().limit(directBufferSize);
-        }
-        if (src != null) {
-          assert src.remaining() > 0 : "src.remaining() == 0";
-        }
-      }
-
-      // if we have drained userBuf, read from src (ideally, do not mix buffer
-      // modes, but sometimes you can)
-      if (userBufLen == 0 && src != null && src.remaining() > 0) {
-        compressedDirectBufLen += put(((ByteBuffer) compressedDirectBuf), src);
-      }
-      
-      // Re-initialize the zlib's output direct buffer
-      uncompressedDirectBuf.rewind();
-      uncompressedDirectBuf.limit(directBufferSize);
-
-      // Decompress data
-      int more = inflateBytesDirect();
-
-      uncompressedDirectBuf.limit(more);
-
-      // Copy at most dst.remaining() bytes into dst
-      int fill = put(dst, ((ByteBuffer) uncompressedDirectBuf));
-      return n + fill;
-    }
-  }
-
-  
   
   private native static void initIDs();
   private native static long init(int windowBits);
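
Both removed fast paths swap the caller's direct buffers in place of the internal ones, but only when the destination is "clean". A small sketch restating that precondition as a standalone predicate; the helper name is an assumption, and the checks mirror the cleanDst test in the code above:

// Sketch of the "clean destination" precondition guarding the removed fast
// path; the class and method names are assumptions.
import java.nio.ByteBuffer;

final class DirectFastPath {
  private DirectFastPath() {}

  /** True when dst can be handed to zlib directly, bypassing internal buffers. */
  static boolean isCleanDst(ByteBuffer dst, int directBufferSize) {
    return dst.isDirect()                        // JNI needs a direct buffer
        && dst.position() == 0                   // output starts at the beginning
        && dst.remaining() == dst.capacity()     // nothing already written
        && dst.remaining() >= directBufferSize;  // large enough to swap in
  }
}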

+ 0 - 152
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zlib/TestZlibCompressorDecompressor.java

@@ -19,13 +19,8 @@ package org.apache.hadoop.io.compress.zlib;
 
 import static org.junit.Assert.*;
 import static org.junit.Assume.*;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.Console;
 import java.io.IOException;
 import java.io.InputStream;
-import java.nio.ByteBuffer;
 import java.util.Random;
 
 import org.apache.hadoop.conf.Configuration;
@@ -38,12 +33,8 @@ import org.apache.hadoop.io.compress.DecompressorStream;
 import org.apache.hadoop.io.compress.CompressDecompressTester.CompressionTestStrategy;
 import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel;
 import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionStrategy;
-import org.apache.log4j.ConsoleAppender;
 import org.junit.Before;
 import org.junit.Test;
-
-import sun.util.logging.resources.logging;
-
 import com.google.common.collect.ImmutableSet;
 
 public class TestZlibCompressorDecompressor {
@@ -159,149 +150,6 @@ public class TestZlibCompressorDecompressor {
     }
   }
   
-  private void compressDecompressLoop(int rawDataSize, int inSize, int outSize)
-      throws IOException {
-    byte[] rawData = null;
-    rawData = generate(rawDataSize);
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    ByteBuffer inBuf = ByteBuffer.allocateDirect(inSize);
-    ByteBuffer outBuf = ByteBuffer.allocateDirect(outSize);
-    ZlibCompressor compressor = new ZlibCompressor();
-    ZlibDecompressor decompressor = new ZlibDecompressor();
-    outBuf.clear();
-    /* compression loop */
-    int off = 0;
-    int len = rawDataSize;
-    int min = Math.min(inBuf.remaining(), len);
-    if (min > 0) {
-      inBuf.put(rawData, off, min);
-    }
-    inBuf.flip();
-    len -= min;
-    off += min;
-    while (!compressor.finished()) {
-      compressor.compress(outBuf, inBuf);
-      if (outBuf.remaining() == 0) {
-        // flush when the buffer is full
-        outBuf.flip();
-        while (outBuf.remaining() > 0) {
-          baos.write(outBuf.get());
-        }
-        outBuf.clear();
-      }
-      if (inBuf != null && inBuf.remaining() == 0) {
-        inBuf.clear();
-        if (len > 0) {
-          min = Math.min(inBuf.remaining(), len);
-          inBuf.put(rawData, off, min);
-          inBuf.flip();
-          len -= min;
-          off += min;
-        } else {
-          inBuf = null;
-          compressor.finish();
-        }
-      }
-    }
-
-    outBuf.flip();
-    if (outBuf.remaining() > 0) {
-      while (outBuf.remaining() > 0) {
-        baos.write(outBuf.get());
-      }
-      outBuf.clear();
-    }
-
-    compressor.end();
-
-    byte[] compressed = baos.toByteArray();
-    ByteBuffer expected = ByteBuffer.wrap(rawData);
-    outBuf.clear();
-    inBuf = ByteBuffer.allocateDirect(inSize);
-    inBuf.clear();
-
-    // zlib output always includes a header, so compressed.length > 0
-    if (compressed.length != 0) {
-      off = 0;
-      len = compressed.length;
-      min = Math.min(inBuf.remaining(), len);
-      inBuf.put(compressed, off, min);
-      inBuf.flip();
-      len -= min;
-      off += min;
-      while (!decompressor.finished()) {
-        decompressor.decompress(outBuf, inBuf);
-        if (outBuf.remaining() == 0) {
-          outBuf.flip();
-          while (outBuf.remaining() > 0) {
-            assertEquals(expected.get(), outBuf.get());
-          }
-          outBuf.clear();
-        }
-
-        if (inBuf != null && inBuf.remaining() == 0) {
-          inBuf.clear();
-          if (len > 0) {
-            min = Math.min(inBuf.remaining(), len);
-            inBuf.put(compressed, off, min);
-            inBuf.flip();
-            len -= min;
-            off += min;
-          }
-        }
-      }
-    }
-
-    outBuf.flip();
-    if (outBuf.remaining() > 0) {
-      while (outBuf.remaining() > 0) {
-        assertEquals(expected.get(), outBuf.get());
-      }
-      outBuf.clear();
-    }
-
-    assertEquals(0, expected.remaining());
-  }
-  
-  @Test
-  public void testZlibDirectCompressDecompress() {
-    int[] size = { 4, 16, 4 * 1024, 64 * 1024, 128 * 1024, 256 * 1024,
-        1024 * 1024 };
-    try {
-      // inputs of 0-2 bytes compress to output larger than the input
-      compressDecompressLoop(0, 4096, 4096);
-      compressDecompressLoop(0, 1, 1);
-      compressDecompressLoop(1, 1, 2);
-      compressDecompressLoop(1, 2, 1);
-      compressDecompressLoop(2, 3, 2);
-
-      for (int i = 0; i < size.length; i++) {
-        compressDecompressLoop(size[i], 4096, 4096);
-        compressDecompressLoop(size[i], 1, 1);
-        compressDecompressLoop(size[i], 1, 2);
-        compressDecompressLoop(size[i], 2, 1);
-        compressDecompressLoop(size[i], 3, 2);
-        compressDecompressLoop(size[i], size[i], 4096);
-        compressDecompressLoop(size[i], size[i] - 1, 4096);
-        compressDecompressLoop(size[i], size[i] + 1, 4096);
-        compressDecompressLoop(size[i], 4096, size[i]);
-        compressDecompressLoop(size[i], 4096, size[i] - 1);
-        compressDecompressLoop(size[i], 4096, size[i] + 1);
-        compressDecompressLoop(size[i], size[i] - 1, size[i] - 1);
-
-        compressDecompressLoop(size[i], size[i] / 2, 4096);
-        compressDecompressLoop(size[i], size[i] / 2 - 1, 4096);
-        compressDecompressLoop(size[i], size[i] / 2 + 1, 4096);
-        compressDecompressLoop(size[i], 4096, size[i] / 2);
-        compressDecompressLoop(size[i], 4096, size[i] / 2 - 1);
-        compressDecompressLoop(size[i], 4096, size[i] / 2 + 1);
-        compressDecompressLoop(size[i], size[i] / 2 - 1, size[i] / 2 - 1);
-      }
-    } catch (IOException ex) {
-      fail("testZlibDirectCompressDecompress ex !!!" + ex);
-    }
-  }
-  
   @Test
   public void testZlibCompressorDecompressorSetDictionary() {
     Configuration conf = new Configuration();