Bladeren bron

HADOOP-9380 Add totalLength to rpc response (sanjay Radia)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1459392 13f79535-47bb-0310-9956-ffa450edef68
Sanjay Radia 12 jaren geleden
bovenliggende
commit
3574720017

+ 2 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -15,6 +15,8 @@ Trunk (Unreleased)
     HADOOP-9151 Include RPC error info in RpcResponseHeader instead of sending
     HADOOP-9151 Include RPC error info in RpcResponseHeader instead of sending
     it separately (sanjay Radia)
     it separately (sanjay Radia)
 
 
+    HADOOP-9380 Add totalLength to rpc response  (sanjay Radia)
+
   NEW FEATURES
   NEW FEATURES
     
     
     HADOOP-8561. Introduce HADOOP_PROXY_USER for secure impersonation in child
     HADOOP-8561. Introduce HADOOP_PROXY_USER for secure impersonation in child

+ 25 - 4
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java

@@ -83,6 +83,7 @@ import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.Time;
 
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.CodedOutputStream;
 
 
 /** A client for an IPC service.  IPC calls take a single {@link Writable} as a
 /** A client for an IPC service.  IPC calls take a single {@link Writable} as a
  * parameter, and return a {@link Writable} as their value.  A service runs on
  * parameter, and return a {@link Writable} as their value.  A service runs on
@@ -242,7 +243,7 @@ public class Client {
       callComplete();
       callComplete();
     }
     }
     
     
-    public synchronized Writable getRpcResult() {
+    public synchronized Writable getRpcResponse() {
       return rpcResponse;
       return rpcResponse;
     }
     }
   }
   }
@@ -944,11 +945,14 @@ public class Client {
       touch();
       touch();
       
       
       try {
       try {
+        int totalLen = in.readInt();
         RpcResponseHeaderProto header = 
         RpcResponseHeaderProto header = 
             RpcResponseHeaderProto.parseDelimitedFrom(in);
             RpcResponseHeaderProto.parseDelimitedFrom(in);
         if (header == null) {
         if (header == null) {
           throw new IOException("Response is null.");
           throw new IOException("Response is null.");
         }
         }
+        int headerLen = header.getSerializedSize();
+        headerLen += CodedOutputStream.computeRawVarint32Size(headerLen);
 
 
         int callId = header.getCallId();
         int callId = header.getCallId();
         if (LOG.isDebugEnabled())
         if (LOG.isDebugEnabled())
@@ -961,11 +965,28 @@ public class Client {
           value.readFields(in);                 // read value
           value.readFields(in);                 // read value
           call.setRpcResponse(value);
           call.setRpcResponse(value);
           calls.remove(callId);
           calls.remove(callId);
+          
+          // verify that length was correct
+          // only for ProtobufEngine where len can be verified easily
+          if (call.getRpcResponse() instanceof ProtobufRpcEngine.RpcWrapper) {
+            ProtobufRpcEngine.RpcWrapper resWrapper = 
+                (ProtobufRpcEngine.RpcWrapper) call.getRpcResponse();
+            if (totalLen != headerLen + resWrapper.getLength()) { 
+              throw new RpcClientException(
+                  "RPC response length mismatch on rpc success");
+            }
+          }
         } else { // Rpc Request failed
         } else { // Rpc Request failed
-            final String exceptionClassName = header.hasExceptionClassName() ?
+          // Verify that length was correct
+          if (totalLen != headerLen) {
+            throw new RpcClientException(
+                "RPC response length mismatch on rpc error");
+          }
+          
+          final String exceptionClassName = header.hasExceptionClassName() ?
                 header.getExceptionClassName() : 
                 header.getExceptionClassName() : 
                   "ServerDidNotSetExceptionClassName";
                   "ServerDidNotSetExceptionClassName";
-            final String errorMsg = header.hasErrorMsg() ? 
+          final String errorMsg = header.hasErrorMsg() ? 
                 header.getErrorMsg() : "ServerDidNotSetErrorMsg" ;
                 header.getErrorMsg() : "ServerDidNotSetErrorMsg" ;
           RemoteException re = 
           RemoteException re = 
               new RemoteException(exceptionClassName, errorMsg);
               new RemoteException(exceptionClassName, errorMsg);
@@ -1251,7 +1272,7 @@ public class Client {
                   call.error);
                   call.error);
         }
         }
       } else {
       } else {
-        return call.getRpcResult();
+        return call.getRpcResponse();
       }
       }
     }
     }
   }
   }

+ 46 - 11
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java

@@ -48,7 +48,9 @@ import org.apache.hadoop.util.ProtoUtil;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.Time;
 
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.AbstractMessageLite;
 import com.google.protobuf.BlockingService;
 import com.google.protobuf.BlockingService;
+import com.google.protobuf.CodedOutputStream;
 import com.google.protobuf.Descriptors.MethodDescriptor;
 import com.google.protobuf.Descriptors.MethodDescriptor;
 import com.google.protobuf.Message;
 import com.google.protobuf.Message;
 import com.google.protobuf.ServiceException;
 import com.google.protobuf.ServiceException;
@@ -226,7 +228,7 @@ public class ProtobufRpcEngine implements RpcEngine {
       Message returnMessage;
       Message returnMessage;
       try {
       try {
         returnMessage = prototype.newBuilderForType()
         returnMessage = prototype.newBuilderForType()
-            .mergeFrom(val.responseMessage).build();
+            .mergeFrom(val.theResponseRead).build();
 
 
         if (LOG.isTraceEnabled()) {
         if (LOG.isTraceEnabled()) {
           LOG.trace(Thread.currentThread().getId() + ": Response <- " +
           LOG.trace(Thread.currentThread().getId() + ": Response <- " +
@@ -267,6 +269,9 @@ public class ProtobufRpcEngine implements RpcEngine {
     }
     }
   }
   }
 
 
+  interface RpcWrapper extends Writable {
+    int getLength();
+  }
   /**
   /**
    * Wrapper for Protocol Buffer Requests
    * Wrapper for Protocol Buffer Requests
    * 
    * 
@@ -274,7 +279,7 @@ public class ProtobufRpcEngine implements RpcEngine {
    * Protobuf. Several methods on {@link org.apache.hadoop.ipc.Server and RPC} 
    * Protobuf. Several methods on {@link org.apache.hadoop.ipc.Server and RPC} 
    * use type Writable as a wrapper to work across multiple RpcEngine kinds.
    * use type Writable as a wrapper to work across multiple RpcEngine kinds.
    */
    */
