浏览代码

AMBARI-22607 : Design and implement an AD job scheduler. (avijayan)

Aravindan Vijayan 8 年之前
父节点
当前提交
2129b3963b
共有 26 个文件被更改,包括 807 次插入135 次删除
  1. 12 2
      ambari-metrics/ambari-metrics-anomaly-detection-service/conf/unix/config.yaml
  2. 15 3
      ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml
  3. 84 85
      ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/MetricSparkConsumer.java
  4. 4 2
      ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionApp.scala
  5. 10 1
      ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfig.scala
  6. 4 3
      ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppModule.scala
  7. 59 0
      ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/DetectionServiceConfiguration.scala
  8. 16 9
      ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/SparkConfiguration.scala
  9. 2 1
      ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixAnomalyStoreAccessor.scala
  10. 88 0
      ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/AdJobManager.scala
  11. 77 0
      ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/SparkApplicationRunner.scala
  12. 27 0
      ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/Subsystem.scala
  13. 121 0
      ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/pointintime/EmaSparkDriver.scala
  14. 116 0
      ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/pointintime/PointInTimeSubsystem.scala
  15. 0 20
      ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricSourceDefinition.scala
  16. 40 0
      ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/DetectionResource.scala
  17. 2 1
      ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/MetricDefinitionResource.scala
  18. 26 0
      ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/DetectionService.scala
  19. 56 0
      ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/DetectionServiceImpl.scala
  20. 8 2
      ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/MetricDefinitionService.scala
  21. 11 1
      ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/MetricDefinitionServiceImpl.scala
  22. 11 1
      ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/resources/config.yaml
  23. 4 1
      ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfigTest.scala
  24. 1 0
      ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionServiceTest.scala
  25. 11 1
      ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-admanager-config.xml
  26. 2 2
      ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py

+ 12 - 2
ambari-metrics/ambari-metrics-anomaly-detection-service/conf/unix/config.yaml

@@ -38,8 +38,18 @@ metricDefinitionDB:
   # raise an error as soon as it detects an internal corruption
   performParanoidChecks: false
   # Path to Level DB directory
-  dbDirPath: /tmp/ambari-metrics-anomaly-detection/db
+  dbDirPath: /var/lib/ambari-metrics-anomaly-detection/
 
 spark:
   mode: standalone
-  masterHostPort: localhost:7077
+  master: spark://localhost:7077
+  sparkHome: /usr/lib/ambari-metrics-anomaly-detection/spark
+  jarfile: /usr/lib/ambari-metrics-anomaly-detection/ambari-metrics-anomaly-detection-service.jar
+
+detection:
+  trend: false
+  kafkaServers: localhost:6667
+  kafkaTopic: ambari-metrics-ad
+  kafkaConsumerGroup: ambari-metrics-ad-group
+  ema-w: 0.9
+  ema-n: 3

+ 15 - 3
ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml

