Browse Source

HADOOP-4716. Fixes ReduceTask.java to clear out the mapping between hosts and MapOutputLocation upon a JT restart. Contributed by Amar Kamat.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@728604 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das 16 years ago
parent
commit
573c91bcbe

+ 3 - 0
CHANGES.txt

@@ -495,6 +495,9 @@ Release 0.20.0 - Unreleased
     restart apart from the initialContact flag that there was earlier.
     restart apart from the initialContact flag that there was earlier.
     (Amareshwari Sriramadasu via ddas)
     (Amareshwari Sriramadasu via ddas)
 
 
+    HADOOP-4716. Fixes ReduceTask.java to clear out the mapping between
+    hosts and MapOutputLocation upon a JT restart. (Amar Kamat via ddas)
+
 Release 0.19.1 - Unreleased
 Release 0.19.1 - Unreleased
 
 
   IMPROVEMENTS
   IMPROVEMENTS

+ 20 - 5
src/mapred/org/apache/hadoop/mapred/ReduceTask.java

@@ -1803,10 +1803,18 @@ class ReduceTask extends Task {
             MapOutputLocation loc = locItr.next(); 
             MapOutputLocation loc = locItr.next(); 
             List<MapOutputLocation> locList = 
             List<MapOutputLocation> locList = 
               mapLocations.get(loc.getHost());
               mapLocations.get(loc.getHost());
-            //Add to the beginning of the list so that this map is 
-            //tried again before the others and we can hasten the 
-            //re-execution of this map should there be a problem
-            locList.add(0, loc);
+            
+            // Check if the list exists. Map output location mapping is cleared 
+            // once the jobtracker restarts and is rebuilt from scratch.
+            // Note that map-output-location mapping will be recreated and hence
+            // we continue with the hope that we might find some locations
+            // from the rebuilt map.
+            if (locList != null) {
+              // Add to the beginning of the list so that this map is
+              // tried again before the others and we can hasten the
+              // re-execution of this map should there be a problem.
+              locList.add(0, loc);
+            }
           }
           }
 
 
           if (retryFetches.size() > 0) {
           if (retryFetches.size() > 0) {
@@ -1839,7 +1847,13 @@ class ReduceTask extends Task {
               List<MapOutputLocation> knownOutputsByLoc = 
               List<MapOutputLocation> knownOutputsByLoc = 
                 mapLocations.get(host);
                 mapLocations.get(host);
 
 
-              if (knownOutputsByLoc.size() == 0) {
+              // Check if the list exists. Map output location mapping is 
+              // cleared once the jobtracker restarts and is rebuilt from 
+              // scratch.
+              // Note that map-output-location mapping will be recreated and 
+              // hence we continue with the hope that we might find some 
+              // locations from the rebuilt map and add them for fetching.
+              if (knownOutputsByLoc == null || knownOutputsByLoc.size() == 0) {
                 continue;
                 continue;
               }
               }
               
               
@@ -2598,6 +2612,7 @@ class ReduceTask extends Task {
         if (update.shouldReset()) {
         if (update.shouldReset()) {
           fromEventId.set(0);
           fromEventId.set(0);
           obsoleteMapIds.clear(); // clear the obsolete map
           obsoleteMapIds.clear(); // clear the obsolete map
+          mapLocations.clear(); // clear the map locations mapping
         }
         }
         
         
         // Update the last seen event ID
         // Update the last seen event ID

+ 1 - 0
src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithLostTracker.java

@@ -148,6 +148,7 @@ public class TestJobTrackerRestartWithLostTracker extends TestCase {
       jtConf.set("mapred.jobtracker.job.history.buffer.size", "1024");
       jtConf.set("mapred.jobtracker.job.history.buffer.size", "1024");
       jtConf.setInt("mapred.tasktracker.reduce.tasks.maximum", 1);
       jtConf.setInt("mapred.tasktracker.reduce.tasks.maximum", 1);
       jtConf.setLong("mapred.tasktracker.expiry.interval", 25 * 1000);
       jtConf.setLong("mapred.tasktracker.expiry.interval", 25 * 1000);
+      jtConf.setInt("mapred.reduce.copy.backoff", 4);
       
       
       mr = new MiniMRCluster(2, namenode, 1, null, null, jtConf);
       mr = new MiniMRCluster(2, namenode, 1, null, null, jtConf);