Sfoglia il codice sorgente

HADOOP-12491. Hadoop-common - Avoid unsafe split and append on fields that might be IPv6 literals. Contributed by Nemanja Matkovic And Hemanth Boyina

Brahma Reddy Battula 3 anni fa
parent
commit
809cca765f

+ 1 - 2
hadoop-common-project/hadoop-common/src/main/conf/hadoop-env.sh

@@ -85,8 +85,7 @@
 # Kerberos security.
 # export HADOOP_JAAS_DEBUG=true
 
-# Extra Java runtime options for all Hadoop commands. We don't support
-# IPv6 yet/still, so by default the preference is set to IPv4.
+# Extra Java runtime options for all Hadoop commands.
 # export HADOOP_OPTS="-Djava.net.preferIPv4Stack=true"
 # For Kerberos debugging, an extended option set logs more information
 # export HADOOP_OPTS="-Djava.net.preferIPv4Stack=true -Dsun.security.krb5.debug=true -Dsun.security.spnego.debug"

+ 1 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/Configuration.java

@@ -2562,7 +2562,7 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
       return updateConnectAddr(addressProperty, addr);
     }
 
-    final String connectHost = connectHostPort.split(":")[0];
+    final String connectHost = NetUtils.getHostFromHostPort(connectHostPort);
     // Create connect address using client address hostname and server port.
     return updateConnectAddr(addressProperty, NetUtils.createSocketAddrForHost(
         connectHost, addr.getPort()));

+ 9 - 4
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java

@@ -82,6 +82,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
+import org.apache.hadoop.thirdparty.com.google.common.net.HostAndPort;
 
 import static org.apache.hadoop.util.KMSUtil.checkNotEmpty;
 import static org.apache.hadoop.util.KMSUtil.checkNotNull;
@@ -290,16 +291,20 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
         // In the current scheme, all hosts have to run on the same port
         int port = -1;
         String hostsPart = authority;
+
         if (authority.contains(":")) {
-          String[] t = authority.split(":");
           try {
-            port = Integer.parseInt(t[1]);
-          } catch (Exception e) {
+            HostAndPort hp = HostAndPort.fromString(hostsPart);
+            if (hp.hasPort()) {
+              port = hp.getPort();
+              hostsPart = hp.getHost();
+            }
+          } catch (IllegalArgumentException e) {
             throw new IOException(
                 "Could not parse port in kms uri [" + origUrl + "]");
           }
-          hostsPart = t[0];
         }
+
         KMSClientProvider[] providers =
             createProviders(conf, origUrl, port, hostsPart);
         return new LoadBalancingKMSClientProvider(providerUri, providers, conf);

+ 9 - 7
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java

@@ -499,10 +499,11 @@ public class Client implements AutoCloseable {
       boolean trySasl = UserGroupInformation.isSecurityEnabled() ||
                         (ticket != null && !ticket.getTokens().isEmpty());
       this.authProtocol = trySasl ? AuthProtocol.SASL : AuthProtocol.NONE;
-      
-      this.setName("IPC Client (" + socketFactory.hashCode() +") connection to " +
-          server.toString() +
-          " from " + ((ticket==null)?"an unknown user":ticket.getUserName()));
+
+      this.setName(
+          "IPC Client (" + socketFactory.hashCode() + ") connection to "
+              + NetUtils.getSocketAddressString(server) + " from " + ((ticket
+              == null) ? "an unknown user" : ticket.getUserName()));
       this.setDaemon(true);
     }
 
