浏览代码

HADOOP-185. Fix so that, if a task tracker times out making the RPC asking for a new task to run, the job tracker does not think that it is actually running the task returned (but never received). Contributed by Owen.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@399065 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 19 年之前
父节点
当前提交
c9dbe05377
共有 2 个文件被更改,包括 108 次插入10 次删除
  1. 4 0
      CHANGES.txt
  2. 104 10
      src/java/org/apache/hadoop/mapred/JobTracker.java

+ 4 - 0
CHANGES.txt

@@ -151,6 +151,10 @@ Trunk (unreleased)
     files containing random data.  The second sorts the output of the
     files containing random data.  The second sorts the output of the
     first.  (omalley via cutting)
     first.  (omalley via cutting)
 
 
+40. HADOOP-185.  Fix so that, when a task tracker times out making the
+    RPC asking for a new task to run, the job tracker does not think
+    that it is actually running the task returned.  (omalley via cutting)
+
 Release 0.1.1 - 2006-04-08
 Release 0.1.1 - 2006-04-08
 
 
  1. Added CHANGES.txt, logging all significant changes to Hadoop.  (cutting)
  1. Added CHANGES.txt, logging all significant changes to Hadoop.  (cutting)

+ 104 - 10
src/java/org/apache/hadoop/mapred/JobTracker.java

@@ -76,6 +76,93 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
         return tracker;
         return tracker;
     }
     }
 
 
