@@ -18,30 +18,54 @@
 
 package org.apache.hadoop.mapreduce.v2.app.job.impl;
 
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 
 import junit.framework.Assert;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapTaskAttemptImpl;
+import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletion;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.MRApp;
+import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.SystemClock;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 
 @SuppressWarnings("unchecked")
 public class TestTaskAttempt{
@@ -57,6 +81,64 @@ public class TestTaskAttempt{
     MRApp app = new FailingAttemptsMRApp(0, 1);
     testMRAppHistory(app);
   }
+
+  @SuppressWarnings("rawtypes")
+  @Test
+  public void testHostResolveAttempt() throws Exception {
+    TaskAttemptImpl.RequestContainerTransition rct =
+        new TaskAttemptImpl.RequestContainerTransition(false);
+
+    EventHandler eventHandler = mock(EventHandler.class);
+    String[] hosts = new String[3];
+    hosts[0] = "192.168.1.1";
+    hosts[1] = "host2";
+    hosts[2] = "host3";
+    TaskSplitMetaInfo splitInfo =
+        new TaskSplitMetaInfo(hosts, 0, 128 * 1024 * 1024l);
+
+    TaskAttemptImpl mockTaskAttempt =
+        createMapTaskAttemptImplForTest(eventHandler, splitInfo);
+    TaskAttemptImpl spyTa = spy(mockTaskAttempt);
+    when(spyTa.resolveHost(hosts[0])).thenReturn("host1");
+
+    TaskAttemptEvent mockTAEvent = mock(TaskAttemptEvent.class);
+    rct.transition(spyTa, mockTAEvent);
+    verify(spyTa).resolveHost(hosts[0]);
+    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+    verify(eventHandler, times(2)).handle(arg.capture());
+    if (!(arg.getAllValues().get(1) instanceof ContainerRequestEvent)) {
+      Assert.fail("Second Event not of type ContainerRequestEvent");
+    }
+    Map<String, Boolean> expected = new HashMap<String, Boolean>();
+    expected.put("host1", true);
+    expected.put("host2", true);
+    expected.put("host3", true);
+    ContainerRequestEvent cre =
+        (ContainerRequestEvent) arg.getAllValues().get(1);
+    String[] requestedHosts = cre.getHosts();
+    for (String h : requestedHosts) {
+      expected.remove(h);
+    }
+    assertEquals(0, expected.size());
+  }
+
+  @SuppressWarnings("rawtypes")
+  private TaskAttemptImpl createMapTaskAttemptImplForTest(
+      EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo) {
+    ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
+    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+    TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+    Path jobFile = mock(Path.class);
+    JobConf jobConf = new JobConf();
+    OutputCommitter outputCommitter = mock(OutputCommitter.class);
+    Clock clock = new SystemClock();
+    TaskAttemptImpl taImpl =
+        new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
+            taskSplitMetaInfo, jobConf, taListener, outputCommitter, null,
+            null, clock);
+    return taImpl;
+  }
 
   private void testMRAppHistory(MRApp app) throws Exception {
     Configuration conf = new Configuration();