Pārlūkot izejas kodu

HDFS-9184. Logging HDFS operation's caller context into audit logs. Contributed by Mingliang Liu.

Jitendra Pandey 9 gadi atpakaļ
vecāks
revīzija
63cebf57d0

+ 11 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java

@@ -188,6 +188,17 @@ public class CommonConfigurationKeysPublic {
   /** Default value for TFILE_FS_OUTPUT_BUFFER_SIZE_KEY */
   public static final int     TFILE_FS_OUTPUT_BUFFER_SIZE_DEFAULT = 256*1024;
 
+  public static final String  HADOOP_CALLER_CONTEXT_ENABLED_KEY =
+      "hadoop.caller.context.enabled";
+  public static final boolean HADOOP_CALLER_CONTEXT_ENABLED_DEFAULT = false;
+  public static final String  HADOOP_CALLER_CONTEXT_MAX_SIZE_KEY =
+      "hadoop.caller.context.max.size";
+  public static final int     HADOOP_CALLER_CONTEXT_MAX_SIZE_DEFAULT = 128;
+  public static final String  HADOOP_CALLER_CONTEXT_SIGNATURE_MAX_SIZE_KEY =
+      "hadoop.caller.context.signature.max.size";
+  public static final int     HADOOP_CALLER_CONTEXT_SIGNATURE_MAX_SIZE_DEFAULT =
+      40;
+
   /** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */
   public static final String  IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY =
     "ipc.client.connection.maxidletime";

+ 147 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallerContext.java

@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ipc;
+
+import org.apache.commons.lang.builder.EqualsBuilder;
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+/**
+ * A class defining the caller context for auditing coarse granularity
+ * operations.
+ *
+ * This class is immutable.
+ */
+@InterfaceAudience.LimitedPrivate({"HBase", "HDFS", "Hive", "MapReduce",
+    "Pig", "YARN"})
+@InterfaceStability.Evolving
+public class CallerContext {
+  public static final Charset SIGNATURE_ENCODING = StandardCharsets.UTF_8;
+  /** The caller context.
+   *
+   * It will be truncated if it exceeds the maximum allowed length in
+   * server. The default length limit is
+   * {@link org.apache.hadoop.fs.CommonConfigurationKeysPublic#HADOOP_CALLER_CONTEXT_MAX_SIZE_DEFAULT}
+   */
+  private final String context;
+  /** The caller's signature for validation.
+   *
+   * The signature is optional. The null or empty signature will be abandoned.
+   * If the signature exceeds the maximum allowed length in server, the caller
+   * context will be abandoned. The default length limit is
+   * {@link org.apache.hadoop.fs.CommonConfigurationKeysPublic#HADOOP_CALLER_CONTEXT_SIGNATURE_MAX_SIZE_DEFAULT}
+   */
+  private final byte[] signature;
+
+  public CallerContext(Builder builder) {
+    this.context = builder.context;
+    this.signature = builder.signature;
+  }
+
+  public boolean isValid() {
+    return context != null;
+  }
+
+  public String getContext() {
+    return context;
+  }
+
+  public byte[] getSignature() {
+    return signature == null ?
+        null : Arrays.copyOf(signature, signature.length);
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder().append(context).toHashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == null) {
+      return false;
+    } else if (obj == this) {
+      return true;
+    } else if (obj.getClass() != getClass()) {
+      return false;
+    } else {
+      CallerContext rhs = (CallerContext) obj;
+      return new EqualsBuilder()
+          .append(context, rhs.context)
+          .append(signature, rhs.signature)
+          .isEquals();
+    }
+  }
+  @Override
+  public String toString() {
+    if (!isValid()) {
+      return "";
+    }
+    String str = context;
+    if (signature != null) {
+      str += ":";
+      str += new String(signature, SIGNATURE_ENCODING);
+    }
+    return str;
+  }
+
+  /** The caller context builder. */
+  public static final class Builder {
+    private final String context;
+    private byte[] signature;
+
+    public Builder(String context) {
+      this.context = context;
+    }
+
+    public Builder setSignature(byte[] signature) {
+      if (signature != null && signature.length > 0) {
+        this.signature = Arrays.copyOf(signature, signature.length);
+      }
+      return this;
+    }
+
+    public CallerContext build() {
+      return new CallerContext(this);
+    }
+  }
+
+  /**
+   * The thread local current caller context.
+   * <p/>
+   * Internal class for defered singleton idiom.
+   * https://en.wikipedia.org/wiki/Initialization_on_demand_holder_idiom
+   */
+  private static final class CurrentCallerContextHolder {
+    static final ThreadLocal<CallerContext> CALLER_CONTEXT =
+        new InheritableThreadLocal<>();
+  }
+
+  public static CallerContext getCurrent() {
+    return CurrentCallerContextHolder.CALLER_CONTEXT.get();
+  }
+
+  public static void setCurrent(CallerContext callerContext) {
+    CurrentCallerContextHolder.CALLER_CONTEXT.set(callerContext);
+  }
+}

