Ver Fonte

HDFS-15300. RBF: updateActiveNamenode() is invalid when RPC address is IP. Contributed by xuzq.

Ayush Saxena há 5 anos atrás
pai
commit
936bf09c37

+ 16 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java

@@ -638,6 +638,22 @@ public class NetUtils {
     }
   }
 
+  /**
+   * Attempt to normalize the given string to "host:port"
+   * if it like "ip:port".
+   *
+   * @param ipPort maybe lik ip:port or host:port.
+   * @return host:port
+   */
+  public static String normalizeIP2HostName(String ipPort) {
+    if (null == ipPort || !ipPortPattern.matcher(ipPort).matches()) {
+      return ipPort;
+    }
+
+    InetSocketAddress address = createSocketAddr(ipPort);
+    return getHostPortString(address);
+  }
+
   /**
    * Return hostname without throwing exception.
    * The returned hostname String format is "hostname".

+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java

@@ -47,6 +47,7 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeat
 import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateNamenodeRegistrationRequest;
 import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
 import org.apache.hadoop.hdfs.server.federation.store.records.MembershipStats;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -263,7 +264,8 @@ public class MembershipNamenodeResolver
 
     MembershipState record = MembershipState.newInstance(
         routerId, report.getNameserviceId(), report.getNamenodeId(),
-        report.getClusterId(), report.getBlockPoolId(), report.getRpcAddress(),
+        report.getClusterId(), report.getBlockPoolId(),
+        NetUtils.normalizeIP2HostName(report.getRpcAddress()),
         report.getServiceAddress(), report.getLifelineAddress(),
         report.getWebScheme(), report.getWebAddress(), report.getState(),
         report.getSafemode());

+ 9 - 2
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java

@@ -138,8 +138,15 @@ public final class FederationTestUtils {
   public static NamenodeStatusReport createNamenodeReport(String ns, String nn,
       HAServiceState state) {
     Random rand = new Random();
-    NamenodeStatusReport report = new NamenodeStatusReport(ns, nn,
-        "localhost:" + rand.nextInt(10000), "localhost:" + rand.nextInt(10000),
+    return createNamenodeReport(ns, nn, "localhost:"
+        + rand.nextInt(10000), state);
+  }
+
+  public static NamenodeStatusReport createNamenodeReport(String ns, String nn,
+      String rpcAddress, HAServiceState state) {
+    Random rand = new Random();
+    NamenodeStatusReport report = new NamenodeStatusReport(ns, nn, rpcAddress,
+        "localhost:" + rand.nextInt(10000),
         "localhost:" + rand.nextInt(10000), "http",
         "testwebaddress-" + ns + nn);
     if (state == null) {

+ 17 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestNamenodeResolver.java

@@ -307,6 +307,23 @@ public class TestNamenodeResolver {
         FederationNamenodeServiceState.ACTIVE, namenode1.getState());
   }
 
+  @Test
+  public void testCacheUpdateOnNamenodeStateUpdateWithIp()
+      throws IOException {
+    final String rpcAddress = "127.0.0.1:10000";
+    assertTrue(namenodeResolver.registerNamenode(
+        createNamenodeReport(NAMESERVICES[0], NAMENODES[0], rpcAddress,
+            HAServiceState.STANDBY)));
+    stateStore.refreshCaches(true);
+
+    InetSocketAddress inetAddr = getInetSocketAddress(rpcAddress);
+    namenodeResolver.updateActiveNamenode(NAMESERVICES[0], inetAddr);
+    FederationNamenodeContext namenode =
+        namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0]).get(0);
+    assertEquals("The namenode state should be ACTIVE post update.",
+        FederationNamenodeServiceState.ACTIVE, namenode.getState());
+  }
+
   /**
    * Creates InetSocketAddress from the given RPC address.
    * @param rpcAddr RPC address (host:port).