Browse Source

YARN-3445. Cache runningApps in RMNode for getting running apps on given NodeId. (Junping Du via mingma)

(cherry picked from commit 08244264c0583472b9c4e16591cfde72c6db62a2)
Ming Ma 10 năm trước cách đây
mục cha
commit
b169889f01

+ 7 - 1
hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java

@@ -62,7 +62,8 @@ public class NodeInfo {
     private NodeState state;
     private List<ContainerId> toCleanUpContainers;
     private List<ApplicationId> toCleanUpApplications;
-    
+    private List<ApplicationId> runningApplications;
+
     public FakeRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress,
         Resource perNode, String rackName, String healthReport,
         int cmdPort, String hostName, NodeState state) {
@@ -77,6 +78,7 @@ public class NodeInfo {
       this.state = state;
       toCleanUpApplications = new ArrayList<ApplicationId>();
       toCleanUpContainers = new ArrayList<ContainerId>();
+      runningApplications = new ArrayList<ApplicationId>();
     }
 
     public NodeId getNodeID() {
@@ -135,6 +137,10 @@ public class NodeInfo {
       return toCleanUpApplications;
     }
 
+    public List<ApplicationId> getRunningApps() {
+      return runningApplications;
+    }
+
     public void updateNodeHeartbeatResponseForCleanup(
             NodeHeartbeatResponse response) {
     }

+ 5 - 0
hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java

@@ -118,6 +118,11 @@ public class RMNodeWrapper implements RMNode {
     return node.getAppsToCleanup();
   }
 
+  @Override
+  public List<ApplicationId> getRunningApps() {
+    return node.getRunningApps();
+  }
+
   @Override
   public void updateNodeHeartbeatResponseForCleanup(
           NodeHeartbeatResponse nodeHeartbeatResponse) {

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -1637,6 +1637,9 @@ Release 2.6.0 - 2014-11-18
     YARN-2811. In Fair Scheduler, reservation fulfillments shouldn't ignore max
     share (Siqi Li via Sandy Ryza)
 
+    YARN-3445. Cache runningApps in RMNode for getting running apps on given
+    NodeId. (Junping Du via mingma)
+
   IMPROVEMENTS
 
     YARN-2242. Improve exception information on AM launch crashes. (Li Lu 

+ 2 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java

@@ -119,6 +119,8 @@ public interface RMNode {
 
   public List<ApplicationId> getAppsToCleanup();
 
+  List<ApplicationId> getRunningApps();
+
   /**
    * Update a {@link NodeHeartbeatResponse} with the list of containers and
    * applications to clean up for this node.

+ 36 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java

@@ -123,11 +123,16 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
       new HashSet<ContainerId>();
 
   /* the list of applications that have finished and need to be purged */
-  private final List<ApplicationId> finishedApplications = new ArrayList<ApplicationId>();
+  private final List<ApplicationId> finishedApplications =
+      new ArrayList<ApplicationId>();
+
+  /* the list of applications that are running on this node */
+  private final List<ApplicationId> runningApplications =
+      new ArrayList<ApplicationId>();
 
   private NodeHeartbeatResponse latestNodeHeartBeatResponse = recordFactory
       .newRecordInstance(NodeHeartbeatResponse.class);
-  
+
   private static final StateMachineFactory<RMNodeImpl,
                                            NodeState,
                                            RMNodeEventType,
@@ -136,7 +141,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
                                            NodeState,
                                            RMNodeEventType,
                                            RMNodeEvent>(NodeState.NEW)
-  
+
      //Transitions from NEW state
      .addTransition(NodeState.NEW, NodeState.RUNNING, 
          RMNodeEventType.STARTED, new AddNodeTransition())
@@ -382,6 +387,16 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
 
   }
   
+  @Override
+  public List<ApplicationId> getRunningApps() {
+    this.readLock.lock();
+    try {
+      return new ArrayList<ApplicationId>(this.runningApplications);
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
   @Override
   public List<ContainerId> getContainersToCleanUp() {
 
@@ -519,9 +534,12 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
       LOG.warn("Cannot get RMApp by appId=" + appId
           + ", just added it to finishedApplications list for cleanup");
       rmNode.finishedApplications.add(appId);
+      rmNode.runningApplications.remove(appId);
       return;
     }
 
+    // Add running applications back due to Node add or Node reconnection.
+    rmNode.runningApplications.add(appId);
     context.getDispatcher().getEventHandler()
         .handle(new RMAppRunningOnNodeEvent(appId, nodeId));
   }
@@ -707,8 +725,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
 
     @Override
     public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
-      rmNode.finishedApplications.add(((
-          RMNodeCleanAppEvent) event).getAppId());
+      ApplicationId appId = ((RMNodeCleanAppEvent) event).getAppId();
+      rmNode.finishedApplications.add(appId);
+      rmNode.runningApplications.remove(appId);
     }
   }
 
@@ -910,12 +929,22 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
             + "cleanup, no further processing");
         continue;
       }
-      if (finishedApplications.contains(containerId.getApplicationAttemptId()
-          .getApplicationId())) {
+
+      ApplicationId containerAppId =
+          containerId.getApplicationAttemptId().getApplicationId();
+
+      if (finishedApplications.contains(containerAppId)) {
         LOG.info("Container " + containerId
             + " belongs to an application that is already killed,"
             + " no further processing");
         continue;
+      } else if (!runningApplications.contains(containerAppId)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Container " + containerId
+              + " is the first container get launched for application "
+              + containerAppId);
+        }
+        runningApplications.add(containerAppId);
       }
 
       // Process running containers

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java

