|
@@ -20,9 +20,20 @@ package org.apache.hadoop.yarn.event;
|
|
|
|
|
|
import java.lang.reflect.Field;
|
|
|
import java.lang.reflect.Modifier;
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.HashSet;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.Set;
|
|
|
import java.util.concurrent.BlockingQueue;
|
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
|
|
|
|
+
|
|
|
+import org.apache.hadoop.metrics2.AbstractMetric;
|
|
|
+import org.apache.hadoop.metrics2.MetricsRecord;
|
|
|
+import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
|
|
|
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
|
|
+import org.apache.hadoop.test.GenericTestUtils;
|
|
|
+import org.apache.hadoop.yarn.metrics.GenericEventTypeMetrics;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
@@ -30,6 +41,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|
|
import org.junit.Assert;
|
|
|
import org.junit.Test;
|
|
|
|
|
|
+import static org.apache.hadoop.metrics2.lib.Interns.info;
|
|
|
import static org.junit.Assert.assertEquals;
|
|
|
import static org.mockito.Mockito.*;
|
|
|
|
|
@@ -118,7 +130,7 @@ public class TestAsyncDispatcher {
|
|
|
}
|
|
|
|
|
|
private enum TestEnum {
|
|
|
- TestEventType
|
|
|
+ TestEventType, TestEventType2
|
|
|
}
|
|
|
|
|
|
@SuppressWarnings({ "rawtypes", "unchecked" })
|
|
@@ -230,5 +242,171 @@ public class TestAsyncDispatcher {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-}
|
|
|
+ @Test
|
|
|
+ public void testMetricsForDispatcher() throws Exception {
|
|
|
+ YarnConfiguration conf = new YarnConfiguration();
|
|
|
+ AsyncDispatcher dispatcher = null;
|
|
|
+
|
|
|
+ try {
|
|
|
+ dispatcher = new AsyncDispatcher("RM Event dispatcher");
|
|
|
+
|
|
|
+ GenericEventTypeMetrics genericEventTypeMetrics =
|
|
|
+ new GenericEventTypeMetrics.EventTypeMetricsBuilder()
|
|
|
+ .setMs(DefaultMetricsSystem.instance())
|
|
|
+ .setInfo(info("GenericEventTypeMetrics for "
|
|
|
+ + TestEnum.class.getName(),
|
|
|
+ "Metrics for " + dispatcher.getName()))
|
|
|
+ .setEnumClass(TestEnum.class)
|
|
|
+ .setEnums(TestEnum.class.getEnumConstants())
|
|
|
+ .build().registerMetrics();
|
|
|
+
|
|
|
+ // We can the metrics enabled for TestEnum
|
|
|
+ dispatcher.addMetrics(genericEventTypeMetrics,
|
|
|
+ genericEventTypeMetrics.getEnumClass());
|
|
|
+ dispatcher.init(conf);
|
|
|
+
|
|
|
+ // Register handler
|
|
|
+ dispatcher.register(TestEnum.class, new TestHandler());
|
|
|
+ dispatcher.start();
|
|
|
+
|
|
|
+ for (int i = 0; i < 3; ++i) {
|
|
|
+ Event event = mock(Event.class);
|
|
|
+ when(event.getType()).thenReturn(TestEnum.TestEventType);
|
|
|
+ dispatcher.getEventHandler().handle(event);
|
|
|
+ }
|
|
|
+
|
|
|
+ for (int i = 0; i < 2; ++i) {
|
|
|
+ Event event = mock(Event.class);
|
|
|
+ when(event.getType()).thenReturn(TestEnum.TestEventType2);
|
|
|
+ dispatcher.getEventHandler().handle(event);
|
|
|
+ }
|
|
|
+
|
|
|
+ // Check event type count.
|
|
|
+ GenericTestUtils.waitFor(() -> genericEventTypeMetrics.
|
|
|
+ get(TestEnum.TestEventType) == 3, 1000, 10000);
|
|
|
+
|
|
|
+ GenericTestUtils.waitFor(() -> genericEventTypeMetrics.
|
|
|
+ get(TestEnum.TestEventType2) == 2, 1000, 10000);
|
|
|
+
|
|
|
+ // Check time spend.
|
|
|
+ Assert.assertTrue(genericEventTypeMetrics.
|
|
|
+ getTotalProcessingTime(TestEnum.TestEventType)
|
|
|
+ >= 1500*3);
|
|
|
+ Assert.assertTrue(genericEventTypeMetrics.
|
|
|
+ getTotalProcessingTime(TestEnum.TestEventType)
|
|
|
+ < 1500*4);
|
|
|
+
|
|
|
+ Assert.assertTrue(genericEventTypeMetrics.
|
|
|
+ getTotalProcessingTime(TestEnum.TestEventType2)
|
|
|
+ >= 1500*2);
|
|
|
+ Assert.assertTrue(genericEventTypeMetrics.
|
|
|
+ getTotalProcessingTime(TestEnum.TestEventType2)
|
|
|
+ < 1500*3);
|
|
|
+
|
|
|
+ // Make sure metrics consistent.
|
|
|
+ Assert.assertEquals(Long.toString(genericEventTypeMetrics.
|
|
|
+ get(TestEnum.TestEventType)),
|
|
|
+ genericEventTypeMetrics.
|
|
|
+ getRegistry().get("TestEventType_event_count").toString());
|
|
|
+ Assert.assertEquals(Long.toString(genericEventTypeMetrics.
|
|
|
+ get(TestEnum.TestEventType2)),
|
|
|
+ genericEventTypeMetrics.
|
|
|
+ getRegistry().get("TestEventType2_event_count").toString());
|
|
|
+ Assert.assertEquals(Long.toString(genericEventTypeMetrics.
|
|
|
+ getTotalProcessingTime(TestEnum.TestEventType)),
|
|
|
+ genericEventTypeMetrics.
|
|
|
+ getRegistry().get("TestEventType_processing_time").toString());
|
|
|
+ Assert.assertEquals(Long.toString(genericEventTypeMetrics.
|
|
|
+ getTotalProcessingTime(TestEnum.TestEventType2)),
|
|
|
+ genericEventTypeMetrics.
|
|
|
+ getRegistry().get("TestEventType2_processing_time").toString());
|
|
|
+
|
|
|
+ } finally {
|
|
|
+ dispatcher.close();
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testDispatcherMetricsHistogram() throws Exception {
|
|
|
+ YarnConfiguration conf = new YarnConfiguration();
|
|
|
+ AsyncDispatcher dispatcher = null;
|
|
|
+
|
|
|
+ try {
|
|
|
+ dispatcher = new AsyncDispatcher("RM Event dispatcher");
|
|
|
+
|
|
|
+ GenericEventTypeMetrics genericEventTypeMetrics =
|
|
|
+ new GenericEventTypeMetrics.EventTypeMetricsBuilder()
|
|
|
+ .setMs(DefaultMetricsSystem.instance())
|
|
|
+ .setInfo(info("GenericEventTypeMetrics for "
|
|
|
+ + TestEnum.class.getName(),
|
|
|
+ "Metrics for " + dispatcher.getName()))
|
|
|
+ .setEnumClass(TestEnum.class)
|
|
|
+ .setEnums(TestEnum.class.getEnumConstants())
|
|
|
+ .build().registerMetrics();
|
|
|
+
|
|
|
+ // We can the metrics enabled for TestEnum
|
|
|
+ dispatcher.addMetrics(genericEventTypeMetrics,
|
|
|
+ genericEventTypeMetrics.getEnumClass());
|
|
|
+ dispatcher.init(conf);
|
|
|
+
|
|
|
+ // Register handler
|
|
|
+ dispatcher.register(TestEnum.class, new TestHandler());
|
|
|
+ dispatcher.start();
|
|
|
+
|
|
|
+ for (int i = 0; i < 3; ++i) {
|
|
|
+ Event event = mock(Event.class);
|
|
|
+ when(event.getType()).thenReturn(TestEnum.TestEventType);
|
|
|
+ dispatcher.getEventHandler().handle(event);
|
|
|
+ }
|
|
|
+
|
|
|
+ for (int i = 0; i < 2; ++i) {
|
|
|
+ Event event = mock(Event.class);
|
|
|
+ when(event.getType()).thenReturn(TestEnum.TestEventType2);
|
|
|
+ dispatcher.getEventHandler().handle(event);
|
|
|
+ }
|
|
|
+
|
|
|
+ // Check event type count.
|
|
|
+ GenericTestUtils.waitFor(() -> genericEventTypeMetrics.
|
|
|
+ get(TestEnum.TestEventType) == 3, 1000, 10000);
|
|
|
+
|
|
|
+ GenericTestUtils.waitFor(() -> genericEventTypeMetrics.
|
|
|
+ get(TestEnum.TestEventType2) == 2, 1000, 10000);
|
|
|
+
|
|
|
+ // submit actual values
|
|
|
+ Map<String, Long> expectedValues = new HashMap<>();
|
|
|
+ expectedValues.put("TestEventType_event_count",
|
|
|
+ genericEventTypeMetrics.get(TestEnum.TestEventType));
|
|
|
+ expectedValues.put("TestEventType_processing_time",
|
|
|
+ genericEventTypeMetrics.
|
|
|
+ getTotalProcessingTime(TestEnum.TestEventType));
|
|
|
+ expectedValues.put("TestEventType2_event_count",
|
|
|
+ genericEventTypeMetrics.get(TestEnum.TestEventType2));
|
|
|
+ expectedValues.put("TestEventType2_processing_time",
|
|
|
+ genericEventTypeMetrics.
|
|
|
+ getTotalProcessingTime(TestEnum.TestEventType2));
|
|
|
+ Set<String> testResults = new HashSet<>();
|
|
|
|
|
|
+ MetricsCollectorImpl collector = new MetricsCollectorImpl();
|
|
|
+ genericEventTypeMetrics.getMetrics(collector, true);
|
|
|
+
|
|
|
+ for (MetricsRecord record : collector.getRecords()) {
|
|
|
+ for (AbstractMetric metric : record.metrics()) {
|
|
|
+ String metricName = metric.name();
|
|
|
+ if (expectedValues.containsKey(metricName)) {
|
|
|
+ Long expectedValue = expectedValues.get(metricName);
|
|
|
+ Assert.assertEquals(
|
|
|
+ "Metric " + metricName + " doesn't have expected value",
|
|
|
+ expectedValue, metric.value());
|
|
|
+ testResults.add(metricName);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ Assert.assertEquals(expectedValues.keySet(), testResults);
|
|
|
+
|
|
|
+ } finally {
|
|
|
+ dispatcher.close();
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+}
|