Browse Source

YARN-9539. Improve cleanup process of app activities and make some conditions configurable. Contributed by Tao Yang.

Weiwei Yang 6 years ago
parent
commit
1a47c2b7ae

+ 35 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -4005,6 +4005,41 @@ public class YarnConfiguration extends Configuration {
   public static final String DEFAULT_NM_NUMA_AWARENESS_NUMACTL_CMD =
       "/usr/bin/numactl";
 
+  /**
+   * Prefix shared by every activities manager setting.
+   */
+  public static final String RM_ACTIVITIES_MANAGER_PREFIX = RM_PREFIX
+      + "activities-manager.";
+  /** Prefix of the settings that apply to scheduler activities only. */
+  public static final String
+      RM_ACTIVITIES_MANAGER_SCHEDULER_ACTIVITIES_PREFIX =
+      RM_ACTIVITIES_MANAGER_PREFIX + "scheduler-activities.";
+  /** Prefix of the settings that apply to app activities only. */
+  public static final String
+      RM_ACTIVITIES_MANAGER_APP_ACTIVITIES_PREFIX =
+      RM_ACTIVITIES_MANAGER_PREFIX + "app-activities.";
+
+  /** Key of the interval, in milliseconds, between activities cleanups. */
+  public static final String
+      RM_ACTIVITIES_MANAGER_CLEANUP_INTERVAL_MS =
+      RM_ACTIVITIES_MANAGER_PREFIX + "cleanup-interval-ms";
+  /** Default cleanup interval: 5 seconds. */
+  public static final long
+      DEFAULT_RM_ACTIVITIES_MANAGER_CLEANUP_INTERVAL_MS = 5 * 1000L;
+
+  /** Key of the time to live, in milliseconds, of scheduler activities. */
+  public static final String
+      RM_ACTIVITIES_MANAGER_SCHEDULER_ACTIVITIES_TTL_MS =
+      RM_ACTIVITIES_MANAGER_SCHEDULER_ACTIVITIES_PREFIX + "ttl-ms";
+  /** Default time to live of scheduler activities: 10 minutes. */
+  public static final long
+      DEFAULT_RM_ACTIVITIES_MANAGER_SCHEDULER_ACTIVITIES_TTL_MS =
+      10 * 60 * 1000L;
+
+  /** Key of the time to live, in milliseconds, of app activities. */
+  public static final String
+      RM_ACTIVITIES_MANAGER_APP_ACTIVITIES_TTL_MS =
+      RM_ACTIVITIES_MANAGER_APP_ACTIVITIES_PREFIX + "ttl-ms";
+  /** Default time to live of app activities: 10 minutes. */
+  public static final long
+      DEFAULT_RM_ACTIVITIES_MANAGER_APP_ACTIVITIES_TTL_MS =
+      10 * 60 * 1000L;
+
+  /** Key of the max queue length for app activities. */
+  public static final String
+      RM_ACTIVITIES_MANAGER_APP_ACTIVITIES_MAX_QUEUE_LENGTH =
+      RM_ACTIVITIES_MANAGER_APP_ACTIVITIES_PREFIX + "max-queue-length";
+  /** Default max queue length for app activities. */
+  public static final int
+      DEFAULT_RM_ACTIVITIES_MANAGER_APP_ACTIVITIES_MAX_QUEUE_LENGTH = 1000;
+
   public YarnConfiguration() {
     super();
   }

+ 24 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

@@ -4187,4 +4187,28 @@
     <name>yarn.nodemanager.csi-driver.names</name>
     <value></value>
   </property>
+
+  <property>
+    <description>The cleanup interval for activities in milliseconds.</description>
+    <name>yarn.resourcemanager.activities-manager.cleanup-interval-ms</name>
+    <value>5000</value>
+  </property>
+
+  <property>
+    <description>Time to live for scheduler activities in milliseconds.</description>
+    <name>yarn.resourcemanager.activities-manager.scheduler-activities.ttl-ms</name>
+    <value>600000</value>
+  </property>
+
+  <property>
+    <description>Time to live for app activities in milliseconds.</description>
+    <name>yarn.resourcemanager.activities-manager.app-activities.ttl-ms</name>
+    <value>600000</value>
+  </property>
+
+  <property>
+    <description>Max queue length for app activities.</description>
+    <name>yarn.resourcemanager.activities-manager.app-activities.max-queue-length</name>
+    <value>1000</value>
+  </property>
 </configuration>

+ 52 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java

@@ -19,9 +19,11 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.service.AbstractService;
@@ -72,7 +74,10 @@ public class ActivitiesManager extends AbstractService {
   private boolean recordNextAvailableNode = false;
   private List<NodeAllocation> lastAvailableNodeActivities = null;
   private Thread cleanUpThread;
-  private int timeThreshold = 600 * 1000;
+  // Cleanup settings start from the YarnConfiguration defaults so that they
+  // stay usable when the constructor finds no configuration to read them
+  // from (primitive fields would otherwise be left at 0).
+  private long activitiesCleanupIntervalMs =
+      YarnConfiguration.DEFAULT_RM_ACTIVITIES_MANAGER_CLEANUP_INTERVAL_MS;
+  private long schedulerActivitiesTTL = YarnConfiguration.
+      DEFAULT_RM_ACTIVITIES_MANAGER_SCHEDULER_ACTIVITIES_TTL_MS;
+  private long appActivitiesTTL =
+      YarnConfiguration.DEFAULT_RM_ACTIVITIES_MANAGER_APP_ACTIVITIES_TTL_MS;
+  private int appActivitiesMaxQueueLength = YarnConfiguration.
+      DEFAULT_RM_ACTIVITIES_MANAGER_APP_ACTIVITIES_MAX_QUEUE_LENGTH;
   private final RMContext rmContext;
   private volatile boolean stopped;
   private ThreadLocal<DiagnosticsCollectorManager> diagnosticCollectorManager;
@@ -89,6 +94,28 @@ public class ActivitiesManager extends AbstractService {
         () -> new DiagnosticsCollectorManager(
             new GenericDiagnosticsCollector()));
     this.rmContext = rmContext;
+    if (rmContext.getYarnConfiguration() != null) {
+      setupConfForCleanup(rmContext.getYarnConfiguration());
+    }
+  }
+
+  /**
+   * Reads the cleanup-related settings of the activities manager from the
+   * given configuration, falling back to the defaults declared in
+   * {@link YarnConfiguration} for keys that are not set.
+   * <p>
+   * The cleanup interval and the max queue length must be positive: the
+   * interval is passed to {@code Thread.sleep}, which throws
+   * {@code IllegalArgumentException} for negative values, while zero would
+   * make the cleanup loop run without pausing; the app activities queue is
+   * only trimmed when its size equals the configured length. Non-positive
+   * values are therefore replaced by the defaults and a warning is logged.
+   *
+   * @param conf configuration to read the settings from
+   */
+  private void setupConfForCleanup(Configuration conf) {
+    long cleanupIntervalMs = conf.getLong(
+        YarnConfiguration.RM_ACTIVITIES_MANAGER_CLEANUP_INTERVAL_MS,
+        YarnConfiguration.
+            DEFAULT_RM_ACTIVITIES_MANAGER_CLEANUP_INTERVAL_MS);
+    if (cleanupIntervalMs <= 0) {
+      LOG.warn("Invalid value {} for {}, using default value {} instead.",
+          cleanupIntervalMs,
+          YarnConfiguration.RM_ACTIVITIES_MANAGER_CLEANUP_INTERVAL_MS,
+          YarnConfiguration.
+              DEFAULT_RM_ACTIVITIES_MANAGER_CLEANUP_INTERVAL_MS);
+      cleanupIntervalMs = YarnConfiguration.
+          DEFAULT_RM_ACTIVITIES_MANAGER_CLEANUP_INTERVAL_MS;
+    }
+    activitiesCleanupIntervalMs = cleanupIntervalMs;
+
+    schedulerActivitiesTTL = conf.getLong(
+        YarnConfiguration.RM_ACTIVITIES_MANAGER_SCHEDULER_ACTIVITIES_TTL_MS,
+        YarnConfiguration.
+            DEFAULT_RM_ACTIVITIES_MANAGER_SCHEDULER_ACTIVITIES_TTL_MS);
+    appActivitiesTTL = conf.getLong(
+        YarnConfiguration.RM_ACTIVITIES_MANAGER_APP_ACTIVITIES_TTL_MS,
+        YarnConfiguration.
+            DEFAULT_RM_ACTIVITIES_MANAGER_APP_ACTIVITIES_TTL_MS);
+
+    int maxQueueLength = conf.getInt(YarnConfiguration.
+            RM_ACTIVITIES_MANAGER_APP_ACTIVITIES_MAX_QUEUE_LENGTH,
+        YarnConfiguration.
+            DEFAULT_RM_ACTIVITIES_MANAGER_APP_ACTIVITIES_MAX_QUEUE_LENGTH);
+    if (maxQueueLength <= 0) {
+      LOG.warn("Invalid value {} for {}, using default value {} instead.",
+          maxQueueLength,
+          YarnConfiguration.
+              RM_ACTIVITIES_MANAGER_APP_ACTIVITIES_MAX_QUEUE_LENGTH,
+          YarnConfiguration.
+              DEFAULT_RM_ACTIVITIES_MANAGER_APP_ACTIVITIES_MAX_QUEUE_LENGTH);
+      maxQueueLength = YarnConfiguration.
+          DEFAULT_RM_ACTIVITIES_MANAGER_APP_ACTIVITIES_MAX_QUEUE_LENGTH;
+    }
+    appActivitiesMaxQueueLength = maxQueueLength;
   }
 
   public AppActivitiesInfo getAppActivitiesInfo(ApplicationId applicationId,
@@ -152,12 +179,13 @@ public class ActivitiesManager extends AbstractService {
         while (!stopped && !Thread.currentThread().isInterrupted()) {
           Iterator<Map.Entry<NodeId, List<NodeAllocation>>> ite =
               completedNodeAllocations.entrySet().iterator();
+          long curTS = SystemClock.getInstance().getTime();
           while (ite.hasNext()) {
             Map.Entry<NodeId, List<NodeAllocation>> nodeAllocation = ite.next();
             List<NodeAllocation> allocations = nodeAllocation.getValue();
-            long currTS = SystemClock.getInstance().getTime();
-            if (allocations.size() > 0 && allocations.get(0).getTimeStamp()
-                - currTS > timeThreshold) {
+            if (allocations.size() > 0
+                && curTS - allocations.get(0).getTimeStamp()
+                > schedulerActivitiesTTL) {
               ite.remove();
             }
           }
@@ -171,11 +199,29 @@ public class ActivitiesManager extends AbstractService {
             if (rmApp == null || rmApp.getFinalApplicationStatus()
                 != FinalApplicationStatus.UNDEFINED) {
               iteApp.remove();
+            } else {
+              Iterator<AppAllocation> appActivitiesIt =
+                  appAllocation.getValue().iterator();
+              while (appActivitiesIt.hasNext()) {
+                if (curTS - appActivitiesIt.next().getTime()
+                    > appActivitiesTTL) {
+                  appActivitiesIt.remove();
+                } else {
+                  break;
+                }
+              }
+              if (appAllocation.getValue().isEmpty()) {
+                iteApp.remove();
+                LOG.debug("Removed all expired activities from cache for {}.",
+                    rmApp.getApplicationId());
+              }
             }
           }
 
+          LOG.debug("Remaining apps in app activities cache: {}",
+              completedAppAllocations.keySet());
           try {
-            Thread.sleep(5000);
+            Thread.sleep(activitiesCleanupIntervalMs);
           } catch (InterruptedException e) {
             LOG.info(getName() + " thread interrupted");
             break;
@@ -290,7 +336,7 @@ public class ActivitiesManager extends AbstractService {
           appAllocations = curAppAllocations;
         }
       }
-      if (appAllocations.size() == 1000) {
+      if (appAllocations.size() == appActivitiesMaxQueueLength) {
         appAllocations.poll();
       }
       appAllocations.add(appAllocation);

+ 6 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppActivitiesInfo.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -77,4 +78,9 @@ public class AppActivitiesInfo {
       }
     }
   }
+
+  /**
+   * Exposes the allocation list held by this object so that tests can
+   * inspect it.
+   *
+   * @return the {@code allocations} list itself, not a copy
+   */
+  @VisibleForTesting
+  public List<AppAllocationInfo> getAllocations() {
+    return this.allocations;
+  }
 }

+ 53 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/TestActivitiesManager.java

@@ -30,10 +30,13 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
@@ -43,6 +46,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.junit.Assert;
@@ -81,6 +85,8 @@ public class TestActivitiesManager {
   @Before
   public void setup() {
     rmContext = Mockito.mock(RMContext.class);
+    Configuration conf = new Configuration();
+    Mockito.when(rmContext.getYarnConfiguration()).thenReturn(conf);
     ResourceScheduler scheduler = Mockito.mock(ResourceScheduler.class);
     Mockito.when(scheduler.getMinimumResourceCapability())
         .thenReturn(Resources.none());
@@ -95,6 +101,8 @@ public class TestActivitiesManager {
       RMApp mockApp = Mockito.mock(RMApp.class);
       Mockito.doReturn(appAttemptId.getApplicationId()).when(mockApp)
           .getApplicationId();
+      Mockito.doReturn(FinalApplicationStatus.UNDEFINED).when(mockApp)
+          .getFinalApplicationStatus();
       rmApps.put(appAttemptId.getApplicationId(), mockApp);
       FiCaSchedulerApp app =
           new FiCaSchedulerApp(appAttemptId, "user", mockQueue,
@@ -245,6 +253,51 @@ public class TestActivitiesManager {
     }
   }
 
+  /**
+   * Verifies that recorded app activities are visible right after recording
+   * and are dropped by the cleanup thread once the configured TTL elapsed.
+   */
+  @Test (timeout = 30000)
+  public void testAppActivitiesTTL() throws Exception {
+    long cleanupIntervalMs = 100;
+    long appActivitiesTTL = 1000;
+    rmContext.getYarnConfiguration()
+        .setLong(YarnConfiguration.RM_ACTIVITIES_MANAGER_CLEANUP_INTERVAL_MS,
+            cleanupIntervalMs);
+    rmContext.getYarnConfiguration()
+        .setLong(YarnConfiguration.RM_ACTIVITIES_MANAGER_APP_ACTIVITIES_TTL_MS,
+            appActivitiesTTL);
+    ActivitiesManager newActivitiesManager = new ActivitiesManager(rmContext);
+    newActivitiesManager.serviceStart();
+    try {
+      // start recording activities for first app and first node
+      SchedulerApplicationAttempt app = apps.get(0);
+      FiCaSchedulerNode node = (FiCaSchedulerNode) nodes.get(0);
+      newActivitiesManager
+          .turnOnAppActivitiesRecording(app.getApplicationId(), 3);
+      int numActivities = 10;
+      for (int i = 0; i < numActivities; i++) {
+        ActivitiesLogger.APP
+            .startAppAllocationRecording(newActivitiesManager, node,
+                SystemClock.getInstance().getTime(), app);
+        ActivitiesLogger.APP
+            .recordAppActivityWithoutAllocation(newActivitiesManager, node,
+                app,
+                new SchedulerRequestKey(Priority.newInstance(0), 0, null),
+                ActivityDiagnosticConstant.FAIL_TO_ALLOCATE,
+                ActivityState.REJECTED);
+        ActivitiesLogger.APP
+            .finishAllocatedAppAllocationRecording(newActivitiesManager,
+                app.getApplicationId(), null, ActivityState.SKIPPED,
+                ActivityDiagnosticConstant.SKIPPED_ALL_PRIORITIES);
+      }
+      AppActivitiesInfo appActivitiesInfo = newActivitiesManager
+          .getAppActivitiesInfo(app.getApplicationId(), null, null);
+      Assert.assertEquals(numActivities,
+          appActivitiesInfo.getAllocations().size());
+      // Poll until all app activities expired instead of sleeping for a
+      // fixed time: the cleanup runs periodically on its own thread, so a
+      // single sleep of (interval + TTL) can end just before the cleanup
+      // pass that removes the last expired activities.
+      long deadline = SystemClock.getInstance().getTime()
+          + 10 * (cleanupIntervalMs + appActivitiesTTL);
+      do {
+        Thread.sleep(cleanupIntervalMs);
+        appActivitiesInfo = newActivitiesManager
+            .getAppActivitiesInfo(app.getApplicationId(), null, null);
+      } while (!appActivitiesInfo.getAllocations().isEmpty()
+          && SystemClock.getInstance().getTime() < deadline);
+      // there should be no remaining app activities
+      Assert.assertEquals(0,
+          appActivitiesInfo.getAllocations().size());
+    } finally {
+      // stop the service so that its cleanup thread is not left running
+      // after this test
+      newActivitiesManager.stop();
+    }
+  }
+
   /**
    * Testing activities manager which can record all history information about
    * node allocations.