+    /**
+     * A background sweeper that times out tasks which have been assigned to
+     * a task tracker but have not yet shown up in any status report.
+     *
+     * <p>A task is registered with {@link #addNewTask(String)} when it is
+     * assigned and unregistered with {@link #removeTask(String)} once it is
+     * seen in a status report.  Any task that stays registered for longer
+     * than TASKTRACKER_EXPIRY_INTERVAL ms is marked FAILED and reported to
+     * its job through failedTask().
+     *
+     * <p>Locking: the monitor on launchingTasks is held only for short map
+     * operations and is never held while the JobTracker or TaskInProgress
+     * monitors are acquired.  NOTE(review): addNewTask()/removeTask() appear
+     * to be called by code that already holds the JobTracker monitor, so
+     * taking that monitor while holding launchingTasks could deadlock --
+     * confirm against the callers.
+     *
+     * <p>Note that a stop() method is included, even though there is no
+     * place where JobTrackers are cleaned up.
+     * @author Owen O'Malley
+     */
+    private class ExpireLaunchingTasks implements Runnable {
+      /** Cleared by stop(); checked once per sweep cycle. */
+      private volatile boolean shouldRun = true;
+      /**
+       * This is a map of the tasks that have been assigned to task trackers,
+       * but that have not yet been seen in a status report.
+       * map: task-id (String) -> time-assigned (Long)
+       * The map keeps insertion order, so the oldest assignment is first.
+       */
+      private Map launchingTasks = new LinkedHashMap();
+      private static final String errorMsg = "Error launching task";
+      private static final String errorHost = "n/a";
+      
+      /**
+       * Sleeps for a third of the tracker expiry interval, then fails every
+       * task that has been waiting longer than the full interval.  Repeats
+       * until stop() is called or the thread is interrupted.
+       */
+      public void run() {
+        try {
+          while (shouldRun) {
+            // Sweep three times per expiry interval for overdue tasks.
+            Thread.sleep(TASKTRACKER_EXPIRY_INTERVAL/3);
+            long now = System.currentTimeMillis();
+            LOG.fine("Starting launching task sweep");
+            // Take expired tasks out of the map one at a time, so that the
+            // map's monitor is released before any other lock is acquired.
+            String taskId = pollExpiredTask(now);
+            while (taskId != null) {
+              failLaunchingTask(taskId);
+              taskId = pollExpiredTask(now);
+            }
+          }
+        } catch (InterruptedException ie) {
+          // Interrupted while sleeping: stop sweeping, but keep the
+          // interrupt visible to whoever owns this thread.
+          Thread.currentThread().interrupt();
+        }
+      }
+      
+      /**
+       * Removes and returns the oldest registered task if it is overdue.
+       * @param now the time of the current sweep, in ms
+       * @return the id of an expired task, or null if the oldest task (and
+       *         therefore every task) is still within the expiry interval
+       */
+      private String pollExpiredTask(long now) {
+        synchronized (launchingTasks) {
+          Iterator itr = launchingTasks.entrySet().iterator();
+          if (!itr.hasNext()) {
+            return null;
+          }
+          Map.Entry pair = (Map.Entry) itr.next();
+          String taskId = (String) pair.getKey();
+          long age = now - ((Long) pair.getValue()).longValue();
+          LOG.fine(taskId + " is " + age + " ms old.");
+          if (age <= TASKTRACKER_EXPIRY_INTERVAL) {
+            // the tasks are sorted by start time, so once we find
+            // one that we want to keep, we are done for this cycle.
+            return null;
+          }
+          LOG.info("Launching task " + taskId + " timed out.");
+          itr.remove();
+          return taskId;
+        }
+      }
+      
+      /**
+       * Marks a timed-out task as FAILED and notifies its job.  Does nothing
+       * if the task id is no longer known to the JobTracker.
+       * @param taskId the id of the task that never reported back
+       */
+      private void failLaunchingTask(String taskId) {
+        TaskInProgress tip = null;
+        synchronized (JobTracker.this) {
+          tip = (TaskInProgress) taskidToTIPMap.get(taskId);
+        }
+        if (tip == null) {
+          return;
+        }
+        synchronized (tip) {
+          JobInProgress job = tip.getJob();
+          // record why the task failed, so that the user can
+          // see the problem
+          TaskStatus status = 
+            new TaskStatus(taskId,
+                           tip.isMapTask(),
+                           0.0f,
+                           TaskStatus.FAILED,
+                           errorMsg,
+                           errorMsg,
+                           errorHost);
+          tip.updateStatus(status);
+          job.failedTask(tip, taskId, errorHost);
+        }
+      }
+      
+      /**
+       * Registers a task that has just been handed to a task tracker,
+       * stamping it with the current time.
+       * @param taskName the id of the newly assigned task
+       */
+      public void addNewTask(String taskName) {
+        synchronized (launchingTasks) {
+          launchingTasks.put(taskName, 
+                             Long.valueOf(System.currentTimeMillis()));
+        }
+      }
+      
+      /**
+       * Unregisters a task, so that it is no longer subject to the launch
+       * timeout.  Unknown ids are ignored.
+       * @param taskName the id of the task to forget
+       */
+      public void removeTask(String taskName) {
+        synchronized (launchingTasks) {
+          launchingTasks.remove(taskName);
+        }
+      }
+      
+      /**
+       * Asks the sweeper to exit.  The flag is only checked after the
+       * current sleep finishes, so the thread may keep running for up to
+       * one sweep period.
+       */
+      public void stop() {
+        shouldRun = false;
+      }
+    }
+    
     ///////////////////////////////////////////////////////
     ///////////////////////////////////////////////////////
     // Used to expire TaskTrackers that have gone down
     // Used to expire TaskTrackers that have gone down
     ///////////////////////////////////////////////////////
     ///////////////////////////////////////////////////////
@@ -277,7 +364,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
     ExpireTrackers expireTrackers = new ExpireTrackers();
     ExpireTrackers expireTrackers = new ExpireTrackers();
     RetireJobs retireJobs = new RetireJobs();
     RetireJobs retireJobs = new RetireJobs();
     JobInitThread initJobs = new JobInitThread();
     JobInitThread initJobs = new JobInitThread();