-  private static class RpcRequestWrapper implements Writable {
+  private static class RpcRequestWrapper implements RpcWrapper {
     RequestHeaderProto requestHeader;
     RequestHeaderProto requestHeader;
     Message theRequest; // for clientSide, the request is here
     Message theRequest; // for clientSide, the request is here
     byte[] theRequestRead; // for server side, the request is here
     byte[] theRequestRead; // for server side, the request is here
@@ -312,6 +317,22 @@ public class ProtobufRpcEngine implements RpcEngine {
       return requestHeader.getDeclaringClassProtocolName() + "." +
       return requestHeader.getDeclaringClassProtocolName() + "." +
           requestHeader.getMethodName();
           requestHeader.getMethodName();
     }
     }
+
+    @Override
+    public int getLength() {
+      int headerLen = requestHeader.getSerializedSize();
+      int reqLen;
+      if (theRequest != null) {
+        reqLen = theRequest.getSerializedSize();
+      } else if (theRequestRead != null ) {
+        reqLen = theRequestRead.length;
+      } else {
+        throw new IllegalArgumentException(
+            "getLenght on uninilialized RpcWrapper");      
+      }
+      return CodedOutputStream.computeRawVarint32Size(headerLen) +  headerLen
+          + CodedOutputStream.computeRawVarint32Size(reqLen) + reqLen;
+    }
   }
   }
 
 
   /**
   /**
@@ -321,29 +342,43 @@ public class ProtobufRpcEngine implements RpcEngine {
    * Protobuf. Several methods on {@link org.apache.hadoop.ipc.Server and RPC} 
    * Protobuf. Several methods on {@link org.apache.hadoop.ipc.Server and RPC} 
    * use type Writable as a wrapper to work across multiple RpcEngine kinds.
    * use type Writable as a wrapper to work across multiple RpcEngine kinds.
    */
    */
