瀏覽代碼

YARN-7599. [BackPort][GPG] ApplicationCleaner in Global Policy Generator. (#5934) Contributed by Botong Huang, Shilun Fan.

Co-authored-by: Botong Huang <botong@apache.org>
Co-authored-by: slfan1989 <slfan1989@apache.org>
Reviewed-by: Inigo Goiri <inigoiri@apache.org>
Signed-off-by: Shilun Fan <slfan1989@apache.org>
slfan1989 1 年之前
父節點
當前提交
8538af4638
共有 11 個文件被更改,包括 510 次插入5 次删除
  1. 25 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
  2. 28 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
  3. 30 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
  4. 23 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java
  5. 21 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java
  6. 153 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java
  7. 77 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java
  8. 19 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/package-info.java
  9. 2 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/PolicyGenerator.java
  10. 131 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java
  11. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/TestPolicyGenerator.java

+ 25 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -4432,6 +4432,31 @@ public class YarnConfiguration extends Configuration {
   public static final String GPG_KERBEROS_PRINCIPAL_HOSTNAME_KEY = FEDERATION_GPG_PREFIX +
       "kerberos.principal.hostname";
 
+  // The application cleaner class to use
+  public static final String GPG_APPCLEANER_CLASS =
+      FEDERATION_GPG_PREFIX + "application.cleaner.class";
+  public static final String DEFAULT_GPG_APPCLEANER_CLASS =
+      "org.apache.hadoop.yarn.server.globalpolicygenerator"
+          + ".applicationcleaner.DefaultApplicationCleaner";
+
+  // The interval at which the application cleaner runs, -1 means disabled
+  public static final String GPG_APPCLEANER_INTERVAL_MS =
+      FEDERATION_GPG_PREFIX + "application.cleaner.interval-ms";
+  public static final long DEFAULT_GPG_APPCLEANER_INTERVAL_MS = TimeUnit.SECONDS.toMillis(-1);
+
+  /**
+   * Specifications on how (many times) to contact Router for apps. We need to
+   * do this because Router might return partial application list because some
+   * sub-cluster RM is not responsive (e.g. failing over).
+   *
+   * Should have three values separated by comma: minimal success retries,
+   * maximum total retry, retry interval (ms).
+   */
+  public static final String GPG_APPCLEANER_CONTACT_ROUTER_SPEC =
+      FEDERATION_GPG_PREFIX + "application.cleaner.contact.router.spec";
+  public static final String DEFAULT_GPG_APPCLEANER_CONTACT_ROUTER_SPEC =
+      "3,10,600000";
+
   public static final String FEDERATION_GPG_POLICY_PREFIX =
       FEDERATION_GPG_PREFIX + "policy.generator.";
 

+ 28 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

@@ -5538,6 +5538,14 @@
     <value>LINEAR</value>
   </property>
 
+  <property>
+    <description>
+      The Application Cleaner implementation class for GPG to use.
+    </description>
+    <name>yarn.federation.gpg.application.cleaner.class</name>
+    <value>org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner.DefaultApplicationCleaner</value>
+  </property>
+
   <property>
     <description>Flag to enable cross-origin (CORS) support in the GPG. This flag
       requires the CORS filter initializer to be added to the filter initializers
@@ -5546,6 +5554,14 @@
     <value>false</value>
   </property>
 
+  <property>
+    <description>
+      The interval at which the application cleaner runs, -1 means disabled.
+    </description>
+    <name>yarn.federation.gpg.application.cleaner.interval-ms</name>
+    <value>-1s</value>
+  </property>
+
   <property>
     <description>
       The http address of the GPG web application.
@@ -5556,6 +5572,18 @@
     <value>0.0.0.0:8069</value>
   </property>
 
+  <property>
+    <description>
+      Specifications on how (many times) to contact Router for apps. We need to
+      do this because Router might return partial application list because some
+      sub-cluster RM is not responsive (e.g. failing over).
+      Should have three values separated by comma: minimal success retries,
+      maximum total retry, retry interval (ms).
+    </description>
+    <name>yarn.federation.gpg.application.cleaner.contact.router.spec</name>
+    <value>3,10,600000</value>
+  </property>
+
   <property>
     <description>
       The https address of the GPG web application.

+ 30 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java

@@ -83,6 +83,9 @@ import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenRespo
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.webapp.NotFoundException;
 import org.slf4j.Logger;
@@ -884,6 +887,33 @@ public final class FederationStateStoreFacade {
     }
   }
 
+  /**
+   * Get the {@code ApplicationHomeSubCluster} list representing the mapping of
+   * all submitted applications to it's home sub-cluster.
+   *
+   * @return the mapping of all submitted application to it's home sub-cluster
+   * @throws YarnException if the request is invalid/fails
+   */
+  public List<ApplicationHomeSubCluster> getApplicationsHomeSubCluster() throws YarnException {
+    GetApplicationsHomeSubClusterResponse response = stateStore.getApplicationsHomeSubCluster(
+        GetApplicationsHomeSubClusterRequest.newInstance());
+    return response.getAppsHomeSubClusters();
+  }
+
+  /**
+   * Delete the mapping of home {@code SubClusterId} of a previously submitted
+   * {@code ApplicationId}. Currently response is empty if the operation is
+   * successful, if not an exception reporting reason for a failure.
+   *
+   * @param applicationId the application to delete the home sub-cluster of
+   * @throws YarnException if the request is invalid/fails
+   */
+  public void deleteApplicationHomeSubCluster(ApplicationId applicationId)
+      throws YarnException {
+    stateStore.deleteApplicationHomeSubCluster(
+        DeleteApplicationHomeSubClusterRequest.newInstance(applicationId));
+  }
+
   /**
    * Update ApplicationHomeSubCluster to FederationStateStore.
    *

+ 23 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java

@@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
 import com.sun.jersey.api.client.Client;
 import com.sun.jersey.api.client.ClientResponse;
 import com.sun.jersey.api.client.WebResource;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts;
 
 /**
  * GPGUtils contains utility functions for the GPG.
@@ -58,11 +59,12 @@ public final class GPGUtils {
    * @param webAddr WebAddress.
    * @param path url path.
    * @param returnType return type.
+   * @param selectParam query parameters.
    * @param conf configuration.
    * @return response entity.
    */
   public static <T> T invokeRMWebService(String webAddr, String path, final Class<T> returnType,
-      Configuration conf) {
+      Configuration conf, String selectParam) {
     Client client = Client.create();
     T obj;
 
@@ -72,6 +74,11 @@ public final class GPGUtils {
     String scheme = YarnConfiguration.useHttps(conf) ? HTTPS_PREFIX : HTTP_PREFIX;
     String webAddress = scheme + socketAddress.getHostName() + ":" + socketAddress.getPort();
     WebResource webResource = client.resource(webAddress);
+
+    if (selectParam != null) {
+      webResource = webResource.queryParam(RMWSConsts.DESELECTS, selectParam);
+    }
+
     ClientResponse response = null;
     try {
       response = webResource.path(RM_WEB_SERVICE_PATH).path(path)
@@ -92,6 +99,21 @@ public final class GPGUtils {
     }
   }
 
+  /**
+   * Performs an invocation of the remote RMWebService.
+   *
+   * @param <T> Generic T.
+   * @param webAddr WebAddress.
+   * @param path url path.
+   * @param returnType return type.
+   * @param config configuration.
+   * @return response entity.
+   */
+  public static <T> T invokeRMWebService(String webAddr,
+      String path, final Class<T> returnType, Configuration config) {
+    return invokeRMWebService(webAddr, path, returnType, config, null);
+  }
+
   /**
    * Creates a uniform weighting of 1.0 for each sub cluster.
    *

+ 21 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java

@@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner.ApplicationCleaner;
 import org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator.PolicyGenerator;
 import org.apache.hadoop.yarn.server.globalpolicygenerator.subclustercleaner.SubClusterCleaner;
 import org.apache.hadoop.yarn.server.globalpolicygenerator.webapp.GPGWebApp;
@@ -84,6 +85,7 @@ public class GlobalPolicyGenerator extends CompositeService {
   // Scheduler service that runs tasks periodically
   private ScheduledThreadPoolExecutor scheduledExecutorService;
   private SubClusterCleaner subClusterCleaner;
+  private ApplicationCleaner applicationCleaner;
   private PolicyGenerator policyGenerator;
   private String webAppAddress;
   private JvmPauseMonitor pauseMonitor;
@@ -125,6 +127,12 @@ public class GlobalPolicyGenerator extends CompositeService {
         conf.getInt(YarnConfiguration.GPG_SCHEDULED_EXECUTOR_THREADS,
             YarnConfiguration.DEFAULT_GPG_SCHEDULED_EXECUTOR_THREADS));
     this.subClusterCleaner = new SubClusterCleaner(conf, this.gpgContext);
+
+    this.applicationCleaner = FederationStateStoreFacade.createInstance(conf,
+        YarnConfiguration.GPG_APPCLEANER_CLASS,
+        YarnConfiguration.DEFAULT_GPG_APPCLEANER_CLASS, ApplicationCleaner.class);
+    this.applicationCleaner.init(conf, this.gpgContext);
+
     this.policyGenerator = new PolicyGenerator(conf, this.gpgContext);
 
     this.webAppAddress = WebAppUtils.getGPGWebAppURLWithoutScheme(conf);
@@ -149,7 +157,7 @@ public class GlobalPolicyGenerator extends CompositeService {
 
     super.serviceStart();
 
-    // Scheduler SubClusterCleaner service
+    // Schedule SubClusterCleaner service
     Configuration config = getConfig();
     long scCleanerIntervalMs = config.getTimeDuration(
         YarnConfiguration.GPG_SUBCLUSTER_CLEANER_INTERVAL_MS,
@@ -161,6 +169,18 @@ public class GlobalPolicyGenerator extends CompositeService {
           DurationFormatUtils.formatDurationISO(scCleanerIntervalMs));
     }
 
+    // Schedule ApplicationCleaner service
+    long appCleanerIntervalMs = config.getTimeDuration(
+        YarnConfiguration.GPG_APPCLEANER_INTERVAL_MS,
+        YarnConfiguration.DEFAULT_GPG_APPCLEANER_INTERVAL_MS, TimeUnit.MILLISECONDS);
+
+    if (appCleanerIntervalMs > 0) {
+      this.scheduledExecutorService.scheduleAtFixedRate(this.applicationCleaner,
+          0, appCleanerIntervalMs, TimeUnit.MILLISECONDS);
+      LOG.info("Scheduled application cleaner with interval: {}",
+          DurationFormatUtils.formatDurationISO(appCleanerIntervalMs));
+    }
+
     // Schedule PolicyGenerator
     // We recommend using yarn.federation.gpg.policy.generator.interval
     // instead of yarn.federation.gpg.policy.generator.interval-ms

+ 153 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java

@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.lang3.time.DurationFormatUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.DeSelectFields;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The ApplicationCleaner is a runnable that cleans up old applications from
+ * table applicationsHomeSubCluster in FederationStateStore.
+ */
+public abstract class ApplicationCleaner implements Runnable {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ApplicationCleaner.class);
+
+  private Configuration conf;
+  private GPGContext gpgContext;
+
+  private int minRouterSuccessCount;
+  private int maxRouterRetry;
+  private long routerQueryIntevalMillis;
+
+  public void init(Configuration config, GPGContext context)
+      throws YarnException {
+
+    this.gpgContext = context;
+    this.conf = config;
+
+    String routerSpecString =
+        this.conf.get(YarnConfiguration.GPG_APPCLEANER_CONTACT_ROUTER_SPEC,
+            YarnConfiguration.DEFAULT_GPG_APPCLEANER_CONTACT_ROUTER_SPEC);
+    String[] specs = routerSpecString.split(",");
+    if (specs.length != 3) {
+      throw new YarnException("Expect three comma separated values in "
+          + YarnConfiguration.GPG_APPCLEANER_CONTACT_ROUTER_SPEC + " but get "
+          + routerSpecString);
+    }
+    this.minRouterSuccessCount = Integer.parseInt(specs[0]);
+    this.maxRouterRetry = Integer.parseInt(specs[1]);
+    this.routerQueryIntevalMillis = Long.parseLong(specs[2]);
+
+    if (this.minRouterSuccessCount > this.maxRouterRetry) {
+      throw new YarnException("minRouterSuccessCount "
+          + this.minRouterSuccessCount
+          + " should not be larger than maxRouterRetry" + this.maxRouterRetry);
+    }
+    if (this.minRouterSuccessCount <= 0) {
+      throw new YarnException("minRouterSuccessCount "
+          + this.minRouterSuccessCount + " should be positive");
+    }
+
+    LOG.info(
+        "Initialized AppCleaner with Router query with min success {}, "
+            + "max retry {}, retry interval {}",
+        this.minRouterSuccessCount, this.maxRouterRetry,
+        DurationFormatUtils.formatDurationISO(this.routerQueryIntevalMillis));
+  }
+
+  public GPGContext getGPGContext() {
+    return this.gpgContext;
+  }
+
+  /**
+   * Query router for applications.
+   *
+   * @return the set of applications
+   * @throws YarnRuntimeException when router call fails
+   */
+  public Set<ApplicationId> getAppsFromRouter() throws YarnRuntimeException {
+    String webAppAddress = WebAppUtils.getRouterWebAppURLWithScheme(conf);
+
+    LOG.info(String.format("Contacting router at: %s", webAppAddress));
+    AppsInfo appsInfo = GPGUtils.invokeRMWebService(webAppAddress, "apps", AppsInfo.class, conf,
+        DeSelectFields.DeSelectType.RESOURCE_REQUESTS.toString());
+
+    Set<ApplicationId> appSet = new HashSet<>();
+    for (AppInfo appInfo : appsInfo.getApps()) {
+      appSet.add(ApplicationId.fromString(appInfo.getAppId()));
+    }
+    return appSet;
+  }
+
+  /**
+   * Get the list of known applications in the cluster from Router.
+   *
+   * @return the list of known applications
+   * @throws YarnException if get app fails
+   */
+  public Set<ApplicationId> getRouterKnownApplications() throws YarnException {
+    int successCount = 0, totalAttemptCount = 0;
+    Set<ApplicationId> resultSet = new HashSet<>();
+    while (totalAttemptCount < this.maxRouterRetry) {
+      try {
+        Set<ApplicationId> routerApps = getAppsFromRouter();
+        resultSet.addAll(routerApps);
+        LOG.info("Attempt {}: {} known apps from Router, {} in total",
+            totalAttemptCount, routerApps.size(), resultSet.size());
+
+        successCount++;
+        if (successCount >= this.minRouterSuccessCount) {
+          return resultSet;
+        }
+
+        // Wait for the next attempt
+        try {
+          Thread.sleep(this.routerQueryIntevalMillis);
+        } catch (InterruptedException e) {
+          LOG.warn("Sleep interrupted after attempt {}.", totalAttemptCount);
+        }
+      } catch (Exception e) {
+        LOG.warn("Router query attempt {} failed.", totalAttemptCount, e);
+      } finally {
+        totalAttemptCount++;
+      }
+    }
+    throw new YarnException("Only " + successCount
+        + " success Router queries after " + totalAttemptCount + " retries");
+  }
+
+  @Override
+  public abstract void run();
+}

+ 77 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java

@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner;
+
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The default ApplicationCleaner that cleans up old applications from table
+ * applicationsHomeSubCluster in FederationStateStore.
+ */
+public class DefaultApplicationCleaner extends ApplicationCleaner {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DefaultApplicationCleaner.class);
+
+  @Override
+  public void run() {
+    Date now = new Date();
+    LOG.info("Application cleaner run at time {}", now);
+
+    FederationStateStoreFacade facade = getGPGContext().getStateStoreFacade();
+    Set<ApplicationId> candidates = new HashSet<>();
+    try {
+      List<ApplicationHomeSubCluster> response =
+          facade.getApplicationsHomeSubCluster();
+      for (ApplicationHomeSubCluster app : response) {
+        candidates.add(app.getApplicationId());
+      }
+      LOG.info("{} app entries in FederationStateStore", candidates.size());
+
+      Set<ApplicationId> routerApps = getRouterKnownApplications();
+      LOG.info("{} known applications from Router", routerApps.size());
+
+      candidates.removeAll(routerApps);
+      LOG.info("Deleting {} applications from statestore", candidates.size());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Apps to delete: {}.", candidates.stream().map(Object::toString)
+            .collect(Collectors.joining(",")));
+      }
+      for (ApplicationId appId : candidates) {
+        try {
+          facade.deleteApplicationHomeSubCluster(appId);
+        } catch (Exception e) {
+          LOG.error("deleteApplicationHomeSubCluster failed at application {}.", appId, e);
+        }
+      }
+    } catch (Throwable e) {
+      LOG.error("Application cleaner started at time {} fails. ", now, e);
+    }
+  }
+}

