|
@@ -245,6 +245,7 @@ public class CapacityScheduler extends
|
|
|
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX
|
|
|
+ ".scheduling-interval-ms";
|
|
|
private static final long DEFAULT_ASYNC_SCHEDULER_INTERVAL = 5;
|
|
|
+ private long asyncMaxPendingBacklogs;
|
|
|
|
|
|
public CapacityScheduler() {
|
|
|
super(CapacityScheduler.class.getName());
|
|
@@ -354,6 +355,11 @@ public class CapacityScheduler extends
|
|
|
asyncSchedulerThreads.add(new AsyncScheduleThread(this));
|
|
|
}
|
|
|
resourceCommitterService = new ResourceCommitterService(this);
|
|
|
+ asyncMaxPendingBacklogs = this.conf.getInt(
|
|
|
+ CapacitySchedulerConfiguration.
|
|
|
+ SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS,
|
|
|
+ CapacitySchedulerConfiguration.
|
|
|
+ DEFAULT_SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS);
|
|
|
}
|
|
|
|
|
|
// Setup how many containers we can allocate for each round
|
|
@@ -541,7 +547,8 @@ public class CapacityScheduler extends
|
|
|
Thread.sleep(100);
|
|
|
} else {
|
|
|
// Don't run schedule if we have some pending backlogs already
|
|
|
- if (cs.getAsyncSchedulingPendingBacklogs() > 100) {
|
|
|
+ if (cs.getAsyncSchedulingPendingBacklogs()
|
|
|
+ > cs.asyncMaxPendingBacklogs) {
|
|
|
Thread.sleep(1);
|
|
|
} else{
|
|
|
schedule(cs);
|