|
@@ -69,7 +69,7 @@ public class ProtobufRpcEngine implements RpcEngine {
|
|
|
|
|
|
static { // Register the rpcRequest deserializer for WritableRpcEngine
|
|
|
org.apache.hadoop.ipc.Server.registerProtocolEngine(
|
|
|
- RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcWritable.Buffer.class,
|
|
|
+ RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcProtobufRequest.class,
|
|
|
new Server.ProtoBufRpcInvoker());
|
|
|
}
|
|
|
|
|
@@ -613,9 +613,8 @@ public class ProtobufRpcEngine implements RpcEngine {
|
|
|
*/
|
|
|
public Writable call(RPC.Server server, String connectionProtocolName,
|
|
|
Writable writableRequest, long receiveTime) throws Exception {
|
|
|
- RpcWritable.Buffer request = (RpcWritable.Buffer) writableRequest;
|
|
|
- RequestHeaderProto rpcRequest =
|
|
|
- request.getValue(RequestHeaderProto.getDefaultInstance());
|
|
|
+ RpcProtobufRequest request = (RpcProtobufRequest) writableRequest;
|
|
|
+ RequestHeaderProto rpcRequest = request.getRequestHeader();
|
|
|
String methodName = rpcRequest.getMethodName();
|
|
|
|
|
|
/**
|
|
@@ -687,4 +686,33 @@ public class ProtobufRpcEngine implements RpcEngine {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ // htrace in the ipc layer creates the span name based on toString()
|
|
|
+ // which uses the rpc header. in the normal case we want to defer decoding
|
|
|
+ // the rpc header until needed by the rpc engine.
|
|
|
+ static class RpcProtobufRequest extends RpcWritable.Buffer {
|
|
|
+ private RequestHeaderProto lazyHeader;
|
|
|
+
|
|
|
+ public RpcProtobufRequest() {
|
|
|
+ }
|
|
|
+
|
|
|
+ synchronized RequestHeaderProto getRequestHeader() throws IOException {
|
|
|
+ if (lazyHeader == null) {
|
|
|
+ lazyHeader = getValue(RequestHeaderProto.getDefaultInstance());
|
|
|
+ }
|
|
|
+ return lazyHeader;
|
|
|
+ }
|
|
|
+
|
|
|
+ // this is used by htrace to name the span.
|
|
|
+ @Override
|
|
|
+ public String toString() {
|
|
|
+ try {
|
|
|
+ RequestHeaderProto header = getRequestHeader();
|
|
|
+ return header.getDeclaringClassProtocolName() + "." +
|
|
|
+ header.getMethodName();
|
|
|
+ } catch (IOException e) {
|
|
|
+ throw new IllegalArgumentException(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|