|
@@ -27,6 +27,9 @@ import java.lang.reflect.InvocationTargetException;
|
|
|
|
|
|
import java.net.InetSocketAddress;
|
|
|
import java.io.*;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.Arrays;
|
|
|
+import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.HashMap;
|
|
|
|
|
@@ -35,6 +38,7 @@ import javax.net.SocketFactory;
|
|
|
import org.apache.commons.logging.*;
|
|
|
|
|
|
import org.apache.hadoop.io.*;
|
|
|
+import org.apache.hadoop.ipc.VersionedProtocol;
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
import org.apache.hadoop.security.token.SecretManager;
|
|
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
|
@@ -47,10 +51,46 @@ import org.apache.hadoop.conf.*;
|
|
|
public class WritableRpcEngine implements RpcEngine {
|
|
|
private static final Log LOG = LogFactory.getLog(RPC.class);
|
|
|
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get all superInterfaces that extend VersionedProtocol
|
|
|
+ * @param childInterfaces
|
|
|
+ * @return the super interfaces that extend VersionedProtocol
|
|
|
+ */
|
|
|
+ private static Class<?>[] getSuperInterfaces(Class<?>[] childInterfaces) {
|
|
|
+ List<Class<?>> allInterfaces = new ArrayList<Class<?>>();
|
|
|
+
|
|
|
+ for (Class<?> childInterface : childInterfaces) {
|
|
|
+ if (VersionedProtocol.class.isAssignableFrom(childInterface)) {
|
|
|
+ allInterfaces.add(childInterface);
|
|
|
+ allInterfaces.addAll(
|
|
|
+ Arrays.asList(
|
|
|
+ getSuperInterfaces(childInterface.getInterfaces())));
|
|
|
+ } else {
|
|
|
+ LOG.warn("Interface " + childInterface +
|
|
|
+ " ignored because it does not extend VersionedProtocol");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return (Class<?>[]) allInterfaces.toArray(new Class[allInterfaces.size()]);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get all interfaces that the given protocol implements or extends
|
|
|
+ * which are assignable from VersionedProtocol.
|
|
|
+ */
|
|
|
+ private static Class<?>[] getProtocolInterfaces(Class<?> protocol) {
|
|
|
+ Class<?>[] interfaces = protocol.getInterfaces();
|
|
|
+ return getSuperInterfaces(interfaces);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
//writableRpcVersion should be updated if there is a change
|
|
|
//in format of the rpc messages.
|
|
|
- public static long writableRpcVersion = 1L;
|
|
|
+
|
|
|
+ // 2L - added declared class to Invocation
|
|
|
+ public static final long writableRpcVersion = 2L;
|
|
|
|
|
|
+
|
|
|
/** A method invocation, including the method name and its parameters.*/
|
|
|
private static class Invocation implements Writable, Configurable {
|
|
|
private String methodName;
|
|
@@ -59,11 +99,13 @@ public class WritableRpcEngine implements RpcEngine {
|
|
|
private Configuration conf;
|
|
|
private long clientVersion;
|
|
|
private int clientMethodsHash;
|
|
|
+ private String declaringClassProtocolName;
|
|
|
|
|
|
//This could be different from static writableRpcVersion when received
|
|
|
//at server, if client is using a different version.
|
|
|
private long rpcVersion;
|
|
|
|
|
|
+ @SuppressWarnings("unused") // called when deserializing an invocation
|
|
|
public Invocation() {}
|
|
|
|
|
|
public Invocation(Method method, Object[] parameters) {
|
|
@@ -88,6 +130,8 @@ public class WritableRpcEngine implements RpcEngine {
|
|
|
this.clientMethodsHash = ProtocolSignature.getFingerprint(method
|
|
|
.getDeclaringClass().getMethods());
|
|
|
}
|
|
|
+ this.declaringClassProtocolName =
|
|
|
+ RPC.getProtocolName(method.getDeclaringClass());
|
|
|
}
|
|
|
|
|
|
/** The name of the method invoked. */
|
|
@@ -103,6 +147,7 @@ public class WritableRpcEngine implements RpcEngine {
|
|
|
return clientVersion;
|
|
|
}
|
|
|
|
|
|
+ @SuppressWarnings("unused")
|
|
|
private int getClientMethodsHash() {
|
|
|
return clientMethodsHash;
|
|
|
}
|
|
@@ -115,8 +160,10 @@ public class WritableRpcEngine implements RpcEngine {
|
|
|
return rpcVersion;
|
|
|
}
|
|
|
|
|
|
+ @SuppressWarnings("deprecation")
|
|
|
public void readFields(DataInput in) throws IOException {
|
|
|
rpcVersion = in.readLong();
|
|
|
+ declaringClassProtocolName = UTF8.readString(in);
|
|
|
methodName = UTF8.readString(in);
|
|
|
clientVersion = in.readLong();
|
|
|
clientMethodsHash = in.readInt();
|
|
@@ -124,13 +171,16 @@ public class WritableRpcEngine implements RpcEngine {
|
|
|
parameterClasses = new Class[parameters.length];
|
|
|
ObjectWritable objectWritable = new ObjectWritable();
|
|
|
for (int i = 0; i < parameters.length; i++) {
|
|
|
- parameters[i] = ObjectWritable.readObject(in, objectWritable, this.conf);
|
|
|
+ parameters[i] =
|
|
|
+ ObjectWritable.readObject(in, objectWritable, this.conf);
|
|
|
parameterClasses[i] = objectWritable.getDeclaredClass();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ @SuppressWarnings("deprecation")
|
|
|
public void write(DataOutput out) throws IOException {
|
|
|
out.writeLong(rpcVersion);
|
|
|
+ UTF8.writeString(out, declaringClassProtocolName);
|
|
|
UTF8.writeString(out, methodName);
|
|
|
out.writeLong(clientVersion);
|
|
|
out.writeInt(clientMethodsHash);
|
|
@@ -273,30 +323,161 @@ public class WritableRpcEngine implements RpcEngine {
|
|
|
|
|
|
/** Construct a server for a protocol implementation instance listening on a
|
|
|
* port and address. */
|
|
|
- public Server getServer(Class<?> protocol,
|
|
|
- Object instance, String bindAddress, int port,
|
|
|
- int numHandlers, int numReaders, int queueSizePerHandler,
|
|
|
- boolean verbose, Configuration conf,
|
|
|
+ public RPC.Server getServer(Class<?> protocolClass,
|
|
|
+ Object protocolImpl, String bindAddress, int port,
|
|
|
+ int numHandlers, int numReaders, int queueSizePerHandler,
|
|
|
+ boolean verbose, Configuration conf,
|
|
|
SecretManager<? extends TokenIdentifier> secretManager)
|
|
|
throws IOException {
|
|
|
- return new Server(instance, conf, bindAddress, port, numHandlers,
|
|
|
- numReaders, queueSizePerHandler, verbose, secretManager);
|
|
|
+ return new Server(protocolClass, protocolImpl, conf, bindAddress, port,
|
|
|
+ numHandlers, numReaders, queueSizePerHandler, verbose, secretManager);
|
|
|
}
|
|
|
|
|
|
+
|
|
|
/** An RPC Server. */
|
|
|
public static class Server extends RPC.Server {
|
|
|
- private Object instance;
|
|
|
private boolean verbose;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * The key in Map
|
|
|
+ */
|
|
|
+ static class ProtoNameVer {
|
|
|
+ final String protocol;
|
|
|
+ final long version;
|
|
|
+ ProtoNameVer(String protocol, long ver) {
|
|
|
+ this.protocol = protocol;
|
|
|
+ this.version = ver;
|
|
|
+ }
|
|
|
+ @Override
|
|
|
+ public boolean equals(Object o) {
|
|
|
+ if (o == null)
|
|
|
+ return false;
|
|
|
+ if (this == o)
|
|
|
+ return true;
|
|
|
+ if (! (o instanceof ProtoNameVer))
|
|
|
+ return false;
|
|
|
+ ProtoNameVer pv = (ProtoNameVer) o;
|
|
|
+ return ((pv.protocol.equals(this.protocol)) &&
|
|
|
+ (pv.version == this.version));
|
|
|
+ }
|
|
|
+ @Override
|
|
|
+ public int hashCode() {
|
|
|
+ return protocol.hashCode() * 37 + (int) version;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * The value in map
|
|
|
+ */
|
|
|
+ static class ProtoClassProtoImpl {
|
|
|
+ final Class<?> protocolClass;
|
|
|
+ final Object protocolImpl;
|
|
|
+ ProtoClassProtoImpl(Class<?> protocolClass, Object protocolImpl) {
|
|
|
+ this.protocolClass = protocolClass;
|
|
|
+ this.protocolImpl = protocolImpl;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private Map<ProtoNameVer, ProtoClassProtoImpl> protocolImplMap =
|
|
|
+ new HashMap<ProtoNameVer, ProtoClassProtoImpl>(10);
|
|
|
+
|
|
|
+ // Register protocol and its impl for rpc calls
|
|
|
+ private void registerProtocolAndImpl(Class<?> protocolClass,
|
|
|
+ Object protocolImpl) throws IOException {
|
|
|
+ String protocolName = RPC.getProtocolName(protocolClass);
|
|
|
+ VersionedProtocol vp = (VersionedProtocol) protocolImpl;
|
|
|
+ long version;
|
|
|
+ try {
|
|
|
+ version = vp.getProtocolVersion(protocolName, 0);
|
|
|
+ } catch (Exception ex) {
|
|
|
+ LOG.warn("Protocol " + protocolClass +
|
|
|
+ " NOT registered as getProtocolVersion throws exception ");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ protocolImplMap.put(new ProtoNameVer(protocolName, version),
|
|
|
+ new ProtoClassProtoImpl(protocolClass, protocolImpl));
|
|
|
+ LOG.info("ProtocolImpl=" + protocolImpl.getClass().getName() +
|
|
|
+ " protocolClass=" + protocolClass.getName() + " version=" + version);
|
|
|
+ }
|
|
|
+
|
|
|
+ private static class VerProtocolImpl {
|
|
|
+ final long version;
|
|
|
+ final ProtoClassProtoImpl protocolTarget;
|
|
|
+ VerProtocolImpl(long ver, ProtoClassProtoImpl protocolTarget) {
|
|
|
+ this.version = ver;
|
|
|
+ this.protocolTarget = protocolTarget;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ @SuppressWarnings("unused") // will be useful later.
|
|
|
+ private VerProtocolImpl[] getSupportedProtocolVersions(
|
|
|
+ String protocolName) {
|
|
|
+ VerProtocolImpl[] resultk = new VerProtocolImpl[protocolImplMap.size()];
|
|
|
+ int i = 0;
|
|
|
+ for (Map.Entry<ProtoNameVer, ProtoClassProtoImpl> pv :
|
|
|
+ protocolImplMap.entrySet()) {
|
|
|
+ if (pv.getKey().protocol.equals(protocolName)) {
|
|
|
+ resultk[i++] =
|
|
|
+ new VerProtocolImpl(pv.getKey().version, pv.getValue());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (i == 0) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ VerProtocolImpl[] result = new VerProtocolImpl[i];
|
|
|
+ System.arraycopy(resultk, 0, result, 0, i);
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ private VerProtocolImpl getHighestSupportedProtocol(String protocolName) {
|
|
|
+ Long highestVersion = 0L;
|
|
|
+ ProtoClassProtoImpl highest = null;
|
|
|
+ for (Map.Entry<ProtoNameVer, ProtoClassProtoImpl> pv : protocolImplMap
|
|
|
+ .entrySet()) {
|
|
|
+ if (pv.getKey().protocol.equals(protocolName)) {
|
|
|
+ if ((highest == null) || (pv.getKey().version > highestVersion)) {
|
|
|
+ highest = pv.getValue();
|
|
|
+ highestVersion = pv.getKey().version;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (highest == null) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ return new VerProtocolImpl(highestVersion, highest);
|
|
|
+ }
|
|
|
+
|
|
|
|
|
|
/** Construct an RPC server.
|
|
|
+ * @param instance the instance whose methods will be called
|
|
|
+ * @param conf the configuration to use
|
|
|
+ * @param bindAddress the address to bind on to listen for connection
|
|
|
+ * @param port the port to listen for connections on
|
|
|
+ *
|
|
|
+ * @deprecated Use #Server(Class, Object, Configuration, String, int)
|
|
|
+ *
|
|
|
+ */
|
|
|
+ @Deprecated
|
|
|
+ public Server(Object instance, Configuration conf, String bindAddress,
|
|
|
+ int port)
|
|
|
+ throws IOException {
|
|
|
+ this(null, instance, conf, bindAddress, port);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /** Construct an RPC server.
|
|
|
+ * @param protocol class
|
|
|
* @param instance the instance whose methods will be called
|
|
|
* @param conf the configuration to use
|
|
|
* @param bindAddress the address to bind on to listen for connection
|
|
|
* @param port the port to listen for connections on
|
|
|
*/
|
|
|
- public Server(Object instance, Configuration conf, String bindAddress, int port)
|
|
|
+ public Server(Class<?> protocolClass, Object protocolImpl,
|
|
|
+ Configuration conf, String bindAddress, int port)
|
|
|
throws IOException {
|
|
|
- this(instance, conf, bindAddress, port, 1, -1, -1, false, null);
|
|
|
+ this(protocolClass, protocolImpl, conf, bindAddress, port, 1, -1, -1,
|
|
|
+ false, null);
|
|
|
}
|
|
|
|
|
|
private static String classNameBase(String className) {
|
|
@@ -307,35 +488,103 @@ public class WritableRpcEngine implements RpcEngine {
|
|
|
return names[names.length-1];
|
|
|
}
|
|
|
|
|
|
+
|
|
|
/** Construct an RPC server.
|
|
|
- * @param instance the instance whose methods will be called
|
|
|
+ * @param protocolImpl the instance whose methods will be called
|
|
|
* @param conf the configuration to use
|
|
|
* @param bindAddress the address to bind on to listen for connection
|
|
|
* @param port the port to listen for connections on
|
|
|
* @param numHandlers the number of method handler threads to run
|
|
|
* @param verbose whether each call should be logged
|
|
|
+ *
|
|
|
+ * @deprecated use Server#Server(Class, Object,
|
|
|
+ * Configuration, String, int, int, int, int, boolean, SecretManager)
|
|
|
*/
|
|
|
- public Server(Object instance, Configuration conf, String bindAddress, int port,
|
|
|
- int numHandlers, int numReaders, int queueSizePerHandler, boolean verbose,
|
|
|
- SecretManager<? extends TokenIdentifier> secretManager)
|
|
|
+ @Deprecated
|
|
|
+ public Server(Object protocolImpl, Configuration conf, String bindAddress,
|
|
|
+ int port, int numHandlers, int numReaders, int queueSizePerHandler,
|
|
|
+ boolean verbose, SecretManager<? extends TokenIdentifier> secretManager)
|
|
|
+ throws IOException {
|
|
|
+ this(null, protocolImpl, conf, bindAddress, port,
|
|
|
+ numHandlers, numReaders, queueSizePerHandler, verbose,
|
|
|
+ secretManager);
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ /** Construct an RPC server.
|
|
|
+ * @param protocolClass - the protocol being registered
|
|
|
+ * can be null for compatibility with old usage (see below for details)
|
|
|
+ * @param protocolImpl the protocol impl that will be called
|
|
|
+ * @param conf the configuration to use
|
|
|
+ * @param bindAddress the address to bind on to listen for connection
|
|
|
+ * @param port the port to listen for connections on
|
|
|
+ * @param numHandlers the number of method handler threads to run
|
|
|
+ * @param verbose whether each call should be logged
|
|
|
+ */
|
|
|
+ public Server(Class<?> protocolClass, Object protocolImpl,
|
|
|
+ Configuration conf, String bindAddress, int port,
|
|
|
+ int numHandlers, int numReaders, int queueSizePerHandler,
|
|
|
+ boolean verbose, SecretManager<? extends TokenIdentifier> secretManager)
|
|
|
throws IOException {
|
|
|
super(bindAddress, port, Invocation.class, numHandlers, numReaders,
|
|
|
queueSizePerHandler, conf,
|
|
|
- classNameBase(instance.getClass().getName()), secretManager);
|
|
|
- this.instance = instance;
|
|
|
+ classNameBase(protocolImpl.getClass().getName()), secretManager);
|
|
|
+
|
|
|
this.verbose = verbose;
|
|
|
+
|
|
|
+
|
|
|
+ Class<?>[] protocols;
|
|
|
+ if (protocolClass == null) { // derive protocol from impl
|
|
|
+ /*
|
|
|
+ * In order to remain compatible with the old usage where a single
|
|
|
+ * target protocolImpl is suppled for all protocol interfaces, and
|
|
|
+ * the protocolImpl is derived from the protocolClass(es)
|
|
|
+ * we register all interfaces extended by the protocolImpl
|
|
|
+ */
|
|
|
+ protocols = getProtocolInterfaces(protocolImpl.getClass());
|
|
|
+
|
|
|
+ } else {
|
|
|
+ if (!protocolClass.isAssignableFrom(protocolImpl.getClass())) {
|
|
|
+ throw new IOException("protocolClass "+ protocolClass +
|
|
|
+ " is not implemented by protocolImpl which is of class " +
|
|
|
+ protocolImpl.getClass());
|
|
|
+ }
|
|
|
+ // register protocol class and its super interfaces
|
|
|
+ registerProtocolAndImpl(protocolClass, protocolImpl);
|
|
|
+ protocols = getProtocolInterfaces(protocolClass);
|
|
|
+ }
|
|
|
+ for (Class<?> p : protocols) {
|
|
|
+ if (!p.equals(VersionedProtocol.class)) {
|
|
|
+ registerProtocolAndImpl(p, protocolImpl);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
}
|
|
|
|
|
|
- public Writable call(Class<?> protocol, Writable param, long receivedTime)
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public <PROTO extends VersionedProtocol, IMPL extends PROTO> Server
|
|
|
+ addProtocol(
|
|
|
+ Class<PROTO> protocolClass, IMPL protocolImpl) throws IOException {
|
|
|
+ registerProtocolAndImpl(protocolClass, protocolImpl);
|
|
|
+ return this;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Process a client call
|
|
|
+ * @param protocolName - the protocol name (the class of the client proxy
|
|
|
+ * used to make calls to the rpc server.
|
|
|
+ * @param param parameters
|
|
|
+ * @param receivedTime time at which the call receoved (for metrics)
|
|
|
+ * @return the call's return
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ public Writable call(String protocolName, Writable param, long receivedTime)
|
|
|
throws IOException {
|
|
|
try {
|
|
|
Invocation call = (Invocation)param;
|
|
|
if (verbose) log("Call: " + call);
|
|
|
|
|
|
- Method method = protocol.getMethod(call.getMethodName(),
|
|
|
- call.getParameterClasses());
|
|
|
- method.setAccessible(true);
|
|
|
-
|
|
|
// Verify rpc version
|
|
|
if (call.getRpcVersion() != writableRpcVersion) {
|
|
|
// Client is using a different version of WritableRpc
|
|
@@ -344,25 +593,51 @@ public class WritableRpcEngine implements RpcEngine {
|
|
|
+ call.getRpcVersion() + ", server side version="
|
|
|
+ writableRpcVersion);
|
|
|
}
|
|
|
-
|
|
|
- //Verify protocol version.
|
|
|
- //Bypass the version check for VersionedProtocol
|
|
|
- if (!method.getDeclaringClass().equals(VersionedProtocol.class)) {
|
|
|
- long clientVersion = call.getProtocolVersion();
|
|
|
- ProtocolSignature serverInfo = ((VersionedProtocol) instance)
|
|
|
- .getProtocolSignature(protocol.getCanonicalName(), call
|
|
|
- .getProtocolVersion(), call.getClientMethodsHash());
|
|
|
- long serverVersion = serverInfo.getVersion();
|
|
|
- if (serverVersion != clientVersion) {
|
|
|
- LOG.warn("Version mismatch: client version=" + clientVersion
|
|
|
- + ", server version=" + serverVersion);
|
|
|
- throw new RPC.VersionMismatch(protocol.getName(), clientVersion,
|
|
|
- serverVersion);
|
|
|
+
|
|
|
+ long clientVersion = call.getProtocolVersion();
|
|
|
+ final String protoName;
|
|
|
+ ProtoClassProtoImpl protocolImpl;
|
|
|
+ if (call.declaringClassProtocolName.equals(VersionedProtocol.class.getName())) {
|
|
|
+ // VersionProtocol methods are often used by client to figure out
|
|
|
+ // which version of protocol to use.
|
|
|
+ //
|
|
|
+ // Versioned protocol methods should go the protocolName protocol
|
|
|
+ // rather than the declaring class of the method since the
|
|
|
+ // the declaring class is VersionedProtocol which is not
|
|
|
+ // registered directly.
|
|
|
+ // Send the call to the highest protocol version
|
|
|
+ protocolImpl =
|
|
|
+ getHighestSupportedProtocol(protocolName).protocolTarget;
|
|
|
+ } else {
|
|
|
+ protoName = call.declaringClassProtocolName;
|
|
|
+
|
|
|
+ // Find the right impl for the protocol based on client version.
|
|
|
+ ProtoNameVer pv =
|
|
|
+ new ProtoNameVer(call.declaringClassProtocolName, clientVersion);
|
|
|
+ protocolImpl = protocolImplMap.get(pv);
|
|
|
+ if (protocolImpl == null) { // no match for Protocol AND Version
|
|
|
+ VerProtocolImpl highest =
|
|
|
+ getHighestSupportedProtocol(protoName);
|
|
|
+ if (highest == null) {
|
|
|
+ throw new IOException("Unknown protocol: " + protoName);
|
|
|
+ } else { // protocol supported but not the version that client wants
|
|
|
+ throw new RPC.VersionMismatch(protoName, clientVersion,
|
|
|
+ highest.version);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+
|
|
|
+ // Invoke the protocol method
|
|
|
|
|
|
long startTime = System.currentTimeMillis();
|
|
|
- Object value = method.invoke(instance, call.getParameters());
|
|
|
+ Method method =
|
|
|
+ protocolImpl.protocolClass.getMethod(call.getMethodName(),
|
|
|
+ call.getParameterClasses());
|
|
|
+ method.setAccessible(true);
|
|
|
+ rpcDetailedMetrics.init(protocolImpl.protocolClass);
|
|
|
+ Object value =
|
|
|
+ method.invoke(protocolImpl.protocolImpl, call.getParameters());
|
|
|
int processingTime = (int) (System.currentTimeMillis() - startTime);
|
|
|
int qTime = (int) (startTime-receivedTime);
|
|
|
if (LOG.isDebugEnabled()) {
|