|
@@ -24,6 +24,7 @@ import java.util.HashMap;
|
|
|
import java.util.Map;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
|
|
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
@@ -88,6 +89,8 @@ public class NMTimelinePublisher extends CompositeService {
|
|
|
|
|
|
private final Map<ApplicationId, TimelineV2Client> appToClientMap;
|
|
|
|
|
|
+ private boolean publishNMContainerEvents = true;
|
|
|
+
|
|
|
public NMTimelinePublisher(Context context) {
|
|
|
super(NMTimelinePublisher.class.getName());
|
|
|
this.context = context;
|
|
@@ -110,6 +113,10 @@ public class NMTimelinePublisher extends CompositeService {
|
|
|
if (webAppURLWithoutScheme.contains(":")) {
|
|
|
httpPort = webAppURLWithoutScheme.split(":")[1];
|
|
|
}
|
|
|
+
|
|
|
+ publishNMContainerEvents = conf.getBoolean(
|
|
|
+ YarnConfiguration.NM_PUBLISH_CONTAINER_EVENTS_ENABLED,
|
|
|
+ YarnConfiguration.DEFAULT_NM_PUBLISH_CONTAINER_EVENTS_ENABLED);
|
|
|
super.serviceInit(conf);
|
|
|
}
|
|
|
|
|
@@ -155,31 +162,148 @@ public class NMTimelinePublisher extends CompositeService {
|
|
|
|
|
|
public void reportContainerResourceUsage(Container container, Long pmemUsage,
|
|
|
Float cpuUsagePercentPerCore) {
|
|
|
- if (pmemUsage != ResourceCalculatorProcessTree.UNAVAILABLE ||
|
|
|
- cpuUsagePercentPerCore != ResourceCalculatorProcessTree.UNAVAILABLE) {
|
|
|
- ContainerEntity entity =
|
|
|
- createContainerEntity(container.getContainerId());
|
|
|
- long currentTimeMillis = System.currentTimeMillis();
|
|
|
- if (pmemUsage != ResourceCalculatorProcessTree.UNAVAILABLE) {
|
|
|
- TimelineMetric memoryMetric = new TimelineMetric();
|
|
|
- memoryMetric.setId(ContainerMetric.MEMORY.toString());
|
|
|
- memoryMetric.setRealtimeAggregationOp(TimelineMetricOperation.SUM);
|
|
|
- memoryMetric.addValue(currentTimeMillis, pmemUsage);
|
|
|
- entity.addMetric(memoryMetric);
|
|
|
- }
|
|
|
- if (cpuUsagePercentPerCore != ResourceCalculatorProcessTree.UNAVAILABLE) {
|
|
|
- TimelineMetric cpuMetric = new TimelineMetric();
|
|
|
- cpuMetric.setId(ContainerMetric.CPU.toString());
|
|
|
- // TODO: support average
|
|
|
- cpuMetric.setRealtimeAggregationOp(TimelineMetricOperation.SUM);
|
|
|
- cpuMetric.addValue(currentTimeMillis,
|
|
|
- Math.round(cpuUsagePercentPerCore));
|
|
|
- entity.addMetric(cpuMetric);
|
|
|
+ if (publishNMContainerEvents) {
|
|
|
+ if (pmemUsage != ResourceCalculatorProcessTree.UNAVAILABLE
|
|
|
+ || cpuUsagePercentPerCore !=
|
|
|
+ ResourceCalculatorProcessTree.UNAVAILABLE) {
|
|
|
+ ContainerEntity entity =
|
|
|
+ createContainerEntity(container.getContainerId());
|
|
|
+ long currentTimeMillis = System.currentTimeMillis();
|
|
|
+ if (pmemUsage != ResourceCalculatorProcessTree.UNAVAILABLE) {
|
|
|
+ TimelineMetric memoryMetric = new TimelineMetric();
|
|
|
+ memoryMetric.setId(ContainerMetric.MEMORY.toString());
|
|
|
+ memoryMetric.setRealtimeAggregationOp(TimelineMetricOperation.SUM);
|
|
|
+ memoryMetric.addValue(currentTimeMillis, pmemUsage);
|
|
|
+ entity.addMetric(memoryMetric);
|
|
|
+ }
|
|
|
+ if (cpuUsagePercentPerCore !=
|
|
|
+ ResourceCalculatorProcessTree.UNAVAILABLE) {
|
|
|
+ TimelineMetric cpuMetric = new TimelineMetric();
|
|
|
+ cpuMetric.setId(ContainerMetric.CPU.toString());
|
|
|
+ // TODO: support average
|
|
|
+ cpuMetric.setRealtimeAggregationOp(TimelineMetricOperation.SUM);
|
|
|
+ cpuMetric.addValue(currentTimeMillis,
|
|
|
+ Math.round(cpuUsagePercentPerCore));
|
|
|
+ entity.addMetric(cpuMetric);
|
|
|
+ }
|
|
|
+ entity.setIdPrefix(TimelineServiceHelper.
|
|
|
+ invertLong(container.getContainerStartTime()));
|
|
|
+ ApplicationId appId = container.getContainerId().
|
|
|
+ getApplicationAttemptId().getApplicationId();
|
|
|
+ try {
|
|
|
+ // no need to put it as part of publisher as timeline client
|
|
|
+ // already has Queuing concept
|
|
|
+ TimelineV2Client timelineClient = getTimelineClient(appId);
|
|
|
+ if (timelineClient != null) {
|
|
|
+ timelineClient.putEntitiesAsync(entity);
|
|
|
+ } else {
|
|
|
+ LOG.error("Seems like client has been removed before the container"
|
|
|
+ + " metric could be published for " +
|
|
|
+ container.getContainerId());
|
|
|
+ }
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.error(
|
|
|
+ "Failed to publish Container metrics for container " +
|
|
|
+ container.getContainerId());
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Failed to publish Container metrics for container " +
|
|
|
+ container.getContainerId(), e);
|
|
|
+ }
|
|
|
+ } catch (YarnException e) {
|
|
|
+ LOG.error(
|
|
|
+ "Failed to publish Container metrics for container " +
|
|
|
+ container.getContainerId(), e.getMessage());
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Failed to publish Container metrics for container " +
|
|
|
+ container.getContainerId(), e);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ private void publishContainerCreatedEvent(ContainerEvent event) {
|
|
|
+ if (publishNMContainerEvents) {
|
|
|
+ ContainerId containerId = event.getContainerID();
|
|
|
+ ContainerEntity entity = createContainerEntity(containerId);
|
|
|
+ Container container = context.getContainers().get(containerId);
|
|
|
+ Resource resource = container.getResource();
|
|
|
+
|
|
|
+ Map<String, Object> entityInfo = new HashMap<String, Object>();
|
|
|
+ entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_INFO,
|
|
|
+ resource.getMemorySize());
|
|
|
+ entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_INFO,
|
|
|
+ resource.getVirtualCores());
|
|
|
+ entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_INFO,
|
|
|
+ nodeId.getHost());
|
|
|
+ entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_INFO,
|
|
|
+ nodeId.getPort());
|
|
|
+ entityInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_INFO,
|
|
|
+ container.getPriority().toString());
|
|
|
+ entityInfo.put(
|
|
|
+ ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_INFO,
|
|
|
+ httpAddress);
|
|
|
+ entity.setInfo(entityInfo);
|
|
|
+
|
|
|
+ TimelineEvent tEvent = new TimelineEvent();
|
|
|
+ tEvent.setId(ContainerMetricsConstants.CREATED_EVENT_TYPE);
|
|
|
+ tEvent.setTimestamp(event.getTimestamp());
|
|
|
+
|
|
|
+ long containerStartTime = container.getContainerStartTime();
|
|
|
+ entity.addEvent(tEvent);
|
|
|
+ entity.setCreatedTime(containerStartTime);
|
|
|
+ entity.setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime));
|
|
|
+ dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity,
|
|
|
+ containerId.getApplicationAttemptId().getApplicationId()));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ private void publishContainerFinishedEvent(ContainerStatus containerStatus,
|
|
|
+ long containerFinishTime, long containerStartTime) {
|
|
|
+ if (publishNMContainerEvents) {
|
|
|
+ ContainerId containerId = containerStatus.getContainerId();
|
|
|
+ TimelineEntity entity = createContainerEntity(containerId);
|
|
|
+
|
|
|
+ Map<String, Object> entityInfo = new HashMap<String, Object>();
|
|
|
+ entityInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO,
|
|
|
+ containerStatus.getDiagnostics());
|
|
|
+ entityInfo.put(ContainerMetricsConstants.EXIT_STATUS_INFO,
|
|
|
+ containerStatus.getExitStatus());
|
|
|
+ entityInfo.put(ContainerMetricsConstants.STATE_INFO,
|
|
|
+ ContainerState.COMPLETE.toString());
|
|
|
+ entityInfo.put(ContainerMetricsConstants.CONTAINER_FINISHED_TIME,
|
|
|
+ containerFinishTime);
|
|
|
+ entity.setInfo(entityInfo);
|
|
|
+
|
|
|
+ TimelineEvent tEvent = new TimelineEvent();
|
|
|
+ tEvent.setId(ContainerMetricsConstants.FINISHED_EVENT_TYPE);
|
|
|
+ tEvent.setTimestamp(containerFinishTime);
|
|
|
+ entity.addEvent(tEvent);
|
|
|
+ entity.setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime));
|
|
|
+
|
|
|
+ dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity,
|
|
|
+ containerId.getApplicationAttemptId().getApplicationId()));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void publishContainerLocalizationEvent(
|
|
|
+ ContainerLocalizationEvent event, String eventType) {
|
|
|
+ if (publishNMContainerEvents) {
|
|
|
+ Container container = event.getContainer();
|
|
|
+ ContainerId containerId = container.getContainerId();
|
|
|
+ TimelineEntity entity = createContainerEntity(containerId);
|
|
|
+
|
|
|
+ TimelineEvent tEvent = new TimelineEvent();
|
|
|
+ tEvent.setId(eventType);
|
|
|
+ tEvent.setTimestamp(event.getTimestamp());
|
|
|
+ entity.addEvent(tEvent);
|
|
|
entity.setIdPrefix(TimelineServiceHelper.
|
|
|
invertLong(container.getContainerStartTime()));
|
|
|
- ApplicationId appId = container.getContainerId().getApplicationAttemptId()
|
|
|
- .getApplicationId();
|
|
|
+
|
|
|
+ ApplicationId appId = container.getContainerId().
|
|
|
+ getApplicationAttemptId().getApplicationId();
|
|
|
try {
|
|
|
// no need to put it as part of publisher as timeline client already has
|
|
|
// Queuing concept
|
|
@@ -187,8 +311,8 @@ public class NMTimelinePublisher extends CompositeService {
|
|
|
if (timelineClient != null) {
|
|
|
timelineClient.putEntitiesAsync(entity);
|
|
|
} else {
|
|
|
- LOG.error("Seems like client has been removed before the container"
|
|
|
- + " metric could be published for " + container.getContainerId());
|
|
|
+ LOG.error("Seems like client has been removed before the event"
|
|
|
+ + " could be published for " + container.getContainerId());
|
|
|
}
|
|
|
} catch (IOException e) {
|
|
|
LOG.error("Failed to publish Container metrics for container "
|
|
@@ -208,110 +332,6 @@ public class NMTimelinePublisher extends CompositeService {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- @SuppressWarnings("unchecked")
|
|
|
- private void publishContainerCreatedEvent(ContainerEvent event) {
|
|
|
- ContainerId containerId = event.getContainerID();
|
|
|
- ContainerEntity entity = createContainerEntity(containerId);
|
|
|
- Container container = context.getContainers().get(containerId);
|
|
|
- Resource resource = container.getResource();
|
|
|
-
|
|
|
- Map<String, Object> entityInfo = new HashMap<String, Object>();
|
|
|
- entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_INFO,
|
|
|
- resource.getMemorySize());
|
|
|
- entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_INFO,
|
|
|
- resource.getVirtualCores());
|
|
|
- entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_INFO,
|
|
|
- nodeId.getHost());
|
|
|
- entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_INFO,
|
|
|
- nodeId.getPort());
|
|
|
- entityInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_INFO,
|
|
|
- container.getPriority().toString());
|
|
|
- entityInfo.put(
|
|
|
- ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_INFO,
|
|
|
- httpAddress);
|
|
|
- entity.setInfo(entityInfo);
|
|
|
-
|
|
|
- TimelineEvent tEvent = new TimelineEvent();
|
|
|
- tEvent.setId(ContainerMetricsConstants.CREATED_EVENT_TYPE);
|
|
|
- tEvent.setTimestamp(event.getTimestamp());
|
|
|
-
|
|
|
- long containerStartTime = container.getContainerStartTime();
|
|
|
- entity.addEvent(tEvent);
|
|
|
- entity.setCreatedTime(containerStartTime);
|
|
|
- entity.setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime));
|
|
|
- dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity,
|
|
|
- containerId.getApplicationAttemptId().getApplicationId()));
|
|
|
- }
|
|
|
-
|
|
|
- @SuppressWarnings("unchecked")
|
|
|
- private void publishContainerFinishedEvent(ContainerStatus containerStatus,
|
|
|
- long containerFinishTime, long containerStartTime) {
|
|
|
- ContainerId containerId = containerStatus.getContainerId();
|
|
|
- TimelineEntity entity = createContainerEntity(containerId);
|
|
|
-
|
|
|
- Map<String, Object> entityInfo = new HashMap<String, Object>();
|
|
|
- entityInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO,
|
|
|
- containerStatus.getDiagnostics());
|
|
|
- entityInfo.put(ContainerMetricsConstants.EXIT_STATUS_INFO,
|
|
|
- containerStatus.getExitStatus());
|
|
|
- entityInfo.put(ContainerMetricsConstants.STATE_INFO,
|
|
|
- ContainerState.COMPLETE.toString());
|
|
|
- entityInfo.put(ContainerMetricsConstants.CONTAINER_FINISHED_TIME,
|
|
|
- containerFinishTime);
|
|
|
- entity.setInfo(entityInfo);
|
|
|
-
|
|
|
- TimelineEvent tEvent = new TimelineEvent();
|
|
|
- tEvent.setId(ContainerMetricsConstants.FINISHED_EVENT_TYPE);
|
|
|
- tEvent.setTimestamp(containerFinishTime);
|
|
|
- entity.addEvent(tEvent);
|
|
|
- entity.setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime));
|
|
|
-
|
|
|
- dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity,
|
|
|
- containerId.getApplicationAttemptId().getApplicationId()));
|
|
|
- }
|
|
|
-
|
|
|
- private void publishContainerLocalizationEvent(
|
|
|
- ContainerLocalizationEvent event, String eventType) {
|
|
|
- Container container = event.getContainer();
|
|
|
- ContainerId containerId = container.getContainerId();
|
|
|
- TimelineEntity entity = createContainerEntity(containerId);
|
|
|
-
|
|
|
- TimelineEvent tEvent = new TimelineEvent();
|
|
|
- tEvent.setId(eventType);
|
|
|
- tEvent.setTimestamp(event.getTimestamp());
|
|
|
- entity.addEvent(tEvent);
|
|
|
- entity.setIdPrefix(TimelineServiceHelper.
|
|
|
- invertLong(container.getContainerStartTime()));
|
|
|
-
|
|
|
- ApplicationId appId =
|
|
|
- container.getContainerId().getApplicationAttemptId().getApplicationId();
|
|
|
- try {
|
|
|
- // no need to put it as part of publisher as timeline client already has
|
|
|
- // Queuing concept
|
|
|
- TimelineV2Client timelineClient = getTimelineClient(appId);
|
|
|
- if (timelineClient != null) {
|
|
|
- timelineClient.putEntitiesAsync(entity);
|
|
|
- } else {
|
|
|
- LOG.error("Seems like client has been removed before the event could be"
|
|
|
- + " published for " + container.getContainerId());
|
|
|
- }
|
|
|
- } catch (IOException e) {
|
|
|
- LOG.error("Failed to publish Container metrics for container "
|
|
|
- + container.getContainerId());
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("Failed to publish Container metrics for container "
|
|
|
- + container.getContainerId(), e);
|
|
|
- }
|
|
|
- } catch (YarnException e) {
|
|
|
- LOG.error("Failed to publish Container metrics for container "
|
|
|
- + container.getContainerId(), e.getMessage());
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("Failed to publish Container metrics for container "
|
|
|
- + container.getContainerId(), e);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
private static ContainerEntity createContainerEntity(
|
|
|
ContainerId containerId) {
|
|
|
ContainerEntity entity = new ContainerEntity();
|