Przeglądaj źródła

ZOOKEEPER-3398: Learner.connectToLeader() may take too long to time-out

After leader election happens, the followers will connect to the leader which is facilitated by the Learner.connectToLeader() method.

Learner.connectToLeader() is relying on the initLimit configuration value to time-out in case the network connection is unreliable. This config may have a high value that could leave the ensemble retrying and waiting in the state of not having quorum for too long. The follower will retry up to 5 times.

This patch introduces a new configuration directive that will allow Zookeeper to use different time-out value `connectToLeaderLimit` which then could be set to lower value than `initLimit`.

Test plan:
- ant clean
- ant test-core-java

NOTE: Lots of whitespace changes, hope it helps.

Author: Vladimir Ivic <vladimir.ivic@me.com>

Reviewers: eolivelli@apache.org, hanm@apache.org, andor@apache.org

Closes #953 from vladimirivic/ZOOKEEPER-3398 and squashes the following commits:

da4ecd055 [Vladimir Ivic] Removed redundant test, chaning LearnerTest.connectToLearnerMasterLimitTest() params and assertions
6c413311c [Vladimir Ivic] Updating the tests with the new timeout parameter
5a89cbd7e [Vladimir Ivic] Rewriting timeout logic inside Leader.connectToLeader
99c065616 [Vladimir Ivic] Adding config connectToLearnerMasterLimit to prevent long connect timeout
Vladimir Ivic 6 lat temu
rodzic
commit
43ce772db0
24 zmienionych plików z 151 dodań i 77 usunięć
  1. 6 0
      zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
  2. 2 1
      zookeeper-it/src/test/java/org/apache/zookeeper/test/system/BaseSysTest.java
  3. 2 1
      zookeeper-it/src/test/java/org/apache/zookeeper/test/system/QuorumPeerInstance.java
  4. 23 15
      zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
  5. 28 8
      zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
  6. 4 0
      zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
  7. 1 0
      zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerMain.java
  8. 9 9
      zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CnxManagerTest.java
  9. 3 3
      zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/FLEBackwardElectionRoundTest.java
  10. 2 2
      zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/FLELostMessageTest.java
  11. 1 1
      zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/FLEOutOfElectionTest.java
  12. 22 0
      zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerTest.java
  13. 3 2
      zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerTest.java
  14. 2 0
      zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java
  15. 4 3
      zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReconfigDuringLeaderSyncTest.java
  16. 2 2
      zookeeper-server/src/test/java/org/apache/zookeeper/test/FLENewEpochTest.java
  17. 1 1
      zookeeper-server/src/test/java/org/apache/zookeeper/test/FLEPredicateTest.java
  18. 2 2
      zookeeper-server/src/test/java/org/apache/zookeeper/test/FLERestartTest.java
  19. 5 5
      zookeeper-server/src/test/java/org/apache/zookeeper/test/FLETest.java
  20. 1 1
      zookeeper-server/src/test/java/org/apache/zookeeper/test/FLEZeroWeightTest.java
  21. 6 5
      zookeeper-server/src/test/java/org/apache/zookeeper/test/HierarchicalQuorumTest.java
  22. 12 10
      zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumBase.java
  23. 6 3
      zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumUtil.java
  24. 4 3
      zookeeper-server/src/test/java/org/apache/zookeeper/test/TruncateTest.java

+ 6 - 0
zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md

@@ -913,6 +913,12 @@ of servers -- that is, when deploying clusters of servers.
     connect and sync to a leader. Increased this value as needed, if
     the amount of data managed by ZooKeeper is large.
 
