|
@@ -117,10 +117,13 @@ public class ApiServer {
|
|
|
@Override
|
|
|
public Void run() throws YarnException, IOException {
|
|
|
ServiceClient sc = getServiceClient();
|
|
|
- sc.init(YARN_CONFIG);
|
|
|
- sc.start();
|
|
|
- sc.actionBuild(service);
|
|
|
- sc.close();
|
|
|
+ try {
|
|
|
+ sc.init(YARN_CONFIG);
|
|
|
+ sc.start();
|
|
|
+ sc.actionBuild(service);
|
|
|
+ } finally {
|
|
|
+ sc.close();
|
|
|
+ }
|
|
|
return null;
|
|
|
}
|
|
|
});
|
|
@@ -132,11 +135,14 @@ public class ApiServer {
|
|
|
@Override
|
|
|
public ApplicationId run() throws IOException, YarnException {
|
|
|
ServiceClient sc = getServiceClient();
|
|
|
- sc.init(YARN_CONFIG);
|
|
|
- sc.start();
|
|
|
- ApplicationId applicationId = sc.actionCreate(service);
|
|
|
- sc.close();
|
|
|
- return applicationId;
|
|
|
+ try {
|
|
|
+ sc.init(YARN_CONFIG);
|
|
|
+ sc.start();
|
|
|
+ ApplicationId applicationId = sc.actionCreate(service);
|
|
|
+ return applicationId;
|
|
|
+ } finally {
|
|
|
+ sc.close();
|
|
|
+ }
|
|
|
}
|
|
|
});
|
|
|
serviceStatus.setDiagnostics("Application ID: " + applicationId);
|
|
@@ -244,29 +250,32 @@ public class ApiServer {
|
|
|
public Integer run() throws Exception {
|
|
|
int result = 0;
|
|
|
ServiceClient sc = getServiceClient();
|
|
|
- sc.init(YARN_CONFIG);
|
|
|
- sc.start();
|
|
|
- Exception stopException = null;
|
|
|
try {
|
|
|
- result = sc.actionStop(appName, destroy);
|
|
|
- if (result == EXIT_SUCCESS) {
|
|
|
- LOG.info("Successfully stopped service {}", appName);
|
|
|
- }
|
|
|
- } catch (Exception e) {
|
|
|
- LOG.info("Got exception stopping service", e);
|
|
|
- stopException = e;
|
|
|
- }
|
|
|
- if (destroy) {
|
|
|
- result = sc.actionDestroy(appName);
|
|
|
- if (result == EXIT_SUCCESS) {
|
|
|
- LOG.info("Successfully deleted service {}", appName);
|
|
|
+ sc.init(YARN_CONFIG);
|
|
|
+ sc.start();
|
|
|
+ Exception stopException = null;
|
|
|
+ try {
|
|
|
+ result = sc.actionStop(appName, destroy);
|
|
|
+ if (result == EXIT_SUCCESS) {
|
|
|
+ LOG.info("Successfully stopped service {}", appName);
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.info("Got exception stopping service", e);
|
|
|
+ stopException = e;
|
|
|
}
|
|
|
- } else {
|
|
|
- if (stopException != null) {
|
|
|
- throw stopException;
|
|
|
+ if (destroy) {
|
|
|
+ result = sc.actionDestroy(appName);
|
|
|
+ if (result == EXIT_SUCCESS) {
|
|
|
+ LOG.info("Successfully deleted service {}", appName);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ if (stopException != null) {
|
|
|
+ throw stopException;
|
|
|
+ }
|
|
|
}
|
|
|
+ } finally {
|
|
|
+ sc.close();
|
|
|
}
|
|
|
- sc.close();
|
|
|
return result;
|
|
|
}
|
|
|
});
|
|
@@ -377,13 +386,16 @@ public class ApiServer {
|
|
|
@Override
|
|
|
public Map<String, Long> run() throws YarnException, IOException {
|
|
|
ServiceClient sc = new ServiceClient();
|
|
|
- sc.init(YARN_CONFIG);
|
|
|
- sc.start();
|
|
|
- Map<String, Long> original = sc.flexByRestService(appName,
|
|
|
- Collections.singletonMap(componentName,
|
|
|
- component.getNumberOfContainers()));
|
|
|
- sc.close();
|
|
|
- return original;
|
|
|
+ try {
|
|
|
+ sc.init(YARN_CONFIG);
|
|
|
+ sc.start();
|
|
|
+ Map<String, Long> original = sc.flexByRestService(appName,
|
|
|
+ Collections.singletonMap(componentName,
|
|
|
+ component.getNumberOfContainers()));
|
|
|
+ return original;
|
|
|
+ } finally {
|
|
|
+ sc.close();
|
|
|
+ }
|
|
|
}
|
|
|
});
|
|
|
ServiceStatus status = new ServiceStatus();
|
|
@@ -625,12 +637,15 @@ public class ApiServer {
|
|
|
public Integer run() throws YarnException, IOException {
|
|
|
int result = 0;
|
|
|
ServiceClient sc = new ServiceClient();
|
|
|
- sc.init(YARN_CONFIG);
|
|
|
- sc.start();
|
|
|
- result = sc
|
|
|
- .actionFlex(appName, componentCountStrings);
|
|
|
- sc.close();
|
|
|
- return Integer.valueOf(result);
|
|
|
+ try {
|
|
|
+ sc.init(YARN_CONFIG);
|
|
|
+ sc.start();
|
|
|
+ result = sc
|
|
|
+ .actionFlex(appName, componentCountStrings);
|
|
|
+ return Integer.valueOf(result);
|
|
|
+ } finally {
|
|
|
+ sc.close();
|
|
|
+ }
|
|
|
}
|
|
|
});
|
|
|
if (result == EXIT_SUCCESS) {
|
|
@@ -651,12 +666,15 @@ public class ApiServer {
|
|
|
@Override
|
|
|
public String run() throws YarnException, IOException {
|
|
|
ServiceClient sc = getServiceClient();
|
|
|
- sc.init(YARN_CONFIG);
|
|
|
- sc.start();
|
|
|
- String newLifeTime = sc.updateLifetime(appName,
|
|
|
- updateAppData.getLifetime());
|
|
|
- sc.close();
|
|
|
- return newLifeTime;
|
|
|
+ try {
|
|
|
+ sc.init(YARN_CONFIG);
|
|
|
+ sc.start();
|
|
|
+ String newLifeTime = sc.updateLifetime(appName,
|
|
|
+ updateAppData.getLifetime());
|
|
|
+ return newLifeTime;
|
|
|
+ } finally {
|
|
|
+ sc.close();
|
|
|
+ }
|
|
|
}
|
|
|
});
|
|
|
ServiceStatus status = new ServiceStatus();
|
|
@@ -674,11 +692,14 @@ public class ApiServer {
|
|
|
@Override public ApplicationId run()
|
|
|
throws YarnException, IOException {
|
|
|
ServiceClient sc = getServiceClient();
|
|
|
- sc.init(YARN_CONFIG);
|
|
|
- sc.start();
|
|
|
- ApplicationId appId = sc.actionStartAndGetId(appName);
|
|
|
- sc.close();
|
|
|
- return appId;
|
|
|
+ try {
|
|
|
+ sc.init(YARN_CONFIG);
|
|
|
+ sc.start();
|
|
|
+ ApplicationId appId = sc.actionStartAndGetId(appName);
|
|
|
+ return appId;
|
|
|
+ } finally {
|
|
|
+ sc.close();
|
|
|
+ }
|
|
|
}
|
|
|
});
|
|
|
LOG.info("Successfully started service " + appName);
|
|
@@ -695,14 +716,17 @@ public class ApiServer {
|
|
|
ServiceStatus status = new ServiceStatus();
|
|
|
ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
|
|
|
ServiceClient sc = getServiceClient();
|
|
|
- sc.init(YARN_CONFIG);
|
|
|
- sc.start();
|
|
|
- if (service.getState().equals(ServiceState.EXPRESS_UPGRADING)) {
|
|
|
- sc.actionUpgradeExpress(service);
|
|
|
- } else {
|
|
|
- sc.initiateUpgrade(service);
|
|
|
+ try {
|
|
|
+ sc.init(YARN_CONFIG);
|
|
|
+ sc.start();
|
|
|
+ if (service.getState().equals(ServiceState.EXPRESS_UPGRADING)) {
|
|
|
+ sc.actionUpgradeExpress(service);
|
|
|
+ } else {
|
|
|
+ sc.initiateUpgrade(service);
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ sc.close();
|
|
|
}
|
|
|
- sc.close();
|
|
|
return null;
|
|
|
});
|
|
|
LOG.info("Service {} version {} upgrade initialized", service.getName(),
|
|
@@ -717,11 +741,14 @@ public class ApiServer {
|
|
|
final UserGroupInformation ugi) throws IOException, InterruptedException {
|
|
|
int result = ugi.doAs((PrivilegedExceptionAction<Integer>) () -> {
|
|
|
ServiceClient sc = getServiceClient();
|
|
|
- sc.init(YARN_CONFIG);
|
|
|
- sc.start();
|
|
|
- int exitCode = sc.actionCancelUpgrade(serviceName);
|
|
|
- sc.close();
|
|
|
- return exitCode;
|
|
|
+ try {
|
|
|
+ sc.init(YARN_CONFIG);
|
|
|
+ sc.start();
|
|
|
+ int exitCode = sc.actionCancelUpgrade(serviceName);
|
|
|
+ return exitCode;
|
|
|
+ } finally {
|
|
|
+ sc.close();
|
|
|
+ }
|
|
|
});
|
|
|
if (result == EXIT_SUCCESS) {
|
|
|
ServiceStatus status = new ServiceStatus();
|
|
@@ -786,10 +813,13 @@ public class ApiServer {
|
|
|
return ugi.doAs((PrivilegedExceptionAction<Integer>) () -> {
|
|
|
int result1;
|
|
|
ServiceClient sc = getServiceClient();
|
|
|
- sc.init(YARN_CONFIG);
|
|
|
- sc.start();
|
|
|
- result1 = sc.actionUpgrade(service, containers);
|
|
|
- sc.close();
|
|
|
+ try {
|
|
|
+ sc.init(YARN_CONFIG);
|
|
|
+ sc.start();
|
|
|
+ result1 = sc.actionUpgrade(service, containers);
|
|
|
+ } finally {
|
|
|
+ sc.close();
|
|
|
+ }
|
|
|
return result1;
|
|
|
});
|
|
|
}
|
|
@@ -799,11 +829,14 @@ public class ApiServer {
|
|
|
|
|
|
return ugi.doAs((PrivilegedExceptionAction<Service>) () -> {
|
|
|
ServiceClient sc = getServiceClient();
|
|
|
- sc.init(YARN_CONFIG);
|
|
|
- sc.start();
|
|
|
- Service app1 = sc.getStatus(serviceName);
|
|
|
- sc.close();
|
|
|
- return app1;
|
|
|
+ try {
|
|
|
+ sc.init(YARN_CONFIG);
|
|
|
+ sc.start();
|
|
|
+ Service app1 = sc.getStatus(serviceName);
|
|
|
+ return app1;
|
|
|
+ } finally {
|
|
|
+ sc.close();
|
|
|
+ }
|
|
|
});
|
|
|
}
|
|
|
|
|
@@ -814,12 +847,15 @@ public class ApiServer {
|
|
|
return ugi.doAs((PrivilegedExceptionAction<Container[]>) () -> {
|
|
|
Container[] result;
|
|
|
ServiceClient sc = getServiceClient();
|
|
|
- sc.init(YARN_CONFIG);
|
|
|
- sc.start();
|
|
|
- result = sc.getContainers(serviceName, componentNames, version,
|
|
|
- containerStates);
|
|
|
- sc.close();
|
|
|
- return result;
|
|
|
+ try {
|
|
|
+ sc.init(YARN_CONFIG);
|
|
|
+ sc.start();
|
|
|
+ result = sc.getContainers(serviceName, componentNames, version,
|
|
|
+ containerStates);
|
|
|
+ return result;
|
|
|
+ } finally {
|
|
|
+ sc.close();
|
|
|
+ }
|
|
|
});
|
|
|
}
|
|
|
|