|
@@ -114,13 +114,6 @@ public class AvroRpc {
|
|
|
public void close() throws IOException {}
|
|
|
}
|
|
|
|
|
|
- private static class Invoker extends ReflectRequestor {
|
|
|
- public Invoker(Protocol protocol, Transceiver transceiver)
|
|
|
- throws IOException {
|
|
|
- super(protocol, transceiver);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
/** Construct a client-side proxy object that implements the named protocol,
|
|
|
* talking to a server at the named address. */
|
|
|
public static Object getProxy(Class<?> protocol,
|
|
@@ -150,36 +143,14 @@ public class AvroRpc {
|
|
|
new InvocationHandler() {
|
|
|
public Object invoke(Object proxy, Method method, Object[] args)
|
|
|
throws Throwable {
|
|
|
- return new Invoker
|
|
|
- (ReflectData.get().getProtocol(protocol),
|
|
|
+ return new ReflectRequestor
|
|
|
+ (protocol,
|
|
|
new ClientTransceiver(addr, ticket, conf, factory))
|
|
|
.invoke(proxy, method, args);
|
|
|
}
|
|
|
});
|
|
|
}
|
|
|
|
|
|
- /** An Avro RPC Transceiver that provides a request passed through Hadoop RPC
|
|
|
- * to the Avro RPC Responder for processing. */
|
|
|
- private static class ServerTransceiver extends Transceiver {
|
|
|
- List<ByteBuffer> request;
|
|
|
-
|
|
|
- public ServerTransceiver(List<ByteBuffer> request) {
|
|
|
- this.request = request;
|
|
|
- }
|
|
|
-
|
|
|
- public String getRemoteName() { return "remote"; }
|
|
|
-
|
|
|
- public List<ByteBuffer> readBuffers() throws IOException {
|
|
|
- return request;
|
|
|
- }
|
|
|
-
|
|
|
- public void writeBuffers(List<ByteBuffer> buffers) throws IOException {
|
|
|
- throw new UnsupportedOperationException();
|
|
|
- }
|
|
|
-
|
|
|
- public void close() throws IOException {}
|
|
|
- }
|
|
|
-
|
|
|
/** An Avro RPC Responder that can process requests passed via Hadoop RPC. */
|
|
|
private static class TunnelResponder extends ReflectResponder
|
|
|
implements TunnelProtocol {
|
|
@@ -195,8 +166,7 @@ public class AvroRpc {
|
|
|
|
|
|
public BufferListWritable call(final BufferListWritable request)
|
|
|
throws IOException {
|
|
|
- return new BufferListWritable
|
|
|
- (respond(new ServerTransceiver(request.buffers)));
|
|
|
+ return new BufferListWritable(respond(request.buffers));
|
|
|
}
|
|
|
}
|
|
|
|