|
@@ -27,6 +27,7 @@ import java.util.Collection;
|
|
|
import java.util.Set;
|
|
|
import java.util.concurrent.Semaphore;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.concurrent.TimeoutException;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
|
|
|
import org.apache.log4j.Logger;
|
|
@@ -43,11 +44,10 @@ import org.junit.Test;
|
|
|
|
|
|
|
|
|
public class FollowerResyncConcurrencyTest extends ZKTestCase {
|
|
|
- private volatile int counter = 0;
|
|
|
-
|
|
|
private static final Logger LOG = Logger.getLogger(FollowerResyncConcurrencyTest.class);
|
|
|
public static final long CONNECTION_TIMEOUT = ClientTest.CONNECTION_TIMEOUT;
|
|
|
|
|
|
+ private volatile int counter = 0;
|
|
|
|
|
|
/**
|
|
|
* See ZOOKEEPER-962. This tests for one of the bugs hit while fixing this,
|
|
@@ -63,8 +63,9 @@ public class FollowerResyncConcurrencyTest extends ZKTestCase {
|
|
|
* @throws KeeperException
|
|
|
*/
|
|
|
@Test
|
|
|
- public void testResyncBySnapThenDiffAfterFollowerCrashes ()
|
|
|
- throws IOException, InterruptedException, KeeperException, Throwable{
|
|
|
+ public void testResyncBySnapThenDiffAfterFollowerCrashes()
|
|
|
+ throws IOException, InterruptedException, KeeperException, Throwable
|
|
|
+ {
|
|
|
final Semaphore sem = new Semaphore(0);
|
|
|
|
|
|
QuorumUtil qu = new QuorumUtil(1);
|
|
@@ -74,32 +75,37 @@ public class FollowerResyncConcurrencyTest extends ZKTestCase {
|
|
|
CountdownWatcher watcher3 = new CountdownWatcher();
|
|
|
|
|
|
int index = 1;
|
|
|
- while(qu.getPeer(index).peer.leader == null)
|
|
|
+ while(qu.getPeer(index).peer.leader == null) {
|
|
|
index++;
|
|
|
+ }
|
|
|
|
|
|
Leader leader = qu.getPeer(index).peer.leader;
|
|
|
-
|
|
|
assertNotNull(leader);
|
|
|
- /*
|
|
|
- * Reusing the index variable to select a follower to connect to
|
|
|
- */
|
|
|
+
|
|
|
+ /* Reusing the index variable to select a follower to connect to */
|
|
|
index = (index == 1) ? 2 : 1;
|
|
|
+ LOG.info("Connecting to follower:" + index);
|
|
|
+
|
|
|
qu.shutdown(index);
|
|
|
- final ZooKeeper zk3 = new DisconnectableZooKeeper("127.0.0.1:" + qu.getPeer(3).peer.getClientPort(), 1000,watcher3);
|
|
|
- watcher3.waitForConnected(CONNECTION_TIMEOUT);
|
|
|
+
|
|
|
+ final ZooKeeper zk3 =
|
|
|
+ createClient(qu.getPeer(3).peer.getClientPort(), watcher3);
|
|
|
+ LOG.info("zk3 has session id 0x" + Long.toHexString(zk3.getSessionId()));
|
|
|
+
|
|
|
zk3.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
|
|
|
|
|
|
qu.restart(index);
|
|
|
- ZooKeeper zk = new DisconnectableZooKeeper("127.0.0.1:" + qu.getPeer(index).peer.getClientPort(), 1000, watcher1);
|
|
|
|
|
|
- ZooKeeper zk2 = new DisconnectableZooKeeper("127.0.0.1:" + qu.getPeer(index).peer.getClientPort(), 1000, watcher2);
|
|
|
+ final ZooKeeper zk1 =
|
|
|
+ createClient(qu.getPeer(index).peer.getClientPort(), watcher1);
|
|
|
+ LOG.info("zk1 has session id 0x" + Long.toHexString(zk1.getSessionId()));
|
|
|
|
|
|
- watcher1.waitForConnected(CONNECTION_TIMEOUT);
|
|
|
- watcher2.waitForConnected(CONNECTION_TIMEOUT);
|
|
|
-
|
|
|
- zk.create("/first", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
|
|
|
- Thread t = new Thread(new Runnable() {
|
|
|
+ final ZooKeeper zk2 =
|
|
|
+ createClient(qu.getPeer(index).peer.getClientPort(), watcher2);
|
|
|
+ LOG.info("zk2 has session id 0x" + Long.toHexString(zk2.getSessionId()));
|
|
|
|
|
|
+ zk1.create("/first", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
|
|
|
+ Thread mytestfooThread = new Thread(new Runnable() {
|
|
|
@Override
|
|
|
public void run() {
|
|
|
for(int i = 0; i < 1000; i++) {
|
|
@@ -111,8 +117,6 @@ public class FollowerResyncConcurrencyTest extends ZKTestCase {
|
|
|
if(counter == 14200){
|
|
|
sem.release();
|
|
|
}
|
|
|
-
|
|
|
-
|
|
|
}
|
|
|
}, null);
|
|
|
if(i%10==0){
|
|
@@ -127,7 +131,6 @@ public class FollowerResyncConcurrencyTest extends ZKTestCase {
|
|
|
}
|
|
|
});
|
|
|
|
|
|
-
|
|
|
for(int i = 0; i < 13000; i++) {
|
|
|
zk3.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() {
|
|
|
|
|
@@ -137,8 +140,6 @@ public class FollowerResyncConcurrencyTest extends ZKTestCase {
|
|
|
if(counter == 14200){
|
|
|
sem.release();
|
|
|
}
|
|
|
-
|
|
|
-
|
|
|
}
|
|
|
}, null);
|
|
|
|
|
@@ -151,7 +152,7 @@ public class FollowerResyncConcurrencyTest extends ZKTestCase {
|
|
|
qu.restart(index);
|
|
|
Thread.sleep(300);
|
|
|
qu.shutdown(index);
|
|
|
- t.start();
|
|
|
+ mytestfooThread.start();
|
|
|
Thread.sleep(300);
|
|
|
qu.restart(index);
|
|
|
LOG.info("Setting up server: " + index);
|
|
@@ -162,29 +163,34 @@ public class FollowerResyncConcurrencyTest extends ZKTestCase {
|
|
|
|
|
|
if(i%50 == 0) {
|
|
|
zk2.create("/newbaz", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() {
|
|
|
-
|
|
|
@Override
|
|
|
public void processResult(int rc, String path, Object ctx, String name) {
|
|
|
counter++;
|
|
|
if(counter == 14200){
|
|
|
sem.release();
|
|
|
}
|
|
|
-
|
|
|
-
|
|
|
}
|
|
|
}, null);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// Wait until all updates return
|
|
|
- if(!sem.tryAcquire(20000, TimeUnit.MILLISECONDS)) {
|
|
|
+ if(!sem.tryAcquire(ClientBase.CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS)) {
|
|
|
LOG.warn("Did not aquire semaphore fast enough");
|
|
|
}
|
|
|
- t.join(10000);
|
|
|
+ mytestfooThread.join(ClientBase.CONNECTION_TIMEOUT);
|
|
|
+ if (mytestfooThread.isAlive()) {
|
|
|
+ LOG.error("mytestfooThread is still alive");
|
|
|
+ }
|
|
|
Thread.sleep(1000);
|
|
|
|
|
|
- verifyState(qu, index, leader);
|
|
|
-
|
|
|
+ verifyState(qu, index, leader);
|
|
|
+
|
|
|
+ zk1.close();
|
|
|
+ zk2.close();
|
|
|
+ zk3.close();
|
|
|
+
|
|
|
+ qu.shutdownAll();
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -212,8 +218,9 @@ public class FollowerResyncConcurrencyTest extends ZKTestCase {
|
|
|
*/
|
|
|
|
|
|
@Test
|
|
|
- public void testResyncByDiffAfterFollowerCrashes ()
|
|
|
- throws IOException, InterruptedException, KeeperException, Throwable{
|
|
|
+ public void testResyncByDiffAfterFollowerCrashes()
|
|
|
+ throws IOException, InterruptedException, KeeperException, Throwable
|
|
|
+ {
|
|
|
final Semaphore sem = new Semaphore(0);
|
|
|
|
|
|
QuorumUtil qu = new QuorumUtil(1);
|
|
@@ -222,33 +229,35 @@ public class FollowerResyncConcurrencyTest extends ZKTestCase {
|
|
|
CountdownWatcher watcher2 = new CountdownWatcher();
|
|
|
CountdownWatcher watcher3 = new CountdownWatcher();
|
|
|
|
|
|
-
|
|
|
int index = 1;
|
|
|
- while(qu.getPeer(index).peer.leader == null)
|
|
|
+ while(qu.getPeer(index).peer.leader == null) {
|
|
|
index++;
|
|
|
+ }
|
|
|
|
|
|
Leader leader = qu.getPeer(index).peer.leader;
|
|
|
-
|
|
|
assertNotNull(leader);
|
|
|
|
|
|
- /*
|
|
|
- * Reusing the index variable to select a follower to connect to
|
|
|
- */
|
|
|
+ /* Reusing the index variable to select a follower to connect to */
|
|
|
index = (index == 1) ? 2 : 1;
|
|
|
+ LOG.info("Connecting to follower:" + index);
|
|
|
|
|
|
- ZooKeeper zk = new DisconnectableZooKeeper("127.0.0.1:" + qu.getPeer(index).peer.getClientPort(), 1000, watcher1);
|
|
|
+ final ZooKeeper zk1 =
|
|
|
+ createClient(qu.getPeer(index).peer.getClientPort(), watcher1);
|
|
|
+ LOG.info("zk1 has session id 0x" + Long.toHexString(zk1.getSessionId()));
|
|
|
|
|
|
- ZooKeeper zk2 = new DisconnectableZooKeeper("127.0.0.1:" + qu.getPeer(index).peer.getClientPort(), 1000,watcher2);
|
|
|
- final ZooKeeper zk3 = new DisconnectableZooKeeper("127.0.0.1:" + qu.getPeer(3).peer.getClientPort(), 1000, watcher3);
|
|
|
- watcher1.waitForConnected(CONNECTION_TIMEOUT);
|
|
|
- watcher2.waitForConnected(CONNECTION_TIMEOUT);
|
|
|
- watcher3.waitForConnected(CONNECTION_TIMEOUT);
|
|
|
- zk.create("/first", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
|
|
|
- zk2.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
|
|
|
+ final ZooKeeper zk2 =
|
|
|
+ createClient(qu.getPeer(index).peer.getClientPort(), watcher2);
|
|
|
+ LOG.info("zk2 has session id 0x" + Long.toHexString(zk2.getSessionId()));
|
|
|
+
|
|
|
+ final ZooKeeper zk3 =
|
|
|
+ createClient(qu.getPeer(3).peer.getClientPort(), watcher3);
|
|
|
+ LOG.info("zk3 has session id 0x" + Long.toHexString(zk3.getSessionId()));
|
|
|
|
|
|
+ zk1.create("/first", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
|
|
|
+ zk2.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
|
|
|
|
|
|
final AtomicBoolean runNow = new AtomicBoolean(false);
|
|
|
- Thread t = new Thread(new Runnable() {
|
|
|
+ Thread mytestfooThread = new Thread(new Runnable() {
|
|
|
|
|
|
@Override
|
|
|
public void run() {
|
|
@@ -263,8 +272,6 @@ public class FollowerResyncConcurrencyTest extends ZKTestCase {
|
|
|
if(counter > 7300){
|
|
|
sem.release();
|
|
|
}
|
|
|
-
|
|
|
-
|
|
|
}
|
|
|
}, null);
|
|
|
|
|
@@ -273,8 +280,7 @@ public class FollowerResyncConcurrencyTest extends ZKTestCase {
|
|
|
} catch (Exception e) {
|
|
|
}
|
|
|
inSyncCounter++;
|
|
|
- }
|
|
|
- else {
|
|
|
+ } else {
|
|
|
Thread.yield();
|
|
|
}
|
|
|
}
|
|
@@ -282,7 +288,7 @@ public class FollowerResyncConcurrencyTest extends ZKTestCase {
|
|
|
}
|
|
|
});
|
|
|
|
|
|
- t.start();
|
|
|
+ mytestfooThread.start();
|
|
|
for(int i = 0; i < 5000; i++) {
|
|
|
zk2.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() {
|
|
|
|
|
@@ -292,8 +298,6 @@ public class FollowerResyncConcurrencyTest extends ZKTestCase {
|
|
|
if(counter > 7300){
|
|
|
sem.release();
|
|
|
}
|
|
|
-
|
|
|
-
|
|
|
}
|
|
|
}, null);
|
|
|
|
|
@@ -301,7 +305,6 @@ public class FollowerResyncConcurrencyTest extends ZKTestCase {
|
|
|
qu.shutdown(index);
|
|
|
Thread.sleep(1100);
|
|
|
LOG.info("Shutting down s1");
|
|
|
-
|
|
|
}
|
|
|
if(i == 1100 || i == 1150 || i == 1200) {
|
|
|
Thread.sleep(1000);
|
|
@@ -314,7 +317,6 @@ public class FollowerResyncConcurrencyTest extends ZKTestCase {
|
|
|
LOG.info("Setting up server: " + index);
|
|
|
}
|
|
|
|
|
|
-
|
|
|
if(i>=1000 && i%2== 0) {
|
|
|
zk3.create("/newbaz", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() {
|
|
|
|
|
@@ -324,8 +326,6 @@ public class FollowerResyncConcurrencyTest extends ZKTestCase {
|
|
|
if(counter > 7300){
|
|
|
sem.release();
|
|
|
}
|
|
|
-
|
|
|
-
|
|
|
}
|
|
|
}, null);
|
|
|
}
|
|
@@ -335,15 +335,35 @@ public class FollowerResyncConcurrencyTest extends ZKTestCase {
|
|
|
}
|
|
|
|
|
|
// Wait until all updates return
|
|
|
- if(!sem.tryAcquire(15000, TimeUnit.MILLISECONDS)) {
|
|
|
+ if(!sem.tryAcquire(ClientBase.CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS)) {
|
|
|
LOG.warn("Did not aquire semaphore fast enough");
|
|
|
}
|
|
|
- t.join(10000);
|
|
|
+ mytestfooThread.join(ClientBase.CONNECTION_TIMEOUT);
|
|
|
+ if (mytestfooThread.isAlive()) {
|
|
|
+ LOG.error("mytestfooThread is still alive");
|
|
|
+ }
|
|
|
+
|
|
|
Thread.sleep(1000);
|
|
|
// Verify that server is following and has the same epoch as the leader
|
|
|
|
|
|
verifyState(qu, index, leader);
|
|
|
|
|
|
+ zk1.close();
|
|
|
+ zk2.close();
|
|
|
+ zk3.close();
|
|
|
+
|
|
|
+ qu.shutdownAll();
|
|
|
+ }
|
|
|
+
|
|
|
+ private static DisconnectableZooKeeper createClient(int port,
|
|
|
+ CountdownWatcher watcher)
|
|
|
+ throws IOException, TimeoutException, InterruptedException
|
|
|
+ {
|
|
|
+ DisconnectableZooKeeper zk = new DisconnectableZooKeeper(
|
|
|
+ "127.0.0.1:" + port, ClientBase.CONNECTION_TIMEOUT, watcher);
|
|
|
+
|
|
|
+ watcher.waitForConnected(CONNECTION_TIMEOUT);
|
|
|
+ return zk;
|
|
|
}
|
|
|
|
|
|
private void verifyState(QuorumUtil qu, int index, Leader leader) {
|