
MAPREDUCE-5933. Enabled MR AM to post history events to the timeline server. Contributed by Robert Kanter.

(cherry picked from commit 6b2f11b54bc679b0715fe66bd129e340e8c61c5c)
Zhijie Shen, 10 years ago
parent commit 733380fa70
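
With this change the MR ApplicationMaster brings up a TimelineClient inside JobHistoryEventHandler and posts each job history event to the timeline server as a TimelineEvent on a TimelineEntity. A minimal configuration sketch, mirroring what the new tests below do (that a timeline server is actually running is an assumption of the sketch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    public class TimelineConfSketch {
      public static Configuration timelineEnabledConf() {
        // Enable the timeline service so the AM's TimelineClient has a server to post to.
        Configuration conf = new YarnConfiguration();
        conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
        return conf;
      }
    }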

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -20,6 +20,9 @@ Release 2.6.0 - UNRELEASED
     MAPREDUCE-5910. Make MR AM resync with RM in case of work-preserving
     RM-restart. Contributed by Rohith
 
+    MAPREDUCE-5933. Enabled MR AM to post history events to the timeline server.
+    (Robert Kanter via zjshen)
+
   IMPROVEMENTS
 
     MAPREDUCE-5971. Move the default options for distcp -p to

+ 6 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/pom.xml

@@ -79,6 +79,12 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-tests</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>

+ 6 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEvent.java

@@ -27,7 +27,12 @@ public class JobHistoryEvent extends AbstractEvent<EventType>{
   private final HistoryEvent historyEvent;
 
   public JobHistoryEvent(JobId jobID, HistoryEvent historyEvent) {
-    super(historyEvent.getEventType());
+    this(jobID, historyEvent, System.currentTimeMillis());
+  }
+
+  public JobHistoryEvent(JobId jobID, HistoryEvent historyEvent,
+          long timestamp) {
+    super(historyEvent.getEventType(), timestamp);
     this.jobID = jobID;
     this.historyEvent = historyEvent;
   }
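
The new overload exists so callers can stamp an event with an explicit time instead of defaulting to System.currentTimeMillis(); the tests below rely on it to post events with controlled, out-of-order timestamps. A usage sketch, assuming a hypothetical jobId (JobId) and historyEvent (HistoryEvent) in scope:

    // Pin an explicit timestamp on the event:
    long tenSecondsAgo = System.currentTimeMillis() - 10000L;
    JobHistoryEvent backdated = new JobHistoryEvent(jobId, historyEvent, tenSecondsAgo);
    // The two-argument form keeps the old behavior and stamps the current time:
    JobHistoryEvent current = new JobHistoryEvent(jobId, historyEvent);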

+ 339 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java

@@ -41,7 +41,9 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TaskStatus;
 import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.CounterGroup;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -57,8 +59,16 @@ import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.node.ArrayNode;
+import org.codehaus.jackson.node.ObjectNode;
 
 /**
  * The job history events get routed to this class. This class writes the Job
@@ -108,6 +118,11 @@ public class JobHistoryEventHandler extends AbstractService
  // should job completion be forced when the AM shuts down?
   protected volatile boolean forceJobCompletion = false;
 
+  protected TimelineClient timelineClient;
+
+  private static final String MAPREDUCE_JOB_ENTITY_TYPE = "MAPREDUCE_JOB";
+  private static final String MAPREDUCE_TASK_ENTITY_TYPE = "MAPREDUCE_TASK";
+
   public JobHistoryEventHandler(AppContext context, int startCount) {
     super("JobHistoryEventHandler");
     this.context = context;
@@ -225,7 +240,10 @@ public class JobHistoryEventHandler extends AbstractService
         conf.getInt(
             MRJobConfig.MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD,
             MRJobConfig.DEFAULT_MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD);
-    
+
+    timelineClient = TimelineClient.createTimelineClient();
+    timelineClient.init(conf);
+
     super.serviceInit(conf);
   }
 
@@ -250,6 +268,7 @@ public class JobHistoryEventHandler extends AbstractService
 
   @Override
   protected void serviceStart() throws Exception {
+    timelineClient.start();
     eventHandlingThread = new Thread(new Runnable() {
       @Override
       public void run() {
@@ -372,6 +391,9 @@ public class JobHistoryEventHandler extends AbstractService
         LOG.info("Exception while closing file " + e.getMessage());
       }
     }
+    if (timelineClient != null) {
+      timelineClient.stop();
+    }
     LOG.info("Stopped JobHistoryEventHandler. super.stop()");
     super.serviceStop();
   }
@@ -515,6 +537,7 @@ public class JobHistoryEventHandler extends AbstractService
       // For all events
       // (1) Write it out
       // (2) Process it for JobSummary
+      // (3) Process it for ATS
       MetaInfo mi = fileMap.get(event.getJobID());
       try {
         HistoryEvent historyEvent = event.getHistoryEvent();
@@ -523,6 +546,8 @@ public class JobHistoryEventHandler extends AbstractService
         }
         processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary(),
             event.getJobID());
+        processEventForTimelineServer(historyEvent, event.getJobID(),
+                event.getTimestamp());
         if (LOG.isDebugEnabled()) {
           LOG.debug("In HistoryEventHandler "
               + event.getHistoryEvent().getEventType());
@@ -667,6 +692,319 @@ public class JobHistoryEventHandler extends AbstractService
     }
   }
 
+  private void processEventForTimelineServer(HistoryEvent event, JobId jobId,
+          long timestamp) {
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setEventType(event.getEventType().name().toUpperCase());
+    tEvent.setTimestamp(timestamp);
+    TimelineEntity tEntity = new TimelineEntity();
+
+    switch (event.getEventType()) {
+      case JOB_SUBMITTED:
+        JobSubmittedEvent jse =
+            (JobSubmittedEvent) event;
+        tEvent.addEventInfo("SUBMIT_TIME", jse.getSubmitTime());
+        tEvent.addEventInfo("QUEUE_NAME", jse.getJobQueueName());
+        tEvent.addEventInfo("JOB_NAME", jse.getJobName());
+        tEvent.addEventInfo("USER_NAME", jse.getUserName());
+        tEvent.addEventInfo("JOB_CONF_PATH", jse.getJobConfPath());
+        tEvent.addEventInfo("ACLS", jse.getJobAcls());
+        tEvent.addEventInfo("JOB_QUEUE_NAME", jse.getJobQueueName());
+        tEvent.addEventInfo("WORKLFOW_ID", jse.getWorkflowId());
+        tEvent.addEventInfo("WORKFLOW_NAME", jse.getWorkflowName());
+        tEvent.addEventInfo("WORKFLOW_NAME_NAME", jse.getWorkflowNodeName());
+        tEvent.addEventInfo("WORKFLOW_ADJACENCIES",
+                jse.getWorkflowAdjacencies());
+        tEvent.addEventInfo("WORKFLOW_TAGS", jse.getWorkflowTags());
+        tEntity.addEvent(tEvent);
+        tEntity.setEntityId(jobId.toString());
+        tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE);
+        break;
+      case JOB_STATUS_CHANGED:
+        JobStatusChangedEvent jsce = (JobStatusChangedEvent) event;
+        tEvent.addEventInfo("STATUS", jsce.getStatus());
+        tEntity.addEvent(tEvent);
+        tEntity.setEntityId(jobId.toString());
+        tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE);
+        break;
+      case JOB_INFO_CHANGED:
+        JobInfoChangeEvent jice = (JobInfoChangeEvent) event;
+        tEvent.addEventInfo("SUBMIT_TIME", jice.getSubmitTime());
+        tEvent.addEventInfo("LAUNCH_TIME", jice.getLaunchTime());
+        tEntity.addEvent(tEvent);
+        tEntity.setEntityId(jobId.toString());
+        tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE);
+        break;
+      case JOB_INITED:
+        JobInitedEvent jie = (JobInitedEvent) event;
+        tEvent.addEventInfo("START_TIME", jie.getLaunchTime());
+        tEvent.addEventInfo("STATUS", jie.getStatus());
+        tEvent.addEventInfo("TOTAL_MAPS", jie.getTotalMaps());
+        tEvent.addEventInfo("TOTAL_REDUCES", jie.getTotalReduces());
+        tEvent.addEventInfo("UBERIZED", jie.getUberized());
+        tEntity.setStartTime(jie.getLaunchTime());
+        tEntity.addEvent(tEvent);
+        tEntity.setEntityId(jobId.toString());
+        tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE);
+        break;
+      case JOB_PRIORITY_CHANGED:
+        JobPriorityChangeEvent jpce = (JobPriorityChangeEvent) event;
+        tEvent.addEventInfo("PRIORITY", jpce.getPriority().toString());
+        tEntity.addEvent(tEvent);
+        tEntity.setEntityId(jobId.toString());
+        tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE);
+        break;
+      case JOB_QUEUE_CHANGED:
+        JobQueueChangeEvent jqe = (JobQueueChangeEvent) event;
+        tEvent.addEventInfo("QUEUE_NAMES", jqe.getJobQueueName());
+        tEntity.addEvent(tEvent);
+        tEntity.setEntityId(jobId.toString());
+        tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE);
+        break;
+      case JOB_FAILED:
+      case JOB_KILLED:
+      case JOB_ERROR:
+        JobUnsuccessfulCompletionEvent juce =
+              (JobUnsuccessfulCompletionEvent) event;
+        tEvent.addEventInfo("FINISH_TIME", juce.getFinishTime());
+        tEvent.addEventInfo("NUM_MAPS", juce.getFinishedMaps());
+        tEvent.addEventInfo("NUM_REDUCES", juce.getFinishedReduces());
+        tEvent.addEventInfo("JOB_STATUS", juce.getStatus());
+        tEvent.addEventInfo("DIAGNOSTICS", juce.getDiagnostics());
+        tEvent.addEventInfo("FINISHED_MAPS", juce.getFinishedMaps());
+        tEvent.addEventInfo("FINISHED_REDUCES", juce.getFinishedReduces());
+        tEntity.addEvent(tEvent);
+        tEntity.setEntityId(jobId.toString());
+        tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE);
+        break;
+      case JOB_FINISHED:
+        JobFinishedEvent jfe = (JobFinishedEvent) event;
+        tEvent.addEventInfo("FINISH_TIME", jfe.getFinishTime());
+        tEvent.addEventInfo("NUM_MAPS", jfe.getFinishedMaps());
+        tEvent.addEventInfo("NUM_REDUCES", jfe.getFinishedReduces());
+        tEvent.addEventInfo("FAILED_MAPS", jfe.getFailedMaps());
+        tEvent.addEventInfo("FAILED_REDUCES", jfe.getFailedReduces());
+        tEvent.addEventInfo("FINISHED_MAPS", jfe.getFinishedMaps());
+        tEvent.addEventInfo("FINISHED_REDUCES", jfe.getFinishedReduces());
+        tEvent.addEventInfo("MAP_COUNTERS_GROUPS",
+                countersToJSON(jfe.getTotalCounters()));
+        tEvent.addEventInfo("REDUCE_COUNTERS_GROUPS",
+                countersToJSON(jfe.getReduceCounters()));
+        tEvent.addEventInfo("TOTAL_COUNTERS_GROUPS",
+                countersToJSON(jfe.getTotalCounters()));
+        tEvent.addEventInfo("JOB_STATUS", JobState.SUCCEEDED.toString());
+        tEntity.addEvent(tEvent);
+        tEntity.setEntityId(jobId.toString());
+        tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE);
+        break;
+      case TASK_STARTED:
+        TaskStartedEvent tse = (TaskStartedEvent) event;
+        tEvent.addEventInfo("TASK_TYPE", tse.getTaskType().toString());
+        tEvent.addEventInfo("START_TIME", tse.getStartTime());
+        tEvent.addEventInfo("SPLIT_LOCATIONS", tse.getSplitLocations());
+        tEntity.addEvent(tEvent);
+        tEntity.setEntityId(tse.getTaskId().toString());
+        tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE);
+        tEntity.addRelatedEntity(MAPREDUCE_JOB_ENTITY_TYPE, jobId.toString());
+        break;
+      case TASK_FAILED:
+        TaskFailedEvent tfe = (TaskFailedEvent) event;
+        tEvent.addEventInfo("TASK_TYPE", tfe.getTaskType().toString());
+        tEvent.addEventInfo("STATUS", TaskStatus.State.FAILED.toString());
+        tEvent.addEventInfo("FINISH_TIME", tfe.getFinishTime());
+        tEvent.addEventInfo("ERROR", tfe.getError());
+        tEvent.addEventInfo("FAILED_ATTEMPT_ID",
+                tfe.getFailedAttemptID() == null ?
+                "" : tfe.getFailedAttemptID().toString());
+        tEvent.addEventInfo("COUNTERS_GROUPS",
+                countersToJSON(tfe.getCounters()));
+        tEntity.addEvent(tEvent);
+        tEntity.setEntityId(tfe.getTaskId().toString());
+        tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE);
+        tEntity.addRelatedEntity(MAPREDUCE_JOB_ENTITY_TYPE, jobId.toString());
+        break;
+      case TASK_UPDATED:
+        TaskUpdatedEvent tue = (TaskUpdatedEvent) event;
+        tEvent.addEventInfo("FINISH_TIME", tue.getFinishTime());
+        tEntity.addEvent(tEvent);
+        tEntity.setEntityId(tue.getTaskId().toString());
+        tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE);
+        tEntity.addRelatedEntity(MAPREDUCE_JOB_ENTITY_TYPE, jobId.toString());
+        break;
+      case TASK_FINISHED:
+        TaskFinishedEvent tfe2 = (TaskFinishedEvent) event;
+        tEvent.addEventInfo("TASK_TYPE", tfe2.getTaskType().toString());
+        tEvent.addEventInfo("COUNTERS_GROUPS",
+                countersToJSON(tfe2.getCounters()));
+        tEvent.addEventInfo("FINISH_TIME", tfe2.getFinishTime());
+        tEvent.addEventInfo("STATUS", TaskStatus.State.SUCCEEDED.toString());
+        tEvent.addEventInfo("SUCCESSFUL_TASK_ATTEMPT_ID",
+                tfe2.getSuccessfulTaskAttemptId() == null ?
+                "" : tfe2.getSuccessfulTaskAttemptId().toString());
+        tEntity.addEvent(tEvent);
+        tEntity.setEntityId(tfe2.getTaskId().toString());
+        tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE);
+        tEntity.addRelatedEntity(MAPREDUCE_JOB_ENTITY_TYPE, jobId.toString());
+        break;
+      case MAP_ATTEMPT_STARTED:
+      case CLEANUP_ATTEMPT_STARTED:
+      case REDUCE_ATTEMPT_STARTED:
+      case SETUP_ATTEMPT_STARTED:
+        TaskAttemptStartedEvent tase = (TaskAttemptStartedEvent) event;
+        tEvent.addEventInfo("TASK_TYPE", tase.getTaskType().toString());
+        tEvent.addEventInfo("TASK_ATTEMPT_ID",
+            tase.getTaskAttemptId().toString() == null ?
+            "" : tase.getTaskAttemptId().toString());
+        tEvent.addEventInfo("START_TIME", tase.getStartTime());
+        tEvent.addEventInfo("HTTP_PORT", tase.getHttpPort());
+        tEvent.addEventInfo("TRACKER_NAME", tase.getTrackerName());
+        tEvent.addEventInfo("TASK_TYPE", tase.getTaskType().toString());
+        tEvent.addEventInfo("SHUFFLE_PORT", tase.getShufflePort());
+        tEvent.addEventInfo("CONTAINER_ID", tase.getContainerId() == null ?
+            "" : tase.getContainerId().toString());
+        tEntity.addEvent(tEvent);
+        tEntity.setEntityId(tase.getTaskId().toString());
+        tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE);
+        tEntity.addRelatedEntity(MAPREDUCE_JOB_ENTITY_TYPE, jobId.toString());
+        break;
+      case MAP_ATTEMPT_FAILED:
+      case CLEANUP_ATTEMPT_FAILED:
+      case REDUCE_ATTEMPT_FAILED:
+      case SETUP_ATTEMPT_FAILED:
+      case MAP_ATTEMPT_KILLED:
+      case CLEANUP_ATTEMPT_KILLED:
+      case REDUCE_ATTEMPT_KILLED:
+      case SETUP_ATTEMPT_KILLED:
+        TaskAttemptUnsuccessfulCompletionEvent tauce =
+                (TaskAttemptUnsuccessfulCompletionEvent) event;
+        tEvent.addEventInfo("TASK_TYPE", tauce.getTaskType().toString());
+        tEvent.addEventInfo("TASK_ATTEMPT_ID",
+            tauce.getTaskAttemptId() == null ?
+            "" : tauce.getTaskAttemptId().toString());
+        tEvent.addEventInfo("FINISH_TIME", tauce.getFinishTime());
+        tEvent.addEventInfo("ERROR", tauce.getError());
+        tEvent.addEventInfo("STATUS", tauce.getTaskStatus());
+        tEvent.addEventInfo("HOSTNAME", tauce.getHostname());
+        tEvent.addEventInfo("PORT", tauce.getPort());
+        tEvent.addEventInfo("RACK_NAME", tauce.getRackName());
+        tEvent.addEventInfo("SHUFFLE_FINISH_TIME", tauce.getFinishTime());
+        tEvent.addEventInfo("SORT_FINISH_TIME", tauce.getFinishTime());
+        tEvent.addEventInfo("MAP_FINISH_TIME", tauce.getFinishTime());
+        tEvent.addEventInfo("COUNTERS_GROUPS",
+                countersToJSON(tauce.getCounters()));
+        tEntity.addEvent(tEvent);
+        tEntity.setEntityId(tauce.getTaskId().toString());
+        tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE);
+        tEntity.addRelatedEntity(MAPREDUCE_JOB_ENTITY_TYPE, jobId.toString());
+        break;
+      case MAP_ATTEMPT_FINISHED:
+        MapAttemptFinishedEvent mafe = (MapAttemptFinishedEvent) event;
+        tEvent.addEventInfo("TASK_TYPE", mafe.getTaskType().toString());
+        tEvent.addEventInfo("FINISH_TIME", mafe.getFinishTime());
+        tEvent.addEventInfo("STATUS", mafe.getTaskStatus());
+        tEvent.addEventInfo("STATE", mafe.getState());
+        tEvent.addEventInfo("MAP_FINISH_TIME", mafe.getMapFinishTime());
+        tEvent.addEventInfo("COUNTERS_GROUPS",
+                countersToJSON(mafe.getCounters()));
+        tEvent.addEventInfo("HOSTNAME", mafe.getHostname());
+        tEvent.addEventInfo("PORT", mafe.getPort());
+        tEvent.addEventInfo("RACK_NAME", mafe.getRackName());
+        tEvent.addEventInfo("ATTEMPT_ID", mafe.getAttemptId() == null ?
+            "" : mafe.getAttemptId().toString());
+        tEntity.addEvent(tEvent);
+        tEntity.setEntityId(mafe.getTaskId().toString());
+        tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE);
+        tEntity.addRelatedEntity(MAPREDUCE_JOB_ENTITY_TYPE, jobId.toString());
+        break;
+      case REDUCE_ATTEMPT_FINISHED:
+        ReduceAttemptFinishedEvent rafe = (ReduceAttemptFinishedEvent) event;
+        tEvent.addEventInfo("TASK_TYPE", rafe.getTaskType().toString());
+        tEvent.addEventInfo("ATTEMPT_ID", rafe.getAttemptId() == null ?
+            "" : rafe.getAttemptId().toString());
+        tEvent.addEventInfo("FINISH_TIME", rafe.getFinishTime());
+        tEvent.addEventInfo("STATUS", rafe.getTaskStatus());
+        tEvent.addEventInfo("STATE", rafe.getState());
+        tEvent.addEventInfo("SHUFFLE_FINISH_TIME", rafe.getShuffleFinishTime());
+        tEvent.addEventInfo("SORT_FINISH_TIME", rafe.getSortFinishTime());
+        tEvent.addEventInfo("COUNTERS_GROUPS",
+                countersToJSON(rafe.getCounters()));
+        tEvent.addEventInfo("HOSTNAME", rafe.getHostname());
+        tEvent.addEventInfo("PORT", rafe.getPort());
+        tEvent.addEventInfo("RACK_NAME", rafe.getRackName());
+        tEntity.addEvent(tEvent);
+        tEntity.setEntityId(rafe.getTaskId().toString());
+        tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE);
+        tEntity.addRelatedEntity(MAPREDUCE_JOB_ENTITY_TYPE, jobId.toString());
+        break;
+      case SETUP_ATTEMPT_FINISHED:
+      case CLEANUP_ATTEMPT_FINISHED:
+        TaskAttemptFinishedEvent tafe = (TaskAttemptFinishedEvent) event;
+        tEvent.addEventInfo("TASK_TYPE", tafe.getTaskType().toString());
+        tEvent.addEventInfo("ATTEMPT_ID", tafe.getAttemptId() == null ?
+            "" : tafe.getAttemptId().toString());
+        tEvent.addEventInfo("FINISH_TIME", tafe.getFinishTime());
+        tEvent.addEventInfo("STATUS", tafe.getTaskStatus());
+        tEvent.addEventInfo("STATE", tafe.getState());
+        tEvent.addEventInfo("COUNTERS_GROUPS",
+                countersToJSON(tafe.getCounters()));
+        tEvent.addEventInfo("HOSTNAME", tafe.getHostname());
+        tEntity.addEvent(tEvent);
+        tEntity.setEntityId(tafe.getTaskId().toString());
+        tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE);
+        tEntity.addRelatedEntity(MAPREDUCE_JOB_ENTITY_TYPE, jobId.toString());
+        break;
+      case AM_STARTED:
+        AMStartedEvent ase = (AMStartedEvent) event;
+        tEvent.addEventInfo("APPLICATION_ATTEMPT_ID",
+                ase.getAppAttemptId() == null ?
+                "" : ase.getAppAttemptId().toString());
+        tEvent.addEventInfo("CONTAINER_ID", ase.getContainerId() == null ?
+                "" : ase.getContainerId().toString());
+        tEvent.addEventInfo("NODE_MANAGER_HOST", ase.getNodeManagerHost());
+        tEvent.addEventInfo("NODE_MANAGER_PORT", ase.getNodeManagerPort());
+        tEvent.addEventInfo("NODE_MANAGER_HTTP_PORT",
+                ase.getNodeManagerHttpPort());
+        tEvent.addEventInfo("START_TIME", ase.getStartTime());
+        tEntity.addEvent(tEvent);
+        tEntity.setEntityId(jobId.toString());
+        tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE);
+        break;
+      default:
+        break;
+    }
+
+    try {
+      timelineClient.putEntities(tEntity);
+    } catch (IOException ex) {
+      LOG.error("Error putting entity " + tEntity.getEntityId()
+          + " to Timeline Server", ex);
+    } catch (YarnException ex) {
+      LOG.error("Error putting entity " + tEntity.getEntityId()
+          + " to Timeline Server", ex);
+    }
+  }
+
+  @Private
+  public JsonNode countersToJSON(Counters counters) {
+    ObjectMapper mapper = new ObjectMapper();
+    ArrayNode nodes = mapper.createArrayNode();
+    if (counters != null) {
+      for (CounterGroup counterGroup : counters) {
+        ObjectNode groupNode = nodes.addObject();
+        groupNode.put("NAME", counterGroup.getName());
+        groupNode.put("DISPLAY_NAME", counterGroup.getDisplayName());
+        ArrayNode countersNode = groupNode.putArray("COUNTERS");
+        for (Counter counter : counterGroup) {
+          ObjectNode counterNode = countersNode.addObject();
+          counterNode.put("NAME", counter.getName());
+          counterNode.put("DISPLAY_NAME", counter.getDisplayName());
+          counterNode.put("VALUE", counter.getValue());
+        }
+      }
+    }
+    return nodes;
+  }
+
   private void setSummarySlotSeconds(JobSummary summary, Counters allCounters) {
 
     Counter slotMillisMapCounter = allCounters
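
For reference, countersToJSON serializes Counters as a JSON array of group objects, each carrying NAME, DISPLAY_NAME, and a COUNTERS array of {NAME, DISPLAY_NAME, VALUE} objects; this is exactly the shape the new testCountersToJSON below asserts. A single group with one counter serializes as:

    [{"NAME":"DOCTORS","DISPLAY_NAME":"Incarnations of the Doctor",
      "COUNTERS":[{"NAME":"MATT_SMITH","DISPLAY_NAME":"Matt Smith","VALUE":11}]}]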

+ 244 - 4
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java

@@ -29,6 +29,7 @@ import static org.mockito.Mockito.when;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.util.HashMap;
 
 import org.junit.Assert;
 
@@ -42,9 +43,12 @@ import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapreduce.CounterGroup;
 import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.TypeConverter;
@@ -55,14 +59,22 @@ import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.junit.After;
 import org.junit.AfterClass;
 import static org.junit.Assert.assertFalse;
 import org.junit.BeforeClass;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.timeline.TimelineStore;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectMapper;
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -126,7 +138,7 @@ public class TestJobHistoryEventHandler {
 
       // First completion event, but min-queue-size for batching flushes is 10
       handleEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
-          t.taskID, null, 0, TaskType.MAP, "", null)));
+          t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null)));
       verify(mockWriter).flush();
 
     } finally {
@@ -162,7 +174,7 @@ public class TestJobHistoryEventHandler {
 
       for (int i = 0 ; i < 100 ; i++) {
         queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
-            t.taskID, null, 0, TaskType.MAP, "", null)));
+            t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null)));
       }
 
       handleNextNEvents(jheh, 9);
@@ -207,7 +219,7 @@ public class TestJobHistoryEventHandler {
 
       for (int i = 0 ; i < 100 ; i++) {
         queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
-            t.taskID, null, 0, TaskType.MAP, "", null)));
+            t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null)));
       }
 
       handleNextNEvents(jheh, 9);
@@ -248,7 +260,7 @@ public class TestJobHistoryEventHandler {
 
       for (int i = 0 ; i < 100 ; i++) {
         queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
-            t.taskID, null, 0, TaskType.MAP, "", null)));
+            t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null)));
       }
       queueEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent(
           TypeConverter.fromYarn(t.jobId), 0, 10, 10, 0, 0, null, null, new Counters())));
@@ -427,6 +439,231 @@ public class TestJobHistoryEventHandler {
         pathStr);
   }
 
+  // Have JobHistoryEventHandler handle some events and make sure they get
+  // stored to the Timeline store
+  @Test (timeout=50000)
+  public void testTimelineEventHandling() throws Exception {
+    TestParams t = new TestParams(false);
+    Configuration conf = new YarnConfiguration();
+    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    JHEvenHandlerForTest jheh = new JHEvenHandlerForTest(t.mockAppContext, 0);
+    jheh.init(conf);
+    MiniYARNCluster yarnCluster = null;
+    long currentTime = System.currentTimeMillis();
+    try {
+      yarnCluster = new MiniYARNCluster(
+            TestJobHistoryEventHandler.class.getSimpleName(), 1, 1, 1, 1, true);
+      yarnCluster.init(conf);
+      yarnCluster.start();
+      jheh.start();
+      TimelineStore ts = yarnCluster.getApplicationHistoryServer()
+              .getTimelineStore();
+
+      handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
+              t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000),
+              currentTime - 10));
+      TimelineEntities entities = ts.getEntities("MAPREDUCE_JOB", null, null,
+              null, null, null, null, null, null);
+      Assert.assertEquals(1, entities.getEntities().size());
+      TimelineEntity tEntity = entities.getEntities().get(0);
+      Assert.assertEquals(t.jobId.toString(), tEntity.getEntityId());
+      Assert.assertEquals(1, tEntity.getEvents().size());
+      Assert.assertEquals(EventType.AM_STARTED.toString(),
+              tEntity.getEvents().get(0).getEventType());
+      Assert.assertEquals(currentTime - 10,
+              tEntity.getEvents().get(0).getTimestamp());
+
+      handleEvent(jheh, new JobHistoryEvent(t.jobId,
+              new JobSubmittedEvent(TypeConverter.fromYarn(t.jobId), "name",
+              "user", 200, "/foo/job.xml",
+              new HashMap<JobACL, AccessControlList>(), "default"),
+              currentTime + 10));
+      entities = ts.getEntities("MAPREDUCE_JOB", null, null, null,
+              null, null, null, null, null);
+      Assert.assertEquals(1, entities.getEntities().size());
+      tEntity = entities.getEntities().get(0);
+      Assert.assertEquals(t.jobId.toString(), tEntity.getEntityId());
+      Assert.assertEquals(2, tEntity.getEvents().size());
+      Assert.assertEquals(EventType.JOB_SUBMITTED.toString(),
+              tEntity.getEvents().get(0).getEventType());
+      Assert.assertEquals(EventType.AM_STARTED.toString(),
+              tEntity.getEvents().get(1).getEventType());
+      Assert.assertEquals(currentTime + 10,
+              tEntity.getEvents().get(0).getTimestamp());
+      Assert.assertEquals(currentTime - 10,
+              tEntity.getEvents().get(1).getTimestamp());
+
+      handleEvent(jheh, new JobHistoryEvent(t.jobId,
+              new JobQueueChangeEvent(TypeConverter.fromYarn(t.jobId), "q2"),
+              currentTime - 20));
+      entities = ts.getEntities("MAPREDUCE_JOB", null, null, null,
+              null, null, null, null, null);
+      Assert.assertEquals(1, entities.getEntities().size());
+      tEntity = entities.getEntities().get(0);
+      Assert.assertEquals(t.jobId.toString(), tEntity.getEntityId());
+      Assert.assertEquals(3, tEntity.getEvents().size());
+      Assert.assertEquals(EventType.JOB_SUBMITTED.toString(),
+              tEntity.getEvents().get(0).getEventType());
+      Assert.assertEquals(EventType.AM_STARTED.toString(),
+              tEntity.getEvents().get(1).getEventType());
+      Assert.assertEquals(EventType.JOB_QUEUE_CHANGED.toString(),
+              tEntity.getEvents().get(2).getEventType());
+      Assert.assertEquals(currentTime + 10,
+              tEntity.getEvents().get(0).getTimestamp());
+      Assert.assertEquals(currentTime - 10,
+              tEntity.getEvents().get(1).getTimestamp());
+      Assert.assertEquals(currentTime - 20,
+              tEntity.getEvents().get(2).getTimestamp());
+
+      handleEvent(jheh, new JobHistoryEvent(t.jobId,
+              new JobFinishedEvent(TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0,
+              0, new Counters(), new Counters(), new Counters()), currentTime));
+      entities = ts.getEntities("MAPREDUCE_JOB", null, null, null,
+              null, null, null, null, null);
+      Assert.assertEquals(1, entities.getEntities().size());
+      tEntity = entities.getEntities().get(0);
+      Assert.assertEquals(t.jobId.toString(), tEntity.getEntityId());
+      Assert.assertEquals(4, tEntity.getEvents().size());
+      Assert.assertEquals(EventType.JOB_SUBMITTED.toString(),
+              tEntity.getEvents().get(0).getEventType());
+      Assert.assertEquals(EventType.JOB_FINISHED.toString(),
+              tEntity.getEvents().get(1).getEventType());
+      Assert.assertEquals(EventType.AM_STARTED.toString(),
+              tEntity.getEvents().get(2).getEventType());
+      Assert.assertEquals(EventType.JOB_QUEUE_CHANGED.toString(),
+              tEntity.getEvents().get(3).getEventType());
+      Assert.assertEquals(currentTime + 10,
+              tEntity.getEvents().get(0).getTimestamp());
+      Assert.assertEquals(currentTime,
+              tEntity.getEvents().get(1).getTimestamp());
+      Assert.assertEquals(currentTime - 10,
+              tEntity.getEvents().get(2).getTimestamp());
+      Assert.assertEquals(currentTime - 20,
+              tEntity.getEvents().get(3).getTimestamp());
+
+      handleEvent(jheh, new JobHistoryEvent(t.jobId,
+            new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId),
+            0, 0, 0, JobStateInternal.KILLED.toString()), currentTime + 20));
+      entities = ts.getEntities("MAPREDUCE_JOB", null, null, null,
+              null, null, null, null, null);
+      Assert.assertEquals(1, entities.getEntities().size());
+      tEntity = entities.getEntities().get(0);
+      Assert.assertEquals(t.jobId.toString(), tEntity.getEntityId());
+      Assert.assertEquals(5, tEntity.getEvents().size());
+      Assert.assertEquals(EventType.JOB_KILLED.toString(),
+              tEntity.getEvents().get(0).getEventType());
+      Assert.assertEquals(EventType.JOB_SUBMITTED.toString(),
+              tEntity.getEvents().get(1).getEventType());
+      Assert.assertEquals(EventType.JOB_FINISHED.toString(),
+              tEntity.getEvents().get(2).getEventType());
+      Assert.assertEquals(EventType.AM_STARTED.toString(),
+              tEntity.getEvents().get(3).getEventType());
+      Assert.assertEquals(EventType.JOB_QUEUE_CHANGED.toString(),
+              tEntity.getEvents().get(4).getEventType());
+      Assert.assertEquals(currentTime + 20,
+              tEntity.getEvents().get(0).getTimestamp());
+      Assert.assertEquals(currentTime + 10,
+              tEntity.getEvents().get(1).getTimestamp());
+      Assert.assertEquals(currentTime,
+              tEntity.getEvents().get(2).getTimestamp());
+      Assert.assertEquals(currentTime - 10,
+              tEntity.getEvents().get(3).getTimestamp());
+      Assert.assertEquals(currentTime - 20,
+              tEntity.getEvents().get(4).getTimestamp());
+
+      handleEvent(jheh, new JobHistoryEvent(t.jobId,
+            new TaskStartedEvent(t.taskID, 0, TaskType.MAP, "")));
+      entities = ts.getEntities("MAPREDUCE_TASK", null, null, null,
+              null, null, null, null, null);
+      Assert.assertEquals(1, entities.getEntities().size());
+      tEntity = entities.getEntities().get(0);
+      Assert.assertEquals(t.taskID.toString(), tEntity.getEntityId());
+      Assert.assertEquals(1, tEntity.getEvents().size());
+      Assert.assertEquals(EventType.TASK_STARTED.toString(),
+              tEntity.getEvents().get(0).getEventType());
+      Assert.assertEquals(TaskType.MAP.toString(),
+              tEntity.getEvents().get(0).getEventInfo().get("TASK_TYPE"));
+
+      handleEvent(jheh, new JobHistoryEvent(t.jobId,
+            new TaskStartedEvent(t.taskID, 0, TaskType.REDUCE, "")));
+      entities = ts.getEntities("MAPREDUCE_TASK", null, null, null,
+              null, null, null, null, null);
+      Assert.assertEquals(1, entities.getEntities().size());
+      tEntity = entities.getEntities().get(0);
+      Assert.assertEquals(t.taskID.toString(), tEntity.getEntityId());
+      Assert.assertEquals(2, tEntity.getEvents().size());
+      Assert.assertEquals(EventType.TASK_STARTED.toString(),
+              tEntity.getEvents().get(1).getEventType());
+      Assert.assertEquals(TaskType.REDUCE.toString(),
+              tEntity.getEvents().get(0).getEventInfo().get("TASK_TYPE"));
+      Assert.assertEquals(TaskType.MAP.toString(),
+              tEntity.getEvents().get(1).getEventInfo().get("TASK_TYPE"));
+    } finally {
+      if (yarnCluster != null) {
+        yarnCluster.stop();
+      }
+    }
+  }
+
+  @Test (timeout=50000)
+  public void testCountersToJSON() throws Exception {
+    JobHistoryEventHandler jheh = new JobHistoryEventHandler(null, 0);
+    Counters counters = new Counters();
+    CounterGroup group1 = counters.addGroup("DOCTORS",
+            "Incarnations of the Doctor");
+    group1.addCounter("PETER_CAPALDI", "Peter Capaldi", 12);
+    group1.addCounter("MATT_SMITH", "Matt Smith", 11);
+    group1.addCounter("DAVID_TENNANT", "David Tennant", 10);
+    CounterGroup group2 = counters.addGroup("COMPANIONS",
+            "Companions of the Doctor");
+    group2.addCounter("CLARA_OSWALD", "Clara Oswald", 6);
+    group2.addCounter("RORY_WILLIAMS", "Rory Williams", 5);
+    group2.addCounter("AMY_POND", "Amy Pond", 4);
+    group2.addCounter("MARTHA_JONES", "Martha Jones", 3);
+    group2.addCounter("DONNA_NOBLE", "Donna Noble", 2);
+    group2.addCounter("ROSE_TYLER", "Rose Tyler", 1);
+    JsonNode jsonNode = jheh.countersToJSON(counters);
+    String jsonStr = new ObjectMapper().writeValueAsString(jsonNode);
+    String expected = "[{\"NAME\":\"COMPANIONS\",\"DISPLAY_NAME\":\"Companions "
+        + "of the Doctor\",\"COUNTERS\":[{\"NAME\":\"AMY_POND\",\"DISPLAY_NAME\""
+        + ":\"Amy Pond\",\"VALUE\":4},{\"NAME\":\"CLARA_OSWALD\","
+        + "\"DISPLAY_NAME\":\"Clara Oswald\",\"VALUE\":6},{\"NAME\":"
+        + "\"DONNA_NOBLE\",\"DISPLAY_NAME\":\"Donna Noble\",\"VALUE\":2},"
+        + "{\"NAME\":\"MARTHA_JONES\",\"DISPLAY_NAME\":\"Martha Jones\","
+        + "\"VALUE\":3},{\"NAME\":\"RORY_WILLIAMS\",\"DISPLAY_NAME\":\"Rory "
+        + "Williams\",\"VALUE\":5},{\"NAME\":\"ROSE_TYLER\",\"DISPLAY_NAME\":"
+        + "\"Rose Tyler\",\"VALUE\":1}]},{\"NAME\":\"DOCTORS\",\"DISPLAY_NAME\""
+        + ":\"Incarnations of the Doctor\",\"COUNTERS\":[{\"NAME\":"
+        + "\"DAVID_TENNANT\",\"DISPLAY_NAME\":\"David Tennant\",\"VALUE\":10},"
+        + "{\"NAME\":\"MATT_SMITH\",\"DISPLAY_NAME\":\"Matt Smith\",\"VALUE\":"
+        + "11},{\"NAME\":\"PETER_CAPALDI\",\"DISPLAY_NAME\":\"Peter Capaldi\","
+        + "\"VALUE\":12}]}]";
+    Assert.assertEquals(expected, jsonStr);
+  }
+
+  @Test (timeout=50000)
+  public void testCountersToJSONEmpty() throws Exception {
+    JobHistoryEventHandler jheh = new JobHistoryEventHandler(null, 0);
+    Counters counters = null;
+    JsonNode jsonNode = jheh.countersToJSON(counters);
+    String jsonStr = new ObjectMapper().writeValueAsString(jsonNode);
+    String expected = "[]";
+    Assert.assertEquals(expected, jsonStr);
+
+    counters = new Counters();
+    jsonNode = jheh.countersToJSON(counters);
+    jsonStr = new ObjectMapper().writeValueAsString(jsonNode);
+    expected = "[]";
+    Assert.assertEquals(expected, jsonStr);
+
+    counters.addGroup("DOCTORS", "Incarnations of the Doctor");
+    jsonNode = jheh.countersToJSON(counters);
+    jsonStr = new ObjectMapper().writeValueAsString(jsonNode);
+    expected = "[{\"NAME\":\"DOCTORS\",\"DISPLAY_NAME\":\"Incarnations of the "
+        + "Doctor\",\"COUNTERS\":[]}]";
+    Assert.assertEquals(expected, jsonStr);
+  }
+
   private void queueEvent(JHEvenHandlerForTest jheh, JobHistoryEvent event) {
     jheh.handle(event);
   }
@@ -480,6 +717,7 @@ public class TestJobHistoryEventHandler {
         ApplicationAttemptId.newInstance(appId, 1);
     ContainerId containerId = ContainerId.newInstance(appAttemptId, 1);
     TaskID taskID = TaskID.forName("task_200707121733_0003_m_000005");
+    TaskAttemptID taskAttemptID = new TaskAttemptID(taskID, 0);
     JobId jobId = MRBuilderUtils.newJobId(appId, 1);
     AppContext mockAppContext;
 
@@ -557,11 +795,13 @@ class JHEvenHandlerForTest extends JobHistoryEventHandler {
   private boolean mockHistoryProcessing = true;
   public JHEvenHandlerForTest(AppContext context, int startCount) {
     super(context, startCount);
+    JobHistoryEventHandler.fileMap.clear();
   }
 
   public JHEvenHandlerForTest(AppContext context, int startCount, boolean mockHistoryProcessing) {
     super(context, startCount);
     this.mockHistoryProcessing = mockHistoryProcessing;
+    JobHistoryEventHandler.fileMap.clear();
   }
 
   @Override

+ 88 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java

@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.jobhistory.EventType;
+import org.apache.hadoop.mapreduce.jobhistory.TestJobHistoryEventHandler;
+import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timeline.TimelineStore;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestMRTimelineEventHandling {
+
+  @Test
+  public void testMRTimelineEventHandling() throws Exception {
+    Configuration conf = new YarnConfiguration();
+    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    MiniMRYarnCluster cluster = null;
+    try {
+      cluster = new MiniMRYarnCluster(
+              TestJobHistoryEventHandler.class.getSimpleName(), 1, true);
+      cluster.init(conf);
+      cluster.start();
+      TimelineStore ts = cluster.getApplicationHistoryServer()
+              .getTimelineStore();
+
+      Path inDir = new Path("input");
+      Path outDir = new Path("output");
+      RunningJob job =
+              UtilsForTests.runJobSucceed(new JobConf(conf), inDir, outDir);
+      Assert.assertEquals(JobStatus.SUCCEEDED,
+              job.getJobStatus().getState().getValue());
+      TimelineEntities entities = ts.getEntities("MAPREDUCE_JOB", null, null,
+              null, null, null, null, null, null);
+      Assert.assertEquals(1, entities.getEntities().size());
+      TimelineEntity tEntity = entities.getEntities().get(0);
+      Assert.assertEquals(job.getID().toString(), tEntity.getEntityId());
+      Assert.assertEquals("MAPREDUCE_JOB", tEntity.getEntityType());
+      Assert.assertEquals(EventType.AM_STARTED.toString(),
+              tEntity.getEvents().get(tEntity.getEvents().size() - 1)
+              .getEventType());
+      Assert.assertEquals(EventType.JOB_FINISHED.toString(),
+              tEntity.getEvents().get(0).getEventType());
+
+      job = UtilsForTests.runJobFail(new JobConf(conf), inDir, outDir);
+      Assert.assertEquals(JobStatus.FAILED,
+              job.getJobStatus().getState().getValue());
+      entities = ts.getEntities("MAPREDUCE_JOB", null, null, null, null, null,
+              null, null, null);
+      Assert.assertEquals(2, entities.getEntities().size());
+      tEntity = entities.getEntities().get(0);
+      Assert.assertEquals(job.getID().toString(), tEntity.getEntityId());
+      Assert.assertEquals("MAPREDUCE_JOB", tEntity.getEntityType());
+      Assert.assertEquals(EventType.AM_STARTED.toString(),
+              tEntity.getEvents().get(tEntity.getEvents().size() - 1)
+              .getEventType());
+      Assert.assertEquals(EventType.JOB_FAILED.toString(),
+              tEntity.getEvents().get(0).getEventType());
+    } finally {
+      if (cluster != null) {
+        cluster.stop();
+      }
+    }
+  }
+
+}
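
The test reads entities straight out of the MiniYARNCluster's TimelineStore; against a real cluster the same data is served over the timeline server's REST API. A rough sketch of fetching the MAPREDUCE_JOB entities over HTTP (the localhost address and port 8188 are assumptions about a default standalone timeline server):

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.URL;

    public class TimelineQuerySketch {
      public static void main(String[] args) throws Exception {
        // Assumed address: a timeline server on its default webapp port.
        URL url = new URL("http://localhost:8188/ws/v1/timeline/MAPREDUCE_JOB");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        try (BufferedReader in = new BufferedReader(
            new InputStreamReader(conn.getInputStream()))) {
          String line;
          while ((line = in.readLine()) != null) {
            System.out.println(line); // JSON listing of MAPREDUCE_JOB entities
          }
        }
      }
    }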

+ 5 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java

@@ -72,8 +72,11 @@ public class MiniMRYarnCluster extends MiniYARNCluster {
   }
 
   public MiniMRYarnCluster(String testName, int noOfNMs) {
-    super(testName, noOfNMs, 4, 4);
-    //TODO: add the history server
+    this(testName, noOfNMs, false);
+  }
+
+  public MiniMRYarnCluster(String testName, int noOfNMs, boolean enableAHS) {
+    super(testName, 1, noOfNMs, 4, 4, enableAHS);
     historyServerWrapper = new JobHistoryServerWrapper();
     addService(historyServerWrapper);
   }
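
The new boolean parameter is forwarded to MiniYARNCluster so MR tests can co-locate an application history server (and its timeline store) with the cluster. A short usage sketch, following the pattern of TestMRTimelineEventHandling above:

    // conf is assumed to have YarnConfiguration.TIMELINE_SERVICE_ENABLED set to true.
    MiniMRYarnCluster cluster = new MiniMRYarnCluster("timeline-test", 1, true);
    cluster.init(conf);
    cluster.start();
    TimelineStore store =
        cluster.getApplicationHistoryServer().getTimelineStore();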