Browse Source

YARN-2997. Fixed NodeStatusUpdater to not send alreay-sent completed container statuses on heartbeat. Contributed by Chengbing Liu
(cherry picked from commit cc2a745f7e82c9fa6de03242952347c54c52dccc)

(cherry picked from commit e7e6173049adca2a2ae0e1231adcaca8168bec27)
(cherry picked from commit 3c4ed2497b14140f09b3cae4959be6474c4cdc99)

Jian He 10 years ago
parent
commit
3f8da2a9eb

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -63,6 +63,9 @@ Release 2.6.1 - UNRELEASED
     YARN-2922. ConcurrentModificationException in CapacityScheduler's LeafQueue.
     YARN-2922. ConcurrentModificationException in CapacityScheduler's LeafQueue.
     (Rohith Sharmaks via ozawa)
     (Rohith Sharmaks via ozawa)
 
 
+    YARN-2997. Fixed NodeStatusUpdater to not send alreay-sent completed
+    container statuses on heartbeat. (Chengbing Liu via jianhe)
+
 Release 2.6.0 - 2014-11-18
 Release 2.6.0 - 2014-11-18
 
 
   INCOMPATIBLE CHANGES
   INCOMPATIBLE CHANGES

+ 27 - 12
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java

@@ -106,6 +106,9 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
   // the AM finishes it informs the RM to stop the may-be-already-completed
   // the AM finishes it informs the RM to stop the may-be-already-completed
   // containers.
   // containers.
   private final Map<ContainerId, Long> recentlyStoppedContainers;
   private final Map<ContainerId, Long> recentlyStoppedContainers;
+  // Save the reported completed containers in case of lost heartbeat responses.
+  // These completed containers will be sent again till a successful response.
+  private final Map<ContainerId, ContainerStatus> pendingCompletedContainers;
   // Duration for which to track recently stopped container.
   // Duration for which to track recently stopped container.
   private long durationToTrackStoppedContainers;
   private long durationToTrackStoppedContainers;
 
 
@@ -126,6 +129,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
     this.metrics = metrics;
     this.metrics = metrics;
     this.recentlyStoppedContainers =
     this.recentlyStoppedContainers =
         new LinkedHashMap<ContainerId, Long>();
         new LinkedHashMap<ContainerId, Long>();
+    this.pendingCompletedContainers =
+        new HashMap<ContainerId, ContainerStatus>();
   }
   }
 
 
   @Override
   @Override
@@ -358,11 +363,10 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
     List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>();
     List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>();
     for (Container container : this.context.getContainers().values()) {
     for (Container container : this.context.getContainers().values()) {
       ContainerId containerId = container.getContainerId();
       ContainerId containerId = container.getContainerId();
-      ApplicationId applicationId = container.getContainerId()
-          .getApplicationAttemptId().getApplicationId();
+      ApplicationId applicationId = containerId.getApplicationAttemptId()
+          .getApplicationId();
       org.apache.hadoop.yarn.api.records.ContainerStatus containerStatus =
       org.apache.hadoop.yarn.api.records.ContainerStatus containerStatus =
           container.cloneAndGetContainerStatus();
           container.cloneAndGetContainerStatus();
-      containerStatuses.add(containerStatus);
       if (containerStatus.getState() == ContainerState.COMPLETE) {
       if (containerStatus.getState() == ContainerState.COMPLETE) {
         if (isApplicationStopped(applicationId)) {
         if (isApplicationStopped(applicationId)) {
           if (LOG.isDebugEnabled()) {
           if (LOG.isDebugEnabled()) {
@@ -370,14 +374,21 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
                 + containerId + " from NM context.");
                 + containerId + " from NM context.");
           }
           }
           context.getContainers().remove(containerId);
           context.getContainers().remove(containerId);
+          pendingCompletedContainers.put(containerId, containerStatus);
         } else {
         } else {
-          // Adding to finished containers cache. Cache will keep it around at
-          // least for #durationToTrackStoppedContainers duration. In the
-          // subsequent call to stop container it will get removed from cache.
-          addCompletedContainer(container.getContainerId());
+          if (!isContainerRecentlyStopped(containerId)) {
+            pendingCompletedContainers.put(containerId, containerStatus);
+            // Adding to finished containers cache. Cache will keep it around at
+            // least for #durationToTrackStoppedContainers duration. In the
+            // subsequent call to stop container it will get removed from cache.
+            addCompletedContainer(containerId);
+          }
         }
         }
+      } else {
+        containerStatuses.add(containerStatus);
       }
       }
     }
     }
+    containerStatuses.addAll(pendingCompletedContainers.values());
     if (LOG.isDebugEnabled()) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Sending out " + containerStatuses.size()
       LOG.debug("Sending out " + containerStatuses.size()
           + " container statuses: " + containerStatuses);
           + " container statuses: " + containerStatuses);
