Procházet zdrojové kódy

ZOOKEEPER-1260: Audit logging in ZooKeeper servers.

Author: Mohammad Arshad <arshad@apache.org>

Reviewers: Enrico Olivelli <eolivelli@apache.org>, Andor Molnar <andor@apache.org>

Closes #1133 from arshadmohammad/ZOOKEEPER-1260-AuditLog-master
Mohammad Arshad před 5 roky
rodič
revize
794adf1122
30 změnil soubory, kde provedl 1731 přidání a 29 odebrání
  1. 17 0
      conf/log4j.properties
  2. 3 0
      zookeeper-docs/src/main/resources/markdown/html/header.html
  3. binární
      zookeeper-docs/src/main/resources/markdown/images/zkAuditLogs.jpg
  4. 1 0
      zookeeper-docs/src/main/resources/markdown/index.md
  5. 12 0
      zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
  6. 129 0
      zookeeper-docs/src/main/resources/markdown/zookeeperAuditLogs.md
  7. 8 0
      zookeeper-server/src/main/java/org/apache/zookeeper/Login.java
  8. 45 1
      zookeeper-server/src/main/java/org/apache/zookeeper/ZKUtil.java
  9. 34 0
      zookeeper-server/src/main/java/org/apache/zookeeper/audit/AuditConstants.java
  10. 98 0
      zookeeper-server/src/main/java/org/apache/zookeeper/audit/AuditEvent.java
  11. 232 0
      zookeeper-server/src/main/java/org/apache/zookeeper/audit/AuditHelper.java
  12. 34 0
      zookeeper-server/src/main/java/org/apache/zookeeper/audit/AuditLogger.java
  13. 38 0
      zookeeper-server/src/main/java/org/apache/zookeeper/audit/Log4jAuditLogger.java
  14. 133 0
      zookeeper-server/src/main/java/org/apache/zookeeper/audit/ZKAuditProvider.java
  15. 2 23
      zookeeper-server/src/main/java/org/apache/zookeeper/cli/GetAclCommand.java
  16. 20 4
      zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java
  17. 3 1
      zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
  18. 28 0
      zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
  19. 22 0
      zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java
  20. 15 0
      zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxnFactory.java
  21. 7 0
      zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMain.java
  22. 13 0
      zookeeper-server/src/main/java/org/apache/zookeeper/server/auth/AuthenticationProvider.java
  23. 9 0
      zookeeper-server/src/main/java/org/apache/zookeeper/server/auth/DigestAuthenticationProvider.java
  24. 7 0
      zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerMain.java
  25. 39 0
      zookeeper-server/src/main/java/org/apache/zookeeper/server/util/AuthUtil.java
  26. 45 0
      zookeeper-server/src/test/java/org/apache/zookeeper/audit/AuditEventTest.java
  27. 74 0
      zookeeper-server/src/test/java/org/apache/zookeeper/audit/AuditLogPerfReading.java
  28. 442 0
      zookeeper-server/src/test/java/org/apache/zookeeper/audit/Log4jAuditLoggerTest.java
  29. 150 0
      zookeeper-server/src/test/java/org/apache/zookeeper/audit/ZKAuditLoggerPerformance.java
  30. 71 0
      zookeeper-server/src/test/java/org/apache/zookeeper/server/util/AuthUtilTest.java

+ 17 - 0
conf/log4j.properties

@@ -63,3 +63,20 @@ log4j.appender.TRACEFILE.File=${zookeeper.tracelog.dir}/${zookeeper.tracelog.fil
 log4j.appender.TRACEFILE.layout=org.apache.log4j.PatternLayout
 ### Notice we are including log4j's NDC here (%x)
 log4j.appender.TRACEFILE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L][%x] - %m%n
+#
+# zk audit logging
+#
+zookeeper.auditlog.file=zookeeper_audit.log
+zookeeper.auditlog.threshold=INFO
+audit.logger=INFO, RFAAUDIT
+log4j.logger.org.apache.zookeeper.audit.Log4jAuditLogger=${audit.logger}
+log4j.additivity.org.apache.zookeeper.audit.Log4jAuditLogger=false
+log4j.appender.RFAAUDIT=org.apache.log4j.RollingFileAppender
+log4j.appender.RFAAUDIT.File=${zookeeper.log.dir}/${zookeeper.auditlog.file}
+log4j.appender.RFAAUDIT.layout=org.apache.log4j.PatternLayout
+log4j.appender.RFAAUDIT.layout.ConversionPattern=%d{ISO8601} %p %c{2}: %m%n
+log4j.appender.RFAAUDIT.Threshold=${zookeeper.auditlog.threshold}
+
+# Max log file size of 10MB
+log4j.appender.RFAAUDIT.MaxFileSize=10MB
+log4j.appender.RFAAUDIT.MaxBackupIndex=10

+ 3 - 0
zookeeper-docs/src/main/resources/markdown/html/header.html

@@ -104,6 +104,9 @@ document.write("Last Published: " + document.lastModified);
             </div>
             <div class="menuitem">
                 <a href="zookeeperReconfig.html">Dynamic Reconfiguration</a>
+            </div>
+			<div class="menuitem">
+                <a href="zookeeperAuditLogs.html">Audit Logs</a>
             </div>
         </div>
         <div onclick="SwitchMenu('menu_4', 'skin/')" id="menu_4Title" class="menutitle">Contributor</div>

binární
zookeeper-docs/src/main/resources/markdown/images/zkAuditLogs.jpg


+ 1 - 0
zookeeper-docs/src/main/resources/markdown/index.md

@@ -53,6 +53,7 @@ archives.
     + [ZooKeeper CLI](zookeeperCLI.html) - a guide on how to use the ZooKeeper command line interface
     + [ZooKeeper Tools](zookeeperTools.html) - a guide on how to use a series of tools for ZooKeeper
     + [ZooKeeper Monitor](zookeeperMonitor.html) - a guide on how to monitor the ZooKeeper
+    + [Audit Logging](zookeeperAuditLogs.html) - a guide on how to configure audit logs in ZooKeeper Server and what contents are logged.
 + **Contributors**
     Documents for Developers Contributing to the ZooKeeper Open Source Project
     + [ZooKeeper Internals](zookeeperInternals.html) - assorted topics on the inner workings of ZooKeeper

+ 12 - 0
zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md

@@ -956,6 +956,18 @@ property, when available, is noted below.
     and restart ZooKeeper process so ZooKeeper can continue normal data
     consistency check during recovery process.
     Default value is false.
+* *audit.enable* :
+    (Java system property: **zookeeper.audit.enable**)
+    **New in 3.6.0:**
+    By default audit logs are disabled. Set to "true" to enable it. Default value is "false".
+    See the [ZooKeeper audit logs](zookeeperAuditLogs.html) for more information.
+    
+* *audit.impl.class* :
+    (Java system property: **zookeeper.audit.impl.class**)
+    **New in 3.6.0:**
+    Class to implement the audit logger. By default log4j based audit logger org.apache.zookeeper.audit
+    .Log4jAuditLogger is used.
+    See the [ZooKeeper audit logs](zookeeperAuditLogs.html) for more information.
 
 * *largeRequestMaxBytes* :
     (Java system property: **zookeeper.largeRequestMaxBytes**)

+ 129 - 0
zookeeper-docs/src/main/resources/markdown/zookeeperAuditLogs.md

