|
@@ -27,10 +27,12 @@ import java.io.ByteArrayOutputStream;
|
|
|
import java.nio.ByteBuffer;
|
|
|
import java.nio.channels.CancelledKeyException;
|
|
|
import java.nio.channels.ClosedChannelException;
|
|
|
+import java.nio.channels.ReadableByteChannel;
|
|
|
import java.nio.channels.SelectionKey;
|
|
|
import java.nio.channels.Selector;
|
|
|
import java.nio.channels.ServerSocketChannel;
|
|
|
import java.nio.channels.SocketChannel;
|
|
|
+import java.nio.channels.WritableByteChannel;
|
|
|
|
|
|
import java.net.BindException;
|
|
|
import java.net.InetAddress;
|
|
@@ -584,7 +586,7 @@ public abstract class Server {
|
|
|
//
|
|
|
// Send as much data as we can in the non-blocking fashion
|
|
|
//
|
|
|
- int numBytes = channel.write(call.response);
|
|
|
+ int numBytes = channelWrite(channel, call.response);
|
|
|
if (numBytes < 0) {
|
|
|
return true;
|
|
|
}
|
|
@@ -759,7 +761,7 @@ public abstract class Server {
|
|
|
*/
|
|
|
int count = -1;
|
|
|
if (dataLengthBuffer.remaining() > 0) {
|
|
|
- count = channel.read(dataLengthBuffer);
|
|
|
+ count = channelRead(channel, dataLengthBuffer);
|
|
|
if (count < 0 || dataLengthBuffer.remaining() > 0)
|
|
|
return count;
|
|
|
}
|
|
@@ -767,7 +769,7 @@ public abstract class Server {
|
|
|
if (!versionRead) {
|
|
|
//Every connection is expected to send the header.
|
|
|
ByteBuffer versionBuffer = ByteBuffer.allocate(1);
|
|
|
- count = channel.read(versionBuffer);
|
|
|
+ count = channelRead(channel, versionBuffer);
|
|
|
if (count <= 0) {
|
|
|
return count;
|
|
|
}
|
|
@@ -799,7 +801,7 @@ public abstract class Server {
|
|
|
incRpcCount(); // Increment the rpc count
|
|
|
}
|
|
|
|
|
|
- count = channel.read(data);
|
|
|
+ count = channelRead(channel, data);
|
|
|
|
|
|
if (data.remaining() == 0) {
|
|
|
dataLengthBuffer.clear();
|
|
@@ -1051,4 +1053,80 @@ public abstract class Server {
|
|
|
return callQueue.size();
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+ /**
|
|
|
+ * When the read or write buffer size is lager than this limit, i/o will be
|
|
|
+ * done in chunks of this size. Most RPC requests and responses will
|
|
|
+ * be smaller than this.
|
|
|
+ */
|
|
|
+ private static int NIO_BUFFER_LIMIT = 8*1024; //should not be more than 64KB.
|
|
|
+
|
|
|
+ /**
|
|
|
+ * This is a wrapper around {@link WritableByteChannel#write(ByteBuffer)}.
|
|
|
+ * If the amount of data is large, it writes to channel in smaller chunks.
|
|
|
+ * This is to avoid jdk from creating many direct buffers as the size of
|
|
|
+ * buffer increases. his also minimizes extra copies in NIO layer
|
|
|
+ * as a result of multiple write operations required to write a large
|
|
|
+ * buffer.
|
|
|
+ *
|
|
|
+ * @see WritableByteChannel#write(ByteBuffer)
|
|
|
+ */
|
|
|
+ private static int channelWrite(WritableByteChannel channel,
|
|
|
+ ByteBuffer buffer) throws IOException {
|
|
|
+
|
|
|
+ return (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
|
|
|
+ channel.write(buffer) : channelIO(null, channel, buffer);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * This is a wrapper around {@link ReadableByteChannel#read(ByteBuffer)}.
|
|
|
+ * If the amount of data is large, it writes to channel in smaller chunks.
|
|
|
+ * This is to avoid jdk from creating many direct buffers as the size of
|
|
|
+ * ByteBuffer increases. There should not be any performance degredation.
|
|
|
+ *
|
|
|
+ * @see ReadableByteChannel#read(ByteBuffer)
|
|
|
+ */
|
|
|
+ private static int channelRead(ReadableByteChannel channel,
|
|
|
+ ByteBuffer buffer) throws IOException {
|
|
|
+
|
|
|
+ return (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
|
|
|
+ channel.read(buffer) : channelIO(channel, null, buffer);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Helper for {@link #channelRead(ReadableByteChannel, ByteBuffer)}
|
|
|
+ * and {@link #channelWrite(WritableByteChannel, ByteBuffer)}. Only
|
|
|
+ * one of readCh or writeCh should be non-null.
|
|
|
+ *
|
|
|
+ * @see #channelRead(ReadableByteChannel, ByteBuffer)
|
|
|
+ * @see #channelWrite(WritableByteChannel, ByteBuffer)
|
|
|
+ */
|
|
|
+ private static int channelIO(ReadableByteChannel readCh,
|
|
|
+ WritableByteChannel writeCh,
|
|
|
+ ByteBuffer buf) throws IOException {
|
|
|
+
|
|
|
+ int originalLimit = buf.limit();
|
|
|
+ int initialRemaining = buf.remaining();
|
|
|
+ int ret = 0;
|
|
|
+
|
|
|
+ while (buf.remaining() > 0) {
|
|
|
+ try {
|
|
|
+ int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
|
|
|
+ buf.limit(buf.position() + ioSize);
|
|
|
+
|
|
|
+ ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf);
|
|
|
+
|
|
|
+ if (ret < ioSize) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ } finally {
|
|
|
+ buf.limit(originalLimit);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ int nBytes = initialRemaining - buf.remaining();
|
|
|
+ return (nBytes > 0) ? nBytes : ret;
|
|
|
+ }
|
|
|
}
|