|
@@ -15,7 +15,6 @@
|
|
package org.apache.hadoop.yarn.submarine.runtimes.yarnservice;
|
|
package org.apache.hadoop.yarn.submarine.runtimes.yarnservice;
|
|
|
|
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
-import com.google.common.collect.ImmutableMap;
|
|
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
import org.apache.hadoop.fs.Path;
|
|
import org.apache.hadoop.fs.Path;
|
|
@@ -29,6 +28,7 @@ import org.apache.hadoop.yarn.service.api.records.Resource;
|
|
import org.apache.hadoop.yarn.service.api.records.ResourceInformation;
|
|
import org.apache.hadoop.yarn.service.api.records.ResourceInformation;
|
|
import org.apache.hadoop.yarn.service.api.records.Service;
|
|
import org.apache.hadoop.yarn.service.api.records.Service;
|
|
import org.apache.hadoop.yarn.service.client.ServiceClient;
|
|
import org.apache.hadoop.yarn.service.client.ServiceClient;
|
|
|
|
+import org.apache.hadoop.yarn.submarine.client.cli.param.Quicklink;
|
|
import org.apache.hadoop.yarn.submarine.client.cli.param.RunJobParameters;
|
|
import org.apache.hadoop.yarn.submarine.client.cli.param.RunJobParameters;
|
|
import org.apache.hadoop.yarn.submarine.common.ClientContext;
|
|
import org.apache.hadoop.yarn.submarine.common.ClientContext;
|
|
import org.apache.hadoop.yarn.submarine.common.Envs;
|
|
import org.apache.hadoop.yarn.submarine.common.Envs;
|
|
@@ -40,10 +40,14 @@ import org.slf4j.LoggerFactory;
|
|
|
|
|
|
import java.io.File;
|
|
import java.io.File;
|
|
import java.io.FileNotFoundException;
|
|
import java.io.FileNotFoundException;
|
|
-import java.io.FileWriter;
|
|
|
|
|
|
+import java.io.FileOutputStream;
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
|
|
+import java.io.OutputStreamWriter;
|
|
|
|
+import java.io.PrintWriter;
|
|
|
|
+import java.io.Writer;
|
|
import java.util.HashMap;
|
|
import java.util.HashMap;
|
|
import java.util.HashSet;
|
|
import java.util.HashSet;
|
|
|
|
+import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
import java.util.Set;
|
|
import java.util.Set;
|
|
import java.util.StringTokenizer;
|
|
import java.util.StringTokenizer;
|
|
@@ -54,6 +58,7 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY
|
|
* Submit a job to cluster
|
|
* Submit a job to cluster
|
|
*/
|
|
*/
|
|
public class YarnServiceJobSubmitter implements JobSubmitter {
|
|
public class YarnServiceJobSubmitter implements JobSubmitter {
|
|
|
|
+ public static final String TENSORBOARD_QUICKLINK_LABEL = "Tensorboard";
|
|
private static final Logger LOG =
|
|
private static final Logger LOG =
|
|
LoggerFactory.getLogger(YarnServiceJobSubmitter.class);
|
|
LoggerFactory.getLogger(YarnServiceJobSubmitter.class);
|
|
ClientContext clientContext;
|
|
ClientContext clientContext;
|
|
@@ -98,7 +103,7 @@ public class YarnServiceJobSubmitter implements JobSubmitter {
|
|
}
|
|
}
|
|
|
|
|
|
private void addHdfsClassPathIfNeeded(RunJobParameters parameters,
|
|
private void addHdfsClassPathIfNeeded(RunJobParameters parameters,
|
|
- FileWriter fw, Component comp) throws IOException {
|
|
|
|
|
|
+ PrintWriter fw, Component comp) throws IOException {
|
|
// Find envs to use HDFS
|
|
// Find envs to use HDFS
|
|
String hdfsHome = null;
|
|
String hdfsHome = null;
|
|
String javaHome = null;
|
|
String javaHome = null;
|
|
@@ -191,7 +196,8 @@ public class YarnServiceJobSubmitter implements JobSubmitter {
|
|
envs.put(Envs.TASK_TYPE_ENV, taskType.name());
|
|
envs.put(Envs.TASK_TYPE_ENV, taskType.name());
|
|
}
|
|
}
|
|
|
|
|
|
- private String getUserName() {
|
|
|
|
|
|
+ @VisibleForTesting
|
|
|
|
+ protected String getUserName() {
|
|
return System.getProperty("user.name");
|
|
return System.getProperty("user.name");
|
|
}
|
|
}
|
|
|
|
|
|
@@ -205,18 +211,19 @@ public class YarnServiceJobSubmitter implements JobSubmitter {
|
|
private String generateCommandLaunchScript(RunJobParameters parameters,
|
|
private String generateCommandLaunchScript(RunJobParameters parameters,
|
|
TaskType taskType, Component comp) throws IOException {
|
|
TaskType taskType, Component comp) throws IOException {
|
|
File file = File.createTempFile(taskType.name() + "-launch-script", ".sh");
|
|
File file = File.createTempFile(taskType.name() + "-launch-script", ".sh");
|
|
- FileWriter fw = new FileWriter(file);
|
|
|
|
|
|
+ Writer w = new OutputStreamWriter(new FileOutputStream(file), "UTF-8");
|
|
|
|
+ PrintWriter pw = new PrintWriter(w);
|
|
|
|
|
|
try {
|
|
try {
|
|
- fw.append("#!/bin/bash\n");
|
|
|
|
|
|
+ pw.append("#!/bin/bash\n");
|
|
|
|
|
|
- addHdfsClassPathIfNeeded(parameters, fw, comp);
|
|
|
|
|
|
+ addHdfsClassPathIfNeeded(parameters, pw, comp);
|
|
|
|
|
|
if (taskType.equals(TaskType.TENSORBOARD)) {
|
|
if (taskType.equals(TaskType.TENSORBOARD)) {
|
|
String tbCommand =
|
|
String tbCommand =
|
|
"export LC_ALL=C && tensorboard --logdir=" + parameters
|
|
"export LC_ALL=C && tensorboard --logdir=" + parameters
|
|
.getCheckpointPath();
|
|
.getCheckpointPath();
|
|
- fw.append(tbCommand + "\n");
|
|
|
|
|
|
+ pw.append(tbCommand + "\n");
|
|
LOG.info("Tensorboard command=" + tbCommand);
|
|
LOG.info("Tensorboard command=" + tbCommand);
|
|
} else{
|
|
} else{
|
|
// When distributed training is required
|
|
// When distributed training is required
|
|
@@ -226,20 +233,20 @@ public class YarnServiceJobSubmitter implements JobSubmitter {
|
|
taskType.getComponentName(), parameters.getNumWorkers(),
|
|
taskType.getComponentName(), parameters.getNumWorkers(),
|
|
parameters.getNumPS(), parameters.getName(), getUserName(),
|
|
parameters.getNumPS(), parameters.getName(), getUserName(),
|
|
getDNSDomain());
|
|
getDNSDomain());
|
|
- fw.append("export TF_CONFIG=\"" + tfConfigEnv + "\"\n");
|
|
|
|
|
|
+ pw.append("export TF_CONFIG=\"" + tfConfigEnv + "\"\n");
|
|
}
|
|
}
|
|
|
|
|
|
// Print launch command
|
|
// Print launch command
|
|
if (taskType.equals(TaskType.WORKER) || taskType.equals(
|
|
if (taskType.equals(TaskType.WORKER) || taskType.equals(
|
|
TaskType.PRIMARY_WORKER)) {
|
|
TaskType.PRIMARY_WORKER)) {
|
|
- fw.append(parameters.getWorkerLaunchCmd() + '\n');
|
|
|
|
|
|
+ pw.append(parameters.getWorkerLaunchCmd() + '\n');
|
|
|
|
|
|
if (SubmarineLogs.isVerbose()) {
|
|
if (SubmarineLogs.isVerbose()) {
|
|
LOG.info(
|
|
LOG.info(
|
|
"Worker command =[" + parameters.getWorkerLaunchCmd() + "]");
|
|
"Worker command =[" + parameters.getWorkerLaunchCmd() + "]");
|
|
}
|
|
}
|
|
} else if (taskType.equals(TaskType.PS)) {
|
|
} else if (taskType.equals(TaskType.PS)) {
|
|
- fw.append(parameters.getPSLaunchCmd() + '\n');
|
|
|
|
|
|
+ pw.append(parameters.getPSLaunchCmd() + '\n');
|
|
|
|
|
|
if (SubmarineLogs.isVerbose()) {
|
|
if (SubmarineLogs.isVerbose()) {
|
|
LOG.info("PS command =[" + parameters.getPSLaunchCmd() + "]");
|
|
LOG.info("PS command =[" + parameters.getPSLaunchCmd() + "]");
|
|
@@ -247,7 +254,7 @@ public class YarnServiceJobSubmitter implements JobSubmitter {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
} finally {
|
|
} finally {
|
|
- fw.close();
|
|
|
|
|
|
+ pw.close();
|
|
}
|
|
}
|
|
return file.getAbsolutePath();
|
|
return file.getAbsolutePath();
|
|
}
|
|
}
|
|
@@ -421,18 +428,51 @@ public class YarnServiceJobSubmitter implements JobSubmitter {
|
|
return new Artifact().type(Artifact.TypeEnum.DOCKER).id(dockerImageName);
|
|
return new Artifact().type(Artifact.TypeEnum.DOCKER).id(dockerImageName);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private void handleQuicklinks(RunJobParameters runJobParameters)
|
|
|
|
+ throws IOException {
|
|
|
|
+ List<Quicklink> quicklinks = runJobParameters.getQuicklinks();
|
|
|
|
+ if (null != quicklinks && !quicklinks.isEmpty()) {
|
|
|
|
+ for (Quicklink ql : quicklinks) {
|
|
|
|
+ // Make sure it is a valid instance name
|
|
|
|
+ String instanceName = ql.getComponentInstanceName();
|
|
|
|
+ boolean found = false;
|
|
|
|
+
|
|
|
|
+ for (Component comp : serviceSpec.getComponents()) {
|
|
|
|
+ for (int i = 0; i < comp.getNumberOfContainers(); i++) {
|
|
|
|
+ String possibleInstanceName = comp.getName() + "-" + i;
|
|
|
|
+ if (possibleInstanceName.equals(instanceName)) {
|
|
|
|
+ found = true;
|
|
|
|
+ break;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if (!found) {
|
|
|
|
+ throw new IOException(
|
|
|
|
+ "Couldn't find a component instance = " + instanceName
|
|
|
|
+ + " while adding quicklink");
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ String link = ql.getProtocol() + YarnServiceUtils.getDNSName(
|
|
|
|
+ serviceSpec.getName(), instanceName, getUserName(), getDNSDomain(),
|
|
|
|
+ ql.getPort());
|
|
|
|
+ YarnServiceUtils.addQuicklink(serviceSpec, ql.getLabel(), link);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
private Service createServiceByParameters(RunJobParameters parameters)
|
|
private Service createServiceByParameters(RunJobParameters parameters)
|
|
throws IOException {
|
|
throws IOException {
|
|
componentToLocalLaunchScriptPath.clear();
|
|
componentToLocalLaunchScriptPath.clear();
|
|
- Service service = new Service();
|
|
|
|
- service.setName(parameters.getName());
|
|
|
|
- service.setVersion(String.valueOf(System.currentTimeMillis()));
|
|
|
|
- service.setArtifact(getDockerArtifact(parameters.getDockerImageName()));
|
|
|
|
|
|
+ serviceSpec = new Service();
|
|
|
|
+ serviceSpec.setName(parameters.getName());
|
|
|
|
+ serviceSpec.setVersion(String.valueOf(System.currentTimeMillis()));
|
|
|
|
+ serviceSpec.setArtifact(getDockerArtifact(parameters.getDockerImageName()));
|
|
|
|
|
|
- handleServiceEnvs(service, parameters);
|
|
|
|
|
|
+ handleServiceEnvs(serviceSpec, parameters);
|
|
|
|
|
|
if (parameters.getNumWorkers() > 0) {
|
|
if (parameters.getNumWorkers() > 0) {
|
|
- addWorkerComponents(service, parameters);
|
|
|
|
|
|
+ addWorkerComponents(serviceSpec, parameters);
|
|
}
|
|
}
|
|
|
|
|
|
if (parameters.getNumPS() > 0) {
|
|
if (parameters.getNumPS() > 0) {
|
|
@@ -450,7 +490,7 @@ public class YarnServiceJobSubmitter implements JobSubmitter {
|
|
getDockerArtifact(parameters.getPsDockerImage()));
|
|
getDockerArtifact(parameters.getPsDockerImage()));
|
|
}
|
|
}
|
|
handleLaunchCommand(parameters, TaskType.PS, psComponent);
|
|
handleLaunchCommand(parameters, TaskType.PS, psComponent);
|
|
- service.addComponent(psComponent);
|
|
|
|
|
|
+ serviceSpec.addComponent(psComponent);
|
|
}
|
|
}
|
|
|
|
|
|
if (parameters.isTensorboardEnabled()) {
|
|
if (parameters.isTensorboardEnabled()) {
|
|
@@ -470,14 +510,20 @@ public class YarnServiceJobSubmitter implements JobSubmitter {
|
|
|
|
|
|
// Add tensorboard to quicklink
|
|
// Add tensorboard to quicklink
|
|
String tensorboardLink = "http://" + YarnServiceUtils.getDNSName(
|
|
String tensorboardLink = "http://" + YarnServiceUtils.getDNSName(
|
|
- parameters.getName(), TaskType.TENSORBOARD.getComponentName(), 0,
|
|
|
|
- getUserName(), getDNSDomain(), 6006);
|
|
|
|
|
|
+ parameters.getName(),
|
|
|
|
+ TaskType.TENSORBOARD.getComponentName() + "-" + 0, getUserName(),
|
|
|
|
+ getDNSDomain(), 6006);
|
|
LOG.info("Link to tensorboard:" + tensorboardLink);
|
|
LOG.info("Link to tensorboard:" + tensorboardLink);
|
|
- service.addComponent(tbComponent);
|
|
|
|
- service.setQuicklinks(ImmutableMap.of("Tensorboard", tensorboardLink));
|
|
|
|
|
|
+ serviceSpec.addComponent(tbComponent);
|
|
|
|
+
|
|
|
|
+ YarnServiceUtils.addQuicklink(serviceSpec, TENSORBOARD_QUICKLINK_LABEL,
|
|
|
|
+ tensorboardLink);
|
|
}
|
|
}
|
|
|
|
|
|
- return service;
|
|
|
|
|
|
+ // After all components added, handle quicklinks
|
|
|
|
+ handleQuicklinks(parameters);
|
|
|
|
+
|
|
|
|
+ return serviceSpec;
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -486,12 +532,11 @@ public class YarnServiceJobSubmitter implements JobSubmitter {
|
|
@Override
|
|
@Override
|
|
public ApplicationId submitJob(RunJobParameters parameters)
|
|
public ApplicationId submitJob(RunJobParameters parameters)
|
|
throws IOException, YarnException {
|
|
throws IOException, YarnException {
|
|
- Service service = createServiceByParameters(parameters);
|
|
|
|
|
|
+ createServiceByParameters(parameters);
|
|
ServiceClient serviceClient = YarnServiceUtils.createServiceClient(
|
|
ServiceClient serviceClient = YarnServiceUtils.createServiceClient(
|
|
clientContext.getYarnConfig());
|
|
clientContext.getYarnConfig());
|
|
- ApplicationId appid = serviceClient.actionCreate(service);
|
|
|
|
|
|
+ ApplicationId appid = serviceClient.actionCreate(serviceSpec);
|
|
serviceClient.stop();
|
|
serviceClient.stop();
|
|
- this.serviceSpec = service;
|
|
|
|
return appid;
|
|
return appid;
|
|
}
|
|
}
|
|
|
|
|