|
@@ -18,14 +18,27 @@
|
|
|
|
|
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
|
|
|
|
|
|
+import com.google.common.base.Joiner;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.security.AccessControlException;
|
|
|
+import org.apache.hadoop.security.UserGroupInformation;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ConfigurationMutationACLPolicy;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ConfigurationMutationACLPolicyFactory;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigInfo;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigsUpdateInfo;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.Arrays;
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.List;
|
|
|
import java.util.Map;
|
|
|
|
|
|
/**
|
|
@@ -38,6 +51,7 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
|
|
|
|
|
|
private Configuration schedConf;
|
|
|
private YarnConfigurationStore confStore;
|
|
|
+ private ConfigurationMutationACLPolicy aclMutationPolicy;
|
|
|
private RMContext rmContext;
|
|
|
private Configuration conf;
|
|
|
|
|
@@ -68,6 +82,9 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
|
|
|
schedConf.set(kv.getKey(), kv.getValue());
|
|
|
}
|
|
|
confStore.initialize(config, schedConf);
|
|
|
+ this.aclMutationPolicy = ConfigurationMutationACLPolicyFactory
|
|
|
+ .getPolicy(config);
|
|
|
+ aclMutationPolicy.init(config, rmContext);
|
|
|
this.conf = config;
|
|
|
}
|
|
|
|
|
@@ -80,12 +97,17 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void mutateConfiguration(String user,
|
|
|
- Map<String, String> confUpdate) throws IOException {
|
|
|
+ public void mutateConfiguration(UserGroupInformation user,
|
|
|
+ QueueConfigsUpdateInfo confUpdate) throws IOException {
|
|
|
+ if (!aclMutationPolicy.isMutationAllowed(user, confUpdate)) {
|
|
|
+ throw new AccessControlException("User is not admin of all modified" +
|
|
|
+ " queues.");
|
|
|
+ }
|
|
|
Configuration oldConf = new Configuration(schedConf);
|
|
|
- LogMutation log = new LogMutation(confUpdate, user);
|
|
|
+ Map<String, String> kvUpdate = constructKeyValueConfUpdate(confUpdate);
|
|
|
+ LogMutation log = new LogMutation(kvUpdate, user.getShortUserName());
|
|
|
long id = confStore.logMutation(log);
|
|
|
- for (Map.Entry<String, String> kv : confUpdate.entrySet()) {
|
|
|
+ for (Map.Entry<String, String> kv : kvUpdate.entrySet()) {
|
|
|
if (kv.getValue() == null) {
|
|
|
schedConf.unset(kv.getKey());
|
|
|
} else {
|
|
@@ -101,4 +123,125 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
|
|
|
}
|
|
|
confStore.confirmMutation(id, true);
|
|
|
}
|
|
|
+
|
|
|
+
|
|
|
+ private Map<String, String> constructKeyValueConfUpdate(
|
|
|
+ QueueConfigsUpdateInfo mutationInfo) throws IOException {
|
|
|
+ CapacityScheduler cs = (CapacityScheduler) rmContext.getScheduler();
|
|
|
+ CapacitySchedulerConfiguration proposedConf =
|
|
|
+ new CapacitySchedulerConfiguration(cs.getConfiguration(), false);
|
|
|
+ Map<String, String> confUpdate = new HashMap<>();
|
|
|
+ for (String queueToRemove : mutationInfo.getRemoveQueueInfo()) {
|
|
|
+ removeQueue(queueToRemove, proposedConf, confUpdate);
|
|
|
+ }
|
|
|
+ for (QueueConfigInfo addQueueInfo : mutationInfo.getAddQueueInfo()) {
|
|
|
+ addQueue(addQueueInfo, proposedConf, confUpdate);
|
|
|
+ }
|
|
|
+ for (QueueConfigInfo updateQueueInfo : mutationInfo.getUpdateQueueInfo()) {
|
|
|
+ updateQueue(updateQueueInfo, proposedConf, confUpdate);
|
|
|
+ }
|
|
|
+ return confUpdate;
|
|
|
+ }
|
|
|
+
|
|
|
+ private void removeQueue(
|
|
|
+ String queueToRemove, CapacitySchedulerConfiguration proposedConf,
|
|
|
+ Map<String, String> confUpdate) throws IOException {
|
|
|
+ if (queueToRemove == null) {
|
|
|
+ return;
|
|
|
+ } else {
|
|
|
+ CapacityScheduler cs = (CapacityScheduler) rmContext.getScheduler();
|
|
|
+ String queueName = queueToRemove.substring(
|
|
|
+ queueToRemove.lastIndexOf('.') + 1);
|
|
|
+ CSQueue queue = cs.getQueue(queueName);
|
|
|
+ if (queue == null ||
|
|
|
+ !queue.getQueuePath().equals(queueToRemove)) {
|
|
|
+ throw new IOException("Queue " + queueToRemove + " not found");
|
|
|
+ } else if (queueToRemove.lastIndexOf('.') == -1) {
|
|
|
+ throw new IOException("Can't remove queue " + queueToRemove);
|
|
|
+ }
|
|
|
+ String parentQueuePath = queueToRemove.substring(0, queueToRemove
|
|
|
+ .lastIndexOf('.'));
|
|
|
+ String[] siblingQueues = proposedConf.getQueues(parentQueuePath);
|
|
|
+ List<String> newSiblingQueues = new ArrayList<>();
|
|
|
+ for (String siblingQueue : siblingQueues) {
|
|
|
+ if (!siblingQueue.equals(queueName)) {
|
|
|
+ newSiblingQueues.add(siblingQueue);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ proposedConf.setQueues(parentQueuePath, newSiblingQueues
|
|
|
+ .toArray(new String[0]));
|
|
|
+ String queuesConfig = CapacitySchedulerConfiguration.PREFIX
|
|
|
+ + parentQueuePath + CapacitySchedulerConfiguration.DOT
|
|
|
+ + CapacitySchedulerConfiguration.QUEUES;
|
|
|
+ if (newSiblingQueues.size() == 0) {
|
|
|
+ confUpdate.put(queuesConfig, null);
|
|
|
+ } else {
|
|
|
+ confUpdate.put(queuesConfig, Joiner.on(',').join(newSiblingQueues));
|
|
|
+ }
|
|
|
+ for (Map.Entry<String, String> confRemove : proposedConf.getValByRegex(
|
|
|
+ ".*" + queueToRemove.replaceAll("\\.", "\\.") + "\\..*")
|
|
|
+ .entrySet()) {
|
|
|
+ proposedConf.unset(confRemove.getKey());
|
|
|
+ confUpdate.put(confRemove.getKey(), null);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void addQueue(
|
|
|
+ QueueConfigInfo addInfo, CapacitySchedulerConfiguration proposedConf,
|
|
|
+ Map<String, String> confUpdate) throws IOException {
|
|
|
+ if (addInfo == null) {
|
|
|
+ return;
|
|
|
+ } else {
|
|
|
+ CapacityScheduler cs = (CapacityScheduler) rmContext.getScheduler();
|
|
|
+ String queuePath = addInfo.getQueue();
|
|
|
+ String queueName = queuePath.substring(queuePath.lastIndexOf('.') + 1);
|
|
|
+ if (cs.getQueue(queueName) != null) {
|
|
|
+ throw new IOException("Can't add existing queue " + queuePath);
|
|
|
+ } else if (queuePath.lastIndexOf('.') == -1) {
|
|
|
+ throw new IOException("Can't add invalid queue " + queuePath);
|
|
|
+ }
|
|
|
+ String parentQueue = queuePath.substring(0, queuePath.lastIndexOf('.'));
|
|
|
+ String[] siblings = proposedConf.getQueues(parentQueue);
|
|
|
+ List<String> siblingQueues = siblings == null ? new ArrayList<>() :
|
|
|
+ new ArrayList<>(Arrays.<String>asList(siblings));
|
|
|
+ siblingQueues.add(queuePath.substring(queuePath.lastIndexOf('.') + 1));
|
|
|
+ proposedConf.setQueues(parentQueue,
|
|
|
+ siblingQueues.toArray(new String[0]));
|
|
|
+ confUpdate.put(CapacitySchedulerConfiguration.PREFIX
|
|
|
+ + parentQueue + CapacitySchedulerConfiguration.DOT
|
|
|
+ + CapacitySchedulerConfiguration.QUEUES,
|
|
|
+ Joiner.on(',').join(siblingQueues));
|
|
|
+ String keyPrefix = CapacitySchedulerConfiguration.PREFIX
|
|
|
+ + queuePath + CapacitySchedulerConfiguration.DOT;
|
|
|
+ for (Map.Entry<String, String> kv : addInfo.getParams().entrySet()) {
|
|
|
+ if (kv.getValue() == null) {
|
|
|
+ proposedConf.unset(keyPrefix + kv.getKey());
|
|
|
+ } else {
|
|
|
+ proposedConf.set(keyPrefix + kv.getKey(), kv.getValue());
|
|
|
+ }
|
|
|
+ confUpdate.put(keyPrefix + kv.getKey(), kv.getValue());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void updateQueue(QueueConfigInfo updateInfo,
|
|
|
+ CapacitySchedulerConfiguration proposedConf,
|
|
|
+ Map<String, String> confUpdate) {
|
|
|
+ if (updateInfo == null) {
|
|
|
+ return;
|
|
|
+ } else {
|
|
|
+ String queuePath = updateInfo.getQueue();
|
|
|
+ String keyPrefix = CapacitySchedulerConfiguration.PREFIX
|
|
|
+ + queuePath + CapacitySchedulerConfiguration.DOT;
|
|
|
+ for (Map.Entry<String, String> kv : updateInfo.getParams().entrySet()) {
|
|
|
+ if (kv.getValue() == null) {
|
|
|
+ proposedConf.unset(keyPrefix + kv.getKey());
|
|
|
+ } else {
|
|
|
+ proposedConf.set(keyPrefix + kv.getKey(), kv.getValue());
|
|
|
+ }
|
|
|
+ confUpdate.put(keyPrefix + kv.getKey(), kv.getValue());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|