Explorar el Código

MAPREDUCE-6327. Made MR AM use timeline service v2 API to write history events and counters. Contributed by Junping Du.

Zhijie Shen hace 10 años
padre
commit
5947ce0747
Se han modificado 31 ficheros con 838 adiciones y 52 borrados
  1. 15 0
      hadoop-mapreduce-project/CHANGES.txt
  2. 218 40
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
  3. 23 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
  4. 9 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
  5. 7 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java
  6. 5 4
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
  7. 5 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
  8. 18 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/AMStartedEvent.java
  9. 4 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEvent.java
  10. 25 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobFinishedEvent.java
  11. 11 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobInfoChangeEvent.java
  12. 14 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java
  13. 10 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobPriorityChangeEvent.java
  14. 10 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobQueueChangeEvent.java
  15. 10 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobStatusChangedEvent.java
  16. 23 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java
  17. 16 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java
  18. 23 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java
  19. 11 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/NormalizedResourceEvent.java
  20. 24 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java
  21. 19 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java
  22. 18 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java
  23. 24 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java
  24. 19 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java
  25. 18 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java
  26. 12 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java
  27. 10 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskUpdatedEvent.java
  28. 51 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/JobHistoryEventUtils.java
  29. 7 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
  30. 160 3
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
  31. 19 2
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java

+ 15 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -1,5 +1,20 @@
 Hadoop MapReduce Change Log
 
+Branch YARN-2928: Timeline Server Next Generation: Phase 1
+
+  INCOMPATIBLE CHANGES
+
+  NEW FEATURES
+
+    MAPREDUCE-6327. Made MR AM use timeline service v2 API to write history
+    events and counters. (Junping Du via zjshen)
+
+  IMPROVEMENTS
+
+  OPTIMIZATIONS
+
+  BUG FIXES
+
 Trunk (Unreleased)
 
   INCOMPATIBLE CHANGES

+ 218 - 40
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java

@@ -19,6 +19,9 @@
 package org.apache.hadoop.mapreduce.jobhistory;
 
 import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -49,11 +52,13 @@ import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
+import org.apache.hadoop.mapreduce.v2.app.MRAppMaster;
 import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