+* *connectToLearnerMasterLimit* :
+    (Java system property: zookeeper.**connectToLearnerMasterLimit**)
+    Amount of time, in ticks (see [tickTime](#id_tickTime)), to allow followers to
+    connect to the leader after leader election. Defaults to the value of initLimit. 
+    Use when initLimit is high so connecting to learner master doesn't result in higher timeout.
+        
 * *leaderServes* :
     (Java system property: zookeeper.**leaderServes**)
     Leader accepts client connections. Default value is "yes".

+ 2 - 1
zookeeper-it/src/test/java/org/apache/zookeeper/test/system/BaseSysTest.java

@@ -176,11 +176,12 @@ public class BaseSysTest {
     final static int tickTime = 2000;
     final static int initLimit = 3;
     final static int syncLimit = 3;
+    final static int connectToLearnerMasterLimit = 3;
 
     public void startServer(int index) throws IOException {
         int port = fakeBasePort+10+index;
         if (fakeMachines) {
-            qps[index] = new QuorumPeer(peers, qpsDirs[index], qpsDirs[index], port, 3, index+1, tickTime, initLimit, syncLimit);
+            qps[index] = new QuorumPeer(peers, qpsDirs[index], qpsDirs[index], port, 3, index+1, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
             qps[index].start();
         } else {
             try {

+ 2 - 1
zookeeper-it/src/test/java/org/apache/zookeeper/test/system/QuorumPeerInstance.java

@@ -43,6 +43,7 @@ class QuorumPeerInstance implements Instance {
 
     private static final int syncLimit = 3;
     private static final int initLimit = 3;
+    private static final int connectToLearnerMasterLimit = 3;
     private static final int tickTime = 2000;
     String serverHostPort;
     int serverId;
@@ -191,7 +192,7 @@ class QuorumPeerInstance implements Instance {
                     return;
                 }
                 System.err.println("SnapDir = " + snapDir + " LogDir = " + logDir);
-                peer = new QuorumPeer(peers, snapDir, logDir, clientAddr.getPort(), 3, serverId, tickTime, initLimit, syncLimit);
+                peer = new QuorumPeer(peers, snapDir, logDir, clientAddr.getPort(), 3, serverId, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
                 peer.start();
                 for(int i = 0; i < 5; i++) {
                     Thread.sleep(500);

+ 23 - 15
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java

@@ -241,7 +241,7 @@ public class Learner {
     throws IOException {
         sock.connect(addr, timeout);
     }
-
+    
     /**
      * Establish a connection with the LearnerMaster found by findLearnerMaster.
      * Followers only connect to Leaders, Observers can connect to any active LearnerMaster.
@@ -256,41 +256,49 @@ public class Learner {
             throws IOException, InterruptedException, X509Exception {
         this.sock = createSocket();
 
-        int initLimitTime = self.tickTime * self.initLimit;
-        int remainingInitLimitTime;
+        // leader connection timeout defaults to tickTime * initLimit
+        int connectTimeout = self.tickTime * self.initLimit;
+
+        // but if connectToLearnerMasterLimit is specified, use that value to calculate
+        // timeout instead of using the initLimit value
+        if (self.connectToLearnerMasterLimit > 0) {
+          connectTimeout = self.tickTime * self.connectToLearnerMasterLimit;
+        }
+
+        int remainingTimeout;
         long startNanoTime = nanoTime();
 
         for (int tries = 0; tries < 5; tries++) {
             try {
                 // recalculate the init limit time because retries sleep for 1000 milliseconds
-                remainingInitLimitTime = initLimitTime - (int)((nanoTime() - startNanoTime) / 1000000);
-                if (remainingInitLimitTime <= 0) {
-                    LOG.error("initLimit exceeded on retries.");
-                    throw new IOException("initLimit exceeded on retries.");
+                remainingTimeout = connectTimeout - (int)((nanoTime() - startNanoTime) / 1000000);
+                if (remainingTimeout <= 0) {
+                    LOG.error("connectToLeader exceeded on retries.");
+                    throw new IOException("connectToLeader exceeded on retries.");
                 }
-
-                sockConnect(sock, addr, Math.min(self.tickTime * self.syncLimit, remainingInitLimitTime));
+                
+                sockConnect(sock, addr, Math.min(connectTimeout, remainingTimeout));
                 if (self.isSslQuorum())  {
                     ((SSLSocket) sock).startHandshake();
                 }
                 sock.setTcpNoDelay(nodelay);
                 break;
             } catch (IOException e) {
-                remainingInitLimitTime = initLimitTime - (int)((nanoTime() - startNanoTime) / 1000000);
+                remainingTimeout = connectTimeout - (int)((nanoTime() - startNanoTime) / 1000000);
 
-                if (remainingInitLimitTime <= 1000) {
-                    LOG.error("Unexpected exception, initLimit exceeded. tries=" + tries +
-                             ", remaining init limit=" + remainingInitLimitTime +
+                if (remainingTimeout <= 1000) {
+                    LOG.error("Unexpected exception, connectToLeader exceeded. tries=" + tries +
+                             ", remaining init limit=" + remainingTimeout +
                              ", connecting to " + addr,e);
                     throw e;
                 } else if (tries >= 4) {
                     LOG.error("Unexpected exception, retries exceeded. tries=" + tries +
-                             ", remaining init limit=" + remainingInitLimitTime +
+                             ", remaining init limit=" + remainingTimeout +
                              ", connecting to " + addr,e);
                     throw e;
                 } else {
                     LOG.warn("Unexpected exception, tries=" + tries +
-                            ", remaining init limit=" + remainingInitLimitTime +
+                            ", remaining init limit=" + remainingTimeout +
                             ", connecting to " + addr,e);
                     this.sock = createSocket();
                 }

+ 28 - 8
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java

@@ -586,6 +586,11 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
      */
     protected volatile int syncLimit;
     
+    /**
+     * The number of ticks that can pass before retrying to connect to learner master
+     */
+    protected volatile int connectToLearnerMasterLimit;
+    
     /**
      * Enables/Disables sync request processor. This option is enabled
      * by default and is to be used with observers.
@@ -899,16 +904,16 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
 
     public QuorumPeer(Map<Long, QuorumServer> quorumPeers, File dataDir,
             File dataLogDir, int electionType,
-            long myid, int tickTime, int initLimit, int syncLimit,
+            long myid, int tickTime, int initLimit, int syncLimit, int connectToLearnerMasterLimit,
             ServerCnxnFactory cnxnFactory) throws IOException {
         this(quorumPeers, dataDir, dataLogDir, electionType, myid, tickTime,
-                initLimit, syncLimit, false, cnxnFactory,
+                initLimit, syncLimit, connectToLearnerMasterLimit, false, cnxnFactory,
                 new QuorumMaj(quorumPeers));
     }
 
     public QuorumPeer(Map<Long, QuorumServer> quorumPeers, File dataDir,
             File dataLogDir, int electionType,
-            long myid, int tickTime, int initLimit, int syncLimit,
+            long myid, int tickTime, int initLimit, int syncLimit, int connectToLearnerMasterLimit,
             boolean quorumListenOnAllIPs,
             ServerCnxnFactory cnxnFactory,
             QuorumVerifier quorumConfig) throws IOException {
@@ -919,6 +924,7 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
         this.tickTime = tickTime;
         this.initLimit = initLimit;
         this.syncLimit = syncLimit;
+        this.connectToLearnerMasterLimit = connectToLearnerMasterLimit;
         this.quorumListenOnAllIPs = quorumListenOnAllIPs;
         this.logFactory = new FileTxnSnapLog(dataLogDir, dataDir);
         this.zkDb = new ZKDatabase(this.logFactory);
@@ -1049,7 +1055,7 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
       }
       return count;
     }
-
+    
     /**
 
      * This constructor is only used by the existing unit test code.
@@ -1057,10 +1063,10 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
      */
     public QuorumPeer(Map<Long,QuorumServer> quorumPeers, File snapDir,
             File logDir, int clientPort, int electionAlg,
-            long myid, int tickTime, int initLimit, int syncLimit)
+            long myid, int tickTime, int initLimit, int syncLimit, int connectToLearnerMasterLimit)
         throws IOException
     {
-        this(quorumPeers, snapDir, logDir, electionAlg, myid, tickTime, initLimit, syncLimit, false,
+        this(quorumPeers, snapDir, logDir, electionAlg, myid, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit, false,
                 ServerCnxnFactory.createFactory(getClientAddress(quorumPeers, myid, clientPort), -1),
                 new QuorumMaj(quorumPeers));
     }
@@ -1071,12 +1077,12 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
      */
     public QuorumPeer(Map<Long,QuorumServer> quorumPeers, File snapDir,
             File logDir, int clientPort, int electionAlg,
-            long myid, int tickTime, int initLimit, int syncLimit,
+            long myid, int tickTime, int initLimit, int syncLimit, int connectToLearnerMasterLimit,
             QuorumVerifier quorumConfig)
         throws IOException
     {
         this(quorumPeers, snapDir, logDir, electionAlg,
-                myid,tickTime, initLimit,syncLimit, false,
+                myid,tickTime, initLimit,syncLimit, connectToLearnerMasterLimit, false,
                 ServerCnxnFactory.createFactory(getClientAddress(quorumPeers, myid, clientPort), -1),
                 quorumConfig);
     }
@@ -1785,6 +1791,20 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
         this.syncLimit = syncLimit;
     }
     
+    /**
+     * Get the connectToLearnerMasterLimit
+     */
+    public int getConnectToLearnerMasterLimit() {
+        return connectToLearnerMasterLimit;
+    }
+    
+    /**
+     * Set the connectToLearnerMasterLimit
+     */
+    public void setConnectToLearnerMasterLimit(int connectToLearnerMasterLimit) {
+        LOG.info("connectToLearnerMasterLimit set to " + connectToLearnerMasterLimit);
+        this.connectToLearnerMasterLimit = connectToLearnerMasterLimit;
+    }
     
     /**
      * The syncEnabled can also be set via a system property.

+ 4 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java

@@ -94,6 +94,7 @@ public class QuorumPeerConfig {
 
     protected int initLimit;
     protected int syncLimit;
+    protected int connectToLearnerMasterLimit;
     protected int electionAlg = 3;
     protected int electionPort = 2182;
     protected boolean quorumListenOnAllIPs = false;
@@ -306,6 +307,8 @@ public class QuorumPeerConfig {
                 initLimit = Integer.parseInt(value);
             } else if (key.equals("syncLimit")) {
                 syncLimit = Integer.parseInt(value);
+            } else if (key.equals("connectToLearnerMasterLimit")) {
+                connectToLearnerMasterLimit = Integer.parseInt(value);
             } else if (key.equals("electionAlg")) {
                 electionAlg = Integer.parseInt(value);
                 if (electionAlg != 1 && electionAlg != 2 && electionAlg != 3) {
@@ -834,6 +837,7 @@ public class QuorumPeerConfig {
 
     public int getInitLimit() { return initLimit; }
     public int getSyncLimit() { return syncLimit; }
+    public int getConnectToLearnerMasterLimit() { return connectToLearnerMasterLimit; }
     public int getElectionAlg() { return electionAlg; }
     public int getElectionPort() { return electionPort; }
 

+ 1 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerMain.java

@@ -189,6 +189,7 @@ public class QuorumPeerMain {
           quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
           quorumPeer.setInitLimit(config.getInitLimit());
           quorumPeer.setSyncLimit(config.getSyncLimit());
+          quorumPeer.setConnectToLearnerMasterLimit(config.getConnectToLearnerMasterLimit());
           quorumPeer.setObserverMasterPort(config.getObserverMasterPort());
           quorumPeer.setConfigFileName(config.getConfigFilename());
           quorumPeer.setClientPortListenBacklog(config.getClientPortListenBacklog());

+ 9 - 9
zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CnxManagerTest.java

@@ -118,7 +118,7 @@ public class CnxManagerTest extends ZKTestCase {
 
         public void run(){
             try {
-                QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[0], peerTmpdir[0], peerClientPort[0], 3, 0, 1000, 2, 2);
+                QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[0], peerTmpdir[0], peerClientPort[0], 3, 0, 1000, 2, 2, 2);
                 QuorumCnxManager cnxManager = peer.createCnxnManager();
                 QuorumCnxManager.Listener listener = cnxManager.listener;
                 if(listener != null){
@@ -162,7 +162,7 @@ public class CnxManagerTest extends ZKTestCase {
 
         thread.start();
 
-        QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1], peerClientPort[1], 3, 1, 1000, 2, 2);
+        QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1], peerClientPort[1], 3, 1, 1000, 2, 2, 2);
         QuorumCnxManager cnxManager = peer.createCnxnManager();
         QuorumCnxManager.Listener listener = cnxManager.listener;
         if(listener != null){
@@ -209,7 +209,7 @@ public class CnxManagerTest extends ZKTestCase {
                         new InetSocketAddress(deadAddress, PortAssignment.unique())));
         peerTmpdir[2] = ClientBase.createTmpDir();
 
-        QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1], peerClientPort[1], 3, 1, 1000, 2, 2);
+        QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1], peerClientPort[1], 3, 1, 1000, 2, 2, 2);
         QuorumCnxManager cnxManager = peer.createCnxnManager();
         QuorumCnxManager.Listener listener = cnxManager.listener;
         if(listener != null){
@@ -237,7 +237,7 @@ public class CnxManagerTest extends ZKTestCase {
      */
     @Test
     public void testCnxManagerSpinLock() throws Exception {
-        QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1], peerClientPort[1], 3, 1, 1000, 2, 2);
+        QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1], peerClientPort[1], 3, 1, 1000, 2, 2, 2);
         QuorumCnxManager cnxManager = peer.createCnxnManager();
         QuorumCnxManager.Listener listener = cnxManager.listener;
         if(listener != null){
@@ -302,7 +302,7 @@ public class CnxManagerTest extends ZKTestCase {
         // the connecting peer (id = 2) is a 3.4.6 observer
         peers.get(2L).type = LearnerType.OBSERVER;
         QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1],
-                peerClientPort[1], 3, 1, 1000, 2, 2);
+                peerClientPort[1], 3, 1, 1000, 2, 2, 2);
         QuorumCnxManager cnxManager = peer.createCnxnManager();
         QuorumCnxManager.Listener listener = cnxManager.listener;
         if (listener != null) {
@@ -349,7 +349,7 @@ public class CnxManagerTest extends ZKTestCase {
      */
     @Test
     public void testSocketTimeout() throws Exception {
-        QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1], peerClientPort[1], 3, 1, 2000, 2, 2);
+        QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1], peerClientPort[1], 3, 1, 2000, 2, 2, 2);
         QuorumCnxManager cnxManager = peer.createCnxnManager();
         QuorumCnxManager.Listener listener = cnxManager.listener;
         if(listener != null){
@@ -434,7 +434,7 @@ public class CnxManagerTest extends ZKTestCase {
         };
 
         QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[0], peerTmpdir[0],
-                peerClientPort[0], 3, 0, 2000, 2, 2) {
+                peerClientPort[0], 3, 0, 2000, 2, 2, 2) {
             @Override
             public QuorumX509Util createX509Util() {
                 return mockedX509Util;
@@ -457,7 +457,7 @@ public class CnxManagerTest extends ZKTestCase {
             for (int sid = 0; sid < 3; sid++) {
                 QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[sid],
                         peerTmpdir[sid], peerClientPort[sid], 3, sid, 1000, 2,
-                        2);
+                        2, 2);
                 LOG.info("Starting peer {}", peer.getId());
                 peer.start();
                 peerList.add(sid, peer);
@@ -477,7 +477,7 @@ public class CnxManagerTest extends ZKTestCase {
                     // Restart halted node and verify count
                     peer = new QuorumPeer(peers, peerTmpdir[myid],
                             peerTmpdir[myid], peerClientPort[myid], 3, myid,
-                            1000, 2, 2);
+                            1000, 2, 2, 2);
                     LOG.info("Round {}, restarting peer ",
                             new Object[] { i, peer.getId() });
                     peer.start();

+ 3 - 3
zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/FLEBackwardElectionRoundTest.java

@@ -106,7 +106,7 @@ public class FLEBackwardElectionRoundTest extends ZKTestCase {
         /*
          * Start server 0
          */
-        QuorumPeer peer = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 1000, 2, 2);
+        QuorumPeer peer = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 1000, 2, 2, 2);
         peer.startLeaderElection();
         FLETestUtils.LEThread thread = new FLETestUtils.LEThread(peer, 0);
         thread.start();
@@ -114,7 +114,7 @@ public class FLEBackwardElectionRoundTest extends ZKTestCase {
         /*
          * Start mock server 1
          */
-        QuorumPeer mockPeer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 3, 1, 1000, 2, 2);
+        QuorumPeer mockPeer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 3, 1, 1000, 2, 2, 2);
         cnxManagers[0] = mockPeer.createCnxnManager();
         cnxManagers[0].listener.start();
 
@@ -123,7 +123,7 @@ public class FLEBackwardElectionRoundTest extends ZKTestCase {
         /*
          * Start mock server 2
          */
-        mockPeer = new QuorumPeer(peers, tmpdir[2], tmpdir[2], port[2], 3, 2, 1000, 2, 2);
+        mockPeer = new QuorumPeer(peers, tmpdir[2], tmpdir[2], port[2], 3, 2, 1000, 2, 2, 2);
         cnxManagers[1] = mockPeer.createCnxnManager();
         cnxManagers[1].listener.start();
 

+ 2 - 2
zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/FLELostMessageTest.java

@@ -78,7 +78,7 @@ public class FLELostMessageTest extends ZKTestCase {
         /*
          * Start server 0
          */
-        QuorumPeer peer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 3, 1, 1000, 2, 2);
+        QuorumPeer peer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 3, 1, 1000, 2, 2, 2);
         peer.startLeaderElection();
         FLETestUtils.LEThread thread = new FLETestUtils.LEThread(peer, 1);
         thread.start();
@@ -94,7 +94,7 @@ public class FLELostMessageTest extends ZKTestCase {
     }
 
     void mockServer() throws InterruptedException, IOException {
-        QuorumPeer peer = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 1000, 2, 2);
+        QuorumPeer peer = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 1000, 2, 2, 2);
         cnxManager = peer.createCnxnManager();
         cnxManager.listener.start();
 

+ 1 - 1
zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/FLEOutOfElectionTest.java

@@ -52,7 +52,7 @@ public class FLEOutOfElectionTest {
                     new InetSocketAddress("127.0.0.1", PortAssignment.unique())));
         }
         QuorumPeer peer = new QuorumPeer(peers, tmpdir, tmpdir, 
-                PortAssignment.unique(), 3, 3, 1000, 2, 2);
+                PortAssignment.unique(), 3, 3, 1000, 2, 2, 2);
         fle = new FastLeaderElection(peer, peer.createCnxnManager());
     }
 

