|
@@ -64,11 +64,11 @@ import org.apache.zookeeper.proto.GetACLResponse;
|
|
|
import org.apache.zookeeper.proto.GetChildren2Response;
|
|
|
import org.apache.zookeeper.proto.GetChildrenResponse;
|
|
|
import org.apache.zookeeper.proto.GetDataResponse;
|
|
|
+import org.apache.zookeeper.proto.GetSASLRequest;
|
|
|
import org.apache.zookeeper.proto.ReplyHeader;
|
|
|
import org.apache.zookeeper.proto.RequestHeader;
|
|
|
import org.apache.zookeeper.proto.SetACLResponse;
|
|
|
import org.apache.zookeeper.proto.SetDataResponse;
|
|
|
-import org.apache.zookeeper.proto.SetSASLResponse;
|
|
|
import org.apache.zookeeper.proto.SetWatches;
|
|
|
import org.apache.zookeeper.proto.WatcherEvent;
|
|
|
import org.apache.zookeeper.server.ByteBufferInputStream;
|
|
@@ -251,6 +251,8 @@ public class ClientCnxn {
|
|
|
|
|
|
WatchRegistration watchRegistration;
|
|
|
|
|
|
+ public boolean readOnly;
|
|
|
+
|
|
|
/** Convenience ctor */
|
|
|
Packet(RequestHeader requestHeader, ReplyHeader replyHeader,
|
|
|
Record request, Record response,
|
|
@@ -267,7 +269,11 @@ public class ClientCnxn {
|
|
|
this.replyHeader = replyHeader;
|
|
|
this.request = request;
|
|
|
this.response = response;
|
|
|
+ this.readOnly = readOnly;
|
|
|
+ this.watchRegistration = watchRegistration;
|
|
|
+ }
|
|
|
|
|
|
+ public void createBB() {
|
|
|
try {
|
|
|
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
|
|
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
|
|
@@ -289,8 +295,6 @@ public class ClientCnxn {
|
|
|
} catch (IOException e) {
|
|
|
LOG.warn("Ignoring unexpected exception", e);
|
|
|
}
|
|
|
-
|
|
|
- this.watchRegistration = watchRegistration;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -380,12 +384,6 @@ public class ClientCnxn {
|
|
|
|
|
|
}
|
|
|
|
|
|
- // used by ZooKeeperSaslClient.queueSaslPacket().
|
|
|
- public void queuePacket(RequestHeader h, ReplyHeader r, Record request,
|
|
|
- Record response, AsyncCallback cb) {
|
|
|
- queuePacket(h,r,request,response, cb, null, null, this, null);
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* tests use this to check on reset of watches
|
|
|
* @return if the auto reset of watches are disabled
|
|
@@ -553,17 +551,6 @@ public class ClientCnxn {
|
|
|
} else {
|
|
|
cb.processResult(rc, clientPath, p.ctx, null);
|
|
|
}
|
|
|
- } else if (p.cb instanceof ZooKeeperSaslClient.ServerSaslResponseCallback) {
|
|
|
- ZooKeeperSaslClient.ServerSaslResponseCallback cb = (ZooKeeperSaslClient.ServerSaslResponseCallback) p.cb;
|
|
|
- SetSASLResponse rsp = (SetSASLResponse) p.response;
|
|
|
- // TODO : check rc (== 0, etc) as with other packet types.
|
|
|
- cb.processResult(rc,null,p.ctx,rsp.getToken(),null);
|
|
|
- ClientCnxn clientCnxn = (ClientCnxn)p.ctx;
|
|
|
- if ((clientCnxn == null) || (clientCnxn.zooKeeperSaslClient == null) ||
|
|
|
- (clientCnxn.zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.FAILED)) {
|
|
|
- queueEvent(new WatchedEvent(EventType.None,
|
|
|
- KeeperState.AuthFailed, null));
|
|
|
- }
|
|
|
} else if (p.response instanceof GetDataResponse) {
|
|
|
DataCallback cb = (DataCallback) p.cb;
|
|
|
GetDataResponse rsp = (GetDataResponse) p.response;
|
|
@@ -777,6 +764,18 @@ public class ClientCnxn {
|
|
|
eventThread.queueEvent( we );
|
|
|
return;
|
|
|
}
|
|
|
+
|
|
|
+ // If SASL authentication is currently in progress, construct and
|
|
|
+ // send a response packet immediately, rather than queuing a
|
|
|
+ // response as with other packets.
|
|
|
+ if (clientTunneledAuthenticationInProgress()) {
|
|
|
+ GetSASLRequest request = new GetSASLRequest();
|
|
|
+ request.deserialize(bbia,"token");
|
|
|
+ zooKeeperSaslClient.respondToServer(request.getToken(),
|
|
|
+ ClientCnxn.this);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
Packet packet;
|
|
|
synchronized (pendingQueue) {
|
|
|
if (pendingQueue.size() == 0) {
|
|
@@ -923,6 +922,10 @@ public class ClientCnxn {
|
|
|
|
|
|
private int pingRwTimeout = minPingRwTimeout;
|
|
|
|
|
|
+ // Set to true if and only if constructor of ZooKeeperSaslClient
|
|
|
+ // throws a LoginException: see startConnect() below.
|
|
|
+ private boolean saslLoginFailed = false;
|
|
|
+
|
|
|
private void startConnect() throws IOException {
|
|
|
if(!isFirstConnect){
|
|
|
try {
|
|
@@ -946,11 +949,16 @@ public class ClientCnxn {
|
|
|
try {
|
|
|
zooKeeperSaslClient = new ZooKeeperSaslClient("zookeeper/"+addr.getHostName());
|
|
|
} catch (LoginException e) {
|
|
|
+ // An authentication error occurred when the SASL client tried to initialize:
|
|
|
+ // for Kerberos this means that the client failed to authenticate with the KDC.
|
|
|
+ // This is different from an authentication error that occurs during communication
|
|
|
+ // with the Zookeeper server, which is handled below.
|
|
|
LOG.warn("SASL configuration failed: " + e + " Will continue connection to Zookeeper server without "
|
|
|
- + "SASL authentication, if Zookeeper server allows it.");
|
|
|
+ + "SASL authentication, if Zookeeper server allows it.");
|
|
|
eventThread.queueEvent(new WatchedEvent(
|
|
|
- Watcher.Event.EventType.None,
|
|
|
- Watcher.Event.KeeperState.AuthFailed, null));
|
|
|
+ Watcher.Event.EventType.None,
|
|
|
+ Watcher.Event.KeeperState.AuthFailed, null));
|
|
|
+ saslLoginFailed = true;
|
|
|
}
|
|
|
logStartConnect(addr);
|
|
|
|
|
@@ -987,21 +995,35 @@ public class ClientCnxn {
|
|
|
}
|
|
|
|
|
|
if (state.isConnected()) {
|
|
|
- if ((zooKeeperSaslClient != null) && (zooKeeperSaslClient.isFailed() != true) && (zooKeeperSaslClient.isComplete() != true)) {
|
|
|
- try {
|
|
|
- zooKeeperSaslClient.initialize(ClientCnxn.this);
|
|
|
+ // determine whether we need to send an AuthFailed event.
|
|
|
+ if (zooKeeperSaslClient != null) {
|
|
|
+ boolean sendAuthEvent = false;
|
|
|
+ if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
|
|
|
+ try {
|
|
|
+ zooKeeperSaslClient.initialize(ClientCnxn.this);
|
|
|
+ } catch (SaslException e) {
|
|
|
+ LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e);
|
|
|
+ state = States.AUTH_FAILED;
|
|
|
+ sendAuthEvent = true;
|
|
|
+ }
|
|
|
}
|
|
|
- catch (SaslException e) {
|
|
|
- LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e);
|
|
|
- state = States.AUTH_FAILED;
|
|
|
- eventThread.queueEvent(new WatchedEvent(
|
|
|
- Watcher.Event.EventType.None,
|
|
|
- KeeperState.AuthFailed,null));
|
|
|
+ KeeperState authState = zooKeeperSaslClient.getKeeperState();
|
|
|
+ if (authState != null) {
|
|
|
+ if (authState == KeeperState.AuthFailed) {
|
|
|
+ // An authentication error occurred during authentication with the Zookeeper Server.
|
|
|
+ state = States.AUTH_FAILED;
|
|
|
+ sendAuthEvent = true;
|
|
|
+ } else {
|
|
|
+ if (authState == KeeperState.SaslAuthenticated) {
|
|
|
+ sendAuthEvent = true;
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
- if (zooKeeperSaslClient.readyToSendSaslAuthEvent()) {
|
|
|
+
|
|
|
+ if (sendAuthEvent == true) {
|
|
|
eventThread.queueEvent(new WatchedEvent(
|
|
|
- Watcher.Event.EventType.None,
|
|
|
- Watcher.Event.KeeperState.SaslAuthenticated, null));
|
|
|
+ Watcher.Event.EventType.None,
|
|
|
+ authState,null));
|
|
|
}
|
|
|
}
|
|
|
to = readTimeout - clientCnxnSocket.getIdleRecv();
|
|
@@ -1022,7 +1044,6 @@ public class ClientCnxn {
|
|
|
if (timeToNextPing <= 0) {
|
|
|
sendPing();
|
|
|
clientCnxnSocket.updateLastSend();
|
|
|
- clientCnxnSocket.enableWrite();
|
|
|
} else {
|
|
|
if (timeToNextPing < to) {
|
|
|
to = timeToNextPing;
|
|
@@ -1044,8 +1065,7 @@ public class ClientCnxn {
|
|
|
to = Math.min(to, pingRwTimeout - idlePingRwServer);
|
|
|
}
|
|
|
|
|
|
- clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue);
|
|
|
-
|
|
|
+ clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
|
|
|
} catch (Throwable e) {
|
|
|
if (closing) {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
@@ -1204,6 +1224,26 @@ public class ClientCnxn {
|
|
|
void testableCloseSocket() throws IOException {
|
|
|
clientCnxnSocket.testableCloseSocket();
|
|
|
}
|
|
|
+
|
|
|
+ public boolean clientTunneledAuthenticationInProgress() {
|
|
|
+ // 1. SASL login failed.
|
|
|
+ if (saslLoginFailed == true) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 2. SendThread has not created the authenticating object yet,
|
|
|
+ // therefore authentication is (at the earliest stage of being) in progress.
|
|
|
+ if (zooKeeperSaslClient == null) {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 3. authenticating object exists, so ask it for its progress.
|
|
|
+ return zooKeeperSaslClient.clientTunneledAuthenticationInProgress();
|
|
|
+ }
|
|
|
+
|
|
|
+ public void sendPacket(Packet p) throws IOException {
|
|
|
+ clientCnxnSocket.sendPacket(p);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1250,7 +1290,11 @@ public class ClientCnxn {
|
|
|
|
|
|
private volatile States state = States.NOT_CONNECTED;
|
|
|
|
|
|
- synchronized private int getXid() {
|
|
|
+ /*
|
|
|
+ * getXid() is called externally by ClientCnxnNIO::doIO() when packets are sent from the outgoingQueue to
|
|
|
+ * the server. Thus, getXid() must be public.
|
|
|
+ */
|
|
|
+ synchronized public int getXid() {
|
|
|
return xid++;
|
|
|
}
|
|
|
|
|
@@ -1268,15 +1312,37 @@ public class ClientCnxn {
|
|
|
return r;
|
|
|
}
|
|
|
|
|
|
+ public void enableWrite() {
|
|
|
+ sendThread.getClientCnxnSocket().enableWrite();
|
|
|
+ }
|
|
|
+
|
|
|
+ public void sendPacket(Record request, Record response, AsyncCallback cb, int opCode)
|
|
|
+ throws IOException {
|
|
|
+ // Generate Xid now because it will be sent immediately,
|
|
|
+ // by call to sendThread.sendPacket() below.
|
|
|
+ int xid = getXid();
|
|
|
+ RequestHeader h = new RequestHeader();
|
|
|
+ h.setXid(xid);
|
|
|
+ h.setType(opCode);
|
|
|
+
|
|
|
+ ReplyHeader r = new ReplyHeader();
|
|
|
+ r.setXid(xid);
|
|
|
+
|
|
|
+ Packet p = new Packet(h, r, request, response, null, false);
|
|
|
+ p.cb = cb;
|
|
|
+ sendThread.sendPacket(p);
|
|
|
+ }
|
|
|
+
|
|
|
Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
|
|
|
Record response, AsyncCallback cb, String clientPath,
|
|
|
String serverPath, Object ctx, WatchRegistration watchRegistration)
|
|
|
{
|
|
|
Packet packet = null;
|
|
|
+
|
|
|
+ // Note that we do not generate the Xid for the packet yet. It is
|
|
|
+ // generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
|
|
|
+ // where the packet is actually sent.
|
|
|
synchronized (outgoingQueue) {
|
|
|
- if (h.getType() != OpCode.ping && h.getType() != OpCode.auth) {
|
|
|
- h.setXid(getXid());
|
|
|
- }
|
|
|
packet = new Packet(h, r, request, response, watchRegistration);
|
|
|
packet.cb = cb;
|
|
|
packet.ctx = ctx;
|