@@ -239,6 +239,11 @@
       <artifactId>spark-streaming_${scala.binary.version}</artifactId>
       <version>${spark.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-launcher_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_2.10</artifactId>
@@ -289,8 +294,8 @@
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-streaming-kafka_2.10</artifactId>
-      <version>1.6.3</version>
+      <artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.phoenix</groupId>
@@ -338,7 +343,7 @@
       <exclusions>
         <exclusion>
           <groupId>com.fasterxml.jackson.module</groupId>
-          <artifactId>jackson-module-scala_2.11</artifactId>
+          <artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
         </exclusion>
       </exclusions>
     </dependency>
@@ -524,5 +529,12 @@
       <version>1.8.4</version>
       <scope>test</scope>
     </dependency>
+    <!-- https://mvnrepository.com/artifact/org.mockito/mockito-all -->
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <version>1.9.5</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>

+ 84 - 85
ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/MetricSparkConsumer.java

@@ -32,7 +32,6 @@ import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.streaming.api.java.JavaPairDStream;
 import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.kafka.KafkaUtils;
 import scala.Tuple2;
 
 import java.io.FileInputStream;
@@ -152,90 +151,90 @@ public class MetricSparkConsumer {
     Broadcast<Set<Pattern>> includePatternBroadcast = jssc.sparkContext().broadcast(includeMetricPatterns);
     Broadcast<Set<String>> includedHostBroadcast = jssc.sparkContext().broadcast(includedHosts);
 
-    JavaPairReceiverInputDStream<String, String> messages =
-      KafkaUtils.createStream(jssc, zkQuorum, groupId, Collections.singletonMap(topicName, numThreads));
-
-    //Convert JSON string to TimelineMetrics.
-    JavaDStream<TimelineMetrics> timelineMetricsStream = messages.map(new Function<Tuple2<String, String>, TimelineMetrics>() {
-      @Override
-      public TimelineMetrics call(Tuple2<String, String> message) throws Exception {
-        ObjectMapper mapper = new ObjectMapper();
-        TimelineMetrics metrics = mapper.readValue(message._2, TimelineMetrics.class);
-        return metrics;
-      }
-    });
-
-    timelineMetricsStream.print();
-
-    //Group TimelineMetric by AppId.
-    JavaPairDStream<String, TimelineMetrics> appMetricStream = timelineMetricsStream.mapToPair(
-      timelineMetrics -> timelineMetrics.getMetrics().isEmpty()  ?  new Tuple2<>("TEST", new TimelineMetrics()) : new Tuple2<String, TimelineMetrics>(timelineMetrics.getMetrics().get(0).getAppId(), timelineMetrics)
-    );
-
-    appMetricStream.print();
-
-    //Filter AppIds that are not needed.
-    JavaPairDStream<String, TimelineMetrics> filteredAppMetricStream = appMetricStream.filter(new Function<Tuple2<String, TimelineMetrics>, Boolean>() {
-      @Override
-      public Boolean call(Tuple2<String, TimelineMetrics> appMetricTuple) throws Exception {
-        return appIds.contains(appMetricTuple._1);
-      }
-    });
-
-    filteredAppMetricStream.print();
-
-    filteredAppMetricStream.foreachRDD(rdd -> {
-      rdd.foreach(
-        tuple2 -> {
-          long currentTime = System.currentTimeMillis();
-          EmaTechnique ema = emaTechniqueBroadcast.getValue();
-          if (currentTime > pitStartTime + pitTestInterval) {
-            LOG.info("Running Tukeys....");
-            pointInTimeADSystemBroadcast.getValue().runTukeysAndRefineEma(ema, currentTime);
-            pitStartTime = pitStartTime + pitTestInterval;
-          }
-
-          if (currentTime > ksStartTime + ksTestInterval) {
-            LOG.info("Running KS Test....");
-            trendADSystemBroadcast.getValue().runKSTest(currentTime, trendMetrics);
-            ksStartTime = ksStartTime + ksTestInterval;
-          }
-
-          if (currentTime > hdevStartTime + hsdevInterval) {
-            LOG.info("Running HSdev Test....");
-            trendADSystemBroadcast.getValue().runHsdevMethod();
-            hdevStartTime = hdevStartTime + hsdevInterval;
-          }
-
-          TimelineMetrics metrics = tuple2._2();
-          for (TimelineMetric timelineMetric : metrics.getMetrics()) {
-
-            boolean includeHost = includedHostBroadcast.getValue().contains(timelineMetric.getHostName());
-            boolean includeMetric = false;
-            if (includeHost) {
-              if (includePatternBroadcast.getValue().isEmpty()) {
-                includeMetric = true;
-              }
-              for (Pattern p : includePatternBroadcast.getValue()) {
-                Matcher m = p.matcher(timelineMetric.getMetricName());
-                if (m.find()) {
-                  includeMetric = true;
-                }
-              }
-            }
-
-            if (includeMetric) {
-              trendMetrics.add(new TrendMetric(timelineMetric.getMetricName(), timelineMetric.getAppId(),
-                timelineMetric.getHostName()));
-              List<MetricAnomaly> anomalies = ema.test(timelineMetric);
-              metricsCollectorInterfaceBroadcast.getValue().publish(anomalies);
-            }
-          }
-        });
-    });
-
-    jssc.start();
-    jssc.awaitTermination();
+//    JavaPairReceiverInputDStream<String, String> messages =
+//      KafkaUtils.createStream(jssc, zkQuorum, groupId, Collections.singletonMap(topicName, numThreads));
+//
+//    //Convert JSON string to TimelineMetrics.
+//    JavaDStream<TimelineMetrics> timelineMetricsStream = messages.map(new Function<Tuple2<String, String>, TimelineMetrics>() {
+//      @Override
+//      public TimelineMetrics call(Tuple2<String, String> message) throws Exception {
+//        ObjectMapper mapper = new ObjectMapper();
+//        TimelineMetrics metrics = mapper.readValue(message._2, TimelineMetrics.class);
+//        return metrics;
+//      }
+//    });
+//
+//    timelineMetricsStream.print();
+//
+//    //Group TimelineMetric by AppId.
+//    JavaPairDStream<String, TimelineMetrics> appMetricStream = timelineMetricsStream.mapToPair(
+//      timelineMetrics -> timelineMetrics.getMetrics().isEmpty()  ?  new Tuple2<>("TEST", new TimelineMetrics()) : new Tuple2<String, TimelineMetrics>(timelineMetrics.getMetrics().get(0).getAppId(), timelineMetrics)
+//    );
+//
+//    appMetricStream.print();
+//
+//    //Filter AppIds that are not needed.
+//    JavaPairDStream<String, TimelineMetrics> filteredAppMetricStream = appMetricStream.filter(new Function<Tuple2<String, TimelineMetrics>, Boolean>() {
+//      @Override
+//      public Boolean call(Tuple2<String, TimelineMetrics> appMetricTuple) throws Exception {
+//        return appIds.contains(appMetricTuple._1);
+//      }
+//    });
+//
+//    filteredAppMetricStream.print();
+//
+//    filteredAppMetricStream.foreachRDD(rdd -> {
+//      rdd.foreach(
+//        tuple2 -> {
+//          long currentTime = System.currentTimeMillis();
+//          EmaTechnique ema = emaTechniqueBroadcast.getValue();
+//          if (currentTime > pitStartTime + pitTestInterval) {
+//            LOG.info("Running Tukeys....");
+//            pointInTimeADSystemBroadcast.getValue().runTukeysAndRefineEma(ema, currentTime);
+//            pitStartTime = pitStartTime + pitTestInterval;
+//          }
+//
+//          if (currentTime > ksStartTime + ksTestInterval) {
+//            LOG.info("Running KS Test....");
+//            trendADSystemBroadcast.getValue().runKSTest(currentTime, trendMetrics);
+//            ksStartTime = ksStartTime + ksTestInterval;
+//          }
+//
+//          if (currentTime > hdevStartTime + hsdevInterval) {
+//            LOG.info("Running HSdev Test....");
+//            trendADSystemBroadcast.getValue().runHsdevMethod();
+//            hdevStartTime = hdevStartTime + hsdevInterval;
+//          }
+//
+//          TimelineMetrics metrics = tuple2._2();
+//          for (TimelineMetric timelineMetric : metrics.getMetrics()) {
+//
+//            boolean includeHost = includedHostBroadcast.getValue().contains(timelineMetric.getHostName());
+//            boolean includeMetric = false;
+//            if (includeHost) {
+//              if (includePatternBroadcast.getValue().isEmpty()) {
+//                includeMetric = true;
+//              }
+//              for (Pattern p : includePatternBroadcast.getValue()) {
+//                Matcher m = p.matcher(timelineMetric.getMetricName());
+//                if (m.find()) {
+//                  includeMetric = true;
+//                }
+//              }
+//            }
+//
+//            if (includeMetric) {
+//              trendMetrics.add(new TrendMetric(timelineMetric.getMetricName(), timelineMetric.getAppId(),
+//                timelineMetric.getHostName()));
+//              List<MetricAnomaly> anomalies = ema.test(timelineMetric);
+//              metricsCollectorInterfaceBroadcast.getValue().publish(anomalies);
+//            }
+//          }
+//        });
+//    });
+//
+//    jssc.start();
+//    jssc.awaitTermination();
   }
 }
 

