Преглед изворни кода

Revert "AMBARI-16821 Improve TimelineMetricsCache eviction/flush logic using a cache library (dsen)"

This reverts commit 7b427d66c9eec999a7a6a369fc5aae9d3019929c.
Dmytro Sen пре 9 година
родитељ
комит
014ea0e3f3

+ 0 - 28
ambari-metrics/ambari-metrics-common/pom.xml

@@ -70,38 +70,10 @@
           </execution>
         </executions>
       </plugin>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-shade-plugin</artifactId>
-        <version>2.3</version>
-        <executions>
-          <!-- Run shade goal on package phase -->
-          <execution>
-            <phase>package</phase>
-            <goals>
-              <goal>shade</goal>
-            </goals>
-            <configuration>
-              <artifactSet>
-                <includes>
-                  <include>com.google.guava:*</include>
-                </includes>
-              </artifactSet>
-              <minimizeJar>true</minimizeJar>
-              <createDependencyReducedPom>false</createDependencyReducedPom>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
     </plugins>
   </build>
 
   <dependencies>
-    <dependency>
-      <groupId>com.google.guava</groupId>
-      <artifactId>guava</artifactId>
-      <version>14.0.1</version>
-    </dependency>
     <dependency>
       <groupId>commons-logging</groupId>
       <artifactId>commons-logging</artifactId>

+ 111 - 50
ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java

