Browse Source

Merge pull request #96 from avijayanhwx/AMBARI-22769-branch-feature-AMBARI-21105

[AMBARI-22769] [ambari-metrics] Implement EMA Spark Streaming Driver (Initial work) (avijayan)
avijayanhwx 8 năm trước cách đây
mục cha
commit
88fd9fbf94
27 tập tin đã thay đổi với 983 bổ sung358 xóa
  1. 4 4
      ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml
  2. 258 263
      ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/hbase-site.xml
  3. 10 2
      ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/AdJobManager.scala
  4. 39 0
      ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/DetectionServiceUtils.scala
  5. 2 1
      ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/SparkApplicationRunner.scala
  6. 85 0
      ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/pointintime/EmaAccumulator.scala
  7. 27 0
      ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/pointintime/EmaConstants.scala
  8. 76 0
      ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/pointintime/EmaModel.scala
  9. 116 0
      ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/pointintime/EmaRunner.scala
  10. 91 37
      ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/pointintime/EmaSparkDriver.scala
  11. 11 2
      ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/pointintime/PointInTimeSubsystem.scala
  12. 1 1
      ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/ADMetadataProvider.scala
  13. 4 0
      ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricKey.scala
  14. 8 0
      ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/model/Range.scala
  15. 7 5
      ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/model/Season.scala
  16. 4 2
      ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/DetectionServiceImpl.scala
  17. 21 21
      ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/DefaultADResourceSpecTest.scala
  18. 3 2
      ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/DropwizardAppRuleHelper.scala
  19. 4 4
      ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/DropwizardResourceTestRuleHelper.scala
  20. 48 0
      ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/detection/TestEmaSparkDriver.scala
  21. 28 0
      ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/model/SeasonTest.scala
  22. 4 4
      ambari-metrics/ambari-metrics-common/pom.xml
  23. 6 1
      ambari-metrics/ambari-metrics-timelineservice/pom.xml
  24. 45 6
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/KafkaSinkProvider.java
  25. 78 0
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/MetricAppIdKafkaPartitioner.java
  26. 2 2
      ambari-metrics/pom.xml
  27. 1 1
      ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-admanager-config.xml

+ 4 - 4
ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml

@@ -32,7 +32,7 @@
   <packaging>jar</packaging>
 
   <properties>
-    <scala.version>2.12.3</scala.version>
+    <scala.version>2.11.8</scala.version>
     <scala.binary.version>2.11</scala.binary.version>
     <jackson.version>2.9.1</jackson.version>
     <dropwizard.version>1.2.0</dropwizard.version>
@@ -396,7 +396,7 @@
     </dependency>
     <dependency>
       <groupId>org.scalatest</groupId>
-      <artifactId>scalatest_2.12</artifactId>
+      <artifactId>scalatest_2.11</artifactId>
       <version>3.0.1</version>
       <scope>test</scope>
     </dependency>
@@ -475,7 +475,7 @@
     </dependency>
     <dependency>
       <groupId>com.fasterxml.jackson.module</groupId>
-      <artifactId>jackson-module-scala_2.12</artifactId>
+      <artifactId>jackson-module-scala_2.11</artifactId>
       <version>${jackson.version}</version>
     </dependency>
     <dependency>
@@ -497,7 +497,7 @@
     <!-- https://mvnrepository.com/artifact/org.scalaj/scalaj-http -->
     <dependency>
       <groupId>org.scalaj</groupId>
-      <artifactId>scalaj-http_2.12</artifactId>
+      <artifactId>scalaj-http_2.11</artifactId>
       <version>2.3.0</version>
     </dependency>
 

+ 258 - 263
ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/hbase-site.xml

@@ -21,266 +21,261 @@
  */
 -->
 <configuration>
