|
@@ -652,15 +652,25 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
Map.Entry<String, ArrayList<JobInProgress>> entry =
|
|
Map.Entry<String, ArrayList<JobInProgress>> entry =
|
|
userToJobsMapIt.next();
|
|
userToJobsMapIt.next();
|
|
ArrayList<JobInProgress> userJobs = entry.getValue();
|
|
ArrayList<JobInProgress> userJobs = entry.getValue();
|
|
|
|
+
|
|
|
|
+ // Remove retiredJobs from userToJobsMap to ensure we don't
|
|
|
|
+ // retire them multiple times
|
|
Iterator<JobInProgress> it = userJobs.iterator();
|
|
Iterator<JobInProgress> it = userJobs.iterator();
|
|
- while (it.hasNext() &&
|
|
|
|
- userJobs.size() > MAX_COMPLETE_USER_JOBS_IN_MEMORY) {
|
|
|
|
|
|
+ while (it.hasNext()) {
|
|
JobInProgress jobUser = it.next();
|
|
JobInProgress jobUser = it.next();
|
|
if (retiredJobs.contains(jobUser)) {
|
|
if (retiredJobs.contains(jobUser)) {
|
|
LOG.info("Removing from userToJobsMap: " +
|
|
LOG.info("Removing from userToJobsMap: " +
|
|
jobUser.getJobID());
|
|
jobUser.getJobID());
|
|
it.remove();
|
|
it.remove();
|
|
- } else if (minConditionToRetire(jobUser, now)) {
|
|
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Now, check for #jobs per user
|
|
|
|
+ it = userJobs.iterator();
|
|
|
|
+ while (it.hasNext() &&
|
|
|
|
+ userJobs.size() > MAX_COMPLETE_USER_JOBS_IN_MEMORY) {
|
|
|
|
+ JobInProgress jobUser = it.next();
|
|
|
|
+ if (minConditionToRetire(jobUser, now)) {
|
|
LOG.info("User limit exceeded. Marking job: " +
|
|
LOG.info("User limit exceeded. Marking job: " +
|
|
jobUser.getJobID() + " for retire.");
|
|
jobUser.getJobID() + " for retire.");
|
|
retiredJobs.add(jobUser);
|
|
retiredJobs.add(jobUser);
|