+ 4 - 2
ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionApp.scala

@@ -22,8 +22,7 @@ import javax.ws.rs.container.{ContainerRequestFilter, ContainerResponseFilter}
 
 import org.apache.ambari.metrics.adservice.app.GuiceInjector.{withInjector, wrap}
 import org.apache.ambari.metrics.adservice.db.{AdAnomalyStoreAccessor, MetadataDatasource}
-import org.apache.ambari.metrics.adservice.metadata.MetricDefinitionService
-import org.apache.ambari.metrics.adservice.service.ADQueryService
+import org.apache.ambari.metrics.adservice.service.{ADQueryService, MetricDefinitionService, DetectionService}
 import org.glassfish.jersey.filter.LoggingFilter
 
 import com.codahale.metrics.health.HealthCheck
@@ -53,6 +52,9 @@ class AnomalyDetectionApp extends Application[AnomalyDetectionAppConfig] {
       injector.getInstance(classOf[MetadataDatasource]).initialize
       injector.getInstance(classOf[MetricDefinitionService]).initialize
       injector.getInstance(classOf[ADQueryService]).initialize
+      injector.getInstance(classOf[DetectionService]).initialize
+
+      env.lifecycle().manage(injector.getInstance(classOf[DetectionService]))
     }
     env.jersey.register(jacksonJaxbJsonProvider)
     env.jersey.register(new LoggingFilter)

+ 10 - 1
ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfig.scala

@@ -20,7 +20,7 @@ package org.apache.ambari.metrics.adservice.app
 
 import javax.validation.Valid
 
-import org.apache.ambari.metrics.adservice.configuration.{HBaseConfiguration, _}
+import org.apache.ambari.metrics.adservice.configuration.{DetectionServiceConfiguration, HBaseConfiguration, _}
 
 import com.fasterxml.jackson.annotation.{JsonIgnore, JsonIgnoreProperties, JsonProperty}
 
@@ -59,6 +59,12 @@ class AnomalyDetectionAppConfig extends Configuration {
   @Valid
   private val sparkConfiguration = new SparkConfiguration
 
+  /**
+    * Detection Service configurations
+    */
+  @Valid
+  private val detectionServiceConfiguration = new DetectionServiceConfiguration
+
   /*
    AMS HBase Conf
     */
@@ -86,4 +92,7 @@ class AnomalyDetectionAppConfig extends Configuration {
   @JsonProperty("spark")
   def getSparkConfiguration: SparkConfiguration = sparkConfiguration
 
+  @JsonProperty("detection")
+  def getDetectionServiceConfiguration: DetectionServiceConfiguration = detectionServiceConfiguration
+
 }

+ 4 - 3
ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppModule.scala

@@ -19,9 +19,8 @@ package org.apache.ambari.metrics.adservice.app
 
 import org.apache.ambari.metrics.adservice.db._
 import org.apache.ambari.metrics.adservice.leveldb.LevelDBDataSource
-import org.apache.ambari.metrics.adservice.metadata.{MetricDefinitionService, MetricDefinitionServiceImpl}
-import org.apache.ambari.metrics.adservice.resource.{AnomalyResource, MetricDefinitionResource, RootResource}
-import org.apache.ambari.metrics.adservice.service.{ADQueryService, ADQueryServiceImpl}
+import org.apache.ambari.metrics.adservice.resource.{AnomalyResource, DetectionResource, MetricDefinitionResource, RootResource}
+import org.apache.ambari.metrics.adservice.service._
 
 import com.codahale.metrics.health.HealthCheck
 import com.google.inject.AbstractModule
@@ -37,11 +36,13 @@ class AnomalyDetectionAppModule(config: AnomalyDetectionAppConfig, env: Environm
     healthCheckBinder.addBinding().to(classOf[DefaultHealthCheck])
     bind(classOf[AnomalyResource])
     bind(classOf[MetricDefinitionResource])
+    bind(classOf[DetectionResource])
     bind(classOf[RootResource])
     bind(classOf[AdMetadataStoreAccessor]).to(classOf[AdMetadataStoreAccessorImpl])
     bind(classOf[ADQueryService]).to(classOf[ADQueryServiceImpl])
     bind(classOf[MetricDefinitionService]).to(classOf[MetricDefinitionServiceImpl])
     bind(classOf[MetadataDatasource]).to(classOf[LevelDBDataSource])
     bind(classOf[AdAnomalyStoreAccessor]).to(classOf[PhoenixAnomalyStoreAccessor])
+    bind(classOf[DetectionService]).to(classOf[DetectionServiceImpl])
   }
 }

