|
@@ -22,6 +22,8 @@ import org.apache.commons.lang.StringUtils;
|
|
|
import org.apache.curator.framework.CuratorFramework;
|
|
|
import org.apache.curator.framework.CuratorFrameworkFactory;
|
|
|
import org.apache.curator.retry.RetryNTimes;
|
|
|
+import org.apache.hadoop.classification.InterfaceAudience;
|
|
|
+import org.apache.hadoop.classification.InterfaceStability;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
@@ -40,6 +42,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsReque
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
|
|
+import org.apache.hadoop.yarn.api.records.ApplicationTimeout;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
|
|
import org.apache.hadoop.yarn.api.records.LocalResource;
|
|
@@ -58,34 +61,32 @@ import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto;
|
|
|
import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopRequestProto;
|
|
|
import org.apache.hadoop.yarn.service.ClientAMProtocol;
|
|
|
import org.apache.hadoop.yarn.service.ServiceMaster;
|
|
|
+import org.apache.hadoop.yarn.service.api.records.Application;
|
|
|
+import org.apache.hadoop.yarn.service.api.records.Component;
|
|
|
+import org.apache.hadoop.yarn.service.client.params.AbstractClusterBuildingActionArgs;
|
|
|
import org.apache.hadoop.yarn.service.client.params.ActionDependencyArgs;
|
|
|
import org.apache.hadoop.yarn.service.client.params.ActionFlexArgs;
|
|
|
import org.apache.hadoop.yarn.service.client.params.Arguments;
|
|
|
import org.apache.hadoop.yarn.service.client.params.ClientArgs;
|
|
|
import org.apache.hadoop.yarn.service.client.params.CommonArgs;
|
|
|
import org.apache.hadoop.yarn.service.conf.SliderExitCodes;
|
|
|
-import org.apache.hadoop.yarn.service.conf.SliderKeys;
|
|
|
-import org.apache.hadoop.yarn.service.conf.SliderXmlConfKeys;
|
|
|
+import org.apache.hadoop.yarn.service.conf.YarnServiceConstants;
|
|
|
import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
|
|
|
import org.apache.hadoop.yarn.service.provider.AbstractClientProvider;
|
|
|
import org.apache.hadoop.yarn.service.provider.ProviderUtils;
|
|
|
+import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
|
|
|
+import org.apache.hadoop.yarn.service.utils.ServiceRegistryUtils;
|
|
|
+import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
|
|
|
+import org.apache.hadoop.yarn.service.utils.SliderUtils;
|
|
|
import org.apache.hadoop.yarn.util.Records;
|
|
|
import org.apache.hadoop.yarn.util.Times;
|
|
|
-import org.apache.slider.api.resource.Application;
|
|
|
-import org.apache.slider.api.resource.Component;
|
|
|
-import org.apache.slider.common.params.AbstractClusterBuildingActionArgs;
|
|
|
-import org.apache.slider.common.tools.SliderFileSystem;
|
|
|
-import org.apache.slider.common.tools.SliderUtils;
|
|
|
-import org.apache.slider.core.exceptions.BadClusterStateException;
|
|
|
-import org.apache.slider.core.exceptions.BadConfigException;
|
|
|
-import org.apache.slider.core.exceptions.SliderException;
|
|
|
-import org.apache.slider.core.exceptions.UsageException;
|
|
|
-import org.apache.slider.core.launch.ClasspathConstructor;
|
|
|
-import org.apache.slider.core.launch.JavaCommandLineBuilder;
|
|
|
-import org.apache.slider.core.registry.SliderRegistryUtils;
|
|
|
-import org.apache.slider.core.zk.ZKIntegration;
|
|
|
-import org.apache.slider.core.zk.ZookeeperUtils;
|
|
|
-import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
|
|
|
+import org.apache.hadoop.yarn.service.exceptions.BadClusterStateException;
|
|
|
+import org.apache.hadoop.yarn.service.exceptions.BadConfigException;
|
|
|
+import org.apache.hadoop.yarn.service.exceptions.SliderException;
|
|
|
+import org.apache.hadoop.yarn.service.exceptions.UsageException;
|
|
|
+import org.apache.hadoop.yarn.service.containerlaunch.ClasspathConstructor;
|
|
|
+import org.apache.hadoop.yarn.service.containerlaunch.JavaCommandLineBuilder;
|
|
|
+import org.apache.hadoop.yarn.service.utils.ZookeeperUtils;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
@@ -105,17 +106,21 @@ import java.util.concurrent.ConcurrentHashMap;
|
|
|
import static org.apache.hadoop.yarn.api.records.YarnApplicationState.*;
|
|
|
import static org.apache.hadoop.yarn.service.client.params.SliderActions.ACTION_CREATE;
|
|
|
import static org.apache.hadoop.yarn.service.client.params.SliderActions.ACTION_FLEX;
|
|
|
-import static org.apache.slider.common.Constants.HADOOP_JAAS_DEBUG;
|
|
|
-import static org.apache.slider.common.tools.SliderUtils.*;
|
|
|
+import static org.apache.hadoop.yarn.service.conf.YarnServiceConf.YARN_QUEUE;
|
|
|
+import static org.apache.hadoop.yarn.service.utils.SliderUtils.*;
|
|
|
|
|
|
+@InterfaceAudience.Public
|
|
|
+@InterfaceStability.Unstable
|
|
|
public class ServiceClient extends CompositeService
|
|
|
- implements SliderExitCodes, SliderKeys {
|
|
|
+ implements SliderExitCodes, YarnServiceConstants {
|
|
|
private static final Logger LOG =
|
|
|
LoggerFactory.getLogger(ServiceClient.class);
|
|
|
private SliderFileSystem fs;
|
|
|
private YarnClient yarnClient;
|
|
|
// Avoid looking up applicationId from fs all the time.
|
|
|
private Map<String, ApplicationId> cachedAppIds = new ConcurrentHashMap<>();
|
|
|
+ private Map<String, ClientAMProtocol> cachedAMProxies = new ConcurrentHashMap<>();
|
|
|
+
|
|
|
private RegistryOperations registryClient;
|
|
|
private CuratorFramework curatorClient;
|
|
|
private YarnRPC rpc;
|
|
@@ -293,7 +298,12 @@ public class ServiceClient extends CompositeService
|
|
|
ServiceApiUtil.jsonSerDeser
|
|
|
.save(fs.getFileSystem(), ServiceApiUtil.getAppJsonPath(fs, appName),
|
|
|
persistedApp, true);
|
|
|
- ClientAMProtocol proxy = connectToAM(appName);
|
|
|
+ ClientAMProtocol proxy = getAMProxy(appName);
|
|
|
+ if (proxy == null) {
|
|
|
+ String message = appName + " is not running";
|
|
|
+ LOG.error(message);
|
|
|
+ throw new YarnException(message);
|
|
|
+ }
|
|
|
proxy.flexComponents(requestBuilder.build());
|
|
|
for (Map.Entry<String, Long> entry : original.entrySet()) {
|
|
|
LOG.info("[COMPONENT {}]: number of containers changed from {} to {}",
|
|
@@ -303,9 +313,10 @@ public class ServiceClient extends CompositeService
|
|
|
return original;
|
|
|
}
|
|
|
|
|
|
- public int actionStop(String appName) throws YarnException, IOException {
|
|
|
+ public int actionStop(String appName, boolean waitForAppStopped)
|
|
|
+ throws YarnException, IOException {
|
|
|
validateClusterName(appName);
|
|
|
- getAppIdFromPersistedApp(appName);
|
|
|
+ getAppId(appName);
|
|
|
ApplicationId currentAppId = cachedAppIds.get(appName);
|
|
|
ApplicationReport report = yarnClient.getApplicationReport(currentAppId);
|
|
|
if (terminatedStates.contains(report.getYarnApplicationState())) {
|
|
@@ -315,17 +326,29 @@ public class ServiceClient extends CompositeService
|
|
|
}
|
|
|
LOG.info("Stopping application {}, with appId = {}", appName, currentAppId);
|
|
|
try {
|
|
|
- // try to stop the app gracefully.
|
|
|
- ClientAMProtocol proxy = connectToAM(appName);
|
|
|
- StopRequestProto request = StopRequestProto.newBuilder().build();
|
|
|
- proxy.stop(request);
|
|
|
- LOG.info("Application " + appName + " is being gracefully stopped...");
|
|
|
+ ClientAMProtocol proxy = getAMProxy(appName, report);
|
|
|
+ cachedAppIds.remove(appName);
|
|
|
+ cachedAMProxies.remove(appName);
|
|
|
+ if (proxy != null) {
|
|
|
+ // try to stop the app gracefully.
|
|
|
+ StopRequestProto request = StopRequestProto.newBuilder().build();
|
|
|
+ proxy.stop(request);
|
|
|
+ LOG.info("Application " + appName + " is being gracefully stopped...");
|
|
|
+ } else {
|
|
|
+ yarnClient.killApplication(currentAppId,
|
|
|
+ appName + " is forcefully killed by user!");
|
|
|
+ LOG.info("Forcefully kill the application: " + appName);
|
|
|
+ return EXIT_SUCCESS;
|
|
|
+ }
|
|
|
|
|
|
+ if (!waitForAppStopped) {
|
|
|
+ return EXIT_SUCCESS;
|
|
|
+ }
|
|
|
// Wait until the app is killed.
|
|
|
long startTime = System.currentTimeMillis();
|
|
|
int pollCount = 0;
|
|
|
while (true) {
|
|
|
- Thread.sleep(1000);
|
|
|
+ Thread.sleep(2000);
|
|
|
report = yarnClient.getApplicationReport(currentAppId);
|
|
|
if (terminatedStates.contains(report.getYarnApplicationState())) {
|
|
|
LOG.info("Application " + appName + " is stopped.");
|
|
@@ -358,6 +381,7 @@ public class ServiceClient extends CompositeService
|
|
|
FileSystem fileSystem = fs.getFileSystem();
|
|
|
// remove from the appId cache
|
|
|
cachedAppIds.remove(appName);
|
|
|
+ cachedAMProxies.remove(appName);
|
|
|
if (fileSystem.exists(appDir)) {
|
|
|
if (fileSystem.delete(appDir, true)) {
|
|
|
LOG.info("Successfully deleted application dir for " + appName + ": "
|
|
@@ -370,7 +394,7 @@ public class ServiceClient extends CompositeService
|
|
|
}
|
|
|
}
|
|
|
deleteZKNode(appName);
|
|
|
- String registryPath = SliderRegistryUtils.registryPathForInstance(appName);
|
|
|
+ String registryPath = ServiceRegistryUtils.registryPathForInstance(appName);
|
|
|
try {
|
|
|
getRegistryClient().delete(registryPath, true);
|
|
|
} catch (IOException e) {
|
|
@@ -395,7 +419,7 @@ public class ServiceClient extends CompositeService
|
|
|
private void deleteZKNode(String clusterName) throws Exception {
|
|
|
CuratorFramework curatorFramework = getCuratorClient();
|
|
|
String user = RegistryUtils.currentUser();
|
|
|
- String zkPath = ZKIntegration.mkClusterPath(user, clusterName);
|
|
|
+ String zkPath = ServiceRegistryUtils.mkClusterPath(user, clusterName);
|
|
|
if (curatorFramework.checkExists().forPath(zkPath) != null) {
|
|
|
curatorFramework.delete().deletingChildrenIfNeeded().forPath(zkPath);
|
|
|
LOG.info("Deleted zookeeper path: " + zkPath);
|
|
@@ -418,7 +442,7 @@ public class ServiceClient extends CompositeService
|
|
|
if (curatorClient == null) {
|
|
|
curatorClient =
|
|
|
CuratorFrameworkFactory.builder().connectString(registryQuorum)
|
|
|
- .sessionTimeoutMs(10000).retryPolicy(new RetryNTimes(10, 2000))
|
|
|
+ .sessionTimeoutMs(10000).retryPolicy(new RetryNTimes(5, 2000))
|
|
|
.build();
|
|
|
curatorClient.start();
|
|
|
}
|
|
@@ -433,7 +457,7 @@ public class ServiceClient extends CompositeService
|
|
|
private void verifyNoLiveAppInRM(String appname, String action)
|
|
|
throws IOException, YarnException {
|
|
|
Set<String> types = new HashSet<>(1);
|
|
|
- types.add(SliderKeys.APP_TYPE);
|
|
|
+ types.add(YarnServiceConstants.APP_TYPE);
|
|
|
Set<String> tags = null;
|
|
|
if (appname != null) {
|
|
|
tags = Collections.singleton(SliderUtils.createNameTag(appname));
|
|
@@ -469,12 +493,13 @@ public class ServiceClient extends CompositeService
|
|
|
appTimeout.put(ApplicationTimeoutType.LIFETIME, app.getLifetime());
|
|
|
submissionContext.setApplicationTimeouts(appTimeout);
|
|
|
}
|
|
|
- submissionContext.setMaxAppAttempts(conf.getInt(KEY_AM_RESTART_LIMIT, 2));
|
|
|
+ submissionContext.setMaxAppAttempts(conf.getInt(
|
|
|
+ YarnServiceConf.AM_RESTART_MAX, 2));
|
|
|
|
|
|
Map<String, LocalResource> localResources = new HashMap<>();
|
|
|
|
|
|
// copy local slideram-log4j.properties to hdfs and add to localResources
|
|
|
- boolean hasSliderAMLog4j =
|
|
|
+ boolean hasAMLog4j =
|
|
|
addAMLog4jResource(appName, conf, localResources);
|
|
|
// copy jars to hdfs and add to localResources
|
|
|
addJarResource(appName, localResources);
|
|
@@ -487,17 +512,17 @@ public class ServiceClient extends CompositeService
|
|
|
|
|
|
// create AM CLI
|
|
|
String cmdStr =
|
|
|
- buildCommandLine(appName, conf, appRootDir, hasSliderAMLog4j);
|
|
|
+ buildCommandLine(appName, conf, appRootDir, hasAMLog4j);
|
|
|
submissionContext.setResource(Resource.newInstance(YarnServiceConf
|
|
|
- .getLong(KEY_AM_RESOURCE_MEM, DEFAULT_KEY_AM_RESOURCE_MEM,
|
|
|
+ .getLong(YarnServiceConf.AM_RESOURCE_MEM, YarnServiceConf.DEFAULT_KEY_AM_RESOURCE_MEM,
|
|
|
app.getConfiguration(), conf), 1));
|
|
|
String queue = app.getQueue();
|
|
|
if (StringUtils.isEmpty(queue)) {
|
|
|
- queue = conf.get(KEY_YARN_QUEUE, "default");
|
|
|
+ queue = conf.get(YARN_QUEUE, "default");
|
|
|
}
|
|
|
submissionContext.setQueue(queue);
|
|
|
submissionContext.setApplicationName(appName);
|
|
|
- submissionContext.setApplicationType(SliderKeys.APP_TYPE);
|
|
|
+ submissionContext.setApplicationType(YarnServiceConstants.APP_TYPE);
|
|
|
Set<String> appTags =
|
|
|
AbstractClientProvider.createApplicationTags(appName, null, null);
|
|
|
if (!appTags.isEmpty()) {
|
|
@@ -531,7 +556,7 @@ public class ServiceClient extends CompositeService
|
|
|
//TODO CLI.setJVMHeap
|
|
|
//TODO CLI.addJVMOPTS
|
|
|
if (hasSliderAMLog4j) {
|
|
|
- CLI.sysprop(SYSPROP_LOG4J_CONFIGURATION, LOG4J_SERVER_PROP_FILENAME);
|
|
|
+ CLI.sysprop(SYSPROP_LOG4J_CONFIGURATION, YARN_SERVICE_LOG4J_FILENAME);
|
|
|
CLI.sysprop(SYSPROP_LOG_DIR, ApplicationConstants.LOG_DIR_EXPANSION_VAR);
|
|
|
}
|
|
|
CLI.add(ServiceMaster.class.getCanonicalName());
|
|
@@ -553,15 +578,15 @@ public class ServiceClient extends CompositeService
|
|
|
private Map<String, String> addAMEnv(Configuration conf) throws IOException {
|
|
|
Map<String, String> env = new HashMap<>();
|
|
|
ClasspathConstructor classpath =
|
|
|
- buildClasspath(SliderKeys.SUBMITTED_CONF_DIR, "lib", fs, getConfig()
|
|
|
+ buildClasspath(YarnServiceConstants.SUBMITTED_CONF_DIR, "lib", fs, getConfig()
|
|
|
.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false));
|
|
|
env.put("CLASSPATH", classpath.buildClasspath());
|
|
|
env.put("LANG", "en_US.UTF-8");
|
|
|
env.put("LC_ALL", "en_US.UTF-8");
|
|
|
env.put("LANGUAGE", "en_US.UTF-8");
|
|
|
- String jaas = System.getenv(HADOOP_JAAS_DEBUG);
|
|
|
+ String jaas = System.getenv("HADOOP_JAAS_DEBUG");
|
|
|
if (jaas != null) {
|
|
|
- env.put(HADOOP_JAAS_DEBUG, jaas);
|
|
|
+ env.put("HADOOP_JAAS_DEBUG", jaas);
|
|
|
}
|
|
|
if (!UserGroupInformation.isSecurityEnabled()) {
|
|
|
String userName = UserGroupInformation.getCurrentUser().getUserName();
|
|
@@ -579,11 +604,11 @@ public class ServiceClient extends CompositeService
|
|
|
throws IOException, SliderException {
|
|
|
Path libPath = fs.buildClusterDirPath(appName);
|
|
|
ProviderUtils
|
|
|
- .addProviderJar(localResources, ServiceMaster.class, SLIDER_JAR, fs,
|
|
|
+ .addProviderJar(localResources, ServiceMaster.class, SERVICE_CORE_JAR, fs,
|
|
|
libPath, "lib", false);
|
|
|
Path dependencyLibTarGzip = fs.getDependencyTarGzip();
|
|
|
if (fs.isFile(dependencyLibTarGzip)) {
|
|
|
- LOG.info("Loading lib tar from " + fs.getFileSystem().getScheme() + ": "
|
|
|
+ LOG.info("Loading lib tar from " + fs.getFileSystem().getScheme() + ":/"
|
|
|
+ dependencyLibTarGzip);
|
|
|
SliderUtils.putAmTarGzipAndUpdate(localResources, fs);
|
|
|
} else {
|
|
@@ -599,27 +624,29 @@ public class ServiceClient extends CompositeService
|
|
|
private boolean addAMLog4jResource(String appName, Configuration conf,
|
|
|
Map<String, LocalResource> localResources)
|
|
|
throws IOException, BadClusterStateException {
|
|
|
- boolean hasSliderAMLog4j = false;
|
|
|
+ boolean hasAMLog4j = false;
|
|
|
String hadoopConfDir =
|
|
|
System.getenv(ApplicationConstants.Environment.HADOOP_CONF_DIR.name());
|
|
|
if (hadoopConfDir != null) {
|
|
|
File localFile =
|
|
|
- new File(hadoopConfDir, SliderKeys.LOG4J_SERVER_PROP_FILENAME);
|
|
|
+ new File(hadoopConfDir, YarnServiceConstants.YARN_SERVICE_LOG4J_FILENAME);
|
|
|
if (localFile.exists()) {
|
|
|
Path localFilePath = createLocalPath(localFile);
|
|
|
Path appDirPath = fs.buildClusterDirPath(appName);
|
|
|
Path remoteConfPath =
|
|
|
- new Path(appDirPath, SliderKeys.SUBMITTED_CONF_DIR);
|
|
|
+ new Path(appDirPath, YarnServiceConstants.SUBMITTED_CONF_DIR);
|
|
|
Path remoteFilePath =
|
|
|
- new Path(remoteConfPath, SliderKeys.LOG4J_SERVER_PROP_FILENAME);
|
|
|
+ new Path(remoteConfPath, YarnServiceConstants.YARN_SERVICE_LOG4J_FILENAME);
|
|
|
copy(conf, localFilePath, remoteFilePath);
|
|
|
LocalResource localResource =
|
|
|
fs.createAmResource(remoteConfPath, LocalResourceType.FILE);
|
|
|
localResources.put(localFilePath.getName(), localResource);
|
|
|
- hasSliderAMLog4j = true;
|
|
|
+ hasAMLog4j = true;
|
|
|
+ } else {
|
|
|
+ LOG.warn("AM log4j property file doesn't exist: " + localFile);
|
|
|
}
|
|
|
}
|
|
|
- return hasSliderAMLog4j;
|
|
|
+ return hasAMLog4j;
|
|
|
}
|
|
|
|
|
|
public int actionStart(String appName) throws YarnException, IOException {
|
|
@@ -674,22 +701,22 @@ public class ServiceClient extends CompositeService
|
|
|
return;
|
|
|
}
|
|
|
String keytabPreInstalledOnHost =
|
|
|
- conf.get(SliderXmlConfKeys.KEY_AM_KEYTAB_LOCAL_PATH);
|
|
|
+ conf.get(YarnServiceConf.KEY_AM_KEYTAB_LOCAL_PATH);
|
|
|
if (StringUtils.isEmpty(keytabPreInstalledOnHost)) {
|
|
|
String amKeytabName =
|
|
|
- conf.get(SliderXmlConfKeys.KEY_AM_LOGIN_KEYTAB_NAME);
|
|
|
- String keytabDir = conf.get(SliderXmlConfKeys.KEY_HDFS_KEYTAB_DIR);
|
|
|
+ conf.get(YarnServiceConf.KEY_AM_LOGIN_KEYTAB_NAME);
|
|
|
+ String keytabDir = conf.get(YarnServiceConf.KEY_HDFS_KEYTAB_DIR);
|
|
|
Path keytabPath =
|
|
|
fileSystem.buildKeytabPath(keytabDir, amKeytabName, appName);
|
|
|
if (fileSystem.getFileSystem().exists(keytabPath)) {
|
|
|
LocalResource keytabRes =
|
|
|
fileSystem.createAmResource(keytabPath, LocalResourceType.FILE);
|
|
|
localResource
|
|
|
- .put(SliderKeys.KEYTAB_DIR + "/" + amKeytabName, keytabRes);
|
|
|
+ .put(YarnServiceConstants.KEYTAB_DIR + "/" + amKeytabName, keytabRes);
|
|
|
LOG.info("Adding AM keytab on hdfs: " + keytabPath);
|
|
|
} else {
|
|
|
LOG.warn("No keytab file was found at {}.", keytabPath);
|
|
|
- if (conf.getBoolean(KEY_AM_LOGIN_KEYTAB_REQUIRED, false)) {
|
|
|
+ if (conf.getBoolean(YarnServiceConf.KEY_AM_LOGIN_KEYTAB_REQUIRED, false)) {
|
|
|
throw new BadConfigException("No keytab file was found at %s.",
|
|
|
keytabPath);
|
|
|
} else {
|
|
@@ -704,7 +731,7 @@ public class ServiceClient extends CompositeService
|
|
|
|
|
|
public String updateLifetime(String appName, long lifetime)
|
|
|
throws YarnException, IOException {
|
|
|
- getAppIdFromPersistedApp(appName);
|
|
|
+ getAppId(appName);
|
|
|
ApplicationId currentAppId = cachedAppIds.get(appName);
|
|
|
ApplicationReport report = yarnClient.getApplicationReport(currentAppId);
|
|
|
if (report == null) {
|
|
@@ -729,11 +756,25 @@ public class ServiceClient extends CompositeService
|
|
|
|
|
|
public Application getStatus(String appName)
|
|
|
throws IOException, YarnException {
|
|
|
- ClientAMProtocol proxy = connectToAM(appName);
|
|
|
- GetStatusResponseProto response =
|
|
|
- proxy.getStatus(GetStatusRequestProto.newBuilder().build());
|
|
|
- return ServiceApiUtil.jsonSerDeser.fromJson(response.getStatus());
|
|
|
-
|
|
|
+ validateClusterName(appName);
|
|
|
+ ApplicationId currentAppId = getAppId(appName);
|
|
|
+ ApplicationReport appReport = yarnClient.getApplicationReport(currentAppId);
|
|
|
+ ClientAMProtocol amProxy = getAMProxy(appName, appReport);
|
|
|
+ Application appSpec;
|
|
|
+ if (amProxy != null) {
|
|
|
+ GetStatusResponseProto response =
|
|
|
+ amProxy.getStatus(GetStatusRequestProto.newBuilder().build());
|
|
|
+ appSpec = ServiceApiUtil.jsonSerDeser.fromJson(response.getStatus());
|
|
|
+ } else {
|
|
|
+ appSpec = new Application();
|
|
|
+ appSpec.setName(appName);
|
|
|
+ }
|
|
|
+ ApplicationTimeout lifetime =
|
|
|
+ appReport.getApplicationTimeouts().get(ApplicationTimeoutType.LIFETIME);
|
|
|
+ if (lifetime != null) {
|
|
|
+ appSpec.setLifetime(lifetime.getRemainingTime());
|
|
|
+ }
|
|
|
+ return appSpec;
|
|
|
}
|
|
|
|
|
|
public YarnClient getYarnClient() {
|
|
@@ -760,71 +801,61 @@ public class ServiceClient extends CompositeService
|
|
|
String[] libDirs = SliderUtils.getLibDirs();
|
|
|
if (libDirs.length > 0) {
|
|
|
File tempLibTarGzipFile = File.createTempFile(
|
|
|
- SliderKeys.SLIDER_DEPENDENCY_TAR_GZ_FILE_NAME + "_",
|
|
|
- SliderKeys.SLIDER_DEPENDENCY_TAR_GZ_FILE_EXT);
|
|
|
+ YarnServiceConstants.DEPENDENCY_TAR_GZ_FILE_NAME + "_",
|
|
|
+ YarnServiceConstants.DEPENDENCY_TAR_GZ_FILE_EXT);
|
|
|
// copy all jars
|
|
|
tarGzipFolder(libDirs, tempLibTarGzipFile, createJarFilter());
|
|
|
|
|
|
- LOG.info("Uploading dependency for AM (version {}) from {} to {}",
|
|
|
- VersionInfo.getBuildVersion(), tempLibTarGzipFile.toURI(),
|
|
|
- dependencyLibTarGzip.toUri());
|
|
|
+ LOG.info("Version Info: " + VersionInfo.getBuildVersion());
|
|
|
fs.copyLocalFileToHdfs(tempLibTarGzipFile, dependencyLibTarGzip,
|
|
|
- new FsPermission(SliderKeys.SLIDER_DEPENDENCY_DIR_PERMISSIONS));
|
|
|
+ new FsPermission(YarnServiceConstants.DEPENDENCY_DIR_PERMISSIONS));
|
|
|
return EXIT_SUCCESS;
|
|
|
} else {
|
|
|
return EXIT_FALSE;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- protected ClientAMProtocol connectToAM(String appName)
|
|
|
+ // Get the cached AMProxy, creating it from the provided appReport if needed
|
|
|
+ protected ClientAMProtocol getAMProxy(String appName, ApplicationReport report)
|
|
|
+ throws IOException {
|
|
|
+ if (!cachedAMProxies.containsKey(appName) && !StringUtils
|
|
|
+ .isEmpty(report.getHost())) {
|
|
|
+ insertAMProxy(appName, report.getHost(), report.getRpcPort());
|
|
|
+ }
|
|
|
+ return cachedAMProxies.get(appName);
|
|
|
+ }
|
|
|
+
|
|
|
+ // Get AMProxy when no appReport is provided - the report is fetched from RM
|
|
|
+ protected ClientAMProtocol getAMProxy(String appName)
|
|
|
throws IOException, YarnException {
|
|
|
- ApplicationId currentAppId = getAppIdFromPersistedApp(appName);
|
|
|
- // Wait until app becomes running.
|
|
|
- long startTime = System.currentTimeMillis();
|
|
|
- int pollCount = 0;
|
|
|
- ApplicationReport appReport = null;
|
|
|
- while (true) {
|
|
|
- appReport = yarnClient.getApplicationReport(currentAppId);
|
|
|
- YarnApplicationState state = appReport.getYarnApplicationState();
|
|
|
- if (state == RUNNING) {
|
|
|
- break;
|
|
|
- }
|
|
|
- if (terminatedStates.contains(state)) {
|
|
|
- throw new YarnException(
|
|
|
- "Failed to getStatus " + currentAppId + ": " + appReport
|
|
|
- .getDiagnostics());
|
|
|
- }
|
|
|
- long elapsedMillis = System.currentTimeMillis() - startTime;
|
|
|
- // if over 5 min, quit
|
|
|
- if (elapsedMillis >= 300000) {
|
|
|
- throw new YarnException(
|
|
|
- "Timed out while waiting for application " + currentAppId
|
|
|
- + " to be running");
|
|
|
- }
|
|
|
+ ApplicationId currentAppId = getAppId(appName);
|
|
|
|
|
|
- if (++pollCount % 10 == 0) {
|
|
|
- LOG.info(
|
|
|
- "Waiting for application {} to be running, current state is {}",
|
|
|
- currentAppId, state);
|
|
|
- }
|
|
|
- try {
|
|
|
- Thread.sleep(3000);
|
|
|
- } catch (InterruptedException ie) {
|
|
|
- String msg =
|
|
|
- "Interrupted while waiting for application " + currentAppId
|
|
|
- + " to be running.";
|
|
|
- throw new YarnException(msg, ie);
|
|
|
+ if (cachedAMProxies.containsKey(appName)) {
|
|
|
+ return cachedAMProxies.get(appName);
|
|
|
+ } else {
|
|
|
+ ApplicationReport appReport =
|
|
|
+ yarnClient.getApplicationReport(currentAppId);
|
|
|
+ String host = appReport.getHost();
|
|
|
+ int port = appReport.getRpcPort();
|
|
|
+ if (!StringUtils.isEmpty(host)) {
|
|
|
+ return insertAMProxy(appName, host, port);
|
|
|
}
|
|
|
+ return null;
|
|
|
}
|
|
|
+ }
|
|
|
|
|
|
- // Make the connection
|
|
|
- InetSocketAddress address = NetUtils
|
|
|
- .createSocketAddrForHost(appReport.getHost(), appReport.getRpcPort());
|
|
|
- return ClientAMProxy.createProxy(getConfig(), ClientAMProtocol.class,
|
|
|
+ private ClientAMProtocol insertAMProxy(String appName, String host, int port)
|
|
|
+ throws IOException {
|
|
|
+ InetSocketAddress address =
|
|
|
+ NetUtils.createSocketAddrForHost(host, port);
|
|
|
+ ClientAMProtocol amProxy =
|
|
|
+ ClientAMProxy.createProxy(getConfig(), ClientAMProtocol.class,
|
|
|
UserGroupInformation.getCurrentUser(), rpc, address);
|
|
|
+ cachedAMProxies.put(appName, amProxy);
|
|
|
+ return amProxy;
|
|
|
}
|
|
|
|
|
|
- private synchronized ApplicationId getAppIdFromPersistedApp(String appName)
|
|
|
+ private synchronized ApplicationId getAppId(String appName)
|
|
|
throws IOException, YarnException {
|
|
|
if (cachedAppIds.containsKey(appName)) {
|
|
|
return cachedAppIds.get(appName);
|