Browse Source

HADOOP-4918. Fix bzip2 compression to work with Sequence Files.
(Zheng Shao via dhruba).



git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.20@735109 13f79535-47bb-0310-9956-ffa450edef68

Dhruba Borthakur 16 years ago
parent
commit
e1c353217f

+ 3 - 0
CHANGES.txt

@@ -590,6 +590,9 @@ Release 0.19.1 - Unreleased
     HADOOP-4906. Fix TaskTracker OOM by keeping a shallow copy of JobConf in
     TaskTracker.TaskInProgress. (Sharad Agarwal via acmurthy) 
 
+    HADOOP-4918. Fix bzip2 compression to work with Sequence Files.
+    (Zheng Shao via dhruba).
+
 Release 0.19.0 - 2008-11-18
 
   INCOMPATIBLE CHANGES

+ 59 - 39
src/core/org/apache/hadoop/io/compress/BZip2Codec.java

@@ -23,6 +23,8 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 
+import org.apache.hadoop.io.compress.bzip2.BZip2DummyCompressor;
+import org.apache.hadoop.io.compress.bzip2.BZip2DummyDecompressor;
 import org.apache.hadoop.io.compress.bzip2.CBZip2InputStream;
 import org.apache.hadoop.io.compress.bzip2.CBZip2OutputStream;
 
@@ -67,7 +69,7 @@ public class BZip2Codec implements
    */
   public CompressionOutputStream createOutputStream(OutputStream out,
       Compressor compressor) throws IOException {
-    throw new UnsupportedOperationException();
+    return createOutputStream(out);
   }
 
   /**
@@ -76,8 +78,8 @@ public class BZip2Codec implements
   * @throws java.lang.UnsupportedOperationException
   *             Throws UnsupportedOperationException
   */