+ 59 - 0
ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/DetectionServiceConfiguration.scala

@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.configuration
+
+import com.fasterxml.jackson.annotation.JsonProperty
+
+class DetectionServiceConfiguration {
+
+  @JsonProperty("pointInTime")
+  private val pointInTime: Boolean = true
+
+  @JsonProperty("trend")
+  private val trend: Boolean = true
+
+  @JsonProperty("kafkaServers")
+  private val kafkaServers: String = null
+
+  @JsonProperty("kafkaTopic")
+  private val kafkaTopic: String = "ambari-metrics-ad"
+
+  @JsonProperty("kafkaConsumerGroup")
+  private val kafkaConsumerGroup: String = "ambari-metrics-ad-group"
+
+  @JsonProperty("ema-w")
+  private val emaW: String = "0.9"
+
+  @JsonProperty("ema-n")
+  private val emaN: String = "3"
+
+  def isPointInTimeSubsystemEnabled: Boolean = pointInTime
+
+  def isTrendSubsystemEnabled: Boolean = trend
+
+  def getKafkaServers : String = kafkaServers
+
+  def getKafkaTopic : String = kafkaTopic
+
+  def getKafkaConsumerGroup : String = kafkaConsumerGroup
+
+  def getEmaW : String = emaW
+
+  def getEmaN : String = emaN
+
+}

+ 16 - 9
ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/SparkConfiguration.scala

@@ -18,22 +18,29 @@
 
 package org.apache.ambari.metrics.adservice.configuration
 
-import javax.validation.constraints.NotNull
-
 import com.fasterxml.jackson.annotation.JsonProperty
 
 class SparkConfiguration {
 
-  @NotNull
-  private var mode: String = _
+  @JsonProperty("mode")
+  private val mode: String = "standalone"
+
+  @JsonProperty("master")
+  private val master: String = "spark://localhost:7077"
+
+  @JsonProperty("sparkHome")
+  private val sparkHome: String = "/usr/lib/ambari-metrics-anomaly-detection/spark"
+
+  @JsonProperty("jarfile")
+  private val jarfile: String = "/usr/lib/ambari-metrics-anomaly-detection/ambari-metrics-anomaly-detection-service.jar"
 
-  @NotNull
-  private var masterHostPort: String = _
 
-  @JsonProperty
   def getMode: String = mode
 
-  @JsonProperty
-  def getMasterHostPort: String = masterHostPort
+  def getMaster: String = master
+
+  def getSparkHome: String = sparkHome
+
+  def getJarFile: String = jarfile
 
 }

+ 2 - 1
ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixAnomalyStoreAccessor.scala

@@ -22,10 +22,11 @@ import java.util.concurrent.TimeUnit.SECONDS
 
 import org.apache.ambari.metrics.adservice.app.AnomalyDetectionAppConfig
 import org.apache.ambari.metrics.adservice.configuration.HBaseConfiguration
-import org.apache.ambari.metrics.adservice.metadata.{MetricDefinitionService, MetricKey}
+import org.apache.ambari.metrics.adservice.metadata.MetricKey
 import org.apache.ambari.metrics.adservice.model.AnomalyDetectionMethod.AnomalyDetectionMethod
 import org.apache.ambari.metrics.adservice.model.AnomalyType.AnomalyType
 import org.apache.ambari.metrics.adservice.model._
+import org.apache.ambari.metrics.adservice.service.MetricDefinitionService
 import org.apache.hadoop.hbase.util.RetryCounterFactory
 import org.slf4j.{Logger, LoggerFactory}
 

+ 88 - 0
ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/AdJobManager.scala