+ 18 - 4
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

@@ -580,10 +580,11 @@ public abstract class Server {
     private final RPC.RpcKind rpcKind;
     private final byte[] clientId;
     private final TraceScope traceScope; // the HTrace scope on the server side
+    private final CallerContext callerContext; // the call context
 
     private Call(Call call) {
       this(call.callId, call.retryCount, call.rpcRequest, call.connection,
-          call.rpcKind, call.clientId, call.traceScope);
+          call.rpcKind, call.clientId, call.traceScope, call.callerContext);
     }
 
     public Call(int id, int retryCount, Writable param, 
@@ -594,11 +595,12 @@ public abstract class Server {
 
     public Call(int id, int retryCount, Writable param, Connection connection,
         RPC.RpcKind kind, byte[] clientId) {
-      this(id, retryCount, param, connection, kind, clientId, null);
+      this(id, retryCount, param, connection, kind, clientId, null, null);
     }
 
     public Call(int id, int retryCount, Writable param, Connection connection,
-        RPC.RpcKind kind, byte[] clientId, TraceScope traceScope) {
+        RPC.RpcKind kind, byte[] clientId, TraceScope traceScope,
+        CallerContext callerContext) {
       this.callId = id;
       this.retryCount = retryCount;
       this.rpcRequest = param;
@@ -608,6 +610,7 @@ public abstract class Server {
       this.rpcKind = kind;
       this.clientId = clientId;
       this.traceScope = traceScope;
+      this.callerContext = callerContext;
     }
     
     @Override
@@ -1995,9 +1998,18 @@ public abstract class Server {
         }
       }
 
+      CallerContext callerContext = null;
+      if (header.hasCallerContext()) {
+        callerContext =
+            new CallerContext.Builder(header.getCallerContext().getContext())
+                .setSignature(header.getCallerContext().getSignature()
+                    .toByteArray())
+                .build();
+      }
+
       Call call = new Call(header.getCallId(), header.getRetryCount(),
           rpcRequest, this, ProtoUtil.convert(header.getRpcKind()),
-          header.getClientId().toByteArray(), traceScope);
+          header.getClientId().toByteArray(), traceScope, callerContext);
 
       if (callQueue.isClientBackoffEnabled()) {
         // if RPC queue is full, we will ask the RPC client to back off by
@@ -2188,6 +2200,8 @@ public abstract class Server {
             traceScope = call.traceScope;
             traceScope.getSpan().addTimelineAnnotation("called");
           }
+          // always update the current call context
+          CallerContext.setCurrent(call.callerContext);
 
           try {
             // Make the call as the user via Subject.doAs, thus associating

+ 13 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.util;
 import java.io.DataInput;
 import java.io.IOException;
 
+import org.apache.hadoop.ipc.CallerContext;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
 import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.UserInformationProto;
@@ -177,6 +178,18 @@ public abstract class ProtoUtil {
             .build());
     }
 
+    // Add caller context if it is not null
+    CallerContext callerContext = CallerContext.getCurrent();
+    if (callerContext != null && callerContext.isValid()) {
+      RPCCallerContextProto.Builder contextBuilder = RPCCallerContextProto
+          .newBuilder().setContext(callerContext.getContext());
+      if (callerContext.getSignature() != null) {
+        contextBuilder.setSignature(
+            ByteString.copyFrom(callerContext.getSignature()));
+      }
+      result.setCallerContext(contextBuilder);
+    }
+
     return result.build();
   }
 }

+ 9 - 0
hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto

@@ -66,6 +66,14 @@ message RPCTraceInfoProto {
 
 }
 
