|
@@ -213,9 +213,11 @@ public class TaskTracker
|
|
|
shuffleMetricsRecord.update();
|
|
|
}
|
|
|
}
|
|
|
- private class TaskTrackerMetrics implements Updater {
|
|
|
+ public class TaskTrackerMetrics implements Updater {
|
|
|
private MetricsRecord metricsRecord = null;
|
|
|
private int numCompletedTasks = 0;
|
|
|
+ private int timedoutTasks = 0;
|
|
|
+ private int tasksFailedPing = 0;
|
|
|
|
|
|
TaskTrackerMetrics() {
|
|
|
JobConf conf = getJobConf();
|
|
@@ -232,6 +234,15 @@ public class TaskTracker
|
|
|
synchronized void completeTask() {
|
|
|
++numCompletedTasks;
|
|
|
}
|
|
|
+
|
|
|
+ synchronized void timedoutTask() {
|
|
|
+ ++timedoutTasks;
|
|
|
+ }
|
|
|
+
|
|
|
+ synchronized void taskFailedPing() {
|
|
|
+ ++tasksFailedPing;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Since this object is a registered updater, this method will be called
|
|
|
* periodically, e.g. every 5 seconds.
|
|
@@ -243,15 +254,23 @@ public class TaskTracker
|
|
|
metricsRecord.setMetric("reduces_running", reduceTotal);
|
|
|
metricsRecord.setMetric("taskSlots", (short)maxCurrentTasks);
|
|
|
metricsRecord.incrMetric("tasks_completed", numCompletedTasks);
|
|
|
- metricsRecord.update();
|
|
|
+ metricsRecord.incrMetric("tasks_failed_timeout", timedoutTasks);
|
|
|
+ metricsRecord.incrMetric("tasks_failed_ping", tasksFailedPing);
|
|
|
}
|
|
|
numCompletedTasks = 0;
|
|
|
+ timedoutTasks = 0;
|
|
|
+ tasksFailedPing = 0;
|
|
|
}
|
|
|
+ metricsRecord.update();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
private TaskTrackerMetrics myMetrics = null;
|
|
|
|
|
|
+ public TaskTrackerMetrics getTaskTrackerMetrics() {
|
|
|
+ return myMetrics;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* A list of tips that should be cleaned up.
|
|
|
*/
|
|
@@ -991,6 +1010,7 @@ public class TaskTracker
|
|
|
LOG.info(tip.getTask().getTaskId() + ": " + msg);
|
|
|
ReflectionUtils.logThreadInfo(LOG, "lost task", 30);
|
|
|
tip.reportDiagnosticInfo(msg);
|
|
|
+ myMetrics.timedoutTask();
|
|
|
purgeTask(tip, true);
|
|
|
}
|
|
|
}
|