|
@@ -77,8 +77,10 @@ import java.util.HashSet;
|
|
|
import java.util.Iterator;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
+import java.util.Map.Entry;
|
|
|
import java.util.Set;
|
|
|
import java.util.TreeSet;
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|
|
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
|
|
|
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
|
|
@@ -137,6 +139,11 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
private Map<String, TreeSet<RMContainer>> ignorePartitionExclusivityRMContainers =
|
|
|
new HashMap<>();
|
|
|
|
|
|
+ private Set<String> activeUsersSet =
|
|
|
+ Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
|
|
|
+ private float activeUsersTimesWeights = 0.0f;
|
|
|
+ private float allUsersTimesWeights = 0.0f;
|
|
|
+
|
|
|
@SuppressWarnings({ "unchecked", "rawtypes" })
|
|
|
public LeafQueue(CapacitySchedulerContext cs,
|
|
|
String queueName, CSQueue parent, CSQueue old) throws IOException {
|
|
@@ -231,6 +238,20 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
defaultAppPriorityPerQueue = Priority.newInstance(conf
|
|
|
.getDefaultApplicationPriorityConfPerQueue(getQueuePath()));
|
|
|
|
|
|
+ // Validate leaf queue's user's weights.
|
|
|
+ int queueUL = Math.min(100, conf.getUserLimit(getQueuePath()));
|
|
|
+ for (Entry<String, Float> e : getUserWeights().entrySet()) {
|
|
|
+ float val = e.getValue().floatValue();
|
|
|
+ if (val < 0.0f || val > (100.0f / queueUL)) {
|
|
|
+ throw new IOException("Weight (" + val + ") for user \"" + e.getKey()
|
|
|
+ + "\" must be between 0 and" + " 100 / " + queueUL + " (= " +
|
|
|
+ 100.0f/queueUL + ", the number of concurrent active users in "
|
|
|
+ + getQueuePath() + ")");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ updateUserWeights();
|
|
|
+
|
|
|
LOG.info("Initializing " + queueName + "\n" +
|
|
|
"capacity = " + queueCapacities.getCapacity() +
|
|
|
" [= (float) configuredCapacity / 100 ]" + "\n" +
|
|
@@ -279,6 +300,16 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
"defaultAppPriorityPerQueue = " + defaultAppPriorityPerQueue);
|
|
|
}
|
|
|
|
|
|
+ // This must be called from a synchronized method.
|
|
|
+ private void updateUserWeights() {
|
|
|
+ activeUsersSet = activeUsersManager.getActiveUsersSet();
|
|
|
+ for (Map.Entry<String, User> ue : users.entrySet()) {
|
|
|
+ ue.getValue().setWeight(getUserWeightFromQueue(ue.getKey()));
|
|
|
+ }
|
|
|
+ activeUsersTimesWeights = sumActiveUsersTimesWeights();
|
|
|
+ allUsersTimesWeights = sumAllUsersTimesWeights();
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Used only by tests.
|
|
|
*/
|
|
@@ -418,10 +449,17 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
if (user == null) {
|
|
|
user = new User(userName);
|
|
|
users.put(userName, user);
|
|
|
+ user.setWeight(getUserWeightFromQueue(userName));
|
|
|
+ allUsersTimesWeights = sumAllUsersTimesWeights();
|
|
|
}
|
|
|
return user;
|
|
|
}
|
|
|
|
|
|
+ private float getUserWeightFromQueue(String userName) {
|
|
|
+ Float weight = getUserWeights().get(userName);
|
|
|
+ return (weight == null) ? 1.0f : weight.floatValue();
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* @return an ArrayList of UserInfo objects who are active in this queue
|
|
|
*/
|
|
@@ -433,7 +471,8 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
.getAllUsed()), user.getActiveApplications(), user
|
|
|
.getPendingApplications(), Resources.clone(user
|
|
|
.getConsumedAMResources()), Resources.clone(user
|
|
|
- .getUserResourceLimit()), user.getResourceUsage()));
|
|
|
+ .getUserResourceLimit()), user.getResourceUsage(),
|
|
|
+ user.getWeight(), activeUsersSet.contains(user.userName)));
|
|
|
}
|
|
|
return usersToReturn;
|
|
|
}
|
|
@@ -560,19 +599,36 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
|
|
|
@VisibleForTesting
|
|
|
public synchronized Resource getUserAMResourceLimit() {
|
|
|
- return getUserAMResourceLimitPerPartition(RMNodeLabelsManager.NO_LABEL);
|
|
|
+ return getUserAMResourceLimitPerPartition(RMNodeLabelsManager.NO_LABEL,
|
|
|
+ null);
|
|
|
}
|
|
|
|
|
|
public synchronized Resource getUserAMResourceLimitPerPartition(
|
|
|
- String nodePartition) {
|
|
|
+ String nodePartition, String userName) {
|
|
|
+ float userWeight = 1.0f;
|
|
|
+ if (userName != null && getUser(userName) != null) {
|
|
|
+ userWeight = getUser(userName).getWeight();
|
|
|
+ }
|
|
|
+ if (activeUsersManager.getActiveUsersChanged()) {
|
|
|
+ activeUsersSet = activeUsersManager.getActiveUsersSet();
|
|
|
+ activeUsersTimesWeights = sumActiveUsersTimesWeights();
|
|
|
+ activeUsersManager.clearActiveUsersChanged();
|
|
|
+ }
|
|
|
/*
|
|
|
* The user am resource limit is based on the same approach as the user
|
|
|
* limit (as it should represent a subset of that). This means that it uses
|
|
|
* the absolute queue capacity (per partition) instead of the max and is
|
|
|
* modified by the userlimit and the userlimit factor as is the userlimit
|
|
|
*/
|
|
|
- float effectiveUserLimit = Math.max(userLimit / 100.0f,
|
|
|
- 1.0f / Math.max(getActiveUsersManager().getNumActiveUsers(), 1));
|
|
|
+ float effectiveUserLimit;
|
|
|
+ if (activeUsersTimesWeights > 0.0f) {
|
|
|
+ effectiveUserLimit = Math.max(userLimit / 100.0f,
|
|
|
+ 1.0f / activeUsersTimesWeights);
|
|
|
+ } else {
|
|
|
+ effectiveUserLimit = Math.max(userLimit / 100.0f,
|
|
|
+ 1.0f / Math.max(getActiveUsersManager().getNumActiveUsers(), 1));
|
|
|
+ }
|
|
|
+ effectiveUserLimit = Math.min(effectiveUserLimit * userWeight, 1.0f);
|
|
|
|
|
|
Resource queuePartitionResource = Resources.multiplyAndNormalizeUp(
|
|
|
resourceCalculator,
|
|
@@ -699,7 +755,8 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
|
|
|
// Verify whether we already calculated user-am-limit for this label.
|
|
|
if (userAMLimit == null) {
|
|
|
- userAMLimit = getUserAMResourceLimitPerPartition(partitionName);
|
|
|
+ userAMLimit = getUserAMResourceLimitPerPartition(partitionName,
|
|
|
+ application.getUser());
|
|
|
userAmPartitionLimit.put(partitionName, userAMLimit);
|
|
|
}
|
|
|
|
|
@@ -814,6 +871,7 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
user.finishApplication(wasActive);
|
|
|
if (user.getTotalApplications() == 0) {
|
|
|
users.remove(application.getUser());
|
|
|
+ allUsersTimesWeights = sumAllUsersTimesWeights();
|
|
|
}
|
|
|
|
|
|
// Check if we can activate more applications
|
|
@@ -1240,20 +1298,25 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
// Also, the queue's configured capacity should be higher than
|
|
|
// queue-hard-limit * ulMin
|
|
|
|
|
|
- final int usersCount;
|
|
|
+ float usersSummedByWeight;
|
|
|
if (forActive) {
|
|
|
- usersCount = activeUsersManager.getNumActiveUsers();
|
|
|
+ if (activeUsersManager.getActiveUsersChanged()) {
|
|
|
+ activeUsersSet = activeUsersManager.getActiveUsersSet();
|
|
|
+ activeUsersTimesWeights = sumActiveUsersTimesWeights();
|
|
|
+ activeUsersManager.clearActiveUsersChanged();
|
|
|
+ }
|
|
|
+ usersSummedByWeight = activeUsersTimesWeights;
|
|
|
} else {
|
|
|
- usersCount = users.size();
|
|
|
+ usersSummedByWeight = allUsersTimesWeights;
|
|
|
}
|
|
|
|
|
|
// User limit resource is determined by:
|
|
|
- // max{currentCapacity / #activeUsers, currentCapacity *
|
|
|
+ // max(currentCapacity / #activeUsers, currentCapacity *
|
|
|
// user-limit-percentage%)
|
|
|
Resource userLimitResource = Resources.max(
|
|
|
resourceCalculator, partitionResource,
|
|
|
Resources.divideAndCeil(
|
|
|
- resourceCalculator, currentCapacity, usersCount),
|
|
|
+ resourceCalculator, currentCapacity, usersSummedByWeight),
|
|
|
Resources.divideAndCeil(
|
|
|
resourceCalculator,
|
|
|
Resources.multiplyAndRoundDown(
|
|
@@ -1301,18 +1364,45 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
" qconsumed: " + queueUsage.getUsed() +
|
|
|
" consumedRatio: " + totalUserConsumedRatio +
|
|
|
" currentCapacity: " + currentCapacity +
|
|
|
- " activeUsers: " + usersCount +
|
|
|
+ " activeUsers: " + usersSummedByWeight +
|
|
|
" clusterCapacity: " + clusterResource +
|
|
|
" resourceByLabel: " + partitionResource +
|
|
|
" usageratio: " + qUsageRatios.getUsageRatio(nodePartition) +
|
|
|
- " Partition: " + nodePartition
|
|
|
+ " Partition: " + nodePartition +
|
|
|
+ " maxUserLimit=" + maxUserLimit +
|
|
|
+ " userWeight=" + ((user != null) ? user.getWeight() : 1.0f)
|
|
|
);
|
|
|
}
|
|
|
+ // Apply user's weight.
|
|
|
+ float weight = (user == null) ? 1.0f : user.getWeight();
|
|
|
+ userLimitResource =
|
|
|
+ Resources.multiplyAndNormalizeDown(resourceCalculator,
|
|
|
+ userLimitResource, weight, minimumAllocation);
|
|
|
+
|
|
|
if (forActive) {
|
|
|
user.setUserResourceLimit(userLimitResource);
|
|
|
}
|
|
|
return userLimitResource;
|
|
|
}
|
|
|
+
|
|
|
+ float sumActiveUsersTimesWeights() {
|
|
|
+ float count = 0.0f;
|
|
|
+ for (String userName : activeUsersSet) {
|
|
|
+ // Do the following instead of calling getUser to avoid synchronization.
|
|
|
+ User user = users.get(userName);
|
|
|
+ count += (user != null) ? user.getWeight() : 0.0f;
|
|
|
+ }
|
|
|
+ return count;
|
|
|
+ }
|
|
|
+
|
|
|
+ synchronized float sumAllUsersTimesWeights() {
|
|
|
+ float count = 0.0f;
|
|
|
+ for (String userName : users.keySet()) {
|
|
|
+ User user = getUser(userName);
|
|
|
+ count += user.getWeight();
|
|
|
+ }
|
|
|
+ return count;
|
|
|
+ }
|
|
|
|
|
|
@Private
|
|
|
protected synchronized boolean canAssignToUser(Resource clusterResource,
|
|
@@ -1753,6 +1843,7 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
int activeApplications = 0;
|
|
|
private UsageRatios userUsageRatios = new UsageRatios();
|
|
|
String userName;
|
|
|
+ float weight = 1.0f;
|
|
|
|
|
|
public User(String name) {
|
|
|
this.userName = name;
|
|
@@ -1854,6 +1945,20 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
public void setResourceUsage(ResourceUsage resourceUsage) {
|
|
|
this.userResourceUsage = resourceUsage;
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @return the weight
|
|
|
+ */
|
|
|
+ public float getWeight() {
|
|
|
+ return weight;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @param weight the weight to set
|
|
|
+ */
|
|
|
+ public void setWeight(float weight) {
|
|
|
+ this.weight = weight;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|