|
@@ -23,10 +23,12 @@ import java.util.Map;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
import java.util.concurrent.ConcurrentMap;
|
|
import java.util.concurrent.ConcurrentMap;
|
|
|
|
|
|
|
|
+import com.google.common.annotations.VisibleForTesting;
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
|
|
|
+import org.apache.hadoop.mapreduce.util.MRJobConfUtil;
|
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
|
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
|
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
|
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
|
|
@@ -67,7 +69,7 @@ public class TaskHeartbeatHandler extends AbstractService {
|
|
//received from a task.
|
|
//received from a task.
|
|
private Thread lostTaskCheckerThread;
|
|
private Thread lostTaskCheckerThread;
|
|
private volatile boolean stopped;
|
|
private volatile boolean stopped;
|
|
- private int taskTimeOut = 5 * 60 * 1000;// 5 mins
|
|
|
|
|
|
+ private long taskTimeOut;
|
|
private int taskTimeOutCheckInterval = 30 * 1000; // 30 seconds.
|
|
private int taskTimeOutCheckInterval = 30 * 1000; // 30 seconds.
|
|
|
|
|
|
private final EventHandler eventHandler;
|
|
private final EventHandler eventHandler;
|
|
@@ -87,7 +89,19 @@ public class TaskHeartbeatHandler extends AbstractService {
|
|
@Override
|
|
@Override
|
|
protected void serviceInit(Configuration conf) throws Exception {
|
|
protected void serviceInit(Configuration conf) throws Exception {
|
|
super.serviceInit(conf);
|
|
super.serviceInit(conf);
|
|
- taskTimeOut = conf.getInt(MRJobConfig.TASK_TIMEOUT, 5 * 60 * 1000);
|
|
|
|
|
|
+ taskTimeOut = conf.getLong(
|
|
|
|
+ MRJobConfig.TASK_TIMEOUT, MRJobConfig.DEFAULT_TASK_TIMEOUT_MILLIS);
|
|
|
|
+
|
|
|
|
+ // enforce task timeout is at least twice as long as task report interval
|
|
|
|
+ long taskProgressReportIntervalMillis = MRJobConfUtil.
|
|
|
|
+ getTaskProgressReportInterval(conf);
|
|
|
|
+ long minimumTaskTimeoutAllowed = taskProgressReportIntervalMillis * 2;
|
|
|
|
+ if(taskTimeOut < minimumTaskTimeoutAllowed) {
|
|
|
|
+ taskTimeOut = minimumTaskTimeoutAllowed;
|
|
|
|
+ LOG.info("Task timeout must be as least twice as long as the task " +
|
|
|
|
+ "status report interval. Setting task timeout to " + taskTimeOut);
|
|
|
|
+ }
|
|
|
|
+
|
|
taskTimeOutCheckInterval =
|
|
taskTimeOutCheckInterval =
|
|
conf.getInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 30 * 1000);
|
|
conf.getInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 30 * 1000);
|
|
}
|
|
}
|
|
@@ -140,7 +154,7 @@ public class TaskHeartbeatHandler extends AbstractService {
|
|
|
|
|
|
while (iterator.hasNext()) {
|
|
while (iterator.hasNext()) {
|
|
Map.Entry<TaskAttemptId, ReportTime> entry = iterator.next();
|
|
Map.Entry<TaskAttemptId, ReportTime> entry = iterator.next();
|
|
- boolean taskTimedOut = (taskTimeOut > 0) &&
|
|
|
|
|
|
+ boolean taskTimedOut = (taskTimeOut > 0) &&
|
|
(currentTime > (entry.getValue().getLastProgress() + taskTimeOut));
|
|
(currentTime > (entry.getValue().getLastProgress() + taskTimeOut));
|
|
|
|
|
|
if(taskTimedOut) {
|
|
if(taskTimedOut) {
|
|
@@ -163,4 +177,8 @@ public class TaskHeartbeatHandler extends AbstractService {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+  /**
+   * Exposes the task timeout held by this handler so that tests can inspect
+   * the value assigned during service initialization.
+   *
+   * @return the current task timeout, in milliseconds
+   */
+  @VisibleForTesting
+  public long getTaskTimeOut() {
+    return this.taskTimeOut;
+  }
|
|
}
|
|
}
|