|
@@ -2690,6 +2690,7 @@ public class SequenceFile {
|
|
|
private Progress mergeProgress = new Progress();
|
|
|
private Path tmpDir;
|
|
|
private Progressable progress = null; //handle to the progress reporting object
|
|
|
+ private SegmentDescriptor minSegment;
|
|
|
|
|
|
//a TreeMap used to store the segments sorted by size (segment offset and
|
|
|
//segment path name is used to break ties between segments of same sizes)
|
|
@@ -2738,6 +2739,7 @@ public class SequenceFile {
|
|
|
while ((ms = (SegmentDescriptor)pop()) != null) {
|
|
|
ms.cleanup();
|
|
|
}
|
|
|
+ minSegment = null;
|
|
|
}
|
|
|
public DataOutputBuffer getKey() throws IOException {
|
|
|
return rawKey;
|
|
@@ -2748,21 +2750,25 @@ public class SequenceFile {
|
|
|
public boolean next() throws IOException {
|
|
|
if (size() == 0)
|
|
|
return false;
|
|
|
- SegmentDescriptor ms = (SegmentDescriptor)top();
|
|
|
- //save the raw key
|
|
|
- rawKey.reset();
|
|
|
- rawKey.write(ms.getKey().getData(), 0, ms.getKey().getLength());
|
|
|
+ int valLength;
|
|
|
+ if (minSegment != null) {
|
|
|
+ //minSegment is non-null for all invocations of next except the first
|
|
|
+ //one. For the first invocation, the priority queue is ready for use
|
|
|
+ //but for the subsequent invocations, first adjust the queue
|
|
|
+ adjustPriorityQueue(minSegment);
|
|
|
+ if (size() == 0) {
|
|
|
+ minSegment = null;
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ minSegment = (SegmentDescriptor)top();
|
|
|
+ //save the raw key reference
|
|
|
+ rawKey = minSegment.getKey();
|
|
|
//load the raw value. Re-use the existing rawValue buffer
|
|
|
- if (rawValue == null)
|
|
|
- rawValue = ms.in.createValueBytes();
|
|
|
- int valLength = ms.nextRawValue(rawValue);
|
|
|
-
|
|
|
- if (ms.nextRawKey()) {
|
|
|
- adjustTop();
|
|
|
- } else {
|
|
|
- pop();
|
|
|
- ms.cleanup();
|
|
|
+ if (rawValue == null) {
|
|
|
+ rawValue = minSegment.in.createValueBytes();
|
|
|
}
|
|
|
+ valLength = minSegment.nextRawValue(rawValue);
|
|
|
if (progPerByte > 0) {
|
|
|
totalBytesProcessed += rawKey.getLength() + valLength;
|
|
|
mergeProgress.set(totalBytesProcessed * progPerByte);
|
|
@@ -2774,6 +2780,14 @@ public class SequenceFile {
|
|
|
return mergeProgress;
|
|
|
}
|
|
|
|
|
|
+ private void adjustPriorityQueue(SegmentDescriptor ms) throws IOException{
|
|
|
+ if (ms.nextRawKey()) {
|
|
|
+ adjustTop();
|
|
|
+ } else {
|
|
|
+ pop();
|
|
|
+ ms.cleanup();
|
|
|
+ }
|
|
|
+ }
|
|
|
/** This is the single level merge that is called multiple times
|
|
|
* depending on the factor size and the number of segments
|
|
|
* @return RawKeyValueIterator
|