|
@@ -56,6 +56,7 @@ import org.apache.hadoop.metrics2.lib.MutableCounterLong;
|
|
|
import org.apache.hadoop.metrics2.lib.MutableRate;
|
|
|
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
|
|
|
import org.apache.hadoop.util.StringUtils;
|
|
|
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
|
|
|
|
|
/**
|
|
|
* Test the MetricsSystemImpl class
|
|
@@ -80,7 +81,7 @@ public class TestMetricsSystemImpl {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- @Test public void testInitFirst() throws Exception {
|
|
|
+ @Test public void testInitFirstVerifyStopInvokedImmediately() throws Exception {
|
|
|
new ConfigBuilder().add("*.period", 8)
|
|
|
//.add("test.sink.plugin.urls", getPluginUrlsAsString())
|
|
|
.add("test.sink.test.class", TestSink.class.getName())
|
|
@@ -106,14 +107,61 @@ public class TestMetricsSystemImpl {
|
|
|
ms.stop();
|
|
|
ms.shutdown();
|
|
|
|
|
|
- verify(sink1, times(2)).putMetrics(r1.capture());
|
|
|
+ //When we call stop, at most two sources will be consumed by each sink thread.
|
|
|
+ verify(sink1, atMost(2)).putMetrics(r1.capture());
|
|
|
+ List<MetricsRecord> mr1 = r1.getAllValues();
|
|
|
+ verify(sink2, atMost(2)).putMetrics(r2.capture());
|
|
|
+ List<MetricsRecord> mr2 = r2.getAllValues();
|
|
|
+ if (mr1.size() != 0 && mr2.size() != 0) {
|
|
|
+ checkMetricsRecords(mr1);
|
|
|
+ assertEquals("output", mr1, mr2);
|
|
|
+ } else if (mr1.size() != 0) {
|
|
|
+ checkMetricsRecords(mr1);
|
|
|
+ } else if (mr2.size() != 0) {
|
|
|
+ checkMetricsRecords(mr2);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test public void testInitFirstVerifyCallBacks() throws Exception {
|
|
|
+ DefaultMetricsSystem.shutdown();
|
|
|
+ new ConfigBuilder().add("*.period", 8)
|
|
|
+ //.add("test.sink.plugin.urls", getPluginUrlsAsString())
|
|
|
+ .add("test.sink.test.class", TestSink.class.getName())
|
|
|
+ .add("test.*.source.filter.exclude", "s0")
|
|
|
+ .add("test.source.s1.metric.filter.exclude", "X*")
|
|
|
+ .add("test.sink.sink1.metric.filter.exclude", "Y*")
|
|
|
+ .add("test.sink.sink2.metric.filter.exclude", "Y*")
|
|
|
+ .save(TestMetricsConfig.getTestFilename("hadoop-metrics2-test"));
|
|
|
+ MetricsSystemImpl ms = new MetricsSystemImpl("Test");
|
|
|
+ ms.start();
|
|
|
+ ms.register("s0", "s0 desc", new TestSource("s0rec"));
|
|
|
+ TestSource s1 = ms.register("s1", "s1 desc", new TestSource("s1rec"));
|
|
|
+ s1.c1.incr();
|
|
|
+ s1.xxx.incr();
|
|
|
+ s1.g1.set(2);
|
|
|
+ s1.yyy.incr(2);
|
|
|
+ s1.s1.add(0);
|
|
|
+ MetricsSink sink1 = mock(MetricsSink.class);
|
|
|
+ MetricsSink sink2 = mock(MetricsSink.class);
|
|
|
+ ms.registerSink("sink1", "sink1 desc", sink1);
|
|
|
+ ms.registerSink("sink2", "sink2 desc", sink2);
|
|
|
+ ms.publishMetricsNow(); // publish the metrics
|
|
|
+
|
|
|
+ try {
|
|
|
+ verify(sink1, timeout(200).times(2)).putMetrics(r1.capture());
|
|
|
+ verify(sink2, timeout(200).times(2)).putMetrics(r2.capture());
|
|
|
+ } finally {
|
|
|
+ ms.stop();
|
|
|
+ ms.shutdown();
|
|
|
+ }
|
|
|
+ //When we call stop, at most two sources will be consumed by each sink thread.
|
|
|
List<MetricsRecord> mr1 = r1.getAllValues();
|
|
|
- verify(sink2, times(2)).putMetrics(r2.capture());
|
|
|
List<MetricsRecord> mr2 = r2.getAllValues();
|
|
|
checkMetricsRecords(mr1);
|
|
|
assertEquals("output", mr1, mr2);
|
|
|
- }
|
|
|
|
|
|
+ }
|
|
|
+
|
|
|
@Test public void testMultiThreadedPublish() throws Exception {
|
|
|
new ConfigBuilder().add("*.period", 80)
|
|
|
.add("test.sink.Collector.queue.capacity", "20")
|