浏览代码

ZOOKEEPER-3296: Explicitly closing the sslsocket when it failed handshake to prevent issue where peers cannot join quorum

The quorum connection manager is handling connections sequentially with a default listen backlog queue size 50, during the network loss, there are socket read timed out, which is syncLimit * tickTime, and almost all the following connect requests in the backlog queue will timed out from the other side before it's being processed.

Those timed out learners will try to connect to a different server, and leaves the connect requests on server side without sending the close_notify packet. The server is slowly consuming from these queue with syncLimit * tickTime timeout for each of those requests which haven't sent notify_close packet. Any new connect requests will be queued up again when there is spot in the listen backlog queue, but timed out before the server handles it, and it can never successfully finish any new connection, and it failed to join the quorum.

Please check the Jira for more details.

Author: Fangmin Lyu <fangmin@apache.org>

Reviewers: Enrico Olivelli <eolivelli@apache.org>, Andor Molnár <andor@apache.org>

Closes #843 from lvfangmin/ZOOKEEPER-3296
Fangmin Lyu 6 年之前
父节点
当前提交
5874a0f355

+ 12 - 10
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java

@@ -635,10 +635,12 @@ public class QuorumCnxManager {
     /**
      * Try to establish a connection to server with id sid using its electionAddr.
      *
+     * VisibleForTesting.
+     *
      *  @param sid  server id
      *  @return boolean success indication
      */
-    synchronized private boolean connectOne(long sid, InetSocketAddress electionAddr){
+    synchronized boolean connectOne(long sid, InetSocketAddress electionAddr){
         if (senderWorkerMap.get(sid) != null) {
             LOG.debug("There is a connection already for server " + sid);
             return true;
@@ -648,18 +650,18 @@ public class QuorumCnxManager {
         try {
             LOG.debug("Opening channel to server " + sid);
             if (self.isSslQuorum()) {
-                 SSLSocket sslSock = self.getX509Util().createSSLSocket();
-                 setSockOpts(sslSock);
-                 sslSock.connect(electionAddr, cnxTO);
-                 sslSock.startHandshake();
-                 sock = sslSock;
-                 LOG.info("SSL handshake complete with {} - {} - {}", sslSock.getRemoteSocketAddress(), sslSock.getSession().getProtocol(), sslSock.getSession().getCipherSuite());
+                 sock = self.getX509Util().createSSLSocket();
              } else {
                  sock = new Socket();
-                 setSockOpts(sock);
-                 sock.connect(electionAddr, cnxTO);
-
              }
+            setSockOpts(sock);
+            sock.connect(electionAddr, cnxTO);
+            if (sock instanceof SSLSocket) {
+                SSLSocket sslSock = (SSLSocket) sock;
+                sslSock.startHandshake();
+                LOG.info("SSL handshake complete with {} - {} - {}", sslSock.getRemoteSocketAddress(), sslSock.getSession().getProtocol(), sslSock.getSession().getCipherSuite());
+            }
+
              LOG.debug("Connected to server " + sid);
             // Sends connection request asynchronously if the quorum
             // sasl authentication is enabled. This is required because

+ 6 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java

@@ -883,10 +883,15 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
         quorumStats = new QuorumStats(this);
         jmxRemotePeerBean = new HashMap<Long, RemotePeerBean>();
         adminServer = AdminServerFactory.createAdminServer();
-        x509Util = new QuorumX509Util();
+        x509Util = createX509Util();
         initialize();
     }
 
+    // VisibleForTesting
+    QuorumX509Util createX509Util() {
+        return new QuorumX509Util();
+    }
+
     /**
      * For backward compatibility purposes, we instantiate QuorumMaj by default.
      */

+ 85 - 1
zookeeper-server/src/test/java/org/apache/zookeeper/test/CnxManagerTest.java → zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CnxManagerTest.java

@@ -16,14 +16,16 @@
  * limitations under the License.
  */
 
-package org.apache.zookeeper.test;
+package org.apache.zookeeper.server.quorum;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.File;
+import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.net.SocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.SocketChannel;
 import java.util.ArrayList;
@@ -31,9 +33,14 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.net.Socket;
+import javax.net.ssl.SSLSession;
+import javax.net.ssl.SSLSocket;
+import javax.net.ssl.HandshakeCompletedListener;
 
+import org.apache.zookeeper.common.QuorumX509Util;
 import org.apache.zookeeper.common.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,6 +53,8 @@ import org.apache.zookeeper.server.quorum.QuorumPeer;
 import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
 import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
+import org.apache.zookeeper.test.ClientBase;
+import org.apache.zookeeper.test.FLENewEpochTest;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -363,6 +372,81 @@ public class CnxManagerTest extends ZKTestCase {
         Assert.assertFalse(cnxManager.listener.isAlive());
     }
 
+    /**
+     * Test the SSLSocket is explicitly closed when there is IOException
+     * happened during connect.
+     */
+    @Test
+    public void testSSLSocketClosedWhenHandshakeTimeout() throws Exception {
+        final CountDownLatch closeLatch = new CountDownLatch(1);
+        QuorumX509Util mockedX509Util = new QuorumX509Util() {
+            @Override
+            public SSLSocket createSSLSocket() {
+                return new SSLSocket() {
+
+                    @Override
+                    public void connect(SocketAddress endpoint, int timeout) {}
+
+                    @Override
+                    public void startHandshake() throws IOException {
+                        throw new IOException();
+                    }
+
+                    @Override
+                    public void close() {
+                        closeLatch.countDown();
+                    }
+
+                    public String [] getSupportedCipherSuites() {
+                        throw new UnsupportedOperationException();
+                    }
+
+                    public String [] getEnabledCipherSuites() {
+                        throw new UnsupportedOperationException();
+                    }
+
+                    public String [] getSupportedProtocols() {
+                        throw new UnsupportedOperationException();
+                    }
+
+                    public String [] getEnabledProtocols() {
+                        throw new UnsupportedOperationException();
+                    }
+
+                    public SSLSession getSession() {
+                        throw new UnsupportedOperationException();
+                    }
+
+                    public void setEnabledCipherSuites(String suites []) {}
+                    public void setEnabledProtocols(String protocols[]) {}
+                    public void addHandshakeCompletedListener(HandshakeCompletedListener listener) {}
+                    public void removeHandshakeCompletedListener(HandshakeCompletedListener listener) {}
+                    public void setUseClientMode(boolean mode) {}
+                    public boolean getUseClientMode() { return true; }
+                    public void setNeedClientAuth(boolean need) {}
+                    public boolean getNeedClientAuth() { return true; }
+                    public void setWantClientAuth(boolean want) {}
+                    public boolean getWantClientAuth() { return true; }
+                    public void setEnableSessionCreation(boolean flag) {}
+                    public boolean getEnableSessionCreation() { return true; }
+                };
+            }
+        };
+
+        QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[0], peerTmpdir[0],
+                peerClientPort[0], 3, 0, 2000, 2, 2) {
+            @Override
+            public QuorumX509Util createX509Util() {
+                return mockedX509Util;
+            }
+        };
+
+        peer.setSslQuorum(true);
+        QuorumCnxManager cnxManager = peer.createCnxnManager();
+        cnxManager.connectOne(1, peers.get(1L).electionAddr);
+        Assert.assertTrue(closeLatch.await(1, TimeUnit.SECONDS));
+    }
+
     /*
      * Test if Worker threads are getting killed after connection loss
      */