|
@@ -1247,6 +1247,7 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
|
|
|
//Raise status change event so that jobs can move to running queue.
|
|
|
raiseStatusChangeEvents(scheduler.jobQueuesManager);
|
|
|
+ raiseStatusChangeEvents(scheduler.jobQueuesManager, "q2");
|
|
|
//assign one job
|
|
|
Task t1 = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
|
|
|
//Initalize extra job.
|
|
@@ -1331,6 +1332,7 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
//in running queue as we just failed the second job which was initialized
|
|
|
//and completed the first one.
|
|
|
raiseStatusChangeEvents(scheduler.jobQueuesManager);
|
|
|
+ raiseStatusChangeEvents(scheduler.jobQueuesManager, "q2");
|
|
|
|
|
|
//Now schedule a map should be job3 of the user as job1 succeeded job2
|
|
|
//failed and now job3 is running
|
|
@@ -1988,6 +1990,25 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
|
|
|
}
|
|
|
|
|
|
+ public void testStartWithoutDefaultQueueConfigured() throws Exception {
|
|
|
+ //configure a single queue which is not default queue
|
|
|
+ String[] qs = {"q1"};
|
|
|
+ taskTrackerManager.addQueues(qs);
|
|
|
+ ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
|
|
|
+ queues.add(new FakeQueueInfo("q1", 100.0f, true, 100));
|
|
|
+ resConf.setFakeQueues(queues);
|
|
|
+ scheduler.setResourceManagerConf(resConf);
|
|
|
+ //Start the scheduler.
|
|
|
+ scheduler.start();
|
|
|
+ //Submit a job and wait till it completes
|
|
|
+ FakeJobInProgress job =
|
|
|
+ submitJob(JobStatus.PREP, 1, 1, "q1", "u1");
|
|
|
+ controlledInitializationPoller.selectJobsToInitialize();
|
|
|
+ raiseStatusChangeEvents(scheduler.jobQueuesManager, "q1");
|
|
|
+ Task t = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
|
|
|
+ t = checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
|
|
|
+ }
|
|
|
+
|
|
|
private void checkRunningJobMovementAndCompletion() throws IOException {
|
|
|
|
|
|
JobQueuesManager mgr = scheduler.jobQueuesManager;
|
|
@@ -2103,7 +2124,11 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
}
|
|
|
|
|
|
private void raiseStatusChangeEvents(JobQueuesManager mgr) {
|
|
|
- Collection<JobInProgress> jips = mgr.getWaitingJobs("default");
|
|
|
+ raiseStatusChangeEvents(mgr, "default");
|
|
|
+ }
|
|
|
+
|
|
|
+ private void raiseStatusChangeEvents(JobQueuesManager mgr, String queueName) {
|
|
|
+ Collection<JobInProgress> jips = mgr.getWaitingJobs(queueName);
|
|
|
for(JobInProgress jip : jips) {
|
|
|
if(jip.getStatus().getRunState() == JobStatus.RUNNING) {
|
|
|
JobStatusChangeEvent evt = new JobStatusChangeEvent(jip,
|