|
@@ -0,0 +1,152 @@
|
|
|
+/**
|
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one
|
|
|
+ * or more contributor license agreements. See the NOTICE file
|
|
|
+ * distributed with this work for additional information
|
|
|
+ * regarding copyright ownership. The ASF licenses this file
|
|
|
+ * to you under the Apache License, Version 2.0 (the
|
|
|
+ * "License"); you may not use this file except in compliance
|
|
|
+ * with the License. You may obtain a copy of the License at
|
|
|
+ *
|
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
|
+ *
|
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
+ * See the License for the specific language governing permissions and
|
|
|
+ * limitations under the License.
|
|
|
+ */
|
|
|
+package org.apache.hadoop.hdfs.server.federation.router.async;
|
|
|
+
|
|
|
+import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.fs.FsServerDefaults;
|
|
|
+import org.apache.hadoop.fs.Path;
|
|
|
+import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
|
+import org.apache.hadoop.hdfs.DistributedFileSystem;
|
|
|
+import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|
|
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
|
|
|
+import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
|
|
|
+import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
|
|
|
+import org.apache.hadoop.hdfs.server.federation.resolver.MultipleDestinationMountTableResolver;
|
|
|
+import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
|
|
|
+import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod;
|
|
|
+import org.apache.hadoop.hdfs.server.federation.router.RouterClientProtocol;
|
|
|
+import org.apache.hadoop.hdfs.server.federation.router.RouterQuotaUsage;
|
|
|
+import org.apache.hadoop.hdfs.server.federation.router.TestRouterRPCMultipleDestinationMountTableResolver;
|
|
|
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
|
|
|
+import org.junit.BeforeClass;
|
|
|
+import org.junit.Test;
|
|
|
+
|
|
|
+import java.io.IOException;
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.Map;
|
|
|
+
|
|
|
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_ENABLE_KEY;
|
|
|
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
|
|
|
+import static org.junit.Assert.assertFalse;
|
|
|
+import static org.junit.Assert.assertNotNull;
|
|
|
+import static org.junit.Assert.assertTrue;
|
|
|
+
|
|
|
+/**
|
|
|
+ * Tests router async rpc with multiple destination mount table resolver.
|
|
|
+ */
|
|
|
+public class TestRouterAsyncRPCMultipleDestinationMountTableResolver extends
|
|
|
+ TestRouterRPCMultipleDestinationMountTableResolver {
|
|
|
+
|
|
|
+ @BeforeClass
|
|
|
+ public static void setUp() throws Exception {
|
|
|
+
|
|
|
+ // Build and start a federated cluster.
|
|
|
+ cluster = new StateStoreDFSCluster(false, 3,
|
|
|
+ MultipleDestinationMountTableResolver.class);
|
|
|
+ Configuration routerConf =
|
|
|
+ new RouterConfigBuilder().stateStore().admin().quota().rpc().build();
|
|
|
+ routerConf.setBoolean(DFS_ROUTER_ASYNC_RPC_ENABLE_KEY, true);
|
|
|
+
|
|
|
+ Configuration hdfsConf = new Configuration(false);
|
|
|
+ hdfsConf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
|
|
|
+
|
|
|
+ cluster.addRouterOverrides(routerConf);
|
|
|
+ cluster.addNamenodeOverrides(hdfsConf);
|
|
|
+ cluster.startCluster();
|
|
|
+ cluster.startRouters();
|
|
|
+ cluster.waitClusterUp();
|
|
|
+
|
|
|
+ routerContext = cluster.getRandomRouter();
|
|
|
+ resolver =
|
|
|
+ (MountTableResolver) routerContext.getRouter().getSubclusterResolver();
|
|
|
+ nnFs0 = (DistributedFileSystem) cluster
|
|
|
+ .getNamenode(cluster.getNameservices().get(0), null).getFileSystem();
|
|
|
+ nnFs1 = (DistributedFileSystem) cluster
|
|
|
+ .getNamenode(cluster.getNameservices().get(1), null).getFileSystem();
|
|
|
+ nnFs2 = (DistributedFileSystem) cluster
|
|
|
+ .getNamenode(cluster.getNameservices().get(2), null).getFileSystem();
|
|
|
+ routerFs = (DistributedFileSystem) routerContext.getFileSystem();
|
|
|
+ rpcServer =routerContext.getRouter().getRpcServer();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ @Test
|
|
|
+ public void testInvokeAtAvailableNs() throws IOException {
|
|
|
+ // Create a mount point with multiple destinations.
|
|
|
+ Path path = new Path("/testInvokeAtAvailableNs");
|
|
|
+ Map<String, String> destMap = new HashMap<>();
|
|
|
+ destMap.put("ns0", "/testInvokeAtAvailableNs");
|
|
|
+ destMap.put("ns1", "/testInvokeAtAvailableNs");
|
|
|
+ nnFs0.mkdirs(path);
|
|
|
+ nnFs1.mkdirs(path);
|
|
|
+ MountTable addEntry =
|
|
|
+ MountTable.newInstance("/testInvokeAtAvailableNs", destMap);
|
|
|
+ addEntry.setQuota(new RouterQuotaUsage.Builder().build());
|
|
|
+ addEntry.setDestOrder(DestinationOrder.RANDOM);
|
|
|
+ addEntry.setFaultTolerant(true);
|
|
|
+ assertTrue(addMountTable(addEntry));
|
|
|
+
|
|
|
+ // Make one subcluster unavailable.
|
|
|
+ MiniDFSCluster dfsCluster = cluster.getCluster();
|
|
|
+ dfsCluster.shutdownNameNode(0);
|
|
|
+ dfsCluster.shutdownNameNode(1);
|
|
|
+ try {
|
|
|
+ // Verify that #invokeAtAvailableNs works by calling #getServerDefaults.
|
|
|
+ RemoteMethod method = new RemoteMethod("getServerDefaults");
|
|
|
+ FsServerDefaults serverDefaults = null;
|
|
|
+ rpcServer.invokeAtAvailableNsAsync(method, FsServerDefaults.class);
|
|
|
+ try {
|
|
|
+ serverDefaults = syncReturn(FsServerDefaults.class);
|
|
|
+ } catch (Exception e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ assertNotNull(serverDefaults);
|
|
|
+ } finally {
|
|
|
+ dfsCluster.restartNameNode(0);
|
|
|
+ dfsCluster.restartNameNode(1);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ @Test
|
|
|
+ public void testIsMultiDestDir() throws Exception {
|
|
|
+ RouterClientProtocol client =
|
|
|
+ routerContext.getRouter().getRpcServer().getClientProtocolModule();
|
|
|
+ setupOrderMountPath(DestinationOrder.HASH_ALL);
|
|
|
+ // Should be true only for directory and false for all other cases.
|
|
|
+ client.isMultiDestDirectory("/mount/dir");
|
|
|
+ assertTrue(syncReturn(boolean.class));
|
|
|
+ client.isMultiDestDirectory("/mount/nodir");
|
|
|
+ assertFalse(syncReturn(boolean.class));
|
|
|
+ client.isMultiDestDirectory("/mount/dir/file");
|
|
|
+ assertFalse(syncReturn(boolean.class));
|
|
|
+ routerFs.createSymlink(new Path("/mount/dir/file"),
|
|
|
+ new Path("/mount/dir/link"), true);
|
|
|
+ client.isMultiDestDirectory("/mount/dir/link");
|
|
|
+ assertFalse(syncReturn(boolean.class));
|
|
|
+ routerFs.createSymlink(new Path("/mount/dir/dir"),
|
|
|
+ new Path("/mount/dir/linkDir"), true);
|
|
|
+ client.isMultiDestDirectory("/mount/dir/linkDir");
|
|
|
+ assertFalse(syncReturn(boolean.class));
|
|
|
+ resetTestEnvironment();
|
|
|
+ // Test single directory destination. Should be false for the directory.
|
|
|
+ setupOrderMountPath(DestinationOrder.HASH);
|
|
|
+ client.isMultiDestDirectory("/mount/dir");
|
|
|
+ assertFalse(syncReturn(boolean.class));
|
|
|
+ }
|
|
|
+}
|