瀏覽代碼

HADOOP-2398. Additional instrumentation for NameNode and RPC server.
Add support for accessing instrumentation statistics via JMX.
(Sanjay radia via dhruba)



git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@611906 13f79535-47bb-0310-9956-ffa450edef68

Dhruba Borthakur 17 年之前
父節點
當前提交
ace52ba7f5

+ 4 - 0
CHANGES.txt

@@ -230,6 +230,10 @@ Trunk (unreleased changes)
     HADOOP-2077. Added version and build information to STARTUP_MSG for all
     hadoop daemons to aid error-reporting, debugging etc. (acmurthy) 
 
+    HADOOP-2398. Additional instrumentation for NameNode and RPC server.
+    Add support for accessing instrumentation statistics via JMX.
+    (Sanjay radia via dhruba)
+
   OPTIMIZATIONS
 
     HADOOP-1898.  Release the lock protecting the last time of the last stack

+ 5 - 2
bin/hadoop

@@ -167,12 +167,13 @@ unset IFS
 # figure out which class to run
 if [ "$COMMAND" = "namenode" ] ; then
   CLASS='org.apache.hadoop.dfs.NameNode'
+  HADOOP_OPTS="-Dcom.sun.management.jmxremote $HADOOP_OPTS" 
 elif [ "$COMMAND" = "secondarynamenode" ] ; then
   CLASS='org.apache.hadoop.dfs.SecondaryNameNode'
+  HADOOP_OPTS="-Dcom.sun.management.jmxremote $HADOOP_OPTS" 
 elif [ "$COMMAND" = "datanode" ] ; then
   CLASS='org.apache.hadoop.dfs.DataNode'
-elif [ "$COMMAND" = "datanodecluster" ] ; then
-  CLASS='org.apache.hadoop.dfs.DataNodeCluster'
+  HADOOP_OPTS="-Dcom.sun.management.jmxremote $HADOOP_OPTS" 
 elif [ "$COMMAND" = "fs" ] ; then
   CLASS=org.apache.hadoop.fs.FsShell
 elif [ "$COMMAND" = "dfs" ] ; then
@@ -183,8 +184,10 @@ elif [ "$COMMAND" = "fsck" ] ; then
   CLASS=org.apache.hadoop.dfs.DFSck
 elif [ "$COMMAND" = "balancer" ] ; then
   CLASS=org.apache.hadoop.dfs.Balancer
+  HADOOP_OPTS="-Dcom.sun.management.jmxremote $HADOOP_OPTS" 
 elif [ "$COMMAND" = "jobtracker" ] ; then
   CLASS=org.apache.hadoop.mapred.JobTracker
+  HADOOP_OPTS="-Dcom.sun.management.jmxremote $HADOOP_OPTS" 
 elif [ "$COMMAND" = "tasktracker" ] ; then
   CLASS=org.apache.hadoop.mapred.TaskTracker
 elif [ "$COMMAND" = "job" ] ; then

+ 1 - 1
src/contrib/hbase/src/java/org/apache/hadoop/hbase/ipc/HbaseRPC.java

@@ -400,7 +400,7 @@ public class HbaseRPC {
       this.verbose = verbose;
     }
 
