|
@@ -17,8 +17,6 @@
|
|
|
*/
|
|
|
package org.apache.hadoop.metrics2.sink.timeline.cache;
|
|
|
|
|
|
-import com.google.common.cache.Cache;
|
|
|
-import com.google.common.cache.CacheBuilder;
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
@@ -27,18 +25,18 @@ import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
|
|
|
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
|
|
|
|
|
|
import java.util.ArrayList;
|
|
|
-import java.util.Collection;
|
|
|
import java.util.HashMap;
|
|
|
+import java.util.Iterator;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.TreeMap;
|
|
|
-import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.concurrent.ConcurrentSkipListMap;
|
|
|
|
|
|
@InterfaceAudience.Public
|
|
|
@InterfaceStability.Evolving
|
|
|
public class TimelineMetricsCache {
|
|
|
|
|
|
- private final Cache<String, TimelineMetricWrapper> timelineMetricCache;
|
|
|
+ private final TimelineMetricHolder timelineMetricCache = new TimelineMetricHolder();
|
|
|
private static final Log LOG = LogFactory.getLog(TimelineMetric.class);
|
|
|
public static final int MAX_RECS_PER_NAME_DEFAULT = 10000;
|
|
|
public static final int MAX_EVICTION_TIME_MILLIS = 59000; // ~ 1 min
|
|
@@ -56,81 +54,144 @@ public class TimelineMetricsCache {
|
|
|
this.maxRecsPerName = maxRecsPerName;
|
|
|
this.maxEvictionTimeInMillis = maxEvictionTimeInMillis;
|
|
|
this.skipCounterTransform = skipCounterTransform;
|
|
|
- this.timelineMetricCache = CacheBuilder.newBuilder().expireAfterWrite(maxEvictionTimeInMillis * 2, TimeUnit.MILLISECONDS).build();
|
|
|
}
|
|
|
|
|
|
class TimelineMetricWrapper {
|
|
|
- private Cache<Long, Double> dataPointsCache;
|
|
|
+ private long timeDiff = -1;
|
|
|
+ private long oldestTimestamp = -1;
|
|
|
private TimelineMetric timelineMetric;
|
|
|
- private Long oldestTimeStamp;
|
|
|
- private Long newestTimeStamp;
|
|
|
|
|
|
TimelineMetricWrapper(TimelineMetric timelineMetric) {
|
|
|
this.timelineMetric = timelineMetric;
|
|
|
- dataPointsCache = CacheBuilder.newBuilder().
|
|
|
- maximumSize(maxRecsPerName).expireAfterWrite(maxEvictionTimeInMillis * 2, TimeUnit.MILLISECONDS).build();
|
|
|
+ this.oldestTimestamp = timelineMetric.getStartTime();
|
|
|
+ }
|
|
|
|
|
|
- putMetric(timelineMetric);
|
|
|
+ private void updateTimeDiff(long timestamp) {
|
|
|
+ if (oldestTimestamp != -1 && timestamp > oldestTimestamp) {
|
|
|
+ timeDiff = timestamp - oldestTimestamp;
|
|
|
+ } else {
|
|
|
+ oldestTimestamp = timestamp;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public synchronized void putMetric(TimelineMetric metric) {
|
|
|
- if (dataPointsCache.size() == 0) {
|
|
|
- oldestTimeStamp = metric.getStartTime();
|
|
|
- newestTimeStamp = metric.getStartTime();
|
|
|
+ TreeMap<Long, Double> metricValues = this.timelineMetric.getMetricValues();
|
|
|
+ if (metricValues.size() > maxRecsPerName) {
|
|
|
+ // remove values for eldest maxEvictionTimeInMillis
|
|
|
+ long newEldestTimestamp = oldestTimestamp + maxEvictionTimeInMillis;
|
|
|
+ TreeMap<Long, Double> metricsSubSet =
|
|
|
+ new TreeMap<>(metricValues.tailMap(newEldestTimestamp));
|
|
|
+ if (metricsSubSet.isEmpty()) {
|
|
|
+ oldestTimestamp = metric.getStartTime();
|
|
|
+ this.timelineMetric.setStartTime(metric.getStartTime());
|
|
|
+ } else {
|
|
|
+ Long newStartTime = metricsSubSet.firstKey();
|
|
|
+ oldestTimestamp = newStartTime;
|
|
|
+ this.timelineMetric.setStartTime(newStartTime);
|
|
|
+ }
|
|
|
+ this.timelineMetric.setMetricValues(metricsSubSet);
|
|
|
+ LOG.warn("Metrics cache overflow. Values for metric " +
|
|
|
+ metric.getMetricName() + " older than " + newEldestTimestamp +
|
|
|
+ " were removed to clean up the cache.");
|
|
|
}
|
|
|
- TreeMap<Long, Double> metricValues = metric.getMetricValues();
|
|
|
- for (Map.Entry<Long, Double> entry : metricValues.entrySet()) {
|
|
|
- Long key = entry.getKey();
|
|
|
- dataPointsCache.put(key, entry.getValue());
|
|
|
- }
|
|
|
- oldestTimeStamp = Math.min(oldestTimeStamp, metric.getStartTime());
|
|
|
- newestTimeStamp = Math.max(newestTimeStamp, metric.getStartTime());
|
|
|
+ this.timelineMetric.addMetricValues(metric.getMetricValues());
|
|
|
+ updateTimeDiff(metric.getStartTime());
|
|
|
+ }
|
|
|
+
|
|
|
+ public synchronized long getTimeDiff() {
|
|
|
+ return timeDiff;
|
|
|
}
|
|
|
|
|
|
public synchronized TimelineMetric getTimelineMetric() {
|
|
|
- TreeMap<Long, Double> metricValues = new TreeMap<>(dataPointsCache.asMap());
|
|
|
- if (metricValues.isEmpty() || newestTimeStamp - oldestTimeStamp < maxEvictionTimeInMillis) {
|
|
|
+ return timelineMetric;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // TODO: Add weighted eviction
|
|
|
+ class TimelineMetricHolder extends ConcurrentSkipListMap<String, TimelineMetricWrapper> {
|
|
|
+ private static final long serialVersionUID = 2L;
|
|
|
+ // To avoid duplication at the end of the buffer and beginning of the next
|
|
|
+ // segment of values
|
|
|
+ private Map<String, Long> endOfBufferTimestamps = new HashMap<String, Long>();
|
|
|
+
|
|
|
+ public TimelineMetric evict(String metricName) {
|
|
|
+ TimelineMetricWrapper metricWrapper = this.get(metricName);
|
|
|
+
|
|
|
+ if (metricWrapper == null
|
|
|
+ || metricWrapper.getTimeDiff() < getMaxEvictionTimeInMillis()) {
|
|
|
return null;
|
|
|
}
|
|
|
- dataPointsCache.invalidateAll();
|
|
|
- timelineMetric.setStartTime(metricValues.firstKey());
|
|
|
- timelineMetric.setMetricValues(metricValues);
|
|
|
- return new TimelineMetric(timelineMetric);
|
|
|
+
|
|
|
+ TimelineMetric timelineMetric = metricWrapper.getTimelineMetric();
|
|
|
+ this.remove(metricName);
|
|
|
+
|
|
|
+ return timelineMetric;
|
|
|
+ }
|
|
|
+
|
|
|
+ public TimelineMetrics evictAll() {
|
|
|
+ List<TimelineMetric> metricList = new ArrayList<TimelineMetric>();
|
|
|
+
|
|
|
+ for (Iterator<Map.Entry<String, TimelineMetricWrapper>> it = this.entrySet().iterator(); it.hasNext();) {
|
|
|
+ Map.Entry<String, TimelineMetricWrapper> cacheEntry = it.next();
|
|
|
+ TimelineMetricWrapper metricWrapper = cacheEntry.getValue();
|
|
|
+ if (metricWrapper != null) {
|
|
|
+ TimelineMetric timelineMetric = cacheEntry.getValue().getTimelineMetric();
|
|
|
+ metricList.add(timelineMetric);
|
|
|
+ }
|
|
|
+ it.remove();
|
|
|
+ }
|
|
|
+ TimelineMetrics timelineMetrics = new TimelineMetrics();
|
|
|
+ timelineMetrics.setMetrics(metricList);
|
|
|
+ return timelineMetrics;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void put(String metricName, TimelineMetric timelineMetric) {
|
|
|
+ if (isDuplicate(timelineMetric)) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ TimelineMetricWrapper metric = this.get(metricName);
|
|
|
+ if (metric == null) {
|
|
|
+ this.put(metricName, new TimelineMetricWrapper(timelineMetric));
|
|
|
+ } else {
|
|
|
+ metric.putMetric(timelineMetric);
|
|
|
+ }
|
|
|
+ // Buffer last ts value
|
|
|
+ endOfBufferTimestamps.put(metricName, timelineMetric.getStartTime());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Test whether last buffered timestamp is same as the newly received.
|
|
|
+ * @param timelineMetric @TimelineMetric
|
|
|
+ * @return true/false
|
|
|
+ */
|
|
|
+ private boolean isDuplicate(TimelineMetric timelineMetric) {
|
|
|
+ return endOfBufferTimestamps.containsKey(timelineMetric.getMetricName())
|
|
|
+ && endOfBufferTimestamps.get(timelineMetric.getMetricName()).equals(timelineMetric.getStartTime());
|
|
|
}
|
|
|
}
|
|
|
|
|
|
public TimelineMetric getTimelineMetric(String metricName) {
|
|
|
- TimelineMetricWrapper timelineMetricWrapper = timelineMetricCache.getIfPresent(metricName);
|
|
|
- if (timelineMetricWrapper != null) {
|
|
|
- return timelineMetricWrapper.getTimelineMetric();
|
|
|
+ if (timelineMetricCache.containsKey(metricName)) {
|
|
|
+ return timelineMetricCache.evict(metricName);
|
|
|
}
|
|
|
+
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
public TimelineMetrics getAllMetrics() {
|
|
|
- TimelineMetrics timelineMetrics = new TimelineMetrics();
|
|
|
- Collection<TimelineMetricWrapper> timelineMetricWrapperCollection = timelineMetricCache.asMap().values();
|
|
|
- List<TimelineMetric> timelineMetricList =
|
|
|
- new ArrayList<>(timelineMetricWrapperCollection.size());
|
|
|
-
|
|
|
- for (TimelineMetricWrapper timelineMetricWrapper : timelineMetricWrapperCollection) {
|
|
|
- timelineMetricList.add(timelineMetricWrapper.getTimelineMetric());
|
|
|
- }
|
|
|
-
|
|
|
- timelineMetrics.setMetrics(timelineMetricList);
|
|
|
- return timelineMetrics;
|
|
|
+ return timelineMetricCache.evictAll();
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Getter method to help testing eviction
|
|
|
+ * @return @int
|
|
|
+ */
|
|
|
+ public int getMaxEvictionTimeInMillis() {
|
|
|
+ return maxEvictionTimeInMillis;
|
|
|
+ }
|
|
|
|
|
|
public void putTimelineMetric(TimelineMetric timelineMetric) {
|
|
|
- String metricName = timelineMetric.getMetricName();
|
|
|
- TimelineMetricWrapper timelineMetricWrapper = timelineMetricCache.getIfPresent(metricName);
|
|
|
-
|
|
|
- if (timelineMetricWrapper != null) {
|
|
|
- timelineMetricWrapper.putMetric(timelineMetric);
|
|
|
- } else {
|
|
|
- timelineMetricCache.put(metricName, new TimelineMetricWrapper(timelineMetric));
|
|
|
- }
|
|
|
+ timelineMetricCache.put(timelineMetric.getMetricName(), timelineMetric);
|
|
|
}
|
|
|
|
|
|
private void transformMetricValuesToDerivative(TimelineMetric timelineMetric) {
|