|
@@ -46,33 +46,22 @@ import org.apache.hadoop.yarn.service.AbstractService;
|
|
|
public class TaskHeartbeatHandler extends AbstractService {
|
|
|
|
|
|
private static class ReportTime {
|
|
|
- private long lastPing;
|
|
|
private long lastProgress;
|
|
|
|
|
|
public ReportTime(long time) {
|
|
|
setLastProgress(time);
|
|
|
}
|
|
|
|
|
|
- public synchronized void setLastPing(long time) {
|
|
|
- lastPing = time;
|
|
|
- }
|
|
|
-
|
|
|
public synchronized void setLastProgress(long time) {
|
|
|
lastProgress = time;
|
|
|
- lastPing = time;
|
|
|
- }
|
|
|
-
|
|
|
- public synchronized long getLastPing() {
|
|
|
- return lastPing;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
public synchronized long getLastProgress() {
|
|
|
return lastProgress;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
private static final Log LOG = LogFactory.getLog(TaskHeartbeatHandler.class);
|
|
|
- private static final int PING_TIMEOUT = 5 * 60 * 1000;
|
|
|
|
|
|
//thread which runs periodically to see the last time since a heartbeat is
|
|
|
//received from a task.
|
|
@@ -127,14 +116,6 @@ public class TaskHeartbeatHandler extends AbstractService {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public void pinged(TaskAttemptId attemptID) {
|
|
|
- //only put for the registered attempts
|
|
|
- //TODO throw an exception if the task isn't registered.
|
|
|
- ReportTime time = runningAttempts.get(attemptID);
|
|
|
- if(time != null) {
|
|
|
- time.setLastPing(clock.getTime());
|
|
|
- }
|
|
|
- }
|
|
|
|
|
|
public void register(TaskAttemptId attemptID) {
|
|
|
runningAttempts.put(attemptID, new ReportTime(clock.getTime()));
|
|
@@ -159,10 +140,8 @@ public class TaskHeartbeatHandler extends AbstractService {
|
|
|
Map.Entry<TaskAttemptId, ReportTime> entry = iterator.next();
|
|
|
boolean taskTimedOut = (taskTimeOut > 0) &&
|
|
|
(currentTime > (entry.getValue().getLastProgress() + taskTimeOut));
|
|
|
- boolean pingTimedOut =
|
|
|
- (currentTime > (entry.getValue().getLastPing() + PING_TIMEOUT));
|
|
|
-
|
|
|
- if(taskTimedOut || pingTimedOut) {
|
|
|
+
|
|
|
+ if(taskTimedOut) {
|
|
|
// task is lost, remove from the list and raise lost event
|
|
|
iterator.remove();
|
|
|
eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(entry
|