HADOOP-1862. Fixed reduces getting stuck trying to find map outputs. (Arun C. Murthy via ddas)

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/branches/branch-0.14@581502 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das authored 17 years ago
commit 53dd4ade7e

+ 3 - 0
CHANGES.txt

@@ -11,6 +11,9 @@ Release 0.14.2 (unreleased changes)
     HADOOP-1948. Removed spurious error message during block crc upgrade.
     (Raghu Angadi via dhruba)
 
+    HADOOP-1862. Fixed reduces getting stuck trying to find map outputs.
+    (Arun C. Murthy via ddas)
+
     HADOOP-1977. Fixed handling of ToolBase cli options in JobClient.
     (enis via omalley)
 

+ 2 - 1
conf/hadoop-default.xml

@@ -820,7 +820,8 @@ creations/deletions), or "all".</description>
   <value>FAILED</value>
   <description>The filter for controlling the output of the task's userlogs sent
                to the console of the JobClient. 
-               The permissible options are: NONE, FAILED, SUCCEEDED and ALL.
+               The permissible options are: NONE, KILLED, FAILED, SUCCEEDED and 
+               ALL.
   </description>
 </property>
 

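For illustration (not part of this commit): with KILLED added to the permissible values, a job client can ask to see killed-task output on its console. A minimal sketch in Java, assuming the property being edited above is the jobclient.output.filter key; the key name itself lies outside the hunk shown:

import org.apache.hadoop.mapred.JobConf;

public class UserlogFilterExample {
  public static void main(String[] args) {
    JobConf conf = new JobConf();
    // Assumed key name: report KILLED task attempts on the JobClient console.
    conf.set("jobclient.output.filter", "KILLED");
  }
}
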
+ 6 - 1
src/java/org/apache/hadoop/mapred/JobClient.java

@@ -39,7 +39,7 @@ import java.util.*;
  *******************************************************/
 public class JobClient extends ToolBase implements MRConstants  {
   private static final Log LOG = LogFactory.getLog("org.apache.hadoop.mapred.JobClient");
-  public static enum TaskStatusFilter { NONE, FAILED, SUCCEEDED, ALL }
+  public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
   private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED; 
 
   static long MAX_JOBPROFILE_AGE = 1000 * 2;
@@ -604,6 +604,11 @@ public class JobClient extends ToolBase implements MRConstants  {
                   displayTaskLogs(event.getTaskId(), event.getTaskTrackerHttp());
                 }
                 break; 
+              case KILLED:
+                if (event.getTaskStatus() == TaskCompletionEvent.Status.KILLED){
+                  LOG.info(event.toString());
+                }
+                break; 
               case ALL:
                 LOG.info(event.toString());
                 displayTaskLogs(event.getTaskId(), event.getTaskTrackerHttp());

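The new KILLED case mirrors the existing FAILED branch but omits displayTaskLogs, so killed attempts are reported without fetching their userlogs. A standalone distillation of the dispatch, where shouldLog is a hypothetical helper rather than actual JobClient code:

import org.apache.hadoop.mapred.JobClient.TaskStatusFilter;
import org.apache.hadoop.mapred.TaskCompletionEvent;

public class FilterDispatch {
  // True if an event with status s should be echoed to the client console
  // under the given filter; a sketch of the switch above, not the real code.
  static boolean shouldLog(TaskStatusFilter filter, TaskCompletionEvent.Status s) {
    switch (filter) {
      case NONE:      return false;
      case KILLED:    return s == TaskCompletionEvent.Status.KILLED;
      case FAILED:    return s == TaskCompletionEvent.Status.FAILED;
      case SUCCEEDED: return s == TaskCompletionEvent.Status.SUCCEEDED;
      case ALL:       return true;
      default:        return false;
    }
  }
}
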
+ 17 - 6
src/java/org/apache/hadoop/mapred/JobInProgress.java

@@ -381,6 +381,7 @@ class JobInProgress {
 
       TaskCompletionEvent taskEvent = null;
       if (state == TaskStatus.State.SUCCEEDED) {
+        boolean complete = false;
         taskEvent = new TaskCompletionEvent(
                                             taskCompletionEventTracker, 
                                             status.getTaskId(),
@@ -388,9 +389,9 @@ class JobInProgress {
                                             status.getIsMap(),
                                             TaskCompletionEvent.Status.SUCCEEDED,
                                             httpTaskLogLocation 
-                                            );
+                                           );
         try {
-          completedTask(tip, status, metrics);
+          complete = completedTask(tip, status, metrics);
         } catch (IOException ioe) {
           // Oops! Failed to copy the task's output to its final place;
           // fail the task!
@@ -405,7 +406,12 @@ class JobInProgress {
                    " with: " + StringUtils.stringifyException(ioe));
           return;
         }
-        tip.setSuccessEventNumber(taskCompletionEventTracker);
+        
+        if (complete) {
+          tip.setSuccessEventNumber(taskCompletionEventTracker);
+        } else {
+          taskEvent.setTaskStatus(TaskCompletionEvent.Status.KILLED);
+        }
       } else if (state == TaskStatus.State.FAILED ||
                  state == TaskStatus.State.KILLED) {
         // Get the event number for the (possibly) previously successful
@@ -424,7 +430,9 @@ class JobInProgress {
 
         // Did the task failure lead to tip failure?
         TaskCompletionEvent.Status taskCompletionStatus = 
-          TaskCompletionEvent.Status.FAILED;
+          (state == TaskStatus.State.FAILED) ?
+              TaskCompletionEvent.Status.FAILED :
+              TaskCompletionEvent.Status.KILLED;
         if (tip.isFailed()) {
           taskCompletionStatus = TaskCompletionEvent.Status.TIPFAILED;
         }
@@ -751,7 +759,7 @@ class JobInProgress {
   /**
    * A taskid assigned to this JobInProgress has reported in successfully.
    */
-  public synchronized void completedTask(TaskInProgress tip, 
+  public synchronized boolean completedTask(TaskInProgress tip, 
                                          TaskStatus status,
                                          JobTrackerMetrics metrics) 
   throws IOException {
@@ -766,7 +774,7 @@ class JobInProgress {
       if (this.status.getRunState() != JobStatus.RUNNING) {
         jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
       }
-      return;
+      return false;
     } 
 
     LOG.info("Task '" + taskid + "' has completed " + tip.getTIPId() + 
@@ -818,7 +826,10 @@ class JobInProgress {
       // The job has been killed/failed, 
       // JobTracker should cleanup this task
       jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
+      return false;
     }
+    
+    return true;
   }
 
   /**

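This is the heart of the fix: completedTask now reports whether the attempt's output was actually accepted, and a duplicate success (for example, a speculative attempt whose output is discarded) is demoted to KILLED instead of being recorded as a second success event for the same TIP. A minimal sketch of that pattern, with completedTips as a hypothetical stand-in for the real TIP bookkeeping:

import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.mapred.TaskCompletionEvent;

public class SuccessDemotion {
  private final Set<Integer> completedTips = new HashSet<Integer>();

  // The first successful attempt of a TIP wins; any later "success" is
  // reported as KILLED so reduces never fetch output that was thrown away.
  TaskCompletionEvent.Status classify(int tipId) {
    return completedTips.add(tipId)
        ? TaskCompletionEvent.Status.SUCCEEDED
        : TaskCompletionEvent.Status.KILLED;
  }
}
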
+ 86 - 38
src/java/org/apache/hadoop/mapred/ReduceTask.java

@@ -452,11 +452,17 @@ class ReduceTask extends Task {
       new ArrayList<MapOutputLocation>();
     
     /** 
-     * a TreeSet for needed map outputs
+     * The set of required map outputs
      */
     private Set <Integer> neededOutputs = 
       Collections.synchronizedSet(new TreeSet<Integer>());
     
+    /** 
+     * The set of obsolete map taskids.
+     */
+    private Set <String> obsoleteMapIds = 
+      Collections.synchronizedSet(new TreeSet<String>());
+
     private Random random = null;
     
     /**
@@ -635,6 +641,9 @@ class ReduceTask extends Task {
                        loc.getMapTaskId() + " from " + loc.getHost());
               LOG.warn(StringUtils.stringifyException(e));
               shuffleClientMetrics.failedFetch();
+
+              // Reset
+              size = -1;
             } finally {
               shuffleClientMetrics.threadFree();
               finish(size);
@@ -648,7 +657,7 @@ class ReduceTask extends Task {
         }
       }
       
-      /** Copies a a map output from a remote host, using raw RPC. 
+      /** Copies a map output from a remote host, via HTTP.
        * @param currentLocation the map output location to be copied
        * @return the path (fully qualified) of the copied file
        * @throws IOException if there is an error copying the file
@@ -656,9 +665,12 @@ class ReduceTask extends Task {
        */
       private long copyOutput(MapOutputLocation loc
                               ) throws IOException, InterruptedException {
-        if (!neededOutputs.contains(loc.getMapId())) {
+        // check if we still need to copy the output from this location
+        if (!neededOutputs.contains(loc.getMapId()) || 
+            obsoleteMapIds.contains(loc.getMapTaskId())) {
           return CopyResult.OBSOLETE;
         }
+        
         String reduceId = reduceTask.getTaskId();
         LOG.info(reduceId + " Copying " + loc.getMapTaskId() +
                  " output from " + loc.getHost() + ".");
@@ -865,18 +877,21 @@ class ReduceTask extends Task {
             // tasktracker and put the mapId hashkeys with new 
             // MapOutputLocations as values
             knownOutputs.addAll(retryFetches);
-            // The call getsMapCompletionEvents will modify fromEventId to a val
-            // that it should be for the next call to getSuccessMapEvents
-            List <MapOutputLocation> locs = getMapCompletionEvents(fromEventId);
+            
+            // The call to getMapCompletionEvents will update fromEventId to
+            // the value to be used for the next call to getMapCompletionEvents
+            int currentNumKnownMaps = knownOutputs.size();
+            int currentNumObsoleteMapIds = obsoleteMapIds.size();
+            getMapCompletionEvents(fromEventId, knownOutputs);
+
+            LOG.info(reduceTask.getTaskId() + ": " +  
+                     "Got " + (knownOutputs.size()-currentNumKnownMaps) + 
+                     " new map-outputs & " + 
+                     (obsoleteMapIds.size()-currentNumObsoleteMapIds) + 
+                     " obsolete map-outputs from tasktracker and " + 
+                     retryFetches.size() + " map-outputs from previous failures"
+                    );
 
-            // put discovered them on the known list
-            for (int i=0; i < locs.size(); i++) {
-              knownOutputs.add(locs.get(i));
-            }
-            LOG.info(reduceTask.getTaskId() +
-                    " Got " + locs.size() + 
-                    " new map outputs from tasktracker and " + retryFetches.size()
-                    + " map outputs from previous failures");
             // clear the "failed" fetches hashmap
             retryFetches.clear();
           }
@@ -904,6 +919,13 @@ class ReduceTask extends Task {
             while (locIt.hasNext()) {
               
               MapOutputLocation loc = (MapOutputLocation)locIt.next();
+              
+              // Do not schedule fetches from OBSOLETE maps
+              if (obsoleteMapIds.contains(loc.getMapTaskId())) {
+                locIt.remove();
+                continue;
+              }
+
               Long penaltyEnd = penaltyBox.get(loc.getHost());
               boolean penalized = false, duplicate = false;
               
@@ -1104,14 +1126,17 @@ class ReduceTask extends Task {
       }    
     }
     
-    /** Queries the task tracker for a set of outputs ready to be copied
+    /** 
+     * Queries the task tracker for a set of map-completion events from
+     * a given event ID.
      * @param fromEventId the first event ID we want to start from, this is
-     * modified by the call to this method
+     *                    modified by the call to this method
-     * @param jobClient the job tracker
-     * @return a set of locations to copy outputs from
+     * @param knownOutputs the list to which newly-available map-output
+     *                     locations are added
      * @throws IOException
      */  
-    private List <MapOutputLocation> getMapCompletionEvents(IntWritable fromEventId)
+    private void getMapCompletionEvents(IntWritable fromEventId, 
+                                        List<MapOutputLocation> knownOutputs)
       throws IOException {
       
       long currentTime = System.currentTimeMillis();    
@@ -1122,31 +1147,54 @@ class ReduceTask extends Task {
         } catch (InterruptedException ie) { } // IGNORE
         currentTime = System.currentTimeMillis();
       }
-      lastPollTime = currentTime;
       
-      TaskCompletionEvent t[] = umbilical.getMapCompletionEvents(
-                                                                 reduceTask.getJobId(),
-                                                                 fromEventId.get(),
-                                                                 probe_sample_size);
+      TaskCompletionEvent events[] = 
+        umbilical.getMapCompletionEvents(reduceTask.getJobId(),
+                                         fromEventId.get(), probe_sample_size);
       
-      List <MapOutputLocation> mapOutputsList = 
-        new ArrayList<MapOutputLocation>();
-      for (TaskCompletionEvent event : t) {
-        if (event.getTaskStatus() == TaskCompletionEvent.Status.SUCCEEDED) {
-          URI u = URI.create(event.getTaskTrackerHttp());
-          String host = u.getHost();
-          int port = u.getPort();
-          String taskId = event.getTaskId();
-          int mId = event.idWithinJob();
-          mapOutputsList.add(new MapOutputLocation(taskId, mId, host, port));
-        } else if (event.getTaskStatus() == TaskCompletionEvent.Status.TIPFAILED) {
-          neededOutputs.remove(event.idWithinJob());
-          LOG.info("Ignoring output of failed map: '" + event.getTaskId() + "'");
+      // Note the last successful poll time-stamp
+      lastPollTime = currentTime;
+
+      // Update the last seen event ID
+      fromEventId.set(fromEventId.get() + events.length);
+        
+      // Process the TaskCompletionEvents:
+      // 1. Save the SUCCEEDED maps in knownOutputs to fetch the outputs.
+      // 2. Save the OBSOLETE/FAILED/KILLED maps in obsoleteMapIds to stop
+      //    fetching from those maps.
+      // 3. Remove TIPFAILED maps from neededOutputs since we don't need their
+      //    outputs at all.
+      for (TaskCompletionEvent event : events) {
+        switch (event.getTaskStatus()) {
+          case SUCCEEDED:
+          {
+            URI u = URI.create(event.getTaskTrackerHttp());
+            String host = u.getHost();
+            int port = u.getPort();
+            String taskId = event.getTaskId();
+            int mId = event.idWithinJob();
+            knownOutputs.add(new MapOutputLocation(taskId, mId, host, port));
+          }
+          break;
+          case FAILED:
+          case KILLED:
+          case OBSOLETE:
+          {
+            obsoleteMapIds.add(event.getTaskId());
+            LOG.info("Ignoring obsolete output of " + event.getTaskStatus() + 
+                     " map-task: '" + event.getTaskId() + "'");
+          }
+          break;
+          case TIPFAILED:
+          {
+            neededOutputs.remove(event.idWithinJob());
+            LOG.info("Ignoring output of failed map TIP: '" +  
+                     event.getTaskId() + "'");
+          }
+          break;
         }
       }
-    
-      fromEventId.set(fromEventId.get() + t.length);
-      return mapOutputsList;
+
     }
     
     

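The copier now keeps two views of the map side: neededOutputs tracks map numbers still required, while obsoleteMapIds blacklists attempt ids whose outputs must never be fetched. A distilled sketch of the per-event triage, using plain collections as stand-ins for the real ReduceTask fields:

import java.util.List;
import java.util.Set;
import org.apache.hadoop.mapred.TaskCompletionEvent;

public class EventTriage {
  static void process(TaskCompletionEvent event,
                      List<String> knownOutputs,    // attempt ids to fetch from
                      Set<String> obsoleteMapIds,   // attempt ids never to fetch
                      Set<Integer> neededOutputs) { // map numbers still required
    switch (event.getTaskStatus()) {
    case SUCCEEDED:
      knownOutputs.add(event.getTaskId());          // schedule a fetch
      break;
    case FAILED:
    case KILLED:
    case OBSOLETE:
      obsoleteMapIds.add(event.getTaskId());        // stop or avoid fetching
      break;
    case TIPFAILED:
      neededOutputs.remove(event.idWithinJob());    // output not needed at all
      break;
    }
  }
}
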
+ 1 - 1
src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java

@@ -29,7 +29,7 @@ import org.apache.hadoop.io.WritableUtils;
  * job tracker. 
  */
 public class TaskCompletionEvent implements Writable{
-  static public enum Status {FAILED, SUCCEEDED, OBSOLETE, TIPFAILED};
+  static public enum Status {FAILED, KILLED, SUCCEEDED, OBSOLETE, TIPFAILED};
     
   private int eventId; 
   private String taskTrackerHttp;

+ 1 - 35
src/java/org/apache/hadoop/mapred/TaskTracker.java

@@ -513,14 +513,6 @@ public class TaskTracker
     private IntWritable fromEventId;
     /** This is the cache of map events for a given job */ 
     private List<TaskCompletionEvent> allMapEvents;
-    /** This array will store indexes to "SUCCEEDED" map events from
-     * allMapEvents. The array is indexed by the mapId. 
-     * The reason why we store the indexes is to quickly reset SUCCEEDED 
-     * events to OBSOLETE. Thus ReduceTasks might also get to know about 
-     * OBSOLETE events and avoid fetching map outputs from the corresponding 
-     * locations.
-     */ 
-    private int indexToEventsCache[];
     /** What jobid this fetchstatus object is for*/
     private String jobId;
      
@@ -528,7 +520,6 @@ public class TaskTracker
       this.fromEventId = new IntWritable(0);
       this.jobId = jobId;
       this.allMapEvents = new ArrayList<TaskCompletionEvent>(numMaps);
-      this.indexToEventsCache = new int[numMaps];
     }
       
     public TaskCompletionEvent[] getMapEvents(int fromId, int max) {
@@ -551,32 +542,7 @@ public class TaskTracker
       List <TaskCompletionEvent> recentMapEvents = 
         queryJobTracker(fromEventId, jobId, jobClient);
       synchronized (allMapEvents) {
-        for (TaskCompletionEvent t : recentMapEvents) {
-          TaskCompletionEvent.Status status = t.getTaskStatus();
-          allMapEvents.add(t);
-            
-          if (status == TaskCompletionEvent.Status.SUCCEEDED) {
-            //store the index of the events cache for this success event.
-            indexToEventsCache[t.idWithinJob()] = allMapEvents.size();
-          }
-          else if (status == TaskCompletionEvent.Status.FAILED || 
-                   status == TaskCompletionEvent.Status.OBSOLETE) {
-            int idx = indexToEventsCache[t.idWithinJob()];
-            //if this map task was declared a success earlier, we will have
-            //idx > 0
-            if (idx > 0) {
-              //Mark the event as OBSOLETE and reset the index to 0. Note 
-              //we access the 'idx - 1' entry. This is because while storing
-              //the idx in indexToEventsCache, we store the 'actual idx + 1'
-              //Helps us to eliminate the index array elements initialization
-              //to something like '-1'
-              TaskCompletionEvent obsoleteEvent = allMapEvents.get(idx - 1);
-              obsoleteEvent.setTaskStatus(
-                                          TaskCompletionEvent.Status.OBSOLETE);
-              indexToEventsCache[t.idWithinJob()] = 0;
-            }
-          }
-        }
+        allMapEvents.addAll(recentMapEvents);
       }
     }
   }
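
With the JobTracker now demoting duplicate successes to KILLED and reporting per-attempt KILLED/FAILED completion events itself (see JobInProgress.java above), the TaskTracker no longer needs the indexToEventsCache machinery that locally rewrote SUCCEEDED events to OBSOLETE: each per-job fetch status simply appends every event the JobTracker reports, and the reduce-side copier skips any attempt listed in obsoleteMapIds.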