+/**
+ * Used to pass through the call context entry after an RPC is made.
+ */
+message RPCCallerContextProto {
+  required string context = 1;
+  optional bytes signature = 2;
+}
+
 message RpcRequestHeaderProto { // the header for the RpcRequest
   enum OperationProto {
     RPC_FINAL_PACKET        = 0; // The final RPC Packet
@@ -81,6 +89,7 @@ message RpcRequestHeaderProto { // the header for the RpcRequest
   // retry count, 1 means this is the first retry
   optional sint32 retryCount = 5 [default = -1];
   optional RPCTraceInfoProto traceInfo = 6; // tracing info
+  optional RPCCallerContextProto callerContext = 7; // call context
 }
 
 

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -14,6 +14,9 @@ Release 2.8.0 - UNRELEASED
 
     HDFS-8155. Support OAuth2 in WebHDFS. (jghoman)
 
+    HDFS-9184. Logging HDFS operation's caller context into audit logs.
+    (Mingliang Liu via jitendra)
+
   IMPROVEMENTS
 
     HDFS-9257. improve error message for "Absolute path required" in INode.java

+ 39 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -19,6 +19,12 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_ENABLED_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_ENABLED_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_MAX_SIZE_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_MAX_SIZE_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_SIGNATURE_MAX_SIZE_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_SIGNATURE_MAX_SIZE_KEY;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
@@ -251,6 +257,7 @@ import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.CallerContext;
 import org.apache.hadoop.ipc.RetriableException;
 import org.apache.hadoop.ipc.RetryCache;
 import org.apache.hadoop.ipc.Server;
@@ -347,7 +354,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       if (logger instanceof HdfsAuditLogger) {
         HdfsAuditLogger hdfsLogger = (HdfsAuditLogger) logger;
         hdfsLogger.logAuditEvent(succeeded, ugi.toString(), addr, cmd, src, dst,
-            status, ugi, dtSecretManager);
+            status, CallerContext.getCurrent(), ugi, dtSecretManager);
       } else {
         logger.logAuditEvent(succeeded, ugi.toString(), addr,
             cmd, src, dst, status);
@@ -7158,12 +7165,24 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    */
   @VisibleForTesting
   static class DefaultAuditLogger extends HdfsAuditLogger {
+    private boolean isCallerContextEnabled;
+    private int callerContextMaxLen;
+    private int callerSignatureMaxLen;
 
     private boolean logTokenTrackingId;
     private Set<String> debugCmdSet = new HashSet<String>();
 
     @Override
     public void initialize(Configuration conf) {
+      isCallerContextEnabled = conf.getBoolean(
+          HADOOP_CALLER_CONTEXT_ENABLED_KEY,
+          HADOOP_CALLER_CONTEXT_ENABLED_DEFAULT);
+      callerContextMaxLen = conf.getInt(
+          HADOOP_CALLER_CONTEXT_MAX_SIZE_KEY,
+          HADOOP_CALLER_CONTEXT_MAX_SIZE_DEFAULT);
+      callerSignatureMaxLen = conf.getInt(
+          HADOOP_CALLER_CONTEXT_SIGNATURE_MAX_SIZE_KEY,
+          HADOOP_CALLER_CONTEXT_SIGNATURE_MAX_SIZE_DEFAULT);
       logTokenTrackingId = conf.getBoolean(
           DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_KEY,
           DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_DEFAULT);
@@ -7175,7 +7194,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     @Override
     public void logAuditEvent(boolean succeeded, String userName,
         InetAddress addr, String cmd, String src, String dst,
-        FileStatus status, UserGroupInformation ugi,
+        FileStatus status, CallerContext callerContext, UserGroupInformation ugi,
         DelegationTokenSecretManager dtSecretManager) {
 
       if (auditLog.isDebugEnabled() ||
@@ -7214,6 +7233,24 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         }
         sb.append("\t").append("proto=");
         sb.append(NamenodeWebHdfsMethods.isWebHdfsInvocation() ? "webhdfs" : "rpc");
+        if (isCallerContextEnabled &&
+            callerContext != null &&
+            callerContext.isValid() &&
+            (callerContext.getSignature() == null ||
+                callerContext.getSignature().length <= callerSignatureMaxLen)) {
+          sb.append("\t").append("callerContext=");
+          if (callerContext.getContext().length() > callerContextMaxLen) {
+            sb.append(callerContext.getContext().substring(0,
+                callerContextMaxLen));
+          } else {
+            sb.append(callerContext.getContext());
+          }
+          if (callerContext.getSignature() != null) {
+            sb.append(":");
+            sb.append(new String(callerContext.getSignature(),
+                CallerContext.SIGNATURE_ENCODING));
+          }
+        }
         logAuditMessage(sb.toString());
       }
     }

+ 4 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/HdfsAuditLogger.java

@@ -23,6 +23,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
+import org.apache.hadoop.ipc.CallerContext;
 import org.apache.hadoop.security.UserGroupInformation;
 
 /**
@@ -36,8 +37,8 @@ public abstract class HdfsAuditLogger implements AuditLogger {
   public void logAuditEvent(boolean succeeded, String userName,
       InetAddress addr, String cmd, String src, String dst,
       FileStatus status) {
-    logAuditEvent(succeeded, userName, addr, cmd, src, dst, status, null,
-        null);
+    logAuditEvent(succeeded, userName, addr, cmd, src, dst, status,
+        null /*callerContext*/, null /*ugi*/, null /*dtSecretManager*/);
   }
 
   /**
@@ -61,6 +62,6 @@ public abstract class HdfsAuditLogger implements AuditLogger {
    */
   public abstract void logAuditEvent(boolean succeeded, String userName,
       InetAddress addr, String cmd, String src, String dst,
-      FileStatus stat, UserGroupInformation ugi,
+      FileStatus stat, CallerContext callerContext, UserGroupInformation ugi,
       DelegationTokenSecretManager dtSecretManager);
 }

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogAtDebug.java

