Browse Source

HDFS-17761. [ARR] RouterNetworkTopologyServlet adapts to async router rpc. (#7533). Contributed by hfutatzhanghb.

Reviewed-by: Jian Zhang <keepromise@apache.org>
Reviewed-by: Shilun Fan <slfan1989@apache.org>
hfutatzhanghb 1 month ago
parent
commit
d980aef7be

+ 15 - 3
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterNetworkTopologyServlet.java

@@ -31,6 +31,8 @@ import java.io.PrintStream;
 import java.util.Arrays;
 import java.util.Arrays;
 import java.util.List;
 import java.util.List;
 
 
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
+
 /**
 /**
  * A servlet to print out the network topology from router.
  * A servlet to print out the network topology from router.
  */
  */
@@ -49,9 +51,19 @@ public class RouterNetworkTopologyServlet extends NetworkTopologyServlet {
     }
     }
 
 
     Router router = RouterHttpServer.getRouterFromContext(context);
     Router router = RouterHttpServer.getRouterFromContext(context);
-    DatanodeInfo[] datanodeReport =
-        router.getRpcServer().getDatanodeReport(
-            HdfsConstants.DatanodeReportType.ALL);
+    DatanodeInfo[] datanodeReport = null;
+    if (router.getRpcServer().isAsync()) {
+      router.getRpcServer().getDatanodeReportAsync(
+          HdfsConstants.DatanodeReportType.ALL, true, 0);
+      try {
+        datanodeReport = syncReturn(DatanodeInfo[].class);
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+    } else {
+      datanodeReport = router.getRpcServer().getDatanodeReport(
+          HdfsConstants.DatanodeReportType.ALL);
+    }
     List<Node> datanodeInfos = Arrays.asList(datanodeReport);
     List<Node> datanodeInfos = Arrays.asList(datanodeReport);
 
 
     try (PrintStream out = new PrintStream(
     try (PrintStream out = new PrintStream(

+ 139 - 30
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNetworkTopologyServlet.java

@@ -24,10 +24,17 @@ import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
 import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
 import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
 import org.apache.hadoop.hdfs.server.federation.resolver.MultipleDestinationMountTableResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.MultipleDestinationMountTableResolver;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.IOUtils;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
+
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 
 import java.io.ByteArrayOutputStream;
 import java.io.ByteArrayOutputStream;
+import java.lang.reflect.Method;
 import java.net.HttpURLConnection;
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.net.URL;
 import java.util.Iterator;
 import java.util.Iterator;
@@ -36,21 +43,31 @@ import java.util.Map;
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HTTP_ENABLE;
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HTTP_ENABLE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.apache.hadoop.hdfs.server.federation.router.TestRouterNetworkTopologyServlet.clusterNoDatanodes;
+import static org.apache.hadoop.hdfs.server.federation.router.TestRouterNetworkTopologyServlet.clusterWithDatanodes;
+import static org.apache.hadoop.hdfs.server.federation.router.TestRouterNetworkTopologyServlet.setUp;
 
 
+@SuppressWarnings("checkstyle:VisibilityModifier")
 public class TestRouterNetworkTopologyServlet {
 public class TestRouterNetworkTopologyServlet {
 
 
-  private static StateStoreDFSCluster clusterWithDatanodes;
-  private static StateStoreDFSCluster clusterNoDatanodes;
+  public static StateStoreDFSCluster clusterWithDatanodes;
+  public static StateStoreDFSCluster clusterNoDatanodes;
+
+  public static final String ASYNC_MODE = "ASYNC";
+  public static final String SYNC_MODE = "SYNC";
 
 
-  @BeforeAll
-  public static void setUp() throws Exception {
-    // Builder configuration
+  public static void setUp(String rpcMode) throws Exception {
+    // Builder configuration.
     Configuration routerConf =
     Configuration routerConf =
         new RouterConfigBuilder().stateStore().admin().quota().rpc().build();
         new RouterConfigBuilder().stateStore().admin().quota().rpc().build();
     routerConf.set(DFS_ROUTER_HTTP_ENABLE, "true");
     routerConf.set(DFS_ROUTER_HTTP_ENABLE, "true");
+    // Use async router rpc.
+    if (rpcMode.equals("ASYNC")) {
+      routerConf.setBoolean(RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_ENABLE_KEY, true);
+    }
     Configuration hdfsConf = new Configuration(false);
     Configuration hdfsConf = new Configuration(false);
 
 
-    // Build and start a federated cluster
+    // Build and start a federated cluster.
     clusterWithDatanodes = new StateStoreDFSCluster(false, 2,
     clusterWithDatanodes = new StateStoreDFSCluster(false, 2,
         MultipleDestinationMountTableResolver.class);
         MultipleDestinationMountTableResolver.class);
     clusterWithDatanodes.addNamenodeOverrides(hdfsConf);
     clusterWithDatanodes.addNamenodeOverrides(hdfsConf);
@@ -67,7 +84,7 @@ public class TestRouterNetworkTopologyServlet {
     clusterWithDatanodes.waitClusterUp();
     clusterWithDatanodes.waitClusterUp();
     clusterWithDatanodes.waitActiveNamespaces();
     clusterWithDatanodes.waitActiveNamespaces();
 
 
-    // Build and start a federated cluster
+    // Build and start a federated cluster.
     clusterNoDatanodes = new StateStoreDFSCluster(false, 2,
     clusterNoDatanodes = new StateStoreDFSCluster(false, 2,
         MultipleDestinationMountTableResolver.class);
         MultipleDestinationMountTableResolver.class);
     clusterNoDatanodes.addNamenodeOverrides(hdfsConf);
     clusterNoDatanodes.addNamenodeOverrides(hdfsConf);
@@ -80,13 +97,74 @@ public class TestRouterNetworkTopologyServlet {
     clusterNoDatanodes.waitActiveNamespaces();
     clusterNoDatanodes.waitActiveNamespaces();
   }
   }
 
 
-  @Test
+  @Nested
+  @ExtendWith(RouterServerHelper.class)
+  class TestWithAsyncRouterRpc {
+
+    @ParameterizedTest
+    @ValueSource(strings = {ASYNC_MODE})
+    public void testPrintTopologyTextFormatAsync(String rpcMode) throws Exception {
+      testPrintTopologyTextFormat();
+    }
+
+    @ParameterizedTest
+    @ValueSource(strings = {ASYNC_MODE})
+    public void testPrintTopologyJsonFormatAsync(String rpcMode) throws Exception {
+      testPrintTopologyJsonFormat();
+    }
+
+    @ParameterizedTest
+    @ValueSource(strings = {ASYNC_MODE})
+    public void testPrintTopologyNoDatanodesTextFormatAsync(String rpcMode)
+        throws Exception {
+      testPrintTopologyNoDatanodesTextFormat();
+    }
+
+    @ParameterizedTest
+    @ValueSource(strings = {ASYNC_MODE})
+    public void testPrintTopologyNoDatanodesJsonFormatAsync(String rpcMode)
+        throws Exception {
+      testPrintTopologyNoDatanodesJsonFormat();
+    }
+  }
+
+  @Nested
+  @ExtendWith(RouterServerHelper.class)
+  class TestWithSyncRouterRpc {
+
+    @ParameterizedTest
+    @ValueSource(strings = {SYNC_MODE})
+    public void testPrintTopologyTextFormatSync(String rpcMode) throws Exception {
+      testPrintTopologyTextFormat();
+    }
+
+    @ParameterizedTest
+    @ValueSource(strings = {SYNC_MODE})
+    public void testPrintTopologyJsonFormatSync(String rpcMode) throws Exception {
+      testPrintTopologyJsonFormat();
+    }
+
+    @ParameterizedTest
+    @ValueSource(strings = {SYNC_MODE})
+    public void testPrintTopologyNoDatanodesTextFormatSync(String rpcMode)
+        throws Exception {
+      testPrintTopologyNoDatanodesTextFormat();
+    }
+
+    @ParameterizedTest
+    @ValueSource(strings = {SYNC_MODE})
+    public void testPrintTopologyNoDatanodesJsonFormatSync(String rpcMode)
+        throws Exception {
+      testPrintTopologyNoDatanodesJsonFormat();
+    }
+  }
+
   public void testPrintTopologyTextFormat() throws Exception {
   public void testPrintTopologyTextFormat() throws Exception {
-    // get http Address
+    // Get http Address.
     String httpAddress = clusterWithDatanodes.getRandomRouter().getRouter()
     String httpAddress = clusterWithDatanodes.getRandomRouter().getRouter()
         .getHttpServerAddress().toString();
         .getHttpServerAddress().toString();
 
 
-    // send http request
+    // Send http request.
     URL url = new URL("http:/" + httpAddress + "/topology");
     URL url = new URL("http:/" + httpAddress + "/topology");
     HttpURLConnection conn = (HttpURLConnection) url.openConnection();
     HttpURLConnection conn = (HttpURLConnection) url.openConnection();
     conn.setReadTimeout(20000);
     conn.setReadTimeout(20000);
@@ -101,7 +179,7 @@ public class TestRouterNetworkTopologyServlet {
     sb.append("\n-- Network Topology -- ");
     sb.append("\n-- Network Topology -- ");
     String topology = sb.toString();
     String topology = sb.toString();
 
 
-    // assert rack info
+    // Assert rack info.
     assertTrue(topology.contains("/ns0/rack1"));
     assertTrue(topology.contains("/ns0/rack1"));
     assertTrue(topology.contains("/ns0/rack2"));
     assertTrue(topology.contains("/ns0/rack2"));
     assertTrue(topology.contains("/ns0/rack3"));
     assertTrue(topology.contains("/ns0/rack3"));
@@ -109,18 +187,17 @@ public class TestRouterNetworkTopologyServlet {
     assertTrue(topology.contains("/ns1/rack5"));
     assertTrue(topology.contains("/ns1/rack5"));
     assertTrue(topology.contains("/ns1/rack6"));
     assertTrue(topology.contains("/ns1/rack6"));
 
 
-    // assert node number
+    // Assert node number.
     assertEquals(18,
     assertEquals(18,
         topology.split("127.0.0.1").length - 1);
         topology.split("127.0.0.1").length - 1);
   }
   }
 
 
-  @Test
   public void testPrintTopologyJsonFormat() throws Exception {
   public void testPrintTopologyJsonFormat() throws Exception {
-    // get http Address
+    // Get http Address.
     String httpAddress = clusterWithDatanodes.getRandomRouter().getRouter()
     String httpAddress = clusterWithDatanodes.getRandomRouter().getRouter()
-            .getHttpServerAddress().toString();
+        .getHttpServerAddress().toString();
 
 
-    // send http request
+    // Send http request.
     URL url = new URL("http:/" + httpAddress + "/topology");
     URL url = new URL("http:/" + httpAddress + "/topology");
     HttpURLConnection conn = (HttpURLConnection) url.openConnection();
     HttpURLConnection conn = (HttpURLConnection) url.openConnection();
     conn.setReadTimeout(20000);
     conn.setReadTimeout(20000);
@@ -132,13 +209,13 @@ public class TestRouterNetworkTopologyServlet {
     IOUtils.copyBytes(conn.getInputStream(), out, 4096, true);
     IOUtils.copyBytes(conn.getInputStream(), out, 4096, true);
     String topology = out.toString();
     String topology = out.toString();
 
 
-    // parse json
+    // Parse json.
     JsonNode racks = new ObjectMapper().readTree(topology);
     JsonNode racks = new ObjectMapper().readTree(topology);
 
 
-    // assert rack number
+    // Assert rack number.
     assertEquals(6, racks.size());
     assertEquals(6, racks.size());
 
 
-    // assert rack info
+    // Assert rack info.
     assertTrue(topology.contains("/ns0/rack1"));
     assertTrue(topology.contains("/ns0/rack1"));
     assertTrue(topology.contains("/ns0/rack2"));
     assertTrue(topology.contains("/ns0/rack2"));
     assertTrue(topology.contains("/ns0/rack3"));
     assertTrue(topology.contains("/ns0/rack3"));
@@ -146,7 +223,7 @@ public class TestRouterNetworkTopologyServlet {
     assertTrue(topology.contains("/ns1/rack5"));
     assertTrue(topology.contains("/ns1/rack5"));
     assertTrue(topology.contains("/ns1/rack6"));
     assertTrue(topology.contains("/ns1/rack6"));
 
 
-    // assert node number
+    // Assert node number.
     Iterator<JsonNode> elements = racks.elements();
     Iterator<JsonNode> elements = racks.elements();
     int dataNodesCount = 0;
     int dataNodesCount = 0;
     while(elements.hasNext()){
     while(elements.hasNext()){
@@ -159,13 +236,12 @@ public class TestRouterNetworkTopologyServlet {
     assertEquals(18, dataNodesCount);
     assertEquals(18, dataNodesCount);
   }
   }
 
 
-  @Test
   public void testPrintTopologyNoDatanodesTextFormat() throws Exception {
   public void testPrintTopologyNoDatanodesTextFormat() throws Exception {
-    // get http Address
+    // Get http Address.
     String httpAddress = clusterNoDatanodes.getRandomRouter().getRouter()
     String httpAddress = clusterNoDatanodes.getRandomRouter().getRouter()
         .getHttpServerAddress().toString();
         .getHttpServerAddress().toString();
 
 
-    // send http request
+    // Send http request.
     URL url = new URL("http:/" + httpAddress + "/topology");
     URL url = new URL("http:/" + httpAddress + "/topology");
     HttpURLConnection conn = (HttpURLConnection) url.openConnection();
     HttpURLConnection conn = (HttpURLConnection) url.openConnection();
     conn.setReadTimeout(20000);
     conn.setReadTimeout(20000);
@@ -179,17 +255,16 @@ public class TestRouterNetworkTopologyServlet {
     sb.append("\n-- Network Topology -- ");
     sb.append("\n-- Network Topology -- ");
     String topology = sb.toString();
     String topology = sb.toString();
 
 
-    // assert node number
+    // Assert node number.
     assertTrue(topology.contains("No DataNodes"));
     assertTrue(topology.contains("No DataNodes"));
   }
   }
 
 
-  @Test
   public void testPrintTopologyNoDatanodesJsonFormat() throws Exception {
   public void testPrintTopologyNoDatanodesJsonFormat() throws Exception {
-    // get http Address
+    // Get http Address.
     String httpAddress = clusterNoDatanodes.getRandomRouter().getRouter()
     String httpAddress = clusterNoDatanodes.getRandomRouter().getRouter()
         .getHttpServerAddress().toString();
         .getHttpServerAddress().toString();
 
 
-    // send http request
+    // Send http request.
     URL url = new URL("http:/" + httpAddress + "/topology");
     URL url = new URL("http:/" + httpAddress + "/topology");
     HttpURLConnection conn = (HttpURLConnection) url.openConnection();
     HttpURLConnection conn = (HttpURLConnection) url.openConnection();
     conn.setReadTimeout(20000);
     conn.setReadTimeout(20000);
@@ -204,7 +279,41 @@ public class TestRouterNetworkTopologyServlet {
     sb.append("\n-- Network Topology -- ");
     sb.append("\n-- Network Topology -- ");
     String topology = sb.toString();
     String topology = sb.toString();
 
 
-    // assert node number
+    // Assert node number.
     assertTrue(topology.contains("No DataNodes"));
     assertTrue(topology.contains("No DataNodes"));
   }
   }
 }
 }
+
+class RouterServerHelper implements BeforeEachCallback, AfterAllCallback {
+
+  private static final ThreadLocal<RouterServerHelper> TEST_ROUTER_SERVER_TL =
+      new InheritableThreadLocal<RouterServerHelper>();
+
+  @Override
+  public void beforeEach(ExtensionContext context) throws Exception {
+    Method testMethod = context.getRequiredTestMethod();
+    ValueSource enumAnnotation = testMethod.getAnnotation(ValueSource.class);
+    if (enumAnnotation != null) {
+      String[] strings = enumAnnotation.strings();
+      for (String rpcMode : strings) {
+        if (TEST_ROUTER_SERVER_TL.get() == null) {
+          setUp(rpcMode);
+        }
+      }
+    }
+    TEST_ROUTER_SERVER_TL.set(RouterServerHelper.this);
+  }
+
+  @Override
+  public void afterAll(ExtensionContext context) {
+    if (clusterWithDatanodes != null) {
+      clusterWithDatanodes.shutdown();
+      clusterWithDatanodes = null;
+    }
+    if (clusterNoDatanodes != null) {
+      clusterNoDatanodes.shutdown();
+      clusterNoDatanodes = null;
+    }
+    TEST_ROUTER_SERVER_TL.remove();
+  }
+}