@@ -422,11 +422,6 @@ class ReduceTask extends Task {
      */
     private volatile boolean exitLocalFSMerge = false;
 
-    /**
-     * A flag to indicate when to exit InMemMerge
-     */
-    private volatile boolean exitInMemMerge = false;
-
     /**
      * When we accumulate mergeThreshold number of files in ram, we merge/spill
      */
@@ -712,42 +707,60 @@ class ReduceTask extends Task {
     class ShuffleRamManager implements RamManager {
       /* Maximum percentage of the in-memory limit that a single shuffle can
        * consume*/
-      private static final float MAX_SINGLE_SHUFFLE_SEGMENT_PERCENT = 0.25f;
+      private static final float MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION = 0.25f;
 
-      private boolean closed = false;
+      /* Maximum percentage of shuffle-threads which can be stalled
+       * simultaneously after which a merge is triggered. */
+      private static final float MAX_STALLED_SHUFFLE_THREADS_FRACTION = 0.75f;
 
-      volatile private int numClosed = 0;
-      volatile private int size = 0;
       private final int maxSize;
       private final int maxSingleShuffleLimit;
 
+      private int size = 0;
+
       private Object dataAvailable = new Object();
-      private volatile int fullSize = 0;
+      private int fullSize = 0;
+      private int numPendingRequests = 0;
+      private int numRequiredMapOutputs = 0;
+      private int numClosed = 0;
+      private boolean closed = false;
 
       public ShuffleRamManager(Configuration conf) {
         maxSize = conf.getInt("fs.inmemory.size.mb", 100) * 1024 * 1024;
-        maxSingleShuffleLimit = (int)(maxSize * MAX_SINGLE_SHUFFLE_SEGMENT_PERCENT);
+        maxSingleShuffleLimit = (int)(maxSize * MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION);
         LOG.info("ShuffleRamManager: MemoryLimit=" + maxSize +
                  ", MaxSingleShuffleLimit=" + maxSingleShuffleLimit);
       }
 
-      public synchronized boolean reserve(int requestedSize, InputStream in) {
+      public synchronized boolean reserve(int requestedSize, InputStream in)
+      throws InterruptedException {
+        // Wait till the request can be fulfilled...
         while ((size + requestedSize) > maxSize) {
-          try {
-            // Close the connection
-            if (in != null) {
-              try {
-                in.close();
-              } catch (IOException ie) {
-                LOG.info("Failed to close connection with: " + ie);
-              } finally {
-                in = null;
-              }
+
+          // Close the input...
+          if (in != null) {
+            try {
+              in.close();
+            } catch (IOException ie) {
+              LOG.info("Failed to close connection with: " + ie);
+            } finally {
+              in = null;
             }
-
-            // Wait for memory to free up
-            wait();
-          } catch (InterruptedException ie) {}
+          }
+
+          // Track pending requests
+          synchronized (dataAvailable) {
+            ++numPendingRequests;
+            dataAvailable.notify();
+          }
+
+          // Wait for memory to free up
+          wait();
+
+          // Track pending requests
+          synchronized (dataAvailable) {
+            --numPendingRequests;
+          }
         }
 
         size += requestedSize;
@@ -767,20 +780,25 @@ class ReduceTask extends Task {
         notifyAll();
       }
 
-      public void waitForDataToMerge() {
+      public boolean waitForDataToMerge() throws InterruptedException {
+        boolean done = false;
         synchronized (dataAvailable) {
           while (!closed &&
                  (getPercentUsed() < MAX_INMEM_FILESYS_USE ||
-                  getReservedFiles() <
+                  numClosed <
                   (int)(MAX_INMEM_FILESYS_USE/MAX_INMEM_FILESIZE_FRACTION)
                  )
                  &&
-                 (mergeThreshold <= 0 || getReservedFiles() < mergeThreshold)) {
-            try {
-              dataAvailable.wait();
-            } catch (InterruptedException ie) {}
+                 (mergeThreshold <= 0 || numClosed < mergeThreshold)
+                 &&
+                 (numPendingRequests <
+                  numCopiers*MAX_STALLED_SHUFFLE_THREADS_FRACTION &&
+                  numPendingRequests < numRequiredMapOutputs)) {
+            dataAvailable.wait();
           }
+          done = closed;
         }
+        return done;
       }
 
       public void closeInMemoryFile(int requestedSize) {
@@ -791,6 +809,13 @@ class ReduceTask extends Task {
         }
       }
 
+      public void setNumCopiedMapOutputs(int numRequiredMapOutputs) {
+        synchronized (dataAvailable) {
+          this.numRequiredMapOutputs = numRequiredMapOutputs;
+          dataAvailable.notify();
+        }
+      }
+
       public void close() {
         synchronized (dataAvailable) {
           closed = true;
@@ -799,14 +824,10 @@ class ReduceTask extends Task {
         }
       }
 
-      float getPercentUsed() {
+      private float getPercentUsed() {
         return (float)fullSize/maxSize;
       }
-
-      int getReservedFiles() {
-        return numClosed;
-      }
-
+
       int getMemoryLimit() {
         return maxSize;
       }
@@ -978,7 +999,8 @@ class ReduceTask extends Task {
             }
 
             // Note that we successfully copied the map-output
-            copiedMapOutputs.add(loc.getTaskId());
+            noteCopiedMapOutput(loc.getTaskId());
+
             return bytes;
           }
 
@@ -1004,12 +1026,22 @@ class ReduceTask extends Task {
           }
 
           // Note that we successfully copied the map-output
-          copiedMapOutputs.add(loc.getTaskId());
+          noteCopiedMapOutput(loc.getTaskId());
         }
 
         return bytes;
       }
 
+      /**
+       * Save the map taskid whose output we just copied.
+       * This function assumes that it has been synchronized on ReduceTask.this.
+       *
+       * @param taskId map taskid
+       */
+      private void noteCopiedMapOutput(TaskID taskId) {
+        copiedMapOutputs.add(taskId);
+        ramManager.setNumCopiedMapOutputs(numMaps - copiedMapOutputs.size());
+      }
 
       /**
        * Get the map output into a local file (either in the inmemory fs or on the
@@ -1248,6 +1280,7 @@ class ReduceTask extends Task {
       this.shuffleClientMetrics = new ShuffleClientMetrics(conf);
       this.umbilical = umbilical;
       this.reduceTask = ReduceTask.this;
+
       this.scheduledCopies = new ArrayList<MapOutputLocation>(100);
       this.copyResults = new ArrayList<CopyResult>(100);
       this.numCopiers = conf.getInt("mapred.reduce.parallel.copies", 5);
@@ -1304,7 +1337,6 @@ class ReduceTask extends Task {
 
     @SuppressWarnings("unchecked")
     public boolean fetchOutputs() throws IOException {
-      final int numOutputs = reduceTask.getNumMaps();
       List<MapOutputLocation> knownOutputs =
         new ArrayList<MapOutputLocation>(numCopiers);
       int totalFailures = 0;
@@ -1316,7 +1348,7 @@ class ReduceTask extends Task {
       LocalFSMerger localFSMergerThread = null;
       InMemFSMergeThread inMemFSMergeThread = null;
 
-      for (int i = 0; i < numOutputs; i++) {
+      for (int i = 0; i < numMaps; i++) {
         copyPhase.addPhase(); // add sub-phase per file
       }
 
@@ -1346,7 +1378,7 @@ class ReduceTask extends Task {
       IntWritable fromEventId = new IntWritable(0);
 
       // loop until we get all required outputs
-      while (copiedMapOutputs.size() < numOutputs && mergeThrowable == null) {
+      while (copiedMapOutputs.size() < numMaps && mergeThrowable == null) {
 
         currentTime = System.currentTimeMillis();
         boolean logNow = false;
@@ -1356,7 +1388,7 @@ class ReduceTask extends Task {
         }
         if (logNow) {
           LOG.info(reduceTask.getTaskID() + " Need another "
-                   + (numOutputs - copiedMapOutputs.size()) + " map output(s) "
+                   + (numMaps - copiedMapOutputs.size()) + " map output(s) "
                    + "where " + numInFlight + " is already in progress");
         }
 
@@ -1503,7 +1535,7 @@ class ReduceTask extends Task {
           float transferRate = mbs/secsSinceStart;
 
           copyPhase.startNextPhase();
-          copyPhase.setStatus("copy (" + numCopied + " of " + numOutputs
+          copyPhase.setStatus("copy (" + numCopied + " of " + numMaps
                               + " at " +
                               mbpsFormat.format(transferRate) + " MB/s)");
 
@@ -1640,7 +1672,6 @@ class ReduceTask extends Task {
         mapOutputFilesOnDisk.notify();
       }
 
-      exitInMemMerge = true;
       ramManager.close();
 
       //Do a merge of in-memory files (if there are any)
@@ -1648,9 +1679,13 @@ class ReduceTask extends Task {
         try {
           // Wait for the on-disk merge to complete
           localFSMergerThread.join();
+          LOG.info("Interleaved on-disk merge complete: " +
+                   mapOutputFilesOnDisk.size() + " files left.");
 
           //wait for an ongoing merge (if it is in flight) to complete
           inMemFSMergeThread.join();
+          LOG.info("In-memory merge complete: " +
+                   mapOutputsFilesInMemory.size() + " files left.");
         } catch (Throwable t) {
           LOG.warn(reduceTask.getTaskID() +
                    " Final merge of the inmemory files threw an exception: " +
@@ -1662,7 +1697,7 @@ class ReduceTask extends Task {
           return false;
         }
       }
-      return mergeThrowable == null && copiedMapOutputs.size() == numOutputs;
+      return mergeThrowable == null && copiedMapOutputs.size() == numMaps;
     }
 
     private List<Segment<K, V>> createInMemorySegments() {
@@ -1908,10 +1943,11 @@ class ReduceTask extends Task {
       public void run() {
         LOG.info(reduceTask.getTaskID() + " Thread started: " + getName());
         try {
-          while (!exitInMemMerge) {
-            ramManager.waitForDataToMerge();
+          boolean exit = false;
+          do {
+            exit = ramManager.waitForDataToMerge();
             doInMemMerge();
-          }
+          } while (!exit);
         } catch (Throwable t) {
           LOG.warn(reduceTask.getTaskID() +
                    " Merge of the inmemory files threw an exception: "
@@ -1923,7 +1959,6 @@ class ReduceTask extends Task {
       @SuppressWarnings("unchecked")
       private void doInMemMerge() throws IOException{
         if (mapOutputsFilesInMemory.size() == 0) {
-          LOG.info("Noting to merge... ");
           return;
         }
 
@@ -1953,12 +1988,16 @@ class ReduceTask extends Task {
         RawKeyValueIterator rIter = null;
         final Reporter reporter = getReporter(umbilical);
         try {
+          LOG.info("Initiating in-memory merge with " + noInMemorySegments +
+                   " segments...");
+
           rIter = Merger.merge(conf, localFileSys,
                                (Class<K>)conf.getMapOutputKeyClass(),
                                (Class<V>)conf.getMapOutputValueClass(),
                                inMemorySegments, inMemorySegments.size(),
                                new Path(reduceTask.getTaskID().toString()),
                                conf.getOutputKeyComparator(), reporter);
+
           if (null == combinerClass) {
             Merger.writeFile(rIter, writer, reporter);
           } else {
@@ -1966,6 +2005,12 @@ class ReduceTask extends Task {
             combineAndSpill(rIter, reduceCombineInputCounter);
           }
           writer.close();
+
+          LOG.info(reduceTask.getTaskID() +
+              " Merge of the " + noInMemorySegments +
+              " files in-memory complete." +
+              " Local file is " + outputPath + " of size " +
+              localFileSys.getFileStatus(outputPath).getLen());
         } catch (Exception e) {
           //make sure that we delete the ondisk file that we created
           //earlier when we invoked cloneFileAttributes
@@ -1973,12 +2018,8 @@ class ReduceTask extends Task {
           throw (IOException)new IOException
                   ("Intermedate merge failed").initCause(e);
         }
-        LOG.info(reduceTask.getTaskID() +
-                 " Merge of the " + noInMemorySegments +
-                 " files in-memory complete." +
-                 " Local file is " + outputPath + " of size " +
-                 localFileSys.getFileStatus(outputPath).getLen());
-
+
+        // Note the output of the merge
         FileStatus status = localFileSys.getFileStatus(outputPath);
         synchronized (mapOutputFilesOnDisk) {
           addToMapOutputFilesOnDisk(status);