|
@@ -121,25 +121,33 @@ public class RPC {
|
|
|
|
|
|
}
|
|
|
|
|
|
- //TODO mb@media-style.com: static client or non-static client?
|
|
|
private static Client CLIENT;
|
|
|
|
|
|
+ private static synchronized Client getClient(Configuration conf) {
|
|
|
+ // Construct & cache client. The configuration is only used for timeout,
|
|
|
+ // and Clients have connection pools. So we can either (a) lose some
|
|
|
+ // connection pooling and leak sockets, or (b) use the same timeout for all
|
|
|
+ // configurations. Since the IPC is usually intended globally, not
|
|
|
+ // per-job, we choose (a).
|
|
|
+ if (CLIENT == null) {
|
|
|
+ CLIENT = new Client(ObjectWritable.class, conf);
|
|
|
+ }
|
|
|
+ return CLIENT;
|
|
|
+ }
|
|
|
+
|
|
|
private static class Invoker implements InvocationHandler {
|
|
|
private InetSocketAddress address;
|
|
|
+ private Client client;
|
|
|
|
|
|
public Invoker(InetSocketAddress address, Configuration conf) {
|
|
|
this.address = address;
|
|
|
- CLIENT = (Client) conf.getObject(Client.class.getName());
|
|
|
- if(CLIENT == null) {
|
|
|
- CLIENT = new Client(ObjectWritable.class, conf);
|
|
|
- conf.setObject(Client.class.getName(), CLIENT);
|
|
|
- }
|
|
|
+ this.client = getClient(conf);
|
|
|
}
|
|
|
|
|
|
public Object invoke(Object proxy, Method method, Object[] args)
|
|
|
throws Throwable {
|
|
|
ObjectWritable value = (ObjectWritable)
|
|
|
- CLIENT.call(new Invocation(method, args), address);
|
|
|
+ client.call(new Invocation(method, args), address);
|
|
|
return value.get();
|
|
|
}
|
|
|
}
|
|
@@ -160,12 +168,7 @@ public class RPC {
|
|
|
Invocation[] invocations = new Invocation[params.length];
|
|
|
for (int i = 0; i < params.length; i++)
|
|
|
invocations[i] = new Invocation(method, params[i]);
|
|
|
- CLIENT = (Client) conf.getObject(Client.class.getName());
|
|
|
- if(CLIENT == null) {
|
|
|
- CLIENT = new Client(ObjectWritable.class, conf);
|
|
|
- conf.setObject(Client.class.getName(), CLIENT);
|
|
|
- }
|
|
|
- Writable[] wrappedValues = CLIENT.call(invocations, addrs);
|
|
|
+ Writable[] wrappedValues = getClient(conf).call(invocations, addrs);
|
|
|
|
|
|
if (method.getReturnType() == Void.TYPE) {
|
|
|
return null;
|