|
@@ -72,8 +72,8 @@ public class MetricsSystemImpl implements MetricsSystem {
|
|
|
private final Map<String, MetricsSinkAdapter> sinks;
|
|
|
private final List<Callback> callbacks;
|
|
|
private final MetricsBuilderImpl metricsBuilder;
|
|
|
- private final MetricMutableStat sampleStat =
|
|
|
- new MetricMutableStat("sample", "sampling stats", "ops", "time", true);
|
|
|
+ private final MetricMutableStat snapshotStat =
|
|
|
+ new MetricMutableStat("snapshot", "snapshot stats", "ops", "time", true);
|
|
|
private final MetricMutableStat publishStat =
|
|
|
new MetricMutableStat("publish", "publishing stats", "ops", "time", true);
|
|
|
private final MetricMutableCounterLong dropStat =
|
|
@@ -175,8 +175,8 @@ public class MetricsSystemImpl implements MetricsSystem {
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public synchronized <T extends MetricsSource> T register(final String name,
|
|
|
- final String desc, final T source) {
|
|
|
+ public synchronized <T extends MetricsSource>
|
|
|
+ T register(final String name, final String desc, final T source) {
|
|
|
if (monitoring) {
|
|
|
registerSource(name, desc, source);
|
|
|
}
|
|
@@ -189,7 +189,6 @@ public class MetricsSystemImpl implements MetricsSystem {
|
|
|
}
|
|
|
|
|
|
});
|
|
|
- LOG.debug("Registered source "+ name);
|
|
|
return source;
|
|
|
}
|
|
|
|
|
@@ -209,11 +208,12 @@ public class MetricsSystemImpl implements MetricsSystem {
|
|
|
injectedTags, period, config.subset(SOURCE_KEY));
|
|
|
sources.put(name, sa);
|
|
|
sa.start();
|
|
|
+ LOG.debug("Registered source "+ name);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public synchronized <T extends MetricsSink> T register(final String name,
|
|
|
- final String description, final T sink) {
|
|
|
+ public synchronized <T extends MetricsSink>
|
|
|
+ T register(final String name, final String description, final T sink) {
|
|
|
if (config != null) {
|
|
|
registerSink(name, description, sink);
|
|
|
}
|
|
@@ -226,7 +226,6 @@ public class MetricsSystemImpl implements MetricsSystem {
|
|
|
}
|
|
|
|
|
|
});
|
|
|
- LOG.debug("Registered sink "+ name);
|
|
|
return sink;
|
|
|
}
|
|
|
|
|
@@ -243,6 +242,7 @@ public class MetricsSystemImpl implements MetricsSystem {
|
|
|
: newSink(name, desc, sink, config.subset(SINK_KEY));
|
|
|
sinks.put(name, sa);
|
|
|
sa.start();
|
|
|
+ LOG.debug("Registered sink "+ name);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -265,8 +265,8 @@ public class MetricsSystemImpl implements MetricsSystem {
|
|
|
|
|
|
@Override
|
|
|
public synchronized void refreshMBeans() {
|
|
|
- for (Entry<String, MetricsSourceAdapter> entry : sources.entrySet()) {
|
|
|
- entry.getValue().refreshMBean();
|
|
|
+ for (MetricsSourceAdapter sa : sources.values()) {
|
|
|
+ sa.refreshMBean();
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -300,43 +300,43 @@ public class MetricsSystemImpl implements MetricsSystem {
|
|
|
}
|
|
|
}
|
|
|
}, millis, millis);
|
|
|
- LOG.info("Scheduled sampling period at "+ period +" second(s).");
|
|
|
+ LOG.info("Scheduled snapshot period at "+ period +" second(s).");
|
|
|
}
|
|
|
|
|
|
synchronized void onTimerEvent() {
|
|
|
logicalTime += period;
|
|
|
if (sinks.size() > 0) {
|
|
|
- publishMetrics(sampleMetrics());
|
|
|
+ publishMetrics(snapshotMetrics());
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Sample all the sources for a snapshot of metrics/tags
|
|
|
+ * snapshot all the sources for a snapshot of metrics/tags
|
|
|
* @return the metrics buffer containing the snapshot
|
|
|
*/
|
|
|
- synchronized MetricsBuffer sampleMetrics() {
|
|
|
+ synchronized MetricsBuffer snapshotMetrics() {
|
|
|
metricsBuilder.clear();
|
|
|
MetricsBufferBuilder bufferBuilder = new MetricsBufferBuilder();
|
|
|
|
|
|
for (Entry<String, MetricsSourceAdapter> entry : sources.entrySet()) {
|
|
|
if (sourceFilter == null || sourceFilter.accepts(entry.getKey())) {
|
|
|
- sampleMetrics(entry.getValue(), bufferBuilder);
|
|
|
+ snapshotMetrics(entry.getValue(), bufferBuilder);
|
|
|
}
|
|
|
}
|
|
|
if (publishSelfMetrics) {
|
|
|
- sampleMetrics(sysSource, bufferBuilder);
|
|
|
+ snapshotMetrics(sysSource, bufferBuilder);
|
|
|
}
|
|
|
MetricsBuffer buffer = bufferBuilder.get();
|
|
|
return buffer;
|
|
|
}
|
|
|
|
|
|
- private void sampleMetrics(MetricsSourceAdapter sa,
|
|
|
- MetricsBufferBuilder bufferBuilder) {
|
|
|
+ private void snapshotMetrics(MetricsSourceAdapter sa,
|
|
|
+ MetricsBufferBuilder bufferBuilder) {
|
|
|
long startTime = System.currentTimeMillis();
|
|
|
bufferBuilder.add(sa.name(), sa.getMetrics(metricsBuilder, false));
|
|
|
metricsBuilder.clear();
|
|
|
- sampleStat.add(System.currentTimeMillis() - startTime);
|
|
|
- LOG.debug("Sampled source "+ sa.name());
|
|
|
+ snapshotStat.add(System.currentTimeMillis() - startTime);
|
|
|
+ LOG.debug("Snapshotted source "+ sa.name());
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -345,9 +345,9 @@ public class MetricsSystemImpl implements MetricsSystem {
|
|
|
*/
|
|
|
synchronized void publishMetrics(MetricsBuffer buffer) {
|
|
|
int dropped = 0;
|
|
|
- for (Entry<String, MetricsSinkAdapter> entry : sinks.entrySet()) {
|
|
|
+ for (MetricsSinkAdapter sa : sinks.values()) {
|
|
|
long startTime = System.currentTimeMillis();
|
|
|
- dropped += entry.getValue().putMetrics(buffer, logicalTime) ? 0 : 1;
|
|
|
+ dropped += sa.putMetrics(buffer, logicalTime) ? 0 : 1;
|
|
|
publishStat.add(System.currentTimeMillis() - startTime);
|
|
|
}
|
|
|
dropStat.incr(dropped);
|
|
@@ -369,6 +369,7 @@ public class MetricsSystemImpl implements MetricsSystem {
|
|
|
sa.source().getClass().getName() +")");
|
|
|
sa.stop();
|
|
|
}
|
|
|
+ sysSource.stop();
|
|
|
sources.clear();
|
|
|
}
|
|
|
|
|
@@ -395,26 +396,23 @@ public class MetricsSystemImpl implements MetricsSystem {
|
|
|
}
|
|
|
|
|
|
private synchronized void configureSinks() {
|
|
|
- Map<String, MetricsConfig> confs = config.getInstanceConfigs(SINK_KEY);
|
|
|
+ sinkConfigs = config.getInstanceConfigs(SINK_KEY);
|
|
|
int confPeriod = 0;
|
|
|
- for (Entry<String, MetricsConfig> entry : confs.entrySet()) {
|
|
|
+ for (Entry<String, MetricsConfig> entry : sinkConfigs.entrySet()) {
|
|
|
MetricsConfig conf = entry.getValue();
|
|
|
int sinkPeriod = conf.getInt(PERIOD_KEY, PERIOD_DEFAULT);
|
|
|
confPeriod = confPeriod == 0 ? sinkPeriod
|
|
|
: MathUtils.gcd(confPeriod, sinkPeriod);
|
|
|
- String sinkName = conf.getString(NAME_KEY);
|
|
|
- if (sinkName != null && !sinkName.isEmpty()) {
|
|
|
- // named config is for internally registered sinks
|
|
|
- sinkConfigs.put(sinkName, conf);
|
|
|
- }
|
|
|
- else {
|
|
|
- sinkName = "sink"+ entry.getKey();
|
|
|
- }
|
|
|
+ String sinkName = entry.getKey();
|
|
|
+ LOG.debug("sink "+ sinkName +" config:\n"+ conf);
|
|
|
try {
|
|
|
MetricsSinkAdapter sa = newSink(sinkName,
|
|
|
conf.getString(DESC_KEY, sinkName), conf);
|
|
|
- sa.start();
|
|
|
- sinks.put(sinkName, sa);
|
|
|
+ // we allow config of later registered sinks
|
|
|
+ if (sa != null) {
|
|
|
+ sa.start();
|
|
|
+ sinks.put(sinkName, sa);
|
|
|
+ }
|
|
|
}
|
|
|
catch (Exception e) {
|
|
|
LOG.warn("Error creating "+ sinkName, e);
|
|
@@ -427,9 +425,9 @@ public class MetricsSystemImpl implements MetricsSystem {
|
|
|
static MetricsSinkAdapter newSink(String name, String desc, MetricsSink sink,
|
|
|
MetricsConfig conf) {
|
|
|
return new MetricsSinkAdapter(name, desc, sink, conf.getString(CONTEXT_KEY),
|
|
|
- (MetricsFilter) conf.getPlugin(SOURCE_FILTER_KEY),
|
|
|
- (MetricsFilter) conf.getPlugin(RECORD_FILTER_KEY),
|
|
|
- (MetricsFilter) conf.getPlugin(METRIC_FILTER_KEY),
|
|
|
+ conf.getFilter(SOURCE_FILTER_KEY),
|
|
|
+ conf.getFilter(RECORD_FILTER_KEY),
|
|
|
+ conf.getFilter(METRIC_FILTER_KEY),
|
|
|
conf.getInt(PERIOD_KEY, PERIOD_DEFAULT),
|
|
|
conf.getInt(QUEUE_CAPACITY_KEY, QUEUE_CAPACITY_DEFAULT),
|
|
|
conf.getInt(RETRY_DELAY_KEY, RETRY_DELAY_DEFAULT),
|
|
@@ -439,12 +437,13 @@ public class MetricsSystemImpl implements MetricsSystem {
|
|
|
|
|
|
static MetricsSinkAdapter newSink(String name, String desc,
|
|
|
MetricsConfig conf) {
|
|
|
- return newSink(name, desc, (MetricsSink) conf.getPlugin(""), conf);
|
|
|
+ MetricsSink sink = conf.getPlugin("");
|
|
|
+ if (sink == null) return null;
|
|
|
+ return newSink(name, desc, sink, conf);
|
|
|
}
|
|
|
|
|
|
private void configureSources() {
|
|
|
- sourceFilter =
|
|
|
- (MetricsFilter) config.getPlugin(PREFIX_DEFAULT + SOURCE_FILTER_KEY);
|
|
|
+ sourceFilter = config.getFilter(PREFIX_DEFAULT + SOURCE_FILTER_KEY);
|
|
|
Map<String, MetricsConfig> confs = config.getInstanceConfigs(SOURCE_KEY);
|
|
|
for (Entry<String, MetricsConfig> entry : confs.entrySet()) {
|
|
|
sourceConfigs.put(entry.getKey(), entry.getValue());
|
|
@@ -484,11 +483,11 @@ public class MetricsSystemImpl implements MetricsSystem {
|
|
|
.addGauge(NUM_SOURCES_KEY, NUM_SOURCES_DESC, numSources)
|
|
|
.addGauge(NUM_SINKS_KEY, NUM_SINKS_DESC, numSinks);
|
|
|
synchronized(MetricsSystemImpl.this) {
|
|
|
- for (Entry<String, MetricsSinkAdapter> entry : sinks.entrySet()) {
|
|
|
- entry.getValue().sample(rb, all);
|
|
|
+ for (MetricsSinkAdapter sa : sinks.values()) {
|
|
|
+ sa.snapshot(rb, all);
|
|
|
}
|
|
|
}
|
|
|
- sampleStat.snapshot(rb, all);
|
|
|
+ snapshotStat.snapshot(rb, all);
|
|
|
publishStat.snapshot(rb, all);
|
|
|
dropStat.snapshot(rb, all);
|
|
|
}
|