Browse Source

HADOOP-1270. Randomize the fetch of map outputs, speeding the shuffle. Contributed by Arun.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@535982 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 years ago
parent
commit
a2eea72216
2 changed files with 27 additions and 13 deletions
  1. 3 0
      CHANGES.txt
  2. 24 13
      src/java/org/apache/hadoop/mapred/ReduceTask.java

+ 3 - 0
CHANGES.txt

@@ -345,6 +345,9 @@ Trunk (unreleased changes)
 102. HADOOP-1326.  Change JobClient#RunJob() to return the job.
     (omalley via cutting)
 
+103. HADOOP-1270.  Randomize the fetch of map outputs, speeding the
+     shuffle.  (Arun C Murthy via cutting)
+
 
 Release 0.12.3 - 2007-04-06
 

+ 24 - 13
src/java/org/apache/hadoop/mapred/ReduceTask.java

@@ -29,7 +29,6 @@ import java.text.DecimalFormat;
 import java.text.NumberFormat;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Hashtable;
 import java.util.Iterator;
@@ -484,10 +483,10 @@ class ReduceTask extends Task {
     private int probe_sample_size = 100;
     
     /**
-     * a hashmap from mapId to MapOutputLocation for retrials
+     * a list of map output locations for fetch retrials 
      */
-    private Map<Integer, MapOutputLocation> retryFetches =
-      new HashMap<Integer, MapOutputLocation>();
+    private List<MapOutputLocation> retryFetches =
+      new ArrayList<MapOutputLocation>();
     
     /** 
      * a TreeSet for needed map outputs
@@ -495,6 +494,8 @@ class ReduceTask extends Task {
     private Set <Integer> neededOutputs = 
       Collections.synchronizedSet(new TreeSet<Integer>());
     
+    private Random random = null;
+    
     /** Represents the result of an attempt to copy a map output */
     private class CopyResult {
       
@@ -783,12 +784,19 @@ class ReduceTask extends Task {
       this.shuffleMetrics = 
         MetricsUtil.createRecord(metricsContext, "shuffleInput");
       this.shuffleMetrics.setTag("user", conf.getUser());
+
+      // Seed the random number generator with a reasonably globally unique seed
+      long randomSeed = System.nanoTime() + 
+                        (long)Math.pow(this.reduceTask.getPartition(),
+                                       (this.reduceTask.getPartition()%10)
+                                      );
+      this.random = new Random(randomSeed);
     }
     
     public boolean fetchOutputs() throws IOException {
       final int      numOutputs = reduceTask.getNumMaps();
-      Map<Integer, MapOutputLocation> knownOutputs = 
-        new HashMap<Integer, MapOutputLocation>();
+      List<MapOutputLocation> knownOutputs = 
+        new ArrayList<MapOutputLocation>(numCopiers);
       int            numInFlight = 0, numCopied = 0;
       int            lowThreshold = numCopiers*2;
       long           bytesTransferred = 0;
@@ -834,15 +842,14 @@ class ReduceTask extends Task {
             // The replacements, if at all, will happen when we query the
             // tasktracker and put the mapId hashkeys with new 
             // MapOutputLocations as values
-            knownOutputs.putAll(retryFetches);
+            knownOutputs.addAll(retryFetches);
             // The call getsMapCompletionEvents will modify fromEventId to a val
             // that it should be for the next call to getSuccessMapEvents
             List <MapOutputLocation> locs = getMapCompletionEvents(fromEventId);
 
             // put discovered them on the known list
             for (int i=0; i < locs.size(); i++) {
-              knownOutputs.put(new Integer(locs.get(i).getMapId()), 
-                      locs.get(i));
+              knownOutputs.add(locs.get(i));
             }
             LOG.info(reduceTask.getTaskId() +
                     " Got " + locs.size() + 
@@ -865,7 +872,11 @@ class ReduceTask extends Task {
                    " known map output location(s); scheduling...");
           
           synchronized (scheduledCopies) {
-            Iterator locIt = knownOutputs.values().iterator();
+            // Randomize the map output locations to prevent 
+            // all reduce-tasks swamping the same tasktracker
+            Collections.shuffle(knownOutputs, this.random);
+            
+            Iterator locIt = knownOutputs.iterator();
             
             currentTime = System.currentTimeMillis();
             while (locIt.hasNext()) {
@@ -928,7 +939,7 @@ class ReduceTask extends Task {
                          cr.getLocation().getMapTaskId() + " from host: " + 
                          cr.getHost());
               } else {
-                retryFetches.put(new Integer(cr.getMapId()), cr.getLocation());
+                retryFetches.add(cr.getLocation());
                 
                 // wait a random amount of time for next contact
                 currentTime = System.currentTimeMillis();
@@ -944,11 +955,11 @@ class ReduceTask extends Task {
                 // the failure is due to a lost tasktracker (causes many
                 // unnecessary backoffs). If not, we only take a small hit
                 // polling the tasktracker a few more times
-                Iterator locIt = knownOutputs.values().iterator();
+                Iterator locIt = knownOutputs.iterator();
                 while (locIt.hasNext()) {
                   MapOutputLocation loc = (MapOutputLocation)locIt.next();
                   if (cr.getHost().equals(loc.getHost())) {
-                    retryFetches.put(new Integer(loc.getMapId()), loc);
+                    retryFetches.add(loc);
                     locIt.remove();
                   }
                 }