|
@@ -18,15 +18,19 @@ package org.apache.hadoop.io;
|
|
|
|
|
|
import java.io.*;
|
|
|
import java.util.*;
|
|
|
-import java.util.zip.*;
|
|
|
import java.net.InetAddress;
|
|
|
import java.rmi.server.UID;
|
|
|
import java.security.MessageDigest;
|
|
|
import org.apache.lucene.util.PriorityQueue;
|
|
|
import org.apache.commons.logging.*;
|
|
|
import org.apache.hadoop.fs.*;
|
|
|
+import org.apache.hadoop.io.compress.CompressionCodec;
|
|
|
+import org.apache.hadoop.io.compress.CompressionInputStream;
|
|
|
+import org.apache.hadoop.io.compress.CompressionOutputStream;
|
|
|
+import org.apache.hadoop.io.compress.DefaultCodec;
|
|
|
import org.apache.hadoop.conf.*;
|
|
|
import org.apache.hadoop.util.Progressable;
|
|
|
+import org.apache.hadoop.util.ReflectionUtils;
|
|
|
|
|
|
/** Support for flat files of binary key/value pairs. */
|
|
|
public class SequenceFile {
|
|
@@ -36,8 +40,9 @@ public class SequenceFile {
|
|
|
private SequenceFile() {} // no public ctor
|
|
|
|
|
|
private static final byte BLOCK_COMPRESS_VERSION = (byte)4;
|
|
|
+ private static final byte CUSTOM_COMPRESS_VERSION = (byte)5;
|
|
|
private static byte[] VERSION = new byte[] {
|
|
|
- (byte)'S', (byte)'E', (byte)'Q', BLOCK_COMPRESS_VERSION
|
|
|
+ (byte)'S', (byte)'E', (byte)'Q', CUSTOM_COMPRESS_VERSION
|
|
|
};
|
|
|
|
|
|
private static final int SYNC_ESCAPE = -1; // "length" of sync entries
|
|
@@ -79,9 +84,11 @@ public class SequenceFile {
|
|
|
if (compressionType == CompressionType.NONE) {
|
|
|
writer = new Writer(fs, conf, name, keyClass, valClass);
|
|
|
} else if (compressionType == CompressionType.RECORD) {
|
|
|
- writer = new RecordCompressWriter(fs, conf, name, keyClass, valClass);
|
|
|
+ writer = new RecordCompressWriter(fs, conf, name, keyClass, valClass,
|
|
|
+ new DefaultCodec());
|
|
|
} else if (compressionType == CompressionType.BLOCK){
|
|
|
- writer = new BlockCompressWriter(fs, conf, name, keyClass, valClass);
|
|
|
+ writer = new BlockCompressWriter(fs, conf, name, keyClass, valClass,
|
|
|
+ new DefaultCodec());
|
|
|
}
|
|
|
|
|
|
return writer;
|
|
@@ -105,14 +112,79 @@ public class SequenceFile {
|
|
|
Progressable progress) throws IOException {
|
|
|
Writer writer = null;
|
|
|
|
|
|
+ if (compressionType == CompressionType.NONE) {
|
|
|
+ writer = new Writer(fs, conf, name, keyClass, valClass, progress);
|
|
|
+ } else if (compressionType == CompressionType.RECORD) {
|
|
|
+ writer = new RecordCompressWriter(fs, conf, name,
|
|
|
+ keyClass, valClass, new DefaultCodec(), progress);
|
|
|
+ } else if (compressionType == CompressionType.BLOCK){
|
|
|
+ writer = new BlockCompressWriter(fs, conf, name,
|
|
|
+ keyClass, valClass, new DefaultCodec(), progress);
|
|
|
+ }
|
|
|
+
|
|
|
+ return writer;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Construct the preferred type of SequenceFile Writer.
|
|
|
+ * @param fs The configured filesystem.
|
|
|
+ * @param conf The configuration.
|
|
|
+ * @param name The name of the file.
|
|
|
+ * @param keyClass The 'key' type.
|
|
|
+ * @param valClass The 'value' type.
|
|
|
+ * @param compressionType The compression type.
|
|
|
+ * @param codec The compression codec (used only for RECORD or BLOCK compression; ignored for NONE).
|
|
|
+ * @return Returns the handle to the constructed SequenceFile Writer.
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ public static Writer
|
|
|
+ createWriter(FileSystem fs, Configuration conf, Path name,
|
|
|
+ Class keyClass, Class valClass,
|
|
|
+ CompressionType compressionType, CompressionCodec codec)
|
|
|
+ throws IOException {
|
|
|
+ Writer writer = null;
|
|
|
+
|
|
|
+ if (compressionType == CompressionType.NONE) {
|
|
|
+ writer = new Writer(fs, conf, name, keyClass, valClass);
|
|
|
+ } else if (compressionType == CompressionType.RECORD) {
|
|
|
+ writer = new RecordCompressWriter(fs, conf, name, keyClass, valClass,
|
|
|
+ codec);
|
|
|
+ } else if (compressionType == CompressionType.BLOCK){
|
|
|
+ writer = new BlockCompressWriter(fs, conf, name, keyClass, valClass,
|
|
|
+ codec);
|
|
|
+ }
|
|
|
+
|
|
|
+ return writer;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Construct the preferred type of SequenceFile Writer.
|
|
|
+ * @param fs The configured filesystem.
|
|
|
+ * @param conf The configuration.
|
|
|
+ * @param name The name of the file.
|
|
|
+ * @param keyClass The 'key' type.
|
|
|
+ * @param valClass The 'value' type.
|
|
|
+ * @param compressionType The compression type.
|
|
|
+ * @param codec The compression codec (used only for RECORD or BLOCK compression; ignored for NONE).
|
|
|
+ * @param progress The Progressable object to track progress.
|
|
|
+ * @return Returns the handle to the constructed SequenceFile Writer.
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ public static Writer
|
|
|
+ createWriter(FileSystem fs, Configuration conf, Path name,
|
|
|
+ Class keyClass, Class valClass,
|
|
|
+ CompressionType compressionType, CompressionCodec codec,
|
|
|
+ Progressable progress) throws IOException {
|
|
|
+ Writer writer = null;
|
|
|
+
|
|
|
if (compressionType == CompressionType.NONE) {
|
|
|
writer = new Writer(fs, conf, name, keyClass, valClass, progress);
|
|
|
} else if (compressionType == CompressionType.RECORD) {
|
|
|
writer = new RecordCompressWriter(fs, conf, name,
|
|
|
- keyClass, valClass, progress);
|
|
|
+ keyClass, valClass, codec, progress);
|
|
|
} else if (compressionType == CompressionType.BLOCK){
|
|
|
writer = new BlockCompressWriter(fs, conf, name,
|
|
|
- keyClass, valClass, progress);
|
|
|
+ keyClass, valClass, codec, progress);
|
|
|
}
|
|
|
|
|
|
return writer;
|
|
@@ -130,16 +202,17 @@ public class SequenceFile {
|
|
|
*/
|
|
|
private static Writer
|
|
|
createWriter(FSDataOutputStream out,
|
|
|
- Class keyClass, Class valClass, boolean compress, boolean blockCompress)
|
|
|
+ Class keyClass, Class valClass, boolean compress, boolean blockCompress,
|
|
|
+ CompressionCodec codec)
|
|
|
throws IOException {
|
|
|
Writer writer = null;
|
|
|
|
|
|
if (!compress) {
|
|
|
writer = new Writer(out, keyClass, valClass);
|
|
|
} else if (compress && !blockCompress) {
|
|
|
- writer = new RecordCompressWriter(out, keyClass, valClass);
|
|
|
+ writer = new RecordCompressWriter(out, keyClass, valClass, codec);
|
|
|
} else {
|
|
|
- writer = new BlockCompressWriter(out, keyClass, valClass);
|
|
|
+ writer = new BlockCompressWriter(out, keyClass, valClass, codec);
|
|
|
}
|
|
|
|
|
|
return writer;
|
|
@@ -200,11 +273,14 @@ public class SequenceFile {
|
|
|
private static class CompressedBytes implements ValueBytes {
|
|
|
private int dataSize;
|
|
|
private byte[] data;
|
|
|
- private Inflater zlibInflater = null;
|
|
|
+ DataInputBuffer rawData = null;
|
|
|
+ CompressionCodec codec = null;
|
|
|
+ CompressionInputStream decompressedStream = null;
|
|
|
|
|
|
- private CompressedBytes() {
|
|
|
+ private CompressedBytes(CompressionCodec codec) {
|
|
|
data = null;
|
|
|
dataSize = 0;
|
|
|
+ this.codec = codec;
|
|
|
}
|
|
|
|
|
|
private void reset(DataInputStream in, int length) throws IOException {
|
|
@@ -221,21 +297,18 @@ public class SequenceFile {
|
|
|
|
|
|
public void writeUncompressedBytes(DataOutputStream outStream)
|
|
|
throws IOException {
|
|
|
- if (zlibInflater == null) {
|
|
|
- zlibInflater = new Inflater();
|
|
|
+ if (decompressedStream == null) {
|
|
|
+ rawData = new DataInputBuffer();
|
|
|
+ decompressedStream = codec.createInputStream(rawData);
|
|
|
} else {
|
|
|
- zlibInflater.reset();
|
|
|
+ decompressedStream.resetState();
|
|
|
}
|
|
|
- zlibInflater.setInput(data, 0, dataSize);
|
|
|
+ rawData.reset(data, 0, dataSize);
|
|
|
|
|
|
byte[] buffer = new byte[8192];
|
|
|
- while (!zlibInflater.finished()) {
|
|
|
- try {
|
|
|
- int noDecompressedBytes = zlibInflater.inflate(buffer);
|
|
|
- outStream.write(buffer, 0, noDecompressedBytes);
|
|
|
- } catch (DataFormatException e) {
|
|
|
- throw new IOException (e.toString());
|
|
|
- }
|
|
|
+ int bytesRead = 0;
|
|
|
+ while ((bytesRead = decompressedStream.read(buffer, 0, 8192)) != -1) {
|
|
|
+ outStream.write(buffer, 0, bytesRead);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -256,11 +329,9 @@ public class SequenceFile {
|
|
|
Class valClass;
|
|
|
|
|
|
private boolean compress;
|
|
|
- Deflater deflater = new Deflater(Deflater.BEST_SPEED);
|
|
|
- DeflaterOutputStream deflateFilter =
|
|
|
- new DeflaterOutputStream(buffer, deflater);
|
|
|
- DataOutputStream deflateOut =
|
|
|
- new DataOutputStream(new BufferedOutputStream(deflateFilter));
|
|
|
+ CompressionCodec codec = null;
|
|
|
+ CompressionOutputStream deflateFilter = null;
|
|
|
+ DataOutputStream deflateOut = null;
|
|
|
|
|
|
// Insert a globally unique 16-byte value every few entries, so that one
|
|
|
// can seek into the middle of a file and then synchronize with record
|
|
@@ -309,7 +380,7 @@ public class SequenceFile {
|
|
|
public Writer(FileSystem fs, Path name,
|
|
|
Class keyClass, Class valClass, boolean compress)
|
|
|
throws IOException {
|
|
|
- init(name, fs.create(name), keyClass, valClass, compress);
|
|
|
+ init(name, fs.create(name), keyClass, valClass, compress, null);
|
|
|
|
|
|
initializeFileHeader();
|
|
|
writeFileHeader();
|
|
@@ -324,7 +395,8 @@ public class SequenceFile {
|
|
|
Class keyClass, Class valClass, boolean compress,
|
|
|
Progressable progress)
|
|
|
throws IOException {
|
|
|
- init(name, fs.create(name, progress), keyClass, valClass, compress);
|
|
|
+ init(name, fs.create(name, progress), keyClass, valClass,
|
|
|
+ compress, null);
|
|
|
|
|
|
initializeFileHeader();
|
|
|
writeFileHeader();
|
|
@@ -347,8 +419,8 @@ public class SequenceFile {
|
|
|
|
|
|
/** Write to an arbitrary stream using a specified buffer size. */
|
|
|
private Writer(FSDataOutputStream out, Class keyClass, Class valClass)
|
|
|
- throws IOException {
|
|
|
- init(null, out, keyClass, valClass, false);
|
|
|
+ throws IOException {
|
|
|
+ init(null, out, keyClass, valClass, false, null);
|
|
|
|
|
|
initializeFileHeader();
|
|
|
writeFileHeader();
|
|
@@ -379,18 +451,28 @@ public class SequenceFile {
|
|
|
|
|
|
out.writeBoolean(this.isCompressed());
|
|
|
out.writeBoolean(this.isBlockCompressed());
|
|
|
+
|
|
|
+ if(this.isCompressed()) {
|
|
|
+ Text.writeString(out, (codec.getClass()).getName());
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/** Initialize. */
|
|
|
void init(Path name, FSDataOutputStream out,
|
|
|
Class keyClass, Class valClass,
|
|
|
- boolean compress)
|
|
|
+ boolean compress, CompressionCodec codec)
|
|
|
throws IOException {
|
|
|
this.target = name;
|
|
|
this.out = out;
|
|
|
this.keyClass = keyClass;
|
|
|
this.valClass = valClass;
|
|
|
this.compress = compress;
|
|
|
+ this.codec = codec;
|
|
|
+ if(this.codec != null) {
|
|
|
+ this.deflateFilter = this.codec.createOutputStream(buffer);
|
|
|
+ this.deflateOut =
|
|
|
+ new DataOutputStream(new BufferedOutputStream(deflateFilter));
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/** Returns the class of keys in this file. */
|
|
@@ -399,6 +481,9 @@ public class SequenceFile {
|
|
|
/** Returns the class of values in this file. */
|
|
|
public Class getValueClass() { return valClass; }
|
|
|
|
|
|
+ /** Returns the compression codec of data in this file. */
|
|
|
+ public CompressionCodec getCompressionCodec() { return codec; }
|
|
|
+
|
|
|
/** Close the file. */
|
|
|
public synchronized void close() throws IOException {
|
|
|
if (out != null) {
|
|
@@ -435,7 +520,7 @@ public class SequenceFile {
|
|
|
|
|
|
// Append the 'value'
|
|
|
if (compress) {
|
|
|
- deflater.reset();
|
|
|
+ deflateFilter.resetState();
|
|
|
val.write(deflateOut);
|
|
|
deflateOut.flush();
|
|
|
deflateFilter.finish();
|
|
@@ -495,8 +580,9 @@ public class SequenceFile {
|
|
|
|
|
|
/** Create the named file. */
|
|
|
public RecordCompressWriter(FileSystem fs, Configuration conf, Path name,
|
|
|
- Class keyClass, Class valClass) throws IOException {
|
|
|
- super.init(name, fs.create(name), keyClass, valClass, true);
|
|
|
+ Class keyClass, Class valClass, CompressionCodec codec)
|
|
|
+ throws IOException {
|
|
|
+ super.init(name, fs.create(name), keyClass, valClass, true, codec);
|
|
|
|
|
|
initializeFileHeader();
|
|
|
writeFileHeader();
|
|
@@ -505,9 +591,11 @@ public class SequenceFile {
|
|
|
|
|
|
/** Create the named file with write-progress reporter. */
|
|
|
public RecordCompressWriter(FileSystem fs, Configuration conf, Path name,
|
|
|
- Class keyClass, Class valClass, Progressable progress)
|
|
|
+ Class keyClass, Class valClass, CompressionCodec codec,
|
|
|
+ Progressable progress)
|
|
|
throws IOException {
|
|
|
- super.init(name, fs.create(name, progress), keyClass, valClass, true);
|
|
|
+ super.init(name, fs.create(name, progress),
|
|
|
+ keyClass, valClass, true, codec);
|
|
|
|
|
|
initializeFileHeader();
|
|
|
writeFileHeader();
|
|
@@ -516,9 +604,9 @@ public class SequenceFile {
|
|
|
|
|
|
/** Write to an arbitrary stream using a specified buffer size. */
|
|
|
private RecordCompressWriter(FSDataOutputStream out,
|
|
|
- Class keyClass, Class valClass)
|
|
|
+ Class keyClass, Class valClass, CompressionCodec codec)
|
|
|
throws IOException {
|
|
|
- super.init(null, out, keyClass, valClass, true);
|
|
|
+ super.init(null, out, keyClass, valClass, true, codec);
|
|
|
|
|
|
initializeFileHeader();
|
|
|
writeFileHeader();
|
|
@@ -546,7 +634,7 @@ public class SequenceFile {
|
|
|
throw new IOException("zero length keys not allowed: " + key);
|
|
|
|
|
|
// Compress 'value' and append it
|
|
|
- deflater.reset();
|
|
|
+ deflateFilter.resetState();
|
|
|
val.write(deflateOut);
|
|
|
deflateOut.flush();
|
|
|
deflateFilter.finish();
|
|
@@ -590,19 +678,13 @@ public class SequenceFile {
|
|
|
private DataOutputBuffer valLenBuffer = new DataOutputBuffer();
|
|
|
private DataOutputBuffer valBuffer = new DataOutputBuffer();
|
|
|
|
|
|
- private DataOutputBuffer compressedDataBuffer = new DataOutputBuffer();
|
|
|
- private Deflater deflater = new Deflater(Deflater.BEST_SPEED);
|
|
|
- private DeflaterOutputStream deflateFilter =
|
|
|
- new DeflaterOutputStream(compressedDataBuffer, deflater);
|
|
|
- private DataOutputStream deflateOut =
|
|
|
- new DataOutputStream(new BufferedOutputStream(deflateFilter));
|
|
|
-
|
|
|
private int compressionBlockSize;
|
|
|
|
|
|
/** Create the named file. */
|
|
|
public BlockCompressWriter(FileSystem fs, Configuration conf, Path name,
|
|
|
- Class keyClass, Class valClass) throws IOException {
|
|
|
- super.init(name, fs.create(name), keyClass, valClass, true);
|
|
|
+ Class keyClass, Class valClass, CompressionCodec codec)
|
|
|
+ throws IOException {
|
|
|
+ super.init(name, fs.create(name), keyClass, valClass, true, codec);
|
|
|
init(conf.getInt("mapred.seqfile.compress.blocksize", 1000000));
|
|
|
|
|
|
initializeFileHeader();
|
|
@@ -612,9 +694,11 @@ public class SequenceFile {
|
|
|
|
|
|
/** Create the named file with write-progress reporter. */
|
|
|
public BlockCompressWriter(FileSystem fs, Configuration conf, Path name,
|
|
|
- Class keyClass, Class valClass, Progressable progress)
|
|
|
+ Class keyClass, Class valClass, CompressionCodec codec,
|
|
|
+ Progressable progress)
|
|
|
throws IOException {
|
|
|
- super.init(name, fs.create(name, progress), keyClass, valClass, true);
|
|
|
+ super.init(name, fs.create(name, progress), keyClass, valClass,
|
|
|
+ true, codec);
|
|
|
init(conf.getInt("mapred.seqfile.compress.blocksize", 1000000));
|
|
|
|
|
|
initializeFileHeader();
|
|
@@ -624,9 +708,9 @@ public class SequenceFile {
|
|
|
|
|
|
/** Write to an arbitrary stream using a specified buffer size. */
|
|
|
private BlockCompressWriter(FSDataOutputStream out,
|
|
|
- Class keyClass, Class valClass)
|
|
|
+ Class keyClass, Class valClass, CompressionCodec codec)
|
|
|
throws IOException {
|
|
|
- super.init(null, out, keyClass, valClass, true);
|
|
|
+ super.init(null, out, keyClass, valClass, true, codec);
|
|
|
init(1000000);
|
|
|
|
|
|
initializeFileHeader();
|
|
@@ -644,17 +728,17 @@ public class SequenceFile {
|
|
|
|
|
|
/** Workhorse to check and write out compressed data/lengths */
|
|
|
private synchronized
|
|
|
- void writeBuffer(DataOutputBuffer buffer)
|
|
|
+ void writeBuffer(DataOutputBuffer uncompressedDataBuffer)
|
|
|
throws IOException {
|
|
|
- deflater.reset();
|
|
|
- compressedDataBuffer.reset();
|
|
|
- deflateOut.write(buffer.getData(), 0, buffer.getLength());
|
|
|
+ deflateFilter.resetState();
|
|
|
+ buffer.reset();
|
|
|
+ deflateOut.write(uncompressedDataBuffer.getData(), 0,
|
|
|
+ uncompressedDataBuffer.getLength());
|
|
|
deflateOut.flush();
|
|
|
deflateFilter.finish();
|
|
|
|
|
|
- WritableUtils.writeVInt(out, compressedDataBuffer.getLength());
|
|
|
- out.write(compressedDataBuffer.getData(),
|
|
|
- 0, compressedDataBuffer.getLength());
|
|
|
+ WritableUtils.writeVInt(out, buffer.getLength());
|
|
|
+ out.write(buffer.getData(), 0, buffer.getLength());
|
|
|
}
|
|
|
|
|
|
/** Compress and flush contents to dfs */
|
|
@@ -771,6 +855,8 @@ public class SequenceFile {
|
|
|
private Class keyClass;
|
|
|
private Class valClass;
|
|
|
|
|
|
+ private CompressionCodec codec = null;
|
|
|
+
|
|
|
private byte[] sync = new byte[SYNC_HASH_SIZE];
|
|
|
private byte[] syncCheck = new byte[SYNC_HASH_SIZE];
|
|
|
private boolean syncSeen;
|
|
@@ -791,17 +877,17 @@ public class SequenceFile {
|
|
|
private int noBufferedValues = 0;
|
|
|
|
|
|
private DataInputBuffer keyLenBuffer = null;
|
|
|
- private Inflater keyLenInflater = null;
|
|
|
+ private CompressionInputStream keyLenInFilter = null;
|
|
|
private DataInputStream keyLenIn = null;
|
|
|
private DataInputBuffer keyBuffer = null;
|
|
|
- private Inflater keyInflater = null;
|
|
|
+ private CompressionInputStream keyInFilter = null;
|
|
|
private DataInputStream keyIn = null;
|
|
|
|
|
|
private DataInputBuffer valLenBuffer = null;
|
|
|
- private Inflater valLenInflater = null;
|
|
|
+ private CompressionInputStream valLenInFilter = null;
|
|
|
private DataInputStream valLenIn = null;
|
|
|
private DataInputBuffer valBuffer = null;
|
|
|
- private Inflater valInflater = null;
|
|
|
+ private CompressionInputStream valInFilter = null;
|
|
|
private DataInputStream valIn = null;
|
|
|
|
|
|
/** @deprecated Call {@link #SequenceFile.Reader(FileSystem,Path,Configuration)}.*/
|
|
@@ -870,6 +956,19 @@ public class SequenceFile {
|
|
|
this.blockCompressed = in.readBoolean(); // is block-compressed?
|
|
|
}
|
|
|
|
|
|
+ // if version >= CUSTOM_COMPRESS_VERSION and the file is compressed, the
|
|
|
+ // header names the codec class: read that name and instantiate the codec
|
|
|
+ if (version >= CUSTOM_COMPRESS_VERSION && this.decompress) {
|
|
|
+ try {
|
|
|
+ this.codec = (CompressionCodec)
|
|
|
+ ReflectionUtils.newInstance(conf.getClassByName(Text.readString(in)),
|
|
|
+ conf);
|
|
|
+ } catch (ClassNotFoundException cnfe) {
|
|
|
+ cnfe.printStackTrace();
|
|
|
+ throw new IllegalArgumentException("Unknown codec: " + cnfe);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
if (version > 1) { // if version > 1
|
|
|
in.readFully(sync); // read sync bytes
|
|
|
}
|
|
@@ -877,10 +976,8 @@ public class SequenceFile {
|
|
|
// Initialize
|
|
|
valBuffer = new DataInputBuffer();
|
|
|
if (decompress) {
|
|
|
- valInflater = new Inflater();
|
|
|
- valIn = new DataInputStream(new BufferedInputStream(
|
|
|
- new InflaterInputStream(valBuffer, valInflater))
|
|
|
- );
|
|
|
+ valInFilter = this.codec.createInputStream(valBuffer);
|
|
|
+ valIn = new DataInputStream(new BufferedInputStream(valInFilter));
|
|
|
} else {
|
|
|
valIn = new DataInputStream(new BufferedInputStream(valBuffer));
|
|
|
}
|
|
@@ -890,19 +987,14 @@ public class SequenceFile {
|
|
|
keyBuffer = new DataInputBuffer();
|
|
|
valLenBuffer = new DataInputBuffer();
|
|
|
|
|
|
- keyLenInflater = new Inflater();
|
|
|
- keyLenIn = new DataInputStream(new BufferedInputStream(
|
|
|
- new InflaterInputStream(keyLenBuffer, keyLenInflater))
|
|
|
- );
|
|
|
-
|
|
|
- keyInflater = new Inflater();
|
|
|
- keyIn = new DataInputStream(new BufferedInputStream(
|
|
|
- new InflaterInputStream(keyBuffer, keyInflater)));
|
|
|
-
|
|
|
- valLenInflater = new Inflater();
|
|
|
- valLenIn = new DataInputStream(new BufferedInputStream(
|
|
|
- new InflaterInputStream(valLenBuffer, valLenInflater))
|
|
|
- );
|
|
|
+ keyLenInFilter = this.codec.createInputStream(keyLenBuffer);
|
|
|
+ keyLenIn = new DataInputStream(new BufferedInputStream(keyLenInFilter));
|
|
|
+
|
|
|
+ keyInFilter = this.codec.createInputStream(keyBuffer);
|
|
|
+ keyIn = new DataInputStream(new BufferedInputStream(keyInFilter));
|
|
|
+
|
|
|
+ valLenInFilter = this.codec.createInputStream(valLenBuffer);
|
|
|
+ valLenIn = new DataInputStream(new BufferedInputStream(valLenInFilter));
|
|
|
}
|
|
|
|
|
|
|
|
@@ -926,18 +1018,20 @@ public class SequenceFile {
|
|
|
/** Returns true if records are block-compressed. */
|
|
|
public boolean isBlockCompressed() { return blockCompressed; }
|
|
|
|
|
|
+ /** Returns the compression codec of data in this file. */
|
|
|
+ public CompressionCodec getCompressionCodec() { return codec; }
|
|
|
+
|
|
|
/** Read a compressed buffer */
|
|
|
- private synchronized void readBuffer(
|
|
|
- DataInputBuffer buffer, Inflater inflater, boolean castAway
|
|
|
- ) throws IOException {
|
|
|
+ private synchronized void readBuffer(DataInputBuffer buffer,
|
|
|
+ CompressionInputStream filter, boolean castAway) throws IOException {
|
|
|
// Read data into a temporary buffer
|
|
|
DataOutputBuffer dataBuffer = new DataOutputBuffer();
|
|
|
int dataBufferLength = WritableUtils.readVInt(in);
|
|
|
dataBuffer.write(in, dataBufferLength);
|
|
|
|
|
|
if (false == castAway) {
|
|
|
- // Reset the inflater
|
|
|
- inflater.reset();
|
|
|
+ // Reset the decompression stream's state before it is reused
|
|
|
+ filter.resetState();
|
|
|
|
|
|
// Set up 'buffer' connected to the input-stream
|
|
|
buffer.reset(dataBuffer.getData(), 0, dataBuffer.getLength());
|
|
@@ -970,14 +1064,14 @@ public class SequenceFile {
|
|
|
noBufferedRecords = WritableUtils.readVInt(in);
|
|
|
|
|
|
// Read key lengths and keys
|
|
|
- readBuffer(keyLenBuffer, keyLenInflater, false);
|
|
|
- readBuffer(keyBuffer, keyInflater, false);
|
|
|
+ readBuffer(keyLenBuffer, keyLenInFilter, false);
|
|
|
+ readBuffer(keyBuffer, keyInFilter, false);
|
|
|
noBufferedKeys = noBufferedRecords;
|
|
|
|
|
|
// Read value lengths and values
|
|
|
if (!lazyDecompress) {
|
|
|
- readBuffer(valLenBuffer, valLenInflater, false);
|
|
|
- readBuffer(valBuffer, valInflater, false);
|
|
|
+ readBuffer(valLenBuffer, valLenInFilter, false);
|
|
|
+ readBuffer(valBuffer, valInFilter, false);
|
|
|
noBufferedValues = noBufferedRecords;
|
|
|
valuesDecompressed = true;
|
|
|
}
|
|
@@ -990,14 +1084,14 @@ public class SequenceFile {
|
|
|
private synchronized void seekToCurrentValue() throws IOException {
|
|
|
if (version < BLOCK_COMPRESS_VERSION || blockCompressed == false) {
|
|
|
if (decompress) {
|
|
|
- valInflater.reset();
|
|
|
+ valInFilter.resetState();
|
|
|
}
|
|
|
} else {
|
|
|
// Check if this is the first value in the 'block' to be read
|
|
|
if (lazyDecompress && !valuesDecompressed) {
|
|
|
// Read the value lengths and values
|
|
|
- readBuffer(valLenBuffer, valLenInflater, false);
|
|
|
- readBuffer(valBuffer, valInflater, false);
|
|
|
+ readBuffer(valLenBuffer, valLenInFilter, false);
|
|
|
+ readBuffer(valBuffer, valInFilter, false);
|
|
|
noBufferedValues = noBufferedRecords;
|
|
|
valuesDecompressed = true;
|
|
|
}
|
|
@@ -1168,7 +1262,7 @@ public class SequenceFile {
|
|
|
if (!decompress || blockCompressed) {
|
|
|
val = new UncompressedBytes();
|
|
|
} else {
|
|
|
- val = new CompressedBytes();
|
|
|
+ val = new CompressedBytes(codec);
|
|
|
}
|
|
|
return val;
|
|
|
}
|
|
@@ -1418,6 +1512,7 @@ public class SequenceFile {
|
|
|
boolean atEof = (currentFile >= inFiles.length);
|
|
|
boolean isCompressed = false;
|
|
|
boolean isBlockCompressed = false;
|
|
|
+ CompressionCodec codec = null;
|
|
|
segmentLengths.clear();
|
|
|
if (atEof) {
|
|
|
return 0;
|
|
@@ -1427,6 +1522,8 @@ public class SequenceFile {
|
|
|
in = new Reader(fs, inFiles[currentFile], conf);
|
|
|
isCompressed = in.isCompressed();
|
|
|
isBlockCompressed = in.isBlockCompressed();
|
|
|
+ codec = in.getCompressionCodec();
|
|
|
+
|
|
|
for (int i=0; i < rawValues.length; ++i) {
|
|
|
rawValues[i] = null;
|
|
|
}
|
|
@@ -1479,7 +1576,7 @@ public class SequenceFile {
|
|
|
LOG.debug("flushing segment " + segments);
|
|
|
rawBuffer = rawKeys.getData();
|
|
|
sort(count);
|
|
|
- flush(count, bytesProcessed, isCompressed, isBlockCompressed,
|
|
|
+ flush(count, bytesProcessed, isCompressed, isBlockCompressed, codec,
|
|
|
segments==0 && atEof);
|
|
|
segments++;
|
|
|
}
|
|
@@ -1523,7 +1620,8 @@ public class SequenceFile {
|
|
|
}
|
|
|
|
|
|
private void flush(int count, int bytesProcessed, boolean isCompressed,
|
|
|
- boolean isBlockCompressed, boolean done) throws IOException {
|
|
|
+ boolean isBlockCompressed, CompressionCodec codec, boolean done)
|
|
|
+ throws IOException {
|
|
|
if (out == null) {
|
|
|
outName = done ? outFile : outFile.suffix(".0");
|
|
|
out = fs.create(outName);
|
|
@@ -1534,7 +1632,7 @@ public class SequenceFile {
|
|
|
|
|
|
long segmentStart = out.getPos();
|
|
|
Writer writer = createWriter(out, keyClass, valClass,
|
|
|
- isCompressed, isBlockCompressed);
|
|
|
+ isCompressed, isBlockCompressed, codec);
|
|
|
|
|
|
if (!done) {
|
|
|
writer.sync = null; // disable sync on temp files
|
|
@@ -1757,11 +1855,13 @@ public class SequenceFile {
|
|
|
private boolean done;
|
|
|
private boolean compress;
|
|
|
private boolean blockCompress;
|
|
|
+ private CompressionCodec codec = null;
|
|
|
|
|
|
public void put(MergeStream stream) throws IOException {
|
|
|
if (size() == 0) {
|
|
|
compress = stream.in.isCompressed();
|
|
|
blockCompress = stream.in.isBlockCompressed();
|
|
|
+ codec = stream.in.getCompressionCodec();
|
|
|
} else if (compress != stream.in.isCompressed() ||
|
|
|
blockCompress != stream.in.isBlockCompressed()) {
|
|
|
throw new IOException("All merged files must be compressed or not.");
|
|
@@ -1790,7 +1890,7 @@ public class SequenceFile {
|
|
|
|
|
|
public void merge() throws IOException {
|
|
|
Writer writer = createWriter(out, keyClass, valClass,
|
|
|
- compress, blockCompress);
|
|
|
+ compress, blockCompress, codec);
|
|
|
if (!done) {
|
|
|
writer.sync = null; // disable sync on temp files
|
|
|
}
|