@@ -38,7 +38,6 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
-import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
@@ -51,8 +50,9 @@ import org.apache.hadoop.util.StringUtils;
 ****************************************************************/
@InterfaceAudience.Private
public class DFSInputStream extends FSInputStream {
+  private final SocketCache socketCache;
+
  private final DFSClient dfsClient;
-  private Socket s = null;
  private boolean closed = false;

  private final String src;
@@ -87,7 +87,9 @@ public class DFSInputStream extends FSInputStream {
  private int buffersize = 1;

  private byte[] oneByteBuf = new byte[1]; // used for 'int read()'
-
+
+  private int nCachedConnRetry;
+
  void addToDeadNodes(DatanodeInfo dnInfo) {
    deadNodes.put(dnInfo, dnInfo);
  }
@@ -98,9 +100,14 @@ public class DFSInputStream extends FSInputStream {
    this.verifyChecksum = verifyChecksum;
    this.buffersize = buffersize;
    this.src = src;
+    this.socketCache = dfsClient.socketCache;
    prefetchSize = this.dfsClient.conf.getLong(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY,
        10 * dfsClient.defaultBlockSize);
-    timeWindow = this.dfsClient.conf.getInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, timeWindow);
+    timeWindow = this.dfsClient.conf.getInt(
+        DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, timeWindow);
+    nCachedConnRetry = this.dfsClient.conf.getInt(
+        DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_KEY,
+        DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT);
    openInfo();
  }

@@ -352,15 +359,11 @@
      throw new IOException("Attempted to read past end of file");
    }

-    if ( blockReader != null ) {
-      blockReader.close();
+    // Will be getting a new BlockReader.
+    if (blockReader != null) {
+      closeBlockReader(blockReader);
      blockReader = null;
    }
-
-    if (s != null) {
-      s.close();
-      s = null;
-    }

    //
    // Connect to best DataNode for desired Block, with potential offset
@@ -381,14 +384,12 @@
      InetSocketAddress targetAddr = retval.addr;

      try {
-        s = dfsClient.socketFactory.createSocket();
-        NetUtils.connect(s, targetAddr, dfsClient.socketTimeout);
-        s.setSoTimeout(dfsClient.socketTimeout);
        Block blk = targetBlock.getBlock();
        Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();

-        blockReader = BlockReader.newBlockReader(s, src, blk,
-            accessToken,
+        blockReader = getBlockReader(
+            targetAddr, src, blk,
+            accessToken,
            offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock,
            buffersize, verifyChecksum, dfsClient.clientName);
        return chosenNode;
@@ -415,13 +416,6 @@
          // Put chosen node into dead list, continue
          addToDeadNodes(chosenNode);
        }
-        if (s != null) {
-          try {
-            s.close();
-          } catch (IOException iex) {
-          }
-        }
-        s = null;
      }
    }
  }
@@ -435,16 +429,11 @@
      return;
    }
    dfsClient.checkOpen();
-
-    if ( blockReader != null ) {
-      blockReader.close();
+
+    if (blockReader != null) {
+      closeBlockReader(blockReader);
      blockReader = null;
    }
-
-    if (s != null) {
-      s.close();
-      s = null;
-    }
    super.close();
    closed = true;
  }
@@ -457,7 +446,7 @@

  /* This is a used by regular read() and handles ChecksumExceptions.
   * name readBuffer() is chosen to imply similarity to readBuffer() in
-   * ChecksuFileSystem
+   * ChecksumFileSystem
   */
  private synchronized int readBuffer(byte buf[], int off, int len)
      throws IOException {
@@ -607,7 +596,6 @@
    //
    // Connect to best DataNode for desired Block, with potential offset
    //
-    Socket dn = null;
    int refetchToken = 1; // only need to get a new access token once

    while (true) {
@@ -621,18 +609,15 @@
      BlockReader reader = null;

      try {
-        dn = dfsClient.socketFactory.createSocket();
-        NetUtils.connect(dn, targetAddr, dfsClient.socketTimeout);
-        dn.setSoTimeout(dfsClient.socketTimeout);
        Token<BlockTokenIdentifier> blockToken = block.getBlockToken();

        int len = (int) (end - start + 1);
-
-        reader = BlockReader.newBlockReader(dn, src,
-                                            block.getBlock(),
-                                            blockToken,
-                                            start, len, buffersize,
-                                            verifyChecksum, dfsClient.clientName);
+
+        reader = getBlockReader(targetAddr, src,
+                                block.getBlock(),
+                                blockToken,
+                                start, len, buffersize,
+                                verifyChecksum, dfsClient.clientName);
        int nread = reader.readAll(buf, offset, len);
        if (nread != len) {
          throw new IOException("truncated return from reader.read(): " +
@@ -658,14 +643,104 @@
            + StringUtils.stringifyException(e));
        }
      } finally {
-        IOUtils.closeStream(reader);
-        IOUtils.closeSocket(dn);
+        if (reader != null) {
+          closeBlockReader(reader);
+        }
      }
      // Put chosen node into dead list, continue
      addToDeadNodes(chosenNode);
    }
  }

+  /**
+   * Close the given BlockReader and cache its socket.
+   */
+  private void closeBlockReader(BlockReader reader) throws IOException {
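+    // Only hand the socket back to the cache when the reader has already sent
+    // its status code to the datanode; otherwise the connection may not be at
+    // a clean point in the stream and is not safe to reuse.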
+    if (reader.hasSentStatusCode()) {
+      Socket oldSock = reader.takeSocket();
+      socketCache.put(oldSock);
+    }
+    reader.close();
+  }
+
+  /**
+   * Retrieve a BlockReader suitable for reading.
+   * This method will reuse the cached connection to the DN if appropriate.
+   * Otherwise, it will create a new connection.
+   *
+   * @param dnAddr Address of the datanode
+   * @param file File location
+   * @param block The Block object
+   * @param blockToken The access token for security
+   * @param startOffset The read offset, relative to block head
+   * @param len The number of bytes to read
+   * @param bufferSize The IO buffer size (not the client buffer size)
+   * @param verifyChecksum Whether to verify checksum
+   * @param clientName Client name
+   * @return New BlockReader instance
+   */
+  protected BlockReader getBlockReader(InetSocketAddress dnAddr,
+                                       String file,
+                                       Block block,
+                                       Token<BlockTokenIdentifier> blockToken,
+                                       long startOffset,
+                                       long len,
+                                       int bufferSize,
+                                       boolean verifyChecksum,
+                                       String clientName)
+      throws IOException {
+    IOException err = null;
+    boolean fromCache = true;
+
+    // Allow retry since there is no way of knowing whether the cached socket
+    // is good until we actually use it.
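+    // Note that once we fall back to a freshly created socket, fromCache is
+    // false and the loop exits after that single attempt.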
+    for (int retries = 0; retries <= nCachedConnRetry && fromCache; ++retries) {
+      Socket sock = socketCache.get(dnAddr);
+      if (sock == null) {
+        fromCache = false;
+
+        sock = dfsClient.socketFactory.createSocket();
+
+        // TCP_NODELAY is crucial here because of bad interactions between
+        // Nagle's Algorithm and Delayed ACKs. With connection keepalive
+        // between the client and DN, the conversation looks like:
+        //   1. Client -> DN: Read block X
+        //   2. DN -> Client: data for block X
+        //   3. Client -> DN: Status OK (successful read)
+        //   4. Client -> DN: Read block Y
+        // The fact that step #3 and #4 are both in the client->DN direction
+        // triggers Nagling. If the DN is using delayed ACKs, this results
+        // in a delay of 40ms or more.
+        //
+        // TCP_NODELAY disables nagling and thus avoids this performance
+        // disaster.
+        sock.setTcpNoDelay(true);
+
+        NetUtils.connect(sock, dnAddr, dfsClient.socketTimeout);
+        sock.setSoTimeout(dfsClient.socketTimeout);
+      }
+
+      try {
+        // The OP_READ_BLOCK request is sent as we make the BlockReader
+        BlockReader reader =
+            BlockReader.newBlockReader(sock, file, block,
+                                       blockToken,
+                                       startOffset, len,
+                                       bufferSize, verifyChecksum,
+                                       clientName);
+        return reader;
+      } catch (IOException ex) {
+        // Our socket is no good.
+        DFSClient.LOG.debug("Error making BlockReader. Closing stale " + sock, ex);
+        sock.close();
+        err = ex;
+      }
+    }
+
+    throw err;
+  }
+
+
  /**
   * Read bytes starting from the specified position.
   *