|
@@ -248,6 +248,24 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
|
|
|
}
|
|
|
|
|
|
+ static class FakeFailingJobInProgress extends FakeJobInProgress {
|
|
|
+
|
|
|
+ public FakeFailingJobInProgress(JobID id, JobConf jobConf,
|
|
|
+ FakeTaskTrackerManager taskTrackerManager, String user) {
|
|
|
+ super(id, jobConf, taskTrackerManager, user);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public synchronized void initTasks() throws IOException {
|
|
|
+ throw new IOException("Failed Initalization");
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ synchronized void fail() {
|
|
|
+ this.status.setRunState(JobStatus.FAILED);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
static class FakeTaskInProgress extends TaskInProgress {
|
|
|
private boolean isMap;
|
|
|
private FakeJobInProgress fakeJob;
|
|
@@ -2009,6 +2027,39 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
t = checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
|
|
|
}
|
|
|
|
|
|
+ public void testFailedJobInitalizations() throws Exception {
|
|
|
+ String[] qs = {"default"};
|
|
|
+ taskTrackerManager.addQueues(qs);
|
|
|
+ ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
|
|
|
+ queues.add(new FakeQueueInfo("default", 100.0f, true, 100));
|
|
|
+ resConf.setFakeQueues(queues);
|
|
|
+ scheduler.setResourceManagerConf(resConf);
|
|
|
+ scheduler.start();
|
|
|
+
|
|
|
+ JobQueuesManager mgr = scheduler.jobQueuesManager;
|
|
|
+
|
|
|
+ //Submit a job whose initialization would fail always.
|
|
|
+ FakeJobInProgress job =
|
|
|
+ new FakeFailingJobInProgress(new JobID("test", ++jobCounter),
|
|
|
+ new JobConf(), taskTrackerManager,"u1");
|
|
|
+ job.getStatus().setRunState(JobStatus.PREP);
|
|
|
+ taskTrackerManager.submitJob(job);
|
|
|
+ //check if job is present in waiting list.
|
|
|
+ assertEquals("Waiting job list does not contain submitted job",
|
|
|
+ 1, mgr.getWaitingJobCount("default"));
|
|
|
+ assertTrue("Waiting job does not contain submitted job",
|
|
|
+ mgr.getWaitingJobs("default").contains(job));
|
|
|
+ //initialization should fail now.
|
|
|
+ controlledInitializationPoller.selectJobsToInitialize();
|
|
|
+ //Check if the job has been properly cleaned up.
|
|
|
+ assertEquals("Waiting job list contains submitted job",
|
|
|
+ 0, mgr.getWaitingJobCount("default"));
|
|
|
+ assertFalse("Waiting job contains submitted job",
|
|
|
+ mgr.getWaitingJobs("default").contains(job));
|
|
|
+ assertFalse("Waiting job contains submitted job",
|
|
|
+ mgr.getRunningJobQueue("default").contains(job));
|
|
|
+ }
|
|
|
+
|
|
|
private void checkRunningJobMovementAndCompletion() throws IOException {
|
|
|
|
|
|
JobQueuesManager mgr = scheduler.jobQueuesManager;
|