|
@@ -18,13 +18,17 @@
|
|
|
|
|
|
package org.apache.zookeeper.test;
|
|
|
|
|
|
+import static org.junit.Assert.assertArrayEquals;
|
|
|
import static org.junit.Assert.assertEquals;
|
|
|
import static org.junit.Assert.assertNotNull;
|
|
|
+import static org.junit.Assert.assertNull;
|
|
|
import static org.junit.Assert.assertTrue;
|
|
|
+import static org.junit.Assert.fail;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
import java.util.Collection;
|
|
|
import java.util.Set;
|
|
|
+import java.util.concurrent.LinkedBlockingQueue;
|
|
|
import java.util.concurrent.Semaphore;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
import java.util.concurrent.TimeoutException;
|
|
@@ -34,8 +38,11 @@ import org.apache.log4j.Logger;
|
|
|
import org.apache.zookeeper.AsyncCallback;
|
|
|
import org.apache.zookeeper.CreateMode;
|
|
|
import org.apache.zookeeper.KeeperException;
|
|
|
+import org.apache.zookeeper.TestableZooKeeper;
|
|
|
+import org.apache.zookeeper.WatchedEvent;
|
|
|
import org.apache.zookeeper.ZKTestCase;
|
|
|
import org.apache.zookeeper.ZooDefs;
|
|
|
+import org.apache.zookeeper.ZooDefs.Ids;
|
|
|
import org.apache.zookeeper.ZooKeeper;
|
|
|
import org.apache.zookeeper.server.ZKDatabase;
|
|
|
import org.apache.zookeeper.server.quorum.Leader;
|
|
@@ -444,6 +451,24 @@ public class FollowerResyncConcurrencyTest extends ZKTestCase {
|
|
|
return zk;
|
|
|
}
|
|
|
|
|
|
+ private static TestableZooKeeper createTestableClient(String hp)
|
|
|
+ throws IOException, TimeoutException, InterruptedException
|
|
|
+ {
|
|
|
+ CountdownWatcher watcher = new CountdownWatcher();
|
|
|
+ return createTestableClient(watcher, hp);
|
|
|
+ }
|
|
|
+
|
|
|
+ private static TestableZooKeeper createTestableClient(
|
|
|
+ CountdownWatcher watcher, String hp)
|
|
|
+ throws IOException, TimeoutException, InterruptedException
|
|
|
+ {
|
|
|
+ TestableZooKeeper zk = new TestableZooKeeper(
|
|
|
+ hp, ClientBase.CONNECTION_TIMEOUT, watcher);
|
|
|
+
|
|
|
+ watcher.waitForConnected(CONNECTION_TIMEOUT);
|
|
|
+ return zk;
|
|
|
+ }
|
|
|
+
|
|
|
private void verifyState(QuorumUtil qu, int index, Leader leader) {
|
|
|
assertTrue("Not following", qu.getPeer(index).peer.follower != null);
|
|
|
long epochF = (qu.getPeer(index).peer.getActiveServer().getZxid() >> 32L);
|
|
@@ -479,5 +504,86 @@ public class FollowerResyncConcurrencyTest extends ZKTestCase {
|
|
|
assertEquals("Should have same number of ephemerals in both followers", ephemerals.size(), cleanEphemerals.size());
|
|
|
assertEquals("Leader should equal follower", lead.getEphemerals(l).size(), cleanEphemerals.size());
|
|
|
}
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Verify that the server is sending the proper zxid. See ZOOKEEPER-1412.
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testFollowerSendsLastZxid() throws Exception {
|
|
|
+ QuorumUtil qu = new QuorumUtil(1);
|
|
|
+ qu.startAll();
|
|
|
+
|
|
|
+ int index = 1;
|
|
|
+ while(qu.getPeer(index).peer.follower == null) {
|
|
|
+ index++;
|
|
|
+ }
|
|
|
+ LOG.info("Connecting to follower:" + index);
|
|
|
+
|
|
|
+ TestableZooKeeper zk =
|
|
|
+ createTestableClient("localhost:" + qu.getPeer(index).peer.getClientPort());
|
|
|
+
|
|
|
+ assertEquals(0L, zk.testableLastZxid());
|
|
|
+ zk.exists("/", false);
|
|
|
+ long lzxid = zk.testableLastZxid();
|
|
|
+ assertTrue("lzxid:" + lzxid + " > 0", lzxid > 0);
|
|
|
+ zk.close();
|
|
|
+ }
|
|
|
+
|
|
|
+ private class MyWatcher extends CountdownWatcher {
|
|
|
+ LinkedBlockingQueue<WatchedEvent> events =
|
|
|
+ new LinkedBlockingQueue<WatchedEvent>();
|
|
|
+
|
|
|
+ public void process(WatchedEvent event) {
|
|
|
+ super.process(event);
|
|
|
+ if (event.getType() != Event.EventType.None) {
|
|
|
+ try {
|
|
|
+ events.put(event);
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ LOG.warn("ignoring interrupt during event.put");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Verify that the server is sending the proper zxid, and as a result
|
|
|
+ * the watch doesn't fire. See ZOOKEEPER-1412.
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testFollowerWatcherResync() throws Exception {
|
|
|
+ QuorumUtil qu = new QuorumUtil(1);
|
|
|
+ qu.startAll();
|
|
|
+
|
|
|
+ int index = 1;
|
|
|
+ while(qu.getPeer(index).peer.follower == null) {
|
|
|
+ index++;
|
|
|
+ }
|
|
|
+ LOG.info("Connecting to follower:" + index);
|
|
|
+
|
|
|
+ TestableZooKeeper zk1 = createTestableClient(
|
|
|
+ "localhost:" + qu.getPeer(index).peer.getClientPort());
|
|
|
+ zk1.create("/foo", "foo".getBytes(), Ids.OPEN_ACL_UNSAFE,
|
|
|
+ CreateMode.PERSISTENT);
|
|
|
+
|
|
|
+ MyWatcher watcher = new MyWatcher();
|
|
|
+ TestableZooKeeper zk2 = createTestableClient(watcher,
|
|
|
+ "localhost:" + qu.getPeer(index).peer.getClientPort());
|
|
|
+
|
|
|
+ zk2.exists("/foo", true);
|
|
|
+
|
|
|
+ watcher.reset();
|
|
|
+ zk2.testableConnloss();
|
|
|
+ if (!watcher.clientConnected.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS))
|
|
|
+ {
|
|
|
+ fail("Unable to connect to server");
|
|
|
+ }
|
|
|
+ assertArrayEquals("foo".getBytes(), zk2.getData("/foo", false, null));
|
|
|
+
|
|
|
+ assertNull(watcher.events.poll(5, TimeUnit.SECONDS));
|
|
|
+
|
|
|
+ zk1.close();
|
|
|
+ zk2.close();
|
|
|
}
|
|
|
+
|
|
|
}
|