|
@@ -29,6 +29,10 @@ import java.util.Random;
|
|
|
import java.util.Set;
|
|
|
import java.util.concurrent.CopyOnWriteArraySet;
|
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
|
+import java.io.BufferedReader;
|
|
|
+import java.io.InputStreamReader;
|
|
|
+import java.net.ConnectException;
|
|
|
+import java.net.Socket;
|
|
|
|
|
|
import org.apache.jute.BinaryInputArchive;
|
|
|
import org.apache.jute.BinaryOutputArchive;
|
|
@@ -136,6 +140,14 @@ public class ClientCnxn {
|
|
|
|
|
|
private byte sessionPasswd[] = new byte[16];
|
|
|
|
|
|
+ /**
|
|
|
+ * If true, the connection is allowed to go to r-o mode. This field's value
|
|
|
+ * is sent, besides other data, during session creation handshake. If the
|
|
|
+ * server on the other side of the wire is partitioned it'll accept
|
|
|
+ * read-only clients only.
|
|
|
+ */
|
|
|
+ private boolean readOnly;
|
|
|
+
|
|
|
final String chrootPath;
|
|
|
|
|
|
final SendThread sendThread;
|
|
@@ -155,6 +167,21 @@ public class ClientCnxn {
|
|
|
*/
|
|
|
private final HostProvider hostProvider;
|
|
|
|
|
|
+ /**
|
|
|
+ * Is set to true when a connection to a r/w server is established for the
|
|
|
+ * first time; never changed afterwards.
|
|
|
+ * <p>
|
|
|
+ * Is used to handle situations when client without sessionId connects to a
|
|
|
+ * read-only server. Such client receives "fake" sessionId from read-only
|
|
|
+ * server, but this sessionId is invalid for other servers. So when such
|
|
|
+ * client finds a r/w server, it sends 0 instead of fake sessionId during
|
|
|
+ * connection handshake and establishes new, valid session.
|
|
|
+ * <p>
|
|
|
+ * If this field is false (which implies we haven't seen r/w server before)
|
|
|
+ * then non-zero sessionId is fake, otherwise it is valid.
|
|
|
+ */
|
|
|
+ volatile boolean seenRwServerBefore = false;
|
|
|
+
|
|
|
public long getSessionId() {
|
|
|
return sessionId;
|
|
|
}
|
|
@@ -215,8 +242,18 @@ public class ClientCnxn {
|
|
|
|
|
|
WatchRegistration watchRegistration;
|
|
|
|
|
|
- Packet(RequestHeader requestHeader, ReplyHeader replyHeader, Record request,
|
|
|
- Record response, WatchRegistration watchRegistration) {
|
|
|
+ /** Convenience ctor */
|
|
|
+ Packet(RequestHeader requestHeader, ReplyHeader replyHeader,
|
|
|
+ Record request, Record response,
|
|
|
+ WatchRegistration watchRegistration) {
|
|
|
+ this(requestHeader, replyHeader, request, response,
|
|
|
+ watchRegistration, false);
|
|
|
+ }
|
|
|
+
|
|
|
+ Packet(RequestHeader requestHeader, ReplyHeader replyHeader,
|
|
|
+ Record request, Record response,
|
|
|
+ WatchRegistration watchRegistration, boolean readOnly) {
|
|
|
+
|
|
|
this.requestHeader = requestHeader;
|
|
|
this.replyHeader = replyHeader;
|
|
|
this.request = request;
|
|
@@ -231,6 +268,8 @@ public class ClientCnxn {
|
|
|
}
|
|
|
if (request instanceof ConnectRequest) {
|
|
|
request.serialize(boa, "connect");
|
|
|
+ // append "am-I-allowed-to-be-readonly" flag
|
|
|
+ boa.writeBool(readOnly, "readOnly");
|
|
|
} else if (request != null) {
|
|
|
request.serialize(boa, "request");
|
|
|
}
|
|
@@ -278,12 +317,16 @@ public class ClientCnxn {
|
|
|
* @param watcher watcher for this connection
|
|
|
* @param clientCnxnSocket
|
|
|
* the socket implementation used (e.g. NIO/Netty)
|
|
|
+ * @param canBeReadOnly
|
|
|
+ * whether the connection is allowed to go to read-only
|
|
|
+ * mode in case of partitioning
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
|
|
|
- ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket)
|
|
|
+ ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, boolean canBeReadOnly)
|
|
|
throws IOException {
|
|
|
- this(chrootPath, hostProvider, sessionTimeout, zooKeeper, watcher, clientCnxnSocket, 0, new byte[16]);
|
|
|
+ this(chrootPath, hostProvider, sessionTimeout, zooKeeper, watcher,
|
|
|
+ clientCnxnSocket, 0, new byte[16], canBeReadOnly);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -303,11 +346,14 @@ public class ClientCnxn {
|
|
|
* the socket implementation used (e.g. NIO/Netty)
|
|
|
* @param sessionId session id if re-establishing session
|
|
|
* @param sessionPasswd session passwd if re-establishing session
|
|
|
+ * @param canBeReadOnly
|
|
|
+ * whether the connection is allowed to go to read-only
|
|
|
+ * mode in case of partitioning
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
|
|
|
ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
|
|
|
- long sessionId, byte[] sessionPasswd) {
|
|
|
+ long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
|
|
|
this.zooKeeper = zooKeeper;
|
|
|
this.watcher = watcher;
|
|
|
this.sessionId = sessionId;
|
|
@@ -318,6 +364,7 @@ public class ClientCnxn {
|
|
|
|
|
|
connectTimeout = sessionTimeout / hostProvider.size();
|
|
|
readTimeout = sessionTimeout * 2 / 3;
|
|
|
+ readOnly = canBeReadOnly;
|
|
|
|
|
|
sendThread = new SendThread(clientCnxnSocket);
|
|
|
eventThread = new EventThread();
|
|
@@ -614,6 +661,14 @@ public class ClientCnxn {
|
|
|
super(msg);
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ private static class RWServerFoundException extends IOException {
|
|
|
+ private static final long serialVersionUID = 90431199887158758L;
|
|
|
+
|
|
|
+ public RWServerFoundException(String msg) {
|
|
|
+ super(msg);
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
public static final int packetLen = Integer.getInteger("jute.maxbuffer",
|
|
|
4096 * 1024);
|
|
@@ -757,10 +812,12 @@ public class ClientCnxn {
|
|
|
|
|
|
void primeConnection() throws IOException {
|
|
|
LOG.info("Socket connection established to "
|
|
|
- + clientCnxnSocket.getRemoteSocketAddress() + ", initiating session");
|
|
|
+ + clientCnxnSocket.getRemoteSocketAddress()
|
|
|
+ + ", initiating session");
|
|
|
isFirstConnect = false;
|
|
|
+ long sessId = (seenRwServerBefore) ? sessionId : 0;
|
|
|
ConnectRequest conReq = new ConnectRequest(0, lastZxid,
|
|
|
- sessionTimeout, sessionId, sessionPasswd);
|
|
|
+ sessionTimeout, sessId, sessionPasswd);
|
|
|
synchronized (outgoingQueue) {
|
|
|
// We add backwards since we are pushing into the front
|
|
|
// Only send if there's a pending watch
|
|
@@ -786,8 +843,8 @@ public class ClientCnxn {
|
|
|
OpCode.auth), null, new AuthPacket(0, id.scheme,
|
|
|
id.data), null, null));
|
|
|
}
|
|
|
- outgoingQueue.addFirst((new Packet(null, null, conReq, null,
|
|
|
- null)));
|
|
|
+ outgoingQueue.addFirst(new Packet(null, null, conReq,
|
|
|
+ null, null, readOnly));
|
|
|
}
|
|
|
clientCnxnSocket.enableReadWriteOnly();
|
|
|
if (LOG.isDebugEnabled()) {
|
|
@@ -802,16 +859,31 @@ public class ClientCnxn {
|
|
|
queuePacket(h, null, null, null, null, null, null, null, null);
|
|
|
}
|
|
|
|
|
|
+ private InetSocketAddress rwServerAddress = null;
|
|
|
+
|
|
|
+ private final static int minPingRwTimeout = 100;
|
|
|
+
|
|
|
+ private final static int maxPingRwTimeout = 60000;
|
|
|
+
|
|
|
+ private int pingRwTimeout = minPingRwTimeout;
|
|
|
+
|
|
|
private void startConnect() throws IOException {
|
|
|
if(!isFirstConnect){
|
|
|
try {
|
|
|
Thread.sleep(r.nextInt(1000));
|
|
|
- } catch (InterruptedException e1) {
|
|
|
- LOG.warn("Unexpected exception", e1);
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ LOG.warn("Unexpected exception", e);
|
|
|
}
|
|
|
}
|
|
|
state = States.CONNECTING;
|
|
|
- InetSocketAddress addr = hostProvider.next(1000);
|
|
|
+
|
|
|
+ InetSocketAddress addr;
|
|
|
+ if (rwServerAddress != null) {
|
|
|
+ addr = rwServerAddress;
|
|
|
+ rwServerAddress = null;
|
|
|
+ } else {
|
|
|
+ addr = hostProvider.next(1000);
|
|
|
+ }
|
|
|
|
|
|
LOG.info("Opening socket connection to server " + addr);
|
|
|
|
|
@@ -830,6 +902,7 @@ public class ClientCnxn {
|
|
|
clientCnxnSocket.updateNow();
|
|
|
clientCnxnSocket.updateLastSendAndHeard();
|
|
|
int to;
|
|
|
+ long lastPingRwServer = System.currentTimeMillis();
|
|
|
while (state.isAlive()) {
|
|
|
try {
|
|
|
if (!clientCnxnSocket.isConnected()) {
|
|
@@ -841,7 +914,7 @@ public class ClientCnxn {
|
|
|
clientCnxnSocket.updateLastSendAndHeard();
|
|
|
}
|
|
|
|
|
|
- if (state == States.CONNECTED) {
|
|
|
+ if (state.isConnected()) {
|
|
|
to = readTimeout - clientCnxnSocket.getIdleRecv();
|
|
|
} else {
|
|
|
to = connectTimeout - clientCnxnSocket.getIdleRecv();
|
|
@@ -854,7 +927,7 @@ public class ClientCnxn {
|
|
|
+ " for sessionid 0x"
|
|
|
+ Long.toHexString(sessionId));
|
|
|
}
|
|
|
- if (state == States.CONNECTED) {
|
|
|
+ if (state.isConnected()) {
|
|
|
int timeToNextPing = readTimeout / 2
|
|
|
- clientCnxnSocket.getIdleSend();
|
|
|
if (timeToNextPing <= 0) {
|
|
@@ -868,6 +941,20 @@ public class ClientCnxn {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ // If we are in read-only mode, seek for read/write server
|
|
|
+ if (state == States.CONNECTEDREADONLY) {
|
|
|
+ long now = System.currentTimeMillis();
|
|
|
+ int idlePingRwServer = (int) (now - lastPingRwServer);
|
|
|
+ if (idlePingRwServer >= pingRwTimeout) {
|
|
|
+ lastPingRwServer = now;
|
|
|
+ idlePingRwServer = 0;
|
|
|
+ pingRwTimeout =
|
|
|
+ Math.min(2*pingRwTimeout, maxPingRwTimeout);
|
|
|
+ pingRwServer();
|
|
|
+ }
|
|
|
+ to = Math.min(to, pingRwTimeout - idlePingRwServer);
|
|
|
+ }
|
|
|
+
|
|
|
clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue);
|
|
|
|
|
|
} catch (Exception e) {
|
|
@@ -887,6 +974,8 @@ public class ClientCnxn {
|
|
|
LOG.info(e.getMessage() + RETRY_CONN_MSG);
|
|
|
} else if (e instanceof EndOfStreamException) {
|
|
|
LOG.info(e.getMessage() + RETRY_CONN_MSG);
|
|
|
+ } else if (e instanceof RWServerFoundException) {
|
|
|
+ LOG.info(e.getMessage());
|
|
|
} else {
|
|
|
LOG.warn(
|
|
|
"Session 0x"
|
|
@@ -918,6 +1007,43 @@ public class ClientCnxn {
|
|
|
"SendThread exitedloop.");
|
|
|
}
|
|
|
|
|
|
+ private void pingRwServer() throws RWServerFoundException {
|
|
|
+ String result = null;
|
|
|
+ InetSocketAddress addr = hostProvider.next(0);
|
|
|
+ LOG.info("Checking server " + addr + " for being r/w." +
|
|
|
+ " Timeout " + pingRwTimeout);
|
|
|
+
|
|
|
+ try {
|
|
|
+ Socket sock = new Socket(addr.getHostName(), addr.getPort());
|
|
|
+ sock.setSoLinger(false, -1);
|
|
|
+ sock.setSoTimeout(1000);
|
|
|
+ sock.setTcpNoDelay(true);
|
|
|
+ sock.getOutputStream().write("isro".getBytes());
|
|
|
+ sock.getOutputStream().flush();
|
|
|
+ sock.shutdownOutput();
|
|
|
+ BufferedReader br = new BufferedReader(
|
|
|
+ new InputStreamReader(sock.getInputStream()));
|
|
|
+ result = br.readLine();
|
|
|
+ sock.close();
|
|
|
+ br.close();
|
|
|
+ } catch (ConnectException e) {
|
|
|
+ // ignore, this just means server is not up
|
|
|
+ } catch (IOException e) {
|
|
|
+ // some unexpected error, warn about it
|
|
|
+ LOG.warn("Exception while seeking for r/w server " +
|
|
|
+ e.getMessage(), e);
|
|
|
+ }
|
|
|
+
|
|
|
+ if ("rw".equals(result)) {
|
|
|
+ pingRwTimeout = minPingRwTimeout;
|
|
|
+ // save the found address so that it's used during the next
|
|
|
+ // connection attempt
|
|
|
+ rwServerAddress = addr;
|
|
|
+ throw new RWServerFoundException("Majority server found at "
|
|
|
+ + addr.getHostName() + ":" + addr.getPort());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
private void cleanup() {
|
|
|
clientCnxnSocket.cleanup();
|
|
|
synchronized (pendingQueue) {
|
|
@@ -941,10 +1067,11 @@ public class ClientCnxn {
|
|
|
* @param _negotiatedSessionTimeout
|
|
|
* @param _sessionId
|
|
|
* @param _sessionPasswd
|
|
|
+ * @param isRO
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
void onConnected(int _negotiatedSessionTimeout, long _sessionId,
|
|
|
- byte[] _sessionPasswd) throws IOException {
|
|
|
+ byte[] _sessionPasswd, boolean isRO) throws IOException {
|
|
|
negotiatedSessionTimeout = _negotiatedSessionTimeout;
|
|
|
if (negotiatedSessionTimeout <= 0) {
|
|
|
state = States.CLOSED;
|
|
@@ -957,19 +1084,27 @@ public class ClientCnxn {
|
|
|
"Unable to reconnect to ZooKeeper service, session 0x"
|
|
|
+ Long.toHexString(sessionId) + " has expired");
|
|
|
}
|
|
|
+ if (!readOnly && isRO) {
|
|
|
+ LOG.error("Read/write client got connected to read-only server");
|
|
|
+ }
|
|
|
readTimeout = negotiatedSessionTimeout * 2 / 3;
|
|
|
connectTimeout = negotiatedSessionTimeout / hostProvider.size();
|
|
|
hostProvider.onConnected();
|
|
|
sessionId = _sessionId;
|
|
|
sessionPasswd = _sessionPasswd;
|
|
|
- state = States.CONNECTED;
|
|
|
+ state = (isRO) ?
|
|
|
+ States.CONNECTEDREADONLY : States.CONNECTED;
|
|
|
+ seenRwServerBefore |= !isRO;
|
|
|
LOG.info("Session establishment complete on server "
|
|
|
- + clientCnxnSocket.getRemoteSocketAddress() + ", sessionid = 0x"
|
|
|
- + Long.toHexString(sessionId) + ", negotiated timeout = "
|
|
|
- + negotiatedSessionTimeout);
|
|
|
+ + clientCnxnSocket.getRemoteSocketAddress()
|
|
|
+ + ", sessionid = 0x" + Long.toHexString(sessionId)
|
|
|
+ + ", negotiated timeout = " + negotiatedSessionTimeout
|
|
|
+ + (isRO ? " (READ-ONLY mode)" : ""));
|
|
|
+ KeeperState eventState = (isRO) ?
|
|
|
+ KeeperState.ConnectedReadOnly : KeeperState.SyncConnected;
|
|
|
eventThread.queueEvent(new WatchedEvent(
|
|
|
Watcher.Event.EventType.None,
|
|
|
- Watcher.Event.KeeperState.SyncConnected, null));
|
|
|
+ eventState, null));
|
|
|
}
|
|
|
|
|
|
void close() {
|