浏览代码

YARN-11425. [Federation] Router Supports SubClusterCleaner. (#5326)

slfan1989 2 年之前
父节点
当前提交
a5f48eacca

+ 22 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -4299,6 +4299,28 @@ public class YarnConfiguration extends Configuration {
       ROUTER_PREFIX + "interceptor.allow-partial-result.enable";
   public static final boolean DEFAULT_ROUTER_INTERCEPTOR_ALLOW_PARTIAL_RESULT_ENABLED = false;
 
+  /**
+   * Interval at which the Router runs the SubClusterCleaner.
+   * The default is 60 seconds, expressed in milliseconds.
+   **/
+  public static final String ROUTER_SUBCLUSTER_CLEANER_INTERVAL_TIME =
+      ROUTER_PREFIX + "subcluster.cleaner.interval.time";
+  public static final long DEFAULT_ROUTER_SUBCLUSTER_CLEANER_INTERVAL_TIME =
+      TimeUnit.SECONDS.toMillis(60);
+
+  /**
+   * Maximum time since the last heartbeat of a SubCluster that the Router tolerates.
+   * The default is 30 minutes, expressed in milliseconds.
+   **/
+  public static final String ROUTER_SUBCLUSTER_EXPIRATION_TIME =
+      ROUTER_PREFIX + "subcluster.heartbeat.expiration.time";
+  public static final long DEFAULT_ROUTER_SUBCLUSTER_EXPIRATION_TIME =
+      TimeUnit.MINUTES.toMillis(30);
+
+  /** Number of threads of the Router scheduled executor. The default is 1. **/
+  public static final String ROUTER_SCHEDULED_EXECUTOR_THREADS =
+      ROUTER_PREFIX + "scheduled.executor.threads";
+  public static final int DEFAULT_ROUTER_SCHEDULED_EXECUTOR_THREADS = 1;
+
+  /** Whether the Router deregisters SubClusters, enabled by default. **/
+  public static final String ROUTER_DEREGISTER_SUBCLUSTER_ENABLED =
+      ROUTER_PREFIX + "deregister.subcluster.enabled";
+  public static final boolean DEFAULT_ROUTER_DEREGISTER_SUBCLUSTER_ENABLED = true;
+
   ////////////////////////////////
   // CSI Volume configs
   ////////////////////////////////

+ 40 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

@@ -5117,4 +5117,44 @@
     </description>
   </property>
 
+  <property>
+    <description>
+      The number of threads to use for the Router scheduled executor service.
+    </description>
+    <name>yarn.router.scheduled.executor.threads</name>
+    <value>1</value>
+  </property>
+
+  <property>
+    <description>
+      The interval at which the subClusterCleaner runs. Default is 60s.
+    </description>
+    <name>yarn.router.subcluster.cleaner.interval.time</name>
+    <value>60s</value>
+  </property>
+
+  <property>
+    <description>
+      SubCluster heartbeat timeout. Default is 30mins.
+    </description>
+    <name>yarn.router.subcluster.heartbeat.expiration.time</name>
+    <value>30m</value>
+  </property>
+
+  <property>
+    <description>
+      Whether to enable deregisterSubCluster. Default is true.
+    </description>
+    <name>yarn.router.deregister.subcluster.enabled</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <description>
+      Number of Router Scheduler Threads.
+    </description>
+    <name>yarn.router.scheduled.executor.threads</name>
+    <value>1</value>
+  </property>
+
 </configuration>

+ 11 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java

@@ -34,6 +34,7 @@ import java.util.Comparator;
 import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ReservationId;
@@ -606,4 +607,14 @@ public class MemoryFederationStateStore implements FederationStateStore {
   public void setMembership(Map<SubClusterId, SubClusterInfo> membership) {
     this.membership = membership;
   }
+
+  /**
+   * Overwrites the last-heartbeat timestamp of a registered subCluster, so that
+   * tests can simulate a subCluster whose heartbeat has expired.
+   *
+   * @param subClusterId id of the subCluster to modify.
+   * @param heartBearTime heartbeat timestamp to record for the subCluster.
+   * @throws YarnRuntimeException if the subCluster is not part of the membership.
+   */
+  @VisibleForTesting
+  public void setExpiredHeartbeat(SubClusterId subClusterId, long heartBearTime)
+      throws YarnRuntimeException {
+    if (!membership.containsKey(subClusterId)) {
+      // Keep a separator before the suffix so the id stays readable in the message.
+      throw new YarnRuntimeException("subClusterId = " + subClusterId + " does not exist.");
+    }
+    SubClusterInfo subClusterInfo = membership.get(subClusterId);
+    subClusterInfo.setLastHeartBeat(heartBearTime);
+  }
 }

+ 24 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java

@@ -89,6 +89,9 @@ import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKey;
 import org.apache.hadoop.yarn.server.federation.store.records.RouterStoreToken;
 import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenRequest;
 import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterResponse;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -1187,4 +1190,25 @@ public final class FederationStateStoreFacade {
           reservationHomeSubCluster);
     }
   }