@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.detection
+
+import org.slf4j.{Logger, LoggerFactory}
+import org.apache.ambari.metrics.adservice.app.AnomalyDetectionAppConfig
+import org.apache.ambari.metrics.adservice.detection.pointintime.PointInTimeSubsystem
+
+/**
+  * Class to start Anomaly detection jobs on spark.
+   */
+class AdJobManager{
+
+  val LOG : Logger = LoggerFactory.getLogger(classOf[AdJobManager])
+
+  var config: AnomalyDetectionAppConfig = _
+  var sparkApplicationRunner: SparkApplicationRunner = _
+
+  val configuredSubsystems: scala.collection.mutable.Map[String, Subsystem] = scala.collection.mutable.Map()
+  var isInitialized : Boolean = false
+
+  def this (config: AnomalyDetectionAppConfig) = {
+    this ()
+    this.config = config
+    this.sparkApplicationRunner = new SparkApplicationRunner(config.getSparkConfiguration)
+  }
+
+  /**
+    * Initialize subsystems
+    */
+  def initializeSubsystems() : Unit = {
+    if (config.getDetectionServiceConfiguration.isPointInTimeSubsystemEnabled) {
+      configuredSubsystems("pointintime") = new PointInTimeSubsystem(config.getDetectionServiceConfiguration, sparkApplicationRunner)
+    }
+  }
+
+
+  /**
+    * Start AD jobs.
+    */
+  def startAdJobs() : Unit = {
+    if (!isInitialized) {
+      initializeSubsystems()
+      isInitialized = true
+    }
+
+    for (subsystem <- configuredSubsystems.values) {
+      subsystem.start()
+    }
+  }
+
+  /**
+    * Stop AD jobs.
+    */
+  def stopAdJobs() : Unit = {
+    for (subsystem <- configuredSubsystems.values) {
+      subsystem.stop()
+    }
+  }
+
+  /**
+    * Get State of all the AD jobs.
+    * @return
+    */
+  def state() : Map[String, Map[String, String]] = {
+    val stateMap: scala.collection.mutable.Map[String, Map[String, String]] = scala.collection.mutable.Map()
+
+    for ((subsystemName, subsystem) <- configuredSubsystems) {
+      stateMap(subsystemName) = subsystem.state
+    }
+    stateMap.toMap
+  }
+}

+ 77 - 0
ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/SparkApplicationRunner.scala

@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.detection
+
+import org.apache.ambari.metrics.adservice.configuration.SparkConfiguration
+import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}
+import org.apache.spark.launcher.SparkAppHandle.State
+
+/**
+  * Class to run Spark jobs given a set of arguments
+  */
+class SparkApplicationRunner{
+
+  var config: SparkConfiguration = _
+  val env: java.util.HashMap[String, String] = new java.util.HashMap()
+  env.put("SPARK_PRINT_LAUNCH_COMMAND", "1")
+  val sparkArgs: Map[String, String] = Map.empty
+
+  def this(config: SparkConfiguration) = {
+    this()
+    this.config = config
+  }
+
+  /**
+    * Run Spark Job.
+    * @param appName Name of the application
+    * @param appClass Application Class
+    * @param appArgs Command Line args to be submitted to Spark.
+    * @return Handle to the Spark job.
+    */
+  def runSparkJob(appName: String,
+                  appClass: String,
+                  appArgs: Iterable[String]): SparkAppHandle = {
+
+    val launcher: SparkLauncher = new SparkLauncher(env)
+      .setAppName(appName)
+      .setSparkHome(config.getSparkHome)
+      .setAppResource(config.getJarFile)
+      .setMainClass(appClass)
+      .setMaster(config.getMaster)
+
+    for (arg <- appArgs) {
+      launcher.addAppArgs(arg)
+    }
+
+    for ((name,value) <- sparkArgs) {
+      launcher.addSparkArg(name, value)
+    }
+
+    val handle: SparkAppHandle = launcher.startApplication()
+    handle
+  }
+
+  /**
+    * Helper method to check if a Spark job is running.
+    * @param handle
+    * @return
+    */
+  def isRunning(handle: SparkAppHandle): Boolean = {
+    handle.getState.equals(State.CONNECTED) || handle.getState.equals(State.SUBMITTED) || handle.getState.equals(State.RUNNING)
+  }
+}

+ 27 - 0
ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/Subsystem.scala

@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.detection
+
+trait Subsystem {
+
+  def start() : Unit
+
+  def stop(): Unit
+
+  def state: Map[String, String]
+}

+ 121 - 0
ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/pointintime/EmaSparkDriver.scala

