浏览代码

HADOOP-7472. RPC client should deal with IP address change. Contributed by Kihwal Lee.


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1156350 13f79535-47bb-0310-9956-ffa450edef68
Suresh Srinivas 14 年之前
父节点
当前提交
2a990ed646
共有 2 个文件被更改,包括 34 次插入1 次删除
  1. 3 0
      hadoop-common/CHANGES.txt
  2. 31 1
      hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java

+ 3 - 0
hadoop-common/CHANGES.txt

@@ -311,6 +311,9 @@ Trunk (unreleased changes)
 
     HADOOP-7525. Make arguments to test-patch optional. (tomwhite)
 
+    HADOOP-7472. RPC client should deal with IP address change.
+    (Kihwal Lee via suresh)
+
   OPTIMIZATIONS
   
     HADOOP-7333. Performance improvement in PureJavaCrc32. (Eric Caspole

+ 31 - 1
hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java

@@ -405,6 +405,27 @@ public class Client {
       saslRpcClient = new SaslRpcClient(authMethod, token, serverPrincipal);
       return saslRpcClient.saslConnect(in2, out2);
     }
+
+    /**
+     * Update the server address if the address corresponding to the host
+     * name has changed.
+     *
+     * @return true if an addr change was detected.
+     * @throws IOException when the hostname cannot be resolved.
+     */
+    private synchronized boolean updateAddress() throws IOException {
+      // Do a fresh lookup with the old host name.
+      InetSocketAddress currentAddr =  new InetSocketAddress(
+                               server.getHostName(), server.getPort());
+
+      if (!server.equals(currentAddr)) {
+        LOG.warn("Address change detected. Old: " + server.toString() +
+                                 " New: " + currentAddr.toString());
+        server = currentAddr;
+        return true;
+      }
+      return false;
+    }
     
     private synchronized void setupConnection() throws IOException {
       short ioFailures = 0;
@@ -435,19 +456,28 @@ public class Client {
           }
           
           // connection time out is 20s
-          NetUtils.connect(this.socket, remoteId.getAddress(), 20000);
+          NetUtils.connect(this.socket, server, 20000);
           if (rpcTimeout > 0) {
             pingInterval = rpcTimeout;  // rpcTimeout overwrites pingInterval
           }
           this.socket.setSoTimeout(pingInterval);
           return;
         } catch (SocketTimeoutException toe) {
+          /* Check for an address change and update the local reference.
+           * Reset the failure counter if the address was changed
+           */
+          if (updateAddress()) {
+            timeoutFailures = ioFailures = 0;
+          }
           /*
            * The max number of retries is 45, which amounts to 20s*45 = 15
            * minutes retries.
            */
           handleConnectionFailure(timeoutFailures++, 45, toe);
         } catch (IOException ie) {
+          if (updateAddress()) {
+            timeoutFailures = ioFailures = 0;
+          }
           handleConnectionFailure(ioFailures++, maxRetries, ie);
         }
       }