+ 22 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerTest.java

@@ -139,6 +139,28 @@ public class LearnerTest extends ZKTestCase {
         }
     }
 
+    @Test
+    public void connectToLearnerMasterLimitTest() throws Exception {
+      TimeoutLearner learner = new TimeoutLearner();
+      learner.self = new QuorumPeer();
+      learner.self.setTickTime(2000);
+      learner.self.setInitLimit(2);
+      learner.self.setSyncLimit(2);
+      learner.self.setConnectToLearnerMasterLimit(5);
+      
+      InetSocketAddress addr = new InetSocketAddress(1111);
+      learner.setTimeMultiplier((long)4000 * 1000000);
+      learner.setPassConnectAttempt(5);
+      
+      try {
+          learner.connectToLeader(addr, "");
+          Assert.fail("should have thrown IOException!");
+      } catch (IOException e) {
+        Assert.assertTrue(learner.nanoTime() > 2000*5*1000000);
+        Assert.assertEquals(3, learner.getSockConnectAttempt());
+      }
+    }
+
     @Test
     public void syncTest() throws Exception {
         File tmpFile = File.createTempFile("test", ".dir", testData);

+ 3 - 2
zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerTest.java

@@ -39,6 +39,7 @@ public class QuorumPeerTest {
     private int tickTime = 2000;
     private int initLimit = 3;
     private int syncLimit = 3;
+    private int connectToLearnerMasterLimit = 3;
 
     /**
      * Test case for https://issues.apache.org/jira/browse/ZOOKEEPER-2301
@@ -60,7 +61,7 @@ public class QuorumPeerTest {
          * QuorumPeer constructor without QuorumVerifier
          */
         QuorumPeer peer1 = new QuorumPeer(peersView, dataDir, dataDir, clientPort, electionAlg, myId, tickTime,
-                initLimit, syncLimit);
+                initLimit, syncLimit, connectToLearnerMasterLimit);
         String hostString1 = peer1.cnxnFactory.getLocalAddress().getHostString();
         assertEquals(clientIP.getHostAddress(), hostString1);
 
@@ -77,7 +78,7 @@ public class QuorumPeerTest {
                         new InetSocketAddress(clientIP, PortAssignment.unique()),
                         new InetSocketAddress(clientIP, clientPort), LearnerType.PARTICIPANT));
         QuorumPeer peer2 = new QuorumPeer(peersView, dataDir, dataDir, clientPort, electionAlg, myId, tickTime,
-                initLimit, syncLimit);
+                initLimit, syncLimit, connectToLearnerMasterLimit);
         String hostString2 = peer2.cnxnFactory.getLocalAddress().getHostString();
         assertEquals(clientIP.getHostAddress(), hostString2);
         // cleanup