@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.detection.pointintime
+
+import java.util
+
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics
+import org.apache.kafka.common.serialization.StringDeserializer
+import org.apache.spark.SparkConf
+import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
+import org.apache.spark.streaming.{Duration, StreamingContext}
+import org.slf4j.{Logger, LoggerFactory}
+
+import com.fasterxml.jackson.databind.ObjectMapper
+
+//TODO Work in Progress. Will be updated in the next patch.
+/**
+  * EMA Spark streaming driver application.
+  * Input :
+  *   EMA algorithm input - w & n.
+  *   Kafka brokers
+  *   Kafka Topic and group name
+  *
+  */
+object EmaSparkDriver {
+
+//  @Inject
+//  var metricDefinitionService : MetricDefinitionService = _
+
+  val LOG : Logger = LoggerFactory.getLogger("EmaSparkDriver")
+
+  def main(args: Array[String]): Unit = {
+
+    val emaW = args(0)
+    val emaN = args(1)
+    val kafkaServers = args(2)
+    val kafkaTopicName = args(3)
+    val kafkaGroupName = args(4)
+
+    val sparkConf = new SparkConf().setAppName("EmaSparkDriver")
+
+
+    //Instantiate Kafka stream reader
+    val streamingContext = new StreamingContext(sparkConf, Duration(10000))
+
+    val kafkaParams: java.util.HashMap[String, Object] = new java.util.HashMap[String, Object]
+    kafkaParams.put("bootstrap.servers", kafkaServers)
+    kafkaParams.put("key.deserializer", classOf[StringDeserializer])
+    kafkaParams.put("value.deserializer", classOf[StringDeserializer])
+    kafkaParams.put("group.id", kafkaGroupName)
+    kafkaParams.put("auto.offset.reset", "latest")
+    kafkaParams.put("enable.auto.commit", false: java.lang.Boolean)
+
+    val kafkaStream =
+      KafkaUtils.createDirectStream(
+        streamingContext,
+        LocationStrategies.PreferConsistent,
+        ConsumerStrategies.Subscribe[String, String](
+          util.Arrays.asList(kafkaTopicName),
+          kafkaParams.asInstanceOf[java.util.Map[String, Object]]
+        )
+      )
+
+    kafkaStream.print()
+
+    var timelineMetricsStream = kafkaStream.map(message => {
+      val mapper = new ObjectMapper
+      val metrics = mapper.readValue(message.value, classOf[TimelineMetrics])
+      metrics
+    })
+    timelineMetricsStream.print()
+
+    var filteredAppMetricStream = timelineMetricsStream.map(timelineMetrics => {
+      val filteredMetrics : TimelineMetrics = timelineMetrics
+//      for (metric : TimelineMetric <- timelineMetrics.getMetrics.asScala) {
+//        val metricKey : MetricKey = MetricKey(
+//          metric.getMetricName,
+//          metric.getAppId,
+//          metric.getInstanceId,
+//          metric.getHostName,
+//          null)
+//
+//        if (metricKeys.value.apply(metricKey)) {
+//          filteredMetrics.addOrMergeTimelineMetric(metric)
+//        }
+//      }
+      filteredMetrics
+    })
+    filteredAppMetricStream.print()
+
+    filteredAppMetricStream.foreachRDD(rdd => {
+      rdd.foreach(item => {
+        val timelineMetrics : TimelineMetrics = item
+        LOG.info("Received Metric : " + timelineMetrics.getMetrics.get(0).getMetricName)
+//        for (timelineMetric <- timelineMetrics.getMetrics) {
+//          var anomalies = emaModel.test(timelineMetric)
+//          anomalyMetricPublisher.publish(anomalies)
+//        }
+      })
+    })
+
+    streamingContext.start()
+    streamingContext.awaitTermination()
+
+  }
+}

+ 116 - 0
ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/pointintime/PointInTimeSubsystem.scala

@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.detection.pointintime
+
+import org.apache.ambari.metrics.adservice.configuration.DetectionServiceConfiguration
+import org.apache.ambari.metrics.adservice.detection.{SparkApplicationRunner, Subsystem}
+import org.apache.spark.launcher.SparkAppHandle
+import org.slf4j.{Logger, LoggerFactory}
+
+import com.google.inject.Singleton
+
+@Singleton
+class PointInTimeSubsystem extends Subsystem{
+
+  val LOG : Logger = LoggerFactory.getLogger(classOf[PointInTimeSubsystem])
+
+  val appHandleMap: scala.collection.mutable.Map[String, SparkAppHandle] = scala.collection.mutable.Map()
+  var applicationRunner: SparkApplicationRunner = _
+  var config: DetectionServiceConfiguration = _
+
+  //EMA Stuff
+  val emaDriverClass = "org.apache.ambari.metrics.adservice.detection.pointintime.EmaSparkDriver"
+  val emaAppName = "Ema_Spark_Application"
+  val emaConfig: scala.collection.mutable.MutableList[String] = scala.collection.mutable.MutableList()
+
+  def this(config: DetectionServiceConfiguration, applicationRunner: SparkApplicationRunner) = {
+    this
+    this.applicationRunner = applicationRunner
+    this.config = config
+
+    //Initialize
+    initializeConfigs()
+  }
+
+  override def start(): Unit = {
+
+    LOG.info("Starting Point in Time AD jobs...")
+
+    if (!appHandleMap.contains(emaAppName) || !applicationRunner.isRunning(appHandleMap.apply(emaAppName))) {
+      LOG.info("Starting " + emaAppName)
+
+      if (config.getKafkaServers == null || config.getKafkaServers.isEmpty) {
+        LOG.error("Cannot run" + emaAppName + " without Kafka Servers config")
+      } else {
+        val args : scala.collection.mutable.MutableList[String] = scala.collection.mutable.MutableList()
+        val emaHandle: SparkAppHandle = applicationRunner.runSparkJob(emaAppName, emaDriverClass, emaConfig)
+        appHandleMap(emaAppName) = emaHandle
+
+        Thread.sleep(3000)
+
+        if (applicationRunner.isRunning(emaHandle)) {
+          LOG.info(emaAppName + " successfully started.")
+        } else {
+          LOG.error(emaAppName + " start failed.")
+        }
+      }
+    } else {
+      LOG.info(emaAppName + " already running. Moving ahead.")
+    }
+
+  }
+
+  override def stop(): Unit = {
+
+    LOG.info("Stopping Point in Time AD jobs...")
+    for ((app, handle) <- appHandleMap) {
+      handle.stop()
+    }
+    //Sleep for 3 seconds
+    Thread.sleep(3000)
+
+    for ((app, handle) <- appHandleMap) {
+      if (!applicationRunner.isRunning(handle)) {
+        LOG.info(app + " successfully stopped.")
+      }
+    }
+  }
+
+  override def state: Map[String, String] = {
+    val stateMap: scala.collection.mutable.Map[String, String] = scala.collection.mutable.Map()
+    for ((app, handle) <- appHandleMap) {
+      if (handle == null) {
+        stateMap(app) = null
+      } else {
+        stateMap(app) = handle.getState.toString
+      }
+    }
+    stateMap.toMap
+  }
+
+
+  private def initializeConfigs(): Unit = {
+    //EMA Configs
+    emaConfig.+=(config.getEmaW)
+    emaConfig.+=(config.getEmaN)
+    emaConfig.+=(config.getKafkaServers)
+    emaConfig.+=(config.getKafkaTopic)
+    emaConfig.+=(config.getKafkaConsumerGroup)
+  }
+
+}

