|
@@ -31,6 +31,7 @@ import java.util.Date;
|
|
import java.util.EnumSet;
|
|
import java.util.EnumSet;
|
|
import java.util.HashSet;
|
|
import java.util.HashSet;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
|
|
+import java.util.Locale;
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
import java.util.Set;
|
|
import java.util.Set;
|
|
import java.util.Objects;
|
|
import java.util.Objects;
|
|
@@ -81,6 +82,9 @@ import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.fs.CreateFlag;
|
|
import org.apache.hadoop.fs.CreateFlag;
|
|
import org.apache.hadoop.fs.FSDataInputStream;
|
|
import org.apache.hadoop.fs.FSDataInputStream;
|
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
|
|
|
+import org.apache.hadoop.fs.s3a.impl.DirectoryPolicy;
|
|
|
|
+import org.apache.hadoop.fs.s3a.impl.DirectoryPolicyImpl;
|
|
|
|
+import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum;
|
|
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
|
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
@@ -95,6 +99,7 @@ import org.apache.hadoop.fs.PathIOException;
|
|
import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
|
|
import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
|
|
import org.apache.hadoop.fs.RemoteIterator;
|
|
import org.apache.hadoop.fs.RemoteIterator;
|
|
import org.apache.hadoop.fs.StorageStatistics;
|
|
import org.apache.hadoop.fs.StorageStatistics;
|
|
|
|
+import org.apache.hadoop.fs.StreamCapabilities;
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata;
|
|
import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata;
|
|
import org.apache.hadoop.fs.s3a.s3guard.MetadataStoreListFilesIterator;
|
|
import org.apache.hadoop.fs.s3a.s3guard.MetadataStoreListFilesIterator;
|
|
@@ -131,7 +136,7 @@ import org.slf4j.LoggerFactory;
|
|
*/
|
|
*/
|
|
@InterfaceAudience.Private
|
|
@InterfaceAudience.Private
|
|
@InterfaceStability.Evolving
|
|
@InterfaceStability.Evolving
|
|
-public class S3AFileSystem extends FileSystem {
|
|
|
|
|
|
+public class S3AFileSystem extends FileSystem implements StreamCapabilities {
|
|
/**
|
|
/**
|
|
* Default blocksize as used in blocksize and FS status queries.
|
|
* Default blocksize as used in blocksize and FS status queries.
|
|
*/
|
|
*/
|
|
@@ -170,6 +175,11 @@ public class S3AFileSystem extends FileSystem {
|
|
private int blockOutputActiveBlocks;
|
|
private int blockOutputActiveBlocks;
|
|
private boolean useListV1;
|
|
private boolean useListV1;
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Directory policy.
|
|
|
|
+ */
|
|
|
|
+ private DirectoryPolicy directoryPolicy;
|
|
|
|
+
|
|
/** Add any deprecated keys. */
|
|
/** Add any deprecated keys. */
|
|
@SuppressWarnings("deprecation")
|
|
@SuppressWarnings("deprecation")
|
|
private static void addDeprecatedKeys() {
|
|
private static void addDeprecatedKeys() {
|
|
@@ -304,6 +314,9 @@ public class S3AFileSystem extends FileSystem {
|
|
LOG.debug("Using metadata store {}, authoritative={}",
|
|
LOG.debug("Using metadata store {}, authoritative={}",
|
|
getMetadataStore(), allowAuthoritative);
|
|
getMetadataStore(), allowAuthoritative);
|
|
}
|
|
}
|
|
|
|
+ // directory policy, which may look at authoritative paths
|
|
|
|
+ directoryPolicy = DirectoryPolicyImpl.getDirectoryPolicy(conf);
|
|
|
|
+ LOG.debug("Directory marker retention policy is {}", directoryPolicy);
|
|
} catch (AmazonClientException e) {
|
|
} catch (AmazonClientException e) {
|
|
throw translateException("initializing ", new Path(name), e);
|
|
throw translateException("initializing ", new Path(name), e);
|
|
}
|
|
}
|
|
@@ -419,6 +432,19 @@ public class S3AFileSystem extends FileSystem {
|
|
return s3;
|
|
return s3;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Returns the S3 client used by this filesystem.
|
|
|
|
+ * <i>Warning: this must only be used for testing, as it bypasses core
|
|
|
|
+ * S3A operations. </i>
|
|
|
|
+ * @param reason a justification for requesting access.
|
|
|
|
+ * @return AmazonS3Client
|
|
|
|
+ */
|
|
|
|
+ @VisibleForTesting
|
|
|
|
+ public AmazonS3 getAmazonS3ClientForTesting(String reason) {
|
|
|
|
+ LOG.warn("Access to S3A client requested, reason {}", reason);
|
|
|
|
+ return s3;
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Get the region of a bucket.
|
|
* Get the region of a bucket.
|
|
* @return the region in which a bucket is located
|
|
* @return the region in which a bucket is located
|
|
@@ -838,6 +864,10 @@ public class S3AFileSystem extends FileSystem {
|
|
}
|
|
}
|
|
// TODO S3Guard HADOOP-13761: retries when source paths are not visible yet
|
|
// TODO S3Guard HADOOP-13761: retries when source paths are not visible yet
|
|
// TODO S3Guard: performance: mark destination dirs as authoritative
|
|
// TODO S3Guard: performance: mark destination dirs as authoritative
|
|
|
|
+ // The path to whichever file or directory is created by the
|
|
|
|
+ // rename. When deleting markers all parents of
|
|
|
|
+ // this path will need their markers pruned.
|
|
|
|
+ Path destCreated = dst;
|
|
|
|
|
|
// Ok! Time to start
|
|
// Ok! Time to start
|
|
if (srcStatus.isFile()) {
|
|
if (srcStatus.isFile()) {
|
|
@@ -851,9 +881,11 @@ public class S3AFileSystem extends FileSystem {
|
|
String filename =
|
|
String filename =
|
|
srcKey.substring(pathToKey(src.getParent()).length()+1);
|
|
srcKey.substring(pathToKey(src.getParent()).length()+1);
|
|
newDstKey = newDstKey + filename;
|
|
newDstKey = newDstKey + filename;
|
|
|
|
+ destCreated = keyToQualifiedPath(newDstKey);
|
|
|
|
+
|
|
copyFile(srcKey, newDstKey, length);
|
|
copyFile(srcKey, newDstKey, length);
|
|
S3Guard.addMoveFile(metadataStore, srcPaths, dstMetas, src,
|
|
S3Guard.addMoveFile(metadataStore, srcPaths, dstMetas, src,
|
|
- keyToQualifiedPath(newDstKey), length, getDefaultBlockSize(dst),
|
|
|
|
|
|
+ destCreated, length, getDefaultBlockSize(dst),
|
|
username);
|
|
username);
|
|
} else {
|
|
} else {
|
|
copyFile(srcKey, dstKey, srcStatus.getLen());
|
|
copyFile(srcKey, dstKey, srcStatus.getLen());
|
|
@@ -940,9 +972,10 @@ public class S3AFileSystem extends FileSystem {
|
|
|
|
|
|
metadataStore.move(srcPaths, dstMetas);
|
|
metadataStore.move(srcPaths, dstMetas);
|
|
|
|
|
|
- if (src.getParent() != dst.getParent()) {
|
|
|
|
- deleteUnnecessaryFakeDirectories(dst.getParent());
|
|
|
|
- createFakeDirectoryIfNecessary(src.getParent());
|
|
|
|
|
|
+ if (!src.getParent().equals(destCreated.getParent())) {
|
|
|
|
+ LOG.debug("source & dest parents are different; fix up dir markers");
|
|
|
|
+ deleteUnnecessaryFakeDirectories(destCreated.getParent());
|
|
|
|
+ maybeCreateFakeParentDirectory(src);
|
|
}
|
|
}
|
|
return true;
|
|
return true;
|
|
}
|
|
}
|
|
@@ -1555,6 +1588,21 @@ public class S3AFileSystem extends FileSystem {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Create a fake parent directory if required.
|
|
|
|
+ * That is: its parent is not the root path and does not yet exist.
|
|
|
|
+ * @param path whose parent is created if needed.
|
|
|
|
+ * @throws IOException IO problem
|
|
|
|
+ * @throws AmazonClientException untranslated AWS client problem
|
|
|
|
+ */
|
|
|
|
+ void maybeCreateFakeParentDirectory(Path path)
|
|
|
|
+ throws IOException, AmazonClientException {
|
|
|
|
+ Path parent = path.getParent();
|
|
|
|
+ if (parent != null) {
|
|
|
|
+ createFakeDirectoryIfNecessary(parent);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* List the statuses of the files/directories in the given path if the path is
|
|
* List the statuses of the files/directories in the given path if the path is
|
|
* a directory.
|
|
* a directory.
|
|
@@ -1822,6 +1870,8 @@ public class S3AFileSystem extends FileSystem {
|
|
|
|
|
|
FileStatus msStatus = pm.getFileStatus();
|
|
FileStatus msStatus = pm.getFileStatus();
|
|
if (needEmptyDirectoryFlag && msStatus.isDirectory()) {
|
|
if (needEmptyDirectoryFlag && msStatus.isDirectory()) {
|
|
|
|
+ // the caller needs to know if a directory is empty,
|
|
|
|
+ // and that this is a directory.
|
|
if (pm.isEmptyDirectory() != Tristate.UNKNOWN) {
|
|
if (pm.isEmptyDirectory() != Tristate.UNKNOWN) {
|
|
// We have a definitive true / false from MetadataStore, we are done.
|
|
// We have a definitive true / false from MetadataStore, we are done.
|
|
return S3AFileStatus.fromFileStatus(msStatus, pm.isEmptyDirectory());
|
|
return S3AFileStatus.fromFileStatus(msStatus, pm.isEmptyDirectory());
|
|
@@ -1830,28 +1880,33 @@ public class S3AFileSystem extends FileSystem {
|
|
if (children != null) {
|
|
if (children != null) {
|
|
tombstones = children.listTombstones();
|
|
tombstones = children.listTombstones();
|
|
}
|
|
}
|
|
- LOG.debug("MetadataStore doesn't know if dir is empty, using S3.");
|
|
|
|
|
|
+ LOG.debug("MetadataStore doesn't know if {} is empty, using S3.",
|
|
|
|
+ path);
|
|
}
|
|
}
|
|
} else {
|
|
} else {
|
|
// Either this is not a directory, or we don't care if it is empty
|
|
// Either this is not a directory, or we don't care if it is empty
|
|
return S3AFileStatus.fromFileStatus(msStatus, pm.isEmptyDirectory());
|
|
return S3AFileStatus.fromFileStatus(msStatus, pm.isEmptyDirectory());
|
|
}
|
|
}
|
|
|
|
|
|
- // If the metadata store has no children for it and it's not listed in
|
|
|
|
- // S3 yet, we'll assume the empty directory is true;
|
|
|
|
- S3AFileStatus s3FileStatus;
|
|
|
|
|
|
+ // now issue the S3 getFileStatus call.
|
|
try {
|
|
try {
|
|
- s3FileStatus = s3GetFileStatus(path, key, tombstones);
|
|
|
|
|
|
+ S3AFileStatus s3FileStatus = s3GetFileStatus(path, key,
|
|
|
|
+ StatusProbeEnum.ALL,
|
|
|
|
+ tombstones,
|
|
|
|
+ true);
|
|
|
|
+ // entry was found, so save in S3Guard and return the final value.
|
|
|
|
+ return S3Guard.putAndReturn(metadataStore, s3FileStatus,
|
|
|
|
+ instrumentation);
|
|
} catch (FileNotFoundException e) {
|
|
} catch (FileNotFoundException e) {
|
|
return S3AFileStatus.fromFileStatus(msStatus, Tristate.TRUE);
|
|
return S3AFileStatus.fromFileStatus(msStatus, Tristate.TRUE);
|
|
}
|
|
}
|
|
- // entry was found, save in S3Guard
|
|
|
|
- return S3Guard.putAndReturn(metadataStore, s3FileStatus, instrumentation);
|
|
|
|
} else {
|
|
} else {
|
|
// there was no entry in S3Guard
|
|
// there was no entry in S3Guard
|
|
// retrieve the data and update the metadata store in the process.
|
|
// retrieve the data and update the metadata store in the process.
|
|
return S3Guard.putAndReturn(metadataStore,
|
|
return S3Guard.putAndReturn(metadataStore,
|
|
- s3GetFileStatus(path, key, tombstones), instrumentation);
|
|
|
|
|
|
+ s3GetFileStatus(path, key, StatusProbeEnum.ALL,
|
|
|
|
+ tombstones, needEmptyDirectoryFlag),
|
|
|
|
+ instrumentation);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -1861,86 +1916,94 @@ public class S3AFileSystem extends FileSystem {
|
|
* and for direct management of empty directory blobs.
|
|
* and for direct management of empty directory blobs.
|
|
* @param path Qualified path
|
|
* @param path Qualified path
|
|
* @param key Key string for the path
|
|
* @param key Key string for the path
|
|
|
|
+ * @param probes probes to make
|
|
|
|
+ * @param tombstones tombstones to filter
|
|
|
|
+ * @param needEmptyDirectoryFlag if true, implementation will calculate
|
|
|
|
+ * a TRUE or FALSE value for {@link S3AFileStatus#isEmptyDirectory()}
|
|
* @return Status
|
|
* @return Status
|
|
- * @throws FileNotFoundException when the path does not exist
|
|
|
|
|
|
+ * @throws FileNotFoundException if none of the supplied probes found the path.
|
|
* @throws IOException on other problems.
|
|
* @throws IOException on other problems.
|
|
*/
|
|
*/
|
|
- private S3AFileStatus s3GetFileStatus(final Path path, String key,
|
|
|
|
- Set<Path> tombstones) throws IOException {
|
|
|
|
- if (!key.isEmpty()) {
|
|
|
|
|
|
+ private S3AFileStatus s3GetFileStatus(final Path path,
|
|
|
|
+ final String key,
|
|
|
|
+ final Set<StatusProbeEnum> probes,
|
|
|
|
+ final Set<Path> tombstones,
|
|
|
|
+ final boolean needEmptyDirectoryFlag) throws IOException {
|
|
|
|
+ LOG.debug("S3GetFileStatus {}", path);
|
|
|
|
+ Preconditions.checkArgument(!needEmptyDirectoryFlag
|
|
|
|
+ || probes.contains(StatusProbeEnum.List),
|
|
|
|
+ "s3GetFileStatus(%s) wants to know if a directory is empty but"
|
|
|
|
+ + " does not request a list probe", path);
|
|
|
|
+
|
|
|
|
+ if (!key.isEmpty() && !key.endsWith("/")
|
|
|
|
+ && probes.contains(StatusProbeEnum.Head)) {
|
|
try {
|
|
try {
|
|
|
|
+ // look for the simple file
|
|
ObjectMetadata meta = getObjectMetadata(key);
|
|
ObjectMetadata meta = getObjectMetadata(key);
|
|
-
|
|
|
|
- if (objectRepresentsDirectory(key, meta.getContentLength())) {
|
|
|
|
- LOG.debug("Found exact file: fake directory");
|
|
|
|
- return new S3AFileStatus(Tristate.TRUE, path, username);
|
|
|
|
- } else {
|
|
|
|
- LOG.debug("Found exact file: normal file");
|
|
|
|
|
|
+ LOG.debug("Found exact file: normal file {}", key);
|
|
return new S3AFileStatus(meta.getContentLength(),
|
|
return new S3AFileStatus(meta.getContentLength(),
|
|
dateToLong(meta.getLastModified()),
|
|
dateToLong(meta.getLastModified()),
|
|
path,
|
|
path,
|
|
getDefaultBlockSize(path),
|
|
getDefaultBlockSize(path),
|
|
username);
|
|
username);
|
|
- }
|
|
|
|
} catch (AmazonServiceException e) {
|
|
} catch (AmazonServiceException e) {
|
|
|
|
+ // if the response is a 404 error, it just means that there is
|
|
|
|
+ // no file at that path...the remaining checks will be needed.
|
|
if (e.getStatusCode() != 404) {
|
|
if (e.getStatusCode() != 404) {
|
|
throw translateException("getFileStatus", path, e);
|
|
throw translateException("getFileStatus", path, e);
|
|
}
|
|
}
|
|
} catch (AmazonClientException e) {
|
|
} catch (AmazonClientException e) {
|
|
throw translateException("getFileStatus", path, e);
|
|
throw translateException("getFileStatus", path, e);
|
|
}
|
|
}
|
|
|
|
+ }
|
|
|
|
|
|
- // Necessary?
|
|
|
|
- if (!key.endsWith("/")) {
|
|
|
|
- String newKey = key + "/";
|
|
|
|
|
|
+ // execute the list
|
|
|
|
+ if (probes.contains(StatusProbeEnum.List)) {
|
|
try {
|
|
try {
|
|
- ObjectMetadata meta = getObjectMetadata(newKey);
|
|
|
|
-
|
|
|
|
- if (objectRepresentsDirectory(newKey, meta.getContentLength())) {
|
|
|
|
- LOG.debug("Found file (with /): fake directory");
|
|
|
|
- return new S3AFileStatus(Tristate.TRUE, path, username);
|
|
|
|
|
|
+ // this will find a marker dir / as well as an entry.
|
|
|
|
+ // When making a simple "is this a dir?" check, all is good.
|
|
|
|
+ // but when looking for an empty dir, we need to verify there are no
|
|
|
|
+ // children, so ask for two entries, so as to find
|
|
|
|
+ // a child
|
|
|
|
+ String dirKey = maybeAddTrailingSlash(key);
|
|
|
|
+ // list size is dir marker + at least one non-tombstone entry
|
|
|
|
+ // there's a corner case: more tombstones than you have in a
|
|
|
|
+ // single page list. We assume that if you have been deleting
|
|
|
|
+ // that many files, then the AWS listing will have purged some
|
|
|
|
+ // by the time of listing so that the response includes some
|
|
|
|
+ // which have not.
|
|
|
|
+
|
|
|
|
+ int listSize;
|
|
|
|
+ if (tombstones == null) {
|
|
|
|
+ // no tombstones so look for a marker and at least one child.
|
|
|
|
+ listSize = 2;
|
|
} else {
|
|
} else {
|
|
- LOG.warn("Found file (with /): real file? should not happen: {}",
|
|
|
|
- key);
|
|
|
|
-
|
|
|
|
- return new S3AFileStatus(meta.getContentLength(),
|
|
|
|
- dateToLong(meta.getLastModified()),
|
|
|
|
- path,
|
|
|
|
- getDefaultBlockSize(path),
|
|
|
|
- username);
|
|
|
|
- }
|
|
|
|
- } catch (AmazonServiceException e) {
|
|
|
|
- if (e.getStatusCode() != 404) {
|
|
|
|
- throw translateException("getFileStatus", newKey, e);
|
|
|
|
- }
|
|
|
|
- } catch (AmazonClientException e) {
|
|
|
|
- throw translateException("getFileStatus", newKey, e);
|
|
|
|
- }
|
|
|
|
|
|
+ // build a listing > tombstones. If the caller has many thousands
|
|
|
|
+ // of tombstones this won't work properly, which is why pruning
|
|
|
|
+ // of expired tombstones matters.
|
|
|
|
+ listSize = Math.min(2 + tombstones.size(), Math.max(2, maxKeys));
|
|
}
|
|
}
|
|
- }
|
|
|
|
-
|
|
|
|
- try {
|
|
|
|
- key = maybeAddTrailingSlash(key);
|
|
|
|
- S3ListRequest request = createListObjectsRequest(key, "/", 1);
|
|
|
|
|
|
+ S3ListRequest request = createListObjectsRequest(dirKey, "/",
|
|
|
|
+ listSize);
|
|
|
|
+ // execute the request
|
|
|
|
+ S3ListResult listResult = listObjects(request);
|
|
|
|
|
|
- S3ListResult objects = listObjects(request);
|
|
|
|
|
|
|
|
- Collection<String> prefixes = objects.getCommonPrefixes();
|
|
|
|
- Collection<S3ObjectSummary> summaries = objects.getObjectSummaries();
|
|
|
|
- if (!isEmptyOfKeys(prefixes, tombstones) ||
|
|
|
|
- !isEmptyOfObjects(summaries, tombstones)) {
|
|
|
|
|
|
+ if (listResult.hasPrefixesOrObjects(this::keyToPath, tombstones)) {
|
|
if (LOG.isDebugEnabled()) {
|
|
if (LOG.isDebugEnabled()) {
|
|
- LOG.debug("Found path as directory (with /): {}/{}",
|
|
|
|
- prefixes.size(), summaries.size());
|
|
|
|
-
|
|
|
|
- for (S3ObjectSummary summary : summaries) {
|
|
|
|
- LOG.debug("Summary: {} {}", summary.getKey(), summary.getSize());
|
|
|
|
|
|
+ LOG.debug("Found path as directory (with /)");
|
|
|
|
+ listResult.logAtDebug(LOG);
|
|
}
|
|
}
|
|
- for (String prefix : prefixes) {
|
|
|
|
- LOG.debug("Prefix: {}", prefix);
|
|
|
|
|
|
+ // At least one entry has been found.
|
|
|
|
+ // If looking for an empty directory, the marker must exist with no children.
|
|
|
|
+ // So the listing must contain the marker entry only.
|
|
|
|
+ if (needEmptyDirectoryFlag
|
|
|
|
+ && listResult.representsEmptyDirectory(
|
|
|
|
+ this::keyToPath, dirKey, tombstones)) {
|
|
|
|
+ return new S3AFileStatus(Tristate.TRUE, path, username);
|
|
}
|
|
}
|
|
- }
|
|
|
|
-
|
|
|
|
|
|
+ // either an empty directory is not needed, or the
|
|
|
|
+ // listing does not meet the requirements.
|
|
return new S3AFileStatus(Tristate.FALSE, path, username);
|
|
return new S3AFileStatus(Tristate.FALSE, path, username);
|
|
} else if (key.isEmpty()) {
|
|
} else if (key.isEmpty()) {
|
|
LOG.debug("Found root directory");
|
|
LOG.debug("Found root directory");
|
|
@@ -1953,53 +2016,12 @@ public class S3AFileSystem extends FileSystem {
|
|
} catch (AmazonClientException e) {
|
|
} catch (AmazonClientException e) {
|
|
throw translateException("getFileStatus", key, e);
|
|
throw translateException("getFileStatus", key, e);
|
|
}
|
|
}
|
|
|
|
+ }
|
|
|
|
|
|
LOG.debug("Not Found: {}", path);
|
|
LOG.debug("Not Found: {}", path);
|
|
throw new FileNotFoundException("No such file or directory: " + path);
|
|
throw new FileNotFoundException("No such file or directory: " + path);
|
|
}
|
|
}
|
|
|
|
|
|
- /**
|
|
|
|
- * Helper function to determine if a collection of paths is empty
|
|
|
|
- * after accounting for tombstone markers (if provided).
|
|
|
|
- * @param keys Collection of path (prefixes / directories or keys).
|
|
|
|
- * @param tombstones Set of tombstone markers, or null if not applicable.
|
|
|
|
- * @return false if summaries contains objects not accounted for by
|
|
|
|
- * tombstones.
|
|
|
|
- */
|
|
|
|
- private boolean isEmptyOfKeys(Collection<String> keys, Set<Path>
|
|
|
|
- tombstones) {
|
|
|
|
- if (tombstones == null) {
|
|
|
|
- return keys.isEmpty();
|
|
|
|
- }
|
|
|
|
- for (String key : keys) {
|
|
|
|
- Path qualified = keyToQualifiedPath(key);
|
|
|
|
- if (!tombstones.contains(qualified)) {
|
|
|
|
- return false;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- return true;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * Helper function to determine if a collection of object summaries is empty
|
|
|
|
- * after accounting for tombstone markers (if provided).
|
|
|
|
- * @param summaries Collection of objects as returned by listObjects.
|
|
|
|
- * @param tombstones Set of tombstone markers, or null if not applicable.
|
|
|
|
- * @return false if summaries contains objects not accounted for by
|
|
|
|
- * tombstones.
|
|
|
|
- */
|
|
|
|
- private boolean isEmptyOfObjects(Collection<S3ObjectSummary> summaries,
|
|
|
|
- Set<Path> tombstones) {
|
|
|
|
- if (tombstones == null) {
|
|
|
|
- return summaries.isEmpty();
|
|
|
|
- }
|
|
|
|
- Collection<String> stringCollection = new ArrayList<>(summaries.size());
|
|
|
|
- for (S3ObjectSummary summary : summaries) {
|
|
|
|
- stringCollection.add(summary.getKey());
|
|
|
|
- }
|
|
|
|
- return isEmptyOfKeys(stringCollection, tombstones);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
/**
|
|
/**
|
|
* Raw version of {@link FileSystem#exists(Path)} which uses S3 only:
|
|
* Raw version of {@link FileSystem#exists(Path)} which uses S3 only:
|
|
* S3Guard MetadataStore, if any, will be skipped.
|
|
* S3Guard MetadataStore, if any, will be skipped.
|
|
@@ -2009,7 +2031,8 @@ public class S3AFileSystem extends FileSystem {
|
|
Path path = qualify(f);
|
|
Path path = qualify(f);
|
|
String key = pathToKey(path);
|
|
String key = pathToKey(path);
|
|
try {
|
|
try {
|
|
- s3GetFileStatus(path, key, null);
|
|
|
|
|
|
+ s3GetFileStatus(path, key, StatusProbeEnum.ALL,
|
|
|
|
+ null, false);
|
|
return true;
|
|
return true;
|
|
} catch (FileNotFoundException e) {
|
|
} catch (FileNotFoundException e) {
|
|
return false;
|
|
return false;
|
|
@@ -2463,6 +2486,14 @@ public class S3AFileSystem extends FileSystem {
|
|
return getConf().getLongBytes(FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE);
|
|
return getConf().getLongBytes(FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Get the directory marker policy of this filesystem.
|
|
|
|
+ * @return the marker policy.
|
|
|
|
+ */
|
|
|
|
+ public DirectoryPolicy getDirectoryMarkerPolicy() {
|
|
|
|
+ return directoryPolicy;
|
|
|
|
+ }
|
|
|
|
+
|
|
@Override
|
|
@Override
|
|
public String toString() {
|
|
public String toString() {
|
|
final StringBuilder sb = new StringBuilder(
|
|
final StringBuilder sb = new StringBuilder(
|
|
@@ -2494,6 +2525,7 @@ public class S3AFileSystem extends FileSystem {
|
|
sb.append(", useListV1=").append(useListV1);
|
|
sb.append(", useListV1=").append(useListV1);
|
|
sb.append(", boundedExecutor=").append(boundedThreadPool);
|
|
sb.append(", boundedExecutor=").append(boundedThreadPool);
|
|
sb.append(", unboundedExecutor=").append(unboundedThreadPool);
|
|
sb.append(", unboundedExecutor=").append(unboundedThreadPool);
|
|
|
|
+ sb.append(", ").append(directoryPolicy);
|
|
sb.append(", statistics {")
|
|
sb.append(", statistics {")
|
|
.append(statistics)
|
|
.append(statistics)
|
|
.append("}");
|
|
.append("}");
|
|
@@ -2957,4 +2989,48 @@ public class S3AFileSystem extends FileSystem {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * This Hadoop version does not support PathCapabilities.
|
|
|
|
+ * By implementing the method on its own, code which
|
|
|
|
+ * introspects to find the method can still probe for
|
|
|
|
+ * the capabilities of the store.
|
|
|
|
+ * @param path path (unused)
|
|
|
|
+ * @param capability capability to probe for.
|
|
|
|
+ * @return true if the FS has the specific capability.
|
|
|
|
+ * @throws IOException failure
|
|
|
|
+ */
|
|
|
|
+ public boolean hasPathCapability(final Path path, final String capability)
|
|
|
|
+ throws IOException {
|
|
|
|
+ return hasCapability(capability);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Return the capabilities of this filesystem instance.
|
|
|
|
+ * @param capability string naming the capability to query for.
|
|
|
|
+ * @return whether the FS instance has the capability.
|
|
|
|
+ */
|
|
|
|
+ @SuppressWarnings("deprecation")
|
|
|
|
+ @Override
|
|
|
|
+ public boolean hasCapability(String capability) {
|
|
|
|
+
|
|
|
|
+ final String cap = capability.toLowerCase(Locale.ENGLISH);
|
|
|
|
+ switch (cap) {
|
|
|
|
+
|
|
|
|
+ case STORE_CAPABILITY_DIRECTORY_MARKER_AWARE:
|
|
|
|
+ return true;
|
|
|
|
+
|
|
|
|
+ /*
|
|
|
|
+ * Marker policy capabilities are handed off.
|
|
|
|
+ */
|
|
|
|
+ case STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_KEEP:
|
|
|
|
+ case STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_DELETE:
|
|
|
|
+ case STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_AUTHORITATIVE:
|
|
|
|
+ case STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_KEEP:
|
|
|
|
+ case STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_DELETE:
|
|
|
|
+ return getDirectoryMarkerPolicy().hasPathCapability(new Path("/"), cap);
|
|
|
|
+
|
|
|
|
+ default:
|
|
|
|
+ return false;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
}
|
|
}
|