+ 2 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java

@@ -126,6 +126,7 @@ public class QuorumPeerTestBase extends ZKTestCase implements Watcher {
             fwriter.write("tickTime=" + tickTime + "\n");
             fwriter.write("initLimit=10\n");
             fwriter.write("syncLimit=5\n");
+            fwriter.write("connectToLearnerMasterLimit=5\n");
 
             tmpDir = new File(baseDir, "data");
             if (!tmpDir.mkdir()) {
@@ -229,6 +230,7 @@ public class QuorumPeerTestBase extends ZKTestCase implements Watcher {
             fwriter.write("tickTime=4000\n");
             fwriter.write("initLimit=10\n");
             fwriter.write("syncLimit=5\n");
+            fwriter.write("connectToLearnerMasterLimit=5\n");
             if(configs != null){
                 fwriter.write(configs);
             }

+ 4 - 3
zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReconfigDuringLeaderSyncTest.java

@@ -208,9 +208,9 @@ public class ReconfigDuringLeaderSyncTest extends QuorumPeerTestBase {
         private boolean newLeaderMessage = false;
 
         public CustomQuorumPeer(Map<Long, QuorumServer> quorumPeers, File snapDir, File logDir, int clientPort,
-                int electionAlg, long myid, int tickTime, int initLimit, int syncLimit)
+                int electionAlg, long myid, int tickTime, int initLimit, int syncLimit, int connectToLearnerMasterLimit)
                 throws IOException {
-            super(quorumPeers, snapDir, logDir, electionAlg, myid, tickTime, initLimit, syncLimit, false,
+            super(quorumPeers, snapDir, logDir, electionAlg, myid, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit, false,
                     ServerCnxnFactory.createFactory(new InetSocketAddress(clientPort), -1), new QuorumMaj(quorumPeers));
         }
 
@@ -256,7 +256,8 @@ public class ReconfigDuringLeaderSyncTest extends QuorumPeerTestBase {
                 throws IOException, AdminServerException {
             quorumPeer = new CustomQuorumPeer(config.getQuorumVerifier().getAllMembers(), config.getDataDir(),
                     config.getDataLogDir(), config.getClientPortAddress().getPort(), config.getElectionAlg(),
-                    config.getServerId(), config.getTickTime(), config.getInitLimit(), config.getSyncLimit());
+                    config.getServerId(), config.getTickTime(), config.getInitLimit(), config.getSyncLimit(),
+                    config.getConnectToLearnerMasterLimit());
             quorumPeer.setConfigFileName(config.getConfigFilename());
             quorumPeer.start();
             try {

+ 2 - 2
zookeeper-server/src/test/java/org/apache/zookeeper/test/FLENewEpochTest.java

@@ -165,7 +165,7 @@ public class FLENewEpochTest extends ZKTestCase {
           }
 
           for(int i = 1; i < count; i++) {
-              QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], port[i], 3, i, 1000, 2, 2);
+              QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], port[i], 3, i, 1000, 2, 2, 2);
               peer.startLeaderElection();
               LEThread thread = new LEThread(peer, i);
               thread.start();
@@ -174,7 +174,7 @@ public class FLENewEpochTest extends ZKTestCase {
           if(!start0.tryAcquire(4000, java.util.concurrent.TimeUnit.MILLISECONDS))
               Assert.fail("First leader election failed");
 
-          QuorumPeer peer = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 1000, 2, 2);
+          QuorumPeer peer = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 1000, 2, 2, 2);
           peer.startLeaderElection();
           LEThread thread = new LEThread(peer, 0);
           thread.start();

+ 1 - 1
zookeeper-server/src/test/java/org/apache/zookeeper/test/FLEPredicateTest.java

@@ -75,7 +75,7 @@ public class FLEPredicateTest extends ZKTestCase {
         try{
             File tmpDir = ClientBase.createTmpDir();
             QuorumPeer peer = new QuorumPeer(peers, tmpDir, tmpDir,
-                                        PortAssignment.unique(), 3, 0, 1000, 2, 2);
+                                        PortAssignment.unique(), 3, 0, 1000, 2, 2, 2);
         
             MockFLE mock = new MockFLE(peer);
             mock.start();

+ 2 - 2
zookeeper-server/src/test/java/org/apache/zookeeper/test/FLERestartTest.java

@@ -123,7 +123,7 @@ public class FLERestartTest extends ZKTestCase {
                             QuorumBase.shutdown(peer);
                             ((FastLeaderElection) restartThreads.get(i).peer.getElectionAlg()).shutdown();
 
-                            peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], port[i], 3, i, 1000, 2, 2);
+                            peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], port[i], 3, i, 1000, 2, 2, 2);
                             peer.startLeaderElection();
                             peerRound++;
                         } else {
@@ -171,7 +171,7 @@ public class FLERestartTest extends ZKTestCase {
         }
 
         for(int i = 0; i < count; i++) {
-            QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], port[i], 3, i, 1000, 2, 2);
+            QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], port[i], 3, i, 1000, 2, 2, 2);
             peer.startLeaderElection();
             FLERestartThread thread = new FLERestartThread(peer, i);
             thread.start();

