浏览代码

HADOOP-13549. Eliminate intermediate buffer for server-side PB encoding. Contributed by Daryn Sharp.

(cherry picked from commit 39d1b1d747b1e325792b897b3264272f32b756a9)
Kihwal Lee 8 年之前
父节点
当前提交
56340ac029
共有 1 个文件被更改,包括 49 次插入3 次删除
  1. 49 3
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

+ 49 - 3
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

@@ -118,6 +118,7 @@ import org.apache.htrace.core.Tracer;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ByteString;
+import com.google.protobuf.CodedOutputStream;
 import com.google.protobuf.Message;
 
 /** An abstract IPC service.  IPC calls take a single {@link Writable} as a
@@ -2668,24 +2669,69 @@ public abstract class Server {
 
   private void setupResponse(RpcCall call,
       RpcResponseHeaderProto header, Writable rv) throws IOException {
+    final byte[] response;
+    if (rv == null || (rv instanceof RpcWritable.ProtobufWrapper)) {
+      response = setupResponseForProtobuf(header, rv);
+    } else {
+      response = setupResponseForWritable(header, rv);
+    }
+    if (response.length > maxRespSize) {
+      LOG.warn("Large response size " + response.length + " for call "
+          + call.toString());
+    }
+    call.setResponse(ByteBuffer.wrap(response));
+  }
+
+  private byte[] setupResponseForWritable(
+      RpcResponseHeaderProto header, Writable rv) throws IOException {
     ResponseBuffer buf = responseBuffer.get().reset();
     try {
       RpcWritable.wrap(header).writeTo(buf);
       if (rv != null) {
         RpcWritable.wrap(rv).writeTo(buf);
       }
-      call.setResponse(ByteBuffer.wrap(buf.toByteArray()));
+      return buf.toByteArray();
     } finally {
       // Discard a large buf and reset it back to smaller size
       // to free up heap.
       if (buf.capacity() > maxRespSize) {
-        LOG.warn("Large response size " + buf.size() + " for call "
-            + call.toString());
         buf.setCapacity(INITIAL_RESP_BUF_SIZE);
       }
     }
   }
 
+
+  // writing to a pre-allocated array is the most efficient way to construct
+  // a protobuf response.
+  private byte[] setupResponseForProtobuf(
+      RpcResponseHeaderProto header, Writable rv) throws IOException {
+    Message payload = (rv != null)
+        ? ((RpcWritable.ProtobufWrapper)rv).getMessage() : null;
+    int length = getDelimitedLength(header);
+    if (payload != null) {
+      length += getDelimitedLength(payload);
+    }
+    byte[] buf = new byte[length + 4];
+    CodedOutputStream cos = CodedOutputStream.newInstance(buf);
+    // the stream only supports little endian ints
+    cos.writeRawByte((byte)((length >>> 24) & 0xFF));
+    cos.writeRawByte((byte)((length >>> 16) & 0xFF));
+    cos.writeRawByte((byte)((length >>>  8) & 0xFF));
+    cos.writeRawByte((byte)((length >>>  0) & 0xFF));
+    cos.writeRawVarint32(header.getSerializedSize());
+    header.writeTo(cos);
+    if (payload != null) {
+      cos.writeRawVarint32(payload.getSerializedSize());
+      payload.writeTo(cos);
+    }
+    return buf;
+  }
+
+  private static int getDelimitedLength(Message message) {
+    int length = message.getSerializedSize();
+    return length + CodedOutputStream.computeRawVarint32Size(length);
+  }
+
   /**
    * Setup response for the IPC Call on Fatal Error from a 
    * client that is using old version of Hadoop.