|
@@ -5,6 +5,10 @@ import java.nio.file.Files;
|
|
|
import java.nio.file.Path;
|
|
|
import java.util.Map;
|
|
|
import java.util.Properties;
|
|
|
+import java.util.concurrent.CompletableFuture;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.concurrent.TimeoutException;
|
|
|
+import javax.security.sasl.SaslException;
|
|
|
import org.apache.zookeeper.server.DatadirCleanupManager;
|
|
|
import org.apache.zookeeper.server.ExitCode;
|
|
|
import org.apache.zookeeper.server.ServerConfig;
|
|
@@ -72,6 +76,11 @@ class ZooKeeperServerEmbeddedImpl implements ZooKeeperServerEmbedded {
|
|
|
|
|
|
@Override
|
|
|
public void start() throws Exception {
|
|
|
+ start(Integer.MAX_VALUE);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void start(long startupTimeout) throws Exception {
|
|
|
switch (exitHandler) {
|
|
|
case EXIT:
|
|
|
ServiceUtils.setSystemExitProcedure(ServiceUtils.SYSTEM_EXIT);
|
|
@@ -83,12 +92,23 @@ class ZooKeeperServerEmbeddedImpl implements ZooKeeperServerEmbedded {
|
|
|
ServiceUtils.setSystemExitProcedure(ServiceUtils.SYSTEM_EXIT);
|
|
|
break;
|
|
|
}
|
|
|
-
|
|
|
+ final CompletableFuture<String> started = new CompletableFuture<>();
|
|
|
|
|
|
if (config.getServers().size() > 1 || config.isDistributed()) {
|
|
|
LOG.info("Running ZK Server in single Quorum MODE");
|
|
|
|
|
|
- maincluster = new QuorumPeerMain();
|
|
|
+ maincluster = new QuorumPeerMain() {
|
|
|
+ protected QuorumPeer getQuorumPeer() throws SaslException {
|
|
|
+ return new QuorumPeer() {
|
|
|
+ @Override
|
|
|
+ public void start() {
|
|
|
+ super.start();
|
|
|
+ LOG.info("ZK Server {} started", this);
|
|
|
+ started.complete(null);
|
|
|
+ }
|
|
|
+ };
|
|
|
+ }
|
|
|
+ };
|
|
|
|
|
|
// Start and schedule the the purge task
|
|
|
purgeMgr = new DatadirCleanupManager(config
|
|
@@ -118,7 +138,13 @@ class ZooKeeperServerEmbeddedImpl implements ZooKeeperServerEmbedded {
|
|
|
thread.start();
|
|
|
} else {
|
|
|
LOG.info("Running ZK Server in single STANDALONE MODE");
|
|
|
- mainsingle = new ZooKeeperServerMain();
|
|
|
+ mainsingle = new ZooKeeperServerMain() {
|
|
|
+ @Override
|
|
|
+ public void serverStarted() {
|
|
|
+ LOG.info("ZK Server started");
|
|
|
+ started.complete(null);
|
|
|
+ }
|
|
|
+ };
|
|
|
purgeMgr = new DatadirCleanupManager(config
|
|
|
.getDataDir(), config.getDataLogDir(), config
|
|
|
.getSnapRetainCount(), config.getPurgeInterval());
|
|
@@ -146,6 +172,34 @@ class ZooKeeperServerEmbeddedImpl implements ZooKeeperServerEmbedded {
|
|
|
};
|
|
|
thread.start();
|
|
|
}
|
|
|
+
|
|
|
+ try {
|
|
|
+ started.get(startupTimeout, TimeUnit.MILLISECONDS);
|
|
|
+ } catch (TimeoutException err) {
|
|
|
+ LOG.info("Startup timed out, trying to close");
|
|
|
+ close();
|
|
|
+ throw err;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public String getConnectionString() {
|
|
|
+ if (config.getClientPortAddress() != null) {
|
|
|
+ String raw = config.getClientPortAddress().getHostString() + ":" + config.getClientPortAddress().getPort();
|
|
|
+ return raw.replace("0.0.0.0", "localhost");
|
|
|
+ } else {
|
|
|
+ throw new IllegalStateException("No client address is configured");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public String getSecureConnectionString() {
|
|
|
+ if (config.getSecureClientPortAddress() != null) {
|
|
|
+ String raw = config.getSecureClientPortAddress().getHostString() + ":" + config.getSecureClientPortAddress().getPort();
|
|
|
+ return raw.replace("0.0.0.0", "localhost");
|
|
|
+ } else {
|
|
|
+ throw new IllegalStateException("No client address is configured");
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|