|
@@ -33,7 +33,6 @@ import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.TimeoutException;
|
|
import java.util.concurrent.TimeoutException;
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
-import org.apache.zookeeper.AsyncCallback;
|
|
|
|
import org.apache.zookeeper.CreateMode;
|
|
import org.apache.zookeeper.CreateMode;
|
|
import org.apache.zookeeper.KeeperException;
|
|
import org.apache.zookeeper.KeeperException;
|
|
import org.apache.zookeeper.TestableZooKeeper;
|
|
import org.apache.zookeeper.TestableZooKeeper;
|
|
@@ -242,18 +241,14 @@ public class FollowerResyncConcurrencyTest extends ZKTestCase {
|
|
public void run() {
|
|
public void run() {
|
|
for (int i = 0; i < 3000; i++) {
|
|
for (int i = 0; i < 3000; i++) {
|
|
// Here we create 3000 znodes
|
|
// Here we create 3000 znodes
|
|
- zk3.create("/mytestfoo", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() {
|
|
|
|
-
|
|
|
|
- @Override
|
|
|
|
- public void processResult(int rc, String path, Object ctx, String name) {
|
|
|
|
- pending.decrementAndGet();
|
|
|
|
- counter.incrementAndGet();
|
|
|
|
- if (rc != 0) {
|
|
|
|
- errors.incrementAndGet();
|
|
|
|
- }
|
|
|
|
- if (counter.get() == 16200) {
|
|
|
|
- sem.release();
|
|
|
|
- }
|
|
|
|
|
|
+ zk3.create("/mytestfoo", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, (rc, path, ctx, name) -> {
|
|
|
|
+ pending.decrementAndGet();
|
|
|
|
+ counter.incrementAndGet();
|
|
|
|
+ if (rc != 0) {
|
|
|
|
+ errors.incrementAndGet();
|
|
|
|
+ }
|
|
|
|
+ if (counter.get() == 16200) {
|
|
|
|
+ sem.release();
|
|
}
|
|
}
|
|
}, null);
|
|
}, null);
|
|
pending.incrementAndGet();
|
|
pending.incrementAndGet();
|
|
@@ -273,18 +268,14 @@ public class FollowerResyncConcurrencyTest extends ZKTestCase {
|
|
// initial data is written.
|
|
// initial data is written.
|
|
for (int i = 0; i < 13000; i++) {
|
|
for (int i = 0; i < 13000; i++) {
|
|
// Here we create 13000 znodes
|
|
// Here we create 13000 znodes
|
|
- zk3.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() {
|
|
|
|
-
|
|
|
|
- @Override
|
|
|
|
- public void processResult(int rc, String path, Object ctx, String name) {
|
|
|
|
- pending.decrementAndGet();
|
|
|
|
- counter.incrementAndGet();
|
|
|
|
- if (rc != 0) {
|
|
|
|
- errors.incrementAndGet();
|
|
|
|
- }
|
|
|
|
- if (counter.get() == 16200) {
|
|
|
|
- sem.release();
|
|
|
|
- }
|
|
|
|
|
|
+ zk3.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, (rc, path, ctx, name) -> {
|
|
|
|
+ pending.decrementAndGet();
|
|
|
|
+ counter.incrementAndGet();
|
|
|
|
+ if (rc != 0) {
|
|
|
|
+ errors.incrementAndGet();
|
|
|
|
+ }
|
|
|
|
+ if (counter.get() == 16200) {
|
|
|
|
+ sem.release();
|
|
}
|
|
}
|
|
}, null);
|
|
}, null);
|
|
pending.incrementAndGet();
|
|
pending.incrementAndGet();
|
|
@@ -314,17 +305,14 @@ public class FollowerResyncConcurrencyTest extends ZKTestCase {
|
|
}
|
|
}
|
|
|
|
|
|
if (i % 50 == 0) {
|
|
if (i % 50 == 0) {
|
|
- zk2.create("/newbaz", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() {
|
|
|
|
- @Override
|
|
|
|
- public void processResult(int rc, String path, Object ctx, String name) {
|
|
|
|
- pending.decrementAndGet();
|
|
|
|
- counter.incrementAndGet();
|
|
|
|
- if (rc != 0) {
|
|
|
|
- errors.incrementAndGet();
|
|
|
|
- }
|
|
|
|
- if (counter.get() == 16200) {
|
|
|
|
- sem.release();
|
|
|
|
- }
|
|
|
|
|
|
+ zk2.create("/newbaz", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, (rc, path, ctx, name) -> {
|
|
|
|
+ pending.decrementAndGet();
|
|
|
|
+ counter.incrementAndGet();
|
|
|
|
+ if (rc != 0) {
|
|
|
|
+ errors.incrementAndGet();
|
|
|
|
+ }
|
|
|
|
+ if (counter.get() == 16200) {
|
|
|
|
+ sem.release();
|
|
}
|
|
}
|
|
}, null);
|
|
}, null);
|
|
pending.incrementAndGet();
|
|
pending.incrementAndGet();
|
|
@@ -417,18 +405,14 @@ public class FollowerResyncConcurrencyTest extends ZKTestCase {
|
|
int inSyncCounter = 0;
|
|
int inSyncCounter = 0;
|
|
while (inSyncCounter < 400) {
|
|
while (inSyncCounter < 400) {
|
|
if (runNow.get()) {
|
|
if (runNow.get()) {
|
|
- zk3.create("/mytestfoo", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() {
|
|
|
|
-
|
|
|
|
- @Override
|
|
|
|
- public void processResult(int rc, String path, Object ctx, String name) {
|
|
|
|
- pending.decrementAndGet();
|
|
|
|
- counter.incrementAndGet();
|
|
|
|
- if (rc != 0) {
|
|
|
|
- errors.incrementAndGet();
|
|
|
|
- }
|
|
|
|
- if (counter.get() > 7300) {
|
|
|
|
- sem.release();
|
|
|
|
- }
|
|
|
|
|
|
+ zk3.create("/mytestfoo", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, (rc, path, ctx, name) -> {
|
|
|
|
+ pending.decrementAndGet();
|
|
|
|
+ counter.incrementAndGet();
|
|
|
|
+ if (rc != 0) {
|
|
|
|
+ errors.incrementAndGet();
|
|
|
|
+ }
|
|
|
|
+ if (counter.get() > 7300) {
|
|
|
|
+ sem.release();
|
|
}
|
|
}
|
|
}, null);
|
|
}, null);
|
|
pending.incrementAndGet();
|
|
pending.incrementAndGet();
|
|
@@ -447,18 +431,14 @@ public class FollowerResyncConcurrencyTest extends ZKTestCase {
|
|
|
|
|
|
mytestfooThread.start();
|
|
mytestfooThread.start();
|
|
for (int i = 0; i < 5000; i++) {
|
|
for (int i = 0; i < 5000; i++) {
|
|
- zk2.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() {
|
|
|
|
-
|
|
|
|
- @Override
|
|
|
|
- public void processResult(int rc, String path, Object ctx, String name) {
|
|
|
|
- pending.decrementAndGet();
|
|
|
|
- counter.incrementAndGet();
|
|
|
|
- if (rc != 0) {
|
|
|
|
- errors.incrementAndGet();
|
|
|
|
- }
|
|
|
|
- if (counter.get() > 7300) {
|
|
|
|
- sem.release();
|
|
|
|
- }
|
|
|
|
|
|
+ zk2.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, (rc, path, ctx, name) -> {
|
|
|
|
+ pending.decrementAndGet();
|
|
|
|
+ counter.incrementAndGet();
|
|
|
|
+ if (rc != 0) {
|
|
|
|
+ errors.incrementAndGet();
|
|
|
|
+ }
|
|
|
|
+ if (counter.get() > 7300) {
|
|
|
|
+ sem.release();
|
|
}
|
|
}
|
|
}, null);
|
|
}, null);
|
|
pending.incrementAndGet();
|
|
pending.incrementAndGet();
|
|
@@ -479,18 +459,14 @@ public class FollowerResyncConcurrencyTest extends ZKTestCase {
|
|
}
|
|
}
|
|
|
|
|
|
if (i >= 1000 && i % 2 == 0) {
|
|
if (i >= 1000 && i % 2 == 0) {
|
|
- zk3.create("/newbaz", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() {
|
|
|
|
-
|
|
|
|
- @Override
|
|
|
|
- public void processResult(int rc, String path, Object ctx, String name) {
|
|
|
|
- pending.decrementAndGet();
|
|
|
|
- counter.incrementAndGet();
|
|
|
|
- if (rc != 0) {
|
|
|
|
- errors.incrementAndGet();
|
|
|
|
- }
|
|
|
|
- if (counter.get() > 7300) {
|
|
|
|
- sem.release();
|
|
|
|
- }
|
|
|
|
|
|
+ zk3.create("/newbaz", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, (rc, path, ctx, name) -> {
|
|
|
|
+ pending.decrementAndGet();
|
|
|
|
+ counter.incrementAndGet();
|
|
|
|
+ if (rc != 0) {
|
|
|
|
+ errors.incrementAndGet();
|
|
|
|
+ }
|
|
|
|
+ if (counter.get() > 7300) {
|
|
|
|
+ sem.release();
|
|
}
|
|
}
|
|
}, null);
|
|
}, null);
|
|
pending.incrementAndGet();
|
|
pending.incrementAndGet();
|