浏览代码

HDFS-17744. [ARR] getEnclosingRoot RPC adapts to async rpc. (#7445). Contributed by hfutatzhanghb.

Reviewed-by: Jian Zhang <keepromise@apache.org>
Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
hfutatzhanghb 1 月之前
父节点
当前提交
7ab88fe262

+ 38 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncClientProtocol.java

@@ -30,6 +30,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.EncryptionZone;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
@@ -43,6 +44,7 @@ import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
 import org.apache.hadoop.hdfs.server.federation.resolver.RouterResolveException;
 import org.apache.hadoop.hdfs.server.federation.router.NoLocationException;
+import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
 import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod;
 import org.apache.hadoop.hdfs.server.federation.router.RemoteParam;
 import org.apache.hadoop.hdfs.server.federation.router.RemoteResult;
@@ -104,6 +106,8 @@ public class RouterAsyncClientProtocol extends RouterClientProtocol {
   private final boolean allowPartialList;
   /** Time out when getting the mount statistics. */
   private long mountStatusTimeOut;
+  /** Default nameservice enabled. */
+  private final boolean defaultNameServiceEnabled;
   /** Identifier for the super user. */
   private String superUser;
   /** Identifier for the super group. */
@@ -126,6 +130,9 @@ public class RouterAsyncClientProtocol extends RouterClientProtocol {
     this.mountStatusTimeOut = getMountStatusTimeOut();
     this.superUser = getSuperUser();
     this.superGroup = getSuperGroup();
+    this.defaultNameServiceEnabled = conf.getBoolean(
+        RBFConfigKeys.DFS_ROUTER_DEFAULT_NAMESERVICE_ENABLE,
+        RBFConfigKeys.DFS_ROUTER_DEFAULT_NAMESERVICE_ENABLE_DEFAULT);
   }
 
   @Override
@@ -1086,4 +1093,35 @@ public class RouterAsyncClientProtocol extends RouterClientProtocol {
 
     return asyncReturn(boolean.class);
   }
+
+  @Override
+  public Path getEnclosingRoot(String src) throws IOException {
+    final Path[] mountPath = new Path[1];
+    if (defaultNameServiceEnabled) {
+      mountPath[0] = new Path("/");
+    }
+
+    if (subclusterResolver instanceof MountTableResolver) {
+      MountTableResolver mountTable = (MountTableResolver) subclusterResolver;
+      if (mountTable.getMountPoint(src) != null) {
+        mountPath[0] = new Path(mountTable.getMountPoint(src).getSourcePath());
+      }
+    }
+
+    if (mountPath[0] == null) {
+      throw new IOException(String.format("No mount point for %s", src));
+    }
+
+    getEZForPath(src);
+    asyncApply((ApplyFunction<EncryptionZone, Path>)zone -> {
+      if (zone == null) {
+        return mountPath[0];
+      } else {
+        Path zonePath = new Path(zone.getPath());
+        return zonePath.depth() > mountPath[0].depth() ? zonePath : mountPath[0];
+      }
+    });
+    return asyncReturn(Path.class);
+  }
+
 }

+ 158 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncMountTable.java

@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router.async;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
+import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
+import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.router.Router;
+import org.apache.hadoop.hdfs.server.federation.router.RouterClient;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.apache.hadoop.util.Time;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test a router end-to-end including the MountTable using async rpc.
+ */
+public class TestRouterAsyncMountTable {
+  public static final Logger LOG = LoggerFactory.getLogger(TestRouterAsyncMountTable.class);
+
+  private static StateStoreDFSCluster cluster;
+  private static MiniRouterDFSCluster.NamenodeContext nnContext0;
+  private static MiniRouterDFSCluster.NamenodeContext nnContext1;
+  private static MiniRouterDFSCluster.RouterContext routerContext;
+  private static MountTableResolver mountTable;
+  private static FileSystem routerFs;
+
+  @BeforeClass
+  public static void globalSetUp() throws Exception {
+    // Build and start a federated cluster.
+    cluster = new StateStoreDFSCluster(false, 2);
+    Configuration conf = new RouterConfigBuilder()
+        .stateStore()
+        .admin()
+        .rpc()
+        .build();
+    conf.setInt(RBFConfigKeys.DFS_ROUTER_ADMIN_MAX_COMPONENT_LENGTH_KEY, 20);
+    conf.setBoolean(RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_ENABLE_KEY, true);
+    cluster.addRouterOverrides(conf);
+    cluster.startCluster();
+    cluster.startRouters();
+    cluster.waitClusterUp();
+
+    // Get the end points.
+    nnContext0 = cluster.getNamenode("ns0", null);
+    nnContext1 = cluster.getNamenode("ns1", null);
+    routerContext = cluster.getRandomRouter();
+    routerFs = routerContext.getFileSystem();
+    Router router = routerContext.getRouter();
+    mountTable = (MountTableResolver) router.getSubclusterResolver();
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    if (cluster != null) {
+      cluster.stopRouter(routerContext);
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  @After
+  public void clearMountTable() throws IOException {
+    RouterClient client = routerContext.getAdminClient();
+    MountTableManager mountTableManager = client.getMountTableManager();
+    GetMountTableEntriesRequest req1 =
+        GetMountTableEntriesRequest.newInstance("/");
+    GetMountTableEntriesResponse response =
+        mountTableManager.getMountTableEntries(req1);
+    for (MountTable entry : response.getEntries()) {
+      RemoveMountTableEntryRequest req2 =
+          RemoveMountTableEntryRequest.newInstance(entry.getSourcePath());
+      mountTableManager.removeMountTableEntry(req2);
+    }
+    mountTable.setDefaultNSEnable(true);
+  }
+
+  /**
+   * Add a mount table entry to the mount table through the admin API.
+   * @param entry Mount table entry to add.
+   * @return If it was succesfully added.
+   * @throws IOException Problems adding entries.
+   */
+  private boolean addMountTable(final MountTable entry) throws IOException {
+    RouterClient client = routerContext.getAdminClient();
+    MountTableManager mountTableManager = client.getMountTableManager();
+    AddMountTableEntryRequest addRequest =
+        AddMountTableEntryRequest.newInstance(entry);
+    AddMountTableEntryResponse addResponse =
+        mountTableManager.addMountTableEntry(addRequest);
+
+    // Reload the Router cache.
+    mountTable.loadCache(true);
+
+    return addResponse.getStatus();
+  }
+
+  @Test
+  public void testGetEnclosingRoot() throws Exception {
+
+    // Add a read only entry.
+    MountTable readOnlyEntry = MountTable.newInstance(
+        "/readonly", Collections.singletonMap("ns0", "/testdir"));
+    readOnlyEntry.setReadOnly(true);
+    assertTrue(addMountTable(readOnlyEntry));
+    assertEquals(routerFs.getEnclosingRoot(new Path("/readonly")), new Path("/readonly"));
+
+    assertEquals(routerFs.getEnclosingRoot(new Path("/regular")), new Path("/"));
+    assertEquals(routerFs.getEnclosingRoot(new Path("/regular")),
+        routerFs.getEnclosingRoot(routerFs.getEnclosingRoot(new Path("/regular"))));
+
+    // Add a regular entry.
+    MountTable regularEntry = MountTable.newInstance(
+        "/regular", Collections.singletonMap("ns0", "/testdir"));
+    assertTrue(addMountTable(regularEntry));
+    assertEquals(routerFs.getEnclosingRoot(new Path("/regular")), new Path("/regular"));
+
+    // Path does not need to exist.
+    assertEquals(routerFs.getEnclosingRoot(new Path("/regular/pathDNE")), new Path("/regular"));
+  }
+}