|
@@ -24,6 +24,7 @@ import java.io.*;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
+import org.apache.hadoop.util.Options;
|
|
|
import org.apache.hadoop.fs.*;
|
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
|
import org.apache.hadoop.classification.InterfaceStability;
|
|
@@ -31,6 +32,8 @@ import org.apache.hadoop.conf.*;
|
|
|
import org.apache.hadoop.util.Progressable;
|
|
|
import org.apache.hadoop.util.ReflectionUtils;
|
|
|
import org.apache.hadoop.io.SequenceFile.CompressionType;
|
|
|
+import org.apache.hadoop.io.SequenceFile.Reader;
|
|
|
+import org.apache.hadoop.io.SequenceFile.Writer;
|
|
|
import org.apache.hadoop.io.compress.CompressionCodec;
|
|
|
import org.apache.hadoop.io.compress.DefaultCodec;
|
|
|
|
|
@@ -91,94 +94,194 @@ public class MapFile {
|
|
|
private long lastIndexKeyCount = Long.MIN_VALUE;
|
|
|
|
|
|
|
|
|
- /** Create the named map for keys of the named class. */
|
|
|
+ /** Create the named map for keys of the named class.
|
|
|
+ * @deprecated Use Writer(Configuration, Path, Option...) instead.
|
|
|
+ */
|
|
|
+ @Deprecated
|
|
|
public Writer(Configuration conf, FileSystem fs, String dirName,
|
|
|
- Class<? extends WritableComparable> keyClass, Class valClass)
|
|
|
- throws IOException {
|
|
|
- this(conf, fs, dirName,
|
|
|
- WritableComparator.get(keyClass), valClass,
|
|
|
- SequenceFile.getCompressionType(conf));
|
|
|
+ Class<? extends WritableComparable> keyClass,
|
|
|
+ Class valClass) throws IOException {
|
|
|
+ this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass));
|
|
|
}
|
|
|
|
|
|
- /** Create the named map for keys of the named class. */
|
|
|
+ /** Create the named map for keys of the named class.
|
|
|
+ * @deprecated Use Writer(Configuration, Path, Option...) instead.
|
|
|
+ */
|
|
|
+ @Deprecated
|
|
|
public Writer(Configuration conf, FileSystem fs, String dirName,
|
|
|
Class<? extends WritableComparable> keyClass, Class valClass,
|
|
|
- CompressionType compress, Progressable progress)
|
|
|
- throws IOException {
|
|
|
- this(conf, fs, dirName, WritableComparator.get(keyClass), valClass,
|
|
|
- compress, progress);
|
|
|
+ CompressionType compress,
|
|
|
+ Progressable progress) throws IOException {
|
|
|
+ this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass),
|
|
|
+ compressionType(compress), progressable(progress));
|
|
|
}
|
|
|
|
|
|
- /** Create the named map for keys of the named class. */
|
|
|
+ /** Create the named map for keys of the named class.
|
|
|
+ * @deprecated Use Writer(Configuration, Path, Option...) instead.
|
|
|
+ */
|
|
|
+ @Deprecated
|
|
|
public Writer(Configuration conf, FileSystem fs, String dirName,
|
|
|
Class<? extends WritableComparable> keyClass, Class valClass,
|
|
|
CompressionType compress, CompressionCodec codec,
|
|
|
- Progressable progress)
|
|
|
- throws IOException {
|
|
|
- this(conf, fs, dirName, WritableComparator.get(keyClass), valClass,
|
|
|
- compress, codec, progress);
|
|
|
+ Progressable progress) throws IOException {
|
|
|
+ this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass),
|
|
|
+ compressionType(compress), compressionCodec(codec),
|
|
|
+ progressable(progress));
|
|
|
}
|
|
|
|
|
|
- /** Create the named map for keys of the named class. */
|
|
|
+ /** Create the named map for keys of the named class.
|
|
|
+ * @deprecated Use Writer(Configuration, Path, Option...) instead.
|
|
|
+ */
|
|
|
+ @Deprecated
|
|
|
public Writer(Configuration conf, FileSystem fs, String dirName,
|
|
|
Class<? extends WritableComparable> keyClass, Class valClass,
|
|
|
- CompressionType compress)
|
|
|
- throws IOException {
|
|
|
- this(conf, fs, dirName, WritableComparator.get(keyClass), valClass, compress);
|
|
|
+ CompressionType compress) throws IOException {
|
|
|
+ this(conf, new Path(dirName), keyClass(keyClass),
|
|
|
+ valueClass(valClass), compressionType(compress));
|
|
|
}
|
|
|
|
|
|
- /** Create the named map using the named key comparator. */
|
|
|
+ /** Create the named map using the named key comparator.
|
|
|
+ * @deprecated Use Writer(Configuration, Path, Option...) instead.
|
|
|
+ */
|
|
|
+ @Deprecated
|
|
|
public Writer(Configuration conf, FileSystem fs, String dirName,
|
|
|
- WritableComparator comparator, Class valClass)
|
|
|
- throws IOException {
|
|
|
- this(conf, fs, dirName, comparator, valClass,
|
|
|
- SequenceFile.getCompressionType(conf));
|
|
|
+ WritableComparator comparator, Class valClass
|
|
|
+ ) throws IOException {
|
|
|
+ this(conf, new Path(dirName), comparator(comparator),
|
|
|
+ valueClass(valClass));
|
|
|
}
|
|
|
- /** Create the named map using the named key comparator. */
|
|
|
+
|
|
|
+ /** Create the named map using the named key comparator.
|
|
|
+ * @deprecated Use Writer(Configuration, Path, Option...) instead.
|
|
|
+ */
|
|
|
+ @Deprecated
|
|
|
public Writer(Configuration conf, FileSystem fs, String dirName,
|
|
|
WritableComparator comparator, Class valClass,
|
|
|
- SequenceFile.CompressionType compress)
|
|
|
- throws IOException {
|
|
|
- this(conf, fs, dirName, comparator, valClass, compress, null);
|
|
|
+ SequenceFile.CompressionType compress) throws IOException {
|
|
|
+ this(conf, new Path(dirName), comparator(comparator),
|
|
|
+ valueClass(valClass), compressionType(compress));
|
|
|
}
|
|
|
- /** Create the named map using the named key comparator. */
|
|
|
+
|
|
|
+ /** Create the named map using the named key comparator.
|
|
|
+ * @deprecated Use Writer(Configuration, Path, Option...)} instead.
|
|
|
+ */
|
|
|
+ @Deprecated
|
|
|
public Writer(Configuration conf, FileSystem fs, String dirName,
|
|
|
WritableComparator comparator, Class valClass,
|
|
|
SequenceFile.CompressionType compress,
|
|
|
- Progressable progress)
|
|
|
- throws IOException {
|
|
|
- this(conf, fs, dirName, comparator, valClass,
|
|
|
- compress, new DefaultCodec(), progress);
|
|
|
+ Progressable progress) throws IOException {
|
|
|
+ this(conf, new Path(dirName), comparator(comparator),
|
|
|
+ valueClass(valClass), compressionType(compress),
|
|
|
+ progressable(progress));
|
|
|
}
|
|
|
- /** Create the named map using the named key comparator. */
|
|
|
+
|
|
|
+ /** Create the named map using the named key comparator.
|
|
|
+ * @deprecated Use Writer(Configuration, Path, Option...) instead.
|
|
|
+ */
|
|
|
+ @Deprecated
|
|
|
public Writer(Configuration conf, FileSystem fs, String dirName,
|
|
|
WritableComparator comparator, Class valClass,
|
|
|
SequenceFile.CompressionType compress, CompressionCodec codec,
|
|
|
- Progressable progress)
|
|
|
- throws IOException {
|
|
|
+ Progressable progress) throws IOException {
|
|
|
+ this(conf, new Path(dirName), comparator(comparator),
|
|
|
+ valueClass(valClass), compressionType(compress),
|
|
|
+ compressionCodec(codec), progressable(progress));
|
|
|
+ }
|
|
|
+
|
|
|
+ // our options are a superset of sequence file writer options
|
|
|
+ public static interface Option extends SequenceFile.Writer.Option { }
|
|
|
+
|
|
|
+ private static class KeyClassOption extends Options.ClassOption
|
|
|
+ implements Option {
|
|
|
+ KeyClassOption(Class<?> value) {
|
|
|
+ super(value);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private static class ComparatorOption implements Option {
|
|
|
+ private final WritableComparator value;
|
|
|
+ ComparatorOption(WritableComparator value) {
|
|
|
+ this.value = value;
|
|
|
+ }
|
|
|
+ WritableComparator getValue() {
|
|
|
+ return value;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public static Option keyClass(Class<? extends WritableComparable> value) {
|
|
|
+ return new KeyClassOption(value);
|
|
|
+ }
|
|
|
+
|
|
|
+ public static Option comparator(WritableComparator value) {
|
|
|
+ return new ComparatorOption(value);
|
|
|
+ }
|
|
|
|
|
|
+ public static SequenceFile.Writer.Option valueClass(Class<?> value) {
|
|
|
+ return SequenceFile.Writer.valueClass(value);
|
|
|
+ }
|
|
|
+
|
|
|
+ public static
|
|
|
+ SequenceFile.Writer.Option compressionType(CompressionType value) {
|
|
|
+ return SequenceFile.Writer.compressionType(value);
|
|
|
+ }
|
|
|
+
|
|
|
+ public static
|
|
|
+ SequenceFile.Writer.Option compressionCodec(CompressionCodec value) {
|
|
|
+ return SequenceFile.Writer.compressionCodec(value);
|
|
|
+ }
|
|
|
+
|
|
|
+ public static SequenceFile.Writer.Option progressable(Progressable value) {
|
|
|
+ return SequenceFile.Writer.progressable(value);
|
|
|
+ }
|
|
|
+
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ public Writer(Configuration conf,
|
|
|
+ Path dirName,
|
|
|
+ SequenceFile.Writer.Option... opts
|
|
|
+ ) throws IOException {
|
|
|
+ KeyClassOption keyClassOption =
|
|
|
+ Options.getOption(KeyClassOption.class, opts);
|
|
|
+ ComparatorOption comparatorOption =
|
|
|
+ Options.getOption(ComparatorOption.class, opts);
|
|
|
+ if ((keyClassOption == null) == (comparatorOption == null)) {
|
|
|
+ throw new IllegalArgumentException("key class or comparator option "
|
|
|
+ + "must be set");
|
|
|
+ }
|
|
|
this.indexInterval = conf.getInt(INDEX_INTERVAL, this.indexInterval);
|
|
|
|
|
|
- this.comparator = comparator;
|
|
|
+ Class<? extends WritableComparable> keyClass;
|
|
|
+ if (keyClassOption == null) {
|
|
|
+ this.comparator = comparatorOption.getValue();
|
|
|
+ keyClass = comparator.getKeyClass();
|
|
|
+ } else {
|
|
|
+ keyClass=
|
|
|
+ (Class<? extends WritableComparable>) keyClassOption.getValue();
|
|
|
+ this.comparator = WritableComparator.get(keyClass);
|
|
|
+ }
|
|
|
this.lastKey = comparator.newKey();
|
|
|
+ FileSystem fs = dirName.getFileSystem(conf);
|
|
|
|
|
|
- Path dir = new Path(dirName);
|
|
|
- if (!fs.mkdirs(dir)) {
|
|
|
- throw new IOException("Mkdirs failed to create directory " + dir.toString());
|
|
|
+ if (!fs.mkdirs(dirName)) {
|
|
|
+ throw new IOException("Mkdirs failed to create directory " + dirName);
|
|
|
}
|
|
|
- Path dataFile = new Path(dir, DATA_FILE_NAME);
|
|
|
- Path indexFile = new Path(dir, INDEX_FILE_NAME);
|
|
|
+ Path dataFile = new Path(dirName, DATA_FILE_NAME);
|
|
|
+ Path indexFile = new Path(dirName, INDEX_FILE_NAME);
|
|
|
|
|
|
- Class keyClass = comparator.getKeyClass();
|
|
|
- this.data =
|
|
|
- SequenceFile.createWriter
|
|
|
- (fs, conf, dataFile, keyClass, valClass, compress, codec, progress);
|
|
|
- this.index =
|
|
|
- SequenceFile.createWriter
|
|
|
- (fs, conf, indexFile, keyClass, LongWritable.class,
|
|
|
- CompressionType.BLOCK, progress);
|
|
|
+ SequenceFile.Writer.Option[] dataOptions =
|
|
|
+ Options.prependOptions(opts,
|
|
|
+ SequenceFile.Writer.file(dataFile),
|
|
|
+ SequenceFile.Writer.keyClass(keyClass));
|
|
|
+ this.data = SequenceFile.createWriter(conf, dataOptions);
|
|
|
+
|
|
|
+ SequenceFile.Writer.Option[] indexOptions =
|
|
|
+ Options.prependOptions(opts,
|
|
|
+ SequenceFile.Writer.file(indexFile),
|
|
|
+ SequenceFile.Writer.keyClass(keyClass),
|
|
|
+ SequenceFile.Writer.valueClass(LongWritable.class),
|
|
|
+ SequenceFile.Writer.compressionType(CompressionType.BLOCK));
|
|
|
+ this.index = SequenceFile.createWriter(conf, indexOptions);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/** The number of entries that are added before an index entry is added.*/
|
|
|
public int getIndexInterval() { return indexInterval; }
|
|
|
|
|
@@ -269,58 +372,86 @@ public class MapFile {
|
|
|
/** Returns the class of values in this file. */
|
|
|
public Class<?> getValueClass() { return data.getValueClass(); }
|
|
|
|
|
|
- /** Construct a map reader for the named map.*/
|
|
|
- public Reader(FileSystem fs, String dirName, Configuration conf) throws IOException {
|
|
|
- this(fs, dirName, null, conf);
|
|
|
- INDEX_SKIP = conf.getInt("io.map.index.skip", 0);
|
|
|
+ public static interface Option extends SequenceFile.Reader.Option {}
|
|
|
+
|
|
|
+ public static Option comparator(WritableComparator value) {
|
|
|
+ return new ComparatorOption(value);
|
|
|
}
|
|
|
|
|
|
- /** Construct a map reader for the named map using the named comparator.*/
|
|
|
- public Reader(FileSystem fs, String dirName, WritableComparator comparator, Configuration conf)
|
|
|
- throws IOException {
|
|
|
- this(fs, dirName, comparator, conf, true);
|
|
|
+ static class ComparatorOption implements Option {
|
|
|
+ private final WritableComparator value;
|
|
|
+ ComparatorOption(WritableComparator value) {
|
|
|
+ this.value = value;
|
|
|
+ }
|
|
|
+ WritableComparator getValue() {
|
|
|
+ return value;
|
|
|
+ }
|
|
|
}
|
|
|
-
|
|
|
- /**
|
|
|
- * Hook to allow subclasses to defer opening streams until further
|
|
|
- * initialization is complete.
|
|
|
- * @see #createDataFileReader(FileSystem, Path, Configuration)
|
|
|
+
|
|
|
+ public Reader(Path dir, Configuration conf,
|
|
|
+ SequenceFile.Reader.Option... opts) throws IOException {
|
|
|
+ ComparatorOption comparatorOption =
|
|
|
+ Options.getOption(ComparatorOption.class, opts);
|
|
|
+ WritableComparator comparator =
|
|
|
+ comparatorOption == null ? null : comparatorOption.getValue();
|
|
|
+ INDEX_SKIP = conf.getInt("io.map.index.skip", 0);
|
|
|
+ open(dir, comparator, conf, opts);
|
|
|
+ }
|
|
|
+
|
|
|
+ /** Construct a map reader for the named map.
|
|
|
+ * @deprecated
|
|
|
*/
|
|
|
- protected Reader(FileSystem fs, String dirName,
|
|
|
- WritableComparator comparator, Configuration conf, boolean open)
|
|
|
- throws IOException {
|
|
|
-
|
|
|
- if (open) {
|
|
|
- open(fs, dirName, comparator, conf);
|
|
|
- }
|
|
|
+ @Deprecated
|
|
|
+ public Reader(FileSystem fs, String dirName,
|
|
|
+ Configuration conf) throws IOException {
|
|
|
+ this(new Path(dirName), conf);
|
|
|
+ }
|
|
|
+
|
|
|
+ /** Construct a map reader for the named map using the named comparator.
|
|
|
+ * @deprecated
|
|
|
+ */
|
|
|
+ @Deprecated
|
|
|
+ public Reader(FileSystem fs, String dirName, WritableComparator comparator,
|
|
|
+ Configuration conf) throws IOException {
|
|
|
+ this(new Path(dirName), conf, comparator(comparator));
|
|
|
}
|
|
|
|
|
|
- protected synchronized void open(FileSystem fs, String dirName,
|
|
|
- WritableComparator comparator, Configuration conf) throws IOException {
|
|
|
- Path dir = new Path(dirName);
|
|
|
+ protected synchronized void open(Path dir,
|
|
|
+ WritableComparator comparator,
|
|
|
+ Configuration conf,
|
|
|
+ SequenceFile.Reader.Option... options
|
|
|
+ ) throws IOException {
|
|
|
Path dataFile = new Path(dir, DATA_FILE_NAME);
|
|
|
Path indexFile = new Path(dir, INDEX_FILE_NAME);
|
|
|
|
|
|
// open the data
|
|
|
- this.data = createDataFileReader(fs, dataFile, conf);
|
|
|
+ this.data = createDataFileReader(dataFile, conf, options);
|
|
|
this.firstPosition = data.getPosition();
|
|
|
|
|
|
if (comparator == null)
|
|
|
- this.comparator = WritableComparator.get(data.getKeyClass().asSubclass(WritableComparable.class));
|
|
|
+ this.comparator =
|
|
|
+ WritableComparator.get(data.getKeyClass().
|
|
|
+ asSubclass(WritableComparable.class));
|
|
|
else
|
|
|
this.comparator = comparator;
|
|
|
|
|
|
// open the index
|
|
|
- this.index = new SequenceFile.Reader(fs, indexFile, conf);
|
|
|
+ SequenceFile.Reader.Option[] indexOptions =
|
|
|
+ Options.prependOptions(options, SequenceFile.Reader.file(indexFile));
|
|
|
+ this.index = new SequenceFile.Reader(conf, indexOptions);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Override this method to specialize the type of
|
|
|
* {@link SequenceFile.Reader} returned.
|
|
|
*/
|
|
|
- protected SequenceFile.Reader createDataFileReader(FileSystem fs,
|
|
|
- Path dataFile, Configuration conf) throws IOException {
|
|
|
- return new SequenceFile.Reader(fs, dataFile, conf);
|
|
|
+ protected SequenceFile.Reader
|
|
|
+ createDataFileReader(Path dataFile, Configuration conf,
|
|
|
+ SequenceFile.Reader.Option... options
|
|
|
+ ) throws IOException {
|
|
|
+ SequenceFile.Reader.Option[] newOptions =
|
|
|
+ Options.prependOptions(options, SequenceFile.Reader.file(dataFile));
|
|
|
+ return new SequenceFile.Reader(conf, newOptions);
|
|
|
}
|
|
|
|
|
|
private void readIndex() throws IOException {
|
|
@@ -650,7 +781,8 @@ public class MapFile {
|
|
|
// no fixing needed
|
|
|
return -1;
|
|
|
}
|
|
|
- SequenceFile.Reader dataReader = new SequenceFile.Reader(fs, data, conf);
|
|
|
+ SequenceFile.Reader dataReader =
|
|
|
+ new SequenceFile.Reader(conf, SequenceFile.Reader.file(data));
|
|
|
if (!dataReader.getKeyClass().equals(keyClass)) {
|
|
|
throw new Exception(dr + "Wrong key class in " + dir + ", expected" + keyClass.getName() +
|
|
|
", got " + dataReader.getKeyClass().getName());
|
|
@@ -663,7 +795,14 @@ public class MapFile {
|
|
|
Writable key = ReflectionUtils.newInstance(keyClass, conf);
|
|
|
Writable value = ReflectionUtils.newInstance(valueClass, conf);
|
|
|
SequenceFile.Writer indexWriter = null;
|
|
|
- if (!dryrun) indexWriter = SequenceFile.createWriter(fs, conf, index, keyClass, LongWritable.class);
|
|
|
+ if (!dryrun) {
|
|
|
+ indexWriter =
|
|
|
+ SequenceFile.createWriter(conf,
|
|
|
+ SequenceFile.Writer.file(index),
|
|
|
+ SequenceFile.Writer.keyClass(keyClass),
|
|
|
+ SequenceFile.Writer.valueClass
|
|
|
+ (LongWritable.class));
|
|
|
+ }
|
|
|
try {
|
|
|
long pos = 0L;
|
|
|
LongWritable position = new LongWritable();
|