-    public Writable call(Writable param) throws IOException {
+    public Writable call(Writable param, long receiveTime) throws IOException {
       try {
         Invocation call = (Invocation)param;
         if (verbose) log("Call: " + call);

+ 15 - 3
src/java/org/apache/hadoop/dfs/FSNamesystem.java

@@ -234,6 +234,8 @@ class FSNamesystem implements FSConstants {
    * Initialize FSNamesystem.
    */
   private void initialize(NameNode nn, Configuration conf) throws IOException {
+    this.systemStart = now();
+    this.startTime = new Date(systemStart); 
     setConfigurationParameters(conf);
 
     this.localMachine = nn.getNameNodeAddress().getHostName();
@@ -241,6 +243,10 @@ class FSNamesystem implements FSConstants {
     this.dir = new FSDirectory(this, conf);
     StartupOption startOpt = NameNode.getStartupOption(conf);
     this.dir.loadFSImage(getNamespaceDirs(conf), startOpt);
+    long timeTakenToLoadFSImage = now() - systemStart;
+    LOG.info("Finished loading FSImage in " + timeTakenToLoadFSImage + " msecs");
+    NameNode.getNameNodeMetrics().fsImageLoadTime.set(
+                              (int) timeTakenToLoadFSImage);
     this.safeMode = new SafeModeInfo(conf);
     setBlockTotal();
     pendingReplications = new PendingReplicationBlocks(
@@ -252,8 +258,7 @@ class FSNamesystem implements FSConstants {
     hbthread.start();
     lmthread.start();
     replthread.start();
-    this.systemStart = now();
-    this.startTime = new Date(systemStart); 
+
 
     this.hostsReader = new HostsFileReader(conf.get("dfs.hosts",""),
                                            conf.get("dfs.hosts.exclude",""));
@@ -2313,6 +2318,7 @@ class FSNamesystem implements FSConstants {
   public synchronized Block[] processReport(DatanodeID nodeID, 
                                             BlockListAsLongs newReport
                                             ) throws IOException {
+    long startTime = now();
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("BLOCK* NameSystem.processReport: "
                              + "from " + nodeID.getName()+" " + 
@@ -2377,6 +2383,7 @@ class FSNamesystem implements FSConstants {
                                       +"ask "+nodeID.getName()+" to delete "+b.getBlockName());
       }
     }
+    NameNode.getNameNodeMetrics().blockReport.inc((int) (now() - startTime));
     return obsolete.toArray(new Block[obsolete.size()]);
   }
 
@@ -3501,9 +3508,14 @@ class FSNamesystem implements FSConstants {
           return;
         }
       }
-      if (reached >= 0)
+      long timeInSafemode = now() - systemStart;
+      LOG.info("Leaving safemode after " + timeInSafemode + " msecs");
+      NameNode.getNameNodeMetrics().safeModeTime.set((int) timeInSafemode);
+      
+      if (reached >= 0) {
         NameNode.stateChangeLog.info(
                                      "STATE* SafeModeInfo.leave: " + "Safe mode is OFF."); 
+      }
       reached = -1;
       safeMode = null;
       NameNode.stateChangeLog.info("STATE* Network topology has "

+ 2 - 2
src/java/org/apache/hadoop/dfs/NameNode.java

@@ -124,7 +124,7 @@ public class NameNode implements ClientProtocol, DatanodeProtocol,
     conf.set("fs.default.name", nameNodeAddress.getHostName() + ":" + nameNodeAddress.getPort());
     LOG.info("Namenode up at: " + this.nameNodeAddress);
 
-    myMetrics = new NameNodeMetrics(conf);
+    myMetrics = new NameNodeMetrics(conf, this);
 
     this.namesystem = new FSNamesystem(this, conf);
     this.server.start();  //start RPC server   
@@ -479,7 +479,7 @@ public class NameNode implements ClientProtocol, DatanodeProtocol,
   /**
    * Is the cluster currently in safe mode?
    */
-  boolean isInSafeMode() {
+  public boolean isInSafeMode() {
     return namesystem.isInSafeMode();
   }
 

+ 64 - 61
src/java/org/apache/hadoop/dfs/NameNodeMetrics.java

@@ -17,35 +17,62 @@
  */
 package org.apache.hadoop.dfs;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.metrics.MetricsRecord;
-import org.apache.hadoop.metrics.MetricsUtil;
-import org.apache.hadoop.metrics.MetricsContext;
-import org.apache.hadoop.metrics.Updater;
+import org.apache.hadoop.dfs.namenode.metrics.NameNodeMgt;
+import org.apache.hadoop.metrics.*;
 import org.apache.hadoop.metrics.jvm.JvmMetrics;
+import org.apache.hadoop.metrics.util.MetricsIntValue;
+import org.apache.hadoop.metrics.util.MetricsTimeVaryingInt;
+import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
 
-class NameNodeMetrics implements Updater {
+/**
+ * 
+ * This class is for maintaining  the various NameNode statistics
+ * and publishing them through the metrics interfaces.
+ * This also registers the JMX MBean for RPC.
+ * <p>
+ * This class has a number of metrics variables that are publicly accessible;
+ * these variables (objects) have methods to update their values;
+ *  for example:
+ *  <p> {@link #syncs}.inc()
+ *
+ */
+public class NameNodeMetrics implements Updater {
+    private static Log log = LogFactory.getLog(NameNodeMetrics.class);
     private final MetricsRecord metricsRecord;
     
-    private int numFilesCreated = 0;
-    private int numFilesOpened = 0;
-    private int numFilesRenamed = 0;
-    private int numFilesListed = 0;
+    public MetricsTimeVaryingInt numFilesCreated = new MetricsTimeVaryingInt("FilesCreated");
+    public MetricsTimeVaryingInt numFilesOpened = new MetricsTimeVaryingInt("FilesOpened");
+    public MetricsTimeVaryingInt numFilesRenamed = new MetricsTimeVaryingInt("FilesRenamed");
+    public MetricsTimeVaryingInt numFilesListed = new MetricsTimeVaryingInt("FilesListed");
+
+    
+    public MetricsTimeVaryingRate transactions = new MetricsTimeVaryingRate("Transactions");
+    public MetricsTimeVaryingRate syncs = new MetricsTimeVaryingRate("Syncs");
+    public MetricsTimeVaryingRate blockReport = new MetricsTimeVaryingRate("blockReport");
+    public MetricsIntValue safeModeTime = new MetricsIntValue("SafemodeTime");
+    public MetricsIntValue fsImageLoadTime = 
+                                        new MetricsIntValue("fsImageLoadTime");
 
-    private int numTransactions = 0;
-    private int totalTimeTransactionsLogMemory = 0;
-    private int numSyncs = 0;
-    private int totalTimeSyncs = 0;
       
-    NameNodeMetrics(Configuration conf) {
+    NameNodeMetrics(Configuration conf, NameNode nameNode) {
       String sessionId = conf.get("session.id");
       // Initiate Java VM metrics
       JvmMetrics.init("NameNode", sessionId);
+
+      
+      // Now the Mbean for the name node
+      new NameNodeMgt(sessionId, this, nameNode);
+      
       // Create a record for NameNode metrics
       MetricsContext metricsContext = MetricsUtil.getContext("dfs");
       metricsRecord = MetricsUtil.createRecord(metricsContext, "namenode");
       metricsRecord.setTag("sessionId", sessionId);
       metricsContext.registerUpdater(this);
+      log.info("Initializing NameNodeMeterics using context object:" +
+                metricsContext.getClass().getName());
     }
       
     /**
@@ -54,66 +81,42 @@ class NameNodeMetrics implements Updater {
      */
     public void doUpdates(MetricsContext unused) {
       synchronized (this) {
-        metricsRecord.incrMetric("files_created", numFilesCreated);
-        metricsRecord.incrMetric("files_opened", numFilesOpened);
-        metricsRecord.incrMetric("files_renamed", numFilesRenamed);
-        metricsRecord.incrMetric("files_listed", numFilesListed);
-        metricsRecord.incrMetric("num_transactions", numTransactions);
-        metricsRecord.incrMetric("avg_time_transactions_memory", 
-                                 getAverageTimeTransaction());
-        metricsRecord.incrMetric("num_syncs", numSyncs);
-        metricsRecord.incrMetric("avg_time_transactions_sync", 
-                                 getAverageTimeSync());
-              
-        numFilesCreated = 0;
-        numFilesOpened = 0;
-        numFilesRenamed = 0;
-        numFilesListed = 0;
-        numTransactions = 0;
-        totalTimeTransactionsLogMemory = 0;
-        numSyncs = 0;
-        totalTimeSyncs = 0;
+        numFilesCreated.pushMetric(metricsRecord);
+        numFilesOpened.pushMetric(metricsRecord);
+        numFilesRenamed.pushMetric(metricsRecord);
+        numFilesListed.pushMetric(metricsRecord);
+
+
+        transactions.pushMetric(metricsRecord);
+        syncs.pushMetric(metricsRecord);
+        blockReport.pushMetric(metricsRecord);
+        safeModeTime.pushMetric(metricsRecord);
+        fsImageLoadTime.pushMetric(metricsRecord);
       }
       metricsRecord.update();
     }
       
-    synchronized void createFile() {
-      ++numFilesCreated;
+    void createFile() {
+      numFilesCreated.inc();
     }
       
-    synchronized void openFile() {
-      ++numFilesOpened;
+    void openFile() {
+      numFilesOpened.inc();
     }
       
-    synchronized void renameFile() {
-      ++numFilesRenamed;
+    void renameFile() {
+      numFilesRenamed.inc();
     }
       
-    synchronized void listFile(int nfiles) {
-      numFilesListed += nfiles;
-    }
-
-    synchronized void incrNumTransactions(int count, int time) {
-      numTransactions += count;
-      totalTimeTransactionsLogMemory += time;
+    void listFile(int nfiles) {
+      numFilesListed.inc(nfiles);
     }
 
-    synchronized void incrSyncs(int count, int time) {
-      numSyncs += count;
-      totalTimeSyncs += time;
+    void incrNumTransactions(int count, int time) {
+      transactions.inc(count, time);
     }
 
-    synchronized private int getAverageTimeTransaction() {
-      if (numTransactions == 0) {
-        return 0;
-      }
-      return totalTimeTransactionsLogMemory/numTransactions;
-    }
-
-    synchronized private int getAverageTimeSync() {
-      if (numSyncs == 0) {
-        return 0;
-      }
-      return totalTimeSyncs/numSyncs;
+    void incrSyncs(int count, int time) {
+      syncs.inc(count, time);
     }
 }

+ 181 - 0
src/java/org/apache/hadoop/dfs/namenode/metrics/NameNodeMgt.java

@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs.namenode.metrics;
+
+import org.apache.hadoop.dfs.NameNode;
+import org.apache.hadoop.dfs.NameNodeMetrics;
+import org.apache.hadoop.metrics.util.MBeanUtil;
+
+/**
+ * 
+ * This is the implementation of the Name Node JMX MBean
+ *
+ */
+public class NameNodeMgt implements NameNodeMgtMBean {
+  private NameNodeMetrics myMetrics;
+  private NameNode myNameNode;
+
+  public NameNodeMgt(String sessionId, NameNodeMetrics nameNodeMetrics, NameNode nameNode) {
+    myMetrics = nameNodeMetrics;
+    myNameNode = nameNode;
+    MBeanUtil.registerMBean("NameNode", "NameNodeStatistics", this);
+  }
+
+  /**
+   * @inheritDoc
+   */
+  public long  getBlockReportAverageTime() {
+    return myMetrics.blockReport.getPreviousIntervalAverageTime();
+  }
+
+  /**
+   * @inheritDoc
+   */
+  public long getBlockReportMaxTime() {
+    return myMetrics.blockReport.getMaxTime();
+  }
+
+  /**
+   * @inheritDoc
+   */
+  public long getBlockReportMinTime() {
+    return myMetrics.blockReport.getMinTime();
+  }
+ 
+  /**
+   * @inheritDoc
+   */
+  public int getBlockReportNum() {
+    return myMetrics.blockReport.getPreviousIntervalNumOps();
+  }
+
+  /**
+   * @inheritDoc
+   */
+  public long  getJournalTransactionAverageTime() {
+    return myMetrics.transactions.getPreviousIntervalAverageTime();
+  }
+
+  /**
+   * @inheritDoc
+   */
+  public int getJournalTransactionNum() {
+    return myMetrics.transactions.getPreviousIntervalNumOps();
+  }
+
+  /**
+   * @inheritDoc
+   */
+  public long getJournalTransactionMaxTime() {
+    return myMetrics.transactions.getMaxTime();
+  }
+
+  /**
+   * @inheritDoc
+   */
+  public long getJournalTransactionMinTime() {
+    return myMetrics.transactions.getMinTime();
+  }
+
+  /**
+   * @inheritDoc
+   */  
+  public long getJournalSyncAverageTime() {
+    return myMetrics.syncs.getPreviousIntervalAverageTime();
+  }
+ 
+  /**
+   * @inheritDoc
+   */
+  public long getJournalSyncMaxTime() {
+    return myMetrics.syncs.getMaxTime();
+  }
+
+  
+  /**
+   * @inheritDoc
+   */
+  public long getJournalSyncMinTime() {
+    return myMetrics.syncs.getMinTime();
+  }
+
+  /**
+   * @inheritDoc
+   */
+  public int getJournalSyncNum() {
+    return myMetrics.syncs.getPreviousIntervalNumOps();
+  }
+
+  /**
+   * @inheritDoc
+   */
+  public String getNameNodeState() {
+    return myNameNode.isInSafeMode() ? "safeMode" : "Operational";
+  }
+ 
+  /**
+   * @inheritDoc
+   */
+  public int getSafemodeTime() {
+    return myMetrics.safeModeTime.get();
+  }
+
+  /**
+   * @inheritDoc
+   */
+  public int getFSImageLoadTime() {
+    return myMetrics.fsImageLoadTime.get();
+  }
+
+  /**
+   * @inheritDoc
+   */
+  public void resetAllMinMax() {
+    myMetrics.syncs.resetMinMax();
+    myMetrics.transactions.resetMinMax();
+    myMetrics.blockReport.resetMinMax();
+  }
+  
+  /**
+   * @inheritDoc
+   */
+  public int getNumFilesCreated() {
+    return myMetrics.numFilesCreated.getPreviousIntervalValue();
+  }
+
+  /**
+   * @inheritDoc
+   */
+  public int getNumFilesListed() {
+    return myMetrics.numFilesListed.getPreviousIntervalValue();
+  }
+
+  /**
+   * @inheritDoc
+   */
+  public int getNumFilesOpened() {
+    return myMetrics.numFilesOpened.getPreviousIntervalValue();
+  }
+
+  /**
+   * @inheritDoc
+   */
+  public int getNumFilesRenamed() {
+    return myMetrics.numFilesRenamed.getPreviousIntervalValue();
+  }
+}

+ 161 - 0
src/java/org/apache/hadoop/dfs/namenode/metrics/NameNodeMgtMBean.java

@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.dfs.namenode.metrics;
+
+/**
+ * 
+ * This is the JMX management interface for the name node.
+ * Many of the statistics are sampled and averaged on an interval 
+ * which can be specified in the config file.
+ * <p>
+ * For the statistics that are sampled and averaged, one must specify 
+ * a metrics context that does periodic update calls. Most do.
+ * The default Null metrics context however does NOT. So if you aren't
+ * using any other metrics context then you can turn on the viewing and averaging
+ * of sampled metrics by  specifying the following two lines
+ *  in the hadoop-meterics.properties file:
+ *  <pre>
+ *        rpc.class=org.apache.hadoop.metrics.spi.NullContextWithUpdateThread
+ *        rpc.period=10
+ *  </pre>
+ *<p>
+ * Note that the metrics are collected regardless of the context used.
+ * The context with the update thread is used to average the data periodically
+ *
+ */
+public interface NameNodeMgtMBean {
+  /**
+   * The state of the name node: Safemode or Operational
+   * @return the state
+   */
+  String getNameNodeState();
+  
+  /**
+   * The time spent in the Safemode at startup
+   * @return time in msec
+   */
+  int getSafemodeTime();
+  
+  /**
+   * Time spent loading the FS Image at startup
+   * @return time in msec
+   */
+  int getFSImageLoadTime();
+  
+  /**
+   * Number of Journal Transactions in the last interval
+   * @return number of operations
+   */
+  int getJournalTransactionNum();
+  
+  /**
+   * Average time for Journal transactions in last interval
+   * @return time in msec
+   */
+  long getJournalTransactionAverageTime();
+  
+  /**
+   * The Minimum Journal Transaction Time since reset was called
+   * @return time in msec
+   */
+  long getJournalTransactionMinTime();
+  
+  /**
+   *  The Maximum Journal Transaction Time since reset was called
+   * @return time in msec
+   */
+  long getJournalTransactionMaxTime();
+  
+  /**
+   *  Number of block Reports processed in the last interval
+   * @return number of operations
+   */
+  int getBlockReportNum();
+  
+  /**
+   * Average time for Block Report Processing in last interval
+   * @return time in msec
+   */
+  long getBlockReportAverageTime();
+  
+  /**
+   *  The Minimum Block Report Processing Time since reset was called
+   * @return time in msec
+   */
+  long getBlockReportMinTime();
+  
+  /**
+   *  The Maximum Block Report Processing Time since reset was called
+   * @return time in msec
+   */
+  long getBlockReportMaxTime();
+  
+  /**
+   *  Number of Journal Syncs in the last interval
+   * @return number of operations
+   */
+  int getJournalSyncNum();
+  
+  /**
+   * Average time for Journal Sync in last interval
+   * @return time in msec
+   */
+  long getJournalSyncAverageTime();
+  
+  /**
+   *  The Minimum Journal Sync Time since reset was called
+   * @return time in msec
+   */
+  long getJournalSyncMinTime();
+  
+  /**
+   *   The Maximum Journal Sync Time since reset was called
+   * @return time in msec
+   */
+  long getJournalSyncMaxTime();
+  
+  /**
+   * Reset all min max times
+   */
+  void resetAllMinMax();
+  
+  /**
+   *  Number of files created in the last interval
+   * @return  number of operations
+   */
+  int getNumFilesCreated();
+  
+  /**
+   *   Number of files opened for reading in the last interval
+   * @return  number of operations
+   */
+  int getNumFilesOpened();
+  
+  /**
+   *   Number of files renamed in the last interval
+   * @return number of operations
+   */
+  int getNumFilesRenamed();
+  
+  /**
+   *   Number of files listed in the last interval
+   * @return number of operations
+   */
+  int getNumFilesListed();
+}

+ 19 - 5
src/java/org/apache/hadoop/ipc/RPC.java

@@ -371,7 +371,15 @@ public class RPC {
       throws IOException {
       this(instance, conf,  bindAddress, port, 1, false);
     }
-
+    
+    private static String classNameBase(String className) {
+      String[] names = className.split("\\.", -1);
+      if (names == null || names.length == 0) {
+        return className;
+      }
+      return names[names.length-1];
+    }
+    
     /** Construct an RPC server.
      * @param instance the instance whose methods will be called
      * @param conf the configuration to use
@@ -382,13 +390,13 @@ public class RPC {
      */
     public Server(Object instance, Configuration conf, String bindAddress,  int port,
                   int numHandlers, boolean verbose) throws IOException {
-      super(bindAddress, port, Invocation.class, numHandlers, conf);
+      super(bindAddress, port, Invocation.class, numHandlers, conf, classNameBase(instance.getClass().getName()));
       this.instance = instance;
       this.implementation = instance.getClass();
       this.verbose = verbose;
     }
 
-    public Writable call(Writable param) throws IOException {
+    public Writable call(Writable param, long receivedTime) throws IOException {
       try {
         Invocation call = (Invocation)param;
         if (verbose) log("Call: " + call);
@@ -399,8 +407,14 @@ public class RPC {
 
         long startTime = System.currentTimeMillis();
         Object value = method.invoke(instance, call.getParameters());
-        long callTime = System.currentTimeMillis() - startTime;
-        LOG.debug("Served: " + call.getMethodName() + " " + callTime);
+        int processingTime = (int) (System.currentTimeMillis() - startTime);
+        int qTime = (int) (startTime-receivedTime);
+        LOG.debug("Served: " + call.getMethodName() +
+            " queueTime= " + qTime +
+            " procesingTime= " + processingTime);
+        rpcMetrics.rpcQueueTime.inc(qTime);
+        rpcMetrics.rpcProcessingTime.inc(processingTime);
+        
         if (verbose) log("Return: "+value);
 
         return new ObjectWritable(method.getReturnType(), value);

+ 36 - 5
src/java/org/apache/hadoop/ipc/Server.java

@@ -54,6 +54,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.ipc.metrics.RpcMetrics;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 
@@ -91,7 +92,7 @@ public abstract class Server {
   private static final ThreadLocal<Server> SERVER = new ThreadLocal<Server>();
 
   /** Returns the server instance called under or null.  May be called under
-   * {@link #call(Writable)} implementations, and under {@link Writable}
+   * {@link #call(Writable, long)} implementations, and under {@link Writable}
    * methods of paramters and return values.  Permits applications to access
    * the server context.*/
   public static Server get() {
@@ -152,6 +153,8 @@ public abstract class Server {
                                                   // connections to nuke
                                                   //during a cleanup
   
+  protected RpcMetrics  rpcMetrics;
+  
   private Configuration conf;
 
   private int timeout;
@@ -884,9 +887,11 @@ public abstract class Server {
           if (System.currentTimeMillis() - call.receivedTime > 
               maxCallStartAge) {
             ReflectionUtils.logThreadInfo(LOG, "Discarding call " + call, 30);
+            int timeInQ = (int) (System.currentTimeMillis() - call.receivedTime);
             LOG.warn(getName()+", call "+call
                      +": discarded for being too old (" +
-                     (System.currentTimeMillis() - call.receivedTime) + ")");
+                     timeInQ + ")");
+            rpcMetrics.rpcDiscardedOps.inc(timeInQ);
             continue;
           }
           
@@ -900,7 +905,7 @@ public abstract class Server {
           
           CurCall.set(call);
           try {
-            value = call(call.param);             // make the call
+            value = call(call.param, call.receivedTime);             // make the call
           } catch (Throwable e) {
             LOG.info(getName()+", call "+call+": error: " + e, e);
             errorClass = e.getClass().getName();
@@ -935,12 +940,19 @@ public abstract class Server {
     }
 
   }
+  
+  protected Server(String bindAddress, int port, Class paramClass, int handlerCount, Configuration conf)
+    throws IOException 
+  {
+    this(bindAddress, port, paramClass, handlerCount,  conf, Integer.toString(port));
+  }
   /** Constructs a server listening on the named port and address.  Parameters passed must
    * be of the named class.  The <code>handlerCount</handlerCount> determines
    * the number of handler threads that will be used to process calls.
    * 
    */
-  protected Server(String bindAddress, int port, Class paramClass, int handlerCount, Configuration conf) 
+  protected Server(String bindAddress, int port, Class paramClass, int handlerCount, Configuration conf,
+                  String serverName) 
     throws IOException {
     this.bindAddress = bindAddress;
     this.conf = conf;
@@ -954,6 +966,7 @@ public abstract class Server {
     this.maxIdleTime = conf.getInt("ipc.client.maxidletime", 120000);
     this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
     this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000);
+    this.rpcMetrics = new RpcMetrics(serverName, Integer.toString(port), this);
     
     // Start the listener here and let it bind to the port
     listener = new Listener();
@@ -1017,6 +1030,24 @@ public abstract class Server {
   }
   
   /** Called for each call. */
-  public abstract Writable call(Writable param) throws IOException;
+  public abstract Writable call(Writable param, long receiveTime)
+                                                throws IOException;
+  
+  
+  /**
+   * The number of open RPC conections
+   * @return the number of open rpc connections
+   */
+  public int getNumOpenConnections() {
+    return numConnections;
+  }
+  
+  /**
+   * The number of rpc calls in the queue.
+   * @return The number of rpc calls in the queue.
+   */
+  public int getCallQueueLen() {
+    return callQueue.size();
+  }
   
 }

+ 84 - 0
src/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java

@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ipc.metrics;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.MetricsRecord;
+import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.metrics.Updater;
+import org.apache.hadoop.metrics.jvm.JvmMetrics;
+import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
+
+/**
+ * 
+ * This class is for maintaining  the various RPC statistics
+ * and publishing them through the metrics interfaces.
+ * This also registers the JMX MBean for RPC.
+ * <p>
+ * This class has a number of metrics variables that are publicly accessible;
+ * these variables (objects) have methods to update their values;
+ * for example:
+ *  <p> {@link #rpcDiscardedOps}.inc(time)
+ *
+ */
+public class RpcMetrics implements Updater {
+  private MetricsRecord metricsRecord;
+  private static Log LOG = LogFactory.getLog(JvmMetrics.class);
+  
+  private void setTags(String serverName, String port) {
+    metricsRecord.setTag("serverName", serverName);
+    metricsRecord.setTag("port", port);
+    LOG.info("Initializing RPC Metrics with serverName=" 
+        + serverName + ", port=" + port);
+  }
+
+  public RpcMetrics(String serverName, String port, Server server) {
+    MetricsContext context = MetricsUtil.getContext("rpc");
+    metricsRecord = MetricsUtil.createRecord(context, "metrics");
+    setTags(serverName, port);
+    context.registerUpdater(this);
+    
+    // Need to clean up the interface to RpcMgt - don't need both metrics and server params
+    new RpcMgt(serverName, port, this, server);
+  }
+  
+  
+  /**
+   * The metrics variables are public:
+   *  - they can be set directly by calling their set/inc methods
+   *  -they can also be read directly - e.g. JMX does this.
+   */
+  
+  public MetricsTimeVaryingRate rpcQueueTime = new MetricsTimeVaryingRate("RpcQueueTime");
+  public MetricsTimeVaryingRate rpcProcessingTime = new MetricsTimeVaryingRate("RpcProcessingTime");
+  public MetricsTimeVaryingRate rpcDiscardedOps = new MetricsTimeVaryingRate("RpcDiscardedOps");
+  
+  
+  /**
+   * Push the metrics to the monitoring subsystem on doUpdate() call.
+   */
+  public void doUpdates(MetricsContext context) {
+    rpcQueueTime.pushMetric(metricsRecord);
+    rpcProcessingTime.pushMetric(metricsRecord);
+    rpcDiscardedOps.pushMetric(metricsRecord);
+    
+  }
+}

+ 125 - 0
src/java/org/apache/hadoop/ipc/metrics/RpcMgt.java

@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ipc.metrics;
+
+
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.metrics.util.MBeanUtil;
+
+
+/**
+ * This class implements the RpcMgt MBean
+ *
+ */
+class RpcMgt implements RpcMgtMBean {
+  private RpcMetrics myMetrics;
+  private Server myServer;
+  
+  RpcMgt(final String serviceName, final String port,
+                final RpcMetrics metrics, Server server) {
+    myMetrics = metrics;
+    myServer = server;
+    MBeanUtil.registerMBean(serviceName, "RpcStatistics", this);
+  }
+
+  /**
+   * @inheritDoc
+   */
+  public long getRpcOpsAvgProcessingTime() {
+    return myMetrics.rpcProcessingTime.getPreviousIntervalAverageTime();
+  }
+  
+  /**
+   * @inheritDoc
+   */
+  public long getRpcOpsAvgProcessingTimeMax() {
+    return myMetrics.rpcProcessingTime.getMaxTime();
+  }
+
+  /**
+   * @inheritDoc
+   */
+  public long getRpcOpsAvgProcessingTimeMin() {
+    return myMetrics.rpcProcessingTime.getMinTime();
+  }
+
+  /**
+   * @inheritDoc
+   */
+  public long getRpcOpsAvgQueueTime() {
+    return myMetrics.rpcQueueTime.getPreviousIntervalAverageTime();
+  }
+  
+  /**
+   * @inheritDoc
+   */
+  public long getRpcOpsAvgQueueTimeMax() {
+    return myMetrics.rpcQueueTime.getMaxTime();
+  }
+
+  /**
+   * @inheritDoc
+   */
+  public long getRpcOpsAvgQueueTimeMin() {
+    return myMetrics.rpcQueueTime.getMinTime();
+  }
+
+  /**
+   * @inheritDoc
+   */
+  public int getRpcOpsNumber() {
+    return myMetrics.rpcProcessingTime.getPreviousIntervalNumOps() ;
+  }
+
+  /**
+   * @inheritDoc
+   */
+  public int getRpcOpsDiscardedOpsNum() {
+    return myMetrics.rpcDiscardedOps.getPreviousIntervalNumOps();
+  }
+
+  /**
+   * @inheritDoc
+   */
+  public long getRpcOpsDiscardedOpsQtime() {
+    return myMetrics.rpcDiscardedOps.getPreviousIntervalAverageTime();
+  }
+  
+  /**
+   * @inheritDoc
+   */
+  public int getNumOpenConnections() {
+    return myServer.getNumOpenConnections();
+  }
+  
+  /**
+   * @inheritDoc
+   */
+  public int getCallQueueLen() {
+    return myServer.getCallQueueLen();
+  }
+
+  /**
+   * @inheritDoc
+   */
+  public void resetAllMinMax() {
+    myMetrics.rpcProcessingTime.resetMinMax();
+    myMetrics.rpcQueueTime.resetMinMax();
+    myMetrics.rpcDiscardedOps.resetMinMax();
+  }
+}

+ 118 - 0
src/java/org/apache/hadoop/ipc/metrics/RpcMgtMBean.java

@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ipc.metrics;
+
+
+/**
+ * 
+ * This is the JMX management interface for the RPC layer.
+ * Many of the statistics are sampled and averaged on an interval 
+ * which can be specified in the metrics config file.
+ * <p>
+ * For the statistics that are sampled and averaged, one must specify 
+ * a metrics context that does periodic update calls. Most do.
+ * The default Null metrics context however does NOT. So if you aren't
+ * using any other metrics context then you can turn on the viewing and averaging
+ * of sampled metrics by  specifying the following two lines
+ *  in the hadoop-meterics.properties file:
+ *  <pre>
+ *        rpc.class=org.apache.hadoop.metrics.spi.NullContextWithUpdateThread
+ *        rpc.period=10
+ *  </pre>
+ *<p>
+ * Note that the metrics are collected regardless of the context used.
+ * The context with the update thread is used to average the data periodically
+ *
+ */
+public interface RpcMgtMBean {
+  
+  /**
+   * Number of RPC Operations in the last interval
+   * @return number of operations
+   */
+  int getRpcOpsNumber();
+  
+  /**
+   * Average time for RPC Operations in last interval
+   * @return time in msec
+   */
+  long getRpcOpsAvgProcessingTime();
+  
+  /**
+   * The Minimum RPC Operation Processing Time since reset was called
+   * @return time in msec
+   */
+  long getRpcOpsAvgProcessingTimeMin();
+  
+  
+  /**
+   * The Maximum RPC Operation Processing Time since reset was called
+   * @return time in msec
+   */
+  long getRpcOpsAvgProcessingTimeMax();
+  
+  
+  /**
+   * The Average RPC Operation Queued Time in the last interval
+   * @return time in msec
+   */
+  long getRpcOpsAvgQueueTime();
+  
+  
+  /**
+   * The Minimum RPC Operation Queued Time since reset was called
+   * @return time in msec
+   */
+  long getRpcOpsAvgQueueTimeMin();
+  
+  /**
+   * The Maximum RPC Operation Queued Time since reset was called
+   * @return time in msec
+   */
+  long getRpcOpsAvgQueueTimeMax();
+  
+  
+  /**
+   * Number of Discarded RPC operations due to timeout in the last interval
+   * @return number of operations
+   */
+  int getRpcOpsDiscardedOpsNum();
+  
+  /**
+   * Average Queued time for Discarded RPC Operations in last interval
+   * @return time in msec
+   */
+  long getRpcOpsDiscardedOpsQtime();
+  
+  /**
+   * Reset all min max times
+   */
+  void resetAllMinMax();
+  
+  /**
+   * The number of open RPC conections
+   * @return the number of open rpc connections
+   */
+  public int getNumOpenConnections();
+  
+  /**
+   * The number of rpc calls in the queue.
+   * @return The number of rpc calls in the queue.
+   */
+  public int getCallQueueLen();
+}

+ 83 - 0
src/java/org/apache/hadoop/metrics/spi/NullContextWithUpdateThread.java

@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics.spi;
+
+import org.apache.hadoop.metrics.ContextFactory;
+import org.apache.hadoop.metrics.MetricsException;
+
+/**
+ * A null context which has a thread calling 
+ * periodically when monitoring is started. This keeps the data sampled 
+ * correctly.
+ * In all other respects, this is like the NULL context: No data is emitted.
+ * This is suitable for Monitoring systems like JMX which reads the metrics
+ *  when someone reads the data from JMX.
+ * 
+ * The default impl of start and stop monitoring:
+ *  is the AbstractMetricsContext is good enough.
+ * 
+ */
+
+public class NullContextWithUpdateThread extends AbstractMetricsContext {
+  
+  private static final String PERIOD_PROPERTY = "period";
+    
+  /** Creates a new instance of NullContextWithUpdateThread */
+  public NullContextWithUpdateThread() {
+  }
+  
+  public void init(String contextName, ContextFactory factory) {
+    super.init(contextName, factory);
+    
+    // If period is specified, use it, otherwise the default is good enough
+        
+    String periodStr = getAttribute(PERIOD_PROPERTY);
+    if (periodStr != null) {
+      int period = 0;
+      try {
+        period = Integer.parseInt(periodStr);
+      } catch (NumberFormatException nfe) {
+      }
+      if (period <= 0) {
+        throw new MetricsException("Invalid period: " + periodStr);
+      }
+      setPeriod(period);
+    }
+  }
+   
+    
+  /**
+   * Do-nothing version of emitRecord
+   */
+  protected void emitRecord(String contextName, String recordName,
+                            OutputRecord outRec) 
+  {}
+    
+  /**
+   * Do-nothing version of update
+   */
+  protected void update(MetricsRecordImpl record) {
+  }
+    
+  /**
+   * Do-nothing version of remove
+   */
+  protected void remove(MetricsRecordImpl record) {
+  }
+}

+ 67 - 0
src/java/org/apache/hadoop/metrics/util/MBeanUtil.java

@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics.util;
+
+import java.lang.management.ManagementFactory;
+
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+
+/**
+ * This util class provides a method to register an MBean using
+ * our standard naming convention as described in the doc
+ *  for {link {@link #registerMBean(String, String, Object)}
+ *
+ */
+public class MBeanUtil {
+	
+  /**
+   * Register the mbean using out standard MBeanName format
+   * "hadoop.dfs:service=<serviceName>,name=<nameName>"
+   * Where the <serviceName> and <nameName> are the supplied parameters
+   *    
+   * @param serviceName
+   * @param nameName
+   * @param theMbean - the MBean to register
+   */	
+  static public void registerMBean(final String serviceName, 
+		  							final String nameName,
+		  							final Object theMbean) {
+    final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+    ObjectName name = getMBeanName(serviceName, nameName);
+    try {
+      mbs.registerMBean(theMbean, name);
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+  
+  static private ObjectName getMBeanName(final String serviceName,
+		  								 final String nameName) {
+    ObjectName name = null;
+    try {
+      name = new ObjectName("hadoop.dfs:" +
+                  "service=" + serviceName + ",name=" + nameName);
+    } catch (MalformedObjectNameException e) {
+      e.printStackTrace();
+    }
+    return name;
+  }
+}

+ 76 - 0
src/java/org/apache/hadoop/metrics/util/MetricsIntValue.java

@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics.util;
+
+import org.apache.hadoop.metrics.MetricsRecord;
+
+
+/**
+ * The MetricsIntValue class is for a metric that is not time varied
+ * but changes only when it is set. 
+ * Each time its value is set, it is published only *once* at the next update
+ * call.
+ *
+ */
+public class MetricsIntValue {  
+  private String name;
+  private int value;
+  private boolean changed;
+  
+  /**
+   * Constructor - create a new metric
+   * @param nam the name of the metrics to be used to publish the metric
+   */
+  public MetricsIntValue(final String nam) {
+    name = nam;
+    value = 0;
+    changed = false;
+  }
+  
+  /**
+   * Set the value
+   * @param newValue
+   */
+  public synchronized void set(final int newValue) {
+    value = newValue;
+    changed = true;
+  }
+  
+  /**
+   * Get value
+   * @return the value last set
+   */
+  public synchronized int get() { 
+    return value;
+  } 
+  
+  /**
+   * Push the metric to the mr.
+   * The metric is pushed only if it was updated since last push
+   * 
+   * Note this does NOT push to JMX
+   * (JMX gets the info via {@link #get()}
+   *
+   * @param mr
+   */
+  public synchronized void pushMetric(final MetricsRecord mr) {
+    if (changed) 
+      mr.incrMetric(name, value);
+    changed = false;
+  }
+}

+ 90 - 0
src/java/org/apache/hadoop/metrics/util/MetricsTimeVaryingInt.java

@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics.util;
+
+import org.apache.hadoop.metrics.MetricsRecord;
+
+/**
+ * The MetricsTimeVaryingInt class is for a metric that naturally
+ * varies over time (e.g. number of files created).
+ * The metric is is published at interval heart beat (the interval
+ * is set in the metrics config file).
+ * Note if one wants a time associated with the metric then use
+ * @see org.apache.hadoop.metrics.util.MetricsTimeVaryingRate
+ *
+ */
+public class MetricsTimeVaryingInt {
+
+  
+  private String name;
+  private int currentValue;
+  private int previousIntervalValue;
+  
+  /**
+   * Constructor - create a new metric
+   * @param nam the name of the metrics to be used to publish the metric
+   */
+  public MetricsTimeVaryingInt(final String nam) {
+    name = nam;
+    currentValue = 0;
+    previousIntervalValue = 0;
+  }
+  
+  /**
+   * Inc metrics for numOps operations
+   * @param numOps - number of operations
+   */
+  public synchronized void inc(final int numOps) {
+    currentValue += numOps;
+  }
+  
+  /**
+   * Inc metrics for one operation
+   */
+  public synchronized void inc() {
+    currentValue++;
+  }
+
+  private synchronized void intervalHeartBeat() {
+     previousIntervalValue = currentValue;
+     currentValue = 0;
+  }
+  
+  /**
+   * Push the delta  metrics to the mr.
+   * The delta is since the last push/interval.
+   * 
+   * Note this does NOT push to JMX
+   * (JMX gets the info via {@link #previousIntervalValue}
+   *
+   * @param mr
+   */
+  public synchronized void pushMetric(final MetricsRecord mr) {
+    intervalHeartBeat();
+    mr.incrMetric(name + "_num_ops", getPreviousIntervalValue());
+  }
+  
+  
+  /**
+   * The Value at the Previous interval
+   * @return prev interval value
+   */
+  public synchronized int getPreviousIntervalValue() { 
+    return previousIntervalValue;
+  } 
+}

+ 172 - 0
src/java/org/apache/hadoop/metrics/util/MetricsTimeVaryingRate.java

@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics.util;
+
+import org.apache.hadoop.metrics.MetricsRecord;
+
+/**
+ * The MetricsTimeVaryingRate class is for a rate based metric that
+ * naturally varies over time (e.g. time taken to create a file).
+ * The rate is averaged at each interval heart beat (the interval
+ * is set in the metrics config file).
+ * This class also keeps track of the min and max rates along with 
+ * a method to reset the min-max.
+ *
+ */
+public class MetricsTimeVaryingRate {
+
+  static class Metrics {
+    int numOperations = 0;
+    long time = 0;  // total time or average time
+
+    void set(final Metrics resetTo) {
+      numOperations = resetTo.numOperations;
+      time = resetTo.time;
+    }
+    
+    void reset() {
+      numOperations = 0;
+      time = 0;
+    }
+  }
+  
+  static class MinMax {
+    long minTime = -1;
+    long maxTime = 0;
+    
+    void set(final MinMax newVal) {
+      minTime = newVal.minTime;
+      maxTime = newVal.maxTime;
+    }
+    
+    void reset() {
+      minTime = -1;
+      maxTime = 0;
+    }
+    void update(final long time) { // update min max
+      minTime = (minTime == -1) ? time : Math.min(minTime, time);
+      minTime = Math.min(minTime, time);
+      maxTime = Math.max(maxTime, time);
+    }
+  }
+  private String name;
+  private Metrics currentData;
+  private Metrics previousIntervalData;
+  private MinMax minMax;
+  
+  
+  /**
+   * Constructor - create a new metric
+   * @param n the name of the metrics to be used to publish the metric
+   */
+  public MetricsTimeVaryingRate(final String n) {
+    name = n;
+    currentData = new Metrics();
+    previousIntervalData = new Metrics();
+    minMax = new MinMax();
+  }
+  
+  
+  /**
+   * Increment the metrics for numOps operations
+   * @param numOps - number of operations
+   * @param time - time for numOps operations
+   */
+  public synchronized void inc(final int numOps, final long time) {
+    currentData.numOperations += numOps;
+    currentData.time += time;
+    long timePerOps = time/numOps;
+    minMax.update(timePerOps);
+  }
+  
+  /**
+   * Increment the metrics for one operation
+   * @param time for one operation
+   */
+  public synchronized void inc(final long time) {
+    currentData.numOperations++;
+    currentData.time += time;
+    minMax.update(time);
+  }
+  
+  
+
+  private synchronized void intervalHeartBeat() {
+     previousIntervalData.numOperations = currentData.numOperations;
+     previousIntervalData.time = (currentData.numOperations == 0) ?
+                             0 : currentData.time / currentData.numOperations;
+     currentData.reset();
+  }
+  
+  /**
+   * Push the delta  metrics to the mr.
+   * The delta is since the last push/interval.
+   * 
+   * Note this does NOT push to JMX
+   * (JMX gets the info via {@link #getPreviousIntervalAverageTime()} and
+   * {@link #getPreviousIntervalNumOps()}
+   *
+   * @param mr
+   */
+  public synchronized void pushMetric(final MetricsRecord mr) {
+    intervalHeartBeat();
+    mr.incrMetric(name + "_num_ops", getPreviousIntervalNumOps());
+    mr.incrMetric(name + "_avg_time", getPreviousIntervalAverageTime());
+  }
+  
+  /**
+   * The number of operations in the previous interval
+   * @return - ops in prev interval
+   */
+  public synchronized int getPreviousIntervalNumOps() { 
+    return previousIntervalData.numOperations;
+  }
+  
+  /**
+   * The average rate of an operation in the previous interval
+   * @return - the average rate.
+   */
+  public synchronized long getPreviousIntervalAverageTime() {
+    return previousIntervalData.time;
+  } 
+  
+  /**
+   * The min time for a single operation since the last reset
+   *  {@link #resetMinMax()}
+   * @return min time for an operation
+   */
+  public synchronized long getMinTime() {
+    return  minMax.minTime;
+  }
+  
+  /**
+   * The max time for a single operation since the last reset
+   *  {@link #resetMinMax()}
+   * @return max time for an operation
+   */
+  public synchronized long getMaxTime() {
+    return minMax.maxTime;
+  }
+  
+  /**
+   * Reset the min max values
+   */
+  public synchronized void resetMinMax() {
+    minMax.reset();
+  }
+}

+ 1 - 1
src/test/org/apache/hadoop/ipc/TestIPC.java

@@ -54,7 +54,7 @@ public class TestIPC extends TestCase {
       this.sleep = sleep;
     }
 
-    public Writable call(Writable param) throws IOException {
+    public Writable call(Writable param, long receivedTime) throws IOException {
       if (sleep) {
         try {
           Thread.sleep(RANDOM.nextInt(200));      // sleep a bit