-  private static class RpcResponseWrapper implements Writable {
-    byte[] responseMessage;
+  private static class RpcResponseWrapper implements RpcWrapper {
+    Message theResponse; // for senderSide, the response is here
+    byte[] theResponseRead; // for receiver side, the response is here
 
 
     @SuppressWarnings("unused")
     @SuppressWarnings("unused")
     public RpcResponseWrapper() {
     public RpcResponseWrapper() {
     }
     }
 
 
     public RpcResponseWrapper(Message message) {
     public RpcResponseWrapper(Message message) {
-      this.responseMessage = message.toByteArray();
+      this.theResponse = message;
     }
     }
 
 
     @Override
     @Override
     public void write(DataOutput out) throws IOException {
     public void write(DataOutput out) throws IOException {
-      out.writeInt(responseMessage.length);
-      out.write(responseMessage);     
+      OutputStream os = DataOutputOutputStream.constructOutputStream(out);
+      theResponse.writeDelimitedTo(os);   
     }
     }
 
 
     @Override
     @Override
     public void readFields(DataInput in) throws IOException {
     public void readFields(DataInput in) throws IOException {
-      int length = in.readInt();
-      byte[] bytes = new byte[length];
-      in.readFully(bytes);
-      responseMessage = bytes;
+      int length = ProtoUtil.readRawVarint32(in);
+      theResponseRead = new byte[length];
+      in.readFully(theResponseRead);
+    }
+    
+    @Override
+    public int getLength() {
+      int resLen;
+      if (theResponse != null) {
+        resLen = theResponse.getSerializedSize();
+      } else if (theResponseRead != null ) {
+        resLen = theResponseRead.length;
+      } else {
+        throw new IllegalArgumentException(
+            "getLenght on uninilialized RpcWrapper");      
+      }
+      return CodedOutputStream.computeRawVarint32Size(resLen) + resLen;
     }
     }
   }
   }
 
 

+ 44 - 14
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

