|
@@ -93,6 +93,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
final int writePacketSize;
|
|
|
private final FileSystem.Statistics stats;
|
|
|
private int maxBlockAcquireFailures;
|
|
|
+ /**
|
|
|
+ * Whether reads should first be attempted directly against local block
|
|
|
+ * files. Initialised from configuration in the constructor and switched off
|
|
|
+ * when a short circuit attempt fails with an AccessControlException.
|
|
|
+ */
|
|
|
+ private boolean shortCircuitLocalReads;
|
|
|
|
|
|
/**
|
|
|
* We assume we're talking to another CDH server, which supports
|
|
@@ -144,6 +145,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
rpcNamenode, methodNameToPolicyMap);
|
|
|
}
|
|
|
|
|
|
+ /** Create {@link ClientDatanodeProtocol} proxy with block/token */
|
|
|
static ClientDatanodeProtocol createClientDatanodeProtocolProxy (
|
|
|
DatanodeID datanodeid, Configuration conf,
|
|
|
Block block, Token<BlockTokenIdentifier> token, int socketTimeout) throws IOException {
|
|
@@ -160,6 +162,20 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
.getDefaultSocketFactory(conf), socketTimeout);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Create {@link ClientDatanodeProtocol} proxy using kerberos ticket.
|
|
|
+ *
|
|
|
+ * @param datanodeid datanode whose host and IPC port form the proxy address
|
|
|
+ * @param conf configuration used for the proxy and its default socket factory
|
|
|
+ * @param socketTimeout timeout handed to {@code RPC.getProxy}
|
|
|
+ * @return proxy for talking to the datanode
|
|
|
+ * @throws IOException if creating the proxy fails
|
|
|
+ */
|
|
|
+ static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
|
|
|
+ DatanodeID datanodeid, Configuration conf, int socketTimeout)
|
|
|
+ throws IOException {
|
|
|
+ InetSocketAddress addr = NetUtils.createSocketAddr(
|
|
|
+ datanodeid.getHost() + ":" + datanodeid.getIpcPort());
|
|
|
+ if (ClientDatanodeProtocol.LOG.isDebugEnabled()) {
|
|
|
+ // Log at the level the guard checks; info here would bypass the debug guard.
|
|
|
+ ClientDatanodeProtocol.LOG.debug("ClientDatanodeProtocol addr=" + addr);
|
|
|
+ }
|
|
|
+ return (ClientDatanodeProtocol)RPC.getProxy(ClientDatanodeProtocol.class,
|
|
|
+ ClientDatanodeProtocol.versionID, addr, conf, NetUtils
|
|
|
+ .getDefaultSocketFactory(conf), socketTimeout);
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Same as this(NameNode.getAddress(conf), conf);
|
|
|
* @see #DFSClient(InetSocketAddress, Configuration)
|
|
@@ -206,7 +222,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
// dfs.write.packet.size is an internal config variable
|
|
|
this.writePacketSize = conf.getInt("dfs.write.packet.size", 64*1024);
|
|
|
this.maxBlockAcquireFailures = getMaxBlockAcquireFailures(conf);
|
|
|
-
|
|
|
+
|
|
|
ugi = UserGroupInformation.getCurrentUser();
|
|
|
|
|
|
String taskId = conf.get("mapred.task.id");
|
|
@@ -229,6 +245,13 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
"Expecting exactly one of nameNodeAddr and rpcNamenode being null: "
|
|
|
+ "nameNodeAddr=" + nameNodeAddr + ", rpcNamenode=" + rpcNamenode);
|
|
|
}
|
|
|
+ // read directly from the block file if configured.
|
|
|
+ this.shortCircuitLocalReads = conf.getBoolean(
|
|
|
+ DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
|
|
|
+ DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT);
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Short circuit read is " + shortCircuitLocalReads);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
static int getMaxBlockAcquireFailures(Configuration conf) {
|
|
@@ -324,6 +347,82 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Get {@link BlockReader} for short circuited local reads.
|
|
|
+ * <p>
|
|
|
+ * The reader is requested from {@code offsetIntoBlock} up to the end of the
|
|
|
+ * block, i.e. for {@code blk.getNumBytes() - offsetIntoBlock} bytes.
|
|
|
+ * A {@link RemoteException} is passed through
|
|
|
+ * {@code unwrapRemoteException(InvalidToken.class, AccessControlException.class)}
|
|
|
+ * and the result is rethrown.
|
|
|
+ */
|
|
|
+ private static BlockReader getLocalBlockReader(Configuration conf,
|
|
|
+ String src, Block blk, Token<BlockTokenIdentifier> accessToken,
|
|
|
+ DatanodeInfo chosenNode, int socketTimeout, long offsetIntoBlock)
|
|
|
+ throws InvalidToken, IOException {
|
|
|
+ try {
|
|
|
+ return BlockReaderLocal.newBlockReader(conf, src, blk, accessToken,
|
|
|
+ chosenNode, socketTimeout, offsetIntoBlock, blk.getNumBytes()
|
|
|
+ - offsetIntoBlock);
|
|
|
+ } catch (RemoteException re) {
|
|
|
+ throw re.unwrapRemoteException(InvalidToken.class,
|
|
|
+ AccessControlException.class);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Host addresses that isLocalAddress has already found to be local;
|
|
|
+ // shared by all clients and wrapped in a synchronized set.
|
|
|
+ private static Set<String> localIpAddresses = Collections
|
|
|
+ .synchronizedSet(new HashSet<String>());
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Check whether the given socket address refers to this machine.
|
|
|
+ * Addresses found to be local are remembered in {@link #localIpAddresses}
|
|
|
+ * so that later calls can skip the interface lookup.
|
|
|
+ *
|
|
|
+ * @param targetAddr address to check
|
|
|
+ * @return true if the address is a wildcard or loopback address or is
|
|
|
+ * defined on a local network interface; false otherwise, including
|
|
|
+ * when the address is unresolved
|
|
|
+ */
|
|
|
+ private static boolean isLocalAddress(InetSocketAddress targetAddr) {
|
|
|
+ InetAddress addr = targetAddr.getAddress();
|
|
|
+ if (addr == null) {
|
|
|
+ // getAddress() is null for an unresolved address; treat it as not
|
|
|
+ // local instead of failing with a NullPointerException below.
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ String hostAddress = addr.getHostAddress();
|
|
|
+ if (localIpAddresses.contains(hostAddress)) {
|
|
|
+ if (LOG.isTraceEnabled()) {
|
|
|
+ LOG.trace("Address " + targetAddr + " is local");
|
|
|
+ }
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ // Check if the address is any local or loop back
|
|
|
+ boolean local = addr.isAnyLocalAddress() || addr.isLoopbackAddress();
|
|
|
+
|
|
|
+ // Check if the address is defined on any interface
|
|
|
+ if (!local) {
|
|
|
+ try {
|
|
|
+ local = NetworkInterface.getByInetAddress(addr) != null;
|
|
|
+ } catch (SocketException e) {
|
|
|
+ local = false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (LOG.isTraceEnabled()) {
|
|
|
+ // Report the actual result; "is local" used to be logged unconditionally.
|
|
|
+ LOG.trace("Address " + targetAddr
|
|
|
+ + (local ? " is local" : " is not local"));
|
|
|
+ }
|
|
|
+ if (local) {
|
|
|
+ localIpAddresses.add(hostAddress);
|
|
|
+ }
|
|
|
+ return local;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Should the block access token be refetched on an exception.
|
|
|
+ * Logs at info level when the answer is yes.
|
|
|
+ *
|
|
|
+ * @param ex Exception received
|
|
|
+ * @param targetAddr Target datanode address from where exception was received
|
|
|
+ * @return true if the block access token has expired or is invalid and it
|
|
|
+ * should be refetched
|
|
|
+ */
|
|
|
+ private static boolean tokenRefetchNeeded(IOException ex,
|
|
|
+ InetSocketAddress targetAddr) {
|
|
|
+ /*
|
|
|
+ * Get a new access token and retry. Retry is needed in 2 cases. 1) When
|
|
|
+ * both NN and DN re-started while DFSClient holding a cached access token.
|
|
|
+ * 2) In the case that NN fails to update its access key at pre-set interval
|
|
|
+ * (by a wide margin) and subsequently restarts. In this case, DN
|
|
|
+ * re-registers itself with NN and receives a new access key, but DN will
|
|
|
+ * delete the old access key from its memory since it's considered expired
|
|
|
+ * based on the estimated expiration date.
|
|
|
+ */
|
|
|
+ if (ex instanceof InvalidBlockTokenException || ex instanceof InvalidToken) {
|
|
|
+ LOG.info("Access token was invalid when connecting to " + targetAddr
|
|
|
+ + " : " + ex);
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Cancel a delegation token
|
|
|
* @param token the token to cancel
|
|
@@ -1312,16 +1411,16 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
|
|
|
private Socket dnSock; //for now just sending checksumOk.
|
|
|
private DataInputStream in;
|
|
|
- private DataChecksum checksum;
|
|
|
- private long lastChunkOffset = -1;
|
|
|
- private long lastChunkLen = -1;
|
|
|
+ protected DataChecksum checksum;
|
|
|
+ protected long lastChunkOffset = -1;
|
|
|
+ protected long lastChunkLen = -1;
|
|
|
private long lastSeqNo = -1;
|
|
|
|
|
|
- private long startOffset;
|
|
|
- private long firstChunkOffset;
|
|
|
- private int bytesPerChecksum;
|
|
|
- private int checksumSize;
|
|
|
- private boolean gotEOS = false;
|
|
|
+ protected long startOffset;
|
|
|
+ protected long firstChunkOffset;
|
|
|
+ protected int bytesPerChecksum;
|
|
|
+ protected int checksumSize;
|
|
|
+ protected boolean gotEOS = false;
|
|
|
|
|
|
byte[] skipBuf = null;
|
|
|
ByteBuffer checksumBytes = null;
|
|
@@ -1358,7 +1457,8 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
int nRead = super.read(buf, off, len);
|
|
|
|
|
|
// if gotEOS was set in the previous read and checksum is enabled :
|
|
|
- if (gotEOS && !eosBefore && nRead >= 0 && needChecksum()) {
|
|
|
+ if (dnSock != null && gotEOS && !eosBefore && nRead >= 0
|
|
|
+ && needChecksum()) {
|
|
|
//checksum is verified and there are no errors.
|
|
|
checksumOk(dnSock);
|
|
|
}
|
|
@@ -1536,14 +1636,44 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
checksumSize = this.checksum.getChecksumSize();
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Package-private constructor that only forwards {@code file} and
|
|
|
+ * {@code numRetries} to the superclass constructor.
|
|
|
+ */
|
|
|
+ BlockReader(Path file, int numRetries) {
|
|
|
+ super(file, numRetries);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Constructor for subclasses that supply their own checksum.
|
|
|
+ * The superclass receives {@code checksum} itself only when its checksum
|
|
|
+ * size is positive, and {@code null} otherwise; bytes-per-checksum and
|
|
|
+ * checksum size are always read from {@code checksum}, so it must not be
|
|
|
+ * null.
|
|
|
+ */
|
|
|
+ protected BlockReader(Path file, int numRetries, DataChecksum checksum,
|
|
|
+ boolean verifyChecksum) {
|
|
|
+ super(file,
|
|
|
+ numRetries,
|
|
|
+ verifyChecksum,
|
|
|
+ checksum.getChecksumSize() > 0? checksum : null,
|
|
|
+ checksum.getBytesPerChecksum(),
|
|
|
+ checksum.getChecksumSize());
|
|
|
+ }
|
|
|
+
|
|
|
public static BlockReader newBlockReader(Socket sock, String file, long blockId, Token<BlockTokenIdentifier> accessToken,
|
|
|
long genStamp, long startOffset, long len, int bufferSize) throws IOException {
|
|
|
return newBlockReader(sock, file, blockId, accessToken, genStamp, startOffset, len, bufferSize,
|
|
|
true);
|
|
|
}
|
|
|
|
|
|
- /** Java Doc required */
|
|
|
- public static BlockReader newBlockReader( Socket sock, String file, long blockId,
|
|
|
+ /**
|
|
|
+ * Creates a new {@link BlockReader} for the given blockId.
|
|
|
+ * @param sock Socket to read the block.
|
|
|
+ * @param file File to which this block belongs.
|
|
|
+ * @param blockId Block id.
|
|
|
+ * @param accessToken Block access token.
|
|
|
+ * @param genStamp Generation stamp of the block.
|
|
|
+ * @param startOffset Start offset for the data.
|
|
|
+ * @param len Length to be read.
|
|
|
+ * @param bufferSize Buffer size to use.
|
|
|
+ * @param verifyChecksum Checksum verification is required or not.
|
|
|
+ * @return BlockReader object.
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ public static BlockReader newBlockReader(Socket sock, String file, long blockId,
|
|
|
Token<BlockTokenIdentifier> accessToken,
|
|
|
long genStamp,
|
|
|
long startOffset, long len,
|
|
@@ -1887,6 +2017,14 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
return blockRange;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Decide whether a read from the given datanode should first be attempted
|
|
|
+ * directly against the local block file.
|
|
|
+ *
|
|
|
+ * @param targetAddr address of the chosen datanode
|
|
|
+ * @return true only when short circuit reads are enabled and
|
|
|
+ * {@code targetAddr} is a local address
|
|
|
+ */
|
|
|
+ private boolean shouldTryShortCircuitRead(InetSocketAddress targetAddr)
|
|
|
+ throws IOException {
|
|
|
+ return shortCircuitLocalReads && isLocalAddress(targetAddr);
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Open a DataInputStream to a DataNode so that it can be read from.
|
|
|
* We get block ID and the IDs of the destinations at startup, from the namenode.
|
|
@@ -1923,13 +2061,37 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
chosenNode = retval.info;
|
|
|
InetSocketAddress targetAddr = retval.addr;
|
|
|
|
|
|
+ // try reading the block locally. if this fails, then go via
|
|
|
+ // the datanode
|
|
|
+ Block blk = targetBlock.getBlock();
|
|
|
+ Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
|
|
|
+ if (shouldTryShortCircuitRead(targetAddr)) {
|
|
|
+ try {
|
|
|
+ blockReader = getLocalBlockReader(conf, src, blk, accessToken,
|
|
|
+ chosenNode, DFSClient.this.socketTimeout, offsetIntoBlock);
|
|
|
+ return chosenNode;
|
|
|
+ } catch (AccessControlException ex) {
|
|
|
+ LOG.warn("Short circuit access failed ", ex);
|
|
|
+ //Disable short circuit reads
|
|
|
+ shortCircuitLocalReads = false;
|
|
|
+ } catch (IOException ex) {
|
|
|
+ if (refetchToken > 0 && tokenRefetchNeeded(ex, targetAddr)) {
|
|
|
+ /* Get a new access token and retry. */
|
|
|
+ refetchToken--;
|
|
|
+ fetchBlockAt(target);
|
|
|
+ continue;
|
|
|
+ } else {
|
|
|
+ LOG.info("Failed to read block " + targetBlock.getBlock()
|
|
|
+ + " on local machine" + StringUtils.stringifyException(ex));
|
|
|
+ LOG.info("Try reading via the datanode on " + targetAddr);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
try {
|
|
|
s = socketFactory.createSocket();
|
|
|
NetUtils.connect(s, targetAddr, socketTimeout);
|
|
|
s.setSoTimeout(socketTimeout);
|
|
|
- Block blk = targetBlock.getBlock();
|
|
|
- Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
|
|
|
-
|
|
|
blockReader = BlockReader.newBlockReader(s, src, blk.getBlockId(),
|
|
|
accessToken,
|
|
|
blk.getGenerationStamp(),
|
|
@@ -1937,20 +2099,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
buffersize, verifyChecksum, clientName);
|
|
|
return chosenNode;
|
|
|
} catch (IOException ex) {
|
|
|
- if (ex instanceof InvalidBlockTokenException && refetchToken > 0) {
|
|
|
- LOG.info("Will fetch a new access token and retry, "
|
|
|
- + "access token was invalid when connecting to " + targetAddr
|
|
|
- + " : " + ex);
|
|
|
- /*
|
|
|
- * Get a new access token and retry. Retry is needed in 2 cases. 1)
|
|
|
- * When both NN and DN re-started while DFSClient holding a cached
|
|
|
- * access token. 2) In the case that NN fails to update its
|
|
|
- * access key at pre-set interval (by a wide margin) and
|
|
|
- * subsequently restarts. In this case, DN re-registers itself with
|
|
|
- * NN and receives a new access key, but DN will delete the old
|
|
|
- * access key from its memory since it's considered expired based on
|
|
|
- * the estimated expiration date.
|
|
|
- */
|
|
|
+ if (refetchToken > 0 && tokenRefetchNeeded(ex, targetAddr)) {
|
|
|
refetchToken--;
|
|
|
fetchBlockAt(target);
|
|
|
} else {
|
|
@@ -1965,8 +2114,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
if (s != null) {
|
|
|
try {
|
|
|
s.close();
|
|
|
- } catch (IOException iex) {
|
|
|
- }
|
|
|
+ } catch (IOException iex) { }
|
|
|
}
|
|
|
s = null;
|
|
|
}
|
|
@@ -2154,21 +2302,31 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
DatanodeInfo chosenNode = retval.info;
|
|
|
InetSocketAddress targetAddr = retval.addr;
|
|
|
BlockReader reader = null;
|
|
|
-
|
|
|
+
|
|
|
+ int len = (int) (end - start + 1);
|
|
|
try {
|
|
|
- dn = socketFactory.createSocket();
|
|
|
- NetUtils.connect(dn, targetAddr, socketTimeout);
|
|
|
- dn.setSoTimeout(socketTimeout);
|
|
|
Token<BlockTokenIdentifier> accessToken = block.getBlockToken();
|
|
|
-
|
|
|
- int len = (int) (end - start + 1);
|
|
|
-
|
|
|
- reader = BlockReader.newBlockReader(dn, src,
|
|
|
- block.getBlock().getBlockId(),
|
|
|
- accessToken,
|
|
|
- block.getBlock().getGenerationStamp(),
|
|
|
- start, len, buffersize,
|
|
|
- verifyChecksum, clientName);
|
|
|
+ // first try reading the block locally.
|
|
|
+ if (shouldTryShortCircuitRead(targetAddr)) {
|
|
|
+ try {
|
|
|
+ reader = getLocalBlockReader(conf, src, block.getBlock(),
|
|
|
+ accessToken, chosenNode, DFSClient.this.socketTimeout, start);
|
|
|
+ } catch (AccessControlException ex) {
|
|
|
+ LOG.warn("Short circuit access failed ", ex);
|
|
|
+ //Disable short circuit reads
|
|
|
+ shortCircuitLocalReads = false;
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ // go to the datanode
|
|
|
+ dn = socketFactory.createSocket();
|
|
|
+ NetUtils.connect(dn, targetAddr, socketTimeout);
|
|
|
+ dn.setSoTimeout(socketTimeout);
|
|
|
+ reader = BlockReader.newBlockReader(dn, src,
|
|
|
+ block.getBlock().getBlockId(), accessToken,
|
|
|
+ block.getBlock().getGenerationStamp(), start, len, buffersize,
|
|
|
+ verifyChecksum, clientName);
|
|
|
+ }
|
|
|
int nread = reader.readAll(buf, offset, len);
|
|
|
if (nread != len) {
|
|
|
throw new IOException("truncated return from reader.read(): " +
|
|
@@ -2181,10 +2339,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
e.getPos() + " from " + chosenNode.getName());
|
|
|
reportChecksumFailure(src, block.getBlock(), chosenNode);
|
|
|
} catch (IOException e) {
|
|
|
- if (e instanceof InvalidBlockTokenException && refetchToken > 0) {
|
|
|
- LOG.info("Will get a new access token and retry, "
|
|
|
- + "access token was invalid when connecting to " + targetAddr
|
|
|
- + " : " + e);
|
|
|
+ if (refetchToken > 0 && tokenRefetchNeeded(e, targetAddr)) {
|
|
|
refetchToken--;
|
|
|
fetchBlockAt(block.getStartOffset());
|
|
|
continue;
|
|
@@ -3314,7 +3469,8 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
|
|
|
} catch (IOException ie) {
|
|
|
|
|
|
- LOG.info("Exception in createBlockOutputStream " + ie);
|
|
|
+ LOG.info("Exception in createBlockOutputStream " + nodes[0].getName() +
|
|
|
+ " " + ie);
|
|
|
|
|
|
// find the datanode that matches
|
|
|
if (firstBadLink.length() != 0) {
|