+ 5 - 5
zookeeper-server/src/test/java/org/apache/zookeeper/test/FLETest.java

@@ -324,7 +324,7 @@ public class FLETest extends ZKTestCase {
          */
         for(int i = 0; i < count; i++) {
             QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i],
-                    port[i], 3, i, 1000, 2, 2);
+                    port[i], 3, i, 1000, 2, 2, 2);
             peer.startLeaderElection();
             LEThread thread = new LEThread(this, peer, i, rounds, quora);
             thread.start();
@@ -429,7 +429,7 @@ public class FLETest extends ZKTestCase {
         // start 2 peers and verify if they form the cluster
         for (sid = 0; sid < 2; sid++) {
             peer = new QuorumPeer(peers, tmpdir[sid], tmpdir[sid],
-                                             port[sid], 3, sid, 2000, 2, 2);
+                                             port[sid], 3, sid, 2000, 2, 2, 2);
             LOG.info("Starting peer " + peer.getId());
             peer.start();
             peerList.add(sid, peer);
@@ -443,7 +443,7 @@ public class FLETest extends ZKTestCase {
             !v1.isSuccess());
         // Start 3rd peer and check if it goes in LEADING state
         peer = new QuorumPeer(peers, tmpdir[sid], tmpdir[sid],
-                 port[sid], 3, sid, 2000, 2, 2);
+                 port[sid], 3, sid, 2000, 2, 2, 2);
         LOG.info("Starting peer " + peer.getId());
         peer.start();
         peerList.add(sid, peer);
