@@ -25,10 +25,12 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -44,16 +46,17 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.TaskStatus;
 import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.CounterGroup;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.util.MRJobConfUtil;
+import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.MRAppMaster;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
 import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
@@ -67,16 +70,15 @@ import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.client.api.TimelineV2Client;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.codehaus.jackson.JsonNode;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.node.ArrayNode;
-import org.codehaus.jackson.node.JsonNodeFactory;
-import org.codehaus.jackson.node.ObjectNode;
+import org.apache.hadoop.yarn.util.TimelineServiceHelper;

 import com.google.common.annotations.VisibleForTesting;
 import com.sun.jersey.api.client.ClientHandlerException;
@@ -89,8 +91,6 @@ import com.sun.jersey.api.client.ClientHandlerException;
  */
 public class JobHistoryEventHandler extends AbstractService
     implements EventHandler<JobHistoryEvent> {
-  private static final JsonNodeFactory FACTORY =
-      new ObjectMapper().getNodeFactory();

   private final AppContext context;
   private final int startCount;
@@ -132,10 +132,15 @@ public class JobHistoryEventHandler extends AbstractService
   // should job completion be force when the AM shuts down?
   protected volatile boolean forceJobCompletion = false;

+  @VisibleForTesting
   protected TimelineClient timelineClient;
+  @VisibleForTesting
+  protected TimelineV2Client timelineV2Client;

   private static String MAPREDUCE_JOB_ENTITY_TYPE = "MAPREDUCE_JOB";
   private static String MAPREDUCE_TASK_ENTITY_TYPE = "MAPREDUCE_TASK";
+  private static final String MAPREDUCE_TASK_ATTEMPT_ENTITY_TYPE =
+      "MAPREDUCE_TASK_ATTEMPT";

   public JobHistoryEventHandler(AppContext context, int startCount) {
     super("JobHistoryEventHandler");
@@ -255,19 +260,33 @@ public class JobHistoryEventHandler extends AbstractService
         MRJobConfig.MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD,
         MRJobConfig.DEFAULT_MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD);

+    // TODO replace MR specific configurations on timeline service with getting
+    // configuration from RM through registerApplicationMaster() in
+    // ApplicationMasterProtocol with return value for timeline service
+    // configuration status: off, on_with_v1 or on_with_v2.
     if (conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA,
         MRJobConfig.DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA)) {
-      if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
-          YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
-        timelineClient = TimelineClient.createTimelineClient();
-        timelineClient.init(conf);
-        LOG.info("Timeline service is enabled");
-        LOG.info("Emitting job history data to the timeline server is enabled");
+      LOG.info("Emitting job history data to the timeline service is enabled");
+      if (YarnConfiguration.timelineServiceEnabled(conf)) {
+        boolean timelineServiceV2Enabled =
+            ((int) YarnConfiguration.getTimelineServiceVersion(conf) == 2);
+        if(timelineServiceV2Enabled) {
+          timelineV2Client =
+              ((MRAppMaster.RunningAppContext)context).getTimelineV2Client();
+          timelineV2Client.init(conf);
+        } else {
+          timelineClient =
+              ((MRAppMaster.RunningAppContext) context).getTimelineClient();
+          timelineClient.init(conf);
+        }
+        LOG.info("Timeline service is enabled; version: " +
+            YarnConfiguration.getTimelineServiceVersion(conf));
       } else {
         LOG.info("Timeline service is not enabled");
       }
     } else {
-      LOG.info("Emitting job history data to the timeline server is not enabled");
+      LOG.info("Emitting job history data to the timeline server is not " +
+          "enabled");
     }

     // Flag for setting
