|
@@ -87,8 +87,8 @@ public class ReservedContainerCandidatesSelector
|
|
|
|
|
|
// Get list of nodes for preemption, ordered by preemption cost
|
|
|
List<NodeForPreemption> nodesForPreemption = getNodesForPreemption(
|
|
|
- clusterResource, queueToPreemptableResourceByPartition,
|
|
|
- selectedCandidates, totalPreemptedResourceAllowed);
|
|
|
+ queueToPreemptableResourceByPartition, selectedCandidates,
|
|
|
+ totalPreemptedResourceAllowed);
|
|
|
|
|
|
for (NodeForPreemption nfp : nodesForPreemption) {
|
|
|
RMContainer reservedContainer = nfp.schedulerNode.getReservedContainer();
|
|
@@ -97,9 +97,8 @@ public class ReservedContainerCandidatesSelector
|
|
|
}
|
|
|
|
|
|
NodeForPreemption preemptionResult = getPreemptionCandidatesOnNode(
|
|
|
- nfp.schedulerNode, clusterResource,
|
|
|
- queueToPreemptableResourceByPartition, selectedCandidates,
|
|
|
- totalPreemptedResourceAllowed, false);
|
|
|
+ nfp.schedulerNode, queueToPreemptableResourceByPartition,
|
|
|
+ selectedCandidates, totalPreemptedResourceAllowed, false);
|
|
|
if (null != preemptionResult) {
|
|
|
for (RMContainer c : preemptionResult.selectedContainers) {
|
|
|
ApplicationAttemptId appId = c.getApplicationAttemptId();
|
|
@@ -135,8 +134,7 @@ public class ReservedContainerCandidatesSelector
|
|
|
return preemptable;
|
|
|
}
|
|
|
|
|
|
- private boolean tryToPreemptFromQueue(Resource cluster, String queueName,
|
|
|
- String partitionName,
|
|
|
+ private boolean tryToPreemptFromQueue(String queueName, String partitionName,
|
|
|
Map<String, Map<String, Resource>> queueToPreemptableResourceByPartition,
|
|
|
Resource required, Resource totalPreemptionAllowed, boolean readOnly) {
|
|
|
Resource preemptable = getPreemptableResource(queueName, partitionName,
|
|
@@ -145,11 +143,11 @@ public class ReservedContainerCandidatesSelector
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
- if (!Resources.fitsIn(rc, cluster, required, preemptable)) {
|
|
|
+ if (!Resources.fitsIn(rc, required, preemptable)) {
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
- if (!Resources.fitsIn(rc, cluster, required, totalPreemptionAllowed)) {
|
|
|
+ if (!Resources.fitsIn(rc, required, totalPreemptionAllowed)) {
|
|
|
return false;
|
|
|
}
|
|
|
|
|
@@ -165,7 +163,6 @@ public class ReservedContainerCandidatesSelector
|
|
|
/**
|
|
|
* Try to check if we can preempt resources for reserved container in given node
|
|
|
* @param node
|
|
|
- * @param cluster
|
|
|
* @param queueToPreemptableResourceByPartition it's a map of
|
|
|
* <queueName, <partition, preemptable-resource>>
|
|
|
* @param readOnly do we want to modify preemptable resource after we selected
|
|
@@ -174,7 +171,7 @@ public class ReservedContainerCandidatesSelector
|
|
|
* to satisfy reserved resource
|
|
|
*/
|
|
|
private NodeForPreemption getPreemptionCandidatesOnNode(
|
|
|
- FiCaSchedulerNode node, Resource cluster,
|
|
|
+ FiCaSchedulerNode node,
|
|
|
Map<String, Map<String, Resource>> queueToPreemptableResourceByPartition,
|
|
|
Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
|
|
|
Resource totalPreemptionAllowed, boolean readOnly) {
|
|
@@ -204,8 +201,7 @@ public class ReservedContainerCandidatesSelector
|
|
|
String partition = node.getPartition();
|
|
|
|
|
|
// Avoid preempt any container if required <= available + killable
|
|
|
- if (Resources.fitsIn(rc, cluster, reservedContainer.getReservedResource(),
|
|
|
- cur)) {
|
|
|
+ if (Resources.fitsIn(rc, reservedContainer.getReservedResource(), cur)) {
|
|
|
return null;
|
|
|
}
|
|
|
|
|
@@ -232,9 +228,9 @@ public class ReservedContainerCandidatesSelector
|
|
|
|
|
|
// Can we preempt container c?
|
|
|
// Check if we have quota to preempt this container
|
|
|
- boolean canPreempt = tryToPreemptFromQueue(cluster, containerQueueName,
|
|
|
- partition, queueToPreemptableResourceByPartition,
|
|
|
- c.getAllocatedResource(), totalPreemptionAllowed, readOnly);
|
|
|
+ boolean canPreempt = tryToPreemptFromQueue(containerQueueName, partition,
|
|
|
+ queueToPreemptableResourceByPartition, c.getAllocatedResource(),
|
|
|
+ totalPreemptionAllowed, readOnly);
|
|
|
|
|
|
// If we can, add to selected container, and change resource accordingly.
|
|
|
if (canPreempt) {
|
|
@@ -246,7 +242,7 @@ public class ReservedContainerCandidatesSelector
|
|
|
Resources.addTo(totalSelected, c.getAllocatedResource());
|
|
|
}
|
|
|
Resources.addTo(cur, c.getAllocatedResource());
|
|
|
- if (Resources.fitsIn(rc, cluster,
|
|
|
+ if (Resources.fitsIn(rc,
|
|
|
reservedContainer.getReservedResource(), cur)) {
|
|
|
canAllocateReservedContainer = true;
|
|
|
break;
|
|
@@ -282,7 +278,7 @@ public class ReservedContainerCandidatesSelector
|
|
|
return nfp;
|
|
|
}
|
|
|
|
|
|
- private List<NodeForPreemption> getNodesForPreemption(Resource cluster,
|
|
|
+ private List<NodeForPreemption> getNodesForPreemption(
|
|
|
Map<String, Map<String, Resource>> queueToPreemptableResourceByPartition,
|
|
|
Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
|
|
|
Resource totalPreemptionAllowed) {
|
|
@@ -292,7 +288,7 @@ public class ReservedContainerCandidatesSelector
|
|
|
for (FiCaSchedulerNode node : preemptionContext.getScheduler()
|
|
|
.getAllNodes()) {
|
|
|
if (node.getReservedContainer() != null) {
|
|
|
- NodeForPreemption nfp = getPreemptionCandidatesOnNode(node, cluster,
|
|
|
+ NodeForPreemption nfp = getPreemptionCandidatesOnNode(node,
|
|
|
queueToPreemptableResourceByPartition, selectedCandidates,
|
|
|
totalPreemptionAllowed, true);
|
|
|
if (null != nfp) {
|