|
@@ -61,7 +61,7 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink
|
|
|
|
|
|
private static final String TIMELINE_METRICS_SEND_INTERVAL_PROPERTY = "kafka.timeline.metrics.sendInterval";
|
|
|
private static final String TIMELINE_METRICS_MAX_ROW_CACHE_SIZE_PROPERTY = "kafka.timeline.metrics.maxRowCacheSize";
|
|
|
- private static final String TIMELINE_HOST_PROPERTY = "kafka.timeline.metrics.host";
|
|
|
+ private static final String TIMELINE_HOSTS_PROPERTY = "kafka.timeline.metrics.hosts";
|
|
|
private static final String TIMELINE_PORT_PROPERTY = "kafka.timeline.metrics.port";
|
|
|
private static final String TIMELINE_PROTOCOL_PROPERTY = "kafka.timeline.metrics.protocol";
|
|
|
private static final String TIMELINE_REPORTER_ENABLED_PROPERTY = "kafka.timeline.metrics.reporter.enabled";
|
|
@@ -74,7 +74,6 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink
|
|
|
private volatile boolean initialized = false;
|
|
|
private boolean running = false;
|
|
|
private final Object lock = new Object();
|
|
|
- private String collectorUri;
|
|
|
private String hostname;
|
|
|
private String metricCollectorPort;
|
|
|
private String collectors;
|
|
@@ -147,17 +146,12 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink
|
|
|
int maxRowCacheSize = props.getInt(TIMELINE_METRICS_MAX_ROW_CACHE_SIZE_PROPERTY, MAX_RECS_PER_NAME_DEFAULT);
|
|
|
|
|
|
zookeeperQuorum = props.getString("zookeeper.connect");
|
|
|
- collectors = props.getString(TIMELINE_HOST_PROPERTY, TIMELINE_DEFAULT_HOST);
|
|
|
- metricCollectorProtocol = props.getString(TIMELINE_PROTOCOL_PROPERTY, TIMELINE_DEFAULT_PROTOCOL);
|
|
|
-
|
|
|
- String metricCollectorHost = props.getString(TIMELINE_HOST_PROPERTY, TIMELINE_DEFAULT_HOST);
|
|
|
metricCollectorPort = props.getString(TIMELINE_PORT_PROPERTY, TIMELINE_DEFAULT_PORT);
|
|
|
+ collectors = props.getString(TIMELINE_HOSTS_PROPERTY, TIMELINE_DEFAULT_HOST + ":" + metricCollectorPort);
|
|
|
+ metricCollectorProtocol = props.getString(TIMELINE_PROTOCOL_PROPERTY, TIMELINE_DEFAULT_PROTOCOL);
|
|
|
|
|
|
setMetricsCache(new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval));
|
|
|
|
|
|
- collectorUri = constructTimelineMetricUri(metricCollectorProtocol,
|
|
|
- metricCollectorHost, metricCollectorPort);
|
|
|
-
|
|
|
if (metricCollectorProtocol.contains("https")) {
|
|
|
String trustStorePath = props.getString(SSL_KEYSTORE_PATH_PROPERTY).trim();
|
|
|
String trustStoreType = props.getString(SSL_KEYSTORE_TYPE_PROPERTY).trim();
|
|
@@ -181,7 +175,6 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink
|
|
|
startReporter(metricsConfig.pollingIntervalSecs());
|
|
|
}
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("CollectorUri = " + collectorUri);
|
|
|
LOG.debug("MetricsSendInterval = " + metricsSendInterval);
|
|
|
LOG.debug("MaxRowCacheSize = " + maxRowCacheSize);
|
|
|
LOG.debug("Excluded metrics prefixes = " + excludedMetricsStr);
|