|
@@ -18,15 +18,17 @@
|
|
|
|
|
|
package org.apache.zookeeper.server.quorum;
|
|
package org.apache.zookeeper.server.quorum;
|
|
|
|
|
|
|
|
+import java.io.DataInputStream;
|
|
|
|
+import java.io.DataOutputStream;
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
import java.net.InetSocketAddress;
|
|
import java.net.InetSocketAddress;
|
|
|
|
+import java.net.ServerSocket;
|
|
import java.net.Socket;
|
|
import java.net.Socket;
|
|
|
|
+import java.net.SocketException;
|
|
|
|
+import java.nio.BufferUnderflowException;
|
|
import java.nio.ByteBuffer;
|
|
import java.nio.ByteBuffer;
|
|
-import java.nio.channels.ServerSocketChannel;
|
|
|
|
-import java.nio.channels.SocketChannel;
|
|
|
|
import java.nio.channels.UnresolvedAddressException;
|
|
import java.nio.channels.UnresolvedAddressException;
|
|
import java.util.Enumeration;
|
|
import java.util.Enumeration;
|
|
-import java.util.Random;
|
|
|
|
import java.util.concurrent.ArrayBlockingQueue;
|
|
import java.util.concurrent.ArrayBlockingQueue;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.TimeUnit;
|
|
@@ -141,49 +143,41 @@ public class QuorumCnxManager {
|
|
* @param sid
|
|
* @param sid
|
|
*/
|
|
*/
|
|
public void testInitiateConnection(long sid) throws Exception {
|
|
public void testInitiateConnection(long sid) throws Exception {
|
|
- SocketChannel channel;
|
|
|
|
- if(LOG.isDebugEnabled()){
|
|
|
|
- LOG.debug("Opening channel to server " + sid);
|
|
|
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
|
+ LOG.debug("Opening channel to server " + sid);
|
|
}
|
|
}
|
|
-
|
|
|
|
- channel = SocketChannel.open();
|
|
|
|
- channel.socket().connect(self.getVotingView().get(sid).electionAddr, cnxTO);
|
|
|
|
- channel.socket().setTcpNoDelay(true);
|
|
|
|
- initiateConnection(channel, sid);
|
|
|
|
|
|
+ Socket sock = new Socket();
|
|
|
|
+ setSockOpts(sock);
|
|
|
|
+ sock.connect(self.getVotingView().get(sid).electionAddr, cnxTO);
|
|
|
|
+ initiateConnection(sock, sid);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
* If this server has initiated the connection, then it gives up on the
|
|
* If this server has initiated the connection, then it gives up on the
|
|
* connection if it loses challenge. Otherwise, it keeps the connection.
|
|
* connection if it loses challenge. Otherwise, it keeps the connection.
|
|
*/
|
|
*/
|
|
-
|
|
|
|
- public boolean initiateConnection(SocketChannel s, Long sid) {
|
|
|
|
- try {
|
|
|
|
|
|
+ public boolean initiateConnection(Socket sock, Long sid) {
|
|
|
|
+ DataOutputStream dout = null;
|
|
|
|
+ try {
|
|
// Sending id and challenge
|
|
// Sending id and challenge
|
|
- byte[] msgBytes = new byte[8];
|
|
|
|
- ByteBuffer msgBuffer = ByteBuffer.wrap(msgBytes);
|
|
|
|
- msgBuffer.putLong(self.getId());
|
|
|
|
- msgBuffer.position(0);
|
|
|
|
- s.write(msgBuffer);
|
|
|
|
|
|
+ dout = new DataOutputStream(sock.getOutputStream());
|
|
|
|
+ dout.writeLong(self.getId());
|
|
|
|
+ dout.flush();
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
- LOG.warn("Exception reading or writing challenge: ", e);
|
|
|
|
|
|
+ LOG.warn("Ignoring exception reading or writing challenge: ", e);
|
|
|
|
+ closeSocket(sock);
|
|
return false;
|
|
return false;
|
|
}
|
|
}
|
|
|
|
|
|
// If lost the challenge, then drop the new connection
|
|
// If lost the challenge, then drop the new connection
|
|
if (sid > self.getId()) {
|
|
if (sid > self.getId()) {
|
|
- try {
|
|
|
|
- LOG.info("Have smaller server identifier, so dropping the connection: (" +
|
|
|
|
- sid + ", " + self.getId() + ")");
|
|
|
|
- s.socket().close();
|
|
|
|
- } catch (IOException e) {
|
|
|
|
- LOG.warn("Ignoring exception when closing socket or trying to "
|
|
|
|
- + "reopen connection: ", e);
|
|
|
|
- }
|
|
|
|
- // Otherwise proceed with the connection
|
|
|
|
- } else {
|
|
|
|
- SendWorker sw = new SendWorker(s, sid);
|
|
|
|
- RecvWorker rw = new RecvWorker(s, sid);
|
|
|
|
|
|
+ LOG.info("Have smaller server identifier, so dropping the " +
|
|
|
|
+ "connection: (" + sid + ", " + self.getId() + ")");
|
|
|
|
+ closeSocket(sock);
|
|
|
|
+ // Otherwise proceed with the connection
|
|
|
|
+ } else {
|
|
|
|
+ SendWorker sw = new SendWorker(sock, sid);
|
|
|
|
+ RecvWorker rw = new RecvWorker(sock, sid);
|
|
sw.setRecv(rw);
|
|
sw.setRecv(rw);
|
|
|
|
|
|
SendWorker vsw = senderWorkerMap.get(sid);
|
|
SendWorker vsw = senderWorkerMap.get(sid);
|
|
@@ -215,19 +209,14 @@ public class QuorumCnxManager {
|
|
* possible long value to lose the challenge.
|
|
* possible long value to lose the challenge.
|
|
*
|
|
*
|
|
*/
|
|
*/
|
|
- boolean receiveConnection(SocketChannel s) {
|
|
|
|
|
|
+ public boolean receiveConnection(Socket sock) {
|
|
Long sid = null;
|
|
Long sid = null;
|
|
|
|
|
|
try {
|
|
try {
|
|
- byte[] msgBytes = new byte[8];
|
|
|
|
- ByteBuffer msgBuffer = ByteBuffer.wrap(msgBytes);
|
|
|
|
-
|
|
|
|
- s.read(msgBuffer);
|
|
|
|
- msgBuffer.position(0);
|
|
|
|
-
|
|
|
|
// Read server id
|
|
// Read server id
|
|
- sid = Long.valueOf(msgBuffer.getLong());
|
|
|
|
- if(sid == QuorumPeer.OBSERVER_ID){
|
|
|
|
|
|
+ DataInputStream din = new DataInputStream(sock.getInputStream());
|
|
|
|
+ sid = din.readLong();
|
|
|
|
+ if (sid == QuorumPeer.OBSERVER_ID) {
|
|
/*
|
|
/*
|
|
* Choose identifier at random. We need a value to identify
|
|
* Choose identifier at random. We need a value to identify
|
|
* the connection.
|
|
* the connection.
|
|
@@ -237,38 +226,34 @@ public class QuorumCnxManager {
|
|
LOG.info("Setting arbitrary identifier to observer: " + sid);
|
|
LOG.info("Setting arbitrary identifier to observer: " + sid);
|
|
}
|
|
}
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
- LOG.warn("Exception reading or writing challenge: "
|
|
|
|
- + e.toString());
|
|
|
|
|
|
+ closeSocket(sock);
|
|
|
|
+ LOG.warn("Exception reading or writing challenge: " + e.toString());
|
|
return false;
|
|
return false;
|
|
}
|
|
}
|
|
|
|
|
|
//If wins the challenge, then close the new connection.
|
|
//If wins the challenge, then close the new connection.
|
|
if (sid < self.getId()) {
|
|
if (sid < self.getId()) {
|
|
- try {
|
|
|
|
- /*
|
|
|
|
- * This replica might still believe that the connection to sid
|
|
|
|
- * is up, so we have to shut down the workers before trying to
|
|
|
|
- * open a new connection.
|
|
|
|
- */
|
|
|
|
- SendWorker sw = senderWorkerMap.get(sid);
|
|
|
|
- if(sw != null)
|
|
|
|
- sw.finish();
|
|
|
|
-
|
|
|
|
- /*
|
|
|
|
- * Now we start a new connection
|
|
|
|
- */
|
|
|
|
- LOG.debug("Create new connection to server: " + sid);
|
|
|
|
- s.socket().close();
|
|
|
|
- connectOne(sid);
|
|
|
|
-
|
|
|
|
- } catch (IOException e) {
|
|
|
|
- LOG.info("Error when closing socket or trying to reopen connection: "
|
|
|
|
- + e.toString());
|
|
|
|
|
|
+ /*
|
|
|
|
+ * This replica might still believe that the connection to sid is
|
|
|
|
+ * up, so we have to shut down the workers before trying to open a
|
|
|
|
+ * new connection.
|
|
|
|
+ */
|
|
|
|
+ SendWorker sw = senderWorkerMap.get(sid);
|
|
|
|
+ if (sw != null) {
|
|
|
|
+ sw.finish();
|
|
}
|
|
}
|
|
- //Otherwise start worker threads to receive data.
|
|
|
|
|
|
+
|
|
|
|
+ /*
|
|
|
|
+ * Now we start a new connection
|
|
|
|
+ */
|
|
|
|
+ LOG.debug("Create new connection to server: " + sid);
|
|
|
|
+ closeSocket(sock);
|
|
|
|
+ connectOne(sid);
|
|
|
|
+
|
|
|
|
+ // Otherwise start worker threads to receive data.
|
|
} else {
|
|
} else {
|
|
- SendWorker sw = new SendWorker(s, sid);
|
|
|
|
- RecvWorker rw = new RecvWorker(s, sid);
|
|
|
|
|
|
+ SendWorker sw = new SendWorker(sock, sid);
|
|
|
|
+ RecvWorker rw = new RecvWorker(sock, sid);
|
|
sw.setRecv(rw);
|
|
sw.setRecv(rw);
|
|
|
|
|
|
SendWorker vsw = senderWorkerMap.get(sid);
|
|
SendWorker vsw = senderWorkerMap.get(sid);
|
|
@@ -306,10 +291,10 @@ public class QuorumCnxManager {
|
|
} catch (InterruptedException e) {
|
|
} catch (InterruptedException e) {
|
|
LOG.warn("Exception when loopbacking", e);
|
|
LOG.warn("Exception when loopbacking", e);
|
|
}
|
|
}
|
|
- /*
|
|
|
|
- * Otherwise send to the corresponding thread to send.
|
|
|
|
- */
|
|
|
|
- } else
|
|
|
|
|
|
+ /*
|
|
|
|
+ * Otherwise send to the corresponding thread to send.
|
|
|
|
+ */
|
|
|
|
+ } else {
|
|
try {
|
|
try {
|
|
/*
|
|
/*
|
|
* Start a new connection if doesn't have one already.
|
|
* Start a new connection if doesn't have one already.
|
|
@@ -330,14 +315,15 @@ public class QuorumCnxManager {
|
|
} else {
|
|
} else {
|
|
LOG.error("No queue for server " + sid);
|
|
LOG.error("No queue for server " + sid);
|
|
}
|
|
}
|
|
- }
|
|
|
|
-
|
|
|
|
|
|
+ }
|
|
|
|
+
|
|
connectOne(sid);
|
|
connectOne(sid);
|
|
|
|
|
|
} catch (InterruptedException e) {
|
|
} catch (InterruptedException e) {
|
|
LOG.warn("Interrupted while waiting to put message in queue.",
|
|
LOG.warn("Interrupted while waiting to put message in queue.",
|
|
e);
|
|
e);
|
|
- }
|
|
|
|
|
|
+ }
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -349,31 +335,31 @@ public class QuorumCnxManager {
|
|
synchronized void connectOne(long sid){
|
|
synchronized void connectOne(long sid){
|
|
if (senderWorkerMap.get(sid) == null){
|
|
if (senderWorkerMap.get(sid) == null){
|
|
InetSocketAddress electionAddr;
|
|
InetSocketAddress electionAddr;
|
|
- if(self.quorumPeers.containsKey(sid))
|
|
|
|
- electionAddr =
|
|
|
|
- self.quorumPeers.get(sid).electionAddr;
|
|
|
|
- else{
|
|
|
|
|
|
+ if (self.quorumPeers.containsKey(sid)) {
|
|
|
|
+ electionAddr = self.quorumPeers.get(sid).electionAddr;
|
|
|
|
+ } else {
|
|
LOG.warn("Invalid server id: " + sid);
|
|
LOG.warn("Invalid server id: " + sid);
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
try {
|
|
try {
|
|
- SocketChannel channel;
|
|
|
|
- if(LOG.isDebugEnabled()){
|
|
|
|
- LOG.debug("Opening channel to server " + sid);
|
|
|
|
|
|
+
|
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
|
+ LOG.debug("Opening channel to server " + sid);
|
|
}
|
|
}
|
|
-
|
|
|
|
- channel = SocketChannel.open();
|
|
|
|
- channel.socket().connect(self.getView().get(sid).electionAddr, cnxTO);
|
|
|
|
- channel.socket().setTcpNoDelay(true);
|
|
|
|
- initiateConnection(channel, sid);
|
|
|
|
|
|
+ Socket sock = new Socket();
|
|
|
|
+ setSockOpts(sock);
|
|
|
|
+ sock.connect(self.getView().get(sid).electionAddr, cnxTO);
|
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
|
+ LOG.debug("Connected to server " + sid);
|
|
|
|
+ }
|
|
|
|
+ initiateConnection(sock, sid);
|
|
} catch (UnresolvedAddressException e) {
|
|
} catch (UnresolvedAddressException e) {
|
|
// Sun doesn't include the address that causes this
|
|
// Sun doesn't include the address that causes this
|
|
// exception to be thrown, also UAE cannot be wrapped cleanly
|
|
// exception to be thrown, also UAE cannot be wrapped cleanly
|
|
// so we log the exception in order to capture this critical
|
|
// so we log the exception in order to capture this critical
|
|
// detail.
|
|
// detail.
|
|
LOG.warn("Cannot open channel to " + sid
|
|
LOG.warn("Cannot open channel to " + sid
|
|
- + " at election address " + electionAddr,
|
|
|
|
- e);
|
|
|
|
|
|
+ + " at election address " + electionAddr, e);
|
|
throw e;
|
|
throw e;
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
LOG.warn("Cannot open channel to " + sid
|
|
LOG.warn("Cannot open channel to " + sid
|
|
@@ -407,8 +393,9 @@ public class QuorumCnxManager {
|
|
boolean haveDelivered() {
|
|
boolean haveDelivered() {
|
|
for (ArrayBlockingQueue<ByteBuffer> queue : queueSendMap.values()) {
|
|
for (ArrayBlockingQueue<ByteBuffer> queue : queueSendMap.values()) {
|
|
LOG.debug("Queue size: " + queue.size());
|
|
LOG.debug("Queue size: " + queue.size());
|
|
- if (queue.size() == 0)
|
|
|
|
|
|
+ if (queue.size() == 0) {
|
|
return true;
|
|
return true;
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
return false;
|
|
return false;
|
|
@@ -428,11 +415,36 @@ public class QuorumCnxManager {
|
|
/**
|
|
/**
|
|
* A soft halt simply finishes workers.
|
|
* A soft halt simply finishes workers.
|
|
*/
|
|
*/
|
|
- public void softHalt(){
|
|
|
|
- for(SendWorker sw: senderWorkerMap.values()){
|
|
|
|
- LOG.debug("Halting sender: " + sw);
|
|
|
|
- sw.finish();
|
|
|
|
- }
|
|
|
|
|
|
+ public void softHalt() {
|
|
|
|
+ for (SendWorker sw : senderWorkerMap.values()) {
|
|
|
|
+ LOG.debug("Halting sender: " + sw);
|
|
|
|
+ sw.finish();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Helper method to set socket options.
|
|
|
|
+ *
|
|
|
|
+ * @param sock
|
|
|
|
+ * Reference to socket
|
|
|
|
+ */
|
|
|
|
+ private void setSockOpts(Socket sock) throws SocketException {
|
|
|
|
+ sock.setTcpNoDelay(true);
|
|
|
|
+ sock.setSoTimeout(self.tickTime * self.syncLimit);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Helper method to close a socket.
|
|
|
|
+ *
|
|
|
|
+ * @param sock
|
|
|
|
+ * Reference to socket
|
|
|
|
+ */
|
|
|
|
+ private void closeSocket(Socket sock) {
|
|
|
|
+ try {
|
|
|
|
+ sock.close();
|
|
|
|
+ } catch (IOException ie) {
|
|
|
|
+ LOG.error("Exception while closing", ie);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -440,7 +452,8 @@ public class QuorumCnxManager {
|
|
*/
|
|
*/
|
|
public class Listener extends Thread {
|
|
public class Listener extends Thread {
|
|
|
|
|
|
- volatile ServerSocketChannel ss = null;
|
|
|
|
|
|
+ volatile ServerSocket ss = null;
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Sleeps on accept().
|
|
* Sleeps on accept().
|
|
*/
|
|
*/
|
|
@@ -449,37 +462,44 @@ public class QuorumCnxManager {
|
|
int numRetries = 0;
|
|
int numRetries = 0;
|
|
while((!shutdown) && (numRetries < 3)){
|
|
while((!shutdown) && (numRetries < 3)){
|
|
try {
|
|
try {
|
|
- ss = ServerSocketChannel.open();
|
|
|
|
- int port = self.quorumPeers.get(self.getId()).electionAddr.getPort();
|
|
|
|
- ss.socket().setReuseAddress(true);
|
|
|
|
|
|
+ ss = new ServerSocket();
|
|
|
|
+ ss.setReuseAddress(true);
|
|
|
|
+ int port = self.quorumPeers.get(self.getId()).electionAddr
|
|
|
|
+ .getPort();
|
|
InetSocketAddress addr = new InetSocketAddress(port);
|
|
InetSocketAddress addr = new InetSocketAddress(port);
|
|
LOG.info("My election bind port: " + addr.toString());
|
|
LOG.info("My election bind port: " + addr.toString());
|
|
- setName(addr.toString());
|
|
|
|
- ss.socket().bind(addr);
|
|
|
|
-
|
|
|
|
|
|
+ setName(self.quorumPeers.get(self.getId()).electionAddr
|
|
|
|
+ .toString());
|
|
|
|
+ ss.bind(addr);
|
|
while (!shutdown) {
|
|
while (!shutdown) {
|
|
- SocketChannel client = ss.accept();
|
|
|
|
- Socket sock = client.socket();
|
|
|
|
- sock.setTcpNoDelay(true);
|
|
|
|
-
|
|
|
|
- LOG.debug("Connection request "
|
|
|
|
- + sock.getRemoteSocketAddress());
|
|
|
|
-
|
|
|
|
- LOG.debug("Connection request: " + self.getId());
|
|
|
|
|
|
+ Socket client = ss.accept();
|
|
|
|
+ setSockOpts(client);
|
|
|
|
+ LOG.info("Received connection request "
|
|
|
|
+ + client.getRemoteSocketAddress());
|
|
receiveConnection(client);
|
|
receiveConnection(client);
|
|
numRetries = 0;
|
|
numRetries = 0;
|
|
}
|
|
}
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
LOG.error("Exception while listening", e);
|
|
LOG.error("Exception while listening", e);
|
|
numRetries++;
|
|
numRetries++;
|
|
|
|
+ try {
|
|
|
|
+ ss.close();
|
|
|
|
+ Thread.sleep(1000);
|
|
|
|
+ } catch (IOException ie) {
|
|
|
|
+ LOG.error("Error closing server socket", ie);
|
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
|
+ LOG.error("Interrupted while sleeping. " +
|
|
|
|
+ "Ignoring exception", ie);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
LOG.info("Leaving listener");
|
|
LOG.info("Leaving listener");
|
|
- if(!shutdown)
|
|
|
|
- LOG.fatal("As I'm leaving the listener thread, " +
|
|
|
|
- "I won't be able to participate in leader " +
|
|
|
|
- "election any longer: " +
|
|
|
|
- self.quorumPeers.get(self.getId()).electionAddr);
|
|
|
|
|
|
+ if (!shutdown) {
|
|
|
|
+ LOG.fatal("As I'm leaving the listener thread, "
|
|
|
|
+ + "I won't be able to participate in leader "
|
|
|
|
+ + "election any longer: "
|
|
|
|
+ + self.quorumPeers.get(self.getId()).electionAddr);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -505,22 +525,31 @@ public class QuorumCnxManager {
|
|
*/
|
|
*/
|
|
class SendWorker extends Thread {
|
|
class SendWorker extends Thread {
|
|
Long sid;
|
|
Long sid;
|
|
- SocketChannel channel;
|
|
|
|
|
|
+ Socket sock;
|
|
RecvWorker recvWorker;
|
|
RecvWorker recvWorker;
|
|
volatile boolean running = true;
|
|
volatile boolean running = true;
|
|
|
|
+ DataOutputStream dout;
|
|
|
|
|
|
/**
|
|
/**
|
|
* An instance of this thread receives messages to send
|
|
* An instance of this thread receives messages to send
|
|
* through a queue and sends them to the server sid.
|
|
* through a queue and sends them to the server sid.
|
|
*
|
|
*
|
|
- * @param channel SocketChannel
|
|
|
|
- * @param sid Server identifier
|
|
|
|
|
|
+ * @param sock
|
|
|
|
+ * Socket to remote peer
|
|
|
|
+ * @param sid
|
|
|
|
+ * Server identifier of remote peer
|
|
*/
|
|
*/
|
|
- SendWorker(SocketChannel channel, Long sid) {
|
|
|
|
|
|
+ SendWorker(Socket sock, Long sid) {
|
|
this.sid = sid;
|
|
this.sid = sid;
|
|
- this.channel = channel;
|
|
|
|
|
|
+ this.sock = sock;
|
|
recvWorker = null;
|
|
recvWorker = null;
|
|
-
|
|
|
|
|
|
+ try {
|
|
|
|
+ dout = new DataOutputStream(sock.getOutputStream());
|
|
|
|
+ } catch (IOException e) {
|
|
|
|
+ LOG.error("Unable to access socket output stream", e);
|
|
|
|
+ closeSocket(sock);
|
|
|
|
+ running = false;
|
|
|
|
+ }
|
|
LOG.debug("Address of remote peer: " + this.sid);
|
|
LOG.debug("Address of remote peer: " + this.sid);
|
|
}
|
|
}
|
|
|
|
|
|
@@ -538,8 +567,8 @@ public class QuorumCnxManager {
|
|
}
|
|
}
|
|
|
|
|
|
synchronized boolean finish() {
|
|
synchronized boolean finish() {
|
|
- if(LOG.isDebugEnabled()){
|
|
|
|
- LOG.debug("Calling finish");
|
|
|
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
|
+ LOG.debug("Calling finish for " + sid);
|
|
}
|
|
}
|
|
|
|
|
|
if(!running){
|
|
if(!running){
|
|
@@ -550,59 +579,59 @@ public class QuorumCnxManager {
|
|
}
|
|
}
|
|
|
|
|
|
running = false;
|
|
running = false;
|
|
-
|
|
|
|
- try{
|
|
|
|
- channel.close();
|
|
|
|
- } catch (IOException e) {
|
|
|
|
- LOG.warn("Exception while closing socket");
|
|
|
|
- }
|
|
|
|
- //channel = null;
|
|
|
|
|
|
+ closeSocket(sock);
|
|
|
|
+ // channel = null;
|
|
|
|
|
|
this.interrupt();
|
|
this.interrupt();
|
|
- if (recvWorker != null)
|
|
|
|
|
|
+ if (recvWorker != null) {
|
|
recvWorker.finish();
|
|
recvWorker.finish();
|
|
-
|
|
|
|
- if(LOG.isDebugEnabled()){
|
|
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
LOG.debug("Removing entry from senderWorkerMap sid=" + sid);
|
|
LOG.debug("Removing entry from senderWorkerMap sid=" + sid);
|
|
}
|
|
}
|
|
- senderWorkerMap.remove(sid);
|
|
|
|
|
|
+ senderWorkerMap.remove(sid, this);
|
|
return running;
|
|
return running;
|
|
}
|
|
}
|
|
|
|
|
|
synchronized void send(ByteBuffer b) throws IOException {
|
|
synchronized void send(ByteBuffer b) throws IOException {
|
|
- byte[] msgBytes = new byte[b.capacity()
|
|
|
|
- + (Integer.SIZE / 8)];
|
|
|
|
- ByteBuffer msgBuffer = ByteBuffer.wrap(msgBytes);
|
|
|
|
- msgBuffer.putInt(b.capacity());
|
|
|
|
-
|
|
|
|
- msgBuffer.put(b.array(), 0, b.capacity());
|
|
|
|
- msgBuffer.position(0);
|
|
|
|
- if(channel != null)
|
|
|
|
- channel.write(msgBuffer);
|
|
|
|
- else
|
|
|
|
- throw new IOException("SocketChannel is null");
|
|
|
|
|
|
+ byte[] msgBytes = new byte[b.capacity()];
|
|
|
|
+ try {
|
|
|
|
+ b.position(0);
|
|
|
|
+ b.get(msgBytes);
|
|
|
|
+ } catch (BufferUnderflowException be) {
|
|
|
|
+ LOG.fatal("BufferUnderflowException ", be);
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+ dout.writeInt(b.capacity());
|
|
|
|
+ dout.write(b.array());
|
|
|
|
+ dout.flush();
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public void run() {
|
|
public void run() {
|
|
- try{
|
|
|
|
- ByteBuffer b = lastMessageSent.get(sid);
|
|
|
|
- if(b != null) send(b);
|
|
|
|
|
|
+ try {
|
|
|
|
+ ByteBuffer b = lastMessageSent.get(sid);
|
|
|
|
+ if (b != null) {
|
|
|
|
+ send(b);
|
|
|
|
+ }
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
LOG.error("Failed to send last message. Shutting down thread.", e);
|
|
LOG.error("Failed to send last message. Shutting down thread.", e);
|
|
this.finish();
|
|
this.finish();
|
|
}
|
|
}
|
|
|
|
|
|
try {
|
|
try {
|
|
- while (running && !shutdown && channel != null) {
|
|
|
|
|
|
+ while (running && !shutdown && sock != null) {
|
|
|
|
|
|
ByteBuffer b = null;
|
|
ByteBuffer b = null;
|
|
try {
|
|
try {
|
|
- ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
|
|
|
|
- if(bq != null)
|
|
|
|
|
|
+ ArrayBlockingQueue<ByteBuffer> bq = queueSendMap
|
|
|
|
+ .get(sid);
|
|
|
|
+ if (bq != null) {
|
|
b = bq.poll(1000, TimeUnit.MILLISECONDS);
|
|
b = bq.poll(1000, TimeUnit.MILLISECONDS);
|
|
- else {
|
|
|
|
- LOG.error("No queue of incoming messages for server " + sid);
|
|
|
|
|
|
+ } else {
|
|
|
|
+ LOG.error("No queue of incoming messages for " +
|
|
|
|
+ "server " + sid);
|
|
break;
|
|
break;
|
|
}
|
|
}
|
|
|
|
|
|
@@ -630,12 +659,22 @@ public class QuorumCnxManager {
|
|
*/
|
|
*/
|
|
class RecvWorker extends Thread {
|
|
class RecvWorker extends Thread {
|
|
Long sid;
|
|
Long sid;
|
|
- SocketChannel channel;
|
|
|
|
|
|
+ Socket sock;
|
|
volatile boolean running = true;
|
|
volatile boolean running = true;
|
|
|
|
+ DataInputStream din;
|
|
|
|
|
|
- RecvWorker(SocketChannel channel, Long sid) {
|
|
|
|
|
|
+ RecvWorker(Socket sock, Long sid) {
|
|
this.sid = sid;
|
|
this.sid = sid;
|
|
- this.channel = channel;
|
|
|
|
|
|
+ this.sock = sock;
|
|
|
|
+ try {
|
|
|
|
+ din = new DataInputStream(sock.getInputStream());
|
|
|
|
+ // OK to wait until socket disconnects while reading.
|
|
|
|
+ sock.setSoTimeout(0);
|
|
|
|
+ } catch (IOException e) {
|
|
|
|
+ LOG.error("Error while accessing socket for " + sid, e);
|
|
|
|
+ closeSocket(sock);
|
|
|
|
+ running = false;
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -660,54 +699,33 @@ public class QuorumCnxManager {
|
|
public void run() {
|
|
public void run() {
|
|
try {
|
|
try {
|
|
byte[] size = new byte[4];
|
|
byte[] size = new byte[4];
|
|
- ByteBuffer msgLength = ByteBuffer.wrap(size);
|
|
|
|
- while (running && !shutdown && channel != null) {
|
|
|
|
|
|
+ while (running && !shutdown && sock != null) {
|
|
/**
|
|
/**
|
|
* Reads the first int to determine the length of the
|
|
* Reads the first int to determine the length of the
|
|
* message
|
|
* message
|
|
*/
|
|
*/
|
|
- while (msgLength.hasRemaining()) {
|
|
|
|
- if (channel.read(msgLength) < 0) {
|
|
|
|
- throw new IOException("Channel eof");
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- msgLength.position(0);
|
|
|
|
- int length = msgLength.getInt();
|
|
|
|
- if(length <= 0) {
|
|
|
|
- throw new IOException("Invalid packet length:" + length);
|
|
|
|
|
|
+ int length = din.readInt();
|
|
|
|
+ if (length <= 0 || length > PACKETMAXSIZE) {
|
|
|
|
+ throw new IOException(
|
|
|
|
+ "Received packet with invalid packet: "
|
|
|
|
+ + length);
|
|
}
|
|
}
|
|
/**
|
|
/**
|
|
* Allocates a new ByteBuffer to receive the message
|
|
* Allocates a new ByteBuffer to receive the message
|
|
*/
|
|
*/
|
|
- if (length > PACKETMAXSIZE) {
|
|
|
|
- throw new IOException("Invalid packet of length " + length);
|
|
|
|
- }
|
|
|
|
byte[] msgArray = new byte[length];
|
|
byte[] msgArray = new byte[length];
|
|
|
|
+ din.readFully(msgArray, 0, length);
|
|
ByteBuffer message = ByteBuffer.wrap(msgArray);
|
|
ByteBuffer message = ByteBuffer.wrap(msgArray);
|
|
- int numbytes = 0;
|
|
|
|
- int temp_numbytes = 0;
|
|
|
|
- while (message.hasRemaining()) {
|
|
|
|
- temp_numbytes = channel.read(message);
|
|
|
|
- if(temp_numbytes < 0) {
|
|
|
|
- throw new IOException("Channel eof before end");
|
|
|
|
- }
|
|
|
|
- numbytes += temp_numbytes;
|
|
|
|
- }
|
|
|
|
- message.position(0);
|
|
|
|
synchronized (recvQueue) {
|
|
synchronized (recvQueue) {
|
|
- recvQueue
|
|
|
|
- .put(new Message(message.duplicate(), sid));
|
|
|
|
|
|
+ recvQueue.put(new Message(message.duplicate(), sid));
|
|
}
|
|
}
|
|
- msgLength.position(0);
|
|
|
|
}
|
|
}
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
LOG.warn("Connection broken for id " + sid + ", my id = " +
|
|
LOG.warn("Connection broken for id " + sid + ", my id = " +
|
|
self.getId() + ", error = " + e);
|
|
self.getId() + ", error = " + e);
|
|
} finally {
|
|
} finally {
|
|
- try{
|
|
|
|
- channel.socket().close();
|
|
|
|
- } catch (IOException e) {
|
|
|
|
- LOG.warn("Exception while trying to close channel");
|
|
|
|
|
|
+ if (sock != null) {
|
|
|
|
+ closeSocket(sock);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|