|
@@ -25,7 +25,6 @@ import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
|
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
|
|
-import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
@@ -39,93 +38,100 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity.Identif
|
|
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
|
|
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
+import org.apache.hadoop.yarn.event.EventHandler;
|
|
|
import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
|
|
|
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
|
|
|
import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
|
|
|
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector;
|
|
|
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
|
|
|
|
|
|
+import com.google.common.annotations.VisibleForTesting;
|
|
|
+
|
|
|
/**
|
|
|
* This class is responsible for posting application, appattempt & Container
|
|
|
* lifecycle related events to timeline service V2
|
|
|
*/
|
|
|
@Private
|
|
|
@Unstable
|
|
|
-public class TimelineServiceV2Publisher extends
|
|
|
- AbstractTimelineServicePublisher {
|
|
|
- private static final Log LOG = LogFactory
|
|
|
- .getLog(TimelineServiceV2Publisher.class);
|
|
|
+public class TimelineServiceV2Publisher extends AbstractSystemMetricsPublisher {
|
|
|
+ private static final Log LOG =
|
|
|
+ LogFactory.getLog(TimelineServiceV2Publisher.class);
|
|
|
protected RMTimelineCollectorManager rmTimelineCollectorManager;
|
|
|
+ private boolean publishContainerMetrics;
|
|
|
|
|
|
public TimelineServiceV2Publisher(RMContext rmContext) {
|
|
|
super("TimelineserviceV2Publisher");
|
|
|
rmTimelineCollectorManager = rmContext.getRMTimelineCollectorManager();
|
|
|
}
|
|
|
|
|
|
- private boolean publishContainerMetrics;
|
|
|
-
|
|
|
@Override
|
|
|
- protected void serviceInit(Configuration conf) throws Exception {
|
|
|
- publishContainerMetrics =
|
|
|
- conf.getBoolean(YarnConfiguration.RM_PUBLISH_CONTAINER_METRICS_ENABLED,
|
|
|
- YarnConfiguration.DEFAULT_RM_PUBLISH_CONTAINER_METRICS_ENABLED);
|
|
|
- super.serviceInit(conf);
|
|
|
+ protected void serviceStart() throws Exception {
|
|
|
+ super.serviceStart();
|
|
|
+ publishContainerMetrics = getConfig().getBoolean(
|
|
|
+ YarnConfiguration.RM_PUBLISH_CONTAINER_METRICS_ENABLED,
|
|
|
+ YarnConfiguration.DEFAULT_RM_PUBLISH_CONTAINER_METRICS_ENABLED);
|
|
|
+ getDispatcher().register(SystemMetricsEventType.class,
|
|
|
+ new TimelineV2EventHandler());
|
|
|
+ }
|
|
|
+
|
|
|
+ @VisibleForTesting
|
|
|
+ boolean isPublishContainerMetrics() {
|
|
|
+ return publishContainerMetrics;
|
|
|
}
|
|
|
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
@Override
|
|
|
- void publishApplicationCreatedEvent(ApplicationCreatedEvent event) {
|
|
|
- TimelineEntity entity =
|
|
|
- createApplicationEntity(event.getApplicationId());
|
|
|
+ public void appCreated(RMApp app, long createdTime) {
|
|
|
+ ApplicationEntity entity = createApplicationEntity(app.getApplicationId());
|
|
|
+ entity.setQueue(app.getQueue());
|
|
|
+ entity.setCreatedTime(createdTime);
|
|
|
+
|
|
|
Map<String, Object> entityInfo = new HashMap<String, Object>();
|
|
|
- entityInfo.put(ApplicationMetricsConstants.NAME_ENTITY_INFO,
|
|
|
- event.getApplicationName());
|
|
|
+ entityInfo.put(ApplicationMetricsConstants.NAME_ENTITY_INFO, app.getName());
|
|
|
entityInfo.put(ApplicationMetricsConstants.TYPE_ENTITY_INFO,
|
|
|
- event.getApplicationType());
|
|
|
- entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO,
|
|
|
- event.getUser());
|
|
|
- entityInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO,
|
|
|
- event.getQueue());
|
|
|
+ app.getApplicationType());
|
|
|
+ entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO, app.getUser());
|
|
|
entityInfo.put(ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO,
|
|
|
- event.getSubmittedTime());
|
|
|
+ app.getSubmitTime());
|
|
|
entityInfo.put(ApplicationMetricsConstants.APP_TAGS_INFO,
|
|
|
- event.getAppTags());
|
|
|
+ app.getApplicationTags());
|
|
|
entityInfo.put(
|
|
|
ApplicationMetricsConstants.UNMANAGED_APPLICATION_ENTITY_INFO,
|
|
|
- event.isUnmanagedApp());
|
|
|
+ app.getApplicationSubmissionContext().getUnmanagedAM());
|
|
|
entityInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO,
|
|
|
- event.getApplicationPriority().getPriority());
|
|
|
- entityInfo.put(ApplicationMetricsConstants.APP_NODE_LABEL_EXPRESSION,
|
|
|
- event.getAppNodeLabelsExpression());
|
|
|
- entityInfo.put(ApplicationMetricsConstants.AM_NODE_LABEL_EXPRESSION,
|
|
|
- event.getAmNodeLabelsExpression());
|
|
|
- if (event.getCallerContext() != null) {
|
|
|
- if (event.getCallerContext().getContext() != null) {
|
|
|
- entityInfo.put(ApplicationMetricsConstants.YARN_APP_CALLER_CONTEXT,
|
|
|
- event.getCallerContext().getContext());
|
|
|
- }
|
|
|
- if (event.getCallerContext().getSignature() != null) {
|
|
|
- entityInfo.put(ApplicationMetricsConstants.YARN_APP_CALLER_SIGNATURE,
|
|
|
- event.getCallerContext().getSignature());
|
|
|
- }
|
|
|
- }
|
|
|
+ app.getApplicationSubmissionContext().getPriority().getPriority());
|
|
|
+ entity.getConfigs().put(
|
|
|
+ ApplicationMetricsConstants.AM_NODE_LABEL_EXPRESSION,
|
|
|
+ app.getAmNodeLabelExpression());
|
|
|
+ entity.getConfigs().put(
|
|
|
+ ApplicationMetricsConstants.APP_NODE_LABEL_EXPRESSION,
|
|
|
+ app.getAppNodeLabelExpression());
|
|
|
entity.setInfo(entityInfo);
|
|
|
|
|
|
TimelineEvent tEvent = new TimelineEvent();
|
|
|
tEvent.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
|
|
|
- tEvent.setTimestamp(event.getTimestamp());
|
|
|
+ tEvent.setTimestamp(createdTime);
|
|
|
entity.addEvent(tEvent);
|
|
|
|
|
|
- putEntity(entity, event.getApplicationId());
|
|
|
+ getDispatcher().getEventHandler().handle(new TimelineV2PublishEvent(
|
|
|
+ SystemMetricsEventType.PUBLISH_ENTITY, entity, app.getApplicationId()));
|
|
|
}
|
|
|
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
@Override
|
|
|
- void publishApplicationFinishedEvent(ApplicationFinishedEvent event) {
|
|
|
- ApplicationEntity entity =
|
|
|
- createApplicationEntity(event.getApplicationId());
|
|
|
- RMAppMetrics appMetrics = event.getAppMetrics();
|
|
|
+ public void appFinished(RMApp app, RMAppState state, long finishedTime) {
|
|
|
+ ApplicationEntity entity = createApplicationEntity(app.getApplicationId());
|
|
|
+ RMAppMetrics appMetrics = app.getRMAppMetrics();
|
|
|
entity.addInfo(ApplicationMetricsConstants.APP_CPU_METRICS,
|
|
|
appMetrics.getVcoreSeconds());
|
|
|
entity.addInfo(ApplicationMetricsConstants.APP_MEM_METRICS,
|
|
@@ -133,54 +139,57 @@ public class TimelineServiceV2Publisher extends
|
|
|
|
|
|
TimelineEvent tEvent = new TimelineEvent();
|
|
|
tEvent.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
|
|
|
- tEvent.setTimestamp(event.getTimestamp());
|
|
|
+ tEvent.setTimestamp(finishedTime);
|
|
|
Map<String, Object> eventInfo = new HashMap<String, Object>();
|
|
|
eventInfo.put(ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
|
|
|
- event.getDiagnosticsInfo());
|
|
|
- eventInfo.put(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO, event
|
|
|
- .getFinalApplicationStatus().toString());
|
|
|
- eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO, event
|
|
|
- .getYarnApplicationState().toString());
|
|
|
- if (event.getLatestApplicationAttemptId() != null) {
|
|
|
+ app.getDiagnostics().toString());
|
|
|
+ eventInfo.put(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO,
|
|
|
+ app.getFinalApplicationStatus().toString());
|
|
|
+ eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO,
|
|
|
+ RMServerUtils.createApplicationState(state).toString());
|
|
|
+ ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() == null
|
|
|
+ ? null : app.getCurrentAppAttempt().getAppAttemptId();
|
|
|
+ if (appAttemptId != null) {
|
|
|
eventInfo.put(ApplicationMetricsConstants.LATEST_APP_ATTEMPT_EVENT_INFO,
|
|
|
- event.getLatestApplicationAttemptId().toString());
|
|
|
+ appAttemptId.toString());
|
|
|
}
|
|
|
tEvent.setInfo(eventInfo);
|
|
|
|
|
|
entity.addEvent(tEvent);
|
|
|
- putEntity(entity, event.getApplicationId());
|
|
|
|
|
|
- //cleaning up the collector cached
|
|
|
- event.getApp().stopTimelineCollector();
|
|
|
+ getDispatcher().getEventHandler().handle(new TimelineV2PublishEvent(
|
|
|
+ SystemMetricsEventType.PUBLISH_ENTITY, entity, app.getApplicationId()));
|
|
|
+ }
|
|
|
+
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ @Override
|
|
|
+ public void appACLsUpdated(RMApp app, String appViewACLs, long updatedTime) {
|
|
|
+ ApplicationEntity entity = createApplicationEntity(app.getApplicationId());
|
|
|
+ Map<String, Object> entityInfo = new HashMap<String, Object>();
|
|
|
+ entityInfo.put(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO,
|
|
|
+ appViewACLs);
|
|
|
+ entity.setInfo(entityInfo);
|
|
|
+
|
|
|
+ getDispatcher().getEventHandler().handle(new TimelineV2PublishEvent(
|
|
|
+ SystemMetricsEventType.PUBLISH_ENTITY, entity, app.getApplicationId()));
|
|
|
}
|
|
|
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
@Override
|
|
|
- void publishApplicationUpdatedEvent(ApplicationUpdatedEvent event) {
|
|
|
- ApplicationEntity entity =
|
|
|
- createApplicationEntity(event.getApplicationId());
|
|
|
+ public void appUpdated(RMApp app, long currentTimeMillis) {
|
|
|
+ ApplicationEntity entity = createApplicationEntity(app.getApplicationId());
|
|
|
Map<String, Object> eventInfo = new HashMap<String, Object>();
|
|
|
eventInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO,
|
|
|
- event.getQueue());
|
|
|
- eventInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO, event
|
|
|
- .getApplicationPriority().getPriority());
|
|
|
+ app.getQueue());
|
|
|
+ eventInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO,
|
|
|
+ app.getApplicationSubmissionContext().getPriority().getPriority());
|
|
|
TimelineEvent tEvent = new TimelineEvent();
|
|
|
tEvent.setId(ApplicationMetricsConstants.UPDATED_EVENT_TYPE);
|
|
|
- tEvent.setTimestamp(event.getTimestamp());
|
|
|
+ tEvent.setTimestamp(currentTimeMillis);
|
|
|
tEvent.setInfo(eventInfo);
|
|
|
entity.addEvent(tEvent);
|
|
|
- putEntity(entity, event.getApplicationId());
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- void publishApplicationACLsUpdatedEvent(ApplicationACLsUpdatedEvent event) {
|
|
|
- ApplicationEntity entity =
|
|
|
- createApplicationEntity(event.getApplicationId());
|
|
|
- Map<String, Object> entityInfo = new HashMap<String, Object>();
|
|
|
- entityInfo.put(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO,
|
|
|
- event.getViewAppACLs());
|
|
|
- entity.setInfo(entityInfo);
|
|
|
-
|
|
|
- putEntity(entity, event.getApplicationId());
|
|
|
+ getDispatcher().getEventHandler().handle(new TimelineV2PublishEvent(
|
|
|
+ SystemMetricsEventType.PUBLISH_ENTITY, entity, app.getApplicationId()));
|
|
|
}
|
|
|
|
|
|
private static ApplicationEntity createApplicationEntity(
|
|
@@ -190,111 +199,134 @@ public class TimelineServiceV2Publisher extends
|
|
|
return entity;
|
|
|
}
|
|
|
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
@Override
|
|
|
- void publishAppAttemptRegisteredEvent(AppAttemptRegisteredEvent event) {
|
|
|
+ public void appAttemptRegistered(RMAppAttempt appAttempt,
|
|
|
+ long registeredTime) {
|
|
|
TimelineEntity entity =
|
|
|
- createAppAttemptEntity(event.getApplicationAttemptId());
|
|
|
+ createAppAttemptEntity(appAttempt.getAppAttemptId());
|
|
|
+ entity.setCreatedTime(registeredTime);
|
|
|
+
|
|
|
TimelineEvent tEvent = new TimelineEvent();
|
|
|
tEvent.setId(AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE);
|
|
|
- tEvent.setTimestamp(event.getTimestamp());
|
|
|
+ tEvent.setTimestamp(registeredTime);
|
|
|
Map<String, Object> eventInfo = new HashMap<String, Object>();
|
|
|
- eventInfo.put(
|
|
|
- AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
|
|
|
- event.getTrackingUrl());
|
|
|
- eventInfo.put(
|
|
|
- AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
|
|
|
- event.getOriginalTrackingURL());
|
|
|
+ eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
|
|
|
+ appAttempt.getTrackingUrl());
|
|
|
+ eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
|
|
|
+ appAttempt.getOriginalTrackingUrl());
|
|
|
eventInfo.put(AppAttemptMetricsConstants.HOST_EVENT_INFO,
|
|
|
- event.getHost());
|
|
|
+ appAttempt.getHost());
|
|
|
eventInfo.put(AppAttemptMetricsConstants.RPC_PORT_EVENT_INFO,
|
|
|
- event.getRpcPort());
|
|
|
- if (event.getMasterContainerId() != null) {
|
|
|
- eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO,
|
|
|
- event.getMasterContainerId().toString());
|
|
|
- }
|
|
|
+ appAttempt.getRpcPort());
|
|
|
+ eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO,
|
|
|
+ appAttempt.getMasterContainer().getId().toString());
|
|
|
tEvent.setInfo(eventInfo);
|
|
|
entity.addEvent(tEvent);
|
|
|
- putEntity(entity, event.getApplicationAttemptId().getApplicationId());
|
|
|
+ getDispatcher().getEventHandler().handle(
|
|
|
+ new TimelineV2PublishEvent(SystemMetricsEventType.PUBLISH_ENTITY,
|
|
|
+ entity, appAttempt.getAppAttemptId().getApplicationId()));
|
|
|
}
|
|
|
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
@Override
|
|
|
- void publishAppAttemptFinishedEvent(AppAttemptFinishedEvent event) {
|
|
|
+ public void appAttemptFinished(RMAppAttempt appAttempt,
|
|
|
+ RMAppAttemptState appAttemtpState, RMApp app, long finishedTime) {
|
|
|
+
|
|
|
ApplicationAttemptEntity entity =
|
|
|
- createAppAttemptEntity(event.getApplicationAttemptId());
|
|
|
+ createAppAttemptEntity(appAttempt.getAppAttemptId());
|
|
|
|
|
|
TimelineEvent tEvent = new TimelineEvent();
|
|
|
tEvent.setId(AppAttemptMetricsConstants.FINISHED_EVENT_TYPE);
|
|
|
- tEvent.setTimestamp(event.getTimestamp());
|
|
|
+ tEvent.setTimestamp(finishedTime);
|
|
|
Map<String, Object> eventInfo = new HashMap<String, Object>();
|
|
|
eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
|
|
|
- event.getTrackingUrl());
|
|
|
+ appAttempt.getTrackingUrl());
|
|
|
eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
|
|
|
- event.getOriginalTrackingURL());
|
|
|
+ appAttempt.getOriginalTrackingUrl());
|
|
|
eventInfo.put(AppAttemptMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
|
|
|
- event.getDiagnosticsInfo());
|
|
|
- eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO, event
|
|
|
- .getFinalApplicationStatus().toString());
|
|
|
- eventInfo.put(AppAttemptMetricsConstants.STATE_EVENT_INFO, event
|
|
|
- .getYarnApplicationAttemptState().toString());
|
|
|
- if (event.getMasterContainerId() != null) {
|
|
|
- eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO,
|
|
|
- event.getMasterContainerId().toString());
|
|
|
- }
|
|
|
+ appAttempt.getDiagnostics());
|
|
|
+ // app will get the final status from app attempt, or create one
|
|
|
+ // based on app state if it doesn't exist
|
|
|
+ eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO,
|
|
|
+ app.getFinalApplicationStatus().toString());
|
|
|
+ eventInfo.put(AppAttemptMetricsConstants.STATE_EVENT_INFO, RMServerUtils
|
|
|
+ .createApplicationAttemptState(appAttemtpState).toString());
|
|
|
tEvent.setInfo(eventInfo);
|
|
|
|
|
|
entity.addEvent(tEvent);
|
|
|
- putEntity(entity, event.getApplicationAttemptId().getApplicationId());
|
|
|
+ getDispatcher().getEventHandler().handle(
|
|
|
+ new TimelineV2PublishEvent(SystemMetricsEventType.PUBLISH_ENTITY,
|
|
|
+ entity, appAttempt.getAppAttemptId().getApplicationId()));
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- void publishContainerCreatedEvent(ContainerCreatedEvent event) {
|
|
|
- TimelineEntity entity = createContainerEntity(event.getContainerId());
|
|
|
-
|
|
|
- TimelineEvent tEvent = new TimelineEvent();
|
|
|
- tEvent.setId(ContainerMetricsConstants.CREATED_IN_RM_EVENT_TYPE);
|
|
|
- tEvent.setTimestamp(event.getTimestamp());
|
|
|
- // updated as event info instead of entity info, as entity info is updated
|
|
|
- // by NM
|
|
|
- Map<String, Object> eventInfo = new HashMap<String, Object>();
|
|
|
- eventInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO, event
|
|
|
- .getAllocatedResource().getMemory());
|
|
|
- eventInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO, event
|
|
|
- .getAllocatedResource().getVirtualCores());
|
|
|
- eventInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO, event
|
|
|
- .getAllocatedNode().getHost());
|
|
|
- eventInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO, event
|
|
|
- .getAllocatedNode().getPort());
|
|
|
- eventInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO,
|
|
|
- event.getAllocatedPriority().getPriority());
|
|
|
- eventInfo.put(
|
|
|
- ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO,
|
|
|
- event.getNodeHttpAddress());
|
|
|
- tEvent.setInfo(eventInfo);
|
|
|
-
|
|
|
- entity.addEvent(tEvent);
|
|
|
- putEntity(entity, event.getContainerId().getApplicationAttemptId()
|
|
|
- .getApplicationId());
|
|
|
+ private static ApplicationAttemptEntity createAppAttemptEntity(
|
|
|
+ ApplicationAttemptId appAttemptId) {
|
|
|
+ ApplicationAttemptEntity entity = new ApplicationAttemptEntity();
|
|
|
+ entity.setId(appAttemptId.toString());
|
|
|
+ entity.setParent(new Identifier(TimelineEntityType.YARN_APPLICATION.name(),
|
|
|
+ appAttemptId.getApplicationId().toString()));
|
|
|
+ return entity;
|
|
|
}
|
|
|
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
@Override
|
|
|
- void publishContainerFinishedEvent(ContainerFinishedEvent event) {
|
|
|
- TimelineEntity entity = createContainerEntity(event.getContainerId());
|
|
|
-
|
|
|
- TimelineEvent tEvent = new TimelineEvent();
|
|
|
- tEvent.setId(ContainerMetricsConstants.FINISHED_IN_RM_EVENT_TYPE);
|
|
|
- tEvent.setTimestamp(event.getTimestamp());
|
|
|
- Map<String, Object> eventInfo = new HashMap<String, Object>();
|
|
|
- eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
|
|
|
- event.getDiagnosticsInfo());
|
|
|
- eventInfo.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO,
|
|
|
- event.getContainerExitStatus());
|
|
|
- eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO, event
|
|
|
- .getContainerState().toString());
|
|
|
- tEvent.setInfo(eventInfo);
|
|
|
+ public void containerCreated(RMContainer container, long createdTime) {
|
|
|
+ if (publishContainerMetrics) {
|
|
|
+ TimelineEntity entity = createContainerEntity(container.getContainerId());
|
|
|
+ entity.setCreatedTime(createdTime);
|
|
|
+
|
|
|
+ TimelineEvent tEvent = new TimelineEvent();
|
|
|
+ tEvent.setId(ContainerMetricsConstants.CREATED_IN_RM_EVENT_TYPE);
|
|
|
+ tEvent.setTimestamp(createdTime);
|
|
|
+ // updated as event info instead of entity info, as entity info is updated
|
|
|
+ // by NM
|
|
|
+ Map<String, Object> eventInfo = new HashMap<String, Object>();
|
|
|
+ eventInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO,
|
|
|
+ container.getAllocatedResource().getMemory());
|
|
|
+ eventInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO,
|
|
|
+ container.getAllocatedResource().getVirtualCores());
|
|
|
+ eventInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO,
|
|
|
+ container.getAllocatedNode().getHost());
|
|
|
+ eventInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO,
|
|
|
+ container.getAllocatedNode().getPort());
|
|
|
+ eventInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO,
|
|
|
+ container.getAllocatedPriority().getPriority());
|
|
|
+ eventInfo.put(
|
|
|
+ ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO,
|
|
|
+ container.getNodeHttpAddress());
|
|
|
+ tEvent.setInfo(eventInfo);
|
|
|
+
|
|
|
+ entity.addEvent(tEvent);
|
|
|
+ getDispatcher().getEventHandler().handle(new TimelineV2PublishEvent(
|
|
|
+ SystemMetricsEventType.PUBLISH_ENTITY, entity, container
|
|
|
+ .getContainerId().getApplicationAttemptId().getApplicationId()));
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- entity.addEvent(tEvent);
|
|
|
- putEntity(entity, event.getContainerId().getApplicationAttemptId()
|
|
|
- .getApplicationId());
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ @Override
|
|
|
+ public void containerFinished(RMContainer container, long finishedTime) {
|
|
|
+ if (publishContainerMetrics) {
|
|
|
+ TimelineEntity entity = createContainerEntity(container.getContainerId());
|
|
|
+
|
|
|
+ TimelineEvent tEvent = new TimelineEvent();
|
|
|
+ tEvent.setId(ContainerMetricsConstants.FINISHED_IN_RM_EVENT_TYPE);
|
|
|
+ tEvent.setTimestamp(finishedTime);
|
|
|
+ Map<String, Object> eventInfo = new HashMap<String, Object>();
|
|
|
+ eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
|
|
|
+ container.getDiagnosticsInfo());
|
|
|
+ eventInfo.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO,
|
|
|
+ container.getContainerExitStatus());
|
|
|
+ eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO,
|
|
|
+ container.getContainerState().toString());
|
|
|
+ tEvent.setInfo(eventInfo);
|
|
|
+
|
|
|
+ entity.addEvent(tEvent);
|
|
|
+ getDispatcher().getEventHandler().handle(new TimelineV2PublishEvent(
|
|
|
+ SystemMetricsEventType.PUBLISH_ENTITY, entity, container
|
|
|
+ .getContainerId().getApplicationAttemptId().getApplicationId()));
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private static ContainerEntity createContainerEntity(ContainerId containerId) {
|
|
@@ -322,17 +354,48 @@ public class TimelineServiceV2Publisher extends
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private static ApplicationAttemptEntity createAppAttemptEntity(
|
|
|
- ApplicationAttemptId appAttemptId) {
|
|
|
- ApplicationAttemptEntity entity = new ApplicationAttemptEntity();
|
|
|
- entity.setId(appAttemptId.toString());
|
|
|
- entity.setParent(new Identifier(TimelineEntityType.YARN_APPLICATION.name(),
|
|
|
- appAttemptId.getApplicationId().toString()));
|
|
|
- return entity;
|
|
|
+ private class ApplicationFinishPublishEvent extends TimelineV2PublishEvent {
|
|
|
+ private RMAppImpl app;
|
|
|
+
|
|
|
+ public ApplicationFinishPublishEvent(SystemMetricsEventType type,
|
|
|
+ TimelineEntity entity, RMAppImpl app) {
|
|
|
+ super(type, entity, app.getApplicationId());
|
|
|
+ this.app = app;
|
|
|
+ }
|
|
|
+
|
|
|
+ public RMAppImpl getRMAppImpl() {
|
|
|
+ return app;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- public boolean publishRMContainerMetrics() {
|
|
|
- return publishContainerMetrics;
|
|
|
+ private class TimelineV2EventHandler
|
|
|
+ implements EventHandler<TimelineV2PublishEvent> {
|
|
|
+ @Override
|
|
|
+ public void handle(TimelineV2PublishEvent event) {
|
|
|
+ switch (event.getType()) {
|
|
|
+ case PUBLISH_APPLICATION_FINISHED_ENTITY:
|
|
|
+ putEntity(event.getEntity(), event.getApplicationId());
|
|
|
+ ((ApplicationFinishPublishEvent) event).getRMAppImpl()
|
|
|
+ .stopTimelineCollector();
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ putEntity(event.getEntity(), event.getApplicationId());
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private class TimelineV2PublishEvent extends TimelinePublishEvent {
|
|
|
+ private TimelineEntity entity;
|
|
|
+
|
|
|
+ public TimelineV2PublishEvent(SystemMetricsEventType type,
|
|
|
+ TimelineEntity entity, ApplicationId appId) {
|
|
|
+ super(type, appId);
|
|
|
+ this.entity = entity;
|
|
|
+ }
|
|
|
+
|
|
|
+ public TimelineEntity getEntity() {
|
|
|
+ return entity;
|
|
|
+ }
|
|
|
}
|
|
|
}
|