@@ -68,10 +73,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.codehaus.jackson.JsonNode;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.node.ArrayNode;
-import org.codehaus.jackson.node.ObjectNode;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 import com.google.common.annotations.VisibleForTesting;
 /**
@@ -119,14 +122,24 @@ public class JobHistoryEventHandler extends AbstractService
 
   protected static final Map<JobId, MetaInfo> fileMap =
     Collections.<JobId,MetaInfo>synchronizedMap(new HashMap<JobId,MetaInfo>());
+  
+  // For posting entities in new timeline service in a non-blocking way
+  // TODO YARN-3367 replace with event loop in TimelineClient.
+  private static ExecutorService threadPool =
+      Executors.newCachedThreadPool(
+          new ThreadFactoryBuilder().setNameFormat("TimelineService #%d")
+          .build());
 
   // should job completion be force when the AM shuts down?
   protected volatile boolean forceJobCompletion = false;
 
   protected TimelineClient timelineClient;
+  
+  private boolean newTimelineServiceEnabled = false;
 
   private static String MAPREDUCE_JOB_ENTITY_TYPE = "MAPREDUCE_JOB";
   private static String MAPREDUCE_TASK_ENTITY_TYPE = "MAPREDUCE_TASK";
+  private static String MAPREDUCE_TASK_ATTEMPT_ENTITY_TYPE = "MAPREDUCE_TASK_ATTEMPT";
 
   public JobHistoryEventHandler(AppContext context, int startCount) {
     super("JobHistoryEventHandler");
@@ -246,13 +259,22 @@ public class JobHistoryEventHandler extends AbstractService
             MRJobConfig.MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD,
             MRJobConfig.DEFAULT_MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD);
 
+    // TODO replace MR specific configurations on timeline service with getting 
+    // configuration from RM through registerApplicationMaster() in 
+    // ApplicationMasterProtocol with return value for timeline service 
+    // configuration status: off, on_with_v1 or on_with_v2.
     if (conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA,
         MRJobConfig.DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA)) {
       if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
             YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
-        timelineClient = TimelineClient.createTimelineClient();
+        
+        timelineClient = 
+            ((MRAppMaster.RunningAppContext)context).getTimelineClient();
         timelineClient.init(conf);
-        LOG.info("Timeline service is enabled");
+        newTimelineServiceEnabled = conf.getBoolean(
+            MRJobConfig.MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED,
+            MRJobConfig.DEFAULT_MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED);
+        LOG.info("Timeline service is enabled: " + (newTimelineServiceEnabled? "v2" : "v1"));
         LOG.info("Emitting job history data to the timeline server is enabled");
       } else {
         LOG.info("Timeline service is not enabled");
@@ -426,9 +448,26 @@ public class JobHistoryEventHandler extends AbstractService
     if (timelineClient != null) {
       timelineClient.stop();
     }
+    shutdownAndAwaitTermination();
     LOG.info("Stopped JobHistoryEventHandler. super.stop()");
     super.serviceStop();
   }
+  
+  // TODO remove threadPool after adding non-blocking call in TimelineClient
+  private static void shutdownAndAwaitTermination() {
+    threadPool.shutdown();
+    try {
+      if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
+        threadPool.shutdownNow(); 
+        if (!threadPool.awaitTermination(60, TimeUnit.SECONDS))
+            LOG.error("ThreadPool did not terminate");
+      }
+    } catch (InterruptedException ie) {
+      threadPool.shutdownNow();
+      // Preserve interrupt status
+      Thread.currentThread().interrupt();
+    }
+  }
 
   protected EventWriter createEventWriter(Path historyFilePath)
       throws IOException {
@@ -583,8 +622,13 @@ public class JobHistoryEventHandler extends AbstractService
         processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary(),
             event.getJobID());
         if (timelineClient != null) {
-          processEventForTimelineServer(historyEvent, event.getJobID(),
-              event.getTimestamp());
+          if (newTimelineServiceEnabled) {
+            processEventForNewTimelineService(historyEvent, event.getJobID(),
+                event.getTimestamp());
+          } else {
+            processEventForTimelineServer(historyEvent, event.getJobID(),
+                event.getTimestamp());
+          }
         }
         if (LOG.isDebugEnabled()) {
           LOG.debug("In HistoryEventHandler "
@@ -825,11 +869,11 @@ public class JobHistoryEventHandler extends AbstractService
         tEvent.addEventInfo("FINISHED_MAPS", jfe.getFinishedMaps());
         tEvent.addEventInfo("FINISHED_REDUCES", jfe.getFinishedReduces());
         tEvent.addEventInfo("MAP_COUNTERS_GROUPS",
-                countersToJSON(jfe.getMapCounters()));
+            JobHistoryEventUtils.countersToJSON(jfe.getMapCounters()));
         tEvent.addEventInfo("REDUCE_COUNTERS_GROUPS",
-                countersToJSON(jfe.getReduceCounters()));
+            JobHistoryEventUtils.countersToJSON(jfe.getReduceCounters()));
         tEvent.addEventInfo("TOTAL_COUNTERS_GROUPS",
-                countersToJSON(jfe.getTotalCounters()));
+            JobHistoryEventUtils.countersToJSON(jfe.getTotalCounters()));
         tEvent.addEventInfo("JOB_STATUS", JobState.SUCCEEDED.toString());
         tEntity.addEvent(tEvent);
         tEntity.setEntityId(jobId.toString());
@@ -855,7 +899,7 @@ public class JobHistoryEventHandler extends AbstractService
                 tfe.getFailedAttemptID() == null ?
                 "" : tfe.getFailedAttemptID().toString());
         tEvent.addEventInfo("COUNTERS_GROUPS",
-                countersToJSON(tfe.getCounters()));
+            JobHistoryEventUtils.countersToJSON(tfe.getCounters()));
         tEntity.addEvent(tEvent);
         tEntity.setEntityId(tfe.getTaskId().toString());
         tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE);
@@ -873,7 +917,7 @@ public class JobHistoryEventHandler extends AbstractService
         TaskFinishedEvent tfe2 = (TaskFinishedEvent) event;
         tEvent.addEventInfo("TASK_TYPE", tfe2.getTaskType().toString());
         tEvent.addEventInfo("COUNTERS_GROUPS",
-                countersToJSON(tfe2.getCounters()));
+            JobHistoryEventUtils.countersToJSON(tfe2.getCounters()));
         tEvent.addEventInfo("FINISH_TIME", tfe2.getFinishTime());
         tEvent.addEventInfo("STATUS", TaskStatus.State.SUCCEEDED.toString());
         tEvent.addEventInfo("SUCCESSFUL_TASK_ATTEMPT_ID",
@@ -895,7 +939,6 @@ public class JobHistoryEventHandler extends AbstractService
         tEvent.addEventInfo("START_TIME", tase.getStartTime());
         tEvent.addEventInfo("HTTP_PORT", tase.getHttpPort());
         tEvent.addEventInfo("TRACKER_NAME", tase.getTrackerName());
-        tEvent.addEventInfo("TASK_TYPE", tase.getTaskType().toString());
         tEvent.addEventInfo("SHUFFLE_PORT", tase.getShufflePort());
         tEvent.addEventInfo("CONTAINER_ID", tase.getContainerId() == null ?
             "" : tase.getContainerId().toString());
@@ -928,7 +971,7 @@ public class JobHistoryEventHandler extends AbstractService
         tEvent.addEventInfo("SORT_FINISH_TIME", tauce.getFinishTime());
         tEvent.addEventInfo("MAP_FINISH_TIME", tauce.getFinishTime());
         tEvent.addEventInfo("COUNTERS_GROUPS",
-                countersToJSON(tauce.getCounters()));
+            JobHistoryEventUtils.countersToJSON(tauce.getCounters()));
         tEntity.addEvent(tEvent);
         tEntity.setEntityId(tauce.getTaskId().toString());
         tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE);
@@ -942,7 +985,7 @@ public class JobHistoryEventHandler extends AbstractService
         tEvent.addEventInfo("STATE", mafe.getState());
         tEvent.addEventInfo("MAP_FINISH_TIME", mafe.getMapFinishTime());
         tEvent.addEventInfo("COUNTERS_GROUPS",
-                countersToJSON(mafe.getCounters()));
+            JobHistoryEventUtils.countersToJSON(mafe.getCounters()));
         tEvent.addEventInfo("HOSTNAME", mafe.getHostname());
         tEvent.addEventInfo("PORT", mafe.getPort());
         tEvent.addEventInfo("RACK_NAME", mafe.getRackName());
@@ -964,7 +1007,7 @@ public class JobHistoryEventHandler extends AbstractService
         tEvent.addEventInfo("SHUFFLE_FINISH_TIME", rafe.getShuffleFinishTime());
         tEvent.addEventInfo("SORT_FINISH_TIME", rafe.getSortFinishTime());
         tEvent.addEventInfo("COUNTERS_GROUPS",
-                countersToJSON(rafe.getCounters()));
+            JobHistoryEventUtils.countersToJSON(rafe.getCounters()));
         tEvent.addEventInfo("HOSTNAME", rafe.getHostname());
         tEvent.addEventInfo("PORT", rafe.getPort());
         tEvent.addEventInfo("RACK_NAME", rafe.getRackName());
@@ -983,7 +1026,7 @@ public class JobHistoryEventHandler extends AbstractService
         tEvent.addEventInfo("STATUS", tafe.getTaskStatus());
         tEvent.addEventInfo("STATE", tafe.getState());
         tEvent.addEventInfo("COUNTERS_GROUPS",
-                countersToJSON(tafe.getCounters()));
+            JobHistoryEventUtils.countersToJSON(tafe.getCounters()));
         tEvent.addEventInfo("HOSTNAME", tafe.getHostname());
         tEntity.addEvent(tEvent);
         tEntity.setEntityId(tafe.getTaskId().toString());
@@ -1010,37 +1053,172 @@ public class JobHistoryEventHandler extends AbstractService
       default:
         break;
     }
-
+    
     try {
       timelineClient.putEntities(tEntity);
-    } catch (IOException ex) {
-      LOG.error("Error putting entity " + tEntity.getEntityId() + " to Timeline"
-      + "Server", ex);
-    } catch (YarnException ex) {
+    } catch (IOException|YarnException ex) {
       LOG.error("Error putting entity " + tEntity.getEntityId() + " to Timeline"
       + "Server", ex);
     }
   }
-
-  @Private
-  public JsonNode countersToJSON(Counters counters) {
-    ObjectMapper mapper = new ObjectMapper();
-    ArrayNode nodes = mapper.createArrayNode();
-    if (counters != null) {
-      for (CounterGroup counterGroup : counters) {
-        ObjectNode groupNode = nodes.addObject();
-        groupNode.put("NAME", counterGroup.getName());
-        groupNode.put("DISPLAY_NAME", counterGroup.getDisplayName());
-        ArrayNode countersNode = groupNode.putArray("COUNTERS");
-        for (Counter counter : counterGroup) {
-          ObjectNode counterNode = countersNode.addObject();
-          counterNode.put("NAME", counter.getName());
-          counterNode.put("DISPLAY_NAME", counter.getDisplayName());
-          counterNode.put("VALUE", counter.getValue());
+  
+  private void putEntityWithoutBlocking(final TimelineClient timelineClient, 
+      final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity) {
+    Runnable publishWrapper = new Runnable() {
+      public void run() {
+        try {
+          timelineClient.putEntities(entity);
+        } catch (IOException|YarnException e) {
+          LOG.error("putEntityNonBlocking get failed: " + e);
+          throw new RuntimeException(e.toString());
         }
       }
+    };
+    threadPool.execute(publishWrapper);
+  }
+  
+  // create JobEntity from HistoryEvent with adding other info, like: 
+  // jobId, timestamp and entityType.
+  private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity 
+      createJobEntity(HistoryEvent event, long timestamp, JobId jobId, 
+      String entityType) {
+    
+    org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = 
+        createBaseEntity(event, timestamp, entityType);
+    entity.setId(jobId.toString());
+    return entity;
+  }
+  
+  // create BaseEntity from HistoryEvent with adding other info, like: 
+  // timestamp and entityType.
+  private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity 
+      createBaseEntity(HistoryEvent event, long timestamp, String entityType) {
+    org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent tEvent = 
+        event.toTimelineEvent();
+    tEvent.setTimestamp(timestamp);
+    
+    org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = 
+        new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity();
+    entity.addEvent(tEvent);
+    entity.setType(entityType);
+    return entity;
+  }
+  
+  // create TaskEntity from HistoryEvent with adding other info, like: 
+  // taskId, jobId, timestamp, entityType and relatedJobEntity.
+  private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity 
+      createTaskEntity(HistoryEvent event, long timestamp, String taskId,
+      String entityType, String relatedJobEntity, JobId jobId) {
+    org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = 
+        createBaseEntity(event, timestamp, entityType);
+    entity.setId(taskId);
+    entity.addIsRelatedToEntity(relatedJobEntity, jobId.toString());
+    return entity;
+  }
+  
+  // create TaskAttemptEntity from HistoryEvent with adding other info, like: 
+  // timestamp, taskAttemptId, entityType, relatedTaskEntity and taskId.
+  private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity 
+      createTaskAttemptEntity(HistoryEvent event, long timestamp, 
+      String taskAttemptId, String entityType, String relatedTaskEntity, 
+      String taskId) {
+    org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = 
+        createBaseEntity(event, timestamp, entityType);
+    entity.setId(taskAttemptId);
+    entity.addIsRelatedToEntity(relatedTaskEntity, taskId);
+    return entity;
+  }
+  
+  private void processEventForNewTimelineService(HistoryEvent event, JobId jobId,
+      long timestamp) {
+    org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity tEntity = null;
+    String taskId = null;
+    String taskAttemptId = null;
+
+    switch (event.getEventType()) {
+      // Handle job events
+      case JOB_SUBMITTED:
+      case JOB_STATUS_CHANGED:
+      case JOB_INFO_CHANGED:
+      case JOB_INITED:
+      case JOB_PRIORITY_CHANGED:
+      case JOB_QUEUE_CHANGED:
+      case JOB_FAILED:
+      case JOB_KILLED:
+      case JOB_ERROR:
+      case JOB_FINISHED:
+      case AM_STARTED:
+      case NORMALIZED_RESOURCE:
+        break;
+      // Handle task events
+      case TASK_STARTED:
+        taskId = ((TaskStartedEvent)event).getTaskId().toString();
+        break;
+      case TASK_FAILED:
+        taskId = ((TaskFailedEvent)event).getTaskId().toString();
+        break;
+      case TASK_UPDATED:
+        taskId = ((TaskUpdatedEvent)event).getTaskId().toString();
+        break;
+      case TASK_FINISHED:
+        taskId = ((TaskFinishedEvent)event).getTaskId().toString();
+        break;
+      case MAP_ATTEMPT_STARTED:
+      case CLEANUP_ATTEMPT_STARTED:
+      case REDUCE_ATTEMPT_STARTED:
+      case SETUP_ATTEMPT_STARTED:
+        taskId = ((TaskAttemptStartedEvent)event).getTaskId().toString();
+        taskAttemptId = ((TaskAttemptStartedEvent)event).
+            getTaskAttemptId().toString();
+        break;
+      case MAP_ATTEMPT_FAILED:
+      case CLEANUP_ATTEMPT_FAILED:
+      case REDUCE_ATTEMPT_FAILED:
+      case SETUP_ATTEMPT_FAILED:
+      case MAP_ATTEMPT_KILLED:
+      case CLEANUP_ATTEMPT_KILLED:
+      case REDUCE_ATTEMPT_KILLED:
+      case SETUP_ATTEMPT_KILLED:
+        taskId = ((TaskAttemptUnsuccessfulCompletionEvent)event).getTaskId().toString();
+        taskAttemptId = ((TaskAttemptUnsuccessfulCompletionEvent)event).
+            getTaskAttemptId().toString();
+        break;
+      case MAP_ATTEMPT_FINISHED:
+        taskId = ((MapAttemptFinishedEvent)event).getTaskId().toString();
+        taskAttemptId = ((MapAttemptFinishedEvent)event).getAttemptId().toString();
+        break;
+      case REDUCE_ATTEMPT_FINISHED:
+        taskId = ((ReduceAttemptFinishedEvent)event).getTaskId().toString();
+        taskAttemptId = ((ReduceAttemptFinishedEvent)event).getAttemptId().toString();
+        break;
+      case SETUP_ATTEMPT_FINISHED:
+      case CLEANUP_ATTEMPT_FINISHED:
+        taskId = ((TaskAttemptFinishedEvent)event).getTaskId().toString();
+        taskAttemptId = ((TaskAttemptFinishedEvent)event).getAttemptId().toString();
+        break;
+      default:
+        LOG.warn("EventType: " + event.getEventType() + " cannot be recognized" +
+            " and handled by timeline service.");
+        return;
     }
-    return nodes;
+    if (taskId == null) {
+      // JobEntity
+      tEntity = createJobEntity(event, timestamp, jobId,
+          MAPREDUCE_JOB_ENTITY_TYPE);
+    } else {
+      if (taskAttemptId == null) {
+        // TaskEntity
+        tEntity = createTaskEntity(event, timestamp, taskId,
+            MAPREDUCE_TASK_ENTITY_TYPE, MAPREDUCE_JOB_ENTITY_TYPE, jobId);
+      } else {
+        // TaskAttemptEntity
+        tEntity = createTaskAttemptEntity(event, timestamp, taskAttemptId,
+            MAPREDUCE_TASK_ATTEMPT_ENTITY_TYPE, MAPREDUCE_TASK_ENTITY_TYPE,
+            taskId);
+      }
+    }
+
+    putEntityWithoutBlocking(timelineClient, tEntity);
   }
 
   private void setSummarySlotSeconds(JobSummary summary, Counters allCounters) {

+ 23 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java

@@ -136,6 +136,7 @@ import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
@@ -1045,6 +1046,7 @@ public class MRAppMaster extends CompositeService {
     private final Configuration conf;
     private final ClusterInfo clusterInfo = new ClusterInfo();
     private final ClientToAMTokenSecretManager clientToAMTokenSecretManager;
+    private TimelineClient timelineClient = null;
 
     private final TaskAttemptFinishingMonitor taskAttemptFinishingMonitor;
 
@@ -1054,6 +1056,23 @@ public class MRAppMaster extends CompositeService {
       this.clientToAMTokenSecretManager =
           new ClientToAMTokenSecretManager(appAttemptID, null);
       this.taskAttemptFinishingMonitor = taskAttemptFinishingMonitor;
+      if (conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA,
+              MRJobConfig.DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA)
+            && conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
+                YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
+
+        boolean newTimelineServiceEnabled = conf.getBoolean(
+            MRJobConfig.MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED,
+            MRJobConfig.DEFAULT_MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED);
+            
+        if (newTimelineServiceEnabled) {
+          // create new version TimelineClient
+          timelineClient = TimelineClient.createTimelineClient(
+              appAttemptID.getApplicationId());
+        } else {
+          timelineClient = TimelineClient.createTimelineClient();
+        }
+      }
     }
 
     @Override
@@ -1144,6 +1163,10 @@ public class MRAppMaster extends CompositeService {
       return taskAttemptFinishingMonitor;
     }
 
+    // Get Timeline Collector's address (get sync from RM)
+    public TimelineClient getTimelineClient() {
+      return timelineClient;
+    }
   }
 
   @SuppressWarnings("unchecked")

+ 9 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java

@@ -48,6 +48,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.MRAppMaster;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
@@ -810,6 +811,14 @@ public class RMContainerAllocator extends RMContainerRequestor
 
     handleUpdatedNodes(response);
     handleJobPriorityChange(response);
+    String collectorAddr = response.getCollectorAddr();
+    MRAppMaster.RunningAppContext appContext = 
+        (MRAppMaster.RunningAppContext)this.getContext();
+    if (collectorAddr != null && !collectorAddr.isEmpty()
+        && appContext.getTimelineClient() != null) {
+      appContext.getTimelineClient().setTimelineServiceAddress(
+        response.getCollectorAddr());
+    }
 
     for (ContainerStatus cont : finishedContainers) {
       LOG.info("Received completed container " + cont.getContainerId());

+ 7 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java

@@ -35,6 +35,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.junit.Test;
 
 public class TestEvents {
@@ -404,7 +405,12 @@ public class TestEvents {
     public void setDatum(Object datum) {
       this.datum = datum;
     }
-
+    
+    @Override
+    public TimelineEvent toTimelineEvent() {
+      return null;
+    }
+    
   }
 
 }

+ 5 - 4
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java

@@ -51,6 +51,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
@@ -669,7 +670,7 @@ public class TestJobHistoryEventHandler {
     group2.addCounter("MARTHA_JONES", "Martha Jones", 3);
     group2.addCounter("DONNA_NOBLE", "Donna Noble", 2);
     group2.addCounter("ROSE_TYLER", "Rose Tyler", 1);
-    JsonNode jsonNode = jheh.countersToJSON(counters);
+    JsonNode jsonNode = JobHistoryEventUtils.countersToJSON(counters);
     String jsonStr = new ObjectMapper().writeValueAsString(jsonNode);
     String expected = "[{\"NAME\":\"COMPANIONS\",\"DISPLAY_NAME\":\"Companions "
         + "of the Doctor\",\"COUNTERS\":[{\"NAME\":\"AMY_POND\",\"DISPLAY_NAME\""
@@ -692,19 +693,19 @@ public class TestJobHistoryEventHandler {
   public void testCountersToJSONEmpty() throws Exception {
     JobHistoryEventHandler jheh = new JobHistoryEventHandler(null, 0);
     Counters counters = null;
-    JsonNode jsonNode = jheh.countersToJSON(counters);
+    JsonNode jsonNode = JobHistoryEventUtils.countersToJSON(counters);
     String jsonStr = new ObjectMapper().writeValueAsString(jsonNode);
     String expected = "[]";
     Assert.assertEquals(expected, jsonStr);
 
     counters = new Counters();
-    jsonNode = jheh.countersToJSON(counters);
+    jsonNode = JobHistoryEventUtils.countersToJSON(counters);
     jsonStr = new ObjectMapper().writeValueAsString(jsonNode);
     expected = "[]";
     Assert.assertEquals(expected, jsonStr);
 
     counters.addGroup("DOCTORS", "Incarnations of the Doctor");
-    jsonNode = jheh.countersToJSON(counters);
+    jsonNode = JobHistoryEventUtils.countersToJSON(counters);
     jsonStr = new ObjectMapper().writeValueAsString(jsonNode);
     expected = "[{\"NAME\":\"DOCTORS\",\"DISPLAY_NAME\":\"Incarnations of the "
         + "Doctor\",\"COUNTERS\":[]}]";

+ 5 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java

@@ -467,6 +467,11 @@ public interface MRJobConfig {
     "mapreduce.job.emit-timeline-data";
   public static final boolean DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA =
       false;
+  
+  public static final String MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED =
+      "mapreduce.job.new-timeline-service.enabled";
+  public static final boolean DEFAULT_MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED =
+      false;
 
   public static final String MR_PREFIX = "yarn.app.mapreduce.";
 

+ 18 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/AMStartedEvent.java

@@ -20,8 +20,10 @@ package org.apache.hadoop.mapreduce.jobhistory;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 
 import org.apache.avro.util.Utf8;
@@ -166,4 +168,20 @@ public class AMStartedEvent implements HistoryEvent {
   public EventType getEventType() {
     return EventType.AM_STARTED;
   }
+  
+  @Override
+  public TimelineEvent toTimelineEvent() {
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+    tEvent.addInfo("APPLICATION_ATTEMPT_ID",
+        getAppAttemptId() == null ? "" : getAppAttemptId().toString());
+    tEvent.addInfo("CONTAINER_ID", getContainerId() == null ?
+        "" : getContainerId().toString());
+    tEvent.addInfo("NODE_MANAGER_HOST", getNodeManagerHost());
+    tEvent.addInfo("NODE_MANAGER_PORT", getNodeManagerPort());
+    tEvent.addInfo("NODE_MANAGER_HTTP_PORT", getNodeManagerHttpPort());
+    tEvent.addInfo("START_TIME", getStartTime());
+    return tEvent;
+  }
+  
 }

+ 4 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEvent.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.mapreduce.jobhistory;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 
 /**
  * Interface for event wrapper classes.  Implementations each wrap an
@@ -37,4 +38,7 @@ public interface HistoryEvent {
 
   /** Set the Avro datum wrapped by this. */
   void setDatum(Object datum);
