|
@@ -21,11 +21,15 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
|
|
import static org.apache.hadoop.metrics2.lib.Interns.info;
|
|
|
import static org.apache.hadoop.yarn.server.resourcemanager.resource.Resources.multiply;
|
|
|
|
|
|
+import java.util.ArrayList;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.Map;
|
|
|
|
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
|
+import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.metrics2.MetricsCollector;
|
|
|
import org.apache.hadoop.metrics2.MetricsInfo;
|
|
|
+import org.apache.hadoop.metrics2.MetricsSource;
|
|
|
import org.apache.hadoop.metrics2.MetricsSystem;
|
|
|
import org.apache.hadoop.metrics2.annotation.Metric;
|
|
|
import org.apache.hadoop.metrics2.annotation.Metrics;
|
|
@@ -34,7 +38,9 @@ import org.apache.hadoop.metrics2.lib.MetricsRegistry;
|
|
|
import org.apache.hadoop.metrics2.lib.MutableCounterInt;
|
|
|
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
|
|
|
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
|
|
|
+import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
import org.apache.hadoop.yarn.api.records.Resource;
|
|
|
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
@@ -43,7 +49,7 @@ import com.google.common.base.Splitter;
|
|
|
|
|
|
@InterfaceAudience.Private
|
|
|
@Metrics(context="yarn")
|
|
|
-public class QueueMetrics {
|
|
|
+public class QueueMetrics implements MetricsSource {
|
|
|
@Metric("# of apps submitted") MutableCounterInt appsSubmitted;
|
|
|
@Metric("# of running apps") MutableGaugeInt appsRunning;
|
|
|
@Metric("# of pending apps") MutableGaugeInt appsPending;
|
|
@@ -62,6 +68,8 @@ public class QueueMetrics {
|
|
|
@Metric("# of reserved containers") MutableGaugeInt reservedContainers;
|
|
|
@Metric("# of active users") MutableGaugeInt activeUsers;
|
|
|
@Metric("# of active users") MutableGaugeInt activeApplications;
|
|
|
+ private final MutableGaugeInt[] runningTime;
|
|
|
+ private TimeBucketMetrics<ApplicationId> runBuckets;
|
|
|
|
|
|
static final Logger LOG = LoggerFactory.getLogger(QueueMetrics.class);
|
|
|
static final MetricsInfo RECORD_INFO = info("QueueMetrics",
|
|
@@ -76,14 +84,18 @@ public class QueueMetrics {
|
|
|
final QueueMetrics parent;
|
|
|
final MetricsSystem metricsSystem;
|
|
|
private final Map<String, QueueMetrics> users;
|
|
|
+ private final Configuration conf;
|
|
|
|
|
|
- QueueMetrics(MetricsSystem ms, String queueName, Queue parent, boolean enableUserMetrics) {
|
|
|
+ QueueMetrics(MetricsSystem ms, String queueName, Queue parent,
|
|
|
+ boolean enableUserMetrics, Configuration conf) {
|
|
|
registry = new MetricsRegistry(RECORD_INFO);
|
|
|
this.queueName = queueName;
|
|
|
this.parent = parent != null ? parent.getMetrics() : null;
|
|
|
this.users = enableUserMetrics ? new HashMap<String, QueueMetrics>()
|
|
|
: null;
|
|
|
metricsSystem = ms;
|
|
|
+ this.conf = conf;
|
|
|
+ runningTime = buildBuckets(conf);
|
|
|
}
|
|
|
|
|
|
QueueMetrics tag(MetricsInfo info, String value) {
|
|
@@ -102,15 +114,18 @@ public class QueueMetrics {
|
|
|
|
|
|
public synchronized
|
|
|
static QueueMetrics forQueue(String queueName, Queue parent,
|
|
|
- boolean enableUserMetrics) {
|
|
|
+ boolean enableUserMetrics,
|
|
|
+ Configuration conf) {
|
|
|
return forQueue(DefaultMetricsSystem.instance(), queueName, parent,
|
|
|
- enableUserMetrics);
|
|
|
+ enableUserMetrics, conf);
|
|
|
}
|
|
|
|
|
|
public static QueueMetrics forQueue(MetricsSystem ms, String queueName,
|
|
|
- Queue parent, boolean enableUserMetrics) {
|
|
|
- QueueMetrics metrics = new QueueMetrics(ms, queueName, parent,
|
|
|
- enableUserMetrics).tag(QUEUE_INFO, queueName);
|
|
|
+ Queue parent, boolean enableUserMetrics,
|
|
|
+ Configuration conf) {
|
|
|
+ QueueMetrics metrics =
|
|
|
+ new QueueMetrics(ms, queueName, parent, enableUserMetrics, conf
|
|
|
+ ).tag(QUEUE_INFO, queueName);
|
|
|
return ms == null ? metrics : ms.register(sourceName(queueName).toString(),
|
|
|
"Metrics for queue: " + queueName, metrics);
|
|
|
}
|
|
@@ -121,7 +136,7 @@ public class QueueMetrics {
|
|
|
}
|
|
|
QueueMetrics metrics = users.get(userName);
|
|
|
if (metrics == null) {
|
|
|
- metrics = new QueueMetrics(metricsSystem, queueName, null, false);
|
|
|
+ metrics = new QueueMetrics(metricsSystem, queueName, null, false, conf);
|
|
|
users.put(userName, metrics);
|
|
|
metricsSystem.register(
|
|
|
sourceName(queueName).append(",user=").append(userName).toString(),
|
|
@@ -131,6 +146,41 @@ public class QueueMetrics {
|
|
|
return metrics;
|
|
|
}
|
|
|
|
|
|
+ private ArrayList<Integer> parseInts(String value) {
|
|
|
+ ArrayList<Integer> result = new ArrayList<Integer>();
|
|
|
+ for(String s: value.split(",")) {
|
|
|
+ result.add(Integer.parseInt(s.trim()));
|
|
|
+ }
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ private MutableGaugeInt[] buildBuckets(Configuration conf) {
|
|
|
+ ArrayList<Integer> buckets =
|
|
|
+ parseInts(conf.get(YarnConfiguration.RM_METRICS_RUNTIME_BUCKETS,
|
|
|
+ YarnConfiguration.DEFAULT_RM_METRICS_RUNTIME_BUCKETS));
|
|
|
+ MutableGaugeInt[] result = new MutableGaugeInt[buckets.size() + 1];
|
|
|
+ result[0] = registry.newGauge("running_0", "", 0);
|
|
|
+ long[] cuts = new long[buckets.size()];
|
|
|
+ for(int i=0; i < buckets.size(); ++i) {
|
|
|
+ result[i+1] = registry.newGauge("running_" + buckets.get(i), "", 0);
|
|
|
+ cuts[i] = buckets.get(i) * 1000L * 60; // covert from min to ms
|
|
|
+ }
|
|
|
+ this.runBuckets = new TimeBucketMetrics<ApplicationId>(cuts);
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ private void updateRunningTime() {
|
|
|
+ int[] counts = runBuckets.getBucketCounts(System.currentTimeMillis());
|
|
|
+ for(int i=0; i < counts.length; ++i) {
|
|
|
+ runningTime[i].set(counts[i]);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public void getMetrics(MetricsCollector collector, boolean all) {
|
|
|
+ updateRunningTime();
|
|
|
+ registry.snapshot(collector.addRecord(registry.info()), all);
|
|
|
+ }
|
|
|
+
|
|
|
public void submitApp(String user) {
|
|
|
appsSubmitted.incr();
|
|
|
appsPending.incr();
|
|
@@ -143,20 +193,22 @@ public class QueueMetrics {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public void incrAppsRunning(String user) {
|
|
|
+ public void incrAppsRunning(AppSchedulingInfo app, String user) {
|
|
|
+ runBuckets.add(app.getApplicationId(), System.currentTimeMillis());
|
|
|
appsRunning.incr();
|
|
|
appsPending.decr();
|
|
|
QueueMetrics userMetrics = getUserMetrics(user);
|
|
|
if (userMetrics != null) {
|
|
|
- userMetrics.incrAppsRunning(user);
|
|
|
+ userMetrics.incrAppsRunning(app, user);
|
|
|
}
|
|
|
if (parent != null) {
|
|
|
- parent.incrAppsRunning(user);
|
|
|
+ parent.incrAppsRunning(app, user);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
public void finishApp(AppSchedulingInfo app,
|
|
|
RMAppAttemptState rmAppAttemptFinalState) {
|
|
|
+ runBuckets.remove(app.getApplicationId());
|
|
|
switch (rmAppAttemptFinalState) {
|
|
|
case KILLED: appsKilled.incr(); break;
|
|
|
case FAILED: appsFailed.incr(); break;
|