|
@@ -18,6 +18,7 @@
|
|
|
|
|
|
package org.apache.hadoop.ipc;
|
|
|
|
|
|
+import java.lang.reflect.InvocationHandler;
|
|
|
import java.lang.reflect.Proxy;
|
|
|
import java.lang.reflect.Method;
|
|
|
|
|
@@ -26,6 +27,7 @@ import java.net.InetSocketAddress;
|
|
|
import java.net.NoRouteToHostException;
|
|
|
import java.net.SocketTimeoutException;
|
|
|
import java.io.*;
|
|
|
+import java.io.Closeable;
|
|
|
import java.util.Map;
|
|
|
import java.util.HashMap;
|
|
|
|
|
@@ -80,12 +82,8 @@ public class RPC {
|
|
|
private RPC() {} // no public ctor
|
|
|
|
|
|
// cache of RpcEngines by protocol
|
|
|
- private static final Map<Class,RpcEngine> PROTOCOL_ENGINES
|
|
|
- = new HashMap<Class,RpcEngine>();
|
|
|
-
|
|
|
- // track what RpcEngine is used by a proxy class, for stopProxy()
|
|
|
- private static final Map<Class,RpcEngine> PROXY_ENGINES
|
|
|
- = new HashMap<Class,RpcEngine>();
|
|
|
+ private static final Map<Class<?>,RpcEngine> PROTOCOL_ENGINES
|
|
|
+ = new HashMap<Class<?>,RpcEngine>();
|
|
|
|
|
|
private static final String ENGINE_PROP = "rpc.engine";
|
|
|
|
|
@@ -96,32 +94,23 @@ public class RPC {
|
|
|
* @param engine the RpcEngine impl
|
|
|
*/
|
|
|
public static void setProtocolEngine(Configuration conf,
|
|
|
- Class protocol, Class engine) {
|
|
|
+ Class<?> protocol, Class<?> engine) {
|
|
|
conf.setClass(ENGINE_PROP+"."+protocol.getName(), engine, RpcEngine.class);
|
|
|
}
|
|
|
|
|
|
// return the RpcEngine configured to handle a protocol
|
|
|
- private static synchronized RpcEngine getProtocolEngine(Class protocol,
|
|
|
+ private static synchronized RpcEngine getProtocolEngine(Class<?> protocol,
|
|
|
Configuration conf) {
|
|
|
RpcEngine engine = PROTOCOL_ENGINES.get(protocol);
|
|
|
if (engine == null) {
|
|
|
Class<?> impl = conf.getClass(ENGINE_PROP+"."+protocol.getName(),
|
|
|
WritableRpcEngine.class);
|
|
|
engine = (RpcEngine)ReflectionUtils.newInstance(impl, conf);
|
|
|
- if (protocol.isInterface())
|
|
|
- PROXY_ENGINES.put(Proxy.getProxyClass(protocol.getClassLoader(),
|
|
|
- protocol),
|
|
|
- engine);
|
|
|
PROTOCOL_ENGINES.put(protocol, engine);
|
|
|
}
|
|
|
return engine;
|
|
|
}
|
|
|
|
|
|
- // return the RpcEngine that handles a proxy object
|
|
|
- private static synchronized RpcEngine getProxyEngine(Object proxy) {
|
|
|
- return PROXY_ENGINES.get(proxy.getClass());
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* A version mismatch for the RPC protocol.
|
|
|
*/
|
|
@@ -477,13 +466,30 @@ public class RPC {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Stop this proxy and release its invoker's resource
|
|
|
- * @param proxy the proxy to be stopped
|
|
|
+ * Stop this proxy and release its invoker's resource by getting the
|
|
|
+ * invocation handler for the given proxy object and calling
|
|
|
+ * {@link Closeable#close} if that invocation handler implements
|
|
|
+ * {@link Closeable}.
|
|
|
+ *
|
|
|
+ * @param proxy the RPC proxy object to be stopped
|
|
|
*/
|
|
|
public static void stopProxy(Object proxy) {
|
|
|
- RpcEngine rpcEngine;
|
|
|
- if (proxy!=null && (rpcEngine = getProxyEngine(proxy)) != null) {
|
|
|
- rpcEngine.stopProxy(proxy);
|
|
|
+ InvocationHandler invocationHandler = null;
|
|
|
+ try {
|
|
|
+ invocationHandler = Proxy.getInvocationHandler(proxy);
|
|
|
+ } catch (IllegalArgumentException e) {
|
|
|
+ LOG.error("Tried to call RPC.stopProxy on an object that is not a proxy.", e);
|
|
|
+ }
|
|
|
+ if (proxy != null && invocationHandler != null &&
|
|
|
+ invocationHandler instanceof Closeable) {
|
|
|
+ try {
|
|
|
+ ((Closeable)invocationHandler).close();
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.error("Stopping RPC invocation handler caused exception", e);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ LOG.error("Could not get invocation handler " + invocationHandler +
|
|
|
+ " for proxy " + proxy + ", or invocation handler is not closeable.");
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -532,7 +538,7 @@ public class RPC {
|
|
|
}
|
|
|
|
|
|
/** Construct a server for a protocol implementation instance. */
|
|
|
- public static Server getServer(Class protocol,
|
|
|
+ public static Server getServer(Class<?> protocol,
|
|
|
Object instance, String bindAddress,
|
|
|
int port, Configuration conf)
|
|
|
throws IOException {
|
|
@@ -543,7 +549,7 @@ public class RPC {
|
|
|
* @deprecated secretManager should be passed.
|
|
|
*/
|
|
|
@Deprecated
|
|
|
- public static Server getServer(Class protocol,
|
|
|
+ public static Server getServer(Class<?> protocol,
|
|
|
Object instance, String bindAddress, int port,
|
|
|
int numHandlers,
|
|
|
boolean verbose, Configuration conf)
|