@@ -186,6 +186,11 @@ public class MockNodes {
       return null;
     }
 
+    @Override
+    public List<ApplicationId> getRunningApps() {
+      return null;
+    }
+
     @Override
     public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response) {
     }

+ 33 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java

@@ -33,6 +33,7 @@ import java.util.List;
 import org.apache.hadoop.util.HostsFileReader;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
@@ -485,9 +486,9 @@ public class TestRMNodeTransitions {
     NodeId nodeId = node.getNodeID();
 
     // Expire a container
-		ContainerId completedContainerId = BuilderUtils.newContainerId(
-				BuilderUtils.newApplicationAttemptId(
-						BuilderUtils.newApplicationId(0, 0), 0), 0);
+    ContainerId completedContainerId = BuilderUtils.newContainerId(
+        BuilderUtils.newApplicationAttemptId(
+            BuilderUtils.newApplicationId(0, 0), 0), 0);
     node.handle(new RMNodeCleanContainerEvent(nodeId, completedContainerId));
     Assert.assertEquals(1, node.getContainersToCleanUp().size());
 
@@ -512,6 +513,35 @@ public class TestRMNodeTransitions {
     Assert.assertEquals(finishedAppId, hbrsp.getApplicationsToCleanup().get(0));
   }
 
+  @Test(timeout=20000)
+  public void testUpdateHeartbeatResponseForAppLifeCycle() {
+    RMNodeImpl node = getRunningNode();
+    NodeId nodeId = node.getNodeID();
+
+    ApplicationId runningAppId = BuilderUtils.newApplicationId(0, 1);
+    // Create a running container
+    ContainerId runningContainerId = BuilderUtils.newContainerId(
+        BuilderUtils.newApplicationAttemptId(
+        runningAppId, 0), 0);
+
+    ContainerStatus status = ContainerStatus.newInstance(runningContainerId,
+        ContainerState.RUNNING, "", 0);
+    List<ContainerStatus> statusList = new ArrayList<ContainerStatus>();
+    statusList.add(status);
+    NodeHealthStatus nodeHealth = NodeHealthStatus.newInstance(true,
+        "", System.currentTimeMillis());
+    node.handle(new RMNodeStatusEvent(nodeId, nodeHealth,
+        statusList, null, null));
+
+    Assert.assertEquals(1, node.getRunningApps().size());
+
+    // Finish an application
+    ApplicationId finishedAppId = runningAppId;
+    node.handle(new RMNodeCleanAppEvent(nodeId, finishedAppId));
+    Assert.assertEquals(1, node.getAppsToCleanup().size());
+    Assert.assertEquals(0, node.getRunningApps().size());
+  }
+
   private RMNodeImpl getRunningNode() {
     return getRunningNode(null, 0);
   }