瀏覽代碼

HADOOP-5272. Fixes a problem to do with detecting whether an attempt is the first attempt of a Task. This affects JobTracker restart. Contributed by Amar Kamat.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@747279 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das 16 年之前
父節點
當前提交
da6fd691d1

+ 3 - 0
CHANGES.txt

@@ -870,6 +870,9 @@ Release 0.20.0 - Unreleased
     HADOOP-5280. Adds a check to prevent a task state transition from FAILED to any of
     UNASSIGNED, RUNNING, COMMIT_PENDING or SUCCEEDED. (ddas) 
 
+    HADOOP-5272. Fixes a problem to do with detecting whether an attempt is the first
+    attempt of a Task. This affects JobTracker restart. (Amar Kamat via ddas)
+
 Release 0.19.1 - Unreleased
 
   IMPROVEMENTS

+ 6 - 6
src/mapred/org/apache/hadoop/mapred/JobInProgress.java

@@ -1262,6 +1262,12 @@ class JobInProgress {
   synchronized void addRunningTaskToTIP(TaskInProgress tip, TaskAttemptID id, 
                                         TaskTrackerStatus tts, 
                                         boolean isScheduled) {
+    // Make an entry in the tip if the attempt is not scheduled i.e externally
+    // added
+    if (!isScheduled) {
+      tip.addRunningTask(id, tts.getTrackerName());
+    }
+
     // keeping the earlier ordering intact
     String name;
     String splits = "";
@@ -1296,12 +1302,6 @@ class JobInProgress {
       jobCounters.incrCounter(counter, 1);
     }
     
-    // Make an entry in the tip if the attempt is not scheduled i.e externally
-    // added
-    if (!isScheduled) {
-      tip.addRunningTask(id, tts.getTrackerName());
-    }
-
     //TODO The only problem with these counters would be on restart.
     // The jobtracker updates the counter only when the task that is scheduled
     // if from a non-running tip and is local (data, rack ...). But upon restart

+ 10 - 2
src/mapred/org/apache/hadoop/mapred/TaskInProgress.java

@@ -90,6 +90,9 @@ class TaskInProgress {
     
   // The taskid that took this TIP to SUCCESS
   private TaskAttemptID successfulTaskId;
+
+  // The first taskid of this tip
+  private TaskAttemptID firstTaskId;
   
   // Map from task Id -> TaskTracker Id, contains tasks that are
   // currently runnings
@@ -284,7 +287,7 @@ class TaskInProgress {
    * @return Returns true if the Task is the first attempt of the tip
    */  
   public boolean isFirstAttempt(TaskAttemptID taskId) {
-    return (taskId.getId() == 0); 
+    return firstTaskId == null ? false : firstTaskId.equals(taskId); 
   }
   
   /**
@@ -910,7 +913,7 @@ class TaskInProgress {
     // create the task
     Task t = null;
     if (isMapTask()) {
-      LOG.debug("attemdpt "+  numTaskFailures   +
+      LOG.debug("attempt "+  numTaskFailures   +
           " sending skippedRecords "+failedRanges.getIndicesCount());
       t = new MapTask(jobFile, taskid, partition, 
           rawSplit.getClassName(), rawSplit.getBytes());
@@ -941,6 +944,11 @@ class TaskInProgress {
 
     // Ask JobTracker to note that the task exists
     jobtracker.createTaskEntry(taskid, taskTracker, this);
+
+    // check and set the first attempt
+    if (firstTaskId == null) {
+      firstTaskId = taskid;
+    }
     return t;
   }