|
@@ -207,10 +207,8 @@ public class QuorumCnxManager {
|
|
|
vsw.finish();
|
|
|
|
|
|
senderWorkerMap.put(sid, sw);
|
|
|
- if (!queueSendMap.containsKey(sid)) {
|
|
|
- queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>(
|
|
|
+ queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(
|
|
|
SEND_CAPACITY));
|
|
|
- }
|
|
|
|
|
|
sw.start();
|
|
|
rw.start();
|
|
@@ -304,10 +302,8 @@ public class QuorumCnxManager {
|
|
|
|
|
|
senderWorkerMap.put(sid, sw);
|
|
|
|
|
|
- if (!queueSendMap.containsKey(sid)) {
|
|
|
- queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>(
|
|
|
+ queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(
|
|
|
SEND_CAPACITY));
|
|
|
- }
|
|
|
|
|
|
sw.start();
|
|
|
rw.start();
|
|
@@ -335,19 +331,13 @@ public class QuorumCnxManager {
|
|
|
/*
|
|
|
* Start a new connection if doesn't have one already.
|
|
|
*/
|
|
|
- if (!queueSendMap.containsKey(sid)) {
|
|
|
- ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(
|
|
|
- SEND_CAPACITY);
|
|
|
- queueSendMap.put(sid, bq);
|
|
|
- addToSendQueue(bq, b);
|
|
|
-
|
|
|
+ ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(
|
|
|
+ SEND_CAPACITY);
|
|
|
+ ArrayBlockingQueue<ByteBuffer> oldq = queueSendMap.putIfAbsent(sid, bq);
|
|
|
+ if (oldq != null) {
|
|
|
+ addToSendQueue(oldq, b);
|
|
|
} else {
|
|
|
- ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
|
|
|
- if(bq != null){
|
|
|
- addToSendQueue(bq, b);
|
|
|
- } else {
|
|
|
- LOG.error("No queue for server " + sid);
|
|
|
- }
|
|
|
+ addToSendQueue(bq, b);
|
|
|
}
|
|
|
connectOne(sid);
|
|
|
|