
AMBARI-2759. ambari-log4j doesn't work with MySQL. (Chen Chun via billie)

Billie Rinaldi committed 11 years ago
commit 44cc11dfb3

+ 3 - 2
ambari-server/src/main/java/org/apache/ambari/eventdb/db/PostgresConnector.java

@@ -81,8 +81,9 @@ public class PostgresConnector implements DBConnector {
     FJSS_PS("SELECT " + JobFields.SUBMITTIME + ", " + JobFields.FINISHTIME + " FROM " + JOB_TABLE_NAME + " WHERE " + JobFields.JOBID + " = ?"),
     FJTA_PS("SELECT " + TaskAttempt.TASK_ATTEMPT_FIELDS + " FROM " + TASK_ATTEMPT_TABLE_NAME + " WHERE " + TaskAttemptFields.JOBID + " = ? ORDER BY "
         + TaskAttemptFields.STARTTIME),
-    FWTA_PS("SELECT " + TaskAttempt.TASK_ATTEMPT_FIELDS + " FROM " + TASK_ATTEMPT_TABLE_NAME + ", (SELECT " + JobFields.JOBID + " as id FROM " + JOB_TABLE_NAME
-        + " WHERE " + JobFields.WORKFLOWID + " = ?) AS jobs WHERE " + TASK_ATTEMPT_TABLE_NAME + "." + TaskAttemptFields.JOBID + " = jobs.id "
+    FWTA_PS("SELECT " + TaskAttemptFields.join(TASK_ATTEMPT_TABLE_NAME) + " FROM " + TASK_ATTEMPT_TABLE_NAME + ", " + JOB_TABLE_NAME + " WHERE "
+        + TASK_ATTEMPT_TABLE_NAME + "." + TaskAttemptFields.JOBID + " = " + JOB_TABLE_NAME + "." + JobFields.JOBID + " AND " + JOB_TABLE_NAME + "."
+        + JobFields.WORKFLOWID + " = ?"
         + " ORDER BY " + TaskAttemptFields.JOBID + "," + TaskAttemptFields.STARTTIME + ", " + TaskAttemptFields.FINISHTIME),
     FTA_TIMERANGE_PS("SELECT " + TaskAttempt.TASK_ATTEMPT_FIELDS + " FROM " + TASK_ATTEMPT_TABLE_NAME + " WHERE " + TaskAttemptFields.FINISHTIME + " >= ? AND "
         + TaskAttemptFields.STARTTIME + " <= ? AND (" + TaskAttemptFields.TASKTYPE + " = 'MAP' OR  " + TaskAttemptFields.TASKTYPE + " = 'REDUCE') ORDER BY "
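Note: the rewritten FWTA_PS drops the derived-table subquery in favor of a plain join between the task-attempt and job tables, with every selected column qualified by table name through the new TaskAttemptFields.join(tableName) helper, so the jobId column shared by both tables stays unambiguous. A rough sketch of the SQL the new constant expands to, assuming hypothetical table names "taskAttempt" and "job" for TASK_ATTEMPT_TABLE_NAME and JOB_TABLE_NAME and only the handful of columns visible in this hunk:

    // Sketch only: "taskAttempt" and "job" stand in for TASK_ATTEMPT_TABLE_NAME and
    // JOB_TABLE_NAME (their real values are defined elsewhere in PostgresConnector),
    // the column list is abbreviated to the fields visible in this hunk, and the
    // column spellings assume the enums' default toString().
    String fwtaSketch =
        "SELECT taskAttempt.JOBID,taskAttempt.TASKTYPE,taskAttempt.STARTTIME,taskAttempt.FINISHTIME"
            + " FROM taskAttempt, job"
            + " WHERE taskAttempt.JOBID = job.JOBID AND job.WORKFLOWID = ?"
            + " ORDER BY JOBID, STARTTIME, FINISHTIME";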

+ 7 - 0
ambari-server/src/main/java/org/apache/ambari/eventdb/model/TaskAttempt.java

@@ -57,6 +57,13 @@ public class TaskAttempt {
         tmp[i] = TaskAttemptFields.values()[i].toString();
       return StringUtils.join(tmp, ",");
     }
+
+    public static String join(String tableName) {
+      String[] tmp = new String[TaskAttemptFields.values().length];
+      for (int i = 0; i < tmp.length; i++)
+        tmp[i] = tableName + "." + TaskAttemptFields.values()[i].toString();
+      return StringUtils.join(tmp, ",");
+    }
   }
   
   public static final String TASK_ATTEMPT_FIELDS = TaskAttemptFields.join();
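The new join(String tableName) mirrors the existing no-argument join() but prefixes each field with the given table name, which is what lets FWTA_PS above select unambiguous columns from the two-table join. A minimal, self-contained sketch of the same idea; the field set is a hypothetical subset of TaskAttemptFields, and a StringBuilder stands in for the commons-lang StringUtils.join used in the real class:

    public class QualifiedJoinSketch {
      // Hypothetical subset of TaskAttemptFields, for illustration only.
      enum Fields { JOBID, TASKTYPE, STARTTIME, FINISHTIME }

      // Same shape as the new TaskAttemptFields.join(String): prefix every
      // field with the table name and comma-separate the result.
      static String join(String tableName) {
        StringBuilder sb = new StringBuilder();
        for (Fields f : Fields.values()) {
          if (sb.length() > 0)
            sb.append(",");
          sb.append(tableName).append(".").append(f);
        }
        return sb.toString();
      }

      public static void main(String[] args) {
        // Prints: ta.JOBID,ta.TASKTYPE,ta.STARTTIME,ta.FINISHTIME
        System.out.println(join("ta"));
      }
    }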

+ 24 - 16
contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/MapReduceJobHistoryUpdater.java

@@ -162,9 +162,7 @@ public class MapReduceJobHistoryUpdater implements LogStoreUpdateProvider {
                 "workflowContext = ?, " +
                 "numJobsTotal = ?, " +
                 "lastUpdateTime = ?, " +
-                "duration = ? - (SELECT startTime FROM " +
-                WORKFLOW_TABLE +
-                " WHERE workflowId = ?) " +
+                "duration = ? - startTime " +
                 "WHERE workflowId = ?"
             );
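The old statement computed duration by re-reading startTime through a subquery on WORKFLOW_TABLE, the same table being updated, which MySQL rejects ("You can't specify target table ... for update in FROM clause"). Since startTime is a column of the row being updated, "duration = ? - startTime" is equivalent and also drops one placeholder, which is why the last hunk below removes the setString(6, ...) bind. A sketch of the simplified statement, assuming WORKFLOW_TABLE is "workflow" and that the UPDATE ... SET prefix omitted from this hunk sets the columns shown in the surviving context lines:

    // Sketch only: "workflow" stands in for WORKFLOW_TABLE; the leading
    // "UPDATE ... SET" portion is assumed from the unchanged context above.
    String workflowUpdateTimeSql =
        "UPDATE workflow SET " +
        "workflowContext = ?, " +
        "numJobsTotal = ?, " +
        "lastUpdateTime = ?, " +
        "duration = ? - startTime " +
        "WHERE workflowId = ?";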
     
@@ -174,20 +172,31 @@ public class MapReduceJobHistoryUpdater implements LogStoreUpdateProvider {
                 WORKFLOW_TABLE +
                 " SET " +
                 "lastUpdateTime = ?, " +
-                "duration = ? - (SELECT startTime FROM " +
-                WORKFLOW_TABLE +
-                " WHERE workflowId = selectid), " +
-                "numJobsCompleted = rows, " +
-                "inputBytes = input, " +
-                "outputBytes = output " +
-            "FROM (SELECT count(*) as rows, sum(inputBytes) as input, " +
-                "sum(outputBytes) as output, workflowId as selectid FROM " +
-                JOB_TABLE +
+                "duration = ? - startTime, " +
+                "numJobsCompleted = (" +
+                  "SELECT count(*)" +
+                  " FROM " +
+                  JOB_TABLE +
+                  " WHERE " +
+                  "workflowId = " + WORKFLOW_TABLE + ".workflowId" +
+                  " AND status = 'SUCCESS'), " +
+                "inputBytes = (" +
+                  "SELECT sum(inputBytes)" +
+                  " FROM " +
+                  JOB_TABLE +
+                  " WHERE " +
+                  "workflowId = " + WORKFLOW_TABLE + ".workflowId" +
+                  " AND status = 'SUCCESS'), " +
+                "outputBytes = (" +
+                  "SELECT sum(outputBytes)" +
+                  " FROM " +
+                  JOB_TABLE +
+                  " WHERE " +
+                  "workflowId = " + WORKFLOW_TABLE + ".workflowId" +
+                  " AND status = 'SUCCESS') " +
                 " WHERE workflowId = (SELECT workflowId FROM " +
                 JOB_TABLE +
-                " WHERE jobId = ?) AND status = 'SUCCESS' " +
-                "GROUP BY workflowId) as jobsummary " +
-            "WHERE workflowId = selectid"
+                " WHERE jobId = ?)"
             );
     
     // JobFinishedEvent
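The second statement previously relied on PostgreSQL's UPDATE ... FROM (SELECT ... GROUP BY ...) form, which MySQL does not accept; the rewrite computes numJobsCompleted, inputBytes and outputBytes with three correlated subqueries against JOB_TABLE, each filtered to status = 'SUCCESS', a form both databases understand. A sketch of the resulting SQL, assuming WORKFLOW_TABLE is "workflow", JOB_TABLE is "job", and the UPDATE ... SET prefix shown in the context lines:

    // Sketch only: "workflow" and "job" stand in for WORKFLOW_TABLE and JOB_TABLE.
    String workflowUpdateNumCompletedSql =
        "UPDATE workflow SET " +
        "lastUpdateTime = ?, " +
        "duration = ? - startTime, " +
        "numJobsCompleted = (SELECT count(*) FROM job" +
        " WHERE workflowId = workflow.workflowId AND status = 'SUCCESS'), " +
        "inputBytes = (SELECT sum(inputBytes) FROM job" +
        " WHERE workflowId = workflow.workflowId AND status = 'SUCCESS'), " +
        "outputBytes = (SELECT sum(outputBytes) FROM job" +
        " WHERE workflowId = workflow.workflowId AND status = 'SUCCESS') " +
        "WHERE workflowId = (SELECT workflowId FROM job WHERE jobId = ?)";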
@@ -726,7 +735,6 @@ public class MapReduceJobHistoryUpdater implements LogStoreUpdateProvider {
         workflowUpdateTimePS.setLong(3, historyEvent.getSubmitTime());
         workflowUpdateTimePS.setLong(4, historyEvent.getSubmitTime());
         workflowUpdateTimePS.setString(5, workflowContext.getWorkflowId());
-        workflowUpdateTimePS.setString(6, workflowContext.getWorkflowId());
         workflowUpdateTimePS.executeUpdate();
         LOG.debug("Successfully updated workflowId = " + 
             workflowContext.getWorkflowId());