Browse Source

YARN-6055. ContainersMonitorImpl need be adjusted when NM resource changed. Contributed by Inigo Goiri.

Giovanni Matteo Fumarola 5 years ago
parent
commit
1ac967a6b7

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java

@@ -582,6 +582,11 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
   private void updateNMResource(Resource resource) {
   private void updateNMResource(Resource resource) {
     metrics.addResource(Resources.subtract(resource, totalResource));
     metrics.addResource(Resources.subtract(resource, totalResource));
     this.totalResource = resource;
     this.totalResource = resource;
+
+    // Update the containers monitor
+    ContainersMonitor containersMonitor =
+        this.context.getContainerManager().getContainersMonitor();
+    containersMonitor.setAllocatedResourcesForContainers(totalResource);
   }
   }
 
 
   // Iterate through the NMContext and clone and get all the containers'
   // Iterate through the NMContext and clone and get all the containers'

+ 8 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ResourceView.java

@@ -20,10 +20,18 @@ package org.apache.hadoop.yarn.server.nodemanager;
 
 
 public interface ResourceView {
 public interface ResourceView {
 
 
+  /**
+   * Get virtual memory allocated to the containers.
+   * @return Virtual memory in bytes.
+   */
   long getVmemAllocatedForContainers();
   long getVmemAllocatedForContainers();
 
 
   boolean isVmemCheckEnabled();
   boolean isVmemCheckEnabled();
 
 
+  /**
+   * Get physical memory allocated to the containers.
+   * @return Physical memory in bytes.
+   */
   long getPmemAllocatedForContainers();
   long getPmemAllocatedForContainers();
 
 
   boolean isPmemCheckEnabled();
   boolean isPmemCheckEnabled();

+ 6 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java

@@ -64,4 +64,10 @@ public interface ContainersMonitor extends Service,
         * containersMonitor.getVmemRatio());
         * containersMonitor.getVmemRatio());
     resourceUtil.subtractFrom((int)resource.getMemorySize(), vmem, vCores);
     resourceUtil.subtractFrom((int)resource.getMemorySize(), vmem, vCores);
   }
   }
+
+  /**
+   * Set the allocated resources for containers.
+   * @param resource Resources allocated for the containers.
+   */
+  void setAllocatedResourcesForContainers(Resource resource);
 }
 }

+ 34 - 15
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java

@@ -84,7 +84,9 @@ public class ContainersMonitorImpl extends AbstractService implements
   private static float vmemRatio;
   private static float vmemRatio;
   private Class<? extends ResourceCalculatorProcessTree> processTreeClass;
   private Class<? extends ResourceCalculatorProcessTree> processTreeClass;
 
 
+  /** Maximum virtual memory in bytes. */
   private long maxVmemAllottedForContainers = UNKNOWN_MEMORY_LIMIT;
   private long maxVmemAllottedForContainers = UNKNOWN_MEMORY_LIMIT;
+  /** Maximum physical memory in bytes. */
   private long maxPmemAllottedForContainers = UNKNOWN_MEMORY_LIMIT;
   private long maxPmemAllottedForContainers = UNKNOWN_MEMORY_LIMIT;
 
 
   private boolean pmemCheckEnabled;
   private boolean pmemCheckEnabled;
@@ -152,25 +154,23 @@ public class ContainersMonitorImpl extends AbstractService implements
 
 
     long configuredPMemForContainers =
     long configuredPMemForContainers =
         NodeManagerHardwareUtils.getContainerMemoryMB(
         NodeManagerHardwareUtils.getContainerMemoryMB(
-            this.resourceCalculatorPlugin, this.conf) * 1024 * 1024L;
+            this.resourceCalculatorPlugin, this.conf);
 
 
-    long configuredVCoresForContainers =
-        NodeManagerHardwareUtils.getVCores(this.resourceCalculatorPlugin,
-            this.conf);
-
-    // Setting these irrespective of whether checks are enabled. Required in
-    // the UI.
-    // ///////// Physical memory configuration //////
-    this.maxPmemAllottedForContainers = configuredPMemForContainers;
-    this.maxVCoresAllottedForContainers = configuredVCoresForContainers;
+    int configuredVCoresForContainers =
+        NodeManagerHardwareUtils.getVCores(
+            this.resourceCalculatorPlugin, this.conf);
 
 
     // ///////// Virtual memory configuration //////
     // ///////// Virtual memory configuration //////
     vmemRatio = this.conf.getFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO,
     vmemRatio = this.conf.getFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO,
         YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
         YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
     Preconditions.checkArgument(vmemRatio > 0.99f,
     Preconditions.checkArgument(vmemRatio > 0.99f,
         YarnConfiguration.NM_VMEM_PMEM_RATIO + " should be at least 1.0");
         YarnConfiguration.NM_VMEM_PMEM_RATIO + " should be at least 1.0");
