|
@@ -144,6 +144,12 @@ class ReduceTaskRunner extends TaskRunner implements MRConstants {
|
|
|
*/
|
|
|
private Map<Integer, MapOutputLocation> retryFetches = new HashMap();
|
|
|
|
|
|
+ /**
|
|
|
+ * A synchronized, sorted set of the IDs of the map outputs that are still needed
|
|
|
+ */
|
|
|
+ private Set <Integer> neededOutputs =
|
|
|
+ Collections.synchronizedSet(new TreeSet<Integer>());
|
|
|
+
|
|
|
/** Represents the result of an attempt to copy a map output */
|
|
|
private class CopyResult {
|
|
|
|
|
@@ -152,7 +158,10 @@ class ReduceTaskRunner extends TaskRunner implements MRConstants {
|
|
|
|
|
|
// the size of the file copied, -1 if the transfer failed
|
|
|
private final long size;
|
|
|
-
|
|
|
+
|
|
|
+ // sentinel size value signifying that a copy result is obsolete
|
|
|
+ private static final int OBSOLETE = -2;
|
|
|
+
|
|
|
CopyResult(MapOutputLocation loc, long size) {
|
|
|
this.loc = loc;
|
|
|
this.size = size;
|
|
@@ -160,6 +169,9 @@ class ReduceTaskRunner extends TaskRunner implements MRConstants {
|
|
|
|
|
|
public int getMapId() { return loc.getMapId(); }
|
|
|
public boolean getSuccess() { return size >= 0; }
|
|
|
+ public boolean isObsolete() {
|
|
|
+ return size == OBSOLETE;
|
|
|
+ }
|
|
|
public long getSize() { return size; }
|
|
|
public String getHost() { return loc.getHost(); }
|
|
|
public MapOutputLocation getLocation() { return loc; }
|
|
@@ -284,7 +296,9 @@ class ReduceTaskRunner extends TaskRunner implements MRConstants {
|
|
|
*/
|
|
|
private long copyOutput(MapOutputLocation loc
|
|
|
) throws IOException, InterruptedException {
|
|
|
-
|
|
|
+ if (!neededOutputs.contains(loc.getMapId())) {
|
|
|
+ return CopyResult.OBSOLETE;
|
|
|
+ }
|
|
|
String reduceId = reduceTask.getTaskId();
|
|
|
LOG.info(reduceId + " Copying " + loc.getMapTaskId() +
|
|
|
" output from " + loc.getHost() + ".");
|
|
@@ -297,16 +311,28 @@ class ReduceTaskRunner extends TaskRunner implements MRConstants {
|
|
|
tmpFilename = loc.getFile(inMemFileSys, localFileSys, shuffleMetrics,
|
|
|
tmpFilename, reduceTask.getPartition(),
|
|
|
STALLED_COPY_TIMEOUT);
|
|
|
+ if (!neededOutputs.contains(loc.getMapId())) {
|
|
|
+ if (tmpFilename != null) {
|
|
|
+ FileSystem fs = tmpFilename.getFileSystem(conf);
|
|
|
+ fs.delete(tmpFilename);
|
|
|
+ }
|
|
|
+ return CopyResult.OBSOLETE;
|
|
|
+ }
|
|
|
if (tmpFilename == null)
|
|
|
throw new IOException("File " + finalFilename + "-" + id +
|
|
|
" not created");
|
|
|
long bytes = -1;
|
|
|
// lock the ReduceTaskRunner while we do the rename
|
|
|
synchronized (ReduceTaskRunner.this) {
|
|
|
- // if we can't rename the file, something is broken (and IOException
|
|
|
- // will be thrown). This file could have been created in the inmemory
|
|
|
+ // This file could have been created in the inmemory
|
|
|
// fs or the localfs. So need to get the filesystem owning the path.
|
|
|
FileSystem fs = tmpFilename.getFileSystem(conf);
|
|
|
+ if (!neededOutputs.contains(loc.getMapId())) {
|
|
|
+ fs.delete(tmpFilename);
|
|
|
+ return CopyResult.OBSOLETE;
|
|
|
+ }
|
|
|
+ // if we can't rename the file, something is broken (and IOException
|
|
|
+ // will be thrown).
|
|
|
if (!fs.rename(tmpFilename, finalFilename)) {
|
|
|
fs.delete(tmpFilename);
|
|
|
throw new IOException("failure to rename map output " + tmpFilename);
|
|
@@ -332,6 +358,7 @@ class ReduceTaskRunner extends TaskRunner implements MRConstants {
|
|
|
mergeInProgress = true;
|
|
|
m.start();
|
|
|
}
|
|
|
+ neededOutputs.remove(loc.getMapId());
|
|
|
}
|
|
|
return bytes;
|
|
|
}
|
|
@@ -424,7 +451,6 @@ class ReduceTaskRunner extends TaskRunner implements MRConstants {
|
|
|
this.mapOutputFile.removeAll(reduceTask.getTaskId());
|
|
|
|
|
|
final int numOutputs = reduceTask.getNumMaps();
|
|
|
- List neededOutputs = new ArrayList(numOutputs);
|
|
|
Map<Integer, MapOutputLocation> knownOutputs =
|
|
|
new HashMap<Integer, MapOutputLocation>();
|
|
|
int numInFlight = 0, numCopied = 0;
|
|
@@ -484,23 +510,12 @@ class ReduceTaskRunner extends TaskRunner implements MRConstants {
|
|
|
List <MapOutputLocation> locs = queryJobTracker(fromEventId,
|
|
|
jobClient);
|
|
|
|
|
|
- // remove discovered outputs from needed list
|
|
|
- // and put them on the known list
|
|
|
- int gotLocs = (locs == null ? 0 : locs.size());
|
|
|
+ // put the discovered outputs on the known list
|
|
|
for (int i=0; i < locs.size(); i++) {
|
|
|
- // check whether we actually need an output. It could happen
|
|
|
- // that a map task that successfully ran earlier got lost, but
|
|
|
- // if we already have copied the output of that unfortunate task
|
|
|
- // we need not copy it again from the new TT (we will ignore
|
|
|
- // the event for the new rescheduled execution)
|
|
|
- if(neededOutputs.remove(new Integer(locs.get(i).getMapId()))) {
|
|
|
- knownOutputs.put(new Integer(locs.get(i).getMapId()), locs.get(i));
|
|
|
- }
|
|
|
- else gotLocs--; //we don't need this output
|
|
|
-
|
|
|
+ knownOutputs.put(new Integer(locs.get(i).getMapId()), locs.get(i));
|
|
|
}
|
|
|
LOG.info(reduceTask.getTaskId() +
|
|
|
- " Got " + gotLocs +
|
|
|
+ " Got " + locs.size() +
|
|
|
" new map outputs from jobtracker and " + retryFetches.size() +
|
|
|
" map outputs from previous failures");
|
|
|
// clear the "failed" fetches hashmap
|
|
@@ -575,9 +590,13 @@ class ReduceTaskRunner extends TaskRunner implements MRConstants {
|
|
|
copyPhase.setStatus("copy (" + numCopied + " of " + numOutputs +
|
|
|
" at " +
|
|
|
mbpsFormat.format(transferRate) + " MB/s)");
|
|
|
+ } else if (cr.isObsolete()) {
|
|
|
+ //ignore
|
|
|
+ LOG.info(reduceTask.getTaskId() +
|
|
|
+ " Ignoring obsolete copy result for Map Task: " +
|
|
|
+ cr.getLocation().getMapTaskId() + " from host: " +
|
|
|
+ cr.getHost());
|
|
|
} else {
|
|
|
- // this copy failed, put it back onto neededOutputs
|
|
|
- neededOutputs.add(new Integer(cr.getMapId()));
|
|
|
retryFetches.put(new Integer(cr.getMapId()), cr.getLocation());
|
|
|
|
|
|
// wait a random amount of time for next contact
|
|
@@ -600,7 +619,6 @@ class ReduceTaskRunner extends TaskRunner implements MRConstants {
|
|
|
if (cr.getHost().equals(loc.getHost())) {
|
|
|
retryFetches.put(new Integer(loc.getMapId()), loc);
|
|
|
locIt.remove();
|
|
|
- neededOutputs.add(new Integer(loc.getMapId()));
|
|
|
}
|
|
|
}
|
|
|
}
|