+ 19 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/package-info.java

@@ -0,0 +1,19 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner;

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/PolicyGenerator.java

@@ -159,7 +159,7 @@ public class PolicyGenerator implements Runnable, Configurable {
           clusterInfo.put(sci.getSubClusterId(), new HashMap<>());
         }
         Object ret = GPGUtils.invokeRMWebService(sci.getRMWebServiceAddress(),
-            e.getValue(), e.getKey(), getConf());
+            e.getValue(), e.getKey(), conf);
         clusterInfo.get(sci.getSubClusterId()).put(e.getKey(), ret);
       }
     }
@@ -181,7 +181,7 @@ public class PolicyGenerator implements Runnable, Configurable {
     for (SubClusterInfo sci : activeSubClusters.values()) {
       SchedulerTypeInfo sti = GPGUtils
           .invokeRMWebService(sci.getRMWebServiceAddress(),
-              RMWSConsts.SCHEDULER, SchedulerTypeInfo.class, getConf());
+              RMWSConsts.SCHEDULER, SchedulerTypeInfo.class, conf);
       if(sti != null){
         schedInfo.put(sci.getSubClusterId(), sti.getSchedulerInfo());
       } else {

+ 131 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java

@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContextImpl;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test for DefaultApplicationCleaner in GPG.
+ */
+public class TestDefaultApplicationCleaner {
+  private Configuration conf;
+  private MemoryFederationStateStore stateStore;
+  private FederationStateStoreFacade facade;
+  private ApplicationCleaner appCleaner;
+  private GPGContext gpgContext;
+
+  private List<ApplicationId> appIds;
+  // The list of applications returned by mocked router
+  private Set<ApplicationId> routerAppIds;
+
+  @Before
+  public void setup() throws Exception {
+    conf = new YarnConfiguration();
+
+    // No Router query retry
+    conf.set(YarnConfiguration.GPG_APPCLEANER_CONTACT_ROUTER_SPEC, "1,1,0");
+
+    stateStore = new MemoryFederationStateStore();
+    stateStore.init(conf);
+
+    facade = FederationStateStoreFacade.getInstance();
+    facade.reinitialize(stateStore, conf);
+
+    gpgContext = new GPGContextImpl();
+    gpgContext.setStateStoreFacade(facade);
+
+    appCleaner = new TestableDefaultApplicationCleaner();
+    appCleaner.init(conf, gpgContext);
+
+    routerAppIds = new HashSet<>();
+
+    appIds = new ArrayList<>();
+    for (int i = 0; i < 3; i++) {
+      ApplicationId appId = ApplicationId.newInstance(0, i);
+      appIds.add(appId);
+
+      SubClusterId subClusterId =
+          SubClusterId.newInstance("SUBCLUSTER-" + i);
+
+      stateStore.addApplicationHomeSubCluster(
+          AddApplicationHomeSubClusterRequest.newInstance(
+              ApplicationHomeSubCluster.newInstance(appId, subClusterId)));
+    }
+  }
+
+  @After
+  public void breakDown() {
+    if (stateStore != null) {
+      stateStore.close();
+      stateStore = null;
+    }
+  }
+
+  @Test
+  public void testFederationStateStoreAppsCleanUp() throws YarnException {
+    // Set first app to be still known by Router
+    ApplicationId appId = appIds.get(0);
+    routerAppIds.add(appId);
+
+    // Another random app not in stateStore known by Router
+    appId = ApplicationId.newInstance(100, 200);
+    routerAppIds.add(appId);
+
+    appCleaner.run();
+
+    // Only one app should be left
+    Assert.assertEquals(1,
+        stateStore
+            .getApplicationsHomeSubCluster(
+                GetApplicationsHomeSubClusterRequest.newInstance())
+            .getAppsHomeSubClusters().size());
+  }
+
+  /**
+   * Testable version of DefaultApplicationCleaner.
+   */
+  public class TestableDefaultApplicationCleaner
+      extends DefaultApplicationCleaner {
+    @Override
+    public Set<ApplicationId> getAppsFromRouter() throws YarnRuntimeException {
+      return routerAppIds;
+    }
+  }
+}

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/TestPolicyGenerator.java

@@ -299,7 +299,7 @@ public class TestPolicyGenerator {
     String webAppAddress = getServiceAddress(NetUtils.createSocketAddr(rmAddress));
 
     SchedulerTypeInfo sti = GPGUtils.invokeRMWebService(webAppAddress, RMWSConsts.SCHEDULER,
-        SchedulerTypeInfo.class, this.conf);
+        SchedulerTypeInfo.class, conf);
 
     Assert.assertNotNull(sti);
     SchedulerInfo schedulerInfo = sti.getSchedulerInfo();