|
|
@@ -0,0 +1,240 @@
|
|
|
+/**
|
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one
|
|
|
+ * or more contributor license agreements. See the NOTICE file
|
|
|
+ * distributed with this work for additional information
|
|
|
+ * regarding copyright ownership. The ASF licenses this file
|
|
|
+ * to you under the Apache License, Version 2.0 (the
|
|
|
+ * "License"); you may not use this file except in compliance
|
|
|
+ * with the License. You may obtain a copy of the License at
|
|
|
+ *
|
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
|
+ *
|
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
+ * See the License for the specific language governing permissions and
|
|
|
+ * limitations under the License.
|
|
|
+ */
|
|
|
+package org.apache.hadoop.metrics2.sink.timeline;
|
|
|
+
|
|
|
+import junit.framework.Assert;
|
|
|
+import org.junit.Test;
|
|
|
+import org.junit.runner.RunWith;
|
|
|
+import org.powermock.api.easymock.PowerMock;
|
|
|
+import org.powermock.core.classloader.annotations.PrepareForTest;
|
|
|
+import org.powermock.modules.junit4.PowerMockRunner;
|
|
|
+
|
|
|
+import java.io.OutputStream;
|
|
|
+import java.net.HttpURLConnection;
|
|
|
+import java.net.URL;
|
|
|
+import java.util.Arrays;
|
|
|
+import java.util.Collection;
|
|
|
+import java.util.TreeMap;
|
|
|
+
|
|
|
+import static org.easymock.EasyMock.anyString;
|
|
|
+import static org.easymock.EasyMock.expect;
|
|
|
+import static org.powermock.api.easymock.PowerMock.expectNew;
|
|
|
+import static org.powermock.api.easymock.PowerMock.replayAll;
|
|
|
+
|
|
|
+@RunWith(PowerMockRunner.class)
|
|
|
+@PrepareForTest({AbstractTimelineMetricsSink.class, HttpURLConnection.class})
|
|
|
+public class AbstractTimelineMetricSinkTest {
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testParseHostsStringIntoCollection() {
|
|
|
+ AbstractTimelineMetricsSink sink = new TestTimelineMetricsSink();
|
|
|
+ Collection<String> hosts;
|
|
|
+
|
|
|
+ hosts = sink.parseHostsStringIntoCollection("");
|
|
|
+ Assert.assertTrue(hosts.isEmpty());
|
|
|
+
|
|
|
+ hosts = sink.parseHostsStringIntoCollection("test1.123.abc.def.local");
|
|
|
+ Assert.assertTrue(hosts.size() == 1);
|
|
|
+ Assert.assertTrue(hosts.contains("test1.123.abc.def.local"));
|
|
|
+
|
|
|
+ hosts = sink.parseHostsStringIntoCollection("test1.123.abc.def.local ");
|
|
|
+ Assert.assertTrue(hosts.size() == 1);
|
|
|
+ Assert.assertTrue(hosts.contains("test1.123.abc.def.local"));
|
|
|
+
|
|
|
+ hosts = sink.parseHostsStringIntoCollection("test1.123.abc.def.local,test1.456.abc.def.local");
|
|
|
+ Assert.assertTrue(hosts.size() == 2);
|
|
|
+
|
|
|
+ hosts = sink.parseHostsStringIntoCollection("test1.123.abc.def.local, test1.456.abc.def.local");
|
|
|
+ Assert.assertTrue(hosts.size() == 2);
|
|
|
+ Assert.assertTrue(hosts.contains("test1.123.abc.def.local"));
|
|
|
+ Assert.assertTrue(hosts.contains("test1.456.abc.def.local"));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ @PrepareForTest({URL.class, OutputStream.class, AbstractTimelineMetricsSink.class, HttpURLConnection.class, TimelineMetric.class})
|
|
|
+ public void testEmitMetrics() throws Exception {
|
|
|
+ HttpURLConnection connection = PowerMock.createNiceMock(HttpURLConnection.class);
|
|
|
+ URL url = PowerMock.createNiceMock(URL.class);
|
|
|
+ expectNew(URL.class, anyString()).andReturn(url).anyTimes();
|
|
|
+ expect(url.openConnection()).andReturn(connection).anyTimes();
|
|
|
+ expect(connection.getResponseCode()).andReturn(200).anyTimes();
|
|
|
+ OutputStream os = PowerMock.createNiceMock(OutputStream.class);
|
|
|
+ expect(connection.getOutputStream()).andReturn(os).anyTimes();
|
|
|
+
|
|
|
+
|
|
|
+ TestTimelineMetricsSink sink = new TestTimelineMetricsSink();
|
|
|
+ TimelineMetrics timelineMetrics = new TimelineMetrics();
|
|
|
+ long startTime = System.currentTimeMillis() / 60000 * 60000;
|
|
|
+
|
|
|
+ long seconds = 1000;
|
|
|
+ TreeMap<Long, Double> metricValues = new TreeMap<>();
|
|
|
+ /*
|
|
|
+
|
|
|
+ 0 +30s +60s
|
|
|
+ | | |
|
|
|
+ (1)(2)(3) (4)(5) (6) m1
|
|
|
+
|
|
|
+ */
|
|
|
+ // (6) should be cached, the rest - posted
|
|
|
+
|
|
|
+ metricValues.put(startTime + 4*seconds, 1.0);
|
|
|
+ metricValues.put(startTime + 14*seconds, 2.0);
|
|
|
+ metricValues.put(startTime + 24*seconds, 3.0);
|
|
|
+ metricValues.put(startTime + 34*seconds, 4.0);
|
|
|
+ metricValues.put(startTime + 44*seconds, 5.0);
|
|
|
+ metricValues.put(startTime + 64*seconds, 6.0);
|
|
|
+
|
|
|
+ TimelineMetric timelineMetric = new TimelineMetric("metric1", "host1", "app1", "instance1");
|
|
|
+ timelineMetric.setStartTime(metricValues.firstKey());
|
|
|
+ timelineMetric.addMetricValues(metricValues);
|
|
|
+
|
|
|
+ timelineMetrics.addOrMergeTimelineMetric(timelineMetric);
|
|
|
+
|
|
|
+ replayAll();
|
|
|
+ sink.emitMetrics(timelineMetrics);
|
|
|
+ Assert.assertEquals(1, sink.getMetricsPostCache().size());
|
|
|
+ metricValues = new TreeMap<>();
|
|
|
+ metricValues.put(startTime + 64*seconds, 6.0);
|
|
|
+ Assert.assertEquals(metricValues, sink.getMetricsPostCache().getIfPresent("metric1").getMetricValues());
|
|
|
+
|
|
|
+ timelineMetrics = new TimelineMetrics();
|
|
|
+ metricValues = new TreeMap<>();
|
|
|
+ /*
|
|
|
+
|
|
|
+ +60 +90s +120s +150s +180s
|
|
|
+ | | | | |
|
|
|
+ (7) (8) (9) (10) (11) m1
|
|
|
+
|
|
|
+ */
|
|
|
+ // (6) from previous post should be merged with current data
|
|
|
+ // (6),(7),(8),(9),(10) - should be posted, (11) - cached
|
|
|
+ metricValues.put(startTime + 74*seconds, 7.0);
|
|
|
+ metricValues.put(startTime + 94*seconds, 8.0);
|
|
|
+ metricValues.put(startTime + 124*seconds, 9.0);
|
|
|
+ metricValues.put(startTime + 154*seconds, 10.0);
|
|
|
+ metricValues.put(startTime + 184*seconds, 11.0);
|
|
|
+
|
|
|
+ timelineMetric = new TimelineMetric("metric1", "host1", "app1", "instance1");
|
|
|
+ timelineMetric.setStartTime(metricValues.firstKey());
|
|
|
+ timelineMetric.addMetricValues(metricValues);
|
|
|
+
|
|
|
+ timelineMetrics.addOrMergeTimelineMetric(timelineMetric);
|
|
|
+ sink.emitMetrics(timelineMetrics);
|
|
|
+
|
|
|
+ Assert.assertEquals(1, sink.getMetricsPostCache().size());
|
|
|
+ metricValues = new TreeMap<>();
|
|
|
+ metricValues.put(startTime + 184*seconds, 11.0);
|
|
|
+ Assert.assertEquals(metricValues, sink.getMetricsPostCache().getIfPresent("metric1").getMetricValues());timelineMetrics = new TimelineMetrics();
|
|
|
+
|
|
|
+ metricValues = new TreeMap<>();
|
|
|
+ /*
|
|
|
+
|
|
|
+ +180s +210s +240s
|
|
|
+ | | |
|
|
|
+ (12) (13)
|
|
|
+
|
|
|
+ */
|
|
|
+ // (11) from previous post should be merged with current data
|
|
|
+ // (11),(12),(13) - should be posted, cache should be empty
|
|
|
+ metricValues.put(startTime + 194*seconds, 12.0);
|
|
|
+ metricValues.put(startTime + 239*seconds, 13.0);
|
|
|
+
|
|
|
+ timelineMetric = new TimelineMetric("metric1", "host1", "app1", "instance1");
|
|
|
+ timelineMetric.setStartTime(metricValues.firstKey());
|
|
|
+ timelineMetric.addMetricValues(metricValues);
|
|
|
+
|
|
|
+ timelineMetrics.addOrMergeTimelineMetric(timelineMetric);
|
|
|
+ sink.emitMetrics(timelineMetrics);
|
|
|
+
|
|
|
+ Assert.assertEquals(0, sink.getMetricsPostCache().size());
|
|
|
+
|
|
|
+ metricValues = new TreeMap<>();
|
|
|
+ /*
|
|
|
+
|
|
|
+ +240s +270s +300s +330s
|
|
|
+ | | | |
|
|
|
+ (14) (15) (16)
|
|
|
+
|
|
|
+ */
|
|
|
+ // since postAllCachedMetrics in emitMetrics call is true (14),(15),(16) - should be posted, cache should be empty
|
|
|
+ metricValues.put(startTime + 245*seconds, 14.0);
|
|
|
+ metricValues.put(startTime + 294*seconds, 15.0);
|
|
|
+ metricValues.put(startTime + 315*seconds, 16.0);
|
|
|
+
|
|
|
+ timelineMetric = new TimelineMetric("metric1", "host1", "app1", "instance1");
|
|
|
+ timelineMetric.setStartTime(metricValues.firstKey());
|
|
|
+ timelineMetric.addMetricValues(metricValues);
|
|
|
+
|
|
|
+ timelineMetrics.addOrMergeTimelineMetric(timelineMetric);
|
|
|
+ sink.emitMetrics(timelineMetrics, true);
|
|
|
+
|
|
|
+ Assert.assertEquals(0, sink.getMetricsPostCache().size());
|
|
|
+ }
|
|
|
+
|
|
|
+ private class TestTimelineMetricsSink extends AbstractTimelineMetricsSink {
|
|
|
+ @Override
|
|
|
+ protected String getCollectorUri(String host) {
|
|
|
+ return "";
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected String getCollectorProtocol() {
|
|
|
+ return "http";
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected String getCollectorPort() {
|
|
|
+ return "2181";
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected int getTimeoutSeconds() {
|
|
|
+ return 10;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected String getZookeeperQuorum() {
|
|
|
+ return "localhost:2181";
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected Collection<String> getConfiguredCollectorHosts() {
|
|
|
+ return Arrays.asList("localhost");
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected String getHostname() {
|
|
|
+ return "h1";
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected boolean isHostInMemoryAggregationEnabled() {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected int getHostInMemoryAggregationPort() {
|
|
|
+ return 61888;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected String getHostInMemoryAggregationProtocol() {
|
|
|
+ return "http";
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|