|
@@ -242,9 +242,13 @@ public class TestJobInProgressListener extends TestCase {
|
|
|
// RUNNING->COMPLETE(SUCCESS/KILLED/FAILED)
|
|
|
JobInProgress jip = event.getJobInProgress();
|
|
|
String jobId = jip.getJobID().toString();
|
|
|
- if (statusEvent.getJobInProgress().isComplete()) {
|
|
|
+ if (jip.isComplete()) {
|
|
|
LOG.info("Job " + jobId + " deleted from the running queue");
|
|
|
- jobs.remove(jip);
|
|
|
+ if (statusEvent.getOldStatus().getRunState() == JobStatus.PREP) {
|
|
|
+ wjobs.remove(jip);
|
|
|
+ } else {
|
|
|
+ jobs.remove(jip);
|
|
|
+ }
|
|
|
} else {
|
|
|
// PREP->RUNNING
|
|
|
LOG.info("Job " + jobId + " deleted from the waiting queue");
|
|
@@ -329,4 +333,58 @@ public class TestJobInProgressListener extends TestCase {
|
|
|
assertFalse("Missing event notification for a successful job",
|
|
|
myListener.contains(rJob.getID(), false));
|
|
|
}
|
|
|
-}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * This scheduler never schedules any task as it doesnt init any task. So all
|
|
|
+ * the jobs are queued forever.
|
|
|
+ */
|
|
|
+ public static class MyScheduler extends JobQueueTaskScheduler {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public synchronized void start() throws IOException {
|
|
|
+ super.start();
|
|
|
+ // Remove the eager task initializer
|
|
|
+ taskTrackerManager.removeJobInProgressListener(
|
|
|
+ eagerTaskInitializationListener);
|
|
|
+ // terminate it
|
|
|
+ eagerTaskInitializationListener.terminate();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public void testQueuedJobKill() throws Exception {
|
|
|
+ LOG.info("Testing queued-job-kill");
|
|
|
+
|
|
|
+ MyListener myListener = new MyListener();
|
|
|
+
|
|
|
+ JobConf job = new JobConf();
|
|
|
+ job.setClass("mapred.jobtracker.taskScheduler", MyScheduler.class,
|
|
|
+ TaskScheduler.class);
|
|
|
+ MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1, null, null, job);
|
|
|
+
|
|
|
+ job = mr.createJobConf();
|
|
|
+
|
|
|
+ mr.getJobTrackerRunner().getJobTracker()
|
|
|
+ .addJobInProgressListener(myListener);
|
|
|
+
|
|
|
+ RunningJob rJob = TestJobKillAndFail.runJob(job);
|
|
|
+ JobID id = rJob.getID();
|
|
|
+ LOG.info("Job : " + id.toString() + " submitted");
|
|
|
+
|
|
|
+ // check if the job is in the waiting queue
|
|
|
+ assertTrue("Missing event notification on submiting a job",
|
|
|
+ myListener.contains(id, true));
|
|
|
+
|
|
|
+ // kill the job
|
|
|
+ LOG.info("Killing job : " + id.toString());
|
|
|
+ rJob.killJob();
|
|
|
+
|
|
|
+ // check if the job is killed
|
|
|
+ assertEquals("Job status doesnt reflect the kill-job action",
|
|
|
+ JobStatus.KILLED, rJob.getJobState());
|
|
|
+
|
|
|
+ // check if the job is correctly moved
|
|
|
+ // from the waiting list
|
|
|
+ assertFalse("Missing event notification on killing a waiting job",
|
|
|
+ myListener.contains(id, true));
|
|
|
+ }
|
|
|
+}
|