|
@@ -19,6 +19,7 @@
|
|
|
package org.apache.zookeeper.server.quorum;
|
|
|
|
|
|
import static org.apache.zookeeper.common.NetUtils.formatInetAddr;
|
|
|
+import static org.apache.zookeeper.server.quorum.QuorumPeerConfig.configureSSLAuth;
|
|
|
import java.io.BufferedReader;
|
|
|
import java.io.File;
|
|
|
import java.io.FileNotFoundException;
|
|
@@ -152,15 +153,20 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
|
|
|
public final MultipleAddresses quorumAddr;
|
|
|
public final MultipleAddresses electionAddr;
|
|
|
public final InetSocketAddress clientAddr;
|
|
|
+ public final InetSocketAddress secureClientAddr;
|
|
|
|
|
|
- public AddressTuple(MultipleAddresses quorumAddr, MultipleAddresses electionAddr, InetSocketAddress clientAddr) {
|
|
|
+ public AddressTuple(MultipleAddresses quorumAddr, MultipleAddresses electionAddr, InetSocketAddress clientAddr, InetSocketAddress secureClientAddr) {
|
|
|
this.quorumAddr = quorumAddr;
|
|
|
this.electionAddr = electionAddr;
|
|
|
this.clientAddr = clientAddr;
|
|
|
+ this.secureClientAddr = secureClientAddr;
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
+ private Boolean isClientAddrFromStatic = null;
|
|
|
+ private Boolean isSecureClientAddrFromStatic = null;
|
|
|
+
|
|
|
private int observerMasterPort;
|
|
|
|
|
|
public int getObserverMasterPort() {
|
|
@@ -216,6 +222,7 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
|
|
|
public MultipleAddresses electionAddr = new MultipleAddresses();
|
|
|
|
|
|
public InetSocketAddress clientAddr = null;
|
|
|
+ public InetSocketAddress secureClientAddr = null;
|
|
|
|
|
|
public long id;
|
|
|
|
|
@@ -224,20 +231,31 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
|
|
|
public LearnerType type = LearnerType.PARTICIPANT;
|
|
|
|
|
|
public boolean isClientAddrFromStatic = false;
|
|
|
+ public boolean isSecureClientAddrFromStatic = false;
|
|
|
|
|
|
private List<InetSocketAddress> myAddrs;
|
|
|
|
|
|
public QuorumServer(long id, InetSocketAddress addr, InetSocketAddress electionAddr, InetSocketAddress clientAddr) {
|
|
|
- this(id, addr, electionAddr, clientAddr, LearnerType.PARTICIPANT);
|
|
|
+ this(id, addr, electionAddr, clientAddr, null, LearnerType.PARTICIPANT);
|
|
|
+ }
|
|
|
+
|
|
|
+ public QuorumServer(long id, InetSocketAddress addr, InetSocketAddress electionAddr, InetSocketAddress clientAddr,
|
|
|
+ InetSocketAddress secureClientAddr) {
|
|
|
+ this(id, addr, electionAddr, clientAddr, secureClientAddr, LearnerType.PARTICIPANT);
|
|
|
+ }
|
|
|
+
|
|
|
+ public QuorumServer(long id, InetSocketAddress addr, InetSocketAddress electionAddr, InetSocketAddress clientAddr,
|
|
|
+ LearnerType learnerType) {
|
|
|
+ this(id, addr, electionAddr, clientAddr, null, learnerType);
|
|
|
}
|
|
|
|
|
|
public QuorumServer(long id, InetSocketAddress addr, InetSocketAddress electionAddr) {
|
|
|
- this(id, addr, electionAddr, null, LearnerType.PARTICIPANT);
|
|
|
+ this(id, addr, electionAddr, null, null, LearnerType.PARTICIPANT);
|
|
|
}
|
|
|
|
|
|
// VisibleForTesting
|
|
|
public QuorumServer(long id, InetSocketAddress addr) {
|
|
|
- this(id, addr, null, null, LearnerType.PARTICIPANT);
|
|
|
+ this(id, addr, null, null, null, LearnerType.PARTICIPANT);
|
|
|
}
|
|
|
|
|
|
public long getId() {
|
|
@@ -284,10 +302,10 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
|
|
|
}
|
|
|
|
|
|
public QuorumServer(long id, InetSocketAddress addr, InetSocketAddress electionAddr, LearnerType type) {
|
|
|
- this(id, addr, electionAddr, null, type);
|
|
|
+ this(id, addr, electionAddr, null, null, type);
|
|
|
}
|
|
|
|
|
|
- public QuorumServer(long id, InetSocketAddress addr, InetSocketAddress electionAddr, InetSocketAddress clientAddr, LearnerType type) {
|
|
|
+ public QuorumServer(long id, InetSocketAddress addr, InetSocketAddress electionAddr, InetSocketAddress clientAddr, InetSocketAddress secureClientAddr, LearnerType type) {
|
|
|
this.id = id;
|
|
|
if (addr != null) {
|
|
|
this.addr.addAddress(addr);
|
|
@@ -297,6 +315,7 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
|
|
|
}
|
|
|
this.type = type;
|
|
|
this.clientAddr = clientAddr;
|
|
|
+ this.secureClientAddr = secureClientAddr;
|
|
|
|
|
|
setMyAddrs();
|
|
|
}
|
|
@@ -304,14 +323,15 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
|
|
|
private static final String wrongFormat =
|
|
|
" does not have the form server_config or server_config;client_config"
|
|
|
+ " where server_config is the pipe separated list of host:port:port or host:port:port:type"
|
|
|
- + " and client_config is port or host:port";
|
|
|
+ + " and client_config is host:clientPort;host:secureClientPort or clientPort or host:clientPort"
|
|
|
+ + " or ';secureClientPort' or ';host:secureClientPort'";
|
|
|
|
|
|
private void initializeWithAddressString(String addressStr, Function<InetSocketAddress, InetAddress> getInetAddress) throws ConfigException {
|
|
|
LearnerType newType = null;
|
|
|
String[] serverClientParts = addressStr.split(";");
|
|
|
String[] serverAddresses = serverClientParts[0].split("\\|");
|
|
|
|
|
|
- if (serverClientParts.length == 2) {
|
|
|
+ if (serverClientParts.length >= 2 && !serverClientParts[1].isEmpty()) {
|
|
|
String[] clientParts = ConfigUtils.getHostAndPort(serverClientParts[1]);
|
|
|
if (clientParts.length > 2) {
|
|
|
throw new ConfigException(addressStr + wrongFormat);
|
|
@@ -322,8 +342,25 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
|
|
|
try {
|
|
|
clientAddr = new InetSocketAddress(clientHostName, Integer.parseInt(clientParts[clientParts.length - 1]));
|
|
|
} catch (NumberFormatException e) {
|
|
|
- throw new ConfigException("Address unresolved: " + hostname + ":" + clientParts[clientParts.length - 1]);
|
|
|
+ throw new ConfigException("Address unresolved: " + clientHostName + ":" + clientParts[clientParts.length - 1]);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (serverClientParts.length == 3 && !serverClientParts[2].isEmpty()) {
|
|
|
+ String[] secureClientParts = ConfigUtils.getHostAndPort(serverClientParts[2]);
|
|
|
+ if (secureClientParts.length > 2) {
|
|
|
+ throw new ConfigException(addressStr + wrongFormat);
|
|
|
+ }
|
|
|
+
|
|
|
+ // is secure client config a host:port or just a port
|
|
|
+ String secureClientHostName = (secureClientParts.length == 2) ? secureClientParts[0] : "0.0.0.0";
|
|
|
+ try {
|
|
|
+ secureClientAddr = new InetSocketAddress(secureClientHostName, Integer.parseInt(secureClientParts[secureClientParts.length - 1]));
|
|
|
+ } catch (NumberFormatException e) {
|
|
|
+ throw new ConfigException("Address unresolved: " + secureClientHostName + ":" + secureClientParts[secureClientParts.length - 1]);
|
|
|
}
|
|
|
+ // set x509 auth provider if not already set
|
|
|
+ configureSSLAuth();
|
|
|
}
|
|
|
|
|
|
boolean multiAddressEnabled = Boolean.parseBoolean(
|
|
@@ -338,9 +375,8 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
|
|
|
CONFIG_DEFAULT_KERBEROS_CANONICALIZE_HOST_NAMES));
|
|
|
|
|
|
for (String serverAddress : serverAddresses) {
|
|
|
- String serverParts[] = ConfigUtils.getHostAndPort(serverAddress);
|
|
|
- if ((serverClientParts.length > 2) || (serverParts.length < 3)
|
|
|
- || (serverParts.length > 4)) {
|
|
|
+ String[] serverParts = ConfigUtils.getHostAndPort(serverAddress);
|
|
|
+ if ((serverParts.length < 3) || (serverParts.length > 4)) {
|
|
|
throw new ConfigException(addressStr + wrongFormat);
|
|
|
}
|
|
|
|
|
@@ -415,6 +451,7 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
|
|
|
this.myAddrs = new ArrayList<>();
|
|
|
this.myAddrs.addAll(this.addr.getAllAddresses());
|
|
|
this.myAddrs.add(this.clientAddr);
|
|
|
+ this.myAddrs.add(this.secureClientAddr);
|
|
|
this.myAddrs.addAll(this.electionAddr.getAllAddresses());
|
|
|
this.myAddrs = excludedSpecialAddresses(this.myAddrs);
|
|
|
}
|
|
@@ -448,13 +485,24 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
|
|
|
sw.append(":participant");
|
|
|
}
|
|
|
|
|
|
+ boolean clientPortSpecAdded = false;
|
|
|
if (clientAddr != null && !isClientAddrFromStatic) {
|
|
|
+ clientPortSpecAdded = true;
|
|
|
sw.append(";");
|
|
|
sw.append(delimitedHostString(clientAddr));
|
|
|
sw.append(":");
|
|
|
sw.append(String.valueOf(clientAddr.getPort()));
|
|
|
}
|
|
|
|
|
|
+ if (secureClientAddr != null & !isSecureClientAddrFromStatic) {
|
|
|
+ if (!clientPortSpecAdded) {
|
|
|
+ sw.append(";");
|
|
|
+ }
|
|
|
+ sw.append(";");
|
|
|
+ sw.append(delimitedHostString(secureClientAddr));
|
|
|
+ sw.append(":");
|
|
|
+ sw.append(String.valueOf(secureClientAddr.getPort()));
|
|
|
+ }
|
|
|
return sw.toString();
|
|
|
}
|
|
|
|
|
@@ -463,7 +511,7 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
|
|
|
return 42; // any arbitrary constant will do
|
|
|
}
|
|
|
|
|
|
- private boolean checkAddressesEqual(InetSocketAddress addr1, InetSocketAddress addr2) {
|
|
|
+ private static boolean checkAddressesEqual(InetSocketAddress addr1, InetSocketAddress addr2) {
|
|
|
return (addr1 != null || addr2 == null)
|
|
|
&& (addr1 == null || addr2 != null)
|
|
|
&& (addr1 == null || addr2 == null || addr1.equals(addr2));
|
|
@@ -483,12 +531,16 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
|
|
|
if (!electionAddr.equals(qs.electionAddr)) {
|
|
|
return false;
|
|
|
}
|
|
|
- return checkAddressesEqual(clientAddr, qs.clientAddr);
|
|
|
+ if (!checkAddressesEqual(clientAddr, qs.clientAddr)) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ return checkAddressesEqual(secureClientAddr, qs.secureClientAddr);
|
|
|
}
|
|
|
|
|
|
public void checkAddressDuplicate(QuorumServer s) throws BadArgumentsException {
|
|
|
List<InetSocketAddress> otherAddrs = new ArrayList<>(s.addr.getAllAddresses());
|
|
|
otherAddrs.add(s.clientAddr);
|
|
|
+ otherAddrs.add(s.secureClientAddr);
|
|
|
otherAddrs.addAll(s.electionAddr.getAllAddresses());
|
|
|
otherAddrs = excludedSpecialAddresses(otherAddrs);
|
|
|
|
|
@@ -709,6 +761,7 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
|
|
|
* value of one indicates the default backlog will be used.
|
|
|
*/
|
|
|
protected int clientPortListenBacklog = -1;
|
|
|
+ protected int maxClientCnxns = -1;
|
|
|
|
|
|
/**
|
|
|
* The number of ticks that the initial synchronization phase can take
|
|
@@ -994,7 +1047,7 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
|
|
|
if (qs != null) {
|
|
|
qs.recreateSocketAddresses();
|
|
|
if (id == getMyId()) {
|
|
|
- setAddrs(qs.addr, qs.electionAddr, qs.clientAddr);
|
|
|
+ setAddrs(qs.addr, qs.electionAddr, qs.clientAddr, qs.secureClientAddr);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -1040,9 +1093,15 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
|
|
|
return (addrs == null) ? null : addrs.clientAddr;
|
|
|
}
|
|
|
|
|
|
- private void setAddrs(MultipleAddresses quorumAddr, MultipleAddresses electionAddr, InetSocketAddress clientAddr) {
|
|
|
+ public InetSocketAddress getSecureClientAddress() {
|
|
|
+ final AddressTuple addrs = myAddrs.get();
|
|
|
+ return (addrs == null) ? null : addrs.secureClientAddr;
|
|
|
+ }
|
|
|
+
|
|
|
+ private void setAddrs(MultipleAddresses quorumAddr, MultipleAddresses electionAddr, InetSocketAddress clientAddr,
|
|
|
+ InetSocketAddress secureClientAddr) {
|
|
|
synchronized (QV_LOCK) {
|
|
|
- myAddrs.set(new AddressTuple(quorumAddr, electionAddr, clientAddr));
|
|
|
+ myAddrs.set(new AddressTuple(quorumAddr, electionAddr, clientAddr, secureClientAddr));
|
|
|
QV_LOCK.notifyAll();
|
|
|
}
|
|
|
}
|
|
@@ -1305,7 +1364,7 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
|
|
|
private static InetSocketAddress getClientAddress(Map<Long, QuorumServer> quorumPeers, long myid, int clientPort) throws IOException {
|
|
|
QuorumServer quorumServer = quorumPeers.get(myid);
|
|
|
if (null == quorumServer) {
|
|
|
- throw new IOException("No QuorumServer correspoding to myid " + myid);
|
|
|
+ throw new IOException("No QuorumServer corresponding to myid " + myid);
|
|
|
}
|
|
|
if (null == quorumServer.clientAddr) {
|
|
|
return new InetSocketAddress(clientPort);
|
|
@@ -1825,6 +1884,16 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
|
|
|
this.clientPortListenBacklog = backlog;
|
|
|
}
|
|
|
|
|
|
+ /** The server max client connections */
|
|
|
+ public int getMaxClientCnxns() {
|
|
|
+ return maxClientCnxns;
|
|
|
+ }
|
|
|
+
|
|
|
+ /** Sets the server's max client connections */
|
|
|
+ public void setMaxClientCnxns(int maxClientCnxns) {
|
|
|
+ this.maxClientCnxns = maxClientCnxns;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Get the number of ticks that the initial synchronization phase can take
|
|
|
*/
|
|
@@ -1975,7 +2044,7 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
|
|
|
try {
|
|
|
String dynamicConfigFilename = makeDynamicConfigFilename(qv.getVersion());
|
|
|
QuorumPeerConfig.writeDynamicConfig(dynamicConfigFilename, qv, false);
|
|
|
- QuorumPeerConfig.editStaticConfig(configFilename, dynamicConfigFilename, needEraseClientInfoFromStaticConfig());
|
|
|
+ QuorumPeerConfig.editStaticConfig(configFilename, dynamicConfigFilename, needEraseClientInfoFromStaticConfig(), needEraseSecureClientInfoFromStaticConfig());
|
|
|
} catch (IOException e) {
|
|
|
LOG.error("Error closing file", e);
|
|
|
}
|
|
@@ -1989,7 +2058,16 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
|
|
|
}
|
|
|
QuorumServer qs = qv.getAllMembers().get(getMyId());
|
|
|
if (qs != null) {
|
|
|
- setAddrs(qs.addr, qs.electionAddr, qs.clientAddr);
|
|
|
+ setAddrs(qs.addr, qs.electionAddr, qs.clientAddr, qs.secureClientAddr);
|
|
|
+
|
|
|
+ // we only set this once, because quorum verifier can change based on dynamic reconfig
|
|
|
+ if (isClientAddrFromStatic == null) {
|
|
|
+ isClientAddrFromStatic = qs.isClientAddrFromStatic;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (isSecureClientAddrFromStatic == null) {
|
|
|
+ isSecureClientAddrFromStatic = qs.isSecureClientAddrFromStatic;
|
|
|
+ }
|
|
|
}
|
|
|
updateObserverMasterList();
|
|
|
return prevQV;
|
|
@@ -2005,6 +2083,11 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
|
|
|
return (server != null && server.clientAddr != null && !server.isClientAddrFromStatic);
|
|
|
}
|
|
|
|
|
|
+ private boolean needEraseSecureClientInfoFromStaticConfig() {
|
|
|
+ QuorumServer server = quorumVerifier.getAllMembers().get(getMyId());
|
|
|
+ return (server != null && server.secureClientAddr != null && !server.isSecureClientAddrFromStatic);
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Get an instance of LeaderElection
|
|
|
*/
|
|
@@ -2270,6 +2353,7 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
|
|
|
}
|
|
|
|
|
|
InetSocketAddress oldClientAddr = getClientAddress();
|
|
|
+ InetSocketAddress oldSecureClientAddr = getSecureClientAddress();
|
|
|
|
|
|
// update last committed quorum verifier, write the new config to disk
|
|
|
// and restart leader election if config changed.
|
|
@@ -2293,8 +2377,53 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
|
|
|
}
|
|
|
|
|
|
QuorumServer myNewQS = newMembers.get(getMyId());
|
|
|
- if (myNewQS != null && myNewQS.clientAddr != null && !myNewQS.clientAddr.equals(oldClientAddr)) {
|
|
|
- cnxnFactory.reconfigure(myNewQS.clientAddr);
|
|
|
+ if (myNewQS != null) {
|
|
|
+ if (myNewQS.clientAddr == null) {
|
|
|
+ if (!isClientAddrFromStatic && oldClientAddr != null && cnxnFactory != null) {
|
|
|
+ // clientAddr omitted in new config, shutdown cnxnFactory
|
|
|
+ cnxnFactory.shutdown();
|
|
|
+ cnxnFactory = null;
|
|
|
+ }
|
|
|
+ } else if (!myNewQS.clientAddr.equals(oldClientAddr)) {
|
|
|
+ // clientAddr has changed
|
|
|
+ if (cnxnFactory == null) {
|
|
|
+ // start cnxnFactory first
|
|
|
+ try {
|
|
|
+ cnxnFactory = ServerCnxnFactory.createFactory();
|
|
|
+ cnxnFactory.configure(myNewQS.clientAddr, getMaxClientCnxns(), getClientPortListenBacklog(), false);
|
|
|
+ cnxnFactory.start();
|
|
|
+ } catch (IOException e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ cnxnFactory.reconfigure(myNewQS.clientAddr);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (myNewQS.secureClientAddr == null) {
|
|
|
+ if (!isSecureClientAddrFromStatic && oldSecureClientAddr != null && secureCnxnFactory != null) {
|
|
|
+ // secureClientAddr omitted in new config, shutdown secureCnxnFactory
|
|
|
+ secureCnxnFactory.shutdown();
|
|
|
+ secureCnxnFactory = null;
|
|
|
+ }
|
|
|
+ } else if (!myNewQS.secureClientAddr.equals(oldSecureClientAddr)) {
|
|
|
+ // secureClientAddr has changed
|
|
|
+ if (secureCnxnFactory == null) {
|
|
|
+ // start secureCnxnFactory first
|
|
|
+ try {
|
|
|
+ configureSSLAuth();
|
|
|
+ secureCnxnFactory = ServerCnxnFactory.createFactory();
|
|
|
+ secureCnxnFactory.configure(myNewQS.secureClientAddr, getMaxClientCnxns(), getClientPortListenBacklog(), true);
|
|
|
+ secureCnxnFactory.start();
|
|
|
+
|
|
|
+ } catch (IOException | ConfigException e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ secureCnxnFactory.reconfigure(myNewQS.secureClientAddr);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
updateThreadName();
|
|
|
}
|
|
|
|
|
@@ -2673,6 +2802,7 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
|
|
|
quorumPeer.setObserverMasterPort(config.getObserverMasterPort());
|
|
|
quorumPeer.setConfigFileName(config.getConfigFilename());
|
|
|
quorumPeer.setClientPortListenBacklog(config.getClientPortListenBacklog());
|
|
|
+ quorumPeer.setMaxClientCnxns(config.getMaxClientCnxns());
|
|
|
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
|
|
|
quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
|
|
|
if (config.getLastSeenQuorumVerifier() != null) {
|