|
@@ -15,9 +15,11 @@
|
|
|
package org.apache.hadoop.yarn.submarine.runtimes.yarnservice;
|
|
|
|
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
|
+import org.apache.commons.lang3.StringUtils;
|
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
+import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
import org.apache.hadoop.yarn.client.api.AppAdminClient;
|
|
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
@@ -28,6 +30,7 @@ import org.apache.hadoop.yarn.service.api.records.ConfigFile;
|
|
|
import org.apache.hadoop.yarn.service.api.records.Resource;
|
|
|
import org.apache.hadoop.yarn.service.api.records.ResourceInformation;
|
|
|
import org.apache.hadoop.yarn.service.api.records.Service;
|
|
|
+import org.apache.hadoop.yarn.service.api.records.KerberosPrincipal;
|
|
|
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
|
|
|
import org.apache.hadoop.yarn.submarine.client.cli.param.Quicklink;
|
|
|
import org.apache.hadoop.yarn.submarine.client.cli.param.RunJobParameters;
|
|
@@ -300,8 +303,26 @@ public class YarnServiceJobSubmitter implements JobSubmitter {
|
|
|
private void uploadToRemoteFileAndLocalizeToContainerWorkDir(Path stagingDir,
|
|
|
String fileToUpload, String destFilename, Component comp)
|
|
|
throws IOException {
|
|
|
+ Path uploadedFilePath = uploadToRemoteFile(stagingDir, fileToUpload);
|
|
|
+ locateRemoteFileToContainerWorkDir(destFilename, comp, uploadedFilePath);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void locateRemoteFileToContainerWorkDir(String destFilename,
|
|
|
+ Component comp, Path uploadedFilePath) throws IOException {
|
|
|
FileSystem fs = FileSystem.get(clientContext.getYarnConfig());
|
|
|
|
|
|
+ FileStatus fileStatus = fs.getFileStatus(uploadedFilePath);
|
|
|
+ LOG.info("Uploaded file path = " + fileStatus.getPath());
|
|
|
+
|
|
|
+ // Set it to component's files list
|
|
|
+ comp.getConfiguration().getFiles().add(new ConfigFile().srcFile(
|
|
|
+ fileStatus.getPath().toUri().toString()).destFile(destFilename)
|
|
|
+ .type(ConfigFile.TypeEnum.STATIC));
|
|
|
+ }
|
|
|
+
|
|
|
+ private Path uploadToRemoteFile(Path stagingDir, String fileToUpload) throws
|
|
|
+ IOException {
|
|
|
+ FileSystem fs = FileSystem.get(clientContext.getYarnConfig());
|
|
|
// Upload to remote FS under staging area
|
|
|
File localFile = new File(fileToUpload);
|
|
|
if (!localFile.exists()) {
|
|
@@ -320,14 +341,13 @@ public class YarnServiceJobSubmitter implements JobSubmitter {
|
|
|
fs.copyFromLocalFile(new Path(fileToUpload), uploadedFilePath);
|
|
|
uploadedFiles.add(uploadedFilePath);
|
|
|
}
|
|
|
+ return uploadedFilePath;
|
|
|
+ }
|
|
|
|
|
|
- FileStatus fileStatus = fs.getFileStatus(uploadedFilePath);
|
|
|
- LOG.info("Uploaded file path = " + fileStatus.getPath());
|
|
|
-
|
|
|
- // Set it to component's files list
|
|
|
- comp.getConfiguration().getFiles().add(new ConfigFile().srcFile(
|
|
|
- fileStatus.getPath().toUri().toString()).destFile(destFilename)
|
|
|
- .type(ConfigFile.TypeEnum.STATIC));
|
|
|
+ private void setPermission(Path destPath, FsPermission permission) throws
|
|
|
+ IOException {
|
|
|
+ FileSystem fs = FileSystem.get(clientContext.getYarnConfig());
|
|
|
+ fs.setPermission(destPath, new FsPermission(permission));
|
|
|
}
|
|
|
|
|
|
private void handleLaunchCommand(RunJobParameters parameters,
|
|
@@ -475,6 +495,7 @@ public class YarnServiceJobSubmitter implements JobSubmitter {
|
|
|
serviceSpec.setName(parameters.getName());
|
|
|
serviceSpec.setVersion(String.valueOf(System.currentTimeMillis()));
|
|
|
serviceSpec.setArtifact(getDockerArtifact(parameters.getDockerImageName()));
|
|
|
+ handleKerberosPrincipal(parameters);
|
|
|
|
|
|
handleServiceEnvs(serviceSpec, parameters);
|
|
|
|
|
@@ -547,6 +568,32 @@ public class YarnServiceJobSubmitter implements JobSubmitter {
|
|
|
return serviceSpecFile.getAbsolutePath();
|
|
|
}
|
|
|
|
|
|
+ private void handleKerberosPrincipal(RunJobParameters parameters) throws
|
|
|
+ IOException {
|
|
|
+ if(StringUtils.isNotBlank(parameters.getKeytab()) && StringUtils
|
|
|
+ .isNotBlank(parameters.getPrincipal())) {
|
|
|
+ String keytab = parameters.getKeytab();
|
|
|
+ String principal = parameters.getPrincipal();
|
|
|
+ if(parameters.isDistributeKeytab()) {
|
|
|
+ Path stagingDir =
|
|
|
+ clientContext.getRemoteDirectoryManager().getJobStagingArea(
|
|
|
+ parameters.getName(), true);
|
|
|
+ Path remoteKeytabPath = uploadToRemoteFile(stagingDir, keytab);
|
|
|
+ //only the owner has read access
|
|
|
+ setPermission(remoteKeytabPath,
|
|
|
+ FsPermission.createImmutable((short)Integer.parseInt("400", 8)));
|
|
|
+ serviceSpec.setKerberosPrincipal(new KerberosPrincipal().keytab(
|
|
|
+ remoteKeytabPath.toString()).principalName(principal));
|
|
|
+ } else {
|
|
|
+ if(!keytab.startsWith("file")) {
|
|
|
+ keytab = "file://" + keytab;
|
|
|
+ }
|
|
|
+ serviceSpec.setKerberosPrincipal(new KerberosPrincipal().keytab(
|
|
|
+ keytab).principalName(principal));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* {@inheritDoc}
|
|
|
*/
|