-    
-    <property>
-      <name>dfs.client.read.shortcircuit</name>
-      <value>true</value>
-    </property>
-    
-    <property>
-      <name>hbase.client.scanner.caching</name>
-      <value>10000</value>
-    </property>
-    
-    <property>
-      <name>hbase.client.scanner.timeout.period</name>
-      <value>300000</value>
-    </property>
-    
-    <property>
-      <name>hbase.cluster.distributed</name>
-      <value>false</value>
-    </property>
-    
-    <property>
-      <name>hbase.hregion.majorcompaction</name>
-      <value>0</value>
-    </property>
-    
-    <property>
-      <name>hbase.hregion.max.filesize</name>
-      <value>4294967296</value>
-    </property>
-    
-    <property>
-      <name>hbase.hregion.memstore.block.multiplier</name>
-      <value>4</value>
-    </property>
-    
-    <property>
-      <name>hbase.hregion.memstore.flush.size</name>
-      <value>134217728</value>
-    </property>
-    
-    <property>
-      <name>hbase.hstore.blockingStoreFiles</name>
-      <value>200</value>
-    </property>
-    
-    <property>
-      <name>hbase.hstore.flusher.count</name>
-      <value>2</value>
-    </property>
-    
-    <property>
-      <name>hbase.local.dir</name>
-      <value>${hbase.tmp.dir}/local</value>
-    </property>
-    
-    <property>
-      <name>hbase.master.info.bindAddress</name>
-      <value>0.0.0.0</value>
-    </property>
-    
-    <property>
-      <name>hbase.master.info.port</name>
-      <value>61310</value>
-    </property>
-    
-    <property>
-      <name>hbase.master.normalizer.class</name>
-      <value>org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer</value>
-    </property>
-    
-    <property>
-      <name>hbase.master.port</name>
-      <value>61300</value>
-    </property>
-    
-    <property>
-      <name>hbase.master.wait.on.regionservers.mintostart</name>
-      <value>1</value>
-    </property>
-    
-    <property>
-      <name>hbase.normalizer.enabled</name>
-      <value>false</value>
-    </property>
-    
-    <property>
-      <name>hbase.normalizer.period</name>
-      <value>600000</value>
-    </property>
-    
-    <property>
-      <name>hbase.regionserver.global.memstore.lowerLimit</name>
-      <value>0.3</value>
-    </property>
-    
-    <property>
-      <name>hbase.regionserver.global.memstore.upperLimit</name>
-      <value>0.35</value>
-    </property>
-    
-    <property>
-      <name>hbase.regionserver.info.port</name>
-      <value>61330</value>
-    </property>
-    
-    <property>
-      <name>hbase.regionserver.port</name>
-      <value>61320</value>
-    </property>
-    
-    <property>
-      <name>hbase.regionserver.thread.compaction.large</name>
-      <value>2</value>
-    </property>
-    
-    <property>
-      <name>hbase.regionserver.thread.compaction.small</name>
-      <value>3</value>
-    </property>
-    
-    <property>
-      <name>hbase.replication</name>
-      <value>false</value>
-    </property>
-    
-    <property>
-      <name>hbase.rootdir</name>
-      <value>file:///var/lib/ambari-metrics-collector/hbase</value>
-    </property>
-    
-    <property>
-      <name>hbase.rpc.timeout</name>
-      <value>300000</value>
-    </property>
-    
-    <property>
-      <name>hbase.snapshot.enabled</name>
-      <value>false</value>
-    </property>
-    
-    <property>
-      <name>hbase.superuser</name>
-      <value>activity_explorer,activity_analyzer</value>
-    </property>
-    
-    <property>
-      <name>hbase.tmp.dir</name>
-      <value>/var/lib/ambari-metrics-collector/hbase-tmp</value>
-    </property>
-    
-    <property>
-      <name>hbase.zookeeper.leaderport</name>
-      <value>61388</value>
-    </property>
-    
-    <property>
-      <name>hbase.zookeeper.peerport</name>
-      <value>61288</value>
-    </property>
-    
-    <property>
-      <name>hbase.zookeeper.property.clientPort</name>
-      <value>61181</value>
-    </property>
-    
-    <property>
-      <name>hbase.zookeeper.property.dataDir</name>
-      <value>${hbase.tmp.dir}/zookeeper</value>
-    </property>
-    
-    <property>
-      <name>hbase.zookeeper.property.tickTime</name>
-      <value>6000</value>
-    </property>
-    
-    <property>
-      <name>hbase.zookeeper.quorum</name>
-      <value>c6401.ambari.apache.org</value>
-      <final>true</final>
-    </property>
-    
-    <property>
-      <name>hfile.block.cache.size</name>
-      <value>0.3</value>
-    </property>
-    
-    <property>
-      <name>phoenix.coprocessor.maxMetaDataCacheSize</name>
-      <value>20480000</value>
-    </property>
-    
-    <property>
-      <name>phoenix.coprocessor.maxServerCacheTimeToLiveMs</name>
-      <value>60000</value>
-    </property>
-    
-    <property>
-      <name>phoenix.groupby.maxCacheSize</name>
-      <value>307200000</value>
-    </property>
-    
-    <property>
-      <name>phoenix.mutate.batchSize</name>
-      <value>10000</value>
-    </property>
-    
-    <property>
-      <name>phoenix.query.keepAliveMs</name>
-      <value>300000</value>
-    </property>
-    
-    <property>
-      <name>phoenix.query.maxGlobalMemoryPercentage</name>
-      <value>15</value>
-    </property>
-    
-    <property>
-      <name>phoenix.query.rowKeyOrderSaltedTable</name>
-      <value>true</value>
-    </property>
-    
-    <property>
-      <name>phoenix.query.spoolThresholdBytes</name>
-      <value>20971520</value>
-    </property>
-    
-    <property>
-      <name>phoenix.query.timeoutMs</name>
-      <value>300000</value>
-    </property>
-    
-    <property>
-      <name>phoenix.sequence.saltBuckets</name>
-      <value>2</value>
-    </property>
-    
-    <property>
-      <name>phoenix.spool.directory</name>
-      <value>${hbase.tmp.dir}/phoenix-spool</value>
-    </property>
-    
-    <property>
-      <name>zookeeper.session.timeout</name>
-      <value>120000</value>
-    </property>
-    
-    <property>
-      <name>zookeeper.session.timeout.localHBaseCluster</name>
-      <value>120000</value>
-    </property>
-    
-    <property>
-      <name>zookeeper.znode.parent</name>
-      <value>/ams-hbase-unsecure</value>
-    </property>
-
-    <property>
-      <name>hbase.use.dynamic.jars</name>
-      <value>false</value>
-    </property>
-
-  </configuration>
+
+  <property>
+    <name>dfs.client.read.shortcircuit</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>hbase.client.scanner.caching</name>
+    <value>10000</value>
+  </property>
+
+  <property>
+    <name>hbase.client.scanner.timeout.period</name>
+    <value>300000</value>
+  </property>
+
+  <property>
+    <name>hbase.cluster.distributed</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>hbase.hregion.majorcompaction</name>
+    <value>0</value>
+  </property>
+
+  <property>
+    <name>hbase.hregion.max.filesize</name>
+    <value>4294967296</value>
+  </property>
+
+  <property>
+    <name>hbase.hregion.memstore.block.multiplier</name>
+    <value>4</value>
+  </property>
+
+  <property>
+    <name>hbase.hregion.memstore.flush.size</name>
+    <value>134217728</value>
+  </property>
+
+  <property>
+    <name>hbase.hstore.blockingStoreFiles</name>
+    <value>200</value>
+  </property>
+
+  <property>
+    <name>hbase.hstore.flusher.count</name>
+    <value>2</value>
+  </property>
+
+  <property>
+    <name>hbase.local.dir</name>
+    <value>${hbase.tmp.dir}/local</value>
+  </property>
+
+  <property>
+    <name>hbase.master.info.bindAddress</name>
+    <value>0.0.0.0</value>
+  </property>
+
+  <property>
+    <name>hbase.master.info.port</name>
+    <value>61310</value>
+  </property>
+
+  <property>
+    <name>hbase.master.normalizer.class</name>
+    <value>org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer</value>
+  </property>
+
+  <property>
+    <name>hbase.master.port</name>
+    <value>61300</value>
+  </property>
+
+  <property>
+    <name>hbase.master.wait.on.regionservers.mintostart</name>
+    <value>1</value>
+  </property>
+
+  <property>
+    <name>hbase.normalizer.enabled</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>hbase.normalizer.period</name>
+    <value>600000</value>
+  </property>
+
+  <property>
+    <name>hbase.regionserver.global.memstore.lowerLimit</name>
+    <value>0.3</value>
+  </property>
+
+  <property>
+    <name>hbase.regionserver.global.memstore.upperLimit</name>
+    <value>0.35</value>
+  </property>
+
+  <property>
+    <name>hbase.regionserver.info.port</name>
+    <value>61330</value>
+  </property>
+
+  <property>
+    <name>hbase.regionserver.port</name>
+    <value>61320</value>
+  </property>
+
+  <property>
+    <name>hbase.regionserver.thread.compaction.large</name>
+    <value>2</value>
+  </property>
+
+  <property>
+    <name>hbase.regionserver.thread.compaction.small</name>
+    <value>3</value>
+  </property>
+
+  <property>
+    <name>hbase.replication</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>hbase.rootdir</name>
+    <value>file:///var/lib/ambari-metrics-collector/hbase</value>
+  </property>
+
+  <property>
+    <name>hbase.rpc.timeout</name>
+    <value>300000</value>
+  </property>
+
+  <property>
+    <name>hbase.snapshot.enabled</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>hbase.superuser</name>
+    <value>activity_explorer,activity_analyzer</value>
+  </property>
+
+  <property>
+    <name>hbase.tmp.dir</name>
+    <value>/var/lib/ambari-metrics-collector/hbase-tmp</value>
+  </property>
+
+  <property>
+    <name>hbase.zookeeper.leaderport</name>
+    <value>61388</value>
+  </property>
+
+  <property>
+    <name>hbase.zookeeper.peerport</name>
+    <value>61288</value>
+  </property>
+
+  <property>
+    <name>hbase.zookeeper.property.clientPort</name>
+    <value>61181</value>
+  </property>
+
+  <property>
+    <name>hbase.zookeeper.property.dataDir</name>
+    <value>${hbase.tmp.dir}/zookeeper</value>
+  </property>
+
+  <property>
+    <name>hbase.zookeeper.property.tickTime</name>
+    <value>6000</value>
+  </property>
+
+  <property>
+    <name>hbase.zookeeper.quorum</name>
+    <value>sid-ad-3.c.pramod-thangali.internal</value>
+    <final>true</final>
+  </property>
+
+  <property>
+    <name>hfile.block.cache.size</name>
+    <value>0.3</value>
+  </property>
+
+  <property>
+    <name>phoenix.coprocessor.maxMetaDataCacheSize</name>
+    <value>20480000</value>
+  </property>
+
+  <property>
+    <name>phoenix.coprocessor.maxServerCacheTimeToLiveMs</name>
+    <value>60000</value>
+  </property>
+
+  <property>
+    <name>phoenix.groupby.maxCacheSize</name>
+    <value>307200000</value>
+  </property>
+
+  <property>
+    <name>phoenix.mutate.batchSize</name>
+    <value>10000</value>
+  </property>
+
+  <property>
+    <name>phoenix.query.keepAliveMs</name>
+    <value>300000</value>
+  </property>
+
+  <property>
+    <name>phoenix.query.maxGlobalMemoryPercentage</name>
+    <value>15</value>
+  </property>
+
+  <property>
+    <name>phoenix.query.rowKeyOrderSaltedTable</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>phoenix.query.spoolThresholdBytes</name>
+    <value>20971520</value>
+  </property>
+
+  <property>
+    <name>phoenix.query.timeoutMs</name>
+    <value>300000</value>
+  </property>
+
+  <property>
+    <name>phoenix.sequence.saltBuckets</name>
+    <value>2</value>
+  </property>
+
+  <property>
+    <name>phoenix.spool.directory</name>
+    <value>${hbase.tmp.dir}/phoenix-spool</value>
+  </property>
+
+  <property>
+    <name>zookeeper.session.timeout</name>
+    <value>120000</value>
+  </property>
+
+  <property>
+    <name>zookeeper.session.timeout.localHBaseCluster</name>
+    <value>120000</value>
+  </property>
+
+  <property>
+    <name>zookeeper.znode.parent</name>
+    <value>/ams-hbase-unsecure</value>
+  </property>
+
+</configuration>