-  public Class<org.apache.hadoop.io.compress.Compressor> getCompressorType() {
-    throw new UnsupportedOperationException();
+  public Class<? extends org.apache.hadoop.io.compress.Compressor> getCompressorType() {
+    return BZip2DummyCompressor.class;
   }
 
   /**
@@ -87,7 +89,7 @@ public class BZip2Codec implements
   *             Throws UnsupportedOperationException
   */
   public Compressor createCompressor() {
-    throw new UnsupportedOperationException();
+    return new BZip2DummyCompressor();
   }
 
   /**
@@ -112,8 +114,7 @@ public class BZip2Codec implements
   */
   public CompressionInputStream createInputStream(InputStream in,
       Decompressor decompressor) throws IOException {
-    throw new UnsupportedOperationException();
-
+    return createInputStream(in);
   }
 
   /**
@@ -122,8 +123,8 @@ public class BZip2Codec implements
   * @throws java.lang.UnsupportedOperationException
   *             Throws UnsupportedOperationException
   */
-  public Class<org.apache.hadoop.io.compress.Decompressor> getDecompressorType() {
-    throw new UnsupportedOperationException();
+  public Class<? extends org.apache.hadoop.io.compress.Decompressor> getDecompressorType() {
+    return BZip2DummyDecompressor.class;
   }
 
   /**
@@ -133,7 +134,7 @@ public class BZip2Codec implements
   *             Throws UnsupportedOperationException
   */
   public Decompressor createDecompressor() {
-    throw new UnsupportedOperationException();
+    return new BZip2DummyDecompressor();
   }
 
   /**
@@ -149,14 +150,13 @@ public class BZip2Codec implements
 
     // class data starts here//
     private CBZip2OutputStream output;
-
+    private boolean needsReset; 
     // class data ends here//
 
     public BZip2CompressionOutputStream(OutputStream out)
         throws IOException {
       super(out);
-      writeStreamHeader();
-      this.output = new CBZip2OutputStream(out);
+      needsReset = true;
     }
 
     private void writeStreamHeader() throws IOException {
@@ -168,32 +168,43 @@ public class BZip2Codec implements
       }
     }
 
-    public void write(byte[] b, int off, int len) throws IOException {
-      this.output.write(b, off, len);
-
-    }
-
     public void finish() throws IOException {
-      this.output.flush();
+      this.output.finish();
+      needsReset = true;
     }
 
+    private void internalReset() throws IOException {
+      if (needsReset) {
+        needsReset = false;
+        writeStreamHeader();
+        this.output = new CBZip2OutputStream(out);
+      }
+    }    
+    
     public void resetState() throws IOException {
-
+      // Cannot write to out at this point because out might not be ready
+      // yet, as in SequenceFile.Writer implementation.
+      needsReset = true;
     }
 
     public void write(int b) throws IOException {
+      if (needsReset) {
+        internalReset();
+      }
       this.output.write(b);
     }
 
+    public void write(byte[] b, int off, int len) throws IOException {
+      if (needsReset) {
+        internalReset();
+      }
+      this.output.write(b, off, len);
+    }
+
     public void close() throws IOException {
       this.output.flush();
       this.output.close();
-    }
-
-    protected void finalize() throws IOException {
-      if (this.output != null) {
-        this.close();
-      }
+      needsReset = true;
     }
 
   }// end of class BZip2CompressionOutputStream
@@ -202,14 +213,13 @@ public class BZip2Codec implements
 
     // class data starts here//
     private CBZip2InputStream input;
-
+    boolean needsReset;
     // class data ends here//
 
     public BZip2CompressionInputStream(InputStream in) throws IOException {
 
       super(in);
-      BufferedInputStream bufferedIn = readStreamHeader();
-      input = new CBZip2InputStream(bufferedIn);
+      needsReset = true;
     }
 
     private BufferedInputStream readStreamHeader() throws IOException {
@@ -239,29 +249,39 @@ public class BZip2Codec implements
     }// end of method
 
     public void close() throws IOException {
-      this.input.close();
+      if (!needsReset) {
+        input.close();
+        needsReset = true;
+      }
     }
 
     public int read(byte[] b, int off, int len) throws IOException {
-
+      if (needsReset) {
+        internalReset();
+      }
       return this.input.read(b, off, len);
 
     }
 
+    private void internalReset() throws IOException {
+      if (needsReset) {
+        needsReset = false;
+        BufferedInputStream bufferedIn = readStreamHeader();
+        input = new CBZip2InputStream(bufferedIn);
+      }
+    }    
+    
     public void resetState() throws IOException {
-
+      // Cannot read from bufferedIn at this point because bufferedIn might not be ready
+      // yet, as in SequenceFile.Reader implementation.
+      needsReset = true;
     }
 
     public int read() throws IOException {
-      return this.input.read();
-
-    }
-
-    protected void finalize() throws IOException {
-      if (this.input != null) {
-        this.close();
+      if (needsReset) {
+        internalReset();
       }
-
+      return this.input.read();
     }
 
   }// end of BZip2CompressionInputStream

+ 62 - 0
src/core/org/apache/hadoop/io/compress/bzip2/BZip2DummyCompressor.java

@@ -0,0 +1,62 @@
+package org.apache.hadoop.io.compress.bzip2;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.compress.Compressor;
+
+/**
+ * This is a dummy compressor for BZip2.
+ */
+public class BZip2DummyCompressor implements Compressor {
+
+  @Override
+  public int compress(byte[] b, int off, int len) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void end() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void finish() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean finished() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long getBytesRead() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long getBytesWritten() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean needsInput() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void reset() {
+    // do nothing
+  }
+
+  @Override
+  public void setDictionary(byte[] b, int off, int len) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setInput(byte[] b, int off, int len) {
+    throw new UnsupportedOperationException();
+  }
+
+}

+ 52 - 0
src/core/org/apache/hadoop/io/compress/bzip2/BZip2DummyDecompressor.java

@@ -0,0 +1,52 @@
+package org.apache.hadoop.io.compress.bzip2;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.compress.Decompressor;
+
+/**
+ * This is a dummy decompressor for BZip2.
+ */
+public class BZip2DummyDecompressor implements Decompressor {
+
+  @Override
+  public int decompress(byte[] b, int off, int len) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void end() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean finished() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean needsDictionary() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean needsInput() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void reset() {
+    // do nothing
+  }
+
+  @Override
+  public void setDictionary(byte[] b, int off, int len) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setInput(byte[] b, int off, int len) {
+    throw new UnsupportedOperationException();
+  }
+
+}

+ 12 - 5
src/core/org/apache/hadoop/io/compress/bzip2/CBZip2OutputStream.java

@@ -703,13 +703,13 @@ public class CBZip2OutputStream extends OutputStream implements BZip2Constants {
   * Overriden to close the stream.
   */
   protected void finalize() throws Throwable {
-    close();
+    finish();
     super.finalize();
   }
 
-  public void close() throws IOException {
-    OutputStream outShadow = this.out;
-    if (outShadow != null) {
+  
+  public void finish() throws IOException {
+    if (out != null) {
       try {
         if (this.runLength > 0) {
           writeRun();
@@ -717,7 +717,6 @@ public class CBZip2OutputStream extends OutputStream implements BZip2Constants {
         this.currentChar = -1;
         endBlock();
         endCompression();
-        outShadow.close();
       } finally {
         this.out = null;
         this.data = null;
@@ -725,6 +724,14 @@ public class CBZip2OutputStream extends OutputStream implements BZip2Constants {
     }
   }
 
+  public void close() throws IOException {
+    if (out != null) {
+      OutputStream outShadow = this.out;
+      finish();
+      outShadow.close();
+    }
+  }
+  
   public void flush() throws IOException {
     OutputStream outShadow = this.out;
     if (outShadow != null) {

+ 70 - 3
src/test/org/apache/hadoop/io/compress/TestCodec.java

@@ -29,10 +29,17 @@ import junit.framework.TestCase;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.RandomDatum;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionOutputStream;
 
 public class TestCodec extends TestCase {
@@ -53,7 +60,7 @@ public class TestCodec extends TestCase {
   }
   
   public void testBZip2Codec() throws IOException {    
-      codecTest(conf, seed, count, "org.apache.hadoop.io.compress.BZip2Codec");    
+    codecTest(conf, seed, count, "org.apache.hadoop.io.compress.BZip2Codec");    
   }
 
   private static void codecTest(Configuration conf, int seed, int count, 
@@ -96,8 +103,6 @@ public class TestCodec extends TestCase {
     deflateOut.write(data.getData(), 0, data.getLength());
     deflateOut.flush();
     deflateFilter.finish();
-    //Necessary to close the stream for BZip2 Codec to write its final output.  Flush is not enough.
-    deflateOut.close();
     LOG.info("Finished compressing data");
     
     // De-compress data
@@ -123,6 +128,68 @@ public class TestCodec extends TestCase {
     }
     LOG.info("SUCCESS! Completed checking " + count + " records");
   }
+
+
+  public void testSequenceFileDefaultCodec() throws IOException, ClassNotFoundException, 
+      InstantiationException, IllegalAccessException {
+    sequenceFileCodecTest(conf, 100, "org.apache.hadoop.io.compress.DefaultCodec", 100);
+    sequenceFileCodecTest(conf, 200000, "org.apache.hadoop.io.compress.DefaultCodec", 1000000);
+  }
+  
+  public void testSequenceFileBZip2Codec() throws IOException, ClassNotFoundException, 
+      InstantiationException, IllegalAccessException {
+    sequenceFileCodecTest(conf, 100, "org.apache.hadoop.io.compress.BZip2Codec", 100);    
+    sequenceFileCodecTest(conf, 200000, "org.apache.hadoop.io.compress.BZip2Codec", 1000000);    
+  }
+  
+  private static void sequenceFileCodecTest(Configuration conf, int lines, 
+                                String codecClass, int blockSize) 
+    throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException {
+
+    Path filePath = new Path("SequenceFileCodecTest." + codecClass);
+    // Configuration
+    conf.setInt("io.seqfile.compress.blocksize", blockSize);
+    
+    // Create the SequenceFile
+    FileSystem fs = FileSystem.get(conf);
+    LOG.info("Creating SequenceFile with codec \"" + codecClass + "\"");
+    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, filePath, 
+        Text.class, Text.class, CompressionType.BLOCK, 
+        (CompressionCodec)Class.forName(codecClass).newInstance());
+    
+    // Write some data
+    LOG.info("Writing to SequenceFile...");
+    for (int i=0; i<lines; i++) {
+      Text key = new Text("key" + i);
+      Text value = new Text("value" + i);
+      writer.append(key, value);
+    }
+    writer.close();
+    
+    // Read the data back and check
+    LOG.info("Reading from the SequenceFile...");
+    SequenceFile.Reader reader = new SequenceFile.Reader(fs, filePath, conf);
+    
+    Writable key = (Writable)reader.getKeyClass().newInstance();
+    Writable value = (Writable)reader.getValueClass().newInstance();
+    
+    int lc = 0;
+    try {
+      while (reader.next(key, value)) {
+        assertEquals("key" + lc, key.toString());
+        assertEquals("value" + lc, value.toString());
+        lc ++;
+      }
+    } finally {
+      reader.close();
+    }
+    assertEquals(lines, lc);
+
+    // Delete temporary files
+    fs.delete(filePath, false);
+
+    LOG.info("SUCCESS! Completed SequenceFileCodecTest with codec \"" + codecClass + "\"");
+  }
   
   public static void main(String[] args) {
     int count = 10000;