Quellcode durchsuchen

commit c862b35f79e94d71689da38a30fb2ee4b3271e55
Author: Lee Tucker <ltucker@yahoo-inc.com>
Date: Thu Jul 30 17:40:21 2009 -0700

Applying patch 2730546.5733.patch


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security-patches@1076937 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley vor 14 Jahren
Ursprung
Commit
45805aea24

+ 25 - 10
src/mapred/org/apache/hadoop/mapred/JobTracker.java

@@ -630,11 +630,16 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
       synchronized (taskTrackers) {
         // remove the capacity of trackers on this host
         for (TaskTrackerStatus status : getStatusesOnHost(hostName)) {
-          totalMapTaskCapacity -= status.getMaxMapTasks();
-          totalReduceTaskCapacity -= status.getMaxReduceTasks();
+          int mapSlots = status.getMaxMapTasks();
+          totalMapTaskCapacity -= mapSlots;
+          int reduceSlots = status.getMaxReduceTasks();
+          totalReduceTaskCapacity -= reduceSlots;
+          getInstrumentation().addBlackListedMapSlots(
+              mapSlots);
+          getInstrumentation().addBlackListedReduceSlots(
+              reduceSlots);
         }
-        numBlacklistedTrackers +=
-          uniqueHostsMap.remove(hostName);
+        numBlacklistedTrackers += uniqueHostsMap.remove(hostName);
       }
     }
     
@@ -644,9 +649,13 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
         int numTrackersOnHost = 0;
         // add the capacity of trackers on the host
         for (TaskTrackerStatus status : getStatusesOnHost(hostName)) {
-          totalMapTaskCapacity += status.getMaxMapTasks();
-          totalReduceTaskCapacity += status.getMaxReduceTasks();
+          int mapSlots = status.getMaxMapTasks();
+          totalMapTaskCapacity += mapSlots;
+          int reduceSlots = status.getMaxReduceTasks();
+          totalReduceTaskCapacity += reduceSlots;
           numTrackersOnHost++;
+          getInstrumentation().decBlackListedMapSlots(mapSlots);
+          getInstrumentation().decBlackListedReduceSlots(reduceSlots);
         }
         uniqueHostsMap.put(hostName,
                            numTrackersOnHost);
@@ -2755,8 +2764,10 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
       totalMaps -= oldStatus.countMapTasks();
       totalReduces -= oldStatus.countReduceTasks();
       if (!faultyTrackers.isBlacklisted(oldStatus.getHost())) {
-        totalMapTaskCapacity -= oldStatus.getMaxMapTasks();
-        totalReduceTaskCapacity -= oldStatus.getMaxReduceTasks();
+        int mapSlots = oldStatus.getMaxMapTasks();
+        totalMapTaskCapacity -= mapSlots;
+        int reduceSlots = oldStatus.getMaxReduceTasks();
+        totalReduceTaskCapacity -= reduceSlots;
       }
       if (status == null) {
         taskTrackers.remove(trackerName);
@@ -2775,8 +2786,10 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
       totalMaps += status.countMapTasks();
       totalReduces += status.countReduceTasks();
       if (!faultyTrackers.isBlacklisted(status.getHost())) {
-        totalMapTaskCapacity += status.getMaxMapTasks();
-        totalReduceTaskCapacity += status.getMaxReduceTasks();
+        int mapSlots = status.getMaxMapTasks();
+        totalMapTaskCapacity += mapSlots;
+        int reduceSlots = status.getMaxReduceTasks();
+        totalReduceTaskCapacity += reduceSlots;
       }
       boolean alreadyPresent = false;
       if (taskTrackers.containsKey(trackerName)) {
@@ -2794,6 +2807,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
         uniqueHostsMap.put(status.getHost(), numTaskTrackersInHost);
       }
     }
+    getInstrumentation().setMapSlots(totalMapTaskCapacity);
+    getInstrumentation().setReduceSlots(totalReduceTaskCapacity);
     return oldStatus != null;
   }
     

+ 18 - 0
src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java

@@ -66,4 +66,22 @@ class JobTrackerInstrumentation {
   
   public void decWaitingReduces(JobID id, int task)
   { }
+
+  public void setMapSlots(int slots)
+  { }
+
+  public void setReduceSlots(int slots)
+  { }
+
+  public void addBlackListedMapSlots(int slots)
+  { }
+
+  public void decBlackListedMapSlots(int slots)
+  { }
+
+  public void addBlackListedReduceSlots(int slots)
+  { }
+
+  public void decBlackListedReduceSlots(int slots)
+  { }
 }

+ 44 - 1
src/mapred/org/apache/hadoop/mapred/JobTrackerMetricsInst.java

@@ -38,7 +38,13 @@ class JobTrackerMetricsInst extends JobTrackerInstrumentation implements Updater
   private int numJobsCompleted = 0;
   private int numWaitingMaps = 0;
   private int numWaitingReduces = 0;
-  
+
+  //Cluster status fields.
+  private volatile int numMapSlots = 0;
+  private volatile int numReduceSlots = 0;
+  private int numBlackListedMapSlots = 0;
+  private int numBlackListedReduceSlots = 0;
+
   public JobTrackerMetricsInst(JobTracker tracker, JobConf conf) {
     super(tracker, conf);
     String sessionId = conf.getSessionId();
@@ -57,6 +63,11 @@ class JobTrackerMetricsInst extends JobTrackerInstrumentation implements Updater
    */
   public void doUpdates(MetricsContext unused) {
     synchronized (this) {
+      metricsRecord.setMetric("map_slots", numMapSlots);
+      metricsRecord.setMetric("reduce_slots", numReduceSlots);
+      metricsRecord.incrMetric("blacklisted_maps", numBlackListedMapSlots);
+      metricsRecord.incrMetric("blacklisted_reduces",
+          numBlackListedReduceSlots);
       metricsRecord.incrMetric("maps_launched", numMapTasksLaunched);
       metricsRecord.incrMetric("maps_completed", numMapTasksCompleted);
       metricsRecord.incrMetric("maps_failed", numMapTasksFailed);
@@ -78,6 +89,8 @@ class JobTrackerMetricsInst extends JobTrackerInstrumentation implements Updater
       numJobsCompleted = 0;
       numWaitingMaps = 0;
       numWaitingReduces = 0;
+      numBlackListedMapSlots = 0;
+      numBlackListedReduceSlots = 0;
     }
     metricsRecord.update();
 
@@ -151,4 +164,34 @@ class JobTrackerMetricsInst extends JobTrackerInstrumentation implements Updater
   public synchronized void decWaitingReduces(JobID id, int task){
     numWaitingReduces -= task;
   }
+
+  @Override
+  public void setMapSlots(int slots) {
+    numMapSlots = slots;
+  }
+
+  @Override
+  public void setReduceSlots(int slots) {
+    numReduceSlots = slots;
+  }
+
+  @Override
+  public synchronized void addBlackListedMapSlots(int slots){
+    numBlackListedMapSlots += slots;
+  }
+
+  @Override
+  public synchronized void decBlackListedMapSlots(int slots){
+    numBlackListedMapSlots -= slots;
+  }
+
+  @Override
+  public synchronized void addBlackListedReduceSlots(int slots){
+    numBlackListedReduceSlots += slots;
+  }
+
+  @Override
+  public synchronized void decBlackListedReduceSlots(int slots){
+    numBlackListedReduceSlots -= slots;
+  }
 }