Browse Source

MAPREDUCE-3885. Avoid an unnecessary copy for all requests/responses in MRs ProtoOverHadoopRpcEngine. (Contributed by Devaraj Das)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1295362 13f79535-47bb-0310-9956-ffa450edef68
Siddharth Seth 13 years ago
parent
commit
7ae1523865

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -97,6 +97,9 @@ Release 0.23.3 - UNRELEASED
 
 
     MAPREDUCE-3909 Javadoc the Service interfaces (stevel)
     MAPREDUCE-3909 Javadoc the Service interfaces (stevel)
 
 
+    MAPREDUCE-3885. Avoid an unnecessary copy for all requests/responses in 
+    MRs ProtoOverHadoopRpcEngine. (Devaraj Das via sseth)
+
   OPTIMIZATIONS
   OPTIMIZATIONS
 
 
   BUG FIXES
   BUG FIXES

+ 9 - 6
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/ProtoOverHadoopRpcEngine.java

@@ -34,6 +34,8 @@ import javax.net.SocketFactory;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataOutputOutputStream;
+import org.apache.hadoop.io.ObjectWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.ipc.ProtocolMetaInfoPB;
 import org.apache.hadoop.ipc.ProtocolMetaInfoPB;
@@ -46,6 +48,7 @@ import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.ProtoUtil;
 import org.apache.hadoop.yarn.exceptions.impl.pb.YarnRemoteExceptionPBImpl;
 import org.apache.hadoop.yarn.exceptions.impl.pb.YarnRemoteExceptionPBImpl;
 import org.apache.hadoop.yarn.ipc.RpcProtos.ProtoSpecificRpcRequest;
 import org.apache.hadoop.yarn.ipc.RpcProtos.ProtoSpecificRpcRequest;
 import org.apache.hadoop.yarn.ipc.RpcProtos.ProtoSpecificRpcResponse;
 import org.apache.hadoop.yarn.ipc.RpcProtos.ProtoSpecificRpcResponse;
@@ -213,13 +216,13 @@ public class ProtoOverHadoopRpcEngine implements RpcEngine {
 
 
     @Override
     @Override
     public void write(DataOutput out) throws IOException {
     public void write(DataOutput out) throws IOException {
-      out.writeInt(message.toByteArray().length);
-      out.write(message.toByteArray());
+      ((Message)message).writeDelimitedTo(
+          DataOutputOutputStream.constructOutputStream(out));
     }
     }
 
 
     @Override
     @Override
     public void readFields(DataInput in) throws IOException {
     public void readFields(DataInput in) throws IOException {
-      int length = in.readInt();
+      int length = ProtoUtil.readRawVarint32(in);
       byte[] bytes = new byte[length];
       byte[] bytes = new byte[length];
       in.readFully(bytes);
       in.readFully(bytes);
       message = ProtoSpecificRpcRequest.parseFrom(bytes);
       message = ProtoSpecificRpcRequest.parseFrom(bytes);
@@ -241,13 +244,13 @@ public class ProtoOverHadoopRpcEngine implements RpcEngine {
 
 
     @Override
     @Override
     public void write(DataOutput out) throws IOException {
     public void write(DataOutput out) throws IOException {
-      out.writeInt(message.toByteArray().length);
-      out.write(message.toByteArray());
+      ((Message)message).writeDelimitedTo(
+          DataOutputOutputStream.constructOutputStream(out));      
     }
     }
 
 
     @Override
     @Override
     public void readFields(DataInput in) throws IOException {
     public void readFields(DataInput in) throws IOException {
-      int length = in.readInt();
+      int length = ProtoUtil.readRawVarint32(in);
       byte[] bytes = new byte[length];
       byte[] bytes = new byte[length];
       in.readFully(bytes);
       in.readFully(bytes);
       message = ProtoSpecificRpcResponse.parseFrom(bytes);
       message = ProtoSpecificRpcResponse.parseFrom(bytes);