|
@@ -41,6 +41,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
|
|
import org.apache.hadoop.fs.FSDataInputStream;
|
|
|
import org.apache.hadoop.fs.FileContext;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
+import org.apache.hadoop.mapreduce.JobID;
|
|
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
|
|
import org.apache.hadoop.mapreduce.TaskID;
|
|
|
import org.apache.hadoop.mapreduce.TypeConverter;
|
|
@@ -52,7 +53,9 @@ import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
|
|
|
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
|
|
|
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
|
|
|
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
|
|
|
+import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent;
|
|
|
import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
|
|
|
+import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
|
@@ -715,4 +718,40 @@ public class TestJobHistoryParsing {
|
|
|
assertNull(test.getAMInfos());
|
|
|
|
|
|
}
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testMultipleFailedTasks() throws Exception {
|
|
|
+ JobHistoryParser parser =
|
|
|
+ new JobHistoryParser(Mockito.mock(FSDataInputStream.class));
|
|
|
+ EventReader reader = Mockito.mock(EventReader.class);
|
|
|
+ final AtomicInteger numEventsRead = new AtomicInteger(0); // Hack!
|
|
|
+ final org.apache.hadoop.mapreduce.TaskType taskType =
|
|
|
+ org.apache.hadoop.mapreduce.TaskType.MAP;
|
|
|
+ final TaskID[] tids = new TaskID[2];
|
|
|
+ JobID jid = new JobID("1", 1);
|
|
|
+ tids[0] = new TaskID(jid, taskType, 0);
|
|
|
+ tids[1] = new TaskID(jid, taskType, 1);
|
|
|
+ Mockito.when(reader.getNextEvent()).thenAnswer(
|
|
|
+ new Answer<HistoryEvent>() {
|
|
|
+ public HistoryEvent answer(InvocationOnMock invocation)
|
|
|
+ throws IOException {
|
|
|
+ // send two task start and two task fail events for tasks 0 and 1
|
|
|
+ int eventId = numEventsRead.getAndIncrement();
|
|
|
+ TaskID tid = tids[eventId & 0x1];
|
|
|
+ if (eventId < 2) {
|
|
|
+ return new TaskStartedEvent(tid, 0, taskType, "");
|
|
|
+ }
|
|
|
+ if (eventId < 4) {
|
|
|
+ TaskFailedEvent tfe = new TaskFailedEvent(tid, 0, taskType,
|
|
|
+ "failed", "FAILED", null);
|
|
|
+ tfe.setDatum(tfe.getDatum());
|
|
|
+ return tfe;
|
|
|
+ }
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ });
|
|
|
+ JobInfo info = parser.parse(reader);
|
|
|
+ assertTrue("Task 0 not implicated",
|
|
|
+ info.getErrorInfo().contains(tids[0].toString()));
|
|
|
+ }
|
|
|
}
|