浏览代码

ZOOKEEPER-4571: Admin server API for restore database from a snapshot (#1961)

Provides a restore command for restoring database from a snapshot

Author: Li Wang <liwang@apple.com>

Co-authored-by: liwang <liwang@apple.com>
li4wang 2 年之前
父节点
当前提交
d35bdfb9d3
共有 18 个文件被更改,包括 891 次插入162 次删除
  1. 18 5
      zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
  2. 18 0
      zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
  3. 40 0
      zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java
  4. 78 4
      zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
  5. 21 5
      zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Command.java
  6. 158 64
      zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java
  7. 43 0
      zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/GetCommand.java
  8. 56 1
      zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/JettyAdminServer.java
  9. 37 0
      zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/PostCommand.java
  10. 1 1
      zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java
  11. 1 1
      zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/SnapStream.java
  12. 2 2
      zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java
  13. 141 0
      zookeeper-server/src/test/java/org/apache/zookeeper/server/ZookeeperServerRestoreTest.java
  14. 23 32
      zookeeper-server/src/test/java/org/apache/zookeeper/server/ZookeeperServerSnapshotTest.java
  15. 65 20
      zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandsTest.java
  16. 1 1
      zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/JettyAdminServerTest.java
  17. 187 25
      zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/SnapshotAndRestoreCommandTest.java
  18. 1 1
      zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverMasterTest.java

+ 18 - 5
zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md

@@ -2115,16 +2115,22 @@ Both subsystems need to have sufficient amount of threads to achieve peak read t
 **New in 3.9.0:** The following
 **New in 3.9.0:** The following
 options are used to configure the [AdminServer](#sc_adminserver).
 options are used to configure the [AdminServer](#sc_adminserver).
 
 
+* *admin.rateLimiterIntervalInMS* :
+  (Java system property: **zookeeper.admin.rateLimiterIntervalInMS**)
+  The time interval for rate limiting admin command to protect the server.
+  Defaults to 5 mins.
+
 * *admin.snapshot.enabled* :
 * *admin.snapshot.enabled* :
   (Java system property: **zookeeper.admin.snapshot.enabled**)
   (Java system property: **zookeeper.admin.snapshot.enabled**)
   The flag for enabling the snapshot command. Defaults to false. 
   The flag for enabling the snapshot command. Defaults to false. 
   It will be enabled by default once the auth support for admin server commands 
   It will be enabled by default once the auth support for admin server commands 
   is available.
   is available.
-
-* *admin.snapshot.intervalInMS* :
-  (Java system property: **zookeeper.admin.snapshot.intervalInMS**)
-  The time interval for rate limiting snapshot command to protect the server.
-  Defaults to 5 mins.
+  
+* *admin.restore.enabled* :
+  (Java system property: **zookeeper.admin.restore.enabled**)
+  The flag for enabling the restore command. Defaults to false.
+  It will be enabled by default once the auth support for admin server commands
+  is available.
 
 
 **New in 3.7.1:** The following
 **New in 3.7.1:** The following
 options are used to configure the [AdminServer](#sc_adminserver).
 options are used to configure the [AdminServer](#sc_adminserver).
@@ -2641,6 +2647,13 @@ Available commands include:
     Reset all observer connection statistics. Companion command to *observers*.
     Reset all observer connection statistics. Companion command to *observers*.
     No new fields returned.
     No new fields returned.
 
 
+* *restore/rest* :
+  Restore database from snapshot input stream on the current server.
+  Returns the following data in response payload:
+  "last_zxid": String
+  Note: this API is rate-limited (once every 5 mins by default) to protect the server
+  from being over-loaded.  
+
 * *ruok* :
 * *ruok* :
     No-op command, check if the server is running.
     No-op command, check if the server is running.
     A response does not necessarily indicate that the
     A response does not necessarily indicate that the

+ 18 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java

@@ -74,6 +74,9 @@ public final class ServerMetrics {
         SNAPSHOT_TIME = metricsContext.getSummary("snapshottime", DetailLevel.BASIC);
         SNAPSHOT_TIME = metricsContext.getSummary("snapshottime", DetailLevel.BASIC);
         SNAPSHOT_ERROR_COUNT = metricsContext.getCounter("snapshot_error_count");
         SNAPSHOT_ERROR_COUNT = metricsContext.getCounter("snapshot_error_count");
         SNAPSHOT_RATE_LIMITED_COUNT = metricsContext.getCounter("snapshot_rate_limited_count");
         SNAPSHOT_RATE_LIMITED_COUNT = metricsContext.getCounter("snapshot_rate_limited_count");
+        RESTORE_TIME = metricsContext.getSummary("restore_time", DetailLevel.BASIC);
+        RESTORE_ERROR_COUNT = metricsContext.getCounter("restore_error_count");
+        RESTORE_RATE_LIMITED_COUNT = metricsContext.getCounter("restore_rate_limited_count");
         DB_INIT_TIME = metricsContext.getSummary("dbinittime", DetailLevel.BASIC);
         DB_INIT_TIME = metricsContext.getSummary("dbinittime", DetailLevel.BASIC);
         READ_LATENCY = metricsContext.getSummary("readlatency", DetailLevel.ADVANCED);
         READ_LATENCY = metricsContext.getSummary("readlatency", DetailLevel.ADVANCED);
         UPDATE_LATENCY = metricsContext.getSummary("updatelatency", DetailLevel.ADVANCED);
         UPDATE_LATENCY = metricsContext.getSummary("updatelatency", DetailLevel.ADVANCED);
@@ -288,6 +291,21 @@ public final class ServerMetrics {
      */
      */
     public final Counter SNAPSHOT_RATE_LIMITED_COUNT;
     public final Counter SNAPSHOT_RATE_LIMITED_COUNT;
 
 
+    /**
+     * Restore time
+     */
+    public final Summary RESTORE_TIME;
+
+    /**
+     * Restore error count
+     */
+    public final Counter RESTORE_ERROR_COUNT;
+
+    /**
+     * Restore rate limited count
+     */
+    public final Counter RESTORE_RATE_LIMITED_COUNT;
+
     /**
     /**
      * Db init time (snapshot loading + txnlog replay)
      * Db init time (snapshot loading + txnlog replay)
      */
      */

+ 40 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java

@@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+import java.util.zip.CheckedInputStream;
 import org.apache.jute.InputArchive;
 import org.apache.jute.InputArchive;
 import org.apache.jute.OutputArchive;
 import org.apache.jute.OutputArchive;
 import org.apache.jute.Record;
 import org.apache.jute.Record;
@@ -48,8 +49,10 @@ import org.apache.zookeeper.common.Time;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.server.DataTree.ProcessTxnResult;
 import org.apache.zookeeper.server.DataTree.ProcessTxnResult;
+import org.apache.zookeeper.server.persistence.FileSnap;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog.PlayBackListener;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog.PlayBackListener;
+import org.apache.zookeeper.server.persistence.SnapStream;
 import org.apache.zookeeper.server.persistence.TxnLog.TxnIterator;
 import org.apache.zookeeper.server.persistence.TxnLog.TxnIterator;
 import org.apache.zookeeper.server.quorum.Leader;
 import org.apache.zookeeper.server.quorum.Leader;
 import org.apache.zookeeper.server.quorum.Leader.Proposal;
 import org.apache.zookeeper.server.quorum.Leader.Proposal;
@@ -620,6 +623,43 @@ public class ZKDatabase {
         initialized = true;
         initialized = true;
     }
     }
 
 
+    /**
+     * Deserialize a snapshot that contains FileHeader from an input archive. It is used by
+     * the admin restore command.
+     *
+     * @param ia the input archive to deserialize from
+     * @param is the CheckInputStream to check integrity
+     *
+     * @throws IOException
+     */
+    public void deserializeSnapshot(final InputArchive ia, final CheckedInputStream is) throws IOException {
+        clear();
+
+        // deserialize data tree
+        final DataTree dataTree = getDataTree();
+        final FileSnap filesnap = new FileSnap(snapLog.getSnapDir());
+        filesnap.deserialize(dataTree, getSessionWithTimeOuts(), ia);
+        SnapStream.checkSealIntegrity(is, ia);
+
+        // deserialize digest and check integrity
+        if (dataTree.deserializeZxidDigest(ia, 0)) {
+            SnapStream.checkSealIntegrity(is, ia);
+        }
+
+        // deserialize lastProcessedZxid and check integrity
+        if (dataTree.deserializeLastProcessedZxid(ia)) {
+            SnapStream.checkSealIntegrity(is, ia);
+        }
+
+        // compare the digest to find inconsistency
+        if (dataTree.getDigestFromLoadedSnapshot() != null) {
+            dataTree.compareSnapshotDigests(dataTree.lastProcessedZxid);
+        }
+
+        initialized = true;
+    }
+
+
     /**
     /**
      * serialize the snapshot
      * serialize the snapshot
      * @param oa the output archive to which the snapshot needs to be serialized
      * @param oa the output archive to which the snapshot needs to be serialized

+ 78 - 4
zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java

@@ -19,9 +19,11 @@
 package org.apache.zookeeper.server;
 package org.apache.zookeeper.server;
 
 
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.BufferedInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.File;
 import java.io.IOException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.PrintWriter;
 import java.io.PrintWriter;
 import java.nio.ByteBuffer;
 import java.nio.ByteBuffer;
 import java.util.ArrayDeque;
 import java.util.ArrayDeque;
@@ -34,11 +36,16 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Properties;
 import java.util.Random;
 import java.util.Random;
 import java.util.Set;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiConsumer;
 import java.util.function.BiConsumer;
+import java.util.zip.Adler32;
+import java.util.zip.CheckedInputStream;
 import javax.security.sasl.SaslException;
 import javax.security.sasl.SaslException;
+import org.apache.jute.BinaryInputArchive;
 import org.apache.jute.BinaryOutputArchive;
 import org.apache.jute.BinaryOutputArchive;
+import org.apache.jute.InputArchive;
 import org.apache.jute.Record;
 import org.apache.jute.Record;
 import org.apache.zookeeper.Environment;
 import org.apache.zookeeper.Environment;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException;
@@ -127,6 +134,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
     // this feature is confirmed to be stable
     // this feature is confirmed to be stable
     public static final String CLOSE_SESSION_TXN_ENABLED = "zookeeper.closeSessionTxn.enabled";
     public static final String CLOSE_SESSION_TXN_ENABLED = "zookeeper.closeSessionTxn.enabled";
     private static boolean closeSessionTxnEnabled = true;
     private static boolean closeSessionTxnEnabled = true;
+    private volatile CountDownLatch restoreLatch;
 
 
     static {
     static {
         LOG = LoggerFactory.getLogger(ZooKeeperServer.class);
         LOG = LoggerFactory.getLogger(ZooKeeperServer.class);
@@ -541,12 +549,12 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         takeSnapshot();
         takeSnapshot();
     }
     }
 
 
-    public void takeSnapshot() throws IOException {
-        takeSnapshot(false);
+    public File takeSnapshot() throws IOException {
+        return takeSnapshot(false);
     }
     }
 
 
-    public void takeSnapshot(boolean syncSnap) throws IOException {
-        takeSnapshot(syncSnap, true, false);
+    public File takeSnapshot(boolean syncSnap) throws IOException {
+        return takeSnapshot(syncSnap, true, false);
     }
     }
 
 
     /**
     /**
@@ -583,6 +591,61 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         return snapFile;
         return snapFile;
     }
     }
 
 
+    /**
+     * Restores database from a snapshot. It is used by the restore admin server command.
+     *
+     * @param inputStream input stream of snapshot
+     * @Return last processed zxid
+     * @throws IOException
+     */
+    public synchronized long restoreFromSnapshot(final InputStream inputStream) throws IOException {
+        if (inputStream == null) {
+            throw new IllegalArgumentException("InputStream can not be null when restoring from snapshot");
+        }
+
+        long start = Time.currentElapsedTime();
+        LOG.info("Before restore database. lastProcessedZxid={}, nodeCount={},sessionCount={}",
+            getZKDatabase().getDataTreeLastProcessedZxid(),
+            getZKDatabase().dataTree.getNodeCount(),
+            getZKDatabase().getSessionCount());
+
+        // restore to a new zkDatabase
+        final ZKDatabase newZKDatabase = new ZKDatabase(this.txnLogFactory);
+        final CheckedInputStream cis = new CheckedInputStream(new BufferedInputStream(inputStream), new Adler32());
+        final InputArchive ia = BinaryInputArchive.getArchive(cis);
+        newZKDatabase.deserializeSnapshot(ia, cis);
+        LOG.info("Restored to a new database. lastProcessedZxid={}, nodeCount={}, sessionCount={}",
+            newZKDatabase.getDataTreeLastProcessedZxid(),
+            newZKDatabase.dataTree.getNodeCount(),
+            newZKDatabase.getSessionCount());
+
+        // create a CountDownLatch
+        restoreLatch = new CountDownLatch(1);
+
+        try {
+            // set to the new zkDatabase
+            setZKDatabase(newZKDatabase);
+
+            // re-create SessionTrack
+            createSessionTracker();
+        } finally {
+            // unblock request submission
+            restoreLatch.countDown();
+            restoreLatch = null;
+        }
+
+        LOG.info("After restore database. lastProcessedZxid={}, nodeCount={}, sessionCount={}",
+                getZKDatabase().getDataTreeLastProcessedZxid(),
+                getZKDatabase().dataTree.getNodeCount(),
+                getZKDatabase().getSessionCount());
+
+        long elapsed = Time.currentElapsedTime() - start;
+        LOG.info("Restore taken in {} ms", elapsed);
+        ServerMetrics.getMetrics().RESTORE_TIME.add(elapsed);
+
+        return getLastProcessedZxid();
+    }
+
     public boolean shouldForceWriteInitialSnapshotAfterLeaderElection() {
     public boolean shouldForceWriteInitialSnapshotAfterLeaderElection() {
         return txnLogFactory.shouldForceWriteInitialSnapshotAfterLeaderElection();
         return txnLogFactory.shouldForceWriteInitialSnapshotAfterLeaderElection();
     }
     }
@@ -826,6 +889,9 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
      * <li>During shutdown the server sets the state to SHUTDOWN, which
      * <li>During shutdown the server sets the state to SHUTDOWN, which
      * corresponds to the server not running.</li></ul>
      * corresponds to the server not running.</li></ul>
      *
      *
+     * <li>During maintenance (e.g. restore) the server sets the state to MAINTENANCE
+     * </li></ul>
+     *
      * @param state new server state.
      * @param state new server state.
      */
      */
     protected void setState(State state) {
     protected void setState(State state) {
@@ -1151,6 +1217,14 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
     }
     }
 
 
     public void submitRequest(Request si) {
     public void submitRequest(Request si) {
+        if (restoreLatch != null) {
+            try {
+                LOG.info("Blocking request submission while restore is in progress");
+                restoreLatch.await();
+            } catch (final InterruptedException e) {
+                LOG.warn("Unexpected interruption", e);
+            }
+        }
         enqueueRequest(si);
         enqueueRequest(si);
     }
     }
 
 

+ 21 - 5
zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Command.java

@@ -18,6 +18,7 @@
 
 
 package org.apache.zookeeper.server.admin;
 package org.apache.zookeeper.server.admin;
 
 
+import java.io.InputStream;
 import java.util.Map;
 import java.util.Map;
 import java.util.Set;
 import java.util.Set;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.ZooKeeperServer;
@@ -57,19 +58,34 @@ public interface Command {
     boolean isServerRequired();
     boolean isServerRequired();
 
 
     /**
     /**
-     * Run this command. Commands take a ZooKeeperServer and String-valued
-     * keyword arguments and return a map containing any information
+     * Run this command for HTTP GET request. Commands take a ZooKeeperServer, String-valued keyword
+     * arguments and return a CommandResponse object containing any information
      * constituting the response to the command. Commands are responsible for
      * constituting the response to the command. Commands are responsible for
      * parsing keyword arguments and performing any error handling if necessary.
      * parsing keyword arguments and performing any error handling if necessary.
      * Errors should be reported by setting the "error" entry of the returned
      * Errors should be reported by setting the "error" entry of the returned
      * map with an appropriate message rather than throwing an exception.
      * map with an appropriate message rather than throwing an exception.
      *
      *
-     * @param zkServer
+     * @param zkServer ZooKeeper server
      * @param kwargs keyword -&gt; argument value mapping
      * @param kwargs keyword -&gt; argument value mapping
-     * @return Map representing response to command containing at minimum:
+     * @return CommandResponse representing response to command containing at minimum:
      *    - "command" key containing the command's primary name
      *    - "command" key containing the command's primary name
      *    - "error" key containing a String error message or null if no error
      *    - "error" key containing a String error message or null if no error
      */
      */
-    CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs);
+    CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs);
 
 
+    /**
+     * Run this command for HTTP POST. Commands take a ZooKeeperServer and InputStream and
+     * return a CommandResponse object containing any information
+     * constituting the response to the command. Commands are responsible for
+     * parsing keyword arguments and performing any error handling if necessary.
+     * Errors should be reported by setting the "error" entry of the returned
+     * map with an appropriate message rather than throwing an exception.
+     *
+     * @param zkServer ZooKeeper server
+     * @param inputStream InputStream from request
+     * @return CommandResponse representing response to command containing at minimum:
+     *    - "command" key containing the command's primary name
+     *    - "error" key containing a String error message or null if no error
+     */
+     CommandResponse runPost(ZooKeeperServer zkServer, InputStream inputStream);
 }
 }

+ 158 - 64
zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java

@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.File;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileInputStream;
+import java.io.InputStream;
 import java.net.InetSocketAddress;
 import java.net.InetSocketAddress;
 import java.util.Arrays;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Collections;
@@ -73,6 +74,8 @@ import org.slf4j.LoggerFactory;
 public class Commands {
 public class Commands {
 
 
     static final Logger LOG = LoggerFactory.getLogger(Commands.class);
     static final Logger LOG = LoggerFactory.getLogger(Commands.class);
+    static final String ADMIN_RATE_LIMITER_INTERVAL = "zookeeper.admin.rateLimiterIntervalInMS";
+    private static final long rateLimiterInterval = Integer.parseInt(System.getProperty(ADMIN_RATE_LIMITER_INTERVAL, "300000"));
 
 
     /** Maps command names to Command instances */
     /** Maps command names to Command instances */
     private static Map<String, Command> commands = new HashMap<String, Command>();
     private static Map<String, Command> commands = new HashMap<String, Command>();
@@ -100,16 +103,16 @@ public class Commands {
      *
      *
      * @param cmdName
      * @param cmdName
      * @param zkServer
      * @param zkServer
-     * @param kwargs String-valued keyword arguments to the command
+     * @param kwargs String-valued keyword arguments to the command from HTTP GET request
      *        (may be null if command requires no additional arguments)
      *        (may be null if command requires no additional arguments)
      * @return Map representing response to command containing at minimum:
      * @return Map representing response to command containing at minimum:
      *    - "command" key containing the command's primary name
      *    - "command" key containing the command's primary name
      *    - "error" key containing a String error message or null if no error
      *    - "error" key containing a String error message or null if no error
      */
      */
-    public static CommandResponse runCommand(
-        String cmdName,
-        ZooKeeperServer zkServer,
-        Map<String, String> kwargs) {
+    public static CommandResponse runGetCommand(
+            String cmdName,
+            ZooKeeperServer zkServer,
+            Map<String, String> kwargs) {
         Command command = getCommand(cmdName);
         Command command = getCommand(cmdName);
         if (command == null) {
         if (command == null) {
             // set the status code to 200 to keep the current behavior of existing commands
             // set the status code to 200 to keep the current behavior of existing commands
@@ -119,7 +122,36 @@ public class Commands {
             // set the status code to 200 to keep the current behavior of existing commands
             // set the status code to 200 to keep the current behavior of existing commands
             return new CommandResponse(cmdName, "This ZooKeeper instance is not currently serving requests", HttpServletResponse.SC_OK);
             return new CommandResponse(cmdName, "This ZooKeeper instance is not currently serving requests", HttpServletResponse.SC_OK);
         }
         }
-        return command.run(zkServer, kwargs);
+        return command.runGet(zkServer, kwargs);
+    }
+
+    /**
+     * Run the registered command with name cmdName. Commands should not produce
+     * any exceptions; any (anticipated) errors should be reported in the
+     * "error" entry of the returned map. Likewise, if no command with the given
+     * name is registered, this will be noted in the "error" entry.
+     *
+     * @param cmdName
+     * @param zkServer
+     * @param inputStream InputStream from HTTP POST request
+     * @return Map representing response to command containing at minimum:
+     *    - "command" key containing the command's primary name
+     *    - "error" key containing a String error message or null if no error
+     */
+    public static CommandResponse runPostCommand(
+            String cmdName,
+            ZooKeeperServer zkServer,
+            InputStream inputStream) {
+        Command command = getCommand(cmdName);
+        if (command == null) {
+            // set the status code to 200 to keep the current behavior of existing commands
+            return new CommandResponse(cmdName, "Unknown command: " + cmdName, HttpServletResponse.SC_OK);
+        }
+        if (command.isServerRequired() && (zkServer == null || !zkServer.isRunning())) {
+            // set the status code to 200 to keep the current behavior of existing commands
+            return new CommandResponse(cmdName, "This ZooKeeper instance is not currently serving requests", HttpServletResponse.SC_OK);
+        }
+        return command.runPost(zkServer, inputStream);
     }
     }
 
 
     /**
     /**
@@ -152,6 +184,7 @@ public class Commands {
         registerCommand(new LeaderCommand());
         registerCommand(new LeaderCommand());
         registerCommand(new MonitorCommand());
         registerCommand(new MonitorCommand());
         registerCommand(new ObserverCnxnStatResetCommand());
         registerCommand(new ObserverCnxnStatResetCommand());
+        registerCommand(new RestoreCommand());
         registerCommand(new RuokCommand());
         registerCommand(new RuokCommand());
         registerCommand(new SetTraceMaskCommand());
         registerCommand(new SetTraceMaskCommand());
         registerCommand(new SnapshotCommand());
         registerCommand(new SnapshotCommand());
@@ -170,14 +203,14 @@ public class Commands {
     /**
     /**
      * Reset all connection statistics.
      * Reset all connection statistics.
      */
      */
-    public static class CnxnStatResetCommand extends CommandBase {
+    public static class CnxnStatResetCommand extends GetCommand {
 
 
         public CnxnStatResetCommand() {
         public CnxnStatResetCommand() {
             super(Arrays.asList("connection_stat_reset", "crst"));
             super(Arrays.asList("connection_stat_reset", "crst"));
         }
         }
 
 
         @Override
         @Override
-        public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+        public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
             CommandResponse response = initializeResponse();
             CommandResponse response = initializeResponse();
             zkServer.getServerCnxnFactory().resetAllConnectionStats();
             zkServer.getServerCnxnFactory().resetAllConnectionStats();
             return response;
             return response;
@@ -190,14 +223,14 @@ public class Commands {
      * Server configuration parameters.
      * Server configuration parameters.
      * @see ZooKeeperServer#getConf()
      * @see ZooKeeperServer#getConf()
      */
      */
-    public static class ConfCommand extends CommandBase {
+    public static class ConfCommand extends GetCommand {
 
 
         public ConfCommand() {
         public ConfCommand() {
             super(Arrays.asList("configuration", "conf", "config"));
             super(Arrays.asList("configuration", "conf", "config"));
         }
         }
 
 
         @Override
         @Override
-        public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+        public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
             CommandResponse response = initializeResponse();
             CommandResponse response = initializeResponse();
             response.putAll(zkServer.getConf().toMap());
             response.putAll(zkServer.getConf().toMap());
             return response;
             return response;
@@ -210,14 +243,14 @@ public class Commands {
      *   - "connections": list of connection info objects
      *   - "connections": list of connection info objects
      * @see org.apache.zookeeper.server.ServerCnxn#getConnectionInfo(boolean)
      * @see org.apache.zookeeper.server.ServerCnxn#getConnectionInfo(boolean)
      */
      */
-    public static class ConsCommand extends CommandBase {
+    public static class ConsCommand extends GetCommand {
 
 
         public ConsCommand() {
         public ConsCommand() {
             super(Arrays.asList("connections", "cons"));
             super(Arrays.asList("connections", "cons"));
         }
         }
 
 
         @Override
         @Override
-        public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+        public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
             CommandResponse response = initializeResponse();
             CommandResponse response = initializeResponse();
             ServerCnxnFactory serverCnxnFactory = zkServer.getServerCnxnFactory();
             ServerCnxnFactory serverCnxnFactory = zkServer.getServerCnxnFactory();
             if (serverCnxnFactory != null) {
             if (serverCnxnFactory != null) {
@@ -239,14 +272,14 @@ public class Commands {
     /**
     /**
      * Information on ZK datadir and snapdir size in bytes
      * Information on ZK datadir and snapdir size in bytes
      */
      */
-    public static class DirsCommand extends CommandBase {
+    public static class DirsCommand extends GetCommand {
 
 
         public DirsCommand() {
         public DirsCommand() {
             super(Arrays.asList("dirs"));
             super(Arrays.asList("dirs"));
         }
         }
 
 
         @Override
         @Override
-        public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+        public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
             CommandResponse response = initializeResponse();
             CommandResponse response = initializeResponse();
             response.put("datadir_size", zkServer.getDataDirSize());
             response.put("datadir_size", zkServer.getDataDirSize());
             response.put("logdir_size", zkServer.getLogDirSize());
             response.put("logdir_size", zkServer.getLogDirSize());
@@ -264,14 +297,14 @@ public class Commands {
      * @see ZooKeeperServer#getSessionExpiryMap()
      * @see ZooKeeperServer#getSessionExpiryMap()
      * @see ZooKeeperServer#getEphemerals()
      * @see ZooKeeperServer#getEphemerals()
      */
      */
-    public static class DumpCommand extends CommandBase {
+    public static class DumpCommand extends GetCommand {
 
 
         public DumpCommand() {
         public DumpCommand() {
             super(Arrays.asList("dump"));
             super(Arrays.asList("dump"));
         }
         }
 
 
         @Override
         @Override
-        public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+        public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
             CommandResponse response = initializeResponse();
             CommandResponse response = initializeResponse();
             response.put("expiry_time_to_session_ids", zkServer.getSessionExpiryMap());
             response.put("expiry_time_to_session_ids", zkServer.getSessionExpiryMap());
             response.put("session_id_to_ephemeral_paths", zkServer.getEphemerals());
             response.put("session_id_to_ephemeral_paths", zkServer.getEphemerals());
@@ -283,14 +316,14 @@ public class Commands {
     /**
     /**
      * All defined environment variables.
      * All defined environment variables.
      */
      */
-    public static class EnvCommand extends CommandBase {
+    public static class EnvCommand extends GetCommand {
 
 
         public EnvCommand() {
         public EnvCommand() {
             super(Arrays.asList("environment", "env", "envi"), false);
             super(Arrays.asList("environment", "env", "envi"), false);
         }
         }
 
 
         @Override
         @Override
-        public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+        public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
             CommandResponse response = initializeResponse();
             CommandResponse response = initializeResponse();
             for (Entry e : Environment.list()) {
             for (Entry e : Environment.list()) {
                 response.put(e.getKey(), e.getValue());
                 response.put(e.getKey(), e.getValue());
@@ -303,14 +336,14 @@ public class Commands {
     /**
     /**
      * Digest histories for every specific number of txns.
      * Digest histories for every specific number of txns.
      */
      */
-    public static class DigestCommand extends CommandBase {
+    public static class DigestCommand extends GetCommand {
 
 
         public DigestCommand() {
         public DigestCommand() {
             super(Arrays.asList("hash"));
             super(Arrays.asList("hash"));
         }
         }
 
 
         @Override
         @Override
-        public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+        public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
             CommandResponse response = initializeResponse();
             CommandResponse response = initializeResponse();
             response.put("digests", zkServer.getZKDatabase().getDataTree().getDigestLog());
             response.put("digests", zkServer.getZKDatabase().getDataTree().getDigestLog());
             return response;
             return response;
@@ -322,14 +355,14 @@ public class Commands {
      * The current trace mask. Returned map contains:
      * The current trace mask. Returned map contains:
      *   - "tracemask": Long
      *   - "tracemask": Long
      */
      */
-    public static class GetTraceMaskCommand extends CommandBase {
+    public static class GetTraceMaskCommand extends GetCommand {
 
 
         public GetTraceMaskCommand() {
         public GetTraceMaskCommand() {
             super(Arrays.asList("get_trace_mask", "gtmk"), false);
             super(Arrays.asList("get_trace_mask", "gtmk"), false);
         }
         }
 
 
         @Override
         @Override
-        public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+        public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
             CommandResponse response = initializeResponse();
             CommandResponse response = initializeResponse();
             response.put("tracemask", ZooTrace.getTextTraceLevel());
             response.put("tracemask", ZooTrace.getTextTraceLevel());
             return response;
             return response;
@@ -337,14 +370,14 @@ public class Commands {
 
 
     }
     }
 
 
-    public static class InitialConfigurationCommand extends CommandBase {
+    public static class InitialConfigurationCommand extends GetCommand {
 
 
         public InitialConfigurationCommand() {
         public InitialConfigurationCommand() {
             super(Arrays.asList("initial_configuration", "icfg"));
             super(Arrays.asList("initial_configuration", "icfg"));
         }
         }
 
 
         @Override
         @Override
-        public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+        public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
             CommandResponse response = initializeResponse();
             CommandResponse response = initializeResponse();
             response.put("initial_configuration", zkServer.getInitialConfig());
             response.put("initial_configuration", zkServer.getInitialConfig());
             return response;
             return response;
@@ -356,14 +389,14 @@ public class Commands {
      * Is this server in read-only mode. Returned map contains:
      * Is this server in read-only mode. Returned map contains:
      *   - "is_read_only": Boolean
      *   - "is_read_only": Boolean
      */
      */
-    public static class IsroCommand extends CommandBase {
+    public static class IsroCommand extends GetCommand {
 
 
         public IsroCommand() {
         public IsroCommand() {
             super(Arrays.asList("is_read_only", "isro"));
             super(Arrays.asList("is_read_only", "isro"));
         }
         }
 
 
         @Override
         @Override
-        public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+        public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
             CommandResponse response = initializeResponse();
             CommandResponse response = initializeResponse();
             response.put("read_only", zkServer instanceof ReadOnlyZooKeeperServer);
             response.put("read_only", zkServer instanceof ReadOnlyZooKeeperServer);
             return response;
             return response;
@@ -380,14 +413,14 @@ public class Commands {
      *   - "zxid": String
      *   - "zxid": String
      *   - "timestamp": Long
      *   - "timestamp": Long
      */
      */
-    public static class LastSnapshotCommand extends CommandBase {
+    public static class LastSnapshotCommand extends GetCommand {
 
 
         public LastSnapshotCommand() {
         public LastSnapshotCommand() {
             super(Arrays.asList("last_snapshot", "lsnp"));
             super(Arrays.asList("last_snapshot", "lsnp"));
         }
         }
 
 
         @Override
         @Override
-        public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+        public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
             CommandResponse response = initializeResponse();
             CommandResponse response = initializeResponse();
             SnapshotInfo info = zkServer.getTxnLogFactory().getLastSnapshotInfo();
             SnapshotInfo info = zkServer.getTxnLogFactory().getLastSnapshotInfo();
             response.put("zxid", Long.toHexString(info == null ? -1L : info.zxid));
             response.put("zxid", Long.toHexString(info == null ? -1L : info.zxid));
@@ -400,14 +433,14 @@ public class Commands {
     /**
     /**
      * Returns the leader status of this instance and the leader host string.
      * Returns the leader status of this instance and the leader host string.
      */
      */
-    public static class LeaderCommand extends CommandBase {
+    public static class LeaderCommand extends GetCommand {
 
 
         public LeaderCommand() {
         public LeaderCommand() {
             super(Arrays.asList("leader", "lead"));
             super(Arrays.asList("leader", "lead"));
         }
         }
 
 
         @Override
         @Override
-        public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+        public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
             CommandResponse response = initializeResponse();
             CommandResponse response = initializeResponse();
             if (zkServer instanceof QuorumZooKeeperServer) {
             if (zkServer instanceof QuorumZooKeeperServer) {
                 response.put("is_leader", zkServer instanceof LeaderZooKeeperServer);
                 response.put("is_leader", zkServer instanceof LeaderZooKeeperServer);
@@ -450,14 +483,14 @@ public class Commands {
      *   - "synced_followers": Integer (leader only)
      *   - "synced_followers": Integer (leader only)
      *   - "pending_syncs": Integer (leader only)
      *   - "pending_syncs": Integer (leader only)
      */
      */
-    public static class MonitorCommand extends CommandBase {
+    public static class MonitorCommand extends GetCommand {
 
 
         public MonitorCommand() {
         public MonitorCommand() {
             super(Arrays.asList("monitor", "mntr"), false);
             super(Arrays.asList("monitor", "mntr"), false);
         }
         }
 
 
         @Override
         @Override
-        public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+        public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
             CommandResponse response = initializeResponse();
             CommandResponse response = initializeResponse();
             zkServer.dumpMonitorValues(response::put);
             zkServer.dumpMonitorValues(response::put);
             ServerMetrics.getMetrics().getMetricsProvider().dump(response::put);
             ServerMetrics.getMetrics().getMetricsProvider().dump(response::put);
@@ -470,14 +503,14 @@ public class Commands {
     /**
     /**
      * Reset all observer connection statistics.
      * Reset all observer connection statistics.
      */
      */
-    public static class ObserverCnxnStatResetCommand extends CommandBase {
+    public static class ObserverCnxnStatResetCommand extends GetCommand {
 
 
         public ObserverCnxnStatResetCommand() {
         public ObserverCnxnStatResetCommand() {
             super(Arrays.asList("observer_connection_stat_reset", "orst"));
             super(Arrays.asList("observer_connection_stat_reset", "orst"));
         }
         }
 
 
         @Override
         @Override
-        public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+        public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
             CommandResponse response = initializeResponse();
             CommandResponse response = initializeResponse();
             if (zkServer instanceof LeaderZooKeeperServer) {
             if (zkServer instanceof LeaderZooKeeperServer) {
                 Leader leader = ((LeaderZooKeeperServer) zkServer).getLeader();
                 Leader leader = ((LeaderZooKeeperServer) zkServer).getLeader();
@@ -491,17 +524,81 @@ public class Commands {
 
 
     }
     }
 
 
+    /**
+     * Restore from snapshot on the current server.
+     *
+     * Returned map contains:
+     *  - "last_zxid": String
+     */
+    public static class RestoreCommand extends PostCommand {
+        static final String RESPONSE_DATA_LAST_ZXID = "last_zxid";
+
+        static final String ADMIN_RESTORE_ENABLED = "zookeeper.admin.restore.enabled";
+
+
+        private RateLimiter rateLimiter;
+
+        public RestoreCommand() {
+            super(Arrays.asList("restore", "rest"));
+            rateLimiter = new RateLimiter(1, rateLimiterInterval, TimeUnit.MICROSECONDS);
+        }
+
+        @Override
+        public CommandResponse runPost(final ZooKeeperServer zkServer, final InputStream inputStream) {
+            final CommandResponse response = initializeResponse();
+
+            // check feature flag
+            final boolean restoreEnabled = Boolean.parseBoolean(System.getProperty(ADMIN_RESTORE_ENABLED, "false"));
+            if (!restoreEnabled) {
+                response.setStatusCode(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
+                LOG.warn("Restore command is disabled");
+                return response;
+            }
+
+            if (!zkServer.isSerializeLastProcessedZxidEnabled()) {
+                response.setStatusCode(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+                LOG.warn("Restore command requires serializeLastProcessedZxidEnable flag is set to true");
+                return response;
+            }
+
+            if (inputStream == null){
+                response.setStatusCode(HttpServletResponse.SC_BAD_REQUEST);
+                LOG.warn("InputStream from restore request is null");
+                return response;
+            }
+
+            // check rate limiting
+            if (!rateLimiter.allow()) {
+                response.setStatusCode(HttpStatus.TOO_MANY_REQUESTS_429);
+                ServerMetrics.getMetrics().RESTORE_RATE_LIMITED_COUNT.add(1);
+                LOG.warn("Restore request was rate limited");
+                return response;
+            }
+
+            // restore from snapshot InputStream
+            try {
+                final long lastZxid = zkServer.restoreFromSnapshot(inputStream);
+                response.put(RESPONSE_DATA_LAST_ZXID, lastZxid);
+            } catch (final Exception e) {
+                response.setStatusCode(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+                ServerMetrics.getMetrics().RESTORE_ERROR_COUNT.add(1);
+                LOG.warn("Exception occurred when restore snapshot via the restore command", e);
+            }
+            return response;
+        }
+    }
+
     /**
     /**
      * No-op command, check if the server is running
      * No-op command, check if the server is running
      */
      */
-    public static class RuokCommand extends CommandBase {
+    public static class RuokCommand extends GetCommand {
 
 
         public RuokCommand() {
         public RuokCommand() {
             super(Arrays.asList("ruok"));
             super(Arrays.asList("ruok"));
         }
         }
 
 
         @Override
         @Override
-        public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+        public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
             return initializeResponse();
             return initializeResponse();
         }
         }
 
 
@@ -513,14 +610,14 @@ public class Commands {
      *  Returned Map contains:
      *  Returned Map contains:
      *   - "tracemask": Long
      *   - "tracemask": Long
      */
      */
-    public static class SetTraceMaskCommand extends CommandBase {
+    public static class SetTraceMaskCommand extends GetCommand {
 
 
         public SetTraceMaskCommand() {
         public SetTraceMaskCommand() {
             super(Arrays.asList("set_trace_mask", "stmk"), false);
             super(Arrays.asList("set_trace_mask", "stmk"), false);
         }
         }
 
 
         @Override
         @Override
-        public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+        public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
             CommandResponse response = initializeResponse();
             CommandResponse response = initializeResponse();
             long traceMask;
             long traceMask;
             if (!kwargs.containsKey("traceMask")) {
             if (!kwargs.containsKey("traceMask")) {
@@ -551,28 +648,25 @@ public class Commands {
      *   - "last_zxid": String
      *   - "last_zxid": String
      *   - "snapshot_size": String
      *   - "snapshot_size": String
      */
      */
-    public static class SnapshotCommand extends CommandBase {
+    public static class SnapshotCommand extends GetCommand {
         static final String REQUEST_QUERY_PARAM_STREAMING = "streaming";
         static final String REQUEST_QUERY_PARAM_STREAMING = "streaming";
 
 
         static final String RESPONSE_HEADER_LAST_ZXID = "last_zxid";
         static final String RESPONSE_HEADER_LAST_ZXID = "last_zxid";
         static final String RESPONSE_HEADER_SNAPSHOT_SIZE = "snapshot_size";
         static final String RESPONSE_HEADER_SNAPSHOT_SIZE = "snapshot_size";
 
 
         static final String ADMIN_SNAPSHOT_ENABLED = "zookeeper.admin.snapshot.enabled";
         static final String ADMIN_SNAPSHOT_ENABLED = "zookeeper.admin.snapshot.enabled";
-        static final String ADMIN_SNAPSHOT_INTERVAL = "zookeeper.admin.snapshot.intervalInMS";
-
-        private static final long snapshotInterval = Integer.parseInt(System.getProperty(ADMIN_SNAPSHOT_INTERVAL, "300000"));
 
 
         private final RateLimiter rateLimiter;
         private final RateLimiter rateLimiter;
 
 
         public SnapshotCommand() {
         public SnapshotCommand() {
             super(Arrays.asList("snapshot", "snap"));
             super(Arrays.asList("snapshot", "snap"));
-            rateLimiter = new RateLimiter(1, snapshotInterval, TimeUnit.MICROSECONDS);
+            rateLimiter = new RateLimiter(1, rateLimiterInterval, TimeUnit.MICROSECONDS);
         }
         }
 
 
         @SuppressFBWarnings(value = "OBL_UNSATISFIED_OBLIGATION",
         @SuppressFBWarnings(value = "OBL_UNSATISFIED_OBLIGATION",
                 justification = "FileInputStream is passed to CommandResponse and closed in StreamOutputter")
                 justification = "FileInputStream is passed to CommandResponse and closed in StreamOutputter")
         @Override
         @Override
-        public CommandResponse run(final ZooKeeperServer zkServer, final Map<String, String> kwargs) {
+        public CommandResponse runGet(final ZooKeeperServer zkServer, final Map<String, String> kwargs) {
             final CommandResponse response = initializeResponse();
             final CommandResponse response = initializeResponse();
 
 
             // check feature flag
             // check feature flag
@@ -637,7 +731,7 @@ public class Commands {
      *   - "server_stats": ServerStats object
      *   - "server_stats": ServerStats object
      *   - "node_count": Integer
      *   - "node_count": Integer
      */
      */
-    public static class SrvrCommand extends CommandBase {
+    public static class SrvrCommand extends GetCommand {
 
 
         public SrvrCommand() {
         public SrvrCommand() {
             super(Arrays.asList("server_stats", "srvr"));
             super(Arrays.asList("server_stats", "srvr"));
@@ -649,7 +743,7 @@ public class Commands {
         }
         }
 
 
         @Override
         @Override
-        public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+        public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
             CommandResponse response = initializeResponse();
             CommandResponse response = initializeResponse();
             LOG.info("running stat");
             LOG.info("running stat");
             response.put("version", Version.getFullVersion());
             response.put("version", Version.getFullVersion());
@@ -676,8 +770,8 @@ public class Commands {
         }
         }
 
 
         @Override
         @Override
-        public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
-            CommandResponse response = super.run(zkServer, kwargs);
+        public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+            CommandResponse response = super.runGet(zkServer, kwargs);
 
 
             final Iterable<Map<String, Object>> connections;
             final Iterable<Map<String, Object>> connections;
             if (zkServer.getServerCnxnFactory() != null) {
             if (zkServer.getServerCnxnFactory() != null) {
@@ -702,14 +796,14 @@ public class Commands {
     /**
     /**
      * Resets server statistics.
      * Resets server statistics.
      */
      */
-    public static class StatResetCommand extends CommandBase {
+    public static class StatResetCommand extends GetCommand {
 
 
         public StatResetCommand() {
         public StatResetCommand() {
             super(Arrays.asList("stat_reset", "srst"));
             super(Arrays.asList("stat_reset", "srst"));
         }
         }
 
 
         @Override
         @Override
-        public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+        public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
             CommandResponse response = initializeResponse();
             CommandResponse response = initializeResponse();
             zkServer.serverStats().reset();
             zkServer.serverStats().reset();
             return response;
             return response;
@@ -723,14 +817,14 @@ public class Commands {
      *   - "observers": list of observer learner handler info objects (leader/follower only)
      *   - "observers": list of observer learner handler info objects (leader/follower only)
      * @see org.apache.zookeeper.server.quorum.LearnerHandler#getLearnerHandlerInfo()
      * @see org.apache.zookeeper.server.quorum.LearnerHandler#getLearnerHandlerInfo()
      */
      */
-    public static class SyncedObserverConsCommand extends CommandBase {
+    public static class SyncedObserverConsCommand extends GetCommand {
 
 
         public SyncedObserverConsCommand() {
         public SyncedObserverConsCommand() {
             super(Arrays.asList("observers", "obsr"));
             super(Arrays.asList("observers", "obsr"));
         }
         }
 
 
         @Override
         @Override
-        public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+        public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
 
 
             CommandResponse response = initializeResponse();
             CommandResponse response = initializeResponse();
 
 
@@ -760,14 +854,14 @@ public class Commands {
     /**
     /**
      * All defined system properties.
      * All defined system properties.
      */
      */
-    public static class SystemPropertiesCommand extends CommandBase {
+    public static class SystemPropertiesCommand extends GetCommand {
 
 
         public SystemPropertiesCommand() {
         public SystemPropertiesCommand() {
             super(Arrays.asList("system_properties", "sysp"), false);
             super(Arrays.asList("system_properties", "sysp"), false);
         }
         }
 
 
         @Override
         @Override
-        public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+        public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
             CommandResponse response = initializeResponse();
             CommandResponse response = initializeResponse();
             Properties systemProperties = System.getProperties();
             Properties systemProperties = System.getProperties();
             SortedMap<String, String> sortedSystemProperties = new TreeMap<>();
             SortedMap<String, String> sortedSystemProperties = new TreeMap<>();
@@ -782,14 +876,14 @@ public class Commands {
      * Returns the current ensemble configuration information.
      * Returns the current ensemble configuration information.
      * It provides list of current voting members in the ensemble.
      * It provides list of current voting members in the ensemble.
      */
      */
-    public static class VotingViewCommand extends CommandBase {
+    public static class VotingViewCommand extends GetCommand {
 
 
         public VotingViewCommand() {
         public VotingViewCommand() {
             super(Arrays.asList("voting_view"));
             super(Arrays.asList("voting_view"));
         }
         }
 
 
         @Override
         @Override
-        public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+        public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
             CommandResponse response = initializeResponse();
             CommandResponse response = initializeResponse();
             if (zkServer instanceof QuorumZooKeeperServer) {
             if (zkServer instanceof QuorumZooKeeperServer) {
                 QuorumPeer peer = ((QuorumZooKeeperServer) zkServer).self;
                 QuorumPeer peer = ((QuorumZooKeeperServer) zkServer).self;
@@ -850,14 +944,14 @@ public class Commands {
      * @see DataTree#getWatches()
      * @see DataTree#getWatches()
      * @see DataTree#getWatches()
      * @see DataTree#getWatches()
      */
      */
-    public static class WatchCommand extends CommandBase {
+    public static class WatchCommand extends GetCommand {
 
 
         public WatchCommand() {
         public WatchCommand() {
             super(Arrays.asList("watches", "wchc"));
             super(Arrays.asList("watches", "wchc"));
         }
         }
 
 
         @Override
         @Override
-        public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+        public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
             DataTree dt = zkServer.getZKDatabase().getDataTree();
             DataTree dt = zkServer.getZKDatabase().getDataTree();
             CommandResponse response = initializeResponse();
             CommandResponse response = initializeResponse();
             response.put("session_id_to_watched_paths", dt.getWatches().toMap());
             response.put("session_id_to_watched_paths", dt.getWatches().toMap());
@@ -871,14 +965,14 @@ public class Commands {
      *   - "path_to_session_ids": Map&lt;String, Set&lt;Long&gt;&gt; path -&gt; session IDs of sessions watching path
      *   - "path_to_session_ids": Map&lt;String, Set&lt;Long&gt;&gt; path -&gt; session IDs of sessions watching path
      * @see DataTree#getWatchesByPath()
      * @see DataTree#getWatchesByPath()
      */
      */
-    public static class WatchesByPathCommand extends CommandBase {
+    public static class WatchesByPathCommand extends GetCommand {
 
 
         public WatchesByPathCommand() {
         public WatchesByPathCommand() {
             super(Arrays.asList("watches_by_path", "wchp"));
             super(Arrays.asList("watches_by_path", "wchp"));
         }
         }
 
 
         @Override
         @Override
-        public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+        public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
             DataTree dt = zkServer.getZKDatabase().getDataTree();
             DataTree dt = zkServer.getZKDatabase().getDataTree();
             CommandResponse response = initializeResponse();
             CommandResponse response = initializeResponse();
             response.put("path_to_session_ids", dt.getWatchesByPath().toMap());
             response.put("path_to_session_ids", dt.getWatchesByPath().toMap());
@@ -891,14 +985,14 @@ public class Commands {
      * Summarized watch information.
      * Summarized watch information.
      * @see DataTree#getWatchesSummary()
      * @see DataTree#getWatchesSummary()
      */
      */
-    public static class WatchSummaryCommand extends CommandBase {
+    public static class WatchSummaryCommand extends GetCommand {
 
 
         public WatchSummaryCommand() {
         public WatchSummaryCommand() {
             super(Arrays.asList("watch_summary", "wchs"));
             super(Arrays.asList("watch_summary", "wchs"));
         }
         }
 
 
         @Override
         @Override
-        public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+        public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
             DataTree dt = zkServer.getZKDatabase().getDataTree();
             DataTree dt = zkServer.getZKDatabase().getDataTree();
             CommandResponse response = initializeResponse();
             CommandResponse response = initializeResponse();
             response.putAll(dt.getWatchesSummary().toMap());
             response.putAll(dt.getWatchesSummary().toMap());
@@ -911,14 +1005,14 @@ public class Commands {
      * Returns the current phase of Zab protocol that peer is running.
      * Returns the current phase of Zab protocol that peer is running.
      * It can be in one of these phases: ELECTION, DISCOVERY, SYNCHRONIZATION, BROADCAST
      * It can be in one of these phases: ELECTION, DISCOVERY, SYNCHRONIZATION, BROADCAST
      */
      */
-    public static class ZabStateCommand extends CommandBase {
+    public static class ZabStateCommand extends GetCommand {
 
 
         public ZabStateCommand() {
         public ZabStateCommand() {
             super(Arrays.asList("zabstate"), false);
             super(Arrays.asList("zabstate"), false);
         }
         }
 
 
         @Override
         @Override
-        public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+        public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
             CommandResponse response = initializeResponse();
             CommandResponse response = initializeResponse();
             if (zkServer instanceof QuorumZooKeeperServer) {
             if (zkServer instanceof QuorumZooKeeperServer) {
                 QuorumPeer peer = ((QuorumZooKeeperServer) zkServer).self;
                 QuorumPeer peer = ((QuorumZooKeeperServer) zkServer).self;

+ 43 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/GetCommand.java

@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.admin;
+
+import java.io.InputStream;
+import java.util.List;
+import org.apache.zookeeper.server.ZooKeeperServer;
+
+/**
+ * Command that represents HTTP GET request
+ */
+
+public abstract class GetCommand extends CommandBase {
+
+    protected GetCommand(List<String> names) {
+        super(names);
+    }
+
+    protected GetCommand(List<String> names, boolean serverRequired) {
+        super(names, serverRequired);
+    }
+
+    @Override
+    public CommandResponse runPost(ZooKeeperServer zkServer, InputStream inputStream) {
+        return null;
+    }
+}

+ 56 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/JettyAdminServer.java

@@ -236,6 +236,7 @@ public class JettyAdminServer implements AdminServer {
 
 
         private static final long serialVersionUID = 1L;
         private static final long serialVersionUID = 1L;
 
 
+        @Override
         protected void doGet(
         protected void doGet(
             HttpServletRequest request,
             HttpServletRequest request,
             HttpServletResponse response) throws ServletException, IOException {
             HttpServletResponse response) throws ServletException, IOException {
@@ -260,7 +261,7 @@ public class JettyAdminServer implements AdminServer {
             }
             }
 
 
             // Run the command
             // Run the command
-            final CommandResponse cmdResponse = Commands.runCommand(cmd, zkServer, kwargs);
+            final CommandResponse cmdResponse = Commands.runGetCommand(cmd, zkServer, kwargs);
             response.setStatus(cmdResponse.getStatusCode());
             response.setStatus(cmdResponse.getStatusCode());
 
 
             final Map<String, String> headers = cmdResponse.getHeaders();
             final Map<String, String> headers = cmdResponse.getHeaders();
@@ -280,6 +281,60 @@ public class JettyAdminServer implements AdminServer {
                 outputter.output(cmdResponse, response.getOutputStream());
                 outputter.output(cmdResponse, response.getOutputStream());
             }
             }
         }
         }
+
+        /**
+         * Serves HTTP POST requests. It reads request payload as raw data.
+         * It's up to each command to process the payload accordingly.
+         * For example, RestoreCommand uses the payload InputStream directly
+         * to read snapshot data.
+         */
+        @Override
+        protected void doPost(final HttpServletRequest request,
+                              final HttpServletResponse response) throws ServletException, IOException {
+            final String cmdName = extractCommandNameFromURL(request, response);
+            if (cmdName != null) {
+                final CommandResponse cmdResponse = Commands.runPostCommand(cmdName, zkServer, request.getInputStream());
+                final String clientIP = IPAuthenticationProvider.getClientIPAddress(request);
+                sendJSONResponse(response, cmdResponse, clientIP);
+            }
+        }
+
+        /**
+         * Extracts the command name from URL if it exists otherwise null
+         */
+        private String extractCommandNameFromURL(final HttpServletRequest request,
+                                                 final HttpServletResponse response) throws IOException {
+            String cmd = request.getPathInfo();
+            if (cmd == null || cmd.equals("/")) {
+                printCommandLinks(response);
+                return null;
+            }
+            // Strip leading "/"
+            return cmd.substring(1);
+        }
+
+        /**
+         * Prints the list of URLs to each registered command as response.
+         */
+        private void printCommandLinks(final HttpServletResponse response) throws IOException {
+            for (final String link : commandLinks()) {
+                response.getWriter().println(link);
+                response.getWriter().println("<br/>");
+            }
+        }
+
+        /**
+         * Send JSON string as the response.
+         */
+        private void sendJSONResponse(final HttpServletResponse response,
+                                      final CommandResponse cmdResponse,
+                                      final String clientIP) throws IOException {
+            final CommandOutputter outputter = new JsonOutputter(clientIP);
+
+            response.setStatus(cmdResponse.getStatusCode());
+            response.setContentType(outputter.getContentType());
+            outputter.output(cmdResponse, response.getWriter());
+        }
     }
     }
 
 
     /**
     /**

+ 37 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/PostCommand.java

@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Command that represents HTTP POST request
+ */
+package org.apache.zookeeper.server.admin;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.zookeeper.server.ZooKeeperServer;
+
+public abstract class PostCommand extends CommandBase {
+    protected PostCommand(List<String> names) {
+        super(names);
+    }
+
+    @Override
+    public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+        return null;
+    }
+}

+ 1 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java

@@ -99,7 +99,7 @@ public class FileSnap implements SnapShot {
                     SnapStream.checkSealIntegrity(snapIS, ia);
                     SnapStream.checkSealIntegrity(snapIS, ia);
                 }
                 }
 
 
-                // deserialize the last processed zxid and check the intact
+                // deserialize lastProcessedZxid and check inconsistency
                 if (dt.deserializeLastProcessedZxid(ia)) {
                 if (dt.deserializeLastProcessedZxid(ia)) {
                     SnapStream.checkSealIntegrity(snapIS, ia);
                     SnapStream.checkSealIntegrity(snapIS, ia);
                 }
                 }

+ 1 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/SnapStream.java

@@ -170,7 +170,7 @@ public class SnapStream {
      * the checkSum of the content.
      * the checkSum of the content.
      *
      *
      */
      */
-    static void checkSealIntegrity(CheckedInputStream is, InputArchive ia) throws IOException {
+    public static void checkSealIntegrity(CheckedInputStream is, InputArchive ia) throws IOException {
         long checkSum = is.getChecksum().getValue();
         long checkSum = is.getChecksum().getValue();
         long val = ia.readLong("val");
         long val = ia.readLong("val");
         ia.readString("path");  // Read and ignore "/" written by SealStream.
         ia.readString("path");  // Read and ignore "/" written by SealStream.

+ 2 - 2
zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java

@@ -72,8 +72,8 @@ public class ZKTestCase {
         // same port.
         // same port.
         System.setProperty("zookeeper.admin.enableServer", "false");
         System.setProperty("zookeeper.admin.enableServer", "false");
 
 
-        // disable rate limiting on the snapshot admin API
-        System.setProperty("zookeeper.admin.snapshot.intervalInMS", "0");
+        // disable rate limiting
+        System.setProperty("zookeeper.admin.rateLimiterIntervalInMS", "0");
 
 
         // ZOOKEEPER-2693 disables all 4lw by default.
         // ZOOKEEPER-2693 disables all 4lw by default.
         // Here we enable the 4lw which ZooKeeper tests depends.
         // Here we enable the 4lw which ZooKeeper tests depends.

+ 141 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/server/ZookeeperServerRestoreTest.java

@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server;
+
+import static org.apache.zookeeper.server.persistence.FileSnap.SNAPSHOT_FILE_PREFIX;
+import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.io.File;
+import java.util.Set;
+import java.util.zip.CheckedInputStream;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.persistence.SnapStream;
+import org.apache.zookeeper.server.persistence.Util;
+import org.apache.zookeeper.test.ClientBase;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+public class ZookeeperServerRestoreTest extends ZKTestCase {
+    private static final String BASE_PATH = "/restoreFromSnapshotTest";
+    private static final int NODE_COUNT = 10;
+    private static final String HOST_PORT = "127.0.0.1:" + PortAssignment.unique();
+
+    @TempDir
+    static File dataDir;
+
+    @TempDir
+    static File logDir;
+
+    @Test
+    public void testRestoreFromSnapshot() throws Exception {
+        ZooKeeperServer.setSerializeLastProcessedZxidEnabled(true);
+
+        final ZooKeeperServer zks = new ZooKeeperServer(dataDir, logDir, 3000);
+        final int port = Integer.parseInt(HOST_PORT.split(":")[1]);
+        final ServerCnxnFactory serverCnxnFactory = ServerCnxnFactory.createFactory(port, -1);
+
+        ZooKeeper zk1 = null;
+        ZooKeeper zk2 = null;
+        ZooKeeper zk3 = null;
+
+        try {
+            // start the server
+            serverCnxnFactory.startup(zks);
+            assertTrue(ClientBase.waitForServerUp(HOST_PORT, CONNECTION_TIMEOUT));
+
+            // zk1 create test data
+            zk1 = ClientBase.createZKClient(HOST_PORT);
+            for (int i = 0; i < NODE_COUNT; i++) {
+                final String path = BASE_PATH + "-" + i;
+                zk1.create(path, String.valueOf(i).getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            }
+
+            // take Snapshot
+            final File snapshotFile = zks.takeSnapshot(false, false, true);
+            final long lastZxidFromSnapshot = Util.getZxidFromName(snapshotFile.getName(), SNAPSHOT_FILE_PREFIX);
+
+            // zk2 create more test data after snapshotting
+            zk2 = ClientBase.createZKClient(HOST_PORT);
+            for (int i = NODE_COUNT; i < NODE_COUNT * 2; i++) {
+                final String path = BASE_PATH + "-" + i;
+                zk2.create(path, String.valueOf(i).getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            }
+
+            // restore from snapshot
+            try (final CheckedInputStream is = SnapStream.getInputStream(snapshotFile)) {
+                final long lastZxidFromRestore = zks.restoreFromSnapshot(is);
+
+                // validate the last processed zxid
+                assertEquals(lastZxidFromSnapshot, lastZxidFromRestore);
+
+                // validate restored data only contains data from snapshot
+                zk3 = ClientBase.createZKClient(HOST_PORT);
+                for (int i = 0; i < NODE_COUNT; i++) {
+                    final String path = BASE_PATH + "-" + i;
+                    final String expectedData = String.valueOf(i);
+                    assertArrayEquals(expectedData.getBytes(), zk3.getData(path, null, null));
+                }
+                assertEquals(NODE_COUNT + 3, zk3.getAllChildrenNumber("/"));
+
+                // validate sessions
+                final SessionTracker sessionTracker = zks.getSessionTracker();
+                final Set<Long> globalSessions = sessionTracker.globalSessions();
+                assertEquals(2, globalSessions.size());
+                assertTrue(globalSessions.contains(zk1.getSessionId()));
+                Assertions.assertFalse(globalSessions.contains(zk2.getSessionId()));
+                assertTrue(globalSessions.contains(zk3.getSessionId()));
+
+                // validate ZookeeperServer state
+                assertEquals(ZooKeeperServer.State.RUNNING, zks.state);
+
+                // validate being able to create more data after restore
+                zk3.create(BASE_PATH + "_" + "after", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+                assertEquals(NODE_COUNT + 4, zk3.getAllChildrenNumber("/"));
+            }
+        } finally {
+            System.clearProperty("zookeeper.serializeLastProcessedZxid.enabled");
+
+            if (zk1 != null) {
+                zk1.close();
+            }
+            if (zk2 != null) {
+                zk2.close();
+            }
+            if (zk3 != null) {
+                zk3.close();
+            }
+
+            zks.shutdown();
+            serverCnxnFactory.shutdown();
+        }
+    }
+
+    @Test
+    public void testRestoreFromSnapshot_nulInputStream() throws Exception {
+        final ZooKeeperServer zks = new ZooKeeperServer(dataDir, logDir, 3000);
+        assertThrows(IllegalArgumentException.class, () -> zks.restoreFromSnapshot(null));
+    }
+}

+ 23 - 32
zookeeper-server/src/test/java/org/apache/zookeeper/TakeSnapshotTest.java → zookeeper-server/src/test/java/org/apache/zookeeper/server/ZookeeperServerSnapshotTest.java

@@ -16,26 +16,26 @@
  * limitations under the License.
  * limitations under the License.
  */
  */
 
 
-package org.apache.zookeeper;
+package org.apache.zookeeper.server;
 
 
+import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import java.io.File;
 import java.io.File;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.ZKTestCase;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.server.ServerCnxnFactory;
-import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.test.ClientBase;
 import org.apache.zookeeper.test.ClientBase;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.api.io.TempDir;
 
 
-public class TakeSnapshotTest extends ClientBase {
+public class ZookeeperServerSnapshotTest extends ZKTestCase {
     private static final String BASE_PATH = "/takeSnapshotTest";
     private static final String BASE_PATH = "/takeSnapshotTest";
-    private static final int NODE_COUNT = 100;
-    private static final String HOSTPORT = "127.0.0.1:" + PortAssignment.unique();
-    private ZooKeeper zk;
+    private static final int NODE_COUNT = 10;
+    private static final String HOST_PORT = "127.0.0.1:" + PortAssignment.unique();
 
 
     @TempDir
     @TempDir
     static File dataDir;
     static File dataDir;
@@ -43,32 +43,19 @@ public class TakeSnapshotTest extends ClientBase {
     @TempDir
     @TempDir
     static File logDir;
     static File logDir;
 
 
-
-    @BeforeEach
-    public void setUp() throws Exception {
-        super.setUp();
-        ClientBase.setupTestEnv();
-    }
-
-    @AfterEach
-    public void tearDown() throws Exception {
-        if (zk != null) {
-            zk.close();
-        }
-    }
-
     @Test
     @Test
-    public void testTakeSnapshotAndRestore() throws Exception {
+    public void testTakeSnapshot() throws Exception {
         ZooKeeperServer zks = new ZooKeeperServer(dataDir, logDir, 3000);
         ZooKeeperServer zks = new ZooKeeperServer(dataDir, logDir, 3000);
         ZooKeeperServer.setSerializeLastProcessedZxidEnabled(true);
         ZooKeeperServer.setSerializeLastProcessedZxidEnabled(true);
 
 
-        final int port = Integer.parseInt(HOSTPORT.split(":")[1]);
+        final int port = Integer.parseInt(HOST_PORT.split(":")[1]);
         final ServerCnxnFactory serverCnxnFactory = ServerCnxnFactory.createFactory(port, -1);
         final ServerCnxnFactory serverCnxnFactory = ServerCnxnFactory.createFactory(port, -1);
-        serverCnxnFactory.startup(zks);
-        assertTrue(ClientBase.waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT));
+        ZooKeeper zk = null;
+        try  {
+            serverCnxnFactory.startup(zks);
+            assertTrue(ClientBase.waitForServerUp(HOST_PORT, CONNECTION_TIMEOUT));
 
 
-        try {
-            zk = ClientBase.createZKClient(HOSTPORT);
+            zk = ClientBase.createZKClient(HOST_PORT);
             for (int i = 0; i < NODE_COUNT; i++) {
             for (int i = 0; i < NODE_COUNT; i++) {
                 final String path = BASE_PATH + "-" + i;
                 final String path = BASE_PATH + "-" + i;
                 zk.create(path, String.valueOf(i).getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                 zk.create(path, String.valueOf(i).getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
@@ -81,14 +68,14 @@ public class TakeSnapshotTest extends ClientBase {
             zk.close();
             zk.close();
             zks.shutdown();
             zks.shutdown();
 
 
-            // start server again and assert the data restored from snapshot
+            // restart server and assert the data restored from snapshot
             zks = new ZooKeeperServer(dataDir, logDir, 3000);
             zks = new ZooKeeperServer(dataDir, logDir, 3000);
             ZooKeeperServer.setSerializeLastProcessedZxidEnabled(false);
             ZooKeeperServer.setSerializeLastProcessedZxidEnabled(false);
 
 
             serverCnxnFactory.startup(zks);
             serverCnxnFactory.startup(zks);
-            assertTrue(ClientBase.waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT));
+            assertTrue(ClientBase.waitForServerUp(HOST_PORT, CONNECTION_TIMEOUT));
 
 
-            zk = ClientBase.createZKClient(HOSTPORT);
+            zk = ClientBase.createZKClient(HOST_PORT);
             for (int i = 0; i < NODE_COUNT; i++) {
             for (int i = 0; i < NODE_COUNT; i++) {
                 final String path = BASE_PATH + "-" + i;
                 final String path = BASE_PATH + "-" + i;
                 final String expectedData = String.valueOf(i);
                 final String expectedData = String.valueOf(i);
@@ -96,6 +83,10 @@ public class TakeSnapshotTest extends ClientBase {
             }
             }
             assertEquals(NODE_COUNT + 3, zk.getAllChildrenNumber("/"));
             assertEquals(NODE_COUNT + 3, zk.getAllChildrenNumber("/"));
         } finally {
         } finally {
+            if (zk != null) {
+                zk.close();
+            }
+
             zks.shutdown();
             zks.shutdown();
             serverCnxnFactory.shutdown();
             serverCnxnFactory.shutdown();
         }
         }

+ 65 - 20
zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandsTest.java

@@ -19,8 +19,9 @@
 package org.apache.zookeeper.server.admin;
 package org.apache.zookeeper.server.admin;
 
 
 import static org.apache.zookeeper.server.ZooKeeperServer.ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED;
 import static org.apache.zookeeper.server.ZooKeeperServer.ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED;
+import static org.apache.zookeeper.server.admin.Commands.ADMIN_RATE_LIMITER_INTERVAL;
+import static org.apache.zookeeper.server.admin.Commands.RestoreCommand.ADMIN_RESTORE_ENABLED;
 import static org.apache.zookeeper.server.admin.Commands.SnapshotCommand.ADMIN_SNAPSHOT_ENABLED;
 import static org.apache.zookeeper.server.admin.Commands.SnapshotCommand.ADMIN_SNAPSHOT_ENABLED;
-import static org.apache.zookeeper.server.admin.Commands.SnapshotCommand.ADMIN_SNAPSHOT_INTERVAL;
 import static org.apache.zookeeper.server.admin.Commands.SnapshotCommand.REQUEST_QUERY_PARAM_STREAMING;
 import static org.apache.zookeeper.server.admin.Commands.SnapshotCommand.REQUEST_QUERY_PARAM_STREAMING;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
 import static org.hamcrest.core.Is.is;
@@ -30,6 +31,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.when;
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.ArrayList;
@@ -57,6 +59,8 @@ public class CommandsTest extends ClientBase {
      *            - the primary name of the command
      *            - the primary name of the command
      * @param kwargs
      * @param kwargs
      *            - keyword arguments to the command
      *            - keyword arguments to the command
+     * @param inputStream
+     *            - InputStream to the command
      * @param expectedHeaders
      * @param expectedHeaders
      *            - expected HTTP response headers
      *            - expected HTTP response headers
      * @param expectedStatusCode
      * @param expectedStatusCode
@@ -66,11 +70,12 @@ public class CommandsTest extends ClientBase {
      * @throws IOException
      * @throws IOException
      * @throws InterruptedException
      * @throws InterruptedException
      */
      */
-    private void testCommand(String cmdName, Map<String, String> kwargs,
+    private void testCommand(String cmdName, Map<String, String> kwargs, InputStream inputStream,
                              Map<String, String> expectedHeaders, int expectedStatusCode,
                              Map<String, String> expectedHeaders, int expectedStatusCode,
                              Field... fields) throws IOException, InterruptedException {
                              Field... fields) throws IOException, InterruptedException {
         ZooKeeperServer zks = serverFactory.getZooKeeperServer();
         ZooKeeperServer zks = serverFactory.getZooKeeperServer();
-        final CommandResponse commandResponse = Commands.runCommand(cmdName, zks, kwargs);
+        final CommandResponse commandResponse = inputStream == null
+        ? Commands.runGetCommand(cmdName, zks, kwargs) : Commands.runPostCommand(cmdName, zks, inputStream);
         assertNotNull(commandResponse);
         assertNotNull(commandResponse);
         assertEquals(expectedStatusCode, commandResponse.getStatusCode());
         assertEquals(expectedStatusCode, commandResponse.getStatusCode());
         try (final InputStream responseStream = commandResponse.getInputStream()) {
         try (final InputStream responseStream = commandResponse.getInputStream()) {
@@ -102,7 +107,7 @@ public class CommandsTest extends ClientBase {
     }
     }
 
 
     public void testCommand(String cmdName, Field... fields) throws IOException, InterruptedException {
     public void testCommand(String cmdName, Field... fields) throws IOException, InterruptedException {
-        testCommand(cmdName, new HashMap<String, String>(), new HashMap<>(), HttpServletResponse.SC_OK, fields);
+        testCommand(cmdName, new HashMap<String, String>(), null, new HashMap<>(), HttpServletResponse.SC_OK, fields);
     }
     }
 
 
     private static class Field {
     private static class Field {
@@ -115,16 +120,6 @@ public class CommandsTest extends ClientBase {
         }
         }
     }
     }
 
 
-    @Test
-    public void testSnapshot_streaming() throws IOException, InterruptedException {
-        testSnapshot(true);
-    }
-
-    @Test
-    public void testSnapshot_nonStreaming() throws IOException, InterruptedException {
-        testSnapshot(false);
-    }
-
     @Test
     @Test
     public void testConfiguration() throws IOException, InterruptedException {
     public void testConfiguration() throws IOException, InterruptedException {
         testCommand("configuration", new Field("client_port", Integer.class), new Field("data_dir", String.class), new Field("data_log_dir", String.class), new Field("tick_time", Integer.class), new Field("max_client_cnxns", Integer.class), new Field("min_session_timeout", Integer.class), new Field("max_session_timeout", Integer.class), new Field("server_id", Long.class), new Field("client_port_listen_backlog", Integer.class));
         testCommand("configuration", new Field("client_port", Integer.class), new Field("data_dir", String.class), new Field("data_log_dir", String.class), new Field("tick_time", Integer.class), new Field("max_client_cnxns", Integer.class), new Field("min_session_timeout", Integer.class), new Field("max_session_timeout", Integer.class), new Field("server_id", Long.class), new Field("client_port_listen_backlog", Integer.class));
@@ -230,6 +225,45 @@ public class CommandsTest extends ClientBase {
         testCommand("ruok");
         testCommand("ruok");
     }
     }
 
 
+    @Test
+    public void testRestore_invalidInputStream() throws IOException, InterruptedException {
+        setupForRestoreCommand();
+
+        try (final InputStream inputStream = new ByteArrayInputStream("Invalid snapshot data".getBytes())){
+            final Map<String, String> kwargs = new HashMap<>();
+            final Map<String, String> expectedHeaders = new HashMap<>();
+            testCommand("restore", kwargs, inputStream, expectedHeaders, HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+        } finally {
+            clearForRestoreCommand();
+        }
+    }
+
+    @Test
+    public void testRestore_nullInputStream() {
+        setupForRestoreCommand();
+        final ZooKeeperServer zks = serverFactory.getZooKeeperServer();
+        try {
+            final CommandResponse commandResponse = Commands.runPostCommand("restore", zks, null);
+            assertNotNull(commandResponse);
+            assertEquals(HttpServletResponse.SC_BAD_REQUEST, commandResponse.getStatusCode());
+        } finally {
+          clearForRestoreCommand();
+          if (zks != null) {
+              zks.shutdown();
+          }
+        }
+    }
+
+    @Test
+    public void testSnapshot_streaming() throws IOException, InterruptedException {
+        testSnapshot(true);
+    }
+
+    @Test
+    public void testSnapshot_nonStreaming() throws IOException, InterruptedException {
+        testSnapshot(false);
+    }
+
     @Test
     @Test
     public void testServerStats() throws IOException, InterruptedException {
     public void testServerStats() throws IOException, InterruptedException {
         testCommand("server_stats", new Field("version", String.class), new Field("read_only", Boolean.class), new Field("server_stats", ServerStats.class), new Field("node_count", Integer.class), new Field("client_response", BufferStats.class));
         testCommand("server_stats", new Field("version", String.class), new Field("read_only", Boolean.class), new Field("server_stats", ServerStats.class), new Field("node_count", Integer.class), new Field("client_response", BufferStats.class));
@@ -239,7 +273,7 @@ public class CommandsTest extends ClientBase {
     public void testSetTraceMask() throws IOException, InterruptedException {
     public void testSetTraceMask() throws IOException, InterruptedException {
         Map<String, String> kwargs = new HashMap<String, String>();
         Map<String, String> kwargs = new HashMap<String, String>();
         kwargs.put("traceMask", "1");
         kwargs.put("traceMask", "1");
-        testCommand("set_trace_mask", kwargs, new HashMap<>(), HttpServletResponse.SC_OK, new Field("tracemask", Long.class));
+        testCommand("set_trace_mask", kwargs, null, new HashMap<>(), HttpServletResponse.SC_OK, new Field("tracemask", Long.class));
     }
     }
 
 
     @Test
     @Test
@@ -289,7 +323,7 @@ public class CommandsTest extends ClientBase {
         when(zkServer.getSecureServerCnxnFactory()).thenReturn(cnxnFactory);
         when(zkServer.getSecureServerCnxnFactory()).thenReturn(cnxnFactory);
 
 
         // Act
         // Act
-        CommandResponse response = cmd.run(zkServer, null);
+        CommandResponse response = cmd.runGet(zkServer, null);
 
 
         // Assert
         // Assert
         assertThat(response.toMap().containsKey("connections"), is(true));
         assertThat(response.toMap().containsKey("connections"), is(true));
@@ -313,7 +347,7 @@ public class CommandsTest extends ClientBase {
         when(zkServer.getZKDatabase()).thenReturn(zkDatabase);
         when(zkServer.getZKDatabase()).thenReturn(zkDatabase);
         when(zkDatabase.getNodeCount()).thenReturn(0);
         when(zkDatabase.getNodeCount()).thenReturn(0);
 
 
-        CommandResponse response = cmd.run(zkServer, null);
+        CommandResponse response = cmd.runGet(zkServer, null);
 
 
         assertThat(response.toMap().containsKey("connections"), is(true));
         assertThat(response.toMap().containsKey("connections"), is(true));
         assertThat(response.toMap().containsKey("secure_connections"), is(true));
         assertThat(response.toMap().containsKey("secure_connections"), is(true));
@@ -321,7 +355,7 @@ public class CommandsTest extends ClientBase {
 
 
     private void testSnapshot(final boolean streaming) throws IOException, InterruptedException {
     private void testSnapshot(final boolean streaming) throws IOException, InterruptedException {
         System.setProperty(ADMIN_SNAPSHOT_ENABLED, "true");
         System.setProperty(ADMIN_SNAPSHOT_ENABLED, "true");
-        System.setProperty(ADMIN_SNAPSHOT_INTERVAL, "0");
+        System.setProperty(ADMIN_RATE_LIMITER_INTERVAL, "0");
         System.setProperty(ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED, "true");
         System.setProperty(ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED, "true");
         try {
         try {
             final Map<String, String> kwargs = new HashMap<>();
             final Map<String, String> kwargs = new HashMap<>();
@@ -329,12 +363,23 @@ public class CommandsTest extends ClientBase {
             final Map<String, String> expectedHeaders = new HashMap<>();
             final Map<String, String> expectedHeaders = new HashMap<>();
             expectedHeaders.put(Commands.SnapshotCommand.RESPONSE_HEADER_LAST_ZXID, "0x0");
             expectedHeaders.put(Commands.SnapshotCommand.RESPONSE_HEADER_LAST_ZXID, "0x0");
             expectedHeaders.put(Commands.SnapshotCommand.RESPONSE_HEADER_SNAPSHOT_SIZE, "478");
             expectedHeaders.put(Commands.SnapshotCommand.RESPONSE_HEADER_SNAPSHOT_SIZE, "478");
-            testCommand("snapshot", kwargs, expectedHeaders, HttpServletResponse.SC_OK);
+            testCommand("snapshot", kwargs, null, expectedHeaders, HttpServletResponse.SC_OK);
         } finally {
         } finally {
             System.clearProperty(ADMIN_SNAPSHOT_ENABLED);
             System.clearProperty(ADMIN_SNAPSHOT_ENABLED);
-            System.clearProperty(ADMIN_SNAPSHOT_INTERVAL);
+            System.clearProperty(ADMIN_RATE_LIMITER_INTERVAL);
             System.clearProperty(ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED);
             System.clearProperty(ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED);
         }
         }
     }
     }
 
 
+    private void setupForRestoreCommand() {
+        System.setProperty(ADMIN_RESTORE_ENABLED, "true");
+        System.setProperty(ADMIN_RATE_LIMITER_INTERVAL, "0");
+        System.setProperty(ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED, "true");
+    }
+
+    private void clearForRestoreCommand() {
+        System.clearProperty(ADMIN_RESTORE_ENABLED);
+        System.clearProperty(ADMIN_RATE_LIMITER_INTERVAL);
+        System.clearProperty(ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED);
+    }
 }
 }

+ 1 - 1
zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/JettyAdminServerTest.java

@@ -63,7 +63,7 @@ public class JettyAdminServerTest extends ZKTestCase {
 
 
     static final String URL_FORMAT = "http://localhost:%d/commands";
     static final String URL_FORMAT = "http://localhost:%d/commands";
     static final String HTTPS_URL_FORMAT = "https://localhost:%d/commands";
     static final String HTTPS_URL_FORMAT = "https://localhost:%d/commands";
-    static final int jettyAdminPort = PortAssignment.unique();
+    private final int jettyAdminPort = PortAssignment.unique();
 
 
     @BeforeEach
     @BeforeEach
     public void enableServer() {
     public void enableServer() {

+ 187 - 25
zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/SnapshotCommandTest.java → zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/SnapshotAndRestoreCommandTest.java

@@ -19,20 +19,22 @@
 package org.apache.zookeeper.server.admin;
 package org.apache.zookeeper.server.admin;
 
 
 import static org.apache.zookeeper.server.ZooKeeperServer.ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED;
 import static org.apache.zookeeper.server.ZooKeeperServer.ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED;
+import static org.apache.zookeeper.server.admin.Commands.ADMIN_RATE_LIMITER_INTERVAL;
+import static org.apache.zookeeper.server.admin.Commands.RestoreCommand.ADMIN_RESTORE_ENABLED;
 import static org.apache.zookeeper.server.admin.Commands.SnapshotCommand.ADMIN_SNAPSHOT_ENABLED;
 import static org.apache.zookeeper.server.admin.Commands.SnapshotCommand.ADMIN_SNAPSHOT_ENABLED;
-import static org.apache.zookeeper.server.admin.Commands.SnapshotCommand.ADMIN_SNAPSHOT_INTERVAL;
 import static org.apache.zookeeper.server.admin.Commands.SnapshotCommand.REQUEST_QUERY_PARAM_STREAMING;
 import static org.apache.zookeeper.server.admin.Commands.SnapshotCommand.REQUEST_QUERY_PARAM_STREAMING;
 import static org.apache.zookeeper.server.admin.JettyAdminServerTest.URL_FORMAT;
 import static org.apache.zookeeper.server.admin.JettyAdminServerTest.URL_FORMAT;
-import static org.apache.zookeeper.server.admin.JettyAdminServerTest.jettyAdminPort;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import java.io.BufferedReader;
 import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.InputStreamReader;
+import java.io.OutputStream;
 import java.io.UnsupportedEncodingException;
 import java.io.UnsupportedEncodingException;
 import java.net.HttpURLConnection;
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.net.URL;
@@ -40,6 +42,11 @@ import java.net.URLEncoder;
 import java.nio.file.Files;
 import java.nio.file.Files;
 import java.util.HashMap;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.zip.CheckedInputStream;
 import javax.servlet.http.HttpServletResponse;
 import javax.servlet.http.HttpServletResponse;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException;
@@ -48,8 +55,11 @@ import org.apache.zookeeper.ZKTestCase;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.common.IOUtils;
 import org.apache.zookeeper.common.IOUtils;
+import org.apache.zookeeper.metrics.MetricsUtils;
 import org.apache.zookeeper.server.ServerCnxnFactory;
 import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.ServerMetrics;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.SnapStream;
 import org.apache.zookeeper.test.ClientBase;
 import org.apache.zookeeper.test.ClientBase;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeAll;
@@ -60,13 +70,14 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
 
 
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class SnapshotCommandTest extends ZKTestCase {
-    private static final Logger LOG = LoggerFactory.getLogger(SnapshotCommandTest.class);
+public class SnapshotAndRestoreCommandTest extends ZKTestCase {
+    private static final Logger LOG = LoggerFactory.getLogger(SnapshotAndRestoreCommandTest.class);
 
 
-    private static final String PATH = "/snapshot_test";
+    private static final String SNAPSHOT_TEST_PATH = "/snapshot_test";
     private static final int NODE_COUNT = 10;
     private static final int NODE_COUNT = 10;
 
 
-    private final String hostPort =  "127.0.0.1:" + PortAssignment.unique();
+    private final String hostPort = "127.0.0.1:" + PortAssignment.unique();
+    private final int jettyAdminPort = PortAssignment.unique();
     private ServerCnxnFactory cnxnFactory;
     private ServerCnxnFactory cnxnFactory;
     private JettyAdminServer adminServer;
     private JettyAdminServer adminServer;
     private ZooKeeperServer zks;
     private ZooKeeperServer zks;
@@ -91,9 +102,10 @@ public class SnapshotCommandTest extends ZKTestCase {
         // start AdminServer
         // start AdminServer
         System.setProperty("zookeeper.admin.enableServer", "true");
         System.setProperty("zookeeper.admin.enableServer", "true");
         System.setProperty("zookeeper.admin.serverPort", "" + jettyAdminPort);
         System.setProperty("zookeeper.admin.serverPort", "" + jettyAdminPort);
+        System.setProperty(ADMIN_RATE_LIMITER_INTERVAL, "0");
         System.setProperty(ADMIN_SNAPSHOT_ENABLED, "true");
         System.setProperty(ADMIN_SNAPSHOT_ENABLED, "true");
-        System.setProperty(ADMIN_SNAPSHOT_INTERVAL, "0");
         System.setProperty(ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED, "true");
         System.setProperty(ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED, "true");
+        System.setProperty(ADMIN_RESTORE_ENABLED, "true");
 
 
         adminServer = new JettyAdminServer();
         adminServer = new JettyAdminServer();
         adminServer.setZooKeeperServer(zks);
         adminServer.setZooKeeperServer(zks);
@@ -101,7 +113,7 @@ public class SnapshotCommandTest extends ZKTestCase {
 
 
         // create Zookeeper client and test data
         // create Zookeeper client and test data
         zk = ClientBase.createZKClient(hostPort);
         zk = ClientBase.createZKClient(hostPort);
-        createData(zk, NODE_COUNT);
+        createData(zk, SNAPSHOT_TEST_PATH, NODE_COUNT);
     }
     }
 
 
     @AfterAll
     @AfterAll
@@ -109,9 +121,10 @@ public class SnapshotCommandTest extends ZKTestCase {
         System.clearProperty("zookeeper.4lw.commands.whitelist");
         System.clearProperty("zookeeper.4lw.commands.whitelist");
         System.clearProperty("zookeeper.admin.enableServer");
         System.clearProperty("zookeeper.admin.enableServer");
         System.clearProperty("zookeeper.admin.serverPort");
         System.clearProperty("zookeeper.admin.serverPort");
+        System.clearProperty(ADMIN_RATE_LIMITER_INTERVAL);
         System.clearProperty(ADMIN_SNAPSHOT_ENABLED);
         System.clearProperty(ADMIN_SNAPSHOT_ENABLED);
-        System.clearProperty(ADMIN_SNAPSHOT_INTERVAL);
         System.clearProperty(ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED);
         System.clearProperty(ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED);
+        System.clearProperty(ADMIN_RESTORE_ENABLED);
 
 
         if (zk != null) {
         if (zk != null) {
             zk.close();
             zk.close();
@@ -131,19 +144,85 @@ public class SnapshotCommandTest extends ZKTestCase {
     }
     }
 
 
     @Test
     @Test
-    public void testSnapshotCommand_streaming() throws Exception {
-        // take snapshot with streaming
-        final HttpURLConnection snapshotConn = sendSnapshotRequest(true);
+    public void testSnapshotAndRestoreCommand_streaming() throws Exception {
+        ServerMetrics.getMetrics().resetAll();
 
 
-        // validate snapshot response
-        assertEquals(HttpURLConnection.HTTP_OK, snapshotConn.getResponseCode());
-        validateResponseHeaders(snapshotConn);
-        final File snapshotFile = new File(dataDir + "/snapshot." + System.currentTimeMillis());
-        try (final InputStream inputStream = snapshotConn.getInputStream();
-             final FileOutputStream outputStream = new FileOutputStream(snapshotFile)) {
-            IOUtils.copyBytes(inputStream, outputStream, 1024, true);
-            final long fileSize = Files.size(snapshotFile.toPath());
-            assertTrue(fileSize > 0);
+        // take snapshot with streaming and validate
+        final File snapshotFile = takeSnapshotAndValidate();
+
+        // validate snapshot metrics
+        validateSnapshotMetrics();
+
+        // restore from snapshot and validate
+        performRestoreAndValidate(snapshotFile);
+
+        // validate creating data after restore
+        try (final ZooKeeper zk = ClientBase.createZKClient(hostPort)) {
+            createData(zk, SNAPSHOT_TEST_PATH, NODE_COUNT + 1);
+            assertEquals(NODE_COUNT + NODE_COUNT + 1, zk.getAllChildrenNumber(SNAPSHOT_TEST_PATH));
+        }
+
+        // validate restore metrics
+        validateRestoreMetrics();
+    }
+
+    @Test
+    public void testClientRequest_restoreInProgress() throws Exception {
+        final int threadCount = 2;
+        final int nodeCount = 50;
+        final String restoreTestPath = "/restore_test";
+
+        // take snapshot
+        final File snapshotFile = takeSnapshotAndValidate();
+
+        final ExecutorService service = Executors.newFixedThreadPool(threadCount);
+        final CountDownLatch latch = new CountDownLatch(threadCount);
+        final AtomicBoolean createSucceeded = new AtomicBoolean(false);
+        final AtomicBoolean restoreSucceeded = new AtomicBoolean(false);
+
+        // thread 1 creates data
+        service.submit(() -> {
+            try {
+                createData(zk, restoreTestPath, nodeCount);
+                createSucceeded.set(true);
+            } catch (final Exception e) {
+                LOG.error(e.getMessage());
+                e.printStackTrace();
+            } finally {
+                latch.countDown();
+            }
+        });
+
+        // thread 2 performs restore operation
+        service.submit(() -> {
+            try {
+                performRestoreAndValidate(snapshotFile);
+                restoreSucceeded.set(true);
+            } catch (final Exception e) {
+                LOG.error(e.getMessage());
+                e.printStackTrace();
+            } finally {
+                latch.countDown();
+            }
+        });
+
+        // wait for operations completed
+        latch.await();
+
+        // validate all client requests succeeded
+        if (createSucceeded.get() && restoreSucceeded.get()) {
+            assertEquals(nodeCount, zk.getAllChildrenNumber(restoreTestPath));
+        }
+    }
+
+    @Test
+    public void testRestores() throws Exception {
+        // take snapshot
+        final File snapshotFile = takeSnapshotAndValidate();
+
+        // perform restores
+        for (int i = 0; i < 3; i++) {
+            performRestoreAndValidate(snapshotFile);
         }
         }
     }
     }
 
 
@@ -186,16 +265,47 @@ public class SnapshotCommandTest extends ZKTestCase {
         }
         }
     }
     }
 
 
-    private void createData(final ZooKeeper zk, final long count) throws Exception {
+    @Test
+    public void testRestoreCommand_disabled() throws Exception {
+        System.setProperty(ADMIN_RESTORE_ENABLED, "false");
+        try {
+            final HttpURLConnection restoreConn = sendRestoreRequest();
+            assertEquals(HttpServletResponse.SC_SERVICE_UNAVAILABLE, restoreConn.getResponseCode());
+        } finally {
+            System.setProperty(ADMIN_RESTORE_ENABLED, "true");
+        }
+    }
+
+    @Test
+    public void testRestoreCommand_serializeLastZxidDisabled() throws Exception {
+        ZooKeeperServer.setSerializeLastProcessedZxidEnabled(false);
+        try {
+            final HttpURLConnection restoreConn = sendRestoreRequest();
+            assertEquals(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, restoreConn.getResponseCode());
+        } finally {
+            ZooKeeperServer.setSerializeLastProcessedZxidEnabled(true);
+        }
+    }
+
+    @Test
+    public void testRestoreCommand_invalidSnapshotData() throws Exception {
+        final HttpURLConnection restoreConn = sendRestoreRequest();
+        try (final InputStream inputStream = new ByteArrayInputStream("Invalid snapshot data".getBytes());
+             final OutputStream outputStream = restoreConn.getOutputStream()) {
+            IOUtils.copyBytes(inputStream, outputStream, 1024, true);
+        }
+        assertEquals(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, restoreConn.getResponseCode());
+    }
+
+    private void createData(final ZooKeeper zk, final String parentPath, final long count) throws Exception {
         try {
         try {
-            zk.create(PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            zk.create(parentPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
         } catch (final KeeperException.NodeExistsException ignore) {
         } catch (final KeeperException.NodeExistsException ignore) {
             // ignore
             // ignore
         }
         }
 
 
         for (int i = 0; i < count; i++) {
         for (int i = 0; i < count; i++) {
-            final String processNodePath = zk.create(String.format("%s/%s", PATH, "n_"), new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
-            LOG.info("Node created. path={}" + processNodePath);
+            zk.create(String.format("%s/%s", parentPath, "n_"), new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
         }
         }
     }
     }
 
 
@@ -208,6 +318,15 @@ public class SnapshotCommandTest extends ZKTestCase {
         return snapshotConn;
         return snapshotConn;
     }
     }
 
 
+    private HttpURLConnection sendRestoreRequest() throws Exception  {
+        final URL restoreURL = new URL(String.format(URL_FORMAT + "/restore", jettyAdminPort));
+        final HttpURLConnection restoreConn = (HttpURLConnection) restoreURL.openConnection();
+        restoreConn.setDoOutput(true);
+        restoreConn.setRequestMethod("POST");
+
+        return restoreConn;
+    }
+
     private String buildQueryStringForSnapshotCommand(final boolean streaming) throws Exception {
     private String buildQueryStringForSnapshotCommand(final boolean streaming) throws Exception {
         final Map<String, String> parameters = new HashMap<>();
         final Map<String, String> parameters = new HashMap<>();
         parameters.put(REQUEST_QUERY_PARAM_STREAMING, String.valueOf(streaming));
         parameters.put(REQUEST_QUERY_PARAM_STREAMING, String.valueOf(streaming));
@@ -254,4 +373,47 @@ public class SnapshotCommandTest extends ZKTestCase {
             LOG.info("Response payload: {}", sb);
             LOG.info("Response payload: {}", sb);
         }
         }
     }
     }
+
+    private void validateSnapshotMetrics() {
+        Map<String, Object> metrics = MetricsUtils.currentServerMetrics();
+        assertEquals(0, (long) metrics.get("snapshot_error_count"));
+        assertEquals(0, (long) metrics.get("snapshot_rate_limited_count"));
+        assertTrue((Double) metrics.get("avg_snapshottime") > 0.0);
+    }
+
+    private void validateRestoreMetrics() {
+        Map<String, Object> metrics = MetricsUtils.currentServerMetrics();
+        assertEquals(0, (long) metrics.get("restore_error_count"));
+        assertEquals(0, (long) metrics.get("restore_rate_limited_count"));
+        assertTrue((Double) metrics.get("avg_restore_time") > 0.0);
+    }
+
+    private File takeSnapshotAndValidate() throws Exception {
+        // take snapshot with streaming
+        final HttpURLConnection snapshotConn = sendSnapshotRequest(true);
+
+        // validate snapshot response
+        assertEquals(HttpURLConnection.HTTP_OK, snapshotConn.getResponseCode());
+        validateResponseHeaders(snapshotConn);
+        final File snapshotFile = new File(dataDir + "/snapshot." + System.currentTimeMillis());
+        try (final InputStream inputStream = snapshotConn.getInputStream();
+             final FileOutputStream outputStream = new FileOutputStream(snapshotFile)) {
+            IOUtils.copyBytes(inputStream, outputStream, 1024, true);
+            final long fileSize = Files.size(snapshotFile.toPath());
+            assertTrue(fileSize > 0);
+        }
+        return snapshotFile;
+    }
+
+    private void performRestoreAndValidate(final File snapshotFile) throws Exception {
+        // perform restore
+        final HttpURLConnection restoreConn = sendRestoreRequest();
+        try (final CheckedInputStream is = SnapStream.getInputStream(snapshotFile);
+             final OutputStream outputStream = restoreConn.getOutputStream()) {
+            IOUtils.copyBytes(is, outputStream, 1024, true);
+        }
+        // validate restore response
+        assertEquals(HttpURLConnection.HTTP_OK, restoreConn.getResponseCode());
+        displayResponsePayload(restoreConn);
+    }
 }
 }

+ 1 - 1
zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverMasterTest.java

@@ -293,7 +293,7 @@ public class ObserverMasterTest extends ObserverMasterTestBase {
 
 
         // test stats collection
         // test stats collection
         final Map<String, String> emptyMap = Collections.emptyMap();
         final Map<String, String> emptyMap = Collections.emptyMap();
-        Map<String, Object> stats = Commands.runCommand("mntr", q3.getQuorumPeer().getActiveServer(), emptyMap).toMap();
+        Map<String, Object> stats = Commands.runGetCommand("mntr", q3.getQuorumPeer().getActiveServer(), emptyMap).toMap();
         assertTrue(stats.containsKey("observer_master_id"), "observer not emitting observer_master_id");
         assertTrue(stats.containsKey("observer_master_id"), "observer not emitting observer_master_id");
 
 
         // check the stats for the first peer
         // check the stats for the first peer