|
@@ -161,7 +161,8 @@ public class TaskTracker
|
|
|
/**
|
|
|
* the minimum interval between jobtracker polls
|
|
|
*/
|
|
|
- private static final long MIN_POLL_INTERVAL = 5000;
|
|
|
+ private static final int MIN_POLL_INTERVAL = 5000;
|
|
|
+ private volatile int heartbeatInterval = HEARTBEAT_INTERVAL_MIN;
|
|
|
/**
|
|
|
* Number of maptask completion events locations to poll for at one time
|
|
|
*/
|
|
@@ -452,6 +453,9 @@ public class TaskTracker
|
|
|
mapEventsFetcher.start();
|
|
|
}
|
|
|
|
|
|
+ // Object on wait which MapEventsFetcherThread is going to wait.
|
|
|
+ private Object waitingOn = new Object();
|
|
|
+
|
|
|
private class MapEventsFetcherThread extends Thread {
|
|
|
|
|
|
private List <FetchStatus> reducesInShuffle() {
|
|
@@ -507,14 +511,31 @@ public class TaskTracker
|
|
|
// possibly belonging to different jobs
|
|
|
for (FetchStatus f : fList) {
|
|
|
try {
|
|
|
-
|
|
|
f.fetchMapCompletionEvents();
|
|
|
-
|
|
|
- try {
|
|
|
- Thread.sleep(MIN_POLL_INTERVAL);
|
|
|
- } catch (InterruptedException ie) {
|
|
|
- LOG.info("Shutting down: " + getName());
|
|
|
- return;
|
|
|
+ long startWait;
|
|
|
+ long endWait;
|
|
|
+ // polling interval is heartbeat interval
|
|
|
+ int waitTime = heartbeatInterval;
|
|
|
+ // Thread will wait for a minumum of MIN_POLL_INTERVAL,
|
|
|
+ // if it is notified before that, notification will be ignored.
|
|
|
+ int minWait = MIN_POLL_INTERVAL;
|
|
|
+ synchronized (waitingOn) {
|
|
|
+ try {
|
|
|
+ while (true) {
|
|
|
+ startWait = System.currentTimeMillis();
|
|
|
+ waitingOn.wait(waitTime);
|
|
|
+ endWait = System.currentTimeMillis();
|
|
|
+ int diff = (int)(endWait - startWait);
|
|
|
+ if (diff >= minWait) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ minWait = minWait - diff;
|
|
|
+ waitTime = minWait;
|
|
|
+ }
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
+ LOG.info("Shutting down: " + getName());
|
|
|
+ return;
|
|
|
+ }
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
|
LOG.warn(
|
|
@@ -548,12 +569,21 @@ public class TaskTracker
|
|
|
|
|
|
TaskCompletionEvent[] mapEvents =
|
|
|
TaskCompletionEvent.EMPTY_ARRAY;
|
|
|
+ boolean notifyFetcher = false;
|
|
|
synchronized (allMapEvents) {
|
|
|
if (allMapEvents.size() > fromId) {
|
|
|
int actualMax = Math.min(max, (allMapEvents.size() - fromId));
|
|
|
List <TaskCompletionEvent> eventSublist =
|
|
|
allMapEvents.subList(fromId, actualMax + fromId);
|
|
|
mapEvents = eventSublist.toArray(mapEvents);
|
|
|
+ } else {
|
|
|
+ // Notify Fetcher thread.
|
|
|
+ notifyFetcher = true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (notifyFetcher) {
|
|
|
+ synchronized (waitingOn) {
|
|
|
+ waitingOn.notify();
|
|
|
}
|
|
|
}
|
|
|
return mapEvents;
|
|
@@ -824,7 +854,7 @@ public class TaskTracker
|
|
|
try {
|
|
|
long now = System.currentTimeMillis();
|
|
|
|
|
|
- long waitTime = HEARTBEAT_INTERVAL - (now - lastHeartbeat);
|
|
|
+ long waitTime = heartbeatInterval - (now - lastHeartbeat);
|
|
|
if (waitTime > 0) {
|
|
|
// sleeps for the wait time, wakes up if a task is finished.
|
|
|
synchronized(finishedCount) {
|
|
@@ -848,6 +878,8 @@ public class TaskTracker
|
|
|
}
|
|
|
|
|
|
lastHeartbeat = now;
|
|
|
+ // resetting heartbeat interval from the response.
|
|
|
+ heartbeatInterval = heartbeatResponse.getHeartbeatInterval();
|
|
|
justStarted = false;
|
|
|
if (actions != null){
|
|
|
for(TaskTrackerAction action: actions) {
|