|
@@ -22,6 +22,8 @@ import java.util.HashSet;
|
|
|
import java.util.Map;
|
|
|
import java.util.Set;
|
|
|
|
|
|
+import org.apache.commons.logging.Log;
|
|
|
+import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
import org.apache.hadoop.security.authorize.AccessControlList;
|
|
@@ -37,6 +39,7 @@ import org.apache.hadoop.yarn.util.resource.Resources;
|
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
|
|
|
|
public class AllocationConfiguration extends ReservationSchedulerConfiguration {
|
|
|
+ private static final Log LOG = LogFactory.getLog(FSQueue.class.getName());
|
|
|
private static final AccessControlList EVERYBODY_ACL = new AccessControlList("*");
|
|
|
private static final AccessControlList NOBODY_ACL = new AccessControlList(" ");
|
|
|
private static final ResourceCalculator RESOURCE_CALCULATOR =
|
|
@@ -242,26 +245,24 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
|
|
|
return !nonPreemptableQueues.contains(queueName);
|
|
|
}
|
|
|
|
|
|
- public ResourceWeights getQueueWeight(String queue) {
|
|
|
+ private ResourceWeights getQueueWeight(String queue) {
|
|
|
ResourceWeights weight = queueWeights.get(queue);
|
|
|
return (weight == null) ? ResourceWeights.NEUTRAL : weight;
|
|
|
}
|
|
|
|
|
|
- public void setQueueWeight(String queue, ResourceWeights weight) {
|
|
|
- queueWeights.put(queue, weight);
|
|
|
- }
|
|
|
-
|
|
|
public int getUserMaxApps(String user) {
|
|
|
Integer maxApps = userMaxApps.get(user);
|
|
|
return (maxApps == null) ? userMaxAppsDefault : maxApps;
|
|
|
}
|
|
|
|
|
|
- public int getQueueMaxApps(String queue) {
|
|
|
+ @VisibleForTesting
|
|
|
+ int getQueueMaxApps(String queue) {
|
|
|
Integer maxApps = queueMaxApps.get(queue);
|
|
|
return (maxApps == null) ? queueMaxAppsDefault : maxApps;
|
|
|
}
|
|
|
-
|
|
|
- public float getQueueMaxAMShare(String queue) {
|
|
|
+
|
|
|
+ @VisibleForTesting
|
|
|
+ float getQueueMaxAMShare(String queue) {
|
|
|
Float maxAMShare = queueMaxAMShares.get(queue);
|
|
|
return (maxAMShare == null) ? queueMaxAMShareDefault : maxAMShare;
|
|
|
}
|
|
@@ -273,21 +274,12 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
|
|
|
* @return the min allocation on this queue or {@link Resources#none}
|
|
|
* if not set
|
|
|
*/
|
|
|
- public Resource getMinResources(String queue) {
|
|
|
+ @VisibleForTesting
|
|
|
+ Resource getMinResources(String queue) {
|
|
|
Resource minQueueResource = minQueueResources.get(queue);
|
|
|
return (minQueueResource == null) ? Resources.none() : minQueueResource;
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Set the maximum resource allocation for the given queue.
|
|
|
- *
|
|
|
- * @param queue the target queue
|
|
|
- * @param maxResource the maximum resource allocation
|
|
|
- */
|
|
|
- void setMaxResources(String queue, Resource maxResource) {
|
|
|
- maxQueueResources.put(queue, maxResource);
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* Get the maximum resource allocation for the given queue. If the max in not
|
|
|
* set, return the larger of the min and the default max.
|
|
@@ -295,7 +287,8 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
|
|
|
* @param queue the target queue's name
|
|
|
* @return the max allocation on this queue
|
|
|
*/
|
|
|
- public Resource getMaxResources(String queue) {
|
|
|
+ @VisibleForTesting
|
|
|
+ Resource getMaxResources(String queue) {
|
|
|
Resource maxQueueResource = maxQueueResources.get(queue);
|
|
|
if (maxQueueResource == null) {
|
|
|
Resource minQueueResource = minQueueResources.get(queue);
|
|
@@ -317,21 +310,11 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
|
|
|
* @param queue the target queue's name
|
|
|
* @return the max allocation on this queue or null if not set
|
|
|
*/
|
|
|
- public Resource getMaxChildResources(String queue) {
|
|
|
+ @VisibleForTesting
|
|
|
+ Resource getMaxChildResources(String queue) {
|
|
|
return maxChildQueueResources.get(queue);
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Set the maximum resource allocation for the children of the given queue.
|
|
|
- * Use of this method is primarily intended for testing purposes.
|
|
|
- *
|
|
|
- * @param queue the target queue
|
|
|
- * @param maxResource the maximum resource allocation
|
|
|
- */
|
|
|
- void setMaxChildResources(String queue, Resource maxResource) {
|
|
|
- maxChildQueueResources.put(queue, maxResource);
|
|
|
- }
|
|
|
-
|
|
|
public boolean hasAccess(String queueName, QueueACL acl,
|
|
|
UserGroupInformation user) {
|
|
|
int lastPeriodIndex = queueName.length();
|
|
@@ -346,8 +329,9 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
|
|
|
|
|
|
return false;
|
|
|
}
|
|
|
-
|
|
|
- public SchedulingPolicy getSchedulingPolicy(String queueName) {
|
|
|
+
|
|
|
+ @VisibleForTesting
|
|
|
+ SchedulingPolicy getSchedulingPolicy(String queueName) {
|
|
|
SchedulingPolicy policy = schedulingPolicies.get(queueName);
|
|
|
return (policy == null) ? defaultSchedulingPolicy : policy;
|
|
|
}
|
|
@@ -423,4 +407,35 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
|
|
|
public void setAverageCapacity(int avgCapacity) {
|
|
|
globalReservationQueueConfig.setAverageCapacity(avgCapacity);
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Initialize a {@link FSQueue} with queue-specific properties and its
|
|
|
+ * metrics.
|
|
|
+ * @param queue the FSQueue needed to be initialized
|
|
|
+ * @param scheduler the scheduler which the queue belonged to
|
|
|
+ */
|
|
|
+ public void initFSQueue(FSQueue queue, FairScheduler scheduler){
|
|
|
+ // Set queue-specific properties.
|
|
|
+ String name = queue.getName();
|
|
|
+ queue.setWeights(getQueueWeight(name));
|
|
|
+ queue.setMinShare(getMinResources(name));
|
|
|
+ queue.setMaxShare(getMaxResources(name));
|
|
|
+ queue.setMaxRunningApps(getQueueMaxApps(name));
|
|
|
+ queue.setMaxAMShare(getQueueMaxAMShare(name));
|
|
|
+ queue.setMaxChildQueueResource(getMaxChildResources(name));
|
|
|
+ try {
|
|
|
+ SchedulingPolicy policy = getSchedulingPolicy(name);
|
|
|
+ policy.initialize(scheduler.getClusterResource());
|
|
|
+ queue.setPolicy(policy);
|
|
|
+ } catch (AllocationConfigurationException ex) {
|
|
|
+ LOG.warn("Failed to set the scheduling policy "
|
|
|
+ + getDefaultSchedulingPolicy(), ex);
|
|
|
+ }
|
|
|
+
|
|
|
+ // Set queue metrics.
|
|
|
+ queue.getMetrics().setMinShare(getMinResources(name));
|
|
|
+ queue.getMetrics().setMaxShare(getMaxResources(name));
|
|
|
+ queue.getMetrics().setMaxApps(getQueueMaxApps(name));
|
|
|
+ queue.getMetrics().setSchedulingPolicy(getSchedulingPolicy(name).getName());
|
|
|
+ }
|
|
|
}
|