|
|
@@ -0,0 +1,274 @@
|
|
|
+/*
|
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one
|
|
|
+ * or more contributor license agreements. See the NOTICE file
|
|
|
+ * distributed with this work for additional information
|
|
|
+ * regarding copyright ownership. The ASF licenses this file
|
|
|
+ * to you under the Apache License, Version 2.0 (the
|
|
|
+ * "License"); you may not use this file except in compliance
|
|
|
+ * with the License. You may obtain a copy of the License at
|
|
|
+ *
|
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
|
+ *
|
|
|
+ * Unless required by applicable law or agreed to in writing,
|
|
|
+ * software distributed under the License is distributed on an
|
|
|
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
|
+ * KIND, either express or implied. See the License for the
|
|
|
+ * specific language governing permissions and limitations
|
|
|
+ * under the License.
|
|
|
+ */
|
|
|
+package org.apache.ambari.infra.manager;
|
|
|
+
|
|
|
+import com.google.common.collect.Lists;
|
|
|
+import org.apache.ambari.infra.model.ExecutionContextResponse;
|
|
|
+import org.apache.ambari.infra.model.JobDetailsResponse;
|
|
|
+import org.apache.ambari.infra.model.JobExecutionDetailsResponse;
|
|
|
+import org.apache.ambari.infra.model.JobExecutionInfoResponse;
|
|
|
+import org.apache.ambari.infra.model.JobInstanceDetailsResponse;
|
|
|
+import org.apache.ambari.infra.model.JobOperationParams;
|
|
|
+import org.apache.ambari.infra.model.StepExecutionContextResponse;
|
|
|
+import org.apache.ambari.infra.model.StepExecutionInfoResponse;
|
|
|
+import org.apache.ambari.infra.model.StepExecutionProgressResponse;
|
|
|
+import org.springframework.batch.admin.history.StepExecutionHistory;
|
|
|
+import org.springframework.batch.admin.service.JobService;
|
|
|
+import org.springframework.batch.admin.service.NoSuchStepExecutionException;
|
|
|
+import org.springframework.batch.admin.web.JobInfo;
|
|
|
+import org.springframework.batch.admin.web.StepExecutionProgress;
|
|
|
+import org.springframework.batch.core.JobExecution;
|
|
|
+import org.springframework.batch.core.JobInstance;
|
|
|
+import org.springframework.batch.core.JobParametersBuilder;
|
|
|
+import org.springframework.batch.core.JobParametersInvalidException;
|
|
|
+import org.springframework.batch.core.StepExecution;
|
|
|
+import org.springframework.batch.core.launch.JobExecutionNotRunningException;
|
|
|
+import org.springframework.batch.core.launch.JobInstanceAlreadyExistsException;
|
|
|
+import org.springframework.batch.core.launch.JobOperator;
|
|
|
+import org.springframework.batch.core.launch.NoSuchJobException;
|
|
|
+import org.springframework.batch.core.launch.NoSuchJobExecutionException;
|
|
|
+import org.springframework.batch.core.launch.NoSuchJobInstanceException;
|
|
|
+import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
|
|
|
+import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
|
|
|
+import org.springframework.batch.core.repository.JobRestartException;
|
|
|
+
|
|
|
+import javax.inject.Inject;
|
|
|
+import javax.inject.Named;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.Collection;
|
|
|
+import java.util.Collections;
|
|
|
+import java.util.Comparator;
|
|
|
+import java.util.Date;
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.Set;
|
|
|
+import java.util.TimeZone;
|
|
|
+
|
|
|
+@Named
|
|
|
+public class JobManager {
|
|
|
+
|
|
|
+ @Inject
|
|
|
+ private JobService jobService;
|
|
|
+
|
|
|
+ @Inject
|
|
|
+ private JobOperator jobOperator;
|
|
|
+
|
|
|
+ private TimeZone timeZone = TimeZone.getDefault();
|
|
|
+
|
|
|
+ public Set<String> getAllJobNames() {
|
|
|
+ return jobOperator.getJobNames();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Launch a new job instance (based on job name) and applies customized parameters to it.
|
|
|
+ * Also add a new date parameter to make sure the job instance will be unique
|
|
|
+ */
|
|
|
+ public JobExecutionInfoResponse launchJob(String jobName, String params)
|
|
|
+ throws JobParametersInvalidException, JobInstanceAlreadyExistsException, NoSuchJobException,
|
|
|
+ JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
|
|
|
+ // TODO: handle params
|
|
|
+ JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
|
|
|
+ jobParametersBuilder.addDate("date", new Date());
|
|
|
+ return new JobExecutionInfoResponse(jobService.launch(jobName, jobParametersBuilder.toJobParameters()), timeZone);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get all executions ids that mapped to specific job name,
|
|
|
+ */
|
|
|
+ public Set<Long> getExecutionIdsByJobName(String jobName) throws NoSuchJobException {
|
|
|
+ return jobOperator.getRunningExecutions(jobName);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Stop all running job executions and returns with the number of stopped jobs.
|
|
|
+ */
|
|
|
+ public Integer stopAllJobs() {
|
|
|
+ return jobService.stopAll();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Gather job execution details by job execution id.
|
|
|
+ */
|
|
|
+ public JobExecutionDetailsResponse getExectionInfo(Long jobExecutionId) throws NoSuchJobExecutionException {
|
|
|
+ JobExecution jobExecution = jobService.getJobExecution(jobExecutionId);
|
|
|
+ List<StepExecutionInfoResponse> stepExecutionInfos = new ArrayList<StepExecutionInfoResponse>();
|
|
|
+ for (StepExecution stepExecution : jobExecution.getStepExecutions()) {
|
|
|
+ stepExecutionInfos.add(new StepExecutionInfoResponse(stepExecution, timeZone));
|
|
|
+ }
|
|
|
+ Collections.sort(stepExecutionInfos, new Comparator<StepExecutionInfoResponse>() {
|
|
|
+ @Override
|
|
|
+ public int compare(StepExecutionInfoResponse o1, StepExecutionInfoResponse o2) {
|
|
|
+ return o1.getId().compareTo(o2.getId());
|
|
|
+ }
|
|
|
+ });
|
|
|
+ return new JobExecutionDetailsResponse(new JobExecutionInfoResponse(jobExecution, timeZone), stepExecutionInfos);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Stop or abandon a running job execution by job execution id
|
|
|
+ */
|
|
|
+ public JobExecutionInfoResponse stopOrAbandonJobByExecutionId(Long jobExecutionId, JobOperationParams.JobStopOrAbandonOperationParam operation)
|
|
|
+ throws NoSuchJobExecutionException, JobExecutionNotRunningException, JobExecutionAlreadyRunningException {
|
|
|
+ JobExecution jobExecution;
|
|
|
+ if (JobOperationParams.JobStopOrAbandonOperationParam.STOP.equals(operation)) {
|
|
|
+ jobExecution = jobService.stop(jobExecutionId);
|
|
|
+ } else if (JobOperationParams.JobStopOrAbandonOperationParam.ABANDON.equals(operation)) {
|
|
|
+ jobExecution = jobService.abandon(jobExecutionId);
|
|
|
+ } else {
|
|
|
+ throw new UnsupportedOperationException("Unsupported operaration");
|
|
|
+ }
|
|
|
+ return new JobExecutionInfoResponse(jobExecution, timeZone);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get execution context for a job execution instance. (context can be shipped between job executions)
|
|
|
+ */
|
|
|
+ public ExecutionContextResponse getExecutionContextByJobExecutionId(Long executionId) throws NoSuchJobExecutionException {
|
|
|
+ JobExecution jobExecution = jobService.getJobExecution(executionId);
|
|
|
+ Map<String, Object> executionMap = new HashMap<>();
|
|
|
+ for (Map.Entry<String, Object> entry : jobExecution.getExecutionContext().entrySet()) {
|
|
|
+ executionMap.put(entry.getKey(), entry.getValue());
|
|
|
+ }
|
|
|
+ return new ExecutionContextResponse(executionId, executionMap);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Restart a specific job instance with the same parameters. (only restart operation is supported here)
|
|
|
+ */
|
|
|
+ public JobExecutionInfoResponse restart(Long jobInstanceId, String jobName,
|
|
|
+ JobOperationParams.JobRestartOperationParam operation) throws NoSuchJobException, JobParametersInvalidException,
|
|
|
+ JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, NoSuchJobExecutionException {
|
|
|
+ if (JobOperationParams.JobRestartOperationParam.RESTART.equals(operation)) {
|
|
|
+ Collection<JobExecution> jobExecutions = jobService.getJobExecutionsForJobInstance(jobName, jobInstanceId);
|
|
|
+ JobExecution jobExecution = jobExecutions.iterator().next();
|
|
|
+ Long jobExecutionId = jobExecution.getId();
|
|
|
+ return new JobExecutionInfoResponse(jobService.restart(jobExecutionId), timeZone);
|
|
|
+ } else {
|
|
|
+ throw new UnsupportedOperationException("Unsupported operation (try: RESTART)");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get all job details. (paged)
|
|
|
+ */
|
|
|
+ public List<JobInfo> getAllJobs(int start, int pageSize) {
|
|
|
+ List<JobInfo> jobs = new ArrayList<>();
|
|
|
+ Collection<String> names = jobService.listJobs(start, pageSize);
|
|
|
+ for (String name : names) {
|
|
|
+ int count = 0;
|
|
|
+ try {
|
|
|
+ count = jobService.countJobExecutionsForJob(name);
|
|
|
+ }
|
|
|
+ catch (NoSuchJobException e) {
|
|
|
+ // shouldn't happen
|
|
|
+ }
|
|
|
+ boolean launchable = jobService.isLaunchable(name);
|
|
|
+ boolean incrementable = jobService.isIncrementable(name);
|
|
|
+ jobs.add(new JobInfo(name, count, null, launchable, incrementable));
|
|
|
+ }
|
|
|
+ return jobs;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get all executions for unique job instance.
|
|
|
+ */
|
|
|
+ public List<JobExecutionInfoResponse> getExecutionsForJobInstance(String jobName, Long jobInstanceId) throws NoSuchJobInstanceException, NoSuchJobException {
|
|
|
+ List<JobExecutionInfoResponse> result = Lists.newArrayList();
|
|
|
+ JobInstance jobInstance = jobService.getJobInstance(jobInstanceId);
|
|
|
+ Collection<JobExecution> jobExecutions = jobService.getJobExecutionsForJobInstance(jobName, jobInstance.getInstanceId());
|
|
|
+ for (JobExecution jobExecution : jobExecutions) {
|
|
|
+ result.add(new JobExecutionInfoResponse(jobExecution, timeZone));
|
|
|
+ }
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get job details for a specific job. (paged)
|
|
|
+ */
|
|
|
+ public JobDetailsResponse getJobDetails(String jobName, int page, int size) throws NoSuchJobException {
|
|
|
+ List<JobInstanceDetailsResponse> jobInstanceResponses = Lists.newArrayList();
|
|
|
+ Collection<JobInstance> jobInstances = jobService.listJobInstances(jobName, page, size);
|
|
|
+
|
|
|
+ int count = jobService.countJobExecutionsForJob(jobName);
|
|
|
+ boolean launchable = jobService.isLaunchable(jobName);
|
|
|
+ boolean isIncrementable = jobService.isIncrementable(jobName);
|
|
|
+
|
|
|
+ for (JobInstance jobInstance: jobInstances) {
|
|
|
+ List<JobExecutionInfoResponse> executionInfos = Lists.newArrayList();
|
|
|
+ Collection<JobExecution> jobExecutions = jobService.getJobExecutionsForJobInstance(jobName, jobInstance.getId());
|
|
|
+ if (jobExecutions != null) {
|
|
|
+ for (JobExecution jobExecution : jobExecutions) {
|
|
|
+ executionInfos.add(new JobExecutionInfoResponse(jobExecution, timeZone));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ jobInstanceResponses.add(new JobInstanceDetailsResponse(jobInstance, executionInfos));
|
|
|
+ }
|
|
|
+ return new JobDetailsResponse(new JobInfo(jobName, count, launchable, isIncrementable), jobInstanceResponses);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get step execution details based for job execution id and step execution id.
|
|
|
+ */
|
|
|
+ public StepExecutionInfoResponse getStepExecution(Long jobExecutionId, Long stepExecutionId) throws NoSuchStepExecutionException, NoSuchJobExecutionException {
|
|
|
+ StepExecution stepExecution = jobService.getStepExecution(jobExecutionId, stepExecutionId);
|
|
|
+ return new StepExecutionInfoResponse(stepExecution, timeZone);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get step execution context details. (execution context can be shipped between steps)
|
|
|
+ */
|
|
|
+ public StepExecutionContextResponse getStepExecutionContext(Long jobExecutionId, Long stepExecutionId) throws NoSuchStepExecutionException, NoSuchJobExecutionException {
|
|
|
+ StepExecution stepExecution = jobService.getStepExecution(jobExecutionId, stepExecutionId);
|
|
|
+ Map<String, Object> executionMap = new HashMap<>();
|
|
|
+ for (Map.Entry<String, Object> entry : stepExecution.getExecutionContext().entrySet()) {
|
|
|
+ executionMap.put(entry.getKey(), entry.getValue());
|
|
|
+ }
|
|
|
+ return new StepExecutionContextResponse(executionMap, jobExecutionId, stepExecutionId, stepExecution.getStepName());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get step execution progress status detauls.
|
|
|
+ */
|
|
|
+ public StepExecutionProgressResponse getStepExecutionProgress(Long jobExecutionId, Long stepExecutionId) throws NoSuchStepExecutionException, NoSuchJobExecutionException {
|
|
|
+ StepExecution stepExecution = jobService.getStepExecution(jobExecutionId, stepExecutionId);
|
|
|
+ StepExecutionInfoResponse stepExecutionInfoResponse = new StepExecutionInfoResponse(stepExecution, timeZone);
|
|
|
+ String stepName = stepExecution.getStepName();
|
|
|
+ if (stepName.contains(":partition")) {
|
|
|
+ stepName = stepName.replaceAll("(:partition).*", "$1*");
|
|
|
+ }
|
|
|
+ String jobName = stepExecution.getJobExecution().getJobInstance().getJobName();
|
|
|
+ StepExecutionHistory stepExecutionHistory = computeHistory(jobName, stepName);
|
|
|
+ StepExecutionProgress stepExecutionProgress = new StepExecutionProgress(stepExecution, stepExecutionHistory);
|
|
|
+
|
|
|
+ return new StepExecutionProgressResponse(stepExecutionProgress, stepExecutionHistory, stepExecutionInfoResponse);
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ private StepExecutionHistory computeHistory(String jobName, String stepName) {
|
|
|
+ int total = jobService.countStepExecutionsForStep(jobName, stepName);
|
|
|
+ StepExecutionHistory stepExecutionHistory = new StepExecutionHistory(stepName);
|
|
|
+ for (int i = 0; i < total; i += 1000) {
|
|
|
+ for (StepExecution stepExecution : jobService.listStepExecutionsForStep(jobName, stepName, i, 1000)) {
|
|
|
+ stepExecutionHistory.append(stepExecution);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return stepExecutionHistory;
|
|
|
+ }
|
|
|
+}
|