소스 검색

ZOOKEEPER-4570: Admin server API for taking snapshot and stream out data (#1943)

Provides a snapshot command for taking snapshot and streaming out data

Author: Li Wang <liwang@apple.com>

Co-authored-by: Li Wang <liwang@apple.com>
Co-authored-by: Enrico Olivelli <eolivelli@apache.org>
li4wang 2 년 전
부모
커밋
b069edeb24
26개의 변경된 파일1073개의 추가작업 그리고 52개의 파일을 삭제
  1. 50 1
      zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
  2. 36 0
      zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java
  3. 12 0
      zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
  4. 45 7
      zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
  5. 5 1
      zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/CommandOutputter.java
  6. 76 2
      zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/CommandResponse.java
  7. 100 2
      zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java
  8. 19 7
      zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/JettyAdminServer.java
  9. 4 2
      zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/JsonOutputter.java
  10. 52 0
      zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/StreamOutputter.java
  11. 17 0
      zookeeper-server/src/main/java/org/apache/zookeeper/server/auth/IPAuthenticationProvider.java
  12. 10 0
      zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java
  13. 3 1
      zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
  14. 60 0
      zookeeper-server/src/main/java/org/apache/zookeeper/server/util/RateLimiter.java
  15. 103 0
      zookeeper-server/src/test/java/org/apache/zookeeper/TakeSnapshotTest.java
  16. 4 0
      zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java
  17. 55 0
      zookeeper-server/src/test/java/org/apache/zookeeper/server/DataTreeTest.java
  18. 2 1
      zookeeper-server/src/test/java/org/apache/zookeeper/server/SnapshotDigestTest.java
  19. 3 0
      zookeeper-server/src/test/java/org/apache/zookeeper/server/TxnLogDigestTest.java
  20. 6 1
      zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandResponseTest.java
  21. 71 22
      zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandsTest.java
  22. 3 3
      zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/JettyAdminServerTest.java
  23. 257 0
      zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/SnapshotCommandTest.java
  24. 8 0
      zookeeper-server/src/test/java/org/apache/zookeeper/server/persistence/FileTxnSnapLogTest.java
  25. 10 2
      zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/FuzzySnapshotRelatedTest.java
  26. 62 0
      zookeeper-server/src/test/java/org/apache/zookeeper/server/util/RateLimiterTest.java

+ 50 - 1
zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md

@@ -1208,7 +1208,32 @@ property, when available, is noted below.
 
     The default value is false.
 
-
+* *serializeLastProcessedZxid.enabled*
+  (Jave system property: **zookeeper.serializeLastProcessedZxid.enabled**)
+  **New in 3.9.0:**
+  If enabled, ZooKeeper serializes the lastProcessedZxid when snapshot and deserializes it
+  when restore. Defaults to true. Needs to be enabled for performing snapshot and restore
+  via admin server commands, as there is no snapshot file name to extract the lastProcessedZxid.
+  
+  This feature is backward and forward compatible. Here are the different scenarios.
+
+    1. Snapshot triggered by server internally
+       a. When loading old snapshot with new code, it will throw EOFException when trying to
+       read the non-exist lastProcessedZxid value, and the exception will be caught. 
+       The lastProcessedZxid will be set using the snapshot file name.
+       
+       b. When loading new snapshot with old code, it will finish successfully after deserializing the 
+       digest value, the lastProcessedZxid at the end of snapshot file will be ignored.
+       The lastProcessedZxid will be set using the snapshot file name.
+    
+    2. Sync up between leader and follower
+       The lastProcessedZxid will not be serialized by leader and deserialized by follower
+       in both new and old code. It will be set to the lastProcessedZxid sent from leader
+       via QuorumPacket.  
+
+   3. Snapshot triggered via admin server APIs
+      The feature flag need to be enabled for the snapshot command to work. 
+     
 <a name="sc_clusterOptions"></a>
 
 #### Cluster Options
@@ -2087,6 +2112,20 @@ Both subsystems need to have sufficient amount of threads to achieve peak read t
 
 #### AdminServer configuration
 
+**New in 3.9.0:** The following
+options are used to configure the [AdminServer](#sc_adminserver).
+
+* *admin.snapshot.enabled* :
+  (Java system property: **zookeeper.admin.snapshot.enabled**)
+  The flag for enabling the snapshot command. Defaults to false. 
+  It will be enabled by default once the auth support for admin server commands 
+  is available.
+
+* *admin.snapshot.intervalInMS* :
+  (Java system property: **zookeeper.admin.snapshot.intervalInMS**)
+  The time interval for rate limiting snapshot command to protect the server.
+  Defaults to 5 mins.
+
 **New in 3.7.1:** The following
 options are used to configure the [AdminServer](#sc_adminserver).
 
@@ -2620,6 +2659,16 @@ Available commands include:
     Server information.
     Returns multiple fields giving a brief overview of server state.
 
+* *snapshot/snap* :
+  Takes a snapshot of the current server in the datadir and stream out data.
+  Optional query parameter:
+  "streaming": Boolean (defaults to true if the parameter is not present)
+  Returns the following via Http headers:
+  "last_zxid": String
+  "snapshot_size": String
+  Note: this API is rate-limited (once every 5 mins by default) to protect the server
+  from being over-loaded.
+
 * *stats/stat* :
     Same as *server_stats* but also returns the "connections" field (see *connections*
     for details).

+ 36 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java

@@ -1744,6 +1744,42 @@ public class DataTree {
         }
     }
 
+    /**
+     * Serializes the lastProcessedZxid so we can get it from snapshot instead the snapshot file name.
+     * This is needed for performing snapshot and restore via admin server commands.
+     *
+     * @param oa the output stream to write to
+     * @return true if the lastProcessedZxid is serialized successfully, otherwise false
+     * @throws IOException if there is an I/O error
+     */
+    public boolean serializeLastProcessedZxid(final OutputArchive oa) throws IOException {
+        if (!ZooKeeperServer.isSerializeLastProcessedZxidEnabled()) {
+            return false;
+        }
+        oa.writeLong(lastProcessedZxid, "lastZxid");
+        return true;
+    }
+
+    /**
+     * Deserializes the lastProcessedZxid from the input stream and updates the lastProcessedZxid field.
+     *
+     * @param ia the input stream to read from
+     * @return true if lastProcessedZxid is deserialized successfully, otherwise false
+     * @throws IOException if there is an I/O error
+     */
+    public boolean deserializeLastProcessedZxid(final InputArchive ia)  throws IOException {
+        if (!ZooKeeperServer.isSerializeLastProcessedZxidEnabled()) {
+            return false;
+        }
+        try {
+            lastProcessedZxid = ia.readLong("lastZxid");
+        } catch (final EOFException e) {
+            LOG.warn("Got EOFException while reading the last processed zxid, likely due to reading an older snapshot.");
+            return false;
+        }
+        return true;
+    }
+
     /**
      * Compares the actual tree's digest with that in the snapshot.
      * Resets digestFromLoadedSnapshot after comparision.

+ 12 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java

@@ -72,6 +72,8 @@ public final class ServerMetrics {
         FSYNC_TIME = metricsContext.getSummary("fsynctime", DetailLevel.BASIC);
 
         SNAPSHOT_TIME = metricsContext.getSummary("snapshottime", DetailLevel.BASIC);
+        SNAPSHOT_ERROR_COUNT = metricsContext.getCounter("snapshot_error_count");
+        SNAPSHOT_RATE_LIMITED_COUNT = metricsContext.getCounter("snapshot_rate_limited_count");
         DB_INIT_TIME = metricsContext.getSummary("dbinittime", DetailLevel.BASIC);
         READ_LATENCY = metricsContext.getSummary("readlatency", DetailLevel.ADVANCED);
         UPDATE_LATENCY = metricsContext.getSummary("updatelatency", DetailLevel.ADVANCED);
@@ -276,6 +278,16 @@ public final class ServerMetrics {
      */
     public final Summary SNAPSHOT_TIME;
 
+    /**
+     * Snapshot error count
+     */
+    public final Counter SNAPSHOT_ERROR_COUNT;
+
+    /**
+     * Snapshot rate limited count
+     */
+    public final Counter SNAPSHOT_RATE_LIMITED_COUNT;
+
     /**
      * Db init time (snapshot loading + txnlog replay)
      */

+ 45 - 7
zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java

@@ -120,6 +120,9 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
     public static final String ZOOKEEPER_DIGEST_ENABLED = "zookeeper.digest.enabled";
     private static boolean digestEnabled;
 
+    public static final String ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED = "zookeeper.serializeLastProcessedZxid.enabled";
+    private static boolean serializeLastProcessedZxidEnabled;
+
     // Add a enable/disable option for now, we should remove this one when
     // this feature is confirmed to be stable
     public static final String CLOSE_SESSION_TXN_ENABLED = "zookeeper.closeSessionTxn.enabled";
@@ -153,6 +156,9 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         closeSessionTxnEnabled = Boolean.parseBoolean(
                 System.getProperty(CLOSE_SESSION_TXN_ENABLED, "true"));
         LOG.info("{} = {}", CLOSE_SESSION_TXN_ENABLED, closeSessionTxnEnabled);
+
+        setSerializeLastProcessedZxidEnabled(Boolean.parseBoolean(
+                System.getProperty(ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED, "true")));
     }
 
     // @VisibleForTesting
@@ -535,23 +541,46 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         takeSnapshot();
     }
 
