|
@@ -44,6 +44,7 @@ import java.util.Random;
|
|
|
import java.util.Set;
|
|
|
import java.util.SortedSet;
|
|
|
import java.util.TreeSet;
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
@@ -513,11 +514,6 @@ class ReduceTask extends Task {
|
|
|
*/
|
|
|
private Set<String> uniqueHosts;
|
|
|
|
|
|
- /**
|
|
|
- * the last time we polled the job tracker
|
|
|
- */
|
|
|
- private long lastPollTime;
|
|
|
-
|
|
|
/**
|
|
|
* A reference to the RamManager for writing the map outputs to.
|
|
|
*/
|
|
@@ -544,6 +540,11 @@ class ReduceTask extends Task {
|
|
|
* A flag to indicate when to exit localFS merge
|
|
|
*/
|
|
|
private volatile boolean exitLocalFSMerge = false;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * A flag to indicate when to exit getMapEvents thread
|
|
|
+ */
|
|
|
+ private volatile boolean exitGetMapEvents = false;
|
|
|
|
|
|
/**
|
|
|
* When we accumulate maxInMemOutputs number of files in ram, we merge/spill
|
|
@@ -604,7 +605,7 @@ class ReduceTask extends Task {
|
|
|
/**
|
|
|
* Maximum number of fetch-retries per-map.
|
|
|
*/
|
|
|
- private int maxFetchRetriesPerMap;
|
|
|
+ private volatile int maxFetchRetriesPerMap;
|
|
|
|
|
|
/**
|
|
|
* Combiner class to run during in-memory merge, if defined.
|
|
@@ -671,7 +672,13 @@ class ReduceTask extends Task {
|
|
|
private final List<MapOutput> mapOutputsFilesInMemory =
|
|
|
Collections.synchronizedList(new LinkedList<MapOutput>());
|
|
|
|
|
|
-
|
|
|
+ /**
|
|
|
+ * The map for (Hosts, List of MapIds from this Host) maintaining
|
|
|
+ * map output locations
|
|
|
+ */
|
|
|
+ private final Map<String, List<MapOutputLocation>> mapLocations =
|
|
|
+ new ConcurrentHashMap<String, List<MapOutputLocation>>();
|
|
|
+
|
|
|
/**
|
|
|
* This class contains the methods that should be used for metrics-reporting
|
|
|
* the specific metrics for shuffle. This class actually reports the
|
|
@@ -1611,9 +1618,6 @@ class ReduceTask extends Task {
|
|
|
// hostnames
|
|
|
this.uniqueHosts = new HashSet<String>();
|
|
|
|
|
|
- this.lastPollTime = 0;
|
|
|
-
|
|
|
-
|
|
|
// Seed the random number generator with a reasonably globally unique seed
|
|
|
long randomSeed = System.nanoTime() +
|
|
|
(long)Math.pow(this.reduceTask.getPartition(),
|
|
@@ -1630,9 +1634,6 @@ class ReduceTask extends Task {
|
|
|
long reducerInputBytes = 0;
|
|
|
|
|
|
public boolean fetchOutputs() throws IOException {
|
|
|
- //The map for (Hosts, List of MapIds from this Host)
|
|
|
- HashMap<String, List<MapOutputLocation>> mapLocations =
|
|
|
- new HashMap<String, List<MapOutputLocation>>();
|
|
|
int totalFailures = 0;
|
|
|
int numInFlight = 0, numCopied = 0;
|
|
|
long bytesTransferred = 0;
|
|
@@ -1641,6 +1642,7 @@ class ReduceTask extends Task {
|
|
|
reduceTask.getProgress().phase();
|
|
|
LocalFSMerger localFSMergerThread = null;
|
|
|
InMemFSMergeThread inMemFSMergeThread = null;
|
|
|
+ GetMapEventsThread getMapEventsThread = null;
|
|
|
|
|
|
for (int i = 0; i < numMaps; i++) {
|
|
|
copyPhase.addPhase(); // add sub-phase per file
|
|
@@ -1664,15 +1666,15 @@ class ReduceTask extends Task {
|
|
|
localFSMergerThread.start();
|
|
|
inMemFSMergeThread.start();
|
|
|
|
|
|
+ // start the map events thread
|
|
|
+ getMapEventsThread = new GetMapEventsThread();
|
|
|
+ getMapEventsThread.start();
|
|
|
+
|
|
|
// start the clock for bandwidth measurement
|
|
|
long startTime = System.currentTimeMillis();
|
|
|
long currentTime = startTime;
|
|
|
long lastProgressTime = startTime;
|
|
|
long lastOutputTime = 0;
|
|
|
- IntWritable fromEventId = new IntWritable(0);
|
|
|
-
|
|
|
- //List of unique hosts containing map outputs
|
|
|
- List<String> hostList = new ArrayList<String>();
|
|
|
|
|
|
// loop until we get all required outputs
|
|
|
while (copiedMapOutputs.size() < numMaps && mergeThrowable == null) {
|
|
@@ -1688,73 +1690,43 @@ class ReduceTask extends Task {
|
|
|
+ (numMaps - copiedMapOutputs.size()) + " map output(s) "
|
|
|
+ "where " + numInFlight + " is already in progress");
|
|
|
}
|
|
|
-
|
|
|
- try {
|
|
|
- // Put the hash entries for the failed fetches.
|
|
|
- Iterator<MapOutputLocation> locItr = retryFetches.iterator();
|
|
|
- while (locItr.hasNext()) {
|
|
|
- MapOutputLocation loc = locItr.next();
|
|
|
- List<MapOutputLocation> locList =
|
|
|
- mapLocations.get(loc.getHost());
|
|
|
- if (locList == null) {
|
|
|
- locList = new LinkedList<MapOutputLocation>();
|
|
|
- mapLocations.put(loc.getHost(), locList);
|
|
|
- hostList.add(loc.getHost());
|
|
|
- }
|
|
|
- //Add to the beginning of the list so that this map is
|
|
|
- //tried again before the others and we can hasten the
|
|
|
- //re-execution of this map should there be a problem
|
|
|
- locList.add(0, loc);
|
|
|
- }
|
|
|
-
|
|
|
- // The call getMapCompletionEvents will update fromEventId to
|
|
|
- // used for the next call to getMapCompletionEvents
|
|
|
|
|
|
- int currentNumObsoleteMapIds = obsoleteMapIds.size();
|
|
|
-
|
|
|
- int numNewOutputs = getMapCompletionEvents(fromEventId,
|
|
|
- mapLocations,
|
|
|
- hostList);
|
|
|
-
|
|
|
- if (numNewOutputs > 0 || logNow) {
|
|
|
- LOG.info(reduceTask.getTaskID() + ": " +
|
|
|
- "Got " + numNewOutputs +
|
|
|
- " new map-outputs");
|
|
|
- }
|
|
|
-
|
|
|
- int numNewObsoleteMaps = obsoleteMapIds.size()-currentNumObsoleteMapIds;
|
|
|
+ // Put the hash entries for the failed fetches.
|
|
|
+ Iterator<MapOutputLocation> locItr = retryFetches.iterator();
|
|
|
+
|
|
|
+ while (locItr.hasNext()) {
|
|
|
+ MapOutputLocation loc = locItr.next();
|
|
|
+ List<MapOutputLocation> locList =
|
|
|
+ mapLocations.get(loc.getHost());
|
|
|
+ //Add to the beginning of the list so that this map is
|
|
|
+ //tried again before the others and we can hasten the
|
|
|
+ //re-execution of this map should there be a problem
|
|
|
+ locList.add(0, loc);
|
|
|
+ }
|
|
|
|
|
|
- if (numNewObsoleteMaps > 0) {
|
|
|
- LOG.info(reduceTask.getTaskID() + ": " +
|
|
|
- "Got " + numNewObsoleteMaps +
|
|
|
- " obsolete map-outputs from tasktracker ");
|
|
|
- }
|
|
|
-
|
|
|
- if (retryFetches.size() > 0) {
|
|
|
- LOG.info(reduceTask.getTaskID() + ": " +
|
|
|
+ if (retryFetches.size() > 0) {
|
|
|
+ LOG.info(reduceTask.getTaskID() + ": " +
|
|
|
"Got " + retryFetches.size() +
|
|
|
" map-outputs from previous failures");
|
|
|
- }
|
|
|
- // clear the "failed" fetches hashmap
|
|
|
- retryFetches.clear();
|
|
|
- }
|
|
|
- catch (IOException ie) {
|
|
|
- LOG.warn(reduceTask.getTaskID() +
|
|
|
- " Problem locating map outputs: " +
|
|
|
- StringUtils.stringifyException(ie));
|
|
|
}
|
|
|
-
|
|
|
+ // clear the "failed" fetches hashmap
|
|
|
+ retryFetches.clear();
|
|
|
+
|
|
|
// now walk through the cache and schedule what we can
|
|
|
int numScheduled = 0;
|
|
|
int numDups = 0;
|
|
|
|
|
|
synchronized (scheduledCopies) {
|
|
|
-
|
|
|
+
|
|
|
// Randomize the map output locations to prevent
|
|
|
// all reduce-tasks swamping the same tasktracker
|
|
|
- Collections.shuffle(hostList, this.random);
|
|
|
+ List<String> hostList = new ArrayList<String>();
|
|
|
+ hostList.addAll(mapLocations.keySet());
|
|
|
|
|
|
+ Collections.shuffle(hostList, this.random);
|
|
|
+
|
|
|
Iterator<String> hostsItr = hostList.iterator();
|
|
|
+
|
|
|
while (hostsItr.hasNext()) {
|
|
|
|
|
|
String host = hostsItr.next();
|
|
@@ -1762,9 +1734,13 @@ class ReduceTask extends Task {
|
|
|
List<MapOutputLocation> knownOutputsByLoc =
|
|
|
mapLocations.get(host);
|
|
|
|
|
|
+ if (knownOutputsByLoc.size() == 0) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
//Identify duplicate hosts here
|
|
|
if (uniqueHosts.contains(host)) {
|
|
|
- numDups += knownOutputsByLoc.size() -1;
|
|
|
+ numDups += knownOutputsByLoc.size();
|
|
|
continue;
|
|
|
}
|
|
|
|
|
@@ -1782,34 +1758,32 @@ class ReduceTask extends Task {
|
|
|
if (penalized)
|
|
|
continue;
|
|
|
|
|
|
- Iterator<MapOutputLocation> locItr =
|
|
|
- knownOutputsByLoc.iterator();
|
|
|
+ synchronized (knownOutputsByLoc) {
|
|
|
+
|
|
|
+ locItr = knownOutputsByLoc.iterator();
|
|
|
|
|
|
- while (locItr.hasNext()) {
|
|
|
+ while (locItr.hasNext()) {
|
|
|
|
|
|
- MapOutputLocation loc = locItr.next();
|
|
|
+ MapOutputLocation loc = locItr.next();
|
|
|
|
|
|
- // Do not schedule fetches from OBSOLETE maps
|
|
|
- if (obsoleteMapIds.contains(loc.getTaskAttemptId())) {
|
|
|
- locItr.remove();
|
|
|
- continue;
|
|
|
+ // Do not schedule fetches from OBSOLETE maps
|
|
|
+ if (obsoleteMapIds.contains(loc.getTaskAttemptId())) {
|
|
|
+ locItr.remove();
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ uniqueHosts.add(host);
|
|
|
+ scheduledCopies.add(loc);
|
|
|
+ locItr.remove(); // remove from knownOutputs
|
|
|
+ numInFlight++; numScheduled++;
|
|
|
+
|
|
|
+ break; //we have a map from this host
|
|
|
}
|
|
|
-
|
|
|
- uniqueHosts.add(host);
|
|
|
- scheduledCopies.add(loc);
|
|
|
- locItr.remove(); // remove from knownOutputs
|
|
|
- numInFlight++; numScheduled++;
|
|
|
-
|
|
|
- break; //we have a map from this host
|
|
|
- }
|
|
|
-
|
|
|
- if (knownOutputsByLoc.size() == 0) {
|
|
|
- mapLocations.remove(host);
|
|
|
- hostsItr.remove();
|
|
|
}
|
|
|
}
|
|
|
scheduledCopies.notifyAll();
|
|
|
}
|
|
|
+
|
|
|
if (numScheduled > 0 || logNow) {
|
|
|
LOG.info(reduceTask.getTaskID() + " Scheduled " + numScheduled +
|
|
|
" outputs (" + penaltyBox.size() +
|
|
@@ -1823,7 +1797,7 @@ class ReduceTask extends Task {
|
|
|
((penaltyBox.get(host) - currentTime)/1000) + " seconds.");
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
// if we have no copies in flight and we can't schedule anything
|
|
|
// new, just wait for a bit
|
|
|
try {
|
|
@@ -1869,7 +1843,7 @@ class ReduceTask extends Task {
|
|
|
+ " at " +
|
|
|
mbpsFormat.format(transferRate) + " MB/s)");
|
|
|
|
|
|
- // Note successfull fetch for this mapId to invalidate
|
|
|
+ // Note successful fetch for this mapId to invalidate
|
|
|
// (possibly) old fetch-failures
|
|
|
fetchFailedMaps.remove(cr.getLocation().getTaskId());
|
|
|
} else if (cr.isObsolete()) {
|
|
@@ -1974,6 +1948,15 @@ class ReduceTask extends Task {
|
|
|
}
|
|
|
|
|
|
// all done, inform the copiers to exit
|
|
|
+      exitGetMapEvents = true;
|
|
|
+ try {
|
|
|
+ getMapEventsThread.join();
|
|
|
+        LOG.info("getMapEventsThread joined.");
|
|
|
+ } catch (Throwable t) {
|
|
|
+        LOG.info("getMapEventsThread threw an exception: " +
|
|
|
+ StringUtils.stringifyException(t));
|
|
|
+ }
|
|
|
+
|
|
|
synchronized (copiers) {
|
|
|
synchronized (scheduledCopies) {
|
|
|
for (MapOutputCopier copier : copiers) {
|
|
@@ -2218,109 +2201,6 @@ class ReduceTask extends Task {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Queries the {@link TaskTracker} for a set of map-completion events from
|
|
|
- * a given event ID.
|
|
|
- *
|
|
|
- * @param fromEventId the first event ID we want to start from, this is
|
|
|
- * modified by the call to this method
|
|
|
- * @param mapLocations the hash map of map locations by host
|
|
|
- * @param hostsList the list that contains unique hosts having
|
|
|
- * map outputs, will be updated on the return
|
|
|
- * of this method
|
|
|
- * @return the number of new map-completion events from the given event ID
|
|
|
- * @throws IOException
|
|
|
- */
|
|
|
- private int getMapCompletionEvents(IntWritable fromEventId,
|
|
|
- HashMap<String,List<MapOutputLocation>> mapLocations,
|
|
|
- List<String> hostsList)
|
|
|
- throws IOException {
|
|
|
-
|
|
|
- long currentTime = System.currentTimeMillis();
|
|
|
- long pollTime = lastPollTime + MIN_POLL_INTERVAL;
|
|
|
- int numNewMaps = 0;
|
|
|
- while (currentTime < pollTime) {
|
|
|
- try {
|
|
|
- Thread.sleep(pollTime-currentTime);
|
|
|
- } catch (InterruptedException ie) { } // IGNORE
|
|
|
- currentTime = System.currentTimeMillis();
|
|
|
- }
|
|
|
-
|
|
|
- MapTaskCompletionEventsUpdate update =
|
|
|
- umbilical.getMapCompletionEvents(reduceTask.getJobID(),
|
|
|
- fromEventId.get(), MAX_EVENTS_TO_FETCH,
|
|
|
- reduceTask.getTaskID());
|
|
|
- TaskCompletionEvent events[] = update.getMapTaskCompletionEvents();
|
|
|
-
|
|
|
- // Check if the reset is required.
|
|
|
- // Since there is no ordering of the task completion events at the
|
|
|
- // reducer, the only option to sync with the new jobtracker is to reset
|
|
|
- // the events index
|
|
|
- if (update.shouldReset()) {
|
|
|
- fromEventId.set(0);
|
|
|
- obsoleteMapIds.clear(); // clear the obsolete map
|
|
|
- }
|
|
|
-
|
|
|
- // Note the last successful poll time-stamp
|
|
|
- lastPollTime = currentTime;
|
|
|
-
|
|
|
- // Update the last seen event ID
|
|
|
- fromEventId.set(fromEventId.get() + events.length);
|
|
|
-
|
|
|
- // Process the TaskCompletionEvents:
|
|
|
- // 1. Save the SUCCEEDED maps in knownOutputs to fetch the outputs.
|
|
|
- // 2. Save the OBSOLETE/FAILED/KILLED maps in obsoleteOutputs to stop fetching
|
|
|
- // from those maps.
|
|
|
- // 3. Remove TIPFAILED maps from neededOutputs since we don't need their
|
|
|
- // outputs at all.
|
|
|
- for (TaskCompletionEvent event : events) {
|
|
|
- switch (event.getTaskStatus()) {
|
|
|
- case SUCCEEDED:
|
|
|
- {
|
|
|
- URI u = URI.create(event.getTaskTrackerHttp());
|
|
|
- String host = u.getHost();
|
|
|
- TaskAttemptID taskId = event.getTaskAttemptId();
|
|
|
- int duration = event.getTaskRunTime();
|
|
|
- if (duration > maxMapRuntime) {
|
|
|
- maxMapRuntime = duration;
|
|
|
- // adjust max-fetch-retries based on max-map-run-time
|
|
|
- maxFetchRetriesPerMap = Math.max(MIN_FETCH_RETRIES_PER_MAP,
|
|
|
- getClosestPowerOf2((maxMapRuntime / BACKOFF_INIT) + 1));
|
|
|
- }
|
|
|
- URL mapOutputLocation = new URL(event.getTaskTrackerHttp() +
|
|
|
- "/mapOutput?job=" + taskId.getJobID() +
|
|
|
- "&map=" + taskId +
|
|
|
- "&reduce=" + getPartition());
|
|
|
- List<MapOutputLocation> loc = mapLocations.get(host);
|
|
|
- if (loc == null) {
|
|
|
- loc = new LinkedList<MapOutputLocation>();
|
|
|
- mapLocations.put(host, loc);
|
|
|
- hostsList.add(host);
|
|
|
- }
|
|
|
- loc.add(new MapOutputLocation(taskId, host, mapOutputLocation));
|
|
|
- numNewMaps ++;
|
|
|
- }
|
|
|
- break;
|
|
|
- case FAILED:
|
|
|
- case KILLED:
|
|
|
- case OBSOLETE:
|
|
|
- {
|
|
|
- obsoleteMapIds.add(event.getTaskAttemptId());
|
|
|
- LOG.info("Ignoring obsolete output of " + event.getTaskStatus() +
|
|
|
- " map-task: '" + event.getTaskAttemptId() + "'");
|
|
|
- }
|
|
|
- break;
|
|
|
- case TIPFAILED:
|
|
|
- {
|
|
|
- copiedMapOutputs.add(event.getTaskAttemptId().getTaskID());
|
|
|
- LOG.info("Ignoring output of failed map TIP: '" +
|
|
|
- event.getTaskAttemptId() + "'");
|
|
|
- }
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- return numNewMaps;
|
|
|
- }
|
|
|
|
|
|
|
|
|
/** Starts merging the local copy (on disk) of the map's output so that
|
|
@@ -2552,6 +2432,130 @@ class ReduceTask extends Task {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private class GetMapEventsThread extends Thread {
|
|
|
+
|
|
|
+ private IntWritable fromEventId = new IntWritable(0);
|
|
|
+ private static final long SLEEP_TIME = 1000;
|
|
|
+
|
|
|
+ public GetMapEventsThread() {
|
|
|
+ setName("Thread for polling Map Completion Events");
|
|
|
+ setDaemon(true);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+
|
|
|
+ LOG.info(reduceTask.getTaskID() + " Thread started: " + getName());
|
|
|
+
|
|
|
+ do {
|
|
|
+ try {
|
|
|
+ int numNewMaps = getMapCompletionEvents();
|
|
|
+ if (numNewMaps > 0) {
|
|
|
+ LOG.info(reduceTask.getTaskID() + ": " +
|
|
|
+ "Got " + numNewMaps + " new map-outputs");
|
|
|
+ }
|
|
|
+ Thread.sleep(SLEEP_TIME);
|
|
|
+ }
|
|
|
+ catch (InterruptedException e) {
|
|
|
+ LOG.warn(reduceTask.getTaskID() +
|
|
|
+ " GetMapEventsThread returning after an " +
|
|
|
+ " interrupted exception");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ catch (Throwable t) {
|
|
|
+ LOG.warn(reduceTask.getTaskID() +
|
|
|
+ " GetMapEventsThread Ignoring exception : " +
|
|
|
+ StringUtils.stringifyException(t));
|
|
|
+ }
|
|
|
+ } while (!exitGetMapEvents);
|
|
|
+
|
|
|
+ LOG.info("GetMapEventsThread exiting");
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Queries the {@link TaskTracker} for a set of map-completion events
|
|
|
+ * from a given event ID.
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ private int getMapCompletionEvents() throws IOException {
|
|
|
+
|
|
|
+ int numNewMaps = 0;
|
|
|
+
|
|
|
+ MapTaskCompletionEventsUpdate update =
|
|
|
+ umbilical.getMapCompletionEvents(reduceTask.getJobID(),
|
|
|
+ fromEventId.get(),
|
|
|
+ MAX_EVENTS_TO_FETCH,
|
|
|
+ reduceTask.getTaskID());
|
|
|
+ TaskCompletionEvent events[] = update.getMapTaskCompletionEvents();
|
|
|
+
|
|
|
+ // Check if the reset is required.
|
|
|
+ // Since there is no ordering of the task completion events at the
|
|
|
+ // reducer, the only option to sync with the new jobtracker is to reset
|
|
|
+ // the events index
|
|
|
+ if (update.shouldReset()) {
|
|
|
+ fromEventId.set(0);
|
|
|
+ obsoleteMapIds.clear(); // clear the obsolete map
|
|
|
+ }
|
|
|
+
|
|
|
+ // Update the last seen event ID
|
|
|
+ fromEventId.set(fromEventId.get() + events.length);
|
|
|
+
|
|
|
+ // Process the TaskCompletionEvents:
|
|
|
+ // 1. Save the SUCCEEDED maps in knownOutputs to fetch the outputs.
|
|
|
+ // 2. Save the OBSOLETE/FAILED/KILLED maps in obsoleteOutputs to stop
|
|
|
+ // fetching from those maps.
|
|
|
+ // 3. Remove TIPFAILED maps from neededOutputs since we don't need their
|
|
|
+ // outputs at all.
|
|
|
+ for (TaskCompletionEvent event : events) {
|
|
|
+ switch (event.getTaskStatus()) {
|
|
|
+ case SUCCEEDED:
|
|
|
+ {
|
|
|
+ URI u = URI.create(event.getTaskTrackerHttp());
|
|
|
+ String host = u.getHost();
|
|
|
+ TaskAttemptID taskId = event.getTaskAttemptId();
|
|
|
+ int duration = event.getTaskRunTime();
|
|
|
+ if (duration > maxMapRuntime) {
|
|
|
+ maxMapRuntime = duration;
|
|
|
+ // adjust max-fetch-retries based on max-map-run-time
|
|
|
+ maxFetchRetriesPerMap = Math.max(MIN_FETCH_RETRIES_PER_MAP,
|
|
|
+ getClosestPowerOf2((maxMapRuntime / BACKOFF_INIT) + 1));
|
|
|
+ }
|
|
|
+ URL mapOutputLocation = new URL(event.getTaskTrackerHttp() +
|
|
|
+ "/mapOutput?job=" + taskId.getJobID() +
|
|
|
+ "&map=" + taskId +
|
|
|
+ "&reduce=" + getPartition());
|
|
|
+ List<MapOutputLocation> loc = mapLocations.get(host);
|
|
|
+ if (loc == null) {
|
|
|
+ loc = Collections.synchronizedList
|
|
|
+ (new LinkedList<MapOutputLocation>());
|
|
|
+ mapLocations.put(host, loc);
|
|
|
+ }
|
|
|
+ loc.add(new MapOutputLocation(taskId, host, mapOutputLocation));
|
|
|
+ numNewMaps ++;
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ case FAILED:
|
|
|
+ case KILLED:
|
|
|
+ case OBSOLETE:
|
|
|
+ {
|
|
|
+ obsoleteMapIds.add(event.getTaskAttemptId());
|
|
|
+ LOG.info("Ignoring obsolete output of " + event.getTaskStatus() +
|
|
|
+ " map-task: '" + event.getTaskAttemptId() + "'");
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ case TIPFAILED:
|
|
|
+ {
|
|
|
+ copiedMapOutputs.add(event.getTaskAttemptId().getTaskID());
|
|
|
+ LOG.info("Ignoring output of failed map TIP: '" +
|
|
|
+ event.getTaskAttemptId() + "'");
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return numNewMaps;
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|