+  
+  /** Map HistoryEvent to TimelineEvent */
+  TimelineEvent toTimelineEvent();
 }

+ 25 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobFinishedEvent.java

@@ -23,6 +23,9 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 
 /**
  * Event to record successful completion of job
@@ -133,4 +136,26 @@ public class JobFinishedEvent  implements HistoryEvent {
   public Counters getReduceCounters() {
     return reduceCounters;
   }
+  
+  @Override
+  public TimelineEvent toTimelineEvent() {
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+    tEvent.addInfo("FINISH_TIME", getFinishTime());
+    tEvent.addInfo("NUM_MAPS", getFinishedMaps());
+    tEvent.addInfo("NUM_REDUCES", getFinishedReduces());
+    tEvent.addInfo("FAILED_MAPS", getFailedMaps());
+    tEvent.addInfo("FAILED_REDUCES", getFailedReduces());
+    tEvent.addInfo("FINISHED_MAPS", getFinishedMaps());
+    tEvent.addInfo("FINISHED_REDUCES", getFinishedReduces());
+    tEvent.addInfo("MAP_COUNTERS_GROUPS",
+        JobHistoryEventUtils.countersToJSON(getMapCounters()));
+    tEvent.addInfo("REDUCE_COUNTERS_GROUPS",
+        JobHistoryEventUtils.countersToJSON(getReduceCounters()));
+    tEvent.addInfo("TOTAL_COUNTERS_GROUPS",
+        JobHistoryEventUtils.countersToJSON(getTotalCounters()));
+    // TODO replace SUCCEEDED with JobState.SUCCEEDED.toString()
+    tEvent.addInfo("JOB_STATUS", "SUCCEEDED");
+    return tEvent;
+  }
 }

+ 11 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobInfoChangeEvent.java

@@ -23,6 +23,8 @@ import java.io.IOException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 
 import org.apache.avro.util.Utf8;
 
@@ -64,5 +66,14 @@ public class JobInfoChangeEvent implements HistoryEvent {
   public EventType getEventType() {
     return EventType.JOB_INFO_CHANGED;
   }
+  
+  @Override
+  public TimelineEvent toTimelineEvent() {
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+    tEvent.addInfo("SUBMIT_TIME", getSubmitTime());
+    tEvent.addInfo("LAUNCH_TIME", getLaunchTime());
+    return tEvent;
+  }
 
 }

+ 14 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java

@@ -21,6 +21,8 @@ package org.apache.hadoop.mapreduce.jobhistory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 
 import org.apache.avro.util.Utf8;
 
@@ -73,4 +75,16 @@ public class JobInitedEvent implements HistoryEvent {
   }
   /** Get whether the job's map and reduce stages were combined */
   public boolean getUberized() { return datum.getUberized(); }
