|
@@ -116,7 +116,37 @@ public class QueueManager {
|
|
|
private long lastReloadAttempt; // Last time we tried to reload the queues file
|
|
|
private long lastSuccessfulReload; // Last time we successfully reloaded queues
|
|
|
private boolean lastReloadAttemptFailed = false;
|
|
|
-
|
|
|
+
|
|
|
+ // Monitor object for minQueueResources
|
|
|
+ private Object minQueueResourcesMO = new Object();
|
|
|
+
|
|
|
+ //Monitor object for maxQueueResources
|
|
|
+ private Object maxQueueResourcesMO = new Object();
|
|
|
+
|
|
|
+ //Monitor object for queueMaxApps
|
|
|
+ private Object queueMaxAppsMO = new Object();
|
|
|
+
|
|
|
+ //Monitor object for userMaxApps
|
|
|
+ private Object userMaxAppsMO = new Object();
|
|
|
+
|
|
|
+ //Monitor object for queueWeights
|
|
|
+ private Object queueWeightsMO = new Object();
|
|
|
+
|
|
|
+ //Monitor object for minSharePreemptionTimeouts
|
|
|
+ private Object minSharePreemptionTimeoutsMO = new Object();
|
|
|
+
|
|
|
+ //Monitor object for queueAcls
|
|
|
+ private Object queueAclsMO = new Object();
|
|
|
+
|
|
|
+ //Monitor object for userMaxAppsDefault
|
|
|
+ private Object userMaxAppsDefaultMO = new Object();
|
|
|
+
|
|
|
+ //Monitor object for queueMaxAppsDefault
|
|
|
+ private Object queueMaxAppsDefaultMO = new Object();
|
|
|
+
|
|
|
+ //Monitor object for defaultSchedulingMode
|
|
|
+ private Object defaultSchedulingModeMO = new Object();
|
|
|
+
|
|
|
public QueueManager(FairScheduler scheduler) {
|
|
|
this.scheduler = scheduler;
|
|
|
}
|
|
@@ -145,21 +175,27 @@ public class QueueManager {
|
|
|
/**
|
|
|
* Get a queue by name, creating it if necessary
|
|
|
*/
|
|
|
- public synchronized FSQueue getQueue(String name) {
|
|
|
- FSQueue queue = queues.get(name);
|
|
|
- if (queue == null) {
|
|
|
- queue = new FSQueue(scheduler, name);
|
|
|
- queue.setSchedulingMode(defaultSchedulingMode);
|
|
|
- queues.put(name, queue);
|
|
|
+ public FSQueue getQueue(String name) {
|
|
|
+ synchronized (queues) {
|
|
|
+ FSQueue queue = queues.get(name);
|
|
|
+ if (queue == null) {
|
|
|
+ queue = new FSQueue(scheduler, name);
|
|
|
+ synchronized (defaultSchedulingModeMO){
|
|
|
+ queue.setSchedulingMode(defaultSchedulingMode);
|
|
|
+ }
|
|
|
+ queues.put(name, queue);
|
|
|
+ }
|
|
|
+ return queue;
|
|
|
}
|
|
|
- return queue;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Return whether a queue exists already.
|
|
|
*/
|
|
|
- public synchronized boolean exists(String name) {
|
|
|
- return queues.containsKey(name);
|
|
|
+ public boolean exists(String name) {
|
|
|
+ synchronized (queues) {
|
|
|
+ return queues.containsKey(name);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -353,16 +389,16 @@ public class QueueManager {
|
|
|
// Commit the reload; also create any queue defined in the alloc file
|
|
|
// if it does not already exist, so it can be displayed on the web UI.
|
|
|
synchronized(this) {
|
|
|
- this.minQueueResources = minQueueResources;
|
|
|
- this.maxQueueResources = maxQueueResources;
|
|
|
- this.queueMaxApps = queueMaxApps;
|
|
|
- this.userMaxApps = userMaxApps;
|
|
|
- this.queueWeights = queueWeights;
|
|
|
- this.userMaxAppsDefault = userMaxAppsDefault;
|
|
|
- this.queueMaxAppsDefault = queueMaxAppsDefault;
|
|
|
- this.defaultSchedulingMode = defaultSchedulingMode;
|
|
|
- this.minSharePreemptionTimeouts = minSharePreemptionTimeouts;
|
|
|
- this.queueAcls = queueAcls;
|
|
|
+ setMinResources(minQueueResources);
|
|
|
+ setMaxResources(maxQueueResources);
|
|
|
+ setQueueMaxApps(queueMaxApps);
|
|
|
+ setUserMaxApps(userMaxApps);
|
|
|
+ setQueueWeights(queueWeights);
|
|
|
+ setUserMaxAppsDefault(userMaxAppsDefault);
|
|
|
+ setQueueMaxAppsDefault(queueMaxAppsDefault);
|
|
|
+ setDefaultSchedulingMode(defaultSchedulingMode);
|
|
|
+ setMinSharePreemptionTimeouts(minSharePreemptionTimeouts);
|
|
|
+ setQueueAcls(queueAcls);
|
|
|
for (String name: queueNamesInAllocFile) {
|
|
|
FSQueue queue = getQueue(name);
|
|
|
if (queueModes.containsKey(name)) {
|
|
@@ -392,25 +428,40 @@ public class QueueManager {
|
|
|
* @return the cap set on this queue, or 0 if not set.
|
|
|
*/
|
|
|
public Resource getMinResources(String queue) {
|
|
|
- if (minQueueResources.containsKey(queue)) {
|
|
|
- return minQueueResources.get(queue);
|
|
|
- } else{
|
|
|
- return Resources.createResource(0);
|
|
|
+ synchronized(minQueueResourcesMO) {
|
|
|
+ if (minQueueResources.containsKey(queue)) {
|
|
|
+ return minQueueResources.get(queue);
|
|
|
+ } else{
|
|
|
+ return Resources.createResource(0);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private void setMinResources(Map<String, Resource> resources) {
|
|
|
+ synchronized(minQueueResourcesMO) {
|
|
|
+ minQueueResources = resources;
|
|
|
+ }
|
|
|
+ }
|
|
|
/**
|
|
|
* Get the maximum resource allocation for the given queue.
|
|
|
* @return the cap set on this queue, or Integer.MAX_VALUE if not set.
|
|
|
*/
|
|
|
Resource getMaxResources(String queueName) {
|
|
|
- if (maxQueueResources.containsKey(queueName)) {
|
|
|
- return maxQueueResources.get(queueName);
|
|
|
- } else {
|
|
|
- return Resources.createResource(Integer.MAX_VALUE);
|
|
|
+ synchronized (maxQueueResourcesMO) {
|
|
|
+ if (maxQueueResources.containsKey(queueName)) {
|
|
|
+ return maxQueueResources.get(queueName);
|
|
|
+ } else {
|
|
|
+ return Resources.createResource(Integer.MAX_VALUE);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private void setMaxResources(Map<String, Resource> resources) {
|
|
|
+ synchronized(maxQueueResourcesMO) {
|
|
|
+ maxQueueResources = resources;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Add an app in the appropriate queue
|
|
|
*/
|
|
@@ -428,11 +479,12 @@ public class QueueManager {
|
|
|
/**
|
|
|
* Get a collection of all queues
|
|
|
*/
|
|
|
- public synchronized Collection<FSQueue> getQueues() {
|
|
|
- return queues.values();
|
|
|
+ public Collection<FSQueue> getQueues() {
|
|
|
+ synchronized (queues) {
|
|
|
+ return queues.values();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
-
|
|
|
/**
|
|
|
* Get all queue names that have been seen either in the allocation file or in
|
|
|
* a submitted app.
|
|
@@ -447,40 +499,102 @@ public class QueueManager {
|
|
|
}
|
|
|
|
|
|
public int getUserMaxApps(String user) {
|
|
|
- if (userMaxApps.containsKey(user)) {
|
|
|
- return userMaxApps.get(user);
|
|
|
- } else {
|
|
|
- return userMaxAppsDefault;
|
|
|
+ synchronized (userMaxAppsMO) {
|
|
|
+ if (userMaxApps.containsKey(user)) {
|
|
|
+ return userMaxApps.get(user);
|
|
|
+ } else {
|
|
|
+ return getUserMaxAppsDefault();
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private void setUserMaxApps(Map<String, Integer> userApps) {
|
|
|
+ synchronized (userMaxAppsMO) {
|
|
|
+ userMaxApps = userApps;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private int getUserMaxAppsDefault() {
|
|
|
+ synchronized (userMaxAppsDefaultMO){
|
|
|
+ return userMaxAppsDefault;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void setUserMaxAppsDefault(int userMaxApps) {
|
|
|
+ synchronized (userMaxAppsDefaultMO){
|
|
|
+ userMaxAppsDefault = userMaxApps;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
public int getQueueMaxApps(String queue) {
|
|
|
- if (queueMaxApps.containsKey(queue)) {
|
|
|
- return queueMaxApps.get(queue);
|
|
|
- } else {
|
|
|
+ synchronized (queueMaxAppsMO) {
|
|
|
+ if (queueMaxApps.containsKey(queue)) {
|
|
|
+ return queueMaxApps.get(queue);
|
|
|
+ } else {
|
|
|
+ return getQueueMaxAppsDefault();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void setQueueMaxApps(Map<String, Integer> queueApps) {
|
|
|
+ synchronized (queueMaxAppsMO) {
|
|
|
+ queueMaxApps = queueApps;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private int getQueueMaxAppsDefault(){
|
|
|
+ synchronized(queueMaxAppsDefaultMO) {
|
|
|
return queueMaxAppsDefault;
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ private void setQueueMaxAppsDefault(int queueMaxApps){
|
|
|
+ synchronized(queueMaxAppsDefaultMO) {
|
|
|
+ queueMaxAppsDefault = queueMaxApps;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void setDefaultSchedulingMode(SchedulingMode schedulingMode){
|
|
|
+ synchronized(defaultSchedulingModeMO) {
|
|
|
+ defaultSchedulingMode = schedulingMode;
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
public double getQueueWeight(String queue) {
|
|
|
- if (queueWeights.containsKey(queue)) {
|
|
|
- return queueWeights.get(queue);
|
|
|
- } else {
|
|
|
- return 1.0;
|
|
|
+ synchronized (queueWeightsMO) {
|
|
|
+ if (queueWeights.containsKey(queue)) {
|
|
|
+ return queueWeights.get(queue);
|
|
|
+ } else {
|
|
|
+ return 1.0;
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private void setQueueWeights(Map<String, Double> weights) {
|
|
|
+ synchronized (queueWeightsMO) {
|
|
|
+ queueWeights = weights;
|
|
|
+ }
|
|
|
+ }
|
|
|
/**
|
|
|
* Get a queue's min share preemption timeout, in milliseconds. This is the
|
|
|
* time after which jobs in the queue may kill other queues' tasks if they
|
|
|
* are below their min share.
|
|
|
*/
|
|
|
public long getMinSharePreemptionTimeout(String queueName) {
|
|
|
- if (minSharePreemptionTimeouts.containsKey(queueName)) {
|
|
|
- return minSharePreemptionTimeouts.get(queueName);
|
|
|
+ synchronized (minSharePreemptionTimeoutsMO) {
|
|
|
+ if (minSharePreemptionTimeouts.containsKey(queueName)) {
|
|
|
+ return minSharePreemptionTimeouts.get(queueName);
|
|
|
+ }
|
|
|
}
|
|
|
return defaultMinSharePreemptionTimeout;
|
|
|
}
|
|
|
+
|
|
|
+ private void setMinSharePreemptionTimeouts(
|
|
|
+ Map<String, Long> sharePreemptionTimeouts){
|
|
|
+ synchronized (minSharePreemptionTimeoutsMO) {
|
|
|
+ minSharePreemptionTimeouts = sharePreemptionTimeouts;
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
/**
|
|
|
* Get the fair share preemption, in milliseconds. This is the time
|
|
@@ -497,9 +611,10 @@ public class QueueManager {
|
|
|
*/
|
|
|
public Map<QueueACL, AccessControlList> getQueueAcls(String queue) {
|
|
|
HashMap<QueueACL, AccessControlList> out = new HashMap<QueueACL, AccessControlList>();
|
|
|
-
|
|
|
- if (queueAcls.containsKey(queue)) {
|
|
|
- out.putAll(queueAcls.get(queue));
|
|
|
+ synchronized (queueAclsMO) {
|
|
|
+ if (queueAcls.containsKey(queue)) {
|
|
|
+ out.putAll(queueAcls.get(queue));
|
|
|
+ }
|
|
|
}
|
|
|
if (!out.containsKey(QueueACL.ADMINISTER_QUEUE)) {
|
|
|
out.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList("*"));
|
|
@@ -509,4 +624,10 @@ public class QueueManager {
|
|
|
}
|
|
|
return out;
|
|
|
}
|
|
|
+
|
|
|
+ private void setQueueAcls(Map<String, Map<QueueACL, AccessControlList>> queue) {
|
|
|
+ synchronized (queueAclsMO) {
|
|
|
+ queueAcls = queue;
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|