|
|
@@ -18,17 +18,21 @@
|
|
|
|
|
|
package org.apache.hadoop.metrics2.sink.flume;
|
|
|
|
|
|
+import static org.easymock.EasyMock.anyObject;
|
|
|
import static org.easymock.EasyMock.anyString;
|
|
|
import static org.easymock.EasyMock.createNiceMock;
|
|
|
+import static org.easymock.EasyMock.eq;
|
|
|
import static org.easymock.EasyMock.expect;
|
|
|
import static org.powermock.api.easymock.PowerMock.mockStatic;
|
|
|
import static org.powermock.api.easymock.PowerMock.replay;
|
|
|
-import static org.powermock.api.easymock.PowerMock.replayAll;
|
|
|
import static org.powermock.api.easymock.PowerMock.resetAll;
|
|
|
import static org.powermock.api.easymock.PowerMock.verifyAll;
|
|
|
|
|
|
import java.net.InetAddress;
|
|
|
import java.util.Collections;
|
|
|
+import java.util.concurrent.Executors;
|
|
|
+import java.util.concurrent.ScheduledExecutorService;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
import org.apache.flume.Context;
|
|
|
import org.apache.flume.instrumentation.util.JMXPollUtil;
|
|
|
@@ -43,7 +47,7 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
|
|
|
import org.powermock.modules.junit4.PowerMockRunner;
|
|
|
|
|
|
@RunWith(PowerMockRunner.class)
|
|
|
-@PrepareForTest(JMXPollUtil.class)
|
|
|
+@PrepareForTest({JMXPollUtil.class, Executors.class, FlumeTimelineMetricsSink.class})
|
|
|
public class FlumeTimelineMetricsSinkTest {
|
|
|
@Test
|
|
|
public void testNonNumericMetricMetricExclusion() throws InterruptedException {
|
|
|
@@ -76,7 +80,7 @@ public class FlumeTimelineMetricsSinkTest {
|
|
|
flumeTimelineMetricsSink.setMetricsCaches(Collections.singletonMap("SINK",timelineMetricsCache));
|
|
|
EasyMock.expect(timelineMetricsCache.getTimelineMetric("key1"))
|
|
|
.andReturn(new TimelineMetric()).once();
|
|
|
- timelineMetricsCache.putTimelineMetric(EasyMock.anyObject(TimelineMetric.class));
|
|
|
+ timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class));
|
|
|
EasyMock.expectLastCall().once();
|
|
|
return timelineMetricsCache;
|
|
|
}
|
|
|
@@ -86,15 +90,18 @@ public class FlumeTimelineMetricsSinkTest {
|
|
|
FlumeTimelineMetricsSink flumeTimelineMetricsSink = new FlumeTimelineMetricsSink();
|
|
|
TimelineMetricsCache timelineMetricsCache = getTimelineMetricsCache(flumeTimelineMetricsSink);
|
|
|
flumeTimelineMetricsSink.setPollFrequency(1);
|
|
|
- mockStatic(JMXPollUtil.class);
|
|
|
- EasyMock.expect(JMXPollUtil.getAllMBeans()).andReturn(
|
|
|
- Collections.singletonMap("component1", Collections.singletonMap("key1", "42"))).once();
|
|
|
- flumeTimelineMetricsSink.start();
|
|
|
- flumeTimelineMetricsSink.stop();
|
|
|
- replay(JMXPollUtil.class, timelineMetricsCache);
|
|
|
+ mockStatic(Executors.class);
|
|
|
+ ScheduledExecutorService executor = createNiceMock(ScheduledExecutorService.class);
|
|
|
+ expect(Executors.newSingleThreadScheduledExecutor()).andReturn(executor);
|
|
|
+ FlumeTimelineMetricsSink.TimelineMetricsCollector collector = anyObject();
|
|
|
+ TimeUnit unit = anyObject();
|
|
|
+ expect(executor.scheduleWithFixedDelay(collector, eq(0), eq(1), unit)).andReturn(null);
|
|
|
+ executor.shutdown();
|
|
|
+ replay(timelineMetricsCache, Executors.class, executor);
|
|
|
+
|
|
|
flumeTimelineMetricsSink.start();
|
|
|
- Thread.sleep(5);
|
|
|
flumeTimelineMetricsSink.stop();
|
|
|
+
|
|
|
verifyAll();
|
|
|
}
|
|
|
|