|
@@ -18,13 +18,153 @@
|
|
|
|
|
|
package org.apache.hadoop.yarn.server.nodemanager;
|
|
|
|
|
|
+import org.apache.commons.logging.Log;
|
|
|
+import org.apache.commons.logging.LogFactory;
|
|
|
+import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.service.AbstractService;
|
|
|
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
+import org.apache.hadoop.yarn.server.api.records.ResourceUtilization;
|
|
|
+import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
|
|
|
|
|
|
+/**
|
|
|
+ * Implementation of the node resource monitor. It periodically tracks the
|
|
|
+ * resource utilization of the node and reports it to the NM.
|
|
|
+ */
|
|
|
public class NodeResourceMonitorImpl extends AbstractService implements
|
|
|
NodeResourceMonitor {
|
|
|
|
|
|
+ /** Logging infrastructure. */
|
|
|
+ final static Log LOG = LogFactory
|
|
|
+ .getLog(NodeResourceMonitorImpl.class);
|
|
|
+
|
|
|
+ /** Interval to monitor the node resource utilization. */
|
|
|
+ private long monitoringInterval;
|
|
|
+ /** Thread to monitor the node resource utilization. */
|
|
|
+ private MonitoringThread monitoringThread;
|
|
|
+
|
|
|
+ /** Resource calculator. */
|
|
|
+ private ResourceCalculatorPlugin resourceCalculatorPlugin;
|
|
|
+
|
|
|
+ /** Current <em>resource utilization</em> of the node. */
|
|
|
+ private ResourceUtilization nodeUtilization;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Initialize the node resource monitor.
|
|
|
+ */
|
|
|
public NodeResourceMonitorImpl() {
|
|
|
super(NodeResourceMonitorImpl.class.getName());
|
|
|
+
|
|
|
+ this.monitoringThread = new MonitoringThread();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Initialize the service with the proper parameters.
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ protected void serviceInit(Configuration conf) throws Exception {
|
|
|
+ this.monitoringInterval =
|
|
|
+ conf.getLong(YarnConfiguration.NM_RESOURCE_MON_INTERVAL_MS,
|
|
|
+ YarnConfiguration.DEFAULT_NM_RESOURCE_MON_INTERVAL_MS);
|
|
|
+
|
|
|
+ Class<? extends ResourceCalculatorPlugin> clazz =
|
|
|
+ conf.getClass(YarnConfiguration.NM_MON_RESOURCE_CALCULATOR, null,
|
|
|
+ ResourceCalculatorPlugin.class);
|
|
|
+
|
|
|
+ this.resourceCalculatorPlugin =
|
|
|
+ ResourceCalculatorPlugin.getResourceCalculatorPlugin(clazz, conf);
|
|
|
+
|
|
|
+ LOG.info(" Using ResourceCalculatorPlugin : "
|
|
|
+ + this.resourceCalculatorPlugin);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Check if we should be monitoring.
|
|
|
+ * @return <em>true</em> if we can monitor the node resource utilization.
|
|
|
+ */
|
|
|
+ private boolean isEnabled() {
|
|
|
+ if (resourceCalculatorPlugin == null) {
|
|
|
+ LOG.info("ResourceCalculatorPlugin is unavailable on this system. "
|
|
|
+ + this.getClass().getName() + " is disabled.");
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Start the thread that does the node resource utilization monitoring.
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ protected void serviceStart() throws Exception {
|
|
|
+ if (this.isEnabled()) {
|
|
|
+ this.monitoringThread.start();
|
|
|
+ }
|
|
|
+ super.serviceStart();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Stop the thread that does the node resource utilization monitoring.
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ protected void serviceStop() throws Exception {
|
|
|
+ if (this.isEnabled()) {
|
|
|
+ this.monitoringThread.interrupt();
|
|
|
+ try {
|
|
|
+ this.monitoringThread.join(10 * 1000);
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ LOG.warn("Could not wait for the thread to join");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ super.serviceStop();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Thread that monitors the resource utilization of this node.
|
|
|
+ */
|
|
|
+ private class MonitoringThread extends Thread {
|
|
|
+ /**
|
|
|
+ * Initialize the node resource monitoring thread.
|
|
|
+ */
|
|
|
+ public MonitoringThread() {
|
|
|
+ super("Node Resource Monitor");
|
|
|
+ this.setDaemon(true);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Periodically monitor the resource utilization of the node.
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ while (true) {
|
|
|
+ // Get node utilization and save it into the health status
|
|
|
+ long pmem = resourceCalculatorPlugin.getPhysicalMemorySize() -
|
|
|
+ resourceCalculatorPlugin.getAvailablePhysicalMemorySize();
|
|
|
+ long vmem =
|
|
|
+ resourceCalculatorPlugin.getVirtualMemorySize()
|
|
|
+ - resourceCalculatorPlugin.getAvailableVirtualMemorySize();
|
|
|
+ float cpu = resourceCalculatorPlugin.getCpuUsage();
|
|
|
+ nodeUtilization =
|
|
|
+ ResourceUtilization.newInstance(
|
|
|
+ (int) (pmem >> 20), // B -> MB
|
|
|
+ (int) (vmem >> 20), // B -> MB
|
|
|
+ cpu); // 1 CPU at 100% is 1
|
|
|
+
|
|
|
+ try {
|
|
|
+ Thread.sleep(monitoringInterval);
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ LOG.warn(NodeResourceMonitorImpl.class.getName()
|
|
|
+ + " is interrupted. Exiting.");
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get the <em>resource utilization</em> of the node.
|
|
|
+ * @return <em>resource utilization</em> of the node.
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public ResourceUtilization getUtilization() {
|
|
|
+ return this.nodeUtilization;
|
|
|
+ }
|
|
|
}
|