|
@@ -22,6 +22,7 @@ import java.io.File;
|
|
|
import java.io.FileNotFoundException;
|
|
|
import java.io.IOException;
|
|
|
import java.io.InterruptedIOException;
|
|
|
+import java.io.UncheckedIOException;
|
|
|
import java.net.URI;
|
|
|
import java.nio.file.AccessDeniedException;
|
|
|
import java.text.DateFormat;
|
|
@@ -91,6 +92,7 @@ import org.apache.hadoop.fs.ContentSummary;
|
|
|
import org.apache.hadoop.fs.CreateFlag;
|
|
|
import org.apache.hadoop.fs.FSDataInputStream;
|
|
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
|
|
+import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
|
|
|
import org.apache.hadoop.fs.Globber;
|
|
|
import org.apache.hadoop.fs.Options;
|
|
|
import org.apache.hadoop.fs.impl.OpenFileParameters;
|
|
@@ -104,6 +106,7 @@ import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
|
|
|
import org.apache.hadoop.fs.s3a.impl.ContextAccessors;
|
|
|
import org.apache.hadoop.fs.s3a.impl.CopyFromLocalOperation;
|
|
|
import org.apache.hadoop.fs.s3a.impl.CopyOutcome;
|
|
|
+import org.apache.hadoop.fs.s3a.impl.CreateFileBuilder;
|
|
|
import org.apache.hadoop.fs.s3a.impl.DeleteOperation;
|
|
|
import org.apache.hadoop.fs.s3a.impl.DirectoryPolicy;
|
|
|
import org.apache.hadoop.fs.s3a.impl.DirectoryPolicyImpl;
|
|
@@ -114,6 +117,7 @@ import org.apache.hadoop.fs.s3a.impl.ListingOperationCallbacks;
|
|
|
import org.apache.hadoop.fs.s3a.impl.MkdirOperation;
|
|
|
import org.apache.hadoop.fs.s3a.impl.OpenFileSupport;
|
|
|
import org.apache.hadoop.fs.s3a.impl.OperationCallbacks;
|
|
|
+import org.apache.hadoop.fs.s3a.impl.PutObjectOptions;
|
|
|
import org.apache.hadoop.fs.s3a.impl.RenameOperation;
|
|
|
import org.apache.hadoop.fs.s3a.impl.RequestFactoryImpl;
|
|
|
import org.apache.hadoop.fs.s3a.impl.S3AMultipartUploaderBuilder;
|
|
@@ -207,7 +211,8 @@ import static org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens.hasDe
|
|
|
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS;
|
|
|
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_ABORT_PENDING_UPLOADS;
|
|
|
import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit;
|
|
|
-import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletionIgnoringExceptions;
|
|
|
+import static org.apache.hadoop.fs.s3a.impl.CreateFileBuilder.OPTIONS_CREATE_FILE_NO_OVERWRITE;
|
|
|
+import static org.apache.hadoop.fs.s3a.impl.CreateFileBuilder.OPTIONS_CREATE_FILE_OVERWRITE;
|
|
|
import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isObjectNotFound;
|
|
|
import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isUnknownBucket;
|
|
|
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.AP_INACCESSIBLE;
|
|
@@ -1614,10 +1619,15 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
boolean overwrite, int bufferSize, short replication, long blockSize,
|
|
|
Progressable progress) throws IOException {
|
|
|
final Path path = qualify(f);
|
|
|
+
|
|
|
// the span will be picked up inside the output stream
|
|
|
return trackDurationAndSpan(INVOCATION_CREATE, path, () ->
|
|
|
- innerCreateFile(path, permission, overwrite, bufferSize, replication,
|
|
|
- blockSize, progress));
|
|
|
+ innerCreateFile(path,
|
|
|
+ progress,
|
|
|
+ getActiveAuditSpan(),
|
|
|
+ overwrite
|
|
|
+ ? OPTIONS_CREATE_FILE_OVERWRITE
|
|
|
+ : OPTIONS_CREATE_FILE_NO_OVERWRITE));
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1625,58 +1635,68 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
* reporting; in the active span.
|
|
|
* Retry policy: retrying, translated on the getFileStatus() probe.
|
|
|
* No data is uploaded to S3 in this call, so no retry issues related to that.
|
|
|
+ * The "performance" flag disables safety checks for the path being a file,
|
|
|
+ * parent directory existing, and doesn't attempt to delete
|
|
|
+ * dir markers, irrespective of FS settings.
|
|
|
+ * If true, this method call does no IO at all.
|
|
|
* @param path the file name to open
|
|
|
- * @param permission the permission to set.
|
|
|
- * @param overwrite if a file with this name already exists, then if true,
|
|
|
- * the file will be overwritten, and if false an error will be thrown.
|
|
|
- * @param bufferSize the size of the buffer to be used.
|
|
|
- * @param replication required block replication for the file.
|
|
|
- * @param blockSize the requested block size.
|
|
|
* @param progress the progress reporter.
|
|
|
+ * @param auditSpan audit span
|
|
|
+ * @param options options for the file
|
|
|
* @throws IOException in the event of IO related errors.
|
|
|
- * @see #setPermission(Path, FsPermission)
|
|
|
*/
|
|
|
@SuppressWarnings("IOResourceOpenedButNotSafelyClosed")
|
|
|
@Retries.RetryTranslated
|
|
|
- private FSDataOutputStream innerCreateFile(Path path,
|
|
|
- FsPermission permission,
|
|
|
- boolean overwrite,
|
|
|
- int bufferSize,
|
|
|
- short replication,
|
|
|
- long blockSize,
|
|
|
- Progressable progress) throws IOException {
|
|
|
+ private FSDataOutputStream innerCreateFile(
|
|
|
+ final Path path,
|
|
|
+ final Progressable progress,
|
|
|
+ final AuditSpan auditSpan,
|
|
|
+ final CreateFileBuilder.CreateFileOptions options) throws IOException {
|
|
|
+ auditSpan.activate();
|
|
|
String key = pathToKey(path);
|
|
|
- FileStatus status = null;
|
|
|
- try {
|
|
|
- // get the status or throw an FNFE.
|
|
|
- // when overwriting, there is no need to look for any existing file,
|
|
|
- // and attempting to do so can poison the load balancers with 404
|
|
|
- // entries.
|
|
|
- status = innerGetFileStatus(path, false,
|
|
|
- overwrite
|
|
|
- ? StatusProbeEnum.DIRECTORIES
|
|
|
- : StatusProbeEnum.ALL);
|
|
|
-
|
|
|
- // if the thread reaches here, there is something at the path
|
|
|
- if (status.isDirectory()) {
|
|
|
- // path references a directory: automatic error
|
|
|
- throw new FileAlreadyExistsException(path + " is a directory");
|
|
|
- }
|
|
|
- if (!overwrite) {
|
|
|
- // path references a file and overwrite is disabled
|
|
|
- throw new FileAlreadyExistsException(path + " already exists");
|
|
|
+ EnumSet<CreateFlag> flags = options.getFlags();
|
|
|
+ boolean overwrite = flags.contains(CreateFlag.OVERWRITE);
|
|
|
+ boolean performance = options.isPerformance();
|
|
|
+ boolean skipProbes = performance || isUnderMagicCommitPath(path);
|
|
|
+ if (skipProbes) {
|
|
|
+ LOG.debug("Skipping existence/overwrite checks");
|
|
|
+ } else {
|
|
|
+ try {
|
|
|
+ // get the status or throw an FNFE.
|
|
|
+ // when overwriting, there is no need to look for any existing file,
|
|
|
+ // just a directory (for safety)
|
|
|
+ FileStatus status = innerGetFileStatus(path, false,
|
|
|
+ overwrite
|
|
|
+ ? StatusProbeEnum.DIRECTORIES
|
|
|
+ : StatusProbeEnum.ALL);
|
|
|
+
|
|
|
+ // if the thread reaches here, there is something at the path
|
|
|
+ if (status.isDirectory()) {
|
|
|
+ // path references a directory: automatic error
|
|
|
+ throw new FileAlreadyExistsException(path + " is a directory");
|
|
|
+ }
|
|
|
+ if (!overwrite) {
|
|
|
+ // path references a file and overwrite is disabled
|
|
|
+ throw new FileAlreadyExistsException(path + " already exists");
|
|
|
+ }
|
|
|
+ LOG.debug("Overwriting file {}", path);
|
|
|
+ } catch (FileNotFoundException e) {
|
|
|
+ // this means there is nothing at the path; all good.
|
|
|
}
|
|
|
- LOG.debug("Overwriting file {}", path);
|
|
|
- } catch (FileNotFoundException e) {
|
|
|
- // this means the file is not found
|
|
|
-
|
|
|
}
|
|
|
instrumentation.fileCreated();
|
|
|
- PutTracker putTracker =
|
|
|
- committerIntegration.createTracker(path, key);
|
|
|
- String destKey = putTracker.getDestKey();
|
|
|
final BlockOutputStreamStatistics outputStreamStatistics
|
|
|
= statisticsContext.newOutputStreamStatistics();
|
|
|
+ PutTracker putTracker =
|
|
|
+ committerIntegration.createTracker(path, key, outputStreamStatistics);
|
|
|
+ String destKey = putTracker.getDestKey();
|
|
|
+
|
|
|
+ // put options are derived from the path and the
|
|
|
+ // option builder.
|
|
|
+ boolean keep = performance || keepDirectoryMarkers(path);
|
|
|
+ final PutObjectOptions putOptions =
|
|
|
+ new PutObjectOptions(keep, null, options.getHeaders());
|
|
|
+
|
|
|
final S3ABlockOutputStream.BlockOutputStreamBuilder builder =
|
|
|
S3ABlockOutputStream.builder()
|
|
|
.withKey(destKey)
|
|
@@ -1686,7 +1706,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
.withProgress(progress)
|
|
|
.withPutTracker(putTracker)
|
|
|
.withWriteOperations(
|
|
|
- createWriteOperationHelper(getActiveAuditSpan()))
|
|
|
+ createWriteOperationHelper(auditSpan))
|
|
|
.withExecutorService(
|
|
|
new SemaphoredDelegatingExecutor(
|
|
|
boundedThreadPool,
|
|
@@ -1697,12 +1717,12 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
getConf().getBoolean(
|
|
|
DOWNGRADE_SYNCABLE_EXCEPTIONS,
|
|
|
DOWNGRADE_SYNCABLE_EXCEPTIONS_DEFAULT))
|
|
|
- .withCSEEnabled(isCSEEnabled);
|
|
|
+ .withCSEEnabled(isCSEEnabled)
|
|
|
+ .withPutOptions(putOptions);
|
|
|
return new FSDataOutputStream(
|
|
|
new S3ABlockOutputStream(builder),
|
|
|
null);
|
|
|
}
|
|
|
-
|
|
|
/**
|
|
|
* Create a Write Operation Helper with the current active span.
|
|
|
* All operations made through this helper will activate the
|
|
@@ -1735,10 +1755,65 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
auditSpan);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Create an instance of an FSDataOutputStreamBuilder for
|
|
|
+ * creating a file at the given path.
|
|
|
+ * @param path path to create
|
|
|
+ * @return a builder.
|
|
|
+ * @throws UncheckedIOException for problems creating the audit span
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ @AuditEntryPoint
|
|
|
+ public FSDataOutputStreamBuilder createFile(final Path path) {
|
|
|
+ try {
|
|
|
+ final Path qualified = qualify(path);
|
|
|
+ final AuditSpan span = entryPoint(INVOCATION_CREATE_FILE,
|
|
|
+ pathToKey(qualified),
|
|
|
+ null);
|
|
|
+ return new CreateFileBuilder(this,
|
|
|
+ qualified,
|
|
|
+ new CreateFileBuilderCallbacksImpl(INVOCATION_CREATE_FILE, span))
|
|
|
+ .create()
|
|
|
+ .overwrite(true);
|
|
|
+ } catch (IOException e) {
|
|
|
+ // catch any IOEs raised in span creation and convert to
|
|
|
+ // an UncheckedIOException
|
|
|
+ throw new UncheckedIOException(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Callback for create file operations.
|
|
|
+ */
|
|
|
+ private final class CreateFileBuilderCallbacksImpl implements
|
|
|
+ CreateFileBuilder.CreateFileBuilderCallbacks {
|
|
|
+
|
|
|
+ private final Statistic statistic;
|
|
|
+ /** span for operations. */
|
|
|
+ private final AuditSpan span;
|
|
|
+
|
|
|
+ private CreateFileBuilderCallbacksImpl(
|
|
|
+ final Statistic statistic,
|
|
|
+ final AuditSpan span) {
|
|
|
+ this.statistic = statistic;
|
|
|
+ this.span = span;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public FSDataOutputStream createFileFromBuilder(
|
|
|
+ final Path path,
|
|
|
+ final Progressable progress,
|
|
|
+ final CreateFileBuilder.CreateFileOptions options) throws IOException {
|
|
|
+ // the span will be picked up inside the output stream
|
|
|
+ return trackDuration(getDurationTrackerFactory(), statistic.getSymbol(), () ->
|
|
|
+ innerCreateFile(path, progress, span, options));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* {@inheritDoc}
|
|
|
- * @throws FileNotFoundException if the parent directory is not present -or
|
|
|
- * is not a directory.
|
|
|
+ * The S3A implementation downgrades to the recursive creation, to avoid
|
|
|
+ * any race conditions with parent entries "disappearing".
|
|
|
*/
|
|
|
@Override
|
|
|
@AuditEntryPoint
|
|
@@ -1750,30 +1825,23 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
long blockSize,
|
|
|
Progressable progress) throws IOException {
|
|
|
final Path path = makeQualified(p);
|
|
|
- // this span is passed into the stream.
|
|
|
- try (AuditSpan span = entryPoint(INVOCATION_CREATE_NON_RECURSIVE, path)) {
|
|
|
- Path parent = path.getParent();
|
|
|
- // expect this to raise an exception if there is no parent dir
|
|
|
- if (parent != null && !parent.isRoot()) {
|
|
|
- S3AFileStatus status;
|
|
|
- try {
|
|
|
- // optimize for the directory existing: Call list first
|
|
|
- status = innerGetFileStatus(parent, false,
|
|
|
- StatusProbeEnum.DIRECTORIES);
|
|
|
- } catch (FileNotFoundException e) {
|
|
|
- // no dir, fall back to looking for a file
|
|
|
- // (failure condition if true)
|
|
|
- status = innerGetFileStatus(parent, false,
|
|
|
- StatusProbeEnum.HEAD_ONLY);
|
|
|
- }
|
|
|
- if (!status.isDirectory()) {
|
|
|
- throw new FileAlreadyExistsException("Not a directory: " + parent);
|
|
|
- }
|
|
|
- }
|
|
|
- return innerCreateFile(path, permission,
|
|
|
- flags.contains(CreateFlag.OVERWRITE), bufferSize,
|
|
|
- replication, blockSize, progress);
|
|
|
+
|
|
|
+ // span is created and passed in to the callbacks.
|
|
|
+ final AuditSpan span = entryPoint(INVOCATION_CREATE_NON_RECURSIVE,
|
|
|
+ pathToKey(path),
|
|
|
+ null);
|
|
|
+ // uses the CreateFileBuilder, filling it in with the relevant arguments.
|
|
|
+ final CreateFileBuilder builder = new CreateFileBuilder(this,
|
|
|
+ path,
|
|
|
+ new CreateFileBuilderCallbacksImpl(INVOCATION_CREATE_NON_RECURSIVE, span))
|
|
|
+ .create()
|
|
|
+ .withFlags(flags)
|
|
|
+ .blockSize(blockSize)
|
|
|
+ .bufferSize(bufferSize);
|
|
|
+ if (progress != null) {
|
|
|
+ builder.progress(progress);
|
|
|
}
|
|
|
+ return builder.build();
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -2671,7 +2739,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
*/
|
|
|
public PutObjectRequest newPutObjectRequest(String key,
|
|
|
ObjectMetadata metadata, File srcfile) {
|
|
|
- return requestFactory.newPutObjectRequest(key, metadata, srcfile);
|
|
|
+ return requestFactory.newPutObjectRequest(key, metadata, null, srcfile);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -2721,12 +2789,14 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
* Auditing: must be inside an audit span.
|
|
|
* <i>Important: this call will close any input stream in the request.</i>
|
|
|
* @param putObjectRequest the request
|
|
|
+ * @param putOptions put object options
|
|
|
* @return the upload initiated
|
|
|
* @throws AmazonClientException on problems
|
|
|
*/
|
|
|
@VisibleForTesting
|
|
|
- @Retries.OnceRaw("For PUT; post-PUT actions are RetryTranslated")
|
|
|
- PutObjectResult putObjectDirect(PutObjectRequest putObjectRequest)
|
|
|
+ @Retries.OnceRaw("For PUT; post-PUT actions are RetryExceptionsSwallowed")
|
|
|
+ PutObjectResult putObjectDirect(PutObjectRequest putObjectRequest,
|
|
|
+ PutObjectOptions putOptions)
|
|
|
throws AmazonClientException {
|
|
|
long len = getPutRequestLength(putObjectRequest);
|
|
|
LOG.debug("PUT {} bytes to {}", len, putObjectRequest.getKey());
|
|
@@ -2737,9 +2807,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
OBJECT_PUT_REQUESTS.getSymbol(), () ->
|
|
|
s3.putObject(putObjectRequest));
|
|
|
incrementPutCompletedStatistics(true, len);
|
|
|
- // update metadata
|
|
|
+ // apply any post-write actions.
|
|
|
finishedWrite(putObjectRequest.getKey(), len,
|
|
|
- result.getETag(), result.getVersionId());
|
|
|
+ result.getETag(), result.getVersionId(),
|
|
|
+ putOptions);
|
|
|
return result;
|
|
|
} catch (SdkBaseException e) {
|
|
|
incrementPutCompletedStatistics(false, len);
|
|
@@ -3011,7 +3082,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
// is mostly harmless to create a new one.
|
|
|
if (!key.isEmpty() && !s3Exists(f, StatusProbeEnum.DIRECTORIES)) {
|
|
|
LOG.debug("Creating new fake directory at {}", f);
|
|
|
- createFakeDirectory(key);
|
|
|
+ createFakeDirectory(key, putOptionsForPath(f));
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -3026,7 +3097,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
protected void maybeCreateFakeParentDirectory(Path path)
|
|
|
throws IOException, AmazonClientException {
|
|
|
Path parent = path.getParent();
|
|
|
- if (parent != null && !parent.isRoot()) {
|
|
|
+ if (parent != null && !parent.isRoot() && !isUnderMagicCommitPath(parent)) {
|
|
|
createFakeDirectoryIfNecessary(parent);
|
|
|
}
|
|
|
}
|
|
@@ -3197,6 +3268,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
* Make the given path and all non-existent parents into
|
|
|
* directories. Has the semantics of Unix {@code 'mkdir -p'}.
|
|
|
* Existence of the directory hierarchy is not an error.
|
|
|
+ * Parent elements are scanned to see if any are a file,
|
|
|
+ * <i>except under __magic</i> paths.
|
|
|
+ * There the FS assumes that the destination directory creation
|
|
|
+ * did that scan and that paths in job/task attempts are all
|
|
|
+ * "well formed".
|
|
|
* @param p path to create
|
|
|
* @param permission to apply to path
|
|
|
* @return true if a directory was created or already existed
|
|
@@ -3214,7 +3290,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
new MkdirOperation(
|
|
|
createStoreContext(),
|
|
|
path,
|
|
|
- createMkdirOperationCallbacks()));
|
|
|
+ createMkdirOperationCallbacks(),
|
|
|
+ isMagicCommitPath(path)));
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -3240,9 +3317,13 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void createFakeDirectory(final String key)
|
|
|
+ public void createFakeDirectory(final Path dir, final boolean keepMarkers)
|
|
|
throws IOException {
|
|
|
- S3AFileSystem.this.createEmptyObject(key);
|
|
|
+ S3AFileSystem.this.createFakeDirectory(
|
|
|
+ pathToKey(dir),
|
|
|
+ keepMarkers
|
|
|
+ ? PutObjectOptions.keepingDirs()
|
|
|
+ : putOptionsForPath(dir));
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -3608,7 +3689,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
S3AFileSystem.this.invoker.retry(
|
|
|
"putObject(" + "" + ")", to.toString(),
|
|
|
true,
|
|
|
- () -> executePut(putObjectRequest, progress));
|
|
|
+ () -> executePut(putObjectRequest, progress, putOptionsForPath(to)));
|
|
|
|
|
|
return null;
|
|
|
});
|
|
@@ -3627,7 +3708,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
new MkdirOperation(
|
|
|
storeContext,
|
|
|
path,
|
|
|
- createMkdirOperationCallbacks()));
|
|
|
+ createMkdirOperationCallbacks(), false));
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -3637,14 +3718,18 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
* aborted before an {@code InterruptedIOException} is thrown.
|
|
|
* @param putObjectRequest request
|
|
|
* @param progress optional progress callback
|
|
|
+ * @param putOptions put object options
|
|
|
* @return the upload result
|
|
|
* @throws InterruptedIOException if the blocking was interrupted.
|
|
|
*/
|
|
|
- @Retries.OnceRaw("For PUT; post-PUT actions are RetryTranslated")
|
|
|
- UploadResult executePut(PutObjectRequest putObjectRequest,
|
|
|
- Progressable progress)
|
|
|
+ @Retries.OnceRaw("For PUT; post-PUT actions are RetrySwallowed")
|
|
|
+ UploadResult executePut(
|
|
|
+ final PutObjectRequest putObjectRequest,
|
|
|
+ final Progressable progress,
|
|
|
+ final PutObjectOptions putOptions)
|
|
|
throws InterruptedIOException {
|
|
|
String key = putObjectRequest.getKey();
|
|
|
+ long len = getPutRequestLength(putObjectRequest);
|
|
|
UploadInfo info = putObject(putObjectRequest);
|
|
|
Upload upload = info.getUpload();
|
|
|
ProgressableProgressListener listener = new ProgressableProgressListener(
|
|
@@ -3652,9 +3737,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
upload.addProgressListener(listener);
|
|
|
UploadResult result = waitForUploadCompletion(key, info);
|
|
|
listener.uploadCompleted();
|
|
|
+
|
|
|
// post-write actions
|
|
|
- finishedWrite(key, info.getLength(),
|
|
|
- result.getETag(), result.getVersionId());
|
|
|
+ finishedWrite(key, len,
|
|
|
+ result.getETag(), result.getVersionId(), putOptions);
|
|
|
return result;
|
|
|
}
|
|
|
|
|
@@ -3663,7 +3749,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
* If the waiting for completion is interrupted, the upload will be
|
|
|
* aborted before an {@code InterruptedIOException} is thrown.
|
|
|
* If the upload (or its result collection) failed, this is where
|
|
|
- * the failure is raised as an AWS exception
|
|
|
+ * the failure is raised as an AWS exception.
|
|
|
+ * Calls {@link #incrementPutCompletedStatistics(boolean, long)}
|
|
|
+ * to update the statistics.
|
|
|
* @param key destination key
|
|
|
* @param uploadInfo upload to wait for
|
|
|
* @return the upload result
|
|
@@ -3985,63 +4073,64 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
|
|
|
/**
|
|
|
* Perform post-write actions.
|
|
|
- * <p></p>
|
|
|
+ * <p>
|
|
|
* This operation MUST be called after any PUT/multipart PUT completes
|
|
|
* successfully.
|
|
|
- * <p></p>
|
|
|
- * The actions include:
|
|
|
- * <ol>
|
|
|
- * <li>
|
|
|
- * Calling
|
|
|
- * {@link #deleteUnnecessaryFakeDirectories(Path)}
|
|
|
- * if directory markers are not being retained.
|
|
|
- * </li>
|
|
|
- * <li>
|
|
|
- * Updating any metadata store with details on the newly created
|
|
|
- * object.
|
|
|
- * </li>
|
|
|
- * </ol>
|
|
|
+ * <p>
|
|
|
+ * The actions include calling
|
|
|
+ * {@link #deleteUnnecessaryFakeDirectories(Path)}
|
|
|
+ * if directory markers are not being retained.
|
|
|
* @param key key written to
|
|
|
* @param length total length of file written
|
|
|
* @param eTag eTag of the written object
|
|
|
* @param versionId S3 object versionId of the written object
|
|
|
+ * @param putOptions put object options
|
|
|
*/
|
|
|
@InterfaceAudience.Private
|
|
|
- @Retries.RetryTranslated("Except if failOnMetadataWriteError=false, in which"
|
|
|
- + " case RetryExceptionsSwallowed")
|
|
|
- void finishedWrite(String key, long length, String eTag, String versionId) {
|
|
|
+ @Retries.RetryExceptionsSwallowed
|
|
|
+ void finishedWrite(
|
|
|
+ String key,
|
|
|
+ long length,
|
|
|
+ String eTag,
|
|
|
+ String versionId,
|
|
|
+ PutObjectOptions putOptions) {
|
|
|
LOG.debug("Finished write to {}, len {}. etag {}, version {}",
|
|
|
key, length, eTag, versionId);
|
|
|
- Path p = keyToQualifiedPath(key);
|
|
|
Preconditions.checkArgument(length >= 0, "content length is negative");
|
|
|
- // kick off an async delete
|
|
|
- CompletableFuture<?> deletion;
|
|
|
- if (!keepDirectoryMarkers(p)) {
|
|
|
- deletion = submit(
|
|
|
- unboundedThreadPool, getActiveAuditSpan(),
|
|
|
- () -> {
|
|
|
- deleteUnnecessaryFakeDirectories(
|
|
|
- p.getParent()
|
|
|
- );
|
|
|
- return null;
|
|
|
- });
|
|
|
- } else {
|
|
|
- deletion = null;
|
|
|
+ if (!putOptions.isKeepMarkers()) {
|
|
|
+ Path p = keyToQualifiedPath(key);
|
|
|
+ deleteUnnecessaryFakeDirectories(p.getParent());
|
|
|
}
|
|
|
-
|
|
|
- // catch up with any delete operation.
|
|
|
- waitForCompletionIgnoringExceptions(deletion);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Should we keep directory markers under the path being created
|
|
|
* by mkdir/file creation/rename?
|
|
|
+ * This is done if marker retention is enabled for the path,
|
|
|
+ * or it is under a magic path where we are saving IOPs
|
|
|
+ * knowing that all committers are on the same code version and
|
|
|
+ * therefore marker aware.
|
|
|
* @param path path to probe
|
|
|
* @return true if the markers MAY be retained,
|
|
|
* false if they MUST be deleted
|
|
|
*/
|
|
|
private boolean keepDirectoryMarkers(Path path) {
|
|
|
- return directoryPolicy.keepDirectoryMarkers(path);
|
|
|
+ return directoryPolicy.keepDirectoryMarkers(path)
|
|
|
+ || isUnderMagicCommitPath(path);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Build the put options for a path being created
|
|
|
+ * by mkdir/file creation/rename: directory markers are either kept or deleted.
|
|
|
+ * See {@link #keepDirectoryMarkers(Path)} for the policy.
|
|
|
+ *
|
|
|
+ * @param path path to probe
|
|
|
+ * @return the options to use with the put request
|
|
|
+ */
|
|
|
+ private PutObjectOptions putOptionsForPath(Path path) {
|
|
|
+ return keepDirectoryMarkers(path)
|
|
|
+ ? PutObjectOptions.keepingDirs()
|
|
|
+ : PutObjectOptions.deletingDirs();
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -4078,27 +4167,32 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
* Create a fake directory, always ending in "/".
|
|
|
* Retry policy: retrying; translated.
|
|
|
* @param objectName name of directory object.
|
|
|
+ * @param putOptions put object options
|
|
|
* @throws IOException IO failure
|
|
|
*/
|
|
|
@Retries.RetryTranslated
|
|
|
- private void createFakeDirectory(final String objectName)
|
|
|
+ private void createFakeDirectory(final String objectName,
|
|
|
+ final PutObjectOptions putOptions)
|
|
|
throws IOException {
|
|
|
- createEmptyObject(objectName);
|
|
|
+ createEmptyObject(objectName, putOptions);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Used to create an empty file that represents an empty directory.
|
|
|
+ * The policy for deleting parent dirs depends on the path, dir
|
|
|
+ * status and the putOptions value.
|
|
|
* Retry policy: retrying; translated.
|
|
|
* @param objectName object to create
|
|
|
+ * @param putOptions put object options
|
|
|
* @throws IOException IO failure
|
|
|
*/
|
|
|
@Retries.RetryTranslated
|
|
|
- private void createEmptyObject(final String objectName)
|
|
|
+ private void createEmptyObject(final String objectName, PutObjectOptions putOptions)
|
|
|
throws IOException {
|
|
|
invoker.retry("PUT 0-byte object ", objectName,
|
|
|
true, () ->
|
|
|
putObjectDirect(getRequestFactory()
|
|
|
- .newDirectoryMarkerRequest(objectName)));
|
|
|
+ .newDirectoryMarkerRequest(objectName), putOptions));
|
|
|
incrementPutProgressStatistics(objectName, 0);
|
|
|
instrumentation.directoryCreated();
|
|
|
}
|
|
@@ -4207,14 +4301,26 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
|
|
|
/**
|
|
|
* Predicate: is a path a magic commit path?
|
|
|
- * True if magic commit is enabled and the path qualifies as special.
|
|
|
+ * True if magic commit is enabled and the path qualifies as special,
|
|
|
+ * and is not a .pending or .pendingset file.
|
|
|
* @param path path to examine
|
|
|
- * @return true if the path is or is under a magic directory
|
|
|
+ * @return true if writing a file to the path triggers a "magic" write.
|
|
|
*/
|
|
|
public boolean isMagicCommitPath(Path path) {
|
|
|
return committerIntegration.isMagicCommitPath(path);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Predicate: is a path under a magic commit path?
|
|
|
+ * True if magic commit is enabled and the path is under __magic,
|
|
|
+ * irrespective of file type.
|
|
|
+ * @param path path to examine
|
|
|
+ * @return true if the path is in a magic dir and the FS has magic writes enabled.
|
|
|
+ */
|
|
|
+ private boolean isUnderMagicCommitPath(Path path) {
|
|
|
+ return committerIntegration.isUnderMagicPath(path);
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Increments the statistic {@link Statistic#INVOCATION_GLOB_STATUS}.
|
|
|
* Override superclass so as to disable symlink resolution as symlinks
|
|
@@ -4766,9 +4872,19 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
case STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_KEEP:
|
|
|
case STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_DELETE:
|
|
|
case STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_AUTHORITATIVE:
|
|
|
+ return getDirectoryMarkerPolicy().hasPathCapability(path, cap);
|
|
|
+
|
|
|
+ // keep for a magic path or if the policy retains it
|
|
|
case STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_KEEP:
|
|
|
+ return keepDirectoryMarkers(path);
|
|
|
+ // delete is the opposite of keep
|
|
|
case STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_DELETE:
|
|
|
- return getDirectoryMarkerPolicy().hasPathCapability(path, cap);
|
|
|
+ return !keepDirectoryMarkers(path);
|
|
|
+
|
|
|
+ // create file options
|
|
|
+ case FS_S3A_CREATE_PERFORMANCE:
|
|
|
+ case FS_S3A_CREATE_HEADER:
|
|
|
+ return true;
|
|
|
|
|
|
default:
|
|
|
return super.hasPathCapability(p, cap);
|