
HADOOP-1278. Improve blacklisting of tasktrackers by jobtracker, to reduce false positives. Contributed by Arun.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@533224 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 years ago
parent
commit
45e32d0587
3 changed files with 64 additions and 28 deletions
  1. +3 -0
      CHANGES.txt
  2. +45 -24
      src/java/org/apache/hadoop/mapred/JobInProgress.java
  3. +16 -4
      src/java/org/apache/hadoop/mapred/JobTracker.java

+ 3 - 0
CHANGES.txt

@@ -288,6 +288,9 @@ Trunk (unreleased changes)
 85. HADOOP-1299.  Fix so that RPC will restart after RPC.stopClient()
     has been called.  (Michael Stack via cutting)
 
+86. HADOOP-1278.  Improve blacklisting of TaskTrackers by JobTracker,
+    to reduce false positives.  (Arun C Murthy via cutting)
+
 
 Release 0.12.3 - 2007-04-06
 

+ 45 - 24
src/java/org/apache/hadoop/mapred/JobInProgress.java

@@ -70,6 +70,12 @@ class JobInProgress {
   private int taskCompletionEventTracker = 0; 
   List<TaskCompletionEvent> taskCompletionEvents;
     
+  // The maximum percentage of trackers in cluster added to the 'blacklist'.
+  private static final double CLUSTER_BLACKLIST_PERCENT = 0.25;
+  
+  // No. of tasktrackers in the cluster
+  private volatile int clusterSize = 0;
+  
   // The no. of tasktrackers where >= conf.getMaxTaskFailuresPerTracker()
   // tasks have failed
   private volatile int flakyTaskTrackers = 0;
@@ -341,7 +347,10 @@ class JobInProgress {
           // fail the task!
           failedTask(tip, status.getTaskId(), 
                      "Failed to copy reduce's output", 
-                     TaskStatus.Phase.REDUCE, TaskStatus.State.FAILED, 
+                     (tip.isMapTask() ? 
+                         TaskStatus.Phase.MAP : 
+                         TaskStatus.Phase.REDUCE), 
+                     TaskStatus.State.FAILED, 
                      ttStatus.getHost(), status.getTaskTracker(), null);
           LOG.info("Failed to copy the output of " + status.getTaskId() + 
                    " with: " + StringUtils.stringifyException(ioe));
@@ -504,19 +513,28 @@ class JobInProgress {
     return trackerHostName;
   }
     
-  private void addTrackerTaskFailure(String trackerName) {
-    String trackerHostName = convertTrackerNameToHostName(trackerName);
-      
-    Integer trackerFailures = trackerToFailuresMap.get(trackerHostName);
-    if (trackerFailures == null) {
-      trackerFailures = new Integer(0);
-    }
-    trackerToFailuresMap.put(trackerHostName, ++trackerFailures);
-      
-    // Check if this tasktracker has turned 'flaky'
-    if (trackerFailures.intValue() == conf.getMaxTaskFailuresPerTracker()) {
-      ++flakyTaskTrackers;
-      LOG.info("TaskTracker at '" + trackerHostName + "' turned 'flaky'");
+  /**
+   * Note that a task has failed on the given tracker, and add the tracker
+   * to the blacklist only if fewer than (clusterSize * CLUSTER_BLACKLIST_PERCENT)
+   * trackers in the cluster have already turned 'flaky'.
+   * 
+   * @param trackerName task-tracker on which a task failed
+   */
+  void addTrackerTaskFailure(String trackerName) {
+    if (flakyTaskTrackers < (clusterSize * CLUSTER_BLACKLIST_PERCENT)) { 
+      String trackerHostName = convertTrackerNameToHostName(trackerName);
+
+      Integer trackerFailures = trackerToFailuresMap.get(trackerHostName);
+      if (trackerFailures == null) {
+        trackerFailures = new Integer(0);
+      }
+      trackerToFailuresMap.put(trackerHostName, ++trackerFailures);
+
+      // Check if this tasktracker has turned 'flaky'
+      if (trackerFailures.intValue() == conf.getMaxTaskFailuresPerTracker()) {
+        ++flakyTaskTrackers;
+        LOG.info("TaskTracker at '" + trackerHostName + "' turned 'flaky'");
+      }
     }
   }
     
@@ -566,22 +584,23 @@ class JobInProgress {
                           List cachedTasks) {
     String taskTracker = tts.getTrackerName();
 
+    //
+    // Update the last-known clusterSize
+    //
+    this.clusterSize = clusterSize;
+
     //
     // Check if too many tasks of this job have failed on this
     // tasktracker prior to assigning it a new one.
     //
     int taskTrackerFailedTasks = getTrackerTaskFailures(taskTracker);
     if (taskTrackerFailedTasks >= conf.getMaxTaskFailuresPerTracker()) {
-      String flakyTracker = convertTrackerNameToHostName(taskTracker); 
-      if (flakyTaskTrackers < clusterSize) {
+      if (LOG.isDebugEnabled()) {
+        String flakyTracker = convertTrackerNameToHostName(taskTracker); 
         LOG.debug("Ignoring the black-listed tasktracker: '" + flakyTracker 
-                  + "' for assigning a new task");
-        return -1;
-      } else {
-        LOG.warn("Trying to assign a new task for black-listed tracker " + 
-                 flakyTracker + " since all task-trackers in the cluster are " +
-                 "'flaky' !");
+                + "' for assigning a new task");
       }
+      return -1;
     }
         
     //
@@ -849,9 +868,11 @@ class JobInProgress {
     }
             
     //
-    // Note down that a task has failed on this tasktracker
+    // Note down that a task has failed on this tasktracker 
     //
-    addTrackerTaskFailure(trackerName);
+    if (status.getRunState() == TaskStatus.State.FAILED) { 
+      addTrackerTaskFailure(trackerName);
+    }
         
     //
     // Let the JobTracker know that this task has failed

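The heart of the JobInProgress change is the cap on how much of the cluster a single job may blacklist. The standalone sketch below illustrates that rule; it is not the actual Hadoop class. Only CLUSTER_BLACKLIST_PERCENT and the field names clusterSize, flakyTaskTrackers and trackerToFailuresMap come from the patch; the class name, the isBlacklisted helper and the fixed failure limit of 4 (a stand-in for conf.getMaxTaskFailuresPerTracker()) are assumptions for illustration.

import java.util.HashMap;
import java.util.Map;

// Standalone sketch of the capped blacklisting rule; not the real JobInProgress.
class TrackerBlacklistSketch {
  // At most 25% of the cluster may ever be marked 'flaky' (value from the patch).
  private static final double CLUSTER_BLACKLIST_PERCENT = 0.25;
  // Stand-in for conf.getMaxTaskFailuresPerTracker(); the real limit is configurable.
  private static final int MAX_TASK_FAILURES_PER_TRACKER = 4;

  private volatile int clusterSize = 0;        // refreshed on every task-assignment call
  private volatile int flakyTaskTrackers = 0;  // trackers that crossed the failure limit
  private final Map<String, Integer> trackerToFailuresMap = new HashMap<String, Integer>();

  void updateClusterSize(int clusterSize) {
    this.clusterSize = clusterSize;
  }

  void addTrackerTaskFailure(String trackerHostName) {
    // Only keep growing the blacklist while fewer than 25% of the trackers are
    // flaky; this is the guard against false positives the commit message refers to.
    if (flakyTaskTrackers < (clusterSize * CLUSTER_BLACKLIST_PERCENT)) {
      Integer failures = trackerToFailuresMap.get(trackerHostName);
      int updated = (failures == null) ? 1 : failures.intValue() + 1;
      trackerToFailuresMap.put(trackerHostName, updated);
      if (updated == MAX_TASK_FAILURES_PER_TRACKER) {
        ++flakyTaskTrackers;  // this tracker has just turned 'flaky'
      }
    }
  }

  boolean isBlacklisted(String trackerHostName) {
    Integer failures = trackerToFailuresMap.get(trackerHostName);
    return failures != null && failures.intValue() >= MAX_TASK_FAILURES_PER_TRACKER;
  }
}

On a 100-node cluster this caps the per-job blacklist at 25 trackers; once the ceiling is reached, further failures no longer mark new trackers as 'flaky', so a misbehaving job can no longer take the whole cluster out of rotation.
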
+ 16 - 4
src/java/org/apache/hadoop/mapred/JobTracker.java

@@ -24,6 +24,7 @@ import java.text.NumberFormat;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -1671,8 +1672,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
     trackerToTaskMap.remove(trackerName);
 
     if (lostTasks != null) {
-      for (Iterator it = lostTasks.iterator(); it.hasNext();) {
-        String taskId = (String) it.next();
+      // List of jobs which had any of their tasks fail on this tracker
+      Set<JobInProgress> jobsWithFailures = new HashSet<JobInProgress>(); 
+      for (String taskId : lostTasks) {
         TaskInProgress tip = taskidToTIPMap.get(taskId);
 
         // Completed reduce tasks never need to be failed, because 
@@ -1682,8 +1684,12 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
           // if the job is done, we don't want to change anything
           if (job.getStatus().getRunState() == JobStatus.RUNNING) {
             job.failedTask(tip, taskId, "Lost task tracker", 
-                           TaskStatus.Phase.MAP, TaskStatus.State.KILLED,
+                           (tip.isMapTask() ? 
+                               TaskStatus.Phase.MAP : 
+                               TaskStatus.Phase.REDUCE), 
+                           TaskStatus.State.KILLED,
                            hostname, trackerName, myMetrics);
+            jobsWithFailures.add(job);
           }
         } else if (!tip.isMapTask() && tip.isComplete()) {
           // Completed 'reduce' task, not failed;
@@ -1691,7 +1697,13 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
           markCompletedTaskAttempt(trackerName, taskId);
         }
       }
-            
+      
+      // Penalize this tracker for each of the jobs which   
+      // had any tasks running on it when it was 'lost' 
+      for (JobInProgress job : jobsWithFailures) {
+        job.addTrackerTaskFailure(trackerName);
+      }
+      
       // Purge 'marked' tasks, needs to be done  
       // here to prevent hanging references!
       removeMarkedTasks(trackerName);
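
The JobTracker change deduplicates the penalty when a tracker is lost: affected jobs are collected into a HashSet first, so each job charges the lost tracker with exactly one failure rather than one per lost task. A minimal sketch of that pattern follows; the Job interface and the way jobs are passed in are illustrative simplifications, with only addTrackerTaskFailure mirroring the real JobInProgress method.

import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustrative sketch of the once-per-job penalty applied when a tracker is lost.
class LostTrackerSketch {
  // Stand-in for JobInProgress; only addTrackerTaskFailure mirrors the real method.
  interface Job {
    void addTrackerTaskFailure(String trackerName);
  }

  void lostTaskTracker(String trackerName, List<Job> jobsOfLostTasks) {
    // Collect each affected job exactly once, however many of its tasks
    // were running on the lost tracker.
    Set<Job> jobsWithFailures = new HashSet<Job>(jobsOfLostTasks);

    // Charge the lost tracker with a single failure per job, mirroring the
    // loop added at the end of JobTracker.lostTaskTracker().
    for (Job job : jobsWithFailures) {
      job.addTrackerTaskFailure(trackerName);
    }
  }
}

Since JobInProgress.failedTask() now only counts FAILED attempts (KILLED ones from a lost tracker are skipped), this loop is what keeps a lost tracker accountable, but at a rate of one failure per job instead of one per killed task.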