|
@@ -17,6 +17,8 @@
|
|
|
*/
|
|
|
package org.apache.hadoop.metrics2.sink.timeline.cache;
|
|
|
|
|
|
+import com.google.common.cache.Cache;
|
|
|
+import com.google.common.cache.CacheBuilder;
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
@@ -25,18 +27,18 @@ import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
|
|
|
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
|
|
|
|
|
|
import java.util.ArrayList;
|
|
|
+import java.util.Collection;
|
|
|
import java.util.HashMap;
|
|
|
-import java.util.Iterator;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.TreeMap;
|
|
|
-import java.util.concurrent.ConcurrentSkipListMap;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
@InterfaceAudience.Public
|
|
|
@InterfaceStability.Evolving
|
|
|
public class TimelineMetricsCache {
|
|
|
|
|
|
- private final TimelineMetricHolder timelineMetricCache = new TimelineMetricHolder();
|
|
|
+ private final Cache<String, TimelineMetricWrapper> timelineMetricCache;
|
|
|
private static final Log LOG = LogFactory.getLog(TimelineMetric.class);
|
|
|
public static final int MAX_RECS_PER_NAME_DEFAULT = 10000;
|
|
|
public static final int MAX_EVICTION_TIME_MILLIS = 59000; // ~ 1 min
|
|
@@ -54,144 +56,81 @@ public class TimelineMetricsCache {
|
|
|
this.maxRecsPerName = maxRecsPerName;
|
|
|
this.maxEvictionTimeInMillis = maxEvictionTimeInMillis;
|
|
|
this.skipCounterTransform = skipCounterTransform;
|
|
|
+ this.timelineMetricCache = CacheBuilder.newBuilder().expireAfterWrite(maxEvictionTimeInMillis * 2, TimeUnit.MILLISECONDS).build();
|
|
|
}
|
|
|
|
|
|
class TimelineMetricWrapper {
|
|
|
- private long timeDiff = -1;
|
|
|
- private long oldestTimestamp = -1;
|
|
|
+ private Cache<Long, Double> dataPointsCache;
|
|
|
private TimelineMetric timelineMetric;
|
|
|
+ private Long oldestTimeStamp;
|
|
|
+ private Long newestTimeStamp;
|
|
|
|
|
|
TimelineMetricWrapper(TimelineMetric timelineMetric) {
|
|
|
this.timelineMetric = timelineMetric;
|
|
|
- this.oldestTimestamp = timelineMetric.getStartTime();
|
|
|
- }
|
|
|
+ dataPointsCache = CacheBuilder.newBuilder().
|
|
|
+ maximumSize(maxRecsPerName).expireAfterWrite(maxEvictionTimeInMillis * 2, TimeUnit.MILLISECONDS).build();
|
|
|
|
|
|
- private void updateTimeDiff(long timestamp) {
|
|
|
- if (oldestTimestamp != -1 && timestamp > oldestTimestamp) {
|
|
|
- timeDiff = timestamp - oldestTimestamp;
|
|
|
- } else {
|
|
|
- oldestTimestamp = timestamp;
|
|
|
- }
|
|
|
+ putMetric(timelineMetric);
|
|
|
}
|
|
|
|
|
|
public synchronized void putMetric(TimelineMetric metric) {
|
|
|
- TreeMap<Long, Double> metricValues = this.timelineMetric.getMetricValues();
|
|
|
- if (metricValues.size() > maxRecsPerName) {
|
|
|
- // remove values for eldest maxEvictionTimeInMillis
|
|
|
- long newEldestTimestamp = oldestTimestamp + maxEvictionTimeInMillis;
|
|
|
- TreeMap<Long, Double> metricsSubSet =
|
|
|
- new TreeMap<>(metricValues.tailMap(newEldestTimestamp));
|
|
|
- if (metricsSubSet.isEmpty()) {
|
|
|
- oldestTimestamp = metric.getStartTime();
|
|
|
- this.timelineMetric.setStartTime(metric.getStartTime());
|
|
|
- } else {
|
|
|
- Long newStartTime = metricsSubSet.firstKey();
|
|
|
- oldestTimestamp = newStartTime;
|
|
|
- this.timelineMetric.setStartTime(newStartTime);
|
|
|
- }
|
|
|
- this.timelineMetric.setMetricValues(metricsSubSet);
|
|
|
- LOG.warn("Metrics cache overflow. Values for metric " +
|
|
|
- metric.getMetricName() + " older than " + newEldestTimestamp +
|
|
|
- " were removed to clean up the cache.");
|
|
|
+ if (dataPointsCache.size() == 0) {
|
|
|
+ oldestTimeStamp = metric.getStartTime();
|
|
|
+ newestTimeStamp = metric.getStartTime();
|
|
|
}
|
|
|
- this.timelineMetric.addMetricValues(metric.getMetricValues());
|
|
|
- updateTimeDiff(metric.getStartTime());
|
|
|
- }
|
|
|
-
|
|
|
- public synchronized long getTimeDiff() {
|
|
|
- return timeDiff;
|
|
|
+ TreeMap<Long, Double> metricValues = metric.getMetricValues();
|
|
|
+ for (Map.Entry<Long, Double> entry : metricValues.entrySet()) {
|
|
|
+ Long key = entry.getKey();
|
|
|
+ dataPointsCache.put(key, entry.getValue());
|
|
|
+ }
|
|
|
+ oldestTimeStamp = Math.min(oldestTimeStamp, metric.getStartTime());
|
|
|
+ newestTimeStamp = Math.max(newestTimeStamp, metric.getStartTime());
|
|
|
}
|
|
|
|
|
|
public synchronized TimelineMetric getTimelineMetric() {
|
|
|
- return timelineMetric;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- // TODO: Add weighted eviction
|
|
|
- class TimelineMetricHolder extends ConcurrentSkipListMap<String, TimelineMetricWrapper> {
|
|
|
- private static final long serialVersionUID = 2L;
|
|
|
- // To avoid duplication at the end of the buffer and beginning of the next
|
|
|
- // segment of values
|
|
|
- private Map<String, Long> endOfBufferTimestamps = new HashMap<String, Long>();
|
|
|
-
|
|
|
- public TimelineMetric evict(String metricName) {
|
|
|
- TimelineMetricWrapper metricWrapper = this.get(metricName);
|
|
|
-
|
|
|
- if (metricWrapper == null
|
|
|
- || metricWrapper.getTimeDiff() < getMaxEvictionTimeInMillis()) {
|
|
|
+ TreeMap<Long, Double> metricValues = new TreeMap<>(dataPointsCache.asMap());
|
|
|
+ if (metricValues.isEmpty() || newestTimeStamp - oldestTimeStamp < maxEvictionTimeInMillis) {
|
|
|
return null;
|
|
|
}
|
|
|
-
|
|
|
- TimelineMetric timelineMetric = metricWrapper.getTimelineMetric();
|
|
|
- this.remove(metricName);
|
|
|
-
|
|
|
- return timelineMetric;
|
|
|
- }
|
|
|
-
|
|
|
- public TimelineMetrics evictAll() {
|
|
|
- List<TimelineMetric> metricList = new ArrayList<TimelineMetric>();
|
|
|
-
|
|
|
- for (Iterator<Map.Entry<String, TimelineMetricWrapper>> it = this.entrySet().iterator(); it.hasNext();) {
|
|
|
- Map.Entry<String, TimelineMetricWrapper> cacheEntry = it.next();
|
|
|
- TimelineMetricWrapper metricWrapper = cacheEntry.getValue();
|
|
|
- if (metricWrapper != null) {
|
|
|
- TimelineMetric timelineMetric = cacheEntry.getValue().getTimelineMetric();
|
|
|
- metricList.add(timelineMetric);
|
|
|
- }
|
|
|
- it.remove();
|
|
|
- }
|
|
|
- TimelineMetrics timelineMetrics = new TimelineMetrics();
|
|
|
- timelineMetrics.setMetrics(metricList);
|
|
|
- return timelineMetrics;
|
|
|
- }
|
|
|
-
|
|
|
- public void put(String metricName, TimelineMetric timelineMetric) {
|
|
|
- if (isDuplicate(timelineMetric)) {
|
|
|
- return;
|
|
|
- }
|
|
|
- TimelineMetricWrapper metric = this.get(metricName);
|
|
|
- if (metric == null) {
|
|
|
- this.put(metricName, new TimelineMetricWrapper(timelineMetric));
|
|
|
- } else {
|
|
|
- metric.putMetric(timelineMetric);
|
|
|
- }
|
|
|
- // Buffer last ts value
|
|
|
- endOfBufferTimestamps.put(metricName, timelineMetric.getStartTime());
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Test whether last buffered timestamp is same as the newly received.
|
|
|
- * @param timelineMetric @TimelineMetric
|
|
|
- * @return true/false
|
|
|
- */
|
|
|
- private boolean isDuplicate(TimelineMetric timelineMetric) {
|
|
|
- return endOfBufferTimestamps.containsKey(timelineMetric.getMetricName())
|
|
|
- && endOfBufferTimestamps.get(timelineMetric.getMetricName()).equals(timelineMetric.getStartTime());
|
|
|
+ dataPointsCache.invalidateAll();
|
|
|
+ timelineMetric.setStartTime(metricValues.firstKey());
|
|
|
+ timelineMetric.setMetricValues(metricValues);
|
|
|
+ return new TimelineMetric(timelineMetric);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
public TimelineMetric getTimelineMetric(String metricName) {
|
|
|
- if (timelineMetricCache.containsKey(metricName)) {
|
|
|
- return timelineMetricCache.evict(metricName);
|
|
|
+ TimelineMetricWrapper timelineMetricWrapper = timelineMetricCache.getIfPresent(metricName);
|
|
|
+ if (timelineMetricWrapper != null) {
|
|
|
+ return timelineMetricWrapper.getTimelineMetric();
|
|
|
}
|
|
|
-
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
public TimelineMetrics getAllMetrics() {
|
|
|
- return timelineMetricCache.evictAll();
|
|
|
- }
|
|
|
+ TimelineMetrics timelineMetrics = new TimelineMetrics();
|
|
|
+ Collection<TimelineMetricWrapper> timelineMetricWrapperCollection = timelineMetricCache.asMap().values();
|
|
|
+ List<TimelineMetric> timelineMetricList =
|
|
|
+ new ArrayList<>(timelineMetricWrapperCollection.size());
|
|
|
+
|
|
|
+ for (TimelineMetricWrapper timelineMetricWrapper : timelineMetricWrapperCollection) {
|
|
|
+ timelineMetricList.add(timelineMetricWrapper.getTimelineMetric());
|
|
|
+ }
|
|
|
|
|
|
- /**
|
|
|
- * Getter method to help testing eviction
|
|
|
- * @return @int
|
|
|
- */
|
|
|
- public int getMaxEvictionTimeInMillis() {
|
|
|
- return maxEvictionTimeInMillis;
|
|
|
+ timelineMetrics.setMetrics(timelineMetricList);
|
|
|
+ return timelineMetrics;
|
|
|
}
|
|
|
|
|
|
+
|
|
|
public void putTimelineMetric(TimelineMetric timelineMetric) {
|
|
|
- timelineMetricCache.put(timelineMetric.getMetricName(), timelineMetric);
|
|
|
+ String metricName = timelineMetric.getMetricName();
|
|
|
+ TimelineMetricWrapper timelineMetricWrapper = timelineMetricCache.getIfPresent(metricName);
|
|
|
+
|
|
|
+ if (timelineMetricWrapper != null) {
|
|
|
+ timelineMetricWrapper.putMetric(timelineMetric);
|
|
|
+ } else {
|
|
|
+ timelineMetricCache.put(metricName, new TimelineMetricWrapper(timelineMetric));
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private void transformMetricValuesToDerivative(TimelineMetric timelineMetric) {
|