|
@@ -163,6 +163,7 @@ class MapTask extends Task {
|
|
|
throws IOException {
|
|
|
|
|
|
setProgress(getProgress());
|
|
|
+ reportProgress(umbilical);
|
|
|
long beforePos = getPos();
|
|
|
boolean ret = rawIn.next(key, value);
|
|
|
if (ret) {
|
|
@@ -178,22 +179,16 @@ class MapTask extends Task {
|
|
|
}
|
|
|
};
|
|
|
|
|
|
- Thread sortProgress = createProgressThread(umbilical);
|
|
|
MapRunnable runner =
|
|
|
(MapRunnable)ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
|
|
|
|
|
|
try {
|
|
|
- sortProgress.start();
|
|
|
runner.run(in, collector, reporter);
|
|
|
collector.flush();
|
|
|
} finally {
|
|
|
//close
|
|
|
in.close(); // close input
|
|
|
collector.close();
|
|
|
- sortProgress.interrupt();
|
|
|
- try {
|
|
|
- sortProgress.join();
|
|
|
- } catch (InterruptedException ie){ }
|
|
|
}
|
|
|
done(umbilical);
|
|
|
}
|
|
@@ -220,6 +215,7 @@ class MapTask extends Task {
|
|
|
};
|
|
|
sortProgress.setName("Sort progress reporter for task "+getTaskId());
|
|
|
sortProgress.setDaemon(true);
|
|
|
+ sortProgress.start();
|
|
|
return sortProgress;
|
|
|
}
|
|
|
|
|
@@ -273,6 +269,7 @@ class MapTask extends Task {
|
|
|
private Partitioner partitioner;
|
|
|
private JobConf job;
|
|
|
private Reporter reporter;
|
|
|
+ final private TaskUmbilicalProtocol umbilical;
|
|
|
|
|
|
private DataOutputBuffer keyValBuffer; //the buffer where key/val will
|
|
|
//be stored before they are
|
|
@@ -302,6 +299,7 @@ class MapTask extends Task {
|
|
|
|
|
|
this.job = job;
|
|
|
this.reporter = reporter;
|
|
|
+ this.umbilical = umbilical;
|
|
|
this.comparator = job.getOutputKeyComparator();
|
|
|
this.keyClass = job.getMapOutputKeyClass();
|
|
|
this.valClass = job.getMapOutputValueClass();
|
|
@@ -324,6 +322,7 @@ class MapTask extends Task {
|
|
|
job.getClass("map.sort.class", MergeSorter.class,
|
|
|
BufferSorter.class), job);
|
|
|
}
|
|
|
+
|
|
|
private void startPartition(int partNumber) throws IOException {
|
|
|
//We create the sort output as multiple sequence files within a spilled
|
|
|
//file. So we create a writer for each partition.
|
|
@@ -376,10 +375,20 @@ class MapTask extends Task {
|
|
|
for (int i = 0; i < partitions; i++)
|
|
|
totalMem += sortImpl[i].getMemoryUtilized();
|
|
|
if ((keyValBuffer.getLength() + totalMem) >= maxBufferSize) {
|
|
|
- sortAndSpillToDisk();
|
|
|
- keyValBuffer.reset();
|
|
|
- for (int i = 0; i < partitions; i++)
|
|
|
- sortImpl[i].close();
|
|
|
+
|
|
|
+ // Start the progress thread
|
|
|
+ Thread progress = createProgressThread(umbilical);
|
|
|
+
|
|
|
+ try {
|
|
|
+ sortAndSpillToDisk();
|
|
|
+ keyValBuffer.reset();
|
|
|
+ for (int i = 0; i < partitions; i++) {
|
|
|
+ sortImpl[i].close();
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ // Stop the progress thread
|
|
|
+ progress.interrupt();
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -597,13 +606,22 @@ class MapTask extends Task {
|
|
|
}
|
|
|
|
|
|
public void flush() throws IOException {
|
|
|
- //check whether the length of the key/value buffer is 0. If not, then
|
|
|
- //we need to spill that to disk. Note that we reset the key/val buffer
|
|
|
- //upon each spill (so a length > 0 means that we have not spilled yet)
|
|
|
- if (keyValBuffer.getLength() > 0) {
|
|
|
- sortAndSpillToDisk();
|
|
|
+
|
|
|
+ // Start the progress thread
|
|
|
+ Thread progress = createProgressThread(umbilical);
|
|
|
+
|
|
|
+ try {
|
|
|
+ //check whether the length of the key/value buffer is 0. If not, then
|
|
|
+ //we need to spill that to disk. Note that we reset the key/val buffer
|
|
|
+ //upon each spill (so a length > 0 means that we have not spilled yet)
|
|
|
+ if (keyValBuffer.getLength() > 0) {
|
|
|
+ sortAndSpillToDisk();
|
|
|
+ }
|
|
|
+ mergeParts();
|
|
|
+ } finally {
|
|
|
+ // Stop the progress thread
|
|
|
+ progress.interrupt();
|
|
|
}
|
|
|
- mergeParts();
|
|
|
}
|
|
|
}
|
|
|
}
|