|
@@ -18,15 +18,33 @@
|
|
|
|
|
|
package org.apache.hadoop.yarn.server.nodemanager.containermanager;
|
|
|
|
|
|
+import java.io.File;
|
|
|
+import java.io.FileNotFoundException;
|
|
|
import java.io.IOException;
|
|
|
+import java.io.InputStream;
|
|
|
import java.net.URI;
|
|
|
import java.nio.ByteBuffer;
|
|
|
+import java.util.ArrayList;
|
|
|
import java.util.Collection;
|
|
|
import java.util.Collections;
|
|
|
+import java.util.Date;
|
|
|
import java.util.HashMap;
|
|
|
+import java.util.HashSet;
|
|
|
+import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Map.Entry;
|
|
|
+import java.util.Set;
|
|
|
+import java.util.Timer;
|
|
|
+import java.util.TimerTask;
|
|
|
import java.util.regex.Pattern;
|
|
|
+
|
|
|
+import com.fasterxml.jackson.databind.DeserializationFeature;
|
|
|
+import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
+import org.apache.hadoop.fs.FSDataInputStream;
|
|
|
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.records.AuxServiceConfiguration;
|
|
|
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.records.AuxServiceFile;
|
|
|
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.records.AuxServiceRecord;
|
|
|
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.records.AuxServiceRecords;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
@@ -69,54 +87,91 @@ public class AuxServices extends AbstractService
|
|
|
public static final FsPermission NM_AUX_SERVICE_DIR_PERM =
|
|
|
new FsPermission((short) 0700);
|
|
|
|
|
|
+ public static final String CLASS_NAME = "class.name";
|
|
|
+ public static final String SYSTEM_CLASSES = "system.classes";
|
|
|
+
|
|
|
static final String STATE_STORE_ROOT_NAME = "nm-aux-services";
|
|
|
|
|
|
private static final Logger LOG =
|
|
|
LoggerFactory.getLogger(AuxServices.class);
|
|
|
private static final String DEL_SUFFIX = "_DEL_";
|
|
|
|
|
|
- protected final Map<String,AuxiliaryService> serviceMap;
|
|
|
- protected final Map<String,ByteBuffer> serviceMetaData;
|
|
|
+ private final Map<String, AuxiliaryService> serviceMap;
|
|
|
+ private final Map<String, AuxServiceRecord> serviceRecordMap;
|
|
|
+ private final Map<String, ByteBuffer> serviceMetaData;
|
|
|
private final AuxiliaryLocalPathHandler auxiliaryLocalPathHandler;
|
|
|
private final LocalDirsHandlerService dirsHandler;
|
|
|
private final DeletionService delService;
|
|
|
private final UserGroupInformation userUGI;
|
|
|
|
|
|
+ private final FsPermission storeDirPerms = new FsPermission((short)0700);
|
|
|
+ private Path stateStoreRoot = null;
|
|
|
+ private FileSystem stateStoreFs = null;
|
|
|
+
|
|
|
+ private Path manifest;
|
|
|
+ private FileSystem manifestFS;
|
|
|
+ private Timer manifestReloadTimer;
|
|
|
+ private TimerTask manifestReloadTask;
|
|
|
+ private long manifestReloadInterval;
|
|
|
+ private long manifestModifyTS = -1;
|
|
|
+
|
|
|
+ private final ObjectMapper mapper;
|
|
|
+
|
|
|
private final Pattern p = Pattern.compile("^[A-Za-z_]+[A-Za-z0-9_]*$");
|
|
|
|
|
|
- public AuxServices(AuxiliaryLocalPathHandler auxiliaryLocalPathHandler,
|
|
|
+ AuxServices(AuxiliaryLocalPathHandler auxiliaryLocalPathHandler,
|
|
|
Context nmContext, DeletionService deletionService) {
|
|
|
super(AuxServices.class.getName());
|
|
|
serviceMap =
|
|
|
- Collections.synchronizedMap(new HashMap<String,AuxiliaryService>());
|
|
|
+ Collections.synchronizedMap(new HashMap<String, AuxiliaryService>());
|
|
|
+ serviceRecordMap =
|
|
|
+ Collections.synchronizedMap(new HashMap<String, AuxServiceRecord>());
|
|
|
serviceMetaData =
|
|
|
Collections.synchronizedMap(new HashMap<String,ByteBuffer>());
|
|
|
this.auxiliaryLocalPathHandler = auxiliaryLocalPathHandler;
|
|
|
this.dirsHandler = nmContext.getLocalDirsHandler();
|
|
|
this.delService = deletionService;
|
|
|
this.userUGI = getRemoteUgi();
|
|
|
+ this.mapper = new ObjectMapper();
|
|
|
+ mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
|
|
|
// Obtain services from configuration in init()
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Adds a service to the service map.
|
|
|
+ *
|
|
|
+ * @param name aux service name
|
|
|
+ * @param service aux service
|
|
|
+ * @param serviceRecord aux service record
|
|
|
+ */
|
|
|
protected final synchronized void addService(String name,
|
|
|
- AuxiliaryService service) {
|
|
|
- LOG.info("Adding auxiliary service " +
|
|
|
- service.getName() + ", \"" + name + "\"");
|
|
|
+ AuxiliaryService service, AuxServiceRecord serviceRecord) {
|
|
|
+ LOG.info("Adding auxiliary service " + serviceRecord.getName() +
|
|
|
+ " version " + serviceRecord.getVersion());
|
|
|
serviceMap.put(name, service);
|
|
|
+ serviceRecordMap.put(name, serviceRecord);
|
|
|
}
|
|
|
|
|
|
Collection<AuxiliaryService> getServices() {
|
|
|
return Collections.unmodifiableCollection(serviceMap.values());
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Gets current aux service records.
|
|
|
+ *
|
|
|
+ * @return a collection of service records
|
|
|
+ */
|
|
|
+ public Collection<AuxServiceRecord> getServiceRecords() {
|
|
|
+ return Collections.unmodifiableCollection(serviceRecordMap.values());
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* @return the meta data for all registered services, that have been started.
|
|
|
* If a service has not been started no metadata will be available. The key
|
|
|
* is the name of the service as defined in the configuration.
|
|
|
*/
|
|
|
public Map<String, ByteBuffer> getMetaData() {
|
|
|
- Map<String, ByteBuffer> metaClone = new HashMap<String, ByteBuffer>(
|
|
|
- serviceMetaData.size());
|
|
|
+ Map<String, ByteBuffer> metaClone = new HashMap<>(serviceMetaData.size());
|
|
|
synchronized (serviceMetaData) {
|
|
|
for (Entry<String, ByteBuffer> entry : serviceMetaData.entrySet()) {
|
|
|
metaClone.put(entry.getKey(), entry.getValue().duplicate());
|
|
@@ -125,11 +180,470 @@ public class AuxServices extends AbstractService
|
|
|
return metaClone;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Creates an auxiliary service from a specification using the Configuration
|
|
|
+ * classloader.
|
|
|
+ *
|
|
|
+ * @param service aux service record
|
|
|
+ * @return auxiliary service
|
|
|
+ */
|
|
|
+ private AuxiliaryService createAuxServiceFromConfiguration(AuxServiceRecord
|
|
|
+ service) {
|
|
|
+ Configuration c = new Configuration(false);
|
|
|
+ c.set(CLASS_NAME, getClassName(service));
|
|
|
+ Class<? extends AuxiliaryService> sClass = c.getClass(CLASS_NAME,
|
|
|
+ null, AuxiliaryService.class);
|
|
|
+
|
|
|
+ if (sClass == null) {
|
|
|
+ throw new YarnRuntimeException("No class defined for auxiliary " +
|
|
|
+ "service" + service.getName());
|
|
|
+ }
|
|
|
+ return ReflectionUtils.newInstance(sClass, null);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Creates an auxiliary service from a specification using a custom local
|
|
|
+ * classpath.
|
|
|
+ *
|
|
|
+ * @param service aux service record
|
|
|
+ * @param appLocalClassPath local class path
|
|
|
+ * @param conf configuration
|
|
|
+ * @return auxiliary service
|
|
|
+ * @throws IOException
|
|
|
+ * @throws ClassNotFoundException
|
|
|
+ */
|
|
|
+ private AuxiliaryService createAuxServiceFromLocalClasspath(
|
|
|
+ AuxServiceRecord service, String appLocalClassPath, Configuration conf)
|
|
|
+ throws IOException, ClassNotFoundException {
|
|
|
+ Preconditions.checkArgument(appLocalClassPath != null &&
|
|
|
+ !appLocalClassPath.isEmpty(),
|
|
|
+ "local classpath was null in createAuxServiceFromLocalClasspath");
|
|
|
+ final String sName = service.getName();
|
|
|
+ final String className = getClassName(service);
|
|
|
+
|
|
|
+ if (service.getConfiguration() != null && service.getConfiguration()
|
|
|
+ .getFiles().size() > 0) {
|
|
|
+ throw new YarnRuntimeException("The aux service:" + sName
|
|
|
+ + " has configured local classpath:" + appLocalClassPath
|
|
|
+ + " and config files:" + service.getConfiguration().getFiles()
|
|
|
+ + ". Only one of them should be configured.");
|
|
|
+ }
|
|
|
+
|
|
|
+ return AuxiliaryServiceWithCustomClassLoader.getInstance(conf, className,
|
|
|
+ appLocalClassPath, getSystemClasses(service, className));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Creates an auxiliary service from a specification.
|
|
|
+ *
|
|
|
+ * @param service aux service record
|
|
|
+ * @param conf configuration
|
|
|
+ * @param fromConfiguration true if from configuration, false if from manifest
|
|
|
+ * @return auxiliary service
|
|
|
+ * @throws IOException
|
|
|
+ * @throws ClassNotFoundException
|
|
|
+ */
|
|
|
+ private AuxiliaryService createAuxService(AuxServiceRecord service,
|
|
|
+ Configuration conf, boolean fromConfiguration) throws IOException,
|
|
|
+ ClassNotFoundException {
|
|
|
+ final String sName = service.getName();
|
|
|
+ final String className = getClassName(service);
|
|
|
+ if (className == null || className.isEmpty()) {
|
|
|
+ throw new YarnRuntimeException("Class name not provided for auxiliary " +
|
|
|
+ "service " + sName);
|
|
|
+ }
|
|
|
+ if (fromConfiguration) {
|
|
|
+ // aux services from the configuration have an additional configuration
|
|
|
+ // option specifying a local classpath that will not be localized
|
|
|
+ final String appLocalClassPath = conf.get(String.format(
|
|
|
+ YarnConfiguration.NM_AUX_SERVICES_CLASSPATH, sName));
|
|
|
+ if (appLocalClassPath != null && !appLocalClassPath.isEmpty()) {
|
|
|
+ return createAuxServiceFromLocalClasspath(service, appLocalClassPath,
|
|
|
+ conf);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ AuxServiceConfiguration serviceConf = service.getConfiguration();
|
|
|
+ List<Path> destFiles = new ArrayList<>();
|
|
|
+ if (serviceConf != null) {
|
|
|
+ List<AuxServiceFile> files = serviceConf.getFiles();
|
|
|
+ if (files != null) {
|
|
|
+ for (AuxServiceFile file : files) {
|
|
|
+ // localize file (if needed) and add it to the list of paths that
|
|
|
+ // will become the classpath
|
|
|
+ destFiles.add(maybeDownloadJars(sName, className, file.getSrcFile(),
|
|
|
+ file.getType(), conf));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (destFiles.size() > 0) {
|
|
|
+ // create aux service using a custom localized classpath
|
|
|
+ LOG.info("The aux service:" + sName
|
|
|
+ + " is using the custom classloader with classpath " + destFiles);
|
|
|
+ return AuxiliaryServiceWithCustomClassLoader.getInstance(conf,
|
|
|
+ className, StringUtils.join(File.pathSeparatorChar, destFiles),
|
|
|
+ getSystemClasses(service, className));
|
|
|
+ } else {
|
|
|
+ return createAuxServiceFromConfiguration(service);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Copies the specified remote file to local NM aux service directory. If the
|
|
|
+ * same file already exists (as determined by modification time), the file
|
|
|
+ * will not be copied again.
|
|
|
+ *
|
|
|
+ * @param sName service name
|
|
|
+ * @param className service class name
|
|
|
+ * @param remoteFile location of the file to download
|
|
|
+ * @param type type of file (STATIC for a jar or ARCHIVE for a tarball)
|
|
|
+ * @param conf configuration
|
|
|
+ * @return path of the downloaded file
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ private Path maybeDownloadJars(String sName, String className, String
|
|
|
+ remoteFile, AuxServiceFile.TypeEnum type, Configuration conf)
|
|
|
+ throws IOException {
|
|
|
+ // load AuxiliaryService from remote classpath
|
|
|
+ FileContext localLFS = getLocalFileContext(conf);
|
|
|
+ // create NM aux-service dir in NM localdir if it does not exist.
|
|
|
+ Path nmAuxDir = dirsHandler.getLocalPathForWrite("."
|
|
|
+ + Path.SEPARATOR + NM_AUX_SERVICE_DIR);
|
|
|
+ if (!localLFS.util().exists(nmAuxDir)) {
|
|
|
+ try {
|
|
|
+ localLFS.mkdir(nmAuxDir, NM_AUX_SERVICE_DIR_PERM, true);
|
|
|
+ } catch (IOException ex) {
|
|
|
+ throw new YarnRuntimeException("Fail to create dir:"
|
|
|
+ + nmAuxDir.toString(), ex);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ Path src = new Path(remoteFile);
|
|
|
+ FileContext remoteLFS = getRemoteFileContext(src.toUri(), conf);
|
|
|
+ FileStatus scFileStatus = remoteLFS.getFileStatus(src);
|
|
|
+ if (!scFileStatus.getOwner().equals(
|
|
|
+ this.userUGI.getShortUserName())) {
|
|
|
+ throw new YarnRuntimeException("The remote jarfile owner:"
|
|
|
+ + scFileStatus.getOwner() + " is not the same as the NM user:"
|
|
|
+ + this.userUGI.getShortUserName() + ".");
|
|
|
+ }
|
|
|
+ if ((scFileStatus.getPermission().toShort() & 0022) != 0) {
|
|
|
+ throw new YarnRuntimeException("The remote jarfile should not "
|
|
|
+ + "be writable by group or others. "
|
|
|
+ + "The current Permission is "
|
|
|
+ + scFileStatus.getPermission().toShort());
|
|
|
+ }
|
|
|
+ Path downloadDest = new Path(nmAuxDir,
|
|
|
+ className + "_" + scFileStatus.getModificationTime());
|
|
|
+ // check whether we need to re-download the jar
|
|
|
+ // from remote directory
|
|
|
+ Path targetDirPath = new Path(downloadDest,
|
|
|
+ scFileStatus.getPath().getName());
|
|
|
+ FileStatus[] allSubDirs = localLFS.util().listStatus(nmAuxDir);
|
|
|
+ for (FileStatus sub : allSubDirs) {
|
|
|
+ if (sub.getPath().getName().equals(downloadDest.getName())) {
|
|
|
+ return new Path(targetDirPath + Path.SEPARATOR + "*");
|
|
|
+ } else {
|
|
|
+ if (sub.getPath().getName().contains(className) &&
|
|
|
+ !sub.getPath().getName().endsWith(DEL_SUFFIX)) {
|
|
|
+ Path delPath = new Path(sub.getPath().getParent(),
|
|
|
+ sub.getPath().getName() + DEL_SUFFIX);
|
|
|
+ localLFS.rename(sub.getPath(), delPath);
|
|
|
+ LOG.info("delete old aux service jar dir:"
|
|
|
+ + delPath.toString());
|
|
|
+ FileDeletionTask deletionTask = new FileDeletionTask(
|
|
|
+ this.delService, null, delPath, null);
|
|
|
+ this.delService.delete(deletionTask);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ LocalResourceType srcType;
|
|
|
+ if (type == AuxServiceFile.TypeEnum.STATIC) {
|
|
|
+ srcType = LocalResourceType.FILE;
|
|
|
+ } else if (type == AuxServiceFile.TypeEnum.ARCHIVE) {
|
|
|
+ srcType = LocalResourceType.ARCHIVE;
|
|
|
+ } else {
|
|
|
+ throw new YarnRuntimeException(
|
|
|
+ "Cannot unpack file of type " + type + " from remote-file-path:" +
|
|
|
+ src + "for aux-service:" + ".\n");
|
|
|
+ }
|
|
|
+ LocalResource scRsrc = LocalResource.newInstance(
|
|
|
+ URL.fromURI(src.toUri()),
|
|
|
+ srcType, LocalResourceVisibility.PRIVATE,
|
|
|
+ scFileStatus.getLen(), scFileStatus.getModificationTime());
|
|
|
+ FSDownload download = new FSDownload(localLFS, null, conf,
|
|
|
+ downloadDest, scRsrc, null);
|
|
|
+ try {
|
|
|
+ // don't need to convert downloaded path into a dir
|
|
|
+ // since it's already a jar path.
|
|
|
+ return download.call();
|
|
|
+ } catch (Exception ex) {
|
|
|
+ throw new YarnRuntimeException(
|
|
|
+ "Exception happend while downloading files "
|
|
|
+ + "for aux-service:" + sName + " and remote-file-path:"
|
|
|
+ + src + ".\n" + ex.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * If recovery is enabled, creates a recovery directory for the named
|
|
|
+ * service and sets it on the service.
|
|
|
+ *
|
|
|
+ * @param sName name of the service
|
|
|
+ * @param s auxiliary service
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ private void setStateStoreDir(String sName, AuxiliaryService s) throws
|
|
|
+ IOException {
|
|
|
+ if (stateStoreRoot != null) {
|
|
|
+ Path storePath = new Path(stateStoreRoot, sName);
|
|
|
+ stateStoreFs.mkdirs(storePath, storeDirPerms);
|
|
|
+ s.setRecoveryPath(storePath);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Removes a service from the service map and stops it, if it exists.
|
|
|
+ *
|
|
|
+ * @param sName name of the service
|
|
|
+ */
|
|
|
+ private synchronized void maybeRemoveAuxService(String sName) {
|
|
|
+ AuxiliaryService s;
|
|
|
+ s = serviceMap.remove(sName);
|
|
|
+ serviceRecordMap.remove(sName);
|
|
|
+ serviceMetaData.remove(sName);
|
|
|
+ if (s != null) {
|
|
|
+ stopAuxService(s);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Constructs an AuxiliaryService then configures and initializes it based
|
|
|
+ * on a service specification.
|
|
|
+ *
|
|
|
+ * @param service aux service record
|
|
|
+ * @param conf configuration
|
|
|
+ * @param fromConfiguration true if from configuration, false if from manifest
|
|
|
+ * @return aux service
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ private AuxiliaryService initAuxService(AuxServiceRecord service,
|
|
|
+ Configuration conf, boolean fromConfiguration) throws IOException {
|
|
|
+ final String sName = service.getName();
|
|
|
+ AuxiliaryService s;
|
|
|
+ try {
|
|
|
+ Preconditions
|
|
|
+ .checkArgument(
|
|
|
+ validateAuxServiceName(sName),
|
|
|
+ "The auxiliary service name: " + sName + " is invalid. " +
|
|
|
+ "The valid service name should only contain a-zA-Z0-9_ " +
|
|
|
+ "and cannot start with numbers.");
|
|
|
+ s = createAuxService(service, conf, fromConfiguration);
|
|
|
+ if (s == null) {
|
|
|
+ throw new YarnRuntimeException("No auxiliary service class loaded for" +
|
|
|
+ " " + sName);
|
|
|
+ }
|
|
|
+ // TODO better use s.getName()?
|
|
|
+ if (!sName.equals(s.getName())) {
|
|
|
+ LOG.warn("The Auxiliary Service named '" + sName + "' in the "
|
|
|
+ + "configuration is for " + s.getClass() + " which has "
|
|
|
+ + "a name of '" + s.getName() + "'. Because these are "
|
|
|
+ + "not the same tools trying to send ServiceData and read "
|
|
|
+ + "Service Meta Data may have issues unless the refer to "
|
|
|
+ + "the name in the config.");
|
|
|
+ }
|
|
|
+ s.setAuxiliaryLocalPathHandler(auxiliaryLocalPathHandler);
|
|
|
+ setStateStoreDir(sName, s);
|
|
|
+ Configuration customConf = new Configuration(conf);
|
|
|
+ if (service.getConfiguration() != null) {
|
|
|
+ for (Entry<String, String> entry : service.getConfiguration()
|
|
|
+ .getProperties().entrySet()) {
|
|
|
+ customConf.set(entry.getKey(), entry.getValue());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ s.init(customConf);
|
|
|
+
|
|
|
+ LOG.info("Initialized auxiliary service " + sName);
|
|
|
+ } catch (RuntimeException e) {
|
|
|
+ LOG.error("Failed to initialize " + sName, e);
|
|
|
+ throw e;
|
|
|
+ } catch (ClassNotFoundException e) {
|
|
|
+ throw new YarnRuntimeException(e);
|
|
|
+ }
|
|
|
+ return s;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Reloads auxiliary services manifest. Must be called after service init.
|
|
|
+ *
|
|
|
+ * @throws IOException if manifest can't be loaded
|
|
|
+ */
|
|
|
+ private void reloadManifest() throws IOException {
|
|
|
+ loadManifest(getConfig(), true);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Reads the manifest file if it is configured, exists, and has not been
|
|
|
+ * modified since the last read.
|
|
|
+ *
|
|
|
+ * @return aux service records
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ private synchronized AuxServiceRecords maybeReadManifestFile() throws
|
|
|
+ IOException {
|
|
|
+ if (manifest == null) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ if (!manifestFS.exists(manifest)) {
|
|
|
+ LOG.info("Manifest file " + manifest + " doesn't exist");
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ FileStatus status;
|
|
|
+ try {
|
|
|
+ status = manifestFS.getFileStatus(manifest);
|
|
|
+ } catch (FileNotFoundException e) {
|
|
|
+ LOG.info("Manifest file " + manifest + " doesn't exist");
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ if (status.getModificationTime() == manifestModifyTS) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ manifestModifyTS = status.getModificationTime();
|
|
|
+ LOG.info("Reading auxiliary services manifest " + manifest);
|
|
|
+ try (FSDataInputStream in = manifestFS.open(manifest)) {
|
|
|
+ return mapper.readValue((InputStream) in, AuxServiceRecords.class);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Updates current aux services based on changes found in the manifest.
|
|
|
+ *
|
|
|
+ * @param conf configuration
|
|
|
+ * @param startServices if true starts services, otherwise only inits services
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ private synchronized void loadManifest(Configuration conf, boolean
|
|
|
+ startServices) throws IOException {
|
|
|
+ AuxServiceRecords services = maybeReadManifestFile();
|
|
|
+ if (services == null) {
|
|
|
+ // read did not occur or no changes detected
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ Set<String> loadedAuxServices = new HashSet<>();
|
|
|
+ boolean foundChanges = false;
|
|
|
+ if (services.getServices() != null) {
|
|
|
+ for (AuxServiceRecord service : services.getServices()) {
|
|
|
+ AuxServiceRecord existingService = serviceRecordMap.get(service
|
|
|
+ .getName());
|
|
|
+ loadedAuxServices.add(service.getName());
|
|
|
+ if (existingService != null && existingService.equals(service)) {
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Auxiliary service already loaded: " + service.getName());
|
|
|
+ }
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ foundChanges = true;
|
|
|
+ try {
|
|
|
+ // stop aux service
|
|
|
+ maybeRemoveAuxService(service.getName());
|
|
|
+ // init aux service
|
|
|
+ AuxiliaryService s = initAuxService(service, conf, false);
|
|
|
+ if (startServices) {
|
|
|
+ // start aux service
|
|
|
+ startAuxService(service.getName(), s, service);
|
|
|
+ }
|
|
|
+ // add aux service to serviceMap
|
|
|
+ addService(service.getName(), s, service);
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.error("Failed to load auxiliary service " + service.getName());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // remove aux services that do not appear in the manifest
|
|
|
+ List<String> servicesToRemove = new ArrayList<>();
|
|
|
+ for (String sName : serviceMap.keySet()) {
|
|
|
+ if (!loadedAuxServices.contains(sName)) {
|
|
|
+ foundChanges = true;
|
|
|
+ servicesToRemove.add(sName);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ for (String sName : servicesToRemove) {
|
|
|
+ LOG.info("Removing aux service " + sName);
|
|
|
+ maybeRemoveAuxService(sName);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (!foundChanges) {
|
|
|
+ LOG.info("No auxiliary services changes detected in manifest");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private static String getClassName(AuxServiceRecord service) {
|
|
|
+ AuxServiceConfiguration serviceConf = service.getConfiguration();
|
|
|
+ if (serviceConf == null) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ return serviceConf.getProperty(CLASS_NAME);
|
|
|
+ }
|
|
|
+
|
|
|
+ private static String[] getSystemClasses(AuxServiceRecord service, String
|
|
|
+ className) {
|
|
|
+ AuxServiceConfiguration serviceConf =
|
|
|
+ service.getConfiguration();
|
|
|
+ if (serviceConf == null) {
|
|
|
+ return new String[]{className};
|
|
|
+ }
|
|
|
+ return StringUtils.split(serviceConf.getProperty(SYSTEM_CLASSES,
|
|
|
+ className));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Translates an aux service specified in the Configuration to an aux
|
|
|
+ * service record.
|
|
|
+ *
|
|
|
+ * @param sName aux service name
|
|
|
+ * @param conf configuration
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ private static AuxServiceRecord createServiceRecordFromConfiguration(String
|
|
|
+ sName, Configuration conf) {
|
|
|
+ String className = conf.get(String.format(
|
|
|
+ YarnConfiguration.NM_AUX_SERVICE_FMT, sName));
|
|
|
+ String remoteClassPath = conf.get(String.format(
|
|
|
+ YarnConfiguration.NM_AUX_SERVICE_REMOTE_CLASSPATH, sName));
|
|
|
+ String[] systemClasses = conf.getTrimmedStrings(String.format(
|
|
|
+ YarnConfiguration.NM_AUX_SERVICES_SYSTEM_CLASSES, sName));
|
|
|
+
|
|
|
+ AuxServiceConfiguration serviceConf = new AuxServiceConfiguration();
|
|
|
+ if (className != null) {
|
|
|
+ serviceConf.setProperty(CLASS_NAME, className);
|
|
|
+ }
|
|
|
+ if (systemClasses != null) {
|
|
|
+ serviceConf.setProperty(SYSTEM_CLASSES, StringUtils.join(",",
|
|
|
+ systemClasses));
|
|
|
+ }
|
|
|
+ if (remoteClassPath != null) {
|
|
|
+ AuxServiceFile.TypeEnum type;
|
|
|
+ String lcClassPath = StringUtils.toLowerCase(remoteClassPath);
|
|
|
+ if (lcClassPath.endsWith(".jar")) {
|
|
|
+ type = AuxServiceFile.TypeEnum.STATIC;
|
|
|
+ } else if (lcClassPath.endsWith(".zip") ||
|
|
|
+ lcClassPath.endsWith(".tar.gz") || lcClassPath.endsWith(".tgz") ||
|
|
|
+ lcClassPath.endsWith(".tar")) {
|
|
|
+ type = AuxServiceFile.TypeEnum.ARCHIVE;
|
|
|
+ } else {
|
|
|
+ throw new YarnRuntimeException("Cannot unpack file from " +
|
|
|
+ "remote-file-path:" + remoteClassPath + "for aux-service:" +
|
|
|
+ sName + ".\n");
|
|
|
+ }
|
|
|
+ AuxServiceFile file = new AuxServiceFile().srcFile(remoteClassPath)
|
|
|
+ .type(type);
|
|
|
+ serviceConf.getFiles().add(file);
|
|
|
+ }
|
|
|
+ return new AuxServiceRecord().name(sName).configuration(serviceConf);
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
- public void serviceInit(Configuration conf) throws Exception {
|
|
|
- final FsPermission storeDirPerms = new FsPermission((short)0700);
|
|
|
- Path stateStoreRoot = null;
|
|
|
- FileSystem stateStoreFs = null;
|
|
|
+ public synchronized void serviceInit(Configuration conf) throws Exception {
|
|
|
boolean recoveryEnabled = conf.getBoolean(
|
|
|
YarnConfiguration.NM_RECOVERY_ENABLED,
|
|
|
YarnConfiguration.DEFAULT_NM_RECOVERY_ENABLED);
|
|
@@ -138,200 +652,77 @@ public class AuxServices extends AbstractService
|
|
|
STATE_STORE_ROOT_NAME);
|
|
|
stateStoreFs = FileSystem.getLocal(conf);
|
|
|
}
|
|
|
- Collection<String> auxNames = conf.getStringCollection(
|
|
|
- YarnConfiguration.NM_AUX_SERVICES);
|
|
|
- for (final String sName : auxNames) {
|
|
|
- try {
|
|
|
- Preconditions
|
|
|
- .checkArgument(
|
|
|
- validateAuxServiceName(sName),
|
|
|
- "The ServiceName: " + sName + " set in " +
|
|
|
- YarnConfiguration.NM_AUX_SERVICES +" is invalid." +
|
|
|
- "The valid service name should only contain a-zA-Z0-9_ " +
|
|
|
- "and can not start with numbers");
|
|
|
- String classKey = String.format(
|
|
|
- YarnConfiguration.NM_AUX_SERVICE_FMT, sName);
|
|
|
- String className = conf.get(classKey);
|
|
|
- final String appLocalClassPath = conf.get(String.format(
|
|
|
- YarnConfiguration.NM_AUX_SERVICES_CLASSPATH, sName));
|
|
|
- final String appRemoteClassPath = conf.get(String.format(
|
|
|
- YarnConfiguration.NM_AUX_SERVICE_REMOTE_CLASSPATH, sName));
|
|
|
- AuxiliaryService s = null;
|
|
|
- boolean useCustomerClassLoader = ((appLocalClassPath != null
|
|
|
- && !appLocalClassPath.isEmpty()) ||
|
|
|
- (appRemoteClassPath != null && !appRemoteClassPath.isEmpty()))
|
|
|
- && className != null && !className.isEmpty();
|
|
|
- if (useCustomerClassLoader) {
|
|
|
- // load AuxiliaryService from local class path
|
|
|
- if (appRemoteClassPath == null || appRemoteClassPath.isEmpty()) {
|
|
|
- s = AuxiliaryServiceWithCustomClassLoader.getInstance(
|
|
|
- conf, className, appLocalClassPath);
|
|
|
- } else {
|
|
|
- // load AuxiliaryService from remote class path
|
|
|
- if (appLocalClassPath != null && !appLocalClassPath.isEmpty()) {
|
|
|
- throw new YarnRuntimeException("The aux serivce:" + sName
|
|
|
- + " has configured local classpath:" + appLocalClassPath
|
|
|
- + " and remote classpath:" + appRemoteClassPath
|
|
|
- + ". Only one of them should be configured.");
|
|
|
- }
|
|
|
- FileContext localLFS = getLocalFileContext(conf);
|
|
|
- // create NM aux-service dir in NM localdir if it does not exist.
|
|
|
- Path nmAuxDir = dirsHandler.getLocalPathForWrite("."
|
|
|
- + Path.SEPARATOR + NM_AUX_SERVICE_DIR);
|
|
|
- if (!localLFS.util().exists(nmAuxDir)) {
|
|
|
- try {
|
|
|
- localLFS.mkdir(nmAuxDir, NM_AUX_SERVICE_DIR_PERM, true);
|
|
|
- } catch (IOException ex) {
|
|
|
- throw new YarnRuntimeException("Fail to create dir:"
|
|
|
- + nmAuxDir.toString(), ex);
|
|
|
- }
|
|
|
- }
|
|
|
- Path src = new Path(appRemoteClassPath);
|
|
|
- FileContext remoteLFS = getRemoteFileContext(src.toUri(), conf);
|
|
|
- FileStatus scFileStatus = remoteLFS.getFileStatus(src);
|
|
|
- if (!scFileStatus.getOwner().equals(
|
|
|
- this.userUGI.getShortUserName())) {
|
|
|
- throw new YarnRuntimeException("The remote jarfile owner:"
|
|
|
- + scFileStatus.getOwner() + " is not the same as the NM user:"
|
|
|
- + this.userUGI.getShortUserName() + ".");
|
|
|
- }
|
|
|
- if ((scFileStatus.getPermission().toShort() & 0022) != 0) {
|
|
|
- throw new YarnRuntimeException("The remote jarfile should not "
|
|
|
- + "be writable by group or others. "
|
|
|
- + "The current Permission is "
|
|
|
- + scFileStatus.getPermission().toShort());
|
|
|
- }
|
|
|
- Path dest = null;
|
|
|
- Path downloadDest = new Path(nmAuxDir,
|
|
|
- className + "_" + scFileStatus.getModificationTime());
|
|
|
- // check whether we need to re-download the jar
|
|
|
- // from remote directory
|
|
|
- Path targetDirPath = new Path(downloadDest,
|
|
|
- scFileStatus.getPath().getName());
|
|
|
- FileStatus[] allSubDirs = localLFS.util().listStatus(nmAuxDir);
|
|
|
- boolean reDownload = true;
|
|
|
- for (FileStatus sub : allSubDirs) {
|
|
|
- if (sub.getPath().getName().equals(downloadDest.getName())) {
|
|
|
- reDownload = false;
|
|
|
- dest = new Path(targetDirPath + Path.SEPARATOR + "*");
|
|
|
- break;
|
|
|
- } else {
|
|
|
- if (sub.getPath().getName().contains(className) &&
|
|
|
- !sub.getPath().getName().endsWith(DEL_SUFFIX)) {
|
|
|
- Path delPath = new Path(sub.getPath().getParent(),
|
|
|
- sub.getPath().getName() + DEL_SUFFIX);
|
|
|
- localLFS.rename(sub.getPath(), delPath);
|
|
|
- LOG.info("delete old aux service jar dir:"
|
|
|
- + delPath.toString());
|
|
|
- FileDeletionTask deletionTask = new FileDeletionTask(
|
|
|
- this.delService, null, delPath, null);
|
|
|
- this.delService.delete(deletionTask);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- if (reDownload) {
|
|
|
- LocalResourceType srcType = null;
|
|
|
- String lowerDst = StringUtils.toLowerCase(src.toString());
|
|
|
- if (lowerDst.endsWith(".jar")) {
|
|
|
- srcType = LocalResourceType.FILE;
|
|
|
- } else if (lowerDst.endsWith(".zip") ||
|
|
|
- lowerDst.endsWith(".tar.gz") || lowerDst.endsWith(".tgz")
|
|
|
- || lowerDst.endsWith(".tar")) {
|
|
|
- srcType = LocalResourceType.ARCHIVE;
|
|
|
- } else {
|
|
|
- throw new YarnRuntimeException(
|
|
|
- "Can not unpack file from remote-file-path:" + src
|
|
|
- + "for aux-service:" + ".\n");
|
|
|
- }
|
|
|
- LocalResource scRsrc = LocalResource.newInstance(
|
|
|
- URL.fromURI(src.toUri()),
|
|
|
- srcType, LocalResourceVisibility.PRIVATE,
|
|
|
- scFileStatus.getLen(), scFileStatus.getModificationTime());
|
|
|
- FSDownload download = new FSDownload(localLFS, null, conf,
|
|
|
- downloadDest, scRsrc, null);
|
|
|
- try {
|
|
|
- Path downloaded = download.call();
|
|
|
- // don't need to convert downloaded path into a dir
|
|
|
- // since its already a jar path.
|
|
|
- dest = downloaded;
|
|
|
- } catch (Exception ex) {
|
|
|
- throw new YarnRuntimeException(
|
|
|
- "Exception happend while downloading files "
|
|
|
- + "for aux-service:" + sName + " and remote-file-path:"
|
|
|
- + src + ".\n" + ex.getMessage());
|
|
|
- }
|
|
|
- }
|
|
|
- s = AuxiliaryServiceWithCustomClassLoader.getInstance(
|
|
|
- new Configuration(conf), className, dest.toString());
|
|
|
- }
|
|
|
- LOG.info("The aux service:" + sName
|
|
|
- + " are using the custom classloader");
|
|
|
- } else {
|
|
|
- Class<? extends AuxiliaryService> sClass = conf.getClass(
|
|
|
- classKey, null, AuxiliaryService.class);
|
|
|
-
|
|
|
- if (sClass == null) {
|
|
|
- throw new RuntimeException("No class defined for " + sName);
|
|
|
- }
|
|
|
- s = ReflectionUtils.newInstance(sClass, new Configuration(conf));
|
|
|
- }
|
|
|
- if (s == null) {
|
|
|
- throw new RuntimeException("No object created for " + sName);
|
|
|
- }
|
|
|
- // TODO better use s.getName()?
|
|
|
- if(!sName.equals(s.getName())) {
|
|
|
- LOG.warn("The Auxiliary Service named '"+sName+"' in the "
|
|
|
- +"configuration is for "+s.getClass()+" which has "
|
|
|
- +"a name of '"+s.getName()+"'. Because these are "
|
|
|
- +"not the same tools trying to send ServiceData and read "
|
|
|
- +"Service Meta Data may have issues unless the refer to "
|
|
|
- +"the name in the config.");
|
|
|
- }
|
|
|
- s.setAuxiliaryLocalPathHandler(auxiliaryLocalPathHandler);
|
|
|
- addService(sName, s);
|
|
|
- if (recoveryEnabled) {
|
|
|
- Path storePath = new Path(stateStoreRoot, sName);
|
|
|
- stateStoreFs.mkdirs(storePath, storeDirPerms);
|
|
|
- s.setRecoveryPath(storePath);
|
|
|
- }
|
|
|
- s.init(new Configuration(conf));
|
|
|
- } catch (RuntimeException e) {
|
|
|
- LOG.error("Failed to initialize " + sName, e);
|
|
|
- throw e;
|
|
|
+ String manifestStr = conf.get(YarnConfiguration.NM_AUX_SERVICES_MANIFEST);
|
|
|
+ if (manifestStr == null) {
|
|
|
+ Collection<String> auxNames = conf.getStringCollection(
|
|
|
+ YarnConfiguration.NM_AUX_SERVICES);
|
|
|
+ for (final String sName : auxNames) {
|
|
|
+ AuxServiceRecord service = createServiceRecordFromConfiguration(sName,
|
|
|
+ conf);
|
|
|
+ maybeRemoveAuxService(sName);
|
|
|
+ AuxiliaryService s = initAuxService(service, conf, true);
|
|
|
+ addService(sName, s, service);
|
|
|
}
|
|
|
+ } else {
|
|
|
+ manifest = new Path(manifestStr);
|
|
|
+ manifestFS = FileSystem.get(new URI(manifestStr), conf);
|
|
|
+ loadManifest(conf, false);
|
|
|
}
|
|
|
+ manifestReloadInterval = conf.getLong(
|
|
|
+ YarnConfiguration.NM_AUX_SERVICES_MANIFEST_RELOAD_MS,
|
|
|
+ YarnConfiguration.DEFAULT_NM_AUX_SERVICES_MANIFEST_RELOAD_MS);
|
|
|
+ manifestReloadTask = new ManifestReloadTask();
|
|
|
+
|
|
|
super.serviceInit(conf);
|
|
|
}
|
|
|
|
|
|
+ private void startAuxService(String name, AuxiliaryService service,
|
|
|
+ AuxServiceRecord serviceRecord) {
|
|
|
+ service.start();
|
|
|
+ service.registerServiceListener(this);
|
|
|
+ ByteBuffer meta = service.getMetaData();
|
|
|
+ if (meta != null) {
|
|
|
+ serviceMetaData.put(name, meta);
|
|
|
+ }
|
|
|
+ serviceRecord.setLaunchTime(new Date());
|
|
|
+ }
|
|
|
+
|
|
|
+ private void stopAuxService(Service service) {
|
|
|
+ if (service.getServiceState() == Service.STATE.STARTED) {
|
|
|
+ service.unregisterServiceListener(this);
|
|
|
+ service.stop();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
- public void serviceStart() throws Exception {
|
|
|
+ public synchronized void serviceStart() throws Exception {
|
|
|
// TODO fork(?) services running as configured user
|
|
|
// monitor for health, shutdown/restart(?) if any should die
|
|
|
for (Map.Entry<String, AuxiliaryService> entry : serviceMap.entrySet()) {
|
|
|
AuxiliaryService service = entry.getValue();
|
|
|
String name = entry.getKey();
|
|
|
- service.start();
|
|
|
- service.registerServiceListener(this);
|
|
|
- ByteBuffer meta = service.getMetaData();
|
|
|
- if(meta != null) {
|
|
|
- serviceMetaData.put(name, meta);
|
|
|
- }
|
|
|
+ startAuxService(name, service, serviceRecordMap.get(name));
|
|
|
+ }
|
|
|
+ if (manifest != null && manifestReloadInterval > 0) {
|
|
|
+ manifestReloadTimer = new Timer("AuxServicesManifestRelaod-Timer",
|
|
|
+ true);
|
|
|
+ manifestReloadTimer.schedule(manifestReloadTask,
|
|
|
+ manifestReloadInterval, manifestReloadInterval);
|
|
|
}
|
|
|
super.serviceStart();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void serviceStop() throws Exception {
|
|
|
+ public synchronized void serviceStop() throws Exception {
|
|
|
try {
|
|
|
- synchronized (serviceMap) {
|
|
|
- for (Service service : serviceMap.values()) {
|
|
|
- if (service.getServiceState() == Service.STATE.STARTED) {
|
|
|
- service.unregisterServiceListener(this);
|
|
|
- service.stop();
|
|
|
- }
|
|
|
- }
|
|
|
- serviceMap.clear();
|
|
|
- serviceMetaData.clear();
|
|
|
+ for (Service service : serviceMap.values()) {
|
|
|
+ stopAuxService(service);
|
|
|
+ }
|
|
|
+ serviceMap.clear();
|
|
|
+ serviceRecordMap.clear();
|
|
|
+ serviceMetaData.clear();
|
|
|
+ if (manifestReloadTimer != null) {
|
|
|
+ manifestReloadTimer.cancel();
|
|
|
}
|
|
|
} finally {
|
|
|
super.serviceStop();
|
|
@@ -340,9 +731,9 @@ public class AuxServices extends AbstractService
|
|
|
|
|
|
@Override
|
|
|
public void stateChanged(Service service) {
|
|
|
- LOG.error("Service " + service.getName() + " changed state: " +
|
|
|
+ // services changing state is expected on reload
|
|
|
+ LOG.info("Service " + service.getName() + " changed state: " +
|
|
|
service.getServiceState());
|
|
|
- stop();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -448,4 +839,38 @@ public class AuxServices extends AbstractService
|
|
|
}
|
|
|
return remoteUgi;
|
|
|
}
|
|
|
+
|
|
|
+ protected static AuxServiceRecord newAuxService(String name, String
|
|
|
+ className) {
|
|
|
+ AuxServiceConfiguration serviceConf = new AuxServiceConfiguration();
|
|
|
+ serviceConf.setProperty(CLASS_NAME, className);
|
|
|
+ return new AuxServiceRecord().name(name).configuration(serviceConf);
|
|
|
+ }
|
|
|
+
|
|
|
+ protected static void setClasspath(AuxServiceRecord service, String
|
|
|
+ classpath) {
|
|
|
+ service.getConfiguration().getFiles().add(new AuxServiceFile()
|
|
|
+ .srcFile(classpath).type(AuxServiceFile.TypeEnum.STATIC));
|
|
|
+ }
|
|
|
+
|
|
|
+ protected static void setSystemClasses(AuxServiceRecord service, String
|
|
|
+ systemClasses) {
|
|
|
+ service.getConfiguration().setProperty(SYSTEM_CLASSES, systemClasses);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Class which is used by the {@link Timer} class to periodically execute the
|
|
|
+ * manifest reload.
|
|
|
+ */
|
|
|
+ private final class ManifestReloadTask extends TimerTask {
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ try {
|
|
|
+ reloadManifest();
|
|
|
+ } catch (Throwable t) {
|
|
|
+ // Prevent uncaught exceptions from killing this thread
|
|
|
+ LOG.warn("Error while reloading manifest: ", t);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|