|
@@ -22,25 +22,31 @@ import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
|
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
|
|
import org.apache.hadoop.yarn.api.records.Resource;
|
|
|
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
|
|
+import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
|
+import org.apache.hadoop.yarn.util.UnitsConversionUtil;
|
|
|
+
|
|
|
+import java.util.HashSet;
|
|
|
+import java.util.Set;
|
|
|
|
|
|
/**
|
|
|
- * A {@link ResourceCalculator} which uses the concept of
|
|
|
+ * A {@link ResourceCalculator} which uses the concept of
|
|
|
* <em>dominant resource</em> to compare multi-dimensional resources.
|
|
|
*
|
|
|
- * Essentially the idea is that the in a multi-resource environment,
|
|
|
- * the resource allocation should be determined by the dominant share
|
|
|
- * of an entity (user or queue), which is the maximum share that the
|
|
|
- * entity has been allocated of any resource.
|
|
|
- *
|
|
|
- * In a nutshell, it seeks to maximize the minimum dominant share across
|
|
|
- * all entities.
|
|
|
- *
|
|
|
+ * Essentially the idea is that the in a multi-resource environment,
|
|
|
+ * the resource allocation should be determined by the dominant share
|
|
|
+ * of an entity (user or queue), which is the maximum share that the
|
|
|
+ * entity has been allocated of any resource.
|
|
|
+ *
|
|
|
+ * In a nutshell, it seeks to maximize the minimum dominant share across
|
|
|
+ * all entities.
|
|
|
+ *
|
|
|
* For example, if user A runs CPU-heavy tasks and user B runs
|
|
|
- * memory-heavy tasks, it attempts to equalize CPU share of user A
|
|
|
- * with Memory-share of user B.
|
|
|
- *
|
|
|
+ * memory-heavy tasks, it attempts to equalize CPU share of user A
|
|
|
+ * with Memory-share of user B.
|
|
|
+ *
|
|
|
* In the single resource case, it reduces to max-min fairness for that resource.
|
|
|
- *
|
|
|
+ *
|
|
|
* See the Dominant Resource Fairness paper for more details:
|
|
|
* www.cs.berkeley.edu/~matei/papers/2011/nsdi_drf.pdf
|
|
|
*/
|
|
@@ -50,6 +56,56 @@ public class DominantResourceCalculator extends ResourceCalculator {
|
|
|
private static final Log LOG =
|
|
|
LogFactory.getLog(DominantResourceCalculator.class);
|
|
|
|
|
|
+
|
|
|
+ private Set<String> resourceNames;
|
|
|
+
|
|
|
+ public DominantResourceCalculator() {
|
|
|
+ resourceNames = new HashSet<>();
|
|
|
+ resourceNames.add(ResourceInformation.MEMORY.getName());
|
|
|
+ resourceNames.add(ResourceInformation.VCORES.getName());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Compare two resources - if the value for every resource type for the lhs
|
|
|
+ * is greater than that of the rhs, return 1. If the value for every resource
|
|
|
+ * type in the lhs is less than the rhs, return -1. Otherwise, return 0
|
|
|
+ *
|
|
|
+ * @param lhs resource to be compared
|
|
|
+ * @param rhs resource to be compared
|
|
|
+ * @return 0, 1, or -1
|
|
|
+ */
|
|
|
+ private int compare(Resource lhs, Resource rhs) {
|
|
|
+ boolean lhsGreater = false;
|
|
|
+ boolean rhsGreater = false;
|
|
|
+ int ret = 0;
|
|
|
+
|
|
|
+ for (String rName : resourceNames) {
|
|
|
+ try {
|
|
|
+ ResourceInformation lhsResourceInformation =
|
|
|
+ lhs.getResourceInformation(rName);
|
|
|
+ ResourceInformation rhsResourceInformation =
|
|
|
+ rhs.getResourceInformation(rName);
|
|
|
+ int diff = lhsResourceInformation.compareTo(rhsResourceInformation);
|
|
|
+ if (diff >= 1) {
|
|
|
+ lhsGreater = true;
|
|
|
+ } else if (diff <= -1) {
|
|
|
+ rhsGreater = true;
|
|
|
+ }
|
|
|
+ } catch (YarnException ye) {
|
|
|
+ throw new IllegalArgumentException(
|
|
|
+ "Error getting resource information for " + rName, ye);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (lhsGreater && rhsGreater) {
|
|
|
+ ret = 0;
|
|
|
+ } else if (lhsGreater) {
|
|
|
+ ret = 1;
|
|
|
+ } else if (rhsGreater) {
|
|
|
+ ret = -1;
|
|
|
+ }
|
|
|
+ return ret;
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public int compare(Resource clusterResource, Resource lhs, Resource rhs,
|
|
|
boolean singleType) {
|
|
@@ -57,25 +113,14 @@ public class DominantResourceCalculator extends ResourceCalculator {
|
|
|
if (lhs.equals(rhs)) {
|
|
|
return 0;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
if (isInvalidDivisor(clusterResource)) {
|
|
|
- if ((lhs.getMemorySize() < rhs.getMemorySize() &&
|
|
|
- lhs.getVirtualCores() > rhs.getVirtualCores()) ||
|
|
|
- (lhs.getMemorySize() > rhs.getMemorySize() &&
|
|
|
- lhs.getVirtualCores() < rhs.getVirtualCores())) {
|
|
|
- return 0;
|
|
|
- } else if (lhs.getMemorySize() > rhs.getMemorySize()
|
|
|
- || lhs.getVirtualCores() > rhs.getVirtualCores()) {
|
|
|
- return 1;
|
|
|
- } else if (lhs.getMemorySize() < rhs.getMemorySize()
|
|
|
- || lhs.getVirtualCores() < rhs.getVirtualCores()) {
|
|
|
- return -1;
|
|
|
- }
|
|
|
+ return this.compare(lhs, rhs);
|
|
|
}
|
|
|
|
|
|
float l = getResourceAsValue(clusterResource, lhs, true);
|
|
|
float r = getResourceAsValue(clusterResource, rhs, true);
|
|
|
-
|
|
|
+
|
|
|
if (l < r) {
|
|
|
return -1;
|
|
|
} else if (l > r) {
|
|
@@ -83,75 +128,142 @@ public class DominantResourceCalculator extends ResourceCalculator {
|
|
|
} else if (!singleType) {
|
|
|
l = getResourceAsValue(clusterResource, lhs, false);
|
|
|
r = getResourceAsValue(clusterResource, rhs, false);
|
|
|
+
|
|
|
if (l < r) {
|
|
|
return -1;
|
|
|
} else if (l > r) {
|
|
|
return 1;
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Use 'dominant' for now since we only have 2 resources - gives us a slight
|
|
|
* performance boost.
|
|
|
- *
|
|
|
+ * <p></p>
|
|
|
* Once we add more resources, we'll need a more complicated (and slightly
|
|
|
* less performant algorithm).
|
|
|
*/
|
|
|
- protected float getResourceAsValue(
|
|
|
- Resource clusterResource, Resource resource, boolean dominant) {
|
|
|
- // Just use 'dominant' resource
|
|
|
- return (dominant) ?
|
|
|
- Math.max(
|
|
|
- (float)resource.getMemorySize() / clusterResource.getMemorySize(),
|
|
|
- (float)resource.getVirtualCores() / clusterResource.getVirtualCores()
|
|
|
- )
|
|
|
- :
|
|
|
- Math.min(
|
|
|
- (float)resource.getMemorySize() / clusterResource.getMemorySize(),
|
|
|
- (float)resource.getVirtualCores() / clusterResource.getVirtualCores()
|
|
|
- );
|
|
|
- }
|
|
|
-
|
|
|
+ protected float getResourceAsValue(Resource clusterResource,
|
|
|
+ Resource resource, boolean dominant) {
|
|
|
+
|
|
|
+ float min = Float.MAX_VALUE;
|
|
|
+ float max = 0.0f;
|
|
|
+ for (String rName : resourceNames) {
|
|
|
+ try {
|
|
|
+ ResourceInformation clusterResourceResourceInformation =
|
|
|
+ clusterResource.getResourceInformation(rName);
|
|
|
+ ResourceInformation resourceInformation =
|
|
|
+ resource.getResourceInformation(rName);
|
|
|
+ Long resourceValue = UnitsConversionUtil
|
|
|
+ .convert(resourceInformation.getUnits(),
|
|
|
+ clusterResourceResourceInformation.getUnits(),
|
|
|
+ resourceInformation.getValue());
|
|
|
+ float tmp =
|
|
|
+ (float) resourceValue / (float) clusterResourceResourceInformation
|
|
|
+ .getValue();
|
|
|
+ min = min < tmp ? min : tmp;
|
|
|
+ max = max > tmp ? max : tmp;
|
|
|
+ } catch (YarnException ye) {
|
|
|
+ throw new IllegalArgumentException(
|
|
|
+ "Error getting resource information for " + resource, ye);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return (dominant) ? max : min;
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public long computeAvailableContainers(Resource available, Resource required) {
|
|
|
- return Math.min(
|
|
|
- available.getMemorySize() / required.getMemorySize(),
|
|
|
- available.getVirtualCores() / required.getVirtualCores());
|
|
|
+ long min = Long.MAX_VALUE;
|
|
|
+ for (String resource : resourceNames) {
|
|
|
+ try {
|
|
|
+ ResourceInformation availableResource =
|
|
|
+ available.getResourceInformation(resource);
|
|
|
+ ResourceInformation requiredResource =
|
|
|
+ required.getResourceInformation(resource);
|
|
|
+ Long requiredResourceValue = UnitsConversionUtil
|
|
|
+ .convert(requiredResource.getUnits(), availableResource.getUnits(),
|
|
|
+ requiredResource.getValue());
|
|
|
+ Long tmp = availableResource.getValue() / requiredResourceValue;
|
|
|
+ min = min < tmp ? min : tmp;
|
|
|
+ } catch (YarnException ye) {
|
|
|
+ throw new IllegalArgumentException(
|
|
|
+ "Error getting resource information for " + resource, ye);
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+ return min > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) min;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public float divide(Resource clusterResource,
|
|
|
+ public float divide(Resource clusterResource,
|
|
|
Resource numerator, Resource denominator) {
|
|
|
- return
|
|
|
- getResourceAsValue(clusterResource, numerator, true) /
|
|
|
+ return
|
|
|
+ getResourceAsValue(clusterResource, numerator, true) /
|
|
|
getResourceAsValue(clusterResource, denominator, true);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
@Override
|
|
|
public boolean isInvalidDivisor(Resource r) {
|
|
|
- if (r.getMemorySize() == 0.0f || r.getVirtualCores() == 0.0f) {
|
|
|
- return true;
|
|
|
+ for (String resource : resourceNames) {
|
|
|
+ try {
|
|
|
+ if (r.getResourceValue(resource).equals(0L)) {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ } catch (YarnException ye) {
|
|
|
+ throw new IllegalArgumentException(
|
|
|
+ "Error getting resource value for " + resource, ye);
|
|
|
+ }
|
|
|
}
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public float ratio(Resource a, Resource b) {
|
|
|
- return Math.max(
|
|
|
- (float)a.getMemorySize()/b.getMemorySize(),
|
|
|
- (float)a.getVirtualCores()/b.getVirtualCores()
|
|
|
- );
|
|
|
+ float ratio = 0.0f;
|
|
|
+ for (String resource : resourceNames) {
|
|
|
+ try {
|
|
|
+ ResourceInformation aResourceInformation =
|
|
|
+ a.getResourceInformation(resource);
|
|
|
+ ResourceInformation bResourceInformation =
|
|
|
+ b.getResourceInformation(resource);
|
|
|
+ Long bResourceValue = UnitsConversionUtil
|
|
|
+ .convert(bResourceInformation.getUnits(),
|
|
|
+ aResourceInformation.getUnits(),
|
|
|
+ bResourceInformation.getValue());
|
|
|
+ float tmp =
|
|
|
+ (float) aResourceInformation.getValue() / (float) bResourceValue;
|
|
|
+ ratio = ratio > tmp ? ratio : tmp;
|
|
|
+ } catch (YarnException ye) {
|
|
|
+ throw new IllegalArgumentException(
|
|
|
+ "Error getting resource information for " + resource, ye);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return ratio;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public Resource divideAndCeil(Resource numerator, int denominator) {
|
|
|
- return Resources.createResource(
|
|
|
- divideAndCeil(numerator.getMemorySize(), denominator),
|
|
|
- divideAndCeil(numerator.getVirtualCores(), denominator)
|
|
|
- );
|
|
|
+ return divideAndCeil(numerator, (long) denominator);
|
|
|
+ }
|
|
|
+
|
|
|
+ public Resource divideAndCeil(Resource numerator, long denominator) {
|
|
|
+ Resource ret = Resources.createResource(0, 0);
|
|
|
+ for (String resource : resourceNames) {
|
|
|
+ try {
|
|
|
+ ResourceInformation resourceInformation = ResourceInformation
|
|
|
+ .newInstance(numerator.getResourceInformation(resource));
|
|
|
+ resourceInformation.setValue(
|
|
|
+ divideAndCeil(resourceInformation.getValue(), denominator));
|
|
|
+ ret.setResourceInformation(resource, resourceInformation);
|
|
|
+ } catch (YarnException ye) {
|
|
|
+ throw new IllegalArgumentException(
|
|
|
+ "Error getting resource information for " + resource, ye);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return ret;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -164,73 +276,127 @@ public class DominantResourceCalculator extends ResourceCalculator {
|
|
|
|
|
|
@Override
|
|
|
public Resource normalize(Resource r, Resource minimumResource,
|
|
|
- Resource maximumResource, Resource stepFactor) {
|
|
|
- if (stepFactor.getMemorySize() == 0 || stepFactor.getVirtualCores() == 0) {
|
|
|
- Resource step = Resources.clone(stepFactor);
|
|
|
- if (stepFactor.getMemorySize() == 0) {
|
|
|
- LOG.error("Memory cannot be allocated in increments of zero. Assuming "
|
|
|
- + minimumResource.getMemorySize() + "MB increment size. "
|
|
|
- + "Please ensure the scheduler configuration is correct.");
|
|
|
- step.setMemorySize(minimumResource.getMemorySize());
|
|
|
- }
|
|
|
+ Resource maximumResource, Resource stepFactor) {
|
|
|
+ Resource ret = Resources.createResource(0, 0);
|
|
|
+ for (String resource : resourceNames) {
|
|
|
+ try {
|
|
|
+ ResourceInformation rResourceInformation =
|
|
|
+ r.getResourceInformation(resource);
|
|
|
+ ResourceInformation minimumResourceInformation =
|
|
|
+ minimumResource.getResourceInformation(resource);
|
|
|
+ ResourceInformation maximumResourceInformation =
|
|
|
+ maximumResource.getResourceInformation(resource);
|
|
|
+ ResourceInformation stepFactorResourceInformation =
|
|
|
+ stepFactor.getResourceInformation(resource);
|
|
|
+ ResourceInformation tmp =
|
|
|
+ ResourceInformation.newInstance(rResourceInformation);
|
|
|
|
|
|
- if (stepFactor.getVirtualCores() == 0) {
|
|
|
- LOG.error("VCore cannot be allocated in increments of zero. Assuming "
|
|
|
- + minimumResource.getVirtualCores() + "VCores increment size. "
|
|
|
- + "Please ensure the scheduler configuration is correct.");
|
|
|
- step.setVirtualCores(minimumResource.getVirtualCores());
|
|
|
- }
|
|
|
+ Long rValue = rResourceInformation.getValue();
|
|
|
+ Long minimumValue = UnitsConversionUtil
|
|
|
+ .convert(minimumResourceInformation.getUnits(),
|
|
|
+ rResourceInformation.getUnits(),
|
|
|
+ minimumResourceInformation.getValue());
|
|
|
+ Long maximumValue = UnitsConversionUtil
|
|
|
+ .convert(maximumResourceInformation.getUnits(),
|
|
|
+ rResourceInformation.getUnits(),
|
|
|
+ maximumResourceInformation.getValue());
|
|
|
+ Long stepFactorValue = UnitsConversionUtil
|
|
|
+ .convert(stepFactorResourceInformation.getUnits(),
|
|
|
+ rResourceInformation.getUnits(),
|
|
|
+ stepFactorResourceInformation.getValue());
|
|
|
|
|
|
- stepFactor = step;
|
|
|
+ tmp.setValue(
|
|
|
+ Math.min(roundUp(Math.max(rValue, minimumValue), stepFactorValue),
|
|
|
+ maximumValue));
|
|
|
+ ret.setResourceInformation(resource, tmp);
|
|
|
+ } catch (YarnException ye) {
|
|
|
+ throw new IllegalArgumentException(
|
|
|
+ "Error getting resource information for " + resource, ye);
|
|
|
+ }
|
|
|
}
|
|
|
-
|
|
|
- long normalizedMemory = Math.min(
|
|
|
- roundUp(
|
|
|
- Math.max(r.getMemorySize(), minimumResource.getMemorySize()),
|
|
|
- stepFactor.getMemorySize()),
|
|
|
- maximumResource.getMemorySize());
|
|
|
- int normalizedCores = Math.min(
|
|
|
- roundUp(
|
|
|
- Math.max(r.getVirtualCores(), minimumResource.getVirtualCores()),
|
|
|
- stepFactor.getVirtualCores()),
|
|
|
- maximumResource.getVirtualCores());
|
|
|
- return Resources.createResource(normalizedMemory,
|
|
|
- normalizedCores);
|
|
|
+ return ret;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public Resource roundUp(Resource r, Resource stepFactor) {
|
|
|
- return Resources.createResource(
|
|
|
- roundUp(r.getMemorySize(), stepFactor.getMemorySize()),
|
|
|
- roundUp(r.getVirtualCores(), stepFactor.getVirtualCores())
|
|
|
- );
|
|
|
+ return this.rounding(r, stepFactor, true);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public Resource roundDown(Resource r, Resource stepFactor) {
|
|
|
- return Resources.createResource(
|
|
|
- roundDown(r.getMemorySize(), stepFactor.getMemorySize()),
|
|
|
- roundDown(r.getVirtualCores(), stepFactor.getVirtualCores())
|
|
|
- );
|
|
|
+ return this.rounding(r, stepFactor, false);
|
|
|
+ }
|
|
|
+
|
|
|
+ private Resource rounding(Resource r, Resource stepFactor, boolean roundUp) {
|
|
|
+ Resource ret = Resources.createResource(0, 0);
|
|
|
+ for (String resource : resourceNames) {
|
|
|
+ try {
|
|
|
+ ResourceInformation rResourceInformation =
|
|
|
+ r.getResourceInformation(resource);
|
|
|
+ ResourceInformation stepFactorResourceInformation =
|
|
|
+ stepFactor.getResourceInformation(resource);
|
|
|
+ ResourceInformation tmp =
|
|
|
+ ResourceInformation.newInstance(rResourceInformation);
|
|
|
+
|
|
|
+ Long rValue = rResourceInformation.getValue();
|
|
|
+ Long stepFactorValue = UnitsConversionUtil
|
|
|
+ .convert(stepFactorResourceInformation.getUnits(),
|
|
|
+ rResourceInformation.getUnits(),
|
|
|
+ stepFactorResourceInformation.getValue());
|
|
|
+
|
|
|
+ Long value = roundUp ? roundUp(rValue, stepFactorValue) :
|
|
|
+ roundDown(rValue, stepFactorValue);
|
|
|
+ tmp.setValue(value);
|
|
|
+ ret.setResourceInformation(resource, tmp);
|
|
|
+ } catch (YarnException ye) {
|
|
|
+ throw new IllegalArgumentException(
|
|
|
+ "Error getting resource information for " + resource, ye);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return ret;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public Resource multiplyAndNormalizeUp(Resource r, double by,
|
|
|
Resource stepFactor) {
|
|
|
- return Resources.createResource(
|
|
|
- roundUp((long) Math.ceil((float) (r.getMemorySize() * by)),
|
|
|
- stepFactor.getMemorySize()),
|
|
|
- roundUp((int) Math.ceil((float) (r.getVirtualCores() * by)),
|
|
|
- stepFactor.getVirtualCores()));
|
|
|
+ return this.multiplyAndNormalize(r, by, stepFactor, true);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public Resource multiplyAndNormalizeDown(Resource r, double by,
|
|
|
Resource stepFactor) {
|
|
|
- return Resources.createResource(
|
|
|
- roundDown((long) (r.getMemorySize() * by), stepFactor.getMemorySize()),
|
|
|
- roundDown((int) (r.getVirtualCores() * by),
|
|
|
- stepFactor.getVirtualCores()));
|
|
|
+ return this.multiplyAndNormalize(r, by, stepFactor, false);
|
|
|
+ }
|
|
|
+
|
|
|
+ private Resource multiplyAndNormalize(Resource r, double by,
|
|
|
+ Resource stepFactor, boolean roundUp) {
|
|
|
+ Resource ret = Resources.createResource(0, 0);
|
|
|
+ for (String resource : resourceNames) {
|
|
|
+ try {
|
|
|
+ ResourceInformation rResourceInformation =
|
|
|
+ r.getResourceInformation(resource);
|
|
|
+ ResourceInformation stepFactorResourceInformation =
|
|
|
+ stepFactor.getResourceInformation(resource);
|
|
|
+ ResourceInformation tmp =
|
|
|
+ ResourceInformation.newInstance(rResourceInformation);
|
|
|
+
|
|
|
+ Long rValue = rResourceInformation.getValue();
|
|
|
+ Long stepFactorValue = UnitsConversionUtil
|
|
|
+ .convert(stepFactorResourceInformation.getUnits(),
|
|
|
+ rResourceInformation.getUnits(),
|
|
|
+ stepFactorResourceInformation.getValue());
|
|
|
+
|
|
|
+ Long value =
|
|
|
+ roundUp ? roundUp((long) Math.ceil(rValue * by), stepFactorValue) :
|
|
|
+ roundDown((long) (rValue * by), stepFactorValue);
|
|
|
+ tmp.setValue(value);
|
|
|
+ ret.setResourceInformation(resource, tmp);
|
|
|
+ } catch (YarnException ye) {
|
|
|
+ throw new IllegalArgumentException(
|
|
|
+ "Error getting resource information for " + resource, ye);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return ret;
|
|
|
}
|
|
|
|
|
|
@Override
|