
YARN-3194. RM should handle NMContainerStatuses sent by NM while registering if NM is Reconnected node. Contributed by Rohith
(cherry picked from commit a64dd3d24bfcb9af21eb63869924f6482b147fd3)

Jason Lowe 10 years ago
parent
commit
3e8dfd1299
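
The container statuses the RM consumes below originate in the NM's re-registration request. As context for the diffs that follow, here is a minimal sketch of the NM side of that exchange; it is not part of this commit, the RegisterNodeManagerRequest factory signature is assumed from the branch-2 API, and the node address, ports, and sizes are illustrative:

    import java.util.Collections;

    import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
    import org.apache.hadoop.yarn.api.records.ContainerId;
    import org.apache.hadoop.yarn.api.records.ContainerState;
    import org.apache.hadoop.yarn.api.records.NodeId;
    import org.apache.hadoop.yarn.api.records.Priority;
    import org.apache.hadoop.yarn.api.records.Resource;
    import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
    import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;

    public class NMReRegistrationSketch {
      static RegisterNodeManagerRequest buildRequest(ApplicationAttemptId attemptId) {
        // A container that completed while the node was disconnected, built
        // with the same NMContainerStatus factory the new test in this
        // commit uses.
        NMContainerStatus completed = NMContainerStatus.newInstance(
            ContainerId.newContainerId(attemptId, 2), ContainerState.COMPLETE,
            Resource.newInstance(2048, 1), "completed while disconnected", 0,
            Priority.newInstance(0), 0);
        // Assumed factory order: nodeId, httpPort, resource, NM version,
        // container statuses, running applications.
        return RegisterNodeManagerRequest.newInstance(
            NodeId.newInstance("127.0.0.1", 1234), 8042,
            Resource.newInstance(8192, 8), "2.7.0",
            Collections.singletonList(completed),
            Collections.singletonList(attemptId.getApplicationId()));
      }
    }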

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -590,6 +590,9 @@ Release 2.7.0 - UNRELEASED
     YARN-933. Fixed InvalidStateTransitonException at FINAL_SAVING state in
     RMApp. (Rohith Sharmaks via jianhe)
 
+    YARN-3194. RM should handle NMContainerStatuses sent by NM while
+    registering if NM is Reconnected node (Rohith via jlowe)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

+ 6 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java

@@ -312,9 +312,12 @@ public class ResourceTrackerService extends AbstractService implements
     } else {
       LOG.info("Reconnect from the node at: " + host);
       this.nmLivelinessMonitor.unregister(nodeId);
-      this.rmContext.getDispatcher().getEventHandler().handle(
-          new RMNodeReconnectEvent(nodeId, rmNode,
-              request.getRunningApplications()));
+      this.rmContext
+          .getDispatcher()
+          .getEventHandler()
+          .handle(
+              new RMNodeReconnectEvent(nodeId, rmNode, request
+                  .getRunningApplications(), request.getNMContainerStatuses()));
     }
     // On every node manager register we will be clearing NMToken keys if
     // present for any running application.

+ 69 - 42
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java

@@ -601,6 +601,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
         rmNode.httpAddress = newNode.getHttpAddress();
         rmNode.totalCapability = newNode.getTotalCapability();
       
+        handleNMContainerStatus(reconnectEvent.getNMContainerStatuses(), rmNode);
+
         // Reset heartbeat ID since node just restarted.
         rmNode.getLastNodeHeartBeatResponse().setResponseId(0);
       }
@@ -622,6 +624,26 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
       }
       
     }
+
+    private void handleNMContainerStatus(
+        List<NMContainerStatus> nmContainerStatuses, RMNodeImpl rmnode) {
+      List<ContainerStatus> containerStatuses =
+          new ArrayList<ContainerStatus>();
+      for (NMContainerStatus nmContainerStatus : nmContainerStatuses) {
+        containerStatuses.add(createContainerStatus(nmContainerStatus));
+      }
+      rmnode.handleContainerStatus(containerStatuses);
+    }
+
+    private ContainerStatus createContainerStatus(
+        NMContainerStatus remoteContainer) {
+      ContainerStatus cStatus =
+          ContainerStatus.newInstance(remoteContainer.getContainerId(),
+              remoteContainer.getContainerState(),
+              remoteContainer.getDiagnostics(),
+              remoteContainer.getContainerExitStatus());
+      return cStatus;
+    }
   }
   
   public static class UpdateNodeResourceWhenRunningTransition
@@ -747,49 +769,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
         return NodeState.UNHEALTHY;
       }
 
-      // Filter the map to only obtain just launched containers and finished
-      // containers.
-      List<ContainerStatus> newlyLaunchedContainers = 
-          new ArrayList<ContainerStatus>();
-      List<ContainerStatus> completedContainers = 
-          new ArrayList<ContainerStatus>();
-      for (ContainerStatus remoteContainer : statusEvent.getContainers()) {
-        ContainerId containerId = remoteContainer.getContainerId();
-
-        // Don't bother with containers already scheduled for cleanup, or for
-        // applications already killed. The scheduler doens't need to know any
-        // more about this container
-        if (rmNode.containersToClean.contains(containerId)) {
-          LOG.info("Container " + containerId + " already scheduled for " +
-          		"cleanup, no further processing");
-          continue;
-        }
-        if (rmNode.finishedApplications.contains(containerId
-            .getApplicationAttemptId().getApplicationId())) {
-          LOG.info("Container " + containerId
-              + " belongs to an application that is already killed,"
-              + " no further processing");
-          continue;
-        }
+      rmNode.handleContainerStatus(statusEvent.getContainers());
 
-        // Process running containers
-        if (remoteContainer.getState() == ContainerState.RUNNING) {
-          if (!rmNode.launchedContainers.contains(containerId)) {
-            // Just launched container. RM knows about it the first time.
-            rmNode.launchedContainers.add(containerId);
-            newlyLaunchedContainers.add(remoteContainer);
-          }
-        } else {
-          // A finished container
-          rmNode.launchedContainers.remove(containerId);
-          completedContainers.add(remoteContainer);
-        }
-      }
-      if(newlyLaunchedContainers.size() != 0 
-          || completedContainers.size() != 0) {
-        rmNode.nodeUpdateQueue.add(new UpdatedContainerInfo
-            (newlyLaunchedContainers, completedContainers));
-      }
       if(rmNode.nextHeartBeat) {
         rmNode.nextHeartBeat = false;
         rmNode.context.getDispatcher().getEventHandler().handle(
@@ -874,4 +855,50 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
     }
     return nlm.getLabelsOnNode(nodeId);
   }
+
+  private void handleContainerStatus(List<ContainerStatus> containerStatuses) {
+    // Filter the map to only obtain just launched containers and finished
+    // containers.
+    List<ContainerStatus> newlyLaunchedContainers =
+        new ArrayList<ContainerStatus>();
+    List<ContainerStatus> completedContainers =
+        new ArrayList<ContainerStatus>();
+    for (ContainerStatus remoteContainer : containerStatuses) {
+      ContainerId containerId = remoteContainer.getContainerId();
+
+      // Don't bother with containers already scheduled for cleanup, or for
+      // applications already killed. The scheduler doesn't need to know any
+      // more about this container
+      if (containersToClean.contains(containerId)) {
+        LOG.info("Container " + containerId + " already scheduled for "
+            + "cleanup, no further processing");
+        continue;
+      }
+      if (finishedApplications.contains(containerId.getApplicationAttemptId()
+          .getApplicationId())) {
+        LOG.info("Container " + containerId
+            + " belongs to an application that is already killed,"
+            + " no further processing");
+        continue;
+      }
+
+      // Process running containers
+      if (remoteContainer.getState() == ContainerState.RUNNING) {
+        if (!launchedContainers.contains(containerId)) {
+          // Just launched container. RM knows about it the first time.
+          launchedContainers.add(containerId);
+          newlyLaunchedContainers.add(remoteContainer);
+        }
+      } else {
+        // A finished container
+        launchedContainers.remove(containerId);
+        completedContainers.add(remoteContainer);
+      }
+    }
+    if (newlyLaunchedContainers.size() != 0 || completedContainers.size() != 0) {
+      nodeUpdateQueue.add(new UpdatedContainerInfo(newlyLaunchedContainers,
+          completedContainers));
+    }
+  }
+
  }

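The refactoring above matters because handleContainerStatus feeds nodeUpdateQueue, which the scheduler drains on the next NODE_UPDATE event; that is how a completed container reported at re-registration actually gets released. Below is a minimal sketch of that consumer side, assuming the branch-2 RMNode API (pullContainerUpdates and the UpdatedContainerInfo accessors); it is illustrative only, not part of this commit:

    import java.util.List;

    import org.apache.hadoop.yarn.api.records.ContainerStatus;
    import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
    import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;

    class NodeUpdateSketch {
      static void drain(RMNode rmNode) {
        // Drain everything queued since the last heartbeat, including any
        // UpdatedContainerInfo added by the reconnect path above.
        List<UpdatedContainerInfo> updates = rmNode.pullContainerUpdates();
        for (UpdatedContainerInfo info : updates) {
          for (ContainerStatus launched : info.getNewlyLaunchedContainers()) {
            // A real scheduler would record the launch here.
          }
          for (ContainerStatus completed : info.getCompletedContainers()) {
            // A real scheduler would release the container's resources here,
            // which is what the new test's cluster-memory assertions observe.
          }
        }
      }
    }
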
+ 8 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java

@@ -22,16 +22,19 @@ import java.util.List;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 
 public class RMNodeReconnectEvent extends RMNodeEvent {
   private RMNode reconnectedNode;
   private List<ApplicationId> runningApplications;
+  private List<NMContainerStatus> containerStatuses;
 
   public RMNodeReconnectEvent(NodeId nodeId, RMNode newNode,
-      List<ApplicationId> runningApps) {
+      List<ApplicationId> runningApps, List<NMContainerStatus> containerReports) {
     super(nodeId, RMNodeEventType.RECONNECTED);
     reconnectedNode = newNode;
     runningApplications = runningApps;
+    containerStatuses = containerReports;
   }
 
   public RMNode getReconnectedNode() {
@@ -41,4 +44,8 @@ public class RMNodeReconnectEvent extends RMNodeEvent {
   public List<ApplicationId> getRunningApplications() {
     return runningApplications;
   }
+
+  public List<NMContainerStatus> getNMContainerStatuses() {
+    return containerStatuses;
+  }
 }

+ 121 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java

@@ -28,7 +28,9 @@ import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -48,6 +50,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.log4j.Level;
@@ -478,6 +481,124 @@ public class TestApplicationCleanup {
     rm1.stop();
   }
 
+  // The test verifies processing of the NMContainerStatuses that an NM sends
+  // during registration.
+  // 1. Start the cluster (RM, NM), submit an app with 1024MB, launch &
+  // register the AM.
+  // 2. AM sends a ResourceRequest for 1 container with 2048MB of memory.
+  // 3. Verify the number of containers allocated by the RM.
+  // 4. Verify the memory used by the cluster; it should be 3072 (AM memory +
+  // requested memory, 1024 + 2048 = 3072).
+  // 5. Re-register the NM, sending a completed container status.
+  // 6. Verify the memory used; it should be 1024.
+  // 7. Send an AM heartbeat to the RM; the allocate response should contain
+  // the completed container.
+  @Test(timeout = 60000)
+  public void testProcessingNMContainerStatusesOnNMRestart() throws Exception {
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+
+    // 1. Start the cluster (RM, NM), submit an app with 1024MB, launch &
+    // register the AM
+    MockRM rm1 = new MockRM(conf, memStore);
+    rm1.start();
+    int nmMemory = 8192;
+    int amMemory = 1024;
+    int containerMemory = 2048;
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", nmMemory, rm1.getResourceTrackerService());
+    nm1.registerNode();
+
+    RMApp app0 = rm1.submitApp(amMemory);
+    MockAM am0 = MockRM.launchAndRegisterAM(app0, rm1, nm1);
+
+    // 2. AM sends a ResourceRequest for 1 container with 2048MB of memory.
+    int noOfContainers = 1;
+    List<Container> allocateContainers =
+        am0.allocateAndWaitForContainers(noOfContainers, containerMemory, nm1);
+
+    // 3. Verify the number of containers allocated by the RM
+    Assert.assertEquals(noOfContainers, allocateContainers.size());
+    Container container = allocateContainers.get(0);
+
+    nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1, ContainerState.RUNNING);
+    nm1.nodeHeartbeat(am0.getApplicationAttemptId(), container.getId()
+        .getContainerId(), ContainerState.RUNNING);
+
+    rm1.waitForState(app0.getApplicationId(), RMAppState.RUNNING);
+
+    // 4. Verify the memory used by the cluster; it should be 3072 (AM memory +
+    // requested memory, 1024 + 2048 = 3072)
+    ResourceScheduler rs = rm1.getRMContext().getScheduler();
+    int allocatedMB = rs.getRootQueueMetrics().getAllocatedMB();
+    Assert.assertEquals(amMemory + containerMemory, allocatedMB);
+
+    // 5. Re-register the NM, sending a completed container status
+    List<NMContainerStatus> nMContainerStatusForApp =
+        createNMContainerStatusForApp(am0);
+    nm1.registerNode(nMContainerStatusForApp,
+        Arrays.asList(app0.getApplicationId()));
+
+    waitForClusterMemory(nm1, rs, amMemory);
+
+    // 6. Verify the memory used; it should be 1024
+    Assert.assertEquals(amMemory, rs.getRootQueueMetrics().getAllocatedMB());
+
+    // 7. Send an AM heartbeat to the RM; the allocate response should contain
+    // the completed container
+    AllocateRequest req =
+        AllocateRequest.newInstance(0, 0F, new ArrayList<ResourceRequest>(),
+            new ArrayList<ContainerId>(), null);
+    AllocateResponse allocate = am0.allocate(req);
+    List<ContainerStatus> completedContainersStatuses =
+        allocate.getCompletedContainersStatuses();
+    Assert.assertEquals(noOfContainers, completedContainersStatuses.size());
+
+    // After application cleanup, the cluster memory used should be 0
+    nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+    waitForClusterMemory(nm1, rs, 0);
+
+    rm1.stop();
+  }
+
+  private void waitForClusterMemory(MockNM nm1, ResourceScheduler rs,
+      int clusterMemory) throws Exception {
+    int counter = 0;
+    while (rs.getRootQueueMetrics().getAllocatedMB() != clusterMemory) {
+      nm1.nodeHeartbeat(true);
+
+      Thread.sleep(100);
+      if (counter++ == 50) {
+        Assert.fail("Wait for cluster memory timed out. Expected="
+            + clusterMemory + " Actual="
+            + rs.getRootQueueMetrics().getAllocatedMB());
+      }
+    }
+  }
+
+  public static List<NMContainerStatus> createNMContainerStatusForApp(MockAM am) {
+    List<NMContainerStatus> list = new ArrayList<NMContainerStatus>();
+    NMContainerStatus amContainer =
+        createNMContainerStatus(am.getApplicationAttemptId(), 1,
+            ContainerState.RUNNING, 1024);
+    NMContainerStatus completedContainer =
+        createNMContainerStatus(am.getApplicationAttemptId(), 2,
+            ContainerState.COMPLETE, 2048);
+    list.add(amContainer);
+    list.add(completedContainer);
+    return list;
+  }
+
+  public static NMContainerStatus createNMContainerStatus(
+      ApplicationAttemptId appAttemptId, int id, ContainerState containerState,
+      int memory) {
+    ContainerId containerId = ContainerId.newContainerId(appAttemptId, id);
+    NMContainerStatus containerReport =
+        NMContainerStatus.newInstance(containerId, containerState,
+            Resource.newInstance(memory, 1), "recover container", 0,
+            Priority.newInstance(0), 0);
+    return containerReport;
+  }
+
   public static void main(String[] args) throws Exception {
     TestApplicationCleanup t = new TestApplicationCleanup();
     t.testAppCleanup();

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java

@@ -540,7 +540,7 @@ public class TestRMNodeTransitions {
     int initialUnhealthy = cm.getUnhealthyNMs();
     int initialDecommissioned = cm.getNumDecommisionedNMs();
     int initialRebooted = cm.getNumRebootedNMs();
-    node.handle(new RMNodeReconnectEvent(node.getNodeID(), node, null));
+    node.handle(new RMNodeReconnectEvent(node.getNodeID(), node, null, null));
     Assert.assertEquals("Active Nodes", initialActive, cm.getNumActiveNMs());
     Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
     Assert.assertEquals("Unhealthy Nodes",
@@ -614,7 +614,7 @@ public class TestRMNodeTransitions {
     Assert.assertEquals(nmVersion1, node.getNodeManagerVersion());
     RMNodeImpl reconnectingNode = getRunningNode(nmVersion2);
     node.handle(new RMNodeReconnectEvent(node.getNodeID(), reconnectingNode,
-        null));
+        null, null));
     Assert.assertEquals(nmVersion2, node.getNodeManagerVersion());
   }
 }