Browse Source

HADOOP-8541. Better high-percentile latency metrics. Contributed by Andrew Wang.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1360501 13f79535-47bb-0310-9956-ffa450edef68
Aaron Myers 13 years ago
parent
commit
b5b0ac64a1

+ 2 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -252,6 +252,8 @@ Branch-2 ( Unreleased changes )
     EOFException on Snappy or LZO block-compressed data
     (todd via harsh)
 
+    HADOOP-8541. Better high-percentile latency metrics. (Andrew Wang via atm)
+
   BUG FIXES
 
     HADOOP-8372. NetUtils.normalizeHostName() incorrectly handles hostname

+ 9 - 0
hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml

@@ -295,4 +295,13 @@
       <Class name="~org\.apache\.hadoop\.ha\.proto\.ZKFCProtocolProtos.*"/>
     </Match>
 
+    <!--
+       Manually checked, misses child thread manually syncing on parent's intrinsic lock.
+    -->
+     <Match>
+       <Class name="org.apache.hadoop.metrics2.lib.MutableQuantiles" />
+       <Field name="previousSnapshot" />
+       <Bug pattern="IS2_INCONSISTENT_SYNC" />
+     </Match>
+
  </FindBugsFilter>

+ 18 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MetricsRegistry.java

@@ -180,6 +180,24 @@ public class MetricsRegistry {
     return ret;
   }
 
+  /**
+   * Create a mutable metric that estimates quantiles of a stream of values
+   * @param name of the metric
+   * @param desc metric description
+   * @param sampleName of the metric (e.g., "Ops")
+   * @param valueName of the metric (e.g., "Time" or "Latency")
+   * @param interval rollover interval of estimator in seconds
+   * @return a new quantile estimator object
+   */
+  public synchronized MutableQuantiles newQuantiles(String name, String desc,
+      String sampleName, String valueName, int interval) {
+    checkMetricName(name);
+    MutableQuantiles ret = 
+        new MutableQuantiles(name, desc, sampleName, valueName, interval);
+    metricsMap.put(name, ret);
+    return ret;
+  }
+  
   /**
    * Create a mutable metric with stats
    * @param name  of the metric

+ 165 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableQuantiles.java

@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.lib;
+
+import static org.apache.hadoop.metrics2.lib.Interns.info;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.util.Quantile;
+import org.apache.hadoop.metrics2.util.SampleQuantiles;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Watches a stream of long values, maintaining online estimates of specific
+ * quantiles with provably low error bounds. This is particularly useful for
+ * accurate high-percentile (e.g. 95th, 99th) latency metrics.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class MutableQuantiles extends MutableMetric {
+
+  static final Quantile[] quantiles = { new Quantile(0.50, 0.050),
+      new Quantile(0.75, 0.025), new Quantile(0.90, 0.010),
+      new Quantile(0.95, 0.005), new Quantile(0.99, 0.001) };
+
+  private final MetricsInfo numInfo;
+  private final MetricsInfo[] quantileInfos;
+  private final int interval;
+
+  private SampleQuantiles estimator;
+  private long previousCount = 0;
+
+  @VisibleForTesting
+  protected Map<Quantile, Long> previousSnapshot = null;
+
+  private final ScheduledExecutorService scheduler = Executors
+      .newScheduledThreadPool(1);
+
+  /**
+   * Instantiates a new {@link MutableQuantiles} for a metric that rolls itself
+   * over on the specified time interval.
+   * 
+   * @param name
+   *          of the metric
+   * @param description
+   *          long-form textual description of the metric
+   * @param sampleName
+   *          type of items in the stream (e.g., "Ops")
+   * @param valueName
+   *          type of the values
+   * @param interval
+   *          rollover interval (in seconds) of the estimator
+   */
+  public MutableQuantiles(String name, String description, String sampleName,
+      String valueName, int interval) {
+    String ucName = StringUtils.capitalize(name);
+    String usName = StringUtils.capitalize(sampleName);
+    String uvName = StringUtils.capitalize(valueName);
+    String desc = StringUtils.uncapitalize(description);
+    String lsName = StringUtils.uncapitalize(sampleName);
+    String lvName = StringUtils.uncapitalize(valueName);
+
+    numInfo = info(ucName + "Num" + usName, String.format(
+        "Number of %s for %s with %ds interval", lsName, desc, interval));
+    // Construct the MetricsInfos for the quantiles, converting to percentiles
+    quantileInfos = new MetricsInfo[quantiles.length];
+    String nameTemplate = ucName + "%dthPercentile" + interval + "sInterval"
+        + uvName;
+    String descTemplate = "%d percentile " + lvName + " with " + interval
+        + " second interval for " + desc;
+    for (int i = 0; i < quantiles.length; i++) {
+      int percentile = (int) (100 * quantiles[i].quantile);
+      quantileInfos[i] = info(String.format(nameTemplate, percentile),
+          String.format(descTemplate, percentile));
+    }
+
+    estimator = new SampleQuantiles(quantiles);
+
+    this.interval = interval;
+    scheduler.scheduleAtFixedRate(new RolloverSample(this), interval, interval,
+        TimeUnit.SECONDS);
+  }
+
+  @Override
+  public synchronized void snapshot(MetricsRecordBuilder builder, boolean all) {
+    if (all || changed()) {
+      builder.addGauge(numInfo, previousCount);
+      for (int i = 0; i < quantiles.length; i++) {
+        long newValue = 0;
+        // If snapshot is null, we failed to update since the window was empty
+        if (previousSnapshot != null) {
+          newValue = previousSnapshot.get(quantiles[i]);
+        }
+        builder.addGauge(quantileInfos[i], newValue);
+      }
+      if (changed()) {
+        clearChanged();
+      }
+    }
+  }
+
+  public synchronized void add(long value) {
+    estimator.insert(value);
+  }
+
+  public int getInterval() {
+    return interval;
+  }
+
+  /**
+   * Runnable used to periodically roll over the internal
+   * {@link SampleQuantiles} every interval.
+   */
+  private static class RolloverSample implements Runnable {
+
+    MutableQuantiles parent;
+
+    public RolloverSample(MutableQuantiles parent) {
+      this.parent = parent;
+    }
+
+    @Override
+    public void run() {
+      synchronized (parent) {
+        try {
+          parent.previousCount = parent.estimator.getCount();
+          parent.previousSnapshot = parent.estimator.snapshot();
+        } catch (IOException e) {
+          // Couldn't get a new snapshot because the window was empty
+          parent.previousCount = 0;
+          parent.previousSnapshot = null;
+        }
+        parent.estimator.clear();
+      }
+      parent.setChanged();
+    }
+
+  }
+}

