|
@@ -41,7 +41,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFini
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
|
@@ -79,7 +78,6 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
|
|
|
private final ResourceWeights resourceWeights;
|
|
|
private Resource demand = Resources.createResource(0);
|
|
|
private final FairScheduler scheduler;
|
|
|
- private FSQueue fsQueue;
|
|
|
private Resource fairShare = Resources.createResource(0, 0);
|
|
|
|
|
|
// Preemption related variables
|
|
@@ -118,7 +116,6 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
|
|
|
super(applicationAttemptId, user, queue, activeUsersManager, rmContext);
|
|
|
|
|
|
this.scheduler = scheduler;
|
|
|
- this.fsQueue = queue;
|
|
|
this.startTime = scheduler.getClock().getTime();
|
|
|
this.lastTimeAtFairShare = this.startTime;
|
|
|
this.appPriority = Priority.newInstance(1);
|
|
@@ -229,11 +226,11 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
|
|
|
*/
|
|
|
@Override
|
|
|
public Resource getHeadroom() {
|
|
|
- final FSQueue queue = (FSQueue) this.queue;
|
|
|
- SchedulingPolicy policy = queue.getPolicy();
|
|
|
+ final FSQueue fsQueue = getQueue();
|
|
|
+ SchedulingPolicy policy = fsQueue.getPolicy();
|
|
|
|
|
|
- Resource queueFairShare = queue.getFairShare();
|
|
|
- Resource queueUsage = queue.getResourceUsage();
|
|
|
+ Resource queueFairShare = fsQueue.getFairShare();
|
|
|
+ Resource queueUsage = fsQueue.getResourceUsage();
|
|
|
Resource clusterResource = this.scheduler.getClusterResource();
|
|
|
Resource clusterUsage = this.scheduler.getRootQueueMetrics()
|
|
|
.getAllocatedResources();
|
|
@@ -243,7 +240,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
|
|
|
subtractResourcesOnBlacklistedNodes(clusterAvailableResources);
|
|
|
|
|
|
Resource queueMaxAvailableResources =
|
|
|
- Resources.subtract(queue.getMaxShare(), queueUsage);
|
|
|
+ Resources.subtract(fsQueue.getMaxShare(), queueUsage);
|
|
|
Resource maxAvailableResource = Resources.componentwiseMin(
|
|
|
clusterAvailableResources, queueMaxAvailableResources);
|
|
|
|
|
@@ -514,8 +511,6 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
|
|
|
|
|
|
@Override
|
|
|
public FSLeafQueue getQueue() {
|
|
|
- Queue queue = super.getQueue();
|
|
|
- assert queue instanceof FSLeafQueue;
|
|
|
return (FSLeafQueue) queue;
|
|
|
}
|
|
|
|
|
@@ -1102,7 +1097,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
|
|
|
Resource fairShareStarvation() {
|
|
|
long now = scheduler.getClock().getTime();
|
|
|
Resource threshold = Resources.multiply(
|
|
|
- getFairShare(), fsQueue.getFairSharePreemptionThreshold());
|
|
|
+ getFairShare(), getQueue().getFairSharePreemptionThreshold());
|
|
|
Resource fairDemand = Resources.componentwiseMin(threshold, demand);
|
|
|
|
|
|
// Check if the queue is starved for fairshare
|
|
@@ -1113,7 +1108,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
|
|
|
}
|
|
|
|
|
|
if (!starved ||
|
|
|
- now - lastTimeAtFairShare < fsQueue.getFairSharePreemptionTimeout()) {
|
|
|
+ now - lastTimeAtFairShare < getQueue().getFairSharePreemptionTimeout()) {
|
|
|
fairshareStarvation = Resources.none();
|
|
|
} else {
|
|
|
// The app has been starved for longer than preemption-timeout.
|
|
@@ -1128,7 +1123,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
|
|
|
* {@code share}.
|
|
|
*/
|
|
|
private boolean isUsageBelowShare(Resource usage, Resource share) {
|
|
|
- return fsQueue.getPolicy().getResourceCalculator().compare(
|
|
|
+ return getQueue().getPolicy().getResourceCalculator().compare(
|
|
|
scheduler.getClusterResource(), usage, share, true) < 0;
|
|
|
}
|
|
|
|