Procházet zdrojové kódy

HDFS-17659. [ARR]Router Quota supports asynchronous rpc. (#7157). Contributed by hfutatzhanghb.

Reviewed-by: Jian Zhang <keepromise@apache.org>
hfutatzhanghb před 5 měsíci
rodič
revize
d8768cff46

+ 87 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/AsyncQuota.java

@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import org.apache.hadoop.fs.QuotaUsage;
+import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletionException;
+
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply;
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
+
+public class AsyncQuota extends Quota {
+
+  /** RPC server to receive client calls. */
+  private final RouterRpcServer rpcServer;
+  /** RPC clients to connect to the Namenodes. */
+  private final RouterRpcClient rpcClient;
+  private final Router router;
+
+  public AsyncQuota(Router router, RouterRpcServer server) {
+    super(router, server);
+    this.router = router;
+    this.rpcServer = server;
+    this.rpcClient =  this.rpcServer.getRPCClient();
+  }
+
+  /**
+   * Async get aggregated quota usage for the federation path.
+   * @param path Federation path.
+   * @return Aggregated quota.
+   * @throws IOException If the quota system is disabled.
+   */
+  public QuotaUsage getQuotaUsage(String path) throws IOException {
+    getEachQuotaUsage(path);
+
+    asyncApply(o -> {
+      Map<RemoteLocation, QuotaUsage> results = (Map<RemoteLocation, QuotaUsage>) o;
+      try {
+        return aggregateQuota(path, results);
+      } catch (IOException e) {
+        throw new CompletionException(e);
+      }
+    });
+    return asyncReturn(QuotaUsage.class);
+  }
+
+  /**
+   * Get quota usage for the federation path.
+   * @param path Federation path.
+   * @return quota usage for each remote location.
+   * @throws IOException If the quota system is disabled.
+   */
+  Map<RemoteLocation, QuotaUsage> getEachQuotaUsage(String path)
+      throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.READ);
+    if (!router.isQuotaEnabled()) {
+      throw new IOException("The quota system is disabled in Router.");
+    }
+
+    final List<RemoteLocation> quotaLocs = getValidQuotaLocations(path);
+    RemoteMethod method = new RemoteMethod("getQuotaUsage",
+        new Class<?>[] {String.class}, new RemoteParam());
+    rpcClient.invokeConcurrent(
+        quotaLocs, method, true, false, QuotaUsage.class);
+    return asyncReturn(Map.class);
+  }
+}

+ 3 - 3
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java

