|
@@ -142,7 +142,7 @@ public class TestJobImpl {
|
|
|
"testName", "testNodeName", "\"key2\"=\"value2\" \"key1\"=\"value1\" ",
|
|
|
"tag1,tag2");
|
|
|
dispatcher.register(EventType.class, jseHandler);
|
|
|
- JobImpl job = createStubbedJob(conf, dispatcher, 0);
|
|
|
+ JobImpl job = createStubbedJob(conf, dispatcher, 0, null);
|
|
|
job.handle(new JobEvent(job.getID(), JobEventType.JOB_INIT));
|
|
|
assertJobState(job, JobStateInternal.INITED);
|
|
|
job.handle(new JobStartEvent(job.getID()));
|
|
@@ -170,7 +170,7 @@ public class TestJobImpl {
|
|
|
commitHandler.init(conf);
|
|
|
commitHandler.start();
|
|
|
|
|
|
- JobImpl job = createRunningStubbedJob(conf, dispatcher, 2);
|
|
|
+ JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, null);
|
|
|
completeJobTasks(job);
|
|
|
assertJobState(job, JobStateInternal.COMMITTING);
|
|
|
|
|
@@ -195,7 +195,7 @@ public class TestJobImpl {
|
|
|
commitHandler.init(conf);
|
|
|
commitHandler.start();
|
|
|
|
|
|
- JobImpl job = createRunningStubbedJob(conf, dispatcher, 2);
|
|
|
+ JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, null);
|
|
|
completeJobTasks(job);
|
|
|
assertJobState(job, JobStateInternal.COMMITTING);
|
|
|
|
|
@@ -239,7 +239,9 @@ public class TestJobImpl {
|
|
|
commitHandler.init(conf);
|
|
|
commitHandler.start();
|
|
|
|
|
|
- JobImpl job = createStubbedJob(conf, dispatcher, 2);
|
|
|
+ AppContext mockContext = mock(AppContext.class);
|
|
|
+ when(mockContext.isLastAMRetry()).thenReturn(false);
|
|
|
+ JobImpl job = createStubbedJob(conf, dispatcher, 2, mockContext);
|
|
|
JobId jobId = job.getID();
|
|
|
job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
|
|
|
assertJobState(job, JobStateInternal.INITED);
|
|
@@ -248,6 +250,10 @@ public class TestJobImpl {
|
|
|
|
|
|
job.handle(new JobEvent(job.getID(), JobEventType.JOB_AM_REBOOT));
|
|
|
assertJobState(job, JobStateInternal.REBOOT);
|
|
|
+ // return the external state as RUNNING since otherwise JobClient will
|
|
|
+ // exit when it polls the AM for job state
|
|
|
+ Assert.assertEquals(JobState.RUNNING, job.getState());
|
|
|
+
|
|
|
dispatcher.stop();
|
|
|
commitHandler.stop();
|
|
|
}
|
|
@@ -256,6 +262,7 @@ public class TestJobImpl {
|
|
|
public void testRebootedDuringCommit() throws Exception {
|
|
|
Configuration conf = new Configuration();
|
|
|
conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
|
|
|
+ conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 2);
|
|
|
AsyncDispatcher dispatcher = new AsyncDispatcher();
|
|
|
dispatcher.init(conf);
|
|
|
dispatcher.start();
|
|
@@ -266,13 +273,18 @@ public class TestJobImpl {
|
|
|
commitHandler.init(conf);
|
|
|
commitHandler.start();
|
|
|
|
|
|
- JobImpl job = createRunningStubbedJob(conf, dispatcher, 2);
|
|
|
+ AppContext mockContext = mock(AppContext.class);
|
|
|
+ when(mockContext.isLastAMRetry()).thenReturn(true);
|
|
|
+ JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, mockContext);
|
|
|
completeJobTasks(job);
|
|
|
assertJobState(job, JobStateInternal.COMMITTING);
|
|
|
|
|
|
syncBarrier.await();
|
|
|
job.handle(new JobEvent(job.getID(), JobEventType.JOB_AM_REBOOT));
|
|
|
assertJobState(job, JobStateInternal.REBOOT);
|
|
|
+ // return the external state as FAILED since this is last retry.
|
|
|
+ Assert.assertEquals(JobState.ERROR, job.getState());
|
|
|
+
|
|
|
dispatcher.stop();
|
|
|
commitHandler.stop();
|
|
|
}
|
|
@@ -301,7 +313,7 @@ public class TestJobImpl {
|
|
|
commitHandler.init(conf);
|
|
|
commitHandler.start();
|
|
|
|
|
|
- JobImpl job = createStubbedJob(conf, dispatcher, 2);
|
|
|
+ JobImpl job = createStubbedJob(conf, dispatcher, 2, null);
|
|
|
JobId jobId = job.getID();
|
|
|
job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
|
|
|
assertJobState(job, JobStateInternal.INITED);
|
|
@@ -328,7 +340,7 @@ public class TestJobImpl {
|
|
|
commitHandler.init(conf);
|
|
|
commitHandler.start();
|
|
|
|
|
|
- JobImpl job = createRunningStubbedJob(conf, dispatcher, 2);
|
|
|
+ JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, null);
|
|
|
completeJobTasks(job);
|
|
|
assertJobState(job, JobStateInternal.COMMITTING);
|
|
|
|
|
@@ -352,7 +364,7 @@ public class TestJobImpl {
|
|
|
createCommitterEventHandler(dispatcher, committer);
|
|
|
commitHandler.init(conf);
|
|
|
commitHandler.start();
|
|
|
- JobImpl job = createRunningStubbedJob(conf, dispatcher, 2);
|
|
|
+ JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, null);
|
|
|
|
|
|
//Fail one task. This should land the JobImpl in the FAIL_WAIT state
|
|
|
job.handle(new JobTaskEvent(
|
|
@@ -388,7 +400,7 @@ public class TestJobImpl {
|
|
|
//Job has only 1 mapper task. No reducers
|
|
|
conf.setInt(MRJobConfig.NUM_REDUCES, 0);
|
|
|
conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);
|
|
|
- JobImpl job = createRunningStubbedJob(conf, dispatcher, 1);
|
|
|
+ JobImpl job = createRunningStubbedJob(conf, dispatcher, 1, null);
|
|
|
|
|
|
//Fail / finish all the tasks. This should land the JobImpl directly in the
|
|
|
//FAIL_ABORT state
|
|
@@ -440,7 +452,7 @@ public class TestJobImpl {
|
|
|
commitHandler.init(conf);
|
|
|
commitHandler.start();
|
|
|
|
|
|
- JobImpl job = createStubbedJob(conf, dispatcher, 2);
|
|
|
+ JobImpl job = createStubbedJob(conf, dispatcher, 2, null);
|
|
|
JobId jobId = job.getID();
|
|
|
job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
|
|
|
assertJobState(job, JobStateInternal.INITED);
|
|
@@ -477,7 +489,7 @@ public class TestJobImpl {
|
|
|
commitHandler.init(conf);
|
|
|
commitHandler.start();
|
|
|
|
|
|
- JobImpl job = createStubbedJob(conf, dispatcher, 2);
|
|
|
+ JobImpl job = createStubbedJob(conf, dispatcher, 2, null);
|
|
|
JobId jobId = job.getID();
|
|
|
job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
|
|
|
assertJobState(job, JobStateInternal.INITED);
|
|
@@ -687,7 +699,7 @@ public class TestJobImpl {
|
|
|
commitHandler.init(conf);
|
|
|
commitHandler.start();
|
|
|
|
|
|
- JobImpl job = createStubbedJob(conf, dispatcher, 2);
|
|
|
+ JobImpl job = createStubbedJob(conf, dispatcher, 2, null);
|
|
|
JobId jobId = job.getID();
|
|
|
job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
|
|
|
assertJobState(job, JobStateInternal.INITED);
|
|
@@ -735,12 +747,12 @@ public class TestJobImpl {
|
|
|
}
|
|
|
|
|
|
private static StubbedJob createStubbedJob(Configuration conf,
|
|
|
- Dispatcher dispatcher, int numSplits) {
|
|
|
+ Dispatcher dispatcher, int numSplits, AppContext appContext) {
|
|
|
JobID jobID = JobID.forName("job_1234567890000_0001");
|
|
|
JobId jobId = TypeConverter.toYarn(jobID);
|
|
|
StubbedJob job = new StubbedJob(jobId,
|
|
|
ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 0), 0),
|
|
|
- conf,dispatcher.getEventHandler(), true, "somebody", numSplits);
|
|
|
+ conf,dispatcher.getEventHandler(), true, "somebody", numSplits, appContext);
|
|
|
dispatcher.register(JobEventType.class, job);
|
|
|
EventHandler mockHandler = mock(EventHandler.class);
|
|
|
dispatcher.register(TaskEventType.class, mockHandler);
|
|
@@ -751,8 +763,8 @@ public class TestJobImpl {
|
|
|
}
|
|
|
|
|
|
private static StubbedJob createRunningStubbedJob(Configuration conf,
|
|
|
- Dispatcher dispatcher, int numSplits) {
|
|
|
- StubbedJob job = createStubbedJob(conf, dispatcher, numSplits);
|
|
|
+ Dispatcher dispatcher, int numSplits, AppContext appContext) {
|
|
|
+ StubbedJob job = createStubbedJob(conf, dispatcher, numSplits, appContext);
|
|
|
job.handle(new JobEvent(job.getID(), JobEventType.JOB_INIT));
|
|
|
assertJobState(job, JobStateInternal.INITED);
|
|
|
job.handle(new JobStartEvent(job.getID()));
|
|
@@ -880,13 +892,13 @@ public class TestJobImpl {
|
|
|
}
|
|
|
|
|
|
public StubbedJob(JobId jobId, ApplicationAttemptId applicationAttemptId,
|
|
|
- Configuration conf, EventHandler eventHandler,
|
|
|
- boolean newApiCommitter, String user, int numSplits) {
|
|
|
+ Configuration conf, EventHandler eventHandler, boolean newApiCommitter,
|
|
|
+ String user, int numSplits, AppContext appContext) {
|
|
|
super(jobId, applicationAttemptId, conf, eventHandler,
|
|
|
null, new JobTokenSecretManager(), new Credentials(),
|
|
|
new SystemClock(), Collections.<TaskId, TaskInfo> emptyMap(),
|
|
|
MRAppMetrics.create(), null, newApiCommitter, user,
|
|
|
- System.currentTimeMillis(), null, null, null, null);
|
|
|
+ System.currentTimeMillis(), null, appContext, null, null);
|
|
|
|
|
|
initTransition = getInitTransition(numSplits);
|
|
|
localFactory = stateMachineFactory.addTransition(JobStateInternal.NEW,
|