|
@@ -560,33 +560,32 @@ public class QuorumCnxManager {
|
|
|
this.finish();
|
|
|
}
|
|
|
|
|
|
- while (running && !shutdown && channel != null) {
|
|
|
+ try {
|
|
|
+ while (running && !shutdown && channel != null) {
|
|
|
|
|
|
- ByteBuffer b = null;
|
|
|
- try {
|
|
|
- ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
|
|
|
- if(bq != null)
|
|
|
- b = bq.poll(1000, TimeUnit.MILLISECONDS);
|
|
|
- else {
|
|
|
- LOG.error("No queue of incoming messages for server " + sid);
|
|
|
- this.finish();
|
|
|
- }
|
|
|
- } catch (InterruptedException e) {
|
|
|
- LOG.warn("Interrupted while waiting for message on queue",
|
|
|
- e);
|
|
|
- continue;
|
|
|
- }
|
|
|
-
|
|
|
- try {
|
|
|
- if(b != null){
|
|
|
- lastMessageSent.put(sid, b);
|
|
|
- send(b);
|
|
|
+ ByteBuffer b = null;
|
|
|
+ try {
|
|
|
+ ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
|
|
|
+ if(bq != null)
|
|
|
+ b = bq.poll(1000, TimeUnit.MILLISECONDS);
|
|
|
+ else {
|
|
|
+ LOG.error("No queue of incoming messages for server " + sid);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ if(b != null){
|
|
|
+ lastMessageSent.put(sid, b);
|
|
|
+ send(b);
|
|
|
+ }
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ LOG.warn("Interrupted while waiting for message on queue",
|
|
|
+ e);
|
|
|
}
|
|
|
- } catch (Exception e) {
|
|
|
- LOG.warn("Exception when using channel: " + sid, e);
|
|
|
- this.finish();
|
|
|
}
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.warn("Exception when using channel: " + sid, e);
|
|
|
}
|
|
|
+ this.finish();
|
|
|
LOG.warn("Send worker leaving thread");
|
|
|
}
|
|
|
}
|