|
@@ -27,18 +27,20 @@ import java.util.EnumMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
|
|
|
-import org.apache.ambari.eventdb.model.*;
|
|
|
+import org.apache.ambari.eventdb.model.DataTable;
|
|
|
import org.apache.ambari.eventdb.model.DataTable.AvgData;
|
|
|
import org.apache.ambari.eventdb.model.DataTable.Summary;
|
|
|
import org.apache.ambari.eventdb.model.DataTable.Summary.SummaryFields;
|
|
|
import org.apache.ambari.eventdb.model.DataTable.Times;
|
|
|
import org.apache.ambari.eventdb.model.Jobs.JobDBEntry;
|
|
|
import org.apache.ambari.eventdb.model.Jobs.JobDBEntry.JobFields;
|
|
|
+import org.apache.ambari.eventdb.model.TaskAttempt;
|
|
|
import org.apache.ambari.eventdb.model.TaskAttempt.TaskAttemptFields;
|
|
|
+import org.apache.ambari.eventdb.model.WorkflowContext;
|
|
|
+import org.apache.ambari.eventdb.model.Workflows;
|
|
|
import org.apache.ambari.eventdb.model.Workflows.WorkflowDBEntry;
|
|
|
import org.apache.ambari.eventdb.model.Workflows.WorkflowDBEntry.WorkflowFields;
|
|
|
import org.apache.commons.lang.NotImplementedException;
|
|
|
-import org.apache.commons.lang.StringUtils;
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.codehaus.jackson.JsonParseException;
|
|
@@ -48,7 +50,6 @@ import org.codehaus.jackson.map.ObjectMapper;
|
|
|
public class PostgresConnector implements DBConnector {
|
|
|
private static Log LOG = LogFactory.getLog(PostgresConnector.class);
|
|
|
private static final String WORKFLOW_TABLE_NAME = "workflow";
|
|
|
- private static final String APP_TABLE_NAME = "application";
|
|
|
private static final String JOB_TABLE_NAME = "job";
|
|
|
private static final String TASK_ATTEMPT_TABLE_NAME = "taskattempt";
|
|
|
public static final String SORT_ASC = "ASC";
|
|
@@ -74,7 +75,6 @@ public class PostgresConnector implements DBConnector {
|
|
|
+ getAvg(WorkflowFields.DURATION, SummaryFields.avgDuration, SummaryFields.minDuration, SummaryFields.maxDuration) + ", min("
|
|
|
+ WorkflowFields.STARTTIME + ") as " + SummaryFields.oldest + ", max(" + WorkflowFields.STARTTIME + ") as " + SummaryFields.youngest + " FROM "
|
|
|
+ WORKFLOW_TABLE_NAME),
|
|
|
- FAD_PS("SELECT " + Apps.AppDBEntry.APP_FIELDS + " FROM " + APP_TABLE_NAME + " WHERE " + Apps.AppDBEntry.AppFields.WORKFLOWID.toString() + " = ?"),
|
|
|
FJD_PS("SELECT " + JobDBEntry.JOB_FIELDS + " FROM " + JOB_TABLE_NAME + " WHERE " + JobFields.WORKFLOWID.toString() + " = ?"),
|
|
|
FJD_TIMERANGE_PS("SELECT " + JobDBEntry.JOB_FIELDS + " FROM " + JOB_TABLE_NAME + " WHERE " + JobFields.FINISHTIME.toString() + " >= ? AND "
|
|
|
+ JobFields.SUBMITTIME.toString() + " <= ? ORDER BY " + JobFields.WORKFLOWID + ", " + JobFields.JOBID),
|
|
@@ -214,7 +214,6 @@ public class PostgresConnector implements DBConnector {
|
|
|
w.setOutputBytes(WorkflowFields.OUTPUTBYTES.getLong(rs));
|
|
|
w.setNumJobsCompleted(WorkflowFields.NUMJOBSCOMPLETED.getInt(rs));
|
|
|
w.setWorkflowContext(jsonMapper.readValue(WorkflowFields.WORKFLOWCONTEXT.getString(rs), WorkflowContext.class));
|
|
|
- w.setWorkflowTags(WorkflowFields.WORKFLOWTAGS.getString(rs));
|
|
|
return w;
|
|
|
}
|
|
|
|
|
@@ -229,8 +228,8 @@ public class PostgresConnector implements DBConnector {
|
|
|
@Override
|
|
|
public DataTable fetchWorkflows(int offset, int limit, String searchTerm, int echo, WorkflowFields col, boolean sortAscending, String searchWorkflowId,
|
|
|
String searchWorkflowName, String searchWorkflowType, String searchUserName, int minJobs, int maxJobs, long minInputBytes, long maxInputBytes,
|
|
|
- long minOutputBytes, long maxOutputBytes, long minDuration, long maxDuration, long minStartTime, long maxStartTime, long minFinishTime,
|
|
|
- long maxFinishTime, String tagSearchTerm) throws IOException {
|
|
|
+ long minOutputBytes, long maxOutputBytes, long minDuration, long maxDuration, long minStartTime, long maxStartTime, long minFinishTime, long maxFinishTime)
|
|
|
+ throws IOException {
|
|
|
int total = 0;
|
|
|
PreparedStatement ps = getPS(Statements.FW_COUNT_PS);
|
|
|
ResultSet rs = null;
|
|
@@ -250,8 +249,7 @@ public class PostgresConnector implements DBConnector {
|
|
|
}
|
|
|
|
|
|
String searchClause = buildSearchClause(searchTerm, searchWorkflowId, searchWorkflowName, searchWorkflowType, searchUserName, minJobs, maxJobs,
|
|
|
- minInputBytes, maxInputBytes, minOutputBytes, maxOutputBytes, minDuration, maxDuration, minStartTime, maxStartTime, minFinishTime, maxFinishTime,
|
|
|
- tagSearchTerm);
|
|
|
+ minInputBytes, maxInputBytes, minOutputBytes, maxOutputBytes, minDuration, maxDuration, minStartTime, maxStartTime, minFinishTime, maxFinishTime);
|
|
|
List<WorkflowDBEntry> workflows = fetchWorkflows(getQualifiedPS(Statements.FW_PS, searchClause, col, sortAscending, offset, limit));
|
|
|
Summary summary = fetchSummary(getQualifiedPS(Statements.FW_SUMMARY_PS, searchClause));
|
|
|
DataTable table = new DataTable();
|
|
@@ -269,54 +267,7 @@ public class PostgresConnector implements DBConnector {
|
|
|
table.setSummary(summary);
|
|
|
return table;
|
|
|
}
|
|
|
-
|
|
|
- private static Apps.AppDBEntry getAppDBEntry(ResultSet rs) throws SQLException {
|
|
|
- Apps.AppDBEntry a = new Apps.AppDBEntry();
|
|
|
- a.setWorkflowId(Apps.AppDBEntry.AppFields.WORKFLOWID.getString(rs));
|
|
|
- a.setWorkflowEntityName(Apps.AppDBEntry.AppFields.WORKFLOWENTITYNAME.getString(rs));
|
|
|
- a.setAppId(Apps.AppDBEntry.AppFields.APPID.getString(rs));
|
|
|
- a.setAppName(Apps.AppDBEntry.AppFields.APPNAME.getString(rs));
|
|
|
- a.setAppType(Apps.AppDBEntry.AppFields.APPTYPE.getString(rs));
|
|
|
- a.setFinishTime(Apps.AppDBEntry.AppFields.FINISHTIME.getLong(rs));
|
|
|
- a.setLaunchTime(Apps.AppDBEntry.AppFields.LAUNCHTIME.getLong(rs));
|
|
|
- a.setQueue(Apps.AppDBEntry.AppFields.QUEUE.getString(rs));
|
|
|
- String[] stageStrings = StringUtils.split(Apps.AppDBEntry.AppFields.APPINFO.getString(rs), "-");
|
|
|
- List<Integer> stages = new ArrayList<Integer>();
|
|
|
- for (String s : stageStrings)
|
|
|
- stages.add(Integer.parseInt(s));
|
|
|
- a.setStages(stages);
|
|
|
- a.setStatus(Apps.AppDBEntry.AppFields.STATUS.getString(rs));
|
|
|
- a.setSubmitTime(Apps.AppDBEntry.AppFields.SUBMITTIME.getLong(rs));
|
|
|
- a.setUserName(Apps.AppDBEntry.AppFields.USERNAME.getString(rs));
|
|
|
- return a;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public List<Apps.AppDBEntry> fetchAppDetails(String workflowId) throws IOException {
|
|
|
- PreparedStatement ps = getPS(Statements.FAD_PS);
|
|
|
- List<Apps.AppDBEntry> apps = new ArrayList<Apps.AppDBEntry>();
|
|
|
- ResultSet rs = null;
|
|
|
- try {
|
|
|
- ps.setString(1, workflowId);
|
|
|
- rs = ps.executeQuery();
|
|
|
- while (rs.next()) {
|
|
|
- apps.add(getAppDBEntry(rs));
|
|
|
- }
|
|
|
- rs.close();
|
|
|
- } catch (SQLException e) {
|
|
|
- throw new IOException(e);
|
|
|
- } finally {
|
|
|
- if (rs != null)
|
|
|
- try {
|
|
|
- rs.close();
|
|
|
- } catch (SQLException e) {
|
|
|
- LOG.error("Exception while closing ResultSet", e);
|
|
|
- }
|
|
|
-
|
|
|
- }
|
|
|
- return apps;
|
|
|
- }
|
|
|
-
|
|
|
+
|
|
|
private static JobDBEntry getJobDBEntry(ResultSet rs) throws SQLException {
|
|
|
JobDBEntry j = new JobDBEntry();
|
|
|
j.setConfPath(JobFields.CONFPATH.getString(rs));
|
|
@@ -338,7 +289,7 @@ public class PostgresConnector implements DBConnector {
|
|
|
j.setWorkflowId(JobFields.WORKFLOWID.getString(rs));
|
|
|
return j;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
@Override
|
|
|
public List<JobDBEntry> fetchJobDetails(String workflowId) throws IOException {
|
|
|
PreparedStatement ps = getPS(Statements.FJD_PS);
|
|
@@ -360,11 +311,11 @@ public class PostgresConnector implements DBConnector {
|
|
|
} catch (SQLException e) {
|
|
|
LOG.error("Exception while closing ResultSet", e);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
}
|
|
|
return jobs;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
@Override
|
|
|
public List<JobDBEntry> fetchJobDetails(long minFinishTime, long maxStartTime) throws IOException {
|
|
|
PreparedStatement ps = getPS(Statements.FJD_TIMERANGE_PS);
|
|
@@ -597,7 +548,7 @@ public class PostgresConnector implements DBConnector {
|
|
|
|
|
|
private static String buildSearchClause(String searchTerm, String searchWorkflowId, String searchWorkflowName, String searchWorkflowType,
|
|
|
String searchUserName, int minJobs, int maxJobs, long minInputBytes, long maxInputBytes, long minOutputBytes, long maxOutputBytes, long minDuration,
|
|
|
- long maxDuration, long minStartTime, long maxStartTime, long minFinishTime, long maxFinishTime, String searchTags) {
|
|
|
+ long maxDuration, long minStartTime, long maxStartTime, long minFinishTime, long maxFinishTime) {
|
|
|
StringBuilder sb = new StringBuilder();
|
|
|
sb.append(WHERE);
|
|
|
if (searchTerm != null && searchTerm.length() > 0) {
|
|
@@ -617,8 +568,6 @@ public class PostgresConnector implements DBConnector {
|
|
|
append(sb, startsWith(WorkflowFields.WORKFLOWID, searchWorkflowType));
|
|
|
if (searchUserName != null)
|
|
|
append(sb, equals(WorkflowFields.USERNAME, searchUserName));
|
|
|
- if (searchTags != null)
|
|
|
- append(sb, like(WorkflowFields.WORKFLOWTAGS, searchTags));
|
|
|
addRangeSearch(sb, WorkflowFields.NUMJOBSTOTAL, minJobs, maxJobs);
|
|
|
addRangeSearch(sb, WorkflowFields.INPUTBYTES, minInputBytes, maxInputBytes);
|
|
|
addRangeSearch(sb, WorkflowFields.OUTPUTBYTES, minOutputBytes, maxOutputBytes);
|