|
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode;
|
|
|
import java.io.BufferedInputStream;
|
|
|
import java.io.DataInputStream;
|
|
|
import java.io.DataOutputStream;
|
|
|
+import java.io.FileDescriptor;
|
|
|
import java.io.FileInputStream;
|
|
|
import java.io.IOException;
|
|
|
import java.io.InputStream;
|
|
@@ -36,6 +37,9 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
|
|
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
|
|
|
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
|
|
|
import org.apache.hadoop.io.IOUtils;
|
|
|
+import org.apache.hadoop.io.ReadaheadPool;
|
|
|
+import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
|
|
|
+import org.apache.hadoop.io.nativeio.NativeIO;
|
|
|
import org.apache.hadoop.net.SocketOutputStream;
|
|
|
import org.apache.hadoop.util.DataChecksum;
|
|
|
|
|
@@ -118,7 +122,9 @@ class BlockSender implements java.io.Closeable {
|
|
|
private DataInputStream checksumIn;
|
|
|
/** Checksum utility */
|
|
|
private final DataChecksum checksum;
|
|
|
- /** Starting position to read */
|
|
|
+ /** Initial position to read */
|
|
|
+ private long initialOffset;
|
|
|
+ /** Current position of read */
|
|
|
private long offset;
|
|
|
/** Position of last byte to read from block file */
|
|
|
private final long endOffset;
|
|
@@ -142,6 +148,24 @@ class BlockSender implements java.io.Closeable {
|
|
|
private final String clientTraceFmt;
|
|
|
private volatile ChunkChecksum lastChunkChecksum = null;
|
|
|
|
|
|
+ /** The file descriptor of the block being sent */
|
|
|
+ private FileDescriptor blockInFd;
|
|
|
+
|
|
|
+ // Cache-management related fields
|
|
|
+ private final long readaheadLength;
|
|
|
+ private boolean shouldDropCacheBehindRead;
|
|
|
+ private ReadaheadRequest curReadahead;
|
|
|
+ private long lastCacheDropOffset;
|
|
|
+ private static final long CACHE_DROP_INTERVAL_BYTES = 1024 * 1024; // 1MB
|
|
|
+ /**
|
|
|
+ * Minimum length of read below which management of the OS
|
|
|
+ * buffer cache is disabled.
|
|
|
+ */
|
|
|
+ private static final long LONG_READ_THRESHOLD_BYTES = 256 * 1024;
|
|
|
+
|
|
|
+ private static ReadaheadPool readaheadPool =
|
|
|
+ ReadaheadPool.getInstance();
|
|
|
+
|
|
|
/**
|
|
|
* Constructor
|
|
|
*
|
|
@@ -165,6 +189,8 @@ class BlockSender implements java.io.Closeable {
|
|
|
this.corruptChecksumOk = corruptChecksumOk;
|
|
|
this.verifyChecksum = verifyChecksum;
|
|
|
this.clientTraceFmt = clientTraceFmt;
|
|
|
+ this.readaheadLength = datanode.getReadaheadLength();
|
|
|
+ this.shouldDropCacheBehindRead = datanode.shouldDropCacheBehindReads();
|
|
|
|
|
|
synchronized(datanode.data) {
|
|
|
this.replica = getReplica(block, datanode);
|
|
@@ -277,6 +303,11 @@ class BlockSender implements java.io.Closeable {
|
|
|
DataNode.LOG.debug("replica=" + replica);
|
|
|
}
|
|
|
blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset
|
|
|
+ if (blockIn instanceof FileInputStream) {
|
|
|
+ blockInFd = ((FileInputStream)blockIn).getFD();
|
|
|
+ } else {
|
|
|
+ blockInFd = null;
|
|
|
+ }
|
|
|
} catch (IOException ioe) {
|
|
|
IOUtils.closeStream(this);
|
|
|
IOUtils.closeStream(blockIn);
|
|
@@ -288,6 +319,20 @@ class BlockSender implements java.io.Closeable {
|
|
|
* close opened files.
|
|
|
*/
|
|
|
public void close() throws IOException {
|
|
|
+ if (blockInFd != null && shouldDropCacheBehindRead) {
|
|
|
+ // drop the last few MB of the file from cache
|
|
|
+ try {
|
|
|
+ NativeIO.posixFadviseIfPossible(
|
|
|
+ blockInFd, lastCacheDropOffset, offset - lastCacheDropOffset,
|
|
|
+ NativeIO.POSIX_FADV_DONTNEED);
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.warn("Unable to drop cache on file close", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (curReadahead != null) {
|
|
|
+ curReadahead.cancel();
|
|
|
+ }
|
|
|
+
|
|
|
IOException ioe = null;
|
|
|
if(checksumIn!=null) {
|
|
|
try {
|
|
@@ -304,6 +349,7 @@ class BlockSender implements java.io.Closeable {
|
|
|
ioe = e;
|
|
|
}
|
|
|
blockIn = null;
|
|
|
+ blockInFd = null;
|
|
|
}
|
|
|
// throw IOException if there is any
|
|
|
if(ioe!= null) {
|
|
@@ -538,10 +584,20 @@ class BlockSender implements java.io.Closeable {
|
|
|
if (out == null) {
|
|
|
throw new IOException( "out stream is null" );
|
|
|
}
|
|
|
- final long initialOffset = offset;
|
|
|
+ initialOffset = offset;
|
|
|
long totalRead = 0;
|
|
|
OutputStream streamForSendChunks = out;
|
|
|
|
|
|
+ lastCacheDropOffset = initialOffset;
|
|
|
+
|
|
|
+ if (isLongRead() && blockInFd != null) {
|
|
|
+ // Advise that this file descriptor will be accessed sequentially.
|
|
|
+ NativeIO.posixFadviseIfPossible(blockInFd, 0, 0, NativeIO.POSIX_FADV_SEQUENTIAL);
|
|
|
+ }
|
|
|
+
|
|
|
+ // Trigger readahead of beginning of file if configured.
|
|
|
+ manageOsCache();
|
|
|
+
|
|
|
final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
|
|
|
try {
|
|
|
writeChecksumHeader(out);
|
|
@@ -569,6 +625,7 @@ class BlockSender implements java.io.Closeable {
|
|
|
ByteBuffer pktBuf = ByteBuffer.allocate(pktSize);
|
|
|
|
|
|
while (endOffset > offset) {
|
|
|
+ manageOsCache();
|
|
|
long len = sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks,
|
|
|
transferTo, throttler);
|
|
|
offset += len;
|
|
@@ -595,6 +652,45 @@ class BlockSender implements java.io.Closeable {
|
|
|
}
|
|
|
return totalRead;
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Manage the OS buffer cache by performing read-ahead
|
|
|
+ * and drop-behind.
|
|
|
+ */
|
|
|
+ private void manageOsCache() throws IOException {
|
|
|
+ if (!isLongRead() || blockInFd == null) {
|
|
|
+ // don't manage cache manually for short-reads, like
|
|
|
+ // HBase random read workloads.
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // Perform readahead if necessary
|
|
|
+ if (readaheadLength > 0 && readaheadPool != null) {
|
|
|
+ curReadahead = readaheadPool.readaheadStream(
|
|
|
+ clientTraceFmt, blockInFd,
|
|
|
+ offset, readaheadLength, Long.MAX_VALUE,
|
|
|
+ curReadahead);
|
|
|
+ }
|
|
|
+
|
|
|
+ // Drop what we've just read from cache, since we aren't
|
|
|
+ // likely to need it again
|
|
|
+ long nextCacheDropOffset = lastCacheDropOffset + CACHE_DROP_INTERVAL_BYTES;
|
|
|
+ if (shouldDropCacheBehindRead &&
|
|
|
+ offset >= nextCacheDropOffset) {
|
|
|
+ long dropLength = offset - lastCacheDropOffset;
|
|
|
+ if (dropLength >= 1024) {
|
|
|
+ NativeIO.posixFadviseIfPossible(blockInFd,
|
|
|
+ lastCacheDropOffset, dropLength,
|
|
|
+ NativeIO.POSIX_FADV_DONTNEED);
|
|
|
+ }
|
|
|
+ lastCacheDropOffset += CACHE_DROP_INTERVAL_BYTES;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private boolean isLongRead() {
|
|
|
+ return (endOffset - offset) > LONG_READ_THRESHOLD_BYTES;
|
|
|
+ }
|
|
|
+
|
|
|
|
|
|
/**
|
|
|
* Write checksum header to the output stream
|