|
@@ -72,12 +72,7 @@ public class QuorumCnxManager {
|
|
* Packet size
|
|
* Packet size
|
|
*/
|
|
*/
|
|
int packetSize;
|
|
int packetSize;
|
|
-
|
|
|
|
- /*
|
|
|
|
- * Port to listen on
|
|
|
|
- */
|
|
|
|
- int port;
|
|
|
|
-
|
|
|
|
|
|
+
|
|
/*
|
|
/*
|
|
* Challenge to initiate connections
|
|
* Challenge to initiate connections
|
|
*/
|
|
*/
|
|
@@ -126,27 +121,11 @@ public class QuorumCnxManager {
|
|
this.senderWorkerMap = new ConcurrentHashMap<Long, SendWorker>();
|
|
this.senderWorkerMap = new ConcurrentHashMap<Long, SendWorker>();
|
|
this.self = self;
|
|
this.self = self;
|
|
|
|
|
|
- // Generates a challenge to guarantee one connection between pairs of
|
|
|
|
- // servers
|
|
|
|
- //genChallenge();
|
|
|
|
-
|
|
|
|
// Starts listener thread that waits for connection requests
|
|
// Starts listener thread that waits for connection requests
|
|
listener = new Listener();
|
|
listener = new Listener();
|
|
listener.start();
|
|
listener.start();
|
|
}
|
|
}
|
|
|
|
|
|
- void genChallenge() {
|
|
|
|
- try{
|
|
|
|
- Random rand = new Random(System.currentTimeMillis()
|
|
|
|
- + InetAddress.getLocalHost().hashCode());
|
|
|
|
- long newValue = rand.nextLong();
|
|
|
|
- challenge = newValue;
|
|
|
|
- } catch(UnknownHostException e){
|
|
|
|
- LOG.error("Cannot resolve local address");
|
|
|
|
- challenge = 0;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
/**
|
|
/**
|
|
* If this server has initiated the connection, then it gives up on the
|
|
* If this server has initiated the connection, then it gives up on the
|
|
* connection if it loses challenge. Otherwise, it keeps the connection.
|
|
* connection if it loses challenge. Otherwise, it keeps the connection.
|
|
@@ -177,30 +156,25 @@ public class QuorumCnxManager {
|
|
}
|
|
}
|
|
// Otherwise proceed with the connection
|
|
// Otherwise proceed with the connection
|
|
} else {
|
|
} else {
|
|
- if (s != null) {
|
|
|
|
- SendWorker sw = new SendWorker(s, sid);
|
|
|
|
- RecvWorker rw = new RecvWorker(s, sid);
|
|
|
|
- sw.setRecv(rw);
|
|
|
|
-
|
|
|
|
- if (senderWorkerMap
|
|
|
|
- .containsKey(sid)) {
|
|
|
|
- senderWorkerMap.get(sid).finish();
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- if (!queueSendMap.containsKey(sid)) {
|
|
|
|
- queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>(
|
|
|
|
- CAPACITY));
|
|
|
|
- }
|
|
|
|
|
|
+ SendWorker sw = new SendWorker(s, sid);
|
|
|
|
+ RecvWorker rw = new RecvWorker(s, sid);
|
|
|
|
+ sw.setRecv(rw);
|
|
|
|
+
|
|
|
|
+ if (senderWorkerMap
|
|
|
|
+ .containsKey(sid)) {
|
|
|
|
+ senderWorkerMap.get(sid).finish();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if (!queueSendMap.containsKey(sid)) {
|
|
|
|
+ queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>(
|
|
|
|
+ CAPACITY));
|
|
|
|
+ }
|
|
|
|
|
|
- senderWorkerMap.put(sid, sw);
|
|
|
|
- sw.start();
|
|
|
|
- rw.start();
|
|
|
|
|
|
+ senderWorkerMap.put(sid, sw);
|
|
|
|
+ sw.start();
|
|
|
|
+ rw.start();
|
|
|
|
|
|
- return true;
|
|
|
|
- } else {
|
|
|
|
- LOG.warn("Channel null");
|
|
|
|
- return false;
|
|
|
|
- }
|
|
|
|
|
|
+ return true;
|
|
|
|
|
|
}
|
|
}
|
|
return false;
|
|
return false;
|
|
@@ -247,30 +221,24 @@ public class QuorumCnxManager {
|
|
}
|
|
}
|
|
//Otherwise start worker threads to receive data.
|
|
//Otherwise start worker threads to receive data.
|
|
} else {
|
|
} else {
|
|
-
|
|
|
|
- if (s != null) {
|
|
|
|
- SendWorker sw = new SendWorker(s, sid);
|
|
|
|
- RecvWorker rw = new RecvWorker(s, sid);
|
|
|
|
- sw.setRecv(rw);
|
|
|
|
|
|
+ SendWorker sw = new SendWorker(s, sid);
|
|
|
|
+ RecvWorker rw = new RecvWorker(s, sid);
|
|
|
|
+ sw.setRecv(rw);
|
|
|
|
|
|
- if (senderWorkerMap.containsKey(sid)) {
|
|
|
|
- senderWorkerMap.get(sid).finish();
|
|
|
|
- }
|
|
|
|
|
|
+ if (senderWorkerMap.containsKey(sid)) {
|
|
|
|
+ senderWorkerMap.get(sid).finish();
|
|
|
|
+ }
|
|
|
|
|
|
- senderWorkerMap.put(sid, sw);
|
|
|
|
|
|
+ senderWorkerMap.put(sid, sw);
|
|
|
|
|
|
- if (!queueSendMap.containsKey(sid)) {
|
|
|
|
- queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>(
|
|
|
|
- CAPACITY));
|
|
|
|
- }
|
|
|
|
- sw.start();
|
|
|
|
- rw.start();
|
|
|
|
-
|
|
|
|
- return true;
|
|
|
|
- } else {
|
|
|
|
- LOG.warn("Channel null");
|
|
|
|
- return false;
|
|
|
|
- }
|
|
|
|
|
|
+ if (!queueSendMap.containsKey(sid)) {
|
|
|
|
+ queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>(
|
|
|
|
+ CAPACITY));
|
|
|
|
+ }
|
|
|
|
+ sw.start();
|
|
|
|
+ rw.start();
|
|
|
|
+
|
|
|
|
+ return true;
|
|
}
|
|
}
|
|
return false;
|
|
return false;
|
|
}
|
|
}
|