
HDFS-14035. NN status discovery does not leverage delegation token. Contributed by Chen Liang.

Author: Chen Liang
Commit: 17fac44040
14 changed files with 158 additions and 58 deletions
  1. + 13 - 0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
  2. + 12 - 1
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
  3. + 27 - 0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
  4. + 4 - 29
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java
  5. + 1 - 1
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java
  6. + 8 - 1
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java
  7. + 10 - 0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto
  8. + 2 - 1
      hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/protocol/TestReadOnly.java
  9. + 8 - 0
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
  10. + 8 - 0
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
  11. + 36 - 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
  12. + 6 - 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
  13. + 3 - 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java
  14. + 20 - 25
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java

+ 13 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java

@@ -96,6 +96,7 @@ import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
@@ -3040,4 +3041,16 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
   public void msync() throws IOException {
     namenode.msync();
   }
+
+  /**
+   * A non-blocking call to get the HA service state of the NameNode.
+   *
+   * @return HA state of NameNode
+   * @throws IOException if the request to the NameNode fails
+   */
+  @VisibleForTesting
+  public HAServiceProtocol.HAServiceState getHAServiceState()
+      throws IOException {
+    return namenode.getHAServiceState();
+  }
 }
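
For illustration, a minimal sketch of how a caller could exercise the new DFSClient API. Note the method is marked @VisibleForTesting, so this is test-style usage; the class name and the hdfs://mycluster URI below are hypothetical, not part of this patch:

    import java.io.IOException;
    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    public class HAStateProbe {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Hypothetical HA nameservice URI; replace with a real cluster address.
        try (DistributedFileSystem dfs = (DistributedFileSystem)
            FileSystem.get(URI.create("hdfs://mycluster"), conf)) {
          HAServiceState state = dfs.getClient().getHAServiceState();
          System.out.println("Current NameNode HA state: " + state);
        }
      }
    }

Because the call travels over the same ClientProtocol connection as ordinary client RPCs, it authenticates with whatever credentials the client already holds, including delegation tokens, which is the point of this change.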

+ 12 - 1
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java

@@ -26,6 +26,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.crypto.CryptoProtocolVersion;
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
 import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
+import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.hdfs.AddBlockFlag;
 import org.apache.hadoop.fs.CacheFlag;
 import org.apache.hadoop.fs.ContentSummary;
@@ -1514,7 +1515,7 @@ public interface ClientProtocol {
    * @throws IOException see specific implementation
    */
   @Idempotent
-  @ReadOnly // TODO : after HDFS-13749 is done, change to coordinated call
+  @ReadOnly(isCoordinated = true)
   void checkAccess(String path, FsAction mode) throws IOException;
 
   /**
@@ -1566,6 +1567,16 @@ public interface ClientProtocol {
   @ReadOnly(isCoordinated = true)
   BatchedEntries<OpenFileEntry> listOpenFiles(long prevId) throws IOException;
 
+  /**
+   * Get HA service state of the server.
+   *
+   * @return server HA state
+   * @throws IOException see specific implementation
+   */
+  @Idempotent
+  @ReadOnly
+  HAServiceProtocol.HAServiceState getHAServiceState() throws IOException;
+
   /**
    * Called by client to wait until the server has reached the state id of the
    * client. The client and server state id are given by client side and server
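
getHAServiceState is annotated @ReadOnly without isCoordinated = true, so an Observer may serve it without first aligning on the client's state id. A small sketch of inspecting the annotation at runtime, assuming its runtime retention (which the observer-read machinery depends on):

    import java.lang.reflect.Method;
    import org.apache.hadoop.hdfs.protocol.ClientProtocol;
    import org.apache.hadoop.hdfs.protocol.ReadOnly;

    public class ReadOnlyInspection {
      public static void main(String[] args) throws NoSuchMethodException {
        // Hypothetical standalone check; TestReadOnly covers this more generally.
        Method m = ClientProtocol.class.getMethod("getHAServiceState");
        ReadOnly ann = m.getAnnotation(ReadOnly.class);
        // Expected here: annotated read-only, but not coordinated.
        System.out.println("readOnly=" + (ann != null)
            + ", coordinated=" + (ann != null && ann.isCoordinated()));
      }
    }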

+ 27 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java

@@ -45,6 +45,8 @@ import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAServiceStateProto;
 import org.apache.hadoop.hdfs.AddBlockFlag;
 import org.apache.hadoop.hdfs.inotify.EventBatchList;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
@@ -126,6 +128,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSna
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePoliciesRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePoliciesResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePolicyRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.HAServiceStateRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesResponseProto;
@@ -1616,4 +1619,28 @@ public class ClientNamenodeProtocolTranslatorPB implements
       throw ProtobufHelper.getRemoteException(e);
     }
   }
+
+  @Override
+  public HAServiceProtocol.HAServiceState getHAServiceState()
+      throws IOException {
+    HAServiceStateRequestProto req =
+        HAServiceStateRequestProto.newBuilder().build();
+    try {
+      HAServiceStateProto res =
+          rpcProxy.getHAServiceState(null, req).getState();
+      switch(res) {
+      case ACTIVE:
+        return HAServiceProtocol.HAServiceState.ACTIVE;
+      case STANDBY:
+        return HAServiceProtocol.HAServiceState.STANDBY;
+      case OBSERVER:
+        return HAServiceProtocol.HAServiceState.OBSERVER;
+      case INITIALIZING:
+      default:
+        return HAServiceProtocol.HAServiceState.INITIALIZING;
+      }
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
 }
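
This client-side switch and the server-side one later in this commit are inverse mappings, with INITIALIZING as the conservative default for unrecognized values. Since the constant names line up across the two enums, a hypothetical round-trip sanity check could look like:

    import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
    import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAServiceStateProto;

    public class StateRoundTripSketch {
      public static void main(String[] args) {
        // Hypothetical sanity check, not part of the patch.
        for (HAServiceState s : HAServiceState.values()) {
          // The constant names match across the two enums, so valueOf round-trips.
          HAServiceStateProto p = HAServiceStateProto.valueOf(s.name());
          if (HAServiceState.valueOf(p.name()) != s) {
            throw new AssertionError("Mapping broken for " + s);
          }
        }
        System.out.println("All HA service states round-trip cleanly.");
      }
    }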

+ 4 - 29
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java

@@ -28,14 +28,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.HAUtilClient;
-import org.apache.hadoop.hdfs.NameNodeProxiesClient;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.io.retry.FailoverProxyProvider;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -122,44 +119,22 @@ public abstract class AbstractNNFailoverProxyProvider<T> implements
      */
     private HAServiceState cachedState;
 
-    /** Proxy for getting HA service status from the given NameNode. */
-    private HAServiceProtocol serviceProxy;
-
-    public NNProxyInfo(InetSocketAddress address, Configuration conf) {
+    public NNProxyInfo(InetSocketAddress address) {
       super(null, address.toString());
       this.address = address;
-      try {
-        serviceProxy = NameNodeProxiesClient
-            .createNonHAProxyWithHAServiceProtocol(address, conf);
-      } catch (IOException ioe) {
-        LOG.error("Failed to create HAServiceProtocol proxy to NameNode" +
-            " at {}", address, ioe);
-        throw new RuntimeException(ioe);
-      }
     }
 
     public InetSocketAddress getAddress() {
       return address;
     }
 
-    public void refreshCachedState() {
-      try {
-        cachedState = serviceProxy.getServiceStatus().getState();
-      } catch (IOException e) {
-        LOG.warn("Failed to connect to {}. Setting cached state to Standby",
-            address, e);
-        cachedState = HAServiceState.STANDBY;
-      }
+    public void setCachedState(HAServiceState state) {
+      cachedState = state;
     }
 
     public HAServiceState getCachedState() {
       return cachedState;
     }
-
-    @VisibleForTesting
-    public void setServiceProxyForTesting(HAServiceProtocol proxy) {
-      this.serviceProxy = proxy;
-    }
   }
 
   @Override
@@ -202,7 +177,7 @@ public abstract class AbstractNNFailoverProxyProvider<T> implements
 
     Collection<InetSocketAddress> addressesOfNns = addressesInNN.values();
     for (InetSocketAddress address : addressesOfNns) {
-      proxies.add(new NNProxyInfo<T>(address, conf));
+      proxies.add(new NNProxyInfo<T>(address));
     }
     // Randomize the list to prevent all clients pointing to the same one
     boolean randomized = getRandomOrder(conf, uri);
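
The design shift here: NNProxyInfo no longer opens its own HAServiceProtocol connection to each NameNode (a path that could not carry the client's delegation token); instead, callers probe the state over the existing ClientProtocol proxy and push it in via setCachedState. A minimal sketch of that caller-side pattern, with a hypothetical helper name (it mirrors the ObserverReadProxyProvider change below):

    import java.io.IOException;
    import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
    import org.apache.hadoop.hdfs.protocol.ClientProtocol;
    import org.apache.hadoop.hdfs.server.namenode.ha.AbstractNNFailoverProxyProvider.NNProxyInfo;

    public class StateRefreshSketch {
      // Hypothetical helper: probe the state over the client's own
      // authenticated ClientProtocol connection, so delegation tokens
      // (not just Kerberos) work.
      static void refreshCachedState(NNProxyInfo<ClientProtocol> info) {
        try {
          info.setCachedState(info.proxy.getHAServiceState());
        } catch (IOException e) {
          // An unreachable NameNode is conservatively treated as Standby.
          info.setCachedState(HAServiceState.STANDBY);
        }
      }
    }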

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java

@@ -48,7 +48,7 @@ public class IPFailoverProxyProvider<T> extends
   public IPFailoverProxyProvider(Configuration conf, URI uri,
       Class<T> xface, HAProxyFactory<T> factory) {
     super(conf, uri, xface, factory);
-    this.nnProxyInfo = new NNProxyInfo<>(DFSUtilClient.getNNAddress(uri), conf);
+    this.nnProxyInfo = new NNProxyInfo<>(DFSUtilClient.getNNAddress(uri));
   }
 
   @Override

+ 8 - 1
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java

@@ -211,7 +211,14 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
     currentProxy = null;
     currentIndex = (currentIndex + 1) % nameNodeProxies.size();
     currentProxy = createProxyIfNeeded(nameNodeProxies.get(currentIndex));
-    currentProxy.refreshCachedState();
+    try {
+      HAServiceState state = currentProxy.proxy.getHAServiceState();
+      currentProxy.setCachedState(state);
+    } catch (IOException e) {
+      LOG.info("Failed to connect to {}. Setting cached state to Standby",
+          currentProxy.getAddress(), e);
+      currentProxy.setCachedState(HAServiceState.STANDBY);
+    }
     LOG.debug("Changed current proxy from {} to {}",
         initial == null ? "none" : initial.proxyInfo,
         currentProxy.proxyInfo);

+ 10 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto

@@ -34,6 +34,7 @@ import "acl.proto";
 import "xattr.proto";
 import "encryption.proto";
 import "inotify.proto";
+import "HAServiceProtocol.proto";
 
 /**
  * The ClientNamenodeProtocol Service defines the interface between a client 
@@ -768,6 +769,13 @@ message MsyncRequestProto {
 message MsyncResponseProto {
 }
 
+message HAServiceStateRequestProto {
+}
+
+message HAServiceStateResponseProto {
+  required hadoop.common.HAServiceStateProto state = 1;
+}
+
 service ClientNamenodeProtocol {
   rpc getBlockLocations(GetBlockLocationsRequestProto)
       returns(GetBlockLocationsResponseProto);
@@ -924,4 +932,6 @@ service ClientNamenodeProtocol {
       returns(ListOpenFilesResponseProto);
   rpc msync(MsyncRequestProto)
       returns(MsyncResponseProto);
+  rpc getHAServiceState(HAServiceStateRequestProto)
+      returns(HAServiceStateResponseProto);
 }
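
Once the protos are regenerated, the new messages are plain builders. A hypothetical standalone sketch of constructing them outside the RPC engine:

    import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAServiceStateProto;
    import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.HAServiceStateRequestProto;
    import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.HAServiceStateResponseProto;

    public class ProtoMessageSketch {
      public static void main(String[] args) {
        // The request carries no fields; the response carries one required enum.
        HAServiceStateRequestProto req =
            HAServiceStateRequestProto.newBuilder().build();
        HAServiceStateResponseProto res = HAServiceStateResponseProto.newBuilder()
            .setState(HAServiceStateProto.OBSERVER)
            .build();
        System.out.println("request=" + req.getSerializedSize()
            + " bytes, state=" + res.getState());
      }
    }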

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/protocol/TestReadOnly.java

@@ -72,7 +72,8 @@ public class TestReadOnly {
           "getCurrentEditLogTxid",
           "getEditsFromTxid",
           "getQuotaUsage",
-          "msync"
+          "msync",
+          "getHAServiceState"
       )
   );
 

+ 8 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java

@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.hdfs.AddBlockFlag;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
@@ -1403,6 +1404,13 @@ public class RouterClientProtocol implements ClientProtocol {
         "msync is not supported for router");
   }
 
+  @Override
+  public HAServiceProtocol.HAServiceState getHAServiceState()
+      throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.READ, false);
+    return null;
+  }
+
   /**
    * Determines combinations of eligible src/dst locations for a rename. A
    * rename cannot change the namespace. Renames are only allowed if there is an

+ 8 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java

@@ -58,6 +58,7 @@ import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.hdfs.AddBlockFlag;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.inotify.EventBatchList;
@@ -1081,6 +1082,13 @@ public class RouterRpcServer extends AbstractService
     return clientProto.listOpenFiles(prevId);
   }
 
+  @Override
+  public HAServiceProtocol.HAServiceState getHAServiceState()
+      throws IOException {
+    throw new UnsupportedOperationException(
+        "Router does not support getHAServiceState");
+  }
+
   @Override
   public void msync() throws IOException {
     // TODO revisit if router should support msync

+ 36 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java

@@ -25,6 +25,9 @@ import java.util.List;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.ha.proto.HAServiceProtocolProtos;
+import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAServiceStateProto;
 import org.apache.hadoop.hdfs.AddBlockFlag;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
@@ -136,6 +139,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSto
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePoliciesResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePolicyRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePolicyResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.HAServiceStateRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.HAServiceStateResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesRequestProto;
@@ -1598,4 +1603,35 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
       throw new ServiceException(e);
     }
   }
+
+  @Override
+  public HAServiceStateResponseProto getHAServiceState(
+      RpcController controller,
+      HAServiceStateRequestProto request) throws ServiceException {
+    try {
+      HAServiceProtocol.HAServiceState state = server.getHAServiceState();
+      HAServiceStateProto retState;
+      switch (state) {
+      case ACTIVE:
+        retState = HAServiceProtocolProtos.HAServiceStateProto.ACTIVE;
+        break;
+      case STANDBY:
+        retState = HAServiceProtocolProtos.HAServiceStateProto.STANDBY;
+        break;
+      case OBSERVER:
+        retState = HAServiceProtocolProtos.HAServiceStateProto.OBSERVER;
+        break;
+      case INITIALIZING:
+      default:
+        retState = HAServiceProtocolProtos.HAServiceStateProto.INITIALIZING;
+        break;
+      }
+      HAServiceStateResponseProto.Builder builder =
+          HAServiceStateResponseProto.newBuilder();
+      builder.setState(retState);
+      return builder.build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
 }

+ 6 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java

@@ -1304,6 +1304,12 @@ public class NameNodeRpcServer implements NamenodeProtocols {
     // TODO : need to be filled up if needed. May be a no-op here.
   }
 
+  @Override // ClientProtocol
+  public HAServiceState getHAServiceState() throws IOException {
+    checkNNStartup();
+    return nn.getServiceStatus().getState();
+  }
+
   @Override // ClientProtocol
   public CorruptFileBlocks listCorruptFileBlocks(String path, String cookie)
       throws IOException {

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java

@@ -89,6 +89,9 @@ public class TestConsistentReadsObserver {
     // 0 == not completed, 1 == succeeded, -1 == failed
     AtomicInteger readStatus = new AtomicInteger(0);
 
+    // Make an uncoordinated call, which initializes the proxy
+    // to the Observer node.
+    dfs.getClient().getHAServiceState();
     dfs.mkdir(testPath, FsPermission.getDefault());
     assertSentTo(0);
 

+ 20 - 25
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java

@@ -27,8 +27,6 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.ha.HAServiceProtocol;
-import org.apache.hadoop.ha.HAServiceStatus;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -47,7 +45,6 @@ import static org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 /**
  * Tests for {@link ObserverReadProxyProvider} under various configurations of
@@ -314,8 +311,8 @@ public class TestObserverReadProxyProvider {
   }
 
   /**
-   * An {@link Answer} used for mocking of {@link ClientProtocol} and
-   * {@link HAServiceProtocol}. Setting the state or unreachability of this
+   * An {@link Answer} used for mocking of {@link ClientProtocol}.
+   * Setting the state or unreachability of this
    * Answer will make the linked ClientProtocol respond as if it was
    * communicating with a NameNode of the corresponding state. It is in Standby
    * state by default.
@@ -330,31 +327,29 @@ public class TestObserverReadProxyProvider {
     private volatile boolean allowReads = false;
 
     private ClientProtocolAnswer clientAnswer = new ClientProtocolAnswer();
-    private HAServiceProtocolAnswer serviceAnswer =
-        new HAServiceProtocolAnswer();
 
-    private class HAServiceProtocolAnswer implements Answer<HAServiceStatus> {
+    private class ClientProtocolAnswer implements Answer<Object> {
       @Override
-      public HAServiceStatus answer(InvocationOnMock invocation)
-          throws Throwable {
-        HAServiceStatus status = mock(HAServiceStatus.class);
-        if (allowReads && allowWrites) {
-          when(status.getState()).thenReturn(HAServiceState.ACTIVE);
-        } else if (allowReads) {
-          when(status.getState()).thenReturn(HAServiceState.OBSERVER);
-        } else {
-          when(status.getState()).thenReturn(HAServiceState.STANDBY);
-        }
-        return status;
-      }
-    }
-
-    private class ClientProtocolAnswer implements Answer<Void> {
-      @Override
-      public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
+      public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
         if (unreachable) {
           throw new IOException("Unavailable");
         }
+        // retryActive should be checked before getHAServiceState.
+        // getHAServiceState is checked first here only because the test
+        // relies on the read call, which in turn relies on
+        // getHAServiceState having already passed. May revisit in the future.
+        if (invocationOnMock.getMethod()
+            .getName().equals("getHAServiceState")) {
+          HAServiceState status;
+          if (allowReads && allowWrites) {
+            status = HAServiceState.ACTIVE;
+          } else if (allowReads) {
+            status = HAServiceState.OBSERVER;
+          } else {
+            status = HAServiceState.STANDBY;
+          }
+          return status;
+        }
         if (retryActive) {
           throw new RemoteException(
               ObserverRetryOnActiveException.class.getCanonicalName(),