浏览代码

HADOOP-593. Fix an NPE in JobTracker. Contributed by Owen.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@462877 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 年之前
父节点
当前提交
b4de7fb333
共有 3 个文件被更改,包括 32 次插入22 次删除
  1. 6 0
      CHANGES.txt
  2. 11 14
      src/java/org/apache/hadoop/mapred/JobInProgress.java
  3. 15 8
      src/java/org/apache/hadoop/mapred/JobTracker.java

+ 6 - 0
CHANGES.txt

@@ -1,6 +1,12 @@
 Hadoop Change Log
 
 
+Release 0.7.1 - unreleased
+
+ 1. HADOOP-593.  Fix a NullPointerException in the JobTracker.
+    (omalley via cutting)
+
+
 Release 0.7.0 - 2006-10-06
 
  1. HADOOP-243.  Fix rounding in the display of task and job progress

+ 11 - 14
src/java/org/apache/hadoop/mapred/JobInProgress.java

@@ -20,7 +20,6 @@ import org.apache.commons.logging.*;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.mapred.JobTracker.JobTrackerMetrics;
-import org.apache.hadoop.mapred.JobHistory.Keys ; 
 import org.apache.hadoop.mapred.JobHistory.Values ; 
 import java.io.*;
 import java.net.*;
@@ -33,7 +32,7 @@ import java.util.*;
 // doing bookkeeping of its Tasks.
 ///////////////////////////////////////////////////////
 class JobInProgress {
-    public static final Log LOG = LogFactory.getLog("org.apache.hadoop.mapred.JobInProgress");
+    private static final Log LOG = LogFactory.getLog("org.apache.hadoop.mapred.JobInProgress");
 
     JobProfile profile;
     JobStatus status;
@@ -473,25 +472,24 @@ class JobInProgress {
                    " successfully.");          
 
           String taskTrackerName = status.getTaskTracker();
-          TaskTrackerStatus taskTracker = this.jobtracker.getTaskTracker(taskTrackerName);
           
           if(status.getIsMap()){
             JobHistory.MapAttempt.logStarted(profile.getJobId(), 
                 tip.getTIPId(), status.getTaskId(), status.getStartTime(), 
-                taskTracker.getHost()); 
+                taskTrackerName); 
             JobHistory.MapAttempt.logFinished(profile.getJobId(), 
                 tip.getTIPId(), status.getTaskId(), status.getFinishTime(), 
-                taskTracker.getHost()); 
+                taskTrackerName); 
             JobHistory.Task.logFinished(profile.getJobId(), tip.getTIPId(), 
                 Values.MAP.name(), status.getFinishTime()); 
           }else{
               JobHistory.ReduceAttempt.logStarted(profile.getJobId(), 
                   tip.getTIPId(), status.getTaskId(), status.getStartTime(), 
-                  taskTracker.getHost()); 
+                  taskTrackerName); 
               JobHistory.ReduceAttempt.logFinished(profile.getJobId(), 
                   tip.getTIPId(), status.getTaskId(), status.getShuffleFinishTime(),
                   status.getSortFinishTime(), status.getFinishTime(), 
-                  taskTracker.getHost()); 
+                  taskTrackerName); 
               JobHistory.Task.logFinished(profile.getJobId(), tip.getTIPId(), 
                   Values.REDUCE.name(), status.getFinishTime()); 
           }
@@ -609,21 +607,20 @@ class JobInProgress {
         
         // update job history
         String taskTrackerName = status.getTaskTracker();
-        TaskTrackerStatus taskTracker = this.jobtracker.getTaskTracker(taskTrackerName);
-        if(status.getIsMap()){
+        if (status.getIsMap()) {
           JobHistory.MapAttempt.logStarted(profile.getJobId(), 
               tip.getTIPId(), status.getTaskId(), status.getStartTime(), 
-              taskTracker.getHost()); 
+              taskTrackerName); 
           JobHistory.MapAttempt.logFailed(profile.getJobId(), 
               tip.getTIPId(), status.getTaskId(), System.currentTimeMillis(),
-              taskTracker.getHost(), status.getDiagnosticInfo()); 
-        }else{
+              taskTrackerName, status.getDiagnosticInfo()); 
+        } else {
           JobHistory.ReduceAttempt.logStarted(profile.getJobId(), 
               tip.getTIPId(), status.getTaskId(), status.getStartTime(), 
-              taskTracker.getHost()); 
+              taskTrackerName); 
           JobHistory.ReduceAttempt.logFailed(profile.getJobId(), 
               tip.getTIPId(), status.getTaskId(), System.currentTimeMillis(),
-              taskTracker.getHost(), status.getDiagnosticInfo()); 
+              taskTrackerName, status.getDiagnosticInfo()); 
         }
         
         // After this, try to assign tasks with the one after this, so that

+ 15 - 8
src/java/org/apache/hadoop/mapred/JobTracker.java

@@ -21,6 +21,7 @@ import org.apache.commons.logging.*;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.ipc.*;
 import org.apache.hadoop.conf.*;
+import org.apache.hadoop.util.StringUtils;
 
 import java.io.*;
 import java.net.*;
@@ -114,8 +115,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
       private Map launchingTasks = new LinkedHashMap();
       
       public void run() {
-        try {
-          while (shouldRun) {
+        while (shouldRun) {
+          try {
             // Every 3 minutes check for any tasks that are overdue
             Thread.sleep(TASKTRACKER_EXPIRY_INTERVAL/3);
             long now = System.currentTimeMillis();
@@ -151,9 +152,13 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
                 }
               }
             }
+          } catch (InterruptedException ie) {
+            // all done
+            return;
+          } catch (Exception e) {
+            LOG.error("Expire Launching Task Thread got exception: " +
+                      StringUtils.stringifyException(e));
           }
-        } catch (InterruptedException ie) {
-          // all done
         }
       }
       
@@ -188,15 +193,13 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
          */
         public void run() {
             while (shouldRun) {
+              try {
                 //
                 // Thread runs periodically to check whether trackers should be expired.
                 // The sleep interval must be no more than half the maximum expiry time
                 // for a task tracker.
                 //
-                try {
-                    Thread.sleep(TASKTRACKER_EXPIRY_INTERVAL / 3);
-                } catch (InterruptedException ie) {
-                }
+                Thread.sleep(TASKTRACKER_EXPIRY_INTERVAL / 3);
 
                 //
                 // Loop through all expired items in the queue
@@ -232,6 +235,10 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
                         }
                     }
                 }
+              } catch (Exception t) {
+                LOG.error("Tracker Expiry Thread got exception: " +
+                          StringUtils.stringifyException(t));
+              }
             }
         }