|
@@ -66,6 +66,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicat
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicyForPendingApps;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
|
|
|
import org.apache.hadoop.yarn.server.utils.Lock;
|
|
|
import org.apache.hadoop.yarn.server.utils.Lock.NoLock;
|
|
@@ -96,9 +97,6 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
private Priority defaultAppPriorityPerQueue;
|
|
|
|
|
|
private OrderingPolicy<FiCaSchedulerApp> pendingOrderingPolicy = null;
|
|
|
-
|
|
|
- // Always give preference to this while activating the application attempts.
|
|
|
- private OrderingPolicy<FiCaSchedulerApp> pendingOPForRecoveredApps = null;
|
|
|
|
|
|
private volatile float minimumAllocationFactor;
|
|
|
|
|
@@ -126,6 +124,7 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
private Map<String, TreeSet<RMContainer>> ignorePartitionExclusivityRMContainers =
|
|
|
new HashMap<>();
|
|
|
|
|
|
+ @SuppressWarnings({ "unchecked", "rawtypes" })
|
|
|
public LeafQueue(CapacitySchedulerContext cs,
|
|
|
String queueName, CSQueue parent, CSQueue old) throws IOException {
|
|
|
super(cs, queueName, parent, old);
|
|
@@ -133,6 +132,9 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
|
|
|
this.activeUsersManager = new ActiveUsersManager(metrics);
|
|
|
|
|
|
+ // One time initialization is enough since it is static ordering policy
|
|
|
+ this.pendingOrderingPolicy = new FifoOrderingPolicyForPendingApps();
|
|
|
+
|
|
|
if(LOG.isDebugEnabled()) {
|
|
|
LOG.debug("LeafQueue:" + " name=" + queueName
|
|
|
+ ", fullname=" + getQueuePath());
|
|
@@ -159,11 +161,7 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
CapacitySchedulerConfiguration conf = csContext.getConfiguration();
|
|
|
|
|
|
setOrderingPolicy(conf.<FiCaSchedulerApp>getOrderingPolicy(getQueuePath()));
|
|
|
- setPendingAppsOrderingPolicy(conf
|
|
|
- .<FiCaSchedulerApp> getOrderingPolicy(getQueuePath()));
|
|
|
- setPendingAppsOrderingPolicyRecovery(conf
|
|
|
- .<FiCaSchedulerApp> getOrderingPolicy(getQueuePath()));
|
|
|
-
|
|
|
+
|
|
|
userLimit = conf.getUserLimit(getQueuePath());
|
|
|
userLimitFactor = conf.getUserLimitFactor(getQueuePath());
|
|
|
|
|
@@ -327,8 +325,7 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
}
|
|
|
|
|
|
public synchronized int getNumPendingApplications() {
|
|
|
- return pendingOrderingPolicy.getNumSchedulableEntities()
|
|
|
- + pendingOPForRecoveredApps.getNumSchedulableEntities();
|
|
|
+ return pendingOrderingPolicy.getNumSchedulableEntities();
|
|
|
}
|
|
|
|
|
|
public synchronized int getNumActiveApplications() {
|
|
@@ -627,18 +624,9 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
calculateAndGetAMResourceLimitPerPartition(nodePartition);
|
|
|
}
|
|
|
|
|
|
- activateApplications(getPendingAppsOrderingPolicyRecovery()
|
|
|
- .getAssignmentIterator(), userAmPartitionLimit);
|
|
|
-
|
|
|
- activateApplications(
|
|
|
- getPendingAppsOrderingPolicy().getAssignmentIterator(),
|
|
|
- userAmPartitionLimit);
|
|
|
- }
|
|
|
-
|
|
|
- private synchronized void activateApplications(
|
|
|
- Iterator<FiCaSchedulerApp> fsApp,
|
|
|
- Map<String, Resource> userAmPartitionLimit) {
|
|
|
- while (fsApp.hasNext()) {
|
|
|
+ for (Iterator<FiCaSchedulerApp> fsApp =
|
|
|
+ getPendingAppsOrderingPolicy().getAssignmentIterator();
|
|
|
+ fsApp.hasNext();) {
|
|
|
FiCaSchedulerApp application = fsApp.next();
|
|
|
ApplicationId applicationId = application.getApplicationId();
|
|
|
|
|
@@ -740,11 +728,7 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
User user) {
|
|
|
// Accept
|
|
|
user.submitApplication();
|
|
|
- if (application.isAttemptRecovering()) {
|
|
|
- getPendingAppsOrderingPolicyRecovery().addSchedulableEntity(application);
|
|
|
- } else {
|
|
|
- getPendingAppsOrderingPolicy().addSchedulableEntity(application);
|
|
|
- }
|
|
|
+ getPendingAppsOrderingPolicy().addSchedulableEntity(application);
|
|
|
applicationAttemptMap.put(application.getApplicationAttemptId(), application);
|
|
|
|
|
|
// Activate applications
|
|
@@ -784,11 +768,7 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
boolean wasActive =
|
|
|
orderingPolicy.removeSchedulableEntity(application);
|
|
|
if (!wasActive) {
|
|
|
- if (application.isAttemptRecovering()) {
|
|
|
- pendingOPForRecoveredApps.removeSchedulableEntity(application);
|
|
|
- } else {
|
|
|
- pendingOrderingPolicy.removeSchedulableEntity(application);
|
|
|
- }
|
|
|
+ pendingOrderingPolicy.removeSchedulableEntity(application);
|
|
|
} else {
|
|
|
queueUsage.decAMUsed(partitionName,
|
|
|
application.getAMResource(partitionName));
|
|
@@ -1539,18 +1519,16 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
* Obtain (read-only) collection of pending applications.
|
|
|
*/
|
|
|
public Collection<FiCaSchedulerApp> getPendingApplications() {
|
|
|
- Collection<FiCaSchedulerApp> pendingApps =
|
|
|
- new ArrayList<FiCaSchedulerApp>();
|
|
|
- pendingApps.addAll(pendingOPForRecoveredApps.getSchedulableEntities());
|
|
|
- pendingApps.addAll(pendingOrderingPolicy.getSchedulableEntities());
|
|
|
- return pendingApps;
|
|
|
+ return Collections.unmodifiableCollection(pendingOrderingPolicy
|
|
|
+ .getSchedulableEntities());
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Obtain (read-only) collection of active applications.
|
|
|
*/
|
|
|
public Collection<FiCaSchedulerApp> getApplications() {
|
|
|
- return orderingPolicy.getSchedulableEntities();
|
|
|
+ return Collections.unmodifiableCollection(orderingPolicy
|
|
|
+ .getSchedulableEntities());
|
|
|
}
|
|
|
|
|
|
// Consider the headroom for each user in the queue.
|
|
@@ -1587,10 +1565,6 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
@Override
|
|
|
public synchronized void collectSchedulerApplications(
|
|
|
Collection<ApplicationAttemptId> apps) {
|
|
|
- for (FiCaSchedulerApp pendingApp : pendingOPForRecoveredApps
|
|
|
- .getSchedulableEntities()) {
|
|
|
- apps.add(pendingApp.getApplicationAttemptId());
|
|
|
- }
|
|
|
for (FiCaSchedulerApp pendingApp : pendingOrderingPolicy
|
|
|
.getSchedulableEntities()) {
|
|
|
apps.add(pendingApp.getApplicationAttemptId());
|
|
@@ -1759,30 +1733,6 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
getPendingAppsOrderingPolicy() {
|
|
|
return pendingOrderingPolicy;
|
|
|
}
|
|
|
- public synchronized void setPendingAppsOrderingPolicy(
|
|
|
- OrderingPolicy<FiCaSchedulerApp> pendingOrderingPolicy) {
|
|
|
- if (null != this.pendingOrderingPolicy) {
|
|
|
- pendingOrderingPolicy
|
|
|
- .addAllSchedulableEntities(this.pendingOrderingPolicy
|
|
|
- .getSchedulableEntities());
|
|
|
- }
|
|
|
- this.pendingOrderingPolicy = pendingOrderingPolicy;
|
|
|
- }
|
|
|
-
|
|
|
- public synchronized OrderingPolicy<FiCaSchedulerApp>
|
|
|
- getPendingAppsOrderingPolicyRecovery() {
|
|
|
- return pendingOPForRecoveredApps;
|
|
|
- }
|
|
|
-
|
|
|
- public synchronized void setPendingAppsOrderingPolicyRecovery(
|
|
|
- OrderingPolicy<FiCaSchedulerApp> pendingOrderingPolicyRecovery) {
|
|
|
- if (null != this.pendingOPForRecoveredApps) {
|
|
|
- pendingOrderingPolicyRecovery
|
|
|
- .addAllSchedulableEntities(this.pendingOPForRecoveredApps
|
|
|
- .getSchedulableEntities());
|
|
|
- }
|
|
|
- this.pendingOPForRecoveredApps = pendingOrderingPolicyRecovery;
|
|
|
- }
|
|
|
|
|
|
/*
|
|
|
* Holds shared values used by all applications in
|