|
@@ -19,6 +19,7 @@
|
|
|
package org.apache.hadoop.yarn.service.provider;
|
|
|
|
|
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
|
|
+import org.apache.hadoop.fs.FileStatus;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.fs.permission.FsAction;
|
|
@@ -198,6 +199,10 @@ public class ProviderUtils implements YarnServiceConstants {
|
|
|
|
|
|
for (ConfigFile originalFile : compLaunchContext.getConfiguration()
|
|
|
.getFiles()) {
|
|
|
+
|
|
|
+ if (isStaticFile(originalFile)) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
ConfigFile configFile = originalFile.copy();
|
|
|
String fileName = new Path(configFile.getDestFile()).getName();
|
|
|
|
|
@@ -207,7 +212,14 @@ public class ProviderUtils implements YarnServiceConstants {
|
|
|
.replaceAll(Pattern.quote(token.getKey()), token.getValue()));
|
|
|
}
|
|
|
|
|
|
+ /* When source file is not specified, write new configs
|
|
|
+ * to compInstanceDir/fileName
|
|
|
+ * When source file is specified, it reads and performs variable
|
|
|
+ * substitution and merges in new configs, and writes a new file to
|
|
|
+ * compInstanceDir/fileName.
|
|
|
+ */
|
|
|
Path remoteFile = new Path(compInstanceDir, fileName);
|
|
|
+
|
|
|
if (!fs.getFileSystem().exists(remoteFile)) {
|
|
|
log.info("Saving config file on hdfs for component " + instance
|
|
|
.getCompInstanceName() + ": " + configFile);
|
|
@@ -239,22 +251,79 @@ public class ProviderUtils implements YarnServiceConstants {
|
|
|
// Add resource for localization
|
|
|
LocalResource configResource =
|
|
|
fs.createAmResource(remoteFile, LocalResourceType.FILE);
|
|
|
- File destFile = new File(configFile.getDestFile());
|
|
|
+ Path destFile = new Path(configFile.getDestFile());
|
|
|
String symlink = APP_CONF_DIR + "/" + fileName;
|
|
|
- if (destFile.isAbsolute()) {
|
|
|
- launcher.addLocalResource(symlink, configResource,
|
|
|
- configFile.getDestFile());
|
|
|
- log.info("Add config file for localization: " + symlink + " -> "
|
|
|
- + configResource.getResource().getFile() + ", dest mount path: "
|
|
|
- + configFile.getDestFile());
|
|
|
- } else {
|
|
|
- launcher.addLocalResource(symlink, configResource);
|
|
|
- log.info("Add config file for localization: " + symlink + " -> "
|
|
|
- + configResource.getResource().getFile());
|
|
|
+ addLocalResource(launcher, symlink, configResource, destFile);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public static synchronized void handleStaticFilesForLocalization(
|
|
|
+ AbstractLauncher launcher, SliderFileSystem fs, ContainerLaunchService
|
|
|
+ .ComponentLaunchContext componentLaunchCtx)
|
|
|
+ throws IOException {
|
|
|
+ for (ConfigFile staticFile :
|
|
|
+ componentLaunchCtx.getConfiguration().getFiles()) {
|
|
|
+ // Only handle static file here.
|
|
|
+ if (!isStaticFile(staticFile)) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (staticFile.getSrcFile() == null) {
|
|
|
+ // This should not happen, AbstractClientProvider should have checked
|
|
|
+ // this.
|
|
|
+ throw new IOException("srcFile is null, please double check.");
|
|
|
+ }
|
|
|
+ Path sourceFile = new Path(staticFile.getSrcFile());
|
|
|
+
|
|
|
+ // Output properties to sourceFile if not existed
|
|
|
+ if (!fs.getFileSystem().exists(sourceFile)) {
|
|
|
+ throw new IOException(
|
|
|
+ "srcFile=" + sourceFile + " doesn't exist, please double check.");
|
|
|
}
|
|
|
+
|
|
|
+ FileStatus fileStatus = fs.getFileSystem().getFileStatus(sourceFile);
|
|
|
+ if (fileStatus.isDirectory()) {
|
|
|
+ throw new IOException("srcFile=" + sourceFile +
|
|
|
+ " is a directory, which is not supported.");
|
|
|
+ }
|
|
|
+
|
|
|
+ // Add resource for localization
|
|
|
+ LocalResource localResource = fs.createAmResource(sourceFile,
|
|
|
+ (staticFile.getType() == ConfigFile.TypeEnum.ARCHIVE ?
|
|
|
+ LocalResourceType.ARCHIVE :
|
|
|
+ LocalResourceType.FILE));
|
|
|
+ Path destFile = new Path(sourceFile.getName());
|
|
|
+ if (staticFile.getDestFile() != null && !staticFile.getDestFile()
|
|
|
+ .isEmpty()) {
|
|
|
+ destFile = new Path(staticFile.getDestFile());
|
|
|
+ }
|
|
|
+
|
|
|
+ String symlink = APP_RESOURCES_DIR + "/" + destFile.getName();
|
|
|
+ addLocalResource(launcher, symlink, localResource, destFile);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private static void addLocalResource(AbstractLauncher launcher,
|
|
|
+ String symlink, LocalResource localResource, Path destFile) {
|
|
|
+ if (destFile.isAbsolute()) {
|
|
|
+ launcher.addLocalResource(symlink, localResource, destFile.toString());
|
|
|
+ log.info("Added file for localization: "+ symlink +" -> " +
|
|
|
+ localResource.getResource().getFile() + ", dest mount path: " +
|
|
|
+ destFile);
|
|
|
+ } else{
|
|
|
+ launcher.addLocalResource(symlink, localResource);
|
|
|
+ log.info("Added file for localization: " + symlink+ " -> " +
|
|
|
+ localResource.getResource().getFile());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Static file is files uploaded by users before launch the service. Which
|
|
|
+ // should be localized to container local disk without any changes.
|
|
|
+ private static boolean isStaticFile(ConfigFile file) {
|
|
|
+ return file.getType().equals(ConfigFile.TypeEnum.ARCHIVE) || file.getType()
|
|
|
+ .equals(ConfigFile.TypeEnum.STATIC);
|
|
|
+ }
|
|
|
+
|
|
|
private static void resolvePropsInConfigFileAndSaveOnHdfs(SliderFileSystem fs,
|
|
|
Map<String, String> tokensForSubstitution, ComponentInstance instance,
|
|
|
ConfigFile configFile, String fileName, Path remoteFile)
|