Parcourir la source

ZOOKEEPER-2778: QuorumPeer: address potential reconfiguration deadlocks

* QuorumPeer: encapsulate quorum/election/client addresses in an AddressTuple held through an AtomicReference
* QuorumPeer/QuorumCnxManager: address deadlock and visibility issues
* QuorumPeer: add fast path for already-non-null quorum/election address
* QuorumPeer: fix access to newly private data members from ReconfigTest
* LeaderBeanTest: set up mock QuorumVerifier so that addresses get set
* QuorumPeer: warn when clobbering existing election algorithm
* QuorumPeer: halt old QCM when clobbering existing election algorithm

Author: Michael Edwards <medwards@bitpusher.com>

Reviewers: andor@apache.org

Closes #719 from mkedwards/ZOOKEEPER-2778-for-master
Michael Edwards il y a 6 ans
Parent
commit
6ea3c0b689

+ 108 - 71
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java

@@ -41,6 +41,7 @@ import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.security.sasl.SaslException;
 
@@ -114,7 +115,11 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
     LocalPeerBean jmxLocalPeerBean;
     private Map<Long, RemotePeerBean> jmxRemotePeerBean;
     LeaderElectionBean jmxLeaderElectionBean;
-    private QuorumCnxManager qcm;
+
+    // The QuorumCnxManager is held through an AtomicReference to ensure cross-thread visibility
+    // of updates; see the implementation comment at setLastSeenQuorumVerifier().
+    private AtomicReference<QuorumCnxManager> qcmRef = new AtomicReference<>();
+
     QuorumAuthServer authServer;
     QuorumAuthLearner authLearner;
 
@@ -127,6 +132,18 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
      */
     private ZKDatabase zkDb;
 
+    public static final class AddressTuple {
+        public final InetSocketAddress quorumAddr;
+        public final InetSocketAddress electionAddr;
+        public final InetSocketAddress clientAddr;
+
+        public AddressTuple(InetSocketAddress quorumAddr, InetSocketAddress electionAddr, InetSocketAddress clientAddr) {
+            this.quorumAddr = quorumAddr;
+            this.electionAddr = electionAddr;
+            this.clientAddr = clientAddr;
+        }
+    }
+
     public static class QuorumServer {
         public InetSocketAddress addr = null;
 
@@ -442,10 +459,10 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
      */
 
     //last committed quorum verifier
-    public QuorumVerifier quorumVerifier;
+    private QuorumVerifier quorumVerifier;
     
     //last proposed quorum verifier
-    public QuorumVerifier lastSeenQuorumVerifier = null;
+    private QuorumVerifier lastSeenQuorumVerifier = null;
 
     // Lock object that guard access to quorumVerifier and lastSeenQuorumVerifier.
     final Object QV_LOCK = new Object();
@@ -716,16 +733,14 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
 
     DatagramSocket udpSocket;
 
-    private InetSocketAddress myQuorumAddr;
-    private InetSocketAddress myElectionAddr = null;
-    private InetSocketAddress myClientAddr = null;
+    private final AtomicReference<AddressTuple> myAddrs = new AtomicReference<>();
 
     /**
      * Resolves hostname for a given server ID.
      *
      * This method resolves hostname for a given server ID in both quorumVerifer
      * and lastSeenQuorumVerifier. If the server ID matches the local server ID,
-     * it also updates myQuorumAddr and myElectionAddr.
+     * it also updates myAddrs.
      */
     public void recreateSocketAddresses(long id) {
         QuorumVerifier qv = getQuorumVerifier();
@@ -734,8 +749,7 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
             if (qs != null) {
                 qs.recreateSocketAddresses();
                 if (id == getId()) {
-                    setQuorumAddress(qs.addr);
-                    setElectionAddress(qs.electionAddr);
+                    setAddrs(qs.addr, qs.electionAddr, qs.clientAddr);
                 }
             }
         }
@@ -748,42 +762,46 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
         }
     }
 
