|
@@ -46,6 +46,12 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
static float PAD_FRACTION;
|
|
|
static final int MIN_CLUSTER_SIZE_FOR_PADDING = 3;
|
|
|
|
|
|
+ /**
|
|
|
+ * The maximum number of 'completed' (successful/failed/killed)
|
|
|
+ * jobs kept in memory per-user.
|
|
|
+ */
|
|
|
+ static final int MAX_COMPLETE_USER_JOBS_IN_MEMORY = 100;
|
|
|
+
|
|
|
/**
|
|
|
* Used for formatting the id numbers
|
|
|
*/
|
|
@@ -215,36 +221,45 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
//
|
|
|
// Loop through all expired items in the queue
|
|
|
//
|
|
|
- synchronized (taskTrackers) {
|
|
|
+ // Need to lock the JobTracker here since we are
|
|
|
+ // manipulating its data-structures via
|
|
|
+ // ExpireTrackers.run -> JobTracker.lostTaskTracker ->
|
|
|
+ // JobInProgress.failedTask -> JobTracker.markCompletedTaskAttempt
|
|
|
+ // Also need to lock JobTracker before locking 'taskTrackers' &
|
|
|
+ // 'trackerExpiryQueue' to prevent deadlock:
|
|
|
+ // @see {@link JobTracker#processHeartbeat(TaskTrackerStatus, boolean)}
|
|
|
+ synchronized (JobTracker.this) {
|
|
|
+ synchronized (taskTrackers) {
|
|
|
synchronized (trackerExpiryQueue) {
|
|
|
- long now = System.currentTimeMillis();
|
|
|
- TaskTrackerStatus leastRecent = null;
|
|
|
- while ((trackerExpiryQueue.size() > 0) &&
|
|
|
- ((leastRecent = (TaskTrackerStatus) trackerExpiryQueue.first()) != null) &&
|
|
|
- (now - leastRecent.getLastSeen() > TASKTRACKER_EXPIRY_INTERVAL)) {
|
|
|
-
|
|
|
- // Remove profile from head of queue
|
|
|
- trackerExpiryQueue.remove(leastRecent);
|
|
|
- String trackerName = leastRecent.getTrackerName();
|
|
|
-
|
|
|
- // Figure out if last-seen time should be updated, or if tracker is dead
|
|
|
- TaskTrackerStatus newProfile = (TaskTrackerStatus) taskTrackers.get(leastRecent.getTrackerName());
|
|
|
- // Items might leave the taskTracker set through other means; the
|
|
|
- // status stored in 'taskTrackers' might be null, which means the
|
|
|
- // tracker has already been destroyed.
|
|
|
- if (newProfile != null) {
|
|
|
- if (now - newProfile.getLastSeen() > TASKTRACKER_EXPIRY_INTERVAL) {
|
|
|
- // Remove completely
|
|
|
- updateTaskTrackerStatus(trackerName, null);
|
|
|
- lostTaskTracker(leastRecent.getTrackerName(),
|
|
|
- leastRecent.getHost());
|
|
|
- } else {
|
|
|
- // Update time by inserting latest profile
|
|
|
- trackerExpiryQueue.add(newProfile);
|
|
|
- }
|
|
|
- }
|
|
|
+ long now = System.currentTimeMillis();
|
|
|
+ TaskTrackerStatus leastRecent = null;
|
|
|
+ while ((trackerExpiryQueue.size() > 0) &&
|
|
|
+ ((leastRecent = (TaskTrackerStatus) trackerExpiryQueue.first()) != null) &&
|
|
|
+ (now - leastRecent.getLastSeen() > TASKTRACKER_EXPIRY_INTERVAL)) {
|
|
|
+
|
|
|
+ // Remove profile from head of queue
|
|
|
+ trackerExpiryQueue.remove(leastRecent);
|
|
|
+ String trackerName = leastRecent.getTrackerName();
|
|
|
+
|
|
|
+ // Figure out if last-seen time should be updated, or if tracker is dead
|
|
|
+ TaskTrackerStatus newProfile = (TaskTrackerStatus) taskTrackers.get(leastRecent.getTrackerName());
|
|
|
+ // Items might leave the taskTracker set through other means; the
|
|
|
+ // status stored in 'taskTrackers' might be null, which means the
|
|
|
+ // tracker has already been destroyed.
|
|
|
+ if (newProfile != null) {
|
|
|
+ if (now - newProfile.getLastSeen() > TASKTRACKER_EXPIRY_INTERVAL) {
|
|
|
+ // Remove completely
|
|
|
+ updateTaskTrackerStatus(trackerName, null);
|
|
|
+ lostTaskTracker(leastRecent.getTrackerName(),
|
|
|
+ leastRecent.getHost());
|
|
|
+ } else {
|
|
|
+ // Update time by inserting latest profile
|
|
|
+ trackerExpiryQueue.add(newProfile);
|
|
|
+ }
|
|
|
}
|
|
|
+ }
|
|
|
}
|
|
|
+ }
|
|
|
}
|
|
|
} catch (Exception t) {
|
|
|
LOG.error("Tracker Expiry Thread got exception: " +
|
|
@@ -289,10 +304,26 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
if (job.getStatus().getRunState() != JobStatus.RUNNING &&
|
|
|
job.getStatus().getRunState() != JobStatus.PREP &&
|
|
|
(job.getFinishTime() + RETIRE_JOB_INTERVAL < System.currentTimeMillis())) {
|
|
|
+ // Ok, this call to removeJobTasks
|
|
|
+ // is dangerous in some very very obscure
|
|
|
+ // cases; e.g. when job completed, exceeded
|
|
|
+ // RETIRE_JOB_INTERVAL time-limit and yet
|
|
|
+ // some task (taskid) wasn't complete!
|
|
|
+ removeJobTasks(job);
|
|
|
+
|
|
|
it.remove();
|
|
|
-
|
|
|
+ synchronized (userToJobsMap) {
|
|
|
+ ArrayList<JobInProgress> userJobs =
|
|
|
+ userToJobsMap.get(job.getProfile().getUser());
|
|
|
+ synchronized (userJobs) {
|
|
|
+ userJobs.remove(job);
|
|
|
+ }
|
|
|
+ }
|
|
|
jobInitQueue.remove(job);
|
|
|
jobsByArrival.remove(job);
|
|
|
+
|
|
|
+ LOG.info("Retired job with id: '" +
|
|
|
+ job.getProfile().getJobId() + "'");
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -418,6 +449,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
TreeMap jobs = new TreeMap();
|
|
|
Vector jobsByArrival = new Vector();
|
|
|
|
|
|
+ // (user -> list of JobInProgress)
|
|
|
+ TreeMap<String, ArrayList<JobInProgress>> userToJobsMap = new TreeMap();
|
|
|
+
|
|
|
// All the known TaskInProgress items, mapped to by taskids (taskid->TIP)
|
|
|
Map<String, TaskInProgress> taskidToTIPMap = new TreeMap();
|
|
|
|
|
@@ -427,8 +461,12 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
// (trackerID->TreeSet of taskids running at that tracker)
|
|
|
TreeMap trackerToTaskMap = new TreeMap();
|
|
|
|
|
|
- // (trackerID --> last sent HeartBeatResponseID)
|
|
|
- Map<String, Short> trackerToHeartbeatResponseIDMap = new TreeMap();
|
|
|
+ // (trackerID -> TreeSet of completed taskids running at that tracker)
|
|
|
+ TreeMap<String, Set<String>> trackerToMarkedTasksMap = new TreeMap();
|
|
|
+
|
|
|
+ // (trackerID --> last sent HeartBeatResponse)
|
|
|
+ Map<String, HeartbeatResponse> trackerToHeartbeatResponseMap =
|
|
|
+ new TreeMap();
|
|
|
|
|
|
//
|
|
|
// Watch and expire TaskTracker objects using these structures.
|
|
@@ -644,18 +682,181 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
// taskid --> TIP
|
|
|
taskidToTIPMap.put(taskid, tip);
|
|
|
}
|
|
|
+
|
|
|
void removeTaskEntry(String taskid) {
|
|
|
// taskid --> tracker
|
|
|
String tracker = (String) taskidToTrackerMap.remove(taskid);
|
|
|
|
|
|
// tracker --> taskid
|
|
|
- TreeSet trackerSet = (TreeSet) trackerToTaskMap.get(tracker);
|
|
|
- if (trackerSet != null) {
|
|
|
- trackerSet.remove(taskid);
|
|
|
+ if (tracker != null) {
|
|
|
+ TreeSet trackerSet = (TreeSet) trackerToTaskMap.get(tracker);
|
|
|
+ if (trackerSet != null) {
|
|
|
+ trackerSet.remove(taskid);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
// taskid --> TIP
|
|
|
taskidToTIPMap.remove(taskid);
|
|
|
+
|
|
|
+ LOG.debug("Removing task '" + taskid + "'");
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Mark a task attempt for removal later.
|
|
|
+ * The taskid is only recorded in {@link #trackerToMarkedTasksMap} under
|
|
|
+ * its tasktracker (the per-tracker set is created on first use); no
|
|
|
+ * entry is removed here. The recorded ids are purged by
|
|
|
+ * {@link #removeMarkedTasks(String)}.
|
|
|
+ * This function assumes that the JobTracker is locked on entry.
|
|
|
+ *
|
|
|
+ * @param taskTracker the tasktracker at which the task attempt was running
|
|
|
+ * @param taskid completed (success/failure/killed) task attempt
|
|
|
+ */
|
|
|
+ void markCompletedTaskAttempt(String taskTracker, String taskid) {
|
|
|
+ // tracker --> set of 'marked' taskids (created lazily)
|
|
|
+ TreeSet taskset = (TreeSet) trackerToMarkedTasksMap.get(taskTracker);
|
|
|
+ if (taskset == null) {
|
|
|
+ taskset = new TreeSet();
|
|
|
+ trackerToMarkedTasksMap.put(taskTracker, taskset);
|
|
|
+ }
|
|
|
+ taskset.add(taskid);
|
|
|
+
|
|
|
+ LOG.debug("Marked '" + taskid + "' from '" + taskTracker + "'");
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Mark all 'non-running' task attempts of the job for pruning.
|
|
|
+ * Walks the task statuses of every map and reduce task of the job and
|
|
|
+ * calls {@link #markCompletedTaskAttempt(String, String)} for each one
|
|
|
+ * whose run state is not RUNNING.
|
|
|
+ * This function assumes that the JobTracker is locked on entry.
|
|
|
+ *
|
|
|
+ * @param job the completed job
|
|
|
+ */
|
|
|
+ void markCompletedJob(JobInProgress job) {
|
|
|
+ for (TaskInProgress tip : job.getMapTasks()) {
|
|
|
+ for (TaskStatus taskStatus : tip.getTaskStatuses()) {
|
|
|
+ if (taskStatus.getRunState() != TaskStatus.State.RUNNING) {
|
|
|
+ markCompletedTaskAttempt(taskStatus.getTaskTracker(),
|
|
|
+ taskStatus.getTaskId());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ for (TaskInProgress tip : job.getReduceTasks()) {
|
|
|
+ for (TaskStatus taskStatus : tip.getTaskStatuses()) {
|
|
|
+ if (taskStatus.getRunState() != TaskStatus.State.RUNNING) {
|
|
|
+ markCompletedTaskAttempt(taskStatus.getTaskTracker(),
|
|
|
+ taskStatus.getTaskId());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Remove all 'marked' tasks running on a given {@link TaskTracker}
|
|
|
+ * from the {@link JobTracker}'s data-structures.
|
|
|
+ * Each marked taskid is passed to {@link #removeTaskEntry(String)} and
|
|
|
+ * the tracker's entry in {@link #trackerToMarkedTasksMap} is dropped
|
|
|
+ * afterwards; nothing happens if no tasks are marked for the tracker.
|
|
|
+ * This function assumes that the JobTracker is locked on entry.
|
|
|
+ *
|
|
|
+ * @param taskTracker tasktracker whose 'non-running' tasks are to be purged
|
|
|
+ */
|
|
|
+ private void removeMarkedTasks(String taskTracker) {
|
|
|
+ // Purge all the 'marked' tasks which were running at taskTracker
|
|
|
+ TreeSet<String> markedTaskSet =
|
|
|
+ (TreeSet<String>) trackerToMarkedTasksMap.get(taskTracker);
|
|
|
+ if (markedTaskSet != null) {
|
|
|
+ for (String taskid : markedTaskSet) {
|
|
|
+ removeTaskEntry(taskid);
|
|
|
+ LOG.info("Removed completed task '" + taskid + "' from '" +
|
|
|
+ taskTracker + "'");
|
|
|
+ }
|
|
|
+ // Clear the tracker's 'marked' set now that it has been purged
|
|
|
+ trackerToMarkedTasksMap.remove(taskTracker);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Call {@link #removeTaskEntry(String)} for each of the
|
|
|
+ * job's tasks.
|
|
|
+ * When the JobTracker is retiring the long-completed
|
|
|
+ * job, either because it has outlived {@link #RETIRE_JOB_INTERVAL}
|
|
|
+ * or the limit of {@link #MAX_COMPLETE_USER_JOBS_IN_MEMORY} jobs
|
|
|
+ * has been reached, we can afford to nuke all its tasks; a little
|
|
|
+ * unsafe, but practically feasible.
|
|
|
+ * Every task status of every map and reduce task is removed,
|
|
|
+ * regardless of its run state.
|
|
|
+ *
|
|
|
+ * @param job the job about to be 'retired'
|
|
|
+ */
|
|
|
+ synchronized private void removeJobTasks(JobInProgress job) {
|
|
|
+ for (TaskInProgress tip : job.getMapTasks()) {
|
|
|
+ for (TaskStatus taskStatus : tip.getTaskStatuses()) {
|
|
|
+ removeTaskEntry(taskStatus.getTaskId());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ for (TaskInProgress tip : job.getReduceTasks()) {
|
|
|
+ for (TaskStatus taskStatus : tip.getTaskStatuses()) {
|
|
|
+ removeTaskEntry(taskStatus.getTaskId());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Safely clean up all data structures at the end of the
|
|
|
+ * job (success/failure/killed).
|
|
|
+ * Here we also ensure that for a given user we maintain
|
|
|
+ * information for only MAX_COMPLETE_USER_JOBS_IN_MEMORY jobs
|
|
|
+ * on the JobTracker.
|
|
|
+ * Purging starts from the head of the user's job list and stops
|
|
|
+ * at the first job that is either the one being finalized or is
|
|
|
+ * not in the SUCCEEDED or FAILED state.
|
|
|
+ * NOTE(review): the user's job list is used without a null check;
|
|
|
+ * this presumably relies on the list being registered when the
|
|
|
+ * job was submitted -- confirm.
|
|
|
+ *
|
|
|
+ * @param job completed job.
|
|
|
+ */
|
|
|
+ synchronized void finalizeJob(JobInProgress job) {
|
|
|
+ // Mark the 'non-running' tasks for pruning
|
|
|
+ markCompletedJob(job);
|
|
|
+
|
|
|
+ // Purge oldest jobs and keep at-most MAX_COMPLETE_USER_JOBS_IN_MEMORY jobs of a given user
|
|
|
+ // in memory; information about the purged jobs is available via
|
|
|
+ // JobHistory.
|
|
|
+ synchronized (jobs) {
|
|
|
+ synchronized (jobsByArrival) {
|
|
|
+ synchronized (jobInitQueue) {
|
|
|
+ String jobUser = job.getProfile().getUser();
|
|
|
+ synchronized (userToJobsMap) {
|
|
|
+ ArrayList<JobInProgress> userJobs =
|
|
|
+ userToJobsMap.get(jobUser);
|
|
|
+ synchronized (userJobs) {
|
|
|
+ while (userJobs.size() >
|
|
|
+ MAX_COMPLETE_USER_JOBS_IN_MEMORY) {
|
|
|
+ JobInProgress rjob = userJobs.get(0);
|
|
|
+
|
|
|
+ // Do not delete 'current'
|
|
|
+ // finished job just yet.
|
|
|
+ if (rjob == job) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ // Clean up all data-structures
|
|
|
+ int rjobRunState =
|
|
|
+ rjob.getStatus().getRunState();
|
|
|
+ if (rjobRunState == JobStatus.SUCCEEDED ||
|
|
|
+ rjobRunState == JobStatus.FAILED) {
|
|
|
+ // Ok, this call to removeJobTasks
|
|
|
+ // is dangerous in some very very obscure
|
|
|
+ // cases; e.g. when rjob completed, hit
|
|
|
+ // MAX_COMPLETE_USER_JOBS_IN_MEMORY job
|
|
|
+ // limit and yet some task (taskid)
|
|
|
+ // wasn't complete!
|
|
|
+ removeJobTasks(rjob);
|
|
|
+
|
|
|
+ userJobs.remove(0);
|
|
|
+ jobs.remove(rjob.getProfile().getJobId());
|
|
|
+ jobInitQueue.remove(rjob);
|
|
|
+ jobsByArrival.remove(rjob);
|
|
|
+
|
|
|
+ LOG.info("Retired job with id: '" +
|
|
|
+ rjob.getProfile().getJobId() + "'");
|
|
|
+ } else {
|
|
|
+ // Do not remove jobs that aren't complete.
|
|
|
+ // Stop here, and let the next pass take
|
|
|
+ // care of purging jobs.
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
///////////////////////////////////////////////////////
|
|
@@ -736,26 +937,46 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,
|
|
|
boolean initialContact, boolean acceptNewTasks, short responseId)
|
|
|
throws IOException {
|
|
|
- LOG.debug("Got heartbeat from: " + status.getTrackerName() +
|
|
|
+ LOG.debug("Got heartbeat from: " + status.getTrackerName() +
|
|
|
" (initialContact: " + initialContact +
|
|
|
" acceptNewTasks: " + acceptNewTasks + ")" +
|
|
|
" with responseId: " + responseId);
|
|
|
|
|
|
// First check if the last heartbeat response got through
|
|
|
String trackerName = status.getTrackerName();
|
|
|
- Short oldResponseId = trackerToHeartbeatResponseIDMap.get(trackerName);
|
|
|
-
|
|
|
- short newResponseId = (short)(responseId + 1);
|
|
|
- if (!initialContact && oldResponseId != null &&
|
|
|
- oldResponseId.shortValue() != responseId) {
|
|
|
- newResponseId = oldResponseId.shortValue();
|
|
|
+ HeartbeatResponse prevHeartbeatResponse =
|
|
|
+ trackerToHeartbeatResponseMap.get(trackerName);
|
|
|
+
|
|
|
+ if (initialContact != true) {
|
|
|
+ // If this isn't the 'initial contact' from the tasktracker,
|
|
|
+ // there is something seriously wrong if the JobTracker has
|
|
|
+ // no record of the 'previous heartbeat'; if so, ask the
|
|
|
+ // tasktracker to re-initialize itself.
|
|
|
+ if (prevHeartbeatResponse == null) {
|
|
|
+ LOG.warn("Serious problem, cannot find record of 'previous' " +
|
|
|
+ "heartbeat for '" + trackerName +
|
|
|
+ "'; reinitializing the tasktracker");
|
|
|
+ return new HeartbeatResponse(responseId,
|
|
|
+ new TaskTrackerAction[] {new ReinitTrackerAction()});
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ // It is completely safe to ignore a 'duplicate' from a tracker
|
|
|
+ // since we are guaranteed that the tracker sends the same
|
|
|
+ // 'heartbeat' when rpcs are lost.
|
|
|
+ // {@see TaskTracker.transmitHeartbeat()}
|
|
|
+ if (prevHeartbeatResponse.getResponseId() != responseId) {
|
|
|
+ LOG.info("Ignoring 'duplicate' heartbeat from '" +
|
|
|
+ trackerName + "'");
|
|
|
+ return prevHeartbeatResponse;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
// Process this heartbeat
|
|
|
- if (!processHeartbeat(status, initialContact,
|
|
|
- (newResponseId != responseId))) {
|
|
|
- if (oldResponseId != null) {
|
|
|
- trackerToHeartbeatResponseIDMap.remove(trackerName);
|
|
|
+ short newResponseId = (short)(responseId + 1);
|
|
|
+ if (!processHeartbeat(status, initialContact)) {
|
|
|
+ if (prevHeartbeatResponse != null) {
|
|
|
+ trackerToHeartbeatResponseMap.remove(trackerName);
|
|
|
}
|
|
|
|
|
|
return new HeartbeatResponse(newResponseId,
|
|
@@ -784,12 +1005,12 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
response.setActions(
|
|
|
actions.toArray(new TaskTrackerAction[actions.size()]));
|
|
|
|
|
|
- // Update the trackerToHeartbeatResponseIDMap
|
|
|
- if (newResponseId != responseId) {
|
|
|
- trackerToHeartbeatResponseIDMap.put(trackerName,
|
|
|
- new Short(newResponseId));
|
|
|
- }
|
|
|
+ // Update the trackerToHeartbeatResponseMap
|
|
|
+ trackerToHeartbeatResponseMap.put(trackerName, response);
|
|
|
|
|
|
+ // Done processing the heartbeat, now remove 'marked' tasks
|
|
|
+ removeMarkedTasks(trackerName);
|
|
|
+
|
|
|
return response;
|
|
|
}
|
|
|
|
|
@@ -824,12 +1045,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
* Process incoming heartbeat messages from the task trackers.
|
|
|
*/
|
|
|
private synchronized boolean processHeartbeat(
|
|
|
- TaskTrackerStatus trackerStatus,
|
|
|
- boolean initialContact, boolean updateStatusTimestamp) {
|
|
|
+ TaskTrackerStatus trackerStatus, boolean initialContact) {
|
|
|
String trackerName = trackerStatus.getTrackerName();
|
|
|
- if (initialContact || updateStatusTimestamp) {
|
|
|
- trackerStatus.setLastSeen(System.currentTimeMillis());
|
|
|
- }
|
|
|
+ trackerStatus.setLastSeen(System.currentTimeMillis());
|
|
|
|
|
|
synchronized (taskTrackers) {
|
|
|
synchronized (trackerExpiryQueue) {
|
|
@@ -857,7 +1075,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
}
|
|
|
|
|
|
updateTaskStatuses(trackerStatus);
|
|
|
- //LOG.info("Got heartbeat from "+trackerName);
|
|
|
+
|
|
|
return true;
|
|
|
}
|
|
|
|
|
@@ -1028,7 +1246,6 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
killList.add(new KillTaskAction(killTaskId));
|
|
|
LOG.debug(taskTracker + " -> KillTaskAction: " + killTaskId);
|
|
|
} else {
|
|
|
- //killTasksList.add(new KillJobAction(taskId));
|
|
|
String killJobId = tip.getJob().getStatus().getJobId();
|
|
|
killJobIds.add(killJobId);
|
|
|
}
|
|
@@ -1051,14 +1268,28 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
* map task outputs.
|
|
|
*/
|
|
|
public synchronized MapOutputLocation[]
|
|
|
- locateMapOutputs(String jobId, int[] mapTasksNeeded, int reduce) {
|
|
|
- ArrayList result = new ArrayList(mapTasksNeeded.length);
|
|
|
+ locateMapOutputs(String jobId, int[] mapTasksNeeded, int reduce)
|
|
|
+ throws IOException {
|
|
|
+ // Check to make sure that the job hasn't 'completed'.
|
|
|
JobInProgress job = getJob(jobId);
|
|
|
+ if (job.status.getRunState() != JobStatus.RUNNING) {
|
|
|
+ return new MapOutputLocation[0];
|
|
|
+ }
|
|
|
+
|
|
|
+ ArrayList result = new ArrayList(mapTasksNeeded.length);
|
|
|
for (int i = 0; i < mapTasksNeeded.length; i++) {
|
|
|
TaskStatus status = job.findFinishedMap(mapTasksNeeded[i]);
|
|
|
if (status != null) {
|
|
|
String trackerId =
|
|
|
(String) taskidToTrackerMap.get(status.getTaskId());
|
|
|
+ // Safety check, if we can't find the taskid in
|
|
|
+ // taskidToTrackerMap and job isn't 'running', then just
|
|
|
+ // return an empty array
|
|
|
+ if (trackerId == null &&
|
|
|
+ job.status.getRunState() != JobStatus.RUNNING) {
|
|
|
+ return new MapOutputLocation[0];
|
|
|
+ }
|
|
|
+
|
|
|
TaskTrackerStatus tracker;
|
|
|
synchronized (taskTrackers) {
|
|
|
tracker = (TaskTrackerStatus) taskTrackers.get(trackerId);
|
|
@@ -1108,10 +1339,22 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
synchronized (jobs) {
|
|
|
synchronized (jobsByArrival) {
|
|
|
synchronized (jobInitQueue) {
|
|
|
- jobs.put(job.getProfile().getJobId(), job);
|
|
|
- jobsByArrival.add(job);
|
|
|
- jobInitQueue.add(job);
|
|
|
- jobInitQueue.notifyAll();
|
|
|
+ synchronized (userToJobsMap) {
|
|
|
+ jobs.put(job.getProfile().getJobId(), job);
|
|
|
+ String jobUser = job.getProfile().getUser();
|
|
|
+ if (!userToJobsMap.containsKey(jobUser)) {
|
|
|
+ userToJobsMap.put(jobUser,
|
|
|
+ new ArrayList<JobInProgress>());
|
|
|
+ }
|
|
|
+ ArrayList<JobInProgress> userJobs =
|
|
|
+ userToJobsMap.get(jobUser);
|
|
|
+ synchronized (userJobs) {
|
|
|
+ userJobs.add(job);
|
|
|
+ }
|
|
|
+ jobsByArrival.add(job);
|
|
|
+ jobInitQueue.add(job);
|
|
|
+ jobInitQueue.notifyAll();
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -1271,8 +1514,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
* jobs that might be affected.
|
|
|
*/
|
|
|
void updateTaskStatuses(TaskTrackerStatus status) {
|
|
|
- for (Iterator it = status.taskReports(); it.hasNext(); ) {
|
|
|
- TaskStatus report = (TaskStatus) it.next();
|
|
|
+ for (TaskStatus report : status.getTaskReports()) {
|
|
|
report.setTaskTracker(status.getTrackerName());
|
|
|
String taskId = report.getTaskId();
|
|
|
TaskInProgress tip = (TaskInProgress) taskidToTIPMap.get(taskId);
|
|
@@ -1310,8 +1552,16 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
TaskStatus.Phase.MAP, hostname, trackerName,
|
|
|
myMetrics);
|
|
|
}
|
|
|
+ } else if (!tip.isMapTask() && tip.isComplete()) {
|
|
|
+ // Completed 'reduce' task, not failed;
|
|
|
+ // only removed from data-structures.
|
|
|
+ markCompletedTaskAttempt(trackerName, taskId);
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ // Purge 'marked' tasks, needs to be done
|
|
|
+ // here to prevent hanging references!
|
|
|
+ removeMarkedTasks(trackerName);
|
|
|
}
|
|
|
}
|
|
|
|