|
@@ -46,6 +46,7 @@ import java.util.Set;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
import java.util.concurrent.atomic.AtomicReference;
|
|
|
+import java.util.function.Function;
|
|
|
import java.util.stream.Collectors;
|
|
|
import java.util.stream.IntStream;
|
|
|
import javax.security.sasl.SaslException;
|
|
@@ -115,6 +116,9 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
|
|
|
|
|
|
private static final Logger LOG = LoggerFactory.getLogger(QuorumPeer.class);
|
|
|
|
|
|
+ public static final String CONFIG_KEY_KERBEROS_CANONICALIZE_HOST_NAMES = "zookeeper.kerberos.canonicalizeHostNames";
|
|
|
+ public static final String CONFIG_DEFAULT_KERBEROS_CANONICALIZE_HOST_NAMES = "false";
|
|
|
+
|
|
|
private QuorumBean jmxQuorumBean;
|
|
|
LocalPeerBean jmxLocalPeerBean;
|
|
|
private Map<Long, RemotePeerBean> jmxRemotePeerBean;
|
|
@@ -265,13 +269,39 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ public QuorumServer(long sid, String addressStr) throws ConfigException {
|
|
|
+ this(sid, addressStr, QuorumServer::getInetAddress);
|
|
|
+ }
|
|
|
+
|
|
|
+ QuorumServer(long sid, String addressStr, Function<InetSocketAddress, InetAddress> getInetAddress) throws ConfigException {
|
|
|
+ this.id = sid;
|
|
|
+ initializeWithAddressString(addressStr, getInetAddress);
|
|
|
+ }
|
|
|
+
|
|
|
+ public QuorumServer(long id, InetSocketAddress addr, InetSocketAddress electionAddr, LearnerType type) {
|
|
|
+ this(id, addr, electionAddr, null, type);
|
|
|
+ }
|
|
|
+
|
|
|
+ public QuorumServer(long id, InetSocketAddress addr, InetSocketAddress electionAddr, InetSocketAddress clientAddr, LearnerType type) {
|
|
|
+ this.id = id;
|
|
|
+ if (addr != null) {
|
|
|
+ this.addr.addAddress(addr);
|
|
|
+ }
|
|
|
+ if (electionAddr != null) {
|
|
|
+ this.electionAddr.addAddress(electionAddr);
|
|
|
+ }
|
|
|
+ this.type = type;
|
|
|
+ this.clientAddr = clientAddr;
|
|
|
+
|
|
|
+ setMyAddrs();
|
|
|
+ }
|
|
|
+
|
|
|
private static final String wrongFormat =
|
|
|
" does not have the form server_config or server_config;client_config"
|
|
|
+ " where server_config is the pipe separated list of host:port:port or host:port:port:type"
|
|
|
+ " and client_config is port or host:port";
|
|
|
|
|
|
- public QuorumServer(long sid, String addressStr) throws ConfigException {
|
|
|
- this.id = sid;
|
|
|
+ private void initializeWithAddressString(String addressStr, Function<InetSocketAddress, InetAddress> getInetAddress) throws ConfigException {
|
|
|
LearnerType newType = null;
|
|
|
String[] serverClientParts = addressStr.split(";");
|
|
|
String[] serverAddresses = serverClientParts[0].split("\\|");
|
|
@@ -283,9 +313,9 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
|
|
|
}
|
|
|
|
|
|
// is client_config a host:port or just a port
|
|
|
- hostname = (clientParts.length == 2) ? clientParts[0] : "0.0.0.0";
|
|
|
+ String clientHostName = (clientParts.length == 2) ? clientParts[0] : "0.0.0.0";
|
|
|
try {
|
|
|
- clientAddr = new InetSocketAddress(hostname, Integer.parseInt(clientParts[clientParts.length - 1]));
|
|
|
+ clientAddr = new InetSocketAddress(clientHostName, Integer.parseInt(clientParts[clientParts.length - 1]));
|
|
|
} catch (NumberFormatException e) {
|
|
|
throw new ConfigException("Address unresolved: " + hostname + ":" + clientParts[clientParts.length - 1]);
|
|
|
}
|
|
@@ -294,9 +324,14 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
|
|
|
boolean multiAddressEnabled = Boolean.parseBoolean(
|
|
|
System.getProperty(QuorumPeer.CONFIG_KEY_MULTI_ADDRESS_ENABLED, QuorumPeer.CONFIG_DEFAULT_MULTI_ADDRESS_ENABLED));
|
|
|
if (!multiAddressEnabled && serverAddresses.length > 1) {
|
|
|
- throw new ConfigException("Multiple address feature is disabled, but multiple addresses were specified for sid " + sid);
|
|
|
+ throw new ConfigException("Multiple address feature is disabled, but multiple addresses were specified for sid " + this.id);
|
|
|
}
|
|
|
|
|
|
+ boolean canonicalize = Boolean.parseBoolean(
|
|
|
+ System.getProperty(
|
|
|
+ CONFIG_KEY_KERBEROS_CANONICALIZE_HOST_NAMES,
|
|
|
+ CONFIG_DEFAULT_KERBEROS_CANONICALIZE_HOST_NAMES));
|
|
|
+
|
|
|
for (String serverAddress : serverAddresses) {
|
|
|
String serverParts[] = ConfigUtils.getHostAndPort(serverAddress);
|
|
|
if ((serverClientParts.length > 2) || (serverParts.length < 3)
|
|
@@ -304,25 +339,46 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
|
|
|
throw new ConfigException(addressStr + wrongFormat);
|
|
|
}
|
|
|
|
|
|
+ String serverHostName = serverParts[0];
|
|
|
+
|
|
|
// server_config should be either host:port:port or host:port:port:type
|
|
|
InetSocketAddress tempAddress;
|
|
|
InetSocketAddress tempElectionAddress;
|
|
|
try {
|
|
|
- tempAddress = new InetSocketAddress(serverParts[0], Integer.parseInt(serverParts[1]));
|
|
|
+ tempAddress = new InetSocketAddress(serverHostName, Integer.parseInt(serverParts[1]));
|
|
|
addr.addAddress(tempAddress);
|
|
|
} catch (NumberFormatException e) {
|
|
|
- throw new ConfigException("Address unresolved: " + serverParts[0] + ":" + serverParts[1]);
|
|
|
+ throw new ConfigException("Address unresolved: " + serverHostName + ":" + serverParts[1]);
|
|
|
}
|
|
|
try {
|
|
|
- tempElectionAddress = new InetSocketAddress(serverParts[0], Integer.parseInt(serverParts[2]));
|
|
|
+ tempElectionAddress = new InetSocketAddress(serverHostName, Integer.parseInt(serverParts[2]));
|
|
|
electionAddr.addAddress(tempElectionAddress);
|
|
|
} catch (NumberFormatException e) {
|
|
|
- throw new ConfigException("Address unresolved: " + serverParts[0] + ":" + serverParts[2]);
|
|
|
+ throw new ConfigException("Address unresolved: " + serverHostName + ":" + serverParts[2]);
|
|
|
}
|
|
|
|
|
|
if (tempAddress.getPort() == tempElectionAddress.getPort()) {
|
|
|
throw new ConfigException("Client and election port must be different! Please update the "
|
|
|
- + "configuration file on server." + sid);
|
|
|
+ + "configuration file on server." + this.id);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (canonicalize) {
|
|
|
+ InetAddress ia = getInetAddress.apply(tempAddress);
|
|
|
+ if (ia == null) {
|
|
|
+ throw new ConfigException("Unable to canonicalize address " + serverHostName + " because it's not resolvable");
|
|
|
+ }
|
|
|
+
|
|
|
+ String canonicalHostName = ia.getCanonicalHostName();
|
|
|
+
|
|
|
+ if (!canonicalHostName.equals(serverHostName)
|
|
|
+ // Avoid using literal IP address when
|
|
|
+ // security check fails
|
|
|
+ && !canonicalHostName.equals(ia.getHostAddress())) {
|
|
|
+ LOG.info("Host name for quorum server {} "
|
|
|
+ + "canonicalized from {} to {}",
|
|
|
+ this.id, serverHostName, canonicalHostName);
|
|
|
+ serverHostName = canonicalHostName;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
if (serverParts.length == 4) {
|
|
@@ -336,7 +392,7 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- this.hostname = serverParts[0];
|
|
|
+ this.hostname = serverHostName;
|
|
|
}
|
|
|
|
|
|
if (newType != null) {
|
|
@@ -346,22 +402,8 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
|
|
|
setMyAddrs();
|
|
|
}
|
|
|
|
|
|
- public QuorumServer(long id, InetSocketAddress addr, InetSocketAddress electionAddr, LearnerType type) {
|
|
|
- this(id, addr, electionAddr, null, type);
|
|
|
- }
|
|
|
-
|
|
|
- public QuorumServer(long id, InetSocketAddress addr, InetSocketAddress electionAddr, InetSocketAddress clientAddr, LearnerType type) {
|
|
|
- this.id = id;
|
|
|
- if (addr != null) {
|
|
|
- this.addr.addAddress(addr);
|
|
|
- }
|
|
|
- if (electionAddr != null) {
|
|
|
- this.electionAddr.addAddress(electionAddr);
|
|
|
- }
|
|
|
- this.type = type;
|
|
|
- this.clientAddr = clientAddr;
|
|
|
-
|
|
|
- setMyAddrs();
|
|
|
+ private static InetAddress getInetAddress(InetSocketAddress addr) {
|
|
|
+ return addr.getAddress();
|
|
|
}
|
|
|
|
|
|
private void setMyAddrs() {
|