Kaynağa Gözat

AMBARI-1457. Improve Job Diagnostics. (Billie Rinaldi via yusaku)

git-svn-id: https://svn.apache.org/repos/asf/incubator/ambari/trunk@1448447 13f79535-47bb-0310-9956-ffa450edef68
Yusaku Sako 12 yıl önce
ebeveyn
işleme
c249c25efe

+ 2 - 0
CHANGES.txt

@@ -42,6 +42,8 @@ Trunk (unreleased changes):
  accessible for demo/test purposes. (mahadev)
 
  IMPROVEMENTS
+
+ AMBARI-1457. Improve Job Diagnostics. (Billie Rinaldi via yusaku)
  
  AMBARI-1453. Move Ambari Web application config from initialize.js to
  another config file. (yusaku)

+ 9 - 2
ambari-server/src/main/java/org/apache/ambari/eventdb/db/DBConnector.java

@@ -37,13 +37,20 @@ public interface DBConnector {
   
   public DataTable fetchWorkflows(int offset, int limit, String searchTerm, int echo, WorkflowFields field, boolean sortAscending, String searchWorkflowId,
       String searchWorkflowName, String searchWorkflowType, String searchUserName, int minJobs, int maxJobs, long minInputBytes, long maxInputBytes,
-      long minOutputBytes, long maxOutputBytes, long minDuration, long maxDuration, long minStartTime, long maxStartTime) throws IOException;
+      long minOutputBytes, long maxOutputBytes, long minDuration, long maxDuration, long minStartTime, long maxStartTime, long minFinishTime, long maxFinishTime)
+      throws IOException;
   
   public List<JobDBEntry> fetchJobDetails(String workflowID) throws IOException;
   
+  public List<JobDBEntry> fetchJobDetails(long minFinishTime, long maxStartTime) throws IOException;
+  
   public long[] fetchJobStartStopTimes(String jobID) throws IOException;
   
-  public List<TaskAttempt> fetchTaskAttempts(String jobID, String taskType) throws IOException;
+  public List<TaskAttempt> fetchJobTaskAttempts(String jobID) throws IOException;
+  
+  public List<TaskAttempt> fetchWorkflowTaskAttempts(String workflowID) throws IOException;
+  
+  public List<TaskAttempt> fetchTaskAttempts(long minFinishTime, long maxStartTime) throws IOException;
   
   public void close();
 }

+ 137 - 40
ambari-server/src/main/java/org/apache/ambari/eventdb/db/PostgresConnector.java

@@ -75,9 +75,17 @@ public class PostgresConnector implements DBConnector {
         + WorkflowFields.STARTTIME + ") as " + SummaryFields.youngest + ", max(" + WorkflowFields.STARTTIME + ") as " + SummaryFields.oldest + " FROM "
         + WORKFLOW_TABLE_NAME),
     FJD_PS("SELECT " + JobDBEntry.JOB_FIELDS + " FROM " + JOB_TABLE_NAME + " WHERE " + JobFields.WORKFLOWID.toString() + " = ?"),
+    FJD_TIMERANGE_PS("SELECT " + JobDBEntry.JOB_FIELDS + " FROM " + JOB_TABLE_NAME + " WHERE " + JobFields.FINISHTIME.toString() + " >= ? AND "
+        + JobFields.SUBMITTIME.toString() + " <= ? ORDER BY " + JobFields.WORKFLOWID + ", " + JobFields.JOBID),
     FJSS_PS("SELECT " + JobFields.SUBMITTIME + ", " + JobFields.FINISHTIME + " FROM " + JOB_TABLE_NAME + " WHERE " + JobFields.JOBID + " = ?"),
-    FTA_PS("SELECT " + TaskAttempt.TASK_ATTEMPT_FIELDS + " FROM " + TASK_ATTEMPT_TABLE_NAME + " WHERE " + TaskAttemptFields.JOBID + " = ? AND "
-        + TaskAttemptFields.TASKTYPE + " = ? ORDER BY " + TaskAttemptFields.STARTTIME);
+    FJTA_PS("SELECT " + TaskAttempt.TASK_ATTEMPT_FIELDS + " FROM " + TASK_ATTEMPT_TABLE_NAME + " WHERE " + TaskAttemptFields.JOBID + " = ? ORDER BY "
+        + TaskAttemptFields.STARTTIME),
+    FWTA_PS("SELECT " + TaskAttempt.TASK_ATTEMPT_FIELDS + " FROM " + TASK_ATTEMPT_TABLE_NAME + ", (SELECT " + JobFields.JOBID + " as id FROM " + JOB_TABLE_NAME
+        + " WHERE " + JobFields.WORKFLOWID + " = ?) AS jobs WHERE " + TASK_ATTEMPT_TABLE_NAME + "." + TaskAttemptFields.JOBID + " = jobs.id "
+        + " ORDER BY " + TaskAttemptFields.JOBID + "," + TaskAttemptFields.STARTTIME + ", " + TaskAttemptFields.FINISHTIME),
+    FTA_TIMERANGE_PS("SELECT " + TaskAttempt.TASK_ATTEMPT_FIELDS + " FROM " + TASK_ATTEMPT_TABLE_NAME + " WHERE " + TaskAttemptFields.FINISHTIME + " >= ? AND "
+        + TaskAttemptFields.STARTTIME + " <= ? AND (" + TaskAttemptFields.TASKTYPE + " = 'MAP' OR  " + TaskAttemptFields.TASKTYPE + " = 'REDUCE') ORDER BY "
+        + TaskAttemptFields.STARTTIME);
     
     private String statementString;
     
