|
@@ -18,7 +18,11 @@
|
|
|
|
|
|
package org.apache.hadoop.metrics2.impl;
|
|
package org.apache.hadoop.metrics2.impl;
|
|
|
|
|
|
-import java.util.List;
|
|
|
|
|
|
+import java.util.*;
|
|
|
|
+import java.util.concurrent.*;
|
|
|
|
+import java.util.concurrent.atomic.*;
|
|
|
|
+
|
|
|
|
+import javax.annotation.Nullable;
|
|
|
|
|
|
import org.junit.Test;
|
|
import org.junit.Test;
|
|
import org.junit.runner.RunWith;
|
|
import org.junit.runner.RunWith;
|
|
@@ -26,9 +30,11 @@ import org.junit.runner.RunWith;
|
|
import org.mockito.ArgumentCaptor;
|
|
import org.mockito.ArgumentCaptor;
|
|
import org.mockito.Captor;
|
|
import org.mockito.Captor;
|
|
import org.mockito.runners.MockitoJUnitRunner;
|
|
import org.mockito.runners.MockitoJUnitRunner;
|
|
|
|
+
|
|
import static org.junit.Assert.*;
|
|
import static org.junit.Assert.*;
|
|
import static org.mockito.Mockito.*;
|
|
import static org.mockito.Mockito.*;
|
|
|
|
|
|
|
|
+import com.google.common.base.Predicate;
|
|
import com.google.common.collect.Iterables;
|
|
import com.google.common.collect.Iterables;
|
|
|
|
|
|
import org.apache.commons.configuration.SubsetConfiguration;
|
|
import org.apache.commons.configuration.SubsetConfiguration;
|
|
@@ -36,6 +42,8 @@ import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.hadoop.metrics2.MetricsException;
|
|
import org.apache.hadoop.metrics2.MetricsException;
|
|
import static org.apache.hadoop.test.MoreAsserts.*;
|
|
import static org.apache.hadoop.test.MoreAsserts.*;
|
|
|
|
+
|
|
|
|
+import org.apache.hadoop.metrics2.AbstractMetric;
|
|
import org.apache.hadoop.metrics2.MetricsRecord;
|
|
import org.apache.hadoop.metrics2.MetricsRecord;
|
|
import org.apache.hadoop.metrics2.MetricsSink;
|
|
import org.apache.hadoop.metrics2.MetricsSink;
|
|
import org.apache.hadoop.metrics2.MetricsSource;
|
|
import org.apache.hadoop.metrics2.MetricsSource;
|
|
@@ -47,6 +55,7 @@ import org.apache.hadoop.metrics2.lib.MetricsRegistry;
|
|
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
|
|
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
|
|
import org.apache.hadoop.metrics2.lib.MutableRate;
|
|
import org.apache.hadoop.metrics2.lib.MutableRate;
|
|
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
|
|
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
|
|
|
|
+import org.apache.hadoop.util.StringUtils;
|
|
|
|
|
|
/**
|
|
/**
|
|
* Test the MetricsSystemImpl class
|
|
* Test the MetricsSystemImpl class
|
|
@@ -72,7 +81,7 @@ public class TestMetricsSystemImpl {
|
|
}
|
|
}
|
|
|
|
|
|
@Test public void testInitFirst() throws Exception {
|
|
@Test public void testInitFirst() throws Exception {
|
|
- ConfigBuilder cb = new ConfigBuilder().add("*.period", 8)
|
|
|
|
|
|
+ new ConfigBuilder().add("*.period", 8)
|
|
//.add("test.sink.plugin.urls", getPluginUrlsAsString())
|
|
//.add("test.sink.plugin.urls", getPluginUrlsAsString())
|
|
.add("test.sink.test.class", TestSink.class.getName())
|
|
.add("test.sink.test.class", TestSink.class.getName())
|
|
.add("test.*.source.filter.exclude", "s0")
|
|
.add("test.*.source.filter.exclude", "s0")
|
|
@@ -93,8 +102,9 @@ public class TestMetricsSystemImpl {
|
|
MetricsSink sink2 = mock(MetricsSink.class);
|
|
MetricsSink sink2 = mock(MetricsSink.class);
|
|
ms.registerSink("sink1", "sink1 desc", sink1);
|
|
ms.registerSink("sink1", "sink1 desc", sink1);
|
|
ms.registerSink("sink2", "sink2 desc", sink2);
|
|
ms.registerSink("sink2", "sink2 desc", sink2);
|
|
- ms.onTimerEvent(); // trigger something interesting
|
|
|
|
|
|
+ ms.publishMetricsNow(); // publish the metrics
|
|
ms.stop();
|
|
ms.stop();
|
|
|
|
+ ms.shutdown();
|
|
|
|
|
|
verify(sink1, times(2)).putMetrics(r1.capture());
|
|
verify(sink1, times(2)).putMetrics(r1.capture());
|
|
List<MetricsRecord> mr1 = r1.getAllValues();
|
|
List<MetricsRecord> mr1 = r1.getAllValues();
|
|
@@ -104,6 +114,177 @@ public class TestMetricsSystemImpl {
|
|
assertEquals("output", mr1, mr2);
|
|
assertEquals("output", mr1, mr2);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ @Test public void testMultiThreadedPublish() throws Exception {
|
|
|
|
+ new ConfigBuilder().add("*.period", 80)
|
|
|
|
+ .add("test.sink.Collector.queue.capacity", "20")
|
|
|
|
+ .save(TestMetricsConfig.getTestFilename("hadoop-metrics2-test"));
|
|
|
|
+ final MetricsSystemImpl ms = new MetricsSystemImpl("Test");
|
|
|
|
+ ms.start();
|
|
|
|
+ final int numThreads = 10;
|
|
|
|
+ final CollectingSink sink = new CollectingSink(numThreads);
|
|
|
|
+ ms.registerSink("Collector",
|
|
|
|
+ "Collector of values from all threads.", sink);
|
|
|
|
+ final TestSource[] sources = new TestSource[numThreads];
|
|
|
|
+ final Thread[] threads = new Thread[numThreads];
|
|
|
|
+ final String[] results = new String[numThreads];
|
|
|
|
+ final CyclicBarrier barrier1 = new CyclicBarrier(numThreads),
|
|
|
|
+ barrier2 = new CyclicBarrier(numThreads);
|
|
|
|
+ for (int i = 0; i < numThreads; i++) {
|
|
|
|
+ sources[i] = ms.register("threadSource" + i,
|
|
|
|
+ "A source of my threaded goodness.",
|
|
|
|
+ new TestSource("threadSourceRec" + i));
|
|
|
|
+ threads[i] = new Thread(new Runnable() {
|
|
|
|
+ private boolean safeAwait(int mySource, CyclicBarrier barrier) {
|
|
|
|
+ try {
|
|
|
|
+ barrier1.await(2, TimeUnit.SECONDS);
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
+ results[mySource] = "Interrupted";
|
|
|
|
+ return false;
|
|
|
|
+ } catch (BrokenBarrierException e) {
|
|
|
|
+ results[mySource] = "Broken Barrier";
|
|
|
|
+ return false;
|
|
|
|
+ } catch (TimeoutException e) {
|
|
|
|
+ results[mySource] = "Timed out on barrier";
|
|
|
|
+ return false;
|
|
|
|
+ }
|
|
|
|
+ return true;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public void run() {
|
|
|
|
+ int mySource = Integer.parseInt(Thread.currentThread().getName());
|
|
|
|
+ if (sink.collected[mySource].get() != 0L) {
|
|
|
|
+ results[mySource] = "Someone else collected my metric!";
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+ // Wait for all the threads to come here so we can hammer
|
|
|
|
+ // the system at the same time
|
|
|
|
+ if (!safeAwait(mySource, barrier1)) return;
|
|
|
|
+ sources[mySource].g1.set(230);
|
|
|
|
+ ms.publishMetricsNow();
|
|
|
|
+ // Since some other thread may have snatched my metric,
|
|
|
|
+ // I need to wait for the threads to finish before checking.
|
|
|
|
+ if (!safeAwait(mySource, barrier2)) return;
|
|
|
|
+ if (sink.collected[mySource].get() != 230L) {
|
|
|
|
+ results[mySource] = "Metric not collected!";
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+ results[mySource] = "Passed";
|
|
|
|
+ }
|
|
|
|
+ }, "" + i);
|
|
|
|
+ }
|
|
|
|
+ for (Thread t : threads)
|
|
|
|
+ t.start();
|
|
|
|
+ for (Thread t : threads)
|
|
|
|
+ t.join();
|
|
|
|
+ assertEquals(0L, ms.droppedPubAll.value());
|
|
|
|
+ assertTrue(StringUtils.join("\n", Arrays.asList(results)),
|
|
|
|
+ Iterables.all(Arrays.asList(results), new Predicate<String>() {
|
|
|
|
+ @Override
|
|
|
|
+ public boolean apply(@Nullable String input) {
|
|
|
|
+ return input.equalsIgnoreCase("Passed");
|
|
|
|
+ }
|
|
|
|
+ }));
|
|
|
|
+ ms.stop();
|
|
|
|
+ ms.shutdown();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private static class CollectingSink implements MetricsSink {
|
|
|
|
+ private final AtomicLong[] collected;
|
|
|
|
+
|
|
|
|
+ public CollectingSink(int capacity) {
|
|
|
|
+ collected = new AtomicLong[capacity];
|
|
|
|
+ for (int i = 0; i < capacity; i++) {
|
|
|
|
+ collected[i] = new AtomicLong();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public void init(SubsetConfiguration conf) {
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public void putMetrics(MetricsRecord record) {
|
|
|
|
+ final String prefix = "threadSourceRec";
|
|
|
|
+ if (record.name().startsWith(prefix)) {
|
|
|
|
+ final int recordNumber = Integer.parseInt(
|
|
|
|
+ record.name().substring(prefix.length()));
|
|
|
|
+ ArrayList<String> names = new ArrayList<String>();
|
|
|
|
+ for (AbstractMetric m : record.metrics()) {
|
|
|
|
+ if (m.name().equalsIgnoreCase("g1")) {
|
|
|
|
+ collected[recordNumber].set(m.value().longValue());
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+ names.add(m.name());
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public void flush() {
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Test public void testHangingSink() {
|
|
|
|
+ new ConfigBuilder().add("*.period", 8)
|
|
|
|
+ .add("test.sink.test.class", TestSink.class.getName())
|
|
|
|
+ .add("test.sink.hanging.retry.delay", "1")
|
|
|
|
+ .add("test.sink.hanging.retry.backoff", "1.01")
|
|
|
|
+ .add("test.sink.hanging.retry.count", "0")
|
|
|
|
+ .save(TestMetricsConfig.getTestFilename("hadoop-metrics2-test"));
|
|
|
|
+ MetricsSystemImpl ms = new MetricsSystemImpl("Test");
|
|
|
|
+ ms.start();
|
|
|
|
+ TestSource s = ms.register("s3", "s3 desc", new TestSource("s3rec"));
|
|
|
|
+ s.c1.incr();
|
|
|
|
+ HangingSink hanging = new HangingSink();
|
|
|
|
+ ms.registerSink("hanging", "Hang the sink!", hanging);
|
|
|
|
+ ms.publishMetricsNow();
|
|
|
|
+ assertEquals(1L, ms.droppedPubAll.value());
|
|
|
|
+ assertFalse(hanging.getInterrupted());
|
|
|
|
+ ms.stop();
|
|
|
|
+ ms.shutdown();
|
|
|
|
+ assertTrue(hanging.getInterrupted());
|
|
|
|
+ assertTrue("The sink didn't get called after its first hang " +
|
|
|
|
+ "for subsequent records.", hanging.getGotCalledSecondTime());
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private static class HangingSink implements MetricsSink {
|
|
|
|
+ private volatile boolean interrupted;
|
|
|
|
+ private boolean gotCalledSecondTime;
|
|
|
|
+ private boolean firstTime = true;
|
|
|
|
+
|
|
|
|
+ public boolean getGotCalledSecondTime() {
|
|
|
|
+ return gotCalledSecondTime;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public boolean getInterrupted() {
|
|
|
|
+ return interrupted;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public void init(SubsetConfiguration conf) {
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public void putMetrics(MetricsRecord record) {
|
|
|
|
+ // No need to hang every time, just the first record.
|
|
|
|
+ if (!firstTime) {
|
|
|
|
+ gotCalledSecondTime = true;
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+ firstTime = false;
|
|
|
|
+ try {
|
|
|
|
+ Thread.sleep(10 * 1000);
|
|
|
|
+ } catch (InterruptedException ex) {
|
|
|
|
+ interrupted = true;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public void flush() {
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
@Test public void testRegisterDups() {
|
|
@Test public void testRegisterDups() {
|
|
MetricsSystem ms = new MetricsSystemImpl();
|
|
MetricsSystem ms = new MetricsSystemImpl();
|
|
TestSource ts1 = new TestSource("ts1");
|
|
TestSource ts1 = new TestSource("ts1");
|
|
@@ -116,6 +297,7 @@ public class TestMetricsSystemImpl {
|
|
MetricsSource s2 = ms.getSource("ts1");
|
|
MetricsSource s2 = ms.getSource("ts1");
|
|
assertNotNull(s2);
|
|
assertNotNull(s2);
|
|
assertNotSame(s1, s2);
|
|
assertNotSame(s1, s2);
|
|
|
|
+ ms.shutdown();
|
|
}
|
|
}
|
|
|
|
|
|
@Test(expected=MetricsException.class) public void testRegisterDupError() {
|
|
@Test(expected=MetricsException.class) public void testRegisterDupError() {
|