瀏覽代碼

YARN-8862. [GPG] Add Yarn Registry cleanup in ApplicationCleaner. Contributed by Botong Huang.

Botong Huang 6 年之前
父節點
當前提交
727c3f8079
共有 9 個文件被更改,包括 136 次插入10 次删除
  1. 12 6
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java
  2. 30 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationRegistryClient.java
  3. 5 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContext.java
  4. 12 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContextImpl.java
  5. 21 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java
  6. 18 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java
  7. 2 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java
  8. 32 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java
  9. 4 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java

+ 12 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java

@@ -202,21 +202,27 @@ public class FederationRegistryClient {
    * Remove an application from registry.
    *
    * @param appId application id
+   * @param ignoreMemoryState whether to ignore the memory data in terms of
+   *          known application
    */
-  public synchronized void removeAppFromRegistry(ApplicationId appId) {
+  public synchronized void removeAppFromRegistry(ApplicationId appId,
+      boolean ignoreMemoryState) {
     Map<String, Token<AMRMTokenIdentifier>> subClusterTokenMap =
         this.appSubClusterTokenMap.get(appId);
-    LOG.info("Removing all registry entries for {}", appId);
-
-    if (subClusterTokenMap == null || subClusterTokenMap.size() == 0) {
-      return;
+    if (!ignoreMemoryState) {
+      if (subClusterTokenMap == null || subClusterTokenMap.size() == 0) {
+        return;
+      }
     }
+    LOG.info("Removing all registry entries for {}", appId);
 
     // Lastly remove the application directory
     String key = getRegistryKey(appId, null);
     try {
       removeKeyRegistry(this.registry, this.user, key, true, true);
-      subClusterTokenMap.clear();
+      if (subClusterTokenMap != null) {
+        subClusterTokenMap.clear();
+      }
     } catch (YarnException e) {
       LOG.error("Failed removing registry directory key " + key, e);
     }

+ 30 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationRegistryClient.java

@@ -80,11 +80,40 @@ public class TestFederationRegistryClient {
     Assert.assertEquals(2,
         this.registryClient.loadStateFromRegistry(appId).size());
 
-    this.registryClient.removeAppFromRegistry(appId);
+    this.registryClient.removeAppFromRegistry(appId, false);
 
     Assert.assertEquals(0, this.registryClient.getAllApplications().size());
     Assert.assertEquals(0,
         this.registryClient.loadStateFromRegistry(appId).size());
   }
 
+  @Test
+  public void testRemoveWithMemoryState() {
+    ApplicationId appId1 = ApplicationId.newInstance(0, 0);
+    ApplicationId appId2 = ApplicationId.newInstance(0, 1);
+    String scId0 = "subcluster0";
+
+    this.registryClient.writeAMRMTokenForUAM(appId1, scId0,
+        new Token<AMRMTokenIdentifier>());
+    this.registryClient.writeAMRMTokenForUAM(appId2, scId0,
+        new Token<AMRMTokenIdentifier>());
+    Assert.assertEquals(2, this.registryClient.getAllApplications().size());
+
+    // Create a new client instance
+    this.registryClient =
+        new FederationRegistryClient(this.conf, this.registry, this.user);
+
+    this.registryClient.loadStateFromRegistry(appId2);
+    // Should remove app2
+    this.registryClient.removeAppFromRegistry(appId2, false);
+    Assert.assertEquals(1, this.registryClient.getAllApplications().size());
+
+    // Should not remove app1 since memory state don't have it
+    this.registryClient.removeAppFromRegistry(appId1, false);
+    Assert.assertEquals(1, this.registryClient.getAllApplications().size());
+
+    // Should remove app1
+    this.registryClient.removeAppFromRegistry(appId1, true);
+    Assert.assertEquals(0, this.registryClient.getAllApplications().size());
+  }
 }

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContext.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.globalpolicygenerator;
 
+import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient;
 import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
 
 /**
@@ -32,4 +33,8 @@ public interface GPGContext {
   GPGPolicyFacade getPolicyFacade();
 
   void setPolicyFacade(GPGPolicyFacade facade);
+
+  FederationRegistryClient getRegistryClient();
+
+  void setRegistryClient(FederationRegistryClient client);
 }

+ 12 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContextImpl.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.globalpolicygenerator;
 
+import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient;
 import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
 
 /**
@@ -27,6 +28,7 @@ public class GPGContextImpl implements GPGContext {
 
   private FederationStateStoreFacade facade;
   private GPGPolicyFacade policyFacade;
+  private FederationRegistryClient registryClient;
 
   @Override
   public FederationStateStoreFacade getStateStoreFacade() {
@@ -48,4 +50,14 @@ public class GPGContextImpl implements GPGContext {
   public void setPolicyFacade(GPGPolicyFacade gpgPolicyfacade){
     policyFacade = gpgPolicyfacade;
   }
+
+  @Override
+  public FederationRegistryClient getRegistryClient() {
+    return registryClient;
+  }
+
+  @Override
+  public void setRegistryClient(FederationRegistryClient client) {
+    registryClient = client;
+  }
 }

+ 21 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java

@@ -25,11 +25,14 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.commons.lang3.time.DurationFormatUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.registry.client.api.RegistryOperations;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient;
 import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
 import org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner.ApplicationCleaner;
 import org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator.PolicyGenerator;
@@ -60,6 +63,7 @@ public class GlobalPolicyGenerator extends CompositeService {
 
   // Federation Variables
   private GPGContext gpgContext;
+  private RegistryOperations registry;
 
   // Scheduler service that runs tasks periodically
   private ScheduledThreadPoolExecutor scheduledExecutorService;
@@ -81,6 +85,17 @@ public class GlobalPolicyGenerator extends CompositeService {
         .setPolicyFacade(new GPGPolicyFacade(
             this.gpgContext.getStateStoreFacade(), conf));
 
+    this.registry = FederationStateStoreFacade.createInstance(conf,
+        YarnConfiguration.YARN_REGISTRY_CLASS,
+        YarnConfiguration.DEFAULT_YARN_REGISTRY_CLASS,
+        RegistryOperations.class);
+    this.registry.init(conf);
+
+    UserGroupInformation user = UserGroupInformation.getCurrentUser();
+    FederationRegistryClient registryClient =
+        new FederationRegistryClient(conf, this.registry, user);
+    this.gpgContext.setRegistryClient(registryClient);
+
     this.scheduledExecutorService = new ScheduledThreadPoolExecutor(
         conf.getInt(YarnConfiguration.GPG_SCHEDULED_EXECUTOR_THREADS,
             YarnConfiguration.DEFAULT_GPG_SCHEDULED_EXECUTOR_THREADS));
@@ -105,6 +120,8 @@ public class GlobalPolicyGenerator extends CompositeService {
   protected void serviceStart() throws Exception {
     super.serviceStart();
 
+    this.registry.start();
+
     // Schedule SubClusterCleaner service
     long scCleanerIntervalMs = getConfig().getLong(
         YarnConfiguration.GPG_SUBCLUSTER_CLEANER_INTERVAL_MS,
@@ -141,6 +158,10 @@ public class GlobalPolicyGenerator extends CompositeService {
 
   @Override
   protected void serviceStop() throws Exception {
+    if (this.registry != null) {
+      this.registry.stop();
+    }
+
     try {
       if (this.scheduledExecutorService != null
           && !this.scheduledExecutorService.isShutdown()) {

+ 18 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner;
 
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 
 import org.apache.commons.lang3.time.DurationFormatUtils;
@@ -27,9 +28,11 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient;
 import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext;
 import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.DeSelectFields;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
 import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
@@ -46,6 +49,7 @@ public abstract class ApplicationCleaner implements Runnable {
 
   private Configuration conf;
   private GPGContext gpgContext;
+  private FederationRegistryClient registryClient;
 
   private int minRouterSuccessCount;
   private int maxRouterRetry;
@@ -56,6 +60,7 @@ public abstract class ApplicationCleaner implements Runnable {
 
     this.gpgContext = context;
     this.conf = config;
+    this.registryClient = context.getRegistryClient();
 
     String routerSpecString =
         this.conf.get(YarnConfiguration.GPG_APPCLEANER_CONTACT_ROUTER_SPEC,
@@ -102,7 +107,7 @@ public abstract class ApplicationCleaner implements Runnable {
 
     LOG.info(String.format("Contacting router at: %s", webAppAddress));
     AppsInfo appsInfo = (AppsInfo) GPGUtils.invokeRMWebService(conf,
-        webAppAddress, "apps", AppsInfo.class,
+        webAppAddress, RMWSConsts.APPS, AppsInfo.class,
         DeSelectFields.DeSelectType.RESOURCE_REQUESTS.toString());
 
     Set<ApplicationId> appSet = new HashSet<ApplicationId>();
@@ -149,6 +154,18 @@ public abstract class ApplicationCleaner implements Runnable {
         + " success Router queries after " + totalAttemptCount + " retries");
   }
 
+  protected void cleanupAppRecordInRegistry(Set<ApplicationId> knownApps) {
+    List<String> allApps = this.registryClient.getAllApplications();
+    LOG.info("Got " + allApps.size() + " existing apps in registry");
+    for (String app : allApps) {
+      ApplicationId appId = ApplicationId.fromString(app);
+      if (!knownApps.contains(appId)) {
+        LOG.info("removing finished application entry for {}", app);
+        this.registryClient.removeAppFromRegistry(appId, true);
+      }
+    }
+  }
+
   @Override
   public abstract void run();
 }

+ 2 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java

@@ -74,6 +74,8 @@ public class DefaultApplicationCleaner extends ApplicationCleaner {
         }
       }
 
+      // Clean up registry entries
+      cleanupAppRecordInRegistry(routerApps);
     } catch (Throwable e) {
       LOG.error("Application cleaner started at time " + now + " fails: ", e);
     }

+ 32 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java

@@ -24,15 +24,21 @@ import java.util.List;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.registry.client.api.RegistryOperations;
+import org.apache.hadoop.registry.client.impl.FSRegistryOperationsService;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
 import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
 import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
 import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient;
 import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
 import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext;
 import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContextImpl;
@@ -50,6 +56,8 @@ public class TestDefaultApplicationCleaner {
   private FederationStateStoreFacade facade;
   private ApplicationCleaner appCleaner;
   private GPGContext gpgContext;
+  private RegistryOperations registry;
+  private FederationRegistryClient registryClient;
 
   private List<ApplicationId> appIds;
   // The list of applications returned by mocked router
@@ -68,8 +76,18 @@ public class TestDefaultApplicationCleaner {
     facade = FederationStateStoreFacade.getInstance();
     facade.reinitialize(stateStore, conf);
 
+    registry = new FSRegistryOperationsService();
+    registry.init(conf);
+    registry.start();
+
+    UserGroupInformation user = UserGroupInformation.getCurrentUser();
+    registryClient = new FederationRegistryClient(conf, registry, user);
+    registryClient.cleanAllApplications();
+    Assert.assertEquals(0, registryClient.getAllApplications().size());
+
     gpgContext = new GPGContextImpl();
     gpgContext.setStateStoreFacade(facade);
+    gpgContext.setRegistryClient(registryClient);
 
     appCleaner = new TestableDefaultApplicationCleaner();
     appCleaner.init(conf, gpgContext);
@@ -87,7 +105,12 @@ public class TestDefaultApplicationCleaner {
       stateStore.addApplicationHomeSubCluster(
           AddApplicationHomeSubClusterRequest.newInstance(
               ApplicationHomeSubCluster.newInstance(appId, subClusterId)));
+
+      // Write some registry entries for the app
+      registryClient.writeAMRMTokenForUAM(appId, subClusterId.toString(),
+          new Token<AMRMTokenIdentifier>());
     }
+    Assert.assertEquals(3, registryClient.getAllApplications().size());
   }
 
   @After
@@ -95,6 +118,12 @@ public class TestDefaultApplicationCleaner {
     if (stateStore != null) {
       stateStore.close();
     }
+    if (registryClient != null) {
+      registryClient.cleanAllApplications();
+    }
+    if (registry != null) {
+      registry.stop();
+    }
   }
 
   @Test
@@ -115,6 +144,9 @@ public class TestDefaultApplicationCleaner {
             .getApplicationsHomeSubCluster(
                 GetApplicationsHomeSubClusterRequest.newInstance())
             .getAppsHomeSubClusters().size());
+
+    // The known app should not be cleaned in registry
+    Assert.assertEquals(1, registryClient.getAllApplications().size());
   }
 
   /**

+ 4 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java

@@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerReport;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.PreemptionContract;
 import org.apache.hadoop.yarn.api.records.PreemptionMessage;
@@ -815,13 +816,14 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
 
     if (failedToUnRegister) {
       homeResponse.setIsUnregistered(false);
-    } else {
+    } else if (request.getFinalApplicationStatus()
+        .equals(FinalApplicationStatus.SUCCEEDED)) {
       // Clean up UAMs only when the app finishes successfully, so that no more
       // attempt will be launched.
       this.uamPool.stop();
       if (this.registryClient != null) {
         this.registryClient
-            .removeAppFromRegistry(this.attemptId.getApplicationId());
+            .removeAppFromRegistry(this.attemptId.getApplicationId(), false);
       }
     }
     return homeResponse;