|
@@ -62,7 +62,7 @@ public class ProtobufRpcEngine implements RpcEngine {
|
|
|
|
|
|
static { // Register the rpcRequest deserializer for WritableRpcEngine
|
|
|
org.apache.hadoop.ipc.Server.registerProtocolEngine(
|
|
|
- RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcRequestWritable.class,
|
|
|
+ RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcRequestWrapper.class,
|
|
|
new Server.ProtoBufRpcInvoker());
|
|
|
}
|
|
|
|
|
@@ -122,7 +122,7 @@ public class ProtobufRpcEngine implements RpcEngine {
|
|
|
public Invoker(Class<?> protocol, Client.ConnectionId connId,
|
|
|
Configuration conf, SocketFactory factory) {
|
|
|
this.remoteId = connId;
|
|
|
- this.client = CLIENTS.getClient(conf, factory, RpcResponseWritable.class);
|
|
|
+ this.client = CLIENTS.getClient(conf, factory, RpcResponseWrapper.class);
|
|
|
this.protocolName = RPC.getProtocolName(protocol);
|
|
|
this.clientProtocolVersion = RPC
|
|
|
.getProtocolVersion(protocol);
|
|
@@ -191,7 +191,7 @@ public class ProtobufRpcEngine implements RpcEngine {
|
|
|
}
|
|
|
|
|
|
RequestProto rpcRequest = constructRpcRequest(method, args);
|
|
|
- RpcResponseWritable val = null;
|
|
|
+ RpcResponseWrapper val = null;
|
|
|
|
|
|
if (LOG.isTraceEnabled()) {
|
|
|
LOG.trace(Thread.currentThread().getId() + ": Call -> " +
|
|
@@ -199,8 +199,8 @@ public class ProtobufRpcEngine implements RpcEngine {
|
|
|
" {" + TextFormat.shortDebugString((Message) args[1]) + "}");
|
|
|
}
|
|
|
try {
|
|
|
- val = (RpcResponseWritable) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
|
|
|
- new RpcRequestWritable(rpcRequest), remoteId);
|
|
|
+ val = (RpcResponseWrapper) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
|
|
|
+ new RpcRequestWrapper(rpcRequest), remoteId);
|
|
|
|
|
|
} catch (Throwable e) {
|
|
|
if (LOG.isTraceEnabled()) {
|
|
@@ -268,16 +268,20 @@ public class ProtobufRpcEngine implements RpcEngine {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Writable Wrapper for Protocol Buffer Requests
|
|
|
+ * Wrapper for Protocol Buffer Requests
|
|
|
+ *
|
|
|
+ * Note while this wrapper is writable, the request on the wire is in
|
|
|
+ * Protobuf. Several methods on {@link org.apache.hadoop.ipc.Server and RPC}
|
|
|
+ * use type Writable as a wrapper to work across multiple RpcEngine kinds.
|
|
|
*/
|
|
|
- private static class RpcRequestWritable implements Writable {
|
|
|
+ private static class RpcRequestWrapper implements Writable {
|
|
|
RequestProto message;
|
|
|
|
|
|
@SuppressWarnings("unused")
|
|
|
- public RpcRequestWritable() {
|
|
|
+ public RpcRequestWrapper() {
|
|
|
}
|
|
|
|
|
|
- RpcRequestWritable(RequestProto message) {
|
|
|
+ RpcRequestWrapper(RequestProto message) {
|
|
|
this.message = message;
|
|
|
}
|
|
|
|
|
@@ -303,16 +307,20 @@ public class ProtobufRpcEngine implements RpcEngine {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Writable Wrapper for Protocol Buffer Responses
|
|
|
+ * Wrapper for Protocol Buffer Responses
|
|
|
+ *
|
|
|
+ * Note while this wrapper is writable, the request on the wire is in
|
|
|
+ * Protobuf. Several methods on {@link org.apache.hadoop.ipc.Server and RPC}
|
|
|
+ * use type Writable as a wrapper to work across multiple RpcEngine kinds.
|
|
|
*/
|
|
|
- private static class RpcResponseWritable implements Writable {
|
|
|
+ private static class RpcResponseWrapper implements Writable {
|
|
|
byte[] responseMessage;
|
|
|
|
|
|
@SuppressWarnings("unused")
|
|
|
- public RpcResponseWritable() {
|
|
|
+ public RpcResponseWrapper() {
|
|
|
}
|
|
|
|
|
|
- public RpcResponseWritable(Message message) {
|
|
|
+ public RpcResponseWrapper(Message message) {
|
|
|
this.responseMessage = message.toByteArray();
|
|
|
}
|
|
|
|
|
@@ -336,7 +344,7 @@ public class ProtobufRpcEngine implements RpcEngine {
|
|
|
@InterfaceStability.Unstable
|
|
|
static Client getClient(Configuration conf) {
|
|
|
return CLIENTS.getClient(conf, SocketFactory.getDefault(),
|
|
|
- RpcResponseWritable.class);
|
|
|
+ RpcResponseWrapper.class);
|
|
|
}
|
|
|
|
|
|
|
|
@@ -425,7 +433,7 @@ public class ProtobufRpcEngine implements RpcEngine {
|
|
|
*/
|
|
|
public Writable call(RPC.Server server, String connectionProtocolName,
|
|
|
Writable writableRequest, long receiveTime) throws Exception {
|
|
|
- RpcRequestWritable request = (RpcRequestWritable) writableRequest;
|
|
|
+ RpcRequestWrapper request = (RpcRequestWrapper) writableRequest;
|
|
|
RequestProto rpcRequest = request.message;
|
|
|
String methodName = rpcRequest.getMethodName();
|
|
|
|
|
@@ -487,7 +495,7 @@ public class ProtobufRpcEngine implements RpcEngine {
|
|
|
} catch (Exception e) {
|
|
|
throw e;
|
|
|
}
|
|
|
- return new RpcResponseWritable(result);
|
|
|
+ return new RpcResponseWrapper(result);
|
|
|
}
|
|
|
}
|
|
|
}
|