|
@@ -234,14 +234,35 @@ public abstract class SchedulerMetrics {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public abstract void trackQueue(String queueName);
|
|
|
-
|
|
|
- public void untrackQueue(String queueName) {
|
|
|
- for (String m : queueTrackedMetrics) {
|
|
|
- metrics.remove("variable.queue." + queueName + "." + m);
|
|
|
+ /**
|
|
|
+ * Track a queue by registering its metrics.
|
|
|
+ *
|
|
|
+ * @param queue queue name
|
|
|
+ */
|
|
|
+ public void trackQueue(String queue) {
|
|
|
+ queueLock.lock();
|
|
|
+ try {
|
|
|
+ if (!isTracked(queue)) {
|
|
|
+ trackedQueues.add(queue);
|
|
|
+ registerQueueMetrics(queue);
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ queueLock.unlock();
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ protected void registerQueueMetrics(String queueName) {
|
|
|
+ SortedMap<String, Counter> counterMap = metrics.getCounters();
|
|
|
+
|
|
|
+ for (QueueMetric queueMetric : QueueMetric.values()) {
|
|
|
+ String metricName = getQueueMetricName(queueName, queueMetric);
|
|
|
+ if (!counterMap.containsKey(metricName)) {
|
|
|
+ metrics.counter(metricName);
|
|
|
+ queueTrackedMetrics.add(metricName);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
public boolean isTracked(String queueName) {
|
|
|
return trackedQueues.contains(queueName);
|
|
|
}
|
|
@@ -547,40 +568,13 @@ public abstract class SchedulerMetrics {
|
|
|
return "counter.queue." + queue + "." + metric.value;
|
|
|
}
|
|
|
|
|
|
- private void traceQueueIfNotTraced(String queue) {
|
|
|
- queueLock.lock();
|
|
|
- try {
|
|
|
- if (!isTracked(queue)) {
|
|
|
- trackQueue(queue);
|
|
|
- }
|
|
|
- } finally {
|
|
|
- queueLock.unlock();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- void initQueueMetric(String queueName){
|
|
|
- SortedMap<String, Counter> counterMap = metrics.getCounters();
|
|
|
-
|
|
|
- for (QueueMetric queueMetric : QueueMetric.values()) {
|
|
|
- String metricName = getQueueMetricName(queueName, queueMetric);
|
|
|
- if (!counterMap.containsKey(metricName)) {
|
|
|
- metrics.counter(metricName);
|
|
|
- counterMap = metrics.getCounters();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- traceQueueIfNotTraced(queueName);
|
|
|
- }
|
|
|
-
|
|
|
void updateQueueMetrics(Resource pendingResource, Resource allocatedResource,
|
|
|
String queueName) {
|
|
|
+ trackQueue(queueName);
|
|
|
+
|
|
|
SortedMap<String, Counter> counterMap = metrics.getCounters();
|
|
|
for(QueueMetric metric : QueueMetric.values()) {
|
|
|
String metricName = getQueueMetricName(queueName, metric);
|
|
|
- if (!counterMap.containsKey(metricName)) {
|
|
|
- metrics.counter(metricName);
|
|
|
- counterMap = metrics.getCounters();
|
|
|
- }
|
|
|
|
|
|
if (metric == QueueMetric.PENDING_MEMORY) {
|
|
|
counterMap.get(metricName).inc(pendingResource.getMemorySize());
|
|
@@ -592,8 +586,6 @@ public abstract class SchedulerMetrics {
|
|
|
counterMap.get(metricName).inc(allocatedResource.getVirtualCores());
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- traceQueueIfNotTraced(queueName);
|
|
|
}
|
|
|
|
|
|
void updateQueueMetricsByRelease(Resource releaseResource, String queue) {
|