소스 검색

HADOOP-1158. Change JobTracker to record map-output transmission errors and use them to trigger speculative re-execution of tasks. Contributed by Arun.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@569063 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 17 년 전
부모
커밋
002d8f2de9

+ 4 - 0
CHANGES.txt

@@ -98,6 +98,10 @@ Trunk (unreleased changes)
     HADOOP-1654.  Add IOUtils class, containing generic io-related
     utility methods.   (Enis Soztutar via cutting)
 
+    HADOOP-1158.  Change JobTracker to record map-output transmission
+    errors and use them to trigger speculative re-execution of tasks.
+    (Arun C Murthy via cutting)
+
 
 Release 0.14.0 - 2007-08-17
 

+ 9 - 6
src/java/org/apache/hadoop/mapred/IsolationRunner.java

@@ -51,6 +51,10 @@ public class IsolationRunner {
       LOG.info("Task " + taskId + " reporting file system error: " + message);
     }
 
+    public void shuffleError(String taskId, String message) throws IOException {
+      LOG.info("Task " + taskId + " reporting shuffle error: " + message);
+    }
+
     public Task getTask(String taskid) throws IOException {
       return null;
     }
@@ -59,14 +63,13 @@ public class IsolationRunner {
       return true;
     }
 
-    public boolean progress(String taskid, float progress, String state,
-                         TaskStatus.Phase phase, Counters counters) 
-      throws IOException 
-    {
+    public boolean statusUpdate(String taskId, TaskStatus taskStatus) 
+    throws IOException, InterruptedException {
       StringBuffer buf = new StringBuffer("Task ");
-      buf.append(taskid);
+      buf.append(taskId);
       buf.append(" making progress to ");
-      buf.append(progress);
+      buf.append(taskStatus.getProgress());
+      String state = taskStatus.getStateString();
       if (state != null) {
         buf.append(" and state of ");
         buf.append(state);

+ 39 - 8
src/java/org/apache/hadoop/mapred/JobInProgress.java

@@ -110,6 +110,14 @@ class JobInProgress {
   
   private MetricsRecord jobMetrics;
   
+  // Maximum no. of fetch-failure notifications after which
+  // the map task is killed
+  private static final int MAX_FETCH_FAILURES_NOTIFICATIONS = 3;
+  
+  // Map of mapTaskId -> no. of fetch failures
+  private Map<String, Integer> mapTaskIdToFetchFailuresMap =
+    new TreeMap<String, Integer>();
+  
   /**
    * Create a JobInProgress with the given job file, plus a handle
    * to the tracker.
@@ -1034,14 +1042,14 @@ class JobInProgress {
                          TaskStatus.Phase phase, TaskStatus.State state, 
                          String hostname, String trackerName,
                          JobTrackerMetrics metrics) {
-    TaskStatus status = new TaskStatus(taskid,
-                                       tip.isMapTask(),
-                                       0.0f,
-                                       state,
-                                       reason,
-                                       reason,
-                                       trackerName, phase,
-                                       tip.getCounters());
+    TaskStatus status = TaskStatus.createTaskStatus(tip.isMapTask(), 
+                                                    taskid,
+                                                    0.0f,
+                                                    state,
+                                                    reason,
+                                                    reason,
+                                                    trackerName, phase,
+                                                    tip.getCounters());
     updateTaskStatus(tip, status, metrics);
     JobHistory.Task.logFailed(profile.getJobId(), tip.getTIPId(), 
                               tip.isMapTask() ? Values.MAP.name() : Values.REDUCE.name(), 
@@ -1132,4 +1140,27 @@ class JobInProgress {
     }
     return events; 
   }
+  
+  synchronized void fetchFailureNotification(TaskInProgress tip, 
+                                             String mapTaskId, 
+                                             String hostname, String trackerName, 
+                                             JobTrackerMetrics metrics) {
+    Integer fetchFailures = mapTaskIdToFetchFailuresMap.get(mapTaskId);
+    fetchFailures = (fetchFailures == null) ? 1 : (fetchFailures+1);
+    mapTaskIdToFetchFailuresMap.put(mapTaskId, fetchFailures);
+    LOG.info("Failed fetch notification #" + fetchFailures + " for task " + 
+            mapTaskId);
+    
+    if (fetchFailures == MAX_FETCH_FAILURES_NOTIFICATIONS) {
+      LOG.info("Too many fetch-failures for output of task: " + mapTaskId 
+               + " ... killing it");
+      
+      failedTask(tip, mapTaskId, "Too many fetch-failures",                            
+                 (tip.isMapTask() ? TaskStatus.Phase.MAP : 
+                                    TaskStatus.Phase.REDUCE), 
+                 TaskStatus.State.FAILED, hostname, trackerName, metrics);
+      
+      mapTaskIdToFetchFailuresMap.remove(mapTaskId);
+    }
+  }
 }

+ 17 - 1
src/java/org/apache/hadoop/mapred/JobTracker.java

@@ -1708,8 +1708,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
    * jobs that might be affected.
    */
   void updateTaskStatuses(TaskTrackerStatus status) {
+    String trackerName = status.getTrackerName();
     for (TaskStatus report : status.getTaskReports()) {
-      report.setTaskTracker(status.getTrackerName());
+      report.setTaskTracker(trackerName);
       String taskId = report.getTaskId();
       TaskInProgress tip = taskidToTIPMap.get(taskId);
       if (tip == null) {
@@ -1718,6 +1719,21 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
         expireLaunchingTasks.removeTask(taskId);
         tip.getJob().updateTaskStatus(tip, report, myMetrics);
       }
+      
+      // Process 'failed fetch' notifications 
+      List<String> failedFetchMaps = report.getFetchFailedMaps();
+      if (failedFetchMaps != null) {
+        for (String mapTaskId : failedFetchMaps) {
+          TaskInProgress failedFetchMap = taskidToTIPMap.get(mapTaskId);
+          if (failedFetchMap != null) {
+            failedFetchMap.getJob().fetchFailureNotification(failedFetchMap, 
+                                                             mapTaskId, 
+                                                             status.getHost(), 
+                                                             trackerName, 
+                                                             myMetrics);
+          }
+        }
+      }
     }
   }
 

+ 11 - 7
src/java/org/apache/hadoop/mapred/LocalJobRunner.java

@@ -203,23 +203,23 @@ class LocalJobRunner implements JobSubmissionProtocol {
 
     public Task getTask(String taskid) { return null; }
 
-    public boolean progress(String taskId, float progress, String state, 
-                         TaskStatus.Phase phase, Counters taskCounters) {
-      LOG.info(state);
+    public boolean statusUpdate(String taskId, TaskStatus taskStatus) 
+    throws IOException, InterruptedException {
+      LOG.info(taskStatus.getStateString());
       float taskIndex = mapIds.indexOf(taskId);
       if (taskIndex >= 0) {                       // mapping
         float numTasks = mapIds.size();
-        status.setMapProgress(taskIndex/numTasks + progress/numTasks);
+        status.setMapProgress(taskIndex/numTasks + taskStatus.getProgress()/numTasks);
       } else {
-        status.setReduceProgress(progress);
+        status.setReduceProgress(taskStatus.getProgress());
       }
-      currentCounters = Counters.sum(completedTaskCounters, taskCounters);
+      currentCounters = Counters.sum(completedTaskCounters, taskStatus.getCounters());
       
       // ignore phase
       
       return true;
     }
-    
+
     /**
      * Updates counters corresponding to completed tasks.
      * @param task A map or reduce task which has just been 
@@ -251,6 +251,10 @@ class LocalJobRunner implements JobSubmissionProtocol {
       LOG.fatal("FSError: "+ message + "from task: " + taskId);
     }
 
+    public void shuffleError(String taskId, String message) throws IOException {
+      LOG.fatal("shuffleError: "+ message + "from task: " + taskId);
+    }
+    
     public TaskCompletionEvent[] getMapCompletionEvents(
                                                         String jobId, int fromEventId, int maxLocs) throws IOException {
       return TaskCompletionEvent.EMPTY_ARRAY;

+ 3 - 1
src/java/org/apache/hadoop/mapred/MapTask.java

@@ -67,7 +67,9 @@ class MapTask extends Task {
     setPhase(TaskStatus.Phase.MAP); 
   }
 
-  public MapTask() {}
+  public MapTask() {
+    super();
+  }
 
   public MapTask(String jobId, String jobFile, String tipId, String taskId, 
                  int partition, String splitClass, BytesWritable split

+ 51 - 0
src/java/org/apache/hadoop/mapred/MapTaskStatus.java

@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+public class MapTaskStatus extends TaskStatus {
+
+  public MapTaskStatus() {}
+
+  public MapTaskStatus(String taskid, float progress,
+          State runState, String diagnosticInfo, String stateString,
+          String taskTracker, Phase phase, Counters counters) {
+    super(taskid, progress, runState, diagnosticInfo, stateString,
+          taskTracker, phase, counters);
+  }
+
+  public boolean getIsMap() {
+    return true;
+  }
+
+  public long getShuffleFinishTime() {
+    throw new UnsupportedOperationException("getShuffleFinishTime() not supported for MapTask");
+  }
+
+  void setShuffleFinishTime(long shuffleFinishTime) {
+    throw new UnsupportedOperationException("setShuffleFinishTime() not supported for MapTask");
+  }
+
+  public long getSortFinishTime() {
+    throw new UnsupportedOperationException("getSortFinishTime() not supported for MapTask");
+  }
+
+  void setSortFinishTime(long sortFinishTime) {
+    throw new UnsupportedOperationException("setSortFinishTime() not supported for MapTask");
+  }
+}

+ 71 - 5
src/java/org/apache/hadoop/mapred/ReduceTask.java

@@ -28,6 +28,7 @@ import java.net.URLClassLoader;
 import java.text.DecimalFormat;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Hashtable;
 import java.util.Iterator;
@@ -36,7 +37,6 @@ import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import java.util.TreeSet;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -90,7 +90,9 @@ class ReduceTask extends Task {
   private Progress sortPhase  = getProgress().addPhase("sort");
   private Progress reducePhase = getProgress().addPhase("reduce");
 
-  public ReduceTask() {}
+  public ReduceTask() {
+    super();
+  }
 
   public ReduceTask(String jobId, String jobFile, String tipId, String taskId,
                     int partition, int numMaps) {
@@ -466,6 +468,30 @@ class ReduceTask extends Task {
      */
     private long ramfsMergeOutputSize;
     
+    /**
+     * Maximum no. of fetch-retries per-map.
+     */
+    private static final int MAX_FETCH_RETRIES_PER_MAP = 5;
+    
+    /**
+     * Maximum no. of unique maps from which we failed to fetch map-outputs
+     * even after {@link #MAX_FETCH_RETRIES_PER_MAP} retries; after this the
+     * reduce task is failed.
+     */
+    private static final int MAX_FAILED_UNIQUE_FETCHES = 5;
+
+    /**
+     * The maps from which we fail to fetch map-outputs 
+     * even after {@link #MAX_FETCH_RETRIES_PER_MAP} retries.
+     */
+    Set<Integer> fetchFailedMaps = new TreeSet<Integer>(); 
+    
+    /**
+     * A map of taskId -> no. of failed fetches
+     */
+    Map<String, Integer> mapTaskToFailedFetchesMap = 
+      new HashMap<String, Integer>();    
+
     /**
      * This class contains the methods that should be used for metrics-reporting
      * the specific metrics for shuffle. This class actually reports the
@@ -958,7 +984,11 @@ class ReduceTask extends Task {
                 copyPhase.startNextPhase();
                 copyPhase.setStatus("copy (" + numCopied + " of " + numOutputs 
                                     + " at " +
-                                    mbpsFormat.format(transferRate) +  " MB/s)");          
+                                    mbpsFormat.format(transferRate) +  " MB/s)");
+                
+                // Note successfull fetch for this mapId to invalidate
+                // (possibly) old fetch-failures
+                fetchFailedMaps.remove(cr.getLocation().getMapId());
               } else if (cr.isObsolete()) {
                 //ignore
                 LOG.info(reduceTask.getTaskId() + 
@@ -968,10 +998,46 @@ class ReduceTask extends Task {
               } else {
                 retryFetches.add(cr.getLocation());
                 
+                // note the failed-fetch
+                String mapTaskId = cr.getLocation().getMapTaskId();
+                Integer mapId = cr.getLocation().getMapId();
+                
+                Integer noFailedFetches = 
+                  mapTaskToFailedFetchesMap.get(mapTaskId);
+                noFailedFetches = 
+                  (noFailedFetches == null) ? 1 : (noFailedFetches + 1);
+                mapTaskToFailedFetchesMap.put(mapTaskId, noFailedFetches);
+                LOG.info("Task " + getTaskId() + ": Failed fetch #" + 
+                         noFailedFetches + " from " + mapTaskId);
+                
+                // did the fetch fail too many times?
+                if ((noFailedFetches % MAX_FETCH_RETRIES_PER_MAP) == 0) {
+                  synchronized (ReduceTask.this) {
+                    taskStatus.addFetchFailedMap(mapTaskId);
+                    LOG.info("Failed to fetch map-output from " + mapTaskId + 
+                             " even after MAX_FETCH_RETRIES_PER_MAP retries... "
+                             + " reporting to the JobTracker");
+                  }
+                }
+
+                // note unique failed-fetch maps
+                if (noFailedFetches == MAX_FETCH_RETRIES_PER_MAP) {
+                  fetchFailedMaps.add(mapId);
+                  
+                  // did we have too many unique failed-fetch maps?
+                  if (fetchFailedMaps.size() >= MAX_FAILED_UNIQUE_FETCHES) {
+                    LOG.fatal("Shuffle failed with too many fetch failures! " +
+                             "Killing task " + getTaskId() + ".");
+                    umbilical.shuffleError(getTaskId(), 
+                                           "Exceeded MAX_FAILED_UNIQUE_FETCHES;"
+                                           + " bailing-out.");
+                  }
+                }
+                
                 // wait a random amount of time for next contact
                 currentTime = System.currentTimeMillis();
-                long nextContact = currentTime + 60 * 1000 +
-                  backoff.nextInt(maxBackoff*1000);
+                long nextContact = currentTime + 60 * 1000 + 
+                                   backoff.nextInt(maxBackoff*1000);
                 penaltyBox.put(cr.getHost(), nextContact);          
                 LOG.warn(reduceTask.getTaskId() + " adding host " +
                          cr.getHost() + " to penalty box, next contact in " +

+ 137 - 0
src/java/org/apache/hadoop/mapred/ReduceTaskStatus.java

@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.Text;
+
+
+public class ReduceTaskStatus extends TaskStatus {
+
+  private long shuffleFinishTime; 
+  private long sortFinishTime; 
+  private List<String> failedFetchTasks = new ArrayList<String>(1);
+  
+  public ReduceTaskStatus() {}
+
+  public ReduceTaskStatus(String taskid, float progress, State runState,
+          String diagnosticInfo, String stateString, String taskTracker,
+          Phase phase, Counters counters) {
+    super(taskid, progress, runState, diagnosticInfo, stateString, taskTracker,
+            phase, counters);
+  }
+
+  public Object clone() {
+    ReduceTaskStatus myClone = (ReduceTaskStatus)super.clone();
+    myClone.failedFetchTasks = new ArrayList<String>(failedFetchTasks);
+    return myClone;
+  }
+
+  public boolean getIsMap() {
+    return false;
+  }
+
+  void setFinishTime(long finishTime) {
+    if (shuffleFinishTime == 0) {
+      this.shuffleFinishTime = finishTime; 
+    }
+    if (sortFinishTime == 0){
+      this.sortFinishTime = finishTime;
+    }
+    super.setFinishTime(finishTime);
+  }
+
+  public long getShuffleFinishTime() {
+    return shuffleFinishTime;
+  }
+
+  void setShuffleFinishTime(long shuffleFinishTime) {
+    this.shuffleFinishTime = shuffleFinishTime;
+  }
+
+  public long getSortFinishTime() {
+    return sortFinishTime;
+  }
+
+  void setSortFinishTime(long sortFinishTime) {
+    this.sortFinishTime = sortFinishTime;
+    if (0 == this.shuffleFinishTime){
+      this.shuffleFinishTime = sortFinishTime;
+    }
+  }
+
+  public List<String> getFetchFailedMaps() {
+    return failedFetchTasks;
+  }
+  
+  void addFetchFailedMap(String mapTaskId) {
+    failedFetchTasks.add(mapTaskId);
+  }
+  
+  synchronized void statusUpdate(TaskStatus status) {
+    super.statusUpdate(status);
+    
+    if (status.getShuffleFinishTime() != 0) {
+      this.shuffleFinishTime = status.getShuffleFinishTime();
+    }
+    
+    if (status.getSortFinishTime() != 0) {
+      sortFinishTime = status.getSortFinishTime();
+    }
+    
+    List<String> newFetchFailedMaps = status.getFetchFailedMaps();
+    if (failedFetchTasks == null) {
+      failedFetchTasks = newFetchFailedMaps;
+    } else if (newFetchFailedMaps != null){
+      failedFetchTasks.addAll(newFetchFailedMaps);
+    }
+  }
+
+  synchronized void clearStatus() {
+    super.clearStatus();
+    failedFetchTasks.clear();
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    shuffleFinishTime = in.readLong(); 
+    sortFinishTime = in.readLong();
+    int noFailedFetchTasks = in.readInt();
+    failedFetchTasks = new ArrayList<String>(noFailedFetchTasks);
+    for (int i=0; i < noFailedFetchTasks; ++i) {
+      failedFetchTasks.add(Text.readString(in));
+    }
+  }
+
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    out.writeLong(shuffleFinishTime);
+    out.writeLong(sortFinishTime);
+    out.writeInt(failedFetchTasks.size());
+    for (String taskId : failedFetchTasks) {
+      Text.writeString(out, taskId);
+    }
+  }
+  
+}

+ 26 - 9
src/java/org/apache/hadoop/mapred/Task.java

@@ -83,7 +83,7 @@ abstract class Task implements Writable, Configurable {
   private String jobId;                           // unique jobid
   private String tipId;
   private int partition;                          // id within job
-  private TaskStatus.Phase phase ;                // current phase of the task
+  TaskStatus taskStatus; 										      // current status of the task
   private Path taskOutputPath;                    // task-specific output dir
   
   protected JobConf conf;
@@ -94,7 +94,9 @@ abstract class Task implements Writable, Configurable {
   // Constructors
   ////////////////////////////////////////////
 
-  public Task() {}
+  public Task() {
+    taskStatus = TaskStatus.createTaskStatus(isMapTask());
+  }
 
   public Task(String jobId, String jobFile, String tipId, 
               String taskId, int partition) {
@@ -103,6 +105,14 @@ abstract class Task implements Writable, Configurable {
     this.jobId = jobId;
     this.tipId = tipId; 
     this.partition = partition;
+    this.taskStatus = TaskStatus.createTaskStatus(isMapTask(), this.taskId, 
+                                                  0.0f, 
+                                                  TaskStatus.State.UNASSIGNED, 
+                                                  "", "", "", 
+                                                  isMapTask() ? 
+                                                    TaskStatus.Phase.MAP : 
+                                                    TaskStatus.Phase.SHUFFLE, 
+                                                  counters);
   }
 
   ////////////////////////////////////////////
@@ -135,14 +145,14 @@ abstract class Task implements Writable, Configurable {
    * @return
    */
   public synchronized TaskStatus.Phase getPhase(){
-    return this.phase; 
+    return this.taskStatus.getPhase(); 
   }
   /**
    * Set current phase of the task. 
    * @param p
    */
-  protected synchronized void setPhase(TaskStatus.Phase p){
-    this.phase = p; 
+  protected synchronized void setPhase(TaskStatus.Phase phase){
+    this.taskStatus.setPhase(phase); 
   }
 
   ////////////////////////////////////////////
@@ -160,6 +170,7 @@ abstract class Task implements Writable, Configurable {
     } else {
       Text.writeString(out, "");
     }
+    taskStatus.write(out);
   }
   public void readFields(DataInput in) throws IOException {
     jobFile = UTF8.readString(in);
@@ -173,6 +184,7 @@ abstract class Task implements Writable, Configurable {
     } else {
       taskOutputPath = null;
     }
+    taskStatus.readFields(in);
   }
 
   public String toString() { return taskId; }
@@ -276,8 +288,10 @@ abstract class Task implements Writable, Configurable {
               
               if (sendProgress) {
                 // we need to send progress update
-                taskFound = umbilical.progress(taskId, taskProgress.get(), 
-                    taskProgress.toString(), getPhase(), counters);
+                taskStatus.statusUpdate(taskProgress.get(), taskProgress.toString(), 
+                        counters);
+                taskFound = umbilical.statusUpdate(taskId, taskStatus);
+                taskStatus.clearStatus();
               }
               else {
                 // send ping 
@@ -351,18 +365,21 @@ abstract class Task implements Writable, Configurable {
       try {
         if (needProgress) {
           // send a final status report
+          taskStatus.statusUpdate(taskProgress.get(), taskProgress.toString(), 
+                                  counters);
           try {
-            if (!umbilical.progress(taskId, taskProgress.get(),
-                taskProgress.toString(), getPhase(), counters)) {
+            if (!umbilical.statusUpdate(getTaskId(), taskStatus)) {
               LOG.warn("Parent died.  Exiting "+taskId);
               System.exit(66);
             }
+            taskStatus.clearStatus();
             needProgress = false;
           } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();       // interrupt ourself
           }
         }
         umbilical.done(taskId);
+        LOG.info("Task '" + getTaskId() + "' done.");
         return;
       } catch (IOException ie) {
         LOG.warn("Failure signalling completion: " + 

+ 1 - 2
src/java/org/apache/hadoop/mapred/TaskInProgress.java

@@ -302,8 +302,7 @@ class TaskInProgress {
                !tasksReportedClosed.contains(taskid)){
       tasksReportedClosed.add(taskid);
       return true; 
-    }
-    else {
+    } else {
       return false;
     }
   }

+ 138 - 40
src/java/org/apache/hadoop/mapred/TaskStatus.java

@@ -20,7 +20,10 @@ package org.apache.hadoop.mapred;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.List;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.UTF8;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
@@ -29,7 +32,10 @@ import org.apache.hadoop.io.WritableUtils;
  * not intended to be a comprehensive piece of data.
  *
  **************************************************/
-class TaskStatus implements Writable {
+abstract class TaskStatus implements Writable, Cloneable {
+  static final Log LOG =
+    LogFactory.getLog(TaskStatus.class.getName());
+  
   //enumeration for reporting current phase of a task. 
   public static enum Phase{STARTING, MAP, SHUFFLE, SORT, REDUCE}
 
@@ -37,7 +43,6 @@ class TaskStatus implements Writable {
   public static enum State {RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED}
     
   private String taskid;
-  private boolean isMap;
   private float progress;
   private State runState;
   private String diagnosticInfo;
@@ -47,21 +52,16 @@ class TaskStatus implements Writable {
   private long startTime; 
   private long finishTime; 
     
-  // only for reduce tasks
-  private long shuffleFinishTime; 
-  private long sortFinishTime; 
-    
   private Phase phase = Phase.STARTING; 
   private Counters counters;
 
   public TaskStatus() {}
 
-  public TaskStatus(String taskid, boolean isMap, float progress,
+  public TaskStatus(String taskid, float progress,
                     State runState, String diagnosticInfo,
                     String stateString, String taskTracker,
                     Phase phase, Counters counters) {
     this.taskid = taskid;
-    this.isMap = isMap;
     this.progress = progress;
     this.runState = runState;
     this.diagnosticInfo = diagnosticInfo;
@@ -70,9 +70,9 @@ class TaskStatus implements Writable {
     this.phase = phase;
     this.counters = counters;
   }
-    
+  
   public String getTaskId() { return taskid; }
-  public boolean getIsMap() { return isMap; }
+  public abstract boolean getIsMap();
   public float getProgress() { return progress; }
   public void setProgress(float progress) { this.progress = progress; } 
   public State getRunState() { return runState; }
@@ -100,12 +100,6 @@ class TaskStatus implements Writable {
    * @param finishTime finish time of task.
    */
   void setFinishTime(long finishTime) {
-    if (shuffleFinishTime == 0) {
-      this.shuffleFinishTime = finishTime; 
-    }
-    if (sortFinishTime == 0){
-      this.sortFinishTime = finishTime;
-    }
     this.finishTime = finishTime;
   }
   /**
@@ -117,16 +111,14 @@ class TaskStatus implements Writable {
    * it returns approximate shuffle finish time.  
    */
   public long getShuffleFinishTime() {
-    return shuffleFinishTime;
+    return 0;
   }
 
   /**
    * Set shuffle finish time. 
    * @param shuffleFinishTime 
    */
-  void setShuffleFinishTime(long shuffleFinishTime) {
-    this.shuffleFinishTime = shuffleFinishTime;
-  }
+  void setShuffleFinishTime(long shuffleFinishTime) {}
 
   /**
    * Get sort finish time for the task,. If sort finish time was not set 
@@ -136,7 +128,7 @@ class TaskStatus implements Writable {
    * finish time if that is set, else it returns finish time. 
    */
   public long getSortFinishTime() {
-    return sortFinishTime;
+    return 0;
   }
 
   /**
@@ -144,12 +136,7 @@ class TaskStatus implements Writable {
    * then its set to sortFinishTime.  
    * @param sortFinishTime
    */
-  void setSortFinishTime(long sortFinishTime) {
-    this.sortFinishTime = sortFinishTime;
-    if (0 == this.shuffleFinishTime){
-      this.shuffleFinishTime = sortFinishTime;
-    }
-  }
+  void setSortFinishTime(long sortFinishTime) {}
 
   /**
    * Get start time of the task. 
@@ -176,10 +163,19 @@ class TaskStatus implements Writable {
   }
   /**
    * Set current phase of this task.  
-   * @param p
+   * @param phase phase of this task
    */
-  void setPhase(Phase p){
-    this.phase = p; 
+  void setPhase(Phase phase){
+    TaskStatus.Phase oldPhase = getPhase();
+    if (oldPhase != phase){
+      // sort phase started
+      if (phase == TaskStatus.Phase.SORT){
+        setShuffleFinishTime(System.currentTimeMillis());
+      }else if (phase == TaskStatus.Phase.REDUCE){
+        setSortFinishTime(System.currentTimeMillis());
+      }
+    }
+    this.phase = phase; 
   }
   /**
    * Get task's counters.
@@ -194,13 +190,81 @@ class TaskStatus implements Writable {
   public void setCounters(Counters counters) {
     this.counters = counters;
   }
+  
+  /**
+   * Get the list of maps from which output-fetches failed.
+   * 
+   * @return the list of maps from which output-fetches failed.
+   */
+  public List<String> getFetchFailedMaps() {
+    return null;
+  }
+  
+  /**
+   * Add to the list of maps from which output-fetches failed.
+   *  
+   * @param mapTaskId map from which fetch failed
+   */
+  synchronized void addFetchFailedMap(String mapTaskId) {}
+
+  /**
+   * Update the status of the task.
+   * 
+   * @param progress
+   * @param state
+   * @param phase
+   * @param counters
+   */
+  synchronized void statusUpdate(float progress, String state, 
+                                 Counters counters) {
+    setRunState(TaskStatus.State.RUNNING);
+    setProgress(progress);
+    setStateString(state);
+    setCounters(counters);
+  }
+  
+  /**
+   * Update the status of the task.
+   * 
+   * @param status updated status
+   */
+  synchronized void statusUpdate(TaskStatus status) {
+    this.progress = status.getProgress();
+    this.runState = status.getRunState();
+    this.diagnosticInfo = status.getDiagnosticInfo();
+    this.stateString = status.getStateString();
+      
+    if (status.getStartTime() != 0) {
+      this.startTime = status.getStartTime(); 
+    }
+    if (status.getFinishTime() != 0) {
+      this.finishTime = status.getFinishTime(); 
+    }
     
+    this.phase = status.getPhase();
+    this.counters = status.getCounters();
+  }
+  
+  /**
+   * Clear out transient information after sending out a status update
+   * to the {@link TaskTracker}.
+   */
+  synchronized void clearStatus() {}
+
+  public Object clone() {
+    try {
+      return super.clone();
+    } catch (CloneNotSupportedException cnse) {
+      // Shouldn't happen since we do implement Clonable
+      throw new InternalError(cnse.toString());
+    }
+  }
+  
   //////////////////////////////////////////////
   // Writable
   //////////////////////////////////////////////
   public void write(DataOutput out) throws IOException {
     UTF8.writeString(out, taskid);
-    out.writeBoolean(isMap);
     out.writeFloat(progress);
     WritableUtils.writeEnum(out, runState);
     UTF8.writeString(out, diagnosticInfo);
@@ -208,16 +272,11 @@ class TaskStatus implements Writable {
     WritableUtils.writeEnum(out, phase);
     out.writeLong(startTime);
     out.writeLong(finishTime);
-    if (!isMap){
-      out.writeLong(shuffleFinishTime);
-      out.writeLong(sortFinishTime);
-    }
     counters.write(out);
   }
 
   public void readFields(DataInput in) throws IOException {
     this.taskid = UTF8.readString(in);
-    this.isMap = in.readBoolean();
     this.progress = in.readFloat();
     this.runState = WritableUtils.readEnum(in, State.class);
     this.diagnosticInfo = UTF8.readString(in);
@@ -225,12 +284,51 @@ class TaskStatus implements Writable {
     this.phase = WritableUtils.readEnum(in, Phase.class); 
     this.startTime = in.readLong(); 
     this.finishTime = in.readLong(); 
-    if (!this.isMap){
-      shuffleFinishTime = in.readLong(); 
-      sortFinishTime = in.readLong(); 
-    }
     counters = new Counters();
     counters.readFields(in);
   }
+  
+  //////////////////////////////////////////////////////////////////////////////
+  // Factory-like methods to create/read/write appropriate TaskStatus objects
+  //////////////////////////////////////////////////////////////////////////////
+  
+  static TaskStatus createTaskStatus(DataInput in, String taskId, float progress,
+                                     State runState, String diagnosticInfo,
+                                     String stateString, String taskTracker,
+                                     Phase phase, Counters counters) 
+  throws IOException {
+    boolean isMap = in.readBoolean();
+    return createTaskStatus(isMap, taskId, progress, runState, diagnosticInfo, 
+                          stateString, taskTracker, phase, counters);
+  }
+  
+  static TaskStatus createTaskStatus(boolean isMap, String taskId, float progress,
+                                   State runState, String diagnosticInfo,
+                                   String stateString, String taskTracker,
+                                   Phase phase, Counters counters) { 
+    return (isMap) ? new MapTaskStatus(taskId, progress, runState, 
+                                       diagnosticInfo, stateString, taskTracker, 
+                                       phase, counters) :
+                     new ReduceTaskStatus(taskId, progress, runState, 
+                                          diagnosticInfo, stateString, 
+                                          taskTracker, phase, counters);
+  }
+  
+  static TaskStatus createTaskStatus(boolean isMap) {
+    return (isMap) ? new MapTaskStatus() : new ReduceTaskStatus();
+  }
+
+  static TaskStatus readTaskStatus(DataInput in) throws IOException {
+    boolean isMap = in.readBoolean();
+    TaskStatus taskStatus = createTaskStatus(isMap);
+    taskStatus.readFields(in);
+    return taskStatus;
+  }
+  
+  static void writeTaskStatus(DataOutput out, TaskStatus taskStatus) 
+  throws IOException {
+    out.writeBoolean(taskStatus.getIsMap());
+    taskStatus.write(out);
+  }
 }
 

+ 77 - 72
src/java/org/apache/hadoop/mapred/TaskTracker.java

@@ -669,7 +669,7 @@ public class TaskTracker
         tip.setJobConf(jobConf);
         tip.launchTask();
       } catch (Throwable ie) {
-        tip.runstate = TaskStatus.State.FAILED;
+        tip.taskStatus.setRunState(TaskStatus.State.FAILED);
         try {
           tip.cleanup();
         } catch (Throwable ie2) {
@@ -906,15 +906,9 @@ public class TaskTracker
     //
     if (status == null) {
       synchronized (this) {
-        List<TaskStatus> taskReports = 
-          new ArrayList<TaskStatus>(runningTasks.size());
-        for (TaskInProgress tip: runningTasks.values()) {
-          taskReports.add(tip.createStatus());
-        }
-        status = 
-          new TaskTrackerStatus(taskTrackerName, localHostname, 
-                                httpPort, taskReports, 
-                                failures); 
+        status = new TaskTrackerStatus(taskTrackerName, localHostname, 
+                                       httpPort, cloneAndResetRunningTaskStatuses(), 
+                                       failures); 
       }
     } else {
       LOG.info("Resending 'status' to '" + jobTrackAddr.getHostName() +
@@ -963,6 +957,12 @@ public class TaskTracker
           runningTasks.remove(taskStatus.getTaskId());
         }
       }
+      
+      // Clear transient status information which should only
+      // be sent once to the JobTracker
+      for (TaskInProgress tip: runningTasks.values()) {
+        tip.getStatus().clearStatus();
+      }
     }
 
     // Force a rebuild of 'status' on the next iteration
@@ -1265,8 +1265,6 @@ public class TaskTracker
   ///////////////////////////////////////////////////////
   class TaskInProgress {
     Task task;
-    float progress;
-    volatile TaskStatus.State runstate;
     long lastProgressReport;
     StringBuffer diagnosticInfo = new StringBuffer();
     private TaskRunner runner;
@@ -1283,19 +1281,18 @@ public class TaskTracker
      */
     public TaskInProgress(Task task, JobConf conf) {
       this.task = task;
-      this.progress = 0.0f;
-      this.runstate = TaskStatus.State.UNASSIGNED;
       this.lastProgressReport = System.currentTimeMillis();
       this.defaultJobConf = conf;
       localJobConf = null;
-      taskStatus = new TaskStatus(task.getTaskId(), 
-                                  task.isMapTask(),
-                                  progress, runstate, 
-                                  diagnosticInfo.toString(), 
-                                  "initializing",  
-                                  getName(), task.isMapTask()? TaskStatus.Phase.MAP:
-                                  TaskStatus.Phase.SHUFFLE,
-                                  task.getCounters()); 
+      taskStatus = TaskStatus.createTaskStatus(task.isMapTask(), task.getTaskId(), 
+                                               0.0f, 
+                                               TaskStatus.State.UNASSIGNED, 
+                                               diagnosticInfo.toString(), 
+                                               "initializing",  
+                                               getName(), 
+                                               task.isMapTask()? TaskStatus.Phase.MAP:
+                                               TaskStatus.Phase.SHUFFLE,
+                                               task.getCounters()); 
       taskTimeout = (10 * 60 * 1000);
     }
         
@@ -1350,14 +1347,12 @@ public class TaskTracker
         
     /**
      */
-    public synchronized TaskStatus createStatus() {
-      taskStatus.setProgress(progress);
-      taskStatus.setRunState(runstate);
+    public synchronized TaskStatus getStatus() {
       taskStatus.setDiagnosticInfo(diagnosticInfo.toString());
-          
       if (diagnosticInfo.length() > 0) {
         diagnosticInfo = new StringBuffer();
       }
+      
       return taskStatus;
     }
 
@@ -1366,7 +1361,7 @@ public class TaskTracker
      */
     public synchronized void launchTask() throws IOException {
       localizeTask(task);
-      this.runstate = TaskStatus.State.RUNNING;
+      this.taskStatus.setRunState(TaskStatus.State.RUNNING);
       this.runner = task.createRunner(TaskTracker.this);
       this.runner.start();
       this.taskStatus.setStartTime(System.currentTimeMillis());
@@ -1375,31 +1370,18 @@ public class TaskTracker
     /**
      * The task is reporting its progress
      */
-    public synchronized void reportProgress(float p, String state, 
-                                            TaskStatus.Phase newPhase,
-                                            Counters counters) 
+    public synchronized void reportProgress(TaskStatus taskStatus) 
     {
       if (this.done) {
         //make sure we ignore progress messages after a task has 
         //invoked TaskUmbilicalProtocol.done()
         return;
       }
-      LOG.info(task.getTaskId()+" "+p+"% "+state);
-      this.progress = p;
-      this.runstate = TaskStatus.State.RUNNING;
+      
+      LOG.info(task.getTaskId() + " " + taskStatus.getProgress() + 
+               "% " + taskStatus.getStateString());
+      this.taskStatus.statusUpdate(taskStatus);
       this.lastProgressReport = System.currentTimeMillis();
-      TaskStatus.Phase oldPhase = taskStatus.getPhase();
-      if (oldPhase != newPhase){
-        // sort phase started
-        if (newPhase == TaskStatus.Phase.SORT){
-          this.taskStatus.setShuffleFinishTime(System.currentTimeMillis());
-        }else if (newPhase == TaskStatus.Phase.REDUCE){
-          this.taskStatus.setSortFinishTime(System.currentTimeMillis());
-        }
-        this.taskStatus.setPhase(newPhase);
-      }
-      this.taskStatus.setStateString(state);
-      this.taskStatus.setCounters(counters);
     }
 
     /**
@@ -1411,7 +1393,7 @@ public class TaskTracker
     /**
      */
     public TaskStatus.State getRunState() {
-      return runstate;
+      return taskStatus.getRunState();
     }
 
     /**
@@ -1434,10 +1416,12 @@ public class TaskTracker
      * The task is reporting that it's done running
      */
     public synchronized void reportDone() {
-      LOG.info("Task " + task.getTaskId() + " is done.");
-      this.progress = 1.0f;
+      this.taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
+      this.taskStatus.setProgress(1.0f);
       this.taskStatus.setFinishTime(System.currentTimeMillis());
       this.done = true;
+      
+      LOG.info("Task " + task.getTaskId() + " is done.");
     }
 
     /**
@@ -1464,19 +1448,19 @@ public class TaskTracker
       boolean needCleanup = false;
       synchronized (this) {
         if (done) {
-          runstate = TaskStatus.State.SUCCEEDED;
+          taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
         } else {
           if (!wasKilled) {
             failures += 1;
-            runstate = TaskStatus.State.FAILED;
+            taskStatus.setRunState(TaskStatus.State.FAILED);
           } else {
-            runstate = TaskStatus.State.KILLED;
+            taskStatus.setRunState(TaskStatus.State.KILLED);
           }
-          progress = 0.0f;
+          taskStatus.setProgress(0.0f);
         }
         this.taskStatus.setFinishTime(System.currentTimeMillis());
-        needCleanup = (runstate == TaskStatus.State.FAILED) |
-          (runstate == TaskStatus.State.KILLED);
+        needCleanup = (taskStatus.getRunState() == TaskStatus.State.FAILED || 
+                       taskStatus.getRunState() == TaskStatus.State.KILLED);
       }
 
       //
@@ -1517,20 +1501,21 @@ public class TaskTracker
      * @param wasFailure was it a failure (versus a kill request)?
      */
     public synchronized void kill(boolean wasFailure) throws IOException {
-      if (runstate == TaskStatus.State.RUNNING) {
+      if (taskStatus.getRunState() == TaskStatus.State.RUNNING) {
         wasKilled = true;
         if (wasFailure) {
           failures += 1;
         }
         runner.kill();
-        runstate = 
-          (wasFailure) ? TaskStatus.State.FAILED : TaskStatus.State.KILLED;
-      } else if (runstate == TaskStatus.State.UNASSIGNED) {
+        taskStatus.setRunState((wasFailure) ? 
+                                  TaskStatus.State.FAILED : 
+                                  TaskStatus.State.KILLED);
+      } else if (taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
         if (wasFailure) {
           failures += 1;
-          runstate = TaskStatus.State.FAILED;
+          taskStatus.setRunState(TaskStatus.State.FAILED);
         } else {
-          runstate = TaskStatus.State.KILLED;
+          taskStatus.setRunState(TaskStatus.State.KILLED);
         }
       }
     }
@@ -1540,10 +1525,11 @@ public class TaskTracker
      */
     private synchronized void mapOutputLost(String failure
                                            ) throws IOException {
-      if (runstate == TaskStatus.State.SUCCEEDED) {
+      if (taskStatus.getRunState() == TaskStatus.State.SUCCEEDED) {
+        // change status to failure
         LOG.info("Reporting output lost:"+task.getTaskId());
-        runstate = TaskStatus.State.FAILED;    // change status to failure
-        progress = 0.0f;
+        taskStatus.setRunState(TaskStatus.State.FAILED);
+        taskStatus.setProgress(0.0f);
         reportDiagnosticInfo("Map output lost, rescheduling: " + 
                              failure);
         runningTasks.put(task.getTaskId(), this);
@@ -1567,7 +1553,7 @@ public class TaskTracker
       synchronized (TaskTracker.this) {
         tasks.remove(taskId);
         if (alwaysKeepTaskFiles ||
-            (runstate == TaskStatus.State.FAILED && 
+            (taskStatus.getRunState() == TaskStatus.State.FAILED && 
              keepFailedTaskFiles)) {
           return;
         }
@@ -1618,14 +1604,12 @@ public class TaskTracker
   /**
    * Called periodically to report Task progress, from 0.0 to 1.0.
    */
-  public synchronized boolean progress(String taskid, float progress, 
-                                    String state, 
-                                    TaskStatus.Phase phase,
-                                    Counters counters
-                                    ) throws IOException {
+  public synchronized boolean statusUpdate(String taskid, 
+                                              TaskStatus taskStatus) 
+  throws IOException {
     TaskInProgress tip = (TaskInProgress) tasks.get(taskid);
     if (tip != null) {
-      tip.reportProgress(progress, state, phase, counters);
+      tip.reportProgress(taskStatus);
       return true;
     } else {
       LOG.warn("Progress from unknown child task: "+taskid);
@@ -1663,6 +1647,18 @@ public class TaskTracker
     }
   }
 
+
+  /** 
+   * A reduce-task failed to shuffle the map-outputs. Kill the task.
+   */  
+  public synchronized void shuffleError(String taskId, String message) 
+  throws IOException { 
+    LOG.fatal("Task: " + taskId + " - Killed due to Shuffle Failure: " + message);
+    TaskInProgress tip = runningTasks.get(taskId);
+    tip.reportDiagnosticInfo("Shuffle Error: " + message);
+    purgeTask(tip, true);
+  }
+
   /** 
    * A child task had a local filesystem error. Kill the task.
    */  
@@ -1826,6 +1822,15 @@ public class TaskTracker
     return taskTrackerName;
   }
     
+  private synchronized List<TaskStatus> cloneAndResetRunningTaskStatuses() {
+    List<TaskStatus> result = new ArrayList<TaskStatus>(runningTasks.size());
+    for(TaskInProgress tip: runningTasks.values()) {
+      TaskStatus status = tip.getStatus();
+      result.add((TaskStatus)status.clone());
+      status.clearStatus();
+    }
+    return result;
+  }
   /**
    * Get the list of tasks that will be reported back to the 
    * job tracker in the next heartbeat cycle.
@@ -1834,7 +1839,7 @@ public class TaskTracker
   synchronized List<TaskStatus> getRunningTaskStatuses() {
     List<TaskStatus> result = new ArrayList<TaskStatus>(runningTasks.size());
     for(TaskInProgress tip: runningTasks.values()) {
-      result.add(tip.createStatus());
+      result.add(tip.getStatus());
     }
     return result;
   }
@@ -1847,7 +1852,7 @@ public class TaskTracker
     List<TaskStatus> result = new ArrayList<TaskStatus>(tasks.size());
     for(Map.Entry<String, TaskInProgress> task: tasks.entrySet()) {
       if (!runningTasks.containsKey(task.getKey())) {
-        result.add(task.getValue().createStatus());
+        result.add(task.getValue().getStatus());
       }
     }
     return result;

+ 5 - 10
src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java

@@ -156,29 +156,24 @@ class TaskTrackerStatus implements Writable {
     UTF8.writeString(out, trackerName);
     UTF8.writeString(out, host);
     out.writeInt(httpPort);
+    out.writeInt(failures);
 
     out.writeInt(taskReports.size());
-    out.writeInt(failures);
-    for (Iterator it = taskReports.iterator(); it.hasNext();) {
-      ((TaskStatus) it.next()).write(out);
+    for (TaskStatus taskStatus : taskReports) {
+      TaskStatus.writeTaskStatus(out, taskStatus);
     }
   }
 
-  /**
-   */     
   public void readFields(DataInput in) throws IOException {
     this.trackerName = UTF8.readString(in);
     this.host = UTF8.readString(in);
     this.httpPort = in.readInt();
+    this.failures = in.readInt();
 
     taskReports.clear();
-
     int numTasks = in.readInt();
-    this.failures = in.readInt();
     for (int i = 0; i < numTasks; i++) {
-      TaskStatus tmp = new TaskStatus();
-      tmp.readFields(in);
-      taskReports.add(tmp);
+      taskReports.add(TaskStatus.readTaskStatus(in));
     }
   }
 }

+ 20 - 12
src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java

@@ -29,26 +29,31 @@ import org.apache.hadoop.ipc.VersionedProtocol;
  * and parent is via this protocol. */ 
 interface TaskUmbilicalProtocol extends VersionedProtocol {
 
-  /** Changed the version to 2, since we have a new method getMapOutputs 
+  /** 
+   * Changed the version to 2, since we have a new method getMapOutputs 
    * Changed version to 3 to have progress() return a boolean
+   * Changed the version to 4, since we have replaced 
+   *         TaskUmbilicalProtocol.progress(String, float, String, 
+   *         org.apache.hadoop.mapred.TaskStatus.Phase, Counters) 
+   *         with {@link #statusUpdate(String, TaskStatus)}
    * */
-  public static final long versionID = 3L;
+  public static final long versionID = 4L;
   
   /** Called when a child task process starts, to get its task.*/
   Task getTask(String taskid) throws IOException;
 
-  /** Report child's progress to parent.
-   * @param taskid the id of the task
-   * @param progress value between zero and one
-   * @param state description of task's current state
-   * @param phase current phase of the task.
-   * @param counters the counters for this task.
+  /**
+   * Report child's progress to parent.
+   * 
+   * @param taskId task-id of the child
+   * @param taskStatus status of the child
+   * @throws IOException
+   * @throws InterruptedException
    * @return True if the task is known
    */
-  boolean progress(String taskid, float progress, String state, 
-                TaskStatus.Phase phase, Counters counters)
-    throws IOException, InterruptedException;
-
+  boolean statusUpdate(String taskId, TaskStatus taskStatus) 
+  throws IOException, InterruptedException;
+  
   /** Report error messages back to parent.  Calls should be sparing, since all
    *  such messages are held in the job tracker.
    *  @param taskid the id of the task involved
@@ -65,6 +70,9 @@ interface TaskUmbilicalProtocol extends VersionedProtocol {
    * the task process exits without calling this. */
   void done(String taskid) throws IOException;
 
+  /** Report that a reduce-task couldn't shuffle map-outputs.*/
+  void shuffleError(String taskId, String message) throws IOException;
+  
   /** Report that the task encounted a local filesystem error.*/
   void fsError(String taskId, String message) throws IOException;