-    this.maxVmemAllottedForContainers =
-        (long) (vmemRatio * configuredPMemForContainers);
+
+    // Setting these irrespective of whether checks are enabled.
+    // Required in the UI.
+    Resource resourcesForContainers = Resource.newInstance(
+        configuredPMemForContainers, configuredVCoresForContainers);
+    setAllocatedResourcesForContainers(resourcesForContainers);
 
 
     pmemCheckEnabled = this.conf.getBoolean(
     pmemCheckEnabled = this.conf.getBoolean(
         YarnConfiguration.NM_PMEM_CHECK_ENABLED,
         YarnConfiguration.NM_PMEM_CHECK_ENABLED,
@@ -908,6 +908,16 @@ public class ContainersMonitorImpl extends AbstractService implements
     return this.maxVCoresAllottedForContainers;
     return this.maxVCoresAllottedForContainers;
   }
   }
 
 
+  @Override
+  public void setAllocatedResourcesForContainers(final Resource resource) {
+    LOG.info("Setting the resources allocated to containers to {}", resource);
+    this.maxVCoresAllottedForContainers = resource.getVirtualCores();
+    this.maxPmemAllottedForContainers = convertMBytesToBytes(
+        resource.getMemorySize());
+    this.maxVmemAllottedForContainers =
+        (long) (getVmemRatio() * maxPmemAllottedForContainers);
+  }
+
   /**
   /**
    * Is the total virtual memory check enabled?
    * Is the total virtual memory check enabled?
    *
    *
@@ -973,10 +983,10 @@ public class ContainersMonitorImpl extends AbstractService implements
       }
       }
       LOG.info("Changing resource-monitoring for {}", containerId);
       LOG.info("Changing resource-monitoring for {}", containerId);
       updateContainerMetrics(monitoringEvent);
       updateContainerMetrics(monitoringEvent);
-      long pmemLimit =
-          changeEvent.getResource().getMemorySize() * 1024L * 1024L;
+      Resource resource = changeEvent.getResource();
+      long pmemLimit = convertMBytesToBytes(resource.getMemorySize());
       long vmemLimit = (long) (pmemLimit * vmemRatio);
       long vmemLimit = (long) (pmemLimit * vmemRatio);
-      int cpuVcores = changeEvent.getResource().getVirtualCores();
+      int cpuVcores = resource.getVirtualCores();
       processTreeInfo.setResourceLimit(pmemLimit, vmemLimit, cpuVcores);
       processTreeInfo.setResourceLimit(pmemLimit, vmemLimit, cpuVcores);
     }
     }
   }
   }
@@ -999,4 +1009,13 @@ public class ContainersMonitorImpl extends AbstractService implements
             startEvent.getVmemLimit(), startEvent.getPmemLimit(),
             startEvent.getVmemLimit(), startEvent.getPmemLimit(),
             startEvent.getCpuVcores()));
             startEvent.getCpuVcores()));
   }
   }
+
+  /**
+   * Convert MegaBytes to Bytes.
+   * @param mb MegaBytes (MB).
+   * @return Bytes representing the input MB.
+   */
+  private static long convertMBytesToBytes(long mb) {
+    return mb * 1024L * 1024L;
+  }
 }
 }

+ 103 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java

@@ -19,6 +19,8 @@
 package org.apache.hadoop.yarn.server.nodemanager;
 package org.apache.hadoop.yarn.server.nodemanager;
 
 
 import static org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils.newNodeHeartbeatResponse;
 import static org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils.newNodeHeartbeatResponse;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.when;
 
 
@@ -57,6 +59,7 @@ import org.apache.hadoop.net.ServerSocketUtil;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
 import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
 import org.apache.hadoop.service.Service.STATE;
 import org.apache.hadoop.service.Service.STATE;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.service.ServiceOperations;
 import org.apache.hadoop.service.ServiceOperations;
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
@@ -80,6 +83,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl;
 import org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl;
 import org.apache.hadoop.yarn.factories.impl.pb.RpcServerFactoryPBImpl;
 import org.apache.hadoop.yarn.factories.impl.pb.RpcServerFactoryPBImpl;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.ResourceTracker;
 import org.apache.hadoop.yarn.server.api.ResourceTracker;
@@ -96,11 +100,13 @@ import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
 import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
 import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
