|
@@ -24,7 +24,6 @@ import java.net.URL;
|
|
|
import java.net.URLConnection;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Collection;
|
|
|
-import java.util.Collections;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
@@ -71,40 +70,6 @@ public class QueueManager {
|
|
|
|
|
|
private final FairScheduler scheduler;
|
|
|
|
|
|
- // Minimum resource allocation for each queue
|
|
|
- private Map<String, Resource> minQueueResources = new HashMap<String, Resource>();
|
|
|
- // Maximum amount of resources per queue
|
|
|
- private Map<String, Resource> maxQueueResources = new HashMap<String, Resource>();
|
|
|
- // Sharing weights for each queue
|
|
|
- private Map<String, Double> queueWeights = new HashMap<String, Double>();
|
|
|
-
|
|
|
- // Max concurrent running applications for each queue and for each user; in addition,
|
|
|
- // for users that have no max specified, we use the userMaxJobsDefault.
|
|
|
- private Map<String, Integer> queueMaxApps = new HashMap<String, Integer>();
|
|
|
- private Map<String, Integer> userMaxApps = new HashMap<String, Integer>();
|
|
|
- private int userMaxAppsDefault = Integer.MAX_VALUE;
|
|
|
- private int queueMaxAppsDefault = Integer.MAX_VALUE;
|
|
|
-
|
|
|
- // ACL's for each queue. Only specifies non-default ACL's from configuration.
|
|
|
- private Map<String, Map<QueueACL, AccessControlList>> queueAcls =
|
|
|
- new HashMap<String, Map<QueueACL, AccessControlList>>();
|
|
|
-
|
|
|
- // Min share preemption timeout for each queue in seconds. If a job in the queue
|
|
|
- // waits this long without receiving its guaranteed share, it is allowed to
|
|
|
- // preempt other jobs' tasks.
|
|
|
- private Map<String, Long> minSharePreemptionTimeouts =
|
|
|
- new HashMap<String, Long>();
|
|
|
-
|
|
|
- // Default min share preemption timeout for queues where it is not set
|
|
|
- // explicitly.
|
|
|
- private long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
|
|
|
-
|
|
|
- // Preemption timeout for jobs below fair share in seconds. If a job remains
|
|
|
- // below half its fair share for this long, it is allowed to preempt tasks.
|
|
|
- private long fairSharePreemptionTimeout = Long.MAX_VALUE;
|
|
|
-
|
|
|
- SchedulingMode defaultSchedulingMode = SchedulingMode.FAIR;
|
|
|
-
|
|
|
private Object allocFile; // Path to XML file containing allocations. This
|
|
|
// is either a URL to specify a classpath resource
|
|
|
// (if the fair-scheduler.xml on the classpath is
|
|
@@ -113,40 +78,12 @@ public class QueueManager {
|
|
|
|
|
|
private Map<String, FSQueue> queues = new HashMap<String, FSQueue>();
|
|
|
|
|
|
+ private volatile QueueManagerInfo info = new QueueManagerInfo();
|
|
|
+
|
|
|
private long lastReloadAttempt; // Last time we tried to reload the queues file
|
|
|
private long lastSuccessfulReload; // Last time we successfully reloaded queues
|
|
|
private boolean lastReloadAttemptFailed = false;
|
|
|
|
|
|
- // Monitor object for minQueueResources
|
|
|
- private Object minQueueResourcesMO = new Object();
|
|
|
-
|
|
|
- //Monitor object for maxQueueResources
|
|
|
- private Object maxQueueResourcesMO = new Object();
|
|
|
-
|
|
|
- //Monitor object for queueMaxApps
|
|
|
- private Object queueMaxAppsMO = new Object();
|
|
|
-
|
|
|
- //Monitor object for userMaxApps
|
|
|
- private Object userMaxAppsMO = new Object();
|
|
|
-
|
|
|
- //Monitor object for queueWeights
|
|
|
- private Object queueWeightsMO = new Object();
|
|
|
-
|
|
|
- //Monitor object for minSharePreemptionTimeouts
|
|
|
- private Object minSharePreemptionTimeoutsMO = new Object();
|
|
|
-
|
|
|
- //Monitor object for queueAcls
|
|
|
- private Object queueAclsMO = new Object();
|
|
|
-
|
|
|
- //Monitor object for userMaxAppsDefault
|
|
|
- private Object userMaxAppsDefaultMO = new Object();
|
|
|
-
|
|
|
- //Monitor object for queueMaxAppsDefault
|
|
|
- private Object queueMaxAppsDefaultMO = new Object();
|
|
|
-
|
|
|
- //Monitor object for defaultSchedulingMode
|
|
|
- private Object defaultSchedulingModeMO = new Object();
|
|
|
-
|
|
|
public QueueManager(FairScheduler scheduler) {
|
|
|
this.scheduler = scheduler;
|
|
|
}
|
|
@@ -180,9 +117,7 @@ public class QueueManager {
|
|
|
FSQueue queue = queues.get(name);
|
|
|
if (queue == null) {
|
|
|
queue = new FSQueue(scheduler, name);
|
|
|
- synchronized (defaultSchedulingModeMO){
|
|
|
- queue.setSchedulingMode(defaultSchedulingMode);
|
|
|
- }
|
|
|
+ queue.setSchedulingMode(info.defaultSchedulingMode);
|
|
|
queues.put(name, queue);
|
|
|
}
|
|
|
return queue;
|
|
@@ -272,6 +207,8 @@ public class QueueManager {
|
|
|
new HashMap<String, Map<QueueACL, AccessControlList>>();
|
|
|
int userMaxAppsDefault = Integer.MAX_VALUE;
|
|
|
int queueMaxAppsDefault = Integer.MAX_VALUE;
|
|
|
+ long fairSharePreemptionTimeout = Long.MAX_VALUE;
|
|
|
+ long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
|
|
|
SchedulingMode defaultSchedulingMode = SchedulingMode.FAIR;
|
|
|
|
|
|
// Remember all queue names so we can display them on web UI, etc.
|
|
@@ -389,16 +326,10 @@ public class QueueManager {
|
|
|
// Commit the reload; also create any queue defined in the alloc file
|
|
|
// if it does not already exist, so it can be displayed on the web UI.
|
|
|
synchronized (this) {
|
|
|
- setMinResources(minQueueResources);
|
|
|
- setMaxResources(maxQueueResources);
|
|
|
- setQueueMaxApps(queueMaxApps);
|
|
|
- setUserMaxApps(userMaxApps);
|
|
|
- setQueueWeights(queueWeights);
|
|
|
- setUserMaxAppsDefault(userMaxAppsDefault);
|
|
|
- setQueueMaxAppsDefault(queueMaxAppsDefault);
|
|
|
- setDefaultSchedulingMode(defaultSchedulingMode);
|
|
|
- setMinSharePreemptionTimeouts(minSharePreemptionTimeouts);
|
|
|
- setQueueAcls(queueAcls);
|
|
|
+ info = new QueueManagerInfo(minQueueResources, maxQueueResources,
|
|
|
+ queueMaxApps, userMaxApps, queueWeights, userMaxAppsDefault,
|
|
|
+ queueMaxAppsDefault, defaultSchedulingMode, minSharePreemptionTimeouts,
|
|
|
+ queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout);
|
|
|
for (String name: queueNamesInAllocFile) {
|
|
|
FSQueue queue = getQueue(name);
|
|
|
if (queueModes.containsKey(name)) {
|
|
@@ -428,182 +359,87 @@ public class QueueManager {
|
|
|
* @return the cap set on this queue, or 0 if not set.
|
|
|
*/
|
|
|
public Resource getMinResources(String queue) {
|
|
|
- synchronized(minQueueResourcesMO) {
|
|
|
- if (minQueueResources.containsKey(queue)) {
|
|
|
- return minQueueResources.get(queue);
|
|
|
- } else {
|
|
|
- return Resources.createResource(0);
|
|
|
- }
|
|
|
+ Resource minQueueResource = info.minQueueResources.get(queue);
|
|
|
+ if (minQueueResource != null) {
|
|
|
+ return minQueueResource;
|
|
|
+ } else {
|
|
|
+ return Resources.createResource(0);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void setMinResources(Map<String, Resource> resources) {
|
|
|
- synchronized (minQueueResourcesMO) {
|
|
|
- minQueueResources = resources;
|
|
|
- }
|
|
|
- }
|
|
|
/**
|
|
|
* Get the maximum resource allocation for the given queue.
|
|
|
* @return the cap set on this queue, or Integer.MAX_VALUE if not set.
|
|
|
*/
|
|
|
+
|
|
|
public Resource getMaxResources(String queueName) {
|
|
|
- synchronized (maxQueueResourcesMO) {
|
|
|
- if (maxQueueResources.containsKey(queueName)) {
|
|
|
- return maxQueueResources.get(queueName);
|
|
|
- } else {
|
|
|
- return Resources.createResource(Integer.MAX_VALUE);
|
|
|
- }
|
|
|
+ Resource maxQueueResource = info.maxQueueResources.get(queueName);
|
|
|
+ if (maxQueueResource != null) {
|
|
|
+ return maxQueueResource;
|
|
|
+ } else {
|
|
|
+ return Resources.createResource(Integer.MAX_VALUE);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void setMaxResources(Map<String, Resource> resources) {
|
|
|
- synchronized (maxQueueResourcesMO) {
|
|
|
- maxQueueResources = resources;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Add an app in the appropriate queue
|
|
|
- */
|
|
|
- public synchronized void addApp(FSSchedulerApp app) {
|
|
|
- getQueue(app.getQueueName()).addApp(app);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Remove an app
|
|
|
- */
|
|
|
- public synchronized void removeApp(FSSchedulerApp app) {
|
|
|
- getQueue(app.getQueueName()).removeApp(app);
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* Get a collection of all queues
|
|
|
*/
|
|
|
public Collection<FSQueue> getQueues() {
|
|
|
synchronized (queues) {
|
|
|
- return queues.values();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Get all queue names that have been seen either in the allocation file or in
|
|
|
- * a submitted app.
|
|
|
- */
|
|
|
- public synchronized Collection<String> getQueueNames() {
|
|
|
- List<String> list = new ArrayList<String>();
|
|
|
- for (FSQueue queue: getQueues()) {
|
|
|
- list.add(queue.getName());
|
|
|
+ return new ArrayList<FSQueue>(queues.values());
|
|
|
}
|
|
|
- Collections.sort(list);
|
|
|
- return list;
|
|
|
}
|
|
|
|
|
|
public int getUserMaxApps(String user) {
|
|
|
- synchronized (userMaxAppsMO) {
|
|
|
- if (userMaxApps.containsKey(user)) {
|
|
|
- return userMaxApps.get(user);
|
|
|
- } else {
|
|
|
- return getUserMaxAppsDefault();
|
|
|
- }
|
|
|
+ // save current info in case it gets changed under us
|
|
|
+ QueueManagerInfo info = this.info;
|
|
|
+ if (info.userMaxApps.containsKey(user)) {
|
|
|
+ return info.userMaxApps.get(user);
|
|
|
+ } else {
|
|
|
+ return info.userMaxAppsDefault;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void setUserMaxApps(Map<String, Integer> userApps) {
|
|
|
- synchronized (userMaxAppsMO) {
|
|
|
- userMaxApps = userApps;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private int getUserMaxAppsDefault() {
|
|
|
- synchronized (userMaxAppsDefaultMO){
|
|
|
- return userMaxAppsDefault;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private void setUserMaxAppsDefault(int userMaxApps) {
|
|
|
- synchronized (userMaxAppsDefaultMO){
|
|
|
- userMaxAppsDefault = userMaxApps;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
public int getQueueMaxApps(String queue) {
|
|
|
- synchronized (queueMaxAppsMO) {
|
|
|
- if (queueMaxApps.containsKey(queue)) {
|
|
|
- return queueMaxApps.get(queue);
|
|
|
- } else {
|
|
|
- return getQueueMaxAppsDefault();
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private void setQueueMaxApps(Map<String, Integer> queueApps) {
|
|
|
- synchronized (queueMaxAppsMO) {
|
|
|
- queueMaxApps = queueApps;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private int getQueueMaxAppsDefault(){
|
|
|
- synchronized (queueMaxAppsDefaultMO) {
|
|
|
- return queueMaxAppsDefault;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private void setQueueMaxAppsDefault(int queueMaxApps){
|
|
|
- synchronized(queueMaxAppsDefaultMO) {
|
|
|
- queueMaxAppsDefault = queueMaxApps;
|
|
|
+ // save current info in case it gets changed under us
|
|
|
+ QueueManagerInfo info = this.info;
|
|
|
+ if (info.queueMaxApps.containsKey(queue)) {
|
|
|
+ return info.queueMaxApps.get(queue);
|
|
|
+ } else {
|
|
|
+ return info.queueMaxAppsDefault;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void setDefaultSchedulingMode(SchedulingMode schedulingMode){
|
|
|
- synchronized(defaultSchedulingModeMO) {
|
|
|
- defaultSchedulingMode = schedulingMode;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
public double getQueueWeight(String queue) {
|
|
|
- synchronized (queueWeightsMO) {
|
|
|
- if (queueWeights.containsKey(queue)) {
|
|
|
- return queueWeights.get(queue);
|
|
|
- } else {
|
|
|
- return 1.0;
|
|
|
- }
|
|
|
+ Double weight = info.queueWeights.get(queue);
|
|
|
+ if (weight != null) {
|
|
|
+ return weight;
|
|
|
+ } else {
|
|
|
+ return 1.0;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void setQueueWeights(Map<String, Double> weights) {
|
|
|
- synchronized (queueWeightsMO) {
|
|
|
- queueWeights = weights;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* Get a queue's min share preemption timeout, in milliseconds. This is the
|
|
|
* time after which jobs in the queue may kill other queues' tasks if they
|
|
|
* are below their min share.
|
|
|
*/
|
|
|
public long getMinSharePreemptionTimeout(String queueName) {
|
|
|
- synchronized (minSharePreemptionTimeoutsMO) {
|
|
|
- if (minSharePreemptionTimeouts.containsKey(queueName)) {
|
|
|
- return minSharePreemptionTimeouts.get(queueName);
|
|
|
- }
|
|
|
+ // save current info in case it gets changed under us
|
|
|
+ QueueManagerInfo info = this.info;
|
|
|
+ if (info.minSharePreemptionTimeouts.containsKey(queueName)) {
|
|
|
+ return info.minSharePreemptionTimeouts.get(queueName);
|
|
|
}
|
|
|
- return defaultMinSharePreemptionTimeout;
|
|
|
+ return info.defaultMinSharePreemptionTimeout;
|
|
|
}
|
|
|
|
|
|
- private void setMinSharePreemptionTimeouts(
|
|
|
- Map<String, Long> sharePreemptionTimeouts){
|
|
|
- synchronized (minSharePreemptionTimeoutsMO) {
|
|
|
- minSharePreemptionTimeouts = sharePreemptionTimeouts;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* Get the fair share preemption, in milliseconds. This is the time
|
|
|
* after which any job may kill other jobs' tasks if it is below half
|
|
|
* its fair share.
|
|
|
*/
|
|
|
public long getFairSharePreemptionTimeout() {
|
|
|
- return fairSharePreemptionTimeout;
|
|
|
+ return info.fairSharePreemptionTimeout;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -612,10 +448,9 @@ public class QueueManager {
|
|
|
*/
|
|
|
public Map<QueueACL, AccessControlList> getQueueAcls(String queue) {
|
|
|
HashMap<QueueACL, AccessControlList> out = new HashMap<QueueACL, AccessControlList>();
|
|
|
- synchronized (queueAclsMO) {
|
|
|
- if (queueAcls.containsKey(queue)) {
|
|
|
- out.putAll(queueAcls.get(queue));
|
|
|
- }
|
|
|
+ Map<QueueACL, AccessControlList> queueAcl = info.queueAcls.get(queue);
|
|
|
+ if (queueAcl != null) {
|
|
|
+ out.putAll(queueAcl);
|
|
|
}
|
|
|
if (!out.containsKey(QueueACL.ADMINISTER_QUEUE)) {
|
|
|
out.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList("*"));
|
|
@@ -626,9 +461,74 @@ public class QueueManager {
|
|
|
return out;
|
|
|
}
|
|
|
|
|
|
- private void setQueueAcls(Map<String, Map<QueueACL, AccessControlList>> queue) {
|
|
|
- synchronized (queueAclsMO) {
|
|
|
- queueAcls = queue;
|
|
|
+ static class QueueManagerInfo {
|
|
|
+ // Minimum resource allocation for each queue
|
|
|
+ public final Map<String, Resource> minQueueResources;
|
|
|
+ // Maximum amount of resources per queue
|
|
|
+ public final Map<String, Resource> maxQueueResources;
|
|
|
+ // Sharing weights for each queue
|
|
|
+ public final Map<String, Double> queueWeights;
|
|
|
+
|
|
|
+ // Max concurrent running applications for each queue and for each user; in addition,
|
|
|
+ // for users that have no max specified, we use the userMaxJobsDefault.
|
|
|
+ public final Map<String, Integer> queueMaxApps;
|
|
|
+ public final Map<String, Integer> userMaxApps;
|
|
|
+ public final int userMaxAppsDefault;
|
|
|
+ public final int queueMaxAppsDefault;
|
|
|
+
|
|
|
+ // ACL's for each queue. Only specifies non-default ACL's from configuration.
|
|
|
+ public final Map<String, Map<QueueACL, AccessControlList>> queueAcls;
|
|
|
+
|
|
|
+ // Min share preemption timeout for each queue in seconds. If a job in the queue
|
|
|
+ // waits this long without receiving its guaranteed share, it is allowed to
|
|
|
+ // preempt other jobs' tasks.
|
|
|
+ public final Map<String, Long> minSharePreemptionTimeouts;
|
|
|
+
|
|
|
+ // Default min share preemption timeout for queues where it is not set
|
|
|
+ // explicitly.
|
|
|
+ public final long defaultMinSharePreemptionTimeout;
|
|
|
+
|
|
|
+ // Preemption timeout for jobs below fair share in seconds. If a job remains
|
|
|
+ // below half its fair share for this long, it is allowed to preempt tasks.
|
|
|
+ public final long fairSharePreemptionTimeout;
|
|
|
+
|
|
|
+ public final SchedulingMode defaultSchedulingMode;
|
|
|
+
|
|
|
+ public QueueManagerInfo(Map<String, Resource> minQueueResources,
|
|
|
+ Map<String, Resource> maxQueueResources,
|
|
|
+ Map<String, Integer> queueMaxApps, Map<String, Integer> userMaxApps,
|
|
|
+ Map<String, Double> queueWeights, int userMaxAppsDefault,
|
|
|
+ int queueMaxAppsDefault, SchedulingMode defaultSchedulingMode,
|
|
|
+ Map<String, Long> minSharePreemptionTimeouts,
|
|
|
+ Map<String, Map<QueueACL, AccessControlList>> queueAcls,
|
|
|
+ long fairSharePreemptionTimeout, long defaultMinSharePreemptionTimeout) {
|
|
|
+ this.minQueueResources = minQueueResources;
|
|
|
+ this.maxQueueResources = maxQueueResources;
|
|
|
+ this.queueMaxApps = queueMaxApps;
|
|
|
+ this.userMaxApps = userMaxApps;
|
|
|
+ this.queueWeights = queueWeights;
|
|
|
+ this.userMaxAppsDefault = userMaxAppsDefault;
|
|
|
+ this.queueMaxAppsDefault = queueMaxAppsDefault;
|
|
|
+ this.defaultSchedulingMode = defaultSchedulingMode;
|
|
|
+ this.minSharePreemptionTimeouts = minSharePreemptionTimeouts;
|
|
|
+ this.queueAcls = queueAcls;
|
|
|
+ this.fairSharePreemptionTimeout = fairSharePreemptionTimeout;
|
|
|
+ this.defaultMinSharePreemptionTimeout = defaultMinSharePreemptionTimeout;
|
|
|
+ }
|
|
|
+
|
|
|
+ public QueueManagerInfo() {
|
|
|
+ minQueueResources = new HashMap<String, Resource>();
|
|
|
+ maxQueueResources = new HashMap<String, Resource>();
|
|
|
+ queueWeights = new HashMap<String, Double>();
|
|
|
+ queueMaxApps = new HashMap<String, Integer>();
|
|
|
+ userMaxApps = new HashMap<String, Integer>();
|
|
|
+ userMaxAppsDefault = Integer.MAX_VALUE;
|
|
|
+ queueMaxAppsDefault = Integer.MAX_VALUE;
|
|
|
+ queueAcls = new HashMap<String, Map<QueueACL, AccessControlList>>();
|
|
|
+ minSharePreemptionTimeouts = new HashMap<String, Long>();
|
|
|
+ defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
|
|
|
+ fairSharePreemptionTimeout = Long.MAX_VALUE;
|
|
|
+ defaultSchedulingMode = SchedulingMode.FAIR;
|
|
|
}
|
|
|
}
|
|
|
}
|