|
@@ -397,10 +397,16 @@ public class FifoIntraQueuePreemptionPlugin
|
|
|
ResourceUsage userResourceUsage = tq.leafQueue.getUser(userName)
|
|
|
.getResourceUsage();
|
|
|
|
|
|
+ // perUserAMUsed was populated with running apps, now we are looping
|
|
|
+ // through both running and pending apps.
|
|
|
+ Resource userSpecificAmUsed = perUserAMUsed.get(userName);
|
|
|
+ amUsed = (userSpecificAmUsed == null)
|
|
|
+ ? Resources.none() : userSpecificAmUsed;
|
|
|
+
|
|
|
TempUserPerPartition tmpUser = new TempUserPerPartition(
|
|
|
tq.leafQueue.getUser(userName), tq.queueName,
|
|
|
Resources.clone(userResourceUsage.getUsed(partition)),
|
|
|
- Resources.clone(perUserAMUsed.get(userName)),
|
|
|
+ Resources.clone(userSpecificAmUsed),
|
|
|
Resources.clone(userResourceUsage.getReserved(partition)),
|
|
|
Resources.none());
|
|
|
|
|
@@ -547,15 +553,17 @@ public class FifoIntraQueuePreemptionPlugin
|
|
|
Collection<FiCaSchedulerApp> runningApps = leafQueue.getApplications();
|
|
|
Resource amUsed = Resources.createResource(0, 0);
|
|
|
|
|
|
- for (FiCaSchedulerApp app : runningApps) {
|
|
|
- Resource userAMResource = perUserAMUsed.get(app.getUser());
|
|
|
- if (null == userAMResource) {
|
|
|
- userAMResource = Resources.createResource(0, 0);
|
|
|
- perUserAMUsed.put(app.getUser(), userAMResource);
|
|
|
- }
|
|
|
+ synchronized (leafQueue) {
|
|
|
+ for (FiCaSchedulerApp app : runningApps) {
|
|
|
+ Resource userAMResource = perUserAMUsed.get(app.getUser());
|
|
|
+ if (null == userAMResource) {
|
|
|
+ userAMResource = Resources.createResource(0, 0);
|
|
|
+ perUserAMUsed.put(app.getUser(), userAMResource);
|
|
|
+ }
|
|
|
|
|
|
- Resources.addTo(userAMResource, app.getAMResource(partition));
|
|
|
- Resources.addTo(amUsed, app.getAMResource(partition));
|
|
|
+ Resources.addTo(userAMResource, app.getAMResource(partition));
|
|
|
+ Resources.addTo(amUsed, app.getAMResource(partition));
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
return amUsed;
|