+
+  /**
+   * Deregisters a subCluster by asking the state store to move it to the given
+   * state, for example SC_LOST or SC_DECOMMISSIONED.
+   *
+   * @param subClusterId identifier of the subCluster to deregister.
+   * @param subClusterState the state the subCluster should be moved to.
+   * @return true if the state store returned a non-null response, false otherwise.
+   * @throws YarnException propagated from the state store call.
+   */
+  public boolean deregisterSubCluster(SubClusterId subClusterId,
+      SubClusterState subClusterState) throws YarnException {
+    SubClusterDeregisterRequest request =
+        SubClusterDeregisterRequest.newInstance(subClusterId, subClusterState);
+    SubClusterDeregisterResponse deregisterResponse = stateStore.deregisterSubCluster(request);
+    // A non-null response is treated as a successful deregistration.
+    return deregisterResponse != null;
+  }
 }

+ 31 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java

@@ -21,8 +21,11 @@ package org.apache.hadoop.yarn.server.router;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.commons.lang.time.DurationFormatUtils;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -37,6 +40,7 @@ import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppUtil;
+import org.apache.hadoop.yarn.server.router.cleaner.SubClusterCleaner;
 import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService;
 import org.apache.hadoop.yarn.server.router.rmadmin.RouterRMAdminService;
 import org.apache.hadoop.yarn.server.router.webapp.RouterWebApp;
