|
@@ -645,7 +645,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
|
@VisibleForTesting
|
|
@VisibleForTesting
|
|
@Private
|
|
@Private
|
|
public void removeOrTrackCompletedContainersFromContext(
|
|
public void removeOrTrackCompletedContainersFromContext(
|
|
- List<ContainerId> containerIds) throws IOException {
|
|
|
|
|
|
+ List<ContainerId> containerIds) {
|
|
Set<ContainerId> removedContainers = new HashSet<ContainerId>();
|
|
Set<ContainerId> removedContainers = new HashSet<ContainerId>();
|
|
|
|
|
|
pendingContainersToRemove.addAll(containerIds);
|
|
pendingContainersToRemove.addAll(containerIds);
|
|
@@ -662,13 +662,13 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
|
removedContainers.add(containerId);
|
|
removedContainers.add(containerId);
|
|
iter.remove();
|
|
iter.remove();
|
|
}
|
|
}
|
|
|
|
+ pendingCompletedContainers.remove(containerId);
|
|
}
|
|
}
|
|
|
|
|
|
if (!removedContainers.isEmpty()) {
|
|
if (!removedContainers.isEmpty()) {
|
|
LOG.info("Removed completed containers from NM context: "
|
|
LOG.info("Removed completed containers from NM context: "
|
|
+ removedContainers);
|
|
+ removedContainers);
|
|
}
|
|
}
|
|
- pendingCompletedContainers.clear();
|
|
|
|
}
|
|
}
|
|
|
|
|
|
private void trackAppsForKeepAlive(List<ApplicationId> appIds) {
|
|
private void trackAppsForKeepAlive(List<ApplicationId> appIds) {
|
|
@@ -1037,6 +1037,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
|
@SuppressWarnings("unchecked")
|
|
@SuppressWarnings("unchecked")
|
|
public void run() {
|
|
public void run() {
|
|
int lastHeartbeatID = 0;
|
|
int lastHeartbeatID = 0;
|
|
|
|
+ boolean missedHearbeat = false;
|
|
while (!isStopped) {
|
|
while (!isStopped) {
|
|
// Send heartbeat
|
|
// Send heartbeat
|
|
try {
|
|
try {
|
|
@@ -1083,6 +1084,20 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
|
removeOrTrackCompletedContainersFromContext(response
|
|
removeOrTrackCompletedContainersFromContext(response
|
|
.getContainersToBeRemovedFromNM());
|
|
.getContainersToBeRemovedFromNM());
|
|
|
|
|
|
|
|
+ // If the last heartbeat was missed, it is possible that the
|
|
|
|
+ // RM saw this one as a duplicate and did not process it.
|
|
|
|
+ // If so, we can fail to notify the RM of these completed containers
|
|
|
|
+ // on the next heartbeat if we clear pendingCompletedContainers.
|
|
|
|
+ // If it wasn't a duplicate, the only impact is we might notify
|
|
|
|
+ // the RM twice, which it can handle.
|
|
|
|
+ if (!missedHearbeat) {
|
|
|
|
+ pendingCompletedContainers.clear();
|
|
|
|
+ } else {
|
|
|
|
+ LOG.info("skipped clearing pending completed containers due to " +
|
|
|
|
+ "missed heartbeat");
|
|
|
|
+ missedHearbeat = false;
|
|
|
|
+ }
|
|
|
|
+
|
|
logAggregationReportForAppsTempList.clear();
|
|
logAggregationReportForAppsTempList.clear();
|
|
lastHeartbeatID = response.getResponseId();
|
|
lastHeartbeatID = response.getResponseId();
|
|
List<ContainerId> containersToCleanup = response
|
|
List<ContainerId> containersToCleanup = response
|
|
@@ -1158,6 +1173,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
|
// TODO Better error handling. Thread can die with the rest of the
|
|
// TODO Better error handling. Thread can die with the rest of the
|
|
// NM still running.
|
|
// NM still running.
|
|
LOG.error("Caught exception in status-updater", e);
|
|
LOG.error("Caught exception in status-updater", e);
|
|
|
|
+ missedHearbeat = true;
|
|
} finally {
|
|
} finally {
|
|
synchronized (heartbeatMonitor) {
|
|
synchronized (heartbeatMonitor) {
|
|
nextHeartBeatInterval = nextHeartBeatInterval <= 0 ?
|
|
nextHeartBeatInterval = nextHeartBeatInterval <= 0 ?
|