Преглед на файлове

YARN-5747. Application timeline metric aggregation in timeline v2 will lose last round aggregation when an application finishes (Li Lu via Varun Saxena)

Varun Saxena преди 8 години
родител
ревизия
44eb2bd7ae

+ 11 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java

@@ -58,6 +58,7 @@ public class AppLevelTimelineCollector extends TimelineCollector {
   private final ApplicationId appId;
   private final TimelineCollectorContext context;
   private ScheduledThreadPoolExecutor appAggregationExecutor;
+  private AppLevelAggregator appAggregator;
 
   public AppLevelTimelineCollector(ApplicationId appId) {
     super(AppLevelTimelineCollector.class.getName() + " - " + appId.toString());
@@ -94,7 +95,8 @@ public class AppLevelTimelineCollector extends TimelineCollector {
         new ThreadFactoryBuilder()
             .setNameFormat("TimelineCollector Aggregation thread #%d")
             .build());
-    appAggregationExecutor.scheduleAtFixedRate(new AppLevelAggregator(),
+    appAggregator = new AppLevelAggregator();
+    appAggregationExecutor.scheduleAtFixedRate(appAggregator,
         AppLevelTimelineCollector.AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS,
         AppLevelTimelineCollector.AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS,
         TimeUnit.SECONDS);
@@ -108,6 +110,8 @@ public class AppLevelTimelineCollector extends TimelineCollector {
       LOG.info("App-level aggregator shutdown timed out, shutdown now. ");
       appAggregationExecutor.shutdownNow();
     }
+    // Perform one round of aggregation after the aggregation executor is done.
+    appAggregator.aggregate();
     super.serviceStop();
   }
 
@@ -123,8 +127,7 @@ public class AppLevelTimelineCollector extends TimelineCollector {
 
   private class AppLevelAggregator implements Runnable {
 
-    @Override
-    public void run() {
+    private void aggregate() {
       if (LOG.isDebugEnabled()) {
         LOG.debug("App-level real-time aggregating");
       }
@@ -156,6 +159,11 @@ public class AppLevelTimelineCollector extends TimelineCollector {
         LOG.debug("App-level real-time aggregation complete");
       }
     }
+
+    @Override
+    public void run() {
+      aggregate();
+    }
   }
 
 }