瀏覽代碼

YARN-4862. Handle duplicate completed containers in RMNodeImpl. Contributed by Rohith Sharma K S

Jason Lowe 8 年之前
父節點
當前提交
352cbaa7a5

+ 26 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java

@@ -141,6 +141,10 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
   private final Set<ContainerId> launchedContainers =
     new HashSet<ContainerId>();
 
+  /* completed containers already seen, used to filter duplicate reports */
+  private final Set<ContainerId> completedContainers =
+      new HashSet<ContainerId>();
+
   /* set of containers that need to be cleaned */
   private final Set<ContainerId> containersToClean = new TreeSet<ContainerId>(
       new ContainerIdComparator());
@@ -578,6 +582,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
       response.addContainersToBeRemovedFromNM(
           new ArrayList<ContainerId>(this.containersToBeRemovedFromNM));
       response.addAllContainersToSignal(this.containersToSignal);
+      this.completedContainers.removeAll(this.containersToBeRemovedFromNM);
       this.containersToClean.clear();
       this.finishedApplications.clear();
       this.containersToSignal.clear();
@@ -1287,6 +1292,11 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
     return this.launchedContainers;
   }
 
+  @VisibleForTesting
+  public Set<ContainerId> getCompletedContainers() {
+    return this.completedContainers;
+  }
+
   @Override
   public Set<String> getNodeLabels() {
     RMNodeLabelsManager nlm = context.getNodeLabelManager();
@@ -1329,7 +1339,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
     // containers.
     List<ContainerStatus> newlyLaunchedContainers =
         new ArrayList<ContainerStatus>();
-    List<ContainerStatus> completedContainers =
+    List<ContainerStatus> newlyCompletedContainers =
         new ArrayList<ContainerStatus>();
     int numRemoteRunningContainers = 0;
     for (ContainerStatus remoteContainer : containerStatuses) {
@@ -1385,15 +1395,25 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
         }
         // Completed containers should also include the OPPORTUNISTIC containers
         // so that the AM gets properly notified.
-        completedContainers.add(remoteContainer);
+        if (completedContainers.add(containerId)) {
+          newlyCompletedContainers.add(remoteContainer);
+        }
+      }
+    }
+
+    List<ContainerStatus> lostContainers =
+        findLostContainers(numRemoteRunningContainers, containerStatuses);
+    for (ContainerStatus remoteContainer : lostContainers) {
+      ContainerId containerId = remoteContainer.getContainerId();
+      if (completedContainers.add(containerId)) {
+        newlyCompletedContainers.add(remoteContainer);
       }
     }
-    completedContainers.addAll(findLostContainers(
-          numRemoteRunningContainers, containerStatuses));
 
-    if (newlyLaunchedContainers.size() != 0 || completedContainers.size() != 0) {
+    if (newlyLaunchedContainers.size() != 0
+        || newlyCompletedContainers.size() != 0) {
       nodeUpdateQueue.add(new UpdatedContainerInfo(newlyLaunchedContainers,
-          completedContainers));
+          newlyCompletedContainers));
     }
   }
 

+ 18 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java

@@ -73,6 +73,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeFinishedContainersPulledByAMEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
@@ -909,16 +910,18 @@ public abstract class AbstractYarnScheduler
    * Process completed container list.
    * @param completedContainers Extracted list of completed containers
    * @param releasedResources Reference resource object for completed containers
+   * @param nodeId NodeId corresponding to the NodeManager
    * @return The total number of released containers
    */
   protected int updateCompletedContainers(List<ContainerStatus>
-      completedContainers, Resource releasedResources) {
+      completedContainers, Resource releasedResources, NodeId nodeId) {
     int releasedContainers = 0;
+    List<ContainerId> untrackedContainerIdList = new ArrayList<ContainerId>();
     for (ContainerStatus completedContainer : completedContainers) {
       ContainerId containerId = completedContainer.getContainerId();
       LOG.debug("Container FINISHED: " + containerId);
       RMContainer container = getRMContainer(containerId);
-      completedContainer(getRMContainer(containerId),
+      completedContainer(container,
           completedContainer, RMContainerEventType.FINISHED);
       if (container != null) {
         releasedContainers++;
@@ -930,8 +933,19 @@ public abstract class AbstractYarnScheduler
         if (rrs != null) {
           Resources.addTo(releasedResources, rrs);
         }
+      } else {
+        // Add containers which are untracked by RM.
+        untrackedContainerIdList.add(containerId);
       }
     }
+
+    // Tell the NM to remove containers the RM does not track from NM context.
+    if (!untrackedContainerIdList.isEmpty()) {
+      this.rmContext.getDispatcher().getEventHandler()
+          .handle(new RMNodeFinishedContainersPulledByAMEvent(nodeId,
+              untrackedContainerIdList));
+    }
+
     return releasedContainers;
   }
 
@@ -977,7 +991,7 @@ public abstract class AbstractYarnScheduler
     // Process completed containers
     Resource releasedResources = Resource.newInstance(0, 0);
     int releasedContainers = updateCompletedContainers(completedContainers,
-        releasedResources);
+        releasedResources, nm.getNodeID());
 
     // If the node is decommissioning, send an update to have the total
     // resource equal to the used resource, so no available resource to
