Browse Source

AMBARI-15623. Support distributed aggregation for multiple AMS instances. (swagle)

Aravindan Vijayan 9 years ago
parent
commit
14a4f97082
29 changed files with 1134 additions and 198 deletions
  1. 23 0
      ambari-metrics/ambari-metrics-timelineservice/pom.xml
  2. 65 22
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
  3. 46 0
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
  4. 6 0
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
  5. 79 28
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
  6. 22 4
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java
  7. 64 36
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
  8. 6 3
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
  9. 6 3
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
  10. 8 3
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
  11. 8 3
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java
  12. 7 3
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java
  13. 144 0
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/AggregationTaskRunner.java
  14. 98 0
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/CheckpointManager.java
  15. 69 0
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/OnlineOfflineStateModelFactory.java
  16. 276 0
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/TimelineMetricHAController.java
  17. 1 1
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java
  18. 12 0
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
  19. 6 6
      ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
  20. 5 0
      ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
  21. 3 3
      ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java
  22. 10 10
      ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java
  23. 4 15
      ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITMetricAggregator.java
  24. 7 7
      ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java
  25. 107 0
      ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/TimelineMetricHAControllerTest.java
  26. 1 1
      ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog240.java
  27. 3 0
      ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-env.xml
  28. 1 1
      ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/metainfo.xml
  29. 47 49
      ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog240Test.java

+ 23 - 0
ambari-metrics/ambari-metrics-timelineservice/pom.xml

@@ -268,6 +268,25 @@
   </build>
 
   <dependencies>
+
+    <dependency>
+      <groupId>org.apache.helix</groupId>
+      <artifactId>helix-core</artifactId>
+      <version>0.6.5</version>
+      <exclusions>
+        <exclusion>
+          <artifactId>zookeeper</artifactId>
+          <groupId>org.apache.zookeeper</groupId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <artifactId>zookeeper</artifactId>
+      <groupId>org.apache.zookeeper</groupId>
+      <version>3.4.8</version>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.phoenix</groupId>
       <artifactId>phoenix-core</artifactId>
@@ -589,6 +608,10 @@
             <groupId>org.jruby</groupId>
             <artifactId>jruby-complete</artifactId>
           </exclusion>
+          <exclusion>
+            <artifactId>zookeeper</artifactId>
+            <groupId>org.apache.zookeeper</groupId>
+          </exclusion>
         </exclusions>
       </dependency>
     <dependency>

+ 65 - 22
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java

@@ -35,6 +35,8 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.function.SeriesAggregateFunction;
@@ -43,7 +45,6 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.ConditionBuilder;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.TopNCondition;
-
 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.ArrayList;
@@ -56,10 +57,13 @@ import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-
+import java.util.concurrent.ThreadFactory;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATOR_CHECKPOINT_DELAY;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_TOPN_HOSTS_LIMIT;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_WHITELIST_FILE;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.USE_GROUPBY_AGGREGATOR_QUERIES;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_TOPN_HOSTS_LIMIT;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES;
 
 public class HBaseTimelineMetricStore extends AbstractService implements TimelineMetricStore {
 
@@ -67,9 +71,11 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin
   private final TimelineMetricConfiguration configuration;
   private PhoenixHBaseAccessor hBaseAccessor;
   private static volatile boolean isInitialized = false;
-  private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
+  private final ScheduledExecutorService watchdogExecutorService = Executors.newSingleThreadScheduledExecutor();
+  private final Map<AggregationTaskRunner.AGGREGATOR_NAME, ScheduledExecutorService> scheduledExecutors = new HashMap<>();
   private TimelineMetricMetadataManager metricMetadataManager;
   private Integer defaultTopNHostsLimit;
+  private TimelineMetricHAController haController;
 
   /**
    * Construct the service.
@@ -97,6 +103,18 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin
       metricMetadataManager.initializeMetadata();
       // Initialize policies before TTL update
       hBaseAccessor.initPoliciesAndTTL();
+      // Start HA service
+      if (configuration.isDistributedOperationModeEnabled()) {
+        // Start the controller
+        haController = new TimelineMetricHAController(configuration);
+        try {
+          haController.initializeHAController();
+        } catch (Exception e) {
+          LOG.error(e);
+          throw new MetricsSystemInitializationException("Unable to " +
+            "initialize HA controller", e);
+        }
+      }
 
       String whitelistFile = metricsConf.get(TIMELINE_METRICS_WHITELIST_FILE, "");
       if (!StringUtils.isEmpty(whitelistFile)) {
@@ -110,44 +128,51 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin
 
       // Start the cluster aggregator second
       TimelineMetricAggregator secondClusterAggregator =
-        TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hBaseAccessor, metricsConf, metricMetadataManager);
-      scheduleAggregatorThread(secondClusterAggregator);
+        TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(
+          hBaseAccessor, metricsConf, metricMetadataManager, haController);
+      scheduleAggregatorThread(secondClusterAggregator, metricsConf);
 
       // Start the minute cluster aggregator
       TimelineMetricAggregator minuteClusterAggregator =
-        TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hBaseAccessor, metricsConf);
-      scheduleAggregatorThread(minuteClusterAggregator);
+        TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(
+          hBaseAccessor, metricsConf, haController);
+      scheduleAggregatorThread(minuteClusterAggregator, metricsConf);
 
       // Start the hourly cluster aggregator
       TimelineMetricAggregator hourlyClusterAggregator =
-        TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hBaseAccessor, metricsConf);
-      scheduleAggregatorThread(hourlyClusterAggregator);
+        TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(
+          hBaseAccessor, metricsConf, haController);
+      scheduleAggregatorThread(hourlyClusterAggregator, metricsConf);
 
       // Start the daily cluster aggregator
       TimelineMetricAggregator dailyClusterAggregator =
-        TimelineMetricAggregatorFactory.createTimelineClusterAggregatorDaily(hBaseAccessor, metricsConf);
-      scheduleAggregatorThread(dailyClusterAggregator);
+        TimelineMetricAggregatorFactory.createTimelineClusterAggregatorDaily(
+          hBaseAccessor, metricsConf, haController);
+      scheduleAggregatorThread(dailyClusterAggregator, metricsConf);
 
       // Start the minute host aggregator
       TimelineMetricAggregator minuteHostAggregator =
-        TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(hBaseAccessor, metricsConf);
-      scheduleAggregatorThread(minuteHostAggregator);
+        TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(
+          hBaseAccessor, metricsConf, haController);
+      scheduleAggregatorThread(minuteHostAggregator, metricsConf);
 
       // Start the hourly host aggregator
       TimelineMetricAggregator hourlyHostAggregator =
-        TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(hBaseAccessor, metricsConf);
-      scheduleAggregatorThread(hourlyHostAggregator);
+        TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(
+          hBaseAccessor, metricsConf, haController);
+      scheduleAggregatorThread(hourlyHostAggregator, metricsConf);
 
       // Start the daily host aggregator
       TimelineMetricAggregator dailyHostAggregator =
-        TimelineMetricAggregatorFactory.createTimelineMetricAggregatorDaily(hBaseAccessor, metricsConf);
-      scheduleAggregatorThread(dailyHostAggregator);
+        TimelineMetricAggregatorFactory.createTimelineMetricAggregatorDaily(
+          hBaseAccessor, metricsConf, haController);
+      scheduleAggregatorThread(dailyHostAggregator, metricsConf);
 
       if (!configuration.isTimelineMetricsServiceWatcherDisabled()) {
         int initDelay = configuration.getTimelineMetricsServiceWatcherInitDelay();
         int delay = configuration.getTimelineMetricsServiceWatcherDelay();
         // Start the watchdog
-        executorService.scheduleWithFixedDelay(
+        watchdogExecutorService.scheduleWithFixedDelay(
           new TimelineMetricStoreWatcher(this, configuration), initDelay, delay,
           TimeUnit.SECONDS);
         LOG.info("Started watchdog for timeline metrics store with initial " +
@@ -357,13 +382,31 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin
     return metricMetadataManager.getHostedAppsCache();
   }
 
-  private void scheduleAggregatorThread(TimelineMetricAggregator aggregator) {
-    ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
+  @Override
+  public List<String> getLiveInstances() {
+    return haController.getLiveInstanceHostNames();
+  }
+
+  private void scheduleAggregatorThread(final TimelineMetricAggregator aggregator,
+                                        Configuration metricsConf) {
     if (!aggregator.isDisabled()) {
+      ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(
+        new ThreadFactory() {
+          @Override
+          public Thread newThread(Runnable r) {
+            return new Thread(r, ACTUAL_AGGREGATOR_NAMES.get(aggregator.getName()));
+          }
+        }
+      );
+      scheduledExecutors.put(aggregator.getName(), executorService);
       executorService.scheduleAtFixedRate(aggregator,
-        0l,
+        SECONDS.toMillis(metricsConf.getInt(AGGREGATOR_CHECKPOINT_DELAY, 120)),
         aggregator.getSleepIntervalMillis(),
         TimeUnit.MILLISECONDS);
+      LOG.info("Scheduled aggregator thread " + aggregator.getName() + " every " +
+        + aggregator.getSleepIntervalMillis() + " milliseconds.");
+    } else {
+      LOG.info("Skipped scheduling " + aggregator.getName() + " since it is disabled.");
     }
   }
 }

+ 46 - 0
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java

@@ -23,9 +23,11 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 
+import java.net.InetAddress;
 import java.net.MalformedURLException;
 import java.net.URISyntaxException;
 import java.net.URL;
+import java.net.UnknownHostException;
 
 /**
  * Configuration class that reads properties from ams-site.xml. All values
@@ -38,6 +40,7 @@ public class TimelineMetricConfiguration {
 
   public static final String HBASE_SITE_CONFIGURATION_FILE = "hbase-site.xml";
   public static final String METRICS_SITE_CONFIGURATION_FILE = "ams-site.xml";
+  public static final String METRICS_ENV_CONFIGURATION_FILE = "ams-env.xml";
 
   public static final String TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR =
     "timeline.metrics.aggregator.checkpoint.dir";
@@ -251,8 +254,11 @@ public class TimelineMetricConfiguration {
 
   public static final String HOST_APP_ID = "HOST";
 
+  public static final String DEFAULT_INSTANCE_PORT = "12001";
+
   private Configuration hbaseConf;
   private Configuration metricsConf;
+  private Configuration amsEnvConf;
   private volatile boolean isInitialized = false;
 
   public void initialize() throws URISyntaxException, MalformedURLException {
@@ -279,6 +285,7 @@ public class TimelineMetricConfiguration {
     hbaseConf.addResource(hbaseResUrl.toURI().toURL());
     metricsConf = new Configuration(true);
     metricsConf.addResource(amsResUrl.toURI().toURL());
+
     isInitialized = true;
   }
 
@@ -296,6 +303,37 @@ public class TimelineMetricConfiguration {
     return metricsConf;
   }
 
+  public String getZKClientPort() throws MalformedURLException, URISyntaxException {
+    if (!isInitialized) {
+      initialize();
+    }
+    return hbaseConf.getTrimmed("hbase.zookeeper.property.clientPort", "2181");
+  }
+
+  public String getZKQuorum() throws MalformedURLException, URISyntaxException {
+    if (!isInitialized) {
+      initialize();
+    }
+    return hbaseConf.getTrimmed("hbase.zookeeper.quorum");
+  }
+
+  public String getInstanceHostnameFromEnv() throws UnknownHostException {
+    String amsInstanceName = System.getProperty("AMS_INSTANCE_NAME");
+    if (amsInstanceName == null) {
+      amsInstanceName = InetAddress.getLocalHost().getHostName();
+    }
+    return amsInstanceName;
+  }
+
+  public String getInstancePort() throws MalformedURLException, URISyntaxException {
+    String amsInstancePort = System.getProperty("AMS_INSTANCE_PORT");
+    if (amsInstancePort == null) {
+      // Check config
+      return getMetricsConf().get("timeline.metrics.availability.instance.port", DEFAULT_INSTANCE_PORT);
+    }
+    return DEFAULT_INSTANCE_PORT;
+  }
+
   public String getWebappAddress() {
     String defaultHttpAddress = "0.0.0.0:6188";
     if (metricsConf != null) {
@@ -353,4 +391,12 @@ public class TimelineMetricConfiguration {
     }
     return defaultRpcAddress;
   }
+
+  public boolean isDistributedOperationModeEnabled() {
+    try {
+      return getMetricsConf().get("timeline.metrics.service.operation.mode").equals("distributed");
+    } catch (Exception e) {
+      return false;
+    }
+  }
 }

+ 6 - 0
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java

@@ -87,4 +87,10 @@ public interface TimelineMetricStore {
    * @throws IOException
    */
   Map<String, Set<String>> getHostAppsMetadata() throws SQLException, IOException;
+
+  /**
+   * Return a list of known live collector nodes
+   * @return [ hostname ]
+   */
+  List<String> getLiveInstances();
 }

