فهرست منبع

HADOOP-1554. Log killed tasks in the JobHistory. Contributed by Devaraj.


git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@555114 13f79535-47bb-0310-9956-ffa450edef68
Owen O'Malley 18 سال پیش
والد
کامیت
d5b9175457

+ 2 - 0
CHANGES.txt

@@ -283,6 +283,8 @@ Trunk (unreleased changes)
  87. HADOOP-1571.  Add contrib lib directories to root build.xml
      javadoc classpath.  (Michael Stack via tomwhite)
 
+ 88. HADOOP-1554.  Log killed tasks to the job history and display them on the
+     web/ui. (Devaraj Das via omalley)
 
 Release 0.13.0 - 2007-06-08
 

+ 29 - 1
src/java/org/apache/hadoop/mapred/DefaultJobHistoryParser.java

@@ -169,7 +169,7 @@ public class DefaultJobHistoryParser {
   
   
   // call this only for jobs that succeeded for better results. 
-  static class BadNodesFilter implements JobHistory.Listener {
+  static class FailedOnNodesFilter implements JobHistory.Listener {
     private Map<String, Set<String>> badNodesToNumFailedTasks =
       new HashMap<String, Set<String>>();
     
@@ -197,4 +197,32 @@ public class DefaultJobHistoryParser {
       }      
     }
   }
+  static class KilledOnNodesFilter implements JobHistory.Listener {
+    private Map<String, Set<String>> badNodesToNumFailedTasks =
+      new HashMap<String, Set<String>>();
+    // History listener that maps hostname -> sorted ids of tasks with a KILLED attempt there (field name is a leftover; it holds killed tasks, not counts).
+    Map<String, Set<String>> getValues(){
+      return badNodesToNumFailedTasks; 
+    }
+    public void handle(JobHistory.RecordTypes recType, Map<Keys, String> values)
+      throws IOException {
+      // Only map/reduce attempt records are inspected; every other record type is ignored.
+      if (recType.equals(JobHistory.RecordTypes.MapAttempt) || 
+          recType.equals(JobHistory.RecordTypes.ReduceAttempt)) {
+        // Keep just the attempts whose recorded TASK_STATUS is KILLED.
+        if (Values.KILLED.name().equals(values.get(Keys.TASK_STATUS)) ){
+          String hostName = values.get(Keys.HOSTNAME);
+          String taskid = values.get(Keys.TASKID); 
+          Set<String> tasks = badNodesToNumFailedTasks.get(hostName); 
+          if (null == tasks ){
+            tasks = new TreeSet<String>(); // first killed attempt seen for this host
+            tasks.add(taskid);
+            badNodesToNumFailedTasks.put(hostName, tasks);
+          }else{
+            tasks.add(taskid); // set semantics: repeated kills of the same task are stored once
+          }
+        }
+      }      
+    }
+  }
 }

+ 45 - 0
src/java/org/apache/hadoop/mapred/JobHistory.java

@@ -564,6 +564,28 @@ public class JobHistory {
                                        String.valueOf(timestamp), hostName, error}); 
         }
       }
