Browse Source

HADOOP-2402. Fix BlockCompressorStream to ensure it buffers data before sending it down to the compressor so that each write call doesn't compress. Contributed by Chris Douglas.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@615077 13f79535-47bb-0310-9956-ffa450edef68
Arun Murthy committed 17 years ago
parent
commit
0dde9714af

+ 4 - 0
CHANGES.txt

@@ -546,6 +546,10 @@ Trunk (unreleased changes)
     HADOOP-2687. Modify a few log message generated by dfs client to be
     logged only at INFO level. (stack via dhruba)
 
+    HADOOP-2402. Fix BlockCompressorStream to ensure it buffers data before
+    sending it down to the compressor so that each write call doesn't
+    compress. (Chris Douglas via acmurthy) 
+
 Release 0.15.3 - 2008-01-18
 
   BUG FIXES

+ 51 - 8
src/java/org/apache/hadoop/io/compress/BlockCompressorStream.java

@@ -25,7 +25,11 @@ import java.io.OutputStream;
  * A {@link org.apache.hadoop.io.compress.CompressorStream} which works
  * with 'block-based' based compression algorithms, as opposed to 
  * 'stream-based' compression algorithms.
- *  
+ *
+ * It should be noted that this wrapper does not guarantee that blocks will
+ * be sized for the compressor. If the
+ * {@link org.apache.hadoop.io.compress.Compressor} requires buffering to
+ * effect meaningful compression, it is responsible for it.
  */
 class BlockCompressorStream extends CompressorStream {
 
@@ -61,6 +65,14 @@ class BlockCompressorStream extends CompressorStream {
     this(out, compressor, 512, 18);
   }
 
+  /**
+   * Write the data provided to the compression codec, compressing no more
+   * than the buffer size less the compression overhead as specified during
+   * construction for each block.
+   *
+   * Each block contains the uncompressed length for the block, followed by
+   * one or more length-prefixed blocks of compressed data.
+   */
   public void write(byte[] b, int off, int len) throws IOException {
     // Sanity checks
     if (compressor.finished()) {
@@ -75,22 +87,53 @@ class BlockCompressorStream extends CompressorStream {
       return;
     }
 
-    // Write out the length of the original data
-    rawWriteInt(len);
-    
-    // Compress data
-    if (!compressor.finished()) {
+    long limlen = compressor.getBytesRead();
+    if (len + limlen > MAX_INPUT_SIZE && limlen > 0) {
+      // Adding this segment would exceed the maximum size.
+      // Flush data if we have it.
+      finish();
+      compressor.reset();
+    }
+
+    if (len > MAX_INPUT_SIZE) {
+      // The data we're given exceeds the maximum size. Any data
+      // we had have been flushed, so we write out this chunk in segments
+      // not exceeding the maximum size until it is exhausted.
+      rawWriteInt(len);
       do {
-        // Compress atmost 'maxInputSize' chunks at a time
         int bufLen = Math.min(len, MAX_INPUT_SIZE);
         
         compressor.setInput(b, off, bufLen);
-        while (!compressor.needsInput()) {
+        compressor.finish();
+        while (!compressor.finished()) {
           compress();
         }
+        compressor.reset();
         off += bufLen;
         len -= bufLen;
       } while (len > 0);
+      return;
+    }
+
+    // Give data to the compressor
+    compressor.setInput(b, off, len);
+    if (!compressor.needsInput()) {
+      // compressor buffer size might be smaller than the maximum
+      // size, so we permit it to flush if required.
+      rawWriteInt((int)compressor.getBytesRead());
+      do {
+        compress();
+      } while (!compressor.needsInput());
+    }
+  }
+
+  public void finish() throws IOException {
+    if (!compressor.finished()) {
+      rawWriteInt((int)compressor.getBytesRead());
+      compressor.finish();
+      while (!compressor.finished()) {
+        compress();
+      }
     }
   }
 

+ 10 - 1
src/java/org/apache/hadoop/io/compress/Compressor.java

@@ -56,7 +56,16 @@ public interface Compressor {
    * @param len Length
    */
   public void setDictionary(byte[] b, int off, int len);
-  
+
+  /**
+   * Return number of uncompressed bytes input so far.
+   */
+  public long getBytesRead();
+
+  /**
+   * Return number of compressed bytes output so far.
+   */
+  public long getBytesWritten();
 
   /**
    * When called, indicates that compression should end

+ 4 - 6
src/java/org/apache/hadoop/io/compress/CompressorStream.java

@@ -65,12 +65,10 @@ class CompressorStream extends CompressionOutputStream {
     } else if (len == 0) {
       return;
     }
-    
-    if (!compressor.finished()) {
-      compressor.setInput(b, off, len);
-      while (!compressor.needsInput()) {
-        compress();
-      }
+
+    compressor.setInput(b, off, len);
+    while (!compressor.needsInput()) {
+      compress();
     }
   }
 

+ 30 - 83
src/java/org/apache/hadoop/io/compress/LzoCodec.java

@@ -76,9 +76,14 @@ public class LzoCodec implements Configurable, CompressionCodec {
   public static boolean isNativeLzoLoaded(Configuration conf) {
     return nativeLzoLoaded && conf.getBoolean("hadoop.native.lib", true);
   }
-  
+
   public CompressionOutputStream createOutputStream(OutputStream out) 
     throws IOException {
+    return createOutputStream(out, createCompressor());
+  }
+  
+  public CompressionOutputStream createOutputStream(OutputStream out, 
+      Compressor compressor) throws IOException {
     // Ensure native-lzo library is loaded & initialized
     if (!isNativeLzoLoaded(conf)) {
       throw new RuntimeException("native-lzo library not available");
@@ -107,49 +112,16 @@ public class LzoCodec implements Configurable, CompressionCodec {
     // Create the lzo output-stream
     LzoCompressor.CompressionStrategy strategy = 
       LzoCompressor.CompressionStrategy.valueOf(
-                                                conf.get("io.compression.codec.lzo.compressor",
-                                                         LzoCompressor.CompressionStrategy.LZO1X_1.name()
-                                                         )
-                                                ); 
-    int bufferSize = conf.getInt("io.compression.codec.lzo.buffersize", 
-                                 64*1024);
-    int compressionOverhead = 0;
-    if (strategy.name().contains("LZO1")) {
-      compressionOverhead = (int)(((bufferSize - (64 + 3)) * 16.0) / 17.0);  
-    } else {
-      compressionOverhead = (int)(((bufferSize - (128 + 3)) * 8.0) / 9.0);
-    }
-    
-    return new BlockCompressorStream(out, 
-                                     new LzoCompressor(strategy, bufferSize), 
-                                     bufferSize, compressionOverhead);
-  }
-  
-  public CompressionOutputStream createOutputStream(OutputStream out, 
-                                                    Compressor compressor) 
-  throws IOException {
-    // Ensure native-lzo library is loaded & initialized
-    if (!isNativeLzoLoaded(conf)) {
-      throw new RuntimeException("native-lzo library not available");
-    }
-    
-    LzoCompressor.CompressionStrategy strategy = 
-      LzoCompressor.CompressionStrategy.valueOf(
-                                                conf.get("io.compression.codec.lzo.compressor",
-                                                         LzoCompressor.CompressionStrategy.LZO1X_1.name()
-                                                         )
-                                                ); 
-    int bufferSize = conf.getInt("io.compression.codec.lzo.buffersize", 
-                                 64*1024);
-    int compressionOverhead = 0;
-    if (strategy.name().contains("LZO1")) {
-      compressionOverhead = (int)(((bufferSize - (64 + 3)) * 16.0) / 17.0);  
-    } else {
-      compressionOverhead = (int)(((bufferSize - (128 + 3)) * 8.0) / 9.0);
-    }
-    
-    return new BlockCompressorStream(out, compressor, bufferSize, 
-                                     compressionOverhead); 
+          conf.get("io.compression.codec.lzo.compressor",
+            LzoCompressor.CompressionStrategy.LZO1X_1.name()));
+    int bufferSize =
+      conf.getInt("io.compression.codec.lzo.buffersize", 64*1024);
+    int compressionOverhead = strategy.name().contains("LZO1")
+      ? (bufferSize >> 4) + 64 + 3
+      : (bufferSize >> 3) + 128 + 3;
+
+    return new BlockCompressorStream(out, compressor, bufferSize,
+                                     compressionOverhead);
   }
 
   public Class getCompressorType() {
@@ -157,7 +129,6 @@ public class LzoCodec implements Configurable, CompressionCodec {
     if (!isNativeLzoLoaded(conf)) {
       throw new RuntimeException("native-lzo library not available");
     }
-    
     return LzoCompressor.class;
   }
 
@@ -169,36 +140,17 @@ public class LzoCodec implements Configurable, CompressionCodec {
     
     LzoCompressor.CompressionStrategy strategy = 
       LzoCompressor.CompressionStrategy.valueOf(
-                                                conf.get("io.compression.codec.lzo.compressor",
-                                                         LzoCompressor.CompressionStrategy.LZO1X_1.name()
-                                                         )
-                                                ); 
-    int bufferSize = conf.getInt("io.compression.codec.lzo.buffersize", 
-                                 64*1024);
-    
-    return new LzoCompressor(strategy, bufferSize); 
+          conf.get("io.compression.codec.lzo.compressor",
+            LzoCompressor.CompressionStrategy.LZO1X_1.name()));
+    int bufferSize =
+      conf.getInt("io.compression.codec.lzo.buffersize", 64*1024);
+
+    return new LzoCompressor(strategy, bufferSize);
   }
 
-  public CompressionInputStream createInputStream(InputStream in) 
-    throws IOException {
-    // Ensure native-lzo library is loaded & initialized
-    if (!isNativeLzoLoaded(conf)) {
-      throw new IOException("native-lzo library not available");
-    }
-    
-    // Create the lzo input-stream
-    LzoDecompressor.CompressionStrategy strategy = 
-      LzoDecompressor.CompressionStrategy.valueOf(
-                                                  conf.get("io.compression.codec.lzo.decompressor",
-                                                           LzoDecompressor.CompressionStrategy.LZO1X.name()
-                                                           )
-                                                  ); 
-    int bufferSize = conf.getInt("io.compression.codec.lzo.buffersize", 
-                                 64*1024);
-
-    return new BlockDecompressorStream(in, 
-                                       new LzoDecompressor(strategy, bufferSize), 
-                                       bufferSize);
+  public CompressionInputStream createInputStream(InputStream in)
+      throws IOException {
+    return createInputStream(in, createDecompressor());
   }
 
   public CompressionInputStream createInputStream(InputStream in, 
@@ -208,10 +160,8 @@ public class LzoCodec implements Configurable, CompressionCodec {
     if (!isNativeLzoLoaded(conf)) {
       throw new RuntimeException("native-lzo library not available");
     }
-    
     return new BlockDecompressorStream(in, decompressor, 
-                                       conf.getInt("io.compression.codec.lzo.buffersize", 
-                                                   64*1024));
+        conf.getInt("io.compression.codec.lzo.buffersize", 64*1024));
   }
 
   public Class getDecompressorType() {
@@ -219,7 +169,6 @@ public class LzoCodec implements Configurable, CompressionCodec {
     if (!isNativeLzoLoaded(conf)) {
       throw new RuntimeException("native-lzo library not available");
     }
-    
     return LzoDecompressor.class;
   }
 
@@ -231,12 +180,10 @@ public class LzoCodec implements Configurable, CompressionCodec {
     
     LzoDecompressor.CompressionStrategy strategy = 
       LzoDecompressor.CompressionStrategy.valueOf(
-                                                  conf.get("io.compression.codec.lzo.decompressor",
-                                                           LzoDecompressor.CompressionStrategy.LZO1X.name()
-                                                           )
-                                                  ); 
-    int bufferSize = conf.getInt("io.compression.codec.lzo.buffersize", 
-                                 64*1024);
+          conf.get("io.compression.codec.lzo.decompressor",
+            LzoDecompressor.CompressionStrategy.LZO1X.name()));
+    int bufferSize =
+      conf.getInt("io.compression.codec.lzo.buffersize", 64*1024);
 
     return new LzoDecompressor(strategy, bufferSize); 
   }

+ 71 - 44
src/java/org/apache/hadoop/io/compress/lzo/LzoCompressor.java

@@ -44,6 +44,9 @@ public class LzoCompressor implements Compressor {
   private Buffer compressedDirectBuf = null;
   private boolean finish, finished;
   
+  private long bytesread = 0L;
+  private long byteswritten = 0L;
+
   private CompressionStrategy strategy; // The lzo compression algorithm.
   private long lzoCompressor = 0;       // The actual lzo compression function.
   private int workingMemoryBufLen = 0;  // The length of 'working memory' buf.
@@ -200,26 +203,34 @@ public class LzoCompressor implements Compressor {
     if (off < 0 || len < 0 || off > b.length - len) {
       throw new ArrayIndexOutOfBoundsException();
     }
+    finished = false;
 
-    this.userBuf = b;
-    this.userBufOff = off;
-    this.userBufLen = len;
-
-    // Reinitialize lzo's output direct-buffer 
-    compressedDirectBuf.limit(directBufferSize);
-    compressedDirectBuf.position(directBufferSize);
+    if (len > uncompressedDirectBuf.remaining()) {
+      // save data; now !needsInput
+      this.userBuf = b;
+      this.userBufOff = off;
+      this.userBufLen = len;
+    } else {
+      ((ByteBuffer)uncompressedDirectBuf).put(b, off, len);
+      uncompressedDirectBufLen = uncompressedDirectBuf.position();
+    }
+    bytesread += len;
   }
 
+  /**
+   * If a write would exceed the capacity of the direct buffers, it is set
+   * aside to be loaded by this function while the compressed data are
+   * consumed.
+   */
   synchronized void setInputFromSavedData() {
-    uncompressedDirectBufLen = userBufLen;
-    if (uncompressedDirectBufLen > directBufferSize) {
-      uncompressedDirectBufLen = directBufferSize;
+    if (0 >= userBufLen) {
+      return;
     }
+    finished = false;
 
-    // Reinitialize lzo's input direct buffer
-    uncompressedDirectBuf.rewind();
-    ((ByteBuffer)uncompressedDirectBuf).put(userBuf, userBufOff,  
-                                            uncompressedDirectBufLen);
+    uncompressedDirectBufLen = Math.min(userBufLen, directBufferSize);
+    ((ByteBuffer)uncompressedDirectBuf).put(userBuf, userBufOff,
+      uncompressedDirectBufLen);
 
     // Note how much data is being fed to lzo
     userBufOff += uncompressedDirectBufLen;
@@ -230,25 +241,13 @@ public class LzoCompressor implements Compressor {
     // nop
   }
 
+  /** {@inheritDoc} */
   public boolean needsInput() {
-    // Consume remaining compressed data?
-    if (compressedDirectBuf.remaining() > 0) {
-      return false;
-    }
-
-    // Check if lzo has consumed all input
-    if (uncompressedDirectBufLen <= 0) {
-      // Check if we have consumed all user-input
-      if (userBufLen <= 0) {
-        return true;
-      } else {
-        setInputFromSavedData();
-      }
-    }
-    
-    return false;
+    return !(compressedDirectBuf.remaining() > 0
+        || uncompressedDirectBuf.remaining() == 0
+        || userBufLen > 0);
   }
-  
+
   public synchronized void finish() {
     finish = true;
   }
@@ -267,32 +266,42 @@ public class LzoCompressor implements Compressor {
     if (off < 0 || len < 0 || off > b.length - len) {
       throw new ArrayIndexOutOfBoundsException();
     }
-    
-    int n = 0;
-    
+
     // Check if there is compressed data
-    n = compressedDirectBuf.remaining();
+    int n = compressedDirectBuf.remaining();
     if (n > 0) {
       n = Math.min(n, len);
       ((ByteBuffer)compressedDirectBuf).get(b, off, n);
+      byteswritten += n;
       return n;
     }
 
     // Re-initialize the lzo's output direct-buffer
-    compressedDirectBuf.rewind();
-    compressedDirectBuf.limit(directBufferSize);
+    compressedDirectBuf.clear();
+    compressedDirectBuf.limit(0);
+    if (0 == uncompressedDirectBuf.position()) {
+      // No compressed data, so we should have !needsInput or !finished
+      setInputFromSavedData();
+      if (0 == uncompressedDirectBuf.position()) {
+        // Called without data; write nothing
+        finished = true;
+        return 0;
+      }
+    }
 
     // Compress data
     n = compressBytesDirect(strategy.getCompressor());
     compressedDirectBuf.limit(n);
-    
+    uncompressedDirectBuf.clear(); // lzo consumes all buffer input
+
     // Set 'finished' if lzo has consumed all user-data
-    if (userBufLen <= 0) {
+    if (0 == userBufLen) {
       finished = true;
     }
-    
+
     // Get atmost 'len' bytes
     n = Math.min(n, len);
+    byteswritten += n;
     ((ByteBuffer)compressedDirectBuf).get(b, off, n);
 
     return n;
@@ -301,13 +310,31 @@ public class LzoCompressor implements Compressor {
   public synchronized void reset() {
     finish = false;
     finished = false;
-    uncompressedDirectBuf.rewind();
+    uncompressedDirectBuf.clear();
     uncompressedDirectBufLen = 0;
-    compressedDirectBuf.limit(directBufferSize);
-    compressedDirectBuf.position(directBufferSize);
+    compressedDirectBuf.clear();
+    compressedDirectBuf.limit(0);
     userBufOff = userBufLen = 0;
+    bytesread = byteswritten = 0L;
   }
-  
+
+  /**
+   * Return number of bytes given to this compressor since last reset.
+   */
+  public synchronized long getBytesRead() {
+    return bytesread;
+  }
+
+  /**
+   * Return number of bytes consumed by callers of compress since last reset.
+   */
+  public synchronized long getBytesWritten() {
+    return byteswritten;
+  }
+
+  /**
+   * Noop.
+   */
   public synchronized void end() {
     // nop
   }