|
@@ -33,6 +33,7 @@ import java.util.concurrent.CountDownLatch;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
+import java.util.concurrent.atomic.AtomicReference;
|
|
|
import java.util.zip.CheckedOutputStream;
|
|
|
import org.apache.jute.BinaryOutputArchive;
|
|
|
import org.apache.jute.OutputArchive;
|
|
@@ -59,7 +60,7 @@ public class PurgeTxnTest extends ZKTestCase {
|
|
|
private static final Logger LOG = LoggerFactory.getLogger(PurgeTxnTest.class);
|
|
|
private static String HOSTPORT = "127.0.0.1:" + PortAssignment.unique();
|
|
|
private static final int CONNECTION_TIMEOUT = 3000;
|
|
|
- private static final long OP_TIMEOUT_IN_MILLIS = 90000;
|
|
|
+ private static final long OP_TIMEOUT_IN_MILLIS = 120000;
|
|
|
private File tmpDir;
|
|
|
|
|
|
@Before
|
|
@@ -561,25 +562,25 @@ public class PurgeTxnTest extends ZKTestCase {
|
|
|
Thread[] ths = new Thread[thCount];
|
|
|
final List<String> znodes = Collections.synchronizedList(new ArrayList<String>());
|
|
|
final CountDownLatch finished = new CountDownLatch(thCount);
|
|
|
+ final AtomicReference<Exception> exception = new AtomicReference<>();
|
|
|
for (int indx = 0; indx < thCount; indx++) {
|
|
|
final String myprefix = prefix + "-" + indx;
|
|
|
- Thread th = new Thread() {
|
|
|
- public void run() {
|
|
|
- for (int i = 0; i < 1000; i++) {
|
|
|
- try {
|
|
|
- String mynode = myprefix + "-" + i;
|
|
|
- znodes.add(mynode);
|
|
|
- zk.create(mynode, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
|
|
|
- } catch (Exception e) {
|
|
|
- LOG.error("Unexpected exception", e);
|
|
|
- }
|
|
|
- if (i == 200) {
|
|
|
- doPurge.countDown();
|
|
|
- }
|
|
|
+ Thread th = new Thread(() -> {
|
|
|
+ for (int i = 0; i < 750; i++) {
|
|
|
+ try {
|
|
|
+ String mynode = myprefix + "-" + i;
|
|
|
+ znodes.add(mynode);
|
|
|
+ zk.create(mynode, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.error("Unexpected exception during ZkClient ops", e);
|
|
|
+ exception.set(e);
|
|
|
+ }
|
|
|
+ if (i == 200) {
|
|
|
+ doPurge.countDown();
|
|
|
}
|
|
|
- finished.countDown();
|
|
|
}
|
|
|
- };
|
|
|
+ finished.countDown();
|
|
|
+ });
|
|
|
ths[indx] = th;
|
|
|
}
|
|
|
|
|
@@ -587,7 +588,12 @@ public class PurgeTxnTest extends ZKTestCase {
|
|
|
thread.start();
|
|
|
}
|
|
|
try {
|
|
|
- assertTrue("ZkClient ops is not finished!", finished.await(OP_TIMEOUT_IN_MILLIS, TimeUnit.MILLISECONDS));
|
|
|
+ boolean operationsFinishedSuccessfully = finished.await(OP_TIMEOUT_IN_MILLIS, TimeUnit.MILLISECONDS);
|
|
|
+ if (exception.get() != null) {
|
|
|
+ LOG.error("unexpected exception during running ZkClient ops:", exception.get());
|
|
|
+ fail("unexpected exception during running ZkClient ops, see in the logs above");
|
|
|
+ }
|
|
|
+ assertTrue("ZkClient ops not finished in time!", operationsFinishedSuccessfully);
|
|
|
} catch (InterruptedException ie) {
|
|
|
LOG.error("Unexpected exception", ie);
|
|
|
fail("Unexpected exception occurred!");
|