@@ -25,6 +25,8 @@ import junit.framework.Assert;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus;
@@ -37,6 +39,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptFetchFailureEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.junit.Test;
 
 public class TestFetchFailure {
@@ -142,6 +145,107 @@ public class TestFetchFailure {
     Assert.assertEquals("Event status not correct for reduce attempt1",
         TaskAttemptCompletionEventStatus.SUCCEEDED, events[3].getStatus());
   }
+
+  /**
+   * This tests that if a map attempt was failed (say due to fetch failures),
+   * then it gets re-run. When the next map attempt is running, if the AM dies,
+   * then, on AM re-run, the AM does not incorrectly remember the first failed
+   * attempt. Currently recovery does not recover running tasks. Effectively,
+   * the AM re-runs the maps from scratch.
+   */
+  @Test
+  public void testFetchFailureWithRecovery() throws Exception {
+    int runCount = 0;
+    MRApp app = new MRAppWithHistory(1, 1, false, this.getClass().getName(), true, ++runCount);
+    Configuration conf = new Configuration();
+    // map -> reduce -> fetch-failure -> map retry is incompatible with
+    // sequential, single-task-attempt approach in uber-AM, so disable:
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    //all maps would be running
+    Assert.assertEquals("Num tasks not correct",
+        2, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task mapTask = it.next();
+    Task reduceTask = it.next();
+
+    //wait for Task state move to RUNNING
+    app.waitForState(mapTask, TaskState.RUNNING);
+    TaskAttempt mapAttempt1 = mapTask.getAttempts().values().iterator().next();
+    app.waitForState(mapAttempt1, TaskAttemptState.RUNNING);
+
+    //send the done signal to the map attempt
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(mapAttempt1.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    // wait for map success
+    app.waitForState(mapTask, TaskState.SUCCEEDED);
+
+    TaskAttemptCompletionEvent[] events =
+        job.getTaskAttemptCompletionEvents(0, 100);
+    Assert.assertEquals("Num completion events not correct",
+        1, events.length);
+    Assert.assertEquals("Event status not correct",
+        TaskAttemptCompletionEventStatus.SUCCEEDED, events[0].getStatus());
+
+    // wait for reduce to start running
+    app.waitForState(reduceTask, TaskState.RUNNING);
+    TaskAttempt reduceAttempt =
+        reduceTask.getAttempts().values().iterator().next();
+    app.waitForState(reduceAttempt, TaskAttemptState.RUNNING);
+
+    //send 3 fetch failures from reduce to trigger map re execution
+    sendFetchFailure(app, reduceAttempt, mapAttempt1);
+    sendFetchFailure(app, reduceAttempt, mapAttempt1);
+    sendFetchFailure(app, reduceAttempt, mapAttempt1);
+
+    //wait for map Task state move back to RUNNING
+    app.waitForState(mapTask, TaskState.RUNNING);
+
+    // Crash the app again.
+    app.stop();
+
+    //rerun
+    app =
+        new MRAppWithHistory(1, 1, false, this.getClass().getName(), false,
+            ++runCount);
+    conf = new Configuration();
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    //all maps would be running
+    Assert.assertEquals("Num tasks not correct",
+        2, job.getTasks().size());
+    it = job.getTasks().values().iterator();
+    mapTask = it.next();
+    reduceTask = it.next();
+
+    // the map is not in a SUCCEEDED state after restart of AM
+    app.waitForState(mapTask, TaskState.RUNNING);
+    mapAttempt1 = mapTask.getAttempts().values().iterator().next();
+    app.waitForState(mapAttempt1, TaskAttemptState.RUNNING);
+
+    //send the done signal to the map attempt
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(mapAttempt1.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    // wait for map success
+    app.waitForState(mapTask, TaskState.SUCCEEDED);
+
+    reduceAttempt = reduceTask.getAttempts().values().iterator().next();
+    //send done to reduce
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(reduceAttempt.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    app.waitForState(job, JobState.SUCCEEDED);
+    events = job.getTaskAttemptCompletionEvents(0, 100);
+    Assert.assertEquals("Num completion events not correct", 2, events.length);
+  }
 
   private void sendFetchFailure(MRApp app, TaskAttempt reduceAttempt,
       TaskAttempt mapAttempt) {
@@ -150,4 +254,20 @@ public class TestFetchFailure {
         reduceAttempt.getID(),
         Arrays.asList(new TaskAttemptId[] {mapAttempt.getID()})));
   }
+
+  static class MRAppWithHistory extends MRApp {
+    public MRAppWithHistory(int maps, int reduces, boolean autoComplete,
+        String testName, boolean cleanOnStart, int startCount) {
+      super(maps, reduces, autoComplete, testName, cleanOnStart, startCount);
+    }
+
+    @Override
+    protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
+        AppContext context) {
+      JobHistoryEventHandler eventHandler = new JobHistoryEventHandler(context,
+          getStartCount());
+      return eventHandler;
+    }
+  }
+
 }