瀏覽代碼

HADOOP-2639. Fixes a problem to do with incorrect maintenance of values for runningMapTasks/runningReduceTasks. Contributed by Amar Kamat and Arun Murthy.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@616474 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das 17 年之前
父節點
當前提交
74540d1a5c
共有 2 個文件被更改，包括 31 次插入和 14 次刪除
  1. 4 0
      CHANGES.txt
  2. 27 14
      src/java/org/apache/hadoop/mapred/JobInProgress.java

+ 4 - 0
CHANGES.txt

@@ -597,6 +597,10 @@ Trunk (unreleased changes)
     HADOOP-2713. TestDatanodeDeath failed on windows because the replication
     request was timing out. (dhruba)
 
+    HADOOP-2639. Fixes a problem to do with incorrect maintenance of values 
+    for runningMapTasks/runningReduceTasks. (Amar Kamat and Arun Murthy 
+    via ddas)
+
 Release 0.15.3 - 2008-01-18
 
   BUG FIXES

+ 27 - 14
src/java/org/apache/hadoop/mapred/JobInProgress.java

@@ -57,6 +57,8 @@ class JobInProgress {
   TaskInProgress reduces[] = new TaskInProgress[0];
   int numMapTasks = 0;
   int numReduceTasks = 0;
+  
+  // Counters to track currently running/finished/failed Map/Reduce task-attempts
   int runningMapTasks = 0;
   int runningReduceTasks = 0;
   int finishedMapTasks = 0;
@@ -563,16 +565,19 @@ class JobInProgress {
       return null;
     }
     
-    boolean wasRunning = maps[target].isRunning();
     Task result = maps[target].getTaskToRun(tts.getTrackerName());
-    if (!wasRunning) {
+    if (result != null) {
       runningMapTasks += 1;
-      JobHistory.Task.logStarted(profile.getJobId(), 
-                                 maps[target].getTIPId(), Values.MAP.name(),
-                                 System.currentTimeMillis());
-    }
 
-    jobCounters.incrCounter(Counter.TOTAL_LAUNCHED_MAPS, 1);
+      boolean wasRunning = maps[target].isRunning();
+      if (!wasRunning) {
+        JobHistory.Task.logStarted(profile.getJobId(), 
+                                   maps[target].getTIPId(), Values.MAP.name(),
+                                   System.currentTimeMillis());
+      }
+
+      jobCounters.incrCounter(Counter.TOTAL_LAUNCHED_MAPS, 1);
+    }
 
     return result;
   }    
@@ -596,16 +601,19 @@ class JobInProgress {
       return null;
     }
     
-    boolean wasRunning = reduces[target].isRunning();
     Task result = reduces[target].getTaskToRun(tts.getTrackerName());
-    if (!wasRunning) {
+    if (result != null) {
       runningReduceTasks += 1;
-      JobHistory.Task.logStarted(profile.getJobId(), 
-                                 reduces[target].getTIPId(), Values.REDUCE.name(),
-                                 System.currentTimeMillis());
+
+      boolean wasRunning = reduces[target].isRunning();
+      if (!wasRunning) {
+        JobHistory.Task.logStarted(profile.getJobId(), 
+                                   reduces[target].getTIPId(), Values.REDUCE.name(),
+                                   System.currentTimeMillis());
+      }
+
+      jobCounters.incrCounter(Counter.TOTAL_LAUNCHED_REDUCES, 1);
     }
-    
-    jobCounters.incrCounter(Counter.TOTAL_LAUNCHED_REDUCES, 1);
 
     return result;
   }
@@ -788,6 +796,11 @@ class JobInProgress {
     String taskid = status.getTaskId();
         
     // Sanity check: is the TIP already complete? 
+    // It _is_ safe to not decrement running{Map|Reduce}Tasks and
+    // finished{Map|Reduce}Tasks variables here because one and only
+    // one task-attempt of a TIP gets to completedTask. This is because
+    // the TaskCommitThread in the JobTracker marks other, completed, 
+    // speculative tasks as _complete_.
     if (tip.isComplete()) {
       // Mark this task as KILLED
       tip.alreadyCompletedTask(taskid);