|
@@ -18,6 +18,11 @@
|
|
|
|
|
|
package org.apache.zookeeper.server.quorum;
|
|
|
|
|
|
+import static org.apache.zookeeper.server.quorum.ZabUtils.createQuorumPeer;
|
|
|
+import static org.apache.zookeeper.server.quorum.ZabUtils.createMockLeader;
|
|
|
+import static org.apache.zookeeper.server.quorum.ZabUtils.MockLeader;
|
|
|
+import static org.apache.zookeeper.server.quorum.ZabUtils.createLeader;
|
|
|
+
|
|
|
import static org.junit.Assert.assertEquals;
|
|
|
import static org.mockito.Mockito.never;
|
|
|
import static org.mockito.Mockito.spy;
|
|
@@ -27,8 +32,6 @@ import java.io.BufferedInputStream;
|
|
|
import java.io.BufferedReader;
|
|
|
import java.io.ByteArrayOutputStream;
|
|
|
import java.io.File;
|
|
|
-import java.io.FileNotFoundException;
|
|
|
-import java.io.FileOutputStream;
|
|
|
import java.io.FileReader;
|
|
|
import java.io.IOException;
|
|
|
import java.io.EOFException;
|
|
@@ -37,15 +40,12 @@ import java.net.InetSocketAddress;
|
|
|
import java.net.ServerSocket;
|
|
|
import java.net.Socket;
|
|
|
import java.nio.ByteBuffer;
|
|
|
-import java.util.HashMap;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
-import java.util.Map;
|
|
|
|
|
|
import org.apache.jute.BinaryInputArchive;
|
|
|
import org.apache.jute.BinaryOutputArchive;
|
|
|
import org.apache.jute.InputArchive;
|
|
|
import org.apache.jute.OutputArchive;
|
|
|
-import org.apache.zookeeper.PortAssignment;
|
|
|
import org.apache.zookeeper.WatchedEvent;
|
|
|
import org.apache.zookeeper.Watcher;
|
|
|
import org.apache.zookeeper.Watcher.Event.EventType;
|
|
@@ -55,13 +55,10 @@ import org.apache.zookeeper.server.ByteBufferInputStream;
|
|
|
import org.apache.zookeeper.server.ByteBufferOutputStream;
|
|
|
import org.apache.zookeeper.server.DataTree;
|
|
|
import org.apache.zookeeper.server.Request;
|
|
|
-import org.apache.zookeeper.server.ServerCnxn;
|
|
|
-import org.apache.zookeeper.server.ServerCnxnFactory;
|
|
|
import org.apache.zookeeper.server.ZKDatabase;
|
|
|
-import org.apache.zookeeper.server.ZooKeeperServer;
|
|
|
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
|
|
|
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
|
|
|
-import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
|
|
|
+
|
|
|
import org.apache.zookeeper.server.util.ZxidUtils;
|
|
|
import org.apache.zookeeper.test.ClientBase;
|
|
|
import org.apache.zookeeper.test.TestUtils;
|
|
@@ -78,7 +75,6 @@ import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
public class Zab1_0Test extends ZKTestCase {
|
|
|
- private static final int SYNC_LIMIT = 2;
|
|
|
|
|
|
private static final Logger LOG = LoggerFactory.getLogger(Zab1_0Test.class);
|
|
|
|
|
@@ -109,26 +105,6 @@ public class Zab1_0Test extends ZKTestCase {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
-
|
|
|
- private static final class MockLeader extends Leader {
|
|
|
-
|
|
|
- MockLeader(QuorumPeer qp, LeaderZooKeeperServer zk)
|
|
|
- throws IOException {
|
|
|
- super(qp, zk);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * This method returns the value of the variable that holds the epoch
|
|
|
- * to be proposed and that has been proposed, depending on the point
|
|
|
- * of the execution in which it is called.
|
|
|
- *
|
|
|
- * @return epoch
|
|
|
- */
|
|
|
- public long getCurrentEpochToPropose() {
|
|
|
- return epoch;
|
|
|
- }
|
|
|
- }
|
|
|
|
|
|
public static final class FollowerMockThread extends Thread {
|
|
|
private final Leader leader;
|
|
@@ -287,54 +263,6 @@ public class Zab1_0Test extends ZKTestCase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private static final class NullServerCnxnFactory extends ServerCnxnFactory {
|
|
|
- public void startup(ZooKeeperServer zkServer, boolean startServer)
|
|
|
- throws IOException, InterruptedException {
|
|
|
- }
|
|
|
- public void start() {
|
|
|
- }
|
|
|
- public void shutdown() {
|
|
|
- }
|
|
|
- public void setMaxClientCnxnsPerHost(int max) {
|
|
|
- }
|
|
|
- public void join() throws InterruptedException {
|
|
|
- }
|
|
|
- public int getMaxClientCnxnsPerHost() {
|
|
|
- return 0;
|
|
|
- }
|
|
|
- public int getLocalPort() {
|
|
|
- return 0;
|
|
|
- }
|
|
|
- public InetSocketAddress getLocalAddress() {
|
|
|
- return null;
|
|
|
- }
|
|
|
- public Iterable<ServerCnxn> getConnections() {
|
|
|
- return null;
|
|
|
- }
|
|
|
- public void configure(InetSocketAddress addr, int maxcc, boolean secure)
|
|
|
- throws IOException {
|
|
|
- }
|
|
|
-
|
|
|
- public boolean closeSession(long sessionId) {
|
|
|
- return false;
|
|
|
- }
|
|
|
- public void closeAll() {
|
|
|
- }
|
|
|
- @Override
|
|
|
- public int getNumAliveConnections() {
|
|
|
- return 0;
|
|
|
- }
|
|
|
- @Override
|
|
|
- public void reconfigure(InetSocketAddress addr) {
|
|
|
- }
|
|
|
- @Override
|
|
|
- public void resetAllConnectionStats() {
|
|
|
- }
|
|
|
- @Override
|
|
|
- public Iterable<Map<String, Object>> getAllConnectionInfo(boolean brief) {
|
|
|
- return null;
|
|
|
- }
|
|
|
- }
|
|
|
static Socket[] getSocketPair() throws IOException {
|
|
|
ServerSocket ss =
|
|
|
new ServerSocket(0, 50, InetAddress.getByName("127.0.0.1"));
|
|
@@ -1006,7 +934,7 @@ public class Zab1_0Test extends ZKTestCase {
|
|
|
|
|
|
LOG.info("Proposal sent.");
|
|
|
|
|
|
- for (int i = 0; i < (2 * SYNC_LIMIT) + 2; i++) {
|
|
|
+ for (int i = 0; i < (2 * ZabUtils.SYNC_LIMIT) + 2; i++) {
|
|
|
try {
|
|
|
ia.readRecord(qp, null);
|
|
|
LOG.info("Ping received: " + i);
|
|
@@ -1252,27 +1180,6 @@ public class Zab1_0Test extends ZKTestCase {
|
|
|
});
|
|
|
}
|
|
|
|
|
|
- private Leader createLeader(File tmpDir, QuorumPeer peer)
|
|
|
- throws IOException, NoSuchFieldException, IllegalAccessException {
|
|
|
- LeaderZooKeeperServer zk = prepareLeader(tmpDir, peer);
|
|
|
- return new Leader(peer, zk);
|
|
|
- }
|
|
|
-
|
|
|
- private Leader createMockLeader(File tmpDir, QuorumPeer peer)
|
|
|
- throws IOException, NoSuchFieldException, IllegalAccessException {
|
|
|
- LeaderZooKeeperServer zk = prepareLeader(tmpDir, peer);
|
|
|
- return new MockLeader(peer, zk);
|
|
|
- }
|
|
|
-
|
|
|
- private LeaderZooKeeperServer prepareLeader(File tmpDir, QuorumPeer peer)
|
|
|
- throws IOException, NoSuchFieldException, IllegalAccessException {
|
|
|
- FileTxnSnapLog logFactory = new FileTxnSnapLog(tmpDir, tmpDir);
|
|
|
- peer.setTxnFactory(logFactory);
|
|
|
- ZKDatabase zkDb = new ZKDatabase(logFactory);
|
|
|
- LeaderZooKeeperServer zk = new LeaderZooKeeperServer(logFactory, peer, zkDb);
|
|
|
- return zk;
|
|
|
- }
|
|
|
-
|
|
|
static class ConversableFollower extends Follower {
|
|
|
|
|
|
ConversableFollower(QuorumPeer self, FollowerZooKeeperServer zk) {
|
|
@@ -1326,41 +1233,6 @@ public class Zab1_0Test extends ZKTestCase {
|
|
|
return new ConversableObserver(peer, zk);
|
|
|
}
|
|
|
|
|
|
- private QuorumPeer createQuorumPeer(File tmpDir) throws IOException, FileNotFoundException {
|
|
|
- HashMap<Long, QuorumServer> peers = new HashMap<Long, QuorumServer>();
|
|
|
- QuorumPeer peer = QuorumPeer.testingQuorumPeer();
|
|
|
- peer.syncLimit = SYNC_LIMIT;
|
|
|
- peer.initLimit = 2;
|
|
|
- peer.tickTime = 2000;
|
|
|
-
|
|
|
- peers.put(0L, new QuorumServer(
|
|
|
- 0, new InetSocketAddress("127.0.0.1", PortAssignment.unique()),
|
|
|
- new InetSocketAddress("127.0.0.1", PortAssignment.unique()),
|
|
|
- new InetSocketAddress("127.0.0.1", PortAssignment.unique())));
|
|
|
- peers.put(1L, new QuorumServer(
|
|
|
- 1, new InetSocketAddress("127.0.0.1", PortAssignment.unique()),
|
|
|
- new InetSocketAddress("127.0.0.1", PortAssignment.unique()),
|
|
|
- new InetSocketAddress("127.0.0.1", PortAssignment.unique())));
|
|
|
- peers.put(2L, new QuorumServer(
|
|
|
- 2, new InetSocketAddress("127.0.0.1", PortAssignment.unique()),
|
|
|
- new InetSocketAddress("127.0.0.1", PortAssignment.unique()),
|
|
|
- new InetSocketAddress("127.0.0.1", PortAssignment.unique())));
|
|
|
-
|
|
|
- peer.setQuorumVerifier(new QuorumMaj(peers), false);
|
|
|
- peer.setCnxnFactory(new NullServerCnxnFactory());
|
|
|
- File version2 = new File(tmpDir, "version-2");
|
|
|
- version2.mkdir();
|
|
|
- ClientBase.createInitializeFile(tmpDir);
|
|
|
- FileOutputStream fos;
|
|
|
- fos = new FileOutputStream(new File(version2, "currentEpoch"));
|
|
|
- fos.write("0\n".getBytes());
|
|
|
- fos.close();
|
|
|
- fos = new FileOutputStream(new File(version2, "acceptedEpoch"));
|
|
|
- fos.write("0\n".getBytes());
|
|
|
- fos.close();
|
|
|
- return peer;
|
|
|
- }
|
|
|
-
|
|
|
private String readContentsOfFile(File f) throws IOException {
|
|
|
return new BufferedReader(new FileReader(f)).readLine();
|
|
|
}
|