ソースを参照

HADOOP-8373. Port RPC.getServerAddress to 0.23 (Daryn Sharp via bobby)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1336297 13f79535-47bb-0310-9956-ffa450edef68
Robert Joseph Evans 13 年 前
コミット
ec5ca9cf0c

+ 2 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -14,6 +14,8 @@ Release 0.23.3 - UNRELEASED
     HADOOP-8288. Remove references of mapred.child.ulimit etc. since they are
     not being used any more (Ravi Prakash via bobby)
 
+    HADOOP-8373. Port RPC.getServerAddress to 0.23 (Daryn Sharp via bobby)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 8 - 2
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java

@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.io.retry;
 
-import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.Collections;
@@ -26,8 +25,11 @@ import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
+import org.apache.hadoop.ipc.Client.ConnectionId;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RpcInvocationHandler;
 
-class RetryInvocationHandler implements InvocationHandler {
+class RetryInvocationHandler implements RpcInvocationHandler {
   public static final Log LOG = LogFactory.getLog(RetryInvocationHandler.class);
   private FailoverProxyProvider proxyProvider;
   
@@ -103,4 +105,8 @@ class RetryInvocationHandler implements InvocationHandler {
     }
   }
 
+  @Override //RpcInvocationHandler
+  public ConnectionId getConnectionId() {
+    return RPC.getConnectionIdForProxy(currentProxy);
+  }
 }

+ 27 - 8
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java

@@ -18,28 +18,28 @@
 
 package org.apache.hadoop.ipc;
 
-import java.lang.reflect.Proxy;
+import java.io.IOException;
 import java.lang.reflect.Method;
-
+import java.lang.reflect.Proxy;
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.net.NoRouteToHostException;
 import java.net.SocketTimeoutException;
-import java.io.*;
-import java.util.Map;
 import java.util.HashMap;
+import java.util.Map;
 
 import javax.net.SocketFactory;
 
-import org.apache.commons.logging.*;
-
-import org.apache.hadoop.io.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.Client.ConnectionId;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SaslRpcServer;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.conf.*;
 import org.apache.hadoop.util.ReflectionUtils;
 
 /** A simple RPC mechanism.
@@ -312,7 +312,26 @@ public class RPC {
     return getProtocolProxy(
         protocol, clientVersion, addr, conf, factory).getProxy();
   }
+  
+  /**
+   * Returns the server address for a given proxy.
+   */
+  public static InetSocketAddress getServerAddress(Object proxy) {
+    return getConnectionIdForProxy(proxy).getAddress();
+  }
 
+  /**
+   * Return the connection ID of the given object.
+   * 
+   * @param proxy the proxy object to get the connection ID of.
+   * @return the connection ID for the provided proxy object.
+   */
+  public static ConnectionId getConnectionIdForProxy(Object proxy) {
+    RpcInvocationHandler inv = (RpcInvocationHandler) Proxy
+        .getInvocationHandler(proxy);
+    return inv.getConnectionId();
+  }
+  
   /**
    * Get a protocol proxy that contains a proxy connection to a remote server
    * and a set of methods that are supported by the server

+ 35 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcInvocationHandler.java

@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ipc;
+
+import java.lang.reflect.InvocationHandler;
+
+import org.apache.hadoop.ipc.Client.ConnectionId;
+
+/**
+ * This interface must be implemented by all InvocationHandler
+ * implementations.
+ */
+public interface RpcInvocationHandler extends InvocationHandler {
+  
+  /**
+   * Returns the connection id associated with the InvocationHandler instance.
+   * @return ConnectionId
+   */
+  ConnectionId getConnectionId();
+}

+ 22 - 15
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java

@@ -18,29 +18,31 @@
 
 package org.apache.hadoop.ipc;
 
-import java.lang.reflect.Field;
-import java.lang.reflect.Proxy;
-import java.lang.reflect.Method;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.lang.reflect.Array;
-import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
-
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
 import java.net.InetSocketAddress;
-import java.io.*;
-import java.util.Map;
-import java.util.HashMap;
 
 import javax.net.SocketFactory;
 
-import org.apache.commons.logging.*;
-
-import org.apache.hadoop.io.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.Client.ConnectionId;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.*;
 
 /** An RpcEngine implementation for Writable data. */
 @InterfaceStability.Evolving
@@ -169,7 +171,7 @@ public class WritableRpcEngine implements RpcEngine {
 
   private static ClientCache CLIENTS=new ClientCache();
   
-  private static class Invoker implements InvocationHandler {
+  private static class Invoker implements RpcInvocationHandler {
     private Client.ConnectionId remoteId;
     private Client client;
     private boolean isClosed = false;
@@ -206,6 +208,11 @@ public class WritableRpcEngine implements RpcEngine {
         CLIENTS.stopClient(client);
       }
     }
+
+    @Override
+    public ConnectionId getConnectionId() {
+      return remoteId;
+    }
   }
   
   // for unit testing only

+ 22 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java

@@ -230,6 +230,28 @@ public class TestRPC extends TestCase {
     server.stop();    
   }
 
+  public void testProxyAddress() throws Exception {
+    Server server = RPC.getServer(TestProtocol.class,
+                                  new TestImpl(), ADDRESS, 0, conf);
+    TestProtocol proxy = null;
+    
+    try {
+      server.start();
+      InetSocketAddress addr = NetUtils.getConnectAddress(server);
+
+      // create a client
+      proxy = (TestProtocol)RPC.getProxy(
+          TestProtocol.class, TestProtocol.versionID, addr, conf);
+      
+      assertEquals(addr, RPC.getServerAddress(proxy));
+    } finally {
+      server.stop();
+      if (proxy != null) {
+        RPC.stopProxy(proxy);
+      }
+    }
+  }
+  
   public void testSlowRpc() throws Exception {
     System.out.println("Testing Slow RPC");
     // create a server with two handlers