瀏覽代碼

HDFS-17762. [ARR] Reset CallerContext information when async handler thread done. (#7539). Contributed by hfutatzhanghb.

Reviewed-by: Jian Zhang <keepromise@apache.org>
hfutatzhanghb 1 月之前
父節點
當前提交
6e43e6047b

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ThreadLocalContext.java

@@ -78,6 +78,7 @@ public class ThreadLocalContext {
     if (call != null) {
       Server.getCurCall().set(call);
     }
+    CallerContext.setCurrent(null);
     if (context != null) {
       CallerContext.setCurrent(context);
     }

+ 41 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java

@@ -2385,4 +2385,45 @@ public class TestRouterRpc {
       fileSystem1.delete(new Path(testPath2), true);
     }
   }
+
+  @Test
+  public void testCallerContextNotResetByAsyncHandler() throws IOException {
+    GenericTestUtils.LogCapturer auditLog =
+        GenericTestUtils.LogCapturer.captureLogs(FSNamesystem.AUDIT_LOG);
+    String dirPath = "/test";
+
+    // The reason we start this child thread is that CallContext use InheritableThreadLocal.
+    Thread t1 = new Thread(() -> {
+      // Set flag async:true.
+      CallerContext.setCurrent(
+          new CallerContext.Builder("async:true").build());
+      // Issue some RPCs via the router to populate the CallerContext of async handler thread.
+      for (int i = 0; i < 10; i++) {
+        try {
+          routerProtocol.mkdirs(dirPath, new FsPermission("755"), false);
+          assertTrue(verifyFileExists(routerFS, dirPath));
+          routerProtocol.delete(dirPath, true);
+          assertFalse(verifyFileExists(routerFS, dirPath));
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      // The audit log should contains async:true.
+      assertTrue(auditLog.getOutput().contains("async:true"));
+      auditLog.clearOutput();
+      assertFalse(auditLog.getOutput().contains("async:true"));
+    });
+
+    t1.start();
+    try {
+      t1.join();
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+
+    routerProtocol.getFileInfo(dirPath);
+    // The audit log should not contain async:true.
+    assertFalse(auditLog.getOutput().contains("async:true"));
+  }
 }

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpc.java

@@ -32,6 +32,7 @@ import org.junit.jupiter.api.Test;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_ENABLE_KEY;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY;
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS;
 import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
@@ -59,6 +60,7 @@ public class TestRouterAsyncRpc extends TestRouterRpc {
     routerConf.setClass(DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS,
         RouterAsyncRpcFairnessPolicyController.class,
         RouterRpcFairnessPolicyController.class);
+    routerConf.setInt(DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY, 2);
     setUp(routerConf);
   }
 
@@ -80,4 +82,5 @@ public class TestRouterAsyncRpc extends TestRouterRpc {
     String[] result = syncReturn(String[].class);
     assertArrayEquals(group, result);
   }
+
 }