|
@@ -275,6 +275,7 @@ public class TestJobImpl {
|
|
|
|
|
|
AppContext mockContext = mock(AppContext.class);
|
|
AppContext mockContext = mock(AppContext.class);
|
|
when(mockContext.isLastAMRetry()).thenReturn(true);
|
|
when(mockContext.isLastAMRetry()).thenReturn(true);
|
|
|
|
+ when(mockContext.safeToReportTerminationToUser()).thenReturn(false);
|
|
JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, mockContext);
|
|
JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, mockContext);
|
|
completeJobTasks(job);
|
|
completeJobTasks(job);
|
|
assertJobState(job, JobStateInternal.COMMITTING);
|
|
assertJobState(job, JobStateInternal.COMMITTING);
|
|
@@ -282,7 +283,9 @@ public class TestJobImpl {
|
|
syncBarrier.await();
|
|
syncBarrier.await();
|
|
job.handle(new JobEvent(job.getID(), JobEventType.JOB_AM_REBOOT));
|
|
job.handle(new JobEvent(job.getID(), JobEventType.JOB_AM_REBOOT));
|
|
assertJobState(job, JobStateInternal.REBOOT);
|
|
assertJobState(job, JobStateInternal.REBOOT);
|
|
- // return the external state as FAILED since this is last retry.
|
|
|
|
|
|
+ // return the external state as ERROR since this is last retry.
|
|
|
|
+ Assert.assertEquals(JobState.RUNNING, job.getState());
|
|
|
|
+ when(mockContext.safeToReportTerminationToUser()).thenReturn(true);
|
|
Assert.assertEquals(JobState.ERROR, job.getState());
|
|
Assert.assertEquals(JobState.ERROR, job.getState());
|
|
|
|
|
|
dispatcher.stop();
|
|
dispatcher.stop();
|
|
@@ -590,12 +593,14 @@ public class TestJobImpl {
|
|
final JobDiagnosticsUpdateEvent diagUpdateEvent =
|
|
final JobDiagnosticsUpdateEvent diagUpdateEvent =
|
|
new JobDiagnosticsUpdateEvent(jobId, diagMsg);
|
|
new JobDiagnosticsUpdateEvent(jobId, diagMsg);
|
|
MRAppMetrics mrAppMetrics = MRAppMetrics.create();
|
|
MRAppMetrics mrAppMetrics = MRAppMetrics.create();
|
|
|
|
+ AppContext mockContext = mock(AppContext.class);
|
|
|
|
+ when(mockContext.safeToReportTerminationToUser()).thenReturn(true);
|
|
JobImpl job = new JobImpl(jobId, Records
|
|
JobImpl job = new JobImpl(jobId, Records
|
|
.newRecord(ApplicationAttemptId.class), new Configuration(),
|
|
.newRecord(ApplicationAttemptId.class), new Configuration(),
|
|
mock(EventHandler.class),
|
|
mock(EventHandler.class),
|
|
null, mock(JobTokenSecretManager.class), null,
|
|
null, mock(JobTokenSecretManager.class), null,
|
|
new SystemClock(), null,
|
|
new SystemClock(), null,
|
|
- mrAppMetrics, null, true, null, 0, null, null, null, null);
|
|
|
|
|
|
+ mrAppMetrics, null, true, null, 0, null, mockContext, null, null);
|
|
job.handle(diagUpdateEvent);
|
|
job.handle(diagUpdateEvent);
|
|
String diagnostics = job.getReport().getDiagnostics();
|
|
String diagnostics = job.getReport().getDiagnostics();
|
|
Assert.assertNotNull(diagnostics);
|
|
Assert.assertNotNull(diagnostics);
|
|
@@ -606,7 +611,7 @@ public class TestJobImpl {
|
|
mock(EventHandler.class),
|
|
mock(EventHandler.class),
|
|
null, mock(JobTokenSecretManager.class), null,
|
|
null, mock(JobTokenSecretManager.class), null,
|
|
new SystemClock(), null,
|
|
new SystemClock(), null,
|
|
- mrAppMetrics, null, true, null, 0, null, null, null, null);
|
|
|
|
|
|
+ mrAppMetrics, null, true, null, 0, null, mockContext, null, null);
|
|
job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
|
|
job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
|
|
job.handle(diagUpdateEvent);
|
|
job.handle(diagUpdateEvent);
|
|
diagnostics = job.getReport().getDiagnostics();
|
|
diagnostics = job.getReport().getDiagnostics();
|
|
@@ -699,7 +704,9 @@ public class TestJobImpl {
|
|
commitHandler.init(conf);
|
|
commitHandler.init(conf);
|
|
commitHandler.start();
|
|
commitHandler.start();
|
|
|
|
|
|
- JobImpl job = createStubbedJob(conf, dispatcher, 2, null);
|
|
|
|
|
|
+ AppContext mockContext = mock(AppContext.class);
|
|
|
|
+ when(mockContext.safeToReportTerminationToUser()).thenReturn(false);
|
|
|
|
+ JobImpl job = createStubbedJob(conf, dispatcher, 2, mockContext);
|
|
JobId jobId = job.getID();
|
|
JobId jobId = job.getID();
|
|
job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
|
|
job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
|
|
assertJobState(job, JobStateInternal.INITED);
|
|
assertJobState(job, JobStateInternal.INITED);
|
|
@@ -707,12 +714,15 @@ public class TestJobImpl {
|
|
assertJobState(job, JobStateInternal.FAILED);
|
|
assertJobState(job, JobStateInternal.FAILED);
|
|
|
|
|
|
job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_COMPLETED));
|
|
job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_COMPLETED));
|
|
- Assert.assertEquals(JobState.FAILED, job.getState());
|
|
|
|
|
|
+ assertJobState(job, JobStateInternal.FAILED);
|
|
job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_ATTEMPT_COMPLETED));
|
|
job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_ATTEMPT_COMPLETED));
|
|
- Assert.assertEquals(JobState.FAILED, job.getState());
|
|
|
|
|
|
+ assertJobState(job, JobStateInternal.FAILED);
|
|
job.handle(new JobEvent(jobId, JobEventType.JOB_MAP_TASK_RESCHEDULED));
|
|
job.handle(new JobEvent(jobId, JobEventType.JOB_MAP_TASK_RESCHEDULED));
|
|
- Assert.assertEquals(JobState.FAILED, job.getState());
|
|
|
|
|
|
+ assertJobState(job, JobStateInternal.FAILED);
|
|
job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE));
|
|
job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE));
|
|
|
|
+ assertJobState(job, JobStateInternal.FAILED);
|
|
|
|
+ Assert.assertEquals(JobState.RUNNING, job.getState());
|
|
|
|
+ when(mockContext.safeToReportTerminationToUser()).thenReturn(true);
|
|
Assert.assertEquals(JobState.FAILED, job.getState());
|
|
Assert.assertEquals(JobState.FAILED, job.getState());
|
|
|
|
|
|
dispatcher.stop();
|
|
dispatcher.stop();
|
|
@@ -750,6 +760,10 @@ public class TestJobImpl {
|
|
Dispatcher dispatcher, int numSplits, AppContext appContext) {
|
|
Dispatcher dispatcher, int numSplits, AppContext appContext) {
|
|
JobID jobID = JobID.forName("job_1234567890000_0001");
|
|
JobID jobID = JobID.forName("job_1234567890000_0001");
|
|
JobId jobId = TypeConverter.toYarn(jobID);
|
|
JobId jobId = TypeConverter.toYarn(jobID);
|
|
|
|
+ if (appContext == null) {
|
|
|
|
+ appContext = mock(AppContext.class);
|
|
|
|
+ when(appContext.safeToReportTerminationToUser()).thenReturn(true);
|
|
|
|
+ }
|
|
StubbedJob job = new StubbedJob(jobId,
|
|
StubbedJob job = new StubbedJob(jobId,
|
|
ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 0), 0),
|
|
ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 0), 0),
|
|
conf,dispatcher.getEventHandler(), true, "somebody", numSplits, appContext);
|
|
conf,dispatcher.getEventHandler(), true, "somebody", numSplits, appContext);
|