Browse Source

ZOOKEEPER-3437: Improve sync throttling on a learner master

Author: Jie Huang <jiehuang@fb.com>

Reviewers: Michael Han <lhan@twitter.com>, Enrico Olivelli <eolivelli@apache.org>, Fangmin Lyu <fangmin@apache.org>

Closes #995 from jhuan31/ZOOKEEPER-3437
Jie Huang 6 years ago
parent
commit
f38905e58e
16 changed files with 545 additions and 517 deletions
  1. 10 0
      zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
  2. 28 0
      zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerBean.java
  3. 20 0
      zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerMXBean.java
  4. 2 29
      zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
  5. 20 0
      zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderBean.java
  6. 20 0
      zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderMXBean.java
  7. 33 12
      zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java
  8. 90 31
      zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerMaster.java
  9. 0 44
      zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSnapshot.java
  10. 0 136
      zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSnapshotThrottler.java
  11. 121 0
      zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSyncThrottler.java
  12. 1 23
      zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverMaster.java
  13. 11 9
      zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SyncThrottleException.java
  14. 0 219
      zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerSnapshotThrottlerTest.java
  15. 173 0
      zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerSyncThrottlerTest.java
  16. 16 14
      zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java

+ 10 - 0
zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md

@@ -893,6 +893,16 @@ property, when available, is noted below.
     pipeline to avoid direct buffer OOM. It will disable the AUTO_READ in
     Netty.
 
+* *maxConcurrentSnapSyncs* :
+    (Java system property: **zookeeper.leader.maxConcurrentSnapSyncs**)
+    The maximum number of snap syncs a leader or a follower can serve at the same
+    time. The default is 10.
+
+* *maxConcurrentDiffSyncs* :
+    (Java system property: **zookeeper.leader.maxConcurrentDiffSyncs**)
+    The maximum number of diff syncs a leader or a follower can serve at the same
+    time. The default is 100.
+
 <a name="sc_clusterOptions"></a>
 
 #### Cluster Options

+ 28 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerBean.java

@@ -62,4 +62,32 @@ public class FollowerBean extends ZooKeeperServerBean implements FollowerMXBean
     public void setObserverMasterPacketSizeLimit(int sizeLimit) {
         ObserverMaster.setPktsSizeLimit(sizeLimit);
     }
+
+    @Override
+    public int getMaxConcurrentSnapSyncs() {
+        final ObserverMaster om = follower.om;
+        return om == null ? -1 : om.getMaxConcurrentSnapSyncs();
+    }
+
+    @Override
+    public void setMaxConcurrentSnapSyncs(int maxConcurrentSnapshots) {
+        final ObserverMaster om = follower.om;
+        if (om != null) {
+            om.setMaxConcurrentSnapSyncs(maxConcurrentSnapshots);
+        }
+    }
+
+    @Override
+    public int getMaxConcurrentDiffSyncs() {
+        final ObserverMaster om = follower.om;
+        return om == null ? -1 : om.getMaxConcurrentDiffSyncs();
+    }
+
+    @Override
+    public void setMaxConcurrentDiffSyncs(int maxConcurrentDiffSyncs) {
+        final ObserverMaster om = follower.om;
+        if (om != null) {
+            om.setMaxConcurrentDiffSyncs(maxConcurrentDiffSyncs);
+        }
+    }
 }

+ 20 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerMXBean.java

@@ -53,4 +53,24 @@ public interface FollowerMXBean extends ZooKeeperServerMXBean {
      * set the size limit in bytes for the observer master commit packet queue
      */
     public void setObserverMasterPacketSizeLimit(int sizeLimit);
+
+    /**
+     * @return Number of concurrent snapshots permitted to send to observers
+     */
+    public int getMaxConcurrentSnapSyncs();
+
+    /**
+     * @param maxConcurrentSnapSyncs Number of concurrent snapshots permitted to send to observers
+     */
+    public void setMaxConcurrentSnapSyncs(int maxConcurrentSnapSyncs);
+
+    /**
+     * @return Number of concurrent diff syncs permitted to send to observers
+     */
+    public int getMaxConcurrentDiffSyncs();
+
+    /**
+     * @param maxConcurrentDiffSyncs Number of concurrent diff syncs permitted to send to observers
+     */
+    public void setMaxConcurrentDiffSyncs(int maxConcurrentDiffSyncs);
 }

+ 2 - 29
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java

@@ -69,7 +69,7 @@ import org.slf4j.LoggerFactory;
 /**
  * This class has the control logic for the Leader.
  */
