|
@@ -2197,11 +2197,13 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
*/
|
|
|
public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,
|
|
|
boolean restarted,
|
|
|
+ boolean initialContact,
|
|
|
boolean acceptNewTasks,
|
|
|
short responseId)
|
|
|
throws IOException {
|
|
|
LOG.debug("Got heartbeat from: " + status.getTrackerName() +
|
|
|
" (restarted: " + restarted +
|
|
|
+ " initialContact: " + initialContact +
|
|
|
" acceptNewTasks: " + acceptNewTasks + ")" +
|
|
|
" with responseId: " + responseId);
|
|
|
|
|
@@ -2225,7 +2227,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
trackerToHeartbeatResponseMap.get(trackerName);
|
|
|
boolean addRestartInfo = false;
|
|
|
|
|
|
- if (restarted != true) {
|
|
|
+ if (initialContact != true) {
|
|
|
// If this isn't the 'initial contact' from the tasktracker,
|
|
|
// there is something seriously wrong if the JobTracker has
|
|
|
// no record of the 'previous heartbeat'; if so, ask the
|
|
@@ -2263,7 +2265,10 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
// Process this heartbeat
|
|
|
short newResponseId = (short)(responseId + 1);
|
|
|
status.setLastSeen(now);
|
|
|
- if (!processHeartbeat(status, restarted)) {
|
|
|
+ if (!processHeartbeat(status, initialContact)) {
|
|
|
+ if (prevHeartbeatResponse != null) {
|
|
|
+ trackerToHeartbeatResponseMap.remove(trackerName);
|
|
|
+ }
|
|
|
return new HeartbeatResponse(newResponseId,
|
|
|
new TaskTrackerAction[] {new ReinitTrackerAction()});
|
|
|
}
|
|
@@ -2426,14 +2431,14 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
*/
|
|
|
private synchronized boolean processHeartbeat(
|
|
|
TaskTrackerStatus trackerStatus,
|
|
|
- boolean restarted) {
|
|
|
+ boolean initialContact) {
|
|
|
String trackerName = trackerStatus.getTrackerName();
|
|
|
|
|
|
synchronized (taskTrackers) {
|
|
|
synchronized (trackerExpiryQueue) {
|
|
|
boolean seenBefore = updateTaskTrackerStatus(trackerName,
|
|
|
trackerStatus);
|
|
|
- if (restarted) {
|
|
|
+ if (initialContact) {
|
|
|
// If it's first contact, then clear out
|
|
|
// any state hanging around
|
|
|
if (seenBefore) {
|
|
@@ -2443,17 +2448,17 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
// If not first contact, there should be some record of the tracker
|
|
|
if (!seenBefore) {
|
|
|
LOG.warn("Status from unknown Tracker : " + trackerName);
|
|
|
- // This is lost tracker that came back now, if it blacklisted
|
|
|
- // increment the count of blacklisted trackers in the cluster
|
|
|
- if (isBlacklisted(trackerName)) {
|
|
|
- faultyTrackers.numBlacklistedTrackers += 1;
|
|
|
- }
|
|
|
- addNewTracker(trackerStatus);
|
|
|
+ updateTaskTrackerStatus(trackerName, null);
|
|
|
return false;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- if (restarted) {
|
|
|
+ if (initialContact) {
|
|
|
+ // If this is a lost tracker that has come back now, and it is blacklisted,
|
|
|
+ // increment the count of blacklisted trackers in the cluster
|
|
|
+ if (isBlacklisted(trackerName)) {
|
|
|
+ faultyTrackers.numBlacklistedTrackers += 1;
|
|
|
+ }
|
|
|
addNewTracker(trackerStatus);
|
|
|
}
|
|
|
}
|