+  
+  @Override
+  public TimelineEvent toTimelineEvent() {
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+    tEvent.addInfo("START_TIME", getLaunchTime());
+    tEvent.addInfo("STATUS", getStatus());
+    tEvent.addInfo("TOTAL_MAPS", getTotalMaps());
+    tEvent.addInfo("TOTAL_REDUCES", getTotalReduces());
+    tEvent.addInfo("UBERIZED", getUberized());
+    return tEvent;
+  }
 }

+ 10 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobPriorityChangeEvent.java

@@ -24,6 +24,8 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.mapred.JobPriority;
 import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 
 import org.apache.avro.util.Utf8;
 
@@ -64,5 +66,13 @@ public class JobPriorityChangeEvent implements HistoryEvent {
   public EventType getEventType() {
     return EventType.JOB_PRIORITY_CHANGED;
   }
+  
+  @Override
+  public TimelineEvent toTimelineEvent() {
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+    tEvent.addInfo("PRIORITY", getPriority().toString());
+    return tEvent;
+  }
 
 }

+ 10 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobQueueChangeEvent.java

@@ -20,6 +20,8 @@ package org.apache.hadoop.mapreduce.jobhistory;
 
 import org.apache.avro.util.Utf8;
 import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 
 @SuppressWarnings("deprecation")
 public class JobQueueChangeEvent implements HistoryEvent {
@@ -59,5 +61,13 @@ public class JobQueueChangeEvent implements HistoryEvent {
     }
     return null;
   }
