|
@@ -107,6 +107,9 @@ public class ProportionalCapacityPreemptionPolicy
|
|
private float minimumThresholdForIntraQueuePreemption;
|
|
private float minimumThresholdForIntraQueuePreemption;
|
|
private IntraQueuePreemptionOrderPolicy intraQueuePreemptionOrderPolicy;
|
|
private IntraQueuePreemptionOrderPolicy intraQueuePreemptionOrderPolicy;
|
|
|
|
|
|
|
|
+ // Current configuration
|
|
|
|
+ private CapacitySchedulerConfiguration csConfig;
|
|
|
|
+
|
|
// Pointer to other RM components
|
|
// Pointer to other RM components
|
|
private RMContext rmContext;
|
|
private RMContext rmContext;
|
|
private ResourceCalculator rc;
|
|
private ResourceCalculator rc;
|
|
@@ -120,8 +123,7 @@ public class ProportionalCapacityPreemptionPolicy
|
|
new HashMap<>();
|
|
new HashMap<>();
|
|
private Map<String, LinkedHashSet<String>> partitionToUnderServedQueues =
|
|
private Map<String, LinkedHashSet<String>> partitionToUnderServedQueues =
|
|
new HashMap<String, LinkedHashSet<String>>();
|
|
new HashMap<String, LinkedHashSet<String>>();
|
|
- private List<PreemptionCandidatesSelector>
|
|
|
|
- candidatesSelectionPolicies = new ArrayList<>();
|
|
|
|
|
|
+ private List<PreemptionCandidatesSelector> candidatesSelectionPolicies;
|
|
private Set<String> allPartitions;
|
|
private Set<String> allPartitions;
|
|
private Set<String> leafQueueNames;
|
|
private Set<String> leafQueueNames;
|
|
|
|
|
|
@@ -159,60 +161,68 @@ public class ProportionalCapacityPreemptionPolicy
|
|
}
|
|
}
|
|
rmContext = context;
|
|
rmContext = context;
|
|
scheduler = (CapacityScheduler) sched;
|
|
scheduler = (CapacityScheduler) sched;
|
|
- CapacitySchedulerConfiguration csConfig = scheduler.getConfiguration();
|
|
|
|
|
|
+ rc = scheduler.getResourceCalculator();
|
|
|
|
+ nlm = scheduler.getRMContext().getNodeLabelManager();
|
|
|
|
+ updateConfigIfNeeded();
|
|
|
|
+ }
|
|
|
|
|
|
- maxIgnoredOverCapacity = csConfig.getDouble(
|
|
|
|
|
|
+ private void updateConfigIfNeeded() {
|
|
|
|
+ CapacitySchedulerConfiguration config = scheduler.getConfiguration();
|
|
|
|
+ if (config == csConfig) {
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ maxIgnoredOverCapacity = config.getDouble(
|
|
CapacitySchedulerConfiguration.PREEMPTION_MAX_IGNORED_OVER_CAPACITY,
|
|
CapacitySchedulerConfiguration.PREEMPTION_MAX_IGNORED_OVER_CAPACITY,
|
|
CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_MAX_IGNORED_OVER_CAPACITY);
|
|
CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_MAX_IGNORED_OVER_CAPACITY);
|
|
|
|
|
|
- naturalTerminationFactor = csConfig.getDouble(
|
|
|
|
|
|
+ naturalTerminationFactor = config.getDouble(
|
|
CapacitySchedulerConfiguration.PREEMPTION_NATURAL_TERMINATION_FACTOR,
|
|
CapacitySchedulerConfiguration.PREEMPTION_NATURAL_TERMINATION_FACTOR,
|
|
CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_NATURAL_TERMINATION_FACTOR);
|
|
CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_NATURAL_TERMINATION_FACTOR);
|
|
|
|
|
|
- maxWaitTime = csConfig.getLong(
|
|
|
|
|
|
+ maxWaitTime = config.getLong(
|
|
CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL,
|
|
CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL,
|
|
CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_WAIT_TIME_BEFORE_KILL);
|
|
CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_WAIT_TIME_BEFORE_KILL);
|
|
|
|
|
|
- monitoringInterval = csConfig.getLong(
|
|
|
|
|
|
+ monitoringInterval = config.getLong(
|
|
CapacitySchedulerConfiguration.PREEMPTION_MONITORING_INTERVAL,
|
|
CapacitySchedulerConfiguration.PREEMPTION_MONITORING_INTERVAL,
|
|
CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_MONITORING_INTERVAL);
|
|
CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_MONITORING_INTERVAL);
|
|
|
|
|
|
- percentageClusterPreemptionAllowed = csConfig.getFloat(
|
|
|
|
|
|
+ percentageClusterPreemptionAllowed = config.getFloat(
|
|
CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND,
|
|
CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND,
|
|
CapacitySchedulerConfiguration.DEFAULT_TOTAL_PREEMPTION_PER_ROUND);
|
|
CapacitySchedulerConfiguration.DEFAULT_TOTAL_PREEMPTION_PER_ROUND);
|
|
|
|
|
|
- observeOnly = csConfig.getBoolean(
|
|
|
|
|
|
+ observeOnly = config.getBoolean(
|
|
CapacitySchedulerConfiguration.PREEMPTION_OBSERVE_ONLY,
|
|
CapacitySchedulerConfiguration.PREEMPTION_OBSERVE_ONLY,
|
|
CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_OBSERVE_ONLY);
|
|
CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_OBSERVE_ONLY);
|
|
|
|
|
|
- lazyPreempionEnabled = csConfig.getBoolean(
|
|
|
|
- CapacitySchedulerConfiguration.LAZY_PREEMPTION_ENALBED,
|
|
|
|
|
|
+ lazyPreempionEnabled = config.getBoolean(
|
|
|
|
+ CapacitySchedulerConfiguration.LAZY_PREEMPTION_ENABLED,
|
|
CapacitySchedulerConfiguration.DEFAULT_LAZY_PREEMPTION_ENABLED);
|
|
CapacitySchedulerConfiguration.DEFAULT_LAZY_PREEMPTION_ENABLED);
|
|
|
|
|
|
- maxAllowableLimitForIntraQueuePreemption = csConfig.getFloat(
|
|
|
|
|
|
+ maxAllowableLimitForIntraQueuePreemption = config.getFloat(
|
|
CapacitySchedulerConfiguration.
|
|
CapacitySchedulerConfiguration.
|
|
INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
|
|
INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
|
|
CapacitySchedulerConfiguration.
|
|
CapacitySchedulerConfiguration.
|
|
DEFAULT_INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT);
|
|
DEFAULT_INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT);
|
|
|
|
|
|
- minimumThresholdForIntraQueuePreemption = csConfig.getFloat(
|
|
|
|
|
|
+ minimumThresholdForIntraQueuePreemption = config.getFloat(
|
|
CapacitySchedulerConfiguration.
|
|
CapacitySchedulerConfiguration.
|
|
INTRAQUEUE_PREEMPTION_MINIMUM_THRESHOLD,
|
|
INTRAQUEUE_PREEMPTION_MINIMUM_THRESHOLD,
|
|
CapacitySchedulerConfiguration.
|
|
CapacitySchedulerConfiguration.
|
|
DEFAULT_INTRAQUEUE_PREEMPTION_MINIMUM_THRESHOLD);
|
|
DEFAULT_INTRAQUEUE_PREEMPTION_MINIMUM_THRESHOLD);
|
|
|
|
|
|
intraQueuePreemptionOrderPolicy = IntraQueuePreemptionOrderPolicy
|
|
intraQueuePreemptionOrderPolicy = IntraQueuePreemptionOrderPolicy
|
|
- .valueOf(csConfig
|
|
|
|
|
|
+ .valueOf(config
|
|
.get(
|
|
.get(
|
|
CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
|
|
CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
|
|
CapacitySchedulerConfiguration.DEFAULT_INTRAQUEUE_PREEMPTION_ORDER_POLICY)
|
|
CapacitySchedulerConfiguration.DEFAULT_INTRAQUEUE_PREEMPTION_ORDER_POLICY)
|
|
.toUpperCase());
|
|
.toUpperCase());
|
|
|
|
|
|
- rc = scheduler.getResourceCalculator();
|
|
|
|
- nlm = scheduler.getRMContext().getNodeLabelManager();
|
|
|
|
|
|
+ candidatesSelectionPolicies = new ArrayList<>();
|
|
|
|
|
|
// Do we need to specially consider reserved containers?
|
|
// Do we need to specially consider reserved containers?
|
|
- boolean selectCandidatesForResevedContainers = csConfig.getBoolean(
|
|
|
|
|
|
+ boolean selectCandidatesForResevedContainers = config.getBoolean(
|
|
CapacitySchedulerConfiguration.
|
|
CapacitySchedulerConfiguration.
|
|
PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS,
|
|
PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS,
|
|
CapacitySchedulerConfiguration.
|
|
CapacitySchedulerConfiguration.
|
|
@@ -222,7 +232,7 @@ public class ProportionalCapacityPreemptionPolicy
|
|
.add(new ReservedContainerCandidatesSelector(this));
|
|
.add(new ReservedContainerCandidatesSelector(this));
|
|
}
|
|
}
|
|
|
|
|
|
- boolean additionalPreemptionBasedOnReservedResource = csConfig.getBoolean(
|
|
|
|
|
|
+ boolean additionalPreemptionBasedOnReservedResource = config.getBoolean(
|
|
CapacitySchedulerConfiguration.ADDITIONAL_RESOURCE_BALANCE_BASED_ON_RESERVED_CONTAINERS,
|
|
CapacitySchedulerConfiguration.ADDITIONAL_RESOURCE_BALANCE_BASED_ON_RESERVED_CONTAINERS,
|
|
CapacitySchedulerConfiguration.DEFAULT_ADDITIONAL_RESOURCE_BALANCE_BASED_ON_RESERVED_CONTAINERS);
|
|
CapacitySchedulerConfiguration.DEFAULT_ADDITIONAL_RESOURCE_BALANCE_BASED_ON_RESERVED_CONTAINERS);
|
|
|
|
|
|
@@ -231,12 +241,37 @@ public class ProportionalCapacityPreemptionPolicy
|
|
additionalPreemptionBasedOnReservedResource));
|
|
additionalPreemptionBasedOnReservedResource));
|
|
|
|
|
|
// Do we need to specially consider intra queue
|
|
// Do we need to specially consider intra queue
|
|
- boolean isIntraQueuePreemptionEnabled = csConfig.getBoolean(
|
|
|
|
|
|
+ boolean isIntraQueuePreemptionEnabled = config.getBoolean(
|
|
CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ENABLED,
|
|
CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ENABLED,
|
|
CapacitySchedulerConfiguration.DEFAULT_INTRAQUEUE_PREEMPTION_ENABLED);
|
|
CapacitySchedulerConfiguration.DEFAULT_INTRAQUEUE_PREEMPTION_ENABLED);
|
|
if (isIntraQueuePreemptionEnabled) {
|
|
if (isIntraQueuePreemptionEnabled) {
|
|
candidatesSelectionPolicies.add(new IntraQueueCandidatesSelector(this));
|
|
candidatesSelectionPolicies.add(new IntraQueueCandidatesSelector(this));
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ LOG.info("Capacity Scheduler configuration changed, updated preemption " +
|
|
|
|
+ "properties to:\n" +
|
|
|
|
+ "max_ignored_over_capacity = " + maxIgnoredOverCapacity + "\n" +
|
|
|
|
+ "natural_termination_factor = " + naturalTerminationFactor + "\n" +
|
|
|
|
+ "max_wait_before_kill = " + maxWaitTime + "\n" +
|
|
|
|
+ "monitoring_interval = " + monitoringInterval + "\n" +
|
|
|
|
+ "total_preemption_per_round = " + percentageClusterPreemptionAllowed +
|
|
|
|
+ "\n" +
|
|
|
|
+ "observe_only = " + observeOnly + "\n" +
|
|
|
|
+ "lazy-preemption-enabled = " + lazyPreempionEnabled + "\n" +
|
|
|
|
+ "intra-queue-preemption.enabled = " + isIntraQueuePreemptionEnabled +
|
|
|
|
+ "\n" +
|
|
|
|
+ "intra-queue-preemption.max-allowable-limit = " +
|
|
|
|
+ maxAllowableLimitForIntraQueuePreemption + "\n" +
|
|
|
|
+ "intra-queue-preemption.minimum-threshold = " +
|
|
|
|
+ minimumThresholdForIntraQueuePreemption + "\n" +
|
|
|
|
+ "intra-queue-preemption.preemption-order-policy = " +
|
|
|
|
+ intraQueuePreemptionOrderPolicy + "\n" +
|
|
|
|
+ "select_based_on_reserved_containers = " +
|
|
|
|
+ selectCandidatesForResevedContainers + "\n" +
|
|
|
|
+ "additional_res_balance_based_on_reserved_containers = " +
|
|
|
|
+ additionalPreemptionBasedOnReservedResource);
|
|
|
|
+
|
|
|
|
+ csConfig = config;
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
@@ -246,6 +281,8 @@ public class ProportionalCapacityPreemptionPolicy
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public void editSchedule() {
|
|
public void editSchedule() {
|
|
|
|
+ updateConfigIfNeeded();
|
|
|
|
+
|
|
long startTs = clock.getTime();
|
|
long startTs = clock.getTime();
|
|
|
|
|
|
CSQueue root = scheduler.getRootQueue();
|
|
CSQueue root = scheduler.getRootQueue();
|