|
@@ -466,7 +466,7 @@ public class QuorumCnxManager {
|
|
|
private boolean startConnection(Socket sock, Long sid) throws IOException {
|
|
|
DataOutputStream dout = null;
|
|
|
DataInputStream din = null;
|
|
|
- LOG.debug("startConnection (myId:{} --> sid:{})", self.getId(), sid);
|
|
|
+ LOG.debug("startConnection (myId:{} --> sid:{})", self.getMyId(), sid);
|
|
|
try {
|
|
|
// Use BufferedOutputStream to reduce the number of IP packets. This is
|
|
|
// important for x-DC scenarios.
|
|
@@ -481,7 +481,7 @@ public class QuorumCnxManager {
|
|
|
// understand the protocol version we use to avoid multiple partitions. see ZOOKEEPER-3720
|
|
|
long protocolVersion = self.isMultiAddressEnabled() ? PROTOCOL_VERSION_V2 : PROTOCOL_VERSION_V1;
|
|
|
dout.writeLong(protocolVersion);
|
|
|
- dout.writeLong(self.getId());
|
|
|
+ dout.writeLong(self.getMyId());
|
|
|
|
|
|
// now we send our election address. For the new protocol version, we can send multiple addresses.
|
|
|
Collection<InetSocketAddress> addressesToSend = protocolVersion == PROTOCOL_VERSION_V2
|
|
@@ -510,12 +510,12 @@ public class QuorumCnxManager {
|
|
|
}
|
|
|
|
|
|
// If lost the challenge, then drop the new connection
|
|
|
- if (sid > self.getId()) {
|
|
|
- LOG.info("Have smaller server identifier, so dropping the connection: (myId:{} --> sid:{})", self.getId(), sid);
|
|
|
+ if (sid > self.getMyId()) {
|
|
|
+ LOG.info("Have smaller server identifier, so dropping the connection: (myId:{} --> sid:{})", self.getMyId(), sid);
|
|
|
closeSocket(sock);
|
|
|
// Otherwise proceed with the connection
|
|
|
} else {
|
|
|
- LOG.debug("Have larger server identifier, so keeping the connection: (myId:{} --> sid:{})", self.getId(), sid);
|
|
|
+ LOG.debug("Have larger server identifier, so keeping the connection: (myId:{} --> sid:{})", self.getMyId(), sid);
|
|
|
SendWorker sw = new SendWorker(sock, sid);
|
|
|
RecvWorker rw = new RecvWorker(sock, din, sid, sw);
|
|
|
sw.setRecv(rw);
|
|
@@ -610,7 +610,7 @@ public class QuorumCnxManager {
|
|
|
electionAddr = new MultipleAddresses(init.electionAddr,
|
|
|
Duration.ofMillis(self.getMultiAddressReachabilityCheckTimeoutMs()));
|
|
|
}
|
|
|
- LOG.debug("Initial message parsed by {}: {}", self.getId(), init.toString());
|
|
|
+ LOG.debug("Initial message parsed by {}: {}", self.getMyId(), init.toString());
|
|
|
} catch (InitialMessage.InitialMessageException ex) {
|
|
|
LOG.error("Initial message parsing error!", ex);
|
|
|
closeSocket(sock);
|
|
@@ -635,7 +635,7 @@ public class QuorumCnxManager {
|
|
|
// do authenticating learner
|
|
|
authServer.authenticate(sock, din);
|
|
|
//If wins the challenge, then close the new connection.
|
|
|
- if (sid < self.getId()) {
|
|
|
+ if (sid < self.getMyId()) {
|
|
|
/*
|
|
|
* This replica might still believe that the connection to sid is
|
|
|
* up, so we have to shut down the workers before trying to open a
|
|
@@ -658,7 +658,7 @@ public class QuorumCnxManager {
|
|
|
connectOne(sid);
|
|
|
}
|
|
|
|
|
|
- } else if (sid == self.getId()) {
|
|
|
+ } else if (sid == self.getMyId()) {
|
|
|
// we saw this case in ZOOKEEPER-2164
|
|
|
LOG.warn("We got a connection request from a server with our own ID. "
|
|
|
+ "This should be either a configuration error, or a bug.");
|
|
@@ -760,7 +760,7 @@ public class QuorumCnxManager {
|
|
|
Map<Long, QuorumPeer.QuorumServer> lastProposedView = lastSeenQV.getAllMembers();
|
|
|
if (lastCommittedView.containsKey(sid)) {
|
|
|
knownId = true;
|
|
|
- LOG.debug("Server {} knows {} already, it is in the lastCommittedView", self.getId(), sid);
|
|
|
+ LOG.debug("Server {} knows {} already, it is in the lastCommittedView", self.getMyId(), sid);
|
|
|
if (connectOne(sid, lastCommittedView.get(sid).electionAddr)) {
|
|
|
return;
|
|
|
}
|
|
@@ -770,7 +770,7 @@ public class QuorumCnxManager {
|
|
|
&& (!knownId
|
|
|
|| !lastProposedView.get(sid).electionAddr.equals(lastCommittedView.get(sid).electionAddr))) {
|
|
|
knownId = true;
|
|
|
- LOG.debug("Server {} knows {} already, it is in the lastProposedView", self.getId(), sid);
|
|
|
+ LOG.debug("Server {} knows {} already, it is in the lastProposedView", self.getMyId(), sid);
|
|
|
|
|
|
if (connectOne(sid, lastProposedView.get(sid).electionAddr)) {
|
|
|
return;
|
|
@@ -839,7 +839,7 @@ public class QuorumCnxManager {
|
|
|
*/
|
|
|
public void softHalt() {
|
|
|
for (SendWorker sw : senderWorkerMap.values()) {
|
|
|
- LOG.debug("Server {} is soft-halting sender towards: {}", self.getId(), sw);
|
|
|
+ LOG.debug("Server {} is soft-halting sender towards: {}", self.getMyId(), sw);
|
|
|
sw.finish();
|
|
|
}
|
|
|
}
|
|
@@ -943,7 +943,7 @@ public class QuorumCnxManager {
|
|
|
@Override
|
|
|
public void run() {
|
|
|
if (!shutdown) {
|
|
|
- LOG.debug("Listener thread started, myId: {}", self.getId());
|
|
|
+ LOG.debug("Listener thread started, myId: {}", self.getMyId());
|
|
|
Set<InetSocketAddress> addresses;
|
|
|
|
|
|
if (self.getQuorumListenOnAllIPs()) {
|
|
@@ -1297,7 +1297,7 @@ public class QuorumCnxManager {
|
|
|
}
|
|
|
this.finish();
|
|
|
|
|
|
- LOG.warn("Send worker leaving thread id {} my id = {}", sid, self.getId());
|
|
|
+ LOG.warn("Send worker leaving thread id {} my id = {}", sid, self.getMyId());
|
|
|
}
|
|
|
|
|
|
|