|
@@ -36,10 +36,12 @@ import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.io.Writable;
|
|
|
import org.apache.hadoop.ipc.Client;
|
|
|
+import org.apache.hadoop.ipc.ProtocolMetaInfoPB;
|
|
|
import org.apache.hadoop.ipc.ProtocolProxy;
|
|
|
import org.apache.hadoop.ipc.RPC;
|
|
|
import org.apache.hadoop.ipc.RpcEngine;
|
|
|
import org.apache.hadoop.ipc.ClientCache;
|
|
|
+import org.apache.hadoop.ipc.Client.ConnectionId;
|
|
|
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
import org.apache.hadoop.security.token.SecretManager;
|
|
@@ -73,6 +75,17 @@ public class ProtoOverHadoopRpcEngine implements RpcEngine {
|
|
|
addr, ticket, conf, factory, rpcTimeout)), false);
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ public ProtocolProxy<ProtocolMetaInfoPB> getProtocolMetaInfoProxy(
|
|
|
+ ConnectionId connId, Configuration conf, SocketFactory factory)
|
|
|
+ throws IOException {
|
|
|
+ Class<ProtocolMetaInfoPB> protocol = ProtocolMetaInfoPB.class;
|
|
|
+ return new ProtocolProxy<ProtocolMetaInfoPB>(protocol,
|
|
|
+ (ProtocolMetaInfoPB) Proxy.newProxyInstance(protocol.getClassLoader(),
|
|
|
+ new Class[] { protocol }, new Invoker(protocol, connId, conf,
|
|
|
+ factory)), false);
|
|
|
+ }
|
|
|
+
|
|
|
private static class Invoker implements InvocationHandler, Closeable {
|
|
|
private Map<String, Message> returnTypes = new ConcurrentHashMap<String, Message>();
|
|
|
private boolean isClosed = false;
|
|
@@ -82,8 +95,13 @@ public class ProtoOverHadoopRpcEngine implements RpcEngine {
|
|
|
public Invoker(Class<?> protocol, InetSocketAddress addr,
|
|
|
UserGroupInformation ticket, Configuration conf, SocketFactory factory,
|
|
|
int rpcTimeout) throws IOException {
|
|
|
- this.remoteId = Client.ConnectionId.getConnectionId(addr, protocol,
|
|
|
- ticket, rpcTimeout, conf);
|
|
|
+ this(protocol, Client.ConnectionId.getConnectionId(addr, protocol,
|
|
|
+ ticket, rpcTimeout, conf), conf, factory);
|
|
|
+ }
|
|
|
+
|
|
|
+ public Invoker(Class<?> protocol, Client.ConnectionId connId,
|
|
|
+ Configuration conf, SocketFactory factory) {
|
|
|
+ this.remoteId = connId;
|
|
|
this.client = CLIENTS.getClient(conf, factory,
|
|
|
ProtoSpecificResponseWritable.class);
|
|
|
}
|