|
@@ -252,22 +252,23 @@ public class SequenceFile {
|
|
|
*/
|
|
|
public static Writer createWriter(Configuration conf, Writer.Option... opts
|
|
|
) throws IOException {
|
|
|
- Writer.CompressionTypeOption compressionOption =
|
|
|
- Options.getOption(Writer.CompressionTypeOption.class, opts);
|
|
|
+ Writer.CompressionOption compressionOption =
|
|
|
+ Options.getOption(Writer.CompressionOption.class, opts);
|
|
|
CompressionType kind;
|
|
|
if (compressionOption != null) {
|
|
|
kind = compressionOption.getValue();
|
|
|
} else {
|
|
|
kind = getDefaultCompressionType(conf);
|
|
|
+ opts = Options.prependOptions(opts, Writer.compression(kind));
|
|
|
}
|
|
|
switch (kind) {
|
|
|
- default:
|
|
|
- case NONE:
|
|
|
- return new Writer(conf, kind, opts);
|
|
|
- case RECORD:
|
|
|
- return new RecordCompressWriter(conf, kind, opts);
|
|
|
- case BLOCK:
|
|
|
- return new BlockCompressWriter(conf, kind, opts);
|
|
|
+ default:
|
|
|
+ case NONE:
|
|
|
+ return new Writer(conf, opts);
|
|
|
+ case RECORD:
|
|
|
+ return new RecordCompressWriter(conf, opts);
|
|
|
+ case BLOCK:
|
|
|
+ return new BlockCompressWriter(conf, opts);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -311,7 +312,7 @@ public class SequenceFile {
|
|
|
CompressionType compressionType) throws IOException {
|
|
|
return createWriter(conf, Writer.file(name), Writer.keyClass(keyClass),
|
|
|
Writer.valueClass(valClass),
|
|
|
- Writer.compressionType(compressionType));
|
|
|
+ Writer.compression(compressionType));
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -335,7 +336,7 @@ public class SequenceFile {
|
|
|
Progressable progress) throws IOException {
|
|
|
return createWriter(conf, Writer.file(name), Writer.keyClass(keyClass),
|
|
|
Writer.valueClass(valClass),
|
|
|
- Writer.compressionType(compressionType),
|
|
|
+ Writer.compression(compressionType),
|
|
|
Writer.progressable(progress));
|
|
|
}
|
|
|
|
|
@@ -360,8 +361,7 @@ public class SequenceFile {
|
|
|
CompressionCodec codec) throws IOException {
|
|
|
return createWriter(conf, Writer.file(name), Writer.keyClass(keyClass),
|
|
|
Writer.valueClass(valClass),
|
|
|
- Writer.compressionType(compressionType),
|
|
|
- Writer.compressionCodec(codec));
|
|
|
+ Writer.compression(compressionType, codec));
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -388,8 +388,7 @@ public class SequenceFile {
|
|
|
Progressable progress, Metadata metadata) throws IOException {
|
|
|
return createWriter(conf, Writer.file(name), Writer.keyClass(keyClass),
|
|
|
Writer.valueClass(valClass),
|
|
|
- Writer.compressionType(compressionType),
|
|
|
- Writer.compressionCodec(codec),
|
|
|
+ Writer.compression(compressionType, codec),
|
|
|
Writer.progressable(progress),
|
|
|
Writer.metadata(metadata));
|
|
|
}
|
|
@@ -425,8 +424,7 @@ public class SequenceFile {
|
|
|
Writer.bufferSize(bufferSize),
|
|
|
Writer.replication(replication),
|
|
|
Writer.blockSize(blockSize),
|
|
|
- Writer.compressionType(compressionType),
|
|
|
- Writer.compressionCodec(codec),
|
|
|
+ Writer.compression(compressionType, codec),
|
|
|
Writer.progressable(progress),
|
|
|
Writer.metadata(metadata));
|
|
|
}
|
|
@@ -454,8 +452,7 @@ public class SequenceFile {
|
|
|
Progressable progress) throws IOException {
|
|
|
return createWriter(conf, Writer.file(name), Writer.keyClass(keyClass),
|
|
|
Writer.valueClass(valClass),
|
|
|
- Writer.compressionType(compressionType),
|
|
|
- Writer.compressionCodec(codec),
|
|
|
+ Writer.compression(compressionType, codec),
|
|
|
Writer.progressable(progress));
|
|
|
}
|
|
|
|
|
@@ -481,8 +478,7 @@ public class SequenceFile {
|
|
|
CompressionCodec codec, Metadata metadata) throws IOException {
|
|
|
return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass),
|
|
|
Writer.valueClass(valClass),
|
|
|
- Writer.compressionType(compressionType),
|
|
|
- Writer.compressionCodec(codec),
|
|
|
+ Writer.compression(compressionType, codec),
|
|
|
Writer.metadata(metadata));
|
|
|
}
|
|
|
|
|
@@ -506,8 +502,7 @@ public class SequenceFile {
|
|
|
CompressionCodec codec) throws IOException {
|
|
|
return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass),
|
|
|
Writer.valueClass(valClass),
|
|
|
- Writer.compressionType(compressionType),
|
|
|
- Writer.compressionCodec(codec));
|
|
|
+ Writer.compression(compressionType, codec));
|
|
|
}
|
|
|
|
|
|
|
|
@@ -839,23 +834,23 @@ public class SequenceFile {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private static class CompressionTypeOption implements Option {
|
|
|
+ private static class CompressionOption implements Option {
|
|
|
private final CompressionType value;
|
|
|
- CompressionTypeOption(CompressionType value) {
|
|
|
+ private final CompressionCodec codec;
|
|
|
+ CompressionOption(CompressionType value) {
|
|
|
+ this(value, null);
|
|
|
+ }
|
|
|
+ CompressionOption(CompressionType value, CompressionCodec codec) {
|
|
|
this.value = value;
|
|
|
+ this.codec = (CompressionType.NONE != value && null == codec)
|
|
|
+ ? new DefaultCodec()
|
|
|
+ : codec;
|
|
|
}
|
|
|
CompressionType getValue() {
|
|
|
return value;
|
|
|
}
|
|
|
- }
|
|
|
-
|
|
|
- private static class CompressionCodecOption implements Option {
|
|
|
- private final CompressionCodec value;
|
|
|
- CompressionCodecOption(CompressionCodec value) {
|
|
|
- this.value = value;
|
|
|
- }
|
|
|
- CompressionCodec getValue() {
|
|
|
- return value;
|
|
|
+ CompressionCodec getCodec() {
|
|
|
+ return codec;
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -895,25 +890,23 @@ public class SequenceFile {
|
|
|
return new MetadataOption(value);
|
|
|
}
|
|
|
|
|
|
- public static Option compressionType(CompressionType value) {
|
|
|
- return new CompressionTypeOption(value);
|
|
|
- }
|
|
|
-
|
|
|
- public static Option compressionCodec(CompressionCodec value) {
|
|
|
- return new CompressionCodecOption(value);
|
|
|
+ public static Option compression(CompressionType value) {
|
|
|
+ return new CompressionOption(value);
|
|
|
}
|
|
|
|
|
|
+ public static Option compression(CompressionType value,
|
|
|
+ CompressionCodec codec) {
|
|
|
+ return new CompressionOption(value, codec);
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Construct an uncompressed writer from a set of options.
|
|
|
* @param conf the configuration to use
|
|
|
- * @param compressionType the compression type being used
|
|
|
* @param options the options used when creating the writer
|
|
|
* @throws IOException if it fails
|
|
|
*/
|
|
|
Writer(Configuration conf,
|
|
|
- CompressionType compressionType,
|
|
|
Option... opts) throws IOException {
|
|
|
- this.compress = compressionType;
|
|
|
BlockSizeOption blockSizeOption =
|
|
|
Options.getOption(BlockSizeOption.class, opts);
|
|
|
BufferSizeOption bufferSizeOption =
|
|
@@ -928,10 +921,10 @@ public class SequenceFile {
|
|
|
Options.getOption(KeyClassOption.class, opts);
|
|
|
ValueClassOption valueClassOption =
|
|
|
Options.getOption(ValueClassOption.class, opts);
|
|
|
- CompressionCodecOption compressionCodecOption =
|
|
|
- Options.getOption(CompressionCodecOption.class, opts);
|
|
|
MetadataOption metadataOption =
|
|
|
Options.getOption(MetadataOption.class, opts);
|
|
|
+ CompressionOption compressionTypeOption =
|
|
|
+ Options.getOption(CompressionOption.class, opts);
|
|
|
// check consistency of options
|
|
|
if ((fileOption == null) == (streamOption == null)) {
|
|
|
throw new IllegalArgumentException("file or stream must be specified");
|
|
@@ -968,13 +961,8 @@ public class SequenceFile {
|
|
|
Object.class : valueClassOption.getValue();
|
|
|
Metadata metadata = metadataOption == null ?
|
|
|
new Metadata() : metadataOption.getValue();
|
|
|
- CompressionCodec codec;
|
|
|
- if (compressionType == CompressionType.NONE) {
|
|
|
- codec = null;
|
|
|
- } else {
|
|
|
- codec = compressionCodecOption == null ?
|
|
|
- new DefaultCodec() : compressionCodecOption.getValue();
|
|
|
- }
|
|
|
+ this.compress = compressionTypeOption.getValue();
|
|
|
+ final CompressionCodec codec = compressionTypeOption.getCodec();
|
|
|
if (codec != null &&
|
|
|
(codec instanceof GzipCodec) &&
|
|
|
!NativeCodeLoader.isNativeCodeLoaded() &&
|
|
@@ -1207,9 +1195,8 @@ public class SequenceFile {
|
|
|
static class RecordCompressWriter extends Writer {
|
|
|
|
|
|
RecordCompressWriter(Configuration conf,
|
|
|
- CompressionType compressionType,
|
|
|
Option... options) throws IOException {
|
|
|
- super(conf, compressionType, options);
|
|
|
+ super(conf, options);
|
|
|
}
|
|
|
|
|
|
/** Append a key/value pair. */
|
|
@@ -1276,9 +1263,8 @@ public class SequenceFile {
|
|
|
private final int compressionBlockSize;
|
|
|
|
|
|
BlockCompressWriter(Configuration conf,
|
|
|
- CompressionType compressionType,
|
|
|
Option... options) throws IOException {
|
|
|
- super(conf, compressionType, options);
|
|
|
+ super(conf, options);
|
|
|
compressionBlockSize =
|
|
|
conf.getInt("io.seqfile.compress.blocksize", 1000000);
|
|
|
keySerializer.close();
|
|
@@ -2756,14 +2742,10 @@ public class SequenceFile {
|
|
|
}
|
|
|
|
|
|
long segmentStart = out.getPos();
|
|
|
- Writer writer = createWriter(conf,
|
|
|
- Writer.stream(out),
|
|
|
- Writer.keyClass(keyClass),
|
|
|
- Writer.valueClass(valClass),
|
|
|
- Writer.compressionType(compressionType),
|
|
|
- Writer.compressionCodec(codec),
|
|
|
- Writer.metadata(done ? metadata :
|
|
|
- new Metadata()));
|
|
|
+ Writer writer = createWriter(conf, Writer.stream(out),
|
|
|
+ Writer.keyClass(keyClass), Writer.valueClass(valClass),
|
|
|
+ Writer.compression(compressionType, codec),
|
|
|
+ Writer.metadata(done ? metadata : new Metadata()));
|
|
|
|
|
|
if (!done) {
|
|
|
writer.sync = null; // disable sync on temp files
|
|
@@ -2943,8 +2925,7 @@ public class SequenceFile {
|
|
|
Writer.file(outputFile),
|
|
|
Writer.keyClass(keyClass),
|
|
|
Writer.valueClass(valClass),
|
|
|
- Writer.compressionType(compress),
|
|
|
- Writer.compressionCodec(codec),
|
|
|
+ Writer.compression(compress, codec),
|
|
|
Writer.progressable(prog));
|
|
|
return writer;
|
|
|
}
|