|
@@ -26,6 +26,7 @@ import org.apache.ambari.view.pig.resources.jobs.models.PigJob;
|
|
|
import org.apache.ambari.view.pig.resources.jobs.utils.JobPolling;
|
|
|
import org.apache.ambari.view.pig.services.BaseService;
|
|
|
import org.apache.ambari.view.pig.templeton.client.TempletonApi;
|
|
|
+import org.apache.ambari.view.pig.utils.HdfsApi;
|
|
|
import org.apache.ambari.view.pig.utils.MisconfigurationFormattedException;
|
|
|
import org.apache.ambari.view.pig.utils.ServiceFormattedException;
|
|
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
|
@@ -89,26 +90,26 @@ public class JobResourceManager extends PersonalCRUDResourceManager<PigJob> {
|
|
|
});
|
|
|
|
|
|
for(PigJob job : notCompleted) {
|
|
|
- JobPolling.pollJob(context, job);
|
|
|
+ JobPolling.pollJob(this, job);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public PigJob create(PigJob object) {
|
|
|
- object.setStatus(PigJob.Status.SUBMITTING);
|
|
|
+ object.setStatus(PigJob.PIG_JOB_STATE_SUBMITTING);
|
|
|
PigJob job = super.create(object);
|
|
|
LOG.debug("Submitting job...");
|
|
|
|
|
|
try {
|
|
|
submitJob(object);
|
|
|
} catch (RuntimeException e) {
|
|
|
- object.setStatus(PigJob.Status.SUBMIT_FAILED);
|
|
|
+ object.setStatus(PigJob.PIG_JOB_STATE_SUBMIT_FAILED);
|
|
|
save(object);
|
|
|
LOG.debug("Job submit FAILED");
|
|
|
throw e;
|
|
|
}
|
|
|
LOG.debug("Job submit OK");
|
|
|
- object.setStatus(PigJob.Status.SUBMITTED);
|
|
|
+ object.setStatus(PigJob.PIG_JOB_STATE_SUBMITTED);
|
|
|
save(object);
|
|
|
return job;
|
|
|
}
|
|
@@ -121,13 +122,17 @@ public class JobResourceManager extends PersonalCRUDResourceManager<PigJob> {
|
|
|
public void killJob(PigJob object) throws IOException {
|
|
|
LOG.debug("Killing job...");
|
|
|
|
|
|
- try {
|
|
|
- getTempletonApi().killJob(object.getJobId());
|
|
|
- } catch (IOException e) {
|
|
|
- LOG.debug("Job kill FAILED");
|
|
|
- throw e;
|
|
|
+ if (object.getJobId() != null) {
|
|
|
+ try {
|
|
|
+ getTempletonApi().killJob(object.getJobId());
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.debug("Job kill FAILED");
|
|
|
+ throw e;
|
|
|
+ }
|
|
|
+ LOG.debug("Job kill OK");
|
|
|
+ } else {
|
|
|
+ LOG.debug("Job was not submitted, ignoring kill request...");
|
|
|
}
|
|
|
- LOG.debug("Job kill OK");
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -136,15 +141,18 @@ public class JobResourceManager extends PersonalCRUDResourceManager<PigJob> {
|
|
|
*/
|
|
|
private void submitJob(PigJob job) {
|
|
|
String date = new SimpleDateFormat("dd-MM-yyyy-HH-mm-ss").format(new Date());
|
|
|
- String statusdir = String.format(context.getProperties().get("dataworker.jobs.path") +
|
|
|
- "/%s/%s_%s", getUsername(),
|
|
|
- job.getTitle().toLowerCase().replaceAll("[^a-zA-Z0-9 ]+", "").replace(" ", "_"),
|
|
|
- date);
|
|
|
+ String jobsBaseDir = context.getProperties().get("jobs.dir");
|
|
|
+ String storeBaseDir = context.getProperties().get("store.dir");
|
|
|
+ if (storeBaseDir == null || storeBaseDir.compareTo("null") == 0 || storeBaseDir.compareTo("") == 0)
|
|
|
+ storeBaseDir = context.getProperties().get("jobs.dir");
|
|
|
+ String jobNameCleaned = job.getTitle().toLowerCase().replaceAll("[^a-zA-Z0-9 ]+", "").replace(" ", "_");
|
|
|
+ String storedir = String.format(storeBaseDir + "/%s_%s", jobNameCleaned, date);
|
|
|
+ String statusdir = String.format(jobsBaseDir + "/%s_%s", jobNameCleaned, date);
|
|
|
|
|
|
- String newPigScriptPath = statusdir + "/script.pig";
|
|
|
- String newSourceFilePath = statusdir + "/source.pig";
|
|
|
- String newPythonScriptPath = statusdir + "/udf.py";
|
|
|
- String templetonParamsFilePath = statusdir + "/params";
|
|
|
+ String newPigScriptPath = storedir + "/script.pig";
|
|
|
+ String newSourceFilePath = storedir + "/source.pig";
|
|
|
+ String newPythonScriptPath = storedir + "/udf.py";
|
|
|
+ String templetonParamsFilePath = storedir + "/params";
|
|
|
try {
|
|
|
// additional file can be passed to copy into work directory
|
|
|
if (job.getSourceFileContent() != null && !job.getSourceFileContent().isEmpty()) {
|
|
@@ -152,13 +160,13 @@ public class JobResourceManager extends PersonalCRUDResourceManager<PigJob> {
|
|
|
job.setSourceFileContent(null); // we should not store content in DB
|
|
|
save(job);
|
|
|
|
|
|
- FSDataOutputStream stream = BaseService.getHdfsApi(context).create(newSourceFilePath, true);
|
|
|
+ FSDataOutputStream stream = HdfsApi.getInstance(context).create(newSourceFilePath, true);
|
|
|
stream.writeBytes(sourceFileContent);
|
|
|
stream.close();
|
|
|
} else {
|
|
|
if (job.getSourceFile() != null && !job.getSourceFile().isEmpty()) {
|
|
|
// otherwise, just copy original file
|
|
|
- if (!BaseService.getHdfsApi(context).copy(job.getSourceFile(), newSourceFilePath)) {
|
|
|
+ if (!HdfsApi.getInstance(context).copy(job.getSourceFile(), newSourceFilePath)) {
|
|
|
throw new ServiceFormattedException("Can't copy source file from " + job.getSourceFile() +
|
|
|
" to " + newPigScriptPath);
|
|
|
}
|
|
@@ -176,16 +184,16 @@ public class JobResourceManager extends PersonalCRUDResourceManager<PigJob> {
|
|
|
String forcedContent = job.getForcedContent();
|
|
|
// variable for sourceFile can be passed from front-ent
|
|
|
forcedContent = forcedContent.replace("${sourceFile}",
|
|
|
- context.getProperties().get("dataworker.defaultFs") + newSourceFilePath);
|
|
|
+ context.getProperties().get("webhdfs.url") + newSourceFilePath);
|
|
|
job.setForcedContent(null); // we should not store content in DB
|
|
|
save(job);
|
|
|
|
|
|
- FSDataOutputStream stream = BaseService.getHdfsApi(context).create(newPigScriptPath, true);
|
|
|
+ FSDataOutputStream stream = HdfsApi.getInstance(context).create(newPigScriptPath, true);
|
|
|
stream.writeBytes(forcedContent);
|
|
|
stream.close();
|
|
|
} else {
|
|
|
// otherwise, just copy original file
|
|
|
- if (!BaseService.getHdfsApi(context).copy(job.getPigScript(), newPigScriptPath)) {
|
|
|
+ if (!HdfsApi.getInstance(context).copy(job.getPigScript(), newPigScriptPath)) {
|
|
|
throw new ServiceFormattedException("Can't copy pig script file from " + job.getPigScript() +
|
|
|
" to " + newPigScriptPath);
|
|
|
}
|
|
@@ -198,7 +206,7 @@ public class JobResourceManager extends PersonalCRUDResourceManager<PigJob> {
|
|
|
|
|
|
if (job.getPythonScript() != null && !job.getPythonScript().isEmpty()) {
|
|
|
try {
|
|
|
- if (!BaseService.getHdfsApi(context).copy(job.getPythonScript(), newPythonScriptPath)) {
|
|
|
+ if (!HdfsApi.getInstance(context).copy(job.getPythonScript(), newPythonScriptPath)) {
|
|
|
throw new ServiceFormattedException("Can't copy python udf script file from " + job.getPythonScript() +
|
|
|
" to " + newPythonScriptPath);
|
|
|
}
|
|
@@ -210,7 +218,7 @@ public class JobResourceManager extends PersonalCRUDResourceManager<PigJob> {
|
|
|
}
|
|
|
|
|
|
try {
|
|
|
- FSDataOutputStream stream = BaseService.getHdfsApi(context).create(templetonParamsFilePath, true);
|
|
|
+ FSDataOutputStream stream = HdfsApi.getInstance(context).create(templetonParamsFilePath, true);
|
|
|
if (job.getTempletonArguments() != null) {
|
|
|
stream.writeBytes(job.getTempletonArguments());
|
|
|
}
|
|
@@ -235,7 +243,7 @@ public class JobResourceManager extends PersonalCRUDResourceManager<PigJob> {
|
|
|
}
|
|
|
job.setJobId(data.id);
|
|
|
|
|
|
- JobPolling.pollJob(context, job);
|
|
|
+ JobPolling.pollJob(this, job);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -253,29 +261,40 @@ public class JobResourceManager extends PersonalCRUDResourceManager<PigJob> {
|
|
|
|
|
|
if (info.status != null && (info.status.containsKey("runState"))) {
|
|
|
//TODO: retrieve from RM
|
|
|
+ Long time = System.currentTimeMillis() / 1000L;
|
|
|
+ Long currentDuration = time - job.getDateStarted();
|
|
|
int runState = ((Double) info.status.get("runState")).intValue();
|
|
|
+ boolean isStatusChanged = false;
|
|
|
switch (runState) {
|
|
|
- case PigJob.RUN_STATE_KILLED:
|
|
|
+ case RUN_STATE_KILLED:
|
|
|
LOG.debug(String.format("Job KILLED: %s", job.getJobId()));
|
|
|
- job.setStatus(PigJob.Status.KILLED);
|
|
|
+ isStatusChanged = job.getStatus() != PigJob.PIG_JOB_STATE_KILLED;
|
|
|
+ job.setStatus(PigJob.PIG_JOB_STATE_KILLED);
|
|
|
break;
|
|
|
- case PigJob.RUN_STATE_FAILED:
|
|
|
+ case RUN_STATE_FAILED:
|
|
|
LOG.debug(String.format("Job FAILED: %s", job.getJobId()));
|
|
|
- job.setStatus(PigJob.Status.FAILED);
|
|
|
+ isStatusChanged = job.getStatus() != PigJob.PIG_JOB_STATE_FAILED;
|
|
|
+ job.setStatus(PigJob.PIG_JOB_STATE_FAILED);
|
|
|
break;
|
|
|
- case PigJob.RUN_STATE_PREP:
|
|
|
- case PigJob.RUN_STATE_RUNNING:
|
|
|
- job.setStatus(PigJob.Status.RUNNING);
|
|
|
+ case RUN_STATE_PREP:
|
|
|
+ case RUN_STATE_RUNNING:
|
|
|
+ isStatusChanged = job.getStatus() != PigJob.PIG_JOB_STATE_RUNNING;
|
|
|
+ job.setStatus(PigJob.PIG_JOB_STATE_RUNNING);
|
|
|
break;
|
|
|
- case PigJob.RUN_STATE_SUCCEEDED:
|
|
|
+ case RUN_STATE_SUCCEEDED:
|
|
|
LOG.debug(String.format("Job COMPLETED: %s", job.getJobId()));
|
|
|
- job.setStatus(PigJob.Status.COMPLETED);
|
|
|
+ isStatusChanged = job.getStatus() != PigJob.PIG_JOB_STATE_COMPLETED;
|
|
|
+ job.setStatus(PigJob.PIG_JOB_STATE_COMPLETED);
|
|
|
break;
|
|
|
default:
|
|
|
LOG.debug(String.format("Job in unknown state: %s", job.getJobId()));
|
|
|
- job.setStatus(PigJob.Status.UNKNOWN);
|
|
|
+ isStatusChanged = job.getStatus() != PigJob.PIG_JOB_STATE_UNKNOWN;
|
|
|
+ job.setStatus(PigJob.PIG_JOB_STATE_UNKNOWN);
|
|
|
break;
|
|
|
}
|
|
|
+ if (isStatusChanged) {
|
|
|
+ job.setDuration(currentDuration);
|
|
|
+ }
|
|
|
}
|
|
|
Pattern pattern = Pattern.compile("\\d+");
|
|
|
Matcher matcher = null;
|
|
@@ -306,13 +325,13 @@ public class JobResourceManager extends PersonalCRUDResourceManager<PigJob> {
|
|
|
}
|
|
|
|
|
|
private static TempletonApi connectToTempletonApi(ViewContext context) {
|
|
|
- String webhcatUrl = context.getProperties().get("dataworker.webhcat.url");
|
|
|
+ String webhcatUrl = context.getProperties().get("webhcat.url");
|
|
|
if (webhcatUrl == null) {
|
|
|
- String message = "dataworker.webhcat.url is not configured!";
|
|
|
+ String message = "webhcat.url is not configured!";
|
|
|
LOG.error(message);
|
|
|
- throw new MisconfigurationFormattedException("dataworker.webhcat.url");
|
|
|
+ throw new MisconfigurationFormattedException("webhcat.url");
|
|
|
}
|
|
|
- return new TempletonApi(context.getProperties().get("dataworker.webhcat.url"),
|
|
|
+ return new TempletonApi(context.getProperties().get("webhcat.url"),
|
|
|
getTempletonUser(context), getTempletonUser(context), context);
|
|
|
}
|
|
|
|
|
@@ -322,12 +341,16 @@ public class JobResourceManager extends PersonalCRUDResourceManager<PigJob> {
|
|
|
* @return username in templeton
|
|
|
*/
|
|
|
private static String getTempletonUser(ViewContext context) {
|
|
|
- String username = context.getProperties().get("dataworker.webhcat.user");
|
|
|
- if (username == null) {
|
|
|
- String message = "dataworker.webhcat.user is not configured!";
|
|
|
- LOG.error(message);
|
|
|
- throw new MisconfigurationFormattedException("dataworker.webhcat.user");
|
|
|
+ String username = context.getProperties().get("webhcat.username");
|
|
|
+ if (username == null || username.compareTo("null") == 0 || username.compareTo("") == 0) {
|
|
|
+ username = getUsername(context);
|
|
|
}
|
|
|
return username;
|
|
|
}
|
|
|
+
|
|
|
+ public static final int RUN_STATE_RUNNING = 1;
|
|
|
+ public static final int RUN_STATE_SUCCEEDED = 2;
|
|
|
+ public static final int RUN_STATE_FAILED = 3;
|
|
|
+ public static final int RUN_STATE_PREP = 4;
|
|
|
+ public static final int RUN_STATE_KILLED = 5;
|
|
|
}
|