|
@@ -0,0 +1,565 @@
|
|
|
+/**
|
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one
|
|
|
+ * or more contributor license agreements. See the NOTICE file
|
|
|
+ * distributed with this work for additional information
|
|
|
+ * regarding copyright ownership. The ASF licenses this file
|
|
|
+ * to you under the Apache License, Version 2.0 (the
|
|
|
+ * "License"); you may not use this file except in compliance
|
|
|
+ * with the License. You may obtain a copy of the License at
|
|
|
+ *
|
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
|
+ *
|
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
+ * See the License for the specific language governing permissions and
|
|
|
+ * limitations under the License.
|
|
|
+ */
|
|
|
+
|
|
|
+package org.apache.hadoop.metrics2.impl;
|
|
|
+
|
|
|
+import java.io.StringWriter;
|
|
|
+import java.lang.reflect.InvocationHandler;
|
|
|
+import java.lang.reflect.Method;
|
|
|
+import java.lang.reflect.Proxy;
|
|
|
+import java.net.InetAddress;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.Map.Entry;
|
|
|
+import java.util.Timer;
|
|
|
+import java.util.TimerTask;
|
|
|
+import javax.management.ObjectName;
|
|
|
+
|
|
|
+import com.google.common.collect.Lists;
|
|
|
+import com.google.common.collect.Maps;
|
|
|
+import java.util.Locale;
|
|
|
+import static com.google.common.base.Preconditions.*;
|
|
|
+
|
|
|
+import org.apache.commons.configuration.PropertiesConfiguration;
|
|
|
+import org.apache.commons.logging.Log;
|
|
|
+import org.apache.commons.logging.LogFactory;
|
|
|
+import org.apache.commons.math.util.MathUtils;
|
|
|
+import org.apache.hadoop.classification.InterfaceAudience;
|
|
|
+import org.apache.hadoop.metrics2.MetricsInfo;
|
|
|
+import org.apache.hadoop.metrics2.MetricsCollector;
|
|
|
+import org.apache.hadoop.metrics2.MetricsException;
|
|
|
+import org.apache.hadoop.metrics2.MetricsFilter;
|
|
|
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
|
|
+import org.apache.hadoop.metrics2.MetricsSink;
|
|
|
+import org.apache.hadoop.metrics2.MetricsSource;
|
|
|
+import org.apache.hadoop.metrics2.MetricsSystem;
|
|
|
+import org.apache.hadoop.metrics2.MetricsTag;
|
|
|
+import org.apache.hadoop.metrics2.annotation.Metric;
|
|
|
+import org.apache.hadoop.metrics2.annotation.Metrics;
|
|
|
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
|
|
|
+import static org.apache.hadoop.metrics2.impl.MetricsConfig.*;
|
|
|
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
|
|
+import org.apache.hadoop.metrics2.lib.Interns;
|
|
|
+import org.apache.hadoop.metrics2.lib.MetricsAnnotations;
|
|
|
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
|
|
|
+import org.apache.hadoop.metrics2.lib.MetricsSourceBuilder;
|
|
|
+import org.apache.hadoop.metrics2.lib.MutableStat;
|
|
|
+import org.apache.hadoop.metrics2.util.MBeans;
|
|
|
+
|
|
|
+/**
|
|
|
+ * A base class for metrics system singletons
|
|
|
+ */
|
|
|
+@InterfaceAudience.Private
|
|
|
+@Metrics(context="metricssystem")
|
|
|
+public class MetricsSystemImpl extends MetricsSystem implements MetricsSource {
|
|
|
+
|
|
|
+ static final Log LOG = LogFactory.getLog(MetricsSystemImpl.class);
|
|
|
+ static final String MS_NAME = "MetricsSystem";
|
|
|
+ static final String MS_STATS_NAME = MS_NAME +",sub=Stats";
|
|
|
+ static final String MS_STATS_DESC = "Metrics system metrics";
|
|
|
+ static final String MS_CONTROL_NAME = MS_NAME +",sub=Control";
|
|
|
+ static final String MS_INIT_MODE_KEY = "hadoop.metrics.init.mode";
|
|
|
+
|
|
|
+ enum InitMode { NORMAL, STANDBY }
|
|
|
+
|
|
|
+ private final Map<String, MetricsSourceAdapter> sources;
|
|
|
+ private final Map<String, MetricsSource> allSources;
|
|
|
+ private final Map<String, MetricsSinkAdapter> sinks;
|
|
|
+ private final Map<String, MetricsSink> allSinks;
|
|
|
+ private final List<Callback> callbacks;
|
|
|
+ private final MetricsCollectorImpl collector;
|
|
|
+ private final MetricsRegistry registry = new MetricsRegistry(MS_NAME);
|
|
|
+ @Metric({"Snapshot", "Snapshot stats"}) MutableStat snapshotStat;
|
|
|
+ @Metric({"Publish", "Publishing stats"}) MutableStat publishStat;
|
|
|
+ @Metric("Dropped updates by all sinks") MutableCounterLong droppedPubAll;
|
|
|
+
|
|
|
+ private final List<MetricsTag> injectedTags;
|
|
|
+
|
|
|
+ // Things that are changed by init()/start()/stop()
|
|
|
+ private String prefix;
|
|
|
+ private MetricsFilter sourceFilter;
|
|
|
+ private MetricsConfig config;
|
|
|
+ private Map<String, MetricsConfig> sourceConfigs, sinkConfigs;
|
|
|
+ private boolean monitoring = false;
|
|
|
+ private Timer timer;
|
|
|
+ private int period; // seconds
|
|
|
+ private long logicalTime; // number of timer invocations * period
|
|
|
+ private ObjectName mbeanName;
|
|
|
+ private boolean publishSelfMetrics = true;
|
|
|
+ private MetricsSourceAdapter sysSource;
|
|
|
+ private int refCount = 0; // for mini cluster mode
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Construct the metrics system
|
|
|
+ * @param prefix for the system
|
|
|
+ */
|
|
|
+ public MetricsSystemImpl(String prefix) {
|
|
|
+ this.prefix = prefix;
|
|
|
+ allSources = Maps.newHashMap();
|
|
|
+ sources = Maps.newLinkedHashMap();
|
|
|
+ allSinks = Maps.newHashMap();
|
|
|
+ sinks = Maps.newLinkedHashMap();
|
|
|
+ sourceConfigs = Maps.newHashMap();
|
|
|
+ sinkConfigs = Maps.newHashMap();
|
|
|
+ callbacks = Lists.newArrayList();
|
|
|
+ injectedTags = Lists.newArrayList();
|
|
|
+ collector = new MetricsCollectorImpl();
|
|
|
+ if (prefix != null) {
|
|
|
+ // prefix could be null for default ctor, which requires init later
|
|
|
+ initSystemMBean();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Construct the system but not initializing (read config etc.) it.
|
|
|
+ */
|
|
|
+ public MetricsSystemImpl() {
|
|
|
+ this(null);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Initialized the metrics system with a prefix.
|
|
|
+ * @param prefix the system will look for configs with the prefix
|
|
|
+ * @return the metrics system object itself
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public synchronized MetricsSystem init(String prefix) {
|
|
|
+ if (monitoring && !DefaultMetricsSystem.inMiniClusterMode()) {
|
|
|
+ LOG.warn(this.prefix +" metrics system already initialized!");
|
|
|
+ return this;
|
|
|
+ }
|
|
|
+ this.prefix = checkNotNull(prefix, "prefix");
|
|
|
+ ++refCount;
|
|
|
+ if (monitoring) {
|
|
|
+ // in mini cluster mode
|
|
|
+ LOG.info(this.prefix +" metrics system started (again)");
|
|
|
+ return this;
|
|
|
+ }
|
|
|
+ switch (initMode()) {
|
|
|
+ case NORMAL:
|
|
|
+ try { start(); }
|
|
|
+ catch (MetricsConfigException e) {
|
|
|
+ // Usually because hadoop-metrics2.properties is missing
|
|
|
+ // We can always start the metrics system later via JMX.
|
|
|
+ LOG.warn("Metrics system not started: "+ e.getMessage());
|
|
|
+ LOG.debug("Stacktrace: ", e);
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ case STANDBY:
|
|
|
+ LOG.info(prefix +" metrics system started in standby mode");
|
|
|
+ }
|
|
|
+ initSystemMBean();
|
|
|
+ return this;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public synchronized void start() {
|
|
|
+ checkNotNull(prefix, "prefix");
|
|
|
+ if (monitoring) {
|
|
|
+ LOG.warn(prefix +" metrics system already started!",
|
|
|
+ new MetricsException("Illegal start"));
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ for (Callback cb : callbacks) cb.preStart();
|
|
|
+ configure(prefix);
|
|
|
+ startTimer();
|
|
|
+ monitoring = true;
|
|
|
+ LOG.info(prefix +" metrics system started");
|
|
|
+ for (Callback cb : callbacks) cb.postStart();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public synchronized void stop() {
|
|
|
+ if (!monitoring && !DefaultMetricsSystem.inMiniClusterMode()) {
|
|
|
+ LOG.warn(prefix +" metrics system not yet started!",
|
|
|
+ new MetricsException("Illegal stop"));
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ if (!monitoring) {
|
|
|
+ // in mini cluster mode
|
|
|
+ LOG.info(prefix +" metrics system stopped (again)");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ for (Callback cb : callbacks) cb.preStop();
|
|
|
+ LOG.info("Stopping "+ prefix +" metrics system...");
|
|
|
+ stopTimer();
|
|
|
+ stopSources();
|
|
|
+ stopSinks();
|
|
|
+ clearConfigs();
|
|
|
+ monitoring = false;
|
|
|
+ LOG.info(prefix +" metrics system stopped.");
|
|
|
+ for (Callback cb : callbacks) cb.postStop();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override public synchronized <T>
|
|
|
+ T register(String name, String desc, T source) {
|
|
|
+ MetricsSourceBuilder sb = MetricsAnnotations.newSourceBuilder(source);
|
|
|
+ final MetricsSource s = sb.build();
|
|
|
+ MetricsInfo si = sb.info();
|
|
|
+ String name2 = name == null ? si.name() : name;
|
|
|
+ final String finalDesc = desc == null ? si.description() : desc;
|
|
|
+ final String finalName = DefaultMetricsSystem.sourceName(name2);
|
|
|
+ checkArgument(!allSources.containsKey(finalName));
|
|
|
+ allSources.put(finalName, s);
|
|
|
+ LOG.debug(finalName +", "+ finalDesc);
|
|
|
+ if (monitoring) {
|
|
|
+ registerSource(finalName, finalDesc, s);
|
|
|
+ }
|
|
|
+ // We want to re-register the source to pick up new config when the
|
|
|
+ // metrics system restarts.
|
|
|
+ register(new AbstractCallback() {
|
|
|
+ @Override public void postStart() {
|
|
|
+ registerSource(finalName, finalDesc, s);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ return source;
|
|
|
+ }
|
|
|
+
|
|
|
+ synchronized
|
|
|
+ void registerSource(String name, String desc, MetricsSource source) {
|
|
|
+ checkNotNull(config, "config");
|
|
|
+ MetricsConfig conf = sourceConfigs.get(name);
|
|
|
+ MetricsSourceAdapter sa = conf != null
|
|
|
+ ? new MetricsSourceAdapter(prefix, name, desc, source,
|
|
|
+ injectedTags, period, conf)
|
|
|
+ : new MetricsSourceAdapter(prefix, name, desc, source,
|
|
|
+ injectedTags, period, config.subset(SOURCE_KEY));
|
|
|
+ sources.put(name, sa);
|
|
|
+ sa.start();
|
|
|
+ LOG.info("Registered source "+ name);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override public synchronized <T extends MetricsSink>
|
|
|
+ T register(final String name, final String description, final T sink) {
|
|
|
+ LOG.debug(name +", "+ description);
|
|
|
+ if (allSinks.containsKey(name)) {
|
|
|
+ LOG.warn("Sink "+ name +" already exists!");
|
|
|
+ return sink;
|
|
|
+ }
|
|
|
+ allSinks.put(name, sink);
|
|
|
+ if (config != null) {
|
|
|
+ registerSink(name, description, sink);
|
|
|
+ }
|
|
|
+ // We want to re-register the sink to pick up new config
|
|
|
+ // when the metrics system restarts.
|
|
|
+ register(new AbstractCallback() {
|
|
|
+ @Override public void postStart() {
|
|
|
+ register(name, description, sink);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ return sink;
|
|
|
+ }
|
|
|
+
|
|
|
+ synchronized void registerSink(String name, String desc, MetricsSink sink) {
|
|
|
+ checkNotNull(config, "config");
|
|
|
+ MetricsConfig conf = sinkConfigs.get(name);
|
|
|
+ MetricsSinkAdapter sa = conf != null
|
|
|
+ ? newSink(name, desc, sink, conf)
|
|
|
+ : newSink(name, desc, sink, config.subset(SINK_KEY));
|
|
|
+ sinks.put(name, sa);
|
|
|
+ sa.start();
|
|
|
+ LOG.info("Registered sink "+ name);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public synchronized void register(final Callback callback) {
|
|
|
+ callbacks.add((Callback) Proxy.newProxyInstance(
|
|
|
+ callback.getClass().getClassLoader(), new Class<?>[] { Callback.class },
|
|
|
+ new InvocationHandler() {
|
|
|
+ public Object invoke(Object proxy, Method method, Object[] args)
|
|
|
+ throws Throwable {
|
|
|
+ try {
|
|
|
+ return method.invoke(callback, args);
|
|
|
+ }
|
|
|
+ catch (Exception e) {
|
|
|
+ // These are not considered fatal.
|
|
|
+ LOG.warn("Caught exception in callback "+ method.getName(), e);
|
|
|
+ }
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ }));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public synchronized void startMetricsMBeans() {
|
|
|
+ for (MetricsSourceAdapter sa : sources.values()) {
|
|
|
+ sa.startMBeans();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public synchronized void stopMetricsMBeans() {
|
|
|
+ for (MetricsSourceAdapter sa : sources.values()) {
|
|
|
+ sa.stopMBeans();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public synchronized String currentConfig() {
|
|
|
+ PropertiesConfiguration saver = new PropertiesConfiguration();
|
|
|
+ StringWriter writer = new StringWriter();
|
|
|
+ saver.copy(config);
|
|
|
+ try { saver.save(writer); }
|
|
|
+ catch (Exception e) {
|
|
|
+ throw new MetricsConfigException("Error stringify config", e);
|
|
|
+ }
|
|
|
+ return writer.toString();
|
|
|
+ }
|
|
|
+
|
|
|
+ private synchronized void startTimer() {
|
|
|
+ if (timer != null) {
|
|
|
+ LOG.warn(prefix +" metrics system timer already started!");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ logicalTime = 0;
|
|
|
+ long millis = period * 1000;
|
|
|
+ timer = new Timer("Timer for '"+ prefix +"' metrics system", true);
|
|
|
+ timer.scheduleAtFixedRate(new TimerTask() {
|
|
|
+ public void run() {
|
|
|
+ try {
|
|
|
+ onTimerEvent();
|
|
|
+ }
|
|
|
+ catch (Exception e) {
|
|
|
+ LOG.warn(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }, millis, millis);
|
|
|
+ LOG.info("Scheduled snapshot period at "+ period +" second(s).");
|
|
|
+ }
|
|
|
+
|
|
|
+ synchronized void onTimerEvent() {
|
|
|
+ logicalTime += period;
|
|
|
+ if (sinks.size() > 0) {
|
|
|
+ publishMetrics(sampleMetrics());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Sample all the sources for a snapshot of metrics/tags
|
|
|
+ * @return the metrics buffer containing the snapshot
|
|
|
+ */
|
|
|
+ synchronized MetricsBuffer sampleMetrics() {
|
|
|
+ collector.clear();
|
|
|
+ MetricsBufferBuilder bufferBuilder = new MetricsBufferBuilder();
|
|
|
+
|
|
|
+ for (Entry<String, MetricsSourceAdapter> entry : sources.entrySet()) {
|
|
|
+ if (sourceFilter == null || sourceFilter.accepts(entry.getKey())) {
|
|
|
+ snapshotMetrics(entry.getValue(), bufferBuilder);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (publishSelfMetrics) {
|
|
|
+ snapshotMetrics(sysSource, bufferBuilder);
|
|
|
+ }
|
|
|
+ MetricsBuffer buffer = bufferBuilder.get();
|
|
|
+ return buffer;
|
|
|
+ }
|
|
|
+
|
|
|
+ private void snapshotMetrics(MetricsSourceAdapter sa,
|
|
|
+ MetricsBufferBuilder bufferBuilder) {
|
|
|
+ long startTime = System.currentTimeMillis();
|
|
|
+ bufferBuilder.add(sa.name(), sa.getMetrics(collector, false));
|
|
|
+ collector.clear();
|
|
|
+ snapshotStat.add(System.currentTimeMillis() - startTime);
|
|
|
+ LOG.debug("Snapshotted source "+ sa.name());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Publish a metrics snapshot to all the sinks
|
|
|
+ * @param buffer the metrics snapshot to publish
|
|
|
+ */
|
|
|
+ synchronized void publishMetrics(MetricsBuffer buffer) {
|
|
|
+ int dropped = 0;
|
|
|
+ for (MetricsSinkAdapter sa : sinks.values()) {
|
|
|
+ long startTime = System.currentTimeMillis();
|
|
|
+ dropped += sa.putMetrics(buffer, logicalTime) ? 0 : 1;
|
|
|
+ publishStat.add(System.currentTimeMillis() - startTime);
|
|
|
+ }
|
|
|
+ droppedPubAll.incr(dropped);
|
|
|
+ }
|
|
|
+
|
|
|
+ private synchronized void stopTimer() {
|
|
|
+ if (timer == null) {
|
|
|
+ LOG.warn(prefix +" metrics system timer already stopped!");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ timer.cancel();
|
|
|
+ timer = null;
|
|
|
+ }
|
|
|
+
|
|
|
+ private synchronized void stopSources() {
|
|
|
+ for (Entry<String, MetricsSourceAdapter> entry : sources.entrySet()) {
|
|
|
+ MetricsSourceAdapter sa = entry.getValue();
|
|
|
+ LOG.info("Stopping metrics source "+ entry.getKey());
|
|
|
+ LOG.debug(sa.source().getClass());
|
|
|
+ sa.stop();
|
|
|
+ }
|
|
|
+ sysSource.stop();
|
|
|
+ sources.clear();
|
|
|
+ }
|
|
|
+
|
|
|
+ private synchronized void stopSinks() {
|
|
|
+ for (Entry<String, MetricsSinkAdapter> entry : sinks.entrySet()) {
|
|
|
+ MetricsSinkAdapter sa = entry.getValue();
|
|
|
+ LOG.info("Stopping metrics sink "+ entry.getKey());
|
|
|
+ LOG.debug(sa.sink().getClass());
|
|
|
+ sa.stop();
|
|
|
+ }
|
|
|
+ sinks.clear();
|
|
|
+ }
|
|
|
+
|
|
|
+ private synchronized void configure(String prefix) {
|
|
|
+ config = MetricsConfig.create(prefix);
|
|
|
+ configureSinks();
|
|
|
+ configureSources();
|
|
|
+ configureSystem();
|
|
|
+ }
|
|
|
+
|
|
|
+ private synchronized void configureSystem() {
|
|
|
+ injectedTags.add(Interns.tag(MsInfo.Hostname, getHostname()));
|
|
|
+ }
|
|
|
+
|
|
|
+ private synchronized void configureSinks() {
|
|
|
+ sinkConfigs = config.getInstanceConfigs(SINK_KEY);
|
|
|
+ int confPeriod = 0;
|
|
|
+ for (Entry<String, MetricsConfig> entry : sinkConfigs.entrySet()) {
|
|
|
+ MetricsConfig conf = entry.getValue();
|
|
|
+ int sinkPeriod = conf.getInt(PERIOD_KEY, PERIOD_DEFAULT);
|
|
|
+ confPeriod = confPeriod == 0 ? sinkPeriod
|
|
|
+ : MathUtils.gcd(confPeriod, sinkPeriod);
|
|
|
+ String clsName = conf.getClassName("");
|
|
|
+ if (clsName == null) continue; // sink can be registered later on
|
|
|
+ String sinkName = entry.getKey();
|
|
|
+ try {
|
|
|
+ MetricsSinkAdapter sa = newSink(sinkName,
|
|
|
+ conf.getString(DESC_KEY, sinkName), conf);
|
|
|
+ sa.start();
|
|
|
+ sinks.put(sinkName, sa);
|
|
|
+ }
|
|
|
+ catch (Exception e) {
|
|
|
+ LOG.warn("Error creating sink '"+ sinkName +"'", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ period = confPeriod > 0 ? confPeriod
|
|
|
+ : config.getInt(PERIOD_KEY, PERIOD_DEFAULT);
|
|
|
+ }
|
|
|
+
|
|
|
+ static MetricsSinkAdapter newSink(String name, String desc, MetricsSink sink,
|
|
|
+ MetricsConfig conf) {
|
|
|
+ return new MetricsSinkAdapter(name, desc, sink, conf.getString(CONTEXT_KEY),
|
|
|
+ conf.getFilter(SOURCE_FILTER_KEY),
|
|
|
+ conf.getFilter(RECORD_FILTER_KEY),
|
|
|
+ conf.getFilter(METRIC_FILTER_KEY),
|
|
|
+ conf.getInt(PERIOD_KEY, PERIOD_DEFAULT),
|
|
|
+ conf.getInt(QUEUE_CAPACITY_KEY, QUEUE_CAPACITY_DEFAULT),
|
|
|
+ conf.getInt(RETRY_DELAY_KEY, RETRY_DELAY_DEFAULT),
|
|
|
+ conf.getFloat(RETRY_BACKOFF_KEY, RETRY_BACKOFF_DEFAULT),
|
|
|
+ conf.getInt(RETRY_COUNT_KEY, RETRY_COUNT_DEFAULT));
|
|
|
+ }
|
|
|
+
|
|
|
+ static MetricsSinkAdapter newSink(String name, String desc,
|
|
|
+ MetricsConfig conf) {
|
|
|
+ return newSink(name, desc, (MetricsSink) conf.getPlugin(""), conf);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void configureSources() {
|
|
|
+ sourceFilter = config.getFilter(PREFIX_DEFAULT + SOURCE_FILTER_KEY);
|
|
|
+ sourceConfigs = config.getInstanceConfigs(SOURCE_KEY);
|
|
|
+ registerSystemSource();
|
|
|
+ }
|
|
|
+
|
|
|
+ private void clearConfigs() {
|
|
|
+ sinkConfigs.clear();
|
|
|
+ sourceConfigs.clear();
|
|
|
+ injectedTags.clear();
|
|
|
+ config = null;
|
|
|
+ }
|
|
|
+
|
|
|
+ static String getHostname() {
|
|
|
+ try {
|
|
|
+ return InetAddress.getLocalHost().getHostName();
|
|
|
+ }
|
|
|
+ catch (Exception e) {
|
|
|
+ LOG.error("Error getting localhost name. Using 'localhost'...", e);
|
|
|
+ }
|
|
|
+ return "localhost";
|
|
|
+ }
|
|
|
+
|
|
|
+ private void registerSystemSource() {
|
|
|
+ MetricsConfig sysConf = sourceConfigs.get(MS_NAME);
|
|
|
+ sysSource = new MetricsSourceAdapter(prefix, MS_STATS_NAME, MS_STATS_DESC,
|
|
|
+ MetricsAnnotations.makeSource(this), injectedTags, period,
|
|
|
+ sysConf == null ? config.subset(SOURCE_KEY) : sysConf);
|
|
|
+ sysSource.start();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public synchronized void getMetrics(MetricsCollector builder, boolean all) {
|
|
|
+ MetricsRecordBuilder rb = builder.addRecord(MS_NAME)
|
|
|
+ .addGauge(MsInfo.NumActiveSources, sources.size())
|
|
|
+ .addGauge(MsInfo.NumAllSources, allSources.size())
|
|
|
+ .addGauge(MsInfo.NumActiveSinks, sinks.size())
|
|
|
+ .addGauge(MsInfo.NumAllSinks, allSinks.size());
|
|
|
+
|
|
|
+ for (MetricsSinkAdapter sa : sinks.values()) {
|
|
|
+ sa.snapshot(rb, all);
|
|
|
+ }
|
|
|
+ registry.snapshot(rb, all);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void initSystemMBean() {
|
|
|
+ checkNotNull(prefix, "prefix should not be null here!");
|
|
|
+ if (mbeanName == null) {
|
|
|
+ mbeanName = MBeans.register(prefix, MS_CONTROL_NAME, this);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public synchronized boolean shutdown() {
|
|
|
+ LOG.debug("refCount="+ refCount);
|
|
|
+ if (refCount <= 0) LOG.debug("Redundant shutdown", new Throwable());
|
|
|
+ if (--refCount > 0) return false;
|
|
|
+ if (monitoring) {
|
|
|
+ try { stop(); }
|
|
|
+ catch (Exception e) {
|
|
|
+ LOG.warn("Error stopping the metrics system", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ allSources.clear();
|
|
|
+ allSinks.clear();
|
|
|
+ callbacks.clear();
|
|
|
+ if (mbeanName != null) {
|
|
|
+ MBeans.unregister(mbeanName);
|
|
|
+ mbeanName = null;
|
|
|
+ }
|
|
|
+ LOG.info(prefix +" metrics system shutdown complete.");
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ public MetricsSource getSource(String name) {
|
|
|
+ return allSources.get(name);
|
|
|
+ }
|
|
|
+
|
|
|
+ private InitMode initMode() {
|
|
|
+ LOG.debug("from system property: "+ System.getProperty(MS_INIT_MODE_KEY));
|
|
|
+ LOG.debug("from environment variable: "+ System.getenv(MS_INIT_MODE_KEY));
|
|
|
+ String m = System.getProperty(MS_INIT_MODE_KEY);
|
|
|
+ String m2 = m == null ? System.getenv(MS_INIT_MODE_KEY) : m;
|
|
|
+ return InitMode.valueOf((m2 == null ? InitMode.NORMAL.name() : m2)
|
|
|
+ .toUpperCase(Locale.US));
|
|
|
+ }
|
|
|
+}
|