-public class Leader implements LearnerMaster {
+public class Leader extends LearnerMaster {
     private static final Logger LOG = LoggerFactory.getLogger(Leader.class);
 
     static final private boolean nodelay = System.getProperty("leader.nodelay", "true").equals("true");
@@ -87,22 +87,8 @@ public class Leader implements LearnerMaster {
         }
     }
 
-    // Throttle when there are too many concurrent snapshots being sent to observers
-    private static final String MAX_CONCURRENT_SNAPSHOTS = "zookeeper.leader.maxConcurrentSnapshots";
-    private static final int maxConcurrentSnapshots;
-    private static final String MAX_CONCURRENT_SNAPSHOT_TIMEOUT = "zookeeper.leader.maxConcurrentSnapshotTimeout";
-    private static final long maxConcurrentSnapshotTimeout;
-    static {
-        maxConcurrentSnapshots = Integer.getInteger(MAX_CONCURRENT_SNAPSHOTS, 10);
-        LOG.info(MAX_CONCURRENT_SNAPSHOTS + " = " + maxConcurrentSnapshots);
-        maxConcurrentSnapshotTimeout = Long.getLong(MAX_CONCURRENT_SNAPSHOT_TIMEOUT, 5);
-        LOG.info(MAX_CONCURRENT_SNAPSHOT_TIMEOUT + " = " + maxConcurrentSnapshotTimeout);
-    }
-
-    private final LearnerSnapshotThrottler learnerSnapshotThrottler;
-
     // log ack latency if zxid is a multiple of ackLoggingFrequency. If <=0, disable logging.
-    protected static final String ACK_LOGGING_FREQUENCY = "zookeeper.leader.ackLoggingFrequency";
+    private static final String ACK_LOGGING_FREQUENCY = "zookeeper.leader.ackLoggingFrequency";
     private static int ackLoggingFrequency;
     static {
         ackLoggingFrequency = Integer.getInteger(ACK_LOGGING_FREQUENCY, 1000);
@@ -137,12 +123,6 @@ public class Leader implements LearnerMaster {
         return proposalStats;
     }
 
-    public LearnerSnapshotThrottler createLearnerSnapshotThrottler(
-            int maxConcurrentSnapshots, long maxConcurrentSnapshotTimeout) {
-        return new LearnerSnapshotThrottler(
-                maxConcurrentSnapshots, maxConcurrentSnapshotTimeout);
-    }
-
     // beans for all learners
     private final ConcurrentHashMap<LearnerHandler, LearnerHandlerBean> connectionBeans = new ConcurrentHashMap<>();
 
@@ -324,8 +304,6 @@ public class Leader implements LearnerMaster {
             throw e;
         }
         this.zk = zk;
-        this.learnerSnapshotThrottler = createLearnerSnapshotThrottler(
-                maxConcurrentSnapshots, maxConcurrentSnapshotTimeout);
     }
 
     /**
@@ -1217,11 +1195,6 @@ public class Leader implements LearnerMaster {
         return p;
     }
 
-    @Override
-    public LearnerSnapshotThrottler getLearnerSnapshotThrottler() {
-        return learnerSnapshotThrottler;
-    }
-
     /**
      * Process sync requests
      *

+ 20 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderBean.java

@@ -84,4 +84,24 @@ public class LeaderBean extends ZooKeeperServerBean implements LeaderMXBean {
     public void resetProposalStatistics() {
         leader.getProposalStats().reset();
     }
+    
+    @Override
+    public int getMaxConcurrentSnapSyncs() {
+        return leader.getMaxConcurrentSnapSyncs();
+    }
+
+    @Override
+    public void setMaxConcurrentSnapSyncs(int maxConcurrentSnapshots) {
+        leader.setMaxConcurrentSnapSyncs(maxConcurrentSnapshots);
+    }
+
+    @Override
+    public int getMaxConcurrentDiffSyncs() {
+        return leader.getMaxConcurrentDiffSyncs();
+    }
+
+    @Override
+    public void setMaxConcurrentDiffSyncs(int maxConcurrentDiffSyncs) {
+        leader.setMaxConcurrentDiffSyncs(maxConcurrentDiffSyncs);
+    }
 }

+ 20 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderMXBean.java

@@ -63,4 +63,24 @@ public interface LeaderMXBean extends ZooKeeperServerMXBean {
      * Resets statistics of proposal size (min/max/last)
      */
     public void resetProposalStatistics();
+
+    /**
+     * @return Number of concurrent snapshots permitted to send to observers
+     */
+    public int getMaxConcurrentSnapSyncs();
+
+    /**
+     * @param maxConcurrentSnapSyncs Number of concurrent snapshots permitted to send to observers
+     */
+    public void setMaxConcurrentSnapSyncs(int maxConcurrentSnapSyncs);
+
+    /**
+     * @return Number of concurrent diff syncs permitted to send to observers
+     */
+    public int getMaxConcurrentDiffSyncs();
+
+    /**
+     * @param maxConcurrentDiffSyncs Number of concurrent diff syncs permitted to send to observers
+     */
+    public void setMaxConcurrentDiffSyncs(int maxConcurrentDiffSyncs);
 }

+ 33 - 12
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java

