Browse Source

ZOOKEEPER-3163: Use session map in the Netty to improve close session performance

This is a refactor to make the Netty able to use the same closeSession logic in NIOServerCnxn, which is more efficient with the sessionMap. Rely on the existing tests for the refactor work here.

Author: Fangmin Lyu <allenlyu@fb.com>

Reviewers: Tamás Pénzes, Enrico Olivelli, maoling, Michael Han

Closes #665 from lvfangmin/ZOOKEEPER-3163
Fangmin Lyu 6 năm trước cách đây
mục cha
commit
1ce2ca8107

+ 1 - 21
zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxnFactory.java

@@ -604,9 +604,6 @@ public class NIOServerCnxnFactory extends ServerCnxnFactory {
         return directBufferBytes > 0 ? directBuffer.get() : null;
     }
 
-    // sessionMap is used by closeSession()
-    private final ConcurrentHashMap<Long, NIOServerCnxn> sessionMap =
-        new ConcurrentHashMap<Long, NIOServerCnxn>();
     // ipMap is used to limit connections per IP
     private final ConcurrentHashMap<InetAddress, Set<NIOServerCnxn>> ipMap =
         new ConcurrentHashMap<InetAddress, Set<NIOServerCnxn>>( );
@@ -787,10 +784,7 @@ public class NIOServerCnxnFactory extends ServerCnxnFactory {
         }
         cnxnExpiryQueue.remove(cnxn);
 
-        long sessionId = cnxn.getSessionId();
-        if (sessionId != 0) {
-            sessionMap.remove(sessionId);
-        }
+        removeCnxnFromSessionMap(cnxn);
 
         InetAddress addr = cnxn.getSocketAddress();
         if (addr != null) {
@@ -922,20 +916,6 @@ public class NIOServerCnxnFactory extends ServerCnxnFactory {
         }
     }
 
-    public void addSession(long sessionId, NIOServerCnxn cnxn) {
-        sessionMap.put(sessionId, cnxn);
-    }
-
-    @Override
-    public boolean closeSession(long sessionId) {
-        NIOServerCnxn cnxn = sessionMap.remove(sessionId);
-        if (cnxn != null) {
-            cnxn.close();
-            return true;
-        }
-        return false;
-    }
-
     @Override
     public void join() throws InterruptedException {
         if (acceptThread != null) {

+ 3 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java

@@ -112,6 +112,8 @@ public class NettyServerCnxn extends ServerCnxn {
                         + Long.toHexString(sessionId));
             }
 
+            factory.removeCnxnFromSessionMap(this);
+
             synchronized (factory.ipMap) {
                 Set<NettyServerCnxn> s =
                     factory.ipMap.get(((InetSocketAddress)channel
@@ -198,6 +200,7 @@ public class NettyServerCnxn extends ServerCnxn {
     @Override
     public void setSessionId(long sessionId) {
         this.sessionId = sessionId;
+        factory.addSession(sessionId, this);
     }
 
     @Override

+ 0 - 18
zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java

@@ -415,24 +415,6 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
         }
     }
 
-    @Override
-    public boolean closeSession(long sessionId) {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("closeSession sessionid:0x" + sessionId);
-        }
-        for (ServerCnxn cnxn : cnxns) {
-            if (cnxn.getSessionId() == sessionId) {
-                try {
-                    cnxn.close();
-                } catch (Exception e) {
-                    LOG.warn("exception during session close", e);
-                }
-                return true;
-            }
-        }
-        return false;
-    }
-
     @Override
     public void configure(InetSocketAddress addr, int maxClientCnxns, boolean secure)
             throws IOException

+ 32 - 6
zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxnFactory.java

@@ -54,6 +54,38 @@ public abstract class ServerCnxnFactory {
      */
     static final ByteBuffer closeConn = ByteBuffer.allocate(0);
 
+    // sessionMap is used by closeSession()
+    final ConcurrentHashMap<Long, ServerCnxn> sessionMap =
+        new ConcurrentHashMap<Long, ServerCnxn>();
+
+    public void addSession(long sessionId, ServerCnxn cnxn) {
+        sessionMap.put(sessionId, cnxn);
+    }
+
+    public void removeCnxnFromSessionMap(ServerCnxn cnxn) {
+        long sessionId = cnxn.getSessionId();
+        if (sessionId != 0) {
+            sessionMap.remove(sessionId);
+        }
+    }
+
+    /**
+     * @return true if the cnxn that contains the sessionId exists in this ServerCnxnFactory
+     *         and it's closed. Otherwise false.
+     */
+    public boolean closeSession(long sessionId) {
+        ServerCnxn cnxn = sessionMap.remove(sessionId);
+        if (cnxn != null) {
+            try {
+                cnxn.close();
+            } catch (Exception e) {
+                LOG.warn("exception during session close", e);
+            }
+            return true;
+        }
+        return false;
+    }
+
     public abstract int getLocalPort();
     
     public abstract Iterable<ServerCnxn> getConnections();
@@ -66,12 +98,6 @@ public abstract class ServerCnxnFactory {
         return zkServer;
     }
 
-    /**
-     * @return true if the cnxn that contains the sessionId exists in this ServerCnxnFactory
-     *         and it's closed. Otherwise false.
-     */
-    public abstract boolean closeSession(long sessionId);
-
     public void configure(InetSocketAddress addr, int maxcc) throws IOException {
         configure(addr, maxcc, false);
     }