|
@@ -26,6 +26,7 @@ import java.util.ArrayList;
|
|
import java.util.Collection;
|
|
import java.util.Collection;
|
|
import java.util.Collections;
|
|
import java.util.Collections;
|
|
import java.util.HashMap;
|
|
import java.util.HashMap;
|
|
|
|
+import java.util.HashSet;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
import java.util.Set;
|
|
import java.util.Set;
|
|
@@ -109,6 +110,8 @@ public class PoolManager {
|
|
// mapred.fairscheduler.allocation.file is used).
|
|
// mapred.fairscheduler.allocation.file is used).
|
|
private String poolNameProperty; // Jobconf property to use for determining a
|
|
private String poolNameProperty; // Jobconf property to use for determining a
|
|
// job's pool name (default: user.name)
|
|
// job's pool name (default: user.name)
|
|
|
|
+
|
|
|
|
+ PoolPlacementPolicy placementPolicy;
|
|
|
|
|
|
private Map<String, Pool> pools = new HashMap<String, Pool>();
|
|
private Map<String, Pool> pools = new HashMap<String, Pool>();
|
|
|
|
|
|
@@ -126,6 +129,10 @@ public class PoolManager {
|
|
Configuration conf = scheduler.getConf();
|
|
Configuration conf = scheduler.getConf();
|
|
this.poolNameProperty = conf.get(
|
|
this.poolNameProperty = conf.get(
|
|
"mapred.fairscheduler.poolnameproperty", "user.name");
|
|
"mapred.fairscheduler.poolnameproperty", "user.name");
|
|
|
|
+
|
|
|
|
+ this.placementPolicy = new PoolPlacementPolicy(getSimplePlacementRules(),
|
|
|
|
+ new HashSet<String>(), conf);
|
|
|
|
+
|
|
this.allocFile = conf.get("mapred.fairscheduler.allocation.file");
|
|
this.allocFile = conf.get("mapred.fairscheduler.allocation.file");
|
|
if (allocFile == null) {
|
|
if (allocFile == null) {
|
|
// No allocation file specified in jobconf. Use the default allocation
|
|
// No allocation file specified in jobconf. Use the default allocation
|
|
@@ -242,6 +249,8 @@ public class PoolManager {
|
|
long fairSharePreemptionTimeout = Long.MAX_VALUE;
|
|
long fairSharePreemptionTimeout = Long.MAX_VALUE;
|
|
long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
|
|
long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
|
|
SchedulingMode defaultSchedulingMode = SchedulingMode.FAIR;
|
|
SchedulingMode defaultSchedulingMode = SchedulingMode.FAIR;
|
|
|
|
+
|
|
|
|
+ PoolPlacementPolicy newPlacementPolicy = null;
|
|
|
|
|
|
// Remember all pool names so we can display them on web UI, etc.
|
|
// Remember all pool names so we can display them on web UI, etc.
|
|
List<String> poolNamesInAllocFile = new ArrayList<String>();
|
|
List<String> poolNamesInAllocFile = new ArrayList<String>();
|
|
@@ -258,6 +267,8 @@ public class PoolManager {
|
|
doc = builder.parse(allocFile.toString());
|
|
doc = builder.parse(allocFile.toString());
|
|
}
|
|
}
|
|
Element root = doc.getDocumentElement();
|
|
Element root = doc.getDocumentElement();
|
|
|
|
+ Element placementPolicyElement = null;
|
|
|
|
+
|
|
if (!"allocations".equals(root.getTagName()))
|
|
if (!"allocations".equals(root.getTagName()))
|
|
throw new AllocationConfigurationException("Bad fair scheduler config " +
|
|
throw new AllocationConfigurationException("Bad fair scheduler config " +
|
|
"file: top-level element not <allocations>");
|
|
"file: top-level element not <allocations>");
|
|
@@ -352,10 +363,21 @@ public class PoolManager {
|
|
} else if ("defaultPoolSchedulingMode".equals(element.getTagName())) {
|
|
} else if ("defaultPoolSchedulingMode".equals(element.getTagName())) {
|
|
String text = ((Text)element.getFirstChild()).getData().trim();
|
|
String text = ((Text)element.getFirstChild()).getData().trim();
|
|
defaultSchedulingMode = parseSchedulingMode(text);
|
|
defaultSchedulingMode = parseSchedulingMode(text);
|
|
|
|
+ } else if ("poolPlacementPolicy".equals(element.getTagName())) {
|
|
|
|
+ placementPolicyElement = element;
|
|
} else {
|
|
} else {
|
|
LOG.warn("Bad element in allocations file: " + element.getTagName());
|
|
LOG.warn("Bad element in allocations file: " + element.getTagName());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ // Load placement policy and pass it configured pools
|
|
|
|
+ if (placementPolicyElement != null) {
|
|
|
|
+ newPlacementPolicy = PoolPlacementPolicy.fromXml(placementPolicyElement,
|
|
|
|
+ new HashSet<String>(poolNamesInAllocFile), scheduler.getConf());
|
|
|
|
+ } else {
|
|
|
|
+ newPlacementPolicy = new PoolPlacementPolicy(getSimplePlacementRules(),
|
|
|
|
+ new HashSet<String>(poolNamesInAllocFile), scheduler.getConf());
|
|
|
|
+ }
|
|
|
|
|
|
// Commit the reload; also create any pool defined in the alloc file
|
|
// Commit the reload; also create any pool defined in the alloc file
|
|
// if it does not already exist, so it can be displayed on the web UI.
|
|
// if it does not already exist, so it can be displayed on the web UI.
|
|
@@ -375,6 +397,7 @@ public class PoolManager {
|
|
this.defaultSchedulingMode = defaultSchedulingMode;
|
|
this.defaultSchedulingMode = defaultSchedulingMode;
|
|
this.declaredPools = Collections.unmodifiableSet(new TreeSet<String>(
|
|
this.declaredPools = Collections.unmodifiableSet(new TreeSet<String>(
|
|
poolNamesInAllocFile));
|
|
poolNamesInAllocFile));
|
|
|
|
+ this.placementPolicy = newPlacementPolicy;
|
|
for (String name: poolNamesInAllocFile) {
|
|
for (String name: poolNamesInAllocFile) {
|
|
Pool pool = getPool(name);
|
|
Pool pool = getPool(name);
|
|
if (poolModes.containsKey(name)) {
|
|
if (poolModes.containsKey(name)) {
|
|
@@ -480,8 +503,16 @@ public class PoolManager {
|
|
*/
|
|
*/
|
|
public String getPoolName(JobInProgress job) {
|
|
public String getPoolName(JobInProgress job) {
|
|
Configuration conf = job.getJobConf();
|
|
Configuration conf = job.getJobConf();
|
|
- return conf.get(EXPLICIT_POOL_PROPERTY,
|
|
|
|
|
|
+ String poolName = conf.get(EXPLICIT_POOL_PROPERTY,
|
|
conf.get(poolNameProperty, Pool.DEFAULT_POOL_NAME)).trim();
|
|
conf.get(poolNameProperty, Pool.DEFAULT_POOL_NAME)).trim();
|
|
|
|
+ String user = conf.get("user.name");
|
|
|
|
+
|
|
|
|
+ try {
|
|
|
|
+ return placementPolicy.assignJobToPool(poolName, user);
|
|
|
|
+ } catch (IOException ex) {
|
|
|
|
+ LOG.error("Error assigning job to pool, using default pool", ex);
|
|
|
|
+ return Pool.DEFAULT_POOL_NAME;
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -553,4 +584,21 @@ public class PoolManager {
|
|
return declaredPools;
|
|
return declaredPools;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Construct simple pool placement policy from allow-undeclared-pools
|
|
|
|
+ */
|
|
|
|
+ private List<PoolPlacementRule> getSimplePlacementRules() {
|
|
|
|
+ List<PoolPlacementRule> rules = new ArrayList<PoolPlacementRule>();
|
|
|
|
+
|
|
|
|
+ boolean specifiedCreate = scheduler.getConf().getBoolean(
|
|
|
|
+ FairScheduler.ALLOW_UNDECLARED_POOLS_KEY,
|
|
|
|
+ FairScheduler.DEFAULT_ALLOW_UNDECLARED_POOLS);
|
|
|
|
+
|
|
|
|
+ rules.add(
|
|
|
|
+ new PoolPlacementRule.Specified().initialize(specifiedCreate, null));
|
|
|
|
+ rules.add(new PoolPlacementRule.Default().initialize(true, null));
|
|
|
|
+
|
|
|
|
+ return rules;
|
|
|
|
+ }
|
|
|
|
+
|
|
}
|
|
}
|