Просмотр исходного кода

HADOOP-2181. This issue adds logging for input splits in Jobtracker log and jobHistory log. Also adds web UI for viewing input splits in job UI and history UI. Contributed by Amareshwari Sriramadasu.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@653749 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das 17 лет назад
Родитель
Коммит
c880093f3e

+ 4 - 0
CHANGES.txt

@@ -37,6 +37,10 @@ Trunk (unreleased changes)
     The property ipc.client.timeout is removed from the default hadoop
     configuration. It also removes metrics RpcOpsDiscardedOPsNum. (hairong)
 
+    HADOOP-2181. This issue adds logging for input splits in Jobtracker log 
+    and jobHistory log. Also adds web UI for viewing input splits in job UI 
+    and history UI. (Amareshwari Sriramadasu via ddas)
+
   NEW FEATURES
 
     HADOOP-3074. Provides a UrlStreamHandler for DFS and other FS,

+ 19 - 0
src/java/org/apache/hadoop/mapred/HistoryViewer.java

@@ -97,6 +97,7 @@ class HistoryViewer {
     printJobDetails();
     printTaskSummary();
     printJobAnalysis();
+    printSplits();
     printTasks("MAP", "FAILED");
     printTasks("MAP", "KILLED");
     printTasks("REDUCE", "FAILED");
@@ -140,6 +141,24 @@ class HistoryViewer {
     System.out.println(jobDetails.toString());
   }
   
+  /**
+   * Prints the input split locations recorded in the job history.
+   * After a short header, one line is emitted per map task in the form
+   * <code>taskid TAB splits</code>; tasks whose recorded type is not
+   * MAP are left out. The whole report is written to standard output
+   * in a single call.
+   */
+  private void printSplits() {
+    StringBuffer report = new StringBuffer();
+    report.append("\nInput split Locations")
+          .append("\nTaskId\tSplits")
+          .append("\n====================================================");
+
+    for (JobHistory.Task entry : job.getAllTasks().values()) {
+      // Only map tasks are listed; every other task type is skipped.
+      if (!Values.MAP.name().equals(entry.get(Keys.TASK_TYPE))) {
+        continue;
+      }
+      report.append("\n")
+            .append(entry.get(Keys.TASKID))
+            .append("\t")
+            .append(entry.get(Keys.SPLITS));
+    }
+    System.out.println(report.toString());
+  }
+
   private void printTasks(String taskType, String taskStatus) {
     Map<String, JobHistory.Task> tasks = job.getAllTasks();
     StringBuffer taskList = new StringBuffer();

+ 7 - 4
src/java/org/apache/hadoop/mapred/JobHistory.java

@@ -94,7 +94,7 @@ public class JobHistory {
     LAUNCH_TIME, TOTAL_MAPS, TOTAL_REDUCES, FAILED_MAPS, FAILED_REDUCES, 
     FINISHED_MAPS, FINISHED_REDUCES, JOB_STATUS, TASKID, HOSTNAME, TASK_TYPE, 
     ERROR, TASK_ATTEMPT_ID, TASK_STATUS, COPY_PHASE, SORT_PHASE, REDUCE_PHASE, 
-    SHUFFLE_FINISHED, SORT_FINISHED, COUNTERS
+    SHUFFLE_FINISHED, SORT_FINISHED, COUNTERS, SPLITS
   }
 
   /**
@@ -677,15 +677,18 @@ public class JobHistory {
      * @param startTime startTime of tip. 
      */
     public static void logStarted(TaskID taskId, String taskType, 
-                                  long startTime){
+                                  long startTime, String splitLocations) {
       if (!disableHistory){
         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
                                                      + taskId.getJobID()); 
 
         if (null != writer){
           JobHistory.log(writer, RecordTypes.Task, 
-                         new Keys[]{Keys.TASKID, Keys.TASK_TYPE , Keys.START_TIME}, 
-                         new String[]{taskId.toString(), taskType, String.valueOf(startTime)});
+                         new Keys[]{Keys.TASKID, Keys.TASK_TYPE ,
+                                    Keys.START_TIME, Keys.SPLITS}, 
+                         new String[]{taskId.toString(), taskType,
+                                      String.valueOf(startTime),
+                                      splitLocations});
         }
       }
     }

+ 11 - 8
src/java/org/apache/hadoop/mapred/JobInProgress.java

