|
@@ -564,33 +564,13 @@ public class TestJobImpl {
|
|
|
dispatcher.register(TaskAttemptEventType.class, taskAttemptEventHandler);
|
|
|
|
|
|
// replace the tasks with spied versions to return the right attempts
|
|
|
- Map<TaskId,Task> spiedTasks = new HashMap<TaskId,Task>();
|
|
|
- List<NodeReport> nodeReports = new ArrayList<NodeReport>();
|
|
|
- Map<NodeReport,TaskId> nodeReportsToTaskIds =
|
|
|
- new HashMap<NodeReport,TaskId>();
|
|
|
- for (Map.Entry<TaskId,Task> e: job.tasks.entrySet()) {
|
|
|
- TaskId taskId = e.getKey();
|
|
|
- Task task = e.getValue();
|
|
|
- if (taskId.getTaskType() == TaskType.MAP) {
|
|
|
- // add an attempt to the task to simulate nodes
|
|
|
- NodeId nodeId = mock(NodeId.class);
|
|
|
- TaskAttempt attempt = mock(TaskAttempt.class);
|
|
|
- when(attempt.getNodeId()).thenReturn(nodeId);
|
|
|
- TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
|
|
|
- when(attempt.getID()).thenReturn(attemptId);
|
|
|
- // create a spied task
|
|
|
- Task spied = spy(task);
|
|
|
- doReturn(attempt).when(spied).getAttempt(any(TaskAttemptId.class));
|
|
|
- spiedTasks.put(taskId, spied);
|
|
|
+ Map<TaskId, Task> spiedTasks = new HashMap<>();
|
|
|
+ List<NodeReport> nodeReports = new ArrayList<>();
|
|
|
+ Map<NodeReport, TaskId> nodeReportsToTaskIds = new HashMap<>();
|
|
|
+
|
|
|
+ createSpiedMapTasks(nodeReportsToTaskIds, spiedTasks, job,
|
|
|
+ NodeState.UNHEALTHY, nodeReports);
|
|
|
|
|
|
- // create a NodeReport based on the node id
|
|
|
- NodeReport report = mock(NodeReport.class);
|
|
|
- when(report.getNodeState()).thenReturn(NodeState.UNHEALTHY);
|
|
|
- when(report.getNodeId()).thenReturn(nodeId);
|
|
|
- nodeReports.add(report);
|
|
|
- nodeReportsToTaskIds.put(report, taskId);
|
|
|
- }
|
|
|
- }
|
|
|
// replace the tasks with the spied tasks
|
|
|
job.tasks.putAll(spiedTasks);
|
|
|
|
|
@@ -641,6 +621,82 @@ public class TestJobImpl {
|
|
|
commitHandler.stop();
|
|
|
}
|
|
|
|
|
|
+ @Test
|
|
|
+ public void testJobNCompletedWhenAllReducersAreFinished()
|
|
|
+ throws Exception {
|
|
|
+ testJobCompletionWhenReducersAreFinished(true);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testJobNotCompletedWhenAllReducersAreFinished()
|
|
|
+ throws Exception {
|
|
|
+ testJobCompletionWhenReducersAreFinished(false);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void testJobCompletionWhenReducersAreFinished(boolean killMappers)
|
|
|
+ throws InterruptedException, BrokenBarrierException {
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ conf.setBoolean(MRJobConfig.FINISH_JOB_WHEN_REDUCERS_DONE, killMappers);
|
|
|
+ conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
|
|
|
+ conf.setInt(MRJobConfig.NUM_REDUCES, 1);
|
|
|
+ DrainDispatcher dispatcher = new DrainDispatcher();
|
|
|
+ dispatcher.init(conf);
|
|
|
+ final List<TaskEvent> killedEvents =
|
|
|
+ Collections.synchronizedList(new ArrayList<TaskEvent>());
|
|
|
+ dispatcher.register(TaskEventType.class, new EventHandler<TaskEvent>() {
|
|
|
+ @Override
|
|
|
+ public void handle(TaskEvent event) {
|
|
|
+ if (event.getType() == TaskEventType.T_KILL) {
|
|
|
+ killedEvents.add(event);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+ dispatcher.start();
|
|
|
+ CyclicBarrier syncBarrier = new CyclicBarrier(2);
|
|
|
+ OutputCommitter committer = new TestingOutputCommitter(syncBarrier, true);
|
|
|
+ CommitterEventHandler commitHandler =
|
|
|
+ createCommitterEventHandler(dispatcher, committer);
|
|
|
+ commitHandler.init(conf);
|
|
|
+ commitHandler.start();
|
|
|
+
|
|
|
+ final JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, null);
|
|
|
+
|
|
|
+ // replace the tasks with spied versions to return the right attempts
|
|
|
+ Map<TaskId, Task> spiedTasks = new HashMap<>();
|
|
|
+ List<NodeReport> nodeReports = new ArrayList<>();
|
|
|
+ Map<NodeReport, TaskId> nodeReportsToTaskIds = new HashMap<>();
|
|
|
+
|
|
|
+ createSpiedMapTasks(nodeReportsToTaskIds, spiedTasks, job,
|
|
|
+ NodeState.RUNNING, nodeReports);
|
|
|
+
|
|
|
+ // replace the tasks with the spied tasks
|
|
|
+ job.tasks.putAll(spiedTasks);
|
|
|
+
|
|
|
+ // finish reducer
|
|
|
+ for (TaskId taskId: job.tasks.keySet()) {
|
|
|
+ if (taskId.getTaskType() == TaskType.REDUCE) {
|
|
|
+ job.handle(new JobTaskEvent(taskId, TaskState.SUCCEEDED));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ dispatcher.await();
|
|
|
+
|
|
|
+ /*
|
|
|
+ * StubbedJob cannot finish in this test - we'd have to generate the
|
|
|
+ * necessary events in this test manually, but that wouldn't add too
|
|
|
+ * much value. Instead, we validate the T_KILL events.
|
|
|
+ */
|
|
|
+ if (killMappers) {
|
|
|
+ Assert.assertEquals("Number of killed events", 2, killedEvents.size());
|
|
|
+ Assert.assertEquals("AttemptID", "task_1234567890000_0001_m_000000",
|
|
|
+ killedEvents.get(0).getTaskID().toString());
|
|
|
+ Assert.assertEquals("AttemptID", "task_1234567890000_0001_m_000001",
|
|
|
+ killedEvents.get(1).getTaskID().toString());
|
|
|
+ } else {
|
|
|
+ Assert.assertEquals("Number of killed events", 0, killedEvents.size());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
public static void main(String[] args) throws Exception {
|
|
|
TestJobImpl t = new TestJobImpl();
|
|
|
t.testJobNoTasks();
|
|
@@ -1021,6 +1077,37 @@ public class TestJobImpl {
|
|
|
Assert.assertEquals(state, job.getInternalState());
|
|
|
}
|
|
|
|
|
|
+ private void createSpiedMapTasks(Map<NodeReport, TaskId>
|
|
|
+ nodeReportsToTaskIds, Map<TaskId, Task> spiedTasks, JobImpl job,
|
|
|
+ NodeState nodeState, List<NodeReport> nodeReports) {
|
|
|
+ for (Map.Entry<TaskId, Task> e: job.tasks.entrySet()) {
|
|
|
+ TaskId taskId = e.getKey();
|
|
|
+ Task task = e.getValue();
|
|
|
+ if (taskId.getTaskType() == TaskType.MAP) {
|
|
|
+ // add an attempt to the task to simulate nodes
|
|
|
+ NodeId nodeId = mock(NodeId.class);
|
|
|
+ TaskAttempt attempt = mock(TaskAttempt.class);
|
|
|
+ when(attempt.getNodeId()).thenReturn(nodeId);
|
|
|
+ TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
|
|
|
+ when(attempt.getID()).thenReturn(attemptId);
|
|
|
+ // create a spied task
|
|
|
+ Task spied = spy(task);
|
|
|
+ Map<TaskAttemptId, TaskAttempt> attemptMap = new HashMap<>();
|
|
|
+ attemptMap.put(attemptId, attempt);
|
|
|
+ when(spied.getAttempts()).thenReturn(attemptMap);
|
|
|
+ doReturn(attempt).when(spied).getAttempt(any(TaskAttemptId.class));
|
|
|
+ spiedTasks.put(taskId, spied);
|
|
|
+
|
|
|
+ // create a NodeReport based on the node id
|
|
|
+ NodeReport report = mock(NodeReport.class);
|
|
|
+ when(report.getNodeState()).thenReturn(nodeState);
|
|
|
+ when(report.getNodeId()).thenReturn(nodeId);
|
|
|
+ nodeReports.add(report);
|
|
|
+ nodeReportsToTaskIds.put(report, taskId);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
private static class JobSubmittedEventHandler implements
|
|
|
EventHandler<JobHistoryEvent> {
|
|
|
|