|
@@ -57,6 +57,8 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutpu
|
|
|
|
|
|
TreeMap tasks = null;
|
|
|
TreeMap runningTasks = null;
|
|
|
+ int mapTotal = 0;
|
|
|
+ int reduceTotal = 0;
|
|
|
boolean justStarted = true;
|
|
|
|
|
|
static Random r = new Random();
|
|
@@ -224,6 +226,11 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutpu
|
|
|
TaskStatus status = tip.createStatus();
|
|
|
taskReports.add(status);
|
|
|
if (status.getRunState() != TaskStatus.RUNNING) {
|
|
|
+ if (tip.getTask().isMapTask()) {
|
|
|
+ mapTotal--;
|
|
|
+ } else {
|
|
|
+ reduceTotal--;
|
|
|
+ }
|
|
|
it.remove();
|
|
|
}
|
|
|
}
|
|
@@ -246,12 +253,17 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutpu
|
|
|
//
|
|
|
// Check if we should create a new Task
|
|
|
//
|
|
|
- if (runningTasks.size() < maxCurrentTasks) {
|
|
|
+ if (mapTotal < maxCurrentTasks || reduceTotal < maxCurrentTasks) {
|
|
|
Task t = jobClient.pollForNewTask(taskTrackerName);
|
|
|
if (t != null) {
|
|
|
TaskInProgress tip = new TaskInProgress(t, this.fConf);
|
|
|
synchronized (this) {
|
|
|
tasks.put(t.getTaskId(), tip);
|
|
|
+ if (t.isMapTask()) {
|
|
|
+ mapTotal++;
|
|
|
+ } else {
|
|
|
+ reduceTotal++;
|
|
|
+ }
|
|
|
runningTasks.put(t.getTaskId(), tip);
|
|
|
}
|
|
|
tip.launchTask();
|