|
@@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.server.datanode;
|
|
|
import java.io.BufferedInputStream;
|
|
|
import java.io.DataInputStream;
|
|
|
import java.io.DataOutputStream;
|
|
|
-import java.io.FileDescriptor;
|
|
|
import java.io.FileInputStream;
|
|
|
import java.io.FileNotFoundException;
|
|
|
import java.io.IOException;
|
|
@@ -42,11 +41,11 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
|
|
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
|
|
|
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
|
|
|
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
|
|
|
import org.apache.hadoop.io.IOUtils;
|
|
|
import org.apache.hadoop.io.LongWritable;
|
|
|
import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
|
|
|
-import org.apache.hadoop.io.nativeio.NativeIO;
|
|
|
import org.apache.hadoop.net.SocketOutputStream;
|
|
|
import org.apache.hadoop.util.AutoCloseableLock;
|
|
|
import org.apache.hadoop.util.DataChecksum;
|
|
@@ -120,12 +119,11 @@ class BlockSender implements java.io.Closeable {
|
|
|
|
|
|
/** the block to read from */
|
|
|
private final ExtendedBlock block;
|
|
|
- /** Stream to read block data from */
|
|
|
- private InputStream blockIn;
|
|
|
+
|
|
|
+ /** InputStreams and file descriptors to read block/checksum. */
|
|
|
+ private ReplicaInputStreams ris;
|
|
|
/** updated while using transferTo() */
|
|
|
private long blockInPosition = -1;
|
|
|
- /** Stream to read checksum */
|
|
|
- private DataInputStream checksumIn;
|
|
|
/** Checksum utility */
|
|
|
private final DataChecksum checksum;
|
|
|
/** Initial position to read */
|
|
@@ -152,11 +150,6 @@ class BlockSender implements java.io.Closeable {
|
|
|
private final String clientTraceFmt;
|
|
|
private volatile ChunkChecksum lastChunkChecksum = null;
|
|
|
private DataNode datanode;
|
|
|
-
|
|
|
- /** The file descriptor of the block being sent */
|
|
|
- private FileDescriptor blockInFd;
|
|
|
- /** The reference to the volume where the block is located */
|
|
|
- private FsVolumeReference volumeRef;
|
|
|
|
|
|
/** The replica of the block that is being read. */
|
|
|
private final Replica replica;
|
|
@@ -201,6 +194,9 @@ class BlockSender implements java.io.Closeable {
|
|
|
boolean sendChecksum, DataNode datanode, String clientTraceFmt,
|
|
|
CachingStrategy cachingStrategy)
|
|
|
throws IOException {
|
|
|
+ InputStream blockIn = null;
|
|
|
+ DataInputStream checksumIn = null;
|
|
|
+ FsVolumeReference volumeRef = null;
|
|
|
try {
|
|
|
this.block = block;
|
|
|
this.corruptChecksumOk = corruptChecksumOk;
|
|
@@ -281,7 +277,7 @@ class BlockSender implements java.io.Closeable {
|
|
|
(!is32Bit || length <= Integer.MAX_VALUE);
|
|
|
|
|
|
// Obtain a reference before reading data
|
|
|
- this.volumeRef = datanode.data.getVolume(block).obtainReference();
|
|
|
+ volumeRef = datanode.data.getVolume(block).obtainReference();
|
|
|
|
|
|
/*
|
|
|
* (corruptChecksumOK, meta_file_exist): operation
|
|
@@ -405,14 +401,9 @@ class BlockSender implements java.io.Closeable {
|
|
|
DataNode.LOG.debug("replica=" + replica);
|
|
|
}
|
|
|
blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset
|
|
|
- if (blockIn instanceof FileInputStream) {
|
|
|
- blockInFd = ((FileInputStream)blockIn).getFD();
|
|
|
- } else {
|
|
|
- blockInFd = null;
|
|
|
- }
|
|
|
+ ris = new ReplicaInputStreams(blockIn, checksumIn, volumeRef);
|
|
|
} catch (IOException ioe) {
|
|
|
IOUtils.closeStream(this);
|
|
|
- IOUtils.closeStream(blockIn);
|
|
|
throw ioe;
|
|
|
}
|
|
|
}
|
|
@@ -422,12 +413,11 @@ class BlockSender implements java.io.Closeable {
|
|
|
*/
|
|
|
@Override
|
|
|
public void close() throws IOException {
|
|
|
- if (blockInFd != null &&
|
|
|
+ if (ris.getDataInFd() != null &&
|
|
|
((dropCacheBehindAllReads) ||
|
|
|
(dropCacheBehindLargeReads && isLongRead()))) {
|
|
|
try {
|
|
|
- NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
|
|
|
- block.getBlockName(), blockInFd, lastCacheDropOffset,
|
|
|
+ ris.dropCacheBehindReads(block.getBlockName(), lastCacheDropOffset,
|
|
|
offset - lastCacheDropOffset, POSIX_FADV_DONTNEED);
|
|
|
} catch (Exception e) {
|
|
|
LOG.warn("Unable to drop cache on file close", e);
|
|
@@ -436,32 +426,12 @@ class BlockSender implements java.io.Closeable {
|
|
|
if (curReadahead != null) {
|
|
|
curReadahead.cancel();
|
|
|
}
|
|
|
-
|
|
|
- IOException ioe = null;
|
|
|
- if(checksumIn!=null) {
|
|
|
- try {
|
|
|
- checksumIn.close(); // close checksum file
|
|
|
- } catch (IOException e) {
|
|
|
- ioe = e;
|
|
|
- }
|
|
|
- checksumIn = null;
|
|
|
- }
|
|
|
- if(blockIn!=null) {
|
|
|
- try {
|
|
|
- blockIn.close(); // close data file
|
|
|
- } catch (IOException e) {
|
|
|
- ioe = e;
|
|
|
- }
|
|
|
- blockIn = null;
|
|
|
- blockInFd = null;
|
|
|
- }
|
|
|
- if (volumeRef != null) {
|
|
|
- IOUtils.cleanup(null, volumeRef);
|
|
|
- volumeRef = null;
|
|
|
- }
|
|
|
- // throw IOException if there is any
|
|
|
- if(ioe!= null) {
|
|
|
- throw ioe;
|
|
|
+
|
|
|
+ try {
|
|
|
+ ris.closeStreams();
|
|
|
+ } finally {
|
|
|
+ IOUtils.closeStream(ris);
|
|
|
+ ris = null;
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -565,7 +535,7 @@ class BlockSender implements java.io.Closeable {
|
|
|
int checksumOff = pkt.position();
|
|
|
byte[] buf = pkt.array();
|
|
|
|
|
|
- if (checksumSize > 0 && checksumIn != null) {
|
|
|
+ if (checksumSize > 0 && ris.getChecksumIn() != null) {
|
|
|
readChecksum(buf, checksumOff, checksumDataLen);
|
|
|
|
|
|
// write in progress that we need to use to get last checksum
|
|
@@ -581,7 +551,7 @@ class BlockSender implements java.io.Closeable {
|
|
|
|
|
|
int dataOff = checksumOff + checksumDataLen;
|
|
|
if (!transferTo) { // normal transfer
|
|
|
- IOUtils.readFully(blockIn, buf, dataOff, dataLen);
|
|
|
+ ris.readDataFully(buf, dataOff, dataLen);
|
|
|
|
|
|
if (verifyChecksum) {
|
|
|
verifyChecksum(buf, dataOff, dataLen, numChunks, checksumOff);
|
|
@@ -593,12 +563,12 @@ class BlockSender implements java.io.Closeable {
|
|
|
SocketOutputStream sockOut = (SocketOutputStream)out;
|
|
|
// First write header and checksums
|
|
|
sockOut.write(buf, headerOff, dataOff - headerOff);
|
|
|
-
|
|
|
+
|
|
|
// no need to flush since we know out is not a buffered stream
|
|
|
- FileChannel fileCh = ((FileInputStream)blockIn).getChannel();
|
|
|
+ FileChannel fileCh = ((FileInputStream)ris.getDataIn()).getChannel();
|
|
|
LongWritable waitTime = new LongWritable();
|
|
|
LongWritable transferTime = new LongWritable();
|
|
|
- sockOut.transferToFully(fileCh, blockInPosition, dataLen,
|
|
|
+ sockOut.transferToFully(fileCh, blockInPosition, dataLen,
|
|
|
waitTime, transferTime);
|
|
|
datanode.metrics.addSendDataPacketBlockedOnNetworkNanos(waitTime.get());
|
|
|
datanode.metrics.addSendDataPacketTransferNanos(transferTime.get());
|
|
@@ -630,7 +600,7 @@ class BlockSender implements java.io.Closeable {
|
|
|
if (!ioem.startsWith("Broken pipe") && !ioem.startsWith("Connection reset")) {
|
|
|
LOG.error("BlockSender.sendChunks() exception: ", e);
|
|
|
datanode.getBlockScanner().markSuspectBlock(
|
|
|
- volumeRef.getVolume().getStorageID(),
|
|
|
+ ris.getVolumeRef().getVolume().getStorageID(),
|
|
|
block);
|
|
|
}
|
|
|
}
|
|
@@ -653,16 +623,15 @@ class BlockSender implements java.io.Closeable {
|
|
|
*/
|
|
|
private void readChecksum(byte[] buf, final int checksumOffset,
|
|
|
final int checksumLen) throws IOException {
|
|
|
- if (checksumSize <= 0 && checksumIn == null) {
|
|
|
+ if (checksumSize <= 0 && ris.getChecksumIn() == null) {
|
|
|
return;
|
|
|
}
|
|
|
try {
|
|
|
- checksumIn.readFully(buf, checksumOffset, checksumLen);
|
|
|
+ ris.readChecksumFully(buf, checksumOffset, checksumLen);
|
|
|
} catch (IOException e) {
|
|
|
LOG.warn(" Could not read or failed to verify checksum for data"
|
|
|
+ " at offset " + offset + " for block " + block, e);
|
|
|
- IOUtils.closeStream(checksumIn);
|
|
|
- checksumIn = null;
|
|
|
+ ris.closeChecksumStream();
|
|
|
if (corruptChecksumOk) {
|
|
|
if (checksumOffset < checksumLen) {
|
|
|
// Just fill the array with zeros.
|
|
@@ -746,10 +715,10 @@ class BlockSender implements java.io.Closeable {
|
|
|
|
|
|
lastCacheDropOffset = initialOffset;
|
|
|
|
|
|
- if (isLongRead() && blockInFd != null) {
|
|
|
+ if (isLongRead() && ris.getDataInFd() != null) {
|
|
|
// Advise that this file descriptor will be accessed sequentially.
|
|
|
- NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
|
|
|
- block.getBlockName(), blockInFd, 0, 0, POSIX_FADV_SEQUENTIAL);
|
|
|
+ ris.dropCacheBehindReads(block.getBlockName(), 0, 0,
|
|
|
+ POSIX_FADV_SEQUENTIAL);
|
|
|
}
|
|
|
|
|
|
// Trigger readahead of beginning of file if configured.
|
|
@@ -761,9 +730,10 @@ class BlockSender implements java.io.Closeable {
|
|
|
int pktBufSize = PacketHeader.PKT_MAX_HEADER_LEN;
|
|
|
boolean transferTo = transferToAllowed && !verifyChecksum
|
|
|
&& baseStream instanceof SocketOutputStream
|
|
|
- && blockIn instanceof FileInputStream;
|
|
|
+ && ris.getDataIn() instanceof FileInputStream;
|
|
|
if (transferTo) {
|
|
|
- FileChannel fileChannel = ((FileInputStream)blockIn).getChannel();
|
|
|
+ FileChannel fileChannel =
|
|
|
+ ((FileInputStream)ris.getDataIn()).getChannel();
|
|
|
blockInPosition = fileChannel.position();
|
|
|
streamForSendChunks = baseStream;
|
|
|
maxChunksPerPacket = numberOfChunks(TRANSFERTO_BUFFER_SIZE);
|
|
@@ -818,14 +788,16 @@ class BlockSender implements java.io.Closeable {
|
|
|
private void manageOsCache() throws IOException {
|
|
|
// We can't manage the cache for this block if we don't have a file
|
|
|
// descriptor to work with.
|
|
|
- if (blockInFd == null) return;
|
|
|
+ if (ris.getDataInFd() == null) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
|
|
|
// Perform readahead if necessary
|
|
|
if ((readaheadLength > 0) && (datanode.readaheadPool != null) &&
|
|
|
(alwaysReadahead || isLongRead())) {
|
|
|
curReadahead = datanode.readaheadPool.readaheadStream(
|
|
|
- clientTraceFmt, blockInFd, offset, readaheadLength, Long.MAX_VALUE,
|
|
|
- curReadahead);
|
|
|
+ clientTraceFmt, ris.getDataInFd(), offset, readaheadLength,
|
|
|
+ Long.MAX_VALUE, curReadahead);
|
|
|
}
|
|
|
|
|
|
// Drop what we've just read from cache, since we aren't
|
|
@@ -835,8 +807,7 @@ class BlockSender implements java.io.Closeable {
|
|
|
long nextCacheDropOffset = lastCacheDropOffset + CACHE_DROP_INTERVAL_BYTES;
|
|
|
if (offset >= nextCacheDropOffset) {
|
|
|
long dropLength = offset - lastCacheDropOffset;
|
|
|
- NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
|
|
|
- block.getBlockName(), blockInFd, lastCacheDropOffset,
|
|
|
+ ris.dropCacheBehindReads(block.getBlockName(), lastCacheDropOffset,
|
|
|
dropLength, POSIX_FADV_DONTNEED);
|
|
|
lastCacheDropOffset = offset;
|
|
|
}
|