Bladeren bron

HADOOP-427. New bug number for this patch.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@429858 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 19 jaren geleden
bovenliggende
commit
3fae4c4023

+ 1 - 1
CHANGES.txt

@@ -3,7 +3,7 @@ Hadoop Change Log
 
 
 Trunk (unreleased changes)
 Trunk (unreleased changes)
 
 
- 1. HADOOP-415.  Replace some uses of DatanodeDescriptor in the DFS
+ 1. HADOOP-427.  Replace some uses of DatanodeDescriptor in the DFS
     web UI code with DatanodeInfo, the preferred public class.
     web UI code with DatanodeInfo, the preferred public class.
     (Devaraj Das via cutting)
     (Devaraj Das via cutting)
 
 

+ 7 - 3
src/java/org/apache/hadoop/mapred/JobInProgress.java

@@ -280,7 +280,7 @@ class JobInProgress {
           } else if (status.getRunState() == TaskStatus.FAILED) {
           } else if (status.getRunState() == TaskStatus.FAILED) {
             // Tell the job to fail the relevant task
             // Tell the job to fail the relevant task
             failedTask(tip, status.getTaskId(), status, status.getTaskTracker(),
             failedTask(tip, status.getTaskId(), status, status.getTaskTracker(),
-                       wasRunning, wasComplete);
+                       wasRunning, wasComplete, metrics);
           }          
           }          
         }
         }
 
 
@@ -520,7 +520,7 @@ class JobInProgress {
                 }
                 }
             }
             }
         }
         }
-
+        
         //
         //
         // If all tasks are complete, then the job is done!
         // If all tasks are complete, then the job is done!
         //
         //
@@ -571,7 +571,8 @@ class JobInProgress {
      */
      */
     private void failedTask(TaskInProgress tip, String taskid, 
     private void failedTask(TaskInProgress tip, String taskid, 
                             TaskStatus status, String trackerName,
                             TaskStatus status, String trackerName,
-                            boolean wasRunning, boolean wasComplete) {
+                            boolean wasRunning, boolean wasComplete,
+                            JobTrackerMetrics metrics) {
         tip.failedSubTask(taskid, trackerName);
         tip.failedSubTask(taskid, trackerName);
         boolean isRunning = tip.isRunning();
         boolean isRunning = tip.isRunning();
         boolean isComplete = tip.isComplete();
         boolean isComplete = tip.isComplete();
@@ -596,8 +597,10 @@ class JobInProgress {
         // the failed task goes to the end of the list.
         // the failed task goes to the end of the list.
         if (tip.isMapTask()) {
         if (tip.isMapTask()) {
           firstMapToTry = (tip.getIdWithinJob() + 1) % maps.length;
           firstMapToTry = (tip.getIdWithinJob() + 1) % maps.length;
+          metrics.failedMap();
         } else {
         } else {
           firstReduceToTry = (tip.getIdWithinJob() + 1) % reduces.length;
           firstReduceToTry = (tip.getIdWithinJob() + 1) % reduces.length;
+          metrics.failedReduce();
         }
         }
             
             
         //
         //
@@ -695,4 +698,5 @@ class JobInProgress {
        }
        }
        return null;
        return null;
     }
     }
+    
 }
 }

+ 123 - 1
src/java/org/apache/hadoop/mapred/JobTracker.java

