瀏覽代碼

HDFS-16876: Changes cleanup of shared RouterStateIdContext to be driven by namenodeResolver data. (#5282)

Simbarashe Dzinamarira 2 年之前
父節點
當前提交
b56d483258

+ 0 - 15
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java

@@ -77,10 +77,6 @@ public class ConnectionManager {
    * Global federated namespace context for router.
    */
   private final RouterStateIdContext routerStateIdContext;
-  /**
-   * Map from connection pool ID to namespace.
-   */
-  private final Map<ConnectionPoolId, String> connectionPoolToNamespaceMap;
   /** Max size of queue for creating new connections. */
   private final int creatorQueueMaxSize;
 
@@ -105,7 +101,6 @@ public class ConnectionManager {
   public ConnectionManager(Configuration config, RouterStateIdContext routerStateIdContext) {
     this.conf = config;
     this.routerStateIdContext = routerStateIdContext;
-    this.connectionPoolToNamespaceMap = new HashMap<>();
     // Configure minimum, maximum and active connection pools
     this.maxSize = this.conf.getInt(
         RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE,
@@ -172,10 +167,6 @@ public class ConnectionManager {
         pool.close();
       }
       this.pools.clear();
-      for (String nsID: connectionPoolToNamespaceMap.values()) {
-        routerStateIdContext.removeNamespaceStateId(nsID);
-      }
-      connectionPoolToNamespaceMap.clear();
     } finally {
       writeLock.unlock();
     }
@@ -224,7 +215,6 @@ public class ConnectionManager {
               this.minActiveRatio, protocol,
               new PoolAlignmentContext(this.routerStateIdContext, nsId));
           this.pools.put(connectionId, pool);
-          this.connectionPoolToNamespaceMap.put(connectionId, nsId);
         }
       } finally {
         writeLock.unlock();
@@ -451,11 +441,6 @@ public class ConnectionManager {
         try {
           for (ConnectionPoolId poolId : toRemove) {
             pools.remove(poolId);
-            String nsID = connectionPoolToNamespaceMap.get(poolId);
-            connectionPoolToNamespaceMap.remove(poolId);
-            if (!connectionPoolToNamespaceMap.values().contains(nsID)) {
-              routerStateIdContext.removeNamespaceStateId(nsID);
-            }
           }
         } finally {
           writeLock.unlock();

+ 30 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java

@@ -53,6 +53,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.HAUtil;
@@ -413,9 +414,38 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
                 .forEach(this.dnCache::refresh),
             0,
             dnCacheExpire, TimeUnit.MILLISECONDS);
+
+    Executors
+        .newSingleThreadScheduledExecutor()
+        .scheduleWithFixedDelay(this::clearStaleNamespacesInRouterStateIdContext,
+            0,
+            conf.getLong(RBFConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS,
+                RBFConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS_DEFAULT),
+            TimeUnit.MILLISECONDS);
+
     initRouterFedRename();
   }
 
+  /**
+   * Removes from the shared RouterStateIdContext every namespace that the
+   * namenode resolver no longer reports.
+   *
+   * <p>This method is run periodically by a ScheduledExecutorService, which
+   * suppresses all subsequent executions of a task once one execution throws.
+   * Every failure is therefore caught and logged here, so that a single failed
+   * run does not permanently stop the cleanup.
+   */
+  private void clearStaleNamespacesInRouterStateIdContext() {
+    try {
+      final Set<String> resolvedNamespaces = namenodeResolver.getNamespaces()
+          .stream()
+          .map(FederationNamespaceInfo::getNameserviceId)
+          .collect(Collectors.toSet());
+
+      // Drop every tracked namespace that is missing from the resolver's view.
+      routerStateIdContext.getNamespaces().forEach(namespace -> {
+        if (!resolvedNamespaces.contains(namespace)) {
+          routerStateIdContext.removeNamespaceStateId(namespace);
+        }
+      });
+    } catch (IOException e) {
+      LOG.warn("Could not fetch current list of namespaces.", e);
+    } catch (RuntimeException e) {
+      // Must not escape: an uncaught exception would cancel the periodic task.
+      LOG.warn("Could not clear stale namespaces from the router state id context.", e);
+    }
+  }
+
   /**
    * Init the router federation rename environment. Each router has its own
    * journal path.

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java

@@ -22,6 +22,7 @@ import java.lang.reflect.Method;
 import java.util.Collections;
 import java.util.HashSet;
 
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.LongAccumulator;
@@ -92,6 +93,10 @@ class RouterStateIdContext implements AlignmentContext {
     return namespaceIdMap.computeIfAbsent(nsId, key -> new LongAccumulator(Math::max, Long.MIN_VALUE));
   }
 
+  /**
+   * Lists the namespace IDs currently held in the namespace state id map.
+   *
+   * @return a newly created list containing the keys of the map as enumerated
+   *         at the time of the call.
+   */
+  public List<String> getNamespaces() {
+    final List<String> trackedNamespaces = Collections.list(namespaceIdMap.keys());
+    return trackedNamespaces;
+  }
+
+  /**
+   * Removes the entry for the given namespace from the namespace state id map.
+   *
+   * @param nsId ID of the namespace whose entry is removed.
+   */
   public void removeNamespaceStateId(String nsId) {
     namespaceIdMap.remove(nsId);
   }

+ 77 - 2
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java

@@ -25,7 +25,6 @@ import static org.junit.Assert.assertThrows;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY;
-import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -43,6 +42,7 @@ import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RouterFederatedStateProto;
 import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
 import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
+import org.apache.hadoop.hdfs.server.federation.MockResolver;
 import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
 import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
 import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
@@ -359,7 +359,7 @@ public class TestObserverWithRouter {
       }
       sb.append(suffix);
     }
-    routerConf.set(DFS_ROUTER_MONITOR_NAMENODE, sb.toString());
+    routerConf.set(RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE, sb.toString());
     routerConf.setBoolean(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_KEY, true);
     routerConf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
     routerConf.set(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, "0ms");
@@ -564,4 +564,79 @@ public class TestObserverWithRouter {
     LongAccumulator namespaceStateId  = routerStateIdContext.getNamespaceStateId("ns0");
     assertEquals("Router's shared should have progressed.", 21, namespaceStateId.get());
   }
+
+  /**
+   * Checks that the state id accumulator stored for "ns0" in the shared
+   * RouterStateIdContext is the very same object before and after waiting out
+   * the configured connection pool cleanup period, and that the three reads
+   * are split as one call to the active and two calls to the observer.
+   */
+  @Test
+  @Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
+  public void testSharedStateInRouterStateIdContext() throws Exception {
+    Path rootPath = new Path("/");
+    long cleanupPeriodMs = 1000;
+
+    // Use short cleanup settings so the test only has to sleep briefly below.
+    Configuration conf = new Configuration(false);
+    conf.setLong(RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_CLEAN, cleanupPeriodMs);
+    conf.setLong(RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_CLEAN_MS, cleanupPeriodMs / 10);
+    startUpCluster(1, conf);
+    fileSystem  = routerContext.getFileSystem(getConfToEnableObserverReads());
+    RouterStateIdContext routerStateIdContext = routerContext.getRouterRpcServer()
+        .getRouterStateIdContext();
+
+    // First read goes to active and creates connection pool for this user to active
+    fileSystem.listStatus(rootPath);
+    // Second read goes to observer and creates connection pool for this user to observer
+    fileSystem.listStatus(rootPath);
+    // Get object storing state of the namespace in the shared RouterStateIdContext
+    LongAccumulator namespaceStateId1  = routerStateIdContext.getNamespaceStateId("ns0");
+
+    // Wait for connection pools to expire and be cleaned up.
+    Thread.sleep(cleanupPeriodMs * 2);
+
+    // Third read goes to observer.
+    // New connection pool to observer is created since existing one expired.
+    fileSystem.listStatus(rootPath);
+    fileSystem.close();
+    // Get object storing state of the namespace in the shared RouterStateIdContext
+    LongAccumulator namespaceStateId2  = routerStateIdContext.getNamespaceStateId("ns0");
+
+    long rpcCountForActive = routerContext.getRouter().getRpcServer()
+        .getRPCMetrics().getActiveProxyOps();
+    long rpcCountForObserver = routerContext.getRouter().getRpcServer()
+        .getRPCMetrics().getObserverProxyOps();
+
+    // First list status goes to active
+    assertEquals("One call should be sent to active", 1, rpcCountForActive);
+    // Last two listStatus calls go to observer.
+    assertEquals("Two calls should be sent to observer", 2, rpcCountForObserver);
+
+    // Identity, not equality: the accumulator must not have been recreated.
+    Assertions.assertSame(namespaceStateId1, namespaceStateId2,
+        "The same object should be used in the shared RouterStateIdContext");
+  }
+
+
+  /**
+   * Checks that "ns0" is present in the shared RouterStateIdContext after a
+   * read, and that the context holds no namespaces once the mock resolver's
+   * registrations have been cleaned, registration has been disabled, and twice
+   * the configured membership expiration time has elapsed.
+   */
+  @Test
+  @Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
+  public void testRouterStateIdContextCleanup() throws Exception {
+    Path rootPath = new Path("/");
+    long recordExpiry = TimeUnit.SECONDS.toMillis(1);
+
+    // RouterRpcServer uses this setting as the delay of its stale-namespace
+    // cleanup task, so a short value keeps the wait below short.
+    Configuration confOverride = new Configuration(false);
+    confOverride.setLong(RBFConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS, recordExpiry);
+
+    startUpCluster(1, confOverride);
+    fileSystem  = routerContext.getFileSystem(getConfToEnableObserverReads());
+    RouterStateIdContext routerStateIdContext = routerContext.getRouterRpcServer()
+        .getRouterStateIdContext();
+
+    // Snapshot of the tracked namespaces right after a read.
+    fileSystem.listStatus(rootPath);
+    List<String> namespace1 = routerStateIdContext.getNamespaces();
+    fileSystem.close();
+
+    // Empty the resolver and keep it empty, then give the cleanup time to run.
+    MockResolver mockResolver = (MockResolver) routerContext.getRouter().getNamenodeResolver();
+    mockResolver.cleanRegistrations();
+    mockResolver.setDisableRegistration(true);
+    Thread.sleep(recordExpiry * 2);
+
+    // Snapshot of the tracked namespaces after the wait.
+    List<String> namespace2 = routerStateIdContext.getNamespaces();
+    assertEquals(1, namespace1.size());
+    assertEquals("ns0", namespace1.get(0));
+    assertTrue(namespace2.isEmpty());
+  }
 }