|
@@ -18,10 +18,6 @@
|
|
|
|
|
|
package org.apache.hadoop.yarn.server.resourcemanager.metrics;
|
|
|
|
|
|
-import java.util.ArrayList;
|
|
|
-import java.util.HashMap;
|
|
|
-import java.util.List;
|
|
|
-import java.util.Map;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
@@ -29,33 +25,25 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
|
|
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.service.CompositeService;
|
|
|
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
|
-import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
|
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
|
|
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
|
|
|
-import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
|
|
|
-import org.apache.hadoop.yarn.client.api.TimelineClient;
|
|
|
+import org.apache.hadoop.service.Service;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
|
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
|
|
-import org.apache.hadoop.yarn.event.Event;
|
|
|
import org.apache.hadoop.yarn.event.EventHandler;
|
|
|
-import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
|
|
|
-import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
|
|
|
-import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
|
|
-import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
|
|
|
+
|
|
|
+import com.google.common.annotations.VisibleForTesting;
|
|
|
|
|
|
/**
|
|
|
- * The class that helps RM publish metrics to the timeline server V1. RM will
|
|
|
+ * The class that helps RM publish metrics to the timeline server. RM will
|
|
|
* always invoke the methods of this class regardless the service is enabled or
|
|
|
* not. If it is disabled, publishing requests will be ignored silently.
|
|
|
*/
|
|
@@ -67,30 +55,38 @@ public class SystemMetricsPublisher extends CompositeService {
|
|
|
.getLog(SystemMetricsPublisher.class);
|
|
|
|
|
|
private Dispatcher dispatcher;
|
|
|
- private TimelineClient client;
|
|
|
- private boolean publishSystemMetricsToATSv1;
|
|
|
+ private boolean publishSystemMetrics;
|
|
|
+ private boolean publishContainerMetrics;
|
|
|
+ protected RMContext rmContext;
|
|
|
|
|
|
- public SystemMetricsPublisher() {
|
|
|
+ public SystemMetricsPublisher(RMContext rmContext) {
|
|
|
super(SystemMetricsPublisher.class.getName());
|
|
|
+ this.rmContext = rmContext;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
protected void serviceInit(Configuration conf) throws Exception {
|
|
|
- publishSystemMetricsToATSv1 =
|
|
|
+ publishSystemMetrics =
|
|
|
conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
|
|
|
- YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)
|
|
|
- && conf.getBoolean(
|
|
|
- YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED,
|
|
|
- YarnConfiguration.DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED);
|
|
|
-
|
|
|
- if (publishSystemMetricsToATSv1) {
|
|
|
- client = TimelineClient.createTimelineClient();
|
|
|
- addIfService(client);
|
|
|
-
|
|
|
- dispatcher = createDispatcher(conf);
|
|
|
- dispatcher.register(SystemMetricsEventType.class,
|
|
|
- new ForwardingEventHandler());
|
|
|
- addIfService(dispatcher);
|
|
|
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED);
|
|
|
+ if (publishSystemMetrics) {
|
|
|
+ TimelineServicePublisher timelineServicePublisher =
|
|
|
+ getTimelineServicePublisher(conf);
|
|
|
+ if (timelineServicePublisher != null) {
|
|
|
+ addService(timelineServicePublisher);
|
|
|
+ // init required to be called so that other methods of
|
|
|
+ // TimelineServicePublisher can be utilized
|
|
|
+ timelineServicePublisher.init(conf);
|
|
|
+ dispatcher = createDispatcher(timelineServicePublisher);
|
|
|
+ publishContainerMetrics =
|
|
|
+ timelineServicePublisher.publishRMContainerMetrics();
|
|
|
+ dispatcher.register(SystemMetricsEventType.class,
|
|
|
+ timelineServicePublisher.getEventHandler());
|
|
|
+ addIfService(dispatcher);
|
|
|
+ } else {
|
|
|
+ LOG.info("TimelineServicePublisher is not configured");
|
|
|
+ publishSystemMetrics = false;
|
|
|
+ }
|
|
|
LOG.info("YARN system metrics publishing service is enabled");
|
|
|
} else {
|
|
|
LOG.info("YARN system metrics publishing service is not enabled");
|
|
@@ -98,9 +94,26 @@ public class SystemMetricsPublisher extends CompositeService {
|
|
|
super.serviceInit(conf);
|
|
|
}
|
|
|
|
|
|
+ @VisibleForTesting
|
|
|
+ Dispatcher createDispatcher(TimelineServicePublisher timelineServicePublisher) {
|
|
|
+ return timelineServicePublisher.getDispatcher();
|
|
|
+ }
|
|
|
+
|
|
|
+ TimelineServicePublisher getTimelineServicePublisher(Configuration conf) {
|
|
|
+ if (conf.getBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED,
|
|
|
+ YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_ENABLED)) {
|
|
|
+ return new TimelineServiceV1Publisher();
|
|
|
+ } else if (conf.getBoolean(
|
|
|
+ YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED,
|
|
|
+ YarnConfiguration.DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED)) {
|
|
|
+ return new TimelineServiceV2Publisher(rmContext);
|
|
|
+ }
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
@SuppressWarnings("unchecked")
|
|
|
public void appCreated(RMApp app, long createdTime) {
|
|
|
- if (publishSystemMetricsToATSv1) {
|
|
|
+ if (publishSystemMetrics) {
|
|
|
ApplicationSubmissionContext appSubmissionContext =
|
|
|
app.getApplicationSubmissionContext();
|
|
|
dispatcher.getEventHandler().handle(
|
|
@@ -122,7 +135,7 @@ public class SystemMetricsPublisher extends CompositeService {
|
|
|
|
|
|
@SuppressWarnings("unchecked")
|
|
|
public void appUpdated(RMApp app, long updatedTime) {
|
|
|
- if (publishSystemMetricsToATSv1) {
|
|
|
+ if (publishSystemMetrics) {
|
|
|
dispatcher.getEventHandler()
|
|
|
.handle(new ApplicationUpdatedEvent(app.getApplicationId(),
|
|
|
app.getQueue(), updatedTime,
|
|
@@ -132,7 +145,7 @@ public class SystemMetricsPublisher extends CompositeService {
|
|
|
|
|
|
@SuppressWarnings("unchecked")
|
|
|
public void appFinished(RMApp app, RMAppState state, long finishedTime) {
|
|
|
- if (publishSystemMetricsToATSv1) {
|
|
|
+ if (publishSystemMetrics) {
|
|
|
dispatcher.getEventHandler().handle(
|
|
|
new ApplicationFinishedEvent(
|
|
|
app.getApplicationId(),
|
|
@@ -142,14 +155,15 @@ public class SystemMetricsPublisher extends CompositeService {
|
|
|
app.getCurrentAppAttempt() == null ?
|
|
|
null : app.getCurrentAppAttempt().getAppAttemptId(),
|
|
|
finishedTime,
|
|
|
- app.getRMAppMetrics()));
|
|
|
+ app.getRMAppMetrics(),
|
|
|
+ (RMAppImpl)app));
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@SuppressWarnings("unchecked")
|
|
|
public void appACLsUpdated(RMApp app, String appViewACLs,
|
|
|
long updatedTime) {
|
|
|
- if (publishSystemMetricsToATSv1) {
|
|
|
+ if (publishSystemMetrics) {
|
|
|
dispatcher.getEventHandler().handle(
|
|
|
new ApplicationACLsUpdatedEvent(
|
|
|
app.getApplicationId(),
|
|
@@ -161,7 +175,7 @@ public class SystemMetricsPublisher extends CompositeService {
|
|
|
@SuppressWarnings("unchecked")
|
|
|
public void appAttemptRegistered(RMAppAttempt appAttempt,
|
|
|
long registeredTime) {
|
|
|
- if (publishSystemMetricsToATSv1) {
|
|
|
+ if (publishSystemMetrics) {
|
|
|
ContainerId container = (appAttempt.getMasterContainer() == null) ? null
|
|
|
: appAttempt.getMasterContainer().getId();
|
|
|
dispatcher.getEventHandler().handle(
|
|
@@ -179,7 +193,7 @@ public class SystemMetricsPublisher extends CompositeService {
|
|
|
@SuppressWarnings("unchecked")
|
|
|
public void appAttemptFinished(RMAppAttempt appAttempt,
|
|
|
RMAppAttemptState appAttemtpState, RMApp app, long finishedTime) {
|
|
|
- if (publishSystemMetricsToATSv1) {
|
|
|
+ if (publishSystemMetrics) {
|
|
|
ContainerId container = (appAttempt.getMasterContainer() == null) ? null
|
|
|
: appAttempt.getMasterContainer().getId();
|
|
|
dispatcher.getEventHandler().handle(
|
|
@@ -199,7 +213,7 @@ public class SystemMetricsPublisher extends CompositeService {
|
|
|
|
|
|
@SuppressWarnings("unchecked")
|
|
|
public void containerCreated(RMContainer container, long createdTime) {
|
|
|
- if (publishSystemMetricsToATSv1) {
|
|
|
+ if (publishContainerMetrics) {
|
|
|
dispatcher.getEventHandler().handle(
|
|
|
new ContainerCreatedEvent(
|
|
|
container.getContainerId(),
|
|
@@ -212,7 +226,7 @@ public class SystemMetricsPublisher extends CompositeService {
|
|
|
|
|
|
@SuppressWarnings("unchecked")
|
|
|
public void containerFinished(RMContainer container, long finishedTime) {
|
|
|
- if (publishSystemMetricsToATSv1) {
|
|
|
+ if (publishContainerMetrics) {
|
|
|
dispatcher.getEventHandler().handle(
|
|
|
new ContainerFinishedEvent(
|
|
|
container.getContainerId(),
|
|
@@ -223,370 +237,31 @@ public class SystemMetricsPublisher extends CompositeService {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- protected Dispatcher createDispatcher(Configuration conf) {
|
|
|
- MultiThreadedDispatcher dispatcher =
|
|
|
- new MultiThreadedDispatcher(
|
|
|
- conf.getInt(
|
|
|
- YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE,
|
|
|
- YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE));
|
|
|
- dispatcher.setDrainEventsOnStop();
|
|
|
- return dispatcher;
|
|
|
- }
|
|
|
-
|
|
|
- protected void handleSystemMetricsEvent(
|
|
|
- SystemMetricsEvent event) {
|
|
|
- switch (event.getType()) {
|
|
|
- case APP_CREATED:
|
|
|
- publishApplicationCreatedEvent((ApplicationCreatedEvent) event);
|
|
|
- break;
|
|
|
- case APP_FINISHED:
|
|
|
- publishApplicationFinishedEvent((ApplicationFinishedEvent) event);
|
|
|
- break;
|
|
|
- case APP_ACLS_UPDATED:
|
|
|
- publishApplicationACLsUpdatedEvent((ApplicationACLsUpdatedEvent) event);
|
|
|
- break;
|
|
|
- case APP_UPDATED:
|
|
|
- publishApplicationUpdatedEvent((ApplicationUpdatedEvent) event);
|
|
|
- break;
|
|
|
- case APP_ATTEMPT_REGISTERED:
|
|
|
- publishAppAttemptRegisteredEvent((AppAttemptRegisteredEvent) event);
|
|
|
- break;
|
|
|
- case APP_ATTEMPT_FINISHED:
|
|
|
- publishAppAttemptFinishedEvent((AppAttemptFinishedEvent) event);
|
|
|
- break;
|
|
|
- case CONTAINER_CREATED:
|
|
|
- publishContainerCreatedEvent((ContainerCreatedEvent) event);
|
|
|
- break;
|
|
|
- case CONTAINER_FINISHED:
|
|
|
- publishContainerFinishedEvent((ContainerFinishedEvent) event);
|
|
|
- break;
|
|
|
- default:
|
|
|
- LOG.error("Unknown SystemMetricsEvent type: " + event.getType());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private void publishApplicationCreatedEvent(ApplicationCreatedEvent event) {
|
|
|
- TimelineEntity entity =
|
|
|
- createApplicationEntity(event.getApplicationId());
|
|
|
- Map<String, Object> entityInfo = new HashMap<String, Object>();
|
|
|
- entityInfo.put(ApplicationMetricsConstants.NAME_ENTITY_INFO,
|
|
|
- event.getApplicationName());
|
|
|
- entityInfo.put(ApplicationMetricsConstants.TYPE_ENTITY_INFO,
|
|
|
- event.getApplicationType());
|
|
|
- entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO,
|
|
|
- event.getUser());
|
|
|
- entityInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO,
|
|
|
- event.getQueue());
|
|
|
- entityInfo.put(ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO,
|
|
|
- event.getSubmittedTime());
|
|
|
- entityInfo.put(ApplicationMetricsConstants.APP_TAGS_INFO,
|
|
|
- event.getAppTags());
|
|
|
- entityInfo.put(
|
|
|
- ApplicationMetricsConstants.UNMANAGED_APPLICATION_ENTITY_INFO,
|
|
|
- event.isUnmanagedApp());
|
|
|
- entityInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO,
|
|
|
- event.getApplicationPriority().getPriority());
|
|
|
- entityInfo.put(ApplicationMetricsConstants.APP_NODE_LABEL_EXPRESSION,
|
|
|
- event.getAppNodeLabelsExpression());
|
|
|
- entityInfo.put(ApplicationMetricsConstants.AM_NODE_LABEL_EXPRESSION,
|
|
|
- event.getAmNodeLabelsExpression());
|
|
|
- if (event.getCallerContext() != null) {
|
|
|
- if (event.getCallerContext().getContext() != null) {
|
|
|
- entityInfo.put(ApplicationMetricsConstants.YARN_APP_CALLER_CONTEXT,
|
|
|
- event.getCallerContext().getContext());
|
|
|
- }
|
|
|
- if (event.getCallerContext().getSignature() != null) {
|
|
|
- entityInfo.put(ApplicationMetricsConstants.YARN_APP_CALLER_SIGNATURE,
|
|
|
- event.getCallerContext().getSignature());
|
|
|
- }
|
|
|
- }
|
|
|
- entity.setOtherInfo(entityInfo);
|
|
|
- TimelineEvent tEvent = new TimelineEvent();
|
|
|
- tEvent.setEventType(
|
|
|
- ApplicationMetricsConstants.CREATED_EVENT_TYPE);
|
|
|
- tEvent.setTimestamp(event.getTimestamp());
|
|
|
- entity.addEvent(tEvent);
|
|
|
- putEntity(entity);
|
|
|
- }
|
|
|
-
|
|
|
- private void publishApplicationFinishedEvent(ApplicationFinishedEvent event) {
|
|
|
- TimelineEntity entity =
|
|
|
- createApplicationEntity(event.getApplicationId());
|
|
|
- TimelineEvent tEvent = new TimelineEvent();
|
|
|
- tEvent.setEventType(
|
|
|
- ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
|
|
|
- tEvent.setTimestamp(event.getTimestamp());
|
|
|
- Map<String, Object> eventInfo = new HashMap<String, Object>();
|
|
|
- eventInfo.put(ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
|
|
|
- event.getDiagnosticsInfo());
|
|
|
- eventInfo.put(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO,
|
|
|
- event.getFinalApplicationStatus().toString());
|
|
|
- eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO,
|
|
|
- event.getYarnApplicationState().toString());
|
|
|
- if (event.getLatestApplicationAttemptId() != null) {
|
|
|
- eventInfo.put(ApplicationMetricsConstants.LATEST_APP_ATTEMPT_EVENT_INFO,
|
|
|
- event.getLatestApplicationAttemptId().toString());
|
|
|
- }
|
|
|
- RMAppMetrics appMetrics = event.getAppMetrics();
|
|
|
- entity.addOtherInfo(ApplicationMetricsConstants.APP_CPU_METRICS,
|
|
|
- appMetrics.getVcoreSeconds());
|
|
|
- entity.addOtherInfo(ApplicationMetricsConstants.APP_MEM_METRICS,
|
|
|
- appMetrics.getMemorySeconds());
|
|
|
-
|
|
|
- tEvent.setEventInfo(eventInfo);
|
|
|
- entity.addEvent(tEvent);
|
|
|
- putEntity(entity);
|
|
|
- }
|
|
|
-
|
|
|
- private void publishApplicationUpdatedEvent(ApplicationUpdatedEvent event) {
|
|
|
- TimelineEntity entity = createApplicationEntity(event.getApplicationId());
|
|
|
- Map<String, Object> eventInfo = new HashMap<String, Object>();
|
|
|
- eventInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO,
|
|
|
- event.getQueue());
|
|
|
- eventInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO, event
|
|
|
- .getApplicationPriority().getPriority());
|
|
|
- TimelineEvent tEvent = new TimelineEvent();
|
|
|
- tEvent.setEventType(ApplicationMetricsConstants.UPDATED_EVENT_TYPE);
|
|
|
- tEvent.setTimestamp(event.getTimestamp());
|
|
|
- tEvent.setEventInfo(eventInfo);
|
|
|
- entity.addEvent(tEvent);
|
|
|
- putEntity(entity);
|
|
|
- }
|
|
|
-
|
|
|
- private void publishApplicationACLsUpdatedEvent(
|
|
|
- ApplicationACLsUpdatedEvent event) {
|
|
|
- TimelineEntity entity =
|
|
|
- createApplicationEntity(event.getApplicationId());
|
|
|
- TimelineEvent tEvent = new TimelineEvent();
|
|
|
- Map<String, Object> entityInfo = new HashMap<String, Object>();
|
|
|
- entityInfo.put(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO,
|
|
|
- event.getViewAppACLs());
|
|
|
- entity.setOtherInfo(entityInfo);
|
|
|
- tEvent.setEventType(
|
|
|
- ApplicationMetricsConstants.ACLS_UPDATED_EVENT_TYPE);
|
|
|
- tEvent.setTimestamp(event.getTimestamp());
|
|
|
- entity.addEvent(tEvent);
|
|
|
- putEntity(entity);
|
|
|
- }
|
|
|
-
|
|
|
- private static TimelineEntity createApplicationEntity(
|
|
|
- ApplicationId applicationId) {
|
|
|
- TimelineEntity entity = new TimelineEntity();
|
|
|
- entity.setEntityType(ApplicationMetricsConstants.ENTITY_TYPE);
|
|
|
- entity.setEntityId(applicationId.toString());
|
|
|
- return entity;
|
|
|
+ @VisibleForTesting
|
|
|
+ boolean isPublishContainerMetrics() {
|
|
|
+ return publishContainerMetrics;
|
|
|
}
|
|
|
|
|
|
- private void
|
|
|
- publishAppAttemptRegisteredEvent(AppAttemptRegisteredEvent event) {
|
|
|
- TimelineEntity entity =
|
|
|
- createAppAttemptEntity(event.getApplicationAttemptId());
|
|
|
- TimelineEvent tEvent = new TimelineEvent();
|
|
|
- tEvent.setEventType(
|
|
|
- AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE);
|
|
|
- tEvent.setTimestamp(event.getTimestamp());
|
|
|
- Map<String, Object> eventInfo = new HashMap<String, Object>();
|
|
|
- eventInfo.put(
|
|
|
- AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
|
|
|
- event.getTrackingUrl());
|
|
|
- eventInfo.put(
|
|
|
- AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
|
|
|
- event.getOriginalTrackingURL());
|
|
|
- eventInfo.put(AppAttemptMetricsConstants.HOST_EVENT_INFO,
|
|
|
- event.getHost());
|
|
|
- eventInfo.put(AppAttemptMetricsConstants.RPC_PORT_EVENT_INFO,
|
|
|
- event.getRpcPort());
|
|
|
- if (event.getMasterContainerId() != null) {
|
|
|
- eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO,
|
|
|
- event.getMasterContainerId().toString());
|
|
|
- }
|
|
|
- tEvent.setEventInfo(eventInfo);
|
|
|
- entity.addEvent(tEvent);
|
|
|
- putEntity(entity);
|
|
|
- }
|
|
|
-
|
|
|
- private void publishAppAttemptFinishedEvent(AppAttemptFinishedEvent event) {
|
|
|
- TimelineEntity entity =
|
|
|
- createAppAttemptEntity(event.getApplicationAttemptId());
|
|
|
- TimelineEvent tEvent = new TimelineEvent();
|
|
|
- tEvent.setEventType(AppAttemptMetricsConstants.FINISHED_EVENT_TYPE);
|
|
|
- tEvent.setTimestamp(event.getTimestamp());
|
|
|
- Map<String, Object> eventInfo = new HashMap<String, Object>();
|
|
|
- eventInfo.put(
|
|
|
- AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
|
|
|
- event.getTrackingUrl());
|
|
|
- eventInfo.put(
|
|
|
- AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
|
|
|
- event.getOriginalTrackingURL());
|
|
|
- eventInfo.put(AppAttemptMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
|
|
|
- event.getDiagnosticsInfo());
|
|
|
- eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO,
|
|
|
- event.getFinalApplicationStatus().toString());
|
|
|
- eventInfo.put(AppAttemptMetricsConstants.STATE_EVENT_INFO,
|
|
|
- event.getYarnApplicationAttemptState().toString());
|
|
|
- if (event.getMasterContainerId() != null) {
|
|
|
- eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO,
|
|
|
- event.getMasterContainerId().toString());
|
|
|
- }
|
|
|
- tEvent.setEventInfo(eventInfo);
|
|
|
- entity.addEvent(tEvent);
|
|
|
- putEntity(entity);
|
|
|
- }
|
|
|
-
|
|
|
- private static TimelineEntity createAppAttemptEntity(
|
|
|
- ApplicationAttemptId appAttemptId) {
|
|
|
- TimelineEntity entity = new TimelineEntity();
|
|
|
- entity.setEntityType(
|
|
|
- AppAttemptMetricsConstants.ENTITY_TYPE);
|
|
|
- entity.setEntityId(appAttemptId.toString());
|
|
|
- entity.addPrimaryFilter(AppAttemptMetricsConstants.PARENT_PRIMARY_FILTER,
|
|
|
- appAttemptId.getApplicationId().toString());
|
|
|
- return entity;
|
|
|
- }
|
|
|
-
|
|
|
- private void publishContainerCreatedEvent(ContainerCreatedEvent event) {
|
|
|
- TimelineEntity entity = createContainerEntity(event.getContainerId());
|
|
|
- Map<String, Object> entityInfo = new HashMap<String, Object>();
|
|
|
- entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO,
|
|
|
- event.getAllocatedResource().getMemory());
|
|
|
- entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO,
|
|
|
- event.getAllocatedResource().getVirtualCores());
|
|
|
- entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO,
|
|
|
- event.getAllocatedNode().getHost());
|
|
|
- entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO,
|
|
|
- event.getAllocatedNode().getPort());
|
|
|
- entityInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO,
|
|
|
- event.getAllocatedPriority().getPriority());
|
|
|
- entityInfo.put(
|
|
|
- ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO,
|
|
|
- event.getNodeHttpAddress());
|
|
|
- entity.setOtherInfo(entityInfo);
|
|
|
- TimelineEvent tEvent = new TimelineEvent();
|
|
|
- tEvent.setEventType(ContainerMetricsConstants.CREATED_EVENT_TYPE);
|
|
|
- tEvent.setTimestamp(event.getTimestamp());
|
|
|
- entity.addEvent(tEvent);
|
|
|
- putEntity(entity);
|
|
|
- }
|
|
|
-
|
|
|
- private void publishContainerFinishedEvent(ContainerFinishedEvent event) {
|
|
|
- TimelineEntity entity = createContainerEntity(event.getContainerId());
|
|
|
- TimelineEvent tEvent = new TimelineEvent();
|
|
|
- tEvent.setEventType(ContainerMetricsConstants.FINISHED_EVENT_TYPE);
|
|
|
- tEvent.setTimestamp(event.getTimestamp());
|
|
|
- Map<String, Object> eventInfo = new HashMap<String, Object>();
|
|
|
- eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
|
|
|
- event.getDiagnosticsInfo());
|
|
|
- eventInfo.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO,
|
|
|
- event.getContainerExitStatus());
|
|
|
- eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO,
|
|
|
- event.getContainerState().toString());
|
|
|
- tEvent.setEventInfo(eventInfo);
|
|
|
- entity.addEvent(tEvent);
|
|
|
- putEntity(entity);
|
|
|
- }
|
|
|
-
|
|
|
- private static TimelineEntity createContainerEntity(
|
|
|
- ContainerId containerId) {
|
|
|
- TimelineEntity entity = new TimelineEntity();
|
|
|
- entity.setEntityType(
|
|
|
- ContainerMetricsConstants.ENTITY_TYPE);
|
|
|
- entity.setEntityId(containerId.toString());
|
|
|
- entity.addPrimaryFilter(ContainerMetricsConstants.PARENT_PRIMARIY_FILTER,
|
|
|
- containerId.getApplicationAttemptId().toString());
|
|
|
- return entity;
|
|
|
+ @VisibleForTesting
|
|
|
+ Dispatcher getDispatcher() {
|
|
|
+ return dispatcher;
|
|
|
}
|
|
|
|
|
|
- private void putEntity(TimelineEntity entity) {
|
|
|
- try {
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("Publishing the entity " + entity.getEntityId() +
|
|
|
- ", JSON-style content: " + TimelineUtils.dumpTimelineRecordtoJSON(entity));
|
|
|
- }
|
|
|
- TimelinePutResponse response = client.putEntities(entity);
|
|
|
- List<TimelinePutResponse.TimelinePutError> errors = response.getErrors();
|
|
|
- if (errors.size() == 0) {
|
|
|
- LOG.debug("Timeline entities are successfully put");
|
|
|
- } else {
|
|
|
- for (TimelinePutResponse.TimelinePutError error : errors) {
|
|
|
- LOG.error(
|
|
|
- "Error when publishing entity [" + error.getEntityType() + ","
|
|
|
- + error.getEntityId() + "], server side error code: "
|
|
|
- + error.getErrorCode());
|
|
|
- }
|
|
|
- }
|
|
|
- } catch (Exception e) {
|
|
|
- LOG.error("Error when publishing entity [" + entity.getEntityType() + ","
|
|
|
- + entity.getEntityId() + "]", e);
|
|
|
- }
|
|
|
+ interface TimelineServicePublisher extends Service {
|
|
|
+ /**
|
|
|
+ * @return the Dispatcher which needs to be used to dispatch events
|
|
|
+ */
|
|
|
+ Dispatcher getDispatcher();
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @return true if RMContainerMetricsNeeds to be sent
|
|
|
+ */
|
|
|
+ boolean publishRMContainerMetrics();
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @return EventHandler which needs to be registered to the dispatcher to
|
|
|
+ * handle the SystemMetricsEvent
|
|
|
+ */
|
|
|
+ EventHandler<SystemMetricsEvent> getEventHandler();
|
|
|
}
|
|
|
-
|
|
|
- /**
|
|
|
- * EventHandler implementation which forward events to SystemMetricsPublisher.
|
|
|
- * Making use of it, SystemMetricsPublisher can avoid to have a public handle
|
|
|
- * method.
|
|
|
- */
|
|
|
- private final class ForwardingEventHandler implements
|
|
|
- EventHandler<SystemMetricsEvent> {
|
|
|
-
|
|
|
- @Override
|
|
|
- public void handle(SystemMetricsEvent event) {
|
|
|
- handleSystemMetricsEvent(event);
|
|
|
- }
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
- @SuppressWarnings({ "rawtypes", "unchecked" })
|
|
|
- protected static class MultiThreadedDispatcher extends CompositeService
|
|
|
- implements Dispatcher {
|
|
|
-
|
|
|
- private List<AsyncDispatcher> dispatchers =
|
|
|
- new ArrayList<AsyncDispatcher>();
|
|
|
-
|
|
|
- public MultiThreadedDispatcher(int num) {
|
|
|
- super(MultiThreadedDispatcher.class.getName());
|
|
|
- for (int i = 0; i < num; ++i) {
|
|
|
- AsyncDispatcher dispatcher = createDispatcher();
|
|
|
- dispatchers.add(dispatcher);
|
|
|
- addIfService(dispatcher);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public EventHandler getEventHandler() {
|
|
|
- return new CompositEventHandler();
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void register(Class<? extends Enum> eventType, EventHandler handler) {
|
|
|
- for (AsyncDispatcher dispatcher : dispatchers) {
|
|
|
- dispatcher.register(eventType, handler);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- public void setDrainEventsOnStop() {
|
|
|
- for (AsyncDispatcher dispatcher : dispatchers) {
|
|
|
- dispatcher.setDrainEventsOnStop();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private class CompositEventHandler implements EventHandler<Event> {
|
|
|
-
|
|
|
- @Override
|
|
|
- public void handle(Event event) {
|
|
|
- // Use hashCode (of ApplicationId) to dispatch the event to the child
|
|
|
- // dispatcher, such that all the writing events of one application will
|
|
|
- // be handled by one thread, the scheduled order of the these events
|
|
|
- // will be preserved
|
|
|
- int index = (event.hashCode() & Integer.MAX_VALUE) % dispatchers.size();
|
|
|
- dispatchers.get(index).getEventHandler().handle(event);
|
|
|
- }
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
- protected AsyncDispatcher createDispatcher() {
|
|
|
- return new AsyncDispatcher();
|
|
|
- }
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
}
|