@@ -1004,4 +1018,5 @@ public abstract class AbstractYarnScheduler
           " availableResource: " + node.getUnallocatedResource());
     }
   }
+
 }

+ 40 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java

@@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeFinishedContainersPulledByAMEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
@@ -1065,4 +1066,43 @@ public class TestRMNodeTransitions {
     Assert.assertTrue("second container not running",
         node.getLaunchedContainers().contains(cid2));
   }
+
+  @Test
+  public void testForHandlingDuplicatedCompltedContainers() {
+    // Start the node
+    node.handle(new RMNodeStartedEvent(null, null, null));
+    // Add info to the queue first
+    node.setNextHeartBeat(false);
+
+    ContainerId completedContainerId1 = BuilderUtils.newContainerId(
+        BuilderUtils.newApplicationAttemptId(BuilderUtils.newApplicationId(0, 0), 0), 0);
+
+    RMNodeStatusEvent statusEvent1 = getMockRMNodeStatusEvent(null);
+
+    ContainerStatus containerStatus1 = mock(ContainerStatus.class);
+
+    doReturn(completedContainerId1).when(containerStatus1).getContainerId();
+    doReturn(Collections.singletonList(containerStatus1)).when(statusEvent1)
+        .getContainers();
+
+    verify(scheduler, times(1)).handle(any(NodeUpdateSchedulerEvent.class));
+    node.handle(statusEvent1);
+    verify(scheduler, times(1)).handle(any(NodeUpdateSchedulerEvent.class));
+    Assert.assertEquals(1, node.getQueueSize());
+    Assert.assertEquals(1, node.getCompletedContainers().size());
+
+    // test for duplicate entries
+    node.handle(statusEvent1);
+    Assert.assertEquals(1, node.getQueueSize());
+
+    // send clean up container event
+    node.handle(new RMNodeFinishedContainersPulledByAMEvent(node.getNodeID(),
+        Collections.singletonList(completedContainerId1)));
+
+    NodeHeartbeatResponse hbrsp =
+        Records.newRecord(NodeHeartbeatResponse.class);
+    node.updateNodeHeartbeatResponseForCleanup(hbrsp);
+    Assert.assertEquals(1, hbrsp.getContainersToBeRemovedFromNM().size());
+    Assert.assertEquals(0, node.getCompletedContainers().size());
+  }
 }

+ 55 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java

@@ -19,9 +19,11 @@
 package org.apache.hadoop.yarn.server.resourcemanager;
 
 import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import java.io.File;
 import java.io.FileOutputStream;
@@ -2003,4 +2005,57 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
       DefaultMetricsSystem.shutdown();
     }
   }
+
+  @Test(timeout = 60000)
+  public void testNodeHeartBeatResponseForUnknownContainerCleanUp()
+      throws Exception {
+    Configuration conf = new Configuration();
+    rm = new MockRM(conf);
+    rm.init(conf);
+    rm.start();
+
+    MockNM nm1 = rm.registerNode("host1:1234", 5120);
+    rm.drainEvents();
+
+    // send 1st heartbeat
+    nm1.nodeHeartbeat(true);
+
+    // Create 2 containers tracked by the NM but unknown to the RM
+    ApplicationId applicationId = BuilderUtils.newApplicationId(1, 1);
+    ApplicationAttemptId applicationAttemptId = BuilderUtils
+        .newApplicationAttemptId(applicationId, 1);
+    ContainerId cid1 = BuilderUtils.newContainerId(applicationAttemptId, 2);
+    ContainerId cid2 = BuilderUtils.newContainerId(applicationAttemptId, 3);
+    ArrayList<ContainerStatus> containerStats =
+        new ArrayList<ContainerStatus>();
+    containerStats.add(
+        ContainerStatus.newInstance(cid1, ContainerState.COMPLETE, "", -1));
+    containerStats.add(
+        ContainerStatus.newInstance(cid2, ContainerState.COMPLETE, "", -1));
+
+    Map<ApplicationId, List<ContainerStatus>> conts =
+        new HashMap<ApplicationId, List<ContainerStatus>>();
+    conts.put(applicationAttemptId.getApplicationId(), containerStats);
+
+    // add RMApp into context.
+    RMApp app1 = mock(RMApp.class);
+    when(app1.getApplicationId()).thenReturn(applicationId);
+    rm.getRMContext().getRMApps().put(applicationId, app1);
+
+    // Send unknown container status in heartbeat
+    nm1.nodeHeartbeat(conts, true);
+    rm.drainEvents();
+
+    int containersToBeRemovedFromNM = 0;
+    while (true) {
+      NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
+      rm.drainEvents();
+      containersToBeRemovedFromNM +=
+          nodeHeartbeat.getContainersToBeRemovedFromNM().size();
+      // expect 2, since statuses for two unknown containers were sent
+      if (containersToBeRemovedFromNM == 2) {
+        break;
+      }
+    }
+  }
 }