Просмотр исходного кода

YARN-11306. [Federation] Refactor NM#FederationInterceptor#recover Code. (#4897)

slfan1989 2 лет назад
Родитель
Сommit
aeba204fa2

+ 9 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java

@@ -522,6 +522,15 @@ public class UnmanagedAMPoolManager extends AbstractService {
     };
   }
 
+  public void unAttachUAM(String uamId) {
+    if (this.unmanagedAppMasterMap.containsKey(uamId)) {
+      UnmanagedApplicationManager appManager = this.unmanagedAppMasterMap.get(uamId);
+      appManager.shutDownConnections();
+    }
+    this.unmanagedAppMasterMap.remove(uamId);
+    this.appIdMap.remove(uamId);
+  }
+
   @VisibleForTesting
   protected Map<String, UnmanagedApplicationManager> getUnmanagedAppMasterMap() {
     return unmanagedAppMasterMap;

+ 6 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java

@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -954,4 +955,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
       throws YarnException, IOException {
     return null;
   }
+
+  @VisibleForTesting
+  public HashMap<ApplicationId, List<ContainerId>> getApplicationContainerIdMap() {
+    return applicationContainerIdMap;
+  }
 }

+ 132 - 85
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java

@@ -35,6 +35,7 @@ import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
@@ -200,8 +201,6 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
    * that all the {@link AMRMClientRelayer} will be re-populated with all
    * pending requests.
    *
-   * TODO: When split-merge is not idempotent, this can lead to some
-   * over-allocation without a full cancel to RM.
    */
   private volatile boolean justRecovered;
 
