فهرست منبع

YARN-3022. Expose Container resource information from NodeManager for monitoring (adhoot via ranter)

(cherry picked from commit f7a77819a1e4ff394e110941c1f8dd80f47dd38f)
Robert Kanter 10 سال پیش
والد
کامیت
410830fe8c

+ 92 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/MetricsRecords.java

@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import org.apache.hadoop.metrics2.AbstractMetric;
+import org.apache.hadoop.metrics2.MetricsRecord;
+import org.apache.hadoop.metrics2.MetricsTag;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Utility class mainly for tests
+ */
+public class MetricsRecords {
+
+  public static void assertTag(MetricsRecord record, String tagName,
+      String expectedValue) {
+    MetricsTag processIdTag = getFirstTagByName(record,
+        tagName);
+    assertNotNull(processIdTag);
+    assertEquals(expectedValue, processIdTag.value());
+  }
+
+  public static void assertMetric(MetricsRecord record,
+      String metricName,
+      Number expectedValue) {
+    AbstractMetric resourceLimitMetric = getFirstMetricByName(
+        record, metricName);
+    assertNotNull(resourceLimitMetric);
+    assertEquals(expectedValue, resourceLimitMetric.value());
+  }
+
+  private static MetricsTag getFirstTagByName(MetricsRecord record, String name) {
+    return Iterables.getFirst(Iterables.filter(record.tags(),
+        new MetricsTagPredicate(name)), null);
+  }
+
+  private static AbstractMetric getFirstMetricByName(
+      MetricsRecord record, String name) {
+    return Iterables.getFirst(
+        Iterables.filter(record.metrics(), new AbstractMetricPredicate(name)),
+        null);
+  }
+
+  private static class MetricsTagPredicate implements Predicate<MetricsTag> {
+    private String tagName;
+
+    public MetricsTagPredicate(String tagName) {
+
+      this.tagName = tagName;
+    }
+
+    @Override
+    public boolean apply(MetricsTag input) {
+      return input.name().equals(tagName);
+    }
+  }
+
+  private static class AbstractMetricPredicate
+      implements Predicate<AbstractMetric> {
+    private String metricName;
+
+    public AbstractMetricPredicate(
+        String metricName) {
+      this.metricName = metricName;
+    }
+
+    @Override
+    public boolean apply(AbstractMetric input) {
+      return input.name().equals(metricName);
+    }
+  }
+}

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -194,6 +194,9 @@ Release 2.7.0 - UNRELEASED
     YARN-3085. Application summary should include the application type (Rohith
     via jlowe)
 
+    YARN-3022. Expose Container resource information from NodeManager for
+    monitoring (adhoot via ranter)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 2 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java

