|
@@ -51,6 +51,7 @@ import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeConte
|
|
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
|
|
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
|
|
import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
|
|
import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
|
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|
|
|
+import org.apache.hadoop.hdfs.server.namenode.ha.RouterObserverReadProxyProvider;
|
|
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
|
|
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
|
|
import org.apache.hadoop.test.GenericTestUtils;
|
|
import org.apache.hadoop.test.GenericTestUtils;
|
|
import org.junit.jupiter.api.Assertions;
|
|
import org.junit.jupiter.api.Assertions;
|
|
@@ -58,10 +59,13 @@ import org.junit.jupiter.api.Test;
|
|
import org.junit.jupiter.api.AfterEach;
|
|
import org.junit.jupiter.api.AfterEach;
|
|
import org.junit.jupiter.api.BeforeEach;
|
|
import org.junit.jupiter.api.BeforeEach;
|
|
import org.junit.jupiter.api.Tag;
|
|
import org.junit.jupiter.api.Tag;
|
|
|
|
+import org.junit.jupiter.params.ParameterizedTest;
|
|
|
|
+import org.junit.jupiter.params.provider.EnumSource;
|
|
import org.junit.jupiter.api.TestInfo;
|
|
import org.junit.jupiter.api.TestInfo;
|
|
|
|
|
|
|
|
|
|
public class TestObserverWithRouter {
|
|
public class TestObserverWithRouter {
|
|
|
|
+ private static final int NUM_NAMESERVICES = 2;
|
|
private static final String SKIP_BEFORE_EACH_CLUSTER_STARTUP = "SkipBeforeEachClusterStartup";
|
|
private static final String SKIP_BEFORE_EACH_CLUSTER_STARTUP = "SkipBeforeEachClusterStartup";
|
|
private MiniRouterDFSCluster cluster;
|
|
private MiniRouterDFSCluster cluster;
|
|
private RouterContext routerContext;
|
|
private RouterContext routerContext;
|
|
@@ -102,7 +106,7 @@ public class TestObserverWithRouter {
|
|
.iterator()
|
|
.iterator()
|
|
.forEachRemaining(entry -> conf.set(entry.getKey(), entry.getValue()));
|
|
.forEachRemaining(entry -> conf.set(entry.getKey(), entry.getValue()));
|
|
}
|
|
}
|
|
- cluster = new MiniRouterDFSCluster(true, 2, numberOfNamenode);
|
|
|
|
|
|
+ cluster = new MiniRouterDFSCluster(true, NUM_NAMESERVICES, numberOfNamenode);
|
|
cluster.addNamenodeOverrides(conf);
|
|
cluster.addNamenodeOverrides(conf);
|
|
// Start NNs and DNs and wait until ready
|
|
// Start NNs and DNs and wait until ready
|
|
cluster.startCluster();
|
|
cluster.startCluster();
|
|
@@ -139,15 +143,34 @@ public class TestObserverWithRouter {
|
|
routerContext = cluster.getRandomRouter();
|
|
routerContext = cluster.getRandomRouter();
|
|
}
|
|
}
|
|
|
|
|
|
- private static Configuration getConfToEnableObserverReads() {
|
|
|
|
|
|
+ public enum ConfigSetting {
|
|
|
|
+ USE_NAMENODE_PROXY_FLAG,
|
|
|
|
+ USE_ROUTER_OBSERVER_READ_PROXY_PROVIDER
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private Configuration getConfToEnableObserverReads(ConfigSetting configSetting) {
|
|
Configuration conf = new Configuration();
|
|
Configuration conf = new Configuration();
|
|
- conf.setBoolean(HdfsClientConfigKeys.DFS_RBF_OBSERVER_READ_ENABLE, true);
|
|
|
|
|
|
+ switch (configSetting) {
|
|
|
|
+ case USE_NAMENODE_PROXY_FLAG:
|
|
|
|
+ conf.setBoolean(HdfsClientConfigKeys.DFS_RBF_OBSERVER_READ_ENABLE, true);
|
|
|
|
+ break;
|
|
|
|
+ case USE_ROUTER_OBSERVER_READ_PROXY_PROVIDER:
|
|
|
|
+ conf.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX +
|
|
|
|
+ "." +
|
|
|
|
+ routerContext.getRouter()
|
|
|
|
+ .getRpcServerAddress()
|
|
|
|
+ .getHostName(), RouterObserverReadProxyProvider.class.getName());
|
|
|
|
+ break;
|
|
|
|
+ default:
|
|
|
|
+ Assertions.fail("Unknown config setting: " + configSetting);
|
|
|
|
+ }
|
|
return conf;
|
|
return conf;
|
|
}
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
|
- public void testObserverRead() throws Exception {
|
|
|
|
- fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads());
|
|
|
|
|
|
+ @EnumSource(ConfigSetting.class)
|
|
|
|
+ @ParameterizedTest
|
|
|
|
+ public void testObserverRead(ConfigSetting configSetting) throws Exception {
|
|
|
|
+ fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting));
|
|
internalTestObserverRead();
|
|
internalTestObserverRead();
|
|
}
|
|
}
|
|
|
|
|
|
@@ -187,13 +210,15 @@ public class TestObserverWithRouter {
|
|
assertEquals("One call should be sent to observer", 1, rpcCountForObserver);
|
|
assertEquals("One call should be sent to observer", 1, rpcCountForObserver);
|
|
}
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
|
|
|
+ @EnumSource(ConfigSetting.class)
|
|
|
|
+ @ParameterizedTest
|
|
@Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
|
|
@Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
|
|
- public void testObserverReadWithoutFederatedStatePropagation() throws Exception {
|
|
|
|
|
|
+ public void testObserverReadWithoutFederatedStatePropagation(ConfigSetting configSetting)
|
|
|
|
+ throws Exception {
|
|
Configuration confOverrides = new Configuration(false);
|
|
Configuration confOverrides = new Configuration(false);
|
|
confOverrides.setInt(RBFConfigKeys.DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE, 0);
|
|
confOverrides.setInt(RBFConfigKeys.DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE, 0);
|
|
startUpCluster(2, confOverrides);
|
|
startUpCluster(2, confOverrides);
|
|
- fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads());
|
|
|
|
|
|
+ fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting));
|
|
List<? extends FederationNamenodeContext> namenodes = routerContext
|
|
List<? extends FederationNamenodeContext> namenodes = routerContext
|
|
.getRouter().getNamenodeResolver()
|
|
.getRouter().getNamenodeResolver()
|
|
.getNamenodesForNameserviceId(cluster.getNameservices().get(0), true);
|
|
.getNamenodesForNameserviceId(cluster.getNameservices().get(0), true);
|
|
@@ -216,14 +241,16 @@ public class TestObserverWithRouter {
|
|
assertEquals("No call should be sent to observer", 0, rpcCountForObserver);
|
|
assertEquals("No call should be sent to observer", 0, rpcCountForObserver);
|
|
}
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
|
|
|
+ @EnumSource(ConfigSetting.class)
|
|
|
|
+ @ParameterizedTest
|
|
@Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
|
|
@Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
|
|
- public void testDisablingObserverReadUsingNameserviceOverride() throws Exception {
|
|
|
|
|
|
+ public void testDisablingObserverReadUsingNameserviceOverride(ConfigSetting configSetting)
|
|
|
|
+ throws Exception {
|
|
// Disable observer reads using per-nameservice override
|
|
// Disable observer reads using per-nameservice override
|
|
Configuration confOverrides = new Configuration(false);
|
|
Configuration confOverrides = new Configuration(false);
|
|
confOverrides.set(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_OVERRIDES, "ns0");
|
|
confOverrides.set(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_OVERRIDES, "ns0");
|
|
startUpCluster(2, confOverrides);
|
|
startUpCluster(2, confOverrides);
|
|
- fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads());
|
|
|
|
|
|
+ fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting));
|
|
|
|
|
|
Path path = new Path("/testFile");
|
|
Path path = new Path("/testFile");
|
|
fileSystem.create(path).close();
|
|
fileSystem.create(path).close();
|
|
@@ -239,9 +266,10 @@ public class TestObserverWithRouter {
|
|
assertEquals("Zero calls should be sent to observer", 0, rpcCountForObserver);
|
|
assertEquals("Zero calls should be sent to observer", 0, rpcCountForObserver);
|
|
}
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
|
- public void testReadWhenObserverIsDown() throws Exception {
|
|
|
|
- fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads());
|
|
|
|
|
|
+ @EnumSource(ConfigSetting.class)
|
|
|
|
+ @ParameterizedTest
|
|
|
|
+ public void testReadWhenObserverIsDown(ConfigSetting configSetting) throws Exception {
|
|
|
|
+ fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting));
|
|
Path path = new Path("/testFile1");
|
|
Path path = new Path("/testFile1");
|
|
// Send Create call to active
|
|
// Send Create call to active
|
|
fileSystem.create(path).close();
|
|
fileSystem.create(path).close();
|
|
@@ -267,9 +295,10 @@ public class TestObserverWithRouter {
|
|
rpcCountForObserver);
|
|
rpcCountForObserver);
|
|
}
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
|
- public void testMultipleObserver() throws Exception {
|
|
|
|
- fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads());
|
|
|
|
|
|
+ @EnumSource(ConfigSetting.class)
|
|
|
|
+ @ParameterizedTest
|
|
|
|
+ public void testMultipleObserver(ConfigSetting configSetting) throws Exception {
|
|
|
|
+ fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting));
|
|
Path path = new Path("/testFile1");
|
|
Path path = new Path("/testFile1");
|
|
// Send Create call to active
|
|
// Send Create call to active
|
|
fileSystem.create(path).close();
|
|
fileSystem.create(path).close();
|
|
@@ -406,9 +435,10 @@ public class TestObserverWithRouter {
|
|
innerCluster.shutdown();
|
|
innerCluster.shutdown();
|
|
}
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
|
- public void testUnavailableObserverNN() throws Exception {
|
|
|
|
- fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads());
|
|
|
|
|
|
+ @EnumSource(ConfigSetting.class)
|
|
|
|
+ @ParameterizedTest
|
|
|
|
+ public void testUnavailableObserverNN(ConfigSetting configSetting) throws Exception {
|
|
|
|
+ fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting));
|
|
stopObserver(2);
|
|
stopObserver(2);
|
|
|
|
|
|
Path path = new Path("/testFile");
|
|
Path path = new Path("/testFile");
|
|
@@ -442,9 +472,10 @@ public class TestObserverWithRouter {
|
|
assertTrue("There must be unavailable namenodes", hasUnavailable);
|
|
assertTrue("There must be unavailable namenodes", hasUnavailable);
|
|
}
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
|
- public void testRouterMsync() throws Exception {
|
|
|
|
- fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads());
|
|
|
|
|
|
+ @EnumSource(ConfigSetting.class)
|
|
|
|
+ @ParameterizedTest
|
|
|
|
+ public void testRouterMsync(ConfigSetting configSetting) throws Exception {
|
|
|
|
+ fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting));
|
|
Path path = new Path("/testFile");
|
|
Path path = new Path("/testFile");
|
|
|
|
|
|
// Send Create call to active
|
|
// Send Create call to active
|
|
@@ -464,9 +495,10 @@ public class TestObserverWithRouter {
|
|
rpcCountForActive);
|
|
rpcCountForActive);
|
|
}
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
|
- public void testSingleRead() throws Exception {
|
|
|
|
- fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads());
|
|
|
|
|
|
+ @EnumSource(ConfigSetting.class)
|
|
|
|
+ @ParameterizedTest
|
|
|
|
+ public void testSingleRead(ConfigSetting configSetting) throws Exception {
|
|
|
|
+ fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting));
|
|
List<? extends FederationNamenodeContext> namenodes = routerContext
|
|
List<? extends FederationNamenodeContext> namenodes = routerContext
|
|
.getRouter().getNamenodeResolver()
|
|
.getRouter().getNamenodeResolver()
|
|
.getNamenodesForNameserviceId(cluster.getNameservices().get(0), true);
|
|
.getNamenodesForNameserviceId(cluster.getNameservices().get(0), true);
|
|
@@ -554,10 +586,11 @@ public class TestObserverWithRouter {
|
|
Assertions.assertEquals(10L, latestFederateState.get("ns0"));
|
|
Assertions.assertEquals(10L, latestFederateState.get("ns0"));
|
|
}
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
|
- public void testStateIdProgressionInRouter() throws Exception {
|
|
|
|
|
|
+ @EnumSource(ConfigSetting.class)
|
|
|
|
+ @ParameterizedTest
|
|
|
|
+ public void testStateIdProgressionInRouter(ConfigSetting configSetting) throws Exception {
|
|
Path rootPath = new Path("/");
|
|
Path rootPath = new Path("/");
|
|
- fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads());
|
|
|
|
|
|
+ fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting));
|
|
RouterStateIdContext routerStateIdContext = routerContext
|
|
RouterStateIdContext routerStateIdContext = routerContext
|
|
.getRouterRpcServer()
|
|
.getRouterRpcServer()
|
|
.getRouterStateIdContext();
|
|
.getRouterStateIdContext();
|
|
@@ -570,9 +603,10 @@ public class TestObserverWithRouter {
|
|
assertEquals("Router's shared should have progressed.", 21, namespaceStateId.get());
|
|
assertEquals("Router's shared should have progressed.", 21, namespaceStateId.get());
|
|
}
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
|
|
|
+ @EnumSource(ConfigSetting.class)
|
|
|
|
+ @ParameterizedTest
|
|
@Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
|
|
@Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
|
|
- public void testSharedStateInRouterStateIdContext() throws Exception {
|
|
|
|
|
|
+ public void testSharedStateInRouterStateIdContext(ConfigSetting configSetting) throws Exception {
|
|
Path rootPath = new Path("/");
|
|
Path rootPath = new Path("/");
|
|
long cleanupPeriodMs = 1000;
|
|
long cleanupPeriodMs = 1000;
|
|
|
|
|
|
@@ -580,7 +614,7 @@ public class TestObserverWithRouter {
|
|
conf.setLong(RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_CLEAN, cleanupPeriodMs);
|
|
conf.setLong(RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_CLEAN, cleanupPeriodMs);
|
|
conf.setLong(RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_CLEAN_MS, cleanupPeriodMs / 10);
|
|
conf.setLong(RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_CLEAN_MS, cleanupPeriodMs / 10);
|
|
startUpCluster(1, conf);
|
|
startUpCluster(1, conf);
|
|
- fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads());
|
|
|
|
|
|
+ fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting));
|
|
RouterStateIdContext routerStateIdContext = routerContext.getRouterRpcServer()
|
|
RouterStateIdContext routerStateIdContext = routerContext.getRouterRpcServer()
|
|
.getRouterStateIdContext();
|
|
.getRouterStateIdContext();
|
|
|
|
|
|
@@ -616,9 +650,10 @@ public class TestObserverWithRouter {
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
- @Test
|
|
|
|
|
|
+ @EnumSource(ConfigSetting.class)
|
|
|
|
+ @ParameterizedTest
|
|
@Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
|
|
@Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
|
|
- public void testRouterStateIdContextCleanup() throws Exception {
|
|
|
|
|
|
+ public void testRouterStateIdContextCleanup(ConfigSetting configSetting) throws Exception {
|
|
Path rootPath = new Path("/");
|
|
Path rootPath = new Path("/");
|
|
long recordExpiry = TimeUnit.SECONDS.toMillis(1);
|
|
long recordExpiry = TimeUnit.SECONDS.toMillis(1);
|
|
|
|
|
|
@@ -626,7 +661,7 @@ public class TestObserverWithRouter {
|
|
confOverride.setLong(RBFConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS, recordExpiry);
|
|
confOverride.setLong(RBFConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS, recordExpiry);
|
|
|
|
|
|
startUpCluster(1, confOverride);
|
|
startUpCluster(1, confOverride);
|
|
- fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads());
|
|
|
|
|
|
+ fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting));
|
|
RouterStateIdContext routerStateIdContext = routerContext.getRouterRpcServer()
|
|
RouterStateIdContext routerStateIdContext = routerContext.getRouterRpcServer()
|
|
.getRouterStateIdContext();
|
|
.getRouterStateIdContext();
|
|
|
|
|
|
@@ -645,9 +680,11 @@ public class TestObserverWithRouter {
|
|
assertTrue(namespace2.isEmpty());
|
|
assertTrue(namespace2.isEmpty());
|
|
}
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
|
|
|
+ @EnumSource(ConfigSetting.class)
|
|
|
|
+ @ParameterizedTest
|
|
@Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
|
|
@Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
|
|
- public void testPeriodicStateRefreshUsingActiveNamenode() throws Exception {
|
|
|
|
|
|
+ public void testPeriodicStateRefreshUsingActiveNamenode(ConfigSetting configSetting)
|
|
|
|
+ throws Exception {
|
|
Path rootPath = new Path("/");
|
|
Path rootPath = new Path("/");
|
|
|
|
|
|
Configuration confOverride = new Configuration(false);
|
|
Configuration confOverride = new Configuration(false);
|
|
@@ -655,7 +692,7 @@ public class TestObserverWithRouter {
|
|
confOverride.set(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, "3s");
|
|
confOverride.set(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, "3s");
|
|
startUpCluster(1, confOverride);
|
|
startUpCluster(1, confOverride);
|
|
|
|
|
|
- fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads());
|
|
|
|
|
|
+ fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting));
|
|
fileSystem.listStatus(rootPath);
|
|
fileSystem.listStatus(rootPath);
|
|
int initialLengthOfRootListing = fileSystem.listStatus(rootPath).length;
|
|
int initialLengthOfRootListing = fileSystem.listStatus(rootPath).length;
|
|
|
|
|
|
@@ -682,4 +719,156 @@ public class TestObserverWithRouter {
|
|
assertEquals("List-status should show newly created directories.",
|
|
assertEquals("List-status should show newly created directories.",
|
|
initialLengthOfRootListing + 10, rootFolderAfterMkdir.length);
|
|
initialLengthOfRootListing + 10, rootFolderAfterMkdir.length);
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ @EnumSource(ConfigSetting.class)
|
|
|
|
+ @ParameterizedTest
|
|
|
|
+ public void testAutoMsyncEqualsZero(ConfigSetting configSetting) throws Exception {
|
|
|
|
+ Configuration clientConfiguration = getConfToEnableObserverReads(configSetting);
|
|
|
|
+ clientConfiguration.setLong("dfs.client.failover.observer.auto-msync-period." +
|
|
|
|
+ routerContext.getRouter().getRpcServerAddress().getHostName(), 0);
|
|
|
|
+ fileSystem = routerContext.getFileSystem(clientConfiguration);
|
|
|
|
+
|
|
|
|
+ List<? extends FederationNamenodeContext> namenodes = routerContext
|
|
|
|
+ .getRouter().getNamenodeResolver()
|
|
|
|
+ .getNamenodesForNameserviceId(cluster.getNameservices().get(0), true);
|
|
|
|
+ assertEquals("First namenode should be observer", namenodes.get(0).getState(),
|
|
|
|
+ FederationNamenodeServiceState.OBSERVER);
|
|
|
|
+ Path path = new Path("/");
|
|
|
|
+
|
|
|
|
+ long rpcCountForActive;
|
|
|
|
+ long rpcCountForObserver;
|
|
|
|
+
|
|
|
|
+ // Send read requests
|
|
|
|
+ int numListings = 15;
|
|
|
|
+ for (int i = 0; i < numListings; i++) {
|
|
|
|
+ fileSystem.listFiles(path, false);
|
|
|
|
+ }
|
|
|
|
+ fileSystem.close();
|
|
|
|
+
|
|
|
|
+ rpcCountForActive = routerContext.getRouter().getRpcServer()
|
|
|
|
+ .getRPCMetrics().getActiveProxyOps();
|
|
|
|
+
|
|
|
|
+ rpcCountForObserver = routerContext.getRouter().getRpcServer()
|
|
|
|
+ .getRPCMetrics().getObserverProxyOps();
|
|
|
|
+
|
|
|
|
+ switch (configSetting) {
|
|
|
|
+ case USE_NAMENODE_PROXY_FLAG:
|
|
|
|
+ // First read goes to active.
|
|
|
|
+ assertEquals("Calls sent to the active", 1, rpcCountForActive);
|
|
|
|
+ // The rest of the reads are sent to the observer.
|
|
|
|
+ assertEquals("Reads sent to observer", numListings - 1, rpcCountForObserver);
|
|
|
|
+ break;
|
|
|
|
+ case USE_ROUTER_OBSERVER_READ_PROXY_PROVIDER:
|
|
|
|
+ // An msync is sent to each active namenode for each read.
|
|
|
|
+ // Total msyncs will be (numListings * num_of_nameservices).
|
|
|
|
+ assertEquals("Msyncs sent to the active namenodes",
|
|
|
|
+ NUM_NAMESERVICES * numListings, rpcCountForActive);
|
|
|
|
+ // All reads should be sent of the observer.
|
|
|
|
+ assertEquals("Reads sent to observer", numListings, rpcCountForObserver);
|
|
|
|
+ break;
|
|
|
|
+ default:
|
|
|
|
+ Assertions.fail("Unknown config setting: " + configSetting);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @EnumSource(ConfigSetting.class)
|
|
|
|
+ @ParameterizedTest
|
|
|
|
+ public void testAutoMsyncNonZero(ConfigSetting configSetting) throws Exception {
|
|
|
|
+ Configuration clientConfiguration = getConfToEnableObserverReads(configSetting);
|
|
|
|
+ clientConfiguration.setLong("dfs.client.failover.observer.auto-msync-period." +
|
|
|
|
+ routerContext.getRouter().getRpcServerAddress().getHostName(), 3000);
|
|
|
|
+ fileSystem = routerContext.getFileSystem(clientConfiguration);
|
|
|
|
+
|
|
|
|
+ List<? extends FederationNamenodeContext> namenodes = routerContext
|
|
|
|
+ .getRouter().getNamenodeResolver()
|
|
|
|
+ .getNamenodesForNameserviceId(cluster.getNameservices().get(0), true);
|
|
|
|
+ assertEquals("First namenode should be observer", namenodes.get(0).getState(),
|
|
|
|
+ FederationNamenodeServiceState.OBSERVER);
|
|
|
|
+ Path path = new Path("/");
|
|
|
|
+
|
|
|
|
+ long rpcCountForActive;
|
|
|
|
+ long rpcCountForObserver;
|
|
|
|
+
|
|
|
|
+ fileSystem.listFiles(path, false);
|
|
|
|
+ fileSystem.listFiles(path, false);
|
|
|
|
+ Thread.sleep(5000);
|
|
|
|
+ fileSystem.listFiles(path, false);
|
|
|
|
+ fileSystem.close();
|
|
|
|
+
|
|
|
|
+ rpcCountForActive = routerContext.getRouter().getRpcServer()
|
|
|
|
+ .getRPCMetrics().getActiveProxyOps();
|
|
|
|
+
|
|
|
|
+ rpcCountForObserver = routerContext.getRouter().getRpcServer()
|
|
|
|
+ .getRPCMetrics().getObserverProxyOps();
|
|
|
|
+
|
|
|
|
+ switch (configSetting) {
|
|
|
|
+ case USE_NAMENODE_PROXY_FLAG:
|
|
|
|
+ // First read goes to active.
|
|
|
|
+ assertEquals("Calls sent to the active", 1, rpcCountForActive);
|
|
|
|
+ // The rest of the reads are sent to the observer.
|
|
|
|
+ assertEquals("Reads sent to observer", 2, rpcCountForObserver);
|
|
|
|
+ break;
|
|
|
|
+ case USE_ROUTER_OBSERVER_READ_PROXY_PROVIDER:
|
|
|
|
+ // 4 msyncs expected. 2 for the first read, and 2 for the third read
|
|
|
|
+ // after the auto-msync period has elapsed during the sleep.
|
|
|
|
+ assertEquals("Msyncs sent to the active namenodes",
|
|
|
|
+ 4, rpcCountForActive);
|
|
|
|
+ // All three reads should be sent of the observer.
|
|
|
|
+ assertEquals("Reads sent to observer", 3, rpcCountForObserver);
|
|
|
|
+ break;
|
|
|
|
+ default:
|
|
|
|
+ Assertions.fail("Unknown config setting: " + configSetting);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @EnumSource(ConfigSetting.class)
|
|
|
|
+ @ParameterizedTest
|
|
|
|
+ public void testThatWriteDoesntBypassNeedForMsync(ConfigSetting configSetting) throws Exception {
|
|
|
|
+ Configuration clientConfiguration = getConfToEnableObserverReads(configSetting);
|
|
|
|
+ clientConfiguration.setLong("dfs.client.failover.observer.auto-msync-period." +
|
|
|
|
+ routerContext.getRouter().getRpcServerAddress().getHostName(), 3000);
|
|
|
|
+ fileSystem = routerContext.getFileSystem(clientConfiguration);
|
|
|
|
+
|
|
|
|
+ List<? extends FederationNamenodeContext> namenodes = routerContext
|
|
|
|
+ .getRouter().getNamenodeResolver()
|
|
|
|
+ .getNamenodesForNameserviceId(cluster.getNameservices().get(0), true);
|
|
|
|
+ assertEquals("First namenode should be observer", namenodes.get(0).getState(),
|
|
|
|
+ FederationNamenodeServiceState.OBSERVER);
|
|
|
|
+ Path path = new Path("/");
|
|
|
|
+
|
|
|
|
+ long rpcCountForActive;
|
|
|
|
+ long rpcCountForObserver;
|
|
|
|
+
|
|
|
|
+ fileSystem.listFiles(path, false);
|
|
|
|
+ Thread.sleep(5000);
|
|
|
|
+ fileSystem.mkdirs(new Path(path, "mkdirLocation"));
|
|
|
|
+ fileSystem.listFiles(path, false);
|
|
|
|
+ fileSystem.close();
|
|
|
|
+
|
|
|
|
+ rpcCountForActive = routerContext.getRouter().getRpcServer()
|
|
|
|
+ .getRPCMetrics().getActiveProxyOps();
|
|
|
|
+
|
|
|
|
+ rpcCountForObserver = routerContext.getRouter().getRpcServer()
|
|
|
|
+ .getRPCMetrics().getObserverProxyOps();
|
|
|
|
+
|
|
|
|
+ switch (configSetting) {
|
|
|
|
+ case USE_NAMENODE_PROXY_FLAG:
|
|
|
|
+ // First listing and mkdir go to the active.
|
|
|
|
+ assertEquals("Calls sent to the active namenodes", 2, rpcCountForActive);
|
|
|
|
+ // Second listing goes to the observer.
|
|
|
|
+ assertEquals("Read sent to observer", 1, rpcCountForObserver);
|
|
|
|
+ break;
|
|
|
|
+ case USE_ROUTER_OBSERVER_READ_PROXY_PROVIDER:
|
|
|
|
+ // 5 calls to the active namenodes expected. 4 msync and a mkdir.
|
|
|
|
+ // Each of the 2 reads results in an msync to 2 nameservices.
|
|
|
|
+ // The mkdir also goes to the active.
|
|
|
|
+ assertEquals("Calls sent to the active namenodes",
|
|
|
|
+ 5, rpcCountForActive);
|
|
|
|
+ // Both reads should be sent of the observer.
|
|
|
|
+ assertEquals("Reads sent to observer", 2, rpcCountForObserver);
|
|
|
|
+ break;
|
|
|
|
+ default:
|
|
|
|
+ Assertions.fail("Unknown config setting: " + configSetting);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
}
|
|
}
|