|
@@ -67,7 +67,7 @@ import com.amazonaws.services.s3.model.MultipartUpload;
|
|
|
import com.amazonaws.services.s3.model.ObjectMetadata;
|
|
|
import com.amazonaws.services.s3.model.PutObjectRequest;
|
|
|
import com.amazonaws.services.s3.model.PutObjectResult;
|
|
|
-import com.amazonaws.services.s3.model.S3ObjectSummary;
|
|
|
+
|
|
|
import com.amazonaws.services.s3.model.SSEAwsKeyManagementParams;
|
|
|
import com.amazonaws.services.s3.model.SSECustomerKey;
|
|
|
import com.amazonaws.services.s3.model.UploadPartRequest;
|
|
@@ -104,6 +104,8 @@ import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
|
|
|
import org.apache.hadoop.fs.s3a.impl.ContextAccessors;
|
|
|
import org.apache.hadoop.fs.s3a.impl.CopyOutcome;
|
|
|
import org.apache.hadoop.fs.s3a.impl.DeleteOperation;
|
|
|
+import org.apache.hadoop.fs.s3a.impl.DirectoryPolicy;
|
|
|
+import org.apache.hadoop.fs.s3a.impl.DirectoryPolicyImpl;
|
|
|
import org.apache.hadoop.fs.s3a.impl.InternalConstants;
|
|
|
import org.apache.hadoop.fs.s3a.impl.ListingOperationCallbacks;
|
|
|
import org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport;
|
|
@@ -116,6 +118,8 @@ import org.apache.hadoop.fs.s3a.impl.StoreContextBuilder;
|
|
|
import org.apache.hadoop.fs.s3a.impl.statistics.S3AMultipartUploaderStatisticsImpl;
|
|
|
import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;
|
|
|
import org.apache.hadoop.fs.s3a.select.InternalSelectConstants;
|
|
|
+import org.apache.hadoop.fs.s3a.tools.MarkerToolOperations;
|
|
|
+import org.apache.hadoop.fs.s3a.tools.MarkerToolOperationsImpl;
|
|
|
import org.apache.hadoop.io.IOUtils;
|
|
|
import org.apache.hadoop.io.Text;
|
|
|
import org.apache.hadoop.security.token.DelegationTokenIssuer;
|
|
@@ -295,6 +299,15 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
|
|
|
private final ListingOperationCallbacks listingOperationCallbacks =
|
|
|
new ListingOperationCallbacksImpl();
|
|
|
+ /**
|
|
|
+ * Directory policy.
|
|
|
+ */
|
|
|
+ private DirectoryPolicy directoryPolicy;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Context accessors for re-use.
|
|
|
+ */
|
|
|
+ private final ContextAccessors contextAccessors = new ContextAccessorsImpl();
|
|
|
|
|
|
/** Add any deprecated keys. */
|
|
|
@SuppressWarnings("deprecation")
|
|
@@ -452,6 +465,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
DEFAULT_S3GUARD_DISABLED_WARN_LEVEL);
|
|
|
S3Guard.logS3GuardDisabled(LOG, warnLevel, bucket);
|
|
|
}
|
|
|
+ // directory policy, which may look at authoritative paths
|
|
|
+ directoryPolicy = DirectoryPolicyImpl.getDirectoryPolicy(conf,
|
|
|
+ this::allowAuthoritative);
|
|
|
+ LOG.debug("Directory marker retention policy is {}", directoryPolicy);
|
|
|
|
|
|
initMultipartUploads(conf);
|
|
|
|
|
@@ -1285,7 +1302,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
* is not a directory.
|
|
|
*/
|
|
|
@Override
|
|
|
- public FSDataOutputStream createNonRecursive(Path path,
|
|
|
+ public FSDataOutputStream createNonRecursive(Path p,
|
|
|
FsPermission permission,
|
|
|
EnumSet<CreateFlag> flags,
|
|
|
int bufferSize,
|
|
@@ -1293,10 +1310,22 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
long blockSize,
|
|
|
Progressable progress) throws IOException {
|
|
|
entryPoint(INVOCATION_CREATE_NON_RECURSIVE);
|
|
|
+ final Path path = makeQualified(p);
|
|
|
Path parent = path.getParent();
|
|
|
- if (parent != null) {
|
|
|
- // expect this to raise an exception if there is no parent
|
|
|
- if (!getFileStatus(parent).isDirectory()) {
|
|
|
+ // expect this to raise an exception if there is no parent dir
|
|
|
+ if (parent != null && !parent.isRoot()) {
|
|
|
+ S3AFileStatus status;
|
|
|
+ try {
|
|
|
+ // optimize for the directory existing: Call list first
|
|
|
+ status = innerGetFileStatus(parent, false,
|
|
|
+ StatusProbeEnum.DIRECTORIES);
|
|
|
+ } catch (FileNotFoundException e) {
|
|
|
+ // no dir, fall back to looking for a file
|
|
|
+ // (failure condition if true)
|
|
|
+ status = innerGetFileStatus(parent, false,
|
|
|
+ StatusProbeEnum.HEAD_ONLY);
|
|
|
+ }
|
|
|
+ if (!status.isDirectory()) {
|
|
|
throw new FileAlreadyExistsException("Not a directory: " + parent);
|
|
|
}
|
|
|
}
|
|
@@ -1431,10 +1460,13 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
LOG.debug("rename: destination path {} not found", dst);
|
|
|
// Parent must exist
|
|
|
Path parent = dst.getParent();
|
|
|
- if (!pathToKey(parent).isEmpty()) {
|
|
|
+ if (!pathToKey(parent).isEmpty()
|
|
|
+ && !parent.equals(src.getParent())) {
|
|
|
try {
|
|
|
- S3AFileStatus dstParentStatus = innerGetFileStatus(dst.getParent(),
|
|
|
- false, StatusProbeEnum.ALL);
|
|
|
+ // only look against S3 for directories; saves
|
|
|
+ // a HEAD request on all normal operations.
|
|
|
+ S3AFileStatus dstParentStatus = innerGetFileStatus(parent,
|
|
|
+ false, StatusProbeEnum.DIRECTORIES);
|
|
|
if (!dstParentStatus.isDirectory()) {
|
|
|
throw new RenameFailedException(src, dst,
|
|
|
"destination parent is not a directory");
|
|
@@ -1535,7 +1567,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
final boolean isFile,
|
|
|
final BulkOperationState operationState)
|
|
|
throws IOException {
|
|
|
- once("delete", key, () ->
|
|
|
+ once("delete", path.toString(), () ->
|
|
|
S3AFileSystem.this.deleteObjectAtPath(path, key, isFile,
|
|
|
operationState));
|
|
|
}
|
|
@@ -1585,7 +1617,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
Path destParent = destCreated.getParent();
|
|
|
if (!sourceRenamed.getParent().equals(destParent)) {
|
|
|
LOG.debug("source & dest parents are different; fix up dir markers");
|
|
|
- deleteUnnecessaryFakeDirectories(destParent);
|
|
|
+ if (!keepDirectoryMarkers(destParent)) {
|
|
|
+ deleteUnnecessaryFakeDirectories(destParent, null);
|
|
|
+ }
|
|
|
maybeCreateFakeParentDirectory(sourceRenamed);
|
|
|
}
|
|
|
}
|
|
@@ -1940,6 +1974,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
protected S3ListResult listObjects(S3ListRequest request) throws IOException {
|
|
|
incrementReadOperations();
|
|
|
incrementStatistic(OBJECT_LIST_REQUESTS);
|
|
|
+ LOG.debug("LIST {}", request);
|
|
|
validateListArguments(request);
|
|
|
try(DurationInfo ignored =
|
|
|
new DurationInfo(LOG, false, "LIST")) {
|
|
@@ -2381,6 +2416,14 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
boolean quiet)
|
|
|
throws MultiObjectDeleteException, AmazonClientException,
|
|
|
IOException {
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Initiating delete operation for {} objects",
|
|
|
+ keysToDelete.size());
|
|
|
+ for (DeleteObjectsRequest.KeyVersion key : keysToDelete) {
|
|
|
+ LOG.debug(" {} {}", key.getKey(),
|
|
|
+ key.getVersion() != null ? key.getVersion() : "");
|
|
|
+ }
|
|
|
+ }
|
|
|
DeleteObjectsResult result = null;
|
|
|
if (keysToDelete.isEmpty()) {
|
|
|
// exit fast if there are no keys to delete
|
|
@@ -2490,7 +2533,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
final boolean quiet)
|
|
|
throws MultiObjectDeleteException, AmazonClientException, IOException {
|
|
|
undeletedObjectsOnFailure.clear();
|
|
|
- try (DurationInfo ignored = new DurationInfo(LOG, false, "Deleting")) {
|
|
|
+ try (DurationInfo ignored = new DurationInfo(LOG, false,
|
|
|
+ "Deleting %d keys", keysToDelete.size())) {
|
|
|
return removeKeysS3(keysToDelete, deleteFakeDir, quiet);
|
|
|
} catch (MultiObjectDeleteException ex) {
|
|
|
LOG.debug("Partial delete failure");
|
|
@@ -2573,7 +2617,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
// we only make the LIST call; the codepaths to get here should not
|
|
|
// be reached if there is an empty dir marker -and if they do, it
|
|
|
// is mostly harmless to create a new one.
|
|
|
- if (!key.isEmpty() && !s3Exists(f, EnumSet.of(StatusProbeEnum.List))) {
|
|
|
+ if (!key.isEmpty() && !s3Exists(f, StatusProbeEnum.DIRECTORIES)) {
|
|
|
LOG.debug("Creating new fake directory at {}", f);
|
|
|
createFakeDirectory(key);
|
|
|
}
|
|
@@ -2589,7 +2633,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
void maybeCreateFakeParentDirectory(Path path)
|
|
|
throws IOException, AmazonClientException {
|
|
|
Path parent = path.getParent();
|
|
|
- if (parent != null) {
|
|
|
+ if (parent != null && !parent.isRoot()) {
|
|
|
createFakeDirectoryIfNecessary(parent);
|
|
|
}
|
|
|
}
|
|
@@ -2618,7 +2662,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
* @throws IOException due to an IO problem.
|
|
|
* @throws AmazonClientException on failures inside the AWS SDK
|
|
|
*/
|
|
|
- public FileStatus[] innerListStatus(Path f) throws FileNotFoundException,
|
|
|
+ private S3AFileStatus[] innerListStatus(Path f) throws FileNotFoundException,
|
|
|
IOException, AmazonClientException {
|
|
|
Path path = qualify(f);
|
|
|
String key = pathToKey(path);
|
|
@@ -2626,7 +2670,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
entryPoint(INVOCATION_LIST_STATUS);
|
|
|
|
|
|
List<S3AFileStatus> result;
|
|
|
- final FileStatus fileStatus = getFileStatus(path);
|
|
|
+ final S3AFileStatus fileStatus = innerGetFileStatus(path, false,
|
|
|
+ StatusProbeEnum.ALL);
|
|
|
|
|
|
if (fileStatus.isDirectory()) {
|
|
|
if (!key.isEmpty()) {
|
|
@@ -2658,7 +2703,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
allowAuthoritative, ttlTimeProvider);
|
|
|
} else {
|
|
|
LOG.debug("Adding: rd (not a dir): {}", path);
|
|
|
- FileStatus[] stats = new FileStatus[1];
|
|
|
+ S3AFileStatus[] stats = new S3AFileStatus[1];
|
|
|
stats[0]= fileStatus;
|
|
|
return stats;
|
|
|
}
|
|
@@ -2769,9 +2814,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
public boolean mkdirs(Path path, FsPermission permission) throws IOException,
|
|
|
FileAlreadyExistsException {
|
|
|
try {
|
|
|
+ entryPoint(INVOCATION_MKDIRS);
|
|
|
return innerMkdirs(path, permission);
|
|
|
} catch (AmazonClientException e) {
|
|
|
- throw translateException("innerMkdirs", path, e);
|
|
|
+ throw translateException("mkdirs", path, e);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -2791,11 +2837,15 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
throws IOException, FileAlreadyExistsException, AmazonClientException {
|
|
|
Path f = qualify(p);
|
|
|
LOG.debug("Making directory: {}", f);
|
|
|
- entryPoint(INVOCATION_MKDIRS);
|
|
|
+ if (p.isRoot()) {
|
|
|
+ // fast exit for root.
|
|
|
+ return true;
|
|
|
+ }
|
|
|
FileStatus fileStatus;
|
|
|
|
|
|
try {
|
|
|
- fileStatus = getFileStatus(f);
|
|
|
+ fileStatus = innerGetFileStatus(f, false,
|
|
|
+ StatusProbeEnum.ALL);
|
|
|
|
|
|
if (fileStatus.isDirectory()) {
|
|
|
return true;
|
|
@@ -2805,7 +2855,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
} catch (FileNotFoundException e) {
|
|
|
// Walk path to root, ensuring closest ancestor is a directory, not file
|
|
|
Path fPart = f.getParent();
|
|
|
- while (fPart != null) {
|
|
|
+ while (fPart != null && !fPart.isRoot()) {
|
|
|
try {
|
|
|
fileStatus = getFileStatus(fPart);
|
|
|
if (fileStatus.isDirectory()) {
|
|
@@ -2866,7 +2916,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
final Set<StatusProbeEnum> probes) throws IOException {
|
|
|
final Path path = qualify(f);
|
|
|
String key = pathToKey(path);
|
|
|
- LOG.debug("Getting path status for {} ({})", path, key);
|
|
|
+ LOG.debug("Getting path status for {} ({}); needEmptyDirectory={}",
|
|
|
+ path, key, needEmptyDirectoryFlag);
|
|
|
|
|
|
boolean allowAuthoritative = allowAuthoritative(path);
|
|
|
// Check MetadataStore, if any.
|
|
@@ -2877,9 +2928,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
}
|
|
|
Set<Path> tombstones = Collections.emptySet();
|
|
|
if (pm != null) {
|
|
|
+ S3AFileStatus msStatus = pm.getFileStatus();
|
|
|
if (pm.isDeleted()) {
|
|
|
OffsetDateTime deletedAt = OffsetDateTime.ofInstant(
|
|
|
- Instant.ofEpochMilli(pm.getFileStatus().getModificationTime()),
|
|
|
+ Instant.ofEpochMilli(msStatus.getModificationTime()),
|
|
|
ZoneOffset.UTC);
|
|
|
throw new FileNotFoundException("Path " + path + " is recorded as " +
|
|
|
"deleted by S3Guard at " + deletedAt);
|
|
@@ -2890,72 +2942,114 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
// Skip going to s3 if the file checked is a directory. Because if the
|
|
|
// dest is also a directory, there's no difference.
|
|
|
|
|
|
- if (!pm.getFileStatus().isDirectory() &&
|
|
|
+ if (!msStatus.isDirectory() &&
|
|
|
!allowAuthoritative &&
|
|
|
probes.contains(StatusProbeEnum.Head)) {
|
|
|
// a file has been found in a non-auth path and the caller has not said
|
|
|
// they only care about directories
|
|
|
LOG.debug("Metadata for {} found in the non-auth metastore.", path);
|
|
|
- final long msModTime = pm.getFileStatus().getModificationTime();
|
|
|
-
|
|
|
- S3AFileStatus s3AFileStatus;
|
|
|
- try {
|
|
|
- s3AFileStatus = s3GetFileStatus(path, key, probes, tombstones);
|
|
|
- } catch (FileNotFoundException fne) {
|
|
|
- s3AFileStatus = null;
|
|
|
- }
|
|
|
- if (s3AFileStatus == null) {
|
|
|
- LOG.warn("Failed to find file {}. Either it is not yet visible, or "
|
|
|
- + "it has been deleted.", path);
|
|
|
- } else {
|
|
|
- final long s3ModTime = s3AFileStatus.getModificationTime();
|
|
|
-
|
|
|
- if(s3ModTime > msModTime) {
|
|
|
- LOG.debug("S3Guard metadata for {} is outdated;"
|
|
|
- + " s3modtime={}; msModTime={} updating metastore",
|
|
|
- path, s3ModTime, msModTime);
|
|
|
- return S3Guard.putAndReturn(metadataStore, s3AFileStatus,
|
|
|
- ttlTimeProvider);
|
|
|
+ // If the timestamp of the pm is close to "now", we don't need to
|
|
|
+ // bother with a check of S3. That means
|
|
|
+ // one of: status modtime is close to now,
|
|
|
+ // or pm.getLastUpdated() == now
|
|
|
+
|
|
|
+ // get the time in which a status modtime is considered valid
|
|
|
+ // in a non-auth metastore
|
|
|
+ long validTime =
|
|
|
+ ttlTimeProvider.getNow() - ttlTimeProvider.getMetadataTtl();
|
|
|
+ final long msModTime = msStatus.getModificationTime();
|
|
|
+
|
|
|
+ if (msModTime < validTime) {
|
|
|
+ LOG.debug("Metastore entry of {} is out of date, probing S3", path);
|
|
|
+ try {
|
|
|
+ S3AFileStatus s3AFileStatus = s3GetFileStatus(path,
|
|
|
+ key,
|
|
|
+ probes,
|
|
|
+ tombstones,
|
|
|
+ needEmptyDirectoryFlag);
|
|
|
+ // if the new status is more current than that in the metastore,
|
|
|
+ // it means S3 has changed and the store needs updating
|
|
|
+ final long s3ModTime = s3AFileStatus.getModificationTime();
|
|
|
+
|
|
|
+ if (s3ModTime > msModTime) {
|
|
|
+ // there's new data in S3
|
|
|
+ LOG.debug("S3Guard metadata for {} is outdated;"
|
|
|
+ + " s3modtime={}; msModTime={} updating metastore",
|
|
|
+ path, s3ModTime, msModTime);
|
|
|
+ // add to S3Guard
|
|
|
+ S3Guard.putAndReturn(metadataStore, s3AFileStatus,
|
|
|
+ ttlTimeProvider);
|
|
|
+ } else {
|
|
|
+ // the modtime of the data is the same as/older than the s3guard
|
|
|
+ // value: either an old object has been found, or the existing one
|
|
|
+ // was retrieved. In both cases, refresh the S3Guard entry so the
|
|
|
+ // record's TTL is updated.
|
|
|
+ S3Guard.refreshEntry(metadataStore, pm, s3AFileStatus,
|
|
|
+ ttlTimeProvider);
|
|
|
+ }
|
|
|
+ // return the value
|
|
|
+ // note that the checks for empty dir status below can be skipped
|
|
|
+ // because the call to s3GetFileStatus includes the checks there
|
|
|
+ return s3AFileStatus;
|
|
|
+ } catch (FileNotFoundException fne) {
|
|
|
+ // the attempt to refresh the record failed because there was
|
|
|
+ // no entry. Either it is a new file not visible, or it
|
|
|
+ // has been deleted (and therefore S3Guard is out of sync with S3)
|
|
|
+ LOG.warn("Failed to find file {}. Either it is not yet visible, or "
|
|
|
+ + "it has been deleted.", path);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- S3AFileStatus msStatus = pm.getFileStatus();
|
|
|
if (needEmptyDirectoryFlag && msStatus.isDirectory()) {
|
|
|
+ // the caller needs to know if a directory is empty,
|
|
|
+ // and that this is a directory.
|
|
|
if (pm.isEmptyDirectory() != Tristate.UNKNOWN) {
|
|
|
// We have a definitive true / false from MetadataStore, we are done.
|
|
|
return msStatus;
|
|
|
} else {
|
|
|
+ // execute a S3Guard listChildren command to list tombstones under the
|
|
|
+ // path.
|
|
|
+ // This list will be used in the forthcoming s3GetFileStatus call.
|
|
|
DirListingMetadata children =
|
|
|
S3Guard.listChildrenWithTtl(metadataStore, path, ttlTimeProvider,
|
|
|
allowAuthoritative);
|
|
|
if (children != null) {
|
|
|
tombstones = children.listTombstones();
|
|
|
}
|
|
|
- LOG.debug("MetadataStore doesn't know if dir is empty, using S3.");
|
|
|
+ LOG.debug("MetadataStore doesn't know if {} is empty, using S3.",
|
|
|
+ path);
|
|
|
}
|
|
|
} else {
|
|
|
// Either this is not a directory, or we don't care if it is empty
|
|
|
return msStatus;
|
|
|
}
|
|
|
|
|
|
- // If the metadata store has no children for it and it's not listed in
|
|
|
- // S3 yet, we'll assume the empty directory is true;
|
|
|
- S3AFileStatus s3FileStatus;
|
|
|
+ // now issue the S3 getFileStatus call.
|
|
|
try {
|
|
|
- s3FileStatus = s3GetFileStatus(path, key, probes, tombstones);
|
|
|
+ S3AFileStatus s3FileStatus = s3GetFileStatus(path,
|
|
|
+ key,
|
|
|
+ probes,
|
|
|
+ tombstones,
|
|
|
+ true);
|
|
|
+ // entry was found, so save in S3Guard and return the final value.
|
|
|
+ return S3Guard.putAndReturn(metadataStore, s3FileStatus,
|
|
|
+ ttlTimeProvider);
|
|
|
} catch (FileNotFoundException e) {
|
|
|
+ // If the metadata store has no children for it and it's not listed in
|
|
|
+ // S3 yet, we'll conclude that it is an empty directory
|
|
|
return S3AFileStatus.fromFileStatus(msStatus, Tristate.TRUE,
|
|
|
null, null);
|
|
|
}
|
|
|
- // entry was found, save in S3Guard
|
|
|
- return S3Guard.putAndReturn(metadataStore, s3FileStatus,
|
|
|
- ttlTimeProvider);
|
|
|
} else {
|
|
|
// there was no entry in S3Guard
|
|
|
// retrieve the data and update the metadata store in the process.
|
|
|
return S3Guard.putAndReturn(metadataStore,
|
|
|
- s3GetFileStatus(path, key, probes, tombstones),
|
|
|
+ s3GetFileStatus(path,
|
|
|
+ key,
|
|
|
+ probes,
|
|
|
+ tombstones,
|
|
|
+ needEmptyDirectoryFlag),
|
|
|
ttlTimeProvider);
|
|
|
}
|
|
|
}
|
|
@@ -3010,6 +3104,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
* @param key Key string for the path
|
|
|
* @param probes probes to make
|
|
|
* @param tombstones tombstones to filter
|
|
|
+ * @param needEmptyDirectoryFlag if true, implementation will calculate
|
|
|
+ * a TRUE or FALSE value for {@link S3AFileStatus#isEmptyDirectory()}
|
|
|
* @return Status
|
|
|
* @throws FileNotFoundException the supplied probes failed.
|
|
|
* @throws IOException on other problems.
|
|
@@ -3019,88 +3115,88 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
S3AFileStatus s3GetFileStatus(final Path path,
|
|
|
final String key,
|
|
|
final Set<StatusProbeEnum> probes,
|
|
|
- @Nullable Set<Path> tombstones) throws IOException {
|
|
|
- if (!key.isEmpty()) {
|
|
|
- if (probes.contains(StatusProbeEnum.Head) && !key.endsWith("/")) {
|
|
|
- try {
|
|
|
- // look for the simple file
|
|
|
- ObjectMetadata meta = getObjectMetadata(key);
|
|
|
- LOG.debug("Found exact file: normal file {}", key);
|
|
|
- return new S3AFileStatus(meta.getContentLength(),
|
|
|
- dateToLong(meta.getLastModified()),
|
|
|
- path,
|
|
|
- getDefaultBlockSize(path),
|
|
|
- username,
|
|
|
- meta.getETag(),
|
|
|
- meta.getVersionId());
|
|
|
- } catch (AmazonServiceException e) {
|
|
|
- // if the response is a 404 error, it just means that there is
|
|
|
- // no file at that path...the remaining checks will be needed.
|
|
|
- if (e.getStatusCode() != SC_404 || isUnknownBucket(e)) {
|
|
|
- throw translateException("getFileStatus", path, e);
|
|
|
- }
|
|
|
- } catch (AmazonClientException e) {
|
|
|
+ @Nullable final Set<Path> tombstones,
|
|
|
+ final boolean needEmptyDirectoryFlag) throws IOException {
|
|
|
+ LOG.debug("S3GetFileStatus {}", path);
|
|
|
+ // either you aren't looking for the directory flag, or you are,
|
|
|
+ // and if you are, the probe list must contain list.
|
|
|
+ Preconditions.checkArgument(!needEmptyDirectoryFlag
|
|
|
+ || probes.contains(StatusProbeEnum.List),
|
|
|
+ "s3GetFileStatus(%s) wants to know if a directory is empty but"
|
|
|
+ + " does not request a list probe", path);
|
|
|
+
|
|
|
+ if (!key.isEmpty() && !key.endsWith("/")
|
|
|
+ && probes.contains(StatusProbeEnum.Head)) {
|
|
|
+ try {
|
|
|
+ // look for the simple file
|
|
|
+ ObjectMetadata meta = getObjectMetadata(key);
|
|
|
+ LOG.debug("Found exact file: normal file {}", key);
|
|
|
+ return new S3AFileStatus(meta.getContentLength(),
|
|
|
+ dateToLong(meta.getLastModified()),
|
|
|
+ path,
|
|
|
+ getDefaultBlockSize(path),
|
|
|
+ username,
|
|
|
+ meta.getETag(),
|
|
|
+ meta.getVersionId());
|
|
|
+ } catch (AmazonServiceException e) {
|
|
|
+ // if the response is a 404 error, it just means that there is
|
|
|
+ // no file at that path...the remaining checks will be needed.
|
|
|
+ if (e.getStatusCode() != SC_404 || isUnknownBucket(e)) {
|
|
|
throw translateException("getFileStatus", path, e);
|
|
|
}
|
|
|
- }
|
|
|
-
|
|
|
- // Either a normal file was not found or the probe was skipped.
|
|
|
- // because the key ended in "/" or it was not in the set of probes.
|
|
|
- // Look for the dir marker
|
|
|
- if (probes.contains(StatusProbeEnum.DirMarker)) {
|
|
|
- String newKey = maybeAddTrailingSlash(key);
|
|
|
- try {
|
|
|
- ObjectMetadata meta = getObjectMetadata(newKey);
|
|
|
-
|
|
|
- if (objectRepresentsDirectory(newKey, meta.getContentLength())) {
|
|
|
- LOG.debug("Found file (with /): fake directory");
|
|
|
- return new S3AFileStatus(Tristate.TRUE, path, username);
|
|
|
- } else {
|
|
|
- LOG.warn("Found file (with /): real file? should not happen: {}",
|
|
|
- key);
|
|
|
-
|
|
|
- return new S3AFileStatus(meta.getContentLength(),
|
|
|
- dateToLong(meta.getLastModified()),
|
|
|
- path,
|
|
|
- getDefaultBlockSize(path),
|
|
|
- username,
|
|
|
- meta.getETag(),
|
|
|
- meta.getVersionId());
|
|
|
- }
|
|
|
- } catch (AmazonServiceException e) {
|
|
|
- if (e.getStatusCode() != SC_404 || isUnknownBucket(e)) {
|
|
|
- throw translateException("getFileStatus", newKey, e);
|
|
|
- }
|
|
|
- } catch (AmazonClientException e) {
|
|
|
- throw translateException("getFileStatus", newKey, e);
|
|
|
- }
|
|
|
+ } catch (AmazonClientException e) {
|
|
|
+ throw translateException("getFileStatus", path, e);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// execute the list
|
|
|
if (probes.contains(StatusProbeEnum.List)) {
|
|
|
try {
|
|
|
+ // this will find a marker dir / as well as an entry.
|
|
|
+ // When making a simple "is this a dir check" all is good.
|
|
|
+ // but when looking for an empty dir, we need to verify there are no
|
|
|
+ // children, so ask for two entries, so as to find
|
|
|
+ // a child
|
|
|
String dirKey = maybeAddTrailingSlash(key);
|
|
|
- S3ListRequest request = createListObjectsRequest(dirKey, "/", 1);
|
|
|
+ // list size is dir marker + at least one non-tombstone entry
|
|
|
+ // there's a corner case: more tombstones than you have in a
|
|
|
+ // single page list. We assume that if you have been deleting
|
|
|
+ // that many files, then the AWS listing will have purged some
|
|
|
+ // by the time of listing so that the response includes some
|
|
|
+ // which have not.
|
|
|
+
|
|
|
+ int listSize;
|
|
|
+ if (tombstones == null) {
|
|
|
+ // no tombstones so look for a marker and at least one child.
|
|
|
+ listSize = 2;
|
|
|
+ } else {
|
|
|
+ // build a listing > tombstones. If the caller has many thousands
|
|
|
+ // of tombstones this won't work properly, which is why pruning
|
|
|
+ // of expired tombstones matters.
|
|
|
+ listSize = Math.min(2 + tombstones.size(), Math.max(2, maxKeys));
|
|
|
+ }
|
|
|
+ S3ListRequest request = createListObjectsRequest(dirKey, "/",
|
|
|
+ listSize);
|
|
|
+ // execute the request
|
|
|
+ S3ListResult listResult = listObjects(request);
|
|
|
|
|
|
- S3ListResult objects = listObjects(request);
|
|
|
|
|
|
- Collection<String> prefixes = objects.getCommonPrefixes();
|
|
|
- Collection<S3ObjectSummary> summaries = objects.getObjectSummaries();
|
|
|
- if (!isEmptyOfKeys(prefixes, tombstones) ||
|
|
|
- !isEmptyOfObjects(summaries, tombstones)) {
|
|
|
+ if (listResult.hasPrefixesOrObjects(contextAccessors, tombstones)) {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("Found path as directory (with /): {}/{}",
|
|
|
- prefixes.size(), summaries.size());
|
|
|
-
|
|
|
- for (S3ObjectSummary summary : summaries) {
|
|
|
- LOG.debug("Summary: {} {}", summary.getKey(), summary.getSize());
|
|
|
- }
|
|
|
- for (String prefix : prefixes) {
|
|
|
- LOG.debug("Prefix: {}", prefix);
|
|
|
- }
|
|
|
+ LOG.debug("Found path as directory (with /)");
|
|
|
+ listResult.logAtDebug(LOG);
|
|
|
}
|
|
|
-
|
|
|
+ // At least one entry has been found.
|
|
|
+ // If looking for an empty directory, the marker must exist but no
|
|
|
+ // children.
|
|
|
+ // So the listing must contain the marker entry only.
|
|
|
+ if (needEmptyDirectoryFlag
|
|
|
+ && listResult.representsEmptyDirectory(
|
|
|
+ contextAccessors, dirKey, tombstones)) {
|
|
|
+ return new S3AFileStatus(Tristate.TRUE, path, username);
|
|
|
+ }
|
|
|
+ // either an empty directory is not needed, or the
|
|
|
+ // listing does not meet the requirements.
|
|
|
return new S3AFileStatus(Tristate.FALSE, path, username);
|
|
|
} else if (key.isEmpty()) {
|
|
|
LOG.debug("Found root directory");
|
|
@@ -3119,48 +3215,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
throw new FileNotFoundException("No such file or directory: " + path);
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Helper function to determine if a collection of paths is empty
|
|
|
- * after accounting for tombstone markers (if provided).
|
|
|
- * @param keys Collection of path (prefixes / directories or keys).
|
|
|
- * @param tombstones Set of tombstone markers, or null if not applicable.
|
|
|
- * @return false if summaries contains objects not accounted for by
|
|
|
- * tombstones.
|
|
|
- */
|
|
|
- private boolean isEmptyOfKeys(Collection<String> keys, Set<Path>
|
|
|
- tombstones) {
|
|
|
- if (tombstones == null) {
|
|
|
- return keys.isEmpty();
|
|
|
- }
|
|
|
- for (String key : keys) {
|
|
|
- Path qualified = keyToQualifiedPath(key);
|
|
|
- if (!tombstones.contains(qualified)) {
|
|
|
- return false;
|
|
|
- }
|
|
|
- }
|
|
|
- return true;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Helper function to determine if a collection of object summaries is empty
|
|
|
- * after accounting for tombstone markers (if provided).
|
|
|
- * @param summaries Collection of objects as returned by listObjects.
|
|
|
- * @param tombstones Set of tombstone markers, or null if not applicable.
|
|
|
- * @return false if summaries contains objects not accounted for by
|
|
|
- * tombstones.
|
|
|
- */
|
|
|
- private boolean isEmptyOfObjects(Collection<S3ObjectSummary> summaries,
|
|
|
- Set<Path> tombstones) {
|
|
|
- if (tombstones == null) {
|
|
|
- return summaries.isEmpty();
|
|
|
- }
|
|
|
- Collection<String> stringCollection = new ArrayList<>(summaries.size());
|
|
|
- for (S3ObjectSummary summary : summaries) {
|
|
|
- stringCollection.add(summary.getKey());
|
|
|
- }
|
|
|
- return isEmptyOfKeys(stringCollection, tombstones);
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* Raw version of {@link FileSystem#exists(Path)} which uses S3 only:
|
|
|
* S3Guard MetadataStore, if any, will be skipped.
|
|
@@ -3175,7 +3229,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
throws IOException {
|
|
|
String key = pathToKey(path);
|
|
|
try {
|
|
|
- s3GetFileStatus(path, key, probes, null);
|
|
|
+ s3GetFileStatus(path, key, probes, null, false);
|
|
|
return true;
|
|
|
} catch (FileNotFoundException e) {
|
|
|
return false;
|
|
@@ -3578,6 +3632,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
copyObjectRequest.setNewObjectMetadata(dstom);
|
|
|
Optional.ofNullable(srcom.getStorageClass())
|
|
|
.ifPresent(copyObjectRequest::setStorageClass);
|
|
|
+ incrementStatistic(OBJECT_COPY_REQUESTS);
|
|
|
Copy copy = transfers.copy(copyObjectRequest);
|
|
|
copy.addProgressListener(progressListener);
|
|
|
CopyOutcome copyOutcome = CopyOutcome.waitForCopy(copy);
|
|
@@ -3711,16 +3766,21 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
|
|
|
/**
|
|
|
* Perform post-write actions.
|
|
|
- * Calls {@link #deleteUnnecessaryFakeDirectories(Path)} and then
|
|
|
- * updates any metastore.
|
|
|
+ * <p></p>
|
|
|
* This operation MUST be called after any PUT/multipart PUT completes
|
|
|
* successfully.
|
|
|
- *
|
|
|
- * The operations actions include
|
|
|
+ * <p></p>
|
|
|
+ * The actions include:
|
|
|
* <ol>
|
|
|
- * <li>Calling {@link #deleteUnnecessaryFakeDirectories(Path)}</li>
|
|
|
- * <li>Updating any metadata store with details on the newly created
|
|
|
- * object.</li>
|
|
|
+ * <li>
|
|
|
+ * Calling
|
|
|
+ * {@link #deleteUnnecessaryFakeDirectories(Path, BulkOperationState)}
|
|
|
+ * if directory markers are not being retained.
|
|
|
+ * </li>
|
|
|
+ * <li>
|
|
|
+ * Updating any metadata store with details on the newly created
|
|
|
+ * object.
|
|
|
+ * </li>
|
|
|
* </ol>
|
|
|
* @param key key written to
|
|
|
* @param length total length of file written
|
|
@@ -3743,12 +3803,19 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
Preconditions.checkArgument(length >= 0, "content length is negative");
|
|
|
final boolean isDir = objectRepresentsDirectory(key, length);
|
|
|
// kick off an async delete
|
|
|
- final CompletableFuture<?> deletion = submit(
|
|
|
- unboundedThreadPool,
|
|
|
- () -> {
|
|
|
- deleteUnnecessaryFakeDirectories(p.getParent());
|
|
|
- return null;
|
|
|
- });
|
|
|
+ CompletableFuture<?> deletion;
|
|
|
+ if (!keepDirectoryMarkers(p)) {
|
|
|
+ deletion = submit(
|
|
|
+ unboundedThreadPool,
|
|
|
+ () -> {
|
|
|
+ deleteUnnecessaryFakeDirectories(
|
|
|
+ p.getParent(),
|
|
|
+ operationState);
|
|
|
+ return null;
|
|
|
+ });
|
|
|
+ } else {
|
|
|
+ deletion = null;
|
|
|
+ }
|
|
|
// this is only set if there is a metastore to update and the
|
|
|
// operationState parameter passed in was null.
|
|
|
BulkOperationState stateToClose = null;
|
|
@@ -3807,13 +3874,26 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Should we keep directory markers under the path being created
|
|
|
+ * by mkdir/file creation/rename?
|
|
|
+ * @param path path to probe
|
|
|
+ * @return true if the markers MAY be retained,
|
|
|
+ * false if they MUST be deleted
|
|
|
+ */
|
|
|
+ private boolean keepDirectoryMarkers(Path path) {
|
|
|
+ return directoryPolicy.keepDirectoryMarkers(path);
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Delete mock parent directories which are no longer needed.
|
|
|
* Retry policy: retrying; exceptions swallowed.
|
|
|
* @param path path
|
|
|
+ * @param operationState (nullable) operational state for a bulk update
|
|
|
*/
|
|
|
@Retries.RetryExceptionsSwallowed
|
|
|
- private void deleteUnnecessaryFakeDirectories(Path path) {
|
|
|
+ private void deleteUnnecessaryFakeDirectories(Path path,
|
|
|
+ final BulkOperationState operationState) {
|
|
|
List<DeleteObjectsRequest.KeyVersion> keysToRemove = new ArrayList<>();
|
|
|
while (!path.isRoot()) {
|
|
|
String key = pathToKey(path);
|
|
@@ -3823,7 +3903,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
path = path.getParent();
|
|
|
}
|
|
|
try {
|
|
|
- removeKeys(keysToRemove, true, null);
|
|
|
+ removeKeys(keysToRemove, true, operationState);
|
|
|
} catch(AmazonClientException | IOException e) {
|
|
|
instrumentation.errorIgnored();
|
|
|
if (LOG.isDebugEnabled()) {
|
|
@@ -3952,6 +4032,14 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
return getConf().getLongBytes(FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Get the directory marker policy of this filesystem.
|
|
|
+ * @return the marker policy.
|
|
|
+ */
|
|
|
+ public DirectoryPolicy getDirectoryMarkerPolicy() {
|
|
|
+ return directoryPolicy;
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public String toString() {
|
|
|
final StringBuilder sb = new StringBuilder(
|
|
@@ -3990,6 +4078,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
sb.append(", credentials=").append(credentials);
|
|
|
sb.append(", delegation tokens=")
|
|
|
.append(delegationTokens.map(Objects::toString).orElse("disabled"));
|
|
|
+ sb.append(", ").append(directoryPolicy);
|
|
|
sb.append(", statistics {")
|
|
|
.append(statistics)
|
|
|
.append("}");
|
|
@@ -4086,25 +4175,41 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Override superclass so as to add statistic collection.
|
|
|
+ * Optimized probe for a path referencing a dir.
|
|
|
+ * Even though it is optimized to a single HEAD, applications
|
|
|
+ * should not over-use this method; excessive use is all too common.
|
|
|
* {@inheritDoc}
|
|
|
*/
|
|
|
@Override
|
|
|
@SuppressWarnings("deprecation")
|
|
|
public boolean isDirectory(Path f) throws IOException {
|
|
|
entryPoint(INVOCATION_IS_DIRECTORY);
|
|
|
- return super.isDirectory(f);
|
|
|
+ try {
|
|
|
+ return innerGetFileStatus(f, false, StatusProbeEnum.DIRECTORIES)
|
|
|
+ .isDirectory();
|
|
|
+ } catch (FileNotFoundException e) {
|
|
|
+ // not found or it is a file.
|
|
|
+ return false;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Override superclass so as to add statistic collection.
|
|
|
+ * Optimized probe for a path referencing a file.
|
|
|
+ * Even though it is optimized to a single HEAD, applications
|
|
|
+ * should not over-use this method; excessive use is all too common.
|
|
|
* {@inheritDoc}
|
|
|
*/
|
|
|
@Override
|
|
|
@SuppressWarnings("deprecation")
|
|
|
public boolean isFile(Path f) throws IOException {
|
|
|
entryPoint(INVOCATION_IS_FILE);
|
|
|
- return super.isFile(f);
|
|
|
+ try {
|
|
|
+ return innerGetFileStatus(f, false, StatusProbeEnum.HEAD_ONLY)
|
|
|
+ .isFile();
|
|
|
+ } catch (FileNotFoundException e) {
|
|
|
+ // not found or it is a dir.
|
|
|
+ return false;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -4511,7 +4616,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
public boolean hasPathCapability(final Path path, final String capability)
|
|
|
throws IOException {
|
|
|
final Path p = makeQualified(path);
|
|
|
- switch (validatePathCapabilityArgs(p, capability)) {
|
|
|
+ String cap = validatePathCapabilityArgs(p, capability);
|
|
|
+ switch (cap) {
|
|
|
|
|
|
case CommitConstants.STORE_CAPABILITY_MAGIC_COMMITTER:
|
|
|
case CommitConstants.STORE_CAPABILITY_MAGIC_COMMITTER_OLD:
|
|
@@ -4530,8 +4636,24 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
case CommonPathCapabilities.FS_MULTIPART_UPLOADER:
|
|
|
return true;
|
|
|
|
|
|
+ // this client is safe to use with buckets
|
|
|
+ // containing directory markers anywhere in
|
|
|
+ // the hierarchy
|
|
|
+ case STORE_CAPABILITY_DIRECTORY_MARKER_AWARE:
|
|
|
+ return true;
|
|
|
+
|
|
|
+ /*
|
|
|
+ * Marker policy capabilities are handed off to the directory policy.
|
|
|
+ */
|
|
|
+ case STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_KEEP:
|
|
|
+ case STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_DELETE:
|
|
|
+ case STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_AUTHORITATIVE:
|
|
|
+ case STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_KEEP:
|
|
|
+ case STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_DELETE:
|
|
|
+ return getDirectoryMarkerPolicy().hasPathCapability(path, cap);
|
|
|
+
|
|
|
default:
|
|
|
- return super.hasPathCapability(p, capability);
|
|
|
+ return super.hasPathCapability(p, cap);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -4546,7 +4668,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
@Override
|
|
|
public boolean hasCapability(String capability) {
|
|
|
try {
|
|
|
- return hasPathCapability(workingDir, capability);
|
|
|
+ return hasPathCapability(new Path("/"), capability);
|
|
|
} catch (IOException ex) {
|
|
|
// should never happen, so log and downgrade.
|
|
|
LOG.debug("Ignoring exception on hasCapability({}})", capability, ex);
|
|
@@ -4800,6 +4922,15 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
.build();
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Create a marker tools operations binding for this store.
|
|
|
+ * @return callbacks for operations.
|
|
|
+ */
|
|
|
+ @InterfaceAudience.Private
|
|
|
+ public MarkerToolOperations createMarkerToolOperations() {
|
|
|
+ return new MarkerToolOperationsImpl(operationCallbacks);
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* The implementation of context accessors.
|
|
|
*/
|