|
@@ -123,11 +123,16 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
|
|
new HashSet<ContainerId>();
|
|
|
|
|
|
/* the list of applications that have finished and need to be purged */
|
|
|
- private final List<ApplicationId> finishedApplications = new ArrayList<ApplicationId>();
|
|
|
+ private final List<ApplicationId> finishedApplications =
|
|
|
+ new ArrayList<ApplicationId>();
|
|
|
+
|
|
|
+ /* the list of applications that are running on this node */
|
|
|
+ private final List<ApplicationId> runningApplications =
|
|
|
+ new ArrayList<ApplicationId>(); // NOTE(review): a List permits duplicate ids and remove(Object) drops only the first match - confirm the same app is never added twice, or consider a Set
|
|
|
|
|
|
private NodeHeartbeatResponse latestNodeHeartBeatResponse = recordFactory
|
|
|
.newRecordInstance(NodeHeartbeatResponse.class);
|
|
|
-
|
|
|
+
|
|
|
private static final StateMachineFactory<RMNodeImpl,
|
|
|
NodeState,
|
|
|
RMNodeEventType,
|
|
@@ -136,7 +141,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
|
|
NodeState,
|
|
|
RMNodeEventType,
|
|
|
RMNodeEvent>(NodeState.NEW)
|
|
|
-
|
|
|
+
|
|
|
//Transitions from NEW state
|
|
|
.addTransition(NodeState.NEW, NodeState.RUNNING,
|
|
|
RMNodeEventType.STARTED, new AddNodeTransition())
|
|
@@ -382,6 +387,16 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
|
|
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ public List<ApplicationId> getRunningApps() { // returns a copy of runningApplications, never the internal list itself
|
|
|
+ this.readLock.lock(); // take the read lock before reading runningApplications
|
|
|
+ try {
|
|
|
+ return new ArrayList<ApplicationId>(this.runningApplications); // new list built from the current contents
|
|
|
+ } finally {
|
|
|
+ this.readLock.unlock(); // released in finally, so the lock is dropped even if the copy throws
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public List<ContainerId> getContainersToCleanUp() {
|
|
|
|
|
@@ -519,9 +534,12 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
|
|
LOG.warn("Cannot get RMApp by appId=" + appId
|
|
|
+ ", just added it to finishedApplications list for cleanup");
|
|
|
rmNode.finishedApplications.add(appId);
|
|
|
+ rmNode.runningApplications.remove(appId);
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
+ // Add running applications back due to Node add or Node reconnection.
|
|
|
+ rmNode.runningApplications.add(appId);
|
|
|
context.getDispatcher().getEventHandler()
|
|
|
.handle(new RMAppRunningOnNodeEvent(appId, nodeId));
|
|
|
}
|
|
@@ -707,8 +725,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
|
|
|
|
|
@Override
|
|
|
public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
|
|
|
- rmNode.finishedApplications.add(((
|
|
|
- RMNodeCleanAppEvent) event).getAppId());
|
|
|
+ ApplicationId appId = ((RMNodeCleanAppEvent) event).getAppId(); // event is cast to RMNodeCleanAppEvent so the app id is read once
|
|
|
+ rmNode.finishedApplications.add(appId); // record the app in finishedApplications
|
|
|
+ rmNode.runningApplications.remove(appId); // and drop it from runningApplications; List.remove(Object) removes only the first matching entry
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -910,12 +929,22 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
|
|
+ "cleanup, no further processing");
|
|
|
continue;
|
|
|
}
|
|
|
- if (finishedApplications.contains(containerId.getApplicationAttemptId()
|
|
|
- .getApplicationId())) {
|
|
|
+
|
|
|
+ ApplicationId containerAppId =
|
|
|
+ containerId.getApplicationAttemptId().getApplicationId();
|
|
|
+
|
|
|
+ if (finishedApplications.contains(containerAppId)) {
|
|
|
LOG.info("Container " + containerId
|
|
|
+ " belongs to an application that is already killed,"
|
|
|
+ " no further processing");
|
|
|
continue;
|
|
|
+ } else if (!runningApplications.contains(containerAppId)) {
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Container " + containerId
|
|
|
+ + " is the first container get launched for application "
|
|
|
+ + containerAppId);
|
|
|
+ }
|
|
|
+ runningApplications.add(containerAppId);
|
|
|
}
|
|
|
|
|
|
// Process running containers
|