@@ -397,8 +408,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
         new ArrayList<NMContainerStatus>();
         new ArrayList<NMContainerStatus>();
     for (Container container : this.context.getContainers().values()) {
     for (Container container : this.context.getContainers().values()) {
       ContainerId containerId = container.getContainerId();
       ContainerId containerId = container.getContainerId();
-      ApplicationId applicationId = container.getContainerId()
-          .getApplicationAttemptId().getApplicationId();
+      ApplicationId applicationId = containerId.getApplicationAttemptId()
+          .getApplicationId();
       if (!this.context.getApplications().containsKey(applicationId)) {
       if (!this.context.getApplications().containsKey(applicationId)) {
         context.getContainers().remove(containerId);
         context.getContainers().remove(containerId);
         continue;
         continue;
@@ -410,7 +421,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
         // Adding to finished containers cache. Cache will keep it around at
         // Adding to finished containers cache. Cache will keep it around at
         // least for #durationToTrackStoppedContainers duration. In the
         // least for #durationToTrackStoppedContainers duration. In the
         // subsequent call to stop container it will get removed from cache.
         // subsequent call to stop container it will get removed from cache.
-        addCompletedContainer(container.getContainerId());
+        addCompletedContainer(containerId);
       }
       }
     }
     }
     LOG.info("Sending out " + containerStatuses.size()
     LOG.info("Sending out " + containerStatuses.size()
@@ -457,7 +468,9 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
       ContainerId containerId = iter.next();
       ContainerId containerId = iter.next();
       // remove the container only if the container is at DONE state
       // remove the container only if the container is at DONE state
       Container nmContainer = context.getContainers().get(containerId);
       Container nmContainer = context.getContainers().get(containerId);
-      if (nmContainer != null && nmContainer.getContainerState().equals(
+      if (nmContainer == null) {
+        iter.remove();
+      } else if (nmContainer.getContainerState().equals(
         org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState.DONE)) {
         org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState.DONE)) {
         context.getContainers().remove(containerId);
         context.getContainers().remove(containerId);
         removedContainers.add(containerId);
         removedContainers.add(containerId);
@@ -469,6 +482,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
       LOG.info("Removed completed containers from NM context: "
       LOG.info("Removed completed containers from NM context: "
           + removedContainers);
           + removedContainers);
     }
     }
+    pendingCompletedContainers.clear();
   }
   }
 
 
   private void trackAppsForKeepAlive(List<ApplicationId> appIds) {
   private void trackAppsForKeepAlive(List<ApplicationId> appIds) {
@@ -507,7 +521,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
       recentlyStoppedContainers.clear();
       recentlyStoppedContainers.clear();
     }
     }
   }
   }
-  
+
   @Private
   @Private
   @VisibleForTesting
   @VisibleForTesting
   public void removeVeryOldStoppedContainersFromCache() {
   public void removeVeryOldStoppedContainersFromCache() {
@@ -605,6 +619,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
                   ResourceManagerConstants.RM_INVALID_IDENTIFIER;
                   ResourceManagerConstants.RM_INVALID_IDENTIFIER;
               dispatcher.getEventHandler().handle(
               dispatcher.getEventHandler().handle(
                   new NodeManagerEvent(NodeManagerEventType.RESYNC));
                   new NodeManagerEvent(NodeManagerEventType.RESYNC));
+              pendingCompletedContainers.clear();
               break;
               break;
             }
             }
 
 

+ 34 - 18
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java

