@@ -0,0 +1,416 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper;
+
+import org.apache.zookeeper.ClientCnxn.EndOfStreamException;
+import org.apache.zookeeper.ClientCnxn.Packet;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * ClientCnxnSocketNetty implements the ClientCnxnSocket abstract methods.
+ * It is responsible for connecting to the server, reading and writing network
+ * traffic, and acting as a layer between network data and higher-level packets.
+ */
+public class ClientCnxnSocketNetty extends ClientCnxnSocket {
+    private static final Logger LOG = LoggerFactory.getLogger(ClientCnxnSocketNetty.class);
+
+    ChannelFactory channelFactory = new NioClientSocketChannelFactory(
+            Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
+    Channel channel;
+    CountDownLatch firstConnect;
+    ChannelFuture connectFuture;
+    Lock connectLock = new ReentrantLock();
+    AtomicBoolean disconnected = new AtomicBoolean();
+    AtomicBoolean needSasl = new AtomicBoolean();
+    Semaphore waitSasl = new Semaphore(0);
+
+    /**
+     * lifecycle diagram:
+     * <p/>
+     * loop:
+     * - try:
+     * - - !isConnected()
+     * - - - connect()
+     * - - doTransport()
+     * - catch:
+     * - - cleanup()
+     * close()
+     * <p/>
+     * Other non-lifecycle methods are in jeopardy of getting a null channel
+     * when called concurrently. We must handle that.
+     */
+
+    @Override
+    boolean isConnected() {
+        // Assuming that isConnected() is only used to initiate a connection,
+        // not as a general connection-status check.
+        return channel != null;
+    }
+
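+    /**
+     * Connects asynchronously: bootstrap.connect() returns immediately, and
+     * the listener below completes session setup once the channel is ready.
+     */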
+    @Override
+    void connect(InetSocketAddress addr) throws IOException {
+        firstConnect = new CountDownLatch(1);
+
+        ClientBootstrap bootstrap = new ClientBootstrap(channelFactory);
+
+        bootstrap.setPipelineFactory(new ZKClientPipelineFactory());
+        bootstrap.setOption("soLinger", -1);
+        bootstrap.setOption("tcpNoDelay", true);
+
+        connectFuture = bootstrap.connect(addr);
+        connectFuture.addListener(new ChannelFutureListener() {
+            @Override
+            public void operationComplete(ChannelFuture channelFuture) throws Exception {
+                // this lock guarantees that channel won't be assigned after cleanup().
+                connectLock.lock();
+                try {
+                    if (!channelFuture.isSuccess() || connectFuture == null) {
+                        LOG.info("future isn't successful, cause: {}", channelFuture.getCause());
+                        return;
+                    }
+                    // setup channel, variables, connection, etc.
+                    channel = channelFuture.getChannel();
+
+                    disconnected.set(false);
+                    initialized = false;
+                    lenBuffer.clear();
+                    incomingBuffer = lenBuffer;
+
+                    sendThread.primeConnection();
+                    updateNow();
+                    updateLastSendAndHeard();
+
+                    if (sendThread.tunnelAuthInProgress()) {
+                        waitSasl.drainPermits();
+                        needSasl.set(true);
+                        sendPrimePacket();
+                    } else {
+                        needSasl.set(false);
+                    }
+
+                    // we need to wake up on first connect to avoid timeout.
+                    wakeupCnxn();
+                    firstConnect.countDown();
+                    LOG.info("channel is connected: {}", channelFuture.getChannel());
+                } finally {
+                    connectLock.unlock();
+                }
+            }
+        });
+    }
+
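+    /**
+     * Cancels any in-flight connect, closes the channel under connectLock, and
+     * removes wakeup sentinels left in outgoingQueue.
+     */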
+    @Override
+    void cleanup() {
+        connectLock.lock();
+        try {
+            if (connectFuture != null) {
+                connectFuture.cancel();
+                connectFuture = null;
+            }
+            if (channel != null) {
+                channel.close().awaitUninterruptibly();
+                channel = null;
+            }
+        } finally {
+            connectLock.unlock();
+        }
+        Iterator<Packet> iter = outgoingQueue.iterator();
+        while (iter.hasNext()) {
+            Packet p = iter.next();
+            if (p == WakeupPacket.getInstance()) {
+                iter.remove();
+            }
+        }
+    }
+
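+    // Called once when the client shuts down for good; releases the netty thread pools.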
+    @Override
+    void close() {
+        channelFactory.releaseExternalResources();
+    }
+
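+    // The SASL handshake has finished: stop gating writes and release any
+    // doTransport() call blocked on waitSasl.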
+    @Override
+    void saslCompleted() {
+        needSasl.set(false);
+        waitSasl.release();
+    }
+
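+    // No-op for netty: doTransport() polls outgoingQueue directly, so no extra wakeup is needed.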
+    @Override
+    void connectionPrimed() {
+    }
+
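+    // No-op for netty: the blocking poll in doTransport() picks up newly added packets.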
+    @Override
+    void packetAdded() {
+    }
+
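+    // Invoked when the channel is being closed; unblocks anyone waiting in doTransport().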
+    @Override
+    void onClosing() {
+        firstConnect.countDown();
+        wakeupCnxn();
+        LOG.info("channel is told to close");
+    }
+
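+    // Unblocks doTransport(): releases the SASL gate if it is being waited on,
+    // and enqueues the wakeup sentinel so a blocked poll() returns.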
+    private void wakeupCnxn() {
+        if (needSasl.get()) {
+            waitSasl.release();
+        }
+        outgoingQueue.add(WakeupPacket.getInstance());
+    }
+
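+    /**
+     * One pass of the transport loop: wait for the first connect, block for a
+     * packet (or for the SASL gate), then hand the packet to doWrite() unless
+     * the session is closing or the channel has been lost.
+     */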
+    @Override
+    void doTransport(int waitTimeOut,
+                     List<Packet> pendingQueue,
+                     ClientCnxn cnxn)
+            throws IOException, InterruptedException {
+        try {
+            if (!firstConnect.await(waitTimeOut, TimeUnit.MILLISECONDS)) {
+                return;
+            }
+            Packet head = null;
+            if (needSasl.get()) {
+                if (!waitSasl.tryAcquire(waitTimeOut, TimeUnit.MILLISECONDS)) {
+                    return;
+                }
+            } else {
+                if ((head = outgoingQueue.poll(waitTimeOut, TimeUnit.MILLISECONDS)) == null) {
+                    return;
+                }
+            }
+            // check if being woken up on closing.
+            if (!sendThread.getZkState().isAlive()) {
+                // adding back the packet to notify of failure in conLossPacket().
+                addBack(head);
+                return;
+            }
+            // channel disconnection happened
+            if (disconnected.get()) {
+                addBack(head);
+                throw new EndOfStreamException("channel for sessionid 0x"
+                        + Long.toHexString(sessionId)
+                        + " is lost");
+            }
+            if (head != null) {
+                doWrite(pendingQueue, head, cnxn);
+            }
+        } finally {
+            updateNow();
+        }
+    }
+
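+    // Re-queues a packet taken from outgoingQueue so its failure can be reported
+    // in conLossPacket(); the wakeup sentinel is never added back.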
+    private void addBack(Packet head) {
+        if (head != null && head != WakeupPacket.getInstance()) {
+            outgoingQueue.addFirst(head);
+        }
+    }
+
+    private void sendPkt(Packet p) {
+        // Assuming the packet will be sent out successfully; if it fails,
+        // the channel will close and clean up the queues.
+        p.createBB();
+        updateLastSend();
+        sentCount++;
+        channel.write(ChannelBuffers.wrappedBuffer(p.bb));
+    }
+
+    private void sendPrimePacket() {
+        // assuming the first packet is the priming packet.
+        sendPkt(outgoingQueue.remove());
+    }
+
+    /**
+     * doWrite handles writing the packets from outgoingQueue over the network
+     * to the server.
+     */
+    private void doWrite(List<Packet> pendingQueue, Packet p, ClientCnxn cnxn) {
+        updateNow();
+        while (true) {
+            if (p != WakeupPacket.getInstance()) {
+                if ((p.requestHeader != null) &&
+                        (p.requestHeader.getType() != ZooDefs.OpCode.ping) &&
+                        (p.requestHeader.getType() != ZooDefs.OpCode.auth)) {
+                    p.requestHeader.setXid(cnxn.getXid());
+                    synchronized (pendingQueue) {
+                        pendingQueue.add(p);
+                    }
+                }
+                sendPkt(p);
+            }
+            if (outgoingQueue.isEmpty()) {
+                break;
+            }
+            p = outgoingQueue.remove();
+        }
+    }
+
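+    /**
+     * Writes a packet immediately, bypassing outgoingQueue; fails fast with an
+     * IOException if the channel is already gone.
+     */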
+    @Override
+    void sendPacket(ClientCnxn.Packet p) throws IOException {
+        if (channel == null) {
+            throw new IOException("channel has been closed");
+        }
+        sendPkt(p);
+    }
+
+    @Override
+    SocketAddress getRemoteSocketAddress() {
+        Channel copiedChanRef = channel;
+        return (copiedChanRef == null) ? null : copiedChanRef.getRemoteAddress();
+    }
+
+    @Override
+    SocketAddress getLocalSocketAddress() {
+        Channel copiedChanRef = channel;
+        return (copiedChanRef == null) ? null : copiedChanRef.getLocalAddress();
+    }
+
+    @Override
+    void testableCloseSocket() throws IOException {
+        Channel copiedChanRef = channel;
+        if (copiedChanRef != null) {
+            copiedChanRef.disconnect().awaitUninterruptibly();
+        }
+    }
+
+
+    // *************** <END> ClientCnxnSocketNetty </END> ******************
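+    /**
+     * Singleton sentinel placed on outgoingQueue purely to wake up a blocked
+     * poll(); it carries no request, and doWrite() never puts it on the wire.
+     */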
+    private static class WakeupPacket {
+        private static Packet instance = null;
+
+        protected WakeupPacket() {
+            // Exists only to defeat instantiation.
+        }
+
+        public static Packet getInstance() {
+            if (instance == null) {
+                instance = new Packet(null, null, null, null, null);
+            }
+            return instance;
+        }
+    }
+
+    /**
+     * ZKClientPipelineFactory is the netty pipeline factory for this netty
+     * connection implementation.
+     */
+    private class ZKClientPipelineFactory implements ChannelPipelineFactory {
+        @Override
+        public ChannelPipeline getPipeline() throws Exception {
+            ChannelPipeline pipeline = Channels.pipeline();
+            // add ssl here
+            pipeline.addLast("handler", new ZKClientHandler());
+            return pipeline;
+        }
+    }
+
+    /**
+     * ZKClientHandler is the netty handler that sits last in the netty
+     * upstream pipeline. It mainly handles read traffic and helps synchronize
+     * connection state.
+     */
+    private class ZKClientHandler extends SimpleChannelUpstreamHandler {
+        AtomicBoolean channelClosed = new AtomicBoolean(false);
+
+        @Override
+        public void channelDisconnected(ChannelHandlerContext ctx,
+                                        ChannelStateEvent e) throws Exception {
+            LOG.info("channel is disconnected: {}", ctx.getChannel());
+            cleanup();
+        }
+
+        /**
+         * The netty handler has encountered a problem. Clean up and tell the
+         * outside to close the channel/connection.
+         */
+        private void cleanup() {
+            if (!channelClosed.compareAndSet(false, true)) {
+                return;
+            }
+            disconnected.set(true);
+            onClosing();
+        }
+
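+        /**
+         * Accumulates reads into incomingBuffer: first the length prefix
+         * (lenBuffer), then the payload, which is handed to sendThread.
+         */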
+        @Override
+        public void messageReceived(ChannelHandlerContext ctx,
+                                    MessageEvent e) throws Exception {
+            updateNow();
+            ChannelBuffer buf = (ChannelBuffer) e.getMessage();
+            while (buf.readable()) {
+                if (incomingBuffer.remaining() > buf.readableBytes()) {
+                    int newLimit = incomingBuffer.position()
+                            + buf.readableBytes();
+                    incomingBuffer.limit(newLimit);
+                }
+                buf.readBytes(incomingBuffer);
+                incomingBuffer.limit(incomingBuffer.capacity());
+
+                if (!incomingBuffer.hasRemaining()) {
+                    incomingBuffer.flip();
+                    if (incomingBuffer == lenBuffer) {
+                        recvCount++;
+                        readLength();
+                    } else if (!initialized) {
+                        readConnectResult();
+                        lenBuffer.clear();
+                        incomingBuffer = lenBuffer;
+                        initialized = true;
+                        updateLastHeard();
+                    } else {
+                        sendThread.readResponse(incomingBuffer);
+                        lenBuffer.clear();
+                        incomingBuffer = lenBuffer;
+                        updateLastHeard();
+                    }
+                }
+            }
+            wakeupCnxn();
+        }
+
+        @Override
+        public void exceptionCaught(ChannelHandlerContext ctx,
+                                    ExceptionEvent e) throws Exception {
+            LOG.warn("Exception caught: {}", e, e.getCause());
+            cleanup();
+        }
+    }
+}