|
@@ -55,8 +55,9 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
|
|
|
private Map<String, N> nodeNameToNodeMap = new HashMap<>();
|
|
|
private Map<String, List<N>> nodesPerRack = new HashMap<>();
|
|
|
|
|
|
- private Resource clusterCapacity = Resources.clone(Resources.none());
|
|
|
- private Resource staleClusterCapacity = null;
|
|
|
+ private final Resource clusterCapacity = Resources.clone(Resources.none());
|
|
|
+ private volatile Resource staleClusterCapacity =
|
|
|
+ Resources.clone(Resources.none());
|
|
|
|
|
|
// Max allocation
|
|
|
private long maxNodeMemory = -1;
|
|
@@ -82,6 +83,7 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
|
|
|
|
|
|
// Update cluster capacity
|
|
|
Resources.addTo(clusterCapacity, node.getTotalResource());
|
|
|
+ staleClusterCapacity = Resources.clone(clusterCapacity);
|
|
|
|
|
|
// Update maximumAllocation
|
|
|
updateMaxResources(node, true);
|
|
@@ -139,16 +141,7 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
|
|
|
}
|
|
|
|
|
|
public Resource getClusterCapacity() {
|
|
|
- readLock.lock();
|
|
|
- try {
|
|
|
- if (staleClusterCapacity == null ||
|
|
|
- !Resources.equals(staleClusterCapacity, clusterCapacity)) {
|
|
|
- staleClusterCapacity = Resources.clone(clusterCapacity);
|
|
|
- }
|
|
|
- return staleClusterCapacity;
|
|
|
- } finally {
|
|
|
- readLock.unlock();
|
|
|
- }
|
|
|
+ return staleClusterCapacity;
|
|
|
}
|
|
|
|
|
|
public N removeNode(NodeId nodeId) {
|
|
@@ -175,6 +168,7 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
|
|
|
|
|
|
// Update cluster capacity
|
|
|
Resources.subtractFrom(clusterCapacity, node.getTotalResource());
|
|
|
+ staleClusterCapacity = Resources.clone(clusterCapacity);
|
|
|
|
|
|
// Update maximumAllocation
|
|
|
updateMaxResources(node, false);
|