|
@@ -18,6 +18,7 @@
|
|
|
|
|
|
package org.apache.hadoop.mapreduce.v2.app;
|
|
|
|
|
|
+import java.io.IOException;
|
|
|
import java.util.Iterator;
|
|
|
import java.util.Map;
|
|
|
|
|
@@ -36,6 +37,12 @@ import org.apache.hadoop.mapreduce.v2.app.job.Task;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
|
|
|
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
|
|
|
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
|
|
|
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl;
|
|
|
+import org.apache.hadoop.yarn.api.ContainerManager;
|
|
|
+import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
|
+import org.apache.hadoop.yarn.api.records.ContainerToken;
|
|
|
import org.junit.Test;
|
|
|
|
|
|
/**
|
|
@@ -160,6 +167,74 @@ public class TestFail {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ @Test
|
|
|
+ public void testTaskFailWithUnusedContainer() throws Exception {
|
|
|
+ MRApp app = new FailingTaskWithUnusedContainer();
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ int maxAttempts = 1;
|
|
|
+ conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, maxAttempts);
|
|
|
+ // disable uberization (requires entire job to be reattempted, so max for
|
|
|
+ // subtask attempts is overridden to 1)
|
|
|
+ conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
|
|
|
+ Job job = app.submit(conf);
|
|
|
+ app.waitForState(job, JobState.RUNNING);
|
|
|
+ Map<TaskId, Task> tasks = job.getTasks();
|
|
|
+ Assert.assertEquals("Num tasks is not correct", 1, tasks.size());
|
|
|
+ Task task = tasks.values().iterator().next();
|
|
|
+ app.waitForState(task, TaskState.SCHEDULED);
|
|
|
+ Map<TaskAttemptId, TaskAttempt> attempts = tasks.values().iterator()
|
|
|
+ .next().getAttempts();
|
|
|
+ Assert.assertEquals("Num attempts is not correct", maxAttempts, attempts
|
|
|
+ .size());
|
|
|
+ TaskAttempt attempt = attempts.values().iterator().next();
|
|
|
+ app.waitForState(attempt, TaskAttemptState.ASSIGNED);
|
|
|
+ app.getDispatcher().getEventHandler().handle(
|
|
|
+ new TaskAttemptEvent(attempt.getID(),
|
|
|
+ TaskAttemptEventType.TA_CONTAINER_COMPLETED));
|
|
|
+ app.waitForState(job, JobState.FAILED);
|
|
|
+ }
|
|
|
+
|
|
|
+ static class FailingTaskWithUnusedContainer extends MRApp {
|
|
|
+
|
|
|
+ public FailingTaskWithUnusedContainer() {
|
|
|
+ super(1, 0, false, "TaskFailWithUnsedContainer", true);
|
|
|
+ }
|
|
|
+
|
|
|
+ protected ContainerLauncher createContainerLauncher(AppContext context,
|
|
|
+ boolean isLocal) {
|
|
|
+ return new ContainerLauncherImpl(context) {
|
|
|
+ @Override
|
|
|
+ public void handle(ContainerLauncherEvent event) {
|
|
|
+
|
|
|
+ switch (event.getType()) {
|
|
|
+ case CONTAINER_REMOTE_LAUNCH:
|
|
|
+ super.handle(event);
|
|
|
+ break;
|
|
|
+ case CONTAINER_REMOTE_CLEANUP:
|
|
|
+ getContext().getEventHandler().handle(
|
|
|
+ new TaskAttemptEvent(event.getTaskAttemptID(),
|
|
|
+ TaskAttemptEventType.TA_CONTAINER_CLEANED));
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected ContainerManager getCMProxy(ContainerId containerID,
|
|
|
+ String containerManagerBindAddr, ContainerToken containerToken)
|
|
|
+ throws IOException {
|
|
|
+ try {
|
|
|
+ synchronized (this) {
|
|
|
+ wait(); // Just hang the thread simulating a very slow NM.
|
|
|
+ }
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ };
|
|
|
+ };
|
|
|
+ }
|
|
|
+
|
|
|
static class TimeOutTaskMRApp extends MRApp {
|
|
|
TimeOutTaskMRApp(int maps, int reduces) {
|
|
|
super(maps, reduces, false, "TimeOutTaskMRApp", true);
|
|
@@ -232,5 +307,6 @@ public class TestFail {
|
|
|
t.testTimedOutTask();
|
|
|
t.testMapFailureMaxPercent();
|
|
|
t.testReduceFailureMaxPercent();
|
|
|
+ t.testTaskFailWithUnusedContainer();
|
|
|
}
|
|
|
}
|