|
@@ -219,20 +219,6 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
// tracker has already been destroyed.
|
|
|
if (newProfile != null) {
|
|
|
if (now - newProfile.getLastSeen() > TASKTRACKER_EXPIRY_INTERVAL) {
|
|
|
- // But save the state so that if at a later
|
|
|
- // point of time, we happen to hear from the
|
|
|
- // same TaskTracker, we can reinstate
|
|
|
- // the state
|
|
|
- ExpiredTaskTrackerState
|
|
|
- expTaskTrackerState =
|
|
|
- new ExpiredTaskTrackerState(
|
|
|
- leastRecent.getTrackerName());
|
|
|
- if (LOG.isDebugEnabled())
|
|
|
- LOG.debug("Saving state of TaskTracker " +
|
|
|
- leastRecent.getTrackerName());
|
|
|
- expiredTaskTrackerStates.put(
|
|
|
- leastRecent.getTrackerName(),
|
|
|
- expTaskTrackerState);
|
|
|
// Remove completely
|
|
|
updateTaskTrackerStatus(trackerName, null);
|
|
|
lostTaskTracker(leastRecent.getTrackerName(),
|
|
@@ -361,11 +347,6 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
Metrics.report(metricsRecord, "maps-completed",
|
|
|
++numMapTasksCompleted);
|
|
|
}
|
|
|
-
|
|
|
- synchronized void failedMap() {
|
|
|
- Metrics.report(metricsRecord, "maps-completed",
|
|
|
- --numMapTasksCompleted);
|
|
|
- }
|
|
|
|
|
|
synchronized void launchReduce() {
|
|
|
Metrics.report(metricsRecord, "reduces-launched",
|
|
@@ -376,11 +357,6 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
Metrics.report(metricsRecord, "reduces-completed",
|
|
|
++numReduceTasksCompleted);
|
|
|
}
|
|
|
-
|
|
|
- synchronized void failedReduce() {
|
|
|
- Metrics.report(metricsRecord, "reduces-completed",
|
|
|
- --numReduceTasksCompleted);
|
|
|
- }
|
|
|
|
|
|
synchronized void submitJob() {
|
|
|
Metrics.report(metricsRecord, "jobs-submitted",
|
|
@@ -451,7 +427,6 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
Thread initJobsThread = null;
|
|
|
ExpireLaunchingTasks expireLaunchingTasks = new ExpireLaunchingTasks();
|
|
|
Thread expireLaunchingTaskThread = new Thread(expireLaunchingTasks);
|
|
|
- private TreeMap expiredTaskTrackerStates = new TreeMap();
|
|
|
|
|
|
/**
|
|
|
* It might seem like a bug to maintain a TreeSet of status objects,
|
|
@@ -624,36 +599,6 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
LOG.info("stopped all jobtracker services");
|
|
|
return;
|
|
|
}
|
|
|
-
|
|
|
- boolean reinstateStateOfTaskTracker(String trackerName) {
|
|
|
- if (LOG.isDebugEnabled())
|
|
|
- LOG.debug("Going to reinstate state of tasktracker " + trackerName);
|
|
|
- ExpiredTaskTrackerState e = (ExpiredTaskTrackerState)
|
|
|
- expiredTaskTrackerStates.get(trackerName);
|
|
|
- if (e == null) return false;
|
|
|
- Set taskset = e.getTaskSet();
|
|
|
- if (taskset == null) return true;
|
|
|
- for (Iterator it = taskset.iterator(); it.hasNext(); ) {
|
|
|
- String taskId = (String) it.next();
|
|
|
- TaskInProgress tip = e.getTIP(taskId);
|
|
|
- if (LOG.isDebugEnabled())
|
|
|
- LOG.debug("Going to recreate task entry for task " + taskId);
|
|
|
- //check whether the job is still running
|
|
|
- if (tip != null &&
|
|
|
- tip.getJob().getStatus().getRunState() == JobStatus.RUNNING)
|
|
|
- createTaskEntry(taskId, trackerName, tip);
|
|
|
- }
|
|
|
- ArrayList completedTasks = e.getCompletedTasks();
|
|
|
- for (int i = 0; i < completedTasks.size(); i++) {
|
|
|
- TaskStatus ts = (TaskStatus)completedTasks.get(i);
|
|
|
- TaskInProgress tip = (TaskInProgress)taskidToTIPMap.get(ts.getTaskId());
|
|
|
- if (tip == null) continue;
|
|
|
- JobInProgress j = tip.getJob();
|
|
|
- if (j != null && j.getStatus().getRunState() == JobStatus.RUNNING)
|
|
|
- j.updateTaskStatus(tip, ts, myMetrics);
|
|
|
- }
|
|
|
- return true;
|
|
|
- }
|
|
|
|
|
|
///////////////////////////////////////////////////////
|
|
|
// Maintain lookup tables; called by JobInProgress
|
|
@@ -803,11 +748,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
} else {
|
|
|
// If not first contact, there should be some record of the tracker
|
|
|
if (!seenBefore) {
|
|
|
- if (!reinstateStateOfTaskTracker(trackerName))
|
|
|
- return InterTrackerProtocol.UNKNOWN_TASKTRACKER;
|
|
|
- else
|
|
|
- trackerExpiryQueue.add(trackerStatus);
|
|
|
-
|
|
|
+ return InterTrackerProtocol.UNKNOWN_TASKTRACKER;
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -1256,67 +1197,4 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
Configuration conf=new Configuration();
|
|
|
startTracker(conf);
|
|
|
}
|
|
|
-
|
|
|
- private class ExpiredTaskTrackerState {
|
|
|
- //Map from taskId (assigned to a given tasktracker) to the taskId's TIP
|
|
|
- private TreeMap trackerTaskIdToTIPMap = new TreeMap();
|
|
|
- //completedTasks is an array list that contains the list of tasks that a
|
|
|
- //tasktracker successfully completed
|
|
|
- ArrayList completedTasks = new ArrayList();
|
|
|
-
|
|
|
- public ExpiredTaskTrackerState(String trackerId) {
|
|
|
- trackerTaskIdToTIPMap.clear();
|
|
|
- completedTasks.clear();
|
|
|
- TreeSet tasks = (TreeSet) trackerToTaskMap.get(trackerId);
|
|
|
- if (tasks == null) {
|
|
|
- if (LOG.isDebugEnabled())
|
|
|
- LOG.debug("This tasktracker has no tasks");
|
|
|
- return;
|
|
|
- }
|
|
|
- if (LOG.isDebugEnabled())
|
|
|
- LOG.debug("Task IDs that this tasktracker has: ");
|
|
|
- //We save the status of completed tasks only since TaskTrackers don't
|
|
|
- //send updates about completed tasks. We don't need to save the status
|
|
|
- //of other tasks since the TaskTracker will send the update along
|
|
|
- //with the heartbeat (whenever that happens).
|
|
|
- //Saving the status of completed tasks is required since the JobTracker
|
|
|
- //will mark all tasks that belonged to a given TaskTracker as failed
|
|
|
- //if that TaskTracker is lost. Now, if that same TaskTracker reports
|
|
|
- //in later on, we can simply re-mark the completed tasks (TIPs really)
|
|
|
- //it reported earlier about as "completed" and avoid unnecessary
|
|
|
- //re-run of those tasks.
|
|
|
- for (Iterator it = tasks.iterator(); it.hasNext(); ) {
|
|
|
- String taskId = (String) it.next();
|
|
|
- if (LOG.isDebugEnabled())
|
|
|
- LOG.debug(taskId);
|
|
|
- TaskInProgress tip = (TaskInProgress) taskidToTIPMap.get(taskId);
|
|
|
- if (tip !=null &&
|
|
|
- tip.getJob().getStatus().getRunState() == JobStatus.RUNNING)
|
|
|
- trackerTaskIdToTIPMap.put(taskId, tip);
|
|
|
- else continue;
|
|
|
- TaskStatus ts = tip.getTaskStatus(taskId);
|
|
|
- //ts could be null for a recently assigned task, in the case where,
|
|
|
- //the tasktracker hasn't yet reported status about that task
|
|
|
- if (ts == null) continue;
|
|
|
- if (tip.isComplete()) {
|
|
|
- TaskStatus saveTS = null;
|
|
|
- try {
|
|
|
- saveTS = (TaskStatus)ts.clone();
|
|
|
- } catch (Exception e) {
|
|
|
- LOG.fatal("Could not save TaskTracker state",e);
|
|
|
- }
|
|
|
- completedTasks.add(saveTS);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- public Set getTaskSet() {
|
|
|
- return trackerTaskIdToTIPMap.keySet();
|
|
|
- }
|
|
|
- public TaskInProgress getTIP(String taskId) {
|
|
|
- return (TaskInProgress)trackerTaskIdToTIPMap.get(taskId);
|
|
|
- }
|
|
|
- public ArrayList getCompletedTasks() {
|
|
|
- return completedTasks;
|
|
|
- }
|
|
|
- }
|
|
|
}
|