瀏覽代碼

ZOOKEEPER-4708: recreateSocketAddresses before comparing addresses

Reviewers: ppatierno, anmolnar, see-quick
Author: showuon
Closes #2041 from showuon/ZOOKEEPER-4708_2
Luke Chen 7 月之前
父節點
當前提交
34c6d9fc1d

+ 6 - 0
pom.xml

@@ -703,6 +703,12 @@
         <artifactId>mockito-core</artifactId>
         <version>${mockito.version}</version>
       </dependency>
+      <dependency>
+        <groupId>org.mockito</groupId>
+        <artifactId>mockito-inline</artifactId>
+        <version>${mockito.version}</version>
+        <scope>test</scope>
+      </dependency>
       <dependency>
         <groupId>io.netty</groupId>
         <artifactId>netty-bom</artifactId>

+ 5 - 0
zookeeper-server/pom.xml

@@ -147,6 +147,11 @@
       <artifactId>mockito-core</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-inline</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.jmockit</groupId>
       <artifactId>jmockit</artifactId>

+ 9 - 2
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java

@@ -903,6 +903,12 @@ public class Leader extends LearnerMaster {
        }
     }
 
+    MultipleAddresses recreateSocketAddresses(MultipleAddresses addr) {
+        return new MultipleAddresses(addr.getAllAddresses().stream()
+                .map(address -> new InetSocketAddress(address.getHostString(), address.getPort()))
+                .collect(Collectors.toSet()));
+    }
+
     /** In a reconfig operation, this method attempts to find the best leader for next configuration.
      *  If the current leader is a voter in the next configuration, then it remains the leader.
      *  Otherwise, choose one of the new voters that acked the reconfiguration, such that it is as
@@ -913,14 +919,15 @@ public class Leader extends LearnerMaster {
      * @return server if of the designated leader
      */
 
