|
@@ -133,7 +133,7 @@ class QuorumCnxManager extends Thread {
|
|
|
genChallenge();
|
|
|
|
|
|
// Starts listener thread that waits for connection requests
|
|
|
- listener = new Listener(this);
|
|
|
+ listener = new Listener();
|
|
|
listener.start();
|
|
|
}
|
|
|
|
|
@@ -172,12 +172,12 @@ class QuorumCnxManager extends Thread {
|
|
|
msgBuffer.position(0);
|
|
|
s.write(msgBuffer);
|
|
|
|
|
|
- // Reading challenge
|
|
|
- msgBuffer.position(0);
|
|
|
- int numBytes = s.read(msgBuffer);
|
|
|
+ // Reading challenge
|
|
|
+ msgBuffer.position(0);
|
|
|
+ s.read(msgBuffer);
|
|
|
|
|
|
- msgBuffer.position(0);
|
|
|
- newChallenge = msgBuffer.getLong();
|
|
|
+ msgBuffer.position(0);
|
|
|
+ newChallenge = msgBuffer.getLong();
|
|
|
if (challenge > newChallenge) {
|
|
|
wins = true;
|
|
|
challenged = false;
|
|
@@ -219,8 +219,8 @@ class QuorumCnxManager extends Thread {
|
|
|
/*
|
|
|
* Start new worker thread with a clean state.
|
|
|
*/
|
|
|
- SendWorker sw = new SendWorker(s);
|
|
|
if (s != null) {
|
|
|
+ SendWorker sw = new SendWorker(s);
|
|
|
RecvWorker rw = new RecvWorker(s);
|
|
|
sw.setRecv(rw);
|
|
|
|
|
@@ -330,8 +330,8 @@ class QuorumCnxManager extends Thread {
|
|
|
senderWorkerMap.get(s.socket().getInetAddress()).finish();
|
|
|
}
|
|
|
|
|
|
- SendWorker sw = new SendWorker(s);
|
|
|
if (s != null) {
|
|
|
+ SendWorker sw = new SendWorker(s);
|
|
|
RecvWorker rw = new RecvWorker(s);
|
|
|
sw.setRecv(rw);
|
|
|
|
|
@@ -435,11 +435,6 @@ class QuorumCnxManager extends Thread {
|
|
|
* Thread to listen on some port
|
|
|
*/
|
|
|
class Listener extends Thread {
|
|
|
- QuorumCnxManager manager;
|
|
|
-
|
|
|
- Listener(QuorumCnxManager m) {
|
|
|
- manager = m;
|
|
|
- }
|
|
|
|
|
|
/**
|
|
|
* Sleeps on accept().
|
|
@@ -447,8 +442,6 @@ class QuorumCnxManager extends Thread {
|
|
|
public void run() {
|
|
|
ServerSocketChannel ss = null;
|
|
|
try {
|
|
|
- if (ss != null)
|
|
|
- ss.close();
|
|
|
ss = ServerSocketChannel.open();
|
|
|
ss.socket().bind(new InetSocketAddress(port));
|
|
|
|
|
@@ -504,22 +497,6 @@ class QuorumCnxManager extends Thread {
|
|
|
this.recvWorker = recvWorker;
|
|
|
}
|
|
|
|
|
|
- //boolean connect() throws IOException {
|
|
|
- // if (recvWorker != null)
|
|
|
- // recvWorker.finish();
|
|
|
- //
|
|
|
- // channel = SocketChannel.open(new InetSocketAddress(addr, port));
|
|
|
- // if (channel.isConnected()) {
|
|
|
- // recvWorker = new RecvWorker(channel);
|
|
|
- // initiateConnection(channel);
|
|
|
- // LOG.warn("Opened new connection");
|
|
|
- // } else {
|
|
|
- // LOG.warn("Channel not connected.");
|
|
|
- // }
|
|
|
- //
|
|
|
- // return channel.isConnected();
|
|
|
- //}
|
|
|
-
|
|
|
boolean finish() {
|
|
|
running = false;
|
|
|
|
|
@@ -532,12 +509,10 @@ class QuorumCnxManager extends Thread {
|
|
|
|
|
|
public void run() {
|
|
|
|
|
|
- long init, init1, end1, end;
|
|
|
while (running && !shutdown) {
|
|
|
|
|
|
ByteBuffer b = null;
|
|
|
try {
|
|
|
- init = System.currentTimeMillis();
|
|
|
b = queueSendMap.get(addr).take();
|
|
|
} catch (InterruptedException e) {
|
|
|
LOG.warn("Interrupted while waiting for message on queue ("
|
|
@@ -550,12 +525,10 @@ class QuorumCnxManager extends Thread {
|
|
|
+ (Integer.SIZE / 8)];
|
|
|
ByteBuffer msgBuffer = ByteBuffer.wrap(msgBytes);
|
|
|
msgBuffer.putInt(b.capacity());
|
|
|
- int tmpSize = b.capacity() + (Integer.SIZE / 8);
|
|
|
|
|
|
msgBuffer.put(b.array(), 0, b.capacity());
|
|
|
msgBuffer.position(0);
|
|
|
- int numbytes = channel.write(msgBuffer);
|
|
|
- end = System.currentTimeMillis() - init;
|
|
|
+ channel.write(msgBuffer);
|
|
|
|
|
|
} catch (IOException e) {
|
|
|
/*
|
|
@@ -571,12 +544,10 @@ class QuorumCnxManager extends Thread {
|
|
|
|
|
|
senderWorkerMap.remove(channel.socket().getInetAddress());
|
|
|
|
|
|
- if (b != null) {
|
|
|
- if (queueSendMap.get(channel.socket().getInetAddress())
|
|
|
+ if (queueSendMap.get(channel.socket().getInetAddress())
|
|
|
.size() == 0)
|
|
|
- queueSendMap.get(channel.socket().getInetAddress())
|
|
|
+ queueSendMap.get(channel.socket().getInetAddress())
|
|
|
.offer(b);
|
|
|
- }
|
|
|
}
|
|
|
}
|
|
|
}
|