|
@@ -92,6 +92,9 @@ public class UsersManager implements AbstractUsersManager {
|
|
|
Map<String, Map<SchedulingMode, Resource>> preComputedActiveUserLimit = new ConcurrentHashMap<>();
|
|
|
Map<String, Map<SchedulingMode, Resource>> preComputedAllUserLimit = new ConcurrentHashMap<>();
|
|
|
|
|
|
+ private float activeUsersTimesWeights = 0.0f;
|
|
|
+ private float allUsersTimesWeights = 0.0f;
|
|
|
+
|
|
|
/**
|
|
|
* UsageRatios will store the total used resources ratio across all users of
|
|
|
* the queue.
|
|
@@ -158,6 +161,7 @@ public class UsersManager implements AbstractUsersManager {
|
|
|
|
|
|
private UsageRatios userUsageRatios = new UsageRatios();
|
|
|
private WriteLock writeLock;
|
|
|
+ private float weight;
|
|
|
|
|
|
public User(String name) {
|
|
|
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
|
|
@@ -262,6 +266,20 @@ public class UsersManager implements AbstractUsersManager {
|
|
|
public void setResourceUsage(ResourceUsage resourceUsage) {
|
|
|
this.userResourceUsage = resourceUsage;
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @return this user's weight, the multiplier applied to the user limit
|
|
|
+ */
|
|
|
+ public float getWeight() {
|
|
|
+ return weight;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @param weight the weight to set
|
|
|
+ */
|
|
|
+ public void setWeight(float weight) {
|
|
|
+ this.weight = weight;
|
|
|
+ }
|
|
|
} /* End of User class */
|
|
|
|
|
|
/**
|
|
@@ -382,6 +400,8 @@ public class UsersManager implements AbstractUsersManager {
|
|
|
// Remove user from active/non-active list as well.
|
|
|
activeUsersSet.remove(userName);
|
|
|
nonActiveUsersSet.remove(userName);
|
|
|
+ activeUsersTimesWeights = sumActiveUsersTimesWeights();
|
|
|
+ allUsersTimesWeights = sumAllUsersTimesWeights();
|
|
|
} finally {
|
|
|
writeLock.unlock();
|
|
|
}
|
|
@@ -418,6 +438,8 @@ public class UsersManager implements AbstractUsersManager {
|
|
|
*/
|
|
|
private void addUser(String userName, User user) {
|
|
|
this.users.put(userName, user);
|
|
|
+ user.setWeight(getUserWeightFromQueue(userName));
|
|
|
+ allUsersTimesWeights = sumAllUsersTimesWeights();
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -434,7 +456,8 @@ public class UsersManager implements AbstractUsersManager {
|
|
|
user.getActiveApplications(), user.getPendingApplications(),
|
|
|
Resources.clone(user.getConsumedAMResources()),
|
|
|
Resources.clone(user.getUserResourceLimit()),
|
|
|
- user.getResourceUsage()));
|
|
|
+ user.getResourceUsage(), user.getWeight(),
|
|
|
+ activeUsersSet.contains(user.userName)));
|
|
|
}
|
|
|
return usersToReturn;
|
|
|
} finally {
|
|
@@ -442,6 +465,11 @@ public class UsersManager implements AbstractUsersManager {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private float getUserWeightFromQueue(String userName) {
|
|
|
+ Float weight = lQueue.getUserWeights().get(userName);
|
|
|
+ return (weight == null) ? 1.0f : weight.floatValue();
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Get computed user-limit for all ACTIVE users in this queue. If cached data
|
|
|
* is invalidated due to resource change, this method also enforce to
|
|
@@ -480,13 +508,24 @@ public class UsersManager implements AbstractUsersManager {
|
|
|
writeLock.unlock();
|
|
|
}
|
|
|
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("userLimit is fetched. userLimit = "
|
|
|
- + userLimitPerSchedulingMode.get(schedulingMode) + ", schedulingMode="
|
|
|
- + schedulingMode + ", partition=" + nodePartition);
|
|
|
+ Resource userLimitResource = userLimitPerSchedulingMode.get(schedulingMode);
|
|
|
+ User user = getUser(userName);
|
|
|
+ float weight = (user == null) ? 1.0f : user.getWeight();
|
|
|
+ Resource userSpecificUserLimit =
|
|
|
+ Resources.multiplyAndNormalizeDown(resourceCalculator,
|
|
|
+ userLimitResource, weight, lQueue.getMinimumAllocation());
|
|
|
+
|
|
|
+ if (user != null) {
|
|
|
+ user.setUserResourceLimit(userSpecificUserLimit);
|
|
|
}
|
|
|
|
|
|
- return userLimitPerSchedulingMode.get(schedulingMode);
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("userLimit is fetched. userLimit=" + userLimitResource
|
|
|
+ + ", userSpecificUserLimit=" + userSpecificUserLimit
|
|
|
+ + ", schedulingMode=" + schedulingMode
|
|
|
+ + ", partition=" + nodePartition);
|
|
|
+ }
|
|
|
+ return userSpecificUserLimit;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -527,13 +566,21 @@ public class UsersManager implements AbstractUsersManager {
|
|
|
writeLock.unlock();
|
|
|
}
|
|
|
|
|
|
+ Resource userLimitResource = userLimitPerSchedulingMode.get(schedulingMode);
|
|
|
+ User user = getUser(userName);
|
|
|
+ float weight = (user == null) ? 1.0f : user.getWeight();
|
|
|
+ Resource userSpecificUserLimit =
|
|
|
+ Resources.multiplyAndNormalizeDown(resourceCalculator,
|
|
|
+ userLimitResource, weight, lQueue.getMinimumAllocation());
|
|
|
+
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("userLimit is fetched. userLimit = "
|
|
|
- + userLimitPerSchedulingMode.get(schedulingMode) + ", schedulingMode="
|
|
|
- + schedulingMode + ", partition=" + nodePartition);
|
|
|
+ LOG.debug("userLimit is fetched. userLimit=" + userLimitResource
|
|
|
+ + ", userSpecificUserLimit=" + userSpecificUserLimit
|
|
|
+ + ", schedulingMode=" + schedulingMode
|
|
|
+ + ", partition=" + nodePartition);
|
|
|
}
|
|
|
|
|
|
- return userLimitPerSchedulingMode.get(schedulingMode);
|
|
|
+ return userSpecificUserLimit;
|
|
|
}
|
|
|
|
|
|
/*
|
|
@@ -656,16 +703,19 @@ public class UsersManager implements AbstractUsersManager {
|
|
|
queueCapacity, required);
|
|
|
|
|
|
/*
|
|
|
- * We want to base the userLimit calculation on max(queueCapacity,
|
|
|
- * usedResources+required). However, we want usedResources to be based on
|
|
|
- * the combined ratios of all the users in the queue so we use consumedRatio
|
|
|
- * to calculate such. The calculation is dependent on how the
|
|
|
- * resourceCalculator calculates the ratio between two Resources. DRF
|
|
|
- * Example: If usedResources is greater than queueCapacity and users have
|
|
|
- * the following [mem,cpu] usages: User1: [10%,20%] - Dominant resource is
|
|
|
- * 20% User2: [30%,10%] - Dominant resource is 30% Then total consumedRatio
|
|
|
- * is then 20+30=50%. Yes, this value can be larger than 100% but for the
|
|
|
- * purposes of making sure all users are getting their fair share, it works.
|
|
|
+ * We want to base the userLimit calculation on
|
|
|
+ * max(queueCapacity, usedResources+required). However, we want
|
|
|
+ * usedResources to be based on the combined ratios of all the users in the
|
|
|
+ * queue, so we use consumedRatio to calculate it.
|
|
|
+ * The calculation is dependent on how the resourceCalculator calculates the
|
|
|
+ * ratio between two Resources. DRF Example: If usedResources is greater
|
|
|
+ * than queueCapacity and users have the following [mem,cpu] usages:
|
|
|
+ *
|
|
|
+ * User1: [10%,20%] - Dominant resource is 20%
|
|
|
+ * User2: [30%,10%] - Dominant resource is 30%
|
|
|
+ * Then total consumedRatio is 20+30=50%. Yes, this value can be
|
|
|
+ * larger than 100% but for the purposes of making sure all users are
|
|
|
+ * getting their fair share, it works.
|
|
|
*/
|
|
|
Resource consumed = Resources.multiplyAndNormalizeUp(resourceCalculator,
|
|
|
partitionResource, getUsageRatio(nodePartition),
|
|
@@ -680,23 +730,23 @@ public class UsersManager implements AbstractUsersManager {
|
|
|
* capacity * user-limit-factor. Also, the queue's configured capacity
|
|
|
* should be higher than queue-hard-limit * ulMin
|
|
|
*/
|
|
|
- int usersCount = getNumActiveUsers();
|
|
|
+ float usersSummedByWeight = activeUsersTimesWeights;
|
|
|
Resource resourceUsed = totalResUsageForActiveUsers.getUsed(nodePartition);
|
|
|
|
|
|
// For non-activeUser calculation, consider all users count.
|
|
|
if (!activeUser) {
|
|
|
resourceUsed = currentCapacity;
|
|
|
- usersCount = users.size();
|
|
|
+ usersSummedByWeight = allUsersTimesWeights;
|
|
|
}
|
|
|
|
|
|
/*
|
|
|
- * User limit resource is determined by: max{currentCapacity / #activeUsers,
|
|
|
+ * User limit resource is determined by: max(currentCapacity / sum(weights),
|
|
|
* currentCapacity * user-limit-percentage%)
|
|
|
*/
|
|
|
Resource userLimitResource = Resources.max(resourceCalculator,
|
|
|
partitionResource,
|
|
|
Resources.divideAndCeil(resourceCalculator, resourceUsed,
|
|
|
- usersCount),
|
|
|
+ usersSummedByWeight),
|
|
|
Resources.divideAndCeil(resourceCalculator,
|
|
|
Resources.multiplyAndRoundDown(currentCapacity, getUserLimit()),
|
|
|
100));
|
|
@@ -727,18 +777,26 @@ public class UsersManager implements AbstractUsersManager {
|
|
|
lQueue.getMinimumAllocation());
|
|
|
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("User limit computation for " + userName + " in queue "
|
|
|
- + lQueue.getQueueName() + " userLimitPercent=" + lQueue.getUserLimit()
|
|
|
- + " userLimitFactor=" + lQueue.getUserLimitFactor() + " required: "
|
|
|
- + required + " consumed: " + consumed + " user-limit-resource: "
|
|
|
- + userLimitResource + " queueCapacity: " + queueCapacity
|
|
|
- + " qconsumed: " + lQueue.getQueueResourceUsage().getUsed()
|
|
|
- + " currentCapacity: " + currentCapacity + " activeUsers: "
|
|
|
- + usersCount + " clusterCapacity: " + clusterResource
|
|
|
- + " resourceByLabel: " + partitionResource + " usageratio: "
|
|
|
- + getUsageRatio(nodePartition) + " Partition: " + nodePartition);
|
|
|
- }
|
|
|
- getUser(userName).setUserResourceLimit(userLimitResource);
|
|
|
+ LOG.debug("User limit computation for " + userName
|
|
|
+ + ", in queue: " + lQueue.getQueueName()
|
|
|
+ + ", userLimitPercent=" + lQueue.getUserLimit()
|
|
|
+ + ", userLimitFactor=" + lQueue.getUserLimitFactor()
|
|
|
+ + ", required=" + required
|
|
|
+ + ", consumed=" + consumed
|
|
|
+ + ", user-limit-resource=" + userLimitResource
|
|
|
+ + ", queueCapacity=" + queueCapacity
|
|
|
+ + ", qconsumed=" + lQueue.getQueueResourceUsage().getUsed()
|
|
|
+ + ", currentCapacity=" + currentCapacity
|
|
|
+ + ", activeUsers=" + usersSummedByWeight
|
|
|
+ + ", clusterCapacity=" + clusterResource
|
|
|
+ + ", resourceByLabel=" + partitionResource
|
|
|
+ + ", usageratio=" + getUsageRatio(nodePartition)
|
|
|
+ + ", Partition=" + nodePartition
|
|
|
+ + ", resourceUsed=" + resourceUsed
|
|
|
+ + ", maxUserLimit=" + maxUserLimit
|
|
|
+ + ", userWeight=" + getUser(userName).getWeight()
|
|
|
+ );
|
|
|
+ }
|
|
|
return userLimitResource;
|
|
|
}
|
|
|
|
|
@@ -838,6 +896,32 @@ public class UsersManager implements AbstractUsersManager {
|
|
|
return activeUsers.get();
|
|
|
}
|
|
|
|
|
|
+ float sumActiveUsersTimesWeights() {
|
|
|
+ float count = 0.0f;
|
|
|
+ try {
|
|
|
+ this.readLock.lock();
|
|
|
+ for (String u : activeUsersSet) {
|
|
|
+ count += getUser(u).getWeight();
|
|
|
+ }
|
|
|
+ return count;
|
|
|
+ } finally {
|
|
|
+ this.readLock.unlock();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ float sumAllUsersTimesWeights() {
|
|
|
+ float count = 0.0f;
|
|
|
+ try {
|
|
|
+ this.readLock.lock();
|
|
|
+ for (String u : users.keySet()) {
|
|
|
+ count += getUser(u).getWeight();
|
|
|
+ }
|
|
|
+ return count;
|
|
|
+ } finally {
|
|
|
+ this.readLock.unlock();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
private void updateActiveUsersResourceUsage(String userName) {
|
|
|
try {
|
|
|
this.writeLock.lock();
|
|
@@ -850,6 +934,7 @@ public class UsersManager implements AbstractUsersManager {
|
|
|
if (nonActiveUsersSet.contains(userName)) {
|
|
|
nonActiveUsersSet.remove(userName);
|
|
|
activeUsersSet.add(userName);
|
|
|
+ activeUsersTimesWeights = sumActiveUsersTimesWeights();
|
|
|
|
|
|
// Update total resource usage of active and non-active after user
|
|
|
// is moved from non-active to active.
|
|
@@ -888,6 +973,7 @@ public class UsersManager implements AbstractUsersManager {
|
|
|
if (activeUsersSet.contains(userName)) {
|
|
|
activeUsersSet.remove(userName);
|
|
|
nonActiveUsersSet.add(userName);
|
|
|
+ activeUsersTimesWeights = sumActiveUsersTimesWeights();
|
|
|
|
|
|
// Update total resource usage of active and non-active after user is
|
|
|
// moved from active to non-active.
|
|
@@ -988,4 +1074,18 @@ public class UsersManager implements AbstractUsersManager {
|
|
|
+ totalResUsageForNonActiveUsers.getAllUsed());
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ public void updateUserWeights() {
|
|
|
+ try {
|
|
|
+ this.writeLock.lock();
|
|
|
+ for (Map.Entry<String, User> ue : users.entrySet()) {
|
|
|
+ ue.getValue().setWeight(getUserWeightFromQueue(ue.getKey()));
|
|
|
+ }
|
|
|
+ activeUsersTimesWeights = sumActiveUsersTimesWeights();
|
|
|
+ allUsersTimesWeights = sumAllUsersTimesWeights();
|
|
|
+ userLimitNeedsRecompute();
|
|
|
+ } finally {
|
|
|
+ this.writeLock.unlock();
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|