|
@@ -22,6 +22,7 @@ import java.io.FileDescriptor;
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
import java.io.RandomAccessFile;
|
|
import java.io.RandomAccessFile;
|
|
|
|
|
|
|
|
+import com.google.common.annotations.VisibleForTesting;
|
|
import org.apache.hadoop.io.ReadaheadPool;
|
|
import org.apache.hadoop.io.ReadaheadPool;
|
|
import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
|
|
import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
|
|
import org.apache.hadoop.io.nativeio.NativeIO;
|
|
import org.apache.hadoop.io.nativeio.NativeIO;
|
|
@@ -37,13 +38,14 @@ public class FadvisedChunkedFile extends ChunkedFile {
|
|
private static final Logger LOG =
|
|
private static final Logger LOG =
|
|
LoggerFactory.getLogger(FadvisedChunkedFile.class);
|
|
LoggerFactory.getLogger(FadvisedChunkedFile.class);
|
|
|
|
|
|
|
|
+ private final Object closeLock = new Object();
|
|
private final boolean manageOsCache;
|
|
private final boolean manageOsCache;
|
|
private final int readaheadLength;
|
|
private final int readaheadLength;
|
|
private final ReadaheadPool readaheadPool;
|
|
private final ReadaheadPool readaheadPool;
|
|
private final FileDescriptor fd;
|
|
private final FileDescriptor fd;
|
|
private final String identifier;
|
|
private final String identifier;
|
|
|
|
|
|
- private ReadaheadRequest readaheadRequest;
|
|
|
|
|
|
+ private volatile ReadaheadRequest readaheadRequest;
|
|
|
|
|
|
public FadvisedChunkedFile(RandomAccessFile file, long position, long count,
|
|
public FadvisedChunkedFile(RandomAccessFile file, long position, long count,
|
|
int chunkSize, boolean manageOsCache, int readaheadLength,
|
|
int chunkSize, boolean manageOsCache, int readaheadLength,
|
|
@@ -56,31 +58,50 @@ public class FadvisedChunkedFile extends ChunkedFile {
|
|
this.identifier = identifier;
|
|
this.identifier = identifier;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ @VisibleForTesting
|
|
|
|
+ FileDescriptor getFd() {
|
|
|
|
+ return fd;
|
|
|
|
+ }
|
|
|
|
+
|
|
@Override
|
|
@Override
|
|
public Object nextChunk() throws Exception {
|
|
public Object nextChunk() throws Exception {
|
|
- if (manageOsCache && readaheadPool != null) {
|
|
|
|
- readaheadRequest = readaheadPool
|
|
|
|
- .readaheadStream(identifier, fd, getCurrentOffset(), readaheadLength,
|
|
|
|
- getEndOffset(), readaheadRequest);
|
|
|
|
|
|
+ synchronized (closeLock) {
|
|
|
|
+ if (fd.valid()) {
|
|
|
|
+ if (manageOsCache && readaheadPool != null) {
|
|
|
|
+ readaheadRequest = readaheadPool
|
|
|
|
+ .readaheadStream(
|
|
|
|
+ identifier, fd, getCurrentOffset(), readaheadLength,
|
|
|
|
+ getEndOffset(), readaheadRequest);
|
|
|
|
+ }
|
|
|
|
+ return super.nextChunk();
|
|
|
|
+ } else {
|
|
|
|
+ return null;
|
|
|
|
+ }
|
|
}
|
|
}
|
|
- return super.nextChunk();
|
|
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public void close() throws Exception {
|
|
public void close() throws Exception {
|
|
- if (readaheadRequest != null) {
|
|
|
|
- readaheadRequest.cancel();
|
|
|
|
- }
|
|
|
|
- if (manageOsCache && getEndOffset() - getStartOffset() > 0) {
|
|
|
|
- try {
|
|
|
|
- NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier,
|
|
|
|
- fd,
|
|
|
|
- getStartOffset(), getEndOffset() - getStartOffset(),
|
|
|
|
- POSIX_FADV_DONTNEED);
|
|
|
|
- } catch (Throwable t) {
|
|
|
|
- LOG.warn("Failed to manage OS cache for " + identifier, t);
|
|
|
|
|
|
+ synchronized (closeLock) {
|
|
|
|
+ if (readaheadRequest != null) {
|
|
|
|
+ readaheadRequest.cancel();
|
|
|
|
+ readaheadRequest = null;
|
|
|
|
+ }
|
|
|
|
+ if (fd.valid() &&
|
|
|
|
+ manageOsCache && getEndOffset() - getStartOffset() > 0) {
|
|
|
|
+ try {
|
|
|
|
+ NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
|
|
|
|
+ identifier,
|
|
|
|
+ fd,
|
|
|
|
+ getStartOffset(), getEndOffset() - getStartOffset(),
|
|
|
|
+ POSIX_FADV_DONTNEED);
|
|
|
|
+ } catch (Throwable t) {
|
|
|
|
+ LOG.warn("Failed to manage OS cache for " + identifier +
|
|
|
|
+ " fd " + fd.toString(), t);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
+ // fd becomes invalid upon closing
|
|
|
|
+ super.close();
|
|
}
|
|
}
|
|
- super.close();
|
|
|
|
}
|
|
}
|
|
}
|
|
}
|