|
@@ -36,8 +36,9 @@ class JobTrackerMetricsInst extends JobTrackerInstrumentation implements Updater
|
|
|
private int numReduceTasksFailed = 0;
|
|
|
private int numJobsSubmitted = 0;
|
|
|
private int numJobsCompleted = 0;
|
|
|
- private int numWaitingTasks = 0;
|
|
|
-
|
|
|
+ private int numWaitingMaps = 0;
|
|
|
+ private int numWaitingReduces = 0;
|
|
|
+
|
|
|
public JobTrackerMetricsInst(JobTracker tracker, JobConf conf) {
|
|
|
super(tracker, conf);
|
|
|
String sessionId = conf.getSessionId();
|
|
@@ -64,7 +65,8 @@ class JobTrackerMetricsInst extends JobTrackerInstrumentation implements Updater
|
|
|
metricsRecord.incrMetric("reduces_failed", numReduceTasksFailed);
|
|
|
metricsRecord.incrMetric("jobs_submitted", numJobsSubmitted);
|
|
|
metricsRecord.incrMetric("jobs_completed", numJobsCompleted);
|
|
|
- metricsRecord.incrMetric("waiting_tasks", numWaitingTasks);
|
|
|
+ metricsRecord.incrMetric("waiting_maps", numWaitingMaps);
|
|
|
+ metricsRecord.incrMetric("waiting_reduces", numWaitingReduces);
|
|
|
|
|
|
numMapTasksLaunched = 0;
|
|
|
numMapTasksCompleted = 0;
|
|
@@ -72,9 +74,10 @@ class JobTrackerMetricsInst extends JobTrackerInstrumentation implements Updater
|
|
|
numReduceTasksLaunched = 0;
|
|
|
numReduceTasksCompleted = 0;
|
|
|
numReduceTasksFailed = 0;
|
|
|
- numWaitingTasks = 0;
|
|
|
numJobsSubmitted = 0;
|
|
|
numJobsCompleted = 0;
|
|
|
+ numWaitingMaps = 0;
|
|
|
+ numWaitingReduces = 0;
|
|
|
}
|
|
|
metricsRecord.update();
|
|
|
|
|
@@ -88,7 +91,7 @@ class JobTrackerMetricsInst extends JobTrackerInstrumentation implements Updater
|
|
|
@Override
|
|
|
public synchronized void launchMap(TaskAttemptID taskAttemptID) {
|
|
|
++numMapTasksLaunched;
|
|
|
- decWaiting(taskAttemptID.getJobID(), 1);
|
|
|
+ decWaitingMaps(taskAttemptID.getJobID(), 1);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -99,13 +102,13 @@ class JobTrackerMetricsInst extends JobTrackerInstrumentation implements Updater
|
|
|
@Override
|
|
|
public synchronized void failedMap(TaskAttemptID taskAttemptID) {
|
|
|
++numMapTasksFailed;
|
|
|
- addWaiting(taskAttemptID.getJobID(), 1);
|
|
|
+ addWaitingMaps(taskAttemptID.getJobID(), 1);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public synchronized void launchReduce(TaskAttemptID taskAttemptID) {
|
|
|
++numReduceTasksLaunched;
|
|
|
- decWaiting(taskAttemptID.getJobID(), 1);
|
|
|
+ decWaitingReduces(taskAttemptID.getJobID(), 1);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -116,7 +119,7 @@ class JobTrackerMetricsInst extends JobTrackerInstrumentation implements Updater
|
|
|
@Override
|
|
|
public synchronized void failedReduce(TaskAttemptID taskAttemptID) {
|
|
|
++numReduceTasksFailed;
|
|
|
- addWaiting(taskAttemptID.getJobID(), 1);
|
|
|
+ addWaitingReduces(taskAttemptID.getJobID(), 1);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -130,12 +133,22 @@ class JobTrackerMetricsInst extends JobTrackerInstrumentation implements Updater
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public synchronized void addWaiting(JobID id, int tasks) {
|
|
|
- numWaitingTasks += tasks;
|
|
|
+ public synchronized void addWaitingMaps(JobID id, int task) {
|
|
|
+ numWaitingMaps += task;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public synchronized void decWaitingMaps(JobID id, int task) {
|
|
|
+ numWaitingMaps -= task;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public synchronized void addWaitingReduces(JobID id, int task) {
|
|
|
+ numWaitingReduces += task;
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
- public synchronized void decWaiting(JobID id, int tasks) {
|
|
|
- numWaitingTasks -= tasks;
|
|
|
+ public synchronized void decWaitingReduces(JobID id, int task){
|
|
|
+ numWaitingReduces -= task;
|
|
|
}
|
|
|
}
|