|
@@ -73,6 +73,7 @@ import java.util.HashSet;
|
|
|
import java.util.Iterator;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
+import java.util.Optional;
|
|
|
import java.util.Set;
|
|
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|
|
|
|
@@ -1189,84 +1190,72 @@ public abstract class AbstractCSQueue implements CSQueue {
|
|
|
|
|
|
}
|
|
|
|
|
|
+ private static String ensurePartition(String partition) {
|
|
|
+ return Optional.ofNullable(partition).orElse(RMNodeLabelsManager.NO_LABEL);
|
|
|
+ }
|
|
|
+
|
|
|
+ @FunctionalInterface
|
|
|
+ interface Counter {
|
|
|
+ void count(String partition, Resource resource);
|
|
|
+ }
|
|
|
+
|
|
|
+ @FunctionalInterface
|
|
|
+ interface CounterWithApp {
|
|
|
+ void count(String partition, Resource reservedRes, SchedulerApplicationAttempt application);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void count(String partition, Resource resource, Counter counter, Counter parentCounter) {
|
|
|
+ final String checkedPartition = ensurePartition(partition);
|
|
|
+ counter.count(checkedPartition, resource);
|
|
|
+ Optional.ofNullable(parentCounter).ifPresent(c -> c.count(checkedPartition, resource));
|
|
|
+ }
|
|
|
+
|
|
|
+ private void countAndUpdate(String partition, Resource resource,
|
|
|
+ Counter counter, CounterWithApp parentCounter) {
|
|
|
+ final String checkedPartition = ensurePartition(partition);
|
|
|
+ counter.count(checkedPartition, resource);
|
|
|
+ CSQueueUtils.updateUsedCapacity(resourceCalculator,
|
|
|
+ labelManager.getResourceByLabel(checkedPartition, Resources.none()),
|
|
|
+ checkedPartition, this);
|
|
|
+ Optional.ofNullable(parentCounter).ifPresent(c -> c.count(checkedPartition, resource, null));
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public void incReservedResource(String partition, Resource reservedRes) {
|
|
|
- if (partition == null) {
|
|
|
- partition = RMNodeLabelsManager.NO_LABEL;
|
|
|
- }
|
|
|
-
|
|
|
- queueUsage.incReserved(partition, reservedRes);
|
|
|
- if(null != parent){
|
|
|
- parent.incReservedResource(partition, reservedRes);
|
|
|
- }
|
|
|
+ count(partition, reservedRes, queueUsage::incReserved,
|
|
|
+ parent == null ? null : parent::incReservedResource);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void decReservedResource(String partition, Resource reservedRes) {
|
|
|
- if (partition == null) {
|
|
|
- partition = RMNodeLabelsManager.NO_LABEL;
|
|
|
- }
|
|
|
-
|
|
|
- queueUsage.decReserved(partition, reservedRes);
|
|
|
- if(null != parent){
|
|
|
- parent.decReservedResource(partition, reservedRes);
|
|
|
- }
|
|
|
+ count(partition, reservedRes, queueUsage::decReserved,
|
|
|
+ parent == null ? null : parent::decReservedResource);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void incPendingResource(String nodeLabel, Resource resourceToInc) {
|
|
|
- if (nodeLabel == null) {
|
|
|
- nodeLabel = RMNodeLabelsManager.NO_LABEL;
|
|
|
- }
|
|
|
- // ResourceUsage has its own lock, no addition lock needs here.
|
|
|
- queueUsage.incPending(nodeLabel, resourceToInc);
|
|
|
- if (null != parent) {
|
|
|
- parent.incPendingResource(nodeLabel, resourceToInc);
|
|
|
- }
|
|
|
+ count(nodeLabel, resourceToInc, queueUsage::incPending,
|
|
|
+ parent == null ? null : parent::incPendingResource);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void decPendingResource(String nodeLabel, Resource resourceToDec) {
|
|
|
- if (nodeLabel == null) {
|
|
|
- nodeLabel = RMNodeLabelsManager.NO_LABEL;
|
|
|
- }
|
|
|
- // ResourceUsage has its own lock, no addition lock needs here.
|
|
|
- queueUsage.decPending(nodeLabel, resourceToDec);
|
|
|
- if (null != parent) {
|
|
|
- parent.decPendingResource(nodeLabel, resourceToDec);
|
|
|
- }
|
|
|
+ count(nodeLabel, resourceToDec, queueUsage::decPending,
|
|
|
+ parent == null ? null : parent::decPendingResource);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void incUsedResource(String nodeLabel, Resource resourceToInc,
|
|
|
SchedulerApplicationAttempt application) {
|
|
|
- if (nodeLabel == null) {
|
|
|
- nodeLabel = RMNodeLabelsManager.NO_LABEL;
|
|
|
- }
|
|
|
- // ResourceUsage has its own lock, no addition lock needs here.
|
|
|
- queueUsage.incUsed(nodeLabel, resourceToInc);
|
|
|
- CSQueueUtils.updateUsedCapacity(resourceCalculator,
|
|
|
- labelManager.getResourceByLabel(nodeLabel, Resources.none()),
|
|
|
- nodeLabel, this);
|
|
|
- if (null != parent) {
|
|
|
- parent.incUsedResource(nodeLabel, resourceToInc, null);
|
|
|
- }
|
|
|
+ countAndUpdate(nodeLabel, resourceToInc, queueUsage::incUsed,
|
|
|
+ parent == null ? null : parent::incUsedResource);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void decUsedResource(String nodeLabel, Resource resourceToDec,
|
|
|
SchedulerApplicationAttempt application) {
|
|
|
- if (nodeLabel == null) {
|
|
|
- nodeLabel = RMNodeLabelsManager.NO_LABEL;
|
|
|
- }
|
|
|
- // ResourceUsage has its own lock, no addition lock needs here.
|
|
|
- queueUsage.decUsed(nodeLabel, resourceToDec);
|
|
|
- CSQueueUtils.updateUsedCapacity(resourceCalculator,
|
|
|
- labelManager.getResourceByLabel(nodeLabel, Resources.none()),
|
|
|
- nodeLabel, this);
|
|
|
- if (null != parent) {
|
|
|
- parent.decUsedResource(nodeLabel, resourceToDec, null);
|
|
|
- }
|
|
|
+ countAndUpdate(nodeLabel, resourceToDec, queueUsage::decUsed,
|
|
|
+ parent == null ? null : parent::decUsedResource);
|
|
|
}
|
|
|
|
|
|
/**
|