|
@@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationTimeout;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
|
|
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
|
|
import org.apache.hadoop.yarn.api.records.LocalResource;
|
|
|
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
|
|
import org.apache.hadoop.yarn.api.records.Resource;
|
|
@@ -61,8 +62,9 @@ import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto;
|
|
|
import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopRequestProto;
|
|
|
import org.apache.hadoop.yarn.service.ClientAMProtocol;
|
|
|
import org.apache.hadoop.yarn.service.ServiceMaster;
|
|
|
-import org.apache.hadoop.yarn.service.api.records.Application;
|
|
|
+import org.apache.hadoop.yarn.service.api.records.Service;
|
|
|
import org.apache.hadoop.yarn.service.api.records.Component;
|
|
|
+import org.apache.hadoop.yarn.service.api.records.ServiceState;
|
|
|
import org.apache.hadoop.yarn.service.client.params.AbstractClusterBuildingActionArgs;
|
|
|
import org.apache.hadoop.yarn.service.client.params.ActionDependencyArgs;
|
|
|
import org.apache.hadoop.yarn.service.client.params.ActionFlexArgs;
|
|
@@ -107,6 +109,7 @@ import static org.apache.hadoop.yarn.api.records.YarnApplicationState.*;
|
|
|
import static org.apache.hadoop.yarn.service.client.params.SliderActions.ACTION_CREATE;
|
|
|
import static org.apache.hadoop.yarn.service.client.params.SliderActions.ACTION_FLEX;
|
|
|
import static org.apache.hadoop.yarn.service.conf.YarnServiceConf.YARN_QUEUE;
|
|
|
+import static org.apache.hadoop.yarn.service.utils.ServiceApiUtil.jsonSerDeser;
|
|
|
import static org.apache.hadoop.yarn.service.utils.SliderUtils.*;
|
|
|
|
|
|
@InterfaceAudience.Public
|
|
@@ -116,10 +119,10 @@ public class ServiceClient extends CompositeService
|
|
|
private static final Logger LOG =
|
|
|
LoggerFactory.getLogger(ServiceClient.class);
|
|
|
private SliderFileSystem fs;
|
|
|
- private YarnClient yarnClient;
|
|
|
+ //TODO disable retry so that client / rest API doesn't block?
|
|
|
+ protected YarnClient yarnClient;
|
|
|
// Avoid looking up applicationId from fs all the time.
|
|
|
private Map<String, ApplicationId> cachedAppIds = new ConcurrentHashMap<>();
|
|
|
- private Map<String, ClientAMProtocol> cachedAMProxies = new ConcurrentHashMap<>();
|
|
|
|
|
|
private RegistryOperations registryClient;
|
|
|
private CuratorFramework curatorClient;
|
|
@@ -128,7 +131,9 @@ public class ServiceClient extends CompositeService
|
|
|
private static EnumSet<YarnApplicationState> terminatedStates =
|
|
|
EnumSet.of(FINISHED, FAILED, KILLED);
|
|
|
private static EnumSet<YarnApplicationState> liveStates =
|
|
|
- EnumSet.of(NEW, NEW_SAVING, SUBMITTED, RUNNING);
|
|
|
+ EnumSet.of(NEW, NEW_SAVING, SUBMITTED, ACCEPTED, RUNNING);
|
|
|
+ private static EnumSet<YarnApplicationState> preRunningStates =
|
|
|
+ EnumSet.of(NEW, NEW_SAVING, SUBMITTED, ACCEPTED);
|
|
|
|
|
|
public ServiceClient() {
|
|
|
super(ServiceClient.class.getName());
|
|
@@ -151,18 +156,18 @@ public class ServiceClient extends CompositeService
|
|
|
super.serviceStop();
|
|
|
}
|
|
|
|
|
|
- private Application loadAppJsonFromLocalFS(
|
|
|
+ private Service loadAppJsonFromLocalFS(
|
|
|
AbstractClusterBuildingActionArgs args) throws IOException {
|
|
|
File file = args.getAppDef();
|
|
|
Path filePath = new Path(file.getAbsolutePath());
|
|
|
LOG.info("Loading app json from: " + filePath);
|
|
|
- Application application = ServiceApiUtil.jsonSerDeser
|
|
|
+ Service service = jsonSerDeser
|
|
|
.load(FileSystem.getLocal(getConfig()), filePath);
|
|
|
if (args.lifetime > 0) {
|
|
|
- application.setLifetime(args.lifetime);
|
|
|
+ service.setLifetime(args.lifetime);
|
|
|
}
|
|
|
- application.setName(args.getClusterName());
|
|
|
- return application;
|
|
|
+ service.setName(args.getClusterName());
|
|
|
+ return service;
|
|
|
}
|
|
|
|
|
|
public int actionBuild(AbstractClusterBuildingActionArgs args)
|
|
@@ -170,11 +175,11 @@ public class ServiceClient extends CompositeService
|
|
|
return actionBuild(loadAppJsonFromLocalFS(args));
|
|
|
}
|
|
|
|
|
|
- public int actionBuild(Application application)
|
|
|
+ public int actionBuild(Service service)
|
|
|
throws YarnException, IOException {
|
|
|
- Path appDir = checkAppNotExistOnHdfs(application);
|
|
|
- ServiceApiUtil.validateAndResolveApplication(application, fs, getConfig());
|
|
|
- createDirAndPersistApp(appDir, application);
|
|
|
+ Path appDir = checkAppNotExistOnHdfs(service);
|
|
|
+ ServiceApiUtil.validateAndResolveService(service, fs, getConfig());
|
|
|
+ createDirAndPersistApp(appDir, service);
|
|
|
return EXIT_SUCCESS;
|
|
|
}
|
|
|
|
|
@@ -184,21 +189,21 @@ public class ServiceClient extends CompositeService
|
|
|
return EXIT_SUCCESS;
|
|
|
}
|
|
|
|
|
|
- public ApplicationId actionCreate(Application application)
|
|
|
+ public ApplicationId actionCreate(Service service)
|
|
|
throws IOException, YarnException {
|
|
|
- String appName = application.getName();
|
|
|
- validateClusterName(appName);
|
|
|
- ServiceApiUtil.validateAndResolveApplication(application, fs, getConfig());
|
|
|
- verifyNoLiveAppInRM(appName, "create");
|
|
|
- Path appDir = checkAppNotExistOnHdfs(application);
|
|
|
+ String serviceName = service.getName();
|
|
|
+ ServiceApiUtil.validateNameFormat(serviceName, getConfig());
|
|
|
+ ServiceApiUtil.validateAndResolveService(service, fs, getConfig());
|
|
|
+ verifyNoLiveAppInRM(serviceName, "create");
|
|
|
+ Path appDir = checkAppNotExistOnHdfs(service);
|
|
|
|
|
|
// Write the definition first and then submit - AM will read the definition
|
|
|
- createDirAndPersistApp(appDir, application);
|
|
|
- ApplicationId appId = submitApp(application);
|
|
|
- cachedAppIds.put(appName, appId);
|
|
|
- application.setId(appId.toString());
|
|
|
+ createDirAndPersistApp(appDir, service);
|
|
|
+ ApplicationId appId = submitApp(service);
|
|
|
+ cachedAppIds.put(serviceName, appId);
|
|
|
+ service.setId(appId.toString());
|
|
|
// update app definition with appId
|
|
|
- persistAppDef(appDir, application);
|
|
|
+ persistAppDef(appDir, service);
|
|
|
return appId;
|
|
|
}
|
|
|
|
|
@@ -208,17 +213,21 @@ public class ServiceClient extends CompositeService
|
|
|
ActionFlexArgs flexArgs = args.getActionFlexArgs();
|
|
|
Map<String, Long> componentCounts =
|
|
|
new HashMap<>(flexArgs.getComponentMap().size());
|
|
|
- Application persistedApp =
|
|
|
- ServiceApiUtil.loadApplication(fs, flexArgs.getClusterName());
|
|
|
- if (!StringUtils.isEmpty(persistedApp.getId())) {
|
|
|
- cachedAppIds.put(persistedApp.getName(),
|
|
|
- ApplicationId.fromString(persistedApp.getId()));
|
|
|
+ Service persistedService =
|
|
|
+ ServiceApiUtil.loadService(fs, flexArgs.getClusterName());
|
|
|
+ if (!StringUtils.isEmpty(persistedService.getId())) {
|
|
|
+ cachedAppIds.put(persistedService.getName(),
|
|
|
+ ApplicationId.fromString(persistedService.getId()));
|
|
|
+ } else {
|
|
|
+ throw new YarnException(persistedService.getName()
|
|
|
+ + " appId is null, may be not submitted to YARN yet");
|
|
|
}
|
|
|
+
|
|
|
for (Map.Entry<String, String> entry : flexArgs.getComponentMap()
|
|
|
.entrySet()) {
|
|
|
String compName = entry.getKey();
|
|
|
- ServiceApiUtil.validateCompName(compName);
|
|
|
- Component component = persistedApp.getComponent(compName);
|
|
|
+ ServiceApiUtil.validateNameFormat(compName, getConfig());
|
|
|
+ Component component = persistedService.getComponent(compName);
|
|
|
if (component == null) {
|
|
|
throw new IllegalArgumentException(entry.getKey() + " does not exist !");
|
|
|
}
|
|
@@ -230,7 +239,7 @@ public class ServiceClient extends CompositeService
|
|
|
if (componentCounts.size() == 0) {
|
|
|
actionHelp(ACTION_FLEX, args);
|
|
|
}
|
|
|
- flexComponents(args.getClusterName(), componentCounts, persistedApp);
|
|
|
+ flexComponents(args.getClusterName(), componentCounts, persistedService);
|
|
|
return EXIT_SUCCESS;
|
|
|
}
|
|
|
|
|
@@ -258,19 +267,23 @@ public class ServiceClient extends CompositeService
|
|
|
}
|
|
|
|
|
|
// Called by Rest Service
|
|
|
- public Map<String, Long> flexByRestService(String appName,
|
|
|
+ public Map<String, Long> flexByRestService(String serviceName,
|
|
|
Map<String, Long> componentCounts) throws YarnException, IOException {
|
|
|
// load app definition
|
|
|
- Application persistedApp = ServiceApiUtil.loadApplication(fs, appName);
|
|
|
- cachedAppIds.put(persistedApp.getName(),
|
|
|
- ApplicationId.fromString(persistedApp.getId()));
|
|
|
- return flexComponents(appName, componentCounts, persistedApp);
|
|
|
+ Service persistedService = ServiceApiUtil.loadService(fs, serviceName);
|
|
|
+ if (StringUtils.isEmpty(persistedService.getId())) {
|
|
|
+ throw new YarnException(
|
|
|
+ serviceName + " appId is null, may be not submitted to YARN yet");
|
|
|
+ }
|
|
|
+ cachedAppIds.put(persistedService.getName(),
|
|
|
+ ApplicationId.fromString(persistedService.getId()));
|
|
|
+ return flexComponents(serviceName, componentCounts, persistedService);
|
|
|
}
|
|
|
|
|
|
- private Map<String, Long> flexComponents(String appName,
|
|
|
- Map<String, Long> componentCounts, Application persistedApp)
|
|
|
+ private Map<String, Long> flexComponents(String serviceName,
|
|
|
+ Map<String, Long> componentCounts, Service persistedService)
|
|
|
throws YarnException, IOException {
|
|
|
- validateClusterName(appName);
|
|
|
+ ServiceApiUtil.validateNameFormat(serviceName, getConfig());
|
|
|
|
|
|
Map<String, Long> original = new HashMap<>(componentCounts.size());
|
|
|
|
|
@@ -278,7 +291,7 @@ public class ServiceClient extends CompositeService
|
|
|
FlexComponentsRequestProto.Builder requestBuilder =
|
|
|
FlexComponentsRequestProto.newBuilder();
|
|
|
|
|
|
- for (Component persistedComp : persistedApp.getComponents()) {
|
|
|
+ for (Component persistedComp : persistedService.getComponents()) {
|
|
|
String name = persistedComp.getName();
|
|
|
if (componentCounts.containsKey(persistedComp.getName())) {
|
|
|
original.put(name, persistedComp.getNumberOfContainers());
|
|
@@ -295,15 +308,24 @@ public class ServiceClient extends CompositeService
|
|
|
throw new YarnException("Components " + componentCounts.keySet()
|
|
|
+ " do not exist in app definition.");
|
|
|
}
|
|
|
- ServiceApiUtil.jsonSerDeser
|
|
|
- .save(fs.getFileSystem(), ServiceApiUtil.getAppJsonPath(fs, appName),
|
|
|
- persistedApp, true);
|
|
|
- ClientAMProtocol proxy = getAMProxy(appName);
|
|
|
- if (proxy == null) {
|
|
|
- String message = appName + " is not running";
|
|
|
+ jsonSerDeser
|
|
|
+ .save(fs.getFileSystem(), ServiceApiUtil.getServiceJsonPath(fs, serviceName),
|
|
|
+ persistedService, true);
|
|
|
+
|
|
|
+ ApplicationReport appReport =
|
|
|
+ yarnClient.getApplicationReport(getAppId(serviceName));
|
|
|
+ if (appReport.getYarnApplicationState() != RUNNING) {
|
|
|
+ String message =
|
|
|
+ serviceName + " is at " + appReport.getYarnApplicationState()
|
|
|
+ + " state, flex can only be invoked when service is running";
|
|
|
LOG.error(message);
|
|
|
throw new YarnException(message);
|
|
|
}
|
|
|
+ if (StringUtils.isEmpty(appReport.getHost())) {
|
|
|
+ throw new YarnException(serviceName + " AM hostname is empty");
|
|
|
+ }
|
|
|
+ ClientAMProtocol proxy =
|
|
|
+ createAMProxy(appReport.getHost(), appReport.getRpcPort());
|
|
|
proxy.flexComponents(requestBuilder.build());
|
|
|
for (Map.Entry<String, Long> entry : original.entrySet()) {
|
|
|
LOG.info("[COMPONENT {}]: number of containers changed from {} to {}",
|
|
@@ -313,31 +335,40 @@ public class ServiceClient extends CompositeService
|
|
|
return original;
|
|
|
}
|
|
|
|
|
|
- public int actionStop(String appName, boolean waitForAppStopped)
|
|
|
+ public int actionStop(String serviceName, boolean waitForAppStopped)
|
|
|
throws YarnException, IOException {
|
|
|
- validateClusterName(appName);
|
|
|
- getAppId(appName);
|
|
|
- ApplicationId currentAppId = cachedAppIds.get(appName);
|
|
|
+ ServiceApiUtil.validateNameFormat(serviceName, getConfig());
|
|
|
+ ApplicationId currentAppId = getAppId(serviceName);
|
|
|
ApplicationReport report = yarnClient.getApplicationReport(currentAppId);
|
|
|
if (terminatedStates.contains(report.getYarnApplicationState())) {
|
|
|
- LOG.info("Application {} is already in a terminated state {}", appName,
|
|
|
+ LOG.info("Service {} is already in a terminated state {}", serviceName,
|
|
|
report.getYarnApplicationState());
|
|
|
return EXIT_SUCCESS;
|
|
|
}
|
|
|
- LOG.info("Stopping application {}, with appId = {}", appName, currentAppId);
|
|
|
+ if (preRunningStates.contains(report.getYarnApplicationState())) {
|
|
|
+ String msg = serviceName + " is at " + report.getYarnApplicationState()
|
|
|
+ + ", forcefully killed by user!";
|
|
|
+ yarnClient.killApplication(currentAppId, msg);
|
|
|
+ LOG.info(msg);
|
|
|
+ return EXIT_SUCCESS;
|
|
|
+ }
|
|
|
+ if (StringUtils.isEmpty(report.getHost())) {
|
|
|
+ throw new YarnException(serviceName + " AM hostname is empty");
|
|
|
+ }
|
|
|
+ LOG.info("Stopping service {}, with appId = {}", serviceName, currentAppId);
|
|
|
try {
|
|
|
- ClientAMProtocol proxy = getAMProxy(appName, report);
|
|
|
- cachedAppIds.remove(appName);
|
|
|
- cachedAMProxies.remove(appName);
|
|
|
+ ClientAMProtocol proxy =
|
|
|
+ createAMProxy(report.getHost(), report.getRpcPort());
|
|
|
+ cachedAppIds.remove(serviceName);
|
|
|
if (proxy != null) {
|
|
|
// try to stop the app gracefully.
|
|
|
StopRequestProto request = StopRequestProto.newBuilder().build();
|
|
|
proxy.stop(request);
|
|
|
- LOG.info("Application " + appName + " is being gracefully stopped...");
|
|
|
+ LOG.info("Service " + serviceName + " is being gracefully stopped...");
|
|
|
} else {
|
|
|
yarnClient.killApplication(currentAppId,
|
|
|
- appName + " is forcefully killed by user!");
|
|
|
- LOG.info("Forcefully kill the application: " + appName);
|
|
|
+ serviceName + " is forcefully killed by user!");
|
|
|
+ LOG.info("Forcefully kill the service: " + serviceName);
|
|
|
return EXIT_SUCCESS;
|
|
|
}
|
|
|
|
|
@@ -351,56 +382,55 @@ public class ServiceClient extends CompositeService
|
|
|
Thread.sleep(2000);
|
|
|
report = yarnClient.getApplicationReport(currentAppId);
|
|
|
if (terminatedStates.contains(report.getYarnApplicationState())) {
|
|
|
- LOG.info("Application " + appName + " is stopped.");
|
|
|
+ LOG.info("Service " + serviceName + " is stopped.");
|
|
|
break;
|
|
|
}
|
|
|
// Forcefully kill after 10 seconds.
|
|
|
if ((System.currentTimeMillis() - startTime) > 10000) {
|
|
|
LOG.info("Stop operation timeout stopping, forcefully kill the app "
|
|
|
- + appName);
|
|
|
+ + serviceName);
|
|
|
yarnClient.killApplication(currentAppId,
|
|
|
"Forcefully kill the app by user");
|
|
|
break;
|
|
|
}
|
|
|
if (++pollCount % 10 == 0) {
|
|
|
- LOG.info("Waiting for application " + appName + " to be stopped.");
|
|
|
+ LOG.info("Waiting for service " + serviceName + " to be stopped.");
|
|
|
}
|
|
|
}
|
|
|
} catch (IOException | YarnException | InterruptedException e) {
|
|
|
- LOG.info("Failed to stop " + appName
|
|
|
+ LOG.info("Failed to stop " + serviceName
|
|
|
+ " gracefully, forcefully kill the app.");
|
|
|
yarnClient.killApplication(currentAppId, "Forcefully kill the app");
|
|
|
}
|
|
|
return EXIT_SUCCESS;
|
|
|
}
|
|
|
|
|
|
- public int actionDestroy(String appName) throws Exception {
|
|
|
- validateClusterName(appName);
|
|
|
- verifyNoLiveAppInRM(appName, "Destroy");
|
|
|
- Path appDir = fs.buildClusterDirPath(appName);
|
|
|
+ public int actionDestroy(String serviceName) throws Exception {
|
|
|
+ ServiceApiUtil.validateNameFormat(serviceName, getConfig());
|
|
|
+ verifyNoLiveAppInRM(serviceName, "Destroy");
|
|
|
+ Path appDir = fs.buildClusterDirPath(serviceName);
|
|
|
FileSystem fileSystem = fs.getFileSystem();
|
|
|
// remove from the appId cache
|
|
|
- cachedAppIds.remove(appName);
|
|
|
- cachedAMProxies.remove(appName);
|
|
|
+ cachedAppIds.remove(serviceName);
|
|
|
if (fileSystem.exists(appDir)) {
|
|
|
if (fileSystem.delete(appDir, true)) {
|
|
|
- LOG.info("Successfully deleted application dir for " + appName + ": "
|
|
|
+ LOG.info("Successfully deleted service dir for " + serviceName + ": "
|
|
|
+ appDir);
|
|
|
} else {
|
|
|
String message =
|
|
|
- "Failed to delete application + " + appName + " at: " + appDir;
|
|
|
+ "Failed to delete service + " + serviceName + " at: " + appDir;
|
|
|
LOG.info(message);
|
|
|
throw new YarnException(message);
|
|
|
}
|
|
|
}
|
|
|
- deleteZKNode(appName);
|
|
|
- String registryPath = ServiceRegistryUtils.registryPathForInstance(appName);
|
|
|
+ deleteZKNode(serviceName);
|
|
|
+ String registryPath = ServiceRegistryUtils.registryPathForInstance(serviceName);
|
|
|
try {
|
|
|
getRegistryClient().delete(registryPath, true);
|
|
|
} catch (IOException e) {
|
|
|
LOG.warn("Error deleting registry entry {}", registryPath, e);
|
|
|
}
|
|
|
- LOG.info("Destroyed cluster {}", appName);
|
|
|
+ LOG.info("Destroyed cluster {}", serviceName);
|
|
|
return EXIT_SUCCESS;
|
|
|
}
|
|
|
|
|
@@ -454,13 +484,13 @@ public class ServiceClient extends CompositeService
|
|
|
throw new UsageException(CommonArgs.usage(args, actionName));
|
|
|
}
|
|
|
|
|
|
- private void verifyNoLiveAppInRM(String appname, String action)
|
|
|
+ private void verifyNoLiveAppInRM(String serviceName, String action)
|
|
|
throws IOException, YarnException {
|
|
|
Set<String> types = new HashSet<>(1);
|
|
|
types.add(YarnServiceConstants.APP_TYPE);
|
|
|
Set<String> tags = null;
|
|
|
- if (appname != null) {
|
|
|
- tags = Collections.singleton(SliderUtils.createNameTag(appname));
|
|
|
+ if (serviceName != null) {
|
|
|
+ tags = Collections.singleton(SliderUtils.createNameTag(serviceName));
|
|
|
}
|
|
|
GetApplicationsRequest request = GetApplicationsRequest.newInstance();
|
|
|
request.setApplicationTypes(types);
|
|
@@ -469,14 +499,14 @@ public class ServiceClient extends CompositeService
|
|
|
List<ApplicationReport> reports = yarnClient.getApplications(request);
|
|
|
if (!reports.isEmpty()) {
|
|
|
throw new YarnException(
|
|
|
- "Failed to " + action + " application, as " + appname
|
|
|
+ "Failed to " + action + " service, as " + serviceName
|
|
|
+ " already exists.");
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private ApplicationId submitApp(Application app)
|
|
|
+ private ApplicationId submitApp(Service app)
|
|
|
throws IOException, YarnException {
|
|
|
- String appName = app.getName();
|
|
|
+ String serviceName = app.getName();
|
|
|
Configuration conf = getConfig();
|
|
|
Path appRootDir = fs.buildClusterDirPath(app.getName());
|
|
|
|
|
@@ -500,11 +530,11 @@ public class ServiceClient extends CompositeService
|
|
|
|
|
|
// copy local slideram-log4j.properties to hdfs and add to localResources
|
|
|
boolean hasAMLog4j =
|
|
|
- addAMLog4jResource(appName, conf, localResources);
|
|
|
+ addAMLog4jResource(serviceName, conf, localResources);
|
|
|
// copy jars to hdfs and add to localResources
|
|
|
- addJarResource(appName, localResources);
|
|
|
+ addJarResource(serviceName, localResources);
|
|
|
// add keytab if in secure env
|
|
|
- addKeytabResourceIfSecure(fs, localResources, conf, appName);
|
|
|
+ addKeytabResourceIfSecure(fs, localResources, conf, serviceName);
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
printLocalResources(localResources);
|
|
|
}
|
|
@@ -512,7 +542,7 @@ public class ServiceClient extends CompositeService
|
|
|
|
|
|
// create AM CLI
|
|
|
String cmdStr =
|
|
|
- buildCommandLine(appName, conf, appRootDir, hasAMLog4j);
|
|
|
+ buildCommandLine(serviceName, conf, appRootDir, hasAMLog4j);
|
|
|
submissionContext.setResource(Resource.newInstance(YarnServiceConf
|
|
|
.getLong(YarnServiceConf.AM_RESOURCE_MEM, YarnServiceConf.DEFAULT_KEY_AM_RESOURCE_MEM,
|
|
|
app.getConfiguration(), conf), 1));
|
|
@@ -521,10 +551,10 @@ public class ServiceClient extends CompositeService
|
|
|
queue = conf.get(YARN_QUEUE, "default");
|
|
|
}
|
|
|
submissionContext.setQueue(queue);
|
|
|
- submissionContext.setApplicationName(appName);
|
|
|
+ submissionContext.setApplicationName(serviceName);
|
|
|
submissionContext.setApplicationType(YarnServiceConstants.APP_TYPE);
|
|
|
Set<String> appTags =
|
|
|
- AbstractClientProvider.createApplicationTags(appName, null, null);
|
|
|
+ AbstractClientProvider.createApplicationTags(serviceName, null, null);
|
|
|
if (!appTags.isEmpty()) {
|
|
|
submissionContext.setApplicationTags(appTags);
|
|
|
}
|
|
@@ -549,7 +579,7 @@ public class ServiceClient extends CompositeService
|
|
|
LOG.debug(builder.toString());
|
|
|
}
|
|
|
|
|
|
- private String buildCommandLine(String appName, Configuration conf,
|
|
|
+ private String buildCommandLine(String serviceName, Configuration conf,
|
|
|
Path appRootDir, boolean hasSliderAMLog4j) throws BadConfigException {
|
|
|
JavaCommandLineBuilder CLI = new JavaCommandLineBuilder();
|
|
|
CLI.forceIPv4().headless();
|
|
@@ -560,9 +590,9 @@ public class ServiceClient extends CompositeService
|
|
|
CLI.sysprop(SYSPROP_LOG_DIR, ApplicationConstants.LOG_DIR_EXPANSION_VAR);
|
|
|
}
|
|
|
CLI.add(ServiceMaster.class.getCanonicalName());
|
|
|
- CLI.add(ACTION_CREATE, appName);
|
|
|
+ CLI.add(ACTION_CREATE, serviceName);
|
|
|
//TODO debugAM CLI.add(Arguments.ARG_DEBUG)
|
|
|
- CLI.add(Arguments.ARG_CLUSTER_URI, new Path(appRootDir, appName + ".json"));
|
|
|
+ CLI.add(Arguments.ARG_CLUSTER_URI, new Path(appRootDir, serviceName + ".json"));
|
|
|
// pass the registry binding
|
|
|
CLI.addConfOptionToCLI(conf, RegistryConstants.KEY_REGISTRY_ZK_ROOT,
|
|
|
RegistryConstants.DEFAULT_ZK_REGISTRY_ROOT);
|
|
@@ -599,10 +629,10 @@ public class ServiceClient extends CompositeService
|
|
|
return env;
|
|
|
}
|
|
|
|
|
|
- protected Path addJarResource(String appName,
|
|
|
+ protected Path addJarResource(String serviceName,
|
|
|
Map<String, LocalResource> localResources)
|
|
|
throws IOException, SliderException {
|
|
|
- Path libPath = fs.buildClusterDirPath(appName);
|
|
|
+ Path libPath = fs.buildClusterDirPath(serviceName);
|
|
|
ProviderUtils
|
|
|
.addProviderJar(localResources, ServiceMaster.class, SERVICE_CORE_JAR, fs,
|
|
|
libPath, "lib", false);
|
|
@@ -621,7 +651,7 @@ public class ServiceClient extends CompositeService
|
|
|
return libPath;
|
|
|
}
|
|
|
|
|
|
- private boolean addAMLog4jResource(String appName, Configuration conf,
|
|
|
+ private boolean addAMLog4jResource(String serviceName, Configuration conf,
|
|
|
Map<String, LocalResource> localResources)
|
|
|
throws IOException, BadClusterStateException {
|
|
|
boolean hasAMLog4j = false;
|
|
@@ -632,7 +662,7 @@ public class ServiceClient extends CompositeService
|
|
|
new File(hadoopConfDir, YarnServiceConstants.YARN_SERVICE_LOG4J_FILENAME);
|
|
|
if (localFile.exists()) {
|
|
|
Path localFilePath = createLocalPath(localFile);
|
|
|
- Path appDirPath = fs.buildClusterDirPath(appName);
|
|
|
+ Path appDirPath = fs.buildClusterDirPath(serviceName);
|
|
|
Path remoteConfPath =
|
|
|
new Path(appDirPath, YarnServiceConstants.SUBMITTED_CONF_DIR);
|
|
|
Path remoteFilePath =
|
|
@@ -649,54 +679,54 @@ public class ServiceClient extends CompositeService
|
|
|
return hasAMLog4j;
|
|
|
}
|
|
|
|
|
|
- public int actionStart(String appName) throws YarnException, IOException {
|
|
|
- validateClusterName(appName);
|
|
|
- Path appDir = checkAppExistOnHdfs(appName);
|
|
|
- Application application = ServiceApiUtil.loadApplication(fs, appName);
|
|
|
- ServiceApiUtil.validateAndResolveApplication(application, fs, getConfig());
|
|
|
+ public int actionStart(String serviceName) throws YarnException, IOException {
|
|
|
+ ServiceApiUtil.validateNameFormat(serviceName, getConfig());
|
|
|
+ Path appDir = checkAppExistOnHdfs(serviceName);
|
|
|
+ Service service = ServiceApiUtil.loadService(fs, serviceName);
|
|
|
+ ServiceApiUtil.validateAndResolveService(service, fs, getConfig());
|
|
|
// see if it is actually running and bail out;
|
|
|
- verifyNoLiveAppInRM(appName, "thaw");
|
|
|
- ApplicationId appId = submitApp(application);
|
|
|
- application.setId(appId.toString());
|
|
|
+ verifyNoLiveAppInRM(serviceName, "thaw");
|
|
|
+ ApplicationId appId = submitApp(service);
|
|
|
+ service.setId(appId.toString());
|
|
|
// write app definition on to hdfs
|
|
|
- createDirAndPersistApp(appDir, application);
|
|
|
+ createDirAndPersistApp(appDir, service);
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
|
- private Path checkAppNotExistOnHdfs(Application application)
|
|
|
+ private Path checkAppNotExistOnHdfs(Service service)
|
|
|
throws IOException, SliderException {
|
|
|
- Path appDir = fs.buildClusterDirPath(application.getName());
|
|
|
+ Path appDir = fs.buildClusterDirPath(service.getName());
|
|
|
fs.verifyDirectoryNonexistent(
|
|
|
- new Path(appDir, application.getName() + ".json"));
|
|
|
+ new Path(appDir, service.getName() + ".json"));
|
|
|
return appDir;
|
|
|
}
|
|
|
|
|
|
- private Path checkAppExistOnHdfs(String appName)
|
|
|
+ private Path checkAppExistOnHdfs(String serviceName)
|
|
|
throws IOException, SliderException {
|
|
|
- Path appDir = fs.buildClusterDirPath(appName);
|
|
|
- fs.verifyPathExists(new Path(appDir, appName + ".json"));
|
|
|
+ Path appDir = fs.buildClusterDirPath(serviceName);
|
|
|
+ fs.verifyPathExists(new Path(appDir, serviceName + ".json"));
|
|
|
return appDir;
|
|
|
}
|
|
|
|
|
|
- private void createDirAndPersistApp(Path appDir, Application application)
|
|
|
+ private void createDirAndPersistApp(Path appDir, Service service)
|
|
|
throws IOException, SliderException {
|
|
|
FsPermission appDirPermission = new FsPermission("750");
|
|
|
fs.createWithPermissions(appDir, appDirPermission);
|
|
|
- persistAppDef(appDir, application);
|
|
|
+ persistAppDef(appDir, service);
|
|
|
}
|
|
|
|
|
|
- private void persistAppDef(Path appDir, Application application)
|
|
|
+ private void persistAppDef(Path appDir, Service service)
|
|
|
throws IOException {
|
|
|
- Path appJson = new Path(appDir, application.getName() + ".json");
|
|
|
- ServiceApiUtil.jsonSerDeser
|
|
|
- .save(fs.getFileSystem(), appJson, application, true);
|
|
|
+ Path appJson = new Path(appDir, service.getName() + ".json");
|
|
|
+ jsonSerDeser
|
|
|
+ .save(fs.getFileSystem(), appJson, service, true);
|
|
|
LOG.info(
|
|
|
- "Persisted application " + application.getName() + " at " + appJson);
|
|
|
+ "Persisted service " + service.getName() + " at " + appJson);
|
|
|
}
|
|
|
|
|
|
private void addKeytabResourceIfSecure(SliderFileSystem fileSystem,
|
|
|
Map<String, LocalResource> localResource, Configuration conf,
|
|
|
- String appName) throws IOException, BadConfigException {
|
|
|
+ String serviceName) throws IOException, BadConfigException {
|
|
|
if (!UserGroupInformation.isSecurityEnabled()) {
|
|
|
return;
|
|
|
}
|
|
@@ -707,7 +737,7 @@ public class ServiceClient extends CompositeService
|
|
|
conf.get(YarnServiceConf.KEY_AM_LOGIN_KEYTAB_NAME);
|
|
|
String keytabDir = conf.get(YarnServiceConf.KEY_HDFS_KEYTAB_DIR);
|
|
|
Path keytabPath =
|
|
|
- fileSystem.buildKeytabPath(keytabDir, amKeytabName, appName);
|
|
|
+ fileSystem.buildKeytabPath(keytabDir, amKeytabName, serviceName);
|
|
|
if (fileSystem.getFileSystem().exists(keytabPath)) {
|
|
|
LocalResource keytabRes =
|
|
|
fileSystem.createAmResource(keytabPath, LocalResourceType.FILE);
|
|
@@ -722,23 +752,22 @@ public class ServiceClient extends CompositeService
|
|
|
} else {
|
|
|
LOG.warn("The AM will be "
|
|
|
+ "started without a kerberos authenticated identity. "
|
|
|
- + "The application is therefore not guaranteed to remain "
|
|
|
+ + "The service is therefore not guaranteed to remain "
|
|
|
+ "operational beyond 24 hours.");
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public String updateLifetime(String appName, long lifetime)
|
|
|
+ public String updateLifetime(String serviceName, long lifetime)
|
|
|
throws YarnException, IOException {
|
|
|
- getAppId(appName);
|
|
|
- ApplicationId currentAppId = cachedAppIds.get(appName);
|
|
|
+ ApplicationId currentAppId = getAppId(serviceName);
|
|
|
ApplicationReport report = yarnClient.getApplicationReport(currentAppId);
|
|
|
if (report == null) {
|
|
|
- throw new YarnException("Application not found for " + appName);
|
|
|
+ throw new YarnException("Service not found for " + serviceName);
|
|
|
}
|
|
|
ApplicationId appId = report.getApplicationId();
|
|
|
- LOG.info("Updating lifetime of an application: appName = " + appName
|
|
|
+ LOG.info("Updating lifetime of an service: serviceName = " + serviceName
|
|
|
+ ", appId = " + appId + ", lifetime = " + lifetime);
|
|
|
Map<ApplicationTimeoutType, String> map = new HashMap<>();
|
|
|
String newTimeout =
|
|
@@ -748,32 +777,55 @@ public class ServiceClient extends CompositeService
|
|
|
UpdateApplicationTimeoutsRequest.newInstance(appId, map);
|
|
|
yarnClient.updateApplicationTimeouts(request);
|
|
|
LOG.info(
|
|
|
- "Successfully updated lifetime for an application: appName = " + appName
|
|
|
+ "Successfully updated lifetime for an service: serviceName = " + serviceName
|
|
|
+ ", appId = " + appId + ". New expiry time in ISO8601 format is "
|
|
|
+ newTimeout);
|
|
|
return newTimeout;
|
|
|
}
|
|
|
|
|
|
- public Application getStatus(String appName)
|
|
|
+ public ServiceState convertState(FinalApplicationStatus status) {
|
|
|
+ switch (status) {
|
|
|
+ case UNDEFINED:
|
|
|
+ return ServiceState.ACCEPTED;
|
|
|
+ case FAILED:
|
|
|
+ case KILLED:
|
|
|
+ return ServiceState.FAILED;
|
|
|
+ case ENDED:
|
|
|
+ case SUCCEEDED:
|
|
|
+ return ServiceState.STOPPED;
|
|
|
+ }
|
|
|
+ return ServiceState.ACCEPTED;
|
|
|
+ }
|
|
|
+
|
|
|
+ public Service getStatus(String serviceName)
|
|
|
throws IOException, YarnException {
|
|
|
- validateClusterName(appName);
|
|
|
- ApplicationId currentAppId = getAppId(appName);
|
|
|
+ ServiceApiUtil.validateNameFormat(serviceName, getConfig());
|
|
|
+ ApplicationId currentAppId = getAppId(serviceName);
|
|
|
ApplicationReport appReport = yarnClient.getApplicationReport(currentAppId);
|
|
|
- ClientAMProtocol amProxy = getAMProxy(appName, appReport);
|
|
|
- Application appSpec;
|
|
|
- if (amProxy != null) {
|
|
|
- GetStatusResponseProto response =
|
|
|
- amProxy.getStatus(GetStatusRequestProto.newBuilder().build());
|
|
|
- appSpec = ServiceApiUtil.jsonSerDeser.fromJson(response.getStatus());
|
|
|
- } else {
|
|
|
- appSpec = new Application();
|
|
|
- appSpec.setName(appName);
|
|
|
- }
|
|
|
+ Service appSpec = new Service();
|
|
|
+ appSpec.setName(serviceName);
|
|
|
+ appSpec.setState(convertState(appReport.getFinalApplicationStatus()));
|
|
|
ApplicationTimeout lifetime =
|
|
|
appReport.getApplicationTimeouts().get(ApplicationTimeoutType.LIFETIME);
|
|
|
if (lifetime != null) {
|
|
|
appSpec.setLifetime(lifetime.getRemainingTime());
|
|
|
}
|
|
|
+
|
|
|
+ if (appReport.getYarnApplicationState() != RUNNING) {
|
|
|
+ LOG.info("Service {} is at {} state", serviceName,
|
|
|
+ appReport.getYarnApplicationState());
|
|
|
+ return appSpec;
|
|
|
+ }
|
|
|
+ if (StringUtils.isEmpty(appReport.getHost())) {
|
|
|
+ LOG.warn(serviceName + " AM hostname is empty");
|
|
|
+ return appSpec;
|
|
|
+ }
|
|
|
+ ClientAMProtocol amProxy =
|
|
|
+ createAMProxy(appReport.getHost(), appReport.getRpcPort());
|
|
|
+ GetStatusResponseProto response =
|
|
|
+ amProxy.getStatus(GetStatusRequestProto.newBuilder().build());
|
|
|
+ appSpec = jsonSerDeser.fromJson(response.getStatus());
|
|
|
+
|
|
|
return appSpec;
|
|
|
}
|
|
|
|
|
@@ -815,58 +867,26 @@ public class ServiceClient extends CompositeService
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- // Get AMProxy with the appReport provided
|
|
|
- protected ClientAMProtocol getAMProxy(String appName, ApplicationReport report)
|
|
|
- throws IOException {
|
|
|
- if (!cachedAMProxies.containsKey(appName) && !StringUtils
|
|
|
- .isEmpty(report.getHost())) {
|
|
|
- insertAMProxy(appName, report.getHost(), report.getRpcPort());
|
|
|
- }
|
|
|
- return cachedAMProxies.get(appName);
|
|
|
- }
|
|
|
-
|
|
|
- // Get AMProxy without appReport provided - it'll getAppReport from RM
|
|
|
- protected ClientAMProtocol getAMProxy(String appName)
|
|
|
- throws IOException, YarnException {
|
|
|
- ApplicationId currentAppId = getAppId(appName);
|
|
|
-
|
|
|
- if (cachedAMProxies.containsKey(appName)) {
|
|
|
- return cachedAMProxies.get(appName);
|
|
|
- } else {
|
|
|
- ApplicationReport appReport =
|
|
|
- yarnClient.getApplicationReport(currentAppId);
|
|
|
- String host = appReport.getHost();
|
|
|
- int port = appReport.getRpcPort();
|
|
|
- if (!StringUtils.isEmpty(host)) {
|
|
|
- return insertAMProxy(appName, host, port);
|
|
|
- }
|
|
|
- return null;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private ClientAMProtocol insertAMProxy(String appName, String host, int port)
|
|
|
+ protected ClientAMProtocol createAMProxy(String host, int port)
|
|
|
throws IOException {
|
|
|
InetSocketAddress address =
|
|
|
NetUtils.createSocketAddrForHost(host, port);
|
|
|
- ClientAMProtocol amProxy =
|
|
|
- ClientAMProxy.createProxy(getConfig(), ClientAMProtocol.class,
|
|
|
+ return ClientAMProxy.createProxy(getConfig(), ClientAMProtocol.class,
|
|
|
UserGroupInformation.getCurrentUser(), rpc, address);
|
|
|
- cachedAMProxies.put(appName, amProxy);
|
|
|
- return amProxy;
|
|
|
}
|
|
|
|
|
|
- private synchronized ApplicationId getAppId(String appName)
|
|
|
+ private synchronized ApplicationId getAppId(String serviceName)
|
|
|
throws IOException, YarnException {
|
|
|
- if (cachedAppIds.containsKey(appName)) {
|
|
|
- return cachedAppIds.get(appName);
|
|
|
+ if (cachedAppIds.containsKey(serviceName)) {
|
|
|
+ return cachedAppIds.get(serviceName);
|
|
|
}
|
|
|
- Application persistedApp = ServiceApiUtil.loadApplication(fs, appName);
|
|
|
- if (persistedApp == null) {
|
|
|
- throw new YarnException("Application " + appName
|
|
|
+ Service persistedService = ServiceApiUtil.loadService(fs, serviceName);
|
|
|
+ if (persistedService == null) {
|
|
|
+ throw new YarnException("Service " + serviceName
|
|
|
+ " doesn't exist on hdfs. Please check if the app exists in RM");
|
|
|
}
|
|
|
- ApplicationId currentAppId = ApplicationId.fromString(persistedApp.getId());
|
|
|
- cachedAppIds.put(appName, currentAppId);
|
|
|
+ ApplicationId currentAppId = ApplicationId.fromString(persistedService.getId());
|
|
|
+ cachedAppIds.put(serviceName, currentAppId);
|
|
|
return currentAppId;
|
|
|
}
|
|
|
}
|