|
@@ -0,0 +1,394 @@
|
|
|
+/**
|
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one
|
|
|
+ * or more contributor license agreements. See the NOTICE file
|
|
|
+ * distributed with this work for additional information
|
|
|
+ * regarding copyright ownership. The ASF licenses this file
|
|
|
+ * to you under the Apache License, Version 2.0 (the
|
|
|
+ * "License"); you may not use this file except in compliance
|
|
|
+ * with the License. You may obtain a copy of the License at
|
|
|
+ *
|
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
|
+ *
|
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
+ * See the License for the specific language governing permissions and
|
|
|
+ * limitations under the License.
|
|
|
+ */
|
|
|
+package org.apache.hadoop.hdfs.server.federation.router;
|
|
|
+
|
|
|
+import static org.junit.Assert.assertArrayEquals;
|
|
|
+import static org.junit.Assert.assertEquals;
|
|
|
+import static org.junit.Assert.assertFalse;
|
|
|
+import static org.junit.Assert.assertNull;
|
|
|
+import static org.junit.Assert.assertTrue;
|
|
|
+
|
|
|
+import java.io.IOException;
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.Map;
|
|
|
+
|
|
|
+import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.fs.Path;
|
|
|
+import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
+import org.apache.hadoop.hdfs.DFSTestUtil;
|
|
|
+import org.apache.hadoop.hdfs.DistributedFileSystem;
|
|
|
+import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
|
|
|
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
|
|
|
+import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
|
|
|
+import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
|
|
|
+import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
|
|
|
+import org.apache.hadoop.hdfs.server.federation.resolver.MultipleDestinationMountTableResolver;
|
|
|
+import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
|
|
|
+import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
|
|
|
+import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
|
|
|
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
|
|
|
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
|
|
|
+import org.junit.After;
|
|
|
+import org.junit.AfterClass;
|
|
|
+import org.junit.BeforeClass;
|
|
|
+import org.junit.Test;
|
|
|
+
|
|
|
+/**
|
|
|
+ * Tests router rpc with multiple destination mount table resolver.
|
|
|
+ */
|
|
|
+public class TestRouterRPCMultipleDestinationMountTableResolver {
|
|
|
+ private static StateStoreDFSCluster cluster;
|
|
|
+ private static RouterContext routerContext;
|
|
|
+ private static MountTableResolver resolver;
|
|
|
+ private static DistributedFileSystem nnFs0;
|
|
|
+ private static DistributedFileSystem nnFs1;
|
|
|
+ private static DistributedFileSystem routerFs;
|
|
|
+ private static RouterRpcServer rpcServer;
|
|
|
+
|
|
|
+ @BeforeClass
|
|
|
+ public static void setUp() throws Exception {
|
|
|
+
|
|
|
+ // Build and start a federated cluster
|
|
|
+ cluster = new StateStoreDFSCluster(false, 2,
|
|
|
+ MultipleDestinationMountTableResolver.class);
|
|
|
+ Configuration routerConf =
|
|
|
+ new RouterConfigBuilder().stateStore().admin().quota().rpc().build();
|
|
|
+
|
|
|
+ Configuration hdfsConf = new Configuration(false);
|
|
|
+
|
|
|
+ cluster.addRouterOverrides(routerConf);
|
|
|
+ cluster.addNamenodeOverrides(hdfsConf);
|
|
|
+ cluster.startCluster();
|
|
|
+ cluster.startRouters();
|
|
|
+ cluster.waitClusterUp();
|
|
|
+
|
|
|
+ routerContext = cluster.getRandomRouter();
|
|
|
+ resolver =
|
|
|
+ (MountTableResolver) routerContext.getRouter().getSubclusterResolver();
|
|
|
+ nnFs0 = (DistributedFileSystem) cluster
|
|
|
+ .getNamenode(cluster.getNameservices().get(0), null).getFileSystem();
|
|
|
+ nnFs1 = (DistributedFileSystem) cluster
|
|
|
+ .getNamenode(cluster.getNameservices().get(1), null).getFileSystem();
|
|
|
+ routerFs = (DistributedFileSystem) routerContext.getFileSystem();
|
|
|
+ rpcServer =routerContext.getRouter().getRpcServer();
|
|
|
+ }
|
|
|
+
|
|
|
+ @AfterClass
|
|
|
+ public static void tearDown() {
|
|
|
+ if (cluster != null) {
|
|
|
+ cluster.stopRouter(routerContext);
|
|
|
+ cluster.shutdown();
|
|
|
+ cluster = null;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * SetUp the mount entry , directories and file to verify invocation.
|
|
|
+ * @param order The order that the mount entry needs to follow.
|
|
|
+ * @throws Exception On account of any exception encountered during setting up
|
|
|
+ * the environment.
|
|
|
+ */
|
|
|
+ public void setupOrderMountPath(DestinationOrder order) throws Exception {
|
|
|
+ Map<String, String> destMap = new HashMap<>();
|
|
|
+ destMap.put("ns0", "/tmp");
|
|
|
+ destMap.put("ns1", "/tmp");
|
|
|
+ nnFs0.mkdirs(new Path("/tmp"));
|
|
|
+ nnFs1.mkdirs(new Path("/tmp"));
|
|
|
+ MountTable addEntry = MountTable.newInstance("/mount", destMap);
|
|
|
+ addEntry.setDestOrder(order);
|
|
|
+ assertTrue(addMountTable(addEntry));
|
|
|
+ routerFs.mkdirs(new Path("/mount/dir/dir"));
|
|
|
+ DFSTestUtil.createFile(routerFs, new Path("/mount/dir/file"), 100L, (short) 1,
|
|
|
+ 1024L);
|
|
|
+ DFSTestUtil.createFile(routerFs, new Path("/mount/file"), 100L, (short) 1,
|
|
|
+ 1024L);
|
|
|
+ }
|
|
|
+
|
|
|
+ @After
|
|
|
+ public void resetTestEnvironment() throws IOException {
|
|
|
+ RouterClient client = routerContext.getAdminClient();
|
|
|
+ MountTableManager mountTableManager = client.getMountTableManager();
|
|
|
+ RemoveMountTableEntryRequest req2 =
|
|
|
+ RemoveMountTableEntryRequest.newInstance("/mount");
|
|
|
+ mountTableManager.removeMountTableEntry(req2);
|
|
|
+ nnFs0.delete(new Path("/tmp"), true);
|
|
|
+ nnFs1.delete(new Path("/tmp"), true);
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testInvocationSpaceOrder() throws Exception {
|
|
|
+ setupOrderMountPath(DestinationOrder.SPACE);
|
|
|
+ boolean isDirAll = rpcServer.isPathAll("/mount/dir");
|
|
|
+ assertTrue(isDirAll);
|
|
|
+ testInvocation(isDirAll);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testInvocationHashAllOrder() throws Exception {
|
|
|
+ setupOrderMountPath(DestinationOrder.HASH_ALL);
|
|
|
+ boolean isDirAll = rpcServer.isPathAll("/mount/dir");
|
|
|
+ assertTrue(isDirAll);
|
|
|
+ testInvocation(isDirAll);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testInvocationRandomOrder() throws Exception {
|
|
|
+ setupOrderMountPath(DestinationOrder.RANDOM);
|
|
|
+ boolean isDirAll = rpcServer.isPathAll("/mount/dir");
|
|
|
+ assertTrue(isDirAll);
|
|
|
+ testInvocation(isDirAll);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testInvocationHashOrder() throws Exception {
|
|
|
+ setupOrderMountPath(DestinationOrder.HASH);
|
|
|
+ boolean isDirAll = rpcServer.isPathAll("/mount/dir");
|
|
|
+ assertFalse(isDirAll);
|
|
|
+ testInvocation(isDirAll);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testInvocationLocalOrder() throws Exception {
|
|
|
+ setupOrderMountPath(DestinationOrder.LOCAL);
|
|
|
+ boolean isDirAll = rpcServer.isPathAll("/mount/dir");
|
|
|
+ assertFalse(isDirAll);
|
|
|
+ testInvocation(isDirAll);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Verifies the invocation of API's at directory level , file level and at
|
|
|
+ * mount level.
|
|
|
+ * @param dirAll if true assumes that the mount entry creates directory on all
|
|
|
+ * locations.
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ private void testInvocation(boolean dirAll) throws IOException {
|
|
|
+ // Verify invocation on nested directory and file.
|
|
|
+ Path mountDir = new Path("/mount/dir/dir");
|
|
|
+ Path nameSpaceFile = new Path("/tmp/dir/file");
|
|
|
+ Path mountFile = new Path("/mount/dir/file");
|
|
|
+ Path mountEntry = new Path("/mount");
|
|
|
+ Path mountDest = new Path("/tmp");
|
|
|
+ Path nameSpaceDir = new Path("/tmp/dir/dir");
|
|
|
+ final String name = "user.a1";
|
|
|
+ final byte[] value = {0x31, 0x32, 0x33};
|
|
|
+ testDirectoryAndFileLevelInvocation(dirAll, mountDir, nameSpaceFile,
|
|
|
+ mountFile, nameSpaceDir, name, value);
|
|
|
+
|
|
|
+ // Verify invocation on non nested directory and file.
|
|
|
+ mountDir = new Path("/mount/dir");
|
|
|
+ nameSpaceFile = new Path("/tmp/file");
|
|
|
+ mountFile = new Path("/mount/file");
|
|
|
+ nameSpaceDir = new Path("/tmp/dir");
|
|
|
+ testDirectoryAndFileLevelInvocation(dirAll, mountDir, nameSpaceFile,
|
|
|
+ mountFile, nameSpaceDir, name, value);
|
|
|
+
|
|
|
+ // Check invocation directly for a mount point.
|
|
|
+ // Verify owner and permissions.
|
|
|
+ routerFs.setOwner(mountEntry, "testuser", "testgroup");
|
|
|
+ routerFs.setPermission(mountEntry,
|
|
|
+ FsPermission.createImmutable((short) 777));
|
|
|
+ assertEquals("testuser", routerFs.getFileStatus(mountEntry).getOwner());
|
|
|
+ assertEquals("testuser", nnFs0.getFileStatus(mountDest).getOwner());
|
|
|
+ assertEquals("testuser", nnFs1.getFileStatus(mountDest).getOwner());
|
|
|
+ assertEquals((short) 777,
|
|
|
+ routerFs.getFileStatus(mountEntry).getPermission().toShort());
|
|
|
+ assertEquals((short) 777,
|
|
|
+ nnFs0.getFileStatus(mountDest).getPermission().toShort());
|
|
|
+ assertEquals((short) 777,
|
|
|
+ nnFs1.getFileStatus(mountDest).getPermission().toShort());
|
|
|
+
|
|
|
+ //Verify storage policy.
|
|
|
+ routerFs.setStoragePolicy(mountEntry, "COLD");
|
|
|
+ assertEquals("COLD", routerFs.getStoragePolicy(mountEntry).getName());
|
|
|
+ assertEquals("COLD", nnFs0.getStoragePolicy(mountDest).getName());
|
|
|
+ assertEquals("COLD", nnFs1.getStoragePolicy(mountDest).getName());
|
|
|
+ routerFs.unsetStoragePolicy(mountEntry);
|
|
|
+ assertEquals("HOT", routerFs.getStoragePolicy(mountDest).getName());
|
|
|
+ assertEquals("HOT", nnFs0.getStoragePolicy(mountDest).getName());
|
|
|
+ assertEquals("HOT", nnFs1.getStoragePolicy(mountDest).getName());
|
|
|
+
|
|
|
+ //Verify erasure coding policy.
|
|
|
+ routerFs.setErasureCodingPolicy(mountEntry, "RS-6-3-1024k");
|
|
|
+ assertEquals("RS-6-3-1024k",
|
|
|
+ routerFs.getErasureCodingPolicy(mountEntry).getName());
|
|
|
+ assertEquals("RS-6-3-1024k",
|
|
|
+ nnFs0.getErasureCodingPolicy(mountDest).getName());
|
|
|
+ assertEquals("RS-6-3-1024k",
|
|
|
+ nnFs1.getErasureCodingPolicy(mountDest).getName());
|
|
|
+ routerFs.unsetErasureCodingPolicy(mountEntry);
|
|
|
+ assertNull(routerFs.getErasureCodingPolicy(mountDest));
|
|
|
+ assertNull(nnFs0.getErasureCodingPolicy(mountDest));
|
|
|
+ assertNull(nnFs1.getErasureCodingPolicy(mountDest));
|
|
|
+
|
|
|
+ //Verify xAttr.
|
|
|
+ routerFs.setXAttr(mountEntry, name, value);
|
|
|
+ assertArrayEquals(value, routerFs.getXAttr(mountEntry, name));
|
|
|
+ assertArrayEquals(value, nnFs0.getXAttr(mountDest, name));
|
|
|
+ assertArrayEquals(value, nnFs1.getXAttr(mountDest, name));
|
|
|
+ routerFs.removeXAttr(mountEntry, name);
|
|
|
+ assertEquals(0, routerFs.getXAttrs(mountEntry).size());
|
|
|
+ assertEquals(0, nnFs0.getXAttrs(mountDest).size());
|
|
|
+ assertEquals(0, nnFs1.getXAttrs(mountDest).size());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * SetUp to verify invocations on directories and file.
|
|
|
+ */
|
|
|
+ private void testDirectoryAndFileLevelInvocation(boolean dirAll,
|
|
|
+ Path mountDir, Path nameSpaceFile, Path mountFile, Path nameSpaceDir,
|
|
|
+ final String name, final byte[] value) throws IOException {
|
|
|
+ // Check invocation for a directory.
|
|
|
+ routerFs.setOwner(mountDir, "testuser", "testgroup");
|
|
|
+ routerFs.setPermission(mountDir, FsPermission.createImmutable((short) 777));
|
|
|
+ routerFs.setStoragePolicy(mountDir, "COLD");
|
|
|
+ routerFs.setErasureCodingPolicy(mountDir, "RS-6-3-1024k");
|
|
|
+ routerFs.setXAttr(mountDir, name, value);
|
|
|
+
|
|
|
+ // Verify the directory level invocations were checked in case of mounts not
|
|
|
+ // creating directories in all subclusters.
|
|
|
+ boolean checkedDir1 = verifyDirectoryLevelInvocations(dirAll, nameSpaceDir,
|
|
|
+ nnFs0, name, value);
|
|
|
+ boolean checkedDir2 = verifyDirectoryLevelInvocations(dirAll, nameSpaceDir,
|
|
|
+ nnFs1, name, value);
|
|
|
+ assertTrue("The file didn't existed in either of the subclusters.",
|
|
|
+ checkedDir1 || checkedDir2);
|
|
|
+ routerFs.unsetStoragePolicy(mountDir);
|
|
|
+ routerFs.removeXAttr(mountDir, name);
|
|
|
+ routerFs.unsetErasureCodingPolicy(mountDir);
|
|
|
+
|
|
|
+ checkedDir1 =
|
|
|
+ verifyDirectoryLevelUnsetInvocations(dirAll, nnFs0, nameSpaceDir);
|
|
|
+ checkedDir2 =
|
|
|
+ verifyDirectoryLevelUnsetInvocations(dirAll, nnFs1, nameSpaceDir);
|
|
|
+ assertTrue("The file didn't existed in either of the subclusters.",
|
|
|
+ checkedDir1 || checkedDir2);
|
|
|
+
|
|
|
+ // Check invocation for a file.
|
|
|
+ routerFs.setOwner(mountFile, "testuser", "testgroup");
|
|
|
+ routerFs.setPermission(mountFile,
|
|
|
+ FsPermission.createImmutable((short) 777));
|
|
|
+ routerFs.setStoragePolicy(mountFile, "COLD");
|
|
|
+ routerFs.setReplication(mountFile, (short) 2);
|
|
|
+ routerFs.setXAttr(mountFile, name, value);
|
|
|
+ verifyFileLevelInvocations(nameSpaceFile, nnFs0, mountFile, name, value);
|
|
|
+ verifyFileLevelInvocations(nameSpaceFile, nnFs1, mountFile, name, value);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Verify invocations of API's unseting values at the directory level.
|
|
|
+ * @param dirAll true if the mount entry order creates directory in all
|
|
|
+ * locations.
|
|
|
+ * @param nameSpaceDir path of the directory in the namespace.
|
|
|
+ * @param nnFs file system where the directory level invocation needs to be
|
|
|
+ * tested.
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ private boolean verifyDirectoryLevelUnsetInvocations(boolean dirAll,
|
|
|
+ DistributedFileSystem nnFs, Path nameSpaceDir) throws IOException {
|
|
|
+ boolean checked = false;
|
|
|
+ if (dirAll || nnFs.exists(nameSpaceDir)) {
|
|
|
+ checked = true;
|
|
|
+ assertEquals("HOT", nnFs.getStoragePolicy(nameSpaceDir).getName());
|
|
|
+ assertNull(nnFs.getErasureCodingPolicy(nameSpaceDir));
|
|
|
+ assertEquals(0, nnFs.getXAttrs(nameSpaceDir).size());
|
|
|
+ }
|
|
|
+ return checked;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Verify file level invocations.
|
|
|
+ * @param nameSpaceFile path of the file in the namespace.
|
|
|
+ * @param nnFs the file system where the file invocation needs to checked.
|
|
|
+ * @param mountFile path of the file w.r.t. mount table.
|
|
|
+ * @param name name of Xattr.
|
|
|
+ * @param value value of Xattr.
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ private void verifyFileLevelInvocations(Path nameSpaceFile,
|
|
|
+ DistributedFileSystem nnFs, Path mountFile, final String name,
|
|
|
+ final byte[] value) throws IOException {
|
|
|
+ if (nnFs.exists(nameSpaceFile)) {
|
|
|
+ assertEquals("testuser", nnFs.getFileStatus(nameSpaceFile).getOwner());
|
|
|
+ assertEquals((short) 777,
|
|
|
+ nnFs.getFileStatus(nameSpaceFile).getPermission().toShort());
|
|
|
+ assertEquals("COLD", nnFs.getStoragePolicy(nameSpaceFile).getName());
|
|
|
+ assertEquals((short) 2,
|
|
|
+ nnFs.getFileStatus(nameSpaceFile).getReplication());
|
|
|
+ assertArrayEquals(value, nnFs.getXAttr(nameSpaceFile, name));
|
|
|
+
|
|
|
+ routerFs.unsetStoragePolicy(mountFile);
|
|
|
+ routerFs.removeXAttr(mountFile, name);
|
|
|
+ assertEquals(0, nnFs.getXAttrs(nameSpaceFile).size());
|
|
|
+
|
|
|
+ assertEquals("HOT", nnFs.getStoragePolicy(nameSpaceFile).getName());
|
|
|
+
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Verify invocations at the directory level.
|
|
|
+ * @param dirAll true if the mount entry order creates directory in all
|
|
|
+ * locations.
|
|
|
+ * @param nameSpaceDir path of the directory in the namespace.
|
|
|
+ * @param nnFs file system where the directory level invocation needs to be
|
|
|
+ * tested.
|
|
|
+ * @param name name for the Xattr.
|
|
|
+ * @param value value for the Xattr.
|
|
|
+ * @return true, if directory existed and successful verification of
|
|
|
+ * invocations.
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ private boolean verifyDirectoryLevelInvocations(boolean dirAll,
|
|
|
+ Path nameSpaceDir, DistributedFileSystem nnFs, final String name,
|
|
|
+ final byte[] value) throws IOException {
|
|
|
+ boolean checked = false;
|
|
|
+ if (dirAll || nnFs.exists(nameSpaceDir)) {
|
|
|
+ checked = true;
|
|
|
+ assertEquals("testuser", nnFs.getFileStatus(nameSpaceDir).getOwner());
|
|
|
+ assertEquals("COLD", nnFs.getStoragePolicy(nameSpaceDir).getName());
|
|
|
+ assertEquals("RS-6-3-1024k",
|
|
|
+ nnFs.getErasureCodingPolicy(nameSpaceDir).getName());
|
|
|
+ assertArrayEquals(value, nnFs.getXAttr(nameSpaceDir, name));
|
|
|
+ assertEquals((short) 777,
|
|
|
+ nnFs.getFileStatus(nameSpaceDir).getPermission().toShort());
|
|
|
+ }
|
|
|
+ return checked;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Add a mount table entry to the mount table through the admin API.
|
|
|
+ * @param entry Mount table entry to add.
|
|
|
+ * @return If it was successfully added.
|
|
|
+ * @throws IOException + * Problems adding entries.
|
|
|
+ */
|
|
|
+ private boolean addMountTable(final MountTable entry) throws IOException {
|
|
|
+ RouterClient client = routerContext.getAdminClient();
|
|
|
+ MountTableManager mountTableManager = client.getMountTableManager();
|
|
|
+ AddMountTableEntryRequest addRequest =
|
|
|
+ AddMountTableEntryRequest.newInstance(entry);
|
|
|
+ AddMountTableEntryResponse addResponse =
|
|
|
+ mountTableManager.addMountTableEntry(addRequest);
|
|
|
+
|
|
|
+ // Reload the Router cache
|
|
|
+ resolver.loadCache(true);
|
|
|
+
|
|
|
+ return addResponse.getStatus();
|
|
|
+ }
|
|
|
+}
|