@@ -68,7 +68,7 @@ public class TestAuditLogAtDebug {
     logger.logAuditEvent(true, "",
                          Inet4Address.getLoopbackAddress(),
                          command, "", "",
-                         null, null, null);
+                         null, null, null, null);
   }
 
   @Test

+ 176 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogger.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import com.google.common.collect.Lists;
+import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -29,15 +30,24 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.namenode.top.TopAuditLogger;
 import org.apache.hadoop.hdfs.web.resources.GetOpParam;
+import org.apache.hadoop.ipc.CallerContext;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.authorize.ProxyServers;
 import org.apache.hadoop.security.authorize.ProxyUsers;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
+import org.apache.log4j.Level;
+
 import org.junit.Before;
 import org.junit.Test;
+
 import org.mockito.Mockito;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.net.HttpURLConnection;
 import java.net.InetAddress;
@@ -45,11 +55,15 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.List;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_ENABLED_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_MAX_SIZE_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_SIGNATURE_MAX_SIZE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOGGERS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.NNTOP_ENABLED_KEY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.doThrow;
@@ -58,6 +72,11 @@ import static org.mockito.Mockito.doThrow;
  * Tests for the {@link AuditLogger} custom audit logging interface.
  */
 public class TestAuditLogger {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      TestAuditLogger.class);
+  static {
+    GenericTestUtils.setLogLevel(LOG, Level.ALL);
+  }
 
   private static final short TEST_PERMISSION = (short) 0654;
 
@@ -199,6 +218,163 @@ public class TestAuditLogger {
     }
   }
 
