|
@@ -19,136 +19,141 @@ package org.apache.hadoop.ipc.metrics;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
-import org.apache.hadoop.classification.InterfaceAudience;
|
|
|
import org.apache.hadoop.ipc.Server;
|
|
|
-import org.apache.hadoop.metrics.MetricsContext;
|
|
|
-import org.apache.hadoop.metrics.MetricsRecord;
|
|
|
-import org.apache.hadoop.metrics.MetricsUtil;
|
|
|
-import org.apache.hadoop.metrics.Updater;
|
|
|
-import org.apache.hadoop.metrics.util.MetricsBase;
|
|
|
-import org.apache.hadoop.metrics.util.MetricsIntValue;
|
|
|
-import org.apache.hadoop.metrics.util.MetricsRegistry;
|
|
|
-import org.apache.hadoop.metrics.util.MetricsTimeVaryingInt;
|
|
|
-import org.apache.hadoop.metrics.util.MetricsTimeVaryingLong;
|
|
|
-import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
|
|
|
+import org.apache.hadoop.classification.InterfaceAudience;
|
|
|
+import org.apache.hadoop.metrics2.annotation.Metric;
|
|
|
+import org.apache.hadoop.metrics2.annotation.Metrics;
|
|
|
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
|
|
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
|
|
|
+import org.apache.hadoop.metrics2.lib.MutableCounterInt;
|
|
|
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
|
|
|
+import org.apache.hadoop.metrics2.lib.MutableRate;
|
|
|
|
|
|
/**
|
|
|
- *
|
|
|
* This class is for maintaining the various RPC statistics
|
|
|
* and publishing them through the metrics interfaces.
|
|
|
- * This also registers the JMX MBean for RPC.
|
|
|
- * <p>
|
|
|
- * This class has a number of metrics variables that are publicly accessible;
|
|
|
- * these variables (objects) have methods to update their values;
|
|
|
- * for example:
|
|
|
- * <p> {@link #rpcQueueTime}.inc(time)
|
|
|
- *
|
|
|
*/
|
|
|
@InterfaceAudience.Private
|
|
|
-public class RpcMetrics implements Updater {
|
|
|
- private final MetricsRegistry registry = new MetricsRegistry();
|
|
|
- private final MetricsRecord metricsRecord;
|
|
|
- private final Server myServer;
|
|
|
- private static final Log LOG = LogFactory.getLog(RpcMetrics.class);
|
|
|
- RpcActivityMBean rpcMBean;
|
|
|
+@Metrics(about="Aggregate RPC metrics", context="rpc")
|
|
|
+public class RpcMetrics {
|
|
|
+
|
|
|
+ static final Log LOG = LogFactory.getLog(RpcMetrics.class);
|
|
|
+ final Server server;
|
|
|
+ final MetricsRegistry registry;
|
|
|
+ final String name;
|
|
|
|
|
|
- public RpcMetrics(final String hostName, final String port,
|
|
|
- final Server server) {
|
|
|
- myServer = server;
|
|
|
- MetricsContext context = MetricsUtil.getContext("rpc");
|
|
|
- metricsRecord = MetricsUtil.createRecord(context, "metrics");
|
|
|
+ RpcMetrics(Server server) {
|
|
|
+ String port = String.valueOf(server.getListenerAddress().getPort());
|
|
|
+ name = "RpcActivityForPort"+ port;
|
|
|
+ this.server = server;
|
|
|
+ registry = new MetricsRegistry("rpc").tag("port", "RPC port", port);
|
|
|
+ LOG.debug("Initialized "+ registry);
|
|
|
+ }
|
|
|
|
|
|
- metricsRecord.setTag("port", port);
|
|
|
+ public String name() { return name; }
|
|
|
|
|
|
- LOG.info("Initializing RPC Metrics with hostName="
|
|
|
- + hostName + ", port=" + port);
|
|
|
+ public static RpcMetrics create(Server server) {
|
|
|
+ RpcMetrics m = new RpcMetrics(server);
|
|
|
+ return DefaultMetricsSystem.instance().register(m.name, null, m);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Metric("Number of received bytes") MutableCounterLong receivedBytes;
|
|
|
+ @Metric("Number of sent bytes") MutableCounterLong sentBytes;
|
|
|
+ @Metric("Queue time") MutableRate rpcQueueTime;
|
|
|
+ @Metric("Processsing time") MutableRate rpcProcessingTime;
|
|
|
+ @Metric("Number of authentication failures")
|
|
|
+ MutableCounterInt rpcAuthenticationFailures;
|
|
|
+ @Metric("Number of authentication successes")
|
|
|
+ MutableCounterInt rpcAuthenticationSuccesses;
|
|
|
+ @Metric("Number of authorization failures")
|
|
|
+ MutableCounterInt rpcAuthorizationFailures;
|
|
|
+ @Metric("Number of authorization sucesses")
|
|
|
+ MutableCounterInt rpcAuthorizationSuccesses;
|
|
|
|
|
|
- context.registerUpdater(this);
|
|
|
-
|
|
|
- // Need to clean up the interface to RpcMgt - don't need both metrics and server params
|
|
|
- rpcMBean = new RpcActivityMBean(registry, hostName, port);
|
|
|
+ @Metric("Number of open connections") public int numOpenConnections() {
|
|
|
+ return server.getNumOpenConnections();
|
|
|
}
|
|
|
-
|
|
|
-
|
|
|
- /**
|
|
|
- * The metrics variables are public:
|
|
|
- * - they can be set directly by calling their set/inc methods
|
|
|
- * -they can also be read directly - e.g. JMX does this.
|
|
|
- */
|
|
|
+
|
|
|
+ @Metric("Length of the call queue") public int callQueueLength() {
|
|
|
+ return server.getCallQueueLen();
|
|
|
+ }
|
|
|
+
|
|
|
+ // Public instrumentation methods that could be extracted to an
|
|
|
+ // abstract class if we decide to do custom instrumentation classes a la
|
|
|
+ // JobTrackerInstrumenation. The methods with //@Override comment are
|
|
|
+ // candidates for abstract methods in a abstract instrumentation class.
|
|
|
|
|
|
/**
|
|
|
- * metrics - number of bytes received
|
|
|
- */
|
|
|
- public final MetricsTimeVaryingLong receivedBytes =
|
|
|
- new MetricsTimeVaryingLong("ReceivedBytes", registry);
|
|
|
- /**
|
|
|
- * metrics - number of bytes sent
|
|
|
- */
|
|
|
- public final MetricsTimeVaryingLong sentBytes =
|
|
|
- new MetricsTimeVaryingLong("SentBytes", registry);
|
|
|
- /**
|
|
|
- * metrics - rpc queue time
|
|
|
- */
|
|
|
- public final MetricsTimeVaryingRate rpcQueueTime =
|
|
|
- new MetricsTimeVaryingRate("RpcQueueTime", registry);
|
|
|
- /**
|
|
|
- * metrics - rpc processing time
|
|
|
+ * One authentication failure event
|
|
|
*/
|
|
|
- public final MetricsTimeVaryingRate rpcProcessingTime =
|
|
|
- new MetricsTimeVaryingRate("RpcProcessingTime", registry);
|
|
|
+ //@Override
|
|
|
+ public void incrAuthenticationFailures() {
|
|
|
+ rpcAuthenticationFailures.incr();
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
- * metrics - number of open connections
|
|
|
+ * One authentication success event
|
|
|
*/
|
|
|
- public final MetricsIntValue numOpenConnections =
|
|
|
- new MetricsIntValue("NumOpenConnections", registry);
|
|
|
+ //@Override
|
|
|
+ public void incrAuthenticationSuccesses() {
|
|
|
+ rpcAuthenticationSuccesses.incr();
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
- * metrics - length of the queue
|
|
|
+ * One authorization success event
|
|
|
*/
|
|
|
- public final MetricsIntValue callQueueLen =
|
|
|
- new MetricsIntValue("callQueueLen", registry);
|
|
|
+ //@Override
|
|
|
+ public void incrAuthorizationSuccesses() {
|
|
|
+ rpcAuthorizationSuccesses.incr();
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
- * metrics - number of failed authentications
|
|
|
+ * One authorization failure event
|
|
|
*/
|
|
|
- public final MetricsTimeVaryingInt authenticationFailures =
|
|
|
- new MetricsTimeVaryingInt("rpcAuthenticationFailures", registry);
|
|
|
+ //@Override
|
|
|
+ public void incrAuthorizationFailures() {
|
|
|
+ rpcAuthorizationFailures.incr();
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
- * metrics - number of successful authentications
|
|
|
+ * Shutdown the instrumentation for the process
|
|
|
*/
|
|
|
- public final MetricsTimeVaryingInt authenticationSuccesses =
|
|
|
- new MetricsTimeVaryingInt("rpcAuthenticationSuccesses", registry);
|
|
|
+ //@Override
|
|
|
+ public void shutdown() {}
|
|
|
+
|
|
|
/**
|
|
|
- * metrics - number of failed authorizations
|
|
|
+ * Increment sent bytes by count
|
|
|
+ * @param count to increment
|
|
|
*/
|
|
|
- public final MetricsTimeVaryingInt authorizationFailures =
|
|
|
- new MetricsTimeVaryingInt("rpcAuthorizationFailures", registry);
|
|
|
+ //@Override
|
|
|
+ public void incrSentBytes(int count) {
|
|
|
+ sentBytes.incr(count);
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
- * metrics - number of successful authorizations
|
|
|
+ * Increment received bytes by count
|
|
|
+ * @param count to increment
|
|
|
*/
|
|
|
- public final MetricsTimeVaryingInt authorizationSuccesses =
|
|
|
- new MetricsTimeVaryingInt("rpcAuthorizationSuccesses", registry);
|
|
|
-
|
|
|
+ //@Override
|
|
|
+ public void incrReceivedBytes(int count) {
|
|
|
+ receivedBytes.incr(count);
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
- * Push the metrics to the monitoring subsystem on doUpdate() call.
|
|
|
+ * Add an RPC queue time sample
|
|
|
+ * @param qTime the queue time
|
|
|
*/
|
|
|
- public void doUpdates(final MetricsContext context) {
|
|
|
-
|
|
|
- synchronized (this) {
|
|
|
- // ToFix - fix server to use the following two metrics directly so
|
|
|
- // the metrics do not have be copied here.
|
|
|
- numOpenConnections.set(myServer.getNumOpenConnections());
|
|
|
- callQueueLen.set(myServer.getCallQueueLen());
|
|
|
- for (MetricsBase m : registry.getMetricsList()) {
|
|
|
- m.pushMetric(metricsRecord);
|
|
|
- }
|
|
|
- }
|
|
|
- metricsRecord.update();
|
|
|
+ //@Override
|
|
|
+ public void addRpcQueueTime(int qTime) {
|
|
|
+ rpcQueueTime.add(qTime);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * shutdown the metrics
|
|
|
+ * Add an RPC processing time sample
|
|
|
+ * @param processingTime the processing time
|
|
|
*/
|
|
|
- public void shutdown() {
|
|
|
- if (rpcMBean != null)
|
|
|
- rpcMBean.shutdown();
|
|
|
+ //@Override
|
|
|
+ public void addRpcProcessingTime(int processingTime) {
|
|
|
+ rpcProcessingTime.add(processingTime);
|
|
|
}
|
|
|
}
|