-    private long getDesignatedLeader(Proposal reconfigProposal, long zxid) {
+    long getDesignatedLeader(Proposal reconfigProposal, long zxid) {
         //new configuration
         Proposal.QuorumVerifierAcksetPair newQVAcksetPair = reconfigProposal.qvAcksetPairs.get(reconfigProposal.qvAcksetPairs.size() - 1);
 
         //check if I'm in the new configuration with the same quorum address -
         // if so, I'll remain the leader
         if (newQVAcksetPair.getQuorumVerifier().getVotingMembers().containsKey(self.getMyId())
-            && newQVAcksetPair.getQuorumVerifier().getVotingMembers().get(self.getMyId()).addr.equals(self.getQuorumAddress())) {
+            && recreateSocketAddresses(newQVAcksetPair.getQuorumVerifier().getVotingMembers().get(self.getMyId()).addr)
+                .equals(recreateSocketAddresses(self.getQuorumAddress()))) {
             return self.getMyId();
         }
         // start with an initial set of candidates that are voters from new config that

+ 9 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java

@@ -2346,6 +2346,12 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
         acceptedEpoch = e;
     }
 
+    private void recreateSocketAddressesFromQV(QuorumVerifier qv) {
+        for (QuorumServer qs : qv.getAllMembers().values()) {
+            qs.recreateSocketAddresses();
+        }
+    }
+
     public boolean processReconfig(QuorumVerifier qv, Long suggestedLeaderId, Long zxid, boolean restartLE) {
         if (!isReconfigEnabled()) {
             LOG.debug("Reconfig feature is disabled, skip reconfig processing.");
@@ -2369,6 +2375,9 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
         // for Learner):
         initConfigInZKDatabase();
 
+        recreateSocketAddressesFromQV(prevQV);
+        recreateSocketAddressesFromQV(qv);
+
         if (prevQV.getVersion() < qv.getVersion() && !prevQV.equals(qv)) {
             Map<Long, QuorumServer> newMembers = qv.getAllMembers();
             updateRemotePeerMXBeans(newMembers);

+ 67 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LeaderBeanTest.java

@@ -24,14 +24,20 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import org.apache.jute.OutputArchive;
 import org.apache.jute.Record;
@@ -42,6 +48,7 @@ import org.apache.zookeeper.server.ZKDatabase;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
 import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
 import org.apache.zookeeper.txn.TxnHeader;
 import org.junit.jupiter.api.AfterEach;
@@ -222,4 +229,64 @@ public class LeaderBeanTest {
         assertEquals("5\n", leaderBean.nonVotingFollowerInfo());
     }
 
+    @Test
+    public void testGetDesignatedLeaderShouldRecreateSocketAddresses() {
+        Leader.Proposal p = new Leader.Proposal();
+        Map<Long, QuorumServer> peersView = new HashMap<>();
+        QuorumServer qs = Mockito.mock(QuorumServer.class);
+        MultipleAddresses multipleAddresses = new MultipleAddresses();
+        qs.type = LearnerType.PARTICIPANT;
+        qs.addr = multipleAddresses;
+        peersView.put(0L, qs);
+        QuorumVerifier qv = new QuorumMaj(peersView);
+        HashSet<Long> ackset = new HashSet<>();
+        ackset.add(0L);
+        ArrayList<Leader.Proposal.QuorumVerifierAcksetPair> qvAcksetPairs = new ArrayList<>();
+        qvAcksetPairs.add(new Leader.Proposal.QuorumVerifierAcksetPair(qv, ackset));
+        p.qvAcksetPairs = qvAcksetPairs;
+        Leader spyLeader = spy(leader);
+        doReturn(multipleAddresses, multipleAddresses).when(spyLeader).recreateSocketAddresses(any(MultipleAddresses.class));
+
+        spyLeader.getDesignatedLeader(p, 0L);
+        // Verify `recreateSocketAddresses` method should be invoked twice for the address in proposal and self one
+        verify(spyLeader, times(2)).recreateSocketAddresses(any(MultipleAddresses.class));
+    }
+
+    @Test
+    public void testRecreateSocketAddresses() {
+        InetAddress loopback = InetAddress.getLoopbackAddress();
+        String oldIP = loopback.getHostAddress();
+        String newIP = "1.1.1.1";
+
+        // test case 1: empty MultipleAddresses instance will still be empty after recreateSocketAddresses
+        MultipleAddresses multipleAddresses = new MultipleAddresses();
+        assertEquals(multipleAddresses, leader.recreateSocketAddresses(multipleAddresses));
+
+        // test case 2: The content of MultipleAddresses instance will still be the same after recreateSocketAddresses if address no change
+        InetSocketAddress addr1 = new InetSocketAddress(loopback, PortAssignment.unique());
+        InetSocketAddress addr2 = new InetSocketAddress(loopback, PortAssignment.unique());
+        multipleAddresses = new MultipleAddresses(Arrays.asList(addr1, addr2));
+        // Verify after recreateSocketAddresses, the multipleAddresses should be the same (i.e. under no DNS's interaction)
+        assertEquals(multipleAddresses, leader.recreateSocketAddresses(multipleAddresses));
+
+        // test case 3: Simulating the DNS returning different IP address for the same hostname during recreation.
+        // After recreateSocketAddresses, the MultipleAddresses should contain the updated IP address instance while other fields unchanged.
+        InetAddress spyInetAddr = Mockito.spy(loopback);
+        InetSocketAddress addr3 = new InetSocketAddress(spyInetAddr, PortAssignment.unique());
+        // Verify the address is the old IP before recreateSocketAddresses.
+        assertEquals(oldIP, addr3.getAddress().getHostAddress());
+        multipleAddresses = new MultipleAddresses(Arrays.asList(addr3));
+        // simulating the DNS returning different IP address
+        when(spyInetAddr.getHostAddress()).thenReturn(newIP);
+
+        // Verify after recreateSocketAddresses, the multipleAddresses should have different IP address result
+        MultipleAddresses newMultipleAddress = leader.recreateSocketAddresses(multipleAddresses);
+        assertNotEquals(multipleAddresses, newMultipleAddress);
+        assertEquals(1, multipleAddresses.getAllAddresses().size());
+        InetSocketAddress newAddr = multipleAddresses.getAllAddresses().iterator().next();
+        // Verify the hostName should still be the same
+        assertEquals(loopback.getHostName(), newAddr.getAddress().getHostName());
+        // Verify the IP address has changed.
+        assertEquals(newIP, newAddr.getAddress().getHostAddress());
+    }
 }

+ 26 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerTest.java

@@ -25,13 +25,17 @@ import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.nio.file.Files;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.zookeeper.PortAssignment;
 import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
+import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mockito;
 
 public class QuorumPeerTest {
 
@@ -76,6 +80,28 @@ public class QuorumPeerTest {
         peer2.shutdown();
     }
 
+    @Test
+    public void testProcessReconfigWillRecreateSocketAddresses() throws IOException {
+        QuorumPeerConfig.setReconfigEnabled(true);
+        QuorumPeer peer = new QuorumPeer();
+
+        File file = Files.createTempFile("", ".tmp").toFile();
+        file.deleteOnExit();
+
+        peer.setConfigFileName(file.getAbsoluteFile().toString());
+
+        Map<Long, QuorumServer> peersView = new HashMap<>();
+        QuorumServer qs = Mockito.mock(QuorumServer.class);
+        peersView.put(0L, qs);
+
+        QuorumVerifier qv = new QuorumMaj(peersView);
+        peer.setQuorumVerifier(qv, false);
+        peer.processReconfig(qv, 0L, 0L, false);
+
+        // verify the qs will recreateSocketAddresses twice for both the old qv and the new qv
+        Mockito.verify(qs, Mockito.times(2)).recreateSocketAddresses();
+    }
+
     @Test
     public void testLocalPeerIsLeader() throws Exception {
         long localPeerId = 7;