
HADOOP-248. Optimize location of map outputs to no longer use random probes. Contributed by Devaraj.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@500410 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 years ago
parent
commit
4ee86c210e

+ 3 - 0
CHANGES.txt

@@ -87,6 +87,9 @@ Trunk (unreleased changes)
     with different versions of Lucene without worrying about CLASSPATH
     order.  (Milind Bhandarkar via cutting)
 
+27. HADOOP-248.  Optimize location of map outputs to not use random
+    probes.  (Devaraj Das via cutting)
+
 
 Release 0.10.1 - 2007-01-10
 

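The shape of the change: instead of each reduce task polling the JobTracker with a random subset of needed map IDs (locateMapOutputs), reducers now walk the job's append-only log of task-completion events. A minimal sketch of the new pull pattern, assuming a jobClient proxy and a jobId string are in scope; the loop framing is illustrative, not code from this commit:

    // Page through completion events; fromEventId advances by the number
    // of events returned, so nothing is fetched twice.
    int fromEventId = 0;
    TaskCompletionEvent[] events;
    do {
      events = jobClient.getTaskCompletionEvents(jobId, fromEventId, 50);
      for (int i = 0; i < events.length; i++) {
        if (events[i].isMapTask() &&
            events[i].getTaskStatus() == TaskCompletionEvent.Status.SUCCEEDED) {
          // the event carries taskId, idWithinJob() and the tracker's http
          // address: everything needed to build a MapOutputLocation
        }
      }
      fromEventId += events.length;    // resume after the last seen event
    } while (events.length > 0);
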
+ 5 - 17
src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java

@@ -28,11 +28,10 @@ import org.apache.hadoop.ipc.VersionedProtocol;
  */ 
 interface InterTrackerProtocol extends VersionedProtocol {
   /**
-   * version 3 introduced to replace 
-   * emitHearbeat/pollForNewTask/pollForTaskWithClosedJob with
-   * {@link #heartbeat(TaskTrackerStatus, boolean, boolean, short)}
+   * version 4 introduced, which removes locateMapOutputs and instead uses
+   * getTaskCompletionEvents to discover finished maps and fetch their outputs
    */
-  public static final long versionID = 3L;
+  public static final long versionID = 4L;
   
   public final static int TRACKERS_OK = 0;
   public final static int UNKNOWN_TASKTRACKER = 1;
@@ -62,18 +61,6 @@ interface InterTrackerProtocol extends VersionedProtocol {
           boolean initialContact, boolean acceptNewTasks, short responseId)
   throws IOException;
 
-  /** Called by a reduce task to find which map tasks are completed.
-   *
-   * @param jobId the job id
-   * @param mapTasksNeeded an array of the mapIds that we need
-   * @param partition the reduce's id
-   * @return an array of MapOutputLocation
-   */
-  MapOutputLocation[] locateMapOutputs(String jobId, 
-                                       int[] mapTasksNeeded,
-                                       int partition
-                                       ) throws IOException;
-
   /**
    * The task tracker calls this once, to discern where it can find
    * files referred to by the JobTracker
@@ -96,11 +83,12 @@ interface InterTrackerProtocol extends VersionedProtocol {
    * Returns empty array if no events are available. 
    * @param jobid job id 
    * @param fromEventId event id to start from. 
+   * @param maxEvents the max number of events we want to look at
    * @return array of task completion events. 
    * @throws IOException
    */
   TaskCompletionEvent[] getTaskCompletionEvents(
-      String jobid, int fromEventId) throws IOException; 
+      String jobid, int fromEventId, int maxEvents) throws IOException;
   
 }
 

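Note the versionID bump from 3 to 4: dropping locateMapOutputs and widening getTaskCompletionEvents are incompatible wire changes, and VersionedProtocol peers refuse calls across mismatched versions. Roughly the handshake the IPC layer performs, in a simplified sketch (the mismatch handling shown here is illustrative):

    // getProtocolVersion is part of VersionedProtocol.
    long serverVersion = jobClient.getProtocolVersion(
        InterTrackerProtocol.class.getName(), InterTrackerProtocol.versionID);
    if (serverVersion != InterTrackerProtocol.versionID) {
      throw new IOException("InterTrackerProtocol version mismatch: client="
          + InterTrackerProtocol.versionID + ", server=" + serverVersion);
    }
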
+ 1 - 1
src/java/org/apache/hadoop/mapred/JobClient.java

@@ -157,7 +157,7 @@ public class JobClient extends ToolBase implements MRConstants  {
         public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
             int startFrom) throws IOException{
           return jobSubmitClient.getTaskCompletionEvents(
-              getJobID(), startFrom); 
+              getJobID(), startFrom, 10); 
         }
 
         /**

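The hard-coded 10 caps how many events this client-side handle pulls per RPC; callers advance the cursor themselves by the number of events received. An illustrative consumer loop, where running is a RunningJob handle (e.g. from JobClient.submitJob), the enclosing method is assumed to declare IOException and InterruptedException, and the poll interval is arbitrary:

    int eventCounter = 0;
    while (!running.isComplete()) {
      TaskCompletionEvent[] events =
          running.getTaskCompletionEvents(eventCounter);
      eventCounter += events.length;     // move past what we have seen
      for (int i = 0; i < events.length; i++) {
        System.out.println(events[i]);
      }
      Thread.sleep(1000);
    }
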
+ 25 - 7
src/java/org/apache/hadoop/mapred/JobInProgress.java

@@ -291,24 +291,40 @@ class JobInProgress {
           
           if (state == TaskStatus.State.SUCCEEDED) {
             this.taskCompletionEvents.add( new TaskCompletionEvent(
-                taskCompletionEventTracker++, 
-                status.getTaskId(), 
+                taskCompletionEventTracker, 
+                status.getTaskId(),
+                tip.idWithinJob(),
+                status.getIsMap(),
                 TaskCompletionEvent.Status.SUCCEEDED,
                 httpTaskLogLocation ));
+            tip.setSuccessEventNumber(taskCompletionEventTracker);
             completedTask(tip, status, metrics);
           } else if (state == TaskStatus.State.FAILED ||
                      state == TaskStatus.State.KILLED) {
             this.taskCompletionEvents.add( new TaskCompletionEvent(
-                taskCompletionEventTracker++, 
-                status.getTaskId(), 
+                taskCompletionEventTracker, 
+                status.getTaskId(),
+                tip.idWithinJob(),
+                status.getIsMap(),
                 TaskCompletionEvent.Status.FAILED, 
                 httpTaskLogLocation ));
+            // Get the event number for the (possibly) previously successful
+            // task. If there exists one, then set that status to OBSOLETE 
+            int eventNumber;
+            if ((eventNumber = tip.getSuccessEventNumber()) != -1) {
+              TaskCompletionEvent t = 
+                this.taskCompletionEvents.get(eventNumber);
+              if (t.getTaskId().equals(status.getTaskId()))
+                t.setTaskStatus(TaskCompletionEvent.Status.OBSOLETE);
+            }
             // Tell the job to fail the relevant task
             failedTask(tip, status.getTaskId(), status, status.getTaskTracker(),
                        wasRunning, wasComplete);
           }          
         }
 
+        taskCompletionEventTracker++;
+        
         //
         // Update JobInProgress status
         //
@@ -778,12 +794,14 @@ class JobInProgress {
        return null;
     }
     
-    public TaskCompletionEvent[] getTaskCompletionEvents(int fromEventId) {
+    public TaskCompletionEvent[] getTaskCompletionEvents(int fromEventId, 
+        int maxEvents) {
       TaskCompletionEvent[] events = TaskCompletionEvent.EMPTY_ARRAY;
       if( taskCompletionEvents.size() > fromEventId) {
+        int actualMax = Math.min(maxEvents, 
+            (taskCompletionEvents.size() - fromEventId));
         events = (TaskCompletionEvent[])taskCompletionEvents.subList(
-            fromEventId, taskCompletionEvents.size()).
-            toArray(events);        
+            fromEventId, actualMax + fromEventId).toArray(events);        
       }
       return events; 
     }

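The windowing arithmetic in getTaskCompletionEvents is worth a worked example: with 97 recorded events, fromEventId = 90 and maxEvents = 50 give actualMax = min(50, 97 - 90) = 7, so the returned slice is events [90, 97) and the subList never reads past the end of the list. Restated compactly (an illustrative restatement of the logic above, not additional commit code):

    int size = taskCompletionEvents.size();
    if (size > fromEventId) {
      int actualMax = Math.min(maxEvents, size - fromEventId);
      events = (TaskCompletionEvent[]) taskCompletionEvents
          .subList(fromEventId, fromEventId + actualMax)
          .toArray(events);
    }
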
+ 3 - 2
src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java

@@ -85,11 +85,12 @@ public interface JobSubmissionProtocol extends VersionedProtocol {
      * Get task completion events for the jobid, starting from fromEventId. 
      * Returns empty array if no events are available. 
      * @param jobid job id 
-     * @param fromEventId event id to start from. 
+     * @param fromEventId event id to start from.
+     * @param maxEvents the max number of events we want to look at 
      * @return array of task completion events. 
      * @throws IOException
      */
     public TaskCompletionEvent[] getTaskCompletionEvents(
-                String jobid, int fromEventId) throws IOException;
+              String jobid, int fromEventId, int maxEvents) throws IOException;
     
 }

+ 3 - 45
src/java/org/apache/hadoop/mapred/JobTracker.java

@@ -1262,48 +1262,6 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
         return null;
     }
 
-    /**
-     * A TaskTracker wants to know the physical locations of completed, but not
-     * yet closed, tasks.  This exists so the reduce task thread can locate
-     * map task outputs.
-     */
-    public synchronized MapOutputLocation[] 
-             locateMapOutputs(String jobId, int[] mapTasksNeeded, int reduce) 
-    throws IOException {
-        // Check to make sure that the job hasn't 'completed'.
-        JobInProgress job = getJob(jobId);
-        if (job.status.getRunState() != JobStatus.RUNNING) {
-          return new MapOutputLocation[0];
-        }
-        
-        ArrayList result = new ArrayList(mapTasksNeeded.length);
-        for (int i = 0; i < mapTasksNeeded.length; i++) {
-          TaskStatus status = job.findFinishedMap(mapTasksNeeded[i]);
-          if (status != null) {
-             String trackerId = 
-               (String) taskidToTrackerMap.get(status.getTaskId());
-             // Safety check, if we can't find the taskid in 
-             // taskidToTrackerMap and job isn't 'running', then just
-             // return an empty array
-             if (trackerId == null && 
-                     job.status.getRunState() != JobStatus.RUNNING) {
-               return new MapOutputLocation[0];
-             }
-             
-             TaskTrackerStatus tracker;
-             synchronized (taskTrackers) {
-               tracker = (TaskTrackerStatus) taskTrackers.get(trackerId);
-             }
-             result.add(new MapOutputLocation(status.getTaskId(), 
-                                              mapTasksNeeded[i],
-                                              tracker.getHost(), 
-                                              tracker.getHttpPort()));
-          }
-        }
-        return (MapOutputLocation[]) 
-               result.toArray(new MapOutputLocation[result.size()]);
-    }
-
     /**
      * Grab the local fs name
      */
@@ -1435,14 +1393,14 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
     /* 
      * Returns a list of TaskCompletionEvent for the given job, 
      * starting from fromEventId.
-     * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getTaskCompletionEvents(java.lang.String, int)
+     * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getTaskCompletionEvents(java.lang.String, int, int)
      */
     public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
-        String jobid, int fromEventId) throws IOException{
+        String jobid, int fromEventId, int maxEvents) throws IOException{
       TaskCompletionEvent[] events = TaskCompletionEvent.EMPTY_ARRAY;
       JobInProgress job = (JobInProgress)this.jobs.get(jobid);
       if (null != job) {
-        events = job.getTaskCompletionEvents(fromEventId);
+        events = job.getTaskCompletionEvents(fromEventId, maxEvents);
       }
       return events;
     }

+ 1 - 1
src/java/org/apache/hadoop/mapred/LocalJobRunner.java

@@ -244,7 +244,7 @@ class LocalJobRunner implements JobSubmissionProtocol {
 
   public JobStatus[] jobsToComplete() {return null;}
   public TaskCompletionEvent[] getTaskCompletionEvents(
-      String jobid, int fromEventId) throws IOException{
+      String jobid, int fromEventId, int maxEvents) throws IOException{
     return TaskCompletionEvent.EMPTY_ARRAY;
   }
   

+ 70 - 26
src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java

@@ -122,9 +122,19 @@ class ReduceTaskRunner extends TaskRunner implements MRConstants {
   
   /**
    * the number of map output locations to poll for at one time
+   */  
+  private int probe_sample_size = 50;
+  
+  /**
+   * a Random used during the map output fetching
    */
-  private static final int PROBE_SAMPLE_SIZE = 50;
-
+  private Random randForProbing;
+  
+  /**
+   * a hashmap from mapId to MapOutputLocation for retrials
+   */
+  private Map<Integer, MapOutputLocation> retryFetches = new HashMap();
+  
   /** Represents the result of an attempt to copy a map output */
   private class CopyResult {
     
@@ -368,6 +378,10 @@ class ReduceTaskRunner extends TaskRunner implements MRConstants {
     Random         backoff = new Random();
     final Progress copyPhase = getTask().getProgress().phase();
     
+    //tweak the probe sample size (make it a function of numCopiers)
+    probe_sample_size = Math.max(numCopiers*5, 50);
+    randForProbing = new Random(reduceTask.getPartition() * 100);
+    
     for (int i = 0; i < numOutputs; i++) {
       neededOutputs.add(new Integer(i));
       copyPhase.addPhase();       // add sub-phase per file
@@ -385,6 +399,8 @@ class ReduceTaskRunner extends TaskRunner implements MRConstants {
     // start the clock for bandwidth measurement
     long startTime = System.currentTimeMillis();
     long currentTime = startTime;
+    IntWritable fromEventId = new IntWritable(0);
+    
     PingTimer pingTimer = new PingTimer();
     pingTimer.setName("Map output copy reporter for task " + 
                       reduceTask.getTaskId());
@@ -401,17 +417,36 @@ class ReduceTaskRunner extends TaskRunner implements MRConstants {
         LOG.info(reduceTask.getTaskId() + " Need " + neededOutputs.size() +
                  " map output location(s)");
         try {
-          MapOutputLocation[] locs = queryJobTracker(neededOutputs, jobClient);
+          // queryJobTracker advances fromEventId so that the next call
+          // resumes where this one left off
+          MapOutputLocation[] locs = queryJobTracker(fromEventId, jobClient);
           
           // remove discovered outputs from needed list
           // and put them on the known list
+          int gotLocs = (locs == null ? 0 : locs.length);
           for (int i=0; i < locs.length; i++) {
-            neededOutputs.remove(new Integer(locs[i].getMapId()));
-            knownOutputs.add(locs[i]);
+            // check whether we actually need an output. It could happen
+            // that a map task that successfully ran earlier got lost, but
+            // if we already have copied the output of that unfortunate task
+            // we need not copy it again from the new TT (we will ignore 
+            // the event for the new rescheduled execution)
+            if(neededOutputs.remove(new Integer(locs[i].getMapId()))) {
+              // remove the mapId from the retryFetches hashmap since we now
+              // prefer the new location instead of what we saved earlier
+              retryFetches.remove(new Integer(locs[i].getMapId()));
+              knownOutputs.add(locs[i]);
+              gotLocs--;
+            }
+            
           }
+          // now put the remaining hash entries for the failed fetches 
+          // and clear the hashmap
+          knownOutputs.addAll(retryFetches.values());
           LOG.info(reduceTask.getTaskId() +
-                   " Got " + (locs == null ? 0 : locs.length) + 
-                   " map outputs from jobtracker");
+                " Got " + gotLocs + 
+                " new map outputs from jobtracker and " + retryFetches.size() +
+                " map outputs from previous failures");
+          retryFetches.clear();
         }
         catch (IOException ie) {
           LOG.warn(reduceTask.getTaskId() +
@@ -485,6 +520,7 @@ class ReduceTaskRunner extends TaskRunner implements MRConstants {
           } else {
             // this copy failed, put it back onto neededOutputs
             neededOutputs.add(new Integer(cr.getMapId()));
+            retryFetches.put(new Integer(cr.getMapId()), cr.getLocation());
           
             // wait a random amount of time for next contact
             currentTime = System.currentTimeMillis();
@@ -504,6 +540,7 @@ class ReduceTaskRunner extends TaskRunner implements MRConstants {
             while (locIt.hasNext()) {
               MapOutputLocation loc = (MapOutputLocation)locIt.next();
               if (cr.getHost().equals(loc.getHost())) {
+                retryFetches.put(new Integer(loc.getMapId()), loc);
                 locIt.remove();
                 neededOutputs.add(new Integer(loc.getMapId()));
               }
@@ -514,7 +551,7 @@ class ReduceTaskRunner extends TaskRunner implements MRConstants {
         }
         
         // ensure we have enough to keep us busy
-        if (numInFlight < lowThreshold && (numOutputs-numCopied) > PROBE_SAMPLE_SIZE) {
+        if (numInFlight < lowThreshold && (numOutputs-numCopied) > probe_sample_size) {
           break;
         }
       }
@@ -608,28 +645,16 @@ class ReduceTaskRunner extends TaskRunner implements MRConstants {
   }
   
   /** Queries the job tracker for a set of outputs ready to be copied
-   * @param neededOutputs the list of currently unknown outputs
+   * @param fromEventId the first event ID we want to start from; this will
+   * be advanced by the call to this method
    * @param jobClient the job tracker
    * @return a set of locations to copy outputs from
    * @throws IOException
    */  
-  private MapOutputLocation[] queryJobTracker(List neededOutputs,
+  private MapOutputLocation[] queryJobTracker(IntWritable fromEventId, 
                                              InterTrackerProtocol jobClient)
   throws IOException {
     
-    // query for a just a random subset of needed segments so that we don't
-    // overwhelm jobtracker.  ideally perhaps we could send a more compact
-    // representation of all needed, i.e., a bit-vector
-    int     checkSize = Math.min(PROBE_SAMPLE_SIZE, neededOutputs.size());
-    int     neededIds[] = new int[checkSize];
-      
-    Collections.shuffle(neededOutputs);
-      
-    ListIterator itr = neededOutputs.listIterator();
-    for (int i=0; i < checkSize; i++) {
-      neededIds[i] = ((Integer)itr.next()).intValue();
-    }
-
     long currentTime = System.currentTimeMillis();    
     long pollTime = lastPollTime + MIN_POLL_INTERVAL;
     while (currentTime < pollTime) {
@@ -640,9 +665,28 @@ class ReduceTaskRunner extends TaskRunner implements MRConstants {
     }
     lastPollTime = currentTime;
 
-    return jobClient.locateMapOutputs(reduceTask.getJobId().toString(), 
-                                      neededIds,
-                                      reduceTask.getPartition());
+    TaskCompletionEvent t[] = jobClient.getTaskCompletionEvents(
+                                      reduceTask.getJobId().toString(),
+                                      fromEventId.get(),
+                                      probe_sample_size);
+    
+    List <MapOutputLocation> mapOutputsList = new ArrayList();
+    for (int i = 0; i < t.length; i++) {
+      if (t[i].isMap && 
+          t[i].getTaskStatus() == TaskCompletionEvent.Status.SUCCEEDED) {
+        URI u = URI.create(t[i].getTaskTrackerHttp());
+        String host = u.getHost();
+        int port = u.getPort();
+        String taskId = t[i].getTaskId();
+        int mId = t[i].idWithinJob();
+        mapOutputsList.add(new MapOutputLocation(taskId, mId, host, port));
+      }
+    }
+    Collections.shuffle(mapOutputsList, randForProbing);
+    MapOutputLocation[] locations =
+                        new MapOutputLocation[mapOutputsList.size()];
+    fromEventId.set(fromEventId.get() + t.length);
+    return mapOutputsList.toArray(locations);
   }
 
   

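Two details in this file are easy to miss. First, a failed fetch parks its location in retryFetches and folds it back into knownOutputs on the next poll, so a flaky tasktracker is retried without re-asking the JobTracker. Second, every batch of locations is shuffled with a Random seeded by partition * 100, so different reducers visit tasktrackers in different orders rather than all hammering the first host in the event log. A standalone demonstration of the seeded-shuffle effect (assumes java.util imports; not commit code):

    List<String> batch = Arrays.asList("tt1", "tt2", "tt3", "tt4");
    List<String> order0 = new ArrayList<String>(batch);
    List<String> order1 = new ArrayList<String>(batch);
    Collections.shuffle(order0, new Random(0 * 100));   // reduce partition 0
    Collections.shuffle(order1, new Random(1 * 100));   // reduce partition 1
    // order0 and order1 will generally differ, spreading the first wave
    // of fetches across different tasktrackers.
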
+ 20 - 3
src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java

@@ -12,12 +12,14 @@ import org.apache.hadoop.io.WritableUtils;
  *
  */
 public class TaskCompletionEvent implements Writable{
-    static public enum Status {FAILED, SUCCEEDED};
+    static public enum Status {FAILED, SUCCEEDED, OBSOLETE};
     
     private int eventId ; 
     private String taskTrackerHttp ;
     private String taskId ;
     Status status ; 
+    boolean isMap = false ;
+    private int idWithinJob;
     public static final TaskCompletionEvent[] EMPTY_ARRAY = 
         new TaskCompletionEvent[0];
     /**
@@ -35,11 +37,15 @@ public class TaskCompletionEvent implements Writable{
      * @param taskTrackerHttp task tracker's host:port for http. 
      */
     public TaskCompletionEvent(int eventId, 
-        String taskId, 
+        String taskId,
+        int idWithinJob,
+        boolean isMap,
         Status status, 
         String taskTrackerHttp){
       
-        this.taskId = taskId ; 
+        this.taskId = taskId ;
+        this.idWithinJob = idWithinJob ;
+        this.isMap = isMap ;
         this.eventId = eventId ; 
         this.status =status ; 
         this.taskTrackerHttp = taskTrackerHttp ;
@@ -114,17 +120,28 @@ public class TaskCompletionEvent implements Writable{
         return buf.toString();
     }
     
+    public boolean isMapTask() {
+        return isMap;
+    }
+    
+    public int idWithinJob() {
+      return idWithinJob;
+    }
     //////////////////////////////////////////////
     // Writable
     //////////////////////////////////////////////
     public void write(DataOutput out) throws IOException {
         WritableUtils.writeString(out, taskId); 
+        WritableUtils.writeVInt(out, idWithinJob);
+        out.writeBoolean(isMap);
         WritableUtils.writeEnum(out, status); 
         WritableUtils.writeString(out, taskTrackerHttp);
     }
   
     public void readFields(DataInput in) throws IOException {
         this.taskId = WritableUtils.readString(in) ; 
+        this.idWithinJob = WritableUtils.readVInt(in);
+        this.isMap = in.readBoolean();
         this.status = WritableUtils.readEnum(in, Status.class);
         this.taskTrackerHttp = WritableUtils.readString(in);
     }

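Writable carries no schema, so write and readFields must emit and consume fields in the same order; the diff inserts idWithinJob and isMap at matching positions in both methods. Note that eventId is not serialized at all; it is assigned separately. A round-trip check one could write against this class (a sketch; assumes a no-argument constructor for Writable use and the org.apache.hadoop.io DataOutputBuffer/DataInputBuffer helpers):

    TaskCompletionEvent ev = new TaskCompletionEvent(
        7, "task_0001_m_000042_0", 42, true,
        TaskCompletionEvent.Status.SUCCEEDED, "http://tt1:50060");
    DataOutputBuffer out = new DataOutputBuffer();
    ev.write(out);
    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());
    TaskCompletionEvent back = new TaskCompletionEvent();
    back.readFields(in);
    // expected: back.idWithinJob() == 42 and back.isMapTask() == true
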
+ 24 - 0
src/java/org/apache/hadoop/mapred/TaskInProgress.java

@@ -60,6 +60,7 @@ class TaskInProgress {
     private JobInProgress job;
 
     // Status of the TIP
+    private int successEventNumber = -1;
     private int numTaskFailures = 0;
     private double progress = 0;
     private String state = "";
@@ -138,6 +139,15 @@ class TaskInProgress {
       return result.toString();
     }
     
+    /**
+     * Return the index of the tip within the job, so "tip_0002_m_012345"
+     * would return 12345.
+     * @return int the tip index
+     */
+     public int idWithinJob() {
+       return partition;
+     }    
+
     /**
      * Initialization common to Map and Reduce
      */
@@ -568,4 +578,18 @@ class TaskInProgress {
     public int getIdWithinJob() {
       return partition;
     }
+    
+    /**
+     * Set the event number that was raised for this tip
+     */
+    public void setSuccessEventNumber(int eventNumber) {
+      successEventNumber = eventNumber;
+    }
+       
+    /**
+     * Get the event number that was raised for this tip
+     */
+    public int getSuccessEventNumber() {
+      return successEventNumber;
+    }
 }