|
@@ -19,6 +19,7 @@
|
|
|
package org.apache.hadoop.mapreduce.v2.app.job.impl;
|
|
|
|
|
|
import static org.junit.Assert.assertEquals;
|
|
|
+import static org.junit.Assert.assertFalse;
|
|
|
import static org.mockito.Mockito.mock;
|
|
|
import static org.mockito.Mockito.spy;
|
|
|
import static org.mockito.Mockito.times;
|
|
@@ -68,6 +69,9 @@ import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.job.Task;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
|
|
|
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
|
|
|
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
|
|
|
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
|
|
@@ -81,9 +85,12 @@ import org.apache.hadoop.yarn.Clock;
|
|
|
import org.apache.hadoop.yarn.ClusterInfo;
|
|
|
import org.apache.hadoop.yarn.SystemClock;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
|
|
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
+import org.apache.hadoop.yarn.api.records.Container;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
|
|
+import org.apache.hadoop.yarn.api.records.NodeId;
|
|
|
import org.apache.hadoop.yarn.api.records.Resource;
|
|
|
import org.apache.hadoop.yarn.event.Event;
|
|
|
import org.apache.hadoop.yarn.event.EventHandler;
|
|
@@ -91,12 +98,16 @@ import org.apache.hadoop.yarn.util.BuilderUtils;
|
|
|
import org.junit.Test;
|
|
|
import org.mockito.ArgumentCaptor;
|
|
|
|
|
|
-@SuppressWarnings("unchecked")
|
|
|
+@SuppressWarnings({"unchecked", "rawtypes"})
|
|
|
public class TestTaskAttempt{
|
|
|
-
|
|
|
- @SuppressWarnings("rawtypes")
|
|
|
@Test
|
|
|
public void testAttemptContainerRequest() throws Exception {
|
|
|
+ //WARNING: This test must run first. This is because there is an
|
|
|
+ // optimization where the credentials passed in are cached statically so
|
|
|
+ // they do not need to be recomputed when creating a new
|
|
|
+ // ContainerLaunchContext. if other tests run first this code will cache
|
|
|
+ // their credentials and this test will fail trying to look for the
|
|
|
+ // credentials it inserted in.
|
|
|
final Text SECRET_KEY_ALIAS = new Text("secretkeyalias");
|
|
|
final byte[] SECRET_KEY = ("secretkey").getBytes();
|
|
|
Map<ApplicationAccessType, String> acls =
|
|
@@ -125,7 +136,7 @@ public class TestTaskAttempt{
|
|
|
Token<JobTokenIdentifier> jobToken = new Token<JobTokenIdentifier>(
|
|
|
("tokenid").getBytes(), ("tokenpw").getBytes(),
|
|
|
new Text("tokenkind"), new Text("tokenservice"));
|
|
|
-
|
|
|
+
|
|
|
TaskAttemptImpl taImpl =
|
|
|
new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
|
|
|
mock(TaskSplitMetaInfo.class), jobConf, taListener,
|
|
@@ -134,7 +145,7 @@ public class TestTaskAttempt{
|
|
|
|
|
|
jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, taImpl.getID().toString());
|
|
|
ContainerId containerId = BuilderUtils.newContainerId(1, 1, 1, 1);
|
|
|
-
|
|
|
+
|
|
|
ContainerLaunchContext launchCtx =
|
|
|
TaskAttemptImpl.createContainerLaunchContext(acls, containerId,
|
|
|
jobConf, jobToken, taImpl.createRemoteTask(),
|
|
@@ -185,7 +196,6 @@ public class TestTaskAttempt{
|
|
|
testMRAppHistory(app);
|
|
|
}
|
|
|
|
|
|
- @SuppressWarnings("rawtypes")
|
|
|
@Test
|
|
|
public void testSingleRackRequest() throws Exception {
|
|
|
TaskAttemptImpl.RequestContainerTransition rct =
|
|
@@ -213,11 +223,10 @@ public class TestTaskAttempt{
|
|
|
ContainerRequestEvent cre =
|
|
|
(ContainerRequestEvent) arg.getAllValues().get(1);
|
|
|
String[] requestedRacks = cre.getRacks();
|
|
|
- //Only a single occurance of /DefaultRack
|
|
|
+ //Only a single occurrence of /DefaultRack
|
|
|
assertEquals(1, requestedRacks.length);
|
|
|
}
|
|
|
|
|
|
- @SuppressWarnings("rawtypes")
|
|
|
@Test
|
|
|
public void testHostResolveAttempt() throws Exception {
|
|
|
TaskAttemptImpl.RequestContainerTransition rct =
|
|
@@ -316,14 +325,12 @@ public class TestTaskAttempt{
|
|
|
.getValue());
|
|
|
}
|
|
|
|
|
|
- @SuppressWarnings("rawtypes")
|
|
|
private TaskAttemptImpl createMapTaskAttemptImplForTest(
|
|
|
EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo) {
|
|
|
Clock clock = new SystemClock();
|
|
|
return createMapTaskAttemptImplForTest(eventHandler, taskSplitMetaInfo, clock);
|
|
|
}
|
|
|
|
|
|
- @SuppressWarnings("rawtypes")
|
|
|
private TaskAttemptImpl createMapTaskAttemptImplForTest(
|
|
|
EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo, Clock clock) {
|
|
|
ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
|
|
@@ -394,4 +401,67 @@ public class TestTaskAttempt{
|
|
|
};
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testLaunchFailedWhileKilling() throws Exception {
|
|
|
+ ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
|
|
|
+ ApplicationAttemptId appAttemptId =
|
|
|
+ BuilderUtils.newApplicationAttemptId(appId, 0);
|
|
|
+ JobId jobId = MRBuilderUtils.newJobId(appId, 1);
|
|
|
+ TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
|
|
|
+ TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
|
|
|
+ Path jobFile = mock(Path.class);
|
|
|
+
|
|
|
+ MockEventHandler eventHandler = new MockEventHandler();
|
|
|
+ TaskAttemptListener taListener = mock(TaskAttemptListener.class);
|
|
|
+ when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));
|
|
|
+
|
|
|
+ JobConf jobConf = new JobConf();
|
|
|
+ jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
|
|
|
+ jobConf.setBoolean("fs.file.impl.disable.cache", true);
|
|
|
+ jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
|
|
|
+ jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
|
|
|
+
|
|
|
+ TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
|
|
|
+ when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});
|
|
|
+
|
|
|
+ TaskAttemptImpl taImpl =
|
|
|
+ new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
|
|
|
+ splits, jobConf, taListener,
|
|
|
+ mock(OutputCommitter.class), mock(Token.class), new Credentials(),
|
|
|
+ new SystemClock(), null);
|
|
|
+
|
|
|
+ NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
|
|
|
+ ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
|
|
|
+ Container container = mock(Container.class);
|
|
|
+ when(container.getId()).thenReturn(contId);
|
|
|
+ when(container.getNodeId()).thenReturn(nid);
|
|
|
+
|
|
|
+ taImpl.handle(new TaskAttemptEvent(attemptId,
|
|
|
+ TaskAttemptEventType.TA_SCHEDULE));
|
|
|
+ taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
|
|
|
+ container, mock(Map.class)));
|
|
|
+ taImpl.handle(new TaskAttemptEvent(attemptId,
|
|
|
+ TaskAttemptEventType.TA_KILL));
|
|
|
+ taImpl.handle(new TaskAttemptEvent(attemptId,
|
|
|
+ TaskAttemptEventType.TA_CONTAINER_CLEANED));
|
|
|
+ taImpl.handle(new TaskAttemptEvent(attemptId,
|
|
|
+ TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED));
|
|
|
+ assertFalse(eventHandler.internalError);
|
|
|
+ }
|
|
|
+
|
|
|
+ public static class MockEventHandler implements EventHandler {
|
|
|
+ public boolean internalError;
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void handle(Event event) {
|
|
|
+ if (event instanceof JobEvent) {
|
|
|
+ JobEvent je = ((JobEvent) event);
|
|
|
+ if (JobEventType.INTERNAL_ERROR == je.getType()) {
|
|
|
+ internalError = true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ };
|
|
|
}
|