@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.metrics2.sink.timeline.cache;
 
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -27,18 +25,18 @@ import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ConcurrentSkipListMap;
 
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public class TimelineMetricsCache {
 
-  private final Cache<String, TimelineMetricWrapper> timelineMetricCache;
+  private final TimelineMetricHolder timelineMetricCache = new TimelineMetricHolder();
   private static final Log LOG = LogFactory.getLog(TimelineMetric.class);
   public static final int MAX_RECS_PER_NAME_DEFAULT = 10000;
   public static final int MAX_EVICTION_TIME_MILLIS = 59000; // ~ 1 min
@@ -56,81 +54,144 @@ public class TimelineMetricsCache {
     this.maxRecsPerName = maxRecsPerName;
     this.maxEvictionTimeInMillis = maxEvictionTimeInMillis;
     this.skipCounterTransform = skipCounterTransform;
-    this.timelineMetricCache = CacheBuilder.newBuilder().expireAfterWrite(maxEvictionTimeInMillis * 2, TimeUnit.MILLISECONDS).build();
   }
 
   class TimelineMetricWrapper {
-    private Cache<Long, Double> dataPointsCache;
+    private long timeDiff = -1;
+    private long oldestTimestamp = -1;
     private TimelineMetric timelineMetric;
-    private Long oldestTimeStamp;
-    private Long newestTimeStamp;
 
     TimelineMetricWrapper(TimelineMetric timelineMetric) {
       this.timelineMetric = timelineMetric;
-      dataPointsCache = CacheBuilder.newBuilder().
-              maximumSize(maxRecsPerName).expireAfterWrite(maxEvictionTimeInMillis * 2, TimeUnit.MILLISECONDS).build();
+      this.oldestTimestamp = timelineMetric.getStartTime();
+    }
 
-      putMetric(timelineMetric);
+    private void updateTimeDiff(long timestamp) {
+      if (oldestTimestamp != -1 && timestamp > oldestTimestamp) {
+        timeDiff = timestamp - oldestTimestamp;
+      } else {
+        oldestTimestamp = timestamp;
+      }
     }
 
     public synchronized void putMetric(TimelineMetric metric) {
-      if (dataPointsCache.size() == 0) {
-        oldestTimeStamp = metric.getStartTime();
-        newestTimeStamp = metric.getStartTime();
+      TreeMap<Long, Double> metricValues = this.timelineMetric.getMetricValues();
+      if (metricValues.size() > maxRecsPerName) {
+        // remove values for eldest maxEvictionTimeInMillis
+        long newEldestTimestamp = oldestTimestamp + maxEvictionTimeInMillis;
+        TreeMap<Long, Double> metricsSubSet =
+          new TreeMap<>(metricValues.tailMap(newEldestTimestamp));
+        if (metricsSubSet.isEmpty()) {
+          oldestTimestamp = metric.getStartTime();
+          this.timelineMetric.setStartTime(metric.getStartTime());
+        } else {
+          Long newStartTime = metricsSubSet.firstKey();
+          oldestTimestamp = newStartTime;
+          this.timelineMetric.setStartTime(newStartTime);
+        }
+        this.timelineMetric.setMetricValues(metricsSubSet);
+        LOG.warn("Metrics cache overflow. Values for metric " +
+          metric.getMetricName() + " older than " + newEldestTimestamp +
+          " were removed to clean up the cache.");
       }
-      TreeMap<Long, Double> metricValues = metric.getMetricValues();
-      for (Map.Entry<Long, Double> entry : metricValues.entrySet()) {
-        Long key = entry.getKey();
-        dataPointsCache.put(key, entry.getValue());
-      }
-      oldestTimeStamp = Math.min(oldestTimeStamp, metric.getStartTime());
-      newestTimeStamp = Math.max(newestTimeStamp, metric.getStartTime());
+      this.timelineMetric.addMetricValues(metric.getMetricValues());
+      updateTimeDiff(metric.getStartTime());
+    }
+
+    public synchronized long getTimeDiff() {
+      return timeDiff;
     }
 
     public synchronized TimelineMetric getTimelineMetric() {
-      TreeMap<Long, Double> metricValues = new TreeMap<>(dataPointsCache.asMap());
-      if (metricValues.isEmpty() || newestTimeStamp - oldestTimeStamp < maxEvictionTimeInMillis) {
+      return timelineMetric;
+    }
+  }
+
+  // TODO: Add weighted eviction
+  class TimelineMetricHolder extends ConcurrentSkipListMap<String, TimelineMetricWrapper> {
+    private static final long serialVersionUID = 2L;
+    // To avoid duplication at the end of the buffer and beginning of the next
+    // segment of values
+    private Map<String, Long> endOfBufferTimestamps = new HashMap<String, Long>();
+
+    public TimelineMetric evict(String metricName) {
+      TimelineMetricWrapper metricWrapper = this.get(metricName);
+
+      if (metricWrapper == null
+        || metricWrapper.getTimeDiff() < getMaxEvictionTimeInMillis()) {
         return null;
       }
-      dataPointsCache.invalidateAll();
-      timelineMetric.setStartTime(metricValues.firstKey());
-      timelineMetric.setMetricValues(metricValues);
-      return new TimelineMetric(timelineMetric);
+
+      TimelineMetric timelineMetric = metricWrapper.getTimelineMetric();
+      this.remove(metricName);
+
+      return timelineMetric;
+    }
+
+    public TimelineMetrics evictAll() {
+      List<TimelineMetric> metricList = new ArrayList<TimelineMetric>();
+
+      for (Iterator<Map.Entry<String, TimelineMetricWrapper>> it = this.entrySet().iterator(); it.hasNext();) {
+        Map.Entry<String, TimelineMetricWrapper> cacheEntry = it.next();
+        TimelineMetricWrapper metricWrapper = cacheEntry.getValue();
+        if (metricWrapper != null) {
+          TimelineMetric timelineMetric = cacheEntry.getValue().getTimelineMetric();
+          metricList.add(timelineMetric);
+        }
+        it.remove();
+      }
+      TimelineMetrics timelineMetrics = new TimelineMetrics();
+      timelineMetrics.setMetrics(metricList);
+      return timelineMetrics;
+    }
+
+    public void put(String metricName, TimelineMetric timelineMetric) {
+      if (isDuplicate(timelineMetric)) {
+        return;
+      }
+      TimelineMetricWrapper metric = this.get(metricName);
+      if (metric == null) {
+        this.put(metricName, new TimelineMetricWrapper(timelineMetric));
+      } else {
+        metric.putMetric(timelineMetric);
+      }
+      // Buffer last ts value
+      endOfBufferTimestamps.put(metricName, timelineMetric.getStartTime());
+    }
+
+    /**
+     * Test whether last buffered timestamp is same as the newly received.
+     * @param timelineMetric @TimelineMetric
+     * @return true/false
+     */
+    private boolean isDuplicate(TimelineMetric timelineMetric) {
+      return endOfBufferTimestamps.containsKey(timelineMetric.getMetricName())
+        && endOfBufferTimestamps.get(timelineMetric.getMetricName()).equals(timelineMetric.getStartTime());
     }
   }
 
   public TimelineMetric getTimelineMetric(String metricName) {
-    TimelineMetricWrapper timelineMetricWrapper = timelineMetricCache.getIfPresent(metricName);
-    if (timelineMetricWrapper != null) {
-      return timelineMetricWrapper.getTimelineMetric();
+    if (timelineMetricCache.containsKey(metricName)) {
+      return timelineMetricCache.evict(metricName);
     }
+
     return null;
   }
 
   public TimelineMetrics getAllMetrics() {
-    TimelineMetrics timelineMetrics = new TimelineMetrics();
-    Collection<TimelineMetricWrapper> timelineMetricWrapperCollection = timelineMetricCache.asMap().values();
-    List<TimelineMetric> timelineMetricList =
-            new ArrayList<>(timelineMetricWrapperCollection.size());
-
-    for (TimelineMetricWrapper timelineMetricWrapper : timelineMetricWrapperCollection) {
-      timelineMetricList.add(timelineMetricWrapper.getTimelineMetric());
-    }
-
-    timelineMetrics.setMetrics(timelineMetricList);
-    return timelineMetrics;
+    return timelineMetricCache.evictAll();
   }
 
+  /**
+   * Getter method to help testing eviction
+   * @return @int
+   */
+  public int getMaxEvictionTimeInMillis() {
+    return maxEvictionTimeInMillis;
+  }
 
   public void putTimelineMetric(TimelineMetric timelineMetric) {
-    String metricName = timelineMetric.getMetricName();
-    TimelineMetricWrapper timelineMetricWrapper = timelineMetricCache.getIfPresent(metricName);
-
-    if (timelineMetricWrapper != null) {
-      timelineMetricWrapper.putMetric(timelineMetric);
-    } else {
-      timelineMetricCache.put(metricName, new TimelineMetricWrapper(timelineMetric));
-    }
+    timelineMetricCache.put(timelineMetric.getMetricName(), timelineMetric);
   }
 
   private void transformMetricValuesToDerivative(TimelineMetric timelineMetric) {

+ 2 - 18
ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCacheTest.java

@@ -26,7 +26,6 @@ import java.util.TreeMap;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
 
 public class TimelineMetricsCacheTest {
 
@@ -76,8 +75,8 @@ public class TimelineMetricsCacheTest {
   }
 
   @Test
-  public void testMaxRecsPerNameForTimelineMetricWrapperCache() throws Exception {
-    int maxRecsPerName = 3;
+  public void testMaxRecsPerName() throws Exception {
+    int maxRecsPerName = 2;
     int maxEvictionTime = TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS ;
     TimelineMetricsCache timelineMetricsCache =
       new TimelineMetricsCache(maxRecsPerName, maxEvictionTime);
@@ -125,21 +124,6 @@ public class TimelineMetricsCacheTest {
     assertEquals(DEFAULT_START_TIME + maxEvictionTime * 2, cachedMetric.getStartTime());
   }
 
-  @Test
-  public void testEvictionTimeForTimelineMetricWrapperCache() {
-    int maxEvictionTime = 10;
-    TimelineMetricsCache timelineMetricsCache =
-            new TimelineMetricsCache(TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT, maxEvictionTime);
-    int numberOfMetricsInserted = 1000;
-    for (int i = 0; i < numberOfMetricsInserted; i++) {
-      timelineMetricsCache.putTimelineMetric(
-              createTimelineMetricSingleValue(DEFAULT_START_TIME + maxEvictionTime * i));
-    }
-    TimelineMetric cachedMetric = timelineMetricsCache.getTimelineMetric(METRIC_NAME);
-    assertNotNull(cachedMetric);
-    assertTrue("Some metric values should have been removed", cachedMetric.getMetricValues().size() < numberOfMetricsInserted);
-  }
-
   private TimelineMetric createTimelineMetricSingleValue(final long startTime) {
     TreeMap<Long, Double> values = new TreeMap<Long, Double>();
     values.put(startTime, 0.0);

+ 7 - 4
ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java

@@ -150,7 +150,7 @@ public class HadoopTimelineMetricsSinkTest {
     expect(conf.getInt(eq(MAX_METRIC_ROW_CACHE_SIZE), anyInt())).andReturn(10).anyTimes();
     // Return eviction time smaller than time diff for first 3 entries
     // Third entry will result in eviction
-    expect(conf.getInt(eq(METRICS_SEND_INTERVAL), anyInt())).andReturn(90).anyTimes();
+    expect(conf.getInt(eq(METRICS_SEND_INTERVAL), anyInt())).andReturn(10).anyTimes();
 
     conf.setListDelimiter(eq(','));
     expectLastCall().anyTimes();
@@ -179,6 +179,7 @@ public class HadoopTimelineMetricsSinkTest {
     expect(metric.value()).andReturn(3.0).once();
     expect(metric.value()).andReturn(4.0).once();
     expect(metric.value()).andReturn(5.0).once();
+    expect(metric.value()).andReturn(6.0).once();
 
     MetricsRecord record = createNiceMock(MetricsRecord.class);
     expect(record.name()).andReturn("testName").anyTimes();
@@ -195,7 +196,7 @@ public class HadoopTimelineMetricsSinkTest {
     final Long now = System.currentTimeMillis();
     // TODO: Current implementation of cache needs > 1 elements to evict any
     expect(record.timestamp()).andReturn(now).times(2);
-    expect(record.timestamp()).andReturn(now + 100l).once();
+    expect(record.timestamp()).andReturn(now + 100l).times(2);
     expect(record.timestamp()).andReturn(now + 200l).once();
     expect(record.timestamp()).andReturn(now + 300l).once();
 
@@ -226,6 +227,8 @@ public class HadoopTimelineMetricsSinkTest {
     sink.putMetrics(record);
     // time = t3
     sink.putMetrics(record);
+    // time = t4
+    sink.putMetrics(record);
 
     verify(conf, sink, record, metric);
 
@@ -239,7 +242,7 @@ public class HadoopTimelineMetricsSinkTest {
     Assert.assertEquals(now, timestamps.next());
     Assert.assertEquals(new Long(now + 100l), timestamps.next());
     Iterator<Double> values = timelineMetric1.getMetricValues().values().iterator();
-    Assert.assertEquals(new Double(2.0), values.next());
+    Assert.assertEquals(new Double(1.0), values.next());
     Assert.assertEquals(new Double(3.0), values.next());
     // t3, t4
     TimelineMetric timelineMetric2 = metricsIterator.next().getMetrics().get(0);
@@ -248,8 +251,8 @@ public class HadoopTimelineMetricsSinkTest {
     Assert.assertEquals(new Long(now + 200l), timestamps.next());
     Assert.assertEquals(new Long(now + 300l), timestamps.next());
     values = timelineMetric2.getMetricValues().values().iterator();
-    Assert.assertEquals(new Double(4.0), values.next());
     Assert.assertEquals(new Double(5.0), values.next());
+    Assert.assertEquals(new Double(6.0), values.next());
   }
 
   @Test