|
@@ -49,6 +49,7 @@ import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.ChecksumFileSystem;
|
|
|
+import org.apache.hadoop.fs.FSError;
|
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.LocalFileSystem;
|
|
@@ -358,7 +359,14 @@ class ReduceTask extends Task {
|
|
|
if (!isLocal) {
|
|
|
reduceCopier = new ReduceCopier(umbilical, job);
|
|
|
if (!reduceCopier.fetchOutputs()) {
|
|
|
- throw new IOException(getTaskID() + "The reduce copier failed");
|
|
|
+ if(reduceCopier.mergeThrowable instanceof FSError) {
|
|
|
+ LOG.error("Task: " + getTaskID() + " - FSError: " +
|
|
|
+ StringUtils.stringifyException(reduceCopier.mergeThrowable));
|
|
|
+ umbilical.fsError(getTaskID(),
|
|
|
+ reduceCopier.mergeThrowable.getMessage());
|
|
|
+ }
|
|
|
+ throw new IOException("Task: " + getTaskID() +
|
|
|
+ " - The reduce copier failed", reduceCopier.mergeThrowable);
|
|
|
}
|
|
|
}
|
|
|
copyPhase.complete(); // copy is already complete
|
|
@@ -1079,6 +1087,15 @@ class ReduceTask extends Task {
|
|
|
}
|
|
|
} catch (InterruptedException e) {
|
|
|
return; // ALL DONE
|
|
|
+ } catch (FSError e) {
|
|
|
+ LOG.error("Task: " + reduceTask.getTaskID() + " - FSError: " +
|
|
|
+ StringUtils.stringifyException(e));
|
|
|
+ try {
|
|
|
+ umbilical.fsError(reduceTask.getTaskID(), e.getMessage());
|
|
|
+ } catch (IOException io) {
|
|
|
+ LOG.error("Could not notify TT of FSError: " +
|
|
|
+ StringUtils.stringifyException(io));
|
|
|
+ }
|
|
|
} catch (Throwable th) {
|
|
|
LOG.error("Map output copy failure: " +
|
|
|
StringUtils.stringifyException(th));
|