@@ -310,6 +329,8 @@ public class JobHistoryEventHandler extends AbstractService
   protected void serviceStart() throws Exception {
     if (timelineClient != null) {
       timelineClient.start();
+    } else if (timelineV2Client != null) {
+      timelineV2Client.start();
     }
     eventHandlingThread = new Thread(new Runnable() {
       @Override
@@ -434,6 +455,8 @@ public class JobHistoryEventHandler extends AbstractService
     }
     if (timelineClient != null) {
       timelineClient.stop();
+    } else if (timelineV2Client != null) {
+      timelineV2Client.stop();
     }
     LOG.info("Stopped JobHistoryEventHandler. super.stop()");
     super.serviceStop();
@@ -591,7 +614,10 @@ public class JobHistoryEventHandler extends AbstractService
         }
         processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary(),
             event.getJobID());
-        if (timelineClient != null) {
+        if (timelineV2Client != null) {
+          processEventForNewTimelineService(historyEvent, event.getJobID(),
+              event.getTimestamp());
+        } else if (timelineClient != null) {
           processEventForTimelineServer(historyEvent, event.getJobID(),
               event.getTimestamp());
         }
@@ -835,11 +861,11 @@ public class JobHistoryEventHandler extends AbstractService
       tEvent.addEventInfo("FINISHED_MAPS", jfe.getFinishedMaps());
       tEvent.addEventInfo("FINISHED_REDUCES", jfe.getFinishedReduces());
       tEvent.addEventInfo("MAP_COUNTERS_GROUPS",
-          countersToJSON(jfe.getMapCounters()));
+          JobHistoryEventUtils.countersToJSON(jfe.getMapCounters()));
       tEvent.addEventInfo("REDUCE_COUNTERS_GROUPS",
-          countersToJSON(jfe.getReduceCounters()));
+          JobHistoryEventUtils.countersToJSON(jfe.getReduceCounters()));
       tEvent.addEventInfo("TOTAL_COUNTERS_GROUPS",
-          countersToJSON(jfe.getTotalCounters()));
+          JobHistoryEventUtils.countersToJSON(jfe.getTotalCounters()));
       tEvent.addEventInfo("JOB_STATUS", JobState.SUCCEEDED.toString());
       tEntity.addEvent(tEvent);
       tEntity.setEntityId(jobId.toString());
@@ -865,7 +891,7 @@ public class JobHistoryEventHandler extends AbstractService
           tfe.getFailedAttemptID() == null ?
           "" : tfe.getFailedAttemptID().toString());
       tEvent.addEventInfo("COUNTERS_GROUPS",
-          countersToJSON(tfe.getCounters()));
+          JobHistoryEventUtils.countersToJSON(tfe.getCounters()));
       tEntity.addEvent(tEvent);
       tEntity.setEntityId(tfe.getTaskId().toString());
       tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE);
@@ -883,7 +909,7 @@ public class JobHistoryEventHandler extends AbstractService
       TaskFinishedEvent tfe2 = (TaskFinishedEvent) event;
       tEvent.addEventInfo("TASK_TYPE", tfe2.getTaskType().toString());
       tEvent.addEventInfo("COUNTERS_GROUPS",
-          countersToJSON(tfe2.getCounters()));
+          JobHistoryEventUtils.countersToJSON(tfe2.getCounters()));
       tEvent.addEventInfo("FINISH_TIME", tfe2.getFinishTime());
       tEvent.addEventInfo("STATUS", TaskStatus.State.SUCCEEDED.toString());
       tEvent.addEventInfo("SUCCESSFUL_TASK_ATTEMPT_ID",
@@ -905,7 +931,6 @@ public class JobHistoryEventHandler extends AbstractService
       tEvent.addEventInfo("START_TIME", tase.getStartTime());
       tEvent.addEventInfo("HTTP_PORT", tase.getHttpPort());
       tEvent.addEventInfo("TRACKER_NAME", tase.getTrackerName());
-      tEvent.addEventInfo("TASK_TYPE", tase.getTaskType().toString());
       tEvent.addEventInfo("SHUFFLE_PORT", tase.getShufflePort());
       tEvent.addEventInfo("CONTAINER_ID", tase.getContainerId() == null ?
           "" : tase.getContainerId().toString());
@@ -938,7 +963,7 @@ public class JobHistoryEventHandler extends AbstractService
       tEvent.addEventInfo("SORT_FINISH_TIME", tauce.getFinishTime());
       tEvent.addEventInfo("MAP_FINISH_TIME", tauce.getFinishTime());
       tEvent.addEventInfo("COUNTERS_GROUPS",
-          countersToJSON(tauce.getCounters()));
+          JobHistoryEventUtils.countersToJSON(tauce.getCounters()));
       tEntity.addEvent(tEvent);
       tEntity.setEntityId(tauce.getTaskId().toString());
       tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE);