+  
+  @Override
+  public TimelineEvent toTimelineEvent() {
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+    tEvent.addInfo("QUEUE_NAMES", getJobQueueName());
+    return tEvent;
+  }
 
 }

+ 10 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobStatusChangedEvent.java

@@ -23,6 +23,8 @@ import java.io.IOException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 
 import org.apache.avro.util.Utf8;
 
@@ -60,5 +62,13 @@ public class JobStatusChangedEvent implements HistoryEvent {
   public EventType getEventType() {
     return EventType.JOB_STATUS_CHANGED;
   }
+  
+  @Override
+  public TimelineEvent toTimelineEvent() {
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+    tEvent.addInfo("STATUS", getStatus());
+    return tEvent;
+  }
 
 }

+ 23 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java

@@ -27,6 +27,8 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 
 import org.apache.avro.util.Utf8;
 
@@ -205,5 +207,26 @@ public class JobSubmittedEvent implements HistoryEvent {
   }
   /** Get the event type */
   public EventType getEventType() { return EventType.JOB_SUBMITTED; }
+  
+  @Override
+  public TimelineEvent toTimelineEvent() {
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+    tEvent.addInfo("SUBMIT_TIME", getSubmitTime());
+    tEvent.addInfo("QUEUE_NAME", getJobQueueName());
+    tEvent.addInfo("JOB_NAME", getJobName());
+    tEvent.addInfo("USER_NAME", getUserName());
+    tEvent.addInfo("JOB_CONF_PATH", getJobConfPath());
+    tEvent.addInfo("ACLS", getJobAcls());
+    tEvent.addInfo("JOB_QUEUE_NAME", getJobQueueName());
+    tEvent.addInfo("WORKLFOW_ID", getWorkflowId());
+    tEvent.addInfo("WORKFLOW_NAME", getWorkflowName());
+    tEvent.addInfo("WORKFLOW_NODE_NAME", getWorkflowNodeName());
+    tEvent.addInfo("WORKFLOW_ADJACENCIES",
+        getWorkflowAdjacencies());
+    tEvent.addInfo("WORKFLOW_TAGS", getWorkflowTags());
+    
+    return tEvent;
+  }
 
 }

+ 16 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java

@@ -24,6 +24,8 @@ import org.apache.avro.util.Utf8;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 
 import java.util.Collections;
 
@@ -119,4 +121,18 @@ public class JobUnsuccessfulCompletionEvent implements HistoryEvent {
     final CharSequence diagnostics = datum.getDiagnostics();
     return diagnostics == null ? NODIAGS : diagnostics.toString();
   }
+  
+  @Override
+  public TimelineEvent toTimelineEvent() {
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+    tEvent.addInfo("FINISH_TIME", getFinishTime());
+    tEvent.addInfo("NUM_MAPS", getFinishedMaps());
+    tEvent.addInfo("NUM_REDUCES", getFinishedReduces());
+    tEvent.addInfo("JOB_STATUS", getStatus());
+    tEvent.addInfo("DIAGNOSTICS", getDiagnostics());
+    tEvent.addInfo("FINISHED_MAPS", getFinishedMaps());
+    tEvent.addInfo("FINISHED_REDUCES", getFinishedReduces());
+    return tEvent;
+  }
 }

+ 23 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java

