@@ -23,7 +23,6 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -51,19 +50,20 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.s3a.api.RequestFactory;
+import org.apache.hadoop.fs.s3a.impl.StoreContext;
 import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
 import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;
 import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
 import org.apache.hadoop.fs.s3a.select.SelectBinding;
+import org.apache.hadoop.fs.store.audit.AuditSpan;
+import org.apache.hadoop.fs.store.audit.AuditSpanSource;
 import org.apache.hadoop.util.DurationInfo;
 import org.apache.hadoop.util.functional.CallableRaisingIOE;
 
-import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkArgument;
 import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.hadoop.fs.s3a.Invoker.*;
-import static org.apache.hadoop.fs.s3a.S3AUtils.longOption;
-import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DEFAULT_UPLOAD_PART_COUNT_LIMIT;
-import static org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT;
+import static org.apache.hadoop.fs.store.audit.AuditingFunctions.withinAuditSpan;
 
 /**
  * Helper for low-level operations against an S3 Bucket for writing data,
@@ -87,6 +87,17 @@ import static org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_
  * </ul>
  *
  * This API is for internal use only.
+ * Span scoping: this helper is instantiated with a span; that span is
+ * activated before operations which query/update S3.
+ *
+ * History
+ * <pre>
+ * - A nested class in S3AFileSystem
+ * - Single shared instance created and reused.
+ * - [HADOOP-13786] A separate class, single instance in S3AFS
+ * - [HDFS-13934] Split into interface and implementation
+ * - [HADOOP-15711] Adds audit tracking; one instance per use.
+ * </pre>
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
@@ -116,22 +127,50 @@ public class WriteOperationHelper implements WriteOperations {
    */
   private final S3AStatisticsContext statisticsContext;
 
+  /**
+   * Store Context; extracted from owner.
+   */
+  private final StoreContext storeContext;
+
+  /**
+   * Source of Audit spans.
+   */
+  private final AuditSpanSource auditSpanSource;
+
+  /**
+   * Audit Span.
+   */
+  private AuditSpan auditSpan;
+
+  /**
+   * Factory for AWS requests.
+   */
+  private final RequestFactory requestFactory;
+
   /**
    * Constructor.
    * @param owner owner FS creating the helper
    * @param conf Configuration object
    * @param statisticsContext statistics context
+   * @param auditSpanSource source of spans
+   * @param auditSpan span to activate
    *
    */
   protected WriteOperationHelper(S3AFileSystem owner,
       Configuration conf,
-      S3AStatisticsContext statisticsContext) {
+      S3AStatisticsContext statisticsContext,
+      final AuditSpanSource auditSpanSource,
+      final AuditSpan auditSpan) {
     this.owner = owner;
     this.invoker = new Invoker(new S3ARetryPolicy(conf),
         this::operationRetried);
     this.conf = conf;
     this.statisticsContext = statisticsContext;
-    bucket = owner.getBucket();
+    this.storeContext = owner.createStoreContext();
+    this.bucket = owner.getBucket();
+    this.auditSpanSource = auditSpanSource;
+    this.auditSpan = checkNotNull(auditSpan);
+    this.requestFactory = owner.getRequestFactory();
   }
 
   /**
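With this hunk the helper becomes one instance per use: each instance is bound to an audit span at construction and releases it in close(). A minimal sketch of the intended lifecycle, assuming a package-local caller (the constructor is protected), that WriteOperations extends Closeable as the @Override on close() later in this patch implies, and illustrative names for fs, statsContext and spanSource:

    // Sketch only; everything except WriteOperationHelper's own API is
    // an illustrative stand-in, not part of this patch.
    AuditSpan span = spanSource.createSpan("multipart-upload", destKey, null);
    try (WriteOperationHelper helper = new WriteOperationHelper(
        fs, fs.getConf(), statsContext, spanSource, span)) {
      String uploadId = helper.initiateMultiPartUpload(destKey);
      // ... upload the parts, then complete or abort ...
    } // close() deactivates the span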
@@ -150,6 +189,7 @@ public class WriteOperationHelper implements WriteOperations {
 
   /**
    * Execute a function with retry processing.
+   * Also activates the current span.
    * @param <T> type of return value
    * @param action action to execute (used in error messages)
    * @param path path of work (used in error messages)
@@ -164,10 +204,33 @@ public class WriteOperationHelper implements WriteOperations {
       boolean idempotent,
       CallableRaisingIOE<T> operation)
       throws IOException {
-
+    activateAuditSpan();
     return invoker.retry(action, path, idempotent, operation);
   }
 
+  /**
+   * Get the audit span this object was created with.
+   * @return the audit span
+   */
+  public AuditSpan getAuditSpan() {
+    return auditSpan;
+  }
+
+  /**
+   * Activate the audit span.
+   * @return the span
+   */
+  private AuditSpan activateAuditSpan() {
+    return auditSpan.activate();
+  }
+
+  /**
+   * Deactivate the audit span.
+   */
+  private void deactivateAuditSpan() {
+    auditSpan.deactivate();
+  }
+
   /**
    * Create a {@link PutObjectRequest} request against the specific key.
    * @param destKey destination key
@@ -176,15 +239,18 @@ public class WriteOperationHelper implements WriteOperations {
    * @param headers optional map of custom headers.
    * @return the request
    */
+  @Retries.OnceRaw
   public PutObjectRequest createPutObjectRequest(String destKey,
       InputStream inputStream,
       long length,
       final Map<String, String> headers) {
+    activateAuditSpan();
     ObjectMetadata objectMetadata = newObjectMetadata(length);
     if (headers != null) {
       objectMetadata.setUserMetadata(headers);
     }
-    return owner.newPutObjectRequest(destKey,
+    return getRequestFactory().newPutObjectRequest(
+        destKey,
         objectMetadata,
         inputStream);
   }
@@ -195,13 +261,16 @@ public class WriteOperationHelper implements WriteOperations {
    * @param sourceFile source file
    * @return the request
    */
+  @Retries.OnceRaw
   public PutObjectRequest createPutObjectRequest(String dest,
       File sourceFile) {
     Preconditions.checkState(sourceFile.length() < Integer.MAX_VALUE,
         "File length is too big for a single PUT upload");
-    return owner.newPutObjectRequest(dest,
-        newObjectMetadata((int) sourceFile.length()),
-        sourceFile);
+    activateAuditSpan();
+    return getRequestFactory().
+        newPutObjectRequest(dest,
+            newObjectMetadata((int) sourceFile.length()),
+            sourceFile);
   }
 
   /**
@@ -227,7 +296,7 @@ public class WriteOperationHelper implements WriteOperations {
    * @return a new metadata instance
    */
   public ObjectMetadata newObjectMetadata(long length) {
-    return owner.newObjectMetadata(length);
+    return getRequestFactory().newObjectMetadata(length);
   }
 
   /**
@@ -240,15 +309,16 @@ public class WriteOperationHelper implements WriteOperations {
   @Retries.RetryTranslated
   public String initiateMultiPartUpload(String destKey) throws IOException {
     LOG.debug("Initiating Multipart upload to {}", destKey);
-    final InitiateMultipartUploadRequest initiateMPURequest =
-        new InitiateMultipartUploadRequest(bucket,
-            destKey,
-            newObjectMetadata(-1));
-    initiateMPURequest.setCannedACL(owner.getCannedACL());
-    owner.setOptionalMultipartUploadRequestParameters(initiateMPURequest);
-
-    return retry("initiate MultiPartUpload", destKey, true,
-        () -> owner.initiateMultipartUpload(initiateMPURequest).getUploadId());
+    try (AuditSpan span = activateAuditSpan()) {
+      return retry("initiate MultiPartUpload", destKey, true,
+          () -> {
+            final InitiateMultipartUploadRequest initiateMPURequest =
+                getRequestFactory().newMultipartUploadRequest(
+                    destKey);
+            return owner.initiateMultipartUpload(initiateMPURequest)
+                .getUploadId();
+          });
+    }
   }
 
   /**
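Two details worth noting in the hunk above: the request is now built inside the retried lambda, so each retry attempt constructs a fresh InitiateMultipartUploadRequest through the factory; and the try-with-resources form relies on AuditSpan being Closeable. A rough expansion of the latter, assuming AuditSpan.close() simply deactivates the span:

    // Assumed equivalence of the try-with-resources idiom above.
    AuditSpan span = activateAuditSpan();
    try {
      // retry("initiate MultiPartUpload", ...) as in the patch
    } finally {
      span.close();  // assumed to call deactivate()
    }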
@@ -278,23 +348,22 @@ public class WriteOperationHelper implements WriteOperations {
       throw new PathIOException(destKey,
           "No upload parts in multipart upload");
     }
-    CompleteMultipartUploadResult uploadResult =
-        invoker.retry("Completing multipart upload", destKey,
-            true,
-            retrying,
-            () -> {
-              // a copy of the list is required, so that the AWS SDK doesn't
-              // attempt to sort an unmodifiable list.
-              return owner.getAmazonS3Client().completeMultipartUpload(
-                  new CompleteMultipartUploadRequest(bucket,
-                      destKey,
-                      uploadId,
-                      new ArrayList<>(partETags)));
-            }
-        );
-    owner.finishedWrite(destKey, length, uploadResult.getETag(),
-        uploadResult.getVersionId(), operationState);
-    return uploadResult;
+    try (AuditSpan span = activateAuditSpan()) {
+      CompleteMultipartUploadResult uploadResult;
+      uploadResult = invoker.retry("Completing multipart upload", destKey,
+          true,
+          retrying,
+          () -> {
+            final CompleteMultipartUploadRequest request =
+                getRequestFactory().newCompleteMultipartUploadRequest(
+                    destKey, uploadId, partETags);
+            return owner.getAmazonS3Client().completeMultipartUpload(
+                request);
+          });
+      owner.finishedWrite(destKey, length, uploadResult.getETag(),
+          uploadResult.getVersionId(), operationState);
+      return uploadResult;
+    }
   }
 
   /**
@@ -351,16 +420,17 @@ public class WriteOperationHelper implements WriteOperations {
           destKey,
           true,
           retrying,
-          () -> owner.abortMultipartUpload(
-              destKey,
-              uploadId));
+          withinAuditSpan(getAuditSpan(), () ->
+              owner.abortMultipartUpload(
+                  destKey, uploadId)));
     } else {
       // single pass attempt.
       once("Aborting multipart upload ID " + uploadId,
           destKey,
-          () -> owner.abortMultipartUpload(
-              destKey,
-              uploadId));
+          withinAuditSpan(getAuditSpan(), () ->
+              owner.abortMultipartUpload(
+                  destKey,
+                  uploadId)));
     }
   }
 
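From this point on, callables handed to retry()/once() are wrapped with withinAuditSpan, so activation happens when the callable is actually invoked (including on each retry attempt), not when it is constructed. A rough sketch of what the wrapper is assumed to do; this is not the AuditingFunctions source:

    // Assumed shape of AuditingFunctions.withinAuditSpan: activate the
    // span, then delegate to the wrapped operation.
    static <T> CallableRaisingIOE<T> withinAuditSpan(
        AuditSpan span,
        CallableRaisingIOE<T> operation) {
      return () -> {
        span.activate();
        return operation.apply();
      };
    }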
@@ -373,7 +443,8 @@ public class WriteOperationHelper implements WriteOperations {
   public void abortMultipartUpload(MultipartUpload upload)
       throws IOException {
     invoker.retry("Aborting multipart commit", upload.getKey(), true,
-        () -> owner.abortMultipartUpload(upload));
+        withinAuditSpan(getAuditSpan(),
+            () -> owner.abortMultipartUpload(upload)));
   }
 
 
@@ -389,7 +460,7 @@ public class WriteOperationHelper implements WriteOperations {
       throws IOException {
     LOG.debug("Aborting multipart uploads under {}", prefix);
     int count = 0;
-    List<MultipartUpload> multipartUploads = owner.listMultipartUploads(prefix);
+    List<MultipartUpload> multipartUploads = listMultipartUploads(prefix);
     LOG.debug("Number of outstanding uploads: {}", multipartUploads.size());
     for (MultipartUpload upload: multipartUploads) {
       try {
@@ -402,6 +473,14 @@ public class WriteOperationHelper implements WriteOperations {
     return count;
   }
 
+  @Override
+  @Retries.RetryTranslated
+  public List<MultipartUpload> listMultipartUploads(final String prefix)
+      throws IOException {
+    activateAuditSpan();
+    return owner.listMultipartUploads(prefix);
+  }
+
   /**
    * Abort a multipart commit operation.
    * @param destKey destination key of ongoing operation
@@ -409,6 +488,7 @@ public class WriteOperationHelper implements WriteOperations {
    * @throws IOException on problems.
    * @throws FileNotFoundException if the abort ID is unknown
    */
+  @Override
   @Retries.RetryTranslated
   public void abortMultipartCommit(String destKey, String uploadId)
       throws IOException {
@@ -423,6 +503,7 @@ public class WriteOperationHelper implements WriteOperations {
    * in {@code offset} and a length of block in {@code size} equal to
    * or less than the remaining bytes.
    * The part number must be less than 10000.
+   * Retry policy is once-translated; anything more is too much effort.
    * @param destKey destination key of ongoing operation
    * @param uploadId ID of ongoing upload
    * @param partNumber current part number of the upload
@@ -431,9 +512,11 @@ public class WriteOperationHelper implements WriteOperations {
    * @param sourceFile optional source file.
    * @param offset offset in file to start reading.
    * @return the request.
-   * @throws IllegalArgumentException if the parameters are invalid -including
+   * @throws IllegalArgumentException if the parameters are invalid.
    * @throws PathIOException if the part number is out of range.
    */
+  @Override
+  @Retries.OnceTranslated
   public UploadPartRequest newUploadPartRequest(
       String destKey,
       String uploadId,
@@ -441,52 +524,17 @@ public class WriteOperationHelper implements WriteOperations {
       int size,
       InputStream uploadStream,
       File sourceFile,
-      Long offset) throws PathIOException {
-    checkNotNull(uploadId);
-    // exactly one source must be set; xor verifies this
-    checkArgument((uploadStream != null) ^ (sourceFile != null),
-        "Data source");
-    checkArgument(size >= 0, "Invalid partition size %s", size);
-    checkArgument(partNumber > 0,
-        "partNumber must be between 1 and %s inclusive, but is %s",
-        DEFAULT_UPLOAD_PART_COUNT_LIMIT, partNumber);
-
-    LOG.debug("Creating part upload request for {} #{} size {}",
-        uploadId, partNumber, size);
-    long partCountLimit = longOption(conf,
-        UPLOAD_PART_COUNT_LIMIT,
-        DEFAULT_UPLOAD_PART_COUNT_LIMIT,
-        1);
-    if (partCountLimit != DEFAULT_UPLOAD_PART_COUNT_LIMIT) {
-      LOG.warn("Configuration property {} shouldn't be overridden by client",
-          UPLOAD_PART_COUNT_LIMIT);
-    }
-    final String pathErrorMsg = "Number of parts in multipart upload exceeded."
-        + " Current part count = %s, Part count limit = %s ";
-    if (partNumber > partCountLimit) {
-      throw new PathIOException(destKey,
-          String.format(pathErrorMsg, partNumber, partCountLimit));
-    }
-    UploadPartRequest request = new UploadPartRequest()
-        .withBucketName(bucket)
-        .withKey(destKey)
-        .withUploadId(uploadId)
-        .withPartNumber(partNumber)
-        .withPartSize(size);
-    if (uploadStream != null) {
-      // there's an upload stream. Bind to it.
-      request.setInputStream(uploadStream);
-    } else {
-      checkArgument(sourceFile.exists(),
-          "Source file does not exist: %s", sourceFile);
-      checkArgument(offset >= 0, "Invalid offset %s", offset);
-      long length = sourceFile.length();
-      checkArgument(offset == 0 || offset < length,
-          "Offset %s beyond length of file %s", offset, length);
-      request.setFile(sourceFile);
-      request.setFileOffset(offset);
-    }
-    return request;
+      Long offset) throws IOException {
+    return once("upload part request", destKey,
+        withinAuditSpan(getAuditSpan(), () ->
+            getRequestFactory().newUploadPartRequest(
+                destKey,
+                uploadId,
+                partNumber,
+                size,
+                uploadStream,
+                sourceFile,
+                offset)));
   }
 
   /**
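The validation deleted above (exactly one of stream or file, non-negative size and offset, and the part-number bound checked against the UPLOAD_PART_COUNT_LIMIT option) is presumably now performed inside the RequestFactory implementation; the helper keeps only the once-translation and span wrapping. A hedged usage sketch, with illustrative argument names:

    // Sketch: build and execute an upload of one block as part #1.
    UploadPartRequest part = helper.newUploadPartRequest(
        destKey, uploadId, 1, blockSize, blockStream, null, 0L);
    UploadPartResult result = helper.uploadPart(part);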
@@ -514,7 +562,8 @@ public class WriteOperationHelper implements WriteOperations {
       throws IOException {
     return retry("Writing Object",
         putObjectRequest.getKey(), true,
-        () -> owner.putObjectDirect(putObjectRequest));
+        withinAuditSpan(getAuditSpan(), () ->
+            owner.putObjectDirect(putObjectRequest)));
   }
 
   /**
@@ -529,7 +578,8 @@ public class WriteOperationHelper implements WriteOperations {
     // no retry; rely on xfer manager logic
     return retry("Writing Object",
         putObjectRequest.getKey(), true,
-        () -> owner.executePut(putObjectRequest, null));
+        withinAuditSpan(getAuditSpan(), () ->
+            owner.executePut(putObjectRequest, null)));
   }
 
   /**
@@ -543,13 +593,12 @@ public class WriteOperationHelper implements WriteOperations {
   public void revertCommit(String destKey,
       @Nullable BulkOperationState operationState) throws IOException {
     once("revert commit", destKey,
-        () -> {
+        withinAuditSpan(getAuditSpan(), () -> {
           Path destPath = owner.keyToQualifiedPath(destKey);
           owner.deleteObjectAtPath(destPath,
               destKey, true, operationState);
           owner.maybeCreateFakeParentDirectory(destPath);
-        }
-    );
+        }));
   }
 
   /**
@@ -620,10 +669,11 @@ public class WriteOperationHelper implements WriteOperations {
   public UploadPartResult uploadPart(UploadPartRequest request)
       throws IOException {
     return retry("upload part #" + request.getPartNumber()
-        + " upload ID "+ request.getUploadId(),
+        + " upload ID " + request.getUploadId(),
         request.getKey(),
         true,
-        () -> owner.uploadPart(request));
+        withinAuditSpan(getAuditSpan(),
+            () -> owner.uploadPart(request)));
   }
 
   /**
@@ -642,10 +692,10 @@ public class WriteOperationHelper implements WriteOperations {
    * @return the request
    */
   public SelectObjectContentRequest newSelectRequest(Path path) {
-    SelectObjectContentRequest request = new SelectObjectContentRequest();
-    request.setBucketName(bucket);
-    request.setKey(owner.pathToKey(path));
-    return request;
+    try (AuditSpan span = getAuditSpan()) {
+      return getRequestFactory().newSelectRequest(
+          storeContext.pathToKey(path));
+    }
   }
 
   /**
@@ -664,6 +714,8 @@ public class WriteOperationHelper implements WriteOperations {
       final SelectObjectContentRequest request,
       final String action)
       throws IOException {
+    // no setting of span here as the select binding is (statically) created
+    // without any span.
     String bucketName = request.getBucketName();
     Preconditions.checkArgument(bucket.equals(bucketName),
         "wrong bucket: %s", bucketName);
@@ -676,7 +728,7 @@ public class WriteOperationHelper implements WriteOperations {
         action,
         source.toString(),
         true,
-        () -> {
+        withinAuditSpan(getAuditSpan(), () -> {
           try (DurationInfo ignored =
               new DurationInfo(LOG, "S3 Select operation")) {
             try {
@@ -691,11 +743,35 @@ public class WriteOperationHelper implements WriteOperations {
             throw e;
           }
         }
-      });
+      }));
+  }
+
+  @Override
+  public AuditSpan createSpan(final String operation,
+      @Nullable final String path1,
+      @Nullable final String path2) throws IOException {
+    return auditSpanSource.createSpan(operation, path1, path2);
   }
 
   @Override
   public void incrementWriteOperations() {
     owner.incrementWriteOperations();
   }
+
+  /**
+   * Deactivate the audit span.
+   */
+  @Override
+  public void close() throws IOException {
+    deactivateAuditSpan();
+  }
+
+  /**
+   * Get the request factory which uses this store's audit span.
+   * @return the request factory.
+   */
+  public RequestFactory getRequestFactory() {
+    return requestFactory;
+  }
+
 }
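Since the helper now exposes its RequestFactory, callers such as the committer code can build requests consistently with the store's configuration without going through S3AFileSystem. A small sketch with illustrative names:

    // Sketch: build object metadata via the helper's request factory.
    RequestFactory factory = helper.getRequestFactory();
    ObjectMetadata meta = factory.newObjectMetadata(contentLength);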