|
@@ -245,6 +245,7 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
|
|
protected ReentrantReadWriteLock sortedNodesLock = new ReentrantReadWriteLock();
|
|
protected ReentrantReadWriteLock sortedNodesLock = new ReentrantReadWriteLock();
|
|
protected ReentrantReadWriteLock clusterNodesLock =
|
|
protected ReentrantReadWriteLock clusterNodesLock =
|
|
new ReentrantReadWriteLock();
|
|
new ReentrantReadWriteLock();
|
|
|
|
+ private long nodeComputationInterval;
|
|
|
|
|
|
Runnable computeTask = new Runnable() {
|
|
Runnable computeTask = new Runnable() {
|
|
@Override
|
|
@Override
|
|
@@ -278,12 +279,15 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
|
|
this.sortedNodes = new ArrayList<>();
|
|
this.sortedNodes = new ArrayList<>();
|
|
this.scheduledExecutor = Executors.newScheduledThreadPool(1);
|
|
this.scheduledExecutor = Executors.newScheduledThreadPool(1);
|
|
this.comparator = comparator;
|
|
this.comparator = comparator;
|
|
- this.scheduledExecutor.scheduleAtFixedRate(computeTask,
|
|
|
|
- nodeComputationInterval, nodeComputationInterval,
|
|
|
|
- TimeUnit.MILLISECONDS);
|
|
|
|
|
|
+ this.nodeComputationInterval = nodeComputationInterval;
|
|
numNodesForAnyAllocation = numNodes;
|
|
numNodesForAnyAllocation = numNodes;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ public void start() {
|
|
|
|
+ this.scheduledExecutor.scheduleAtFixedRate(computeTask, nodeComputationInterval,
|
|
|
|
+ nodeComputationInterval, TimeUnit.MILLISECONDS);
|
|
|
|
+ }
|
|
|
|
+
|
|
protected void updateSortedNodes() {
|
|
protected void updateSortedNodes() {
|
|
List<NodeId> nodeIds = sortNodes(true).stream()
|
|
List<NodeId> nodeIds = sortNodes(true).stream()
|
|
.map(n -> n.nodeId)
|
|
.map(n -> n.nodeId)
|