@@ -118,6 +124,10 @@ import org.apache.hadoop.ipc.Server;
 
 
 @SuppressWarnings("rawtypes")
 @SuppressWarnings("rawtypes")
 public class TestNodeStatusUpdater extends NodeManagerTestBase {
 public class TestNodeStatusUpdater extends NodeManagerTestBase {
+
+  /** Bytes in a GigaByte. */
+  private static final long GB = 1024L * 1024L * 1024L;
+
   volatile int heartBeatID = 0;
   volatile int heartBeatID = 0;
   volatile Throwable nmStartError = null;
   volatile Throwable nmStartError = null;
   private final List<NodeId> registeredNodes = new ArrayList<NodeId>();
   private final List<NodeId> registeredNodes = new ArrayList<NodeId>();
@@ -1774,6 +1784,99 @@ public class TestNodeStatusUpdater extends NodeManagerTestBase {
     Assert.assertTrue("Test failed with exception(s)" + exceptions,
     Assert.assertTrue("Test failed with exception(s)" + exceptions,
         exceptions.isEmpty());
         exceptions.isEmpty());
   }
   }
+
+  /**
+   * Test if the {@link NodeManager} updates the resources in the
+   * {@link ContainersMonitor} when the {@link ResourceManager} triggers the
+   * change.
+   * @throws Exception If the test cannot run.
+   */
+  @Test
+  public void testUpdateNMResources() throws Exception {
+
+    // The resource set for the Node Manager from the Resource Tracker
+    final Resource resource = Resource.newInstance(8 * 1024, 1);
+
+    LOG.info("Start the Resource Tracker to mock heartbeats");
+    Server resourceTracker = getMockResourceTracker(resource);
+    resourceTracker.start();
+
+    LOG.info("Start the Node Manager");
+    NodeManager nodeManager = new NodeManager();
+    YarnConfiguration nmConf = new YarnConfiguration();
+    nmConf.setSocketAddr(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
+        resourceTracker.getListenerAddress());
+    nmConf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, "0.0.0.0:0");
+    nodeManager.init(nmConf);
+    nodeManager.start();
+
+    LOG.info("Initially the Node Manager should have the default resources");
+    ContainerManager containerManager = nodeManager.getContainerManager();
+    ContainersMonitor containerMonitor =
+        containerManager.getContainersMonitor();
+    assertEquals(8, containerMonitor.getVCoresAllocatedForContainers());
+    assertEquals(8 * GB, containerMonitor.getPmemAllocatedForContainers());
+
+    LOG.info("The first heartbeat should trigger a resource change to {}",
+        resource);
+    GenericTestUtils.waitFor(
+        () -> containerMonitor.getVCoresAllocatedForContainers() == 1,
+        100, 2 * 1000);
+    assertEquals(8 * GB, containerMonitor.getPmemAllocatedForContainers());
+
+    resource.setVirtualCores(5);
+    resource.setMemorySize(4 * 1024);
+    LOG.info("Change the resources to {}", resource);
+    GenericTestUtils.waitFor(
+        () -> containerMonitor.getVCoresAllocatedForContainers() == 5,
+        100, 2 * 1000);
+    assertEquals(4 * GB, containerMonitor.getPmemAllocatedForContainers());
+
+    LOG.info("Cleanup");
+    nodeManager.stop();
+    nodeManager.close();
+    resourceTracker.stop();
+  }
+
+  /**
+   * Create a mock Resource Tracker server that returns the resources we want
+   * in the heartbeat.
+   * @param resource Resource to reply in the heartbeat.
+   * @return RPC server for the Resource Tracker.
+   * @throws Exception If it cannot create the Resource Tracker.
+   */
+  private static Server getMockResourceTracker(final Resource resource)
+      throws Exception {
+
+    // Setup the mock Resource Tracker
+    final ResourceTracker rt = mock(ResourceTracker.class);
+    when(rt.registerNodeManager(any())).thenAnswer(invocation -> {
+      RegisterNodeManagerResponse response = recordFactory.newRecordInstance(
+          RegisterNodeManagerResponse.class);
+      response.setContainerTokenMasterKey(createMasterKey());
+      response.setNMTokenMasterKey(createMasterKey());
+      return response;
+    });
+    when(rt.nodeHeartbeat(any())).thenAnswer(invocation -> {
+      NodeHeartbeatResponse response = recordFactory.newRecordInstance(
+          NodeHeartbeatResponse.class);
+      response.setResource(resource);
+      return response;
+    });
+    when(rt.unRegisterNodeManager(any())).thenAnswer(invocaiton -> {
+      UnRegisterNodeManagerResponse response = recordFactory.newRecordInstance(
+          UnRegisterNodeManagerResponse.class);
+      return response;
+    });
+
+    // Get the RPC server
+    YarnConfiguration conf = new YarnConfiguration();
+    YarnRPC rpc = YarnRPC.create(conf);
+    Server server = rpc.getServer(ResourceTracker.class, rt,
+        new InetSocketAddress("0.0.0.0", 0), conf, null, 1);
+    return server;
+  }
+
   // Add new containers info into NM context each time node heart beats.
   // Add new containers info into NM context each time node heart beats.
   private class MyNMContext extends NMContext {
   private class MyNMContext extends NMContext {