|
@@ -498,72 +498,34 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
FakeJobInProgress fjob2 =
|
|
|
submitJob(JobStatus.PREP, 1, 0, "default", "user");
|
|
|
|
|
|
- // check if the job is in the waiting queue
|
|
|
- JobInProgress[] jobs =
|
|
|
- scheduler.jobQueuesManager.getWaitingJobQueue("default")
|
|
|
- .toArray(new JobInProgress[0]);
|
|
|
- assertTrue("Waiting queue doesnt contain queued job #1 in right order",
|
|
|
- jobs[0].getJobID().equals(fjob1.getJobID()));
|
|
|
- assertTrue("Waiting queue doesnt contain queued job #2 in right order",
|
|
|
- jobs[1].getJobID().equals(fjob2.getJobID()));
|
|
|
-
|
|
|
- // I. Check the start-time change
|
|
|
- // Change job2 start-time and check if job2 bumps up in the queue
|
|
|
- taskTrackerManager.setStartTime(fjob2, fjob1.startTime - 1);
|
|
|
-
|
|
|
- jobs = scheduler.jobQueuesManager.getWaitingJobQueue("default")
|
|
|
- .toArray(new JobInProgress[0]);
|
|
|
- assertTrue("Start time change didnt not work as expected for job #2",
|
|
|
- jobs[0].getJobID().equals(fjob2.getJobID()));
|
|
|
- assertTrue("Start time change didnt not work as expected for job #1",
|
|
|
- jobs[1].getJobID().equals(fjob1.getJobID()));
|
|
|
-
|
|
|
- // check if the queue is fine
|
|
|
- assertEquals("Start-time change garbled the waiting queue",
|
|
|
- 2, scheduler.getJobs("default").size());
|
|
|
-
|
|
|
- // II. Change job priority change
|
|
|
- // Bump up job1's priority and make sure job1 bumps up in the queue
|
|
|
- taskTrackerManager.setPriority(fjob1, JobPriority.HIGH);
|
|
|
-
|
|
|
- // Check if the priority changes are reflected
|
|
|
- jobs = scheduler.jobQueuesManager.getWaitingJobQueue("default")
|
|
|
- .toArray(new JobInProgress[0]);
|
|
|
- assertTrue("Priority change didnt not work as expected for job #1",
|
|
|
- jobs[0].getJobID().equals(fjob1.getJobID()));
|
|
|
- assertTrue("Priority change didnt not work as expected for job #2",
|
|
|
- jobs[1].getJobID().equals(fjob2.getJobID()));
|
|
|
+ // test if changing the job priority/start-time works as expected in the
|
|
|
+ // waiting queue
|
|
|
+ testJobOrderChange(fjob1, fjob2, true);
|
|
|
|
|
|
- // check if the queue is fine
|
|
|
- assertEquals("Priority change has garbled the waiting queue",
|
|
|
- 2, scheduler.getJobs("default").size());
|
|
|
+ // Init the jobs
|
|
|
+ // simulate the case where the job with a lower priority becomes running
|
|
|
+ // first (may be because of the setup tasks).
|
|
|
|
|
|
- // Create an event
|
|
|
- JobChangeEvent event = initTasksAndReportEvent(fjob1);
|
|
|
+ // init the lower ranked job first
|
|
|
+ JobChangeEvent event = initTasksAndReportEvent(fjob2);
|
|
|
|
|
|
// inform the scheduler
|
|
|
scheduler.jobQueuesManager.jobUpdated(event);
|
|
|
|
|
|
- // waiting queue
|
|
|
- Collection<JobInProgress> wqueue =
|
|
|
- scheduler.jobQueuesManager.getWaitingJobQueue("default");
|
|
|
-
|
|
|
- // check if the job is not in the waiting queue
|
|
|
- assertFalse("Waiting queue contains running/inited job",
|
|
|
- wqueue.contains(fjob1));
|
|
|
-
|
|
|
- // check if the waiting queue is fine
|
|
|
- assertEquals("Waiting queue is garbled on job init", 1, wqueue.size());
|
|
|
+ // init the higher ordered job later
|
|
|
+ event = initTasksAndReportEvent(fjob1);
|
|
|
|
|
|
- Collection<JobInProgress> rqueue =
|
|
|
- scheduler.jobQueuesManager.getRunningJobQueue("default");
|
|
|
+ // inform the scheduler
|
|
|
+ scheduler.jobQueuesManager.jobUpdated(event);
|
|
|
|
|
|
- // check if the job is in the running queue
|
|
|
- assertTrue("Running queue doesnt contain running/inited job",
|
|
|
- rqueue.contains(fjob1));
|
|
|
+ // check if the jobs are missing from the waiting queue
|
|
|
+ assertEquals("Waiting queue is garbled on job init", 0,
|
|
|
+ scheduler.jobQueuesManager.getWaitingJobQueue("default")
|
|
|
+ .size());
|
|
|
|
|
|
- // check if the running queue is fine
|
|
|
- assertEquals("Running queue is garbled upon init", 1, rqueue.size());
|
|
|
+ // test if changing the job priority/start-time works as expected in the
|
|
|
+ // running queue
|
|
|
+ testJobOrderChange(fjob1, fjob2, false);
|
|
|
|
|
|
// schedule a task
|
|
|
List<Task> tasks = scheduler.assignTasks(tracker("tt1"));
|
|
@@ -575,7 +537,8 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
// mark the job as complete
|
|
|
taskTrackerManager.finalizeJob(fjob1);
|
|
|
|
|
|
- rqueue = scheduler.jobQueuesManager.getRunningJobQueue("default");
|
|
|
+ Collection<JobInProgress> rqueue =
|
|
|
+ scheduler.jobQueuesManager.getRunningJobQueue("default");
|
|
|
|
|
|
// check if the job is removed from the scheduler
|
|
|
assertFalse("Scheduler contains completed job",
|
|
@@ -583,8 +546,67 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
|
|
|
// check if the running queue size is correct
|
|
|
assertEquals("Job finish garbles the queue",
|
|
|
- 0, rqueue.size());
|
|
|
+ 1, rqueue.size());
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ // test if the queue reflects the changes
|
|
|
+ private void testJobOrderChange(FakeJobInProgress fjob1,
|
|
|
+ FakeJobInProgress fjob2,
|
|
|
+ boolean waiting) {
|
|
|
+ String queueName = waiting ? "waiting" : "running";
|
|
|
+
|
|
|
+ // check if the jobs in the queue are the right order
|
|
|
+ JobInProgress[] jobs = getJobsInQueue(waiting);
|
|
|
+ assertTrue(queueName + " queue doesnt contain job #1 in right order",
|
|
|
+ jobs[0].getJobID().equals(fjob1.getJobID()));
|
|
|
+ assertTrue(queueName + " queue doesnt contain job #2 in right order",
|
|
|
+ jobs[1].getJobID().equals(fjob2.getJobID()));
|
|
|
+
|
|
|
+ // I. Check the start-time change
|
|
|
+ // Change job2 start-time and check if job2 bumps up in the queue
|
|
|
+ taskTrackerManager.setStartTime(fjob2, fjob1.startTime - 1);
|
|
|
+
|
|
|
+ jobs = getJobsInQueue(waiting);
|
|
|
+ assertTrue("Start time change didnt not work as expected for job #2 in "
|
|
|
+ + queueName + " queue",
|
|
|
+ jobs[0].getJobID().equals(fjob2.getJobID()));
|
|
|
+ assertTrue("Start time change didnt not work as expected for job #1 in"
|
|
|
+ + queueName + " queue",
|
|
|
+ jobs[1].getJobID().equals(fjob1.getJobID()));
|
|
|
|
|
|
+ // check if the queue is fine
|
|
|
+ assertEquals("Start-time change garbled the " + queueName + " queue",
|
|
|
+ 2, jobs.length);
|
|
|
+
|
|
|
+ // II. Change job priority change
|
|
|
+ // Bump up job1's priority and make sure job1 bumps up in the queue
|
|
|
+ taskTrackerManager.setPriority(fjob1, JobPriority.HIGH);
|
|
|
+
|
|
|
+ // Check if the priority changes are reflected
|
|
|
+ jobs = getJobsInQueue(waiting);
|
|
|
+ assertTrue("Priority change didnt not work as expected for job #1 in "
|
|
|
+ + queueName + " queue",
|
|
|
+ jobs[0].getJobID().equals(fjob1.getJobID()));
|
|
|
+ assertTrue("Priority change didnt not work as expected for job #2 in "
|
|
|
+ + queueName + " queue",
|
|
|
+ jobs[1].getJobID().equals(fjob2.getJobID()));
|
|
|
+
|
|
|
+ // check if the queue is fine
|
|
|
+ assertEquals("Priority change has garbled the " + queueName + " queue",
|
|
|
+ 2, jobs.length);
|
|
|
+
|
|
|
+ // reset the queue state back to normal
|
|
|
+ taskTrackerManager.setStartTime(fjob1, fjob2.startTime - 1);
|
|
|
+ taskTrackerManager.setPriority(fjob1, JobPriority.NORMAL);
|
|
|
+ }
|
|
|
+
|
|
|
+ private JobInProgress[] getJobsInQueue(boolean waiting) {
|
|
|
+ Collection<JobInProgress> queue =
|
|
|
+ waiting
|
|
|
+ ? scheduler.jobQueuesManager.getWaitingJobQueue("default")
|
|
|
+ : scheduler.jobQueuesManager.getRunningJobQueue("default");
|
|
|
+ return queue.toArray(new JobInProgress[0]);
|
|
|
}
|
|
|
|
|
|
/*protected void submitJobs(int number, int state, int maps, int reduces)
|