|
@@ -594,16 +594,50 @@ public class YARNRunner implements ClientProtocol {
|
|
|
.getTaskReports(jobID, taskType);
|
|
|
}
|
|
|
|
|
|
+ private void killUnFinishedApplication(ApplicationId appId)
|
|
|
+ throws IOException {
|
|
|
+ ApplicationReport application = null;
|
|
|
+ try {
|
|
|
+ application = resMgrDelegate.getApplicationReport(appId);
|
|
|
+ } catch (YarnException e) {
|
|
|
+ throw new IOException(e);
|
|
|
+ }
|
|
|
+ if (application.getYarnApplicationState() == YarnApplicationState.FINISHED
|
|
|
+ || application.getYarnApplicationState() == YarnApplicationState.FAILED
|
|
|
+ || application.getYarnApplicationState() == YarnApplicationState.KILLED) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ killApplication(appId);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void killApplication(ApplicationId appId) throws IOException {
|
|
|
+ try {
|
|
|
+ resMgrDelegate.killApplication(appId);
|
|
|
+ } catch (YarnException e) {
|
|
|
+ throw new IOException(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private boolean isJobInTerminalState(JobStatus status) {
|
|
|
+ return status.getState() == JobStatus.State.KILLED
|
|
|
+ || status.getState() == JobStatus.State.FAILED
|
|
|
+ || status.getState() == JobStatus.State.SUCCEEDED;
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public void killJob(JobID arg0) throws IOException, InterruptedException {
|
|
|
/* check if the status is not running, if not send kill to RM */
|
|
|
JobStatus status = clientCache.getClient(arg0).getJobStatus(arg0);
|
|
|
+ ApplicationId appId = TypeConverter.toYarn(arg0).getAppId();
|
|
|
+
|
|
|
+ // get status from RM and return
|
|
|
+ if (status == null) {
|
|
|
+ killUnFinishedApplication(appId);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
if (status.getState() != JobStatus.State.RUNNING) {
|
|
|
- try {
|
|
|
- resMgrDelegate.killApplication(TypeConverter.toYarn(arg0).getAppId());
|
|
|
- } catch (YarnException e) {
|
|
|
- throw new IOException(e);
|
|
|
- }
|
|
|
+ killApplication(appId);
|
|
|
return;
|
|
|
}
|
|
|
|
|
@@ -612,26 +646,26 @@ public class YARNRunner implements ClientProtocol {
|
|
|
clientCache.getClient(arg0).killJob(arg0);
|
|
|
long currentTimeMillis = System.currentTimeMillis();
|
|
|
long timeKillIssued = currentTimeMillis;
|
|
|
- while ((currentTimeMillis < timeKillIssued + 10000L) && (status.getState()
|
|
|
- != JobStatus.State.KILLED)) {
|
|
|
- try {
|
|
|
- Thread.sleep(1000L);
|
|
|
- } catch(InterruptedException ie) {
|
|
|
- /** interrupted, just break */
|
|
|
- break;
|
|
|
- }
|
|
|
- currentTimeMillis = System.currentTimeMillis();
|
|
|
- status = clientCache.getClient(arg0).getJobStatus(arg0);
|
|
|
+ while ((currentTimeMillis < timeKillIssued + 10000L)
|
|
|
+ && !isJobInTerminalState(status)) {
|
|
|
+ try {
|
|
|
+ Thread.sleep(1000L);
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
+ /** interrupted, just break */
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ currentTimeMillis = System.currentTimeMillis();
|
|
|
+ status = clientCache.getClient(arg0).getJobStatus(arg0);
|
|
|
+ if (status == null) {
|
|
|
+ killUnFinishedApplication(appId);
|
|
|
+ return;
|
|
|
+ }
|
|
|
}
|
|
|
} catch(IOException io) {
|
|
|
LOG.debug("Error when checking for application status", io);
|
|
|
}
|
|
|
- if (status.getState() != JobStatus.State.KILLED) {
|
|
|
- try {
|
|
|
- resMgrDelegate.killApplication(TypeConverter.toYarn(arg0).getAppId());
|
|
|
- } catch (YarnException e) {
|
|
|
- throw new IOException(e);
|
|
|
- }
|
|
|
+ if (status != null && !isJobInTerminalState(status)) {
|
|
|
+ killApplication(appId);
|
|
|
}
|
|
|
}
|
|
|
|