
HDFS-13522: Add federated nameservices states to client protocol and propagate it between routers and clients.

Fixes #4311

Signed-off-by: Owen O'Malley <oomalley@linkedin.com>
Simbarashe Dzinamarira 2 years ago
parent
commit
e77d54d1ee
14 changed files with 426 additions and 53 deletions
  1. +14 -0 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
  2. +25 -6 hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java
  3. +3 -0 hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java
  4. +30 -5 hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java
  5. +22 -5 hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java
  6. +103 -0 hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PoolAlignmentContext.java
  7. +4 -0 hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
  8. +5 -4 hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
  9. +6 -4 hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
  10. +168 -0 hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java
  11. +11 -0 hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
  12. +2 -1 hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java
  13. +18 -18 hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java
  14. +15 -10 hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFederatedState.java

+ 14 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

@@ -937,6 +937,9 @@ public abstract class Server {
     // the priority level assigned by scheduler, 0 by default
     private long clientStateId;
     private boolean isCallCoordinated;
+    // Serialized RouterFederatedStateProto message to
+    // store last seen states for multiple namespaces.
+    private ByteString federatedNamespaceState;
 
     Call() {
       this(RpcConstants.INVALID_CALL_ID, RpcConstants.INVALID_RETRY_COUNT,
@@ -994,6 +997,14 @@ public abstract class Server {
       return processingDetails;
     }
 
+    public void setFederatedNamespaceState(ByteString federatedNamespaceState) {
+      this.federatedNamespaceState = federatedNamespaceState;
+    }
+
+    public ByteString getFederatedNamespaceState() {
+      return this.federatedNamespaceState;
+    }
+
     @Override
     public String toString() {
       return "Call#" + callId + " Retry#" + retryCount;
@@ -2868,6 +2879,9 @@ public abstract class Server {
             stateId = alignmentContext.receiveRequestState(
                 header, getMaxIdleTime());
             call.setClientStateId(stateId);
+            if (header.hasRouterFederatedState()) {
+              call.setFederatedNamespaceState(header.getRouterFederatedState());
+            }
           }
         } catch (IOException ioe) {
           throw new RpcServerException("Processing RPC request caught ", ioe);
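
A minimal sketch of how this stored field is read back on the router side; it mirrors RouterStateIdContext.getClientStateIdFromCurrentCall further down in this change, and the local names are illustrative:

    // Read the federated state captured above from the active RPC call.
    Server.Call call = Server.getCurCall().get();
    if (call != null) {
      ByteString state = call.getFederatedNamespaceState(); // null if the client sent none
      // ... parse as a RouterFederatedStateProto, as RouterStateIdContext does below.
    }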

+ 25 - 6
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java

@@ -26,6 +26,7 @@ import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
 
 import java.io.IOException;
 import java.util.concurrent.atomic.LongAccumulator;
+import org.apache.hadoop.thirdparty.protobuf.ByteString;
 
 /**
  * Global State Id context for the client.
@@ -37,8 +38,17 @@ import java.util.concurrent.atomic.LongAccumulator;
 @InterfaceStability.Evolving
 public class ClientGSIContext implements AlignmentContext {
 
-  private final LongAccumulator lastSeenStateId =
-      new LongAccumulator(Math::max, Long.MIN_VALUE);
+  private final LongAccumulator lastSeenStateId;
+  private ByteString routerFederatedState;
+
+  public ClientGSIContext() {
+    this(new LongAccumulator(Math::max, Long.MIN_VALUE));
+  }
+
+  public ClientGSIContext(LongAccumulator lastSeenStateId) {
+    this.lastSeenStateId = lastSeenStateId;
+    routerFederatedState = null;
+  }
 
   @Override
   public long getLastSeenStateId() {
@@ -65,16 +75,25 @@ public class ClientGSIContext implements AlignmentContext {
    * in responses.
    */
   @Override
-  public void receiveResponseState(RpcResponseHeaderProto header) {
-    lastSeenStateId.accumulate(header.getStateId());
+  public synchronized void receiveResponseState(RpcResponseHeaderProto header) {
+    if (header.hasRouterFederatedState()) {
+      routerFederatedState = header.getRouterFederatedState();
+    } else {
+      lastSeenStateId.accumulate(header.getStateId());
+    }
   }
 
   /**
    * Client side implementation for providing state alignment info in requests.
    */
   @Override
-  public void updateRequestState(RpcRequestHeaderProto.Builder header) {
-    header.setStateId(lastSeenStateId.longValue());
+  public synchronized void updateRequestState(RpcRequestHeaderProto.Builder header) {
+    if (lastSeenStateId.get() != Long.MIN_VALUE) {
+      header.setStateId(lastSeenStateId.get());
+    }
+    if (routerFederatedState != null) {
+      header.setRouterFederatedState(routerFederatedState);
+    }
   }
 
   /**
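
The net effect is a simple round trip: the client stores the opaque routerFederatedState bytes from each response and echoes them on later requests, leaving interpretation to the router. A minimal sketch with hand-built protobuf headers (the RPC engine normally drives these callbacks; routerStateBytes stands in for bytes taken from a router response):

    ClientGSIContext ctx = new ClientGSIContext();
    RpcResponseHeaderProto response = RpcResponseHeaderProto.newBuilder()
        .setCallId(1)
        .setStatus(RpcResponseHeaderProto.RpcStatusProto.SUCCESS)
        .setRouterFederatedState(routerStateBytes) // opaque federated state bytes
        .build();
    ctx.receiveResponseState(response);            // stored, not interpreted

    RpcRequestHeaderProto.Builder request = RpcRequestHeaderProto.newBuilder();
    ctx.updateRequestState(request);               // bytes echoed back to the router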

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java

@@ -349,6 +349,9 @@ public class NameNodeProxiesClient {
       boolean withRetries, AtomicBoolean fallbackToSimpleAuth,
       AlignmentContext alignmentContext)
       throws IOException {
+    if (alignmentContext == null) {
+      alignmentContext = new ClientGSIContext();
+    }
     RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class,
         ProtobufRpcEngine2.class);
 

+ 30 - 5
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java

@@ -73,6 +73,14 @@ public class ConnectionManager {
 
   /** Queue for creating new connections. */
   private final BlockingQueue<ConnectionPool> creatorQueue;
+  /**
+   * Global federated namespace context for router.
+   */
+  private final RouterStateIdContext routerStateIdContext;
+  /**
+   * Map from connection pool ID to namespace.
+   */
+  private final Map<ConnectionPoolId, String> connectionPoolToNamespaceMap;
   /** Max size of queue for creating new connections. */
   private final int creatorQueueMaxSize;
 
@@ -85,15 +93,19 @@ public class ConnectionManager {
   /** If the connection manager is running. */
   private boolean running = false;
 
+  public ConnectionManager(Configuration config) {
+    this(config, new RouterStateIdContext(config));
+  }
 
   /**
    * Creates a proxy client connection pool manager.
    *
    * @param config Configuration for the connections.
    */
-  public ConnectionManager(Configuration config) {
+  public ConnectionManager(Configuration config, RouterStateIdContext routerStateIdContext) {
     this.conf = config;
-
+    this.routerStateIdContext = routerStateIdContext;
+    this.connectionPoolToNamespaceMap = new HashMap<>();
     // Configure minimum, maximum and active connection pools
     this.maxSize = this.conf.getInt(
         RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE,
@@ -160,6 +172,10 @@ public class ConnectionManager {
         pool.close();
       }
       this.pools.clear();
+      for (String nsID: connectionPoolToNamespaceMap.values()) {
+        routerStateIdContext.removeNamespaceStateId(nsID);
+      }
+      connectionPoolToNamespaceMap.clear();
     } finally {
       writeLock.unlock();
     }
@@ -172,12 +188,12 @@ public class ConnectionManager {
    * @param ugi User group information.
    * @param nnAddress Namenode address for the connection.
    * @param protocol Protocol for the connection.
+   * @param nsId Nameservice identity.
    * @return Proxy client to connect to nnId as UGI.
    * @throws IOException If the connection cannot be obtained.
    */
   public ConnectionContext getConnection(UserGroupInformation ugi,
-      String nnAddress, Class<?> protocol) throws IOException {
-
+      String nnAddress, Class<?> protocol, String nsId) throws IOException {
     // Check if the manager is shutdown
     if (!this.running) {
       LOG.error(
@@ -205,9 +221,13 @@ public class ConnectionManager {
         if (pool == null) {
           pool = new ConnectionPool(
               this.conf, nnAddress, ugi, this.minSize, this.maxSize,
-              this.minActiveRatio, protocol);
+              this.minActiveRatio, protocol,
+              new PoolAlignmentContext(this.routerStateIdContext, nsId));
           this.pools.put(connectionId, pool);
+          this.connectionPoolToNamespaceMap.put(connectionId, nsId);
         }
+        long clientStateId = RouterStateIdContext.getClientStateIdFromCurrentCall(nsId);
+        pool.getPoolAlignmentContext().advanceClientStateId(clientStateId);
       } finally {
         writeLock.unlock();
       }
@@ -430,6 +450,11 @@ public class ConnectionManager {
         try {
           for (ConnectionPoolId poolId : toRemove) {
             pools.remove(poolId);
+            String nsID = connectionPoolToNamespaceMap.get(poolId);
+            connectionPoolToNamespaceMap.remove(poolId);
+            if (!connectionPoolToNamespaceMap.values().contains(nsID)) {
+              routerStateIdContext.removeNamespaceStateId(nsID);
+            }
           }
         } finally {
           writeLock.unlock();
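
Callers must now pass the nameservice ID so the manager can bind each pool to the matching per-namespace state. A one-line usage sketch mirroring the updated tests below ("ns0" is illustrative):

    ConnectionContext conn = connectionManager.getConnection(
        ugi, nnAddress, ClientProtocol.class, "ns0");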

+ 22 - 5
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java

@@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import javax.net.SocketFactory;
 
 import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.ipc.AlignmentContext;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
@@ -108,6 +109,8 @@ public class ConnectionPool {
 
   /** Enable using multiple physical socket or not. **/
   private final boolean enableMultiSocket;
+  /** StateID alignment context. */
+  private final PoolAlignmentContext alignmentContext;
 
   /** Map for the protocols and their protobuf implementations. */
   private final static Map<Class<?>, ProtoImpl> PROTO_MAP = new HashMap<>();
@@ -138,7 +141,8 @@ public class ConnectionPool {
 
   protected ConnectionPool(Configuration config, String address,
       UserGroupInformation user, int minPoolSize, int maxPoolSize,
-      float minActiveRatio, Class<?> proto) throws IOException {
+      float minActiveRatio, Class<?> proto, PoolAlignmentContext alignmentContext)
+      throws IOException {
 
     this.conf = config;
 
@@ -157,6 +161,8 @@ public class ConnectionPool {
         RBFConfigKeys.DFS_ROUTER_NAMENODE_ENABLE_MULTIPLE_SOCKET_KEY,
         RBFConfigKeys.DFS_ROUTER_NAMENODE_ENABLE_MULTIPLE_SOCKET_DEFAULT);
 
+    this.alignmentContext = alignmentContext;
+
     // Add minimum connections to the pool
     for (int i = 0; i < this.minSize; i++) {
       ConnectionContext newConnection = newConnection();
@@ -211,6 +217,14 @@ public class ConnectionPool {
     return this.clientIndex;
   }
 
+  /**
+   * Get the alignment context for this pool
+   * @return Alignment context
+   */
+  public PoolAlignmentContext getPoolAlignmentContext() {
+    return this.alignmentContext;
+  }
+
   /**
    * Return the next connection round-robin.
    *
@@ -398,7 +412,7 @@ public class ConnectionPool {
   public ConnectionContext newConnection() throws IOException {
     return newConnection(this.conf, this.namenodeAddress,
         this.ugi, this.protocol, this.enableMultiSocket,
-        this.socketIndex.incrementAndGet());
+        this.socketIndex.incrementAndGet(), alignmentContext);
   }
 
   /**
@@ -413,13 +427,15 @@ public class ConnectionPool {
    * @param ugi User context.
    * @param proto Interface of the protocol.
    * @param enableMultiSocket Enable multiple socket or not.
+   * @param alignmentContext Client alignment context.
    * @return proto for the target ClientProtocol that contains the user's
    *         security context.
    * @throws IOException If it cannot be created.
    */
   protected static <T> ConnectionContext newConnection(Configuration conf,
       String nnAddress, UserGroupInformation ugi, Class<T> proto,
-      boolean enableMultiSocket, int socketIndex) throws IOException {
+      boolean enableMultiSocket, int socketIndex,
+      AlignmentContext alignmentContext) throws IOException {
     if (!PROTO_MAP.containsKey(proto)) {
       String msg = "Unsupported protocol for connection to NameNode: "
           + ((proto != null) ? proto.getName() : "null");
@@ -448,10 +464,11 @@ public class ConnectionPool {
           socket, classes.protoPb, ugi, RPC.getRpcTimeout(conf),
           defaultPolicy, conf, socketIndex);
       proxy = RPC.getProtocolProxy(classes.protoPb, version, connectionId,
-          conf, factory).getProxy();
+          conf, factory, alignmentContext).getProxy();
     } else {
       proxy = RPC.getProtocolProxy(classes.protoPb, version, socket, ugi,
-          conf, factory, RPC.getRpcTimeout(conf), defaultPolicy, null).getProxy();
+          conf, factory, RPC.getRpcTimeout(conf), defaultPolicy, null,
+          alignmentContext).getProxy();
     }
 
     T client = newProtoClient(proto, classes, proxy);

+ 103 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PoolAlignmentContext.java

@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.LongAccumulator;
+import org.apache.hadoop.ipc.AlignmentContext;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
+
+
+/**
+ * An alignment context shared by all connections in a {@link ConnectionPool}.
+ * There is a distinct connection pool for each [namespace,UGI] pairing.
+ * <p>
+ * {@link #sharedGlobalStateId} is a reference to a
+ * shared {@link LongAccumulator} object in the {@link RouterStateIdContext}.
+ * {@link #poolLocalStateId} is specific to each PoolAlignmentContext.
+ * <p>
+ * The shared {@link #sharedGlobalStateId} is updated only using
+ * responses from NameNodes, so clients cannot poison it.
+ * {@link #poolLocalStateId} is used to propagate client observed
+ * state into NameNode requests. A misbehaving client can poison this but the effect is only
+ * visible to other clients with the same UGI and accessing the same namespace.
+ */
+public class PoolAlignmentContext implements AlignmentContext {
+  private LongAccumulator sharedGlobalStateId;
+  private LongAccumulator poolLocalStateId;
+
+  PoolAlignmentContext(RouterStateIdContext routerStateIdContext, String namespaceId) {
+    sharedGlobalStateId = routerStateIdContext.getNamespaceStateId(namespaceId);
+    poolLocalStateId = new LongAccumulator(Math::max, Long.MIN_VALUE);
+  }
+
+  /**
+   * Client side implementation only receives state alignment info.
+   * It does not provide state alignment info therefore this does nothing.
+   */
+  @Override
+  public void updateResponseState(RpcHeaderProtos.RpcResponseHeaderProto.Builder header) {
+    // Do nothing.
+  }
+
+  /**
+   * Router updates a globally shared value using response from
+   * namenodes.
+   */
+  @Override
+  public void receiveResponseState(RpcHeaderProtos.RpcResponseHeaderProto header) {
+    sharedGlobalStateId.accumulate(header.getStateId());
+  }
+
+  /**
+   * Client side implementation for routers to provide state info in requests to
+   * namenodes.
+   */
+  @Override
+  public void updateRequestState(RpcHeaderProtos.RpcRequestHeaderProto.Builder header) {
+    long maxStateId = Long.max(poolLocalStateId.get(), sharedGlobalStateId.get());
+    header.setStateId(maxStateId);
+  }
+
+  /**
+   * Client side implementation only provides state alignment info in requests.
+   * Client does not receive RPC requests therefore this does nothing.
+   */
+  @Override
+  public long receiveRequestState(RpcHeaderProtos.RpcRequestHeaderProto header, long threshold)
+      throws IOException {
+    // Do nothing.
+    return 0;
+  }
+
+  @Override
+  public long getLastSeenStateId() {
+    return sharedGlobalStateId.get();
+  }
+
+  @Override
+  public boolean isCoordinatedCall(String protocolName, String method) {
+    throw new UnsupportedOperationException(
+        "Client should not be checking uncoordinated call");
+  }
+
+  public void advanceClientStateId(Long clientStateId) {
+    poolLocalStateId.accumulate(clientStateId);
+  }
+}
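
A short sketch of the max-merge behavior described in the class javadoc: a client-supplied state ID raises only the pool-local accumulator, so outgoing requests carry the larger of the two values (routerStateIdContext and "ns1" are illustrative, and the shared value starts at Long.MIN_VALUE):

    PoolAlignmentContext ctx = new PoolAlignmentContext(routerStateIdContext, "ns1");
    ctx.advanceClientStateId(42L);  // client-observed state, pool-local only
    RpcHeaderProtos.RpcRequestHeaderProto.Builder header =
        RpcHeaderProtos.RpcRequestHeaderProto.newBuilder();
    ctx.updateRequestState(header); // header.getStateId() == 42
    // NameNode responses instead feed the shared accumulator, visible to
    // every pool on "ns1", via ctx.receiveResponseState(response).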

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java

@@ -191,6 +191,10 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
       FEDERATION_STORE_PREFIX + "enable";
   public static final boolean DFS_ROUTER_STORE_ENABLE_DEFAULT = true;
 
+  public static final String DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE =
+      FEDERATION_ROUTER_PREFIX + "observer.federated.state.propagation.maxsize";
+  public static final int DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE_DEFAULT = 5;
+
   public static final String FEDERATION_STORE_SERIALIZER_CLASS =
       FEDERATION_STORE_PREFIX + "serializer";
   public static final Class<StateStoreSerializerPBImpl>

+ 5 - 4
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java

@@ -149,7 +149,8 @@ public class RouterRpcClient {
    * @param monitor Optional performance monitor.
    */
   public RouterRpcClient(Configuration conf, Router router,
-      ActiveNamenodeResolver resolver, RouterRpcMonitor monitor) {
+      ActiveNamenodeResolver resolver, RouterRpcMonitor monitor,
+      RouterStateIdContext routerStateIdContext) {
     this.router = router;
 
     this.namenodeResolver = resolver;
@@ -158,7 +159,7 @@ public class RouterRpcClient {
     this.contextFieldSeparator =
         clientConf.get(HADOOP_CALLER_CONTEXT_SEPARATOR_KEY,
             HADOOP_CALLER_CONTEXT_SEPARATOR_DEFAULT);
-    this.connectionManager = new ConnectionManager(clientConf);
+    this.connectionManager = new ConnectionManager(clientConf, routerStateIdContext);
     this.connectionManager.start();
     this.routerRpcFairnessPolicyController =
         FederationUtil.newFairnessPolicyController(conf);
@@ -374,7 +375,7 @@ public class RouterRpcClient {
             ugi.getUserName(), routerUser);
       }
       connection = this.connectionManager.getConnection(
-          connUGI, rpcAddress, proto);
+          connUGI, rpcAddress, proto, nsId);
       LOG.debug("User {} NN {} is using connection {}",
           ugi.getUserName(), rpcAddress, connection);
     } catch (Exception ex) {
@@ -1641,7 +1642,7 @@ public class RouterRpcClient {
 
   /**
    * Refreshes/changes the fairness policy controller implementation if possible
-   * and returns the controller class name
+   * and returns the controller class name.
    * @param conf Configuration
    * @return New controller class name if successfully refreshed, else old controller class name
    */

+ 6 - 4
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java

@@ -252,18 +252,18 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
   /**
    * Construct a router RPC server.
    *
-   * @param configuration HDFS Configuration.
+   * @param conf HDFS Configuration.
    * @param router A router using this RPC server.
    * @param nnResolver The NN resolver instance to determine active NNs in HA.
    * @param fileResolver File resolver to resolve file paths to subclusters.
    * @throws IOException If the RPC server could not be created.
    */
-  public RouterRpcServer(Configuration configuration, Router router,
+  public RouterRpcServer(Configuration conf, Router router,
       ActiveNamenodeResolver nnResolver, FileSubclusterResolver fileResolver)
           throws IOException {
     super(RouterRpcServer.class.getName());
 
-    this.conf = configuration;
+    this.conf = conf;
     this.router = router;
     this.namenodeResolver = nnResolver;
     this.subclusterResolver = fileResolver;
@@ -321,6 +321,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
 
     // Create security manager
     this.securityManager = new RouterSecurityManager(this.conf);
+    RouterStateIdContext routerStateIdContext = new RouterStateIdContext(conf);
 
     this.rpcServer = new RPC.Builder(this.conf)
         .setProtocol(ClientNamenodeProtocolPB.class)
@@ -331,6 +332,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
         .setnumReaders(readerCount)
         .setQueueSizePerHandler(handlerQueueSize)
         .setVerbose(false)
+        .setAlignmentContext(routerStateIdContext)
         .setSecretManager(this.securityManager.getSecretManager())
         .build();
 
@@ -384,7 +386,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
 
     // Create the client
     this.rpcClient = new RouterRpcClient(this.conf, this.router,
-        this.namenodeResolver, this.rpcMonitor);
+        this.namenodeResolver, this.rpcMonitor, routerStateIdContext);
 
     // Initialize modules
     this.quotaCall = new Quota(this.router, this);

+ 168 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java

@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.HashSet;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.LongAccumulator;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.server.namenode.ha.ReadOnly;
+import org.apache.hadoop.ipc.AlignmentContext;
+import org.apache.hadoop.ipc.RetriableException;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
+import org.apache.hadoop.thirdparty.protobuf.ByteString;
+import org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException;
+
+
+/**
+ * This is the router implementation to hold the state Ids for all
+ * namespaces. This object is only updated by responses from NameNodes.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+class RouterStateIdContext implements AlignmentContext {
+
+  private final HashSet<String> coordinatedMethods;
+  /**
+   * Collection of last-seen namespace state Ids for a set of namespaces.
+   * Each value is globally shared by all outgoing connections to a particular namespace,
+   * so updates should only be performed using reliable responses from NameNodes.
+   */
+  private final ConcurrentHashMap<String, LongAccumulator> namespaceIdMap;
+  // Size limit for the map of state Ids to send to clients.
+  private final int maxSizeOfFederatedStateToPropagate;
+
+  RouterStateIdContext(Configuration conf) {
+    this.coordinatedMethods = new HashSet<>();
+    // For now, only ClientProtocol methods can be coordinated, so only checking
+    // against ClientProtocol.
+    for (Method method : ClientProtocol.class.getDeclaredMethods()) {
+      if (method.isAnnotationPresent(ReadOnly.class)
+          && method.getAnnotationsByType(ReadOnly.class)[0].isCoordinated()) {
+        coordinatedMethods.add(method.getName());
+      }
+    }
+
+    namespaceIdMap = new ConcurrentHashMap<>();
+
+    maxSizeOfFederatedStateToPropagate =
+        conf.getInt(RBFConfigKeys.DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE,
+        RBFConfigKeys.DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE_DEFAULT);
+  }
+
+  /**
+   * Adds the {@link #namespaceIdMap} to the response header that will be sent to a client.
+   */
+  public void setResponseHeaderState(RpcResponseHeaderProto.Builder headerBuilder) {
+    if (namespaceIdMap.isEmpty()) {
+      return;
+    }
+    HdfsServerFederationProtos.RouterFederatedStateProto.Builder federatedStateBuilder =
+        HdfsServerFederationProtos.RouterFederatedStateProto.newBuilder();
+    namespaceIdMap.forEach((k, v) -> federatedStateBuilder.putNamespaceStateIds(k, v.get()));
+    headerBuilder.setRouterFederatedState(federatedStateBuilder.build().toByteString());
+  }
+
+  public LongAccumulator getNamespaceStateId(String nsId) {
+    return namespaceIdMap.computeIfAbsent(nsId, key -> new LongAccumulator(Math::max, Long.MIN_VALUE));
+  }
+
+  public void removeNamespaceStateId(String nsId) {
+    namespaceIdMap.remove(nsId);
+  }
+
+  /**
+   * Utility function to parse routerFederatedState field in RPC headers.
+   */
+  public static Map<String, Long> getRouterFederatedStateMap(ByteString byteString) {
+    if (byteString != null) {
+      HdfsServerFederationProtos.RouterFederatedStateProto federatedState;
+      try {
+        federatedState = HdfsServerFederationProtos.RouterFederatedStateProto.parseFrom(byteString);
+      } catch (InvalidProtocolBufferException e) {
+        throw new RuntimeException(e);
+      }
+      return federatedState.getNamespaceStateIdsMap();
+    } else {
+      return Collections.emptyMap();
+    }
+  }
+
+  public static long getClientStateIdFromCurrentCall(String nsId) {
+    Long clientStateID = Long.MIN_VALUE;
+    Server.Call call = Server.getCurCall().get();
+    if (call != null) {
+      ByteString callFederatedNamespaceState = call.getFederatedNamespaceState();
+      if (callFederatedNamespaceState != null) {
+        Map<String, Long> clientFederatedStateIds = getRouterFederatedStateMap(callFederatedNamespaceState);
+        clientStateID = clientFederatedStateIds.getOrDefault(nsId, Long.MIN_VALUE);
+      }
+    }
+    return clientStateID;
+  }
+
+  @Override
+  public void updateResponseState(RpcResponseHeaderProto.Builder header) {
+    if (namespaceIdMap.size() <= maxSizeOfFederatedStateToPropagate) {
+      setResponseHeaderState(header);
+    }
+  }
+
+  @Override
+  public void receiveResponseState(RpcResponseHeaderProto header) {
+    // Do nothing.
+  }
+
+  @Override
+  public void updateRequestState(RpcRequestHeaderProto.Builder header) {
+    // Do nothing.
+  }
+
+  /**
+   * Routers do not update their state using information from clients
+   * to avoid clients interfering with one another.
+   */
+  @Override
+  public long receiveRequestState(RpcRequestHeaderProto header,
+      long clientWaitTime) throws RetriableException {
+    // Do nothing.
+    return 0;
+  }
+
+  @Override
+  public long getLastSeenStateId() {
+    return 0;
+  }
+
+  @Override
+  public boolean isCoordinatedCall(String protocolName, String methodName) {
+    return protocolName.equals(ClientProtocol.class.getCanonicalName())
+        && coordinatedMethods.contains(methodName);
+  }
+}
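
The routerFederatedState header field is nothing more than a serialized namespace-to-stateId map. A sketch using the proto accessors exercised above (the namespace names and values are illustrative):

    ByteString bytes = HdfsServerFederationProtos.RouterFederatedStateProto
        .newBuilder()
        .putNamespaceStateIds("ns0", 17L)
        .putNamespaceStateIds("ns1", 23L)
        .build()
        .toByteString();
    Map<String, Long> parsed =
        RouterStateIdContext.getRouterFederatedStateMap(bytes);
    // parsed.get("ns0") == 17L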

+ 11 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml

@@ -834,4 +834,15 @@
       (delete the source path directly) and skip (skip both trash and deletion).
     </description>
   </property>
+
+  <property>
+    <name>dfs.federation.router.observer.federated.state.propagation.maxsize</name>
+    <value>5</value>
+    <description>
+      The maximum size of the federated state to send in the RPC header. Sending the federated
+      state removes the need to msync on every read call, but at the expense of having a larger
+      header. The cost tradeoff between the larger header and always msync'ing depends on the number
+      of namespaces in use and the latency of the msync requests.
+    </description>
+  </property>
 </configuration>
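
For illustration, the same limit can also be set programmatically; a value below the number of live namespaces stops the router from attaching the federated state, so clients fall back to msync-based alignment (a sketch, not part of this change):

    Configuration conf = new Configuration();
    conf.setInt(
        RBFConfigKeys.DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE, 0);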

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java

@@ -384,7 +384,8 @@ public final class FederationTestUtils {
           invocation.getMock());
       throw new IOException("Simulate connectionManager throw IOException");
     }).when(spyConnectionManager).getConnection(
-        any(UserGroupInformation.class), any(String.class), any(Class.class));
+        any(UserGroupInformation.class), any(String.class), any(Class.class),
+        any(String.class));
 
     Whitebox.setInternalState(rpcClient, "connectionManager",
         spyConnectionManager);

+ 18 - 18
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java

@@ -81,15 +81,15 @@ public class TestConnectionManager {
   public void testCleanup() throws Exception {
     Map<ConnectionPoolId, ConnectionPool> poolMap = connManager.getPools();
 
-    ConnectionPool pool1 = new ConnectionPool(
-        conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, 0.5f, ClientProtocol.class);
+    ConnectionPool pool1 = new ConnectionPool(conf, TEST_NN_ADDRESS, TEST_USER1,
+        0, 10, 0.5f, ClientProtocol.class, null);
     addConnectionsToPool(pool1, 9, 4);
     poolMap.put(
         new ConnectionPoolId(TEST_USER1, TEST_NN_ADDRESS, ClientProtocol.class),
         pool1);
 
-    ConnectionPool pool2 = new ConnectionPool(
-        conf, TEST_NN_ADDRESS, TEST_USER2, 0, 10, 0.5f, ClientProtocol.class);
+    ConnectionPool pool2 = new ConnectionPool(conf, TEST_NN_ADDRESS, TEST_USER2,
+        0, 10, 0.5f, ClientProtocol.class, null);
     addConnectionsToPool(pool2, 10, 10);
     poolMap.put(
         new ConnectionPoolId(TEST_USER2, TEST_NN_ADDRESS, ClientProtocol.class),
@@ -111,8 +111,8 @@ public class TestConnectionManager {
     checkPoolConnections(TEST_USER2, 10, 10);
 
     // Make sure the number of connections doesn't go below minSize
-    ConnectionPool pool3 = new ConnectionPool(
-        conf, TEST_NN_ADDRESS, TEST_USER3, 2, 10, 0.5f, ClientProtocol.class);
+    ConnectionPool pool3 = new ConnectionPool(conf, TEST_NN_ADDRESS, TEST_USER3,
+        2, 10, 0.5f, ClientProtocol.class, null);
     addConnectionsToPool(pool3, 8, 0);
     poolMap.put(
         new ConnectionPoolId(TEST_USER3, TEST_NN_ADDRESS, ClientProtocol.class),
@@ -140,7 +140,7 @@ public class TestConnectionManager {
 
     ConnectionPool pool = new ConnectionPool(
         copyConf, TEST_NN_ADDRESS, TEST_USER1, 1, 10, 0.5f,
-        ClientProtocol.class);
+        ClientProtocol.class, null);
     poolMap.put(
         new ConnectionPoolId(TEST_USER1, TEST_NN_ADDRESS, ClientProtocol.class),
         pool);
@@ -174,8 +174,8 @@ public class TestConnectionManager {
   public void testConnectionCreatorWithException() throws Exception {
     // Create a bad connection pool pointing to unresolvable namenode address.
     ConnectionPool badPool = new ConnectionPool(
-            conf, UNRESOLVED_TEST_NN_ADDRESS, TEST_USER1, 0, 10, 0.5f,
-            ClientProtocol.class);
+        conf, UNRESOLVED_TEST_NN_ADDRESS, TEST_USER1, 0, 10, 0.5f,
+        ClientProtocol.class, null);
     BlockingQueue<ConnectionPool> queue = new ArrayBlockingQueue<>(1);
     queue.add(badPool);
     ConnectionManager.ConnectionCreator connectionCreator =
@@ -201,7 +201,7 @@ public class TestConnectionManager {
     // Create a bad connection pool pointing to unresolvable namenode address.
     ConnectionPool badPool = new ConnectionPool(
         conf, UNRESOLVED_TEST_NN_ADDRESS, TEST_USER1, 1, 10, 0.5f,
-        ClientProtocol.class);
+        ClientProtocol.class, null);
   }
 
   @Test
@@ -210,8 +210,8 @@ public class TestConnectionManager {
     final int totalConns = 10;
     int activeConns = 5;
 
-    ConnectionPool pool = new ConnectionPool(
-        conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, 0.5f, ClientProtocol.class);
+    ConnectionPool pool = new ConnectionPool(conf, TEST_NN_ADDRESS, TEST_USER1,
+        0, 10, 0.5f, ClientProtocol.class, null);
     addConnectionsToPool(pool, totalConns, activeConns);
     poolMap.put(
         new ConnectionPoolId(TEST_USER1, TEST_NN_ADDRESS, ClientProtocol.class),
@@ -235,8 +235,8 @@ public class TestConnectionManager {
 
   @Test
   public void testValidClientIndex() throws Exception {
-    ConnectionPool pool = new ConnectionPool(
-        conf, TEST_NN_ADDRESS, TEST_USER1, 2, 2, 0.5f, ClientProtocol.class);
+    ConnectionPool pool = new ConnectionPool(conf, TEST_NN_ADDRESS, TEST_USER1,
+        2, 2, 0.5f, ClientProtocol.class, null);
     for(int i = -3; i <= 3; i++) {
       pool.getClientIndex().set(i);
       ConnectionContext conn = pool.getConnection();
@@ -251,8 +251,8 @@ public class TestConnectionManager {
     final int totalConns = 10;
     int activeConns = 5;
 
-    ConnectionPool pool = new ConnectionPool(
-        conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, 0.5f, NamenodeProtocol.class);
+    ConnectionPool pool = new ConnectionPool(conf, TEST_NN_ADDRESS, TEST_USER1,
+        0, 10, 0.5f, NamenodeProtocol.class, null);
     addConnectionsToPool(pool, totalConns, activeConns);
     poolMap.put(
         new ConnectionPoolId(
@@ -325,7 +325,7 @@ public class TestConnectionManager {
 
     // Create one new connection pool
     tmpConnManager.getConnection(TEST_USER1, TEST_NN_ADDRESS,
-        NamenodeProtocol.class);
+        NamenodeProtocol.class, "ns0");
 
     Map<ConnectionPoolId, ConnectionPool> poolMap = tmpConnManager.getPools();
     ConnectionPoolId connectionPoolId = new ConnectionPoolId(TEST_USER1,
@@ -356,6 +356,6 @@ public class TestConnectionManager {
         "Unsupported protocol for connection to NameNode: "
             + TestConnectionManager.class.getName(),
         () -> ConnectionPool.newConnection(conf, TEST_NN_ADDRESS, TEST_USER1,
-            TestConnectionManager.class, false, 0));
+            TestConnectionManager.class, false, 0, null));
   }
 }

+ 15 - 10
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFederatedState.java

@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hdfs.server.federation.router;
 
-import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.hadoop.ipc.AlignmentContext;
@@ -38,16 +37,22 @@ public class TestRouterFederatedState {
   @Test
   public void testRpcRouterFederatedState() throws InvalidProtocolBufferException {
     byte[] uuid = ClientId.getClientId();
-    Map<String, Long> expectedStateIds = new HashMap<String, Long>() {{
-      put("namespace1", 11L );
-      put("namespace2", 22L);
-    }};
+    Map<String, Long> expectedStateIds = new HashMap<String, Long>() {
+      {
+        put("namespace1", 11L);
+        put("namespace2", 22L);
+      }
+    };
 
     AlignmentContext alignmentContext = new AlignmentContextWithRouterState(expectedStateIds);
 
     RpcHeaderProtos.RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
-        RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcHeaderProtos.RpcRequestHeaderProto.OperationProto.RPC_FINAL_PACKET, 0,
-        RpcConstants.INVALID_RETRY_COUNT, uuid, alignmentContext);
+        RPC.RpcKind.RPC_PROTOCOL_BUFFER,
+        RpcHeaderProtos.RpcRequestHeaderProto.OperationProto.RPC_FINAL_PACKET,
+        0,
+        RpcConstants.INVALID_RETRY_COUNT,
+        uuid,
+        alignmentContext);
 
     Map<String, Long> stateIdsFromHeader =
         RouterFederatedStateProto.parseFrom(
@@ -59,9 +64,9 @@ public class TestRouterFederatedState {
 
   private static class AlignmentContextWithRouterState implements AlignmentContext {
 
-    Map<String, Long> routerFederatedState;
+    private Map<String, Long> routerFederatedState;
 
-    public AlignmentContextWithRouterState(Map<String, Long> namespaceStates) {
+    AlignmentContextWithRouterState(Map<String, Long> namespaceStates) {
       this.routerFederatedState = namespaceStates;
     }
 
@@ -82,7 +87,7 @@ public class TestRouterFederatedState {
     public void receiveResponseState(RpcHeaderProtos.RpcResponseHeaderProto header) {}
 
     @Override
-    public long receiveRequestState(RpcHeaderProtos.RpcRequestHeaderProto header, long threshold) throws IOException {
+    public long receiveRequestState(RpcHeaderProtos.RpcRequestHeaderProto header, long threshold) {
       return 0;
     }