|
@@ -23,12 +23,13 @@ import java.net.InetAddress;
|
|
|
import java.net.InetSocketAddress;
|
|
|
import java.security.KeyManagementException;
|
|
|
import java.security.NoSuchAlgorithmException;
|
|
|
-import java.util.HashMap;
|
|
|
import java.util.HashSet;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Objects;
|
|
|
import java.util.Set;
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
+import java.util.concurrent.atomic.AtomicInteger;
|
|
|
import java.util.concurrent.atomic.AtomicReference;
|
|
|
|
|
|
import javax.net.ssl.SSLContext;
|
|
@@ -99,9 +100,7 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
|
|
|
private Channel parentChannel;
|
|
|
private final ChannelGroup allChannels =
|
|
|
new DefaultChannelGroup("zkServerCnxns", new DefaultEventExecutor());
|
|
|
- // Access to ipMap or to any Set contained in the map needs to be
|
|
|
- // protected with synchronized (ipMap) { ... }
|
|
|
- private final Map<InetAddress, Set<NettyServerCnxn>> ipMap = new HashMap<>();
|
|
|
+ private final Map<InetAddress, AtomicInteger> ipMap = new ConcurrentHashMap<>();
|
|
|
private InetSocketAddress localAddress;
|
|
|
private int maxClientCnxns = 60;
|
|
|
int listenBacklog = -1;
|
|
@@ -635,44 +634,35 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
|
|
|
return localAddress;
|
|
|
}
|
|
|
|
|
|
- private void addCnxn(NettyServerCnxn cnxn) {
|
|
|
+ private void addCnxn(final NettyServerCnxn cnxn) {
|
|
|
cnxns.add(cnxn);
|
|
|
- synchronized (ipMap){
|
|
|
- InetAddress addr =
|
|
|
- ((InetSocketAddress)cnxn.getChannel().remoteAddress()).getAddress();
|
|
|
- Set<NettyServerCnxn> s = ipMap.get(addr);
|
|
|
- if (s == null) {
|
|
|
- s = new HashSet<>();
|
|
|
- ipMap.put(addr, s);
|
|
|
+ InetAddress addr =
|
|
|
+ ((InetSocketAddress) cnxn.getChannel().remoteAddress()).getAddress();
|
|
|
+
|
|
|
+ ipMap.compute(addr, (a, cnxnCount) -> {
|
|
|
+ if (cnxnCount == null) {
|
|
|
+ cnxnCount = new AtomicInteger();
|
|
|
}
|
|
|
- s.add(cnxn);
|
|
|
- }
|
|
|
+ cnxnCount.incrementAndGet();
|
|
|
+ return cnxnCount;
|
|
|
+ });
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
void removeCnxnFromIpMap(NettyServerCnxn cnxn, InetAddress remoteAddress) {
|
|
|
- synchronized (ipMap) {
|
|
|
- Set<NettyServerCnxn> s = ipMap.get(remoteAddress);
|
|
|
- if (s != null) {
|
|
|
- s.remove(cnxn);
|
|
|
- if (s.isEmpty()) {
|
|
|
- ipMap.remove(remoteAddress);
|
|
|
- }
|
|
|
- return;
|
|
|
- }
|
|
|
- }
|
|
|
- // Fallthrough and log errors outside the synchronized block
|
|
|
- LOG.error(
|
|
|
- "Unexpected null set for remote address {} when removing cnxn {}",
|
|
|
- remoteAddress,
|
|
|
- cnxn);
|
|
|
+ ipMap.compute(remoteAddress, (addr, cnxnCount) -> {
|
|
|
+ if (cnxnCount == null) {
|
|
|
+ LOG.error("Unexpected remote address {} when removing cnxn {}",
|
|
|
+ remoteAddress, cnxn);
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ final int newValue = cnxnCount.decrementAndGet();
|
|
|
+ return newValue == 0 ? null : cnxnCount;
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
- private int getClientCnxnCount(InetAddress addr) {
|
|
|
- synchronized (ipMap) {
|
|
|
- Set<NettyServerCnxn> s = ipMap.get(addr);
|
|
|
- if (s == null) return 0;
|
|
|
- return s.size();
|
|
|
- }
|
|
|
+ private int getClientCnxnCount(final InetAddress addr) {
|
|
|
+ final AtomicInteger count = ipMap.get(addr);
|
|
|
+ return count == null ? 0 : count.get();
|
|
|
}
|
|
|
|
|
|
@Override
|