|
@@ -114,6 +114,11 @@ class ReduceTaskRunner extends TaskRunner implements MRConstants {
|
|
|
*/
|
|
|
private volatile boolean mergeInProgress = false;
|
|
|
|
|
|
+ /**
|
|
|
+ * When we accumulate merge_threshold number of files in ram, we merge/spill
|
|
|
+ */
|
|
|
+ private int mergeThreshold = 500;
|
|
|
+
|
|
|
/**
|
|
|
* The threads for fetching the files.
|
|
|
*/
|
|
@@ -316,8 +321,11 @@ class ReduceTaskRunner extends TaskRunner implements MRConstants {
|
|
|
" output from " + loc.getHost() + ".");
|
|
|
//Create a thread to do merges. Synchronize access/update to
|
|
|
//mergeInProgress
|
|
|
- if (!mergeInProgress && inMemFileSys.getPercentUsed() >=
|
|
|
- MAX_INMEM_FILESYS_USE) {
|
|
|
+ if (!mergeInProgress &&
|
|
|
+ (inMemFileSys.getPercentUsed() >= MAX_INMEM_FILESYS_USE ||
|
|
|
+ (mergeThreshold > 0 &&
|
|
|
+ inMemFileSys.getNumFiles(MAP_OUTPUT_FILTER) >= mergeThreshold))&&
|
|
|
+ mergeThrowable == null) {
|
|
|
LOG.info(reduceId + " InMemoryFileSystem " +
|
|
|
inMemFileSys.getUri().toString() +
|
|
|
" is " + inMemFileSys.getPercentUsed() +
|
|
@@ -383,6 +391,7 @@ class ReduceTaskRunner extends TaskRunner implements MRConstants {
|
|
|
this.copyResults = new ArrayList(100);
|
|
|
this.numCopiers = conf.getInt("mapred.reduce.parallel.copies", 5);
|
|
|
this.maxBackoff = conf.getInt("mapred.reduce.copy.backoff", 300);
|
|
|
+ this.mergeThreshold = conf.getInt("mapred.inmem.merge.threshold", 1000);
|
|
|
|
|
|
//we want to distinguish inmem fs instances for different reduces. Hence,
|
|
|
//append a unique string in the uri for the inmem fs name
|
|
@@ -619,20 +628,6 @@ class ReduceTaskRunner extends TaskRunner implements MRConstants {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- if (mergeThrowable != null) {
|
|
|
- //set the task state to FAILED
|
|
|
- TaskTracker tracker = ReduceTaskRunner.this.getTracker();
|
|
|
- TaskTracker.TaskInProgress tip =
|
|
|
- tracker.runningTasks.get(reduceTask.getTaskId());
|
|
|
- tip.runstate = TaskStatus.State.FAILED;
|
|
|
- try {
|
|
|
- tip.cleanup();
|
|
|
- } catch (Throwable ie2) {
|
|
|
- // Ignore it, we are just trying to cleanup.
|
|
|
- }
|
|
|
- inMemFileSys.close();
|
|
|
- }
|
|
|
-
|
|
|
//Do a merge of in-memory files (if there are any)
|
|
|
if (!killed && mergeThrowable == null) {
|
|
|
try {
|
|
@@ -644,6 +639,11 @@ class ReduceTaskRunner extends TaskRunner implements MRConstants {
|
|
|
" Copying of all map outputs complete. " +
|
|
|
"Initiating the last merge on the remaining files in " +
|
|
|
inMemFileSys.getUri());
|
|
|
+ if (mergeThrowable != null) {
|
|
|
+ //this could happen if the merge that
|
|
|
+ //was in progress threw an exception
|
|
|
+ throw mergeThrowable;
|
|
|
+ }
|
|
|
//initiate merge
|
|
|
Path[] inMemClosedFiles = inMemFileSys.getFiles(MAP_OUTPUT_FILTER);
|
|
|
if (inMemClosedFiles.length == 0) {
|
|
@@ -651,16 +651,28 @@ class ReduceTaskRunner extends TaskRunner implements MRConstants {
|
|
|
inMemFileSys.getUri());
|
|
|
return numCopied == numOutputs;
|
|
|
}
|
|
|
- RawKeyValueIterator rIter =
|
|
|
- sorter.merge(inMemClosedFiles, true, inMemClosedFiles.length,
|
|
|
- new Path(reduceTask.getTaskId()));
|
|
|
//name this output file same as the name of the first file that is
|
|
|
//there in the current list of inmem files (this is guaranteed to be
|
|
|
//absent on the disk currently. So we don't overwrite a prev.
|
|
|
- //created spill)
|
|
|
+ //created spill). Also we need to create the output file now since
|
|
|
+ //it is not guaranteed that this file will be present after merge
|
|
|
+ //is called (we delete empty sequence files as soon as we see them
|
|
|
+ //in the merge method)
|
|
|
SequenceFile.Writer writer = sorter.cloneFileAttributes(
|
|
|
inMemFileSys.makeQualified(inMemClosedFiles[0]),
|
|
|
localFileSys.makeQualified(inMemClosedFiles[0]), null);
|
|
|
+
|
|
|
+ RawKeyValueIterator rIter = null;
|
|
|
+ try {
|
|
|
+ rIter = sorter.merge(inMemClosedFiles, true, inMemClosedFiles.length,
|
|
|
+ new Path(reduceTask.getTaskId()));
|
|
|
+ } catch (Exception e) {
|
|
|
+ //make sure that we delete the ondisk file that we created earlier
|
|
|
+ //when we invoked cloneFileAttributes
|
|
|
+ writer.close();
|
|
|
+ localFileSys.delete(inMemClosedFiles[0]);
|
|
|
+ throw new IOException (StringUtils.stringifyException(e));
|
|
|
+ }
|
|
|
sorter.writeFile(rIter, writer);
|
|
|
writer.close();
|
|
|
LOG.info(reduceTask.getTaskId() +
|
|
@@ -668,14 +680,15 @@ class ReduceTaskRunner extends TaskRunner implements MRConstants {
|
|
|
" files in InMemoryFileSystem complete." +
|
|
|
" Local file is " + inMemClosedFiles[0]);
|
|
|
} catch (Throwable t) {
|
|
|
- LOG.warn("Merge of the inmemory files threw an exception: " +
|
|
|
+ LOG.warn(reduceTask.getTaskId() +
|
|
|
+ " Final merge of the inmemory files threw an exception: " +
|
|
|
StringUtils.stringifyException(t));
|
|
|
- inMemFileSys.close();
|
|
|
return false;
|
|
|
}
|
|
|
}
|
|
|
return mergeThrowable == null && numCopied == numOutputs && !killed;
|
|
|
} finally {
|
|
|
+ inMemFileSys.close();
|
|
|
pingTimer.interrupt();
|
|
|
}
|
|
|
}
|
|
@@ -780,15 +793,27 @@ class ReduceTaskRunner extends TaskRunner implements MRConstants {
|
|
|
//output files to merge to get the benefit of in-memory merge
|
|
|
if (inMemClosedFiles.length >=
|
|
|
(int)(MAX_INMEM_FILESYS_USE/MAX_INMEM_FILESIZE_FRACTION)) {
|
|
|
- RawKeyValueIterator rIter = sorter.merge(inMemClosedFiles, true,
|
|
|
- inMemClosedFiles.length, new Path(reduceTask.getTaskId()));
|
|
|
//name this output file same as the name of the first file that is
|
|
|
//there in the current list of inmem files (this is guaranteed to be
|
|
|
//absent on the disk currently. So we don't overwrite a prev.
|
|
|
- //created spill)
|
|
|
+ //created spill). Also we need to create the output file now since
|
|
|
+ //it is not guaranteed that this file will be present after merge
|
|
|
+ //is called (we delete empty sequence files as soon as we see them
|
|
|
+ //in the merge method)
|
|
|
SequenceFile.Writer writer = sorter.cloneFileAttributes(
|
|
|
inMemFileSys.makeQualified(inMemClosedFiles[0]),
|
|
|
localFileSys.makeQualified(inMemClosedFiles[0]), null);
|
|
|
+ RawKeyValueIterator rIter;
|
|
|
+ try {
|
|
|
+ rIter = sorter.merge(inMemClosedFiles, true,
|
|
|
+ inMemClosedFiles.length, new Path(reduceTask.getTaskId()));
|
|
|
+ } catch (Exception e) {
|
|
|
+ //make sure that we delete the ondisk file that we created earlier
|
|
|
+ //when we invoked cloneFileAttributes
|
|
|
+ writer.close();
|
|
|
+ localFileSys.delete(inMemClosedFiles[0]);
|
|
|
+ throw new IOException (StringUtils.stringifyException(e));
|
|
|
+ }
|
|
|
sorter.writeFile(rIter, writer);
|
|
|
writer.close();
|
|
|
LOG.info(reduceTask.getTaskId() +
|
|
@@ -801,7 +826,8 @@ class ReduceTaskRunner extends TaskRunner implements MRConstants {
|
|
|
inMemFileSys.getUri());
|
|
|
}
|
|
|
} catch (Throwable t) {
|
|
|
- LOG.warn("Merge of the inmemory files threw an exception: " +
|
|
|
+ LOG.warn(reduceTask.getTaskId() +
|
|
|
+ " Intermediate Merge of the inmemory files threw an exception: " +
|
|
|
StringUtils.stringifyException(t));
|
|
|
ReduceTaskRunner.this.mergeThrowable = t;
|
|
|
}
|