Selaa lähdekoodia

AMBARI-19661. Kafka Brokers go down after Wire Encryption. (dsen via swagle)

Siddharth Wagle 8 vuotta sitten
vanhempi
commit
501492c44a

+ 15 - 11
ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java

@@ -60,17 +60,21 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink
 
   private final static Log LOG = LogFactory.getLog(KafkaTimelineMetricsReporter.class);
 
-  private static final String TIMELINE_METRICS_SEND_INTERVAL_PROPERTY = "kafka.timeline.metrics.sendInterval";
-  private static final String TIMELINE_METRICS_MAX_ROW_CACHE_SIZE_PROPERTY = "kafka.timeline.metrics.maxRowCacheSize";
-  private static final String TIMELINE_HOSTS_PROPERTY = "kafka.timeline.metrics.hosts";
-  private static final String TIMELINE_PORT_PROPERTY = "kafka.timeline.metrics.port";
-  private static final String TIMELINE_PROTOCOL_PROPERTY = "kafka.timeline.metrics.protocol";
-  private static final String TIMELINE_REPORTER_ENABLED_PROPERTY = "kafka.timeline.metrics.reporter.enabled";
-  private static final String EXCLUDED_METRICS_PROPERTY = "external.kafka.metrics.exclude.prefix";
-  private static final String INCLUDED_METRICS_PROPERTY = "external.kafka.metrics.include.prefix";
+  private static final String TIMELINE_METRICS_KAFKA_PREFIX = "kafka.timeline.metrics.";
+  private static final String TIMELINE_METRICS_SEND_INTERVAL_PROPERTY = "sendInterval";
+  private static final String TIMELINE_METRICS_MAX_ROW_CACHE_SIZE_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + "maxRowCacheSize";
+  private static final String TIMELINE_HOSTS_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + "hosts";
+  private static final String TIMELINE_PORT_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + "port";
+  private static final String TIMELINE_PROTOCOL_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + "protocol";
+  private static final String TIMELINE_REPORTER_ENABLED_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + "reporter.enabled";
+  private static final String TIMELINE_METRICS_SSL_KEYSTORE_PATH_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + SSL_KEYSTORE_PATH_PROPERTY;
+  private static final String TIMELINE_METRICS_SSL_KEYSTORE_TYPE_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + SSL_KEYSTORE_TYPE_PROPERTY;
+  private static final String TIMELINE_METRICS_SSL_KEYSTORE_PASSWORD_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + SSL_KEYSTORE_PASSWORD_PROPERTY;
   private static final String TIMELINE_DEFAULT_HOST = "localhost";
   private static final String TIMELINE_DEFAULT_PORT = "6188";
   private static final String TIMELINE_DEFAULT_PROTOCOL = "http";
+  private static final String EXCLUDED_METRICS_PROPERTY = "external.kafka.metrics.exclude.prefix";
+  private static final String INCLUDED_METRICS_PROPERTY = "external.kafka.metrics.include.prefix";
 
   private volatile boolean initialized = false;
   private boolean running = false;
@@ -159,9 +163,9 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink
         setMetricsCache(new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval));
 
         if (metricCollectorProtocol.contains("https")) {
-          String trustStorePath = props.getString(SSL_KEYSTORE_PATH_PROPERTY).trim();
-          String trustStoreType = props.getString(SSL_KEYSTORE_TYPE_PROPERTY).trim();
-          String trustStorePwd = props.getString(SSL_KEYSTORE_PASSWORD_PROPERTY).trim();
+          String trustStorePath = props.getString(TIMELINE_METRICS_SSL_KEYSTORE_PATH_PROPERTY).trim();
+          String trustStoreType = props.getString(TIMELINE_METRICS_SSL_KEYSTORE_TYPE_PROPERTY).trim();
+          String trustStorePwd = props.getString(TIMELINE_METRICS_SSL_KEYSTORE_PASSWORD_PROPERTY).trim();
           loadTruststore(trustStorePath, trustStoreType, trustStorePwd);
         }
 

+ 26 - 0
ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java

@@ -99,6 +99,32 @@ public class KafkaTimelineMetricsReporterTest {
     verifyAll();
   }
 
+  @Test
+  public void testReporterStartStopHttps() {
+    mockStatic(Metrics.class);
+    EasyMock.expect(Metrics.defaultRegistry()).andReturn(registry).times(2);
+    TimelineMetricsCache timelineMetricsCache = getTimelineMetricsCache(kafkaTimelineMetricsReporter);
+    kafkaTimelineMetricsReporter.setMetricsCache(timelineMetricsCache);
+    replay(Metrics.class, timelineMetricsCache);
+
+    Properties properties = new Properties();
+    properties.setProperty("zookeeper.connect", "localhost:2181");
+    properties.setProperty("kafka.timeline.metrics.sendInterval", "5900");
+    properties.setProperty("kafka.timeline.metrics.maxRowCacheSize", "10000");
+    properties.setProperty("kafka.timeline.metrics.hosts", "localhost:6188");
+    properties.setProperty("kafka.timeline.metrics.port", "6188");
+    properties.setProperty("kafka.timeline.metrics.reporter.enabled", "true");
+    properties.setProperty("external.kafka.metrics.exclude.prefix", "a.b.c");
+    properties.setProperty("external.kafka.metrics.include.prefix", "a.b.c.d");
+    properties.setProperty("kafka.timeline.metrics.protocol", "https");
+    properties.setProperty("kafka.timeline.metrics.truststore.path", "");
+    properties.setProperty("kafka.timeline.metrics.truststore.type", "");
+    properties.setProperty("kafka.timeline.metrics.truststore.password", "");
+    kafkaTimelineMetricsReporter.init(new VerifiableProperties(properties));
+    kafkaTimelineMetricsReporter.stopReporter();
+    verifyAll();
+  }
+
   @Test
   public void testMetricsExclusionPolicy() throws Exception {
     mockStatic(Metrics.class);