|
@@ -19,32 +19,38 @@ package org.apache.hadoop.hdfs.server.federation.router;
|
|
|
|
|
|
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.createFile;
|
|
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.createFile;
|
|
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.verifyFileExists;
|
|
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.verifyFileExists;
|
|
|
|
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_BANDWIDTH;
|
|
|
|
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_MAP;
|
|
import static org.apache.hadoop.test.GenericTestUtils.getMethodName;
|
|
import static org.apache.hadoop.test.GenericTestUtils.getMethodName;
|
|
-import static org.junit.Assert.assertTrue;
|
|
|
|
-import static org.junit.Assert.assertFalse;
|
|
|
|
-import static org.junit.Assert.assertEquals;
|
|
|
|
|
|
+import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.SCHEDULER_JOURNAL_URI;
|
|
|
|
+import static org.junit.Assert.*;
|
|
import static org.mockito.Mockito.verify;
|
|
import static org.mockito.Mockito.verify;
|
|
|
|
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
-import java.util.List;
|
|
|
|
import java.util.Arrays;
|
|
import java.util.Arrays;
|
|
-import java.util.Set;
|
|
|
|
|
|
+import java.util.List;
|
|
|
|
+import java.util.Random;
|
|
import java.util.concurrent.Callable;
|
|
import java.util.concurrent.Callable;
|
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
|
|
|
+import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.fs.FileContext;
|
|
import org.apache.hadoop.fs.FileContext;
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
import org.apache.hadoop.fs.Path;
|
|
import org.apache.hadoop.fs.Path;
|
|
import org.apache.hadoop.hdfs.DFSClient;
|
|
import org.apache.hadoop.hdfs.DFSClient;
|
|
|
|
+import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
|
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
|
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
|
|
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
|
|
|
|
+import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeContext;
|
|
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
|
|
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
|
|
|
|
+import org.apache.hadoop.hdfs.server.federation.MockResolver;
|
|
|
|
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
|
|
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
|
|
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
|
|
import org.apache.hadoop.ipc.RemoteException;
|
|
import org.apache.hadoop.ipc.RemoteException;
|
|
-import org.apache.hadoop.security.GroupMappingServiceProvider;
|
|
|
|
import org.apache.hadoop.test.LambdaTestUtils;
|
|
import org.apache.hadoop.test.LambdaTestUtils;
|
|
-import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
|
|
|
|
|
|
+import org.apache.hadoop.tools.fedbalance.DistCpProcedure;
|
|
import org.junit.AfterClass;
|
|
import org.junit.AfterClass;
|
|
import org.junit.Before;
|
|
import org.junit.Before;
|
|
import org.junit.BeforeClass;
|
|
import org.junit.BeforeClass;
|
|
@@ -54,53 +60,137 @@ import org.mockito.Mockito;
|
|
/**
|
|
/**
|
|
* Basic tests of router federation rename. Rename across namespaces.
|
|
* Basic tests of router federation rename. Rename across namespaces.
|
|
*/
|
|
*/
|
|
-public class TestRouterFederationRename extends TestRouterFederationRenameBase {
|
|
|
|
|
|
+public class TestRouterFederationRename {
|
|
|
|
|
|
- public static class MockGroupsMapping implements
|
|
|
|
- GroupMappingServiceProvider {
|
|
|
|
|
|
+ private static final int NUM_SUBCLUSTERS = 2;
|
|
|
|
+ private static final int NUM_DNS = 6;
|
|
|
|
|
|
- @Override
|
|
|
|
- public List<String> getGroups(String user) {
|
|
|
|
- return Arrays.asList(user+"_group");
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- @Override
|
|
|
|
- public void cacheGroupsRefresh() {
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- @Override
|
|
|
|
- public void cacheGroupsAdd(List<String> groups) {
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- @Override
|
|
|
|
- public Set<String> getGroupsSet(String user) {
|
|
|
|
- return ImmutableSet.of(user+"_group");
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
|
|
+ /** Federated HDFS cluster. */
|
|
|
|
+ private static MiniRouterDFSCluster cluster;
|
|
|
|
|
|
|
|
+ /** Random Router for this federated cluster. */
|
|
private RouterContext router;
|
|
private RouterContext router;
|
|
|
|
+
|
|
|
|
+ /** Random nameservice in the federated cluster. */
|
|
|
|
+ private String ns;
|
|
|
|
+ /** Filesystem interface to the Router. */
|
|
private FileSystem routerFS;
|
|
private FileSystem routerFS;
|
|
- private MiniRouterDFSCluster cluster;
|
|
|
|
|
|
+ /** Filesystem interface to the Namenode. */
|
|
|
|
+ private FileSystem nnFS;
|
|
|
|
+ /** File in the Namenode. */
|
|
|
|
+ private String nnFile;
|
|
|
|
|
|
@BeforeClass
|
|
@BeforeClass
|
|
- public static void before() throws Exception {
|
|
|
|
- globalSetUp();
|
|
|
|
|
|
+ public static void globalSetUp() throws Exception {
|
|
|
|
+ Configuration namenodeConf = new Configuration();
|
|
|
|
+ namenodeConf.setBoolean(DFSConfigKeys.HADOOP_CALLER_CONTEXT_ENABLED_KEY,
|
|
|
|
+ true);
|
|
|
|
+ cluster = new MiniRouterDFSCluster(false, NUM_SUBCLUSTERS);
|
|
|
|
+ cluster.setNumDatanodesPerNameservice(NUM_DNS);
|
|
|
|
+ cluster.addNamenodeOverrides(namenodeConf);
|
|
|
|
+ cluster.setIndependentDNs();
|
|
|
|
+
|
|
|
|
+ Configuration conf = new Configuration();
|
|
|
|
+ conf.setInt(DFSConfigKeys.DFS_LIST_LIMIT, 5);
|
|
|
|
+ cluster.addNamenodeOverrides(conf);
|
|
|
|
+ // Start NNs and DNs and wait until ready.
|
|
|
|
+ cluster.startCluster();
|
|
|
|
+
|
|
|
|
+ // Start routers, enable router federation rename.
|
|
|
|
+ String journal = "hdfs://" + cluster.getCluster().getNameNode(1)
|
|
|
|
+ .getClientNamenodeAddress() + "/journal";
|
|
|
|
+ Configuration routerConf = new RouterConfigBuilder()
|
|
|
|
+ .metrics()
|
|
|
|
+ .rpc()
|
|
|
|
+ .routerRenameOption()
|
|
|
|
+ .set(SCHEDULER_JOURNAL_URI, journal)
|
|
|
|
+ .set(DFS_ROUTER_FEDERATION_RENAME_MAP, "1")
|
|
|
|
+ .set(DFS_ROUTER_FEDERATION_RENAME_BANDWIDTH, "1")
|
|
|
|
+ .build();
|
|
|
|
+ // We decrease the DN cache times to make the test faster.
|
|
|
|
+ routerConf.setTimeDuration(
|
|
|
|
+ RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
|
|
|
|
+ cluster.addRouterOverrides(routerConf);
|
|
|
|
+ cluster.startRouters();
|
|
|
|
+
|
|
|
|
+ // Register and verify all NNs with all routers
|
|
|
|
+ cluster.registerNamenodes();
|
|
|
|
+ cluster.waitNamenodeRegistration();
|
|
|
|
+
|
|
|
|
+ // We decrease the DN heartbeat expire interval to make them dead faster
|
|
|
|
+ cluster.getCluster().getNamesystem(0).getBlockManager()
|
|
|
|
+ .getDatanodeManager().setHeartbeatInterval(1);
|
|
|
|
+ cluster.getCluster().getNamesystem(1).getBlockManager()
|
|
|
|
+ .getDatanodeManager().setHeartbeatInterval(1);
|
|
|
|
+ cluster.getCluster().getNamesystem(0).getBlockManager()
|
|
|
|
+ .getDatanodeManager().setHeartbeatExpireInterval(3000);
|
|
|
|
+ cluster.getCluster().getNamesystem(1).getBlockManager()
|
|
|
|
+ .getDatanodeManager().setHeartbeatExpireInterval(3000);
|
|
|
|
+ DistCpProcedure.enableForTest();
|
|
}
|
|
}
|
|
|
|
|
|
@AfterClass
|
|
@AfterClass
|
|
- public static void after() {
|
|
|
|
- tearDown();
|
|
|
|
|
|
+ public static void tearDown() {
|
|
|
|
+ cluster.shutdown();
|
|
|
|
+ DistCpProcedure.disableForTest();
|
|
}
|
|
}
|
|
|
|
|
|
@Before
|
|
@Before
|
|
public void testSetup() throws Exception {
|
|
public void testSetup() throws Exception {
|
|
- setup();
|
|
|
|
- router = getRouterContext();
|
|
|
|
- routerFS = getRouterFileSystem();
|
|
|
|
- cluster = getCluster();
|
|
|
|
|
|
+
|
|
|
|
+ // Create mock locations
|
|
|
|
+ cluster.installMockLocations();
|
|
|
|
+
|
|
|
|
+ // Delete all files via the NNs and verify
|
|
|
|
+ cluster.deleteAllFiles();
|
|
|
|
+
|
|
|
|
+ // Create test fixtures on NN
|
|
|
|
+ cluster.createTestDirectoriesNamenode();
|
|
|
|
+
|
|
|
|
+ // Wait to ensure NN has fully created its test directories
|
|
|
|
+ Thread.sleep(100);
|
|
|
|
+
|
|
|
|
+ // Random router for this test
|
|
|
|
+ RouterContext rndRouter = cluster.getRandomRouter();
|
|
|
|
+ this.setRouter(rndRouter);
|
|
|
|
+
|
|
|
|
+ // Create a mount that points to 2 dirs in the same ns:
|
|
|
|
+ // /same
|
|
|
|
+ // ns0 -> /
|
|
|
|
+ // ns0 -> /target-ns0
|
|
|
|
+ for (RouterContext rc : cluster.getRouters()) {
|
|
|
|
+ Router r = rc.getRouter();
|
|
|
|
+ MockResolver resolver = (MockResolver) r.getSubclusterResolver();
|
|
|
|
+ List<String> nss = cluster.getNameservices();
|
|
|
|
+ String ns0 = nss.get(0);
|
|
|
|
+ resolver.addLocation("/same", ns0, "/");
|
|
|
|
+ resolver.addLocation("/same", ns0, cluster.getNamenodePathForNS(ns0));
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Pick a namenode for this test
|
|
|
|
+ String ns0 = cluster.getNameservices().get(0);
|
|
|
|
+ this.setNs(ns0);
|
|
|
|
+ this.setNamenode(cluster.getNamenode(ns0, null));
|
|
|
|
+
|
|
|
|
+ // Create a test file on the NN
|
|
|
|
+ Random rnd = new Random();
|
|
|
|
+ String randomFile = "testfile-" + rnd.nextInt();
|
|
|
|
+ this.nnFile =
|
|
|
|
+ cluster.getNamenodeTestDirectoryForNS(ns) + "/" + randomFile;
|
|
|
|
+
|
|
|
|
+ createFile(nnFS, nnFile, 32);
|
|
|
|
+ verifyFileExists(nnFS, nnFile);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ protected void createDir(FileSystem fs, String dir) throws IOException {
|
|
|
|
+ fs.mkdirs(new Path(dir));
|
|
|
|
+ String file = dir + "/file";
|
|
|
|
+ createFile(fs, file, 32);
|
|
|
|
+ verifyFileExists(fs, dir);
|
|
|
|
+ verifyFileExists(fs, file);
|
|
}
|
|
}
|
|
|
|
|
|
- private void testRenameDir(RouterContext testRouter, String path,
|
|
|
|
|
|
+ protected void testRenameDir(RouterContext testRouter, String path,
|
|
String renamedPath, boolean exceptionExpected, Callable<Object> call)
|
|
String renamedPath, boolean exceptionExpected, Callable<Object> call)
|
|
throws IOException {
|
|
throws IOException {
|
|
createDir(testRouter.getFileSystem(), path);
|
|
createDir(testRouter.getFileSystem(), path);
|
|
@@ -129,6 +219,23 @@ public class TestRouterFederationRename extends TestRouterFederationRenameBase {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ protected void setRouter(RouterContext r) throws IOException {
|
|
|
|
+ this.router = r;
|
|
|
|
+ this.routerFS = r.getFileSystem();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ protected void setNs(String nameservice) {
|
|
|
|
+ this.ns = nameservice;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ protected void setNamenode(NamenodeContext nn) throws IOException {
|
|
|
|
+ this.nnFS = nn.getFileSystem();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ protected FileSystem getRouterFileSystem() {
|
|
|
|
+ return this.routerFS;
|
|
|
|
+ }
|
|
|
|
+
|
|
@Test
|
|
@Test
|
|
public void testSuccessfulRbfRename() throws Exception {
|
|
public void testSuccessfulRbfRename() throws Exception {
|
|
List<String> nss = cluster.getNameservices();
|
|
List<String> nss = cluster.getNameservices();
|