@@ -564,6 +564,12 @@ class ReduceTask extends Task {
       output.close(reducerContext);
   }
 
+  private static enum CopyOutputErrorType {
+    NO_ERROR,
+    READ_ERROR,
+    OTHER_ERROR
+  };
+
   class ReduceCopier<K, V> implements MRConstants {
 
     /** Reference to the umbilical object */
@@ -744,6 +750,11 @@ class ReduceTask extends Task {
      */
     private static final int MIN_FETCH_RETRIES_PER_MAP = 2;
 
+    /**
+     * The minimum percentage of maps yet to be copied,
+     * which indicates end of shuffle
+     */
+    private static final float MIN_PENDING_MAPS_PERCENT = 0.25f;
     /**
      * Maximum no. of unique maps from which we failed to fetch map-outputs
      * even after {@link #maxFetchRetriesPerMap} retries; after this the
@@ -857,11 +868,18 @@ class ReduceTask extends Task {
       //a flag signifying whether a copy result is obsolete
       private static final int OBSOLETE = -2;
 
+      private CopyOutputErrorType error = CopyOutputErrorType.NO_ERROR;
       CopyResult(MapOutputLocation loc, long size) {
         this.loc = loc;
         this.size = size;
       }
-
+
+      CopyResult(MapOutputLocation loc, long size, CopyOutputErrorType error) {
+        this.loc = loc;
+        this.size = size;
+        this.error = error;
+      }
+
       public boolean getSuccess() { return size >= 0; }
       public boolean isObsolete() {
         return size == OBSOLETE;
@@ -869,6 +887,7 @@ class ReduceTask extends Task {
       public long getSize() { return size; }
       public String getHost() { return loc.getHost(); }
       public MapOutputLocation getLocation() { return loc; }
+      public CopyOutputErrorType getError() { return error; }
     }
 
     private int nextMapOutputCopierId = 0;
@@ -1119,6 +1138,7 @@ class ReduceTask extends Task {
       private MapOutputLocation currentLocation = null;
       private int id = nextMapOutputCopierId++;
       private Reporter reporter;
+      private boolean readError = false;
 
       // Decompression of map-outputs
       private CompressionCodec codec = null;
@@ -1143,7 +1163,7 @@ class ReduceTask extends Task {
        */
       public synchronized boolean fail() {
         if (currentLocation != null) {
-          finish(-1);
+          finish(-1, CopyOutputErrorType.OTHER_ERROR);
           return true;
         } else {
           return false;
@@ -1161,11 +1181,11 @@ class ReduceTask extends Task {
         currentLocation = loc;
       }
 
-      private synchronized void finish(long size) {
+      private synchronized void finish(long size, CopyOutputErrorType error) {
         if (currentLocation != null) {
           LOG.debug(getName() + " finishing " + currentLocation + " =" + size);
           synchronized (copyResults) {
-            copyResults.add(new CopyResult(currentLocation, size));
+            copyResults.add(new CopyResult(currentLocation, size, error));
             copyResults.notify();
           }
           currentLocation = null;
@@ -1188,23 +1208,27 @@ class ReduceTask extends Task {
             }
             loc = scheduledCopies.remove(0);
           }
-
+          CopyOutputErrorType error = CopyOutputErrorType.OTHER_ERROR;
+          readError = false;
           try {
             shuffleClientMetrics.threadBusy();
             start(loc);
             size = copyOutput(loc);
             shuffleClientMetrics.successFetch();
+            error = CopyOutputErrorType.NO_ERROR;
           } catch (IOException e) {
             LOG.warn(reduceTask.getTaskID() + " copy failed: " +
                      loc.getTaskAttemptId() + " from " + loc.getHost());
             LOG.warn(StringUtils.stringifyException(e));
             shuffleClientMetrics.failedFetch();
-
+            if (readError) {
+              error = CopyOutputErrorType.READ_ERROR;
+            }
             // Reset
             size = -1;
           } finally {
             shuffleClientMetrics.threadFree();
-            finish(size);
+            finish(size, error);
           }
         } catch (InterruptedException e) {
           break; // ALL DONE
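
Note: the classification in the hunk above is compact but easy to misread: error starts out pessimistically as OTHER_ERROR, is downgraded to NO_ERROR only once copyOutput(loc) returns, and is upgraded to READ_ERROR when the copier's readError flag was set somewhere inside the fetch. A standalone sketch of that flow (CopyAttemptSketch and its copyOutput() stand-in are illustrative, not part of the patch):

    import java.io.IOException;

    // Sketch of how the copier loop above classifies one copy attempt.
    class CopyAttemptSketch {
      enum CopyOutputErrorType { NO_ERROR, READ_ERROR, OTHER_ERROR }

      private boolean readError = false;

      long copyOutput() throws IOException {
        // stand-in: the real fetch sets readError before throwing when
        // the failure happened while reading the HTTP response stream
        return 0L;
      }

      CopyOutputErrorType attempt() {
        // pessimistic default: any failure not flagged as a read error
        // (e.g. a connect timeout) counts as OTHER_ERROR
        CopyOutputErrorType error = CopyOutputErrorType.OTHER_ERROR;
        readError = false;
        try {
          copyOutput();
          error = CopyOutputErrorType.NO_ERROR;
        } catch (IOException e) {
          if (readError) {
            error = CopyOutputErrorType.READ_ERROR;
          }
        }
        return error;
      }
    }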
@@ -1444,7 +1468,8 @@ class ReduceTask extends Task {
       connection.setConnectTimeout(unit);
       while (true) {
         try {
-          return connection.getInputStream();
+          connection.connect();
+          break;
         } catch (IOException ioe) {
           // update the total remaining connect-timeout
           connectionTimeout -= unit;
@@ -1463,6 +1488,12 @@ class ReduceTask extends Task {
           }
         }
       }
+      try {
+        return connection.getInputStream();
+      } catch (IOException ioe) {
+        readError = true;
+        throw ioe;
+      }
     }
 
     private MapOutput shuffleInMemory(MapOutputLocation mapOutputLoc,
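
Note: the two hunks above split what used to be a single getInputStream() call into two phases with different failure semantics. connection.connect() failures stay inside the retry loop and count against the remaining connect-timeout, while a failure in getInputStream(), where the server accepted the connection but the response could not be read, sets readError and propagates immediately. A minimal sketch of the same pattern, with the retry loop simplified to a fixed attempt count (the patch instead decrements the remaining connect-timeout):

    import java.io.IOException;
    import java.io.InputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;

    class ConnectSplitSketch {
      boolean readError = false;

      InputStream open(URL url, int attempts, int unitMillis) throws IOException {
        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
        connection.setConnectTimeout(unitMillis);
        for (int i = 1; ; i++) {
          try {
            connection.connect();   // connect failures are retried
            break;
          } catch (IOException ioe) {
            if (i >= attempts) {
              throw ioe;            // connect budget exhausted
            }
          }
        }
        try {
          return connection.getInputStream();
        } catch (IOException ioe) {
          readError = true;         // server reached, response unreadable
          throw ioe;
        }
      }
    }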
@@ -1548,6 +1579,7 @@ class ReduceTask extends Task {
           IOUtils.cleanup(LOG, input);
 
           // Re-throw
+          readError = true;
           throw ioe;
         }
 
@@ -1613,7 +1645,13 @@ class ReduceTask extends Task {
         output = rfs.create(localFilename);
 
         byte[] buf = new byte[64 * 1024];
-        int n = input.read(buf, 0, buf.length);
+        int n = -1;
+        try {
+          n = input.read(buf, 0, buf.length);
+        } catch (IOException ioe) {
+          readError = true;
+          throw ioe;
+        }
         while (n > 0) {
           bytesRead += n;
           shuffleClientMetrics.inputBytes(n);
@@ -1621,7 +1659,12 @@ class ReduceTask extends Task {
 
           // indicate we're making progress
           reporter.progress();
-          n = input.read(buf, 0, buf.length);
+          try {
+            n = input.read(buf, 0, buf.length);
+          } catch (IOException ioe) {
+            readError = true;
+            throw ioe;
+          }
         }
 
         LOG.info("Read " + bytesRead + " bytes from map-output for " +
@@ -2037,17 +2080,38 @@ class ReduceTask extends Task {
             mapTaskToFailedFetchesMap.put(mapTaskId, noFailedFetches);
             LOG.info("Task " + getTaskID() + ": Failed fetch #" +
                      noFailedFetches + " from " + mapTaskId);
+
+            // half the number of max fetch retries per map during
+            // the end of shuffle
+            int fetchRetriesPerMap = maxFetchRetriesPerMap;
+            int pendingCopies = numMaps - numCopied;
+
+            // The check noFailedFetches != maxFetchRetriesPerMap is
+            // required to make sure of the notification in case of a
+            // corner case :
+            // when noFailedFetches reached maxFetchRetriesPerMap and
+            // reducer reached the end of shuffle, then we may miss sending
+            // a notification if the difference between
+            // noFailedFetches and fetchRetriesPerMap is not divisible by 2
+            if (pendingCopies <= numMaps * MIN_PENDING_MAPS_PERCENT &&
+                noFailedFetches != maxFetchRetriesPerMap) {
+              fetchRetriesPerMap = fetchRetriesPerMap >> 1;
+            }
 
             // did the fetch fail too many times?
             // using a hybrid technique for notifying the jobtracker.
             //   a. the first notification is sent after max-retries
             //   b. subsequent notifications are sent after 2 retries.
-            if ((noFailedFetches >= maxFetchRetriesPerMap)
-                && ((noFailedFetches - maxFetchRetriesPerMap) % 2) == 0) {
+            //   c. send notification immediately if it is a read error.
+            if (cr.getError().equals(CopyOutputErrorType.READ_ERROR) ||
+               ((noFailedFetches >= fetchRetriesPerMap)
+                && ((noFailedFetches - fetchRetriesPerMap) % 2) == 0)) {
              synchronized (ReduceTask.this) {
                taskStatus.addFetchFailedMap(mapTaskId);
+                reporter.progress();
                LOG.info("Failed to fetch map-output from " + mapTaskId +
                         " even after MAX_FETCH_RETRIES_PER_MAP retries... "
+                         + " or it is a read error, "
                         + " reporting to the JobTracker");
              }
            }
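
Note: the notification policy above has three triggers: (a) a first notification after maxFetchRetriesPerMap failures, (b) one notification per two further failures, and (c) an immediate notification for read errors. Near the end of shuffle (at most MIN_PENDING_MAPS_PERCENT of maps still pending) the retry threshold is halved, except exactly at the old threshold, so the first notification is never skipped. A standalone sketch with a small worked example (shouldNotify and the harness are illustrative only):

    // Sketch of the hybrid fetch-failure notification policy above.
    public class NotifySketch {
      // Mirrors MIN_PENDING_MAPS_PERCENT in the patch.
      static final float MIN_PENDING_MAPS_PERCENT = 0.25f;

      static boolean shouldNotify(boolean readError, int noFailedFetches,
                                  int maxFetchRetriesPerMap, int numMaps,
                                  int numCopied) {
        // c. a read error is reported immediately
        if (readError) {
          return true;
        }
        // near the end of shuffle, halve the threshold, except exactly
        // at the original threshold (the divisible-by-2 corner case)
        int fetchRetriesPerMap = maxFetchRetriesPerMap;
        int pendingCopies = numMaps - numCopied;
        if (pendingCopies <= numMaps * MIN_PENDING_MAPS_PERCENT &&
            noFailedFetches != maxFetchRetriesPerMap) {
          fetchRetriesPerMap >>= 1;
        }
        // a. first notification after max-retries,
        // b. then one every 2 further failures
        return noFailedFetches >= fetchRetriesPerMap &&
               (noFailedFetches - fetchRetriesPerMap) % 2 == 0;
      }

      public static void main(String[] args) {
        // maxFetchRetriesPerMap = 4, 100 maps, 90 copied: the threshold
        // halves to 2, so failures #2, #4, #6 notify (and #4 also hits
        // the corner-case exception at the original threshold).
        for (int f = 1; f <= 6; f++) {
          System.out.println("failure #" + f + " -> " +
              shouldNotify(false, f, 4, 100, 90));
        }
      }
    }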
@@ -2103,10 +2167,22 @@ class ReduceTask extends Task {
             // back off exponentially until num_retries <= max_retries
             // back off by max_backoff/2 on subsequent failed attempts
             currentTime = System.currentTimeMillis();
-            int currentBackOff = noFailedFetches <= maxFetchRetriesPerMap
+            int currentBackOff = noFailedFetches <= fetchRetriesPerMap
                                  ? BACKOFF_INIT
                                    * (1 << (noFailedFetches - 1))
                                  : (this.maxBackoff * 1000 / 2);
+            // If it is read error,
+            //    back off for maxMapRuntime/2
+            // during end of shuffle,
+            //    backoff for min(maxMapRuntime/2, currentBackOff)
+            if (cr.getError().equals(CopyOutputErrorType.READ_ERROR)) {
+              int backOff = maxMapRuntime >> 1;
+              if (pendingCopies <= numMaps * MIN_PENDING_MAPS_PERCENT) {
+                backOff = Math.min(backOff, currentBackOff);
+              }
+              currentBackOff = backOff;
+            }
+
             penaltyBox.put(cr.getHost(), currentTime + currentBackOff);
             LOG.warn(reduceTask.getTaskID() + " adding host " +
                      cr.getHost() + " to penalty box, next contact in " +
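
Note: the backoff change above means a read error penalizes the host for half the longest observed map runtime (enough time for a re-executed map to produce fresh output), but once the shuffle is nearly done it falls back to the smaller of that and the ordinary exponential backoff. A sketch of the computation, assuming BACKOFF_INIT = 4000 ms (an assumption mirroring the class's constant; backOffMillis itself is illustrative):

    class BackoffSketch {
      static final int BACKOFF_INIT = 4000; // ms; assumed initial backoff

      static int backOffMillis(boolean readError, int noFailedFetches,
                               int fetchRetriesPerMap, int maxBackoffSecs,
                               int maxMapRuntimeMillis, boolean endOfShuffle) {
        // exponential until the retry threshold, then max_backoff/2
        int currentBackOff = noFailedFetches <= fetchRetriesPerMap
            ? BACKOFF_INIT * (1 << (noFailedFetches - 1))
            : maxBackoffSecs * 1000 / 2;
        if (readError) {
          // a read error idles the host for half the longest map runtime...
          int backOff = maxMapRuntimeMillis >> 1;
          if (endOfShuffle) {
            // ...unless the shuffle is nearly done and the normal
            // backoff is shorter
            backOff = Math.min(backOff, currentBackOff);
          }
          currentBackOff = backOff;
        }
        return currentBackOff;
      }

      public static void main(String[] args) {
        // 2nd failure, threshold 4, maxBackoff 60 s, longest map 10 min:
        System.out.println(backOffMillis(false, 2, 4, 60, 600_000, false)); // 8000
        System.out.println(backOffMillis(true,  2, 4, 60, 600_000, false)); // 300000
        System.out.println(backOffMillis(true,  2, 4, 60, 600_000, true));  // 8000
      }
    }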