@@ -213,9 +213,9 @@ public class Quota {
    * method will do some additional filtering.
    * @param path Federation path.
    * @return List of valid quota remote locations.
-   * @throws IOException
+   * @throws IOException If the location for this path cannot be determined.
    */
-  private List<RemoteLocation> getValidQuotaLocations(String path)
+  protected List<RemoteLocation> getValidQuotaLocations(String path)
       throws IOException {
     final List<RemoteLocation> locations = getQuotaRemoteLocations(path);
 
@@ -359,7 +359,7 @@ public class Quota {
    * federation path.
    * @param path Federation path.
    * @return List of quota remote locations.
-   * @throws IOException
+   * @throws IOException If the location for this path cannot be determined.
    */
   private List<RemoteLocation> getQuotaRemoteLocations(String path)
       throws IOException {

+ 10 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java

@@ -41,6 +41,8 @@ import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;
+
 /**
  * Service to periodically update the {@link RouterQuotaUsage}
  * cached information in the {@link Router}.
@@ -99,6 +101,9 @@ public class RouterQuotaUpdateService extends PeriodicService {
         // This is because mount table does not have mtime.
         // For other mount entry get current quota usage
         HdfsFileStatus ret = this.rpcServer.getFileInfo(src);
+        if (rpcServer.isAsync()) {
+          ret = syncReturn(HdfsFileStatus.class);
+        }
         if (ret == null || ret.getModificationTime() == 0) {
           long[] zeroConsume = new long[StorageType.values().length];
           currentQuotaUsage =
@@ -113,6 +118,9 @@ public class RouterQuotaUpdateService extends PeriodicService {
             Quota quotaModule = this.rpcServer.getQuotaModule();
             Map<RemoteLocation, QuotaUsage> usageMap =
                 quotaModule.getEachQuotaUsage(src);
+            if (this.rpcServer.isAsync()) {
+              usageMap = (Map<RemoteLocation, QuotaUsage>)syncReturn(Map.class);
+            }
             currentQuotaUsage = quotaModule.aggregateQuota(src, usageMap);
             remoteQuotaUsage.putAll(usageMap);
           } catch (IOException ioe) {
@@ -136,6 +144,8 @@ public class RouterQuotaUpdateService extends PeriodicService {
       }
     } catch (IOException e) {
       LOG.error("Quota cache updated error.", e);
+    } catch (Exception e) {
+      LOG.error(e.toString());
     }
   }
 

+ 166 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncQuota.java

@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.QuotaUsage;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.MockResolver;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.ipc.CallerContext;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_QUOTA_BY_STORAGETYPE_ENABLED_KEY;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
+import static org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT;
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;
+import static org.junit.Assert.assertTrue;
+
+public class TestRouterAsyncQuota {
+  private static Configuration routerConf;
+  /** Federated HDFS cluster. */
+  private static MiniRouterDFSCluster cluster;
+  private static String ns0;
+
+  /** Random Router for this federated cluster. */
+  private MiniRouterDFSCluster.RouterContext router;
+  private FileSystem routerFs;
+  private RouterRpcServer routerRpcServer;
+  private AsyncQuota asyncQuota;
+
+  private final String testfilePath = "/testdir/testAsyncQuota.file";
+
+  @BeforeClass
+  public static void setUpCluster() throws Exception {
+    cluster = new MiniRouterDFSCluster(true, 1, 2,
+        DEFAULT_HEARTBEAT_INTERVAL_MS, 1000);
+    cluster.setNumDatanodesPerNameservice(3);
+    cluster.setRacks(
+        new String[] {"/rack1", "/rack2", "/rack3"});
+    cluster.startCluster();
+
+    // Making one Namenode active per nameservice
+    if (cluster.isHighAvailability()) {
+      for (String ns : cluster.getNameservices()) {
+        cluster.switchToActive(ns, NAMENODES[0]);
+        cluster.switchToStandby(ns, NAMENODES[1]);
+      }
+    }
+    // Start routers with only an RPC service
+    routerConf = new RouterConfigBuilder()
+        .rpc()
+        .quota(true)
+        .build();
+
+    // Reduce the number of RPC clients threads to overload the Router easy
+    routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 1);
+    routerConf.setInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT, 1);
+    routerConf.setInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT, 1);
+    // We decrease the DN cache times to make the test faster
+    routerConf.setTimeDuration(
+        RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
+    routerConf.setBoolean(DFS_QUOTA_BY_STORAGETYPE_ENABLED_KEY, true);
+    cluster.addRouterOverrides(routerConf);
+    // Start routers with only an RPC service
+    cluster.startRouters();
+
+    // Register and verify all NNs with all routers
+    cluster.registerNamenodes();
+    cluster.waitNamenodeRegistration();
+    cluster.waitActiveNamespaces();
+    ns0 = cluster.getNameservices().get(0);
+  }
+
+  @AfterClass
+  public static void shutdownCluster() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Before
+  public void setUp() throws IOException {
+    router = cluster.getRandomRouter();
+    routerFs = router.getFileSystem();
+    routerRpcServer = router.getRouterRpcServer();
+    routerRpcServer.initAsyncThreadPool();
+    RouterAsyncRpcClient asyncRpcClient = new RouterAsyncRpcClient(
+        routerConf, router.getRouter(), routerRpcServer.getNamenodeResolver(),
+        routerRpcServer.getRPCMonitor(),
+        routerRpcServer.getRouterStateIdContext());
+    RouterRpcServer spy = Mockito.spy(routerRpcServer);
+    Mockito.when(spy.getRPCClient()).thenReturn(asyncRpcClient);
+    asyncQuota = new AsyncQuota(router.getRouter(), spy);
+
+    // Create mock locations
+    MockResolver resolver = (MockResolver) router.getRouter().getSubclusterResolver();
+    resolver.addLocation("/", ns0, "/");
+    FsPermission permission = new FsPermission("705");
+    routerFs.mkdirs(new Path("/testdir"), permission);
+    FSDataOutputStream fsDataOutputStream = routerFs.create(
+        new Path(testfilePath), true);
+    fsDataOutputStream.write(new byte[1024]);
+    fsDataOutputStream.close();
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    // clear client context
+    CallerContext.setCurrent(null);
+    boolean delete = routerFs.delete(new Path("/testdir"));
+    assertTrue(delete);
+    if (routerFs != null) {
+      routerFs.close();
+    }
+  }
+
+  @Test
+  public void testRouterAsyncGetQuotaUsage() throws Exception {
+    asyncQuota.getQuotaUsage("/testdir");
+    QuotaUsage quotaUsage = syncReturn(QuotaUsage.class);
+    // 3-replication.
+    Assert.assertEquals(3 * 1024, quotaUsage.getSpaceConsumed());
+    // We have one directory and one file.
+    Assert.assertEquals(2, quotaUsage.getFileAndDirectoryCount());
+  }
+
+  @Test
+  public void testRouterAsyncSetQuotaUsage() throws Exception {
+    asyncQuota.setQuota("/testdir", Long.MAX_VALUE, 8096, StorageType.DISK, false);
+    syncReturn(void.class);
+    asyncQuota.getQuotaUsage("/testdir");
+    QuotaUsage quotaUsage = syncReturn(QuotaUsage.class);
+    Assert.assertEquals(8096, quotaUsage.getTypeQuota(StorageType.DISK));
+  }
+}