|
@@ -282,12 +282,26 @@ class CapacityTaskScheduler extends TaskScheduler {
|
|
return -1;
|
|
return -1;
|
|
}
|
|
}
|
|
else if ((0 == q1.reclaimList.size()) && (0 == q2.reclaimList.size())){
|
|
else if ((0 == q1.reclaimList.size()) && (0 == q2.reclaimList.size())){
|
|
- // neither needs to reclaim. look at how much capacity they've filled
|
|
|
|
- double r1 = (double)q1.numRunningTasks/(double)q1.guaranteedCapacity;
|
|
|
|
- double r2 = (double)q2.numRunningTasks/(double)q2.guaranteedCapacity;
|
|
|
|
- if (r1<r2) return -1;
|
|
|
|
- else if (r1>r2) return 1;
|
|
|
|
- else return 0;
|
|
|
|
|
|
+ // neither needs to reclaim. If either doesn't have a capacity yet,
|
|
|
|
+ // it comes at the end of the queue.
|
|
|
|
+ if ((q1.guaranteedCapacity == 0) &&
|
|
|
|
+ (q2.guaranteedCapacity != 0)) {
|
|
|
|
+ return 1;
|
|
|
|
+ } else if ((q1.guaranteedCapacity != 0) &&
|
|
|
|
+ (q2.guaranteedCapacity == 0)) {
|
|
|
|
+ return -1;
|
|
|
|
+ } else if ((q1.guaranteedCapacity == 0) &&
|
|
|
|
+ (q2.guaranteedCapacity == 0)) {
|
|
|
|
+ // both don't have capacities, treat them as equal.
|
|
|
|
+ return 0;
|
|
|
|
+ } else {
|
|
|
|
+ // look at how much capacity they've filled
|
|
|
|
+ double r1 = (double)q1.numRunningTasks/(double)q1.guaranteedCapacity;
|
|
|
|
+ double r2 = (double)q2.numRunningTasks/(double)q2.guaranteedCapacity;
|
|
|
|
+ if (r1<r2) return -1;
|
|
|
|
+ else if (r1>r2) return 1;
|
|
|
|
+ else return 0;
|
|
|
|
+ }
|
|
}
|
|
}
|
|
else {
|
|
else {
|
|
// both have to reclaim. Look at which one needs to reclaim earlier
|
|
// both have to reclaim. Look at which one needs to reclaim earlier
|
|
@@ -335,6 +349,10 @@ class CapacityTaskScheduler extends TaskScheduler {
|
|
qsiForAssigningTasks.get(qsiForAssigningTasks.size()-1);
|
|
qsiForAssigningTasks.get(qsiForAssigningTasks.size()-1);
|
|
long currentTime = scheduler.clock.getTime();
|
|
long currentTime = scheduler.clock.getTime();
|
|
for (QueueSchedulingInfo qsi: queueInfoMap.values()) {
|
|
for (QueueSchedulingInfo qsi: queueInfoMap.values()) {
|
|
|
|
+ if (qsi.guaranteedCapacity <= 0) {
|
|
|
|
+ // no capacity, hence nothing can be reclaimed.
|
|
|
|
+ continue;
|
|
|
|
+ }
|
|
// is there any resource that needs to be reclaimed?
|
|
// is there any resource that needs to be reclaimed?
|
|
if ((!qsi.reclaimList.isEmpty()) &&
|
|
if ((!qsi.reclaimList.isEmpty()) &&
|
|
(qsi.reclaimList.getFirst().whenToKill <
|
|
(qsi.reclaimList.getFirst().whenToKill <
|
|
@@ -484,8 +502,8 @@ class CapacityTaskScheduler extends TaskScheduler {
|
|
for (QueueSchedulingInfo qsi: queueInfoMap.values()) {
|
|
for (QueueSchedulingInfo qsi: queueInfoMap.values()) {
|
|
// compute new GCs and ACs, if TT slots have changed
|
|
// compute new GCs and ACs, if TT slots have changed
|
|
if (slotsDiff != 0) {
|
|
if (slotsDiff != 0) {
|
|
- qsi.guaranteedCapacity +=
|
|
|
|
- (qsi.guaranteedCapacityPercent*slotsDiff/100);
|
|
|
|
|
|
+ qsi.guaranteedCapacity =
|
|
|
|
+ (int)(qsi.guaranteedCapacityPercent*numSlots/100);
|
|
}
|
|
}
|
|
qsi.numRunningTasks = 0;
|
|
qsi.numRunningTasks = 0;
|
|
qsi.numPendingTasks = 0;
|
|
qsi.numPendingTasks = 0;
|
|
@@ -729,6 +747,13 @@ class CapacityTaskScheduler extends TaskScheduler {
|
|
*/
|
|
*/
|
|
updateCollectionOfQSIs();
|
|
updateCollectionOfQSIs();
|
|
for (QueueSchedulingInfo qsi: qsiForAssigningTasks) {
|
|
for (QueueSchedulingInfo qsi: qsiForAssigningTasks) {
|
|
|
|
+ if (qsi.guaranteedCapacity <= 0.0f) {
|
|
|
|
+ // No capacity is guaranteed yet for this queue.
|
|
|
|
+ // Queues are sorted so that ones without capacities
|
|
|
|
+ // come towards the end. Hence, we can simply return
|
|
|
|
+ // from here without considering any further queues.
|
|
|
|
+ return null;
|
|
|
|
+ }
|
|
t = getTaskFromQueue(taskTracker, qsi);
|
|
t = getTaskFromQueue(taskTracker, qsi);
|
|
if (t!= null) {
|
|
if (t!= null) {
|
|
// we have a task. Update reclaimed resource info
|
|
// we have a task. Update reclaimed resource info
|