@@ -494,10 +494,11 @@ public class ContainerImpl implements Container {
           YarnConfiguration.NM_VMEM_PMEM_RATIO,
           YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
       long vmemBytes = (long) (pmemRatio * pmemBytes);
+      int cpuVcores = getResource().getVirtualCores();
 
       dispatcher.getEventHandler().handle(
           new ContainerStartMonitoringEvent(containerId,
-              vmemBytes, pmemBytes));
+              vmemBytes, pmemBytes, cpuVcores));
   }
 
   private void addDiagnostics(String... diags) {

+ 36 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java

@@ -27,6 +27,7 @@ import org.apache.hadoop.metrics2.annotation.Metric;
 import org.apache.hadoop.metrics2.annotation.Metrics;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
 import org.apache.hadoop.metrics2.lib.MutableStat;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 
@@ -41,11 +42,28 @@ import static org.apache.hadoop.metrics2.lib.Interns.info;
 @Metrics(context="container")
 public class ContainerMetrics implements MetricsSource {
 
+  public static final String PMEM_LIMIT_METRIC_NAME = "pMemLimit";
+  public static final String VMEM_LIMIT_METRIC_NAME = "vMemLimit";
+  public static final String VCORE_LIMIT_METRIC_NAME = "vCoreLimit";
+  public static final String PMEM_USAGE_METRIC_NAME = "pMemUsage";
+
   @Metric
   public MutableStat pMemMBsStat;
 
+  @Metric
+  public MutableGaugeInt pMemLimitMbs;
+
+  @Metric
+  public MutableGaugeInt vMemLimitMbs;
+
+  @Metric
+  public MutableGaugeInt cpuVcores;
+
   static final MetricsInfo RECORD_INFO =
-      info("ContainerUsage", "Resource usage by container");
+      info("ContainerResource", "Resource limit and usage by container");
+
+  public static final MetricsInfo PROCESSID_INFO =
+      info("ContainerPid", "Container Process Id");
 
   final MetricsInfo recordInfo;
   final MetricsRegistry registry;
@@ -76,7 +94,13 @@ public class ContainerMetrics implements MetricsSource {
     scheduleTimerTaskIfRequired();
 
     this.pMemMBsStat = registry.newStat(
-        "pMem", "Physical memory stats", "Usage", "MBs", true);
+        PMEM_USAGE_METRIC_NAME, "Physical memory stats", "Usage", "MBs", true);
+    this.pMemLimitMbs = registry.newGauge(
+        PMEM_LIMIT_METRIC_NAME, "Physical memory limit in MBs", 0);
+    this.vMemLimitMbs = registry.newGauge(
+        VMEM_LIMIT_METRIC_NAME, "Virtual memory limit in MBs", 0);
+    this.cpuVcores = registry.newGauge(
+        VCORE_LIMIT_METRIC_NAME, "CPU limit in number of vcores", 0);
   }
 
   ContainerMetrics tag(MetricsInfo info, ContainerId containerId) {
@@ -88,10 +112,6 @@ public class ContainerMetrics implements MetricsSource {
     return RECORD_INFO.name() + "_" + containerId.toString();
   }
 
-  public static ContainerMetrics forContainer(ContainerId containerId) {
-    return forContainer(containerId, -1L);
-  }
-
   public static ContainerMetrics forContainer(
       ContainerId containerId, long flushPeriodMs) {
     return forContainer(
@@ -150,6 +170,16 @@ public class ContainerMetrics implements MetricsSource {
     this.pMemMBsStat.add(memoryMBs);
   }
 
+  public void recordProcessId(String processId) {
+    registry.tag(PROCESSID_INFO, processId);
+  }
+
+  public void recordResourceLimit(int vmemLimit, int pmemLimit, int cpuVcores) {
+    this.vMemLimitMbs.set(vmemLimit);
+    this.pMemLimitMbs.set(pmemLimit);
+    this.cpuVcores.set(cpuVcores);
+  }
+
   private synchronized void scheduleTimerTaskIfRequired() {
     if (flushPeriodMs > 0) {
       // Lazily initialize timer

+ 6 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerStartMonitoringEvent.java

@@ -24,12 +24,14 @@ public class ContainerStartMonitoringEvent extends ContainersMonitorEvent {
 
   private final long vmemLimit;
   private final long pmemLimit;
+  private final int cpuVcores;
 
   public ContainerStartMonitoringEvent(ContainerId containerId,
-      long vmemLimit, long pmemLimit) {
+      long vmemLimit, long pmemLimit, int cpuVcores) {
     super(containerId, ContainersMonitorEventType.START_MONITORING_CONTAINER);
     this.vmemLimit = vmemLimit;
     this.pmemLimit = pmemLimit;
+    this.cpuVcores = cpuVcores;
   }
 
   public long getVmemLimit() {
@@ -40,4 +42,7 @@ public class ContainerStartMonitoringEvent extends ContainersMonitorEvent {
     return this.pmemLimit;
   }
 
+  public int getCpuVcores() {
+    return this.cpuVcores;
+  }
 }

+ 27 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java

@@ -219,14 +219,17 @@ public class ContainersMonitorImpl extends AbstractService implements
     private ResourceCalculatorProcessTree pTree;
     private long vmemLimit;
     private long pmemLimit;
+    private int cpuVcores;
 
     public ProcessTreeInfo(ContainerId containerId, String pid,
-        ResourceCalculatorProcessTree pTree, long vmemLimit, long pmemLimit) {
+        ResourceCalculatorProcessTree pTree, long vmemLimit, long pmemLimit,
+        int cpuVcores) {
       this.containerId = containerId;
       this.pid = pid;
       this.pTree = pTree;
       this.vmemLimit = vmemLimit;
       this.pmemLimit = pmemLimit;
+      this.cpuVcores = cpuVcores;
     }
 
     public ContainerId getContainerId() {
@@ -259,6 +262,14 @@ public class ContainersMonitorImpl extends AbstractService implements
     public long getPmemLimit() {
       return this.pmemLimit;
     }
+
+    /**
+     * Return the number of cpu vcores assigned
+     * @return
+     */
+    public int getCpuVcores() {
+      return this.cpuVcores;
+    }
   }
 
 
@@ -362,7 +373,8 @@ public class ContainersMonitorImpl extends AbstractService implements
         synchronized (containersToBeRemoved) {
           for (ContainerId containerId : containersToBeRemoved) {
             if (containerMetricsEnabled) {
-              ContainerMetrics.forContainer(containerId).finished();
+              ContainerMetrics.forContainer(
+                  containerId, containerMetricsPeriodMs).finished();
             }
             trackingContainers.remove(containerId);
             LOG.info("Stopping resource-monitoring for " + containerId);
@@ -397,6 +409,17 @@ public class ContainersMonitorImpl extends AbstractService implements
                     ResourceCalculatorProcessTree.getResourceCalculatorProcessTree(pId, processTreeClass, conf);
                 ptInfo.setPid(pId);
                 ptInfo.setProcessTree(pt);
+
+                if (containerMetricsEnabled) {
+                  ContainerMetrics usageMetrics = ContainerMetrics
+                      .forContainer(containerId, containerMetricsPeriodMs);
+                  int cpuVcores = ptInfo.getCpuVcores();
+                  final int vmemLimit = (int) (ptInfo.getVmemLimit() >> 20);
+                  final int pmemLimit = (int) (ptInfo.getPmemLimit() >> 20);
+                  usageMetrics.recordResourceLimit(
+                      vmemLimit, pmemLimit, cpuVcores);
+                  usageMetrics.recordProcessId(pId);
+                }
               }
             }
             // End of initializing any uninitialized processTrees
@@ -576,7 +599,8 @@ public class ContainersMonitorImpl extends AbstractService implements
       synchronized (this.containersToBeAdded) {
         ProcessTreeInfo processTreeInfo =
             new ProcessTreeInfo(containerId, null, null,
-                startEvent.getVmemLimit(), startEvent.getPmemLimit());
+                startEvent.getVmemLimit(), startEvent.getPmemLimit(),
+                startEvent.getCpuVcores());
         this.containersToBeAdded.put(containerId, processTreeInfo);
       }
       break;

+ 38 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainerMetrics.java

@@ -18,19 +18,20 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
 
+import org.apache.hadoop.metrics2.MetricsRecord;
 import org.apache.hadoop.metrics2.MetricsSystem;
 import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
+import org.apache.hadoop.metrics2.impl.MetricsRecords;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 
-import java.util.Timer;
-
 public class TestContainerMetrics {
 
   @Test
@@ -71,4 +72,39 @@ public class TestContainerMetrics {
     metrics.getMetrics(collector, true);
     assertEquals(ERR, 0, collector.getRecords().size());
   }
+
+  @Test
+  public void testContainerMetricsLimit() throws InterruptedException {
+    final String ERR = "Error in number of records";
+
+    MetricsSystem system = mock(MetricsSystem.class);
+    doReturn(this).when(system).register(anyString(), anyString(), any());
+
+    MetricsCollectorImpl collector = new MetricsCollectorImpl();
+    ContainerId containerId = mock(ContainerId.class);
+    ContainerMetrics metrics = ContainerMetrics.forContainer(containerId, 100);
+
+    int anyPmemLimit = 1024;
+    int anyVmemLimit = 2048;
+    int anyVcores = 10;
+    String anyProcessId = "1234";
+
+    metrics.recordResourceLimit(anyVmemLimit, anyPmemLimit, anyVcores);
+    metrics.recordProcessId(anyProcessId);
+
+    Thread.sleep(110);
+    metrics.getMetrics(collector, true);
+    assertEquals(ERR, 1, collector.getRecords().size());
+    MetricsRecord record = collector.getRecords().get(0);
+
+    MetricsRecords.assertTag(record, ContainerMetrics.PROCESSID_INFO.name(),
+        anyProcessId);
+
+    MetricsRecords.assertMetric(record, ContainerMetrics
+        .PMEM_LIMIT_METRIC_NAME, anyPmemLimit);
+    MetricsRecords.assertMetric(record, ContainerMetrics.VMEM_LIMIT_METRIC_NAME, anyVmemLimit);
+    MetricsRecords.assertMetric(record, ContainerMetrics.VCORE_LIMIT_METRIC_NAME, anyVcores);
+
+    collector.clear();
+  }
 }