|
@@ -25,14 +25,12 @@ import java.net.InetAddress;
|
|
import java.net.InetSocketAddress;
|
|
import java.net.InetSocketAddress;
|
|
import java.net.SocketAddress;
|
|
import java.net.SocketAddress;
|
|
import java.nio.ByteBuffer;
|
|
import java.nio.ByteBuffer;
|
|
-import java.nio.channels.SelectionKey;
|
|
|
|
-import java.nio.channels.Selector;
|
|
|
|
-import java.nio.channels.SocketChannel;
|
|
|
|
import java.util.ArrayList;
|
|
import java.util.ArrayList;
|
|
import java.util.Collections;
|
|
import java.util.Collections;
|
|
import java.util.LinkedList;
|
|
import java.util.LinkedList;
|
|
import java.util.Random;
|
|
import java.util.Random;
|
|
import java.util.Set;
|
|
import java.util.Set;
|
|
|
|
+import java.util.concurrent.CopyOnWriteArraySet;
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
|
|
|
|
import org.apache.jute.BinaryInputArchive;
|
|
import org.apache.jute.BinaryInputArchive;
|
|
@@ -55,7 +53,6 @@ import org.apache.zookeeper.ZooKeeper.WatchRegistration;
|
|
import org.apache.zookeeper.common.PathUtils;
|
|
import org.apache.zookeeper.common.PathUtils;
|
|
import org.apache.zookeeper.proto.AuthPacket;
|
|
import org.apache.zookeeper.proto.AuthPacket;
|
|
import org.apache.zookeeper.proto.ConnectRequest;
|
|
import org.apache.zookeeper.proto.ConnectRequest;
|
|
-import org.apache.zookeeper.proto.ConnectResponse;
|
|
|
|
import org.apache.zookeeper.proto.CreateResponse;
|
|
import org.apache.zookeeper.proto.CreateResponse;
|
|
import org.apache.zookeeper.proto.ExistsResponse;
|
|
import org.apache.zookeeper.proto.ExistsResponse;
|
|
import org.apache.zookeeper.proto.GetACLResponse;
|
|
import org.apache.zookeeper.proto.GetACLResponse;
|
|
@@ -85,8 +82,6 @@ public class ClientCnxn {
|
|
* option allows the client to turn off this behavior by setting
|
|
* option allows the client to turn off this behavior by setting
|
|
* the environment variable "zookeeper.disableAutoWatchReset" to "true" */
|
|
* the environment variable "zookeeper.disableAutoWatchReset" to "true" */
|
|
private static boolean disableAutoWatchReset;
|
|
private static boolean disableAutoWatchReset;
|
|
-
|
|
|
|
- public static final int packetLen;
|
|
|
|
static {
|
|
static {
|
|
// this var should not be public, but otw there is no easy way
|
|
// this var should not be public, but otw there is no easy way
|
|
// to test
|
|
// to test
|
|
@@ -96,7 +91,6 @@ public class ClientCnxn {
|
|
LOG.debug("zookeeper.disableAutoWatchReset is "
|
|
LOG.debug("zookeeper.disableAutoWatchReset is "
|
|
+ disableAutoWatchReset);
|
|
+ disableAutoWatchReset);
|
|
}
|
|
}
|
|
- packetLen = Integer.getInteger("jute.maxbuffer", 4096 * 1024);
|
|
|
|
}
|
|
}
|
|
|
|
|
|
private final ArrayList<InetSocketAddress> serverAddrs =
|
|
private final ArrayList<InetSocketAddress> serverAddrs =
|
|
@@ -113,7 +107,7 @@ public class ClientCnxn {
|
|
byte data[];
|
|
byte data[];
|
|
}
|
|
}
|
|
|
|
|
|
- private final ArrayList<AuthData> authInfo = new ArrayList<AuthData>();
|
|
|
|
|
|
+ private final CopyOnWriteArraySet<AuthData> authInfo = new CopyOnWriteArraySet<AuthData>();
|
|
|
|
|
|
/**
|
|
/**
|
|
* These are the packets that have been sent and are waiting for a response.
|
|
* These are the packets that have been sent and are waiting for a response.
|
|
@@ -129,10 +123,11 @@ public class ClientCnxn {
|
|
|
|
|
|
private int connectTimeout;
|
|
private int connectTimeout;
|
|
|
|
|
|
- /** The timeout in ms the client negotiated with the server. This is the
|
|
|
|
- * "real" timeout, not the timeout request by the client (which may
|
|
|
|
- * have been increased/decreased by the server which applies bounds
|
|
|
|
- * to this value.
|
|
|
|
|
|
+ /**
|
|
|
|
+ * The timeout in ms the client negotiated with the server. This is the
|
|
|
|
+ * "real" timeout, not the timeout request by the client (which may have
|
|
|
|
+ * been increased/decreased by the server which applies bounds to this
|
|
|
|
+ * value.
|
|
*/
|
|
*/
|
|
private volatile int negotiatedSessionTimeout;
|
|
private volatile int negotiatedSessionTimeout;
|
|
|
|
|
|
@@ -154,15 +149,13 @@ public class ClientCnxn {
|
|
|
|
|
|
final EventThread eventThread;
|
|
final EventThread eventThread;
|
|
|
|
|
|
- final Selector selector = Selector.open();
|
|
|
|
-
|
|
|
|
/**
|
|
/**
|
|
* Set to true when close is called. Latches the connection such that we
|
|
* Set to true when close is called. Latches the connection such that we
|
|
* don't attempt to re-connect to the server if in the middle of closing the
|
|
* don't attempt to re-connect to the server if in the middle of closing the
|
|
* connection (client sends session disconnect to server as part of close
|
|
* connection (client sends session disconnect to server as part of close
|
|
* operation)
|
|
* operation)
|
|
*/
|
|
*/
|
|
- volatile boolean closing = false;
|
|
|
|
|
|
+ private volatile boolean closing = false;
|
|
|
|
|
|
public long getSessionId() {
|
|
public long getSessionId() {
|
|
return sessionId;
|
|
return sessionId;
|
|
@@ -180,56 +173,22 @@ public class ClientCnxn {
|
|
public String toString() {
|
|
public String toString() {
|
|
StringBuilder sb = new StringBuilder();
|
|
StringBuilder sb = new StringBuilder();
|
|
|
|
|
|
- SocketAddress local = getLocalSocketAddress();
|
|
|
|
- SocketAddress remote = getRemoteSocketAddress();
|
|
|
|
|
|
+ SocketAddress local = sendThread.getClientCnxnSocket().getLocalSocketAddress();
|
|
|
|
+ SocketAddress remote = sendThread.getClientCnxnSocket().getRemoteSocketAddress();
|
|
sb
|
|
sb
|
|
.append("sessionid:0x").append(Long.toHexString(getSessionId()))
|
|
.append("sessionid:0x").append(Long.toHexString(getSessionId()))
|
|
.append(" local:").append(local)
|
|
.append(" local:").append(local)
|
|
.append(" remoteserver:").append(remote)
|
|
.append(" remoteserver:").append(remote)
|
|
.append(" lastZxid:").append(lastZxid)
|
|
.append(" lastZxid:").append(lastZxid)
|
|
.append(" xid:").append(xid)
|
|
.append(" xid:").append(xid)
|
|
- .append(" sent:").append(sendThread.sentCount)
|
|
|
|
- .append(" recv:").append(sendThread.recvCount)
|
|
|
|
|
|
+ .append(" sent:").append(sendThread.getClientCnxnSocket().getSentCount())
|
|
|
|
+ .append(" recv:").append(sendThread.getClientCnxnSocket().getRecvCount())
|
|
.append(" queuedpkts:").append(outgoingQueue.size())
|
|
.append(" queuedpkts:").append(outgoingQueue.size())
|
|
.append(" pendingresp:").append(pendingQueue.size())
|
|
.append(" pendingresp:").append(pendingQueue.size())
|
|
.append(" queuedevents:").append(eventThread.waitingEvents.size());
|
|
.append(" queuedevents:").append(eventThread.waitingEvents.size());
|
|
|
|
|
|
return sb.toString();
|
|
return sb.toString();
|
|
}
|
|
}
|
|
-
|
|
|
|
- /**
|
|
|
|
- * Returns the address to which the socket is connected.
|
|
|
|
- * @return ip address of the remote side of the connection or null if
|
|
|
|
- * not connected
|
|
|
|
- */
|
|
|
|
- SocketAddress getRemoteSocketAddress() {
|
|
|
|
- // a lot could go wrong here, so rather than put in a bunch of code
|
|
|
|
- // to check for nulls all down the chain let's do it the simple
|
|
|
|
- // yet bulletproof way
|
|
|
|
- try {
|
|
|
|
- return ((SocketChannel)sendThread.sockKey.channel())
|
|
|
|
- .socket().getRemoteSocketAddress();
|
|
|
|
- } catch (NullPointerException e) {
|
|
|
|
- return null;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * Returns the local address to which the socket is bound.
|
|
|
|
- * @return ip address of the remote side of the connection or null if
|
|
|
|
- * not connected
|
|
|
|
- */
|
|
|
|
- SocketAddress getLocalSocketAddress() {
|
|
|
|
- // a lot could go wrong here, so rather than put in a bunch of code
|
|
|
|
- // to check for nulls all down the chain let's do it the simple
|
|
|
|
- // yet bulletproof way
|
|
|
|
- try {
|
|
|
|
- return ((SocketChannel)sendThread.sockKey.channel())
|
|
|
|
- .socket().getLocalSocketAddress();
|
|
|
|
- } catch (NullPointerException e) {
|
|
|
|
- return null;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
* This class allows us to pass the headers and the relevant records around.
|
|
* This class allows us to pass the headers and the relevant records around.
|
|
@@ -319,13 +278,14 @@ public class ClientCnxn {
|
|
* @param zooKeeper
|
|
* @param zooKeeper
|
|
* the zookeeper object that this connection is related to.
|
|
* the zookeeper object that this connection is related to.
|
|
* @param watcher watcher for this connection
|
|
* @param watcher watcher for this connection
|
|
|
|
+ * @param clientCnxnSocket
|
|
|
|
+ * the socket implementation used (e.g. NIO/Netty)
|
|
* @throws IOException
|
|
* @throws IOException
|
|
*/
|
|
*/
|
|
public ClientCnxn(String hosts, int sessionTimeout, ZooKeeper zooKeeper,
|
|
public ClientCnxn(String hosts, int sessionTimeout, ZooKeeper zooKeeper,
|
|
- ClientWatchManager watcher)
|
|
|
|
- throws IOException
|
|
|
|
- {
|
|
|
|
- this(hosts, sessionTimeout, zooKeeper, watcher, 0, new byte[16]);
|
|
|
|
|
|
+ ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket)
|
|
|
|
+ throws IOException {
|
|
|
|
+ this(hosts, sessionTimeout, zooKeeper, watcher, clientCnxnSocket, 0, new byte[16]);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -340,14 +300,15 @@ public class ClientCnxn {
|
|
* @param zooKeeper
|
|
* @param zooKeeper
|
|
* the zookeeper object that this connection is related to.
|
|
* the zookeeper object that this connection is related to.
|
|
* @param watcher watcher for this connection
|
|
* @param watcher watcher for this connection
|
|
|
|
+ * @param clientCnxnSocket
|
|
|
|
+ * the socket implementation used (e.g. NIO/Netty)
|
|
* @param sessionId session id if re-establishing session
|
|
* @param sessionId session id if re-establishing session
|
|
* @param sessionPasswd session passwd if re-establishing session
|
|
* @param sessionPasswd session passwd if re-establishing session
|
|
* @throws IOException
|
|
* @throws IOException
|
|
*/
|
|
*/
|
|
public ClientCnxn(String hosts, int sessionTimeout, ZooKeeper zooKeeper,
|
|
public ClientCnxn(String hosts, int sessionTimeout, ZooKeeper zooKeeper,
|
|
- ClientWatchManager watcher, long sessionId, byte[] sessionPasswd)
|
|
|
|
- throws IOException
|
|
|
|
- {
|
|
|
|
|
|
+ ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
|
|
|
|
+ long sessionId, byte[] sessionPasswd) throws IOException {
|
|
this.zooKeeper = zooKeeper;
|
|
this.zooKeeper = zooKeeper;
|
|
this.watcher = watcher;
|
|
this.watcher = watcher;
|
|
this.sessionId = sessionId;
|
|
this.sessionId = sessionId;
|
|
@@ -389,7 +350,7 @@ public class ClientCnxn {
|
|
connectTimeout = sessionTimeout / hostsList.length;
|
|
connectTimeout = sessionTimeout / hostsList.length;
|
|
readTimeout = sessionTimeout * 2 / 3;
|
|
readTimeout = sessionTimeout * 2 / 3;
|
|
Collections.shuffle(serverAddrs);
|
|
Collections.shuffle(serverAddrs);
|
|
- sendThread = new SendThread();
|
|
|
|
|
|
+ sendThread = new SendThread(clientCnxnSocket);
|
|
eventThread = new EventThread();
|
|
eventThread = new EventThread();
|
|
}
|
|
}
|
|
|
|
|
|
@@ -412,9 +373,10 @@ public class ClientCnxn {
|
|
eventThread.start();
|
|
eventThread.start();
|
|
}
|
|
}
|
|
|
|
|
|
- Object eventOfDeath = new Object();
|
|
|
|
|
|
+ private Object eventOfDeath = new Object();
|
|
|
|
|
|
- final static UncaughtExceptionHandler uncaughtExceptionHandler = new UncaughtExceptionHandler() {
|
|
|
|
|
|
+ private final static UncaughtExceptionHandler uncaughtExceptionHandler = new UncaughtExceptionHandler() {
|
|
|
|
+ @Override
|
|
public void uncaughtException(Thread t, Throwable e) {
|
|
public void uncaughtException(Thread t, Throwable e) {
|
|
LOG.error("from " + t.getName(), e);
|
|
LOG.error("from " + t.getName(), e);
|
|
}
|
|
}
|
|
@@ -640,7 +602,7 @@ public class ClientCnxn {
|
|
if (p.replyHeader == null) {
|
|
if (p.replyHeader == null) {
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
- switch(zooKeeper.state) {
|
|
|
|
|
|
+ switch (state) {
|
|
case AUTH_FAILED:
|
|
case AUTH_FAILED:
|
|
p.replyHeader.setErr(KeeperException.Code.AUTHFAILED.intValue());
|
|
p.replyHeader.setErr(KeeperException.Code.AUTHFAILED.intValue());
|
|
break;
|
|
break;
|
|
@@ -653,15 +615,16 @@ public class ClientCnxn {
|
|
finishPacket(p);
|
|
finishPacket(p);
|
|
}
|
|
}
|
|
|
|
|
|
- volatile long lastZxid;
|
|
|
|
|
|
+ private volatile long lastZxid;
|
|
|
|
|
|
- private static class EndOfStreamException extends IOException {
|
|
|
|
|
|
+ static class EndOfStreamException extends IOException {
|
|
private static final long serialVersionUID = -5438877188796231422L;
|
|
private static final long serialVersionUID = -5438877188796231422L;
|
|
|
|
|
|
public EndOfStreamException(String msg) {
|
|
public EndOfStreamException(String msg) {
|
|
super(msg);
|
|
super(msg);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ @Override
|
|
public String toString() {
|
|
public String toString() {
|
|
return "EndOfStreamException: " + getMessage();
|
|
return "EndOfStreamException: " + getMessage();
|
|
}
|
|
}
|
|
@@ -683,75 +646,21 @@ public class ClientCnxn {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ public static final int packetLen = Integer.getInteger("jute.maxbuffer",
|
|
|
|
+ 4096 * 1024);
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* This class services the outgoing request queue and generates the heart
|
|
* This class services the outgoing request queue and generates the heart
|
|
* beats. It also spawns the ReadThread.
|
|
* beats. It also spawns the ReadThread.
|
|
*/
|
|
*/
|
|
class SendThread extends Thread {
|
|
class SendThread extends Thread {
|
|
- SelectionKey sockKey;
|
|
|
|
-
|
|
|
|
- final ByteBuffer lenBuffer = ByteBuffer.allocateDirect(4);
|
|
|
|
-
|
|
|
|
- ByteBuffer incomingBuffer = lenBuffer;
|
|
|
|
-
|
|
|
|
- boolean initialized;
|
|
|
|
-
|
|
|
|
private long lastPingSentNs;
|
|
private long lastPingSentNs;
|
|
|
|
+ private final ClientCnxnSocket clientCnxnSocket;
|
|
|
|
+ private int lastConnectIndex = -1;
|
|
|
|
+ private int currentConnectIndex;
|
|
|
|
+ private Random r = new Random(System.nanoTime());
|
|
|
|
|
|
- long sentCount = 0;
|
|
|
|
- long recvCount = 0;
|
|
|
|
-
|
|
|
|
- void readLength() throws IOException {
|
|
|
|
- int len = incomingBuffer.getInt();
|
|
|
|
- if (len < 0 || len >= packetLen) {
|
|
|
|
- throw new IOException("Packet len" + len + " is out of range!");
|
|
|
|
- }
|
|
|
|
- incomingBuffer = ByteBuffer.allocate(len);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- void readConnectResult() throws IOException {
|
|
|
|
- if (LOG.isTraceEnabled()) {
|
|
|
|
- StringBuffer buf = new StringBuffer("0x[");
|
|
|
|
- for (byte b : incomingBuffer.array()) {
|
|
|
|
- buf.append(Integer.toHexString(b) + ",");
|
|
|
|
- }
|
|
|
|
- buf.append("]");
|
|
|
|
- LOG.trace("readConnectRestult " + incomingBuffer.remaining()
|
|
|
|
- + " " + buf.toString());
|
|
|
|
- }
|
|
|
|
- ByteBufferInputStream bbis = new ByteBufferInputStream(
|
|
|
|
- incomingBuffer);
|
|
|
|
- BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
|
|
|
|
- ConnectResponse conRsp = new ConnectResponse();
|
|
|
|
- conRsp.deserialize(bbia, "connect");
|
|
|
|
- negotiatedSessionTimeout = conRsp.getTimeOut();
|
|
|
|
- if (negotiatedSessionTimeout <= 0) {
|
|
|
|
- zooKeeper.state = States.CLOSED;
|
|
|
|
-
|
|
|
|
- eventThread.queueEvent(new WatchedEvent(
|
|
|
|
- Watcher.Event.EventType.None,
|
|
|
|
- Watcher.Event.KeeperState.Expired, null));
|
|
|
|
- eventThread.queueEventOfDeath();
|
|
|
|
- throw new SessionExpiredException(
|
|
|
|
- "Unable to reconnect to ZooKeeper service, session 0x"
|
|
|
|
- + Long.toHexString(sessionId) + " has expired");
|
|
|
|
- }
|
|
|
|
- readTimeout = negotiatedSessionTimeout * 2 / 3;
|
|
|
|
- connectTimeout = negotiatedSessionTimeout / serverAddrs.size();
|
|
|
|
- sessionId = conRsp.getSessionId();
|
|
|
|
- sessionPasswd = conRsp.getPasswd();
|
|
|
|
- zooKeeper.state = States.CONNECTED;
|
|
|
|
- LOG.info("Session establishment complete on server "
|
|
|
|
- + ((SocketChannel)sockKey.channel())
|
|
|
|
- .socket().getRemoteSocketAddress()
|
|
|
|
- + ", sessionid = 0x"
|
|
|
|
- + Long.toHexString(sessionId)
|
|
|
|
- + ", negotiated timeout = " + negotiatedSessionTimeout);
|
|
|
|
- eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None,
|
|
|
|
- Watcher.Event.KeeperState.SyncConnected, null));
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- void readResponse() throws IOException {
|
|
|
|
|
|
+ void readResponse(ByteBuffer incomingBuffer) throws IOException {
|
|
ByteBufferInputStream bbis = new ByteBufferInputStream(
|
|
ByteBufferInputStream bbis = new ByteBufferInputStream(
|
|
incomingBuffer);
|
|
incomingBuffer);
|
|
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
|
|
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
|
|
@@ -772,7 +681,7 @@ public class ClientCnxn {
|
|
if (replyHdr.getXid() == -4) {
|
|
if (replyHdr.getXid() == -4) {
|
|
// -4 is the xid for AuthPacket
|
|
// -4 is the xid for AuthPacket
|
|
if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
|
|
if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
|
|
- zooKeeper.state = States.AUTH_FAILED;
|
|
|
|
|
|
+ state = States.AUTH_FAILED;
|
|
eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None,
|
|
eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None,
|
|
Watcher.Event.KeeperState.AuthFailed, null) );
|
|
Watcher.Event.KeeperState.AuthFailed, null) );
|
|
}
|
|
}
|
|
@@ -809,12 +718,12 @@ public class ClientCnxn {
|
|
eventThread.queueEvent( we );
|
|
eventThread.queueEvent( we );
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
- if (pendingQueue.size() == 0) {
|
|
|
|
- throw new IOException("Nothing in the queue, but got "
|
|
|
|
- + replyHdr.getXid());
|
|
|
|
- }
|
|
|
|
- Packet packet = null;
|
|
|
|
|
|
+ Packet packet;
|
|
synchronized (pendingQueue) {
|
|
synchronized (pendingQueue) {
|
|
|
|
+ if (pendingQueue.size() == 0) {
|
|
|
|
+ throw new IOException("Nothing in the queue, but got "
|
|
|
|
+ + replyHdr.getXid());
|
|
|
|
+ }
|
|
packet = pendingQueue.remove();
|
|
packet = pendingQueue.remove();
|
|
}
|
|
}
|
|
/*
|
|
/*
|
|
@@ -825,9 +734,13 @@ public class ClientCnxn {
|
|
if (packet.header.getXid() != replyHdr.getXid()) {
|
|
if (packet.header.getXid() != replyHdr.getXid()) {
|
|
packet.replyHeader.setErr(
|
|
packet.replyHeader.setErr(
|
|
KeeperException.Code.CONNECTIONLOSS.intValue());
|
|
KeeperException.Code.CONNECTIONLOSS.intValue());
|
|
- throw new IOException("Xid out of order. Got "
|
|
|
|
- + replyHdr.getXid() + " expected "
|
|
|
|
- + packet.header.getXid());
|
|
|
|
|
|
+ throw new IOException("Xid out of order. Got Xid "
|
|
|
|
+ + replyHdr.getXid() + " with err " +
|
|
|
|
+ + replyHdr.getErr() +
|
|
|
|
+ " expected Xid "
|
|
|
|
+ + packet.header.getXid()
|
|
|
|
+ + " for a packet with details: "
|
|
|
|
+ + packet );
|
|
}
|
|
}
|
|
|
|
|
|
packet.replyHeader.setXid(replyHdr.getXid());
|
|
packet.replyHeader.setXid(replyHdr.getXid());
|
|
@@ -849,113 +762,34 @@ public class ClientCnxn {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- /**
|
|
|
|
- * @return true if a packet was received
|
|
|
|
- * @throws InterruptedException
|
|
|
|
- * @throws IOException
|
|
|
|
- */
|
|
|
|
- boolean doIO() throws InterruptedException, IOException {
|
|
|
|
- boolean packetReceived = false;
|
|
|
|
- SocketChannel sock = (SocketChannel) sockKey.channel();
|
|
|
|
- if (sock == null) {
|
|
|
|
- throw new IOException("Socket is null!");
|
|
|
|
- }
|
|
|
|
- if (sockKey.isReadable()) {
|
|
|
|
- int rc = sock.read(incomingBuffer);
|
|
|
|
- if (rc < 0) {
|
|
|
|
- throw new EndOfStreamException(
|
|
|
|
- "Unable to read additional data from server sessionid 0x"
|
|
|
|
- + Long.toHexString(sessionId)
|
|
|
|
- + ", likely server has closed socket");
|
|
|
|
- }
|
|
|
|
- if (!incomingBuffer.hasRemaining()) {
|
|
|
|
- incomingBuffer.flip();
|
|
|
|
- if (incomingBuffer == lenBuffer) {
|
|
|
|
- recvCount++;
|
|
|
|
- readLength();
|
|
|
|
- } else if (!initialized) {
|
|
|
|
- readConnectResult();
|
|
|
|
- enableRead();
|
|
|
|
- if (!outgoingQueue.isEmpty()) {
|
|
|
|
- enableWrite();
|
|
|
|
- }
|
|
|
|
- lenBuffer.clear();
|
|
|
|
- incomingBuffer = lenBuffer;
|
|
|
|
- packetReceived = true;
|
|
|
|
- initialized = true;
|
|
|
|
- } else {
|
|
|
|
- readResponse();
|
|
|
|
- lenBuffer.clear();
|
|
|
|
- incomingBuffer = lenBuffer;
|
|
|
|
- packetReceived = true;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- if (sockKey.isWritable()) {
|
|
|
|
- synchronized (outgoingQueue) {
|
|
|
|
- if (!outgoingQueue.isEmpty()) {
|
|
|
|
- ByteBuffer pbb = outgoingQueue.getFirst().bb;
|
|
|
|
- sock.write(pbb);
|
|
|
|
- if (!pbb.hasRemaining()) {
|
|
|
|
- sentCount++;
|
|
|
|
- Packet p = outgoingQueue.removeFirst();
|
|
|
|
- if (p.header != null
|
|
|
|
- && p.header.getType() != OpCode.ping
|
|
|
|
- && p.header.getType() != OpCode.auth) {
|
|
|
|
- pendingQueue.add(p);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- if (outgoingQueue.isEmpty()) {
|
|
|
|
- disableWrite();
|
|
|
|
- } else {
|
|
|
|
- enableWrite();
|
|
|
|
- }
|
|
|
|
- return packetReceived;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- synchronized private void enableWrite() {
|
|
|
|
- int i = sockKey.interestOps();
|
|
|
|
- if ((i & SelectionKey.OP_WRITE) == 0) {
|
|
|
|
- sockKey.interestOps(i | SelectionKey.OP_WRITE);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- synchronized private void disableWrite() {
|
|
|
|
- int i = sockKey.interestOps();
|
|
|
|
- if ((i & SelectionKey.OP_WRITE) != 0) {
|
|
|
|
- sockKey.interestOps(i & (~SelectionKey.OP_WRITE));
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- synchronized private void enableRead() {
|
|
|
|
- int i = sockKey.interestOps();
|
|
|
|
- if ((i & SelectionKey.OP_READ) == 0) {
|
|
|
|
- sockKey.interestOps(i | SelectionKey.OP_READ);
|
|
|
|
- }
|
|
|
|
|
|
+ SendThread(ClientCnxnSocket clientCnxnSocket) {
|
|
|
|
+ super(makeThreadName("-SendThread()"));
|
|
|
|
+ state = States.CONNECTING;
|
|
|
|
+ this.clientCnxnSocket = clientCnxnSocket;
|
|
|
|
+ setUncaughtExceptionHandler(uncaughtExceptionHandler);
|
|
|
|
+ setDaemon(true);
|
|
}
|
|
}
|
|
|
|
|
|
- synchronized private void disableRead() {
|
|
|
|
- int i = sockKey.interestOps();
|
|
|
|
- if ((i & SelectionKey.OP_READ) != 0) {
|
|
|
|
- sockKey.interestOps(i & (~SelectionKey.OP_READ));
|
|
|
|
- }
|
|
|
|
|
|
+ // TODO: can not name this method getState since Thread.getState()
|
|
|
|
+ // already exists
|
|
|
|
+ // It would be cleaner to make class SendThread an implementation of
|
|
|
|
+ // Runnable
|
|
|
|
+ /**
|
|
|
|
+ * Used by ClientCnxnSocket
|
|
|
|
+ *
|
|
|
|
+ * @return
|
|
|
|
+ */
|
|
|
|
+ ZooKeeper.States getZkState() {
|
|
|
|
+ return state;
|
|
}
|
|
}
|
|
|
|
|
|
- SendThread() {
|
|
|
|
- super(makeThreadName("-SendThread()"));
|
|
|
|
- zooKeeper.state = States.CONNECTING;
|
|
|
|
- setUncaughtExceptionHandler(uncaughtExceptionHandler);
|
|
|
|
- setDaemon(true);
|
|
|
|
|
|
+ ClientCnxnSocket getClientCnxnSocket() {
|
|
|
|
+ return clientCnxnSocket;
|
|
}
|
|
}
|
|
|
|
|
|
- private void primeConnection(SelectionKey k) throws IOException {
|
|
|
|
|
|
+ void primeConnection() throws IOException {
|
|
LOG.info("Socket connection established to "
|
|
LOG.info("Socket connection established to "
|
|
- + ((SocketChannel)sockKey.channel())
|
|
|
|
- .socket().getRemoteSocketAddress()
|
|
|
|
- + ", initiating session");
|
|
|
|
|
|
+ + clientCnxnSocket.getRemoteSocketAddress() + ", initiating session");
|
|
lastConnectIndex = currentConnectIndex;
|
|
lastConnectIndex = currentConnectIndex;
|
|
ConnectRequest conReq = new ConnectRequest(0, lastZxid,
|
|
ConnectRequest conReq = new ConnectRequest(0, lastZxid,
|
|
sessionTimeout, sessionId, sessionPasswd);
|
|
sessionTimeout, sessionId, sessionPasswd);
|
|
@@ -970,11 +804,12 @@ public class ClientCnxn {
|
|
synchronized (outgoingQueue) {
|
|
synchronized (outgoingQueue) {
|
|
// We add backwards since we are pushing into the front
|
|
// We add backwards since we are pushing into the front
|
|
// Only send if there's a pending watch
|
|
// Only send if there's a pending watch
|
|
- if (!disableAutoWatchReset &&
|
|
|
|
- (!zooKeeper.getDataWatches().isEmpty()
|
|
|
|
- || !zooKeeper.getExistWatches().isEmpty()
|
|
|
|
- || !zooKeeper.getChildWatches().isEmpty()))
|
|
|
|
- {
|
|
|
|
|
|
+ // TODO: here we have the only remaining use of zooKeeper in
|
|
|
|
+ // this class. It's to be eliminated!
|
|
|
|
+ if (!disableAutoWatchReset
|
|
|
|
+ && (!zooKeeper.getDataWatches().isEmpty()
|
|
|
|
+ || !zooKeeper.getExistWatches().isEmpty() || !zooKeeper
|
|
|
|
+ .getChildWatches().isEmpty())) {
|
|
SetWatches sw = new SetWatches(lastZxid,
|
|
SetWatches sw = new SetWatches(lastZxid,
|
|
zooKeeper.getDataWatches(),
|
|
zooKeeper.getDataWatches(),
|
|
zooKeeper.getExistWatches(),
|
|
zooKeeper.getExistWatches(),
|
|
@@ -995,13 +830,10 @@ public class ClientCnxn {
|
|
outgoingQueue.addFirst((new Packet(null, null, null, null, bb,
|
|
outgoingQueue.addFirst((new Packet(null, null, null, null, bb,
|
|
null)));
|
|
null)));
|
|
}
|
|
}
|
|
- synchronized (this) {
|
|
|
|
- k.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
|
|
|
|
- }
|
|
|
|
|
|
+ clientCnxnSocket.enableReadWriteOnly();
|
|
if (LOG.isDebugEnabled()) {
|
|
if (LOG.isDebugEnabled()) {
|
|
LOG.debug("Session establishment request sent on "
|
|
LOG.debug("Session establishment request sent on "
|
|
- + ((SocketChannel)sockKey.channel())
|
|
|
|
- .socket().getRemoteSocketAddress());
|
|
|
|
|
|
+ + clientCnxnSocket.getRemoteSocketAddress());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -1011,12 +843,6 @@ public class ClientCnxn {
|
|
queuePacket(h, null, null, null, null, null, null, null, null);
|
|
queuePacket(h, null, null, null, null, null, null, null, null);
|
|
}
|
|
}
|
|
|
|
|
|
- int lastConnectIndex = -1;
|
|
|
|
-
|
|
|
|
- int currentConnectIndex;
|
|
|
|
-
|
|
|
|
- Random r = new Random(System.nanoTime());
|
|
|
|
-
|
|
|
|
private void startConnect() throws IOException {
|
|
private void startConnect() throws IOException {
|
|
if (lastConnectIndex == -1) {
|
|
if (lastConnectIndex == -1) {
|
|
// We don't want to delay the first try at a connect, so we
|
|
// We don't want to delay the first try at a connect, so we
|
|
@@ -1037,7 +863,7 @@ public class ClientCnxn {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- zooKeeper.state = States.CONNECTING;
|
|
|
|
|
|
+ state = States.CONNECTING;
|
|
currentConnectIndex = nextAddrToTry;
|
|
currentConnectIndex = nextAddrToTry;
|
|
InetSocketAddress addr = serverAddrs.get(nextAddrToTry);
|
|
InetSocketAddress addr = serverAddrs.get(nextAddrToTry);
|
|
nextAddrToTry++;
|
|
nextAddrToTry++;
|
|
@@ -1045,24 +871,11 @@ public class ClientCnxn {
|
|
nextAddrToTry = 0;
|
|
nextAddrToTry = 0;
|
|
}
|
|
}
|
|
LOG.info("Opening socket connection to server " + addr);
|
|
LOG.info("Opening socket connection to server " + addr);
|
|
- SocketChannel sock;
|
|
|
|
- sock = SocketChannel.open();
|
|
|
|
- sock.configureBlocking(false);
|
|
|
|
- sock.socket().setSoLinger(false, -1);
|
|
|
|
- sock.socket().setTcpNoDelay(true);
|
|
|
|
|
|
+
|
|
setName(getName().replaceAll("\\(.*\\)",
|
|
setName(getName().replaceAll("\\(.*\\)",
|
|
"(" + addr.getHostName() + ":" + addr.getPort() + ")"));
|
|
"(" + addr.getHostName() + ":" + addr.getPort() + ")"));
|
|
- sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
|
|
|
|
- if (sock.connect(addr)) {
|
|
|
|
- primeConnection(sockKey);
|
|
|
|
- }
|
|
|
|
- initialized = false;
|
|
|
|
|
|
|
|
- /*
|
|
|
|
- * Reset incomingBuffer
|
|
|
|
- */
|
|
|
|
- lenBuffer.clear();
|
|
|
|
- incomingBuffer = lenBuffer;
|
|
|
|
|
|
+ clientCnxnSocket.connect(addr);
|
|
}
|
|
}
|
|
|
|
|
|
private static final String RETRY_CONN_MSG =
|
|
private static final String RETRY_CONN_MSG =
|
|
@@ -1070,39 +883,38 @@ public class ClientCnxn {
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public void run() {
|
|
public void run() {
|
|
- long now = System.currentTimeMillis();
|
|
|
|
- long lastHeard = now;
|
|
|
|
- long lastSend = now;
|
|
|
|
- while (zooKeeper.state.isAlive()) {
|
|
|
|
|
|
+ clientCnxnSocket.introduce(this,sessionId);
|
|
|
|
+ clientCnxnSocket.updateNow();
|
|
|
|
+ clientCnxnSocket.updateLastSendAndHeard();
|
|
|
|
+ while (state.isAlive()) {
|
|
try {
|
|
try {
|
|
- if (sockKey == null) {
|
|
|
|
|
|
+ if (!clientCnxnSocket.isConnected()) {
|
|
// don't re-establish connection if we are closing
|
|
// don't re-establish connection if we are closing
|
|
if (closing) {
|
|
if (closing) {
|
|
break;
|
|
break;
|
|
}
|
|
}
|
|
startConnect();
|
|
startConnect();
|
|
- lastSend = now;
|
|
|
|
- lastHeard = now;
|
|
|
|
|
|
+ clientCnxnSocket.updateLastSendAndHeard();
|
|
}
|
|
}
|
|
- int idleRecv = (int) (now - lastHeard);
|
|
|
|
- int idleSend = (int) (now - lastSend);
|
|
|
|
- int to = readTimeout - idleRecv;
|
|
|
|
- if (zooKeeper.state != States.CONNECTED) {
|
|
|
|
- to = connectTimeout - idleRecv;
|
|
|
|
|
|
+
|
|
|
|
+ int to = readTimeout - clientCnxnSocket.getIdleRecv();
|
|
|
|
+ if (state != States.CONNECTED) {
|
|
|
|
+ to = connectTimeout - clientCnxnSocket.getIdleRecv();
|
|
}
|
|
}
|
|
if (to <= 0) {
|
|
if (to <= 0) {
|
|
throw new SessionTimeoutException(
|
|
throw new SessionTimeoutException(
|
|
"Client session timed out, have not heard from server in "
|
|
"Client session timed out, have not heard from server in "
|
|
- + idleRecv + "ms"
|
|
|
|
- + " for sessionid 0x"
|
|
|
|
- + Long.toHexString(sessionId));
|
|
|
|
|
|
+ + clientCnxnSocket.getIdleRecv() + "ms"
|
|
|
|
+ + " for sessionid 0x"
|
|
|
|
+ + Long.toHexString(sessionId));
|
|
}
|
|
}
|
|
- if (zooKeeper.state == States.CONNECTED) {
|
|
|
|
- int timeToNextPing = readTimeout/2 - idleSend;
|
|
|
|
|
|
+ if (state == States.CONNECTED) {
|
|
|
|
+ int timeToNextPing = readTimeout / 2
|
|
|
|
+ - clientCnxnSocket.getIdleSend();
|
|
if (timeToNextPing <= 0) {
|
|
if (timeToNextPing <= 0) {
|
|
sendPing();
|
|
sendPing();
|
|
- lastSend = now;
|
|
|
|
- enableWrite();
|
|
|
|
|
|
+ clientCnxnSocket.updateLastSend();
|
|
|
|
+ clientCnxnSocket.enableWrite();
|
|
} else {
|
|
} else {
|
|
if (timeToNextPing < to) {
|
|
if (timeToNextPing < to) {
|
|
to = timeToNextPing;
|
|
to = timeToNextPing;
|
|
@@ -1110,42 +922,8 @@ public class ClientCnxn {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- selector.select(to);
|
|
|
|
- Set<SelectionKey> selected;
|
|
|
|
- synchronized (this) {
|
|
|
|
- selected = selector.selectedKeys();
|
|
|
|
- }
|
|
|
|
- // Everything below and until we get back to the select is
|
|
|
|
- // non blocking, so time is effectively a constant. That is
|
|
|
|
- // Why we just have to do this once, here
|
|
|
|
- now = System.currentTimeMillis();
|
|
|
|
- for (SelectionKey k : selected) {
|
|
|
|
- SocketChannel sc = ((SocketChannel) k.channel());
|
|
|
|
- if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
|
|
|
|
- if (sc.finishConnect()) {
|
|
|
|
- lastHeard = now;
|
|
|
|
- lastSend = now;
|
|
|
|
- primeConnection(k);
|
|
|
|
- }
|
|
|
|
- } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
|
|
|
|
- if (outgoingQueue.size() > 0) {
|
|
|
|
- // We have something to send so it's the same
|
|
|
|
- // as if we do the send now.
|
|
|
|
- lastSend = now;
|
|
|
|
- }
|
|
|
|
- if (doIO()) {
|
|
|
|
- lastHeard = now;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- if (zooKeeper.state == States.CONNECTED) {
|
|
|
|
- if (outgoingQueue.size() > 0) {
|
|
|
|
- enableWrite();
|
|
|
|
- } else {
|
|
|
|
- disableWrite();
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- selected.clear();
|
|
|
|
|
|
+ clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue);
|
|
|
|
+
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
if (closing) {
|
|
if (closing) {
|
|
if (LOG.isDebugEnabled()) {
|
|
if (LOG.isDebugEnabled()) {
|
|
@@ -1164,92 +942,38 @@ public class ClientCnxn {
|
|
} else if (e instanceof EndOfStreamException) {
|
|
} else if (e instanceof EndOfStreamException) {
|
|
LOG.info(e.getMessage() + RETRY_CONN_MSG);
|
|
LOG.info(e.getMessage() + RETRY_CONN_MSG);
|
|
} else {
|
|
} else {
|
|
- LOG.warn("Session 0x"
|
|
|
|
- + Long.toHexString(getSessionId())
|
|
|
|
- + " for server "
|
|
|
|
- + ((SocketChannel)sockKey.channel())
|
|
|
|
- .socket().getRemoteSocketAddress()
|
|
|
|
- + ", unexpected error"
|
|
|
|
- + RETRY_CONN_MSG,
|
|
|
|
- e);
|
|
|
|
|
|
+ LOG.warn(
|
|
|
|
+ "Session 0x"
|
|
|
|
+ + Long.toHexString(getSessionId())
|
|
|
|
+ + " for server "
|
|
|
|
+ + clientCnxnSocket.getRemoteSocketAddress()
|
|
|
|
+ + ", unexpected error"
|
|
|
|
+ + RETRY_CONN_MSG, e);
|
|
}
|
|
}
|
|
cleanup();
|
|
cleanup();
|
|
- if (zooKeeper.state.isAlive()) {
|
|
|
|
|
|
+ if (state.isAlive()) {
|
|
eventThread.queueEvent(new WatchedEvent(
|
|
eventThread.queueEvent(new WatchedEvent(
|
|
Event.EventType.None,
|
|
Event.EventType.None,
|
|
Event.KeeperState.Disconnected,
|
|
Event.KeeperState.Disconnected,
|
|
null));
|
|
null));
|
|
}
|
|
}
|
|
-
|
|
|
|
- now = System.currentTimeMillis();
|
|
|
|
- lastHeard = now;
|
|
|
|
- lastSend = now;
|
|
|
|
|
|
+ clientCnxnSocket.updateNow();
|
|
|
|
+ clientCnxnSocket.updateLastSendAndHeard();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
cleanup();
|
|
cleanup();
|
|
- try {
|
|
|
|
- if (LOG.isTraceEnabled()) {
|
|
|
|
- LOG.trace("Doing client selector close");
|
|
|
|
- }
|
|
|
|
- selector.close();
|
|
|
|
- if (LOG.isTraceEnabled()) {
|
|
|
|
- LOG.trace("Closed client selector");
|
|
|
|
- }
|
|
|
|
- } catch (IOException e) {
|
|
|
|
- LOG.warn("Ignoring exception during selector close", e);
|
|
|
|
- }
|
|
|
|
- if (zooKeeper.state.isAlive()) {
|
|
|
|
- eventThread.queueEvent(new WatchedEvent(
|
|
|
|
- Event.EventType.None,
|
|
|
|
- Event.KeeperState.Disconnected,
|
|
|
|
- null));
|
|
|
|
|
|
+ clientCnxnSocket.close();
|
|
|
|
+ if (state.isAlive()) {
|
|
|
|
+ eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
|
|
|
|
+ Event.KeeperState.Disconnected, null));
|
|
}
|
|
}
|
|
ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
|
|
ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
|
|
"SendThread exitedloop.");
|
|
"SendThread exitedloop.");
|
|
}
|
|
}
|
|
|
|
|
|
private void cleanup() {
|
|
private void cleanup() {
|
|
- if (sockKey != null) {
|
|
|
|
- SocketChannel sock = (SocketChannel) sockKey.channel();
|
|
|
|
- sockKey.cancel();
|
|
|
|
- try {
|
|
|
|
- sock.socket().shutdownInput();
|
|
|
|
- } catch (IOException e) {
|
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
|
- LOG.debug("Ignoring exception during shutdown input", e);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- try {
|
|
|
|
- sock.socket().shutdownOutput();
|
|
|
|
- } catch (IOException e) {
|
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
|
- LOG.debug("Ignoring exception during shutdown output", e);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- try {
|
|
|
|
- sock.socket().close();
|
|
|
|
- } catch (IOException e) {
|
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
|
- LOG.debug("Ignoring exception during socket close", e);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- try {
|
|
|
|
- sock.close();
|
|
|
|
- } catch (IOException e) {
|
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
|
- LOG.debug("Ignoring exception during channel close", e);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- try {
|
|
|
|
- Thread.sleep(100);
|
|
|
|
- } catch (InterruptedException e) {
|
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
|
- LOG.debug("SendThread interrupted during sleep, ignoring");
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- sockKey = null;
|
|
|
|
|
|
+ clientCnxnSocket.cleanup();
|
|
synchronized (pendingQueue) {
|
|
synchronized (pendingQueue) {
|
|
for (Packet p : pendingQueue) {
|
|
for (Packet p : pendingQueue) {
|
|
conLossPacket(p);
|
|
conLossPacket(p);
|
|
@@ -1264,11 +988,50 @@ public class ClientCnxn {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- public void close() {
|
|
|
|
- synchronized (this) {
|
|
|
|
- zooKeeper.state = States.CLOSED;
|
|
|
|
- selector.wakeup();
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Callback invoked by the ClientCnxnSocket once a connection has been
|
|
|
|
+ * established.
|
|
|
|
+ *
|
|
|
|
+ * @param _negotiatedSessionTimeout
|
|
|
|
+ * @param _sessionId
|
|
|
|
+ * @param _sessionPasswd
|
|
|
|
+ * @throws IOException
|
|
|
|
+ */
|
|
|
|
+ void onConnected(int _negotiatedSessionTimeout, long _sessionId,
|
|
|
|
+ byte[] _sessionPasswd) throws IOException {
|
|
|
|
+ negotiatedSessionTimeout = _negotiatedSessionTimeout;
|
|
|
|
+ if (negotiatedSessionTimeout <= 0) {
|
|
|
|
+ state = States.CLOSED;
|
|
|
|
+
|
|
|
|
+ eventThread.queueEvent(new WatchedEvent(
|
|
|
|
+ Watcher.Event.EventType.None,
|
|
|
|
+ Watcher.Event.KeeperState.Expired, null));
|
|
|
|
+ eventThread.queueEventOfDeath();
|
|
|
|
+ throw new SessionExpiredException(
|
|
|
|
+ "Unable to reconnect to ZooKeeper service, session 0x"
|
|
|
|
+ + Long.toHexString(sessionId) + " has expired");
|
|
}
|
|
}
|
|
|
|
+ readTimeout = negotiatedSessionTimeout * 2 / 3;
|
|
|
|
+ connectTimeout = negotiatedSessionTimeout / serverAddrs.size();
|
|
|
|
+ sessionId = _sessionId;
|
|
|
|
+ sessionPasswd = _sessionPasswd;
|
|
|
|
+ state = States.CONNECTED;
|
|
|
|
+ LOG.info("Session establishment complete on server "
|
|
|
|
+ + clientCnxnSocket.getRemoteSocketAddress() + ", sessionid = 0x"
|
|
|
|
+ + Long.toHexString(sessionId) + ", negotiated timeout = "
|
|
|
|
+ + negotiatedSessionTimeout);
|
|
|
|
+ eventThread.queueEvent(new WatchedEvent(
|
|
|
|
+ Watcher.Event.EventType.None,
|
|
|
|
+ Watcher.Event.KeeperState.SyncConnected, null));
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ void close() {
|
|
|
|
+ state = States.CLOSED;
|
|
|
|
+ clientCnxnSocket.wakeupCnxn();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ void testableCloseSocket() throws IOException {
|
|
|
|
+ clientCnxnSocket.testableCloseSocket();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -1314,6 +1077,8 @@ public class ClientCnxn {
|
|
|
|
|
|
private int xid = 1;
|
|
private int xid = 1;
|
|
|
|
|
|
|
|
+ private volatile States state;
|
|
|
|
+
|
|
synchronized private int getXid() {
|
|
synchronized private int getXid() {
|
|
return xid++;
|
|
return xid++;
|
|
}
|
|
}
|
|
@@ -1347,7 +1112,7 @@ public class ClientCnxn {
|
|
packet.ctx = ctx;
|
|
packet.ctx = ctx;
|
|
packet.clientPath = clientPath;
|
|
packet.clientPath = clientPath;
|
|
packet.serverPath = serverPath;
|
|
packet.serverPath = serverPath;
|
|
- if (!zooKeeper.state.isAlive() || closing) {
|
|
|
|
|
|
+ if (!state.isAlive() || closing) {
|
|
conLossPacket(packet);
|
|
conLossPacket(packet);
|
|
} else {
|
|
} else {
|
|
// If the client is asking to close the session then
|
|
// If the client is asking to close the session then
|
|
@@ -1358,14 +1123,12 @@ public class ClientCnxn {
|
|
outgoingQueue.add(packet);
|
|
outgoingQueue.add(packet);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- synchronized (sendThread) {
|
|
|
|
- selector.wakeup();
|
|
|
|
- }
|
|
|
|
|
|
+ sendThread.getClientCnxnSocket().wakeupCnxn();
|
|
return packet;
|
|
return packet;
|
|
}
|
|
}
|
|
|
|
|
|
public void addAuthInfo(String scheme, byte auth[]) {
|
|
public void addAuthInfo(String scheme, byte auth[]) {
|
|
- if (!zooKeeper.state.isAlive()) {
|
|
|
|
|
|
+ if (!state.isAlive()) {
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
authInfo.add(new AuthData(scheme, auth));
|
|
authInfo.add(new AuthData(scheme, auth));
|
|
@@ -1373,4 +1136,8 @@ public class ClientCnxn {
|
|
new AuthPacket(0, scheme, auth), null, null, null, null,
|
|
new AuthPacket(0, scheme, auth), null, null, null, null,
|
|
null, null);
|
|
null, null);
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ States getState() {
|
|
|
|
+ return state;
|
|
|
|
+ }
|
|
}
|
|
}
|