Browse Source

YARN-5197. RM leaks containers if running container disappears from node update. Contributed by Jason Lowe.

Rohith Sharma K S 9 years ago
parent
commit
33e6986ec9

+ 36 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java

@@ -24,6 +24,7 @@ import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -70,6 +71,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.AllocationExpirationInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
@@ -1308,6 +1310,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
         new ArrayList<ContainerStatus>();
     List<ContainerStatus> completedContainers =
         new ArrayList<ContainerStatus>();
+    int numRemoteRunningContainers = 0;
     for (ContainerStatus remoteContainer : containerStatuses) {
       ContainerId containerId = remoteContainer.getContainerId();
 
@@ -1339,6 +1342,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
 
       // Process running containers
       if (remoteContainer.getState() == ContainerState.RUNNING) {
+        ++numRemoteRunningContainers;
         if (!launchedContainers.contains(containerId)) {
           // Just launched container. RM knows about it the first time.
           launchedContainers.add(containerId);
@@ -1356,12 +1360,44 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
             new AllocationExpirationInfo(containerId));
       }
     }
+    completedContainers.addAll(findLostContainers(
+          numRemoteRunningContainers, containerStatuses));
+
     if (newlyLaunchedContainers.size() != 0 || completedContainers.size() != 0) {
       nodeUpdateQueue.add(new UpdatedContainerInfo(newlyLaunchedContainers,
           completedContainers));
     }
   }
 
