
HADOOP-248. Optimize location of map outputs to not use random probes. Contributed by Devaraj.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@510644 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting, 18 years ago (commit 5f0a605141)

+ 3 - 0
CHANGES.txt

@@ -116,6 +116,9 @@ Trunk (unreleased changes)
 34. HADOOP-985.  Change HDFS to identify nodes by IP address rather
     than by DNS hostname.  (Raghu Angadi via cutting)
 
+35. HADOOP-248.  Optimize location of map outputs to not use random
+    probes.  (Devaraj Das via cutting)
+
 
 Release 0.11.2 - 2007-02-16
 

+ 5 - 14
src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java

@@ -32,8 +32,10 @@ interface InterTrackerProtocol extends VersionedProtocol {
   * emitHeartbeat/pollForNewTask/pollForTaskWithClosedJob with
    * {@link #heartbeat(TaskTrackerStatus, boolean, boolean, short)}
    * version 4 changed TaskReport for HADOOP-549.
+   * version 5 removed locateMapOutputs; clients now use
+   * getTaskCompletionEvents to discover finished maps and fetch their outputs
    */
-  public static final long versionID = 4L;
+  public static final long versionID = 5L;
   
   public final static int TRACKERS_OK = 0;
   public final static int UNKNOWN_TASKTRACKER = 1;
@@ -63,18 +65,6 @@ interface InterTrackerProtocol extends VersionedProtocol {
           boolean initialContact, boolean acceptNewTasks, short responseId)
   throws IOException;
 
-  /** Called by a reduce task to find which map tasks are completed.
-   *
-   * @param jobId the job id
-   * @param mapTasksNeeded an array of the mapIds that we need
-   * @param partition the reduce's id
-   * @return an array of MapOutputLocation
-   */
-  MapOutputLocation[] locateMapOutputs(String jobId, 
-                                       int[] mapTasksNeeded,
-                                       int partition
-                                       ) throws IOException;
-
   /**
    * The task tracker calls this once, to discern where it can find
    * files referred to by the JobTracker
@@ -97,11 +87,12 @@ interface InterTrackerProtocol extends VersionedProtocol {
   * Returns an empty array if no events are available. 
    * @param jobid job id 
    * @param fromEventId event id to start from. 
+   * @param maxEvents the maximum number of events to return
    * @return array of task completion events. 
    * @throws IOException
    */
   TaskCompletionEvent[] getTaskCompletionEvents(
-      String jobid, int fromEventId) throws IOException; 
+      String jobid, int fromEventId, int maxEvents) throws IOException;
   
 }
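
An aside on usage (not part of the commit): the new maxEvents parameter
turns event retrieval into cursor-style paging, where a caller keeps
asking from its last position until an empty batch comes back. A
minimal sketch of that loop, assuming a connected InterTrackerProtocol
proxy named "tracker" and a hypothetical job id:

    // Cursor-style paging over completion events, in batches of 50.
    // "tracker" is assumed to be a connected InterTrackerProtocol
    // proxy; the job id is hypothetical.
    static void drainEvents(InterTrackerProtocol tracker)
        throws IOException {
      int fromEventId = 0;
      while (true) {
        TaskCompletionEvent[] batch = tracker.getTaskCompletionEvents(
            "job_0001", fromEventId, 50);
        if (batch.length == 0) break;   // nothing new yet; poll again later
        for (int i = 0; i < batch.length; i++) {
          // ... process batch[i] ...
        }
        fromEventId += batch.length;    // advance the cursor
      }
    }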
 

+ 9 - 4
src/java/org/apache/hadoop/mapred/JobClient.java

@@ -158,7 +158,7 @@ public class JobClient extends ToolBase implements MRConstants  {
         public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
             int startFrom) throws IOException{
           return jobSubmitClient.getTaskCompletionEvents(
-              getJobID(), startFrom); 
+              getJobID(), startFrom, 10); 
         }
 
         /**
@@ -726,7 +726,9 @@ public class JobClient extends ToolBase implements MRConstants  {
                 killJob = true;
                 i++;
             } else if ("-events".equals(argv[i])) {
-              listEvents(argv[++i]);
+              listEvents(argv[i+1], Integer.parseInt(argv[i+2]), 
+                         Integer.parseInt(argv[i+3]));
+              i += 3;
             }
         }
 
@@ -766,10 +768,13 @@ public class JobClient extends ToolBase implements MRConstants  {
      * @param jobId the job id for the job's events to list
      * @throws IOException
      */
-    private void listEvents(String jobId) throws IOException {
+    private void listEvents(String jobId, int fromEventId, int numEvents)
+    throws IOException {
       TaskCompletionEvent[] events = 
-        jobSubmitClient.getTaskCompletionEvents(jobId, 0);
+        jobSubmitClient.getTaskCompletionEvents(jobId, fromEventId, numEvents);
       System.out.println("Task completion events for " + jobId);
+      System.out.println("Number of events (from " + fromEventId + 
+          ") are: " + events.length);
       for(TaskCompletionEvent event: events) {
         System.out.println(event.getTaskStatus() + " " + event.getTaskId() + 
                            " " + event.getTaskTrackerHttp());

+ 25 - 7
src/java/org/apache/hadoop/mapred/JobInProgress.java

@@ -282,24 +282,40 @@ class JobInProgress {
           
           if (state == TaskStatus.State.SUCCEEDED) {
             this.taskCompletionEvents.add( new TaskCompletionEvent(
-                taskCompletionEventTracker++, 
-                status.getTaskId(), 
+                taskCompletionEventTracker, 
+                status.getTaskId(),
+                tip.idWithinJob(),
+                status.getIsMap(),
                 TaskCompletionEvent.Status.SUCCEEDED,
                 httpTaskLogLocation ));
+            tip.setSuccessEventNumber(taskCompletionEventTracker);
             completedTask(tip, status, metrics);
           } else if (state == TaskStatus.State.FAILED ||
                      state == TaskStatus.State.KILLED) {
             this.taskCompletionEvents.add( new TaskCompletionEvent(
-                taskCompletionEventTracker++, 
-                status.getTaskId(), 
+                taskCompletionEventTracker, 
+                status.getTaskId(),
+                tip.idWithinJob(),
+                status.getIsMap(),
                 TaskCompletionEvent.Status.FAILED, 
                 httpTaskLogLocation ));
+            // Get the event number for the (possibly) previously successful
+            // task. If there exists one, then set that status to OBSOLETE 
+            int eventNumber;
+            if ((eventNumber = tip.getSuccessEventNumber()) != -1) {
+              TaskCompletionEvent t = 
+                this.taskCompletionEvents.get(eventNumber);
+              if (t.getTaskId().equals(status.getTaskId()))
+                t.setTaskStatus(TaskCompletionEvent.Status.OBSOLETE);
+            }
             // Tell the job to fail the relevant task
             failedTask(tip, status.getTaskId(), status, status.getTaskTracker(),
                        wasRunning, wasComplete);
           }          
         }
 
+        taskCompletionEventTracker++;
+        
         //
         // Update JobInProgress status
         //
@@ -849,12 +865,14 @@ class JobInProgress {
        return null;
     }
     
-    public TaskCompletionEvent[] getTaskCompletionEvents(int fromEventId) {
+    public TaskCompletionEvent[] getTaskCompletionEvents(int fromEventId, 
+        int maxEvents) {
       TaskCompletionEvent[] events = TaskCompletionEvent.EMPTY_ARRAY;
       if( taskCompletionEvents.size() > fromEventId) {
+        int actualMax = Math.min(maxEvents, 
+            (taskCompletionEvents.size() - fromEventId));
         events = (TaskCompletionEvent[])taskCompletionEvents.subList(
-            fromEventId, taskCompletionEvents.size()).
-            toArray(events);        
+            fromEventId, actualMax + fromEventId).toArray(events);        
       }
       return events; 
     }
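
The slice above is [fromEventId, fromEventId + actualMax): at most
maxEvents entries, clamped to what has actually been recorded. A
standalone sketch of the same arithmetic, with hypothetical sizes (not
part of the commit):

    import java.util.ArrayList;
    import java.util.List;

    // Sketch of the event-window slicing: at most maxEvents entries
    // starting at fromEventId, clamped to the recorded size.
    public class WindowSketch {
      public static void main(String[] args) {
        List<Integer> events = new ArrayList<Integer>();
        for (int i = 0; i < 14; i++) events.add(i);  // 14 recorded events
        int fromEventId = 10, maxEvents = 10;
        if (events.size() > fromEventId) {
          int actualMax = Math.min(maxEvents, events.size() - fromEventId);
          // same bounds as the diff: [fromEventId, fromEventId + actualMax)
          System.out.println(
              events.subList(fromEventId, fromEventId + actualMax));
          // prints [10, 11, 12, 13]
        }
      }
    }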

+ 8 - 3
src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java

@@ -28,7 +28,11 @@ import org.apache.hadoop.ipc.VersionedProtocol;
  * the current system status.
  */ 
 public interface JobSubmissionProtocol extends VersionedProtocol {
-    public static final long versionID = 1L;
+    /*
+     * Changing the versionID to 2L since the getTaskCompletionEvents
+     * method has changed
+     */
+    public static final long versionID = 2L;
     /**
      * Submit a Job for execution.  Returns the latest profile for
      * that job.
@@ -85,11 +89,12 @@ public interface JobSubmissionProtocol extends VersionedProtocol {
      * Get task completion events for the jobid, starting from fromEventId. 
     * Returns an empty array if no events are available. 
      * @param jobid job id 
-     * @param fromEventId event id to start from. 
+     * @param fromEventId event id to start from.
+     * @param maxEvents the maximum number of events to return 
      * @return array of task completion events. 
      * @throws IOException
      */
     public TaskCompletionEvent[] getTaskCompletionEvents(
-                String jobid, int fromEventId) throws IOException;
+              String jobid, int fromEventId, int maxEvents) throws IOException;
     
 }

+ 3 - 45
src/java/org/apache/hadoop/mapred/JobTracker.java

@@ -1320,48 +1320,6 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
         return null;
     }
 
-    /**
-     * A TaskTracker wants to know the physical locations of completed, but not
-     * yet closed, tasks.  This exists so the reduce task thread can locate
-     * map task outputs.
-     */
-    public synchronized MapOutputLocation[] 
-             locateMapOutputs(String jobId, int[] mapTasksNeeded, int reduce) 
-    throws IOException {
-        // Check to make sure that the job hasn't 'completed'.
-        JobInProgress job = getJob(jobId);
-        if (job.status.getRunState() != JobStatus.RUNNING) {
-          return new MapOutputLocation[0];
-        }
-        
-        ArrayList result = new ArrayList(mapTasksNeeded.length);
-        for (int i = 0; i < mapTasksNeeded.length; i++) {
-          TaskStatus status = job.findFinishedMap(mapTasksNeeded[i]);
-          if (status != null) {
-             String trackerId = 
-               (String) taskidToTrackerMap.get(status.getTaskId());
-             // Safety check, if we can't find the taskid in 
-             // taskidToTrackerMap and job isn't 'running', then just
-             // return an empty array
-             if (trackerId == null && 
-                     job.status.getRunState() != JobStatus.RUNNING) {
-               return new MapOutputLocation[0];
-             }
-             
-             TaskTrackerStatus tracker;
-             synchronized (taskTrackers) {
-               tracker = (TaskTrackerStatus) taskTrackers.get(trackerId);
-             }
-             result.add(new MapOutputLocation(status.getTaskId(), 
-                                              mapTasksNeeded[i],
-                                              tracker.getHost(), 
-                                              tracker.getHttpPort()));
-          }
-        }
-        return (MapOutputLocation[]) 
-               result.toArray(new MapOutputLocation[result.size()]);
-    }
-
     /**
      * Grab the local fs name
      */
@@ -1493,14 +1451,14 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
     /* 
      * Returns a list of TaskCompletionEvent for the given job, 
      * starting from fromEventId.
-     * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getTaskCompletionEvents(java.lang.String, int)
+     * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getTaskCompletionEvents(java.lang.String, int, int)
      */
     public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
-        String jobid, int fromEventId) throws IOException{
+        String jobid, int fromEventId, int maxEvents) throws IOException{
       TaskCompletionEvent[] events = TaskCompletionEvent.EMPTY_ARRAY;
       JobInProgress job = (JobInProgress)this.jobs.get(jobid);
       if (null != job) {
-        events = job.getTaskCompletionEvents(fromEventId);
+        events = job.getTaskCompletionEvents(fromEventId, maxEvents);
       }
       return events;
     }
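
Worth noting: both here and in JobInProgress the method returns
TaskCompletionEvent.EMPTY_ARRAY rather than null when the job is
unknown or no new events exist, so a polling caller can treat "job
gone" and "nothing new yet" uniformly and simply ask again later.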

+ 1 - 1
src/java/org/apache/hadoop/mapred/LocalJobRunner.java

@@ -251,7 +251,7 @@ class LocalJobRunner implements JobSubmissionProtocol {
 
   public JobStatus[] jobsToComplete() {return null;}
   public TaskCompletionEvent[] getTaskCompletionEvents(
-      String jobid, int fromEventId) throws IOException{
+      String jobid, int fromEventId, int maxEvents) throws IOException{
     return TaskCompletionEvent.EMPTY_ARRAY;
   }
   

+ 71 - 26
src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java

@@ -28,6 +28,7 @@ import org.apache.hadoop.metrics.MetricsRecord;
 import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.util.*;
+import org.apache.hadoop.io.IntWritable;
 
 import java.io.*;
 import java.util.*;
@@ -130,9 +131,19 @@ class ReduceTaskRunner extends TaskRunner implements MRConstants {
   
   /**
    * the number of map output locations to poll for at one time
+   */  
+  private int probe_sample_size = 50;
+  
+  /**
+   * a Random used during the map output fetching
    */
-  private static final int PROBE_SAMPLE_SIZE = 50;
-
+  private Random randForProbing;
+  
+  /**
+   * a map from mapId to MapOutputLocation for retries
+   */
+  private Map<Integer, MapOutputLocation> retryFetches = new HashMap();
+  
   /** Represents the result of an attempt to copy a map output */
   private class CopyResult {
     
@@ -417,6 +428,10 @@ class ReduceTaskRunner extends TaskRunner implements MRConstants {
     Random         backoff = new Random();
     final Progress copyPhase = getTask().getProgress().phase();
     
+    //tweak the probe sample size (make it a function of numCopiers)
+    probe_sample_size = Math.max(numCopiers*5, 50);
+    randForProbing = new Random(reduceTask.getPartition() * 100);
+    
     for (int i = 0; i < numOutputs; i++) {
       neededOutputs.add(new Integer(i));
       copyPhase.addPhase();       // add sub-phase per file
@@ -434,6 +449,8 @@ class ReduceTaskRunner extends TaskRunner implements MRConstants {
     // start the clock for bandwidth measurement
     long startTime = System.currentTimeMillis();
     long currentTime = startTime;
+    IntWritable fromEventId = new IntWritable(0);
+    
     PingTimer pingTimer = new PingTimer();
     pingTimer.setName("Map output copy reporter for task " + 
                       reduceTask.getTaskId());
@@ -450,17 +467,36 @@ class ReduceTaskRunner extends TaskRunner implements MRConstants {
         LOG.info(reduceTask.getTaskId() + " Need " + neededOutputs.size() +
                  " map output location(s)");
         try {
-          MapOutputLocation[] locs = queryJobTracker(neededOutputs, jobClient);
+          // queryJobTracker advances fromEventId to the value the next
+          // call should start from
+          MapOutputLocation[] locs = queryJobTracker(fromEventId, jobClient);
           
           // remove discovered outputs from needed list
           // and put them on the known list
+          int gotLocs = (locs == null ? 0 : locs.length);
           for (int i=0; i < locs.length; i++) {
-            neededOutputs.remove(new Integer(locs[i].getMapId()));
-            knownOutputs.add(locs[i]);
+            // check whether we actually need this output: a map that
+            // succeeded earlier may have been lost and rescheduled, but
+            // if we already copied its output we need not copy it
+            // again from the new TT (we ignore the event for the
+            // rescheduled execution)
+            if(neededOutputs.remove(new Integer(locs[i].getMapId()))) {
+              // remove the mapId from the retryFetches hashmap since we now
+              // prefer the new location instead of what we saved earlier
+              retryFetches.remove(new Integer(locs[i].getMapId()));
+              knownOutputs.add(locs[i]);
+            }
+            else gotLocs--; //we don't need this output
+            
           }
+          // now put the remaining hash entries for the failed fetches 
+          // and clear the hashmap
+          knownOutputs.addAll(retryFetches.values());
           LOG.info(reduceTask.getTaskId() +
-                   " Got " + (locs == null ? 0 : locs.length) + 
-                   " map outputs from jobtracker");
+                " Got " + gotLocs + 
+                " new map outputs from jobtracker and " + retryFetches.size() +
+                " map outputs from previous failures");
+          retryFetches.clear();
         }
         catch (IOException ie) {
           LOG.warn(reduceTask.getTaskId() +
@@ -534,6 +570,7 @@ class ReduceTaskRunner extends TaskRunner implements MRConstants {
           } else {
             // this copy failed, put it back onto neededOutputs
             neededOutputs.add(new Integer(cr.getMapId()));
+            retryFetches.put(new Integer(cr.getMapId()), cr.getLocation());
           
             // wait a random amount of time for next contact
             currentTime = System.currentTimeMillis();
@@ -553,6 +590,7 @@ class ReduceTaskRunner extends TaskRunner implements MRConstants {
             while (locIt.hasNext()) {
               MapOutputLocation loc = (MapOutputLocation)locIt.next();
               if (cr.getHost().equals(loc.getHost())) {
+                retryFetches.put(new Integer(loc.getMapId()), loc);
                 locIt.remove();
                 neededOutputs.add(new Integer(loc.getMapId()));
               }
@@ -563,7 +601,7 @@ class ReduceTaskRunner extends TaskRunner implements MRConstants {
         }
         
         // ensure we have enough to keep us busy
-        if (numInFlight < lowThreshold && (numOutputs-numCopied) > PROBE_SAMPLE_SIZE) {
+        if (numInFlight < lowThreshold && (numOutputs-numCopied) > probe_sample_size) {
           break;
         }
       }
@@ -658,28 +696,16 @@ class ReduceTaskRunner extends TaskRunner implements MRConstants {
   }
   
   /** Queries the job tracker for a set of outputs ready to be copied
-   * @param neededOutputs the list of currently unknown outputs
+   * @param fromEventId the event ID to start from; advanced by this
+   * method to the value the next call should use
    * @param jobClient the job tracker
    * @return a set of locations to copy outputs from
    * @throws IOException
    */  
-  private MapOutputLocation[] queryJobTracker(List neededOutputs,
+  private MapOutputLocation[] queryJobTracker(IntWritable fromEventId, 
                                               InterTrackerProtocol jobClient)
   throws IOException {
     
-    // query for a just a random subset of needed segments so that we don't
-    // overwhelm jobtracker.  ideally perhaps we could send a more compact
-    // representation of all needed, i.e., a bit-vector
-    int     checkSize = Math.min(PROBE_SAMPLE_SIZE, neededOutputs.size());
-    int     neededIds[] = new int[checkSize];
-      
-    Collections.shuffle(neededOutputs);
-      
-    ListIterator itr = neededOutputs.listIterator();
-    for (int i=0; i < checkSize; i++) {
-      neededIds[i] = ((Integer)itr.next()).intValue();
-    }
-
     long currentTime = System.currentTimeMillis();    
     long pollTime = lastPollTime + MIN_POLL_INTERVAL;
     while (currentTime < pollTime) {
@@ -690,9 +716,28 @@ class ReduceTaskRunner extends TaskRunner implements MRConstants {
     }
     lastPollTime = currentTime;
 
-    return jobClient.locateMapOutputs(reduceTask.getJobId().toString(), 
-                                      neededIds,
-                                      reduceTask.getPartition());
+    TaskCompletionEvent t[] = jobClient.getTaskCompletionEvents(
+                                      reduceTask.getJobId().toString(),
+                                      fromEventId.get(),
+                                      probe_sample_size);
+    
+    List <MapOutputLocation> mapOutputsList = new ArrayList();
+    for (int i = 0; i < t.length; i++) {
+      if (t[i].isMap && 
+          t[i].getTaskStatus() == TaskCompletionEvent.Status.SUCCEEDED) {
+        URI u = URI.create(t[i].getTaskTrackerHttp());
+        String host = u.getHost();
+        int port = u.getPort();
+        String taskId = t[i].getTaskId();
+        int mId = t[i].idWithinJob();
+        mapOutputsList.add(new MapOutputLocation(taskId, mId, host, port));
+      }
+    }
+    Collections.shuffle(mapOutputsList, randForProbing);
+    MapOutputLocation[] locations =
+                        new MapOutputLocation[mapOutputsList.size()];
+    fromEventId.set(fromEventId.get() + t.length);
+    return mapOutputsList.toArray(locations);
   }
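
Two details of the rewrite above are easy to miss. The IntWritable is
used purely as a mutable int holder so queryJobTracker can advance the
caller's cursor in place, and the seeded shuffle presumably makes
different reduces visit the same batch of hosts in different orders,
spreading fetch load across trackers. The tracker address itself
arrives as an HTTP URL that java.net.URI picks apart; a tiny sketch
with a hypothetical address:

    import java.net.URI;

    // Sketch of the address parsing done in queryJobTracker; the URL
    // is hypothetical.
    public class LocationSketch {
      public static void main(String[] args) {
        URI u = URI.create("http://tracker42.example.com:50060");
        System.out.println(u.getHost());  // tracker42.example.com
        System.out.println(u.getPort());  // 50060
      }
    }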
 
   

+ 20 - 3
src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java

@@ -12,12 +12,14 @@ import org.apache.hadoop.io.WritableUtils;
  *
  */
 public class TaskCompletionEvent implements Writable{
-    static public enum Status {FAILED, SUCCEEDED};
+    static public enum Status {FAILED, SUCCEEDED, OBSOLETE};
     
     private int eventId ; 
     private String taskTrackerHttp ;
     private String taskId ;
     Status status ; 
+    boolean isMap = false ;
+    private int idWithinJob;
     public static final TaskCompletionEvent[] EMPTY_ARRAY = 
         new TaskCompletionEvent[0];
     /**
@@ -35,11 +37,15 @@ public class TaskCompletionEvent implements Writable{
      * @param taskTrackerHttp task tracker's host:port for http. 
      */
     public TaskCompletionEvent(int eventId, 
-        String taskId, 
+        String taskId,
+        int idWithinJob,
+        boolean isMap,
         Status status, 
         String taskTrackerHttp){
       
-        this.taskId = taskId ; 
+        this.taskId = taskId ;
+        this.idWithinJob = idWithinJob ;
+        this.isMap = isMap ;
         this.eventId = eventId ; 
         this.status =status ; 
         this.taskTrackerHttp = taskTrackerHttp ;
@@ -114,17 +120,28 @@ public class TaskCompletionEvent implements Writable{
         return buf.toString();
     }
     
+    public boolean isMapTask() {
+        return isMap;
+    }
+    
+    public int idWithinJob() {
+      return idWithinJob;
+    }
     //////////////////////////////////////////////
     // Writable
     //////////////////////////////////////////////
     public void write(DataOutput out) throws IOException {
         WritableUtils.writeString(out, taskId); 
+        WritableUtils.writeVInt(out, idWithinJob);
+        out.writeBoolean(isMap);
         WritableUtils.writeEnum(out, status); 
         WritableUtils.writeString(out, taskTrackerHttp);
     }
   
     public void readFields(DataInput in) throws IOException {
         this.taskId = WritableUtils.readString(in) ; 
+        this.idWithinJob = WritableUtils.readVInt(in);
+        this.isMap = in.readBoolean();
         this.status = WritableUtils.readEnum(in, Status.class);
         this.taskTrackerHttp = WritableUtils.readString(in);
     }
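
One discipline the Writable methods above must keep: readFields has to
consume fields in exactly the order write emitted them (taskId,
idWithinJob, isMap, status, taskTrackerHttp). A dependency-free
round-trip sketch, using plain java.io calls in place of WritableUtils
and hypothetical field values:

    import java.io.*;

    // Round-trip sketch: whatever order write() uses, readFields()
    // must mirror exactly. Plain DataOutput/DataInput calls stand in
    // for WritableUtils here.
    public class RoundTripSketch {
      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.writeUTF("task_0001_m_000007_0");  // taskId (hypothetical)
        out.writeInt(7);                        // idWithinJob
        out.writeBoolean(true);                 // isMap

        DataInputStream in = new DataInputStream(
            new ByteArrayInputStream(buf.toByteArray()));
        System.out.println(in.readUTF());       // task_0001_m_000007_0
        System.out.println(in.readInt());       // 7
        System.out.println(in.readBoolean());   // true
      }
    }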

+ 24 - 0
src/java/org/apache/hadoop/mapred/TaskInProgress.java

@@ -63,6 +63,7 @@ class TaskInProgress {
     private JobInProgress job;
 
     // Status of the TIP
+    private int successEventNumber = -1;
     private int numTaskFailures = 0;
     private double progress = 0;
     private String state = "";
@@ -143,6 +144,15 @@ class TaskInProgress {
       return result.toString();
     }
     
+    /**
+     * Return the index of the tip within the job, so "tip_0002_m_012345"
+     * would return 12345.
+     * @return int the tip index
+     */
+     public int idWithinJob() {
+       return partition;
+     }    
+
     /**
      * Initialization common to Map and Reduce
      */
@@ -574,4 +584,18 @@ class TaskInProgress {
     public int getIdWithinJob() {
       return partition;
     }
+    
+    /**
+     * Set the event number that was raised for this tip
+     */
+    public void setSuccessEventNumber(int eventNumber) {
+      successEventNumber = eventNumber;
+    }
+       
+    /**
+     * Get the event number that was raised for this tip
+     */
+    public int getSuccessEventNumber() {
+      return successEventNumber;
+    }
 }