Bläddra i källkod

HDFS-17597. [ARR] RouterSnapshot supports asynchronous rpc. (#6994). Contributed by Wenqi Li.

Reviewed-by: Jian Zhang <keepromise@apache.org>
Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
Wenqi Li 6 månader sedan
förälder
incheckning
1152442a98

+ 203 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAsyncSnapshot.java

@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
+import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing;
+import org.apache.hadoop.hdfs.protocol.SnapshotStatus;
+import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
+import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
+import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply;
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
+
+/**
+ * Module that implements all the asynchronous RPC calls related to snapshots in
+ * {@link ClientProtocol} in the {@link RouterRpcServer}.
+ *
+ * <p>Each overridden method issues its remote invocation through the
+ * {@link RouterRpcClient}, optionally registers a post-processing step with
+ * {@code asyncApply}, and hands back the value of {@code asyncReturn} (or of
+ * the client call itself). NOTE(review): the final result is presumably
+ * delivered through the async framework rather than through the direct return
+ * value - confirm against AsyncUtil.
+ */
+public class RouterAsyncSnapshot extends RouterSnapshot {
+  /** RPC server to receive client calls. */
+  private final RouterRpcServer rpcServer;
+  /** RPC clients to connect to the Namenodes. */
+  private final RouterRpcClient rpcClient;
+  /** Find generic locations. */
+  private final ActiveNamenodeResolver namenodeResolver;
+
+  /**
+   * Builds the asynchronous snapshot module on top of the given RPC server.
+   *
+   * @param server RPC server; its RPC client and Namenode resolver are cached
+   *               by this module.
+   */
+  public RouterAsyncSnapshot(RouterRpcServer server) {
+    super(server);
+    this.rpcServer = server;
+    this.rpcClient = this.rpcServer.getRPCClient();
+    this.namenodeResolver = rpcServer.getNamenodeResolver();
+  }
+
+  /**
+   * Rewrites a path reported by a Namenode so that the first occurrence of
+   * the location's destination path is replaced by its mount (source) path.
+   *
+   * <p>Both paths are treated literally: {@link String#replaceFirst} would
+   * otherwise interpret the destination as a regular expression and the
+   * source as a replacement template, which breaks for paths containing
+   * characters such as {@code +}, {@code (}, {@code $} or {@code \}.
+   *
+   * @param remotePath path as returned by the Namenode.
+   * @param loc location whose destination/source pair drives the rewrite.
+   * @return the path with the destination prefix mapped to the source.
+   */
+  private static String toMountPath(String remotePath, RemoteLocation loc) {
+    return remotePath.replaceFirst(
+        Pattern.quote(loc.getDest()),
+        Matcher.quoteReplacement(loc.getSrc()));
+  }
+
+  /**
+   * Rewrites, in place, the parent full path of every snapshot status from
+   * the Namenode view to the mount view.
+   *
+   * @param statuses snapshot statuses returned by the Namenode.
+   * @param loc location the statuses were fetched from.
+   * @return the same array, after its elements have been updated.
+   */
+  private static SnapshotStatus[] remapParentPaths(
+      SnapshotStatus[] statuses, RemoteLocation loc) {
+    for (SnapshotStatus status : statuses) {
+      String mountPath = toMountPath(
+          DFSUtil.bytes2String(status.getParentFullPath()), loc);
+      status.setParentFullPath(DFSUtil.string2Bytes(mountPath));
+    }
+    return statuses;
+  }
+
+  /**
+   * Asynchronously creates a snapshot of {@code snapshotRoot}.
+   *
+   * <p>The remote "createSnapshot" call is issued concurrently to every
+   * resolved location when the server reports the path as concurrent, and
+   * sequentially otherwise. The returned snapshot path is rewritten from the
+   * destination path of the answering location to its mount path; in the
+   * concurrent case the first entry of the result map is used.
+   *
+   * @param snapshotRoot mount path of the snapshottable directory.
+   * @param snapshotName name of the snapshot to create.
+   * @return the value of {@code asyncReturn(String.class)}.
+   * @throws IOException if the operation check, the location lookup or the
+   *                     remote invocation fails.
+   */
+  @Override
+  public String createSnapshot(String snapshotRoot, String snapshotName) throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+    final List<RemoteLocation> locations =
+        rpcServer.getLocationsForPath(snapshotRoot, true, false);
+    RemoteMethod method = new RemoteMethod("createSnapshot",
+        new Class<?>[] {String.class, String.class}, new RemoteParam(),
+        snapshotName);
+
+    if (rpcServer.isInvokeConcurrent(snapshotRoot)) {
+      rpcClient.invokeConcurrent(locations, method, String.class);
+      asyncApply((ApplyFunction<Map<RemoteLocation, String>, String>)
+          results -> {
+          Map.Entry<RemoteLocation, String> first =
+              results.entrySet().iterator().next();
+          return toMountPath(first.getValue(), first.getKey());
+        });
+    } else {
+      rpcClient.invokeSequential(method, locations, String.class, null);
+      asyncApply((ApplyFunction<RemoteResult<RemoteLocation, String>, String>)
+          response -> toMountPath(response.getResult(), response.getLocation()));
+    }
+    return asyncReturn(String.class);
+  }
+
+  /**
+   * Asynchronously lists the snapshottable directories of all namespaces
+   * known to the Namenode resolver and merges the per-namespace arrays with
+   * {@link RouterRpcServer#merge}.
+   *
+   * @return the value of
+   *         {@code asyncReturn(SnapshottableDirectoryStatus[].class)}.
+   * @throws IOException if the operation check or the remote invocation fails.
+   */
+  @Override
+  public SnapshottableDirectoryStatus[] getSnapshottableDirListing() throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.READ);
+
+    RemoteMethod method = new RemoteMethod("getSnapshottableDirListing");
+    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+    rpcClient.invokeConcurrent(
+            nss, method, true, false, SnapshottableDirectoryStatus[].class);
+    asyncApply((ApplyFunction<Map<FederationNamespaceInfo, SnapshottableDirectoryStatus[]>,
+        SnapshottableDirectoryStatus[]>)
+        ret -> RouterRpcServer.merge(ret, SnapshottableDirectoryStatus.class));
+    return asyncReturn(SnapshottableDirectoryStatus[].class);
+  }
+
+  /**
+   * Asynchronously lists the snapshots under {@code snapshotRoot}.
+   *
+   * <p>In both the concurrent and the sequential case the parent full path of
+   * every returned status is rewritten from the destination path of the
+   * answering location to its mount path. In the concurrent case the first
+   * entry of the result map is used.
+   *
+   * @param snapshotRoot mount path of the snapshottable directory.
+   * @return the value of {@code asyncReturn(SnapshotStatus[].class)}.
+   * @throws IOException if the operation check, the location lookup or the
+   *                     remote invocation fails.
+   */
+  @Override
+  public SnapshotStatus[] getSnapshotListing(String snapshotRoot) throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.READ);
+    final List<RemoteLocation> locations =
+        rpcServer.getLocationsForPath(snapshotRoot, true, false);
+    RemoteMethod remoteMethod = new RemoteMethod("getSnapshotListing",
+        new Class<?>[]{String.class},
+        new RemoteParam());
+    if (rpcServer.isInvokeConcurrent(snapshotRoot)) {
+      rpcClient.invokeConcurrent(
+          locations, remoteMethod, true, false, SnapshotStatus[].class);
+      asyncApply((ApplyFunction<Map<RemoteLocation, SnapshotStatus[]>, SnapshotStatus[]>)
+          ret -> {
+          // Key and value are taken from the same entry so that the location
+          // always matches the statuses being rewritten.
+          Map.Entry<RemoteLocation, SnapshotStatus[]> first =
+              ret.entrySet().iterator().next();
+          return remapParentPaths(first.getValue(), first.getKey());
+        });
+    } else {
+      rpcClient
+          .invokeSequential(remoteMethod, locations, SnapshotStatus[].class,
+              null);
+      asyncApply((ApplyFunction<RemoteResult<RemoteLocation, SnapshotStatus[]>, SnapshotStatus[]>)
+          invokedResponse -> remapParentPaths(
+              invokedResponse.getResult(), invokedResponse.getLocation()));
+    }
+    return asyncReturn(SnapshotStatus[].class);
+  }
+
+  /**
+   * Asynchronously computes the difference between two snapshots of
+   * {@code snapshotRoot}.
+   *
+   * <p>For a concurrent path the first value of the result map is selected;
+   * otherwise the sequential invocation of the RPC client is returned as is.
+   *
+   * @param snapshotRoot mount path of the snapshottable directory.
+   * @param earlierSnapshotName name of the earlier snapshot.
+   * @param laterSnapshotName name of the later snapshot.
+   * @return the value of {@code asyncReturn(SnapshotDiffReport.class)} in the
+   *         concurrent case, or the value returned by the RPC client's
+   *         sequential invocation.
+   * @throws IOException if the operation check, the location lookup or the
+   *                     remote invocation fails.
+   */
+  @Override
+  public SnapshotDiffReport getSnapshotDiffReport(
+      String snapshotRoot, String earlierSnapshotName,
+      String laterSnapshotName) throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.READ);
+
+    final List<RemoteLocation> locations =
+        rpcServer.getLocationsForPath(snapshotRoot, true, false);
+    RemoteMethod remoteMethod = new RemoteMethod("getSnapshotDiffReport",
+        new Class<?>[] {String.class, String.class, String.class},
+        new RemoteParam(), earlierSnapshotName, laterSnapshotName);
+
+    if (rpcServer.isInvokeConcurrent(snapshotRoot)) {
+      rpcClient.invokeConcurrent(
+          locations, remoteMethod, true, false, SnapshotDiffReport.class);
+      asyncApply((ApplyFunction<Map<RemoteLocation, SnapshotDiffReport>, SnapshotDiffReport>)
+          ret -> ret.values().iterator().next());
+      return asyncReturn(SnapshotDiffReport.class);
+    } else {
+      return rpcClient.invokeSequential(
+          locations, remoteMethod, SnapshotDiffReport.class, null);
+    }
+  }
+
+  /**
+   * Asynchronously fetches one page of the difference listing between two
+   * snapshots of {@code snapshotRoot}.
+   *
+   * <p>For a concurrent path the first value of the result map is selected;
+   * otherwise the sequential invocation of the RPC client is returned as is.
+   *
+   * @param snapshotRoot mount path of the snapshottable directory.
+   * @param earlierSnapshotName name of the earlier snapshot.
+   * @param laterSnapshotName name of the later snapshot.
+   * @param startPath start path forwarded unchanged to the Namenode.
+   * @param index index forwarded unchanged to the Namenode.
+   * @return the value of {@code asyncReturn(SnapshotDiffReportListing.class)}
+   *         in the concurrent case, or the value returned by the RPC client's
+   *         sequential invocation.
+   * @throws IOException if the operation check, the location lookup or the
+   *                     remote invocation fails.
+   */
+  @Override
+  public SnapshotDiffReportListing getSnapshotDiffReportListing(
+      String snapshotRoot, String earlierSnapshotName, String laterSnapshotName,
+      byte[] startPath, int index) throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.READ);
+
+    final List<RemoteLocation> locations =
+        rpcServer.getLocationsForPath(snapshotRoot, true, false);
+    Class<?>[] params = new Class<?>[] {
+        String.class, String.class, String.class,
+        byte[].class, int.class};
+    RemoteMethod remoteMethod = new RemoteMethod(
+        "getSnapshotDiffReportListing", params,
+        new RemoteParam(), earlierSnapshotName, laterSnapshotName,
+        startPath, index);
+
+    if (rpcServer.isInvokeConcurrent(snapshotRoot)) {
+      rpcClient.invokeConcurrent(locations, remoteMethod, false, false,
+              SnapshotDiffReportListing.class);
+      asyncApply((ApplyFunction<Map<RemoteLocation, SnapshotDiffReportListing>,
+          SnapshotDiffReportListing>) ret -> {
+          Collection<SnapshotDiffReportListing> listings = ret.values();
+          return listings.iterator().next();
+        });
+      return asyncReturn(SnapshotDiffReportListing.class);
+    } else {
+      return rpcClient.invokeSequential(
+          locations, remoteMethod, SnapshotDiffReportListing.class, null);
+    }
+  }
+}

