|
@@ -35,6 +35,11 @@ import com.google.common.annotations.VisibleForTesting;
|
|
|
import org.apache.commons.lang.NotImplementedException;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.ipc.CallQueueManager.CallQueueOverflowException;
|
|
|
+import org.apache.hadoop.metrics2.MetricsCollector;
|
|
|
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
|
|
+import org.apache.hadoop.metrics2.MetricsSource;
|
|
|
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
|
|
+import org.apache.hadoop.metrics2.lib.Interns;
|
|
|
import org.apache.hadoop.metrics2.util.MBeans;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
@@ -43,7 +48,7 @@ import org.slf4j.LoggerFactory;
|
|
|
* A queue with multiple levels for each priority.
|
|
|
*/
|
|
|
public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
|
|
|
- implements BlockingQueue<E> {
|
|
|
+ implements BlockingQueue<E> {
|
|
|
@Deprecated
|
|
|
public static final int IPC_CALLQUEUE_PRIORITY_LEVELS_DEFAULT = 4;
|
|
|
@Deprecated
|
|
@@ -335,7 +340,8 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
|
|
|
* MetricsProxy is a singleton because we may init multiple
|
|
|
* FairCallQueues, but the metrics system cannot unregister beans cleanly.
|
|
|
*/
|
|
|
- private static final class MetricsProxy implements FairCallQueueMXBean {
|
|
|
+ private static final class MetricsProxy implements FairCallQueueMXBean,
|
|
|
+ MetricsSource {
|
|
|
// One singleton per namespace
|
|
|
private static final HashMap<String, MetricsProxy> INSTANCES =
|
|
|
new HashMap<String, MetricsProxy>();
|
|
@@ -346,8 +352,13 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
|
|
|
// Keep track of how many objects we registered
|
|
|
private int revisionNumber = 0;
|
|
|
|
|
|
+ private String namespace;
|
|
|
+
|
|
|
private MetricsProxy(String namespace) {
|
|
|
+ this.namespace = namespace;
|
|
|
MBeans.register(namespace, "FairCallQueue", this);
|
|
|
+ final String name = namespace + ".FairCallQueue";
|
|
|
+ DefaultMetricsSystem.instance().register(name, name, this);
|
|
|
}
|
|
|
|
|
|
public static synchronized MetricsProxy getInstance(String namespace) {
|
|
@@ -389,6 +400,23 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
|
|
|
@Override public int getRevision() {
|
|
|
return revisionNumber;
|
|
|
}
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void getMetrics(MetricsCollector collector, boolean all) {
|
|
|
+ MetricsRecordBuilder rb = collector.addRecord("FairCallQueue")
|
|
|
+ .setContext("rpc")
|
|
|
+ .tag(Interns.info("namespace", "Namespace"), namespace);
|
|
|
+
|
|
|
+ final int[] currentQueueSizes = getQueueSizes();
|
|
|
+ final long[] currentOverflowedCalls = getOverflowedCalls();
|
|
|
+
|
|
|
+ for (int i = 0; i < currentQueueSizes.length; i++) {
|
|
|
+ rb.addGauge(Interns.info("FairCallQueueSize_p" + i, "FCQ Queue Size"),
|
|
|
+ currentQueueSizes[i]);
|
|
|
+ rb.addCounter(Interns.info("FairCallQueueOverflowedCalls_p" + i,
|
|
|
+ "FCQ Overflowed Calls"), currentOverflowedCalls[i]);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
// FairCallQueueMXBean
|