@@ -357,104 +356,103 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
   @Override
   public void recover(Map<String, byte[]> recoveredDataMap) {
     super.recover(recoveredDataMap);
-    LOG.info("Recovering data for FederationInterceptor for {}",
-        this.attemptId);
+    LOG.info("Recovering data for FederationInterceptor for {}.", this.attemptId);
     this.justRecovered = true;
 
-    if (recoveredDataMap == null) {
+    if (recoveredDataMap == null || recoveredDataMap.isEmpty()) {
+      LOG.warn("recoveredDataMap isNull Or isEmpty, FederationInterceptor can't recover.");
       return;
     }
+
+    if (!recoveredDataMap.containsKey(NMSS_REG_REQUEST_KEY)) {
+      return;
+    }
+
     try {
+
       if (recoveredDataMap.containsKey(NMSS_REG_REQUEST_KEY)) {
+        byte[] appMasterRequestBytes = recoveredDataMap.get(NMSS_REG_REQUEST_KEY);
         RegisterApplicationMasterRequestProto pb =
-            RegisterApplicationMasterRequestProto
-                .parseFrom(recoveredDataMap.get(NMSS_REG_REQUEST_KEY));
-        this.amRegistrationRequest =
-            new RegisterApplicationMasterRequestPBImpl(pb);
-        LOG.info("amRegistrationRequest recovered for {}", this.attemptId);
-
+            RegisterApplicationMasterRequestProto.parseFrom(appMasterRequestBytes);
+        this.amRegistrationRequest = new RegisterApplicationMasterRequestPBImpl(pb);
+        LOG.info("amRegistrationRequest recovered for {}.", this.attemptId);
         // Give the register request to homeRMRelayer for future re-registration
         this.homeRMRelayer.setAMRegistrationRequest(this.amRegistrationRequest);
       }
+
       if (recoveredDataMap.containsKey(NMSS_REG_RESPONSE_KEY)) {
+        byte[] appMasterResponseBytes = recoveredDataMap.get(NMSS_REG_RESPONSE_KEY);
         RegisterApplicationMasterResponseProto pb =
-            RegisterApplicationMasterResponseProto
-                .parseFrom(recoveredDataMap.get(NMSS_REG_RESPONSE_KEY));
-        this.amRegistrationResponse =
-            new RegisterApplicationMasterResponsePBImpl(pb);
-        LOG.info("amRegistrationResponse recovered for {}", this.attemptId);
+            RegisterApplicationMasterResponseProto.parseFrom(appMasterResponseBytes);
+        this.amRegistrationResponse = new RegisterApplicationMasterResponsePBImpl(pb);
+        LOG.info("amRegistrationResponse recovered for {}.", this.attemptId);
       }
 
       // Recover UAM amrmTokens from registry or NMSS
-      Map<String, Token<AMRMTokenIdentifier>> uamMap;
-      if (this.registryClient != null) {
-        uamMap = this.registryClient
-            .loadStateFromRegistry(this.attemptId.getApplicationId());
-        LOG.info("Found {} existing UAMs for application {} in Yarn Registry",
-            uamMap.size(), this.attemptId.getApplicationId());
-      } else {
-        uamMap = new HashMap<>();
-        for (Entry<String, byte[]> entry : recoveredDataMap.entrySet()) {
-          if (entry.getKey().startsWith(NMSS_SECONDARY_SC_PREFIX)) {
-            // entry for subClusterId -> UAM amrmToken
-            String scId =
-                entry.getKey().substring(NMSS_SECONDARY_SC_PREFIX.length());
-            Token<AMRMTokenIdentifier> amrmToken = new Token<>();
-            amrmToken.decodeFromUrlString(
-                new String(entry.getValue(), STRING_TO_BYTE_FORMAT));
-            uamMap.put(scId, amrmToken);
-            LOG.debug("Recovered UAM in {} from NMSS", scId);
-          }
-        }
-        LOG.info("Found {} existing UAMs for application {} in NMStateStore",
-            uamMap.size(), this.attemptId.getApplicationId());
-      }
+      Map<String, Token<AMRMTokenIdentifier>> uamMap =
+          recoverSubClusterAMRMTokenIdentifierMap(recoveredDataMap);
 
       // Re-attach the UAMs
       int containers = 0;
-      for (Map.Entry<String, Token<AMRMTokenIdentifier>> entry : uamMap
-          .entrySet()) {
-        SubClusterId subClusterId = SubClusterId.newInstance(entry.getKey());
+      AMRMProxyApplicationContext applicationContext = getApplicationContext();
+      ApplicationId applicationId = this.attemptId.getApplicationId();
+      String queue = this.amRegistrationResponse.getQueue();
+      String homeSCId = this.homeSubClusterId.getId();
+      String user = applicationContext.getUser();
 
-        // Create a config loaded with federation on and subclusterId
+      for (Map.Entry<String, Token<AMRMTokenIdentifier>> entry : uamMap.entrySet()) {
+        String keyScId = entry.getKey();
+        Token<AMRMTokenIdentifier> tokens = entry.getValue();
+        SubClusterId subClusterId = SubClusterId.newInstance(keyScId);
+
+        // Create a config loaded with federation on and subClusterId
         // for each UAM
         YarnConfiguration config = new YarnConfiguration(getConf());
-        FederationProxyProviderUtil.updateConfForFederation(config,
-            subClusterId.getId());
+        FederationProxyProviderUtil.updateConfForFederation(config, keyScId);
 
         try {
-          this.uamPool.reAttachUAM(subClusterId.getId(), config,
-              this.attemptId.getApplicationId(),
-              this.amRegistrationResponse.getQueue(),
-              getApplicationContext().getUser(), this.homeSubClusterId.getId(),
-              entry.getValue(), subClusterId.toString());
+          // ReAttachUAM
+          this.uamPool.reAttachUAM(keyScId, config, applicationId, queue, user, homeSCId,
+              tokens, keyScId);
 
-          this.secondaryRelayers.put(subClusterId.getId(),
-              this.uamPool.getAMRMClientRelayer(subClusterId.getId()));
+          // GetAMRMClientRelayer
+          this.secondaryRelayers.put(keyScId, this.uamPool.getAMRMClientRelayer(keyScId));
 
+          // RegisterApplicationMaster
           RegisterApplicationMasterResponse response =
-              this.uamPool.registerApplicationMaster(subClusterId.getId(),
-                  this.amRegistrationRequest);
+              this.uamPool.registerApplicationMaster(keyScId, this.amRegistrationRequest);
 
           // Set sub-cluster to be timed out initially
-          lastSCResponseTime.put(subClusterId,
-              clock.getTime() - subClusterTimeOut);
+          lastSCResponseTime.put(subClusterId, clock.getTime() - subClusterTimeOut);
 
           // Running containers from secondary RMs
-          for (Container container : response
-              .getContainersFromPreviousAttempts()) {
-            containerIdToSubClusterIdMap.put(container.getId(), subClusterId);
+          List<Container> previousAttempts = response.getContainersFromPreviousAttempts();
+          for (Container container : previousAttempts) {
+            ContainerId containerId = container.getId();
+            containerIdToSubClusterIdMap.put(containerId, subClusterId);
             containers++;
-            LOG.debug("  From subcluster {} running container {}",
-                subClusterId, container.getId());
+            LOG.info("From subCluster {} running container {}", subClusterId, containerId);
           }
-          LOG.info("Recovered {} running containers from UAM in {}",
-              response.getContainersFromPreviousAttempts().size(),
-              subClusterId);
+
+          LOG.info("Recovered {} running containers from UAM in {}.",
+              previousAttempts.size(), subClusterId);
 
         } catch (Exception e) {
-          LOG.error("Error reattaching UAM to " + subClusterId + " for "
-              + this.attemptId, e);
+          LOG.error("Error reattaching UAM to {} for {}.", subClusterId, this.attemptId, e);
+          // During recovery, we need to clean up the data of the bad SubCluster.
+          // This ensures that when the bad SubCluster is recovered,
+          // new Containers can still be allocated and new UAMs can be registered.
+          this.uamPool.unAttachUAM(keyScId);
+          this.secondaryRelayers.remove(keyScId);
+          this.lastSCResponseTime.remove(subClusterId);
+          List<ContainerId> containerIds =
+              containerIdToSubClusterIdMap.entrySet().stream()
+              .filter(item-> item.getValue().equals(subClusterId))
+              .map(Entry::getKey)
+              .collect(Collectors.toList());
+          for (ContainerId containerId : containerIds) {
+            containerIdToSubClusterIdMap.remove(containerId);
+          }
         }
       }
 
@@ -463,42 +461,91 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
       // map as well.
       UserGroupInformation appSubmitter;
       if (UserGroupInformation.isSecurityEnabled()) {
-        appSubmitter = UserGroupInformation.createProxyUser(getApplicationContext().getUser(),
+        appSubmitter = UserGroupInformation.createProxyUser(user,
             UserGroupInformation.getLoginUser());
       } else {
-        appSubmitter = UserGroupInformation.createRemoteUser(getApplicationContext().getUser());
+        appSubmitter = UserGroupInformation.createRemoteUser(user);
       }
-      ApplicationClientProtocol rmClient =
-          createHomeRMProxy(getApplicationContext(),
-              ApplicationClientProtocol.class, appSubmitter);
 
-      GetContainersResponse response = rmClient
-          .getContainers(GetContainersRequest.newInstance(this.attemptId));
+      ApplicationClientProtocol rmClient = createHomeRMProxy(applicationContext,
+          ApplicationClientProtocol.class, appSubmitter);
+
+      GetContainersRequest request = GetContainersRequest.newInstance(this.attemptId);
+      GetContainersResponse response = rmClient.getContainers(request);
+
       for (ContainerReport container : response.getContainerList()) {
-        containerIdToSubClusterIdMap.put(container.getContainerId(),
-            this.homeSubClusterId);
+        ContainerId containerId = container.getContainerId();
+        containerIdToSubClusterIdMap.put(containerId, this.homeSubClusterId);
         containers++;
-        LOG.debug("  From home RM {} running container {}",
-            this.homeSubClusterId, container.getContainerId());
+        LOG.debug("From home RM {} running container {}.", this.homeSubClusterId, containerId);
       }
-      LOG.info("{} running containers including AM recovered from home RM {}",
+      LOG.info("{} running containers including AM recovered from home RM {}.",
           response.getContainerList().size(), this.homeSubClusterId);
 
-      LOG.info(
-          "In all {} UAMs {} running containers including AM recovered for {}",
+      LOG.info("In all {} UAMs {} running containers including AM recovered for {}.",
           uamMap.size(), containers, this.attemptId);
 
-      if (this.amRegistrationResponse != null) {
+      if (queue != null) {
         // Initialize the AMRMProxyPolicy
-        String queue = this.amRegistrationResponse.getQueue();
-        this.policyInterpreter =
-            FederationPolicyUtils.loadAMRMPolicy(queue, this.policyInterpreter,
-                getConf(), this.federationFacade, this.homeSubClusterId);
+        queue = this.amRegistrationResponse.getQueue();
+        this.policyInterpreter = FederationPolicyUtils.loadAMRMPolicy(queue, this.policyInterpreter,
+            getConf(), this.federationFacade, this.homeSubClusterId);
       }
     } catch (IOException | YarnException e) {
       throw new YarnRuntimeException(e);
     }
+  }
 
+  /**
+   * recover SubClusterAMRMTokenIdentifierMap.
+   *
+   * If registryClient is not empty, restore directly from registryClient,
+   * otherwise restore from NMSS.
+   *
+   * @param recoveredDataMap recoveredDataMap.
+   * @return subClusterAMRMTokenIdentifierMap.
+   * @throws IOException IO Exception occurs.
+   */
+  private Map<String, Token<AMRMTokenIdentifier>> recoverSubClusterAMRMTokenIdentifierMap(
+      Map<String, byte[]> recoveredDataMap) throws IOException {
+    Map<String, Token<AMRMTokenIdentifier>> uamMap;
+    ApplicationId applicationId = this.attemptId.getApplicationId();
+    if (this.registryClient != null) {
+      uamMap = this.registryClient.loadStateFromRegistry(applicationId);
+      LOG.info("Found {} existing UAMs for application {} in Yarn Registry.",
+          uamMap.size(), applicationId);
+    } else {
+      uamMap = recoverSubClusterAMRMTokenIdentifierMapFromNMSS(recoveredDataMap);
+      LOG.info("Found {} existing UAMs for application {} in NMStateStore.",
+          uamMap.size(), applicationId);
+    }
+    return uamMap;
+  }
+
+  /**
+   * recover SubClusterAMRMTokenIdentifierMap From NMSS.
+   *
+   * @param recoveredDataMap recoveredDataMap
+   * @return subClusterAMRMTokenIdentifierMap.
+   * @throws IOException IO Exception occurs.
+   */
+  private Map<String, Token<AMRMTokenIdentifier>> recoverSubClusterAMRMTokenIdentifierMapFromNMSS(
+      Map<String, byte[]> recoveredDataMap) throws IOException {
+    Map<String, Token<AMRMTokenIdentifier>> uamMap = new HashMap<>();
+    for (Entry<String, byte[]> entry : recoveredDataMap.entrySet()) {
+      String key = entry.getKey();
+      byte[] value = entry.getValue();
+
+      if (key.startsWith(NMSS_SECONDARY_SC_PREFIX)) {
+        // entry for subClusterId -> UAM AMRMTokenIdentifier
+        String scId = key.substring(NMSS_SECONDARY_SC_PREFIX.length());
+        Token<AMRMTokenIdentifier> aMRMTokenIdentifier = new Token<>();
+        aMRMTokenIdentifier.decodeFromUrlString(new String(value, STRING_TO_BYTE_FORMAT));
+        uamMap.put(scId, aMRMTokenIdentifier);
+        LOG.debug("Recovered UAM in {} from NMSS.", scId);
+      }
+    }
+    return uamMap;
   }
 
   /**

+ 190 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java

@@ -26,12 +26,14 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.HashMap;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.registry.client.api.RegistryOperations;
 import org.apache.hadoop.registry.client.impl.FSRegistryOperationsService;
@@ -78,6 +80,7 @@ import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.server.uam.UnmanagedAMPoolManager;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.Assert;
 import org.junit.Test;
@@ -517,6 +520,16 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
     testRecover(null);
   }
 
+  @Test
+  public void testRecoverBadSCWithAMRMProxyHA() throws Exception {
+    testRecoverWithBadSubCluster(registry);
+  }
+
+  @Test
+  public void testRecoverBadSCWithoutAMRMProxyHA() throws Exception {
+    testRecoverWithBadSubCluster(null);
+  }
+
   protected void testRecover(final RegistryOperations registryObj)
       throws Exception {
     UserGroupInformation ugi =
@@ -1242,4 +1255,181 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
       return null;
     });
   }
+
+  public void testRecoverWithBadSubCluster(final RegistryOperations registryObj)
+      throws IOException, InterruptedException {
+
+    UserGroupInformation ugi =
+        interceptor.getUGIWithToken(interceptor.getAttemptId());
+
+    // Prepare a list of subclusters
+    List<SubClusterId> subClusterIds = new ArrayList<>();
+    SubClusterId sc1 = SubClusterId.newInstance("SC-1");
+    SubClusterId sc2 = SubClusterId.newInstance("SC-2");
+    SubClusterId homeSC = SubClusterId.newInstance(HOME_SC_ID);
+    subClusterIds.add(sc1);
+    subClusterIds.add(sc2);
+    subClusterIds.add(homeSC);
+
+    // Prepare AMRMProxy Context
+    AMRMProxyApplicationContext appContext = new AMRMProxyApplicationContextImpl(nmContext,
+        getConf(), attemptId, "test-user", null, null, null, registryObj);
+
+    // Prepare RegisterApplicationMasterRequest
+    RegisterApplicationMasterRequest registerReq =
+        Records.newRecord(RegisterApplicationMasterRequest.class);
+    registerReq.setHost(Integer.toString(testAppId));
+    registerReq.setRpcPort(testAppId);
+    registerReq.setTrackingUrl("");
+
+    ugi.doAs((PrivilegedExceptionAction<Object>) () -> {
+
+      // Step1. Prepare subClusters SC-1, SC-2, HomeSC and Interceptor
+      initSubClusterAndInterceptor(subClusterIds, registryObj);
+
+      // Step2. Register Application And Assign Containers
+      List<Container> containers = registerApplicationAndAssignContainers(registerReq);
+
+      // Step3. Offline SC-1 cluster
+      offlineSubClusterSC1(sc1);
+
+      // Step4. Recover ApplicationMaster
+      recoverApplicationMaster(appContext);
+
+      // Step5. We recovered ApplicationMaster.
+      // SC-1 was offline, SC-2 was recovered at this time, UnmanagedAMPool.size=1 and only SC-2
+      UnmanagedAMPoolManager unmanagedAMPoolManager = interceptor.getUnmanagedAMPool();
+      Set<String> allUAMIds = unmanagedAMPoolManager.getAllUAMIds();
+      Assert.assertNotNull(allUAMIds);
+      Assert.assertEquals(1, allUAMIds.size());
+      Assert.assertTrue(allUAMIds.contains(sc2.getId()));
+
+      // Step6. The first allocate call expects a fail-over exception and re-register.
+      AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class);
+      allocateRequest.setResponseId(0);
+      LambdaTestUtils.intercept(ApplicationMasterNotRegisteredException.class,
+          "AMRMProxy just restarted and recovered for " + this.attemptId +
+          ". AM should re-register and full re-send pending requests.",
+          () -> interceptor.allocate(allocateRequest));
+      interceptor.registerApplicationMaster(registerReq);
+
+      // Step7. release Containers
+      releaseContainers(containers, sc1);
+
+      // Step8. finish application
+      finishApplication();
+
+      return null;
+    });
+  }
+
+  private void initSubClusterAndInterceptor(List<SubClusterId> subClusterIds,
+      RegistryOperations registryObj) throws YarnException {
+    // Prepare subClusters SC-1, SC-2, HomeSC
+    for (SubClusterId subClusterId : subClusterIds) {
+      registerSubCluster(subClusterId);
+    }
+
+    // Prepare Interceptor
+    interceptor = new TestableFederationInterceptor();
+    AMRMProxyApplicationContext appContext = new AMRMProxyApplicationContextImpl(nmContext,
+        getConf(), attemptId, "test-user", null, null, null, registryObj);
+    interceptor.init(appContext);
+    interceptor.cleanupRegistry();
+  }
+
+  private List<Container> registerApplicationAndAssignContainers(
+      RegisterApplicationMasterRequest registerReq) throws Exception {
+
+    // Register HomeSC
+    RegisterApplicationMasterResponse registerResponse =
+        interceptor.registerApplicationMaster(registerReq);
+    Assert.assertNotNull(registerResponse);
+
+    // We only registered HomeSC, so UnmanagedAMPoolSize should be empty
+    Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
+
+    // We assign 3 Containers to each cluster
+    int numberOfContainers = 3;
+    List<Container> containers =
+        getContainersAndAssert(numberOfContainers, numberOfContainers * 3);
+
+    // At this point, UnmanagedAMPoolSize should be equal to 2 and should contain SC-1, SC-2
+    Assert.assertEquals(2, interceptor.getUnmanagedAMPoolSize());
+    UnmanagedAMPoolManager unmanagedAMPoolManager = interceptor.getUnmanagedAMPool();
+    Set<String> allUAMIds = unmanagedAMPoolManager.getAllUAMIds();
+    Assert.assertNotNull(allUAMIds);
+    Assert.assertEquals(2, allUAMIds.size());
+    Assert.assertTrue(allUAMIds.contains("SC-1"));
+    Assert.assertTrue(allUAMIds.contains("SC-2"));
+
+    // Make sure all async hb threads are done
+    interceptor.drainAllAsyncQueue(true);
+
+    return containers;
+  }
+
+  private void offlineSubClusterSC1(SubClusterId subClusterId) throws YarnException {
+
+    ConcurrentHashMap<String, MockResourceManagerFacade> secondaries =
+        interceptor.getSecondaryRMs();
+
+    // SC-1 out of service
+    deRegisterSubCluster(subClusterId);
+    secondaries.get(subClusterId.getId()).setRunningMode(false);
+  }
+
+  private void recoverApplicationMaster(AMRMProxyApplicationContext appContext)
+      throws IOException {
+    // Prepare for Federation Interceptor restart and recover
+    Map<String, byte[]> recoveredDataMap =
+        recoverDataMapForAppAttempt(nmStateStore, attemptId);
+
+    // Preserve the mock RM instances
+    MockResourceManagerFacade homeRM = interceptor.getHomeRM();
+
+    // Create a new interceptor instance and recover
+    interceptor = new TestableFederationInterceptor(homeRM,
+        interceptor.getSecondaryRMs());
+    interceptor.init(appContext);
+    interceptor.recover(recoveredDataMap);
+  }
+
+  private void releaseContainers(List<Container> containers, SubClusterId subClusterId)
+      throws Exception {
+
+    ConcurrentHashMap<String, MockResourceManagerFacade> secondaries =
+        interceptor.getSecondaryRMs();
+    lastResponseId = 0;
+
+    // Get the Container list of SC-1
+    MockResourceManagerFacade sc1Facade = secondaries.get("SC-1");
+    HashMap<ApplicationId, List<ContainerId>> appContainerMap =
+        sc1Facade.getApplicationContainerIdMap();
+    Assert.assertNotNull(appContainerMap);
+    ApplicationId applicationId = attemptId.getApplicationId();
+    Assert.assertNotNull(applicationId);
+    List<ContainerId> sc1ContainerList = appContainerMap.get(applicationId);
+
+    // Release all containers,
+    // Because SC-1 is offline, it is necessary to clean up the Containers allocated by SC-1
+    containers = containers.stream()
+        .filter(container -> !sc1ContainerList.contains(container.getId()))
+        .collect(Collectors.toList());
+    releaseContainersAndAssert(containers);
+  }
+
+  private void finishApplication() throws IOException, YarnException {
+    // Finish the application
+    FinishApplicationMasterRequest finishReq =
+        Records.newRecord(FinishApplicationMasterRequest.class);
+    finishReq.setDiagnostics("");
+    finishReq.setTrackingUrl("");
+    finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
+
+    FinishApplicationMasterResponse finishResponse =
+        interceptor.finishApplicationMaster(finishReq);
+    Assert.assertNotNull(finishResponse);
+    Assert.assertTrue(finishResponse.getIsUnregistered());
+  }
 }