|
@@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
|
import org.apache.hadoop.yarn.api.records.NodeState;
|
|
import org.apache.hadoop.yarn.api.records.NodeState;
|
|
|
|
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
|
import org.apache.hadoop.yarn.event.EventHandler;
|
|
import org.apache.hadoop.yarn.event.EventHandler;
|
|
@@ -56,6 +57,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
|
@@ -109,6 +111,8 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|
private static final FinalTransition FINAL_TRANSITION = new FinalTransition();
|
|
private static final FinalTransition FINAL_TRANSITION = new FinalTransition();
|
|
private static final AppFinishedTransition FINISHED_TRANSITION =
|
|
private static final AppFinishedTransition FINISHED_TRANSITION =
|
|
new AppFinishedTransition();
|
|
new AppFinishedTransition();
|
|
|
|
+ private boolean isAppRemovalRequestSent = false;
|
|
|
|
+ private RMAppState previousStateAtRemoving;
|
|
|
|
|
|
private static final StateMachineFactory<RMAppImpl,
|
|
private static final StateMachineFactory<RMAppImpl,
|
|
RMAppState,
|
|
RMAppState,
|
|
@@ -167,8 +171,9 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|
// Transitions from RUNNING state
|
|
// Transitions from RUNNING state
|
|
.addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
|
|
.addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
|
|
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
|
|
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
|
|
- .addTransition(RMAppState.RUNNING, RMAppState.FINISHING,
|
|
|
|
- RMAppEventType.ATTEMPT_FINISHING, new RMAppFinishingTransition())
|
|
|
|
|
|
+ .addTransition(RMAppState.RUNNING, RMAppState.REMOVING,
|
|
|
|
+ RMAppEventType.ATTEMPT_UNREGISTERED,
|
|
|
|
+ new RMAppRemovingTransition())
|
|
.addTransition(RMAppState.RUNNING, RMAppState.FINISHED,
|
|
.addTransition(RMAppState.RUNNING, RMAppState.FINISHED,
|
|
RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
|
|
RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
|
|
.addTransition(RMAppState.RUNNING,
|
|
.addTransition(RMAppState.RUNNING,
|
|
@@ -178,6 +183,17 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|
.addTransition(RMAppState.RUNNING, RMAppState.KILLED,
|
|
.addTransition(RMAppState.RUNNING, RMAppState.KILLED,
|
|
RMAppEventType.KILL, new KillAppAndAttemptTransition())
|
|
RMAppEventType.KILL, new KillAppAndAttemptTransition())
|
|
|
|
|
|
|
|
+ // Transitions from REMOVING state
|
|
|
|
+ .addTransition(RMAppState.REMOVING, RMAppState.FINISHING,
|
|
|
|
+ RMAppEventType.APP_REMOVED, new RMAppFinishingTransition())
|
|
|
|
+ .addTransition(RMAppState.REMOVING, RMAppState.FINISHED,
|
|
|
|
+ RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
|
|
|
|
+ .addTransition(RMAppState.REMOVING, RMAppState.KILLED,
|
|
|
|
+ RMAppEventType.KILL, new KillAppAndAttemptTransition())
|
|
|
|
+ // ignorable transitions
|
|
|
|
+ .addTransition(RMAppState.REMOVING, RMAppState.REMOVING,
|
|
|
|
+ RMAppEventType.NODE_UPDATE)
|
|
|
|
+
|
|
// Transitions from FINISHING state
|
|
// Transitions from FINISHING state
|
|
.addTransition(RMAppState.FINISHING, RMAppState.FINISHED,
|
|
.addTransition(RMAppState.FINISHING, RMAppState.FINISHED,
|
|
RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
|
|
RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
|
|
@@ -185,36 +201,34 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|
RMAppEventType.KILL, new KillAppAndAttemptTransition())
|
|
RMAppEventType.KILL, new KillAppAndAttemptTransition())
|
|
// ignorable transitions
|
|
// ignorable transitions
|
|
.addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
|
|
.addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
|
|
- RMAppEventType.NODE_UPDATE)
|
|
|
|
|
|
+ EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.APP_REMOVED))
|
|
|
|
|
|
// Transitions from FINISHED state
|
|
// Transitions from FINISHED state
|
|
- .addTransition(RMAppState.FINISHED, RMAppState.FINISHED,
|
|
|
|
- RMAppEventType.KILL)
|
|
|
|
// ignorable transitions
|
|
// ignorable transitions
|
|
.addTransition(RMAppState.FINISHED, RMAppState.FINISHED,
|
|
.addTransition(RMAppState.FINISHED, RMAppState.FINISHED,
|
|
EnumSet.of(
|
|
EnumSet.of(
|
|
RMAppEventType.NODE_UPDATE,
|
|
RMAppEventType.NODE_UPDATE,
|
|
- RMAppEventType.ATTEMPT_FINISHING,
|
|
|
|
- RMAppEventType.ATTEMPT_FINISHED))
|
|
|
|
|
|
+ RMAppEventType.ATTEMPT_UNREGISTERED,
|
|
|
|
+ RMAppEventType.ATTEMPT_FINISHED,
|
|
|
|
+ RMAppEventType.KILL,
|
|
|
|
+ RMAppEventType.APP_REMOVED))
|
|
|
|
|
|
// Transitions from FAILED state
|
|
// Transitions from FAILED state
|
|
- .addTransition(RMAppState.FAILED, RMAppState.FAILED,
|
|
|
|
- EnumSet.of(RMAppEventType.KILL, RMAppEventType.APP_SAVED))
|
|
|
|
// ignorable transitions
|
|
// ignorable transitions
|
|
- .addTransition(RMAppState.FAILED, RMAppState.FAILED,
|
|
|
|
- RMAppEventType.NODE_UPDATE)
|
|
|
|
|
|
+ .addTransition(RMAppState.FAILED, RMAppState.FAILED,
|
|
|
|
+ EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE,
|
|
|
|
+ RMAppEventType.APP_SAVED, RMAppEventType.APP_REMOVED))
|
|
|
|
|
|
// Transitions from KILLED state
|
|
// Transitions from KILLED state
|
|
|
|
+ // ignorable transitions
|
|
.addTransition(
|
|
.addTransition(
|
|
RMAppState.KILLED,
|
|
RMAppState.KILLED,
|
|
RMAppState.KILLED,
|
|
RMAppState.KILLED,
|
|
EnumSet.of(RMAppEventType.APP_ACCEPTED,
|
|
EnumSet.of(RMAppEventType.APP_ACCEPTED,
|
|
RMAppEventType.APP_REJECTED, RMAppEventType.KILL,
|
|
RMAppEventType.APP_REJECTED, RMAppEventType.KILL,
|
|
RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED,
|
|
RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED,
|
|
- RMAppEventType.ATTEMPT_KILLED, RMAppEventType.APP_SAVED))
|
|
|
|
- // ignorable transitions
|
|
|
|
- .addTransition(RMAppState.KILLED, RMAppState.KILLED,
|
|
|
|
- RMAppEventType.NODE_UPDATE)
|
|
|
|
|
|
+ RMAppEventType.ATTEMPT_KILLED, RMAppEventType.NODE_UPDATE,
|
|
|
|
+ RMAppEventType.APP_SAVED, RMAppEventType.APP_REMOVED))
|
|
|
|
|
|
.installTopology();
|
|
.installTopology();
|
|
|
|
|
|
@@ -384,6 +398,7 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|
case SUBMITTED:
|
|
case SUBMITTED:
|
|
case ACCEPTED:
|
|
case ACCEPTED:
|
|
case RUNNING:
|
|
case RUNNING:
|
|
|
|
+ case REMOVING:
|
|
return FinalApplicationStatus.UNDEFINED;
|
|
return FinalApplicationStatus.UNDEFINED;
|
|
// finished without a proper final state is the same as failed
|
|
// finished without a proper final state is the same as failed
|
|
case FINISHING:
|
|
case FINISHING:
|
|
@@ -475,7 +490,7 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|
return BuilderUtils.newApplicationReport(this.applicationId,
|
|
return BuilderUtils.newApplicationReport(this.applicationId,
|
|
currentApplicationAttemptId, this.user, this.queue,
|
|
currentApplicationAttemptId, this.user, this.queue,
|
|
this.name, host, rpcPort, clientToAMToken,
|
|
this.name, host, rpcPort, clientToAMToken,
|
|
- RMServerUtils.createApplicationState(this.stateMachine.getCurrentState()), diags,
|
|
|
|
|
|
+ createApplicationState(), diags,
|
|
trackingUrl, this.startTime, this.finishTime, finishState,
|
|
trackingUrl, this.startTime, this.finishTime, finishState,
|
|
appUsageReport, origTrackingUrl, progress, this.applicationType,
|
|
appUsageReport, origTrackingUrl, progress, this.applicationType,
|
|
amrmToken);
|
|
amrmToken);
|
|
@@ -657,6 +672,15 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private static final class RMAppRemovingTransition extends RMAppTransition {
|
|
|
|
+ @Override
|
|
|
|
+ public void transition(RMAppImpl app, RMAppEvent event) {
|
|
|
|
+ LOG.info("Removing application with id " + app.applicationId);
|
|
|
|
+ app.removeApplicationState();
|
|
|
|
+ app.previousStateAtRemoving = app.getState();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
private static class AppFinishedTransition extends FinalTransition {
|
|
private static class AppFinishedTransition extends FinalTransition {
|
|
public void transition(RMAppImpl app, RMAppEvent event) {
|
|
public void transition(RMAppImpl app, RMAppEvent event) {
|
|
RMAppFinishedAttemptEvent finishedEvent =
|
|
RMAppFinishedAttemptEvent finishedEvent =
|
|
@@ -712,6 +736,9 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|
if (app.getState() != RMAppState.FINISHING) {
|
|
if (app.getState() != RMAppState.FINISHING) {
|
|
app.finishTime = System.currentTimeMillis();
|
|
app.finishTime = System.currentTimeMillis();
|
|
}
|
|
}
|
|
|
|
+ // application completely done and remove from state store.
|
|
|
|
+ app.removeApplicationState();
|
|
|
|
+
|
|
app.handler.handle(
|
|
app.handler.handle(
|
|
new RMAppManagerEvent(app.applicationId,
|
|
new RMAppManagerEvent(app.applicationId,
|
|
RMAppManagerEventType.APP_COMPLETED));
|
|
RMAppManagerEventType.APP_COMPLETED));
|
|
@@ -764,4 +791,52 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|
public String getApplicationType() {
|
|
public String getApplicationType() {
|
|
return this.applicationType;
|
|
return this.applicationType;
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public boolean isAppSafeToUnregister() {
|
|
|
|
+ RMAppState state = getState();
|
|
|
|
+ return state.equals(RMAppState.FINISHING)
|
|
|
|
+ || state.equals(RMAppState.FINISHED) || state.equals(RMAppState.FAILED)
|
|
|
|
+ || state.equals(RMAppState.KILLED) ||
|
|
|
|
+ // If this is an unmanaged AM, we are safe to unregister since unmanaged
|
|
|
|
+ // AM will immediately go to FINISHED state on AM unregistration
|
|
|
|
+ getApplicationSubmissionContext().getUnmanagedAM();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public YarnApplicationState createApplicationState() {
|
|
|
|
+ RMAppState rmAppState = getState();
|
|
|
|
+ // If App is in REMOVING state, return its previous state.
|
|
|
|
+ if (rmAppState.equals(RMAppState.REMOVING)) {
|
|
|
|
+ rmAppState = previousStateAtRemoving;
|
|
|
|
+ }
|
|
|
|
+ switch (rmAppState) {
|
|
|
|
+ case NEW:
|
|
|
|
+ return YarnApplicationState.NEW;
|
|
|
|
+ case NEW_SAVING:
|
|
|
|
+ return YarnApplicationState.NEW_SAVING;
|
|
|
|
+ case SUBMITTED:
|
|
|
|
+ return YarnApplicationState.SUBMITTED;
|
|
|
|
+ case ACCEPTED:
|
|
|
|
+ return YarnApplicationState.ACCEPTED;
|
|
|
|
+ case RUNNING:
|
|
|
|
+ return YarnApplicationState.RUNNING;
|
|
|
|
+ case FINISHING:
|
|
|
|
+ case FINISHED:
|
|
|
|
+ return YarnApplicationState.FINISHED;
|
|
|
|
+ case KILLED:
|
|
|
|
+ return YarnApplicationState.KILLED;
|
|
|
|
+ case FAILED:
|
|
|
|
+ return YarnApplicationState.FAILED;
|
|
|
|
+ default:
|
|
|
|
+ throw new YarnRuntimeException("Unknown state passed!");
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private void removeApplicationState(){
|
|
|
|
+ if (!isAppRemovalRequestSent) {
|
|
|
|
+ rmContext.getStateStore().removeApplication(this);
|
|
|
|
+ isAppRemovalRequestSent = true;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
}
|
|
}
|