|
@@ -163,6 +163,7 @@ class MapTask extends Task {
|
|
|
throws IOException {
|
|
|
|
|
|
setProgress(getProgress());
|
|
|
+ reportProgress(umbilical);
|
|
|
long beforePos = getPos();
|
|
|
boolean ret = rawIn.next(key, value);
|
|
|
if (ret) {
|
|
@@ -178,22 +179,16 @@ class MapTask extends Task {
|
|
|
}
|
|
|
};
|
|
|
|
|
|
- Thread sortProgress = createProgressThread(umbilical);
|
|
|
MapRunnable runner =
|
|
|
(MapRunnable)ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
|
|
|
|
|
|
try {
|
|
|
- sortProgress.start();
|
|
|
runner.run(in, collector, reporter);
|
|
|
collector.flush();
|
|
|
} finally {
|
|
|
//close
|
|
|
in.close(); // close input
|
|
|
collector.close();
|
|
|
- sortProgress.interrupt();
|
|
|
- try {
|
|
|
- sortProgress.join();
|
|
|
- } catch (InterruptedException ie){ }
|
|
|
}
|
|
|
done(umbilical);
|
|
|
}
|
|
@@ -220,6 +215,7 @@ class MapTask extends Task {
|
|
|
};
|
|
|
sortProgress.setName("Sort progress reporter for task "+getTaskId());
|
|
|
sortProgress.setDaemon(true);
|
|
|
+ sortProgress.start();
|
|
|
return sortProgress;
|
|
|
}
|
|
|
|
|
@@ -381,10 +377,20 @@ class MapTask extends Task {
|
|
|
for (int i = 0; i < partitions; i++)
|
|
|
totalMem += sortImpl[i].getMemoryUtilized();
|
|
|
if ((keyValBuffer.getLength() + totalMem) >= maxBufferSize) {
|
|
|
- sortAndSpillToDisk();
|
|
|
- keyValBuffer.reset();
|
|
|
- for (int i = 0; i < partitions; i++)
|
|
|
- sortImpl[i].close();
|
|
|
+
|
|
|
+ // Start the progress thread
|
|
|
+ Thread progress = createProgressThread(umbilical);
|
|
|
+
|
|
|
+ try {
|
|
|
+ sortAndSpillToDisk();
|
|
|
+ keyValBuffer.reset();
|
|
|
+ for (int i = 0; i < partitions; i++) {
|
|
|
+ sortImpl[i].close();
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ // Stop the progress thread
|
|
|
+ progress.interrupt();
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -602,13 +608,22 @@ class MapTask extends Task {
|
|
|
}
|
|
|
|
|
|
public void flush() throws IOException {
|
|
|
- //check whether the length of the key/value buffer is 0. If not, then
|
|
|
- //we need to spill that to disk. Note that we reset the key/val buffer
|
|
|
- //upon each spill (so a length > 0 means that we have not spilled yet)
|
|
|
- if (keyValBuffer.getLength() > 0) {
|
|
|
- sortAndSpillToDisk();
|
|
|
+
|
|
|
+ // Start the progress thread
|
|
|
+ Thread progress = createProgressThread(umbilical);
|
|
|
+
|
|
|
+ try {
|
|
|
+ //check whether the length of the key/value buffer is 0. If not, then
|
|
|
+ //we need to spill that to disk. Note that we reset the key/val buffer
|
|
|
+ //upon each spill (so a length > 0 means that we have not spilled yet)
|
|
|
+ if (keyValBuffer.getLength() > 0) {
|
|
|
+ sortAndSpillToDisk();
|
|
|
+ }
|
|
|
+ mergeParts();
|
|
|
+ } finally {
|
|
|
+ // Stop the progress thread
|
|
|
+ progress.interrupt();
|
|
|
}
|
|
|
- mergeParts();
|
|
|
}
|
|
|
}
|
|
|
}
|