|
@@ -17,6 +17,9 @@
|
|
|
*/
|
|
|
package org.apache.hadoop.hdfs.server.federation.router;
|
|
|
|
|
|
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
|
|
|
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
|
|
|
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICES;
|
|
|
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
|
|
|
import static org.junit.Assert.assertEquals;
|
|
|
import static org.junit.Assert.assertNotEquals;
|
|
@@ -41,6 +44,7 @@ import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.hdfs.ClientGSIContext;
|
|
|
import org.apache.hadoop.hdfs.DFSClient;
|
|
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
|
+import org.apache.hadoop.hdfs.DistributedFileSystem;
|
|
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
|
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RouterFederatedStateProto;
|
|
|
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
|
|
@@ -52,6 +56,7 @@ import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeConte
|
|
|
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
|
|
|
import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|
|
+import org.apache.hadoop.hdfs.server.namenode.ha.RouterObserverReadConfiguredFailoverProxyProvider;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.ha.RouterObserverReadProxyProvider;
|
|
|
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
|
|
|
import org.apache.hadoop.test.GenericTestUtils;
|
|
@@ -72,6 +77,10 @@ public class TestObserverWithRouter {
|
|
|
private RouterContext routerContext;
|
|
|
private FileSystem fileSystem;
|
|
|
|
|
|
+ private static final String ROUTER_NS_ID = "router-service";
|
|
|
+ private static final String AUTO_MSYNC_PERIOD_KEY_PREFIX =
|
|
|
+ "dfs.client.failover.observer.auto-msync-period";
|
|
|
+
|
|
|
@BeforeEach
|
|
|
void init(TestInfo info) throws Exception {
|
|
|
if (info.getTags().contains(SKIP_BEFORE_EACH_CLUSTER_STARTUP)) {
|
|
@@ -146,7 +155,8 @@ public class TestObserverWithRouter {
|
|
|
|
|
|
public enum ConfigSetting {
|
|
|
USE_NAMENODE_PROXY_FLAG,
|
|
|
- USE_ROUTER_OBSERVER_READ_PROXY_PROVIDER
|
|
|
+ USE_ROUTER_OBSERVER_READ_PROXY_PROVIDER,
|
|
|
+ USE_ROUTER_OBSERVER_READ_CONFIGURED_FAILOVER_PROXY_PROVIDER
|
|
|
}
|
|
|
|
|
|
private Configuration getConfToEnableObserverReads(ConfigSetting configSetting) {
|
|
@@ -162,6 +172,16 @@ public class TestObserverWithRouter {
|
|
|
.getRpcServerAddress()
|
|
|
.getHostName(), RouterObserverReadProxyProvider.class.getName());
|
|
|
break;
|
|
|
+ case USE_ROUTER_OBSERVER_READ_CONFIGURED_FAILOVER_PROXY_PROVIDER:
|
|
|
+ // HA configs
|
|
|
+ conf.set(DFS_NAMESERVICES, ROUTER_NS_ID);
|
|
|
+ conf.set(DFS_HA_NAMENODES_KEY_PREFIX + "." + ROUTER_NS_ID, "router1");
|
|
|
+ conf.set(DFS_NAMENODE_RPC_ADDRESS_KEY+ "." + ROUTER_NS_ID + ".router1",
|
|
|
+ routerContext.getFileSystemURI().toString());
|
|
|
+ DistributedFileSystem.setDefaultUri(conf, "hdfs://" + ROUTER_NS_ID);
|
|
|
+ conf.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX + "." + ROUTER_NS_ID,
|
|
|
+ RouterObserverReadConfiguredFailoverProxyProvider.class.getName());
|
|
|
+ break;
|
|
|
default:
|
|
|
Assertions.fail("Unknown config setting: " + configSetting);
|
|
|
}
|
|
@@ -758,8 +778,10 @@ public class TestObserverWithRouter {
|
|
|
@ParameterizedTest
|
|
|
public void testAutoMsyncEqualsZero(ConfigSetting configSetting) throws Exception {
|
|
|
Configuration clientConfiguration = getConfToEnableObserverReads(configSetting);
|
|
|
- clientConfiguration.setLong("dfs.client.failover.observer.auto-msync-period." +
|
|
|
- routerContext.getRouter().getRpcServerAddress().getHostName(), 0);
|
|
|
+ String configKeySuffix =
|
|
|
+ configSetting == ConfigSetting.USE_ROUTER_OBSERVER_READ_CONFIGURED_FAILOVER_PROXY_PROVIDER ?
|
|
|
+ ROUTER_NS_ID : routerContext.getRouter().getRpcServerAddress().getHostName();
|
|
|
+ clientConfiguration.setLong(AUTO_MSYNC_PERIOD_KEY_PREFIX + "." + configKeySuffix, 0);
|
|
|
fileSystem = routerContext.getFileSystem(clientConfiguration);
|
|
|
|
|
|
List<? extends FederationNamenodeContext> namenodes = routerContext
|
|
@@ -793,6 +815,7 @@ public class TestObserverWithRouter {
|
|
|
assertEquals("Reads sent to observer", numListings - 1, rpcCountForObserver);
|
|
|
break;
|
|
|
case USE_ROUTER_OBSERVER_READ_PROXY_PROVIDER:
|
|
|
+ case USE_ROUTER_OBSERVER_READ_CONFIGURED_FAILOVER_PROXY_PROVIDER:
|
|
|
// An msync is sent to each active namenode for each read.
|
|
|
// Total msyncs will be (numListings * num_of_nameservices).
|
|
|
assertEquals("Msyncs sent to the active namenodes",
|
|
@@ -809,8 +832,10 @@ public class TestObserverWithRouter {
|
|
|
@ParameterizedTest
|
|
|
public void testAutoMsyncNonZero(ConfigSetting configSetting) throws Exception {
|
|
|
Configuration clientConfiguration = getConfToEnableObserverReads(configSetting);
|
|
|
- clientConfiguration.setLong("dfs.client.failover.observer.auto-msync-period." +
|
|
|
- routerContext.getRouter().getRpcServerAddress().getHostName(), 3000);
|
|
|
+ String configKeySuffix =
|
|
|
+ configSetting == ConfigSetting.USE_ROUTER_OBSERVER_READ_CONFIGURED_FAILOVER_PROXY_PROVIDER ?
|
|
|
+ ROUTER_NS_ID : routerContext.getRouter().getRpcServerAddress().getHostName();
|
|
|
+ clientConfiguration.setLong(AUTO_MSYNC_PERIOD_KEY_PREFIX + "." + configKeySuffix, 3000);
|
|
|
fileSystem = routerContext.getFileSystem(clientConfiguration);
|
|
|
|
|
|
List<? extends FederationNamenodeContext> namenodes = routerContext
|
|
@@ -843,6 +868,7 @@ public class TestObserverWithRouter {
|
|
|
assertEquals("Reads sent to observer", 2, rpcCountForObserver);
|
|
|
break;
|
|
|
case USE_ROUTER_OBSERVER_READ_PROXY_PROVIDER:
|
|
|
+ case USE_ROUTER_OBSERVER_READ_CONFIGURED_FAILOVER_PROXY_PROVIDER:
|
|
|
// 4 msyncs expected. 2 for the first read, and 2 for the third read
|
|
|
// after the auto-msync period has elapsed during the sleep.
|
|
|
assertEquals("Msyncs sent to the active namenodes",
|
|
@@ -859,8 +885,10 @@ public class TestObserverWithRouter {
|
|
|
@ParameterizedTest
|
|
|
public void testThatWriteDoesntBypassNeedForMsync(ConfigSetting configSetting) throws Exception {
|
|
|
Configuration clientConfiguration = getConfToEnableObserverReads(configSetting);
|
|
|
- clientConfiguration.setLong("dfs.client.failover.observer.auto-msync-period." +
|
|
|
- routerContext.getRouter().getRpcServerAddress().getHostName(), 3000);
|
|
|
+ String configKeySuffix =
|
|
|
+ configSetting == ConfigSetting.USE_ROUTER_OBSERVER_READ_CONFIGURED_FAILOVER_PROXY_PROVIDER ?
|
|
|
+ ROUTER_NS_ID : routerContext.getRouter().getRpcServerAddress().getHostName();
|
|
|
+ clientConfiguration.setLong(AUTO_MSYNC_PERIOD_KEY_PREFIX + "." + configKeySuffix, 3000);
|
|
|
fileSystem = routerContext.getFileSystem(clientConfiguration);
|
|
|
|
|
|
List<? extends FederationNamenodeContext> namenodes = routerContext
|
|
@@ -893,6 +921,7 @@ public class TestObserverWithRouter {
|
|
|
assertEquals("Read sent to observer", 1, rpcCountForObserver);
|
|
|
break;
|
|
|
case USE_ROUTER_OBSERVER_READ_PROXY_PROVIDER:
|
|
|
+ case USE_ROUTER_OBSERVER_READ_CONFIGURED_FAILOVER_PROXY_PROVIDER:
|
|
|
// 5 calls to the active namenodes expected. 4 msync and a mkdir.
|
|
|
// Each of the 2 reads results in an msync to 2 nameservices.
|
|
|
// The mkdir also goes to the active.
|