Kaynağa Gözat

YARN-5197. RM leaks containers if running container disappears from node update. Contributed by Jason Lowe.

Rohith Sharma K S 9 yıl önce
ebeveyn
işleme
4a498f5fa1

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -12,6 +12,9 @@ Release 2.7.4 - UNRELEASED
 
   BUG FIXES
 
+    YARN-5197. RM leaks containers if running container disappears from
+    node update. Contributed by Jason Lowe.
+
 Release 2.7.3 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 36 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java

@@ -19,8 +19,10 @@
 package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
@@ -58,6 +60,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
@@ -898,6 +901,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
         new ArrayList<ContainerStatus>();
     List<ContainerStatus> completedContainers =
         new ArrayList<ContainerStatus>();
+    int numRemoteRunningContainers = 0;
     for (ContainerStatus remoteContainer : containerStatuses) {
       ContainerId containerId = remoteContainer.getContainerId();
 
@@ -919,6 +923,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
 
       // Process running containers
       if (remoteContainer.getState() == ContainerState.RUNNING) {
+        ++numRemoteRunningContainers;
         if (!launchedContainers.contains(containerId)) {
           // Just launched container. RM knows about it the first time.
           launchedContainers.add(containerId);
@@ -930,10 +935,41 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
         completedContainers.add(remoteContainer);
       }
     }
+    completedContainers.addAll(findLostContainers(
+          numRemoteRunningContainers, containerStatuses));
+
     if (newlyLaunchedContainers.size() != 0 || completedContainers.size() != 0) {
       nodeUpdateQueue.add(new UpdatedContainerInfo(newlyLaunchedContainers,
           completedContainers));
     }
   }
 
+  private List<ContainerStatus> findLostContainers(int numRemoteRunning,
+      List<ContainerStatus> containerStatuses) {
+    if (numRemoteRunning >= launchedContainers.size()) {
+      return Collections.emptyList();
+    }
+    Set<ContainerId> nodeContainers =
+        new HashSet<ContainerId>(numRemoteRunning);
+    List<ContainerStatus> lostContainers = new ArrayList<ContainerStatus>(
+        launchedContainers.size() - numRemoteRunning);
+    for (ContainerStatus remoteContainer : containerStatuses) {
+      if (remoteContainer.getState() == ContainerState.RUNNING) {
+        nodeContainers.add(remoteContainer.getContainerId());
+      }
+    }
+    Iterator<ContainerId> iter = launchedContainers.iterator();
+    while (iter.hasNext()) {
+      ContainerId containerId = iter.next();
+      if (!nodeContainers.contains(containerId)) {
+        String diag = "Container " + containerId
+            + " was running but not reported from " + nodeId;
+        LOG.warn(diag);
+        lostContainers.add(SchedulerUtils.createAbnormalContainerStatus(
+            containerId, diag));
+        iter.remove();
+      }
+    }
+    return lostContainers;
+  }
  }

+ 37 - 8
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java

