|
@@ -413,14 +413,14 @@ class ReduceTask extends Task {
|
|
private volatile Throwable mergeThrowable;
|
|
private volatile Throwable mergeThrowable;
|
|
|
|
|
|
/**
|
|
/**
|
|
- * A flag to indicate that localFS merge is in progress
|
|
|
|
|
|
+ * A flag to indicate when to exit localFS merge
|
|
*/
|
|
*/
|
|
- private volatile boolean localFSMergeInProgress = false;
|
|
|
|
|
|
+ private volatile boolean exitLocalFSMerge = false;
|
|
|
|
|
|
/**
|
|
/**
|
|
- * A flag to indicate that merge is in progress
|
|
|
|
|
|
+ * A flag to indicate when to exit InMemMerge
|
|
*/
|
|
*/
|
|
- private volatile boolean mergeInProgress = false;
|
|
|
|
|
|
+ private volatile boolean exitInMemMerge = false;
|
|
|
|
|
|
/**
|
|
/**
|
|
* When we accumulate mergeThreshold number of files in ram, we merge/spill
|
|
* When we accumulate mergeThreshold number of files in ram, we merge/spill
|
|
@@ -873,27 +873,10 @@ class ReduceTask extends Task {
|
|
if (mapOutput.inMemory) {
|
|
if (mapOutput.inMemory) {
|
|
// Save it in the synchronized list of map-outputs
|
|
// Save it in the synchronized list of map-outputs
|
|
mapOutputsFilesInMemory.add(mapOutput);
|
|
mapOutputsFilesInMemory.add(mapOutput);
|
|
-
|
|
|
|
- //Create a thread to do merges. Synchronize access/update to
|
|
|
|
- //mergeInProgress
|
|
|
|
- if (!mergeInProgress &&
|
|
|
|
- ((ramManager.getPercentUsed() >= MAX_INMEM_FILESYS_USE &&
|
|
|
|
- ramManager.getReservedFiles() >=
|
|
|
|
- (int)(MAX_INMEM_FILESYS_USE/MAX_INMEM_FILESIZE_FRACTION)) ||
|
|
|
|
- (mergeThreshold > 0 &&
|
|
|
|
- ramManager.getReservedFiles() >= mergeThreshold)) &&
|
|
|
|
- mergeThrowable == null) {
|
|
|
|
- LOG.info(reduceId + " RamManager " +
|
|
|
|
- " is " + ramManager.getPercentUsed() + " full with " +
|
|
|
|
- mapOutputsFilesInMemory.size() + " files." +
|
|
|
|
- " Triggering merge");
|
|
|
|
-
|
|
|
|
- InMemFSMergeThread m =
|
|
|
|
- new InMemFSMergeThread((LocalFileSystem)localFileSys);
|
|
|
|
- m.setName("Thread for merging in-memory files");
|
|
|
|
- m.setDaemon(true);
|
|
|
|
- mergeInProgress = true;
|
|
|
|
- m.start();
|
|
|
|
|
|
+
|
|
|
|
+ //notify the InMemFSMergeThread
|
|
|
|
+ synchronized(ramManager){
|
|
|
|
+ ramManager.notify();
|
|
}
|
|
}
|
|
} else {
|
|
} else {
|
|
// Rename the temporary file to the final file;
|
|
// Rename the temporary file to the final file;
|
|
@@ -908,7 +891,7 @@ class ReduceTask extends Task {
|
|
}
|
|
}
|
|
|
|
|
|
synchronized (mapOutputFilesOnDisk) {
|
|
synchronized (mapOutputFilesOnDisk) {
|
|
- mapOutputFilesOnDisk.add(localFileSys.getFileStatus(filename));
|
|
|
|
|
|
+ addToMapOutputFilesOnDisk(localFileSys.getFileStatus(filename));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -1188,6 +1171,8 @@ class ReduceTask extends Task {
|
|
DecimalFormat mbpsFormat = new DecimalFormat("0.00");
|
|
DecimalFormat mbpsFormat = new DecimalFormat("0.00");
|
|
final Progress copyPhase =
|
|
final Progress copyPhase =
|
|
reduceTask.getProgress().phase();
|
|
reduceTask.getProgress().phase();
|
|
|
|
+ LocalFSMerger localFSMergerThread = null;
|
|
|
|
+ InMemFSMergeThread inMemFSMergeThread = null;
|
|
|
|
|
|
for (int i = 0; i < numOutputs; i++) {
|
|
for (int i = 0; i < numOutputs; i++) {
|
|
copyPhase.addPhase(); // add sub-phase per file
|
|
copyPhase.addPhase(); // add sub-phase per file
|
|
@@ -1204,6 +1189,13 @@ class ReduceTask extends Task {
|
|
copier.start();
|
|
copier.start();
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ //start the on-disk-merge thread
|
|
|
|
+ localFSMergerThread = new LocalFSMerger((LocalFileSystem)localFileSys);
|
|
|
|
+ //start the in memory merger thread
|
|
|
|
+ inMemFSMergeThread = new InMemFSMergeThread();
|
|
|
|
+ localFSMergerThread.start();
|
|
|
|
+ inMemFSMergeThread.start();
|
|
|
|
+
|
|
// start the clock for bandwidth measurement
|
|
// start the clock for bandwidth measurement
|
|
long startTime = System.currentTimeMillis();
|
|
long startTime = System.currentTimeMillis();
|
|
long currentTime = startTime;
|
|
long currentTime = startTime;
|
|
@@ -1329,25 +1321,6 @@ class ReduceTask extends Task {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- // Check if a on-disk merge can be done. This will help if there
|
|
|
|
- // are no copies to be fetched but sufficient copies to be merged.
|
|
|
|
- synchronized (mapOutputFilesOnDisk) {
|
|
|
|
- if (!localFSMergeInProgress
|
|
|
|
- && (mapOutputFilesOnDisk.size() >= (2 * ioSortFactor - 1))) {
|
|
|
|
- // make sure that only one thread merges the disk files
|
|
|
|
- localFSMergeInProgress = true;
|
|
|
|
- // start the on-disk-merge process
|
|
|
|
- LOG.info(reduceTask.getTaskID() + "We have " +
|
|
|
|
- mapOutputFilesOnDisk.size() + " map outputs on disk. " +
|
|
|
|
- "Triggering merge of " + ioSortFactor + " files");
|
|
|
|
- LocalFSMerger lfsm =
|
|
|
|
- new LocalFSMerger((LocalFileSystem)localFileSys);
|
|
|
|
- lfsm.setName("Thread for merging on-disk files");
|
|
|
|
- lfsm.setDaemon(true);
|
|
|
|
- lfsm.start();
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
// if we have no copies in flight and we can't schedule anything
|
|
// if we have no copies in flight and we can't schedule anything
|
|
// new, just wait for a bit
|
|
// new, just wait for a bit
|
|
try {
|
|
try {
|
|
@@ -1519,83 +1492,25 @@ class ReduceTask extends Task {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ // copiers are done, exit and notify the waiting merge threads
|
|
|
|
+ synchronized (mapOutputFilesOnDisk) {
|
|
|
|
+ exitLocalFSMerge = true;
|
|
|
|
+ mapOutputFilesOnDisk.notify();
|
|
|
|
+ }
|
|
|
|
+ synchronized (ramManager) {
|
|
|
|
+ exitInMemMerge = true;
|
|
|
|
+ ramManager.notify();
|
|
|
|
+ }
|
|
|
|
+
|
|
//Do a merge of in-memory files (if there are any)
|
|
//Do a merge of in-memory files (if there are any)
|
|
if (mergeThrowable == null) {
|
|
if (mergeThrowable == null) {
|
|
try {
|
|
try {
|
|
// Wait for the on-disk merge to complete
|
|
// Wait for the on-disk merge to complete
|
|
- while (localFSMergeInProgress) {
|
|
|
|
- Thread.sleep(200);
|
|
|
|
- }
|
|
|
|
|
|
+ localFSMergerThread.join();
|
|
|
|
|
|
//wait for an ongoing merge (if it is in flight) to complete
|
|
//wait for an ongoing merge (if it is in flight) to complete
|
|
- while (mergeInProgress) {
|
|
|
|
- Thread.sleep(200);
|
|
|
|
- }
|
|
|
|
- LOG.info(reduceTask.getTaskID() +
|
|
|
|
- " Copying of all map outputs complete. " +
|
|
|
|
- "Initiating the last merge on the remaining files " +
|
|
|
|
- "in-memory");
|
|
|
|
- if (mergeThrowable != null) {
|
|
|
|
- //this could happen if the merge that
|
|
|
|
- //was in progress threw an exception
|
|
|
|
- throw mergeThrowable;
|
|
|
|
- }
|
|
|
|
- //initiate merge
|
|
|
|
- if (mapOutputsFilesInMemory.size() == 0) {
|
|
|
|
- LOG.info(reduceTask.getTaskID() + "Nothing to merge from " +
|
|
|
|
- "in-memory map-outputs");
|
|
|
|
- return (copiedMapOutputs.size() == numOutputs);
|
|
|
|
- }
|
|
|
|
- //name this output file same as the name of the first file that is
|
|
|
|
- //there in the current list of inmem files (this is guaranteed to be
|
|
|
|
- //absent on the disk currently. So we don't overwrite a prev.
|
|
|
|
- //created spill). Also we need to create the output file now since
|
|
|
|
- //it is not guaranteed that this file will be present after merge
|
|
|
|
- //is called (we delete empty map-output files as soon as we see them
|
|
|
|
- //in the merge method)
|
|
|
|
- TaskID mapId = mapOutputsFilesInMemory.get(0).mapId;
|
|
|
|
- Path outputPath =
|
|
|
|
- localFileSys.makeQualified(
|
|
|
|
- mapOutputFile.getInputFileForWrite(mapId,
|
|
|
|
- reduceTask.getTaskID(),
|
|
|
|
- ramfsMergeOutputSize));
|
|
|
|
- Writer writer =
|
|
|
|
- new Writer(conf, localFileSys, outputPath,
|
|
|
|
- conf.getMapOutputKeyClass(),
|
|
|
|
- conf.getMapOutputValueClass(),
|
|
|
|
- codec);
|
|
|
|
- List<Segment<K, V>> inMemorySegments = createInMemorySegments();
|
|
|
|
- int noInMemSegments = inMemorySegments.size();
|
|
|
|
- RawKeyValueIterator rIter = null;
|
|
|
|
- try {
|
|
|
|
- rIter = Merger.merge(conf, localFileSys,
|
|
|
|
- (Class<K>)conf.getMapOutputKeyClass(),
|
|
|
|
- (Class<V>)conf.getMapOutputValueClass(),
|
|
|
|
- inMemorySegments, inMemorySegments.size(),
|
|
|
|
- new Path(reduceTask.getTaskID().toString()),
|
|
|
|
- conf.getOutputKeyComparator(), reporter);
|
|
|
|
-
|
|
|
|
- Merger.writeFile(rIter, writer, reporter);
|
|
|
|
- writer.close();
|
|
|
|
- } catch (Exception e) {
|
|
|
|
- //make sure that we delete the ondisk file that we created earlier
|
|
|
|
- //when we invoked cloneFileAttributes
|
|
|
|
- writer.close();
|
|
|
|
- localFileSys.delete(outputPath, true);
|
|
|
|
- throw new IOException (StringUtils.stringifyException(e));
|
|
|
|
- }
|
|
|
|
- LOG.info(reduceTask.getTaskID() +
|
|
|
|
- " Merge of the " + noInMemSegments +
|
|
|
|
- " files in InMemoryFileSystem complete." +
|
|
|
|
- " Local file is " + outputPath +
|
|
|
|
- " of size " +
|
|
|
|
- localFileSys.getFileStatus(outputPath).getLen());
|
|
|
|
-
|
|
|
|
- FileStatus status = localFileSys.getFileStatus(outputPath);
|
|
|
|
- synchronized (mapOutputFilesOnDisk) {
|
|
|
|
- mapOutputFilesOnDisk.add(status);
|
|
|
|
- }
|
|
|
|
- } catch (Throwable t) {
|
|
|
|
|
|
+ inMemFSMergeThread.join();
|
|
|
|
+ } catch (Throwable t) {
|
|
LOG.warn(reduceTask.getTaskID() +
|
|
LOG.warn(reduceTask.getTaskID() +
|
|
" Final merge of the inmemory files threw an exception: " +
|
|
" Final merge of the inmemory files threw an exception: " +
|
|
StringUtils.stringifyException(t));
|
|
StringUtils.stringifyException(t));
|
|
@@ -1642,6 +1557,13 @@ class ReduceTask extends Task {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private void addToMapOutputFilesOnDisk(FileStatus status) {
|
|
|
|
+ synchronized (mapOutputFilesOnDisk) {
|
|
|
|
+ mapOutputFilesOnDisk.add(status);
|
|
|
|
+ mapOutputFilesOnDisk.notify();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Queries the {@link TaskTracker} for a set of map-completion events from
|
|
* Queries the {@link TaskTracker} for a set of map-completion events from
|
|
* a given event ID.
|
|
* a given event ID.
|
|
@@ -1734,77 +1656,93 @@ class ReduceTask extends Task {
|
|
|
|
|
|
public LocalFSMerger(LocalFileSystem fs) {
|
|
public LocalFSMerger(LocalFileSystem fs) {
|
|
this.localFileSys = fs;
|
|
this.localFileSys = fs;
|
|
|
|
+ setName("Thread for merging on-disk files");
|
|
|
|
+ setDaemon(true);
|
|
}
|
|
}
|
|
|
|
|
|
@SuppressWarnings("unchecked")
|
|
@SuppressWarnings("unchecked")
|
|
public void run() {
|
|
public void run() {
|
|
try {
|
|
try {
|
|
- List<Path> mapFiles = new ArrayList<Path>();
|
|
|
|
- long approxOutputSize = 0;
|
|
|
|
- int bytesPerSum =
|
|
|
|
- reduceTask.getConf().getInt("io.bytes.per.checksum", 512);
|
|
|
|
- LOG.info(reduceTask.getTaskID()
|
|
|
|
- + " Merging map output files on disk");
|
|
|
|
- // 1. Prepare the list of files to be merged. This list is prepared
|
|
|
|
- // using a list of map output files on disk. Currently we merge
|
|
|
|
- // io.sort.factor files into 1.
|
|
|
|
- synchronized (mapOutputFilesOnDisk) {
|
|
|
|
- for (int i = 0; i < ioSortFactor; ++i) {
|
|
|
|
- FileStatus filestatus = mapOutputFilesOnDisk.first();
|
|
|
|
- mapOutputFilesOnDisk.remove(filestatus);
|
|
|
|
- mapFiles.add(filestatus.getPath());
|
|
|
|
- approxOutputSize += filestatus.getLen();
|
|
|
|
|
|
+ LOG.info(reduceTask.getTaskID() + " Thread started: " + getName());
|
|
|
|
+ while(!exitLocalFSMerge){
|
|
|
|
+ synchronized (mapOutputFilesOnDisk) {
|
|
|
|
+ while (!exitLocalFSMerge &&
|
|
|
|
+ mapOutputFilesOnDisk.size() < (2 * ioSortFactor - 1)) {
|
|
|
|
+ LOG.info(reduceTask.getTaskID() + " Thread waiting: " + getName());
|
|
|
|
+ mapOutputFilesOnDisk.wait();
|
|
|
|
+ }
|
|
}
|
|
}
|
|
- }
|
|
|
|
-
|
|
|
|
- // sanity check
|
|
|
|
- if (mapFiles.size() == 0) {
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- // add the checksum length
|
|
|
|
- approxOutputSize += ChecksumFileSystem
|
|
|
|
- .getChecksumLength(approxOutputSize,
|
|
|
|
- bytesPerSum);
|
|
|
|
-
|
|
|
|
- // 2. Start the on-disk merge process
|
|
|
|
- Path outputPath =
|
|
|
|
- lDirAlloc.getLocalPathForWrite(mapFiles.get(0).toString(),
|
|
|
|
- approxOutputSize, conf)
|
|
|
|
- .suffix(".merged");
|
|
|
|
- Writer writer =
|
|
|
|
- new Writer(conf, localFileSys, outputPath,
|
|
|
|
- conf.getMapOutputKeyClass(),
|
|
|
|
- conf.getMapOutputValueClass(),
|
|
|
|
- codec);
|
|
|
|
- RawKeyValueIterator iter = null;
|
|
|
|
- Path tmpDir = new Path(reduceTask.getTaskID().toString());
|
|
|
|
- final Reporter reporter = getReporter(umbilical);
|
|
|
|
- try {
|
|
|
|
- iter = Merger.merge(conf, localFileSys,
|
|
|
|
- conf.getMapOutputKeyClass(),
|
|
|
|
- conf.getMapOutputValueClass(),
|
|
|
|
- codec, mapFiles.toArray(new Path[mapFiles.size()]),
|
|
|
|
- true, ioSortFactor, tmpDir,
|
|
|
|
- conf.getOutputKeyComparator(), reporter);
|
|
|
|
- } catch (Exception e) {
|
|
|
|
|
|
+ if(exitLocalFSMerge) {//to avoid running one extra time in the end
|
|
|
|
+ break;
|
|
|
|
+ }
|
|
|
|
+ List<Path> mapFiles = new ArrayList<Path>();
|
|
|
|
+ long approxOutputSize = 0;
|
|
|
|
+ int bytesPerSum =
|
|
|
|
+ reduceTask.getConf().getInt("io.bytes.per.checksum", 512);
|
|
|
|
+ LOG.info(reduceTask.getTaskID() + "We have " +
|
|
|
|
+ mapOutputFilesOnDisk.size() + " map outputs on disk. " +
|
|
|
|
+ "Triggering merge of " + ioSortFactor + " files");
|
|
|
|
+ // 1. Prepare the list of files to be merged. This list is prepared
|
|
|
|
+ // using a list of map output files on disk. Currently we merge
|
|
|
|
+ // io.sort.factor files into 1.
|
|
|
|
+ synchronized (mapOutputFilesOnDisk) {
|
|
|
|
+ for (int i = 0; i < ioSortFactor; ++i) {
|
|
|
|
+ FileStatus filestatus = mapOutputFilesOnDisk.first();
|
|
|
|
+ mapOutputFilesOnDisk.remove(filestatus);
|
|
|
|
+ mapFiles.add(filestatus.getPath());
|
|
|
|
+ approxOutputSize += filestatus.getLen();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // sanity check
|
|
|
|
+ if (mapFiles.size() == 0) {
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // add the checksum length
|
|
|
|
+ approxOutputSize += ChecksumFileSystem
|
|
|
|
+ .getChecksumLength(approxOutputSize,
|
|
|
|
+ bytesPerSum);
|
|
|
|
+
|
|
|
|
+ // 2. Start the on-disk merge process
|
|
|
|
+ Path outputPath =
|
|
|
|
+ lDirAlloc.getLocalPathForWrite(mapFiles.get(0).toString(),
|
|
|
|
+ approxOutputSize, conf)
|
|
|
|
+ .suffix(".merged");
|
|
|
|
+ Writer writer =
|
|
|
|
+ new Writer(conf, localFileSys, outputPath,
|
|
|
|
+ conf.getMapOutputKeyClass(),
|
|
|
|
+ conf.getMapOutputValueClass(),
|
|
|
|
+ codec);
|
|
|
|
+ RawKeyValueIterator iter = null;
|
|
|
|
+ Path tmpDir = new Path(reduceTask.getTaskID().toString());
|
|
|
|
+ final Reporter reporter = getReporter(umbilical);
|
|
|
|
+ try {
|
|
|
|
+ iter = Merger.merge(conf, localFileSys,
|
|
|
|
+ conf.getMapOutputKeyClass(),
|
|
|
|
+ conf.getMapOutputValueClass(),
|
|
|
|
+ codec, mapFiles.toArray(new Path[mapFiles.size()]),
|
|
|
|
+ true, ioSortFactor, tmpDir,
|
|
|
|
+ conf.getOutputKeyComparator(), reporter);
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ writer.close();
|
|
|
|
+ localFileSys.delete(outputPath, true);
|
|
|
|
+ throw new IOException (StringUtils.stringifyException(e));
|
|
|
|
+ }
|
|
|
|
+ Merger.writeFile(iter, writer, reporter);
|
|
writer.close();
|
|
writer.close();
|
|
- localFileSys.delete(outputPath, true);
|
|
|
|
- throw new IOException (StringUtils.stringifyException(e));
|
|
|
|
- }
|
|
|
|
- Merger.writeFile(iter, writer, reporter);
|
|
|
|
- writer.close();
|
|
|
|
-
|
|
|
|
- synchronized (mapOutputFilesOnDisk) {
|
|
|
|
- mapOutputFilesOnDisk.add(localFileSys.getFileStatus(outputPath));
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- LOG.info(reduceTask.getTaskID() +
|
|
|
|
- " Finished merging " + mapFiles.size() +
|
|
|
|
- " map output files on disk of total-size " +
|
|
|
|
- approxOutputSize + "." +
|
|
|
|
- " Local output file is " + outputPath + " of size " +
|
|
|
|
- localFileSys.getFileStatus(outputPath).getLen());
|
|
|
|
|
|
+
|
|
|
|
+ synchronized (mapOutputFilesOnDisk) {
|
|
|
|
+ addToMapOutputFilesOnDisk(localFileSys.getFileStatus(outputPath));
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ LOG.info(reduceTask.getTaskID() +
|
|
|
|
+ " Finished merging " + mapFiles.size() +
|
|
|
|
+ " map output files on disk of total-size " +
|
|
|
|
+ approxOutputSize + "." +
|
|
|
|
+ " Local output file is " + outputPath + " of size " +
|
|
|
|
+ localFileSys.getFileStatus(outputPath).getLen());
|
|
|
|
+ }
|
|
} catch (Throwable t) {
|
|
} catch (Throwable t) {
|
|
LOG.warn(reduceTask.getTaskID()
|
|
LOG.warn(reduceTask.getTaskID()
|
|
+ " Merging of the local FS files threw an exception: "
|
|
+ " Merging of the local FS files threw an exception: "
|
|
@@ -1812,94 +1750,126 @@ class ReduceTask extends Task {
|
|
if (mergeThrowable == null) {
|
|
if (mergeThrowable == null) {
|
|
mergeThrowable = t;
|
|
mergeThrowable = t;
|
|
}
|
|
}
|
|
- } finally {
|
|
|
|
- localFSMergeInProgress = false;
|
|
|
|
- }
|
|
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
private class InMemFSMergeThread extends Thread {
|
|
private class InMemFSMergeThread extends Thread {
|
|
- private LocalFileSystem localFileSys;
|
|
|
|
|
|
|
|
- public InMemFSMergeThread( LocalFileSystem localFileSys) {
|
|
|
|
- this.localFileSys = localFileSys;
|
|
|
|
|
|
+ public InMemFSMergeThread() {
|
|
|
|
+ setName("Thread for merging in memory files");
|
|
|
|
+ setDaemon(true);
|
|
}
|
|
}
|
|
|
|
|
|
@SuppressWarnings("unchecked")
|
|
@SuppressWarnings("unchecked")
|
|
public void run() {
|
|
public void run() {
|
|
LOG.info(reduceTask.getTaskID() + " Thread started: " + getName());
|
|
LOG.info(reduceTask.getTaskID() + " Thread started: " + getName());
|
|
try {
|
|
try {
|
|
- if (mapOutputsFilesInMemory.size() >=
|
|
|
|
- (int)(MAX_INMEM_FILESYS_USE/MAX_INMEM_FILESIZE_FRACTION)) {
|
|
|
|
- //name this output file same as the name of the first file that is
|
|
|
|
- //there in the current list of inmem files (this is guaranteed to
|
|
|
|
- //be absent on the disk currently. So we don't overwrite a prev.
|
|
|
|
- //created spill). Also we need to create the output file now since
|
|
|
|
- //it is not guaranteed that this file will be present after merge
|
|
|
|
- //is called (we delete empty files as soon as we see them
|
|
|
|
- //in the merge method)
|
|
|
|
-
|
|
|
|
- //figure out the mapId
|
|
|
|
- TaskID mapId = mapOutputsFilesInMemory.get(0).mapId;
|
|
|
|
-
|
|
|
|
- Path outputPath = mapOutputFile.getInputFileForWrite(mapId,
|
|
|
|
- reduceTask.getTaskID(), ramfsMergeOutputSize);
|
|
|
|
-
|
|
|
|
- Writer writer =
|
|
|
|
- new Writer(conf, localFileSys, outputPath,
|
|
|
|
- conf.getMapOutputKeyClass(),
|
|
|
|
- conf.getMapOutputValueClass(),
|
|
|
|
- codec);
|
|
|
|
-
|
|
|
|
- List<Segment<K, V>> inMemorySegments = createInMemorySegments();
|
|
|
|
- int noInMemorySegments = inMemorySegments.size();
|
|
|
|
-
|
|
|
|
- RawKeyValueIterator rIter = null;
|
|
|
|
- final Reporter reporter = getReporter(umbilical);
|
|
|
|
- try {
|
|
|
|
- rIter = Merger.merge(conf, localFileSys,
|
|
|
|
- (Class<K>)conf.getMapOutputKeyClass(),
|
|
|
|
- (Class<V>)conf.getMapOutputValueClass(),
|
|
|
|
- inMemorySegments, inMemorySegments.size(),
|
|
|
|
- new Path(reduceTask.getTaskID().toString()),
|
|
|
|
- conf.getOutputKeyComparator(), reporter);
|
|
|
|
- if (null == combinerClass) {
|
|
|
|
- Merger.writeFile(rIter, writer, reporter);
|
|
|
|
- } else {
|
|
|
|
- combineCollector.setWriter(writer);
|
|
|
|
- combineAndSpill(rIter, reduceCombineInputCounter);
|
|
|
|
|
|
+ while(!exitInMemMerge) {
|
|
|
|
+ synchronized(ramManager) {
|
|
|
|
+ while(!exitInMemMerge &&
|
|
|
|
+ ((ramManager.getPercentUsed() < MAX_INMEM_FILESYS_USE ||
|
|
|
|
+ ramManager.getReservedFiles() <
|
|
|
|
+ (int)(MAX_INMEM_FILESYS_USE/MAX_INMEM_FILESIZE_FRACTION))
|
|
|
|
+ &&
|
|
|
|
+ (mergeThreshold <= 0 ||
|
|
|
|
+ ramManager.getReservedFiles() < mergeThreshold))) {
|
|
|
|
+ LOG.info(reduceTask.getTaskID() + " Thread waiting: " + getName());
|
|
|
|
+ ramManager.wait();
|
|
}
|
|
}
|
|
- } catch (Exception e) {
|
|
|
|
- //make sure that we delete the ondisk file that we created
|
|
|
|
- //earlier when we invoked cloneFileAttributes
|
|
|
|
- writer.close();
|
|
|
|
- localFileSys.delete(outputPath, true);
|
|
|
|
- throw (IOException)new IOException
|
|
|
|
- ("Intermedate merge failed").initCause(e);
|
|
|
|
}
|
|
}
|
|
- writer.close();
|
|
|
|
- LOG.info(reduceTask.getTaskID() +
|
|
|
|
- " Merge of the " + noInMemorySegments +
|
|
|
|
- " files in-memory complete." +
|
|
|
|
- " Local file is " + outputPath + " of size " +
|
|
|
|
- localFileSys.getFileStatus(outputPath).getLen());
|
|
|
|
-
|
|
|
|
- FileStatus status = localFileSys.getFileStatus(outputPath);
|
|
|
|
- synchronized (mapOutputFilesOnDisk) {
|
|
|
|
- mapOutputFilesOnDisk.add(status);
|
|
|
|
|
|
+ if(exitInMemMerge) {//to avoid running one extra time in the end
|
|
|
|
+ break;
|
|
|
|
+ }
|
|
|
|
+ LOG.info(reduceTask.getTaskID() + " RamManager " +
|
|
|
|
+ " is " + ramManager.getPercentUsed() + " full with " +
|
|
|
|
+ mapOutputsFilesInMemory.size() + " files." +
|
|
|
|
+ " Triggering merge");
|
|
|
|
+ if (mapOutputsFilesInMemory.size() >=
|
|
|
|
+ (int)(MAX_INMEM_FILESYS_USE/MAX_INMEM_FILESIZE_FRACTION)) {
|
|
|
|
+ doInMemMerge();
|
|
|
|
+ }
|
|
|
|
+ else {
|
|
|
|
+ LOG.info(reduceTask.getTaskID() + " Nothing to merge from map-outputs in-memory");
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- else {
|
|
|
|
- LOG.info(reduceTask.getTaskID() + " Nothing to merge from map-outputs in-memory");
|
|
|
|
|
|
+ //see if any remaining files are there for merge
|
|
|
|
+ LOG.info(reduceTask.getTaskID() +
|
|
|
|
+ " Copying of all map outputs complete. " +
|
|
|
|
+ "Initiating the last merge on the remaining files " +
|
|
|
|
+ "in-memory");
|
|
|
|
+ if (mapOutputsFilesInMemory.size() == 0) {
|
|
|
|
+ LOG.info(reduceTask.getTaskID() + "Nothing to merge from " +
|
|
|
|
+ "in-memory map-outputs");
|
|
|
|
+ return;
|
|
}
|
|
}
|
|
|
|
+ doInMemMerge();
|
|
} catch (Throwable t) {
|
|
} catch (Throwable t) {
|
|
LOG.warn(reduceTask.getTaskID() +
|
|
LOG.warn(reduceTask.getTaskID() +
|
|
- " Intermediate Merge of the inmemory files threw an exception: "
|
|
|
|
|
|
+ " Merge of the inmemory files threw an exception: "
|
|
+ StringUtils.stringifyException(t));
|
|
+ StringUtils.stringifyException(t));
|
|
ReduceCopier.this.mergeThrowable = t;
|
|
ReduceCopier.this.mergeThrowable = t;
|
|
}
|
|
}
|
|
- finally {
|
|
|
|
- mergeInProgress = false;
|
|
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
|
+ private void doInMemMerge() throws IOException{
|
|
|
|
+ //name this output file same as the name of the first file that is
|
|
|
|
+ //there in the current list of inmem files (this is guaranteed to
|
|
|
|
+ //be absent on the disk currently. So we don't overwrite a prev.
|
|
|
|
+ //created spill). Also we need to create the output file now since
|
|
|
|
+ //it is not guaranteed that this file will be present after merge
|
|
|
|
+ //is called (we delete empty files as soon as we see them
|
|
|
|
+ //in the merge method)
|
|
|
|
+
|
|
|
|
+ //figure out the mapId
|
|
|
|
+ TaskID mapId = mapOutputsFilesInMemory.get(0).mapId;
|
|
|
|
+
|
|
|
|
+ Path outputPath = mapOutputFile.getInputFileForWrite(mapId,
|
|
|
|
+ reduceTask.getTaskID(), ramfsMergeOutputSize);
|
|
|
|
+
|
|
|
|
+ Writer writer =
|
|
|
|
+ new Writer(conf, localFileSys, outputPath,
|
|
|
|
+ conf.getMapOutputKeyClass(),
|
|
|
|
+ conf.getMapOutputValueClass(),
|
|
|
|
+ codec);
|
|
|
|
+
|
|
|
|
+ List<Segment<K, V>> inMemorySegments = createInMemorySegments();
|
|
|
|
+ int noInMemorySegments = inMemorySegments.size();
|
|
|
|
+
|
|
|
|
+ RawKeyValueIterator rIter = null;
|
|
|
|
+ final Reporter reporter = getReporter(umbilical);
|
|
|
|
+ try {
|
|
|
|
+ rIter = Merger.merge(conf, localFileSys,
|
|
|
|
+ (Class<K>)conf.getMapOutputKeyClass(),
|
|
|
|
+ (Class<V>)conf.getMapOutputValueClass(),
|
|
|
|
+ inMemorySegments, inMemorySegments.size(),
|
|
|
|
+ new Path(reduceTask.getTaskID().toString()),
|
|
|
|
+ conf.getOutputKeyComparator(), reporter);
|
|
|
|
+ if (null == combinerClass) {
|
|
|
|
+ Merger.writeFile(rIter, writer, reporter);
|
|
|
|
+ } else {
|
|
|
|
+ combineCollector.setWriter(writer);
|
|
|
|
+ combineAndSpill(rIter, reduceCombineInputCounter);
|
|
|
|
+ }
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ //make sure that we delete the ondisk file that we created
|
|
|
|
+ //earlier when we invoked cloneFileAttributes
|
|
|
|
+ writer.close();
|
|
|
|
+ localFileSys.delete(outputPath, true);
|
|
|
|
+ throw (IOException)new IOException
|
|
|
|
+ ("Intermedate merge failed").initCause(e);
|
|
|
|
+ }
|
|
|
|
+ writer.close();
|
|
|
|
+ LOG.info(reduceTask.getTaskID() +
|
|
|
|
+ " Merge of the " + noInMemorySegments +
|
|
|
|
+ " files in-memory complete." +
|
|
|
|
+ " Local file is " + outputPath + " of size " +
|
|
|
|
+ localFileSys.getFileStatus(outputPath).getLen());
|
|
|
|
+
|
|
|
|
+ FileStatus status = localFileSys.getFileStatus(outputPath);
|
|
|
|
+ synchronized (mapOutputFilesOnDisk) {
|
|
|
|
+ addToMapOutputFilesOnDisk(status);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|