@@ -488,7 +488,7 @@ public class FLETest extends ZKTestCase {
         // start 2 peers and verify if they form the cluster
         for (sid = 0; sid < 2; sid++) {
             peer = new QuorumPeer(peers, tmpdir[sid], tmpdir[sid],
-                                             port[sid], 3, sid, 2000, 2, 2);
+                                             port[sid], 3, sid, 2000, 2, 2, 2);
             LOG.info("Starting peer " + peer.getId());
             peer.start();
             peerList.add(sid, peer);
@@ -510,7 +510,7 @@ public class FLETest extends ZKTestCase {
         peer.setCurrentVote(newVote);
         // Start 3rd peer and check if it joins the quorum
         peer = new QuorumPeer(peers, tmpdir[2], tmpdir[2],
-                 port[2], 3, 2, 2000, 2, 2);
+                 port[2], 3, 2, 2000, 2, 2, 2);
         LOG.info("Starting peer " + peer.getId());
         peer.start();
         peerList.add(sid, peer);

+ 1 - 1
zookeeper-server/src/test/java/org/apache/zookeeper/test/FLEZeroWeightTest.java

@@ -154,7 +154,7 @@ public class FLEZeroWeightTest extends ZKTestCase {
 
         for(int i = 0; i < count; i++) {
             QuorumHierarchical hq = new QuorumHierarchical(qp);
-            QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], port[i], 3, i, 1000, 2, 2, hq);
+            QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], port[i], 3, i, 1000, 2, 2, 2, hq);
             peer.startLeaderElection();
             LEThread thread = new LEThread(peer, i);
             thread.start();

+ 6 - 5
zookeeper-server/src/test/java/org/apache/zookeeper/test/HierarchicalQuorumTest.java

