|
@@ -54,6 +54,8 @@ import com.amazonaws.SdkBaseException;
|
|
|
import com.amazonaws.services.s3.AmazonS3;
|
|
|
import com.amazonaws.services.s3.Headers;
|
|
|
import com.amazonaws.services.s3.model.CannedAccessControlList;
|
|
|
+import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
|
|
|
+import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
|
|
|
import com.amazonaws.services.s3.model.CopyObjectRequest;
|
|
|
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
|
|
|
import com.amazonaws.services.s3.model.DeleteObjectsResult;
|
|
@@ -70,6 +72,8 @@ import com.amazonaws.services.s3.model.ObjectMetadata;
|
|
|
import com.amazonaws.services.s3.model.PutObjectRequest;
|
|
|
import com.amazonaws.services.s3.model.PutObjectResult;
|
|
|
import com.amazonaws.services.s3.model.S3Object;
|
|
|
+import com.amazonaws.services.s3.model.SelectObjectContentRequest;
|
|
|
+import com.amazonaws.services.s3.model.SelectObjectContentResult;
|
|
|
import com.amazonaws.services.s3.model.StorageClass;
|
|
|
import com.amazonaws.services.s3.model.UploadPartRequest;
|
|
|
import com.amazonaws.services.s3.model.UploadPartResult;
|
|
@@ -126,6 +130,7 @@ import org.apache.hadoop.fs.s3a.impl.S3AMultipartUploaderBuilder;
|
|
|
import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum;
|
|
|
import org.apache.hadoop.fs.s3a.impl.StoreContext;
|
|
|
import org.apache.hadoop.fs.s3a.impl.StoreContextBuilder;
|
|
|
+import org.apache.hadoop.fs.s3a.impl.V2Migration;
|
|
|
import org.apache.hadoop.fs.s3a.tools.MarkerToolOperations;
|
|
|
import org.apache.hadoop.fs.s3a.tools.MarkerToolOperationsImpl;
|
|
|
import org.apache.hadoop.fs.statistics.DurationTracker;
|
|
@@ -853,6 +858,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
* @param dtEnabled are delegation tokens enabled?
|
|
|
* @throws IOException failure.
|
|
|
*/
|
|
|
+ @SuppressWarnings("deprecation")
|
|
|
private void bindAWSClient(URI name, boolean dtEnabled) throws IOException {
|
|
|
Configuration conf = getConf();
|
|
|
credentials = null;
|
|
@@ -865,6 +871,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
// with it if so.
|
|
|
|
|
|
LOG.debug("Using delegation tokens");
|
|
|
+ V2Migration.v1DelegationTokenCredentialProvidersUsed();
|
|
|
S3ADelegationTokens tokens = new S3ADelegationTokens();
|
|
|
this.delegationTokens = Optional.of(tokens);
|
|
|
tokens.bindToFileSystem(getCanonicalUri(),
|
|
@@ -1192,7 +1199,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
* This is for internal use within the S3A code itself.
|
|
|
* @return AmazonS3Client
|
|
|
*/
|
|
|
- AmazonS3 getAmazonS3Client() {
|
|
|
+ private AmazonS3 getAmazonS3Client() {
|
|
|
return s3;
|
|
|
}
|
|
|
|
|
@@ -1206,6 +1213,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
@VisibleForTesting
|
|
|
public AmazonS3 getAmazonS3ClientForTesting(String reason) {
|
|
|
LOG.warn("Access to S3A client requested, reason {}", reason);
|
|
|
+ V2Migration.v1S3ClientRequested();
|
|
|
return s3;
|
|
|
}
|
|
|
|
|
@@ -1586,6 +1594,25 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Callbacks for WriteOperationHelper.
|
|
|
+ */
|
|
|
+ private final class WriteOperationHelperCallbacksImpl
|
|
|
+ implements WriteOperationHelper.WriteOperationHelperCallbacks {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public SelectObjectContentResult selectObjectContent(SelectObjectContentRequest request) {
|
|
|
+ return s3.selectObjectContent(request);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public CompleteMultipartUploadResult completeMultipartUpload(
|
|
|
+ CompleteMultipartUploadRequest request) {
|
|
|
+ return s3.completeMultipartUpload(request);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
/**
|
|
|
* Create the read context for reading from the referenced file,
|
|
|
* using FS state as well as the status.
|
|
@@ -1806,7 +1833,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
getConf(),
|
|
|
statisticsContext,
|
|
|
getAuditSpanSource(),
|
|
|
- auditSpan);
|
|
|
+ auditSpan,
|
|
|
+ new WriteOperationHelperCallbacksImpl());
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -2287,6 +2315,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
@Retries.RetryTranslated
|
|
|
@InterfaceStability.Evolving
|
|
|
public ObjectMetadata getObjectMetadata(Path path) throws IOException {
|
|
|
+ V2Migration.v1GetObjectMetadataCalled();
|
|
|
return trackDurationAndSpan(INVOCATION_GET_FILE_STATUS, path, () ->
|
|
|
getObjectMetadata(makeQualified(path), null, invoker,
|
|
|
"getObjectMetadata"));
|