
YARN-6818. User limit per partition is not honored in branch-2.7. Contributed by Jonathan Hung.
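
In branch-2.7, LeafQueue's user-limit check compared the limit against the user's usage on the default partition (user.getUsed()) instead of the partition actually being scheduled (user.getUsed(label)). Since a user can have zero usage on the default partition while holding containers on a labeled partition, the check could pass even after the user had exceeded their limit there. The one-line change below, plus a regression test, closes that gap.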

Jonathan Hung
commit f41d7b18dd

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -127,6 +127,9 @@ Release 2.7.4 - UNRELEASED
     YARN-3260. AM attempt fail to register before RM processes launch event
     (Bibin A Chundatt via jlowe)
 
+    YARN-6818. User limit per partition is not honored in branch-2.7.
+    (Jonathan Hung via shv)
+
 Release 2.7.3 - 2016-08-25
 
   INCOMPATIBLE CHANGES

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java

@@ -1138,7 +1138,7 @@ public class LeafQueue extends AbstractCSQueue {
         if (Resources.lessThanOrEqual(
             resourceCalculator,
             clusterResource,
-            Resources.subtract(user.getUsed(), application.getCurrentReservation()),
+            Resources.subtract(user.getUsed(label), application.getCurrentReservation()),
             limit)) {
 
           if (LOG.isDebugEnabled()) {
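
The fix hinges on User#getUsed() and User#getUsed(label) returning different figures. As a minimal, hypothetical illustration of the bookkeeping (not the actual Hadoop User/ResourceUsage classes, which track full Resource objects per partition), usage keyed by partition looks roughly like this:

import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of per-partition usage tracking. The real classes in
// org.apache.hadoop.yarn.server.resourcemanager.scheduler track Resource
// objects (memory and vcores), not bare memory counters.
class PartitionUsage {

  // Default (unlabeled) partition key, mirroring
  // CommonNodeLabelsManager.NO_LABEL.
  static final String NO_LABEL = "";

  private final Map<String, Long> usedMemByPartition =
      new HashMap<String, Long>();

  void incUsed(String partition, long memMB) {
    Long cur = usedMemByPartition.get(partition);
    usedMemByPartition.put(partition, (cur == null ? 0L : cur) + memMB);
  }

  // The no-arg getter reads only the default partition's entry...
  long getUsed() {
    return getUsed(NO_LABEL);
  }

  // ...while the labeled getter reads the partition being scheduled.
  long getUsed(String partition) {
    Long used = usedMemByPartition.get(partition);
    return used == null ? 0L : used;
  }
}

After incUsed("x", 3 * 1024), getUsed() still returns 0 while getUsed("x") returns 3072. That is exactly the discrepancy the old check hit: it subtracted the application's reservation from the default-partition figure, so a user's labeled-partition usage never counted against the limit.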

+ 109 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java

@@ -23,6 +23,8 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
@@ -32,6 +34,7 @@ import static org.mockito.Mockito.when;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -52,6 +55,7 @@ import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
 import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@@ -102,17 +106,22 @@ public class TestReservations {
   }
 
   private void setup(CapacitySchedulerConfiguration csConf) throws Exception {
-    setup(csConf, false);
+    setup(csConf, false, CommonNodeLabelsManager.NO_LABEL);
+  }
+
+  private void setup(CapacitySchedulerConfiguration csConf, String label)
+      throws Exception {
+    setup(csConf, false, label);
   }
 
   private void setup(CapacitySchedulerConfiguration csConf,
-      boolean addUserLimits) throws Exception {
+      boolean addUserLimits, String label) throws Exception {
 
     csConf.setBoolean("yarn.scheduler.capacity.user-metrics.enable", true);
     final String newRoot = "root" + System.currentTimeMillis();
     // final String newRoot = "root";
 
-    setupQueueConfiguration(csConf, newRoot, addUserLimits);
+    setupQueueConfiguration(csConf, newRoot, addUserLimits, label);
     YarnConfiguration conf = new YarnConfiguration();
     cs.setConf(conf);
 
@@ -153,7 +162,7 @@ public class TestReservations {
   private static final String A = "a";
 
   private void setupQueueConfiguration(CapacitySchedulerConfiguration conf,
-      final String newRoot, boolean addUserLimits) {
+      final String newRoot, boolean addUserLimits, String label) {
 
     // Define top-level queues
     conf.setQueues(CapacitySchedulerConfiguration.ROOT,
@@ -161,6 +170,11 @@ public class TestReservations {
     conf.setMaximumCapacity(CapacitySchedulerConfiguration.ROOT, 100);
     conf.setAcl(CapacitySchedulerConfiguration.ROOT,
         QueueACL.SUBMIT_APPLICATIONS, " ");
+    if (!label.equals(CommonNodeLabelsManager.NO_LABEL)) {
+      conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, label, 100);
+      conf.setMaximumCapacityByLabel(CapacitySchedulerConfiguration.ROOT,
+          label, 100);
+    }
 
     final String Q_newRoot = CapacitySchedulerConfiguration.ROOT + "."
         + newRoot;
@@ -168,10 +182,18 @@ public class TestReservations {
     conf.setCapacity(Q_newRoot, 100);
     conf.setMaximumCapacity(Q_newRoot, 100);
     conf.setAcl(Q_newRoot, QueueACL.SUBMIT_APPLICATIONS, " ");
+    if (!label.equals(CommonNodeLabelsManager.NO_LABEL)) {
+      conf.setCapacityByLabel(Q_newRoot, label, 100);
+      conf.setMaximumCapacityByLabel(Q_newRoot, label, 100);
+    }
 
     final String Q_A = Q_newRoot + "." + A;
     conf.setCapacity(Q_A, 100f);
     conf.setMaximumCapacity(Q_A, 100);
+    if (!label.equals(CommonNodeLabelsManager.NO_LABEL)) {
+      conf.setCapacityByLabel(Q_A, label, 100);
+      conf.setMaximumCapacityByLabel(Q_A, label, 100);
+    }
     conf.setAcl(Q_A, QueueACL.SUBMIT_APPLICATIONS, "*");
 
     if (addUserLimits) {
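
For context on the queue setup above: setCapacityByLabel and setMaximumCapacityByLabel write per-label capacity properties for each queue in the test hierarchy. A sketch of what this produces for label x, using CapacitySchedulerConfiguration's accessible-node-labels property convention (root.<newRoot> stands in for the timestamped test root):

yarn.scheduler.capacity.root.accessible-node-labels.x.capacity = 100
yarn.scheduler.capacity.root.accessible-node-labels.x.maximum-capacity = 100
yarn.scheduler.capacity.root.<newRoot>.accessible-node-labels.x.capacity = 100
yarn.scheduler.capacity.root.<newRoot>.a.accessible-node-labels.x.capacity = 100
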
@@ -368,7 +390,7 @@ public class TestReservations {
   @Test
   public void testReservationLimitOtherUsers() throws Exception {
     CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
-    setup(csConf, true);
+    setup(csConf, true, CommonNodeLabelsManager.NO_LABEL);
 
     // Manipulate queue 'a'
     LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A));
@@ -1233,6 +1255,88 @@ public class TestReservations {
     assertEquals(Resources.none(), userResourceLimits.getAmountNeededUnreserve());
   }
 
+  @Test
+  public void testAssignToUserWithPartition() throws Exception {
+
+    String label = "x";
+    Set<String> labelSingleton = Collections.singleton(label);
+    // Setup some nodes
+    String host_0 = "host_0";
+    FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0,
+        8 * GB);
+    String host_1 = "host_1";
+    FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0,
+        8 * GB);
+    node_1.updateLabels(labelSingleton);
+
+    RMNodeLabelsManager mgr = mock(RMNodeLabelsManager.class);
+    when(mgr.getResourceByLabel(eq(label), isA(Resource.class)))
+        .thenReturn(Resources.createResource(8 * GB));
+    when(mgr.getResourceByLabel(eq(CommonNodeLabelsManager.NO_LABEL),
+        isA(Resource.class))).thenReturn(Resources.createResource(8 * GB));
+    when(mgr.getClusterNodeLabels()).thenReturn(labelSingleton);
+    rmContext.setNodeLabelManager(mgr);
+    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
+    setup(csConf, label);
+
+    // Manipulate queue 'a'
+    LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A));
+
+    // Users
+    final String user_0 = "user_0";
+
+    // Submit applications
+    final ApplicationAttemptId appAttemptId_0 = TestUtils
+        .getMockApplicationAttemptId(0, 0);
+    FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
+        mock(ActiveUsersManager.class), spyRMContext);
+    rmContext.getRMApps().put(app_0.getApplicationId(), mock(RMApp.class));
+    a.submitApplicationAttempt(app_0, user_0);
+
+    final ApplicationAttemptId appAttemptId_1 = TestUtils
+        .getMockApplicationAttemptId(1, 0);
+    FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
+        mock(ActiveUsersManager.class), spyRMContext);
+    a.submitApplicationAttempt(app_1, user_0);
+
+    when(csContext.getNode(node_0.getNodeID())).thenReturn(node_0);
+    when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1);
+
+    Resource clusterResource = Resources.createResource(16 * GB);
+    when(csContext.getNumClusterNodes()).thenReturn(2);
+
+    // Setup resource-requests
+    Priority priorityAM = TestUtils.createMockPriority(1);
+    Priority priorityMap = TestUtils.createMockPriority(5);
+
+    ResourceRequest reqAM = TestUtils.createResourceRequest(
+        ResourceRequest.ANY, 3 * GB, 1, true, priorityAM, recordFactory);
+    reqAM.setNodeLabelExpression(label);
+    app_0.updateResourceRequests(Collections.singletonList(reqAM));
+    ResourceRequest reqMap = TestUtils.createResourceRequest(
+        ResourceRequest.ANY, 2 * GB, 1, true, priorityMap, recordFactory);
+    reqMap.setNodeLabelExpression(label);
+    app_0.updateResourceRequests(Collections.singletonList(reqMap));
+
+    // Allocate AM on node_1 (label x)
+    a.assignContainers(clusterResource, node_1,
+        new ResourceLimits(clusterResource));
+    assertEquals(0 * GB, a.getQueueResourceUsage().getUsed().getMemory());
+    assertEquals(3 * GB, a.getQueueResourceUsage().getUsed(label).getMemory());
+    assertEquals(0 * GB, a.getUser(user_0).getUsed().getMemory());
+    assertEquals(3 * GB, a.getUser(user_0).getUsed(label).getMemory());
+    assertEquals(0 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(3 * GB, node_1.getUsedResource().getMemory());
+
+    Resource limit = Resources.createResource(2 * GB, 0);
+    ResourceLimits userResourceLimits = new ResourceLimits(clusterResource);
+    // User limit is 2 GB; the user's used resources on partition x exceed
+    // this, so the allocation should fail
+    boolean res = a.assignToUser(clusterResource, user_0, limit, app_0,
+        labelSingleton, userResourceLimits);
+    assertFalse(res);
+  }
+
   @Test
   public void testReservationsNoneAvailable() throws Exception {
     // Test that we now unreserve and use a node that has space
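
The new testAssignToUserWithPartition pins the behavior down: after the AM allocation, user_0 has 3 GB used on partition x and 0 GB on the default partition (both asserted), and with a 2 GB user limit assignToUser must return false. Against the pre-patch code, which read the 0 GB default-partition usage, the same call would have succeeded.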