|
@@ -74,8 +74,6 @@ import org.apache.hadoop.metrics.MetricsContext;
|
|
|
import org.apache.hadoop.metrics.MetricsRecord;
|
|
|
import org.apache.hadoop.metrics.MetricsUtil;
|
|
|
import org.apache.hadoop.metrics.Updater;
|
|
|
-import org.apache.hadoop.net.ConnTimeoutException;
|
|
|
-import org.apache.hadoop.net.ReadTimeoutException;
|
|
|
import org.apache.hadoop.util.Progress;
|
|
|
import org.apache.hadoop.util.Progressable;
|
|
|
import org.apache.hadoop.util.ReflectionUtils;
|
|
@@ -96,12 +94,6 @@ class ReduceTask extends Task {
|
|
|
private int numMaps;
|
|
|
private ReduceCopier reduceCopier;
|
|
|
|
|
|
- private static enum CopyOutputErrorType {
|
|
|
- NO_ERROR,
|
|
|
- CONNECTION_ERROR,
|
|
|
- READ_ERROR
|
|
|
- }
|
|
|
-
|
|
|
private CompressionCodec codec;
|
|
|
|
|
|
|
|
@@ -110,7 +102,6 @@ class ReduceTask extends Task {
|
|
|
setPhase(TaskStatus.Phase.SHUFFLE); // phase to start with
|
|
|
}
|
|
|
|
|
|
-
|
|
|
private Progress copyPhase;
|
|
|
private Progress sortPhase;
|
|
|
private Progress reducePhase;
|
|
@@ -637,15 +628,9 @@ class ReduceTask extends Task {
|
|
|
Set<TaskID> fetchFailedMaps = new TreeSet<TaskID>();
|
|
|
|
|
|
/**
|
|
|
- * A map of taskId -> no. of failed fetches in connect
|
|
|
- */
|
|
|
- Map<TaskAttemptID, Integer> mapTaskToConnectFailedFetchesMap =
|
|
|
- new HashMap<TaskAttemptID, Integer>();
|
|
|
-
|
|
|
- /**
|
|
|
- * A map of taskId -> no. of failed fetches in read
|
|
|
+ * A map of taskId -> no. of failed fetches
|
|
|
*/
|
|
|
- Map<TaskAttemptID, Integer> mapTaskToReadFailedFetchesMap =
|
|
|
+ Map<TaskAttemptID, Integer> mapTaskToFailedFetchesMap =
|
|
|
new HashMap<TaskAttemptID, Integer>();
|
|
|
|
|
|
/**
|
|
@@ -665,7 +650,6 @@ class ReduceTask extends Task {
|
|
|
Collections.synchronizedList(new LinkedList<MapOutput>());
|
|
|
|
|
|
|
|
|
-
|
|
|
/**
|
|
|
* This class contains the methods that should be used for metrics-reporting
|
|
|
* the specific metrics for shuffle. This class actually reports the
|
|
@@ -728,8 +712,7 @@ class ReduceTask extends Task {
|
|
|
/** Represents the result of an attempt to copy a map output */
|
|
|
private class CopyResult {
|
|
|
|
|
|
-
|
|
|
- // the map output location against which a copy attempt was made
|
|
|
+ // the map output location against which a copy attempt was made
|
|
|
private final MapOutputLocation loc;
|
|
|
|
|
|
// the size of the file copied, -1 if the transfer failed
|
|
@@ -737,14 +720,10 @@ class ReduceTask extends Task {
|
|
|
|
|
|
//a flag signifying whether a copy result is obsolete
|
|
|
private static final int OBSOLETE = -2;
|
|
|
-
|
|
|
- CopyOutputErrorType errorType;
|
|
|
-
|
|
|
- CopyResult(MapOutputLocation loc, long size,
|
|
|
- CopyOutputErrorType errorType) {
|
|
|
+
|
|
|
+ CopyResult(MapOutputLocation loc, long size) {
|
|
|
this.loc = loc;
|
|
|
this.size = size;
|
|
|
- this.errorType = errorType;
|
|
|
}
|
|
|
|
|
|
public boolean getSuccess() { return size >= 0; }
|
|
@@ -753,9 +732,6 @@ class ReduceTask extends Task {
|
|
|
}
|
|
|
public long getSize() { return size; }
|
|
|
public String getHost() { return loc.getHost(); }
|
|
|
- public CopyOutputErrorType getErrorType() {
|
|
|
- return ((size < 0) ? errorType: CopyOutputErrorType.NO_ERROR);
|
|
|
- }
|
|
|
public MapOutputLocation getLocation() { return loc; }
|
|
|
}
|
|
|
|
|
@@ -1025,6 +1001,19 @@ class ReduceTask extends Task {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Fail the current file that we are fetching
|
|
|
+ * @return were we currently fetching?
|
|
|
+ */
|
|
|
+ public synchronized boolean fail() {
|
|
|
+ if (currentLocation != null) {
|
|
|
+ finish(-1);
|
|
|
+ return true;
|
|
|
+ } else {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Get the current map output location.
|
|
|
*/
|
|
@@ -1036,12 +1025,11 @@ class ReduceTask extends Task {
|
|
|
currentLocation = loc;
|
|
|
}
|
|
|
|
|
|
- private synchronized void finish(long size,
|
|
|
- CopyOutputErrorType errorType) {
|
|
|
+ private synchronized void finish(long size) {
|
|
|
if (currentLocation != null) {
|
|
|
LOG.debug(getName() + " finishing " + currentLocation + " =" + size);
|
|
|
synchronized (copyResults) {
|
|
|
- copyResults.add(new CopyResult(currentLocation, size, errorType));
|
|
|
+ copyResults.add(new CopyResult(currentLocation, size));
|
|
|
copyResults.notify();
|
|
|
}
|
|
|
currentLocation = null;
|
|
@@ -1057,7 +1045,6 @@ class ReduceTask extends Task {
|
|
|
try {
|
|
|
MapOutputLocation loc = null;
|
|
|
long size = -1;
|
|
|
- CopyOutputErrorType errorType = CopyOutputErrorType.NO_ERROR;
|
|
|
|
|
|
synchronized (scheduledCopies) {
|
|
|
while (scheduledCopies.isEmpty()) {
|
|
@@ -1079,18 +1066,9 @@ class ReduceTask extends Task {
|
|
|
|
|
|
// Reset
|
|
|
size = -1;
|
|
|
-
|
|
|
- // Identify the error type
|
|
|
- if (e.getClass() == ConnTimeoutException.class) {
|
|
|
- errorType = CopyOutputErrorType.CONNECTION_ERROR;
|
|
|
- }
|
|
|
- else if (e.getClass() == ReadTimeoutException.class) {
|
|
|
- errorType = CopyOutputErrorType.READ_ERROR;
|
|
|
- }
|
|
|
-
|
|
|
} finally {
|
|
|
shuffleClientMetrics.threadFree();
|
|
|
- finish(size, errorType);
|
|
|
+ finish(size);
|
|
|
}
|
|
|
} catch (InterruptedException e) {
|
|
|
return; // ALL DONE
|
|
@@ -1276,17 +1254,26 @@ class ReduceTask extends Task {
|
|
|
connection.setReadTimeout(readTimeout);
|
|
|
// set the connect timeout to the unit-connect-timeout
|
|
|
connection.setConnectTimeout(unit);
|
|
|
+ while (true) {
|
|
|
+ try {
|
|
|
+ return connection.getInputStream();
|
|
|
+ } catch (IOException ioe) {
|
|
|
+ // update the total remaining connect-timeout
|
|
|
+ connectionTimeout -= unit;
|
|
|
|
|
|
- try {
|
|
|
- connection.connect();
|
|
|
- } catch (IOException ioe) {
|
|
|
- throw new ConnTimeoutException("Connection Timed out");
|
|
|
- }
|
|
|
+ // throw an exception if we have waited for timeout amount of time
|
|
|
+ // note that the updated value if timeout is used here
|
|
|
+ if (connectionTimeout == 0) {
|
|
|
+ throw ioe;
|
|
|
+ }
|
|
|
|
|
|
- try {
|
|
|
- return connection.getInputStream();
|
|
|
- } catch (IOException ioe) {
|
|
|
- throw new ReadTimeoutException("Read Timed out");
|
|
|
+ // reset the connect timeout for the last try
|
|
|
+ if (connectionTimeout < unit) {
|
|
|
+ unit = connectionTimeout;
|
|
|
+ // reset the connect time out for the final connect
|
|
|
+ connection.setConnectTimeout(unit);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -1857,61 +1844,33 @@ class ReduceTask extends Task {
|
|
|
cr.getHost());
|
|
|
} else {
|
|
|
retryFetches.add(cr.getLocation());
|
|
|
-
|
|
|
- CopyOutputErrorType errorType = cr.getErrorType();
|
|
|
|
|
|
// note the failed-fetch
|
|
|
TaskAttemptID mapTaskId = cr.getLocation().getTaskAttemptId();
|
|
|
TaskID mapId = cr.getLocation().getTaskId();
|
|
|
|
|
|
totalFailures++;
|
|
|
-
|
|
|
- Integer noFailedFetches = 0;
|
|
|
-
|
|
|
- Integer noReadFailedFetches =
|
|
|
- mapTaskToReadFailedFetchesMap.get(mapTaskId);
|
|
|
-
|
|
|
- if (noReadFailedFetches == null) noReadFailedFetches = 0;
|
|
|
-
|
|
|
- Integer noConnectFailedFetches =
|
|
|
- mapTaskToConnectFailedFetchesMap.get(mapTaskId);
|
|
|
-
|
|
|
- if (noConnectFailedFetches == null) noConnectFailedFetches = 0;
|
|
|
-
|
|
|
- if (errorType == CopyOutputErrorType.READ_ERROR) {
|
|
|
- noReadFailedFetches ++;
|
|
|
- mapTaskToReadFailedFetchesMap.put (mapTaskId,
|
|
|
- noReadFailedFetches);
|
|
|
+ Integer noFailedFetches =
|
|
|
+ mapTaskToFailedFetchesMap.get(mapTaskId);
|
|
|
+ noFailedFetches =
|
|
|
+ (noFailedFetches == null) ? 1 : (noFailedFetches + 1);
|
|
|
+ mapTaskToFailedFetchesMap.put(mapTaskId, noFailedFetches);
|
|
|
+ LOG.info("Task " + getTaskID() + ": Failed fetch #" +
|
|
|
+ noFailedFetches + " from " + mapTaskId);
|
|
|
+
|
|
|
+ // did the fetch fail too many times?
|
|
|
+ // using a hybrid technique for notifying the jobtracker.
|
|
|
+ // a. the first notification is sent after max-retries
|
|
|
+ // b. subsequent notifications are sent after 2 retries.
|
|
|
+ if ((noFailedFetches >= maxFetchRetriesPerMap)
|
|
|
+ && ((noFailedFetches - maxFetchRetriesPerMap) % 2) == 0) {
|
|
|
synchronized (ReduceTask.this) {
|
|
|
taskStatus.addFetchFailedMap(mapTaskId);
|
|
|
LOG.info("Failed to fetch map-output from " + mapTaskId +
|
|
|
- " Got a Read Time out," +
|
|
|
- " reporting to the JobTracker");
|
|
|
- }
|
|
|
- } else if (errorType == CopyOutputErrorType.CONNECTION_ERROR) {
|
|
|
- noConnectFailedFetches ++;
|
|
|
- mapTaskToConnectFailedFetchesMap.put (
|
|
|
- mapTaskId, noConnectFailedFetches);
|
|
|
-
|
|
|
- LOG.info("Task " + getTaskID() + ": Failed fetch #"
|
|
|
- + noConnectFailedFetches + " from " + mapTaskId);
|
|
|
-
|
|
|
- if ((noConnectFailedFetches >= maxFetchRetriesPerMap) &&
|
|
|
- ((noConnectFailedFetches - maxFetchRetriesPerMap) % 2)
|
|
|
- == 0) {
|
|
|
- synchronized (ReduceTask.this) {
|
|
|
- taskStatus.addFetchFailedMap(mapTaskId);
|
|
|
- LOG.info("Failed to fetch map-output from " + mapTaskId
|
|
|
- + " even after MAX_FETCH_RETRIES_PER_MAP"
|
|
|
- + " (connect) retries... "
|
|
|
- + " reporting to the JobTracker");
|
|
|
- }
|
|
|
+ " even after MAX_FETCH_RETRIES_PER_MAP retries... "
|
|
|
+ + " reporting to the JobTracker");
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- noFailedFetches = noConnectFailedFetches +
|
|
|
- noReadFailedFetches;
|
|
|
-
|
|
|
// note unique failed-fetch maps
|
|
|
if (noFailedFetches == maxFetchRetriesPerMap) {
|
|
|
fetchFailedMaps.add(mapId);
|
|
@@ -1960,32 +1919,22 @@ class ReduceTask extends Task {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- if (errorType == CopyOutputErrorType.CONNECTION_ERROR) {
|
|
|
- // back off exponentially until num_retries <= max_retries
|
|
|
- // back off by max_backoff/2 on subsequent failed attempts
|
|
|
- currentTime = System.currentTimeMillis();
|
|
|
- int currentBackOff = noFailedFetches <= maxFetchRetriesPerMap
|
|
|
+ // back off exponentially until num_retries <= max_retries
|
|
|
+ // back off by max_backoff/2 on subsequent failed attempts
|
|
|
+ currentTime = System.currentTimeMillis();
|
|
|
+ int currentBackOff = noFailedFetches <= maxFetchRetriesPerMap
|
|
|
? BACKOFF_INIT
|
|
|
* (1 << (noFailedFetches - 1))
|
|
|
: (this.maxBackoff * 1000 / 2);
|
|
|
- penaltyBox.put(cr.getHost(), currentTime + currentBackOff);
|
|
|
- LOG.warn(reduceTask.getTaskID() + " adding host " +
|
|
|
+ penaltyBox.put(cr.getHost(), currentTime + currentBackOff);
|
|
|
+ LOG.warn(reduceTask.getTaskID() + " adding host " +
|
|
|
cr.getHost() + " to penalty box, next contact in " +
|
|
|
(currentBackOff/1000) + " seconds");
|
|
|
- } else if (errorType == CopyOutputErrorType.READ_ERROR) {
|
|
|
- int backOff = Math.max(maxMapRuntime/2,
|
|
|
- (this.maxBackoff * 1000));
|
|
|
- penaltyBox.put(cr.getHost(), currentTime + backOff);
|
|
|
- LOG.warn(reduceTask.getTaskID() + " adding host " +
|
|
|
- cr.getHost() + " to penalty box, next contact in " +
|
|
|
- (backOff/1000) + " seconds");
|
|
|
- }
|
|
|
-
|
|
|
- } // Fetch Failure
|
|
|
+ }
|
|
|
uniqueHosts.remove(cr.getHost());
|
|
|
numInFlight--;
|
|
|
- } // while (numInFlight > 0)
|
|
|
- } // while (copiedMaps < numMaps)
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
// all done, inform the copiers to exit
|
|
|
synchronized (copiers) {
|