+ 0 - 20
ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricSourceDefinition.scala

@@ -22,26 +22,6 @@ import javax.xml.bind.annotation.XmlRootElement
 import org.apache.ambari.metrics.adservice.metadata.MetricSourceDefinitionType.MetricSourceDefinitionType
 import org.apache.ambari.metrics.adservice.model.AnomalyType.AnomalyType
 
-/*
-{
- "definition-name": "host-memory",
- "app-id" : "HOST",
- "hosts" : [“c6401.ambari.apache.org”],
- "metric-definitions" : [
-   {
-       "metric-name": "mem_free",
-       "metric-description" : "Free memory on a Host.",
-       "troubleshooting-info" : "Sudden drop / hike in free memory on a host.",
-       "static-threshold" : 10,
-       “app-id” : “HOST”
-}   ],
-
- "related-definition-names" : ["host-cpu", “host-network”],
- “anomaly-detection-subsystems” : [“point-in-time”, “trend”]
-}
-*/
-
-
 @SerialVersionUID(10001L)
 @XmlRootElement
 class MetricSourceDefinition extends Serializable{

+ 40 - 0
ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/DetectionResource.scala

@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.resource
+
+import javax.ws.rs.core.MediaType.APPLICATION_JSON
+import javax.ws.rs.core.Response
+import javax.ws.rs.{GET, Path, Produces}
+
+import org.apache.ambari.metrics.adservice.service.DetectionService
+
+import com.google.inject.Inject
+
+@Path("/detection")
+class DetectionResource {
+
+  @Inject
+  var detectionService: DetectionService = _
+
+  @GET
+  @Produces(Array(APPLICATION_JSON))
+  @Path("/state")
+  def getState: Response = {
+    Response.ok.entity(detectionService.state()).build()
+  }
+}

+ 2 - 1
ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/MetricDefinitionResource.scala

@@ -21,7 +21,8 @@ import javax.ws.rs._
 import javax.ws.rs.core.MediaType.APPLICATION_JSON
 import javax.ws.rs.core.Response
 
-import org.apache.ambari.metrics.adservice.metadata.{MetricDefinitionService, MetricKey, MetricSourceDefinition}
+import org.apache.ambari.metrics.adservice.metadata.{MetricKey, MetricSourceDefinition}
+import org.apache.ambari.metrics.adservice.service.MetricDefinitionService
 import org.apache.commons.lang.StringUtils
 
 import com.google.inject.Inject

+ 26 - 0
ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/DetectionService.scala

@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.service
+
+import io.dropwizard.lifecycle.Managed
+
+trait DetectionService extends AbstractADService with Managed{
+
+  def state() : Map[String, Map[String, String]]
+
+}

+ 56 - 0
ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/DetectionServiceImpl.scala

@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.service
+
+import org.apache.ambari.metrics.adservice.app.AnomalyDetectionAppConfig
+import org.apache.ambari.metrics.adservice.detection.AdJobManager
+import org.slf4j.{Logger, LoggerFactory}
+
+import com.google.inject.{Inject, Singleton}
+
+@Singleton
+class DetectionServiceImpl extends DetectionService{
+
+  val LOG : Logger = LoggerFactory.getLogger(classOf[DetectionServiceImpl])
+
+  var adJobManager: AdJobManager = _
+  var config : AnomalyDetectionAppConfig = _
+
+  @Inject
+  def this (anomalyDetectionAppConfig: AnomalyDetectionAppConfig) = {
+    this ()
+    this.config = anomalyDetectionAppConfig
+  }
+
+  override def initialize(): Unit = {
+    this.adJobManager = new AdJobManager(config)
+    adJobManager.startAdJobs()
+  }
+
+  override def state(): Map[String, Map[String, String]] = {
+    adJobManager.state()
+  }
+
+  override def start(): Unit = {
+  }
+
+  override def stop(): Unit = {
+    LOG.info("Stop Detection Service hook invoked. Stopping AD Jobs")
+    adJobManager.stopAdJobs()
+  }
+}

+ 8 - 2
ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionService.scala → ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/MetricDefinitionService.scala

@@ -15,9 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.ambari.metrics.adservice.metadata
+package org.apache.ambari.metrics.adservice.service
 
-import org.apache.ambari.metrics.adservice.service.AbstractADService
+import org.apache.ambari.metrics.adservice.metadata.{MetricKey, MetricSourceDefinition}
 
 trait MetricDefinitionService extends AbstractADService{
 
@@ -75,4 +75,10 @@ trait MetricDefinitionService extends AbstractADService{
     */
   def getMetricKeys:  Map[String, Set[MetricKey]]
 
+  /**
+    * Return the set of metric keys.
+    * @return Set of metric keys.
+    */
+  def getMetricKeyList: Set[MetricKey]
+
 }

+ 11 - 1
ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionServiceImpl.scala → ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/MetricDefinitionServiceImpl.scala

@@ -15,10 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.ambari.metrics.adservice.metadata
+package org.apache.ambari.metrics.adservice.service
 
 import org.apache.ambari.metrics.adservice.app.AnomalyDetectionAppConfig
 import org.apache.ambari.metrics.adservice.db.AdMetadataStoreAccessor
+import org.apache.ambari.metrics.adservice.metadata._
 import org.slf4j.{Logger, LoggerFactory}
 
 import com.google.inject.{Inject, Singleton}
@@ -239,4 +240,13 @@ class MetricDefinitionServiceImpl extends MetricDefinitionService {
     }
     metricKeyMap.toMap
   }
+
+  /**
+    * Return the set of metric keys.
+    *
+    * @return Set of metric keys.
+    */
+  override def getMetricKeyList: Set[MetricKey] = {
+    metricKeys.toSet
+  }
 }

