|
@@ -18,27 +18,17 @@
|
|
|
|
|
|
package org.apache.hadoop.yarn.server.timelineservice.collector;
|
|
package org.apache.hadoop.yarn.server.timelineservice.collector;
|
|
|
|
|
|
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
|
|
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
|
|
|
|
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
|
|
|
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
|
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
|
|
|
import com.google.common.base.Preconditions;
|
|
import com.google.common.base.Preconditions;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
-import java.util.HashSet;
|
|
|
|
-import java.util.Map;
|
|
|
|
-import java.util.Set;
|
|
|
|
-import java.util.concurrent.ScheduledThreadPoolExecutor;
|
|
|
|
-import java.util.concurrent.TimeUnit;
|
|
|
|
-
|
|
|
|
/**
|
|
/**
|
|
* Service that handles writes to the timeline service and writes them to the
|
|
* Service that handles writes to the timeline service and writes them to the
|
|
* backing storage for a given YARN application.
|
|
* backing storage for a given YARN application.
|
|
@@ -51,15 +41,8 @@ public class AppLevelTimelineCollector extends TimelineCollector {
|
|
private static final Logger LOG =
|
|
private static final Logger LOG =
|
|
LoggerFactory.getLogger(TimelineCollector.class);
|
|
LoggerFactory.getLogger(TimelineCollector.class);
|
|
|
|
|
|
- private final static int AGGREGATION_EXECUTOR_NUM_THREADS = 1;
|
|
|
|
- private final static int AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS = 15;
|
|
|
|
- private static Set<String> entityTypesSkipAggregation
|
|
|
|
- = initializeSkipSet();
|
|
|
|
-
|
|
|
|
private final ApplicationId appId;
|
|
private final ApplicationId appId;
|
|
private final TimelineCollectorContext context;
|
|
private final TimelineCollectorContext context;
|
|
- private ScheduledThreadPoolExecutor appAggregationExecutor;
|
|
|
|
- private AppLevelAggregator appAggregator;
|
|
|
|
private UserGroupInformation currentUser;
|
|
private UserGroupInformation currentUser;
|
|
|
|
|
|
public AppLevelTimelineCollector(ApplicationId appId) {
|
|
public AppLevelTimelineCollector(ApplicationId appId) {
|
|
@@ -69,12 +52,8 @@ public class AppLevelTimelineCollector extends TimelineCollector {
|
|
context = new TimelineCollectorContext();
|
|
context = new TimelineCollectorContext();
|
|
}
|
|
}
|
|
|
|
|
|
- private static Set<String> initializeSkipSet() {
|
|
|
|
- Set<String> result = new HashSet<>();
|
|
|
|
- result.add(TimelineEntityType.YARN_APPLICATION.toString());
|
|
|
|
- result.add(TimelineEntityType.YARN_FLOW_RUN.toString());
|
|
|
|
- result.add(TimelineEntityType.YARN_FLOW_ACTIVITY.toString());
|
|
|
|
- return result;
|
|
|
|
|
|
+ public UserGroupInformation getCurrentUser() {
|
|
|
|
+ return currentUser;
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
@@ -92,29 +71,11 @@ public class AppLevelTimelineCollector extends TimelineCollector {
|
|
|
|
|
|
@Override
|
|
@Override
|
|
protected void serviceStart() throws Exception {
|
|
protected void serviceStart() throws Exception {
|
|
- // Launch the aggregation thread
|
|
|
|
- appAggregationExecutor = new ScheduledThreadPoolExecutor(
|
|
|
|
- AppLevelTimelineCollector.AGGREGATION_EXECUTOR_NUM_THREADS,
|
|
|
|
- new ThreadFactoryBuilder()
|
|
|
|
- .setNameFormat("TimelineCollector Aggregation thread #%d")
|
|
|
|
- .build());
|
|
|
|
- appAggregator = new AppLevelAggregator();
|
|
|
|
- appAggregationExecutor.scheduleAtFixedRate(appAggregator,
|
|
|
|
- AppLevelTimelineCollector.AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS,
|
|
|
|
- AppLevelTimelineCollector.AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS,
|
|
|
|
- TimeUnit.SECONDS);
|
|
|
|
super.serviceStart();
|
|
super.serviceStart();
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
protected void serviceStop() throws Exception {
|
|
protected void serviceStop() throws Exception {
|
|
- appAggregationExecutor.shutdown();
|
|
|
|
- if (!appAggregationExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
|
|
|
|
- LOG.info("App-level aggregator shutdown timed out, shutdown now. ");
|
|
|
|
- appAggregationExecutor.shutdownNow();
|
|
|
|
- }
|
|
|
|
- // Perform one round of aggregation after the aggregation executor is done.
|
|
|
|
- appAggregator.aggregate();
|
|
|
|
super.serviceStop();
|
|
super.serviceStop();
|
|
}
|
|
}
|
|
|
|
|
|
@@ -123,48 +84,4 @@ public class AppLevelTimelineCollector extends TimelineCollector {
|
|
return context;
|
|
return context;
|
|
}
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
|
- protected Set<String> getEntityTypesSkipAggregation() {
|
|
|
|
- return entityTypesSkipAggregation;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- private class AppLevelAggregator implements Runnable {
|
|
|
|
-
|
|
|
|
- private void aggregate() {
|
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
|
- LOG.debug("App-level real-time aggregating");
|
|
|
|
- }
|
|
|
|
- if (!isReadyToAggregate()) {
|
|
|
|
- LOG.warn("App-level collector is not ready, skip aggregation. ");
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
- try {
|
|
|
|
- TimelineCollectorContext currContext = getTimelineEntityContext();
|
|
|
|
- Map<String, AggregationStatusTable> aggregationGroups
|
|
|
|
- = getAggregationGroups();
|
|
|
|
- if (aggregationGroups == null
|
|
|
|
- || aggregationGroups.isEmpty()) {
|
|
|
|
- LOG.debug("App-level collector is empty, skip aggregation. ");
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
- TimelineEntity resultEntity = TimelineCollector.aggregateWithoutGroupId(
|
|
|
|
- aggregationGroups, currContext.getAppId(),
|
|
|
|
- TimelineEntityType.YARN_APPLICATION.toString());
|
|
|
|
- TimelineEntities entities = new TimelineEntities();
|
|
|
|
- entities.addEntity(resultEntity);
|
|
|
|
- putEntitiesAsync(entities, currentUser);
|
|
|
|
- } catch (Exception e) {
|
|
|
|
- LOG.error("Error aggregating timeline metrics", e);
|
|
|
|
- }
|
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
|
- LOG.debug("App-level real-time aggregation complete");
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- @Override
|
|
|
|
- public void run() {
|
|
|
|
- aggregate();
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
}
|
|
}
|