|
@@ -78,6 +78,11 @@ class ReduceTaskRunner extends TaskRunner {
|
|
|
* A reference to the local file system for writing the map outputs to.
|
|
|
*/
|
|
|
private FileSystem localFileSys;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * The threads for fetching the files.
|
|
|
+ */
|
|
|
+ private MapOutputCopier[] copiers = null;
|
|
|
|
|
|
/**
|
|
|
* the minimum interval between jobtracker polls
|
|
@@ -109,13 +114,76 @@ class ReduceTaskRunner extends TaskRunner {
|
|
|
public String getHost() { return loc.getHost(); }
|
|
|
public MapOutputLocation getLocation() { return loc; }
|
|
|
}
|
|
|
+
|
|
|
+ private static class PingTimer implements MapOutputLocation.Pingable {
|
|
|
+ private long pingTime;
|
|
|
+
|
|
|
+ public synchronized void reset() {
|
|
|
+ pingTime = 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ public synchronized long getLastPing() {
|
|
|
+ return pingTime;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void ping() {
|
|
|
+ synchronized (this) {
|
|
|
+ pingTime = System.currentTimeMillis();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
/** Copies map outputs as they become available */
|
|
|
private class MapOutputCopier extends Thread {
|
|
|
|
|
|
+ private PingTimer pingTimer = new PingTimer();
|
|
|
+ private MapOutputLocation currentLocation = null;
|
|
|
+
|
|
|
public MapOutputCopier() {
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Get the last time that this copier made progress.
|
|
|
+ * @return the System.currentTimeMillis when this copier last made progress
|
|
|
+ */
|
|
|
+ public long getLastProgressTime() {
|
|
|
+ return pingTimer.getLastPing();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Fail the current file that we are fetching
|
|
|
+ * @return were we currently fetching?
|
|
|
+ */
|
|
|
+ public synchronized boolean fail() {
|
|
|
+ if (currentLocation != null) {
|
|
|
+ finish(-1);
|
|
|
+ return true;
|
|
|
+ } else {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get the current map output location.
|
|
|
+ */
|
|
|
+ public synchronized MapOutputLocation getLocation() {
|
|
|
+ return currentLocation;
|
|
|
+ }
|
|
|
+
|
|
|
+ private synchronized void start(MapOutputLocation loc) {
|
|
|
+ currentLocation = loc;
|
|
|
+ }
|
|
|
+
|
|
|
+ private synchronized void finish(long size) {
|
|
|
+ if (currentLocation != null) {
|
|
|
+ synchronized (copyResults) {
|
|
|
+ copyResults.add(new CopyResult(currentLocation, size));
|
|
|
+ copyResults.notify();
|
|
|
+ }
|
|
|
+ currentLocation = null;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/** Loop forever and fetch map outputs as they become available.
|
|
|
* The thread exits when it is interrupted by the {@link ReduceTaskRunner}
|
|
|
*/
|
|
@@ -133,27 +201,28 @@ class ReduceTaskRunner extends TaskRunner {
|
|
|
}
|
|
|
|
|
|
try {
|
|
|
- size = copyOutput(loc);
|
|
|
+ start(loc);
|
|
|
+ pingTimer.ping();
|
|
|
+ size = copyOutput(loc, pingTimer);
|
|
|
+ pingTimer.reset();
|
|
|
} catch (IOException e) {
|
|
|
LOG.warning(reduceTask.getTaskId() + " copy failed: " +
|
|
|
loc.getMapTaskId() + " from " + loc.getHost());
|
|
|
LOG.warning(StringUtils.stringifyException(e));
|
|
|
}
|
|
|
-
|
|
|
- synchronized (copyResults) {
|
|
|
- copyResults.add(new CopyResult(loc, size));
|
|
|
- copyResults.notifyAll();
|
|
|
- }
|
|
|
+ finish(size);
|
|
|
}
|
|
|
} catch (InterruptedException e) { } // ALL DONE!
|
|
|
}
|
|
|
|
|
|
/** Copies a a map output from a remote host, using raw RPC.
|
|
|
- * @param loc the map output location to be copied
|
|
|
+ * @param currentLocation the map output location to be copied
|
|
|
+ * @param pingee a status object to ping as we make progress
|
|
|
* @return the size of the copied file
|
|
|
* @throws IOException if there is an error copying the file
|
|
|
*/
|
|
|
- private long copyOutput(MapOutputLocation loc)
|
|
|
+ private long copyOutput(MapOutputLocation loc,
|
|
|
+ MapOutputLocation.Pingable pingee)
|
|
|
throws IOException {
|
|
|
|
|
|
String reduceId = reduceTask.getTaskId();
|
|
@@ -165,7 +234,7 @@ class ReduceTaskRunner extends TaskRunner {
|
|
|
Path filename = conf.getLocalPath(reduceId + "/map_" +
|
|
|
loc.getMapId() + ".out");
|
|
|
long bytes = loc.getFile(localFileSys, filename,
|
|
|
- reduceTask.getPartition());
|
|
|
+ reduceTask.getPartition(), pingee);
|
|
|
|
|
|
LOG.info(reduceTask.getTaskId() + " done copying " + loc.getMapTaskId() +
|
|
|
" output from " + loc.getHost() + ".");
|
|
@@ -181,6 +250,46 @@ class ReduceTaskRunner extends TaskRunner {
|
|
|
|
|
|
}
|
|
|
|
|
|
+ private class MapCopyLeaseChecker extends Thread {
|
|
|
+ private static final long STALLED_COPY_TIMEOUT = 3 * 60 * 1000;
|
|
|
+ private static final long STALLED_COPY_CHECK = 60 * 1000;
|
|
|
+ private long lastStalledCheck = 0;
|
|
|
+
|
|
|
+ public void run() {
|
|
|
+ try {
|
|
|
+ while (true) {
|
|
|
+ long currentTime = System.currentTimeMillis();
|
|
|
+ if (currentTime - lastStalledCheck > STALLED_COPY_CHECK) {
|
|
|
+ lastStalledCheck = currentTime;
|
|
|
+ synchronized (copiers) {
|
|
|
+ for(int i=0; i < copiers.length; ++i) {
|
|
|
+ if (copiers[i] == null) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ long lastProgress = copiers[i].getLastProgressTime();
|
|
|
+ if (lastProgress != 0 &&
|
|
|
+ currentTime - lastProgress > STALLED_COPY_TIMEOUT) {
|
|
|
+ LOG.warning("Map output copy stalled on " +
|
|
|
+ copiers[i].getLocation());
|
|
|
+ // mark the current file as failed
|
|
|
+ copiers[i].fail();
|
|
|
+ // tell the thread to stop
|
|
|
+ copiers[i].interrupt();
|
|
|
+ // create a replacement thread
|
|
|
+ copiers[i] = new MapOutputCopier();
|
|
|
+ copiers[i].start();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ Thread.sleep(lastStalledCheck + STALLED_COPY_CHECK - currentTime);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (InterruptedException ie) {}
|
|
|
+
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
public ReduceTaskRunner(Task task, TaskTracker tracker,
|
|
|
JobConf conf) throws IOException {
|
|
|
super(task, tracker, conf);
|
|
@@ -218,6 +327,7 @@ class ReduceTaskRunner extends TaskRunner {
|
|
|
DecimalFormat mbpsFormat = new DecimalFormat("0.00");
|
|
|
Random backoff = new Random();
|
|
|
final Progress copyPhase = getTask().getProgress().phase();
|
|
|
+ MapCopyLeaseChecker leaseChecker = null;
|
|
|
|
|
|
for (int i = 0; i < numOutputs; i++) {
|
|
|
neededOutputs.add(new Integer(i));
|
|
@@ -225,13 +335,15 @@ class ReduceTaskRunner extends TaskRunner {
|
|
|
}
|
|
|
|
|
|
InterTrackerProtocol jobClient = getTracker().getJobClient();
|
|
|
- MapOutputCopier[] copiers = new MapOutputCopier[numCopiers];
|
|
|
+ copiers = new MapOutputCopier[numCopiers];
|
|
|
|
|
|
// start all the copying threads
|
|
|
for (int i=0; i < copiers.length; i++) {
|
|
|
copiers[i] = new MapOutputCopier();
|
|
|
copiers[i].start();
|
|
|
}
|
|
|
+ leaseChecker = new MapCopyLeaseChecker();
|
|
|
+ leaseChecker.start();
|
|
|
|
|
|
// start the clock for bandwidth measurement
|
|
|
long startTime = System.currentTimeMillis();
|
|
@@ -314,34 +426,36 @@ class ReduceTaskRunner extends TaskRunner {
|
|
|
while (!killed && numInFlight > 0) {
|
|
|
CopyResult cr = getCopyResult();
|
|
|
|
|
|
- if (cr.getSuccess()) { // a successful copy
|
|
|
- numCopied++;
|
|
|
- bytesTransferred += cr.getSize();
|
|
|
+ if (cr != null) {
|
|
|
+ if (cr.getSuccess()) { // a successful copy
|
|
|
+ numCopied++;
|
|
|
+ bytesTransferred += cr.getSize();
|
|
|
|
|
|
- long secsSinceStart = (System.currentTimeMillis()-startTime)/1000+1;
|
|
|
- float mbs = ((float)bytesTransferred)/(1024*1024);
|
|
|
- float transferRate = mbs/secsSinceStart;
|
|
|
+ long secsSinceStart = (System.currentTimeMillis()-startTime)/1000+1;
|
|
|
+ float mbs = ((float)bytesTransferred)/(1024*1024);
|
|
|
+ float transferRate = mbs/secsSinceStart;
|
|
|
|
|
|
- copyPhase.startNextPhase();
|
|
|
- copyPhase.setStatus("copy (" + numCopied + " of " + numOutputs + " at " +
|
|
|
- mbpsFormat.format(transferRate) + " MB/s)");
|
|
|
- getTask().reportProgress(getTracker());
|
|
|
- }
|
|
|
- else {
|
|
|
- // this copy failed, put it back onto neededOutputs
|
|
|
- neededOutputs.add(new Integer(cr.getMapId()));
|
|
|
+ copyPhase.startNextPhase();
|
|
|
+ copyPhase.setStatus("copy (" + numCopied + " of " + numOutputs +
|
|
|
+ " at " +
|
|
|
+ mbpsFormat.format(transferRate) + " MB/s)");
|
|
|
+ getTask().reportProgress(getTracker());
|
|
|
+ } else {
|
|
|
+ // this copy failed, put it back onto neededOutputs
|
|
|
+ neededOutputs.add(new Integer(cr.getMapId()));
|
|
|
|
|
|
- // wait a random amount of time for next contact
|
|
|
- currentTime = System.currentTimeMillis();
|
|
|
- long nextContact = currentTime + 60 * 1000 +
|
|
|
- backoff.nextInt(maxBackoff*1000);
|
|
|
- penaltyBox.put(cr.getHost(), new Long(nextContact));
|
|
|
- LOG.warning(reduceTask.getTaskId() + " adding host " +
|
|
|
- cr.getHost() + " to penalty box, next contact in " +
|
|
|
- ((nextContact-currentTime)/1000) + " seconds");
|
|
|
+ // wait a random amount of time for next contact
|
|
|
+ currentTime = System.currentTimeMillis();
|
|
|
+ long nextContact = currentTime + 60 * 1000 +
|
|
|
+ backoff.nextInt(maxBackoff*1000);
|
|
|
+ penaltyBox.put(cr.getHost(), new Long(nextContact));
|
|
|
+ LOG.warning(reduceTask.getTaskId() + " adding host " +
|
|
|
+ cr.getHost() + " to penalty box, next contact in " +
|
|
|
+ ((nextContact-currentTime)/1000) + " seconds");
|
|
|
+ }
|
|
|
+ uniqueHosts.remove(cr.getHost());
|
|
|
+ numInFlight--;
|
|
|
}
|
|
|
- uniqueHosts.remove(cr.getHost());
|
|
|
- numInFlight--;
|
|
|
|
|
|
// ensure we have enough to keep us busy
|
|
|
if (numInFlight < lowThreshold && (numOutputs-numCopied) > PROBE_SAMPLE_SIZE) {
|
|
@@ -352,9 +466,13 @@ class ReduceTaskRunner extends TaskRunner {
|
|
|
}
|
|
|
|
|
|
// all done, inform the copiers to exit
|
|
|
- synchronized (scheduledCopies) {
|
|
|
- for (int i=0; i < copiers.length; i++) {
|
|
|
- copiers[i].interrupt();
|
|
|
+ leaseChecker.interrupt();
|
|
|
+ synchronized (copiers) {
|
|
|
+ synchronized (scheduledCopies) {
|
|
|
+ for (int i=0; i < copiers.length; i++) {
|
|
|
+ copiers[i].interrupt();
|
|
|
+ copiers[i] = null;
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -363,17 +481,18 @@ class ReduceTaskRunner extends TaskRunner {
|
|
|
|
|
|
|
|
|
private CopyResult getCopyResult() {
|
|
|
- CopyResult cr = null;
|
|
|
-
|
|
|
- synchronized (copyResults) {
|
|
|
- while (copyResults.isEmpty()) {
|
|
|
+ synchronized (copyResults) {
|
|
|
+ while (!killed && copyResults.isEmpty()) {
|
|
|
try {
|
|
|
copyResults.wait();
|
|
|
} catch (InterruptedException e) { }
|
|
|
}
|
|
|
- cr = (CopyResult)copyResults.remove(0);
|
|
|
+ if (copyResults.isEmpty()) {
|
|
|
+ return null;
|
|
|
+ } else {
|
|
|
+ return (CopyResult) copyResults.remove(0);
|
|
|
+ }
|
|
|
}
|
|
|
- return cr;
|
|
|
}
|
|
|
|
|
|
/** Queries the job tracker for a set of outputs ready to be copied
|
|
@@ -421,4 +540,15 @@ class ReduceTaskRunner extends TaskRunner {
|
|
|
this.mapOutputFile.removeAll(getTask().getTaskId());
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Kill the child process, but also kick getCopyResult so that it checks
|
|
|
+ * the kill flag.
|
|
|
+ */
|
|
|
+ public void kill() {
|
|
|
+ synchronized (copyResults) {
|
|
|
+ super.kill();
|
|
|
+ copyResults.notify();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
}
|