|
@@ -19,9 +19,7 @@
|
|
package org.apache.hadoop.metrics2.lib;
|
|
package org.apache.hadoop.metrics2.lib;
|
|
|
|
|
|
import static org.apache.hadoop.metrics2.lib.Interns.info;
|
|
import static org.apache.hadoop.metrics2.lib.Interns.info;
|
|
-import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
|
|
|
|
-import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
|
|
|
|
-import static org.apache.hadoop.test.MetricsAsserts.mockMetricsRecordBuilder;
|
|
|
|
|
|
+import static org.apache.hadoop.test.MetricsAsserts.*;
|
|
import static org.mockito.AdditionalMatchers.eq;
|
|
import static org.mockito.AdditionalMatchers.eq;
|
|
import static org.mockito.AdditionalMatchers.geq;
|
|
import static org.mockito.AdditionalMatchers.geq;
|
|
import static org.mockito.AdditionalMatchers.leq;
|
|
import static org.mockito.AdditionalMatchers.leq;
|
|
@@ -29,10 +27,15 @@ import static org.mockito.Matchers.anyLong;
|
|
import static org.mockito.Matchers.eq;
|
|
import static org.mockito.Matchers.eq;
|
|
import static org.mockito.Mockito.times;
|
|
import static org.mockito.Mockito.times;
|
|
import static org.mockito.Mockito.verify;
|
|
import static org.mockito.Mockito.verify;
|
|
|
|
+import static org.junit.Assert.*;
|
|
|
|
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
import java.util.Map.Entry;
|
|
import java.util.Map.Entry;
|
|
|
|
+import java.util.Random;
|
|
|
|
+import java.util.concurrent.CountDownLatch;
|
|
|
|
|
|
|
|
+import org.apache.commons.logging.Log;
|
|
|
|
+import org.apache.commons.logging.LogFactory;
|
|
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
|
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
|
import org.apache.hadoop.metrics2.util.Quantile;
|
|
import org.apache.hadoop.metrics2.util.Quantile;
|
|
import org.junit.Test;
|
|
import org.junit.Test;
|
|
@@ -42,6 +45,7 @@ import org.junit.Test;
|
|
*/
|
|
*/
|
|
public class TestMutableMetrics {
|
|
public class TestMutableMetrics {
|
|
|
|
|
|
|
|
+ private static final Log LOG = LogFactory.getLog(TestMutableMetrics.class);
|
|
private final double EPSILON = 1e-42;
|
|
private final double EPSILON = 1e-42;
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -129,6 +133,144 @@ public class TestMutableMetrics {
|
|
assertGauge("BarAvgTime", 0.0, rb);
|
|
assertGauge("BarAvgTime", 0.0, rb);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ @Test public void testMutableRatesWithAggregationInit() {
|
|
|
|
+ MetricsRecordBuilder rb = mockMetricsRecordBuilder();
|
|
|
|
+ MutableRatesWithAggregation rates = new MutableRatesWithAggregation();
|
|
|
|
+
|
|
|
|
+ rates.init(TestProtocol.class);
|
|
|
|
+ rates.snapshot(rb, false);
|
|
|
|
+
|
|
|
|
+ assertCounter("FooNumOps", 0L, rb);
|
|
|
|
+ assertGauge("FooAvgTime", 0.0, rb);
|
|
|
|
+ assertCounter("BarNumOps", 0L, rb);
|
|
|
|
+ assertGauge("BarAvgTime", 0.0, rb);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Test public void testMutableRatesWithAggregationSingleThread() {
|
|
|
|
+ MutableRatesWithAggregation rates = new MutableRatesWithAggregation();
|
|
|
|
+
|
|
|
|
+ rates.add("foo", 1);
|
|
|
|
+ rates.add("bar", 5);
|
|
|
|
+
|
|
|
|
+ MetricsRecordBuilder rb = mockMetricsRecordBuilder();
|
|
|
|
+ rates.snapshot(rb, false);
|
|
|
|
+ assertCounter("FooNumOps", 1L, rb);
|
|
|
|
+ assertGauge("FooAvgTime", 1.0, rb);
|
|
|
|
+ assertCounter("BarNumOps", 1L, rb);
|
|
|
|
+ assertGauge("BarAvgTime", 5.0, rb);
|
|
|
|
+
|
|
|
|
+ rates.add("foo", 1);
|
|
|
|
+ rates.add("foo", 3);
|
|
|
|
+ rates.add("bar", 6);
|
|
|
|
+
|
|
|
|
+ rb = mockMetricsRecordBuilder();
|
|
|
|
+ rates.snapshot(rb, false);
|
|
|
|
+ assertCounter("FooNumOps", 3L, rb);
|
|
|
|
+ assertGauge("FooAvgTime", 2.0, rb);
|
|
|
|
+ assertCounter("BarNumOps", 2L, rb);
|
|
|
|
+ assertGauge("BarAvgTime", 6.0, rb);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Test public void testMutableRatesWithAggregationManyThreads()
|
|
|
|
+ throws InterruptedException {
|
|
|
|
+ final MutableRatesWithAggregation rates = new MutableRatesWithAggregation();
|
|
|
|
+
|
|
|
|
+ final int n = 10;
|
|
|
|
+ long[] opCount = new long[n];
|
|
|
|
+ double[] opTotalTime = new double[n];
|
|
|
|
+
|
|
|
|
+ for (int i = 0; i < n; i++) {
|
|
|
|
+ opCount[i] = 0;
|
|
|
|
+ opTotalTime[i] = 0;
|
|
|
|
+ // Initialize so that the getLongCounter() method doesn't complain
|
|
|
|
+ rates.add("metric" + i, 0);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ Thread[] threads = new Thread[n];
|
|
|
|
+ final CountDownLatch firstAddsFinished = new CountDownLatch(threads.length);
|
|
|
|
+ final CountDownLatch firstSnapshotsFinished = new CountDownLatch(1);
|
|
|
|
+ final CountDownLatch secondAddsFinished =
|
|
|
|
+ new CountDownLatch(threads.length);
|
|
|
|
+ final CountDownLatch secondSnapshotsFinished = new CountDownLatch(1);
|
|
|
|
+ long seed = new Random().nextLong();
|
|
|
|
+ LOG.info("Random seed = " + seed);
|
|
|
|
+ final Random sleepRandom = new Random(seed);
|
|
|
|
+ for (int tIdx = 0; tIdx < threads.length; tIdx++) {
|
|
|
|
+ final int threadIdx = tIdx;
|
|
|
|
+ threads[threadIdx] = new Thread() {
|
|
|
|
+ @Override
|
|
|
|
+ public void run() {
|
|
|
|
+ try {
|
|
|
|
+ for (int i = 0; i < 1000; i++) {
|
|
|
|
+ rates.add("metric" + (i % n), (i / n) % 2 == 0 ? 1 : 2);
|
|
|
|
+ // Sleep so additions can be interleaved with snapshots
|
|
|
|
+ Thread.sleep(sleepRandom.nextInt(5));
|
|
|
|
+ }
|
|
|
|
+ firstAddsFinished.countDown();
|
|
|
|
+
|
|
|
|
+ // Make sure all threads stay alive long enough for the first
|
|
|
|
+ // snapshot to complete; else their metrics may be lost to GC
|
|
|
|
+ firstSnapshotsFinished.await();
|
|
|
|
+
|
|
|
|
+ // Let half the threads continue with more metrics and let half die
|
|
|
|
+ if (threadIdx % 2 == 0) {
|
|
|
|
+ for (int i = 0; i < 1000; i++) {
|
|
|
|
+ rates.add("metric" + (i % n), (i / n) % 2 == 0 ? 1 : 2);
|
|
|
|
+ }
|
|
|
|
+ secondAddsFinished.countDown();
|
|
|
|
+ secondSnapshotsFinished.await();
|
|
|
|
+ } else {
|
|
|
|
+ secondAddsFinished.countDown();
|
|
|
|
+ }
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
+ // Ignore
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ };
|
|
|
|
+ }
|
|
|
|
+ for (Thread t : threads) {
|
|
|
|
+ t.start();
|
|
|
|
+ }
|
|
|
|
+ // Snapshot concurrently with additions but aggregate the totals into
|
|
|
|
+ // opCount / opTotalTime
|
|
|
|
+ for (int i = 0; i < 100; i++) {
|
|
|
|
+ snapshotMutableRatesWithAggregation(rates, opCount, opTotalTime);
|
|
|
|
+ Thread.sleep(sleepRandom.nextInt(20));
|
|
|
|
+ }
|
|
|
|
+ firstAddsFinished.await();
|
|
|
|
+ // Final snapshot to grab any remaining metrics and then verify that
|
|
|
|
+ // the totals are as expected
|
|
|
|
+ snapshotMutableRatesWithAggregation(rates, opCount, opTotalTime);
|
|
|
|
+ for (int i = 0; i < n; i++) {
|
|
|
|
+ assertEquals("metric" + i + " count", 1001, opCount[i]);
|
|
|
|
+ assertEquals("metric" + i + " total", 1500, opTotalTime[i], 1.0);
|
|
|
|
+ }
|
|
|
|
+ firstSnapshotsFinished.countDown();
|
|
|
|
+
|
|
|
|
+ // After half of the threads die, ensure that the remaining ones still
|
|
|
|
+ // add metrics correctly and that snapshot occurs correctly
|
|
|
|
+ secondAddsFinished.await();
|
|
|
|
+ snapshotMutableRatesWithAggregation(rates, opCount, opTotalTime);
|
|
|
|
+ for (int i = 0; i < n; i++) {
|
|
|
|
+ assertEquals("metric" + i + " count", 1501, opCount[i]);
|
|
|
|
+ assertEquals("metric" + i + " total", 2250, opTotalTime[i], 1.0);
|
|
|
|
+ }
|
|
|
|
+ secondSnapshotsFinished.countDown();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private static void snapshotMutableRatesWithAggregation(
|
|
|
|
+ MutableRatesWithAggregation rates, long[] opCount, double[] opTotalTime) {
|
|
|
|
+ MetricsRecordBuilder rb = mockMetricsRecordBuilder();
|
|
|
|
+ rates.snapshot(rb, true);
|
|
|
|
+ for (int i = 0; i < opCount.length; i++) {
|
|
|
|
+ long prevOpCount = opCount[i];
|
|
|
|
+ long newOpCount = getLongCounter("Metric" + i + "NumOps", rb);
|
|
|
|
+ opCount[i] = newOpCount;
|
|
|
|
+ double avgTime = getDoubleGauge("Metric" + i + "AvgTime", rb);
|
|
|
|
+ opTotalTime[i] += avgTime * (newOpCount - prevOpCount);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Tests that when using {@link MutableStat#add(long, long)}, even with a high
|
|
* Tests that when using {@link MutableStat#add(long, long)}, even with a high
|
|
* sample count, the mean does not lose accuracy.
|
|
* sample count, the mean does not lose accuracy.
|