|
@@ -293,41 +293,42 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
while (shouldRun) {
|
|
|
try {
|
|
|
Thread.sleep(RETIRE_JOB_CHECK_INTERVAL);
|
|
|
-
|
|
|
- synchronized (jobs) {
|
|
|
- synchronized (jobsByArrival) {
|
|
|
+ List<JobInProgress> retiredJobs = new ArrayList();
|
|
|
+ long retireBefore = System.currentTimeMillis() -
|
|
|
+ RETIRE_JOB_INTERVAL;
|
|
|
+ synchronized (jobsByArrival) {
|
|
|
+ for(JobInProgress job: jobsByArrival) {
|
|
|
+ if (job.getStatus().getRunState() != JobStatus.RUNNING &&
|
|
|
+ job.getStatus().getRunState() != JobStatus.PREP &&
|
|
|
+ (job.getFinishTime() < retireBefore)) {
|
|
|
+ retiredJobs.add(job);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (!retiredJobs.isEmpty()) {
|
|
|
+ synchronized (JobTracker.this) {
|
|
|
+ synchronized (jobs) {
|
|
|
+ synchronized (jobsByArrival) {
|
|
|
synchronized (jobInitQueue) {
|
|
|
- for (Iterator it = jobs.keySet().iterator(); it.hasNext(); ) {
|
|
|
- String jobid = (String) it.next();
|
|
|
- JobInProgress job = (JobInProgress) jobs.get(jobid);
|
|
|
-
|
|
|
- if (job.getStatus().getRunState() != JobStatus.RUNNING &&
|
|
|
- job.getStatus().getRunState() != JobStatus.PREP &&
|
|
|
- (job.getFinishTime() + RETIRE_JOB_INTERVAL < System.currentTimeMillis())) {
|
|
|
- // Ok, this call to removeTaskEntries
|
|
|
- // is dangerous in some very very obscure
|
|
|
- // cases; e.g. when job completed, exceeded
|
|
|
- // RETIRE_JOB_INTERVAL time-limit and yet
|
|
|
- // some task (taskid) wasn't complete!
|
|
|
- removeJobTasks(job);
|
|
|
-
|
|
|
- it.remove();
|
|
|
- synchronized (userToJobsMap) {
|
|
|
- ArrayList<JobInProgress> userJobs =
|
|
|
- userToJobsMap.get(job.getProfile().getUser());
|
|
|
- synchronized (userJobs) {
|
|
|
- userJobs.remove(job);
|
|
|
- }
|
|
|
- }
|
|
|
- jobInitQueue.remove(job);
|
|
|
- jobsByArrival.remove(job);
|
|
|
-
|
|
|
- LOG.info("Retired job with id: '" +
|
|
|
- job.getProfile().getJobId() + "'");
|
|
|
- }
|
|
|
+ for (JobInProgress job: retiredJobs) {
|
|
|
+ removeJobTasks(job);
|
|
|
+ jobs.remove(job.getProfile().getJobId());
|
|
|
+ jobInitQueue.remove(job);
|
|
|
+ jobsByArrival.remove(job);
|
|
|
+ synchronized (userToJobsMap) {
|
|
|
+ ArrayList<JobInProgress> userJobs =
|
|
|
+ userToJobsMap.get(job.getProfile().getUser());
|
|
|
+ synchronized (userJobs) {
|
|
|
+ userJobs.remove(job);
|
|
|
+ }
|
|
|
}
|
|
|
+ LOG.info("Retired job with id: '" +
|
|
|
+ job.getProfile().getJobId() + "'");
|
|
|
+ }
|
|
|
}
|
|
|
+ }
|
|
|
}
|
|
|
+ }
|
|
|
}
|
|
|
} catch (InterruptedException t) {
|
|
|
shouldRun = false;
|
|
@@ -446,8 +447,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
//
|
|
|
|
|
|
// All the known jobs. (jobid->JobInProgress)
|
|
|
- TreeMap jobs = new TreeMap();
|
|
|
- Vector jobsByArrival = new Vector();
|
|
|
+ Map<String, JobInProgress> jobs = new TreeMap();
|
|
|
+ List<JobInProgress> jobsByArrival = new ArrayList();
|
|
|
|
|
|
// (user -> list of JobInProgress)
|
|
|
TreeMap<String, ArrayList<JobInProgress>> userToJobsMap = new TreeMap();
|