|
@@ -187,6 +187,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
|
|
|
private long readAhead;
|
|
|
private S3AInputPolicy inputPolicy;
|
|
|
private final AtomicBoolean closed = new AtomicBoolean(false);
|
|
|
+ private volatile boolean isClosed = false;
|
|
|
private MetadataStore metadataStore;
|
|
|
private boolean allowAuthoritative;
|
|
|
|
|
@@ -678,7 +679,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
|
|
|
*/
|
|
|
public FSDataInputStream open(Path f, int bufferSize)
|
|
|
throws IOException {
|
|
|
-
|
|
|
+ checkNotClosed();
|
|
|
LOG.debug("Opening '{}' for reading; input policy = {}", f, inputPolicy);
|
|
|
final FileStatus fileStatus = getFileStatus(f);
|
|
|
if (fileStatus.isDirectory()) {
|
|
@@ -722,6 +723,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
|
|
|
public FSDataOutputStream create(Path f, FsPermission permission,
|
|
|
boolean overwrite, int bufferSize, short replication, long blockSize,
|
|
|
Progressable progress) throws IOException {
|
|
|
+ checkNotClosed();
|
|
|
final Path path = qualify(f);
|
|
|
String key = pathToKey(path);
|
|
|
FileStatus status = null;
|
|
@@ -871,7 +873,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
|
|
|
Path dst = qualify(dest);
|
|
|
|
|
|
LOG.debug("Rename path {} to {}", src, dst);
|
|
|
- incrementStatistic(INVOCATION_RENAME);
|
|
|
+ entryPoint(INVOCATION_RENAME);
|
|
|
|
|
|
String srcKey = pathToKey(src);
|
|
|
String dstKey = pathToKey(dst);
|
|
@@ -1097,6 +1099,17 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
|
|
|
metadataStore = ms;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Entry point to an operation.
|
|
|
+ * Increments the statistic; verifies the FS is active.
|
|
|
+ * @param operation The operation to increment
|
|
|
+ * @throws IOException if the
|
|
|
+ */
|
|
|
+ protected void entryPoint(Statistic operation) throws IOException {
|
|
|
+ checkNotClosed();
|
|
|
+ incrementStatistic(operation);
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Increment a statistic by 1.
|
|
|
* @param statistic The operation to increment
|
|
@@ -1660,6 +1673,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
|
|
|
@Retries.RetryTranslated
|
|
|
public boolean delete(Path f, boolean recursive) throws IOException {
|
|
|
try {
|
|
|
+ checkNotClosed();
|
|
|
return innerDelete(innerGetFileStatus(f, true), recursive);
|
|
|
} catch (FileNotFoundException e) {
|
|
|
LOG.debug("Couldn't delete {} - does not exist", f);
|
|
@@ -1838,7 +1852,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
|
|
|
Path path = qualify(f);
|
|
|
String key = pathToKey(path);
|
|
|
LOG.debug("List status for path: {}", path);
|
|
|
- incrementStatistic(INVOCATION_LIST_STATUS);
|
|
|
+ entryPoint(INVOCATION_LIST_STATUS);
|
|
|
|
|
|
List<FileStatus> result;
|
|
|
final FileStatus fileStatus = getFileStatus(path);
|
|
@@ -1981,7 +1995,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
|
|
|
throws IOException, FileAlreadyExistsException, AmazonClientException {
|
|
|
Path f = qualify(p);
|
|
|
LOG.debug("Making directory: {}", f);
|
|
|
- incrementStatistic(INVOCATION_MKDIRS);
|
|
|
+ entryPoint(INVOCATION_MKDIRS);
|
|
|
FileStatus fileStatus;
|
|
|
List<Path> metadataStoreDirs = null;
|
|
|
if (hasMetadataStore()) {
|
|
@@ -2058,7 +2072,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
|
|
|
@Retries.RetryTranslated
|
|
|
S3AFileStatus innerGetFileStatus(final Path f,
|
|
|
boolean needEmptyDirectoryFlag) throws IOException {
|
|
|
- incrementStatistic(INVOCATION_GET_FILE_STATUS);
|
|
|
+ entryPoint(INVOCATION_GET_FILE_STATUS);
|
|
|
final Path path = qualify(f);
|
|
|
String key = pathToKey(path);
|
|
|
LOG.debug("Getting path status for {} ({})", path, key);
|
|
@@ -2319,7 +2333,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
|
|
|
private void innerCopyFromLocalFile(boolean delSrc, boolean overwrite,
|
|
|
Path src, Path dst)
|
|
|
throws IOException, FileAlreadyExistsException, AmazonClientException {
|
|
|
- incrementStatistic(INVOCATION_COPY_FROM_LOCAL_FILE);
|
|
|
+ entryPoint(INVOCATION_COPY_FROM_LOCAL_FILE);
|
|
|
LOG.debug("Copying local file from {} to {}", src, dst);
|
|
|
|
|
|
// Since we have a local file, we don't need to stream into a temporary file
|
|
@@ -2418,6 +2432,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
|
|
|
// already closed
|
|
|
return;
|
|
|
}
|
|
|
+ isClosed = true;
|
|
|
+ LOG.debug("Filesystem {} is closed", uri);
|
|
|
try {
|
|
|
super.close();
|
|
|
} finally {
|
|
@@ -2434,6 +2450,17 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Verify that the input stream is open. Non blocking; this gives
|
|
|
+ * the last state of the volatile {@link #closed} field.
|
|
|
+ * @throws IOException if the connection is closed.
|
|
|
+ */
|
|
|
+ private void checkNotClosed() throws IOException {
|
|
|
+ if (isClosed) {
|
|
|
+ throw new IOException(uri + ": " + E_FS_CLOSED);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Override getCanonicalServiceName because we don't support token in S3A.
|
|
|
*/
|
|
@@ -2860,7 +2887,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
|
|
|
*/
|
|
|
@Override
|
|
|
public FileStatus[] globStatus(Path pathPattern) throws IOException {
|
|
|
- incrementStatistic(INVOCATION_GLOB_STATUS);
|
|
|
+ entryPoint(INVOCATION_GLOB_STATUS);
|
|
|
return super.globStatus(pathPattern);
|
|
|
}
|
|
|
|
|
@@ -2871,7 +2898,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
|
|
|
@Override
|
|
|
public FileStatus[] globStatus(Path pathPattern, PathFilter filter)
|
|
|
throws IOException {
|
|
|
- incrementStatistic(INVOCATION_GLOB_STATUS);
|
|
|
+ entryPoint(INVOCATION_GLOB_STATUS);
|
|
|
return super.globStatus(pathPattern, filter);
|
|
|
}
|
|
|
|
|
@@ -2881,7 +2908,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
|
|
|
*/
|
|
|
@Override
|
|
|
public boolean exists(Path f) throws IOException {
|
|
|
- incrementStatistic(INVOCATION_EXISTS);
|
|
|
+ entryPoint(INVOCATION_EXISTS);
|
|
|
return super.exists(f);
|
|
|
}
|
|
|
|
|
@@ -2892,7 +2919,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
|
|
|
@Override
|
|
|
@SuppressWarnings("deprecation")
|
|
|
public boolean isDirectory(Path f) throws IOException {
|
|
|
- incrementStatistic(INVOCATION_IS_DIRECTORY);
|
|
|
+ entryPoint(INVOCATION_IS_DIRECTORY);
|
|
|
return super.isDirectory(f);
|
|
|
}
|
|
|
|
|
@@ -2903,7 +2930,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
|
|
|
@Override
|
|
|
@SuppressWarnings("deprecation")
|
|
|
public boolean isFile(Path f) throws IOException {
|
|
|
- incrementStatistic(INVOCATION_IS_FILE);
|
|
|
+ entryPoint(INVOCATION_IS_FILE);
|
|
|
return super.isFile(f);
|
|
|
}
|
|
|
|
|
@@ -2948,7 +2975,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
|
|
|
|
|
|
private RemoteIterator<LocatedFileStatus> innerListFiles(Path f, boolean
|
|
|
recursive, Listing.FileStatusAcceptor acceptor) throws IOException {
|
|
|
- incrementStatistic(INVOCATION_LIST_FILES);
|
|
|
+ entryPoint(INVOCATION_LIST_FILES);
|
|
|
Path path = qualify(f);
|
|
|
LOG.debug("listFiles({}, {})", path, recursive);
|
|
|
try {
|
|
@@ -3033,7 +3060,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
|
|
|
public RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f,
|
|
|
final PathFilter filter)
|
|
|
throws FileNotFoundException, IOException {
|
|
|
- incrementStatistic(INVOCATION_LIST_LOCATED_STATUS);
|
|
|
+ entryPoint(INVOCATION_LIST_LOCATED_STATUS);
|
|
|
Path path = qualify(f);
|
|
|
LOG.debug("listLocatedStatus({}, {}", path, filter);
|
|
|
try {
|