소스 검색

HADOOP-5214. Fixes a ConcurrentModificationException while the Fairshare Scheduler accesses the tasktrackers stored by the JobTracker. Contributed by Rahul Kumar Singh.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@746206 13f79535-47bb-0310-9956-ffa450edef68
Hemanth Yamijala 16 년 전
부모
커밋
6bea058714
2개의 변경된 파일에 38개의 추가작업 그리고 32개의 삭제
  1. 4 0
      CHANGES.txt
  2. 34 32
      src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java

+ 4 - 0
CHANGES.txt

@@ -804,6 +804,10 @@ Release 0.20.0 - Unreleased
     HADOOP-5269. Fixes a problem to do with tasktracker holding on to FAILED_UNCLEAN
     or KILLED_UNCLEAN tasks forever. (Amareshwari Sriramadasu via ddas) 
 
+    HADOOP-5214. Fixes a ConcurrentModificationException while the Fairshare
+    Scheduler accesses the tasktrackers stored by the JobTracker.
+    (Rahul Kumar Singh via yhemanth)
+
 Release 0.19.1 - Unreleased
 
   IMPROVEMENTS

+ 34 - 32
src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java

@@ -233,11 +233,12 @@ public class FairScheduler extends TaskScheduler {
       runnableReduces += runnableTasks(job, TaskType.REDUCE);
     }
 
+    ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
     // Compute total map/reduce slots
     // In the future we can precompute this if the Scheduler becomes a 
     // listener of tracker join/leave events.
-    int totalMapSlots = getTotalSlots(TaskType.MAP);
-    int totalReduceSlots = getTotalSlots(TaskType.REDUCE);
+    int totalMapSlots = getTotalSlots(TaskType.MAP, clusterStatus);
+    int totalReduceSlots = getTotalSlots(TaskType.REDUCE, clusterStatus);
     
     // Scan to see whether any job needs to run a map, then a reduce
     ArrayList<Task> tasks = new ArrayList<Task>();
@@ -331,31 +332,36 @@ public class FairScheduler extends TaskScheduler {
    * fair shares, deficits, minimum slot allocations, and numbers of running
    * and needed tasks of each type. 
    */
-  protected synchronized void update() {
+  protected void update() {
+    //Making more granular locking so that clusterStatus can be fetched from JobTracker.
+    ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
+    // Got clusterStatus hence acquiring scheduler lock now
     // Remove non-running jobs
-    List<JobInProgress> toRemove = new ArrayList<JobInProgress>();
-    for (JobInProgress job: infos.keySet()) { 
-      int runState = job.getStatus().getRunState();
-      if (runState == JobStatus.SUCCEEDED || runState == JobStatus.FAILED
+    synchronized(this){
+      List<JobInProgress> toRemove = new ArrayList<JobInProgress>();
+      for (JobInProgress job: infos.keySet()) { 
+        int runState = job.getStatus().getRunState();
+        if (runState == JobStatus.SUCCEEDED || runState == JobStatus.FAILED
           || runState == JobStatus.KILLED) {
-        toRemove.add(job);
+            toRemove.add(job);
+        }
       }
+      for (JobInProgress job: toRemove) {
+        infos.remove(job);
+        poolMgr.removeJob(job);
+      }
+      // Update running jobs with deficits since last update, and compute new
+      // slot allocations, weight, shares and task counts
+      long now = clock.getTime();
+      long timeDelta = now - lastUpdateTime;
+      updateDeficits(timeDelta);
+      updateRunnability();
+      updateTaskCounts();
+      updateWeights();
+      updateMinSlots();
+      updateFairShares(clusterStatus);
+      lastUpdateTime = now;
     }
-    for (JobInProgress job: toRemove) {
-      infos.remove(job);
-      poolMgr.removeJob(job);
-    }
-    // Update running jobs with deficits since last update, and compute new
-    // slot allocations, weight, shares and task counts
-    long now = clock.getTime();
-    long timeDelta = now - lastUpdateTime;
-    updateDeficits(timeDelta);
-    updateRunnability();
-    updateTaskCounts();
-    updateWeights();
-    updateMinSlots();
-    updateFairShares();
-    lastUpdateTime = now;
   }
   
   private void updateDeficits(long timeDelta) {
@@ -594,7 +600,7 @@ public class FairScheduler extends TaskScheduler {
     return slotsLeft;
   }
 
-  private void updateFairShares() {
+  private void updateFairShares(ClusterStatus clusterStatus) {
     // Clear old fairShares
     for (JobInfo info: infos.values()) {
       info.mapFairShare = 0;
@@ -618,7 +624,7 @@ public class FairScheduler extends TaskScheduler {
           jobsLeft.add(info);
         }
       }
-      double slotsLeft = getTotalSlots(type);
+      double slotsLeft = getTotalSlots(type, clusterStatus);
       while (!jobsLeft.isEmpty()) {
         double totalWeight = 0;
         for (JobInfo info: jobsLeft) {
@@ -697,13 +703,9 @@ public class FairScheduler extends TaskScheduler {
     return poolMgr;
   }
 
-  public int getTotalSlots(TaskType type) {
-    int slots = 0;
-    for (TaskTrackerStatus tt: taskTrackerManager.taskTrackers()) {
-      slots += (type == TaskType.MAP ?
-          tt.getMaxMapTasks() : tt.getMaxReduceTasks());
-    }
-    return slots;
+  private int getTotalSlots(TaskType type, ClusterStatus clusterStatus) {
+    return (type == TaskType.MAP ?
+      clusterStatus.getMaxMapTasks() : clusterStatus.getMaxReduceTasks());
   }
 
   public boolean getUseFifo() {