|
@@ -353,15 +353,20 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
jobs.remove(job.getProfile().getJobId());
|
|
|
jobInitQueue.remove(job);
|
|
|
jobsByArrival.remove(job);
|
|
|
+ String jobUser = job.getProfile().getUser();
|
|
|
synchronized (userToJobsMap) {
|
|
|
ArrayList<JobInProgress> userJobs =
|
|
|
- userToJobsMap.get(job.getProfile().getUser());
|
|
|
+ userToJobsMap.get(jobUser);
|
|
|
synchronized (userJobs) {
|
|
|
userJobs.remove(job);
|
|
|
}
|
|
|
+ if (userJobs.isEmpty()) {
|
|
|
+ userToJobsMap.remove(jobUser);
|
|
|
+ }
|
|
|
}
|
|
|
LOG.info("Retired job with id: '" +
|
|
|
- job.getProfile().getJobId() + "'");
|
|
|
+ job.getProfile().getJobId() + "' of user '" +
|
|
|
+ jobUser + "'");
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -895,11 +900,19 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
synchronized (jobs) {
|
|
|
synchronized (jobsByArrival) {
|
|
|
synchronized (jobInitQueue) {
|
|
|
- String jobUser = job.getProfile().getUser();
|
|
|
synchronized (userToJobsMap) {
|
|
|
+ String jobUser = job.getProfile().getUser();
|
|
|
+ if (!userToJobsMap.containsKey(jobUser)) {
|
|
|
+ userToJobsMap.put(jobUser,
|
|
|
+ new ArrayList<JobInProgress>());
|
|
|
+ }
|
|
|
ArrayList<JobInProgress> userJobs =
|
|
|
userToJobsMap.get(jobUser);
|
|
|
synchronized (userJobs) {
|
|
|
+ // Add the currently completed 'job'
|
|
|
+ userJobs.add(job);
|
|
|
+
|
|
|
+ // Check if we need to retire some jobs of this user
|
|
|
while (userJobs.size() >
|
|
|
MAX_COMPLETE_USER_JOBS_IN_MEMORY) {
|
|
|
JobInProgress rjob = userJobs.get(0);
|
|
@@ -929,7 +942,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
jobsByArrival.remove(rjob);
|
|
|
|
|
|
LOG.info("Retired job with id: '" +
|
|
|
- rjob.getProfile().getJobId() + "'");
|
|
|
+ rjob.getProfile().getJobId() + "' of user: '" +
|
|
|
+ jobUser + "'");
|
|
|
} else {
|
|
|
// Do not remove jobs that aren't complete.
|
|
|
// Stop here, and let the next pass take
|
|
@@ -938,6 +952,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+ if (userJobs.isEmpty()) {
|
|
|
+ userToJobsMap.remove(jobUser);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -1424,22 +1441,10 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
synchronized (jobs) {
|
|
|
synchronized (jobsByArrival) {
|
|
|
synchronized (jobInitQueue) {
|
|
|
- synchronized (userToJobsMap) {
|
|
|
- jobs.put(job.getProfile().getJobId(), job);
|
|
|
- String jobUser = job.getProfile().getUser();
|
|
|
- if (!userToJobsMap.containsKey(jobUser)) {
|
|
|
- userToJobsMap.put(jobUser,
|
|
|
- new ArrayList<JobInProgress>());
|
|
|
- }
|
|
|
- ArrayList<JobInProgress> userJobs =
|
|
|
- userToJobsMap.get(jobUser);
|
|
|
- synchronized (userJobs) {
|
|
|
- userJobs.add(job);
|
|
|
- }
|
|
|
- jobsByArrival.add(job);
|
|
|
- jobInitQueue.add(job);
|
|
|
- jobInitQueue.notifyAll();
|
|
|
- }
|
|
|
+ jobs.put(job.getProfile().getJobId(), job);
|
|
|
+ jobsByArrival.add(job);
|
|
|
+ jobInitQueue.add(job);
|
|
|
+ jobInitQueue.notifyAll();
|
|
|
}
|
|
|
}
|
|
|
}
|