@@ -73,6 +73,7 @@ import org.apache.hadoop.conf.Configuration.IntegerRanges;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.io.WritableUtils;
@@ -107,6 +108,7 @@ import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.Time;
 
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.CodedOutputStream;
 
 
 /** An abstract IPC service.  IPC calls take a single {@link Writable} as a
 /** An abstract IPC service.  IPC calls take a single {@link Writable} as a
  * parameter, and return a {@link Writable} as their value.  A service runs on
  * parameter, and return a {@link Writable} as their value.  A service runs on
@@ -202,7 +204,8 @@ public abstract class Server {
   // 6 : Made RPC Request header explicit
   // 6 : Made RPC Request header explicit
   // 7 : Changed Ipc Connection Header to use Protocol buffers
   // 7 : Changed Ipc Connection Header to use Protocol buffers
   // 8 : SASL server always sends a final response
   // 8 : SASL server always sends a final response
-  public static final byte CURRENT_VERSION = 8;
+  // 9 : Changes to protocol for HADOOP-8990
+  public static final byte CURRENT_VERSION = 9;
 
 
   /**
   /**
    * Initial and max size of response buffer
    * Initial and max size of response buffer
@@ -1512,10 +1515,15 @@ public abstract class Server {
       " cannot communicate with client version " + clientVersion;
       " cannot communicate with client version " + clientVersion;
       ByteArrayOutputStream buffer = new ByteArrayOutputStream();
       ByteArrayOutputStream buffer = new ByteArrayOutputStream();
       
       
-      if (clientVersion >= 3) {
+      if (clientVersion >= 9) {
+        // Versions >>9  understand the normal response
         Call fakeCall =  new Call(-1, null, this);
         Call fakeCall =  new Call(-1, null, this);
-        // Versions 3 and greater can interpret this exception
-        // response in the same manner
+        setupResponse(buffer, fakeCall, RpcStatusProto.FATAL,
+            null, VersionMismatch.class.getName(), errMsg);
+        responder.doRespond(fakeCall);
+      } else if (clientVersion >= 3) {
+        Call fakeCall =  new Call(-1, null, this);
+        // Versions 3 to 8 use older response
         setupResponseOldVersionFatal(buffer, fakeCall,
         setupResponseOldVersionFatal(buffer, fakeCall,
             null, VersionMismatch.class.getName(), errMsg);
             null, VersionMismatch.class.getName(), errMsg);
 
 
@@ -1997,17 +2005,34 @@ public abstract class Server {
   throws IOException {
   throws IOException {
     responseBuf.reset();
     responseBuf.reset();
     DataOutputStream out = new DataOutputStream(responseBuf);
     DataOutputStream out = new DataOutputStream(responseBuf);
-    RpcResponseHeaderProto.Builder response =  
+    RpcResponseHeaderProto.Builder headerBuilder =  
         RpcResponseHeaderProto.newBuilder();
         RpcResponseHeaderProto.newBuilder();
-    response.setCallId(call.callId);
-    response.setStatus(status);
-    response.setServerIpcVersionNum(Server.CURRENT_VERSION);
-
+    headerBuilder.setCallId(call.callId);
+    headerBuilder.setStatus(status);
+    headerBuilder.setServerIpcVersionNum(Server.CURRENT_VERSION);
 
 
     if (status == RpcStatusProto.SUCCESS) {
     if (status == RpcStatusProto.SUCCESS) {
+      RpcResponseHeaderProto header = headerBuilder.build();
+      final int headerLen = header.getSerializedSize();
+      int fullLength  = CodedOutputStream.computeRawVarint32Size(headerLen) +
+          headerLen;
       try {
       try {
-        response.build().writeDelimitedTo(out);
-        rv.write(out);
+        if (rv instanceof ProtobufRpcEngine.RpcWrapper) {
+          ProtobufRpcEngine.RpcWrapper resWrapper = 
+              (ProtobufRpcEngine.RpcWrapper) rv;
+          fullLength += resWrapper.getLength();
+          out.writeInt(fullLength);
+          header.writeDelimitedTo(out);
+          rv.write(out);
+        } else { // Have to serialize to buffer to get len
+          final DataOutputBuffer buf = new DataOutputBuffer();
+          rv.write(buf);
+          byte[] data = buf.getData();
+          fullLength += buf.getLength();
+          out.writeInt(fullLength);
+          header.writeDelimitedTo(out);
+          out.write(data, 0, buf.getLength());
+        }
       } catch (Throwable t) {
       } catch (Throwable t) {
         LOG.warn("Error serializing call response for call " + call, t);
         LOG.warn("Error serializing call response for call " + call, t);
         // Call back to same function - this is OK since the
         // Call back to same function - this is OK since the
@@ -2019,9 +2044,14 @@ public abstract class Server {
         return;
         return;
       }
       }
     } else { // Rpc Failure
     } else { // Rpc Failure
-      response.setExceptionClassName(errorClass);
-      response.setErrorMsg(error);
-      response.build().writeDelimitedTo(out);
+      headerBuilder.setExceptionClassName(errorClass);
+      headerBuilder.setErrorMsg(error);
+      RpcResponseHeaderProto header = headerBuilder.build();
+      int headerLen = header.getSerializedSize();
+      final int fullLength  = 
+          CodedOutputStream.computeRawVarint32Size(headerLen) + headerLen;
+      out.writeInt(fullLength);
+      header.writeDelimitedTo(out);
     }
     }
     if (call.connection.useWrap) {
     if (call.connection.useWrap) {
       wrapWithSasl(responseBuf, call);
       wrapWithSasl(responseBuf, call);