
HADOOP-3478. Improves the handling of map output fetching. Now the randomization is by the hosts (and not the map outputs themselves). Contributed by Jothi Padmanabhan.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@675846 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das, 17 years ago (commit 8f725caed2)
2 changed files with 97 additions and 60 deletions:
  1. CHANGES.txt (+4, -0)
  2. src/mapred/org/apache/hadoop/mapred/ReduceTask.java (+93, -60)
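
Before the diff, the idea in one sketch: instead of shuffling the flat list of known map outputs, the copier now buckets the outputs by host, shuffles the host list, and schedules at most one fetch per host per pass. Below is a minimal, self-contained illustration under assumed names (the hosts tt1..tt3, the (host, mapId) pairs, and the class name are hypothetical; this is not the actual ReduceTask code):

  import java.util.ArrayList;
  import java.util.Collections;
  import java.util.HashMap;
  import java.util.List;
  import java.util.Map;
  import java.util.Random;

  public class HostShuffleSketch {
    public static void main(String[] args) {
      // Known map outputs as (host, mapId) pairs -- hypothetical data.
      String[][] outputs = {
        {"tt1", "m0"}, {"tt1", "m1"}, {"tt2", "m2"}, {"tt3", "m3"}, {"tt3", "m4"}
      };

      // Bucket outputs by host, mirroring the patch's
      // HashMap<String, List<MapOutputLocation>> mapLocations.
      Map<String, List<String>> mapLocations = new HashMap<String, List<String>>();
      List<String> hostList = new ArrayList<String>();
      for (String[] out : outputs) {
        List<String> locList = mapLocations.get(out[0]);
        if (locList == null) {
          locList = new ArrayList<String>();
          mapLocations.put(out[0], locList);
          hostList.add(out[0]);
        }
        locList.add(out[1]);
      }

      // Shuffle hosts, not individual outputs, so different reducers hit
      // the tasktrackers in different orders and no single host is swamped.
      Collections.shuffle(hostList, new Random());

      // Schedule at most one fetch per host per scheduling pass.
      for (String host : hostList) {
        List<String> locList = mapLocations.get(host);
        if (!locList.isEmpty()) {
          System.out.println("fetch " + locList.remove(0) + " from " + host);
        }
      }
    }
  }

Taking one output per host per pass bounds how many concurrent fetches a single tasktracker serves; the old per-output shuffle could still land several fetches on the same host at once.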

+ 4 - 0
CHANGES.txt

@@ -73,6 +73,10 @@ Trunk (unreleased changes)
     permitting users to define a more efficient method for cloning values from
     the reduce than serialization/deserialization. (Runping Qi via cdouglas)
 
+    HADOOP-3478. Improves the handling of map output fetching. Now the
+    randomization is by the hosts (and not the map outputs themselves). 
+    (Jothi Padmanabhan via ddas)
+
   OPTIMIZATIONS
 
     HADOOP-3556. Removed lock contention in MD5Hash by changing the 

+ 93 - 60
src/mapred/org/apache/hadoop/mapred/ReduceTask.java

@@ -1433,8 +1433,9 @@ class ReduceTask extends Task {
     
     @SuppressWarnings("unchecked")
     public boolean fetchOutputs() throws IOException {
-      List<MapOutputLocation> knownOutputs = 
-        new ArrayList<MapOutputLocation>(numCopiers);
+      //The map for (Hosts, List of MapIds from this Host)
+      HashMap<String, List<MapOutputLocation>> mapLocations = 
+        new HashMap<String, List<MapOutputLocation>>();
       int totalFailures = 0;
       int            numInFlight = 0, numCopied = 0;
       long           bytesTransferred = 0;
@@ -1472,6 +1473,9 @@ class ReduceTask extends Task {
       long lastProgressTime = startTime;
       long lastOutputTime = 0;
       IntWritable fromEventId = new IntWritable(0);
+
+      //List of unique hosts containing map outputs
+      List<String> hostList = new ArrayList<String>();
       
         // loop until we get all required outputs
         while (copiedMapOutputs.size() < numMaps && mergeThrowable == null) {
@@ -1489,30 +1493,40 @@ class ReduceTask extends Task {
           }
           
           try {
-            // Put the hash entries for the failed fetches. Entries here
-            // might be replaced by (mapId) hashkeys from new successful 
-            // Map executions, if the fetch failures were due to lost tasks.
-            // The replacements, if at all, will happen when we query the
-            // tasktracker and put the mapId hashkeys with new 
-            // MapOutputLocations as values
-            knownOutputs.addAll(retryFetches);
+            // Put the hash entries for the failed fetches.
+            Iterator<MapOutputLocation> locItr = retryFetches.iterator();
+            while (locItr.hasNext()) {
+              MapOutputLocation loc = locItr.next(); 
+              List<MapOutputLocation> locList = 
+                mapLocations.get(loc.getHost());
+              if (locList == null) {
+                locList = new LinkedList<MapOutputLocation>();
+                mapLocations.put(loc.getHost(), locList);
+                hostList.add(loc.getHost());
+              }
+              //Add to the beginning of the list so that this map is 
+              //tried again before the others and we can hasten the 
+              //re-execution of this map should there be a problem
+              locList.add(0, loc);
+            }
              
            // The call getMapCompletionEvents will update fromEventId to be
            // used for the next call to getMapCompletionEvents
-            int currentNumKnownMaps = knownOutputs.size();
+
             int currentNumObsoleteMapIds = obsoleteMapIds.size();
-            getMapCompletionEvents(fromEventId, knownOutputs);
 
+            int numNewOutputs = getMapCompletionEvents(fromEventId, 
+                                                       mapLocations, 
+                                                       hostList);
 
-            int numNewOutputs = knownOutputs.size()-currentNumKnownMaps;
             if (numNewOutputs > 0 || logNow) {
               LOG.info(reduceTask.getTaskID() + ": " +  
                   "Got " + numNewOutputs + 
-                  " new map-outputs & number of known map outputs is " + 
-                  knownOutputs.size());
+                  " new map-outputs"); 
             }
             
             int numNewObsoleteMaps = obsoleteMapIds.size()-currentNumObsoleteMapIds;
+
             if (numNewObsoleteMaps > 0) {
               LOG.info(reduceTask.getTaskID() + ": " +  
                   "Got " + numNewObsoleteMaps + 
@@ -1534,55 +1548,77 @@ class ReduceTask extends Task {
           }
           
           // now walk through the cache and schedule what we can
-          int numKnown = knownOutputs.size(), numScheduled = 0;
+          int numScheduled = 0;
           int numDups = 0;
           
           synchronized (scheduledCopies) {
+
             // Randomize the map output locations to prevent 
             // all reduce-tasks swamping the same tasktracker
-            Collections.shuffle(knownOutputs, this.random);
+            Collections.shuffle(hostList, this.random);
             
-            Iterator<MapOutputLocation> locIt = knownOutputs.iterator();
+            Iterator<String> hostsItr = hostList.iterator();
+            while (hostsItr.hasNext()) {
             
-            
-            while (locIt.hasNext()) {
-              
-              MapOutputLocation loc = locIt.next();
-              
-              // Do not schedule fetches from OBSOLETE maps
-              if (obsoleteMapIds.contains(loc.getTaskAttemptId())) {
-                locIt.remove();
-                continue;
+              String host = hostsItr.next();
+
+              List<MapOutputLocation> knownOutputsByLoc = 
+                mapLocations.get(host);
+
+              //Identify duplicate hosts here
+              if (uniqueHosts.contains(host)) {
+                 numDups += knownOutputsByLoc.size() - 1;
+                 continue;
               }
-              
-              Long penaltyEnd = penaltyBox.get(loc.getHost());
-              boolean penalized = false, duplicate = false;
-              
+
+              Long penaltyEnd = penaltyBox.get(host);
+              boolean penalized = false;
+            
               if (penaltyEnd != null) {
                 if (currentTime < penaltyEnd.longValue()) {
                   penalized = true;
                 } else {
-                  penaltyBox.remove(loc.getHost());
+                  penaltyBox.remove(host);
                 }
               }
-              if (uniqueHosts.contains(loc.getHost())) {
-                duplicate = true; numDups++;
-              }
               
-              if (!penalized && !duplicate) {
-                uniqueHosts.add(loc.getHost());
+              if (penalized)
+                continue;
+
+              Iterator<MapOutputLocation> locItr = 
+                knownOutputsByLoc.iterator();
+            
+              while (locItr.hasNext()) {
+              
+                MapOutputLocation loc = locItr.next();
+              
+                // Do not schedule fetches from OBSOLETE maps
+                if (obsoleteMapIds.contains(loc.getTaskAttemptId())) {
+                  locItr.remove();
+                  continue;
+                }
+
+                uniqueHosts.add(host);
                 scheduledCopies.add(loc);
-                locIt.remove();  // remove from knownOutputs
+                locItr.remove();  // remove from knownOutputsByLoc
                 numInFlight++; numScheduled++;
+
+                break; //we have a map from this host
+              }
+
+              if (knownOutputsByLoc.size() == 0) {
+                mapLocations.remove(host);
+                hostsItr.remove();
               }
             }
             scheduledCopies.notifyAll();
           }
           if (numScheduled > 0 || logNow) {
             LOG.info(reduceTask.getTaskID() + " Scheduled " + numScheduled +
-                   " of " + numKnown + " known outputs (" + penaltyBox.size() +
-                   " slow hosts and " + numDups + " dup hosts)");
+                   " outputs (" + penaltyBox.size() +
+                   " slow hosts and " + numDups + " dup hosts)");
           }
+
           if (penaltyBox.size() > 0 && logNow) {
             LOG.info("Penalized(slow) Hosts: ");
             for (String host : penaltyBox.keySet()) {
@@ -1732,20 +1768,6 @@ class ReduceTask extends Task {
               LOG.warn(reduceTask.getTaskID() + " adding host " +
                        cr.getHost() + " to penalty box, next contact in " +
                        (currentBackOff/1000) + " seconds");
-              
-              // other outputs from the failed host may be present in the
-              // knownOutputs cache, purge them. This is important in case
-              // the failure is due to a lost tasktracker (causes many
-              // unnecessary backoffs). If not, we only take a small hit
-              // polling the tasktracker a few more times
-              Iterator<MapOutputLocation> locIt = knownOutputs.iterator();
-              while (locIt.hasNext()) {
-                MapOutputLocation loc = locIt.next();
-                if (cr.getHost().equals(loc.getHost())) {
-                  retryFetches.add(loc);
-                  locIt.remove();
-                }
-              }
             }
             uniqueHosts.remove(cr.getHost());
             numInFlight--;
@@ -1844,16 +1866,21 @@ class ReduceTask extends Task {
      * 
      * @param fromEventId the first event ID we want to start from, this is
      *                     modified by the call to this method
-     * @param jobClient the {@link JobTracker}
-     * @return the set of map-completion events from the given event ID 
+     * @param mapLocations the map from host to the list of map output
+     *                     locations on that host
+     * @param hostsList    the list of unique hosts that have map outputs;
+     *                     updated by this method
+     * @return the number of new map-completion events from the given event ID 
      * @throws IOException
      */  
-    private void getMapCompletionEvents(IntWritable fromEventId, 
-                                        List<MapOutputLocation> knownOutputs)
+    private int getMapCompletionEvents(IntWritable fromEventId, 
+                        HashMap<String,List<MapOutputLocation>> mapLocations,
+                        List<String> hostsList)
     throws IOException {
       
       long currentTime = System.currentTimeMillis();    
       long pollTime = lastPollTime + MIN_POLL_INTERVAL;
+      int numNewMaps = 0;
       while (currentTime < pollTime) {
         try {
           Thread.sleep(pollTime-currentTime);
@@ -1895,8 +1922,14 @@ class ReduceTask extends Task {
                                     "/mapOutput?job=" + taskId.getJobID() +
                                     "&map=" + taskId + 
                                     "&reduce=" + getPartition());
-            knownOutputs.add(new MapOutputLocation(taskId, host, 
-                                                   mapOutputLocation));
+            List<MapOutputLocation> loc = mapLocations.get(host);
+            if (loc == null) {
+              loc = new LinkedList<MapOutputLocation>();
+              mapLocations.put(host, loc);
+              hostsList.add(host);
+            }
+            loc.add(new MapOutputLocation(taskId, host, mapOutputLocation));
+            numNewMaps++;
           }
           break;
           case FAILED:
@@ -1917,7 +1950,7 @@ class ReduceTask extends Task {
           break;
         }
       }
-
+      return numNewMaps;
     }
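
Two smaller details of the patch, sketched below under hypothetical names (plain Strings stand in for the real MapOutputLocation): a failed fetch is re-queued at the front of its host's list (the locList.add(0, loc) in the diff), so it is retried before other outputs from that host and a genuinely lost map is detected sooner; and getMapCompletionEvents now files each new output under its host and returns a count of new outputs instead of mutating a flat list:

  import java.util.ArrayList;
  import java.util.HashMap;
  import java.util.LinkedList;
  import java.util.List;
  import java.util.Map;

  public class RetryQueueSketch {
    // File a newly announced map output under its host; returns 1 if added,
    // mirroring how the patched getMapCompletionEvents counts new outputs.
    static int addNewOutput(Map<String, List<String>> mapLocations,
                            List<String> hostList, String host, String mapId) {
      List<String> loc = mapLocations.get(host);
      if (loc == null) {
        loc = new LinkedList<String>();
        mapLocations.put(host, loc);
        hostList.add(host);
      }
      loc.add(mapId);           // new outputs join the back of the queue
      return 1;
    }

    // Re-queue a failed fetch at the *front* of its host's list so it is
    // attempted again before anything else from that host.
    static void requeueFailedFetch(Map<String, List<String>> mapLocations,
                                   List<String> hostList, String host,
                                   String mapId) {
      List<String> loc = mapLocations.get(host);
      if (loc == null) {
        loc = new LinkedList<String>();
        mapLocations.put(host, loc);
        hostList.add(host);
      }
      loc.add(0, mapId);
    }

    public static void main(String[] args) {
      Map<String, List<String>> mapLocations =
        new HashMap<String, List<String>>();
      List<String> hostList = new ArrayList<String>();
      addNewOutput(mapLocations, hostList, "tt1", "m0");
      addNewOutput(mapLocations, hostList, "tt1", "m1");
      requeueFailedFetch(mapLocations, hostList, "tt1", "m2");
      System.out.println(mapLocations.get("tt1")); // prints [m2, m0, m1]
    }
  }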