|
@@ -322,19 +322,19 @@ public class TestFetchFailure {
|
|
|
reduceTask3.getAttempts().values().iterator().next();
|
|
|
app.waitForState(reduceAttempt3, TaskAttemptState.RUNNING);
|
|
|
updateStatus(app, reduceAttempt3, Phase.SHUFFLE);
|
|
|
-
|
|
|
- //send 3 fetch failures from reduce to trigger map re execution
|
|
|
- sendFetchFailure(app, reduceAttempt, mapAttempt1);
|
|
|
+
|
|
|
+ //send 2 fetch failures from reduce to prepare for map re execution
|
|
|
sendFetchFailure(app, reduceAttempt, mapAttempt1);
|
|
|
sendFetchFailure(app, reduceAttempt, mapAttempt1);
|
|
|
-
|
|
|
+
|
|
|
//We should not re-launch the map task yet
|
|
|
assertEquals(TaskState.SUCCEEDED, mapTask.getState());
|
|
|
updateStatus(app, reduceAttempt2, Phase.REDUCE);
|
|
|
updateStatus(app, reduceAttempt3, Phase.REDUCE);
|
|
|
-
|
|
|
+
|
|
|
+ //send 3rd fetch failures from reduce to trigger map re execution
|
|
|
sendFetchFailure(app, reduceAttempt, mapAttempt1);
|
|
|
-
|
|
|
+
|
|
|
//wait for map Task state move back to RUNNING
|
|
|
app.waitForState(mapTask, TaskState.RUNNING);
|
|
|
|