@@ -52,6 +52,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@@ -111,6 +114,9 @@ public class FairScheduler implements ResourceScheduler {
 
   private static final Log LOG = LogFactory.getLog(FairScheduler.class);
 
+  private static final ResourceCalculator RESOURCE_CALCULATOR =
+      new DefaultResourceCalculator();
+
   // Value that container assignment methods return when a container is
   // reserved
   public static final Resource CONTAINER_RESERVED = Resources.createResource(-1);
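[Reviewer note, not part of the patch] The new imports and the RESOURCE_CALCULATOR field move the comparison policy out of the scheduler logic: every min/max/lessThan/greaterThan below now receives a ResourceCalculator plus the cluster capacity, and FairScheduler pins the policy to DefaultResourceCalculator, which orders resources by memory alone. A minimal sketch of the pattern, using simplified stand-in types rather than the actual Hadoop classes:

    // Illustrative stand-ins only; not the Hadoop YARN implementations.
    final class Res {
      final int memoryMb;
      final int vcores;
      Res(int memoryMb, int vcores) { this.memoryMb = memoryMb; this.vcores = vcores; }
    }

    interface ResCalculator {
      // < 0 if lhs is "smaller" than rhs, 0 if equivalent, > 0 if "larger".
      // The cluster capacity is passed so multi-resource calculators can normalize shares.
      int compare(Res clusterCapacity, Res lhs, Res rhs);
    }

    // Memory-only policy, in the spirit of DefaultResourceCalculator.
    final class MemoryOnlyCalculator implements ResCalculator {
      public int compare(Res clusterCapacity, Res lhs, Res rhs) {
        return Integer.compare(lhs.memoryMb, rhs.memoryMb);
      }
    }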
@@ -246,8 +252,10 @@ public class FairScheduler implements ResourceScheduler {
    * Is a queue below its min share for the given task type?
    */
   boolean isStarvedForMinShare(FSLeafQueue sched) {
-    Resource desiredShare = Resources.min(sched.getMinShare(), sched.getDemand());
-    return Resources.lessThan(sched.getResourceUsage(), desiredShare);
+    Resource desiredShare = Resources.min(RESOURCE_CALCULATOR, clusterCapacity,
+        sched.getMinShare(), sched.getDemand());
+    return Resources.lessThan(RESOURCE_CALCULATOR, clusterCapacity,
+        sched.getResourceUsage(), desiredShare);
   }
 
   /**
@@ -255,9 +263,10 @@ public class FairScheduler implements ResourceScheduler {
    * defined as being below half its fair share.
    */
   boolean isStarvedForFairShare(FSLeafQueue sched) {
-    Resource desiredFairShare = Resources.max(
+    Resource desiredFairShare = Resources.max(RESOURCE_CALCULATOR, clusterCapacity,
         Resources.multiply(sched.getFairShare(), .5), sched.getDemand());
-    return Resources.lessThan(sched.getResourceUsage(), desiredFairShare);
+    return Resources.lessThan(RESOURCE_CALCULATOR, clusterCapacity,
+        sched.getResourceUsage(), desiredFairShare);
   }
 
   /**
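[Reviewer note, not part of the patch] The starvation checks keep their logic; only the comparisons are routed through the calculator, which takes the cluster capacity as context. Building on the stand-in types sketched above, the helpers the patch relies on are assumed to delegate roughly like this (the real Resources signatures may differ in detail):

    // Assumed shape of the calculator-parameterized helpers; a sketch, not the real class.
    final class ResOps {
      static boolean lessThan(ResCalculator rc, Res cluster, Res lhs, Res rhs) {
        return rc.compare(cluster, lhs, rhs) < 0;
      }
      static Res min(ResCalculator rc, Res cluster, Res lhs, Res rhs) {
        return rc.compare(cluster, lhs, rhs) <= 0 ? lhs : rhs;
      }
    }

For example, with minShare = 20 GB, demand = 30 GB and usage = 10 GB, min(minShare, demand) is 20 GB and usage sits below it, so isStarvedForMinShare returns true; under DefaultResourceCalculator only the memory figures take part in that comparison.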
@@ -283,7 +292,8 @@ public class FairScheduler implements ResourceScheduler {
     for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
       resToPreempt = Resources.add(resToPreempt, resToPreempt(sched, curTime));
     }
-    if (Resources.greaterThan(resToPreempt, Resources.none())) {
+    if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, resToPreempt,
+        Resources.none())) {
       preemptResources(queueMgr.getLeafQueues(), resToPreempt);
     }
   }
@@ -309,7 +319,8 @@ public class FairScheduler implements ResourceScheduler {
     // Collect running containers from over-scheduled queues
     List<RMContainer> runningContainers = new ArrayList<RMContainer>();
     for (FSLeafQueue sched : scheds) {
-      if (Resources.greaterThan(sched.getResourceUsage(), sched.getFairShare())) {
+      if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
+          sched.getResourceUsage(), sched.getFairShare())) {
         for (AppSchedulable as : sched.getAppSchedulables()) {
           for (RMContainer c : as.getApp().getLiveContainers()) {
             runningContainers.add(c);
@@ -332,7 +343,8 @@ public class FairScheduler implements ResourceScheduler {
     // tasks, making sure we don't kill too many from any queue
     for (RMContainer container : runningContainers) {
       FSLeafQueue sched = queues.get(container);
-      if (Resources.greaterThan(sched.getResourceUsage(), sched.getFairShare())) {
+      if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
+          sched.getResourceUsage(), sched.getFairShare())) {
         LOG.info("Preempting container (prio=" + container.getContainer().getPriority() +
             "res=" + container.getContainer().getResource() +
             ") from queue " + sched.getName());
@@ -345,7 +357,8 @@ public class FairScheduler implements ResourceScheduler {
 
         toPreempt = Resources.subtract(toPreempt,
             container.getContainer().getResource());
-        if (Resources.equals(toPreempt, Resources.none())) {
+        if (Resources.lessThanOrEqual(RESOURCE_CALCULATOR, clusterCapacity,
+            toPreempt, Resources.none())) {
           break;
         }
       }
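[Reviewer note, not part of the patch] Besides the new signature, this hunk changes the stop condition from equals to lessThanOrEqual. Each preempted container subtracts its whole allocation from toPreempt, so the remainder can jump past zero without ever equaling it; an exact-equality test against Resources.none() could then never fire and the loop would keep killing containers. A tiny standalone illustration with plain integers:

    // Illustrative only: why "== 0" can be skipped over while "<= 0" stops promptly.
    public class PreemptStopSketch {
      public static void main(String[] args) {
        int toPreemptMb = 5 * 1024;              // still need to reclaim 5 GB
        int[] containersMb = {2048, 2048, 2048}; // candidate containers of 2 GB each
        for (int c : containersMb) {
          toPreemptMb -= c;                      // 3072, 1024, -1024: never exactly 0
          if (toPreemptMb <= 0) {                // an "== 0" check would never trigger
            System.out.println("reclaimed enough, stop preempting");
            break;
          }
        }
      }
    }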
@@ -369,17 +382,21 @@ public class FairScheduler implements ResourceScheduler {
     Resource resDueToMinShare = Resources.none();
     Resource resDueToFairShare = Resources.none();
     if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) {
-      Resource target = Resources.min(sched.getMinShare(), sched.getDemand());
-      resDueToMinShare = Resources.max(Resources.none(),
-          Resources.subtract(target, sched.getResourceUsage()));
+      Resource target = Resources.min(RESOURCE_CALCULATOR, clusterCapacity,
+          sched.getMinShare(), sched.getDemand());
+      resDueToMinShare = Resources.max(RESOURCE_CALCULATOR, clusterCapacity,
+          Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
     }
     if (curTime - sched.getLastTimeAtHalfFairShare() > fairShareTimeout) {
-      Resource target = Resources.min(sched.getFairShare(), sched.getDemand());
-      resDueToFairShare = Resources.max(Resources.none(),
-          Resources.subtract(target, sched.getResourceUsage()));
-    }
-    Resource resToPreempt = Resources.max(resDueToMinShare, resDueToFairShare);
-    if (Resources.greaterThan(resToPreempt, Resources.none())) {
+      Resource target = Resources.min(RESOURCE_CALCULATOR, clusterCapacity,
+          sched.getFairShare(), sched.getDemand());
+      resDueToFairShare = Resources.max(RESOURCE_CALCULATOR, clusterCapacity,
+          Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
+    }
+    Resource resToPreempt = Resources.max(RESOURCE_CALCULATOR, clusterCapacity,
+        resDueToMinShare, resDueToFairShare);
+    if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
+        resToPreempt, Resources.none())) {
       String message = "Should preempt " + resToPreempt + " res for queue "
           + sched.getName() + ": resDueToMinShare = " + resDueToMinShare
           + ", resDueToFairShare = " + resDueToFairShare;
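[Reviewer note, not part of the patch] The arithmetic of resToPreempt is unchanged by the rewrite: for each timed-out condition the target is min(share, demand), the shortfall is max(0, target - usage), and the method returns the larger of the min-share and fair-share shortfalls; only the comparisons now go through the calculator. A memory-only restatement of that calculation (an assumed simplification, not the Hadoop method):

    // Memory-only restatement of the preemption-target arithmetic shown above.
    final class PreemptTargetSketch {
      static int resToPreemptMb(int minShareMb, int fairShareMb, int demandMb,
          int usageMb, boolean minShareTimedOut, boolean fairShareTimedOut) {
        int dueToMinShare = 0;
        int dueToFairShare = 0;
        if (minShareTimedOut) {
          int target = Math.min(minShareMb, demandMb);
          dueToMinShare = Math.max(0, target - usageMb);
        }
        if (fairShareTimedOut) {
          int target = Math.min(fairShareMb, demandMb);
          dueToFairShare = Math.max(0, target - usageMb);
        }
        return Math.max(dueToMinShare, dueToFairShare);
      }
    }

For instance, minShare = 20 GB, fairShare = 40 GB, demand = 50 GB and usage = 10 GB with both timeouts expired gives max(20 - 10, 40 - 10) = 30 GB to preempt.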
@@ -800,9 +817,9 @@ public class FairScheduler implements ResourceScheduler {
       int assignedContainers = 0;
       while (node.getReservedContainer() == null) {
         boolean assignedContainer = false;
-        if (Resources.greaterThan(
-            queueMgr.getRootQueue().assignContainer(node),
-            Resources.none())) {
+        if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
+            queueMgr.getRootQueue().assignContainer(node),
+            Resources.none())) {
           assignedContainer = true;
         }
         if (!assignedContainer) { break; }