+ 207 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncSnapshot.java

@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
+import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing;
+import org.apache.hadoop.hdfs.protocol.SnapshotException;
+import org.apache.hadoop.hdfs.protocol.SnapshotStatus;
+import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
+import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.MockResolver;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.ipc.CallerContext;
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType.MODIFY;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
+import static org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT;
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestRouterAsyncSnapshot {
+  private static Configuration routerConf;
+  /** Federated HDFS cluster. */
+  private static MiniRouterDFSCluster cluster;
+  private static String ns0;
+
+  /** Random Router for this federated cluster. */
+  private MiniRouterDFSCluster.RouterContext router;
+  private FileSystem routerFs;
+  private RouterRpcServer routerRpcServer;
+  private RouterAsyncSnapshot asyncSnapshot;
+
+  @BeforeClass
+  public static void setUpCluster() throws Exception {
+    cluster = new MiniRouterDFSCluster(true, 1, 2,
+        DEFAULT_HEARTBEAT_INTERVAL_MS, 1000);
+    cluster.setNumDatanodesPerNameservice(3);
+
+    cluster.startCluster();
+
+    // Making one Namenode active per nameservice
+    if (cluster.isHighAvailability()) {
+      for (String ns : cluster.getNameservices()) {
+        cluster.switchToActive(ns, NAMENODES[0]);
+        cluster.switchToStandby(ns, NAMENODES[1]);
+      }
+    }
+    // Start routers with only an RPC service
+    routerConf = new RouterConfigBuilder()
+        .rpc()
+        .build();
+
+    // Reduce the number of RPC clients threads to overload the Router easy
+    routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 1);
+    routerConf.setInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT, 1);
+    routerConf.setInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT, 1);
+    // We decrease the DN cache times to make the test faster
+    routerConf.setTimeDuration(
+        RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
+    cluster.addRouterOverrides(routerConf);
+    // Start routers with only an RPC service
+    cluster.startRouters();
+
+    // Register and verify all NNs with all routers
+    cluster.registerNamenodes();
+    cluster.waitNamenodeRegistration();
+    cluster.waitActiveNamespaces();
+    ns0 = cluster.getNameservices().get(0);
+  }
+
+  @AfterClass
+  public static void shutdownCluster() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Before
+  public void setUp() throws IOException {
+    router = cluster.getRandomRouter();
+    routerFs = router.getFileSystem();
+    routerRpcServer = router.getRouterRpcServer();
+    routerRpcServer.initAsyncThreadPool();
+    RouterAsyncRpcClient asyncRpcClient = new RouterAsyncRpcClient(
+        routerConf, router.getRouter(), routerRpcServer.getNamenodeResolver(),
+        routerRpcServer.getRPCMonitor(),
+        routerRpcServer.getRouterStateIdContext());
+    RouterRpcServer spy = Mockito.spy(routerRpcServer);
+    Mockito.when(spy.getRPCClient()).thenReturn(asyncRpcClient);
+    asyncSnapshot = new RouterAsyncSnapshot(spy);
+
+    // Create mock locations
+    MockResolver resolver = (MockResolver) router.getRouter().getSubclusterResolver();
+    resolver.addLocation("/", ns0, "/");
+    FsPermission permission = new FsPermission("705");
+    routerFs.mkdirs(new Path("/testdir"), permission);
+    FSDataOutputStream fsDataOutputStream = routerFs.create(
+        new Path("/testdir/testSnapshot.file"), true);
+    fsDataOutputStream.write(new byte[1024]);
+    fsDataOutputStream.close();
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    // clear client context
+    CallerContext.setCurrent(null);
+    boolean delete = routerFs.delete(new Path("/testdir"));
+    assertTrue(delete);
+    if (routerFs != null) {
+      routerFs.close();
+    }
+  }
+
+  @Test
+  public void testRouterAsyncSnapshot() throws Exception {
+    asyncSnapshot.allowSnapshot("/testdir");
+    syncReturn(null);
+    asyncSnapshot.createSnapshot("/testdir", "testdirSnapshot");
+    String snapshotName = syncReturn(String.class);
+    assertEquals("/testdir/.snapshot/testdirSnapshot", snapshotName);
+    asyncSnapshot.getSnapshottableDirListing();
+    SnapshottableDirectoryStatus[] snapshottableDirectoryStatuses =
+        syncReturn(SnapshottableDirectoryStatus[].class);
+    assertEquals(1, snapshottableDirectoryStatuses.length);
+    asyncSnapshot.getSnapshotListing("/testdir");
+    SnapshotStatus[] snapshotStatuses = syncReturn(SnapshotStatus[].class);
+    assertEquals(1, snapshotStatuses.length);
+
+    FSDataOutputStream fsDataOutputStream = routerFs.append(
+        new Path("/testdir/testSnapshot.file"), true);
+    fsDataOutputStream.write(new byte[1024]);
+    fsDataOutputStream.close();
+
+    asyncSnapshot.createSnapshot("/testdir", "testdirSnapshot1");
+    snapshotName = syncReturn(String.class);
+    assertEquals("/testdir/.snapshot/testdirSnapshot1", snapshotName);
+
+    asyncSnapshot.getSnapshotDiffReport("/testdir",
+        "testdirSnapshot", "testdirSnapshot1");
+    SnapshotDiffReport snapshotDiffReport = syncReturn(SnapshotDiffReport.class);
+    assertEquals(MODIFY, snapshotDiffReport.getDiffList().get(0).getType());
+
+    asyncSnapshot.getSnapshotDiffReportListing("/testdir",
+        "testdirSnapshot", "testdirSnapshot1", new byte[]{}, -1);
+    SnapshotDiffReportListing snapshotDiffReportListing =
+        syncReturn(SnapshotDiffReportListing.class);
+    assertEquals(1, snapshotDiffReportListing.getModifyList().size());
+
+    LambdaTestUtils.intercept(SnapshotException.class, () -> {
+      asyncSnapshot.disallowSnapshot("/testdir");
+      syncReturn(null);
+    });
+
+    asyncSnapshot.renameSnapshot("/testdir",
+        "testdirSnapshot1", "testdirSnapshot2");
+    syncReturn(null);
+
+    LambdaTestUtils.intercept(SnapshotException.class,
+        "Cannot delete snapshot testdirSnapshot1 from path /testdir",
+        () -> {
+        asyncSnapshot.deleteSnapshot("/testdir", "testdirSnapshot1");
+        syncReturn(null);
+      });
+
+    asyncSnapshot.deleteSnapshot("/testdir", "testdirSnapshot2");
+    syncReturn(null);
+
+    asyncSnapshot.deleteSnapshot("/testdir", "testdirSnapshot");
+    syncReturn(null);
+
+    asyncSnapshot.disallowSnapshot("/testdir");
+    syncReturn(null);
+  }
+}