فهرست منبع

commit 72253b609a0886a82f9e7902263e91724d099fd2
Author: Eli Collins <eli@apache.org>
Date: Mon Nov 22 21:54:58 2010 +0000

HADOOP-6683. svn merge -c 1037901 from trunk


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.22@1037903 13f79535-47bb-0310-9956-ffa450edef68


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/yahoo-merge@1079140 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley 14 سال پیش
والد
کامیت
e8e2586fc7
2فایلهای تغییر یافته به همراه33 افزوده شده و 15 حذف شده
  1. 3 0
      CHANGES.txt
  2. 30 15
      src/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java

+ 3 - 0
CHANGES.txt

@@ -195,6 +195,9 @@ Release 0.22.0 - Unreleased
     HADOOP-6884. Add LOG.isDebugEnabled() guard for each LOG.debug(..).
     (Erik Steffl via szetszwo)
 
+    HADOOP-6683. ZlibCompressor does not fully utilize the buffer.
+    (Kang Xiao via eli)
+
   BUG FIXES
 
     HADOOP-6638. try to relogin in a case of failed RPC connection (expired 

+ 30 - 15
src/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java

@@ -53,6 +53,7 @@ public class ZlibCompressor implements Compressor {
   private int userBufOff = 0, userBufLen = 0;
   private Buffer uncompressedDirectBuf = null;
   private int uncompressedDirectBufOff = 0, uncompressedDirectBufLen = 0;
+  private boolean keepUncompressedBuf = false;
   private Buffer compressedDirectBuf = null;
   private boolean finish, finished;
 
@@ -269,6 +270,7 @@ public class ZlibCompressor implements Compressor {
     this.userBuf = b;
     this.userBufOff = off;
     this.userBufLen = len;
+    uncompressedDirectBufOff = 0;
     setInputFromSavedData();
     
     // Reinitialize zlib's output direct buffer 
@@ -276,21 +278,13 @@ public class ZlibCompressor implements Compressor {
     compressedDirectBuf.position(directBufferSize);
   }
   
+  //copy enough data from userBuf to uncompressedDirectBuf
   synchronized void setInputFromSavedData() {
-    uncompressedDirectBufOff = 0;
-    uncompressedDirectBufLen = userBufLen;
-    if (uncompressedDirectBufLen > directBufferSize) {
-      uncompressedDirectBufLen = directBufferSize;
-    }
-
-    // Reinitialize zlib's input direct buffer
-    uncompressedDirectBuf.rewind();
-    ((ByteBuffer)uncompressedDirectBuf).put(userBuf, userBufOff,  
-                                            uncompressedDirectBufLen);
-
-    // Note how much data is being fed to zlib
-    userBufOff += uncompressedDirectBufLen;
-    userBufLen -= uncompressedDirectBufLen;
+    int len = Math.min(userBufLen, uncompressedDirectBuf.remaining());
+    ((ByteBuffer)uncompressedDirectBuf).put(userBuf, userBufOff, len);
+    userBufLen -= len;
+    userBufOff += len;
+    uncompressedDirectBufLen = uncompressedDirectBuf.position();
   }
 
   public synchronized void setDictionary(byte[] b, int off, int len) {
@@ -310,12 +304,21 @@ public class ZlibCompressor implements Compressor {
     }
 
     // Check if zlib has consumed all input
-    if (uncompressedDirectBufLen <= 0) {
+    // compress should be invoked if keepUncompressedBuf true
+    if (keepUncompressedBuf && uncompressedDirectBufLen > 0)
+      return false;
+    
+    if (uncompressedDirectBuf.remaining() > 0) {
       // Check if we have consumed all user-input
       if (userBufLen <= 0) {
         return true;
       } else {
+        // copy enough data from userBuf to uncompressedDirectBuf
         setInputFromSavedData();
+        if (uncompressedDirectBuf.remaining() > 0) // uncompressedDirectBuf is not full
+          return true;
+        else 
+          return false;
       }
     }
     
@@ -359,6 +362,17 @@ public class ZlibCompressor implements Compressor {
     n = deflateBytesDirect();
     compressedDirectBuf.limit(n);
     
+    // Check if zlib consumed all input buffer
+    // set keepUncompressedBuf properly
+    if (uncompressedDirectBufLen <= 0) { // zlib consumed all input buffer
+      keepUncompressedBuf = false;
+      uncompressedDirectBuf.clear();
+      uncompressedDirectBufOff = 0;
+      uncompressedDirectBufLen = 0;
+    } else { // zlib did not consume all input buffer
+      keepUncompressedBuf = true;
+    }
+    
     // Get atmost 'len' bytes
     n = Math.min(n, len);
     ((ByteBuffer)compressedDirectBuf).get(b, off, n);
@@ -393,6 +407,7 @@ public class ZlibCompressor implements Compressor {
     finished = false;
     uncompressedDirectBuf.rewind();
     uncompressedDirectBufOff = uncompressedDirectBufLen = 0;
+    keepUncompressedBuf = false;
     compressedDirectBuf.limit(directBufferSize);
     compressedDirectBuf.position(directBufferSize);
     userBufOff = userBufLen = 0;