@@ -636,8 +637,9 @@ public class Client implements AutoCloseable {
                                server.getHostName(), server.getPort());
 
       if (!server.equals(currentAddr)) {
-        LOG.warn("Address change detected. Old: " + server.toString() +
-                                 " New: " + currentAddr.toString());
+        LOG.warn("Address change detected. Old: " + NetUtils
+            .getSocketAddressString(server) + " New: " + NetUtils
+            .getSocketAddressString(currentAddr));
         server = currentAddr;
         UserGroupInformation ticket = remoteId.getTicket();
         this.setName("IPC Client (" + socketFactory.hashCode()
@@ -1835,7 +1837,7 @@ public class Client implements AutoCloseable {
     
     @Override
     public String toString() {
-      return address.toString();
+      return NetUtils.getSocketAddressString(address);
     }
   }  
 

+ 53 - 16
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/DNS.java

@@ -26,6 +26,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.InetAddress;
+import java.net.Inet6Address;
 import java.net.NetworkInterface;
 import java.net.SocketException;
 import java.net.UnknownHostException;
@@ -74,22 +75,23 @@ public class DNS {
    * @throws NamingException If a NamingException is encountered
    */
   public static String reverseDns(InetAddress hostIp, @Nullable String ns)
-    throws NamingException {
-    //
-    // Builds the reverse IP lookup form
-    // This is formed by reversing the IP numbers and appending in-addr.arpa
-    //
-    String[] parts = hostIp.getHostAddress().split("\\.");
-    String reverseIP = parts[3] + "." + parts[2] + "." + parts[1] + "."
-      + parts[0] + ".in-addr.arpa";
+      throws NamingException {
+    String dnsQueryAddress;
+    if (hostIp instanceof Inet6Address) {
+      dnsQueryAddress = getIPv6DnsAddr((Inet6Address) hostIp, ns);
+    } else {
+      dnsQueryAddress = getIPv4DnsAddr(hostIp, ns);
+    }
+    LOG.info("Querying using DNS address: " + dnsQueryAddress);
 
     DirContext ictx = new InitialDirContext();
     Attributes attribute;
     try {
-      attribute = ictx.getAttributes("dns://"               // Use "dns:///" if the default
-                         + ((ns == null) ? "" : ns) +
-                         // nameserver is to be used
-                         "/" + reverseIP, new String[] { "PTR" });
+      // Use "dns:///" if the default
+      // nameserver is to be used
+      attribute = ictx.getAttributes(
+          "dns://" + ((ns == null) ? "" : ns) + "/" + dnsQueryAddress,
+          new String[] {"PTR"});
     } finally {
       ictx.close();
     }
@@ -102,18 +104,53 @@ public class DNS {
     return hostname;
   }
 
+  private static String getIPv4DnsAddr(InetAddress hostIp, @Nullable String ns)
+      throws NamingException {
+    String ipString = hostIp.getHostAddress();
+    LOG.info("Doing reverse DNS lookup for IPv4 address: " + ipString);
+    String[] parts = ipString.split("\\.");
+    if (parts.length != 4) {
+      throw new NamingException("Invalid IPv4 address " + ipString);
+    }
+
+    return parts[3] + "." + parts[2] + "." + parts[1] + "." + parts[0]
+        + ".in-addr.arpa";
+  }
+
+  @VisibleForTesting
+  public static String getIPv6DnsAddr(Inet6Address hostIp, @Nullable String ns)
+      throws NamingException {
+    LOG.info("Doing reverse DNS lookup for IPv6 address: " +
+        hostIp.getHostAddress());
+
+    // bytes need to be converted to hex string and reversed to get IPv6
+    // reverse resolution address
+    byte[] bytes = hostIp.getAddress();
+    StringBuilder sb = new StringBuilder();
+    for(int pos = bytes.length - 1; pos >= 0; pos--) {
+      byte b = bytes[pos];
+      String hexStr = String.format("%02x", b);
+      sb.append(hexStr.charAt(1));
+      sb.append(".");
+      sb.append(hexStr.charAt(0));
+      sb.append(".");
+    }
+    sb.append("ip6.arpa");
+    return sb.toString();
+  }
+
   /**
    * @return NetworkInterface for the given subinterface name (eg eth0:0)
    *    or null if no interface with the given name can be found  
    */
   private static NetworkInterface getSubinterface(String strInterface)
       throws SocketException {
-    Enumeration<NetworkInterface> nifs = 
-      NetworkInterface.getNetworkInterfaces();
+    Enumeration<NetworkInterface> nifs =
+        NetworkInterface.getNetworkInterfaces();
       
     while (nifs.hasMoreElements()) {
-      Enumeration<NetworkInterface> subNifs = 
-        nifs.nextElement().getSubInterfaces();
+      Enumeration<NetworkInterface> subNifs =
+          nifs.nextElement().getSubInterfaces();
 
       while (subNifs.hasMoreElements()) {
         NetworkInterface nif = subNifs.nextElement();

+ 2 - 2
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java

@@ -60,8 +60,8 @@ import org.apache.hadoop.ipc.VersionedProtocol;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.util.ReflectionUtils;
 
-import com.google.common.net.HostAndPort;
-import com.google.common.net.InetAddresses;
+import org.apache.hadoop.thirdparty.com.google.common.net.HostAndPort;
+import org.apache.hadoop.thirdparty.com.google.common.net.InetAddresses;
 import org.apache.http.conn.util.InetAddressUtils;
 import java.net.*;
 

+ 11 - 7
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocksSocketFactory.java

@@ -30,6 +30,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.thirdparty.com.google.common.net.HostAndPort;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SOCKS_SERVER_KEY;
 
@@ -148,13 +149,16 @@ public class SocksSocketFactory extends SocketFactory implements
    * @param proxyStr the proxy address using the format "host:port"
    */
   private void setProxy(String proxyStr) {
-    String[] strs = proxyStr.split(":", 2);
-    if (strs.length != 2)
+    try {
+      HostAndPort hp = HostAndPort.fromString(proxyStr);
+      if (!hp.hasPort()) {
+        throw new RuntimeException("Bad SOCKS proxy parameter: " + proxyStr);
+      }
+      String host = hp.getHost();
+      this.proxy = new Proxy(Proxy.Type.SOCKS,
+          InetSocketAddress.createUnresolved(host, hp.getPort()));
+    } catch (IllegalArgumentException e) {
       throw new RuntimeException("Bad SOCKS proxy parameter: " + proxyStr);
-    String host = strs[0];
-    int port = Integer.parseInt(strs[1]);
-    this.proxy =
-        new Proxy(Proxy.Type.SOCKS, InetSocketAddress.createUnresolved(host,
-            port));
+    }
   }
 }

+ 30 - 46
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java

@@ -31,6 +31,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.ServerSocketUtil;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Time;
@@ -330,56 +331,39 @@ public abstract class ClientBaseWithFixes extends ZKTestCase {
         return tmpDir;
     }
 
-    private static int getPort(String hostPort) {
-        String[] split = hostPort.split(":");
-        String portstr = split[split.length-1];
-        String[] pc = portstr.split("/");
-        if (pc.length > 1) {
-            portstr = pc[0];
-        }
-        return Integer.parseInt(portstr);
-    }
-
-    static ServerCnxnFactory createNewServerInstance(File dataDir,
-            ServerCnxnFactory factory, String hostPort, int maxCnxns)
-        throws IOException, InterruptedException
-    {
-        ZooKeeperServer zks = new ZooKeeperServer(dataDir, dataDir, 3000);
-        final int PORT = getPort(hostPort);
-        if (factory == null) {
-            factory = ServerCnxnFactory.createFactory(PORT, maxCnxns);
-        }
-        factory.startup(zks);
-        Assert.assertTrue("waiting for server up",
-                   ClientBaseWithFixes.waitForServerUp("127.0.0.1:" + PORT,
-                                              CONNECTION_TIMEOUT));
-
-        return factory;
+  static ServerCnxnFactory createNewServerInstance(File dataDir,
+      ServerCnxnFactory factory, String hostPort, int maxCnxns)
+      throws IOException, InterruptedException {
+    ZooKeeperServer zks = new ZooKeeperServer(dataDir, dataDir, 3000);
+    final int port = NetUtils.getPortFromHostPort(hostPort);
+    if (factory == null) {
+      factory = ServerCnxnFactory.createFactory(port, maxCnxns);
     }
+    factory.startup(zks);
+    Assert.assertTrue("waiting for server up", ClientBaseWithFixes
+        .waitForServerUp("127.0.0.1:" + port, CONNECTION_TIMEOUT));
 
-    static void shutdownServerInstance(ServerCnxnFactory factory,
-            String hostPort)
-    {
-        if (factory != null) {
-            ZKDatabase zkDb;
-            {
-                ZooKeeperServer zs = getServer(factory);
-        
-                zkDb = zs.getZKDatabase();
-            }
-            factory.shutdown();
-            try {
-                zkDb.close();
-            } catch (IOException ie) {
-                LOG.warn("Error closing logs ", ie);
-            }
-            final int PORT = getPort(hostPort);
+    return factory;
+  }
 
-            Assert.assertTrue("waiting for server down",
-                       ClientBaseWithFixes.waitForServerDown("127.0.0.1:" + PORT,
-                                                    CONNECTION_TIMEOUT));
-        }
+  static void shutdownServerInstance(ServerCnxnFactory factory,
+      String hostPort) {
+    if (factory != null) {
+      ZKDatabase zkDb;
+      ZooKeeperServer zs = getServer(factory);
+      zkDb = zs.getZKDatabase();
+      factory.shutdown();
+      try {
+        zkDb.close();
+      } catch (IOException ie) {
+        LOG.warn("Error closing logs ", ie);
+      }
+      final int port = NetUtils.getPortFromHostPort(hostPort);
+
+      Assert.assertTrue("waiting for server down", ClientBaseWithFixes
+          .waitForServerDown("127.0.0.1:" + port, CONNECTION_TIMEOUT));
     }
+  }
 
     /**
      * Test specific setup

+ 17 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestDNS.java

@@ -24,6 +24,7 @@ import java.net.NetworkInterface;
 import java.net.SocketException;
 import java.net.UnknownHostException;
 import java.net.InetAddress;
+import java.net.Inet6Address;
 
 import javax.naming.CommunicationException;
 import javax.naming.NameNotFoundException;
@@ -251,4 +252,20 @@ public class TestDNS {
     assertNotNull("localhost is null", localhost);
     LOG.info("Localhost IPAddr is " + localhost.toString());
   }
+
+  /**
+   * Test that dns query address is calculated correctly for ipv6 addresses.
+   */
+  @Test
+  public void testIPv6ReverseDNSAddress() throws Exception {
+    Inet6Address adr = (Inet6Address) InetAddress.getByName("::");
+    assertEquals(
+        "0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.ip6.arpa",
+        DNS.getIPv6DnsAddr(adr, null));
+
+    adr = (Inet6Address) InetAddress.getByName("fe80::62eb:69ff:fe9b:bade");
+    assertEquals(
+        "e.d.a.b.b.9.e.f.f.f.9.6.b.e.2.6.0.0.0.0.0.0.0.0.0.0.0.0.0.8.e.f.ip6.arpa",
+        DNS.getIPv6DnsAddr(adr, null));
+  }
 }

+ 28 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestNetUtils.java

@@ -57,6 +57,14 @@ public class TestNetUtils {
   private static final String DEST_PORT_NAME = Integer.toString(DEST_PORT);
   private static final int LOCAL_PORT = 8080;
   private static final String LOCAL_PORT_NAME = Integer.toString(LOCAL_PORT);
+  private static final String IPV6_LOOPBACK_LONG_STRING = "0:0:0:0:0:0:0:1";
+  private static final String IPV6_SAMPLE_ADDRESS =
+      "2a03:2880:2130:cf05:face:b00c:0:1";
+  private static final String IPV6_LOOPBACK_SHORT_STRING = "::1";
+  private static final String IPV6_LOOPBACK_WITH_PORT =
+      "[" + IPV6_LOOPBACK_LONG_STRING + "]:10";
+  private static final String IPV6_SAMPLE_WITH_PORT =
+      "[" + IPV6_SAMPLE_ADDRESS + "]:10";
 
   /**
    * Some slop around expected times when making sure timeouts behave
@@ -754,6 +762,15 @@ public class TestNetUtils {
     assertEquals("localhost", NetUtils.getHostNameOfIP("127.0.0.1:"));
   }
 
+  @Test
+  public void testGetHostNameOfIPworksWithIPv6() {
+    assertNotNull(NetUtils.getHostNameOfIP(IPV6_LOOPBACK_LONG_STRING));
+    assertNotNull(NetUtils.getHostNameOfIP(IPV6_LOOPBACK_SHORT_STRING));
+    assertNotNull(NetUtils.getHostNameOfIP(IPV6_SAMPLE_ADDRESS));
+    assertNotNull(NetUtils.getHostNameOfIP(IPV6_SAMPLE_WITH_PORT));
+    assertNotNull(NetUtils.getHostNameOfIP(IPV6_LOOPBACK_WITH_PORT));
+  }
+
   @Test
   public void testTrimCreateSocketAddress() {
     Configuration conf = new Configuration();
@@ -765,6 +782,17 @@ public class TestNetUtils {
     assertEquals(defaultAddr.trim(), NetUtils.getHostPortString(addr));
   }
 
+  @Test
+  public void testTrimCreateSocketAddressIPv6() {
+    Configuration conf = new Configuration();
+    NetUtils.addStaticResolution("hostIPv6", IPV6_LOOPBACK_LONG_STRING);
+    final String defaultAddr = "hostIPv6:1  ";
+
+    InetSocketAddress addr = NetUtils.createSocketAddr(defaultAddr);
+    conf.setSocketAddr("myAddress", addr);
+    assertEquals(defaultAddr.trim(), NetUtils.getHostPortString(addr));
+  }
+
   @Test
   public void testBindToLocalAddress() throws Exception {
     assertNotNull(NetUtils