Browse Source

HDFS-17721. RBF: Allow routers to declare IP for admin addr (#7342) Contributed by Felix Nguyen.

Reviewed-by: Haiyang Hu <huhaiyang@apache.org>
Signed-off-by: Shilun Fan <slfan1989@apache.org>
Felix Nguyen 3 months ago
parent
commit
741bdd636b

+ 12 - 4
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/MountTableRefresherService.java

@@ -75,7 +75,7 @@ public class MountTableRefresherService extends AbstractService {
 
   /**
    * All router admin clients cached. So no need to create the client again and
-   * again. Router admin address(host:port) is used as key to cache RouterClient
+   * again. Router admin address(host:port or ip:port) is used as key to cache RouterClient
    * objects.
    */
   private LoadingCache<String, RouterClient> routerClientsCache;
@@ -102,8 +102,13 @@ public class MountTableRefresherService extends AbstractService {
     this.mountTableStore = getMountTableStore();
     // Attach this service to mount table store.
     this.mountTableStore.setRefreshService(this);
-    this.localAdminAddress =
-        StateStoreUtils.getHostPortString(router.getAdminServerAddress());
+    if (conf.getBoolean(RBFConfigKeys.DFS_ROUTER_HEARTBEAT_WITH_IP_ENABLE,
+        RBFConfigKeys.DFS_ROUTER_HEARTBEAT_WITH_IP_ENABLE_DEFAULT)) {
+      this.localAdminAddress = StateStoreUtils.getIpPortString(router.getAdminServerAddress());
+    } else {
+      this.localAdminAddress = StateStoreUtils.getHostPortString(router.getAdminServerAddress());
+    }
+    LOG.info("Initialized MountTableRefresherService with addr: {}", this.localAdminAddress);
     this.cacheUpdateTimeout = conf.getTimeDuration(
         RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE_TIMEOUT,
         RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE_TIMEOUT_DEFAULT,
@@ -220,7 +225,7 @@ public class MountTableRefresherService extends AbstractService {
     List<MountTableRefresherThread> refreshThreads = new ArrayList<>();
     for (RouterState routerState : cachedRecords) {
       String adminAddress = routerState.getAdminAddress();
-      if (adminAddress == null || adminAddress.length() == 0) {
+      if (adminAddress == null || adminAddress.isEmpty()) {
         // this router has not enabled router admin.
         continue;
       }
@@ -237,11 +242,13 @@ public class MountTableRefresherService extends AbstractService {
          * RouterClient
          */
         refreshThreads.add(getLocalRefresher(adminAddress));
+        LOG.debug("Added local refresher for {}", adminAddress);
       } else {
         try {
           RouterClient client = routerClientsCache.get(adminAddress);
           refreshThreads.add(new MountTableRefresherThread(
               client.getMountTableManager(), adminAddress));
+          LOG.debug("Added remote refresher for {}", adminAddress);
         } catch (ExecutionException execExcep) {
           // Can not connect, seems router is stopped now.
           LOG.warn(ROUTER_CONNECT_ERROR_MSG, adminAddress, execExcep);
@@ -296,6 +303,7 @@ public class MountTableRefresherService extends AbstractService {
       if (mountTableRefreshThread.isSuccess()) {
         successCount++;
       } else {
+        LOG.debug("Failed to refresh {}", mountTableRefreshThread.getAdminAddress());
         failureCount++;
         // remove RouterClient from cache so that new client is created
         removeFromCache(mountTableRefreshThread.getAdminAddress());

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java

@@ -288,6 +288,9 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
       FEDERATION_ROUTER_PREFIX + "safemode.checkperiod";
   public static final long DFS_ROUTER_SAFEMODE_CHECKPERIOD_MS_DEFAULT =
       TimeUnit.SECONDS.toMillis(5);
+  public static final String DFS_ROUTER_HEARTBEAT_WITH_IP_ENABLE =
+      FEDERATION_ROUTER_PREFIX + "heartbeat.with.ip.enable";
+  public static final boolean DFS_ROUTER_HEARTBEAT_WITH_IP_ENABLE_DEFAULT = false;
 
   // HDFS Router-based federation mount table entries
   /** Maximum number of cache entries to have. */

+ 6 - 3
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHeartbeatService.java

@@ -88,9 +88,12 @@ public class RouterHeartbeatService extends PeriodicService {
             getStateStoreVersion(MountTableStore.class));
         record.setStateStoreVersion(stateStoreVersion);
         // if admin server not started then hostPort will be empty
-        String hostPort =
-            StateStoreUtils.getHostPortString(router.getAdminServerAddress());
-        record.setAdminAddress(hostPort);
+        if (router.getConfig().getBoolean(RBFConfigKeys.DFS_ROUTER_HEARTBEAT_WITH_IP_ENABLE,
+            RBFConfigKeys.DFS_ROUTER_HEARTBEAT_WITH_IP_ENABLE_DEFAULT)) {
+          record.setAdminAddress(StateStoreUtils.getIpPortString(router.getAdminServerAddress()));
+        } else {
+          record.setAdminAddress(StateStoreUtils.getHostPortString(router.getAdminServerAddress()));
+        }
         RouterHeartbeatRequest request =
             RouterHeartbeatRequest.newInstance(record);
         RouterHeartbeatResponse response = routerStore.routerHeartbeat(request);

+ 16 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUtils.java

@@ -25,6 +25,7 @@ import java.util.List;
 
 import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
 import org.apache.hadoop.hdfs.server.federation.store.records.Query;
+import org.apache.hadoop.net.NetUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -136,4 +137,19 @@ public final class StateStoreUtils {
     return hostName + ":" + address.getPort();
   }
 
+  /**
+   * Returns address in form of ip:port, empty string if address is null.
+   *
+   * @param address address
+   * @return host:port
+   */
+  public static String getIpPortString(InetSocketAddress address) {
+    if (null == address) {
+      return "";
+    }
+    address = NetUtils.getConnectAddress(address);
+    InetAddress inet = address.getAddress();
+    return inet.getHostAddress() + ":" + address.getPort();
+  }
+
 }

+ 8 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml

@@ -974,4 +974,12 @@
     </description>
   </property>
 
+  <property>
+    <name>dfs.federation.router.heartbeat.with.ip.enable</name>
+    <description>
+      Make router use IP instead of host when communicating with router state state store.
+    </description>
+    <value>false</value>
+  </property>
+
 </configuration>

+ 24 - 8
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterMountTableCacheRefresh.java

@@ -25,6 +25,8 @@ import static org.junit.Assert.assertTrue;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -53,23 +55,32 @@ import org.apache.hadoop.service.Service.STATE;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Time;
 import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 /**
  * This test class verifies that mount table cache is updated on all the routers
  * when MountTableRefreshService is enabled and there is a change in mount table
  * entries.
  */
+@RunWith(Parameterized.class)
 public class TestRouterMountTableCacheRefresh {
   private static TestingServer curatorTestingServer;
   private static MiniRouterDFSCluster cluster;
   private static RouterContext routerContext;
   private static MountTableManager mountTableManager;
 
-  @BeforeClass
-  public static void setUp() throws Exception {
+  @Parameterized.Parameters
+  public static Collection<Object> data() {
+    return Arrays.asList(new Object[] {true, false});
+  }
+
+  public TestRouterMountTableCacheRefresh(boolean useIpForHeartbeats) throws Exception {
+    // Initialize only once per parameter
+    if (curatorTestingServer != null) {
+      return;
+    }
     curatorTestingServer = new TestingServer();
     curatorTestingServer.start();
     final String connectString = curatorTestingServer.getConnectString();
@@ -82,6 +93,7 @@ public class TestRouterMountTableCacheRefresh {
         FileSubclusterResolver.class);
     conf.set(RBFConfigKeys.FEDERATION_STORE_ZK_ADDRESS, connectString);
     conf.setBoolean(RBFConfigKeys.DFS_ROUTER_STORE_ENABLE, true);
+    conf.setBoolean(RBFConfigKeys.DFS_ROUTER_HEARTBEAT_WITH_IP_ENABLE, useIpForHeartbeats);
     cluster.addRouterOverrides(conf);
     cluster.startCluster();
     cluster.startRouters();
@@ -95,11 +107,15 @@ public class TestRouterMountTableCacheRefresh {
         numNameservices, 60000);
   }
 
-  @AfterClass
-  public static void destory() {
+  @Parameterized.AfterParam
+  public static void destroy() {
     try {
-      curatorTestingServer.close();
-      cluster.shutdown();
+      if (curatorTestingServer != null) {
+        curatorTestingServer.close();
+      }
+      if (cluster != null) {
+        cluster.shutdown();
+      }
     } catch (IOException e) {
       // do nothing
     }