@@ -0,0 +1,129 @@
+<!--
+Copyright 2002-2004 The Apache Software Foundation
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+//-->
+
+# ZooKeeper Audit Logging
+
+* [ZooKeeper Audit Logs](#ch_auditLogs)
+* [ZooKeeper Audit Log Configuration](#ch_reconfig_format)
+* [Who is taken as user in audit logs?](#ch_zkAuditUser)
+<a name="ch_auditLogs"></a>
+
+## ZooKeeper Audit Logs
+
+Apache ZooKeeper supports audit logs from version 3.6.0. By default audit logs are disabled. To enable audit logs
+ configure audit.enable=true in conf/zoo.cfg. Audit logs are not logged on all the ZooKeeper servers, but logged only on the servers where client is connected as depicted in below figure.
+
+![Audit Logs](images/zkAuditLogs.jpg)
+
+
+The audit log captures detailed information for the operations that are selected to be audited. The audit information is written as a set of key=value pairs for the following keys
+
+| Key   | Value |
+| ----- | ----- |
+|session | client session id |
+|user | comma separated list of users who are associate with a client session. For more on this, see [Who is taken as user in audit logs](#ch_zkAuditUser).
+|ip | client IP address
+|operation | any one of the selected operations for audit. Possible values are(serverStart, serverStop, create, delete, setData, setAcl, multiOperation, reconfig, ephemeralZNodeDeleteOnSessionClose)
+|znode | path of the znode
+|znode type | type of znode in case of creation operation
+|acl | String representation of znode ACL like cdrwa(create, delete,read, write, admin). This is logged only for setAcl operation
+|result | result of the operation. Possible values are (success/failure/invoked). Result "invoked" is used for serverStop operation because stop is logged before ensuring that server actually stopped.
+
+Below are sample audit logs for all operations, where client is connected from 192.168.1.2, client principal is zkcli@HADOOP.COM, server principal is zookeeper/192.168.1.3@HADOOP.COM
+
+    user=zookeeper/192.168.1.3 operation=serverStart   result=success
+    session=0x19344730000   user=192.168.1.2,zkcli@HADOOP.COM  ip=192.168.1.2    operation=create    znode=/a    znode_type=persistent  result=success
+    session=0x19344730000   user=192.168.1.2,zkcli@HADOOP.COM  ip=192.168.1.2    operation=create    znode=/a    znode_type=persistent  result=failure
+    session=0x19344730000   user=192.168.1.2,zkcli@HADOOP.COM  ip=192.168.1.2    operation=setData   znode=/a    result=failure
+    session=0x19344730000   user=192.168.1.2,zkcli@HADOOP.COM  ip=192.168.1.2    operation=setData   znode=/a    result=success
+    session=0x19344730000   user=192.168.1.2,zkcli@HADOOP.COM  ip=192.168.1.2    operation=setAcl    znode=/a    acl=world:anyone:cdrwa  result=failure
+    session=0x19344730000   user=192.168.1.2,zkcli@HADOOP.COM  ip=192.168.1.2    operation=setAcl    znode=/a    acl=world:anyone:cdrwa  result=success
+    session=0x19344730000   user=192.168.1.2,zkcli@HADOOP.COM  ip=192.168.1.2    operation=create    znode=/b    znode_type=persistent  result=success
+    session=0x19344730000   user=192.168.1.2,zkcli@HADOOP.COM  ip=192.168.1.2    operation=setData   znode=/b    result=success
+    session=0x19344730000   user=192.168.1.2,zkcli@HADOOP.COM  ip=192.168.1.2    operation=delete    znode=/b    result=success
+    session=0x19344730000   user=192.168.1.2,zkcli@HADOOP.COM  ip=192.168.1.2    operation=multiOperation    result=failure
+    session=0x19344730000   user=192.168.1.2,zkcli@HADOOP.COM  ip=192.168.1.2    operation=delete    znode=/a    result=failure
+    session=0x19344730000   user=192.168.1.2,zkcli@HADOOP.COM  ip=192.168.1.2    operation=delete    znode=/a    result=success
+    session=0x19344730001   user=192.168.1.2,zkcli@HADOOP.COM  ip=192.168.1.2    operation=create   znode=/ephemral znode_type=ephemral result=success
+    session=0x19344730001   user=zookeeper/192.168.1.3   operation=ephemeralZNodeDeletionOnSessionCloseOrExpire  znode=/ephemral result=success
+    session=0x19344730000   user=192.168.1.2,zkcli@HADOOP.COM  ip=192.168.1.2    operation=reconfig  znode=/zookeeper/config result=success
+    user=zookeeper/192.168.1.3 operation=serverStop    result=invoked
+
+<a name="ch_auditConfig"></a>
+
+## ZooKeeper Audit Log Configuration
+
+By default audit logs are disabled. To enable audit logs configure audit.enable=true in conf/zoo.cfg. Audit logging is done using log4j. Following is the default log4j configuration for audit logs in conf/log4j.properties
+
+    #
+    # zk audit logging
+    #
+    zookeeper.auditlog.file=zookeeper_audit.log
+    zookeeper.auditlog.threshold=INFO
+    audit.logger=INFO, RFAAUDIT
+    log4j.logger.org.apache.zookeeper.audit.Log4jAuditLogger=${audit.logger}
+    log4j.additivity.org.apache.zookeeper.audit.Log4jAuditLogger=false
+    log4j.appender.RFAAUDIT=org.apache.log4j.RollingFileAppender
+    log4j.appender.RFAAUDIT.File=${zookeeper.log.dir}/${zookeeper.auditlog.file}
+    log4j.appender.RFAAUDIT.layout=org.apache.log4j.PatternLayout
+    log4j.appender.RFAAUDIT.layout.ConversionPattern=%d{ISO8601} %p %c{2}: %m%n
+    log4j.appender.RFAAUDIT.Threshold=${zookeeper.auditlog.threshold}
+    
+    # Max log file size of 10MB
+    log4j.appender.RFAAUDIT.MaxFileSize=10MB
+    log4j.appender.RFAAUDIT.MaxBackupIndex=10
+
+Change above configuration to customize the auditlog file, number of backups, max file size, custom audit logger etc.
+
+<a name="ch_zkAuditUser"></a>
+
+## Who is taken as user in audit logs?
+
+By default there are only four authentication provider:
+
+* IPAuthenticationProvider
+* SASLAuthenticationProvider
+* X509AuthenticationProvider
+* DigestAuthenticationProvider
+
+User is decided based on the configured authentication provider:
+
+* When IPAuthenticationProvider is configured then authenticated IP is taken as user
+* When SASLAuthenticationProvider is configured then client principal is taken as user
+* When X509AuthenticationProvider is configured then client certificate is taken as user
+* When DigestAuthenticationProvider is configured then authenticated user is user 
+
+Custom authentication provider can override org.apache.zookeeper.server.auth.AuthenticationProvider.getUserName(String id)
+ to provide user name. If authentication provider is not overriding this method then whatever is stored in 
+ org.apache.zookeeper.data.Id.id is taken as user. 
+ Generally only user name is stored in this field but it is up to the custom authentication provider what they store in it. 
+ For audit logging value of org.apache.zookeeper.data.Id.id would be taken as user.
+
+In ZooKeeper Server not all the operations are done by clients but some operations are done by the server itself. For example when client closes the session, ephemeral znodes are deleted by the Server. These deletion are not done by clients directly but it is done the server itself these are called system operations. For these system operations the user associated with the ZooKeeper server are taken as user while audit logging these operations. For example if in ZooKeeper server principal is zookeeper/hadoop.hadoop.com@HADOOP.COM then this becomes the system user and all the system operations will be logged with this user name.
+
+	user=zookeeper/hadoop.hadoop.com@HADOOP.COM operation=serverStart result=success
+
+
+If there is no user associate with ZooKeeper server then the user who started the ZooKeeper server is taken as the user. For example if server started by root then root is taken as the system user
+
+	user=root operation=serverStart result=success
+
+
+Single client can attach multiple authentication schemes to a session, in this case all authenticated schemes will taken taken as user and will be presented as comma separated list. For example if a client is authenticate with principal zkcli@HADOOP.COM and ip 127.0.0.1 then create znode audit log will be as:		
+
+	session=0x10c0bcb0000 user=zkcli@HADOOP.COM,127.0.0.1 ip=127.0.0.1 operation=create znode=/a result=success
+
+	

+ 8 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/Login.java

@@ -47,6 +47,7 @@ public class Login {
 
     private static final String KINIT_COMMAND_DEFAULT = "/usr/bin/kinit";
     private static final Logger LOG = LoggerFactory.getLogger(Login.class);
+    public static final String SYSTEM_USER = System.getProperty("user.name", "<NA>");
     public CallbackHandler callbackHandler;
 
     // LoginThread will sleep until 80% of time from last refresh to
@@ -295,6 +296,13 @@ public class Login {
         return subject;
     }
 
+    public String getUserName() {
+        if (principal == null || principal.isEmpty()) {
+            return SYSTEM_USER;
+        }
+        return principal;
+    }
+
     public String getLoginContextName() {
         return loginContextName;
     }

+ 45 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/ZKUtil.java

@@ -23,7 +23,9 @@ import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.zookeeper.AsyncCallback.MultiCallback;
@@ -31,12 +33,14 @@ import org.apache.zookeeper.AsyncCallback.StringCallback;
 import org.apache.zookeeper.AsyncCallback.VoidCallback;
 import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.common.PathUtils;
+import org.apache.zookeeper.data.ACL;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class ZKUtil {
 
     private static final Logger LOG = LoggerFactory.getLogger(ZKUtil.class);
+    private static final Map<Integer, String> permCache = new ConcurrentHashMap<Integer, String>();
     /**
      * Recursively delete the node with the given path.
      * <p>
@@ -234,4 +238,44 @@ public class ZKUtil {
         }
     }
 
-}
+    /**
+     * @param perms
+     *            ACL permissions
+     * @return string representation of permissions
+     */
+    public static String getPermString(int perms) {
+        return permCache.computeIfAbsent(perms, k -> constructPermString(k));
+    }
+
+    private static String constructPermString(int perms) {
+        StringBuilder p = new StringBuilder();
+        if ((perms & ZooDefs.Perms.CREATE) != 0) {
+            p.append('c');
+        }
+        if ((perms & ZooDefs.Perms.DELETE) != 0) {
+            p.append('d');
+        }
+        if ((perms & ZooDefs.Perms.READ) != 0) {
+            p.append('r');
+        }
+        if ((perms & ZooDefs.Perms.WRITE) != 0) {
+            p.append('w');
+        }
+        if ((perms & ZooDefs.Perms.ADMIN) != 0) {
+            p.append('a');
+        }
+        return p.toString();
+    }
+
+    public static String aclToString(List<ACL> acls) {
+        StringBuilder sb = new StringBuilder();
+        for (ACL acl : acls) {
+            sb.append(acl.getId().getScheme());
+            sb.append(":");
+            sb.append(acl.getId().getId());
+            sb.append(":");
+            sb.append(getPermString(acl.getPerms()));
+        }
+        return sb.toString();
+    }
+}

+ 34 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/audit/AuditConstants.java

@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.audit;
+
+public final class AuditConstants {
+    private AuditConstants() {
+        //Utility classes should not have public constructors
+    }
+
+    static final String OP_START = "serverStart";
+    static final String OP_STOP = "serverStop";
+    public static final String OP_CREATE = "create";
+    public static final String OP_DELETE = "delete";
+    public static final String OP_SETDATA = "setData";
+    public static final String OP_SETACL = "setAcl";
+    public static final String OP_MULTI_OP = "multiOperation";
+    public static final String OP_RECONFIG = "reconfig";
+    public static final String OP_DEL_EZNODE_EXP = "ephemeralZNodeDeletionOnSessionCloseOrExpire";
+}

+ 98 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/audit/AuditEvent.java

@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.audit;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+
+public final class AuditEvent {
+    private static final char PAIR_SEPARATOR = '\t';
+    private static final String KEY_VAL_SEPARATOR = "=";
+    // Holds the entries which to be logged.
+    private Map<String, String> logEntries = new LinkedHashMap<>();
+    private Result result;
+
+    AuditEvent(Result result) {
+        this.result = result;
+    }
+
+    /**
+     * Gives all entries to be logged.
+     *
+     * @return log entries
+     */
+    public Set<Map.Entry<String, String>> getLogEntries() {
+        return logEntries.entrySet();
+    }
+
+    void addEntry(FieldName fieldName, String value) {
+        if (value != null) {
+            logEntries.put(fieldName.name().toLowerCase(), value);
+        }
+    }
+
+    public String getValue(FieldName fieldName) {
+        return logEntries.get(fieldName.name().toLowerCase());
+    }
+
+    public Result getResult() {
+        return result;
+    }
+
+    /**
+     * Gives the string to be logged, ignores fields with null values
+     *
+     * @return String
+     */
+    @Override
+    public String toString() {
+        StringBuilder buffer = new StringBuilder();
+        boolean first = true;
+        for (Map.Entry<String, String> entry : logEntries.entrySet()) {
+            String key = entry.getKey();
+            String value = entry.getValue();
+            if (null != value) {
+                // if first field then no need to add the tabs
+                if (first) {
+                    first = false;
+                } else {
+                    buffer.append(PAIR_SEPARATOR);
+                }
+                buffer.append(key).append(KEY_VAL_SEPARATOR)
+                        .append(value);
+            }
+        }
+        //add result field
+        if (buffer.length() > 0) {
+            buffer.append(PAIR_SEPARATOR);
+        }
+        buffer.append("result").append(KEY_VAL_SEPARATOR)
+                .append(result.name().toLowerCase());
+        return buffer.toString();
+    }
+
+    public enum FieldName {
+        USER, OPERATION, IP, ACL, ZNODE, SESSION, ZNODE_TYPE
+    }
+
+    public enum Result {
+        SUCCESS, FAILURE, INVOKED
+    }
+}
+

+ 232 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/audit/AuditHelper.java

@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.audit;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.jute.Record;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.MultiOperationRecord;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.ZKUtil;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.audit.AuditEvent.Result;
+import org.apache.zookeeper.proto.CreateRequest;
+import org.apache.zookeeper.proto.DeleteRequest;
+import org.apache.zookeeper.proto.SetACLRequest;
+import org.apache.zookeeper.proto.SetDataRequest;
+import org.apache.zookeeper.server.ByteBufferInputStream;
+import org.apache.zookeeper.server.DataTree.ProcessTxnResult;
+import org.apache.zookeeper.server.Request;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Helper class to decouple audit log code.
+ */
+public final class AuditHelper {
+    private static final Logger LOG = LoggerFactory.getLogger(AuditHelper.class);
+
+    public static void addAuditLog(Request request, ProcessTxnResult rc) {
+        addAuditLog(request, rc, false);
+    }
+
+    /**
+     * Add audit log if audit log is enabled and operation is of type which to be audit logged.
+     *
+     * @param request   user request
+     * @param txnResult ProcessTxnResult
+     * @param failedTxn whether audit is being done failed transaction for normal transaction
+     */
+    public static void addAuditLog(Request request, ProcessTxnResult txnResult, boolean failedTxn) {
+        if (!ZKAuditProvider.isAuditEnabled()) {
+            return;
+        }
+        String op = null;
+        //For failed transaction rc.path is null
+        String path = txnResult.path;
+        String acls = null;
+        String createMode = null;
+        try {
+            switch (request.type) {
+                case ZooDefs.OpCode.create:
+                case ZooDefs.OpCode.create2:
+                case ZooDefs.OpCode.createContainer:
+                    op = AuditConstants.OP_CREATE;
+                    if (failedTxn) {
+                        CreateRequest createRequest = new CreateRequest();
+                        deserialize(request, createRequest);
+                        path = createRequest.getPath();
+                        createMode =
+                                getCreateMode(createRequest);
+                    } else {
+                        createMode = getCreateMode(request);
+                    }
+                    break;
+                case ZooDefs.OpCode.delete:
+                case ZooDefs.OpCode.deleteContainer:
+                    op = AuditConstants.OP_DELETE;
+                    if (failedTxn) {
+                        DeleteRequest deleteRequest = new DeleteRequest();
+                        deserialize(request, deleteRequest);
+                        path = deleteRequest.getPath();
+                    }
+                    break;
+                case ZooDefs.OpCode.setData:
+                    op = AuditConstants.OP_SETDATA;
+                    if (failedTxn) {
+                        SetDataRequest setDataRequest = new SetDataRequest();
+                        deserialize(request, setDataRequest);
+                        path = setDataRequest.getPath();
+                    }
+                    break;
+                case ZooDefs.OpCode.setACL:
+                    op = AuditConstants.OP_SETACL;
+                    if (failedTxn) {
+                        SetACLRequest setACLRequest = new SetACLRequest();
+                        deserialize(request, setACLRequest);
+                        path = setACLRequest.getPath();
+                        acls = ZKUtil.aclToString(setACLRequest.getAcl());
+                    } else {
+                        acls = getACLs(request);
+                    }
+                    break;
+                case ZooDefs.OpCode.multi:
+                    if (failedTxn) {
+                        op = AuditConstants.OP_MULTI_OP;
+                    } else {
+                        logMultiOperation(request, txnResult);
+                        //operation si already logged
+                        return;
+                    }
+                    break;
+                case ZooDefs.OpCode.reconfig:
+                    op = AuditConstants.OP_RECONFIG;
+                    break;
+                default:
+                    //Not an audit log operation
+                    return;
+            }
+            Result result = getResult(txnResult, failedTxn);
+            log(request, path, op, acls, createMode, result);
+        } catch (Throwable e) {
+            LOG.error("Failed to audit log request {}", request.type, e);
+        }
+    }
+
+    private static void deserialize(Request request, Record record) throws IOException {
+        ByteBufferInputStream.byteBuffer2Record(getRequestData(request), record);
+    }
+
+    private static ByteBuffer getRequestData(Request request) {
+        ByteBuffer reqData = request.request.slice();
+        reqData.rewind();
+        return reqData;
+    }
+
+    private static Result getResult(ProcessTxnResult rc, boolean failedTxn) {
+        if (failedTxn) {
+            return Result.FAILURE;
+        } else {
+            return rc.err == KeeperException.Code.OK.intValue() ? Result.SUCCESS : Result.FAILURE;
+        }
+    }
+
+    private static void logMultiOperation(Request request, ProcessTxnResult rc) throws IOException, KeeperException {
+        Map<String, String> createModes = AuditHelper.getCreateModes(request);
+        boolean multiFailed = false;
+        for (ProcessTxnResult subTxnResult : rc.multiResult) {
+            switch (subTxnResult.type) {
+                case ZooDefs.OpCode.create:
+                case ZooDefs.OpCode.create2:
+                case ZooDefs.OpCode.createTTL:
+                case ZooDefs.OpCode.createContainer:
+                    log(request, subTxnResult.path, AuditConstants.OP_CREATE, null,
+                            createModes.get(subTxnResult.path), Result.SUCCESS);
+                    break;
+                case ZooDefs.OpCode.delete:
+                case ZooDefs.OpCode.deleteContainer:
+                    log(request, subTxnResult.path, AuditConstants.OP_DELETE, null,
+                            null, Result.SUCCESS);
+                    break;
+                case ZooDefs.OpCode.setData:
+                    log(request, subTxnResult.path, AuditConstants.OP_SETDATA, null,
+                            null, Result.SUCCESS);
+                    break;
+                case ZooDefs.OpCode.error:
+                    multiFailed = true;
+                    break;
+                default:
+                    // Do nothing, it ok, we do not log all multi operations
+            }
+        }
+        if (multiFailed) {
+            log(request, rc.path, AuditConstants.OP_MULTI_OP, null,
+                    null, Result.FAILURE);
+        }
+    }
+
+    private static void log(Request request, String path, String op, String acls, String createMode, Result result) {
+        log(request.getUsers(), op, path, acls, createMode,
+                request.cnxn.getSessionIdHex(), request.cnxn.getHostAddress(), result);
+    }
+
+    private static void log(String user, String operation, String znode, String acl,
+                            String createMode, String session, String ip, Result result) {
+        ZKAuditProvider.log(user, operation, znode, acl, createMode, session, ip, result);
+    }
+
+    private static String getACLs(Request request) throws IOException {
+        SetACLRequest setACLRequest = new SetACLRequest();
+        deserialize(request, setACLRequest);
+        return ZKUtil.aclToString(setACLRequest.getAcl());
+    }
+
+    private static String getCreateMode(Request request) throws IOException, KeeperException {
+        CreateRequest createRequest = new CreateRequest();
+        deserialize(request, createRequest);
+        return getCreateMode(createRequest);
+    }
+
+    private static String getCreateMode(CreateRequest createRequest) throws KeeperException {
+        return CreateMode.fromFlag(createRequest.getFlags()).toString().toLowerCase();
+    }
+
+    private static Map<String, String> getCreateModes(Request request)
+            throws IOException, KeeperException {
+        Map<String, String> createModes = new HashMap<>();
+        if (!ZKAuditProvider.isAuditEnabled()) {
+            return createModes;
+        }
+        MultiOperationRecord multiRequest = new MultiOperationRecord();
+        deserialize(request, multiRequest);
+        for (Op op : multiRequest) {
+            if (op.getType() == ZooDefs.OpCode.create || op.getType() == ZooDefs.OpCode.create2
+                    || op.getType() == ZooDefs.OpCode.createContainer) {
+                CreateRequest requestRecord = (CreateRequest) op.toRequestRecord();
+                createModes.put(requestRecord.getPath(),
+                        getCreateMode(requestRecord));
+            }
+        }
+        return createModes;
+    }
+
+}

+ 34 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/audit/AuditLogger.java

@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.audit;
+
+public interface AuditLogger {
+
+    /**
+     * Called during initialization of the logger.
+     */
+    default void initialize() {
+    }
+
+    /**
+     * Called to log an audit event.
+     *
+     * @param auditEvent contains all the fields to be logged
+     */
+    void logAuditEvent(AuditEvent auditEvent);
+}

+ 38 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/audit/Log4jAuditLogger.java

@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.audit;
+
+import org.apache.zookeeper.audit.AuditEvent.Result;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Log4j based audit logger
+ */
+public class Log4jAuditLogger implements AuditLogger {
+    private static final Logger LOG = LoggerFactory.getLogger(Log4jAuditLogger.class);
+
+    @Override
+    public void logAuditEvent(AuditEvent auditEvent) {
+        if (auditEvent.getResult() == Result.FAILURE) {
+            LOG.error(auditEvent.toString());
+        } else {
+            LOG.info(auditEvent.toString());
+        }
+    }
+}

+ 133 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/audit/ZKAuditProvider.java

@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.audit;
+
+import static org.apache.zookeeper.audit.AuditEvent.FieldName;
+import java.lang.reflect.Constructor;
+import org.apache.zookeeper.audit.AuditEvent.Result;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ZKAuditProvider {
+    static final String AUDIT_ENABLE = "zookeeper.audit.enable";
+    static final String AUDIT_IMPL_CLASS = "zookeeper.audit.impl.class";
+    private static final Logger LOG = LoggerFactory.getLogger(ZKAuditProvider.class);
+    // By default audit logging is disabled
+    private static boolean auditEnabled;
+    private static AuditLogger auditLogger;
+
+    static {
+        auditEnabled = Boolean.getBoolean(AUDIT_ENABLE);
+        if (auditEnabled) {
+            //initialise only when audit logging is enabled
+            auditLogger = getAuditLogger();
+            LOG.info("ZooKeeper audit is enabled.");
+        } else {
+            LOG.info("ZooKeeper audit is disabled.");
+        }
+    }
+
+    private static AuditLogger getAuditLogger() {
+        String auditLoggerClass = System.getProperty(AUDIT_IMPL_CLASS);
+        if (auditLoggerClass == null) {
+            auditLoggerClass = Log4jAuditLogger.class.getName();
+        }
+        try {
+            Constructor<?> clientCxnConstructor = Class.forName(auditLoggerClass)
+                    .getDeclaredConstructor();
+            AuditLogger auditLogger = (AuditLogger) clientCxnConstructor.newInstance();
+            auditLogger.initialize();
+            return auditLogger;
+        } catch (Exception e) {
+            throw new RuntimeException("Couldn't instantiate " + auditLoggerClass, e);
+        }
+    }
+
+    /**
+     * @return true if audit log is enabled
+     */
+    public static boolean isAuditEnabled() {
+        return auditEnabled;
+    }
+
+    public static void log(String user, String operation, String znode, String acl,
+                           String createMode, String session, String ip, Result result) {
+        auditLogger.logAuditEvent(createLogEvent(user, operation, znode, acl, createMode, session, ip, result));
+    }
+
+    /**
+     * A helper api for creating an AuditEvent object.
+     */
+    static AuditEvent createLogEvent(String user, String operation, Result result) {
+        AuditEvent event = new AuditEvent(result);
+        event.addEntry(FieldName.USER, user);
+        event.addEntry(FieldName.OPERATION, operation);
+        return event;
+    }
+
+    /**
+     * A helper api for creating an AuditEvent object.
+     */
+    static AuditEvent createLogEvent(String user, String operation, String znode, String acl,
+                                     String createMode, String session, String ip, Result result) {
+        AuditEvent event = new AuditEvent(result);
+        event.addEntry(FieldName.SESSION, session);
+        event.addEntry(FieldName.USER, user);
+        event.addEntry(FieldName.IP, ip);
+        event.addEntry(FieldName.OPERATION, operation);
+        event.addEntry(FieldName.ZNODE, znode);
+        event.addEntry(FieldName.ZNODE_TYPE, createMode);
+        event.addEntry(FieldName.ACL, acl);
+        return event;
+    }
+
+    /**
+     * Add audit log for server start and register server stop log.
+     */
+    public static void addZKStartStopAuditLog() {
+        if (isAuditEnabled()) {
+            log(getZKUser(), AuditConstants.OP_START, Result.SUCCESS);
+            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+                log(getZKUser(), AuditConstants.OP_STOP, Result.INVOKED);
+            }));
+        }
+    }
+
+    /**
+     * Add audit log for server start fail.
+     */
+    public static void addServerStartFailureAuditLog() {
+        if (isAuditEnabled()) {
+            log(ZKAuditProvider.getZKUser(), AuditConstants.OP_START, Result.FAILURE);
+        }
+    }
+
+    private static void log(String user, String operation, Result result) {
+        auditLogger.logAuditEvent(createLogEvent(user, operation, result));
+    }
+
+    /**
+     * User who has started the ZooKeeper server user, it will be the logged-in
+     * user. If no user logged-in then system user.
+     */
+    public static String getZKUser() {
+        return ServerCnxnFactory.getUserName();
+    }
+
+}

+ 2 - 23
zookeeper-server/src/main/java/org/apache/zookeeper/cli/GetAclCommand.java

@@ -24,7 +24,7 @@ import org.apache.commons.cli.ParseException;
 import org.apache.commons.cli.Parser;
 import org.apache.commons.cli.PosixParser;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZKUtil;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
 
@@ -75,7 +75,7 @@ public class GetAclCommand extends CliCommand {
         }
 
         for (ACL a : acl) {
-            out.println(a.getId() + ": " + getPermString(a.getPerms()));
+            out.println(a.getId() + ": " + ZKUtil.getPermString(a.getPerms()));
         }
 
         if (cl.hasOption("s")) {
@@ -83,25 +83,4 @@ public class GetAclCommand extends CliCommand {
         }
         return false;
     }
-
-    private static String getPermString(int perms) {
-        StringBuilder p = new StringBuilder();
-        if ((perms & ZooDefs.Perms.CREATE) != 0) {
-            p.append('c');
-        }
-        if ((perms & ZooDefs.Perms.DELETE) != 0) {
-            p.append('d');
-        }
-        if ((perms & ZooDefs.Perms.READ) != 0) {
-            p.append('r');
-        }
-        if ((perms & ZooDefs.Perms.WRITE) != 0) {
-            p.append('w');
-        }
-        if ((perms & ZooDefs.Perms.ADMIN) != 0) {
-            p.append('a');
-        }
-        return p.toString();
-    }
-
 }

+ 20 - 4
zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java

@@ -52,6 +52,9 @@ import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.Watcher.WatcherType;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.audit.AuditConstants;
+import org.apache.zookeeper.audit.AuditEvent.Result;
+import org.apache.zookeeper.audit.ZKAuditProvider;
 import org.apache.zookeeper.common.PathTrie;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
@@ -1171,14 +1174,27 @@ public class DataTree {
 
     void deleteNodes(long session, long zxid, Iterable<String> paths2Delete) {
         for (String path : paths2Delete) {
+            boolean deleted = false;
+            String sessionHex = "0x" + Long.toHexString(session);
             try {
                 deleteNode(path, zxid);
-                LOG.debug("Deleting ephemeral node {} for session 0x{}", path, Long.toHexString(session));
+                deleted = true;
+                LOG.debug("Deleting ephemeral node {} for session {}", path, sessionHex);
             } catch (NoNodeException e) {
                 LOG.warn(
-                    "Ignoring NoNodeException for path {} while removing ephemeral for dead session 0x{}",
-                    path,
-                    Long.toHexString(session));
+                    "Ignoring NoNodeException for path {} while removing ephemeral for dead session {}",
+                        path, sessionHex);
+            }
+            if (ZKAuditProvider.isAuditEnabled()) {
+                if (deleted) {
+                    ZKAuditProvider.log(ZKAuditProvider.getZKUser(),
+                            AuditConstants.OP_DEL_EZNODE_EXP, path, null, null,
+                            sessionHex, null, Result.SUCCESS);
+                } else {
+                    ZKAuditProvider.log(ZKAuditProvider.getZKUser(),
+                            AuditConstants.OP_DEL_EZNODE_EXP, path, null, null,
+                            sessionHex, null, Result.FAILURE);
+                }
             }
         }
     }

+ 3 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java

@@ -44,6 +44,7 @@ import org.apache.zookeeper.OpResult.SetDataResult;
 import org.apache.zookeeper.Watcher.WatcherType;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.audit.AuditHelper;
 import org.apache.zookeeper.common.Time;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Id;
@@ -165,6 +166,7 @@ public class FinalRequestProcessor implements RequestProcessor {
         String path = null;
         try {
             if (request.getHdr() != null && request.getHdr().getType() == OpCode.error) {
+                AuditHelper.addAuditLog(request, rc, true);
                 /*
                  * When local session upgrading is disabled, leader will
                  * reject the ephemeral node creation due to session expire.
@@ -192,7 +194,7 @@ public class FinalRequestProcessor implements RequestProcessor {
             if (request.isStale()) {
                 ServerMetrics.getMetrics().STALE_REPLIES.add(1);
             }
-
+            AuditHelper.addAuditLog(request, rc);
             switch (request.type) {
             case OpCode.ping: {
                 lastOp = "PING";

+ 28 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java

@@ -28,6 +28,7 @@ import org.apache.zookeeper.data.Id;
 import org.apache.zookeeper.metrics.Summary;
 import org.apache.zookeeper.metrics.SummarySet;
 import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
+import org.apache.zookeeper.server.util.AuthUtil;
 import org.apache.zookeeper.txn.TxnHeader;
 
 /**
@@ -434,4 +435,31 @@ public class Request {
         logLatency(metric, key, Time.currentWallTime());
     }
 
+
+    /**
+     * Returns comma separated list of users authenticated in the current
+     * session
+     */
+    public String getUsers() {
+        if (authInfo == null) {
+            return (String) null;
+        }
+        if (authInfo.size() == 1) {
+            return AuthUtil.getUser(authInfo.get(0));
+        }
+        StringBuilder users = new StringBuilder();
+        boolean first = true;
+        for (Id id : authInfo) {
+            String user = AuthUtil.getUser(id);
+            if (user != null) {
+                if (first) {
+                    first = false;
+                } else {
+                    users.append(",");
+                }
+                users.append(user);
+            }
+        }
+        return users.toString();
+    }
 }

+ 22 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java

@@ -22,6 +22,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.security.cert.Certificate;
@@ -555,4 +556,25 @@ public abstract class ServerCnxn implements Stats, Watcher {
         }
     }
 
+    /**
+     * Returns the IP address or empty string.
+     */
+    public String getHostAddress() {
+        InetSocketAddress remoteSocketAddress = getRemoteSocketAddress();
+        if (remoteSocketAddress == null) {
+            return "";
+        }
+        InetAddress address = remoteSocketAddress.getAddress();
+        if (address == null) {
+            return "";
+        }
+        return address.getHostAddress();
+    }
+
+    /**
+     * Get session id in hexadecimal notation.
+     */
+    public String getSessionIdHex() {
+        return "0x" + Long.toHexString(getSessionId());
+    }
 }

+ 15 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxnFactory.java

@@ -54,6 +54,8 @@ public abstract class ServerCnxnFactory {
     // sessionMap is used by closeSession()
     final ConcurrentHashMap<Long, ServerCnxn> sessionMap = new ConcurrentHashMap<Long, ServerCnxn>();
 
+    private static String loginUser = Login.SYSTEM_USER;
+
     public void addSession(long sessionId, ServerCnxn cnxn) {
         sessionMap.put(sessionId, cnxn);
     }
@@ -264,6 +266,7 @@ public abstract class ServerCnxnFactory {
         try {
             saslServerCallbackHandler = new SaslServerCallbackHandler(Configuration.getConfiguration());
             login = new Login(serverSection, saslServerCallbackHandler, new ZKConfig());
+            setLoginUser(login.getUserName());
             login.startThreadIfNeeded();
         } catch (LoginException e) {
             throw new IOException("Could not configure server because SASL configuration did not allow the "
@@ -272,4 +275,16 @@ public abstract class ServerCnxnFactory {
         }
     }
 
+    private static void setLoginUser(String name) {
+        //Created this method to avoid ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD find bug issue
+        loginUser = name;
+    }
+    /**
+     * User who has started the ZooKeeper server user, it will be the logged-in
+     * user. If no user logged-in then system user
+     */
+    public static String getUserName() {
+        return loginUser;
+    }
+
 }

+ 7 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMain.java

@@ -23,6 +23,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import javax.management.JMException;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.zookeeper.audit.ZKAuditProvider;
 import org.apache.zookeeper.jmx.ManagedUtil;
 import org.apache.zookeeper.metrics.MetricsProvider;
 import org.apache.zookeeper.metrics.MetricsProviderLifeCycleException;
@@ -67,21 +68,26 @@ public class ZooKeeperServerMain {
             LOG.error("Invalid arguments, exiting abnormally", e);
             LOG.info(USAGE);
             System.err.println(USAGE);
+            ZKAuditProvider.addServerStartFailureAuditLog();
             System.exit(ExitCode.INVALID_INVOCATION.getValue());
         } catch (ConfigException e) {
             LOG.error("Invalid config, exiting abnormally", e);
             System.err.println("Invalid config, exiting abnormally");
+            ZKAuditProvider.addServerStartFailureAuditLog();
             System.exit(ExitCode.INVALID_INVOCATION.getValue());
         } catch (DatadirException e) {
             LOG.error("Unable to access datadir, exiting abnormally", e);
             System.err.println("Unable to access datadir, exiting abnormally");
+            ZKAuditProvider.addServerStartFailureAuditLog();
             System.exit(ExitCode.UNABLE_TO_ACCESS_DATADIR.getValue());
         } catch (AdminServerException e) {
             LOG.error("Unable to start AdminServer, exiting abnormally", e);
             System.err.println("Unable to start AdminServer, exiting abnormally");
+            ZKAuditProvider.addServerStartFailureAuditLog();
             System.exit(ExitCode.ERROR_STARTING_ADMIN_SERVER.getValue());
         } catch (Exception e) {
             LOG.error("Unexpected exception, exiting abnormally", e);
+            ZKAuditProvider.addServerStartFailureAuditLog();
             System.exit(ExitCode.UNEXPECTED_ERROR.getValue());
         }
         LOG.info("Exiting normally");
@@ -165,6 +171,7 @@ public class ZooKeeperServerMain {
                 Integer.getInteger("znode.container.checkIntervalMs", (int) TimeUnit.MINUTES.toMillis(1)),
                 Integer.getInteger("znode.container.maxPerMinute", 10000));
             containerManager.start();
+            ZKAuditProvider.addZKStartStopAuditLog();
 
             // Watch status of ZooKeeper server. It will do a graceful shutdown
             // if the server is not running or hits an internal error.

+ 13 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/auth/AuthenticationProvider.java

@@ -81,4 +81,17 @@ public interface AuthenticationProvider {
      */
     boolean isValid(String id);
 
+    /**
+     * <param>id</param> represents the authentication info which is set in server connection.
+     * id may contain both user name as well as password.
+     * This method should be implemented to extract the user name.
+     *
+     * @param id authentication info set by client.
+     * @return String user name
+     */
+    default String getUserName(String id) {
+        // Most of the authentication providers id contains only user name.
+        return id;
+    }
+
 }

+ 9 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/auth/DigestAuthenticationProvider.java

@@ -120,6 +120,15 @@ public class DigestAuthenticationProvider implements AuthenticationProvider {
         return id.equals(aclExpr);
     }
 
+    @Override
+    public String getUserName(String id) {
+        /**
+         * format is already enforced in server code. so no need to check it
+         * again, just assume it is in correct format
+         */
+        return id.split(":")[0];
+    }
+
     /** Call with a single argument of user:pass to generate authdata.
      * Authdata output can be used when setting superDigest for example.
      * @param args single argument of user:pass

+ 7 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerMain.java

@@ -22,6 +22,7 @@ import java.io.IOException;
 import javax.management.JMException;
 import javax.security.sasl.SaslException;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.zookeeper.audit.ZKAuditProvider;
 import org.apache.zookeeper.jmx.ManagedUtil;
 import org.apache.zookeeper.metrics.MetricsProvider;
 import org.apache.zookeeper.metrics.MetricsProviderLifeCycleException;
@@ -90,21 +91,26 @@ public class QuorumPeerMain {
             LOG.error("Invalid arguments, exiting abnormally", e);
             LOG.info(USAGE);
             System.err.println(USAGE);
+            ZKAuditProvider.addServerStartFailureAuditLog();
             System.exit(ExitCode.INVALID_INVOCATION.getValue());
         } catch (ConfigException e) {
             LOG.error("Invalid config, exiting abnormally", e);
             System.err.println("Invalid config, exiting abnormally");
+            ZKAuditProvider.addServerStartFailureAuditLog();
             System.exit(ExitCode.INVALID_INVOCATION.getValue());
         } catch (DatadirException e) {
             LOG.error("Unable to access datadir, exiting abnormally", e);
             System.err.println("Unable to access datadir, exiting abnormally");
+            ZKAuditProvider.addServerStartFailureAuditLog();
             System.exit(ExitCode.UNABLE_TO_ACCESS_DATADIR.getValue());
         } catch (AdminServerException e) {
             LOG.error("Unable to start AdminServer, exiting abnormally", e);
             System.err.println("Unable to start AdminServer, exiting abnormally");
+            ZKAuditProvider.addServerStartFailureAuditLog();
             System.exit(ExitCode.ERROR_STARTING_ADMIN_SERVER.getValue());
         } catch (Exception e) {
             LOG.error("Unexpected exception, exiting abnormally", e);
+            ZKAuditProvider.addServerStartFailureAuditLog();
             System.exit(ExitCode.UNEXPECTED_ERROR.getValue());
         }
         LOG.info("Exiting normally");
@@ -215,6 +221,7 @@ public class QuorumPeerMain {
             }
 
             quorumPeer.start();
+            ZKAuditProvider.addZKStartStopAuditLog();
             quorumPeer.join();
         } catch (InterruptedException e) {
             // warn, but generally this is ok

+ 39 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/util/AuthUtil.java

@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server.util;
+
+import org.apache.zookeeper.data.Id;
+import org.apache.zookeeper.server.auth.AuthenticationProvider;
+import org.apache.zookeeper.server.auth.ProviderRegistry;
+
+public final class AuthUtil {
+    private AuthUtil() {
+        //Utility classes should not have public constructors
+    }
+    /**
+     * Gives user name
+     *
+     * @param id contains scheme and authentication info
+     * @return returns null if authentication scheme does not exist or
+     * authentication provider returns null as user
+     */
+    public static String getUser(Id id) {
+        AuthenticationProvider provider = ProviderRegistry.getProvider(id.getScheme());
+        return provider == null ? null : provider.getUserName(id.getId());
+    }
+}

+ 45 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/audit/AuditEventTest.java

@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.audit;
+
+import static org.junit.Assert.assertEquals;
+import org.apache.zookeeper.audit.AuditEvent.Result;
+import org.junit.Test;
+
+public class AuditEventTest {
+
+    @Test
+    public void testFormat() {
+        AuditEvent auditEvent = new AuditEvent(Result.SUCCESS);
+        auditEvent.addEntry(AuditEvent.FieldName.USER, "Value1");
+        auditEvent.addEntry(AuditEvent.FieldName.OPERATION, "Value2");
+        String actual = auditEvent.toString();
+        String expected = "user=Value1\toperation=Value2\tresult=success";
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testFormatShouldIgnoreKeyIfValueIsNull() {
+        AuditEvent auditEvent = new AuditEvent(Result.SUCCESS);
+        auditEvent.addEntry(AuditEvent.FieldName.USER, null);
+        auditEvent.addEntry(AuditEvent.FieldName.OPERATION, "Value2");
+        String actual = auditEvent.toString();
+        String expected = "operation=Value2\tresult=success";
+        assertEquals(expected, actual);
+    }
+}

+ 74 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/audit/AuditLogPerfReading.java

@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.audit;
+
+/**
+ * Audit log performance reading
+ */
+public final class AuditLogPerfReading {
+    // time taken by create operations
+    private long create;
+    // time taken by setData operations
+    private long setData;
+    // time taken by delete operations
+    private long delete;
+
+    public long getCreate() {
+        return create;
+    }
+
+    public void setCreate(long create) {
+        this.create = create;
+    }
+
+    public long getSetData() {
+        return setData;
+    }
+
+    public void setSetData(long setData) {
+        this.setData = setData;
+    }
+
+    public long getDelete() {
+        return delete;
+    }
+
+    public void setDelete(long delete) {
+        this.delete = delete;
+    }
+
+    public String report() {
+        StringBuilder builder = new StringBuilder();
+        builder.append("create=");
+        builder.append(create);
+        builder.append(" ms\n");
+        builder.append("setData=");
+        builder.append(setData);
+        builder.append(" ms\n");
+        builder.append("delete=");
+        builder.append(delete);
+        builder.append(" ms\n");
+        return builder.toString();
+    }
+
+    @Override
+    public String toString() {
+        return "create=" + create + ", setData=" + setData + ", delete="
+                + delete;
+    }
+}

+ 442 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/audit/Log4jAuditLoggerTest.java

@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.audit;
+
+import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
+import static org.junit.Assert.assertEquals;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.LineNumberReader;
+import java.io.StringReader;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.log4j.Layout;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.SimpleLayout;
+import org.apache.log4j.WriterAppender;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.ZKUtil;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.audit.AuditEvent.Result;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.ServerCnxn;
+import org.apache.zookeeper.server.quorum.QuorumPeerTestBase;
+import org.apache.zookeeper.test.ClientBase;
+import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+public class Log4jAuditLoggerTest extends QuorumPeerTestBase {
+    private static final Logger LOG = Logger.getLogger(Log4jAuditLoggerTest.class);
+    private static int SERVER_COUNT = 3;
+    private static MainThread[] mt;
+    private static ZooKeeper zk;
+    private static Logger zlogger;
+    private static WriterAppender appender;
+    private static ByteArrayOutputStream os;
+
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+        System.setProperty(ZKAuditProvider.AUDIT_ENABLE, "true");
+        // setup the logger to capture all logs
+        Layout layout = new SimpleLayout();
+        os = new ByteArrayOutputStream();
+        appender = new WriterAppender(layout, os);
+        appender.setImmediateFlush(true);
+        appender.setThreshold(Level.INFO);
+        zlogger = Logger.getLogger(Log4jAuditLogger.class);
+        zlogger.addAppender(appender);
+        mt = startQuorum();
+        zk = ClientBase.createZKClient("127.0.0.1:" + mt[0].getQuorumPeer().getClientPort());
+        //Verify start audit log here itself
+        String expectedAuditLog = getStartLog();
+        List<String> logs = readAuditLog(os, SERVER_COUNT);
+        verifyLogs(expectedAuditLog, logs);
+    }
+
+    @Before
+    public void setUp() {
+        os.reset();
+    }
+
+    @Test
+    public void testCreateAuditLogs()
+            throws KeeperException, InterruptedException, IOException {
+        String path = "/createPath";
+        zk.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        // success log
+        String createMode = CreateMode.PERSISTENT.toString().toLowerCase();
+        verifyLog(
+                getAuditLog(AuditConstants.OP_CREATE, path, Result.SUCCESS,
+                        null, createMode), readAuditLog(os));
+        try {
+            zk.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT);
+        } catch (KeeperException exception) {
+            Code code = exception.code();
+            assertEquals(Code.NODEEXISTS, code);
+        }
+        // Verify create operation log
+        verifyLog(
+                getAuditLog(AuditConstants.OP_CREATE, path, Result.FAILURE,
+                        null, createMode), readAuditLog(os));
+    }
+
+    @Test
+    public void testDeleteAuditLogs()
+            throws InterruptedException, IOException, KeeperException {
+        String path = "/deletePath";
+        zk.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        os.reset();
+        try {
+            zk.delete(path, -100);
+        } catch (KeeperException exception) {
+            Code code = exception.code();
+            assertEquals(Code.BADVERSION, code);
+        }
+        verifyLog(getAuditLog(AuditConstants.OP_DELETE, path,
+                Result.FAILURE),
+                readAuditLog(os));
+        zk.delete(path, -1);
+        verifyLog(getAuditLog(AuditConstants.OP_DELETE, path),
+                readAuditLog(os));
+    }
+
+    @Test
+    public void testSetDataAuditLogs()
+            throws InterruptedException, IOException, KeeperException {
+        String path = "/setDataPath";
+        zk.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        os.reset();
+        try {
+            zk.setData(path, "newData".getBytes(), -100);
+        } catch (KeeperException exception) {
+            Code code = exception.code();
+            assertEquals(Code.BADVERSION, code);
+        }
+        verifyLog(getAuditLog(AuditConstants.OP_SETDATA, path,
+                Result.FAILURE),
+                readAuditLog(os));
+        zk.setData(path, "newdata".getBytes(), -1);
+        verifyLog(getAuditLog(AuditConstants.OP_SETDATA, path),
+                readAuditLog(os));
+    }
+
+    @Test
+    public void testSetACLAuditLogs()
+            throws InterruptedException, IOException, KeeperException {
+        ArrayList<ACL> openAclUnsafe = ZooDefs.Ids.OPEN_ACL_UNSAFE;
+        String path = "/aclPath";
+        zk.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        os.reset();
+        try {
+            zk.setACL(path, openAclUnsafe, -100);
+        } catch (KeeperException exception) {
+            Code code = exception.code();
+            assertEquals(Code.BADVERSION, code);
+        }
+        verifyLog(
+                getAuditLog(AuditConstants.OP_SETACL, path, Result.FAILURE,
+                        ZKUtil.aclToString(openAclUnsafe), null), readAuditLog(os));
+        zk.setACL(path, openAclUnsafe, -1);
+        verifyLog(
+                getAuditLog(AuditConstants.OP_SETACL, path, Result.SUCCESS,
+                        ZKUtil.aclToString(openAclUnsafe), null), readAuditLog(os));
+    }
+
+    @Test
+    public void testMultiOperationAuditLogs()
+            throws InterruptedException, KeeperException, IOException {
+        List<Op> ops = new ArrayList<>();
+
+        String multiop = "/b";
+        Op create = Op.create(multiop, "".getBytes(),
+                ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        Op setData = Op.setData(multiop, "newData".getBytes(), -1);
+        // check does nothing so it is audit logged
+        Op check = Op.check(multiop, -1);
+        Op delete = Op.delete(multiop, -1);
+
+        String createMode = CreateMode.PERSISTENT.toString().toLowerCase();
+
+        ops.add(create);
+        ops.add(setData);
+        ops.add(check);
+        ops.add(delete);
+
+        zk.multi(ops);
+        List<String> multiOpLogs = readAuditLog(os, 3);
+        // verify that each multi operation success is logged
+        verifyLog(getAuditLog(AuditConstants.OP_CREATE, multiop,
+                Result.SUCCESS, null, createMode),
+                multiOpLogs.get(0));
+        verifyLog(getAuditLog(AuditConstants.OP_SETDATA, multiop),
+                multiOpLogs.get(1));
+        verifyLog(getAuditLog(AuditConstants.OP_DELETE, multiop),
+                multiOpLogs.get(2));
+
+        ops = new ArrayList<>();
+        ops.add(create);
+        ops.add(create);
+        try {
+            zk.multi(ops);
+        } catch (KeeperException exception) {
+            Code code = exception.code();
+            assertEquals(Code.NODEEXISTS, code);
+        }
+
+        // Verify that multi operation failure is logged, and there is no path
+        // mentioned in the audit log
+        verifyLog(getAuditLog(AuditConstants.OP_MULTI_OP, null,
+                Result.FAILURE),
+                readAuditLog(os));
+    }
+
+    @Test
+    public void testEphemralZNodeAuditLogs()
+            throws Exception {
+        String ephemralPath = "/ephemral";
+        CountdownWatcher watcher2 = new CountdownWatcher();
+        ZooKeeper zk2 = new ZooKeeper(
+                "127.0.0.1:" + mt[0].getQuorumPeer().getClientPort(),
+                ClientBase.CONNECTION_TIMEOUT, watcher2);
+        watcher2.waitForConnected(ClientBase.CONNECTION_TIMEOUT);
+        zk2.create(ephemralPath, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                CreateMode.EPHEMERAL);
+        String session2 = "0x" + Long.toHexString(zk2.getSessionId());
+        verifyLog(getAuditLog(AuditConstants.OP_CREATE, ephemralPath,
+                Result.SUCCESS, null,
+                CreateMode.EPHEMERAL.toString().toLowerCase(),
+                session2), readAuditLog(os));
+        zk2.close();
+        waitForDeletion(zk, ephemralPath);
+        // verify that ephemeral node deletion on session close are captured
+        // in audit log
+        // Because these operations are done by ZooKeeper server itself,
+        // there are no IP user is zkServer user, not any client user
+        verifyLogs(getAuditLog(AuditConstants.OP_DEL_EZNODE_EXP, ephemralPath,
+                Result.SUCCESS, null, null, session2,
+                ZKAuditProvider.getZKUser(), null), readAuditLog(os, SERVER_COUNT));
+    }
+
+
+    private static String getStartLog() {
+        // user=userName operation=ZooKeeperServer start  result=success
+        AuditEvent logEvent = ZKAuditProvider.createLogEvent(ZKAuditProvider.getZKUser(),
+                AuditConstants.OP_START, Result.SUCCESS);
+        return logEvent.toString();
+    }
+
+    private String getAuditLog(String operation, String znode) {
+        return getAuditLog(operation, znode, Result.SUCCESS);
+    }
+
+    private String getAuditLog(String operation, String znode, Result result) {
+        return getAuditLog(operation, znode, result, null, null);
+    }
+
+    private String getAuditLog(String operation, String znode, Result result,
+                               String acl, String createMode) {
+        String session = getSession();
+        return getAuditLog(operation, znode, result, acl, createMode, session);
+    }
+
+    private String getAuditLog(String operation, String znode, Result result,
+                               String acl, String createMode, String session) {
+        String user = getUser();
+        String ip = getIp();
+        return getAuditLog(operation, znode, result, acl, createMode, session,
+                user, ip);
+    }
+
+    private String getAuditLog(String operation, String znode, Result result,
+                               String acl, String createMode, String session, String user, String ip) {
+        AuditEvent logEvent = ZKAuditProvider.createLogEvent(user, operation, znode, acl, createMode, session, ip,
+                result);
+        String auditLog = logEvent.toString();
+        LOG.info("expected audit log for operation '" + operation + "' is '"
+                + auditLog + "'");
+        return auditLog;
+    }
+
+    private String getSession() {
+        return "0x" + Long.toHexString(zk.getSessionId());
+    }
+
+    private String getUser() {
+        ServerCnxn next = getServerCnxn();
+        Request request = new Request(next, -1, -1, -1, null,
+                next.getAuthInfo());
+        return request.getUsers();
+    }
+
+    private String getIp() {
+        ServerCnxn next = getServerCnxn();
+        InetSocketAddress remoteSocketAddress = next.getRemoteSocketAddress();
+        InetAddress address = remoteSocketAddress.getAddress();
+        return address.getHostAddress();
+    }
+
+    private ServerCnxn getServerCnxn() {
+        Iterable<ServerCnxn> connections = mt[0].getQuorumPeer()
+                .getActiveServer()
+                .getServerCnxnFactory().getConnections();
+        return connections.iterator().next();
+    }
+
+    private static void verifyLog(String expectedLog, String log) {
+        String searchString = " - ";
+        int logStartIndex = log.indexOf(searchString);
+        String auditLog = log.substring(logStartIndex + searchString.length());
+        assertEquals(expectedLog, auditLog);
+
+    }
+
+    private static void verifyLogs(String expectedLog, List<String> logs) {
+        for (String log : logs) {
+            verifyLog(expectedLog, log);
+        }
+    }
+
+    private String readAuditLog(ByteArrayOutputStream os) throws IOException {
+        return readAuditLog(os, 1).get(0);
+    }
+
+    private static List<String> readAuditLog(ByteArrayOutputStream os,
+                                             int numberOfLogEntry)
+            throws IOException {
+        return readAuditLog(os, numberOfLogEntry, false);
+    }
+
+    private static List<String> readAuditLog(ByteArrayOutputStream os,
+                                             int numberOfLogEntry,
+                                             boolean skipEphemralDeletion) throws IOException {
+        List<String> logs = new ArrayList<>();
+        LineNumberReader r = new LineNumberReader(
+                new StringReader(os.toString()));
+        String line;
+        while ((line = r.readLine()) != null) {
+            if (skipEphemralDeletion
+                    && line.contains(AuditConstants.OP_DEL_EZNODE_EXP)) {
+                continue;
+            }
+            logs.add(line);
+        }
+        os.reset();
+        assertEquals(
+                "Expected number of log entries are not generated. Logs are "
+                        + logs,
+                numberOfLogEntry, logs.size());
+        return logs;
+
+    }
+
+    private static MainThread[] startQuorum() throws IOException {
+        final int[] clientPorts = new int[SERVER_COUNT];
+        StringBuilder sb = new StringBuilder();
+        sb.append("4lw.commands.whitelist=*");
+        sb.append("\n");
+        String server;
+
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            clientPorts[i] = PortAssignment.unique();
+            server = "server." + i + "=127.0.0.1:" + PortAssignment.unique()
+                    + ":"
+                    + PortAssignment.unique() + ":participant;127.0.0.1:"
+                    + clientPorts[i];
+            sb.append(server);
+            sb.append("\n");
+        }
+        String currentQuorumCfgSection = sb.toString();
+        MainThread[] mt = new MainThread[SERVER_COUNT];
+
+        // start all the servers
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection,
+                    false);
+            mt[i].start();
+        }
+
+        // ensure all servers started
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            Assert.assertTrue("waiting for server " + i + " being up",
+                    ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i],
+                            CONNECTION_TIMEOUT));
+        }
+        return mt;
+    }
+
+    private void waitForDeletion(ZooKeeper zooKeeper, String path)
+            throws Exception {
+        long elapsedTime = 0;
+        long waitInterval = 10;
+        int timeout = 100;
+        Stat exists = zooKeeper.exists(path, false);
+        while (exists != null && elapsedTime < timeout) {
+            try {
+                Thread.sleep(waitInterval);
+            } catch (InterruptedException e) {
+                Assert.fail("CurrentEpoch update failed");
+            }
+            elapsedTime = elapsedTime + waitInterval;
+            exists = zooKeeper.exists(path, false);
+        }
+        Assert.assertNull("Node " + path + " not deleted in " + timeout + " ms",
+                exists);
+    }
+
+    @AfterClass
+    public static void tearDownAfterClass() {
+        System.clearProperty(ZKAuditProvider.AUDIT_ENABLE);
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            try {
+                if (mt[i] != null) {
+                    mt[i].shutdown();
+                }
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        }
+        try {
+            zlogger.removeAppender(appender);
+            os.close();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+}

+ 150 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/audit/ZKAuditLoggerPerformance.java

@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.audit;
+
+import java.io.IOException;
+import java.util.concurrent.TimeoutException;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.common.Time;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ZKAuditLoggerPerformance {
+    private static final Logger LOG = LoggerFactory
+            .getLogger(ZKAuditLoggerPerformance.class);
+    private ZooKeeper zkClient;
+    private String parentPath;
+    private int numberOfRecords;
+
+    public ZKAuditLoggerPerformance(ZooKeeper zkClient, String parentPath,
+                                    int numberOfRecords) {
+        this.zkClient = zkClient;
+        this.parentPath = parentPath;
+        this.numberOfRecords = numberOfRecords;
+    }
+
+    public void create() throws Exception {
+        for (int i = 0; i < numberOfRecords; i++) {
+            zkClient.create(getPath(i), "0123456789".getBytes(),
+                    Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT);
+
+        }
+    }
+
+    public void setData() throws Exception {
+        for (int i = 0; i < numberOfRecords; i++) {
+            zkClient.setData(getPath(i), "9876543210".getBytes(), -1);
+        }
+    }
+
+    public void delete() throws Exception {
+        for (int i = 0; i < numberOfRecords; i++) {
+            zkClient.delete(getPath(i), -1);
+        }
+    }
+
+    public AuditLogPerfReading doOperations() throws Exception {
+        AuditLogPerfReading perfReading = new AuditLogPerfReading();
+        // create
+        long startTime = Time.currentElapsedTime();
+        create();
+        perfReading.setCreate(Time.currentElapsedTime() - startTime);
+
+        // setData
+        startTime = Time.currentElapsedTime();
+        setData();
+        perfReading.setSetData(Time.currentElapsedTime() - startTime);
+
+        // delete
+        startTime = Time.currentElapsedTime();
+        delete();
+        perfReading.setDelete(Time.currentElapsedTime() - startTime);
+        return perfReading;
+    }
+
+    private String getPath(int i) {
+        return parentPath + "zNode" + i;
+    }
+
+    public static void main(String[] args) {
+        if (args.length != 3) {
+            System.err.println(
+                    "USAGE: ZKAuditLoggerPerformance connectionString parentPath numberOfRecords");
+            System.exit(1);
+        }
+        String cxnString = args[0];
+        CountdownWatcher watcher = new CountdownWatcher();
+        ZooKeeper zkClient = null;
+        try {
+            zkClient = new ZooKeeper(cxnString, 60000, watcher);
+            watcher.waitForConnected(30000);
+        } catch (InterruptedException | TimeoutException | IOException e) {
+            String msg = "ZooKeeper client can not connect to " + cxnString;
+            logErrorAndExit(e, msg);
+        }
+        String parentPath = args[1];
+        try {
+            Stat exists = zkClient.exists(parentPath, false);
+            if (exists == null) {
+                System.err.println(
+                        "Parent path '" + parentPath + "' must exist.");
+                System.exit(1);
+            }
+        } catch (KeeperException | InterruptedException e1) {
+            String msg = "Error while checking the existence of parent path";
+            logErrorAndExit(e1, msg);
+        }
+        int recordCount = 0;
+        try {
+            recordCount = Integer.parseInt(args[2]);
+        } catch (NumberFormatException e) {
+            String msg = "Failed to parse '" + args[2] + "' to integer";
+            LOG.error(msg, e);
+            System.err.println(msg);
+            System.exit(1);
+        }
+        ZKAuditLoggerPerformance auditLoggingPerf = new ZKAuditLoggerPerformance(
+                zkClient,
+                parentPath, recordCount);
+        AuditLogPerfReading doOperations = null;
+        try {
+            doOperations = auditLoggingPerf.doOperations();
+        } catch (Exception e) {
+            String msg = "Error while doing operations.";
+            LOG.error(msg, e);
+            System.err.println(msg);
+            System.exit(1);
+        }
+        System.out
+                .println("Time taken for " + recordCount + " operations are:");
+        System.out.println(doOperations.report());
+        System.exit(0);
+    }
+
+    private static void logErrorAndExit(Exception e, String msg) {
+        LOG.error(msg, e);
+        System.err.println(msg + ", error=" + e.getMessage());
+        System.exit(1);
+    }
+}

+ 71 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/server/util/AuthUtilTest.java

@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import org.apache.zookeeper.data.Id;
+import org.apache.zookeeper.server.auth.ProviderRegistry;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class AuthUtilTest {
+
+    @BeforeClass
+    public static void beforeClassSetUp() {
+        ProviderRegistry.reset();
+        System.setProperty("zookeeper.authProvider.sasl",
+                "org.apache.zookeeper.server.auth.SASLAuthenticationProvider");
+        System.setProperty("zookeeper.authProvider.x509",
+                "org.apache.zookeeper.server.auth.X509AuthenticationProvider");
+    }
+
+    @AfterClass
+    public static void afterClassTearDown() {
+        System.clearProperty("zookeeper.authProvider.sasl");
+        System.clearProperty("zookeeper.authProvider.x509");
+    }
+
+    @Test
+    public void testGetUserFromAllAuthenticationScheme() {
+        String user = "zkUser";
+        Id id = new Id("digest", user + ":password");
+        String result = AuthUtil.getUser(id);
+        assertEquals(user, result);
+
+        String principal = "zkCli/hadoop.hadoop.com";
+        id = new Id("sasl", principal);
+        assertEquals(principal, AuthUtil.getUser(id));
+
+        String ip = "192.168.1.2";
+        id = new Id("ip", ip);
+        assertEquals(ip, AuthUtil.getUser(id));
+
+        String certificate = "CN=host-192.168.1.2,OU=OrganizationUnit,O=Organization,L=Location,ST=State,C=IN";
+        id = new Id("x509", certificate);
+        assertEquals(certificate, AuthUtil.getUser(id));
+    }
+
+    @Test
+    public void testGetUserShouldReturnNullIfAuthenticationNotConfigured() {
+        Id id = new Id("invalid Authentication Scheme", "user");
+        String result = AuthUtil.getUser(id);
+        assertNull(result);
+    }
+}