|
@@ -18,13 +18,18 @@
|
|
|
|
|
|
package org.apache.zookeeper.test;
|
|
|
|
|
|
+import static java.net.InetAddress.getLoopbackAddress;
|
|
|
+
|
|
|
import java.io.IOException;
|
|
|
import java.net.InetAddress;
|
|
|
+import java.net.InetSocketAddress;
|
|
|
+import java.net.ServerSocket;
|
|
|
import java.net.UnknownHostException;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.LinkedList;
|
|
|
import java.util.List;
|
|
|
|
|
|
+import org.apache.zookeeper.AsyncCallback.DataCallback;
|
|
|
import org.apache.zookeeper.CreateMode;
|
|
|
import org.apache.zookeeper.KeeperException;
|
|
|
import org.apache.zookeeper.PortAssignment;
|
|
@@ -33,18 +38,16 @@ import org.apache.zookeeper.Watcher;
|
|
|
import org.apache.zookeeper.ZKTestCase;
|
|
|
import org.apache.zookeeper.ZooDefs;
|
|
|
import org.apache.zookeeper.ZooKeeper;
|
|
|
-import org.apache.zookeeper.AsyncCallback.DataCallback;
|
|
|
import org.apache.zookeeper.data.Stat;
|
|
|
import org.apache.zookeeper.jmx.CommonNames;
|
|
|
import org.apache.zookeeper.server.quorum.QuorumPeer;
|
|
|
-import org.apache.zookeeper.server.quorum.QuorumStats;
|
|
|
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
|
|
|
import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
|
|
|
import org.apache.zookeeper.server.quorum.flexible.QuorumHierarchical;
|
|
|
import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
|
|
|
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
|
|
|
-import org.junit.Assert;
|
|
|
import org.junit.After;
|
|
|
+import org.junit.Assert;
|
|
|
import org.junit.Test;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
@@ -650,6 +653,89 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{
|
|
|
closeAllHandles(zkArr);
|
|
|
}
|
|
|
|
|
|
+ @Test
|
|
|
+ public void testPortChangeToBlockedPortFollower() throws Exception {
|
|
|
+ testPortChangeToBlockedPort(false);
|
|
|
+ }
|
|
|
+ @Test
|
|
|
+ public void testPortChangeToBlockedPortLeader() throws Exception {
|
|
|
+ testPortChangeToBlockedPort(true);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void testPortChangeToBlockedPort(boolean testLeader) throws Exception {
|
|
|
+ qu = new QuorumUtil(1); // create 3 servers
|
|
|
+ qu.disableJMXTest = true;
|
|
|
+ qu.startAll();
|
|
|
+ ZooKeeper[] zkArr = createHandles(qu);
|
|
|
+
|
|
|
+ List<String> joiningServers = new ArrayList<String>();
|
|
|
+
|
|
|
+ int leaderIndex = getLeaderId(qu);
|
|
|
+ int followerIndex = leaderIndex == 1 ? 2 : 1;
|
|
|
+ int serverIndex = testLeader ? leaderIndex : followerIndex;
|
|
|
+ int reconfigIndex = testLeader ? followerIndex : leaderIndex;
|
|
|
+
|
|
|
+ // modify server's client port
|
|
|
+ int quorumPort = qu.getPeer(serverIndex).peer.getQuorumAddress().getPort();
|
|
|
+ int electionPort = qu.getPeer(serverIndex).peer.getElectionAddress().getPort();
|
|
|
+ int oldClientPort = qu.getPeer(serverIndex).peer.getClientPort();
|
|
|
+ int newClientPort = PortAssignment.unique();
|
|
|
+
|
|
|
+ try(ServerSocket ss = new ServerSocket()) {
|
|
|
+ ss.bind(new InetSocketAddress(getLoopbackAddress(), newClientPort));
|
|
|
+
|
|
|
+ joiningServers.add("server." + serverIndex + "=localhost:" + quorumPort
|
|
|
+ + ":" + electionPort + ":participant;localhost:" + newClientPort);
|
|
|
+
|
|
|
+ // create a /test znode and check that read/write works before
|
|
|
+ // any reconfig is invoked
|
|
|
+ testNormalOperation(zkArr[followerIndex], zkArr[leaderIndex]);
|
|
|
+
|
|
|
+ // Reconfigure
|
|
|
+ reconfig(zkArr[reconfigIndex], joiningServers, null, null, -1);
|
|
|
+ Thread.sleep(1000);
|
|
|
+
|
|
|
+ // The follower reconfiguration will have failed
|
|
|
+ zkArr[serverIndex].close();
|
|
|
+ zkArr[serverIndex] = new ZooKeeper("127.0.0.1:"
|
|
|
+ + newClientPort,
|
|
|
+ ClientBase.CONNECTION_TIMEOUT, new Watcher() {
|
|
|
+ public void process(WatchedEvent event) {}});
|
|
|
+
|
|
|
+ try {
|
|
|
+ Thread.sleep(1000);
|
|
|
+ zkArr[serverIndex].setData("/test", "teststr".getBytes(), -1);
|
|
|
+ Assert.fail("New client connected to new client port!");
|
|
|
+ } catch (KeeperException.ConnectionLossException e) {
|
|
|
+ // Exception is expected
|
|
|
+ }
|
|
|
+
|
|
|
+ //The old port should be clear at this stage
|
|
|
+
|
|
|
+ try (ServerSocket ss2 = new ServerSocket()) {
|
|
|
+ ss2.bind(new InetSocketAddress(getLoopbackAddress(), oldClientPort));
|
|
|
+ }
|
|
|
+
|
|
|
+ // Move back to the old port
|
|
|
+ joiningServers.clear();
|
|
|
+ joiningServers.add("server." + serverIndex + "=localhost:" + quorumPort
|
|
|
+ + ":" + electionPort + ":participant;localhost:" + oldClientPort);
|
|
|
+
|
|
|
+ reconfig(zkArr[reconfigIndex], joiningServers, null, null, -1);
|
|
|
+
|
|
|
+ zkArr[serverIndex].close();
|
|
|
+ zkArr[serverIndex] = new ZooKeeper("127.0.0.1:"
|
|
|
+ + oldClientPort,
|
|
|
+ ClientBase.CONNECTION_TIMEOUT, new Watcher() {
|
|
|
+ public void process(WatchedEvent event) {}});
|
|
|
+
|
|
|
+ testNormalOperation(zkArr[followerIndex], zkArr[leaderIndex]);
|
|
|
+ testServerHasConfig(zkArr[serverIndex], joiningServers, null);
|
|
|
+ Assert.assertEquals(oldClientPort, qu.getPeer(serverIndex).peer.getClientPort());
|
|
|
+ }
|
|
|
+ closeAllHandles(zkArr);
|
|
|
+ }
|
|
|
+
|
|
|
@Test
|
|
|
public void testUnspecifiedClientAddress() throws Exception {
|
|
|
int[] ports = new int[3];
|