|
@@ -0,0 +1,260 @@
|
|
|
+/**
|
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one
|
|
|
+ * or more contributor license agreements. See the NOTICE file
|
|
|
+ * distributed with this work for additional information
|
|
|
+ * regarding copyright ownership. The ASF licenses this file
|
|
|
+ * to you under the Apache License, Version 2.0 (the
|
|
|
+ * "License"); you may not use this file except in compliance
|
|
|
+ * with the License. You may obtain a copy of the License at
|
|
|
+ *
|
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
|
+ *
|
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
+ * See the License for the specific language governing permissions and
|
|
|
+ * limitations under the License.
|
|
|
+ */
|
|
|
+package org.apache.hadoop.hdfs.server.federation.router;
|
|
|
+
|
|
|
+import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.fs.FileContext;
|
|
|
+import org.apache.hadoop.fs.FileSystem;
|
|
|
+import org.apache.hadoop.fs.Path;
|
|
|
+import org.apache.hadoop.ha.ClientBaseWithFixes;
|
|
|
+import org.apache.hadoop.hdfs.DFSClient;
|
|
|
+import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
|
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
|
|
+import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
|
|
|
+import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
|
|
|
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
|
|
|
+import org.apache.hadoop.minikdc.MiniKdc;
|
|
|
+import org.apache.hadoop.security.SecurityUtil;
|
|
|
+import org.apache.hadoop.security.UserGroupInformation;
|
|
|
+import org.apache.hadoop.tools.fedbalance.DistCpProcedure;
|
|
|
+import org.junit.After;
|
|
|
+import org.junit.AfterClass;
|
|
|
+import org.junit.Before;
|
|
|
+import org.junit.BeforeClass;
|
|
|
+import org.junit.Test;
|
|
|
+
|
|
|
+import java.io.File;
|
|
|
+import java.io.IOException;
|
|
|
+import java.security.PrivilegedExceptionAction;
|
|
|
+import java.util.UUID;
|
|
|
+import java.util.concurrent.Callable;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+
|
|
|
+import static org.apache.hadoop.hdfs.DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_KEY;
|
|
|
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_DEFAULT;
|
|
|
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
|
|
|
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.createFile;
|
|
|
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.verifyFileExists;
|
|
|
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_BANDWIDTH;
|
|
|
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_MAP;
|
|
|
+import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS;
|
|
|
+import static org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager.ZK_DTSM_ZK_AUTH_TYPE;
|
|
|
+import static org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager.ZK_DTSM_ZK_CONNECTION_STRING;
|
|
|
+import static org.apache.hadoop.test.GenericTestUtils.getMethodName;
|
|
|
+import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.SCHEDULER_JOURNAL_URI;
|
|
|
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.RM_PRINCIPAL;
|
|
|
+import static org.junit.Assert.assertFalse;
|
|
|
+import static org.junit.Assert.assertTrue;
|
|
|
+
|
|
|
+/**
|
|
|
+ * Basic tests of router federation rename. Rename across namespaces.
|
|
|
+ */
|
|
|
+public class TestRouterFederationRenameInKerberosEnv
|
|
|
+ extends ClientBaseWithFixes {
|
|
|
+
|
|
|
+ private static final int NUM_SUBCLUSTERS = 2;
|
|
|
+ private static final int NUM_DNS = 6;
|
|
|
+
|
|
|
+ private static String clientPrincipal = "client@EXAMPLE.COM";
|
|
|
+ private static String serverPrincipal = System.getenv().get("USERNAME")
|
|
|
+ + "/localhost@EXAMPLE.COM";
|
|
|
+ private static String keytab = new File(
|
|
|
+ System.getProperty("test.dir", "target"),
|
|
|
+ UUID.randomUUID().toString())
|
|
|
+ .getAbsolutePath();
|
|
|
+
|
|
|
+ private static Configuration baseConf = new Configuration(false);
|
|
|
+
|
|
|
+ private static MiniKdc kdc;
|
|
|
+
|
|
|
+ /** Federated HDFS cluster. */
|
|
|
+ private MiniRouterDFSCluster cluster;
|
|
|
+
|
|
|
+ /** Random Router for this federated cluster. */
|
|
|
+ private RouterContext router;
|
|
|
+
|
|
|
+ @BeforeClass
|
|
|
+ public static void globalSetUp() throws Exception {
|
|
|
+ // init KDC
|
|
|
+ File workDir = new File(System.getProperty("test.dir", "target"));
|
|
|
+ kdc = new MiniKdc(MiniKdc.createConf(), workDir);
|
|
|
+ kdc.start();
|
|
|
+ kdc.createPrincipal(new File(keytab), clientPrincipal, serverPrincipal);
|
|
|
+
|
|
|
+
|
|
|
+ baseConf.setBoolean(DFSConfigKeys.HADOOP_CALLER_CONTEXT_ENABLED_KEY,
|
|
|
+ true);
|
|
|
+
|
|
|
+ SecurityUtil.setAuthenticationMethod(KERBEROS, baseConf);
|
|
|
+ baseConf.set(RBFConfigKeys.DFS_ROUTER_KERBEROS_PRINCIPAL_KEY,
|
|
|
+ serverPrincipal);
|
|
|
+ baseConf.set(RBFConfigKeys.DFS_ROUTER_KEYTAB_FILE_KEY, keytab);
|
|
|
+ baseConf.set(DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY,
|
|
|
+ serverPrincipal);
|
|
|
+ baseConf.set(DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY, keytab);
|
|
|
+ baseConf.set(DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY,
|
|
|
+ serverPrincipal);
|
|
|
+ baseConf.set(DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY, keytab);
|
|
|
+ baseConf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY,
|
|
|
+ true);
|
|
|
+
|
|
|
+ baseConf.set(DFS_DATA_TRANSFER_PROTECTION_KEY,
|
|
|
+ DFS_DATA_TRANSFER_PROTECTION_DEFAULT);
|
|
|
+ baseConf.setBoolean(IGNORE_SECURE_PORTS_FOR_TESTING_KEY, true);
|
|
|
+
|
|
|
+ DistCpProcedure.enableForTest();
|
|
|
+ }
|
|
|
+
|
|
|
+ @AfterClass
|
|
|
+ public static void globalTearDown() {
|
|
|
+ kdc.stop();
|
|
|
+ DistCpProcedure.disableForTest();
|
|
|
+ }
|
|
|
+
|
|
|
+ @After
|
|
|
+ @Override
|
|
|
+ public void tearDown() throws Exception {
|
|
|
+ super.tearDown();
|
|
|
+ cluster.shutdown();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Before
|
|
|
+ @Override
|
|
|
+ public void setUp() throws Exception {
|
|
|
+ super.setUp();
|
|
|
+
|
|
|
+ cluster = new MiniRouterDFSCluster(false, NUM_SUBCLUSTERS);
|
|
|
+ cluster.setNumDatanodesPerNameservice(NUM_DNS);
|
|
|
+ cluster.addNamenodeOverrides(baseConf);
|
|
|
+ cluster.setIndependentDNs();
|
|
|
+ // Start NNs and DNs and wait until ready.
|
|
|
+ cluster.startCluster();
|
|
|
+
|
|
|
+ // Start routers, enable router federation rename.
|
|
|
+ String journal = "hdfs://" + cluster.getCluster().getNameNode(1)
|
|
|
+ .getClientNamenodeAddress() + "/journal";
|
|
|
+ Configuration routerConf = new RouterConfigBuilder()
|
|
|
+ .metrics()
|
|
|
+ .rpc()
|
|
|
+ .routerRenameOption()
|
|
|
+ .set(SCHEDULER_JOURNAL_URI, journal)
|
|
|
+ .set(DFS_ROUTER_FEDERATION_RENAME_MAP, "1")
|
|
|
+ .set(DFS_ROUTER_FEDERATION_RENAME_BANDWIDTH, "1")
|
|
|
+ .set(ZK_DTSM_ZK_CONNECTION_STRING, hostPort)
|
|
|
+ .set(ZK_DTSM_ZK_AUTH_TYPE, "none")
|
|
|
+ .set(RM_PRINCIPAL, serverPrincipal)
|
|
|
+ .build();
|
|
|
+ // We decrease the DN cache times to make the test faster.
|
|
|
+ routerConf.setTimeDuration(
|
|
|
+ RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
|
|
|
+ cluster.addRouterOverrides(baseConf);
|
|
|
+ cluster.addRouterOverrides(routerConf);
|
|
|
+ cluster.startRouters();
|
|
|
+
|
|
|
+ // Register and verify all NNs with all routers
|
|
|
+ cluster.registerNamenodes();
|
|
|
+ cluster.waitNamenodeRegistration();
|
|
|
+
|
|
|
+ // We decrease the DN heartbeat expire interval to make them dead faster
|
|
|
+ cluster.getCluster().getNamesystem(0).getBlockManager()
|
|
|
+ .getDatanodeManager().setHeartbeatInterval(1);
|
|
|
+ cluster.getCluster().getNamesystem(1).getBlockManager()
|
|
|
+ .getDatanodeManager().setHeartbeatInterval(1);
|
|
|
+ cluster.getCluster().getNamesystem(0).getBlockManager()
|
|
|
+ .getDatanodeManager().setHeartbeatExpireInterval(3000);
|
|
|
+ cluster.getCluster().getNamesystem(1).getBlockManager()
|
|
|
+ .getDatanodeManager().setHeartbeatExpireInterval(3000);
|
|
|
+
|
|
|
+ // Create mock locations
|
|
|
+ cluster.installMockLocations();
|
|
|
+
|
|
|
+ // Create test fixtures on NN
|
|
|
+ cluster.createTestDirectoriesNamenode();
|
|
|
+
|
|
|
+ // Random router for this test
|
|
|
+ RouterContext rndRouter = cluster.getRandomRouter();
|
|
|
+ setRouter(rndRouter);
|
|
|
+ }
|
|
|
+
|
|
|
+ protected void createDir(FileSystem fs, String dir) throws IOException {
|
|
|
+ fs.mkdirs(new Path(dir));
|
|
|
+ String file = dir + "/file";
|
|
|
+ createFile(fs, file, 32);
|
|
|
+ verifyFileExists(fs, dir);
|
|
|
+ verifyFileExists(fs, file);
|
|
|
+ }
|
|
|
+
|
|
|
+ protected void testRenameDir(RouterContext testRouter, String path,
|
|
|
+ String renamedPath, boolean exceptionExpected, Callable<Object> call)
|
|
|
+ throws IOException {
|
|
|
+ createDir(testRouter.getFileSystem(), path);
|
|
|
+ // rename
|
|
|
+ boolean exceptionThrown = false;
|
|
|
+ try {
|
|
|
+ call.call();
|
|
|
+ assertFalse(verifyFileExists(testRouter.getFileSystem(), path));
|
|
|
+ assertTrue(verifyFileExists(testRouter.getFileSystem(),
|
|
|
+ renamedPath + "/file"));
|
|
|
+ } catch (Exception ex) {
|
|
|
+ exceptionThrown = true;
|
|
|
+ assertTrue(verifyFileExists(testRouter.getFileSystem(),
|
|
|
+ path + "/file"));
|
|
|
+ assertFalse(verifyFileExists(testRouter.getFileSystem(), renamedPath));
|
|
|
+ } finally {
|
|
|
+ FileContext fileContext = testRouter.getFileContext();
|
|
|
+ fileContext.delete(new Path(path), true);
|
|
|
+ fileContext.delete(new Path(renamedPath), true);
|
|
|
+ }
|
|
|
+ if (exceptionExpected) {
|
|
|
+ // Error was expected.
|
|
|
+ assertTrue(exceptionThrown);
|
|
|
+ } else {
|
|
|
+ // No error was expected.
|
|
|
+ assertFalse(exceptionThrown);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ protected void setRouter(RouterContext r) throws IOException {
|
|
|
+ this.router = r;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testClientRename() throws IOException {
|
|
|
+ String ns0 = cluster.getNameservices().get(0);
|
|
|
+ String ns1 = cluster.getNameservices().get(1);
|
|
|
+ // Test successfully rename a dir to a destination that is in a different
|
|
|
+ // namespace.
|
|
|
+ String dir =
|
|
|
+ cluster.getFederatedTestDirectoryForNS(ns0) + "/" + getMethodName();
|
|
|
+ String renamedDir =
|
|
|
+ cluster.getFederatedTestDirectoryForNS(ns1) + "/" + getMethodName();
|
|
|
+ testRenameDir(router, dir, renamedDir, false, () -> {
|
|
|
+ UserGroupInformation ugi = UserGroupInformation
|
|
|
+ .loginUserFromKeytabAndReturnUGI(clientPrincipal, keytab);
|
|
|
+ ugi.doAs((PrivilegedExceptionAction<Boolean>) () -> {
|
|
|
+ DFSClient client = router.getClient();
|
|
|
+ ClientProtocol clientProtocol = client.getNamenode();
|
|
|
+ clientProtocol.rename(dir, renamedDir);
|
|
|
+ return null;
|
|
|
+ });
|
|
|
+ return null;
|
|
|
+ });
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+}
|