
Merge -r 606266:606267 from trunk to branch-0.15 to fix HADOOP-2247.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/branches/branch-0.15@606268 13f79535-47bb-0310-9956-ffa450edef68
Arun Murthy 17 years ago
commit 21413463f9

+ 8 - 0
CHANGES.txt

@@ -39,6 +39,14 @@ Branch 0.15 (unreleased)
     round-robin disk selections randomly. This helps in spreading data across
    multiple partitions much better. (acmurthy)
 
+    HADOOP-2247.  Fine-tune the strategies for killing mappers and reducers
+    due to failures while fetching map-outputs. Now the map-completion times
+    and the number of currently running reduces are taken into account by the
+    JobTracker before killing the mappers, while the progress made by the
+    reducer and the number of fetch-failures vis-a-vis the total number of
+    fetch-attempts are taken into account before the reducer kills itself.
+    (Amar Kamat via acmurthy)
+
   IMPROVEMENTS
 
     HADOOP-2160.  Remove project-level, non-user documentation from
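
In short, both kill decisions are now ratio-based rather than driven by a bare
counter. The sketch below distills the two predicates from the hunks that
follow; the constant names mirror the patch, while the class, method, and
parameter names (and the assumed constant values) are illustrative only:

public class Hadoop2247PolicySketch {

  // JobTracker side (JobInProgress): re-execute a map only when both an
  // absolute and a relative threshold are crossed.
  static boolean shouldReexecuteMap(int fetchFailureNotifications,
                                    int runningReduceTasks) {
    final int maxNotifications = 3;       // MAX_FETCH_FAILURES_NOTIFICATIONS (assumed value)
    final double maxFailurePercent = 0.5; // MAX_ALLOWED_FETCH_FAILURES_PERCENT
    float failureRate = (float) fetchFailureNotifications / runningReduceTasks;
    return fetchFailureNotifications >= maxNotifications
           && failureRate >= maxFailurePercent;
  }

  // Reducer side (ReduceTask): kill the shuffle only when failures span enough
  // unique maps AND the reducer is unhealthy AND it is not making headway.
  // (See the worked example after the ReduceTask.java hunks below.)
  static boolean shouldKillReduce(int uniqueFailedMaps, int failedFetches,
                                  int copiedOutputs, int totalMaps,
                                  long stalledMillis, long shuffleRunMillis) {
    final int maxUniqueFailures = 5;      // MAX_FAILED_UNIQUE_FETCHES (assumed value)
    boolean healthy =
        (float) failedFetches / (failedFetches + copiedOutputs) < 0.5f;
    boolean progressedEnough = (float) copiedOutputs / totalMaps >= 0.5f;
    boolean stalled = (float) stalledMillis / shuffleRunMillis >= 0.5f;
    return uniqueFailedMaps >= maxUniqueFailures
           && !healthy && (!progressedEnough || stalled);
  }

  public static void main(String[] args) {
    System.out.println(shouldReexecuteMap(4, 20));  // false: only 20% complained
    System.out.println(shouldReexecuteMap(12, 20)); // true: 60% complained
  }
}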

+ 11 - 1
src/java/org/apache/hadoop/mapred/JobInProgress.java

@@ -79,6 +79,9 @@ class JobInProgress {
   // The maximum percentage of trackers in cluster added to the 'blacklist'.
   private static final double CLUSTER_BLACKLIST_PERCENT = 0.25;
   
+  // The maximum percentage of fetch failures allowed for a map 
+  private static final double MAX_ALLOWED_FETCH_FAILURES_PERCENT = 0.5;
+  
   // No. of tasktrackers in the cluster
   private volatile int clusterSize = 0;
   
@@ -405,6 +408,8 @@ class JobInProgress {
                                             TaskCompletionEvent.Status.SUCCEEDED,
                                             httpTaskLogLocation 
                                            );
+        taskEvent.setTaskRunTime((int)(status.getFinishTime() 
+                                       - status.getStartTime()));
         tip.setSuccessEventNumber(taskCompletionEventTracker); 
       }
       //For a failed task update the JT datastructures.For the task state where
@@ -1164,7 +1169,12 @@ class JobInProgress {
     LOG.info("Failed fetch notification #" + fetchFailures + " for task " + 
             mapTaskId);
     
-    if (fetchFailures == MAX_FETCH_FAILURES_NOTIFICATIONS) {
+    float failureRate = (float)fetchFailures / runningReduceTasks;
+    // declare faulty if fetch-failures >= max-allowed-failures
+    boolean isMapFaulty =
+        (failureRate >= MAX_ALLOWED_FETCH_FAILURES_PERCENT);
+    if (fetchFailures >= MAX_FETCH_FAILURES_NOTIFICATIONS
+        && isMapFaulty) {
       LOG.info("Too many fetch-failures for output of task: " + mapTaskId 
                + " ... killing it");
       
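
A consequence of the new failureRate check worth noting: the number of
notifications needed to re-execute a map now scales with the number of running
reduces, with MAX_FETCH_FAILURES_NOTIFICATIONS as an absolute floor. A hedged
arithmetic sketch (class and helper names hypothetical; the value 3 for the
floor is an assumption):

public class MapKillThreshold {
  static int notificationsNeeded(int runningReduceTasks) {
    final int maxNotifications = 3;  // MAX_FETCH_FAILURES_NOTIFICATIONS (assumed value)
    final double maxPercent = 0.5;   // MAX_ALLOWED_FETCH_FAILURES_PERCENT
    return Math.max(maxNotifications,
                    (int) Math.ceil(maxPercent * runningReduceTasks));
  }

  public static void main(String[] args) {
    System.out.println(notificationsNeeded(2));   // 3: the absolute floor dominates
    System.out.println(notificationsNeeded(50));  // 25: half the running reduces
  }
}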

+ 68 - 3
src/java/org/apache/hadoop/mapred/ReduceTask.java

@@ -476,10 +476,30 @@ class ReduceTask extends Task {
     private long ramfsMergeOutputSize;
     
     /**
-     * Maximum no. of fetch-retries per-map.
+     * The maximum of all the map-completion times.
+     */
+    private int maxMapRuntime;
+    
+    /**
+     * Maximum number of fetch-retries per-map.
      */
     private static final int MAX_FETCH_RETRIES_PER_MAP = 5;
     
+    /**
+     * Maximum percent of failed fetch attempts before killing the reduce task.
+     */
+    private static final float MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT = 0.5f;
+
+    /**
+     * Minimum percent of progress required to keep the reduce alive.
+     */
+    private static final float MIN_REQUIRED_PROGRESS_PERCENT = 0.5f;
+
+    /**
+     * Maximum percent of the shuffle run time the reducer may spend stalled.
+     */
+    private static final float MAX_ALLOWED_STALL_TIME_PERCENT = 0.5f;
+
     /**
      * Maximum no. of unique maps from which we failed to fetch map-outputs
      * even after {@link #MAX_FETCH_RETRIES_PER_MAP} retries; after this the
@@ -862,12 +882,14 @@ class ReduceTask extends Task {
                                        (this.reduceTask.getPartition()%10)
                                       );
       this.random = new Random(randomSeed);
+      this.maxMapRuntime = 0;
     }
     
     public boolean fetchOutputs() throws IOException {
       final int      numOutputs = reduceTask.getNumMaps();
       List<MapOutputLocation> knownOutputs = 
         new ArrayList<MapOutputLocation>(numCopiers);
+      int            totalFailures = 0;
       int            numInFlight = 0, numCopied = 0;
       int            lowThreshold = numCopiers*2;
       long           bytesTransferred = 0;
@@ -896,6 +918,7 @@ class ReduceTask extends Task {
       // start the clock for bandwidth measurement
       long startTime = System.currentTimeMillis();
       long currentTime = startTime;
+      long lastProgressTime = startTime;
       IntWritable fromEventId = new IntWritable(0);
       
       try {
@@ -1005,6 +1028,7 @@ class ReduceTask extends Task {
             if (cr != null) {
               if (cr.getSuccess()) {  // a successful copy
                 numCopied++;
+                lastProgressTime = System.currentTimeMillis();
                 bytesTransferred += cr.getSize();
                 
                 long secsSinceStart = 
@@ -1033,6 +1057,7 @@ class ReduceTask extends Task {
                 String mapTaskId = cr.getLocation().getMapTaskId();
                 Integer mapId = cr.getLocation().getMapId();
                 
+                totalFailures++;
                 Integer noFailedFetches = 
                   mapTaskToFailedFetchesMap.get(mapTaskId);
                 noFailedFetches = 
@@ -1056,8 +1081,41 @@ class ReduceTask extends Task {
                   fetchFailedMaps.add(mapId);
                   
                   // did we have too many unique failed-fetch maps?
-                  if (fetchFailedMaps.size() >= MAX_FAILED_UNIQUE_FETCHES) {
-                    LOG.fatal("Shuffle failed with too many fetch failures! " +
+                  // and did we fail on too many fetch attempts?
+                  // and did we progress enough
+                  //     or did we wait for too long without any progress?
+                  
+                  // check if the reducer is healthy
+                  boolean reducerHealthy = 
+                      (((float)totalFailures / (totalFailures + numCopied)) 
+                       < MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT);
+                  
+                  // check if the reducer has progressed enough
+                  boolean reducerProgressedEnough = 
+                      (((float)numCopied / numMaps) 
+                       >= MIN_REQUIRED_PROGRESS_PERCENT);
+                  
+                  // check if the reducer is stalled for a long time
+                  
+                  // duration for which the reducer is stalled
+                  int stallDuration = 
+                      (int)(System.currentTimeMillis() - lastProgressTime);
+                  // duration for which the reducer ran with progress
+                  int shuffleProgressDuration = 
+                      (int)(lastProgressTime - startTime);
+                  // min time the reducer should run without getting killed
+                  int minShuffleRunDuration =
+                      Math.max(shuffleProgressDuration, maxMapRuntime);
+                  boolean reducerStalled = 
+                      (((float)stallDuration / minShuffleRunDuration) 
+                       >= MAX_ALLOWED_STALL_TIME_PERCENT);
+                  
+                  // kill if not healthy and has insufficient progress
+                  if ((fetchFailedMaps.size() >= MAX_FAILED_UNIQUE_FETCHES)
+                      && !reducerHealthy 
+                      && (!reducerProgressedEnough || reducerStalled)) { 
+                    LOG.fatal("Shuffle failed with too many fetch failures " + 
+                              "and insufficient progress! " +
                               "Killing task " + getTaskId() + ".");
                     umbilical.shuffleError(getTaskId(), 
                                            "Exceeded MAX_FAILED_UNIQUE_FETCHES;"
@@ -1251,6 +1311,13 @@ class ReduceTask extends Task {
             int port = u.getPort();
             String taskId = event.getTaskId();
             int mId = event.idWithinJob();
+            int duration = event.getTaskRunTime();
+            if (duration > maxMapRuntime) {
+              maxMapRuntime = duration; 
+              // adjust max-fetch-retries based on max-map-run-time
+              maxFetchRetriesPerMap = 
+                  getClosestPowerOf2((maxMapRuntime / BACKOFF_INIT) + 1);
+            }
             knownOutputs.add(new MapOutputLocation(taskId, mId, host, port));
           }
           break;
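
The retry budget above is derived from the slowest map: fetch attempts back off
exponentially from BACKOFF_INIT, so n retries wait roughly
BACKOFF_INIT * (2^n - 1) in total, and choosing n as the closest power-of-two
exponent of maxMapRuntime / BACKOFF_INIT + 1 lets the reducer keep retrying for
about as long as that map took to run. Neither getClosestPowerOf2 nor
BACKOFF_INIT appears in this diff, so the sketch below is a plausible
reconstruction, with 4000 ms assumed for BACKOFF_INIT:

public class RetryBudgetSketch {
  static final int BACKOFF_INIT = 4000;  // initial backoff in ms (assumed value)

  // Returns the exponent n such that 2^n is the power of two closest to value.
  static int getClosestPowerOf2(int value) {
    int exponent = 0;
    while ((1 << (exponent + 1)) <= value) {
      exponent++;
    }
    // round up when value is nearer to the next power of two
    int lower = 1 << exponent, upper = 1 << (exponent + 1);
    return (upper - value < value - lower) ? exponent + 1 : exponent;
  }

  public static void main(String[] args) {
    int maxMapRuntime = 10 * 60 * 1000;  // slowest map took 10 minutes
    int retries = getClosestPowerOf2((maxMapRuntime / BACKOFF_INIT) + 1);
    // 600000/4000 + 1 = 151; 2^7 = 128 is closer than 2^8 = 256, so 7 retries,
    // whose summed backoff (~4s * 127) is in the ballpark of the map's runtime.
    System.out.println(retries);  // 7
  }
}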

+ 19 - 0
src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java

@@ -33,6 +33,7 @@ public class TaskCompletionEvent implements Writable{
     
   private int eventId; 
   private String taskTrackerHttp;
+  private int taskRunTime; // using int since run time is the start-to-finish difference in ms
   private String taskId;
   Status status; 
   boolean isMap = false;
@@ -95,6 +96,22 @@ public class TaskCompletionEvent implements Writable{
   public String getTaskTrackerHttp() {
     return taskTrackerHttp;
   }
+
+  /**
+   * Returns the time (in milliseconds) the task took to complete.
+   */
+  public int getTaskRunTime() {
+    return taskRunTime;
+  }
+
+  /**
+   * Sets the task completion time.
+   * @param taskCompletionTime time (in milliseconds) the task took to complete
+   */
+  public void setTaskRunTime(int taskCompletionTime) {
+    this.taskRunTime = taskCompletionTime;
+  }
+
   /**
    * set event Id. should be assigned incrementally starting from 0. 
    * @param eventId
@@ -153,6 +170,7 @@ public class TaskCompletionEvent implements Writable{
     out.writeBoolean(isMap);
     WritableUtils.writeEnum(out, status); 
     WritableUtils.writeString(out, taskTrackerHttp);
+    WritableUtils.writeVInt(out, taskRunTime);
   }
   
   public void readFields(DataInput in) throws IOException {
@@ -161,5 +179,6 @@ public class TaskCompletionEvent implements Writable{
     this.isMap = in.readBoolean();
     this.status = WritableUtils.readEnum(in, Status.class);
     this.taskTrackerHttp = WritableUtils.readString(in);
+    this.taskRunTime = WritableUtils.readVInt(in);
   }
 }
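
One thing to keep in mind with Writable types: write() and readFields() must
handle the fields in exactly the same order, which is why taskRunTime is
appended last on both sides here. Appending it still changes the wire format,
so the JobTracker and TaskTrackers presumably need to be upgraded in lock-step.
A minimal round-trip helper for eyeballing that a new field survives
serialization (a sketch assuming only the Writable contract, not part of the
patch):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

public class WritableRoundTrip {
  // Serializes src, deserializes into dst, and returns dst; fields that do not
  // survive the trip (wrong order, wrong type) show up as corrupted values.
  public static <T extends Writable> T roundTrip(T src, T dst) throws IOException {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    src.write(new DataOutputStream(buffer));
    dst.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
    return dst;
  }
}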