Преглед на файлове

YARN-5545. Fix issues related to Max App in capacity scheduler. Contributed by Bibin A Chundatt

Naganarasimha преди 8 години
родител
ревизия
503e73e849

+ 14 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

@@ -868,6 +868,13 @@ public class CapacityScheduler extends
       String queueName, String user, Priority priority) {
     try {
       writeLock.lock();
+      if (isSystemAppsLimitReached()) {
+        String message = "Maximum system application limit reached,"
+            + "cannot accept submission of application: " + applicationId;
+        this.rmContext.getDispatcher().getEventHandler().handle(new RMAppEvent(
+            applicationId, RMAppEventType.APP_REJECTED, message));
+        return;
+      }
       // Sanity checks.
       CSQueue queue = getQueue(queueName);
       if (queue == null) {
@@ -2023,6 +2030,13 @@ public class CapacityScheduler extends
     return apps;
   }
 
+  /**
+   * Reports whether the scheduler has reached the configured system-wide
+   * application limit.
+   *
+   * @return true when the application count reported by the root queue is
+   *         greater than or equal to the configured maximum number of system
+   *         applications, false otherwise
+   */
+  public boolean isSystemAppsLimitReached() {
+    return root.getNumApplications() >= conf.getMaximumSystemApplications();
+  }
+
   private CapacitySchedulerConfiguration loadCapacitySchedulerConfiguration(
       Configuration configuration) throws IOException {
     try {

+ 15 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java

@@ -1133,4 +1133,19 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
       INTRA_QUEUE_PREEMPTION_CONFIG_PREFIX + "max-allowable-limit";
   public static final float DEFAULT_INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT =
       0.2f;
+
+  /**
+   * Maximum number of applications for a queue, used when the per-queue
+   * application limit is not defined. To be consistent with previous versions,
+   * the default value is set as UNDEFINED.
+   */
+  @Private
+  public static final String QUEUE_GLOBAL_MAX_APPLICATION =
+      PREFIX + "global-queue-max-application";
+
+  public int getGlobalMaximumApplicationsPerQueue() {
+    int maxApplicationsPerQueue =
+        getInt(QUEUE_GLOBAL_MAX_APPLICATION, (int) UNDEFINED);
+    return maxApplicationsPerQueue;
+  }
 }

+ 8 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java

@@ -188,9 +188,14 @@ public class LeafQueue extends AbstractCSQueue {
 
       maxApplications = conf.getMaximumApplicationsPerQueue(getQueuePath());
       if (maxApplications < 0) {
-        int maxSystemApps = conf.getMaximumSystemApplications();
-        maxApplications =
-            (int) (maxSystemApps * queueCapacities.getAbsoluteCapacity());
+        int maxGlobalPerQueueApps = conf.getGlobalMaximumApplicationsPerQueue();
+        if (maxGlobalPerQueueApps > 0) {
+          maxApplications = maxGlobalPerQueueApps;
+        } else {
+          int maxSystemApps = conf.getMaximumSystemApplications();
+          maxApplications =
+              (int) (maxSystemApps * queueCapacities.getAbsoluteCapacity());
+        }
       }
       maxApplicationsPerUser = Math.min(maxApplications,
           (int) (maxApplications * (userLimit / 100.0f) * userLimitFactor));

+ 166 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java

@@ -31,14 +31,17 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -47,11 +50,17 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
@@ -61,12 +70,15 @@ import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
-import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Matchers;
 import org.mockito.Mockito;
 
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
 public class TestApplicationLimits {
   
   private static final Log LOG = LogFactory.getLog(TestApplicationLimits.class);
@@ -692,10 +704,159 @@ public class TestApplicationLimits {
     assertEquals(expectedHeadroom, app_0_1.getHeadroom());
     assertEquals(expectedHeadroom, app_1_0.getHeadroom());
   }
-  
 
-  @After
-  public void tearDown() {
-  
+  /**
+   * Builds the capacity scheduler configuration used by the application
+   * limit submission test: root queues a, b, c and d, child queues a1, a2 and
+   * a3 under a, per-label capacities for labels x, y and z, plus the
+   * application limit settings exercised by the test.
+   *
+   * @param config base configuration to wrap
+   * @return the populated capacity scheduler configuration
+   */
+  private Configuration getConfigurationWithQueueLabels(Configuration config) {
+    final String rootPath = CapacitySchedulerConfiguration.ROOT;
+    final String queueA = rootPath + ".a";
+    final String queueB = rootPath + ".b";
+    final String queueC = rootPath + ".c";
+    final String queueD = rootPath + ".d";
+    final String queueA1 = queueA + ".a1";
+    final String queueA2 = queueA + ".a2";
+    final String queueA3 = queueA + ".a3";
+
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration(config);
+
+    // Top-level queues and the root capacity for every label.
+    csConf.setQueues(rootPath, new String[]{"a", "b", "c", "d"});
+    csConf.setCapacityByLabel(rootPath, "x", 100);
+    csConf.setCapacityByLabel(rootPath, "y", 100);
+    csConf.setCapacityByLabel(rootPath, "z", 100);
+
+    // Application limits: global per-queue default, an explicit limit for
+    // a1, a user limit factor for d and the system-wide maximum.
+    csConf.setInt(CapacitySchedulerConfiguration.QUEUE_GLOBAL_MAX_APPLICATION,
+        20);
+    csConf.setInt("yarn.scheduler.capacity.root.a.a1.maximum-applications", 1);
+    csConf.setFloat("yarn.scheduler.capacity.root.d.user-limit-factor", 0.1f);
+    csConf.setInt("yarn.scheduler.capacity.maximum-applications", 4);
+
+    // Queue hierarchy below a and the default-partition capacities.
+    csConf.setQueues(queueA, new String[]{"a1", "a2", "a3"});
+    csConf.setCapacity(queueA, 50);
+    csConf.setCapacity(queueB, 50);
+    csConf.setCapacity(queueC, 0);
+    csConf.setCapacity(queueD, 0);
+    csConf.setCapacity(queueA1, 50);
+    csConf.setCapacity(queueA2, 50);
+    csConf.setCapacity(queueA3, 0);
+
+    // Label y across the top-level queues.
+    csConf.setCapacityByLabel(queueA, "y", 25);
+    csConf.setCapacityByLabel(queueB, "y", 50);
+    csConf.setCapacityByLabel(queueC, "y", 25);
+    csConf.setCapacityByLabel(queueD, "y", 0);
+
+    // Labels x and z are split evenly between a and b.
+    csConf.setCapacityByLabel(queueA, "x", 50);
+    csConf.setCapacityByLabel(queueB, "x", 50);
+
+    csConf.setCapacityByLabel(queueA, "z", 50);
+    csConf.setCapacityByLabel(queueB, "z", 50);
+
+    // Per-label capacities for the children of a.
+    csConf.setCapacityByLabel(queueA1, "x", 100);
+    csConf.setCapacityByLabel(queueA2, "x", 0);
+
+    csConf.setCapacityByLabel(queueA1, "y", 25);
+    csConf.setCapacityByLabel(queueA2, "y", 75);
+
+    csConf.setCapacityByLabel(queueA2, "z", 75);
+    csConf.setCapacityByLabel(queueA3, "z", 25);
+    return csConf;
+  }
+
+  /**
+   * Collects the given strings into a new hash set.
+   *
+   * @param elements values to place in the set
+   * @return a set containing the supplied values
+   */
+  private Set<String> toSet(String... elements) {
+    return Sets.newHashSet(elements);
+  }
+
+  /**
+   * Verifies that application submissions are accepted or rejected according
+   * to the three limits set up by
+   * {@code getConfigurationWithQueueLabels}: the explicit per-queue limit on
+   * root.a.a1, the per-user limit on root.d, and the system-wide maximum
+   * number of applications.
+   *
+   * @throws Exception if the mock resource manager fails or a wait times out
+   */
+  @Test(timeout = 120000)
+  public void testApplicationLimitSubmit() throws Exception {
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
+    mgr.init(conf);
+    // set node -> label
+    mgr.addToCluserNodeLabelsWithDefaultExclusivity(
+        ImmutableSet.of("x", "y", "z"));
+
+    // set mapping:
+    // h1 -> x
+    // h2 -> y
+    mgr.addLabelsToNode(
+        ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+    mgr.addLabelsToNode(
+        ImmutableMap.of(NodeId.newInstance("h2", 0), toSet("y")));
+
+    // inject node label manager
+    MockRM rm = new MockRM(getConfigurationWithQueueLabels(conf)) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+    rm.getRMContext().setNodeLabelManager(mgr);
+    rm.start();
+    // Stop the resource manager even when an assertion or wait fails, so a
+    // failing run does not leave the mock RM running.
+    try {
+      MockNM nm1 = rm.registerNode("h1:1234", 4096);
+      MockNM nm2 = rm.registerNode("h2:1234", 4096);
+      MockNM nm3 = rm.registerNode("h3:1234", 4096);
+
+      // Submit application to queue c where the default partition capacity
+      // is zero
+      RMApp app1 = rm.submitApp(GB, "app", "user", null, "c", false);
+      rm.drainEvents();
+      rm.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+      assertEquals(RMAppState.ACCEPTED, app1.getState());
+      rm.killApp(app1.getApplicationId());
+
+      RMApp app2 = rm.submitApp(GB, "app", "user", null, "a1", false);
+      rm.drainEvents();
+      rm.waitForState(app2.getApplicationId(), RMAppState.ACCEPTED);
+      assertEquals(RMAppState.ACCEPTED, app2.getState());
+
+      // Queue a1 is configured with maximum-applications = 1, so the second
+      // application submitted to it is rejected by the queue-level limit.
+      RMApp app3 = rm.submitApp(GB, "app", "user", null, "a1", false);
+      rm.drainEvents();
+      rm.waitForState(app3.getApplicationId(), RMAppState.FAILED);
+      assertEquals(RMAppState.FAILED, app3.getState());
+      assertEquals(
+          "org.apache.hadoop.security.AccessControlException: "
+              + "Queue root.a.a1 already has 1 applications, cannot accept "
+              + "submission of application: " + app3.getApplicationId(),
+          app3.getDiagnostics().toString());
+
+      // Queue d has no explicit limit, so the global per-queue limit applies;
+      // per the asserted diagnostics the same user is rejected once two of
+      // its applications are already in the queue.
+      RMApp app11 = rm.submitApp(GB, "app", "user", null, "d", false);
+      rm.drainEvents();
+      rm.waitForState(app11.getApplicationId(), RMAppState.ACCEPTED);
+      assertEquals(RMAppState.ACCEPTED, app11.getState());
+      RMApp app12 = rm.submitApp(GB, "app", "user", null, "d", false);
+      rm.drainEvents();
+      rm.waitForState(app12.getApplicationId(), RMAppState.ACCEPTED);
+      assertEquals(RMAppState.ACCEPTED, app12.getState());
+      RMApp app13 = rm.submitApp(GB, "app", "user", null, "d", false);
+      rm.drainEvents();
+      rm.waitForState(app13.getApplicationId(), RMAppState.FAILED);
+      assertEquals(RMAppState.FAILED, app13.getState());
+      assertEquals(
+          "org.apache.hadoop.security.AccessControlException: Queue"
+              + " root.d already has 2 applications from user user cannot"
+              + " accept submission of application: "
+              + app13.getApplicationId(),
+          app13.getDiagnostics().toString());
+
+      // The system-wide maximum is 4 applications: app2, app11, app12 and
+      // app14 fill it, so app15 is rejected by the system limit.
+      RMApp app14 = rm.submitApp(GB, "app", "user2", null, "a2", false);
+      rm.drainEvents();
+      rm.waitForState(app14.getApplicationId(), RMAppState.ACCEPTED);
+      RMApp app15 = rm.submitApp(GB, "app", "user2", null, "a2", false);
+      rm.drainEvents();
+      rm.waitForState(app15.getApplicationId(), RMAppState.FAILED);
+      assertEquals(RMAppState.FAILED, app15.getState());
+      assertEquals(
+          "Maximum system application limit reached,cannot"
+              + " accept submission of application: "
+              + app15.getApplicationId(),
+          app15.getDiagnostics().toString());
+
+      // Kill the applications that were accepted and are still live. app13
+      // was rejected above, so app12 is the one that needs cleaning up.
+      rm.killApp(app2.getApplicationId());
+      rm.killApp(app11.getApplicationId());
+      rm.killApp(app12.getApplicationId());
+      rm.killApp(app14.getApplicationId());
+    } finally {
+      rm.stop();
+    }
   }
 }