-
+    ExpireLaunchingTasks expireLaunchingTasks = new ExpireLaunchingTasks();
+    Thread expireLaunchingTaskThread = new Thread(expireLaunchingTasks);
+    
     /**
     /**
      * It might seem like a bug to maintain a TreeSet of status objects,
      * It might seem like a bug to maintain a TreeSet of status objects,
      * which can be updated at any time.  But that's not what happens!  We
      * which can be updated at any time.  But that's not what happens!  We
@@ -346,12 +435,12 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
         this.port = addr.getPort();
         this.port = addr.getPort();
         this.interTrackerServer = RPC.getServer(this, addr.getPort(), 10, false, conf);
         this.interTrackerServer = RPC.getServer(this, addr.getPort(), 10, false, conf);
         this.interTrackerServer.start();
         this.interTrackerServer.start();
-	Properties p = System.getProperties();
-	for (Iterator it = p.keySet().iterator(); it.hasNext(); ) {
-	    String key = (String) it.next();
-	    String val = (String) p.getProperty(key);
-	    LOG.info("Property '" + key + "' is " + val);
-	}
+        Properties p = System.getProperties();
+        for (Iterator it = p.keySet().iterator(); it.hasNext(); ) {
+          String key = (String) it.next();
+          String val = (String) p.getProperty(key);
+          LOG.info("Property '" + key + "' is " + val);
+        }
 
 
         this.infoPort = conf.getInt("mapred.job.tracker.info.port", 50030);
         this.infoPort = conf.getInt("mapred.job.tracker.info.port", 50030);
         this.infoServer = new JobTrackerInfoServer(this, infoPort);
         this.infoServer = new JobTrackerInfoServer(this, infoPort);
@@ -362,6 +451,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
         new Thread(this.expireTrackers).start();
         new Thread(this.expireTrackers).start();
         new Thread(this.retireJobs).start();
         new Thread(this.retireJobs).start();
         new Thread(this.initJobs).start();
         new Thread(this.initJobs).start();
+        expireLaunchingTaskThread.start();
     }
     }
 
 
     public static InetSocketAddress getAddress(Configuration conf) {
     public static InetSocketAddress getAddress(Configuration conf) {
@@ -622,7 +712,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
 
 
                     Task t = job.obtainNewMapTask(taskTracker, tts);
                     Task t = job.obtainNewMapTask(taskTracker, tts);
                     if (t != null) {
                     if (t != null) {
-                        return t;
+                      expireLaunchingTasks.addNewTask(t.getTaskId());
+                      return t;
                     }
                     }
 
 
                     //
                     //
@@ -656,7 +747,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
 
 
                     Task t = job.obtainNewReduceTask(taskTracker, tts);
                     Task t = job.obtainNewReduceTask(taskTracker, tts);
                     if (t != null) {
                     if (t != null) {
-                        return t;
+                      expireLaunchingTasks.addNewTask(t.getTaskId());
+                      return t;
                     }
                     }
 
 
                     //
                     //
@@ -878,10 +970,12 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
         for (Iterator it = status.taskReports(); it.hasNext(); ) {
         for (Iterator it = status.taskReports(); it.hasNext(); ) {
             TaskStatus report = (TaskStatus) it.next();
             TaskStatus report = (TaskStatus) it.next();
             report.setHostname(status.getHost());
             report.setHostname(status.getHost());
-            TaskInProgress tip = (TaskInProgress) taskidToTIPMap.get(report.getTaskId());
+            String taskId = report.getTaskId();
+            TaskInProgress tip = (TaskInProgress) taskidToTIPMap.get(taskId);
             if (tip == null) {
             if (tip == null) {
                 LOG.info("Serious problem.  While updating status, cannot find taskid " + report.getTaskId());
                 LOG.info("Serious problem.  While updating status, cannot find taskid " + report.getTaskId());
             } else {
             } else {
+                expireLaunchingTasks.removeTask(taskId);
                 JobInProgress job = tip.getJob();
                 JobInProgress job = tip.getJob();
                 job.updateTaskStatus(tip, report);
                 job.updateTaskStatus(tip, report);