浏览代码

HADOOP-491. Change mapred.task.timeout to be per-job. Contributed by Arun.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@506745 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 年之前
父节点
当前提交
5f4b37d6d3

+ 4 - 0
CHANGES.txt

@@ -35,6 +35,10 @@ Trunk (unreleased changes)
 10. HADOOP-1007. Make names of metrics used in Hadoop unique.
     (Nigel Daley via cutting)
 
+11. HADOOP-491.  Change mapred.task.timeout to be per-job, and make a
+    value of zero mean no timeout.  Also change contrib/streaming to
+    disable task timeouts.  (Arun C Murthy via cutting)
+
 
 Release 0.11.1 - 2007-02-09
 

+ 3 - 0
src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java

@@ -514,6 +514,9 @@ public class StreamJob {
 
     // general MapRed job properties
     jobConf_ = new JobConf(config_);
+    
+    // By default, streaming jobs disable the task timeout (0 means no timeout)
+    jobConf_.setLong("mapred.task.timeout", 0);
 
     setUserJobConfProps(true);
 

+ 39 - 15
src/java/org/apache/hadoop/mapred/TaskTracker.java

@@ -53,7 +53,6 @@ import org.apache.hadoop.net.DNS;
 public class TaskTracker 
              implements MRConstants, TaskUmbilicalProtocol, Runnable {
     static final long WAIT_FOR_DONE = 3 * 1000;
-    private long taskTimeout; 
     private int httpPort;
 
     static enum State {NORMAL, STALE, INTERRUPTED}
@@ -435,7 +434,6 @@ public class TaskTracker
       maxCurrentTasks = conf.getInt("mapred.tasktracker.tasks.maximum", 2);
       this.fConf = conf;
       this.jobTrackAddr = JobTracker.getAddress(conf);
-      this.taskTimeout = conf.getInt("mapred.task.timeout", 10* 60 * 1000);
       this.mapOutputFile = new MapOutputFile();
       this.mapOutputFile.setConf(conf);
       int httpPort = conf.getInt("tasktracker.http.port", 50060);
@@ -643,20 +641,29 @@ public class TaskTracker
      */
     private synchronized void markUnresponsiveTasks() throws IOException {
       long now = System.currentTimeMillis();
-        for (TaskInProgress tip: runningTasks.values()) {
-            long timeSinceLastReport = now - tip.getLastProgressReport();
-            if ((tip.getRunState() == TaskStatus.State.RUNNING) &&
-                (timeSinceLastReport > this.taskTimeout) &&
-                !tip.wasKilled) {
-                String msg = "Task failed to report status for " +
-                             (timeSinceLastReport / 1000) + 
-                             " seconds. Killing.";
-                LOG.info(tip.getTask().getTaskId() + ": " + msg);
-                ReflectionUtils.logThreadInfo(LOG, "lost task", 30);
-                tip.reportDiagnosticInfo(msg);
-                purgeTask(tip);
-            }
+      for (TaskInProgress tip: runningTasks.values()) {
+        if (tip.getRunState() == TaskStatus.State.RUNNING) {
+          // Look up the per-job task timeout; a value of 0 means
+          // the task is never timed out, so skip it
+          long jobTaskTimeout = tip.getTaskTimeout();
+          if (jobTaskTimeout == 0) {
+            continue;
+          }
+          
+          // Check if the task has not reported progress for a 
+          // time-period greater than the configured time-out
+          long timeSinceLastReport = now - tip.getLastProgressReport();
+          if (timeSinceLastReport > jobTaskTimeout && !tip.wasKilled) {
+            String msg = "Task failed to report status for " +
+            (timeSinceLastReport / 1000) + 
+            " seconds. Killing.";
+            LOG.info(tip.getTask().getTaskId() + ": " + msg);
+            ReflectionUtils.logThreadInfo(LOG, "lost task", 30);
+            tip.reportDiagnosticInfo(msg);
+            purgeTask(tip);
+          }
         }
+      }
     }
 
     /**
@@ -902,6 +909,7 @@ public class TaskTracker
         private boolean alwaysKeepTaskFiles;
         private TaskStatus taskStatus ; 
         private boolean keepJobFiles;
+        private long taskTimeout;
         
         /**
          */
@@ -920,6 +928,7 @@ public class TaskTracker
                  getName(), task.isMapTask()? TaskStatus.Phase.MAP:
                    TaskStatus.Phase.SHUFFLE); 
             keepJobFiles = false;
+            taskTimeout = (10 * 60 * 1000);
         }
         
         private void localizeTask(Task task) throws IOException{
@@ -964,6 +973,12 @@ public class TaskTracker
         public void setJobConf(JobConf lconf){
             this.localJobConf = lconf;
             keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
+            taskTimeout = localJobConf.getLong("mapred.task.timeout", 
+                                                10 * 60 * 1000);
+        }
+        
+        public JobConf getJobConf() {
+          return localJobConf;
         }
         
         /**
@@ -1024,6 +1039,15 @@ public class TaskTracker
             return runstate;
         }
 
+        /**
+         * The task's configured timeout, in milliseconds.
+         * 
+         * @return the timeout in milliseconds; 0 means the task is never timed out.
+         */
+        public long getTaskTimeout() {
+          return taskTimeout;
+        }
+        
         /**
          * The task has reported some diagnostic info about its status
          */