|
@@ -1,3 +1,19 @@
|
|
|
+/*
|
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one
|
|
|
+ * or more contributor license agreements. See the NOTICE file
|
|
|
+ * distributed with this work for additional information
|
|
|
+ * regarding copyright ownership. The ASF licenses this file
|
|
|
+ * to you under the Apache License, Version 2.0 (the
|
|
|
+ * "License"); you may not use this file except in compliance
|
|
|
+ * with the License. You may obtain a copy of the License at
|
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
+ * See the License for the specific language governing permissions and
|
|
|
+ * limitations under the License.
|
|
|
+ */
|
|
|
+
|
|
|
/**
|
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
* you may not use this file except in compliance with the License.
|
|
@@ -12,7 +28,7 @@
|
|
|
* limitations under the License. See accompanying LICENSE file.
|
|
|
*/
|
|
|
|
|
|
-package org.apache.hadoop.yarn.submarine.client.cli;
|
|
|
+package org.apache.hadoop.yarn.submarine.client.cli.runjob;
|
|
|
|
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
|
import org.apache.commons.cli.CommandLine;
|
|
@@ -23,9 +39,13 @@ import org.apache.commons.cli.ParseException;
|
|
|
import org.apache.commons.io.FileUtils;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
|
+import org.apache.hadoop.yarn.submarine.client.cli.AbstractCli;
|
|
|
+import org.apache.hadoop.yarn.submarine.client.cli.CliConstants;
|
|
|
+import org.apache.hadoop.yarn.submarine.client.cli.CliUtils;
|
|
|
+import org.apache.hadoop.yarn.submarine.client.cli.Command;
|
|
|
import org.apache.hadoop.yarn.submarine.client.cli.param.ParametersHolder;
|
|
|
-import org.apache.hadoop.yarn.submarine.client.cli.param.RunJobParameters;
|
|
|
-import org.apache.hadoop.yarn.submarine.client.cli.param.RunJobParameters.UnderscoreConverterPropertyUtils;
|
|
|
+import org.apache.hadoop.yarn.submarine.client.cli.param.runjob.RunJobParameters;
|
|
|
+import org.apache.hadoop.yarn.submarine.client.cli.param.runjob.RunJobParameters.UnderscoreConverterPropertyUtils;
|
|
|
import org.apache.hadoop.yarn.submarine.client.cli.param.yaml.YamlConfigFile;
|
|
|
import org.apache.hadoop.yarn.submarine.client.cli.param.yaml.YamlParseException;
|
|
|
import org.apache.hadoop.yarn.submarine.common.ClientContext;
|
|
@@ -44,17 +64,25 @@ import java.io.IOException;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.Map;
|
|
|
|
|
|
+/**
|
|
|
+ * The purpose of this class is to handle / parse CLI arguments related to
|
|
|
+ * the run job Submarine command.
|
|
|
+ */
|
|
|
public class RunJobCli extends AbstractCli {
|
|
|
private static final Logger LOG =
|
|
|
LoggerFactory.getLogger(RunJobCli.class);
|
|
|
- private static final String YAML_PARSE_FAILED = "Failed to parse " +
|
|
|
+ private static final String CAN_BE_USED_WITH_TF_PYTORCH =
|
|
|
+ "Can be used with TensorFlow or PyTorch frameworks.";
|
|
|
+ private static final String CAN_BE_USED_WITH_TF_ONLY =
|
|
|
+ "Can only be used with TensorFlow framework.";
|
|
|
+ public static final String YAML_PARSE_FAILED = "Failed to parse " +
|
|
|
"YAML config";
|
|
|
|
|
|
- private Options options;
|
|
|
- private RunJobParameters parameters = new RunJobParameters();
|
|
|
|
|
|
+ private Options options;
|
|
|
private JobSubmitter jobSubmitter;
|
|
|
private JobMonitor jobMonitor;
|
|
|
+ private ParametersHolder parametersHolder;
|
|
|
|
|
|
public RunJobCli(ClientContext cliContext) {
|
|
|
this(cliContext, cliContext.getRuntimeFactory().getJobSubmitterInstance(),
|
|
@@ -62,7 +90,7 @@ public class RunJobCli extends AbstractCli {
|
|
|
}
|
|
|
|
|
|
@VisibleForTesting
|
|
|
- RunJobCli(ClientContext cliContext, JobSubmitter jobSubmitter,
|
|
|
+ public RunJobCli(ClientContext cliContext, JobSubmitter jobSubmitter,
|
|
|
JobMonitor jobMonitor) {
|
|
|
super(cliContext);
|
|
|
this.options = generateOptions();
|
|
@@ -78,6 +106,10 @@ public class RunJobCli extends AbstractCli {
|
|
|
Options options = new Options();
|
|
|
options.addOption(CliConstants.YAML_CONFIG, true,
|
|
|
"Config file (in YAML format)");
|
|
|
+ options.addOption(CliConstants.FRAMEWORK, true,
|
|
|
+ String.format("Framework to use. Valid values are: %s! " +
|
|
|
+ "The default framework is Tensorflow.",
|
|
|
+ Framework.getValues()));
|
|
|
options.addOption(CliConstants.NAME, true, "Name of the job");
|
|
|
options.addOption(CliConstants.INPUT_PATH, true,
|
|
|
"Input of the job, could be local or other FS directory");
|
|
@@ -88,48 +120,22 @@ public class RunJobCli extends AbstractCli {
|
|
|
options.addOption(CliConstants.SAVED_MODEL_PATH, true,
|
|
|
"Model exported path (savedmodel) of the job, which is needed when "
|
|
|
+ "exported model is not placed under ${checkpoint_path}"
|
|
|
- + "could be local or other FS directory. This will be used to serve.");
|
|
|
- options.addOption(CliConstants.N_WORKERS, true,
|
|
|
- "Number of worker tasks of the job, by default it's 1");
|
|
|
- options.addOption(CliConstants.N_PS, true,
|
|
|
- "Number of PS tasks of the job, by default it's 0");
|
|
|
- options.addOption(CliConstants.WORKER_RES, true,
|
|
|
- "Resource of each worker, for example "
|
|
|
- + "memory-mb=2048,vcores=2,yarn.io/gpu=2");
|
|
|
- options.addOption(CliConstants.PS_RES, true,
|
|
|
- "Resource of each PS, for example "
|
|
|
- + "memory-mb=2048,vcores=2,yarn.io/gpu=2");
|
|
|
+ + "could be local or other FS directory. " +
|
|
|
+ "This will be used to serve.");
|
|
|
options.addOption(CliConstants.DOCKER_IMAGE, true, "Docker image name/tag");
|
|
|
options.addOption(CliConstants.QUEUE, true,
|
|
|
"Name of queue to run the job, by default it uses default queue");
|
|
|
- options.addOption(CliConstants.TENSORBOARD, false,
|
|
|
- "Should we run TensorBoard"
|
|
|
- + " for this job? By default it's disabled");
|
|
|
- options.addOption(CliConstants.TENSORBOARD_RESOURCES, true,
|
|
|
- "Specify resources of Tensorboard, by default it is "
|
|
|
- + CliConstants.TENSORBOARD_DEFAULT_RESOURCES);
|
|
|
- options.addOption(CliConstants.TENSORBOARD_DOCKER_IMAGE, true,
|
|
|
- "Specify Tensorboard docker image. when this is not "
|
|
|
- + "specified, Tensorboard " + "uses --" + CliConstants.DOCKER_IMAGE
|
|
|
- + " as default.");
|
|
|
- options.addOption(CliConstants.WORKER_LAUNCH_CMD, true,
|
|
|
- "Commandline of worker, arguments will be "
|
|
|
- + "directly used to launch the worker");
|
|
|
- options.addOption(CliConstants.PS_LAUNCH_CMD, true,
|
|
|
- "Commandline of worker, arguments will be "
|
|
|
- + "directly used to launch the PS");
|
|
|
+
|
|
|
+ addWorkerOptions(options);
|
|
|
+ addPSOptions(options);
|
|
|
+ addTensorboardOptions(options);
|
|
|
+
|
|
|
options.addOption(CliConstants.ENV, true,
|
|
|
"Common environment variable of worker/ps");
|
|
|
options.addOption(CliConstants.VERBOSE, false,
|
|
|
"Print verbose log for troubleshooting");
|
|
|
options.addOption(CliConstants.WAIT_JOB_FINISH, false,
|
|
|
"Specified when user want to wait the job finish");
|
|
|
- options.addOption(CliConstants.PS_DOCKER_IMAGE, true,
|
|
|
- "Specify docker image for PS, when this is not specified, PS uses --"
|
|
|
- + CliConstants.DOCKER_IMAGE + " as default.");
|
|
|
- options.addOption(CliConstants.WORKER_DOCKER_IMAGE, true,
|
|
|
- "Specify docker image for WORKER, when this is not specified, WORKER "
|
|
|
- + "uses --" + CliConstants.DOCKER_IMAGE + " as default.");
|
|
|
options.addOption(CliConstants.QUICKLINK, true, "Specify quicklink so YARN"
|
|
|
+ "web UI shows link to given role instance and port. When "
|
|
|
+ "--tensorboard is specified, quicklink to tensorboard instance will "
|
|
@@ -172,63 +178,97 @@ public class RunJobCli extends AbstractCli {
|
|
|
return options;
|
|
|
}
|
|
|
|
|
|
- private void replacePatternsInParameters() throws IOException {
|
|
|
- if (parameters.getPSLaunchCmd() != null && !parameters.getPSLaunchCmd()
|
|
|
- .isEmpty()) {
|
|
|
- String afterReplace = CliUtils.replacePatternsInLaunchCommand(
|
|
|
- parameters.getPSLaunchCmd(), parameters,
|
|
|
- clientContext.getRemoteDirectoryManager());
|
|
|
- parameters.setPSLaunchCmd(afterReplace);
|
|
|
- }
|
|
|
+ private void addWorkerOptions(Options options) {
|
|
|
+ options.addOption(CliConstants.N_WORKERS, true,
|
|
|
+ "Number of worker tasks of the job, by default it's 1." +
|
|
|
+ CAN_BE_USED_WITH_TF_PYTORCH);
|
|
|
+ options.addOption(CliConstants.WORKER_DOCKER_IMAGE, true,
|
|
|
+ "Specify docker image for WORKER, when this is not specified, WORKER "
|
|
|
+ + "uses --" + CliConstants.DOCKER_IMAGE + " as default." +
|
|
|
+ CAN_BE_USED_WITH_TF_PYTORCH);
|
|
|
+ options.addOption(CliConstants.WORKER_LAUNCH_CMD, true,
|
|
|
+ "Commandline of worker, arguments will be "
|
|
|
+ + "directly used to launch the worker" +
|
|
|
+ CAN_BE_USED_WITH_TF_PYTORCH);
|
|
|
+ options.addOption(CliConstants.WORKER_RES, true,
|
|
|
+ "Resource of each worker, for example "
|
|
|
+ + "memory-mb=2048,vcores=2,yarn.io/gpu=2" +
|
|
|
+ CAN_BE_USED_WITH_TF_PYTORCH);
|
|
|
+ }
|
|
|
|
|
|
- if (parameters.getWorkerLaunchCmd() != null && !parameters
|
|
|
- .getWorkerLaunchCmd().isEmpty()) {
|
|
|
- String afterReplace = CliUtils.replacePatternsInLaunchCommand(
|
|
|
- parameters.getWorkerLaunchCmd(), parameters,
|
|
|
- clientContext.getRemoteDirectoryManager());
|
|
|
- parameters.setWorkerLaunchCmd(afterReplace);
|
|
|
- }
|
|
|
+ private void addPSOptions(Options options) {
|
|
|
+ options.addOption(CliConstants.N_PS, true,
|
|
|
+ "Number of PS tasks of the job, by default it's 0. " +
|
|
|
+ CAN_BE_USED_WITH_TF_ONLY);
|
|
|
+ options.addOption(CliConstants.PS_DOCKER_IMAGE, true,
|
|
|
+ "Specify docker image for PS, when this is not specified, PS uses --"
|
|
|
+ + CliConstants.DOCKER_IMAGE + " as default." +
|
|
|
+ CAN_BE_USED_WITH_TF_ONLY);
|
|
|
+ options.addOption(CliConstants.PS_LAUNCH_CMD, true,
|
|
|
+ "Commandline of worker, arguments will be "
|
|
|
+ + "directly used to launch the PS" +
|
|
|
+ CAN_BE_USED_WITH_TF_ONLY);
|
|
|
+ options.addOption(CliConstants.PS_RES, true,
|
|
|
+ "Resource of each PS, for example "
|
|
|
+ + "memory-mb=2048,vcores=2,yarn.io/gpu=2" +
|
|
|
+ CAN_BE_USED_WITH_TF_ONLY);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void addTensorboardOptions(Options options) {
|
|
|
+ options.addOption(CliConstants.TENSORBOARD, false,
|
|
|
+ "Should we run TensorBoard"
|
|
|
+ + " for this job? By default it's disabled." +
|
|
|
+ CAN_BE_USED_WITH_TF_ONLY);
|
|
|
+ options.addOption(CliConstants.TENSORBOARD_RESOURCES, true,
|
|
|
+ "Specify resources of Tensorboard, by default it is "
|
|
|
+ + CliConstants.TENSORBOARD_DEFAULT_RESOURCES + "." +
|
|
|
+ CAN_BE_USED_WITH_TF_ONLY);
|
|
|
+ options.addOption(CliConstants.TENSORBOARD_DOCKER_IMAGE, true,
|
|
|
+ "Specify Tensorboard docker image. when this is not "
|
|
|
+ + "specified, Tensorboard " + "uses --" + CliConstants.DOCKER_IMAGE
|
|
|
+ + " as default." +
|
|
|
+ CAN_BE_USED_WITH_TF_ONLY);
|
|
|
}
|
|
|
|
|
|
private void parseCommandLineAndGetRunJobParameters(String[] args)
|
|
|
throws ParseException, IOException, YarnException {
|
|
|
try {
|
|
|
- // Do parsing
|
|
|
GnuParser parser = new GnuParser();
|
|
|
CommandLine cli = parser.parse(options, args);
|
|
|
- ParametersHolder parametersHolder = createParametersHolder(cli);
|
|
|
- parameters.updateParameters(parametersHolder, clientContext);
|
|
|
+ parametersHolder = createParametersHolder(cli);
|
|
|
+ parametersHolder.updateParameters(clientContext);
|
|
|
} catch (ParseException e) {
|
|
|
LOG.error("Exception in parse: {}", e.getMessage());
|
|
|
printUsages();
|
|
|
throw e;
|
|
|
}
|
|
|
-
|
|
|
- // Set default job dir / saved model dir, etc.
|
|
|
- setDefaultDirs();
|
|
|
-
|
|
|
- // replace patterns
|
|
|
- replacePatternsInParameters();
|
|
|
}
|
|
|
|
|
|
- private ParametersHolder createParametersHolder(CommandLine cli) {
|
|
|
+ private ParametersHolder createParametersHolder(CommandLine cli)
|
|
|
+ throws ParseException, YarnException {
|
|
|
String yamlConfigFile =
|
|
|
cli.getOptionValue(CliConstants.YAML_CONFIG);
|
|
|
if (yamlConfigFile != null) {
|
|
|
YamlConfigFile yamlConfig = readYamlConfigFile(yamlConfigFile);
|
|
|
- if (yamlConfig == null) {
|
|
|
- throw new YamlParseException(String.format(
|
|
|
- YAML_PARSE_FAILED + ", file is empty: %s", yamlConfigFile));
|
|
|
- } else if (yamlConfig.getConfigs() == null) {
|
|
|
- throw new YamlParseException(String.format(YAML_PARSE_FAILED +
|
|
|
- ", config section should be defined, but it cannot be found in " +
|
|
|
- "YAML file '%s'!", yamlConfigFile));
|
|
|
- }
|
|
|
+ checkYamlConfig(yamlConfigFile, yamlConfig);
|
|
|
LOG.info("Using YAML configuration!");
|
|
|
- return ParametersHolder.createWithCmdLineAndYaml(cli, yamlConfig);
|
|
|
+ return ParametersHolder.createWithCmdLineAndYaml(cli, yamlConfig,
|
|
|
+ Command.RUN_JOB);
|
|
|
} else {
|
|
|
LOG.info("Using CLI configuration!");
|
|
|
- return ParametersHolder.createWithCmdLine(cli);
|
|
|
+ return ParametersHolder.createWithCmdLine(cli, Command.RUN_JOB);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void checkYamlConfig(String yamlConfigFile,
|
|
|
+ YamlConfigFile yamlConfig) {
|
|
|
+ if (yamlConfig == null) {
|
|
|
+ throw new YamlParseException(String.format(
|
|
|
+ YAML_PARSE_FAILED + ", file is empty: %s", yamlConfigFile));
|
|
|
+ } else if (yamlConfig.getConfigs() == null) {
|
|
|
+ throw new YamlParseException(String.format(YAML_PARSE_FAILED +
|
|
|
+ ", config section should be defined, but it cannot be found in " +
|
|
|
+ "YAML file '%s'!", yamlConfigFile));
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -256,34 +296,9 @@ public class RunJobCli extends AbstractCli {
|
|
|
e);
|
|
|
}
|
|
|
|
|
|
- private void setDefaultDirs() throws IOException {
|
|
|
- // Create directories if needed
|
|
|
- String jobDir = parameters.getCheckpointPath();
|
|
|
- if (null == jobDir) {
|
|
|
- if (parameters.getNumWorkers() > 0) {
|
|
|
- jobDir = clientContext.getRemoteDirectoryManager().getJobCheckpointDir(
|
|
|
- parameters.getName(), true).toString();
|
|
|
- } else {
|
|
|
- // when #workers == 0, it means we only launch TB. In that case,
|
|
|
- // point job dir to root dir so all job's metrics will be shown.
|
|
|
- jobDir = clientContext.getRemoteDirectoryManager().getUserRootFolder()
|
|
|
- .toString();
|
|
|
- }
|
|
|
- parameters.setCheckpointPath(jobDir);
|
|
|
- }
|
|
|
-
|
|
|
- if (parameters.getNumWorkers() > 0) {
|
|
|
- // Only do this when #worker > 0
|
|
|
- String savedModelDir = parameters.getSavedModelPath();
|
|
|
- if (null == savedModelDir) {
|
|
|
- savedModelDir = jobDir;
|
|
|
- parameters.setSavedModelPath(savedModelDir);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private void storeJobInformation(String jobName, ApplicationId applicationId,
|
|
|
- String[] args) throws IOException {
|
|
|
+ private void storeJobInformation(RunJobParameters parameters,
|
|
|
+ ApplicationId applicationId, String[] args) throws IOException {
|
|
|
+ String jobName = parameters.getName();
|
|
|
Map<String, String> jobInfo = new HashMap<>();
|
|
|
jobInfo.put(StorageKeyConstants.JOB_NAME, jobName);
|
|
|
jobInfo.put(StorageKeyConstants.APPLICATION_ID, applicationId.toString());
|
|
@@ -316,8 +331,10 @@ public class RunJobCli extends AbstractCli {
|
|
|
}
|
|
|
|
|
|
parseCommandLineAndGetRunJobParameters(args);
|
|
|
- ApplicationId applicationId = this.jobSubmitter.submitJob(parameters);
|
|
|
- storeJobInformation(parameters.getName(), applicationId, args);
|
|
|
+ ApplicationId applicationId = jobSubmitter.submitJob(parametersHolder);
|
|
|
+ RunJobParameters parameters =
|
|
|
+ (RunJobParameters) parametersHolder.getParameters();
|
|
|
+ storeJobInformation(parameters, applicationId, args);
|
|
|
if (parameters.isWaitJobFinish()) {
|
|
|
this.jobMonitor.waitTrainingFinal(parameters.getName());
|
|
|
}
|
|
@@ -332,6 +349,6 @@ public class RunJobCli extends AbstractCli {
|
|
|
|
|
|
@VisibleForTesting
|
|
|
public RunJobParameters getRunJobParameters() {
|
|
|
- return parameters;
|
|
|
+ return (RunJobParameters) parametersHolder.getParameters();
|
|
|
}
|
|
|
}
|