+  private List<ContainerStatus> findLostContainers(int numRemoteRunning,
+      List<ContainerStatus> containerStatuses) {
+    if (numRemoteRunning >= launchedContainers.size()) {
+      return Collections.emptyList();
+    }
+    Set<ContainerId> nodeContainers =
+        new HashSet<ContainerId>(numRemoteRunning);
+    List<ContainerStatus> lostContainers = new ArrayList<ContainerStatus>(
+        launchedContainers.size() - numRemoteRunning);
+    for (ContainerStatus remoteContainer : containerStatuses) {
+      if (remoteContainer.getState() == ContainerState.RUNNING) {
+        nodeContainers.add(remoteContainer.getContainerId());
+      }
+    }
+    Iterator<ContainerId> iter = launchedContainers.iterator();
+    while (iter.hasNext()) {
+      ContainerId containerId = iter.next();
+      if (!nodeContainers.contains(containerId)) {
+        String diag = "Container " + containerId
+            + " was running but not reported from " + nodeId;
+        LOG.warn(diag);
+        lostContainers.add(SchedulerUtils.createAbnormalContainerStatus(
+            containerId, diag));
+        iter.remove();
+      }
+    }
+    return lostContainers;
+  }
+
   private void handleLogAggregationStatus(
       List<LogAggregationReport> logAggregationReportsForApps) {
     for (LogAggregationReport report : logAggregationReportsForApps) {

+ 40 - 17
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java

@@ -28,6 +28,7 @@ import java.util.Map;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -59,6 +60,8 @@ public class MockNM {
   private MasterKey currentContainerTokenMasterKey;
   private MasterKey currentNMTokenMasterKey;
   private String version;
+  private Map<ContainerId, ContainerStatus> containerStats =
+      new HashMap<ContainerId, ContainerStatus>();
 
   public MockNM(String nodeIdStr, int memory, ResourceTrackerService resourceTracker) {
     // scale vcores based on the requested memory
@@ -108,14 +111,12 @@ public class MockNM {
   }
 
   public void containerIncreaseStatus(Container container) throws Exception {
-    Map<ApplicationId, List<ContainerStatus>> conts = new HashMap<>();
     ContainerStatus containerStatus = BuilderUtils.newContainerStatus(
         container.getId(), ContainerState.RUNNING, "Success", 0,
             container.getResource());
-    conts.put(container.getId().getApplicationAttemptId().getApplicationId(),
-        Collections.singletonList(containerStatus));
     List<Container> increasedConts = Collections.singletonList(container);
-    nodeHeartbeat(conts, increasedConts, true, ++responseId);
+    nodeHeartbeat(Collections.singletonList(containerStatus), increasedConts,
+        true, ++responseId);
   }
 
   public RegisterNodeManagerResponse registerNode() throws Exception {
@@ -144,18 +145,27 @@ public class MockNM {
     this.currentContainerTokenMasterKey =
         registrationResponse.getContainerTokenMasterKey();
     this.currentNMTokenMasterKey = registrationResponse.getNMTokenMasterKey();
+    containerStats.clear();
+    if (containerReports != null) {
+      for (NMContainerStatus report : containerReports) {
+        if (report.getContainerState() != ContainerState.COMPLETE) {
+          containerStats.put(report.getContainerId(),
+              ContainerStatus.newInstance(report.getContainerId(),
+                  report.getContainerState(), report.getDiagnostics(),
+                  report.getContainerExitStatus()));
+        }
+      }
+    }
     return registrationResponse;
   }
   
   public NodeHeartbeatResponse nodeHeartbeat(boolean isHealthy) throws Exception {
-    return nodeHeartbeat(new HashMap<ApplicationId, List<ContainerStatus>>(),
-        isHealthy, ++responseId);
+    return nodeHeartbeat(Collections.<ContainerStatus>emptyList(),
+        Collections.<Container>emptyList(), isHealthy, ++responseId);
   }
 
   public NodeHeartbeatResponse nodeHeartbeat(ApplicationAttemptId attemptId,
       long containerId, ContainerState containerState) throws Exception {
-    HashMap<ApplicationId, List<ContainerStatus>> nodeUpdate =
-        new HashMap<ApplicationId, List<ContainerStatus>>(1);
     ContainerStatus containerStatus = BuilderUtils.newContainerStatus(
         BuilderUtils.newContainerId(attemptId, containerId), containerState,
         "Success", 0, BuilderUtils.newResource(memory, vCores));
@@ -163,8 +173,8 @@ public class MockNM {
         new ArrayList<ContainerStatus>(1);
     containerStatusList.add(containerStatus);
     Log.info("ContainerStatus: " + containerStatus);
-    nodeUpdate.put(attemptId.getApplicationId(), containerStatusList);
-    return nodeHeartbeat(nodeUpdate, true);
+    return nodeHeartbeat(containerStatusList,
+        Collections.<Container>emptyList(), true, ++responseId);
   }
 
   public NodeHeartbeatResponse nodeHeartbeat(Map<ApplicationId,
@@ -174,19 +184,32 @@ public class MockNM {
 
   public NodeHeartbeatResponse nodeHeartbeat(Map<ApplicationId,
       List<ContainerStatus>> conts, boolean isHealthy, int resId) throws Exception {
-    return nodeHeartbeat(conts, new ArrayList<Container>(), isHealthy, resId);
+    ArrayList<ContainerStatus> updatedStats = new ArrayList<ContainerStatus>();
+    for (List<ContainerStatus> stats : conts.values()) {
+      updatedStats.addAll(stats);
+    }
+    return nodeHeartbeat(updatedStats, Collections.<Container>emptyList(),
+        isHealthy, resId);
   }
 
-  public NodeHeartbeatResponse nodeHeartbeat(Map<ApplicationId,
-      List<ContainerStatus>> conts, List<Container> increasedConts,
-          boolean isHealthy, int resId) throws Exception {
+  public NodeHeartbeatResponse nodeHeartbeat(List<ContainerStatus> updatedStats,
+      List<Container> increasedConts, boolean isHealthy, int resId)
+          throws Exception {
     NodeHeartbeatRequest req = Records.newRecord(NodeHeartbeatRequest.class);
     NodeStatus status = Records.newRecord(NodeStatus.class);
     status.setResponseId(resId);
     status.setNodeId(nodeId);
-    for (Map.Entry<ApplicationId, List<ContainerStatus>> entry : conts.entrySet()) {
-      Log.info("entry.getValue() " + entry.getValue());
-      status.setContainersStatuses(entry.getValue());
+    ArrayList<ContainerId> completedContainers = new ArrayList<ContainerId>();
+    for (ContainerStatus stat : updatedStats) {
+      if (stat.getState() == ContainerState.COMPLETE) {
+        completedContainers.add(stat.getContainerId());
+      }
+      containerStats.put(stat.getContainerId(), stat);
+    }
+    status.setContainersStatuses(
+        new ArrayList<ContainerStatus>(containerStats.values()));
+    for (ContainerId cid : completedContainers) {
+      containerStats.remove(cid);
     }
     status.setIncreasedContainers(increasedConts);
     NodeHealthStatus healthStatus = Records.newRecord(NodeHealthStatus.class);

+ 44 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java

@@ -34,6 +34,7 @@ import java.util.Random;
 import org.apache.hadoop.util.HostsFileReader;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -1021,4 +1022,47 @@ public class TestRMNodeTransitions {
     Resource originalCapacity = node.getOriginalTotalCapability();
     assertEquals("Original total capability not null after recommission", null, originalCapacity);
   }
+
+  @Test
+  public void testDisappearingContainer() {
+    ContainerId cid1 = BuilderUtils.newContainerId(
+        BuilderUtils.newApplicationAttemptId(
+            BuilderUtils.newApplicationId(1, 1), 1), 1);
+    ContainerId cid2 = BuilderUtils.newContainerId(
+        BuilderUtils.newApplicationAttemptId(
+            BuilderUtils.newApplicationId(2, 2), 2), 2);
+    ArrayList<ContainerStatus> containerStats =
+        new ArrayList<ContainerStatus>();
+    containerStats.add(ContainerStatus.newInstance(cid1,
+        ContainerState.RUNNING, "", -1));
+    containerStats.add(ContainerStatus.newInstance(cid2,
+        ContainerState.RUNNING, "", -1));
+    node = getRunningNode();
+    node.handle(getMockRMNodeStatusEvent(containerStats));
+    assertEquals("unexpected number of running containers",
+        2, node.getLaunchedContainers().size());
+    Assert.assertTrue("first container not running",
+        node.getLaunchedContainers().contains(cid1));
+    Assert.assertTrue("second container not running",
+        node.getLaunchedContainers().contains(cid2));
+    assertEquals("already completed containers",
+        0, completedContainers.size());
+    containerStats.remove(0);
+    node.handle(getMockRMNodeStatusEvent(containerStats));
+    assertEquals("expected one container to be completed",
+        1, completedContainers.size());
+    ContainerStatus cs = completedContainers.get(0);
+    assertEquals("first container not the one that completed",
+        cid1, cs.getContainerId());
+    assertEquals("completed container not marked complete",
+        ContainerState.COMPLETE, cs.getState());
+    assertEquals("completed container not marked aborted",
+        ContainerExitStatus.ABORTED, cs.getExitStatus());
+    Assert.assertTrue("completed container not marked missing",
+        cs.getDiagnostics().contains("not reported"));
+    assertEquals("unexpected number of running containers",
+        1, node.getLaunchedContainers().size());
+    Assert.assertTrue("second container not running",
+        node.getLaunchedContainers().contains(cid2));
+  }
 }