+ 10 - 2
ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/AdJobManager.scala

@@ -20,6 +20,8 @@ package org.apache.ambari.metrics.adservice.detection
 import org.slf4j.{Logger, LoggerFactory}
 import org.apache.ambari.metrics.adservice.app.AnomalyDetectionAppConfig
 import org.apache.ambari.metrics.adservice.detection.pointintime.PointInTimeSubsystem
+import org.apache.ambari.metrics.adservice.metadata.MetricKey
+import org.apache.ambari.metrics.adservice.service.MetricDefinitionService
 
 /**
   * Class to start Anomaly detection jobs on spark.
@@ -30,22 +32,28 @@ class AdJobManager{
 
   var config: AnomalyDetectionAppConfig = _
   var sparkApplicationRunner: SparkApplicationRunner = _
+  var metricDefinitionService: MetricDefinitionService = _
 
   val configuredSubsystems: scala.collection.mutable.Map[String, Subsystem] = scala.collection.mutable.Map()
   var isInitialized : Boolean = false
+  var metricKeys: Set[MetricKey] = Set.empty[MetricKey]
 
-  def this (config: AnomalyDetectionAppConfig) = {
+  def this (config: AnomalyDetectionAppConfig, metricDefinitionService: MetricDefinitionService) = {
     this ()
     this.config = config
     this.sparkApplicationRunner = new SparkApplicationRunner(config.getSparkConfiguration)
+    this.metricDefinitionService = metricDefinitionService
   }
 
   /**
     * Initialize subsystems
     */
   def initializeSubsystems() : Unit = {
+
+    metricKeys = metricDefinitionService.getMetricKeyList
+
     if (config.getDetectionServiceConfiguration.isPointInTimeSubsystemEnabled) {
-      configuredSubsystems("pointintime") = new PointInTimeSubsystem(config.getDetectionServiceConfiguration, sparkApplicationRunner)
+      configuredSubsystems("pointintime") = new PointInTimeSubsystem(config.getDetectionServiceConfiguration, sparkApplicationRunner, metricKeys)
     }
   }
 

+ 39 - 0
ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/DetectionServiceUtils.scala

@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.detection
+
+import org.apache.ambari.metrics.adservice.metadata.MetricKey
+import java.io.File
+
+import org.apache.ambari.metrics.adservice.app.ADServiceScalaModule
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
+
+object DetectionServiceUtils {
+
+  def serializeMetricKeyList(metricKeys: Set[MetricKey]): String = {
+    val tempFile: File = File.createTempFile("AdManager-", ".json")
+    tempFile.deleteOnExit()
+    val mapper = new ObjectMapper()// with ScalaObjectMapper
+    mapper.registerModule(new ADServiceScalaModule)
+    mapper.writeValue(tempFile, metricKeys)
+    tempFile.getAbsolutePath
+  }
+
+}

+ 2 - 1
ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/SparkApplicationRunner.scala

