|
@@ -1658,6 +1658,16 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ // Ack NM to remove finished AM container, not waiting for
|
|
|
+ // new appattempt to pull am container complete msg, new appattempt
|
|
|
+ // may launch fail and leaves too many completed container in NM
|
|
|
+ private void sendFinishedAMContainerToNM(NodeId nodeId,
|
|
|
+ ContainerId containerId) {
|
|
|
+ List<ContainerId> containerIdList = new ArrayList<ContainerId>();
|
|
|
+ containerIdList.add(containerId);
|
|
|
+ eventHandler.handle(new RMNodeFinishedContainersPulledByAMEvent(
|
|
|
+ nodeId, containerIdList));
|
|
|
+ }
|
|
|
|
|
|
// Ack NM to remove finished containers from context.
|
|
|
private void sendFinishedContainersToNM() {
|
|
@@ -1686,9 +1696,13 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|
|
new ArrayList<ContainerStatus>());
|
|
|
appAttempt.finishedContainersSentToAM.get(nodeId).add(
|
|
|
containerFinishedEvent.getContainerStatus());
|
|
|
+
|
|
|
if (!appAttempt.getSubmissionContext()
|
|
|
.getKeepContainersAcrossApplicationAttempts()) {
|
|
|
appAttempt.sendFinishedContainersToNM();
|
|
|
+ } else {
|
|
|
+ appAttempt.sendFinishedAMContainerToNM(nodeId,
|
|
|
+ containerFinishedEvent.getContainerStatus().getContainerId());
|
|
|
}
|
|
|
}
|
|
|
|