Browse code

HDFS-16837. [RBF SBN] ClientGSIContext should merge RouterFederatedStates to get the max state id for each namespaces (#5123)

ZanderXu, 2 years ago
Parent
Commit
8a9bdb1edc

+ 46 - 1
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java

@@ -20,13 +20,19 @@ package org.apache.hadoop.hdfs;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RouterFederatedStateProto;
 import org.apache.hadoop.ipc.AlignmentContext;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.atomic.LongAccumulator;
 import org.apache.hadoop.thirdparty.protobuf.ByteString;
+import org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException;
 
 /**
  * Global State Id context for the client.
@@ -77,12 +83,46 @@ public class ClientGSIContext implements AlignmentContext {
   @Override
   public synchronized void receiveResponseState(RpcResponseHeaderProto header) {
     if (header.hasRouterFederatedState()) {
-      routerFederatedState = header.getRouterFederatedState();
+      routerFederatedState = mergeRouterFederatedState(
+          this.routerFederatedState, header.getRouterFederatedState());
     } else {
       lastSeenStateId.accumulate(header.getStateId());
     }
   }
 
+  /**
+   * Utility function to parse routerFederatedState field in RPC headers.
+   */
+  public static Map<String, Long> getRouterFederatedStateMap(ByteString byteString) {
+    if (byteString != null) {
+      try {
+        RouterFederatedStateProto federatedState = RouterFederatedStateProto.parseFrom(byteString);
+        return federatedState.getNamespaceStateIdsMap();
+      } catch (InvalidProtocolBufferException e) {
+        // Ignore this exception and will return an empty map
+      }
+    }
+    return Collections.emptyMap();
+  }
+
+  /**
+   * Merge state1 and state2 to get the max value for each namespace.
+   * @param state1 input ByteString.
+   * @param state2 input ByteString.
+   * @return one ByteString object which contains the max value of each namespace.
+   */
+  public static ByteString mergeRouterFederatedState(ByteString state1, ByteString state2) {
+    Map<String, Long> mapping1 = new HashMap<>(getRouterFederatedStateMap(state1));
+    Map<String, Long> mapping2 = getRouterFederatedStateMap(state2);
+    mapping2.forEach((k, v) -> {
+      long localValue = mapping1.getOrDefault(k, 0L);
+      mapping1.put(k, Math.max(v, localValue));
+    });
+    RouterFederatedStateProto.Builder federatedBuilder = RouterFederatedStateProto.newBuilder();
+    mapping1.forEach(federatedBuilder::putNamespaceStateIds);
+    return federatedBuilder.build().toByteString();
+  }
+
   /**
    * Client side implementation for providing state alignment info in requests.
    */
@@ -106,4 +146,9 @@ public class ClientGSIContext implements AlignmentContext {
     // Do nothing.
     return 0;
   }
+
+  @VisibleForTesting
+  public ByteString getRouterFederatedState() {
+    return this.routerFederatedState;
+  }
 }

+ 12 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto

@@ -723,3 +723,15 @@ message BlockTokenSecretProto {
   repeated string storageIds = 8;
   optional bytes handshakeSecret = 9;
 }
+
+/////////////////////////////////////////////////
+// Alignment state for namespaces.
+/////////////////////////////////////////////////
+/**
+ * Clients should receive this message in RPC responses and forward it
+ * in RPC requests without interpreting it. It should be encoded
+ * as an obscure byte array when being sent to clients.
+ */
+message RouterFederatedStateProto {
+  map<string, int64> namespaceStateIds = 1; // Last seen state IDs for multiple namespaces.
+}

+ 6 - 7
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java

@@ -28,8 +28,8 @@ import java.util.concurrent.atomic.LongAccumulator;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RouterFederatedStateProto;
 import org.apache.hadoop.hdfs.server.namenode.ha.ReadOnly;
 import org.apache.hadoop.ipc.AlignmentContext;
 import org.apache.hadoop.ipc.RetriableException;
