|
@@ -339,6 +339,7 @@ class ReduceTask extends Task {
|
|
|
@SuppressWarnings("unchecked")
|
|
|
public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
|
|
|
throws IOException, InterruptedException, ClassNotFoundException {
|
|
|
+ this.umbilical = umbilical;
|
|
|
job.setBoolean("mapred.skip.on", isSkipping());
|
|
|
|
|
|
if (isMapOrReduce()) {
|
|
@@ -374,10 +375,7 @@ class ReduceTask extends Task {
|
|
|
reduceCopier = new ReduceCopier(umbilical, job, reporter);
|
|
|
if (!reduceCopier.fetchOutputs()) {
|
|
|
if(reduceCopier.mergeThrowable instanceof FSError) {
|
|
|
- LOG.error("Task: " + getTaskID() + " - FSError: " +
|
|
|
- StringUtils.stringifyException(reduceCopier.mergeThrowable));
|
|
|
- umbilical.fsError(getTaskID(),
|
|
|
- reduceCopier.mergeThrowable.getMessage());
|
|
|
+ throw (FSError)reduceCopier.mergeThrowable;
|
|
|
}
|
|
|
throw new IOException("Task: " + getTaskID() +
|
|
|
" - The reduce copier failed", reduceCopier.mergeThrowable);
|
|
@@ -1220,8 +1218,9 @@ class ReduceTask extends Task {
|
|
|
StringUtils.stringifyException(io));
|
|
|
}
|
|
|
} catch (Throwable th) {
|
|
|
- LOG.error("Map output copy failure: " +
|
|
|
- StringUtils.stringifyException(th));
|
|
|
+ String msg = getTaskID() + " : Map output copy failure : "
|
|
|
+ + StringUtils.stringifyException(th);
|
|
|
+ reportFatalError(getTaskID(), th, msg);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -1654,10 +1653,14 @@ class ReduceTask extends Task {
|
|
|
if (bytesRead != mapOutputLength) {
|
|
|
try {
|
|
|
mapOutput.discard();
|
|
|
- } catch (Throwable th) {
|
|
|
+ } catch (Exception ioe) {
|
|
|
// IGNORED because we are cleaning up
|
|
|
LOG.info("Failed to discard map-output from " +
|
|
|
- mapOutputLoc.getTaskAttemptId(), th);
|
|
|
+ mapOutputLoc.getTaskAttemptId(), ioe);
|
|
|
+ } catch (Throwable t) {
|
|
|
+ String msg = getTaskID() + " : Failed in shuffle to disk :"
|
|
|
+ + StringUtils.stringifyException(t);
|
|
|
+ reportFatalError(getTaskID(), t, msg);
|
|
|
}
|
|
|
mapOutput = null;
|
|
|
|
|
@@ -2119,9 +2122,9 @@ class ReduceTask extends Task {
|
|
|
try {
|
|
|
getMapEventsThread.join();
|
|
|
LOG.info("getMapsEventsThread joined.");
|
|
|
- } catch (Throwable t) {
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
LOG.info("getMapsEventsThread threw an exception: " +
|
|
|
- StringUtils.stringifyException(t));
|
|
|
+ StringUtils.stringifyException(ie));
|
|
|
}
|
|
|
|
|
|
synchronized (copiers) {
|
|
@@ -2153,13 +2156,13 @@ class ReduceTask extends Task {
|
|
|
inMemFSMergeThread.join();
|
|
|
LOG.info("In-memory merge complete: " +
|
|
|
mapOutputsFilesInMemory.size() + " files left.");
|
|
|
- } catch (Throwable t) {
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
LOG.warn(reduceTask.getTaskID() +
|
|
|
" Final merge of the inmemory files threw an exception: " +
|
|
|
- StringUtils.stringifyException(t));
|
|
|
+ StringUtils.stringifyException(ie));
|
|
|
// check if the last merge generated an error
|
|
|
if (mergeThrowable != null) {
|
|
|
- mergeThrowable = t;
|
|
|
+ mergeThrowable = ie;
|
|
|
}
|
|
|
return false;
|
|
|
}
|
|
@@ -2466,14 +2469,18 @@ class ReduceTask extends Task {
|
|
|
" Local output file is " + outputPath + " of size " +
|
|
|
localFileSys.getFileStatus(outputPath).getLen());
|
|
|
}
|
|
|
- } catch (Throwable t) {
|
|
|
+ } catch (Exception e) {
|
|
|
LOG.warn(reduceTask.getTaskID()
|
|
|
+ " Merging of the local FS files threw an exception: "
|
|
|
- + StringUtils.stringifyException(t));
|
|
|
+ + StringUtils.stringifyException(e));
|
|
|
if (mergeThrowable == null) {
|
|
|
- mergeThrowable = t;
|
|
|
+ mergeThrowable = e;
|
|
|
}
|
|
|
- }
|
|
|
+ } catch (Throwable t) {
|
|
|
+ String msg = getTaskID() + " : Failed to merge on the local FS"
|
|
|
+ + StringUtils.stringifyException(t);
|
|
|
+ reportFatalError(getTaskID(), t, msg);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -2494,11 +2501,15 @@ class ReduceTask extends Task {
|
|
|
doInMemMerge();
|
|
|
}
|
|
|
} while (!exit);
|
|
|
- } catch (Throwable t) {
|
|
|
+ } catch (Exception e) {
|
|
|
LOG.warn(reduceTask.getTaskID() +
|
|
|
" Merge of the inmemory files threw an exception: "
|
|
|
- + StringUtils.stringifyException(t));
|
|
|
- ReduceCopier.this.mergeThrowable = t;
|
|
|
+ + StringUtils.stringifyException(e));
|
|
|
+ ReduceCopier.this.mergeThrowable = e;
|
|
|
+ } catch (Throwable t) {
|
|
|
+ String msg = getTaskID() + " : Failed to merge in memory"
|
|
|
+ + StringUtils.stringifyException(t);
|
|
|
+ reportFatalError(getTaskID(), t, msg);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -2605,9 +2616,10 @@ class ReduceTask extends Task {
|
|
|
return;
|
|
|
}
|
|
|
catch (Throwable t) {
|
|
|
- LOG.warn(reduceTask.getTaskID() +
|
|
|
- " GetMapEventsThread Ignoring exception : " +
|
|
|
- StringUtils.stringifyException(t));
|
|
|
+ String msg = reduceTask.getTaskID()
|
|
|
+ + " GetMapEventsThread Ignoring exception : "
|
|
|
+ + StringUtils.stringifyException(t);
|
|
|
+ reportFatalError(getTaskID(), t, msg);
|
|
|
}
|
|
|
} while (!exitGetMapEvents);
|
|
|
|