|
@@ -152,7 +152,8 @@ public class TaskTracker
|
|
|
private static final String JOBCACHE = "jobcache";
|
|
|
private JobConf fConf;
|
|
|
private MapOutputFile mapOutputFile;
|
|
|
- private int maxCurrentTasks;
|
|
|
+ private int maxCurrentMapTasks;
|
|
|
+ private int maxCurrentReduceTasks;
|
|
|
private int failures;
|
|
|
private int finishedCount[] = new int[1];
|
|
|
private MapEventsFetcherThread mapEventsFetcher;
|
|
@@ -259,7 +260,9 @@ public class TaskTracker
|
|
|
if (metricsRecord != null) {
|
|
|
metricsRecord.setMetric("maps_running", mapTotal);
|
|
|
metricsRecord.setMetric("reduces_running", reduceTotal);
|
|
|
- metricsRecord.setMetric("taskSlots", (short)maxCurrentTasks);
|
|
|
+ metricsRecord.setMetric("mapTaskSlots", (short)maxCurrentMapTasks);
|
|
|
+ metricsRecord.setMetric("reduceTaskSlots",
|
|
|
+ (short)maxCurrentReduceTasks);
|
|
|
metricsRecord.incrMetric("tasks_completed", numCompletedTasks);
|
|
|
metricsRecord.incrMetric("tasks_failed_timeout", timedoutTasks);
|
|
|
metricsRecord.incrMetric("tasks_failed_ping", tasksFailedPing);
|
|
@@ -414,8 +417,10 @@ public class TaskTracker
|
|
|
this.fConf.get("mapred.task.tracker.report.bindAddress", "127.0.0.1");
|
|
|
|
|
|
// RPC initialization
|
|
|
+ int max = maxCurrentMapTasks > maxCurrentReduceTasks ?
|
|
|
+ maxCurrentMapTasks : maxCurrentReduceTasks;
|
|
|
this.taskReportServer =
|
|
|
- RPC.getServer(this, bindAddress, 0, maxCurrentTasks, false, this.fConf);
|
|
|
+ RPC.getServer(this, bindAddress, 0, max, false, this.fConf);
|
|
|
this.taskReportServer.start();
|
|
|
|
|
|
// get the assigned address
|
|
@@ -693,11 +698,46 @@ public class TaskTracker
|
|
|
this.mapEventsFetcher.interrupt();
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Handles deprecated "mapred.tasktracker.tasks.maximum"
|
|
|
+ * @param newMax new max values specified through
|
|
|
+ * mapred.tasktracker.map.tasks.maximum or
|
|
|
+ * mapred.tasktracker.reduce.tasks.maximum
|
|
|
+ * @param oldMax old max value specified through
|
|
|
+ * mapred.tasktracker.tasks.maximum
|
|
|
+ * @param def default value if max tasks not specified at all.
|
|
|
+ * @return new value supercedes old value. If both new and old values
|
|
|
+ * are not set, default value is returned.
|
|
|
+ */
|
|
|
+ private int handleDeprecatedMaxTasks(String newMax,
|
|
|
+ String oldMax,
|
|
|
+ int def) {
|
|
|
+ try {
|
|
|
+ if (newMax != null) {
|
|
|
+ return Integer.parseInt(newMax);
|
|
|
+ }
|
|
|
+ if (oldMax != null ) {
|
|
|
+ LOG.warn("mapred.tasktracker.tasks.maximum is deprecated. Use " +
|
|
|
+ "mapred.tasktracker.map.tasks.maximum and " +
|
|
|
+ "mapred.tasktracker.reduce.tasks.maximum instead.");
|
|
|
+ return Integer.parseInt(oldMax);
|
|
|
+ }
|
|
|
+ } catch (NumberFormatException ne) {
|
|
|
+ return def;
|
|
|
+ }
|
|
|
+ return def;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Start with the local machine name, and the default JobTracker
|
|
|
*/
|
|
|
public TaskTracker(JobConf conf) throws IOException {
|
|
|
- maxCurrentTasks = conf.getInt("mapred.tasktracker.tasks.maximum", 2);
|
|
|
+ maxCurrentMapTasks = handleDeprecatedMaxTasks(
|
|
|
+ conf.get("mapred.tasktracker.map.tasks.maximum"),
|
|
|
+ conf.get("mapred.tasktracker.tasks.maximum"), 2);
|
|
|
+ maxCurrentReduceTasks = handleDeprecatedMaxTasks(
|
|
|
+ conf.get("mapred.tasktracker.reduce.tasks.maximum"),
|
|
|
+ conf.get("mapred.tasktracker.tasks.maximum"), 2);
|
|
|
this.fConf = conf;
|
|
|
this.jobTrackAddr = JobTracker.getAddress(conf);
|
|
|
this.mapOutputFile = new MapOutputFile();
|
|
@@ -868,7 +908,9 @@ public class TaskTracker
|
|
|
synchronized (this) {
|
|
|
status = new TaskTrackerStatus(taskTrackerName, localHostname,
|
|
|
httpPort, cloneAndResetRunningTaskStatuses(),
|
|
|
- failures, maxCurrentTasks);
|
|
|
+ failures,
|
|
|
+ maxCurrentMapTasks,
|
|
|
+ maxCurrentReduceTasks);
|
|
|
}
|
|
|
} else {
|
|
|
LOG.info("Resending 'status' to '" + jobTrackAddr.getHostName() +
|
|
@@ -881,8 +923,8 @@ public class TaskTracker
|
|
|
boolean askForNewTask;
|
|
|
long localMinSpaceStart;
|
|
|
synchronized (this) {
|
|
|
- askForNewTask = (mapTotal < maxCurrentTasks ||
|
|
|
- reduceTotal < maxCurrentTasks) &&
|
|
|
+ askForNewTask = (mapTotal < maxCurrentMapTasks ||
|
|
|
+ reduceTotal < maxCurrentReduceTasks) &&
|
|
|
acceptNewTasks;
|
|
|
localMinSpaceStart = minSpaceStart;
|
|
|
}
|