|
@@ -955,10 +955,12 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
|
|
|
}
|
|
|
jmxConnectionBean = null;
|
|
|
|
|
|
- if (closed) {
|
|
|
- return;
|
|
|
+ synchronized(this) {
|
|
|
+ if (closed) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ closed = true;
|
|
|
}
|
|
|
- closed = true;
|
|
|
synchronized (factory.ipMap)
|
|
|
{
|
|
|
Set<NIOServerCnxn> s = factory.ipMap.get(sock.socket().getInetAddress());
|
|
@@ -1038,42 +1040,46 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
|
|
|
* org.apache.jute.Record, java.lang.String)
|
|
|
*/
|
|
|
synchronized public void sendResponse(ReplyHeader h, Record r, String tag) {
|
|
|
- if (closed) {
|
|
|
- if (LOG.isTraceEnabled()) {
|
|
|
- LOG.trace("send called on closed session 0x"
|
|
|
- + Long.toHexString(sessionId)
|
|
|
- + " with record " + r);
|
|
|
- }
|
|
|
- return;
|
|
|
- }
|
|
|
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
|
|
- // Make space for length
|
|
|
- BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
|
|
|
try {
|
|
|
- baos.write(fourBytes);
|
|
|
- bos.writeRecord(h, "header");
|
|
|
- if (r != null) {
|
|
|
- bos.writeRecord(r, tag);
|
|
|
+ if (closed) {
|
|
|
+ if (LOG.isTraceEnabled()) {
|
|
|
+ LOG.trace("send called on closed session 0x"
|
|
|
+ + Long.toHexString(sessionId)
|
|
|
+ + " with record " + r);
|
|
|
+ }
|
|
|
+ return;
|
|
|
}
|
|
|
- baos.close();
|
|
|
- } catch (IOException e) {
|
|
|
- LOG.error("Error serializing response");
|
|
|
- }
|
|
|
- byte b[] = baos.toByteArray();
|
|
|
- ByteBuffer bb = ByteBuffer.wrap(b);
|
|
|
- bb.putInt(b.length - 4).rewind();
|
|
|
- sendBuffer(bb);
|
|
|
- if (h.getXid() > 0) {
|
|
|
- synchronized (this.factory) {
|
|
|
- outstandingRequests--;
|
|
|
- // check throttling
|
|
|
- if (zk.getInProcess() < factory.outstandingLimit
|
|
|
- || outstandingRequests < 1) {
|
|
|
- sk.selector().wakeup();
|
|
|
- enableRecv();
|
|
|
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
|
|
+ // Make space for length
|
|
|
+ BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
|
|
|
+ try {
|
|
|
+ baos.write(fourBytes);
|
|
|
+ bos.writeRecord(h, "header");
|
|
|
+ if (r != null) {
|
|
|
+ bos.writeRecord(r, tag);
|
|
|
}
|
|
|
+ baos.close();
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.error("Error serializing response");
|
|
|
}
|
|
|
- }
|
|
|
+ byte b[] = baos.toByteArray();
|
|
|
+ ByteBuffer bb = ByteBuffer.wrap(b);
|
|
|
+ bb.putInt(b.length - 4).rewind();
|
|
|
+ sendBuffer(bb);
|
|
|
+ if (h.getXid() > 0) {
|
|
|
+ synchronized (this.factory) {
|
|
|
+ outstandingRequests--;
|
|
|
+ // check throttling
|
|
|
+ if (zk.getInProcess() < factory.outstandingLimit
|
|
|
+ || outstandingRequests < 1) {
|
|
|
+ sk.selector().wakeup();
|
|
|
+ enableRecv();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch(Exception e) {
|
|
|
+ LOG.error("Unexpected exception. Destruction averted.", e);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/*
|