@@ -610,14 +610,14 @@ public class TestNodeStatusUpdater {
           <ContainerId>();
           <ContainerId>();
       try {
       try {
         if (heartBeatID == 0) {
         if (heartBeatID == 0) {
-          Assert.assertEquals(request.getNodeStatus().getContainersStatuses()
-            .size(), 0);
-          Assert.assertEquals(context.getContainers().size(), 0);
+          Assert.assertEquals(0, request.getNodeStatus().getContainersStatuses()
+            .size());
+          Assert.assertEquals(0, context.getContainers().size());
         } else if (heartBeatID == 1) {
         } else if (heartBeatID == 1) {
           List<ContainerStatus> statuses =
           List<ContainerStatus> statuses =
               request.getNodeStatus().getContainersStatuses();
               request.getNodeStatus().getContainersStatuses();
-          Assert.assertEquals(statuses.size(), 2);
-          Assert.assertEquals(context.getContainers().size(), 2);
+          Assert.assertEquals(2, statuses.size());
+          Assert.assertEquals(2, context.getContainers().size());
 
 
           boolean container2Exist = false, container3Exist = false;
           boolean container2Exist = false, container3Exist = false;
           for (ContainerStatus status : statuses) {
           for (ContainerStatus status : statuses) {
@@ -643,8 +643,16 @@ public class TestNodeStatusUpdater {
         } else if (heartBeatID == 2 || heartBeatID == 3) {
         } else if (heartBeatID == 2 || heartBeatID == 3) {
           List<ContainerStatus> statuses =
           List<ContainerStatus> statuses =
               request.getNodeStatus().getContainersStatuses();
               request.getNodeStatus().getContainersStatuses();
-          Assert.assertEquals(statuses.size(), 4);
-          Assert.assertEquals(context.getContainers().size(), 4);
+          if (heartBeatID == 2) {
+            // NM should send completed containers again, since the last
+            // heartbeat is lost.
+            Assert.assertEquals(4, statuses.size());
+          } else {
+            // NM should not send completed containers again, since the last
+            // heartbeat is successful.
+            Assert.assertEquals(2, statuses.size());
+          }
+          Assert.assertEquals(4, context.getContainers().size());
 
 
           boolean container2Exist = false, container3Exist = false,
           boolean container2Exist = false, container3Exist = false,
               container4Exist = false, container5Exist = false;
               container4Exist = false, container5Exist = false;
@@ -674,8 +682,14 @@ public class TestNodeStatusUpdater {
               container5Exist = true;
               container5Exist = true;
             }
             }
           }
           }
-          Assert.assertTrue(container2Exist && container3Exist
-              && container4Exist && container5Exist);
+          if (heartBeatID == 2) {
+            Assert.assertTrue(container2Exist && container3Exist
+                && container4Exist && container5Exist);
+          } else {
+            // NM do not send completed containers again
+            Assert.assertTrue(container2Exist && !container3Exist
+                && container4Exist && !container5Exist);
+          }
 
 
           if (heartBeatID == 3) {
           if (heartBeatID == 3) {
             finishedContainersPulledByAM.add(containerStatus3.getContainerId());
             finishedContainersPulledByAM.add(containerStatus3.getContainerId());
@@ -683,8 +697,9 @@ public class TestNodeStatusUpdater {
         } else if (heartBeatID == 4) {
         } else if (heartBeatID == 4) {
           List<ContainerStatus> statuses =
           List<ContainerStatus> statuses =
               request.getNodeStatus().getContainersStatuses();
               request.getNodeStatus().getContainersStatuses();
-          Assert.assertEquals(statuses.size(), 3);
-          Assert.assertEquals(context.getContainers().size(), 3);
+          Assert.assertEquals(2, statuses.size());
+          // Container 3 is acked by AM, hence removed from context
+          Assert.assertEquals(3, context.getContainers().size());
 
 
           boolean container3Exist = false;
           boolean container3Exist = false;
           for (ContainerStatus status : statuses) {
           for (ContainerStatus status : statuses) {
@@ -917,13 +932,14 @@ public class TestNodeStatusUpdater {
     nodeStatusUpdater.removeOrTrackCompletedContainersFromContext(ackedContainers);
     nodeStatusUpdater.removeOrTrackCompletedContainersFromContext(ackedContainers);
 
 
     Set<ContainerId> containerIdSet = new HashSet<ContainerId>();
     Set<ContainerId> containerIdSet = new HashSet<ContainerId>();
-    for (ContainerStatus status : nodeStatusUpdater.getContainerStatuses()) {
+    List<ContainerStatus> containerStatuses = nodeStatusUpdater.getContainerStatuses();
+    for (ContainerStatus status : containerStatuses) {
       containerIdSet.add(status.getContainerId());
       containerIdSet.add(status.getContainerId());
     }
     }
 
 
-    Assert.assertTrue(nodeStatusUpdater.getContainerStatuses().size() == 1);
+    Assert.assertEquals(1, containerStatuses.size());
     // completed container is removed;
     // completed container is removed;
-    Assert.assertFalse(containerIdSet.contains(anyCompletedContainer));
+    Assert.assertFalse(containerIdSet.contains(cId));
     // running container is not removed;
     // running container is not removed;
     Assert.assertTrue(containerIdSet.contains(runningContainerId));
     Assert.assertTrue(containerIdSet.contains(runningContainerId));
   }
   }
@@ -967,15 +983,15 @@ public class TestNodeStatusUpdater {
 
 
     when(application.getApplicationState()).thenReturn(
     when(application.getApplicationState()).thenReturn(
         ApplicationState.FINISHING_CONTAINERS_WAIT);
         ApplicationState.FINISHING_CONTAINERS_WAIT);
-    // The completed container will be sent one time. Then we will delete it.
+    // The completed container will be saved in case of lost heartbeat.
+    Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size());
     Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size());
     Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size());
-    Assert.assertEquals(0, nodeStatusUpdater.getContainerStatuses().size());
 
 
     nm.getNMContext().getContainers().put(cId, anyCompletedContainer);
     nm.getNMContext().getContainers().put(cId, anyCompletedContainer);
     nm.getNMContext().getApplications().remove(appId);
     nm.getNMContext().getApplications().remove(appId);
-    // The completed container will be sent one time. Then we will delete it.
+    // The completed container will be saved in case of lost heartbeat.
+    Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size());
     Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size());
     Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size());
-    Assert.assertEquals(0, nodeStatusUpdater.getContainerStatuses().size());
   }
   }
 
 
   @Test
   @Test