
HDFS-3650. Use MutableQuantiles to provide latency histograms for various operations. Contributed by Andrew Wang.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1366246 13f79535-47bb-0310-9956-ffa450edef68
Aaron Myers
parent commit c1ea9b4490
14 changed files with 292 additions and 59 deletions
  1. +19 -0   hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/Configuration.java
  2. +3 -3    hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableQuantiles.java
  3. +9 -9    hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketOutputStream.java
  4. +2 -2    hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestMutableMetrics.java
  5. +25 -0   hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MetricsAsserts.java
  6. +3 -0    hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
  7. +1 -0    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
  8. +8 -3    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
  9. +7 -3    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
  10. +68 -10 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
  11. +33 -4  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
  12. +10 -0  hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
  13. +78 -25 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
  14. +26 -0  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java

+ 19 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/Configuration.java

@@ -899,6 +899,25 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
     }
     return Integer.parseInt(valueString);
   }
+  
+  /**
+   * Get the value of the <code>name</code> property as a set of comma-delimited
+   * <code>int</code> values.
+   * 
+   * If no such property exists, an empty array is returned.
+   * 
+   * @param name property name
+   * @return property value interpreted as an array of comma-delimited
+   *         <code>int</code> values
+   */
+  public int[] getInts(String name) {
+    String[] strings = getTrimmedStrings(name);
+    int[] ints = new int[strings.length];
+    for (int i = 0; i < strings.length; i++) {
+      ints[i] = Integer.parseInt(strings[i]);
+    }
+    return ints;
+  }
 
   /** 
    * Set the value of the <code>name</code> property to an <code>int</code>.
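For context, a minimal usage sketch of the new helper (property value is illustrative; an unset property yields an empty array, since getTrimmedStrings() returns no tokens):

    Configuration conf = new Configuration();
    conf.set("dfs.metrics.percentiles.intervals", "60,300");
    // parses each trimmed, comma-delimited token with Integer.parseInt
    int[] intervals = conf.getInts("dfs.metrics.percentiles.intervals");  // {60, 300}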

+ 3 - 3
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableQuantiles.java

@@ -45,7 +45,8 @@ import com.google.common.annotations.VisibleForTesting;
 @InterfaceStability.Evolving
 public class MutableQuantiles extends MutableMetric {
 
-  static final Quantile[] quantiles = { new Quantile(0.50, 0.050),
+  @VisibleForTesting
+  public static final Quantile[] quantiles = { new Quantile(0.50, 0.050),
       new Quantile(0.75, 0.025), new Quantile(0.90, 0.010),
       new Quantile(0.95, 0.005), new Quantile(0.99, 0.001) };
 
@@ -90,8 +91,7 @@ public class MutableQuantiles extends MutableMetric {
         "Number of %s for %s with %ds interval", lsName, desc, interval));
     // Construct the MetricsInfos for the quantiles, converting to percentiles
     quantileInfos = new MetricsInfo[quantiles.length];
-    String nameTemplate = ucName + "%dthPercentile" + interval + "sInterval"
-        + uvName;
+    String nameTemplate = ucName + "%dthPercentile" + uvName;
     String descTemplate = "%d percentile " + lvName + " with " + interval
         + " second interval for " + desc;
     for (int i = 0; i < quantiles.length; i++) {
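The rename drops the rollover interval from the gauge names, keeping it only in the descriptions. For a quantile metric registered with name "Foo" and value name "Latency", the emitted gauges are now (matching the updated TestMutableMetrics expectations below):

    Foo50thPercentileLatency, Foo75thPercentileLatency, Foo90thPercentileLatency,
    Foo95thPercentileLatency, Foo99thPercentileLatency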

+ 9 - 9
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketOutputStream.java

@@ -31,8 +31,8 @@ import java.nio.channels.WritableByteChannel;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.metrics2.lib.MutableRate;
-import org.apache.hadoop.util.Progressable;
 
 /**
  * This implements an output stream that can have a timeout while writing.
@@ -179,9 +179,9 @@ public class SocketOutputStream extends OutputStream
    * @param fileCh FileChannel to transfer data from.
    * @param position position within the channel where the transfer begins
    * @param count number of bytes to transfer.
-   * @param waitForWritableTime updated by the nanoseconds spent waiting for 
-   * the socket to become writable
-   * @param transferTime updated by the nanoseconds spent transferring data
+   * @param waitForWritableTime nanoseconds spent waiting for the socket 
+   *        to become writable
+   * @param transferTime nanoseconds spent transferring data
    * 
    * @throws EOFException 
    *         If end of input file is reached before requested number of 
@@ -195,8 +195,8 @@ public class SocketOutputStream extends OutputStream
    *         {@link FileChannel#transferTo(long, long, WritableByteChannel)}. 
    */
   public void transferToFully(FileChannel fileCh, long position, int count,
-      MutableRate waitForWritableTime,
-      MutableRate transferToTime) throws IOException {
+      LongWritable waitForWritableTime,
+      LongWritable transferToTime) throws IOException {
     long waitTime = 0;
     long transferTime = 0;
     while (count > 0) {
@@ -236,12 +236,12 @@ public class SocketOutputStream extends OutputStream
       waitTime += wait - start;
       transferTime += transfer - wait;
     }
-
+    
     if (waitForWritableTime != null) {
-      waitForWritableTime.add(waitTime);
+      waitForWritableTime.set(waitTime);
     }
     if (transferToTime != null) {
-      transferToTime.add(transferTime);
+      transferToTime.set(transferTime);
     }
   }
 

+ 2 - 2
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestMutableMetrics.java

@@ -150,7 +150,7 @@ public class TestMutableMetrics {
         info("FooNumOps", "Number of ops for stat with 5s interval"),
         (long) 2000);
     Quantile[] quants = MutableQuantiles.quantiles;
-    String name = "Foo%dthPercentile5sIntervalLatency";
+    String name = "Foo%dthPercentileLatency";
     String desc = "%d percentile latency with 5 second interval for stat";
     for (Quantile q : quants) {
       int percentile = (int) (100 * q.quantile);
@@ -176,7 +176,7 @@ public class TestMutableMetrics {
         "Latency", 5);
 
     Quantile[] quants = MutableQuantiles.quantiles;
-    String name = "Foo%dthPercentile5sIntervalLatency";
+    String name = "Foo%dthPercentileLatency";
     String desc = "%d percentile latency with 5 second interval for stat";
 
     // Push values for three intervals

+ 25 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MetricsAsserts.java

@@ -23,7 +23,9 @@ import static com.google.common.base.Preconditions.*;
 import org.hamcrest.Description;
 import org.junit.Assert;
 
+import static org.mockito.AdditionalMatchers.geq;
 import static org.mockito.Mockito.*;
+
 import org.mockito.stubbing.Answer;
 import org.mockito.internal.matchers.GreaterThan;
 import org.mockito.invocation.InvocationOnMock;
@@ -39,7 +41,11 @@ import org.apache.hadoop.metrics2.MetricsSource;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.metrics2.MetricsSystem;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MutableQuantiles;
+import org.apache.hadoop.metrics2.util.Quantile;
+
 import static org.apache.hadoop.metrics2.lib.Interns.*;
+import static org.apache.hadoop.test.MetricsAsserts.eqName;
 
 /**
  * Helpers for metrics source tests
@@ -328,4 +334,23 @@ public class MetricsAsserts {
                                    MetricsSource source) {
     assertGaugeGt(name, greater, getMetrics(source));
   }
+  
+  /**
+   * Asserts that the NumOps and quantiles for a metric have been changed at
+   * some point to a non-zero value.
+   * 
+   * @param prefix of the metric
+   * @param rb MetricsRecordBuilder with the metric
+   */
+  public static void assertQuantileGauges(String prefix, 
+      MetricsRecordBuilder rb) {
+    verify(rb).addGauge(eqName(info(prefix + "NumOps", "")), geq(0l));
+    for (Quantile q : MutableQuantiles.quantiles) {
+      String nameTemplate = prefix + "%dthPercentileLatency";
+      int percentile = (int) (100 * q.quantile);
+      verify(rb).addGauge(
+          eqName(info(String.format(nameTemplate, percentile), "")),
+          geq(0l));
+    }
+  }
 }
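A usage sketch of the new assertion helper; the "FlushNanos1s" prefix is one the HDFS tests below use, and note that geq(0l) actually accepts any value >= 0, zero included:

    MetricsRecordBuilder rb = getMetrics(datanode.getMetrics().name());
    // verifies addGauge was called with FlushNanos1sNumOps >= 0 and with each
    // FlushNanos1s<N>thPercentileLatency gauge >= 0 (N = 50, 75, 90, 95, 99)
    assertQuantileGauges("FlushNanos1s", rb);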

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -356,6 +356,9 @@ Branch-2 ( Unreleased changes )
 
     HDFS-3711. Manually convert remaining tests to JUnit4. (Andrew Wang via atm)
 
+    HDFS-3650. Use MutableQuantiles to provide latency histograms for various
+    operations. (Andrew Wang via atm)
+
   OPTIMIZATIONS
 
     HDFS-2982. Startup performance suffers when there are many edit log

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -203,6 +203,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String  DFS_CLIENT_READ_PREFETCH_SIZE_KEY = "dfs.client.read.prefetch.size"; 
   public static final String  DFS_CLIENT_RETRY_WINDOW_BASE= "dfs.client.retry.window.base";
   public static final String  DFS_METRICS_SESSION_ID_KEY = "dfs.metrics.session-id";
+  public static final String  DFS_METRICS_PERCENTILES_INTERVALS_KEY = "dfs.metrics.percentiles.intervals";
   public static final String  DFS_DATANODE_HOST_NAME_KEY = "dfs.datanode.hostname";
   public static final String  DFS_NAMENODE_HOSTS_KEY = "dfs.namenode.hosts";
   public static final String  DFS_NAMENODE_HOSTS_EXCLUDE_KEY = "dfs.namenode.hosts.exclude";
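A hedged sketch of wiring the new key through getInts() (interval values are illustrative; leaving the key unset disables percentile measurement entirely, since no MutableQuantiles get created):

    Configuration conf = new HdfsConfiguration();
    conf.set(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY, "60,300");
    // returns {60, 300}; DataNodeMetrics and NameNodeMetrics create one
    // MutableQuantiles per interval for each instrumented operation
    int[] intervals = conf.getInts(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY);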

+ 8 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java

@@ -76,6 +76,8 @@ import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.Time;
 
+import com.google.common.annotations.VisibleForTesting;
+
 
 /****************************************************************
  * DFSOutputStream creates files from a stream of bytes.
@@ -1210,7 +1212,8 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
   //
   // returns the list of targets, if any, that is being currently used.
   //
-  synchronized DatanodeInfo[] getPipeline() {
+  @VisibleForTesting
+  public synchronized DatanodeInfo[] getPipeline() {
     if (streamer == null) {
       return null;
     }
@@ -1752,11 +1755,13 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
     }
   }
 
-  void setArtificialSlowdown(long period) {
+  @VisibleForTesting
+  public void setArtificialSlowdown(long period) {
     artificialSlowdown = period;
   }
 
-  synchronized void setChunksPerPacket(int value) {
+  @VisibleForTesting
+  public synchronized void setChunksPerPacket(int value) {
     chunksPerPacket = Math.min(chunksPerPacket, value);
     packetSize = PacketHeader.PKT_HEADER_LEN +
                  (checksum.getBytesPerChecksum() + 

+ 7 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java

@@ -41,6 +41,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.ReadaheadPool;
 import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
 import org.apache.hadoop.io.nativeio.NativeIO;
@@ -486,11 +487,14 @@ class BlockSender implements java.io.Closeable {
         
         // no need to flush since we know out is not a buffered stream
         FileChannel fileCh = ((FileInputStream)blockIn).getChannel();
+        LongWritable waitTime = new LongWritable();
+        LongWritable transferTime = new LongWritable();
         sockOut.transferToFully(fileCh, blockInPosition, dataLen, 
-            datanode.metrics.getSendDataPacketBlockedOnNetworkNanos(),
-            datanode.metrics.getSendDataPacketTransferNanos());
+            waitTime, transferTime);
+        datanode.metrics.addSendDataPacketBlockedOnNetworkNanos(waitTime.get());
+        datanode.metrics.addSendDataPacketTransferNanos(transferTime.get());
         blockInPosition += dataLen;
-      } else { 
+      } else {
         // normal transfer
         out.write(buf, 0, dataOff + dataLen);
       }
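The transferToFully() signature change decouples SocketOutputStream from metrics2: instead of accumulating directly into MutableRate objects via add(), the stream now set()s per-call totals on LongWritable out-parameters, and the caller fans the measurements out to whatever metrics it keeps. A minimal caller sketch, consolidating the two-file change above:

    LongWritable waitTime = new LongWritable();
    LongWritable transferTime = new LongWritable();
    sockOut.transferToFully(fileCh, blockInPosition, dataLen, waitTime, transferTime);
    // the caller, not the stream, updates both the MutableRate and the quantiles
    datanode.metrics.addSendDataPacketBlockedOnNetworkNanos(waitTime.get());
    datanode.metrics.addSendDataPacketTransferNanos(transferTime.get());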

+ 68 - 10
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java

@@ -29,6 +29,7 @@ import org.apache.hadoop.metrics2.annotation.Metrics;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.lib.MetricsRegistry;
 import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableQuantiles;
 import org.apache.hadoop.metrics2.lib.MutableRate;
 import org.apache.hadoop.metrics2.source.JvmMetrics;
 
@@ -74,19 +75,54 @@ public class DataNodeMetrics {
   @Metric MutableRate heartbeats;
   @Metric MutableRate blockReports;
   @Metric MutableRate packetAckRoundTripTimeNanos;
-
+  MutableQuantiles[] packetAckRoundTripTimeNanosQuantiles;
+  
   @Metric MutableRate flushNanos;
+  MutableQuantiles[] flushNanosQuantiles;
+  
   @Metric MutableRate fsyncNanos;
+  MutableQuantiles[] fsyncNanosQuantiles;
   
   @Metric MutableRate sendDataPacketBlockedOnNetworkNanos;
+  MutableQuantiles[] sendDataPacketBlockedOnNetworkNanosQuantiles;
   @Metric MutableRate sendDataPacketTransferNanos;
+  MutableQuantiles[] sendDataPacketTransferNanosQuantiles;
+  
 
   final MetricsRegistry registry = new MetricsRegistry("datanode");
   final String name;
 
-  public DataNodeMetrics(String name, String sessionId) {
+  public DataNodeMetrics(String name, String sessionId, int[] intervals) {
     this.name = name;
     registry.tag(SessionId, sessionId);
+    
+    final int len = intervals.length;
+    packetAckRoundTripTimeNanosQuantiles = new MutableQuantiles[len];
+    flushNanosQuantiles = new MutableQuantiles[len];
+    fsyncNanosQuantiles = new MutableQuantiles[len];
+    sendDataPacketBlockedOnNetworkNanosQuantiles = new MutableQuantiles[len];
+    sendDataPacketTransferNanosQuantiles = new MutableQuantiles[len];
+    
+    for (int i = 0; i < len; i++) {
+      int interval = intervals[i];
+      packetAckRoundTripTimeNanosQuantiles[i] = registry.newQuantiles(
+          "packetAckRoundTripTimeNanos" + interval + "s",
+          "Packet Ack RTT in ns", "ops", "latency", interval);
+      flushNanosQuantiles[i] = registry.newQuantiles(
+          "flushNanos" + interval + "s", 
+          "Disk flush latency in ns", "ops", "latency", interval);
+      fsyncNanosQuantiles[i] = registry.newQuantiles(
+          "fsyncNanos" + interval + "s", "Disk fsync latency in ns", 
+          "ops", "latency", interval);
+      sendDataPacketBlockedOnNetworkNanosQuantiles[i] = registry.newQuantiles(
+          "sendDataPacketBlockedOnNetworkNanos" + interval + "s", 
+          "Time blocked on network while sending a packet in ns",
+          "ops", "latency", interval);
+      sendDataPacketTransferNanosQuantiles[i] = registry.newQuantiles(
+          "sendDataPacketTransferNanos" + interval + "s", 
+          "Time reading from disk and writing to network while sending " +
+          "a packet in ns", "ops", "latency", interval);
+    }
   }
 
   public static DataNodeMetrics create(Configuration conf, String dnName) {
@@ -94,8 +130,15 @@ public class DataNodeMetrics {
     MetricsSystem ms = DefaultMetricsSystem.instance();
     JvmMetrics.create("DataNode", sessionId, ms);
     String name = "DataNodeActivity-"+ (dnName.isEmpty()
-        ? "UndefinedDataNodeName"+ DFSUtil.getRandom().nextInt() : dnName.replace(':', '-'));
-    return ms.register(name, null, new DataNodeMetrics(name, sessionId));
+        ? "UndefinedDataNodeName"+ DFSUtil.getRandom().nextInt() 
+            : dnName.replace(':', '-'));
+
+    // Percentile measurement is off by default, by watching no intervals
+    int[] intervals = 
+        conf.getInts(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY);
+    
+    return ms.register(name, null, new DataNodeMetrics(name, sessionId,
+        intervals));
   }
 
   public String name() { return name; }
@@ -166,14 +209,23 @@ public class DataNodeMetrics {
 
   public void addPacketAckRoundTripTimeNanos(long latencyNanos) {
     packetAckRoundTripTimeNanos.add(latencyNanos);
+    for (MutableQuantiles q : packetAckRoundTripTimeNanosQuantiles) {
+      q.add(latencyNanos);
+    }
   }
 
   public void addFlushNanos(long latencyNanos) {
     flushNanos.add(latencyNanos);
+    for (MutableQuantiles q : flushNanosQuantiles) {
+      q.add(latencyNanos);
+    }
   }
 
   public void addFsyncNanos(long latencyNanos) {
     fsyncNanos.add(latencyNanos);
+    for (MutableQuantiles q : fsyncNanosQuantiles) {
+      q.add(latencyNanos);
+    }
   }
 
   public void shutdown() {
@@ -196,12 +248,18 @@ public class DataNodeMetrics {
   public void incrBlocksGetLocalPathInfo() {
     blocksGetLocalPathInfo.incr();
   }
-  
-  public MutableRate getSendDataPacketBlockedOnNetworkNanos() {
-    return sendDataPacketBlockedOnNetworkNanos;
+
+  public void addSendDataPacketBlockedOnNetworkNanos(long latencyNanos) {
+    sendDataPacketBlockedOnNetworkNanos.add(latencyNanos);
+    for (MutableQuantiles q : sendDataPacketBlockedOnNetworkNanosQuantiles) {
+      q.add(latencyNanos);
+    }
   }
-  
-  public MutableRate getSendDataPacketTransferNanos() {
-    return sendDataPacketTransferNanos;
+
+  public void addSendDataPacketTransferNanos(long latencyNanos) {
+    sendDataPacketTransferNanos.add(latencyNanos);
+    for (MutableQuantiles q : sendDataPacketTransferNanosQuantiles) {
+      q.add(latencyNanos);
+    }
   }
 }
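With this wiring, each sampled latency updates the existing MutableRate plus one MutableQuantiles per configured interval. An illustrative sketch, assuming dfs.metrics.percentiles.intervals is set to 60: the DataNode would then expose gauges such as

    FlushNanos60sNumOps
    FlushNanos60s50thPercentileLatency ... FlushNanos60s99thPercentileLatency

and likewise for FsyncNanos60s, PacketAckRoundTripTimeNanos60s, SendDataPacketBlockedOnNetworkNanos60s, and SendDataPacketTransferNanos60s.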

+ 33 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java

@@ -17,17 +17,20 @@
  */
 package org.apache.hadoop.hdfs.server.namenode.metrics;
 
+import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName;
+import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId;
+
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.metrics2.MetricsSystem;
 import org.apache.hadoop.metrics2.annotation.Metric;
 import org.apache.hadoop.metrics2.annotation.Metrics;
-import static org.apache.hadoop.metrics2.impl.MsInfo.*;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.lib.MetricsRegistry;
 import org.apache.hadoop.metrics2.lib.MutableCounterLong;
 import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
+import org.apache.hadoop.metrics2.lib.MutableQuantiles;
 import org.apache.hadoop.metrics2.lib.MutableRate;
 import org.apache.hadoop.metrics2.source.JvmMetrics;
 
@@ -57,15 +60,31 @@ public class NameNodeMetrics {
 
   @Metric("Journal transactions") MutableRate transactions;
   @Metric("Journal syncs") MutableRate syncs;
+  MutableQuantiles[] syncsQuantiles;
   @Metric("Journal transactions batched in sync")
   MutableCounterLong transactionsBatchedInSync;
   @Metric("Block report") MutableRate blockReport;
+  MutableQuantiles[] blockReportQuantiles;
 
   @Metric("Duration in SafeMode at startup") MutableGaugeInt safeModeTime;
   @Metric("Time loading FS Image at startup") MutableGaugeInt fsImageLoadTime;
 
-  NameNodeMetrics(String processName, String sessionId) {
+  NameNodeMetrics(String processName, String sessionId, int[] intervals) {
     registry.tag(ProcessName, processName).tag(SessionId, sessionId);
+    
+    final int len = intervals.length;
+    syncsQuantiles = new MutableQuantiles[len];
+    blockReportQuantiles = new MutableQuantiles[len];
+    
+    for (int i = 0; i < len; i++) {
+      int interval = intervals[i];
+      syncsQuantiles[i] = registry.newQuantiles(
+          "syncs" + interval + "s",
+          "Journal syncs", "ops", "latency", interval);
+      blockReportQuantiles[i] = registry.newQuantiles(
+          "blockReport" + interval + "s", 
+          "Block report", "ops", "latency", interval);
+    }
   }
 
   public static NameNodeMetrics create(Configuration conf, NamenodeRole r) {
@@ -73,7 +92,11 @@ public class NameNodeMetrics {
     String processName = r.toString();
     MetricsSystem ms = DefaultMetricsSystem.instance();
     JvmMetrics.create(processName, sessionId, ms);
-    return ms.register(new NameNodeMetrics(processName, sessionId));
+    
+    // Percentile measurement is off by default, by watching no intervals
+    int[] intervals = 
+        conf.getInts(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY);
+    return ms.register(new NameNodeMetrics(processName, sessionId, intervals));
   }
 
   public void shutdown() {
@@ -146,6 +169,9 @@ public class NameNodeMetrics {
 
   public void addSync(long elapsed) {
     syncs.add(elapsed);
+    for (MutableQuantiles q : syncsQuantiles) {
+      q.add(elapsed);
+    }
   }
 
   public void setFsImageLoadTime(long elapsed) {
@@ -154,6 +180,9 @@ public class NameNodeMetrics {
 
   public void addBlockReport(long latency) {
     blockReport.add(latency);
+    for (MutableQuantiles q : blockReportQuantiles) {
+      q.add(latency);
+    }
   }
 
   public void setSafeModeTime(long elapsed) {

+ 10 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

@@ -1004,4 +1004,14 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.metrics.percentiles.intervals</name>
+  <value></value>
+  <description>
+    Comma-delimited set of integers denoting the desired rollover intervals 
+    (in seconds) for percentile latency metrics on the Namenode and Datanode.
+    By default, percentile latency metrics are disabled.
+  </description>
+</property>
+
 </configuration>

+ 78 - 25
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java

@@ -18,21 +18,26 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
+import static org.apache.hadoop.test.MetricsAsserts.assertQuantileGauges;
 import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 import java.util.List;
-import java.util.Random;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSOutputStream;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.junit.Test;
 
@@ -59,8 +64,10 @@ public class TestDataNodeMetrics {
   }
 
   @Test
-  public void testSendDataPacket() throws Exception {
+  public void testSendDataPacketMetrics() throws Exception {
     Configuration conf = new HdfsConfiguration();
+    final int interval = 1;
+    conf.set(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY, "" + interval);
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
     try {
       FileSystem fs = cluster.getFileSystem();
@@ -73,64 +80,110 @@ public class TestDataNodeMetrics {
       assertEquals(datanodes.size(), 1);
       DataNode datanode = datanodes.get(0);
       MetricsRecordBuilder rb = getMetrics(datanode.getMetrics().name());
-
       // Expect 2 packets, 1 for the 1 byte read, 1 for the empty packet
       // signaling the end of the block
       assertCounter("SendDataPacketTransferNanosNumOps", (long)2, rb);
       assertCounter("SendDataPacketBlockedOnNetworkNanosNumOps", (long)2, rb);
+      // Wait for at least 1 rollover
+      Thread.sleep((interval + 1) * 1000);
+      // Check that the sendPacket percentiles rolled to non-zero values
+      String sec = interval + "s";
+      assertQuantileGauges("SendDataPacketBlockedOnNetworkNanos" + sec, rb);
+      assertQuantileGauges("SendDataPacketTransferNanos" + sec, rb);
     } finally {
       if (cluster != null) {cluster.shutdown();}
     }
   }
 
   @Test
-  public void testFlushMetric() throws Exception {
+  public void testReceivePacketMetrics() throws Exception {
     Configuration conf = new HdfsConfiguration();
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    final int interval = 1;
+    conf.set(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY, "" + interval);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
     try {
       cluster.waitActive();
       DistributedFileSystem fs = (DistributedFileSystem) cluster.getFileSystem();
 
       Path testFile = new Path("/testFlushNanosMetric.txt");
-      DFSTestUtil.createFile(fs, testFile, 1, (short)1, new Random().nextLong());
-
+      FSDataOutputStream fout = fs.create(testFile);
+      fout.write(new byte[1]);
+      fout.hsync();
+      fout.close();
       List<DataNode> datanodes = cluster.getDataNodes();
       DataNode datanode = datanodes.get(0);
       MetricsRecordBuilder dnMetrics = getMetrics(datanode.getMetrics().name());
-      // Expect 2 flushes, 1 for the flush that occurs after writing, 1 that occurs
-      // on closing the data and metadata files.
+      // Expect two flushes, 1 for the flush that occurs after writing, 
+      // 1 that occurs on closing the data and metadata files.
       assertCounter("FlushNanosNumOps", 2L, dnMetrics);
+      // Expect two syncs, one from the hsync, one on close.
+      assertCounter("FsyncNanosNumOps", 2L, dnMetrics);
+      // Wait for at least 1 rollover
+      Thread.sleep((interval + 1) * 1000);
+      // Check the receivePacket percentiles that should be non-zero
+      String sec = interval + "s";
+      assertQuantileGauges("FlushNanos" + sec, dnMetrics);
+      assertQuantileGauges("FsyncNanos" + sec, dnMetrics);
     } finally {
       if (cluster != null) {cluster.shutdown();}
     }
   }
 
+  /**
+   * Tests that round-trip acks in a datanode write pipeline are correctly 
+   * measured. 
+   */
   @Test
   public void testRoundTripAckMetric() throws Exception {
-    final int DATANODE_COUNT = 2;
-
+    final int datanodeCount = 2;
+    final int interval = 1;
     Configuration conf = new HdfsConfiguration();
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(DATANODE_COUNT).build();
+    conf.set(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY, "" + interval);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
+        datanodeCount).build();
     try {
       cluster.waitActive();
-      DistributedFileSystem fs = (DistributedFileSystem) cluster.getFileSystem();
-
+      FileSystem fs = cluster.getFileSystem();
+      // Open a file and get the head of the pipeline
       Path testFile = new Path("/testRoundTripAckMetric.txt");
-      DFSTestUtil.createFile(fs, testFile, 1, (short)DATANODE_COUNT,
-          new Random().nextLong());
-
-      boolean foundNonzeroPacketAckNumOps = false;
+      FSDataOutputStream fsout = fs.create(testFile, (short) datanodeCount);
+      DFSOutputStream dout = (DFSOutputStream) fsout.getWrappedStream();
+      // Slow down the writes to catch the write pipeline
+      dout.setChunksPerPacket(5);
+      dout.setArtificialSlowdown(3000);
+      fsout.write(new byte[10000]);
+      DatanodeInfo[] pipeline = null;
+      int count = 0;
+      while (pipeline == null && count < 5) {
+        pipeline = dout.getPipeline();
+        System.out.println("Waiting for pipeline to be created.");
+        Thread.sleep(1000);
+        count++;
+      }
+      // Get the head node that should be receiving downstream acks
+      DatanodeInfo headInfo = pipeline[0];
+      DataNode headNode = null;
       for (DataNode datanode : cluster.getDataNodes()) {
-        MetricsRecordBuilder dnMetrics = getMetrics(datanode.getMetrics().name());
-        if (getLongCounter("PacketAckRoundTripTimeNanosNumOps", dnMetrics) > 0) {
-          foundNonzeroPacketAckNumOps = true;
+        if (datanode.getDatanodeId().equals(headInfo)) {
+          headNode = datanode;
+          break;
         }
       }
-      assertTrue(
-          "Expected at least one datanode to have reported PacketAckRoundTripTimeNanos metric",
-          foundNonzeroPacketAckNumOps);
+      assertNotNull("Could not find the head of the datanode write pipeline", 
+          headNode);
+      // Close the file and wait for the metrics to rollover
+      Thread.sleep((interval + 1) * 1000);
+      // Check the ack was received
+      MetricsRecordBuilder dnMetrics = getMetrics(headNode.getMetrics()
+          .name());
+      assertTrue("Expected non-zero number of acks", 
+          getLongCounter("PacketAckRoundTripTimeNanosNumOps", dnMetrics) > 0);
+      assertQuantileGauges("PacketAckRoundTripTimeNanos" + interval
+          + "s", dnMetrics);
     } finally {
-      if (cluster != null) {cluster.shutdown();}
+      if (cluster != null) {
+        cluster.shutdown();
+      }
     }
   }
 }

+ 26 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode.metrics;
 
 import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
 import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
+import static org.apache.hadoop.test.MetricsAsserts.assertQuantileGauges;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import static org.junit.Assert.assertTrue;
 
@@ -63,6 +64,9 @@ public class TestNameNodeMetrics {
   // Number of datanodes in the cluster
   private static final int DATANODE_COUNT = 3; 
   private static final int WAIT_GAUGE_VALUE_RETRIES = 20;
+  
+  // Rollover interval of percentile metrics (in seconds)
+  private static final int PERCENTILES_INTERVAL = 1;
 
   static {
     CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 100);
@@ -71,6 +75,8 @@ public class TestNameNodeMetrics {
         DFS_REPLICATION_INTERVAL);
     CONF.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 
         DFS_REPLICATION_INTERVAL);
+    CONF.set(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY, 
+        "" + PERCENTILES_INTERVAL);
 
     ((Log4JLogger)LogFactory.getLog(MetricsAsserts.class))
       .getLogger().setLevel(Level.DEBUG);
@@ -352,4 +358,24 @@ public class TestNameNodeMetrics {
     assertGauge("TransactionsSinceLastCheckpoint", 1L, getMetrics(NS_METRICS));
     assertGauge("TransactionsSinceLastLogRoll", 1L, getMetrics(NS_METRICS));
   }
+  
+  /**
+   * Tests that the sync and block report metrics get updated on cluster
+   * startup.
+   */
+  @Test
+  public void testSyncAndBlockReportMetric() throws Exception {
+    MetricsRecordBuilder rb = getMetrics(NN_METRICS);
+    // We have one sync when the cluster starts up, just opening the journal
+    assertCounter("SyncsNumOps", 1L, rb);
+    // Each datanode reports in when the cluster comes up
+    assertCounter("BlockReportNumOps", (long)DATANODE_COUNT, rb);
+    
+    // Sleep for an interval+slop to let the percentiles rollover
+    Thread.sleep((PERCENTILES_INTERVAL+1)*1000);
+    
+    // Check that the percentiles were updated
+    assertQuantileGauges("Syncs1s", rb);
+    assertQuantileGauges("BlockReport1s", rb);
+  }
 }