Browse Source

AMBARI-8994. AMS : Yarn service - RPC metrics returns duplicate array elements. (swagle)

Siddharth Wagle 10 years ago
parent
commit
dec5f7d69c

+ 6 - 2
ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/base/AbstractTimelineMetricsSink.java → ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java

@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.metrics2.sink.timeline.base;
+package org.apache.hadoop.metrics2.sink.timeline;
 
 import org.apache.commons.httpclient.HttpClient;
 import org.apache.commons.httpclient.methods.PostMethod;
@@ -37,7 +37,7 @@ public abstract class AbstractTimelineMetricsSink {
   public static final String METRICS_SEND_INTERVAL = "sendInterval";
   public static final String COLLECTOR_HOST_PROPERTY = "collector";
 
-  protected final Log LOG = LogFactory.getLog(this.getClass());
+  protected final Log LOG;
   private HttpClient httpClient = new HttpClient();
 
   protected static ObjectMapper mapper;
@@ -50,6 +50,10 @@ public abstract class AbstractTimelineMetricsSink {
         .setSerializationInclusion(JsonSerialize.Inclusion.NON_NULL);
   }
 
+  public AbstractTimelineMetricsSink() {
+    LOG = LogFactory.getLog(this.getClass());
+  }
+
   protected void emitMetrics(TimelineMetrics metrics) throws IOException {
     String jsonData = mapper.writeValueAsString(metrics);
 

+ 28 - 2
ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java

@@ -23,6 +23,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
@@ -78,6 +79,9 @@ public class TimelineMetricsCache {
   class TimelineMetricHolder extends LinkedHashMap<String, TimelineMetricWrapper> {
     private static final long serialVersionUID = 1L;
     private boolean gotOverflow = false;
+    // To avoid duplication at the end of the buffer and beginning of the next
+    // segment of values
+    private Map<String, Long> endOfBufferTimestamps = new HashMap<String, Long>();
 
     @Override
     protected boolean removeEldestEntry(Map.Entry<String, TimelineMetricWrapper> eldest) {
@@ -93,7 +97,7 @@ public class TimelineMetricsCache {
       TimelineMetricWrapper metricWrapper = this.get(metricName);
 
       if (metricWrapper == null
-        || metricWrapper.getTimeDiff() < maxEvictionTimeInMillis) {
+        || metricWrapper.getTimeDiff() < getMaxEvictionTimeInMillis()) {
         return null;
       }
 
@@ -104,13 +108,27 @@ public class TimelineMetricsCache {
     }
 
     public void put(String metricName, TimelineMetric timelineMetric) {
-
+      if (isDuplicate(timelineMetric)) {
+        return;
+      }
       TimelineMetricWrapper metric = this.get(metricName);
       if (metric == null) {
         this.put(metricName, new TimelineMetricWrapper(timelineMetric));
       } else {
         metric.putMetric(timelineMetric);
       }
+      // Buffer last ts value
+      endOfBufferTimestamps.put(metricName, timelineMetric.getStartTime());
+    }
+
+    /**
+     * Test whether last buffered timestamp is same as the newly received.
+     * @param timelineMetric the {@link TimelineMetric} to check for duplication
+     * @return true/false
+     */
+    private boolean isDuplicate(TimelineMetric timelineMetric) {
+      return endOfBufferTimestamps.containsKey(timelineMetric.getMetricName())
+        && endOfBufferTimestamps.get(timelineMetric.getMetricName()).equals(timelineMetric.getStartTime());
     }
   }
 
@@ -122,6 +140,14 @@ public class TimelineMetricsCache {
     return null;
   }
 
+  /**
+   * Getter method to help testing eviction
+   * @return the max eviction time in milliseconds
+   */
+  public int getMaxEvictionTimeInMillis() {
+    return maxEvictionTimeInMillis;
+  }
+
   public void putTimelineMetric(TimelineMetric timelineMetric) {
     timelineMetricCache.put(timelineMetric.getMetricName(), timelineMetric);
   }

+ 1 - 1
ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java

@@ -26,7 +26,7 @@ import org.apache.flume.instrumentation.MonitorService;
 import org.apache.flume.instrumentation.util.JMXPollUtil;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.apache.hadoop.metrics2.sink.timeline.base.AbstractTimelineMetricsSink;
+import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
 import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
 import org.apache.hadoop.metrics2.sink.timeline.configuration.Configuration;
 import org.apache.hadoop.metrics2.util.Servers;

+ 5 - 6
ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java

@@ -23,7 +23,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.metrics2.*;
 import org.apache.hadoop.metrics2.impl.MsInfo;
-import org.apache.hadoop.metrics2.sink.timeline.base.AbstractTimelineMetricsSink;
 import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
 import org.apache.hadoop.metrics2.util.Servers;
 import org.apache.hadoop.net.DNS;
@@ -151,19 +150,19 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple
         (Collection<AbstractMetric>) record.metrics();
 
       List<TimelineMetric> metricList = new ArrayList<TimelineMetric>();
+      long startTime = record.timestamp();
 
       for (AbstractMetric metric : metrics) {
         sb.append(metric.name());
         String name = sb.toString();
+        Number value = metric.value();
         TimelineMetric timelineMetric = new TimelineMetric();
         timelineMetric.setMetricName(name);
         timelineMetric.setHostName(hostName);
         timelineMetric.setAppId(serviceName);
-        timelineMetric.setStartTime(record.timestamp());
-        timelineMetric.setType(ClassUtils.getShortCanonicalName(
-          metric.value(), "Number"));
-        timelineMetric.getMetricValues().put(record.timestamp(),
-          metric.value().doubleValue());
+        timelineMetric.setStartTime(startTime);
+        timelineMetric.setType(ClassUtils.getShortCanonicalName(value, "Number"));
+        timelineMetric.getMetricValues().put(startTime, value.doubleValue());
         // Put intermediate values into the cache until it is time to send
         metricsCache.putTimelineMetric(timelineMetric);
 

+ 139 - 3
ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java

@@ -23,25 +23,41 @@ import org.apache.commons.httpclient.HttpClient;
 import org.apache.commons.httpclient.methods.PostMethod;
 import org.apache.hadoop.metrics2.AbstractMetric;
 import org.apache.hadoop.metrics2.MetricsRecord;
-import org.apache.hadoop.metrics2.sink.timeline.base.AbstractTimelineMetricsSink;
+import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
 import org.easymock.IAnswer;
+import org.easymock.IArgumentMatcher;
+import org.junit.Assert;
+import org.junit.Test;
 
+import java.lang.reflect.Field;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
 
-import static org.apache.hadoop.metrics2.sink.timeline.base.AbstractTimelineMetricsSink.*;
+import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.COLLECTOR_HOST_PROPERTY;
+import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.MAX_METRIC_ROW_CACHE_SIZE;
+import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.METRICS_SEND_INTERVAL;
 import static org.easymock.EasyMock.anyInt;
 import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.createMockBuilder;
 import static org.easymock.EasyMock.createNiceMock;
 import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
 import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.reportMatcher;
 import static org.easymock.EasyMock.verify;
 
 public class HadoopTimelineMetricsSinkTest {
 
-  @org.junit.Test
+  @Test
   public void testPutMetrics() throws Exception {
     HadoopTimelineMetricsSink sink = new HadoopTimelineMetricsSink();
 
@@ -110,7 +126,127 @@ public class HadoopTimelineMetricsSinkTest {
     sink.putMetrics(record);
 
     verify(conf, httpClient, record, metric);
+  }
+
+  @Test
+  public void testDuplicateTimeSeriesNotSaved() throws Exception {
+    HadoopTimelineMetricsSink sink =
+      createMockBuilder(HadoopTimelineMetricsSink.class)
+        .withConstructor().addMockedMethod("appendPrefix")
+        .addMockedMethod("emitMetrics").createNiceMock();
+
+    SubsetConfiguration conf = createNiceMock(SubsetConfiguration.class);
+    expect(conf.getString(eq("slave.host.name"))).andReturn("testhost").anyTimes();
+    expect(conf.getParent()).andReturn(null).anyTimes();
+    expect(conf.getPrefix()).andReturn("service").anyTimes();
+    expect(conf.getString(eq(COLLECTOR_HOST_PROPERTY))).andReturn("localhost:63188").anyTimes();
+    expect(conf.getString(eq("serviceName-prefix"), eq(""))).andReturn("").anyTimes();
+
+    expect(conf.getInt(eq(MAX_METRIC_ROW_CACHE_SIZE), anyInt())).andReturn(10).anyTimes();
+    // Return eviction time smaller than time diff for first 3 entries
+    // Third entry will result in eviction
+    expect(conf.getInt(eq(METRICS_SEND_INTERVAL), anyInt())).andReturn(10).anyTimes();
+
+    conf.setListDelimiter(eq(','));
+    expectLastCall().anyTimes();
+
+    expect(conf.getKeys()).andReturn(new Iterator() {
+      @Override
+      public boolean hasNext() {
+        return false;
+      }
+
+      @Override
+      public Object next() {
+        return null;
+      }
+
+      @Override
+      public void remove() {
+
+      }
+    }).once();
+
+    AbstractMetric metric = createNiceMock(AbstractMetric.class);
+    expect(metric.name()).andReturn("metricName").anyTimes();
+    expect(metric.value()).andReturn(1.0).once();
+    expect(metric.value()).andReturn(2.0).once();
+    expect(metric.value()).andReturn(3.0).once();
+    expect(metric.value()).andReturn(4.0).once();
+    expect(metric.value()).andReturn(5.0).once();
+    expect(metric.value()).andReturn(6.0).once();
 
+    MetricsRecord record = createNiceMock(MetricsRecord.class);
+    expect(record.name()).andReturn("testName").anyTimes();
+    expect(record.context()).andReturn("testContext").anyTimes();
+
+    sink.appendPrefix(eq(record), (StringBuilder) anyObject());
+    expectLastCall().anyTimes().andStubAnswer(new IAnswer<Object>() {
+      @Override
+      public Object answer() throws Throwable {
+        return null;
+      }
+    });
+
+    final Long now = System.currentTimeMillis();
+    // TODO: Current implementation of cache needs > 1 elements to evict any
+    expect(record.timestamp()).andReturn(now).times(2);
+    expect(record.timestamp()).andReturn(now + 100l).times(2);
+    expect(record.timestamp()).andReturn(now + 200l).once();
+    expect(record.timestamp()).andReturn(now + 300l).once();
+
+    expect(record.metrics()).andReturn(Arrays.asList(metric)).anyTimes();
+
+    final List<TimelineMetrics> capturedMetrics = new ArrayList<TimelineMetrics>();
+    sink.emitMetrics((TimelineMetrics) anyObject());
+    expectLastCall().andStubAnswer(new IAnswer<Object>() {
+      @Override
+      public Object answer() throws Throwable {
+        capturedMetrics.add((TimelineMetrics) EasyMock.getCurrentArguments()[0]);
+        return null;
+      }
+    });
+
+    replay(conf, sink, record, metric);
+
+    sink.init(conf);
+
+    // time = t1
+    sink.putMetrics(record);
+    // time = t1
+    sink.putMetrics(record);
+    // time = t2
+    sink.putMetrics(record);
+    // Evict
+    // time = t2
+    sink.putMetrics(record);
+    // time = t3
+    sink.putMetrics(record);
+    // time = t4
+    sink.putMetrics(record);
 
+    verify(conf, sink, record, metric);
+
+    Assert.assertEquals(2, capturedMetrics.size());
+    Iterator<TimelineMetrics> metricsIterator = capturedMetrics.iterator();
+
+    // t1, t2
+    TimelineMetric timelineMetric1 = metricsIterator.next().getMetrics().get(0);
+    Assert.assertEquals(2, timelineMetric1.getMetricValues().size());
+    Iterator<Long> timestamps = timelineMetric1.getMetricValues().keySet().iterator();
+    Assert.assertEquals(now, timestamps.next());
+    Assert.assertEquals(new Long(now + 100l), timestamps.next());
+    Iterator<Double> values = timelineMetric1.getMetricValues().values().iterator();
+    Assert.assertEquals(new Double(1.0), values.next());
+    Assert.assertEquals(new Double(3.0), values.next());
+    // t3, t4
+    TimelineMetric timelineMetric2 = metricsIterator.next().getMetrics().get(0);
+    Assert.assertEquals(2, timelineMetric2.getMetricValues().size());
+    timestamps = timelineMetric2.getMetricValues().keySet().iterator();
+    Assert.assertEquals(new Long(now + 200l), timestamps.next());
+    Assert.assertEquals(new Long(now + 300l), timestamps.next());
+    values = timelineMetric2.getMetricValues().values().iterator();
+    Assert.assertEquals(new Double(5.0), values.next());
+    Assert.assertEquals(new Double(6.0), values.next());
   }
 }

+ 1 - 1
ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java

@@ -25,7 +25,7 @@ import org.apache.commons.lang.ClassUtils;
 import org.apache.commons.lang.math.NumberUtils;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.apache.hadoop.metrics2.sink.timeline.base.AbstractTimelineMetricsSink;
+import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
 import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
 import org.apache.hadoop.metrics2.sink.timeline.configuration.Configuration;
 import org.apache.hadoop.metrics2.util.Servers;