فهرست منبع

MAPREDUCE-5712. Backport Fair Scheduler pool placement by secondary group (Ted Malaska via Sandy Ryza)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1@1557845 13f79535-47bb-0310-9956-ffa450edef68
Sanford Ryza 11 سال پیش
والد
کامیت
aa795fd268

+ 3 - 0
CHANGES.txt

@@ -55,6 +55,9 @@ Release 1.3.0 - unreleased
     MAPREDUCE-3310. Custom grouping comparator cannot be set for Combiners 
     (tucu)
 
+    MAPREDUCE-5712. Backport Fair Scheduler pool placement by secondary group
+    (Ted Malaska via Sandy Ryza)
+
   BUG FIXES
 
     HADOOP-9863. Backport HADOOP-8686 to support BigEndian on ppc64. 

+ 1 - 0
src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolPlacementPolicy.java

@@ -39,6 +39,7 @@ public class PoolPlacementPolicy {
         new HashMap<String, Class<? extends PoolPlacementRule>>();
     map.put("user", PoolPlacementRule.User.class);
     map.put("primaryGroup", PoolPlacementRule.PrimaryGroup.class);
+    map.put("secondaryGroupExistingPool", PoolPlacementRule.SecondaryGroupExistingPool.class);
     map.put("specified", PoolPlacementRule.Specified.class);
     map.put("default", PoolPlacementRule.Default.class);
     map.put("reject", PoolPlacementRule.Reject.class);

+ 38 - 7
src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolPlacementRule.java

@@ -58,7 +58,7 @@ public abstract class PoolPlacementRule {
    */
   public String assignJobToPool(String requestedPool, String user,
       Groups groups, Collection<String> configuredPools) throws IOException {
-    String pool = getPoolForJob(requestedPool, user, groups);
+    String pool = getPoolForJob(requestedPool, user, groups, configuredPools);
     if (create || configuredPools.contains(pool)) {
       return pool;
     } else {
@@ -98,12 +98,14 @@ public abstract class PoolPlacementRule {
    *    The user submitting the job.
    * @param groups
    *    The groups of the user submitting the job.
+   * @param configuredPools
+   *    The pools specified in the scheduler configuration.
    * @return
    *    The name of the Pool to assign the job to, or null to empty string
    *    continue to the next rule.
    */
   protected abstract String getPoolForJob(String requestedPool, String user,
-      Groups groups) throws IOException;
+      Groups groups, Collection<String> configuredQueues) throws IOException;
 
   /**
    * Places jobs in pools by username of the submitter
@@ -111,7 +113,7 @@ public abstract class PoolPlacementRule {
   public static class User extends PoolPlacementRule {
     @Override
     protected String getPoolForJob(String requestedPool,
-        String user, Groups groups) {
+        String user, Groups groups, Collection<String> configuredPools) {
       if (user != null) {
         return user; 
       } else {
@@ -131,7 +133,8 @@ public abstract class PoolPlacementRule {
   public static class PrimaryGroup extends PoolPlacementRule {
     @Override
     protected String getPoolForJob(String requestedPool,
-        String user, Groups groups) throws IOException {
+        String user, Groups groups, 
+        Collection<String> configuredPools) throws IOException {
       if (user == null) {
         return Pool.DEFAULT_POOL_NAME;
       }
@@ -150,13 +153,41 @@ public abstract class PoolPlacementRule {
     }
   }
 
+  
+  /**
+   * Places jobs in queues by secondary group of the submitter
+   * 
+   * Match will be made on first secondary group that exist in
+   * pool
+   */
+  public static class SecondaryGroupExistingPool extends PoolPlacementRule {
+    @Override
+    protected String getPoolForJob(String requestedPool,
+        String user, Groups groups, 
+        Collection<String> configuredPools) throws IOException {
+      List<String> groupNames = groups.getGroups(user);
+
+      for (int i = 1; i < groupNames.size(); i++) {
+        if (configuredPools.contains(groupNames.get(i))) {
+          return groupNames.get(i);
+        }
+      }
+      return "";
+    }
+        
+    @Override
+    public boolean isTerminal() {
+      return create;
+    }
+  }
+
   /**
    * Places jobs in pools by requested pool of the submitter
    */
   public static class Specified extends PoolPlacementRule {
     @Override
     protected String getPoolForJob(String requestedPool,
-        String user, Groups groups) {
+        String user, Groups groups, Collection<String> configuredPools) {
       if (requestedPool.equals(Pool.DEFAULT_POOL_NAME)) {
         return "";
       } else {
@@ -176,7 +207,7 @@ public abstract class PoolPlacementRule {
   public static class Default extends PoolPlacementRule {
     @Override
     protected String getPoolForJob(String requestedPool, String user,
-        Groups groups) {
+        Groups groups, Collection<String> configuredPools) {
       return Pool.DEFAULT_POOL_NAME;
     }
     
@@ -198,7 +229,7 @@ public abstract class PoolPlacementRule {
     
     @Override
     protected String getPoolForJob(String requestedPool, String user,
-        Groups groups) {
+        Groups groups, Collection<String> configuredPools) {
       throw new UnsupportedOperationException();
     }
     

+ 2 - 1
src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/SimpleGroupsMapping.java → src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/SimpleGroupsMapping.java

@@ -28,7 +28,8 @@ public class SimpleGroupsMapping implements GroupMappingServiceProvider {
   
   @Override
   public List<String> getGroups(String user) {
-    return Arrays.asList(user + "group");
+    return Arrays.asList(user + "group", user + "subgroup1", 
+      user + "subgroup2");
   }
 
   @Override

+ 26 - 7
src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java

@@ -601,24 +601,33 @@ public class TestFairScheduler extends TestCase {
   
   private JobInProgress submitJobNotInitialized(int state, int maps, int reduces)
 	    throws IOException {
-    return submitJob(state, maps, reduces, null, null, false);
+    return submitJob(state, maps, reduces, null, null, false, null);
   }
 
   private JobInProgress submitJob(int state, int maps, int reduces)
       throws IOException {
-    return submitJob(state, maps, reduces, null, null, true);
+    return submitJob(state, maps, reduces, null, null, true, null);
   }
   
   private JobInProgress submitJob(int state, int maps, int reduces, String pool)
       throws IOException {
-    return submitJob(state, maps, reduces, pool, null, true);
+    return submitJob(state, maps, reduces, pool, null, true, null);
   }
   
   private JobInProgress submitJob(int state, int maps, int reduces, String pool,
       String[][] mapInputLocations, boolean initializeJob) throws IOException {
+    return submitJob(state, maps, reduces, pool, mapInputLocations, initializeJob, null);
+  }
+
+  private JobInProgress submitJob(int state, int maps, int reduces, String pool,
+      String[][] mapInputLocations, boolean initializeJob, 
+      String userName) throws IOException {
     JobConf jobConf = new JobConf(conf);
     jobConf.setNumMapTasks(maps);
     jobConf.setNumReduceTasks(reduces);
+    if (userName != null) {
+      jobConf.set("user.name", userName);
+    }
     if (pool != null)
       jobConf.set(POOL_PROPERTY, pool);
     JobInProgress job = new FakeJobInProgress(jobConf, taskTrackerManager,
@@ -3098,10 +3107,14 @@ public class TestFairScheduler extends TestCase {
     rules.add(new PoolPlacementRule.Specified().initialize(true, null));
     rules.add(new PoolPlacementRule.User().initialize(false, null));
     rules.add(new PoolPlacementRule.PrimaryGroup().initialize(false, null));
+    rules.add(new PoolPlacementRule.SecondaryGroupExistingPool().initialize(false, null));
     rules.add(new PoolPlacementRule.Default().initialize(true, null));
     Set<String> pools = new HashSet();
     pools.add("user1");
     pools.add("user3group");
+    pools.add("user4subgroup1");
+    pools.add("user4subgroup2");
+    pools.add("user5subgroup2");
 
     placementPolicyConfig.set("user.name", "user1");
     PoolManager poolManager = scheduler.getPoolManager();
@@ -3109,19 +3122,25 @@ public class TestFairScheduler extends TestCase {
     poolManager.placementPolicy = new PoolPlacementPolicy(
         rules, pools, placementPolicyConfig);
     
-    JobInProgress job1 = submitJob(JobStatus.RUNNING, 2, 1);
-
-    job1.getJobConf().set("user.name", "user1");
+    JobInProgress job1 = submitJob(JobStatus.RUNNING, 2, 1, null, null, false, "user1");
     poolManager.setPool(job1, "somepool");
     assertEquals("somepool", poolManager.getPoolName(job1));
 
     poolManager.setPool(job1, "default");
     assertEquals("user1", poolManager.getPoolName(job1));
 
-    job1.getJobConf().set("user.name", "user3");
+    job1 = submitJob(JobStatus.RUNNING, 2, 1, null, null, false, "user3");
     poolManager.setPool(job1, "default");
     assertEquals("user3group", poolManager.getPoolName(job1));
 
+    job1 = submitJob(JobStatus.RUNNING, 2, 1, null, null, false, "user4");
+    poolManager.setPool(job1, "default");
+    assertEquals("user4subgroup1", poolManager.getPoolName(job1));
+
+    job1 = submitJob(JobStatus.RUNNING, 2, 1, null, null, false, "user5");
+    poolManager.setPool(job1, "default");
+    assertEquals("user5subgroup2", poolManager.getPoolName(job1));
+
     job1.getJobConf().set("user.name", "otheruser");
     poolManager.setPool(job1, "default");
     assertEquals("default", poolManager.getPoolName(job1));