|
@@ -47,8 +47,12 @@ public class DominantResourceFairnessPolicy extends SchedulingPolicy {
|
|
|
|
|
|
public static final String NAME = "DRF";
|
|
|
|
|
|
- private static final DominantResourceFairnessComparator COMPARATOR =
|
|
|
- new DominantResourceFairnessComparator();
|
|
|
+ private static final int NUM_RESOURCES =
|
|
|
+ ResourceUtils.getNumberOfKnownResourceTypes();
|
|
|
+ private static final DominantResourceFairnessComparator COMPARATORN =
|
|
|
+ new DominantResourceFairnessComparatorN();
|
|
|
+ private static final DominantResourceFairnessComparator COMPARATOR2 =
|
|
|
+ new DominantResourceFairnessComparator2();
|
|
|
private static final DominantResourceCalculator CALCULATOR =
|
|
|
new DominantResourceCalculator();
|
|
|
|
|
@@ -59,7 +63,15 @@ public class DominantResourceFairnessPolicy extends SchedulingPolicy {
|
|
|
|
|
|
@Override
|
|
|
public Comparator<Schedulable> getComparator() {
|
|
|
- return COMPARATOR;
|
|
|
+ if (NUM_RESOURCES == 2) {
|
|
|
+ // To improve performance, if we know we're dealing with the common
|
|
|
+ // case of only CPU and memory, then handle CPU and memory explicitly.
|
|
|
+ return COMPARATOR2;
|
|
|
+ } else {
|
|
|
+ // Otherwise, do it the generic way.
|
|
|
+ return COMPARATORN;
|
|
|
+ }
|
|
|
+
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -107,25 +119,56 @@ public class DominantResourceFairnessPolicy extends SchedulingPolicy {
|
|
|
|
|
|
@Override
|
|
|
public void initialize(FSContext fsContext) {
|
|
|
- COMPARATOR.setFSContext(fsContext);
|
|
|
+ COMPARATORN.setFSContext(fsContext);
|
|
|
+ COMPARATOR2.setFSContext(fsContext);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* This class compares two {@link Schedulable} instances according to the
|
|
|
* DRF policy. If neither instance is below min share, approximate fair share
|
|
|
- * ratios are compared.
|
|
|
+ * ratios are compared. Subclasses of this class will do the actual work of
|
|
|
+ * the comparison, specialized for the number of configured resource types.
|
|
|
*/
|
|
|
- public static class DominantResourceFairnessComparator
|
|
|
+ public abstract static class DominantResourceFairnessComparator
|
|
|
implements Comparator<Schedulable> {
|
|
|
- private FSContext fsContext;
|
|
|
+ protected FSContext fsContext;
|
|
|
|
|
|
public void setFSContext(FSContext fsContext) {
|
|
|
this.fsContext = fsContext;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * This method is used when apps are tied in fairness ratio. It breaks
|
|
|
+ * the tie by submit time and job name to get a deterministic ordering,
|
|
|
+ * which is useful for unit tests.
|
|
|
+ *
|
|
|
+ * @param s1 the first item to compare
|
|
|
+ * @param s2 the second item to compare
|
|
|
+ * @return < 0, 0, or > 0 if the first item is less than, equal to,
|
|
|
+ * or greater than the second item, respectively
|
|
|
+ */
|
|
|
+ protected int compareAttribrutes(Schedulable s1, Schedulable s2) {
|
|
|
+ int res = (int) Math.signum(s1.getStartTime() - s2.getStartTime());
|
|
|
+
|
|
|
+ if (res == 0) {
|
|
|
+ res = s1.getName().compareTo(s2.getName());
|
|
|
+ }
|
|
|
+
|
|
|
+ return res;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * This class compares two {@link Schedulable} instances according to the
|
|
|
+ * DRF policy. If neither instance is below min share, approximate fair share
|
|
|
+ * ratios are compared. This class makes no assumptions about the number of
|
|
|
+ * resource types.
|
|
|
+ */
|
|
|
+ @VisibleForTesting
|
|
|
+ static class DominantResourceFairnessComparatorN
|
|
|
+ extends DominantResourceFairnessComparator {
|
|
|
@Override
|
|
|
public int compare(Schedulable s1, Schedulable s2) {
|
|
|
- ResourceInformation[] info = ResourceUtils.getResourceTypesArray();
|
|
|
Resource usage1 = s1.getResourceUsage();
|
|
|
Resource usage2 = s2.getResourceUsage();
|
|
|
Resource minShare1 = s1.getMinShare();
|
|
@@ -135,8 +178,8 @@ public class DominantResourceFairnessPolicy extends SchedulingPolicy {
|
|
|
// These arrays hold the usage, fair, and min share ratios for each
|
|
|
// resource type. ratios[0][x] are the usage ratios, ratios[1][x] are
|
|
|
// the fair share ratios, and ratios[2][x] are the min share ratios.
|
|
|
- float[][] ratios1 = new float[info.length][3];
|
|
|
- float[][] ratios2 = new float[info.length][3];
|
|
|
+ float[][] ratios1 = new float[NUM_RESOURCES][3];
|
|
|
+ float[][] ratios2 = new float[NUM_RESOURCES][3];
|
|
|
|
|
|
// Calculate cluster shares and approximate fair shares for each
|
|
|
// resource type of both schedulables.
|
|
@@ -155,7 +198,7 @@ public class DominantResourceFairnessPolicy extends SchedulingPolicy {
|
|
|
usage2.getResources()[dominant2].getValue() <
|
|
|
minShare2.getResources()[dominant2].getValue();
|
|
|
|
|
|
- int res = 0;
|
|
|
+ int res;
|
|
|
|
|
|
if (!s2Needy && !s1Needy) {
|
|
|
// Sort shares by usage ratio and compare them by approximate fair share
|
|
@@ -176,13 +219,7 @@ public class DominantResourceFairnessPolicy extends SchedulingPolicy {
|
|
|
}
|
|
|
|
|
|
if (res == 0) {
|
|
|
- // Apps are tied in fairness ratio. Break the tie by submit time and job
|
|
|
- // name to get a deterministic ordering, which is useful for unit tests.
|
|
|
- res = (int) Math.signum(s1.getStartTime() - s2.getStartTime());
|
|
|
-
|
|
|
- if (res == 0) {
|
|
|
- res = s1.getName().compareTo(s2.getName());
|
|
|
- }
|
|
|
+ res = compareAttribrutes(s1, s2);
|
|
|
}
|
|
|
|
|
|
return res;
|
|
@@ -206,7 +243,7 @@ public class DominantResourceFairnessPolicy extends SchedulingPolicy {
|
|
|
|
|
|
/**
|
|
|
* Calculate a resource's usage ratio and approximate fair share ratio.
|
|
|
- * The {@code shares} array will be populated with both the usage ratio
|
|
|
+ * The {@code ratios} array will be populated with both the usage ratio
|
|
|
* and the approximate fair share ratio for each resource type. The usage
|
|
|
* ratio is calculated as {@code resource} divided by {@code cluster}.
|
|
|
* The approximate fair share ratio is calculated as the usage ratio
|
|
@@ -221,18 +258,18 @@ public class DominantResourceFairnessPolicy extends SchedulingPolicy {
|
|
|
* because when comparing resources, the resource with the higher weight
|
|
|
* will be assigned by the scheduler a proportionally higher fair share.
|
|
|
*
|
|
|
- * The {@code shares} array must be at least <i>n</i> x 2, where <i>n</i>
|
|
|
+ * The {@code ratios} array must be at least <i>n</i> x 2, where <i>n</i>
|
|
|
* is the number of resource types. Only the first and second indices of
|
|
|
- * the inner arrays in the {@code shares} array will be used, e.g.
|
|
|
- * {@code shares[x][0]} and {@code shares[x][1]}.
|
|
|
+ * the inner arrays in the {@code ratios} array will be used, e.g.
|
|
|
+ * {@code ratios[x][0]} and {@code ratios[x][1]}.
|
|
|
*
|
|
|
* The return value will be the index of the dominant resource type in the
|
|
|
- * {@code shares} array. The dominant resource is the resource type for
|
|
|
+ * {@code ratios} array. The dominant resource is the resource type for
|
|
|
* which {@code resource} has the largest usage ratio.
|
|
|
*
|
|
|
* @param resource the resource for which to calculate ratios
|
|
|
* @param cluster the total cluster resources
|
|
|
- * @param ratios the shares array to populate
|
|
|
+ * @param ratios the share ratios array to populate
|
|
|
* @param weight the resource weight
|
|
|
* @return the index of the resource type with the largest cluster share
|
|
|
*/
|
|
@@ -275,7 +312,7 @@ public class DominantResourceFairnessPolicy extends SchedulingPolicy {
|
|
|
*
|
|
|
* @param resource the resource for which to calculate min shares
|
|
|
* @param minShare the min share
|
|
|
- * @param ratios the shares array to populate
|
|
|
+ * @param ratios the share ratios array to populate
|
|
|
*/
|
|
|
@VisibleForTesting
|
|
|
void calculateMinShareRatios(Resource resource, Resource minShare,
|
|
@@ -320,4 +357,155 @@ public class DominantResourceFairnessPolicy extends SchedulingPolicy {
|
|
|
return ret;
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * This class compares two {@link Schedulable} instances according to the
|
|
|
+ * DRF policy in the special case that only CPU and memory are configured.
|
|
|
+ * If neither instance is below min share, approximate fair share
|
|
|
+ * ratios are compared.
|
|
|
+ */
|
|
|
+ @VisibleForTesting
|
|
|
+ static class DominantResourceFairnessComparator2
|
|
|
+ extends DominantResourceFairnessComparator {
|
|
|
+ @Override
|
|
|
+ public int compare(Schedulable s1, Schedulable s2) {
|
|
|
+ ResourceInformation[] resourceInfo1 =
|
|
|
+ s1.getResourceUsage().getResources();
|
|
|
+ ResourceInformation[] resourceInfo2 =
|
|
|
+ s2.getResourceUsage().getResources();
|
|
|
+ ResourceInformation[] minShareInfo1 = s1.getMinShare().getResources();
|
|
|
+ ResourceInformation[] minShareInfo2 = s2.getMinShare().getResources();
|
|
|
+ ResourceInformation[] clusterInfo =
|
|
|
+ fsContext.getClusterResource().getResources();
|
|
|
+ double[] shares1 = new double[2];
|
|
|
+ double[] shares2 = new double[2];
|
|
|
+
|
|
|
+ int dominant1 = calculateClusterAndFairRatios(resourceInfo1,
|
|
|
+ s1.getWeight(), clusterInfo, shares1);
|
|
|
+ int dominant2 = calculateClusterAndFairRatios(resourceInfo2,
|
|
|
+ s2.getWeight(), clusterInfo, shares2);
|
|
|
+
|
|
|
+ // A queue is needy for its min share if its dominant resource
|
|
|
+ // (with respect to the cluster capacity) is below its configured min
|
|
|
+ // share for that resource
|
|
|
+ boolean s1Needy = resourceInfo1[dominant1].getValue() <
|
|
|
+ minShareInfo1[dominant1].getValue();
|
|
|
+ boolean s2Needy = resourceInfo1[dominant2].getValue() <
|
|
|
+ minShareInfo2[dominant2].getValue();
|
|
|
+
|
|
|
+ int res;
|
|
|
+
|
|
|
+ if (!s2Needy && !s1Needy) {
|
|
|
+ res = (int) Math.signum(shares1[dominant1] - shares2[dominant2]);
|
|
|
+
|
|
|
+ if (res == 0) {
|
|
|
+ // Because memory and CPU are indices 0 and 1, we can find the
|
|
|
+ // non-dominant index by subtracting the dominant index from 1.
|
|
|
+ res = (int) Math.signum(shares1[1 - dominant1] -
|
|
|
+ shares2[1 - dominant2]);
|
|
|
+ }
|
|
|
+ } else if (s1Needy && !s2Needy) {
|
|
|
+ res = -1;
|
|
|
+ } else if (s2Needy && !s1Needy) {
|
|
|
+ res = 1;
|
|
|
+ } else {
|
|
|
+ double[] minShares1 =
|
|
|
+ calculateMinShareRatios(resourceInfo1, minShareInfo1);
|
|
|
+ double[] minShares2 =
|
|
|
+ calculateMinShareRatios(resourceInfo2, minShareInfo2);
|
|
|
+
|
|
|
+ res = (int) Math.signum(minShares1[dominant1] - minShares2[dominant2]);
|
|
|
+
|
|
|
+ if (res == 0) {
|
|
|
+ res = (int) Math.signum(minShares1[1 - dominant1] -
|
|
|
+ minShares2[1 - dominant2]);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (res == 0) {
|
|
|
+ res = compareAttribrutes(s1, s2);
|
|
|
+ }
|
|
|
+
|
|
|
+ return res;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Calculate a resource's usage ratio and approximate fair share ratio
|
|
|
+ * assuming that CPU and memory are the only configured resource types.
|
|
|
+ * The {@code shares} array will be populated with the approximate fair
|
|
|
+ * share ratio for each resource type. The approximate fair share ratio
|
|
|
+ * is calculated as {@code resourceInfo} divided by {@code cluster} and
|
|
|
+ * the {@code weight}. If the cluster's resources are 100MB and
|
|
|
+ * 10 vcores, the usage ({@code resourceInfo}) is 10 MB and 5 CPU, and the
|
|
|
+ * weights are 2, the fair share ratios will be 0.05 and 0.25.
|
|
|
+ *
|
|
|
+ * The approximate fair share ratio is the usage divided by the
|
|
|
+ * approximate fair share, i.e. the cluster resources times the weight.
|
|
|
+ * The approximate fair share is an acceptable proxy for the fair share
|
|
|
+ * because when comparing resources, the resource with the higher weight
|
|
|
+ * will be assigned by the scheduler a proportionally higher fair share.
|
|
|
+ *
|
|
|
+ * The length of the {@code shares} array must be at least 2.
|
|
|
+ *
|
|
|
+ * The return value will be the index of the dominant resource type in the
|
|
|
+ * {@code shares} array. The dominant resource is the resource type for
|
|
|
+ * which {@code resourceInfo} has the largest usage ratio.
|
|
|
+ *
|
|
|
+ * @param resourceInfo the resource for which to calculate ratios
|
|
|
+ * @param weight the resource weight
|
|
|
+ * @param clusterInfo the total cluster resources
|
|
|
+ * @param shares the share ratios array to populate
|
|
|
+ * @return the index of the resource type with the largest cluster share
|
|
|
+ */
|
|
|
+ @VisibleForTesting
|
|
|
+ int calculateClusterAndFairRatios(ResourceInformation[] resourceInfo,
|
|
|
+ float weight, ResourceInformation[] clusterInfo, double[] shares) {
|
|
|
+ int dominant;
|
|
|
+
|
|
|
+ shares[Resource.MEMORY_INDEX] =
|
|
|
+ ((double) resourceInfo[Resource.MEMORY_INDEX].getValue()) /
|
|
|
+ clusterInfo[Resource.MEMORY_INDEX].getValue();
|
|
|
+ shares[Resource.VCORES_INDEX] =
|
|
|
+ ((double) resourceInfo[Resource.VCORES_INDEX].getValue()) /
|
|
|
+ clusterInfo[Resource.VCORES_INDEX].getValue();
|
|
|
+ dominant =
|
|
|
+ shares[Resource.VCORES_INDEX] > shares[Resource.MEMORY_INDEX] ?
|
|
|
+ Resource.VCORES_INDEX : Resource.MEMORY_INDEX;
|
|
|
+
|
|
|
+ shares[Resource.MEMORY_INDEX] /= weight;
|
|
|
+ shares[Resource.VCORES_INDEX] /= weight;
|
|
|
+
|
|
|
+ return dominant;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Calculate a resource's min share ratios assuming that CPU and memory
|
|
|
+ * are the only configured resource types. The return array will be
|
|
|
+ * populated with the {@code resourceInfo} divided by {@code minShareInfo}
|
|
|
+ * for each resource type. If the min shares are 5 MB and 10 vcores, and
|
|
|
+ * the usage ({@code resourceInfo}) is 10 MB and 5 CPU, the ratios will
|
|
|
+ * be 2 and 0.5.
|
|
|
+ *
|
|
|
+ * The length of the {@code ratios} array must be 2.
|
|
|
+ *
|
|
|
+ * @param resourceInfo the resource for which to calculate min shares
|
|
|
+ * @param minShareInfo the min share
|
|
|
+ * @return the share ratios
|
|
|
+ */
|
|
|
+ @VisibleForTesting
|
|
|
+ double[] calculateMinShareRatios(ResourceInformation[] resourceInfo,
|
|
|
+ ResourceInformation[] minShareInfo) {
|
|
|
+ double[] minShares1 = new double[2];
|
|
|
+
|
|
|
+ // both are needy below min share
|
|
|
+ minShares1[Resource.MEMORY_INDEX] =
|
|
|
+ ((double) resourceInfo[Resource.MEMORY_INDEX].getValue()) /
|
|
|
+ minShareInfo[Resource.MEMORY_INDEX].getValue();
|
|
|
+ minShares1[Resource.VCORES_INDEX] =
|
|
|
+ ((double) resourceInfo[Resource.VCORES_INDEX].getValue()) /
|
|
|
+ minShareInfo[Resource.VCORES_INDEX].getValue();
|
|
|
+
|
|
|
+ return minShares1;
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|