|
@@ -50,7 +50,9 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
|
|
+import org.apache.hadoop.yarn.api.records.Container;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
|
|
+import org.apache.hadoop.yarn.api.records.NMToken;
|
|
|
import org.apache.hadoop.yarn.api.records.Resource;
|
|
|
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
|
|
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
|
@@ -90,7 +92,6 @@ public class UnmanagedApplicationManager {
|
|
|
private AMRequestHandlerThread handlerThread;
|
|
|
private ApplicationMasterProtocol rmProxy;
|
|
|
private ApplicationId applicationId;
|
|
|
- private ApplicationAttemptId attemptId;
|
|
|
private String submitter;
|
|
|
private String appNameSuffix;
|
|
|
private Configuration conf;
|
|
@@ -101,9 +102,31 @@ public class UnmanagedApplicationManager {
|
|
|
private ApplicationClientProtocol rmClient;
|
|
|
private long asyncApiPollIntervalMillis;
|
|
|
private RecordFactory recordFactory;
|
|
|
+ private boolean keepContainersAcrossApplicationAttempts;
|
|
|
|
|
|
+ /*
|
|
|
+ * This flag is used as an indication that this method launchUAM/reAttachUAM
|
|
|
+ * is called (and perhaps blocked in initializeUnmanagedAM below due to RM
|
|
|
+ * connection/failover issue and not finished yet). Set the flag before
|
|
|
+ * calling the blocking call to RM.
|
|
|
+ */
|
|
|
+ private boolean connectionInitiated;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Constructor.
|
|
|
+ *
|
|
|
+ * @param conf configuration
|
|
|
+ * @param appId application Id to use for this UAM
|
|
|
+ * @param queueName the queue of the UAM
|
|
|
+ * @param submitter user name of the app
|
|
|
+ * @param appNameSuffix the app name suffix to use
|
|
|
+ * @param keepContainersAcrossApplicationAttempts keep container flag for UAM
|
|
|
+ * recovery. See {@link ApplicationSubmissionContext
|
|
|
+ * #setKeepContainersAcrossApplicationAttempts(boolean)}
|
|
|
+ */
|
|
|
public UnmanagedApplicationManager(Configuration conf, ApplicationId appId,
|
|
|
- String queueName, String submitter, String appNameSuffix) {
|
|
|
+ String queueName, String submitter, String appNameSuffix,
|
|
|
+ boolean keepContainersAcrossApplicationAttempts) {
|
|
|
Preconditions.checkNotNull(conf, "Configuration cannot be null");
|
|
|
Preconditions.checkNotNull(appId, "ApplicationId cannot be null");
|
|
|
Preconditions.checkNotNull(submitter, "App submitter cannot be null");
|
|
@@ -116,6 +139,7 @@ public class UnmanagedApplicationManager {
|
|
|
this.handlerThread = new AMRequestHandlerThread();
|
|
|
this.requestQueue = new LinkedBlockingQueue<>();
|
|
|
this.rmProxy = null;
|
|
|
+ this.connectionInitiated = false;
|
|
|
this.registerRequest = null;
|
|
|
this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);
|
|
|
this.asyncApiPollIntervalMillis = conf.getLong(
|
|
@@ -123,45 +147,84 @@ public class UnmanagedApplicationManager {
|
|
|
YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS,
|
|
|
YarnConfiguration.
|
|
|
DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS);
|
|
|
+ this.keepContainersAcrossApplicationAttempts =
|
|
|
+ keepContainersAcrossApplicationAttempts;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Launch a new UAM in the resource manager.
|
|
|
+ *
|
|
|
+ * @return identifier uam identifier
|
|
|
+ * @throws YarnException if fails
|
|
|
+ * @throws IOException if fails
|
|
|
+ */
|
|
|
+ public Token<AMRMTokenIdentifier> launchUAM()
|
|
|
+ throws YarnException, IOException {
|
|
|
+ this.connectionInitiated = true;
|
|
|
+
|
|
|
+ // Blocking call to RM
|
|
|
+ Token<AMRMTokenIdentifier> amrmToken =
|
|
|
+ initializeUnmanagedAM(this.applicationId);
|
|
|
+
|
|
|
+ // Creates the UAM connection
|
|
|
+ createUAMProxy(amrmToken);
|
|
|
+ return amrmToken;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Re-attach to an existing UAM in the resource manager.
|
|
|
+ *
|
|
|
+ * @param amrmToken the UAM token
|
|
|
+ * @throws IOException if re-attach fails
|
|
|
+ * @throws YarnException if re-attach fails
|
|
|
+ */
|
|
|
+ public void reAttachUAM(Token<AMRMTokenIdentifier> amrmToken)
|
|
|
+ throws IOException, YarnException {
|
|
|
+ this.connectionInitiated = true;
|
|
|
+
|
|
|
+ // Creates the UAM connection
|
|
|
+ createUAMProxy(amrmToken);
|
|
|
+ }
|
|
|
+
|
|
|
+ protected void createUAMProxy(Token<AMRMTokenIdentifier> amrmToken)
|
|
|
+ throws IOException {
|
|
|
+ this.userUgi = UserGroupInformation.createProxyUser(
|
|
|
+ this.applicationId.toString(), UserGroupInformation.getCurrentUser());
|
|
|
+ this.rmProxy = createRMProxy(ApplicationMasterProtocol.class, this.conf,
|
|
|
+ this.userUgi, amrmToken);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Registers this {@link UnmanagedApplicationManager} with the resource
|
|
|
* manager.
|
|
|
*
|
|
|
- * @param request the register request
|
|
|
- * @return the register response
|
|
|
+ * @param request RegisterApplicationMasterRequest
|
|
|
+ * @return register response
|
|
|
* @throws YarnException if register fails
|
|
|
* @throws IOException if register fails
|
|
|
*/
|
|
|
- public RegisterApplicationMasterResponse createAndRegisterApplicationMaster(
|
|
|
+ public RegisterApplicationMasterResponse registerApplicationMaster(
|
|
|
RegisterApplicationMasterRequest request)
|
|
|
throws YarnException, IOException {
|
|
|
- // This need to be done first in this method, because it is used as an
|
|
|
- // indication that this method is called (and perhaps blocked due to RM
|
|
|
- // connection and not finished yet)
|
|
|
+ // Save the register request for re-register later
|
|
|
this.registerRequest = request;
|
|
|
|
|
|
- // attemptId will be available after this call
|
|
|
- UnmanagedAMIdentifier identifier =
|
|
|
- initializeUnmanagedAM(this.applicationId);
|
|
|
-
|
|
|
- try {
|
|
|
- this.userUgi = UserGroupInformation.createProxyUser(
|
|
|
- identifier.getAttemptId().toString(),
|
|
|
- UserGroupInformation.getCurrentUser());
|
|
|
- } catch (IOException e) {
|
|
|
- LOG.error("Exception while trying to get current user", e);
|
|
|
- throw new YarnRuntimeException(e);
|
|
|
- }
|
|
|
-
|
|
|
- this.rmProxy = createRMProxy(ApplicationMasterProtocol.class, this.conf,
|
|
|
- this.userUgi, identifier.getToken());
|
|
|
-
|
|
|
- LOG.info("Registering the Unmanaged application master {}", this.attemptId);
|
|
|
+ // Since we have setKeepContainersAcrossApplicationAttempts = true for UAM.
|
|
|
+ // We do not expect application already registered exception here
|
|
|
+ LOG.info("Registering the Unmanaged application master {}",
|
|
|
+ this.applicationId);
|
|
|
RegisterApplicationMasterResponse response =
|
|
|
this.rmProxy.registerApplicationMaster(this.registerRequest);
|
|
|
|
|
|
+ for (Container container : response.getContainersFromPreviousAttempts()) {
|
|
|
+ LOG.info("RegisterUAM returned existing running container "
|
|
|
+ + container.getId());
|
|
|
+ }
|
|
|
+ for (NMToken nmToken : response.getNMTokensFromPreviousAttempts()) {
|
|
|
+ LOG.info("RegisterUAM returned existing NM token for node "
|
|
|
+ + nmToken.getNodeId());
|
|
|
+ }
|
|
|
+
|
|
|
// Only when register succeed that we start the heartbeat thread
|
|
|
this.handlerThread.setUncaughtExceptionHandler(
|
|
|
new HeartBeatThreadUncaughtExceptionHandler());
|
|
@@ -187,11 +250,11 @@ public class UnmanagedApplicationManager {
|
|
|
this.handlerThread.shutdown();
|
|
|
|
|
|
if (this.rmProxy == null) {
|
|
|
- if (this.registerRequest != null) {
|
|
|
- // This is possible if the async registerApplicationMaster is still
|
|
|
+ if (this.connectionInitiated) {
|
|
|
+ // This is possible if the async launchUAM is still
|
|
|
// blocked and retrying. Return a dummy response in this case.
|
|
|
LOG.warn("Unmanaged AM still not successfully launched/registered yet."
|
|
|
- + " Stopping the UAM client thread anyways.");
|
|
|
+ + " Stopping the UAM heartbeat thread anyways.");
|
|
|
return FinishApplicationMasterResponse.newInstance(false);
|
|
|
} else {
|
|
|
throw new YarnException("finishApplicationMaster should not "
|
|
@@ -199,7 +262,7 @@ public class UnmanagedApplicationManager {
|
|
|
}
|
|
|
}
|
|
|
return AMRMClientUtils.finishAMWithReRegister(request, this.rmProxy,
|
|
|
- this.registerRequest, this.attemptId);
|
|
|
+ this.registerRequest, this.applicationId);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -212,7 +275,7 @@ public class UnmanagedApplicationManager {
|
|
|
public KillApplicationResponse forceKillApplication()
|
|
|
throws IOException, YarnException {
|
|
|
KillApplicationRequest request =
|
|
|
- KillApplicationRequest.newInstance(this.attemptId.getApplicationId());
|
|
|
+ KillApplicationRequest.newInstance(this.applicationId);
|
|
|
|
|
|
this.handlerThread.shutdown();
|
|
|
|
|
@@ -240,29 +303,29 @@ public class UnmanagedApplicationManager {
|
|
|
LOG.debug("Interrupted while waiting to put on response queue", ex);
|
|
|
}
|
|
|
// Two possible cases why the UAM is not successfully registered yet:
|
|
|
- // 1. registerApplicationMaster is not called at all. Should throw here.
|
|
|
- // 2. registerApplicationMaster is called but hasn't successfully returned.
|
|
|
+ // 1. launchUAM is not called at all. Should throw here.
|
|
|
+ // 2. launchUAM is called but hasn't successfully returned.
|
|
|
//
|
|
|
// In case 2, we have already save the allocate request above, so if the
|
|
|
// registration succeed later, no request is lost.
|
|
|
if (this.rmProxy == null) {
|
|
|
- if (this.registerRequest != null) {
|
|
|
+ if (this.connectionInitiated) {
|
|
|
LOG.info("Unmanaged AM still not successfully launched/registered yet."
|
|
|
+ " Saving the allocate request and send later.");
|
|
|
} else {
|
|
|
throw new YarnException(
|
|
|
- "AllocateAsync should not be called before createAndRegister");
|
|
|
+ "AllocateAsync should not be called before launchUAM");
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Returns the application attempt id of the UAM.
|
|
|
+ * Returns the application id of the UAM.
|
|
|
*
|
|
|
- * @return attempt id of the UAM
|
|
|
+ * @return application id of the UAM
|
|
|
*/
|
|
|
- public ApplicationAttemptId getAttemptId() {
|
|
|
- return this.attemptId;
|
|
|
+ public ApplicationId getAppId() {
|
|
|
+ return this.applicationId;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -287,15 +350,15 @@ public class UnmanagedApplicationManager {
|
|
|
* Launch and initialize an unmanaged AM. First, it creates a new application
|
|
|
* on the RM and negotiates a new attempt id. Then it waits for the RM
|
|
|
* application attempt state to reach YarnApplicationAttemptState.LAUNCHED
|
|
|
- * after which it returns the AM-RM token and the attemptId.
|
|
|
+ * after which it returns the AM-RM token.
|
|
|
*
|
|
|
* @param appId application id
|
|
|
- * @return the UAM identifier
|
|
|
+ * @return the UAM token
|
|
|
* @throws IOException if initialize fails
|
|
|
* @throws YarnException if initialize fails
|
|
|
*/
|
|
|
- protected UnmanagedAMIdentifier initializeUnmanagedAM(ApplicationId appId)
|
|
|
- throws IOException, YarnException {
|
|
|
+ protected Token<AMRMTokenIdentifier> initializeUnmanagedAM(
|
|
|
+ ApplicationId appId) throws IOException, YarnException {
|
|
|
try {
|
|
|
UserGroupInformation appSubmitter =
|
|
|
UserGroupInformation.createRemoteUser(this.submitter);
|
|
@@ -306,13 +369,12 @@ public class UnmanagedApplicationManager {
|
|
|
submitUnmanagedApp(appId);
|
|
|
|
|
|
// Monitor the application attempt to wait for launch state
|
|
|
- ApplicationAttemptReport attemptReport = monitorCurrentAppAttempt(appId,
|
|
|
+ monitorCurrentAppAttempt(appId,
|
|
|
EnumSet.of(YarnApplicationState.ACCEPTED,
|
|
|
YarnApplicationState.RUNNING, YarnApplicationState.KILLED,
|
|
|
YarnApplicationState.FAILED, YarnApplicationState.FINISHED),
|
|
|
YarnApplicationAttemptState.LAUNCHED);
|
|
|
- this.attemptId = attemptReport.getApplicationAttemptId();
|
|
|
- return getUAMIdentifier();
|
|
|
+ return getUAMToken();
|
|
|
} finally {
|
|
|
this.rmClient = null;
|
|
|
}
|
|
@@ -343,6 +405,8 @@ public class UnmanagedApplicationManager {
|
|
|
submitRequest.setApplicationSubmissionContext(context);
|
|
|
|
|
|
context.setUnmanagedAM(true);
|
|
|
+ context.setKeepContainersAcrossApplicationAttempts(
|
|
|
+ this.keepContainersAcrossApplicationAttempts);
|
|
|
|
|
|
LOG.info("Submitting unmanaged application {}", appId);
|
|
|
this.rmClient.submitApplication(submitRequest);
|
|
@@ -374,8 +438,10 @@ public class UnmanagedApplicationManager {
|
|
|
if (appStates.contains(state)) {
|
|
|
if (state != YarnApplicationState.ACCEPTED) {
|
|
|
throw new YarnRuntimeException(
|
|
|
- "Received non-accepted application state: " + state
|
|
|
- + ". Application " + appId + " not the first attempt?");
|
|
|
+ "Received non-accepted application state: " + state + " for "
|
|
|
+ + appId + ". This is likely because this is not the first "
|
|
|
+ + "app attempt in home sub-cluster, and AMRMProxy HA "
|
|
|
+ + "(yarn.nodemanager.amrmproxy.ha.enable) is not enabled.");
|
|
|
}
|
|
|
appAttemptId =
|
|
|
getApplicationReport(appId).getCurrentApplicationAttemptId();
|
|
@@ -415,25 +481,25 @@ public class UnmanagedApplicationManager {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Gets the identifier of the unmanaged AM.
|
|
|
+ * Gets the amrmToken of the unmanaged AM.
|
|
|
*
|
|
|
- * @return the identifier of the unmanaged AM.
|
|
|
+ * @return the amrmToken of the unmanaged AM.
|
|
|
* @throws IOException if getApplicationReport fails
|
|
|
* @throws YarnException if getApplicationReport fails
|
|
|
*/
|
|
|
- protected UnmanagedAMIdentifier getUAMIdentifier()
|
|
|
+ protected Token<AMRMTokenIdentifier> getUAMToken()
|
|
|
throws IOException, YarnException {
|
|
|
Token<AMRMTokenIdentifier> token = null;
|
|
|
org.apache.hadoop.yarn.api.records.Token amrmToken =
|
|
|
- getApplicationReport(this.attemptId.getApplicationId()).getAMRMToken();
|
|
|
+ getApplicationReport(this.applicationId).getAMRMToken();
|
|
|
if (amrmToken != null) {
|
|
|
token = ConverterUtils.convertFromYarn(amrmToken, (Text) null);
|
|
|
} else {
|
|
|
LOG.warn(
|
|
|
"AMRMToken not found in the application report for application: {}",
|
|
|
- this.attemptId.getApplicationId());
|
|
|
+ this.applicationId);
|
|
|
}
|
|
|
- return new UnmanagedAMIdentifier(this.attemptId, token);
|
|
|
+ return token;
|
|
|
}
|
|
|
|
|
|
private ApplicationReport getApplicationReport(ApplicationId appId)
|
|
@@ -444,29 +510,6 @@ public class UnmanagedApplicationManager {
|
|
|
return this.rmClient.getApplicationReport(request).getApplicationReport();
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Data structure that encapsulates the application attempt identifier and the
|
|
|
- * AMRMTokenIdentifier. Make it public because clients with HA need it.
|
|
|
- */
|
|
|
- public static class UnmanagedAMIdentifier {
|
|
|
- private ApplicationAttemptId attemptId;
|
|
|
- private Token<AMRMTokenIdentifier> token;
|
|
|
-
|
|
|
- public UnmanagedAMIdentifier(ApplicationAttemptId attemptId,
|
|
|
- Token<AMRMTokenIdentifier> token) {
|
|
|
- this.attemptId = attemptId;
|
|
|
- this.token = token;
|
|
|
- }
|
|
|
-
|
|
|
- public ApplicationAttemptId getAttemptId() {
|
|
|
- return this.attemptId;
|
|
|
- }
|
|
|
-
|
|
|
- public Token<AMRMTokenIdentifier> getToken() {
|
|
|
- return this.token;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* Data structure that encapsulates AllocateRequest and AsyncCallback
|
|
|
* instance.
|
|
@@ -549,8 +592,10 @@ public class UnmanagedApplicationManager {
|
|
|
}
|
|
|
|
|
|
request.setResponseId(lastResponseId);
|
|
|
+
|
|
|
AllocateResponse response = AMRMClientUtils.allocateWithReRegister(
|
|
|
- request, rmProxy, registerRequest, attemptId);
|
|
|
+ request, rmProxy, registerRequest, applicationId);
|
|
|
+
|
|
|
if (response == null) {
|
|
|
throw new YarnException("Null allocateResponse from allocate");
|
|
|
}
|
|
@@ -578,18 +623,17 @@ public class UnmanagedApplicationManager {
|
|
|
LOG.debug("Interrupted while waiting for queue", ex);
|
|
|
}
|
|
|
} catch (IOException ex) {
|
|
|
- LOG.warn(
|
|
|
- "IO Error occurred while processing heart beat for " + attemptId,
|
|
|
- ex);
|
|
|
+ LOG.warn("IO Error occurred while processing heart beat for "
|
|
|
+ + applicationId, ex);
|
|
|
} catch (Throwable ex) {
|
|
|
LOG.warn(
|
|
|
- "Error occurred while processing heart beat for " + attemptId,
|
|
|
+ "Error occurred while processing heart beat for " + applicationId,
|
|
|
ex);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
LOG.info("UnmanagedApplicationManager has been stopped for {}. "
|
|
|
- + "AMRequestHandlerThread thread is exiting", attemptId);
|
|
|
+ + "AMRequestHandlerThread thread is exiting", applicationId);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -600,8 +644,8 @@ public class UnmanagedApplicationManager {
|
|
|
implements UncaughtExceptionHandler {
|
|
|
@Override
|
|
|
public void uncaughtException(Thread t, Throwable e) {
|
|
|
- LOG.error("Heartbeat thread {} for application attempt {} crashed!",
|
|
|
- t.getName(), attemptId, e);
|
|
|
+ LOG.error("Heartbeat thread {} for application {} crashed!",
|
|
|
+ t.getName(), applicationId, e);
|
|
|
}
|
|
|
}
|
|
|
}
|