|
@@ -30,6 +30,7 @@ import java.util.ArrayList;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
import javax.security.sasl.SaslException;
|
|
|
import org.apache.jute.BinaryOutputArchive;
|
|
|
+import org.apache.zookeeper.AsyncCallback.StringCallback;
|
|
|
import org.apache.zookeeper.CreateMode;
|
|
|
import org.apache.zookeeper.KeeperException;
|
|
|
import org.apache.zookeeper.PortAssignment;
|
|
@@ -195,22 +196,114 @@ public class SessionUpgradeQuorumTest extends QuorumPeerTestBase {
|
|
|
zk.close();
|
|
|
}
|
|
|
|
|
|
+ @Test
|
|
|
+ public void testCloseSessionWhileUpgradeOnLeader()
|
|
|
+ throws IOException, KeeperException, InterruptedException {
|
|
|
+ int leaderId = -1;
|
|
|
+ for (int i = SERVER_COUNT - 1; i >= 0; i--) {
|
|
|
+ if (mt[i].main.quorumPeer.leader != null) {
|
|
|
+ leaderId = i;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (leaderId > 0) {
|
|
|
+ makeSureEphemeralIsGone(leaderId);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testCloseSessionWhileUpgradeOnLearner()
|
|
|
+ throws IOException, KeeperException, InterruptedException {
|
|
|
+ int learnerId = -1;
|
|
|
+ for (int i = SERVER_COUNT - 1; i >= 0; i--) {
|
|
|
+ if (mt[i].main.quorumPeer.follower != null) {
|
|
|
+ learnerId = i;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (learnerId > 0) {
|
|
|
+ makeSureEphemeralIsGone(learnerId);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void makeSureEphemeralIsGone(int sid)
|
|
|
+ throws IOException, KeeperException, InterruptedException {
|
|
|
+ // Delay submit request to simulate the request queued in
|
|
|
+ // RequestThrottler
|
|
|
+ qpMain[sid].setSubmitDelayMs(200);
|
|
|
+
|
|
|
+ // Create a client and an ephemeral node
|
|
|
+ ZooKeeper zk = new ZooKeeper("127.0.0.1:" + clientPorts[sid],
|
|
|
+ ClientBase.CONNECTION_TIMEOUT, this);
|
|
|
+ waitForOne(zk, States.CONNECTED);
|
|
|
+
|
|
|
+ final String node = "/node-1";
|
|
|
+ zk.create(node, new byte[2], ZooDefs.Ids.OPEN_ACL_UNSAFE,
|
|
|
+ CreateMode.EPHEMERAL, new StringCallback() {
|
|
|
+ @Override
|
|
|
+ public void processResult(int rc, String path, Object ctx,
|
|
|
+ String name) {}
|
|
|
+ }, null);
|
|
|
+
|
|
|
+ // close the client
|
|
|
+ zk.close();
|
|
|
+
|
|
|
+ // make sure the ephemeral is gone
|
|
|
+ zk = new ZooKeeper("127.0.0.1:" + clientPorts[sid],
|
|
|
+ ClientBase.CONNECTION_TIMEOUT, this);
|
|
|
+ waitForOne(zk, States.CONNECTED);
|
|
|
+ assertNull(zk.exists(node, false));
|
|
|
+ zk.close();
|
|
|
+ }
|
|
|
+
|
|
|
private static class TestQPMainDropSessionUpgrading extends TestQPMain {
|
|
|
|
|
|
private volatile boolean shouldDrop = false;
|
|
|
+ private volatile int submitDelayMs = 0;
|
|
|
|
|
|
public void setDropCreateSession(boolean dropCreateSession) {
|
|
|
shouldDrop = dropCreateSession;
|
|
|
}
|
|
|
|
|
|
+ public void setSubmitDelayMs(int delay) {
|
|
|
+ this.submitDelayMs = delay;
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
protected QuorumPeer getQuorumPeer() throws SaslException {
|
|
|
return new QuorumPeer() {
|
|
|
|
|
|
+ @Override
|
|
|
+ protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException {
|
|
|
+ return new Leader(this, new LeaderZooKeeperServer(
|
|
|
+ logFactory, this, this.getZkDb()) {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void submitRequestNow(Request si) {
|
|
|
+ if (submitDelayMs > 0) {
|
|
|
+ try {
|
|
|
+ Thread.sleep(submitDelayMs);
|
|
|
+ } catch (Exception e) {}
|
|
|
+ }
|
|
|
+ super.submitRequestNow(si);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
|
|
|
|
|
|
- return new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.getZkDb())) {
|
|
|
+ return new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.getZkDb()) {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void submitRequestNow(Request si) {
|
|
|
+ if (submitDelayMs > 0) {
|
|
|
+ try {
|
|
|
+ Thread.sleep(submitDelayMs);
|
|
|
+ } catch (Exception e) {}
|
|
|
+ }
|
|
|
+ super.submitRequestNow(si);
|
|
|
+ }
|
|
|
+
|
|
|
+ }) {
|
|
|
|
|
|
@Override
|
|
|
protected void request(Request request) throws IOException {
|