|
@@ -265,10 +265,10 @@ public class ApplicationMasterService extends AbstractService implements
|
|
|
// Oh damn! Sending reboot isn't enough. RM state is corrupted. TODO:
|
|
|
allocateResponse.setAMResponse(reboot);
|
|
|
return allocateResponse;
|
|
|
- }
|
|
|
-
|
|
|
+ }
|
|
|
+
|
|
|
// Allow only one thread in AM to do heartbeat at a time.
|
|
|
- synchronized (lastResponse) { // BUG TODO: Locking order is screwed.
|
|
|
+ synchronized (lastResponse) {
|
|
|
|
|
|
// Send the status update to the appAttempt.
|
|
|
this.rmContext.getDispatcher().getEventHandler().handle(
|
|
@@ -282,7 +282,8 @@ public class ApplicationMasterService extends AbstractService implements
|
|
|
Allocation allocation =
|
|
|
this.rScheduler.allocate(appAttemptId, ask, release);
|
|
|
|
|
|
- RMApp app = this.rmContext.getRMApps().get(appAttemptId.getApplicationId());
|
|
|
+ RMApp app = this.rmContext.getRMApps().get(
|
|
|
+ appAttemptId.getApplicationId());
|
|
|
RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId);
|
|
|
|
|
|
AMResponse response = recordFactory.newRecordInstance(AMResponse.class);
|
|
@@ -316,7 +317,18 @@ public class ApplicationMasterService extends AbstractService implements
|
|
|
.pullJustFinishedContainers());
|
|
|
response.setResponseId(lastResponse.getResponseId() + 1);
|
|
|
response.setAvailableResources(allocation.getResourceLimit());
|
|
|
- responseMap.put(appAttemptId, response);
|
|
|
+
|
|
|
+ AMResponse oldResponse = responseMap.put(appAttemptId, response);
|
|
|
+ if (oldResponse == null) {
|
|
|
+ // appAttempt got unregistered, remove it back out
|
|
|
+ responseMap.remove(appAttemptId);
|
|
|
+ String message = "App Attempt removed from the cache during allocate"
|
|
|
+ + appAttemptId;
|
|
|
+ LOG.error(message);
|
|
|
+ allocateResponse.setAMResponse(reboot);
|
|
|
+ return allocateResponse;
|
|
|
+ }
|
|
|
+
|
|
|
allocateResponse.setAMResponse(response);
|
|
|
allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());
|
|
|
return allocateResponse;
|
|
@@ -331,12 +343,7 @@ public class ApplicationMasterService extends AbstractService implements
|
|
|
}
|
|
|
|
|
|
public void unregisterAttempt(ApplicationAttemptId attemptId) {
|
|
|
- AMResponse lastResponse = responseMap.get(attemptId);
|
|
|
- if (lastResponse != null) {
|
|
|
- synchronized (lastResponse) {
|
|
|
- responseMap.remove(attemptId);
|
|
|
- }
|
|
|
- }
|
|
|
+ responseMap.remove(attemptId);
|
|
|
}
|
|
|
|
|
|
public void refreshServiceAcls(Configuration configuration,
|