+  /**
+   * Verify that the audit logger is aware of the call context
+   */
+  @Test
+  public void testAuditLoggerWithCallContext() throws IOException {
+    Configuration conf = new HdfsConfiguration();
+    conf.setBoolean(HADOOP_CALLER_CONTEXT_ENABLED_KEY, true);
+    conf.setInt(HADOOP_CALLER_CONTEXT_MAX_SIZE_KEY, 128);
+    conf.setInt(HADOOP_CALLER_CONTEXT_SIGNATURE_MAX_SIZE_KEY, 40);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+    LogCapturer auditlog = LogCapturer.captureLogs(FSNamesystem.auditLog);
+
+    try {
+      cluster.waitClusterUp();
+      final FileSystem fs = cluster.getFileSystem();
+      final long time = System.currentTimeMillis();
+      final Path p = new Path("/");
+
+      assertNull(CallerContext.getCurrent());
+
+      // context-only
+      CallerContext context = new CallerContext.Builder("setTimes").build();
+      CallerContext.setCurrent(context);
+      LOG.info("Set current caller context as {}", CallerContext.getCurrent());
+      fs.setTimes(p, time, time);
+      System.out.println("LLLLLL" + auditlog.getOutput());
+      assertTrue(auditlog.getOutput().endsWith("callerContext=setTimes\n"));
+      auditlog.clearOutput();
+
+      // context with signature
+      context = new CallerContext.Builder("setTimes")
+          .setSignature("L".getBytes(CallerContext.SIGNATURE_ENCODING))
+          .build();
+      CallerContext.setCurrent(context);
+      LOG.info("Set current caller context as {}", CallerContext.getCurrent());
+      fs.setTimes(p, time, time);
+      assertTrue(auditlog.getOutput().endsWith(
+          "callerContext=setTimes:L\n"));
+      auditlog.clearOutput();
+
+      // long context is truncated
+      final String longContext = RandomStringUtils.randomAscii(200);
+      context = new CallerContext.Builder(longContext)
+          .setSignature("L".getBytes(CallerContext.SIGNATURE_ENCODING))
+          .build();
+      CallerContext.setCurrent(context);
+      LOG.info("Set current caller context as {}", CallerContext.getCurrent());
+      fs.setTimes(p, time, time);
+      assertTrue(auditlog.getOutput().endsWith(
+          "callerContext=" + longContext.substring(0, 128) + ":L\n"));
+      auditlog.clearOutput();
+
+      // caller context is inherited in child thread
+      context = new CallerContext.Builder("setTimes")
+          .setSignature("L".getBytes(CallerContext.SIGNATURE_ENCODING))
+          .build();
+      CallerContext.setCurrent(context);
+      LOG.info("Set current caller context as {}", CallerContext.getCurrent());
+      Thread child = new Thread(new Runnable()
+      {
+        @Override
+        public void run() {
+          try {
+            fs.setTimes(p, time, time);
+          } catch (IOException e) {
+            fail("Unexpected exception found." + e);
+          }
+        }
+      });
+      child.start();
+      try {
+        child.join();
+      } catch (InterruptedException ignored) {
+        // Ignore
+      }
+      assertTrue(auditlog.getOutput().endsWith("callerContext=setTimes:L\n"));
+      auditlog.clearOutput();
+
+      // caller context is overridden in child thread
+      final CallerContext childContext =
+          new CallerContext.Builder("setPermission")
+              .setSignature("L".getBytes(CallerContext.SIGNATURE_ENCODING))
+              .build();
+      LOG.info("Set current caller context as {}", CallerContext.getCurrent());
+      child = new Thread(new Runnable()
+      {
+        @Override
+        public void run() {
+          try {
+            CallerContext.setCurrent(childContext);
+            fs.setPermission(p, new FsPermission((short)777));
+          } catch (IOException e) {
+            fail("Unexpected exception found." + e);
+          }
+        }
+      });
+      child.start();
+      try {
+        child.join();
+      } catch (InterruptedException ignored) {
+        // Ignore
+      }
+      assertTrue(auditlog.getOutput().endsWith(
+          "callerContext=setPermission:L\n"));
+      auditlog.clearOutput();
+
+      // reuse the current context's signature
+       context = new CallerContext.Builder("mkdirs")
+           .setSignature(CallerContext.getCurrent().getSignature()).build();
+      CallerContext.setCurrent(context);
+      LOG.info("Set current caller context as {}", CallerContext.getCurrent());
+      fs.mkdirs(new Path("/reuse-context-signature"));
+      assertTrue(auditlog.getOutput().endsWith("callerContext=mkdirs:L\n"));
+      auditlog.clearOutput();
+
+      // caller context with too long signature is abandoned
+      context = new CallerContext.Builder("setTimes")
+          .setSignature(new byte[41])
+          .build();
+      CallerContext.setCurrent(context);
+      LOG.info("Set current caller context as {}", CallerContext.getCurrent());
+      fs.setTimes(p, time, time);
+      assertFalse(auditlog.getOutput().contains("callerContext="));
+      auditlog.clearOutput();
+
+      // null signature is ignored
+      context = new CallerContext.Builder("setTimes").setSignature(null)
+          .build();
+      CallerContext.setCurrent(context);
+      LOG.info("Set current caller context as {}", CallerContext.getCurrent());
+      fs.setTimes(p, time, time);
+      assertTrue(auditlog.getOutput().endsWith("callerContext=setTimes\n"));
+      auditlog.clearOutput();
+
+      // empty signature is ignored
+      context = new CallerContext.Builder("mkdirs")
+          .setSignature("".getBytes(CallerContext.SIGNATURE_ENCODING))
+          .build();
+      CallerContext.setCurrent(context);
+      LOG.info("Set current caller context as {}", CallerContext.getCurrent());
+      fs.mkdirs(new Path("/empty-signature"));
+      assertTrue(auditlog.getOutput().endsWith("callerContext=mkdirs\n"));
+      auditlog.clearOutput();
+
+      // invalid context is not passed to the rpc
+      context = new CallerContext.Builder(null).build();
+      CallerContext.setCurrent(context);
+      LOG.info("Set current caller context as {}", CallerContext.getCurrent());
+      fs.mkdirs(new Path("/empty-signature"));
+      assertFalse(auditlog.getOutput().contains("callerContext="));
+      auditlog.clearOutput();
+
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   @Test
   public void testAuditLogWithAclFailure() throws Exception {
     final Configuration conf = new HdfsConfiguration();