|
@@ -55,13 +55,14 @@ class Merger {
|
|
|
int mergeFactor, Path tmpDir,
|
|
|
RawComparator<K> comparator, Progressable reporter,
|
|
|
Counters.Counter readsCounter,
|
|
|
- Counters.Counter writesCounter)
|
|
|
+ Counters.Counter writesCounter,
|
|
|
+ Progress mergePhase)
|
|
|
throws IOException {
|
|
|
return
|
|
|
new MergeQueue<K, V>(conf, fs, inputs, deleteInputs, codec, comparator,
|
|
|
reporter).merge(keyClass, valueClass,
|
|
|
mergeFactor, tmpDir,
|
|
|
- readsCounter, writesCounter);
|
|
|
+ readsCounter, writesCounter, mergePhase);
|
|
|
}
|
|
|
|
|
|
public static <K extends Object, V extends Object>
|
|
@@ -71,10 +72,11 @@ class Merger {
|
|
|
int mergeFactor, Path tmpDir,
|
|
|
RawComparator<K> comparator, Progressable reporter,
|
|
|
Counters.Counter readsCounter,
|
|
|
- Counters.Counter writesCounter)
|
|
|
+ Counters.Counter writesCounter,
|
|
|
+ Progress mergePhase)
|
|
|
throws IOException {
|
|
|
return merge(conf, fs, keyClass, valueClass, segments, mergeFactor, tmpDir,
|
|
|
- comparator, reporter, false, readsCounter, writesCounter);
|
|
|
+ comparator, reporter, false, readsCounter, writesCounter, mergePhase);
|
|
|
}
|
|
|
|
|
|
public static <K extends Object, V extends Object>
|
|
@@ -85,12 +87,14 @@ class Merger {
|
|
|
RawComparator<K> comparator, Progressable reporter,
|
|
|
boolean sortSegments,
|
|
|
Counters.Counter readsCounter,
|
|
|
- Counters.Counter writesCounter)
|
|
|
+ Counters.Counter writesCounter,
|
|
|
+ Progress mergePhase)
|
|
|
throws IOException {
|
|
|
return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
|
|
|
sortSegments).merge(keyClass, valueClass,
|
|
|
mergeFactor, tmpDir,
|
|
|
- readsCounter, writesCounter);
|
|
|
+ readsCounter, writesCounter,
|
|
|
+ mergePhase);
|
|
|
}
|
|
|
|
|
|
static <K extends Object, V extends Object>
|
|
@@ -101,13 +105,15 @@ class Merger {
|
|
|
RawComparator<K> comparator, Progressable reporter,
|
|
|
boolean sortSegments,
|
|
|
Counters.Counter readsCounter,
|
|
|
- Counters.Counter writesCounter)
|
|
|
+ Counters.Counter writesCounter,
|
|
|
+ Progress mergePhase)
|
|
|
throws IOException {
|
|
|
return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
|
|
|
sortSegments).merge(keyClass, valueClass,
|
|
|
mergeFactor, inMemSegments,
|
|
|
tmpDir,
|
|
|
- readsCounter, writesCounter);
|
|
|
+ readsCounter, writesCounter,
|
|
|
+ mergePhase);
|
|
|
}
|
|
|
|
|
|
public static <K extends Object, V extends Object>
|
|
@@ -235,6 +241,20 @@ class Merger {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ // Boolean variable for including/considering final merge as part of sort
|
|
|
+ // phase or not. This is true in map task, false in reduce task. It is
|
|
|
+ // used in calculating mergeProgress.
|
|
|
+ static boolean includeFinalMerge = false;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Sets the boolean variable includeFinalMerge to true. Called from
|
|
|
+ * map task before calling merge() so that final merge of map task
|
|
|
+ * is also considered as part of sort phase.
|
|
|
+ */
|
|
|
+ static void considerFinalMergeForProgress() {
|
|
|
+ includeFinalMerge = true;
|
|
|
+ }
|
|
|
+
|
|
|
private static class MergeQueue<K extends Object, V extends Object>
|
|
|
extends PriorityQueue<Segment<K, V>> implements RawKeyValueIterator {
|
|
|
Configuration conf;
|
|
@@ -386,24 +406,41 @@ class Merger {
|
|
|
public RawKeyValueIterator merge(Class<K> keyClass, Class<V> valueClass,
|
|
|
int factor, Path tmpDir,
|
|
|
Counters.Counter readsCounter,
|
|
|
- Counters.Counter writesCounter)
|
|
|
+ Counters.Counter writesCounter,
|
|
|
+ Progress mergePhase)
|
|
|
throws IOException {
|
|
|
return merge(keyClass, valueClass, factor, 0, tmpDir,
|
|
|
- readsCounter, writesCounter);
|
|
|
+ readsCounter, writesCounter, mergePhase);
|
|
|
}
|
|
|
|
|
|
RawKeyValueIterator merge(Class<K> keyClass, Class<V> valueClass,
|
|
|
int factor, int inMem, Path tmpDir,
|
|
|
Counters.Counter readsCounter,
|
|
|
- Counters.Counter writesCounter)
|
|
|
+ Counters.Counter writesCounter,
|
|
|
+ Progress mergePhase)
|
|
|
throws IOException {
|
|
|
LOG.info("Merging " + segments.size() + " sorted segments");
|
|
|
-
|
|
|
- //create the MergeStreams from the sorted map created in the constructor
|
|
|
- //and dump the final output to a file
|
|
|
+
|
|
|
+ /*
|
|
|
+ * If there are inMemory segments, then they come first in the segments
|
|
|
+ * list and then the sorted disk segments. Otherwise(if there are only
|
|
|
+ * disk segments), then they are sorted segments if there are more than
|
|
|
+ * factor segments in the segments list.
|
|
|
+ */
|
|
|
int numSegments = segments.size();
|
|
|
int origFactor = factor;
|
|
|
int passNo = 1;
|
|
|
+ if (mergePhase != null) {
|
|
|
+ mergeProgress = mergePhase;
|
|
|
+ }
|
|
|
+
|
|
|
+ long totalBytes = computeBytesInMerges(factor, inMem);
|
|
|
+ if (totalBytes != 0) {
|
|
|
+ progPerByte = 1.0f / (float)totalBytes;
|
|
|
+ }
|
|
|
+
|
|
|
+ //create the MergeStreams from the sorted map created in the constructor
|
|
|
+ //and dump the final output to a file
|
|
|
do {
|
|
|
//get the factor for this pass of merge. We assume in-memory segments
|
|
|
//are the first entries in the segment list and that the pass factor
|
|
@@ -460,34 +497,39 @@ class Merger {
|
|
|
//if we have lesser number of segments remaining, then just return the
|
|
|
//iterator, else do another single level merge
|
|
|
if (numSegments <= factor) {
|
|
|
- // Reset totalBytesProcessed to track the progress of the final merge.
|
|
|
- // This is considered the progress of the reducePhase, the 3rd phase
|
|
|
- // of reduce task. Currently totalBytesProcessed is not used in sort
|
|
|
- // phase of reduce task(i.e. when intermediate merges happen).
|
|
|
- totalBytesProcessed = startBytes;
|
|
|
-
|
|
|
- //calculate the length of the remaining segments. Required for
|
|
|
- //calculating the merge progress
|
|
|
- long totalBytes = 0;
|
|
|
- for (int i = 0; i < segmentsToMerge.size(); i++) {
|
|
|
- totalBytes += segmentsToMerge.get(i).getLength();
|
|
|
+ if (!includeFinalMerge) { // for reduce task
|
|
|
+
|
|
|
+ // Reset totalBytesProcessed and recalculate totalBytes from the
|
|
|
+ // remaining segments to track the progress of the final merge.
|
|
|
+ // Final merge is considered as the progress of the reducePhase,
|
|
|
+ // the 3rd phase of reduce task.
|
|
|
+ totalBytesProcessed = 0;
|
|
|
+ totalBytes = 0;
|
|
|
+ for (int i = 0; i < segmentsToMerge.size(); i++) {
|
|
|
+ totalBytes += segmentsToMerge.get(i).getLength();
|
|
|
+ }
|
|
|
}
|
|
|
if (totalBytes != 0) //being paranoid
|
|
|
progPerByte = 1.0f / (float)totalBytes;
|
|
|
|
|
|
+ totalBytesProcessed += startBytes;
|
|
|
if (totalBytes != 0)
|
|
|
mergeProgress.set(totalBytesProcessed * progPerByte);
|
|
|
else
|
|
|
mergeProgress.set(1.0f); // Last pass and no segments left - we're done
|
|
|
|
|
|
LOG.info("Down to the last merge-pass, with " + numSegments +
|
|
|
- " segments left of total size: " + totalBytes + " bytes");
|
|
|
+ " segments left of total size: " +
|
|
|
+ (totalBytes - totalBytesProcessed) + " bytes");
|
|
|
return this;
|
|
|
} else {
|
|
|
LOG.info("Merging " + segmentsToMerge.size() +
|
|
|
" intermediate segments out of a total of " +
|
|
|
(segments.size()+segmentsToMerge.size()));
|
|
|
|
|
|
+ long bytesProcessedInPrevMerges = totalBytesProcessed;
|
|
|
+ totalBytesProcessed += startBytes;
|
|
|
+
|
|
|
//we want to spread the creation of temp files on multiple disks if
|
|
|
//available under the space constraints
|
|
|
long approxOutputSize = 0;
|
|
@@ -516,9 +558,27 @@ class Merger {
|
|
|
// Add the newly create segment to the list of segments to be merged
|
|
|
Segment<K, V> tempSegment =
|
|
|
new Segment<K, V>(conf, fs, outputFile, codec, false);
|
|
|
- segments.add(tempSegment);
|
|
|
+
|
|
|
+ // Insert new merged segment into the sorted list
|
|
|
+ int pos = Collections.binarySearch(segments, tempSegment,
|
|
|
+ segmentComparator);
|
|
|
+ if (pos < 0) {
|
|
|
+ // binary search failed. So position to be inserted at is -pos-1
|
|
|
+ pos = -pos-1;
|
|
|
+ }
|
|
|
+ segments.add(pos, tempSegment);
|
|
|
numSegments = segments.size();
|
|
|
- Collections.sort(segments, segmentComparator);
|
|
|
+
|
|
|
+ // Subtract the difference between expected size of new segment and
|
|
|
+ // actual size of new segment(Expected size of new segment is
|
|
|
+ // inputBytesOfThisMerge) from totalBytes. Expected size and actual
|
|
|
+ // size will match(almost) if combiner is not called in merge.
|
|
|
+ long inputBytesOfThisMerge = totalBytesProcessed -
|
|
|
+ bytesProcessedInPrevMerges;
|
|
|
+ totalBytes -= inputBytesOfThisMerge - tempSegment.getLength();
|
|
|
+ if (totalBytes != 0) {
|
|
|
+ progPerByte = 1.0f / (float)totalBytes;
|
|
|
+ }
|
|
|
|
|
|
passNo++;
|
|
|
}
|
|
@@ -560,6 +620,57 @@ class Merger {
|
|
|
}
|
|
|
return subList;
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Compute expected size of input bytes to merges, will be used in
|
|
|
+ * calculating mergeProgress. This simulates the above merge() method and
|
|
|
+ * tries to obtain the number of bytes that are going to be merged in all
|
|
|
+ * merges(assuming that there is no combiner called while merging).
|
|
|
+ * @param factor io.sort.factor
|
|
|
+ * @param inMem number of segments in memory to be merged
|
|
|
+ */
|
|
|
+ long computeBytesInMerges(int factor, int inMem) {
|
|
|
+ int numSegments = segments.size();
|
|
|
+ List<Long> segmentSizes = new ArrayList<Long>(numSegments);
|
|
|
+ long totalBytes = 0;
|
|
|
+ int n = numSegments - inMem;
|
|
|
+ // factor for 1st pass
|
|
|
+ int f = getPassFactor(factor, 1, n) + inMem;
|
|
|
+ n = numSegments;
|
|
|
+
|
|
|
+ for (int i = 0; i < numSegments; i++) {
|
|
|
+ // Not handling empty segments here assuming that it would not affect
|
|
|
+ // much in calculation of mergeProgress.
|
|
|
+ segmentSizes.add(segments.get(i).getLength());
|
|
|
+ }
|
|
|
+
|
|
|
+ if (includeFinalMerge) {
|
|
|
+ // just increment so that the following while loop iterates
|
|
|
+ // for 1 more iteration. This is to include final merge as part of
|
|
|
+ // the computation of expected input bytes of merges
|
|
|
+ n++;
|
|
|
+ }
|
|
|
+ while (n > f) {
|
|
|
+ long mergedSize = 0;
|
|
|
+ f = Math.min(f, segmentSizes.size());
|
|
|
+ for (int j = 0; j < f; j++) {
|
|
|
+ mergedSize += segmentSizes.remove(0);
|
|
|
+ }
|
|
|
+ totalBytes += mergedSize;
|
|
|
+
|
|
|
+ // insert new size into the sorted list
|
|
|
+ int pos = Collections.binarySearch(segmentSizes, mergedSize);
|
|
|
+ if (pos < 0) {
|
|
|
+ pos = -pos-1;
|
|
|
+ }
|
|
|
+ segmentSizes.add(pos, mergedSize);
|
|
|
+
|
|
|
+ n -= (f-1);
|
|
|
+ f = factor;
|
|
|
+ }
|
|
|
+
|
|
|
+ return totalBytes;
|
|
|
+ }
|
|
|
|
|
|
public Progress getProgress() {
|
|
|
return mergeProgress;
|