@@ -219,6 +219,20 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
                             // tracker has already been destroyed.
                             // tracker has already been destroyed.
                             if (newProfile != null) {
                             if (newProfile != null) {
                                 if (now - newProfile.getLastSeen() > TASKTRACKER_EXPIRY_INTERVAL) {
                                 if (now - newProfile.getLastSeen() > TASKTRACKER_EXPIRY_INTERVAL) {
+                                    // Before removing the tracker, save its
+                                    // state so that, if we hear from the same
+                                    // TaskTracker again later, we can
+                                    // reinstate that state
+                                    ExpiredTaskTrackerState 
+                                        expTaskTrackerState = 
+                                        new ExpiredTaskTrackerState(
+                                           leastRecent.getTrackerName());
+                                    if (LOG.isDebugEnabled())
+                                      LOG.debug("Saving state of TaskTracker " +
+                                             leastRecent.getTrackerName());
+                                    expiredTaskTrackerStates.put(
+                                            leastRecent.getTrackerName(), 
+                                            expTaskTrackerState);
                                     // Remove completely
                                     // Remove completely
                                     updateTaskTrackerStatus(trackerName, null);
                                     updateTaskTrackerStatus(trackerName, null);
                                     lostTaskTracker(leastRecent.getTrackerName(),
                                     lostTaskTracker(leastRecent.getTrackerName(),
@@ -347,6 +361,11 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
         Metrics.report(metricsRecord, "maps-completed",
         Metrics.report(metricsRecord, "maps-completed",
             ++numMapTasksCompleted);
             ++numMapTasksCompleted);
       }
       }
+
+      /**
+       * Takes one map task back out of the completed-maps counter and
+       * reports the decremented value under "maps-completed".
+       */
+      synchronized void failedMap() {
+        numMapTasksCompleted -= 1;
+        Metrics.report(metricsRecord, "maps-completed", numMapTasksCompleted);
+      }
       
       
       synchronized void launchReduce() {
       synchronized void launchReduce() {
         Metrics.report(metricsRecord, "reduces-launched",
         Metrics.report(metricsRecord, "reduces-launched",
@@ -357,6 +376,11 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
         Metrics.report(metricsRecord, "reduces-completed",
         Metrics.report(metricsRecord, "reduces-completed",
             ++numReduceTasksCompleted);
             ++numReduceTasksCompleted);
       }
       }
+
+      /**
+       * Takes one reduce task back out of the completed-reduces counter and
+       * reports the decremented value under "reduces-completed".
+       */
+      synchronized void failedReduce() {
+        numReduceTasksCompleted -= 1;
+        Metrics.report(metricsRecord, "reduces-completed",
+            numReduceTasksCompleted);
+      }
       
       
       synchronized void submitJob() {
       synchronized void submitJob() {
         Metrics.report(metricsRecord, "jobs-submitted",
         Metrics.report(metricsRecord, "jobs-submitted",
@@ -427,6 +451,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
     Thread initJobsThread = null;
     Thread initJobsThread = null;
     ExpireLaunchingTasks expireLaunchingTasks = new ExpireLaunchingTasks();
     ExpireLaunchingTasks expireLaunchingTasks = new ExpireLaunchingTasks();
     Thread expireLaunchingTaskThread = new Thread(expireLaunchingTasks);
     Thread expireLaunchingTaskThread = new Thread(expireLaunchingTasks);
+    private TreeMap expiredTaskTrackerStates = new TreeMap();
     
     
     /**
     /**
      * It might seem like a bug to maintain a TreeSet of status objects,
      * It might seem like a bug to maintain a TreeSet of status objects,
@@ -599,6 +624,36 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
         LOG.info("stopped all jobtracker services");
         LOG.info("stopped all jobtracker services");
         return;
         return;
     }
     }
+
+    /**
+     * Tries to restore the bookkeeping that was saved for a TaskTracker when
+     * it expired. Task entries are recreated for the saved tasks whose job is
+     * still running, and every saved completed-task status is replayed into
+     * its job (again only while that job is running).
+     *
+     * @param trackerName name of the TaskTracker that has reported in again
+     * @return false if no state was saved under trackerName, true otherwise
+     */
+    boolean reinstateStateOfTaskTracker(String trackerName) {
+      if (LOG.isDebugEnabled())
+        LOG.debug("Going to reinstate state of tasktracker " + trackerName);
+      ExpiredTaskTrackerState e = (ExpiredTaskTrackerState)
+                                   expiredTaskTrackerStates.get(trackerName);
+      if (e == null) return false;
+      // NOTE(review): the saved state stays in expiredTaskTrackerStates after
+      // it has been reinstated -- confirm whether it should be removed here.
+      Set taskset = e.getTaskSet();
+      if (taskset == null) return true;
+      for (Iterator it = taskset.iterator(); it.hasNext(); ) {
+        String taskId = (String) it.next();
+        TaskInProgress tip = e.getTIP(taskId);
+        if (LOG.isDebugEnabled())
+          LOG.debug("Going to recreate task entry for task " + taskId);
+        //check whether the job is still running
+        if (tip != null && 
+                tip.getJob().getStatus().getRunState() == JobStatus.RUNNING)
+          createTaskEntry(taskId, trackerName, tip);
+      }
+      ArrayList completedTasks = e.getCompletedTasks();
+      for (int i = 0; i < completedTasks.size(); i++) {
+        TaskStatus ts = (TaskStatus)completedTasks.get(i);
+        //a null entry means the status could not be cloned when the state
+        //was saved; there is nothing to replay for it
+        if (ts == null) continue;
+        TaskInProgress tip = (TaskInProgress)taskidToTIPMap.get(ts.getTaskId());
+        if (tip == null) continue;
+        JobInProgress j = tip.getJob();
+        if (j != null && j.getStatus().getRunState() == JobStatus.RUNNING)
+          j.updateTaskStatus(tip, ts, myMetrics); 
+      }
+      return true;
+    }
     
     
     ///////////////////////////////////////////////////////
     ///////////////////////////////////////////////////////
     // Maintain lookup tables; called by JobInProgress
     // Maintain lookup tables; called by JobInProgress
@@ -748,7 +803,11 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
                 } else {
                 } else {
                     // If not first contact, there should be some record of the tracker
                     // If not first contact, there should be some record of the tracker
                     if (!seenBefore) {
                     if (!seenBefore) {
-                        return InterTrackerProtocol.UNKNOWN_TASKTRACKER;
+                        if (!reinstateStateOfTaskTracker(trackerName))
+                          return InterTrackerProtocol.UNKNOWN_TASKTRACKER;
+                        else 
+                          trackerExpiryQueue.add(trackerStatus);
+                          
                     }
                     }
                 }
                 }
 
 
@@ -1197,4 +1256,67 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
         Configuration conf=new Configuration();
         Configuration conf=new Configuration();
         startTracker(conf);
         startTracker(conf);
     }
     }
+
+    /**
+     * Snapshot of the tasks that were assigned to a TaskTracker, taken when
+     * that tracker is expired, so the state can be reinstated if the same
+     * tracker is heard from again.
+     */
+    private class ExpiredTaskTrackerState {
+      //Map from taskId (assigned to a given tasktracker) to the taskId's TIP
+      private TreeMap trackerTaskIdToTIPMap = new TreeMap();
+      //completedTasks is an array list that contains the list of tasks that a
+      //tasktracker successfully completed
+      ArrayList completedTasks = new ArrayList();
+
+      /**
+       * Records every task of the given tracker whose job is still running,
+       * plus a cloned status for each of those tasks that is complete.
+       *
+       * @param trackerId key into trackerToTaskMap for the expiring tracker
+       */
+      public ExpiredTaskTrackerState(String trackerId) {
+        TreeSet tasks = (TreeSet) trackerToTaskMap.get(trackerId);
+        if (tasks == null) {
+          if (LOG.isDebugEnabled())
+            LOG.debug("This tasktracker has no tasks");
+          return;
+        }
+        if (LOG.isDebugEnabled())
+          LOG.debug("Task IDs that this tasktracker has: ");
+        //We save the status of completed tasks only since TaskTrackers don't
+        //send updates about completed tasks. We don't need to save the status
+        //of other tasks since the TaskTracker will send the update along
+        //with the heartbeat (whenever that happens).
+        //Saving the status of completed tasks is required since the JobTracker
+        //will mark all tasks that belonged to a given TaskTracker as failed
+        //if that TaskTracker is lost. Now, if that same TaskTracker reports 
+        //in later on, we can simply re-mark the completed tasks (TIPs really) 
+        //it reported earlier about as "completed" and avoid unnecessary 
+        //re-run of those tasks.
+        for (Iterator it = tasks.iterator(); it.hasNext(); ) {
+          String taskId = (String) it.next();
+          if (LOG.isDebugEnabled())
+            LOG.debug(taskId);
+          TaskInProgress tip = (TaskInProgress) taskidToTIPMap.get(taskId);
+          if (tip !=null && 
+                  tip.getJob().getStatus().getRunState() == JobStatus.RUNNING)
+            trackerTaskIdToTIPMap.put(taskId, tip);
+          else continue;
+          TaskStatus ts = tip.getTaskStatus(taskId);
+          //ts could be null for a recently assigned task, in the case where,
+          //the tasktracker hasn't yet reported status about that task
+          if (ts == null) continue;
+          if (tip.isComplete()) {
+            try {
+              //add only a successfully cloned status, so that the list never
+              //holds a null entry for readers to trip over
+              completedTasks.add((TaskStatus)ts.clone());
+            } catch (Exception e) {
+              LOG.fatal("Could not save TaskTracker state",e);
+            }
+          }
+        } 
+      }
+
+      /** Returns the ids of the tasks recorded for the tracker. */
+      public Set getTaskSet() {
+        return trackerTaskIdToTIPMap.keySet();
+      }
+
+      /** Returns the TIP recorded for taskId, or null if none was recorded. */
+      public TaskInProgress getTIP(String taskId) {
+        return (TaskInProgress)trackerTaskIdToTIPMap.get(taskId);
+      }
+
+      /** Returns the saved statuses of the tasks that were complete. */
+      public ArrayList getCompletedTasks() {
+        return completedTasks;
+      }
+    }
 }
 }

+ 13 - 0
src/java/org/apache/hadoop/mapred/TaskInProgress.java

@@ -283,6 +283,12 @@ class TaskInProgress {
         
         
         taskStatuses.put(taskid, status);
         taskStatuses.put(taskid, status);
 
 
+        //This task may have been declared failed because its tasktracker was
+        //lost; if that same tasktracker now reports in with this taskId as
+        //running, we add it back to recentTasks
+        if (status.getRunState() == TaskStatus.RUNNING)
+          recentTasks.add(taskid);
+
         // Recompute progress
         // Recompute progress
         recomputeProgress();
         recomputeProgress();
         return changed;
         return changed;
@@ -470,4 +476,11 @@ class TaskInProgress {
     public int getIdWithinJob() {
     public int getIdWithinJob() {
       return partition;
       return partition;
     }
     }
+
+    /**
+     * Looks up the status stored in taskStatuses for one task id.
+     *
+     * @param taskId the id to look up
+     * @return whatever taskStatuses holds for that id, cast to TaskStatus
+     */
+    public TaskStatus getTaskStatus(String taskId) {
+      Object stored = taskStatuses.get(taskId);
+      return (TaskStatus) stored;
+    }
 }
 }

+ 11 - 1
src/java/org/apache/hadoop/mapred/TaskStatus.java

@@ -25,7 +25,7 @@ import java.io.*;
  *
  *
  * @author Mike Cafarella
  * @author Mike Cafarella
  **************************************************/
  **************************************************/
-class TaskStatus implements Writable {
+class TaskStatus implements Writable, Cloneable {
     public static final int RUNNING = 0;
     public static final int RUNNING = 0;
     public static final int SUCCEEDED = 1;
     public static final int SUCCEEDED = 1;
     public static final int FAILED = 2;
     public static final int FAILED = 2;
@@ -53,6 +53,16 @@ class TaskStatus implements Writable {
         this.taskTracker = taskTracker;
         this.taskTracker = taskTracker;
     }
     }
     
     
+    /**
+     * Returns a copy of this status, so that the status of a task can be
+     * saved independently of later updates to this object.
+     *
+     * @return a field-by-field copy produced by Object.clone()
+     * @throws CloneNotSupportedException as declared by Object.clone()
+     */
+    public Object clone() throws CloneNotSupportedException {
+        // Strings are immutable, so the shallow copy can safely share
+        // diagnosticInfo and stateString with the original; no explicit
+        // new String(...) copies are needed.
+        return super.clone();
+    }
+    
     public String getTaskId() { return taskid; }
     public String getTaskId() { return taskid; }
     public boolean getIsMap() { return isMap; }
     public boolean getIsMap() { return isMap; }
     public float getProgress() { return progress; }
     public float getProgress() { return progress; }