Parcourir la source

HADOOP-18365. Update the remote address when a change is detected (#4692) (#4768)

Back port to branch-3.3, to avoid reconnecting to the old address after detecting that the address has been updated.

* Use a stable hashCode to allow safe IP addr changes
* Add test that updated address is used

Once the address has been updated, it will be used in future calls.  Test verifies that a second request succeeds and that it uses the existing updated address instead of having to re-resolve.

Co-authored-by: Steve Vaughan Jr <s_vaughan@apple.com>
Steve Vaughan il y a 2 ans
Parent
commit
cfc11d2e5f

+ 34 - 6
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java

@@ -418,7 +418,7 @@ public class Client implements AutoCloseable {
    * socket: responses may be delivered out of order. */
   private class Connection extends Thread {
     private InetSocketAddress server;             // server ip:port
-    private final ConnectionId remoteId;                // connection id
+    private final ConnectionId remoteId;          // connection id
     private AuthMethod authMethod; // authentication method
     private AuthProtocol authProtocol;
     private int serviceClass;
@@ -644,6 +644,9 @@ public class Client implements AutoCloseable {
         LOG.warn("Address change detected. Old: " + server.toString() +
                                  " New: " + currentAddr.toString());
         server = currentAddr;
+        // Update the remote address so that reconnections are with the updated address.
+        // This avoids thrashing.
+        remoteId.setAddress(currentAddr);
         UserGroupInformation ticket = remoteId.getTicket();
         this.setName("IPC Client (" + socketFactory.hashCode()
             + ") connection to " + server.toString() + " from "
@@ -1698,9 +1701,9 @@ public class Client implements AutoCloseable {
   @InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
   @InterfaceStability.Evolving
   public static class ConnectionId {
-    InetSocketAddress address;
-    UserGroupInformation ticket;
-    final Class<?> protocol;
+    private InetSocketAddress address;
+    private final UserGroupInformation ticket;
+    private final Class<?> protocol;
     private static final int PRIME = 16777619;
     private final int rpcTimeout;
     private final int maxIdleTime; //connections will be culled if it was idle for 
@@ -1751,7 +1754,28 @@ public class Client implements AutoCloseable {
     InetSocketAddress getAddress() {
       return address;
     }
-    
+
+    /**
+     * This is used to update the remote address when an address change is detected.  This method
+     * ensures that the {@link #hashCode()} won't change.
+     *
+     * @param address the updated address
+     * @throws IllegalArgumentException if the hostname or port doesn't match
+     * @see Connection#updateAddress()
+     */
+    void setAddress(InetSocketAddress address) {
+      if (!Objects.equals(this.address.getHostName(), address.getHostName())) {
+        throw new IllegalArgumentException("Hostname must match: " + this.address + " vs "
+            + address);
+      }
+      if (this.address.getPort() != address.getPort()) {
+        throw new IllegalArgumentException("Port must match: " + this.address + " vs " + address);
+      }
+
+      this.address = address;
+    }
+
+
     Class<?> getProtocol() {
       return protocol;
     }
@@ -1858,7 +1882,11 @@ public class Client implements AutoCloseable {
     @Override
     public int hashCode() {
       int result = connectionRetryPolicy.hashCode();
-      result = PRIME * result + ((address == null) ? 0 : address.hashCode());
+      // We calculate based on the host name and port without the IP address, since the hashCode
+      // must be stable even if the IP address is updated.
+      result = PRIME * result + ((address == null || address.getHostName() == null) ? 0 :
+          address.getHostName().hashCode());
+      result = PRIME * result + ((address == null) ? 0 : address.getPort());
       result = PRIME * result + (doPing ? 1231 : 1237);
       result = PRIME * result + maxIdleTime;
       result = PRIME * result + pingInterval;

+ 76 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.ipc;
 
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -815,6 +816,81 @@ public class TestIPC {
     }
   }
 
+  /**
+   * The {@link ConnectionId#hashCode} has to be stable despite updates that occur as the the
+   * address evolves over time.  The {@link ConnectionId} is used as a primary key in maps, so
+   * its hashCode can't change.
+   *
+   * @throws IOException if there is a client or server failure
+   */
+  @Test
+  public void testStableHashCode() throws IOException {
+    Server server = new TestServer(5, false);
+    try {
+      server.start();
+
+      // Leave host unresolved to start. Use "localhost" as opposed
+      // to local IP from NetUtils.getConnectAddress(server) to force
+      // resolution later
+      InetSocketAddress unresolvedAddr = InetSocketAddress.createUnresolved(
+          "localhost", NetUtils.getConnectAddress(server).getPort());
+
+      // Setup: Create a ConnectionID using an unresolved address, and get it's hashCode to serve
+      // as a point of comparison.
+      int rpcTimeout = MIN_SLEEP_TIME * 2;
+      final ConnectionId remoteId = getConnectionId(unresolvedAddr, rpcTimeout, conf);
+      int expected = remoteId.hashCode();
+
+      // Start client
+      Client.setConnectTimeout(conf, 100);
+      Client client = new Client(LongWritable.class, conf);
+      try {
+        // Test: Call should re-resolve host and succeed
+        LongWritable param = new LongWritable(RANDOM.nextLong());
+        client.call(RPC.RpcKind.RPC_BUILTIN, param, remoteId,
+            RPC.RPC_SERVICE_CLASS_DEFAULT, null);
+        int actual = remoteId.hashCode();
+
+        // Verify: The hashCode should match, although the InetAddress is different since it has
+        // now been resolved
+        assertThat(remoteId.getAddress()).isNotEqualTo(unresolvedAddr);
+        assertThat(remoteId.getAddress().getHostName()).isEqualTo(unresolvedAddr.getHostName());
+        assertThat(remoteId.hashCode()).isEqualTo(expected);
+
+        // Test: Call should succeed without having to re-resolve
+        InetSocketAddress expectedSocketAddress = remoteId.getAddress();
+        param = new LongWritable(RANDOM.nextLong());
+        client.call(RPC.RpcKind.RPC_BUILTIN, param, remoteId,
+            RPC.RPC_SERVICE_CLASS_DEFAULT, null);
+
+        // Verify: The same instance of the InetSocketAddress has been used to make the second
+        // call
+        assertThat(remoteId.getAddress()).isSameAs(expectedSocketAddress);
+
+        // Verify: The hashCode is protected against updates to the host name
+        String hostName = InetAddress.getLocalHost().getHostName();
+        InetSocketAddress mismatchedHostName = NetUtils.createSocketAddr(
+            InetAddress.getLocalHost().getHostName(),
+            remoteId.getAddress().getPort());
+        assertThatExceptionOfType(IllegalArgumentException.class)
+            .isThrownBy(() -> remoteId.setAddress(mismatchedHostName))
+            .withMessageStartingWith("Hostname must match");
+
+        // Verify: The hashCode is protected against updates to the port
+        InetSocketAddress mismatchedPort = NetUtils.createSocketAddr(
+            remoteId.getAddress().getHostName(),
+            remoteId.getAddress().getPort() + 1);
+        assertThatExceptionOfType(IllegalArgumentException.class)
+            .isThrownBy(() -> remoteId.setAddress(mismatchedPort))
+            .withMessageStartingWith("Port must match");
+      } finally {
+        client.stop();
+      }
+    } finally {
+      server.stop();
+    }
+  }
+
   @Test(timeout=60000)
   public void testIpcFlakyHostResolution() throws IOException {
     // start server