|
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProvider
|
|
|
import org.apache.hadoop.yarn.server.federation.failover.FederationRMFailoverProxyProvider;
|
|
|
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
|
|
|
import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
|
|
|
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
|
|
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
|
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
|
|
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
|
|
@@ -49,7 +50,11 @@ import org.junit.Before;
|
|
|
import org.junit.Test;
|
|
|
|
|
|
import static org.mockito.Mockito.any;
|
|
|
+import static org.mockito.Mockito.atLeast;
|
|
|
import static org.mockito.Mockito.mock;
|
|
|
+import static org.mockito.Mockito.spy;
|
|
|
+import static org.mockito.Mockito.times;
|
|
|
+import static org.mockito.Mockito.verify;
|
|
|
import static org.mockito.Mockito.when;
|
|
|
|
|
|
/**
|
|
@@ -61,12 +66,20 @@ public class TestFederationRMFailoverProxyProvider {
|
|
|
private FederationStateStore stateStore;
|
|
|
private final String dummyCapability = "cap";
|
|
|
|
|
|
+ private GetClusterMetricsResponse threadResponse;
|
|
|
+
|
|
|
/**
 * Per-test fixture setup (shown here as a patch hunk: '-' lines are removed
 * and '+' lines are added by the change).
 *
 * After the change this creates a fresh YarnConfiguration, sets the
 * federation facade cache time-to-live to 60 * 60 seconds, wraps a
 * MemoryFederationStateStore in a Mockito spy so that calls to it can be
 * verified, initializes the store, and reinitializes the
 * FederationStateStoreFacade singleton with that store and configuration.
 *
 * @throws IOException declared by the method signature
 * @throws YarnException declared by the method signature
 */
@Before
|
|
|
public void setUp() throws IOException, YarnException {
|
|
|
conf = new YarnConfiguration();
|
|
|
- stateStore = new MemoryFederationStateStore();
|
|
|
+
|
|
|
+ // Configure Facade cache to use a very long ttl
|
|
|
+ conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 60 * 60);
|
|
|
+
|
|
|
+ stateStore = spy(new MemoryFederationStateStore());
|
|
|
stateStore.init(conf);
|
|
|
FederationStateStoreFacade.getInstance().reinitialize(stateStore, conf);
|
|
|
// Baseline for the call-count checks in the tests: reinitializing the
// facade must not have queried the store for sub-clusters yet.
+ verify(stateStore, times(0))
|
|
|
+ .getSubClusters(any(GetSubClustersInfoRequest.class));
|
|
|
}
|
|
|
|
|
|
@After
|
|
@@ -75,12 +88,25 @@ public class TestFederationRMFailoverProxyProvider {
|
|
|
stateStore = null;
|
|
|
}
|
|
|
|
|
|
/**
 * Runs the shared testProxyProvider scenario with the flush-cache flag set
 * to true; testProxyProvider writes that flag into
 * YarnConfiguration.FEDERATION_FLUSH_CACHE_FOR_RM_ADDR.
 * The patch replaces the plain @Test annotation with one that has a
 * 60000 ms timeout.
 *
 * @throws Exception propagated from testProxyProvider
 */
- @Test
|
|
|
+ @Test(timeout = 60000)
|
|
|
public void testFederationRMFailoverProxyProvider() throws Exception {
|
|
|
+ testProxyProvider(true);
|
|
|
+ }
|
|
|
+
|
|
|
/**
 * Runs the shared testProxyProvider scenario with the flush-cache flag set
 * to false. In that mode testProxyProvider itself forces a facade cache
 * flush (getSubCluster(subClusterId, true)) while the client call is in
 * flight, and then expects exactly two getSubClusters calls on the store.
 * The test is bounded by a 60000 ms timeout.
 *
 * @throws Exception propagated from testProxyProvider
 */
