|
@@ -406,6 +406,55 @@ public class SequenceFile {
|
|
|
return writer;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Construct the preferred type of SequenceFile Writer.
|
|
|
+ * @param fs The configured filesystem.
|
|
|
+ * @param conf The configuration.
|
|
|
+ * @param name The name of the file.
|
|
|
+ * @param keyClass The 'key' type.
|
|
|
+ * @param valClass The 'value' type.
|
|
|
+ * @param bufferSize buffer size for the underlaying outputstream.
|
|
|
+ * @param replication replication factor for the file.
|
|
|
+ * @param blockSize block size for the file.
|
|
|
+ * @param createParent create parent directory if non-existent
|
|
|
+ * @param compressionType The compression type.
|
|
|
+ * @param codec The compression codec.
|
|
|
+ * @param progress The Progressable object to track progress.
|
|
|
+ * @param metadata The metadata of the file.
|
|
|
+ * @return Returns the handle to the constructed SequenceFile Writer.
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ public static Writer
|
|
|
+ createWriter(FileSystem fs, Configuration conf, Path name,
|
|
|
+ Class keyClass, Class valClass, int bufferSize,
|
|
|
+ short replication, long blockSize, boolean createParent,
|
|
|
+ CompressionType compressionType, CompressionCodec codec,
|
|
|
+ Metadata metadata) throws IOException {
|
|
|
+ if ((codec instanceof GzipCodec) &&
|
|
|
+ !NativeCodeLoader.isNativeCodeLoaded() &&
|
|
|
+ !ZlibFactory.isNativeZlibLoaded(conf)) {
|
|
|
+ throw new IllegalArgumentException("SequenceFile doesn't work with " +
|
|
|
+ "GzipCodec without native-hadoop code!");
|
|
|
+ }
|
|
|
+
|
|
|
+ switch (compressionType) {
|
|
|
+ case NONE:
|
|
|
+ return new Writer(conf,
|
|
|
+ fs.createNonRecursive(name, true, bufferSize, replication, blockSize, null),
|
|
|
+ keyClass, valClass, metadata).ownStream();
|
|
|
+ case RECORD:
|
|
|
+ return new RecordCompressWriter(conf,
|
|
|
+ fs.createNonRecursive(name, true, bufferSize, replication, blockSize, null),
|
|
|
+ keyClass, valClass, codec, metadata).ownStream();
|
|
|
+ case BLOCK:
|
|
|
+ return new BlockCompressWriter(conf,
|
|
|
+ fs.createNonRecursive(name, true, bufferSize, replication, blockSize, null),
|
|
|
+ keyClass, valClass, codec, metadata).ownStream();
|
|
|
+ default:
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Construct the preferred type of SequenceFile Writer.
|
|
|
* @param fs The configured filesystem.
|
|
@@ -849,7 +898,7 @@ public class SequenceFile {
|
|
|
}
|
|
|
|
|
|
/** Write to an arbitrary stream using a specified buffer size. */
|
|
|
- private Writer(Configuration conf, FSDataOutputStream out,
|
|
|
+ Writer(Configuration conf, FSDataOutputStream out,
|
|
|
Class keyClass, Class valClass, Metadata metadata)
|
|
|
throws IOException {
|
|
|
this.ownOutputStream = false;
|
|
@@ -876,6 +925,8 @@ public class SequenceFile {
|
|
|
boolean isCompressed() { return compress; }
|
|
|
boolean isBlockCompressed() { return false; }
|
|
|
|
|
|
+ Writer ownStream() { this.ownOutputStream = true; return this; }
|
|
|
+
|
|
|
/** Write and flush the file header. */
|
|
|
void writeFileHeader()
|
|
|
throws IOException {
|
|
@@ -1096,7 +1147,7 @@ public class SequenceFile {
|
|
|
}
|
|
|
|
|
|
/** Write to an arbitrary stream using a specified buffer size. */
|
|
|
- private RecordCompressWriter(Configuration conf, FSDataOutputStream out,
|
|
|
+ RecordCompressWriter(Configuration conf, FSDataOutputStream out,
|
|
|
Class keyClass, Class valClass, CompressionCodec codec, Metadata metadata)
|
|
|
throws IOException {
|
|
|
this.ownOutputStream = false;
|
|
@@ -1221,12 +1272,12 @@ public class SequenceFile {
|
|
|
}
|
|
|
|
|
|
/** Write to an arbitrary stream using a specified buffer size. */
|
|
|
- private BlockCompressWriter(Configuration conf, FSDataOutputStream out,
|
|
|
+ BlockCompressWriter(Configuration conf, FSDataOutputStream out,
|
|
|
Class keyClass, Class valClass, CompressionCodec codec, Metadata metadata)
|
|
|
throws IOException {
|
|
|
this.ownOutputStream = false;
|
|
|
super.init(null, conf, out, keyClass, valClass, true, codec, metadata);
|
|
|
- init(1000000);
|
|
|
+ init(conf.getInt("io.seqfile.compress.blocksize", 1000000));
|
|
|
|
|
|
initializeFileHeader();
|
|
|
writeFileHeader();
|