Forráskód Böngészése

HDFS-17596. [ARR] RouterStoragePolicy supports asynchronous rpc. (#6988). Contributed by hfutatzhanghb.

Reviewed-by: Jian Zhang <keepromise@apache.org>
Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
hfutatzhanghb 5 hónapja
szülő
commit
ea3c4c84ba

+ 63 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAsyncStoragePolicy.java

@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
+
+public class RouterAsyncStoragePolicy extends RouterStoragePolicy {
+  /** RPC server to receive client calls. */
+  private final RouterRpcServer rpcServer;
+  /** RPC clients to connect to the Namenodes. */
+  private final RouterRpcClient rpcClient;
+
+  public RouterAsyncStoragePolicy(RouterRpcServer server) {
+    super(server);
+    this.rpcServer = server;
+    this.rpcClient = this.rpcServer.getRPCClient();
+  }
+
+  @Override
+  public BlockStoragePolicy getStoragePolicy(String path)
+      throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.READ, true);
+
+    List<RemoteLocation> locations =
+        rpcServer.getLocationsForPath(path, false, false);
+    RemoteMethod method = new RemoteMethod("getStoragePolicy",
+        new Class<?>[] {String.class},
+        new RemoteParam());
+    rpcClient.invokeSequential(locations, method);
+    return asyncReturn(BlockStoragePolicy.class);
+  }
+
+  @Override
+  public BlockStoragePolicy[] getStoragePolicies() throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.READ);
+
+    RemoteMethod method = new RemoteMethod("getStoragePolicies");
+    rpcServer.invokeAtAvailableNsAsync(method, BlockStoragePolicy[].class);
+    return asyncReturn(BlockStoragePolicy[].class);
+  }
+}

+ 163 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncStoragePolicy.java

@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.MockResolver;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.ipc.CallerContext;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
+import static org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT;
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestRouterAsyncStoragePolicy {
+  private static Configuration routerConf;
+  /** Federated HDFS cluster. */
+  private static MiniRouterDFSCluster cluster;
+  private static String ns0;
+
+  /** Random Router for this federated cluster. */
+  private MiniRouterDFSCluster.RouterContext router;
+  private FileSystem routerFs;
+  private RouterRpcServer routerRpcServer;
+  private RouterAsyncStoragePolicy asyncStoragePolicy;
+
+  private final String testfilePath = "/testdir/testAsyncStoragePolicy.file";
+
+  @BeforeClass
+  public static void setUpCluster() throws Exception {
+    cluster = new MiniRouterDFSCluster(true, 1, 2,
+        DEFAULT_HEARTBEAT_INTERVAL_MS, 1000);
+    cluster.setNumDatanodesPerNameservice(3);
+
+    cluster.startCluster();
+
+    // Making one Namenode active per nameservice
+    if (cluster.isHighAvailability()) {
+      for (String ns : cluster.getNameservices()) {
+        cluster.switchToActive(ns, NAMENODES[0]);
+        cluster.switchToStandby(ns, NAMENODES[1]);
+      }
+    }
+    // Start routers with only an RPC service
+    routerConf = new RouterConfigBuilder()
+        .rpc()
+        .build();
+
+    // Reduce the number of RPC clients threads to overload the Router easy
+    routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 1);
+    routerConf.setInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT, 1);
+    routerConf.setInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT, 1);
+    // We decrease the DN cache times to make the test faster
+    routerConf.setTimeDuration(
+        RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
+    cluster.addRouterOverrides(routerConf);
+    // Start routers with only an RPC service
+    cluster.startRouters();
+
+    // Register and verify all NNs with all routers
+    cluster.registerNamenodes();
+    cluster.waitNamenodeRegistration();
+    cluster.waitActiveNamespaces();
+    ns0 = cluster.getNameservices().get(0);
+  }
+
+  @AfterClass
+  public static void shutdownCluster() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Before
+  public void setUp() throws IOException {
+    router = cluster.getRandomRouter();
+    routerFs = router.getFileSystem();
+    routerRpcServer = router.getRouterRpcServer();
+    routerRpcServer.initAsyncThreadPool();
+    RouterAsyncRpcClient asyncRpcClient = new RouterAsyncRpcClient(
+        routerConf, router.getRouter(), routerRpcServer.getNamenodeResolver(),
+        routerRpcServer.getRPCMonitor(),
+        routerRpcServer.getRouterStateIdContext());
+    RouterRpcServer spy = Mockito.spy(routerRpcServer);
+    Mockito.when(spy.getRPCClient()).thenReturn(asyncRpcClient);
+    asyncStoragePolicy = new RouterAsyncStoragePolicy(spy);
+
+    // Create mock locations
+    MockResolver resolver = (MockResolver) router.getRouter().getSubclusterResolver();
+    resolver.addLocation("/", ns0, "/");
+    FsPermission permission = new FsPermission("705");
+    routerFs.mkdirs(new Path("/testdir"), permission);
+    FSDataOutputStream fsDataOutputStream = routerFs.create(
+        new Path(testfilePath), true);
+    fsDataOutputStream.write(new byte[1024]);
+    fsDataOutputStream.close();
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    // clear client context
+    CallerContext.setCurrent(null);
+    boolean delete = routerFs.delete(new Path("/testdir"));
+    assertTrue(delete);
+    if (routerFs != null) {
+      routerFs.close();
+    }
+  }
+
+  @Test
+  public void testRouterAsyncStoragePolicy() throws Exception {
+    BlockStoragePolicy[] storagePolicies = cluster.getNamenodes().get(0)
+        .getClient().getStoragePolicies();
+    asyncStoragePolicy.getStoragePolicies();
+    BlockStoragePolicy[] storagePoliciesAsync = syncReturn(BlockStoragePolicy[].class);
+    assertArrayEquals(storagePolicies, storagePoliciesAsync);
+
+    asyncStoragePolicy.getStoragePolicy(testfilePath);
+    BlockStoragePolicy blockStoragePolicy1 = syncReturn(BlockStoragePolicy.class);
+
+    asyncStoragePolicy.setStoragePolicy(testfilePath, "COLD");
+    syncReturn(null);
+    asyncStoragePolicy.getStoragePolicy(testfilePath);
+    BlockStoragePolicy blockStoragePolicy2 = syncReturn(BlockStoragePolicy.class);
+    assertNotEquals(blockStoragePolicy1, blockStoragePolicy2);
+    assertEquals("COLD", blockStoragePolicy2.getName());
+  }
+}