+ 60 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/util/Quantile.java

@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.util;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Specifies a quantile (with error bounds) to be watched by a
+ * {@link SampleQuantiles} object.
+ */
+@InterfaceAudience.Private
+public class Quantile {
+  public final double quantile;
+  public final double error;
+
+  public Quantile(double quantile, double error) {
+    this.quantile = quantile;
+    this.error = error;
+  }
+
+  @Override
+  public boolean equals(Object aThat) {
+    if (this == aThat) {
+      return true;
+    }
+    if (!(aThat instanceof Quantile)) {
+      return false;
+    }
+
+    Quantile that = (Quantile) aThat;
+
+    long qbits = Double.doubleToLongBits(quantile);
+    long ebits = Double.doubleToLongBits(error);
+
+    return qbits == Double.doubleToLongBits(that.quantile)
+        && ebits == Double.doubleToLongBits(that.error);
+  }
+
+  @Override
+  public int hashCode() {
+    return (int) (Double.doubleToLongBits(quantile) ^ Double
+        .doubleToLongBits(error));
+  }
+}

+ 310 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/util/SampleQuantiles.java

@@ -0,0 +1,310 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.util;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.ListIterator;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Implementation of the Cormode, Korn, Muthukrishnan, and Srivastava algorithm
+ * for streaming calculation of targeted high-percentile epsilon-approximate
+ * quantiles.
+ * 
+ * This is a generalization of the earlier work by Greenwald and Khanna (GK),
+ * which essentially allows different error bounds on the targeted quantiles,
+ * which allows for far more efficient calculation of high-percentiles.
+ * 
+ * See: Cormode, Korn, Muthukrishnan, and Srivastava
+ * "Effective Computation of Biased Quantiles over Data Streams" in ICDE 2005
+ * 
+ * Greenwald and Khanna,
+ * "Space-efficient online computation of quantile summaries" in SIGMOD 2001
+ * 
+ */
+@InterfaceAudience.Private
+public class SampleQuantiles {
+
+  /**
+   * Total number of items in stream
+   */
+  private long count = 0;
+
+  /**
+   * Current list of sampled items, maintained in sorted order with error bounds
+   */
+  private LinkedList<SampleItem> samples;
+
+  /**
+   * Buffers incoming items to be inserted in batch. Items are inserted into 
+   * the buffer linearly. When the buffer fills, it is flushed into the samples
+   * array in its entirety.
+   */
+  private long[] buffer = new long[500];
+  private int bufferCount = 0;
+
+  /**
+   * Array of Quantiles that we care about, along with desired error.
+   */
+  private final Quantile quantiles[];
+
+  public SampleQuantiles(Quantile[] quantiles) {
+    this.quantiles = quantiles;
+    this.samples = new LinkedList<SampleItem>();
+  }
+
+  /**
+   * Specifies the allowable error for this rank, depending on which quantiles
+   * are being targeted.
+   * 
+   * This is the f(r_i, n) function from the CKMS paper. It's basically how wide
+   * the range of this rank can be.
+   * 
+   * @param rank
+   *          the index in the list of samples
+   */
+  private double allowableError(int rank) {
+    int size = samples.size();
+    double minError = size + 1;
+    for (Quantile q : quantiles) {
+      double error;
+      if (rank <= q.quantile * size) {
+        error = (2.0 * q.error * (size - rank)) / (1.0 - q.quantile);
+      } else {
+        error = (2.0 * q.error * rank) / q.quantile;
+      }
+      if (error < minError) {
+        minError = error;
+      }
+    }
+
+    return minError;
+  }
+
+  /**
+   * Add a new value from the stream.
+   * 
+   * @param v
+   */
+  synchronized public void insert(long v) {
+    buffer[bufferCount] = v;
+    bufferCount++;
+
+    count++;
+
+    if (bufferCount == buffer.length) {
+      insertBatch();
+      compress();
+    }
+  }
+
+  /**
+   * Merges items from buffer into the samples array in one pass.
+   * This is more efficient than doing an insert on every item.
+   */
+  private void insertBatch() {
+    if (bufferCount == 0) {
+      return;
+    }
+
+    Arrays.sort(buffer, 0, bufferCount);
+
+    // Base case: no samples
+    int start = 0;
+    if (samples.size() == 0) {
+      SampleItem newItem = new SampleItem(buffer[0], 1, 0);
+      samples.add(newItem);
+      start++;
+    }
+
+    ListIterator<SampleItem> it = samples.listIterator();
+    SampleItem item = it.next();
+    for (int i = start; i < bufferCount; i++) {
+      long v = buffer[i];
+      while (it.nextIndex() < samples.size() && item.value < v) {
+        item = it.next();
+      }
+      // If we found that bigger item, back up so we insert ourselves before it
+      if (item.value > v) {
+        it.previous();
+      }
+      // We use different indexes for the edge comparisons, because of the above
+      // if statement that adjusts the iterator
+      int delta;
+      if (it.previousIndex() == 0 || it.nextIndex() == samples.size()) {
+        delta = 0;
+      } else {
+        delta = ((int) Math.floor(allowableError(it.nextIndex()))) - 1;
+      }
+      SampleItem newItem = new SampleItem(v, 1, delta);
+      it.add(newItem);
+      item = newItem;
+    }
+
+    bufferCount = 0;
+  }
+
+  /**
+   * Try to remove extraneous items from the set of sampled items. This checks
+   * if an item is unnecessary based on the desired error bounds, and merges it
+   * with the adjacent item if it is.
+   */
+  private void compress() {
+    if (samples.size() < 2) {
+      return;
+    }
+
+    ListIterator<SampleItem> it = samples.listIterator();
+    SampleItem prev = null;
+    SampleItem next = it.next();
+
+    while (it.hasNext()) {
+      prev = next;
+      next = it.next();
+      if (prev.g + next.g + next.delta <= allowableError(it.previousIndex())) {
+        next.g += prev.g;
+        // Remove prev. it.remove() kills the last thing returned.
+        it.previous();
+        it.previous();
+        it.remove();
+        // it.next() is now equal to next, skip it back forward again
+        it.next();
+      }
+    }
+  }
+
+  /**
+   * Get the estimated value at the specified quantile.
+   * 
+   * @param quantile Queried quantile, e.g. 0.50 or 0.99.
+   * @return Estimated value at that quantile.
+   */
+  private long query(double quantile) throws IOException {
+    if (samples.size() == 0) {
+      throw new IOException("No samples present");
+    }
+
+    int rankMin = 0;
+    int desired = (int) (quantile * count);
+
+    for (int i = 1; i < samples.size(); i++) {
+      SampleItem prev = samples.get(i - 1);
+      SampleItem cur = samples.get(i);
+
+      rankMin += prev.g;
+
+      if (rankMin + cur.g + cur.delta > desired + (allowableError(i) / 2)) {
+        return prev.value;
+      }
+    }
+
+    // edge case of wanting max value
+    return samples.get(samples.size() - 1).value;
+  }
+
+  /**
+   * Get a snapshot of the current values of all the tracked quantiles.
+   * 
+   * @return snapshot of the tracked quantiles
+   * @throws IOException
+   *           if no items have been added to the estimator
+   */
+  synchronized public Map<Quantile, Long> snapshot() throws IOException {
+    // flush the buffer first for best results
+    insertBatch();
+    Map<Quantile, Long> values = new HashMap<Quantile, Long>(quantiles.length);
+    for (int i = 0; i < quantiles.length; i++) {
+      values.put(quantiles[i], query(quantiles[i].quantile));
+    }
+
+    return values;
+  }
+
+  /**
+   * Returns the number of items that the estimator has processed
+   * 
+   * @return count total number of items processed
+   */
+  synchronized public long getCount() {
+    return count;
+  }
+
+  /**
+   * Returns the number of samples kept by the estimator
+   * 
+   * @return count current number of samples
+   */
+  @VisibleForTesting
+  synchronized public int getSampleCount() {
+    return samples.size();
+  }
+
+  /**
+   * Resets the estimator, clearing out all previously inserted items
+   */
+  synchronized public void clear() {
+    count = 0;
+    bufferCount = 0;
+    samples.clear();
+  }
+
+  /**
+   * Describes a measured value passed to the estimator, tracking additional
+   * metadata required by the CKMS algorithm.
+   */
+  private static class SampleItem {
+    
+    /**
+     * Value of the sampled item (e.g. a measured latency value)
+     */
+    public final long value;
+    
+    /**
+     * Difference between the lowest possible rank of the previous item, and 
+     * the lowest possible rank of this item.
+     * 
+     * The sum of the g of all previous items yields this item's lower bound. 
+     */
+    public int g;
+    
+    /**
+     * Difference between the item's greatest possible rank and lowest possible
+     * rank.
+     */
+    public final int delta;
+
+    public SampleItem(long value, int lowerDelta, int delta) {
+      this.value = value;
+      this.g = lowerDelta;
+      this.delta = delta;
+    }
+
+    @Override
+    public String toString() {
+      return String.format("%d, %d, %d", value, g, delta);
+    }
+  }
+}

