Browse Source

YARN-1372. Ensure all completed containers are reported to the AMs across RM restart. Contributed by Anubhav Dhoot
(cherry picked from commit 0a641496c706fc175e7bf66d69ebf71c7d078e84)

Jian He 10 years ago
parent
commit
3ce97a9efd
21 changed files with 701 additions and 180 deletions
  1. 3 0
      hadoop-yarn-project/CHANGES.txt
  2. 5 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
  3. 67 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
  4. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
  5. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
  6. 48 32
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
  7. 21 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java
  8. 173 61
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
  9. 4 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
  10. 6 6
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
  11. 20 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
  12. 114 19
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
  13. 7 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerFinishedEvent.java
  14. 7 6
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
  15. 3 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java
  16. 41 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeFinishedContainersPulledByAMEvent.java
  17. 39 9
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
  18. 4 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
  19. 17 7
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
  20. 119 24
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
  21. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -208,6 +208,9 @@ Release 2.6.0 - UNRELEASED
     YARN-2001. Added a time threshold for RM to wait before starting container
     allocations after restart/failover. (Jian He via vinodkv)
 
+    YARN-1372. Ensure all completed containers are reported to the AMs across
+    RM restart. (Anubhav Dhoot via jianhe)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java

@@ -30,6 +30,7 @@ public interface NodeHeartbeatResponse {
   NodeAction getNodeAction();
 
   List<ContainerId> getContainersToCleanup();
+  List<ContainerId> getFinishedContainersPulledByAM();
 
   List<ApplicationId> getApplicationsToCleanup();
 
@@ -43,6 +44,10 @@ public interface NodeHeartbeatResponse {
   void setNMTokenMasterKey(MasterKey secretKey);
 
   void addAllContainersToCleanup(List<ContainerId> containers);
+
+  // This tells NM to remove finished containers only after the AM
+  // has actually received it in a previous allocate response
+  void addFinishedContainersPulledByAM(List<ContainerId> containers);
   
   void addAllApplicationsToCleanup(List<ApplicationId> applications);
 

+ 67 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java

@@ -46,6 +46,7 @@ public class NodeHeartbeatResponsePBImpl extends ProtoBase<NodeHeartbeatResponse
   boolean viaProto = false;
   
   private List<ContainerId> containersToCleanup = null;
+  private List<ContainerId> finishedContainersPulledByAM = null;
   private List<ApplicationId> applicationsToCleanup = null;
   private MasterKey containerTokenMasterKey = null;
   private MasterKey nmTokenMasterKey = null;
@@ -73,6 +74,9 @@ public class NodeHeartbeatResponsePBImpl extends ProtoBase<NodeHeartbeatResponse
     if (this.applicationsToCleanup != null) {
       addApplicationsToCleanupToProto();
     }
+    if (this.finishedContainersPulledByAM != null) {
+      addFinishedContainersPulledByAMToProto();
+    }
     if (this.containerTokenMasterKey != null) {
       builder.setContainerTokenMasterKey(
           convertToProtoFormat(this.containerTokenMasterKey));
@@ -199,6 +203,12 @@ public class NodeHeartbeatResponsePBImpl extends ProtoBase<NodeHeartbeatResponse
     return this.containersToCleanup;
   }
 
+  @Override
+  public List<ContainerId> getFinishedContainersPulledByAM() {
+    initFinishedContainersPulledByAM();
+    return this.finishedContainersPulledByAM;
+  }
+
   private void initContainersToCleanup() {
     if (this.containersToCleanup != null) {
       return;
@@ -212,6 +222,19 @@ public class NodeHeartbeatResponsePBImpl extends ProtoBase<NodeHeartbeatResponse
     }
   }
 
+  private void initFinishedContainersPulledByAM() {
+    if (this.finishedContainersPulledByAM != null) {
+      return;
+    }
+    NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
+    List<ContainerIdProto> list = p.getFinishedContainersPulledByAmList();
+    this.finishedContainersPulledByAM = new ArrayList<ContainerId>();
+
+    for (ContainerIdProto c : list) {
+      this.finishedContainersPulledByAM.add(convertFromProtoFormat(c));
+    }
+  }
+
   @Override
   public void addAllContainersToCleanup(
       final List<ContainerId> containersToCleanup) {
@@ -221,6 +244,15 @@ public class NodeHeartbeatResponsePBImpl extends ProtoBase<NodeHeartbeatResponse
     this.containersToCleanup.addAll(containersToCleanup);
   }
 
+  @Override
+  public void addFinishedContainersPulledByAM(
+      final List<ContainerId> finishedContainersPulledByAM) {
+    if (finishedContainersPulledByAM == null)
+      return;
+    initFinishedContainersPulledByAM();
+    this.finishedContainersPulledByAM.addAll(finishedContainersPulledByAM);
+  }
+
   private void addContainersToCleanupToProto() {
     maybeInitBuilder();
     builder.clearContainersToCleanup();
@@ -256,6 +288,41 @@ public class NodeHeartbeatResponsePBImpl extends ProtoBase<NodeHeartbeatResponse
     builder.addAllContainersToCleanup(iterable);
   }
 
+  private void addFinishedContainersPulledByAMToProto() {
+    maybeInitBuilder();
+    builder.clearFinishedContainersPulledByAm();
+    if (finishedContainersPulledByAM == null)
+      return;
+    Iterable<ContainerIdProto> iterable = new Iterable<ContainerIdProto>() {
+
+      @Override
+      public Iterator<ContainerIdProto> iterator() {
+        return new Iterator<ContainerIdProto>() {
+
+          Iterator<ContainerId> iter = finishedContainersPulledByAM.iterator();
+
+          @Override
+          public boolean hasNext() {
+            return iter.hasNext();
+          }
+
+          @Override
+          public ContainerIdProto next() {
+            return convertToProtoFormat(iter.next());
+          }
+
+          @Override
+          public void remove() {
+            throw new UnsupportedOperationException();
+
+          }
+        };
+
+      }
+    };
+    builder.addAllFinishedContainersPulledByAm(iterable);
+  }
+
   @Override
   public List<ApplicationId> getApplicationsToCleanup() {
     initApplicationsToCleanup();

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto

@@ -58,6 +58,7 @@ message NodeHeartbeatResponseProto {
   repeated ApplicationIdProto applications_to_cleanup = 6;
   optional int64 nextHeartBeatInterval = 7;
   optional string diagnostics_message = 8;
+  repeated ContainerIdProto finished_containers_pulled_by_am = 9;
 }
 
 message NMContainerStatusProto {

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java

@@ -311,7 +311,7 @@ public class NodeManager extends CompositeService
   public static class NMContext implements Context {
 
     private NodeId nodeId = null;
-    private final ConcurrentMap<ApplicationId, Application> applications =
+    protected final ConcurrentMap<ApplicationId, Application> applications =
         new ConcurrentHashMap<ApplicationId, Application>();
     protected final ConcurrentMap<ContainerId, Container> containers =
         new ConcurrentSkipListMap<ContainerId, Container>();

+ 48 - 32
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java

@@ -104,11 +104,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
   // Duration for which to track recently stopped container.
   private long durationToTrackStoppedContainers;
 
-  // This is used to track the current completed containers when nodeheartBeat
-  // is called. These completed containers will be removed from NM context after
-  // nodeHeartBeat succeeds and the response from the nodeHeartBeat is
-  // processed.
-  private final Set<ContainerId> previousCompletedContainers;
   private final NodeHealthCheckerService healthChecker;
   private final NodeManagerMetrics metrics;
 
@@ -125,7 +120,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
     this.metrics = metrics;
     this.recentlyStoppedContainers =
         new LinkedHashMap<ContainerId, Long>();
-    this.previousCompletedContainers = new HashSet<ContainerId>();
   }
 
   @Override
@@ -331,7 +325,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
     return appList;
   }
 
-  private NodeStatus getNodeStatus(int responseId) {
+  private NodeStatus getNodeStatus(int responseId) throws IOException {
 
     NodeHealthStatus nodeHealthStatus = this.context.getNodeHealthStatus();
     nodeHealthStatus.setHealthReport(healthChecker.getHealthReport());
@@ -352,11 +346,18 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
 
   // Iterate through the NMContext and clone and get all the containers'
   // statuses. If it's a completed container, add into the
-  // recentlyStoppedContainers and previousCompletedContainers collections.
+  // recentlyStoppedContainers collections.
   @VisibleForTesting
-  protected List<ContainerStatus> getContainerStatuses() {
+  protected List<ContainerStatus> getContainerStatuses() throws IOException {
     List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>();
     for (Container container : this.context.getContainers().values()) {
+      ContainerId containerId = container.getContainerId();
+      ApplicationId applicationId = container.getContainerId()
+          .getApplicationAttemptId().getApplicationId();
+      if (!this.context.getApplications().containsKey(applicationId)) {
+        context.getContainers().remove(containerId);
+        continue;
+      }
       org.apache.hadoop.yarn.api.records.ContainerStatus containerStatus =
           container.cloneAndGetContainerStatus();
       containerStatuses.add(containerStatus);
@@ -381,10 +382,17 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
   }
 
   // These NMContainerStatus are sent on NM registration and used by YARN only.
-  private List<NMContainerStatus> getNMContainerStatuses() {
+  private List<NMContainerStatus> getNMContainerStatuses() throws IOException {
     List<NMContainerStatus> containerStatuses =
         new ArrayList<NMContainerStatus>();
     for (Container container : this.context.getContainers().values()) {
+      ContainerId containerId = container.getContainerId();
+      ApplicationId applicationId = container.getContainerId()
+          .getApplicationAttemptId().getApplicationId();
+      if (!this.context.getApplications().containsKey(applicationId)) {
+        context.getContainers().remove(containerId);
+        continue;
+      }
       NMContainerStatus status =
           container.getNMContainerStatus();
       containerStatuses.add(status);
@@ -402,26 +410,30 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
 
   @Override
   public void addCompletedContainer(ContainerId containerId) {
-    synchronized (previousCompletedContainers) {
-      previousCompletedContainers.add(containerId);
-    }
     synchronized (recentlyStoppedContainers) {
       removeVeryOldStoppedContainersFromCache();
-      recentlyStoppedContainers.put(containerId,
-        System.currentTimeMillis() + durationToTrackStoppedContainers);
+      if (!recentlyStoppedContainers.containsKey(containerId)) {
+        recentlyStoppedContainers.put(containerId,
+            System.currentTimeMillis() + durationToTrackStoppedContainers);
+      }
     }
   }
 
-  private void removeCompletedContainersFromContext() {
-    synchronized (previousCompletedContainers) {
-      if (!previousCompletedContainers.isEmpty()) {
-        for (ContainerId containerId : previousCompletedContainers) {
-          this.context.getContainers().remove(containerId);
-        }
-        LOG.info("Removed completed containers from NM context: "
-            + previousCompletedContainers);
-        previousCompletedContainers.clear();
-      }
+  @VisibleForTesting
+  @Private
+  public void removeCompletedContainersFromContext(
+      List<ContainerId>containerIds) throws IOException {
+    Set<ContainerId> removedContainers = new HashSet<ContainerId>();
+
+    // If the AM has pulled the completedContainer it can be removed
+    for (ContainerId containerId : containerIds) {
+      context.getContainers().remove(containerId);
+      removedContainers.add(containerId);
+    }
+
+    if (!removedContainers.isEmpty()) {
+      LOG.info("Removed completed containers from NM context: " +
+          removedContainers);
     }
   }
 
@@ -454,7 +466,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
       return recentlyStoppedContainers.containsKey(containerId);
     }
   }
-  
+
   @Override
   public void clearFinishedContainersFromCache() {
     synchronized (recentlyStoppedContainers) {
@@ -472,11 +484,13 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
       while (i.hasNext()) {
         ContainerId cid = i.next();
         if (recentlyStoppedContainers.get(cid) < currentTime) {
-          i.remove();
-          try {
-            context.getNMStateStore().removeContainer(cid);
-          } catch (IOException e) {
-            LOG.error("Unable to remove container " + cid + " in store", e);
+          if (!context.getContainers().containsKey(cid)) {
+            i.remove();
+            try {
+              context.getNMStateStore().removeContainer(cid);
+            } catch (IOException e) {
+              LOG.error("Unable to remove container " + cid + " in store", e);
+            }
           }
         } else {
           break;
@@ -542,7 +556,9 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
             // don't want to remove the completed containers before resync
             // because these completed containers will be reported back to RM
             // when NM re-registers with RM.
-            removeCompletedContainersFromContext();
+            // Only remove the cleanedup containers that are acked
+            removeCompletedContainersFromContext(response
+                  .getFinishedContainersPulledByAM());
 
             lastHeartBeatID = response.getResponseId();
             List<ContainerId> containersToCleanup = response

+ 21 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java

@@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequ
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
@@ -247,6 +248,10 @@ public class TestNodeManagerResync {
                   // put the completed container into the context
                   getNMContext().getContainers().put(
                     testCompleteContainer.getContainerId(), container);
+                  getNMContext().getApplications().put(
+                      testCompleteContainer.getContainerId()
+                          .getApplicationAttemptId().getApplicationId(),
+                      mock(Application.class));
                 } else {
                   // second register contains the completed container info.
                   List<NMContainerStatus> statuses =
@@ -382,9 +387,17 @@ public class TestNodeManagerResync {
             if (containersShouldBePreserved) {
               Assert.assertFalse(containers.isEmpty());
               Assert.assertTrue(containers.containsKey(existingCid));
+              Assert.assertEquals(ContainerState.RUNNING,
+                  containers.get(existingCid)
+                  .cloneAndGetContainerStatus().getState());
             } else {
-              // ensure that containers are empty before restart nodeStatusUpdater
-              Assert.assertTrue(containers.isEmpty());
+              // ensure that containers are empty or are completed before
+              // restart nodeStatusUpdater
+              if (!containers.isEmpty()) {
+                Assert.assertEquals(ContainerState.COMPLETE,
+                    containers.get(existingCid)
+                        .cloneAndGetContainerStatus().getState());
+              }
             }
             super.rebootNodeStatusUpdaterAndRegisterWithRM();
           }
@@ -465,7 +478,12 @@ public class TestNodeManagerResync {
 
         try {
           // ensure that containers are empty before restart nodeStatusUpdater
-          Assert.assertTrue(containers.isEmpty());
+          if (!containers.isEmpty()) {
+            for (Container container: containers.values()) {
+              Assert.assertEquals(ContainerState.COMPLETE,
+                  container.cloneAndGetContainerStatus().getState());
+            }
+          }
           super.rebootNodeStatusUpdaterAndRegisterWithRM();
           // After this point new containers are free to be launched, except
           // containers from previous RM

+ 173 - 61
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java

@@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.client.RMProxy;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
@@ -180,7 +181,7 @@ public class TestNodeStatusUpdater {
       Map<ApplicationId, List<ContainerStatus>> map =
           new HashMap<ApplicationId, List<ContainerStatus>>();
       for (ContainerStatus cs : containers) {
-        ApplicationId applicationId = 
+        ApplicationId applicationId =
             cs.getContainerId().getApplicationAttemptId().getApplicationId();
         List<ContainerStatus> appContainers = map.get(applicationId);
         if (appContainers == null) {
@@ -205,10 +206,10 @@ public class TestNodeStatusUpdater {
       nodeStatus.setResponseId(heartBeatID++);
       Map<ApplicationId, List<ContainerStatus>> appToContainers =
           getAppToContainerStatusMap(nodeStatus.getContainersStatuses());
-      
+
       ApplicationId appId1 = ApplicationId.newInstance(0, 1);
       ApplicationId appId2 = ApplicationId.newInstance(0, 2);
-      
+
       if (heartBeatID == 1) {
         Assert.assertEquals(0, nodeStatus.getContainersStatuses().size());
 
@@ -419,7 +420,7 @@ public class TestNodeStatusUpdater {
   }
 
   private class MyNodeManager extends NodeManager {
-    
+
     private MyNodeStatusUpdater3 nodeStatusUpdater;
     @Override
     protected NodeStatusUpdater createNodeStatusUpdater(Context context,
@@ -433,7 +434,7 @@ public class TestNodeStatusUpdater {
       return this.nodeStatusUpdater;
     }
   }
-  
+
   private class MyNodeManager2 extends NodeManager {
     public boolean isStopped = false;
     private NodeStatusUpdater nodeStatusUpdater;
@@ -467,7 +468,7 @@ public class TestNodeStatusUpdater {
       syncBarrier.await(10000, TimeUnit.MILLISECONDS);
     }
   }
-  // 
+  //
   private class MyResourceTracker2 implements ResourceTracker {
     public NodeAction heartBeatNodeAction = NodeAction.NORMAL;
     public NodeAction registerNodeAction = NodeAction.NORMAL;
@@ -478,7 +479,7 @@ public class TestNodeStatusUpdater {
     public RegisterNodeManagerResponse registerNodeManager(
         RegisterNodeManagerRequest request) throws YarnException,
         IOException {
-      
+
       RegisterNodeManagerResponse response = recordFactory
           .newRecordInstance(RegisterNodeManagerResponse.class);
       response.setNodeAction(registerNodeAction );
@@ -493,7 +494,7 @@ public class TestNodeStatusUpdater {
         throws YarnException, IOException {
       NodeStatus nodeStatus = request.getNodeStatus();
       nodeStatus.setResponseId(heartBeatID++);
-      
+
       NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils.
           newNodeHeartbeatResponse(heartBeatID, heartBeatNodeAction, null,
               null, null, null, 1000L);
@@ -501,7 +502,7 @@ public class TestNodeStatusUpdater {
       return nhResponse;
     }
   }
-  
+
   private class MyResourceTracker3 implements ResourceTracker {
     public NodeAction heartBeatNodeAction = NodeAction.NORMAL;
     public NodeAction registerNodeAction = NodeAction.NORMAL;
@@ -513,7 +514,7 @@ public class TestNodeStatusUpdater {
     MyResourceTracker3(Context context) {
       this.context = context;
     }
-    
+
     @Override
     public RegisterNodeManagerResponse registerNodeManager(
         RegisterNodeManagerRequest request) throws YarnException,
@@ -564,6 +565,14 @@ public class TestNodeStatusUpdater {
     public NodeAction registerNodeAction = NodeAction.NORMAL;
     public NodeAction heartBeatNodeAction = NodeAction.NORMAL;
     private Context context;
+    private final ContainerStatus containerStatus2 =
+        createContainerStatus(2, ContainerState.RUNNING);
+    private final ContainerStatus containerStatus3 =
+        createContainerStatus(3, ContainerState.COMPLETE);
+    private final ContainerStatus containerStatus4 =
+        createContainerStatus(4, ContainerState.RUNNING);
+    private final ContainerStatus containerStatus5 =
+        createContainerStatus(5, ContainerState.COMPLETE);
 
     public MyResourceTracker4(Context context) {
       this.context = context;
@@ -583,6 +592,8 @@ public class TestNodeStatusUpdater {
     @Override
     public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
         throws YarnException, IOException {
+      List<ContainerId> finishedContainersPulledByAM = new ArrayList
+          <ContainerId>();
       try {
         if (heartBeatID == 0) {
           Assert.assertEquals(request.getNodeStatus().getContainersStatuses()
@@ -594,10 +605,6 @@ public class TestNodeStatusUpdater {
           Assert.assertEquals(statuses.size(), 2);
           Assert.assertEquals(context.getContainers().size(), 2);
 
-          ContainerStatus containerStatus2 =
-              createContainerStatus(2, ContainerState.RUNNING);
-          ContainerStatus containerStatus3 =
-              createContainerStatus(3, ContainerState.COMPLETE);
           boolean container2Exist = false, container3Exist = false;
           for (ContainerStatus status : statuses) {
             if (status.getContainerId().equals(
@@ -619,23 +626,14 @@ public class TestNodeStatusUpdater {
           // nodeStatusUpdaterRunnable, otherwise nm just shuts down and the
           // test passes.
           throw new YarnRuntimeException("Lost the heartbeat response");
-        } else if (heartBeatID == 2) {
+        } else if (heartBeatID == 2 || heartBeatID == 3) {
           List<ContainerStatus> statuses =
               request.getNodeStatus().getContainersStatuses();
           Assert.assertEquals(statuses.size(), 4);
           Assert.assertEquals(context.getContainers().size(), 4);
 
-          ContainerStatus containerStatus2 =
-              createContainerStatus(2, ContainerState.RUNNING);
-          ContainerStatus containerStatus3 =
-              createContainerStatus(3, ContainerState.COMPLETE);
-          ContainerStatus containerStatus4 =
-              createContainerStatus(4, ContainerState.RUNNING);
-          ContainerStatus containerStatus5 =
-              createContainerStatus(5, ContainerState.COMPLETE);
-
-          boolean container2Exist = false, container3Exist = false, container4Exist =
-              false, container5Exist = false;
+          boolean container2Exist = false, container3Exist = false,
+              container4Exist = false, container5Exist = false;
           for (ContainerStatus status : statuses) {
             if (status.getContainerId().equals(
               containerStatus2.getContainerId())) {
@@ -664,6 +662,24 @@ public class TestNodeStatusUpdater {
           }
           Assert.assertTrue(container2Exist && container3Exist
               && container4Exist && container5Exist);
+
+          if (heartBeatID == 3) {
+            finishedContainersPulledByAM.add(containerStatus3.getContainerId());
+          }
+        } else if (heartBeatID == 4) {
+          List<ContainerStatus> statuses =
+              request.getNodeStatus().getContainersStatuses();
+          Assert.assertEquals(statuses.size(), 3);
+          Assert.assertEquals(context.getContainers().size(), 3);
+
+          boolean container3Exist = false;
+          for (ContainerStatus status : statuses) {
+            if (status.getContainerId().equals(
+                containerStatus3.getContainerId())) {
+              container3Exist = true;
+            }
+          }
+          Assert.assertFalse(container3Exist);
         }
       } catch (AssertionError error) {
         error.printStackTrace();
@@ -676,6 +692,7 @@ public class TestNodeStatusUpdater {
       NodeHeartbeatResponse nhResponse =
           YarnServerBuilderUtils.newNodeHeartbeatResponse(heartBeatID,
             heartBeatNodeAction, null, null, null, null, 1000L);
+      nhResponse.addFinishedContainersPulledByAM(finishedContainersPulledByAM);
       return nhResponse;
     }
   }
@@ -686,7 +703,7 @@ public class TestNodeStatusUpdater {
     public RegisterNodeManagerResponse registerNodeManager(
         RegisterNodeManagerRequest request) throws YarnException,
         IOException {
-      
+
       RegisterNodeManagerResponse response = recordFactory
           .newRecordInstance(RegisterNodeManagerResponse.class);
       response.setNodeAction(registerNodeAction );
@@ -694,7 +711,7 @@ public class TestNodeStatusUpdater {
       response.setNMTokenMasterKey(createMasterKey());
       return response;
     }
-    
+
     @Override
     public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
         throws YarnException, IOException {
@@ -767,11 +784,11 @@ public class TestNodeStatusUpdater {
     lfs.delete(new Path(basedir.getPath()), true);
   }
 
-  @Test(timeout = 90000)                                                      
-  public void testRecentlyFinishedContainers() throws Exception {             
-    NodeManager nm = new NodeManager();                                       
-    YarnConfiguration conf = new YarnConfiguration();                         
-    conf.set(                                                                 
+  @Test(timeout = 90000)
+  public void testRecentlyFinishedContainers() throws Exception {
+    NodeManager nm = new NodeManager();
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.set(
         NodeStatusUpdaterImpl.YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS,
         "10000");                                                             
     nm.init(conf);                                                            
@@ -780,27 +797,112 @@ public class TestNodeStatusUpdater {
     ApplicationId appId = ApplicationId.newInstance(0, 0);                    
     ApplicationAttemptId appAttemptId =                                       
         ApplicationAttemptId.newInstance(appId, 0);                           
-    ContainerId cId = ContainerId.newInstance(appAttemptId, 0);               
-                                                                              
-                                                                              
+    ContainerId cId = ContainerId.newInstance(appAttemptId, 0);
+    nm.getNMContext().getApplications().putIfAbsent(appId,
+        mock(Application.class));
+    nm.getNMContext().getContainers().putIfAbsent(cId, mock(Container.class));
+
     nodeStatusUpdater.addCompletedContainer(cId);
     Assert.assertTrue(nodeStatusUpdater.isContainerRecentlyStopped(cId));     
-                                                                              
+
+    nm.getNMContext().getContainers().remove(cId);
     long time1 = System.currentTimeMillis();                                  
     int waitInterval = 15;                                                    
     while (waitInterval-- > 0                                                 
         && nodeStatusUpdater.isContainerRecentlyStopped(cId)) {               
-      nodeStatusUpdater.removeVeryOldStoppedContainersFromCache();          
+      nodeStatusUpdater.removeVeryOldStoppedContainersFromCache();
       Thread.sleep(1000);                                                     
     }                                                                         
-    long time2 = System.currentTimeMillis();                                  
+    long time2 = System.currentTimeMillis();
     // By this time the container will be removed from cache. need to verify.
-    Assert.assertFalse(nodeStatusUpdater.isContainerRecentlyStopped(cId));    
-    Assert.assertTrue((time2 - time1) >= 10000 && (time2 -time1) <= 250000);  
-  }                                                                           
-                                                                              
+    Assert.assertFalse(nodeStatusUpdater.isContainerRecentlyStopped(cId));
+    Assert.assertTrue((time2 - time1) >= 10000 && (time2 - time1) <= 250000);
+  }
 
+  @Test(timeout = 90000)
+  public void testRemovePreviousCompletedContainersFromContext() throws Exception {
+    NodeManager nm = new NodeManager();
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.set(
+        NodeStatusUpdaterImpl
+            .YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS,
+        "10000");
+    nm.init(conf);
+    NodeStatusUpdaterImpl nodeStatusUpdater =
+        (NodeStatusUpdaterImpl) nm.getNodeStatusUpdater();
+    ApplicationId appId = ApplicationId.newInstance(0, 0);
+    ApplicationAttemptId appAttemptId =
+        ApplicationAttemptId.newInstance(appId, 0);
   
+    ContainerId cId = ContainerId.newInstance(appAttemptId, 1);
+    Token containerToken =
+        BuilderUtils.newContainerToken(cId, "anyHost", 1234, "anyUser",
+            BuilderUtils.newResource(1024, 1), 0, 123,
+            "password".getBytes(), 0);
+    Container anyCompletedContainer = new ContainerImpl(conf, null,
+        null, null, null, null,
+        BuilderUtils.newContainerTokenIdentifier(containerToken)) {
+
+      @Override
+      public ContainerState getCurrentState() {
+        return ContainerState.COMPLETE;
+      }
+    };
+
+    nm.getNMContext().getApplications().putIfAbsent(appId,
+        mock(Application.class));
+    nm.getNMContext().getContainers().put(cId, anyCompletedContainer);
+    Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size());
+
+    List<ContainerId> ackedContainers = new ArrayList<ContainerId>();
+    ackedContainers.add(cId);
+
+    nodeStatusUpdater.removeCompletedContainersFromContext(ackedContainers);
+    Assert.assertTrue(nodeStatusUpdater.getContainerStatuses().isEmpty());
+  }
+
+  @Test
+  public void testCleanedupApplicationContainerCleanup() throws IOException {
+    NodeManager nm = new NodeManager();
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.set(NodeStatusUpdaterImpl
+            .YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS,
+        "1000000");
+    nm.init(conf);
+
+    NodeStatusUpdaterImpl nodeStatusUpdater =
+        (NodeStatusUpdaterImpl) nm.getNodeStatusUpdater();
+    ApplicationId appId = ApplicationId.newInstance(0, 0);
+    ApplicationAttemptId appAttemptId =
+        ApplicationAttemptId.newInstance(appId, 0);
+
+    ContainerId cId = ContainerId.newInstance(appAttemptId, 1);
+    Token containerToken =
+        BuilderUtils.newContainerToken(cId, "anyHost", 1234, "anyUser",
+            BuilderUtils.newResource(1024, 1), 0, 123,
+            "password".getBytes(), 0);
+    Container anyCompletedContainer = new ContainerImpl(conf, null,
+        null, null, null, null,
+        BuilderUtils.newContainerTokenIdentifier(containerToken)) {
+
+      @Override
+      public ContainerState getCurrentState() {
+        return ContainerState.COMPLETE;
+      }
+    };
+
+    nm.getNMContext().getApplications().putIfAbsent(appId,
+        mock(Application.class));
+    nm.getNMContext().getContainers().put(cId, anyCompletedContainer);
+
+    Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size());
+
+    nm.getNMContext().getApplications().remove(appId);
+    nodeStatusUpdater.removeCompletedContainersFromContext(new ArrayList
+        <ContainerId>());
+    Assert.assertEquals(0, nodeStatusUpdater.getContainerStatuses().size());
+  }
+
   @Test
   public void testNMRegistration() throws InterruptedException {
     nm = new NodeManager() {
@@ -860,7 +962,7 @@ public class TestNodeStatusUpdater {
 
     nm.stop();
   }
-  
+
   @Test
   public void testStopReentrant() throws Exception {
     final AtomicInteger numCleanups = new AtomicInteger(0);
@@ -875,7 +977,7 @@ public class TestNodeStatusUpdater {
         myNodeStatusUpdater.resourceTracker = myResourceTracker2;
         return myNodeStatusUpdater;
       }
-      
+
       @Override
       protected ContainerManagerImpl createContainerManager(Context context,
           ContainerExecutor exec, DeletionService del,
@@ -897,7 +999,7 @@ public class TestNodeStatusUpdater {
     YarnConfiguration conf = createNMConfig();
     nm.init(conf);
     nm.start();
-    
+
     int waitCount = 0;
     while (heartBeatID < 1 && waitCount++ != 200) {
       Thread.sleep(500);
@@ -906,7 +1008,7 @@ public class TestNodeStatusUpdater {
 
     // Meanwhile call stop directly as the shutdown hook would
     nm.stop();
-    
+
     // NM takes a while to reach the STOPPED state.
     waitCount = 0;
     while (nm.getServiceState() != STATE.STOPPED && waitCount++ != 20) {
@@ -1172,9 +1274,13 @@ public class TestNodeStatusUpdater {
     nm.start();
 
     int waitCount = 0;
-    while (heartBeatID <= 3 && waitCount++ != 20) {
+    while (heartBeatID <= 4 && waitCount++ != 20) {
       Thread.sleep(500);
     }
+    if (heartBeatID <= 4) {
+      Assert.fail("Failed to get all heartbeats in time, " +
+          "heartbeatID:" + heartBeatID);
+    }
     if(assertionFailedInThread.get()) {
       Assert.fail("ContainerStatus Backup failed");
     }
@@ -1182,7 +1288,7 @@ public class TestNodeStatusUpdater {
   }
 
   @Test(timeout = 200000)
-  public void testNodeStatusUpdaterRetryAndNMShutdown() 
+  public void testNodeStatusUpdaterRetryAndNMShutdown()
       throws Exception {
     final long connectionWaitSecs = 1000;
     final long connectionRetryIntervalMs = 1000;
@@ -1190,7 +1296,7 @@ public class TestNodeStatusUpdater {
     conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS,
         connectionWaitSecs);
     conf.setLong(YarnConfiguration
-        .RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS,
+            .RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS,
         connectionRetryIntervalMs);
     conf.setLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS, 5000);
     conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
@@ -1281,30 +1387,36 @@ public class TestNodeStatusUpdater {
       } else if (heartBeatID == 1) {
         ContainerStatus containerStatus2 =
             createContainerStatus(2, ContainerState.RUNNING);
-        Container container2 = getMockContainer(containerStatus2);
-        containers.put(containerStatus2.getContainerId(), container2);
+        putMockContainer(containerStatus2);
 
         ContainerStatus containerStatus3 =
             createContainerStatus(3, ContainerState.COMPLETE);
-        Container container3 = getMockContainer(containerStatus3);
-        containers.put(containerStatus3.getContainerId(), container3);
+        putMockContainer(containerStatus3);
         return containers;
       } else if (heartBeatID == 2) {
         ContainerStatus containerStatus4 =
             createContainerStatus(4, ContainerState.RUNNING);
-        Container container4 = getMockContainer(containerStatus4);
-        containers.put(containerStatus4.getContainerId(), container4);
+        putMockContainer(containerStatus4);
 
         ContainerStatus containerStatus5 =
             createContainerStatus(5, ContainerState.COMPLETE);
-        Container container5 = getMockContainer(containerStatus5);
-        containers.put(containerStatus5.getContainerId(), container5);
+        putMockContainer(containerStatus5);
+        return containers;
+      } else if (heartBeatID == 3 || heartBeatID == 4) {
         return containers;
       } else {
         containers.clear();
         return containers;
       }
     }
+
+    private void putMockContainer(ContainerStatus containerStatus) {
+      Container container = getMockContainer(containerStatus);
+      containers.put(containerStatus.getContainerId(), container);
+      applications.putIfAbsent(containerStatus.getContainerId()
+          .getApplicationAttemptId().getApplicationId(),
+          mock(Application.class));
+    }
   }
 
   public static ContainerStatus createContainerStatus(int id,
@@ -1345,7 +1457,7 @@ public class TestNodeStatusUpdater {
         throw e;
       }
     }
-    
+
     // the service should be stopped
     Assert.assertEquals("NM state is wrong!", STATE.STOPPED, nm
         .getServiceState());
@@ -1364,7 +1476,7 @@ public class TestNodeStatusUpdater {
     }
     conf.setInt(YarnConfiguration.NM_PMEM_MB, 5 * 1024); // 5GB
     conf.set(YarnConfiguration.NM_ADDRESS, localhostAddress + ":12345");
-    conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, localhostAddress + ":12346");  
+    conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, localhostAddress + ":12346");
     conf.set(YarnConfiguration.NM_LOG_DIRS, logsDir.getAbsolutePath());
     conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
       remoteLogsDir.getAbsolutePath());
@@ -1372,7 +1484,7 @@ public class TestNodeStatusUpdater {
     conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
     return conf;
   }
-  
+
   private NodeManager getNodeManager(final NodeAction nodeHeartBeatAction) {
     return new NodeManager() {
       @Override

+ 4 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java

@@ -198,7 +198,7 @@ public class ResourceTrackerService extends AbstractService implements
    */
   @SuppressWarnings("unchecked")
   @VisibleForTesting
-  void handleNMContainerStatus(NMContainerStatus containerStatus) {
+  void handleNMContainerStatus(NMContainerStatus containerStatus, NodeId nodeId) {
     ApplicationAttemptId appAttemptId =
         containerStatus.getContainerId().getApplicationAttemptId();
     RMApp rmApp =
@@ -229,7 +229,8 @@ public class ResourceTrackerService extends AbstractService implements
             containerStatus.getContainerExitStatus());
       // sending master container finished event.
       RMAppAttemptContainerFinishedEvent evt =
-          new RMAppAttemptContainerFinishedEvent(appAttemptId, status);
+          new RMAppAttemptContainerFinishedEvent(appAttemptId, status,
+              nodeId);
       rmContext.getDispatcher().getEventHandler().handle(evt);
     }
   }
@@ -324,7 +325,7 @@ public class ResourceTrackerService extends AbstractService implements
         LOG.info("received container statuses on node manager register :"
             + request.getNMContainerStatuses());
         for (NMContainerStatus status : request.getNMContainerStatuses()) {
-          handleNMContainerStatus(status);
+          handleNMContainerStatus(status, nodeId);
         }
       }
     }

+ 6 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java

@@ -1181,7 +1181,7 @@ public class RMAppImpl implements RMApp, Recoverable {
       int numberOfFailure = app.getNumFailedAppAttempts();
       if (!app.submissionContext.getUnmanagedAM()
           && numberOfFailure < app.maxAppAttempts) {
-        boolean transferStateFromPreviousAttempt = false;
+        boolean transferStateFromPreviousAttempt;
         RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event;
         transferStateFromPreviousAttempt =
             failedEvent.getTransferStateFromPreviousAttempt();
@@ -1191,11 +1191,11 @@ public class RMAppImpl implements RMApp, Recoverable {
         // Transfer the state from the previous attempt to the current attempt.
         // Note that the previous failed attempt may still be collecting the
         // container events from the scheduler and update its data structures
-        // before the new attempt is created.
-        if (transferStateFromPreviousAttempt) {
-          ((RMAppAttemptImpl) app.currentAttempt)
-            .transferStateFromPreviousAttempt(oldAttempt);
-        }
+        // before the new attempt is created. We always transferState for
+        // finished containers so that they can be acked to NM,
+        // but when pulling finished container we will check this flag again.
+        ((RMAppAttemptImpl) app.currentAttempt)
+          .transferStateFromPreviousAttempt(oldAttempt);
         return initialState;
       } else {
         if (numberOfFailure >= app.maxAppAttempts) {

+ 20 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
 
 import java.util.List;
+import java.util.concurrent.ConcurrentMap;
 
 import javax.crypto.SecretKey;
 
@@ -31,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -120,13 +122,28 @@ public interface RMAppAttempt extends EventHandler<RMAppAttemptEvent> {
   List<ContainerStatus> pullJustFinishedContainers();
 
   /**
-   * Return the list of last set of finished containers. This does not reset the
-   * finished containers.
-   * @return the list of just finished contianers, this does not reset the
+   * Returns a reference to the map of last set of finished containers to the
+   * corresponding node. This does not reset the finished containers.
+   * @return the list of just finished containers, this does not reset the
    * finished containers.
    */
+  ConcurrentMap<NodeId, List<ContainerStatus>>
+      getJustFinishedContainersReference();
+
+  /**
+   * Return the list of last set of finished containers. This does not reset
+   * the finished containers.
+   * @return the list of just finished containers
+   */
   List<ContainerStatus> getJustFinishedContainers();
 
+  /**
+   * The map of conatiners per Node that are already sent to the AM.
+   * @return map of per node list of finished container status sent to AM
+   */
+  ConcurrentMap<NodeId, List<ContainerStatus>>
+      getFinishedContainersSentToAMReference();
+
   /**
    * The container on which the Application Master is running.
    * @return the {@link Container} on which the application master is running.

+ 114 - 19
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java

@@ -24,9 +24,12 @@ import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@@ -52,6 +55,7 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
@@ -83,6 +87,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeFinishedContainersPulledByAMEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
@@ -129,9 +134,16 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
   private final ApplicationSubmissionContext submissionContext;
   private Token<AMRMTokenIdentifier> amrmToken = null;
   private SecretKey clientTokenMasterKey = null;
-  
-  private List<ContainerStatus> justFinishedContainers =
-    new ArrayList<ContainerStatus>();
+
+  private ConcurrentMap<NodeId, List<ContainerStatus>>
+      justFinishedContainers =
+      new ConcurrentHashMap<NodeId, List<ContainerStatus>>();
+  // Tracks the previous finished containers that are waiting to be
+  // verified as received by the AM. If the AM sends the next allocate
+  // request it implicitly acks this list.
+  private ConcurrentMap<NodeId, List<ContainerStatus>>
+      finishedContainersSentToAM =
+      new ConcurrentHashMap<NodeId, List<ContainerStatus>>();
   private Container masterContainer;
 
   private float progress = 0;
@@ -627,9 +639,27 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
     }
   }
 
+  @VisibleForTesting
   @Override
   public List<ContainerStatus> getJustFinishedContainers() {
     this.readLock.lock();
+    try {
+      List<ContainerStatus> returnList = new ArrayList<ContainerStatus>();
+      for (Collection<ContainerStatus> containerStatusList :
+          justFinishedContainers.values()) {
+        returnList.addAll(containerStatusList);
+      }
+      return returnList;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  @Override
+  public ConcurrentMap<NodeId, List<ContainerStatus>>
+  getJustFinishedContainersReference
+      () {
+    this.readLock.lock();
     try {
       return this.justFinishedContainers;
     } finally {
@@ -637,15 +667,68 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
     }
   }
 
+  @Override
+  public ConcurrentMap<NodeId, List<ContainerStatus>>
+  getFinishedContainersSentToAMReference() {
+    this.readLock.lock();
+    try {
+      return this.finishedContainersSentToAM;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
   @Override
   public List<ContainerStatus> pullJustFinishedContainers() {
     this.writeLock.lock();
 
     try {
-      List<ContainerStatus> returnList = new ArrayList<ContainerStatus>(
-          this.justFinishedContainers.size());
-      returnList.addAll(this.justFinishedContainers);
-      this.justFinishedContainers.clear();
+      List<ContainerStatus> returnList = new ArrayList<ContainerStatus>();
+
+      // A new allocate means the AM received the previously sent
+      // finishedContainers. We can ack this to NM now
+      for (NodeId nodeId:finishedContainersSentToAM.keySet()) {
+
+        // Clear and get current values
+        List<ContainerStatus> currentSentContainers =
+            finishedContainersSentToAM
+            .put(nodeId, new ArrayList<ContainerStatus>());
+        List<ContainerId> containerIdList = new ArrayList<ContainerId>
+            (currentSentContainers.size());
+        for (ContainerStatus containerStatus:currentSentContainers) {
+          containerIdList.add(containerStatus.getContainerId());
+        }
+        eventHandler.handle(new RMNodeFinishedContainersPulledByAMEvent(
+            nodeId, containerIdList));
+      }
+
+      // Mark every containerStatus as being sent to AM though we may return
+      // only the ones that belong to the current attempt
+      boolean keepContainersAcressAttempts = this.submissionContext
+          .getKeepContainersAcrossApplicationAttempts();
+      for (NodeId nodeId:justFinishedContainers.keySet()) {
+
+        // Clear and get current values
+        List<ContainerStatus> finishedContainers = justFinishedContainers.put
+            (nodeId, new ArrayList<ContainerStatus>());
+
+        if (keepContainersAcressAttempts) {
+          returnList.addAll(finishedContainers);
+        } else {
+          // Filter out containers from previous attempt
+          for (ContainerStatus containerStatus: finishedContainers) {
+            if (containerStatus.getContainerId().getApplicationAttemptId()
+                .equals(this.getAppAttemptId())) {
+              returnList.add(containerStatus);
+            }
+          }
+        }
+
+        finishedContainersSentToAM.putIfAbsent(nodeId, new ArrayList
+              <ContainerStatus>());
+        finishedContainersSentToAM.get(nodeId).addAll(finishedContainers);
+      }
+
       return returnList;
     } finally {
       this.writeLock.unlock();
@@ -732,7 +815,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
     }
     setMasterContainer(attemptState.getMasterContainer());
     recoverAppAttemptCredentials(attemptState.getAppAttemptCredentials(),
-      attemptState.getState());
+        attemptState.getState());
     this.recoveredFinalState = attemptState.getState();
     this.originalTrackingUrl = attemptState.getFinalTrackingUrl();
     this.proxiedTrackingUrl = generateProxyUriWithScheme(originalTrackingUrl);
@@ -744,7 +827,9 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
   }
 
   public void transferStateFromPreviousAttempt(RMAppAttempt attempt) {
-    this.justFinishedContainers = attempt.getJustFinishedContainers();
+    this.justFinishedContainers = attempt.getJustFinishedContainersReference();
+    this.finishedContainersSentToAM =
+        attempt.getFinishedContainersSentToAMReference();
   }
 
   private void recoverAppAttemptCredentials(Credentials appAttemptTokens,
@@ -1507,6 +1592,9 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
       ContainerStatus containerStatus =
           containerFinishedEvent.getContainerStatus();
 
+      // Add all finished containers so that they can be acked to NM
+      addJustFinishedContainer(appAttempt, containerFinishedEvent);
+
       // Is this container the AmContainer? If the finished container is same as
       // the AMContainer, AppAttempt fails
       if (appAttempt.masterContainer != null
@@ -1519,12 +1607,18 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
         return RMAppAttemptState.FINAL_SAVING;
       }
 
-      // Normal container.Put it in completed containers list
-      appAttempt.justFinishedContainers.add(containerStatus);
       return this.currentState;
     }
   }
 
+  private static void addJustFinishedContainer(RMAppAttemptImpl appAttempt,
+      RMAppAttemptContainerFinishedEvent containerFinishedEvent) {
+    appAttempt.justFinishedContainers.putIfAbsent(containerFinishedEvent
+        .getNodeId(), new ArrayList<ContainerStatus>());
+    appAttempt.justFinishedContainers.get(containerFinishedEvent
+            .getNodeId()).add(containerFinishedEvent.getContainerStatus());
+  }
+
   private static final class ContainerFinishedAtFinalStateTransition
       extends BaseTransition {
     @Override
@@ -1533,10 +1627,8 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
       RMAppAttemptContainerFinishedEvent containerFinishedEvent =
           (RMAppAttemptContainerFinishedEvent) event;
       
-      ContainerStatus containerStatus =
-          containerFinishedEvent.getContainerStatus();
       // Normal container. Add it in completed containers list
-      appAttempt.justFinishedContainers.add(containerStatus);
+      addJustFinishedContainer(appAttempt, containerFinishedEvent);
     }
   }
 
@@ -1569,6 +1661,9 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
       ContainerStatus containerStatus =
           containerFinishedEvent.getContainerStatus();
 
+      // Add all finished containers so that they can be acked to NM.
+      addJustFinishedContainer(appAttempt, containerFinishedEvent);
+
       // Is this container the ApplicationMaster container?
       if (appAttempt.masterContainer.getId().equals(
           containerStatus.getContainerId())) {
@@ -1576,8 +1671,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
             appAttempt, containerFinishedEvent);
         return RMAppAttemptState.FINISHED;
       }
-      // Normal container.
-      appAttempt.justFinishedContainers.add(containerStatus);
+
       return RMAppAttemptState.FINISHING;
     }
   }
@@ -1592,6 +1686,9 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
       ContainerStatus containerStatus =
           containerFinishedEvent.getContainerStatus();
 
+      // Add all finished containers so that they can be acked to NM.
+      addJustFinishedContainer(appAttempt, containerFinishedEvent);
+
       // If this is the AM container, it means the AM container is finished,
       // but we are not yet acknowledged that the final state has been saved.
       // Thus, we still return FINAL_SAVING state here.
@@ -1611,8 +1708,6 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
             appAttempt.eventCausingFinalSaving), RMAppAttemptState.FINISHED);
         return;
       }
-      // Normal container.
-      appAttempt.justFinishedContainers.add(containerStatus);
     }
   }
 
@@ -1629,7 +1724,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
         transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) {
       appAttempt.updateInfoOnAMUnregister(amUnregisteredEvent);
       new FinalTransition(RMAppAttemptState.FINISHED).transition(appAttempt,
-        event);
+          event);
     }
   }
 

+ 7 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerFinishedEvent.java

@@ -20,21 +20,27 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event;
 
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 
 public class RMAppAttemptContainerFinishedEvent extends RMAppAttemptEvent {
 
   private final ContainerStatus containerStatus;
+  private final NodeId nodeId;
 
   public RMAppAttemptContainerFinishedEvent(ApplicationAttemptId appAttemptId, 
-      ContainerStatus containerStatus) {
+      ContainerStatus containerStatus, NodeId nodeId) {
     super(appAttemptId, RMAppAttemptEventType.CONTAINER_FINISHED);
     this.containerStatus = containerStatus;
+    this.nodeId = nodeId;
   }
 
   public ContainerStatus getContainerStatus() {
     return this.containerStatus;
   }
 
+  public NodeId getNodeId() {
+    return this.nodeId;
+  }
 }

+ 7 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java

@@ -78,13 +78,13 @@ public class RMContainerImpl implements RMContainer {
         RMContainerEventType.RECOVER, new ContainerRecoveredTransition())
 
     // Transitions from RESERVED state
-    .addTransition(RMContainerState.RESERVED, RMContainerState.RESERVED, 
+    .addTransition(RMContainerState.RESERVED, RMContainerState.RESERVED,
         RMContainerEventType.RESERVED, new ContainerReservedTransition())
-    .addTransition(RMContainerState.RESERVED, RMContainerState.ALLOCATED, 
+    .addTransition(RMContainerState.RESERVED, RMContainerState.ALLOCATED,
         RMContainerEventType.START, new ContainerStartedTransition())
-    .addTransition(RMContainerState.RESERVED, RMContainerState.KILLED, 
+    .addTransition(RMContainerState.RESERVED, RMContainerState.KILLED,
         RMContainerEventType.KILL) // nothing to do
-    .addTransition(RMContainerState.RESERVED, RMContainerState.RELEASED, 
+    .addTransition(RMContainerState.RESERVED, RMContainerState.RELEASED,
         RMContainerEventType.RELEASED) // nothing to do
        
 
@@ -100,7 +100,7 @@ public class RMContainerImpl implements RMContainer {
     .addTransition(RMContainerState.ACQUIRED, RMContainerState.RUNNING,
         RMContainerEventType.LAUNCHED, new LaunchedTransition())
     .addTransition(RMContainerState.ACQUIRED, RMContainerState.COMPLETED,
-        RMContainerEventType.FINISHED, new ContainerFinishedAtAcquiredState())        
+        RMContainerEventType.FINISHED, new ContainerFinishedAtAcquiredState())
     .addTransition(RMContainerState.ACQUIRED, RMContainerState.RELEASED,
         RMContainerEventType.RELEASED, new KillTransition())
     .addTransition(RMContainerState.ACQUIRED, RMContainerState.EXPIRED,
@@ -495,7 +495,8 @@ public class RMContainerImpl implements RMContainer {
       updateAttemptMetrics(container);
 
       container.eventHandler.handle(new RMAppAttemptContainerFinishedEvent(
-        container.appAttemptId, finishedEvent.getRemoteContainerStatus()));
+        container.appAttemptId, finishedEvent.getRemoteContainerStatus(),
+          container.getAllocatedNode()));
 
       container.rmContext.getRMApplicationHistoryWriter().containerFinished(
         container);

+ 3 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java

@@ -40,6 +40,9 @@ public enum RMNodeEventType {
   CONTAINER_ALLOCATED,
   CLEANUP_CONTAINER,
 
+  // Source: RMAppAttempt
+  FINISHED_CONTAINERS_PULLED_BY_AM,
+
   // Source: NMLivelinessMonitor
   EXPIRE
 }

+ 41 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeFinishedContainersPulledByAMEvent.java

@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+
+import java.util.List;
+
+// Happens after an implicit ack from AM that the container completion has
+// been notified successfully to the AM
+public class RMNodeFinishedContainersPulledByAMEvent extends RMNodeEvent {
+
+  private List<ContainerId> containers;
+
+  public RMNodeFinishedContainersPulledByAMEvent(NodeId nodeId,
+      List<ContainerId> containers) {
+    super(nodeId, RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM);
+    this.containers = containers;
+  }
+
+  public List<ContainerId> getContainers() {
+    return this.containers;
+  }
+}

+ 39 - 9
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java

@@ -112,6 +112,10 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
   private final Set<ContainerId> containersToClean = new TreeSet<ContainerId>(
       new ContainerIdComparator());
 
+  /* set of containers that were notified to AM about their completion */
+  private final Set<ContainerId> finishedContainersPulledByAM =
+      new HashSet<ContainerId>();
+
   /* the list of applications that have finished and need to be purged */
   private final List<ApplicationId> finishedApplications = new ArrayList<ApplicationId>();
 
@@ -135,7 +139,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
          new UpdateNodeResourceWhenUnusableTransition())
 
      //Transitions from RUNNING state
-     .addTransition(NodeState.RUNNING, 
+     .addTransition(NodeState.RUNNING,
          EnumSet.of(NodeState.RUNNING, NodeState.UNHEALTHY),
          RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenHealthyTransition())
      .addTransition(NodeState.RUNNING, NodeState.DECOMMISSIONED,
@@ -151,6 +155,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
          RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition())
      .addTransition(NodeState.RUNNING, NodeState.RUNNING,
          RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition())
+     .addTransition(NodeState.RUNNING, NodeState.RUNNING,
+         RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
+         new FinishedContainersPulledByAMTransition())
      .addTransition(NodeState.RUNNING, NodeState.RUNNING,
          RMNodeEventType.RECONNECTED, new ReconnectNodeTransition())
      .addTransition(NodeState.RUNNING, NodeState.RUNNING,
@@ -158,23 +165,30 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
 
      //Transitions from REBOOTED state
      .addTransition(NodeState.REBOOTED, NodeState.REBOOTED,
-         RMNodeEventType.RESOURCE_UPDATE, 
+         RMNodeEventType.RESOURCE_UPDATE,
          new UpdateNodeResourceWhenUnusableTransition())
          
      //Transitions from DECOMMISSIONED state
      .addTransition(NodeState.DECOMMISSIONED, NodeState.DECOMMISSIONED,
-         RMNodeEventType.RESOURCE_UPDATE, 
+         RMNodeEventType.RESOURCE_UPDATE,
          new UpdateNodeResourceWhenUnusableTransition())
-         
+     .addTransition(NodeState.DECOMMISSIONED, NodeState.DECOMMISSIONED,
+         RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
+         new FinishedContainersPulledByAMTransition())
+
      //Transitions from LOST state
      .addTransition(NodeState.LOST, NodeState.LOST,
-         RMNodeEventType.RESOURCE_UPDATE, 
+         RMNodeEventType.RESOURCE_UPDATE,
          new UpdateNodeResourceWhenUnusableTransition())
+     .addTransition(NodeState.LOST, NodeState.LOST,
+         RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
+         new FinishedContainersPulledByAMTransition())
 
      //Transitions from UNHEALTHY state
-     .addTransition(NodeState.UNHEALTHY, 
+     .addTransition(NodeState.UNHEALTHY,
          EnumSet.of(NodeState.UNHEALTHY, NodeState.RUNNING),
-         RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenUnHealthyTransition())
+         RMNodeEventType.STATUS_UPDATE,
+         new StatusUpdateWhenUnHealthyTransition())
      .addTransition(NodeState.UNHEALTHY, NodeState.DECOMMISSIONED,
          RMNodeEventType.DECOMMISSION,
          new DeactivateNodeTransition(NodeState.DECOMMISSIONED))
@@ -192,7 +206,10 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
          RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition())
      .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
          RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenUnusableTransition())
-         
+     .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
+         RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
+         new FinishedContainersPulledByAMTransition())
+
      // create the topology tables
      .installTopology(); 
 
@@ -365,8 +382,11 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
       response.addAllContainersToCleanup(
           new ArrayList<ContainerId>(this.containersToClean));
       response.addAllApplicationsToCleanup(this.finishedApplications);
+      response.addFinishedContainersPulledByAM(
+          new ArrayList<ContainerId>(this.finishedContainersPulledByAM));
       this.containersToClean.clear();
       this.finishedApplications.clear();
+      this.finishedContainersPulledByAM.clear();
     } finally {
       this.writeLock.unlock();
     }
@@ -652,6 +672,16 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
     }
   }
 
+  public static class FinishedContainersPulledByAMTransition implements
+      SingleArcTransition<RMNodeImpl, RMNodeEvent> {
+
+    @Override
+    public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
+      rmNode.finishedContainersPulledByAM.addAll(((
+          RMNodeFinishedContainersPulledByAMEvent) event).getContainers());
+    }
+  }
+
   public static class DeactivateNodeTransition
     implements SingleArcTransition<RMNodeImpl, RMNodeEvent> {
 
@@ -726,7 +756,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
           new ArrayList<ContainerStatus>();
       for (ContainerStatus remoteContainer : statusEvent.getContainers()) {
         ContainerId containerId = remoteContainer.getContainerId();
-        
+
         // Don't bother with containers already scheduled for cleanup, or for
         // applications already killed. The scheduler doens't need to know any
         // more about this container

+ 4 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java

@@ -491,7 +491,7 @@ public class TestResourceTrackerService {
             ApplicationAttemptId.newInstance(app.getApplicationId(), 2), 1),
           ContainerState.COMPLETE, Resource.newInstance(1024, 1),
           "Dummy Completed", 0, Priority.newInstance(10), 1234);
-    rm.getResourceTrackerService().handleNMContainerStatus(report);
+    rm.getResourceTrackerService().handleNMContainerStatus(report, null);
     verify(handler, never()).handle((Event) any());
 
     // Case 1.2: Master container is null
@@ -502,7 +502,7 @@ public class TestResourceTrackerService {
           ContainerId.newInstance(currentAttempt.getAppAttemptId(), 0),
           ContainerState.COMPLETE, Resource.newInstance(1024, 1),
           "Dummy Completed", 0, Priority.newInstance(10), 1234);
-    rm.getResourceTrackerService().handleNMContainerStatus(report);
+    rm.getResourceTrackerService().handleNMContainerStatus(report, null);
     verify(handler, never()).handle((Event)any());
 
     // Case 2: Managed AM
@@ -515,7 +515,7 @@ public class TestResourceTrackerService {
           ContainerState.COMPLETE, Resource.newInstance(1024, 1),
           "Dummy Completed", 0, Priority.newInstance(10), 1234);
     try {
-      rm.getResourceTrackerService().handleNMContainerStatus(report);
+      rm.getResourceTrackerService().handleNMContainerStatus(report, null);
     } catch (Exception e) {
       // expected - ignore
     }
@@ -530,7 +530,7 @@ public class TestResourceTrackerService {
       ContainerState.COMPLETE, Resource.newInstance(1024, 1),
       "Dummy Completed", 0, Priority.newInstance(10), 1234);
     try {
-      rm.getResourceTrackerService().handleNMContainerStatus(report);
+      rm.getResourceTrackerService().handleNMContainerStatus(report, null);
     } catch (Exception e) {
       // expected - ignore
     }

+ 17 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java

@@ -98,6 +98,9 @@ public class TestAMRestart {
       Thread.sleep(200);
     }
 
+    ContainerId amContainerId = ContainerId.newInstance(am1
+        .getApplicationAttemptId(), 1);
+
     // launch the 2nd container, for testing running container transferred.
     nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2, ContainerState.RUNNING);
     ContainerId containerId2 =
@@ -196,11 +199,15 @@ public class TestAMRestart {
     // completed containerId4 is also transferred to the new attempt.
     RMAppAttempt newAttempt =
         app1.getRMAppAttempt(am2.getApplicationAttemptId());
-    // 4 containers finished, acquired/allocated/reserved/completed.
-    waitForContainersToFinish(4, newAttempt);
+    // 4 containers finished, acquired/allocated/reserved/completed + AM
+    // container.
+    waitForContainersToFinish(5, newAttempt);
     boolean container3Exists = false, container4Exists = false, container5Exists =
-        false, container6Exists = false;
+        false, container6Exists = false, amContainerExists = false;
     for(ContainerStatus status :  newAttempt.getJustFinishedContainers()) {
+      if(status.getContainerId().equals(amContainerId)) {
+        amContainerExists = true;
+      }
       if(status.getContainerId().equals(containerId3)) {
         // containerId3 is the container ran by previous attempt but finished by the
         // new attempt.
@@ -220,8 +227,11 @@ public class TestAMRestart {
         container6Exists = true;
       }
     }
-    Assert.assertTrue(container3Exists && container4Exists && container5Exists
-        && container6Exists);
+    Assert.assertTrue(amContainerExists);
+    Assert.assertTrue(container3Exists);
+    Assert.assertTrue(container4Exists);
+    Assert.assertTrue(container5Exists);
+    Assert.assertTrue(container6Exists);
 
     // New SchedulerApplicationAttempt also has the containers info.
     rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
@@ -240,14 +250,14 @@ public class TestAMRestart {
     // all 4 normal containers finished.
     System.out.println("New attempt's just finished containers: "
         + newAttempt.getJustFinishedContainers());
-    waitForContainersToFinish(5, newAttempt);
+    waitForContainersToFinish(6, newAttempt);
     rm1.stop();
   }
 
   private void waitForContainersToFinish(int expectedNum, RMAppAttempt attempt)
       throws InterruptedException {
     int count = 0;
-    while (attempt.getJustFinishedContainers().size() != expectedNum
+    while (attempt.getJustFinishedContainers().size() < expectedNum
         && count < 500) {
       Thread.sleep(100);
       count++;

+ 119 - 24
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java

@@ -28,6 +28,7 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -35,6 +36,7 @@ import static org.mockito.Mockito.when;
 
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -91,6 +93,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeFinishedContainersPulledByAMEvent;
+
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
@@ -151,6 +158,7 @@ public class TestRMAppAttemptTransitions {
   private NMTokenSecretManagerInRM nmTokenManager =
       spy(new NMTokenSecretManagerInRM(conf));
   private boolean transferStateFromPreviousAttempt = false;
+  private EventHandler<RMNodeEvent> rmnodeEventHandler;
 
   private final class TestApplicationAttemptEventDispatcher implements
       EventHandler<RMAppAttemptEvent> {
@@ -203,7 +211,7 @@ public class TestRMAppAttemptTransitions {
       applicationMasterLauncher.handle(event);
     }
   }
-  
+
   private static int appId = 1;
   
   private ApplicationSubmissionContext submissionContext = null;
@@ -268,6 +276,9 @@ public class TestRMAppAttemptTransitions {
     rmDispatcher.register(AMLauncherEventType.class, 
         new TestAMLauncherEventDispatcher());
 
+    rmnodeEventHandler = mock(RMNodeImpl.class);
+    rmDispatcher.register(RMNodeEventType.class, rmnodeEventHandler);
+
     rmDispatcher.init(conf);
     rmDispatcher.start();
     
@@ -575,6 +586,8 @@ public class TestRMAppAttemptTransitions {
     }
     assertEquals(finishedContainerCount, applicationAttempt
         .getJustFinishedContainers().size());
+    Assert.assertEquals(0, getFinishedContainersSentToAM(applicationAttempt)
+        .size());
     assertEquals(container, applicationAttempt.getMasterContainer());
     assertEquals(finalStatus, applicationAttempt.getFinalApplicationStatus());
     verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
@@ -704,7 +717,8 @@ public class TestRMAppAttemptTransitions {
     application.handle(new RMAppRunningOnNodeEvent(application.getApplicationId(),
         container.getNodeId()));
     applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
-        applicationAttempt.getAppAttemptId(), mock(ContainerStatus.class)));
+        applicationAttempt.getAppAttemptId(), mock(ContainerStatus.class),
+        container.getNodeId()));
     // complete AM
     String diagnostics = "Successful";
     FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED;
@@ -752,10 +766,11 @@ public class TestRMAppAttemptTransitions {
     when(appResUsgRpt.getMemorySeconds()).thenReturn(223456L);
     when(appResUsgRpt.getVcoreSeconds()).thenReturn(75544L);
     sendAttemptUpdateSavedEvent(applicationAttempt);
+    NodeId anyNodeId = NodeId.newInstance("host", 1234);
     applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
         attemptId, 
         ContainerStatus.newInstance(
-            amContainer.getId(), ContainerState.COMPLETE, "", 0)));
+            amContainer.getId(), ContainerState.COMPLETE, "", 0), anyNodeId));
 
     when(scheduler.getSchedulerAppInfo(eq(attemptId))).thenReturn(null);
 
@@ -857,8 +872,9 @@ public class TestRMAppAttemptTransitions {
             SchedulerUtils.LOST_CONTAINER);
     // send CONTAINER_FINISHED event at SCHEDULED state,
     // The state should be FINAL_SAVING with previous state SCHEDULED
+    NodeId anyNodeId = NodeId.newInstance("host", 1234);
     applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
-        applicationAttempt.getAppAttemptId(), cs));
+        applicationAttempt.getAppAttemptId(), cs, anyNodeId));
     // createApplicationAttemptState will return previous state (SCHEDULED),
     // if the current state is FINAL_SAVING.
     assertEquals(YarnApplicationAttemptState.SCHEDULED,
@@ -904,8 +920,9 @@ public class TestRMAppAttemptTransitions {
     ContainerStatus cs =
         BuilderUtils.newContainerStatus(amContainer.getId(),
           ContainerState.COMPLETE, containerDiagMsg, exitCode);
+    NodeId anyNodeId = NodeId.newInstance("host", 1234);
     applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
-      applicationAttempt.getAppAttemptId(), cs));
+      applicationAttempt.getAppAttemptId(), cs, anyNodeId));
     assertEquals(YarnApplicationAttemptState.ALLOCATED,
         applicationAttempt.createApplicationAttemptState());
     sendAttemptUpdateSavedEvent(applicationAttempt);
@@ -928,16 +945,17 @@ public class TestRMAppAttemptTransitions {
     ContainerStatus cs = BuilderUtils.newContainerStatus(amContainer.getId(),
         ContainerState.COMPLETE, containerDiagMsg, exitCode);
     ApplicationAttemptId appAttemptId = applicationAttempt.getAppAttemptId();
+    NodeId anyNodeId = NodeId.newInstance("host", 1234);
     applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
-        appAttemptId, cs));
+        appAttemptId, cs, anyNodeId));
 
     // ignored ContainerFinished and Expire at FinalSaving if we were supposed
     // to Failed state.
     assertEquals(RMAppAttemptState.FINAL_SAVING,
-      applicationAttempt.getAppAttemptState()); 
+      applicationAttempt.getAppAttemptState());
     applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
       applicationAttempt.getAppAttemptId(), BuilderUtils.newContainerStatus(
-        amContainer.getId(), ContainerState.COMPLETE, "", 0)));
+        amContainer.getId(), ContainerState.COMPLETE, "", 0), anyNodeId));
     applicationAttempt.handle(new RMAppAttemptEvent(
       applicationAttempt.getAppAttemptId(), RMAppAttemptEventType.EXPIRE));
     assertEquals(RMAppAttemptState.FINAL_SAVING,
@@ -947,7 +965,7 @@ public class TestRMAppAttemptTransitions {
     sendAttemptUpdateSavedEvent(applicationAttempt);
     assertEquals(RMAppAttemptState.FAILED,
         applicationAttempt.getAppAttemptState());
-    assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
+    assertEquals(2, applicationAttempt.getJustFinishedContainers().size());
     assertEquals(amContainer, applicationAttempt.getMasterContainer());
     assertEquals(0, application.getRanNodes().size());
     String rmAppPageUrl = pjoin(RM_WEBAPP_ADDR, "cluster", "app",
@@ -971,10 +989,11 @@ public class TestRMAppAttemptTransitions {
     // ignored ContainerFinished and Expire at FinalSaving if we were supposed
     // to Killed state.
     assertEquals(RMAppAttemptState.FINAL_SAVING,
-      applicationAttempt.getAppAttemptState()); 
+      applicationAttempt.getAppAttemptState());
+    NodeId anyNodeId = NodeId.newInstance("host", 1234);
     applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
       applicationAttempt.getAppAttemptId(), BuilderUtils.newContainerStatus(
-        amContainer.getId(), ContainerState.COMPLETE, "", 0)));
+        amContainer.getId(), ContainerState.COMPLETE, "", 0), anyNodeId));
     applicationAttempt.handle(new RMAppAttemptEvent(
       applicationAttempt.getAppAttemptId(), RMAppAttemptEventType.EXPIRE));
     assertEquals(RMAppAttemptState.FINAL_SAVING,
@@ -984,7 +1003,7 @@ public class TestRMAppAttemptTransitions {
     sendAttemptUpdateSavedEvent(applicationAttempt);
     assertEquals(RMAppAttemptState.KILLED,
         applicationAttempt.getAppAttemptState());
-    assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
+    assertEquals(1,applicationAttempt.getJustFinishedContainers().size());
     assertEquals(amContainer, applicationAttempt.getMasterContainer());
     assertEquals(0, application.getRanNodes().size());
     String rmAppPageUrl = pjoin(RM_WEBAPP_ADDR, "cluster", "app",
@@ -1144,13 +1163,14 @@ public class TestRMAppAttemptTransitions {
     unregisterApplicationAttempt(amContainer, finalStatus, trackingUrl,
         diagnostics);
     // container must be AM container to move from FINISHING to FINISHED
+    NodeId anyNodeId = NodeId.newInstance("host", 1234);
     applicationAttempt.handle(
         new RMAppAttemptContainerFinishedEvent(
             applicationAttempt.getAppAttemptId(),
             BuilderUtils.newContainerStatus(
                 BuilderUtils.newContainerId(
                     applicationAttempt.getAppAttemptId(), 42),
-                ContainerState.COMPLETE, "", 0)));
+                ContainerState.COMPLETE, "", 0), anyNodeId));
     testAppAttemptFinishingState(amContainer, finalStatus, trackingUrl,
         diagnostics);
   }
@@ -1165,13 +1185,14 @@ public class TestRMAppAttemptTransitions {
     String diagnostics = "Successful";
     unregisterApplicationAttempt(amContainer, finalStatus, trackingUrl,
         diagnostics);
+    NodeId anyNodeId = NodeId.newInstance("host", 1234);
     applicationAttempt.handle(
         new RMAppAttemptContainerFinishedEvent(
             applicationAttempt.getAppAttemptId(),
             BuilderUtils.newContainerStatus(amContainer.getId(),
-                ContainerState.COMPLETE, "", 0)));
+                ContainerState.COMPLETE, "", 0), anyNodeId));
     testAppAttemptFinishedState(amContainer, finalStatus, trackingUrl,
-        diagnostics, 0, false);
+        diagnostics, 1, false);
   }
 
   // While attempt is at FINAL_SAVING, Contaienr_Finished event may come before
@@ -1195,15 +1216,16 @@ public class TestRMAppAttemptTransitions {
     assertEquals(YarnApplicationAttemptState.RUNNING,
         applicationAttempt.createApplicationAttemptState());
     // Container_finished event comes before Attempt_Saved event.
+    NodeId anyNodeId = NodeId.newInstance("host", 1234);
     applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
       applicationAttempt.getAppAttemptId(), BuilderUtils.newContainerStatus(
-        amContainer.getId(), ContainerState.COMPLETE, "", 0)));
+        amContainer.getId(), ContainerState.COMPLETE, "", 0), anyNodeId));
     assertEquals(RMAppAttemptState.FINAL_SAVING,
       applicationAttempt.getAppAttemptState());
     // send attempt_saved
     sendAttemptUpdateSavedEvent(applicationAttempt);
     testAppAttemptFinishedState(amContainer, finalStatus, trackingUrl,
-      diagnostics, 0, false);
+      diagnostics, 1, false);
   }
 
   // While attempt is at FINAL_SAVING, Expire event may come before
@@ -1235,6 +1257,71 @@ public class TestRMAppAttemptTransitions {
       diagnostics, 0, false);
   }
 
+  @Test
+  public void testFinishedContainer() {
+    Container amContainer = allocateApplicationAttempt();
+    launchApplicationAttempt(amContainer);
+    runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
+
+    // Complete one container
+    ContainerId containerId1 = BuilderUtils.newContainerId(applicationAttempt
+        .getAppAttemptId(), 2);
+    Container container1 = mock(Container.class);
+    ContainerStatus containerStatus1 = mock(ContainerStatus.class);
+    when(container1.getId()).thenReturn(
+        containerId1);
+    when(containerStatus1.getContainerId()).thenReturn(containerId1);
+    when(container1.getNodeId()).thenReturn(NodeId.newInstance("host", 1234));
+
+    application.handle(new RMAppRunningOnNodeEvent(application
+        .getApplicationId(),
+        container1.getNodeId()));
+    applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
+        applicationAttempt.getAppAttemptId(), containerStatus1,
+        container1.getNodeId()));
+
+    ArgumentCaptor<RMNodeFinishedContainersPulledByAMEvent> captor =
+        ArgumentCaptor.forClass(RMNodeFinishedContainersPulledByAMEvent.class);
+
+    // Verify justFinishedContainers
+    Assert.assertEquals(1, applicationAttempt.getJustFinishedContainers()
+        .size());
+    Assert.assertEquals(container1.getId(), applicationAttempt
+        .getJustFinishedContainers().get(0).getContainerId());
+    Assert.assertEquals(0, getFinishedContainersSentToAM(applicationAttempt)
+        .size());
+
+    // Verify finishedContainersSentToAM gets container after pull
+    List<ContainerStatus> containerStatuses = applicationAttempt
+        .pullJustFinishedContainers();
+    Assert.assertEquals(1, containerStatuses.size());
+    Mockito.verify(rmnodeEventHandler, never()).handle(Mockito
+        .any(RMNodeEvent.class));
+    Assert.assertTrue(applicationAttempt.getJustFinishedContainers().isEmpty());
+    Assert.assertEquals(1, getFinishedContainersSentToAM(applicationAttempt)
+        .size());
+
+    // Verify container is acked to NM via the RMNodeEvent after second pull
+    containerStatuses = applicationAttempt.pullJustFinishedContainers();
+    Assert.assertEquals(0, containerStatuses.size());
+    Mockito.verify(rmnodeEventHandler).handle(captor.capture());
+    Assert.assertEquals(container1.getId(), captor.getValue().getContainers()
+        .get(0));
+    Assert.assertTrue(applicationAttempt.getJustFinishedContainers().isEmpty());
+    Assert.assertEquals(0, getFinishedContainersSentToAM(applicationAttempt)
+        .size());
+  }
+
+  private static List<ContainerStatus> getFinishedContainersSentToAM(
+      RMAppAttempt applicationAttempt) {
+    List<ContainerStatus> containers = new ArrayList<ContainerStatus>();
+    for (List<ContainerStatus> containerStatuses: applicationAttempt
+        .getFinishedContainersSentToAMReference().values()) {
+      containers.addAll(containerStatuses);
+    }
+    return containers;
+  }
+
   // this is to test user can get client tokens only after the client token
   // master key is saved in the state store and also registered in
   // ClientTokenSecretManager
@@ -1281,8 +1368,9 @@ public class TestRMAppAttemptTransitions {
         ContainerStatus.newInstance(amContainer.getId(),
           ContainerState.COMPLETE, "some error", 123);
     ApplicationAttemptId appAttemptId = applicationAttempt.getAppAttemptId();
+    NodeId anyNodeId = NodeId.newInstance("host", 1234);
     applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
-      appAttemptId, cs1));
+      appAttemptId, cs1, anyNodeId));
     assertEquals(YarnApplicationAttemptState.RUNNING,
         applicationAttempt.createApplicationAttemptState());
     sendAttemptUpdateSavedEvent(applicationAttempt);
@@ -1293,15 +1381,21 @@ public class TestRMAppAttemptTransitions {
     verifyApplicationAttemptFinished(RMAppAttemptState.FAILED);
 
     // failed attempt captured the container finished event.
-    assertEquals(0, applicationAttempt.getJustFinishedContainers().size());
+    assertEquals(1, applicationAttempt.getJustFinishedContainers().size());
     ContainerStatus cs2 =
         ContainerStatus.newInstance(ContainerId.newInstance(appAttemptId, 2),
           ContainerState.COMPLETE, "", 0);
     applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
-      appAttemptId, cs2));
-    assertEquals(1, applicationAttempt.getJustFinishedContainers().size());
-    assertEquals(cs2.getContainerId(), applicationAttempt
-      .getJustFinishedContainers().get(0).getContainerId());
+      appAttemptId, cs2, anyNodeId));
+    assertEquals(2, applicationAttempt.getJustFinishedContainers().size());
+    boolean found = false;
+    for (ContainerStatus containerStatus:applicationAttempt
+        .getJustFinishedContainers()) {
+      if (cs2.getContainerId().equals(containerStatus.getContainerId())) {
+        found = true;
+      }
+    }
+    assertTrue(found);
   }
 
 
@@ -1322,8 +1416,9 @@ public class TestRMAppAttemptTransitions {
         ContainerStatus.newInstance(amContainer.getId(),
           ContainerState.COMPLETE, "some error", 123);
     ApplicationAttemptId appAttemptId = applicationAttempt.getAppAttemptId();
+    NodeId anyNodeId = NodeId.newInstance("host", 1234);
     applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
-      appAttemptId, cs1));
+      appAttemptId, cs1, anyNodeId));
     assertEquals(YarnApplicationAttemptState.RUNNING,
         applicationAttempt.createApplicationAttemptState());
     sendAttemptUpdateSavedEvent(applicationAttempt);

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java

@@ -161,7 +161,7 @@ public class TestAMRMTokens {
           .getEventHandler()
           .handle(
               new RMAppAttemptContainerFinishedEvent(applicationAttemptId,
-                  containerStatus));
+                  containerStatus, nm1.getNodeId()));
 
       // Make sure the RMAppAttempt is at Finished State.
       // Both AMRMToken and ClientToAMToken have been removed.