|
@@ -22,6 +22,7 @@ import java.io.IOException;
|
|
import java.io.Writer;
|
|
import java.io.Writer;
|
|
import java.util.ArrayList;
|
|
import java.util.ArrayList;
|
|
import java.util.HashMap;
|
|
import java.util.HashMap;
|
|
|
|
+import java.util.Map;
|
|
import java.util.Set;
|
|
import java.util.Set;
|
|
import java.util.TreeSet;
|
|
import java.util.TreeSet;
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.Log;
|
|
@@ -50,22 +51,17 @@ class QueueManager {
|
|
|
|
|
|
private static final Log LOG = LogFactory.getLog(QueueManager.class);
|
|
private static final Log LOG = LogFactory.getLog(QueueManager.class);
|
|
|
|
|
|
- // Prefix in configuration for queue related keys
|
|
|
|
- private static final String QUEUE_CONF_PROPERTY_NAME_PREFIX
|
|
|
|
- = "mapred.queue.";
|
|
|
|
- // Configured queues
|
|
|
|
- private Set<String> queueNames;
|
|
|
|
- // Map of a queue and ACL property name with an ACL
|
|
|
|
- private HashMap<String, AccessControlList> aclsMap;
|
|
|
|
- // Map of a queue name to any generic object that represents
|
|
|
|
- // scheduler information
|
|
|
|
- private HashMap<String, Object> schedulerInfoObjects;
|
|
|
|
- // Whether ACLs are enabled in the system or not.
|
|
|
|
- private boolean aclsEnabled;
|
|
|
|
-
|
|
|
|
- //Resource in which queue acls are configured.
|
|
|
|
|
|
+ static final String QUEUE_STATE_SUFFIX = "state";
|
|
|
|
+ /** Prefix in configuration for queue related keys */
|
|
|
|
+ static final String QUEUE_CONF_PROPERTY_NAME_PREFIX = "mapred.queue.";
|
|
|
|
+
|
|
|
|
+ // Continue to add this resource, to avoid incompatible change
|
|
static final String QUEUE_ACLS_FILE_NAME = "mapred-queue-acls.xml";
|
|
static final String QUEUE_ACLS_FILE_NAME = "mapred-queue-acls.xml";
|
|
-
|
|
|
|
|
|
+
|
|
|
|
+ /** Whether ACLs are enabled in the system or not. */
|
|
|
|
+ private boolean aclsEnabled;
|
|
|
|
+ /** Map of a queue name and Queue object */
|
|
|
|
+ final HashMap<String,Queue> queues;
|
|
/**
|
|
/**
|
|
* Enum representing an AccessControlList that drives set of operations that
|
|
* Enum representing an AccessControlList that drives set of operations that
|
|
* can be performed on a queue.
|
|
* can be performed on a queue.
|
|
@@ -100,10 +96,22 @@ class QueueManager {
|
|
* @param conf Configuration object where queue configuration is specified.
|
|
* @param conf Configuration object where queue configuration is specified.
|
|
*/
|
|
*/
|
|
public QueueManager(Configuration conf) {
|
|
public QueueManager(Configuration conf) {
|
|
- queueNames = new TreeSet<String>();
|
|
|
|
- aclsMap = new HashMap<String, AccessControlList>();
|
|
|
|
- schedulerInfoObjects = new HashMap<String, Object>();
|
|
|
|
- initialize(conf);
|
|
|
|
|
|
+ checkDeprecation(conf);
|
|
|
|
+ conf.addResource(QUEUE_ACLS_FILE_NAME);
|
|
|
|
+ queues = new HashMap<String,Queue>();
|
|
|
|
+ // First get the queue names
|
|
|
|
+ String[] queueNameValues = conf.getStrings("mapred.queue.names",
|
|
|
|
+ new String[]{JobConf.DEFAULT_QUEUE_NAME});
|
|
|
|
+ // Get configured ACLs and state for each queue
|
|
|
|
+ aclsEnabled = conf.getBoolean("mapred.acls.enabled", false);
|
|
|
|
+ for (String name : queueNameValues) {
|
|
|
|
+ try {
|
|
|
|
+ queues.put(name, new Queue(name, getQueueAcls(name, conf),
|
|
|
|
+ getQueueState(name, conf)));
|
|
|
|
+ } catch (Throwable t) {
|
|
|
|
+ LOG.warn("Not able to initialize queue " + name, t);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -116,7 +124,7 @@ class QueueManager {
|
|
* @return Set of queue names.
|
|
* @return Set of queue names.
|
|
*/
|
|
*/
|
|
public synchronized Set<String> getQueues() {
|
|
public synchronized Set<String> getQueues() {
|
|
- return queueNames;
|
|
|
|
|
|
+ return queues.keySet();
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -133,27 +141,39 @@ class QueueManager {
|
|
*
|
|
*
|
|
* @return true if the operation is allowed, false otherwise.
|
|
* @return true if the operation is allowed, false otherwise.
|
|
*/
|
|
*/
|
|
- public synchronized boolean hasAccess(String queueName,
|
|
|
|
- QueueACL qACL, UserGroupInformation ugi) {
|
|
|
|
|
|
+ public synchronized boolean hasAccess(String queueName, QueueACL qACL,
|
|
|
|
+ UserGroupInformation ugi) {
|
|
if (!aclsEnabled) {
|
|
if (!aclsEnabled) {
|
|
return true;
|
|
return true;
|
|
}
|
|
}
|
|
-
|
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
|
- LOG.debug("checking access for : " + toFullPropertyName(queueName,
|
|
|
|
- qACL.getAclName()));
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- AccessControlList acl = aclsMap.get(toFullPropertyName(
|
|
|
|
- queueName, qACL.getAclName()));
|
|
|
|
- if (acl == null) {
|
|
|
|
|
|
+ final Queue q = queues.get(queueName);
|
|
|
|
+ if (null == q) {
|
|
|
|
+ LOG.info("Queue " + queueName + " is not present");
|
|
return false;
|
|
return false;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
|
+ LOG.debug("checking access for : " +
|
|
|
|
+ toFullPropertyName(queueName, qACL.getAclName()));
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ AccessControlList acl =
|
|
|
|
+ q.getAcls().get(toFullPropertyName(queueName, qACL.getAclName()));
|
|
|
|
+
|
|
// Check if user is part of the ACL
|
|
// Check if user is part of the ACL
|
|
- return acl.isUserAllowed(ugi);
|
|
|
|
|
|
+ return acl != null && acl.isUserAllowed(ugi);
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Checks whether the given queue is running or not.
|
|
|
|
+ * @param queueName name of the queue
|
|
|
|
+ * @return true, if the queue is running.
|
|
|
|
+ */
|
|
|
|
+ synchronized boolean isRunning(String queueName) {
|
|
|
|
+ Queue q = queues.get(queueName);
|
|
|
|
+ return q != null && Queue.QueueState.RUNNING.equals(q.getState());
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Set a generic Object that represents scheduling information relevant
|
|
* Set a generic Object that represents scheduling information relevant
|
|
* to a queue.
|
|
* to a queue.
|
|
@@ -167,7 +187,10 @@ class QueueManager {
|
|
*/
|
|
*/
|
|
public synchronized void setSchedulerInfo(String queueName,
|
|
public synchronized void setSchedulerInfo(String queueName,
|
|
Object queueInfo) {
|
|
Object queueInfo) {
|
|
- schedulerInfoObjects.put(queueName, queueInfo);
|
|
|
|
|
|
+ Queue q = queues.get(queueName);
|
|
|
|
+ if (q != null) {
|
|
|
|
+ q.setSchedulingInfo(queueInfo);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -179,7 +202,10 @@ class QueueManager {
|
|
* @see #setSchedulerInfo(String, Object)
|
|
* @see #setSchedulerInfo(String, Object)
|
|
*/
|
|
*/
|
|
public synchronized Object getSchedulerInfo(String queueName) {
|
|
public synchronized Object getSchedulerInfo(String queueName) {
|
|
- return schedulerInfoObjects.get(queueName);
|
|
|
|
|
|
+ Queue q = queues.get(queueName);
|
|
|
|
+ return (q != null)
|
|
|
|
+ ? q.getSchedulingInfo()
|
|
|
|
+ : null;
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -191,91 +217,98 @@ class QueueManager {
|
|
*
|
|
*
|
|
* @throws IOException when queue ACL configuration file is invalid.
|
|
* @throws IOException when queue ACL configuration file is invalid.
|
|
*/
|
|
*/
|
|
- synchronized void refreshAcls(Configuration conf) throws IOException {
|
|
|
|
|
|
+ synchronized void refreshQueues(Configuration conf) throws IOException {
|
|
|
|
+ // First check if things are configured in mapred-site.xml,
|
|
|
|
+ // so we can print out a deprecation warning.
|
|
|
|
+ // This check is needed only until we support the configuration
|
|
|
|
+ // in mapred-site.xml
|
|
|
|
+ checkDeprecation(conf);
|
|
|
|
+
|
|
|
|
+ // Add the queue configuration file. Values from mapred-site.xml
|
|
|
|
+ // will be overridden.
|
|
|
|
+ conf.addResource(QUEUE_ACLS_FILE_NAME);
|
|
|
|
+
|
|
|
|
+ // Now we refresh the properties of the queues. Note that we
|
|
|
|
+ // do *not* refresh the queue names or the acls flag. Instead
|
|
|
|
+ // we use the older values configured for them.
|
|
|
|
+ LOG.info("Refreshing acls and state for configured queues.");
|
|
try {
|
|
try {
|
|
- HashMap<String, AccessControlList> newAclsMap =
|
|
|
|
- getQueueAcls(conf);
|
|
|
|
- aclsMap = newAclsMap;
|
|
|
|
|
|
+ for (String qName : getQueues()) {
|
|
|
|
+ Queue q = queues.get(qName);
|
|
|
|
+ q.setAcls(getQueueAcls(qName, conf));
|
|
|
|
+ q.setState(getQueueState(qName, conf));
|
|
|
|
+ }
|
|
} catch (Throwable t) {
|
|
} catch (Throwable t) {
|
|
- String exceptionString = StringUtils.stringifyException(t);
|
|
|
|
- LOG.warn("Queue ACLs could not be refreshed because there was an " +
|
|
|
|
- "exception in parsing the configuration: "+ exceptionString +
|
|
|
|
- ". Existing ACLs are retained.");
|
|
|
|
- throw new IOException(exceptionString);
|
|
|
|
|
|
+ LOG.warn("Invalid queue configuration", t);
|
|
|
|
+ throw new IOException("Invalid queue configuration", t);
|
|
}
|
|
}
|
|
|
|
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
private void checkDeprecation(Configuration conf) {
|
|
private void checkDeprecation(Configuration conf) {
|
|
- for(String queue: queueNames) {
|
|
|
|
- for (QueueACL qACL : QueueACL.values()) {
|
|
|
|
- String key = toFullPropertyName(queue, qACL.getAclName());
|
|
|
|
- String aclString = conf.get(key);
|
|
|
|
- if(aclString != null) {
|
|
|
|
- LOG.warn("Configuring queue ACLs in mapred-site.xml or " +
|
|
|
|
- "hadoop-site.xml is deprecated. Configure queue ACLs in " +
|
|
|
|
- QUEUE_ACLS_FILE_NAME);
|
|
|
|
- return;
|
|
|
|
|
|
+ // check if queues are defined.
|
|
|
|
+ String[] queues = conf.getStrings("mapred.queue.names");
|
|
|
|
+ // check if acls are defined
|
|
|
|
+ if (queues != null) {
|
|
|
|
+ for (String queue : queues) {
|
|
|
|
+ for (QueueACL oper : QueueACL.values()) {
|
|
|
|
+ String aclString =
|
|
|
|
+ conf.get(toFullPropertyName(queue, oper.getAclName()));
|
|
|
|
+ if (aclString != null) {
|
|
|
|
+ LOG.warn("Configuring queue ACLs in mapred-site.xml or " +
|
|
|
|
+ "hadoop-site.xml is deprecated. Configure queue ACLs in " +
|
|
|
|
+ QUEUE_ACLS_FILE_NAME);
|
|
|
|
+ // even if one string is configured, it is enough for printing
|
|
|
|
+ // the warning. so we can return from here.
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
-
|
|
|
|
- private HashMap<String, AccessControlList> getQueueAcls(Configuration conf) {
|
|
|
|
- checkDeprecation(conf);
|
|
|
|
- conf.addResource(QUEUE_ACLS_FILE_NAME);
|
|
|
|
- HashMap<String, AccessControlList> aclsMap =
|
|
|
|
- new HashMap<String, AccessControlList>();
|
|
|
|
- for (String queue : queueNames) {
|
|
|
|
- for (QueueACL qACL : QueueACL.values()) {
|
|
|
|
- String key = toFullPropertyName(queue, qACL.getAclName());
|
|
|
|
- String aclString = conf.get(key, " ");// default is empty list of users
|
|
|
|
- aclsMap.put(key, new AccessControlList(aclString));
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- return aclsMap;
|
|
|
|
|
|
+
|
|
|
|
+ /** Parse ACLs for the queue from the configuration. */
|
|
|
|
+ HashMap<String, AccessControlList> getQueueAcls(
|
|
|
|
+ String name, Configuration conf) {
|
|
|
|
+ HashMap<String,AccessControlList> map =
|
|
|
|
+ new HashMap<String,AccessControlList>();
|
|
|
|
+ for (QueueACL oper : QueueACL.values()) {
|
|
|
|
+ String aclKey = toFullPropertyName(name, oper.getAclName());
|
|
|
|
+ map.put(aclKey, new AccessControlList(conf.get(aclKey, "*")));
|
|
|
|
+ }
|
|
|
|
+ return map;
|
|
}
|
|
}
|
|
-
|
|
|
|
- private void initialize(Configuration conf) {
|
|
|
|
- aclsEnabled = conf.getBoolean(JobConf.MR_ACLS_ENABLED, false);
|
|
|
|
- String[] queues = conf.getStrings("mapred.queue.names",
|
|
|
|
- new String[] {JobConf.DEFAULT_QUEUE_NAME});
|
|
|
|
- addToSet(queueNames, queues);
|
|
|
|
- aclsMap = getQueueAcls(conf);
|
|
|
|
|
|
+
|
|
|
|
+ /** Parse state of the queue from the configuration. */
|
|
|
|
+ Queue.QueueState getQueueState(String name, Configuration conf) {
|
|
|
|
+ return conf.getEnum(
|
|
|
|
+ toFullPropertyName(name, QueueManager.QUEUE_STATE_SUFFIX),
|
|
|
|
+ Queue.QueueState.RUNNING);
|
|
}
|
|
}
|
|
-
|
|
|
|
- static final String toFullPropertyName(String queue,
|
|
|
|
- String property) {
|
|
|
|
|
|
+
|
|
|
|
+ static final String toFullPropertyName(String queue, String property) {
|
|
return QUEUE_CONF_PROPERTY_NAME_PREFIX + queue + "." + property;
|
|
return QUEUE_CONF_PROPERTY_NAME_PREFIX + queue + "." + property;
|
|
}
|
|
}
|
|
|
|
|
|
- private static final void addToSet(Set<String> set, String[] elems) {
|
|
|
|
- for (String elem : elems) {
|
|
|
|
- set.add(elem);
|
|
|
|
|
|
+ synchronized JobQueueInfo getJobQueueInfo(String queue) {
|
|
|
|
+ Queue q = queues.get(queue);
|
|
|
|
+ if (q != null) {
|
|
|
|
+ JobQueueInfo qInfo = new JobQueueInfo();
|
|
|
|
+ qInfo.setQueueName(q.getName());
|
|
|
|
+ qInfo.setQueueState(q.getState().getStateName());
|
|
|
|
+ Object schedInfo = q.getSchedulingInfo();
|
|
|
|
+ qInfo.setSchedulingInfo(schedInfo == null ? null : schedInfo.toString());
|
|
|
|
+ return qInfo;
|
|
}
|
|
}
|
|
- }
|
|
|
|
-
|
|
|
|
- synchronized JobQueueInfo[] getJobQueueInfos() {
|
|
|
|
- ArrayList<JobQueueInfo> queueInfoList = new ArrayList<JobQueueInfo>();
|
|
|
|
- for(String queue : queueNames) {
|
|
|
|
- Object schedulerInfo = schedulerInfoObjects.get(queue);
|
|
|
|
- if(schedulerInfo != null) {
|
|
|
|
- queueInfoList.add(new JobQueueInfo(queue,schedulerInfo.toString()));
|
|
|
|
- }else {
|
|
|
|
- queueInfoList.add(new JobQueueInfo(queue,null));
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- return (JobQueueInfo[]) queueInfoList.toArray(new JobQueueInfo[queueInfoList
|
|
|
|
- .size()]);
|
|
|
|
|
|
+ return null;
|
|
}
|
|
}
|
|
|
|
|
|
- JobQueueInfo getJobQueueInfo(String queue) {
|
|
|
|
- Object schedulingInfo = schedulerInfoObjects.get(queue);
|
|
|
|
- if(schedulingInfo!=null){
|
|
|
|
- return new JobQueueInfo(queue,schedulingInfo.toString());
|
|
|
|
- }else {
|
|
|
|
- return new JobQueueInfo(queue,null);
|
|
|
|
|
|
+ synchronized JobQueueInfo[] getJobQueueInfos() {
|
|
|
|
+ ArrayList<JobQueueInfo> ret = new ArrayList<JobQueueInfo>();
|
|
|
|
+ for (String qName : getQueues()) {
|
|
|
|
+ ret.add(getJobQueueInfo(qName));
|
|
}
|
|
}
|
|
|
|
+ return (JobQueueInfo[]) ret.toArray(new JobQueueInfo[ret.size()]);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -285,13 +318,12 @@ class QueueManager {
|
|
* @return QueueAclsInfo[]
|
|
* @return QueueAclsInfo[]
|
|
* @throws java.io.IOException
|
|
* @throws java.io.IOException
|
|
*/
|
|
*/
|
|
- synchronized QueueAclsInfo[] getQueueAcls(UserGroupInformation
|
|
|
|
- ugi) throws IOException {
|
|
|
|
|
|
+ synchronized QueueAclsInfo[] getQueueAcls(UserGroupInformation ugi)
|
|
|
|
+ throws IOException {
|
|
//List of all QueueAclsInfo objects , this list is returned
|
|
//List of all QueueAclsInfo objects , this list is returned
|
|
- ArrayList<QueueAclsInfo> queueAclsInfolist =
|
|
|
|
- new ArrayList<QueueAclsInfo>();
|
|
|
|
|
|
+ ArrayList<QueueAclsInfo> queueAclsInfolist = new ArrayList<QueueAclsInfo>();
|
|
QueueACL[] acls = QueueACL.values();
|
|
QueueACL[] acls = QueueACL.values();
|
|
- for (String queueName : queueNames) {
|
|
|
|
|
|
+ for (String queueName : getQueues()) {
|
|
QueueAclsInfo queueAclsInfo = null;
|
|
QueueAclsInfo queueAclsInfo = null;
|
|
ArrayList<String> operationsAllowed = null;
|
|
ArrayList<String> operationsAllowed = null;
|
|
for (QueueACL qACL : acls) {
|
|
for (QueueACL qACL : acls) {
|
|
@@ -305,13 +337,13 @@ class QueueManager {
|
|
if (operationsAllowed != null) {
|
|
if (operationsAllowed != null) {
|
|
//There is atleast 1 operation supported for queue <queueName>
|
|
//There is atleast 1 operation supported for queue <queueName>
|
|
//, hence initialize queueAclsInfo
|
|
//, hence initialize queueAclsInfo
|
|
- queueAclsInfo = new QueueAclsInfo(queueName, operationsAllowed.toArray
|
|
|
|
- (new String[operationsAllowed.size()]));
|
|
|
|
|
|
+ queueAclsInfo = new QueueAclsInfo(queueName, operationsAllowed.toArray(
|
|
|
|
+ new String[operationsAllowed.size()]));
|
|
queueAclsInfolist.add(queueAclsInfo);
|
|
queueAclsInfolist.add(queueAclsInfo);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- return queueAclsInfolist.toArray(new QueueAclsInfo[
|
|
|
|
- queueAclsInfolist.size()]);
|
|
|
|
|
|
+ return
|
|
|
|
+ queueAclsInfolist.toArray(new QueueAclsInfo[queueAclsInfolist.size()]);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -321,11 +353,11 @@ class QueueManager {
|
|
* If acls are disabled(mapred.acls.enabled set to false), returns ACL with
|
|
* If acls are disabled(mapred.acls.enabled set to false), returns ACL with
|
|
* all users.
|
|
* all users.
|
|
*/
|
|
*/
|
|
- synchronized AccessControlList getQueueACL(String queueName,
|
|
|
|
- QueueACL qACL) {
|
|
|
|
|
|
+ synchronized AccessControlList getQueueACL(String queueName, QueueACL qACL) {
|
|
if (aclsEnabled) {
|
|
if (aclsEnabled) {
|
|
- return aclsMap.get(toFullPropertyName(
|
|
|
|
- queueName, qACL.getAclName()));
|
|
|
|
|
|
+ Queue q = queues.get(queueName);
|
|
|
|
+ assert q != null;
|
|
|
|
+ return q.getAcls().get(toFullPropertyName(queueName, qACL.getAclName()));
|
|
}
|
|
}
|
|
return new AccessControlList("*");
|
|
return new AccessControlList("*");
|
|
}
|
|
}
|