|
@@ -94,6 +94,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
private final FileSystem.Statistics stats;
|
|
|
private int maxBlockAcquireFailures;
|
|
|
private boolean shortCircuitLocalReads;
|
|
|
+ private boolean connectToDnViaHostname;
|
|
|
|
|
|
/**
|
|
|
* We assume we're talking to another CDH server, which supports
|
|
@@ -147,10 +148,12 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
|
|
|
/** Create {@link ClientDatanodeProtocol} proxy with block/token */
|
|
|
static ClientDatanodeProtocol createClientDatanodeProtocolProxy (
|
|
|
- DatanodeID datanodeid, Configuration conf,
|
|
|
- Block block, Token<BlockTokenIdentifier> token, int socketTimeout) throws IOException {
|
|
|
- InetSocketAddress addr = NetUtils.makeSocketAddr(
|
|
|
- datanodeid.getHost(), datanodeid.getIpcPort());
|
|
|
+ DatanodeInfo di, Configuration conf,
|
|
|
+ Block block, Token<BlockTokenIdentifier> token, int socketTimeout,
|
|
|
+ boolean connectToDnViaHostname) throws IOException {
|
|
|
+ final String dnName = di.getNameWithIpcPort(connectToDnViaHostname);
|
|
|
+ LOG.debug("Connecting to " + dnName);
|
|
|
+ InetSocketAddress addr = NetUtils.createSocketAddr(dnName);
|
|
|
if (ClientDatanodeProtocol.LOG.isDebugEnabled()) {
|
|
|
ClientDatanodeProtocol.LOG.info("ClientDatanodeProtocol addr=" + addr);
|
|
|
}
|
|
@@ -164,10 +167,11 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
|
|
|
/** Create {@link ClientDatanodeProtocol} proxy using kerberos ticket */
|
|
|
static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
|
|
|
- DatanodeID datanodeid, Configuration conf, int socketTimeout)
|
|
|
- throws IOException {
|
|
|
- InetSocketAddress addr = NetUtils.createSocketAddr(
|
|
|
- datanodeid.getHost() + ":" + datanodeid.getIpcPort());
|
|
|
+ DatanodeInfo di, Configuration conf, int socketTimeout,
|
|
|
+ boolean connectToDnViaHostname) throws IOException {
|
|
|
+ final String dnName = di.getNameWithIpcPort(connectToDnViaHostname);
|
|
|
+ LOG.debug("Connecting to " + dnName);
|
|
|
+ InetSocketAddress addr = NetUtils.createSocketAddr(dnName);
|
|
|
if (ClientDatanodeProtocol.LOG.isDebugEnabled()) {
|
|
|
ClientDatanodeProtocol.LOG.info("ClientDatanodeProtocol addr=" + addr);
|
|
|
}
|
|
@@ -252,6 +256,12 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("Short circuit read is " + shortCircuitLocalReads);
|
|
|
}
|
|
|
+ this.connectToDnViaHostname = conf.getBoolean(
|
|
|
+ DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME,
|
|
|
+ DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Connect to datanode via hostname is " + connectToDnViaHostname);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
static int getMaxBlockAcquireFailures(Configuration conf) {
|
|
@@ -350,14 +360,14 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
/**
|
|
|
* Get {@link BlockReader} for short circuited local reads.
|
|
|
*/
|
|
|
- private static BlockReader getLocalBlockReader(Configuration conf,
|
|
|
+ private BlockReader getLocalBlockReader(Configuration conf,
|
|
|
String src, Block blk, Token<BlockTokenIdentifier> accessToken,
|
|
|
DatanodeInfo chosenNode, int socketTimeout, long offsetIntoBlock)
|
|
|
throws InvalidToken, IOException {
|
|
|
try {
|
|
|
return BlockReaderLocal.newBlockReader(conf, src, blk, accessToken,
|
|
|
chosenNode, socketTimeout, offsetIntoBlock, blk.getNumBytes()
|
|
|
- - offsetIntoBlock);
|
|
|
+ - offsetIntoBlock, connectToDnViaHostname);
|
|
|
} catch (RemoteException re) {
|
|
|
throw re.unwrapRemoteException(InvalidToken.class,
|
|
|
AccessControlException.class);
|
|
@@ -888,7 +898,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
*/
|
|
|
public MD5MD5CRC32FileChecksum getFileChecksum(String src) throws IOException {
|
|
|
checkOpen();
|
|
|
- return getFileChecksum(src, namenode, socketFactory, socketTimeout);
|
|
|
+ return getFileChecksum(src, namenode, socketFactory, socketTimeout, connectToDnViaHostname);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -899,6 +909,12 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
public static MD5MD5CRC32FileChecksum getFileChecksum(String src,
|
|
|
ClientProtocol namenode, SocketFactory socketFactory, int socketTimeout
|
|
|
) throws IOException {
|
|
|
+ return getFileChecksum(src, namenode, socketFactory, socketTimeout, false);
|
|
|
+ }
|
|
|
+
|
|
|
+ private static MD5MD5CRC32FileChecksum getFileChecksum(String src,
|
|
|
+ ClientProtocol namenode, SocketFactory socketFactory, int socketTimeout,
|
|
|
+ boolean connectToDnViaHostname) throws IOException {
|
|
|
//get all block locations
|
|
|
LocatedBlocks blockLocations = callGetBlockLocations(namenode, src, 0, Long.MAX_VALUE);
|
|
|
if (null == blockLocations) {
|
|
@@ -933,8 +949,10 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
for(int j = 0; !done && j < datanodes.length; j++) {
|
|
|
//connect to a datanode
|
|
|
final Socket sock = socketFactory.createSocket();
|
|
|
+ final String dnName = datanodes[j].getName(connectToDnViaHostname);
|
|
|
+ LOG.debug("Connecting to " + dnName);
|
|
|
NetUtils.connect(sock,
|
|
|
- NetUtils.createSocketAddr(datanodes[j].getName()),
|
|
|
+ NetUtils.createSocketAddr(dnName),
|
|
|
timeout);
|
|
|
sock.setSoTimeout(timeout);
|
|
|
|
|
@@ -946,7 +964,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
// get block MD5
|
|
|
try {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("write to " + datanodes[j].getName() + ": "
|
|
|
+ LOG.debug("write to " + dnName + ": "
|
|
|
+ DataTransferProtocol.OP_BLOCK_CHECKSUM +
|
|
|
", block=" + block);
|
|
|
}
|
|
@@ -964,7 +982,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("Got access token error in response to OP_BLOCK_CHECKSUM "
|
|
|
+ "for file " + src + " for block " + block
|
|
|
- + " from datanode " + datanodes[j].getName()
|
|
|
+ + " from datanode " + dnName
|
|
|
+ ". Will retry the block once.");
|
|
|
}
|
|
|
lastRetriedIndex = i;
|
|
@@ -974,7 +992,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
break;
|
|
|
} else {
|
|
|
throw new IOException("Bad response " + reply + " for block "
|
|
|
- + block + " from datanode " + datanodes[j].getName());
|
|
|
+ + block + " from datanode " + dnName);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -1005,12 +1023,10 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
LOG.debug("set bytesPerCRC=" + bytesPerCRC
|
|
|
+ ", crcPerBlock=" + crcPerBlock);
|
|
|
}
|
|
|
- LOG.debug("got reply from " + datanodes[j].getName()
|
|
|
- + ": md5=" + md5);
|
|
|
+ LOG.debug("got reply from " + dnName + ": md5=" + md5);
|
|
|
}
|
|
|
} catch (IOException ie) {
|
|
|
- LOG.warn("src=" + src + ", datanodes[" + j + "].getName()="
|
|
|
- + datanodes[j].getName(), ie);
|
|
|
+ LOG.warn("src=" + src + ", datanodes[" + j + "]=" + dnName, ie);
|
|
|
} finally {
|
|
|
IOUtils.closeStream(in);
|
|
|
IOUtils.closeStream(out);
|
|
@@ -1396,7 +1412,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /** Utility class to encapsulate data node info and its ip address. */
|
|
|
+ /** Utility class to encapsulate data node info and its address. */
|
|
|
private static class DNAddrPair {
|
|
|
DatanodeInfo info;
|
|
|
InetSocketAddress addr;
|
|
@@ -1880,7 +1896,8 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
DatanodeInfo primaryNode = last.getLocations()[0];
|
|
|
try {
|
|
|
primary = createClientDatanodeProtocolProxy(primaryNode, conf,
|
|
|
- last.getBlock(), last.getBlockToken(), socketTimeout);
|
|
|
+ last.getBlock(), last.getBlockToken(), socketTimeout,
|
|
|
+ connectToDnViaHostname);
|
|
|
Block newBlock = primary.getBlockInfo(last.getBlock());
|
|
|
long newBlockSize = newBlock.getNumBytes();
|
|
|
long delta = newBlockSize - last.getBlockSize();
|
|
@@ -2093,6 +2110,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
|
|
|
try {
|
|
|
s = socketFactory.createSocket();
|
|
|
+ LOG.debug("Connecting to " + targetAddr);
|
|
|
NetUtils.connect(s, targetAddr, socketTimeout);
|
|
|
s.setSoTimeout(socketTimeout);
|
|
|
blockReader = RemoteBlockReader.newBlockReader(s, src, blk.getBlockId(),
|
|
@@ -2260,8 +2278,8 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
DatanodeInfo[] nodes = block.getLocations();
|
|
|
try {
|
|
|
DatanodeInfo chosenNode = bestNode(nodes, deadNodes);
|
|
|
- InetSocketAddress targetAddr =
|
|
|
- NetUtils.createSocketAddr(chosenNode.getName());
|
|
|
+ InetSocketAddress targetAddr =
|
|
|
+ NetUtils.createSocketAddr(chosenNode.getName(connectToDnViaHostname));
|
|
|
return new DNAddrPair(chosenNode, targetAddr);
|
|
|
} catch (IOException ie) {
|
|
|
String blockInfo = block.getBlock() + " file=" + src;
|
|
@@ -2324,6 +2342,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
} else {
|
|
|
// go to the datanode
|
|
|
dn = socketFactory.createSocket();
|
|
|
+ LOG.debug("Connecting to " + targetAddr);
|
|
|
NetUtils.connect(dn, targetAddr, socketTimeout);
|
|
|
dn.setSoTimeout(socketTimeout);
|
|
|
reader = RemoteBlockReader.newBlockReader(dn, src,
|
|
@@ -3122,7 +3141,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
// to each DN and two rpcs to the NN.
|
|
|
int recoveryTimeout = (newnodes.length * 2 + 2) * socketTimeout;
|
|
|
primary = createClientDatanodeProtocolProxy(primaryNode, conf, block,
|
|
|
- accessToken, recoveryTimeout);
|
|
|
+ accessToken, recoveryTimeout, connectToDnViaHostname);
|
|
|
newBlock = primary.recoverBlock(block, isAppend, newnodes);
|
|
|
} catch (IOException e) {
|
|
|
LOG.warn("Failed recovery attempt #" + recoveryErrorCount +
|
|
@@ -3423,10 +3442,11 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
|
|
|
boolean result = false;
|
|
|
try {
|
|
|
- LOG.debug("Connecting to " + nodes[0].getName());
|
|
|
- InetSocketAddress target = NetUtils.createSocketAddr(nodes[0].getName());
|
|
|
+ final String dnName = nodes[0].getName(connectToDnViaHostname);
|
|
|
+ InetSocketAddress target = NetUtils.createSocketAddr(dnName);
|
|
|
s = socketFactory.createSocket();
|
|
|
timeoutValue = 3000 * nodes.length + socketTimeout;
|
|
|
+ LOG.debug("Connecting to " + dnName);
|
|
|
NetUtils.connect(s, target, timeoutValue);
|
|
|
s.setSoTimeout(timeoutValue);
|
|
|
s.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);
|