소스 검색

YARN-4565. Fix a bug that leads to AM resource limit not hornored when sizeBasedWeight enabled for FairOrderingPolicy. Contributed by Wangda Tan

Jian He 9 년 전
부모
커밋
bb2a9a78c0

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -1225,6 +1225,9 @@ Release 2.8.0 - UNRELEASED
     YARN-4502. Fix two AM containers get allocated when AM restart.
     (Vinod Kumar Vavilapalli via wangda)
 
+    YARN-4565. Fix a bug that leads to AM resource limit not hornored when
+    sizeBasedWeight enabled for FairOrderingPolicy. (wtan via jianhe)
+
 Release 2.7.3 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java

@@ -863,7 +863,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
   }
   
   @Override
-  public synchronized ResourceUsage getSchedulingResourceUsage() {
+  public ResourceUsage getSchedulingResourceUsage() {
     return attemptResourceUsage;
   }
   

+ 2 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

@@ -978,7 +978,8 @@ public class CapacityScheduler extends
                    clusterResource, getMinimumResourceCapability());
     }
 
-    if (updateDemandForQueue != null) {
+    if (updateDemandForQueue != null && !application
+        .isWaitingForAMContainer()) {
       updateDemandForQueue.getOrderingPolicy().demandUpdated(application);
     }
 

+ 13 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java

@@ -30,6 +30,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.StringTokenizer;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -959,4 +960,16 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
         DEFAULT_CONFIGURATION_APPLICATION_PRIORITY);
     return defaultPriority;
   }
+
+  @VisibleForTesting
+  public void setOrderingPolicy(String queue, String policy) {
+    set(getQueuePrefix(queue) + ORDERING_POLICY, policy);
+  }
+
+  @VisibleForTesting
+  public void setOrderingPolicyParameter(String queue,
+      String parameterKey, String parameterValue) {
+    set(getQueuePrefix(queue) + ORDERING_POLICY + "."
+        + parameterKey, parameterValue);
+  }
 }

+ 54 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java

@@ -20,6 +20,18 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
 
 import java.util.*;
 
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeLabel;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -138,6 +150,48 @@ public class TestFairOrderingPolicy {
     checkIds(schedOrder.getPreemptionIterator(), new String[]{"2", "1", "3"});
   }
 
+  @Test
+  public void testSizeBasedWeightNotAffectAppActivation() throws Exception {
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+
+    // Define top-level queues
+    String queuePath = CapacitySchedulerConfiguration.ROOT + ".default";
+    csConf.setOrderingPolicy(queuePath, CapacitySchedulerConfiguration.FAIR_ORDERING_POLICY);
+    csConf.setOrderingPolicyParameter(queuePath,
+        FairOrderingPolicy.ENABLE_SIZE_BASED_WEIGHT, "true");
+    csConf.setMaximumApplicationMasterResourcePerQueuePercent(queuePath, 0.1f);
+
+    // inject node label manager
+    MockRM rm = new MockRM(csConf);
+    rm.start();
+
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+
+    // Get LeafQueue
+    LeafQueue lq = (LeafQueue) cs.getQueue("default");
+    OrderingPolicy<FiCaSchedulerApp> policy = lq.getOrderingPolicy();
+    Assert.assertTrue(policy instanceof FairOrderingPolicy);
+    Assert.assertTrue(((FairOrderingPolicy<FiCaSchedulerApp>)policy).getSizeBasedWeight());
+
+    rm.registerNode("h1:1234", 10 * GB);
+
+    // Submit 4 apps
+    rm.submitApp(1 * GB, "app", "user", null, "default");
+    rm.submitApp(1 * GB, "app", "user", null, "default");
+    rm.submitApp(1 * GB, "app", "user", null, "default");
+    rm.submitApp(1 * GB, "app", "user", null, "default");
+
+    Assert.assertEquals(1, lq.getNumActiveApplications());
+    Assert.assertEquals(3, lq.getNumPendingApplications());
+
+    // Try allocate once, #active-apps and #pending-apps should be still correct
+    cs.handle(new NodeUpdateSchedulerEvent(
+        rm.getRMContext().getRMNodes().get(NodeId.newInstance("h1", 1234))));
+    Assert.assertEquals(1, lq.getNumActiveApplications());
+    Assert.assertEquals(3, lq.getNumPendingApplications());
+  }
+
   public void checkIds(Iterator<MockSchedulableEntity> si,
       String[] ids) {
     for (int i = 0;i < ids.length;i++) {