@@ -50,6 +54,13 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.classification.VisibleForTesting;
 
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_ROUTER_DEREGISTER_SUBCLUSTER_ENABLED;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.ROUTER_DEREGISTER_SUBCLUSTER_ENABLED;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.ROUTER_SUBCLUSTER_CLEANER_INTERVAL_TIME;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_ROUTER_SUBCLUSTER_CLEANER_INTERVAL_TIME;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.ROUTER_SCHEDULED_EXECUTOR_THREADS;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_ROUTER_SCHEDULED_EXECUTOR_THREADS;
+
 /**
  * The router is a stateless YARN component which is the entry point to the
  * cluster. It can be deployed on multiple nodes behind a Virtual IP (VIP) with
@@ -88,6 +99,9 @@ public class Router extends CompositeService {
 
   private static final String METRICS_NAME = "Router";
 
+  // Executor on which the Router schedules its periodic background tasks.
+  private ScheduledThreadPoolExecutor scheduledExecutorService;
+  // Periodic task that deregisters subClusters whose heartbeat has expired.
+  private SubClusterCleaner subClusterCleaner;
+
   public Router() {
     super(Router.class.getName());
   }
@@ -117,6 +131,12 @@ public class Router extends CompositeService {
     addService(pauseMonitor);
     jm.setPauseMonitor(pauseMonitor);
 
+    // Initialize subClusterCleaner
+    this.subClusterCleaner = new SubClusterCleaner(this.conf);
+    int scheduledExecutorThreads = conf.getInt(ROUTER_SCHEDULED_EXECUTOR_THREADS,
+        DEFAULT_ROUTER_SCHEDULED_EXECUTOR_THREADS);
+    this.scheduledExecutorService = new ScheduledThreadPoolExecutor(scheduledExecutorThreads);
+
     WebServiceClient.initialize(config);
     super.serviceInit(conf);
   }
@@ -128,6 +148,16 @@ public class Router extends CompositeService {
     } catch (IOException e) {
       throw new YarnRuntimeException("Failed Router login", e);
     }
+    // Only schedule the SubClusterCleaner when deregistration is enabled.
+    boolean isDeregisterSubClusterEnabled = this.conf.getBoolean(
+        ROUTER_DEREGISTER_SUBCLUSTER_ENABLED, DEFAULT_ROUTER_DEREGISTER_SUBCLUSTER_ENABLED);
+    if (isDeregisterSubClusterEnabled) {
+      // The default is expressed in milliseconds and the result is passed to
+      // scheduleAtFixedRate as milliseconds, so the duration must be read in
+      // milliseconds. Reading it in minutes turns a configured "60s" into 1,
+      // i.e. a period of one millisecond.
+      long scCleanerIntervalMs = this.conf.getTimeDuration(ROUTER_SUBCLUSTER_CLEANER_INTERVAL_TIME,
+          DEFAULT_ROUTER_SUBCLUSTER_CLEANER_INTERVAL_TIME, TimeUnit.MILLISECONDS);
+      this.scheduledExecutorService.scheduleAtFixedRate(this.subClusterCleaner,
+          0, scCleanerIntervalMs, TimeUnit.MILLISECONDS);
+      LOG.info("Scheduled SubClusterCleaner With Interval: {}.",
+          DurationFormatUtils.formatDurationISO(scCleanerIntervalMs));
+    }
     startWepApp();
     super.serviceStart();
   }
@@ -146,12 +176,7 @@ public class Router extends CompositeService {
   }
 
   protected void shutDown() {
-    new Thread() {
-      @Override
-      public void run() {
-        Router.this.stop();
-      }
-    }.start();
+    new Thread(() -> Router.this.stop()).start();
   }
 
   protected RouterClientRMService createClientRMProxyService() {

+ 92 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/cleaner/SubClusterCleaner.java

@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.router.cleaner;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Date;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Periodic task that marks subClusters as lost once their heartbeat has expired.
+ * <p>
+ * On every run, each subCluster returned by the FederationStateStoreFacade whose
+ * state is not reported as unusable and whose last heartbeat is older than the
+ * expiration time configured through
+ * {@link YarnConfiguration#ROUTER_SUBCLUSTER_EXPIRATION_TIME} is deregistered
+ * with state SC_LOST. The task is meant to be scheduled at a fixed rate.
+ */
+public class SubClusterCleaner implements Runnable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SubClusterCleaner.class);
+  private final FederationStateStoreFacade federationFacade;
+  private final long heartbeatExpirationMillis;
+
+  /**
+   * Creates a cleaner that reads the heartbeat expiration time from the configuration.
+   *
+   * @param conf configuration providing the heartbeat expiration time.
+   */
+  public SubClusterCleaner(Configuration conf) {
+    this.federationFacade = FederationStateStoreFacade.getInstance();
+    // The default is expressed in milliseconds and run() compares the result with a
+    // millisecond difference, so the duration must be read in milliseconds. Reading
+    // it in minutes turns a configured "30m" into 30, i.e. 30 milliseconds.
+    this.heartbeatExpirationMillis =
+        conf.getTimeDuration(YarnConfiguration.ROUTER_SUBCLUSTER_EXPIRATION_TIME,
+        YarnConfiguration.DEFAULT_ROUTER_SUBCLUSTER_EXPIRATION_TIME, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Checks every subCluster once and deregisters those whose heartbeat has expired.
+   * Failures are logged and never propagated to the caller.
+   */
+  @Override
+  public void run() {
+    try {
+      // Step1. Get Current Time.
+      Date now = new Date();
+      LOG.info("SubClusterCleaner at {}.", now);
+
+      Map<SubClusterId, SubClusterInfo> subClusters = federationFacade.getSubClusters(true);
+
+      for (Map.Entry<SubClusterId, SubClusterInfo> subCluster : subClusters.entrySet()) {
+        // Step2. Get information about subClusters.
+        SubClusterId subClusterId = subCluster.getKey();
+        SubClusterInfo subClusterInfo = subCluster.getValue();
+        SubClusterState subClusterState = subClusterInfo.getState();
+        long lastHeartBeatTime = subClusterInfo.getLastHeartBeat();
+
+        // Step3. SubClusters that are already unusable are left untouched.
+        if (subClusterState.isUnusable()) {
+          LOG.debug("SubCluster {} in state {} last heartbeat at {}, " +
+              "already unusable, no need for Deregister.",
+              subClusterId, subClusterState, new Date(lastHeartBeatTime));
+          continue;
+        }
+
+        // Step4. Nothing to do while the heartbeat is within the expiration time.
+        long heartBeatInterval = now.getTime() - lastHeartBeatTime;
+        if (heartBeatInterval <= heartbeatExpirationMillis) {
+          LOG.debug("SubCluster {} in state {} last heartbeat at {}, " +
+              "heartbeat has not expired, no need for Deregister.",
+              subClusterId, subClusterState, new Date(lastHeartBeatTime));
+          continue;
+        }
+
+        // Step5. HeartBeat Interval Exceeds Expiration Time, mark the subCluster as lost.
+        try {
+          LOG.info("Deregister SubCluster {} in state {} last heartbeat at {}.",
+              subClusterId, subClusterState, new Date(lastHeartBeatTime));
+          federationFacade.deregisterSubCluster(subClusterId, SubClusterState.SC_LOST);
+        } catch (YarnException e) {
+          // A failure on one subCluster must not prevent checking the others.
+          LOG.error("deregisterSubCluster failed on SubCluster {}.", subClusterId, e);
+        }
+      }
+    } catch (Throwable e) {
+      // Deliberately broad: an exception escaping a task scheduled with
+      // scheduleAtFixedRate suppresses all of its subsequent executions.
+      LOG.error("SubClusterCleaner Fails.", e);
+    }
+  }
+}

+ 20 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/cleaner/package-info.java

@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Router Cleaner package. **/
+package org.apache.hadoop.yarn.server.router.cleaner;

+ 158 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/cleaner/TestSubClusterCleaner.java

@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.router.cleaner;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatResponse;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Tests that the SubClusterCleaner marks subClusters with an expired heartbeat
+ * as SC_LOST, using an in-memory federation state store.
+ */
+public class TestSubClusterCleaner {
+
+  ////////////////////////////////
+  // Test fixtures
+  ////////////////////////////////
+  private Configuration conf;
+  private MemoryFederationStateStore stateStore;
+  private FederationStateStoreFacade facade;
+  private SubClusterCleaner cleaner;
+  private final static int NUM_SUBCLUSTERS = 4;
+  // Heartbeat timestamp 5 seconds in the past, older than the expiration
+  // time configured in setup().
+  private final static long EXPIRATION_TIME = Time.now() - 5000;
+
+  /**
+   * Creates a fresh in-memory state store holding NUM_SUBCLUSTERS subClusters
+   * in the SC_RUNNING state, and a cleaner configured with a short expiration time.
+   */
+  @Before
+  public void setup() throws YarnException {
+    conf = new YarnConfiguration();
+    conf.setLong(YarnConfiguration.ROUTER_SUBCLUSTER_EXPIRATION_TIME, 1000);
+    conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
+
+    stateStore = new MemoryFederationStateStore();
+    stateStore.init(conf);
+
+    facade = FederationStateStoreFacade.getInstance();
+    facade.reinitialize(stateStore, conf);
+
+    cleaner = new SubClusterCleaner(conf);
+    for (int i = 0; i < NUM_SUBCLUSTERS; i++){
+      // Create sub cluster id and info
+      SubClusterId subClusterId = SubClusterId.newInstance("SC-" + i);
+      SubClusterInfo subClusterInfo = SubClusterInfo.newInstance(subClusterId,
+          "127.0.0.1:1", "127.0.0.1:2", "127.0.0.1:3", "127.0.0.1:4",
+           SubClusterState.SC_RUNNING, Time.now(), "");
+      // Register the subCluster
+      stateStore.registerSubCluster(
+          SubClusterRegisterRequest.newInstance(subClusterInfo));
+    }
+  }
+
+  @Test
+  public void testSubClustersWithOutHeartBeat()
+      throws InterruptedException, TimeoutException, YarnException {
+
+    // All subClusters are registered as RUNNING in setup().
+    // We manually expire the heartbeat of every subCluster, so after the
+    // cleaner has run the number of active subClusters must be 0.
+    Map<SubClusterId, SubClusterInfo> subClustersMap = facade.getSubClusters(false);
+
+    // Step1. Manually set subCluster heartbeat expiration.
+    // subCluster has no heartbeat, and all subClusters will expire.
+    subClustersMap.keySet().forEach(subClusterId ->
+        stateStore.setExpiredHeartbeat(subClusterId, EXPIRATION_TIME));
+
+    // Step2. Run the Cleaner to change the status of the expired SubCluster to SC_LOST.
+    cleaner.run();
+
+    // Step3. All clusters have expired,
+    // so the current Federation has no active subClusters.
+    int count = facade.getActiveSubClustersCount();
+    Assert.assertEquals(0, count);
+
+    // Step4. Check SubCluster Status.
+    // Re-read every subCluster after the cleaner has run, instead of relying on
+    // the map fetched before, so the assertion cannot pass on stale data.
+    for (int i = 0; i < NUM_SUBCLUSTERS; i++) {
+      checkSubClusterState("SC-" + i, SubClusterState.SC_LOST);
+    }
+  }
+
+  @Test
+  public void testSubClustersPartWithHeartBeat() throws YarnException, InterruptedException {
+
+    // Step1. Manually set subCluster heartbeat expiration.
+    for (int i = 0; i < NUM_SUBCLUSTERS; i++) {
+      // Expire the heartbeat of the subCluster registered in setup().
+      expiredSubcluster("SC-" + i);
+    }
+
+    // Step2. Run the Cleaner to change the status of the expired SubCluster to SC_LOST.
+    cleaner.run();
+
+    // Step3. Let SC-0, SC-1 resume heartbeat.
+    resumeSubClusterHeartbeat("SC-0");
+    resumeSubClusterHeartbeat("SC-1");
+
+    // Step4. At this point we should have 2 subClusters that are surviving clusters.
+    int count = facade.getActiveSubClustersCount();
+    Assert.assertEquals(2, count);
+
+    // Step5. The result we expect is that SC-0 and SC-1 are in the RUNNING state,
+    // and SC-2 and SC-3 are in the SC_LOST state.
+    checkSubClusterState("SC-0", SubClusterState.SC_RUNNING);
+    checkSubClusterState("SC-1", SubClusterState.SC_RUNNING);
+    checkSubClusterState("SC-2", SubClusterState.SC_LOST);
+    checkSubClusterState("SC-3", SubClusterState.SC_LOST);
+  }
+
+  /** Sends a heartbeat with state SC_RUNNING for the given subCluster. */
+  private void resumeSubClusterHeartbeat(String pSubClusterId)
+      throws YarnException {
+    SubClusterId subClusterId = SubClusterId.newInstance(pSubClusterId);
+    SubClusterHeartbeatRequest request = SubClusterHeartbeatRequest.newInstance(
+        subClusterId, Time.now(), SubClusterState.SC_RUNNING, "test");
+    SubClusterHeartbeatResponse response = stateStore.subClusterHeartbeat(request);
+    Assert.assertNotNull(response);
+  }
+
+  /** Sets the last heartbeat of the given subCluster to EXPIRATION_TIME. */
+  private void expiredSubcluster(String pSubClusterId) {
+    SubClusterId subClusterId = SubClusterId.newInstance(pSubClusterId);
+    stateStore.setExpiredHeartbeat(subClusterId, EXPIRATION_TIME);
+  }
+
+  /**
+   * Reads the subCluster from the facade and asserts that it is in the expected state.
+   * Throws a YarnException if the facade does not return the subCluster.
+   */
+  private void checkSubClusterState(String pSubClusterId, SubClusterState expectState)
+      throws YarnException {
+    Map<SubClusterId, SubClusterInfo> subClustersMap = facade.getSubClusters(false);
+    SubClusterId subClusterId = SubClusterId.newInstance(pSubClusterId);
+    SubClusterInfo subClusterInfo = subClustersMap.get(subClusterId);
+    if (subClusterInfo == null) {
+      throw new YarnException("subClusterId=" + pSubClusterId + " does not exist.");
+    }
+    Assert.assertEquals(expectState, subClusterInfo.getState());
+  }
+}