浏览代码

HADOOP-1060. Fix an IndexOutOfBoundsException in the JobTracker that could cause jobs to hang. Contributed by Arun.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@515815 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 年之前
父节点
当前提交
65b1a3f0c6
共有 3 个文件被更改,包括 38 次插入和 24 次删除
  1. 4 0
      CHANGES.txt
  2. 8 5
      src/java/org/apache/hadoop/mapred/JobClient.java
  3. 26 19
      src/java/org/apache/hadoop/mapred/JobInProgress.java

+ 4 - 0
CHANGES.txt

@@ -15,6 +15,10 @@ Trunk (unreleased changes)
     directory. Also remove dependency on a particular Checkstyle
     version number. (tomwhite)
 
+ 4. HADOOP-1060.  Fix an IndexOutOfBoundsException in the JobTracker
+    that could cause jobs to hang.  (Arun C Murthy via cutting)
+
+
 Release 0.12.0 - 2007-03-02
 
  1. HADOOP-975.  Separate stdout and stderr from tasks.

+ 8 - 5
src/java/org/apache/hadoop/mapred/JobClient.java

@@ -617,11 +617,14 @@ public class JobClient extends ToolBase implements MRConstants  {
 
     private static void displayTaskLogs(String taskId, String baseUrl)
     throws IOException {
-      // Copy tasks's stdout of the JobClient
-      getTaskLogs(taskId, new URL(baseUrl+"&filter=stdout"), System.out);
-      
-      // Copy task's stderr to stderr of the JobClient 
-      getTaskLogs(taskId, new URL(baseUrl+"&filter=stderr"), System.err);
+      // The tasktracker for a 'failed/killed' job might not be around...
+      if (baseUrl != null) {
+        // Copy tasks's stdout of the JobClient
+        getTaskLogs(taskId, new URL(baseUrl+"&filter=stdout"), System.out);
+        
+        // Copy task's stderr to stderr of the JobClient 
+        getTaskLogs(taskId, new URL(baseUrl+"&filter=stderr"), System.err);
+      }
     }
     
     private static void getTaskLogs(String taskId, URL taskLogUrl, 

+ 26 - 19
src/java/org/apache/hadoop/mapred/JobInProgress.java

@@ -315,26 +315,29 @@ class JobInProgress {
               ttStatus.getHttpPort() + "/tasklog.jsp?plaintext=true&taskid=" +
               status.getTaskId() + "&all=true";
           }
-          
+
+          TaskCompletionEvent taskEvent = null;
           if (state == TaskStatus.State.SUCCEEDED) {
-            this.taskCompletionEvents.add( new TaskCompletionEvent(
-                taskCompletionEventTracker, 
-                status.getTaskId(),
-                tip.idWithinJob(),
-                status.getIsMap(),
-                TaskCompletionEvent.Status.SUCCEEDED,
-                httpTaskLogLocation ));
+            taskEvent = new TaskCompletionEvent(
+                          taskCompletionEventTracker, 
+                          status.getTaskId(),
+                          tip.idWithinJob(),
+                          status.getIsMap(),
+                          TaskCompletionEvent.Status.SUCCEEDED,
+                          httpTaskLogLocation 
+                          );
             tip.setSuccessEventNumber(taskCompletionEventTracker);
             completedTask(tip, status, metrics);
           } else if (state == TaskStatus.State.FAILED ||
                      state == TaskStatus.State.KILLED) {
-            this.taskCompletionEvents.add( new TaskCompletionEvent(
-                taskCompletionEventTracker, 
-                status.getTaskId(),
-                tip.idWithinJob(),
-                status.getIsMap(),
-                TaskCompletionEvent.Status.FAILED, 
-                httpTaskLogLocation ));
+            taskEvent = new TaskCompletionEvent(
+                          taskCompletionEventTracker, 
+                          status.getTaskId(),
+                          tip.idWithinJob(),
+                          status.getIsMap(),
+                          TaskCompletionEvent.Status.FAILED, 
+                          httpTaskLogLocation
+                          );
             // Get the event number for the (possibly) previously successful
             // task. If there exists one, then set that status to OBSOLETE 
             int eventNumber;
@@ -348,9 +351,13 @@ class JobInProgress {
             failedTask(tip, status.getTaskId(), status, status.getTaskTracker(),
                        wasRunning, wasComplete);
           }          
-        }
 
-        taskCompletionEventTracker++;
+          // Add the 'complete' task i.e. successful/failed
+          if (taskEvent != null) {
+            this.taskCompletionEvents.add(taskEvent);
+            taskCompletionEventTracker++;
+          }
+        }
         
         //
         // Update JobInProgress status
@@ -936,8 +943,8 @@ class JobInProgress {
        return null;
     }
     
-    public TaskCompletionEvent[] getTaskCompletionEvents(int fromEventId, 
-        int maxEvents) {
+    synchronized public TaskCompletionEvent[] getTaskCompletionEvents(
+            int fromEventId, int maxEvents) {
       TaskCompletionEvent[] events = TaskCompletionEvent.EMPTY_ARRAY;
       if( taskCompletionEvents.size() > fromEventId) {
         int actualMax = Math.min(maxEvents,