|
@@ -505,7 +505,8 @@ public class MergeManager<K, V> {
|
|
|
}
|
|
|
|
|
|
private class OnDiskMerger extends MergeThread<MapOutput<K,V>,K,V> {
|
|
|
-
|
|
|
+ private int mergeCount = 0;
|
|
|
+
|
|
|
public OnDiskMerger(MergeManager<K, V> manager) {
|
|
|
super(manager, ioSortFactor, exceptionReporter);
|
|
|
setName("OnDiskMerger - Thread to merge on-disk map-outputs");
|
|
@@ -537,8 +538,10 @@ public class MergeManager<K, V> {
|
|
|
ChecksumFileSystem.getChecksumLength(approxOutputSize, bytesPerSum);
|
|
|
|
|
|
// 2. Start the on-disk merge process
|
|
|
+ ++mergeCount;
|
|
|
Path outputPath =
|
|
|
- localDirAllocator.getLocalPathForWrite(inputs.get(0).toString(),
|
|
|
+ localDirAllocator.getLocalPathForWrite(
|
|
|
+ new Path(reduceId.toString(), "diskmerge" + mergeCount).toString(),
|
|
|
approxOutputSize, jobConf).suffix(Task.MERGED_OUTPUT_PREFIX);
|
|
|
Writer<K,V> writer =
|
|
|
new Writer<K,V>(jobConf, rfs, outputPath,
|