+ 79 - 28
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java

@@ -22,6 +22,9 @@ import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.EmptyCondition;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
@@ -40,6 +43,7 @@ import java.util.List;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATOR_CHECKPOINT_DELAY;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.RESULTSET_FETCH_SIZE;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES;
 
 /**
  * Base class for all runnable aggregators. Provides common functions like
@@ -58,13 +62,14 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
   protected String tableName;
   protected String outputTableName;
   protected Long nativeTimeRangeDelay;
+  protected AggregationTaskRunner taskRunner;
   protected List<String> downsampleMetricPatterns;
   protected List<CustomDownSampler> configuredDownSamplers;
 
   // Explicitly name aggregators for logging needs
-  private final String aggregatorName;
+  private final AGGREGATOR_NAME aggregatorName;
 
-  AbstractTimelineAggregator(String aggregatorName,
+  AbstractTimelineAggregator(AGGREGATOR_NAME aggregatorName,
                              PhoenixHBaseAccessor hBaseAccessor,
                              Configuration metricsConf) {
     this.aggregatorName = aggregatorName;
@@ -72,12 +77,12 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
     this.metricsConf = metricsConf;
     this.checkpointDelayMillis = SECONDS.toMillis(metricsConf.getInt(AGGREGATOR_CHECKPOINT_DELAY, 120));
     this.resultsetFetchSize = metricsConf.getInt(RESULTSET_FETCH_SIZE, 2000);
-    this.LOG = LoggerFactory.getLogger(aggregatorName);
+    this.LOG = LoggerFactory.getLogger(ACTUAL_AGGREGATOR_NAMES.get(aggregatorName));
     this.configuredDownSamplers = DownSamplerUtils.getDownSamplers(metricsConf);
     this.downsampleMetricPatterns = DownSamplerUtils.getDownsampleMetricPatterns(metricsConf);
   }
 
-  public AbstractTimelineAggregator(String aggregatorName,
+  public AbstractTimelineAggregator(AGGREGATOR_NAME aggregatorName,
                                     PhoenixHBaseAccessor hBaseAccessor,
                                     Configuration metricsConf,
                                     String checkpointLocation,
@@ -86,7 +91,8 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
                                     String aggregatorDisableParam,
                                     String tableName,
                                     String outputTableName,
-                                    Long nativeTimeRangeDelay) {
+                                    Long nativeTimeRangeDelay,
+                                    TimelineMetricHAController haController) {
     this(aggregatorName, hBaseAccessor, metricsConf);
     this.checkpointLocation = checkpointLocation;
     this.sleepIntervalMillis = sleepIntervalMillis;
@@ -94,7 +100,9 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
     this.aggregatorDisableParam = aggregatorDisableParam;
     this.tableName = tableName;
     this.outputTableName = outputTableName;
-    this.nativeTimeRangeDelay =  nativeTimeRangeDelay;
+    this.nativeTimeRangeDelay = nativeTimeRangeDelay;
+    this.taskRunner = haController != null && haController.isInitialized() ?
+      haController.getAggregationTaskRunner() : null;
   }
 
   @Override
@@ -108,25 +116,39 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
    * Access relaxed for tests
    */
   public void runOnce(Long SLEEP_INTERVAL) {
+    boolean performAggregationFunction = true;
+    if (taskRunner != null) {
+      switch (getAggregatorType()) {
+        case HOST:
+          performAggregationFunction = taskRunner.performsHostAggregation();
+          break;
+        case CLUSTER:
+          performAggregationFunction = taskRunner.performsClusterAggregation();
+      }
+    }
 
-    long currentTime = System.currentTimeMillis();
-    long lastCheckPointTime = readLastCheckpointSavingOnFirstRun(currentTime);
-
-    if (lastCheckPointTime != -1) {
-      LOG.info("Last check point time: " + lastCheckPointTime + ", lagBy: "
-        + ((currentTime - lastCheckPointTime) / 1000)
-        + " seconds.");
-
-      boolean success = doWork(lastCheckPointTime, lastCheckPointTime + SLEEP_INTERVAL);
+    if (performAggregationFunction) {
+      long currentTime = System.currentTimeMillis();
+      long lastCheckPointTime = readLastCheckpointSavingOnFirstRun(currentTime);
 
-      if (success) {
-        try {
-          saveCheckPoint(lastCheckPointTime + SLEEP_INTERVAL);
-        } catch (IOException io) {
-          LOG.warn("Error saving checkpoint, restarting aggregation at " +
-            "previous checkpoint.");
+      if (lastCheckPointTime != -1) {
+        LOG.info("Last check point time: " + lastCheckPointTime + ", lagBy: "
+          + ((currentTime - lastCheckPointTime) / 1000)
+          + " seconds.");
+
+        boolean success = doWork(lastCheckPointTime, lastCheckPointTime + SLEEP_INTERVAL);
+
+        if (success) {
+          try {
+            saveCheckPoint(lastCheckPointTime + SLEEP_INTERVAL);
+          } catch (IOException io) {
+            LOG.warn("Error saving checkpoint, restarting aggregation at " +
+              "previous checkpoint.");
+          }
         }
       }
+    } else {
+      LOG.info("Skipping aggregation function not owned by this instance.");
     }
   }
 
@@ -184,6 +206,9 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
   }
 
   protected long readCheckPoint() {
+    if (taskRunner != null) {
+      return taskRunner.getCheckpointManager().readCheckpoint(aggregatorName);
+    }
     try {
       File checkpoint = new File(getCheckpointLocation());
       if (checkpoint.exists()) {
@@ -199,15 +224,23 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
   }
 
   protected void saveCheckPoint(long checkpointTime) throws IOException {
-    File checkpoint = new File(getCheckpointLocation());
-    if (!checkpoint.exists()) {
-      boolean done = checkpoint.createNewFile();
-      if (!done) {
-        throw new IOException("Could not create checkpoint at location, " +
-          getCheckpointLocation());
+    if (taskRunner != null) {
+      boolean success = taskRunner.getCheckpointManager().writeCheckpoint(aggregatorName, checkpointTime);
+      if (!success) {
+        LOG.error("Error saving checkpoint with AggregationTaskRunner, " +
+          "aggregator = " + aggregatorName + "value = " + checkpointTime);
       }
+    } else {
+      File checkpoint = new File(getCheckpointLocation());
+      if (!checkpoint.exists()) {
+        boolean done = checkpoint.createNewFile();
+        if (!done) {
+          throw new IOException("Could not create checkpoint at location, " +
+            getCheckpointLocation());
+        }
+      }
+      FileUtils.writeStringToFile(checkpoint, String.valueOf(checkpointTime));
     }
-    FileUtils.writeStringToFile(checkpoint, String.valueOf(checkpointTime));
   }
 
   /**
@@ -364,6 +397,24 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
     return currentTime - (currentTime % aggregatorPeriod);
   }
 
+  /**
+   * Get @AGGREGATOR_TYPE based on the output table.
+   * This is solely used by the HAController to determine which lock to acquire.
+   */
+  public AGGREGATOR_TYPE getAggregatorType() {
+    if (outputTableName.contains("RECORD")) {
+      return AGGREGATOR_TYPE.HOST;
+    } else if (outputTableName.contains("AGGREGATE")) {
+      return AGGREGATOR_TYPE.CLUSTER;
+    }
+    return null;
+  }
+
+  @Override
+  public AGGREGATOR_NAME getName() {
+    return aggregatorName;
+  }
+
   /**
    * Run 1 downsampler query.
    * @param conn

+ 22 - 4
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java

@@ -1,5 +1,7 @@
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
 
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -20,22 +22,38 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
 public interface TimelineMetricAggregator extends Runnable {
   /**
    * Aggregate metric data within the time bounds.
+   *
    * @param startTime start time millis
-   * @param endTime end time millis
+   * @param endTime   end time millis
    * @return success
    */
-  public boolean doWork(long startTime, long endTime);
+  boolean doWork(long startTime, long endTime);
 
   /**
    * Is aggregator is disabled by configuration.
+   *
    * @return true/false
    */
-  public boolean isDisabled();
+  boolean isDisabled();
 
   /**
    * Return aggregator Interval
+   *
    * @return Interval in Millis
    */
-  public Long getSleepIntervalMillis();
+  Long getSleepIntervalMillis();
+
+  /**
+   * Get aggregator name
+   * @return @AGGREGATOR_NAME
+   */
+  AGGREGATOR_NAME getName();
 
+  /**
+   * Known aggregator types
+   */
+  enum AGGREGATOR_TYPE {
+    CLUSTER,
+    HOST
   }
+}

+ 64 - 36
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
 import org.apache.commons.io.FilenameUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
@@ -29,12 +30,12 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_DISABLED;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_CHECKPOINT_CUTOFF_MULTIPLIER;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_DISABLED;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_DISABLED;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_CHECKPOINT_CUTOFF_MULTIPLIER;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_DISABLED;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_DAILY_CHECKPOINT_CUTOFF_MULTIPLIER;
@@ -48,6 +49,13 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.USE_GROUPBY_AGGREGATOR_QUERIES;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_DAILY;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_HOURLY;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_MINUTE;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_SECOND;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_DAILY;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_HOURLY;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_MINUTE;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_DAILY_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
@@ -86,7 +94,8 @@ public class TimelineMetricAggregatorFactory {
    * Interval : 5 mins
    */
   public static TimelineMetricAggregator createTimelineMetricAggregatorMinute
-    (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
+    (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
+     TimelineMetricHAController haController) {
 
     String checkpointDir = metricsConf.get(
       TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
@@ -104,7 +113,7 @@ public class TimelineMetricAggregatorFactory {
 
     if (useGroupByAggregator(metricsConf)) {
       return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricHostAggregator(
-        "TimelineMetricHostAggregatorMinute",
+        METRIC_RECORD_MINUTE,
         hBaseAccessor, metricsConf,
         checkpointLocation,
         sleepIntervalMillis,
@@ -112,12 +121,13 @@ public class TimelineMetricAggregatorFactory {
         hostAggregatorDisabledParam,
         inputTableName,
         outputTableName,
-        120000l
+        120000l,
+        haController
       );
     }
 
     return new TimelineMetricHostAggregator(
-      "TimelineMetricHostAggregatorMinute",
+      METRIC_RECORD_MINUTE,
       hBaseAccessor, metricsConf,
       checkpointLocation,
       sleepIntervalMillis,
@@ -125,7 +135,8 @@ public class TimelineMetricAggregatorFactory {
       hostAggregatorDisabledParam,
       inputTableName,
       outputTableName,
-      120000l);
+      120000l,
+      haController);
   }
 
   /**
@@ -133,7 +144,8 @@ public class TimelineMetricAggregatorFactory {
    * Interval : 1 hour
    */
   public static TimelineMetricAggregator createTimelineMetricAggregatorHourly
-    (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
+    (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
+     TimelineMetricHAController haController) {
 
     String checkpointDir = metricsConf.get(
       TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
@@ -151,7 +163,7 @@ public class TimelineMetricAggregatorFactory {
 
     if (useGroupByAggregator(metricsConf)) {
       return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricHostAggregator(
-        "TimelineMetricHostAggregatorHourly",
+        METRIC_RECORD_HOURLY,
         hBaseAccessor, metricsConf,
         checkpointLocation,
         sleepIntervalMillis,
@@ -159,12 +171,13 @@ public class TimelineMetricAggregatorFactory {
         hostAggregatorDisabledParam,
         inputTableName,
         outputTableName,
-        3600000l
+        3600000l,
+        haController
       );
     }
 
     return new TimelineMetricHostAggregator(
-      "TimelineMetricHostAggregatorHourly",
+      METRIC_RECORD_HOURLY,
       hBaseAccessor, metricsConf,
       checkpointLocation,
       sleepIntervalMillis,
@@ -172,7 +185,8 @@ public class TimelineMetricAggregatorFactory {
       hostAggregatorDisabledParam,
       inputTableName,
       outputTableName,
-      3600000l);
+      3600000l,
+      haController);
   }
 
   /**
@@ -180,7 +194,8 @@ public class TimelineMetricAggregatorFactory {
    * Interval : 1 day
    */
   public static TimelineMetricAggregator createTimelineMetricAggregatorDaily
-    (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
+    (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
+     TimelineMetricHAController haController) {
 
     String checkpointDir = metricsConf.get(
       TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
@@ -198,7 +213,7 @@ public class TimelineMetricAggregatorFactory {
 
     if (useGroupByAggregator(metricsConf)) {
       return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricHostAggregator(
-        "TimelineMetricHostAggregatorDaily",
+        METRIC_RECORD_DAILY,
         hBaseAccessor, metricsConf,
         checkpointLocation,
         sleepIntervalMillis,
@@ -206,12 +221,13 @@ public class TimelineMetricAggregatorFactory {
         hostAggregatorDisabledParam,
         inputTableName,
         outputTableName,
-        3600000l
+        3600000l,
+        haController
       );
     }
 
     return new TimelineMetricHostAggregator(
-      "TimelineMetricHostAggregatorDaily",
+      METRIC_RECORD_DAILY,
       hBaseAccessor, metricsConf,
       checkpointLocation,
       sleepIntervalMillis,
@@ -219,7 +235,8 @@ public class TimelineMetricAggregatorFactory {
       hostAggregatorDisabledParam,
       inputTableName,
       outputTableName,
-      3600000l);
+      3600000l,
+      haController);
   }
 
   /**
@@ -229,7 +246,8 @@ public class TimelineMetricAggregatorFactory {
    */
   public static TimelineMetricAggregator createTimelineClusterAggregatorSecond(
     PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
-    TimelineMetricMetadataManager metadataManager) {
+    TimelineMetricMetadataManager metadataManager,
+    TimelineMetricHAController haController) {
 
     String checkpointDir = metricsConf.get(
       TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
@@ -252,7 +270,7 @@ public class TimelineMetricAggregatorFactory {
 
     // Second based aggregation have added responsibility of time slicing
     return new TimelineMetricClusterAggregatorSecond(
-      "TimelineClusterAggregatorSecond",
+      METRIC_AGGREGATE_SECOND,
       metadataManager,
       hBaseAccessor, metricsConf,
       checkpointLocation,
@@ -262,7 +280,8 @@ public class TimelineMetricAggregatorFactory {
       inputTableName,
       outputTableName,
       120000l,
-      timeSliceIntervalMillis
+      timeSliceIntervalMillis,
+      haController
     );
   }
 
@@ -271,7 +290,8 @@ public class TimelineMetricAggregatorFactory {
    * Interval : 5 mins
    */
   public static TimelineMetricAggregator createTimelineClusterAggregatorMinute(
-    PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
+    PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
+    TimelineMetricHAController haController) {
 
     String checkpointDir = metricsConf.get(
       TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
@@ -291,7 +311,7 @@ public class TimelineMetricAggregatorFactory {
 
     if (useGroupByAggregator(metricsConf)) {
       return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricClusterAggregator(
-        "TimelineClusterAggregatorMinute",
+        METRIC_AGGREGATE_MINUTE,
         hBaseAccessor, metricsConf,
         checkpointLocation,
         sleepIntervalMillis,
@@ -299,12 +319,13 @@ public class TimelineMetricAggregatorFactory {
         aggregatorDisabledParam,
         inputTableName,
         outputTableName,
-        120000l
+        120000l,
+        haController
       );
     }
 
     return new TimelineMetricClusterAggregator(
-      "TimelineClusterAggregatorMinute",
+      METRIC_AGGREGATE_MINUTE,
       hBaseAccessor, metricsConf,
       checkpointLocation,
       sleepIntervalMillis,
@@ -312,7 +333,8 @@ public class TimelineMetricAggregatorFactory {
       aggregatorDisabledParam,
       inputTableName,
       outputTableName,
-      120000l
+      120000l,
+      haController
     );
   }
 
@@ -321,7 +343,8 @@ public class TimelineMetricAggregatorFactory {
    * Interval : 1 hour
    */
   public static TimelineMetricAggregator createTimelineClusterAggregatorHourly(
-    PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
+    PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
+    TimelineMetricHAController haController) {
 
     String checkpointDir = metricsConf.get(
       TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
@@ -341,7 +364,7 @@ public class TimelineMetricAggregatorFactory {
 
     if (useGroupByAggregator(metricsConf)) {
       return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricClusterAggregator(
-        "TimelineClusterAggregatorHourly",
+        METRIC_AGGREGATE_HOURLY,
         hBaseAccessor, metricsConf,
         checkpointLocation,
         sleepIntervalMillis,
@@ -349,12 +372,13 @@ public class TimelineMetricAggregatorFactory {
         aggregatorDisabledParam,
         inputTableName,
         outputTableName,
-        120000l
+        120000l,
+        haController
       );
     }
 
     return new TimelineMetricClusterAggregator(
-      "TimelineClusterAggregatorHourly",
+      METRIC_AGGREGATE_HOURLY,
       hBaseAccessor, metricsConf,
       checkpointLocation,
       sleepIntervalMillis,
@@ -362,7 +386,8 @@ public class TimelineMetricAggregatorFactory {
       aggregatorDisabledParam,
       inputTableName,
       outputTableName,
-      120000l
+      120000l,
+      haController
     );
   }
 
@@ -371,7 +396,8 @@ public class TimelineMetricAggregatorFactory {
    * Interval : 1 day
    */
   public static TimelineMetricAggregator createTimelineClusterAggregatorDaily(
-    PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
+    PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
+    TimelineMetricHAController haController) {
 
     String checkpointDir = metricsConf.get(
       TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
@@ -391,7 +417,7 @@ public class TimelineMetricAggregatorFactory {
 
     if (useGroupByAggregator(metricsConf)) {
       return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricClusterAggregator(
-        "TimelineClusterAggregatorDaily",
+        METRIC_AGGREGATE_DAILY,
         hBaseAccessor, metricsConf,
         checkpointLocation,
         sleepIntervalMillis,
@@ -399,12 +425,13 @@ public class TimelineMetricAggregatorFactory {
         aggregatorDisabledParam,
         inputTableName,
         outputTableName,
-        120000l
+        120000l,
+        haController
       );
     }
 
     return new TimelineMetricClusterAggregator(
-      "TimelineClusterAggregatorDaily",
+      METRIC_AGGREGATE_DAILY,
       hBaseAccessor, metricsConf,
       checkpointLocation,
       sleepIntervalMillis,
@@ -412,7 +439,8 @@ public class TimelineMetricAggregatorFactory {
       aggregatorDisabledParam,
       inputTableName,
       outputTableName,
-      120000l
+      120000l,
+      haController
     );
   }
 }

+ 6 - 3
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java

@@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
 
@@ -36,7 +38,7 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator
   private final TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(true);
   private final boolean isClusterPrecisionInputTable;
 
-  public TimelineMetricClusterAggregator(String aggregatorName,
+  public TimelineMetricClusterAggregator(AGGREGATOR_NAME aggregatorName,
                                          PhoenixHBaseAccessor hBaseAccessor,
                                          Configuration metricsConf,
                                          String checkpointLocation,
@@ -45,11 +47,12 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator
                                          String hostAggregatorDisabledParam,
                                          String inputTableName,
                                          String outputTableName,
-                                         Long nativeTimeRangeDelay) {
+                                         Long nativeTimeRangeDelay,
+                                         TimelineMetricHAController haController) {
     super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation,
       sleepIntervalMillis, checkpointCutOffMultiplier,
       hostAggregatorDisabledParam, inputTableName, outputTableName,
-      nativeTimeRangeDelay);
+      nativeTimeRangeDelay, haController);
     isClusterPrecisionInputTable = inputTableName.equals(METRICS_CLUSTER_AGGREGATE_TABLE_NAME);
   }
 

+ 6 - 3
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java

@@ -25,6 +25,8 @@ import org.apache.hadoop.metrics2.sink.timeline.PostProcessingUtil;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
@@ -62,7 +64,7 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
   private String skipAggrPatternStrings;
 
 
-  public TimelineMetricClusterAggregatorSecond(String aggregatorName,
+  public TimelineMetricClusterAggregatorSecond(AggregationTaskRunner.AGGREGATOR_NAME aggregatorName,
                                                TimelineMetricMetadataManager metadataManager,
                                                PhoenixHBaseAccessor hBaseAccessor,
                                                Configuration metricsConf,
@@ -73,10 +75,11 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
                                                String tableName,
                                                String outputTableName,
                                                Long nativeTimeRangeDelay,
-                                               Long timeSliceInterval) {
+                                               Long timeSliceInterval,
+                                               TimelineMetricHAController haController) {
     super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation,
       sleepIntervalMillis, checkpointCutOffMultiplier, aggregatorDisabledParam,
-      tableName, outputTableName, nativeTimeRangeDelay);
+      tableName, outputTableName, nativeTimeRangeDelay, haController);
 
     this.metadataManagerInstance = metadataManager;
     appAggregator = new TimelineMetricAppAggregator(metadataManager, metricsConf);

+ 8 - 3
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java

@@ -22,20 +22,24 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
+
 import java.io.IOException;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.HashMap;
 import java.util.Map;
+
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL;
 
 public class TimelineMetricHostAggregator extends AbstractTimelineAggregator {
   private static final Log LOG = LogFactory.getLog(TimelineMetricHostAggregator.class);
   TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
 
-  public TimelineMetricHostAggregator(String aggregatorName,
+  public TimelineMetricHostAggregator(AGGREGATOR_NAME aggregatorName,
                                       PhoenixHBaseAccessor hBaseAccessor,
                                       Configuration metricsConf,
                                       String checkpointLocation,
@@ -44,10 +48,11 @@ public class TimelineMetricHostAggregator extends AbstractTimelineAggregator {
                                       String hostAggregatorDisabledParam,
                                       String tableName,
                                       String outputTableName,
-                                      Long nativeTimeRangeDelay) {
+                                      Long nativeTimeRangeDelay,
+                                      TimelineMetricHAController haController) {
     super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation,
       sleepIntervalMillis, checkpointCutOffMultiplier, hostAggregatorDisabledParam,
-      tableName, outputTableName, nativeTimeRangeDelay);
+      tableName, outputTableName, nativeTimeRangeDelay, haController);
   }
 
   @Override

+ 8 - 3
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java

@@ -20,19 +20,23 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AbstractTimelineAggregator;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.EmptyCondition;
+
 import java.io.IOException;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Date;
+
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_AGGREGATED_APP_METRIC_GROUPBY_SQL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
 
 public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator {
   private final String aggregateColumnName;
 
-  public TimelineMetricClusterAggregator(String aggregatorName,
+  public TimelineMetricClusterAggregator(AGGREGATOR_NAME aggregatorName,
                                          PhoenixHBaseAccessor hBaseAccessor,
                                          Configuration metricsConf,
                                          String checkpointLocation,
@@ -41,11 +45,12 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator
                                          String hostAggregatorDisabledParam,
                                          String inputTableName,
                                          String outputTableName,
-                                         Long nativeTimeRangeDelay) {
+                                         Long nativeTimeRangeDelay,
+                                         TimelineMetricHAController haController) {
     super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation,
       sleepIntervalMillis, checkpointCutOffMultiplier,
       hostAggregatorDisabledParam, inputTableName, outputTableName,
-      nativeTimeRangeDelay);
+      nativeTimeRangeDelay, haController);
 
     if (inputTableName.equals(METRICS_CLUSTER_AGGREGATE_TABLE_NAME)) {
       aggregateColumnName = "HOSTS_COUNT";

+ 7 - 3
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java

@@ -20,8 +20,11 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AbstractTimelineAggregator;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.EmptyCondition;
+
 import java.io.IOException;
 import java.sql.ResultSet;
 import java.sql.SQLException;
@@ -31,7 +34,7 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
 
 public class TimelineMetricHostAggregator extends AbstractTimelineAggregator {
 
-  public TimelineMetricHostAggregator(String aggregatorName,
+  public TimelineMetricHostAggregator(AGGREGATOR_NAME aggregatorName,
                                       PhoenixHBaseAccessor hBaseAccessor,
                                       Configuration metricsConf,
                                       String checkpointLocation,
@@ -40,10 +43,11 @@ public class TimelineMetricHostAggregator extends AbstractTimelineAggregator {
                                       String hostAggregatorDisabledParam,
                                       String tableName,
                                       String outputTableName,
-                                      Long nativeTimeRangeDelay) {
+                                      Long nativeTimeRangeDelay,
+                                      TimelineMetricHAController haController) {
     super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation,
       sleepIntervalMillis, checkpointCutOffMultiplier, hostAggregatorDisabledParam,
-      tableName, outputTableName, nativeTimeRangeDelay);
+      tableName, outputTableName, nativeTimeRangeDelay, haController);
   }
 
   @Override

+ 144 - 0
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/AggregationTaskRunner.java

@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator.AGGREGATOR_TYPE;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.participant.StateMachineEngine;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator.AGGREGATOR_TYPE.CLUSTER;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator.AGGREGATOR_TYPE.HOST;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_DAILY;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_HOURLY;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_MINUTE;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_SECOND;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_DAILY;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_HOURLY;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_MINUTE;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController.CLUSTER_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController.METRIC_AGGREGATORS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController.STATE_MODEL_NAME;
+
+public class AggregationTaskRunner {
+  private final String instanceName;
+  private final String zkAddress;
+  private HelixManager manager;
+  private static final Log LOG = LogFactory.getLog(AggregationTaskRunner.class);
+  private CheckpointManager checkpointManager;
+  // Map partition name to an aggregator dimension
+  static final Map<String, AGGREGATOR_TYPE> PARTITION_AGGREGATION_TYPES = new HashMap<>();
+  // Ownership flags to be set by the State transitions
+  private AtomicBoolean performsClusterAggregation = new AtomicBoolean(false);
+  private AtomicBoolean performsHostAggregation = new AtomicBoolean(false);
+
+  public enum AGGREGATOR_NAME {
+    METRIC_RECORD_MINUTE,
+    METRIC_RECORD_HOURLY,
+    METRIC_RECORD_DAILY,
+    METRIC_AGGREGATE_SECOND,
+    METRIC_AGGREGATE_MINUTE,
+    METRIC_AGGREGATE_HOURLY,
+    METRIC_AGGREGATE_DAILY,
+  }
+
+  public static final Map<AGGREGATOR_NAME, String> ACTUAL_AGGREGATOR_NAMES = new HashMap<>();
+
+  static {
+    ACTUAL_AGGREGATOR_NAMES.put(METRIC_RECORD_MINUTE, "TimelineMetricHostAggregatorMinute");
+    ACTUAL_AGGREGATOR_NAMES.put(METRIC_RECORD_HOURLY, "TimelineMetricHostAggregatorHourly");
+    ACTUAL_AGGREGATOR_NAMES.put(METRIC_RECORD_DAILY, "TimelineMetricHostAggregatorDaily");
+    ACTUAL_AGGREGATOR_NAMES.put(METRIC_AGGREGATE_SECOND, "TimelineClusterAggregatorSecond");
+    ACTUAL_AGGREGATOR_NAMES.put(METRIC_AGGREGATE_MINUTE, "TimelineClusterAggregatorMinute");
+    ACTUAL_AGGREGATOR_NAMES.put(METRIC_AGGREGATE_HOURLY, "TimelineClusterAggregatorHourly");
+    ACTUAL_AGGREGATOR_NAMES.put(METRIC_AGGREGATE_DAILY, "TimelineClusterAggregatorDaily");
+
+    // Partition name to task assignment
+    PARTITION_AGGREGATION_TYPES.put(METRIC_AGGREGATORS + "_0", CLUSTER);
+    PARTITION_AGGREGATION_TYPES.put(METRIC_AGGREGATORS + "_1", HOST);
+  }
+
+  public AggregationTaskRunner(String instanceName, String zkAddress) {
+    this.instanceName = instanceName;
+    this.zkAddress = zkAddress;
+  }
+
+  public void initialize() throws Exception {
+    manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, instanceName,
+      InstanceType.PARTICIPANT, zkAddress);
+
+    OnlineOfflineStateModelFactory stateModelFactory =
+      new OnlineOfflineStateModelFactory(instanceName, this);
+
+    StateMachineEngine stateMach = manager.getStateMachineEngine();
+    stateMach.registerStateModelFactory(STATE_MODEL_NAME, stateModelFactory);
+    manager.connect();
+
+    checkpointManager = new CheckpointManager(manager.getHelixPropertyStore());
+  }
+
+  public boolean performsClusterAggregation() {
+    return performsClusterAggregation.get();
+  }
+
+  public boolean performsHostAggregation() {
+    return performsHostAggregation.get();
+  }
+
+  public CheckpointManager getCheckpointManager() {
+    return checkpointManager;
+  }
+
+  public void setPartitionAggregationFunction(AGGREGATOR_TYPE type) {
+    switch (type) {
+      case HOST:
+        performsHostAggregation.set(true);
+        LOG.info("Set host aggregator function for : " + instanceName);
+        break;
+      case CLUSTER:
+        performsClusterAggregation.set(true);
+        LOG.info("Set cluster aggregator function for : " + instanceName);
+    }
+  }
+
+  public void unsetPartitionAggregationFunction(AGGREGATOR_TYPE type) {
+    switch (type) {
+      case HOST:
+        performsHostAggregation.set(false);
+        LOG.info("Unset host aggregator function for : " + instanceName);
+        break;
+      case CLUSTER:
+        performsClusterAggregation.set(false);
+        LOG.info("Unset cluster aggregator function for : " + instanceName);
+    }
+  }
+
+  /**
+   * Disconnect participant before controller shutdown
+   */
+  void stop() {
+    manager.disconnect();
+  }
+}

+ 98 - 0
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/CheckpointManager.java

@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability;
+
+import org.I0Itec.zkclient.DataUpdater;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
+import org.apache.helix.AccessOption;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.zookeeper.data.Stat;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES;
+
+public class CheckpointManager {
+  private final ZkHelixPropertyStore<ZNRecord> propertyStore;
+  private static final Log LOG = LogFactory.getLog(CheckpointManager.class);
+
+  static final String ZNODE_FIELD = "checkpoint";
+  static final String CHECKPOINT_PATH_PREFIX = "CHECKPOINTS";
+
+  public CheckpointManager(ZkHelixPropertyStore<ZNRecord> propertyStore) {
+    this.propertyStore = propertyStore;
+  }
+
+  /**
+   * Read aggregator checkpoint from zookeeper
+   *
+   * @return timestamp
+   */
+  public long readCheckpoint(AGGREGATOR_NAME aggregatorName) {
+    String path = getCheckpointZKPath(aggregatorName);
+    LOG.debug("Reading checkpoint at " + path);
+    Stat stat = new Stat();
+    ZNRecord znRecord = propertyStore.get(path, stat, AccessOption.PERSISTENT);
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Stat => " + stat);
+    }
+    long checkpoint = znRecord != null ? znRecord.getLongField(ZNODE_FIELD, -1) : -1;
+    LOG.debug("Checkpoint value = " + checkpoint);
+    return checkpoint;
+  }
+
+  /**
+   * Write aggregator checkpoint in zookeeper
+   *
+   * @param value timestamp
+   * @return sucsess
+   */
+  public boolean writeCheckpoint(AGGREGATOR_NAME aggregatorName, long value) {
+    String path = getCheckpointZKPath(aggregatorName);
+    LOG.debug(String.format("Saving checkpoint at %s with value %s", path, value));
+    return propertyStore.update(path, new CheckpointDataUpdater(path, value), AccessOption.PERSISTENT);
+  }
+
+  static class CheckpointDataUpdater implements DataUpdater<ZNRecord> {
+    final String path;
+    final Long value;
+
+    public CheckpointDataUpdater(String path, Long value) {
+      this.path = path;
+      this.value = value;
+    }
+
+    @Override
+    public ZNRecord update(ZNRecord currentData) {
+      if (currentData == null) {
+        currentData = new ZNRecord(path);
+      }
+      currentData.setLongField(ZNODE_FIELD, value);
+      return currentData;
+    }
+  }
+
+  String getCheckpointZKPath(AGGREGATOR_NAME aggregatorName) {
+    StringBuilder sb = new StringBuilder("/");
+    sb.append(CHECKPOINT_PATH_PREFIX);
+    sb.append("/");
+    sb.append(ACTUAL_AGGREGATOR_NAMES.get(aggregatorName));
+    return sb.toString();
+  }
+}

+ 69 - 0
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/OnlineOfflineStateModelFactory.java

@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator.AGGREGATOR_TYPE;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.PARTITION_AGGREGATION_TYPES;
+
+public class OnlineOfflineStateModelFactory extends StateModelFactory<StateModel> {
+  private static final Log LOG = LogFactory.getLog(OnlineOfflineStateModelFactory.class);
+  private final String instanceName;
+  private final AggregationTaskRunner taskRunner;
+
+  public OnlineOfflineStateModelFactory(String instanceName, AggregationTaskRunner taskRunner) {
+    this.instanceName = instanceName;
+    this.taskRunner = taskRunner;
+  }
+
+  @Override
+  public StateModel createNewStateModel(String resourceName, String partition) {
+    LOG.info("Received request to process partition = " + partition + ", for " +
+      "resource = " + resourceName + ", at " + instanceName);
+    return new OnlineOfflineStateModel();
+  }
+
+  public class OnlineOfflineStateModel extends StateModel {
+    public void onBecomeOnlineFromOffline(Message message, NotificationContext context) {
+      String partitionName = message.getPartitionName();
+      LOG.info("Received transition to Online from Offline for partition: " + partitionName);
+      AGGREGATOR_TYPE type = PARTITION_AGGREGATION_TYPES.get(partitionName);
+      taskRunner.setPartitionAggregationFunction(type);
+    }
+
+    public void onBecomeOfflineFromOnline(Message message, NotificationContext context) {
+      String partitionName = message.getPartitionName();
+      LOG.info("Received transition to Offline from Online for partition: " + partitionName);
+      AGGREGATOR_TYPE type = PARTITION_AGGREGATION_TYPES.get(partitionName);
+      taskRunner.unsetPartitionAggregationFunction(type);
+    }
+
+    public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
+      String partitionName = message.getPartitionName();
+      LOG.info("Received transition to Dropped from Offline for partition: " + partitionName);
+      AGGREGATOR_TYPE type = PARTITION_AGGREGATION_TYPES.get(partitionName);
+      taskRunner.unsetPartitionAggregationFunction(type);
+    }
+  }
+}

+ 276 - 0
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/TimelineMetricHAController.java

@@ -0,0 +1,276 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability;
+
+import com.google.common.base.Joiner;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricsSystemInitializationException;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.controller.GenericHelixController;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.OnlineOfflineSMD;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.helix.model.IdealState.RebalanceMode.FULL_AUTO;
+
+public class TimelineMetricHAController {
+  private static final Log LOG = LogFactory.getLog(TimelineMetricHAController.class);
+
+  static final String CLUSTER_NAME = "ambari-metrics-cluster";
+  static final String METRIC_AGGREGATORS = "METRIC_AGGREGATORS";
+  static final String STATE_MODEL_NAME = OnlineOfflineSMD.name;
+  static final String INSTANCE_NAME_DELIMITER = "_";
+
+  final String zkConnectUrl;
+  final String instanceHostname;
+  final InstanceConfig instanceConfig;
+  final AggregationTaskRunner aggregationTaskRunner;
+
+  // Cache list of known live instances
+  final List<String> liveInstanceNames = new ArrayList<>();
+
+  // Helix Admin
+  HelixAdmin admin;
+  // Helix Manager
+  HelixManager manager;
+
+  private volatile boolean isInitialized = false;
+
+  public TimelineMetricHAController(TimelineMetricConfiguration configuration) {
+    String instancePort;
+    try {
+      instanceHostname = configuration.getInstanceHostnameFromEnv();
+      instancePort = configuration.getInstancePort();
+
+    } catch (Exception e) {
+      LOG.error("Error reading configs from classpath, will resort to defaults.", e);
+      throw new MetricsSystemInitializationException(e.getMessage());
+    }
+
+    try {
+      String zkClientPort = configuration.getZKClientPort();
+      String zkQuorum = configuration.getZKQuorum();
+
+      if (StringUtils.isEmpty(zkClientPort) || StringUtils.isEmpty(zkQuorum)) {
+        throw new Exception("Unable to parse zookeeper quorum. clientPort = "
+          + zkClientPort +", quorum = " + zkQuorum);
+      }
+
+      zkConnectUrl = getZkConnectionUrl(zkClientPort, zkQuorum);
+
+    } catch (Exception e) {
+      LOG.error("Unable to load hbase-site from classpath.", e);
+      throw new MetricsSystemInitializationException(e.getMessage());
+    }
+
+    instanceConfig = new InstanceConfig(instanceHostname + INSTANCE_NAME_DELIMITER + instancePort);
+    instanceConfig.setHostName(instanceHostname);
+    instanceConfig.setPort(instancePort);
+    instanceConfig.setInstanceEnabled(true);
+    aggregationTaskRunner = new AggregationTaskRunner(instanceConfig.getInstanceName(), zkConnectUrl);
+  }
+
+  /**
+   * Initialize the instance with zookeeper via Helix
+   */
+  public void initializeHAController() throws Exception {
+    admin = new ZKHelixAdmin(zkConnectUrl);
+    // create cluster
+    LOG.info("Creating zookeeper cluster node: " + CLUSTER_NAME);
+    admin.addCluster(CLUSTER_NAME, false);
+
+    // Adding host to the cluster
+    List<String> nodes = admin.getInstancesInCluster(CLUSTER_NAME);
+    if (nodes == null || !nodes.contains(instanceConfig.getInstanceName())) {
+      LOG.info("Adding participant instance " + instanceConfig);
+      admin.addInstance(CLUSTER_NAME, instanceConfig);
+    }
+
+    // Add a state model
+    if (admin.getStateModelDef(CLUSTER_NAME, STATE_MODEL_NAME) == null) {
+      LOG.info("Adding ONLINE-OFFLINE state model to the cluster");
+      admin.addStateModelDef(CLUSTER_NAME, STATE_MODEL_NAME, OnlineOfflineSMD.build());
+    }
+
+    // Add resources with 1 cluster-wide replica
+    // Since our aggregators are unbalanced in terms of work distribution we
+    // only need to distribute writes to METRIC_AGGREGATE and
+    // METRIC_RECORD_MINUTE
+    List<String> resources = admin.getResourcesInCluster(CLUSTER_NAME);
+    if (!resources.contains(METRIC_AGGREGATORS)) {
+      LOG.info("Adding resource " + METRIC_AGGREGATORS + " with 2 partitions and 1 replicas");
+      admin.addResource(CLUSTER_NAME, METRIC_AGGREGATORS, 2, OnlineOfflineSMD.name, FULL_AUTO.toString());
+    }
+    // this will set up the ideal state, it calculates the preference list for
+    // each partition similar to consistent hashing
+    admin.rebalance(CLUSTER_NAME, METRIC_AGGREGATORS, 1);
+
+    // Start participant
+    startAggregators();
+
+    // Start controller
+    startController();
+
+    Runtime.getRuntime().addShutdownHook(new Thread() {
+      @Override
+      public void run() {
+        aggregationTaskRunner.stop();
+        manager.disconnect();
+      }
+    });
+
+    isInitialized = true;
+  }
+
+  /**
+   * Return true if HA controller is enabled.
+   */
+  public boolean isInitialized() {
+    return isInitialized;
+  }
+
+  private void startAggregators() {
+    try {
+      aggregationTaskRunner.initialize();
+
+    } catch (Exception e) {
+      LOG.error("Unable to start aggregators.", e);
+      throw new MetricsSystemInitializationException(e.getMessage());
+    }
+  }
+
+  private void startController() throws Exception {
+    manager = HelixManagerFactory.getZKHelixManager(
+      CLUSTER_NAME,
+      instanceHostname,
+      InstanceType.CONTROLLER,
+      zkConnectUrl
+    );
+
+    manager.connect();
+    HelixController controller = new HelixController();
+    manager.addLiveInstanceChangeListener(controller);
+  }
+
+  private String getZkConnectionUrl(String zkClientPort, String zkQuorum) {
+    StringBuilder sb = new StringBuilder();
+    String[] quorumParts = zkQuorum.split(",");
+    String prefix = "";
+    for (String part : quorumParts) {
+      sb.append(prefix);
+      sb.append(part.trim());
+      if (!part.contains(":")) {
+        sb.append(":");
+        sb.append(zkClientPort);
+      }
+      prefix = ",";
+    }
+
+    return sb.toString();
+  }
+
+  public AggregationTaskRunner getAggregationTaskRunner() {
+    return aggregationTaskRunner;
+  }
+
+  public List<String> getLiveInstanceHostNames() {
+    List<String> liveInstanceHostNames = new ArrayList<>();
+
+    for (String instance : liveInstanceNames) {
+      liveInstanceHostNames.add(instance.split(INSTANCE_NAME_DELIMITER)[0]);
+    }
+
+    return liveInstanceHostNames;
+  }
+
+  public class HelixController extends GenericHelixController {
+    ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
+    Joiner joiner = Joiner.on(", ").skipNulls();
+
+    @Override
+    public void onLiveInstanceChange(List<LiveInstance> liveInstances, NotificationContext changeContext) {
+      super.onLiveInstanceChange(liveInstances, changeContext);
+
+      liveInstanceNames.clear();
+      for (LiveInstance instance : liveInstances) {
+        liveInstanceNames.add(instance.getInstanceName());
+      }
+
+      LOG.info("Detected change in liveliness of Collector instances. " +
+        "LiveIsntances = " + joiner.join(liveInstanceNames));
+      // Print HA state - after some delay
+      executorService.schedule(new Runnable() {
+        @Override
+        public void run() {
+          printClusterState();
+        }
+      }, 30, TimeUnit.SECONDS);
+
+
+    }
+  }
+
+  public void printClusterState() {
+    StringBuilder sb = new StringBuilder("\n######################### Cluster HA state ########################");
+
+    ExternalView resourceExternalView = admin.getResourceExternalView(CLUSTER_NAME, METRIC_AGGREGATORS);
+    if (resourceExternalView != null) {
+      getPrintableResourceState(resourceExternalView, METRIC_AGGREGATORS, sb);
+    }
+    sb.append("\n##################################################");
+    LOG.info(sb.toString());
+  }
+
+  private void getPrintableResourceState(ExternalView resourceExternalView,
+                                         String resourceName,
+                                         StringBuilder sb) {
+    TreeSet<String> sortedSet = new TreeSet<>(resourceExternalView.getPartitionSet());
+    sb.append("\nCLUSTER: ");
+    sb.append(CLUSTER_NAME);
+    sb.append("\nRESOURCE: ");
+    sb.append(resourceName);
+    for (String partitionName : sortedSet) {
+      sb.append("\nPARTITION: ");
+      sb.append(partitionName).append("\t");
+      Map<String, String> states = resourceExternalView.getStateMap(partitionName);
+      for (Map.Entry<String, String> stateEntry : states.entrySet()) {
+        sb.append("\t");
+        sb.append(stateEntry.getKey());
+        sb.append("\t");
+        sb.append(stateEntry.getValue());
+      }
+    }
+  }
+}

+ 1 - 1
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java

@@ -32,7 +32,7 @@ import java.sql.SQLException;
 public class DefaultPhoenixDataSource implements PhoenixConnectionProvider {
 
   static final Log LOG = LogFactory.getLog(DefaultPhoenixDataSource.class);
-  private static final String ZOOKEEPER_CLIENT_PORT ="hbase.zookeeper.property.clientPort";
+  private static final String ZOOKEEPER_CLIENT_PORT = "hbase.zookeeper.property.clientPort";
   private static final String ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum";
   private static final String ZNODE_PARENT = "zookeeper.znode.parent";
 

+ 12 - 0
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java

@@ -411,6 +411,18 @@ public class TimelineWebServices {
     }
   }
 
+  @GET
+  @Path("/metrics/livenodes")
+  @Produces({ MediaType.APPLICATION_JSON })
+  public List<String> getLiveCollectorNodes(
+    @Context HttpServletRequest req,
+    @Context HttpServletResponse res
+  ) {
+    init(res);
+
+    return timelineMetricStore.getLiveInstances();
+  }
+
   /**
    * Store the given entities into the timeline store, and return the errors
    * that happen during storing.

+ 6 - 6
ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java

@@ -109,7 +109,7 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
   public void testGetMetricRecordsMinutes() throws IOException, SQLException {
     // GIVEN
     TimelineMetricAggregator aggregatorMinute =
-      TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(hdb, new Configuration());
+      TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(hdb, new Configuration(), null);
 
     long startTime = System.currentTimeMillis();
     long ctime = startTime;
@@ -146,7 +146,7 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
   public void testGetMetricRecordsHours() throws IOException, SQLException {
     // GIVEN
     TimelineMetricAggregator aggregator =
-      TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(hdb, new Configuration());
+      TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(hdb, new Configuration(), null);
 
     MetricHostAggregate expectedAggregate =
         createMetricHostAggregate(2.0, 0.0, 20, 15.0);
@@ -198,7 +198,7 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
     // GIVEN
     TimelineMetricAggregator agg =
       TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(
-        hdb, new Configuration(), new TimelineMetricMetadataManager(hdb, new Configuration()));
+        hdb, new Configuration(), new TimelineMetricMetadataManager(hdb, new Configuration()), null);
 
     long startTime = System.currentTimeMillis();
     long ctime = startTime + 1;
@@ -237,8 +237,8 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
   public void testGetClusterMetricRecordLatestWithFunction() throws Exception {
     // GIVEN
     TimelineMetricAggregator agg =
-      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond
-        (hdb, new Configuration(), new TimelineMetricMetadataManager(hdb, new Configuration()));
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb,
+        new Configuration(), new TimelineMetricMetadataManager(hdb, new Configuration()), null);
 
     long startTime = System.currentTimeMillis();
     long ctime = startTime + 1;
@@ -279,7 +279,7 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
   public void testGetClusterMetricRecordsHours() throws Exception {
     // GIVEN
     TimelineMetricAggregator agg =
-      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hdb, new Configuration());
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hdb, new Configuration(), null);
 
     long startTime = System.currentTimeMillis();
     long ctime = startTime;

+ 5 - 0
ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java

@@ -95,4 +95,9 @@ public class TestTimelineMetricStore implements TimelineMetricStore {
   public Map<String, Set<String>> getHostAppsMetadata() throws SQLException, IOException {
     return Collections.emptyMap();
   }
+
+  @Override
+  public List<String> getLiveInstances() {
+    return Collections.emptyList();
+  }
 }

+ 3 - 3
ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java

@@ -18,7 +18,7 @@
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.HBaseTimelineMetricStore;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
 import org.junit.Before;
 import org.junit.Test;
@@ -44,7 +44,7 @@ public class AbstractTimelineAggregatorTest {
 
   @Before
   public void setUp() throws Exception {
-    sleepIntervalMillis = 5*60*1000l; //5 minutes
+    sleepIntervalMillis = 5 * 60 * 1000l; //5 minutes
     checkpointCutOffMultiplier = 2;
 
     Configuration metricsConf = new Configuration();
@@ -56,7 +56,7 @@ public class AbstractTimelineAggregatorTest {
     checkPoint = new AtomicLong(-1);
     actualRuns = 0;
 
-    agg = new AbstractTimelineAggregator("TimelineAggregatorTest", null, metricsConf) {
+    agg = new AbstractTimelineAggregator(AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_SECOND, null, metricsConf) {
       @Override
       public boolean doWork(long startTime, long endTime) {
         startTimeInDoWork.set(startTime);

+ 10 - 10
ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java

@@ -77,7 +77,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
     // GIVEN
     TimelineMetricAggregator agg =
       TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb,
-        getConfigurationForTest(false), new TimelineMetricMetadataManager(hdb, new Configuration()));
+        getConfigurationForTest(false), new TimelineMetricMetadataManager(hdb, new Configuration()), null);
     TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
 
     long startTime = System.currentTimeMillis();
@@ -130,7 +130,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
     // GIVEN
     TimelineMetricAggregator agg =
       TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb,
-        getConfigurationForTest(false), new TimelineMetricMetadataManager(hdb, new Configuration()));
+        getConfigurationForTest(false), new TimelineMetricMetadataManager(hdb, new Configuration()), null);
     TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
 
     long startTime = System.currentTimeMillis();
@@ -206,7 +206,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
     // GIVEN
     TimelineMetricAggregator agg =
       TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb,
-        getConfigurationForTest(false), new TimelineMetricMetadataManager(hdb, new Configuration()));
+        getConfigurationForTest(false), new TimelineMetricMetadataManager(hdb, new Configuration()), null);
     TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
 
     // here we put some metrics tha will be aggregated
@@ -270,7 +270,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
   public void testAggregateDailyClusterMetrics() throws Exception {
     // GIVEN
     TimelineMetricAggregator agg =
-      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorDaily(hdb, getConfigurationForTest(false));
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorDaily(hdb, getConfigurationForTest(false), null);
 
     // this time can be virtualized! or made independent from real clock
     long startTime = System.currentTimeMillis();
@@ -315,7 +315,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
   public void testShouldAggregateClusterOnMinuteProperly() throws Exception {
 
     TimelineMetricAggregator agg =
-      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, getConfigurationForTest(false));
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, getConfigurationForTest(false), null);
 
     long startTime = System.currentTimeMillis();
     long ctime = startTime;
@@ -382,7 +382,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
   public void testShouldAggregateClusterOnHourProperly() throws Exception {
     // GIVEN
     TimelineMetricAggregator agg =
-      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hdb, getConfigurationForTest(false));
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hdb, getConfigurationForTest(false), null);
 
     // this time can be virtualized! or made independent from real clock
     long startTime = System.currentTimeMillis();
@@ -426,7 +426,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
   public void testShouldAggregateDifferentMetricsOnHourProperly() throws Exception {
     // GIVEN
     TimelineMetricAggregator agg =
-      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hdb, getConfigurationForTest(false));
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hdb, getConfigurationForTest(false), null);
 
     long startTime = System.currentTimeMillis();
     long ctime = startTime;
@@ -490,7 +490,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
     conf.set(CLUSTER_AGGREGATOR_APP_IDS, "app1");
     TimelineMetricAggregator agg =
       TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb,
-        conf, new TimelineMetricMetadataManager(hdb, new Configuration()));
+        conf, new TimelineMetricMetadataManager(hdb, new Configuration()), null);
     TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
 
     long startTime = System.currentTimeMillis();
@@ -542,7 +542,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
   public void testClusterAggregateMetricNormalization() throws Exception {
     TimelineMetricAggregator agg =
       TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb,
-        getConfigurationForTest(false), new TimelineMetricMetadataManager(hdb, new Configuration()));
+        getConfigurationForTest(false), new TimelineMetricMetadataManager(hdb, new Configuration()), null);
     TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
 
     // Sample data
@@ -619,7 +619,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
   public void testAggregationUsingGroupByQuery() throws Exception {
     // GIVEN
     TimelineMetricAggregator agg =
-      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hdb, getConfigurationForTest(true));
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hdb, getConfigurationForTest(true), null);
 
     long startTime = System.currentTimeMillis();
     long ctime = startTime;

+ 4 - 15
ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITMetricAggregator.java

@@ -22,24 +22,13 @@ import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.AbstractMiniHBaseClusterTest;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricReadHelper;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.junit.After;
-import org.junit.Before;
 import org.junit.Test;
 
-import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
-import java.sql.Statement;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
@@ -95,7 +84,7 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
     // GIVEN
     TimelineMetricAggregator aggregatorMinute =
       TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(hdb,
-        getConfigurationForTest(false));
+        getConfigurationForTest(false), null);
     TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
 
     long startTime = System.currentTimeMillis();
@@ -156,7 +145,7 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
     // GIVEN
     TimelineMetricAggregator aggregator =
       TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(hdb,
-        getConfigurationForTest(false));
+        getConfigurationForTest(false), null);
     TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
     long startTime = System.currentTimeMillis();
 
@@ -219,7 +208,7 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
     // GIVEN
     TimelineMetricAggregator aggregator =
       TimelineMetricAggregatorFactory.createTimelineMetricAggregatorDaily(hdb,
-        getConfigurationForTest(false));
+        getConfigurationForTest(false), null);
     TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
     long startTime = System.currentTimeMillis();
 
@@ -281,7 +270,7 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
     // GIVEN
     TimelineMetricAggregator aggregatorMinute =
       TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(hdb,
-        getConfigurationForTest(true));
+        getConfigurationForTest(true), null);
     TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
 
     long startTime = System.currentTimeMillis();

+ 7 - 7
ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java

@@ -24,12 +24,12 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
 import org.easymock.EasyMock;
 import org.junit.Test;
-
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_SECOND;
 
 public class TimelineMetricClusterAggregatorSecondTest {
 
@@ -44,9 +44,9 @@ public class TimelineMetricClusterAggregatorSecondTest {
     TimelineMetricMetadataManager metricMetadataManagerMock = EasyMock.createNiceMock(TimelineMetricMetadataManager.class);
 
     TimelineMetricClusterAggregatorSecond secondAggregator = new TimelineMetricClusterAggregatorSecond(
-      "TimelineClusterAggregatorSecond", metricMetadataManagerMock, null, configuration, null,
-      aggregatorInterval, 2, "false", "", "", aggregatorInterval, sliceInterval
-    );
+      METRIC_AGGREGATE_SECOND, metricMetadataManagerMock, null,
+      configuration, null, aggregatorInterval, 2, "false", "", "",
+      aggregatorInterval, sliceInterval, null);
 
     secondAggregator.timeSliceIntervalMillis = sliceInterval;
     long roundedEndTime = AbstractTimelineAggregator.getRoundedAggregateTimeMillis(aggregatorInterval);
@@ -120,9 +120,9 @@ public class TimelineMetricClusterAggregatorSecondTest {
     EasyMock.replay(metricMetadataManagerMock);
 
     TimelineMetricClusterAggregatorSecond secondAggregator = new TimelineMetricClusterAggregatorSecond(
-      "TimelineClusterAggregatorSecond", metricMetadataManagerMock, null, configuration, null,
-      aggregatorInterval, 2, "false", "", "", aggregatorInterval, sliceInterval
-    );
+      METRIC_AGGREGATE_SECOND, metricMetadataManagerMock, null, configuration, null,
+      aggregatorInterval, 2, "false", "", "", aggregatorInterval,
+      sliceInterval, null);
 
     long startTime = AbstractTimelineAggregator.getRoundedCheckPointTimeMillis(System.currentTimeMillis(),aggregatorInterval);
     List<Long[]> timeslices = secondAggregator.getTimeSlices(startTime, startTime + aggregatorInterval);

+ 107 - 0
ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/TimelineMetricHAControllerTest.java

@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability;
+
+import junit.framework.Assert;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.AbstractMiniHBaseClusterTest;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController.CLUSTER_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController.METRIC_AGGREGATORS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController.STATE_MODEL_NAME;
+import static org.easymock.EasyMock.createNiceMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+
+public class TimelineMetricHAControllerTest extends AbstractMiniHBaseClusterTest {
+  TimelineMetricConfiguration configuration;
+
+  @Before
+  public void setup() throws Exception {
+    configuration = createNiceMock(TimelineMetricConfiguration.class);
+
+    expect(configuration.getInstanceHostnameFromEnv()).andReturn("h1");
+    expect(configuration.getInstancePort()).andReturn("12000");
+    // jdbc:phoenix:localhost:52887:/hbase;test=true
+    String zkUrl = getUrl();
+    String port = zkUrl.split(":")[3];
+    String quorum = zkUrl.split(":")[2];
+
+    expect(configuration.getZKClientPort()).andReturn(port);
+    expect(configuration.getZKQuorum()).andReturn(quorum);
+
+    replay(configuration);
+  }
+
+  @Test(timeout = 150000)
+  public void testHAControllerDistributedAggregation() throws Exception {
+    TimelineMetricHAController haController = new TimelineMetricHAController(configuration);
+    haController.initializeHAController();
+    // Wait for task assignment
+    Thread.sleep(10000);
+
+    Assert.assertTrue(haController.isInitialized());
+    Assert.assertEquals(1, haController.getLiveInstanceHostNames().size());
+    Assert.assertTrue(haController.getAggregationTaskRunner().performsClusterAggregation());
+    Assert.assertTrue(haController.getAggregationTaskRunner().performsHostAggregation());
+
+    // Add new instance
+    InstanceConfig instanceConfig2 = new InstanceConfig("h2_12001");
+    haController.admin.addInstance(CLUSTER_NAME, instanceConfig2);
+    HelixManager manager2 = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME,
+      instanceConfig2.getInstanceName(),
+      InstanceType.PARTICIPANT, haController.zkConnectUrl);
+    manager2.getStateMachineEngine().registerStateModelFactory(STATE_MODEL_NAME,
+      new OnlineOfflineStateModelFactory(instanceConfig2.getInstanceName(),
+        new AggregationTaskRunner(instanceConfig2.getInstanceName(), "")));
+    manager2.connect();
+    haController.admin.rebalance(CLUSTER_NAME, METRIC_AGGREGATORS, 1);
+
+    // Wait on re-assignment of partitions
+    Thread.sleep(10000);
+    Assert.assertEquals(2, haController.getLiveInstanceHostNames().size());
+
+    ExternalView view = haController.admin.getResourceExternalView(CLUSTER_NAME, METRIC_AGGREGATORS);
+
+    Map<String, String> partitionInstanceMap = new HashMap<>();
+
+    for (String partition : view.getPartitionSet()) {
+      Map<String, String> states = view.getStateMap(partition);
+      // (instance, state) pairs
+      for (Map.Entry<String, String> stateEntry : states.entrySet()) {
+        partitionInstanceMap.put(partition, stateEntry.getKey());
+        Assert.assertEquals("ONLINE", stateEntry.getValue());
+      }
+    }
+    // Re-assigned partitions
+    Assert.assertEquals(2, partitionInstanceMap.size());
+
+    haController.getAggregationTaskRunner().stop();
+    haController.manager.disconnect();
+  }
+}

+ 1 - 1
ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog240.java

@@ -363,7 +363,7 @@ public class UpgradeCatalog240 extends AbstractUpgradeCatalog {
     dbAccessor.addColumn(CLUSTER_TABLE, new DBColumnInfo(CLUSTER_UPGRADE_ID_COLUMN, Long.class, null, null, true));
 
     dbAccessor.addFKConstraint(CLUSTER_TABLE, "FK_clusters_upgrade_id",
-        CLUSTER_UPGRADE_ID_COLUMN, UPGRADE_TABLE, "upgrade_id", false);
+      CLUSTER_UPGRADE_ID_COLUMN, UPGRADE_TABLE, "upgrade_id", false);
   }
 
   @Override

+ 3 - 0
ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-env.xml

@@ -106,6 +106,9 @@
     <value>
 # Set environment variables here.
 
+# AMS instance name
+export AMS_INSTANCE_NAME={{hostname}}
+
 # The java implementation to use. Java 1.6 required.
 export JAVA_HOME={{java64_home}}
 

+ 1 - 1
ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/metainfo.xml

@@ -29,7 +29,7 @@
           <name>METRICS_COLLECTOR</name>
           <displayName>Metrics Collector</displayName>
           <category>MASTER</category>
-          <cardinality>1</cardinality>
+          <cardinality>1+</cardinality>
           <versionAdvertised>false</versionAdvertised>
           <reassignAllowed>true</reassignAllowed>
           <timelineAppid>AMS-HBASE</timelineAppid>

+ 47 - 49
ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog240Test.java

@@ -19,46 +19,15 @@
 package org.apache.ambari.server.upgrade;
 
 
-import javax.persistence.EntityManager;
+import com.google.common.collect.Maps;
+import com.google.gson.Gson;
+import com.google.inject.AbstractModule;
+import com.google.inject.Binder;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import com.google.inject.Provider;
 import junit.framework.Assert;
-import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.anyString;
-import static org.easymock.EasyMock.capture;
-import static org.easymock.EasyMock.createMock;
-import static org.easymock.EasyMock.createMockBuilder;
-import static org.easymock.EasyMock.createNiceMock;
-import static org.easymock.EasyMock.createStrictMock;
-import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.easymock.EasyMock.newCapture;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.reset;
-import static org.easymock.EasyMock.verify;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.lang.reflect.Field;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-
-import javax.persistence.EntityManager;
-
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.actionmanager.ActionManager;
 import org.apache.ambari.server.api.services.AmbariMetaInfo;
@@ -122,19 +91,48 @@ import org.easymock.Capture;
 import org.easymock.CaptureType;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockSupport;
-import org.junit.*;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.springframework.security.crypto.password.PasswordEncoder;
 
-import com.google.common.collect.Maps;
-import com.google.gson.Gson;
-import com.google.inject.AbstractModule;
-import com.google.inject.Binder;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-import com.google.inject.Module;
-import com.google.inject.Provider;
+import javax.persistence.EntityManager;
+import java.io.File;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 
-import org.springframework.security.crypto.password.PasswordEncoder;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.anyString;
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.createMockBuilder;
+import static org.easymock.EasyMock.createNiceMock;
+import static org.easymock.EasyMock.createStrictMock;
+import static org.easymock.EasyMock.eq;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.newCapture;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.reset;
+import static org.easymock.EasyMock.verify;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 public class UpgradeCatalog240Test {
   private static final String CAPACITY_SCHEDULER_CONFIG_TYPE = "capacity-scheduler";