|
@@ -824,8 +824,10 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
// Apply the final (job-level) updates
|
|
|
JobStatusChangeEvent event = updateJob(jip, job);
|
|
|
|
|
|
- // Update the job listeners
|
|
|
- updateJobInProgressListeners(event);
|
|
|
+ synchronized (JobTracker.this) {
|
|
|
+ // Update the job listeners
|
|
|
+ updateJobInProgressListeners(event);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -943,10 +945,12 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
// This means that the this is a FAILED events
|
|
|
TaskAttemptID id = TaskAttemptID.forName(cause);
|
|
|
TaskStatus status = tip.getTaskStatus(id);
|
|
|
- // This will add the tip failed event in the new log
|
|
|
- tip.getJob().failedTask(tip, id, status.getDiagnosticInfo(),
|
|
|
- status.getPhase(), status.getRunState(),
|
|
|
- status.getTaskTracker());
|
|
|
+ synchronized (JobTracker.this) {
|
|
|
+ // This will add the tip failed event in the new log
|
|
|
+ tip.getJob().failedTask(tip, id, status.getDiagnosticInfo(),
|
|
|
+ status.getPhase(), status.getRunState(),
|
|
|
+ status.getTaskTracker());
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -996,23 +1000,30 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
0 , 0, 0);
|
|
|
ttStatus.setLastSeen(System.currentTimeMillis());
|
|
|
|
|
|
- // IV. Register a new tracker
|
|
|
- boolean isTrackerRegistered = getTaskTracker(trackerName) != null;
|
|
|
- if (!isTrackerRegistered) {
|
|
|
- markTracker(trackerName); // add the tracker to recovery-manager
|
|
|
- addNewTracker(ttStatus);
|
|
|
- }
|
|
|
-
|
|
|
- // V. Update the tracker status
|
|
|
- // This will update the meta info of the jobtracker and also add the
|
|
|
- // tracker status if missing i.e register it
|
|
|
- updateTaskTrackerStatus(trackerName, ttStatus);
|
|
|
+ synchronized (JobTracker.this) {
|
|
|
+ synchronized (taskTrackers) {
|
|
|
+ synchronized (trackerExpiryQueue) {
|
|
|
+ // IV. Register a new tracker
|
|
|
+ boolean isTrackerRegistered = getTaskTracker(trackerName) != null;
|
|
|
+ if (!isTrackerRegistered) {
|
|
|
+ markTracker(trackerName); // add the tracker to recovery-manager
|
|
|
+ addNewTracker(ttStatus);
|
|
|
+ }
|
|
|
|
|
|
- // VI. Register the attempt
|
|
|
- // a) In the job
|
|
|
- job.addRunningTaskToTIP(tip, attemptId, ttStatus, false);
|
|
|
- // b) In the tip
|
|
|
- tip.updateStatus(taskStatus);
|
|
|
+ // V. Update the tracker status
|
|
|
+ // This will update the meta info of the jobtracker and also add the
|
|
|
+ // tracker status if missing i.e register it
|
|
|
+ updateTaskTrackerStatus(trackerName, ttStatus);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // Register the attempt with job and tip, under JobTracker lock.
|
|
|
+ // Since, as of today they are atomic through heartbeat.
|
|
|
+ // VI. Register the attempt
|
|
|
+ // a) In the job
|
|
|
+ job.addRunningTaskToTIP(tip, attemptId, ttStatus, false);
|
|
|
+ // b) In the tip
|
|
|
+ tip.updateStatus(taskStatus);
|
|
|
+ }
|
|
|
|
|
|
// VII. Make an entry in the launched tasks
|
|
|
expireLaunchingTasks.addNewTask(attemptId);
|
|
@@ -1060,8 +1071,10 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
}
|
|
|
taskStatus.setCounters(counter);
|
|
|
|
|
|
- // II. Replay the status
|
|
|
- job.updateTaskStatus(tip, taskStatus);
|
|
|
+ synchronized (JobTracker.this) {
|
|
|
+ // II. Replay the status
|
|
|
+ job.updateTaskStatus(tip, taskStatus);
|
|
|
+ }
|
|
|
|
|
|
// III. Prevent the task from expiry
|
|
|
expireLaunchingTasks.removeTask(attemptId);
|
|
@@ -1097,8 +1110,10 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
String diagInfo = attempt.get(Keys.ERROR);
|
|
|
taskStatus.setDiagnosticInfo(diagInfo); // diag info
|
|
|
|
|
|
- // II. Update the task status
|
|
|
- job.updateTaskStatus(tip, taskStatus);
|
|
|
+ synchronized (JobTracker.this) {
|
|
|
+ // II. Update the task status
|
|
|
+ job.updateTaskStatus(tip, taskStatus);
|
|
|
+ }
|
|
|
|
|
|
// III. Prevent the task from expiry
|
|
|
expireLaunchingTasks.removeTask(attemptId);
|
|
@@ -1221,22 +1236,24 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
hasRecovered = true;
|
|
|
|
|
|
// III. Finalize the recovery
|
|
|
- // Make sure that the tracker statuses in the expiry-tracker queue
|
|
|
- // are updated
|
|
|
- long now = System.currentTimeMillis();
|
|
|
- int size = trackerExpiryQueue.size();
|
|
|
- for (int i = 0; i < size ; ++i) {
|
|
|
- // Get the first status
|
|
|
- TaskTrackerStatus status = trackerExpiryQueue.first();
|
|
|
+ synchronized (trackerExpiryQueue) {
|
|
|
+ // Make sure that the tracker statuses in the expiry-tracker queue
|
|
|
+ // are updated
|
|
|
+ long now = System.currentTimeMillis();
|
|
|
+ int size = trackerExpiryQueue.size();
|
|
|
+ for (int i = 0; i < size ; ++i) {
|
|
|
+ // Get the first status
|
|
|
+ TaskTrackerStatus status = trackerExpiryQueue.first();
|
|
|
|
|
|
- // Remove it
|
|
|
- trackerExpiryQueue.remove(status);
|
|
|
+ // Remove it
|
|
|
+ trackerExpiryQueue.remove(status);
|
|
|
|
|
|
- // Set the new time
|
|
|
- status.setLastSeen(now);
|
|
|
+ // Set the new time
|
|
|
+ status.setLastSeen(now);
|
|
|
|
|
|
- // Add back to get the sorted list
|
|
|
- trackerExpiryQueue.add(status);
|
|
|
+ // Add back to get the sorted list
|
|
|
+ trackerExpiryQueue.add(status);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
LOG.info("Restoration complete");
|
|
@@ -2181,8 +2198,10 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
/**
|
|
|
* Adds a new node to the jobtracker. It involves adding it to the expiry
|
|
|
* thread and adding it for resolution
|
|
|
+ *
|
|
|
+ * Assuming trackerExpiryQueue is locked on entry
|
|
|
+ *
|
|
|
* @param status Task Tracker's status
|
|
|
- * @param resolveInline Should the resolution happen inline?
|
|
|
*/
|
|
|
private void addNewTracker(TaskTrackerStatus status) {
|
|
|
trackerExpiryQueue.add(status);
|
|
@@ -2266,6 +2285,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
}
|
|
|
|
|
|
// Update the listeners about the job
|
|
|
+ // Assuming JobTracker is locked on entry.
|
|
|
private void updateJobInProgressListeners(JobChangeEvent event) {
|
|
|
for (JobInProgressListener listener : jobInProgressListeners) {
|
|
|
listener.jobUpdated(event);
|