Browse Source

YARN-9623. Auto adjust max queue length of app activities to make sure activities on all nodes can be covered. Contributed by Tao Yang.

Weiwei Yang 5 years ago
parent
commit
cbae241320

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -4038,7 +4038,7 @@ public class YarnConfiguration extends Configuration {
       RM_ACTIVITIES_MANAGER_APP_ACTIVITIES_MAX_QUEUE_LENGTH =
       RM_ACTIVITIES_MANAGER_APP_ACTIVITIES_PREFIX + "max-queue-length";
   public static final int
-      DEFAULT_RM_ACTIVITIES_MANAGER_APP_ACTIVITIES_MAX_QUEUE_LENGTH = 1000;
+      DEFAULT_RM_ACTIVITIES_MANAGER_APP_ACTIVITIES_MAX_QUEUE_LENGTH = 100;
 
   public YarnConfiguration() {
     super();

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

@@ -4209,6 +4209,6 @@
   <property>
     <description>Max queue length for app activities.</description>
     <name>yarn.resourcemanager.activities-manager.app-activities.max-queue-length</name>
-    <value>1000</value>
+    <value>100</value>
   </property>
 </configuration>

+ 50 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java

@@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts;
@@ -79,7 +80,8 @@ public class ActivitiesManager extends AbstractService {
   private long activitiesCleanupIntervalMs;
   private long schedulerActivitiesTTL;
   private long appActivitiesTTL;
-  private int appActivitiesMaxQueueLength;
+  private volatile int appActivitiesMaxQueueLength;
+  private int configuredAppActivitiesMaxQueueLength;
   private final RMContext rmContext;
   private volatile boolean stopped;
   private ThreadLocal<DiagnosticsCollectorManager> diagnosticCollectorManager;
@@ -114,10 +116,11 @@ public class ActivitiesManager extends AbstractService {
         YarnConfiguration.RM_ACTIVITIES_MANAGER_APP_ACTIVITIES_TTL_MS,
         YarnConfiguration.
             DEFAULT_RM_ACTIVITIES_MANAGER_APP_ACTIVITIES_TTL_MS);
-    appActivitiesMaxQueueLength = conf.getInt(YarnConfiguration.
+    configuredAppActivitiesMaxQueueLength = conf.getInt(YarnConfiguration.
             RM_ACTIVITIES_MANAGER_APP_ACTIVITIES_MAX_QUEUE_LENGTH,
         YarnConfiguration.
             DEFAULT_RM_ACTIVITIES_MANAGER_APP_ACTIVITIES_MAX_QUEUE_LENGTH);
+    appActivitiesMaxQueueLength = configuredAppActivitiesMaxQueueLength;
   }
 
   public AppActivitiesInfo getAppActivitiesInfo(ApplicationId applicationId,
@@ -228,6 +231,44 @@ public class ActivitiesManager extends AbstractService {
     recordingAppActivitiesUntilSpecifiedTime.put(applicationId, endTS);
   }
 
+  private void dynamicallyUpdateAppActivitiesMaxQueueLengthIfNeeded() {
+    if (rmContext.getRMNodes() == null) {
+      return;
+    }
+    if (rmContext.getScheduler() instanceof CapacityScheduler) {
+      CapacityScheduler cs = (CapacityScheduler) rmContext.getScheduler();
+      if (!cs.isMultiNodePlacementEnabled()) {
+        int numNodes = rmContext.getRMNodes().size();
+        int newAppActivitiesMaxQueueLength;
+        int numAsyncSchedulerThreads = cs.getNumAsyncSchedulerThreads();
+        if (numAsyncSchedulerThreads > 0) {
+          newAppActivitiesMaxQueueLength =
+              Math.max(configuredAppActivitiesMaxQueueLength,
+                  numNodes * numAsyncSchedulerThreads);
+        } else {
+          newAppActivitiesMaxQueueLength =
+              Math.max(configuredAppActivitiesMaxQueueLength,
+                  (int) (numNodes * 1.2));
+        }
+        if (appActivitiesMaxQueueLength != newAppActivitiesMaxQueueLength) {
+          LOG.info("Update max queue length of app activities from {} to {},"
+                  + " configured={}, numNodes={}, numAsyncSchedulerThreads={}"
+                  + " when multi-node placement disabled.",
+              appActivitiesMaxQueueLength, newAppActivitiesMaxQueueLength,
+              configuredAppActivitiesMaxQueueLength, numNodes,
+              numAsyncSchedulerThreads);
+          appActivitiesMaxQueueLength = newAppActivitiesMaxQueueLength;
+        }
+      } else if (appActivitiesMaxQueueLength
+          != configuredAppActivitiesMaxQueueLength) {
+        LOG.info("Update max queue length of app activities from {} to {}"
+                + " when multi-node placement enabled.",
+            appActivitiesMaxQueueLength, configuredAppActivitiesMaxQueueLength);
+        appActivitiesMaxQueueLength = configuredAppActivitiesMaxQueueLength;
+      }
+    }
+  }
+
   @Override
   protected void serviceStart() throws Exception {
     cleanUpThread = new Thread(new Runnable() {
@@ -277,6 +318,8 @@ public class ActivitiesManager extends AbstractService {
 
           LOG.debug("Remaining apps in app activities cache: {}",
               completedAppAllocations.keySet());
+          // dynamically update max queue length of app activities if needed
+          dynamicallyUpdateAppActivitiesMaxQueueLengthIfNeeded();
           try {
             Thread.sleep(activitiesCleanupIntervalMs);
           } catch (InterruptedException e) {
@@ -567,4 +610,9 @@ public class ActivitiesManager extends AbstractService {
     }
     return sb.toString();
   }
+
+  @VisibleForTesting
+  public int getAppActivitiesMaxQueueLength() {
+    return appActivitiesMaxQueueLength;
+  }
 }

+ 8 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

@@ -3185,4 +3185,12 @@ public class CapacityScheduler extends
   public void resetSchedulerMetrics() {
     CapacitySchedulerMetrics.destroy();
   }
+
+  public boolean isMultiNodePlacementEnabled() {
+    return multiNodePlacementEnabled;
+  }
+
+  public int getNumAsyncSchedulerThreads() {
+    return asyncSchedulerThreads == null ? 0 : asyncSchedulerThreads.size();
+  }
 }

+ 63 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/TestActivitiesManager.java

@@ -25,13 +25,16 @@ import java.util.Queue;
 import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.function.Supplier;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@@ -40,10 +43,12 @@ import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@@ -393,6 +398,64 @@ public class TestActivitiesManager {
         testingTimes);
   }
 
+  @Test (timeout = 10000)
+  public void testAppActivitiesMaxQueueLengthUpdate()
+      throws TimeoutException, InterruptedException {
+    Configuration conf = new Configuration();
+    int configuredAppActivitiesMaxQueueLength = 1;
+    conf.setInt(YarnConfiguration.
+            RM_ACTIVITIES_MANAGER_APP_ACTIVITIES_MAX_QUEUE_LENGTH,
+        configuredAppActivitiesMaxQueueLength);
+    conf.setInt(YarnConfiguration.RM_ACTIVITIES_MANAGER_CLEANUP_INTERVAL_MS,
+        500);
+    ConcurrentMap<NodeId, RMNode> mockNodes = new ConcurrentHashMap<>();
+    int numNodes = 5;
+    for (int i = 0; i < numNodes; i++) {
+      mockNodes.put(NodeId.newInstance("node" + i, 0), mock(RMNode.class));
+    }
+    CapacityScheduler cs = Mockito.mock(CapacityScheduler.class);
+    RMContext mockRMContext = Mockito.mock(RMContext.class);
+    Mockito.when(mockRMContext.getRMNodes()).thenReturn(mockNodes);
+    Mockito.when(mockRMContext.getYarnConfiguration()).thenReturn(conf);
+    Mockito.when(mockRMContext.getScheduler()).thenReturn(cs);
+    /*
+     * Test for async-scheduling with multi-node placement disabled
+     */
+    Mockito.when(cs.isMultiNodePlacementEnabled()).thenReturn(false);
+    int numAsyncSchedulerThreads = 3;
+    Mockito.when(cs.getNumAsyncSchedulerThreads())
+        .thenReturn(numAsyncSchedulerThreads);
+    ActivitiesManager newActivitiesManager =
+        new ActivitiesManager(mockRMContext);
+    Assert.assertEquals(1,
+        newActivitiesManager.getAppActivitiesMaxQueueLength());
+    newActivitiesManager.init(conf);
+    newActivitiesManager.start();
+    GenericTestUtils.waitFor(
+        () -> newActivitiesManager.getAppActivitiesMaxQueueLength()
+            == numNodes * numAsyncSchedulerThreads, 100, 3000);
+    Assert.assertEquals(15,
+        newActivitiesManager.getAppActivitiesMaxQueueLength());
+    /*
+     * Test for HB-driven scheduling with multi-node placement disabled
+     */
+    Mockito.when(cs.getNumAsyncSchedulerThreads()).thenReturn(0);
+    GenericTestUtils.waitFor(
+        () -> newActivitiesManager.getAppActivitiesMaxQueueLength()
+            == numNodes * 1.2, 100, 3000);
+    Assert.assertEquals(6,
+        newActivitiesManager.getAppActivitiesMaxQueueLength());
+    /*
+     * Test for scheduling with multi-node placement enabled
+     */
+    Mockito.when(cs.isMultiNodePlacementEnabled()).thenReturn(true);
+    GenericTestUtils.waitFor(
+        () -> newActivitiesManager.getAppActivitiesMaxQueueLength()
+            == configuredAppActivitiesMaxQueueLength, 100, 3000);
+    Assert.assertEquals(1,
+        newActivitiesManager.getAppActivitiesMaxQueueLength());
+  }
+
   private void testManyTimes(String testingName,
       Supplier<Void> supplier, int testingTimes) {
     long totalTime = 0;