|
@@ -33,6 +33,7 @@ import org.apache.hadoop.io.compress.Decompressor;
|
|
|
import org.apache.hadoop.io.compress.DefaultCodec;
|
|
|
import org.apache.hadoop.io.compress.GzipCodec;
|
|
|
import org.apache.hadoop.io.compress.zlib.ZlibFactory;
|
|
|
+import org.apache.hadoop.io.serializer.Deserializer;
|
|
|
import org.apache.hadoop.io.serializer.SerializationFactory;
|
|
|
import org.apache.hadoop.io.serializer.Serializer;
|
|
|
import org.apache.hadoop.conf.*;
|
|
@@ -1403,6 +1404,9 @@ public class SequenceFile {
|
|
|
private CompressionInputStream valInFilter = null;
|
|
|
private DataInputStream valIn = null;
|
|
|
private Decompressor valDecompressor = null;
|
|
|
+
|
|
|
+ private Deserializer keyDeserializer; // key deserializer: obtained from a SerializationFactory for getKeyClass(), released in close()
|
|
|
+ private Deserializer valDeserializer; // value deserializer: obtained for getValueClass() and opened on valIn, released in close()
|
|
|
|
|
|
/** Open the named file. */
|
|
|
public Reader(FileSystem fs, Path file, Configuration conf)
|
|
@@ -1540,9 +1544,27 @@ public class SequenceFile {
|
|
|
valLenDecompressor);
|
|
|
valLenIn = new DataInputStream(valLenInFilter);
|
|
|
}
|
|
|
+
|
|
|
+ SerializationFactory serializationFactory =
|
|
|
+ new SerializationFactory(conf);
|
|
|
+ this.keyDeserializer =
|
|
|
+ getDeserializer(serializationFactory, getKeyClass());
|
|
|
+ if (!blockCompressed) {
|
|
|
+ this.keyDeserializer.open(valBuffer);
|
|
|
+ } else {
|
|
|
+ this.keyDeserializer.open(keyIn);
|
|
|
+ }
|
|
|
+ this.valDeserializer =
|
|
|
+ getDeserializer(serializationFactory, getValueClass());
|
|
|
+ this.valDeserializer.open(valIn);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ @SuppressWarnings("unchecked") // the signature below uses the raw Deserializer and Class types
|
|
|
+ private Deserializer getDeserializer(SerializationFactory sf, Class c) { // helper: asks the factory sf for a deserializer for class c
|
|
|
+ return sf.getDeserializer(c); // returned as-is; no null check is performed here
|
|
|
+ }
|
|
|
+
|
|
|
/** Close the file. */
|
|
|
public synchronized void close() throws IOException {
|
|
|
// Return the decompressors to the pool
|
|
@@ -1551,6 +1573,13 @@ public class SequenceFile {
|
|
|
CodecPool.returnDecompressor(valLenDecompressor);
|
|
|
CodecPool.returnDecompressor(valDecompressor);
|
|
|
|
|
|
+ if (keyDeserializer != null) {
|
|
|
+ keyDeserializer.close();
|
|
|
+ }
|
|
|
+ if (valDeserializer != null) {
|
|
|
+ valDeserializer.close();
|
|
|
+ }
|
|
|
+
|
|
|
// Close the input-stream
|
|
|
in.close();
|
|
|
}
|
|
@@ -1743,6 +1772,51 @@ public class SequenceFile {
|
|
|
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Get the 'value' corresponding to the last read 'key'.
+ * The stream is first positioned with seekToCurrentValue(), then the
+ * value is produced by the value deserializer.
|
|
|
+ * @param val : The 'value' to be read. If it implements Configurable it
+ * is given this reader's conf before deserialization.
+ * @return the object returned by the value deserializer; callers should
+ * use this return value rather than assume it is the instance passed in.
|
|
|
+ * @throws IOException on read failure, or, for files that are not
+ * block-compressed, when valIn.read() still returns a positive byte
+ * after the value has been deserialized.
|
|
|
+ */
|
|
|
+ public synchronized Object getCurrentValue(Object val)
|
|
|
+ throws IOException {
|
|
|
+ if (val instanceof Configurable) {
|
|
|
+ ((Configurable) val).setConf(this.conf);
|
|
|
+ }
|
|
|
+
|
|
|
+ // Position stream to 'current' value
|
|
|
+ seekToCurrentValue();
|
|
|
+
|
|
|
+ if (!blockCompressed) {
|
|
|
+ val = deserializeValue(val);
|
|
|
+
|
|
|
+ if (valIn.read() > 0) { // NOTE(review): read() yields -1 at end of stream and 0..255 otherwise, so a leftover 0x00 byte is not detected here - confirm whether >= 0 was intended
|
|
|
+ LOG.info("available bytes: " + valIn.available());
|
|
|
+ throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
|
|
|
+ + " bytes, should read " +
|
|
|
+ (valBuffer.getLength()-keyLength));
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ // Get the value
|
|
|
+ int valLength = WritableUtils.readVInt(valLenIn); // length prefix read from valLenIn; only used by the sanity check below
|
|
|
+ val = deserializeValue(val);
|
|
|
+
|
|
|
+ // Read another compressed 'value': decrement the buffered-value counter
|
|
|
+ --noBufferedValues;
|
|
|
+
|
|
|
+ // Sanity check
|
|
|
+ if (valLength < 0) { // NOTE(review): the message below says "zero-length" but the condition tests for a negative length - confirm intent
|
|
|
+ LOG.debug(val + " is a zero-length value");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return val;
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ @SuppressWarnings("unchecked") // valDeserializer is declared with the raw Deserializer type
|
|
|
+ private Object deserializeValue(Object val) throws IOException { // helper: runs the value deserializer on val
|
|
|
+ return valDeserializer.deserialize(val); // the result is whatever the deserializer returns; callers reassign val from it
|
|
|
+ }
|
|
|
+
|
|
|
/** Read the next key in the file into <code>key</code>, skipping its
|
|
|
* value. True if another entry exists, and false at end of file. */
|
|
|
public synchronized boolean next(Writable key) throws IOException {
|
|
@@ -1974,6 +2048,60 @@ public class SequenceFile {
|
|
|
|
|
|
}
|
|
|
|
|
|
+ /** Read the next key in the file, skipping its
|
|
|
+ * value. Return null at end of file.
+ *
+ * @param key object handed to the key deserializer; when non-null its
+ * class must be exactly getKeyClass() (subclasses are rejected).
+ * @return the object returned by the key deserializer, or null when no
+ * further key can be read.
+ * @throws IOException if key has the wrong class, or, for files that are
+ * not block-compressed, if deserialization did not leave valBuffer
+ * positioned exactly keyLength bytes in. */
|
|
|
+ public synchronized Object next(Object key) throws IOException {
|
|
|
+ if (key != null && key.getClass() != getKeyClass()) { // identity comparison of Class objects: an exact match is required
|
|
|
+ throw new IOException("wrong key class: "+key.getClass().getName()
|
|
|
+ +" is not "+keyClass);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (!blockCompressed) {
|
|
|
+ outBuf.reset();
|
|
|
+
|
|
|
+ keyLength = next(outBuf); // delegates to the next(...) overload that takes outBuf; a negative result means no more records
|
|
|
+ if (keyLength < 0)
|
|
|
+ return null;
|
|
|
+
|
|
|
+ valBuffer.reset(outBuf.getData(), outBuf.getLength()); // point valBuffer at the bytes just placed in outBuf
|
|
|
+
|
|
|
+ key = deserializeKey(key);
|
|
|
+ valBuffer.mark(0); // NOTE(review): presumably remembers where the value starts for a later getCurrentValue() - confirm against seekToCurrentValue()
|
|
|
+ if (valBuffer.getPosition() != keyLength) // the deserializer must have consumed exactly the key bytes
|
|
|
+ throw new IOException(key + " read " + valBuffer.getPosition()
|
|
|
+ + " bytes, should read " + keyLength);
|
|
|
+ } else {
|
|
|
+ //Reset syncSeen
|
|
|
+ syncSeen = false;
|
|
|
+
|
|
|
+ if (noBufferedKeys == 0) { // no keys left from the current block: load another one
|
|
|
+ try {
|
|
|
+ readBlock();
|
|
|
+ } catch (EOFException eof) { // EOF while loading a block is reported to the caller as "no more keys"
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ int keyLength = WritableUtils.readVInt(keyLenIn); // local variable: shadows the keyLength written in the branch above
|
|
|
+
|
|
|
+ // Sanity check
|
|
|
+ if (keyLength < 0) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ //Read another compressed 'key'
|
|
|
+ key = deserializeKey(key);
|
|
|
+ --noBufferedKeys;
|
|
|
+ }
|
|
|
+
|
|
|
+ return key;
|
|
|
+ }
|
|
|
+
|
|
|
+ @SuppressWarnings("unchecked") // keyDeserializer is declared with the raw Deserializer type
|
|
|
+ private Object deserializeKey(Object key) throws IOException { // helper: runs the key deserializer on key
|
|
|
+ return keyDeserializer.deserialize(key); // the result is whatever the deserializer returns; callers reassign key from it
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Read 'raw' values.
|
|
|
* @param val - The 'raw' value
|