Browse Source

HDFS-13293. RBF: The RouterRPCServer should transfer client IP via CallerContext to NamenodeRpcServer (#2363)

Hui Fei 4 years ago
parent
commit
518a212cff

+ 9 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallerContext.java

@@ -129,7 +129,7 @@ public final class CallerContext {
     private byte[] signature;
 
     public Builder(String context) {
-      this(context, new Configuration());
+      this(context, HADOOP_CALLER_CONTEXT_SEPARATOR_DEFAULT);
     }
 
     public Builder(String context, Configuration conf) {
@@ -141,6 +141,14 @@ public final class CallerContext {
       checkFieldSeparator(fieldSeparator);
     }
 
+    public Builder(String context, String separator) {
+      if (isValid(context)) {
+        sb.append(context);
+      }
+      fieldSeparator = separator;
+      checkFieldSeparator(fieldSeparator);
+    }
+
     /**
      * Check whether the separator is legal.
      * The illegal separators include '\t', '\n', '='.

+ 25 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java

@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hdfs.server.federation.router;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_SEPARATOR_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_SEPARATOR_KEY;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_TIMEOUT_KEY;
 
@@ -66,8 +68,10 @@ import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.io.retry.RetryPolicy.RetryAction.RetryDecision;
+import org.apache.hadoop.ipc.CallerContext;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.RetriableException;
+import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.net.ConnectTimeoutException;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -115,11 +119,14 @@ public class RouterRpcClient {
   private final RetryPolicy retryPolicy;
   /** Optional perf monitor. */
   private final RouterRpcMonitor rpcMonitor;
+  /** Field separator of CallerContext. */
+  private final String contextFieldSeparator;
 
   /** Pattern to parse a stack trace line. */
   private static final Pattern STACK_TRACE_PATTERN =
       Pattern.compile("\\tat (.*)\\.(.*)\\((.*):(\\d*)\\)");
 
+  private static final String CLIENT_IP_STR = "clientIp";
 
   /**
    * Create a router RPC client to manage remote procedure calls to NNs.
@@ -136,6 +143,9 @@ public class RouterRpcClient {
     this.namenodeResolver = resolver;
 
     Configuration clientConf = getClientConfiguration(conf);
+    this.contextFieldSeparator =
+        clientConf.get(HADOOP_CALLER_CONTEXT_SEPARATOR_KEY,
+            HADOOP_CALLER_CONTEXT_SEPARATOR_DEFAULT);
     this.connectionManager = new ConnectionManager(clientConf);
     this.connectionManager.start();
 
@@ -404,6 +414,7 @@ public class RouterRpcClient {
           " with params " + Arrays.deepToString(params) + " from "
           + router.getRouterId());
     }
+    appendClientIpToCallerContext();
 
     Object ret = null;
     if (rpcMonitor != null) {
@@ -519,6 +530,20 @@ public class RouterRpcClient {
     }
   }
 
+  /**
+   * For Tracking which is the actual client address.
+   * It adds key/value (clientIp/"ip") pair to the caller context.
+   */
+  private void appendClientIpToCallerContext() {
+    final CallerContext ctx = CallerContext.getCurrent();
+    String origContext = ctx == null ? null : ctx.getContext();
+    byte[] origSignature = ctx == null ? null : ctx.getSignature();
+    CallerContext.setCurrent(
+        new CallerContext.Builder(origContext, contextFieldSeparator)
+            .append(CLIENT_IP_STR, Server.getRemoteAddress())
+            .setSignature(origSignature).build());
+  }
+
   /**
    * Invokes a method on the designated object. Catches exceptions specific to
    * the invocation.

+ 26 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java

@@ -121,6 +121,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.io.erasurecode.ErasureCodeConstants;
+import org.apache.hadoop.ipc.CallerContext;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.Service.STATE;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -198,6 +199,8 @@ public class TestRouterRpc {
   @BeforeClass
   public static void globalSetUp() throws Exception {
     Configuration namenodeConf = new Configuration();
+    namenodeConf.setBoolean(DFSConfigKeys.HADOOP_CALLER_CONTEXT_ENABLED_KEY,
+        true);
     // It's very easy to become overloaded for some specific dn in this small
     // cluster, which will cause the EC file block allocation failure. To avoid
     // this issue, we disable considerLoad option.
@@ -1901,4 +1904,27 @@ public class TestRouterRpc {
     }
     return null;
   }
+
+  @Test
+  public void testMkdirsWithCallerContext() throws IOException {
+    GenericTestUtils.LogCapturer auditlog =
+        GenericTestUtils.LogCapturer.captureLogs(FSNamesystem.auditLog);
+
+    // Current callerContext is null
+    assertNull(CallerContext.getCurrent());
+
+    // Set client context
+    CallerContext.setCurrent(
+        new CallerContext.Builder("clientContext").build());
+
+    // Create a directory via the router
+    String dirPath = "/test_dir_with_callercontext";
+    FsPermission permission = new FsPermission("755");
+    routerProtocol.mkdirs(dirPath, permission, false);
+
+    // The audit log should contains "callerContext=clientContext,clientIp:"
+    assertTrue(auditlog.getOutput()
+        .contains("callerContext=clientContext,clientIp:"));
+    assertTrue(verifyFileExists(routerFS, dirPath));
+  }
 }