|
@@ -28,6 +28,7 @@ import java.nio.channels.SocketChannel;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.Random;
|
|
|
import java.util.concurrent.ArrayBlockingQueue;
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
|
|
import org.apache.log4j.Logger;
|
|
|
|
|
@@ -51,7 +52,7 @@ import org.apache.log4j.Logger;
|
|
|
*
|
|
|
*/
|
|
|
|
|
|
-class QuorumCnxManager {
|
|
|
+public class QuorumCnxManager {
|
|
|
private static final Logger LOG = Logger.getLogger(QuorumCnxManager.class);
|
|
|
|
|
|
/*
|
|
@@ -84,13 +85,13 @@ class QuorumCnxManager {
|
|
|
/*
|
|
|
* Local IP address
|
|
|
*/
|
|
|
- InetAddress localIP;
|
|
|
+ QuorumPeer self;
|
|
|
|
|
|
/*
|
|
|
* Mapping from Peer to Thread number
|
|
|
*/
|
|
|
- HashMap<InetAddress, SendWorker> senderWorkerMap;
|
|
|
- HashMap<InetAddress, ArrayBlockingQueue<ByteBuffer>> queueSendMap;
|
|
|
+ ConcurrentHashMap<Long, SendWorker> senderWorkerMap;
|
|
|
+ ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap;
|
|
|
|
|
|
/*
|
|
|
* Reception queue
|
|
@@ -109,26 +110,21 @@ class QuorumCnxManager {
|
|
|
Listener listener;
|
|
|
|
|
|
static class Message {
|
|
|
- Message(ByteBuffer buffer, InetAddress addr) {
|
|
|
+ Message(ByteBuffer buffer, long sid) {
|
|
|
this.buffer = buffer;
|
|
|
- this.addr = addr;
|
|
|
+ this.sid = sid;
|
|
|
}
|
|
|
|
|
|
ByteBuffer buffer;
|
|
|
- InetAddress addr;
|
|
|
+ long sid;
|
|
|
}
|
|
|
|
|
|
- QuorumCnxManager(int port) {
|
|
|
+ public QuorumCnxManager(QuorumPeer self) {
|
|
|
this.port = port;
|
|
|
this.recvQueue = new ArrayBlockingQueue<Message>(CAPACITY);
|
|
|
- this.queueSendMap = new HashMap<InetAddress, ArrayBlockingQueue<ByteBuffer>>();
|
|
|
- this.senderWorkerMap = new HashMap<InetAddress, SendWorker>();
|
|
|
-
|
|
|
- try {
|
|
|
- localIP = InetAddress.getLocalHost();
|
|
|
- } catch (UnknownHostException e) {
|
|
|
- LOG.warn("Couldn't get local address");
|
|
|
- }
|
|
|
+ this.queueSendMap = new ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>();
|
|
|
+ this.senderWorkerMap = new ConcurrentHashMap<Long, SendWorker>();
|
|
|
+ this.self = self;
|
|
|
|
|
|
// Generates a challenge to guarantee one connection between pairs of
|
|
|
// servers
|
|
@@ -140,10 +136,15 @@ class QuorumCnxManager {
|
|
|
}
|
|
|
|
|
|
void genChallenge() {
|
|
|
- Random rand = new Random(System.currentTimeMillis()
|
|
|
- + localIP.hashCode());
|
|
|
- long newValue = rand.nextLong();
|
|
|
- challenge = newValue;
|
|
|
+ try{
|
|
|
+ Random rand = new Random(System.currentTimeMillis()
|
|
|
+ + InetAddress.getLocalHost().hashCode());
|
|
|
+ long newValue = rand.nextLong();
|
|
|
+ challenge = newValue;
|
|
|
+ } catch(UnknownHostException e){
|
|
|
+ LOG.error("Cannot resolve local address");
|
|
|
+ challenge = 0;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -151,54 +152,29 @@ class QuorumCnxManager {
|
|
|
* connection if it loses challenge. Otherwise, it keeps the connection.
|
|
|
*/
|
|
|
|
|
|
- boolean initiateConnection(SocketChannel s) {
|
|
|
+ boolean initiateConnection(SocketChannel s, Long sid) {
|
|
|
boolean challenged = true;
|
|
|
boolean wins = false;
|
|
|
long newChallenge;
|
|
|
-
|
|
|
- // Compare IP addresses based on their hash codes
|
|
|
- //int hashCodeRemote = s.socket().getInetAddress().hashCode();
|
|
|
- //if(hashCodeRemote >= localIP.hashCode()){
|
|
|
- // wins = false;
|
|
|
- //} else {
|
|
|
- // wins = true;
|
|
|
- //}
|
|
|
- //LOG.warn("Hash codes: " + hashCodeRemote + ", " + localIP.hashCode());
|
|
|
|
|
|
try {
|
|
|
- while (challenged && s.isConnected()) {
|
|
|
- // Sending challenge
|
|
|
- byte[] msgBytes = new byte[8];
|
|
|
- ByteBuffer msgBuffer = ByteBuffer.wrap(msgBytes);
|
|
|
- msgBuffer.putLong(challenge);
|
|
|
- msgBuffer.position(0);
|
|
|
- s.write(msgBuffer);
|
|
|
-
|
|
|
- // Reading challenge
|
|
|
- msgBuffer.position(0);
|
|
|
- s.read(msgBuffer);
|
|
|
-
|
|
|
- msgBuffer.position(0);
|
|
|
- newChallenge = msgBuffer.getLong();
|
|
|
- if (challenge > newChallenge) {
|
|
|
- wins = true;
|
|
|
- challenged = false;
|
|
|
- } else if (challenge == newChallenge) {
|
|
|
- genChallenge();
|
|
|
- } else {
|
|
|
- challenged = false;
|
|
|
- }
|
|
|
- }
|
|
|
+ // Sending id and challenge
|
|
|
+ byte[] msgBytes = new byte[8];
|
|
|
+ ByteBuffer msgBuffer = ByteBuffer.wrap(msgBytes);
|
|
|
+ msgBuffer.putLong(self.getId());
|
|
|
+ msgBuffer.position(0);
|
|
|
+ s.write(msgBuffer);
|
|
|
} catch (IOException e) {
|
|
|
LOG.warn("Exception reading or writing challenge: "
|
|
|
+ e.toString());
|
|
|
return false;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
// If lost the challenge, then drop the new connection
|
|
|
- if (!wins) {
|
|
|
+ if (sid > self.getId()) {
|
|
|
try {
|
|
|
- //LOG.warn("lost cause (initiate");
|
|
|
+ LOG.warn("Have smaller server identifier, so dropping the connection: (" +
|
|
|
+ sid + ", " + self.getId());
|
|
|
s.socket().close();
|
|
|
} catch (IOException e) {
|
|
|
LOG.warn("Error when closing socket or trying to reopen connection: "
|
|
@@ -206,33 +182,23 @@ class QuorumCnxManager {
|
|
|
|
|
|
}
|
|
|
// Otherwise proceed with the connection
|
|
|
- } else
|
|
|
- synchronized (senderWorkerMap) {
|
|
|
- /*
|
|
|
- * It may happen that a thread from a previous connection to the same
|
|
|
- * server is still active. In this case, we terminate the thread by
|
|
|
- * calling finish(). Note that senderWorkerMap is a map from IP
|
|
|
- * addresses to worker thread.
|
|
|
- */
|
|
|
- if (senderWorkerMap.get(s.socket().getInetAddress()) != null) {
|
|
|
- senderWorkerMap.get(s.socket().getInetAddress()).finish();
|
|
|
- }
|
|
|
-
|
|
|
- /*
|
|
|
- * Start new worker thread with a clean state.
|
|
|
- */
|
|
|
+ } else {
|
|
|
if (s != null) {
|
|
|
- SendWorker sw = new SendWorker(s);
|
|
|
- RecvWorker rw = new RecvWorker(s);
|
|
|
+ SendWorker sw = new SendWorker(s, sid);
|
|
|
+ RecvWorker rw = new RecvWorker(s, sid);
|
|
|
sw.setRecv(rw);
|
|
|
|
|
|
if (senderWorkerMap
|
|
|
- .containsKey(s.socket().getInetAddress())) {
|
|
|
- InetAddress addr = s.socket().getInetAddress();
|
|
|
- senderWorkerMap.get(addr).finish();
|
|
|
+ .containsKey(sid)) {
|
|
|
+ senderWorkerMap.get(sid).finish();
|
|
|
}
|
|
|
|
|
|
- senderWorkerMap.put(s.socket().getInetAddress(), sw);
|
|
|
+ if (!queueSendMap.containsKey(sid)) {
|
|
|
+ queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>(
|
|
|
+ CAPACITY));
|
|
|
+ }
|
|
|
+
|
|
|
+ senderWorkerMap.put(sid, sw);
|
|
|
sw.start();
|
|
|
rw.start();
|
|
|
|
|
@@ -241,8 +207,8 @@ class QuorumCnxManager {
|
|
|
LOG.warn("Channel null");
|
|
|
return false;
|
|
|
}
|
|
|
- }
|
|
|
-
|
|
|
+
|
|
|
+ }
|
|
|
return false;
|
|
|
}
|
|
|
|
|
@@ -257,93 +223,60 @@ class QuorumCnxManager {
|
|
|
boolean challenged = true;
|
|
|
boolean wins = false;
|
|
|
long newChallenge;
|
|
|
-
|
|
|
-
|
|
|
- //Compare IP addresses based on their hash codes.
|
|
|
- //int hashCodeRemote = s.socket().getInetAddress().hashCode();
|
|
|
- //if(hashCodeRemote >= localIP.hashCode()){
|
|
|
- // wins = false;
|
|
|
- //} else {
|
|
|
- // wins = true;
|
|
|
- //}
|
|
|
-
|
|
|
- //LOG.warn("Hash codes: " + hashCodeRemote + ", " + localIP.hashCode());
|
|
|
-
|
|
|
+ Long sid = null;
|
|
|
|
|
|
try {
|
|
|
- while (challenged && s.isConnected()) {
|
|
|
- // Sending challenge
|
|
|
- byte[] msgBytes = new byte[8];
|
|
|
- ByteBuffer msgBuffer = ByteBuffer.wrap(msgBytes);
|
|
|
- long vsent;
|
|
|
- if (senderWorkerMap.get(s.socket().getInetAddress()) == null)
|
|
|
- vsent = Long.MIN_VALUE;
|
|
|
- else
|
|
|
- vsent = challenge;
|
|
|
- msgBuffer.putLong(vsent);
|
|
|
- msgBuffer.position(0);
|
|
|
- s.write(msgBuffer);
|
|
|
-
|
|
|
- // Reading challenge
|
|
|
- msgBuffer.position(0);
|
|
|
- s.read(msgBuffer);
|
|
|
-
|
|
|
- msgBuffer.position(0);
|
|
|
- newChallenge = msgBuffer.getLong();
|
|
|
- if (vsent > newChallenge) {
|
|
|
- wins = true;
|
|
|
- challenged = false;
|
|
|
- } else if (challenge == newChallenge) {
|
|
|
- genChallenge();
|
|
|
- } else {
|
|
|
- challenged = false;
|
|
|
- }
|
|
|
- }
|
|
|
+ // Sending challenge and sid
|
|
|
+ byte[] msgBytes = new byte[8];
|
|
|
+ ByteBuffer msgBuffer = ByteBuffer.wrap(msgBytes);
|
|
|
+
|
|
|
+ s.read(msgBuffer);
|
|
|
+ msgBuffer.position(0);
|
|
|
+
|
|
|
+ // Read server id
|
|
|
+ sid = Long.valueOf(msgBuffer.getLong());
|
|
|
} catch (IOException e) {
|
|
|
LOG.warn("Exception reading or writing challenge: "
|
|
|
+ e.toString());
|
|
|
return false;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
//If wins the challenge, then close the new connection.
|
|
|
- if (wins) {
|
|
|
+ if (sid < self.getId()) {
|
|
|
try {
|
|
|
- InetAddress addr = s.socket().getInetAddress();
|
|
|
- SendWorker sw = senderWorkerMap.get(addr);
|
|
|
+ SendWorker sw = senderWorkerMap.get(sid);
|
|
|
|
|
|
- //LOG.warn("Keep connection (received)");
|
|
|
+ LOG.warn("Create new connection");
|
|
|
//sw.connect();
|
|
|
s.socket().close();
|
|
|
- sw.finish();
|
|
|
- SocketChannel channel = SocketChannel.open(new InetSocketAddress(addr, port));
|
|
|
+ if(sw != null) sw.finish();
|
|
|
+ SocketChannel channel = SocketChannel.open(self.quorumPeers.get(sid).electionAddr);
|
|
|
if (channel.isConnected()) {
|
|
|
- initiateConnection(channel);
|
|
|
+ initiateConnection(channel, sid);
|
|
|
}
|
|
|
|
|
|
-
|
|
|
} catch (IOException e) {
|
|
|
LOG.warn("Error when closing socket or trying to reopen connection: "
|
|
|
+ e.toString());
|
|
|
}
|
|
|
//Otherwise start worker threads to receive data.
|
|
|
- } else
|
|
|
- synchronized (senderWorkerMap) {
|
|
|
- if (senderWorkerMap.get(s.socket().getInetAddress()) != null) {
|
|
|
- senderWorkerMap.get(s.socket().getInetAddress()).finish();
|
|
|
- }
|
|
|
-
|
|
|
+ } else {
|
|
|
+
|
|
|
if (s != null) {
|
|
|
- SendWorker sw = new SendWorker(s);
|
|
|
- RecvWorker rw = new RecvWorker(s);
|
|
|
+ SendWorker sw = new SendWorker(s, sid);
|
|
|
+ RecvWorker rw = new RecvWorker(s, sid);
|
|
|
sw.setRecv(rw);
|
|
|
|
|
|
- if (senderWorkerMap
|
|
|
- .containsKey(s.socket().getInetAddress())) {
|
|
|
- InetAddress addr = s.socket().getInetAddress();
|
|
|
- senderWorkerMap.get(addr).finish();
|
|
|
+ if (senderWorkerMap.containsKey(sid)) {
|
|
|
+ senderWorkerMap.get(sid).finish();
|
|
|
}
|
|
|
-
|
|
|
- senderWorkerMap.put(s.socket().getInetAddress(), sw);
|
|
|
+
|
|
|
+ senderWorkerMap.put(sid, sw);
|
|
|
+
|
|
|
+ if (!queueSendMap.containsKey(sid)) {
|
|
|
+ queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>(
|
|
|
+ CAPACITY));
|
|
|
+ }
|
|
|
sw.start();
|
|
|
rw.start();
|
|
|
|
|
@@ -352,8 +285,7 @@ class QuorumCnxManager {
|
|
|
LOG.warn("Channel null");
|
|
|
return false;
|
|
|
}
|
|
|
- }
|
|
|
-
|
|
|
+ }
|
|
|
return false;
|
|
|
}
|
|
|
|
|
@@ -361,14 +293,14 @@ class QuorumCnxManager {
|
|
|
* Processes invoke this message to send a message. Currently, only leader
|
|
|
* election uses it.
|
|
|
*/
|
|
|
- void toSend(InetAddress addr, ByteBuffer b) {
|
|
|
+ void toSend(Long sid, ByteBuffer b) {
|
|
|
/*
|
|
|
* If sending message to myself, then simply enqueue it (loopback).
|
|
|
*/
|
|
|
- if (addr.equals(localIP)) {
|
|
|
+ if (self.getId() == sid) {
|
|
|
try {
|
|
|
b.position(0);
|
|
|
- recvQueue.put(new Message(b.duplicate(), addr));
|
|
|
+ recvQueue.put(new Message(b.duplicate(), sid));
|
|
|
} catch (InterruptedException e) {
|
|
|
LOG.warn("Exception when loopbacking");
|
|
|
}
|
|
@@ -380,33 +312,33 @@ class QuorumCnxManager {
|
|
|
/*
|
|
|
* Start a new connection if doesn't have one already.
|
|
|
*/
|
|
|
- if (!queueSendMap.containsKey(addr)) {
|
|
|
- queueSendMap.put(addr, new ArrayBlockingQueue<ByteBuffer>(
|
|
|
+ if (!queueSendMap.containsKey(sid)) {
|
|
|
+ queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>(
|
|
|
CAPACITY));
|
|
|
- queueSendMap.get(addr).put(b);
|
|
|
+ queueSendMap.get(sid).put(b);
|
|
|
|
|
|
} else {
|
|
|
- if (queueSendMap.get(addr).remainingCapacity() == 0) {
|
|
|
- queueSendMap.get(addr).take();
|
|
|
+ if (queueSendMap.get(sid).remainingCapacity() == 0) {
|
|
|
+ queueSendMap.get(sid).take();
|
|
|
}
|
|
|
- queueSendMap.get(addr).put(b);
|
|
|
+ queueSendMap.get(sid).put(b);
|
|
|
}
|
|
|
|
|
|
- synchronized (senderWorkerMap) {
|
|
|
- if (senderWorkerMap.get(addr) == null) {
|
|
|
+ //synchronized (senderWorkerMap) {
|
|
|
+ if ((senderWorkerMap.get(sid) == null)) {
|
|
|
SocketChannel channel;
|
|
|
try {
|
|
|
channel = SocketChannel
|
|
|
- .open(new InetSocketAddress(addr, port));
|
|
|
+ .open(self.quorumPeers.get(sid).electionAddr);
|
|
|
channel.socket().setTcpNoDelay(true);
|
|
|
- initiateConnection(channel);
|
|
|
+ initiateConnection(channel, sid);
|
|
|
} catch (IOException e) {
|
|
|
LOG.warn("Cannot open channel to "
|
|
|
- + addr.toString() + "( " + e.toString()
|
|
|
+ + sid + "( " + e.toString()
|
|
|
+ ")");
|
|
|
}
|
|
|
}
|
|
|
- }
|
|
|
+ //}
|
|
|
} catch (InterruptedException e) {
|
|
|
LOG.warn("Interrupted while waiting to put message in queue."
|
|
|
+ e.toString());
|
|
@@ -428,9 +360,15 @@ class QuorumCnxManager {
|
|
|
/**
|
|
|
* Flag that it is time to wrap up all activities and interrupt the listener.
|
|
|
*/
|
|
|
- public void shutdown() {
|
|
|
+ public void halt() {
|
|
|
shutdown = true;
|
|
|
- listener.interrupt();
|
|
|
+ LOG.warn("Halting listener");
|
|
|
+ listener.halt();
|
|
|
+
|
|
|
+ for(SendWorker sw: senderWorkerMap.values()){
|
|
|
+ LOG.warn("Halting sender: " + sw);
|
|
|
+ sw.finish();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -438,6 +376,7 @@ class QuorumCnxManager {
|
|
|
*/
|
|
|
class Listener extends Thread {
|
|
|
|
|
|
+ ServerSocketChannel ss = null;
|
|
|
/**
|
|
|
* Sleeps on accept().
|
|
|
*/
|
|
@@ -446,35 +385,31 @@ class QuorumCnxManager {
|
|
|
ServerSocketChannel ss = null;
|
|
|
try {
|
|
|
ss = ServerSocketChannel.open();
|
|
|
+ int port = self.quorumPeers.get(self.getId()).electionAddr.getPort();
|
|
|
+ LOG.warn("My election bind port: " + port);
|
|
|
ss.socket().bind(new InetSocketAddress(port));
|
|
|
|
|
|
while (!shutdown) {
|
|
|
SocketChannel client = ss.accept();
|
|
|
client.socket().setTcpNoDelay(true);
|
|
|
- /*
|
|
|
- * This synchronized block guarantees that if
|
|
|
- * both parties try to connect to each other
|
|
|
- * simultaneously, then only one will succeed.
|
|
|
- * If we don't have this block, then there
|
|
|
- * are runs in which both parties act as if they
|
|
|
- * don't have any connection starting or started.
|
|
|
- * In receiveConnection(), a server sends the minimum
|
|
|
- * value for a challenge, if they believe they must
|
|
|
- * accept the connection because they don't have one.
|
|
|
- *
|
|
|
- * This synchronized block prevents that the same server
|
|
|
- * invokes receiveConnection() and initiateConnection()
|
|
|
- * simultaneously.
|
|
|
- */
|
|
|
- synchronized(senderWorkerMap){
|
|
|
- LOG.warn("Connection request");
|
|
|
- receiveConnection(client);
|
|
|
- }
|
|
|
+
|
|
|
+ //synchronized(senderWorkerMap){
|
|
|
+ LOG.warn("Connection request");
|
|
|
+ receiveConnection(client);
|
|
|
+ //}
|
|
|
}
|
|
|
} catch (IOException e) {
|
|
|
System.err.println("Listener.run: " + e.getMessage());
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ void halt(){
|
|
|
+ try{
|
|
|
+ if(ss != null) ss.close();
|
|
|
+ } catch (IOException e){
|
|
|
+ LOG.warn("Exception when shutting down listener: " + e);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -485,17 +420,17 @@ class QuorumCnxManager {
|
|
|
|
|
|
class SendWorker extends Thread {
|
|
|
// Send msgs to peer
|
|
|
- InetAddress addr;
|
|
|
+ Long sid;
|
|
|
SocketChannel channel;
|
|
|
RecvWorker recvWorker;
|
|
|
boolean running = true;
|
|
|
|
|
|
- SendWorker(SocketChannel channel) {
|
|
|
- this.addr = channel.socket().getInetAddress();
|
|
|
+ SendWorker(SocketChannel channel, Long sid) {
|
|
|
+ this.sid = sid;
|
|
|
this.channel = channel;
|
|
|
recvWorker = null;
|
|
|
|
|
|
- LOG.debug("Address of remote peer: " + this.addr);
|
|
|
+ LOG.debug("Address of remote peer: " + this.sid);
|
|
|
}
|
|
|
|
|
|
void setRecv(RecvWorker recvWorker) {
|
|
@@ -508,7 +443,7 @@ class QuorumCnxManager {
|
|
|
this.interrupt();
|
|
|
if (recvWorker != null)
|
|
|
recvWorker.finish();
|
|
|
- senderWorkerMap.remove(channel.socket().getInetAddress());
|
|
|
+ senderWorkerMap.remove(sid);
|
|
|
return running;
|
|
|
}
|
|
|
|
|
@@ -519,7 +454,7 @@ class QuorumCnxManager {
|
|
|
|
|
|
ByteBuffer b = null;
|
|
|
try {
|
|
|
- b = queueSendMap.get(addr).take();
|
|
|
+ b = queueSendMap.get(sid).take();
|
|
|
} catch (InterruptedException e) {
|
|
|
LOG.warn("Interrupted while waiting for message on queue ("
|
|
|
+ e.toString() + ")");
|
|
@@ -541,18 +476,18 @@ class QuorumCnxManager {
|
|
|
* If reconnection doesn't work, then put the
|
|
|
* message back to the beginning of the queue and leave.
|
|
|
*/
|
|
|
- LOG.warn("Exception when using channel: " + addr
|
|
|
+ LOG.warn("Exception when using channel: " + sid
|
|
|
+ ")" + e.toString());
|
|
|
running = false;
|
|
|
synchronized (senderWorkerMap) {
|
|
|
recvWorker.finish();
|
|
|
recvWorker = null;
|
|
|
|
|
|
- senderWorkerMap.remove(channel.socket().getInetAddress());
|
|
|
+ senderWorkerMap.remove(sid);
|
|
|
|
|
|
- if (queueSendMap.get(channel.socket().getInetAddress())
|
|
|
+ if (queueSendMap.get(sid)
|
|
|
.size() == 0)
|
|
|
- queueSendMap.get(channel.socket().getInetAddress())
|
|
|
+ queueSendMap.get(sid)
|
|
|
.offer(b);
|
|
|
}
|
|
|
}
|
|
@@ -566,12 +501,12 @@ class QuorumCnxManager {
|
|
|
* channel breaks, then removes itself from the pool of receivers.
|
|
|
*/
|
|
|
class RecvWorker extends Thread {
|
|
|
- InetAddress addr;
|
|
|
+ Long sid;
|
|
|
SocketChannel channel;
|
|
|
boolean running = true;
|
|
|
|
|
|
- RecvWorker(SocketChannel channel) {
|
|
|
- this.addr = channel.socket().getInetAddress();
|
|
|
+ RecvWorker(SocketChannel channel, Long sid) {
|
|
|
+ this.sid = sid;
|
|
|
this.channel = channel;
|
|
|
}
|
|
|
|
|
@@ -610,7 +545,7 @@ class QuorumCnxManager {
|
|
|
message.position(0);
|
|
|
synchronized (recvQueue) {
|
|
|
recvQueue
|
|
|
- .put(new Message(message.duplicate(), addr));
|
|
|
+ .put(new Message(message.duplicate(), sid));
|
|
|
}
|
|
|
msgLength.position(0);
|
|
|
}
|