+ 11 - 1
ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/resources/config.yaml

@@ -32,4 +32,14 @@ metricDefinitionDB:
 
 spark:
   mode: standalone
-  masterHostPort: localhost:7077
+  master: spark://localhost:7077
+  sparkHome: /usr/lib/ambari-metrics-anomaly-detection/spark
+  jarfile: /usr/lib/ambari-metrics-anomaly-detection/ambari-metrics-anomaly-detection-service.jar
+
+detection:
+  trend: false
+  kafkaServers: localhost:6667
+  kafkaTopic: ambari-metrics-ad
+  kafkaConsumerGroup: ambari-metrics-ad-group
+  ema-w: 0.9
+  ema-n: 3

+ 4 - 1
ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfigTest.scala

@@ -60,7 +60,10 @@ class AnomalyDetectionAppConfigTest extends FunSuite {
     assert(!config.getMetricDefinitionDBConfiguration.getPerformParanoidChecks)
 
     assert(config.getSparkConfiguration.getMode.equals("standalone"))
-    assert(config.getSparkConfiguration.getMasterHostPort.equals("localhost:7077"))
+    assert(config.getSparkConfiguration.getMaster.equals("spark://localhost:7077"))
+
+    assert(config.getDetectionServiceConfiguration.isPointInTimeSubsystemEnabled)
+    assert(!config.getDetectionServiceConfiguration.isTrendSubsystemEnabled)
 
   }
 

+ 1 - 0
ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionServiceTest.scala

@@ -20,6 +20,7 @@ package org.apache.ambari.metrics.adservice.metadata
 
 import org.apache.ambari.metrics.adservice.app.AnomalyDetectionAppConfig
 import org.apache.ambari.metrics.adservice.db.AdMetadataStoreAccessor
+import org.apache.ambari.metrics.adservice.service.MetricDefinitionServiceImpl
 import org.easymock.EasyMock.{anyObject, expect, expectLastCall, replay}
 import org.scalatest.FunSuite
 import org.scalatest.easymock.EasyMockSugar

+ 11 - 1
ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-admanager-config.xml

@@ -105,7 +105,17 @@
 
       spark:
         mode: {{admanager_spark_op_mode}}
-        masterHostPort: {{admanager_spark_hostport}}
+        master: {{admanager_spark_master}}
+        sparkHome: {{ams_admanager_lib_dir}}/spark
+        jarfile: {{ams_admanager_lib_dir}}/ambari-metrics-anomaly-detection-service.jar
+
+      detection
+        trend: false
+        kafkaServers: localhost:6667
+        kafkaTopic: ambari-metrics-ad
+        kafkaConsumerGroup: ambari-metrics-ad-group
+        ema-w: 0.9
+        ema-n: 3
     </value>
     <value-attributes>
       <type>content</type>

+ 2 - 2
ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py

@@ -215,10 +215,10 @@ spark_worker_webui_port = default("/configurations/ams-admanager-spark-env/spark
 spark_daemon_memory = default("/configurations/ams-admanager-spark-env/spark_daemon_memory", 1024)
 
 if admanager_spark_op_mode == 'spark-on-yarn':
-  admanager_spark_hostport = hostname + ":" + spark_master_port #TODO : Fix for spark on yarn mode.
+  admanager_spark_master = "spark://" + hostname + ":" + spark_master_port #TODO : Fix for spark on yarn mode.
   ams_ad_standalone_spark_enabled = False
 else:
-  admanager_spark_hostport = hostname + ":" + spark_master_port
+  admanager_spark_master = "spark://" + hostname + ":" + spark_master_port
   ams_ad_standalone_spark_enabled = True
 
 if (('ams-admanager-log4j' in config['configurations']) and ('content' in config['configurations']['ams-admanager-log4j'])):