|
@@ -80,6 +80,7 @@ public class YarnClientImpl extends YarnClient {
|
|
|
protected ApplicationClientProtocol rmClient;
|
|
|
protected long submitPollIntervalMillis;
|
|
|
private long asyncApiPollIntervalMillis;
|
|
|
+ private long asyncApiPollTimeoutMillis;
|
|
|
|
|
|
private static final String ROOT = "root";
|
|
|
|
|
@@ -93,6 +94,9 @@ public class YarnClientImpl extends YarnClient {
|
|
|
asyncApiPollIntervalMillis =
|
|
|
conf.getLong(YarnConfiguration.YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS,
|
|
|
YarnConfiguration.DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS);
|
|
|
+ asyncApiPollTimeoutMillis =
|
|
|
+ conf.getLong(YarnConfiguration.YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_TIMEOUT_MS,
|
|
|
+ YarnConfiguration.DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_TIMEOUT_MS);
|
|
|
submitPollIntervalMillis = asyncApiPollIntervalMillis;
|
|
|
if (conf.get(YarnConfiguration.YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS)
|
|
|
!= null) {
|
|
@@ -152,13 +156,24 @@ public class YarnClientImpl extends YarnClient {
|
|
|
rmClient.submitApplication(request);
|
|
|
|
|
|
int pollCount = 0;
|
|
|
+ long startTime = System.currentTimeMillis();
|
|
|
+
|
|
|
while (true) {
|
|
|
YarnApplicationState state =
|
|
|
getApplicationReport(applicationId).getYarnApplicationState();
|
|
|
if (!state.equals(YarnApplicationState.NEW) &&
|
|
|
!state.equals(YarnApplicationState.NEW_SAVING)) {
|
|
|
+ LOG.info("Submitted application " + applicationId);
|
|
|
break;
|
|
|
}
|
|
|
+
|
|
|
+ long elapsedMillis = System.currentTimeMillis() - startTime;
|
|
|
+ if (enforceAsyncAPITimeout() &&
|
|
|
+ elapsedMillis >= asyncApiPollTimeoutMillis) {
|
|
|
+ throw new YarnException("Timed out while waiting for application " +
|
|
|
+ applicationId + " to be submitted successfully");
|
|
|
+ }
|
|
|
+
|
|
|
// Notify the client through the log every 10 poll, in case the client
|
|
|
// is blocked here too long.
|
|
|
if (++pollCount % 10 == 0) {
|
|
@@ -169,10 +184,11 @@ public class YarnClientImpl extends YarnClient {
|
|
|
try {
|
|
|
Thread.sleep(submitPollIntervalMillis);
|
|
|
} catch (InterruptedException ie) {
|
|
|
+ LOG.error("Interrupted while waiting for application " + applicationId
|
|
|
+ + " to be successfully submitted.");
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- LOG.info("Submitted application " + applicationId);
|
|
|
return applicationId;
|
|
|
}
|
|
|
|
|
@@ -185,15 +201,25 @@ public class YarnClientImpl extends YarnClient {
|
|
|
|
|
|
try {
|
|
|
int pollCount = 0;
|
|
|
+ long startTime = System.currentTimeMillis();
|
|
|
+
|
|
|
while (true) {
|
|
|
KillApplicationResponse response =
|
|
|
rmClient.forceKillApplication(request);
|
|
|
if (response.getIsKillCompleted()) {
|
|
|
+ LOG.info("Killed application " + applicationId);
|
|
|
break;
|
|
|
}
|
|
|
+
|
|
|
+ long elapsedMillis = System.currentTimeMillis() - startTime;
|
|
|
+ if (enforceAsyncAPITimeout() &&
|
|
|
+ elapsedMillis >= this.asyncApiPollTimeoutMillis) {
|
|
|
+ throw new YarnException("Timed out while waiting for application " +
|
|
|
+ applicationId + " to be killed.");
|
|
|
+ }
|
|
|
+
|
|
|
if (++pollCount % 10 == 0) {
|
|
|
- LOG.info("Watiting for application " + applicationId
|
|
|
- + " to be killed.");
|
|
|
+ LOG.info("Waiting for application " + applicationId + " to be killed.");
|
|
|
}
|
|
|
Thread.sleep(asyncApiPollIntervalMillis);
|
|
|
}
|
|
@@ -201,7 +227,11 @@ public class YarnClientImpl extends YarnClient {
|
|
|
LOG.error("Interrupted while waiting for application " + applicationId
|
|
|
+ " to be killed.");
|
|
|
}
|
|
|
- LOG.info("Killed application " + applicationId);
|
|
|
+ }
|
|
|
+
|
|
|
+ @VisibleForTesting
|
|
|
+ boolean enforceAsyncAPITimeout() {
|
|
|
+ return asyncApiPollTimeoutMillis >= 0;
|
|
|
}
|
|
|
|
|
|
@Override
|