|
@@ -51,13 +51,14 @@ import com.google.protobuf.BlockingService;
|
|
|
import com.google.protobuf.Descriptors.MethodDescriptor;
|
|
|
import com.google.protobuf.Message;
|
|
|
import com.google.protobuf.ServiceException;
|
|
|
+import com.google.protobuf.TextFormat;
|
|
|
|
|
|
/**
|
|
|
* RPC Engine for for protobuf based RPCs.
|
|
|
*/
|
|
|
@InterfaceStability.Evolving
|
|
|
public class ProtobufRpcEngine implements RpcEngine {
|
|
|
- private static final Log LOG = LogFactory.getLog(ProtobufRpcEngine.class);
|
|
|
+ public static final Log LOG = LogFactory.getLog(ProtobufRpcEngine.class);
|
|
|
|
|
|
static { // Register the rpcRequest deserializer for WritableRpcEngine
|
|
|
org.apache.hadoop.ipc.Server.registerProtocolEngine(
|
|
@@ -191,16 +192,29 @@ public class ProtobufRpcEngine implements RpcEngine {
|
|
|
|
|
|
HadoopRpcRequestProto rpcRequest = constructRpcRequest(method, args);
|
|
|
RpcResponseWritable val = null;
|
|
|
+
|
|
|
+ if (LOG.isTraceEnabled()) {
|
|
|
+ LOG.trace(Thread.currentThread().getId() + ": Call -> " +
|
|
|
+ remoteId + ": " + method.getName() +
|
|
|
+ " {" + TextFormat.shortDebugString((Message) args[1]) + "}");
|
|
|
+ }
|
|
|
try {
|
|
|
val = (RpcResponseWritable) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
|
|
|
new RpcRequestWritable(rpcRequest), remoteId);
|
|
|
+
|
|
|
} catch (Throwable e) {
|
|
|
+ if (LOG.isTraceEnabled()) {
|
|
|
+ LOG.trace(Thread.currentThread().getId() + ": Exception <- " +
|
|
|
+ remoteId + ": " + method.getName() +
|
|
|
+ " {" + e + "}");
|
|
|
+ }
|
|
|
+
|
|
|
throw new ServiceException(e);
|
|
|
}
|
|
|
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
long callTime = Time.now() - startTime;
|
|
|
- LOG.debug("Call: " + method.getName() + " " + callTime);
|
|
|
+ LOG.debug("Call: " + method.getName() + " took " + callTime + "ms");
|
|
|
}
|
|
|
|
|
|
Message prototype = null;
|
|
@@ -213,6 +227,13 @@ public class ProtobufRpcEngine implements RpcEngine {
|
|
|
try {
|
|
|
returnMessage = prototype.newBuilderForType()
|
|
|
.mergeFrom(val.responseMessage).build();
|
|
|
+
|
|
|
+ if (LOG.isTraceEnabled()) {
|
|
|
+ LOG.trace(Thread.currentThread().getId() + ": Response <- " +
|
|
|
+ remoteId + ": " + method.getName() +
|
|
|
+ " {" + TextFormat.shortDebugString(returnMessage) + "}");
|
|
|
+ }
|
|
|
+
|
|
|
} catch (Throwable e) {
|
|
|
throw new ServiceException(e);
|
|
|
}
|