|
@@ -74,7 +74,6 @@ import com.amazonaws.services.s3.transfer.TransferManagerConfiguration;
|
|
import com.amazonaws.services.s3.transfer.Upload;
|
|
import com.amazonaws.services.s3.transfer.Upload;
|
|
import com.amazonaws.services.s3.transfer.model.UploadResult;
|
|
import com.amazonaws.services.s3.transfer.model.UploadResult;
|
|
import com.amazonaws.event.ProgressListener;
|
|
import com.amazonaws.event.ProgressListener;
|
|
-import com.amazonaws.event.ProgressEvent;
|
|
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
import com.google.common.base.Preconditions;
|
|
import com.google.common.base.Preconditions;
|
|
import com.google.common.util.concurrent.ListeningExecutorService;
|
|
import com.google.common.util.concurrent.ListeningExecutorService;
|
|
@@ -865,7 +864,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
|
|
* The inner rename operation. See {@link #rename(Path, Path)} for
|
|
* The inner rename operation. See {@link #rename(Path, Path)} for
|
|
* the description of the operation.
|
|
* the description of the operation.
|
|
* This operation throws an exception on any failure which needs to be
|
|
* This operation throws an exception on any failure which needs to be
|
|
- * reported and downgraded to a failure. That is: if a rename
|
|
|
|
|
|
+ * reported and downgraded to a failure.
|
|
|
|
+ * Retries: retry translated, assuming all operations it is called do
|
|
|
|
+ * so. For safely, consider catch and handle AmazonClientException
|
|
|
|
+ * because this is such a complex method there's a risk it could surface.
|
|
* @param source path to be renamed
|
|
* @param source path to be renamed
|
|
* @param dest new path after rename
|
|
* @param dest new path after rename
|
|
* @throws RenameFailedException if some criteria for a state changing
|
|
* @throws RenameFailedException if some criteria for a state changing
|
|
@@ -876,6 +878,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
|
|
* @throws IOException on IO failure.
|
|
* @throws IOException on IO failure.
|
|
* @throws AmazonClientException on failures inside the AWS SDK
|
|
* @throws AmazonClientException on failures inside the AWS SDK
|
|
*/
|
|
*/
|
|
|
|
+ @Retries.RetryMixed
|
|
private boolean innerRename(Path source, Path dest)
|
|
private boolean innerRename(Path source, Path dest)
|
|
throws RenameFailedException, FileNotFoundException, IOException,
|
|
throws RenameFailedException, FileNotFoundException, IOException,
|
|
AmazonClientException {
|
|
AmazonClientException {
|
|
@@ -969,10 +972,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
|
|
LOG.debug("rename: renaming file {} to {}", src, dst);
|
|
LOG.debug("rename: renaming file {} to {}", src, dst);
|
|
long length = srcStatus.getLen();
|
|
long length = srcStatus.getLen();
|
|
if (dstStatus != null && dstStatus.isDirectory()) {
|
|
if (dstStatus != null && dstStatus.isDirectory()) {
|
|
- String newDstKey = dstKey;
|
|
|
|
- if (!newDstKey.endsWith("/")) {
|
|
|
|
- newDstKey = newDstKey + "/";
|
|
|
|
- }
|
|
|
|
|
|
+ String newDstKey = maybeAddTrailingSlash(dstKey);
|
|
String filename =
|
|
String filename =
|
|
srcKey.substring(pathToKey(src.getParent()).length()+1);
|
|
srcKey.substring(pathToKey(src.getParent()).length()+1);
|
|
newDstKey = newDstKey + filename;
|
|
newDstKey = newDstKey + filename;
|
|
@@ -990,13 +990,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
|
|
LOG.debug("rename: renaming directory {} to {}", src, dst);
|
|
LOG.debug("rename: renaming directory {} to {}", src, dst);
|
|
|
|
|
|
// This is a directory to directory copy
|
|
// This is a directory to directory copy
|
|
- if (!dstKey.endsWith("/")) {
|
|
|
|
- dstKey = dstKey + "/";
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- if (!srcKey.endsWith("/")) {
|
|
|
|
- srcKey = srcKey + "/";
|
|
|
|
- }
|
|
|
|
|
|
+ dstKey = maybeAddTrailingSlash(dstKey);
|
|
|
|
+ srcKey = maybeAddTrailingSlash(srcKey);
|
|
|
|
|
|
//Verify dest is not a child of the source directory
|
|
//Verify dest is not a child of the source directory
|
|
if (dstKey.startsWith(srcKey)) {
|
|
if (dstKey.startsWith(srcKey)) {
|
|
@@ -1065,7 +1060,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
|
|
|
|
|
|
metadataStore.move(srcPaths, dstMetas);
|
|
metadataStore.move(srcPaths, dstMetas);
|
|
|
|
|
|
- if (src.getParent() != dst.getParent()) {
|
|
|
|
|
|
+ if (!src.getParent().equals(dst.getParent())) {
|
|
|
|
+ LOG.debug("source & dest parents are different; fix up dir markers");
|
|
deleteUnnecessaryFakeDirectories(dst.getParent());
|
|
deleteUnnecessaryFakeDirectories(dst.getParent());
|
|
maybeCreateFakeParentDirectory(src);
|
|
maybeCreateFakeParentDirectory(src);
|
|
}
|
|
}
|
|
@@ -1321,6 +1317,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
|
|
* <i>does not</i> update the metastore.
|
|
* <i>does not</i> update the metastore.
|
|
* Increments the {@code OBJECT_DELETE_REQUESTS} and write
|
|
* Increments the {@code OBJECT_DELETE_REQUESTS} and write
|
|
* operation statistics.
|
|
* operation statistics.
|
|
|
|
+ * This call does <i>not</i> create any mock parent entries.
|
|
*
|
|
*
|
|
* Retry policy: retry untranslated; delete considered idempotent.
|
|
* Retry policy: retry untranslated; delete considered idempotent.
|
|
* @param key key to blob to delete.
|
|
* @param key key to blob to delete.
|
|
@@ -1515,7 +1512,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
|
|
* @return the upload initiated
|
|
* @return the upload initiated
|
|
* @throws AmazonClientException on problems
|
|
* @throws AmazonClientException on problems
|
|
*/
|
|
*/
|
|
- @Retries.OnceRaw
|
|
|
|
|
|
+ @Retries.OnceRaw("For PUT; post-PUT actions are RetriesExceptionsSwallowed")
|
|
PutObjectResult putObjectDirect(PutObjectRequest putObjectRequest)
|
|
PutObjectResult putObjectDirect(PutObjectRequest putObjectRequest)
|
|
throws AmazonClientException {
|
|
throws AmazonClientException {
|
|
long len = getPutRequestLength(putObjectRequest);
|
|
long len = getPutRequestLength(putObjectRequest);
|
|
@@ -1685,7 +1682,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
|
|
public boolean delete(Path f, boolean recursive) throws IOException {
|
|
public boolean delete(Path f, boolean recursive) throws IOException {
|
|
try {
|
|
try {
|
|
entryPoint(INVOCATION_DELETE);
|
|
entryPoint(INVOCATION_DELETE);
|
|
- return innerDelete(innerGetFileStatus(f, true), recursive);
|
|
|
|
|
|
+ boolean outcome = innerDelete(innerGetFileStatus(f, true), recursive);
|
|
|
|
+ if (outcome) {
|
|
|
|
+ maybeCreateFakeParentDirectory(f);
|
|
|
|
+ }
|
|
|
|
+ return outcome;
|
|
} catch (FileNotFoundException e) {
|
|
} catch (FileNotFoundException e) {
|
|
LOG.debug("Couldn't delete {} - does not exist", f);
|
|
LOG.debug("Couldn't delete {} - does not exist", f);
|
|
instrumentation.errorIgnored();
|
|
instrumentation.errorIgnored();
|
|
@@ -1697,7 +1698,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
|
|
|
|
|
|
/**
|
|
/**
|
|
* Delete an object. See {@link #delete(Path, boolean)}.
|
|
* Delete an object. See {@link #delete(Path, boolean)}.
|
|
- *
|
|
|
|
|
|
+ * This call does not create any fake parent directory; that is
|
|
|
|
+ * left to the caller.
|
|
* @param status fileStatus object
|
|
* @param status fileStatus object
|
|
* @param recursive if path is a directory and set to
|
|
* @param recursive if path is a directory and set to
|
|
* true, the directory is deleted else throws an exception. In
|
|
* true, the directory is deleted else throws an exception. In
|
|
@@ -1771,7 +1773,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
|
|
deleteObjectAtPath(f, key, true);
|
|
deleteObjectAtPath(f, key, true);
|
|
}
|
|
}
|
|
|
|
|
|
- maybeCreateFakeParentDirectory(f);
|
|
|
|
return true;
|
|
return true;
|
|
}
|
|
}
|
|
|
|
|
|
@@ -2049,11 +2050,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
|
|
fPart = fPart.getParent();
|
|
fPart = fPart.getParent();
|
|
}
|
|
}
|
|
String key = pathToKey(f);
|
|
String key = pathToKey(f);
|
|
|
|
+ // this will create the marker file, delete the parent entries
|
|
|
|
+ // and update S3Guard
|
|
createFakeDirectory(key);
|
|
createFakeDirectory(key);
|
|
- S3Guard.makeDirsOrdered(metadataStore, metadataStoreDirs, username, true);
|
|
|
|
- // this is complicated because getParent(a/b/c/) returns a/b/c, but
|
|
|
|
- // we want a/b. See HADOOP-14428 for more details.
|
|
|
|
- deleteUnnecessaryFakeDirectories(new Path(f.toString()).getParent());
|
|
|
|
return true;
|
|
return true;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -2389,7 +2388,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
|
|
* @return the upload result
|
|
* @return the upload result
|
|
* @throws InterruptedIOException if the blocking was interrupted.
|
|
* @throws InterruptedIOException if the blocking was interrupted.
|
|
*/
|
|
*/
|
|
- @Retries.OnceRaw
|
|
|
|
|
|
+ @Retries.OnceRaw("For PUT; post-PUT actions are RetriesExceptionsSwallowed")
|
|
UploadResult executePut(PutObjectRequest putObjectRequest,
|
|
UploadResult executePut(PutObjectRequest putObjectRequest,
|
|
Progressable progress)
|
|
Progressable progress)
|
|
throws InterruptedIOException {
|
|
throws InterruptedIOException {
|
|
@@ -2483,6 +2482,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
|
|
|
|
|
|
/**
|
|
/**
|
|
* Copy a single object in the bucket via a COPY operation.
|
|
* Copy a single object in the bucket via a COPY operation.
|
|
|
|
+ * There's no update of metadata, directory markers, etc.
|
|
|
|
+ * Callers must implement.
|
|
* @param srcKey source object path
|
|
* @param srcKey source object path
|
|
* @param dstKey destination object path
|
|
* @param dstKey destination object path
|
|
* @param size object size
|
|
* @param size object size
|
|
@@ -2490,46 +2491,42 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
|
|
* @throws InterruptedIOException the operation was interrupted
|
|
* @throws InterruptedIOException the operation was interrupted
|
|
* @throws IOException Other IO problems
|
|
* @throws IOException Other IO problems
|
|
*/
|
|
*/
|
|
|
|
+ @Retries.RetryMixed
|
|
private void copyFile(String srcKey, String dstKey, long size)
|
|
private void copyFile(String srcKey, String dstKey, long size)
|
|
- throws IOException, InterruptedIOException, AmazonClientException {
|
|
|
|
|
|
+ throws IOException, InterruptedIOException {
|
|
LOG.debug("copyFile {} -> {} ", srcKey, dstKey);
|
|
LOG.debug("copyFile {} -> {} ", srcKey, dstKey);
|
|
|
|
|
|
- try {
|
|
|
|
- ObjectMetadata srcom = getObjectMetadata(srcKey);
|
|
|
|
- ObjectMetadata dstom = cloneObjectMetadata(srcom);
|
|
|
|
- setOptionalObjectMetadata(dstom);
|
|
|
|
- CopyObjectRequest copyObjectRequest =
|
|
|
|
- new CopyObjectRequest(bucket, srcKey, bucket, dstKey);
|
|
|
|
- setOptionalCopyObjectRequestParameters(copyObjectRequest);
|
|
|
|
- copyObjectRequest.setCannedAccessControlList(cannedACL);
|
|
|
|
- copyObjectRequest.setNewObjectMetadata(dstom);
|
|
|
|
-
|
|
|
|
- ProgressListener progressListener = new ProgressListener() {
|
|
|
|
- public void progressChanged(ProgressEvent progressEvent) {
|
|
|
|
- switch (progressEvent.getEventType()) {
|
|
|
|
- case TRANSFER_PART_COMPLETED_EVENT:
|
|
|
|
- incrementWriteOperations();
|
|
|
|
- break;
|
|
|
|
- default:
|
|
|
|
- break;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- };
|
|
|
|
-
|
|
|
|
- Copy copy = transfers.copy(copyObjectRequest);
|
|
|
|
- copy.addProgressListener(progressListener);
|
|
|
|
- try {
|
|
|
|
- copy.waitForCopyResult();
|
|
|
|
|
|
+ ProgressListener progressListener = progressEvent -> {
|
|
|
|
+ switch (progressEvent.getEventType()) {
|
|
|
|
+ case TRANSFER_PART_COMPLETED_EVENT:
|
|
incrementWriteOperations();
|
|
incrementWriteOperations();
|
|
- instrumentation.filesCopied(1, size);
|
|
|
|
- } catch (InterruptedException e) {
|
|
|
|
- throw new InterruptedIOException("Interrupted copying " + srcKey
|
|
|
|
- + " to " + dstKey + ", cancelling");
|
|
|
|
|
|
+ break;
|
|
|
|
+ default:
|
|
|
|
+ break;
|
|
}
|
|
}
|
|
- } catch (AmazonClientException e) {
|
|
|
|
- throw translateException("copyFile("+ srcKey+ ", " + dstKey + ")",
|
|
|
|
- srcKey, e);
|
|
|
|
- }
|
|
|
|
|
|
+ };
|
|
|
|
+
|
|
|
|
+ once("copyFile(" + srcKey + ", " + dstKey + ")", srcKey,
|
|
|
|
+ () -> {
|
|
|
|
+ ObjectMetadata srcom = getObjectMetadata(srcKey);
|
|
|
|
+ ObjectMetadata dstom = cloneObjectMetadata(srcom);
|
|
|
|
+ setOptionalObjectMetadata(dstom);
|
|
|
|
+ CopyObjectRequest copyObjectRequest =
|
|
|
|
+ new CopyObjectRequest(bucket, srcKey, bucket, dstKey);
|
|
|
|
+ setOptionalCopyObjectRequestParameters(copyObjectRequest);
|
|
|
|
+ copyObjectRequest.setCannedAccessControlList(cannedACL);
|
|
|
|
+ copyObjectRequest.setNewObjectMetadata(dstom);
|
|
|
|
+ Copy copy = transfers.copy(copyObjectRequest);
|
|
|
|
+ copy.addProgressListener(progressListener);
|
|
|
|
+ try {
|
|
|
|
+ copy.waitForCopyResult();
|
|
|
|
+ incrementWriteOperations();
|
|
|
|
+ instrumentation.filesCopied(1, size);
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
+ throw new InterruptedIOException("Interrupted copying " + srcKey
|
|
|
|
+ + " to " + dstKey + ", cancelling");
|
|
|
|
+ }
|
|
|
|
+ });
|
|
}
|
|
}
|
|
|
|
|
|
protected void setOptionalMultipartUploadRequestParameters(
|
|
protected void setOptionalMultipartUploadRequestParameters(
|
|
@@ -2626,9 +2623,12 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
|
|
|
|
|
|
/**
|
|
/**
|
|
* Perform post-write actions.
|
|
* Perform post-write actions.
|
|
|
|
+ * Calls {@link #deleteUnnecessaryFakeDirectories(Path)} and then
|
|
|
|
+ * {@link S3Guard#addAncestors(MetadataStore, Path, String)}}.
|
|
* This operation MUST be called after any PUT/multipart PUT completes
|
|
* This operation MUST be called after any PUT/multipart PUT completes
|
|
* successfully.
|
|
* successfully.
|
|
- * This includes
|
|
|
|
|
|
+ *
|
|
|
|
+ * The operations actions include
|
|
* <ol>
|
|
* <ol>
|
|
* <li>Calling {@link #deleteUnnecessaryFakeDirectories(Path)}</li>
|
|
* <li>Calling {@link #deleteUnnecessaryFakeDirectories(Path)}</li>
|
|
* <li>Updating any metadata store with details on the newly created
|
|
* <li>Updating any metadata store with details on the newly created
|
|
@@ -2638,12 +2638,12 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
|
|
* @param length total length of file written
|
|
* @param length total length of file written
|
|
*/
|
|
*/
|
|
@InterfaceAudience.Private
|
|
@InterfaceAudience.Private
|
|
- @Retries.RetryTranslated("Exceptions are swallowed")
|
|
|
|
|
|
+ @Retries.RetryExceptionsSwallowed
|
|
void finishedWrite(String key, long length) {
|
|
void finishedWrite(String key, long length) {
|
|
LOG.debug("Finished write to {}, len {}", key, length);
|
|
LOG.debug("Finished write to {}, len {}", key, length);
|
|
Path p = keyToQualifiedPath(key);
|
|
Path p = keyToQualifiedPath(key);
|
|
- deleteUnnecessaryFakeDirectories(p.getParent());
|
|
|
|
Preconditions.checkArgument(length >= 0, "content length is negative");
|
|
Preconditions.checkArgument(length >= 0, "content length is negative");
|
|
|
|
+ deleteUnnecessaryFakeDirectories(p.getParent());
|
|
|
|
|
|
// See note about failure semantics in S3Guard documentation
|
|
// See note about failure semantics in S3Guard documentation
|
|
try {
|
|
try {
|
|
@@ -2666,7 +2666,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
|
|
* Retry policy: retrying; exceptions swallowed.
|
|
* Retry policy: retrying; exceptions swallowed.
|
|
* @param path path
|
|
* @param path path
|
|
*/
|
|
*/
|
|
- @Retries.RetryRaw("Exceptions are swallowed")
|
|
|
|
|
|
+ @Retries.RetryExceptionsSwallowed
|
|
private void deleteUnnecessaryFakeDirectories(Path path) {
|
|
private void deleteUnnecessaryFakeDirectories(Path path) {
|
|
List<DeleteObjectsRequest.KeyVersion> keysToRemove = new ArrayList<>();
|
|
List<DeleteObjectsRequest.KeyVersion> keysToRemove = new ArrayList<>();
|
|
while (!path.isRoot()) {
|
|
while (!path.isRoot()) {
|
|
@@ -2960,7 +2960,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
|
|
* @throws IOException IO failure
|
|
* @throws IOException IO failure
|
|
* @see <a href="http://docs.aws.amazon.com/AmazonS3/latest/API/RESTCommonResponseHeaders.html">Common Response Headers</a>
|
|
* @see <a href="http://docs.aws.amazon.com/AmazonS3/latest/API/RESTCommonResponseHeaders.html">Common Response Headers</a>
|
|
*/
|
|
*/
|
|
-
|
|
|
|
|
|
+ @Override
|
|
|
|
+ @Retries.RetryTranslated
|
|
public EtagChecksum getFileChecksum(Path f, final long length)
|
|
public EtagChecksum getFileChecksum(Path f, final long length)
|
|
throws IOException {
|
|
throws IOException {
|
|
Preconditions.checkArgument(length >= 0);
|
|
Preconditions.checkArgument(length >= 0);
|
|
@@ -3002,18 +3003,21 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
|
|
* @throws IOException if any I/O error occurred
|
|
* @throws IOException if any I/O error occurred
|
|
*/
|
|
*/
|
|
@Override
|
|
@Override
|
|
|
|
+ @Retries.OnceTranslated
|
|
public RemoteIterator<LocatedFileStatus> listFiles(Path f,
|
|
public RemoteIterator<LocatedFileStatus> listFiles(Path f,
|
|
boolean recursive) throws FileNotFoundException, IOException {
|
|
boolean recursive) throws FileNotFoundException, IOException {
|
|
return innerListFiles(f, recursive,
|
|
return innerListFiles(f, recursive,
|
|
new Listing.AcceptFilesOnly(qualify(f)));
|
|
new Listing.AcceptFilesOnly(qualify(f)));
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ @Retries.OnceTranslated
|
|
public RemoteIterator<LocatedFileStatus> listFilesAndEmptyDirectories(Path f,
|
|
public RemoteIterator<LocatedFileStatus> listFilesAndEmptyDirectories(Path f,
|
|
boolean recursive) throws IOException {
|
|
boolean recursive) throws IOException {
|
|
return innerListFiles(f, recursive,
|
|
return innerListFiles(f, recursive,
|
|
new Listing.AcceptAllButS3nDirs());
|
|
new Listing.AcceptAllButS3nDirs());
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ @Retries.OnceTranslated
|
|
private RemoteIterator<LocatedFileStatus> innerListFiles(Path f, boolean
|
|
private RemoteIterator<LocatedFileStatus> innerListFiles(Path f, boolean
|
|
recursive, Listing.FileStatusAcceptor acceptor) throws IOException {
|
|
recursive, Listing.FileStatusAcceptor acceptor) throws IOException {
|
|
entryPoint(INVOCATION_LIST_FILES);
|
|
entryPoint(INVOCATION_LIST_FILES);
|
|
@@ -3097,42 +3101,43 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
|
|
* @throws IOException if any I/O error occurred
|
|
* @throws IOException if any I/O error occurred
|
|
*/
|
|
*/
|
|
@Override
|
|
@Override
|
|
- @Retries.OnceTranslated
|
|
|
|
|
|
+ @Retries.OnceTranslated("s3guard not retrying")
|
|
public RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f,
|
|
public RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f,
|
|
final PathFilter filter)
|
|
final PathFilter filter)
|
|
throws FileNotFoundException, IOException {
|
|
throws FileNotFoundException, IOException {
|
|
entryPoint(INVOCATION_LIST_LOCATED_STATUS);
|
|
entryPoint(INVOCATION_LIST_LOCATED_STATUS);
|
|
Path path = qualify(f);
|
|
Path path = qualify(f);
|
|
LOG.debug("listLocatedStatus({}, {}", path, filter);
|
|
LOG.debug("listLocatedStatus({}, {}", path, filter);
|
|
- try {
|
|
|
|
- // lookup dir triggers existence check
|
|
|
|
- final FileStatus fileStatus = getFileStatus(path);
|
|
|
|
- if (fileStatus.isFile()) {
|
|
|
|
- // simple case: File
|
|
|
|
- LOG.debug("Path is a file");
|
|
|
|
- return new Listing.SingleStatusRemoteIterator(
|
|
|
|
- filter.accept(path) ? toLocatedFileStatus(fileStatus) : null);
|
|
|
|
- } else {
|
|
|
|
- // directory: trigger a lookup
|
|
|
|
- final String key = maybeAddTrailingSlash(pathToKey(path));
|
|
|
|
- final Listing.FileStatusAcceptor acceptor =
|
|
|
|
- new Listing.AcceptAllButSelfAndS3nDirs(path);
|
|
|
|
- DirListingMetadata meta = metadataStore.listChildren(path);
|
|
|
|
- final RemoteIterator<FileStatus> cachedFileStatusIterator =
|
|
|
|
- listing.createProvidedFileStatusIterator(
|
|
|
|
- S3Guard.dirMetaToStatuses(meta), filter, acceptor);
|
|
|
|
- return (allowAuthoritative && meta != null && meta.isAuthoritative())
|
|
|
|
- ? listing.createLocatedFileStatusIterator(cachedFileStatusIterator)
|
|
|
|
- : listing.createLocatedFileStatusIterator(
|
|
|
|
- listing.createFileStatusListingIterator(path,
|
|
|
|
- createListObjectsRequest(key, "/"),
|
|
|
|
- filter,
|
|
|
|
- acceptor,
|
|
|
|
- cachedFileStatusIterator));
|
|
|
|
- }
|
|
|
|
- } catch (AmazonClientException e) {
|
|
|
|
- throw translateException("listLocatedStatus", path, e);
|
|
|
|
- }
|
|
|
|
|
|
+ return once("listLocatedStatus", path.toString(),
|
|
|
|
+ () -> {
|
|
|
|
+ // lookup dir triggers existence check
|
|
|
|
+ final FileStatus fileStatus = getFileStatus(path);
|
|
|
|
+ if (fileStatus.isFile()) {
|
|
|
|
+ // simple case: File
|
|
|
|
+ LOG.debug("Path is a file");
|
|
|
|
+ return new Listing.SingleStatusRemoteIterator(
|
|
|
|
+ filter.accept(path) ? toLocatedFileStatus(fileStatus) : null);
|
|
|
|
+ } else {
|
|
|
|
+ // directory: trigger a lookup
|
|
|
|
+ final String key = maybeAddTrailingSlash(pathToKey(path));
|
|
|
|
+ final Listing.FileStatusAcceptor acceptor =
|
|
|
|
+ new Listing.AcceptAllButSelfAndS3nDirs(path);
|
|
|
|
+ DirListingMetadata meta = metadataStore.listChildren(path);
|
|
|
|
+ final RemoteIterator<FileStatus> cachedFileStatusIterator =
|
|
|
|
+ listing.createProvidedFileStatusIterator(
|
|
|
|
+ S3Guard.dirMetaToStatuses(meta), filter, acceptor);
|
|
|
|
+ return (allowAuthoritative && meta != null
|
|
|
|
+ && meta.isAuthoritative())
|
|
|
|
+ ? listing.createLocatedFileStatusIterator(
|
|
|
|
+ cachedFileStatusIterator)
|
|
|
|
+ : listing.createLocatedFileStatusIterator(
|
|
|
|
+ listing.createFileStatusListingIterator(path,
|
|
|
|
+ createListObjectsRequest(key, "/"),
|
|
|
|
+ filter,
|
|
|
|
+ acceptor,
|
|
|
|
+ cachedFileStatusIterator));
|
|
|
|
+ }
|
|
|
|
+ });
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -3159,6 +3164,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
|
|
* @return Iterator over multipart uploads.
|
|
* @return Iterator over multipart uploads.
|
|
* @throws IOException on failure
|
|
* @throws IOException on failure
|
|
*/
|
|
*/
|
|
|
|
+ @InterfaceAudience.Private
|
|
|
|
+ @Retries.RetryTranslated
|
|
public MultipartUtils.UploadIterator listUploads(@Nullable String prefix)
|
|
public MultipartUtils.UploadIterator listUploads(@Nullable String prefix)
|
|
throws IOException {
|
|
throws IOException {
|
|
return MultipartUtils.listMultipartUploads(s3, invoker, bucket, maxKeys,
|
|
return MultipartUtils.listMultipartUploads(s3, invoker, bucket, maxKeys,
|