+ 135 - 5
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestMutableMetrics.java

@@ -18,13 +18,24 @@
 
 package org.apache.hadoop.metrics2.lib;
 
-import org.junit.Test;
-import static org.mockito.Mockito.*;
-import static org.mockito.AdditionalMatchers.*;
+import static org.apache.hadoop.metrics2.lib.Interns.info;
+import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
+import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
+import static org.apache.hadoop.test.MetricsAsserts.mockMetricsRecordBuilder;
+import static org.mockito.AdditionalMatchers.eq;
+import static org.mockito.AdditionalMatchers.geq;
+import static org.mockito.AdditionalMatchers.leq;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.Map;
+import java.util.Map.Entry;
 
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
-import static org.apache.hadoop.metrics2.lib.Interns.*;
-import static org.apache.hadoop.test.MetricsAsserts.*;
+import org.apache.hadoop.metrics2.util.Quantile;
+import org.junit.Test;
 
 /**
  * Test metrics record builder interface and mutable metrics
@@ -103,4 +114,123 @@ public class TestMutableMetrics {
     assertCounter("BarNumOps", 0L, rb);
     assertGauge("BarAvgTime", 0.0, rb);
   }
+
+  /**
+   * Ensure that quantile estimates from {@link MutableQuantiles} are within
+   * specified error bounds.
+   */
+  @Test(timeout = 30000)
+  public void testMutableQuantilesError() throws Exception {
+    MetricsRecordBuilder mb = mockMetricsRecordBuilder();
+    MetricsRegistry registry = new MetricsRegistry("test");
+    // Use a 5s rollover period
+    MutableQuantiles quantiles = registry.newQuantiles("foo", "stat", "Ops",
+        "Latency", 5);
+    // Push some values in and wait for it to publish
+    long start = System.nanoTime() / 1000000;
+    for (long i = 1; i <= 1000; i++) {
+      quantiles.add(i);
+      quantiles.add(1001 - i);
+    }
+    long end = System.nanoTime() / 1000000;
+
+    Thread.sleep(6000 - (end - start));
+
+    registry.snapshot(mb, false);
+
+    // Print out the snapshot
+    Map<Quantile, Long> previousSnapshot = quantiles.previousSnapshot;
+    for (Entry<Quantile, Long> item : previousSnapshot.entrySet()) {
+      System.out.println(String.format("Quantile %.2f has value %d",
+          item.getKey().quantile, item.getValue()));
+    }
+
+    // Verify the results are within our requirements
+    verify(mb).addGauge(
+        info("FooNumOps", "Number of ops for stat with 5s interval"),
+        (long) 2000);
+    Quantile[] quants = MutableQuantiles.quantiles;
+    String name = "Foo%dthPercentile5sIntervalLatency";
+    String desc = "%d percentile latency with 5 second interval for stat";
+    for (Quantile q : quants) {
+      int percentile = (int) (100 * q.quantile);
+      int error = (int) (1000 * q.error);
+      String n = String.format(name, percentile);
+      String d = String.format(desc, percentile);
+      long expected = (long) (q.quantile * 1000);
+      verify(mb).addGauge(eq(info(n, d)), leq(expected + error));
+      verify(mb).addGauge(eq(info(n, d)), geq(expected - error));
+    }
+  }
+
+  /**
+   * Test that {@link MutableQuantiles} rolls the window over at the specified
+   * interval.
+   */
+  @Test(timeout = 30000)
+  public void testMutableQuantilesRollover() throws Exception {
+    MetricsRecordBuilder mb = mockMetricsRecordBuilder();
+    MetricsRegistry registry = new MetricsRegistry("test");
+    // Use a 5s rollover period
+    MutableQuantiles quantiles = registry.newQuantiles("foo", "stat", "Ops",
+        "Latency", 5);
+
+    Quantile[] quants = MutableQuantiles.quantiles;
+    String name = "Foo%dthPercentile5sIntervalLatency";
+    String desc = "%d percentile latency with 5 second interval for stat";
+
+    // Push values for three intervals
+    long start = System.nanoTime() / 1000000;
+    for (int i = 1; i <= 3; i++) {
+      // Insert the values
+      for (long j = 1; j <= 1000; j++) {
+        quantiles.add(i);
+      }
+      // Sleep until 1s after the next 5s interval, to let the metrics
+      // roll over
+      long sleep = (start + (5000 * i) + 1000) - (System.nanoTime() / 1000000);
+      Thread.sleep(sleep);
+      // Verify that the window reset, check it has the values we pushed in
+      registry.snapshot(mb, false);
+      for (Quantile q : quants) {
+        int percentile = (int) (100 * q.quantile);
+        String n = String.format(name, percentile);
+        String d = String.format(desc, percentile);
+        verify(mb).addGauge(info(n, d), (long) i);
+      }
+    }
+
+    // Verify the metrics were added the right number of times
+    verify(mb, times(3)).addGauge(
+        info("FooNumOps", "Number of ops for stat with 5s interval"),
+        (long) 1000);
+    for (Quantile q : quants) {
+      int percentile = (int) (100 * q.quantile);
+      String n = String.format(name, percentile);
+      String d = String.format(desc, percentile);
+      verify(mb, times(3)).addGauge(eq(info(n, d)), anyLong());
+    }
+  }
+
+  /**
+   * Test that {@link MutableQuantiles} rolls over correctly even if no items
+   * have been added to the window
+   */
+  @Test(timeout = 30000)
+  public void testMutableQuantilesEmptyRollover() throws Exception {
+    MetricsRecordBuilder mb = mockMetricsRecordBuilder();
+    MetricsRegistry registry = new MetricsRegistry("test");
+    // Use a 5s rollover period
+    MutableQuantiles quantiles = registry.newQuantiles("foo", "stat", "Ops",
+        "Latency", 5);
+
+    // Check it initially
+    quantiles.snapshot(mb, true);
+    verify(mb).addGauge(
+        info("FooNumOps", "Number of ops for stat with 5s interval"), (long) 0);
+    Thread.sleep(6000);
+    quantiles.snapshot(mb, false);
+    verify(mb, times(2)).addGauge(
+        info("FooNumOps", "Number of ops for stat with 5s interval"), (long) 0);
+  }
 }

