|
@@ -49,6 +49,7 @@ import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
|
|
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.MRApp;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
|
@@ -402,6 +403,63 @@ public class TestJobHistoryParsing {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ @Test
|
|
|
+ public void testCountersForFailedTask() throws Exception {
|
|
|
+ LOG.info("STARTING testCountersForFailedTask");
|
|
|
+ try {
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ conf
|
|
|
+ .setClass(
|
|
|
+ CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
|
|
|
+ MyResolver.class, DNSToSwitchMapping.class);
|
|
|
+ RackResolver.init(conf);
|
|
|
+ MRApp app = new MRAppWithHistoryWithFailedTask(2, 1, true,
|
|
|
+ this.getClass().getName(), true);
|
|
|
+ app.submit(conf);
|
|
|
+ Job job = app.getContext().getAllJobs().values().iterator().next();
|
|
|
+ JobId jobId = job.getID();
|
|
|
+ app.waitForState(job, JobState.FAILED);
|
|
|
+
|
|
|
+ // make sure all events are flushed
|
|
|
+ app.waitForState(Service.STATE.STOPPED);
|
|
|
+
|
|
|
+ String jobhistoryDir = JobHistoryUtils
|
|
|
+ .getHistoryIntermediateDoneDirForUser(conf);
|
|
|
+ JobHistory jobHistory = new JobHistory();
|
|
|
+ jobHistory.init(conf);
|
|
|
+
|
|
|
+ JobIndexInfo jobIndexInfo = jobHistory.getJobFileInfo(jobId)
|
|
|
+ .getJobIndexInfo();
|
|
|
+ String jobhistoryFileName = FileNameIndexUtils
|
|
|
+ .getDoneFileName(jobIndexInfo);
|
|
|
+
|
|
|
+ Path historyFilePath = new Path(jobhistoryDir, jobhistoryFileName);
|
|
|
+ FSDataInputStream in = null;
|
|
|
+ FileContext fc = null;
|
|
|
+ try {
|
|
|
+ fc = FileContext.getFileContext(conf);
|
|
|
+ in = fc.open(fc.makeQualified(historyFilePath));
|
|
|
+ } catch (IOException ioe) {
|
|
|
+ LOG.info("Can not open history file: " + historyFilePath, ioe);
|
|
|
+ throw (new Exception("Can not open History File"));
|
|
|
+ }
|
|
|
+
|
|
|
+ JobHistoryParser parser = new JobHistoryParser(in);
|
|
|
+ JobInfo jobInfo = parser.parse();
|
|
|
+ Exception parseException = parser.getParseException();
|
|
|
+ Assert.assertNull("Caught an expected exception " + parseException,
|
|
|
+ parseException);
|
|
|
+ for (Map.Entry<TaskID,TaskInfo> entry : jobInfo.getAllTasks().entrySet()) {
|
|
|
+ TaskId yarnTaskID = TypeConverter.toYarn(entry.getKey());
|
|
|
+ CompletedTask ct = new CompletedTask(yarnTaskID, entry.getValue());
|
|
|
+ Assert.assertNotNull("completed task report has null counters",
|
|
|
+ ct.getReport().getCounters());
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ LOG.info("FINISHED testCountersForFailedTask");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
static class MRAppWithHistoryWithFailedAttempt extends MRAppWithHistory {
|
|
|
|
|
|
public MRAppWithHistoryWithFailedAttempt(int maps, int reduces, boolean autoComplete,
|
|
@@ -422,6 +480,26 @@ public class TestJobHistoryParsing {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ static class MRAppWithHistoryWithFailedTask extends MRAppWithHistory {
|
|
|
+
|
|
|
+ public MRAppWithHistoryWithFailedTask(int maps, int reduces, boolean autoComplete,
|
|
|
+ String testName, boolean cleanOnStart) {
|
|
|
+ super(maps, reduces, autoComplete, testName, cleanOnStart);
|
|
|
+ }
|
|
|
+
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ @Override
|
|
|
+ protected void attemptLaunched(TaskAttemptId attemptID) {
|
|
|
+ if (attemptID.getTaskId().getId() == 0) {
|
|
|
+ getContext().getEventHandler().handle(
|
|
|
+ new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG));
|
|
|
+ } else {
|
|
|
+ getContext().getEventHandler().handle(
|
|
|
+ new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_DONE));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
public static void main(String[] args) throws Exception {
|
|
|
TestJobHistoryParsing t = new TestJobHistoryParsing();
|
|
|
t.testHistoryParsing();
|