|
@@ -0,0 +1,381 @@
|
|
|
+/*
|
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one or more
|
|
|
+ * contributor license agreements. See the NOTICE file distributed with
|
|
|
+ * this work for additional information regarding copyright ownership.
|
|
|
+ * The ASF licenses this file to You under the Apache License, Version 2.0
|
|
|
+ * (the "License"); you may not use this file except in compliance with
|
|
|
+ * the License. You may obtain a copy of the License at
|
|
|
+ *
|
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
|
+ *
|
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
+ * See the License for the specific language governing permissions and
|
|
|
+ * limitations under the License.
|
|
|
+ */
|
|
|
+package org.apache.hadoop.yarn.service.client;
|
|
|
+
|
|
|
+import com.google.common.annotations.VisibleForTesting;
|
|
|
+import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.fs.FileStatus;
|
|
|
+import org.apache.hadoop.fs.FileSystem;
|
|
|
+import org.apache.hadoop.fs.Path;
|
|
|
+import org.apache.hadoop.fs.RemoteIterator;
|
|
|
+import org.apache.hadoop.security.UserGroupInformation;
|
|
|
+import org.apache.hadoop.service.AbstractService;
|
|
|
+import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
+import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
|
+import org.apache.hadoop.yarn.server.service.SystemServiceManager;
|
|
|
+import org.apache.hadoop.yarn.service.api.records.Service;
|
|
|
+import org.apache.hadoop.yarn.service.api.records.ServiceState;
|
|
|
+import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
+
|
|
|
+import java.io.FileNotFoundException;
|
|
|
+import java.io.IOException;
|
|
|
+import java.lang.reflect.UndeclaredThrowableException;
|
|
|
+import java.security.PrivilegedExceptionAction;
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.HashSet;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.Set;
|
|
|
+import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
+
|
|
|
+import static org.apache.hadoop.yarn.service.utils.ServiceApiUtil.jsonSerDeser;
|
|
|
+
|
|
|
+/**
|
|
|
+ * SystemServiceManager implementation.
|
|
|
+ * Scan for configure system service path.
|
|
|
+ *
|
|
|
+ * The service path structure is as follows:
|
|
|
+ * SYSTEM_SERVICE_DIR_PATH
|
|
|
+ * |---- sync
|
|
|
+ * | |--- user1
|
|
|
+ * | | |---- service1.yarnfile
|
|
|
+ * | | |---- service2.yarnfile
|
|
|
+ * | |--- user2
|
|
|
+ * | | |---- service1.yarnfile
|
|
|
+ * | | ....
|
|
|
+ * | |
|
|
|
+ * |---- async
|
|
|
+ * | |--- user3
|
|
|
+ * | | |---- service1.yarnfile
|
|
|
+ * | | |---- service2.yarnfile
|
|
|
+ * | |--- user4
|
|
|
+ * | | |---- service1.yarnfile
|
|
|
+ * | | ....
|
|
|
+ * | |
|
|
|
+ *
|
|
|
+ * sync: These services are launched at the time of service start synchronously.
|
|
|
+ * It is a blocking service start.
|
|
|
+ * async: These services are launched in separate thread without any delay after
|
|
|
+ * service start. Non-blocking service start.
|
|
|
+ */
|
|
|
+public class SystemServiceManagerImpl extends AbstractService
|
|
|
+ implements SystemServiceManager {
|
|
|
+
|
|
|
+ private static final Logger LOG =
|
|
|
+ LoggerFactory.getLogger(SystemServiceManagerImpl.class);
|
|
|
+
|
|
|
+ private static final String YARN_FILE_SUFFIX = ".yarnfile";
|
|
|
+ private static final String SYNC = "sync";
|
|
|
+ private static final String ASYNC = "async";
|
|
|
+
|
|
|
+ private FileSystem fs;
|
|
|
+ private Path systemServiceDir;
|
|
|
+ private AtomicBoolean stopExecutors = new AtomicBoolean(false);
|
|
|
+ private Map<String, Set<Service>> syncUserServices = new HashMap<>();
|
|
|
+ private Map<String, Set<Service>> asyncUserServices = new HashMap<>();
|
|
|
+ private UserGroupInformation loginUGI;
|
|
|
+ private Thread serviceLaucher;
|
|
|
+
|
|
|
+ @VisibleForTesting
|
|
|
+ private int skipCounter;
|
|
|
+ @VisibleForTesting
|
|
|
+ private Map<String, Integer> ignoredUserServices =
|
|
|
+ new HashMap<>();
|
|
|
+
|
|
|
+ public SystemServiceManagerImpl() {
|
|
|
+ super(SystemServiceManagerImpl.class.getName());
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected void serviceInit(Configuration conf) throws Exception {
|
|
|
+ String dirPath =
|
|
|
+ conf.get(YarnServiceConf.YARN_SERVICES_SYSTEM_SERVICE_DIRECTORY);
|
|
|
+ if (dirPath != null) {
|
|
|
+ systemServiceDir = new Path(dirPath);
|
|
|
+ LOG.info("System Service Directory is configured to {}",
|
|
|
+ systemServiceDir);
|
|
|
+ fs = systemServiceDir.getFileSystem(conf);
|
|
|
+ this.loginUGI = UserGroupInformation.isSecurityEnabled() ?
|
|
|
+ UserGroupInformation.getLoginUser() :
|
|
|
+ UserGroupInformation.getCurrentUser();
|
|
|
+ LOG.info("UserGroupInformation initialized to {}", loginUGI);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected void serviceStart() throws Exception {
|
|
|
+ scanForUserServices();
|
|
|
+ launchUserService(syncUserServices);
|
|
|
+ // Create a thread and submit services in background otherwise it
|
|
|
+ // block RM switch time.
|
|
|
+ serviceLaucher = new Thread(createRunnable());
|
|
|
+ serviceLaucher.setName("System service launcher");
|
|
|
+ serviceLaucher.start();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected void serviceStop() throws Exception {
|
|
|
+ LOG.info("Stopping {}", getName());
|
|
|
+ stopExecutors.set(true);
|
|
|
+
|
|
|
+ if (serviceLaucher != null) {
|
|
|
+ serviceLaucher.interrupt();
|
|
|
+ try {
|
|
|
+ serviceLaucher.join();
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
+ LOG.warn("Interrupted Exception while stopping", ie);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private Runnable createRunnable() {
|
|
|
+ return new Runnable() {
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ launchUserService(asyncUserServices);
|
|
|
+ }
|
|
|
+ };
|
|
|
+ }
|
|
|
+
|
|
|
+ void launchUserService(Map<String, Set<Service>> userServices) {
|
|
|
+ for (Map.Entry<String, Set<Service>> entry : userServices.entrySet()) {
|
|
|
+ String user = entry.getKey();
|
|
|
+ Set<Service> services = entry.getValue();
|
|
|
+ if (services.isEmpty()) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ ServiceClient serviceClient = null;
|
|
|
+ try {
|
|
|
+ UserGroupInformation userUgi = getProxyUser(user);
|
|
|
+ serviceClient = createServiceClient(userUgi);
|
|
|
+ for (Service service : services) {
|
|
|
+ LOG.info("POST: createService = {} user = {}", service, userUgi);
|
|
|
+ try {
|
|
|
+ launchServices(userUgi, serviceClient, service);
|
|
|
+ } catch (IOException | UndeclaredThrowableException e) {
|
|
|
+ if (e.getCause() != null) {
|
|
|
+ LOG.warn(e.getCause().getMessage());
|
|
|
+ } else {
|
|
|
+ String message =
|
|
|
+ "Failed to create service " + service.getName() + " : ";
|
|
|
+ LOG.error(message, e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ LOG.warn("System service launcher thread interrupted", e);
|
|
|
+ break;
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.error("Error while submitting services for user " + user, e);
|
|
|
+ } finally {
|
|
|
+ if (serviceClient != null) {
|
|
|
+ try {
|
|
|
+ serviceClient.close();
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.warn("Error while closing serviceClient for user {}", user);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private ServiceClient createServiceClient(UserGroupInformation userUgi)
|
|
|
+ throws IOException, InterruptedException {
|
|
|
+ ServiceClient serviceClient =
|
|
|
+ userUgi.doAs(new PrivilegedExceptionAction<ServiceClient>() {
|
|
|
+ @Override public ServiceClient run()
|
|
|
+ throws IOException, YarnException {
|
|
|
+ ServiceClient sc = getServiceClient();
|
|
|
+ sc.init(getConfig());
|
|
|
+ sc.start();
|
|
|
+ return sc;
|
|
|
+ }
|
|
|
+ });
|
|
|
+ return serviceClient;
|
|
|
+ }
|
|
|
+
|
|
|
+ private void launchServices(UserGroupInformation userUgi,
|
|
|
+ ServiceClient serviceClient, Service service)
|
|
|
+ throws IOException, InterruptedException {
|
|
|
+ if (service.getState() == ServiceState.STOPPED) {
|
|
|
+ userUgi.doAs(new PrivilegedExceptionAction<Void>() {
|
|
|
+ @Override public Void run() throws IOException, YarnException {
|
|
|
+ serviceClient.actionBuild(service);
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ });
|
|
|
+ LOG.info("Service {} version {} saved.", service.getName(),
|
|
|
+ service.getVersion());
|
|
|
+ } else {
|
|
|
+ ApplicationId applicationId =
|
|
|
+ userUgi.doAs(new PrivilegedExceptionAction<ApplicationId>() {
|
|
|
+ @Override public ApplicationId run()
|
|
|
+ throws IOException, YarnException {
|
|
|
+ ApplicationId applicationId = serviceClient.actionCreate(service);
|
|
|
+ return applicationId;
|
|
|
+ }
|
|
|
+ });
|
|
|
+ LOG.info("Service {} submitted with Application ID: {}",
|
|
|
+ service.getName(), applicationId);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ ServiceClient getServiceClient() {
|
|
|
+ return new ServiceClient();
|
|
|
+ }
|
|
|
+
|
|
|
+ private UserGroupInformation getProxyUser(String user) {
|
|
|
+ UserGroupInformation ugi;
|
|
|
+ if (UserGroupInformation.isSecurityEnabled()) {
|
|
|
+ ugi = UserGroupInformation.createProxyUser(user, loginUGI);
|
|
|
+ } else {
|
|
|
+ ugi = UserGroupInformation.createRemoteUser(user);
|
|
|
+ }
|
|
|
+ return ugi;
|
|
|
+ }
|
|
|
+
|
|
|
+ // scan for both launch service types i.e sync and async
|
|
|
+ void scanForUserServices() throws IOException {
|
|
|
+ if (systemServiceDir == null) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ LOG.info("Scan for launch type on {}", systemServiceDir);
|
|
|
+ RemoteIterator<FileStatus> iterLaunchType = list(systemServiceDir);
|
|
|
+ while (iterLaunchType.hasNext()) {
|
|
|
+ FileStatus launchType = iterLaunchType.next();
|
|
|
+ if (!launchType.isDirectory()) {
|
|
|
+ LOG.debug("Scanner skips for unknown file {}", launchType.getPath());
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ if (launchType.getPath().getName().equals(SYNC)) {
|
|
|
+ scanForUserServiceDefinition(launchType.getPath(), syncUserServices);
|
|
|
+ } else if (launchType.getPath().getName().equals(ASYNC)) {
|
|
|
+ scanForUserServiceDefinition(launchType.getPath(), asyncUserServices);
|
|
|
+ } else {
|
|
|
+ LOG.debug("Scanner skips for unknown dir {}.", launchType.getPath());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (FileNotFoundException e) {
|
|
|
+ LOG.warn("System service directory {} doesn't not exist.",
|
|
|
+ systemServiceDir);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Files are under systemServiceDir/<users>. Scan for 2 levels
|
|
|
+ // 1st level for users
|
|
|
+ // 2nd level for service definitions under user
|
|
|
+ private void scanForUserServiceDefinition(Path userDirPath,
|
|
|
+ Map<String, Set<Service>> userServices) throws IOException {
|
|
|
+ LOG.info("Scan for users on {}", userDirPath);
|
|
|
+ RemoteIterator<FileStatus> iterUsers = list(userDirPath);
|
|
|
+ while (iterUsers.hasNext()) {
|
|
|
+ FileStatus userDir = iterUsers.next();
|
|
|
+ // if 1st level is not user directory then skip it.
|
|
|
+ if (!userDir.isDirectory()) {
|
|
|
+ LOG.info(
|
|
|
+ "Service definition {} doesn't belong to any user. Ignoring.. ",
|
|
|
+ userDir.getPath().getName());
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ String userName = userDir.getPath().getName();
|
|
|
+ LOG.info("Scanning service definitions for user {}.", userName);
|
|
|
+
|
|
|
+ //2nd level scan
|
|
|
+ RemoteIterator<FileStatus> iterServices = list(userDir.getPath());
|
|
|
+ while (iterServices.hasNext()) {
|
|
|
+ FileStatus serviceCache = iterServices.next();
|
|
|
+ String filename = serviceCache.getPath().getName();
|
|
|
+ if (!serviceCache.isFile()) {
|
|
|
+ LOG.info("Scanner skips for unknown dir {}", filename);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ if (!filename.endsWith(YARN_FILE_SUFFIX)) {
|
|
|
+ LOG.info("Scanner skips for unknown file extension, filename = {}",
|
|
|
+ filename);
|
|
|
+ skipCounter++;
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ Service service = getServiceDefinition(serviceCache.getPath());
|
|
|
+ if (service != null) {
|
|
|
+ Set<Service> services = userServices.get(userName);
|
|
|
+ if (services == null) {
|
|
|
+ services = new HashSet<>();
|
|
|
+ userServices.put(userName, services);
|
|
|
+ }
|
|
|
+ if (!services.add(service)) {
|
|
|
+ int count = ignoredUserServices.containsKey(userName) ?
|
|
|
+ ignoredUserServices.get(userName) : 0;
|
|
|
+ ignoredUserServices.put(userName, count + 1);
|
|
|
+ LOG.warn(
|
|
|
+ "Ignoring service {} for the user {} as it is already present,"
|
|
|
+ + " filename = {}", service.getName(), userName, filename);
|
|
|
+ }
|
|
|
+ LOG.info("Added service {} for the user {}, filename = {}",
|
|
|
+ service.getName(), userName, filename);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private Service getServiceDefinition(Path filePath) {
|
|
|
+ Service service = null;
|
|
|
+ try {
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Loading service definition from FS: " + filePath);
|
|
|
+ }
|
|
|
+ service = jsonSerDeser.load(fs, filePath);
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.info("Error while loading service definition from FS: {}", e);
|
|
|
+ }
|
|
|
+ return service;
|
|
|
+ }
|
|
|
+
|
|
|
+ private RemoteIterator<FileStatus> list(Path path) throws IOException {
|
|
|
+ return new StoppableRemoteIterator(fs.listStatusIterator(path));
|
|
|
+ }
|
|
|
+
|
|
|
+ @VisibleForTesting Map<String, Integer> getIgnoredUserServices() {
|
|
|
+ return ignoredUserServices;
|
|
|
+ }
|
|
|
+
|
|
|
+ private class StoppableRemoteIterator implements RemoteIterator<FileStatus> {
|
|
|
+ private final RemoteIterator<FileStatus> remote;
|
|
|
+
|
|
|
+ StoppableRemoteIterator(RemoteIterator<FileStatus> remote) {
|
|
|
+ this.remote = remote;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override public boolean hasNext() throws IOException {
|
|
|
+ return !stopExecutors.get() && remote.hasNext();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override public FileStatus next() throws IOException {
|
|
|
+ return remote.next();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @VisibleForTesting
|
|
|
+ Map<String, Set<Service>> getSyncUserServices() {
|
|
|
+ return syncUserServices;
|
|
|
+ }
|
|
|
+
|
|
|
+ @VisibleForTesting int getSkipCounter() {
|
|
|
+ return skipCounter;
|
|
|
+ }
|
|
|
+}
|