@@ -26,6 +26,9 @@ import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 
 /**
  * Event to record successful completion of a map attempt
@@ -33,7 +36,7 @@ import org.apache.hadoop.mapreduce.TaskType;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-public class MapAttemptFinishedEvent  implements HistoryEvent {
+public class MapAttemptFinishedEvent implements HistoryEvent {
 
   private MapAttemptFinished datum = null;
 
@@ -218,4 +221,23 @@ public class MapAttemptFinishedEvent  implements HistoryEvent {
     return physMemKbytes;
   }
   
+  @Override
+  public TimelineEvent toTimelineEvent() {
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+    tEvent.addInfo("TASK_TYPE", getTaskType().toString());
+    tEvent.addInfo("FINISH_TIME", getFinishTime());
+    tEvent.addInfo("STATUS", getTaskStatus());
+    tEvent.addInfo("STATE", getState());
+    tEvent.addInfo("MAP_FINISH_TIME", getMapFinishTime());
+    tEvent.addInfo("COUNTERS_GROUPS",
+        JobHistoryEventUtils.countersToJSON(getCounters()));
+    tEvent.addInfo("HOSTNAME", getHostname());
+    tEvent.addInfo("PORT", getPort());
+    tEvent.addInfo("RACK_NAME", getRackName());
+    tEvent.addInfo("ATTEMPT_ID", getAttemptId() == null ?
+        "" : getAttemptId().toString());
+    return tEvent;
+  }
+  
 }

+ 11 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/NormalizedResourceEvent.java

@@ -20,6 +20,8 @@ package org.apache.hadoop.mapreduce.jobhistory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 
 /**
  * Event to record the normalized map/reduce requirements.
@@ -71,4 +73,13 @@ public class NormalizedResourceEvent implements HistoryEvent {
   public void setDatum(Object datum) {
     throw new UnsupportedOperationException("Not a seriable object");
   }
+  
+  @Override
+  public TimelineEvent toTimelineEvent() {
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+    tEvent.addInfo("MEMORY", "" + getMemory());
+    tEvent.addInfo("TASK_TYPE", getTaskType());
+    return tEvent;
+  }
 }

+ 24 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java

@@ -26,6 +26,9 @@ import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 
 /**
  * Event to record successful completion of a reduce attempt
@@ -33,7 +36,7 @@ import org.apache.hadoop.mapreduce.TaskType;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-public class ReduceAttemptFinishedEvent  implements HistoryEvent {
+public class ReduceAttemptFinishedEvent implements HistoryEvent {
 
   private ReduceAttemptFinished datum = null;
 
@@ -222,5 +225,25 @@ public class ReduceAttemptFinishedEvent  implements HistoryEvent {
   public int[] getPhysMemKbytes() {
     return physMemKbytes;
   }
+  
+  @Override
+  public TimelineEvent toTimelineEvent() {
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+    tEvent.addInfo("TASK_TYPE", getTaskType().toString());
+    tEvent.addInfo("ATTEMPT_ID", getAttemptId() == null ?
+        "" : getAttemptId().toString());
+    tEvent.addInfo("FINISH_TIME", getFinishTime());
+    tEvent.addInfo("STATUS", getTaskStatus());
+    tEvent.addInfo("STATE", getState());
+    tEvent.addInfo("SHUFFLE_FINISH_TIME", getShuffleFinishTime());
+    tEvent.addInfo("SORT_FINISH_TIME", getSortFinishTime());
+    tEvent.addInfo("COUNTERS_GROUPS",
+        JobHistoryEventUtils.countersToJSON(getCounters()));
+    tEvent.addInfo("HOSTNAME", getHostname());
+    tEvent.addInfo("PORT", getPort());
+    tEvent.addInfo("RACK_NAME", getRackName());
+    return tEvent;
+  }
 
 }

+ 19 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java

@@ -25,6 +25,9 @@ import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 
 /**
  * Event to record successful task completion
@@ -135,5 +138,21 @@ public class TaskAttemptFinishedEvent  implements HistoryEvent {
            ? EventType.MAP_ATTEMPT_FINISHED
            : EventType.REDUCE_ATTEMPT_FINISHED;
   }
+  
+  @Override
+  public TimelineEvent toTimelineEvent() {
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+    tEvent.addInfo("TASK_TYPE", getTaskType().toString());
+    tEvent.addInfo("ATTEMPT_ID", getAttemptId() == null ?
+        "" : getAttemptId().toString());
+    tEvent.addInfo("FINISH_TIME", getFinishTime());
+    tEvent.addInfo("STATUS", getTaskStatus());
+    tEvent.addInfo("STATE", getState());
+    tEvent.addInfo("COUNTERS_GROUPS",
+        JobHistoryEventUtils.countersToJSON(getCounters()));
+    tEvent.addInfo("HOSTNAME", getHostname());
+    return tEvent;
+  }
 
 }

+ 18 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java

@@ -23,8 +23,10 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 
 import org.apache.avro.util.Utf8;
 
@@ -132,5 +134,21 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
     }
     return null;
   }
+  
+  @Override
+  public TimelineEvent toTimelineEvent() {
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+    tEvent.addInfo("TASK_TYPE", getTaskType().toString());
+    tEvent.addInfo("TASK_ATTEMPT_ID",
+        getTaskAttemptId().toString());
+    tEvent.addInfo("START_TIME", getStartTime());
+    tEvent.addInfo("HTTP_PORT", getHttpPort());
+    tEvent.addInfo("TRACKER_NAME", getTrackerName());
+    tEvent.addInfo("SHUFFLE_PORT", getShufflePort());
+    tEvent.addInfo("CONTAINER_ID", getContainerId() == null ?
+        "" : getContainerId().toString());
+    return tEvent;
+  }
 
 }

+ 24 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java

@@ -25,6 +25,9 @@ import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 
 import org.apache.hadoop.mapred.ProgressSplitsBlock;
 
@@ -247,5 +250,26 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent {
   public int[] getPhysMemKbytes() {
     return physMemKbytes;
   }
+  
+  @Override
+  public TimelineEvent toTimelineEvent() {
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+    tEvent.addInfo("TASK_TYPE", getTaskType().toString());
+    tEvent.addInfo("TASK_ATTEMPT_ID", getTaskAttemptId() == null ?
+        "" : getTaskAttemptId().toString());
+    tEvent.addInfo("FINISH_TIME", getFinishTime());
+    tEvent.addInfo("ERROR", getError());
+    tEvent.addInfo("STATUS", getTaskStatus());
+    tEvent.addInfo("HOSTNAME", getHostname());
+    tEvent.addInfo("PORT", getPort());
+    tEvent.addInfo("RACK_NAME", getRackName());
+    tEvent.addInfo("SHUFFLE_FINISH_TIME", getFinishTime());
+    tEvent.addInfo("SORT_FINISH_TIME", getFinishTime());
+    tEvent.addInfo("MAP_FINISH_TIME", getFinishTime());
+    tEvent.addInfo("COUNTERS_GROUPS",
+        JobHistoryEventUtils.countersToJSON(getCounters()));
+    return tEvent;
+  }
 
 }

+ 19 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java

@@ -20,10 +20,14 @@ package org.apache.hadoop.mapreduce.jobhistory;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.mapred.TaskStatus;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 
 import org.apache.avro.util.Utf8;
 
@@ -136,5 +140,20 @@ public class TaskFailedEvent implements HistoryEvent {
   public EventType getEventType() {
     return EventType.TASK_FAILED;
   }
+  
+  @Override
+  public TimelineEvent toTimelineEvent() {
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+    tEvent.addInfo("TASK_TYPE", getTaskType().toString());
+    tEvent.addInfo("STATUS", TaskStatus.State.FAILED.toString());
+    tEvent.addInfo("FINISH_TIME", getFinishTime());
+    tEvent.addInfo("ERROR", getError());
+    tEvent.addInfo("FAILED_ATTEMPT_ID",
+        getFailedAttemptID() == null ? "" : getFailedAttemptID().toString());
+    tEvent.addInfo("COUNTERS_GROUPS", 
+        JobHistoryEventUtils.countersToJSON(getCounters()));
+    return tEvent;
+  }
 
 }

+ 18 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java

@@ -21,10 +21,14 @@ package org.apache.hadoop.mapreduce.jobhistory;
 import org.apache.avro.util.Utf8;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.mapred.TaskStatus;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 
 /**
  * Event to record the successful completion of a task
@@ -115,5 +119,19 @@ public class TaskFinishedEvent implements HistoryEvent {
     return EventType.TASK_FINISHED;
   }
 
+  @Override
+  public TimelineEvent toTimelineEvent() {
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+    tEvent.addInfo("TASK_TYPE", getTaskType().toString());
+    tEvent.addInfo("COUNTERS_GROUPS",
+        JobHistoryEventUtils.countersToJSON(getCounters()));
+    tEvent.addInfo("FINISH_TIME", getFinishTime());
+    tEvent.addInfo("STATUS", TaskStatus.State.SUCCEEDED.toString());
+    tEvent.addInfo("SUCCESSFUL_TASK_ATTEMPT_ID",
+        getSuccessfulTaskAttemptId() == null ? "" : 
+            getSuccessfulTaskAttemptId().toString());
+    return tEvent;
+  }
   
 }

+ 12 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java

@@ -23,6 +23,8 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 
 /**
  * Event to record the start of a task
@@ -71,5 +73,15 @@ public class TaskStartedEvent implements HistoryEvent {
   public EventType getEventType() {
     return EventType.TASK_STARTED;
   }
+  
+  @Override
+  public TimelineEvent toTimelineEvent() {
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+    tEvent.addInfo("TASK_TYPE", getTaskType().toString());
+    tEvent.addInfo("START_TIME", getStartTime());
+    tEvent.addInfo("SPLIT_LOCATIONS", getSplitLocations());
+    return tEvent;
+  }
 
 }

+ 10 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskUpdatedEvent.java

@@ -23,6 +23,8 @@ import java.io.IOException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 
 import org.apache.avro.util.Utf8;
 
@@ -60,5 +62,13 @@ public class TaskUpdatedEvent implements HistoryEvent {
   public EventType getEventType() {
     return EventType.TASK_UPDATED;
   }
+  
+  @Override
+  public TimelineEvent toTimelineEvent() {
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+    tEvent.addInfo("FINISH_TIME", getFinishTime());
+    return tEvent;
+  }
 
 }

+ 51 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/JobHistoryEventUtils.java

@@ -0,0 +1,51 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.mapreduce.util;
+
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.CounterGroup;
+import org.apache.hadoop.mapreduce.Counters;
+
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.node.ArrayNode;
+import org.codehaus.jackson.node.ObjectNode;
+
+public class JobHistoryEventUtils {
+
+  public static JsonNode countersToJSON(Counters counters) {
+    ObjectMapper mapper = new ObjectMapper();
+    ArrayNode nodes = mapper.createArrayNode();
+    if (counters != null) {
+      for (CounterGroup counterGroup : counters) {
+        ObjectNode groupNode = nodes.addObject();
+        groupNode.put("NAME", counterGroup.getName());
+        groupNode.put("DISPLAY_NAME", counterGroup.getDisplayName());
+        ArrayNode countersNode = groupNode.putArray("COUNTERS");
+        for (Counter counter : counterGroup) {
+          ObjectNode counterNode = countersNode.addObject();
+          counterNode.put("NAME", counter.getName());
+          counterNode.put("DISPLAY_NAME", counter.getDisplayName());
+          counterNode.put("VALUE", counter.getValue());
+        }
+      }
+    }
+    return nodes;
+  }
+
+}

+ 7 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml

@@ -618,6 +618,13 @@
     </description>
 </property>
 
+ <property>
+    <name>mapreduce.job.new-timeline-service.enabled</name>
+    <value>false</value>
+    <description>Specifies if posting job and task events to new timeline service.
+    </description>
+</property>
+
 <property>
   <name>mapreduce.input.fileinputformat.split.minsize</name>
   <value>0</value>

+ 160 - 3
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java

@@ -18,23 +18,46 @@
 
 package org.apache.hadoop.mapred;
 
+import java.io.File;
+import java.io.IOException;
+
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.jobhistory.EventType;
 import org.apache.hadoop.mapreduce.jobhistory.TestJobHistoryEventHandler;
 import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
 import org.apache.hadoop.yarn.server.timeline.TimelineStore;
+import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
+import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 
 import org.junit.Assert;
 import org.junit.Test;
 
 public class TestMRTimelineEventHandling {
 
+  private static final String TIMELINE_AUX_SERVICE_NAME = "timeline_collector";
+  private static final Log LOG =
+    LogFactory.getLog(TestMRTimelineEventHandling.class);
+  
   @Test
   public void testTimelineServiceStartInMiniCluster() throws Exception {
     Configuration conf = new YarnConfiguration();
@@ -48,7 +71,7 @@ public class TestMRTimelineEventHandling {
     MiniMRYarnCluster cluster = null;
     try {
       cluster = new MiniMRYarnCluster(
-          TestJobHistoryEventHandler.class.getSimpleName(), 1);
+        TestMRTimelineEventHandling.class.getSimpleName(), 1);
       cluster.init(conf);
       cluster.start();
 
@@ -89,7 +112,7 @@ public class TestMRTimelineEventHandling {
     MiniMRYarnCluster cluster = null;
     try {
       cluster = new MiniMRYarnCluster(
-          TestJobHistoryEventHandler.class.getSimpleName(), 1);
+        TestMRTimelineEventHandling.class.getSimpleName(), 1);
       cluster.init(conf);
       cluster.start();
       conf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
@@ -136,6 +159,140 @@ public class TestMRTimelineEventHandling {
       }
     }
   }
+  
+  @Test
+  public void testMRNewTimelineServiceEventHandling() throws Exception {
+    LOG.info("testMRNewTimelineServiceEventHandling start.");
+    Configuration conf = new YarnConfiguration();
+    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, true);
+
+    // enable new timeline serivce in MR side
+    conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED, true);
+
+    // enable aux-service based timeline collectors
+    conf.set(YarnConfiguration.NM_AUX_SERVICES, TIMELINE_AUX_SERVICE_NAME);
+    conf.set(YarnConfiguration.NM_AUX_SERVICES + "." + TIMELINE_AUX_SERVICE_NAME
+      + ".class", PerNodeTimelineCollectorsAuxService.class.getName());
+    
+    conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true);
+
+    MiniMRYarnCluster cluster = null;
+    try {
+      cluster = new MiniMRYarnCluster(
+          TestMRTimelineEventHandling.class.getSimpleName(), 1, true);
+      cluster.init(conf);
+      cluster.start();
+      LOG.info("A MiniMRYarnCluster get start.");
+
+      Path inDir = new Path("input");
+      Path outDir = new Path("output");
+      LOG.info("Run 1st job which should be successful.");
+      RunningJob job =
+          UtilsForTests.runJobSucceed(new JobConf(conf), inDir, outDir);
+      Assert.assertEquals(JobStatus.SUCCEEDED,
+          job.getJobStatus().getState().getValue());
+
+      YarnClient yarnClient = YarnClient.createYarnClient();
+      yarnClient.init(new Configuration(cluster.getConfig()));
+      yarnClient.start();
+      EnumSet<YarnApplicationState> appStates = 
+          EnumSet.allOf(YarnApplicationState.class);
+      
+      ApplicationId firstAppId = null;
+      List<ApplicationReport> apps = yarnClient.getApplications(appStates);
+      Assert.assertEquals(apps.size(), 1);
+      ApplicationReport appReport = apps.get(0);
+      firstAppId = appReport.getApplicationId();
+
+      checkNewTimelineEvent(firstAppId);
+
+      LOG.info("Run 2nd job which should be failed.");
+      job = UtilsForTests.runJobFail(new JobConf(conf), inDir, outDir);
+      Assert.assertEquals(JobStatus.FAILED,
+          job.getJobStatus().getState().getValue());
+      
+      apps = yarnClient.getApplications(appStates);
+      Assert.assertEquals(apps.size(), 2);
+      
+      ApplicationId secAppId = null;
+      secAppId = apps.get(0).getApplicationId() == firstAppId ? 
+          apps.get(1).getApplicationId() : apps.get(0).getApplicationId();
+      checkNewTimelineEvent(firstAppId);
+
+    } finally {
+      if (cluster != null) {
+        cluster.stop();
+      }
+      // Cleanup test file
+      String testRoot =
+          FileSystemTimelineWriterImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT;
+      File testRootFolder = new File(testRoot);
+      if(testRootFolder.isDirectory()) {
+        FileUtils.deleteDirectory(testRootFolder);
+      }
+      
+    }
+  }
+  
+  private void checkNewTimelineEvent(ApplicationId appId) throws IOException {
+    String tmpRoot =
+        FileSystemTimelineWriterImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT
+            + "/entities/";
+
+    File tmpRootFolder = new File(tmpRoot);
+    
+    Assert.assertTrue(tmpRootFolder.isDirectory());
+    String basePath = tmpRoot + YarnConfiguration.DEFAULT_RM_CLUSTER_ID + "/" +
+        UserGroupInformation.getCurrentUser().getShortUserName() +
+        "/" + TimelineUtils.generateDefaultFlowIdBasedOnAppId(appId) +
+        "/1/1/" + appId.toString();
+    // for this test, we expect MAPREDUCE_JOB and MAPREDUCE_TASK dirs
+    String outputDirJob = basePath + "/MAPREDUCE_JOB/";
+
+    File entityFolder = new File(outputDirJob);
+    Assert.assertTrue("Job output directory: " + outputDirJob + " is not exist.",
+        entityFolder.isDirectory());
+
+    // check for job event file
+    String jobEventFileName = appId.toString().replaceAll("application", "job")
+        + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+
+    String jobEventFilePath = outputDirJob + jobEventFileName;
+    File jobEventFile = new File(jobEventFilePath);
+    Assert.assertTrue("jobEventFilePath: " + jobEventFilePath + " is not exist.",
+        jobEventFile.exists());
+
+    // check for task event file
+    String outputDirTask = basePath + "/MAPREDUCE_TASK/";
+    File taskFolder = new File(outputDirTask);
+    Assert.assertTrue("Task output directory: " + outputDirTask + " is not exist.",
+        taskFolder.isDirectory());
+    
+    String taskEventFileName = appId.toString().replaceAll("application", "task")
+        + "_m_000000" + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+
+    String taskEventFilePath = outputDirTask + taskEventFileName;
+    File taskEventFile = new File(taskEventFilePath);
+    Assert.assertTrue("taskEventFileName: " + taskEventFilePath + " is not exist.",
+        taskEventFile.exists());
+    
+    // check for task attempt event file
+    String outputDirTaskAttempt = basePath + "/MAPREDUCE_TASK_ATTEMPT/";
+    File taskAttemptFolder = new File(outputDirTaskAttempt);
+    Assert.assertTrue("TaskAttempt output directory: " + outputDirTaskAttempt + 
+        " is not exist.", taskAttemptFolder.isDirectory());
+    
+    String taskAttemptEventFileName = appId.toString().replaceAll(
+        "application", "attempt") + "_m_000000_0" + 
+        FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+
+    String taskAttemptEventFilePath = outputDirTaskAttempt +
+        taskAttemptEventFileName;
+    File taskAttemptEventFile = new File(taskAttemptEventFilePath);
+    Assert.assertTrue("taskAttemptEventFileName: " + taskAttemptEventFilePath +
+        " is not exist.", taskAttemptEventFile.exists());
+  }
 
   @Test
   public void testMapreduceJobTimelineServiceEnabled()
@@ -146,7 +303,7 @@ public class TestMRTimelineEventHandling {
     MiniMRYarnCluster cluster = null;
     try {
       cluster = new MiniMRYarnCluster(
-          TestJobHistoryEventHandler.class.getSimpleName(), 1);
+        TestMRTimelineEventHandling.class.getSimpleName(), 1);
       cluster.init(conf);
       cluster.start();
       conf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,

+ 19 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java

@@ -66,6 +66,7 @@ public class MiniMRYarnCluster extends MiniYARNCluster {
   private static final Log LOG = LogFactory.getLog(MiniMRYarnCluster.class);
   private JobHistoryServer historyServer;
   private JobHistoryServerWrapper historyServerWrapper;
+  private static final String TIMELINE_AUX_SERVICE_NAME = "timeline_collector";
 
   public MiniMRYarnCluster(String testName) {
     this(testName, 1);
@@ -167,8 +168,24 @@ public class MiniMRYarnCluster extends MiniYARNCluster {
     conf.set(MRConfig.MASTER_ADDRESS, "test"); // The default is local because of
                                              // which shuffle doesn't happen
     //configure the shuffle service in NM
-    conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
-        new String[] { ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID });
+    String[] nmAuxServices = conf.getStrings(YarnConfiguration.NM_AUX_SERVICES);
+    // if need to enable TIMELINE_AUX_SERVICE_NAME
+    boolean enableTimelineAuxService = false;
+    if (nmAuxServices != null) {
+      for (String nmAuxService: nmAuxServices) {
+        if (nmAuxService == TIMELINE_AUX_SERVICE_NAME) {
+          enableTimelineAuxService = true;
+          break;
+        }
+      }
+    }
+    if (enableTimelineAuxService) {
+      conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
+          new String[] { ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID, TIMELINE_AUX_SERVICE_NAME });
+    } else {
+      conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
+          new String[] { ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID });
+    }
     conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
         ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID), ShuffleHandler.class,
         Service.class);