|
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode;
|
|
|
import java.io.BufferedInputStream;
|
|
|
import java.io.DataInputStream;
|
|
|
import java.io.DataOutputStream;
|
|
|
+import java.io.FileDescriptor;
|
|
|
import java.io.FileInputStream;
|
|
|
import java.io.IOException;
|
|
|
import java.io.InputStream;
|
|
@@ -34,6 +35,9 @@ import org.apache.hadoop.fs.ChecksumException;
|
|
|
import org.apache.hadoop.hdfs.protocol.Block;
|
|
|
import org.apache.hadoop.hdfs.protocol.FSConstants;
|
|
|
import org.apache.hadoop.io.IOUtils;
|
|
|
+import org.apache.hadoop.io.ReadaheadPool;
|
|
|
+import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
|
|
|
+import org.apache.hadoop.io.nativeio.NativeIO;
|
|
|
import org.apache.hadoop.net.SocketOutputStream;
|
|
|
import org.apache.hadoop.util.ChecksumUtil;
|
|
|
import org.apache.hadoop.util.DataChecksum;
|
|
@@ -51,6 +55,7 @@ class BlockSender implements java.io.Closeable, FSConstants {
|
|
|
private long blockInPosition = -1; // updated while using transferTo().
|
|
|
private DataInputStream checksumIn; // checksum datastream
|
|
|
private DataChecksum checksum; // checksum stream
|
|
|
+ private long initialOffset; // initial position to read
|
|
|
private long offset; // starting position to read
|
|
|
private long endOffset; // ending position
|
|
|
private long blockLength;
|
|
@@ -74,6 +79,22 @@ class BlockSender implements java.io.Closeable, FSConstants {
|
|
|
*/
|
|
|
private static final int MIN_BUFFER_WITH_TRANSFERTO = 64*1024;
|
|
|
|
|
|
+ /** File descriptor of the block file being sent; null when blockIn is not a FileInputStream, and reset to null when the block stream is closed. */
|
|
|
+ private FileDescriptor blockInFd;
|
|
|
+
|
|
|
+ // Cache-management related fields (read-ahead and drop-behind of the OS buffer cache)
|
|
|
+ private final long readaheadLength; // from datanode.getReadaheadLength(); readahead is only requested when > 0
|
|
|
+ private boolean shouldDropCacheBehindRead; // from datanode.shouldDropCacheBehindReads(); enables POSIX_FADV_DONTNEED behind the current offset
|
|
|
+ private ReadaheadRequest curReadahead; // latest request returned by readaheadPool.readaheadStream(); cancelled in close()
|
|
|
+ private long lastCacheDropOffset; // start offset used for the next POSIX_FADV_DONTNEED call
|
|
|
+ private static final long CACHE_DROP_INTERVAL_BYTES = 1024 * 1024; // 1MB: offset must advance this far past lastCacheDropOffset before a drop is issued
|
|
|
+ /**
|
|
|
+ * Minimum length of read below which management of the OS buffer cache is
|
|
|
+ * disabled.
|
|
|
+ */
|
|
|
+ private static final long LONG_READ_THRESHOLD_BYTES = 256 * 1024;
|
|
|
+
|
|
|
+ private static ReadaheadPool readaheadPool = ReadaheadPool.getInstance(); // manageOsCache() skips readahead when this is null
|
|
|
|
|
|
BlockSender(Block block, long startOffset, long length,
|
|
|
boolean corruptChecksumOk, boolean chunkOffsetOK,
|
|
@@ -94,7 +115,9 @@ class BlockSender implements java.io.Closeable, FSConstants {
|
|
|
this.blockLength = datanode.data.getVisibleLength(block);
|
|
|
this.transferToAllowed = datanode.transferToAllowed;
|
|
|
this.clientTraceFmt = clientTraceFmt;
|
|
|
-
|
|
|
+ this.readaheadLength = datanode.getReadaheadLength();
|
|
|
+ this.shouldDropCacheBehindRead = datanode.shouldDropCacheBehindReads();
|
|
|
+
|
|
|
if ( !corruptChecksumOk || datanode.data.metaFileExists(block) ) {
|
|
|
checksumIn = new DataInputStream(
|
|
|
new BufferedInputStream(datanode.data.getMetaDataInputStream(block),
|
|
@@ -166,6 +189,11 @@ class BlockSender implements java.io.Closeable, FSConstants {
|
|
|
seqno = 0;
|
|
|
|
|
|
blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset
|
|
|
+ if (blockIn instanceof FileInputStream) {
|
|
|
+ blockInFd = ((FileInputStream) blockIn).getFD();
|
|
|
+ } else {
|
|
|
+ blockInFd = null;
|
|
|
+ }
|
|
|
memoizedBlock = new MemoizedBlock(blockIn, blockLength, datanode.data, block);
|
|
|
} catch (IOException ioe) {
|
|
|
IOUtils.closeStream(this);
|
|
@@ -178,6 +206,19 @@ class BlockSender implements java.io.Closeable, FSConstants {
|
|
|
* close opened files.
|
|
|
*/
|
|
|
public void close() throws IOException {
|
|
|
+ if (blockInFd != null && shouldDropCacheBehindRead) {
|
|
|
+ // drop the last few MB of the file from cache
|
|
|
+ try {
|
|
|
+ NativeIO.posixFadviseIfPossible(blockInFd, lastCacheDropOffset, offset
|
|
|
+ - lastCacheDropOffset, NativeIO.POSIX_FADV_DONTNEED);
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.warn("Unable to drop cache on file close", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (curReadahead != null) {
|
|
|
+ curReadahead.cancel();
|
|
|
+ }
|
|
|
+
|
|
|
IOException ioe = null;
|
|
|
// close checksum file
|
|
|
if(checksumIn!=null) {
|
|
@@ -196,6 +237,7 @@ class BlockSender implements java.io.Closeable, FSConstants {
|
|
|
ioe = e;
|
|
|
}
|
|
|
blockIn = null;
|
|
|
+ blockInFd = null;
|
|
|
}
|
|
|
// throw IOException if there is any
|
|
|
if(ioe!= null) {
|
|
@@ -387,10 +429,21 @@ class BlockSender implements java.io.Closeable, FSConstants {
|
|
|
}
|
|
|
this.throttler = throttler;
|
|
|
|
|
|
- long initialOffset = offset;
|
|
|
+ initialOffset = offset;
|
|
|
long totalRead = 0;
|
|
|
OutputStream streamForSendChunks = out;
|
|
|
|
|
|
+ lastCacheDropOffset = initialOffset;
|
|
|
+
|
|
|
+ if (isLongRead() && blockInFd != null) {
|
|
|
+ // Advise that this file descriptor will be accessed sequentially.
|
|
|
+ NativeIO.posixFadviseIfPossible(blockInFd, 0, 0,
|
|
|
+ NativeIO.POSIX_FADV_SEQUENTIAL);
|
|
|
+ }
|
|
|
+
|
|
|
+ // Trigger readahead of beginning of file if configured.
|
|
|
+ manageOsCache();
|
|
|
+
|
|
|
final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
|
|
|
try {
|
|
|
try {
|
|
@@ -433,6 +486,7 @@ class BlockSender implements java.io.Closeable, FSConstants {
|
|
|
ByteBuffer pktBuf = ByteBuffer.allocate(pktSize);
|
|
|
|
|
|
while (endOffset > offset) {
|
|
|
+ manageOsCache();
|
|
|
long len = sendChunks(pktBuf, maxChunksPerPacket,
|
|
|
streamForSendChunks);
|
|
|
offset += len;
|
|
@@ -465,6 +519,39 @@ class BlockSender implements java.io.Closeable, FSConstants {
|
|
|
return totalRead;
|
|
|
}
|
|
|
|
|
|
+  /**
|
|
|
+   * Manage the OS buffer cache by performing read-ahead and drop-behind.
|
|
|
+   * Returns without doing anything when the read is not a long read or no
|
|
|
+   * file descriptor is available for the block.
|
|
|
+   *
|
|
|
+   * @throws IOException presumably propagated from the native fadvise call
|
|
|
+   *         (not visible here; confirm against NativeIO)
|
|
|
+   */
|
|
|
+  private void manageOsCache() throws IOException {
|
|
|
+    if (!isLongRead() || blockInFd == null) {
|
|
|
+      // don't manage cache manually for short-reads, like
|
|
|
+      // HBase random read workloads.
|
|
|
+      return;
|
|
|
+    }
|
|
|
+
|
|
|
+    // Perform readahead if necessary
|
|
|
+    if (readaheadLength > 0 && readaheadPool != null) {
|
|
|
+      curReadahead = readaheadPool.readaheadStream(clientTraceFmt, blockInFd,
|
|
|
+          offset, readaheadLength, Long.MAX_VALUE, curReadahead);
|
|
|
+    }
|
|
|
+
|
|
|
+    // Drop what we've just read from cache, since we aren't
|
|
|
+    // likely to need it again
|
|
|
+    long nextCacheDropOffset = lastCacheDropOffset + CACHE_DROP_INTERVAL_BYTES;
|
|
|
+    if (shouldDropCacheBehindRead && offset >= nextCacheDropOffset) {
|
|
|
+      // dropLength is at least CACHE_DROP_INTERVAL_BYTES here, so no
|
|
|
+      // separate minimum-length check is needed.
|
|
|
+      long dropLength = offset - lastCacheDropOffset;
|
|
|
+      NativeIO.posixFadviseIfPossible(blockInFd, lastCacheDropOffset,
|
|
|
+          dropLength, NativeIO.POSIX_FADV_DONTNEED);
|
|
|
+      // Everything before offset has now been advised; resume from offset so
|
|
|
+      // the next call does not re-advise an already dropped range.
|
|
|
+      lastCacheDropOffset = offset;
|
|
|
+    }
|
|
|
+  }
|
|
|
+
|
|
|
+  /**
|
|
|
+   * Tells whether this transfer is long enough for OS buffer cache
|
|
|
+   * management to be worthwhile.
|
|
|
+   *
|
|
|
+   * The length is measured from initialOffset rather than offset: offset
|
|
|
+   * advances while chunks are sent, so measuring from it would turn a long
|
|
|
+   * read into a "short" one for its final LONG_READ_THRESHOLD_BYTES and stop
|
|
|
+   * cache management part-way through the transfer.
|
|
|
+   * NOTE(review): relies on initialOffset being assigned before the first
|
|
|
+   * call, as the visible send path does; confirm no earlier caller exists.
|
|
|
+   *
|
|
|
+   * @return true if (endOffset - initialOffset) exceeds
|
|
|
+   *         LONG_READ_THRESHOLD_BYTES
|
|
|
+   */
|
|
|
+  private boolean isLongRead() {
|
|
|
+    return (endOffset - initialOffset) > LONG_READ_THRESHOLD_BYTES;
|
|
|
+  }
|
|
|
+
|
|
|
/**
 * Returns the current value of the blockReadFully flag.
 * NOTE(review): the code that sets the flag is outside this view; presumably
 * it records that the whole block was sent. Confirm against the send path.
 */
boolean isBlockReadFully() {
|
|
|
return blockReadFully;
|
|
|
}
|