|
@@ -38,6 +38,7 @@ import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.http.HttpConfig;
|
|
import org.apache.hadoop.http.HttpConfig;
|
|
|
|
+import org.apache.hadoop.util.ExitUtil;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
|
@@ -57,6 +58,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
|
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
|
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
|
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
|
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
|
|
@@ -69,6 +75,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
|
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
|
@@ -85,7 +92,7 @@ import org.apache.hadoop.yarn.state.StateMachineFactory;
|
|
import org.apache.hadoop.yarn.util.BuilderUtils;
|
|
import org.apache.hadoop.yarn.util.BuilderUtils;
|
|
|
|
|
|
@SuppressWarnings({"unchecked", "rawtypes"})
|
|
@SuppressWarnings({"unchecked", "rawtypes"})
|
|
-public class RMAppAttemptImpl implements RMAppAttempt {
|
|
|
|
|
|
+public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|
|
|
|
|
private static final Log LOG = LogFactory.getLog(RMAppAttemptImpl.class);
|
|
private static final Log LOG = LogFactory.getLog(RMAppAttemptImpl.class);
|
|
|
|
|
|
@@ -153,12 +160,15 @@ public class RMAppAttemptImpl implements RMAppAttempt {
|
|
.addTransition(RMAppAttemptState.NEW, RMAppAttemptState.FAILED,
|
|
.addTransition(RMAppAttemptState.NEW, RMAppAttemptState.FAILED,
|
|
RMAppAttemptEventType.REGISTERED,
|
|
RMAppAttemptEventType.REGISTERED,
|
|
new UnexpectedAMRegisteredTransition())
|
|
new UnexpectedAMRegisteredTransition())
|
|
|
|
+ .addTransition(RMAppAttemptState.NEW, RMAppAttemptState.RECOVERED,
|
|
|
|
+ RMAppAttemptEventType.RECOVER)
|
|
|
|
|
|
// Transitions from SUBMITTED state
|
|
// Transitions from SUBMITTED state
|
|
.addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.FAILED,
|
|
.addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.FAILED,
|
|
RMAppAttemptEventType.APP_REJECTED, new AppRejectedTransition())
|
|
RMAppAttemptEventType.APP_REJECTED, new AppRejectedTransition())
|
|
.addTransition(RMAppAttemptState.SUBMITTED,
|
|
.addTransition(RMAppAttemptState.SUBMITTED,
|
|
- EnumSet.of(RMAppAttemptState.LAUNCHED, RMAppAttemptState.SCHEDULED),
|
|
|
|
|
|
+ EnumSet.of(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,
|
|
|
|
+ RMAppAttemptState.SCHEDULED),
|
|
RMAppAttemptEventType.APP_ACCEPTED,
|
|
RMAppAttemptEventType.APP_ACCEPTED,
|
|
new ScheduleTransition())
|
|
new ScheduleTransition())
|
|
.addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.KILLED,
|
|
.addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.KILLED,
|
|
@@ -170,12 +180,42 @@ public class RMAppAttemptImpl implements RMAppAttempt {
|
|
|
|
|
|
// Transitions from SCHEDULED State
|
|
// Transitions from SCHEDULED State
|
|
.addTransition(RMAppAttemptState.SCHEDULED,
|
|
.addTransition(RMAppAttemptState.SCHEDULED,
|
|
- RMAppAttemptState.ALLOCATED,
|
|
|
|
|
|
+ RMAppAttemptState.ALLOCATED_SAVING,
|
|
RMAppAttemptEventType.CONTAINER_ALLOCATED,
|
|
RMAppAttemptEventType.CONTAINER_ALLOCATED,
|
|
new AMContainerAllocatedTransition())
|
|
new AMContainerAllocatedTransition())
|
|
.addTransition(RMAppAttemptState.SCHEDULED, RMAppAttemptState.KILLED,
|
|
.addTransition(RMAppAttemptState.SCHEDULED, RMAppAttemptState.KILLED,
|
|
RMAppAttemptEventType.KILL,
|
|
RMAppAttemptEventType.KILL,
|
|
new BaseFinalTransition(RMAppAttemptState.KILLED))
|
|
new BaseFinalTransition(RMAppAttemptState.KILLED))
|
|
|
|
+
|
|
|
|
+ // Transitions from ALLOCATED_SAVING State
|
|
|
|
+ .addTransition(RMAppAttemptState.ALLOCATED_SAVING,
|
|
|
|
+ RMAppAttemptState.ALLOCATED,
|
|
|
|
+ RMAppAttemptEventType.ATTEMPT_SAVED, new AttemptStoredTransition())
|
|
|
|
+ .addTransition(RMAppAttemptState.ALLOCATED_SAVING,
|
|
|
|
+ RMAppAttemptState.ALLOCATED_SAVING,
|
|
|
|
+ RMAppAttemptEventType.CONTAINER_ACQUIRED,
|
|
|
|
+ new ContainerAcquiredTransition())
|
|
|
|
+ // App could be killed by the client. So need to handle this.
|
|
|
|
+ .addTransition(RMAppAttemptState.ALLOCATED_SAVING,
|
|
|
|
+ RMAppAttemptState.KILLED,
|
|
|
|
+ RMAppAttemptEventType.KILL,
|
|
|
|
+ new BaseFinalTransition(RMAppAttemptState.KILLED))
|
|
|
|
+
|
|
|
|
+ // Transitions from LAUNCHED_UNMANAGED_SAVING State
|
|
|
|
+ .addTransition(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,
|
|
|
|
+ RMAppAttemptState.LAUNCHED,
|
|
|
|
+ RMAppAttemptEventType.ATTEMPT_SAVED,
|
|
|
|
+ new UnmanagedAMAttemptSavedTransition())
|
|
|
|
+ // attempt should not try to register in this state
|
|
|
|
+ .addTransition(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,
|
|
|
|
+ RMAppAttemptState.FAILED,
|
|
|
|
+ RMAppAttemptEventType.REGISTERED,
|
|
|
|
+ new UnexpectedAMRegisteredTransition())
|
|
|
|
+ // App could be killed by the client. So need to handle this.
|
|
|
|
+ .addTransition(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,
|
|
|
|
+ RMAppAttemptState.KILLED,
|
|
|
|
+ RMAppAttemptEventType.KILL,
|
|
|
|
+ new BaseFinalTransition(RMAppAttemptState.KILLED))
|
|
|
|
|
|
// Transitions from ALLOCATED State
|
|
// Transitions from ALLOCATED State
|
|
.addTransition(RMAppAttemptState.ALLOCATED,
|
|
.addTransition(RMAppAttemptState.ALLOCATED,
|
|
@@ -279,11 +319,30 @@ public class RMAppAttemptImpl implements RMAppAttempt {
|
|
RMAppAttemptEventType.EXPIRE,
|
|
RMAppAttemptEventType.EXPIRE,
|
|
RMAppAttemptEventType.REGISTERED,
|
|
RMAppAttemptEventType.REGISTERED,
|
|
RMAppAttemptEventType.CONTAINER_ALLOCATED,
|
|
RMAppAttemptEventType.CONTAINER_ALLOCATED,
|
|
|
|
+ RMAppAttemptEventType.ATTEMPT_SAVED,
|
|
|
|
+ RMAppAttemptEventType.CONTAINER_FINISHED,
|
|
|
|
+ RMAppAttemptEventType.UNREGISTERED,
|
|
|
|
+ RMAppAttemptEventType.KILL,
|
|
|
|
+ RMAppAttemptEventType.STATUS_UPDATE))
|
|
|
|
+
|
|
|
|
+ // Transitions from RECOVERED State
|
|
|
|
+ .addTransition(
|
|
|
|
+ RMAppAttemptState.RECOVERED,
|
|
|
|
+ RMAppAttemptState.RECOVERED,
|
|
|
|
+ EnumSet.of(RMAppAttemptEventType.START,
|
|
|
|
+ RMAppAttemptEventType.APP_ACCEPTED,
|
|
|
|
+ RMAppAttemptEventType.APP_REJECTED,
|
|
|
|
+ RMAppAttemptEventType.EXPIRE,
|
|
|
|
+ RMAppAttemptEventType.LAUNCHED,
|
|
|
|
+ RMAppAttemptEventType.LAUNCH_FAILED,
|
|
|
|
+ RMAppAttemptEventType.REGISTERED,
|
|
|
|
+ RMAppAttemptEventType.CONTAINER_ALLOCATED,
|
|
|
|
+ RMAppAttemptEventType.CONTAINER_ACQUIRED,
|
|
|
|
+ RMAppAttemptEventType.ATTEMPT_SAVED,
|
|
RMAppAttemptEventType.CONTAINER_FINISHED,
|
|
RMAppAttemptEventType.CONTAINER_FINISHED,
|
|
RMAppAttemptEventType.UNREGISTERED,
|
|
RMAppAttemptEventType.UNREGISTERED,
|
|
RMAppAttemptEventType.KILL,
|
|
RMAppAttemptEventType.KILL,
|
|
RMAppAttemptEventType.STATUS_UPDATE))
|
|
RMAppAttemptEventType.STATUS_UPDATE))
|
|
-
|
|
|
|
.installTopology();
|
|
.installTopology();
|
|
|
|
|
|
public RMAppAttemptImpl(ApplicationAttemptId appAttemptId,
|
|
public RMAppAttemptImpl(ApplicationAttemptId appAttemptId,
|
|
@@ -318,7 +377,7 @@ public class RMAppAttemptImpl implements RMAppAttempt {
|
|
@Override
|
|
@Override
|
|
public ApplicationSubmissionContext getSubmissionContext() {
|
|
public ApplicationSubmissionContext getSubmissionContext() {
|
|
return this.submissionContext;
|
|
return this.submissionContext;
|
|
- }
|
|
|
|
|
|
+ }
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public FinalApplicationStatus getFinalApplicationStatus() {
|
|
public FinalApplicationStatus getFinalApplicationStatus() {
|
|
@@ -494,6 +553,10 @@ public class RMAppAttemptImpl implements RMAppAttempt {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private void setMasterContainer(Container container) {
|
|
|
|
+ masterContainer = container;
|
|
|
|
+ }
|
|
|
|
+
|
|
@Override
|
|
@Override
|
|
public void handle(RMAppAttemptEvent event) {
|
|
public void handle(RMAppAttemptEvent event) {
|
|
|
|
|
|
@@ -561,6 +624,21 @@ public class RMAppAttemptImpl implements RMAppAttempt {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ @Override
|
|
|
|
+ public void recover(RMState state) {
|
|
|
|
+ ApplicationState appState =
|
|
|
|
+ state.getApplicationState().get(getAppAttemptId().getApplicationId());
|
|
|
|
+ ApplicationAttemptState attemptState = appState.getAttempt(getAppAttemptId());
|
|
|
|
+ assert attemptState != null;
|
|
|
|
+ setMasterContainer(attemptState.getMasterContainer());
|
|
|
|
+ LOG.info("Recovered attempt: AppId: " + getAppAttemptId().getApplicationId()
|
|
|
|
+ + " AttemptId: " + getAppAttemptId()
|
|
|
|
+ + " MasterContainer: " + masterContainer);
|
|
|
|
+ setDiagnostics("Attempt recovered after RM restart");
|
|
|
|
+ handle(new RMAppAttemptEvent(getAppAttemptId(),
|
|
|
|
+ RMAppAttemptEventType.RECOVER));
|
|
|
|
+ }
|
|
|
|
+
|
|
private static class BaseTransition implements
|
|
private static class BaseTransition implements
|
|
SingleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent> {
|
|
SingleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent> {
|
|
|
|
|
|
@@ -625,13 +703,12 @@ public class RMAppAttemptImpl implements RMAppAttempt {
|
|
@Override
|
|
@Override
|
|
public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
|
|
public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
|
|
RMAppAttemptEvent event) {
|
|
RMAppAttemptEvent event) {
|
|
-
|
|
|
|
- // Send the acceptance to the app
|
|
|
|
- appAttempt.eventHandler.handle(new RMAppEvent(event
|
|
|
|
- .getApplicationAttemptId().getApplicationId(),
|
|
|
|
- RMAppEventType.APP_ACCEPTED));
|
|
|
|
-
|
|
|
|
if (!appAttempt.submissionContext.getUnmanagedAM()) {
|
|
if (!appAttempt.submissionContext.getUnmanagedAM()) {
|
|
|
|
+ // Send the acceptance to the app
|
|
|
|
+ appAttempt.eventHandler.handle(new RMAppEvent(event
|
|
|
|
+ .getApplicationAttemptId().getApplicationId(),
|
|
|
|
+ RMAppEventType.APP_ACCEPTED));
|
|
|
|
+
|
|
// Request a container for the AM.
|
|
// Request a container for the AM.
|
|
ResourceRequest request = BuilderUtils.newResourceRequest(
|
|
ResourceRequest request = BuilderUtils.newResourceRequest(
|
|
AM_CONTAINER_PRIORITY, "*", appAttempt.submissionContext
|
|
AM_CONTAINER_PRIORITY, "*", appAttempt.submissionContext
|
|
@@ -647,35 +724,42 @@ public class RMAppAttemptImpl implements RMAppAttempt {
|
|
return RMAppAttemptState.SCHEDULED;
|
|
return RMAppAttemptState.SCHEDULED;
|
|
} else {
|
|
} else {
|
|
// RM not allocating container. AM is self launched.
|
|
// RM not allocating container. AM is self launched.
|
|
- // Directly go to LAUNCHED state
|
|
|
|
- // Register with AMLivelinessMonitor
|
|
|
|
- appAttempt.rmContext.getAMLivelinessMonitor().register(
|
|
|
|
- appAttempt.applicationAttemptId);
|
|
|
|
- return RMAppAttemptState.LAUNCHED;
|
|
|
|
|
|
+ RMStateStore store = appAttempt.rmContext.getStateStore();
|
|
|
|
+ // save state and then go to LAUNCHED state
|
|
|
|
+ appAttempt.storeAttempt(store);
|
|
|
|
+ return RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- private static final class AMContainerAllocatedTransition extends BaseTransition {
|
|
|
|
|
|
+ private static final class AMContainerAllocatedTransition
|
|
|
|
+ extends BaseTransition {
|
|
@Override
|
|
@Override
|
|
public void transition(RMAppAttemptImpl appAttempt,
|
|
public void transition(RMAppAttemptImpl appAttempt,
|
|
- RMAppAttemptEvent event) {
|
|
|
|
-
|
|
|
|
|
|
+ RMAppAttemptEvent event) {
|
|
// Acquire the AM container from the scheduler.
|
|
// Acquire the AM container from the scheduler.
|
|
Allocation amContainerAllocation = appAttempt.scheduler.allocate(
|
|
Allocation amContainerAllocation = appAttempt.scheduler.allocate(
|
|
appAttempt.applicationAttemptId, EMPTY_CONTAINER_REQUEST_LIST,
|
|
appAttempt.applicationAttemptId, EMPTY_CONTAINER_REQUEST_LIST,
|
|
EMPTY_CONTAINER_RELEASE_LIST);
|
|
EMPTY_CONTAINER_RELEASE_LIST);
|
|
|
|
|
|
// Set the masterContainer
|
|
// Set the masterContainer
|
|
- appAttempt.masterContainer = amContainerAllocation.getContainers().get(
|
|
|
|
- 0);
|
|
|
|
|
|
+ appAttempt.setMasterContainer(amContainerAllocation.getContainers().get(
|
|
|
|
+ 0));
|
|
|
|
|
|
- // Send event to launch the AM Container
|
|
|
|
- appAttempt.eventHandler.handle(new AMLauncherEvent(
|
|
|
|
- AMLauncherEventType.LAUNCH, appAttempt));
|
|
|
|
|
|
+ RMStateStore store = appAttempt.rmContext.getStateStore();
|
|
|
|
+ appAttempt.storeAttempt(store);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
|
|
+ private static final class AttemptStoredTransition extends BaseTransition {
|
|
|
|
+ @Override
|
|
|
|
+ public void transition(RMAppAttemptImpl appAttempt,
|
|
|
|
+ RMAppAttemptEvent event) {
|
|
|
|
+ appAttempt.checkAttemptStoreError(event);
|
|
|
|
+ appAttempt.launchAttempt();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
private static class BaseFinalTransition extends BaseTransition {
|
|
private static class BaseFinalTransition extends BaseTransition {
|
|
|
|
|
|
private final RMAppAttemptState finalAttemptState;
|
|
private final RMAppAttemptState finalAttemptState;
|
|
@@ -736,17 +820,34 @@ public class RMAppAttemptImpl implements RMAppAttempt {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- private static final class AMLaunchedTransition extends BaseTransition {
|
|
|
|
|
|
+ private static class AMLaunchedTransition extends BaseTransition {
|
|
@Override
|
|
@Override
|
|
public void transition(RMAppAttemptImpl appAttempt,
|
|
public void transition(RMAppAttemptImpl appAttempt,
|
|
- RMAppAttemptEvent event) {
|
|
|
|
-
|
|
|
|
|
|
+ RMAppAttemptEvent event) {
|
|
// Register with AMLivelinessMonitor
|
|
// Register with AMLivelinessMonitor
|
|
- appAttempt.rmContext.getAMLivelinessMonitor().register(
|
|
|
|
- appAttempt.applicationAttemptId);
|
|
|
|
-
|
|
|
|
|
|
+ appAttempt.attemptLaunched();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ private static final class UnmanagedAMAttemptSavedTransition
|
|
|
|
+ extends AMLaunchedTransition {
|
|
|
|
+ @Override
|
|
|
|
+ public void transition(RMAppAttemptImpl appAttempt,
|
|
|
|
+ RMAppAttemptEvent event) {
|
|
|
|
+ appAttempt.checkAttemptStoreError(event);
|
|
|
|
+ // Send the acceptance to the app
|
|
|
|
+ // Ideally this should have been done when the scheduler accepted the app.
|
|
|
|
+ // But its here because until the attempt is saved the client should not
|
|
|
|
+ // launch the unmanaged AM. Client waits for the app status to be accepted
|
|
|
|
+ // before doing so. So we have to delay the accepted state until we have
|
|
|
|
+ // completed storing the attempt
|
|
|
|
+ appAttempt.eventHandler.handle(new RMAppEvent(event
|
|
|
|
+ .getApplicationAttemptId().getApplicationId(),
|
|
|
|
+ RMAppEventType.APP_ACCEPTED));
|
|
|
|
+
|
|
|
|
+ super.transition(appAttempt, event);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
|
|
private static final class LaunchFailedTransition extends BaseFinalTransition {
|
|
private static final class LaunchFailedTransition extends BaseFinalTransition {
|
|
|
|
|
|
@@ -1040,4 +1141,37 @@ public class RMAppAttemptImpl implements RMAppAttempt {
|
|
this.readLock.unlock();
|
|
this.readLock.unlock();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ private void launchAttempt(){
|
|
|
|
+ // Send event to launch the AM Container
|
|
|
|
+ eventHandler.handle(new AMLauncherEvent(AMLauncherEventType.LAUNCH, this));
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private void attemptLaunched() {
|
|
|
|
+ // Register with AMLivelinessMonitor
|
|
|
|
+ rmContext.getAMLivelinessMonitor().register(getAppAttemptId());
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private void checkAttemptStoreError(RMAppAttemptEvent event) {
|
|
|
|
+ RMAppAttemptStoredEvent storeEvent = (RMAppAttemptStoredEvent) event;
|
|
|
|
+ if(storeEvent.getStoredException() != null)
|
|
|
|
+ {
|
|
|
|
+ // This needs to be handled for HA and give up master status if we got
|
|
|
|
+ // fenced
|
|
|
|
+ LOG.error("Failed to store attempt: " + getAppAttemptId(),
|
|
|
|
+ storeEvent.getStoredException());
|
|
|
|
+ ExitUtil.terminate(1, storeEvent.getStoredException());
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private void storeAttempt(RMStateStore store) {
|
|
|
|
+ // store attempt data in a non-blocking manner to prevent dispatcher
|
|
|
|
+ // thread starvation and wait for state to be saved
|
|
|
|
+ LOG.info("Storing attempt: AppId: " +
|
|
|
|
+ getAppAttemptId().getApplicationId()
|
|
|
|
+ + " AttemptId: " +
|
|
|
|
+ getAppAttemptId()
|
|
|
|
+ + " MasterContainer: " + masterContainer);
|
|
|
|
+ store.storeApplicationAttempt(this);
|
|
|
|
+ }
|
|
}
|
|
}
|