+ @Test (timeout=60000)
|
|
|
+ public void testFederationRMFailoverProxyProviderWithoutFlushFacadeCache()
|
|
|
+ throws Exception {
|
|
|
+ testProxyProvider(false);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void testProxyProvider(boolean facadeFlushCache) throws Exception {
|
|
|
final SubClusterId subClusterId = SubClusterId.newInstance("SC-1");
|
|
|
final MiniYARNCluster cluster = new MiniYARNCluster(
|
|
|
"testFederationRMFailoverProxyProvider", 3, 0, 1, 1);
|
|
|
|
|
|
+ conf.setBoolean(YarnConfiguration.FEDERATION_FLUSH_CACHE_FOR_RM_ADDR,
|
|
|
+ facadeFlushCache);
|
|
|
+
|
|
|
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
|
|
|
conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
|
|
|
conf.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1");
|
|
@@ -100,14 +126,20 @@ public class TestFederationRMFailoverProxyProvider {
|
|
|
// Transition rm3 to active;
|
|
|
makeRMActive(subClusterId, cluster, 2);
|
|
|
|
|
|
- ApplicationClientProtocol client = FederationProxyProviderUtil
|
|
|
+ final ApplicationClientProtocol client = FederationProxyProviderUtil
|
|
|
.createRMProxy(conf, ApplicationClientProtocol.class, subClusterId,
|
|
|
UserGroupInformation.getCurrentUser());
|
|
|
|
|
|
+ verify(stateStore, times(1))
|
|
|
+ .getSubClusters(any(GetSubClustersInfoRequest.class));
|
|
|
+
|
|
|
// client will retry until the rm becomes active.
|
|
|
GetClusterMetricsResponse response =
|
|
|
client.getClusterMetrics(GetClusterMetricsRequest.newInstance());
|
|
|
|
|
|
+ verify(stateStore, times(1))
|
|
|
+ .getSubClusters(any(GetSubClustersInfoRequest.class));
|
|
|
+
|
|
|
// validate response
|
|
|
checkResponse(response);
|
|
|
|
|
@@ -118,7 +150,50 @@ public class TestFederationRMFailoverProxyProvider {
|
|
|
|
|
|
// Transition rm2 to active;
|
|
|
makeRMActive(subClusterId, cluster, 1);
|
|
|
- response = client.getClusterMetrics(GetClusterMetricsRequest.newInstance());
|
|
|
+
|
|
|
+ verify(stateStore, times(1))
|
|
|
+ .getSubClusters(any(GetSubClustersInfoRequest.class));
|
|
|
+
|
|
|
+ threadResponse = null;
|
|
|
+ Thread thread = new Thread(new Runnable() {
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ try {
|
|
|
+ // In non flush cache case, we will be hitting the cache with old RM
|
|
|
+ // address and keep failing before the cache is flushed
|
|
|
+ threadResponse =
|
|
|
+ client.getClusterMetrics(GetClusterMetricsRequest.newInstance());
|
|
|
+ } catch (YarnException | IOException e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+ thread.start();
|
|
|
+
|
|
|
+ if (!facadeFlushCache) {
|
|
|
+ // Wait so that the thread has hopefully hit the old cached RM address
|
|
|
+ Thread.sleep(500);
|
|
|
+
|
|
|
+ // Should still be hitting cache
|
|
|
+ verify(stateStore, times(1))
|
|
|
+ .getSubClusters(any(GetSubClustersInfoRequest.class));
|
|
|
+
|
|
|
+ // Force flush cache, so that it will pick up the new RM address
|
|
|
+ FederationStateStoreFacade.getInstance().getSubCluster(subClusterId,
|
|
|
+ true);
|
|
|
+ }
|
|
|
+
|
|
|
+ // Wait for the thread to finish and grab result
|
|
|
+ thread.join();
|
|
|
+ response = threadResponse;
|
|
|
+
|
|
|
+ if (facadeFlushCache) {
|
|
|
+ verify(stateStore, atLeast(2))
|
|
|
+ .getSubClusters(any(GetSubClustersInfoRequest.class));
|
|
|
+ } else {
|
|
|
+ verify(stateStore, times(2))
|
|
|
+ .getSubClusters(any(GetSubClustersInfoRequest.class));
|
|
|
+ }
|
|
|
|
|
|
// validate response
|
|
|
checkResponse(response);
|