|
@@ -45,6 +45,7 @@ import java.util.Vector;
|
|
import java.util.Map.Entry;
|
|
import java.util.Map.Entry;
|
|
import java.util.concurrent.BlockingQueue;
|
|
import java.util.concurrent.BlockingQueue;
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
|
|
+import java.util.concurrent.atomic.AtomicInteger;
|
|
import java.util.regex.Pattern;
|
|
import java.util.regex.Pattern;
|
|
|
|
|
|
import javax.crypto.SecretKey;
|
|
import javax.crypto.SecretKey;
|
|
@@ -272,9 +273,13 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
static final String TT_OUTOFBAND_HEARBEAT =
|
|
static final String TT_OUTOFBAND_HEARBEAT =
|
|
"mapreduce.tasktracker.outofband.heartbeat";
|
|
"mapreduce.tasktracker.outofband.heartbeat";
|
|
private volatile boolean oobHeartbeatOnTaskCompletion;
|
|
private volatile boolean oobHeartbeatOnTaskCompletion;
|
|
|
|
+ static final String TT_OUTOFBAND_HEARTBEAT_DAMPER =
|
|
|
|
+ "mapreduce.tasktracker.outofband.heartbeat.damper";
|
|
|
|
+ static private final int DEFAULT_OOB_HEARTBEAT_DAMPER = 1000000;
|
|
|
|
+ private volatile int oobHeartbeatDamper;
|
|
|
|
|
|
// Track number of completed tasks to send an out-of-band heartbeat
|
|
// Track number of completed tasks to send an out-of-band heartbeat
|
|
- private IntWritable finishedCount = new IntWritable(0);
|
|
|
|
|
|
+ private AtomicInteger finishedCount = new AtomicInteger(0);
|
|
|
|
|
|
private MapEventsFetcherThread mapEventsFetcher;
|
|
private MapEventsFetcherThread mapEventsFetcher;
|
|
final int workerThreads;
|
|
final int workerThreads;
|
|
@@ -298,6 +303,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
* the minimum interval between jobtracker polls
|
|
* the minimum interval between jobtracker polls
|
|
*/
|
|
*/
|
|
private volatile int heartbeatInterval = HEARTBEAT_INTERVAL_MIN;
|
|
private volatile int heartbeatInterval = HEARTBEAT_INTERVAL_MIN;
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Number of maptask completion events locations to poll for at one time
|
|
* Number of maptask completion events locations to poll for at one time
|
|
*/
|
|
*/
|
|
@@ -711,6 +717,10 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
|
|
|
|
oobHeartbeatOnTaskCompletion =
|
|
oobHeartbeatOnTaskCompletion =
|
|
fConf.getBoolean(TT_OUTOFBAND_HEARBEAT, false);
|
|
fConf.getBoolean(TT_OUTOFBAND_HEARBEAT, false);
|
|
|
|
+
|
|
|
|
+ oobHeartbeatDamper =
|
|
|
|
+ fConf.getInt(TT_OUTOFBAND_HEARTBEAT_DAMPER,
|
|
|
|
+ DEFAULT_OOB_HEARTBEAT_DAMPER);
|
|
}
|
|
}
|
|
|
|
|
|
private void createInstrumentation() {
|
|
private void createInstrumentation() {
|
|
@@ -1356,25 +1366,39 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
return recentMapEvents;
|
|
return recentMapEvents;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private long getHeartbeatInterval(int numFinishedTasks) {
|
|
|
|
+ return (heartbeatInterval / (numFinishedTasks * oobHeartbeatDamper + 1));
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Main service loop. Will stay in this loop forever.
|
|
* Main service loop. Will stay in this loop forever.
|
|
*/
|
|
*/
|
|
State offerService() throws Exception {
|
|
State offerService() throws Exception {
|
|
- long lastHeartbeat = 0;
|
|
|
|
|
|
+ long lastHeartbeat = System.currentTimeMillis();
|
|
|
|
|
|
while (running && !shuttingDown) {
|
|
while (running && !shuttingDown) {
|
|
try {
|
|
try {
|
|
long now = System.currentTimeMillis();
|
|
long now = System.currentTimeMillis();
|
|
-
|
|
|
|
- long waitTime = heartbeatInterval - (now - lastHeartbeat);
|
|
|
|
- if (waitTime > 0) {
|
|
|
|
|
|
+
|
|
|
|
+ // accelerate to account for multiple finished tasks up-front
|
|
|
|
+ long remaining =
|
|
|
|
+ (lastHeartbeat + getHeartbeatInterval(finishedCount.get())) - now;
|
|
|
|
+ while (remaining > 0) {
|
|
// sleeps for the wait time or
|
|
// sleeps for the wait time or
|
|
- // until there are empty slots to schedule tasks
|
|
|
|
|
|
+ // until there are *enough* empty slots to schedule tasks
|
|
synchronized (finishedCount) {
|
|
synchronized (finishedCount) {
|
|
- if (finishedCount.get() == 0) {
|
|
|
|
- finishedCount.wait(waitTime);
|
|
|
|
|
|
+ finishedCount.wait(remaining);
|
|
|
|
+
|
|
|
|
+ // Recompute
|
|
|
|
+ now = System.currentTimeMillis();
|
|
|
|
+ remaining =
|
|
|
|
+ (lastHeartbeat + getHeartbeatInterval(finishedCount.get())) - now;
|
|
|
|
+
|
|
|
|
+ if (remaining <= 0) {
|
|
|
|
+ // Reset count
|
|
|
|
+ finishedCount.set(0);
|
|
|
|
+ break;
|
|
}
|
|
}
|
|
- finishedCount.set(0);
|
|
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -2142,8 +2166,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
private void notifyTTAboutTaskCompletion() {
|
|
private void notifyTTAboutTaskCompletion() {
|
|
if (oobHeartbeatOnTaskCompletion) {
|
|
if (oobHeartbeatOnTaskCompletion) {
|
|
synchronized (finishedCount) {
|
|
synchronized (finishedCount) {
|
|
- int value = finishedCount.get();
|
|
|
|
- finishedCount.set(value+1);
|
|
|
|
|
|
+ finishedCount.incrementAndGet();
|
|
finishedCount.notify();
|
|
finishedCount.notify();
|
|
}
|
|
}
|
|
}
|
|
}
|