|
@@ -895,6 +895,7 @@ class ReduceTask extends Task {
|
|
|
}
|
|
|
|
|
|
private int nextMapOutputCopierId = 0;
|
|
|
+ private boolean reportReadErrorImmediately;
|
|
|
|
|
|
/**
|
|
|
* Abstraction to track a map-output.
|
|
@@ -1836,6 +1837,8 @@ class ReduceTask extends Task {
|
|
|
);
|
|
|
this.random = new Random(randomSeed);
|
|
|
this.maxMapRuntime = 0;
|
|
|
+ this.reportReadErrorImmediately =
|
|
|
+ conf.getBoolean("mapreduce.reduce.shuffle.notify.readerror", true);
|
|
|
}
|
|
|
|
|
|
private boolean busyEnough(int numInFlight) {
|
|
@@ -2109,8 +2112,10 @@ class ReduceTask extends Task {
|
|
|
// using a hybrid technique for notifying the jobtracker.
|
|
|
// a. the first notification is sent after max-retries
|
|
|
// b. subsequent notifications are sent after 2 retries.
|
|
|
- // c. send notification immediately if it is a read error.
|
|
|
- if (cr.getError().equals(CopyOutputErrorType.READ_ERROR) ||
|
|
|
+ // c. send notification immediately if it is a read error and
|
|
|
+ // "mapreduce.reduce.shuffle.notify.readerror" set true.
|
|
|
+ if ((reportReadErrorImmediately && cr.getError().equals(
|
|
|
+ CopyOutputErrorType.READ_ERROR)) ||
|
|
|
((noFailedFetches >= fetchRetriesPerMap)
|
|
|
&& ((noFailedFetches - fetchRetriesPerMap) % 2) == 0)) {
|
|
|
synchronized (ReduceTask.this) {
|