@@ -245,6 +245,11 @@ public class LearnerHandler extends ZooKeeperThread {
      */
     private long leaderLastZxid;
 
+    /**
+     * for sync throttling
+     */
+    private LearnerSyncThrottler syncThrottler = null;
+
     LearnerHandler(Socket sock, BufferedInputStream bufferedInput, LearnerMaster learnerMaster) throws IOException {
         super("LearnerHandler-" + sock.getRemoteSocketAddress());
         this.sock = sock;
@@ -535,34 +540,38 @@ public class LearnerHandler extends ZooKeeperThread {
             // startForwarding() will be called in all cases
             boolean needSnap = syncFollower(peerLastZxid, learnerMaster);
 
+            // syncs between followers and the leader are exempt from throttling because it
+            // is importatnt to keep the state of quorum servers up-to-date. The exempted syncs
+            // are counted as concurrent syncs though
+            boolean exemptFromThrottle = getLearnerType() != LearnerType.OBSERVER;
             /* if we are not truncating or sending a diff just send a snapshot */
             if (needSnap) {
-                boolean exemptFromThrottle = getLearnerType() != LearnerType.OBSERVER;
-                LearnerSnapshot snapshot =
-                        learnerMaster.getLearnerSnapshotThrottler().beginSnapshot(exemptFromThrottle);
+                syncThrottler = learnerMaster.getLearnerSnapSyncThrottler();
+                syncThrottler.beginSync(exemptFromThrottle);
                 try {
                     long zxidToSend = learnerMaster.getZKDatabase().getDataTreeLastProcessedZxid();
                     oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet");
                     bufferedOutput.flush();
 
                     LOG.info("Sending snapshot last zxid of peer is 0x{}, zxid of leader is 0x{}, "
-                            + "send zxid of db as 0x{}, {} concurrent snapshots, "
-                            + "snapshot was {} from throttle",
+                            + "send zxid of db as 0x{}, {} concurrent snapshot sync, "
+                            + "snapshot sync was {} from throttle",
                             Long.toHexString(peerLastZxid),
                             Long.toHexString(leaderLastZxid),
                             Long.toHexString(zxidToSend),
-                            snapshot.getConcurrentSnapshotNumber(),
-                            snapshot.isEssential() ? "exempt" : "not exempt");
+                            syncThrottler.getSyncInProgress(),
+                            exemptFromThrottle ? "exempt" : "not exempt");
                     // Dump data to peer
                     learnerMaster.getZKDatabase().serializeSnapshot(oa);
                     oa.writeString("BenWasHere", "signature");
                     bufferedOutput.flush();
                 } finally {
-                    snapshot.close();
                     ServerMetrics.getMetrics().SNAP_COUNT.add(1);
                 }
             }
             else {
+                syncThrottler = learnerMaster.getLearnerDiffSyncThrottler();
+                syncThrottler.beginSync(exemptFromThrottle);
                 ServerMetrics.getMetrics().DIFF_COUNT.add(1);
             }
 
@@ -603,6 +612,9 @@ public class LearnerHandler extends ZooKeeperThread {
             learnerMaster.waitForNewLeaderAck(getSid(), qp.getZxid());
 
             syncLimitCheck.start();
+            // sync ends when NEWLEADER-ACK is received
+            syncThrottler.endSync();
+            syncThrottler = null;
 
             // now that the ack has been processed expect the syncLimit
             sock.setSoTimeout(learnerMaster.syncTimeout());
@@ -698,10 +710,18 @@ public class LearnerHandler extends ZooKeeperThread {
             	}
             }
         } catch (InterruptedException e) {
-            LOG.error("Unexpected exception causing shutdown", e);
-        } catch (SnapshotThrottleException e) {
-            LOG.error("too many concurrent snapshots: " + e);
+            LOG.error("Unexpected exception in LearnerHandler: ", e);
+        } catch (SyncThrottleException e) {
+                LOG.error("too many concurrent syncs: " + e);
+                syncThrottler = null;
+        } catch (Exception e) {
+            LOG.error("Unexpected exception in LearnerHandler: ", e);
+            throw e;
         } finally {
+            if (syncThrottler != null) {
+                syncThrottler.endSync();
+                syncThrottler = null;
+            }
             LOG.warn("******* GOODBYE {} ********", getRemoteAddress());
             shutdown();
         }
@@ -772,7 +792,7 @@ public class LearnerHandler extends ZooKeeperThread {
             long minCommittedLog = db.getminCommittedLog();
             long lastProcessedZxid = db.getDataTreeLastProcessedZxid();
 
-            LOG.info("Synchronizing with Follower sid: {} maxCommittedLog=0x{}"
+            LOG.info("Synchronizing with Learner sid: {} maxCommittedLog=0x{}"
                     + " minCommittedLog=0x{} lastProcessedZxid=0x{}"
                     + " peerLastZxid=0x{}", getSid(),
                     Long.toHexString(maxCommittedLog),
@@ -1013,6 +1033,7 @@ public class LearnerHandler extends ZooKeeperThread {
     public void shutdown() {
         // Send the packet of death
         try {
+            queuedPackets.clear();
             queuedPackets.put(proposalOfDeath);
         } catch (InterruptedException e) {
             LOG.warn("Ignoring unexpected exception", e);

+ 90 - 31
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerMaster.java

@@ -21,6 +21,8 @@ package org.apache.zookeeper.server.quorum;
 import org.apache.zookeeper.server.Request;
 import org.apache.zookeeper.server.ZKDatabase;
 import org.apache.zookeeper.server.quorum.auth.QuorumAuthServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.Socket;
@@ -29,18 +31,81 @@ import java.net.SocketAddress;
 /**
  * interface for keeping Observers in sync
  */
-public interface LearnerMaster {
+public abstract class LearnerMaster {
+    private static final Logger LOG = LoggerFactory.getLogger(LearnerMaster.class);
+
+    // Throttle when there are too many concurrent snapshots being sent to observers
+    private static final String MAX_CONCURRENT_SNAPSYNCS = "zookeeper.leader.maxConcurrentSnapSyncs";
+    private static final int DEFAULT_CONCURRENT_SNAPSYNCS;
+
+    // Throttle when there are too many concurrent diff syncs being sent to observers
+    private static final String MAX_CONCURRENT_DIFF_SYNCS = "zookeeper.leader.maxConcurrentDiffSyncs";
+    private static final int DEFAULT_CONCURRENT_DIFF_SYNCS;
+
+    static {
+        DEFAULT_CONCURRENT_SNAPSYNCS = Integer.getInteger(MAX_CONCURRENT_SNAPSYNCS, 10);
+        LOG.info(MAX_CONCURRENT_SNAPSYNCS + " = " + DEFAULT_CONCURRENT_SNAPSYNCS);
+
+        DEFAULT_CONCURRENT_DIFF_SYNCS = Integer.getInteger(MAX_CONCURRENT_DIFF_SYNCS, 100);
+        LOG.info(MAX_CONCURRENT_DIFF_SYNCS + " = " + DEFAULT_CONCURRENT_DIFF_SYNCS);
+    }
+
+    private volatile int maxConcurrentSnapSyncs = DEFAULT_CONCURRENT_SNAPSYNCS;
+    private volatile int maxConcurrentDiffSyncs = DEFAULT_CONCURRENT_DIFF_SYNCS;
+
+    private final LearnerSyncThrottler learnerSnapSyncThrottler =
+            new LearnerSyncThrottler(maxConcurrentSnapSyncs, LearnerSyncThrottler.SyncType.SNAP);
+
+    private final LearnerSyncThrottler learnerDiffSyncThrottler =
+            new LearnerSyncThrottler(maxConcurrentDiffSyncs,LearnerSyncThrottler.SyncType.DIFF);
+
+    public int getMaxConcurrentSnapSyncs() {
+        return maxConcurrentSnapSyncs;
+    }
+
+    public void setMaxConcurrentSnapSyncs(int maxConcurrentSnapSyncs) {
+        LOG.info("Set maxConcurrentSnapSyncs to {}", maxConcurrentSnapSyncs);
+        this.maxConcurrentSnapSyncs = maxConcurrentSnapSyncs;
+        learnerSnapSyncThrottler.setMaxConcurrentSyncs(maxConcurrentSnapSyncs);
+    }
+
+    public int getMaxConcurrentDiffSyncs() {
+        return maxConcurrentDiffSyncs;
+    }
+
+    public void setMaxConcurrentDiffSyncs(int maxConcurrentDiffSyncs) {
+        LOG.info("Set maxConcurrentDiffSyncs to {}", maxConcurrentDiffSyncs);
+        this.maxConcurrentDiffSyncs = maxConcurrentDiffSyncs;
+        learnerDiffSyncThrottler.setMaxConcurrentSyncs(maxConcurrentDiffSyncs);
+    }
+
+    /**
+     * snap sync throttler
+     * @return snapshot throttler
+     */
+    public LearnerSyncThrottler getLearnerSnapSyncThrottler() {
+        return learnerSnapSyncThrottler;
+    }
+
+    /**
+     * diff sync throttler
+     * @return diff throttler
+     */
+    public LearnerSyncThrottler getLearnerDiffSyncThrottler() {
+        return learnerDiffSyncThrottler;
+    }
+
     /**
      * start tracking a learner handler
      * @param learnerHandler to track
      */
-    void addLearnerHandler(LearnerHandler learnerHandler);
+    abstract void addLearnerHandler(LearnerHandler learnerHandler);
 
     /**
      * stop tracking a learner handler
      * @param learnerHandler to drop
      */
-    void removeLearnerHandler(LearnerHandler learnerHandler);
+    abstract void removeLearnerHandler(LearnerHandler learnerHandler);
 
     /**
      * wait for the leader of the new epoch to be confirmed by followers
@@ -49,19 +114,13 @@ public interface LearnerMaster {
      * @throws IOException
      * @throws InterruptedException
      */
-    void waitForEpochAck(long sid, StateSummary ss) throws IOException, InterruptedException;
-
-    /**
-     * snapshot throttler
-     * @return snapshot throttler
-     */
-    LearnerSnapshotThrottler getLearnerSnapshotThrottler();
+    abstract void waitForEpochAck(long sid, StateSummary ss) throws IOException, InterruptedException;
 
     /**
      * wait for server to start
      * @throws InterruptedException
      */
-    void waitForStartup() throws InterruptedException;
+    abstract void waitForStartup() throws InterruptedException;
 
     /**
      * get the first zxid of the next epoch
@@ -71,13 +130,13 @@ public interface LearnerMaster {
      * @throws InterruptedException
      * @throws IOException
      */
-    long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException;
+    abstract long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException;
 
     /**
      * ZKDatabase
      * @return ZKDatabase
      */
-    ZKDatabase getZKDatabase();
+    abstract ZKDatabase getZKDatabase();
 
     /**
      * wait for new leader to settle
@@ -85,43 +144,43 @@ public interface LearnerMaster {
      * @param zxid zxid at learner
      * @throws InterruptedException
      */
-    void waitForNewLeaderAck(long sid, long zxid) throws InterruptedException;
+    abstract void waitForNewLeaderAck(long sid, long zxid) throws InterruptedException;
 
     /**
      * last proposed zxid
      * @return last proposed zxid
      */
-    long getLastProposed();
+    abstract long getLastProposed();
 
     /**
      * the current tick
      * @return the current tick
      */
-    int getCurrentTick();
+    abstract int getCurrentTick();
 
     /**
      * time allowed for sync response
      * @return time allowed for sync response
      */
-    int syncTimeout();
+    abstract int syncTimeout();
 
     /**
      * deadline tick marking observer sync (initial)
      * @return deadline tick marking observer sync (initial)
      */
-    int getTickOfNextAckDeadline();
+    abstract int getTickOfNextAckDeadline();
 
     /**
      * next deadline tick marking observer sync (steady state)
      * @return next deadline tick marking observer sync (steady state)
      */
-    int getTickOfInitialAckDeadline();
+    abstract int getTickOfInitialAckDeadline();
 
     /**
      * decrement follower count
      * @return previous follower count
      */
-    long getAndDecrementFollowerCounter();
+    abstract long getAndDecrementFollowerCounter();
 
     /**
      * handle ack packet
@@ -129,14 +188,14 @@ public interface LearnerMaster {
      * @param zxid packet zxid
      * @param localSocketAddress forwarder's address
      */
-    void processAck(long sid, long zxid, SocketAddress localSocketAddress);
+    abstract void processAck(long sid, long zxid, SocketAddress localSocketAddress);
 
     /**
      * mark session as alive
      * @param sess session id
      * @param to timeout
      */
-    void touch(long sess, int to);
+    abstract void touch(long sess, int to);
 
     /**
      * handle revalidate packet
@@ -144,13 +203,13 @@ public interface LearnerMaster {
      * @param learnerHandler learner
      * @throws IOException
      */
-    void revalidateSession(QuorumPacket qp, LearnerHandler learnerHandler) throws IOException;
+    abstract void revalidateSession(QuorumPacket qp, LearnerHandler learnerHandler) throws IOException;
 
     /**
      * proxy request from learner to server
      * @param si request
      */
-    void submitLearnerRequest(Request si);
+    abstract void submitLearnerRequest(Request si);
 
     /**
      * begin forwarding packets to learner handler
@@ -158,39 +217,39 @@ public interface LearnerMaster {
      * @param lastSeenZxid zxid of learner
      * @return last zxid forwarded
      */
-    long startForwarding(LearnerHandler learnerHandler, long lastSeenZxid);
+    abstract long startForwarding(LearnerHandler learnerHandler, long lastSeenZxid);
 
     /**
      * version of current quorum verifier
      * @return version of current quorum verifier
      */
-    long getQuorumVerifierVersion();
+    abstract long getQuorumVerifierVersion();
 
     /**
      *
      * @param sid server id
      * @return server information in the view
      */
-    String getPeerInfo(long sid);
+    abstract String getPeerInfo(long sid);
 
     /**
      * identifier of current quorum verifier for new leader
      * @return identifier of current quorum verifier for new leader
      */
-    byte[] getQuorumVerifierBytes();
+    abstract byte[] getQuorumVerifierBytes();
 
-    QuorumAuthServer getQuorumAuthServer();
+    abstract QuorumAuthServer getQuorumAuthServer();
 
     /**
      * registers the handler's bean
      * @param learnerHandler handler
      * @param socket connection to learner
      */
-    void registerLearnerHandlerBean(final LearnerHandler learnerHandler, Socket socket);
+    abstract void registerLearnerHandlerBean(final LearnerHandler learnerHandler, Socket socket);
 
     /**
      * unregisters the handler's bean
      * @param learnerHandler handler
      */
-    void unregisterLearnerHandlerBean(final LearnerHandler learnerHandler);
+    abstract void unregisterLearnerHandlerBean(final LearnerHandler learnerHandler);
 }

+ 0 - 44
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSnapshot.java

@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-public class LearnerSnapshot {
-    private final LearnerSnapshotThrottler throttler;
-    private final int concurrentSnapshotNumber;
-    private final boolean essential;
-
-    LearnerSnapshot(LearnerSnapshotThrottler throttler, 
-            int concurrentSnapshotNumber, boolean essential) {
-        this.throttler = throttler;
-        this.concurrentSnapshotNumber = concurrentSnapshotNumber;
-        this.essential = essential;
-    }
-
-    public void close() {
-        throttler.endSnapshot();
-    }
-
-    public int getConcurrentSnapshotNumber() {
-        return concurrentSnapshotNumber;
-    }
-    
-    public boolean isEssential() {
-        return essential;
-    }
-}

+ 0 - 136
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSnapshotThrottler.java

@@ -1,136 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-import org.apache.zookeeper.common.Time;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Utility class to limit the number of concurrent snapshots from a leader to
- * observers and followers.  {@link LearnerHandler} objects should call
- * {@link #beginSnapshot(boolean)} before sending a snapshot and
- * {@link #endSnapshot()} after finishing, successfully or not.
- *
- */
-public class LearnerSnapshotThrottler {
-    private static final Logger LOG =
-            LoggerFactory.getLogger(LearnerSnapshotThrottler.class);
-
-    private final Object snapCountSyncObject = new Object();
-    private int snapsInProgress;
-
-    private final int maxConcurrentSnapshots;
-    private final long timeoutMillis;
-
-    /**
-     * Constructs a new instance limiting the concurrent number of snapshots to
-     * <code>maxConcurrentSnapshots</code>.
-     * @param maxConcurrentSnapshots maximum concurrent number of snapshots
-     * @param timeoutMillis milliseconds to attempt to wait when attempting to
-     *                      begin a snapshot that would otherwise be throttled;
-     *                      a value of zero means no waiting will be attempted
-     * @throws java.lang.IllegalArgumentException when <code>timeoutMillis</code>
-     *                                            is negative or
-     *                                            <code>maxConcurrentSnaphots</code>
-     *                                            is less than 1
-     */
-    public LearnerSnapshotThrottler(int maxConcurrentSnapshots,
-                                    long timeoutMillis) {
-        if (timeoutMillis < 0) {
-            String errorMsg = "timeout cannot be negative, was " + timeoutMillis;
-            throw new IllegalArgumentException(errorMsg);
-        }
-        if (maxConcurrentSnapshots <= 0) {
-            String errorMsg = "maxConcurrentSnapshots must be positive, was " +
-                    maxConcurrentSnapshots;
-            throw new IllegalArgumentException(errorMsg);
-        }
-
-        this.maxConcurrentSnapshots = maxConcurrentSnapshots;
-        this.timeoutMillis = timeoutMillis;
-
-        synchronized (snapCountSyncObject) {
-            snapsInProgress = 0;
-        }
-    }
-
-    public LearnerSnapshotThrottler(int maxConcurrentSnapshots) {
-        this(maxConcurrentSnapshots, 0);
-    }
-
-    /**
-     * Indicates that a new snapshot is about to be sent.
-     * 
-     * @param essential if <code>true</code>, do not throw an exception even
-     *                  if throttling limit is reached
-     * @throws SnapshotThrottleException if throttling limit has been exceeded
-     *                                   and <code>essential == false</code>,
-     *                                   even after waiting for the timeout
-     *                                   period, if any
-     * @throws InterruptedException if thread is interrupted while trying
-     *                              to start a snapshot; cannot happen if
-     *                              timeout is zero
-     */
-    public LearnerSnapshot beginSnapshot(boolean essential)
-            throws SnapshotThrottleException, InterruptedException {
-        int snapshotNumber;
-
-        synchronized (snapCountSyncObject) {
-            if (!essential
-                && timeoutMillis > 0
-                && snapsInProgress >= maxConcurrentSnapshots) {
-                long timestamp = Time.currentElapsedTime();
-                do {
-                    snapCountSyncObject.wait(timeoutMillis);
-                } while (snapsInProgress >= maxConcurrentSnapshots
-                         && timestamp + timeoutMillis < Time.currentElapsedTime());
-            }
-
-            if (essential || snapsInProgress < maxConcurrentSnapshots) {
-                snapsInProgress++;
-                snapshotNumber = snapsInProgress;
-            } else {
-                throw new SnapshotThrottleException(snapsInProgress + 1,
-                                                    maxConcurrentSnapshots);
-            }
-        }
-
-        return new LearnerSnapshot(this, snapshotNumber, essential);
-    }
-
-    /**
-     * Indicates that a snapshot has been completed.
-     */
-    public void endSnapshot() {
-        int newCount;
-        synchronized (snapCountSyncObject) {
-            snapsInProgress--;
-            newCount = snapsInProgress;
-            snapCountSyncObject.notify();
-        }
-
-        if (newCount < 0) {
-            String errorMsg =
-                    "endSnapshot() called incorrectly; current snapshot count is "
-                            + newCount;
-            LOG.error(errorMsg);
-        }
-    }
-}

+ 121 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSyncThrottler.java

@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import org.apache.zookeeper.common.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class to limit the number of concurrent syncs from a leader to
+ * observers and followers orß from a follower to observers.  {@link LearnerHandler}
+ * objects should call {@link #beginSync(boolean)} before sending a sync and
+ * {@link #endSync()} after finishing, successfully or not.
+ *
+ */
+public class LearnerSyncThrottler {
+    private static final Logger LOG = LoggerFactory.getLogger(LearnerSyncThrottler.class);
+
+    private final Object countSyncObject = new Object();
+    private int syncInProgress;
+
+    private volatile int maxConcurrentSyncs;
+
+    public enum SyncType {
+        DIFF,
+        SNAP
+    }
+
+    private final SyncType syncType;
+
+    /**
+     * Constructs a new instance limiting the concurrent number of syncs to
+     * <code>maxConcurrentSyncs</code>.
+     * @param maxConcurrentSyncs maximum concurrent number of syncs
+     * @param syncType either a snapshot sync or a txn-based diff sync
+     * @throws java.lang.IllegalArgumentException when <code>maxConcurrentSyncs</code>
+     *                                            is less than 1
+     */
+    public LearnerSyncThrottler(int maxConcurrentSyncs, SyncType syncType) throws IllegalArgumentException {
+        if (maxConcurrentSyncs <= 0) {
+            String errorMsg = "maxConcurrentSyncs must be positive, was " +
+                    maxConcurrentSyncs;
+            throw new IllegalArgumentException(errorMsg);
+        }
+
+        this.maxConcurrentSyncs = maxConcurrentSyncs;
+        this.syncType = syncType;
+
+        synchronized (countSyncObject) {
+            syncInProgress = 0;
+        }
+    }
+
+    /**
+     * Indicates that a new sync is about to be sent.
+     *
+     * @param essential if <code>true</code>, do not throw an exception even
+     *                  if throttling limit is reached
+     * @throws SyncThrottleException if throttling limit has been exceeded
+     *                                   and <code>essential == false</code>,
+     *                                   even after waiting for the timeout
+     *                                   period, if any
+     * @throws InterruptedException if thread is interrupted while trying
+     *                              to start a sync; cannot happen if
+     *                              timeout is zero
+     */
+    protected void beginSync(boolean essential) throws SyncThrottleException, InterruptedException {
+
+        synchronized (countSyncObject) {
+            if (essential || syncInProgress < maxConcurrentSyncs) {
+                syncInProgress++;
+            } else {
+                throw new SyncThrottleException(syncInProgress + 1,
+                        maxConcurrentSyncs, syncType);
+            }
+        }
+    }
+
+    /**
+     * Indicates that a sync has been completed.
+     */
+    public void endSync() {
+        int newCount;
+        synchronized (countSyncObject) {
+            syncInProgress--;
+            newCount = syncInProgress;
+            countSyncObject.notify();
+        }
+
+        if (newCount < 0) {
+            String errorMsg =
+                    "endSync() called incorrectly; current sync count is "
+                            + newCount;
+            LOG.error(errorMsg);
+        }
+    }
+
+    public void setMaxConcurrentSyncs(int maxConcurrentSyncs) {
+        this.maxConcurrentSyncs = maxConcurrentSyncs;
+    }
+
+    public int getSyncInProgress(){
+        return syncInProgress;
+    }
+}

+ 1 - 23
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverMaster.java

@@ -63,7 +63,7 @@ import org.slf4j.LoggerFactory;
  *
  * The logic is quite a bit simpler than the corresponding logic in Leader because it only hosts observers.
  */
-public class ObserverMaster implements LearnerMaster, Runnable {
+public class ObserverMaster extends LearnerMaster implements Runnable {
     private static final Logger LOG = LoggerFactory.getLogger(ObserverMaster.class);
 
     //Follower counter
@@ -93,20 +93,6 @@ public class ObserverMaster implements LearnerMaster, Runnable {
     // ensure ordering of revalidations returned to this learner
     private final Object revalidateSessionLock = new Object();
 
-    // Throttle when there are too many concurrent snapshots being sent to observers
-    private static final String MAX_CONCURRENT_SNAPSHOTS = "zookeeper.leader.maxConcurrentSnapshots";
-    private static final int maxConcurrentSnapshots;
-
-    private static final String MAX_CONCURRENT_DIFFS = "zookeeper.leader.maxConcurrentDiffs";
-    private static final int maxConcurrentDiffs;
-    static {
-        maxConcurrentSnapshots = Integer.getInteger(MAX_CONCURRENT_SNAPSHOTS, 10);
-        LOG.info(MAX_CONCURRENT_SNAPSHOTS + " = " + maxConcurrentSnapshots);
-
-        maxConcurrentDiffs = Integer.getInteger(MAX_CONCURRENT_DIFFS, 100);
-        LOG.info(MAX_CONCURRENT_DIFFS + " = " + maxConcurrentDiffs);
-    }
-
     private final ConcurrentLinkedQueue<Revalidation> pendingRevalidations = new ConcurrentLinkedQueue<>();
     static class Revalidation {
         public final long sessionId;
@@ -137,9 +123,6 @@ public class ObserverMaster implements LearnerMaster, Runnable {
         }
     }
 
-    private final LearnerSnapshotThrottler learnerSnapshotThrottler =
-            new LearnerSnapshotThrottler(maxConcurrentSnapshots);
-
     private Thread thread;
     private ServerSocket ss;
     private boolean listenerRunning;
@@ -197,11 +180,6 @@ public class ObserverMaster implements LearnerMaster, Runnable {
         // since this is done by an active follower, we don't need to wait for anything
     }
 
-    @Override
-    public LearnerSnapshotThrottler getLearnerSnapshotThrottler() {
-        return learnerSnapshotThrottler;
-    }
-
     @Override
     public void waitForStartup() throws InterruptedException {
         // since this is done by an active follower, we don't need to wait for anything

+ 11 - 9
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SnapshotThrottleException.java → zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SyncThrottleException.java

@@ -19,21 +19,23 @@
 package org.apache.zookeeper.server.quorum;
 
 /**
- * Thrown when a {@link Leader} has too many concurrent snapshots being sent
+ * Thrown when a {@link Leader} has too many concurrent syncs being sent
  * to observers.
- * 
- * @see LearnerSnapshotThrottler
+ *
+ * @see LearnerSyncThrottler
  *
  */
-public class SnapshotThrottleException extends Exception {
+public class SyncThrottleException extends Exception {
     private static final long serialVersionUID = 1L;
 
-    public SnapshotThrottleException(int concurrentSnapshotNumber, int throttleThreshold) {
-        super(getMessage(concurrentSnapshotNumber, throttleThreshold));
+    public SyncThrottleException(int concurrentSyncNumber, int throttleThreshold,
+                                 LearnerSyncThrottler.SyncType syncType) {
+        super(getMessage(concurrentSyncNumber, throttleThreshold, syncType));
     }
 
-    private static String getMessage(int concurrentSnapshotNumber, int throttleThreshold) {
-        return String.format("new snapshot would make %d concurrently in progress; " +
-                "maximum is %d", concurrentSnapshotNumber, throttleThreshold);
+    private static String getMessage(int concurrentSyncNumber, int throttleThreshold,
+                                     LearnerSyncThrottler.SyncType syncType) {
+        return String.format("new %s sync would make %d concurrently in progress; maximum is %d",
+                syncType.toString().toLowerCase(), concurrentSyncNumber, throttleThreshold);
     }
 }

+ 0 - 219
zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerSnapshotThrottlerTest.java

@@ -1,219 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import org.apache.zookeeper.ZKTestCase;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class LearnerSnapshotThrottlerTest extends ZKTestCase {
-    private static final Logger LOG =
-            LoggerFactory.getLogger(LearnerSnapshotThrottlerTest.class);
-
-    @Test(expected = SnapshotThrottleException.class)
-    public void testTooManySnapshotsNonessential() throws Exception {
-        LearnerSnapshotThrottler throttler = new LearnerSnapshotThrottler(5);
-        for (int i = 0; i < 6; i++) {
-            throttler.beginSnapshot(false);
-        }
-    }
-
-    @Test(expected = SnapshotThrottleException.class)
-    public void testTooManySnapshotsEssential() throws Exception {
-        LearnerSnapshotThrottler throttler = new LearnerSnapshotThrottler(5);
-        try {
-            for (int i = 0; i < 6; i++) {
-                throttler.beginSnapshot(true);
-            }
-        }
-        catch (SnapshotThrottleException ex) {
-            Assert.fail("essential snapshots should not be throttled");
-        }
-        throttler.endSnapshot();
-        throttler.beginSnapshot(false);
-    }
-
-    @Test
-    public void testNoThrottle() throws Exception {
-        LearnerSnapshotThrottler throttler = new LearnerSnapshotThrottler(5);
-        try {
-            for (int i = 0; i < 6; i++) {
-                throttler.beginSnapshot(true);
-            }
-        }
-        catch (SnapshotThrottleException ex) {
-            Assert.fail("essential snapshots should not be throttled");
-        }
-        throttler.endSnapshot();
-        for (int i = 0; i < 5; i++) {
-            throttler.endSnapshot();
-            throttler.beginSnapshot(false);
-        }
-    }
-
-    @Test
-    public void testTryWithResourceNoThrottle() throws Exception {
-        LearnerSnapshotThrottler throttler = new LearnerSnapshotThrottler(1);
-        for (int i = 0; i < 3; i++) {
-            LearnerSnapshot snapshot = throttler.beginSnapshot(false);
-            try {
-                Assert.assertFalse(snapshot.isEssential());
-                Assert.assertEquals(1, snapshot.getConcurrentSnapshotNumber());
-            } finally {
-                snapshot.close();
-            }
-        }
-    }
-
-    @Test(expected = SnapshotThrottleException.class)
-    public void testTryWithResourceThrottle() throws Exception {
-        LearnerSnapshotThrottler throttler = new LearnerSnapshotThrottler(1);
-        LearnerSnapshot outer = throttler.beginSnapshot(true);
-        try {
-            LearnerSnapshot inner = throttler.beginSnapshot(false);
-            try {
-                Assert.fail("shouldn't be able to have both snapshots open");
-            } finally {
-                inner.close();
-            }
-        } finally {
-            outer.close();
-        }
-    }
-
-    @Test
-    public void testParallelNoThrottle() throws Exception {
-        final int numThreads = 50;
-
-        final LearnerSnapshotThrottler throttler = new LearnerSnapshotThrottler(numThreads);
-        ExecutorService threadPool = Executors.newFixedThreadPool(numThreads);
-        final CountDownLatch threadStartLatch = new CountDownLatch(numThreads);
-        final CountDownLatch snapshotProgressLatch = new CountDownLatch(numThreads);
-
-        List<Future<Boolean>> results = new ArrayList<Future<Boolean>>(numThreads);
-        for (int i = 0; i < numThreads; i++) {
-            results.add(threadPool.submit(new Callable<Boolean>() {
-
-                @Override
-                public Boolean call() {
-                    threadStartLatch.countDown();
-                    try {
-                        threadStartLatch.await();
-
-                        throttler.beginSnapshot(false);
-
-                        snapshotProgressLatch.countDown();
-                        snapshotProgressLatch.await();
-
-                        throttler.endSnapshot();
-                    }
-                    catch (Exception e) {
-                        return false;
-                    }
-
-                    return true;
-                }
-            }));
-        }
-
-        for (Future<Boolean> result : results) {
-            Assert.assertTrue(result.get());
-        }
-    }
-
-    @Test
-    public void testPositiveTimeout() throws Exception {
-        final LearnerSnapshotThrottler throttler = new LearnerSnapshotThrottler(1, 200);
-        ExecutorService threadPool = Executors.newFixedThreadPool(1);
-
-        LearnerSnapshot first = throttler.beginSnapshot(false);
-        final CountDownLatch snapshotProgressLatch = new CountDownLatch(1);
-
-        Future<Boolean> result = threadPool.submit(new Callable<Boolean>() {
-            @Override
-            public Boolean call() {
-                try {
-                    snapshotProgressLatch.countDown();
-                    LearnerSnapshot second = throttler.beginSnapshot(false);
-                    second.close();
-                }
-                catch (Exception e) {
-                    return false;
-                }
-
-                return true;
-            }
-        });
-
-        snapshotProgressLatch.await();
-
-        first.close();
-
-        Assert.assertTrue(result.get());
-    }
-
-    @Test
-    public void testHighContentionWithTimeout() throws Exception {
-        int numThreads = 20;
-
-        final LearnerSnapshotThrottler throttler = new LearnerSnapshotThrottler(2, 5000);
-        ExecutorService threadPool = Executors.newFixedThreadPool(numThreads);
-        final CountDownLatch threadStartLatch = new CountDownLatch(numThreads);
-
-        List<Future<Boolean>> results = new ArrayList<Future<Boolean>>(numThreads);
-        for (int i = 0; i < numThreads; i++) {
-            results.add(threadPool.submit(new Callable<Boolean>() {
-
-                @Override
-                public Boolean call() {
-                    threadStartLatch.countDown();
-                    try {
-                        threadStartLatch.await();
-
-                        LearnerSnapshot snap = throttler.beginSnapshot(false);
-
-                        int snapshotNumber = snap.getConcurrentSnapshotNumber();
-
-                        throttler.endSnapshot();
-
-                        return snapshotNumber <= 2;
-                    }
-                    catch (Exception e) {
-                        LOG.error("Exception trying to begin snapshot", e);
-                        return false;
-                    }
-                }
-            }));
-        }
-
-        for (Future<Boolean> result : results) {
-            Assert.assertTrue(result.get());
-        }
-    }
-}

+ 173 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerSyncThrottlerTest.java

@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.zookeeper.ZKTestCase;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(Parameterized.class)
+public class LearnerSyncThrottlerTest extends ZKTestCase {
+    private static final Logger LOG = LoggerFactory.getLogger(LearnerSyncThrottlerTest.class);
+
+    private LearnerSyncThrottler.SyncType syncType;
+    public LearnerSyncThrottlerTest(LearnerSyncThrottler.SyncType syncType){
+        this.syncType = syncType;
+    }
+
+    @Parameterized.Parameters
+    public static Collection syncTypes() {
+        return Arrays.asList(new Object[][]{
+                {LearnerSyncThrottler.SyncType.DIFF}, {LearnerSyncThrottler.SyncType.SNAP}});
+    }
+    @Test(expected = SyncThrottleException.class)
+    public void testTooManySyncsNonessential() throws Exception {
+        LearnerSyncThrottler throttler =
+                new LearnerSyncThrottler(5, syncType);
+        for (int i = 0; i < 6; i++) {
+            throttler.beginSync(false);
+        }
+    }
+
+    @Test(expected = SyncThrottleException.class)
+    public void testTooManySyncsEssential() throws Exception {
+        LearnerSyncThrottler throttler =
+                new LearnerSyncThrottler(5, syncType);
+        try {
+            for (int i = 0; i < 6; i++) {
+                throttler.beginSync(true);
+            }
+        } catch (SyncThrottleException ex) {
+            Assert.fail("essential syncs should not be throttled");
+        }
+        throttler.endSync();
+        throttler.beginSync(false);
+    }
+
+    @Test
+    public void testNoThrottle() throws Exception {
+        LearnerSyncThrottler throttler =
+                new LearnerSyncThrottler(5, syncType);
+        try {
+            for (int i = 0; i < 6; i++) {
+                throttler.beginSync(true);
+            }
+        }
+        catch (SyncThrottleException ex) {
+            Assert.fail("essential syncs should not be throttled");
+        }
+        throttler.endSync();
+        for (int i = 0; i < 5; i++) {
+            throttler.endSync();
+            throttler.beginSync(false);
+        }
+        Assert.assertTrue("should get here without exception", true);
+    }
+
+    @Test
+    public void testTryWithResourceNoThrottle() throws Exception {
+        LearnerSyncThrottler throttler =
+                new LearnerSyncThrottler(1, syncType);
+        for (int i = 0; i < 3; i++) {
+            throttler.beginSync(false);
+            try {
+                Assert.assertEquals(1, throttler.getSyncInProgress());
+            } finally {
+                throttler.endSync();
+            }
+        }
+    }
+
+    @Test
+    public void testTryWithResourceThrottle() throws Exception {
+        LearnerSyncThrottler throttler =
+                new LearnerSyncThrottler(1, syncType);
+        try {
+            throttler.beginSync(true);
+            try {
+                throttler.beginSync(false);
+                Assert.fail("shouldn't be able to have both syncs open");
+            } catch (SyncThrottleException e) {
+            }
+            throttler.endSync();
+        } catch (SyncThrottleException e) {
+            Assert.fail("First sync shouldn't be throttled");
+        }
+    }
+
+    @Test
+    public void testParallelNoThrottle() {
+        final int numThreads = 50;
+
+        final LearnerSyncThrottler throttler =
+                new LearnerSyncThrottler(numThreads, syncType);
+        ExecutorService threadPool = Executors.newFixedThreadPool(numThreads);
+        final CountDownLatch threadStartLatch = new CountDownLatch(numThreads);
+        final CountDownLatch syncProgressLatch = new CountDownLatch(numThreads);
+
+        List<Future<Boolean>> results = new ArrayList<Future<Boolean>>(numThreads);
+        for (int i = 0; i < numThreads; i++) {
+            results.add(threadPool.submit(new Callable<Boolean>() {
+
+                @Override
+                public Boolean call() {
+                    threadStartLatch.countDown();
+                    try {
+                        threadStartLatch.await();
+
+                        throttler.beginSync(false);
+
+                        syncProgressLatch.countDown();
+                        syncProgressLatch.await();
+
+                        throttler.endSync();
+                    } catch (Exception e) {
+                        return false;
+                    }
+
+                    return true;
+                }
+            }));
+        }
+
+        try {
+            for (Future<Boolean> result : results) {
+                Assert.assertTrue(result.get());
+            }
+        } catch (Exception e){
+
+        } finally {
+            threadPool.shutdown();
+        }
+    }
+}

+ 16 - 14
zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java

@@ -1203,7 +1203,7 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
             });
 
             // 5. on the customized leader catch the beginSnapshot call in
-            //    LearnerSnapshotThrottler to set the node to value v2,
+            //    LearnerSyncThrottler to set the node to value v2,
             //    wait it hit data tree
             leaderQuorumPeer.setBeginSnapshotListener(new BeginSnapshotListener() {
                 @Override
@@ -1756,6 +1756,7 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
     static class CustomQuorumPeer extends QuorumPeer {
         private Context context;
 
+        private LearnerSyncThrottler throttler = null;
         private StartForwardingListener startForwardingListener;
         private BeginSnapshotListener beginSnapshotListener;
 
@@ -1818,20 +1819,21 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
                 }
 
                 @Override
-                public LearnerSnapshotThrottler createLearnerSnapshotThrottler(
-                        int maxConcurrentSnapshots, long maxConcurrentSnapshotTimeout) {
-                    return new LearnerSnapshotThrottler(
-                            maxConcurrentSnapshots, maxConcurrentSnapshotTimeout) {
-
-                        @Override
-                        public LearnerSnapshot beginSnapshot(boolean essential)
-                                throws SnapshotThrottleException, InterruptedException {
-                            if (beginSnapshotListener != null) {
-                                beginSnapshotListener.start();
+                public LearnerSyncThrottler getLearnerSnapSyncThrottler() {
+                    if (throttler == null){
+                        throttler = new LearnerSyncThrottler(getMaxConcurrentSnapSyncs(),
+                                LearnerSyncThrottler.SyncType.SNAP){
+                            @Override
+                            public void beginSync(boolean essential)
+                                    throws SyncThrottleException, InterruptedException {
+                                if (beginSnapshotListener != null) {
+                                    beginSnapshotListener.start();
+                                }
+                                super.beginSync(essential);
                             }
-                            return super.beginSnapshot(essential);
-                        }
-                    };
+                        };
+                    }
+                    return throttler;
                 }
             };
         }