Browse Source

YARN-5018. Online aggregation logic should not run immediately after collectors got started (Li Lu via sjlee)

Sangjin Lee 9 years ago
parent
commit
a1b6d7456f

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java

@@ -49,7 +49,7 @@ public class RMTimelineCollectorManager extends TimelineCollectorManager {
   }
 
   @Override
-  public void postPut(ApplicationId appId, TimelineCollector collector) {
+  protected void doPostPut(ApplicationId appId, TimelineCollector collector) {
     RMApp app = rmContext.getRMApps().get(appId);
     if (app == null) {
       throw new YarnRuntimeException(

+ 15 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java

@@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import com.google.common.base.Preconditions;
 
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -93,7 +94,8 @@ public class AppLevelTimelineCollector extends TimelineCollector {
         new ThreadFactoryBuilder()
             .setNameFormat("TimelineCollector Aggregation thread #%d")
             .build());
-    appAggregationExecutor.scheduleAtFixedRate(new AppLevelAggregator(), 0,
+    appAggregationExecutor.scheduleAtFixedRate(new AppLevelAggregator(),
+        AppLevelTimelineCollector.AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS,
         AppLevelTimelineCollector.AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS,
         TimeUnit.SECONDS);
     super.serviceStart();
@@ -126,10 +128,21 @@ public class AppLevelTimelineCollector extends TimelineCollector {
       if (LOG.isDebugEnabled()) {
         LOG.debug("App-level real-time aggregating");
       }
+      if (!isReadyToAggregate()) {
+        LOG.warn("App-level collector is not ready, skip aggregation. ");
+        return;
+      }
       try {
         TimelineCollectorContext currContext = getTimelineEntityContext();
+        Map<String, AggregationStatusTable> aggregationGroups
+            = getAggregationGroups();
+        if (aggregationGroups == null
+            || aggregationGroups.isEmpty()) {
+          LOG.debug("App-level collector is empty, skip aggregation. ");
+          return;
+        }
         TimelineEntity resultEntity = TimelineCollector.aggregateWithoutGroupId(
-            getAggregationGroups(), currContext.getAppId(),
+            aggregationGroups, currContext.getAppId(),
             TimelineEntityType.YARN_APPLICATION.toString());
         TimelineEntities entities = new TimelineEntities();
         entities.addEntity(resultEntity);

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java

@@ -87,7 +87,7 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager {
   }
 
   @Override
-  public void postPut(ApplicationId appId, TimelineCollector collector) {
+  protected void doPostPut(ApplicationId appId, TimelineCollector collector) {
     try {
       // Get context info from NM
       updateTimelineCollectorContext(appId, collector);

+ 11 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java

@@ -60,6 +60,8 @@ public abstract class TimelineCollector extends CompositeService {
   private static Set<String> entityTypesSkipAggregation
       = new HashSet<>();
 
+  private volatile boolean readyToAggregate = false;
+
   public TimelineCollector(String name) {
     super(name);
   }
@@ -91,6 +93,14 @@ public abstract class TimelineCollector extends CompositeService {
     return aggregationGroups;
   }
 
+  protected void setReadyToAggregate() {
+    readyToAggregate = true;
+  }
+
+  protected boolean isReadyToAggregate() {
+    return readyToAggregate;
+  }
+
   /**
    * Method to decide the set of timeline entity types the collector should
    * skip on aggregations. Subclasses may want to override this method to
@@ -258,7 +268,7 @@ public abstract class TimelineCollector extends CompositeService {
 
   // Note: In memory aggregation is performed in an eventually consistent
   // fashion.
-  private static class AggregationStatusTable {
+  protected static class AggregationStatusTable {
     // On aggregation, for each metric, aggregate all per-entity accumulated
     // metrics. We only use the id and type for TimelineMetrics in the key set
     // of this table.

+ 17 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java

@@ -136,8 +136,24 @@ public class TimelineCollectorManager extends AbstractService {
     return collectorInTable;
   }
 
-  protected void postPut(ApplicationId appId, TimelineCollector collector) {
+  /**
+   * Callback handler for the timeline collector manager when a collector has
+   * been added into the collector map.
+   * @param appId Application id of the collector.
+   * @param collector The actual timeline collector that has been added.
+   */
+  public void postPut(ApplicationId appId, TimelineCollector collector) {
+    doPostPut(appId, collector);
+    collector.setReadyToAggregate();
+  }
 
+  /**
+   * A template method that will be called by
+   * {@link  #postPut(ApplicationId, TimelineCollector)}.
+   * @param appId Application id of the collector.
+   * @param collector The actual timeline collector that has been added.
+   */
+  protected void doPostPut(ApplicationId appId, TimelineCollector collector) {
   }
 
   /**