|
@@ -23,6 +23,7 @@ import static org.apache.hadoop.metrics2.lib.Interns.info;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.Map;
|
|
|
+import java.util.Map.Entry;
|
|
|
|
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
@@ -43,6 +44,7 @@ import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
|
|
|
import org.apache.hadoop.metrics2.lib.MutableRate;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
import org.apache.hadoop.yarn.api.records.Resource;
|
|
|
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
|
@@ -123,6 +125,31 @@ public class QueueMetrics implements MetricsSource {
|
|
|
protected final Configuration conf;
|
|
|
private QueueMetricsForCustomResources queueMetricsForCustomResources;
|
|
|
|
|
|
+ private static final String ALLOCATED_RESOURCE_METRIC_PREFIX =
|
|
|
+ "AllocatedResource.";
|
|
|
+ private static final String ALLOCATED_RESOURCE_METRIC_DESC =
|
|
|
+ "Allocated NAME";
|
|
|
+
|
|
|
+ private static final String AVAILABLE_RESOURCE_METRIC_PREFIX =
|
|
|
+ "AvailableResource.";
|
|
|
+ private static final String AVAILABLE_RESOURCE_METRIC_DESC =
|
|
|
+ "Available NAME";
|
|
|
+
|
|
|
+ private static final String PENDING_RESOURCE_METRIC_PREFIX =
|
|
|
+ "PendingResource.";
|
|
|
+ private static final String PENDING_RESOURCE_METRIC_DESC =
|
|
|
+ "Pending NAME";
|
|
|
+
|
|
|
+ private static final String RESERVED_RESOURCE_METRIC_PREFIX =
|
|
|
+ "ReservedResource.";
|
|
|
+ private static final String RESERVED_RESOURCE_METRIC_DESC =
|
|
|
+ "Reserved NAME";
|
|
|
+
|
|
|
+ private static final String AGGREGATE_PREEMPTED_SECONDS_METRIC_PREFIX =
|
|
|
+ "AggregatePreemptedSeconds.";
|
|
|
+ private static final String AGGREGATE_PREEMPTED_SECONDS_METRIC_DESC =
|
|
|
+ "Aggregate Preempted Seconds for NAME";
|
|
|
+
|
|
|
protected QueueMetrics(MetricsSystem ms, String queueName, Queue parent,
|
|
|
boolean enableUserMetrics, Configuration conf) {
|
|
|
registry = new MetricsRegistry(RECORD_INFO);
|
|
@@ -137,6 +164,7 @@ public class QueueMetrics implements MetricsSource {
|
|
|
if (ResourceUtils.getNumberOfKnownResourceTypes() > 2) {
|
|
|
this.queueMetricsForCustomResources =
|
|
|
new QueueMetricsForCustomResources();
|
|
|
+ registerCustomResources();
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -368,6 +396,9 @@ public class QueueMetrics implements MetricsSource {
|
|
|
availableVCores.set(limit.getVirtualCores());
|
|
|
if (queueMetricsForCustomResources != null) {
|
|
|
queueMetricsForCustomResources.setAvailable(limit);
|
|
|
+ registerCustomResources(
|
|
|
+ queueMetricsForCustomResources.getAvailableValues(),
|
|
|
+ AVAILABLE_RESOURCE_METRIC_PREFIX, AVAILABLE_RESOURCE_METRIC_DESC);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -420,16 +451,67 @@ public class QueueMetrics implements MetricsSource {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Register all custom resources metrics as part of initialization. As and
|
|
|
+ * when this metric object construction happens for any queue, all custom
|
|
|
+ * resource metrics value would be initialized with '0' like any other
|
|
|
+ * mandatory resources metrics
|
|
|
+ */
|
|
|
+ private void registerCustomResources() {
|
|
|
+ Map<String, Long> customResources =
|
|
|
+ new HashMap<String, Long>();
|
|
|
+ ResourceInformation[] resources =
|
|
|
+ ResourceUtils.getResourceTypesArray();
|
|
|
+
|
|
|
+ for (int i =
|
|
|
+ 2; i < resources.length; i++) {
|
|
|
+ ResourceInformation resource =
|
|
|
+ resources[i];
|
|
|
+ customResources.put(resource.getName(), new Long(0));
|
|
|
+ }
|
|
|
+
|
|
|
+ registerCustomResources(customResources, ALLOCATED_RESOURCE_METRIC_PREFIX,
|
|
|
+ ALLOCATED_RESOURCE_METRIC_DESC);
|
|
|
+ registerCustomResources(customResources, AVAILABLE_RESOURCE_METRIC_PREFIX,
|
|
|
+ AVAILABLE_RESOURCE_METRIC_DESC);
|
|
|
+ registerCustomResources(customResources, PENDING_RESOURCE_METRIC_PREFIX,
|
|
|
+ PENDING_RESOURCE_METRIC_DESC);
|
|
|
+ registerCustomResources(customResources, RESERVED_RESOURCE_METRIC_PREFIX,
|
|
|
+ RESERVED_RESOURCE_METRIC_DESC);
|
|
|
+ registerCustomResources(customResources,
|
|
|
+ AGGREGATE_PREEMPTED_SECONDS_METRIC_PREFIX,
|
|
|
+ AGGREGATE_PREEMPTED_SECONDS_METRIC_DESC);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void registerCustomResources(Map<String, Long> customResources,
|
|
|
+ String metricPrefix, String metricDesc) {
|
|
|
+ for (Entry<String, Long> entry : customResources.entrySet()) {
|
|
|
+ String resourceName = entry.getKey();
|
|
|
+ Long resourceValue = entry.getValue();
|
|
|
+
|
|
|
+ MutableGaugeLong resourceMetric =
|
|
|
+ (MutableGaugeLong) this.registry.get(metricPrefix + resourceName);
|
|
|
+
|
|
|
+ if (resourceMetric == null) {
|
|
|
+ resourceMetric =
|
|
|
+ this.registry.newGauge(metricPrefix + resourceName,
|
|
|
+ metricDesc.replace("NAME", resourceName), 0L);
|
|
|
+ }
|
|
|
+ resourceMetric.set(resourceValue);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
private void _incrPendingResources(int containers, Resource res) {
|
|
|
pendingContainers.incr(containers);
|
|
|
pendingMB.incr(res.getMemorySize() * containers);
|
|
|
pendingVCores.incr(res.getVirtualCores() * containers);
|
|
|
if (queueMetricsForCustomResources != null) {
|
|
|
queueMetricsForCustomResources.increasePending(res, containers);
|
|
|
+ registerCustomResources(queueMetricsForCustomResources.getPendingValues(),
|
|
|
+ PENDING_RESOURCE_METRIC_PREFIX, PENDING_RESOURCE_METRIC_DESC);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-
|
|
|
public void decrPendingResources(String partition, String user,
|
|
|
int containers, Resource res) {
|
|
|
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
|
|
@@ -450,6 +532,8 @@ public class QueueMetrics implements MetricsSource {
|
|
|
pendingVCores.decr(res.getVirtualCores() * containers);
|
|
|
if (queueMetricsForCustomResources != null) {
|
|
|
queueMetricsForCustomResources.decreasePending(res, containers);
|
|
|
+ registerCustomResources(queueMetricsForCustomResources.getPendingValues(),
|
|
|
+ PENDING_RESOURCE_METRIC_PREFIX, PENDING_RESOURCE_METRIC_DESC);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -482,6 +566,9 @@ public class QueueMetrics implements MetricsSource {
|
|
|
allocatedVCores.incr(res.getVirtualCores() * containers);
|
|
|
if (queueMetricsForCustomResources != null) {
|
|
|
queueMetricsForCustomResources.increaseAllocated(res, containers);
|
|
|
+ registerCustomResources(
|
|
|
+ queueMetricsForCustomResources.getAllocatedValues(),
|
|
|
+ ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC);
|
|
|
}
|
|
|
|
|
|
if (decrPending) {
|
|
@@ -510,12 +597,18 @@ public class QueueMetrics implements MetricsSource {
|
|
|
allocatedVCores.incr(res.getVirtualCores());
|
|
|
if (queueMetricsForCustomResources != null) {
|
|
|
queueMetricsForCustomResources.increaseAllocated(res);
|
|
|
+ registerCustomResources(
|
|
|
+ queueMetricsForCustomResources.getAllocatedValues(),
|
|
|
+ ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC);
|
|
|
}
|
|
|
|
|
|
pendingMB.decr(res.getMemorySize());
|
|
|
pendingVCores.decr(res.getVirtualCores());
|
|
|
if (queueMetricsForCustomResources != null) {
|
|
|
queueMetricsForCustomResources.decreasePending(res);
|
|
|
+ registerCustomResources(
|
|
|
+ queueMetricsForCustomResources.getPendingValues(),
|
|
|
+ PENDING_RESOURCE_METRIC_PREFIX, PENDING_RESOURCE_METRIC_DESC);
|
|
|
}
|
|
|
|
|
|
QueueMetrics userMetrics = getUserMetrics(user);
|
|
@@ -537,6 +630,9 @@ public class QueueMetrics implements MetricsSource {
|
|
|
allocatedVCores.decr(res.getVirtualCores() * containers);
|
|
|
if (queueMetricsForCustomResources != null) {
|
|
|
queueMetricsForCustomResources.decreaseAllocated(res, containers);
|
|
|
+ registerCustomResources(
|
|
|
+ queueMetricsForCustomResources.getAllocatedValues(),
|
|
|
+ ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC);
|
|
|
}
|
|
|
|
|
|
QueueMetrics userMetrics = getUserMetrics(user);
|
|
@@ -560,6 +656,9 @@ public class QueueMetrics implements MetricsSource {
|
|
|
allocatedVCores.decr(res.getVirtualCores());
|
|
|
if (queueMetricsForCustomResources != null) {
|
|
|
queueMetricsForCustomResources.decreaseAllocated(res);
|
|
|
+ registerCustomResources(
|
|
|
+ queueMetricsForCustomResources.getAllocatedValues(),
|
|
|
+ ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC);
|
|
|
}
|
|
|
|
|
|
QueueMetrics userMetrics = getUserMetrics(user);
|
|
@@ -597,6 +696,11 @@ public class QueueMetrics implements MetricsSource {
|
|
|
if (queueMetricsForCustomResources != null) {
|
|
|
queueMetricsForCustomResources
|
|
|
.increaseAggregatedPreemptedSeconds(res, seconds);
|
|
|
+ registerCustomResources(
|
|
|
+ queueMetricsForCustomResources.getAggregatePreemptedSeconds()
|
|
|
+ .getValues(),
|
|
|
+ AGGREGATE_PREEMPTED_SECONDS_METRIC_PREFIX,
|
|
|
+ AGGREGATE_PREEMPTED_SECONDS_METRIC_DESC);
|
|
|
}
|
|
|
if (parent != null) {
|
|
|
parent.updatePreemptedSecondsForCustomResources(res, seconds);
|
|
@@ -623,6 +727,9 @@ public class QueueMetrics implements MetricsSource {
|
|
|
reservedVCores.incr(res.getVirtualCores());
|
|
|
if (queueMetricsForCustomResources != null) {
|
|
|
queueMetricsForCustomResources.increaseReserved(res);
|
|
|
+ registerCustomResources(
|
|
|
+ queueMetricsForCustomResources.getReservedValues(),
|
|
|
+ RESERVED_RESOURCE_METRIC_PREFIX, RESERVED_RESOURCE_METRIC_DESC);
|
|
|
}
|
|
|
QueueMetrics userMetrics = getUserMetrics(user);
|
|
|
if (userMetrics != null) {
|
|
@@ -639,6 +746,9 @@ public class QueueMetrics implements MetricsSource {
|
|
|
reservedVCores.decr(res.getVirtualCores());
|
|
|
if (queueMetricsForCustomResources != null) {
|
|
|
queueMetricsForCustomResources.decreaseReserved(res);
|
|
|
+ registerCustomResources(
|
|
|
+ queueMetricsForCustomResources.getReservedValues(),
|
|
|
+ RESERVED_RESOURCE_METRIC_PREFIX, RESERVED_RESOURCE_METRIC_DESC);
|
|
|
}
|
|
|
QueueMetrics userMetrics = getUserMetrics(user);
|
|
|
if (userMetrics != null) {
|