|
@@ -77,6 +77,17 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
|
|
|
LOG.error("Thread " + t + " died", e);
|
|
|
}
|
|
|
});
|
|
|
+
|
|
|
+ /**
|
|
|
+ * this is to avoid the jvm bug:
|
|
|
+ * NullPointerException in Selector.open()
|
|
|
+ * http://bugs.sun.com/view_bug.do?bug_id=6427854
|
|
|
+ */
|
|
|
+ try {
|
|
|
+ Selector.open().close();
|
|
|
+ } catch(IOException ie) {
|
|
|
+ LOG.error("Exception while opening a selector", ie);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
static public class Factory extends Thread {
|
|
@@ -350,36 +361,40 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
|
|
|
}
|
|
|
|
|
|
void sendBuffer(ByteBuffer bb) {
|
|
|
- if (bb != closeConn) {
|
|
|
- // We check if write interest here because if it is NOT set,
|
|
|
- // nothing is queued, so we can try to send the buffer right
|
|
|
- // away without waking up the selector
|
|
|
- if ((sk.interestOps() & SelectionKey.OP_WRITE) == 0) {
|
|
|
- try {
|
|
|
- sock.write(bb);
|
|
|
- } catch (IOException e) {
|
|
|
- // we are just doing best effort right now
|
|
|
+ try {
|
|
|
+ if (bb != closeConn) {
|
|
|
+ // We check if write interest here because if it is NOT set,
|
|
|
+ // nothing is queued, so we can try to send the buffer right
|
|
|
+ // away without waking up the selector
|
|
|
+ if ((sk.interestOps() & SelectionKey.OP_WRITE) == 0) {
|
|
|
+ try {
|
|
|
+ sock.write(bb);
|
|
|
+ } catch (IOException e) {
|
|
|
+ // we are just doing best effort right now
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // if there is nothing left to send, we are done
|
|
|
+ if (bb.remaining() == 0) {
|
|
|
+ packetSent();
|
|
|
+ return;
|
|
|
}
|
|
|
}
|
|
|
- // if there is nothing left to send, we are done
|
|
|
- if (bb.remaining() == 0) {
|
|
|
- packetSent();
|
|
|
- return;
|
|
|
- }
|
|
|
- }
|
|
|
- synchronized (factory) {
|
|
|
- sk.selector().wakeup();
|
|
|
- if (LOG.isTraceEnabled()) {
|
|
|
- LOG.trace("Add a buffer to outgoingBuffers, sk " + sk
|
|
|
- + " is valid: " + sk.isValid());
|
|
|
- }
|
|
|
- outgoingBuffers.add(bb);
|
|
|
- if (sk.isValid()) {
|
|
|
- sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE);
|
|
|
+ synchronized (factory) {
|
|
|
+ sk.selector().wakeup();
|
|
|
+ if (LOG.isTraceEnabled()) {
|
|
|
+ LOG.trace("Add a buffer to outgoingBuffers, sk " + sk
|
|
|
+ + " is valid: " + sk.isValid());
|
|
|
+ }
|
|
|
+ outgoingBuffers.add(bb);
|
|
|
+ if (sk.isValid()) {
|
|
|
+ sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE);
|
|
|
+ }
|
|
|
}
|
|
|
+ } catch(Exception e) {
|
|
|
+ LOG.error("Unexpected Exception: ", e);
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
private static class CloseRequestException extends IOException {
|
|
|
private static final long serialVersionUID = -7854505709816442681L;
|
|
|
|