Browse Source

HDFS-17514: RBF: Routers should unset cached stateID when namenode does not set stateID in RPC response header. (#6804)

Simbarashe Dzinamarira 11 months ago
parent
commit
6a4f0be854

+ 6 - 1
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PoolAlignmentContext.java

@@ -64,7 +64,12 @@ public class PoolAlignmentContext implements AlignmentContext {
    */
    */
   @Override
   @Override
   public void receiveResponseState(RpcHeaderProtos.RpcResponseHeaderProto header) {
   public void receiveResponseState(RpcHeaderProtos.RpcResponseHeaderProto header) {
-    sharedGlobalStateId.accumulate(header.getStateId());
+    if (header.getStateId() == 0 && sharedGlobalStateId.get() > 0) {
+      sharedGlobalStateId.reset();
+      poolLocalStateId.reset();
+    } else {
+      sharedGlobalStateId.accumulate(header.getStateId());
+    }
   }
   }
 
 
   /**
   /**

+ 59 - 4
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java

@@ -107,10 +107,7 @@ public class TestObserverWithRouter {
   public void startUpCluster(int numberOfObserver, Configuration confOverrides) throws Exception {
   public void startUpCluster(int numberOfObserver, Configuration confOverrides) throws Exception {
     int numberOfNamenode = 2 + numberOfObserver;
     int numberOfNamenode = 2 + numberOfObserver;
     Configuration conf = new Configuration(false);
     Configuration conf = new Configuration(false);
-    conf.setBoolean(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_KEY, true);
-    conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
-    conf.set(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, "0ms");
-    conf.setBoolean(DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY, true);
+    setConfDefaults(conf);
     if (confOverrides != null) {
     if (confOverrides != null) {
       confOverrides
       confOverrides
           .iterator()
           .iterator()
@@ -153,6 +150,13 @@ public class TestObserverWithRouter {
     routerContext  = cluster.getRandomRouter();
     routerContext  = cluster.getRandomRouter();
   }
   }
 
 
+  private void setConfDefaults(Configuration conf) {
+    conf.setBoolean(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_KEY, true);
+    conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
+    conf.set(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, "0ms");
+    conf.setBoolean(DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY, true);
+  }
+
   public enum ConfigSetting {
   public enum ConfigSetting {
     USE_NAMENODE_PROXY_FLAG,
     USE_NAMENODE_PROXY_FLAG,
     USE_ROUTER_OBSERVER_READ_PROXY_PROVIDER,
     USE_ROUTER_OBSERVER_READ_PROXY_PROVIDER,
@@ -972,4 +976,55 @@ public class TestObserverWithRouter {
     // There should no calls to any namespace.
     // There should no calls to any namespace.
     assertEquals("No calls to any namespace", 0, rpcCountForActive);
     assertEquals("No calls to any namespace", 0, rpcCountForActive);
   }
   }
+
+  @EnumSource(ConfigSetting.class)
+  @ParameterizedTest
+  public void testRestartingNamenodeWithStateIDContextDisabled(ConfigSetting configSetting)
+      throws Exception {
+    fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting));
+    Path path = new Path("/testFile1");
+    // Send Create call to active
+    fileSystem.create(path).close();
+
+    // Send read request
+    fileSystem.open(path).close();
+
+    long observerCount1 = routerContext.getRouter().getRpcServer()
+        .getRPCMetrics().getObserverProxyOps();
+
+    // Restart active namenodes and disable sending state id.
+    restartActiveWithStateIDContextDisabled();
+
+    Configuration conf = getConfToEnableObserverReads(configSetting);
+    conf.setBoolean("fs.hdfs.impl.disable.cache", true);
+    FileSystem fileSystem2 = routerContext.getFileSystem(conf);
+    fileSystem2.msync();
+    fileSystem2.open(path).close();
+
+    long observerCount2 = routerContext.getRouter().getRpcServer()
+        .getRPCMetrics().getObserverProxyOps();
+    assertEquals("There should no extra calls to the observer", observerCount1, observerCount2);
+
+    fileSystem.open(path).close();
+    long observerCount3 = routerContext.getRouter().getRpcServer()
+        .getRPCMetrics().getObserverProxyOps();
+    assertTrue("Old filesystem will send calls to observer", observerCount3 > observerCount2);
+  }
+
+  void restartActiveWithStateIDContextDisabled() throws Exception {
+    for (int nnIndex = 0; nnIndex < cluster.getNamenodes().size(); nnIndex++) {
+      NameNode nameNode = cluster.getCluster().getNameNode(nnIndex);
+      if (nameNode != null && nameNode.isActiveState()) {
+        Configuration conf = new Configuration();
+        setConfDefaults(conf);
+        cluster.getCluster().getConfiguration(nnIndex)
+            .setBoolean(DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY, false);
+        cluster.getCluster().restartNameNode(nnIndex, true);
+        cluster.getCluster().getNameNode(nnIndex).isActiveState();
+      }
+    }
+    for (String ns : cluster.getNameservices()) {
+      cluster.switchToActive(ns, NAMENODES[0]);
+    }
+  }
 }
 }

+ 32 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestPoolAlignmentContext.java

@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.federation.router;
 package org.apache.hadoop.hdfs.server.federation.router;
 
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Test;
@@ -50,4 +51,35 @@ public class TestPoolAlignmentContext {
     poolAlignmentContext.updateRequestState(builder);
     poolAlignmentContext.updateRequestState(builder);
     Assertions.assertEquals(expectedValue, builder.getStateId());
     Assertions.assertEquals(expectedValue, builder.getStateId());
   }
   }
+
+  @Test
+  public void testWhenNamenodeStopsSendingStateId() {
+    RouterStateIdContext routerStateIdContext = new RouterStateIdContext(new Configuration());
+    String namespaceId = "namespace1";
+    PoolAlignmentContext poolContext = new PoolAlignmentContext(routerStateIdContext, namespaceId);
+
+    poolContext.receiveResponseState(getRpcResponseHeader(10L));
+    // Last seen value is the one from namenode,
+    // but request header is the max seen by clients so far.
+    Assertions.assertEquals(10L, poolContext.getLastSeenStateId());
+    assertRequestHeaderStateId(poolContext, Long.MIN_VALUE);
+
+    poolContext.advanceClientStateId(10L);
+    assertRequestHeaderStateId(poolContext, 10L);
+
+    // When namenode state context is disabled, it returns a stateId of zero
+    poolContext.receiveResponseState(getRpcResponseHeader(0));
+    // Routers should reset the cached state Id to not send a stale value to the observer.
+    Assertions.assertEquals(Long.MIN_VALUE, poolContext.getLastSeenStateId());
+    assertRequestHeaderStateId(poolContext, Long.MIN_VALUE);
+  }
+
+  private RpcResponseHeaderProto getRpcResponseHeader(long stateID) {
+    return RpcResponseHeaderProto
+        .newBuilder()
+        .setCallId(1)
+        .setStatus(RpcResponseHeaderProto.RpcStatusProto.SUCCESS)
+        .setStateId(stateID)
+        .build();
+  }
 }
 }