Browse Source

Fix HADOOP-100. Be more consistent about synchronization of access to taskTracker collection. Contributed by Owen O'Malley.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@390287 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 19 years ago
parent
commit
d98cef1ab3
1 changed files with 25 additions and 12 deletions
  1. 25 12
      src/java/org/apache/hadoop/mapred/JobTracker.java

+ 25 - 12
src/java/org/apache/hadoop/mapred/JobTracker.java

@@ -261,7 +261,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
     //
     int totalMaps = 0;
     int totalReduces = 0;
-    TreeMap taskTrackers = new TreeMap();
+    private TreeMap taskTrackers = new TreeMap();
     Vector jobInitQueue = new Vector();
     ExpireTrackers expireTrackers = new ExpireTrackers();
     RetireJobs retireJobs = new RetireJobs();
@@ -464,10 +464,14 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
         return v;
     }
     public Collection taskTrackers() {
+      synchronized (taskTrackers) {
         return taskTrackers.values();
+      }
     }
     public TaskTrackerStatus getTaskTracker(String trackerID) {
+      synchronized (taskTrackers) {
         return (TaskTrackerStatus) taskTrackers.get(trackerID);
+      }
     }
 
     ////////////////////////////////////////////////////
@@ -557,16 +561,20 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
         //
         int avgMaps = 0;
         int avgReduces = 0;
-        if (taskTrackers.size() > 0) {
-            avgMaps = totalMaps / taskTrackers.size();
-            avgReduces = totalReduces / taskTrackers.size();
+        int numTaskTrackers;
+        TaskTrackerStatus tts;
+        synchronized (taskTrackers) {
+          numTaskTrackers = taskTrackers.size();
+          tts = (TaskTrackerStatus) taskTrackers.get(taskTracker);
         }
-        int totalCapacity = taskTrackers.size() * maxCurrentTasks;
-
+        if (numTaskTrackers > 0) {
+          avgMaps = totalMaps / numTaskTrackers;
+          avgReduces = totalReduces / numTaskTrackers;
+        }
+        int totalCapacity = numTaskTrackers * maxCurrentTasks;
         //
         // Get map + reduce counts for the current tracker.
         //
-        TaskTrackerStatus tts = (TaskTrackerStatus) taskTrackers.get(taskTracker);
         if (tts == null) {
           LOG.warning("Unknown task tracker polling; ignoring: " + taskTracker);
           return null;
@@ -694,7 +702,10 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
                 TaskInProgress tip = (TaskInProgress) taskidToTIPMap.get(mapTasksNeeded[i][j]);
                 if (tip != null && tip.isComplete(mapTasksNeeded[i][j])) {
                     String trackerId = (String) taskidToTrackerMap.get(mapTasksNeeded[i][j]);
-                    TaskTrackerStatus tracker = (TaskTrackerStatus) taskTrackers.get(trackerId);
+                    TaskTrackerStatus tracker;
+                    synchronized (taskTrackers) {
+                      tracker = (TaskTrackerStatus) taskTrackers.get(trackerId);
+                    }
                     v.add(new MapOutputLocation(mapTasksNeeded[i][j], tracker.getHost(), tracker.getPort()));
                     break;
                 }
@@ -745,10 +756,12 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
     }
 
     public synchronized ClusterStatus getClusterStatus() {
-        return new ClusterStatus(taskTrackers.size(),
-                                 totalMaps,
-                                 totalReduces,
-                                 maxCurrentTasks);
+        synchronized (taskTrackers) {
+          return new ClusterStatus(taskTrackers.size(),
+                                   totalMaps,
+                                   totalReduces,
+                                   maxCurrentTasks);          
+        }
     }
     
     public synchronized void killJob(String jobid) {