|
@@ -288,6 +288,22 @@ public abstract class PipeMapRed {
|
|
|
|
|
|
void waitOutputThreads() {
|
|
|
try {
|
|
|
+ if (outThread_ == null) {
|
|
|
+ // This happens only when reducer has empty input(So reduce() is not
|
|
|
+ // called at all in this task). If reducer still generates output,
|
|
|
+ // which is very uncommon and we may not have to support this case.
|
|
|
+ // So we don't write this output to HDFS, but we consume/collect
|
|
|
+ // this output just to avoid reducer hanging forever.
|
|
|
+
|
|
|
+ OutputCollector collector = new OutputCollector() {
|
|
|
+ public void collect(Object key, Object value)
|
|
|
+ throws IOException {
|
|
|
+ //just consume it, no need to write the record anywhere
|
|
|
+ }
|
|
|
+ };
|
|
|
+ Reporter reporter = Reporter.NULL;//dummy reporter
|
|
|
+ startOutputThreads(collector, reporter);
|
|
|
+ }
|
|
|
int exitVal = sim.waitFor();
|
|
|
// how'd it go?
|
|
|
if (exitVal != 0) {
|
|
@@ -506,9 +522,11 @@ public abstract class PipeMapRed {
|
|
|
}
|
|
|
|
|
|
public void mapRedFinished() {
|
|
|
- logprintln("mapRedFinished");
|
|
|
try {
|
|
|
- if (!doPipe_) return;
|
|
|
+ if (!doPipe_) {
|
|
|
+ logprintln("mapRedFinished");
|
|
|
+ return;
|
|
|
+ }
|
|
|
try {
|
|
|
if (clientOut_ != null) {
|
|
|
clientOut_.flush();
|
|
@@ -518,6 +536,7 @@ public abstract class PipeMapRed {
|
|
|
}
|
|
|
waitOutputThreads();
|
|
|
if (sim != null) sim.destroy();
|
|
|
+ logprintln("mapRedFinished");
|
|
|
} catch (RuntimeException e) {
|
|
|
logStackTrace(e);
|
|
|
throw e;
|