-    public void takeSnapshot() {
+    public void takeSnapshot() throws IOException {
         takeSnapshot(false);
     }
 
-    public void takeSnapshot(boolean syncSnap) {
+    public void takeSnapshot(boolean syncSnap) throws IOException {
+        takeSnapshot(syncSnap, true, false);
+    }
+
+    /**
+     * Takes a snapshot on the server.
+     *
+     * @param syncSnap syncSnap sync the snapshot immediately after write
+     * @param isSevere if true system exist, otherwise throw IOException
+     * @param fastForwardFromEdits whether fast forward database to the latest recorded transactions
+     *
+     * @return file snapshot file object
+     * @throws IOException
+     */
+    public synchronized File takeSnapshot(boolean syncSnap, boolean isSevere, boolean fastForwardFromEdits) throws IOException {
         long start = Time.currentElapsedTime();
+        File snapFile = null;
         try {
-            txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), syncSnap);
+            if (fastForwardFromEdits) {
+                zkDb.fastForwardDataBase();
+            }
+            snapFile = txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), syncSnap);
         } catch (IOException e) {
-            LOG.error("Severe unrecoverable error, exiting", e);
-            // This is a severe error that we cannot recover from,
-            // so we need to exit
-            ServiceUtils.requestSystemExit(ExitCode.TXNLOG_ERROR_TAKING_SNAPSHOT.getValue());
+            if (isSevere) {
+                LOG.error("Severe unrecoverable error, exiting", e);
+                // This is a severe error that we cannot recover from,
+                // so we need to exit
+                ServiceUtils.requestSystemExit(ExitCode.TXNLOG_ERROR_TAKING_SNAPSHOT.getValue());
+            } else {
+                throw e;
+            }
         }
         long elapsed = Time.currentElapsedTime() - start;
         LOG.info("Snapshot taken in {} ms", elapsed);
         ServerMetrics.getMetrics().SNAPSHOT_TIME.add(elapsed);
+        return snapFile;
     }
 
     public boolean shouldForceWriteInitialSnapshotAfterLeaderElection() {
@@ -2139,6 +2168,15 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         ZooKeeperServer.digestEnabled = digestEnabled;
     }
 
+    public static boolean isSerializeLastProcessedZxidEnabled() {
+        return serializeLastProcessedZxidEnabled;
+    }
+
+    public static void setSerializeLastProcessedZxidEnabled(boolean serializeLastZxidEnabled) {
+        serializeLastProcessedZxidEnabled = serializeLastZxidEnabled;
+        LOG.info("{} = {}", ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED, serializeLastZxidEnabled);
+    }
+
     /**
      * Trim a path to get the immediate predecessor.
      *

+ 5 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/CommandOutputter.java

@@ -18,6 +18,7 @@
 
 package org.apache.zookeeper.server.admin;
 
+import java.io.OutputStream;
 import java.io.PrintWriter;
 
 /**
@@ -31,6 +32,9 @@ public interface CommandOutputter {
     /** The MIME type of this output (e.g., "application/json") */
     String getContentType();
 
-    void output(CommandResponse response, PrintWriter pw);
+    /** Print out data as output */
+    default void output(CommandResponse response, PrintWriter pw) {}
 
+    /** Stream out data as output */
+    default void output(final CommandResponse response, final OutputStream os) {}
 }

+ 76 - 2
zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/CommandResponse.java

@@ -17,8 +17,11 @@
 
 package org.apache.zookeeper.server.admin;
 
+import java.io.InputStream;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import javax.servlet.http.HttpServletResponse;
 
 /**
  * A response from running a {@link Command}.
@@ -37,6 +40,9 @@ public class CommandResponse {
     private final String command;
     private final String error;
     private final Map<String, Object> data;
+    private final Map<String, String> headers;
+    private int statusCode;
+    private InputStream inputStream;
 
     /**
      * Creates a new response with no error string.
@@ -44,18 +50,35 @@ public class CommandResponse {
      * @param command command name
      */
     public CommandResponse(String command) {
-        this(command, null);
+        this(command, null, HttpServletResponse.SC_OK);
     }
     /**
      * Creates a new response.
      *
      * @param command command name
      * @param error error string (may be null)
+     * @param statusCode http status code
      */
