浏览代码

MAPREDUCE-3528. Fixed TaskHeartBeatHandler to use a new configuration for the thread loop interval separate from task-timeout configuration property. (Siddharth Seth via vinodkv)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1229403 13f79535-47bb-0310-9956-ffa450edef68
Vinod Kumar Vavilapalli 13 年之前
父节点
当前提交
428529b58f

+ 6 - 1
hadoop-mapreduce-project/CHANGES.txt

@@ -156,10 +156,15 @@ Release 0.23.1 - Unreleased
     MAPREDUCE-3547. Added a bunch of unit tests for the the RM/NM webservices.
     (Thomas Graves via acmurthy)
 
-    MAPREDUCE-3610. Remove use of the 'dfs.block.size' config for default block size fetching. Use FS#getDefaultBlocksize instead. (Sho Shimauchi via harsh)
+    MAPREDUCE-3610. Remove use of the 'dfs.block.size' config for default block
+    size fetching. Use FS#getDefaultBlocksize instead. (Sho Shimauchi via harsh)
 
     MAPREDUCE-3478. Cannot build against ZooKeeper 3.4.0. (Tom White via mahadev)
 
+    MAPREDUCE-3528. Fixed TaskHeartBeatHandler to use a new configuration
+    for the thread loop interval separate from task-timeout configuration
+    property. (Siddharth Seth via vinodkv)
+
   OPTIMIZATIONS
 
     MAPREDUCE-3567. Extraneous JobConf objects in AM heap. (Vinod Kumar

+ 9 - 4
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java

@@ -25,6 +25,7 @@ import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
@@ -40,6 +41,7 @@ import org.apache.hadoop.yarn.service.AbstractService;
  * not hear from it for a long time.
  * 
  */
+@SuppressWarnings({"unchecked", "rawtypes"})
 public class TaskHeartbeatHandler extends AbstractService {
 
   private static final Log LOG = LogFactory.getLog(TaskHeartbeatHandler.class);
@@ -48,7 +50,8 @@ public class TaskHeartbeatHandler extends AbstractService {
   //received from a task.
   private Thread lostTaskCheckerThread;
   private volatile boolean stopped;
-  private int taskTimeOut = 5*60*1000;//5 mins
+  private int taskTimeOut = 5 * 60 * 1000;// 5 mins
+  private int taskTimeOutCheckInterval = 30 * 1000; // 30 seconds.
 
   private final EventHandler eventHandler;
   private final Clock clock;
@@ -64,8 +67,10 @@ public class TaskHeartbeatHandler extends AbstractService {
 
   @Override
   public void init(Configuration conf) {
-   super.init(conf);
-   taskTimeOut = conf.getInt("mapreduce.task.timeout", 5*60*1000);
+    super.init(conf);
+    taskTimeOut = conf.getInt(MRJobConfig.TASK_TIMEOUT, 5 * 60 * 1000);
+    taskTimeOutCheckInterval =
+        conf.getInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 30 * 1000);
   }
 
   @Override
@@ -125,7 +130,7 @@ public class TaskHeartbeatHandler extends AbstractService {
           }
         }
         try {
-          Thread.sleep(taskTimeOut);
+          Thread.sleep(taskTimeOutCheckInterval);
         } catch (InterruptedException e) {
           LOG.info("TaskHeartbeatHandler thread interrupted");
           break;

+ 2 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java

@@ -156,6 +156,8 @@ public interface MRJobConfig {
   
   public static final String TASK_TIMEOUT = "mapreduce.task.timeout";
 
+  public static final String TASK_TIMEOUT_CHECK_INTERVAL_MS = "mapreduce.task.timeout.check-interval-ms";
+  
   public static final String TASK_ID = "mapreduce.task.id";
 
   public static final String TASK_OUTPUT_DIR = "mapreduce.task.output.dir";