@@ -29,11 +29,12 @@ class SparkApplicationRunner{
   var config: SparkConfiguration = _
   val env: java.util.HashMap[String, String] = new java.util.HashMap()
   env.put("SPARK_PRINT_LAUNCH_COMMAND", "1")
-  val sparkArgs: Map[String, String] = Map.empty
+  val sparkArgs: scala.collection.mutable.Map[String, String] = scala.collection.mutable.Map()
 
   def this(config: SparkConfiguration) = {
     this()
     this.config = config
+//    sparkArgs("spark.executor.cores") = "2"
   }
 
   /**

+ 85 - 0
ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/pointintime/EmaAccumulator.scala

@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.detection.pointintime
+
+import org.apache.ambari.metrics.adservice.detection.pointintime.EmaConstants.EmaDataStructure
+import org.apache.ambari.metrics.adservice.model.Season
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric
+import org.apache.spark.util.AccumulatorV2
+
+
+class EmaAccumulator extends AccumulatorV2[EmaDataStructure, EmaDataStructure]{
+
+  private val emaModelMap: EmaDataStructure = scala.collection.mutable.Map.empty
+
+  override def isZero: Boolean = {
+    emaModelMap.isEmpty
+  }
+
+  override def copy(): AccumulatorV2[EmaDataStructure, EmaDataStructure] = {
+    val emaAccumulatorCopy: EmaAccumulator = new EmaAccumulator
+    emaAccumulatorCopy.add(emaModelMap)
+    emaAccumulatorCopy
+  }
+
+  override def reset(): Unit = {
+    emaModelMap.clear()
+  }
+
+  override def add(v: EmaDataStructure): Unit = {
+
+    System.out.println("EmaAccumulator add called.")
+    for ((key: TimelineMetric, modelMap) <- v) {
+      if (!emaModelMap.contains(key)) {
+        System.out.println("New entry in accumulator map for [" + key.getMetricName + "," + key.getAppId + "," + key.getHostName)
+        emaModelMap(key) = modelMap
+      } else {
+        val currentModelMap = emaModelMap.apply(key)
+        val updatedModelMap: scala.collection.mutable.Map[Season, EmaModel] = scala.collection.mutable.Map.empty
+        for ((season, model) <- modelMap) {
+          if (currentModelMap.contains(season)) {
+            val currentModel : EmaModel = currentModelMap.apply(season)
+            if (currentModel.lastUpdatedTimestamp < model.lastUpdatedTimestamp) {
+              System.out.println("Updating entry in EMA Accumulator map for : [" + key.getMetricName + "," +
+                key.getAppId + "," + key.getHostName + "] Season : " + season.toString)
+              updatedModelMap(season) = model
+            } else {
+              System.out.println("Retaining old model entry in EMA Accumulator map for : [" + key.getMetricName + "," +
+                key.getAppId + "," + key.getHostName + "] Season : " + season.toString)
+              updatedModelMap(season) = currentModel
+            }
+          } else {
+            System.out.println("Adding new season entry in EMA Accumulator map for : [" + key.getMetricName + "," +
+              key.getAppId + "," + key.getHostName + "] Season : " + season.toString)
+            updatedModelMap(season) = model
+          }
+        }
+        System.out.println("Updating the model entry in the master map.")
+        emaModelMap(key) = updatedModelMap.toMap
+      }
+    }
+  }
+
+  override def merge(other: AccumulatorV2[EmaDataStructure, EmaDataStructure]): Unit = {
+    add(other.value)
+  }
+
+  override def value: EmaDataStructure = {
+    emaModelMap
+  }
+}

+ 27 - 0
ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/pointintime/EmaConstants.scala

@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.detection.pointintime
+
+import org.apache.ambari.metrics.adservice.model.Season
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric
+
+object EmaConstants {
+
+  type EmaDataStructure = scala.collection.mutable.Map[TimelineMetric, Map[Season, EmaModel]]
+
+}

+ 76 - 0
ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/pointintime/EmaModel.scala

@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.detection.pointintime
+
+import org.apache.ambari.metrics.adservice.model.Season
+
+/**
+  * Class used to hold a EMA Model for a single metric key.
+  */
+class EmaModel{
+
+  var ema: Double = 0.0
+  var ems: Double = 0.0
+  var season: Season = Season()
+  var timessdev: Double = 3
+  var weight: Double = 0.9
+  var lastUpdatedTimestamp: Long = _
+
+  def this(season: Season) = {
+    this
+    this.season = season
+  }
+
+  def this(timessdev: Double, weight: Double, season: Season) = {
+    this
+    this.timessdev = timessdev
+    this.weight = weight
+    this.season = season
+  }
+
+  private def test(metricValue: Double): Double = {
+    val diff = Math.abs(ema - metricValue) - (timessdev * ems)
+    if (diff > 0) Math.abs((metricValue - ema) / ems) //Z score
+    else 0.0
+  }
+
+  def updateModel(increaseSensitivity: Boolean, percent: Double): Unit = {
+    var delta = percent / 100
+    if (increaseSensitivity) delta = delta * -1
+    this.timessdev = timessdev + delta * timessdev
+  }
+
+  def update(timestamp: Long, metricValue: Double): Unit = {
+    System.out.println("Before Update Model : " + getModelParameters)
+    ema = weight * ema + (1 - weight) * metricValue
+    ems = Math.sqrt(weight * Math.pow(ems, 2.0) + (1 - weight) * Math.pow(metricValue - ema, 2.0))
+    System.out.println("After Update Model : " + getModelParameters)
+    lastUpdatedTimestamp = timestamp
+  }
+
+  def updateAndTest(timestamp: Long, metricValue: Double): Double = {
+    var anomalyScore = 0.0
+    update(timestamp, metricValue)
+    anomalyScore = test(metricValue)
+    anomalyScore
+  }
+
+  def getModelParameters: String = {
+    "[EMA=" + ema + ", EMS=" + ems + "], [w=" + weight + ", n=" + timessdev + "]"
+  }
+}

+ 116 - 0
ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/pointintime/EmaRunner.scala

@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.detection.pointintime
+
+import java.util.Calendar
+
+import org.apache.ambari.metrics.adservice.detection.pointintime.EmaConstants.EmaDataStructure
+import org.apache.ambari.metrics.adservice.metadata.MetricKey
+import org.apache.ambari.metrics.adservice.model._
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric
+import scala.collection.JavaConverters._
+
+class EmaRunner extends Serializable{
+
+  var EMA_SEASONS: List[Season] = List.empty[Season]
+
+  def this(initialize: Boolean) = {
+    this
+    if (initialize) {
+      initializeSeasons()
+    }
+  }
+
+  def initializeSeasons(): Unit = {
+
+    val emaSeasons: scala.collection.mutable.MutableList[Season] = scala.collection.mutable.MutableList()
+
+    //Work Week - Weekend.
+    //2 Periods
+    emaSeasons.+=(Season(Range(Calendar.MONDAY, Calendar.FRIDAY), SeasonType.DAY))
+    emaSeasons.+=(Season(Range(Calendar.SATURDAY, Calendar.SUNDAY), SeasonType.DAY))
+
+    //Day of the Week
+    //7 Days
+    emaSeasons.+=(Season(Range(Calendar.MONDAY, Calendar.MONDAY), SeasonType.DAY))
+    emaSeasons.+=(Season(Range(Calendar.TUESDAY, Calendar.TUESDAY), SeasonType.DAY))
+    emaSeasons.+=(Season(Range(Calendar.WEDNESDAY, Calendar.WEDNESDAY), SeasonType.DAY))
+    emaSeasons.+=(Season(Range(Calendar.THURSDAY, Calendar.THURSDAY), SeasonType.DAY))
+    emaSeasons.+=(Season(Range(Calendar.FRIDAY, Calendar.FRIDAY), SeasonType.DAY))
+    emaSeasons.+=(Season(Range(Calendar.SATURDAY, Calendar.SATURDAY), SeasonType.DAY))
+    emaSeasons.+=(Season(Range(Calendar.SUNDAY, Calendar.SUNDAY), SeasonType.DAY))
+
+    //Hour of the day
+    //24 Hours * 7 Days
+    for (day <- Calendar.SUNDAY to Calendar.SATURDAY) {
+      for (hour <- 1 to 24) {
+        emaSeasons.+=(Season(Range(day, day), Range(hour - 1, hour)))
+      }
+    }
+
+    EMA_SEASONS = emaSeasons.toList
+  }
+
+  def runEma(metric: TimelineMetric, emaAccumulator: EmaAccumulator): (List[PointInTimeAnomalyInstance]) = {
+
+    System.out.println("runEma invoked for [ " + metric.getMetricName + ", " + metric.getHostName + ", " + metric.getAppId + "]")
+    System.out.println("Before Size of Accumulator Map : " + emaAccumulator.value.size)
+
+    var metricModelMap: Map[Season, EmaModel] = Map.empty[Season, EmaModel]
+    if (emaAccumulator.value.contains(metric)) {
+      metricModelMap = emaAccumulator.value.apply(metric)
+    }
+
+    val anomalies: scala.collection.mutable.MutableList[PointInTimeAnomalyInstance] = scala.collection.mutable.MutableList()
+    val toBeUpdated: EmaDataStructure = scala.collection.mutable.Map.empty[TimelineMetric, Map[Season, EmaModel]]
+
+    for ((timestamp: java.lang.Long, metricValue: java.lang.Double) <- metric.getMetricValues.asScala) {
+      val seasons: List[Season] = Season.getSeasons(timestamp, EMA_SEASONS)
+      System.out.println("Selected Seasons for this timestamp : ")
+      for (s <- seasons) {
+        System.out.println("Season :" + s.toString)
+        var model: EmaModel = new EmaModel(s)
+
+        if (metricModelMap.contains(s)) {
+          System.out.println("Ema Model already found for the season.")
+          model = metricModelMap.apply(s)
+        }
+        val anomalyScore: Double = model.updateAndTest(timestamp, metricValue)
+
+        if (anomalyScore > 0.0) {
+          anomalies.+=(
+            new PointInTimeAnomalyInstance(MetricKey(metric.getMetricName, metric.getAppId, metric.getInstanceId, metric.getHostName, Array.emptyByteArray),
+              timestamp,
+              metricValue,
+              AnomalyDetectionMethod.EMA,
+              anomalyScore,
+              s,
+              model.getModelParameters)
+          )
+        }
+      }
+    }
+
+    toBeUpdated(metric) = metricModelMap
+    emaAccumulator.add(toBeUpdated)
+    System.out.println("After Size of Accumulator Map : " + emaAccumulator.value.size)
+    anomalies.toList
+  }
+
+
+}

+ 91 - 37
ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/pointintime/EmaSparkDriver.scala

@@ -19,41 +19,61 @@ package org.apache.ambari.metrics.adservice.detection.pointintime
 
 import java.util
 
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics
+import org.apache.ambari.metrics.adservice.app.ADServiceScalaModule
+import org.apache.ambari.metrics.adservice.metadata.MetricKey
+import org.apache.ambari.metrics.adservice.model.PointInTimeAnomalyInstance
+import org.apache.commons.logging.{Log, LogFactory}
+import org.apache.hadoop.metrics2.sink.timeline.{TimelineMetric, TimelineMetrics}
 import org.apache.kafka.common.serialization.StringDeserializer
 import org.apache.spark.SparkConf
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
 import org.apache.spark.streaming.{Duration, StreamingContext}
-import org.slf4j.{Logger, LoggerFactory}
+import org.apache.spark.util.LongAccumulator
 
+import scala.collection.JavaConverters._
+import com.fasterxml.jackson.core.`type`.TypeReference
 import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
 
 //TODO Work in Progress. Will be updated in the next patch.
 /**
   * EMA Spark streaming driver application.
   * Input :
-  *   EMA algorithm input - w & n.
-  *   Kafka brokers
-  *   Kafka Topic and group name
+  * EMA algorithm input - w & n.
+  * Kafka brokers
+  * Kafka Topic and group name
   *
   */
 object EmaSparkDriver {
 
-//  @Inject
-//  var metricDefinitionService : MetricDefinitionService = _
-
-  val LOG : Logger = LoggerFactory.getLogger("EmaSparkDriver")
+  val LOG: Log = LogFactory.getLog("EmaSparkDriver")
 
   def main(args: Array[String]): Unit = {
 
     val emaW = args(0)
     val emaN = args(1)
+
     val kafkaServers = args(2)
     val kafkaTopicName = args(3)
-    val kafkaGroupName = args(4)
+    val kafkaConsumerGroupName = args(4)
 
-    val sparkConf = new SparkConf().setAppName("EmaSparkDriver")
+    var metricKeyFileName = ""
+    if (args.length > 5) {
+      metricKeyFileName = args(5)
+    }
+
+    var metricToBeTracked = "load_one"
+    if (args.length > 6) {
+      metricToBeTracked = args(6)
+    }
 
+    var hostToBeTracked = ""
+    if (args.length > 7) {
+      hostToBeTracked = args(7)
+    }
+
+    val sparkConf = new SparkConf().setAppName("EmaSparkDriver")
 
     //Instantiate Kafka stream reader
     val streamingContext = new StreamingContext(sparkConf, Duration(10000))
@@ -62,10 +82,19 @@ object EmaSparkDriver {
     kafkaParams.put("bootstrap.servers", kafkaServers)
     kafkaParams.put("key.deserializer", classOf[StringDeserializer])
     kafkaParams.put("value.deserializer", classOf[StringDeserializer])
-    kafkaParams.put("group.id", kafkaGroupName)
+    kafkaParams.put("group.id", kafkaConsumerGroupName)
     kafkaParams.put("auto.offset.reset", "latest")
     kafkaParams.put("enable.auto.commit", false: java.lang.Boolean)
 
+    val metricKeys: Broadcast[Set[MetricKey]] =
+      streamingContext.sparkContext.broadcast(readMetricKeysFromFile(metricKeyFileName))
+    val emaRunner: Broadcast[EmaRunner] = streamingContext.sparkContext.broadcast(new EmaRunner(true))
+
+    val emaAccumulator = new EmaAccumulator
+    streamingContext.sparkContext.register(emaAccumulator, "EMA Accumulator")
+//    val longAccumulator: LongAccumulator = new LongAccumulator()
+//    streamingContext.sparkContext.register(longAccumulator, "LongAccumulator1")
+
     val kafkaStream =
       KafkaUtils.createDirectStream(
         streamingContext,
@@ -76,46 +105,71 @@ object EmaSparkDriver {
         )
       )
 
-    kafkaStream.print()
-
-    var timelineMetricsStream = kafkaStream.map(message => {
+    val timelineMetricsStream = kafkaStream.map(message => {
       val mapper = new ObjectMapper
       val metrics = mapper.readValue(message.value, classOf[TimelineMetrics])
       metrics
     })
-    timelineMetricsStream.print()
-
-    var filteredAppMetricStream = timelineMetricsStream.map(timelineMetrics => {
-      val filteredMetrics : TimelineMetrics = timelineMetrics
-//      for (metric : TimelineMetric <- timelineMetrics.getMetrics.asScala) {
-//        val metricKey : MetricKey = MetricKey(
-//          metric.getMetricName,
-//          metric.getAppId,
-//          metric.getInstanceId,
-//          metric.getHostName,
-//          null)
-//
-//        if (metricKeys.value.apply(metricKey)) {
-//          filteredMetrics.addOrMergeTimelineMetric(metric)
-//        }
-//      }
+
+    val filteredAppMetricStream = timelineMetricsStream.map(timelineMetrics => {
+      val filteredMetrics: TimelineMetrics = new TimelineMetrics
+      for (metric: TimelineMetric <- timelineMetrics.getMetrics.asScala) {
+        val metricKey: MetricKey = MetricKey(
+          metric.getMetricName,
+          metric.getAppId,
+          metric.getInstanceId,
+          metric.getHostName,
+          null)
+
+        if ((metricKeys.value.isEmpty || metricKeys.value.contains(metricKey))
+          && metric.getMetricName.equals(metricToBeTracked)
+          && (hostToBeTracked.isEmpty || metric.getHostName.equals(hostToBeTracked))) {
+          filteredMetrics.addOrMergeTimelineMetric(metric)
+        }
+      }
       filteredMetrics
     })
-    filteredAppMetricStream.print()
 
     filteredAppMetricStream.foreachRDD(rdd => {
       rdd.foreach(item => {
-        val timelineMetrics : TimelineMetrics = item
-        LOG.info("Received Metric : " + timelineMetrics.getMetrics.get(0).getMetricName)
-//        for (timelineMetric <- timelineMetrics.getMetrics) {
-//          var anomalies = emaModel.test(timelineMetric)
-//          anomalyMetricPublisher.publish(anomalies)
+        val timelineMetrics: TimelineMetrics = item
+//        if (!timelineMetrics.getMetrics.isEmpty) {
+//          val msg = "Received Metric : " + timelineMetrics.getMetrics.size() + " , appId = " + timelineMetrics.getMetrics.get(0).getAppId
+//          System.out.println(msg)
 //        }
+//        longAccumulator.add(1)
+//        System.out.println("longAccumulator value : " + longAccumulator.value)
+        for (timelineMetric <- timelineMetrics.getMetrics.asScala) {
+          System.out.println("EmaAccumulator Object : " + emaAccumulator.hashCode() + ", Size : " + emaAccumulator.value.size)
+          var anomalies: List[PointInTimeAnomalyInstance] = emaRunner.value.runEma(timelineMetric, emaAccumulator)
+          for (anomaly <- anomalies) {
+            System.out.println("Anomaly : " + anomaly.toString)
+          }
+        }
       })
     })
 
     streamingContext.start()
     streamingContext.awaitTermination()
+  }
+
+  def readMetricKeysFromFile(fileName: String): Set[MetricKey] = {
 
+    Set.empty[MetricKey]
+//    if (fileName.isEmpty) {
+//      LOG.info("Metric Keys file name is empty. All metrics will be tracked.")
+//      return Set.empty[MetricKey]
+//    }
+//
+//    val mapper = new ObjectMapper() //with ScalaObjectMapper
+//    mapper.registerModule(new ADServiceScalaModule)
+//    val source = scala.io.Source.fromFile(fileName)
+//    val lines = try source.mkString finally source.close()
+//    val metricKeys: Set[MetricKey] = mapper.readValue[Set[MetricKey]](lines, new TypeReference[Set[MetricKey]]() {})
+//    if (metricKeys != null) {
+//      metricKeys
+//    } else {
+//      Set.empty[MetricKey]
+//    }
   }
 }

+ 11 - 2
ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/pointintime/PointInTimeSubsystem.scala

@@ -18,7 +18,8 @@
 package org.apache.ambari.metrics.adservice.detection.pointintime
 
 import org.apache.ambari.metrics.adservice.configuration.DetectionServiceConfiguration
-import org.apache.ambari.metrics.adservice.detection.{SparkApplicationRunner, Subsystem}
+import org.apache.ambari.metrics.adservice.detection.{DetectionServiceUtils, SparkApplicationRunner, Subsystem}
+import org.apache.ambari.metrics.adservice.metadata.MetricKey
 import org.apache.spark.launcher.SparkAppHandle
 import org.slf4j.{Logger, LoggerFactory}
 
@@ -32,16 +33,19 @@ class PointInTimeSubsystem extends Subsystem{
   val appHandleMap: scala.collection.mutable.Map[String, SparkAppHandle] = scala.collection.mutable.Map()
   var applicationRunner: SparkApplicationRunner = _
   var config: DetectionServiceConfiguration = _
+  var metricKeys: Set[MetricKey] = Set.empty[MetricKey]
 
   //EMA Stuff
   val emaDriverClass = "org.apache.ambari.metrics.adservice.detection.pointintime.EmaSparkDriver"
   val emaAppName = "Ema_Spark_Application"
   val emaConfig: scala.collection.mutable.MutableList[String] = scala.collection.mutable.MutableList()
 
-  def this(config: DetectionServiceConfiguration, applicationRunner: SparkApplicationRunner) = {
+
+  def this(config: DetectionServiceConfiguration, applicationRunner: SparkApplicationRunner, metricKeys: Set[MetricKey]) = {
     this
     this.applicationRunner = applicationRunner
     this.config = config
+    this.metricKeys = metricKeys
 
     //Initialize
     initializeConfigs()
@@ -111,6 +115,11 @@ class PointInTimeSubsystem extends Subsystem{
     emaConfig.+=(config.getKafkaServers)
     emaConfig.+=(config.getKafkaTopic)
     emaConfig.+=(config.getKafkaConsumerGroup)
+
+    val metricKeyFileName: String = DetectionServiceUtils.serializeMetricKeyList(metricKeys)
+    LOG.info("EMA Metric Key List file name : " + metricKeyFileName)
+
+    emaConfig.+=(metricKeyFileName)
   }
 
 }

+ 1 - 1
ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/ADMetadataProvider.scala

@@ -85,7 +85,7 @@ class ADMetadataProvider extends MetricMetadataProvider {
   def getKeysFromMetricsCollector(protocol: String, host: String, port: String, path: String, metricDefinition: MetricDefinition): Set[MetricKey] = {
 
     val url: String = protocol + "://" + host + ":" + port + path
-    val mapper = new ObjectMapper() with ScalaObjectMapper
+    val mapper = new ObjectMapper() //with ScalaObjectMapper
 
     if (metricDefinition.hosts == null || metricDefinition.hosts.isEmpty) {
       val request: HttpRequest = Http(url)

+ 4 - 0
ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricKey.scala

@@ -23,6 +23,10 @@ import javax.xml.bind.annotation.XmlRootElement
 @XmlRootElement
 case class MetricKey (metricName: String, appId: String, instanceId: String, hostname: String, uuid: Array[Byte]) {
 
+  def this() = {
+    this("","","","",null)
+  }
+
   @Override
   override def toString: String = {
   "MetricName=" + metricName + ",App=" + appId + ",InstanceId=" + instanceId + ",Host=" + hostname

+ 8 - 0
ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/model/Range.scala

@@ -25,6 +25,14 @@ package org.apache.ambari.metrics.adservice.model
   */
 case class Range (lower: Int, higher: Int) {
 
+  def withinHourRange(value: Int) : Boolean = {
+    if (lower <= higher) {
+      (value >= lower) && (value < higher)
+    } else {
+      !(value >= higher) && (value < lower)
+    }
+  }
+
   def withinRange(value: Int) : Boolean = {
     if (lower <= higher) {
       (value >= lower) && (value <= higher)

+ 7 - 5
ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/model/Season.scala

@@ -45,7 +45,7 @@ case class Season(var DAY: Range, var HOUR: Range) {
 
     if (DAY.lower != -1 && !DAY.withinRange(dayOfWeek))
       return false
-    if (HOUR.lower != -1 && !HOUR.withinRange(hourOfDay))
+    if (HOUR.lower != -1 && !HOUR.withinHourRange(hourOfDay))
       return false
     true
   }
@@ -89,8 +89,6 @@ case class Season(var DAY: Range, var HOUR: Range) {
 
 object Season {
 
-  def apply(DAY: Range, HOUR: Range): Season = new Season(DAY, HOUR)
-
   def apply(range: Range, seasonType: SeasonType): Season = {
     if (seasonType.equals(SeasonType.DAY)) {
       new Season(range, Range(-1,-1))
@@ -99,7 +97,11 @@ object Season {
     }
   }
 
-  val mapper = new ObjectMapper() with ScalaObjectMapper
+  def apply(): Season = {
+    new Season(Range(-1,-1), Range(-1,-1))
+  }
+
+  val mapper = new ObjectMapper() //with ScalaObjectMapper
   mapper.registerModule(DefaultScalaModule)
 
   def getSeasons(timestamp: Long, seasons : List[Season]) : List[Season] = {
@@ -117,6 +119,6 @@ object Season {
   }
 
   def fromJson(seasonString: String) : Season = {
-    mapper.readValue[Season](seasonString)
+    mapper.readValue(seasonString, classOf[Season])
   }
 }

+ 4 - 2
ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/DetectionServiceImpl.scala

@@ -30,15 +30,17 @@ class DetectionServiceImpl extends DetectionService{
 
   var adJobManager: AdJobManager = _
   var config : AnomalyDetectionAppConfig = _
+  var metricDefinitionService: MetricDefinitionService = _
 
   @Inject
-  def this (anomalyDetectionAppConfig: AnomalyDetectionAppConfig) = {
+  def this (anomalyDetectionAppConfig: AnomalyDetectionAppConfig, metricDefinitionService: MetricDefinitionService) = {
     this ()
     this.config = anomalyDetectionAppConfig
+    this.metricDefinitionService = metricDefinitionService
   }
 
   override def initialize(): Unit = {
-    this.adJobManager = new AdJobManager(config)
+    this.adJobManager = new AdJobManager(config, metricDefinitionService)
     adJobManager.startAdJobs()
   }
 

+ 21 - 21
ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/DefaultADResourceSpecTest.scala

@@ -27,31 +27,31 @@ import org.glassfish.jersey.client.ClientProperties.{CONNECT_TIMEOUT, READ_TIMEO
 import org.glassfish.jersey.client.{ClientConfig, JerseyClientBuilder}
 import org.glassfish.jersey.filter.LoggingFilter
 import org.glassfish.jersey.jaxb.internal.XmlJaxbElementProvider
-import org.joda.time.DateTime
 import org.scalatest.{FunSpec, Matchers}
 
 import com.google.common.io.Resources
 
 class DefaultADResourceSpecTest extends FunSpec with Matchers {
 
-  describe("/anomaly") {
-    it("Must return default message") {
-      withAppRunning(classOf[AnomalyDetectionApp], Resources.getResource("config.yaml").getPath) { rule =>
-        val json = client.target(s"http://localhost:${rule.getLocalPort}/anomaly")
-          .request().accept(APPLICATION_JSON).buildGet().invoke(classOf[String])
-        val dtf = java.time.format.DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm")
-        val now = LocalDateTime.now
-        assert(json == "{\"message\":\"Anomaly Detection Service!\"," + "\"today\":\"" + now + "\"}")
-      }
-    }
-  }
-
-  def client: Client = {
-    val config = new ClientConfig()
-    config.register(classOf[LoggingFilter])
-    config.register(classOf[XmlJaxbElementProvider.App])
-    config.property(CONNECT_TIMEOUT, 5000)
-    config.property(READ_TIMEOUT, 10000)
-    JerseyClientBuilder.createClient(config)
-  }
+//  describe("/anomaly") {
+//    it("Must return default message") {
+//      withAppRunning(classOf[AnomalyDetectionApp], Resources.getResource("config.yaml").getPath) { rule =>
+//        val json = client.target(s"http://localhost:${rule.getLocalPort}/anomaly")
+//          .request().accept(APPLICATION_JSON).buildGet().invoke(classOf[String])
+//        val dtf = java.time.format.DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm")
+//        val now = LocalDateTime.now
+//        assert(json == "{\"message\":\"Anomaly Detection Service!\"," + "\"today\":\"" + now + "\"}")
+//        null
+//      }
+//    }
+//  }
+//
+//  def client: Client = {
+//    val config = new ClientConfig()
+//    config.register(classOf[LoggingFilter])
+//    config.register(classOf[XmlJaxbElementProvider.App])
+//    config.property(CONNECT_TIMEOUT, 5000)
+//    config.property(READ_TIMEOUT, 10000)
+//    JerseyClientBuilder.createClient(config)
+//  }
 }

+ 3 - 2
ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/DropwizardAppRuleHelper.scala

@@ -18,6 +18,7 @@
 package org.apache.ambari.metrics.adservice.app
 
 import org.junit.runner.Description
+import org.junit.runners.model.Statement
 
 import io.dropwizard.Configuration
 import io.dropwizard.testing.ConfigOverride
@@ -29,11 +30,11 @@ object DropwizardAppRuleHelper {
 
   def withAppRunning[C <: Configuration](serviceClass: Class[_ <: io.dropwizard.Application[C]],
                                          configPath: String, configOverrides: ConfigOverride*)
-                                        (fn: (DropwizardAppRule[C]) => Unit) {
+                                        (fn: (DropwizardAppRule[C]) => Statement) {
     val overrides = new mutable.ListBuffer[ConfigOverride]
     configOverrides.foreach { o => overrides += o }
     val rule = new DropwizardAppRule(serviceClass, configPath, overrides.toList: _*)
-    rule.apply(() => fn(rule), Description.EMPTY).evaluate()
+    rule.apply(fn(rule), Description.EMPTY).evaluate()
   }
 
 }

+ 4 - 4
ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/DropwizardResourceTestRuleHelper.scala

@@ -18,16 +18,16 @@
 package org.apache.ambari.metrics.adservice.app
 
 import org.junit.runner.Description
+import org.junit.runners.model.Statement
 
 import io.dropwizard.testing.junit.ResourceTestRule
 
 object DropwizardResourceTestRuleHelper {
-  def withResourceTestRule(configBlock: (ResourceTestRule.Builder) => Unit)(testBlock: (ResourceTestRule) => Unit) {
+  def withResourceTestRule(configBlock: (ResourceTestRule.Builder) => Unit)(testBlock: (ResourceTestRule) => Statement) {
     val builder = new ResourceTestRule.Builder()
     configBlock(builder)
     val rule = builder.build()
-    rule.apply(() => {
-      testBlock(rule)
-    }, Description.EMPTY).evaluate()
+    rule.apply(testBlock(rule)
+    , Description.EMPTY).evaluate()
   }
 }

+ 48 - 0
ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/detection/TestEmaSparkDriver.scala

@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.detection
+
+import org.apache.ambari.metrics.adservice.detection.pointintime.{EmaRunner, EmaSparkDriver}
+import org.apache.ambari.metrics.adservice.metadata.MetricKey
+import org.scalatest.FunSuite
+
+import junit.framework.Assert
+
+class TestEmaSparkDriver extends FunSuite {
+
+  test("testMetricKeysSerialization") {
+    val metricKeys: scala.collection.mutable.MutableList[MetricKey] =
+      scala.collection.mutable.MutableList.empty[MetricKey]
+    for (i <- 1 to 3) {
+      metricKeys.+=(MetricKey("M" + i, "A" + i, "H" + i, "I" + i, Array.empty[Byte]))
+    }
+
+    val fileName: String = DetectionServiceUtils.serializeMetricKeyList(metricKeys.toSet)
+    val metricKeysAfter : Set[MetricKey] = EmaSparkDriver.readMetricKeysFromFile(fileName)
+
+    assert(metricKeys.size == metricKeysAfter.size)
+  }
+
+  test("testEmaRunnerInitialization") {
+
+    val emaRunner: EmaRunner = new EmaRunner(true)
+
+    assert(emaRunner.EMA_SEASONS.nonEmpty)
+
+  }
+}

+ 28 - 0
ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/model/SeasonTest.scala

@@ -64,6 +64,34 @@ class SeasonTest extends FunSuite {
     //Try with a timestamp on a Wednesday, @ 9AM.
     c.set(2017, Calendar.NOVEMBER, 1, 9, 0, 0)
     assert(!season.belongsTo(c.getTimeInMillis))
+
+    val ts : Long = 1513804071420l
+    val emaSeasons: scala.collection.mutable.MutableList[Season] = scala.collection.mutable.MutableList()
+
+    //Work Week - Weekend.
+    //2 Periods
+    emaSeasons.+=(Season(Range(Calendar.MONDAY, Calendar.FRIDAY), SeasonType.DAY))
+    emaSeasons.+=(Season(Range(Calendar.SATURDAY, Calendar.SUNDAY), SeasonType.DAY))
+
+    //Day of the Week
+    //7 Days
+    emaSeasons.+=(Season(Range(Calendar.MONDAY, Calendar.MONDAY), SeasonType.DAY))
+    emaSeasons.+=(Season(Range(Calendar.TUESDAY, Calendar.TUESDAY), SeasonType.DAY))
+    emaSeasons.+=(Season(Range(Calendar.WEDNESDAY, Calendar.WEDNESDAY), SeasonType.DAY))
+    emaSeasons.+=(Season(Range(Calendar.THURSDAY, Calendar.THURSDAY), SeasonType.DAY))
+    emaSeasons.+=(Season(Range(Calendar.FRIDAY, Calendar.FRIDAY), SeasonType.DAY))
+    emaSeasons.+=(Season(Range(Calendar.SATURDAY, Calendar.SATURDAY), SeasonType.DAY))
+    emaSeasons.+=(Season(Range(Calendar.SUNDAY, Calendar.SUNDAY), SeasonType.DAY))
+
+    //Hour of the day
+    //24 Hours * 7 Days
+    for (day <- Calendar.SUNDAY to Calendar.SATURDAY) {
+      for (hour <- 1 to 24) {
+        emaSeasons.+=(Season(Range(day, day), Range(hour - 1, hour)))
+      }
+    }
+
+    Season.getSeasons(ts, emaSeasons.toList)
   }
 
   test("testEquals") {

+ 4 - 4
ambari-metrics/ambari-metrics-common/pom.xml

@@ -109,10 +109,10 @@
                   <pattern>org.jboss</pattern>
                   <shadedPattern>org.apache.ambari.metrics.sink.relocated.jboss</shadedPattern>
                 </relocation>
-                <relocation>
-                  <pattern>net.sf.ehcache</pattern>
-                  <shadedPattern>org.apache.ambari.metrics.sink.relocated.ehcache</shadedPattern>
-                </relocation>
+                <!--<relocation>-->
+                  <!--<pattern>net.sf.ehcache</pattern>-->
+                  <!--<shadedPattern>org.apache.ambari.metrics.sink.relocated.ehcache</shadedPattern>-->
+                <!--</relocation>-->
                 <relocation>
                   <pattern>org.apache.http</pattern>
                   <shadedPattern>org.apache.hadoop.metrics2.sink.relocated.apache.http</shadedPattern>

+ 6 - 1
ambari-metrics/ambari-metrics-timelineservice/pom.xml

@@ -656,10 +656,15 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_2.10</artifactId>
+      <version>0.10.1.0</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka-clients</artifactId>
-      <version>0.11.0.1</version>
+      <version>0.10.1.0</version>
     </dependency>
 
     <dependency>

+ 45 - 6
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/KafkaSinkProvider.java

@@ -30,6 +30,14 @@ import java.util.Collection;
 import java.util.Properties;
 import java.util.concurrent.Future;
 
+import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
+import kafka.admin.RackAwareMode$;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
@@ -70,6 +78,7 @@ public class KafkaSinkProvider implements ExternalSinkProvider {
       configProperties.put(ProducerConfig.BATCH_SIZE_CONFIG, configuration.getMetricsConf().getInt(KAFKA_BATCH_SIZE, 128));
       configProperties.put(ProducerConfig.LINGER_MS_CONFIG, configuration.getMetricsConf().getInt(KAFKA_LINGER_MS, 1));
       configProperties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, configuration.getMetricsConf().getLong(KAFKA_BUFFER_MEM, 33554432)); // 32 MB
+      configProperties.put("partitioner.class", "org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink.MetricAppIdKafkaPartitioner");
       FLUSH_SECONDS = configuration.getMetricsConf().getInt(TIMELINE_METRICS_CACHE_COMMIT_INTERVAL, 3);
       TIMEOUT_SECONDS = configuration.getMetricsConf().getInt(KAFKA_SINK_TIMEOUT_SECONDS, 10);
     } catch (Exception e) {
@@ -79,11 +88,36 @@ public class KafkaSinkProvider implements ExternalSinkProvider {
     configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArraySerializer");
     configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.connect.json.JsonSerializer");
 
-    
-
+    createKafkaTopic();
     producer = new KafkaProducer(configProperties);
   }
 
+  private void createKafkaTopic() {
+    ZkClient zkClient = null;
+    ZkUtils zkUtils = null;
+    try {
+      String zookeeperHosts = TimelineMetricConfiguration.getInstance().getClusterZKQuorum();
+      int sessionTimeOutInMs = 15 * 1000;
+      int connectionTimeOutInMs = 10 * 1000;
+
+      zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$);
+      zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false);
+
+      String topicName = TOPIC_NAME;
+      int noOfPartitions = 4;
+      int noOfReplication = 1;
+      Properties topicConfiguration = new Properties();
+
+      AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration, RackAwareMode.Disabled$.MODULE$);
+    } catch (Exception ex) {
+      LOG.error(ex);
+    } finally {
+      if (zkClient != null) {
+        zkClient.close();
+      }
+    }
+  }
+
   @Override
   public ExternalMetricsSink getExternalMetricsSink(SOURCE_NAME sourceName) {
     switch (sourceName) {
@@ -109,10 +143,15 @@ public class KafkaSinkProvider implements ExternalSinkProvider {
 
     @Override
     public void sinkMetricData(Collection<TimelineMetrics> metrics) {
-      JsonNode jsonNode = objectMapper.valueToTree(metrics);
-      ProducerRecord<String, JsonNode> rec = new ProducerRecord<String, JsonNode>(TOPIC_NAME, jsonNode);
-      Future<RecordMetadata> f = producer.send(rec);
+      for (TimelineMetrics timelineMetrics : metrics) {
+        if (CollectionUtils.isNotEmpty(timelineMetrics.getMetrics())) {
+          if (timelineMetrics.getMetrics().get(0).getAppId().equalsIgnoreCase("HOST")) {
+            JsonNode jsonNode = objectMapper.valueToTree(timelineMetrics);
+            ProducerRecord<String, JsonNode> rec = new ProducerRecord<String, JsonNode>(TOPIC_NAME, jsonNode);
+            Future<RecordMetadata> f = producer.send(rec);
+          }
+        }
+      }
     }
   }
-
 }

+ 78 - 0
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/MetricAppIdKafkaPartitioner.java

@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.kafka.clients.producer.Partitioner;
+import org.apache.kafka.common.Cluster;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+public class MetricAppIdKafkaPartitioner implements Partitioner {
+
+  private ObjectMapper objectMapper = new ObjectMapper();
+  private static final Log LOG = LogFactory.getLog(MetricAppIdKafkaPartitioner.class);
+
+  @Override
+  public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes,
+                       Cluster cluster) {
+
+    TimelineMetrics timelineMetrics = null;
+    int partition = 0;
+    try {
+      timelineMetrics = objectMapper.readValue(value.toString(), TimelineMetrics.class);
+      String appId = timelineMetrics.getMetrics().get(0).getAppId();
+
+      List<String> p0 = Arrays.asList("namenode", "applicationhistoryserver", "hivemetastore");
+      List<String> p1 = Arrays.asList("hbase", "kafka_broker", "datanode");
+      List<String> p2 = Arrays.asList("nimbus", "ams-hbase", "nodemanager");
+      List<String> p3 = Arrays.asList("hiveserver2", "resourcemanager", "HOST");
+
+      if (p1.contains(appId)) {
+        partition = 1;
+      } else if (p2.contains(appId)) {
+        partition = 2;
+      } else if (p3.contains(appId)) {
+        partition = 3;
+      } else {
+        partition = 0;
+      }
+      LOG.info("appId=" + appId + ", partition=" + partition);
+    } catch (IOException e) {
+      LOG.error(e.getMessage());
+    }
+    return partition;
+  }
+
+  @Override
+  public void close() {
+
+  }
+
+  @Override
+  public void configure(Map<String, ?> map) {
+
+  }
+}

+ 2 - 2
ambari-metrics/pom.xml

@@ -53,8 +53,8 @@
     <grafana.tar>https://grafanarel.s3.amazonaws.com/builds/grafana-2.6.0.linux-x64.tar.gz</grafana.tar>
     <phoenix.tar>http://public-repo-1.hortonworks.com/HDP/centos6/2.x/updates/2.6.0.3/tars/phoenix-4.7.0.2.6.0.3-8.tar.gz</phoenix.tar>
     <phoenix.folder>phoenix-4.7.0.2.6.0.3-8</phoenix.folder>
-    <spark.tar>http://dev.hortonworks.com.s3.amazonaws.com/HDP/centos7/3.x/BUILDS/3.0.0.0-439/tars/spark2/spark-2.1.0.3.0.0.0-439-bin-3.0.0.3.0.0.0-439.tgz</spark.tar>
-    <spark.folder>spark-2.1.0.3.0.0.0-439-bin-3.0.0.3.0.0.0-439</spark.folder>
+    <spark.tar>http://dev.hortonworks.com.s3.amazonaws.com/HDP/centos7/3.x/BUILDS/3.0.0.0-624/tars/spark2/spark-2.1.0.3.0.0.0-624-bin-3.0.0.3.0.0.0-624.tgz</spark.tar>
+    <spark.folder>spark-2.1.0.3.0.0.0-624-bin-3.0.0.3.0.0.0-624</spark.folder>
     <resmonitor.install.dir>
       /usr/lib/python2.6/site-packages/resource_monitoring
     </resmonitor.install.dir>

+ 1 - 1
ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-admanager-config.xml

@@ -109,7 +109,7 @@
         sparkHome: {{ams_admanager_lib_dir}}/spark
         jarfile: {{ams_admanager_lib_dir}}/ambari-metrics-anomaly-detection-service.jar
 
-      detection
+      detection:
         trend: false
         kafkaServers: localhost:6667
         kafkaTopic: ambari-metrics-ad