|
@@ -18,24 +18,25 @@
|
|
|
|
|
|
package org.apache.hadoop.io;
|
|
|
|
|
|
+import java.io.EOFException;
|
|
|
+import java.io.IOException;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Arrays;
|
|
|
-import java.io.*;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.util.Options;
|
|
|
-import org.apache.hadoop.fs.*;
|
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
|
import org.apache.hadoop.classification.InterfaceStability;
|
|
|
-import org.apache.hadoop.conf.*;
|
|
|
-import org.apache.hadoop.util.Progressable;
|
|
|
-import org.apache.hadoop.util.ReflectionUtils;
|
|
|
+import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.fs.FileSystem;
|
|
|
+import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.io.SequenceFile.CompressionType;
|
|
|
-import org.apache.hadoop.io.SequenceFile.Reader;
|
|
|
-import org.apache.hadoop.io.SequenceFile.Writer;
|
|
|
import org.apache.hadoop.io.compress.CompressionCodec;
|
|
|
-import org.apache.hadoop.io.compress.DefaultCodec;
|
|
|
+import org.apache.hadoop.io.serial.RawComparator;
|
|
|
+import org.apache.hadoop.io.serial.Serialization;
|
|
|
+import org.apache.hadoop.io.serial.SerializationFactory;
|
|
|
+import org.apache.hadoop.util.Progressable;
|
|
|
|
|
|
/** A file-based map from keys to values.
|
|
|
*
|
|
@@ -68,8 +69,11 @@ public class MapFile {
|
|
|
|
|
|
/** Writes a new map. */
|
|
|
public static class Writer implements java.io.Closeable {
|
|
|
- private SequenceFile.Writer data;
|
|
|
- private SequenceFile.Writer index;
|
|
|
+ private final SequenceFile.Writer data;
|
|
|
+ private final SequenceFile.Writer index;
|
|
|
+ private final Configuration conf;
|
|
|
+ private final Serialization<Object> keySerialization;
|
|
|
+ private final Serialization<Object> valueSerialization;
|
|
|
|
|
|
final private static String INDEX_INTERVAL = "io.map.index.interval";
|
|
|
private int indexInterval = 128;
|
|
@@ -78,10 +82,11 @@ public class MapFile {
|
|
|
private LongWritable position = new LongWritable();
|
|
|
|
|
|
// the following fields are used only for checking key order
|
|
|
- private WritableComparator comparator;
|
|
|
- private DataInputBuffer inBuf = new DataInputBuffer();
|
|
|
- private DataOutputBuffer outBuf = new DataOutputBuffer();
|
|
|
- private WritableComparable lastKey;
|
|
|
+ private final RawComparator comparator;
|
|
|
+ private final DataInputBuffer inBuf = new DataInputBuffer();
|
|
|
+ private DataOutputBuffer lastKey;
|
|
|
+ private final DataOutputBuffer currentKey = new DataOutputBuffer();
|
|
|
+ private final DataOutputBuffer currentValue = new DataOutputBuffer();
|
|
|
|
|
|
/** What's the position (in bytes) we wrote when we got the last index */
|
|
|
private long lastIndexPos = -1;
|
|
@@ -97,6 +102,7 @@ public class MapFile {
|
|
|
/** Create the named map for keys of the named class.
|
|
|
* @deprecated Use Writer(Configuration, Path, Option...) instead.
|
|
|
*/
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
@Deprecated
|
|
|
public Writer(Configuration conf, FileSystem fs, String dirName,
|
|
|
Class<? extends WritableComparable> keyClass,
|
|
@@ -107,6 +113,7 @@ public class MapFile {
|
|
|
/** Create the named map for keys of the named class.
|
|
|
* @deprecated Use Writer(Configuration, Path, Option...) instead.
|
|
|
*/
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
@Deprecated
|
|
|
public Writer(Configuration conf, FileSystem fs, String dirName,
|
|
|
Class<? extends WritableComparable> keyClass, Class valClass,
|
|
@@ -119,6 +126,7 @@ public class MapFile {
|
|
|
/** Create the named map for keys of the named class.
|
|
|
* @deprecated Use Writer(Configuration, Path, Option...) instead.
|
|
|
*/
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
@Deprecated
|
|
|
public Writer(Configuration conf, FileSystem fs, String dirName,
|
|
|
Class<? extends WritableComparable> keyClass, Class valClass,
|
|
@@ -131,6 +139,7 @@ public class MapFile {
|
|
|
/** Create the named map for keys of the named class.
|
|
|
* @deprecated Use Writer(Configuration, Path, Option...) instead.
|
|
|
*/
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
@Deprecated
|
|
|
public Writer(Configuration conf, FileSystem fs, String dirName,
|
|
|
Class<? extends WritableComparable> keyClass, Class valClass,
|
|
@@ -142,6 +151,7 @@ public class MapFile {
|
|
|
/** Create the named map using the named key comparator.
|
|
|
* @deprecated Use Writer(Configuration, Path, Option...) instead.
|
|
|
*/
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
@Deprecated
|
|
|
public Writer(Configuration conf, FileSystem fs, String dirName,
|
|
|
WritableComparator comparator, Class valClass
|
|
@@ -153,6 +163,7 @@ public class MapFile {
|
|
|
/** Create the named map using the named key comparator.
|
|
|
* @deprecated Use Writer(Configuration, Path, Option...) instead.
|
|
|
*/
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
@Deprecated
|
|
|
public Writer(Configuration conf, FileSystem fs, String dirName,
|
|
|
WritableComparator comparator, Class valClass,
|
|
@@ -164,6 +175,7 @@ public class MapFile {
|
|
|
/** Create the named map using the named key comparator.
|
|
|
* @deprecated Use Writer(Configuration, Path, Option...)} instead.
|
|
|
*/
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
@Deprecated
|
|
|
public Writer(Configuration conf, FileSystem fs, String dirName,
|
|
|
WritableComparator comparator, Class valClass,
|
|
@@ -177,6 +189,7 @@ public class MapFile {
|
|
|
/** Create the named map using the named key comparator.
|
|
|
* @deprecated Use Writer(Configuration, Path, Option...) instead.
|
|
|
*/
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
@Deprecated
|
|
|
public Writer(Configuration conf, FileSystem fs, String dirName,
|
|
|
WritableComparator comparator, Class valClass,
|
|
@@ -190,28 +203,18 @@ public class MapFile {
|
|
|
// our options are a superset of sequence file writer options
|
|
|
public static interface Option extends SequenceFile.Writer.Option { }
|
|
|
|
|
|
- private static class KeyClassOption extends Options.ClassOption
|
|
|
- implements Option {
|
|
|
- KeyClassOption(Class<?> value) {
|
|
|
+ private static class ComparatorOption extends Options.ComparatorOption
|
|
|
+ implements Option{
|
|
|
+ ComparatorOption(RawComparator value) {
|
|
|
super(value);
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- private static class ComparatorOption implements Option {
|
|
|
- private final WritableComparator value;
|
|
|
- ComparatorOption(WritableComparator value) {
|
|
|
- this.value = value;
|
|
|
- }
|
|
|
- WritableComparator getValue() {
|
|
|
- return value;
|
|
|
- }
|
|
|
- }
|
|
|
|
|
|
- public static Option keyClass(Class<? extends WritableComparable> value) {
|
|
|
- return new KeyClassOption(value);
|
|
|
+ public static SequenceFile.Writer.Option keyClass(Class<?> value) {
|
|
|
+ return new SequenceFile.Writer.KeyClassOption(value);
|
|
|
}
|
|
|
|
|
|
- public static Option comparator(WritableComparator value) {
|
|
|
+ public static Option comparator(RawComparator value) {
|
|
|
return new ComparatorOption(value);
|
|
|
}
|
|
|
|
|
@@ -234,31 +237,27 @@ public class MapFile {
|
|
|
return SequenceFile.Writer.progressable(value);
|
|
|
}
|
|
|
|
|
|
+ public static
|
|
|
+ SequenceFile.Writer.Option keySerialization(Serialization<?> value) {
|
|
|
+ return SequenceFile.Writer.keySerialization(value);
|
|
|
+ }
|
|
|
+
|
|
|
+ public static
|
|
|
+ SequenceFile.Writer.Option valueSerialization(Serialization<?> value) {
|
|
|
+ return SequenceFile.Writer.valueSerialization(value);
|
|
|
+ }
|
|
|
+
|
|
|
@SuppressWarnings("unchecked")
|
|
|
public Writer(Configuration conf,
|
|
|
Path dirName,
|
|
|
SequenceFile.Writer.Option... opts
|
|
|
) throws IOException {
|
|
|
- KeyClassOption keyClassOption =
|
|
|
- Options.getOption(KeyClassOption.class, opts);
|
|
|
+ this.conf = conf;
|
|
|
ComparatorOption comparatorOption =
|
|
|
Options.getOption(ComparatorOption.class, opts);
|
|
|
- if ((keyClassOption == null) == (comparatorOption == null)) {
|
|
|
- throw new IllegalArgumentException("key class or comparator option "
|
|
|
- + "must be set");
|
|
|
- }
|
|
|
+
|
|
|
this.indexInterval = conf.getInt(INDEX_INTERVAL, this.indexInterval);
|
|
|
|
|
|
- Class<? extends WritableComparable> keyClass;
|
|
|
- if (keyClassOption == null) {
|
|
|
- this.comparator = comparatorOption.getValue();
|
|
|
- keyClass = comparator.getKeyClass();
|
|
|
- } else {
|
|
|
- keyClass=
|
|
|
- (Class<? extends WritableComparable>) keyClassOption.getValue();
|
|
|
- this.comparator = WritableComparator.get(keyClass);
|
|
|
- }
|
|
|
- this.lastKey = comparator.newKey();
|
|
|
FileSystem fs = dirName.getFileSystem(conf);
|
|
|
|
|
|
if (!fs.mkdirs(dirName)) {
|
|
@@ -269,13 +268,18 @@ public class MapFile {
|
|
|
|
|
|
SequenceFile.Writer.Option[] dataOptions =
|
|
|
Options.prependOptions(opts,
|
|
|
- SequenceFile.Writer.file(dataFile),
|
|
|
- SequenceFile.Writer.keyClass(keyClass));
|
|
|
+ SequenceFile.Writer.file(dataFile));
|
|
|
this.data = SequenceFile.createWriter(conf, dataOptions);
|
|
|
+ keySerialization = (Serialization<Object>) data.getKeySerialization();
|
|
|
+ valueSerialization = (Serialization<Object>) data.getValueSerialization();
|
|
|
+ if (comparatorOption != null) {
|
|
|
+ comparator = comparatorOption.getValue();
|
|
|
+ } else {
|
|
|
+ comparator = keySerialization.getRawComparator();
|
|
|
+ }
|
|
|
|
|
|
SequenceFile.Writer.Option[] indexOptions =
|
|
|
Options.prependOptions(opts, SequenceFile.Writer.file(indexFile),
|
|
|
- SequenceFile.Writer.keyClass(keyClass),
|
|
|
SequenceFile.Writer.valueClass(LongWritable.class),
|
|
|
SequenceFile.Writer.compression(CompressionType.BLOCK));
|
|
|
this.index = SequenceFile.createWriter(conf, indexOptions);
|
|
@@ -296,6 +300,22 @@ public class MapFile {
|
|
|
conf.setInt(INDEX_INTERVAL, interval);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Get the serialization used for the keys
|
|
|
+ * @return the key serialization
|
|
|
+ */
|
|
|
+ public Serialization<?> getKeySerialization() {
|
|
|
+ return data.getKeySerialization();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get the serialization used for the values
|
|
|
+ * @return the value serialization
|
|
|
+ */
|
|
|
+ public Serialization<?> getValueSerialization() {
|
|
|
+ return data.getValueSerialization();
|
|
|
+ }
|
|
|
+
|
|
|
/** Close the map. */
|
|
|
public synchronized void close() throws IOException {
|
|
|
data.close();
|
|
@@ -304,10 +324,14 @@ public class MapFile {
|
|
|
|
|
|
/** Append a key/value pair to the map. The key must be greater or equal
|
|
|
* to the previous key added to the map. */
|
|
|
- public synchronized void append(WritableComparable key, Writable val)
|
|
|
+ public synchronized void append(Object key, Object val)
|
|
|
throws IOException {
|
|
|
|
|
|
- checkKey(key);
|
|
|
+ currentKey.reset();
|
|
|
+ keySerialization.serialize(currentKey, key);
|
|
|
+ checkKey(currentKey, key);
|
|
|
+ currentValue.reset();
|
|
|
+ valueSerialization.serialize(currentValue, val);
|
|
|
|
|
|
long pos = data.getLength();
|
|
|
// Only write an index if we've changed positions. In a block compressed
|
|
@@ -323,17 +347,21 @@ public class MapFile {
|
|
|
size++;
|
|
|
}
|
|
|
|
|
|
- private void checkKey(WritableComparable key) throws IOException {
|
|
|
+ private void checkKey(DataOutputBuffer serialKey, Object key
|
|
|
+ ) throws IOException {
|
|
|
// check that keys are well-ordered
|
|
|
- if (size != 0 && comparator.compare(lastKey, key) > 0)
|
|
|
- throw new IOException("key out of order: "+key+" after "+lastKey);
|
|
|
-
|
|
|
- // update lastKey with a copy of key by writing and reading
|
|
|
- outBuf.reset();
|
|
|
- key.write(outBuf); // write new key
|
|
|
-
|
|
|
- inBuf.reset(outBuf.getData(), outBuf.getLength());
|
|
|
- lastKey.readFields(inBuf); // read into lastKey
|
|
|
+ if (lastKey == null) {
|
|
|
+ lastKey = new DataOutputBuffer();
|
|
|
+ } else if (comparator.compare(lastKey.getData(), 0, lastKey.getLength(),
|
|
|
+ serialKey.getData(),0,serialKey.getLength())
|
|
|
+ > 0) {
|
|
|
+ // rebuild the previous key so that we can explain what's wrong
|
|
|
+ inBuf.reset(lastKey.getData(), 0, lastKey.getLength());
|
|
|
+ Object prevKey = keySerialization.deserialize(inBuf, null, conf);
|
|
|
+ throw new IOException("key out of order: "+ key +" after "+ prevKey);
|
|
|
+ }
|
|
|
+ lastKey.reset();
|
|
|
+ lastKey.write(serialKey.getData(), 0, serialKey.getLength());
|
|
|
}
|
|
|
|
|
|
}
|
|
@@ -346,9 +374,12 @@ public class MapFile {
|
|
|
* files using less memory. */
|
|
|
private int INDEX_SKIP = 0;
|
|
|
|
|
|
- private WritableComparator comparator;
|
|
|
+ private RawComparator comparator;
|
|
|
+ private Serialization<Object> keySerialization;
|
|
|
+ private final Configuration conf;
|
|
|
|
|
|
- private WritableComparable nextKey;
|
|
|
+ private DataOutputBuffer nextKey = new DataOutputBuffer();
|
|
|
+ private DataInputBuffer inBuf = new DataInputBuffer();
|
|
|
private long seekPosition = -1;
|
|
|
private int seekIndex = -1;
|
|
|
private long firstPosition;
|
|
@@ -362,36 +393,55 @@ public class MapFile {
|
|
|
|
|
|
// the index, in memory
|
|
|
private int count = -1;
|
|
|
- private WritableComparable[] keys;
|
|
|
+ private byte[][] keys;
|
|
|
private long[] positions;
|
|
|
|
|
|
- /** Returns the class of keys in this file. */
|
|
|
+ /** Returns the class of keys in this file.
|
|
|
+ * @deprecated Use {@link #getKeySerialization} instead.
|
|
|
+ */
|
|
|
+ @Deprecated
|
|
|
public Class<?> getKeyClass() { return data.getKeyClass(); }
|
|
|
|
|
|
- /** Returns the class of values in this file. */
|
|
|
+ /** Returns the class of values in this file.
|
|
|
+ * @deprecated Use {@link #getValueSerialization} instead.
|
|
|
+ */
|
|
|
+ @Deprecated
|
|
|
public Class<?> getValueClass() { return data.getValueClass(); }
|
|
|
|
|
|
+ /**
|
|
|
+ * Get the key serialization for this map file.
|
|
|
+ * @return the serialization for the key
|
|
|
+ */
|
|
|
+ public Serialization<?> getKeySerialization() {
|
|
|
+ return keySerialization;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get the value serialization for this map file.
|
|
|
+ * @return the serialization for the value
|
|
|
+ */
|
|
|
+ public Serialization<?> getValueSerialization() {
|
|
|
+ return data.getValueSerialization();
|
|
|
+ }
|
|
|
public static interface Option extends SequenceFile.Reader.Option {}
|
|
|
|
|
|
public static Option comparator(WritableComparator value) {
|
|
|
return new ComparatorOption(value);
|
|
|
}
|
|
|
|
|
|
- static class ComparatorOption implements Option {
|
|
|
- private final WritableComparator value;
|
|
|
- ComparatorOption(WritableComparator value) {
|
|
|
- this.value = value;
|
|
|
- }
|
|
|
- WritableComparator getValue() {
|
|
|
- return value;
|
|
|
+ static class ComparatorOption extends Options.ComparatorOption
|
|
|
+ implements Option {
|
|
|
+ ComparatorOption(RawComparator value) {
|
|
|
+ super(value);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
public Reader(Path dir, Configuration conf,
|
|
|
SequenceFile.Reader.Option... opts) throws IOException {
|
|
|
+ this.conf = conf;
|
|
|
ComparatorOption comparatorOption =
|
|
|
Options.getOption(ComparatorOption.class, opts);
|
|
|
- WritableComparator comparator =
|
|
|
+ RawComparator comparator =
|
|
|
comparatorOption == null ? null : comparatorOption.getValue();
|
|
|
INDEX_SKIP = conf.getInt("io.map.index.skip", 0);
|
|
|
open(dir, comparator, conf, opts);
|
|
@@ -415,8 +465,9 @@ public class MapFile {
|
|
|
this(new Path(dirName), conf, comparator(comparator));
|
|
|
}
|
|
|
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
protected synchronized void open(Path dir,
|
|
|
- WritableComparator comparator,
|
|
|
+ RawComparator comparator,
|
|
|
Configuration conf,
|
|
|
SequenceFile.Reader.Option... options
|
|
|
) throws IOException {
|
|
@@ -426,13 +477,13 @@ public class MapFile {
|
|
|
// open the data
|
|
|
this.data = createDataFileReader(dataFile, conf, options);
|
|
|
this.firstPosition = data.getPosition();
|
|
|
+ keySerialization = (Serialization<Object>) data.getKeySerialization();
|
|
|
|
|
|
- if (comparator == null)
|
|
|
- this.comparator =
|
|
|
- WritableComparator.get(data.getKeyClass().
|
|
|
- asSubclass(WritableComparable.class));
|
|
|
- else
|
|
|
+ if (comparator == null) {
|
|
|
+ this.comparator = keySerialization.getRawComparator();
|
|
|
+ } else {
|
|
|
this.comparator = comparator;
|
|
|
+ }
|
|
|
|
|
|
// open the index
|
|
|
SequenceFile.Reader.Option[] indexOptions =
|
|
@@ -463,19 +514,25 @@ public class MapFile {
|
|
|
try {
|
|
|
int skip = INDEX_SKIP;
|
|
|
LongWritable position = new LongWritable();
|
|
|
- WritableComparable lastKey = null;
|
|
|
+ byte[] lastKey = null;
|
|
|
long lastIndex = -1;
|
|
|
- ArrayList<WritableComparable> keyBuilder = new ArrayList<WritableComparable>(1024);
|
|
|
- while (true) {
|
|
|
- WritableComparable k = comparator.newKey();
|
|
|
-
|
|
|
- if (!index.next(k, position))
|
|
|
- break;
|
|
|
+ ArrayList<byte[]> keyBuilder = new ArrayList<byte[]>(1024);
|
|
|
+ DataOutputBuffer key = new DataOutputBuffer();
|
|
|
+ while (index.nextRawKey(key) > 0) {
|
|
|
+ position = (LongWritable) index.getCurrentValue(position);
|
|
|
|
|
|
// check order to make sure comparator is compatible
|
|
|
- if (lastKey != null && comparator.compare(lastKey, k) > 0)
|
|
|
- throw new IOException("key out of order: "+k+" after "+lastKey);
|
|
|
- lastKey = k;
|
|
|
+ if (lastKey != null &&
|
|
|
+ comparator.compare(lastKey, 0, lastKey.length,
|
|
|
+ key.getData(), 0 , key.getLength()) > 0) {
|
|
|
+ inBuf.reset(lastKey, 0, lastKey.length);
|
|
|
+ Object prevKey = keySerialization.deserialize(inBuf, null, conf);
|
|
|
+ inBuf.reset(key.getData(), 0, key.getLength());
|
|
|
+ Object curKey = keySerialization.deserialize(inBuf, null, conf);
|
|
|
+ throw new IOException("key out of order: "+ curKey + " after " +
|
|
|
+ prevKey);
|
|
|
+ }
|
|
|
+ lastKey = Arrays.copyOf(key.getData(), key.getLength());
|
|
|
if (skip > 0) {
|
|
|
skip--;
|
|
|
continue; // skip this entry
|
|
@@ -483,28 +540,28 @@ public class MapFile {
|
|
|
skip = INDEX_SKIP; // reset skip
|
|
|
}
|
|
|
|
|
|
- // don't read an index that is the same as the previous one. Block
|
|
|
- // compressed map files used to do this (multiple entries would point
|
|
|
- // at the same block)
|
|
|
- if (position.get() == lastIndex)
|
|
|
- continue;
|
|
|
+ // don't read an index that is the same as the previous one. Block
|
|
|
+ // compressed map files used to do this (multiple entries would point
|
|
|
+ // at the same block)
|
|
|
+ if (position.get() == lastIndex)
|
|
|
+ continue;
|
|
|
|
|
|
if (count == positions.length) {
|
|
|
- positions = Arrays.copyOf(positions, positions.length * 2);
|
|
|
+ positions = Arrays.copyOf(positions, positions.length * 2);
|
|
|
}
|
|
|
|
|
|
- keyBuilder.add(k);
|
|
|
+ keyBuilder.add(lastKey);
|
|
|
positions[count] = position.get();
|
|
|
count++;
|
|
|
}
|
|
|
|
|
|
- this.keys = keyBuilder.toArray(new WritableComparable[count]);
|
|
|
+ this.keys = keyBuilder.toArray(new byte[count][]);
|
|
|
positions = Arrays.copyOf(positions, count);
|
|
|
} catch (EOFException e) {
|
|
|
LOG.warn("Unexpected EOF reading " + index +
|
|
|
- " at entry #" + count + ". Ignoring.");
|
|
|
+ " at entry #" + count + ". Ignoring.");
|
|
|
} finally {
|
|
|
- indexClosed = true;
|
|
|
+ indexClosed = true;
|
|
|
index.close();
|
|
|
}
|
|
|
}
|
|
@@ -517,22 +574,23 @@ public class MapFile {
|
|
|
/** Get the key at approximately the middle of the file. Or null if the
|
|
|
* file is empty.
|
|
|
*/
|
|
|
- public synchronized WritableComparable midKey() throws IOException {
|
|
|
+ public synchronized Object midKey() throws IOException {
|
|
|
|
|
|
readIndex();
|
|
|
if (count == 0) {
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
- return keys[(count - 1) / 2];
|
|
|
+ byte[] rawKey = keys[(count -1) / 2];
|
|
|
+ inBuf.reset(rawKey, 0, rawKey.length);
|
|
|
+ return keySerialization.deserialize(inBuf, null, conf);
|
|
|
}
|
|
|
|
|
|
/** Reads the final key from the file.
|
|
|
*
|
|
|
* @param key key to read into
|
|
|
*/
|
|
|
- public synchronized void finalKey(WritableComparable key)
|
|
|
- throws IOException {
|
|
|
+ public synchronized Object finalKey(Object key) throws IOException {
|
|
|
|
|
|
long originalPosition = data.getPosition(); // save position
|
|
|
try {
|
|
@@ -542,8 +600,12 @@ public class MapFile {
|
|
|
} else {
|
|
|
reset(); // start at the beginning
|
|
|
}
|
|
|
- while (data.next(key)) {} // scan to eof
|
|
|
-
|
|
|
+ Object prevKey = null;
|
|
|
+ do {
|
|
|
+ prevKey = key;
|
|
|
+ key = data.nextKey(key);
|
|
|
+ } while (key != null);
|
|
|
+ return prevKey;
|
|
|
} finally {
|
|
|
data.seek(originalPosition); // restore position
|
|
|
}
|
|
@@ -553,7 +615,7 @@ public class MapFile {
|
|
|
* first entry after the named key. Returns true iff the named key exists
|
|
|
* in this map.
|
|
|
*/
|
|
|
- public synchronized boolean seek(WritableComparable key) throws IOException {
|
|
|
+ public synchronized boolean seek(Object key) throws IOException {
|
|
|
return seekInternal(key) == 0;
|
|
|
}
|
|
|
|
|
@@ -565,7 +627,7 @@ public class MapFile {
|
|
|
* < 0 - positioned at next record
|
|
|
* 1 - no more records in file
|
|
|
*/
|
|
|
- private synchronized int seekInternal(WritableComparable key)
|
|
|
+ private synchronized int seekInternal(Object key)
|
|
|
throws IOException {
|
|
|
return seekInternal(key, false);
|
|
|
}
|
|
@@ -582,19 +644,24 @@ public class MapFile {
|
|
|
* < 0 - positioned at next record
|
|
|
* 1 - no more records in file
|
|
|
*/
|
|
|
- private synchronized int seekInternal(WritableComparable key,
|
|
|
- final boolean before)
|
|
|
- throws IOException {
|
|
|
+ private synchronized int seekInternal(Object key,
|
|
|
+ final boolean before
|
|
|
+ ) throws IOException {
|
|
|
readIndex(); // make sure index is read
|
|
|
+ DataOutputBuffer keyBuffer = new DataOutputBuffer();
|
|
|
+ keySerialization.serialize(keyBuffer, key);
|
|
|
|
|
|
if (seekIndex != -1 // seeked before
|
|
|
&& seekIndex+1 < count
|
|
|
- && comparator.compare(key, keys[seekIndex+1])<0 // before next indexed
|
|
|
- && comparator.compare(key, nextKey)
|
|
|
- >= 0) { // but after last seeked
|
|
|
+ && comparator.compare(keyBuffer.getData(), 0, keyBuffer.getLength(),
|
|
|
+ keys[seekIndex+1], 0, keys[seekIndex+1].length)
|
|
|
+ < 0 // before next indexed
|
|
|
+ && comparator.compare(keyBuffer.getData(), 0, keyBuffer.getLength(),
|
|
|
+ nextKey.getData(), 0, nextKey.getLength())
|
|
|
+ >= 0) { // but after last seeked
|
|
|
// do nothing
|
|
|
} else {
|
|
|
- seekIndex = binarySearch(key);
|
|
|
+ seekIndex = binarySearch(keyBuffer.getData(), keyBuffer.getLength());
|
|
|
if (seekIndex < 0) // decode insertion point
|
|
|
seekIndex = -seekIndex-2;
|
|
|
|
|
@@ -605,17 +672,15 @@ public class MapFile {
|
|
|
}
|
|
|
data.seek(seekPosition);
|
|
|
|
|
|
- if (nextKey == null)
|
|
|
- nextKey = comparator.newKey();
|
|
|
-
|
|
|
// If we're looking for the key before, we need to keep track
|
|
|
// of the position we got the current key as well as the position
|
|
|
// of the key before it.
|
|
|
long prevPosition = -1;
|
|
|
long curPosition = seekPosition;
|
|
|
|
|
|
- while (data.next(nextKey)) {
|
|
|
- int c = comparator.compare(key, nextKey);
|
|
|
+ while (data.nextRawKey(nextKey) != -1) {
|
|
|
+ int c = comparator.compare(keyBuffer.getData(), 0, keyBuffer.getLength(),
|
|
|
+ nextKey.getData(), 0 , nextKey.getLength());
|
|
|
if (c <= 0) { // at or beyond desired
|
|
|
if (before && c != 0) {
|
|
|
if (prevPosition == -1) {
|
|
@@ -627,7 +692,7 @@ public class MapFile {
|
|
|
} else {
|
|
|
// We have a previous record to back up to
|
|
|
data.seek(prevPosition);
|
|
|
- data.next(nextKey);
|
|
|
+ data.nextRawKey(nextKey);
|
|
|
// now that we've rewound, the search key must be greater than this key
|
|
|
return 1;
|
|
|
}
|
|
@@ -639,18 +704,24 @@ public class MapFile {
|
|
|
curPosition = data.getPosition();
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+ // if we have fallen off the end of the file and we want the before key
|
|
|
+ // then back up to the previous key
|
|
|
+ if (before && prevPosition != -1) {
|
|
|
+ data.seek(prevPosition);
|
|
|
+ data.nextRawKey(nextKey);
|
|
|
+ }
|
|
|
return 1;
|
|
|
}
|
|
|
|
|
|
- private int binarySearch(WritableComparable key) {
|
|
|
+ private int binarySearch(byte[] key, int length) {
|
|
|
int low = 0;
|
|
|
int high = count-1;
|
|
|
|
|
|
while (low <= high) {
|
|
|
int mid = (low + high) >>> 1;
|
|
|
- WritableComparable midVal = keys[mid];
|
|
|
- int cmp = comparator.compare(midVal, key);
|
|
|
+ byte[] midVal = keys[mid];
|
|
|
+ int cmp = comparator.compare(midVal, 0, midVal.length,
|
|
|
+ key, 0, length);
|
|
|
|
|
|
if (cmp < 0)
|
|
|
low = mid + 1;
|
|
@@ -664,18 +735,59 @@ public class MapFile {
|
|
|
|
|
|
/** Read the next key/value pair in the map into <code>key</code> and
|
|
|
* <code>val</code>. Returns true if such a pair exists and false when at
|
|
|
- * the end of the map */
|
|
|
+ * the end of the map
|
|
|
+ * @deprecated Use {@link #nextKey} and {@link #getCurrentValue} instead.
|
|
|
+ */
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ @Deprecated
|
|
|
public synchronized boolean next(WritableComparable key, Writable val)
|
|
|
throws IOException {
|
|
|
return data.next(key, val);
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Read the next key in the map while holding this reader's lock.
|
|
|
+ * @param reusable an object that may be re-used for holding the next key
|
|
|
+ * @return the key that was read or null if there is not another key
|
|
|
+ * @throws IOException if reading from the underlying data file fails
|
|
|
+ */
|
|
|
+ public synchronized Object nextKey(Object reusable) throws IOException {
|
|
|
+ return data.nextKey(reusable);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get the current value in the map while holding this reader's lock.
|
|
|
+ * @param reusable an object that may be re-used for holding the value
|
|
|
+ * @return the value that was read in
|
|
|
+ * @throws IOException if reading from the underlying data file fails
|
|
|
+ */
|
|
|
+ public synchronized Object getCurrentValue(Object reusable) throws IOException {
|
|
|
+ return data.getCurrentValue(reusable);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Return the value for the named key, or null if none exists.
|
|
|
+ * @param key the key to look for
|
|
|
+ * @param value an object to read into
|
|
|
+ * @return the value that was found or null if the key wasn't found
|
|
|
+ * @throws IOException
|
|
|
+ * @deprecated Use {@link #seek} and {@link #getCurrentValue} instead.
|
|
|
+ */
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ @Deprecated
|
|
|
+ public synchronized Writable get(WritableComparable key,
|
|
|
+ Writable value) throws IOException {
|
|
|
+ if (seek(key)) {
|
|
|
+ return (Writable) data.getCurrentValue(value);
|
|
|
+ } else {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
/** Return the value for the named key, or null if none exists. */
|
|
|
- public synchronized Writable get(WritableComparable key, Writable val)
|
|
|
- throws IOException {
|
|
|
+ public synchronized Object get(Object key, Object val) throws IOException{
|
|
|
if (seek(key)) {
|
|
|
- data.getCurrentValue(val);
|
|
|
- return val;
|
|
|
+ return data.getCurrentValue(val);
|
|
|
} else
|
|
|
return null;
|
|
|
}
|
|
@@ -689,9 +801,8 @@ public class MapFile {
|
|
|
- * @param val - data value if key is found
|
|
|
- * @return - the key that was the closest match or null if eof.
|
|
|
*/
|
|
|
- public synchronized WritableComparable getClosest(WritableComparable key,
|
|
|
- Writable val)
|
|
|
- throws IOException {
|
|
|
+ public Object getClosest(Object key,
|
|
|
+ Object val) throws IOException {
|
|
|
return getClosest(key, val, false);
|
|
|
}
|
|
|
|
|
@@ -705,9 +816,10 @@ public class MapFile {
|
|
|
* return the record that sorts just after.
|
|
|
* @return - the key that was the closest match or null if eof.
|
|
|
*/
|
|
|
- public synchronized WritableComparable getClosest(WritableComparable key,
|
|
|
- Writable val, final boolean before)
|
|
|
- throws IOException {
|
|
|
+ public synchronized Object getClosest(Object key,
|
|
|
+ Object val,
|
|
|
+ final boolean before
|
|
|
+ ) throws IOException {
|
|
|
|
|
|
int c = seekInternal(key, before);
|
|
|
|
|
@@ -720,7 +832,9 @@ public class MapFile {
|
|
|
}
|
|
|
|
|
|
data.getCurrentValue(val);
|
|
|
- return nextKey;
|
|
|
+ // deserialize the key
|
|
|
+ inBuf.reset(nextKey.getData(), 0, nextKey.getLength());
|
|
|
+ return keySerialization.deserialize(inBuf, null, conf);
|
|
|
}
|
|
|
|
|
|
/** Close the map. */
|
|
@@ -764,17 +878,24 @@ public class MapFile {
|
|
|
* @return number of valid entries in this MapFile, or -1 if no fixing was needed
|
|
|
* @throws Exception
|
|
|
*/
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
public static long fix(FileSystem fs, Path dir,
|
|
|
- Class<? extends Writable> keyClass,
|
|
|
- Class<? extends Writable> valueClass, boolean dryrun,
|
|
|
- Configuration conf) throws Exception {
|
|
|
+ Class<?> keyClass,
|
|
|
+ Class<?> valueClass, boolean dryrun,
|
|
|
+ Configuration conf) throws IOException {
|
|
|
String dr = (dryrun ? "[DRY RUN ] " : "");
|
|
|
Path data = new Path(dir, DATA_FILE_NAME);
|
|
|
Path index = new Path(dir, INDEX_FILE_NAME);
|
|
|
int indexInterval = conf.getInt(Writer.INDEX_INTERVAL, 128);
|
|
|
+ SerializationFactory factory = SerializationFactory.getInstance(conf);
|
|
|
+ Serialization<Object> keySerialization = (Serialization<Object>)
|
|
|
+ factory.getSerializationByType(keyClass);
|
|
|
+ Serialization<Object> valueSerialization = (Serialization<Object>)
|
|
|
+ factory.getSerializationByType(valueClass);
|
|
|
if (!fs.exists(data)) {
|
|
|
// there's nothing we can do to fix this!
|
|
|
- throw new Exception(dr + "Missing data file in " + dir + ", impossible to fix this.");
|
|
|
+ throw new IOException(dr + "Missing data file in " + dir +
|
|
|
+ ", impossible to fix this.");
|
|
|
}
|
|
|
if (fs.exists(index)) {
|
|
|
// no fixing needed
|
|
@@ -782,17 +903,17 @@ public class MapFile {
|
|
|
}
|
|
|
SequenceFile.Reader dataReader =
|
|
|
new SequenceFile.Reader(conf, SequenceFile.Reader.file(data));
|
|
|
- if (!dataReader.getKeyClass().equals(keyClass)) {
|
|
|
- throw new Exception(dr + "Wrong key class in " + dir + ", expected" + keyClass.getName() +
|
|
|
- ", got " + dataReader.getKeyClass().getName());
|
|
|
+ if (!dataReader.getKeySerialization().equals(keySerialization)) {
|
|
|
+ throw new IOException(dr + "Wrong key serialization in " + dir +
|
|
|
+ ", expected" + keySerialization +
|
|
|
+ ", got " + dataReader.getKeySerialization());
|
|
|
}
|
|
|
- if (!dataReader.getValueClass().equals(valueClass)) {
|
|
|
- throw new Exception(dr + "Wrong value class in " + dir + ", expected" + valueClass.getName() +
|
|
|
- ", got " + dataReader.getValueClass().getName());
|
|
|
+ if (!dataReader.getValueSerialization().equals(valueSerialization)) {
|
|
|
+ throw new IOException(dr + "Wrong value serialization in " + dir +
|
|
|
+ ", expected" + valueSerialization +
|
|
|
+ ", got " + dataReader.getValueSerialization());
|
|
|
}
|
|
|
long cnt = 0L;
|
|
|
- Writable key = ReflectionUtils.newInstance(keyClass, conf);
|
|
|
- Writable value = ReflectionUtils.newInstance(valueClass, conf);
|
|
|
SequenceFile.Writer indexWriter = null;
|
|
|
if (!dryrun) {
|
|
|
indexWriter =
|
|
@@ -805,7 +926,10 @@ public class MapFile {
|
|
|
try {
|
|
|
long pos = 0L;
|
|
|
LongWritable position = new LongWritable();
|
|
|
- while(dataReader.next(key, value)) {
|
|
|
+ Object key = null;
|
|
|
+ Object value = null;
|
|
|
+ while((key = dataReader.nextKey(key)) != null) {
|
|
|
+ value = dataReader.getCurrentValue(value);
|
|
|
cnt++;
|
|
|
if (cnt % indexInterval == 0) {
|
|
|
position.set(pos);
|
|
@@ -834,21 +958,21 @@ public class MapFile {
|
|
|
String out = args[1];
|
|
|
|
|
|
Configuration conf = new Configuration();
|
|
|
- FileSystem fs = FileSystem.getLocal(conf);
|
|
|
- MapFile.Reader reader = new MapFile.Reader(fs, in, conf);
|
|
|
+ MapFile.Reader reader = new MapFile.Reader(new Path(in), conf);
|
|
|
+ Serialization<?> keySerialization = reader.getKeySerialization();
|
|
|
+ Serialization<?> valueSerialization = reader.getValueSerialization();
|
|
|
MapFile.Writer writer =
|
|
|
- new MapFile.Writer(conf, fs, out,
|
|
|
- reader.getKeyClass().asSubclass(WritableComparable.class),
|
|
|
- reader.getValueClass());
|
|
|
+ new MapFile.Writer(conf, new Path(out),
|
|
|
+ Writer.keySerialization(keySerialization),
|
|
|
+ Writer.valueSerialization(valueSerialization));
|
|
|
|
|
|
- WritableComparable key =
|
|
|
- ReflectionUtils.newInstance(reader.getKeyClass().asSubclass(WritableComparable.class), conf);
|
|
|
- Writable value =
|
|
|
- ReflectionUtils.newInstance(reader.getValueClass().asSubclass(Writable.class), conf);
|
|
|
+ Object key = null;
|
|
|
+ Object value = null;
|
|
|
|
|
|
- while (reader.next(key, value)) // copy all entries
|
|
|
+ while ((key = reader.nextKey(key)) != null) { // copy all entries
|
|
|
+ value = reader.getCurrentValue(value);
|
|
|
writer.append(key, value);
|
|
|
-
|
|
|
+ }
|
|
|
writer.close();
|
|
|
}
|
|
|
|