|
@@ -218,7 +218,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
tip.isMapTask()? TaskStatus.Phase.MAP:
|
|
|
TaskStatus.Phase.STARTING,
|
|
|
TaskStatus.State.FAILED,
|
|
|
- trackerName, myMetrics);
|
|
|
+ trackerName, myInstrumentation);
|
|
|
}
|
|
|
itr.remove();
|
|
|
} else {
|
|
@@ -393,83 +393,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- static class JobTrackerMetrics implements Updater {
|
|
|
- private MetricsRecord metricsRecord = null;
|
|
|
- private int numMapTasksLaunched = 0;
|
|
|
- private int numMapTasksCompleted = 0;
|
|
|
- private int numReduceTasksLaunched = 0;
|
|
|
- private int numReduceTasksCompleted = 0;
|
|
|
- private int numJobsSubmitted = 0;
|
|
|
- private int numJobsCompleted = 0;
|
|
|
- private JobTracker tracker;
|
|
|
-
|
|
|
- JobTrackerMetrics(JobTracker tracker, JobConf conf) {
|
|
|
- String sessionId = conf.getSessionId();
|
|
|
- // Initiate JVM Metrics
|
|
|
- JvmMetrics.init("JobTracker", sessionId);
|
|
|
- // Create a record for map-reduce metrics
|
|
|
- MetricsContext context = MetricsUtil.getContext("mapred");
|
|
|
- metricsRecord = MetricsUtil.createRecord(context, "jobtracker");
|
|
|
- metricsRecord.setTag("sessionId", sessionId);
|
|
|
- this.tracker = tracker;
|
|
|
- context.registerUpdater(this);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Since this object is a registered updater, this method will be called
|
|
|
- * periodically, e.g. every 5 seconds.
|
|
|
- */
|
|
|
- public void doUpdates(MetricsContext unused) {
|
|
|
- synchronized (this) {
|
|
|
- metricsRecord.incrMetric("maps_launched", numMapTasksLaunched);
|
|
|
- metricsRecord.incrMetric("maps_completed", numMapTasksCompleted);
|
|
|
- metricsRecord.incrMetric("reduces_launched", numReduceTasksLaunched);
|
|
|
- metricsRecord.incrMetric("reduces_completed", numReduceTasksCompleted);
|
|
|
- metricsRecord.incrMetric("jobs_submitted", numJobsSubmitted);
|
|
|
- metricsRecord.incrMetric("jobs_completed", numJobsCompleted);
|
|
|
-
|
|
|
- numMapTasksLaunched = 0;
|
|
|
- numMapTasksCompleted = 0;
|
|
|
- numReduceTasksLaunched = 0;
|
|
|
- numReduceTasksCompleted = 0;
|
|
|
- numJobsSubmitted = 0;
|
|
|
- numJobsCompleted = 0;
|
|
|
- }
|
|
|
- metricsRecord.update();
|
|
|
-
|
|
|
- if (tracker != null) {
|
|
|
- for (JobInProgress jip : tracker.getRunningJobs()) {
|
|
|
- jip.updateMetrics();
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- synchronized void launchMap() {
|
|
|
- ++numMapTasksLaunched;
|
|
|
- }
|
|
|
-
|
|
|
- synchronized void completeMap() {
|
|
|
- ++numMapTasksCompleted;
|
|
|
- }
|
|
|
-
|
|
|
- synchronized void launchReduce() {
|
|
|
- ++numReduceTasksLaunched;
|
|
|
- }
|
|
|
-
|
|
|
- synchronized void completeReduce() {
|
|
|
- ++numReduceTasksCompleted;
|
|
|
- }
|
|
|
-
|
|
|
- synchronized void submitJob() {
|
|
|
- ++numJobsSubmitted;
|
|
|
- }
|
|
|
-
|
|
|
- synchronized void completeJob() {
|
|
|
- ++numJobsCompleted;
|
|
|
- }
|
|
|
- }
|
|
|
+
|
|
|
|
|
|
- private JobTrackerMetrics myMetrics = null;
|
|
|
+ private JobTrackerInstrumentation myInstrumentation = null;
|
|
|
|
|
|
/////////////////////////////////////////////////////////////////
|
|
|
// The real JobTracker
|
|
@@ -661,7 +587,18 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmm");
|
|
|
trackerIdentifier = dateFormat.format(new Date());
|
|
|
|
|
|
- myMetrics = new JobTrackerMetrics(this, jobConf);
|
|
|
+ Class<? extends JobTrackerInstrumentation> metricsInst = getInstrumentationClass(jobConf);
|
|
|
+ try {
|
|
|
+ java.lang.reflect.Constructor<? extends JobTrackerInstrumentation> c =
|
|
|
+ metricsInst.getConstructor(new Class[] {JobTracker.class, JobConf.class} );
|
|
|
+ this.myInstrumentation = c.newInstance(this, jobConf);
|
|
|
+ } catch(Exception e) {
|
|
|
+ //Reflection can throw lots of exceptions -- handle them all by
|
|
|
+ //falling back on the default.
|
|
|
+ LOG.error("failed to initialize job tracker metrics", e);
|
|
|
+ this.myInstrumentation = new JobTrackerMetricsInst(this, jobConf);
|
|
|
+ }
|
|
|
+
|
|
|
|
|
|
// The rpc/web-server ports can be ephemeral ports...
|
|
|
// ... ensure we have the correct info
|
|
@@ -728,6 +665,16 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
LOG.info("Starting RUNNING");
|
|
|
}
|
|
|
|
|
|
+ public static Class<? extends JobTrackerInstrumentation> getInstrumentationClass(Configuration conf) {
|
|
|
+ return conf.getClass("mapred.jobtracker.instrumentation",
|
|
|
+ JobTrackerMetricsInst.class, JobTrackerInstrumentation.class);
|
|
|
+ }
|
|
|
+
|
|
|
+ public static void setInstrumentationClass(Configuration conf, Class<? extends JobTrackerInstrumentation> t) {
|
|
|
+ conf.setClass("mapred.jobtracker.instrumentation",
|
|
|
+ t, JobTrackerInstrumentation.class);
|
|
|
+ }
|
|
|
+
|
|
|
public static InetSocketAddress getAddress(Configuration conf) {
|
|
|
String jobTrackerStr =
|
|
|
conf.get("mapred.job.tracker", "localhost:8012");
|
|
@@ -1290,9 +1237,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
for (Task task : tasks) {
|
|
|
expireLaunchingTasks.addNewTask(task.getTaskID());
|
|
|
if (task.isMapTask()) {
|
|
|
- myMetrics.launchMap();
|
|
|
+ myInstrumentation.launchMap(task.getTaskID());
|
|
|
} else {
|
|
|
- myMetrics.launchReduce();
|
|
|
+ myInstrumentation.launchReduce(task.getTaskID());
|
|
|
}
|
|
|
LOG.debug(trackerName + " -> LaunchTask: " + task.getTaskID());
|
|
|
actions.add(new LaunchTaskAction(task));
|
|
@@ -1620,7 +1567,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- myMetrics.submitJob();
|
|
|
+ myInstrumentation.submitJob();
|
|
|
return job.getStatus();
|
|
|
}
|
|
|
|
|
@@ -1880,7 +1827,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
LOG.info("Serious problem. While updating status, cannot find taskid " + report.getTaskID());
|
|
|
} else {
|
|
|
expireLaunchingTasks.removeTask(taskId);
|
|
|
- tip.getJob().updateTaskStatus(tip, report, myMetrics);
|
|
|
+ tip.getJob().updateTaskStatus(tip, report, myInstrumentation);
|
|
|
}
|
|
|
|
|
|
// Process 'failed fetch' notifications
|
|
@@ -1898,7 +1845,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
failedFetchMap.getJob().fetchFailureNotification(failedFetchMap,
|
|
|
mapTaskId,
|
|
|
failedFetchTrackerName,
|
|
|
- myMetrics);
|
|
|
+ myInstrumentation);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -1934,7 +1881,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
(tip.isMapTask() ?
|
|
|
TaskStatus.Phase.MAP :
|
|
|
TaskStatus.Phase.REDUCE),
|
|
|
- TaskStatus.State.KILLED, trackerName, myMetrics);
|
|
|
+ TaskStatus.State.KILLED, trackerName, myInstrumentation);
|
|
|
jobsWithFailures.add(job);
|
|
|
}
|
|
|
} else {
|
|
@@ -2006,7 +1953,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
JobInProgress[] jobs = new JobInProgress[jobList.size()];
|
|
|
TaskInProgress[] tips = new TaskInProgress[jobList.size()];
|
|
|
TaskAttemptID[] taskids = new TaskAttemptID[jobList.size()];
|
|
|
- JobTrackerMetrics[] metrics = new JobTrackerMetrics[jobList.size()];
|
|
|
+ JobTrackerInstrumentation[] metrics = new JobTrackerInstrumentation[jobList.size()];
|
|
|
|
|
|
Iterator<JobInProgress.JobWithTaskContext> iter = jobList.iterator();
|
|
|
int count = 0;
|
|
@@ -2169,4 +2116,6 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+
|
|
|
}
|