|
@@ -37,6 +37,8 @@ import java.nio.ByteBuffer;
|
|
|
import java.nio.channels.UnresolvedAddressException;
|
|
|
import java.time.Duration;
|
|
|
import java.util.ArrayList;
|
|
|
+import java.util.Arrays;
|
|
|
+import java.util.Collection;
|
|
|
import java.util.Collections;
|
|
|
import java.util.Enumeration;
|
|
|
import java.util.HashSet;
|
|
@@ -113,9 +115,12 @@ public class QuorumCnxManager {
|
|
|
private AtomicLong observerCounter = new AtomicLong(-1);
|
|
|
|
|
|
/*
|
|
|
- * Protocol identifier used among peers
|
|
|
+ * Protocol identifier used among peers (must be a negative number for backward compatibility reasons)
|
|
|
*/
|
|
|
- public static final long PROTOCOL_VERSION = -65535L;
|
|
|
+ // the following protocol version was sent in every connection initiation message since ZOOKEEPER-107 released in 3.5.0
|
|
|
+ public static final long PROTOCOL_VERSION_V1 = -65536L;
|
|
|
+ // ZOOKEEPER-3188 introduced multiple addresses in the connection initiation message, released in 3.6.0
|
|
|
+ public static final long PROTOCOL_VERSION_V2 = -65535L;
|
|
|
|
|
|
/*
|
|
|
* Max buffer size to be read from the network.
|
|
@@ -218,7 +223,7 @@ public class QuorumCnxManager {
|
|
|
public static InitialMessage parse(Long protocolVersion, DataInputStream din) throws InitialMessageException, IOException {
|
|
|
Long sid;
|
|
|
|
|
|
- if (protocolVersion != PROTOCOL_VERSION) {
|
|
|
+ if (protocolVersion != PROTOCOL_VERSION_V1 && protocolVersion != PROTOCOL_VERSION_V2) {
|
|
|
throw new InitialMessageException("Got unrecognized protocol version %s", protocolVersion);
|
|
|
}
|
|
|
|
|
@@ -236,6 +241,8 @@ public class QuorumCnxManager {
|
|
|
throw new InitialMessageException("Read only %s bytes out of %s sent by server %s", num_read, remaining, sid);
|
|
|
}
|
|
|
|
|
|
+ // in PROTOCOL_VERSION_V1 we expect to get a single address here represented as a 'host:port' string
|
|
|
+ // in PROTOCOL_VERSION_V2 we expect to get multiple addresses like: 'host1:port1|host2:port2|...'
|
|
|
String[] addressStrings = new String(b).split("\\|");
|
|
|
List<InetSocketAddress> addresses = new ArrayList<>(addressStrings.length);
|
|
|
for (String addr : addressStrings) {
|
|
@@ -416,10 +423,20 @@ public class QuorumCnxManager {
|
|
|
|
|
|
// Sending id and challenge
|
|
|
|
|
|
- // represents protocol version (in other words - message type)
|
|
|
- dout.writeLong(PROTOCOL_VERSION);
|
|
|
+ // First sending the protocol version (in other words - message type).
|
|
|
+ // For backward compatibility reasons we stick to the old protocol version, unless the MultiAddress
|
|
|
+ // feature is enabled. During rolling upgrade, we must make sure that all the servers can
|
|
|
+ // understand the protocol version we use to avoid multiple partitions. see ZOOKEEPER-3720
|
|
|
+ long protocolVersion = self.isMultiAddressEnabled() ? PROTOCOL_VERSION_V2 : PROTOCOL_VERSION_V1;
|
|
|
+ dout.writeLong(protocolVersion);
|
|
|
dout.writeLong(self.getId());
|
|
|
- String addr = self.getElectionAddress().getAllAddresses().stream()
|
|
|
+
|
|
|
+ // now we send our election address. For the new protocol version, we can send multiple addresses.
|
|
|
+ Collection<InetSocketAddress> addressesToSend = protocolVersion == PROTOCOL_VERSION_V2
|
|
|
+ ? self.getElectionAddress().getAllAddresses()
|
|
|
+ : Arrays.asList(self.getElectionAddress().getOne());
|
|
|
+
|
|
|
+ String addr = addressesToSend.stream()
|
|
|
.map(NetUtils::formatInetAddr).collect(Collectors.joining("|"));
|
|
|
byte[] addr_bytes = addr.getBytes();
|
|
|
dout.writeInt(addr_bytes.length);
|
|
@@ -639,7 +656,7 @@ public class QuorumCnxManager {
|
|
|
synchronized boolean connectOne(long sid, MultipleAddresses electionAddr) {
|
|
|
if (senderWorkerMap.get(sid) != null) {
|
|
|
LOG.debug("There is a connection already for server {}", sid);
|
|
|
- if (electionAddr.size() > 1 && self.isMultiAddressReachabilityCheckEnabled()) {
|
|
|
+ if (self.isMultiAddressEnabled() && electionAddr.size() > 1 && self.isMultiAddressReachabilityCheckEnabled()) {
|
|
|
// since ZOOKEEPER-3188 we can use multiple election addresses to reach a server. It is possible, that the
|
|
|
// one we are using is already dead and we need to clean-up, so when we will create a new connection
|
|
|
// then we will choose an other one, which is actually reachable
|
|
@@ -710,7 +727,7 @@ public class QuorumCnxManager {
|
|
|
synchronized void connectOne(long sid) {
|
|
|
if (senderWorkerMap.get(sid) != null) {
|
|
|
LOG.debug("There is a connection already for server {}", sid);
|
|
|
- if (self.isMultiAddressReachabilityCheckEnabled()) {
|
|
|
+ if (self.isMultiAddressEnabled() && self.isMultiAddressReachabilityCheckEnabled()) {
|
|
|
// since ZOOKEEPER-3188 we can use multiple election addresses to reach a server. It is possible, that the
|
|
|
// one we are using is already dead and we need to clean-up, so when we will create a new connection
|
|
|
// then we will choose an other one, which is actually reachable
|