|
@@ -363,6 +363,7 @@ class Merger {
|
|
|
new ArrayList<Segment<K, V>>();
|
|
|
int segmentsConsidered = 0;
|
|
|
int numSegmentsToConsider = factor;
|
|
|
+ long startBytes = 0; // starting bytes of segments of this merge
|
|
|
while (true) {
|
|
|
//extract the smallest 'factor' number of segments
|
|
|
//Call cleanup on the empty segments (no key/value data)
|
|
@@ -375,8 +376,8 @@ class Merger {
|
|
|
long startPos = segment.getPosition();
|
|
|
boolean hasNext = segment.next();
|
|
|
long endPos = segment.getPosition();
|
|
|
- totalBytesProcessed += endPos - startPos;
|
|
|
- mergeProgress.set(totalBytesProcessed * progPerByte);
|
|
|
+ startBytes += endPos - startPos;
|
|
|
+
|
|
|
if (hasNext) {
|
|
|
segmentsToMerge.add(segment);
|
|
|
segmentsConsidered++;
|
|
@@ -406,6 +407,12 @@ class Merger {
|
|
|
//if we have lesser number of segments remaining, then just return the
|
|
|
//iterator, else do another single level merge
|
|
|
if (numSegments <= factor) {
|
|
|
+ // Reset totalBytesProcessed to track the progress of the final merge.
|
|
|
+ // This is considered the progress of the reducePhase, the 3rd phase
|
|
|
+ // of reduce task. Currently totalBytesProcessed is not used in sort
|
|
|
+ // phase of reduce task(i.e. when intermediate merges happen).
|
|
|
+ totalBytesProcessed = startBytes;
|
|
|
+
|
|
|
//calculate the length of the remaining segments. Required for
|
|
|
//calculating the merge progress
|
|
|
long totalBytes = 0;
|