|
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.*;
|
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
import org.apache.hadoop.ipc.*;
|
|
|
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
|
|
+import org.apache.hadoop.net.DNS;
|
|
|
import org.apache.hadoop.net.NetUtils;
|
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
|
import org.apache.hadoop.conf.*;
|
|
@@ -60,6 +61,8 @@ import java.nio.ByteBuffer;
|
|
|
|
|
|
import javax.net.SocketFactory;
|
|
|
|
|
|
+import sun.net.util.IPAddressUtil;
|
|
|
+
|
|
|
/********************************************************
|
|
|
* DFSClient can connect to a Hadoop Filesystem and
|
|
|
* perform basic file tasks. It uses the ClientProtocol
|
|
@@ -95,6 +98,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
private int maxBlockAcquireFailures;
|
|
|
private boolean shortCircuitLocalReads;
|
|
|
private boolean connectToDnViaHostname;
|
|
|
+ private SocketAddress[] localInterfaceAddrs;
|
|
|
|
|
|
/**
|
|
|
* We assume we're talking to another CDH server, which supports
|
|
@@ -262,6 +266,17 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("Connect to datanode via hostname is " + connectToDnViaHostname);
|
|
|
}
|
|
|
+ String localInterfaces[] =
|
|
|
+ conf.getStrings(DFSConfigKeys.DFS_CLIENT_LOCAL_INTERFACES);
|
|
|
+ if (null == localInterfaces) {
|
|
|
+ localInterfaces = new String[0];
|
|
|
+ }
|
|
|
+ this.localInterfaceAddrs = getLocalInterfaceAddrs(localInterfaces);
|
|
|
+ if (LOG.isDebugEnabled() && 0 != localInterfaces.length) {
|
|
|
+ LOG.debug("Using local interfaces [" +
|
|
|
+ StringUtils.join(",",localInterfaces)+ "] with addresses [" +
|
|
|
+ StringUtils.join(",",localInterfaceAddrs) + "]");
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
static int getMaxBlockAcquireFailures(Configuration conf) {
|
|
@@ -890,6 +905,60 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Return the socket addresses to use with each configured
|
|
|
+ * local interface. Local interfaces may be specified by IP
|
|
|
+ * address, IP address range using CIDR notation, interface
|
|
|
+ * name (e.g. eth0) or sub-interface name (e.g. eth0:0).
|
|
|
+ * The socket addresses consist of the IPs for the interfaces
|
|
|
+ * and the ephemeral port (port 0). If an IP, IP range, or
|
|
|
+ * interface name matches an interface with sub-interfaces
|
|
|
+ * only the IP of the interface is used. Sub-interfaces can
|
|
|
+ * be used by specifying them explicitly (by IP or name).
|
|
|
+ *
|
|
|
+ * @return SocketAddresses for the configured local interfaces,
|
|
|
+ * or an empty array if none are configured
|
|
|
+ * @throws UnknownHostException if a given interface name is invalid
|
|
|
+ */
|
|
|
+ private static SocketAddress[] getLocalInterfaceAddrs(
|
|
|
+ String interfaceNames[]) throws UnknownHostException {
|
|
|
+ List<SocketAddress> localAddrs = new ArrayList<SocketAddress>();
|
|
|
+ for (String interfaceName : interfaceNames) {
|
|
|
+ if (IPAddressUtil.isIPv4LiteralAddress(interfaceName)) {
|
|
|
+ localAddrs.add(new InetSocketAddress(interfaceName, 0));
|
|
|
+ } else if (NetUtils.isValidSubnet(interfaceName)) {
|
|
|
+ for (InetAddress addr : NetUtils.getIPs(interfaceName, false)) {
|
|
|
+ localAddrs.add(new InetSocketAddress(addr, 0));
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ for (String ip : DNS.getIPs(interfaceName, false)) {
|
|
|
+ localAddrs.add(new InetSocketAddress(ip, 0));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return localAddrs.toArray(new SocketAddress[localAddrs.size()]);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Select one of the configured local interfaces at random. We use a random
|
|
|
+ * interface because other policies like round-robin are less effective
|
|
|
+ * given that we cache connections to datanodes.
|
|
|
+ *
|
|
|
+ * @return one of the local interface addresses at random, or null if no
|
|
|
+ * local interfaces are configured
|
|
|
+ */
|
|
|
+ private SocketAddress getRandomLocalInterfaceAddr() {
|
|
|
+ if (localInterfaceAddrs.length == 0) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ final int idx = r.nextInt(localInterfaceAddrs.length);
|
|
|
+ final SocketAddress addr = localInterfaceAddrs[idx];
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Using local interface " + addr);
|
|
|
+ }
|
|
|
+ return addr;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Get the checksum of a file.
|
|
|
* @param src The file path
|
|
@@ -2111,7 +2180,8 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
try {
|
|
|
s = socketFactory.createSocket();
|
|
|
LOG.debug("Connecting to " + targetAddr);
|
|
|
- NetUtils.connect(s, targetAddr, socketTimeout);
|
|
|
+ NetUtils.connect(s, targetAddr, getRandomLocalInterfaceAddr(),
|
|
|
+ socketTimeout);
|
|
|
s.setSoTimeout(socketTimeout);
|
|
|
blockReader = RemoteBlockReader.newBlockReader(s, src, blk.getBlockId(),
|
|
|
accessToken,
|
|
@@ -2343,7 +2413,8 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
// go to the datanode
|
|
|
dn = socketFactory.createSocket();
|
|
|
LOG.debug("Connecting to " + targetAddr);
|
|
|
- NetUtils.connect(dn, targetAddr, socketTimeout);
|
|
|
+ NetUtils.connect(dn, targetAddr, getRandomLocalInterfaceAddr(),
|
|
|
+ socketTimeout);
|
|
|
dn.setSoTimeout(socketTimeout);
|
|
|
reader = RemoteBlockReader.newBlockReader(dn, src,
|
|
|
block.getBlock().getBlockId(), accessToken,
|
|
@@ -3447,7 +3518,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
s = socketFactory.createSocket();
|
|
|
timeoutValue = 3000 * nodes.length + socketTimeout;
|
|
|
LOG.debug("Connecting to " + dnName);
|
|
|
- NetUtils.connect(s, target, timeoutValue);
|
|
|
+ NetUtils.connect(s, target, getRandomLocalInterfaceAddr(), timeoutValue);
|
|
|
s.setSoTimeout(timeoutValue);
|
|
|
s.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);
|
|
|
LOG.debug("Send buf size " + s.getSendBufferSize());
|