|
@@ -133,63 +133,76 @@ public class RPC {
|
|
|
|
|
|
}
|
|
|
|
|
|
- private static Map<SocketFactory, Client> CLIENTS =
|
|
|
+ /* Cache a client using its socket factory as the hash key */
|
|
|
+ static private class ClientCache {
|
|
|
+ private Map<SocketFactory, Client> clients =
|
|
|
new HashMap<SocketFactory, Client>();
|
|
|
|
|
|
- private static synchronized Client getClient(Configuration conf,
|
|
|
- SocketFactory factory) {
|
|
|
- // Construct & cache client. The configuration is only used for timeout,
|
|
|
- // and Clients have connection pools. So we can either (a) lose some
|
|
|
- // connection pooling and leak sockets, or (b) use the same timeout for all
|
|
|
- // configurations. Since the IPC is usually intended globally, not
|
|
|
- // per-job, we choose (a).
|
|
|
- Client client = CLIENTS.get(factory);
|
|
|
- if (client == null) {
|
|
|
- client = new Client(ObjectWritable.class, conf, factory);
|
|
|
- CLIENTS.put(factory, client);
|
|
|
+ /**
|
|
|
+ * Construct & cache an IPC client with the user-provided SocketFactory
|
|
|
+ * if no cached client exists.
|
|
|
+ *
|
|
|
+ * @param conf Configuration
|
|
|
+ * @return an IPC client
|
|
|
+ */
|
|
|
+ private synchronized Client getClient(Configuration conf,
|
|
|
+ SocketFactory factory) {
|
|
|
+ // Construct & cache client. The configuration is only used for timeout,
|
|
|
+ // and Clients have connection pools. So we can either (a) lose some
|
|
|
+ // connection pooling and leak sockets, or (b) use the same timeout for all
|
|
|
+ // configurations. Since the IPC is usually intended globally, not
|
|
|
+ // per-job, we choose (a).
|
|
|
+ Client client = clients.get(factory);
|
|
|
+ if (client == null) {
|
|
|
+ client = new Client(ObjectWritable.class, conf, factory);
|
|
|
+ clients.put(factory, client);
|
|
|
+ } else {
|
|
|
+ client.incCount();
|
|
|
+ }
|
|
|
+ return client;
|
|
|
}
|
|
|
- return client;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Construct & cache client with the default SocketFactory.
|
|
|
- * @param conf
|
|
|
- * @return
|
|
|
- */
|
|
|
- private static Client getClient(Configuration conf) {
|
|
|
- return getClient(conf, SocketFactory.getDefault());
|
|
|
- }
|
|
|
|
|
|
- /**
|
|
|
- * Stop all RPC client connections
|
|
|
- */
|
|
|
- public static synchronized void stopClient(){
|
|
|
- for (Client client : CLIENTS.values())
|
|
|
- client.stop();
|
|
|
- CLIENTS.clear();
|
|
|
- }
|
|
|
-
|
|
|
- /*
|
|
|
- * remove specified client from the list of clients.
|
|
|
- */
|
|
|
- static synchronized void removeClients() {
|
|
|
- CLIENTS.clear();
|
|
|
- }
|
|
|
+ /**
|
|
|
+ * Construct & cache an IPC client with the default SocketFactory
|
|
|
+ * if no cached client exists.
|
|
|
+ *
|
|
|
+ * @param conf Configuration
|
|
|
+ * @return an IPC client
|
|
|
+ */
|
|
|
+ private synchronized Client getClient(Configuration conf) {
|
|
|
+ return getClient(conf, SocketFactory.getDefault());
|
|
|
+ }
|
|
|
|
|
|
- static synchronized Collection allClients() {
|
|
|
- return CLIENTS.values();
|
|
|
+ /**
|
|
|
+ * Stop a RPC client connection
|
|
|
+ * A RPC client is closed only when its reference count becomes zero.
|
|
|
+ */
|
|
|
+ private void stopClient(Client client) {
|
|
|
+ synchronized (this) {
|
|
|
+ client.decCount();
|
|
|
+ if (client.isZeroReference()) {
|
|
|
+ clients.remove(client.getSocketFactory());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (client.isZeroReference()) {
|
|
|
+ client.stop();
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
+ private static ClientCache CLIENTS=new ClientCache();
|
|
|
+
|
|
|
private static class Invoker implements InvocationHandler {
|
|
|
private InetSocketAddress address;
|
|
|
private UserGroupInformation ticket;
|
|
|
private Client client;
|
|
|
+ private boolean isClosed = false;
|
|
|
|
|
|
public Invoker(InetSocketAddress address, UserGroupInformation ticket,
|
|
|
Configuration conf, SocketFactory factory) {
|
|
|
this.address = address;
|
|
|
this.ticket = ticket;
|
|
|
- this.client = getClient(conf, factory);
|
|
|
+ this.client = CLIENTS.getClient(conf, factory);
|
|
|
}
|
|
|
|
|
|
public Object invoke(Object proxy, Method method, Object[] args)
|
|
@@ -201,6 +214,14 @@ public class RPC {
|
|
|
LOG.debug("Call: " + method.getName() + " " + callTime);
|
|
|
return value.get();
|
|
|
}
|
|
|
+
|
|
|
+ /* close the IPC client that's responsible for this invoker's RPCs */
|
|
|
+ synchronized private void close() {
|
|
|
+ if (!isClosed) {
|
|
|
+ isClosed = true;
|
|
|
+ CLIENTS.stopClient(client);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -236,7 +257,7 @@ public class RPC {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Get the client's prefered version
|
|
|
+ * Get the client's preferred version
|
|
|
*/
|
|
|
public long getClientVersion() {
|
|
|
return clientVersion;
|
|
@@ -316,6 +337,16 @@ public class RPC {
|
|
|
.getDefaultSocketFactory(conf));
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Stop this proxy and release its invoker's resource
|
|
|
+ * @param proxy the proxy to be stopped
|
|
|
+ */
|
|
|
+ public static void stopProxy(VersionedProtocol proxy) {
|
|
|
+ if (proxy!=null) {
|
|
|
+ ((Invoker)Proxy.getInvocationHandler(proxy)).close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/** Expert: Make multiple, parallel calls to a set of servers. */
|
|
|
public static Object[] call(Method method, Object[][] params,
|
|
|
InetSocketAddress[] addrs, Configuration conf)
|
|
@@ -324,7 +355,9 @@ public class RPC {
|
|
|
Invocation[] invocations = new Invocation[params.length];
|
|
|
for (int i = 0; i < params.length; i++)
|
|
|
invocations[i] = new Invocation(method, params[i]);
|
|
|
- Writable[] wrappedValues = getClient(conf).call(invocations, addrs);
|
|
|
+ Client client = CLIENTS.getClient(conf);
|
|
|
+ try {
|
|
|
+ Writable[] wrappedValues = client.call(invocations, addrs);
|
|
|
|
|
|
if (method.getReturnType() == Void.TYPE) {
|
|
|
return null;
|
|
@@ -337,6 +370,9 @@ public class RPC {
|
|
|
values[i] = ((ObjectWritable)wrappedValues[i]).get();
|
|
|
|
|
|
return values;
|
|
|
+ } finally {
|
|
|
+ CLIENTS.stopClient(client);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/** Construct a server for a protocol implementation instance listening on a
|