|
@@ -276,26 +276,29 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * clear all the connections in the selector
|
|
|
+ * Clear all the connections in the selector.
|
|
|
+ *
|
|
|
+ * You must first close ss (the serversocketchannel) if you wish
|
|
|
+ * to block any new connections from being established.
|
|
|
*
|
|
|
*/
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
synchronized public void clear() {
|
|
|
selector.wakeup();
|
|
|
- synchronized (cnxns) {
|
|
|
- // got to clear all the connections that we have in the selector
|
|
|
- for (Iterator<NIOServerCnxn> it = cnxns.iterator(); it
|
|
|
- .hasNext();) {
|
|
|
- NIOServerCnxn cnxn = it.next();
|
|
|
- it.remove();
|
|
|
- try {
|
|
|
- cnxn.close();
|
|
|
- } catch (Exception e) {
|
|
|
- LOG.warn("Ignoring exception closing cnxn sessionid 0x"
|
|
|
- + Long.toHexString(cnxn.sessionId), e);
|
|
|
- }
|
|
|
+ HashSet<NIOServerCnxn> cnxns;
|
|
|
+ synchronized (this.cnxns) {
|
|
|
+ cnxns = (HashSet<NIOServerCnxn>)this.cnxns.clone();
|
|
|
+ }
|
|
|
+ // got to clear all the connections that we have in the selector
|
|
|
+ for (NIOServerCnxn cnxn: cnxns) {
|
|
|
+ try {
|
|
|
+ // don't hold this.cnxns lock as deadlock may occur
|
|
|
+ cnxn.close();
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.warn("Ignoring exception closing cnxn sessionid 0x"
|
|
|
+ + Long.toHexString(cnxn.sessionId), e);
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
}
|
|
|
|
|
|
public void shutdown() {
|
|
@@ -324,21 +327,21 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
|
|
|
closeSessionWithoutWakeup(sessionId);
|
|
|
}
|
|
|
|
|
|
-
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
private void closeSessionWithoutWakeup(long sessionId) {
|
|
|
- synchronized (cnxns) {
|
|
|
- for (Iterator<NIOServerCnxn> it = cnxns.iterator(); it
|
|
|
- .hasNext();) {
|
|
|
- NIOServerCnxn cnxn = it.next();
|
|
|
- if (cnxn.sessionId == sessionId) {
|
|
|
- it.remove();
|
|
|
- try {
|
|
|
- cnxn.close();
|
|
|
- } catch (Exception e) {
|
|
|
- LOG.warn("exception during session close", e);
|
|
|
- }
|
|
|
- break;
|
|
|
+ HashSet<NIOServerCnxn> cnxns;
|
|
|
+ synchronized (this.cnxns) {
|
|
|
+ cnxns = (HashSet<NIOServerCnxn>)this.cnxns.clone();
|
|
|
+ }
|
|
|
+
|
|
|
+ for (NIOServerCnxn cnxn : cnxns) {
|
|
|
+ if (cnxn.sessionId == sessionId) {
|
|
|
+ try {
|
|
|
+ cnxn.close();
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.warn("exception during session close", e);
|
|
|
}
|
|
|
+ break;
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -1202,31 +1205,58 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
|
|
|
}
|
|
|
|
|
|
/*
|
|
|
- * (non-Javadoc)
|
|
|
- *
|
|
|
- * @see org.apache.zookeeper.server.ServerCnxnIface#close()
|
|
|
+ * Close the cnxn and remove it from the factory cnxns list.
|
|
|
+ *
|
|
|
+ * This function returns immediately if the cnxn is not on the cnxns list.
|
|
|
*/
|
|
|
public void close() {
|
|
|
- // unregister from JMX
|
|
|
- try {
|
|
|
- if(jmxConnectionBean != null){
|
|
|
- MBeanRegistry.getInstance().unregister(jmxConnectionBean);
|
|
|
+ synchronized(factory.cnxns){
|
|
|
+ // if this is not in cnxns then it's already closed
|
|
|
+ if (!factory.cnxns.remove(this)) {
|
|
|
+ return;
|
|
|
}
|
|
|
- } catch (Exception e) {
|
|
|
- LOG.warn("Failed to unregister with JMX", e);
|
|
|
- }
|
|
|
- jmxConnectionBean = null;
|
|
|
|
|
|
- synchronized (factory.ipMap)
|
|
|
- {
|
|
|
- Set<NIOServerCnxn> s = factory.ipMap.get(sock.socket().getInetAddress());
|
|
|
- s.remove(this);
|
|
|
- }
|
|
|
- synchronized (factory.cnxns) {
|
|
|
- factory.cnxns.remove(this);
|
|
|
+ synchronized (factory.ipMap) {
|
|
|
+ Set<NIOServerCnxn> s =
|
|
|
+ factory.ipMap.get(sock.socket().getInetAddress());
|
|
|
+ s.remove(this);
|
|
|
+ }
|
|
|
+
|
|
|
+ // unregister from JMX
|
|
|
+ try {
|
|
|
+ if(jmxConnectionBean != null){
|
|
|
+ MBeanRegistry.getInstance().unregister(jmxConnectionBean);
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.warn("Failed to unregister with JMX", e);
|
|
|
+ }
|
|
|
+ jmxConnectionBean = null;
|
|
|
+
|
|
|
+ if (zk != null) {
|
|
|
+ zk.removeCnxn(this);
|
|
|
+ }
|
|
|
+
|
|
|
+ closeSock();
|
|
|
+
|
|
|
+ if (sk != null) {
|
|
|
+ try {
|
|
|
+ // need to cancel this selection key from the selector
|
|
|
+ sk.cancel();
|
|
|
+ } catch (Exception e) {
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("ignoring exception during selectionkey cancel", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
- if (zk != null) {
|
|
|
- zk.removeCnxn(this);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Close resources associated with the sock of this cnxn.
|
|
|
+ */
|
|
|
+ private void closeSock() {
|
|
|
+ if (sock == null) {
|
|
|
+ return;
|
|
|
}
|
|
|
|
|
|
LOG.info("Closed socket connection for client "
|
|
@@ -1275,18 +1305,8 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
|
|
|
}
|
|
|
}
|
|
|
sock = null;
|
|
|
- if (sk != null) {
|
|
|
- try {
|
|
|
- // need to cancel this selection key from the selector
|
|
|
- sk.cancel();
|
|
|
- } catch (Exception e) {
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("ignoring exception during selectionkey cancel", e);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
private final static byte fourBytes[] = new byte[4];
|
|
|
|
|
|
/*
|
|
@@ -1419,7 +1439,10 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
|
|
|
return authInfo;
|
|
|
}
|
|
|
|
|
|
- public InetSocketAddress getRemoteAddress() {
|
|
|
+ public synchronized InetSocketAddress getRemoteAddress() {
|
|
|
+ if (sock == null) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
return (InetSocketAddress) sock.socket().getRemoteSocketAddress();
|
|
|
}
|
|
|
|