|
@@ -654,6 +654,7 @@ class CapacityTaskScheduler extends TaskScheduler {
|
|
private long limitMaxMemForReduceTasks;
|
|
private long limitMaxMemForReduceTasks;
|
|
|
|
|
|
private volatile int maxTasksPerHeartbeat;
|
|
private volatile int maxTasksPerHeartbeat;
|
|
|
|
+ private volatile int maxTasksToAssignAfterOffSwitch;
|
|
|
|
|
|
public CapacityTaskScheduler() {
|
|
public CapacityTaskScheduler() {
|
|
this(new Clock());
|
|
this(new Clock());
|
|
@@ -880,7 +881,10 @@ class CapacityTaskScheduler extends TaskScheduler {
|
|
mapScheduler.initialize(queueInfoMap);
|
|
mapScheduler.initialize(queueInfoMap);
|
|
reduceScheduler.initialize(queueInfoMap);
|
|
reduceScheduler.initialize(queueInfoMap);
|
|
|
|
|
|
|
|
+ // scheduling tunables
|
|
maxTasksPerHeartbeat = schedConf.getMaxTasksPerHeartbeat();
|
|
maxTasksPerHeartbeat = schedConf.getMaxTasksPerHeartbeat();
|
|
|
|
+ maxTasksToAssignAfterOffSwitch =
|
|
|
|
+ schedConf.getMaxTasksToAssignAfterOffSwitch();
|
|
}
|
|
}
|
|
|
|
|
|
Map<String, CapacitySchedulerQueue>
|
|
Map<String, CapacitySchedulerQueue>
|
|
@@ -1064,6 +1068,7 @@ class CapacityTaskScheduler extends TaskScheduler {
|
|
throws IOException {
|
|
throws IOException {
|
|
int availableSlots = maxMapSlots - currentMapSlots;
|
|
int availableSlots = maxMapSlots - currentMapSlots;
|
|
boolean assignOffSwitch = true;
|
|
boolean assignOffSwitch = true;
|
|
|
|
+ int tasksToAssignAfterOffSwitch = this.maxTasksToAssignAfterOffSwitch;
|
|
while (availableSlots > 0) {
|
|
while (availableSlots > 0) {
|
|
mapScheduler.sortQueues();
|
|
mapScheduler.sortQueues();
|
|
TaskLookupResult tlr = mapScheduler.assignTasks(taskTracker,
|
|
TaskLookupResult tlr = mapScheduler.assignTasks(taskTracker,
|
|
@@ -1091,6 +1096,19 @@ class CapacityTaskScheduler extends TaskScheduler {
|
|
assignOffSwitch = false;
|
|
assignOffSwitch = false;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ // Respect limits on #tasks to assign after an off-switch task is assigned
|
|
|
|
+ if (!assignOffSwitch) {
|
|
|
|
+ if (tasksToAssignAfterOffSwitch == 0) {
|
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
|
+ LOG.debug("Hit limit of max tasks after off-switch: " +
|
|
|
|
+ this.maxTasksToAssignAfterOffSwitch + " after " +
|
|
|
|
+ tasks.size() + " maps.");
|
|
|
|
+ }
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+ --tasksToAssignAfterOffSwitch;
|
|
|
|
+ }
|
|
|
|
+
|
|
// Assigned some slots
|
|
// Assigned some slots
|
|
availableSlots -= t.getNumSlotsRequired();
|
|
availableSlots -= t.getNumSlotsRequired();
|
|
|
|
|