+ 125 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/util/TestSampleQuantiles.java

@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestSampleQuantiles {
+
+  static final Quantile[] quantiles = { new Quantile(0.50, 0.050),
+      new Quantile(0.75, 0.025), new Quantile(0.90, 0.010),
+      new Quantile(0.95, 0.005), new Quantile(0.99, 0.001) };
+
+  SampleQuantiles estimator;
+
+  @Before
+  public void init() {
+    estimator = new SampleQuantiles(quantiles);
+  }
+
+  /**
+   * Check that the counts of the number of items in the window and sample are
+   * incremented correctly as items are added.
+   */
+  @Test
+  public void testCount() throws IOException {
+    // Counts start off zero
+    assertEquals(estimator.getCount(), 0);
+    assertEquals(estimator.getSampleCount(), 0);
+    try {
+      estimator.snapshot();
+      fail("Expected IOException from empty window");
+    } catch (IOException e) {
+      GenericTestUtils.assertExceptionContains("No samples", e);
+    }
+
+    // Count increment correctly by 1
+    estimator.insert(1337);
+    assertEquals(estimator.getCount(), 1);
+    estimator.snapshot();
+    assertEquals(estimator.getSampleCount(), 1);
+  }
+
+  /**
+   * Check that counts and quantile estimates are correctly reset after a call
+   * to {@link SampleQuantiles#clear()}.
+   */
+  @Test
+  public void testClear() throws IOException {
+    for (int i = 0; i < 1000; i++) {
+      estimator.insert(i);
+    }
+    estimator.clear();
+    assertEquals(estimator.getCount(), 0);
+    assertEquals(estimator.getSampleCount(), 0);
+    try {
+      estimator.snapshot();
+      fail("Expected IOException for an empty window.");
+    } catch (IOException e) {
+      GenericTestUtils.assertExceptionContains("No samples", e);
+    }
+  }
+
+  /**
+   * Correctness test that checks that absolute error of the estimate is within
+   * specified error bounds for some randomly permuted streams of items.
+   */
+  @Test
+  public void testQuantileError() throws IOException {
+    final int count = 100000;
+    Random r = new Random(0xDEADDEAD);
+    Long[] values = new Long[count];
+    for (int i = 0; i < count; i++) {
+      values[i] = (long) (i + 1);
+    }
+    // Do 10 shuffle/insert/check cycles
+    for (int i = 0; i < 10; i++) {
+      System.out.println("Starting run " + i);
+      Collections.shuffle(Arrays.asList(values), r);
+      estimator.clear();
+      for (int j = 0; j < count; j++) {
+        estimator.insert(values[j]);
+      }
+      Map<Quantile, Long> snapshot;
+      snapshot = estimator.snapshot();
+      for (Quantile q : quantiles) {
+        long actual = (long) (q.quantile * count);
+        long error = (long) (q.error * count);
+        long estimate = snapshot.get(q);
+        System.out
+            .println(String.format("Expected %d with error %d, estimated %d",
+                actual, error, estimate));
+        assertTrue(estimate <= actual + error);
+        assertTrue(estimate >= actual - error);
+      }
+    }
+  }
+}