Selaa lähdekoodia

YARN-11003. Make RMNode aware of all (OContainer inclusive) allocated resources (#3646)

Andrew Chung 3 vuotta sitten
vanhempi
commit
5b1b2c8ef6

+ 5 - 0
hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java

@@ -99,6 +99,11 @@ public class RMNodeWrapper implements RMNode {
     return node.getTotalCapability();
   }
 
+  @Override
+  public Resource getAllocatedContainerResource() {
+    return node.getAllocatedContainerResource();
+  }
+
   @Override
   public String getRackName() {
     return node.getRackName();

+ 12 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java

@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.NodeAttribute;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.util.resource.Resources;
 
 /**
  * Node managers information on available resources 
@@ -104,6 +105,17 @@ public interface RMNode {
    */
   public Resource getTotalCapability();
 
+  /**
+   * The total allocated resources to containers.
+   * This will include the sum of Guaranteed and Opportunistic
+   * containers queued + running + paused on the node.
+   * @return the total allocated resources, including all Guaranteed and
+   * Opportunistic containers in queued, running and paused states.
+   */
+  default Resource getAllocatedContainerResource() {
+    return Resources.none();
+  }
+
   /**
    * If the total available resources has been updated.
    * @return If the capability has been updated.

+ 28 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java

@@ -128,6 +128,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
   /* Snapshot of total resources before receiving decommissioning command */
   private volatile Resource originalTotalCapability;
   private volatile Resource totalCapability;
+  private volatile Resource allocatedContainerResource =
+      Resource.newInstance(Resources.none());
   private volatile boolean updatedCapability = false;
   private final Node node;
 
@@ -464,6 +466,11 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
     return this.totalCapability;
   }
 
+  @Override
+  public Resource getAllocatedContainerResource() {
+    return this.allocatedContainerResource;
+  }
+
   @Override
   public boolean isUpdatedCapability() {
     return this.updatedCapability;
@@ -952,13 +959,22 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
           ClusterMetrics.getMetrics().decrDecommisionedNMs();
         }
         containers = startEvent.getNMContainerStatuses();
+        final Resource allocatedResource = Resource.newInstance(
+            Resources.none());
         if (containers != null && !containers.isEmpty()) {
           for (NMContainerStatus container : containers) {
-            if (container.getContainerState() == ContainerState.RUNNING) {
-              rmNode.launchedContainers.add(container.getContainerId());
+            if (container.getContainerState() == ContainerState.NEW ||
+                container.getContainerState() == ContainerState.RUNNING) {
+              Resources.addTo(allocatedResource,
+                  container.getAllocatedResource());
+              if (container.getContainerState() == ContainerState.RUNNING) {
+                rmNode.launchedContainers.add(container.getContainerId());
+              }
             }
           }
         }
+
+        rmNode.allocatedContainerResource = allocatedResource;
       }
 
       if (null != startEvent.getRunningApplications()) {
@@ -1554,6 +1570,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
     List<Map.Entry<ApplicationId, ContainerStatus>> needUpdateContainers =
         new ArrayList<Map.Entry<ApplicationId, ContainerStatus>>();
     int numRemoteRunningContainers = 0;
+    final Resource allocatedResource = Resource.newInstance(Resources.none());
+
     for (ContainerStatus remoteContainer : containerStatuses) {
       ContainerId containerId = remoteContainer.getContainerId();
 
@@ -1622,8 +1640,16 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
         containerAllocationExpirer
             .unregister(new AllocationExpirationInfo(containerId));
       }
+
+      if ((remoteContainer.getState() == ContainerState.RUNNING ||
+          remoteContainer.getState() == ContainerState.NEW) &&
+          remoteContainer.getCapability() != null) {
+        Resources.addTo(allocatedResource, remoteContainer.getCapability());
+      }
     }
 
+    allocatedContainerResource = allocatedResource;
+
     List<ContainerStatus> lostContainers =
         findLostContainers(numRemoteRunningContainers, containerStatuses);
     for (ContainerStatus remoteContainer : lostContainers) {

+ 220 - 17
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java

@@ -43,13 +43,17 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.event.InlineDispatcher;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
@@ -79,6 +83,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEv
 import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
+import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -231,6 +236,32 @@ public class TestRMNodeTransitions {
     return event;
   }
 
+  private static ContainerStatus getMockContainerStatus(
+      final ContainerId containerId, final Resource capability,
+      final ContainerState containerState) {
+    return getMockContainerStatus(containerId, capability, containerState,
+        ExecutionType.GUARANTEED);
+  }
+
+  private static ContainerStatus getMockContainerStatus(
+      final ContainerId containerId, final Resource capability,
+      final ContainerState containerState, final ExecutionType executionType) {
+    final ContainerStatus containerStatus = mock(ContainerStatus.class);
+    doReturn(containerId).when(containerStatus).getContainerId();
+    doReturn(containerState).when(containerStatus).getState();
+    doReturn(capability).when(containerStatus).getCapability();
+    doReturn(executionType).when(containerStatus).getExecutionType();
+    return containerStatus;
+  }
+
+  private static NMContainerStatus createNMContainerStatus(
+      final ContainerId containerId, final ExecutionType executionType,
+      final ContainerState containerState, final Resource capability) {
+    return NMContainerStatus.newInstance(containerId, 0, containerState,
+        capability, "", 0, Priority.newInstance(0), 0,
+        CommonNodeLabelsManager.NO_LABEL, executionType, -1);
+  }
+
   @Test (timeout = 5000)
   public void testExpiredContainer() {
     NodeStatus mockNodeStatus = createMockNodeStatus();
@@ -248,8 +279,8 @@ public class TestRMNodeTransitions {
     // Now verify that scheduler isn't notified of an expired container
     // by checking number of 'completedContainers' it got in the previous event
     RMNodeStatusEvent statusEvent = getMockRMNodeStatusEvent(null);
-    ContainerStatus containerStatus = mock(ContainerStatus.class);
-    doReturn(completedContainerId).when(containerStatus).getContainerId();
+    ContainerStatus containerStatus = getMockContainerStatus(
+        completedContainerId, null, ContainerState.COMPLETE);
     doReturn(Collections.singletonList(containerStatus)).
         when(statusEvent).getContainers();
     node.handle(statusEvent);
@@ -321,12 +352,13 @@ public class TestRMNodeTransitions {
     RMNodeStatusEvent statusEventFromNode2_1 = getMockRMNodeStatusEvent(null);
     RMNodeStatusEvent statusEventFromNode2_2 = getMockRMNodeStatusEvent(null);
 
-    ContainerStatus containerStatusFromNode1 = mock(ContainerStatus.class);
-    ContainerStatus containerStatusFromNode2_1 = mock(ContainerStatus.class);
-    ContainerStatus containerStatusFromNode2_2 = mock(ContainerStatus.class);
+    ContainerStatus containerStatusFromNode1 = getMockContainerStatus(
+        completedContainerIdFromNode1, null, ContainerState.COMPLETE);
+    ContainerStatus containerStatusFromNode2_1 = getMockContainerStatus(
+        completedContainerIdFromNode2_1, null, ContainerState.COMPLETE);
+    ContainerStatus containerStatusFromNode2_2 = getMockContainerStatus(
+        completedContainerIdFromNode2_2, null, ContainerState.COMPLETE);
 
-    doReturn(completedContainerIdFromNode1).when(containerStatusFromNode1)
-        .getContainerId();
     doReturn(Collections.singletonList(containerStatusFromNode1))
         .when(statusEventFromNode1).getContainers();
     node.handle(statusEventFromNode1);
@@ -336,13 +368,9 @@ public class TestRMNodeTransitions {
 
     completedContainers.clear();
 
-    doReturn(completedContainerIdFromNode2_1).when(containerStatusFromNode2_1)
-        .getContainerId();
     doReturn(Collections.singletonList(containerStatusFromNode2_1))
         .when(statusEventFromNode2_1).getContainers();
 
-    doReturn(completedContainerIdFromNode2_2).when(containerStatusFromNode2_2)
-        .getContainerId();
     doReturn(Collections.singletonList(containerStatusFromNode2_2))
         .when(statusEventFromNode2_2).getContainers();
 
@@ -358,6 +386,181 @@ public class TestRMNodeTransitions {
         .getContainerId());
   }
 
+  /**
+   * Tests that allocated resources are counted correctly on new nodes
+   * that are added to the cluster.
+   */
+  @Test
+  public void testAddWithAllocatedContainers() {
+    NodeStatus mockNodeStatus = createMockNodeStatus();
+    RMNodeImpl node = getNewNode();
+    ApplicationId app0 = BuilderUtils.newApplicationId(0, 0);
+
+    // Independently computed expected allocated resource to verify against
+    final Resource expectedResource = Resource.newInstance(Resources.none());
+
+    // Guaranteed containers
+    final ContainerId newContainerId = BuilderUtils.newContainerId(
+        BuilderUtils.newApplicationAttemptId(app0, 0), 0);
+    final Resource newContainerCapability =
+        Resource.newInstance(100, 1);
+    Resources.addTo(expectedResource, newContainerCapability);
+    final NMContainerStatus newContainerStatus = createNMContainerStatus(
+        newContainerId, ExecutionType.GUARANTEED,
+        ContainerState.NEW, newContainerCapability);
+
+    final ContainerId runningContainerId = BuilderUtils.newContainerId(
+        BuilderUtils.newApplicationAttemptId(app0, 0), 1);
+    final Resource runningContainerCapability =
+        Resource.newInstance(200, 2);
+    Resources.addTo(expectedResource, runningContainerCapability);
+    final NMContainerStatus runningContainerStatus = createNMContainerStatus(
+        runningContainerId, ExecutionType.GUARANTEED,
+        ContainerState.RUNNING, runningContainerCapability);
+
+    // Opportunistic containers
+    final ContainerId newOppContainerId = BuilderUtils.newContainerId(
+        BuilderUtils.newApplicationAttemptId(app0, 0), 2);
+    final Resource newOppContainerCapability =
+        Resource.newInstance(300, 3);
+    Resources.addTo(expectedResource, newOppContainerCapability);
+    final NMContainerStatus newOppContainerStatus = createNMContainerStatus(
+        newOppContainerId, ExecutionType.OPPORTUNISTIC,
+        ContainerState.NEW, newOppContainerCapability);
+
+    final ContainerId runningOppContainerId = BuilderUtils.newContainerId(
+        BuilderUtils.newApplicationAttemptId(app0, 0), 3);
+    final Resource runningOppContainerCapability =
+        Resource.newInstance(400, 4);
+    Resources.addTo(expectedResource, runningOppContainerCapability);
+    final NMContainerStatus runningOppContainerStatus = createNMContainerStatus(
+        runningOppContainerId, ExecutionType.OPPORTUNISTIC,
+        ContainerState.RUNNING, runningOppContainerCapability);
+
+    node.handle(new RMNodeStartedEvent(node.getNodeID(),
+        Arrays.asList(newContainerStatus, runningContainerStatus,
+            newOppContainerStatus, runningOppContainerStatus),
+        null, mockNodeStatus));
+    Assert.assertEquals(NodeState.RUNNING, node.getState());
+    Assert.assertNotNull(nodesListManagerEvent);
+    Assert.assertEquals(NodesListManagerEventType.NODE_USABLE,
+        nodesListManagerEvent.getType());
+    Assert.assertEquals(expectedResource, node.getAllocatedContainerResource());
+  }
+
+  /**
+   * Tests that allocated container resources are counted correctly in
+   * {@link org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode}
+   * upon a node update. Resources should be counted for both GUARANTEED
+   * and OPPORTUNISTIC containers.
+   */
+  @Test (timeout = 5000)
+  public void testAllocatedContainerUpdate() {
+    NodeStatus mockNodeStatus = createMockNodeStatus();
+    //Start the node
+    node.handle(new RMNodeStartedEvent(null, null, null, mockNodeStatus));
+
+    // Make sure that the node starts with no allocated resources
+    Assert.assertEquals(Resources.none(), node.getAllocatedContainerResource());
+
+    ApplicationId app0 = BuilderUtils.newApplicationId(0, 0);
+    final ContainerId newContainerId = BuilderUtils.newContainerId(
+        BuilderUtils.newApplicationAttemptId(app0, 0), 0);
+    final ContainerId runningContainerId = BuilderUtils.newContainerId(
+        BuilderUtils.newApplicationAttemptId(app0, 0), 1);
+
+    rmContext.getRMApps().put(app0, Mockito.mock(RMApp.class));
+
+    RMNodeStatusEvent statusEventFromNode1 = getMockRMNodeStatusEvent(null);
+
+    final List<ContainerStatus> containerStatuses = new ArrayList<>();
+
+    // Use different memory and VCores for new and running state containers
+    // to test that they add up correctly
+    final Resource newContainerCapability =
+        Resource.newInstance(100, 1);
+    final Resource runningContainerCapability =
+        Resource.newInstance(200, 2);
+    final Resource completedContainerCapability =
+        Resource.newInstance(50, 3);
+    final ContainerStatus newContainerStatusFromNode = getMockContainerStatus(
+        newContainerId, newContainerCapability, ContainerState.NEW);
+    final ContainerStatus runningContainerStatusFromNode =
+        getMockContainerStatus(runningContainerId, runningContainerCapability,
+            ContainerState.RUNNING);
+
+    containerStatuses.addAll(Arrays.asList(
+        newContainerStatusFromNode, runningContainerStatusFromNode));
+    doReturn(containerStatuses).when(statusEventFromNode1).getContainers();
+    node.handle(statusEventFromNode1);
+    Assert.assertEquals(Resource.newInstance(300, 3),
+        node.getAllocatedContainerResource());
+
+    final ContainerId newOppContainerId = BuilderUtils.newContainerId(
+        BuilderUtils.newApplicationAttemptId(app0, 0), 2);
+    final ContainerId runningOppContainerId = BuilderUtils.newContainerId(
+        BuilderUtils.newApplicationAttemptId(app0, 0), 3);
+
+    // Use the same resource capability as in previous for opportunistic case
+    RMNodeStatusEvent statusEventFromNode2 = getMockRMNodeStatusEvent(null);
+    final ContainerStatus newOppContainerStatusFromNode =
+        getMockContainerStatus(newOppContainerId, newContainerCapability,
+            ContainerState.NEW, ExecutionType.OPPORTUNISTIC);
+    final ContainerStatus runningOppContainerStatusFromNode =
+        getMockContainerStatus(runningOppContainerId,
+            runningContainerCapability, ContainerState.RUNNING,
+            ExecutionType.OPPORTUNISTIC);
+
+    containerStatuses.addAll(Arrays.asList(
+        newOppContainerStatusFromNode, runningOppContainerStatusFromNode));
+
+    // Pass in both guaranteed and opportunistic container statuses
+    doReturn(containerStatuses).when(statusEventFromNode2).getContainers();
+
+    node.handle(statusEventFromNode2);
+
+    // The result here should be double the first check,
+    // since allocated resources are doubled, just
+    // with different execution types
+    Assert.assertEquals(Resource.newInstance(600, 6),
+        node.getAllocatedContainerResource());
+
+    RMNodeStatusEvent statusEventFromNode3 = getMockRMNodeStatusEvent(null);
+    final ContainerId completedContainerId = BuilderUtils.newContainerId(
+        BuilderUtils.newApplicationAttemptId(app0, 0), 4);
+    final ContainerId completedOppContainerId = BuilderUtils.newContainerId(
+        BuilderUtils.newApplicationAttemptId(app0, 0), 5);
+    final ContainerStatus completedContainerStatusFromNode =
+        getMockContainerStatus(completedContainerId, completedContainerCapability,
+            ContainerState.COMPLETE, ExecutionType.OPPORTUNISTIC);
+    final ContainerStatus completedOppContainerStatusFromNode =
+        getMockContainerStatus(completedOppContainerId,
+            completedContainerCapability, ContainerState.COMPLETE,
+            ExecutionType.OPPORTUNISTIC);
+
+    containerStatuses.addAll(Arrays.asList(
+        completedContainerStatusFromNode, completedOppContainerStatusFromNode));
+
+    doReturn(containerStatuses).when(statusEventFromNode3).getContainers();
+    node.handle(statusEventFromNode3);
+
+    // Adding completed containers should not have changed
+    // the resources allocated
+    Assert.assertEquals(Resource.newInstance(600, 6),
+        node.getAllocatedContainerResource());
+
+    RMNodeStatusEvent emptyStatusEventFromNode =
+        getMockRMNodeStatusEvent(null);
+
+    doReturn(Collections.emptyList())
+        .when(emptyStatusEventFromNode).getContainers();
+    node.handle(emptyStatusEventFromNode);
+
+    // Passing an empty containers list should yield no resources allocated
+    Assert.assertEquals(Resources.none(),
+        node.getAllocatedContainerResource());
+  }
+
   @Test (timeout = 5000)
   public void testStatusChange(){
     NodeStatus mockNodeStatus = createMockNodeStatus();
@@ -376,14 +579,14 @@ public class TestRMNodeTransitions {
     RMNodeStatusEvent statusEvent1 = getMockRMNodeStatusEvent(null);
     RMNodeStatusEvent statusEvent2 = getMockRMNodeStatusEvent(null);
 
-    ContainerStatus containerStatus1 = mock(ContainerStatus.class);
-    ContainerStatus containerStatus2 = mock(ContainerStatus.class);
+    ContainerStatus containerStatus1 = getMockContainerStatus(
+        completedContainerId1, null, null);
+    ContainerStatus containerStatus2 = getMockContainerStatus(
+        completedContainerId2, null, null);
 
-    doReturn(completedContainerId1).when(containerStatus1).getContainerId();
     doReturn(Collections.singletonList(containerStatus1))
         .when(statusEvent1).getContainers();
      
-    doReturn(completedContainerId2).when(containerStatus2).getContainerId();
     doReturn(Collections.singletonList(containerStatus2))
         .when(statusEvent2).getContainers();
 
@@ -1153,9 +1356,9 @@ public class TestRMNodeTransitions {
 
     RMNodeStatusEvent statusEvent1 = getMockRMNodeStatusEvent(null);
 
-    ContainerStatus containerStatus1 = mock(ContainerStatus.class);
+    ContainerStatus containerStatus1 = getMockContainerStatus(
+        completedContainerId1, null, ContainerState.COMPLETE);
 
-    doReturn(completedContainerId1).when(containerStatus1).getContainerId();
     doReturn(Collections.singletonList(containerStatus1)).when(statusEvent1)
         .getContainers();