|
@@ -82,8 +82,9 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
|
|
|
|
|
|
// Preemption related variables
|
|
// Preemption related variables
|
|
private final Object preemptionVariablesLock = new Object();
|
|
private final Object preemptionVariablesLock = new Object();
|
|
- private final Resource preemptedResources = Resources.clone(Resources.none());
|
|
|
|
- private final Set<RMContainer> containersToPreempt = new HashSet<>();
|
|
|
|
|
|
+ private final Set<RMContainer> containersToBePreempted = new HashSet<>();
|
|
|
|
+ private final Resource resourcesToBePreempted =
|
|
|
|
+ Resources.clone(Resources.none());
|
|
|
|
|
|
private Resource fairshareStarvation = Resources.none();
|
|
private Resource fairshareStarvation = Resources.none();
|
|
private long lastTimeAtFairShare;
|
|
private long lastTimeAtFairShare;
|
|
@@ -565,16 +566,17 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
|
|
|
|
|
|
void trackContainerForPreemption(RMContainer container) {
|
|
void trackContainerForPreemption(RMContainer container) {
|
|
synchronized (preemptionVariablesLock) {
|
|
synchronized (preemptionVariablesLock) {
|
|
- if (containersToPreempt.add(container)) {
|
|
|
|
- Resources.addTo(preemptedResources, container.getAllocatedResource());
|
|
|
|
|
|
+ if (containersToBePreempted.add(container)) {
|
|
|
|
+ Resources.addTo(resourcesToBePreempted,
|
|
|
|
+ container.getAllocatedResource());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
private void untrackContainerForPreemption(RMContainer container) {
|
|
private void untrackContainerForPreemption(RMContainer container) {
|
|
synchronized (preemptionVariablesLock) {
|
|
synchronized (preemptionVariablesLock) {
|
|
- if (containersToPreempt.remove(container)) {
|
|
|
|
- Resources.subtractFrom(preemptedResources,
|
|
|
|
|
|
+ if (containersToBePreempted.remove(container)) {
|
|
|
|
+ Resources.subtractFrom(resourcesToBePreempted,
|
|
container.getAllocatedResource());
|
|
container.getAllocatedResource());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -583,7 +585,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
|
|
Set<ContainerId> getPreemptionContainerIds() {
|
|
Set<ContainerId> getPreemptionContainerIds() {
|
|
synchronized (preemptionVariablesLock) {
|
|
synchronized (preemptionVariablesLock) {
|
|
Set<ContainerId> preemptionContainerIds = new HashSet<>();
|
|
Set<ContainerId> preemptionContainerIds = new HashSet<>();
|
|
- for (RMContainer container : containersToPreempt) {
|
|
|
|
|
|
+ for (RMContainer container : containersToBePreempted) {
|
|
preemptionContainerIds.add(container.getContainerId());
|
|
preemptionContainerIds.add(container.getContainerId());
|
|
}
|
|
}
|
|
return preemptionContainerIds;
|
|
return preemptionContainerIds;
|
|
@@ -604,7 +606,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
|
|
}
|
|
}
|
|
|
|
|
|
synchronized (preemptionVariablesLock) {
|
|
synchronized (preemptionVariablesLock) {
|
|
- if (containersToPreempt.contains(container)) {
|
|
|
|
|
|
+ if (containersToBePreempted.contains(container)) {
|
|
// The container is already under consideration for preemption
|
|
// The container is already under consideration for preemption
|
|
return false;
|
|
return false;
|
|
}
|
|
}
|
|
@@ -1271,9 +1273,9 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
|
|
// Subtract copies the object, so that we have a snapshot,
|
|
// Subtract copies the object, so that we have a snapshot,
|
|
// in case usage changes, while the caller is using the value
|
|
// in case usage changes, while the caller is using the value
|
|
synchronized (preemptionVariablesLock) {
|
|
synchronized (preemptionVariablesLock) {
|
|
- return containersToPreempt.isEmpty()
|
|
|
|
|
|
+ return containersToBePreempted.isEmpty()
|
|
? getCurrentConsumption()
|
|
? getCurrentConsumption()
|
|
- : Resources.subtract(getCurrentConsumption(), preemptedResources);
|
|
|
|
|
|
+ : Resources.subtract(getCurrentConsumption(), resourcesToBePreempted);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|