@@ -265,6 +265,7 @@ class JobInProgress {
 
       for(String host: splitLocations) {
         Node node = jobtracker.resolveAndAddToTopology(host);
+        LOG.info("tip:" + maps[i].getTIPId() + " has split on node:" + node);
         for (int j = 0; j < maxLevel; j++) {
           node = JobTracker.getParentNode(node, j);
           List<TaskInProgress> hostMaps = cache.get(node);
@@ -317,6 +318,7 @@ class JobInProgress {
                                    jobtracker, conf, this, i);
     }
     if (numMapTasks > 0) { 
+      LOG.info("Split info for job:" + jobId);
       nonRunningMapCache = createCache(splits, maxLevel);
     }
         
@@ -663,10 +665,10 @@ class JobInProgress {
     Task result = maps[target].getTaskToRun(tts.getTrackerName());
     if (result != null) {
       runningMapTasks += 1;
-      boolean wasRunning = maps[target].isRunning();
-      if (!wasRunning) {
+      if (maps[target].isFirstAttempt(result.getTaskID())) {
         JobHistory.Task.logStarted(maps[target].getTIPId(), Values.MAP.name(),
-                                   System.currentTimeMillis());
+                                   System.currentTimeMillis(),
+                                   maps[target].getSplitNodes());
       }
 
       jobCounters.incrCounter(Counter.TOTAL_LAUNCHED_MAPS, 1);
@@ -696,10 +698,9 @@ class JobInProgress {
     Task result = reduces[target].getTaskToRun(tts.getTrackerName());
     if (result != null) {
       runningReduceTasks += 1;
-      boolean wasRunning = reduces[target].isRunning();
-      if (!wasRunning) {
+      if (reduces[target].isFirstAttempt(result.getTaskID())) {
         JobHistory.Task.logStarted(reduces[target].getTIPId(), Values.REDUCE.name(),
-                                   System.currentTimeMillis());
+                                   System.currentTimeMillis(), "");
       }
 
       jobCounters.incrCounter(Counter.TOTAL_LAUNCHED_REDUCES, 1);
@@ -1307,7 +1308,8 @@ class JobInProgress {
     tip.completed(taskid);
 
     // Update jobhistory 
-    String taskTrackerName = status.getTaskTracker();
+    String taskTrackerName = jobtracker.getNode(jobtracker.getTaskTracker(
+                               status.getTaskTracker()).getHost()).toString();
     if (status.getIsMap()){
       JobHistory.MapAttempt.logStarted(status.getTaskID(), status.getStartTime(), 
                                        taskTrackerName); 
@@ -1484,7 +1486,8 @@ class JobInProgress {
     }
         
     // update job history
-    String taskTrackerName = status.getTaskTracker();
+    String taskTrackerName = jobtracker.getNode(jobtracker.getTaskTracker(
+                               status.getTaskTracker()).getHost()).toString();
     if (status.getIsMap()) {
       JobHistory.MapAttempt.logStarted(status.getTaskID(), status.getStartTime(), 
                 taskTrackerName);

+ 1 - 1
src/java/org/apache/hadoop/mapred/JobTracker.java

@@ -1932,7 +1932,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
   /**
    * Returns specified TaskInProgress, or null.
    */
-  private TaskInProgress getTip(TaskID tipid) {
+  public TaskInProgress getTip(TaskID tipid) {
     JobInProgress job = jobs.get(tipid.getJobID());
     return (job == null ? null : job.getTaskInProgress(tipid));
   }

+ 45 - 0
src/java/org/apache/hadoop/mapred/TaskInProgress.java

@@ -20,6 +20,8 @@ package org.apache.hadoop.mapred;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -29,6 +31,7 @@ import java.util.TreeSet;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapred.JobClient.RawSplit;
+import org.apache.hadoop.net.Node;
 
 
 /*************************************************************
@@ -207,6 +210,15 @@ class TaskInProgress {
   public Task getTask(TaskAttemptID taskId) {
     return tasks.get(taskId);
   }
+
+  /**
+   * Tells whether the given task attempt is the first attempt of a tip.
+   * The decision is based solely on the attempt number carried by the id:
+   * an attempt whose number is 0 is considered the first one. The state of
+   * this tip is not consulted, so the caller is presumably expected to pass
+   * an attempt id that belongs to this tip.
+   *
+   * @param taskId the id of the task attempt to check
+   * @return true if the attempt number of <code>taskId</code> is 0
+   */
+  public boolean isFirstAttempt(TaskAttemptID taskId) {
+    final int attemptNumber = taskId.getId();
+    return attemptNumber == 0;
+  }
   
   /**
    * Is this tip currently running any tasks?
@@ -755,4 +767,37 @@ class TaskInProgress {
   public int getSuccessEventNumber() {
     return successEventNumber;
   }
+  
+  /**
+   * Builds a comma-separated list of the nodes holding this tip's input
+   * split, ordered by network (rack) location.
+   * Each location reported by the raw split is looked up through the
+   * jobtracker, the resulting nodes are sorted by comparing their network
+   * location strings, and the sorted nodes are joined into one string.
+   *
+   * NOTE(review): this reads <code>rawSplit</code> unconditionally, so it
+   * presumably only makes sense for map tips, and it assumes the jobtracker
+   * knows a node for every split location -- confirm against callers.
+   *
+   * @return the split nodes in rack order, separated by commas
+   */
+  public String getSplitNodes() {
+    String[] locations = rawSplit.getLocations();
+    Node[] resolved = new Node[locations.length];
+    int idx = 0;
+    for (String location : locations) {
+      resolved[idx++] = jobtracker.getNode(location);
+    }
+    // order nodes by the textual value of their rack location
+    Comparator<Node> byRack = new Comparator<Node>() {
+      public int compare(Node a, Node b) {
+        return a.getNetworkLocation().compareTo(b.getNetworkLocation());
+      }
+    };
+    Arrays.sort(resolved, byRack);
+    return nodeToString(resolved);
+  }
+
+  /**
+   * Joins the string forms of the given nodes with commas.
+   *
+   * @param nodes the nodes to join; may be null or empty
+   * @return the comma-separated node strings, or an empty string when
+   *         there are no nodes
+   */
+  private static String nodeToString(Node[] nodes) {
+    StringBuffer joined = new StringBuffer();
+    if (nodes != null) {
+      for (int idx = 0; idx < nodes.length; idx++) {
+        // a separator goes in front of every element except the first
+        if (idx > 0) {
+          joined.append(",");
+        }
+        joined.append(nodes[idx].toString());
+      }
+    }
+    return joined.toString();
+  }
+
 }

+ 18 - 2
src/webapps/job/taskdetails.jsp

@@ -109,7 +109,7 @@
         taskAttemptTracker = "http://" + taskTracker.getHost() + ":"
           + taskTracker.getHttpPort();
         out.print("<td><a href=\"" + taskAttemptTracker + "\">"
-          + taskTracker.getHost() + "</a></td>");
+          + tracker.getNode(taskTracker.getHost()) + "</a></td>");
         }
         out.print("<td>" + status.getRunState() + "</td>");
         out.print("<td>" + StringUtils.formatPercent(status.getProgress(), 2)
@@ -173,11 +173,27 @@
           out.print("<pre>&nbsp;</pre>");
         out.println("</td></tr>");
       }
-    }
   %>
 </table>
 </center>
 
+<%
+      if (ts[0].getIsMap()) {
+%>
+<h3>Input Split Locations</h3>
+<table border=2 cellpadding="5" cellspacing="2">
+<%
+        for (String split: StringUtils.split(tracker.getTip(
+                                         tipidObj).getSplitNodes())) {
+          out.println("<tr><td>" + split + "</td></tr>");
+        }
+%>
+</table>
+<%    
+      }
+    }
+%>
+
 <hr>
 <a href="jobdetails.jsp?jobid=<%=jobid%>">Go back to the job</a><br>
 <a href="jobtracker.jsp">Go back to JobTracker</a><br>

+ 19 - 3
src/webapps/job/taskdetailshistory.jsp

@@ -22,6 +22,7 @@
   JobHistory.JobInfo job = (JobHistory.JobInfo)
                               request.getSession().getAttribute("job");
   JobHistory.Task task = job.getAllTasks().get(taskid); 
+  String type = task.get(Keys.TASK_TYPE);
 %>
 <html>
 <body>
@@ -30,7 +31,7 @@
 <table border="2" cellpadding="5" cellspacing="2">
 <tr><td>Task Id</td><td>Start Time</td>
 <%	
-  if (Values.REDUCE.name().equals(task.get(Keys.TASK_TYPE))) {
+  if (Values.REDUCE.name().equals(type)) {
 %>
     <td>Shuffle Finished</td><td>Sort Finished</td>
 <%
@@ -39,10 +40,26 @@
 <td>Finish Time</td><td>Host</td><td>Error</td></tr>
 <%
   for (JobHistory.TaskAttempt attempt : task.getTaskAttempts().values()) {
-    printTaskAttempt(attempt, task.get(Keys.TASK_TYPE), out); 
+    printTaskAttempt(attempt, type, out);
   }
 %>
 </table>
+</center>
+<%	
+  if (Values.MAP.name().equals(type)) {
+%>
+<h3>Input Split Locations</h3>
+<table border="2" cellpadding="5" cellspacing="2">
+<%
+    for (String split : StringUtils.split(task.get(Keys.SPLITS)))
+    {
+      out.println("<tr><td>" + split + "</td></tr>");
+    }
+%>
+</table>    
+<%
+  }
+%>
 <%!
   private void printTaskAttempt(JobHistory.TaskAttempt taskAttempt,
                                 String type, JspWriter out) 
@@ -70,6 +87,5 @@
     out.print("</tr>"); 
   }
 %>
-</center>
 </body>
 </html>