|
@@ -1339,6 +1339,68 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
assertEquals(j1.runningMapTasks, 2);
|
|
|
|
|
|
}
|
|
|
+
|
|
|
+ /*
|
|
|
+ * Test case for checking the reclaim capacity with uninitalized jobs.
|
|
|
+ *
|
|
|
+ * Configure 2 queue with capacity scheduler.
|
|
|
+ *
|
|
|
+ * Submit a single job to the default queue and make it go above the gc
|
|
|
+ * of the queue.
|
|
|
+ *
|
|
|
+ * Then submit another job to the second queue but don't initialize it.
|
|
|
+ *
|
|
|
+ * Run reclaim capacity thread for the scheduler, in order to let scheduler
|
|
|
+ * know that it has to reclaim capacity.
|
|
|
+ *
|
|
|
+ * Advance the scheduler clock by appropriate milliseconds.
|
|
|
+ *
|
|
|
+ * Run scheduler.reclaimCapacity() to kill the appropriate tasks.
|
|
|
+ *
|
|
|
+ * Check running task count of the running job.
|
|
|
+ *
|
|
|
+ */
|
|
|
+ public void testReclaimCapacityWithUninitializedJobs() throws IOException {
|
|
|
+ String[] qs = {"default", "q2"};
|
|
|
+ taskTrackerManager.addQueues(qs);
|
|
|
+ resConf = new FakeResourceManagerConf();
|
|
|
+ ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
|
|
|
+ queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
|
|
|
+ queues.add(new FakeQueueInfo("q2", 50.0f, 1000, true, 25));
|
|
|
+ resConf.setFakeQueues(queues);
|
|
|
+
|
|
|
+ ControlledInitializationPoller p = new ControlledInitializationPoller(
|
|
|
+ scheduler.jobQueuesManager,
|
|
|
+ resConf,
|
|
|
+ resConf.getQueues());
|
|
|
+ scheduler.setInitializationPoller(p);
|
|
|
+
|
|
|
+ scheduler.setResourceManagerConf(resConf);
|
|
|
+ scheduler.start();
|
|
|
+ //Submit one job to the default queue and get the capacity over the
|
|
|
+ //gc of the particular queue.
|
|
|
+ FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
|
|
|
+ checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
|
|
|
+ checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
|
|
|
+ checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
|
|
|
+ checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
|
|
|
+
|
|
|
+ //Submit another job to the second queue but not initialize it.
|
|
|
+ submitJob(JobStatus.PREP, 10, 10, "q2", "u1");
|
|
|
+
|
|
|
+ //call scheduler's reclaim capacity in order to start reclaim capacity
|
|
|
+ //process.
|
|
|
+ scheduler.reclaimCapacity();
|
|
|
+ //advance the clock to the position when the two task of the job would
|
|
|
+ //be killed.
|
|
|
+ clock.advance(600000);
|
|
|
+ //run reclaim capacity
|
|
|
+ scheduler.reclaimCapacity();
|
|
|
+ //check the count of the running tasks.
|
|
|
+ assertEquals(j1.runningMapTasks, 2);
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
/*
|
|
|
* Following is the testing strategy for testing scheduling information.
|
|
|
* - start capacity scheduler with two queues.
|