|
@@ -0,0 +1,430 @@
|
|
|
+/*
|
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one or more
|
|
|
+ * contributor license agreements. See the NOTICE file distributed with
|
|
|
+ * this work for additional information regarding copyright ownership.
|
|
|
+ * The ASF licenses this file to You under the Apache License, Version 2.0
|
|
|
+ * (the "License"); you may not use this file except in compliance with
|
|
|
+ * the License. You may obtain a copy of the License at
|
|
|
+ *
|
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
|
+ *
|
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
+ * See the License for the specific language governing permissions and
|
|
|
+ * limitations under the License.
|
|
|
+ */
|
|
|
+package org.apache.hadoop.yarn.service.client;
|
|
|
+
|
|
|
+import static org.apache.hadoop.yarn.service.utils.ServiceApiUtil.jsonSerDeser;
|
|
|
+
|
|
|
+import java.io.File;
|
|
|
+import java.io.IOException;
|
|
|
+import java.text.MessageFormat;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+
|
|
|
+import javax.ws.rs.core.MediaType;
|
|
|
+
|
|
|
+import org.apache.commons.lang.StringUtils;
|
|
|
+import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.fs.FileSystem;
|
|
|
+import org.apache.hadoop.fs.Path;
|
|
|
+import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
|
|
|
+import org.apache.hadoop.yarn.api.ApplicationConstants;
|
|
|
+import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
|
|
+import org.apache.hadoop.yarn.client.api.AppAdminClient;
|
|
|
+import org.apache.hadoop.yarn.client.api.YarnClient;
|
|
|
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
+import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
|
+import org.apache.hadoop.yarn.service.api.records.Component;
|
|
|
+import org.apache.hadoop.yarn.service.api.records.Service;
|
|
|
+import org.apache.hadoop.yarn.service.api.records.ServiceState;
|
|
|
+import org.apache.hadoop.yarn.service.api.records.ServiceStatus;
|
|
|
+import org.apache.hadoop.yarn.util.RMHAUtils;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
+
|
|
|
+import com.sun.jersey.api.client.Client;
|
|
|
+import com.sun.jersey.api.client.ClientResponse;
|
|
|
+import com.sun.jersey.api.client.WebResource;
|
|
|
+import com.sun.jersey.api.client.WebResource.Builder;
|
|
|
+import com.sun.jersey.api.client.config.ClientConfig;
|
|
|
+import com.sun.jersey.api.client.config.DefaultClientConfig;
|
|
|
+
|
|
|
+import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes.*;
|
|
|
+
|
|
|
+/**
|
|
|
+ * The rest API client for users to manage services on YARN.
|
|
|
+ */
|
|
|
+public class ApiServiceClient extends AppAdminClient {
|
|
|
+ private static final Logger LOG =
|
|
|
+ LoggerFactory.getLogger(ApiServiceClient.class);
|
|
|
+ protected YarnClient yarnClient;
|
|
|
+
|
|
|
+ @Override protected void serviceInit(Configuration configuration)
|
|
|
+ throws Exception {
|
|
|
+ yarnClient = YarnClient.createYarnClient();
|
|
|
+ addService(yarnClient);
|
|
|
+ super.serviceInit(configuration);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Calculate Resource Manager address base on working REST API.
|
|
|
+ */
|
|
|
+ private String getRMWebAddress() {
|
|
|
+ Configuration conf = getConfig();
|
|
|
+ String scheme = "http://";
|
|
|
+ String path = "/app/v1/services/version";
|
|
|
+ String rmAddress = conf
|
|
|
+ .get("yarn.resourcemanager.webapp.address");
|
|
|
+ if(conf.getBoolean("hadoop.ssl.enabled", false)) {
|
|
|
+ scheme = "https://";
|
|
|
+ rmAddress = conf
|
|
|
+ .get("yarn.resourcemanager.webapp.https.address");
|
|
|
+ }
|
|
|
+
|
|
|
+ List<String> rmServers = RMHAUtils
|
|
|
+ .getRMHAWebappAddresses(new YarnConfiguration(conf));
|
|
|
+ for (String host : rmServers) {
|
|
|
+ StringBuilder sb = new StringBuilder();
|
|
|
+ sb.append(scheme);
|
|
|
+ sb.append(host);
|
|
|
+ sb.append(path);
|
|
|
+ Client client = Client.create();
|
|
|
+ WebResource webResource = client
|
|
|
+ .resource(sb.toString());
|
|
|
+ String test = webResource.get(String.class);
|
|
|
+ if (test.contains("hadoop_version")) {
|
|
|
+ rmAddress = host;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return scheme+rmAddress;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Compute active resource manager API service location.
|
|
|
+ *
|
|
|
+ * @param appName - YARN service name
|
|
|
+ * @return URI to API Service
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ private String getApiUrl(String appName) throws IOException {
|
|
|
+ String url = getRMWebAddress();
|
|
|
+ StringBuilder api = new StringBuilder();
|
|
|
+ api.append(url);
|
|
|
+ api.append("/app/v1/services");
|
|
|
+ if (appName != null) {
|
|
|
+ api.append("/");
|
|
|
+ api.append(appName);
|
|
|
+ }
|
|
|
+ return api.toString();
|
|
|
+ }
|
|
|
+
|
|
|
+ private Builder getApiClient() throws IOException {
|
|
|
+ return getApiClient(null);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Setup API service web request.
|
|
|
+ *
|
|
|
+ * @param appName
|
|
|
+ * @return
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ private Builder getApiClient(String appName) throws IOException {
|
|
|
+ Client client = Client.create(getClientConfig());
|
|
|
+ Configuration conf = getConfig();
|
|
|
+ client.setChunkedEncodingSize(null);
|
|
|
+ Builder builder = client
|
|
|
+ .resource(getApiUrl(appName)).type(MediaType.APPLICATION_JSON);
|
|
|
+ if (conf.get("hadoop.security.authentication").equals("kerberos")) {
|
|
|
+ AuthenticatedURL.Token token = new AuthenticatedURL.Token();
|
|
|
+ builder.header("WWW-Authenticate", token);
|
|
|
+ }
|
|
|
+ return builder
|
|
|
+ .accept("application/json;charset=utf-8");
|
|
|
+ }
|
|
|
+
|
|
|
+ private ClientConfig getClientConfig() {
|
|
|
+ ClientConfig config = new DefaultClientConfig();
|
|
|
+ config.getProperties().put(
|
|
|
+ ClientConfig.PROPERTY_CHUNKED_ENCODING_SIZE, 0);
|
|
|
+ config.getProperties().put(
|
|
|
+ ClientConfig.PROPERTY_BUFFER_RESPONSE_ENTITY_ON_EXCEPTION, true);
|
|
|
+ return config;
|
|
|
+ }
|
|
|
+
|
|
|
+ private int processResponse(ClientResponse response) {
|
|
|
+ response.bufferEntity();
|
|
|
+ if (response.getStatus() >= 299) {
|
|
|
+ String error = "";
|
|
|
+ try {
|
|
|
+ ServiceStatus ss = response.getEntity(ServiceStatus.class);
|
|
|
+ error = ss.getDiagnostics();
|
|
|
+ } catch (Throwable t) {
|
|
|
+ error = response.getEntity(String.class);
|
|
|
+ }
|
|
|
+ LOG.error(error);
|
|
|
+ return EXIT_EXCEPTION_THROWN;
|
|
|
+ }
|
|
|
+ LOG.info(response.toString());
|
|
|
+ return EXIT_SUCCESS;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Utility method to load Service json from disk or from
|
|
|
+ * YARN examples.
|
|
|
+ *
|
|
|
+ * @param fileName - path to yarnfile
|
|
|
+ * @param serviceName - YARN Service Name
|
|
|
+ * @param lifetime - application lifetime
|
|
|
+ * @param queue - Queue to submit application
|
|
|
+ * @return
|
|
|
+ * @throws IOException
|
|
|
+ * @throws YarnException
|
|
|
+ */
|
|
|
+ public Service loadAppJsonFromLocalFS(String fileName, String serviceName,
|
|
|
+ Long lifetime, String queue) throws IOException, YarnException {
|
|
|
+ File file = new File(fileName);
|
|
|
+ if (!file.exists() && fileName.equals(file.getName())) {
|
|
|
+ String examplesDirStr = System.getenv("YARN_SERVICE_EXAMPLES_DIR");
|
|
|
+ String[] examplesDirs;
|
|
|
+ if (examplesDirStr == null) {
|
|
|
+ String yarnHome = System
|
|
|
+ .getenv(ApplicationConstants.Environment.HADOOP_YARN_HOME.key());
|
|
|
+ examplesDirs = new String[]{
|
|
|
+ yarnHome + "/share/hadoop/yarn/yarn-service-examples",
|
|
|
+ yarnHome + "/yarn-service-examples"
|
|
|
+ };
|
|
|
+ } else {
|
|
|
+ examplesDirs = StringUtils.split(examplesDirStr, ":");
|
|
|
+ }
|
|
|
+ for (String dir : examplesDirs) {
|
|
|
+ file = new File(MessageFormat.format("{0}/{1}/{2}.json",
|
|
|
+ dir, fileName, fileName));
|
|
|
+ if (file.exists()) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ // Then look for secondary location.
|
|
|
+ file = new File(MessageFormat.format("{0}/{1}.json",
|
|
|
+ dir, fileName));
|
|
|
+ if (file.exists()) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (!file.exists()) {
|
|
|
+ throw new YarnException("File or example could not be found: " +
|
|
|
+ fileName);
|
|
|
+ }
|
|
|
+ Path filePath = new Path(file.getAbsolutePath());
|
|
|
+ LOG.info("Loading service definition from local FS: " + filePath);
|
|
|
+ Service service = jsonSerDeser
|
|
|
+ .load(FileSystem.getLocal(getConfig()), filePath);
|
|
|
+ if (!StringUtils.isEmpty(serviceName)) {
|
|
|
+ service.setName(serviceName);
|
|
|
+ }
|
|
|
+ if (lifetime != null && lifetime > 0) {
|
|
|
+ service.setLifetime(lifetime);
|
|
|
+ }
|
|
|
+ if (!StringUtils.isEmpty(queue)) {
|
|
|
+ service.setQueue(queue);
|
|
|
+ }
|
|
|
+ return service;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Launch YARN service application.
|
|
|
+ *
|
|
|
+ * @param fileName - path to yarnfile
|
|
|
+ * @param appName - YARN Service Name
|
|
|
+ * @param lifetime - application lifetime
|
|
|
+ * @param queue - Queue to submit application
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public int actionLaunch(String fileName, String appName, Long lifetime,
|
|
|
+ String queue) throws IOException, YarnException {
|
|
|
+ int result = EXIT_SUCCESS;
|
|
|
+ try {
|
|
|
+ Service service =
|
|
|
+ loadAppJsonFromLocalFS(fileName, appName, lifetime, queue);
|
|
|
+ String buffer = jsonSerDeser.toJson(service);
|
|
|
+ ClientResponse response = getApiClient()
|
|
|
+ .post(ClientResponse.class, buffer);
|
|
|
+ result = processResponse(response);
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.error("Fail to launch application: ", e);
|
|
|
+ result = EXIT_EXCEPTION_THROWN;
|
|
|
+ }
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Stop YARN service application.
|
|
|
+ *
|
|
|
+ * @param appName - YARN Service Name
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public int actionStop(String appName) throws IOException, YarnException {
|
|
|
+ int result = EXIT_SUCCESS;
|
|
|
+ try {
|
|
|
+ Service service = new Service();
|
|
|
+ service.setName(appName);
|
|
|
+ service.setState(ServiceState.STOPPED);
|
|
|
+ String buffer = jsonSerDeser.toJson(service);
|
|
|
+ ClientResponse response = getApiClient(appName)
|
|
|
+ .put(ClientResponse.class, buffer);
|
|
|
+ result = processResponse(response);
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.error("Fail to stop application: ", e);
|
|
|
+ result = EXIT_EXCEPTION_THROWN;
|
|
|
+ }
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Start YARN service application.
|
|
|
+ *
|
|
|
+ * @param appName - YARN Service Name
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public int actionStart(String appName) throws IOException, YarnException {
|
|
|
+ int result = EXIT_SUCCESS;
|
|
|
+ try {
|
|
|
+ Service service = new Service();
|
|
|
+ service.setName(appName);
|
|
|
+ service.setState(ServiceState.STARTED);
|
|
|
+ String buffer = jsonSerDeser.toJson(service);
|
|
|
+ ClientResponse response = getApiClient(appName)
|
|
|
+ .put(ClientResponse.class, buffer);
|
|
|
+ result = processResponse(response);
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.error("Fail to start application: ", e);
|
|
|
+ result = EXIT_EXCEPTION_THROWN;
|
|
|
+ }
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Save Service configuration.
|
|
|
+ *
|
|
|
+ * @param fileName - path to Yarnfile
|
|
|
+ * @param appName - YARN Service Name
|
|
|
+ * @param lifetime - container life time
|
|
|
+ * @param queue - Queue to submit the application
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public int actionSave(String fileName, String appName, Long lifetime,
|
|
|
+ String queue) throws IOException, YarnException {
|
|
|
+ int result = EXIT_SUCCESS;
|
|
|
+ try {
|
|
|
+ Service service =
|
|
|
+ loadAppJsonFromLocalFS(fileName, appName, lifetime, queue);
|
|
|
+ service.setState(ServiceState.STOPPED);
|
|
|
+ String buffer = jsonSerDeser.toJson(service);
|
|
|
+ ClientResponse response = getApiClient()
|
|
|
+ .post(ClientResponse.class, buffer);
|
|
|
+ result = processResponse(response);
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.error("Fail to save application: ", e);
|
|
|
+ result = EXIT_EXCEPTION_THROWN;
|
|
|
+ }
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Decommission a YARN service.
|
|
|
+ *
|
|
|
+ * @param appName - YARN Service Name
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public int actionDestroy(String appName) throws IOException, YarnException {
|
|
|
+ int result = EXIT_SUCCESS;
|
|
|
+ try {
|
|
|
+ ClientResponse response = getApiClient(appName)
|
|
|
+ .delete(ClientResponse.class);
|
|
|
+ result = processResponse(response);
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.error("Fail to destroy application: ", e);
|
|
|
+ result = EXIT_EXCEPTION_THROWN;
|
|
|
+ }
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Change number of containers associated with a service.
|
|
|
+ *
|
|
|
+ * @param appName - YARN Service Name
|
|
|
+ * @param cmponentCounts - list of components and desired container count
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public int actionFlex(String appName, Map<String, String> componentCounts)
|
|
|
+ throws IOException, YarnException {
|
|
|
+ int result = EXIT_SUCCESS;
|
|
|
+ try {
|
|
|
+ Service service = new Service();
|
|
|
+ service.setName(appName);
|
|
|
+ service.setState(ServiceState.FLEX);
|
|
|
+ for (Map.Entry<String, String> entry : componentCounts.entrySet()) {
|
|
|
+ Component component = new Component();
|
|
|
+ component.setName(entry.getKey());
|
|
|
+ Long numberOfContainers = Long.parseLong(entry.getValue());
|
|
|
+ component.setNumberOfContainers(numberOfContainers);
|
|
|
+ service.addComponent(component);
|
|
|
+ }
|
|
|
+ String buffer = jsonSerDeser.toJson(service);
|
|
|
+ ClientResponse response = getApiClient(appName)
|
|
|
+ .put(ClientResponse.class, buffer);
|
|
|
+ result = processResponse(response);
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.error("Fail to flex application: ", e);
|
|
|
+ result = EXIT_EXCEPTION_THROWN;
|
|
|
+ }
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public int enableFastLaunch() throws IOException, YarnException {
|
|
|
+ ServiceClient sc = new ServiceClient();
|
|
|
+ sc.init(getConfig());
|
|
|
+ sc.start();
|
|
|
+ int result = sc.enableFastLaunch();
|
|
|
+ sc.close();
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Retrieve Service Status through REST API.
|
|
|
+ *
|
|
|
+ * @param applicationId - YARN application ID
|
|
|
+ * @return Status output
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public String getStatusString(String applicationId) throws IOException,
|
|
|
+ YarnException {
|
|
|
+ String output = "";
|
|
|
+ try {
|
|
|
+ ApplicationReport appReport = yarnClient
|
|
|
+ .getApplicationReport(ApplicationId.fromString(applicationId));
|
|
|
+
|
|
|
+ String appName = appReport.getName();
|
|
|
+ ClientResponse response = getApiClient(appName)
|
|
|
+ .get(ClientResponse.class);
|
|
|
+ if (response.getStatus() != 200) {
|
|
|
+ StringBuilder sb = new StringBuilder();
|
|
|
+ sb.append(appName);
|
|
|
+ sb.append(" Failed : HTTP error code : ");
|
|
|
+ sb.append(response.getStatus());
|
|
|
+ return sb.toString();
|
|
|
+ }
|
|
|
+ output = response.getEntity(String.class);
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.error("Fail to check application status: ", e);
|
|
|
+ }
|
|
|
+ return output;
|
|
|
+ }
|
|
|
+
|
|
|
+}
|