|
@@ -113,32 +113,32 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
Thread.sleep(TASKTRACKER_EXPIRY_INTERVAL/3);
|
|
|
long now = System.currentTimeMillis();
|
|
|
LOG.debug("Starting launching task sweep");
|
|
|
- synchronized (launchingTasks) {
|
|
|
- Iterator itr = launchingTasks.entrySet().iterator();
|
|
|
- while (itr.hasNext()) {
|
|
|
- Map.Entry pair = (Map.Entry) itr.next();
|
|
|
- String taskId = (String) pair.getKey();
|
|
|
- long age = now - ((Long) pair.getValue()).longValue();
|
|
|
- LOG.info(taskId + " is " + age + " ms debug.");
|
|
|
- if (age > TASKTRACKER_EXPIRY_INTERVAL) {
|
|
|
- LOG.info("Launching task " + taskId + " timed out.");
|
|
|
- TaskInProgress tip = null;
|
|
|
- synchronized (JobTracker.this) {
|
|
|
+ synchronized (JobTracker.this) {
|
|
|
+ synchronized (launchingTasks) {
|
|
|
+ Iterator itr = launchingTasks.entrySet().iterator();
|
|
|
+ while (itr.hasNext()) {
|
|
|
+ Map.Entry pair = (Map.Entry) itr.next();
|
|
|
+ String taskId = (String) pair.getKey();
|
|
|
+ long age = now - ((Long) pair.getValue()).longValue();
|
|
|
+ LOG.info(taskId + " is " + age + " ms debug.");
|
|
|
+ if (age > TASKTRACKER_EXPIRY_INTERVAL) {
|
|
|
+ LOG.info("Launching task " + taskId + " timed out.");
|
|
|
+ TaskInProgress tip = null;
|
|
|
tip = (TaskInProgress) taskidToTIPMap.get(taskId);
|
|
|
+ if (tip != null) {
|
|
|
+ JobInProgress job = tip.getJob();
|
|
|
+ String trackerName = getAssignedTracker(taskId);
|
|
|
+ TaskTrackerStatus trackerStatus =
|
|
|
+ getTaskTracker(trackerName);
|
|
|
+ job.failedTask(tip, taskId, "Error launching task",
|
|
|
+ trackerStatus.getHost(), trackerName);
|
|
|
+ }
|
|
|
+ itr.remove();
|
|
|
+ } else {
|
|
|
+ // the tasks are sorted by start time, so once we find
|
|
|
+ // one that we want to keep, we are done for this cycle.
|
|
|
+ break;
|
|
|
}
|
|
|
- if (tip != null) {
|
|
|
- JobInProgress job = tip.getJob();
|
|
|
- String trackerName = getAssignedTracker(taskId);
|
|
|
- TaskTrackerStatus trackerStatus =
|
|
|
- getTaskTracker(trackerName);
|
|
|
- job.failedTask(tip, taskId, "Error launching task",
|
|
|
- trackerStatus.getHost(), trackerName);
|
|
|
- }
|
|
|
- itr.remove();
|
|
|
- } else {
|
|
|
- // the tasks are sorted by start time, so once we find
|
|
|
- // one that we want to keep, we are done for this cycle.
|
|
|
- break;
|
|
|
}
|
|
|
}
|
|
|
}
|