|
@@ -214,6 +214,87 @@ public class TestKill {
|
|
app.waitForInternalState((JobImpl)job, JobStateInternal.KILLED);
|
|
app.waitForInternalState((JobImpl)job, JobStateInternal.KILLED);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ static class MyAsyncDispatch extends AsyncDispatcher {
|
|
|
|
+ private CountDownLatch latch;
|
|
|
|
+ private TaskAttemptEventType attemptEventTypeToWait;
|
|
|
|
+ MyAsyncDispatch(CountDownLatch latch, TaskAttemptEventType attemptEventTypeToWait) {
|
|
|
|
+ super();
|
|
|
|
+ this.latch = latch;
|
|
|
|
+ this.attemptEventTypeToWait = attemptEventTypeToWait;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ protected void dispatch(Event event) {
|
|
|
|
+ if (event instanceof TaskAttemptEvent) {
|
|
|
|
+ TaskAttemptEvent attemptEvent = (TaskAttemptEvent) event;
|
|
|
|
+ TaskAttemptId attemptID = ((TaskAttemptEvent) event).getTaskAttemptID();
|
|
|
|
+ if (attemptEvent.getType() == this.attemptEventTypeToWait
|
|
|
|
+ && attemptID.getTaskId().getId() == 0 && attemptID.getId() == 0 ) {
|
|
|
|
+ try {
|
|
|
|
+ latch.await();
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
+ e.printStackTrace();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ super.dispatch(event);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // This is to test a race condition where JobEventType.JOB_KILL is generated
|
|
|
|
+ // right after TaskAttemptEventType.TA_DONE is generated.
|
|
|
|
+ // TaskImpl's state machine might receive both T_ATTEMPT_SUCCEEDED
|
|
|
|
+ // and T_ATTEMPT_KILLED from the same attempt.
|
|
|
|
+ @Test
|
|
|
|
+ public void testKillTaskWaitKillJobAfterTA_DONE() throws Exception {
|
|
|
|
+ CountDownLatch latch = new CountDownLatch(1);
|
|
|
|
+ final Dispatcher dispatcher = new MyAsyncDispatch(latch, TaskAttemptEventType.TA_DONE);
|
|
|
|
+ MRApp app = new MRApp(1, 1, false, this.getClass().getName(), true) {
|
|
|
|
+ @Override
|
|
|
|
+ public Dispatcher createDispatcher() {
|
|
|
|
+ return dispatcher;
|
|
|
|
+ }
|
|
|
|
+ };
|
|
|
|
+ Job job = app.submit(new Configuration());
|
|
|
|
+ JobId jobId = app.getJobId();
|
|
|
|
+ app.waitForState(job, JobState.RUNNING);
|
|
|
|
+ Assert.assertEquals("Num tasks not correct", 2, job.getTasks().size());
|
|
|
|
+ Iterator<Task> it = job.getTasks().values().iterator();
|
|
|
|
+ Task mapTask = it.next();
|
|
|
|
+ Task reduceTask = it.next();
|
|
|
|
+ app.waitForState(mapTask, TaskState.RUNNING);
|
|
|
|
+ app.waitForState(reduceTask, TaskState.RUNNING);
|
|
|
|
+ TaskAttempt mapAttempt = mapTask.getAttempts().values().iterator().next();
|
|
|
|
+ app.waitForState(mapAttempt, TaskAttemptState.RUNNING);
|
|
|
|
+ TaskAttempt reduceAttempt = reduceTask.getAttempts().values().iterator().next();
|
|
|
|
+ app.waitForState(reduceAttempt, TaskAttemptState.RUNNING);
|
|
|
|
+
|
|
|
|
+ // The order in the dispatch event queue, from the oldest to the newest
|
|
|
|
+ // TA_DONE
|
|
|
|
+ // JOB_KILL
|
|
|
|
+ // CONTAINER_REMOTE_CLEANUP ( from TA_DONE's handling )
|
|
|
|
+ // T_KILL ( from JOB_KILL's handling )
|
|
|
|
+ // TA_CONTAINER_CLEANED ( from CONTAINER_REMOTE_CLEANUP's handling )
|
|
|
|
+ // TA_KILL ( from T_KILL's handling )
|
|
|
|
+ // T_ATTEMPT_SUCCEEDED ( from TA_CONTAINER_CLEANED's handling )
|
|
|
|
+ // T_ATTEMPT_KILLED ( from TA_KILL's handling )
|
|
|
|
+
|
|
|
|
+ // Finish map
|
|
|
|
+ app.getContext().getEventHandler().handle(
|
|
|
|
+ new TaskAttemptEvent(
|
|
|
|
+ mapAttempt.getID(),
|
|
|
|
+ TaskAttemptEventType.TA_DONE));
|
|
|
|
+
|
|
|
|
+ // Now kill the job
|
|
|
|
+ app.getContext().getEventHandler()
|
|
|
|
+ .handle(new JobEvent(jobId, JobEventType.JOB_KILL));
|
|
|
|
+
|
|
|
|
+ //unblock
|
|
|
|
+ latch.countDown();
|
|
|
|
+
|
|
|
|
+ app.waitForInternalState((JobImpl)job, JobStateInternal.KILLED);
|
|
|
|
+ }
|
|
|
|
+
|
|
@Test
|
|
@Test
|
|
public void testKillTaskAttempt() throws Exception {
|
|
public void testKillTaskAttempt() throws Exception {
|
|
final CountDownLatch latch = new CountDownLatch(1);
|
|
final CountDownLatch latch = new CountDownLatch(1);
|