HADOOP-1984. Makes the backoff for failed fetches exponential. Earlier, it was a random backoff from an interval. Contributed by Amar Kamat.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@599296 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das 17 years ago
parent commit d569e24cf8
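
For orientation before the diffs: the patch replaces a uniformly random wait with a doubling one. A minimal sketch of the two policies under the default settings (illustrative class and method names, not code from the patch):

import java.util.Random;

/** Sketch: old randomized backoff vs. new exponential backoff. */
class BackoffSketch {
  private static final Random RANDOM = new Random();

  /** Old policy: one minute plus a random slice of maxBackoff seconds. */
  static long randomBackoffMs(int maxBackoffSecs) {
    return 60 * 1000L + RANDOM.nextInt(maxBackoffSecs * 1000);
  }

  /** New policy: init * 2^(attempt - 1), i.e. 4s, 8s, 16s, ... for init = 4s. */
  static long exponentialBackoffMs(int attempt, long initMs) {
    return initMs << (attempt - 1);
  }

  public static void main(String[] args) {
    for (int attempt = 1; attempt <= 6; attempt++) {
      System.out.println("attempt " + attempt + ": "
          + exponentialBackoffMs(attempt, 4000) / 1000 + "s");
    }
    // prints 4s, 8s, 16s, 32s, 64s, 128s -- 252s total, under the 300s cap
  }
}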

+ 4 - 0
CHANGES.txt

@@ -79,6 +79,10 @@ Trunk (unreleased changes)
     permits each to determine what files are copied into release
     builds.  (stack via cutting)
 
+    HADOOP-1984. Makes the backoff for failed fetches exponential. 
+    Earlier, it was a random backoff from an interval. 
+    (Amar Kamat via ddas)
+
  OPTIMIZATIONS
 
     HADOOP-1898.  Release the lock protecting the last time of the last stack

+ 8 - 0
conf/hadoop-default.xml

@@ -593,6 +593,14 @@ creations/deletions), or "all".</description>
   </description>
 </property>
 
+<property>
+  <name>mapred.reduce.copy.backoff</name>
+  <value>300</value>
+  <description>The maximum amount of time (in seconds) a reducer spends on 
+  fetching one map output before declaring it as failed.
+  </description>
+</property>
+
 <property>
   <name>mapred.task.timeout</name>
   <value>600000</value>

+ 51 - 15
src/java/org/apache/hadoop/mapred/ReduceTask.java

@@ -377,9 +377,8 @@ class ReduceTask extends Task {
     private int numCopiers;
     
     /**
-     * the maximum amount of time (less 1 minute) to wait to 
-     * contact a host after a copy from it fails. We wait for (1 min +
-     * Random.nextInt(maxBackoff)) seconds.
+     * the amount of time spent on fetching one map output before considering 
+     * it as failed and notifying the jobtracker about it.
      */
     private int maxBackoff;
     
@@ -475,20 +474,20 @@ class ReduceTask extends Task {
     private long ramfsMergeOutputSize;
     
     /**
-     * Maximum no. of fetch-retries per-map.
+     * Maximum number of fetch-retries per-map.
      */
-    private static final int MAX_FETCH_RETRIES_PER_MAP = 5;
+    private int maxFetchRetriesPerMap;
     
     /**
      * Maximum no. of unique maps from which we failed to fetch map-outputs
-     * even after {@link #MAX_FETCH_RETRIES_PER_MAP} retries; after this the
+     * even after {@link #maxFetchRetriesPerMap} retries; after this the
      * reduce task is failed.
      */
     private static final int MAX_FAILED_UNIQUE_FETCHES = 5;
 
     /**
      * The maps from which we fail to fetch map-outputs 
-     * even after {@link #MAX_FETCH_RETRIES_PER_MAP} retries.
+     * even after {@link #maxFetchRetriesPerMap} retries.
      */
     Set<Integer> fetchFailedMaps = new TreeSet<Integer>(); 
     
@@ -498,6 +497,11 @@ class ReduceTask extends Task {
     Map<String, Integer> mapTaskToFailedFetchesMap = 
       new HashMap<String, Integer>();    
 
+    /**
+     * Initial backoff interval (milliseconds)
+     */
+    private static final int BACKOFF_INIT = 4000; 
+
     /**
      * This class contains the methods that should be used for metrics-reporting
      * the specific metrics for shuffle. This class actually reports the
@@ -822,6 +826,19 @@ class ReduceTask extends Task {
       this.copyResults = new ArrayList<CopyResult>(100);    
       this.numCopiers = conf.getInt("mapred.reduce.parallel.copies", 5);
       this.maxBackoff = conf.getInt("mapred.reduce.copy.backoff", 300);
+      
+      // the exponential backoff formula
+      //    backoff (t) = init * base^(t-1)
+      // so for max retries we get
+      //    backoff(1) + .... + backoff(max_fetch_retries) ~ max
+      // solving which we get
+      //    max_fetch_retries ~ log((max * (base - 1) / init) + 1) / log(base)
+      // for the default value of max = 300 (5min) we get max_fetch_retries = 6
+      // the order is 4,8,16,32,64,128. sum of which is 252 sec = 4.2 min
+      
+      // optimizing for the base 2
+      this.maxFetchRetriesPerMap = getClosestPowerOf2((this.maxBackoff * 1000 
+                                                       / BACKOFF_INIT) + 1); 
       this.mergeThreshold = conf.getInt("mapred.inmem.merge.threshold", 1000);
       
       //we want to distinguish inmem fs instances for different reduces. Hence,
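
To check the arithmetic in the comment above: with base 2 the waits sum to init * (2^n - 1), so n ~ log2(max/init + 1). A small standalone sketch for the defaults (not part of the patch):

/** Sketch: retry-count derivation for maxBackoff = 300s, BACKOFF_INIT = 4000ms. */
class RetryMath {
  public static void main(String[] args) {
    int maxBackoffSecs = 300;   // mapred.reduce.copy.backoff default
    int initMs = 4000;          // BACKOFF_INIT
    // n ~ log2((300 * 1000 / 4000) + 1) = log2(76) ~ 6.25, which rounds to 6
    double exact = Math.log(maxBackoffSecs * 1000.0 / initMs + 1) / Math.log(2);
    System.out.println("exact exponent = " + exact);
    long totalMs = 0;
    for (int t = 1; t <= 6; t++) {
      totalMs += initMs << (t - 1);   // 4s + 8s + 16s + 32s + 64s + 128s
    }
    System.out.println("total backoff = " + totalMs / 1000 + "s");   // 252s
  }
}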
@@ -864,7 +881,6 @@ class ReduceTask extends Task {
       int            lowThreshold = numCopiers*2;
       long           bytesTransferred = 0;
       DecimalFormat  mbpsFormat = new DecimalFormat("0.00");
-      Random         backoff = new Random();
       final Progress copyPhase = 
         reduceTask.getProgress().phase();
       
@@ -1034,7 +1050,11 @@ class ReduceTask extends Task {
                          noFailedFetches + " from " + mapTaskId);
                 
                 // did the fetch fail too many times?
-                if ((noFailedFetches % MAX_FETCH_RETRIES_PER_MAP) == 0) {
+                // using a hybrid technique for notifying the jobtracker.
+                //   a. the first notification is sent after max-retries 
+                //   b. subsequent notifications are sent after 2 retries.   
+                if ((noFailedFetches >= maxFetchRetriesPerMap) 
+                    && ((noFailedFetches - maxFetchRetriesPerMap) % 2) == 0) {
                   synchronized (ReduceTask.this) {
                     taskStatus.addFetchFailedMap(mapTaskId);
                     LOG.info("Failed to fetch map-output from " + mapTaskId + 
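
Under the default budget (maxFetchRetriesPerMap = 6, see the derivation above) the condition fires on failure counts 6, 8, 10, and so on. An illustrative check, not code from the patch:

// Sketch: which consecutive-failure counts trigger a JobTracker notification.
int maxFetchRetriesPerMap = 6;   // the value under the default 300s budget
for (int n = 1; n <= 12; n++) {
  boolean notify = (n >= maxFetchRetriesPerMap)
      && ((n - maxFetchRetriesPerMap) % 2 == 0);
  System.out.println("failures=" + n + " -> notify=" + notify);   // true at 6, 8, 10, 12
}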
@@ -1044,7 +1064,7 @@ class ReduceTask extends Task {
                 }
 
                 // note unique failed-fetch maps
-                if (noFailedFetches == MAX_FETCH_RETRIES_PER_MAP) {
+                if (noFailedFetches == maxFetchRetriesPerMap) {
                   fetchFailedMaps.add(mapId);
                   
                   // did we have too many unique failed-fetch maps?
@@ -1057,14 +1077,17 @@ class ReduceTask extends Task {
                   }
                 }
                 
-                // wait a random amount of time for next contact
+                // back off exponentially until num_retries <= max_retries
+                // back off by max_backoff/2 on subsequent failed attempts
                 currentTime = System.currentTimeMillis();
-                long nextContact = currentTime + 60 * 1000 + 
-                                   backoff.nextInt(maxBackoff*1000);
-                penaltyBox.put(cr.getHost(), nextContact);          
+                int currentBackOff = noFailedFetches <= maxFetchRetriesPerMap 
+                                     ? BACKOFF_INIT 
+                                       * (1 << (noFailedFetches - 1)) 
+                                     : (this.maxBackoff * 1000 / 2);
+                penaltyBox.put(cr.getHost(), currentTime + currentBackOff);
                 LOG.warn(reduceTask.getTaskId() + " adding host " +
                          cr.getHost() + " to penalty box, next contact in " +
-                         ((nextContact-currentTime)/1000) + " seconds");
+                         (currentBackOff/1000) + " seconds");
                 
                 // other outputs from the failed host may be present in the
                 // knownOutputs cache, purge them. This is important in case
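
Taken together, with the defaults the penalty-box wait per consecutive failure is 4s, 8s, 16s, 32s, 64s, 128s, then a flat 150s (maxBackoff/2) on every later attempt. A standalone sketch of that schedule (illustrative names, not code from the patch):

/** Sketch: per-failure penalty-box schedule under the default settings. */
class PenaltySchedule {
  static long backoffMs(int n, int maxRetries, int maxBackoffSecs) {
    // exponential (4s, 8s, ..., 128s) up to maxRetries, then a flat
    // maxBackoff/2 (150s by default) on every subsequent failure
    return n <= maxRetries ? 4000L << (n - 1) : maxBackoffSecs * 1000L / 2;
  }

  public static void main(String[] args) {
    for (int n = 1; n <= 8; n++) {
      System.out.println("failure " + n + ": "
          + backoffMs(n, 6, 300) / 1000 + "s");   // 4, 8, 16, 32, 64, 128, 150, 150
    }
  }
}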
@@ -1345,4 +1368,17 @@ class ReduceTask extends Task {
         }     
       };
   }
+
+  private static int getClosestPowerOf2(int value) {
+    int power = 0;
+    int approx = 1;
+    while (approx < value) {
+      ++power;
+      approx = (approx << 1);
+    }
+    if ((value - (approx >> 1)) < (approx - value)) {
+      --power;
+    }
+    return power;
+  }
 }
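
Note that getClosestPowerOf2 returns the exponent rather than the power itself, rounding to whichever power of two is nearer (ties round up). A usage sketch against the defaults, assuming it runs inside the class:

public static void main(String[] args) {
  // The constructor passes (300 * 1000 / 4000) + 1 = 76, and 76 is nearer
  // 2^6 = 64 than 2^7 = 128, so maxFetchRetriesPerMap comes out as 6.
  System.out.println(getClosestPowerOf2(76));    // 6
  System.out.println(getClosestPowerOf2(128));   // 7 (exact power)
  System.out.println(getClosestPowerOf2(96));    // 7 (a tie rounds up)
}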