ソースを参照

commit dc266a14307f7bfbd0f4c8ed15c694c0faf551ab
Author: Hemanth Yamijala <yhemanth@yahoo-inc.com>
Date: Wed Oct 28 21:12:36 2009 +0530

MAPREDUCE:1158 from https://issues.apache.org/jira/secure/attachment/12423451/1158_yahoo.patch

+++ b/YAHOO-CHANGES.txt
+ MAPREDUCE-1158. Fix JT running maps and running reduces metrics.
+ (sharad)
+


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security-patches@1077040 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley 14 年 前
コミット
6144f3e632

+ 0 - 6
src/mapred/org/apache/hadoop/mapred/JobInProgress.java

@@ -1411,7 +1411,6 @@ class JobInProgress {
       name = Values.CLEANUP.name();
     } else if (tip.isMapTask()) {
       ++runningMapTasks;
-      metrics.addRunningMaps(jobId, 1);
       name = Values.MAP.name();
       counter = Counter.TOTAL_LAUNCHED_MAPS;
       splits = tip.getSplitNodes();
@@ -1420,7 +1419,6 @@ class JobInProgress {
       metrics.launchMap(id);
     } else {
       ++runningReduceTasks;
-      metrics.addRunningReduces(jobId, 1);
       name = Values.REDUCE.name();
       counter = Counter.TOTAL_LAUNCHED_REDUCES;
       if (tip.getActiveTasks().size() > 1)
@@ -2312,7 +2310,6 @@ class JobInProgress {
       jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
     } else if (tip.isMapTask()) {
       runningMapTasks -= 1;
-      metrics.decRunningMaps(jobId, 1);
       // check if this was a sepculative task
       if (oldNumAttempts > 1) {
         speculativeMapTasks -= (oldNumAttempts - newNumAttempts);
@@ -2326,7 +2323,6 @@ class JobInProgress {
       }
     } else {
       runningReduceTasks -= 1;
-      metrics.decRunningReduces(jobId, 1);
       if (oldNumAttempts > 1) {
         speculativeReduceTasks -= (oldNumAttempts - newNumAttempts);
       }
@@ -2588,7 +2584,6 @@ class JobInProgress {
         launchedSetup = false;
       } else if (tip.isMapTask()) {
         runningMapTasks -= 1;
-        metrics.decRunningMaps(jobId, 1);
         metrics.failedMap(taskid);
         // remove from the running queue and put it in the non-running cache
         // if the tip is not complete i.e if the tip still needs to be run
@@ -2598,7 +2593,6 @@ class JobInProgress {
         }
       } else {
         runningReduceTasks -= 1;
-        metrics.decRunningReduces(jobId, 1);
         metrics.failedReduce(taskid);
         // remove from the running queue and put in the failed queue if the tip
         // is not complete

+ 4 - 0
src/mapred/org/apache/hadoop/mapred/JobTracker.java

@@ -3081,6 +3081,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
       totalReduces -= oldStatus.countReduceTasks();
       occupiedMapSlots -= oldStatus.countOccupiedMapSlots();
       occupiedReduceSlots -= oldStatus.countOccupiedReduceSlots();
+      getInstrumentation().decRunningMaps(oldStatus.countMapTasks());
+      getInstrumentation().decRunningReduces(oldStatus.countReduceTasks());
       getInstrumentation().decOccupiedMapSlots(oldStatus.countOccupiedMapSlots());
       getInstrumentation().decOccupiedReduceSlots(oldStatus.countOccupiedReduceSlots());
       if (!faultyTrackers.isBlacklisted(oldStatus.getHost())) {
@@ -3107,6 +3109,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
       totalReduces += status.countReduceTasks();
       occupiedMapSlots += status.countOccupiedMapSlots();
       occupiedReduceSlots += status.countOccupiedReduceSlots();
+      getInstrumentation().addRunningMaps(status.countMapTasks());
+      getInstrumentation().addRunningReduces(status.countReduceTasks());
       getInstrumentation().addOccupiedMapSlots(status.countOccupiedMapSlots());
       getInstrumentation().addOccupiedReduceSlots(status.countOccupiedReduceSlots());
       if (!faultyTrackers.isBlacklisted(status.getHost())) {

+ 4 - 4
src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java

@@ -127,16 +127,16 @@ class JobTrackerInstrumentation {
   public void decRunningJob(JobConf conf, JobID id) 
   { }
 
-  public void addRunningMaps(JobID id, int task)
+  public void addRunningMaps(int tasks)
   { }
 
-  public void decRunningMaps(JobID id, int task) 
+  public void decRunningMaps(int tasks) 
   { }
 
-  public void addRunningReduces(JobID id, int task)
+  public void addRunningReduces(int tasks)
   { }
 
-  public void decRunningReduces(JobID id, int task)
+  public void decRunningReduces(int tasks)
   { }
 
   public void killedMap(TaskAttemptID taskAttemptID)

+ 4 - 4
src/mapred/org/apache/hadoop/mapred/JobTrackerMetricsInst.java

@@ -341,25 +341,25 @@ class JobTrackerMetricsInst extends JobTrackerInstrumentation implements Updater
   }
 
   @Override
-  public synchronized void addRunningMaps(JobID id, int task)
+  public synchronized void addRunningMaps(int task)
   {
     numRunningMaps += task;
   }
 
   @Override
-  public synchronized void decRunningMaps(JobID id, int task) 
+  public synchronized void decRunningMaps(int task) 
   {
     numRunningMaps -= task;
   }
 
   @Override
-  public synchronized void addRunningReduces(JobID id, int task)
+  public synchronized void addRunningReduces(int task)
   {
     numRunningReduces += task;
   }
 
   @Override
-  public synchronized void decRunningReduces(JobID id, int task)
+  public synchronized void decRunningReduces(int task)
   {
     numRunningReduces -= task;
   }

+ 4 - 4
src/test/org/apache/hadoop/mapred/TestJobTrackerInstrumentation.java

@@ -138,25 +138,25 @@ public class TestJobTrackerInstrumentation extends TestCase {
     }
     
     @Override
-    public synchronized void addRunningMaps(JobID id, int task)
+    public synchronized void addRunningMaps(int task)
     {
       incrRunningMaps += task;
     }
 
     @Override
-    public synchronized void decRunningMaps(JobID id, int task) 
+    public synchronized void decRunningMaps(int task) 
     {
       decrRunningMaps += task;
     }
 
     @Override
-    public synchronized void addRunningReduces(JobID id, int task)
+    public synchronized void addRunningReduces(int task)
     {
       incrRunningReduces += task;
     }
 
     @Override
-    public synchronized void decRunningReduces(JobID id, int task)
+    public synchronized void decRunningReduces(int task)
     {
       decrRunningReduces += task;
     }