@@ -27,10 +27,8 @@ import org.apache.hadoop.mapred.TaskCompletionEvent;
 import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 
-@SuppressWarnings("deprecation")
 class EventFetcher<K,V> extends Thread {
   private static final long SLEEP_TIME = 1000;
-  private static final int MAX_EVENTS_TO_FETCH = 10000;
   private static final int MAX_RETRIES = 10;
   private static final int RETRY_PERIOD = 5000;
   private static final Log LOG = LogFactory.getLog(EventFetcher.class);
@@ -38,7 +36,8 @@ class EventFetcher<K,V> extends Thread {
   private final TaskAttemptID reduce;
   private final TaskUmbilicalProtocol umbilical;
   private final ShuffleScheduler<K,V> scheduler;
-  private int fromEventId = 0;
+  private int fromEventIdx = 0;
+  private int maxEventsToFetch;
   private ExceptionReporter exceptionReporter = null;
 
   private int maxMapRuntime = 0;
@@ -48,13 +47,15 @@ class EventFetcher<K,V> extends Thread {
   public EventFetcher(TaskAttemptID reduce,
                       TaskUmbilicalProtocol umbilical,
                       ShuffleScheduler<K,V> scheduler,
-                      ExceptionReporter reporter) {
+                      ExceptionReporter reporter,
+                      int maxEventsToFetch) {
     setName("EventFetcher for fetching Map Completion Events");
     setDaemon(true);
     this.reduce = reduce;
     this.umbilical = umbilical;
     this.scheduler = scheduler;
     exceptionReporter = reporter;
+    this.maxEventsToFetch = maxEventsToFetch;
   }
 
   @Override
@@ -112,46 +113,47 @@ class EventFetcher<K,V> extends Thread {
    * from a given event ID.
    * @throws IOException
    */
-  private int getMapCompletionEvents() throws IOException {
+  protected int getMapCompletionEvents() throws IOException {
     int numNewMaps = 0;
-
-    MapTaskCompletionEventsUpdate update =
-      umbilical.getMapCompletionEvents((org.apache.hadoop.mapred.JobID)
-                                       reduce.getJobID(),
-                                       fromEventId,
-                                       MAX_EVENTS_TO_FETCH,
-                                       (org.apache.hadoop.mapred.TaskAttemptID)
-                                         reduce);
-    TaskCompletionEvent events[] = update.getMapTaskCompletionEvents();
-    LOG.debug("Got " + events.length + " map completion events from " +
-              fromEventId);
-
-    // Check if the reset is required.
-    // Since there is no ordering of the task completion events at the
-    // reducer, the only option to sync with the new jobtracker is to reset
-    // the events index
-    if (update.shouldReset()) {
-      fromEventId = 0;
-      scheduler.resetKnownMaps();
-    }
-
-    // Update the last seen event ID
-    fromEventId += events.length;
-
-    // Process the TaskCompletionEvents:
-    // 1. Save the SUCCEEDED maps in knownOutputs to fetch the outputs.
-    // 2. Save the OBSOLETE/FAILED/KILLED maps in obsoleteOutputs to stop
-    //    fetching from those maps.
-    // 3. Remove TIPFAILED maps from neededOutputs since we don't need their
-    //    outputs at all.
-    for (TaskCompletionEvent event : events) {
-      switch (event.getTaskStatus()) {
+    TaskCompletionEvent events[] = null;
+
+    do {
+      MapTaskCompletionEventsUpdate update =
+          umbilical.getMapCompletionEvents(
+              (org.apache.hadoop.mapred.JobID)reduce.getJobID(),
+              fromEventIdx,
+              maxEventsToFetch,
+              (org.apache.hadoop.mapred.TaskAttemptID)reduce);
+      events = update.getMapTaskCompletionEvents();
+      LOG.debug("Got " + events.length + " map completion events from " +
+                fromEventIdx);
+
+      // Check if the reset is required.
+      // Since there is no ordering of the task completion events at the
+      // reducer, the only option to sync with the new jobtracker is to reset
+      // the events index
+      if (update.shouldReset()) {
+        fromEventIdx = 0;
+        scheduler.resetKnownMaps();
+      }
+
+      // Update the last seen event ID
+      fromEventIdx += events.length;
+
+      // Process the TaskCompletionEvents:
+      // 1. Save the SUCCEEDED maps in knownOutputs to fetch the outputs.
+      // 2. Save the OBSOLETE/FAILED/KILLED maps in obsoleteOutputs to stop
+      //    fetching from those maps.
+      // 3. Remove TIPFAILED maps from neededOutputs since we don't need their
+      //    outputs at all.
+      for (TaskCompletionEvent event : events) {
+        switch (event.getTaskStatus()) {
         case SUCCEEDED:
           URI u = getBaseURI(event.getTaskTrackerHttp());
           scheduler.addKnownMapOutput(u.getHost() + ":" + u.getPort(),
-                                      u.toString(),
-                                      event.getTaskAttemptId());
+              u.toString(),
+              event.getTaskAttemptId());
           numNewMaps ++;
           int duration = event.getTaskRunTime();
           if (duration > maxMapRuntime) {
@@ -164,15 +166,17 @@ class EventFetcher<K,V> extends Thread {
         case OBSOLETE:
           scheduler.obsoleteMapOutput(event.getTaskAttemptId());
           LOG.info("Ignoring obsolete output of " + event.getTaskStatus() +
-                   " map-task: '" + event.getTaskAttemptId() + "'");
+              " map-task: '" + event.getTaskAttemptId() + "'");
           break;
         case TIPFAILED:
           scheduler.tipFailed(event.getTaskAttemptId().getTaskID());
           LOG.info("Ignoring output of failed map TIP: '" +
-                   event.getTaskAttemptId() + "'");
+              event.getTaskAttemptId() + "'");
           break;
+        }
       }
-    }
+    } while (events.length == maxEventsToFetch);
+
     return numNewMaps;
   }