@@ -145,6 +145,7 @@ public class HierarchicalQuorumTest extends ClientBase {
         int tickTime = 2000;
         int initLimit = 3;
         int syncLimit = 3;
+        int connectToLearnerMasterLimit = 3;
         HashMap<Long,QuorumServer> peers = new HashMap<Long,QuorumServer>();
         peers.put(Long.valueOf(1), new QuorumServer(1, 
                 new InetSocketAddress("127.0.0.1", port1),
@@ -178,22 +179,22 @@ public class HierarchicalQuorumTest extends ClientBase {
                qp.setProperty("server.5", "127.0.0.1:" + port5 + ":" + leport5 +  ":observer" + ";" + clientport5);
         }
         QuorumHierarchical hq1 = new QuorumHierarchical(qp); 
-        s1 = new QuorumPeer(peers, s1dir, s1dir, clientport1, 3, 1, tickTime, initLimit, syncLimit, hq1);
+        s1 = new QuorumPeer(peers, s1dir, s1dir, clientport1, 3, 1, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit, hq1);
         Assert.assertEquals(clientport1, s1.getClientPort());
         
         LOG.info("creating QuorumPeer 2 port " + clientport2);
         QuorumHierarchical hq2 = new QuorumHierarchical(qp); 
-        s2 = new QuorumPeer(peers, s2dir, s2dir, clientport2, 3, 2, tickTime, initLimit, syncLimit, hq2);
+        s2 = new QuorumPeer(peers, s2dir, s2dir, clientport2, 3, 2, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit, hq2);
         Assert.assertEquals(clientport2, s2.getClientPort());
         
         LOG.info("creating QuorumPeer 3 port " + clientport3);
         QuorumHierarchical hq3 = new QuorumHierarchical(qp); 
-        s3 = new QuorumPeer(peers, s3dir, s3dir, clientport3, 3, 3, tickTime, initLimit, syncLimit, hq3);
+        s3 = new QuorumPeer(peers, s3dir, s3dir, clientport3, 3, 3, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit, hq3);
         Assert.assertEquals(clientport3, s3.getClientPort());
         
         LOG.info("creating QuorumPeer 4 port " + clientport4);
         QuorumHierarchical hq4 = new QuorumHierarchical(qp); 
-        s4 = new QuorumPeer(peers, s4dir, s4dir, clientport4, 3, 4, tickTime, initLimit, syncLimit, hq4);
+        s4 = new QuorumPeer(peers, s4dir, s4dir, clientport4, 3, 4, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit, hq4);
         if (withObservers) {
             s4.setLearnerType(QuorumPeer.LearnerType.OBSERVER);
         }
@@ -201,7 +202,7 @@ public class HierarchicalQuorumTest extends ClientBase {
                        
         LOG.info("creating QuorumPeer 5 port " + clientport5);
         QuorumHierarchical hq5 = new QuorumHierarchical(qp); 
-        s5 = new QuorumPeer(peers, s5dir, s5dir, clientport5, 3, 5, tickTime, initLimit, syncLimit, hq5);
+        s5 = new QuorumPeer(peers, s5dir, s5dir, clientport5, 3, 5, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit, hq5);
         if (withObservers) {
             s5.setLearnerType(QuorumPeer.LearnerType.OBSERVER);
         }

+ 12 - 10
zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumBase.java

@@ -137,6 +137,7 @@ public class QuorumBase extends ClientBase {
         int tickTime = 2000;
         int initLimit = 3;
         int syncLimit = 3;
+        int connectToLearnerMasterLimit = 3;
         Map<Long,QuorumServer> peers = new HashMap<Long,QuorumServer>();
         peers.put(Long.valueOf(1), new QuorumServer(1,
                 new InetSocketAddress(LOCALADDR, port1),
@@ -170,19 +171,19 @@ public class QuorumBase extends ClientBase {
         }
 
         LOG.info("creating QuorumPeer 1 port " + portClient1);
-        s1 = new QuorumPeer(peers, s1dir, s1dir, portClient1, 3, 1, tickTime, initLimit, syncLimit);
+        s1 = new QuorumPeer(peers, s1dir, s1dir, portClient1, 3, 1, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
         Assert.assertEquals(portClient1, s1.getClientPort());
         LOG.info("creating QuorumPeer 2 port " + portClient2);
-        s2 = new QuorumPeer(peers, s2dir, s2dir, portClient2, 3, 2, tickTime, initLimit, syncLimit);
+        s2 = new QuorumPeer(peers, s2dir, s2dir, portClient2, 3, 2, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
         Assert.assertEquals(portClient2, s2.getClientPort());
         LOG.info("creating QuorumPeer 3 port " + portClient3);
-        s3 = new QuorumPeer(peers, s3dir, s3dir, portClient3, 3, 3, tickTime, initLimit, syncLimit);
+        s3 = new QuorumPeer(peers, s3dir, s3dir, portClient3, 3, 3, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
         Assert.assertEquals(portClient3, s3.getClientPort());
         LOG.info("creating QuorumPeer 4 port " + portClient4);
-        s4 = new QuorumPeer(peers, s4dir, s4dir, portClient4, 3, 4, tickTime, initLimit, syncLimit);
+        s4 = new QuorumPeer(peers, s4dir, s4dir, portClient4, 3, 4, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
         Assert.assertEquals(portClient4, s4.getClientPort());
         LOG.info("creating QuorumPeer 5 port " + portClient5);
-        s5 = new QuorumPeer(peers, s5dir, s5dir, portClient5, 3, 5, tickTime, initLimit, syncLimit);
+        s5 = new QuorumPeer(peers, s5dir, s5dir, portClient5, 3, 5, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
         Assert.assertEquals(portClient5, s5.getClientPort());
 
         if (withObservers) {
@@ -299,6 +300,7 @@ public class QuorumBase extends ClientBase {
         int tickTime = 2000;
         int initLimit = 3;
         int syncLimit = 3;
+        int connectToLearnerMasterLimit = 3;
 
         if(peers == null){
             peers = new HashMap<Long,QuorumServer>();
@@ -333,27 +335,27 @@ public class QuorumBase extends ClientBase {
         switch(i){
         case 1:
             LOG.info("creating QuorumPeer 1 port " + portClient1);
-            s1 = new QuorumPeer(peers, s1dir, s1dir, portClient1, 3, 1, tickTime, initLimit, syncLimit);
+            s1 = new QuorumPeer(peers, s1dir, s1dir, portClient1, 3, 1, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
             Assert.assertEquals(portClient1, s1.getClientPort());
             break;
         case 2:
             LOG.info("creating QuorumPeer 2 port " + portClient2);
-            s2 = new QuorumPeer(peers, s2dir, s2dir, portClient2, 3, 2, tickTime, initLimit, syncLimit);
+            s2 = new QuorumPeer(peers, s2dir, s2dir, portClient2, 3, 2, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
             Assert.assertEquals(portClient2, s2.getClientPort());
             break;
         case 3:
             LOG.info("creating QuorumPeer 3 port " + portClient3);
-            s3 = new QuorumPeer(peers, s3dir, s3dir, portClient3, 3, 3, tickTime, initLimit, syncLimit);
+            s3 = new QuorumPeer(peers, s3dir, s3dir, portClient3, 3, 3, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
             Assert.assertEquals(portClient3, s3.getClientPort());
             break;
         case 4:
             LOG.info("creating QuorumPeer 4 port " + portClient4);
-            s4 = new QuorumPeer(peers, s4dir, s4dir, portClient4, 3, 4, tickTime, initLimit, syncLimit);
+            s4 = new QuorumPeer(peers, s4dir, s4dir, portClient4, 3, 4, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
             Assert.assertEquals(portClient4, s4.getClientPort());
             break;
         case 5:
             LOG.info("creating QuorumPeer 5 port " + portClient5);
-            s5 = new QuorumPeer(peers, s5dir, s5dir, portClient5, 3, 5, tickTime, initLimit, syncLimit);
+            s5 = new QuorumPeer(peers, s5dir, s5dir, portClient5, 3, 5, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
             Assert.assertEquals(portClient5, s5.getClientPort());
         }
     }

+ 6 - 3
zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumUtil.java

@@ -73,6 +73,8 @@ public class QuorumUtil {
     private int initLimit;
 
     private int syncLimit;
+    
+    private int connectToLearnerMasterLimit;
 
     private int electionAlg;
 
@@ -94,6 +96,7 @@ public class QuorumUtil {
             tickTime = 2000;
             initLimit = 3;
             this.syncLimit = syncLimit;
+            connectToLearnerMasterLimit = 3;
             electionAlg = 3;
             hostPort = "";
 
@@ -115,7 +118,7 @@ public class QuorumUtil {
                 PeerStruct ps = peers.get(i);
                 LOG.info("Creating QuorumPeer " + i + "; public port " + ps.clientPort);
                 ps.peer = new QuorumPeer(peersView, ps.dataDir, ps.dataDir, ps.clientPort,
-                        electionAlg, ps.id, tickTime, initLimit, syncLimit);
+                        electionAlg, ps.id, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
                 Assert.assertEquals(ps.clientPort, ps.peer.getClientPort());
             }
         } catch (Exception e) {
@@ -202,7 +205,7 @@ public class QuorumUtil {
         PeerStruct ps = getPeer(id);
         LOG.info("Creating QuorumPeer " + ps.id + "; public port " + ps.clientPort);
         ps.peer = new QuorumPeer(peersView, ps.dataDir, ps.dataDir, ps.clientPort, electionAlg,
-                ps.id, tickTime, initLimit, syncLimit);
+                ps.id, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
         if (localSessionEnabled) {
             ps.peer.enableLocalSessions(true);
         }
@@ -221,7 +224,7 @@ public class QuorumUtil {
         PeerStruct ps = getPeer(id);
         LOG.info("Creating QuorumPeer " + ps.id + "; public port " + ps.clientPort);
         ps.peer = new QuorumPeer(peersView, ps.dataDir, ps.dataDir, ps.clientPort, electionAlg,
-                ps.id, tickTime, initLimit, syncLimit);
+                ps.id, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
         if (localSessionEnabled) {
             ps.peer.enableLocalSessions(true);
         }

+ 4 - 3
zookeeper-server/src/test/java/org/apache/zookeeper/test/TruncateTest.java

@@ -185,6 +185,7 @@ public class TruncateTest extends ZKTestCase {
         int tickTime = 2000;
         int initLimit = 3;
         int syncLimit = 3;
+				int connectToLearnerMasterLimit = 3;
 
         int port1 = PortAssignment.unique();
         int port2 = PortAssignment.unique();
@@ -205,9 +206,9 @@ public class TruncateTest extends ZKTestCase {
                        new InetSocketAddress("127.0.0.1", PortAssignment.unique()),
                        new InetSocketAddress("127.0.0.1", port3)));
 
-        QuorumPeer s2 = new QuorumPeer(peers, dataDir2, dataDir2, port2, 3, 2, tickTime, initLimit, syncLimit);
+        QuorumPeer s2 = new QuorumPeer(peers, dataDir2, dataDir2, port2, 3, 2, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
         s2.start();
-        QuorumPeer s3 = new QuorumPeer(peers, dataDir3, dataDir3, port3, 3, 3, tickTime, initLimit, syncLimit);
+        QuorumPeer s3 = new QuorumPeer(peers, dataDir3, dataDir3, port3, 3, 3, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
         s3.start();
         zk = ClientBase.createZKClient("127.0.0.1:" + port2, 15000);
 
@@ -223,7 +224,7 @@ public class TruncateTest extends ZKTestCase {
         } catch(KeeperException.NoNodeException e) {
             // this is what we want
         }
-        QuorumPeer s1 = new QuorumPeer(peers, dataDir1, dataDir1, port1, 3, 1, tickTime, initLimit, syncLimit);
+        QuorumPeer s1 = new QuorumPeer(peers, dataDir1, dataDir1, port1, 3, 1, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
         s1.start();
         ZooKeeper zk1 = ClientBase.createZKClient("127.0.0.1:" + port1, 15000);
         zk1.getData("/9", false, new Stat());