@@ -853,6 +853,7 @@ public class SequenceFile {

    private long end;
    private int keyLength;
+    private int recordLength;

    private boolean decompress;
    private boolean blockCompressed;
@@ -989,9 +990,6 @@ public class SequenceFile {
        valLenInFilter = this.codec.createInputStream(valLenBuffer);
        valLenIn = new DataInputStream(valLenInFilter);
      }
-
-
-      lazyDecompress = conf.getBoolean("io.seqfile.lazydecompress", true);
    }

    /** Close the file. */
@@ -1323,6 +1321,82 @@ public class SequenceFile {

    }

+    /**
+     * Read 'raw' keys.
+     * @param key - The buffer into which the key is read
+     * @return Returns the key length
+     * @throws IOException
+     */
+    public int nextRawKey(DataOutputBuffer key)
+      throws IOException {
+      if (!blockCompressed) {
+        if (in.getPos() >= end)
+          return -1;
+
+        recordLength = checkAndReadSync(in.readInt());
+        keyLength = in.readInt();
+        key.write(in, keyLength);
+        return keyLength;
+      } else {
+        //Reset syncSeen
+        syncSeen = false;
+
+        // Read 'key'
+        if (noBufferedKeys == 0) {
+          if (in.getPos() >= end)
+            return -1;
+
+          try {
+            readBlock();
+          } catch (EOFException eof) {
+            return -1;
+          }
+        }
+        int keyLength = WritableUtils.readVInt(keyLenIn);
+        if (keyLength < 0) {
+          throw new IOException("negative length key found: " + keyLength);
+        }
+        key.write(keyIn, keyLength);
+        --noBufferedKeys;
+
+        return keyLength;
+      }
+
+    }
+
+    /**
+     * Read 'raw' values.
+     * @param val - The 'raw' value
+     * @return Returns the value length
+     * @throws IOException
+     */
+    public int nextRawValue(ValueBytes val)
+      throws IOException {
+
+      // Position stream to current value
+      seekToCurrentValue();
+
+      if (!blockCompressed) {
+        int valLength = recordLength - keyLength;
+        if (decompress) {
+          CompressedBytes value = (CompressedBytes)val;
+          value.reset(in, valLength);
+        } else {
+          UncompressedBytes value = (UncompressedBytes)val;
+          value.reset(in, valLength);
+        }
+
+        return valLength;
+      } else {
+        int valLength = WritableUtils.readVInt(valLenIn);
+        UncompressedBytes rawValue = (UncompressedBytes)val;
+        rawValue.reset(valIn, valLength);
+        --noBufferedValues;
+        return valLength;
+      }
+
+    }
+
    private void handleChecksumException(ChecksumException e)
      throws IOException {
      if (this.conf.getBoolean("io.skip.checksum.errors", false)) {
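For orientation, the non-block-compressed branch above walks the standard
SequenceFile record framing; a sketch of the layout it assumes (sync-marker
handling elided):

    // Record framing consumed by nextRawKey()/nextRawValue() when the file
    // is not block-compressed:
    //
    //   recordLength : int    // keyLength plus the value length, in bytes
    //   keyLength    : int    // length of the serialized key, in bytes
    //   key          : byte[keyLength]
    //   value        : byte[recordLength - keyLength]
    //
    // which is why nextRawValue() recovers the value length as
    // recordLength - keyLength.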
@@ -1459,16 +1533,8 @@ public class SequenceFile {
      this.outFile = outFile;

      int segments = sortPass(deleteInput);
-      int pass = 1;
-      while (segments > 1) {
-        segments = mergePass(pass, segments <= factor);
-        pass++;
-      }
-
-      // Clean up intermediate files
-      for (int i=0; i < pass; ++i) {
-        fs.delete(new Path(outFile.toString() + "." + i));
-        fs.delete(new Path(outFile.toString() + "." + i + ".index"));
+      if (segments > 1) {
+        segments = mergePass();
      }
    }

@@ -1712,228 +1778,493 @@ public class SequenceFile {
        }
      } // SequenceFile.Sorter.SortPass

-    private int mergePass(int pass, boolean last) throws IOException {
-      LOG.debug("running merge pass=" + pass);
-      MergePass mergePass = new MergePass(pass, last);
-      try { // make a merge pass
-        return mergePass.run(); // run it
-      } finally {
-        mergePass.close(); // close it
-      }
+    /** The interface to iterate over raw keys/values of SequenceFiles. */
+    public static interface RawKeyValueIterator {
+      /** Gets the current raw key
+       * @return DataOutputBuffer
+       * @throws IOException
+       */
+      DataOutputBuffer getKey() throws IOException;
+      /** Gets the current raw value
+       * @return ValueBytes
+       * @throws IOException
+       */
+      ValueBytes getValue() throws IOException;
+      /** Sets up the current key and value (for getKey and getValue)
+       * @return true if there exists a key/value, false otherwise
+       * @throws IOException
+       */
+      boolean next() throws IOException;
+      /** closes the iterator so that the underlying streams can be closed
+       * @throws IOException
+       */
+      void close() throws IOException;
+    }
+
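The contract here is that next() advances the iterator and stages both halves
of the record for getKey()/getValue(). A minimal consumer sketch (hypothetical,
and essentially what writeFile() further below does), assuming an
already-obtained iterator named iter:

    // Drain a RawKeyValueIterator and count the records.
    int records = 0;
    while (iter.next()) {
      DataOutputBuffer key = iter.getKey(); // valid only until next() is called again
      ValueBytes value = iter.getValue();
      records++;
    }
    iter.close(); // lets the underlying segment readers be closed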
+    /**
+     * Merges the list of segments of type <code>SegmentDescriptor</code>
+     * @param segments the list of SegmentDescriptors
+     * @return RawKeyValueIterator
+     * @throws IOException
+     */
+    public RawKeyValueIterator merge(List <SegmentDescriptor> segments)
+      throws IOException {
+      MergeQueue mQueue = new MergeQueue(segments);
+      return mQueue.merge();
    }

-    private class MergePass {
-      private boolean last;
-
-      private MergeQueue queue;
-      private FSDataInputStream in = null;
-      private Path inName;
-      private FSDataInputStream indexIn = null;
-
-      public MergePass(int pass, boolean last) throws IOException {
-        this.last = last;
-
-        this.queue =
-          new MergeQueue(factor, last?outFile:outFile.suffix("."+pass), last);
-
-        this.inName = outFile.suffix("."+(pass-1));
-        this.in = fs.open(inName);
-        this.indexIn = fs.open(inName.suffix(".index"));
+    /**
+     * Merges the contents of files passed in Path[]
+     * @param inNames the array of path names
+     * @param deleteInputs true if the input files should be deleted when they
+     * are no longer needed
+     * @return RawKeyValueIterator
+     * @throws IOException
+     */
+    public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs)
+      throws IOException {
+      //get the segments from inNames
+      ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>();
+      for (int i = 0; i < inNames.length; i++) {
+        SegmentDescriptor s = new SegmentDescriptor(0,
+          fs.getLength(inNames[i]), inNames[i]);
+        s.preserveInput(!deleteInputs);
+        s.doSync();
+        a.add(s);
      }
+      factor = inNames.length;
+      MergeQueue mQueue = new MergeQueue(a);
+      return mQueue.merge();
+    }

-      public void close() throws IOException {
-        in.close(); // close and delete input
-        fs.delete(inName);
+    /**
+     * Clones the attributes (like compression) of the input file and creates
+     * a corresponding Writer
+     * @param fileSys the FileSystem to use
+     * @param inputFile the path of the input file whose attributes should be
+     * cloned
+     * @param outputFile the path of the output file
+     * @param prog the Progressable to report status during the file write
+     * @return Writer
+     * @throws IOException
+     */
+    public Writer cloneFileAttributes(FileSystem fileSys, Path inputFile,
+      Path outputFile, Progressable prog) throws IOException {
+      Reader reader = new Reader(fileSys, inputFile, memory/(factor+1), conf);
+      boolean compress = reader.isCompressed();
+      boolean blockCompress = reader.isBlockCompressed();
+      CompressionCodec codec = reader.getCompressionCodec();
+      reader.close();
+      FSDataOutputStream out;
+      if (prog != null)
+        out = fs.create(outputFile, true, memory/(factor+1), prog);
+      else
+        out = fs.create(outputFile, true, memory/(factor+1));
+      Writer writer = createWriter(out, keyClass, valClass, compress,
+        blockCompress, codec);
+      return writer;
+    }

-        queue.close(); // close queue
+    /**
+     * Writes records from RawKeyValueIterator into a file represented by the
+     * passed writer
+     * @param records the RawKeyValueIterator
+     * @param writer the Writer created earlier
+     * @throws IOException
+     */
+    public void writeFile(RawKeyValueIterator records, Writer writer)
+      throws IOException {
+      while(records.next()) {
+        writer.appendRaw(records.getKey().getData(), 0,
+          records.getKey().getLength(), records.getValue());
      }
-
-      public int run() throws IOException {
-        int segments = 0;
-        long end = fs.getLength(inName);
-
-        while (in.getPos() < end) {
-          LOG.debug("merging segment " + segments);
-          long segmentStart = queue.out.getPos();
-          while (in.getPos() < end && queue.size() < factor) {
-            long segmentOffset = WritableUtils.readVLong(indexIn);
-            long segmentLength = WritableUtils.readVLong(indexIn);
-            Reader reader = new Reader(fs, inName, memory/(factor+1),
-              segmentOffset, segmentLength, conf);
-            reader.sync = null; // disable sync on temp files
-
-            MergeStream ms = new MergeStream(reader); // add segment to queue
-            if (ms.next()) {
-              queue.put(ms);
-            }
-            in.seek(reader.end);
-          }
-
-          queue.merge(); // do a merge
-
-          if (!last) {
-            WritableUtils.writeVLong(queue.indexOut, segmentStart);
-            WritableUtils.writeVLong(queue.indexOut,
-              (queue.out.getPos() - segmentStart));
-          }
-
-          segments++;
-        }
-
-        return segments;
+      if (writer instanceof SequenceFile.BlockCompressWriter) {
+        SequenceFile.BlockCompressWriter bcWriter =
+          (SequenceFile.BlockCompressWriter) writer;
+        bcWriter.writeBlock();
      }
-    } // SequenceFile.Sorter.MergePass
-
-    /** Merge the provided files.*/
+    }
+
+    /** Merge the provided files.
+     * @param inFiles the array of input path names
+     * @param outFile the final output file
+     * @throws IOException
+     */
    public void merge(Path[] inFiles, Path outFile) throws IOException {
-      this.inFiles = inFiles;
-      this.outFile = outFile;
-      this.factor = inFiles.length;
-
      if (fs.exists(outFile)) {
        throw new IOException("already exists: " + outFile);
      }
+      RawKeyValueIterator r = merge(inFiles, false);
+      Writer writer = cloneFileAttributes(fs,
+        inFiles[0], outFile, null);
+
+      writeFile(r, writer);

-      MergeFiles mergeFiles = new MergeFiles();
-      try { // make a merge pass
-        mergeFiles.run(); // run it
-      } finally {
-        mergeFiles.close(); // close it
-      }
+      writer.close();
    }

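A caller-level sketch of the rewritten flow (hypothetical paths; the Sorter
construction is assumed from the enclosing class and abridged here):

    // Merge three sorted SequenceFiles into one output file.
    SequenceFile.Sorter sorter =
      new SequenceFile.Sorter(fs, keyClass, valClass, conf);
    Path[] inputs = { new Path("part-0"), new Path("part-1"),
                      new Path("part-2") };
    sorter.merge(inputs, new Path("merged")); // throws if "merged" exists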
-    private class MergeFiles {
-      private MergeQueue queue;
-
-      public MergeFiles() throws IOException {
-        this.queue = new MergeQueue(factor, outFile, true);
-      }
-
-      public void close() throws IOException {
-        queue.close();
-      }
-
-      public void run() throws IOException {
-        LOG.debug("merging files=" + inFiles.length);
-        for (int i = 0; i < inFiles.length; i++) {
-          Path inFile = inFiles[i];
-          MergeStream ms =
-            new MergeStream(new Reader(fs, inFile, memory/(factor+1), conf));
-          if (ms.next())
-            queue.put(ms);
-        }
-
-        queue.merge();
-      }
-    } // SequenceFile.Sorter.MergeFiles
-
-    private class MergeStream {
-      private Reader in;
+    /** sort calls this to generate the final merged output */
+    private int mergePass() throws IOException {
+      LOG.debug("running merge pass");
+      Writer writer = cloneFileAttributes(fs,
+        outFile.suffix(".0"), outFile, null);
+      RawKeyValueIterator r = merge(outFile.suffix(".0"),
+        outFile.suffix(".0.index"));
+      writeFile(r, writer);

-      private DataOutputBuffer rawKey = null;
-      private ValueBytes rawValue = null;
-
-      public MergeStream(Reader reader) throws IOException {
-        if (reader.keyClass != keyClass)
-          throw new IOException("wrong key class: " + reader.getKeyClass() +
-                                " is not " + keyClass);
-        if (reader.valClass != valClass)
-          throw new IOException("wrong value class: "+reader.getValueClass()+
-                                " is not " + valClass);
-        this.in = reader;
-        rawKey = new DataOutputBuffer();
-        rawValue = in.createValueBytes();
-      }
-
-      public boolean next() throws IOException {
-        rawKey.reset();
-        int recordLength =
-          in.nextRaw(rawKey, rawValue);
-        return (recordLength >= 0);
-      }
-    } // SequenceFile.Sorter.MergeStream
+      writer.close();
+      return 0;
+    }

-    private class MergeQueue extends PriorityQueue {
-      private Path outName;
-      private FSDataOutputStream out;
-      private FSDataOutputStream indexOut;
-      private boolean done;
+    /** Used by mergePass to merge the output of the sort
+     * @param inName the name of the input file containing sorted segments
+     * @param indexIn the offsets of the sorted segments
+     * @return RawKeyValueIterator
+     * @throws IOException
+     */
+    private RawKeyValueIterator merge(Path inName, Path indexIn)
+      throws IOException {
+      //get the segments from indexIn
+      //we create a SegmentContainer so that we can track segments belonging to
+      //inName and delete inName as soon as we see that we have looked at all
+      //the contained segments during the merge process & hence don't need
+      //them anymore
+      SegmentContainer container = new SegmentContainer(inName, indexIn);
+      MergeQueue mQueue = new MergeQueue(container.getSegmentList());
+      return mQueue.merge();
+    }
+
+    /** This class implements the core of the merge logic */
+    private class MergeQueue extends PriorityQueue
+      implements RawKeyValueIterator {
      private boolean compress;
      private boolean blockCompress;
-      private CompressionCodec codec = null;
-
-      public void put(MergeStream stream) throws IOException {
+      private DataOutputBuffer rawKey = new DataOutputBuffer();
+      private ValueBytes rawValue;
+
+      //a TreeMap used to store the segments sorted by size (segment offset and
+      //segment path name are used to break ties between segments of the same size)
+      private Map <SegmentDescriptor, Void> sortedSegmentSizes =
+        new TreeMap <SegmentDescriptor, Void>();
+
+      public void put(SegmentDescriptor stream) throws IOException {
        if (size() == 0) {
          compress = stream.in.isCompressed();
          blockCompress = stream.in.isBlockCompressed();
-          codec = stream.in.getCompressionCodec();
        } else if (compress != stream.in.isCompressed() ||
                   blockCompress != stream.in.isBlockCompressed()) {
          throw new IOException("All merged files must be compressed or not.");
        }
        super.put(stream);
      }
-
-      public MergeQueue(int size, Path outName, boolean done)
-        throws IOException {
-        initialize(size);
-        this.outName = outName;
-        this.out = fs.create(this.outName, true, memory/(factor+1));
-        if (!done) {
-          this.indexOut = fs.create(outName.suffix(".index"), true,
-            memory/(factor+1));
+
+      public MergeQueue(List <SegmentDescriptor> segments) {
+        int size = segments.size();
+        for (int i = 0; i < size; i++) {
+          sortedSegmentSizes.put(segments.get(i), null);
        }
-        this.done = done;
      }
-
      protected boolean lessThan(Object a, Object b) {
-        MergeStream msa = (MergeStream)a;
-        MergeStream msb = (MergeStream)b;
-        return comparator.compare(msa.rawKey.getData(), 0, msa.rawKey.getLength(),
-          msb.rawKey.getData(), 0, msb.rawKey.getLength()) < 0;
+        SegmentDescriptor msa = (SegmentDescriptor)a;
+        SegmentDescriptor msb = (SegmentDescriptor)b;
+        return comparator.compare(msa.getKey().getData(), 0,
+          msa.getKey().getLength(), msb.getKey().getData(), 0,
+          msb.getKey().getLength()) < 0;
      }
-
-      public void merge() throws IOException {
-        Writer writer = createWriter(out, keyClass, valClass,
-          compress, blockCompress, codec);
-        if (!done) {
-          writer.sync = null; // disable sync on temp files
+      public void close() throws IOException {
+        SegmentDescriptor ms; // close inputs
+        while ((ms = (SegmentDescriptor)pop()) != null) {
+          ms.cleanup();
        }
-
-        while (size() != 0) {
-          MergeStream ms = (MergeStream)top();
-          writer.appendRaw(ms.rawKey.getData(), 0, ms.rawKey.getLength(),
-            ms.rawValue); // write top entry
+      }
+      public DataOutputBuffer getKey() throws IOException {
+        return rawKey;
+      }
+      public ValueBytes getValue() throws IOException {
+        return rawValue;
+      }
+      public boolean next() throws IOException {
+        if (size() == 0)
+          return false;
+        SegmentDescriptor ms = (SegmentDescriptor)top();
+        //save the raw key
+        rawKey.reset();
+        rawKey.write(ms.getKey().getData(), 0, ms.getKey().getLength());
+        //load the raw value. Re-use the existing rawValue buffer
+        if(rawValue == null)
+          rawValue = ms.in.createValueBytes();
+        ms.nextRawValue(rawValue);
+
+        if (ms.nextRawKey()) {
+          adjustTop();
+        } else {
+          pop();
+          ms.cleanup();
+        }
+        return true;
+      }
+
+      /** This is the single level merge that is called multiple times
+       * depending on the factor size and the number of segments
+       * @return RawKeyValueIterator
+       * @throws IOException
+       */
+      public RawKeyValueIterator merge() throws IOException {
+        //create the merge streams from the sorted map built in the
+        //constructor; intermediate passes spill their merged output to
+        //temporary files
+        int numSegments = sortedSegmentSizes.size();
+        int origFactor = factor;
+        int passNo = 1;
+        do {
+          //get the factor for this pass of merge
+          factor = getPassFactor(passNo, numSegments);
+          //extract the smallest 'factor' number of segment pointers from the
+          //TreeMap
+          SegmentDescriptor[] mStream = getSegmentDescriptors(factor);

-          if (ms.next()) { // has another entry
-            adjustTop();
+          //feed the streams to the priority queue
+          initialize(mStream.length); clear();
+          for (int i = 0; i < mStream.length; i++) {
+            if (mStream[i].nextRawKey()) put(mStream[i]);
+          }
+          //if the number of segments remaining is at most the merge factor,
+          //just return the iterator; else do another single-level merge
+          if (numSegments <= factor) {
+            return this;
          } else {
-            pop(); // done with this file
-            ms.in.close();
+            //we want to spread the creation of temp files on multiple disks if
+            //available
+            Path outputFile = conf.getLocalPath("mapred.local.dir",
+              (outFile.suffix("." + passNo)).toString());
+            Writer writer = cloneFileAttributes(fs,
+              mStream[0].segmentPathName, outputFile, null);
+            writer.sync = null; //disable sync for temp files
+            writeFile(this, writer);
+            writer.close();
+
+            //we finished one single level merge; now clean up the priority
+            //queue
+            this.close();
+
+            SegmentDescriptor tempSegment =
+              new SegmentDescriptor(0, fs.getLength(outputFile), outputFile);
+            //put the segment back in the TreeMap
+            sortedSegmentSizes.put(tempSegment, null);
+            numSegments = sortedSegmentSizes.size();
+            passNo++;
          }
+          //we are worried about only the first pass merge factor. So reset the
+          //factor to what it originally was
+          factor = origFactor;
+        } while(true);
+      }
+
+      /** Returns the merge factor to use for a given merge pass (HADOOP-591):
+       * only the first pass may merge fewer than <code>factor</code> segments,
+       * sized so that every subsequent pass merges exactly
+       * <code>factor</code> segments.
+       */
+      public int getPassFactor(int passNo, int numSegments) {
+        if (passNo > 1 || numSegments <= factor || factor == 1)
+          return factor;
+        int mod = (numSegments - 1) % (factor - 1);
+        if (mod == 0)
+          return factor;
+        return mod + 1;
+      }
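To see the HADOOP-591 arithmetic at work: with numSegments = 11 and factor = 5,
mod = (11 - 1) % (5 - 1) = 2, so the first pass merges only mod + 1 = 3
segments, leaving 11 - 3 + 1 = 9; the second pass merges a full 5 (leaving 5),
and the final pass merges exactly 5 segments into the single output. Without
the adjustment, the full-factor merges would happen early and the final pass
could be left merging just two or three segments.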
+
+      /** Return (& remove) the requested number of segment descriptors from the
+       * sorted map.
+       */
+      public SegmentDescriptor[] getSegmentDescriptors(int numDescriptors) {
+        if (numDescriptors > sortedSegmentSizes.size())
+          numDescriptors = sortedSegmentSizes.size();
+        SegmentDescriptor[] segmentDescriptors =
+          new SegmentDescriptor[numDescriptors];
+        Iterator iter = sortedSegmentSizes.keySet().iterator();
+        int i = 0;
+        while (i < numDescriptors) {
+          segmentDescriptors[i++] = (SegmentDescriptor)iter.next();
+          iter.remove();
        }
+        return segmentDescriptors;
+      }
+    } // SequenceFile.Sorter.MergeQueue
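A note on the sortedSegmentSizes design above: a TreeMap with Void values is
effectively a sorted set ordered by SegmentDescriptor.compareTo, so the
length/offset/path ordering in compareTo (further below) is load-bearing, not
cosmetic:

    // If compareTo ranked by segmentLength alone, two distinct segments of
    // equal length would compare as equal keys and the second put() would
    // silently displace the first:
    //   sortedSegmentSizes.put(seg1, null);  // length 100, offset 0
    //   sortedSegmentSizes.put(seg2, null);  // length 100, offset 500
    // With the offset and path tie-breakers, both survive as separate entries.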

-        if (writer instanceof SequenceFile.BlockCompressWriter) {
-          SequenceFile.BlockCompressWriter bcWriter =
-            (SequenceFile.BlockCompressWriter) writer;
-          bcWriter.writeBlock();
-        }
-        out.flush();
+    /** This class defines a merge segment. This class can be subclassed to
+     * provide a customized cleanup method implementation. In this
+     * implementation, cleanup closes the file handle and deletes the file
+     */
+    public class SegmentDescriptor implements Comparable {
+
+      long segmentOffset; //the start of the segment in the file
+      long segmentLength; //the length of the segment
+      Path segmentPathName; //the path name of the file containing the segment
+      boolean ignoreSync = true; //set to true for temp files
+      private Reader in = null;
+      private DataOutputBuffer rawKey = null; //this will hold the current key
+      private boolean preserveInput = false; //delete input segment files?
+
+      /** Constructs a segment
+       * @param segmentOffset the offset of the segment in the file
+       * @param segmentLength the length of the segment
+       * @param segmentPathName the path name of the file containing the segment
+       */
+      public SegmentDescriptor (long segmentOffset, long segmentLength,
+        Path segmentPathName) {
+        this.segmentOffset = segmentOffset;
+        this.segmentLength = segmentLength;
+        this.segmentPathName = segmentPathName;
+      }
+
+      /** Do the sync checks */
+      public void doSync() {ignoreSync = false;}
+
+      /** Sets whether the input segment files should be preserved (i.e., not
+       * deleted once they are no longer needed) */
+      public void preserveInput(boolean preserve) {
+        preserveInput = preserve;
      }

-      public void close() throws IOException {
-        MergeStream ms; // close inputs
-        while ((ms = (MergeStream)pop()) != null) {
-          ms.in.close();
+      public boolean shouldPreserveInput() {
+        return preserveInput;
+      }
+
+      public int compareTo(Object o) {
+        SegmentDescriptor that = (SegmentDescriptor)o;
+        if (this.segmentLength != that.segmentLength) {
+          return (this.segmentLength < that.segmentLength ? -1 : 1);
        }
-        out.close(); // close output
-        if (indexOut != null) {
-          indexOut.close();
+        if (this.segmentOffset != that.segmentOffset) {
+          return (this.segmentOffset < that.segmentOffset ? -1 : 1);
        }
+        return (this.segmentPathName.toString()).
+          compareTo(that.segmentPathName.toString());
+      }
+
+      /** Fills up the rawKey object with the key returned by the Reader
+       * @return true if there is a key returned; false, otherwise
+       * @throws IOException
+       */
+      public boolean nextRawKey() throws IOException {
+        if (in == null) {
+          Reader reader = new Reader(fs, segmentPathName,
+            memory/(factor+1), segmentOffset,
+            segmentLength, conf);
+
+          //sometimes we ignore syncs especially for temp merge files
+          if (ignoreSync) reader.sync = null;
+
+          if (reader.keyClass != keyClass)
+            throw new IOException("wrong key class: " + reader.getKeyClass() +
+                                  " is not " + keyClass);
+          if (reader.valClass != valClass)
+            throw new IOException("wrong value class: "+reader.getValueClass()+
+                                  " is not " + valClass);
+          this.in = reader;
+          rawKey = new DataOutputBuffer();
+        }
+        rawKey.reset();
+        int keyLength =
+          in.nextRawKey(rawKey);
+        return (keyLength >= 0);
+      }
+
+      /** Fills up the passed rawValue with the value corresponding to the key
+       * read earlier
+       * @param rawValue the buffer into which the value is read
+       * @return the length of the value
+       * @throws IOException
+       */
+      public int nextRawValue(ValueBytes rawValue) throws IOException {
+        int valLength = in.nextRawValue(rawValue);
+        return valLength;
      }

-    } // SequenceFile.Sorter.MergeQueue
+      /** Returns the stored rawKey */
+      public DataOutputBuffer getKey() {
+        return rawKey;
+      }
+
+      /** closes the underlying reader */
+      private void close() throws IOException {
+        this.in.close();
+      }
+
+      /** The default cleanup. Subclasses can override this with a custom
+       * cleanup
+       */
+      public void cleanup() throws IOException {
+        close();
+        if (!preserveInput) {
+          fs.delete(segmentPathName);
+        }
+      }
+    } // SequenceFile.Sorter.SegmentDescriptor
+
+    /** This class provisions multiple segments contained within a single
+     * file
+     */
+    private class LinkedSegmentsDescriptor extends SegmentDescriptor {
+
+      SegmentContainer parentContainer = null;
+
+      /** Constructs a segment
+       * @param segmentOffset the offset of the segment in the file
+       * @param segmentLength the length of the segment
+       * @param segmentPathName the path name of the file containing the segment
+       * @param parent the parent SegmentContainer that holds the segment
+       */
+      public LinkedSegmentsDescriptor (long segmentOffset, long segmentLength,
+        Path segmentPathName, SegmentContainer parent) {
+        super(segmentOffset, segmentLength, segmentPathName);
+        this.parentContainer = parent;
+      }
+      /** Performs the cleanup for this segment: the shared file is deleted
+       * (via the parent container) only once every segment contained in it
+       * has been examined
+       */
+      public void cleanup() throws IOException {
+        super.close();
+        if (super.shouldPreserveInput()) return;
+        parentContainer.cleanup();
+      }
+    } //SequenceFile.Sorter.LinkedSegmentsDescriptor
+
+    /** The class that defines a container for segments to be merged. Primarily
+     * required to delete temp files as soon as all the contained segments
+     * have been looked at */
+    private class SegmentContainer {
+      private int numSegmentsCleanedUp = 0; //track the no. of segment cleanups
+      private int numSegmentsContained; //# of segments contained
+      private Path inName; //input file from where segments are created
+
+      //the list of segments read from the file
+      private ArrayList <SegmentDescriptor> segments =
+        new ArrayList <SegmentDescriptor>();
+      /** This constructor is there primarily to serve the sort routine that
+       * generates a single output file with an associated index file */
+      public SegmentContainer(Path inName, Path indexIn) throws IOException {
+        //get the segments from indexIn
+        FSDataInputStream fsIndexIn = fs.open(indexIn);
+        long end = fs.getLength(indexIn);
+        while (fsIndexIn.getPos() < end) {
+          long segmentOffset = WritableUtils.readVLong(fsIndexIn);
+          long segmentLength = WritableUtils.readVLong(fsIndexIn);
+          Path segmentName = inName;
+          segments.add(new LinkedSegmentsDescriptor(segmentOffset,
+            segmentLength, segmentName, this));
+        }
+        fsIndexIn.close();
+        fs.delete(indexIn);
+        numSegmentsContained = segments.size();
+        this.inName = inName;
+      }
+
+      public List <SegmentDescriptor> getSegmentList() {
+        return segments;
+      }
+      public void cleanup() throws IOException {
+        numSegmentsCleanedUp++;
+        if (numSegmentsCleanedUp == numSegmentsContained) {
+          fs.delete(inName);
+        }
+      }
+    } //SequenceFile.Sorter.SegmentContainer
+
  } // SequenceFile.Sorter

} // SequenceFile