@@ -20,12 +20,14 @@ package org.apache.hadoop.yarn.server.resourcemanager;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -55,6 +57,8 @@ public class MockNM {
   private MasterKey currentContainerTokenMasterKey;
   private MasterKey currentNMTokenMasterKey;
   private String version;
+  private Map<ContainerId, ContainerStatus> containerStats =
+      new HashMap<ContainerId, ContainerStatus>();
 
   public MockNM(String nodeIdStr, int memory, ResourceTrackerService resourceTracker) {
     // scale vcores based on the requested memory
@@ -129,18 +133,27 @@ public class MockNM {
     this.currentContainerTokenMasterKey =
         registrationResponse.getContainerTokenMasterKey();
     this.currentNMTokenMasterKey = registrationResponse.getNMTokenMasterKey();
+    containerStats.clear();
+    if (containerReports != null) {
+      for (NMContainerStatus report : containerReports) {
+        if (report.getContainerState() != ContainerState.COMPLETE) {
+          containerStats.put(report.getContainerId(),
+              ContainerStatus.newInstance(report.getContainerId(),
+                  report.getContainerState(), report.getDiagnostics(),
+                  report.getContainerExitStatus()));
+        }
+      }
+    }
     return registrationResponse;    
   }
   
   public NodeHeartbeatResponse nodeHeartbeat(boolean isHealthy) throws Exception {
-    return nodeHeartbeat(new HashMap<ApplicationId, List<ContainerStatus>>(),
+    return nodeHeartbeat(Collections.<ContainerStatus>emptyList(),
         isHealthy, ++responseId);
   }
 
   public NodeHeartbeatResponse nodeHeartbeat(ApplicationAttemptId attemptId,
       long containerId, ContainerState containerState) throws Exception {
-    HashMap<ApplicationId, List<ContainerStatus>> nodeUpdate =
-        new HashMap<ApplicationId, List<ContainerStatus>>(1);
     ContainerStatus containerStatus = BuilderUtils.newContainerStatus(
         BuilderUtils.newContainerId(attemptId, containerId), containerState,
         "Success", 0);
@@ -148,8 +161,7 @@ public class MockNM {
         new ArrayList<ContainerStatus>(1);
     containerStatusList.add(containerStatus);
     Log.info("ContainerStatus: " + containerStatus);
-    nodeUpdate.put(attemptId.getApplicationId(), containerStatusList);
-    return nodeHeartbeat(nodeUpdate, true);
+    return nodeHeartbeat(containerStatusList, true, ++responseId);
   }
 
   public NodeHeartbeatResponse nodeHeartbeat(Map<ApplicationId,
@@ -159,13 +171,30 @@ public class MockNM {
 
   public NodeHeartbeatResponse nodeHeartbeat(Map<ApplicationId,
       List<ContainerStatus>> conts, boolean isHealthy, int resId) throws Exception {
+    ArrayList<ContainerStatus> updatedStats = new ArrayList<ContainerStatus>();
+    for (List<ContainerStatus> stats : conts.values()) {
+      updatedStats.addAll(stats);
+    }
+    return nodeHeartbeat(updatedStats, isHealthy, resId);
+  }
+
+  public NodeHeartbeatResponse nodeHeartbeat(List<ContainerStatus> updatedStats,
+      boolean isHealthy, int resId) throws Exception {
     NodeHeartbeatRequest req = Records.newRecord(NodeHeartbeatRequest.class);
     NodeStatus status = Records.newRecord(NodeStatus.class);
     status.setResponseId(resId);
     status.setNodeId(nodeId);
-    for (Map.Entry<ApplicationId, List<ContainerStatus>> entry : conts.entrySet()) {
-      Log.info("entry.getValue() " + entry.getValue());
-      status.setContainersStatuses(entry.getValue());
+    ArrayList<ContainerId> completedContainers = new ArrayList<ContainerId>();
+    for (ContainerStatus stat : updatedStats) {
+      if (stat.getState() == ContainerState.COMPLETE) {
+        completedContainers.add(stat.getContainerId());
+      }
+      containerStats.put(stat.getContainerId(), stat);
+    }
+    status.setContainersStatuses(
+        new ArrayList<ContainerStatus>(containerStats.values()));
+    for (ContainerId cid : completedContainers) {
+      containerStats.remove(cid);
     }
     NodeHealthStatus healthStatus = Records.newRecord(NodeHealthStatus.class);
     healthStatus.setHealthReport("");

+ 53 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java

@@ -32,7 +32,9 @@ import java.util.List;
 
 import org.apache.hadoop.util.HostsFileReader;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
@@ -147,6 +149,11 @@ public class TestRMNodeTransitions {
   }
   
   private RMNodeStatusEvent getMockRMNodeStatusEvent() {
+    return getMockRMNodeStatusEvent(null);
+  }
+
+  private RMNodeStatusEvent getMockRMNodeStatusEvent(
+      List<ContainerStatus> containerStatus) {
     NodeHeartbeatResponse response = mock(NodeHeartbeatResponse.class);
 
     NodeHealthStatus healthStatus = mock(NodeHealthStatus.class);
@@ -157,6 +164,9 @@ public class TestRMNodeTransitions {
     doReturn(healthStatus).when(event).getNodeHealthStatus();
     doReturn(response).when(event).getLatestResponse();
     doReturn(RMNodeEventType.STATUS_UPDATE).when(event).getType();
+    if (containerStatus != null) {
+      doReturn(containerStatus).when(event).getContainers();
+    }
     return event;
   }
   
@@ -631,4 +641,47 @@ public class TestRMNodeTransitions {
         null, null));
     Assert.assertEquals(nmVersion2, node.getNodeManagerVersion());
   }
+
+  @Test
+  public void testDisappearingContainer() {
+    ContainerId cid1 = BuilderUtils.newContainerId(
+        BuilderUtils.newApplicationAttemptId(
+            BuilderUtils.newApplicationId(1, 1), 1), 1);
+    ContainerId cid2 = BuilderUtils.newContainerId(
+        BuilderUtils.newApplicationAttemptId(
+            BuilderUtils.newApplicationId(2, 2), 2), 2);
+    ArrayList<ContainerStatus> containerStats =
+        new ArrayList<ContainerStatus>();
+    containerStats.add(ContainerStatus.newInstance(cid1,
+        ContainerState.RUNNING, "", -1));
+    containerStats.add(ContainerStatus.newInstance(cid2,
+        ContainerState.RUNNING, "", -1));
+    node = getRunningNode();
+    node.handle(getMockRMNodeStatusEvent(containerStats));
+    assertEquals("unexpected number of running containers",
+        2, node.getLaunchedContainers().size());
+    Assert.assertTrue("first container not running",
+        node.getLaunchedContainers().contains(cid1));
+    Assert.assertTrue("second container not running",
+        node.getLaunchedContainers().contains(cid2));
+    assertEquals("already completed containers",
+        0, completedContainers.size());
+    containerStats.remove(0);
+    node.handle(getMockRMNodeStatusEvent(containerStats));
+    assertEquals("expected one container to be completed",
+        1, completedContainers.size());
+    ContainerStatus cs = completedContainers.get(0);
+    assertEquals("first container not the one that completed",
+        cid1, cs.getContainerId());
+    assertEquals("completed container not marked complete",
+        ContainerState.COMPLETE, cs.getState());
+    assertEquals("completed container not marked aborted",
+        ContainerExitStatus.ABORTED, cs.getExitStatus());
+    Assert.assertTrue("completed container not marked missing",
+        cs.getDiagnostics().contains("not reported"));
+    assertEquals("unexpected number of running containers",
+        1, node.getLaunchedContainers().size());
+    Assert.assertTrue("second container not running",
+        node.getLaunchedContainers().contains(cid2));
+  }
 }