Bläddra i källkod

Give server implementations access to a server's context. This consists of two additions. First is a static method Server.get() which returns the server instance it is called under, if any. Second is the new public class RPC.Server, that replaces a former anonymous class. RPC server implementation methods can now subclass RPC.Server to keep server state in the subclass. Application code can then call Server.get() to access that state. Note that Server.get() may be called under parameter deserialization and return value serialization methods as well, called before and after actual server method calls, respectively.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@386459 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 19 år sedan
förälder
incheckning
a174e6219b

+ 60 - 32
src/java/org/apache/hadoop/ipc/RPC.java

@@ -192,40 +192,68 @@ public class RPC {
   public static Server getServer(final Object instance, final int port,
                                  final int numHandlers,
                                  final boolean verbose, Configuration conf) {
-    return new Server(port, Invocation.class, numHandlers, conf) {
+    return new Server(instance, conf, port, numHandlers, verbose);
+  }
+        
+  /** An RPC Server. */
+  public static class Server extends org.apache.hadoop.ipc.Server {
+    private Object instance;
+    private Class implementation;
+    private boolean verbose;
+
+    /** Construct an RPC server.
+     * @param instance the instance whose methods will be called
+     * @param conf the configuration to use
+     * @param port the port to listen for connections on
+     */
+    public Server(Object instance, Configuration conf, int port) {
+      this(instance, conf, port, 1, false);
+    }
+
+    /** Construct an RPC server.
+     * @param instance the instance whose methods will be called
+     * @param conf the configuration to use
+     * @param port the port to listen for connections on
+     * @param numHandlers the number of method handler threads to run
+     * @param verbose whether each call should be logged
+     */
+    public Server(Object instance, Configuration conf, int port,
+                  int numHandlers, boolean verbose) {
+      super(port, Invocation.class, numHandlers, conf);
+      this.instance = instance;
+      this.implementation = instance.getClass();
+      this.verbose = verbose;
+    }
+
+    public Writable call(Writable param) throws IOException {
+      try {
+        Invocation call = (Invocation)param;
+        if (verbose) log("Call: " + call);
         
-        Class implementation = instance.getClass();
-
-        public Writable call(Writable param) throws IOException {
-          try {
-            Invocation call = (Invocation)param;
-            if (verbose) log("Call: " + call);
-
-            Method method =
-              implementation.getMethod(call.getMethodName(),
-                                       call.getParameterClasses());
-
-            Object value = method.invoke(instance, call.getParameters());
-            if (verbose) log("Return: "+value);
-
-            return new ObjectWritable(method.getReturnType(), value);
-
-          } catch (InvocationTargetException e) {
-            Throwable target = e.getTargetException();
-            if (target instanceof IOException) {
-              throw (IOException)target;
-            } else {
-              IOException ioe = new IOException(target.toString());
-              ioe.setStackTrace(target.getStackTrace());
-              throw ioe;
-            }
-          } catch (Throwable e) {
-            IOException ioe = new IOException(e.toString());
-            ioe.setStackTrace(e.getStackTrace());
-            throw ioe;
-          }
+        Method method =
+          implementation.getMethod(call.getMethodName(),
+                                   call.getParameterClasses());
+
+        Object value = method.invoke(instance, call.getParameters());
+        if (verbose) log("Return: "+value);
+
+        return new ObjectWritable(method.getReturnType(), value);
+
+      } catch (InvocationTargetException e) {
+        Throwable target = e.getTargetException();
+        if (target instanceof IOException) {
+          throw (IOException)target;
+        } else {
+          IOException ioe = new IOException(target.toString());
+          ioe.setStackTrace(target.getStackTrace());
+          throw ioe;
         }
-      };
+      } catch (Throwable e) {
+        IOException ioe = new IOException(e.toString());
+        ioe.setStackTrace(e.getStackTrace());
+        throw ioe;
+      }
+    }
   }
 
   private static void log(String value) {

+ 12 - 0
src/java/org/apache/hadoop/ipc/Server.java

@@ -49,6 +49,16 @@ public abstract class Server {
   public static final Logger LOG =
     LogFormatter.getLogger("org.apache.hadoop.ipc.Server");
 
+  private static final ThreadLocal SERVER = new ThreadLocal();
+
+  /** Returns the server instance called under or null.  May be called under
+   * {@link #call(Writable)} implementations, and under {@link Writable}
+   * methods of paramters and return values.  Permits applications to access
+   * the server context.*/
+  public static Server get() {
+    return (Server)SERVER.get();
+  }
+
   private int port;                               // port we listen on
   private int handlerCount;                       // number of handler threads
   private int maxQueuedCalls;                     // max number of queued calls
@@ -124,6 +134,7 @@ public abstract class Server {
 
     public void run() {
       LOG.info(getName() + ": starting");
+      SERVER.set(Server.this);
       try {
         while (running) {
           int id;
@@ -177,6 +188,7 @@ public abstract class Server {
 
     public void run() {
       LOG.info(getName() + ": starting");
+      SERVER.set(Server.this);
       while (running) {
         try {
           Call call;

+ 9 - 0
src/test/org/apache/hadoop/ipc/TestRPC.java

@@ -54,6 +54,7 @@ public class TestRPC extends TestCase {
     int add(int v1, int v2) throws IOException;
     int add(int[] values) throws IOException;
     int error() throws IOException;
+    void testServerGet() throws IOException;
   }
 
   public class TestImpl implements TestProtocol {
@@ -80,6 +81,12 @@ public class TestRPC extends TestCase {
       throw new IOException("bobo");
     }
 
+    public void testServerGet() throws IOException {
+      if (!(Server.get() instanceof RPC.Server)) {
+        throw new IOException("Server.get() failed");
+      }
+    }
+
   }
 
   public void testCalls() throws Exception {
@@ -113,6 +120,8 @@ public class TestRPC extends TestCase {
     }
     assertTrue(caught);
 
+    proxy.testServerGet();
+
     // try some multi-calls
     Method echo =
       TestProtocol.class.getMethod("echo", new Class[] { String.class });