|
@@ -101,6 +101,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
|
|
|
// Remove profile from head of queue
|
|
|
trackerExpiryQueue.remove(leastRecent);
|
|
|
+ String trackerName = leastRecent.getTrackerName();
|
|
|
|
|
|
// Figure out if last-seen time should be updated, or if tracker is dead
|
|
|
TaskTrackerStatus newProfile = (TaskTrackerStatus) taskTrackers.get(leastRecent.getTrackerName());
|
|
@@ -110,12 +111,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
if (newProfile != null) {
|
|
|
if (now - newProfile.getLastSeen() > TASKTRACKER_EXPIRY_INTERVAL) {
|
|
|
// Remove completely
|
|
|
-
|
|
|
- TaskTrackerStatus oldStatus = (TaskTrackerStatus) taskTrackers.remove(leastRecent.getTrackerName());
|
|
|
- if (oldStatus != null) {
|
|
|
- totalMaps -= oldStatus.countMapTasks();
|
|
|
- totalReduces -= oldStatus.countReduceTasks();
|
|
|
- }
|
|
|
+ updateTaskTrackerStatus(trackerName, null);
|
|
|
lostTaskTracker(leastRecent.getTrackerName());
|
|
|
} else {
|
|
|
// Update time by inserting latest profile
|
|
@@ -478,15 +474,41 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
// InterTrackerProtocol
|
|
|
////////////////////////////////////////////////////
|
|
|
public void initialize(String taskTrackerName) {
|
|
|
- if (taskTrackers.get(taskTrackerName) != null) {
|
|
|
- TaskTrackerStatus oldStatus = (TaskTrackerStatus) taskTrackers.remove(taskTrackerName);
|
|
|
- totalMaps -= oldStatus.countMapTasks();
|
|
|
- totalReduces -= oldStatus.countReduceTasks();
|
|
|
-
|
|
|
- lostTaskTracker(taskTrackerName);
|
|
|
+ synchronized (taskTrackers) {
|
|
|
+ boolean seenBefore = updateTaskTrackerStatus(taskTrackerName, null);
|
|
|
+ if (seenBefore) {
|
|
|
+ lostTaskTracker(taskTrackerName);
|
|
|
}
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Update the last recorded status for the given task tracker.
|
|
|
+ * It assumes that the taskTrackers are locked on entry.
|
|
|
+ * @author Owen O'Malley
|
|
|
+ * @param trackerName The name of the tracker
|
|
|
+ * @param status The new status for the task tracker
|
|
|
+ * @return Was an old status found?
|
|
|
+ */
|
|
|
+ private boolean updateTaskTrackerStatus(String trackerName,
|
|
|
+ TaskTrackerStatus status) {
|
|
|
+ TaskTrackerStatus oldStatus =
|
|
|
+ (TaskTrackerStatus) taskTrackers.get(trackerName);
|
|
|
+ if (oldStatus != null) {
|
|
|
+ totalMaps -= oldStatus.countMapTasks();
|
|
|
+ totalReduces -= oldStatus.countReduceTasks();
|
|
|
+ if (status == null) {
|
|
|
+ taskTrackers.remove(trackerName);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (status != null) {
|
|
|
+ totalMaps += status.countMapTasks();
|
|
|
+ totalReduces += status.countReduceTasks();
|
|
|
+ taskTrackers.put(trackerName, status);
|
|
|
+ }
|
|
|
+ return oldStatus != null;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Process incoming heartbeat messages from the task trackers.
|
|
|
*/
|
|
@@ -496,26 +518,20 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
|
|
|
synchronized (taskTrackers) {
|
|
|
synchronized (trackerExpiryQueue) {
|
|
|
+ boolean seenBefore = updateTaskTrackerStatus(trackerName,
|
|
|
+ trackerStatus);
|
|
|
if (initialContact) {
|
|
|
// If it's first contact, then clear out any state hanging around
|
|
|
- if (taskTrackers.get(trackerName) != null) {
|
|
|
- TaskTrackerStatus oldStatus = (TaskTrackerStatus) taskTrackers.remove(trackerName);
|
|
|
- totalMaps -= oldStatus.countMapTasks();
|
|
|
- totalReduces -= oldStatus.countReduceTasks();
|
|
|
+ if (seenBefore) {
|
|
|
lostTaskTracker(trackerName);
|
|
|
}
|
|
|
} else {
|
|
|
// If not first contact, there should be some record of the tracker
|
|
|
- if (taskTrackers.get(trackerName) == null) {
|
|
|
+ if (!seenBefore) {
|
|
|
return InterTrackerProtocol.UNKNOWN_TASKTRACKER;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- // Store latest state. If first contact, then save current
|
|
|
- // state in expiry queue
|
|
|
- totalMaps += trackerStatus.countMapTasks();
|
|
|
- totalReduces += trackerStatus.countReduceTasks();
|
|
|
- taskTrackers.put(trackerName, trackerStatus);
|
|
|
if (initialContact) {
|
|
|
trackerExpiryQueue.add(trackerStatus);
|
|
|
}
|