|
@@ -40,7 +40,6 @@ import org.apache.commons.lang.StringUtils;
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
-import org.apache.hadoop.http.HttpConfig;
|
|
|
import org.apache.hadoop.security.Credentials;
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
import org.apache.hadoop.security.token.Token;
|
|
@@ -82,8 +81,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
|
|
@@ -142,7 +142,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|
|
private float progress = 0;
|
|
|
private String host = "N/A";
|
|
|
private int rpcPort;
|
|
|
- private String origTrackingUrl = "N/A";
|
|
|
+ private String originalTrackingUrl = "N/A";
|
|
|
private String proxiedTrackingUrl = "N/A";
|
|
|
private long startTime = 0;
|
|
|
|
|
@@ -157,6 +157,11 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|
|
private static final ExpiredTransition EXPIRED_TRANSITION =
|
|
|
new ExpiredTransition();
|
|
|
|
|
|
+ private RMAppAttemptEvent eventCausingFinalSaving;
|
|
|
+ private RMAppAttemptState targetedFinalState;
|
|
|
+ private RMAppAttemptState recoveredFinalState;
|
|
|
+ private Object transitionTodo;
|
|
|
+
|
|
|
private static final StateMachineFactory<RMAppAttemptImpl,
|
|
|
RMAppAttemptState,
|
|
|
RMAppAttemptEventType,
|
|
@@ -169,68 +174,80 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|
|
// Transitions from NEW State
|
|
|
.addTransition(RMAppAttemptState.NEW, RMAppAttemptState.SUBMITTED,
|
|
|
RMAppAttemptEventType.START, new AttemptStartedTransition())
|
|
|
- .addTransition(RMAppAttemptState.NEW, RMAppAttemptState.KILLED,
|
|
|
+ .addTransition(RMAppAttemptState.NEW, RMAppAttemptState.FINAL_SAVING,
|
|
|
RMAppAttemptEventType.KILL,
|
|
|
- new BaseFinalTransition(RMAppAttemptState.KILLED))
|
|
|
- .addTransition(RMAppAttemptState.NEW, RMAppAttemptState.FAILED,
|
|
|
+ new FinalSavingTransition(new BaseFinalTransition(
|
|
|
+ RMAppAttemptState.KILLED), RMAppAttemptState.KILLED))
|
|
|
+ .addTransition(RMAppAttemptState.NEW, RMAppAttemptState.FINAL_SAVING,
|
|
|
RMAppAttemptEventType.REGISTERED,
|
|
|
- new UnexpectedAMRegisteredTransition())
|
|
|
- .addTransition(RMAppAttemptState.NEW, RMAppAttemptState.RECOVERED,
|
|
|
- RMAppAttemptEventType.RECOVER)
|
|
|
+ new FinalSavingTransition(
|
|
|
+ new UnexpectedAMRegisteredTransition(), RMAppAttemptState.FAILED))
|
|
|
+ .addTransition( RMAppAttemptState.NEW,
|
|
|
+ EnumSet.of(RMAppAttemptState.FINISHED, RMAppAttemptState.KILLED,
|
|
|
+ RMAppAttemptState.FAILED, RMAppAttemptState.RECOVERED),
|
|
|
+ RMAppAttemptEventType.RECOVER, new AttemptRecoveredTransition())
|
|
|
|
|
|
// Transitions from SUBMITTED state
|
|
|
- .addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.FAILED,
|
|
|
- RMAppAttemptEventType.APP_REJECTED, new AppRejectedTransition())
|
|
|
+ .addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.FINAL_SAVING,
|
|
|
+ RMAppAttemptEventType.APP_REJECTED,
|
|
|
+ new FinalSavingTransition(new AppRejectedTransition(),
|
|
|
+ RMAppAttemptState.FAILED))
|
|
|
.addTransition(RMAppAttemptState.SUBMITTED,
|
|
|
EnumSet.of(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,
|
|
|
RMAppAttemptState.SCHEDULED),
|
|
|
RMAppAttemptEventType.APP_ACCEPTED,
|
|
|
new ScheduleTransition())
|
|
|
- .addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.KILLED,
|
|
|
+ .addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.FINAL_SAVING,
|
|
|
RMAppAttemptEventType.KILL,
|
|
|
- new BaseFinalTransition(RMAppAttemptState.KILLED))
|
|
|
- .addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.FAILED,
|
|
|
+ new FinalSavingTransition(new BaseFinalTransition(
|
|
|
+ RMAppAttemptState.KILLED), RMAppAttemptState.KILLED))
|
|
|
+ .addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.FINAL_SAVING,
|
|
|
RMAppAttemptEventType.REGISTERED,
|
|
|
- new UnexpectedAMRegisteredTransition())
|
|
|
+ new FinalSavingTransition(
|
|
|
+ new UnexpectedAMRegisteredTransition(), RMAppAttemptState.FAILED))
|
|
|
|
|
|
// Transitions from SCHEDULED State
|
|
|
.addTransition(RMAppAttemptState.SCHEDULED,
|
|
|
RMAppAttemptState.ALLOCATED_SAVING,
|
|
|
RMAppAttemptEventType.CONTAINER_ALLOCATED,
|
|
|
new AMContainerAllocatedTransition())
|
|
|
- .addTransition(RMAppAttemptState.SCHEDULED, RMAppAttemptState.KILLED,
|
|
|
+ .addTransition(RMAppAttemptState.SCHEDULED, RMAppAttemptState.FINAL_SAVING,
|
|
|
RMAppAttemptEventType.KILL,
|
|
|
- new BaseFinalTransition(RMAppAttemptState.KILLED))
|
|
|
+ new FinalSavingTransition(new BaseFinalTransition(
|
|
|
+ RMAppAttemptState.KILLED), RMAppAttemptState.KILLED))
|
|
|
|
|
|
// Transitions from ALLOCATED_SAVING State
|
|
|
.addTransition(RMAppAttemptState.ALLOCATED_SAVING,
|
|
|
RMAppAttemptState.ALLOCATED,
|
|
|
- RMAppAttemptEventType.ATTEMPT_SAVED, new AttemptStoredTransition())
|
|
|
+ RMAppAttemptEventType.ATTEMPT_NEW_SAVED, new AttemptStoredTransition())
|
|
|
.addTransition(RMAppAttemptState.ALLOCATED_SAVING,
|
|
|
RMAppAttemptState.ALLOCATED_SAVING,
|
|
|
RMAppAttemptEventType.CONTAINER_ACQUIRED,
|
|
|
new ContainerAcquiredTransition())
|
|
|
// App could be killed by the client. So need to handle this.
|
|
|
.addTransition(RMAppAttemptState.ALLOCATED_SAVING,
|
|
|
- RMAppAttemptState.KILLED,
|
|
|
+ RMAppAttemptState.FINAL_SAVING,
|
|
|
RMAppAttemptEventType.KILL,
|
|
|
- new BaseFinalTransition(RMAppAttemptState.KILLED))
|
|
|
-
|
|
|
+ new FinalSavingTransition(new BaseFinalTransition(
|
|
|
+ RMAppAttemptState.KILLED), RMAppAttemptState.KILLED))
|
|
|
+
|
|
|
// Transitions from LAUNCHED_UNMANAGED_SAVING State
|
|
|
.addTransition(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,
|
|
|
RMAppAttemptState.LAUNCHED,
|
|
|
- RMAppAttemptEventType.ATTEMPT_SAVED,
|
|
|
+ RMAppAttemptEventType.ATTEMPT_NEW_SAVED,
|
|
|
new UnmanagedAMAttemptSavedTransition())
|
|
|
// attempt should not try to register in this state
|
|
|
.addTransition(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,
|
|
|
- RMAppAttemptState.FAILED,
|
|
|
+ RMAppAttemptState.FINAL_SAVING,
|
|
|
RMAppAttemptEventType.REGISTERED,
|
|
|
- new UnexpectedAMRegisteredTransition())
|
|
|
+ new FinalSavingTransition(
|
|
|
+ new UnexpectedAMRegisteredTransition(), RMAppAttemptState.FAILED))
|
|
|
// App could be killed by the client. So need to handle this.
|
|
|
.addTransition(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,
|
|
|
- RMAppAttemptState.KILLED,
|
|
|
+ RMAppAttemptState.FINAL_SAVING,
|
|
|
RMAppAttemptEventType.KILL,
|
|
|
- new BaseFinalTransition(RMAppAttemptState.KILLED))
|
|
|
+ new FinalSavingTransition(new BaseFinalTransition(
|
|
|
+ RMAppAttemptState.KILLED), RMAppAttemptState.KILLED))
|
|
|
|
|
|
// Transitions from ALLOCATED State
|
|
|
.addTransition(RMAppAttemptState.ALLOCATED,
|
|
@@ -239,32 +256,40 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|
|
new ContainerAcquiredTransition())
|
|
|
.addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.LAUNCHED,
|
|
|
RMAppAttemptEventType.LAUNCHED, new AMLaunchedTransition())
|
|
|
- .addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.FAILED,
|
|
|
- RMAppAttemptEventType.LAUNCH_FAILED, new LaunchFailedTransition())
|
|
|
- .addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.KILLED,
|
|
|
- RMAppAttemptEventType.KILL, new KillAllocatedAMTransition())
|
|
|
+ .addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.FINAL_SAVING,
|
|
|
+ RMAppAttemptEventType.LAUNCH_FAILED,
|
|
|
+ new FinalSavingTransition(new LaunchFailedTransition(),
|
|
|
+ RMAppAttemptState.FAILED))
|
|
|
+ .addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.FINAL_SAVING,
|
|
|
+ RMAppAttemptEventType.KILL,
|
|
|
+ new FinalSavingTransition(
|
|
|
+ new KillAllocatedAMTransition(), RMAppAttemptState.KILLED))
|
|
|
|
|
|
- .addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.FAILED,
|
|
|
+ .addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.FINAL_SAVING,
|
|
|
RMAppAttemptEventType.CONTAINER_FINISHED,
|
|
|
- new AMContainerCrashedTransition())
|
|
|
+ new FinalSavingTransition(
|
|
|
+ new AMContainerCrashedTransition(), RMAppAttemptState.FAILED))
|
|
|
|
|
|
// Transitions from LAUNCHED State
|
|
|
.addTransition(RMAppAttemptState.LAUNCHED, RMAppAttemptState.RUNNING,
|
|
|
RMAppAttemptEventType.REGISTERED, new AMRegisteredTransition())
|
|
|
- .addTransition(RMAppAttemptState.LAUNCHED, RMAppAttemptState.FAILED,
|
|
|
+ .addTransition(RMAppAttemptState.LAUNCHED, RMAppAttemptState.FINAL_SAVING,
|
|
|
RMAppAttemptEventType.CONTAINER_FINISHED,
|
|
|
- new AMContainerCrashedTransition())
|
|
|
+ new FinalSavingTransition(
|
|
|
+ new AMContainerCrashedTransition(), RMAppAttemptState.FAILED))
|
|
|
.addTransition(
|
|
|
- RMAppAttemptState.LAUNCHED, RMAppAttemptState.FAILED,
|
|
|
+ RMAppAttemptState.LAUNCHED, RMAppAttemptState.FINAL_SAVING,
|
|
|
RMAppAttemptEventType.EXPIRE,
|
|
|
- EXPIRED_TRANSITION)
|
|
|
- .addTransition(RMAppAttemptState.LAUNCHED, RMAppAttemptState.KILLED,
|
|
|
+ new FinalSavingTransition(EXPIRED_TRANSITION,
|
|
|
+ RMAppAttemptState.FAILED))
|
|
|
+ .addTransition(RMAppAttemptState.LAUNCHED, RMAppAttemptState.FINAL_SAVING,
|
|
|
RMAppAttemptEventType.KILL,
|
|
|
- new FinalTransition(RMAppAttemptState.KILLED))
|
|
|
+ new FinalSavingTransition(new FinalTransition(
|
|
|
+ RMAppAttemptState.KILLED), RMAppAttemptState.KILLED))
|
|
|
|
|
|
// Transitions from RUNNING State
|
|
|
.addTransition(RMAppAttemptState.RUNNING,
|
|
|
- EnumSet.of(RMAppAttemptState.FINISHING, RMAppAttemptState.FINISHED),
|
|
|
+ EnumSet.of(RMAppAttemptState.FINAL_SAVING, RMAppAttemptState.FINISHED),
|
|
|
RMAppAttemptEventType.UNREGISTERED, new AMUnregisteredTransition())
|
|
|
.addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING,
|
|
|
RMAppAttemptEventType.STATUS_UPDATE, new StatusUpdateTransition())
|
|
@@ -276,17 +301,41 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|
|
new ContainerAcquiredTransition())
|
|
|
.addTransition(
|
|
|
RMAppAttemptState.RUNNING,
|
|
|
- EnumSet.of(RMAppAttemptState.RUNNING, RMAppAttemptState.FAILED),
|
|
|
+ EnumSet.of(RMAppAttemptState.RUNNING, RMAppAttemptState.FINAL_SAVING),
|
|
|
RMAppAttemptEventType.CONTAINER_FINISHED,
|
|
|
new ContainerFinishedTransition())
|
|
|
.addTransition(
|
|
|
- RMAppAttemptState.RUNNING, RMAppAttemptState.FAILED,
|
|
|
+ RMAppAttemptState.RUNNING, RMAppAttemptState.FINAL_SAVING,
|
|
|
RMAppAttemptEventType.EXPIRE,
|
|
|
- EXPIRED_TRANSITION)
|
|
|
+ new FinalSavingTransition(EXPIRED_TRANSITION,
|
|
|
+ RMAppAttemptState.FAILED))
|
|
|
.addTransition(
|
|
|
- RMAppAttemptState.RUNNING, RMAppAttemptState.KILLED,
|
|
|
+ RMAppAttemptState.RUNNING, RMAppAttemptState.FINAL_SAVING,
|
|
|
RMAppAttemptEventType.KILL,
|
|
|
- new FinalTransition(RMAppAttemptState.KILLED))
|
|
|
+ new FinalSavingTransition(new FinalTransition(
|
|
|
+ RMAppAttemptState.KILLED), RMAppAttemptState.KILLED))
|
|
|
+
|
|
|
+ // Transitions from FINAL_SAVING State
|
|
|
+ .addTransition(RMAppAttemptState.FINAL_SAVING,
|
|
|
+ EnumSet.of(RMAppAttemptState.FINISHING, RMAppAttemptState.FAILED,
|
|
|
+ RMAppAttemptState.KILLED, RMAppAttemptState.FINISHED),
|
|
|
+ RMAppAttemptEventType.ATTEMPT_UPDATE_SAVED,
|
|
|
+ new FinalStateSavedTransition())
|
|
|
+ .addTransition(RMAppAttemptState.FINAL_SAVING, RMAppAttemptState.FINAL_SAVING,
|
|
|
+ RMAppAttemptEventType.CONTAINER_FINISHED,
|
|
|
+ new ContainerFinishedAtFinalSavingTransition())
|
|
|
+ .addTransition(RMAppAttemptState.FINAL_SAVING, RMAppAttemptState.FINAL_SAVING,
|
|
|
+ RMAppAttemptEventType.EXPIRE,
|
|
|
+ new AMExpiredAtFinalSavingTransition())
|
|
|
+ .addTransition(RMAppAttemptState.FINAL_SAVING, RMAppAttemptState.FINAL_SAVING,
|
|
|
+ EnumSet.of(
|
|
|
+ RMAppAttemptEventType.UNREGISTERED,
|
|
|
+ RMAppAttemptEventType.STATUS_UPDATE,
|
|
|
+ // should be fixed to reject container allocate request at Final
|
|
|
+ // Saving in scheduler
|
|
|
+ RMAppAttemptEventType.CONTAINER_ALLOCATED,
|
|
|
+ RMAppAttemptEventType.CONTAINER_ACQUIRED,
|
|
|
+ RMAppAttemptEventType.KILL))
|
|
|
|
|
|
// Transitions from FAILED State
|
|
|
.addTransition(
|
|
@@ -338,7 +387,6 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|
|
RMAppAttemptEventType.EXPIRE,
|
|
|
RMAppAttemptEventType.REGISTERED,
|
|
|
RMAppAttemptEventType.CONTAINER_ALLOCATED,
|
|
|
- RMAppAttemptEventType.ATTEMPT_SAVED,
|
|
|
RMAppAttemptEventType.CONTAINER_FINISHED,
|
|
|
RMAppAttemptEventType.UNREGISTERED,
|
|
|
RMAppAttemptEventType.KILL,
|
|
@@ -357,7 +405,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|
|
RMAppAttemptEventType.REGISTERED,
|
|
|
RMAppAttemptEventType.CONTAINER_ALLOCATED,
|
|
|
RMAppAttemptEventType.CONTAINER_ACQUIRED,
|
|
|
- RMAppAttemptEventType.ATTEMPT_SAVED,
|
|
|
+ RMAppAttemptEventType.ATTEMPT_NEW_SAVED,
|
|
|
RMAppAttemptEventType.CONTAINER_FINISHED,
|
|
|
RMAppAttemptEventType.UNREGISTERED,
|
|
|
RMAppAttemptEventType.KILL,
|
|
@@ -411,7 +459,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|
|
public RMAppAttemptState getAppAttemptState() {
|
|
|
this.readLock.lock();
|
|
|
try {
|
|
|
- return this.stateMachine.getCurrentState();
|
|
|
+ return this.stateMachine.getCurrentState();
|
|
|
} finally {
|
|
|
this.readLock.unlock();
|
|
|
}
|
|
@@ -444,7 +492,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|
|
this.readLock.lock();
|
|
|
try {
|
|
|
return (getSubmissionContext().getUnmanagedAM()) ?
|
|
|
- this.origTrackingUrl : this.proxiedTrackingUrl;
|
|
|
+ this.originalTrackingUrl : this.proxiedTrackingUrl;
|
|
|
} finally {
|
|
|
this.readLock.unlock();
|
|
|
}
|
|
@@ -454,7 +502,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|
|
public String getOriginalTrackingUrl() {
|
|
|
this.readLock.lock();
|
|
|
try {
|
|
|
- return this.origTrackingUrl;
|
|
|
+ return this.originalTrackingUrl;
|
|
|
} finally {
|
|
|
this.readLock.unlock();
|
|
|
}
|
|
@@ -490,10 +538,10 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|
|
}
|
|
|
|
|
|
private void setTrackingUrlToRMAppPage() {
|
|
|
- origTrackingUrl = pjoin(
|
|
|
+ originalTrackingUrl = pjoin(
|
|
|
WebAppUtils.getResolvedRMWebAppURLWithoutScheme(conf),
|
|
|
"cluster", "app", getAppAttemptId().getApplicationId());
|
|
|
- proxiedTrackingUrl = origTrackingUrl;
|
|
|
+ proxiedTrackingUrl = originalTrackingUrl;
|
|
|
}
|
|
|
|
|
|
// This is only used for RMStateStore. Normal operation must invoke the secret
|
|
@@ -539,16 +587,6 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public void setDiagnostics(String message) {
|
|
|
- this.writeLock.lock();
|
|
|
-
|
|
|
- try {
|
|
|
- this.diagnostics.append(message);
|
|
|
- } finally {
|
|
|
- this.writeLock.unlock();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
@Override
|
|
|
public float getProgress() {
|
|
|
this.readLock.lock();
|
|
@@ -673,19 +711,26 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void recover(RMState state) throws Exception{
|
|
|
- ApplicationState appState =
|
|
|
+ public void recover(RMState state) throws Exception {
|
|
|
+ ApplicationState appState =
|
|
|
state.getApplicationState().get(getAppAttemptId().getApplicationId());
|
|
|
- ApplicationAttemptState attemptState = appState.getAttempt(getAppAttemptId());
|
|
|
+ ApplicationAttemptState attemptState =
|
|
|
+ appState.getAttempt(getAppAttemptId());
|
|
|
assert attemptState != null;
|
|
|
+ LOG.info("Recovered attempt: AppId: "
|
|
|
+ + getAppAttemptId().getApplicationId() + " AttemptId: "
|
|
|
+ + getAppAttemptId() + " MasterContainer: " + masterContainer);
|
|
|
+ diagnostics.append("Attempt recovered after RM restart");
|
|
|
+ diagnostics.append(attemptState.getDiagnostics());
|
|
|
setMasterContainer(attemptState.getMasterContainer());
|
|
|
recoverAppAttemptCredentials(attemptState.getAppAttemptCredentials());
|
|
|
- LOG.info("Recovered attempt: AppId: " + getAppAttemptId().getApplicationId()
|
|
|
- + " AttemptId: " + getAppAttemptId()
|
|
|
- + " MasterContainer: " + masterContainer);
|
|
|
- setDiagnostics("Attempt recovered after RM restart");
|
|
|
- handle(new RMAppAttemptEvent(getAppAttemptId(),
|
|
|
- RMAppAttemptEventType.RECOVER));
|
|
|
+ this.recoveredFinalState = attemptState.getState();
|
|
|
+ this.originalTrackingUrl = attemptState.getFinalTrackingUrl();
|
|
|
+ this.proxiedTrackingUrl = generateProxyUriWithScheme(originalTrackingUrl);
|
|
|
+ this.finalStatus = attemptState.getFinalApplicationStatus();
|
|
|
+ this.startTime = attemptState.getStartTime();
|
|
|
+ handle(new RMAppAttemptEvent(getAppAttemptId(),
|
|
|
+ RMAppAttemptEventType.RECOVER));
|
|
|
}
|
|
|
|
|
|
private void recoverAppAttemptCredentials(Credentials appAttemptTokens)
|
|
@@ -763,7 +808,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|
|
|
|
|
// Save the diagnostic message
|
|
|
String message = rejectedEvent.getMessage();
|
|
|
- appAttempt.setDiagnostics(message);
|
|
|
+ appAttempt.diagnostics.append(message);
|
|
|
|
|
|
// Send the rejection event to app
|
|
|
appAttempt.eventHandler.handle(
|
|
@@ -810,10 +855,8 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|
|
}
|
|
|
return RMAppAttemptState.SCHEDULED;
|
|
|
} else {
|
|
|
- // RM not allocating container. AM is self launched.
|
|
|
- RMStateStore store = appAttempt.rmContext.getStateStore();
|
|
|
// save state and then go to LAUNCHED state
|
|
|
- appAttempt.storeAttempt(store);
|
|
|
+ appAttempt.storeAttempt();
|
|
|
return RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING;
|
|
|
}
|
|
|
}
|
|
@@ -838,8 +881,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|
|
0));
|
|
|
appAttempt.getSubmissionContext().setResource(
|
|
|
appAttempt.getMasterContainer().getResource());
|
|
|
- RMStateStore store = appAttempt.rmContext.getStateStore();
|
|
|
- appAttempt.storeAttempt(store);
|
|
|
+ appAttempt.storeAttempt();
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -851,6 +893,134 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|
|
appAttempt.launchAttempt();
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ private static class AttemptRecoveredTransition
|
|
|
+ implements
|
|
|
+ MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> {
|
|
|
+ @Override
|
|
|
+ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
|
|
|
+ RMAppAttemptEvent event) {
|
|
|
+ if (appAttempt.recoveredFinalState != null) {
|
|
|
+ appAttempt.progress = 1.0f;
|
|
|
+ return appAttempt.recoveredFinalState;
|
|
|
+ } else {
|
|
|
+ return RMAppAttemptState.RECOVERED;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void rememberTargetTransitions(RMAppAttemptEvent event,
|
|
|
+ Object transitionToDo, RMAppAttemptState targetFinalState) {
|
|
|
+ transitionTodo = transitionToDo;
|
|
|
+ targetedFinalState = targetFinalState;
|
|
|
+ eventCausingFinalSaving = event;
|
|
|
+ }
|
|
|
+
|
|
|
+ private void rememberTargetTransitionsAndStoreState(RMAppAttemptEvent event,
|
|
|
+ Object transitionToDo, RMAppAttemptState targetFinalState,
|
|
|
+ RMAppAttemptState stateToBeStored) {
|
|
|
+
|
|
|
+ rememberTargetTransitions(event, transitionToDo, targetFinalState);
|
|
|
+
|
|
|
+ // As of today, finalState, diagnostics, final-tracking-url and
|
|
|
+ // finalAppStatus are the only things that we store into the StateStore
|
|
|
+ // AFTER the initial saving on app-attempt-start
|
|
|
+ // These fields can be visible from outside only after they are saved in
|
|
|
+ // StateStore
|
|
|
+ String diags = null;
|
|
|
+ String finalTrackingUrl = null;
|
|
|
+ FinalApplicationStatus finalStatus = null;
|
|
|
+
|
|
|
+ switch (event.getType()) {
|
|
|
+ case APP_REJECTED:
|
|
|
+ RMAppAttemptRejectedEvent rejectedEvent =
|
|
|
+ (RMAppAttemptRejectedEvent) event;
|
|
|
+ diags = rejectedEvent.getMessage();
|
|
|
+ break;
|
|
|
+ case LAUNCH_FAILED:
|
|
|
+ RMAppAttemptLaunchFailedEvent launchFaileEvent =
|
|
|
+ (RMAppAttemptLaunchFailedEvent) event;
|
|
|
+ diags = launchFaileEvent.getMessage();
|
|
|
+ break;
|
|
|
+ case REGISTERED:
|
|
|
+ diags = getUnexpectedAMRegisteredDiagnostics();
|
|
|
+ break;
|
|
|
+ case UNREGISTERED:
|
|
|
+ RMAppAttemptUnregistrationEvent unregisterEvent =
|
|
|
+ (RMAppAttemptUnregistrationEvent) event;
|
|
|
+ diags = unregisterEvent.getDiagnostics();
|
|
|
+ finalTrackingUrl = sanitizeTrackingUrl(unregisterEvent.getFinalTrackingUrl());
|
|
|
+ finalStatus = unregisterEvent.getFinalApplicationStatus();
|
|
|
+ break;
|
|
|
+ case CONTAINER_FINISHED:
|
|
|
+ RMAppAttemptContainerFinishedEvent finishEvent =
|
|
|
+ (RMAppAttemptContainerFinishedEvent) event;
|
|
|
+ diags = getAMContainerCrashedDiagnostics(finishEvent);
|
|
|
+ break;
|
|
|
+ case KILL:
|
|
|
+ break;
|
|
|
+ case EXPIRE:
|
|
|
+ diags = getAMExpiredDiagnostics(event);
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ RMStateStore rmStore = rmContext.getStateStore();
|
|
|
+ ApplicationAttemptState attemptState =
|
|
|
+ new ApplicationAttemptState(applicationAttemptId, getMasterContainer(),
|
|
|
+ rmStore.getCredentialsFromAppAttempt(this), startTime,
|
|
|
+ stateToBeStored, finalTrackingUrl, diags, finalStatus);
|
|
|
+ LOG.info("Updating application attempt " + applicationAttemptId
|
|
|
+ + " with final state: " + targetedFinalState);
|
|
|
+ rmStore.updateApplicationAttemptState(attemptState);
|
|
|
+ }
|
|
|
+
|
|
|
+ private static class FinalSavingTransition extends BaseTransition {
|
|
|
+
|
|
|
+ Object transitionToDo;
|
|
|
+ RMAppAttemptState targetedFinalState;
|
|
|
+
|
|
|
+ public FinalSavingTransition(Object transitionToDo,
|
|
|
+ RMAppAttemptState targetedFinalState) {
|
|
|
+ this.transitionToDo = transitionToDo;
|
|
|
+ this.targetedFinalState = targetedFinalState;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) {
|
|
|
+ // For cases Killed/Failed, targetedFinalState is the same as the state to
|
|
|
+ // be stored
|
|
|
+ appAttempt.rememberTargetTransitionsAndStoreState(event, transitionToDo,
|
|
|
+ targetedFinalState, targetedFinalState);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private static class FinalStateSavedTransition implements
|
|
|
+ MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> {
|
|
|
+ @Override
|
|
|
+ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
|
|
|
+ RMAppAttemptEvent event) {
|
|
|
+ RMAppAttemptUpdateSavedEvent storeEvent = (RMAppAttemptUpdateSavedEvent) event;
|
|
|
+ if (storeEvent.getUpdatedException() != null) {
|
|
|
+ LOG.error("Failed to update the final state of application attempt: "
|
|
|
+ + storeEvent.getApplicationAttemptId(),
|
|
|
+ storeEvent.getUpdatedException());
|
|
|
+ ExitUtil.terminate(1, storeEvent.getUpdatedException());
|
|
|
+ }
|
|
|
+
|
|
|
+ RMAppAttemptEvent causeEvent = appAttempt.eventCausingFinalSaving;
|
|
|
+
|
|
|
+ if (appAttempt.transitionTodo instanceof SingleArcTransition) {
|
|
|
+ ((SingleArcTransition) appAttempt.transitionTodo).transition(
|
|
|
+ appAttempt, causeEvent);
|
|
|
+ } else if (appAttempt.transitionTodo instanceof MultipleArcTransition) {
|
|
|
+ ((MultipleArcTransition) appAttempt.transitionTodo).transition(
|
|
|
+ appAttempt, causeEvent);
|
|
|
+ }
|
|
|
+ return appAttempt.targetedFinalState;
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
private static class BaseFinalTransition extends BaseTransition {
|
|
|
|
|
@@ -998,15 +1168,20 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|
|
= (RMAppAttemptRegistrationEvent) event;
|
|
|
appAttempt.host = registrationEvent.getHost();
|
|
|
appAttempt.rpcPort = registrationEvent.getRpcport();
|
|
|
- appAttempt.origTrackingUrl =
|
|
|
+ appAttempt.originalTrackingUrl =
|
|
|
sanitizeTrackingUrl(registrationEvent.getTrackingurl());
|
|
|
appAttempt.proxiedTrackingUrl =
|
|
|
- appAttempt.generateProxyUriWithScheme(appAttempt.origTrackingUrl);
|
|
|
+ appAttempt.generateProxyUriWithScheme(appAttempt.originalTrackingUrl);
|
|
|
|
|
|
// Let the app know
|
|
|
appAttempt.eventHandler.handle(new RMAppEvent(appAttempt
|
|
|
.getAppAttemptId().getApplicationId(),
|
|
|
RMAppEventType.ATTEMPT_REGISTERED));
|
|
|
+
|
|
|
+ // TODO:FIXME: Note for future. Unfortunately we only do a state-store
|
|
|
+ // write at AM launch time, so we don't save the AM's tracking URL anywhere
|
|
|
+ // as that would mean an extra state-store write. For now, we hope that in
|
|
|
+ // work-preserving restart, AMs are forced to reregister.
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -1029,17 +1204,24 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|
|
appAttempt.getAppAttemptId());
|
|
|
|
|
|
// Setup diagnostic message
|
|
|
- ContainerStatus status = finishEvent.getContainerStatus();
|
|
|
- appAttempt.diagnostics.append("AM Container for " +
|
|
|
- appAttempt.getAppAttemptId() + " exited with " +
|
|
|
- " exitCode: " + status.getExitStatus() +
|
|
|
- " due to: " + status.getDiagnostics() + "." +
|
|
|
- "Failing this attempt.");
|
|
|
+ appAttempt.diagnostics
|
|
|
+ .append(getAMContainerCrashedDiagnostics(finishEvent));
|
|
|
// Tell the app, scheduler
|
|
|
super.transition(appAttempt, finishEvent);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private static String getAMContainerCrashedDiagnostics(
|
|
|
+ RMAppAttemptContainerFinishedEvent finishEvent) {
|
|
|
+ ContainerStatus status = finishEvent.getContainerStatus();
|
|
|
+ String diagnostics =
|
|
|
+ "AM Container for " + finishEvent.getApplicationAttemptId()
|
|
|
+ + " exited with " + " exitCode: " + status.getExitStatus()
|
|
|
+ + " due to: " + status.getDiagnostics() + "."
|
|
|
+ + "Failing this attempt.";
|
|
|
+ return diagnostics;
|
|
|
+ }
|
|
|
+
|
|
|
private static class FinalTransition extends BaseFinalTransition {
|
|
|
|
|
|
public FinalTransition(RMAppAttemptState finalAttemptState) {
|
|
@@ -1055,7 +1237,8 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|
|
// Tell the app and the scheduler
|
|
|
super.transition(appAttempt, event);
|
|
|
|
|
|
- // UnRegister from AMLivelinessMonitor
|
|
|
+ // UnRegister from AMLivelinessMonitor. Perhaps for
|
|
|
+ // FAILING/KILLED/UnManaged AMs
|
|
|
appAttempt.rmContext.getAMLivelinessMonitor().unregister(
|
|
|
appAttempt.getAppAttemptId());
|
|
|
appAttempt.rmContext.getAMFinishingMonitor().unregister(
|
|
@@ -1078,12 +1261,18 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|
|
@Override
|
|
|
public void transition(RMAppAttemptImpl appAttempt,
|
|
|
RMAppAttemptEvent event) {
|
|
|
- appAttempt.diagnostics.append("ApplicationMaster for attempt " +
|
|
|
- appAttempt.getAppAttemptId() + " timed out");
|
|
|
+ appAttempt.diagnostics.append(getAMExpiredDiagnostics(event));
|
|
|
super.transition(appAttempt, event);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private static String getAMExpiredDiagnostics(RMAppAttemptEvent event) {
|
|
|
+ String diag =
|
|
|
+ "ApplicationMaster for attempt " + event.getApplicationAttemptId()
|
|
|
+ + " timed out";
|
|
|
+ return diag;
|
|
|
+ }
|
|
|
+
|
|
|
private static class UnexpectedAMRegisteredTransition extends
|
|
|
BaseFinalTransition {
|
|
|
|
|
@@ -1094,13 +1283,16 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|
|
@Override
|
|
|
public void transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) {
|
|
|
assert appAttempt.submissionContext.getUnmanagedAM();
|
|
|
- appAttempt
|
|
|
- .setDiagnostics("Unmanaged AM must register after AM attempt reaches LAUNCHED state.");
|
|
|
+ appAttempt.diagnostics.append(getUnexpectedAMRegisteredDiagnostics());
|
|
|
super.transition(appAttempt, event);
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
+ private static String getUnexpectedAMRegisteredDiagnostics() {
|
|
|
+ return "Unmanaged AM must register after AM attempt reaches LAUNCHED state.";
|
|
|
+ }
|
|
|
+
|
|
|
private static final class StatusUpdateTransition extends
|
|
|
BaseTransition {
|
|
|
@Override
|
|
@@ -1125,38 +1317,62 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|
|
@Override
|
|
|
public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
|
|
|
RMAppAttemptEvent event) {
|
|
|
- ApplicationAttemptId appAttemptId = appAttempt.getAppAttemptId();
|
|
|
-
|
|
|
- appAttempt.rmContext.getAMLivelinessMonitor().unregister(appAttemptId);
|
|
|
-
|
|
|
- appAttempt.progress = 1.0f;
|
|
|
-
|
|
|
- RMAppAttemptUnregistrationEvent unregisterEvent
|
|
|
- = (RMAppAttemptUnregistrationEvent) event;
|
|
|
- appAttempt.diagnostics.append(unregisterEvent.getDiagnostics());
|
|
|
- appAttempt.origTrackingUrl =
|
|
|
- sanitizeTrackingUrl(unregisterEvent.getTrackingUrl());
|
|
|
- appAttempt.proxiedTrackingUrl =
|
|
|
- appAttempt.generateProxyUriWithScheme(appAttempt.origTrackingUrl);
|
|
|
- appAttempt.finalStatus = unregisterEvent.getFinalApplicationStatus();
|
|
|
-
|
|
|
// Tell the app
|
|
|
if (appAttempt.getSubmissionContext().getUnmanagedAM()) {
|
|
|
// Unmanaged AMs have no container to wait for, so they skip
|
|
|
// the FINISHING state and go straight to FINISHED.
|
|
|
+ appAttempt.updateInfoOnAMUnregister(event);
|
|
|
new FinalTransition(RMAppAttemptState.FINISHED).transition(
|
|
|
appAttempt, event);
|
|
|
return RMAppAttemptState.FINISHED;
|
|
|
}
|
|
|
- appAttempt.rmContext.getAMFinishingMonitor().register(appAttemptId);
|
|
|
+ // Saving the attempt final state
|
|
|
+ appAttempt.rememberTargetTransitionsAndStoreState(event,
|
|
|
+ new FinalStateSavedAfterAMUnregisterTransition(),
|
|
|
+ RMAppAttemptState.FINISHING, RMAppAttemptState.FINISHED);
|
|
|
ApplicationId applicationId =
|
|
|
appAttempt.getAppAttemptId().getApplicationId();
|
|
|
- appAttempt.eventHandler.handle(
|
|
|
- new RMAppEvent(applicationId, RMAppEventType.ATTEMPT_UNREGISTERED));
|
|
|
- return RMAppAttemptState.FINISHING;
|
|
|
+
|
|
|
+ // Tell the app immediately that AM is unregistering so that app itself
|
|
|
+ // can save its state as soon as possible. Whether we do it like this, or
|
|
|
+ // we wait till AppAttempt is saved, it doesn't make any difference on the
|
|
|
+ // app side w.r.t failure conditions. The only event going out of
|
|
|
+ // AppAttempt to App after this point of time is AM/AppAttempt Finished.
|
|
|
+ appAttempt.eventHandler.handle(new RMAppEvent(applicationId,
|
|
|
+ RMAppEventType.ATTEMPT_UNREGISTERED));
|
|
|
+ return RMAppAttemptState.FINAL_SAVING;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private static class FinalStateSavedAfterAMUnregisterTransition extends
|
|
|
+ BaseTransition {
|
|
|
+ @Override
|
|
|
+ public void
|
|
|
+ transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) {
|
|
|
+ // Unregister from the AMlivenessMonitor and register with AMFinishingMonitor
|
|
|
+ appAttempt.rmContext.getAMLivelinessMonitor().unregister(
|
|
|
+ appAttempt.applicationAttemptId);
|
|
|
+ appAttempt.rmContext.getAMFinishingMonitor().register(
|
|
|
+ appAttempt.applicationAttemptId);
|
|
|
+
|
|
|
+ // Do not make any more changes to this transition code. Make all changes
|
|
|
+ // to the following method. Unless you are absolutely sure that you have
|
|
|
+ // stuff to do that shouldn't be used by the callers of the following
|
|
|
+ // method.
|
|
|
+ appAttempt.updateInfoOnAMUnregister(event);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void updateInfoOnAMUnregister(RMAppAttemptEvent event) {
|
|
|
+ progress = 1.0f;
|
|
|
+ RMAppAttemptUnregistrationEvent unregisterEvent =
|
|
|
+ (RMAppAttemptUnregistrationEvent) event;
|
|
|
+ diagnostics.append(unregisterEvent.getDiagnostics());
|
|
|
+ originalTrackingUrl = sanitizeTrackingUrl(unregisterEvent.getFinalTrackingUrl());
|
|
|
+ proxiedTrackingUrl = generateProxyUriWithScheme(originalTrackingUrl);
|
|
|
+ finalStatus = unregisterEvent.getFinalApplicationStatus();
|
|
|
+ }
|
|
|
+
|
|
|
private static final class ContainerAcquiredTransition extends
|
|
|
BaseTransition {
|
|
|
@Override
|
|
@@ -1185,29 +1401,37 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|
|
// the AMContainer, AppAttempt fails
|
|
|
if (appAttempt.masterContainer != null
|
|
|
&& appAttempt.masterContainer.getId().equals(
|
|
|
- containerStatus.getContainerId())) {
|
|
|
- // container associated with AM. must not be unmanaged
|
|
|
- assert appAttempt.submissionContext.getUnmanagedAM() == false;
|
|
|
- // Setup diagnostic message
|
|
|
- appAttempt.diagnostics.append("AM Container for " +
|
|
|
- appAttempt.getAppAttemptId() + " exited with " +
|
|
|
- " exitCode: " + containerStatus.getExitStatus() +
|
|
|
- " due to: " + containerStatus.getDiagnostics() + "." +
|
|
|
- "Failing this attempt.");
|
|
|
-
|
|
|
- new FinalTransition(RMAppAttemptState.FAILED).transition(
|
|
|
- appAttempt, containerFinishedEvent);
|
|
|
- return RMAppAttemptState.FAILED;
|
|
|
+ containerStatus.getContainerId())) {
|
|
|
+ // Remember the follow up transition and save the final attempt state.
|
|
|
+ appAttempt.rememberTargetTransitionsAndStoreState(event,
|
|
|
+ new ContainerFinishedFinalStateSavedTransition(),
|
|
|
+ RMAppAttemptState.FAILED, RMAppAttemptState.FAILED);
|
|
|
+ return RMAppAttemptState.FINAL_SAVING;
|
|
|
}
|
|
|
|
|
|
- // Normal container.
|
|
|
-
|
|
|
- // Put it in completedcontainers list
|
|
|
+ // Normal container.Put it in completedcontainers list
|
|
|
appAttempt.justFinishedContainers.add(containerStatus);
|
|
|
return RMAppAttemptState.RUNNING;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private static class ContainerFinishedFinalStateSavedTransition extends
|
|
|
+ BaseTransition {
|
|
|
+ @Override
|
|
|
+ public void
|
|
|
+ transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) {
|
|
|
+ RMAppAttemptContainerFinishedEvent containerFinishedEvent =
|
|
|
+ (RMAppAttemptContainerFinishedEvent) event;
|
|
|
+ // container associated with AM. must not be unmanaged
|
|
|
+ assert appAttempt.submissionContext.getUnmanagedAM() == false;
|
|
|
+ // Setup diagnostic message
|
|
|
+ appAttempt.diagnostics
|
|
|
+ .append(getAMContainerCrashedDiagnostics(containerFinishedEvent));
|
|
|
+ new FinalTransition(RMAppAttemptState.FAILED).transition(appAttempt,
|
|
|
+ event);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
private static final class AMFinishingContainerFinishedTransition
|
|
|
implements
|
|
|
MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> {
|
|
@@ -1228,13 +1452,83 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|
|
appAttempt, containerFinishedEvent);
|
|
|
return RMAppAttemptState.FINISHED;
|
|
|
}
|
|
|
-
|
|
|
// Normal container.
|
|
|
appAttempt.justFinishedContainers.add(containerStatus);
|
|
|
return RMAppAttemptState.FINISHING;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private static class ContainerFinishedAtFinalSavingTransition extends
|
|
|
+ BaseTransition {
|
|
|
+ @Override
|
|
|
+ public void
|
|
|
+ transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) {
|
|
|
+ RMAppAttemptContainerFinishedEvent containerFinishedEvent =
|
|
|
+ (RMAppAttemptContainerFinishedEvent) event;
|
|
|
+ ContainerStatus containerStatus =
|
|
|
+ containerFinishedEvent.getContainerStatus();
|
|
|
+
|
|
|
+ // If this is the AM container, it means the AM container is finished,
|
|
|
+ // but we are not yet acknowledged that the final state has been saved.
|
|
|
+ // Thus, we still return FINAL_SAVING state here.
|
|
|
+ if (appAttempt.masterContainer.getId().equals(
|
|
|
+ containerStatus.getContainerId())) {
|
|
|
+ if (appAttempt.targetedFinalState.equals(RMAppAttemptState.FAILED)
|
|
|
+ || appAttempt.targetedFinalState.equals(RMAppAttemptState.KILLED)) {
|
|
|
+ // ignore Container_Finished Event if we were supposed to reach
|
|
|
+ // FAILED/KILLED state.
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // pass in the earlier AMUnregistered Event also, as this is needed for
|
|
|
+ // AMFinishedAfterFinalSavingTransition later on
|
|
|
+ appAttempt.rememberTargetTransitions(event,
|
|
|
+ new AMFinishedAfterFinalSavingTransition(
|
|
|
+ appAttempt.eventCausingFinalSaving), RMAppAttemptState.FINISHED);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ // Normal container.
|
|
|
+ appAttempt.justFinishedContainers.add(containerStatus);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private static class AMFinishedAfterFinalSavingTransition extends
|
|
|
+ BaseTransition {
|
|
|
+ RMAppAttemptEvent amUnregisteredEvent;
|
|
|
+ public AMFinishedAfterFinalSavingTransition(
|
|
|
+ RMAppAttemptEvent amUnregisteredEvent) {
|
|
|
+ this.amUnregisteredEvent = amUnregisteredEvent;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void
|
|
|
+ transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) {
|
|
|
+ appAttempt.updateInfoOnAMUnregister(amUnregisteredEvent);
|
|
|
+ new FinalTransition(RMAppAttemptState.FINISHED).transition(appAttempt,
|
|
|
+ event);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private static class AMExpiredAtFinalSavingTransition extends
|
|
|
+ BaseTransition {
|
|
|
+ @Override
|
|
|
+ public void
|
|
|
+ transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) {
|
|
|
+ if (appAttempt.targetedFinalState.equals(RMAppAttemptState.FAILED)
|
|
|
+ || appAttempt.targetedFinalState.equals(RMAppAttemptState.KILLED)) {
|
|
|
+ // ignore Container_Finished Event if we were supposed to reach
|
|
|
+ // FAILED/KILLED state.
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // pass in the earlier AMUnregistered Event also, as this is needed for
|
|
|
+ // AMFinishedAfterFinalSavingTransition later on
|
|
|
+ appAttempt.rememberTargetTransitions(event,
|
|
|
+ new AMFinishedAfterFinalSavingTransition(
|
|
|
+ appAttempt.eventCausingFinalSaving), RMAppAttemptState.FINISHED);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public long getStartTime() {
|
|
|
this.readLock.lock();
|
|
@@ -1256,7 +1550,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|
|
}
|
|
|
|
|
|
private void checkAttemptStoreError(RMAppAttemptEvent event) {
|
|
|
- RMAppAttemptStoredEvent storeEvent = (RMAppAttemptStoredEvent) event;
|
|
|
+ RMAppAttemptNewSavedEvent storeEvent = (RMAppAttemptNewSavedEvent) event;
|
|
|
if(storeEvent.getStoredException() != null)
|
|
|
{
|
|
|
// This needs to be handled for HA and give up master status if we got
|
|
@@ -1267,7 +1561,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void storeAttempt(RMStateStore store) {
|
|
|
+ private void storeAttempt() {
|
|
|
// store attempt data in a non-blocking manner to prevent dispatcher
|
|
|
// thread starvation and wait for state to be saved
|
|
|
LOG.info("Storing attempt: AppId: " +
|
|
@@ -1275,7 +1569,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|
|
+ " AttemptId: " +
|
|
|
getAppAttemptId()
|
|
|
+ " MasterContainer: " + masterContainer);
|
|
|
- store.storeApplicationAttempt(this);
|
|
|
+ rmContext.getStateStore().storeNewApplicationAttempt(this);
|
|
|
}
|
|
|
|
|
|
private void removeCredentials(RMAppAttemptImpl appAttempt) {
|