@@ -952,7 +977,7 @@ public class JobHistoryEventHandler extends AbstractService
       tEvent.addEventInfo("STATE", mafe.getState());
       tEvent.addEventInfo("MAP_FINISH_TIME", mafe.getMapFinishTime());
       tEvent.addEventInfo("COUNTERS_GROUPS",
-          countersToJSON(mafe.getCounters()));
+          JobHistoryEventUtils.countersToJSON(mafe.getCounters()));
       tEvent.addEventInfo("HOSTNAME", mafe.getHostname());
       tEvent.addEventInfo("PORT", mafe.getPort());
       tEvent.addEventInfo("RACK_NAME", mafe.getRackName());
@@ -974,7 +999,7 @@ public class JobHistoryEventHandler extends AbstractService
       tEvent.addEventInfo("SHUFFLE_FINISH_TIME", rafe.getShuffleFinishTime());
       tEvent.addEventInfo("SORT_FINISH_TIME", rafe.getSortFinishTime());
       tEvent.addEventInfo("COUNTERS_GROUPS",
-          countersToJSON(rafe.getCounters()));
+          JobHistoryEventUtils.countersToJSON(rafe.getCounters()));
       tEvent.addEventInfo("HOSTNAME", rafe.getHostname());
       tEvent.addEventInfo("PORT", rafe.getPort());
       tEvent.addEventInfo("RACK_NAME", rafe.getRackName());
@@ -993,7 +1018,7 @@ public class JobHistoryEventHandler extends AbstractService
       tEvent.addEventInfo("STATUS", tafe.getTaskStatus());
       tEvent.addEventInfo("STATE", tafe.getState());
       tEvent.addEventInfo("COUNTERS_GROUPS",
-          countersToJSON(tafe.getCounters()));
+          JobHistoryEventUtils.countersToJSON(tafe.getCounters()));
       tEvent.addEventInfo("HOSTNAME", tafe.getHostname());
       tEntity.addEvent(tEvent);
       tEntity.setEntityId(tafe.getTaskId().toString());
@@ -1043,24 +1068,272 @@ public class JobHistoryEventHandler extends AbstractService
     }
   }