@@ -219,7 +227,8 @@ public class PostgresConnector implements DBConnector {
   @Override
   public DataTable fetchWorkflows(int offset, int limit, String searchTerm, int echo, WorkflowFields col, boolean sortAscending, String searchWorkflowId,
       String searchWorkflowName, String searchWorkflowType, String searchUserName, int minJobs, int maxJobs, long minInputBytes, long maxInputBytes,
-      long minOutputBytes, long maxOutputBytes, long minDuration, long maxDuration, long minStartTime, long maxStartTime) throws IOException {
+      long minOutputBytes, long maxOutputBytes, long minDuration, long maxDuration, long minStartTime, long maxStartTime, long minFinishTime, long maxFinishTime)
+      throws IOException {
     int total = 0;
     PreparedStatement ps = getPS(Statements.FW_COUNT_PS);
     ResultSet rs = null;
@@ -239,7 +248,7 @@ public class PostgresConnector implements DBConnector {
     }
     
     String searchClause = buildSearchClause(searchTerm, searchWorkflowId, searchWorkflowName, searchWorkflowType, searchUserName, minJobs, maxJobs,
-        minInputBytes, maxInputBytes, minOutputBytes, maxOutputBytes, minDuration, maxDuration, minStartTime, maxStartTime);
+        minInputBytes, maxInputBytes, minOutputBytes, maxOutputBytes, minDuration, maxDuration, minStartTime, maxStartTime, minFinishTime, maxFinishTime);
     List<WorkflowDBEntry> workflows = fetchWorkflows(getQualifiedPS(Statements.FW_PS, searchClause, col, sortAscending, offset, limit));
     Summary summary = fetchSummary(getQualifiedPS(Statements.FW_SUMMARY_PS, searchClause));
     DataTable table = new DataTable();
@@ -258,6 +267,28 @@ public class PostgresConnector implements DBConnector {
     return table;
   }
   
+  private static JobDBEntry getJobDBEntry(ResultSet rs) throws SQLException {
+    JobDBEntry j = new JobDBEntry();
+    j.setConfPath(JobFields.CONFPATH.getString(rs));
+    j.setSubmitTime(JobFields.SUBMITTIME.getLong(rs));
+    long finishTime = JobFields.FINISHTIME.getLong(rs);
+    if (finishTime > j.getSubmitTime())
+      j.setElapsedTime(finishTime - j.getSubmitTime());
+    else
+      j.setElapsedTime(0);
+    j.setInputBytes(JobFields.INPUTBYTES.getLong(rs));
+    j.setJobId(JobFields.JOBID.getString(rs));
+    j.setJobName(JobFields.JOBNAME.getString(rs));
+    j.setMaps(JobFields.MAPS.getInt(rs));
+    j.setOutputBytes(JobFields.OUTPUTBYTES.getLong(rs));
+    j.setReduces(JobFields.REDUCES.getInt(rs));
+    j.setStatus(JobFields.STATUS.getString(rs));
+    j.setUserName(JobFields.USERNAME.getString(rs));
+    j.setWorkflowEntityName(JobFields.WORKFLOWENTITYNAME.getString(rs));
+    j.setWorkflowId(JobFields.WORKFLOWID.getString(rs));
+    return j;
+  }
+  
   @Override
   public List<JobDBEntry> fetchJobDetails(String workflowId) throws IOException {
     PreparedStatement ps = getPS(Statements.FJD_PS);
@@ -267,25 +298,34 @@ public class PostgresConnector implements DBConnector {
       ps.setString(1, workflowId);
       rs = ps.executeQuery();
       while (rs.next()) {
-        JobDBEntry j = new JobDBEntry();
-        j.setConfPath(JobFields.CONFPATH.getString(rs));
-        j.setSubmitTime(JobFields.SUBMITTIME.getLong(rs));
-        long finishTime = JobFields.FINISHTIME.getLong(rs);
-        if (finishTime > j.getSubmitTime())
-          j.setElapsedTime(finishTime - j.getSubmitTime());
-        else
-          j.setElapsedTime(0);
-        j.setInputBytes(JobFields.INPUTBYTES.getLong(rs));
-        j.setJobId(JobFields.JOBID.getString(rs));
-        j.setJobName(JobFields.JOBNAME.getString(rs));
-        j.setMaps(JobFields.MAPS.getInt(rs));
-        j.setOutputBytes(JobFields.OUTPUTBYTES.getLong(rs));
-        j.setReduces(JobFields.REDUCES.getInt(rs));
-        j.setStatus(JobFields.STATUS.getString(rs));
-        j.setUserName(JobFields.USERNAME.getString(rs));
-        j.setWorkflowEntityName(JobFields.WORKFLOWENTITYNAME.getString(rs));
-        j.setWorkflowId(JobFields.WORKFLOWID.getString(rs));
-        jobs.add(j);
+        jobs.add(getJobDBEntry(rs));
+      }
+      rs.close();
+    } catch (SQLException e) {
+      throw new IOException(e);
+    } finally {
+      if (rs != null)
+        try {
+          rs.close();
+        } catch (SQLException e) {
+          LOG.error("Exception while closing ResultSet", e);
+        }
+      
+    }
+    return jobs;
+  }
+  
+  @Override
+  public List<JobDBEntry> fetchJobDetails(long minFinishTime, long maxStartTime) throws IOException {
+    PreparedStatement ps = getPS(Statements.FJD_TIMERANGE_PS);
+    List<JobDBEntry> jobs = new ArrayList<JobDBEntry>();
+    ResultSet rs = null;
+    try {
+      ps.setLong(1, minFinishTime);
+      ps.setLong(2, maxStartTime);
+      rs = ps.executeQuery();
+      while (rs.next()) {
+        jobs.add(getJobDBEntry(rs));
       }
       rs.close();
     } catch (SQLException e) {
@@ -332,29 +372,84 @@ public class PostgresConnector implements DBConnector {
     return times;
   }
   
+  private static TaskAttempt getTaskAttempt(ResultSet rs) throws SQLException {
+    TaskAttempt t = new TaskAttempt();
+    t.setFinishTime(TaskAttemptFields.FINISHTIME.getLong(rs));
+    t.setInputBytes(TaskAttemptFields.INPUTBYTES.getLong(rs));
+    t.setJobId(TaskAttemptFields.JOBID.getString(rs));
+    t.setLocality(TaskAttemptFields.LOCALITY.getString(rs));
+    t.setMapFinishTime(TaskAttemptFields.MAPFINISHTIME.getLong(rs));
+    t.setOutputBytes(TaskAttemptFields.OUTPUTBYTES.getLong(rs));
+    t.setShuffleFinishTime(TaskAttemptFields.SHUFFLEFINISHTIME.getLong(rs));
+    t.setSortFinishTime(TaskAttemptFields.SORTFINISHTIME.getLong(rs));
+    t.setStartTime(TaskAttemptFields.STARTTIME.getLong(rs));
+    t.setStatus(TaskAttemptFields.STATUS.getString(rs));
+    t.setTaskAttemptId(TaskAttemptFields.TASKATTEMPTID.getString(rs));
+    t.setTaskType(TaskAttemptFields.TASKTYPE.getString(rs));
+    return t;
+  }
+  
   @Override
-  public List<TaskAttempt> fetchTaskAttempts(String jobID, String taskType) throws IOException {
-    PreparedStatement ps = getPS(Statements.FTA_PS);
+  public List<TaskAttempt> fetchTaskAttempts(long minFinishTime, long maxStartTime) throws IOException {
+    PreparedStatement ps = getPS(Statements.FTA_TIMERANGE_PS);
+    List<TaskAttempt> taskAttempts = new ArrayList<TaskAttempt>();
+    ResultSet rs = null;
+    try {
+      ps.setLong(1, minFinishTime);
+      ps.setLong(2, maxStartTime);
+      rs = ps.executeQuery();
+      while (rs.next()) {
+        taskAttempts.add(getTaskAttempt(rs));
+      }
+      rs.close();
+    } catch (SQLException e) {
+      throw new IOException(e);
+    } finally {
+      if (rs != null)
+        try {
+          rs.close();
+        } catch (SQLException e) {
+          LOG.error("Exception while closing ResultSet", e);
+        }
+    }
+    return taskAttempts;
+  }
+  
+  @Override
+  public List<TaskAttempt> fetchJobTaskAttempts(String jobID) throws IOException {
+    PreparedStatement ps = getPS(Statements.FJTA_PS);
     List<TaskAttempt> taskAttempts = new ArrayList<TaskAttempt>();
     ResultSet rs = null;
     try {
       ps.setString(1, jobID);
-      ps.setString(2, taskType);
       rs = ps.executeQuery();
       while (rs.next()) {
-        TaskAttempt t = new TaskAttempt();
-        t.setFinishTime(TaskAttemptFields.FINISHTIME.getLong(rs));
-        t.setInputBytes(TaskAttemptFields.INPUTBYTES.getLong(rs));
-        t.setLocality(TaskAttemptFields.LOCALITY.getString(rs));
-        t.setMapFinishTime(TaskAttemptFields.MAPFINISHTIME.getLong(rs));
-        t.setOutputBytes(TaskAttemptFields.OUTPUTBYTES.getLong(rs));
-        t.setShuffleFinishTime(TaskAttemptFields.SHUFFLEFINISHTIME.getLong(rs));
-        t.setSortFinishTime(TaskAttemptFields.SORTFINISHTIME.getLong(rs));
-        t.setStartTime(TaskAttemptFields.STARTTIME.getLong(rs));
-        t.setStatus(TaskAttemptFields.STATUS.getString(rs));
-        t.setTaskAttemptId(TaskAttemptFields.TASKATTEMPTID.getString(rs));
-        t.setTaskType(TaskAttemptFields.TASKTYPE.getString(rs));
-        taskAttempts.add(t);
+        taskAttempts.add(getTaskAttempt(rs));
+      }
+      rs.close();
+    } catch (SQLException e) {
+      throw new IOException(e);
+    } finally {
+      if (rs != null)
+        try {
+          rs.close();
+        } catch (SQLException e) {
+          LOG.error("Exception while closing ResultSet", e);
+        }
+    }
+    return taskAttempts;
+  }
+  
+  @Override
+  public List<TaskAttempt> fetchWorkflowTaskAttempts(String workflowId) throws IOException {
+    PreparedStatement ps = getPS(Statements.FWTA_PS);
+    List<TaskAttempt> taskAttempts = new ArrayList<TaskAttempt>();
+    ResultSet rs = null;
+    try {
+      ps.setString(1, workflowId);
+      rs = ps.executeQuery();
+      while (rs.next()) {
+        taskAttempts.add(getTaskAttempt(rs));
       }
       rs.close();
     } catch (SQLException e) {
@@ -377,6 +472,7 @@ public class PostgresConnector implements DBConnector {
     synchronized (preparedStatements) {
       if (!preparedStatements.containsKey(statement)) {
         try {
+          // LOG.debug("preparing " + statement.getStatementString());
           preparedStatements.put(statement, db.prepareStatement(statement.getStatementString()));
         } catch (SQLException e) {
           throw new IOException(e);
@@ -451,7 +547,7 @@ public class PostgresConnector implements DBConnector {
   
   private static String buildSearchClause(String searchTerm, String searchWorkflowId, String searchWorkflowName, String searchWorkflowType,
       String searchUserName, int minJobs, int maxJobs, long minInputBytes, long maxInputBytes, long minOutputBytes, long maxOutputBytes, long minDuration,
-      long maxDuration, long minStartTime, long maxStartTime) {
+      long maxDuration, long minStartTime, long maxStartTime, long minFinishTime, long maxFinishTime) {
     StringBuilder sb = new StringBuilder();
     sb.append(WHERE);
     if (searchTerm != null && searchTerm.length() > 0) {
@@ -476,6 +572,7 @@ public class PostgresConnector implements DBConnector {
     addRangeSearch(sb, WorkflowFields.OUTPUTBYTES, minOutputBytes, maxOutputBytes);
     addRangeSearch(sb, WorkflowFields.DURATION, minDuration, maxDuration);
     addRangeSearch(sb, WorkflowFields.STARTTIME, minStartTime, maxStartTime);
+    addRangeSearch(sb, WorkflowFields.LASTUPDATETIME, minFinishTime, maxFinishTime);
     
     if (sb.length() == WHERE.length())
       return "";

+ 10 - 0
ambari-server/src/main/java/org/apache/ambari/eventdb/model/Jobs.java

@@ -29,6 +29,7 @@ import java.util.List;
 @XmlRootElement
 @XmlAccessorType(XmlAccessType.FIELD)
 public class Jobs {
+  int numJobs;
   List<JobDBEntry> jobs;
   
   public static class JobDBEntry {
@@ -195,11 +196,20 @@ public class Jobs {
   
   public Jobs() {}
   
+  public int getNumJobs() {
+    return numJobs;
+  }
+  
+  public void setNumJobs(int numJobs) {
+    this.numJobs = numJobs;
+  }
+  
   public List<JobDBEntry> getJobs() {
     return jobs;
   }
   
   public void setJobs(List<JobDBEntry> jobs) {
     this.jobs = jobs;
+    this.numJobs = jobs.size();
   }
 }

+ 13 - 4
ambari-server/src/main/java/org/apache/ambari/eventdb/model/TaskAttempt.java

@@ -61,6 +61,7 @@ public class TaskAttempt {
   
   public static final String TASK_ATTEMPT_FIELDS = TaskAttemptFields.join();
   
+  private String jobId;
   private String taskAttemptId;
   private String taskType;
   private long startTime;
@@ -75,6 +76,14 @@ public class TaskAttempt {
   
   public TaskAttempt() {}
   
+  public String getJobId() {
+    return jobId;
+  }
+  
+  public void setJobId(String jobId) {
+    this.jobId = jobId;
+  }
+  
   public String getTaskAttemptId() {
     return taskAttemptId;
   }
@@ -134,19 +143,19 @@ public class TaskAttempt {
   public long getInputBytes() {
     return inputBytes;
   }
-
+  
   public long getOutputBytes() {
     return outputBytes;
   }
-
+  
   public void setInputBytes(long inputBytes) {
     this.inputBytes = inputBytes;
   }
-
+  
   public void setOutputBytes(long outputBytes) {
     this.outputBytes = outputBytes;
   }
-
+  
   public String getStatus() {
     return status;
   }

+ 81 - 43
ambari-server/src/main/java/org/apache/ambari/eventdb/webservice/WorkflowJsonService.java

@@ -112,7 +112,8 @@ public class WorkflowJsonService {
       @DefaultValue("-1") @QueryParam("minInputBytes") long minInputBytes, @DefaultValue("-1") @QueryParam("maxInputBytes") long maxInputBytes,
       @DefaultValue("-1") @QueryParam("minOutputBytes") long minOutputBytes, @DefaultValue("-1") @QueryParam("maxOutputBytes") long maxOutputBytes,
       @DefaultValue("-1") @QueryParam("minDuration") long minDuration, @DefaultValue("-1") @QueryParam("maxDuration") long maxDuration,
-      @DefaultValue("-1") @QueryParam("minStartTime") long minStartTime, @DefaultValue("-1") @QueryParam("maxStartTime") long maxStartTime) {
+      @DefaultValue("-1") @QueryParam("minStartTime") long minStartTime, @DefaultValue("-1") @QueryParam("maxStartTime") long maxStartTime,
+      @DefaultValue("-1") @QueryParam("minFinishTime") long minFinishTime, @DefaultValue("-1") @QueryParam("maxFinishTime") long maxFinishTime) {
     
     if (start < 0)
       start = 0;
@@ -152,6 +153,9 @@ public class WorkflowJsonService {
       case 8: // startTime
         field = WorkflowFields.STARTTIME;
         break;
+      case 9: // lastUpdateTime
+        field = WorkflowFields.LASTUPDATETIME;
+        break;
       default:
         field = WorkflowFields.WORKFLOWID;
     }
@@ -161,7 +165,7 @@ public class WorkflowJsonService {
     try {
       conn = getConnector();
       table = conn.fetchWorkflows(start, amount, searchTerm, echo, field, sortAscending, workflowId, workflowName, workflowType, userName, minJobs, maxJobs,
-          minInputBytes, maxInputBytes, minOutputBytes, maxOutputBytes, minDuration, maxDuration, minStartTime, maxStartTime);
+          minInputBytes, maxInputBytes, minOutputBytes, maxOutputBytes, minDuration, maxDuration, minStartTime, maxStartTime, minFinishTime, maxFinishTime);
     } catch (IOException e) {
       e.printStackTrace();
     } finally {
@@ -175,12 +179,16 @@ public class WorkflowJsonService {
   @GET
   @Produces(MediaType.APPLICATION_JSON)
   @Path("/job")
-  public Jobs getJobs(@QueryParam("workflowId") String workflowId) {
+  public Jobs getJobs(@QueryParam("workflowId") String workflowId, @DefaultValue("-1") @QueryParam("startTime") long minFinishTime,
+      @DefaultValue("-1") @QueryParam("endTime") long maxStartTime) {
     Jobs jobs = new Jobs();
     PostgresConnector conn = null;
     try {
       conn = getConnector();
-      jobs.setJobs(conn.fetchJobDetails(workflowId));
+      if (workflowId != null)
+        jobs.setJobs(conn.fetchJobDetails(workflowId));
+      else if (maxStartTime >= minFinishTime)
+        jobs.setJobs(conn.fetchJobDetails(minFinishTime, maxStartTime));
     } catch (IOException e) {
       e.printStackTrace();
       jobs.setJobs(EMPTY_JOBS);
@@ -195,20 +203,35 @@ public class WorkflowJsonService {
   @GET
   @Produces(MediaType.APPLICATION_JSON)
   @Path("/task")
-  public TaskData getTaskDetails(@QueryParam("jobId") String jobId, @QueryParam("width") int steps) {
+  public TaskData getTaskSummary(@QueryParam("jobId") String jobId, @QueryParam("width") int steps,
+      @DefaultValue("-1") @QueryParam("startTime") long minFinishTime, @DefaultValue("-1") @QueryParam("endTime") long maxStartTime) {
     TaskData points = new TaskData();
     PostgresConnector conn = null;
     try {
       conn = getConnector();
-      long[] times = conn.fetchJobStartStopTimes(jobId);
-      if (times != null) {
-        double submitTimeSecs = times[0] / 1000.0;
-        double finishTimeSecs = times[1] / 1000.0;
+      List<TaskAttempt> taskAttempts = null;
+      long startTime = -1;
+      long endTime = -1;
+      if (jobId != null) {
+        long[] times = conn.fetchJobStartStopTimes(jobId);
+        if (times != null) {
+          startTime = times[0];
+          endTime = times[1];
+          taskAttempts = conn.fetchJobTaskAttempts(jobId);
+        }
+      } else {
+        startTime = minFinishTime;
+        endTime = maxStartTime;
+        taskAttempts = conn.fetchTaskAttempts(minFinishTime, maxStartTime);
+      }
+      if (startTime > 0 && endTime > 0 && endTime >= startTime) {
+        double submitTimeSecs = startTime / 1000.0;
+        double finishTimeSecs = endTime / 1000.0;
         double step = (finishTimeSecs - submitTimeSecs) / steps;
         if (step < 1)
           step = 1;
-        getMapDetails(conn, points, jobId, submitTimeSecs, finishTimeSecs, step);
-        getReduceDetails(conn, points, jobId, submitTimeSecs, finishTimeSecs, step);
+        if (taskAttempts != null)
+          getTaskDetails(taskAttempts, points, submitTimeSecs, finishTimeSecs, step);
       }
     } catch (IOException e) {
       e.printStackTrace();
@@ -220,10 +243,33 @@ public class WorkflowJsonService {
     return points;
   }
   
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/taskdetails")
+  public List<TaskAttempt> getTaskDetails(@QueryParam("jobId") String jobId, @QueryParam("workflowId") String workflowId) {
+    List<TaskAttempt> taskAttempts = new ArrayList<TaskAttempt>();
+    PostgresConnector conn = null;
+    try {
+      conn = getConnector();
+      if (jobId != null) {
+        taskAttempts = conn.fetchJobTaskAttempts(jobId);
+      } else if (workflowId != null) {
+        taskAttempts = conn.fetchWorkflowTaskAttempts(workflowId);
+      }
+    } catch (IOException e) {
+      e.printStackTrace();
+    } finally {
+      if (conn != null) {
+        conn.close();
+      }
+    }
+    return taskAttempts;
+  }
+  
   @GET
   @Produces(MediaType.APPLICATION_JSON)
   @Path("/tasklocality")
-  public TaskLocalityData getTaskLocalityDetails(@QueryParam("jobId") String jobId, @DefaultValue("4") @QueryParam("minr") int minr,
+  public TaskLocalityData getTaskLocalitySummary(@QueryParam("jobId") String jobId, @DefaultValue("4") @QueryParam("minr") int minr,
       @DefaultValue("24") @QueryParam("maxr") int maxr) {
     if (maxr < minr)
       maxr = minr;
@@ -245,38 +291,32 @@ public class WorkflowJsonService {
     return data;
   }
   
-  private static void getMapDetails(PostgresConnector conn, TaskData points, String jobId, double submitTimeSecs, double finishTimeSecs, double step)
+  private static void getTaskDetails(List<TaskAttempt> taskAttempts, TaskData points, double submitTimeSecs, double finishTimeSecs, double step)
       throws IOException {
-    List<TaskAttempt> taskAttempts = conn.fetchTaskAttempts(jobId, "MAP");
     List<Point> mapPoints = new ArrayList<Point>();
-    for (double time = submitTimeSecs; time < finishTimeSecs; time += step) {
-      int numTasks = 0;
-      for (TaskAttempt taskAttempt : taskAttempts)
-        if ((taskAttempt.getStartTime() / 1000.0) <= (time + step) && (taskAttempt.getFinishTime() / 1000.0) >= time)
-          numTasks++;
-      mapPoints.add(new Point(Math.round(time), numTasks));
-    }
-    points.setMapData(mapPoints);
-  }
-  
-  private static void getReduceDetails(PostgresConnector conn, TaskData points, String jobId, double submitTimeSecs, double finishTimeSecs, double step)
-      throws IOException {
-    List<TaskAttempt> taskAttempts = conn.fetchTaskAttempts(jobId, "REDUCE");
     List<Point> shufflePoints = new ArrayList<Point>();
     List<Point> reducePoints = new ArrayList<Point>();
     for (double time = submitTimeSecs; time < finishTimeSecs; time += step) {
+      int numTasks = 0;
       int numShuffleTasks = 0;
       int numReduceTasks = 0;
       for (TaskAttempt taskAttempt : taskAttempts) {
-        if ((taskAttempt.getStartTime() / 1000.0) <= (time + step) && (taskAttempt.getShuffleFinishTime() / 1000.0) >= time) {
-          numShuffleTasks++;
-        } else if ((taskAttempt.getShuffleFinishTime() / 1000.0) < (time + step) && (taskAttempt.getFinishTime() / 1000.0) >= time) {
-          numReduceTasks++;
+        if (taskAttempt.getTaskType().equals("MAP")) {
+          if ((taskAttempt.getStartTime() / 1000.0) <= (time + step) && (taskAttempt.getFinishTime() / 1000.0) >= time)
+            numTasks++;
+        } else if (taskAttempt.getTaskType().equals("REDUCE")) {
+          if ((taskAttempt.getStartTime() / 1000.0) <= (time + step) && (taskAttempt.getShuffleFinishTime() / 1000.0) >= time) {
+            numShuffleTasks++;
+          } else if ((taskAttempt.getShuffleFinishTime() / 1000.0) < (time + step) && (taskAttempt.getFinishTime() / 1000.0) >= time) {
+            numReduceTasks++;
+          }
         }
       }
+      mapPoints.add(new Point(Math.round(time), numTasks));
       shufflePoints.add(new Point(Math.round(time), numShuffleTasks));
       reducePoints.add(new Point(Math.round(time), numReduceTasks));
     }
+    points.setMapData(mapPoints);
     points.setShuffleData(shufflePoints);
     points.setReduceData(reducePoints);
   }
@@ -285,15 +325,14 @@ public class WorkflowJsonService {
       int maxr) throws IOException {
     long submitTimeX = transformX(submitTime);
     long finishTimeX = transformX(finishTime);
-    List<TaskAttempt> mapAttempts = conn.fetchTaskAttempts(jobId, "MAP");
-    List<TaskAttempt> reduceAttempts = conn.fetchTaskAttempts(jobId, "REDUCE");
-    Set<Long> xPoints = getXPoints(mapAttempts, reduceAttempts, submitTimeX, finishTimeX);
+    List<TaskAttempt> taskAttempts = conn.fetchJobTaskAttempts(jobId);
+    Set<Long> xPoints = getXPoints(taskAttempts, submitTimeX, finishTimeX);
     Long[] xList = xPoints.toArray(new Long[xPoints.size()]);
     MinMax io = new MinMax();
-    data.setMapNodeLocal(processLocalityData(mapAttempts, "NODE_LOCAL", xList, io));
-    data.setMapRackLocal(processLocalityData(mapAttempts, "RACK_LOCAL", xList, io));
-    data.setMapOffSwitch(processLocalityData(mapAttempts, "OFF_SWITCH", xList, io));
-    data.setReduceOffSwitch(processLocalityData(reduceAttempts, "OFF_SWITCH", xList, io));
+    data.setMapNodeLocal(processLocalityData(taskAttempts, "MAP", "NODE_LOCAL", xList, io));
+    data.setMapRackLocal(processLocalityData(taskAttempts, "MAP", "RACK_LOCAL", xList, io));
+    data.setMapOffSwitch(processLocalityData(taskAttempts, "MAP", "OFF_SWITCH", xList, io));
+    data.setReduceOffSwitch(processLocalityData(taskAttempts, "REDUCE", "OFF_SWITCH", xList, io));
     setRValues(data.getMapNodeLocal(), minr, maxr, io.max);
     setRValues(data.getMapRackLocal(), minr, maxr, io.max);
     setRValues(data.getMapOffSwitch(), minr, maxr, io.max);
@@ -319,7 +358,7 @@ public class WorkflowJsonService {
     return time;
   }
   
-  private static Set<Long> getXPoints(List<TaskAttempt> mapAttempts, List<TaskAttempt> reduceAttempts, long submitTimeX, long finishTimeX) {
+  private static Set<Long> getXPoints(List<TaskAttempt> taskAttempts, long submitTimeX, long finishTimeX) {
     TreeSet<Long> xPoints = new TreeSet<Long>();
     TreeSet<TaskAttempt> sortedAttempts = new TreeSet<TaskAttempt>(new Comparator<TaskAttempt>() {
       @Override
@@ -331,8 +370,7 @@ public class WorkflowJsonService {
         return t1.getTaskAttemptId().compareTo(t2.getTaskAttemptId());
       }
     });
-    sortedAttempts.addAll(mapAttempts);
-    sortedAttempts.addAll(reduceAttempts);
+    sortedAttempts.addAll(taskAttempts);
     getXPoints(sortedAttempts, xPoints);
     xPoints.add(submitTimeX);
     xPoints.add(finishTimeX);
@@ -362,11 +400,11 @@ public class WorkflowJsonService {
     return index;
   }
   
-  private static List<DataPoint> processLocalityData(List<TaskAttempt> taskAttempts, String locality, Long[] xPoints, MinMax io) {
+  private static List<DataPoint> processLocalityData(List<TaskAttempt> taskAttempts, String taskType, String locality, Long[] xPoints, MinMax io) {
     List<DataPoint> data = new ArrayList<DataPoint>();
     int i = 0;
     for (TaskAttempt taskAttempt : taskAttempts) {
-      if (locality.equals(taskAttempt.getLocality())) {
+      if (taskType.equals(taskAttempt.getTaskType()) && locality.equals(taskAttempt.getLocality())) {
         DataPoint point = new DataPoint();
         point.setX(transformX(taskAttempt.getStartTime()));
         point.setY(transformY(taskAttempt.getFinishTime() - taskAttempt.getStartTime()));

+ 32 - 16
ambari-web/vendor/scripts/workflow_visualization.js

@@ -85,15 +85,14 @@ DagViewer.prototype._addLink = function (sourceNode, targetNode) {
 //                 nodeHeight = 15, labelFontSize = 10, maxLabelWidth = 120
 //                 nodeHeight = 40, labelFontSize = 20, maxLabelWidth = 260
 //                 nodeHeight = 30, labelFontSize = 16
-DagViewer.prototype.drawDag = function (svgw, svgh, nodeHeight, labelFontSize, maxLabelWidth, axisPadding, svgPadding) {
-  this._addTimelineGraph(svgw, svgh, nodeHeight || 20, labelFontSize || 14, maxLabelWidth || 180, axisPadding || 30, svgPadding || 20);
+DagViewer.prototype.drawDag = function (svgw, svgh, nodeHeight, labelFontSize, maxLabelWidth, axisPadding) {
+  this._addTimelineGraph(svgw, svgh, nodeHeight || 20, labelFontSize || 14, maxLabelWidth || 180, axisPadding || 30);
   return this;
 }
 
 // draw timeline graph
-DagViewer.prototype._addTimelineGraph = function (svgw, svgh, nodeHeight, labelFontSize, maxLabelWidth, axisPadding, svgPadding) {
-  // want to avoid having unnecessary scrollbars, so we need to size the div slightly larger than the svg
-  svgw = svgw - svgPadding;
+DagViewer.prototype._addTimelineGraph = function (svgw, svgh, nodeHeight, labelFontSize, maxLabelWidth, axisPadding) {
+  svgw = svgw;
 
   var margin = {"top":10, "bottom":10, "left":30, "right":30};
   var w = svgw - margin.left - margin.right;
@@ -157,15 +156,26 @@ DagViewer.prototype._addTimelineGraph = function (svgw, svgh, nodeHeight, labelF
   }
 
   var h = 2*axisPadding + 2*nodeHeight*(maxIndex+1);
-  d3.select("div#" + this._id)
-    .attr("style","width:"+(svgw+svgPadding)+"px;height:"+Math.min(svgh,h+margin.top+margin.bottom+svgPadding)+"px;overflow:auto;padding:none;");
-  svgh = h + margin.top + margin.bottom;
+  var realh = svgh - margin.top - margin.bottom;
+  var scale = 1;
+  if (h > realh)
+    scale = realh / h;
+  svgh = Math.min(svgh, h + margin.top + margin.bottom);
   var svg = d3.select("div#" + this._id).append("svg:svg")
     .attr("width", svgw+"px")
     .attr("height", svgh+"px");
+    
   var svgg = svg.append("g")
-    .attr("transform", "translate("+margin.left+","+margin.top+")");
-  
+    .attr("transform", "translate("+margin.left+","+margin.top+") scale("+scale+")");
+  // add an untranslated white rectangle below everything
+  // so mouse doesn't have to be over nodes for panning/zooming
+  svgg.append("svg:rect")
+    .attr("x", 0)
+    .attr("y", 0)
+    .attr("width", svgw)
+    .attr("height", svgh/scale)
+    .attr("style", "fill:white;stroke:none");
+ 
   // create axes
   var x = d3.time.scale()
     .domain([0, elapsedTime])
@@ -196,18 +206,17 @@ DagViewer.prototype._addTimelineGraph = function (svgw, svgh, nodeHeight, labelF
     return weeks + "w " + (x==0 ? "" : " " + x + "d");
   };
   var topAxis = d3.svg.axis()
-    .scale(x)
-    .orient("bottom")
-    .tickFormat(tickFormatter);
+    .scale(d3.time.scale().domain([startTime, startTime+elapsedTime]).range([0, w]))
+    .orient("bottom");
   var bottomAxis = d3.svg.axis()
     .scale(x)
     .orient("top")
     .tickFormat(tickFormatter);
   svgg.append("g")
-    .attr("class", "x axis")
+    .attr("class", "x axis top")
     .call(topAxis);
   svgg.append("g")
-    .attr("class", "x axis")
+    .attr("class", "x axis bottom")
     .call(bottomAxis)
     .attr("transform", "translate(0,"+h+")");
   
@@ -313,7 +322,8 @@ DagViewer.prototype._addTimelineGraph = function (svgw, svgh, nodeHeight, labelF
     .attr("x", function(d) {
       var goal = d.x + d.w/2;
       var halfLabel = maxLabelWidth/2;
-      if (goal < halfLabel) return halfLabel;      else if (goal > w-halfLabel) return w-halfLabel;
+      if (goal < halfLabel) return halfLabel;
+      else if (goal > w-halfLabel) return w-halfLabel;
       return goal;
     } )
     .attr("y", function(d) { return d.y + d.h + labelFontSize; } )
@@ -338,4 +348,10 @@ DagViewer.prototype._addTimelineGraph = function (svgw, svgh, nodeHeight, labelF
     .text(function (d) {
       return d.name;
     });
+
+  svg.call(d3.behavior.zoom().on("zoom", function() {
+    var left = Math.min(Math.max(d3.event.translate[0]+margin.left, margin.left-w*d3.event.scale*scale), margin.left+w);
+    var top = Math.min(Math.max(d3.event.translate[1]+margin.top, margin.top-h*d3.event.scale*scale), margin.top+h);
+    svgg.attr("transform", "translate("+left+","+top+") scale("+(d3.event.scale*scale)+")");
+  }));
 }