|
@@ -22,20 +22,21 @@ import static org.hamcrest.MatcherAssert.assertThat;
|
|
|
import static org.hamcrest.number.OrderingComparison.greaterThan;
|
|
|
import static org.hamcrest.number.OrderingComparison.greaterThanOrEqualTo;
|
|
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
|
|
+import java.io.File;
|
|
|
import java.util.Map;
|
|
|
-import java.util.concurrent.CountDownLatch;
|
|
|
import org.apache.zookeeper.CreateMode;
|
|
|
-import org.apache.zookeeper.WatchedEvent;
|
|
|
-import org.apache.zookeeper.Watcher;
|
|
|
import org.apache.zookeeper.ZKTestCase;
|
|
|
import org.apache.zookeeper.ZooDefs;
|
|
|
import org.apache.zookeeper.ZooKeeper;
|
|
|
import org.apache.zookeeper.metrics.MetricsUtils;
|
|
|
+import org.apache.zookeeper.server.ServerCnxnFactory;
|
|
|
import org.apache.zookeeper.server.ServerMetrics;
|
|
|
import org.apache.zookeeper.server.SyncRequestProcessor;
|
|
|
+import org.apache.zookeeper.server.ZooKeeperServer;
|
|
|
import org.apache.zookeeper.test.ClientBase;
|
|
|
-import org.apache.zookeeper.test.QuorumUtil;
|
|
|
+import org.junit.jupiter.api.AfterEach;
|
|
|
import org.junit.jupiter.api.Test;
|
|
|
+import org.junit.jupiter.api.io.TempDir;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
@@ -43,43 +44,53 @@ public class FileTxnSnapLogMetricsTest extends ZKTestCase {
|
|
|
|
|
|
private static final Logger LOG = LoggerFactory.getLogger(FileTxnSnapLogMetricsTest.class);
|
|
|
|
|
|
- CountDownLatch allCreatedLatch;
|
|
|
+ @TempDir
|
|
|
+ File logDir;
|
|
|
|
|
|
- private class MockWatcher implements Watcher {
|
|
|
+ @TempDir
|
|
|
+ File snapDir;
|
|
|
|
|
|
- @Override
|
|
|
- public void process(WatchedEvent e) {
|
|
|
- LOG.info("all nodes created");
|
|
|
- allCreatedLatch.countDown();
|
|
|
- }
|
|
|
+ private ServerCnxnFactory startServer() throws Exception {
|
|
|
+ ZooKeeperServer zkServer = new ZooKeeperServer(snapDir, logDir, 3000);
|
|
|
+ ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory(0, -1);
|
|
|
+ cnxnFactory.startup(zkServer);
|
|
|
+ return cnxnFactory;
|
|
|
+ }
|
|
|
|
|
|
+ @AfterEach
|
|
|
+ public void cleanup() throws Exception {
|
|
|
+ SyncRequestProcessor.setSnapCount(ZooKeeperServer.getSnapCount());
|
|
|
}
|
|
|
|
|
|
@Test
|
|
|
public void testFileTxnSnapLogMetrics() throws Exception {
|
|
|
SyncRequestProcessor.setSnapCount(100);
|
|
|
|
|
|
- QuorumUtil util = new QuorumUtil(1);
|
|
|
- util.startAll();
|
|
|
+ ServerCnxnFactory cnxnFactory = startServer();
|
|
|
+ String connectString = "127.0.0.1:" + cnxnFactory.getLocalPort();
|
|
|
|
|
|
- allCreatedLatch = new CountDownLatch(1);
|
|
|
+ // Snapshot in load data.
|
|
|
+ assertEquals(1L, MetricsUtils.currentServerMetrics().get("cnt_snapshottime"));
|
|
|
|
|
|
byte[] data = new byte[500];
|
|
|
- // make sure a snapshot is taken and some txns are not in a snapshot
|
|
|
- ZooKeeper zk = ClientBase.createZKClient(util.getConnString());
|
|
|
+ ZooKeeper zk = ClientBase.createZKClient(connectString);
|
|
|
for (int i = 0; i < 150; i++) {
|
|
|
zk.create("/path" + i, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
|
|
|
}
|
|
|
|
|
|
- if (null == zk.exists("/path149", new MockWatcher())) {
|
|
|
- allCreatedLatch.await();
|
|
|
- }
|
|
|
+ // It is possible that above writes will trigger more than one snapshot due to randomization.
|
|
|
+ WaitForCondition newSnapshot = () -> (long) MetricsUtils.currentServerMetrics().get("cnt_snapshottime") >= 2L;
|
|
|
+ waitFor("no snapshot in 10s", newSnapshot, 10);
|
|
|
+
|
|
|
+ // Pauses snapshot and logs more txns.
|
|
|
+ cnxnFactory.getZooKeeperServer().getTxnLogFactory().snapLog.close();
|
|
|
+ zk.create("/" + 1000, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
|
|
|
+ zk.create("/" + 1001, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
|
|
|
|
|
|
+ // Restart server to count startup metrics.
|
|
|
+ cnxnFactory.shutdown();
|
|
|
ServerMetrics.getMetrics().resetAll();
|
|
|
- int leader = util.getLeaderServer();
|
|
|
- // restart a server so it will read the snapshot and the txn logs
|
|
|
- util.shutdown(leader);
|
|
|
- util.start(leader);
|
|
|
+ cnxnFactory = startServer();
|
|
|
|
|
|
Map<String, Object> values = MetricsUtils.currentServerMetrics();
|
|
|
LOG.info("txn loaded during start up {}", values.get("max_startup_txns_loaded"));
|
|
@@ -90,7 +101,7 @@ public class FileTxnSnapLogMetricsTest extends ZKTestCase {
|
|
|
assertEquals(1L, values.get("cnt_startup_snap_load_time"));
|
|
|
assertThat((long) values.get("max_startup_snap_load_time"), greaterThan(0L));
|
|
|
|
|
|
- util.shutdownAll();
|
|
|
+ cnxnFactory.shutdown();
|
|
|
}
|
|
|
|
|
|
}
|