@@ -83,10 +83,9 @@ class RouterStateIdContext implements AlignmentContext {
     if (namespaceIdMap.isEmpty()) {
       return;
     }
-    HdfsServerFederationProtos.RouterFederatedStateProto.Builder federatedStateBuilder =
-        HdfsServerFederationProtos.RouterFederatedStateProto.newBuilder();
-    namespaceIdMap.forEach((k, v) -> federatedStateBuilder.putNamespaceStateIds(k, v.get()));
-    headerBuilder.setRouterFederatedState(federatedStateBuilder.build().toByteString());
+    RouterFederatedStateProto.Builder builder = RouterFederatedStateProto.newBuilder();
+    namespaceIdMap.forEach((k, v) -> builder.putNamespaceStateIds(k, v.get()));
+    headerBuilder.setRouterFederatedState(builder.build().toByteString());
   }
 
   public LongAccumulator getNamespaceStateId(String nsId) {
@@ -102,9 +101,9 @@ class RouterStateIdContext implements AlignmentContext {
    */
   public static Map<String, Long> getRouterFederatedStateMap(ByteString byteString) {
     if (byteString != null) {
-      HdfsServerFederationProtos.RouterFederatedStateProto federatedState;
+      RouterFederatedStateProto federatedState;
       try {
-        federatedState = HdfsServerFederationProtos.RouterFederatedStateProto.parseFrom(byteString);
+        federatedState = RouterFederatedStateProto.parseFrom(byteString);
       } catch (InvalidProtocolBufferException e) {
         throw new RuntimeException(e);
       }

+ 0 - 13
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/FederationProtocol.proto

@@ -311,17 +311,4 @@ message GetDisabledNameservicesRequestProto {
 
 message GetDisabledNameservicesResponseProto {
   repeated string nameServiceIds = 1;
-}
-
-/////////////////////////////////////////////////
-// Alignment state for namespaces.
-/////////////////////////////////////////////////
-
-/**
- * Clients should receive this message in RPC responses and forward it
- * in RPC requests without interpreting it. It should be encoded
- * as an obscure byte array when being sent to clients.
- */
-message RouterFederatedStateProto {
-  map<string, int64> namespaceStateIds = 1; // Last seen state IDs for multiple namespaces.
 }

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java

@@ -18,8 +18,8 @@
 package org.apache.hadoop.hdfs.server.federation.router;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RouterFederatedStateProto;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RouterFederatedStateProto;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.Server;

+ 41 - 1
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java

@@ -27,14 +27,18 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID;
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.ClientGSIContext;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RouterFederatedStateProto;
 import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
 import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
 import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
@@ -43,6 +47,8 @@ import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeConte
 import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
 import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -505,4 +511,38 @@ public class TestObserverWithRouter {
     // getList call should be sent to observer
     assertEquals("One call should be sent to observer", 1, rpcCountForObserver);
   }
-}
+
+  @Test
+  @Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
+  public void testClientReceiveResponseState() {
+    ClientGSIContext clientGSIContext = new ClientGSIContext();
+
+    Map<String, Long> mockMapping = new HashMap<>();
+    mockMapping.put("ns0", 10L);
+    RouterFederatedStateProto.Builder builder = RouterFederatedStateProto.newBuilder();
+    mockMapping.forEach(builder::putNamespaceStateIds);
+    RpcHeaderProtos.RpcResponseHeaderProto header = RpcHeaderProtos.RpcResponseHeaderProto
+        .newBuilder()
+        .setCallId(1)
+        .setStatus(RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.SUCCESS)
+        .setRouterFederatedState(builder.build().toByteString())
+        .build();
+    clientGSIContext.receiveResponseState(header);
+
+    Map<String, Long> mockLowerMapping = new HashMap<>();
+    mockLowerMapping.put("ns0", 8L);
+    builder = RouterFederatedStateProto.newBuilder();
+    mockLowerMapping.forEach(builder::putNamespaceStateIds);
+    header = RpcHeaderProtos.RpcResponseHeaderProto.newBuilder()
+        .setRouterFederatedState(builder.build().toByteString())
+        .setCallId(2)
+        .setStatus(RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.SUCCESS)
+        .build();
+    clientGSIContext.receiveResponseState(header);
+
+    Map<String, Long> latestFederateState = ClientGSIContext.getRouterFederatedStateMap(
+        clientGSIContext.getRouterFederatedState());
+    Assertions.assertEquals(1, latestFederateState.size());
+    Assertions.assertEquals(10L, latestFederateState.get("ns0"));
+  }
+}

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFederatedState.java

@@ -19,12 +19,13 @@ package org.apache.hadoop.hdfs.server.federation.router;
 
 import java.util.HashMap;
 import java.util.Map;
+
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RouterFederatedStateProto;
 import org.apache.hadoop.ipc.AlignmentContext;
 import org.apache.hadoop.ipc.ClientId;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RpcConstants;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
-import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RouterFederatedStateProto;
 import org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException;
 import org.apache.hadoop.util.ProtoUtil;
 import org.junit.Test;