|
@@ -18,14 +18,19 @@
|
|
|
|
|
|
package org.apache.hadoop.mapreduce.v2.app;
|
|
|
|
|
|
+import static org.junit.Assert.assertEquals;
|
|
|
+
|
|
|
+import java.util.ArrayList;
|
|
|
import java.util.Arrays;
|
|
|
import java.util.Iterator;
|
|
|
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.mapreduce.Counters;
|
|
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
|
|
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
|
|
|
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
|
|
|
+import org.apache.hadoop.mapreduce.v2.api.records.Phase;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
|
@@ -37,6 +42,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptFetchFailureEvent;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
|
|
|
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
|
|
|
import org.apache.hadoop.yarn.event.EventHandler;
|
|
|
import org.junit.Assert;
|
|
|
import org.junit.Test;
|
|
@@ -254,6 +260,169 @@ public class TestFetchFailure {
|
|
|
events = job.getTaskAttemptCompletionEvents(0, 100);
|
|
|
Assert.assertEquals("Num completion events not correct", 2, events.length);
|
|
|
}
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testFetchFailureMultipleReduces() throws Exception {
|
|
|
+ MRApp app = new MRApp(1, 3, false, this.getClass().getName(), true);
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ // map -> reduce -> fetch-failure -> map retry is incompatible with
|
|
|
+ // sequential, single-task-attempt approach in uber-AM, so disable:
|
|
|
+ conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
|
|
|
+ Job job = app.submit(conf);
|
|
|
+ app.waitForState(job, JobState.RUNNING);
|
|
|
+ //all maps would be running
|
|
|
+ Assert.assertEquals("Num tasks not correct",
|
|
|
+ 4, job.getTasks().size());
|
|
|
+ Iterator<Task> it = job.getTasks().values().iterator();
|
|
|
+ Task mapTask = it.next();
|
|
|
+ Task reduceTask = it.next();
|
|
|
+ Task reduceTask2 = it.next();
|
|
|
+ Task reduceTask3 = it.next();
|
|
|
+
|
|
|
+ //wait for Task state move to RUNNING
|
|
|
+ app.waitForState(mapTask, TaskState.RUNNING);
|
|
|
+ TaskAttempt mapAttempt1 = mapTask.getAttempts().values().iterator().next();
|
|
|
+ app.waitForState(mapAttempt1, TaskAttemptState.RUNNING);
|
|
|
+
|
|
|
+ //send the done signal to the map attempt
|
|
|
+ app.getContext().getEventHandler().handle(
|
|
|
+ new TaskAttemptEvent(mapAttempt1.getID(),
|
|
|
+ TaskAttemptEventType.TA_DONE));
|
|
|
+
|
|
|
+ // wait for map success
|
|
|
+ app.waitForState(mapTask, TaskState.SUCCEEDED);
|
|
|
+
|
|
|
+ TaskAttemptCompletionEvent[] events =
|
|
|
+ job.getTaskAttemptCompletionEvents(0, 100);
|
|
|
+ Assert.assertEquals("Num completion events not correct",
|
|
|
+ 1, events.length);
|
|
|
+ Assert.assertEquals("Event status not correct",
|
|
|
+ TaskAttemptCompletionEventStatus.SUCCEEDED, events[0].getStatus());
|
|
|
+
|
|
|
+ // wait for reduce to start running
|
|
|
+ app.waitForState(reduceTask, TaskState.RUNNING);
|
|
|
+ app.waitForState(reduceTask2, TaskState.RUNNING);
|
|
|
+ app.waitForState(reduceTask3, TaskState.RUNNING);
|
|
|
+ TaskAttempt reduceAttempt =
|
|
|
+ reduceTask.getAttempts().values().iterator().next();
|
|
|
+ app.waitForState(reduceAttempt, TaskAttemptState.RUNNING);
|
|
|
+
|
|
|
+ updateStatus(app, reduceAttempt, Phase.SHUFFLE);
|
|
|
+
|
|
|
+ TaskAttempt reduceAttempt2 =
|
|
|
+ reduceTask2.getAttempts().values().iterator().next();
|
|
|
+ app.waitForState(reduceAttempt2, TaskAttemptState.RUNNING);
|
|
|
+ updateStatus(app, reduceAttempt2, Phase.SHUFFLE);
|
|
|
+
|
|
|
+ TaskAttempt reduceAttempt3 =
|
|
|
+ reduceTask3.getAttempts().values().iterator().next();
|
|
|
+ app.waitForState(reduceAttempt3, TaskAttemptState.RUNNING);
|
|
|
+ updateStatus(app, reduceAttempt3, Phase.SHUFFLE);
|
|
|
+
|
|
|
+ //send 3 fetch failures from reduce to trigger map re execution
|
|
|
+ sendFetchFailure(app, reduceAttempt, mapAttempt1);
|
|
|
+ sendFetchFailure(app, reduceAttempt, mapAttempt1);
|
|
|
+ sendFetchFailure(app, reduceAttempt, mapAttempt1);
|
|
|
+
|
|
|
+ //We should not re-launch the map task yet
|
|
|
+ assertEquals(TaskState.SUCCEEDED, mapTask.getState());
|
|
|
+ updateStatus(app, reduceAttempt2, Phase.REDUCE);
|
|
|
+ updateStatus(app, reduceAttempt3, Phase.REDUCE);
|
|
|
+
|
|
|
+ sendFetchFailure(app, reduceAttempt, mapAttempt1);
|
|
|
+
|
|
|
+ //wait for map Task state move back to RUNNING
|
|
|
+ app.waitForState(mapTask, TaskState.RUNNING);
|
|
|
+
|
|
|
+ //map attempt must have become FAILED
|
|
|
+ Assert.assertEquals("Map TaskAttempt state not correct",
|
|
|
+ TaskAttemptState.FAILED, mapAttempt1.getState());
|
|
|
+
|
|
|
+ Assert.assertEquals("Num attempts in Map Task not correct",
|
|
|
+ 2, mapTask.getAttempts().size());
|
|
|
+
|
|
|
+ Iterator<TaskAttempt> atIt = mapTask.getAttempts().values().iterator();
|
|
|
+ atIt.next();
|
|
|
+ TaskAttempt mapAttempt2 = atIt.next();
|
|
|
+
|
|
|
+ app.waitForState(mapAttempt2, TaskAttemptState.RUNNING);
|
|
|
+ //send the done signal to the second map attempt
|
|
|
+ app.getContext().getEventHandler().handle(
|
|
|
+ new TaskAttemptEvent(mapAttempt2.getID(),
|
|
|
+ TaskAttemptEventType.TA_DONE));
|
|
|
+
|
|
|
+ // wait for map success
|
|
|
+ app.waitForState(mapTask, TaskState.SUCCEEDED);
|
|
|
+
|
|
|
+ //send done to reduce
|
|
|
+ app.getContext().getEventHandler().handle(
|
|
|
+ new TaskAttemptEvent(reduceAttempt.getID(),
|
|
|
+ TaskAttemptEventType.TA_DONE));
|
|
|
+
|
|
|
+ //send done to reduce
|
|
|
+ app.getContext().getEventHandler().handle(
|
|
|
+ new TaskAttemptEvent(reduceAttempt2.getID(),
|
|
|
+ TaskAttemptEventType.TA_DONE));
|
|
|
+
|
|
|
+ //send done to reduce
|
|
|
+ app.getContext().getEventHandler().handle(
|
|
|
+ new TaskAttemptEvent(reduceAttempt3.getID(),
|
|
|
+ TaskAttemptEventType.TA_DONE));
|
|
|
+
|
|
|
+ app.waitForState(job, JobState.SUCCEEDED);
|
|
|
+
|
|
|
+ //previous completion event now becomes obsolete
|
|
|
+ Assert.assertEquals("Event status not correct",
|
|
|
+ TaskAttemptCompletionEventStatus.OBSOLETE, events[0].getStatus());
|
|
|
+
|
|
|
+ events = job.getTaskAttemptCompletionEvents(0, 100);
|
|
|
+ Assert.assertEquals("Num completion events not correct",
|
|
|
+ 6, events.length);
|
|
|
+ Assert.assertEquals("Event map attempt id not correct",
|
|
|
+ mapAttempt1.getID(), events[0].getAttemptId());
|
|
|
+ Assert.assertEquals("Event map attempt id not correct",
|
|
|
+ mapAttempt1.getID(), events[1].getAttemptId());
|
|
|
+ Assert.assertEquals("Event map attempt id not correct",
|
|
|
+ mapAttempt2.getID(), events[2].getAttemptId());
|
|
|
+ Assert.assertEquals("Event reduce attempt id not correct",
|
|
|
+ reduceAttempt.getID(), events[3].getAttemptId());
|
|
|
+ Assert.assertEquals("Event status not correct for map attempt1",
|
|
|
+ TaskAttemptCompletionEventStatus.OBSOLETE, events[0].getStatus());
|
|
|
+ Assert.assertEquals("Event status not correct for map attempt1",
|
|
|
+ TaskAttemptCompletionEventStatus.FAILED, events[1].getStatus());
|
|
|
+ Assert.assertEquals("Event status not correct for map attempt2",
|
|
|
+ TaskAttemptCompletionEventStatus.SUCCEEDED, events[2].getStatus());
|
|
|
+ Assert.assertEquals("Event status not correct for reduce attempt1",
|
|
|
+ TaskAttemptCompletionEventStatus.SUCCEEDED, events[3].getStatus());
|
|
|
+
|
|
|
+ TaskAttemptCompletionEvent mapEvents[] =
|
|
|
+ job.getMapAttemptCompletionEvents(0, 2);
|
|
|
+ Assert.assertEquals("Incorrect number of map events", 2, mapEvents.length);
|
|
|
+ Assert.assertArrayEquals("Unexpected map events",
|
|
|
+ Arrays.copyOfRange(events, 0, 2), mapEvents);
|
|
|
+ mapEvents = job.getMapAttemptCompletionEvents(2, 200);
|
|
|
+ Assert.assertEquals("Incorrect number of map events", 1, mapEvents.length);
|
|
|
+ Assert.assertEquals("Unexpected map event", events[2], mapEvents[0]);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ private void updateStatus(MRApp app, TaskAttempt attempt, Phase phase) {
|
|
|
+ TaskAttemptStatusUpdateEvent.TaskAttemptStatus status = new TaskAttemptStatusUpdateEvent.TaskAttemptStatus();
|
|
|
+ status.counters = new Counters();
|
|
|
+ status.fetchFailedMaps = new ArrayList<TaskAttemptId>();
|
|
|
+ status.id = attempt.getID();
|
|
|
+ status.mapFinishTime = 0;
|
|
|
+ status.outputSize = 0;
|
|
|
+ status.phase = phase;
|
|
|
+ status.progress = 0.5f;
|
|
|
+ status.shuffleFinishTime = 0;
|
|
|
+ status.sortFinishTime = 0;
|
|
|
+ status.stateString = "OK";
|
|
|
+ status.taskState = attempt.getState();
|
|
|
+ TaskAttemptStatusUpdateEvent event = new TaskAttemptStatusUpdateEvent(attempt.getID(),
|
|
|
+ status);
|
|
|
+ app.getContext().getEventHandler().handle(event);
|
|
|
+ }
|
|
|
|
|
|
private void sendFetchFailure(MRApp app, TaskAttempt reduceAttempt,
|
|
|
TaskAttempt mapAttempt) {
|