|
@@ -20,13 +20,13 @@ package org.apache.hadoop.mapred;
|
|
|
import java.util.Collection;
|
|
|
import java.util.Comparator;
|
|
|
import java.util.HashMap;
|
|
|
-import java.util.Iterator;
|
|
|
import java.util.LinkedList;
|
|
|
import java.util.Map;
|
|
|
-import java.util.TreeSet;
|
|
|
+import java.util.TreeMap;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
+import org.apache.hadoop.mapred.JobQueueJobInProgressListener.JobSchedulingInfo;
|
|
|
import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
|
|
|
|
|
|
/**
|
|
@@ -45,35 +45,17 @@ class JobQueuesManager extends JobInProgressListener {
|
|
|
* is ahead in the queue, so insertion should be at the tail.
|
|
|
*/
|
|
|
|
|
|
- // comparator for jobs in queues that support priorities
|
|
|
- private static final Comparator<JobInProgress> PRIORITY_JOB_COMPARATOR
|
|
|
- = new Comparator<JobInProgress>() {
|
|
|
- public int compare(JobInProgress o1, JobInProgress o2) {
|
|
|
- // Look at priority.
|
|
|
- int res = o1.getPriority().compareTo(o2.getPriority());
|
|
|
- if (res == 0) {
|
|
|
- // the job that started earlier wins
|
|
|
- if (o1.getStartTime() < o2.getStartTime()) {
|
|
|
- res = -1;
|
|
|
- } else {
|
|
|
- res = (o1.getStartTime() == o2.getStartTime() ? 0 : 1);
|
|
|
- }
|
|
|
- }
|
|
|
- if (res == 0) {
|
|
|
- res = o1.getJobID().compareTo(o2.getJobID());
|
|
|
- }
|
|
|
- return res;
|
|
|
- }
|
|
|
- };
|
|
|
// comparator for jobs in queues that don't support priorities
|
|
|
- private static final Comparator<JobInProgress> STARTTIME_JOB_COMPARATOR
|
|
|
- = new Comparator<JobInProgress>() {
|
|
|
- public int compare(JobInProgress o1, JobInProgress o2) {
|
|
|
+ private static final Comparator<JobSchedulingInfo> STARTTIME_JOB_COMPARATOR
|
|
|
+ = new Comparator<JobSchedulingInfo>() {
|
|
|
+ public int compare(JobSchedulingInfo o1, JobSchedulingInfo o2) {
|
|
|
// the job that started earlier wins
|
|
|
if (o1.getStartTime() < o2.getStartTime()) {
|
|
|
return -1;
|
|
|
} else {
|
|
|
- return (o1.getStartTime() == o2.getStartTime() ? 0 : 1);
|
|
|
+ return (o1.getStartTime() == o2.getStartTime()
|
|
|
+ ? o1.getJobID().compareTo(o2.getJobID())
|
|
|
+ : 1);
|
|
|
}
|
|
|
}
|
|
|
};
|
|
@@ -83,40 +65,27 @@ class JobQueuesManager extends JobInProgressListener {
|
|
|
|
|
|
// whether the queue supports priorities
|
|
|
boolean supportsPriorities;
|
|
|
- // maintain separate collections of running & waiting jobs. This we do
|
|
|
+ // maintain separate structures for running & waiting jobs. This we do
|
|
|
// mainly because when a new job is added, it cannot superceede a running
|
|
|
// job, even though the latter may be a lower priority. If this is ever
|
|
|
// changed, we may get by with one collection.
|
|
|
- Collection<JobInProgress> waitingJobs;
|
|
|
+ Map<JobSchedulingInfo, JobInProgress> waitingJobs;
|
|
|
Collection<JobInProgress> runningJobs;
|
|
|
|
|
|
QueueInfo(boolean prio) {
|
|
|
this.supportsPriorities = prio;
|
|
|
if (supportsPriorities) {
|
|
|
- this.waitingJobs = new TreeSet<JobInProgress>(PRIORITY_JOB_COMPARATOR);
|
|
|
+ // use the default priority-aware comparator
|
|
|
+ this.waitingJobs =
|
|
|
+ new TreeMap<JobSchedulingInfo, JobInProgress>(
|
|
|
+ JobQueueJobInProgressListener.FIFO_JOB_QUEUE_COMPARATOR);
|
|
|
}
|
|
|
else {
|
|
|
- this.waitingJobs = new TreeSet<JobInProgress>(STARTTIME_JOB_COMPARATOR);
|
|
|
+ this.waitingJobs =
|
|
|
+ new TreeMap<JobSchedulingInfo, JobInProgress>(STARTTIME_JOB_COMPARATOR);
|
|
|
}
|
|
|
this.runningJobs = new LinkedList<JobInProgress>();
|
|
|
}
|
|
|
-
|
|
|
- /**
|
|
|
- * we need to delete an object from our TreeSet based on referential
|
|
|
- * equality, rather than value equality that the TreeSet uses.
|
|
|
- * Another way to do this is to extend the TreeSet and override remove().
|
|
|
- */
|
|
|
- static private boolean removeOb(Collection<JobInProgress> c, Object o) {
|
|
|
- Iterator<JobInProgress> i = c.iterator();
|
|
|
- while (i.hasNext()) {
|
|
|
- if (i.next() == o) {
|
|
|
- i.remove();
|
|
|
- return true;
|
|
|
- }
|
|
|
- }
|
|
|
- return false;
|
|
|
- }
|
|
|
-
|
|
|
}
|
|
|
|
|
|
// we maintain a hashmap of queue-names to queue info
|
|
@@ -150,7 +119,7 @@ class JobQueuesManager extends JobInProgressListener {
|
|
|
* Returns the queue of waiting jobs associated with the name
|
|
|
*/
|
|
|
public Collection<JobInProgress> getWaitingJobQueue(String queueName) {
|
|
|
- return jobQueues.get(queueName).waitingJobs;
|
|
|
+ return jobQueues.get(queueName).waitingJobs.values();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -167,19 +136,18 @@ class JobQueuesManager extends JobInProgressListener {
|
|
|
}
|
|
|
// add job to waiting queue. It will end up in the right place,
|
|
|
// based on priority.
|
|
|
- // We use our own version of removing objects based on referential
|
|
|
- // equality, since the 'job' object has already been changed.
|
|
|
- qi.waitingJobs.add(job);
|
|
|
+ qi.waitingJobs.put(new JobSchedulingInfo(job), job);
|
|
|
// let scheduler know.
|
|
|
scheduler.jobAdded(job);
|
|
|
}
|
|
|
|
|
|
- private void jobCompleted(JobInProgress job, QueueInfo qi) {
|
|
|
+ private void jobCompleted(JobInProgress job, JobSchedulingInfo oldInfo,
|
|
|
+ QueueInfo qi) {
|
|
|
LOG.info("Job " + job.getJobID().toString() + " submitted to queue "
|
|
|
+ job.getProfile().getQueueName() + " has completed");
|
|
|
// job could be in running or waiting queue
|
|
|
if (!qi.runningJobs.remove(job)) {
|
|
|
- QueueInfo.removeOb(qi.waitingJobs, job);
|
|
|
+ qi.waitingJobs.remove(oldInfo);
|
|
|
}
|
|
|
// let scheduler know
|
|
|
scheduler.jobCompleted(job);
|
|
@@ -191,23 +159,24 @@ class JobQueuesManager extends JobInProgressListener {
|
|
|
|
|
|
// This is used to reposition a job in the queue. A job can get repositioned
|
|
|
// because of the change in the job priority or job start-time.
|
|
|
- private void reorderJobs(JobInProgress job, QueueInfo qi) {
|
|
|
- Collection<JobInProgress> queue = qi.waitingJobs;
|
|
|
+ private void reorderJobs(JobInProgress job, JobSchedulingInfo oldInfo,
|
|
|
+ QueueInfo qi) {
|
|
|
|
|
|
- // Remove from the waiting queue
|
|
|
- if (!QueueInfo.removeOb(queue, job)) {
|
|
|
- queue = qi.runningJobs;
|
|
|
- QueueInfo.removeOb(queue, job);
|
|
|
+ if (qi.waitingJobs.remove(oldInfo) == null) {
|
|
|
+ qi.runningJobs.remove(job);
|
|
|
+ // Add back to the running queue
|
|
|
+ qi.runningJobs.add(job);
|
|
|
+ } else {
|
|
|
+ // Add back to the waiting queue
|
|
|
+ qi.waitingJobs.put(new JobSchedulingInfo(job), job);
|
|
|
}
|
|
|
-
|
|
|
- // Add back to the queue
|
|
|
- queue.add(job);
|
|
|
}
|
|
|
|
|
|
// This is used to move a job from the waiting queue to the running queue.
|
|
|
- private void makeJobRunning(JobInProgress job, QueueInfo qi) {
|
|
|
+ private void makeJobRunning(JobInProgress job, JobSchedulingInfo oldInfo,
|
|
|
+ QueueInfo qi) {
|
|
|
// Remove from the waiting queue
|
|
|
- QueueInfo.removeOb(qi.waitingJobs, job);
|
|
|
+ qi.waitingJobs.remove(oldInfo);
|
|
|
|
|
|
// Add the job to the running queue
|
|
|
qi.runningJobs.add(job);
|
|
@@ -216,21 +185,23 @@ class JobQueuesManager extends JobInProgressListener {
|
|
|
// Update the scheduler as job's state has changed
|
|
|
private void jobStateChanged(JobStatusChangeEvent event, QueueInfo qi) {
|
|
|
JobInProgress job = event.getJobInProgress();
|
|
|
+ JobSchedulingInfo oldJobStateInfo =
|
|
|
+ new JobSchedulingInfo(event.getOldStatus());
|
|
|
// Check if the ordering of the job has changed
|
|
|
// For now priority and start-time can change the job ordering
|
|
|
if (event.getEventType() == EventType.PRIORITY_CHANGED
|
|
|
|| event.getEventType() == EventType.START_TIME_CHANGED) {
|
|
|
// Make a priority change
|
|
|
- reorderJobs(job, qi);
|
|
|
+ reorderJobs(job, oldJobStateInfo, qi);
|
|
|
} else if (event.getEventType() == EventType.RUN_STATE_CHANGED) {
|
|
|
// Check if the job is complete
|
|
|
int runState = job.getStatus().getRunState();
|
|
|
if (runState == JobStatus.SUCCEEDED
|
|
|
|| runState == JobStatus.FAILED
|
|
|
|| runState == JobStatus.KILLED) {
|
|
|
- jobCompleted(job, qi);
|
|
|
+ jobCompleted(job, oldJobStateInfo, qi);
|
|
|
} else if (runState == JobStatus.RUNNING) {
|
|
|
- makeJobRunning(job, qi);
|
|
|
+ makeJobRunning(job, oldJobStateInfo, qi);
|
|
|
}
|
|
|
}
|
|
|
}
|