|
@@ -1,494 +0,0 @@
|
|
|
-/*
|
|
|
- * AbstractMetricsContext.java
|
|
|
- *
|
|
|
- * Licensed to the Apache Software Foundation (ASF) under one
|
|
|
- * or more contributor license agreements. See the NOTICE file
|
|
|
- * distributed with this work for additional information
|
|
|
- * regarding copyright ownership. The ASF licenses this file
|
|
|
- * to you under the Apache License, Version 2.0 (the
|
|
|
- * "License"); you may not use this file except in compliance
|
|
|
- * with the License. You may obtain a copy of the License at
|
|
|
- *
|
|
|
- * http://www.apache.org/licenses/LICENSE-2.0
|
|
|
- *
|
|
|
- * Unless required by applicable law or agreed to in writing, software
|
|
|
- * distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
- * See the License for the specific language governing permissions and
|
|
|
- * limitations under the License.
|
|
|
- */
|
|
|
-
|
|
|
-package org.apache.hadoop.metrics.spi;
|
|
|
-
|
|
|
-import java.io.IOException;
|
|
|
-import java.util.ArrayList;
|
|
|
-import java.util.Collection;
|
|
|
-import java.util.HashMap;
|
|
|
-import java.util.HashSet;
|
|
|
-import java.util.Iterator;
|
|
|
-import java.util.List;
|
|
|
-import java.util.Map;
|
|
|
-import java.util.Set;
|
|
|
-import java.util.Timer;
|
|
|
-import java.util.TimerTask;
|
|
|
-import java.util.TreeMap;
|
|
|
-import java.util.Map.Entry;
|
|
|
-
|
|
|
-import org.apache.hadoop.classification.InterfaceAudience;
|
|
|
-import org.apache.hadoop.classification.InterfaceStability;
|
|
|
-import org.apache.hadoop.metrics.ContextFactory;
|
|
|
-import org.apache.hadoop.metrics.MetricsContext;
|
|
|
-import org.apache.hadoop.metrics.MetricsException;
|
|
|
-import org.apache.hadoop.metrics.MetricsRecord;
|
|
|
-import org.apache.hadoop.metrics.Updater;
|
|
|
-
|
|
|
-/**
|
|
|
- * The main class of the Service Provider Interface. This class should be
|
|
|
- * extended in order to integrate the Metrics API with a specific metrics
|
|
|
- * client library. <p/>
|
|
|
- *
|
|
|
- * This class implements the internal table of metric data, and the timer
|
|
|
- * on which data is to be sent to the metrics system. Subclasses must
|
|
|
- * override the abstract <code>emitRecord</code> method in order to transmit
|
|
|
- * the data. <p/>
|
|
|
- *
|
|
|
- * @deprecated Use org.apache.hadoop.metrics2 package instead.
|
|
|
- */
|
|
|
-@Deprecated
|
|
|
-@InterfaceAudience.Public
|
|
|
-@InterfaceStability.Evolving
|
|
|
-public abstract class AbstractMetricsContext implements MetricsContext {
|
|
|
-
|
|
|
- private int period = MetricsContext.DEFAULT_PERIOD;
|
|
|
- private Timer timer = null;
|
|
|
-
|
|
|
- private Set<Updater> updaters = new HashSet<Updater>(1);
|
|
|
- private volatile boolean isMonitoring = false;
|
|
|
-
|
|
|
- private ContextFactory factory = null;
|
|
|
- private String contextName = null;
|
|
|
-
|
|
|
- @InterfaceAudience.Private
|
|
|
- public static class TagMap extends TreeMap<String,Object> {
|
|
|
- private static final long serialVersionUID = 3546309335061952993L;
|
|
|
- TagMap() {
|
|
|
- super();
|
|
|
- }
|
|
|
- TagMap(TagMap orig) {
|
|
|
- super(orig);
|
|
|
- }
|
|
|
- /**
|
|
|
- * Returns true if this tagmap contains every tag in other.
|
|
|
- */
|
|
|
- public boolean containsAll(TagMap other) {
|
|
|
- for (Map.Entry<String,Object> entry : other.entrySet()) {
|
|
|
- Object value = get(entry.getKey());
|
|
|
- if (value == null || !value.equals(entry.getValue())) {
|
|
|
- // either key does not exist here, or the value is different
|
|
|
- return false;
|
|
|
- }
|
|
|
- }
|
|
|
- return true;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- @InterfaceAudience.Private
|
|
|
- public static class MetricMap extends TreeMap<String,Number> {
|
|
|
- private static final long serialVersionUID = -7495051861141631609L;
|
|
|
- MetricMap() {
|
|
|
- super();
|
|
|
- }
|
|
|
- MetricMap(MetricMap orig) {
|
|
|
- super(orig);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- static class RecordMap extends HashMap<TagMap,MetricMap> {
|
|
|
- private static final long serialVersionUID = 259835619700264611L;
|
|
|
- }
|
|
|
-
|
|
|
- private Map<String,RecordMap> bufferedData = new HashMap<String,RecordMap>();
|
|
|
-
|
|
|
-
|
|
|
- /**
|
|
|
- * Creates a new instance of AbstractMetricsContext
|
|
|
- */
|
|
|
- protected AbstractMetricsContext() {
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Initializes the context.
|
|
|
- */
|
|
|
- @Override
|
|
|
- public void init(String contextName, ContextFactory factory)
|
|
|
- {
|
|
|
- this.contextName = contextName;
|
|
|
- this.factory = factory;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Convenience method for subclasses to access factory attributes.
|
|
|
- */
|
|
|
- protected String getAttribute(String attributeName) {
|
|
|
- String factoryAttribute = contextName + "." + attributeName;
|
|
|
- return (String) factory.getAttribute(factoryAttribute);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Returns an attribute-value map derived from the factory attributes
|
|
|
- * by finding all factory attributes that begin with
|
|
|
- * <i>contextName</i>.<i>tableName</i>. The returned map consists of
|
|
|
- * those attributes with the contextName and tableName stripped off.
|
|
|
- */
|
|
|
- protected Map<String,String> getAttributeTable(String tableName) {
|
|
|
- String prefix = contextName + "." + tableName + ".";
|
|
|
- Map<String,String> result = new HashMap<String,String>();
|
|
|
- for (String attributeName : factory.getAttributeNames()) {
|
|
|
- if (attributeName.startsWith(prefix)) {
|
|
|
- String name = attributeName.substring(prefix.length());
|
|
|
- String value = (String) factory.getAttribute(attributeName);
|
|
|
- result.put(name, value);
|
|
|
- }
|
|
|
- }
|
|
|
- return result;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Returns the context name.
|
|
|
- */
|
|
|
- @Override
|
|
|
- public String getContextName() {
|
|
|
- return contextName;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Returns the factory by which this context was created.
|
|
|
- */
|
|
|
- public ContextFactory getContextFactory() {
|
|
|
- return factory;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Starts or restarts monitoring, the emitting of metrics records.
|
|
|
- */
|
|
|
- @Override
|
|
|
- public synchronized void startMonitoring()
|
|
|
- throws IOException {
|
|
|
- if (!isMonitoring) {
|
|
|
- startTimer();
|
|
|
- isMonitoring = true;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Stops monitoring. This does not free buffered data.
|
|
|
- * @see #close()
|
|
|
- */
|
|
|
- @Override
|
|
|
- public synchronized void stopMonitoring() {
|
|
|
- if (isMonitoring) {
|
|
|
- stopTimer();
|
|
|
- isMonitoring = false;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Returns true if monitoring is currently in progress.
|
|
|
- */
|
|
|
- @Override
|
|
|
- public boolean isMonitoring() {
|
|
|
- return isMonitoring;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Stops monitoring and frees buffered data, returning this
|
|
|
- * object to its initial state.
|
|
|
- */
|
|
|
- @Override
|
|
|
- public synchronized void close() {
|
|
|
- stopMonitoring();
|
|
|
- clearUpdaters();
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Creates a new AbstractMetricsRecord instance with the given <code>recordName</code>.
|
|
|
- * Throws an exception if the metrics implementation is configured with a fixed
|
|
|
- * set of record names and <code>recordName</code> is not in that set.
|
|
|
- *
|
|
|
- * @param recordName the name of the record
|
|
|
- * @throws MetricsException if recordName conflicts with configuration data
|
|
|
- */
|
|
|
- @Override
|
|
|
- public final synchronized MetricsRecord createRecord(String recordName) {
|
|
|
- if (bufferedData.get(recordName) == null) {
|
|
|
- bufferedData.put(recordName, new RecordMap());
|
|
|
- }
|
|
|
- return newRecord(recordName);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Subclasses should override this if they subclass MetricsRecordImpl.
|
|
|
- * @param recordName the name of the record
|
|
|
- * @return newly created instance of MetricsRecordImpl or subclass
|
|
|
- */
|
|
|
- protected MetricsRecord newRecord(String recordName) {
|
|
|
- return new MetricsRecordImpl(recordName, this);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Registers a callback to be called at time intervals determined by
|
|
|
- * the configuration.
|
|
|
- *
|
|
|
- * @param updater object to be run periodically; it should update
|
|
|
- * some metrics records
|
|
|
- */
|
|
|
- @Override
|
|
|
- public synchronized void registerUpdater(final Updater updater) {
|
|
|
- if (!updaters.contains(updater)) {
|
|
|
- updaters.add(updater);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Removes a callback, if it exists.
|
|
|
- *
|
|
|
- * @param updater object to be removed from the callback list
|
|
|
- */
|
|
|
- @Override
|
|
|
- public synchronized void unregisterUpdater(Updater updater) {
|
|
|
- updaters.remove(updater);
|
|
|
- }
|
|
|
-
|
|
|
- private synchronized void clearUpdaters() {
|
|
|
- updaters.clear();
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Starts timer if it is not already started
|
|
|
- */
|
|
|
- private synchronized void startTimer() {
|
|
|
- if (timer == null) {
|
|
|
- timer = new Timer("Timer thread for monitoring " + getContextName(),
|
|
|
- true);
|
|
|
- TimerTask task = new TimerTask() {
|
|
|
- @Override
|
|
|
- public void run() {
|
|
|
- try {
|
|
|
- timerEvent();
|
|
|
- } catch (IOException ioe) {
|
|
|
- ioe.printStackTrace();
|
|
|
- }
|
|
|
- }
|
|
|
- };
|
|
|
- long millis = period * 1000;
|
|
|
- timer.scheduleAtFixedRate(task, millis, millis);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Stops timer if it is running
|
|
|
- */
|
|
|
- private synchronized void stopTimer() {
|
|
|
- if (timer != null) {
|
|
|
- timer.cancel();
|
|
|
- timer = null;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Timer callback.
|
|
|
- */
|
|
|
- private void timerEvent() throws IOException {
|
|
|
- if (isMonitoring) {
|
|
|
- Collection<Updater> myUpdaters;
|
|
|
- synchronized (this) {
|
|
|
- myUpdaters = new ArrayList<Updater>(updaters);
|
|
|
- }
|
|
|
- // Run all the registered updates without holding a lock
|
|
|
- // on this context
|
|
|
- for (Updater updater : myUpdaters) {
|
|
|
- try {
|
|
|
- updater.doUpdates(this);
|
|
|
- } catch (Throwable throwable) {
|
|
|
- throwable.printStackTrace();
|
|
|
- }
|
|
|
- }
|
|
|
- emitRecords();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Emits the records.
|
|
|
- */
|
|
|
- private synchronized void emitRecords() throws IOException {
|
|
|
- for (Map.Entry<String,RecordMap> recordEntry : bufferedData.entrySet()) {
|
|
|
- RecordMap recordMap = recordEntry.getValue();
|
|
|
- synchronized (recordMap) {
|
|
|
- Set<Entry<TagMap, MetricMap>> entrySet = recordMap.entrySet ();
|
|
|
- for (Entry<TagMap, MetricMap> entry : entrySet) {
|
|
|
- OutputRecord outRec = new OutputRecord(entry.getKey(), entry.getValue());
|
|
|
- emitRecord(contextName, recordEntry.getKey(), outRec);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- flush();
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Retrieves all the records managed by this MetricsContext.
|
|
|
- * Useful for monitoring systems that are polling-based.
|
|
|
- * @return A non-null collection of all monitoring records.
|
|
|
- */
|
|
|
- @Override
|
|
|
- public synchronized Map<String, Collection<OutputRecord>> getAllRecords() {
|
|
|
- Map<String, Collection<OutputRecord>> out = new TreeMap<String, Collection<OutputRecord>>();
|
|
|
- for (Map.Entry<String,RecordMap> recordEntry : bufferedData.entrySet()) {
|
|
|
- RecordMap recordMap = recordEntry.getValue();
|
|
|
- synchronized (recordMap) {
|
|
|
- List<OutputRecord> records = new ArrayList<OutputRecord>();
|
|
|
- Set<Entry<TagMap, MetricMap>> entrySet = recordMap.entrySet();
|
|
|
- for (Entry<TagMap, MetricMap> entry : entrySet) {
|
|
|
- OutputRecord outRec = new OutputRecord(entry.getKey(), entry.getValue());
|
|
|
- records.add(outRec);
|
|
|
- }
|
|
|
- out.put(recordEntry.getKey(), records);
|
|
|
- }
|
|
|
- }
|
|
|
- return out;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Sends a record to the metrics system.
|
|
|
- */
|
|
|
- protected abstract void emitRecord(String contextName, String recordName,
|
|
|
- OutputRecord outRec) throws IOException;
|
|
|
-
|
|
|
- /**
|
|
|
- * Called each period after all records have been emitted, this method does nothing.
|
|
|
- * Subclasses may override it in order to perform some kind of flush.
|
|
|
- */
|
|
|
- protected void flush() throws IOException {
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Called by MetricsRecordImpl.update(). Creates or updates a row in
|
|
|
- * the internal table of metric data.
|
|
|
- */
|
|
|
- protected void update(MetricsRecordImpl record) {
|
|
|
- String recordName = record.getRecordName();
|
|
|
- TagMap tagTable = record.getTagTable();
|
|
|
- Map<String,MetricValue> metricUpdates = record.getMetricTable();
|
|
|
-
|
|
|
- RecordMap recordMap = getRecordMap(recordName);
|
|
|
- synchronized (recordMap) {
|
|
|
- MetricMap metricMap = recordMap.get(tagTable);
|
|
|
- if (metricMap == null) {
|
|
|
- metricMap = new MetricMap();
|
|
|
- TagMap tagMap = new TagMap(tagTable); // clone tags
|
|
|
- recordMap.put(tagMap, metricMap);
|
|
|
- }
|
|
|
-
|
|
|
- Set<Entry<String, MetricValue>> entrySet = metricUpdates.entrySet();
|
|
|
- for (Entry<String, MetricValue> entry : entrySet) {
|
|
|
- String metricName = entry.getKey ();
|
|
|
- MetricValue updateValue = entry.getValue ();
|
|
|
- Number updateNumber = updateValue.getNumber();
|
|
|
- Number currentNumber = metricMap.get(metricName);
|
|
|
- if (currentNumber == null || updateValue.isAbsolute()) {
|
|
|
- metricMap.put(metricName, updateNumber);
|
|
|
- }
|
|
|
- else {
|
|
|
- Number newNumber = sum(updateNumber, currentNumber);
|
|
|
- metricMap.put(metricName, newNumber);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private synchronized RecordMap getRecordMap(String recordName) {
|
|
|
- return bufferedData.get(recordName);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Adds two numbers, coercing the second to the type of the first.
|
|
|
- *
|
|
|
- */
|
|
|
- private Number sum(Number a, Number b) {
|
|
|
- if (a instanceof Integer) {
|
|
|
- return Integer.valueOf(a.intValue() + b.intValue());
|
|
|
- }
|
|
|
- else if (a instanceof Float) {
|
|
|
- return new Float(a.floatValue() + b.floatValue());
|
|
|
- }
|
|
|
- else if (a instanceof Short) {
|
|
|
- return Short.valueOf((short)(a.shortValue() + b.shortValue()));
|
|
|
- }
|
|
|
- else if (a instanceof Byte) {
|
|
|
- return Byte.valueOf((byte)(a.byteValue() + b.byteValue()));
|
|
|
- }
|
|
|
- else if (a instanceof Long) {
|
|
|
- return Long.valueOf((a.longValue() + b.longValue()));
|
|
|
- }
|
|
|
- else {
|
|
|
- // should never happen
|
|
|
- throw new MetricsException("Invalid number type");
|
|
|
- }
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Called by MetricsRecordImpl.remove(). Removes all matching rows in
|
|
|
- * the internal table of metric data. A row matches if it has the same
|
|
|
- * tag names and values as record, but it may also have additional
|
|
|
- * tags.
|
|
|
- */
|
|
|
- protected void remove(MetricsRecordImpl record) {
|
|
|
- String recordName = record.getRecordName();
|
|
|
- TagMap tagTable = record.getTagTable();
|
|
|
-
|
|
|
- RecordMap recordMap = getRecordMap(recordName);
|
|
|
- synchronized (recordMap) {
|
|
|
- Iterator<TagMap> it = recordMap.keySet().iterator();
|
|
|
- while (it.hasNext()) {
|
|
|
- TagMap rowTags = it.next();
|
|
|
- if (rowTags.containsAll(tagTable)) {
|
|
|
- it.remove();
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Returns the timer period.
|
|
|
- */
|
|
|
- @Override
|
|
|
- public int getPeriod() {
|
|
|
- return period;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Sets the timer period
|
|
|
- */
|
|
|
- protected void setPeriod(int period) {
|
|
|
- this.period = period;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * If a period is set in the attribute passed in, override
|
|
|
- * the default with it.
|
|
|
- */
|
|
|
- protected void parseAndSetPeriod(String attributeName) {
|
|
|
- String periodStr = getAttribute(attributeName);
|
|
|
- if (periodStr != null) {
|
|
|
- int period = 0;
|
|
|
- try {
|
|
|
- period = Integer.parseInt(periodStr);
|
|
|
- } catch (NumberFormatException nfe) {
|
|
|
- }
|
|
|
- if (period <= 0) {
|
|
|
- throw new MetricsException("Invalid period: " + periodStr);
|
|
|
- }
|
|
|
- setPeriod(period);
|
|
|
- }
|
|
|
- }
|
|
|
-}
|