-    public InetSocketAddress getQuorumAddress(){
-        synchronized (QV_LOCK) {
-            return myQuorumAddr;
+    private AddressTuple getAddrs(){
+        AddressTuple addrs = myAddrs.get();
+        if (addrs != null) {
+            return addrs;
         }
-    }
-    
-    public void setQuorumAddress(InetSocketAddress addr){
-        synchronized (QV_LOCK) {
-            myQuorumAddr = addr;
+        try {
+            synchronized (QV_LOCK) {
+                addrs = myAddrs.get();
+                while (addrs == null) {
+                    QV_LOCK.wait();
+                    addrs = myAddrs.get();
+                }
+                return addrs;
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
         }
     }
 
-    public InetSocketAddress getElectionAddress(){
-        synchronized (QV_LOCK) {
-            return myElectionAddr;
-        }
+    public InetSocketAddress getQuorumAddress(){
+        return getAddrs().quorumAddr;
     }
 
-    public void setElectionAddress(InetSocketAddress addr){
-        synchronized (QV_LOCK) {
-            myElectionAddr = addr;
-        }
+    public InetSocketAddress getElectionAddress(){
+        return getAddrs().electionAddr;
     }
-    
+
     public InetSocketAddress getClientAddress(){
-        synchronized (QV_LOCK) {
-            return myClientAddr;
-        }
+        final AddressTuple addrs = myAddrs.get();
+        return (addrs == null) ? null : addrs.clientAddr;
     }
-    
-    public void setClientAddress(InetSocketAddress addr){
+
+    private void setAddrs(InetSocketAddress quorumAddr, InetSocketAddress electionAddr, InetSocketAddress clientAddr){
         synchronized (QV_LOCK) {
-            myClientAddr = addr;
+            myAddrs.set(new AddressTuple(quorumAddr, electionAddr, clientAddr));
+            QV_LOCK.notifyAll();
         }
     }
-    
+
     private int electionType;
 
     Election electionAlg;
@@ -1050,7 +1068,12 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
             le = new AuthFastLeaderElection(this, true);
             break;
         case 3:
-            qcm = createCnxnManager();
+            QuorumCnxManager qcm = createCnxnManager();
+            QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
+            if (oldQcm != null) {
+                LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)");
+                oldQcm.halt();
+            }
             QuorumCnxManager.Listener listener = qcm.listener;
             if(listener != null){
                 listener.start();
@@ -1515,18 +1538,6 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
         }
     }
     
-    private void connectNewPeers(){
-        synchronized (QV_LOCK) {
-            if (qcm != null && quorumVerifier != null && lastSeenQuorumVerifier != null) {
-                Map<Long, QuorumServer> committedView = quorumVerifier.getAllMembers();
-                for (Entry<Long, QuorumServer> e : lastSeenQuorumVerifier.getAllMembers().entrySet()) {
-                    if (e.getKey() != getId() && !committedView.containsKey(e.getKey()))
-                        qcm.connectOne(e.getKey());
-                }
-            }
-        }
-    }
-
     public synchronized void restartLeaderElection(QuorumVerifier qvOLD, QuorumVerifier qvNEW){
         if (qvOLD == null || !qvOLD.equals(qvNEW)) {
             LOG.warn("Restarting Leader Election");
@@ -1544,33 +1555,61 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
         return configFilename + QuorumPeerConfig.nextDynamicConfigFileSuffix;
     }
     
+    // On entry to this method, qcm must be non-null and the locks on both qcm and QV_LOCK
+    // must be held.  We don't want quorumVerifier/lastSeenQuorumVerifier to change out from
+    // under us, so we have to hold QV_LOCK; and since the call to qcm.connectOne() will take
+    // the lock on qcm (and take QV_LOCK again inside that), the caller needs to have taken
+    // qcm outside QV_LOCK to avoid a deadlock against other callers of qcm.connectOne().
+    private void connectNewPeers(QuorumCnxManager qcm){
+        if (quorumVerifier != null && lastSeenQuorumVerifier != null) {
+            Map<Long, QuorumServer> committedView = quorumVerifier.getAllMembers();
+            for (Entry<Long, QuorumServer> e : lastSeenQuorumVerifier.getAllMembers().entrySet()) {
+                if (e.getKey() != getId() && !committedView.containsKey(e.getKey()))
+                    qcm.connectOne(e.getKey());
+            }
+        }
+    }
+
     public void setLastSeenQuorumVerifier(QuorumVerifier qv, boolean writeToDisk){
-        synchronized (QV_LOCK) {
-            if (lastSeenQuorumVerifier != null && lastSeenQuorumVerifier.getVersion() > qv.getVersion()) {
-                LOG.error("setLastSeenQuorumVerifier called with stale config " + qv.getVersion() +
-                        ". Current version: " + quorumVerifier.getVersion());
+        // If qcm is non-null, we may call qcm.connectOne(), which will take the lock on qcm
+        // and then take QV_LOCK.  Take the locks in the same order to ensure that we don't
+        // deadlock against other callers of connectOne().  If qcmRef gets set in another
+        // thread while we're inside the synchronized block, that does no harm; if we didn't
+        // take a lock on qcm (because it was null when we sampled it), we won't call
+        // connectOne() on it.  (Use of an AtomicReference is enough to guarantee visibility
+        // of updates that provably happen in another thread before entering this method.)
+        QuorumCnxManager qcm = qcmRef.get();
+        Object outerLockObject = (qcm != null) ? qcm : QV_LOCK;
+        synchronized (outerLockObject) {
+            synchronized (QV_LOCK) {
+                if (lastSeenQuorumVerifier != null && lastSeenQuorumVerifier.getVersion() > qv.getVersion()) {
+                    LOG.error("setLastSeenQuorumVerifier called with stale config " + qv.getVersion() +
+                            ". Current version: " + quorumVerifier.getVersion());
+                }
+                // assuming that a version uniquely identifies a configuration, so if
+                // version is the same, nothing to do here.
+                if (lastSeenQuorumVerifier != null &&
+                        lastSeenQuorumVerifier.getVersion() == qv.getVersion()) {
+                    return;
+                }
+                lastSeenQuorumVerifier = qv;
+                if (qcm != null) {
+                    connectNewPeers(qcm);
+                }
 
-            }
-            // assuming that a version uniquely identifies a configuration, so if
-            // version is the same, nothing to do here.
-            if (lastSeenQuorumVerifier != null &&
-                    lastSeenQuorumVerifier.getVersion() == qv.getVersion()) {
-                return;
-            }
-            lastSeenQuorumVerifier = qv;
-            connectNewPeers();
-            if (writeToDisk) {
-                try {
-                    String fileName = getNextDynamicConfigFilename();
-                    if (fileName != null) {
-                        QuorumPeerConfig.writeDynamicConfig(fileName, qv, true);
+                if (writeToDisk) {
+                    try {
+                        String fileName = getNextDynamicConfigFilename();
+                        if (fileName != null) {
+                            QuorumPeerConfig.writeDynamicConfig(fileName, qv, true);
+                        }
+                    } catch (IOException e) {
+                        LOG.error("Error writing next dynamic config file to disk: ", e.getMessage());
                     }
-                } catch (IOException e) {
-                    LOG.error("Error writing next dynamic config file to disk: ", e.getMessage());
                 }
             }
         }
-     }       
+    }
     
     public QuorumVerifier setQuorumVerifier(QuorumVerifier qv, boolean writeToDisk){
         synchronized (QV_LOCK) {
@@ -1610,9 +1649,7 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
             }
             QuorumServer qs = qv.getAllMembers().get(getId());
             if (qs != null) {
-                setQuorumAddress(qs.addr);
-                setElectionAddress(qs.electionAddr);
-                setClientAddress(qs.clientAddr);
+                setAddrs(qs.addr, qs.electionAddr, qs.clientAddr);
             }
             return prevQV;
         }
@@ -1795,7 +1832,7 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
      * get reference to QuorumCnxManager
      */
     public QuorumCnxManager getQuorumCnxManager() {
-        return qcm;
+        return qcmRef.get();
     }
     private long readLongFromFile(String name) throws IOException {
         File file = new File(logFactory.getSnapDir(), name);

+ 19 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LeaderBeanTest.java

@@ -21,11 +21,13 @@ package org.apache.zookeeper.server.quorum;
 import org.apache.jute.OutputArchive;
 import org.apache.jute.Record;
 import org.apache.zookeeper.common.X509Exception;
+import org.apache.zookeeper.PortAssignment;
 import org.apache.zookeeper.server.Request;
 import org.apache.zookeeper.server.ZKDatabase;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
 import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
 import org.apache.zookeeper.server.util.SerializeUtils;
 import org.apache.zookeeper.test.ClientBase;
 import org.apache.zookeeper.txn.TxnHeader;
@@ -37,6 +39,10 @@ import org.mockito.stubbing.Answer;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
@@ -56,7 +62,20 @@ public class LeaderBeanTest {
     @Before
     public void setUp() throws IOException, X509Exception {
         qp = new QuorumPeer();
+        long myId = qp.getId();
+
+        int clientPort = PortAssignment.unique();
+        Map<Long, QuorumServer> peersView = new HashMap<Long, QuorumServer>();
+        InetAddress clientIP = InetAddress.getLoopbackAddress();
+
+        peersView.put(Long.valueOf(myId),
+                new QuorumServer(myId, new InetSocketAddress(clientIP, PortAssignment.unique()),
+                        new InetSocketAddress(clientIP, PortAssignment.unique()),
+                        new InetSocketAddress(clientIP, clientPort), LearnerType.PARTICIPANT));
+
         QuorumVerifier quorumVerifierMock = mock(QuorumVerifier.class);
+        when(quorumVerifierMock.getAllMembers()).thenReturn(peersView);
+
         qp.setQuorumVerifier(quorumVerifierMock, false);
         File tmpDir = ClientBase.createEmptyTestDir();
         fileTxnSnapLog = new FileTxnSnapLog(new File(tmpDir, "data"),

+ 2 - 2
zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java

@@ -839,7 +839,7 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{
         testNormalOperation(zkArr[4], zkArr[5]);
 
         for (int i = 1; i <= 5; i++) {
-            if (!(qu.getPeer(i).peer.quorumVerifier instanceof QuorumHierarchical))
+            if (!(qu.getPeer(i).peer.getQuorumVerifier() instanceof QuorumHierarchical))
                 Assert.fail("peer " + i
                         + " doesn't think the quorum system is Hieararchical!");
         }
@@ -876,7 +876,7 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{
         testNormalOperation(zkArr[1], zkArr[2]);
 
         for (int i = 1; i <= 2; i++) {
-            if (!(qu.getPeer(i).peer.quorumVerifier instanceof QuorumMaj))
+            if (!(qu.getPeer(i).peer.getQuorumVerifier() instanceof QuorumMaj))
                 Assert.fail("peer "
                         + i
                         + " doesn't think the quorum system is a majority quorum system!");