Ver Fonte

HDFS-16826. [RBF SBN] ConnectionManager should advance the client stateId for each request (#5086)

ZanderXu há 2 anos atrás
pai
commit
e0974298ce

+ 3 - 2
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java

@@ -226,13 +226,14 @@ public class ConnectionManager {
           this.pools.put(connectionId, pool);
           this.connectionPoolToNamespaceMap.put(connectionId, nsId);
         }
-        long clientStateId = RouterStateIdContext.getClientStateIdFromCurrentCall(nsId);
-        pool.getPoolAlignmentContext().advanceClientStateId(clientStateId);
       } finally {
         writeLock.unlock();
       }
     }
 
+    long clientStateId = RouterStateIdContext.getClientStateIdFromCurrentCall(nsId);
+    pool.getPoolAlignmentContext().advanceClientStateId(clientStateId);
+
     ConnectionContext conn = pool.getConnection();
 
     // Add a new connection to the pool if it wasn't usable

+ 7 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PoolAlignmentContext.java

@@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs.server.federation.router;
 
 import java.io.IOException;
 import java.util.concurrent.atomic.LongAccumulator;
+
+import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.ipc.AlignmentContext;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
 
@@ -99,4 +101,9 @@ public class PoolAlignmentContext implements AlignmentContext {
   public void advanceClientStateId(Long clientStateId) {
     poolLocalStateId.accumulate(clientStateId);
   }
+
+  @VisibleForTesting
+  public long getPoolLocalStateId() {
+    return this.poolLocalStateId.get();
+  }
 }

+ 49 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java

@@ -18,8 +18,11 @@
 package org.apache.hadoop.hdfs.server.federation.router;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RouterFederatedStateProto;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -31,6 +34,7 @@ import org.junit.Rule;
 import org.junit.rules.ExpectedException;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
@@ -305,6 +309,51 @@ public class TestConnectionManager {
     }
   }
 
+  @Test
+  public void testAdvanceClientStateId() throws IOException {
+    // Start one ConnectionManager
+    Configuration tmpConf = new Configuration();
+    ConnectionManager tmpConnManager = new ConnectionManager(tmpConf);
+    tmpConnManager.start();
+    Map<ConnectionPoolId, ConnectionPool> poolMap = tmpConnManager.getPools();
+
+    // Mock one Server.Call with FederatedNamespaceState that ns0 = 1L.
+    Server.Call mockCall1 = new Server.Call(1, 1, null, null,
+        RPC.RpcKind.RPC_BUILTIN, new byte[] {1, 2, 3});
+    Map<String, Long> nsStateId = new HashMap<>();
+    nsStateId.put("ns0", 1L);
+    RouterFederatedStateProto.Builder stateBuilder = RouterFederatedStateProto.newBuilder();
+    nsStateId.forEach(stateBuilder::putNamespaceStateIds);
+    mockCall1.setFederatedNamespaceState(stateBuilder.build().toByteString());
+
+    Server.getCurCall().set(mockCall1);
+
+    // Create one new connection pool
+    tmpConnManager.getConnection(TEST_USER1, TEST_NN_ADDRESS, NamenodeProtocol.class, "ns0");
+    assertEquals(1, poolMap.size());
+    ConnectionPoolId connectionPoolId = new ConnectionPoolId(TEST_USER1,
+        TEST_NN_ADDRESS, NamenodeProtocol.class);
+    ConnectionPool pool = poolMap.get(connectionPoolId);
+    assertEquals(1L, pool.getPoolAlignmentContext().getPoolLocalStateId());
+
+    // Mock one Server.Call with FederatedNamespaceState that ns0 = 2L.
+    Server.Call mockCall2 = new Server.Call(2, 1, null, null,
+        RPC.RpcKind.RPC_BUILTIN, new byte[] {1, 2, 3});
+    nsStateId.clear();
+    nsStateId.put("ns0", 2L);
+    stateBuilder = RouterFederatedStateProto.newBuilder();
+    nsStateId.forEach(stateBuilder::putNamespaceStateIds);
+    mockCall2.setFederatedNamespaceState(stateBuilder.build().toByteString());
+
+    Server.getCurCall().set(mockCall2);
+
+    // Get one existed connection for ns0
+    tmpConnManager.getConnection(TEST_USER1, TEST_NN_ADDRESS, NamenodeProtocol.class, "ns0");
+    assertEquals(1, poolMap.size());
+    pool = poolMap.get(connectionPoolId);
+    assertEquals(2L, pool.getPoolAlignmentContext().getPoolLocalStateId());
+  }
+
   @Test
   public void testConfigureConnectionActiveRatio() throws IOException {
     // test 1 conn below the threshold and these conns are closed