|
@@ -571,65 +571,62 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{
|
|
|
int leaderIndex = getLeaderId(qu);
|
|
|
int followerIndex = leaderIndex == 1 ? 2 : 1;
|
|
|
|
|
|
- // change leader into observer, and modify all its ports at the same
|
|
|
- // time
|
|
|
- int observerIndex = leaderIndex;
|
|
|
+ // modify follower's client port
|
|
|
|
|
|
- // new ports
|
|
|
- int port1 = PortAssignment.unique();
|
|
|
- int port2 = PortAssignment.unique();
|
|
|
- int port3 = PortAssignment.unique();
|
|
|
- joiningServers.add("server." + observerIndex + "=localhost:" + port1
|
|
|
- + ":" + port2 + ":observer;localhost:" + port3);
|
|
|
+ int quorumPort = qu.getPeer(followerIndex).peer.getQuorumAddress().getPort();
|
|
|
+ int electionPort = qu.getPeer(followerIndex).peer.getElectionAddress().getPort();
|
|
|
+ int oldClientPort = qu.getPeer(followerIndex).peer.getClientPort();
|
|
|
+ int newClientPort = PortAssignment.unique();
|
|
|
+ joiningServers.add("server." + followerIndex + "=localhost:" + quorumPort
|
|
|
+ + ":" + electionPort + ":participant;localhost:" + newClientPort);
|
|
|
|
|
|
// create a /test znode and check that read/write works before
|
|
|
// any reconfig is invoked
|
|
|
- testNormalOperation(zkArr[observerIndex], zkArr[followerIndex]);
|
|
|
+ testNormalOperation(zkArr[followerIndex], zkArr[leaderIndex]);
|
|
|
|
|
|
reconfig(zkArr[followerIndex], joiningServers, null, null, -1);
|
|
|
|
|
|
- // the change of port may not be immediate -- we repeatedly
|
|
|
- // invoke an operation expecting it to eventually fail once
|
|
|
- // the client port of observerIndex changes. If it didn't
|
|
|
- // change -- that's an error.
|
|
|
try {
|
|
|
- for (int i=0; i < 30; i++) {
|
|
|
+ for (int i=0; i < 20; i++) {
|
|
|
Thread.sleep(1000);
|
|
|
- zkArr[observerIndex].setData("/test", "teststr".getBytes(), -1);
|
|
|
+ zkArr[followerIndex].setData("/test", "teststr".getBytes(), -1);
|
|
|
}
|
|
|
- Assert.fail("client port didn't change");
|
|
|
} catch (KeeperException.ConnectionLossException e) {
|
|
|
- zkArr[observerIndex] = new ZooKeeper("127.0.0.1:"
|
|
|
- + qu.getPeer(observerIndex).peer.getClientPort(),
|
|
|
- ClientBase.CONNECTION_TIMEOUT, new Watcher() {
|
|
|
- public void process(WatchedEvent event) {}});
|
|
|
+ Assert.fail("Existing client disconnected when client port changed!");
|
|
|
}
|
|
|
|
|
|
- leaderIndex = getLeaderId(qu);
|
|
|
-
|
|
|
- followerIndex = 1;
|
|
|
- while (followerIndex == leaderIndex || followerIndex == observerIndex)
|
|
|
- followerIndex++;
|
|
|
-
|
|
|
- testNormalOperation(zkArr[observerIndex], zkArr[followerIndex]);
|
|
|
+ zkArr[followerIndex].close();
|
|
|
+ zkArr[followerIndex] = new ZooKeeper("127.0.0.1:"
|
|
|
+ + oldClientPort,
|
|
|
+ ClientBase.CONNECTION_TIMEOUT, new Watcher() {
|
|
|
+ public void process(WatchedEvent event) {}});
|
|
|
+ for (int i = 0; i < 10; i++) {
|
|
|
+ try {
|
|
|
+ Thread.sleep(1000);
|
|
|
+ zkArr[followerIndex].setData("/test", "teststr".getBytes(), -1);
|
|
|
+ Assert.fail("New client connected to old client port!");
|
|
|
+ } catch (KeeperException.ConnectionLossException e) {
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- testServerHasConfig(zkArr[observerIndex], joiningServers, null);
|
|
|
+ zkArr[followerIndex].close();
|
|
|
+ zkArr[followerIndex] = new ZooKeeper("127.0.0.1:"
|
|
|
+ + newClientPort,
|
|
|
+ ClientBase.CONNECTION_TIMEOUT, new Watcher() {
|
|
|
+ public void process(WatchedEvent event) {}});
|
|
|
|
|
|
- Assert.assertTrue(qu.getPeer(observerIndex).peer.getQuorumAddress()
|
|
|
- .getPort() == port1);
|
|
|
- Assert.assertTrue(qu.getPeer(observerIndex).peer.getElectionAddress()
|
|
|
- .getPort() == port2);
|
|
|
- Assert.assertTrue(qu.getPeer(observerIndex).peer.getClientPort() == port3);
|
|
|
- Assert.assertTrue(qu.getPeer(observerIndex).peer.getPeerState() == ServerState.OBSERVING);
|
|
|
- Assert.assertTrue(qu.getPeer(observerIndex).peer.getName()
|
|
|
- .endsWith(String.format(":%d", port3)));
|
|
|
+ testNormalOperation(zkArr[followerIndex], zkArr[leaderIndex]);
|
|
|
+ testServerHasConfig(zkArr[followerIndex], joiningServers, null);
|
|
|
+ Assert.assertTrue(qu.getPeer(followerIndex).peer.getName()
|
|
|
+ .endsWith(String.format(":%d", newClientPort)));
|
|
|
|
|
|
joiningServers.clear();
|
|
|
|
|
|
// change leader's leading port - should renounce leadership
|
|
|
|
|
|
- port1 = PortAssignment.unique();
|
|
|
- joiningServers.add("server." + leaderIndex + "=localhost:" + port1
|
|
|
+ int newQuorumPort = PortAssignment.unique();
|
|
|
+ joiningServers.add("server." + leaderIndex + "=localhost:"
|
|
|
+ + newQuorumPort
|
|
|
+ ":"
|
|
|
+ qu.getPeer(leaderIndex).peer.getElectionAddress().getPort()
|
|
|
+ ":participant;localhost:"
|
|
@@ -637,18 +634,15 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{
|
|
|
|
|
|
reconfig(zkArr[followerIndex], joiningServers, null, null, -1);
|
|
|
|
|
|
- testNormalOperation(zkArr[observerIndex], zkArr[followerIndex]);
|
|
|
+ testNormalOperation(zkArr[followerIndex], zkArr[leaderIndex]);
|
|
|
|
|
|
Assert.assertTrue(qu.getPeer(leaderIndex).peer.getQuorumAddress()
|
|
|
- .getPort() == port1);
|
|
|
- Assert.assertTrue(qu.getPeer(leaderIndex).peer.leader == null
|
|
|
- && qu.getPeer(leaderIndex).peer.follower != null);
|
|
|
- Assert.assertTrue(qu.getPeer(followerIndex).peer.leader != null
|
|
|
- && qu.getPeer(followerIndex).peer.follower == null);
|
|
|
+ .getPort() == newQuorumPort);
|
|
|
+ Assert.assertTrue(getLeaderId(qu) != leaderIndex); // the leader changed
|
|
|
|
|
|
joiningServers.clear();
|
|
|
|
|
|
- // change in leader election port
|
|
|
+ // change everyone's leader election port
|
|
|
|
|
|
for (int i = 1; i <= 3; i++) {
|
|
|
joiningServers.add("server." + i + "=localhost:"
|