+    }
+    /**
+     * Log a killed map task attempt: writes a MapAttempt record with TASK_STATUS KILLED.
+     * @param jobId job id; with the job tracker start time it selects the open history writer
+     * @param taskId task id
+     * @param taskAttemptId task attempt id
+     * @param timestamp time recorded as the attempt's FINISH_TIME
+     * @param hostName hostname of this task attempt.
+     * @param error error message if any for this task attempt. 
+     */
+    public static void logKilled(String jobId, String taskId, String taskAttemptId, 
+                                 long timestamp, String hostName, String error){
+      if (!disableHistory){
+        PrintWriter writer = (PrintWriter)openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);
+        if (null != writer){
+          JobHistory.log(writer, RecordTypes.MapAttempt, 
+                         new Enum[]{Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
+                                    Keys.FINISH_TIME, Keys.HOSTNAME, Keys.ERROR},
+                         new String[]{ Values.MAP.name(), taskId, taskAttemptId, Values.KILLED.name(),
+                                       String.valueOf(timestamp), hostName, error}); 
+        }
+      }
     } 
   }
   /**
@@ -638,6 +660,29 @@ public class JobHistory {
         }
       }
     }
+    /**
+     * Log a killed reduce task attempt: writes a ReduceAttempt record with TASK_STATUS KILLED.
+     * @param jobId job id; with the job tracker start time it selects the open history writer
+     * @param taskId task id
+     * @param taskAttemptId task attempt id
+     * @param timestamp time recorded as the attempt's FINISH_TIME
+     * @param hostName host name of the task attempt.  
+     * @param error error message of the task. 
+     */
+    public static void logKilled(String jobId, String taskId, String taskAttemptId, long timestamp, 
+                                 String hostName, String error){
+      if (!disableHistory){
+        PrintWriter writer = (PrintWriter)openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);
+        if (null != writer){
+          JobHistory.log(writer, RecordTypes.ReduceAttempt, 
+                         new Enum[]{  Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
+                                      Keys.FINISH_TIME, Keys.HOSTNAME, Keys.ERROR },
+                         new String[]{ Values.REDUCE.name(), taskId, taskAttemptId, Values.KILLED.name(), 
+                                       String.valueOf(timestamp), hostName, error }); 
+        }
+      }
+    }
+
   }
   /**
    * Callback interface for reading back log events from JobHistory. This interface 

+ 22 - 10
src/java/org/apache/hadoop/mapred/JobInProgress.java

@@ -929,18 +929,30 @@ class JobInProgress {
     String taskTrackerName = status.getTaskTracker();
     if (status.getIsMap()) {
       JobHistory.MapAttempt.logStarted(profile.getJobId(), 
-                                       tip.getTIPId(), status.getTaskId(), status.getStartTime(), 
-                                       taskTrackerName); 
-      JobHistory.MapAttempt.logFailed(profile.getJobId(), 
-                                      tip.getTIPId(), status.getTaskId(), System.currentTimeMillis(),
-                                      taskTrackerName, status.getDiagnosticInfo()); 
+                tip.getTIPId(), status.getTaskId(), status.getStartTime(), 
+                taskTrackerName);
+      if (status.getRunState() == TaskStatus.State.FAILED) {
+        JobHistory.MapAttempt.logFailed(profile.getJobId(), 
+                tip.getTIPId(), status.getTaskId(), System.currentTimeMillis(),
+                taskTrackerName, status.getDiagnosticInfo());
+      } else {
+        JobHistory.MapAttempt.logKilled(profile.getJobId(), 
+                tip.getTIPId(), status.getTaskId(), System.currentTimeMillis(),
+                taskTrackerName, status.getDiagnosticInfo());
+      }
     } else {
       JobHistory.ReduceAttempt.logStarted(profile.getJobId(), 
-                                          tip.getTIPId(), status.getTaskId(), status.getStartTime(), 
-                                          taskTrackerName); 
-      JobHistory.ReduceAttempt.logFailed(profile.getJobId(), 
-                                         tip.getTIPId(), status.getTaskId(), System.currentTimeMillis(),
-                                         taskTrackerName, status.getDiagnosticInfo()); 
+                tip.getTIPId(), status.getTaskId(), status.getStartTime(), 
+                taskTrackerName);
+      if (status.getRunState() == TaskStatus.State.FAILED) {
+        JobHistory.ReduceAttempt.logFailed(profile.getJobId(), 
+                tip.getTIPId(), status.getTaskId(), System.currentTimeMillis(),
+                taskTrackerName, status.getDiagnosticInfo());
+      } else {
+        JobHistory.ReduceAttempt.logKilled(profile.getJobId(), 
+                tip.getTIPId(), status.getTaskId(), System.currentTimeMillis(),
+                taskTrackerName, status.getDiagnosticInfo());
+      }
     }
         
     // After this, try to assign tasks with the one after this, so that

+ 57 - 6
src/webapps/job/jobdetailshistory.jsp

@@ -29,26 +29,29 @@
 <b>Launched At : </b> <%=StringUtils.getFormattedTimeWithDiff(dateFormat, job.getLong(Keys.LAUNCH_TIME), job.getLong(Keys.SUBMIT_TIME)) %><br/>
 <b>Finished At : </b>  <%=StringUtils.getFormattedTimeWithDiff(dateFormat, job.getLong(Keys.FINISH_TIME), job.getLong(Keys.LAUNCH_TIME)) %><br/>
 <b>Status : </b> <%= ((job.get(Keys.JOB_STATUS) == null)?"Incomplete" :job.get(Keys.JOB_STATUS)) %><br/> 
-<b><a href="analysejobhistory.jsp?jobid=<%=jobid %>&jobTrackerId=<%=jobTrackerId %>">Analyse This Job</a></b> 
-<hr/>
-<center>
 <%
 	Map<String, JobHistory.Task> tasks = job.getAllTasks();
 	int totalMaps = 0 ; 
 	int totalReduces = 0; 
 	int failedMaps = 0; 
+	int killedMaps = 0;
 	int failedReduces = 0 ; 
+	int killedReduces = 0;
 	
 	long mapStarted = 0 ; 
 	long mapFinished = 0 ; 
 	long reduceStarted = 0 ; 
 	long reduceFinished = 0; 
+        
+        Map <String,String> allHosts = new TreeMap<String,String>();
 	
 	for( JobHistory.Task task : tasks.values() ) {
 	  
 	  long startTime = task.getLong(Keys.START_TIME) ; 
 	  long finishTime = task.getLong(Keys.FINISH_TIME) ; 
 	  
+          allHosts.put(task.get(Keys.HOSTNAME), null);
+
 	  if( Values.MAP.name().equals(task.get(Keys.TASK_TYPE)) ){
 	    if( mapStarted==0 || mapStarted > startTime ){
 	      mapStarted = startTime; 
@@ -63,6 +66,9 @@
 	        if( Values.FAILED.name().equals(attempt.get(Keys.TASK_STATUS)) ) {
 	            failedMaps++; 
 	        }
+	        if( Values.KILLED.name().equals(attempt.get(Keys.TASK_STATUS)) ) {
+	            killedMaps++; 
+	        }
 	    }
 	  }else{
 	    if( reduceStarted==0||reduceStarted > startTime ){
@@ -77,13 +83,20 @@
 	        if( Values.FAILED.name().equals(attempt.get(Keys.TASK_STATUS)) ) {
 	            failedReduces++; 
 	        }
+	        if( Values.KILLED.name().equals(attempt.get(Keys.TASK_STATUS)) ) {
+	            killedReduces++; 
+	        }
 	    }
 	  }
 	}
 %>
+<b>Number of nodes used: </b> <%=allHosts.size() %><br/>
+<b><a href="analysejobhistory.jsp?jobid=<%=jobid %>&jobTrackerId=<%=jobTrackerId %>">Analyse This Job</a></b> 
+<hr/>
+<center>
 <table border="2" cellpadding="5" cellspacing="2">
 <tr>
-<td>Kind</td><td>Total Tasks</td><td>Finished tasks</td><td>Failed tasks</td><td>Start Time</td><td>Finish Time</td>
+<td>Kind</td><td>Total Tasks(successful+failed+killed)</td><td>Successful tasks</td><td>Failed tasks</td><td>Killed tasks</td><td>Start Time</td><td>Finish Time</td>
 </tr>
 <tr>
 <td>Map</td>
@@ -93,6 +106,8 @@
 	  <%=job.getInt(Keys.FINISHED_MAPS) %></a></td>
 	<td><a href="jobtaskshistory.jsp?jobid=<%=jobid %>&jobTrackerId=<%=jobTrackerId %>&taskType=<%=Values.MAP.name() %>&status=<%=Values.FAILED %>">
 	  <%=failedMaps %></a></td>
+	<td><a href="jobtaskshistory.jsp?jobid=<%=jobid %>&jobTrackerId=<%=jobTrackerId %>&taskType=<%=Values.MAP.name() %>&status=<%=Values.KILLED %>">
+	  <%=killedMaps %></a></td>
 	<td><%=StringUtils.getFormattedTimeWithDiff(dateFormat, mapStarted, 0) %></td>
 	<td><%=StringUtils.getFormattedTimeWithDiff(dateFormat, mapFinished, mapStarted) %></td>
 </tr>
@@ -104,6 +119,8 @@
 	  <%=job.getInt(Keys.FINISHED_REDUCES)%></a></td>
 	<td><a href="jobtaskshistory.jsp?jobid=<%=jobid %>&jobTrackerId=<%=jobTrackerId %>&taskType=<%=Values.REDUCE.name() %>&status=<%=Values.FAILED %>">
 	  <%=failedReduces%></a></td>
+	<td><a href="jobtaskshistory.jsp?jobid=<%=jobid %>&jobTrackerId=<%=jobTrackerId %>&taskType=<%=Values.REDUCE.name() %>&status=<%=Values.KILLED %>">
+	  <%=killedReduces%></a></td>  
 	<td><%=StringUtils.getFormattedTimeWithDiff(dateFormat, reduceStarted, 0) %></td>
 	<td><%=StringUtils.getFormattedTimeWithDiff(dateFormat, reduceFinished, reduceStarted) %></td>
 </tr>
@@ -111,7 +128,7 @@
 
 <br/>
  <%
-	DefaultJobHistoryParser.BadNodesFilter filter = new DefaultJobHistoryParser.BadNodesFilter();
+	DefaultJobHistoryParser.FailedOnNodesFilter filter = new DefaultJobHistoryParser.FailedOnNodesFilter();
 	String dir = System.getProperty("hadoop.log.dir") + File.separator + "history" ; 
  
 	JobHistory.parseHistory(new File(dir, jobTrackerId+"_" + jobid), filter); 
@@ -142,7 +159,41 @@
      }
 	}
  %>
+</table>
+<br/>
+ <%
+	DefaultJobHistoryParser.KilledOnNodesFilter killedFilter = new DefaultJobHistoryParser.KilledOnNodesFilter();
+	dir = System.getProperty("hadoop.log.dir") + File.separator + "history" ; 
+	// Re-parse the job's history file with killedFilter (not the failed-task filter) so it actually receives the records.
+	JobHistory.parseHistory(new File(dir, jobTrackerId+"_" + jobid), killedFilter); 
+	badNodes = killedFilter.getValues(); 
+	if( badNodes.size() > 0 ) {
+ %>
+<h3>Killed tasks attempts by nodes </h3>
+<table border="1">
+<tr><td>Hostname</td><td>Killed Tasks</td></tr>
+ <%	  
+  for (Map.Entry<String, Set<String>> entry : badNodes.entrySet()) {
+    String node = entry.getKey();
+    Set<String> killedTasks = entry.getValue();
+%>
+	<tr>
+		<td><%=node %></td>
+		<td>
+<%
+		for( String t : killedTasks ) {
+%>
+		 <a href="taskdetailshistory.jsp?jobid=<%=jobid%>&jobTrackerId=<%=jobTrackerId %>&taskid=<%=t %>"><%=t %></a>,&nbsp;
+<%		  
+		}
+%>	
+		</td>
+	</tr>
+<%	  
+     }
+	}
+ %>
 </table>
  </center>
 
-</body></html>
+</body></html>

+ 6 - 3
src/webapps/job/jobtaskshistory.jsp

@@ -33,9 +33,12 @@
 <%
 	for( JobHistory.Task task : tasks.values() ) {
 	  if( taskType.equals(task.get(Keys.TASK_TYPE) ) ){
-	    if( taskStatus.equals(task.get(Keys.TASK_STATUS)) || taskStatus.equals("all")){
-	       printTask(jobid, jobTrackerId, task, out); 
-	    }
+            for (JobHistory.TaskAttempt taskAttempt : task.getTaskAttempts().values()) {
+	      if( taskStatus.equals(taskAttempt.get(Keys.TASK_STATUS)) || taskStatus.equals("all")){
+	         printTask(jobid, jobTrackerId, task, out); 
+	         break; // list each task once, even when several of its attempts match the requested status
+	      }
+            }
 	  }
 	}
 %>