-  @Private
-  public JsonNode countersToJSON(Counters counters) {
-    ArrayNode nodes = FACTORY.arrayNode();
-    if (counters != null) {
-      for (CounterGroup counterGroup : counters) {
-        ObjectNode groupNode = nodes.addObject();
-        groupNode.put("NAME", counterGroup.getName());
-        groupNode.put("DISPLAY_NAME", counterGroup.getDisplayName());
-        ArrayNode countersNode = groupNode.putArray("COUNTERS");
-        for (Counter counter : counterGroup) {
-          ObjectNode counterNode = countersNode.addObject();
-          counterNode.put("NAME", counter.getName());
-          counterNode.put("DISPLAY_NAME", counter.getDisplayName());
-          counterNode.put("VALUE", counter.getValue());
+  // create JobEntity from HistoryEvent with adding other info, like:
+  // jobId, timestamp and entityType.
+  private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
+      createJobEntity(HistoryEvent event, long timestamp, JobId jobId,
+      String entityType, boolean setCreatedTime) {
+
+    org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
+        createBaseEntity(event, timestamp, entityType, setCreatedTime);
+    entity.setId(jobId.toString());
+    return entity;
+  }
+
+  private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
+      createJobEntity(JobId jobId) {
+    org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
+        new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity();
+    entity.setId(jobId.toString());
+    entity.setType(MAPREDUCE_JOB_ENTITY_TYPE);
+    return entity;
+  }
+
+  // create ApplicationEntity with job finished Metrics from HistoryEvent
+  private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
+      createAppEntityWithJobMetrics(HistoryEvent event, JobId jobId) {
+    ApplicationEntity entity = new ApplicationEntity();
+    entity.setId(jobId.getAppId().toString());
+    entity.setMetrics(event.getTimelineMetrics());
+    return entity;
+  }
+
+  // create BaseEntity from HistoryEvent with adding other info, like:
+  // timestamp and entityType.
+  private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
+      createBaseEntity(HistoryEvent event, long timestamp, String entityType,
+      boolean setCreatedTime) {
+    org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent tEvent =
+        event.toTimelineEvent();
+    tEvent.setTimestamp(timestamp);
+
+    org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
+        new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity();
+    entity.addEvent(tEvent);
+    entity.setType(entityType);
+    if (setCreatedTime) {
+      entity.setCreatedTime(timestamp);
+    }
+    Set<TimelineMetric> timelineMetrics = event.getTimelineMetrics();
+    if (timelineMetrics != null) {
+      entity.setMetrics(timelineMetrics);
+    }
+    return entity;
+  }
+
+  // create TaskEntity from HistoryEvent with adding other info, like:
+  // taskId, jobId, timestamp, entityType and relatedJobEntity.
+  private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
+      createTaskEntity(HistoryEvent event, long timestamp, String taskId,
+      String entityType, String relatedJobEntity, JobId jobId,
+      boolean setCreatedTime, long taskIdPrefix) {
+    org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
+        createBaseEntity(event, timestamp, entityType, setCreatedTime);
+    entity.setId(taskId);
+    if (event.getEventType() == EventType.TASK_STARTED) {
+      entity.addInfo("TASK_TYPE",
+          ((TaskStartedEvent)event).getTaskType().toString());
+    }
+    entity.addIsRelatedToEntity(relatedJobEntity, jobId.toString());
+    entity.setIdPrefix(taskIdPrefix);
+    return entity;
+  }
+
+  // create TaskAttemptEntity from HistoryEvent with adding other info, like:
+  // timestamp, taskAttemptId, entityType, relatedTaskEntity and taskId.
+  private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
+      createTaskAttemptEntity(HistoryEvent event, long timestamp,
+      String taskAttemptId, String entityType, String relatedTaskEntity,
+      String taskId, boolean setCreatedTime, long taskAttemptIdPrefix) {
+    org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
+        createBaseEntity(event, timestamp, entityType, setCreatedTime);
+    entity.setId(taskAttemptId);
+    entity.addIsRelatedToEntity(relatedTaskEntity, taskId);
+    entity.setIdPrefix(taskAttemptIdPrefix);
+    return entity;
+  }
+
+  private void publishConfigsOnJobSubmittedEvent(JobSubmittedEvent event,
+      JobId jobId) {
+    if (event.getJobConf() == null) {
+      return;
+    }
+    // Publish job configurations both as job and app entity.
+    // Configs are split into multiple entities if they exceed 100kb in size.
+    org.apache.hadoop.yarn.api.records.timelineservice.
+        TimelineEntity jobEntityForConfigs = createJobEntity(jobId);
+    ApplicationEntity appEntityForConfigs = new ApplicationEntity();
+    String appId = jobId.getAppId().toString();
+    appEntityForConfigs.setId(appId);
+    try {
+      int configSize = 0;
+      for (Map.Entry<String, String> entry : event.getJobConf()) {
+        int size = entry.getKey().length() + entry.getValue().length();
+        configSize += size;
+        if (configSize > JobHistoryEventUtils.ATS_CONFIG_PUBLISH_SIZE_BYTES) {
+          if (jobEntityForConfigs.getConfigs().size() > 0) {
+            timelineV2Client.putEntities(jobEntityForConfigs);
+            timelineV2Client.putEntities(appEntityForConfigs);
+            jobEntityForConfigs = createJobEntity(jobId);
+            appEntityForConfigs = new ApplicationEntity();
+            appEntityForConfigs.setId(appId);
+          }
+          configSize = size;
         }
+        jobEntityForConfigs.addConfig(entry.getKey(), entry.getValue());
+        appEntityForConfigs.addConfig(entry.getKey(), entry.getValue());
+      }
+      if (configSize > 0) {
+        timelineV2Client.putEntities(jobEntityForConfigs);
+        timelineV2Client.putEntities(appEntityForConfigs);
       }
+    } catch (IOException | YarnException e) {
+      LOG.error("Exception while publishing configs on JOB_SUBMITTED Event " +
+          " for the job : " + jobId, e);
+    }
+  }
+
+  private void processEventForNewTimelineService(HistoryEvent event,
+      JobId jobId, long timestamp) {
+    org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity tEntity =
+        null;
+    String taskId = null;
+    String taskAttemptId = null;
+    boolean setCreatedTime = false;
+    long taskIdPrefix = 0;
+    long taskAttemptIdPrefix = 0;
+
+    switch (event.getEventType()) {
+    // Handle job events
+    case JOB_SUBMITTED:
+      setCreatedTime = true;
+      break;
+    case JOB_STATUS_CHANGED:
+    case JOB_INFO_CHANGED:
+    case JOB_INITED:
+    case JOB_PRIORITY_CHANGED:
+    case JOB_QUEUE_CHANGED:
+    case JOB_FAILED:
+    case JOB_KILLED:
+    case JOB_ERROR:
+    case JOB_FINISHED:
+    case AM_STARTED:
+    case NORMALIZED_RESOURCE:
+      break;
+    // Handle task events
+    case TASK_STARTED:
+      setCreatedTime = true;
+      taskId = ((TaskStartedEvent)event).getTaskId().toString();
+      taskIdPrefix = TimelineServiceHelper.
+          invertLong(((TaskStartedEvent)event).getStartTime());
+      break;
+    case TASK_FAILED:
+      taskId = ((TaskFailedEvent)event).getTaskId().toString();
+      taskIdPrefix = TimelineServiceHelper.
+          invertLong(((TaskFailedEvent)event).getStartTime());
+      break;
+    case TASK_UPDATED:
+      taskId = ((TaskUpdatedEvent)event).getTaskId().toString();
+      break;
+    case TASK_FINISHED:
+      taskId = ((TaskFinishedEvent)event).getTaskId().toString();
+      taskIdPrefix = TimelineServiceHelper.
+          invertLong(((TaskFinishedEvent)event).getStartTime());
+      break;
+    case MAP_ATTEMPT_STARTED:
+    case REDUCE_ATTEMPT_STARTED:
+      setCreatedTime = true;
+      taskId = ((TaskAttemptStartedEvent)event).getTaskId().toString();
+      taskAttemptId = ((TaskAttemptStartedEvent)event).
+          getTaskAttemptId().toString();
+      taskAttemptIdPrefix = TimelineServiceHelper.
+          invertLong(((TaskAttemptStartedEvent)event).getStartTime());
+      break;
+    case CLEANUP_ATTEMPT_STARTED:
+    case SETUP_ATTEMPT_STARTED:
+      taskId = ((TaskAttemptStartedEvent)event).getTaskId().toString();
+      taskAttemptId = ((TaskAttemptStartedEvent)event).
+          getTaskAttemptId().toString();
+      break;
+    case MAP_ATTEMPT_FAILED:
+    case CLEANUP_ATTEMPT_FAILED:
+    case REDUCE_ATTEMPT_FAILED:
+    case SETUP_ATTEMPT_FAILED:
+    case MAP_ATTEMPT_KILLED:
+    case CLEANUP_ATTEMPT_KILLED:
+    case REDUCE_ATTEMPT_KILLED:
+    case SETUP_ATTEMPT_KILLED:
+      taskId = ((TaskAttemptUnsuccessfulCompletionEvent)event).
+          getTaskId().toString();
+      taskAttemptId = ((TaskAttemptUnsuccessfulCompletionEvent)event).
+          getTaskAttemptId().toString();
+      taskAttemptIdPrefix = TimelineServiceHelper.invertLong(
+          ((TaskAttemptUnsuccessfulCompletionEvent)event).getStartTime());
+      break;
+    case MAP_ATTEMPT_FINISHED:
+      taskId = ((MapAttemptFinishedEvent)event).getTaskId().toString();
+      taskAttemptId = ((MapAttemptFinishedEvent)event).
+          getAttemptId().toString();
+      taskAttemptIdPrefix = TimelineServiceHelper.
+          invertLong(((MapAttemptFinishedEvent)event).getStartTime());
+      break;
+    case REDUCE_ATTEMPT_FINISHED:
+      taskId = ((ReduceAttemptFinishedEvent)event).getTaskId().toString();
+      taskAttemptId = ((ReduceAttemptFinishedEvent)event).
+          getAttemptId().toString();
+      taskAttemptIdPrefix = TimelineServiceHelper.
+          invertLong(((ReduceAttemptFinishedEvent)event).getStartTime());
+      break;
+    case SETUP_ATTEMPT_FINISHED:
+    case CLEANUP_ATTEMPT_FINISHED:
+      taskId = ((TaskAttemptFinishedEvent)event).getTaskId().toString();
+      taskAttemptId = ((TaskAttemptFinishedEvent)event).
+          getAttemptId().toString();
+      break;
+    default:
+      LOG.warn("EventType: " + event.getEventType() + " cannot be recognized" +
+          " and handled by timeline service.");
+      return;
+    }
+
+    org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
+        appEntityWithJobMetrics = null;
+    if (taskId == null) {
+      // JobEntity
+      tEntity = createJobEntity(event, timestamp, jobId,
+          MAPREDUCE_JOB_ENTITY_TYPE, setCreatedTime);
+      if (event.getEventType() == EventType.JOB_FINISHED
+          && event.getTimelineMetrics() != null) {
+        appEntityWithJobMetrics = createAppEntityWithJobMetrics(event, jobId);
+      }
+    } else {
+      if (taskAttemptId == null) {
+        // TaskEntity
+        tEntity = createTaskEntity(event, timestamp, taskId,
+            MAPREDUCE_TASK_ENTITY_TYPE, MAPREDUCE_JOB_ENTITY_TYPE,
+            jobId, setCreatedTime, taskIdPrefix);
+      } else {
+        // TaskAttemptEntity
+        tEntity = createTaskAttemptEntity(event, timestamp, taskAttemptId,
+            MAPREDUCE_TASK_ATTEMPT_ENTITY_TYPE, MAPREDUCE_TASK_ENTITY_TYPE,
+            taskId, setCreatedTime, taskAttemptIdPrefix);
+      }
+    }
+    try {
+      if (appEntityWithJobMetrics == null) {
+        timelineV2Client.putEntitiesAsync(tEntity);
+      } else {
+        timelineV2Client.putEntities(tEntity, appEntityWithJobMetrics);
+      }
+    } catch (IOException | YarnException e) {
+      LOG.error("Failed to process Event " + event.getEventType()
+          + " for the job : " + jobId, e);
+      return;
+    }
+    if (event.getEventType() == EventType.JOB_SUBMITTED) {
+      // Publish configs after main job submitted event has been posted.
+      publishConfigsOnJobSubmittedEvent((JobSubmittedEvent)event, jobId);
     }
-    return nodes;
   }

   private void setSummarySlotSeconds(JobSummary summary, Counters allCounters) {