
HDFS-16767: RBF: Support observer node in Router-Based Federation.

Fixes #4127

Signed-off-by: Owen O'Malley <oomalley@linkedin.com>
Simbarashe Dzinamarira, 2 years ago (commit 6422eaf301)
24 changed files with 775 additions and 110 deletions
  1. +4 -0
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java
  2. +22 -2
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java
  3. +4 -2
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java
  4. +1 -1
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/RBFMetrics.java
  5. +26 -4
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/ActiveNamenodeResolver.java
  6. +69 -30
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java
  7. +1 -1
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java
  8. +6 -0
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
  9. +4 -1
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
  10. +105 -45
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
  11. +3 -1
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java
  12. +1 -1
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
  13. +19 -0
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
  14. +2 -2
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java
  15. +22 -0
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java
  16. +44 -6
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java
  17. +2 -1
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRefreshFairnessPolicyController.java
  18. +7 -7
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestNamenodeResolver.java
  19. +425 -0
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java
  20. +3 -3
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeHeartbeat.java
  21. +1 -1
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeMonitoring.java
  22. +1 -1
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeWebScheme.java
  23. +1 -1
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java
  24. +2 -0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java

@@ -30,6 +30,10 @@ public interface FederationRPCMBean {
 
   long getProxyOps();
 
+  long getActiveProxyOps();
+
+  long getObserverProxyOps();
+
   double getProxyAvg();
 
   long getProcessingOps();

+ 22 - 2
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java

@@ -21,6 +21,7 @@ import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName;
 import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
 import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
 import org.apache.hadoop.metrics2.MetricsSystem;
 import org.apache.hadoop.metrics2.annotation.Metric;
@@ -49,7 +50,10 @@ public class FederationRPCMetrics implements FederationRPCMBean {
   private MutableRate proxy;
   @Metric("Number of operations the Router proxied to a Namenode")
   private MutableCounterLong proxyOp;
-
+  @Metric("Number of operations the Router proxied to an Active Namenode")
+  private MutableCounterLong activeProxyOp;
+  @Metric("Number of operations the Router proxied to an Observer Namenode")
+  private MutableCounterLong observerProxyOp;
   @Metric("Number of operations to hit a standby NN")
   private MutableCounterLong proxyOpFailureStandby;
   @Metric("Number of operations to fail to reach NN")
@@ -256,9 +260,15 @@ public class FederationRPCMetrics implements FederationRPCMBean {
    * Add the time to proxy an operation from the moment the Router sends it to
    * the Namenode until it replied.
    * @param time Proxy time of an operation in nanoseconds.
+   * @param state NameNode state. May be null.
    */
-  public void addProxyTime(long time) {
+  public void addProxyTime(long time, FederationNamenodeServiceState state) {
     proxy.add(time);
+    if (FederationNamenodeServiceState.ACTIVE == state) {
+      activeProxyOp.incr();
+    } else if (FederationNamenodeServiceState.OBSERVER == state) {
+      observerProxyOp.incr();
+    }
     proxyOp.incr();
   }
 
@@ -272,6 +282,16 @@ public class FederationRPCMetrics implements FederationRPCMBean {
     return proxyOp.value();
   }
 
+  @Override
+  public long getActiveProxyOps() {
+    return activeProxyOp.value();
+  }
+
+  @Override
+  public long getObserverProxyOps() {
+    return observerProxyOp.value();
+  }
+
   /**
    * Add the time to process a request in the Router from the time we receive
    * the call until we send it to the Namenode.

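For reference, the two new counters can be read back through the router's RPC metrics object, the same way the new TestObserverWithRouter test does. A minimal sketch, assuming a running Router instance; the helper class and method names below are illustrative, not part of this change:

    import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
    import org.apache.hadoop.hdfs.server.federation.router.Router;

    final class ProxyOpsReport {
      // Sketch: read the per-state proxy counters added by this change.
      static String summarize(Router router) {
        FederationRPCMetrics metrics = router.getRpcServer().getRPCMetrics();
        return String.format("active=%d observer=%d total=%d",
            metrics.getActiveProxyOps(),   // ops proxied to an ACTIVE namenode
            metrics.getObserverProxyOps(), // ops proxied to an OBSERVER namenode
            metrics.getProxyOps());        // all proxied ops
      }
    }
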
+ 4 - 2
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java

@@ -29,6 +29,7 @@ import javax.management.StandardMBean;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.server.federation.router.FederationUtil;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
 import org.apache.hadoop.hdfs.server.federation.router.RouterRpcMonitor;
 import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
 import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
@@ -147,12 +148,13 @@ public class FederationRPCPerformanceMonitor implements RouterRpcMonitor {
   }
 
   @Override
-  public void proxyOpComplete(boolean success, String nsId) {
+  public void proxyOpComplete(boolean success, String nsId,
+      FederationNamenodeServiceState state) {
     if (success) {
       long proxyTime = getProxyTime();
       if (proxyTime >= 0) {
         if (metrics != null) {
-          metrics.addProxyTime(proxyTime);
+          metrics.addProxyTime(proxyTime, state);
         }
         if (nameserviceRPCMetricsMap != null &&
             nameserviceRPCMetricsMap.containsKey(nsId)) {

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/RBFMetrics.java

@@ -886,7 +886,7 @@ public class RBFMetrics implements RouterMBean, FederationMBean {
       // Fetch the most recent namenode registration
       String nsId = nsInfo.getNameserviceId();
       List<? extends FederationNamenodeContext> nns =
-          namenodeResolver.getNamenodesForNameserviceId(nsId);
+          namenodeResolver.getNamenodesForNameserviceId(nsId, false);
       if (nns != null) {
         FederationNamenodeContext nn = nns.get(0);
         if (nn instanceof MembershipState) {

+ 26 - 4
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/ActiveNamenodeResolver.java

@@ -43,6 +43,17 @@ import org.apache.hadoop.classification.InterfaceStability;
 @InterfaceStability.Evolving
 public interface ActiveNamenodeResolver {
 
+  /**
+   * Report a failed, unavailable NN address for a nameservice or blockPool.
+   *
+   * @param ns Nameservice identifier.
+   * @param failedAddress The address of the namenode that failed to respond to the command.
+   *
+   * @throws IOException If the state store cannot be accessed.
+   */
+  void updateUnavailableNamenode(
+      String ns, InetSocketAddress failedAddress) throws IOException;
+
   /**
    * Report a successful, active NN address for a nameservice or blockPool.
    *
@@ -56,20 +67,30 @@ public interface ActiveNamenodeResolver {
 
   /**
    * Returns a prioritized list of the most recent cached registration entries
-   * for a single nameservice ID.
-   * Returns an empty list if none are found. Returns entries in preference of:
+   * for a single nameservice ID. Returns an empty list if none are found.
+   * When observer reads are not requested, entries are returned in this order of preference:
    * <ul>
    * <li>The most recent ACTIVE NN
+   * <li>The most recent OBSERVER NN
+   * <li>The most recent STANDBY NN
+   * <li>The most recent UNAVAILABLE NN
+   * </ul>
+   *
+   * When observer reads are requested, entries are returned in this order of preference:
+   * <ul>
+   * <li>The most recent OBSERVER NN
+   * <li>The most recent ACTIVE NN
    * <li>The most recent STANDBY NN
    * <li>The most recent UNAVAILABLE NN
    * </ul>
    *
    * @param nameserviceId Nameservice identifier.
+   * @param listObserversFirst If true (observer read), observer NNs are listed first.
    * @return Prioritized list of namenode contexts.
    * @throws IOException If the state store cannot be accessed.
    */
-  List<? extends FederationNamenodeContext>
-      getNamenodesForNameserviceId(String nameserviceId) throws IOException;
+  List<? extends FederationNamenodeContext> getNamenodesForNameserviceId(
+      String nameserviceId, boolean listObserversFirst) throws IOException;
 
   /**
    * Returns a prioritized list of the most recent cached registration entries
@@ -77,6 +98,7 @@ public interface ActiveNamenodeResolver {
    * Returns an empty list if none are found. Returns entries in preference of:
    * <ul>
    * <li>The most recent ACTIVE NN
+   * <li>The most recent OBSERVER NN
    * <li>The most recent STANDBY NN
    * <li>The most recent UNAVAILABLE NN
    * </ul>

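As an illustration of the new boolean parameter, a caller can ask the resolver for the namenodes of a nameservice with observers ranked first and check the head of the list, as the new test does. A small sketch, assuming a resolver and nameservice id obtained from a running router; the wrapper class is hypothetical:

    import java.io.IOException;
    import java.util.List;
    import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
    import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
    import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;

    final class ObserverFirstLookup {
      // Sketch: list namenodes of a nameservice with observers ranked first.
      static boolean observerIsPreferred(ActiveNamenodeResolver resolver, String nsId)
          throws IOException {
        List<? extends FederationNamenodeContext> nns =
            resolver.getNamenodesForNameserviceId(nsId, true /* listObserversFirst */);
        // With a live observer registered, it should head the list.
        return nns != null && !nns.isEmpty()
            && nns.get(0).getState() == FederationNamenodeServiceState.OBSERVER;
      }
    }
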
+ 69 - 30
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.federation.resolver;
 
 import static org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState.ACTIVE;
 import static org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState.EXPIRED;
+import static org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState.OBSERVER;
 import static org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState.UNAVAILABLE;
 
 import java.io.IOException;
@@ -32,6 +33,7 @@ import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.server.federation.store.DisabledNameserviceStore;
 import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
@@ -73,8 +75,11 @@ public class MembershipNamenodeResolver
   /** Parent router ID. */
   private String routerId;
 
-  /** Cached lookup of NN for nameservice. Invalidated on cache refresh. */
-  private Map<String, List<? extends FederationNamenodeContext>> cacheNS;
+  /** Cached lookup of namenodes for a nameservice. The key is a pair of the nameservice
+   * name and a boolean indicating whether observer namenodes should be listed first;
+   * if false, active namenodes are listed first.
+   * Invalidated on cache refresh. */
+  private Map<Pair<String, Boolean>, List<? extends FederationNamenodeContext>> cacheNS;
   /** Cached lookup of NN for block pool. Invalidated on cache refresh. */
   private Map<String, List<? extends FederationNamenodeContext>> cacheBP;
 
@@ -136,11 +141,21 @@ public class MembershipNamenodeResolver
     return true;
   }
 
+  @Override public void updateUnavailableNamenode(String nsId,
+      InetSocketAddress address) throws IOException {
+    updateNameNodeState(nsId, address, UNAVAILABLE);
+  }
+
   @Override
   public void updateActiveNamenode(
       final String nsId, final InetSocketAddress address) throws IOException {
+    updateNameNodeState(nsId, address, ACTIVE);
+  }
 
-    // Called when we have an RPC miss and successful hit on an alternate NN.
+
+  private void updateNameNodeState(final String nsId,
+      final InetSocketAddress address, FederationNamenodeServiceState state)
+      throws IOException {
     // Temporarily update our cache, it will be overwritten on the next update.
     try {
       MembershipState partial = MembershipState.newInstance();
@@ -160,10 +175,11 @@ public class MembershipNamenodeResolver
         MembershipState record = records.get(0);
         UpdateNamenodeRegistrationRequest updateRequest =
             UpdateNamenodeRegistrationRequest.newInstance(
-                record.getNameserviceId(), record.getNamenodeId(), ACTIVE);
+                record.getNameserviceId(), record.getNamenodeId(), state);
         membership.updateNamenodeRegistration(updateRequest);
 
-        cacheNS.remove(nsId);
+        cacheNS.remove(Pair.of(nsId, Boolean.TRUE));
+        cacheNS.remove(Pair.of(nsId, Boolean.FALSE));
         // Invalidating the full cacheBp since getting the blockpool id from
         // namespace id is quite costly.
         cacheBP.clear();
@@ -175,9 +191,9 @@ public class MembershipNamenodeResolver
 
   @Override
   public List<? extends FederationNamenodeContext> getNamenodesForNameserviceId(
-      final String nsId) throws IOException {
+      final String nsId, boolean listObserversFirst) throws IOException {
 
-    List<? extends FederationNamenodeContext> ret = cacheNS.get(nsId);
+    List<? extends FederationNamenodeContext> ret = cacheNS.get(Pair.of(nsId, listObserversFirst));
     if (ret != null) {
       return ret;
     }
@@ -189,7 +205,8 @@ public class MembershipNamenodeResolver
       partial.setNameserviceId(nsId);
       GetNamenodeRegistrationsRequest request =
           GetNamenodeRegistrationsRequest.newInstance(partial);
-      result = getRecentRegistrationForQuery(request, true, false);
+      result = getRecentRegistrationForQuery(request, true,
+          false, listObserversFirst);
     } catch (StateStoreUnavailableException e) {
       LOG.error("Cannot get active NN for {}, State Store unavailable", nsId);
       return null;
@@ -218,7 +235,7 @@ public class MembershipNamenodeResolver
 
     // Cache the response
     ret = Collections.unmodifiableList(result);
-    cacheNS.put(nsId, result);
+    cacheNS.put(Pair.of(nsId, listObserversFirst), result);
     return ret;
   }
 
@@ -235,7 +252,7 @@ public class MembershipNamenodeResolver
             GetNamenodeRegistrationsRequest.newInstance(partial);
 
         final List<MembershipState> result =
-            getRecentRegistrationForQuery(request, true, false);
+            getRecentRegistrationForQuery(request, true, false, false);
         if (result == null || result.isEmpty()) {
           LOG.error("Cannot locate eligible NNs for {}", bpId);
         } else {
@@ -346,22 +363,34 @@ public class MembershipNamenodeResolver
   }
 
   /**
-   * Picks the most relevant record registration that matches the query. Return
-   * registrations matching the query in this preference: 1) Most recently
-   * updated ACTIVE registration 2) Most recently updated STANDBY registration
-   * (if showStandby) 3) Most recently updated UNAVAILABLE registration (if
-   * showUnavailable). EXPIRED registrations are ignored.
+   * Picks the most relevant record registrations that match the query.
+   * When this is not an observer read, registrations matching the query
+   * are returned in this order of preference:
+   * 1) Most recently updated ACTIVE registration
+   * 2) Most recently updated OBSERVER registration
+   * 3) Most recently updated STANDBY registration (if showStandby)
+   * 4) Most recently updated UNAVAILABLE registration (if showUnavailable)
+   *
+   * When this is an observer read, registrations matching the query
+   * are returned in this order of preference:
+   * 1) OBSERVER registrations, shuffled to disperse queries
+   * 2) Most recently updated ACTIVE registration
+   * 3) Most recently updated STANDBY registration (if showStandby)
+   * 4) Most recently updated UNAVAILABLE registration (if showUnavailable)
+   *
+   * EXPIRED registrations are ignored.
    *
    * @param request The select query for NN registrations.
    * @param addUnavailable include UNAVAILABLE registrations.
    * @param addExpired include EXPIRED registrations.
+   * @param observerRead If true (observer read), observer NNs are ranked first.
    * @return List of memberships or null if no registrations that
    *         both match the query AND the selected states.
    * @throws IOException
    */
   private List<MembershipState> getRecentRegistrationForQuery(
       GetNamenodeRegistrationsRequest request, boolean addUnavailable,
-      boolean addExpired) throws IOException {
+      boolean addExpired, boolean observerRead) throws IOException {
 
     // Retrieve a list of all registrations that match this query.
     // This may include all NN records for a namespace/blockpool, including
@@ -371,24 +400,34 @@ public class MembershipNamenodeResolver
         membershipStore.getNamenodeRegistrations(request);
 
     List<MembershipState> memberships = response.getNamenodeMemberships();
-    if (!addExpired || !addUnavailable) {
-      Iterator<MembershipState> iterator = memberships.iterator();
-      while (iterator.hasNext()) {
-        MembershipState membership = iterator.next();
-        if (membership.getState() == EXPIRED && !addExpired) {
-          iterator.remove();
-        } else if (membership.getState() == UNAVAILABLE && !addUnavailable) {
-          iterator.remove();
-        }
+    List<MembershipState> observerMemberships = new ArrayList<>();
+    Iterator<MembershipState> iterator = memberships.iterator();
+    while (iterator.hasNext()) {
+      MembershipState membership = iterator.next();
+      if (membership.getState() == EXPIRED && !addExpired) {
+        iterator.remove();
+      } else if (membership.getState() == UNAVAILABLE && !addUnavailable) {
+        iterator.remove();
+      } else if (membership.getState() == OBSERVER && observerRead) {
+        iterator.remove();
+        observerMemberships.add(membership);
       }
     }
 
-    List<MembershipState> priorityList = new ArrayList<>();
-    priorityList.addAll(memberships);
-    Collections.sort(priorityList, new NamenodePriorityComparator());
+    memberships.sort(new NamenodePriorityComparator());
+    if (observerRead) {
+      List<MembershipState> ret = new ArrayList<>(
+          memberships.size() + observerMemberships.size());
+      if (observerMemberships.size() > 1) {
+        Collections.shuffle(observerMemberships);
+      }
+      ret.addAll(observerMemberships);
+      ret.addAll(memberships);
+      memberships = ret;
+    }
 
-    LOG.debug("Selected most recent NN {} for query", priorityList);
-    return priorityList;
+    LOG.debug("Selected most recent NN {} for query", memberships);
+    return memberships;
   }
 
   @Override

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java

@@ -218,7 +218,7 @@ public class ConnectionPool {
   }
 
   /**
-   * Get the alignment context for this pool
+   * Get the alignment context for this pool.
    * @return Alignment context
    */
   public PoolAlignmentContext getPoolAlignmentContext() {

+ 6 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java

@@ -191,6 +191,12 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
       FEDERATION_STORE_PREFIX + "enable";
   public static final boolean DFS_ROUTER_STORE_ENABLE_DEFAULT = true;
 
+  public static final String DFS_ROUTER_OBSERVER_READ_DEFAULT_KEY =
+      FEDERATION_ROUTER_PREFIX + "observer.read.default";
+  public static final boolean DFS_ROUTER_OBSERVER_READ_DEFAULT_VALUE = false;
+  public static final String DFS_ROUTER_OBSERVER_READ_OVERRIDES =
+      FEDERATION_ROUTER_PREFIX + "observer.read.overrides";
+
   public static final String DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE =
       FEDERATION_ROUTER_PREFIX + "observer.federated.state.propagation.maxsize";
   public static final int DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE_DEFAULT = 5;

+ 4 - 1
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java

@@ -1918,7 +1918,10 @@ public class RouterClientProtocol implements ClientProtocol {
 
   @Override
   public void msync() throws IOException {
-    rpcServer.checkOperation(NameNode.OperationCategory.READ, false);
+    rpcServer.checkOperation(NameNode.OperationCategory.READ, true);
+    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+    RemoteMethod method = new RemoteMethod("msync");
+    rpcClient.invokeConcurrent(nss, method);
   }
 
   @Override

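With this change the router implements msync by invoking it concurrently on every nameservice, so a single client-side msync through the router refreshes state across all namespaces. A hedged client sketch of that flow, assuming a Hadoop client recent enough to expose FileSystem#msync; the router URI and path are illustrative:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    final class MsyncThroughRouter {
      // Sketch: write through the router, msync, then read.
      static void readAfterWrite(Configuration conf) throws Exception {
        try (FileSystem fs = FileSystem.get(URI.create("hdfs://router-fs"), conf)) {
          fs.create(new Path("/tmp/example")).close(); // handled by the active NN
          fs.msync();                                  // router fans this out to every nameservice
          fs.open(new Path("/tmp/example")).close();   // read may now be served by an observer
        }
      }
    }
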
+ 105 - 45
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java

@@ -38,6 +38,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -70,16 +71,19 @@ import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
 import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
 import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.namenode.ha.ReadOnly;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.io.retry.RetryPolicy.RetryAction.RetryDecision;
 import org.apache.hadoop.ipc.CallerContext;
+import org.apache.hadoop.ipc.ObserverRetryOnActiveException;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.RetriableException;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ipc.Server.Call;
 import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.net.ConnectTimeoutException;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 import org.eclipse.jetty.util.ajax.JSON;
@@ -128,6 +132,10 @@ public class RouterRpcClient {
   private final RouterRpcMonitor rpcMonitor;
   /** Field separator of CallerContext. */
   private final String contextFieldSeparator;
+  /** Observer read enabled. Default for all nameservices. */
+  private final boolean observerReadEnabledDefault;
+  /** Nameservice-specific overrides of the default setting for enabling observer reads. */
+  private HashSet<String> observerReadEnabledOverrides = new HashSet<>();
 
   /** Pattern to parse a stack trace line. */
   private static final Pattern STACK_TRACE_PATTERN =
@@ -200,6 +208,16 @@ public class RouterRpcClient {
         failoverSleepBaseMillis, failoverSleepMaxMillis);
     String[] ipProxyUsers = conf.getStrings(DFS_NAMENODE_IP_PROXY_USERS);
     this.enableProxyUser = ipProxyUsers != null && ipProxyUsers.length > 0;
+    this.observerReadEnabledDefault = conf.getBoolean(
+        RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_KEY,
+        RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_VALUE);
+    String[] observerReadOverrides = conf.getStrings(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_OVERRIDES);
+    if (observerReadOverrides != null) {
+      observerReadEnabledOverrides.addAll(Arrays.asList(observerReadOverrides));
+    }
+    if (this.observerReadEnabledDefault) {
+      LOG.info("Observer read is enabled for router.");
+    }
   }
 
   /**
@@ -451,6 +469,7 @@ public class RouterRpcClient {
    * @param ugi User group information.
    * @param namenodes A prioritized list of namenodes within the same
    *                  nameservice.
+   * @param useObserver Whether to use observer namenodes.
    * @param method Remote ClientProtocol method to invoke.
    * @param params Variable list of parameters matching the method.
    * @return The result of invoking the method.
@@ -462,6 +481,7 @@ public class RouterRpcClient {
   public Object invokeMethod(
       final UserGroupInformation ugi,
       final List<? extends FederationNamenodeContext> namenodes,
+      boolean useObserver,
       final Class<?> protocol, final Method method, final Object... params)
           throws ConnectException, StandbyException, IOException {
 
@@ -478,8 +498,12 @@ public class RouterRpcClient {
       rpcMonitor.proxyOp();
     }
     boolean failover = false;
+    boolean shouldUseObserver = useObserver;
     Map<FederationNamenodeContext, IOException> ioes = new LinkedHashMap<>();
     for (FederationNamenodeContext namenode : namenodes) {
+      if (!shouldUseObserver && (namenode.getState() == FederationNamenodeServiceState.OBSERVER)) {
+        continue;
+      }
       ConnectionContext connection = null;
       String nsId = namenode.getNameserviceId();
       String rpcAddress = namenode.getRpcAddress();
@@ -489,13 +513,14 @@ public class RouterRpcClient {
         final Object proxy = client.getProxy();
 
         ret = invoke(nsId, 0, method, proxy, params);
-        if (failover) {
+        if (failover &&
+            FederationNamenodeServiceState.OBSERVER != namenode.getState()) {
           // Success on alternate server, update
           InetSocketAddress address = client.getAddress();
           namenodeResolver.updateActiveNamenode(nsId, address);
         }
         if (this.rpcMonitor != null) {
-          this.rpcMonitor.proxyOpComplete(true, nsId);
+          this.rpcMonitor.proxyOpComplete(true, nsId, namenode.getState());
         }
         if (this.router.getRouterClientMetrics() != null) {
           this.router.getRouterClientMetrics().incInvokedMethod(method);
@@ -503,7 +528,11 @@ public class RouterRpcClient {
         return ret;
       } catch (IOException ioe) {
         ioes.put(namenode, ioe);
-        if (ioe instanceof StandbyException) {
+        if (ioe instanceof ObserverRetryOnActiveException) {
+          LOG.info("Encountered ObserverRetryOnActiveException from {}."
+                  + " Retrying the active namenode directly.", namenode);
+          shouldUseObserver = false;
+        } else if (ioe instanceof StandbyException) {
           // Fail over indicated by retry policy and/or NN
           if (this.rpcMonitor != null) {
             this.rpcMonitor.proxyOpFailureStandby(nsId);
@@ -513,10 +542,15 @@ public class RouterRpcClient {
           if (this.rpcMonitor != null) {
             this.rpcMonitor.proxyOpFailureCommunicate(nsId);
           }
-          failover = true;
+          if (FederationNamenodeServiceState.OBSERVER == namenode.getState()) {
+            namenodeResolver.updateUnavailableNamenode(nsId,
+                NetUtils.createSocketAddr(namenode.getRpcAddress()));
+          } else {
+            failover = true;
+          }
         } else if (ioe instanceof RemoteException) {
           if (this.rpcMonitor != null) {
-            this.rpcMonitor.proxyOpComplete(true, nsId);
+            this.rpcMonitor.proxyOpComplete(true, nsId, namenode.getState());
           }
           RemoteException re = (RemoteException) ioe;
           ioe = re.unwrapRemoteException();
@@ -546,7 +580,7 @@ public class RouterRpcClient {
           // Communication retries are handled by the retry policy
           if (this.rpcMonitor != null) {
             this.rpcMonitor.proxyOpFailureCommunicate(nsId);
-            this.rpcMonitor.proxyOpComplete(false, nsId);
+            this.rpcMonitor.proxyOpComplete(false, nsId, namenode.getState());
           }
           throw ioe;
         }
@@ -557,7 +591,7 @@ public class RouterRpcClient {
       }
     }
     if (this.rpcMonitor != null) {
-      this.rpcMonitor.proxyOpComplete(false, null);
+      this.rpcMonitor.proxyOpComplete(false, null, null);
     }
 
     // All namenodes were unavailable or in standby
@@ -640,16 +674,12 @@ public class RouterRpcClient {
    * @param params Variable parameters
    * @return Response from the remote server
    * @throws IOException
-   * @throws InterruptedException
    */
   private Object invoke(String nsId, int retryCount, final Method method,
       final Object obj, final Object... params) throws IOException {
     try {
       return method.invoke(obj, params);
-    } catch (IllegalAccessException e) {
-      LOG.error("Unexpected exception while proxying API", e);
-      return null;
-    } catch (IllegalArgumentException e) {
+    } catch (IllegalAccessException | IllegalArgumentException e) {
       LOG.error("Unexpected exception while proxying API", e);
       return null;
     } catch (InvocationTargetException e) {
@@ -713,7 +743,7 @@ public class RouterRpcClient {
    */
   private boolean isClusterUnAvailable(String nsId) throws IOException {
     List<? extends FederationNamenodeContext> nnState = this.namenodeResolver
-        .getNamenodesForNameserviceId(nsId);
+        .getNamenodesForNameserviceId(nsId, false);
 
     if (nnState != null) {
       for (FederationNamenodeContext nnContext : nnState) {
@@ -844,13 +874,13 @@ public class RouterRpcClient {
     RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController();
     acquirePermit(nsId, ugi, method, controller);
     try {
-      List<? extends FederationNamenodeContext> nns =
-          getNamenodesForNameservice(nsId);
+      boolean isObserverRead = isObserverReadEligible(nsId, method.getMethod());
+      List<? extends FederationNamenodeContext> nns = getOrderedNamenodes(nsId, isObserverRead);
       RemoteLocationContext loc = new RemoteLocation(nsId, "/", "/");
       Class<?> proto = method.getProtocol();
       Method m = method.getMethod();
       Object[] params = method.getParams(loc);
-      return invokeMethod(ugi, nns, proto, m, params);
+      return invokeMethod(ugi, nns, isObserverRead, proto, m, params);
     } finally {
       releasePermit(nsId, ugi, method, controller);
     }
@@ -927,7 +957,7 @@ public class RouterRpcClient {
    * @throws IOException if the success condition is not met and one of the RPC
    *           calls generated a remote exception.
    */
-  public Object invokeSequential(
+  public <T> T invokeSequential(
       final List<? extends RemoteLocationContext> locations,
       final RemoteMethod remoteMethod) throws IOException {
     return invokeSequential(locations, remoteMethod, null, null);
@@ -1012,12 +1042,14 @@ public class RouterRpcClient {
     for (final RemoteLocationContext loc : locations) {
       String ns = loc.getNameserviceId();
       acquirePermit(ns, ugi, remoteMethod, controller);
+      boolean isObserverRead = isObserverReadEligible(ns, m);
       List<? extends FederationNamenodeContext> namenodes =
-          getNamenodesForNameservice(ns);
+          getOrderedNamenodes(ns, isObserverRead);
       try {
         Class<?> proto = remoteMethod.getProtocol();
         Object[] params = remoteMethod.getParams(loc);
-        Object result = invokeMethod(ugi, namenodes, proto, m, params);
+        Object result = invokeMethod(
+            ugi, namenodes, isObserverRead, proto, m, params);
         // Check if the result is what we expected
         if (isExpectedClass(expectedResultClass, result) &&
             isExpectedValue(expectedResultValue, result)) {
@@ -1373,12 +1405,14 @@ public class RouterRpcClient {
       String ns = location.getNameserviceId();
       RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController();
       acquirePermit(ns, ugi, method, controller);
+      boolean isObserverRead = isObserverReadEligible(ns, m);
       final List<? extends FederationNamenodeContext> namenodes =
-          getNamenodesForNameservice(ns);
+          getOrderedNamenodes(ns, isObserverRead);
       try {
         Class<?> proto = method.getProtocol();
         Object[] paramList = method.getParams(location);
-        R result = (R) invokeMethod(ugi, namenodes, proto, m, paramList);
+        R result = (R) invokeMethod(
+            ugi, namenodes, isObserverRead, proto, m, paramList);
         RemoteResult<T, R> remoteResult = new RemoteResult<>(location, result);
         return Collections.singletonList(remoteResult);
       } catch (IOException ioe) {
@@ -1396,8 +1430,9 @@ public class RouterRpcClient {
     final CallerContext originContext = CallerContext.getCurrent();
     for (final T location : locations) {
       String nsId = location.getNameserviceId();
+      boolean isObserverRead = isObserverReadEligible(nsId, m);
       final List<? extends FederationNamenodeContext> namenodes =
-          getNamenodesForNameservice(nsId);
+          getOrderedNamenodes(nsId, isObserverRead);
       final Class<?> proto = method.getProtocol();
       final Object[] paramList = method.getParams(location);
       if (standby) {
@@ -1414,7 +1449,8 @@ public class RouterRpcClient {
           callables.add(
               () -> {
                 transferThreadLocalContext(originCall, originContext);
-                return invokeMethod(ugi, nnList, proto, m, paramList);
+                return invokeMethod(
+                    ugi, nnList, isObserverRead, proto, m, paramList);
               });
         }
       } else {
@@ -1423,7 +1459,8 @@ public class RouterRpcClient {
         callables.add(
             () -> {
               transferThreadLocalContext(originCall, originContext);
-              return invokeMethod(ugi, namenodes, proto, m, paramList);
+              return invokeMethod(
+                  ugi, namenodes, isObserverRead, proto, m, paramList);
             });
       }
     }
@@ -1512,27 +1549,6 @@ public class RouterRpcClient {
     CallerContext.setCurrent(originContext);
   }
 
-  /**
-   * Get a prioritized list of NNs that share the same nameservice ID (in the
-   * same namespace). NNs that are reported as ACTIVE will be first in the list.
-   *
-   * @param nsId The nameservice ID for the namespace.
-   * @return A prioritized list of NNs to use for communication.
-   * @throws IOException If a NN cannot be located for the nameservice ID.
-   */
-  private List<? extends FederationNamenodeContext> getNamenodesForNameservice(
-      final String nsId) throws IOException {
-
-    final List<? extends FederationNamenodeContext> namenodes =
-        namenodeResolver.getNamenodesForNameserviceId(nsId);
-
-    if (namenodes == null || namenodes.isEmpty()) {
-      throw new IOException("Cannot locate a registered namenode for " + nsId +
-          " from " + router.getRouterId());
-    }
-    return namenodes;
-  }
-
   /**
    * Get a prioritized list of NNs that share the same block pool ID (in the
    * same namespace). NNs that are reported as ACTIVE will be first in the list.
@@ -1670,4 +1686,48 @@ public class RouterRpcClient {
     }
     return null;
   }
+
+  /**
+   * Get a prioritized list of NNs that share the same nameservice ID (in the
+   * same namespace).
+   * In observer read case, OBSERVER NNs will be first in the list.
+   * Otherwise, ACTIVE NNs will be first in the list.
+   *
+   * @param nsId The nameservice ID for the namespace.
+   * @param isObserverRead Read on observer namenode.
+   * @return A prioritized list of NNs to use for communication.
+   * @throws IOException If a NN cannot be located for the nameservice ID.
+   */
+  private List<? extends FederationNamenodeContext> getOrderedNamenodes(String nsId,
+      boolean isObserverRead) throws IOException {
+    final List<? extends FederationNamenodeContext> namenodes;
+
+    if (RouterStateIdContext.getClientStateIdFromCurrentCall(nsId) > Long.MIN_VALUE) {
+      namenodes = namenodeResolver.getNamenodesForNameserviceId(nsId, isObserverRead);
+    } else {
+      namenodes = namenodeResolver.getNamenodesForNameserviceId(nsId, false);
+    }
+
+    if (namenodes == null || namenodes.isEmpty()) {
+      throw new IOException("Cannot locate a registered namenode for " + nsId +
+          " from " + router.getRouterId());
+    }
+    return namenodes;
+  }
+
+  private boolean isObserverReadEligible(String nsId, Method method) {
+    boolean isReadEnabledForNamespace = observerReadEnabledDefault != observerReadEnabledOverrides.contains(nsId);
+    return isReadEnabledForNamespace && isReadCall(method);
+  }
+
+  /**
+   * Check if a method is read-only.
+   * @return whether the 'method' is a read-only operation.
+   */
+  private static boolean isReadCall(Method method) {
+    if (!method.isAnnotationPresent(ReadOnly.class)) {
+      return false;
+    }
+    return !method.getAnnotationsByType(ReadOnly.class)[0].activeOnly();
+  }
 }

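The eligibility check above hinges on one expression, observerReadEnabledDefault != observerReadEnabledOverrides.contains(nsId): listing a nameservice in the overrides inverts the cluster-wide default. A standalone sketch of that semantics; the class is illustrative and not part of the router code:

    import java.util.Collections;
    import java.util.Set;

    final class ObserverReadEligibility {
      // Sketch: a nameservice in the overrides set inverts the default.
      static boolean enabledFor(String nsId, boolean defaultEnabled, Set<String> overrides) {
        return defaultEnabled != overrides.contains(nsId);
      }

      public static void main(String[] args) {
        Set<String> overrides = Collections.singleton("ns1");
        System.out.println(enabledFor("ns0", true, overrides));  // true: default applies
        System.out.println(enabledFor("ns1", true, overrides));  // false: override disables it
        System.out.println(enabledFor("ns1", false, overrides)); // true: override enables it
      }
    }
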
+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.federation.router;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
 import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
 
 /**
@@ -61,8 +62,9 @@ public interface RouterRpcMonitor {
   /**
    * Mark a proxy operation as completed.
    * @param success If the operation was successful.
+   * @param state State of the namenode the call was proxied to.
    */
-  void proxyOpComplete(boolean success, String nsId);
+  void proxyOpComplete(boolean success, String nsId, FederationNamenodeServiceState state);
 
   /**
    * Failed to proxy an operation to a Namenode because it was in standby.

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java

@@ -1331,7 +1331,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
     clientProto.modifyAclEntries(src, aclSpec);
   }
 
-  @Override // ClienProtocol
+  @Override // ClientProtocol
   public void removeAclEntries(String src, List<AclEntry> aclSpec)
       throws IOException {
     clientProto.removeAclEntries(src, aclSpec);

+ 19 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml

@@ -835,6 +835,25 @@
     </description>
   </property>
 
+  <property>
+    <name>dfs.federation.router.observer.read.default</name>
+    <value>false</value>
+    <description>
+      Whether observer reads are enabled. This is the default for all nameservices.
+      The default can be inverted for individual namespaces by adding them to
+      dfs.federation.router.observer.read.overrides.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.federation.router.observer.read.overrides</name>
+    <value/>
+    <description>
+      Comma-separated list of namespaces for which to invert the default configuration,
+      dfs.federation.router.observer.read.default, that controls whether observer reads are enabled.
+    </description>
+  </property>
+
   <property>
     <name>dfs.federation.router.observer.federated.state.propagation.maxsize</name>
     <value>5</value>

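Putting the two properties together, a deployment that wants observer reads on by default but disabled for one namespace would set the default to true and list that namespace in the overrides. A minimal sketch using the Configuration API, mirroring how the new tests set these keys; the nameservice name is illustrative, and in a real deployment the same values would go in hdfs-rbf-site.xml:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;

    final class ObserverReadConfigExample {
      // Sketch: observer reads enabled everywhere except nameservice "ns0".
      static Configuration build() {
        Configuration conf = new Configuration();
        conf.setBoolean(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_KEY, true);
        conf.set(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_OVERRIDES, "ns0");
        return conf;
      }
    }
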
+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java

@@ -175,7 +175,7 @@ public final class FederationTestUtils {
     GenericTestUtils.waitFor(() -> {
       try {
         List<? extends FederationNamenodeContext> namenodes =
-            resolver.getNamenodesForNameserviceId(nsId);
+            resolver.getNamenodesForNameserviceId(nsId, false);
         if (namenodes != null) {
           for (FederationNamenodeContext namenode : namenodes) {
             // Check if this is the Namenode we are checking
@@ -207,7 +207,7 @@ public final class FederationTestUtils {
     GenericTestUtils.waitFor(() -> {
       try {
         List<? extends FederationNamenodeContext> nns =
-            resolver.getNamenodesForNameserviceId(nsId);
+            resolver.getNamenodesForNameserviceId(nsId, false);
         for (FederationNamenodeContext nn : nns) {
           if (nn.getState().equals(state)) {
             return true;

+ 22 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java

@@ -806,6 +806,7 @@ public class MiniRouterDFSCluster {
           .numDataNodes(numDNs)
           .nnTopology(topology)
           .dataNodeConfOverlays(dnConfs)
+          .checkExitOnShutdown(false)
           .storageTypes(storageTypes)
           .racks(racks)
           .build();
@@ -1038,6 +1039,27 @@ public class MiniRouterDFSCluster {
     }
   }
 
+  /**
+   * Switch a namenode in a nameservice to be the observer.
+   * @param nsId Nameservice identifier.
+   * @param nnId Namenode identifier.
+   */
+  public void switchToObserver(String nsId, String nnId) {
+    try {
+      int total = cluster.getNumNameNodes();
+      NameNodeInfo[] nns = cluster.getNameNodeInfos();
+      for (int i = 0; i < total; i++) {
+        NameNodeInfo nn = nns[i];
+        if (nn.getNameserviceId().equals(nsId) &&
+            nn.getNamenodeId().equals(nnId)) {
+          cluster.transitionToObserver(i);
+        }
+      }
+    } catch (Throwable e) {
+      LOG.error("Cannot transition to observer", e);
+    }
+  }
+
   /**
    * Stop the federated HDFS cluster.
    */

+ 44 - 6
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java

@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -119,12 +120,24 @@ public class MockResolver
     disableRegistration = isDisable;
   }
 
+  @Override public void updateUnavailableNamenode(String ns,
+      InetSocketAddress failedAddress) throws IOException {
+    updateNameNodeState(ns, failedAddress,
+        FederationNamenodeServiceState.UNAVAILABLE);
+  }
+
   @Override
   public void updateActiveNamenode(
       String nsId, InetSocketAddress successfulAddress) {
+    updateNameNodeState(nsId, successfulAddress,
+        FederationNamenodeServiceState.ACTIVE);
+  }
 
-    String address = successfulAddress.getHostName() + ":" +
-        successfulAddress.getPort();
+  private void updateNameNodeState(String nsId,
+      InetSocketAddress iAddr,
+      FederationNamenodeServiceState state) {
+    String sAddress = iAddr.getHostName() + ":" +
+        iAddr.getPort();
     String key = nsId;
     if (key != null) {
       // Update the active entry
@@ -132,9 +145,9 @@ public class MockResolver
       List<FederationNamenodeContext> namenodes =
           (List<FederationNamenodeContext>) this.resolver.get(key);
       for (FederationNamenodeContext namenode : namenodes) {
-        if (namenode.getRpcAddress().equals(address)) {
+        if (namenode.getRpcAddress().equals(sAddress)) {
           MockNamenodeContext nn = (MockNamenodeContext) namenode;
-          nn.setState(FederationNamenodeServiceState.ACTIVE);
+          nn.setState(state);
           break;
         }
       }
@@ -147,14 +160,39 @@ public class MockResolver
 
   @Override
   public synchronized List<? extends FederationNamenodeContext>
-      getNamenodesForNameserviceId(String nameserviceId) {
+      getNamenodesForNameserviceId(String nameserviceId, boolean observerRead) {
     // Return a copy of the list because it is updated periodically
     List<? extends FederationNamenodeContext> namenodes =
         this.resolver.get(nameserviceId);
     if (namenodes == null) {
       namenodes = new ArrayList<>();
     }
-    return Collections.unmodifiableList(new ArrayList<>(namenodes));
+
+    List<FederationNamenodeContext> ret = new ArrayList<>();
+
+    if (observerRead) {
+      Iterator<? extends FederationNamenodeContext> iterator = namenodes
+          .iterator();
+      List<FederationNamenodeContext> observerNN = new ArrayList<>();
+      List<FederationNamenodeContext> nonObserverNN = new ArrayList<>();
+      while (iterator.hasNext()) {
+        FederationNamenodeContext membership = iterator.next();
+        if (membership.getState() == FederationNamenodeServiceState.OBSERVER) {
+          observerNN.add(membership);
+        } else {
+          nonObserverNN.add(membership);
+        }
+      }
+      Collections.shuffle(observerNN);
+      Collections.sort(nonObserverNN, new NamenodePriorityComparator());
+      ret.addAll(observerNN);
+      ret.addAll(nonObserverNN);
+    } else {
+      ret.addAll(namenodes);
+      Collections.sort(ret, new NamenodePriorityComparator());
+    }
+
+    return Collections.unmodifiableList(ret);
   }
 
   @Override

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRefreshFairnessPolicyController.java

@@ -161,7 +161,8 @@ public class TestRouterRefreshFairnessPolicyController {
       Thread.sleep(sleepTime);
       return null;
     }).when(client)
-        .invokeMethod(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
+        .invokeMethod(Mockito.any(), Mockito.any(), Mockito.anyBoolean(),
+            Mockito.any(), Mockito.any(), Mockito.any());
 
     // No calls yet
     assertEquals("{}",

+ 7 - 7
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestNamenodeResolver.java

@@ -129,7 +129,7 @@ public class TestNamenodeResolver {
       int resultsCount, FederationNamenodeServiceState state)
           throws IOException {
     List<? extends FederationNamenodeContext> namenodes =
-        namenodeResolver.getNamenodesForNameserviceId(nsId);
+        namenodeResolver.getNamenodesForNameserviceId(nsId, false);
     if (resultsCount == 0) {
       assertNull(namenodes);
     } else {
@@ -291,8 +291,8 @@ public class TestNamenodeResolver {
             HAServiceState.STANDBY)));
     stateStore.refreshCaches(true);
     // Check whether the namenpde state is reported correct as standby.
-    FederationNamenodeContext namenode =
-        namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0]).get(0);
+    FederationNamenodeContext namenode = namenodeResolver
+        .getNamenodesForNameserviceId(NAMESERVICES[0], false).get(0);
     assertEquals(FederationNamenodeServiceState.STANDBY, namenode.getState());
     String rpcAddr = namenode.getRpcAddress();
     InetSocketAddress inetAddr = getInetSocketAddress(rpcAddr);
@@ -301,8 +301,8 @@ public class TestNamenodeResolver {
     // RouterRpcClient calls updateActiveNamenode to update the state to active,
     // Check whether correct updated state is returned post update.
     namenodeResolver.updateActiveNamenode(NAMESERVICES[0], inetAddr);
-    FederationNamenodeContext namenode1 =
-        namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0]).get(0);
+    FederationNamenodeContext namenode1 = namenodeResolver
+        .getNamenodesForNameserviceId(NAMESERVICES[0], false).get(0);
     assertEquals("The namenode state should be ACTIVE post update.",
         FederationNamenodeServiceState.ACTIVE, namenode1.getState());
   }
@@ -318,8 +318,8 @@ public class TestNamenodeResolver {
 
     InetSocketAddress inetAddr = getInetSocketAddress(rpcAddress);
     namenodeResolver.updateActiveNamenode(NAMESERVICES[0], inetAddr);
-    FederationNamenodeContext namenode =
-        namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0]).get(0);
+    FederationNamenodeContext namenode = namenodeResolver
+        .getNamenodesForNameserviceId(NAMESERVICES[0], false).get(0);
     assertEquals("The namenode state should be ACTIVE post update.",
         FederationNamenodeServiceState.ACTIVE, namenode.getState());
   }

+ 425 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java

@@ -0,0 +1,425 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
+import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.junit.After;
+import org.junit.Test;
+
+public class TestObserverWithRouter {
+
+  private MiniRouterDFSCluster cluster;
+
+  public void startUpCluster(int numberOfObserver) throws Exception {
+    startUpCluster(numberOfObserver, null);
+  }
+
+  public void startUpCluster(int numberOfObserver, Configuration confOverrides) throws Exception {
+    int numberOfNamenode = 2 + numberOfObserver;
+    Configuration conf = new Configuration(false);
+    conf.setBoolean(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_KEY, true);
+    conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
+    conf.set(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, "0ms");
+    if (confOverrides != null) {
+      conf.addResource(confOverrides);
+    }
+    cluster = new MiniRouterDFSCluster(true, 2, numberOfNamenode);
+    cluster.addNamenodeOverrides(conf);
+    // Start NNs and DNs and wait until ready
+    cluster.startCluster();
+
+    // Making one Namenode active per nameservice
+    if (cluster.isHighAvailability()) {
+      for (String ns : cluster.getNameservices()) {
+        cluster.switchToActive(ns, NAMENODES[0]);
+        cluster.switchToStandby(ns, NAMENODES[1]);
+        for (int i = 2; i < numberOfNamenode; i++) {
+          cluster.switchToObserver(ns, NAMENODES[i]);
+        }
+      }
+    }
+
+    Configuration routerConf = new RouterConfigBuilder()
+        .metrics()
+        .rpc()
+        .build();
+
+    cluster.addRouterOverrides(conf);
+    cluster.addRouterOverrides(routerConf);
+
+    // Start routers with only an RPC service
+    cluster.startRouters();
+
+    // Register and verify all NNs with all routers
+    cluster.registerNamenodes();
+    cluster.waitNamenodeRegistration();
+    // Setup the mount table
+    cluster.installMockLocations();
+
+    cluster.waitActiveNamespaces();
+  }
+
+  @After
+  public void teardown() throws IOException {
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  @Test
+  public void testObserverRead() throws Exception {
+    startUpCluster(1);
+    RouterContext routerContext = cluster.getRandomRouter();
+    List<? extends FederationNamenodeContext> namenodes = routerContext
+        .getRouter().getNamenodeResolver()
+        .getNamenodesForNameserviceId(cluster.getNameservices().get(0), true);
+    assertEquals("First namenode should be observer",
+        FederationNamenodeServiceState.OBSERVER, namenodes.get(0).getState());
+    FileSystem fileSystem = routerContext.getFileSystem();
+    Path path = new Path("/testFile");
+    // Send Create call to active
+    fileSystem.create(path).close();
+
+    // Send read request to observer
+    fileSystem.open(path).close();
+
+    long rpcCountForActive = routerContext.getRouter().getRpcServer()
+        .getRPCMetrics().getActiveProxyOps();
+    // Create and complete calls should be sent to active
+    assertEquals("Two calls should be sent to active", 2, rpcCountForActive);
+
+    long rpcCountForObserver = routerContext.getRouter().getRpcServer()
+        .getRPCMetrics().getObserverProxyOps();
+    // getBlockLocations should be sent to observer
+    assertEquals("One call should be sent to observer", 1, rpcCountForObserver);
+    fileSystem.close();
+  }
+
+  @Test
+  public void testObserverReadWithoutFederatedStatePropagation() throws Exception {
+    Configuration confOverrides = new Configuration(false);
+    confOverrides.setInt(RBFConfigKeys.DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE, 0);
+    startUpCluster(1, confOverrides);
+    RouterContext routerContext = cluster.getRandomRouter();
+    List<? extends FederationNamenodeContext> namenodes = routerContext
+        .getRouter().getNamenodeResolver()
+        .getNamenodesForNameserviceId(cluster.getNameservices().get(0), true);
+    assertEquals("First namenode should be observer",
+        FederationNamenodeServiceState.OBSERVER, namenodes.get(0).getState());
+    FileSystem fileSystem = routerContext.getFileSystem();
+    Path path = new Path("/testFile");
+    // Send Create call to active
+    fileSystem.create(path).close();
+
+    // Send read request to observer. The router will msync to the active namenode.
+    fileSystem.open(path).close();
+
+    long rpcCountForActive = routerContext.getRouter().getRpcServer()
+        .getRPCMetrics().getActiveProxyOps();
+    // Create, complete and getBlockLocations calls should be sent to active
+    assertEquals("Three calls should be sent to active", 3, rpcCountForActive);
+
+    long rpcCountForObserver = routerContext.getRouter().getRpcServer()
+        .getRPCMetrics().getObserverProxyOps();
+    assertEquals("No call should be sent to observer", 0, rpcCountForObserver);
+    fileSystem.close();
+  }
+
+  @Test
+  public void testDisablingObserverReadUsingNameserviceOverride() throws Exception {
+    // Disable observer reads using per-nameservice override
+    Configuration confOverrides = new Configuration(false);
+    confOverrides.set(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_OVERRIDES, "ns0");
+    startUpCluster(1, confOverrides);
+
+    RouterContext routerContext = cluster.getRandomRouter();
+    FileSystem fileSystem = routerContext.getFileSystem();
+    Path path = new Path("/testFile");
+    fileSystem.create(path).close();
+    fileSystem.open(path).close();
+    fileSystem.close();
+
+    long rpcCountForActive = routerContext.getRouter().getRpcServer()
+        .getRPCMetrics().getActiveProxyOps();
+    // Create, complete and read calls should be sent to active
+    assertEquals("Three calls should be sent to active", 3, rpcCountForActive);
+
+    long rpcCountForObserver = routerContext.getRouter().getRpcServer()
+        .getRPCMetrics().getObserverProxyOps();
+    assertEquals("Zero calls should be sent to observer", 0, rpcCountForObserver);
+  }
+
+  @Test
+  public void testReadWhenObserverIsDown() throws Exception {
+    startUpCluster(1);
+    RouterContext routerContext = cluster.getRandomRouter();
+    FileSystem fileSystem = routerContext.getFileSystem();
+    Path path = new Path("/testFile1");
+    // Send Create call to active
+    fileSystem.create(path).close();
+
+    // Stop observer NN
+    int nnIndex = stopObserver(1);
+
+    assertNotEquals("No observer found", 3, nnIndex);
+
+    // Send read request
+    fileSystem.open(path).close();
+
+    long rpcCountForActive = routerContext.getRouter().getRpcServer()
+        .getRPCMetrics().getActiveProxyOps();
+    // Create, complete and getBlockLocations calls should be sent to active
+    assertEquals("Three calls should be sent to active", 3,
+        rpcCountForActive);
+
+    long rpcCountForObserver = routerContext.getRouter().getRpcServer()
+        .getRPCMetrics().getObserverProxyOps();
+    assertEquals("No call should send to observer", 0,
+        rpcCountForObserver);
+    fileSystem.close();
+  }
+
+  @Test
+  public void testMultipleObserver() throws Exception {
+    startUpCluster(2);
+    RouterContext routerContext = cluster.getRandomRouter();
+    FileSystem fileSystem = routerContext.getFileSystem();
+    Path path = new Path("/testFile1");
+    // Send Create call to active
+    fileSystem.create(path).close();
+
+    // Stop one observer NN
+    stopObserver(1);
+
+    // Send read request
+    fileSystem.open(path).close();
+
+    long rpcCountForActive = routerContext.getRouter().getRpcServer()
+        .getRPCMetrics().getActiveProxyOps();
+
+    long expectedActiveRpc = 2;
+    long expectedObserverRpc = 1;
+
+    // Create and complete calls should be sent to active
+    assertEquals("Two calls should be sent to active",
+        expectedActiveRpc, rpcCountForActive);
+
+    long rpcCountForObserver = routerContext.getRouter()
+        .getRpcServer().getRPCMetrics().getObserverProxyOps();
+    // getBlockLocations call should be sent to observer
+    assertEquals("Read should succeed with the remaining observer",
+        expectedObserverRpc, rpcCountForObserver);
+
+    // Stop one observer NN
+    stopObserver(1);
+
+    // Send read request
+    fileSystem.open(path).close();
+
+    rpcCountForActive = routerContext.getRouter()
+        .getRpcServer().getRPCMetrics().getActiveProxyOps();
+
+    // With no observers left, the getBlockLocations call should be sent to active
+    expectedActiveRpc += 1;
+    assertEquals("One more call should be sent to active", expectedActiveRpc,
+        rpcCountForActive);
+    // Observer count should remain unchanged
+    expectedObserverRpc += 0;
+    rpcCountForObserver = routerContext.getRouter()
+        .getRpcServer().getRPCMetrics().getObserverProxyOps();
+    assertEquals("No new calls should be sent to observer",
+        expectedObserverRpc, rpcCountForObserver);
+    fileSystem.close();
+  }
+
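+  /**
+   * Stop up to {@code num} namenodes that are currently in observer state.
+   *
+   * @param num number of observer namenodes to shut down.
+   * @return index of the last namenode examined; equals the total namenode
+   *         count when fewer observers than requested were found.
+   */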
+  private int stopObserver(int num) {
+    int nnIndex;
+    for (nnIndex = 0; nnIndex < cluster.getNamenodes().size(); nnIndex++) {
+      NameNode nameNode = cluster.getCluster().getNameNode(nnIndex);
+      if (nameNode != null && nameNode.isObserverState()) {
+        cluster.getCluster().shutdownNameNode(nnIndex);
+        num--;
+        if (num == 0) {
+          break;
+        }
+      }
+    }
+    return nnIndex;
+  }
+
+  // Test the router with multiple observers to verify which observer NN
+  // receives the requests
+  @Test
+  public void testMultipleObserverRouter() throws Exception {
+    StateStoreDFSCluster innerCluster;
+    RouterContext routerContext;
+    MembershipNamenodeResolver resolver;
+
+    String ns0;
+    String ns1;
+    // Create 4 NNs: one active, one standby, and two observers
+    innerCluster = new StateStoreDFSCluster(true, 4, 4, TimeUnit.SECONDS.toMillis(5),
+        TimeUnit.SECONDS.toMillis(5));
+    Configuration routerConf =
+        new RouterConfigBuilder().stateStore().admin().rpc()
+            .enableLocalHeartbeat(true).heartbeat().build();
+
+    StringBuilder sb = new StringBuilder();
+    ns0 = innerCluster.getNameservices().get(0);
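+    // Have the router heartbeat locally against the second namenode of ns0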
+    MiniRouterDFSCluster.NamenodeContext context =
+        innerCluster.getNamenodes(ns0).get(1);
+    routerConf.set(DFS_NAMESERVICE_ID, ns0);
+    routerConf.set(DFS_HA_NAMENODE_ID_KEY, context.getNamenodeId());
+
+    // Specify namenodes (ns1.nn0,ns1.nn1) to monitor
+    ns1 = innerCluster.getNameservices().get(1);
+    for (MiniRouterDFSCluster.NamenodeContext ctx : innerCluster.getNamenodes(ns1)) {
+      String suffix = ctx.getConfSuffix();
+      if (sb.length() != 0) {
+        sb.append(",");
+      }
+      sb.append(suffix);
+    }
+    routerConf.set(DFS_ROUTER_MONITOR_NAMENODE, sb.toString());
+    routerConf.setBoolean(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_KEY, true);
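+    // Tail in-progress edits with no delay so observers stay close to the active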
+    routerConf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
+    routerConf.set(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, "0ms");
+
+    innerCluster.addNamenodeOverrides(routerConf);
+    innerCluster.addRouterOverrides(routerConf);
+    innerCluster.startCluster();
+
+    if (innerCluster.isHighAvailability()) {
+      for (String ns : innerCluster.getNameservices()) {
+        innerCluster.switchToActive(ns, NAMENODES[0]);
+        innerCluster.switchToStandby(ns, NAMENODES[1]);
+        for (int i = 2; i < 4; i++) {
+          innerCluster.switchToObserver(ns, NAMENODES[i]);
+        }
+      }
+    }
+    innerCluster.startRouters();
+    innerCluster.waitClusterUp();
+
+    routerContext = innerCluster.getRandomRouter();
+    resolver = (MembershipNamenodeResolver) routerContext.getRouter()
+        .getNamenodeResolver();
+
+    resolver.loadCache(true);
+    List<? extends FederationNamenodeContext> namespaceInfo0 =
+        resolver.getNamenodesForNameserviceId(ns0, true);
+    List<? extends FederationNamenodeContext> namespaceInfo1 =
+        resolver.getNamenodesForNameserviceId(ns1, true);
+    assertEquals(FederationNamenodeServiceState.OBSERVER,
+        namespaceInfo0.get(0).getState());
+    assertEquals(FederationNamenodeServiceState.OBSERVER,
+        namespaceInfo0.get(1).getState());
+    assertNotEquals(namespaceInfo0.get(0).getNamenodeId(),
+        namespaceInfo0.get(1).getNamenodeId());
+    assertEquals(FederationNamenodeServiceState.OBSERVER,
+        namespaceInfo1.get(0).getState());
+  }
+
+  @Test
+  public void testUnavailableObserverNN() throws Exception {
+    startUpCluster(2);
+    RouterContext routerContext = cluster.getRandomRouter();
+    FileSystem fileSystem = routerContext.getFileSystem();
+
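+    // Stop both observers so reads must fall back to the active namenode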
+    stopObserver(2);
+
+    Path path = new Path("/testFile");
+    // Send Create call to active
+    fileSystem.create(path).close();
+
+    // Send read request.
+    fileSystem.open(path).close();
+
+    long rpcCountForActive = routerContext.getRouter().getRpcServer()
+        .getRPCMetrics().getActiveProxyOps();
+
+    // Create, complete and getBlockLocations
+    // calls should be sent to active.
+    assertEquals("Three calls should be send to active",
+        3, rpcCountForActive);
+
+
+    boolean hasUnavailable = false;
+    for (String ns : cluster.getNameservices()) {
+      List<? extends FederationNamenodeContext> nns = routerContext.getRouter()
+          .getNamenodeResolver().getNamenodesForNameserviceId(ns, false);
+      for (FederationNamenodeContext nn : nns) {
+        if (FederationNamenodeServiceState.UNAVAILABLE == nn.getState()) {
+          hasUnavailable = true;
+        }
+      }
+    }
+    // After attempting to communicate with unavailable observer namenode,
+    // its state is updated to unavailable.
+    assertTrue("There must be unavailable namenodes", hasUnavailable);
+  }
+
+  @Test
+  public void testRouterMsync() throws Exception {
+    startUpCluster(1);
+    RouterContext routerContext = cluster.getRandomRouter();
+
+    FileSystem fileSystem = routerContext.getFileSystem();
+    Path path = new Path("/testFile");
+
+    // Send Create call to active
+    fileSystem.create(path).close();
+    long rpcCountForActive = routerContext.getRouter().getRpcServer()
+        .getRPCMetrics().getActiveProxyOps();
+    // Create and complete calls should be sent to active
+    assertEquals("Two calls should be sent to active", 2,
+        rpcCountForActive);
+
+    // Send msync
+    fileSystem.msync();
+    rpcCountForActive = routerContext.getRouter().getRpcServer()
+        .getRPCMetrics().getActiveProxyOps();
+    // 2 msync calls should be sent. One to each active namenode in the two namespaces.
+    assertEquals("Four calls should be sent to active", 4,
+        rpcCountForActive);
+    fileSystem.close();
+  }
+}

+ 3 - 3
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeHeartbeat.java

@@ -167,7 +167,7 @@ public class TestRouterNamenodeHeartbeat {
     // Verify the locator has matching NN entries for each NS
     for (String ns : cluster.getNameservices()) {
       List<? extends FederationNamenodeContext> nns =
-          namenodeResolver.getNamenodesForNameserviceId(ns);
+          namenodeResolver.getNamenodesForNameserviceId(ns, false);
 
       // Active
       FederationNamenodeContext active = nns.get(0);
@@ -191,7 +191,7 @@ public class TestRouterNamenodeHeartbeat {
 
     // Verify the locator has recorded the failover for the failover NS
     List<? extends FederationNamenodeContext> failoverNSs =
-        namenodeResolver.getNamenodesForNameserviceId(failoverNS);
+        namenodeResolver.getNamenodesForNameserviceId(failoverNS, false);
     // Active
     FederationNamenodeContext active = failoverNSs.get(0);
     assertEquals(NAMENODES[1], active.getNamenodeId());
@@ -202,7 +202,7 @@ public class TestRouterNamenodeHeartbeat {
 
     // Verify the locator has the same records for the other ns
     List<? extends FederationNamenodeContext> normalNss =
-        namenodeResolver.getNamenodesForNameserviceId(normalNs);
+        namenodeResolver.getNamenodesForNameserviceId(normalNs, false);
     // Active
     active = normalNss.get(0);
     assertEquals(NAMENODES[0], active.getNamenodeId());

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeMonitoring.java

@@ -204,7 +204,7 @@ public class TestRouterNamenodeMonitoring {
     final List<FederationNamenodeContext> namespaceInfo = new ArrayList<>();
     for (String nsId : nns.keySet()) {
       List<? extends FederationNamenodeContext> nnReports =
-          resolver.getNamenodesForNameserviceId(nsId);
+          resolver.getNamenodesForNameserviceId(nsId, false);
       namespaceInfo.addAll(nnReports);
     }
     for (FederationNamenodeContext nnInfo : namespaceInfo) {

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeWebScheme.java

@@ -194,7 +194,7 @@ public class TestRouterNamenodeWebScheme {
     final List<FederationNamenodeContext> namespaceInfo = new ArrayList<>();
     for (String nsId : nns.keySet()) {
       List<? extends FederationNamenodeContext> nnReports =
-          resolver.getNamenodesForNameserviceId(nsId);
+          resolver.getNamenodesForNameserviceId(nsId, false);
       namespaceInfo.addAll(nnReports);
     }
     for (FederationNamenodeContext nnInfo : namespaceInfo) {

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java

@@ -166,7 +166,7 @@ public class TestRouterRPCClientRetries {
   private void registerInvalidNameReport() throws IOException {
     String ns0 = cluster.getNameservices().get(0);
     List<? extends FederationNamenodeContext> origin = resolver
-        .getNamenodesForNameserviceId(ns0);
+        .getNamenodesForNameserviceId(ns0, false);
     FederationNamenodeContext nnInfo = origin.get(0);
     NamenodeStatusReport report = new NamenodeStatusReport(ns0,
         nnInfo.getNamenodeId(), nnInfo.getRpcAddress(),

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java

@@ -2309,6 +2309,8 @@ public class MiniDFSCluster implements AutoCloseable {
     nn.getHttpServer()
         .setAttribute(ImageServlet.RECENT_IMAGE_CHECK_ENABLED, false);
     info.nameNode = nn;
+    info.nameserviceId = info.conf.get(DFS_NAMESERVICE_ID);
+    info.nnId = info.conf.get(DFS_HA_NAMENODE_ID_KEY);
     info.setStartOpt(startOpt);
     if (waitActive) {
       if (numDataNodes > 0) {