|
@@ -15,65 +15,44 @@
|
|
|
* See the License for the specific language governing permissions and
|
|
|
* limitations under the License.
|
|
|
*/
|
|
|
-
|
|
|
-package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
|
|
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes;
|
|
|
|
|
|
import java.io.Serializable;
|
|
|
import java.util.Collection;
|
|
|
import java.util.Comparator;
|
|
|
|
|
|
-import org.apache.commons.logging.Log;
|
|
|
-import org.apache.commons.logging.LogFactory;
|
|
|
-import org.apache.hadoop.classification.InterfaceAudience.Private;
|
|
|
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
|
|
import org.apache.hadoop.yarn.api.records.Resource;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Resources;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingMode;
|
|
|
|
|
|
-/**
|
|
|
- * Utility class containing scheduling algorithms used in the fair scheduler.
|
|
|
- */
|
|
|
-@Private
|
|
|
-@Unstable
|
|
|
-class SchedulingAlgorithms {
|
|
|
- public static final Log LOG = LogFactory.getLog(
|
|
|
- SchedulingAlgorithms.class.getName());
|
|
|
+import com.google.common.annotations.VisibleForTesting;
|
|
|
|
|
|
- /**
|
|
|
- * Compare Schedulables in order of priority and then submission time, as in
|
|
|
- * the default FIFO scheduler in Hadoop.
|
|
|
- */
|
|
|
- public static class FifoComparator implements Comparator<Schedulable>, Serializable {
|
|
|
- private static final long serialVersionUID = -5905036205491177060L;
|
|
|
+public class FairSchedulingMode extends SchedulingMode {
|
|
|
+ @VisibleForTesting
|
|
|
+ public static final String NAME = "FairShare";
|
|
|
+ private FairShareComparator comparator = new FairShareComparator();
|
|
|
|
|
|
- @Override
|
|
|
- public int compare(Schedulable s1, Schedulable s2) {
|
|
|
- int res = s1.getPriority().compareTo(s2.getPriority());
|
|
|
- if (res == 0) {
|
|
|
- res = (int) Math.signum(s1.getStartTime() - s2.getStartTime());
|
|
|
- }
|
|
|
- if (res == 0) {
|
|
|
- // In the rare case where jobs were submitted at the exact same time,
|
|
|
- // compare them by name (which will be the JobID) to get a deterministic
|
|
|
- // ordering, so we don't alternately launch tasks from different jobs.
|
|
|
- res = s1.getName().compareTo(s2.getName());
|
|
|
- }
|
|
|
- return res;
|
|
|
- }
|
|
|
+ @Override
|
|
|
+ public String getName() {
|
|
|
+ return NAME;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Compare Schedulables via weighted fair sharing. In addition, Schedulables
|
|
|
* below their min share get priority over those whose min share is met.
|
|
|
- *
|
|
|
+ *
|
|
|
* Schedulables below their min share are compared by how far below it they
|
|
|
* are as a ratio. For example, if job A has 8 out of a min share of 10 tasks
|
|
|
* and job B has 50 out of a min share of 100, then job B is scheduled next,
|
|
|
* because B is at 50% of its min share and A is at 80% of its min share.
|
|
|
- *
|
|
|
+ *
|
|
|
* Schedulables above their min share are compared by (runningTasks / weight).
|
|
|
* If all weights are equal, slots are given to the job with the fewest tasks;
|
|
|
* otherwise, jobs with more weight get proportionally more slots.
|
|
|
*/
|
|
|
- public static class FairShareComparator implements Comparator<Schedulable>, Serializable {
|
|
|
+ private static class FairShareComparator implements Comparator<Schedulable>,
|
|
|
+ Serializable {
|
|
|
private static final long serialVersionUID = 5564969375856699313L;
|
|
|
|
|
|
@Override
|
|
@@ -85,10 +64,10 @@ class SchedulingAlgorithms {
|
|
|
boolean s1Needy = Resources.lessThan(s1.getResourceUsage(), minShare1);
|
|
|
boolean s2Needy = Resources.lessThan(s2.getResourceUsage(), minShare2);
|
|
|
Resource one = Resources.createResource(1);
|
|
|
- minShareRatio1 = (double) s1.getResourceUsage().getMemory() /
|
|
|
- Resources.max(minShare1, one).getMemory();
|
|
|
- minShareRatio2 = (double) s2.getResourceUsage().getMemory() /
|
|
|
- Resources.max(minShare2, one).getMemory();
|
|
|
+ minShareRatio1 = (double) s1.getResourceUsage().getMemory()
|
|
|
+ / Resources.max(minShare1, one).getMemory();
|
|
|
+ minShareRatio2 = (double) s2.getResourceUsage().getMemory()
|
|
|
+ / Resources.max(minShare2, one).getMemory();
|
|
|
useToWeightRatio1 = s1.getResourceUsage().getMemory() / s1.getWeight();
|
|
|
useToWeightRatio2 = s2.getResourceUsage().getMemory() / s2.getWeight();
|
|
|
int res = 0;
|
|
@@ -98,7 +77,8 @@ class SchedulingAlgorithms {
|
|
|
res = 1;
|
|
|
else if (s1Needy && s2Needy)
|
|
|
res = (int) Math.signum(minShareRatio1 - minShareRatio2);
|
|
|
- else // Neither schedulable is needy
|
|
|
+ else
|
|
|
+ // Neither schedulable is needy
|
|
|
res = (int) Math.signum(useToWeightRatio1 - useToWeightRatio2);
|
|
|
if (res == 0) {
|
|
|
// Apps are tied in fairness ratio. Break the tie by submit time and job
|
|
@@ -111,6 +91,17 @@ class SchedulingAlgorithms {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ public Comparator<Schedulable> getComparator() {
|
|
|
+ return comparator;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void computeShares(Collection<? extends Schedulable> schedulables,
|
|
|
+ Resource totalResources) {
|
|
|
+ computeFairShares(schedulables, totalResources);
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Number of iterations for the binary search in computeFairShares. This is
|
|
|
* equivalent to the number of bits of precision in the output. 25 iterations
|
|
@@ -121,43 +112,42 @@ class SchedulingAlgorithms {
|
|
|
/**
|
|
|
* Given a set of Schedulables and a number of slots, compute their weighted
|
|
|
* fair shares. The min shares and demands of the Schedulables are assumed to
|
|
|
- * be set beforehand. We compute the fairest possible allocation of shares
|
|
|
- * to the Schedulables that respects their min shares and demands.
|
|
|
- *
|
|
|
+ * be set beforehand. We compute the fairest possible allocation of shares to
|
|
|
+ * the Schedulables that respects their min shares and demands.
|
|
|
+ *
|
|
|
* To understand what this method does, we must first define what weighted
|
|
|
* fair sharing means in the presence of minimum shares and demands. If there
|
|
|
* were no minimum shares and every Schedulable had an infinite demand (i.e.
|
|
|
* could launch infinitely many tasks), then weighted fair sharing would be
|
|
|
* achieved if the ratio of slotsAssigned / weight was equal for each
|
|
|
- * Schedulable and all slots were assigned. Minimum shares and demands add
|
|
|
- * two further twists:
|
|
|
- * - Some Schedulables may not have enough tasks to fill all their share.
|
|
|
- * - Some Schedulables may have a min share higher than their assigned share.
|
|
|
- *
|
|
|
- * To deal with these possibilities, we define an assignment of slots as
|
|
|
- * being fair if there exists a ratio R such that:
|
|
|
- * - Schedulables S where S.demand < R * S.weight are assigned share S.demand
|
|
|
- * - Schedulables S where S.minShare > R * S.weight are given share S.minShare
|
|
|
- * - All other Schedulables S are assigned share R * S.weight
|
|
|
- * - The sum of all the shares is totalSlots.
|
|
|
- *
|
|
|
+ * Schedulable and all slots were assigned. Minimum shares and demands add two
|
|
|
+ * further twists: - Some Schedulables may not have enough tasks to fill all
|
|
|
+ * their share. - Some Schedulables may have a min share higher than their
|
|
|
+ * assigned share.
|
|
|
+ *
|
|
|
+ * To deal with these possibilities, we define an assignment of slots as being
|
|
|
+ * fair if there exists a ratio R such that: - Schedulables S where S.demand <
|
|
|
+ * R * S.weight are assigned share S.demand - Schedulables S where S.minShare
|
|
|
+ * > R * S.weight are given share S.minShare - All other Schedulables S are
|
|
|
+ * assigned share R * S.weight - The sum of all the shares is totalSlots.
|
|
|
+ *
|
|
|
* We call R the weight-to-slots ratio because it converts a Schedulable's
|
|
|
* weight to the number of slots it is assigned.
|
|
|
- *
|
|
|
+ *
|
|
|
* We compute a fair allocation by finding a suitable weight-to-slot ratio R.
|
|
|
- * To do this, we use binary search. Given a ratio R, we compute the number
|
|
|
- * of slots that would be used in total with this ratio (the sum of the shares
|
|
|
+ * To do this, we use binary search. Given a ratio R, we compute the number of
|
|
|
+ * slots that would be used in total with this ratio (the sum of the shares
|
|
|
* computed using the conditions above). If this number of slots is less than
|
|
|
* totalSlots, then R is too small and more slots could be assigned. If the
|
|
|
* number of slots is more than totalSlots, then R is too large.
|
|
|
- *
|
|
|
+ *
|
|
|
* We begin the binary search with a lower bound on R of 0 (which means that
|
|
|
* all Schedulables are only given their minShare) and an upper bound computed
|
|
|
* to be large enough that too many slots are given (by doubling R until we
|
|
|
- * either use more than totalSlots slots or we fulfill all jobs' demands).
|
|
|
- * The helper method slotsUsedWithWeightToSlotRatio computes the total number
|
|
|
- * of slots used with a given value of R.
|
|
|
- *
|
|
|
+ * either use more than totalSlots slots or we fulfill all jobs' demands). The
|
|
|
+ * helper method slotsUsedWithWeightToSlotRatio computes the total number of
|
|
|
+ * slots used with a given value of R.
|
|
|
+ *
|
|
|
* The running time of this algorithm is linear in the number of Schedulables,
|
|
|
* because slotsUsedWithWeightToSlotRatio is linear-time and the number of
|
|
|
* iterations of binary search is a constant (dependent on desired precision).
|
|
@@ -168,12 +158,13 @@ class SchedulingAlgorithms {
|
|
|
// at R = 1 and double it until we have either used totalSlots slots or we
|
|
|
// have met all Schedulables' demands (if total demand < totalSlots).
|
|
|
Resource totalDemand = Resources.createResource(0);
|
|
|
- for (Schedulable sched: schedulables) {
|
|
|
+ for (Schedulable sched : schedulables) {
|
|
|
Resources.addTo(totalDemand, sched.getDemand());
|
|
|
}
|
|
|
Resource cap = Resources.min(totalDemand, totalResources);
|
|
|
double rMax = 1.0;
|
|
|
- while (Resources.lessThan(resUsedWithWeightToResRatio(rMax, schedulables), cap)) {
|
|
|
+ while (Resources.lessThan(resUsedWithWeightToResRatio(rMax, schedulables),
|
|
|
+ cap)) {
|
|
|
rMax *= 2.0;
|
|
|
}
|
|
|
// Perform the binary search for up to COMPUTE_FAIR_SHARES_ITERATIONS steps
|
|
@@ -181,27 +172,28 @@ class SchedulingAlgorithms {
|
|
|
double right = rMax;
|
|
|
for (int i = 0; i < COMPUTE_FAIR_SHARES_ITERATIONS; i++) {
|
|
|
double mid = (left + right) / 2.0;
|
|
|
- if (Resources.lessThan(resUsedWithWeightToResRatio(mid, schedulables), cap)) {
|
|
|
+ if (Resources.lessThan(resUsedWithWeightToResRatio(mid, schedulables),
|
|
|
+ cap)) {
|
|
|
left = mid;
|
|
|
} else {
|
|
|
right = mid;
|
|
|
}
|
|
|
}
|
|
|
// Set the fair shares based on the value of R we've converged to
|
|
|
- for (Schedulable sched: schedulables) {
|
|
|
+ for (Schedulable sched : schedulables) {
|
|
|
sched.setFairShare(computeShare(sched, right));
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Compute the number of slots that would be used given a weight-to-slot
|
|
|
- * ratio w2sRatio, for use in the computeFairShares algorithm as described
|
|
|
- * in #{@link SchedulingAlgorithms#computeFairShares(Collection, double)}.
|
|
|
+ * Compute the number of slots that would be used given a weight-to-slot ratio
|
|
|
+ * w2sRatio, for use in the computeFairShares algorithm as described in #
|
|
|
+ * {@link SchedulingAlgorithms#computeFairShares(Collection, double)}.
|
|
|
*/
|
|
|
private static Resource resUsedWithWeightToResRatio(double w2sRatio,
|
|
|
Collection<? extends Schedulable> schedulables) {
|
|
|
Resource slotsTaken = Resources.createResource(0);
|
|
|
- for (Schedulable sched: schedulables) {
|
|
|
+ for (Schedulable sched : schedulables) {
|
|
|
Resource share = computeShare(sched, w2sRatio);
|
|
|
Resources.addTo(slotsTaken, share);
|
|
|
}
|
|
@@ -210,8 +202,8 @@ class SchedulingAlgorithms {
|
|
|
|
|
|
/**
|
|
|
* Compute the resources assigned to a Schedulable given a particular
|
|
|
- * res-to-slot ratio r2sRatio, for use in computeFairShares as described
|
|
|
- * in #{@link SchedulingAlgorithms#computeFairShares(Collection, double)}.
|
|
|
+ * res-to-slot ratio r2sRatio, for use in computeFairShares as described in #
|
|
|
+ * {@link SchedulingAlgorithms#computeFairShares(Collection, double)}.
|
|
|
*/
|
|
|
private static Resource computeShare(Schedulable sched, double r2sRatio) {
|
|
|
double share = sched.getWeight() * r2sRatio;
|