|
@@ -54,6 +54,8 @@ import com.amazonaws.SdkBaseException;
|
|
|
import com.amazonaws.services.s3.AmazonS3;
|
|
|
import com.amazonaws.services.s3.Headers;
|
|
|
import com.amazonaws.services.s3.model.CannedAccessControlList;
|
|
|
+import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
|
|
|
+import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
|
|
|
import com.amazonaws.services.s3.model.CopyObjectRequest;
|
|
|
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
|
|
|
import com.amazonaws.services.s3.model.DeleteObjectsResult;
|
|
@@ -70,6 +72,8 @@ import com.amazonaws.services.s3.model.ObjectMetadata;
|
|
|
import com.amazonaws.services.s3.model.PutObjectRequest;
|
|
|
import com.amazonaws.services.s3.model.PutObjectResult;
|
|
|
import com.amazonaws.services.s3.model.S3Object;
|
|
|
+import com.amazonaws.services.s3.model.SelectObjectContentRequest;
|
|
|
+import com.amazonaws.services.s3.model.SelectObjectContentResult;
|
|
|
import com.amazonaws.services.s3.model.StorageClass;
|
|
|
import com.amazonaws.services.s3.model.UploadPartRequest;
|
|
|
import com.amazonaws.services.s3.model.UploadPartResult;
|
|
@@ -127,6 +131,7 @@ import org.apache.hadoop.fs.s3a.impl.S3AMultipartUploaderBuilder;
|
|
|
import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum;
|
|
|
import org.apache.hadoop.fs.s3a.impl.StoreContext;
|
|
|
import org.apache.hadoop.fs.s3a.impl.StoreContextBuilder;
|
|
|
+import org.apache.hadoop.fs.s3a.impl.V2Migration;
|
|
|
import org.apache.hadoop.fs.s3a.prefetch.S3APrefetchingInputStream;
|
|
|
import org.apache.hadoop.fs.s3a.tools.MarkerToolOperations;
|
|
|
import org.apache.hadoop.fs.s3a.tools.MarkerToolOperationsImpl;
|
|
@@ -882,6 +887,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
* @param dtEnabled are delegation tokens enabled?
|
|
|
* @throws IOException failure.
|
|
|
*/
|
|
|
+ @SuppressWarnings("deprecation")
|
|
|
private void bindAWSClient(URI name, boolean dtEnabled) throws IOException {
|
|
|
Configuration conf = getConf();
|
|
|
credentials = null;
|
|
@@ -894,6 +900,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
// with it if so.
|
|
|
|
|
|
LOG.debug("Using delegation tokens");
|
|
|
+ V2Migration.v1DelegationTokenCredentialProvidersUsed();
|
|
|
S3ADelegationTokens tokens = new S3ADelegationTokens();
|
|
|
this.delegationTokens = Optional.of(tokens);
|
|
|
tokens.bindToFileSystem(getCanonicalUri(),
|
|
@@ -1221,7 +1228,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
* This is for internal use within the S3A code itself.
|
|
|
* @return AmazonS3Client
|
|
|
*/
|
|
|
- AmazonS3 getAmazonS3Client() {
|
|
|
+ private AmazonS3 getAmazonS3Client() {
|
|
|
return s3;
|
|
|
}
|
|
|
|
|
@@ -1235,6 +1242,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
@VisibleForTesting
|
|
|
public AmazonS3 getAmazonS3ClientForTesting(String reason) {
|
|
|
LOG.warn("Access to S3A client requested, reason {}", reason);
|
|
|
+ V2Migration.v1S3ClientRequested();
|
|
|
return s3;
|
|
|
}
|
|
|
|
|
@@ -1619,6 +1627,25 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Callbacks for WriteOperationHelper.
|
|
|
+ */
|
|
|
+ private final class WriteOperationHelperCallbacksImpl
|
|
|
+ implements WriteOperationHelper.WriteOperationHelperCallbacks {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public SelectObjectContentResult selectObjectContent(SelectObjectContentRequest request) {
|
|
|
+ return s3.selectObjectContent(request);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public CompleteMultipartUploadResult completeMultipartUpload(
|
|
|
+ CompleteMultipartUploadRequest request) {
|
|
|
+ return s3.completeMultipartUpload(request);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
/**
|
|
|
* Create the read context for reading from the referenced file,
|
|
|
* using FS state as well as the status.
|
|
@@ -1843,7 +1870,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
getConf(),
|
|
|
statisticsContext,
|
|
|
getAuditSpanSource(),
|
|
|
- auditSpan);
|
|
|
+ auditSpan,
|
|
|
+ new WriteOperationHelperCallbacksImpl());
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -2324,6 +2352,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
@Retries.RetryTranslated
|
|
|
@InterfaceStability.Evolving
|
|
|
public ObjectMetadata getObjectMetadata(Path path) throws IOException {
|
|
|
+ V2Migration.v1GetObjectMetadataCalled();
|
|
|
return trackDurationAndSpan(INVOCATION_GET_FILE_STATUS, path, () ->
|
|
|
getObjectMetadata(makeQualified(path), null, invoker,
|
|
|
"getObjectMetadata"));
|