@@ -122,19 +122,9 @@ class ReduceTaskRunner extends TaskRunner implements MRConstants {

   /**
    * the number of map output locations to poll for at one time
-   */
-  private int probe_sample_size = 50;
-
-  /**
-   * a Random used during the map output fetching
    */
-  private Random randForProbing;
-
-  /**
-   * a hashmap from mapId to MapOutputLocation for retrials
-   */
-  private Map<Integer, MapOutputLocation> retryFetches = new HashMap();
-
+  private static final int PROBE_SAMPLE_SIZE = 50;
+
   /** Represents the result of an attempt to copy a map output */
   private class CopyResult {

@@ -378,10 +368,6 @@ class ReduceTaskRunner extends TaskRunner implements MRConstants {
     Random backoff = new Random();
     final Progress copyPhase = getTask().getProgress().phase();

-    //tweak the probe sample size (make it a function of numCopiers)
-    probe_sample_size = Math.max(numCopiers*5, 50);
-    randForProbing = new Random(reduceTask.getPartition() * 100);
-
     for (int i = 0; i < numOutputs; i++) {
       neededOutputs.add(new Integer(i));
       copyPhase.addPhase();       // add sub-phase per file
@@ -399,8 +385,6 @@ class ReduceTaskRunner extends TaskRunner implements MRConstants {
     // start the clock for bandwidth measurement
     long startTime = System.currentTimeMillis();
     long currentTime = startTime;
-    int fromEventId = 0;
-
     PingTimer pingTimer = new PingTimer();
     pingTimer.setName("Map output copy reporter for task " +
                       reduceTask.getTaskId());
@@ -417,36 +401,17 @@ class ReduceTaskRunner extends TaskRunner implements MRConstants {
       LOG.info(reduceTask.getTaskId() + " Need " + neededOutputs.size() +
                " map output location(s)");
       try {
-        // the call to queryJobTracker will modify fromEventId to a value
-        // that it should be for the next call to queryJobTracker
-        MapOutputLocation[] locs = queryJobTracker(fromEventId, jobClient);
+        MapOutputLocation[] locs = queryJobTracker(neededOutputs, jobClient);

        // remove discovered outputs from needed list
        // and put them on the known list
-        int gotLocs = (locs == null ? 0 : locs.length);
        for (int i=0; i < locs.length; i++) {
-          // check whether we actually need an output. It could happen
-          // that a map task that successfully ran earlier got lost, but
-          // if we already have copied the output of that unfortunate task
-          // we need not copy it again from the new TT (we will ignore
-          // the event for the new rescheduled execution)
-          if(neededOutputs.remove(new Integer(locs[i].getMapId()))) {
-            // remove the mapId from the retryFetches hashmap since we now
-            // prefer the new location instead of what we saved earlier
-            retryFetches.remove(new Integer(locs[i].getMapId()));
-            knownOutputs.add(locs[i]);
-            gotLocs--;
-          }
-
+          neededOutputs.remove(new Integer(locs[i].getMapId()));
+          knownOutputs.add(locs[i]);
        }
-        // now put the remaining hash entries for the failed fetches
-        // and clear the hashmap
-        knownOutputs.addAll(retryFetches.values());
        LOG.info(reduceTask.getTaskId() +
-                " Got " + gotLocs +
-                " new map outputs from jobtracker and " + retryFetches.size() +
-                " map outputs from previous failures");
-        retryFetches.clear();
+                " Got " + (locs == null ? 0 : locs.length) +
+                " map outputs from jobtracker");
      }
      catch (IOException ie) {
        LOG.warn(reduceTask.getTaskId() +
@@ -520,7 +485,6 @@ class ReduceTaskRunner extends TaskRunner implements MRConstants {
         } else {
           // this copy failed, put it back onto neededOutputs
           neededOutputs.add(new Integer(cr.getMapId()));
-          retryFetches.put(new Integer(cr.getMapId()), cr.getLocation());

           // wait a random amount of time for next contact
           currentTime = System.currentTimeMillis();
@@ -540,7 +504,6 @@ class ReduceTaskRunner extends TaskRunner implements MRConstants {
           while (locIt.hasNext()) {
             MapOutputLocation loc = (MapOutputLocation)locIt.next();
             if (cr.getHost().equals(loc.getHost())) {
-              retryFetches.put(new Integer(loc.getMapId()), loc);
               locIt.remove();
               neededOutputs.add(new Integer(loc.getMapId()));
             }
@@ -551,7 +514,7 @@ class ReduceTaskRunner extends TaskRunner implements MRConstants {
       }

       // ensure we have enough to keep us busy
-      if (numInFlight < lowThreshold && (numOutputs-numCopied) > probe_sample_size) {
+      if (numInFlight < lowThreshold && (numOutputs-numCopied) > PROBE_SAMPLE_SIZE) {
         break;
       }
     }
@@ -645,16 +608,28 @@ class ReduceTaskRunner extends TaskRunner implements MRConstants {
   }

   /** Queries the job tracker for a set of outputs ready to be copied
-   * @param fromEventId the first event ID we want to start from, this will be
-   * modified by the call to this method
+   * @param neededOutputs the list of currently unknown outputs
    * @param jobClient the job tracker
    * @return a set of locations to copy outputs from
    * @throws IOException
    */
-  private MapOutputLocation[] queryJobTracker(int fromEventId,
+  private MapOutputLocation[] queryJobTracker(List neededOutputs,
                                               InterTrackerProtocol jobClient)
   throws IOException {

+    // query for just a random subset of needed segments so that we don't
+    // overwhelm jobtracker. ideally perhaps we could send a more compact
+    // representation of all needed, i.e., a bit-vector
+    int checkSize = Math.min(PROBE_SAMPLE_SIZE, neededOutputs.size());
+    int neededIds[] = new int[checkSize];
+
+    Collections.shuffle(neededOutputs);
+
+    ListIterator itr = neededOutputs.listIterator();
+    for (int i=0; i < checkSize; i++) {
+      neededIds[i] = ((Integer)itr.next()).intValue();
+    }
+
     long currentTime = System.currentTimeMillis();
     long pollTime = lastPollTime + MIN_POLL_INTERVAL;
     while (currentTime < pollTime) {
@@ -665,28 +640,9 @@ class ReduceTaskRunner extends TaskRunner implements MRConstants {
     }
     lastPollTime = currentTime;

-    TaskCompletionEvent t[] = jobClient.getTaskCompletionEvents(
-                                      reduceTask.getJobId().toString(),
-                                      fromEventId,
-                                      probe_sample_size);
-
-    List <MapOutputLocation> mapOutputsList = new ArrayList();
-    for (int i = 0; i < t.length; i++) {
-      if (t[i].isMap &&
-          t[i].getTaskStatus() == TaskCompletionEvent.Status.SUCCEEDED) {
-        URI u = URI.create(t[i].getTaskTrackerHttp());
-        String host = u.getHost();
-        int port = u.getPort();
-        String taskId = t[i].getTaskId();
-        int mId = t[i].idWithinJob();
-        mapOutputsList.add(new MapOutputLocation(taskId, mId, host, port));
-      }
-    }
-    Collections.shuffle(mapOutputsList, randForProbing);
-    MapOutputLocation[] locations =
-                        new MapOutputLocation[mapOutputsList.size()];
-    fromEventId += t.length;
-    return mapOutputsList.toArray(locations);
+    return jobClient.locateMapOutputs(reduceTask.getJobId().toString(),
+                                      neededIds,
+                                      reduceTask.getPartition());
   }

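Note on the new polling scheme: queryJobTracker() now asks the JobTracker about at most PROBE_SAMPLE_SIZE randomly chosen map outputs that the reduce still needs, via jobClient.locateMapOutputs(jobId, neededIds, partition); the protocol-side declaration of that method is not part of the hunks above. The snippet below is a minimal, self-contained sketch of just the shuffle-and-truncate sampling step. The class and method names are invented for illustration; only the PROBE_SAMPLE_SIZE value and the sampling pattern are taken from this patch.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Illustration only: mirrors the sampling done at the top of queryJobTracker(),
// outside of Hadoop. Not the real ReduceTaskRunner code.
public class ProbeSampleDemo {
  private static final int PROBE_SAMPLE_SIZE = 50;    // same bound as the patch

  // Pick at most PROBE_SAMPLE_SIZE map ids, chosen at random, to ask about.
  static int[] sampleNeededIds(List<Integer> neededOutputs) {
    int checkSize = Math.min(PROBE_SAMPLE_SIZE, neededOutputs.size());
    Collections.shuffle(neededOutputs);                // randomize which ids get probed
    int[] neededIds = new int[checkSize];
    for (int i = 0; i < checkSize; i++) {
      neededIds[i] = neededOutputs.get(i).intValue();
    }
    return neededIds;
  }

  public static void main(String[] args) {
    List<Integer> needed = new ArrayList<Integer>();
    for (int i = 0; i < 1000; i++) {                   // pretend 1000 map outputs are still missing
      needed.add(Integer.valueOf(i));
    }
    int[] probe = sampleNeededIds(needed);
    System.out.println("probing for " + probe.length + " of "
        + needed.size() + " needed outputs");          // prints: probing for 50 of 1000 needed outputs
  }
}

Bounding each request this way keeps the RPC size independent of the number of maps in the job; as the in-code comment notes, a bit-vector of all needed ids would be an even more compact alternative.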