-    public CommandResponse(String command, String error) {
+    public CommandResponse(String command, String error, int statusCode) {
+       this(command, error, statusCode, null);
+    }
+
+
+    /**
+     * Creates a new response.
+     *
+     * @param command command name
+     * @param error error string (may be null)
+     * @param statusCode http status code
+     * @param inputStream inputStream to send out data (may be null)
+     */
+    public CommandResponse(final String command, final String error, final int statusCode, final InputStream inputStream) {
         this.command = command;
         this.error = error;
         data = new LinkedHashMap<String, Object>();
+        headers = new HashMap<>();
+        this.statusCode = statusCode;
+        this.inputStream = inputStream;
     }
 
     /**
@@ -76,6 +99,38 @@ public class CommandResponse {
         return error;
     }
 
+    /**
+     * Gets the http status code
+     *
+     * @return http status code
+     */
+    public int getStatusCode() {
+        return statusCode;
+    }
+
+    /**
+     * Sets the http status code
+     */
+    public void setStatusCode(int statusCode) {
+        this.statusCode = statusCode;
+    }
+
+    /**
+     * Gets the InputStream (may be null).
+     *
+     * @return InputStream
+     */
+    public InputStream getInputStream() {
+        return inputStream;
+    }
+
+    /**
+     * Sets the InputStream
+     */
+    public void setInputStream(final InputStream inputStream) {
+         this.inputStream = inputStream;
+    }
+
     /**
      * Adds a key/value pair to this response.
      *
@@ -96,6 +151,25 @@ public class CommandResponse {
         data.putAll(m);
     }
 
+    /**
+     * Adds a header to this response.
+     *
+     * @param name name of the header
+     * @param value value of the header
+     */
+    public void addHeader(final String name, final String value) {
+        headers.put(name, value);
+    }
+
+    /**
+     * Returns all headers
+     *
+     * @return map representation of all headers
+     */
+    public Map<String, String> getHeaders() {
+        return headers;
+    }
+
     /**
      * Converts this response to a map. The returned map is mutable, and
      * changes to it do not reflect back into this response.

+ 100 - 2
zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java

@@ -18,8 +18,11 @@
 
 package org.apache.zookeeper.server.admin;
 
+import static org.apache.zookeeper.server.persistence.FileSnap.SNAPSHOT_FILE_PREFIX;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.File;
+import java.io.FileInputStream;
 import java.net.InetSocketAddress;
 import java.util.Arrays;
 import java.util.Collections;
@@ -31,7 +34,9 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
+import javax.servlet.http.HttpServletResponse;
 import org.apache.zookeeper.Environment;
 import org.apache.zookeeper.Environment.Entry;
 import org.apache.zookeeper.Version;
@@ -41,6 +46,7 @@ import org.apache.zookeeper.server.ServerMetrics;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.ZooTrace;
 import org.apache.zookeeper.server.persistence.SnapshotInfo;
+import org.apache.zookeeper.server.persistence.Util;
 import org.apache.zookeeper.server.quorum.Follower;
 import org.apache.zookeeper.server.quorum.FollowerZooKeeperServer;
 import org.apache.zookeeper.server.quorum.Leader;
@@ -51,7 +57,9 @@ import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
 import org.apache.zookeeper.server.quorum.QuorumZooKeeperServer;
 import org.apache.zookeeper.server.quorum.ReadOnlyZooKeeperServer;
 import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
+import org.apache.zookeeper.server.util.RateLimiter;
 import org.apache.zookeeper.server.util.ZxidUtils;
+import org.eclipse.jetty.http.HttpStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -104,10 +112,12 @@ public class Commands {
         Map<String, String> kwargs) {
         Command command = getCommand(cmdName);
         if (command == null) {
-            return new CommandResponse(cmdName, "Unknown command: " + cmdName);
+            // set the status code to 200 to keep the current behavior of existing commands
+            return new CommandResponse(cmdName, "Unknown command: " + cmdName, HttpServletResponse.SC_OK);
         }
         if (command.isServerRequired() && (zkServer == null || !zkServer.isRunning())) {
-            return new CommandResponse(cmdName, "This ZooKeeper instance is not currently serving requests");
+            // set the status code to 200 to keep the current behavior of existing commands
+            return new CommandResponse(cmdName, "This ZooKeeper instance is not currently serving requests", HttpServletResponse.SC_OK);
         }
         return command.run(zkServer, kwargs);
     }
@@ -144,6 +154,7 @@ public class Commands {
         registerCommand(new ObserverCnxnStatResetCommand());
         registerCommand(new RuokCommand());
         registerCommand(new SetTraceMaskCommand());
+        registerCommand(new SnapshotCommand());
         registerCommand(new SrvrCommand());
         registerCommand(new StatCommand());
         registerCommand(new StatResetCommand());
@@ -530,6 +541,93 @@ public class Commands {
 
     }
 
+    /**
+     * Take a snapshot of current server and stream out the data.
+     *
+     *  Argument:
+     *   - "streaming": optional String to indicate whether streaming out data
+     *
+     *  Returned snapshot as stream if streaming is true and metadata of the snapshot
+     *   - "last_zxid": String
+     *   - "snapshot_size": String
+     */
+    public static class SnapshotCommand extends CommandBase {
+        static final String REQUEST_QUERY_PARAM_STREAMING = "streaming";
+
+        static final String RESPONSE_HEADER_LAST_ZXID = "last_zxid";
+        static final String RESPONSE_HEADER_SNAPSHOT_SIZE = "snapshot_size";
+
+        static final String ADMIN_SNAPSHOT_ENABLED = "zookeeper.admin.snapshot.enabled";
+        static final String ADMIN_SNAPSHOT_INTERVAL = "zookeeper.admin.snapshot.intervalInMS";
+
+        private static final long snapshotInterval = Integer.parseInt(System.getProperty(ADMIN_SNAPSHOT_INTERVAL, "300000"));
+
+        private final RateLimiter rateLimiter;
+
+        public SnapshotCommand() {
+            super(Arrays.asList("snapshot", "snap"));
+            rateLimiter = new RateLimiter(1, snapshotInterval, TimeUnit.MICROSECONDS);
+        }
+
+        @SuppressFBWarnings(value = "OBL_UNSATISFIED_OBLIGATION",
+                justification = "FileInputStream is passed to CommandResponse and closed in StreamOutputter")
+        @Override
+        public CommandResponse run(final ZooKeeperServer zkServer, final Map<String, String> kwargs) {
+            final CommandResponse response = initializeResponse();
+
+            // check feature flag
+            final boolean snapshotEnabled = Boolean.parseBoolean(System.getProperty(ADMIN_SNAPSHOT_ENABLED, "false"));
+            if (!snapshotEnabled) {
+                response.setStatusCode(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
+                LOG.warn("Snapshot command is disabled");
+                return response;
+            }
+
+            if (!zkServer.isSerializeLastProcessedZxidEnabled()) {
+                response.setStatusCode(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+                LOG.warn("Snapshot command requires serializeLastProcessedZxidEnable flag is set to true");
+                return response;
+            }
+
+            // check rate limiting
+            if (!rateLimiter.allow()) {
+                response.setStatusCode(HttpStatus.TOO_MANY_REQUESTS_429);
+                ServerMetrics.getMetrics().SNAPSHOT_RATE_LIMITED_COUNT.add(1);
+                LOG.warn("Snapshot request was rate limited");
+                return response;
+            }
+
+            // check the streaming query param
+            boolean streaming = true;
+            if (kwargs.containsKey(REQUEST_QUERY_PARAM_STREAMING)) {
+                streaming = Boolean.parseBoolean(kwargs.get(REQUEST_QUERY_PARAM_STREAMING));
+            }
+
+            // take snapshot and stream out data if needed
+            try {
+                final File snapshotFile = zkServer.takeSnapshot(false, false, true);
+                final long lastZxid = Util.getZxidFromName(snapshotFile.getName(), SNAPSHOT_FILE_PREFIX);
+                response.addHeader(RESPONSE_HEADER_LAST_ZXID, "0x" + ZxidUtils.zxidToString(lastZxid));
+
+                final long size = snapshotFile.length();
+                response.addHeader(RESPONSE_HEADER_SNAPSHOT_SIZE, String.valueOf(size));
+
+                if (size == 0) {
+                    response.setStatusCode(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+                    ServerMetrics.getMetrics().SNAPSHOT_ERROR_COUNT.add(1);
+                    LOG.warn("Snapshot file {} is empty", snapshotFile);
+                } else if (streaming) {
+                    response.setInputStream(new FileInputStream(snapshotFile));
+                }
+            } catch (final Exception e) {
+                response.setStatusCode(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+                ServerMetrics.getMetrics().SNAPSHOT_ERROR_COUNT.add(1);
+                LOG.warn("Exception occurred when taking the snapshot via the snapshot admin command", e);
+            }
+            return response;
+        }
+    }
+
     /**
      * Server information. Returned map contains:
      *   - "version": String

+ 19 - 7
zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/JettyAdminServer.java

@@ -33,6 +33,7 @@ import org.apache.zookeeper.common.QuorumX509Util;
 import org.apache.zookeeper.common.SecretUtils;
 import org.apache.zookeeper.common.X509Util;
 import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.auth.IPAuthenticationProvider;
 import org.eclipse.jetty.http.HttpVersion;
 import org.eclipse.jetty.security.ConstraintMapping;
 import org.eclipse.jetty.security.ConstraintSecurityHandler;
@@ -259,15 +260,26 @@ public class JettyAdminServer implements AdminServer {
             }
 
             // Run the command
-            CommandResponse cmdResponse = Commands.runCommand(cmd, zkServer, kwargs);
+            final CommandResponse cmdResponse = Commands.runCommand(cmd, zkServer, kwargs);
+            response.setStatus(cmdResponse.getStatusCode());
 
-            // Format and print the output of the command
-            CommandOutputter outputter = new JsonOutputter();
-            response.setStatus(HttpServletResponse.SC_OK);
-            response.setContentType(outputter.getContentType());
-            outputter.output(cmdResponse, response.getWriter());
+            final Map<String, String> headers = cmdResponse.getHeaders();
+            for (final Map.Entry<String, String> header : headers.entrySet()) {
+                response.addHeader(header.getKey(), header.getValue());
+            }
+            final String clientIP = IPAuthenticationProvider.getClientIPAddress(request);
+            if (cmdResponse.getInputStream() == null) {
+                // Format and print the output of the command
+                CommandOutputter outputter = new JsonOutputter(clientIP);
+                response.setContentType(outputter.getContentType());
+                outputter.output(cmdResponse, response.getWriter());
+            } else {
+                // Stream out the output of the command
+                CommandOutputter outputter = new StreamOutputter(clientIP);
+                response.setContentType(outputter.getContentType());
+                outputter.output(cmdResponse, response.getOutputStream());
+            }
         }
-
     }
 
     /**

+ 4 - 2
zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/JsonOutputter.java

@@ -35,12 +35,14 @@ public class JsonOutputter implements CommandOutputter {
     public static final String ERROR_RESPONSE = "{\"error\": \"Exception writing command response to JSON\"}";
 
     private ObjectMapper mapper;
+    private final String clientIP;
 
-    public JsonOutputter() {
+    public JsonOutputter(final String clientIP) {
         mapper = new ObjectMapper();
         mapper.configure(SerializationFeature.WRITE_ENUMS_USING_TO_STRING, true);
         mapper.configure(SerializationFeature.INDENT_OUTPUT, true);
         mapper.setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE);
+        this.clientIP = clientIP;
     }
 
     @Override
@@ -59,7 +61,7 @@ public class JsonOutputter implements CommandOutputter {
             LOG.warn("Exception writing command response to JSON:", e);
             pw.write(ERROR_RESPONSE);
         } catch (IOException e) {
-            LOG.warn("Exception writing command response to JSON:", e);
+            LOG.warn("Exception writing command response as JSON to {}", clientIP, e);
             pw.write(ERROR_RESPONSE);
         }
     }

+ 52 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/StreamOutputter.java

@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.admin;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import org.apache.zookeeper.common.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A class for streaming data out.
+ */
+public class StreamOutputter implements CommandOutputter{
+    private static final Logger LOG = LoggerFactory.getLogger(StreamOutputter.class);
+    private final String clientIP;
+
+    public StreamOutputter(final String clientIP) {
+        this.clientIP = clientIP;
+    }
+
+    @Override
+    public String getContentType() {
+        return "application/octet-stream";
+    }
+
+    @Override
+    public void output(final CommandResponse response, final OutputStream os) {
+        try (final InputStream is = response.getInputStream()){
+            IOUtils.copyBytes(is, os, 1024, true);
+        } catch (final IOException e) {
+            LOG.error("Exception occurred when streaming out data to {}", clientIP, e);
+        }
+    }
+}

+ 17 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/auth/IPAuthenticationProvider.java

@@ -18,11 +18,14 @@
 
 package org.apache.zookeeper.server.auth;
 
+import java.util.StringTokenizer;
+import javax.servlet.http.HttpServletRequest;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Id;
 import org.apache.zookeeper.server.ServerCnxn;
 
 public class IPAuthenticationProvider implements AuthenticationProvider {
+    private static final String X_FORWARDED_FOR_HEADER_NAME = "X-Forwarded-For";
 
     public String getScheme() {
         return "ip";
@@ -128,4 +131,18 @@ public class IPAuthenticationProvider implements AuthenticationProvider {
         return true;
     }
 
+    /**
+     * Returns the HTTP(s) client IP address
+     * @param request HttpServletRequest
+     * @return IP address
+     */
+    public static String getClientIPAddress(final HttpServletRequest request) {
+        // to handle the case that a HTTP(s) client connects via a proxy or load balancer
+        final String xForwardedForHeader = request.getHeader(X_FORWARDED_FOR_HEADER_NAME);
+        if (xForwardedForHeader == null) {
+            return request.getRemoteAddr();
+        }
+        // the format of the field is: X-Forwarded-For: client, proxy1, proxy2 ...
+        return new StringTokenizer(xForwardedForHeader, ",").nextToken().trim();
+    }
 }

+ 10 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java

@@ -99,6 +99,11 @@ public class FileSnap implements SnapShot {
                     SnapStream.checkSealIntegrity(snapIS, ia);
                 }
 
+                // deserialize the last processed zxid and check the intact
+                if (dt.deserializeLastProcessedZxid(ia)) {
+                    SnapStream.checkSealIntegrity(snapIS, ia);
+                }
+
                 foundValid = true;
                 break;
             } catch (IOException e) {
@@ -255,6 +260,11 @@ public class FileSnap implements SnapShot {
                     SnapStream.sealStream(snapOS, oa);
                 }
 
+                // serialize the last processed zxid and add another CRC check
+                if (dt.serializeLastProcessedZxid(oa)) {
+                    SnapStream.sealStream(snapOS, oa);
+                }
+
                 lastSnapshotInfo = new SnapshotInfo(
                     Util.getZxidFromName(snapShot.getName(), SNAPSHOT_FILE_PREFIX),
                     snapShot.lastModified() / 1000);

+ 3 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java

@@ -468,9 +468,10 @@ public class FileTxnSnapLog {
      * @param sessionsWithTimeouts the session timeouts to be
      * serialized onto disk
      * @param syncSnap sync the snapshot immediately after write
+     * @return the snapshot file
      * @throws IOException
      */
-    public void save(
+    public File save(
         DataTree dataTree,
         ConcurrentHashMap<Long, Integer> sessionsWithTimeouts,
         boolean syncSnap) throws IOException {
@@ -479,6 +480,7 @@ public class FileTxnSnapLog {
         LOG.info("Snapshotting: 0x{} to {}", Long.toHexString(lastZxid), snapshotFile);
         try {
             snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile, syncSnap);
+            return snapshotFile;
         } catch (IOException e) {
             if (snapshotFile.length() == 0) {
                 /* This may be caused by a full disk. In such a case, the server

+ 60 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/util/RateLimiter.java

@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.util;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.zookeeper.common.Time;
+
+/**
+ * A class that provides simple interval-based rate limiting implementation.
+ */
+public class RateLimiter {
+    private final int rate;
+    private final long intervalInMs;
+    private long lastTimeReset;
+    private final AtomicInteger remained;
+
+    public RateLimiter(final int rate, final long interval, final TimeUnit unit) {
+        this.rate = rate;
+        this.intervalInMs = unit.toMillis(interval);
+        this.lastTimeReset = Time.currentElapsedTime();
+        this.remained = new AtomicInteger(rate);
+    }
+
+    public boolean allow() {
+        final long now = Time.currentElapsedTime();
+
+        // reset the rate if interval passed
+        if (now > lastTimeReset + intervalInMs) {
+            remained.set(rate);
+            lastTimeReset = now;
+        }
+
+        int value = remained.get();
+        boolean allowed = false;
+
+        // to handle race condition
+        while (!allowed && value > 0) {
+            allowed = remained.compareAndSet(value, value - 1);
+            value = remained.get();
+        }
+        return allowed;
+    }
+}

+ 103 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/TakeSnapshotTest.java

@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.io.File;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.test.ClientBase;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+public class TakeSnapshotTest extends ClientBase {
+    private static final String BASE_PATH = "/takeSnapshotTest";
+    private static final int NODE_COUNT = 100;
+    private static final String HOSTPORT = "127.0.0.1:" + PortAssignment.unique();
+    private ZooKeeper zk;
+
+    @TempDir
+    static File dataDir;
+
+    @TempDir
+    static File logDir;
+
+
+    @BeforeEach
+    public void setUp() throws Exception {
+        super.setUp();
+        ClientBase.setupTestEnv();
+    }
+
+    @AfterEach
+    public void tearDown() throws Exception {
+        if (zk != null) {
+            zk.close();
+        }
+    }
+
+    @Test
+    public void testTakeSnapshotAndRestore() throws Exception {
+        ZooKeeperServer zks = new ZooKeeperServer(dataDir, logDir, 3000);
+        ZooKeeperServer.setSerializeLastProcessedZxidEnabled(true);
+
+        final int port = Integer.parseInt(HOSTPORT.split(":")[1]);
+        final ServerCnxnFactory serverCnxnFactory = ServerCnxnFactory.createFactory(port, -1);
+        serverCnxnFactory.startup(zks);
+        assertTrue(ClientBase.waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT));
+
+        try {
+            zk = ClientBase.createZKClient(HOSTPORT);
+            for (int i = 0; i < NODE_COUNT; i++) {
+                final String path = BASE_PATH + "-" + i;
+                zk.create(path, String.valueOf(i).getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            }
+
+            // takeSnapshot
+            zks.takeSnapshot(false, false, true);
+
+            // clean up
+            zk.close();
+            zks.shutdown();
+
+            // start server again and assert the data restored from snapshot
+            zks = new ZooKeeperServer(dataDir, logDir, 3000);
+            ZooKeeperServer.setSerializeLastProcessedZxidEnabled(false);
+
+            serverCnxnFactory.startup(zks);
+            assertTrue(ClientBase.waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT));
+
+            zk = ClientBase.createZKClient(HOSTPORT);
+            for (int i = 0; i < NODE_COUNT; i++) {
+                final String path = BASE_PATH + "-" + i;
+                final String expectedData = String.valueOf(i);
+                assertArrayEquals(expectedData.getBytes(), zk.getData(path, null, null));
+            }
+            assertEquals(NODE_COUNT + 3, zk.getAllChildrenNumber("/"));
+        } finally {
+            zks.shutdown();
+            serverCnxnFactory.shutdown();
+        }
+    }
+}

+ 4 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java

@@ -71,6 +71,10 @@ public class ZKTestCase {
         // accidentally attempting to start multiple admin servers on the
         // same port.
         System.setProperty("zookeeper.admin.enableServer", "false");
+
+        // disable rate limiting on the snapshot admin API
+        System.setProperty("zookeeper.admin.snapshot.intervalInMS", "0");
+
         // ZOOKEEPER-2693 disables all 4lw by default.
         // Here we enable the 4lw which ZooKeeper tests depends.
         System.setProperty("zookeeper.4lw.commands.whitelist", "*");

+ 55 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/server/DataTreeTest.java

@@ -516,6 +516,21 @@ public class DataTreeTest extends ZKTestCase {
         }
     }
 
+    @Test
+    public void testSerializeLastProcessedZxid_Enabled() throws Exception {
+        testSerializeLastProcessedZxid(true, true);
+    }
+
+    @Test
+    public void testSerializeLastProcessedZxid_Disabled() throws Exception {
+        testSerializeLastProcessedZxid(false, false);
+    }
+
+    @Test
+    public void testSerializeLastProcessedZxid_BackwardCompatibility() throws Exception {
+        testSerializeLastProcessedZxid(true, false);
+    }
+
     @Test
     public void testDataTreeMetrics() throws Exception {
         ServerMetrics.getMetrics().resetAll();
@@ -616,4 +631,44 @@ public class DataTreeTest extends ZKTestCase {
         }
     }
 
+    private DataTree buildDataTreeForTest() {
+        final DataTree dt = new DataTree();
+        assertEquals(dt.lastProcessedZxid, 0);
+
+        dt.processTxn(
+                new TxnHeader(100, 1000, 1, 30, ZooDefs.OpCode.create),
+                new CreateTxn("/foo", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 1),
+                null);
+        assertEquals(dt.lastProcessedZxid, 1);
+        return dt;
+    }
+
+    private void testSerializeLastProcessedZxid(boolean enableForSerialize, boolean enableForDeserialize) throws Exception{
+        final DataTree dt = buildDataTreeForTest();
+
+        try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+            ZooKeeperServer.setSerializeLastProcessedZxidEnabled(enableForSerialize);
+            final BinaryOutputArchive oa = BinaryOutputArchive.getArchive(baos);
+            if (enableForSerialize) {
+                assertTrue(dt.serializeLastProcessedZxid(oa));
+            } else {
+                assertFalse(dt.serializeLastProcessedZxid(oa));
+            }
+            baos.flush();
+
+            ZooKeeperServer.setSerializeLastProcessedZxidEnabled(enableForDeserialize);
+            try (final ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray())) {
+                final InputArchive ia = BinaryInputArchive.getArchive(bais);
+                if (enableForDeserialize) {
+                    assertTrue(dt.deserializeLastProcessedZxid(ia));
+                } else {
+                    assertFalse(dt.deserializeLastProcessedZxid(ia));
+                }
+                assertEquals(dt.lastProcessedZxid, 1);
+            }
+        } finally {
+            ZooKeeperServer.setSerializeLastProcessedZxidEnabled(true);
+        }
+    }
+
 }

+ 2 - 1
zookeeper-server/src/test/java/org/apache/zookeeper/server/SnapshotDigestTest.java

@@ -166,7 +166,7 @@ public class SnapshotDigestTest extends ClientBase {
     private void testCompatibleHelper(Boolean enabledBefore, Boolean enabledAfter) throws Exception {
 
         ZooKeeperServer.setDigestEnabled(enabledBefore);
-
+        ZooKeeperServer.setSerializeLastProcessedZxidEnabled(enabledBefore);
 
         // restart the server to cache the option change
         reloadSnapshotAndCheckDigest();
@@ -179,6 +179,7 @@ public class SnapshotDigestTest extends ClientBase {
         server.takeSnapshot();
 
         ZooKeeperServer.setDigestEnabled(enabledAfter);
+        ZooKeeperServer.setSerializeLastProcessedZxidEnabled(enabledAfter);
 
         reloadSnapshotAndCheckDigest();
 

+ 3 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/server/TxnLogDigestTest.java

@@ -80,11 +80,13 @@ public class TxnLogDigestTest extends ClientBase {
     @Override
     public void setupCustomizedEnv() {
         ZooKeeperServer.setDigestEnabled(true);
+        ZooKeeperServer.setSerializeLastProcessedZxidEnabled(true);
     }
 
     @Override
     public void cleanUpCustomizedEnv() {
         ZooKeeperServer.setDigestEnabled(false);
+        ZooKeeperServer.setSerializeLastProcessedZxidEnabled(false);
     }
 
     @BeforeAll
@@ -189,6 +191,7 @@ public class TxnLogDigestTest extends ClientBase {
         QuorumPeerMainTest.waitForOne(zk, States.CONNECTING);
 
         ZooKeeperServer.setDigestEnabled(digestEnabled);
+        ZooKeeperServer.setSerializeLastProcessedZxidEnabled(digestEnabled);
 
         startServer();
         QuorumPeerMainTest.waitForOne(zk, States.CONNECTED);

+ 6 - 1
zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandResponseTest.java

@@ -18,8 +18,10 @@
 package org.apache.zookeeper.server.admin;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import java.util.HashMap;
 import java.util.Map;
+import javax.servlet.http.HttpServletResponse;
 import org.apache.zookeeper.ZKTestCase;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -30,13 +32,16 @@ public class CommandResponseTest extends ZKTestCase {
 
     @BeforeEach
     public void setUp() throws Exception {
-        r = new CommandResponse("makemeasandwich", "makeityourself");
+        r = new CommandResponse("makemeasandwich", "makeityourself", HttpServletResponse.SC_OK);
     }
 
     @Test
     public void testGetters() {
         assertEquals("makemeasandwich", r.getCommand());
         assertEquals("makeityourself", r.getError());
+        assertEquals(HttpServletResponse.SC_OK, r.getStatusCode());
+        assertEquals(new HashMap(), r.getHeaders());
+        assertNull(r.getInputStream());
     }
 
     @Test

+ 71 - 22
zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandsTest.java

@@ -18,18 +18,25 @@
 
 package org.apache.zookeeper.server.admin;
 
+import static org.apache.zookeeper.server.ZooKeeperServer.ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED;
+import static org.apache.zookeeper.server.admin.Commands.SnapshotCommand.ADMIN_SNAPSHOT_ENABLED;
+import static org.apache.zookeeper.server.admin.Commands.SnapshotCommand.ADMIN_SNAPSHOT_INTERVAL;
+import static org.apache.zookeeper.server.admin.Commands.SnapshotCommand.REQUEST_QUERY_PARAM_STREAMING;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
+import javax.servlet.http.HttpServletResponse;
 import org.apache.zookeeper.metrics.MetricsUtils;
 import org.apache.zookeeper.server.ServerCnxnFactory;
 import org.apache.zookeeper.server.ServerStats;
@@ -50,37 +57,52 @@ public class CommandsTest extends ClientBase {
      *            - the primary name of the command
      * @param kwargs
      *            - keyword arguments to the command
+     * @param expectedHeaders
+     *            - expected HTTP response headers
+     * @param expectedStatusCode
+     *            - expected HTTP status code
      * @param fields
      *            - the fields that are expected in the returned Map
      * @throws IOException
      * @throws InterruptedException
      */
-    public void testCommand(String cmdName, Map<String, String> kwargs, Field... fields) throws IOException, InterruptedException {
+    private void testCommand(String cmdName, Map<String, String> kwargs,
+                             Map<String, String> expectedHeaders, int expectedStatusCode,
+                             Field... fields) throws IOException, InterruptedException {
         ZooKeeperServer zks = serverFactory.getZooKeeperServer();
-        Map<String, Object> result = Commands.runCommand(cmdName, zks, kwargs).toMap();
-
-        assertTrue(result.containsKey("command"));
-        // This is only true because we're setting cmdName to the primary name
-        assertEquals(cmdName, result.remove("command"));
-        assertTrue(result.containsKey("error"));
-        assertNull(result.remove("error"), "error: " + result.get("error"));
-
-        for (Field field : fields) {
-            String k = field.key;
-            assertTrue(result.containsKey(k),
-                    "Result from command " + cmdName + " missing field \"" + k + "\"" + "\n" + result);
-            Class<?> t = field.type;
-            Object v = result.remove(k);
-            assertTrue(t.isAssignableFrom(v.getClass()),
-                    "\"" + k + "\" field from command " + cmdName
-                            + " should be of type " + t + ", is actually of type " + v.getClass());
+        final CommandResponse commandResponse = Commands.runCommand(cmdName, zks, kwargs);
+        assertNotNull(commandResponse);
+        assertEquals(expectedStatusCode, commandResponse.getStatusCode());
+        try (final InputStream responseStream = commandResponse.getInputStream()) {
+            if (Boolean.parseBoolean(kwargs.getOrDefault(REQUEST_QUERY_PARAM_STREAMING, "false"))) {
+                assertNotNull(responseStream, "InputStream in the response of command " + cmdName + " should not be null");
+            } else {
+                Map<String, Object> result = commandResponse.toMap();
+                assertTrue(result.containsKey("command"));
+                // This is only true because we're setting cmdName to the primary name
+                assertEquals(cmdName, result.remove("command"));
+                assertTrue(result.containsKey("error"));
+                assertNull(result.remove("error"), "error: " + result.get("error"));
+
+                for (Field field : fields) {
+                    String k = field.key;
+                    assertTrue(result.containsKey(k),
+                            "Result from command " + cmdName + " missing field \"" + k + "\"" + "\n" + result);
+                    Class<?> t = field.type;
+                    Object v = result.remove(k);
+                    assertTrue(t.isAssignableFrom(v.getClass()),
+                            "\"" + k + "\" field from command " + cmdName
+                                    + " should be of type " + t + ", is actually of type " + v.getClass());
+                }
+
+                assertTrue(result.isEmpty(), "Result from command " + cmdName + " contains extra fields: " + result);
+            }
         }
-
-        assertTrue(result.isEmpty(), "Result from command " + cmdName + " contains extra fields: " + result);
+        assertEquals(expectedHeaders, commandResponse.getHeaders());
     }
 
     public void testCommand(String cmdName, Field... fields) throws IOException, InterruptedException {
-        testCommand(cmdName, new HashMap<String, String>(), fields);
+        testCommand(cmdName, new HashMap<String, String>(), new HashMap<>(), HttpServletResponse.SC_OK, fields);
     }
 
     private static class Field {
@@ -91,7 +113,16 @@ public class CommandsTest extends ClientBase {
             this.key = key;
             this.type = type;
         }
+    }
 
+    @Test
+    public void testSnapshot_streaming() throws IOException, InterruptedException {
+        testSnapshot(true);
+    }
+
+    @Test
+    public void testSnapshot_nonStreaming() throws IOException, InterruptedException {
+        testSnapshot(false);
     }
 
     @Test
@@ -208,7 +239,7 @@ public class CommandsTest extends ClientBase {
     public void testSetTraceMask() throws IOException, InterruptedException {
         Map<String, String> kwargs = new HashMap<String, String>();
         kwargs.put("traceMask", "1");
-        testCommand("set_trace_mask", kwargs, new Field("tracemask", Long.class));
+        testCommand("set_trace_mask", kwargs, new HashMap<>(), HttpServletResponse.SC_OK, new Field("tracemask", Long.class));
     }
 
     @Test
@@ -288,4 +319,22 @@ public class CommandsTest extends ClientBase {
         assertThat(response.toMap().containsKey("secure_connections"), is(true));
     }
 
+    private void testSnapshot(final boolean streaming) throws IOException, InterruptedException {
+        System.setProperty(ADMIN_SNAPSHOT_ENABLED, "true");
+        System.setProperty(ADMIN_SNAPSHOT_INTERVAL, "0");
+        System.setProperty(ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED, "true");
+        try {
+            final Map<String, String> kwargs = new HashMap<>();
+            kwargs.put(REQUEST_QUERY_PARAM_STREAMING, String.valueOf(streaming));
+            final Map<String, String> expectedHeaders = new HashMap<>();
+            expectedHeaders.put(Commands.SnapshotCommand.RESPONSE_HEADER_LAST_ZXID, "0x0");
+            expectedHeaders.put(Commands.SnapshotCommand.RESPONSE_HEADER_SNAPSHOT_SIZE, "478");
+            testCommand("snapshot", kwargs, expectedHeaders, HttpServletResponse.SC_OK);
+        } finally {
+            System.clearProperty(ADMIN_SNAPSHOT_ENABLED);
+            System.clearProperty(ADMIN_SNAPSHOT_INTERVAL);
+            System.clearProperty(ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED);
+        }
+    }
+
 }

+ 3 - 3
zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/JettyAdminServerTest.java

@@ -61,9 +61,9 @@ public class JettyAdminServerTest extends ZKTestCase {
 
     protected static final Logger LOG = LoggerFactory.getLogger(JettyAdminServerTest.class);
 
-    private static final String URL_FORMAT = "http://localhost:%d/commands";
-    private static final String HTTPS_URL_FORMAT = "https://localhost:%d/commands";
-    private static final int jettyAdminPort = PortAssignment.unique();
+    static final String URL_FORMAT = "http://localhost:%d/commands";
+    static final String HTTPS_URL_FORMAT = "https://localhost:%d/commands";
+    static final int jettyAdminPort = PortAssignment.unique();
 
     @BeforeEach
     public void enableServer() {

+ 257 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/SnapshotCommandTest.java

@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.admin;
+
+import static org.apache.zookeeper.server.ZooKeeperServer.ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED;
+import static org.apache.zookeeper.server.admin.Commands.SnapshotCommand.ADMIN_SNAPSHOT_ENABLED;
+import static org.apache.zookeeper.server.admin.Commands.SnapshotCommand.ADMIN_SNAPSHOT_INTERVAL;
+import static org.apache.zookeeper.server.admin.Commands.SnapshotCommand.REQUEST_QUERY_PARAM_STREAMING;
+import static org.apache.zookeeper.server.admin.JettyAdminServerTest.URL_FORMAT;
+import static org.apache.zookeeper.server.admin.JettyAdminServerTest.jettyAdminPort;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.UnsupportedEncodingException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.net.URLEncoder;
+import java.nio.file.Files;
+import java.util.HashMap;
+import java.util.Map;
+import javax.servlet.http.HttpServletResponse;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.common.IOUtils;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.test.ClientBase;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class SnapshotCommandTest extends ZKTestCase {
+    private static final Logger LOG = LoggerFactory.getLogger(SnapshotCommandTest.class);
+
+    private static final String PATH = "/snapshot_test";
+    private static final int NODE_COUNT = 10;
+
+    private final String hostPort =  "127.0.0.1:" + PortAssignment.unique();
+    private ServerCnxnFactory cnxnFactory;
+    private JettyAdminServer adminServer;
+    private ZooKeeperServer zks;
+    private ZooKeeper zk;
+
+    @TempDir
+    static File dataDir;
+
+    @TempDir
+    static File logDir;
+
+    @BeforeAll
+    public void setup() throws Exception {
+        // start ZookeeperServer
+        System.setProperty("zookeeper.4lw.commands.whitelist", "*");
+        zks = new ZooKeeperServer(dataDir, logDir, 3000);
+        final int port = Integer.parseInt(hostPort.split(":")[1]);
+        cnxnFactory = ServerCnxnFactory.createFactory(port, -1);
+        cnxnFactory.startup(zks);
+        assertTrue(ClientBase.waitForServerUp(hostPort, 120000));
+
+        // start AdminServer
+        System.setProperty("zookeeper.admin.enableServer", "true");
+        System.setProperty("zookeeper.admin.serverPort", "" + jettyAdminPort);
+        System.setProperty(ADMIN_SNAPSHOT_ENABLED, "true");
+        System.setProperty(ADMIN_SNAPSHOT_INTERVAL, "0");
+        System.setProperty(ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED, "true");
+
+        adminServer = new JettyAdminServer();
+        adminServer.setZooKeeperServer(zks);
+        adminServer.start();
+
+        // create Zookeeper client and test data
+        zk = ClientBase.createZKClient(hostPort);
+        createData(zk, NODE_COUNT);
+    }
+
+    @AfterAll
+    public void tearDown() throws Exception {
+        System.clearProperty("zookeeper.4lw.commands.whitelist");
+        System.clearProperty("zookeeper.admin.enableServer");
+        System.clearProperty("zookeeper.admin.serverPort");
+        System.clearProperty(ADMIN_SNAPSHOT_ENABLED);
+        System.clearProperty(ADMIN_SNAPSHOT_INTERVAL);
+        System.clearProperty(ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED);
+
+        if (zk != null) {
+            zk.close();
+        }
+
+        if (adminServer != null) {
+            adminServer.shutdown();
+        }
+
+        if (cnxnFactory != null) {
+            cnxnFactory.shutdown();
+        }
+
+        if (zks != null) {
+            zks.shutdown();
+        }
+    }
+
+    @Test
+    public void testSnapshotCommand_streaming() throws Exception {
+        // take snapshot with streaming
+        final HttpURLConnection snapshotConn = sendSnapshotRequest(true);
+
+        // validate snapshot response
+        assertEquals(HttpURLConnection.HTTP_OK, snapshotConn.getResponseCode());
+        validateResponseHeaders(snapshotConn);
+        final File snapshotFile = new File(dataDir + "/snapshot." + System.currentTimeMillis());
+        try (final InputStream inputStream = snapshotConn.getInputStream();
+             final FileOutputStream outputStream = new FileOutputStream(snapshotFile)) {
+            IOUtils.copyBytes(inputStream, outputStream, 1024, true);
+            final long fileSize = Files.size(snapshotFile.toPath());
+            assertTrue(fileSize > 0);
+        }
+    }
+
+    @Test
+    public void testSnapshotCommand_nonStreaming() throws Exception {
+        // take snapshot without streaming
+        final HttpURLConnection snapshotConn = sendSnapshotRequest(false);
+
+        // validate snapshot response
+        assertEquals(HttpURLConnection.HTTP_OK, snapshotConn.getResponseCode());
+        validateResponseHeaders(snapshotConn);
+        displayResponsePayload(snapshotConn);
+    }
+
+    @Test
+    public void testSnapshotCommand_disabled() throws Exception {
+        System.setProperty(ADMIN_SNAPSHOT_ENABLED, "false");
+        try {
+            // take snapshot
+            final HttpURLConnection snapshotConn = sendSnapshotRequest(true);
+
+            // validate snapshot response
+            assertEquals(HttpServletResponse.SC_SERVICE_UNAVAILABLE, snapshotConn.getResponseCode());
+        } finally {
+            System.setProperty(ADMIN_SNAPSHOT_ENABLED, "true");
+        }
+    }
+
+    @Test
+    public void testSnapshotCommand_serializeLastZxidDisabled() throws Exception {
+        ZooKeeperServer.setSerializeLastProcessedZxidEnabled(false);
+        try {
+            // take snapshot
+            final HttpURLConnection snapshotConn = sendSnapshotRequest(true);
+
+            // validate snapshot response
+            assertEquals(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, snapshotConn.getResponseCode());
+        } finally {
+            ZooKeeperServer.setSerializeLastProcessedZxidEnabled(true);
+        }
+    }
+
+    private void createData(final ZooKeeper zk, final long count) throws Exception {
+        try {
+            zk.create(PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        } catch (final KeeperException.NodeExistsException ignore) {
+            // ignore
+        }
+
+        for (int i = 0; i < count; i++) {
+            final String processNodePath = zk.create(String.format("%s/%s", PATH, "n_"), new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
+            LOG.info("Node created. path={}" + processNodePath);
+        }
+    }
+
+    private HttpURLConnection sendSnapshotRequest(final boolean streaming) throws Exception  {
+        final String queryParamsStr = buildQueryStringForSnapshotCommand(streaming);
+        final URL snapshotURL = new URL(String.format(URL_FORMAT + "/snapshot", jettyAdminPort) + "?" + queryParamsStr);
+        final HttpURLConnection snapshotConn = (HttpURLConnection) snapshotURL.openConnection();
+        snapshotConn.setRequestMethod("GET");
+
+        return snapshotConn;
+    }
+
+    private String buildQueryStringForSnapshotCommand(final boolean streaming) throws Exception {
+        final Map<String, String> parameters = new HashMap<>();
+        parameters.put(REQUEST_QUERY_PARAM_STREAMING, String.valueOf(streaming));
+        return getParamsString(parameters);
+    }
+
+    private static String getParamsString(final Map<String, String> params) throws UnsupportedEncodingException {
+        final StringBuilder result = new StringBuilder();
+
+        for (final Map.Entry<String, String> entry : params.entrySet()) {
+            result.append(URLEncoder.encode(entry.getKey(), "UTF-8"));
+            result.append("=");
+
+            result.append(URLEncoder.encode(entry.getValue(), "UTF-8"));
+            result.append("&");
+        }
+
+        final String resultString = result.toString();
+        return resultString.length() > 0
+                ? resultString.substring(0, resultString.length() - 1)
+                : resultString;
+    }
+
+    private void validateResponseHeaders(final HttpURLConnection conn) {
+        LOG.info("Header:{}, Value:{}",
+                Commands.SnapshotCommand.RESPONSE_HEADER_LAST_ZXID,
+                conn.getHeaderField(Commands.SnapshotCommand.RESPONSE_HEADER_LAST_ZXID));
+        assertNotNull(conn.getHeaderField(Commands.SnapshotCommand.RESPONSE_HEADER_LAST_ZXID));
+
+        LOG.info("Header:{}, Value:{}",
+                Commands.SnapshotCommand.RESPONSE_HEADER_SNAPSHOT_SIZE,
+                conn.getHeaderField(Commands.SnapshotCommand.RESPONSE_HEADER_SNAPSHOT_SIZE));
+        assertNotNull(conn.getHeaderField(Commands.SnapshotCommand.RESPONSE_HEADER_SNAPSHOT_SIZE));
+        assertTrue(Integer.parseInt(conn.getHeaderField(Commands.SnapshotCommand.RESPONSE_HEADER_SNAPSHOT_SIZE)) > 0);
+    }
+
+    private void displayResponsePayload(final HttpURLConnection conn) throws IOException {
+        final StringBuilder sb = new StringBuilder();
+        try (final BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
+            String inputLine;
+            while ((inputLine = in.readLine()) != null) {
+                sb.append(inputLine);
+            }
+            LOG.info("Response payload: {}", sb);
+        }
+    }
+}

+ 8 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/server/persistence/FileTxnSnapLogTest.java

@@ -393,6 +393,10 @@ public class FileTxnSnapLogTest {
         SnapStream.setStreamMode(snappyEnabled ? SnapStream.StreamMode.SNAPPY : SnapStream.StreamMode.DEFAULT_MODE);
 
         ZooKeeperServer.setDigestEnabled(digestEnabled);
+        // set the flag to be the same as digestEnabled to make sure the last serialized data
+        // (for example, datatree, digest, lastProcessedZxid) is setup as expected for backward
+        // compatibility test.
+        ZooKeeperServer.setSerializeLastProcessedZxidEnabled(digestEnabled);
         TxnHeader txnHeader = new TxnHeader(1, 1, 1, 1 + 1, ZooDefs.OpCode.create);
         CreateTxn txn = new CreateTxn("/" + 1, "data".getBytes(), null, false, 1);
         Request request = new Request(1, 1, 1, txnHeader, txn, 1);
@@ -401,6 +405,10 @@ public class FileTxnSnapLogTest {
 
         int expectedNodeCount = dataTree.getNodeCount();
         ZooKeeperServer.setDigestEnabled(!digestEnabled);
+        // set the flag to be the same as digestEnabled to make sure the last serialized data
+        // (for example, datatree, digest, lastProcessedZxid) is setup as expected for backward
+        // compatibility test.
+        ZooKeeperServer.setSerializeLastProcessedZxidEnabled(!digestEnabled);
         snaplog.restore(dataTree, sessions, (hdr, rec, digest) -> {  });
         assertEquals(expectedNodeCount, dataTree.getNodeCount());
     }

+ 10 - 2
zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/FuzzySnapshotRelatedTest.java

@@ -150,7 +150,11 @@ public class FuzzySnapshotRelatedTest extends QuorumPeerTestBase {
             @Override
             public void process(String path) {
                 LOG.info("Take a snapshot");
-                zkServer.takeSnapshot(true);
+                try {
+                    zkServer.takeSnapshot(true);
+                } catch (final IOException e) {
+                    // ignored as it should never reach here because of System.exit() call
+                }
             }
         });
 
@@ -373,7 +377,11 @@ public class FuzzySnapshotRelatedTest extends QuorumPeerTestBase {
             public void process(long sessionId) {
                 LOG.info("Take snapshot");
                 if (shouldTakeSnapshot.getAndSet(false)) {
-                    zkServer.takeSnapshot(true);
+                    try {
+                        zkServer.takeSnapshot(true);
+                    } catch (IOException e) {
+                        // ignored as it should never reach here because of System.exit() call
+                    }
                 }
             }
         });

+ 62 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/server/util/RateLimiterTest.java

@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.util;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.junit.jupiter.api.Test;
+
+public class RateLimiterTest {
+
+    @Test
+    public void testAllow_withinInterval() {
+        final int rate = 2;
+        final RateLimiter rateLimiter = new RateLimiter(rate, 5, TimeUnit.SECONDS);
+        for (int i = 0; i < rate; i++) {
+            assertTrue(rateLimiter.allow());
+        }
+        assertFalse(rateLimiter.allow());
+    }
+
+    @Test
+    public void testAllow_withinInterval_multiThreaded() {
+        final int rate = 10;
+
+        final RateLimiter rateLimiter = new RateLimiter(rate, 5, TimeUnit.SECONDS);
+        final ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(rate + 1);
+        for (int i = 0; i < rate; i++) {
+            executor.execute(() -> assertTrue(rateLimiter.allow()));
+        }
+        executor.execute(() -> assertFalse(rateLimiter.allow()));
+    }
+
+    @Test
+    public void testAllow_exceedInterval() throws Exception {
+        final int interval = 1;
+
+        final RateLimiter rateLimiter = new RateLimiter(1, interval, TimeUnit.SECONDS);
+        assertTrue(rateLimiter.allow());
+        assertFalse(rateLimiter.allow());
+        Thread.sleep(TimeUnit.SECONDS.toMillis(interval + 1));
+        assertTrue(rateLimiter.allow());
+    }
+}