|
@@ -43,6 +43,9 @@ import software.amazon.awssdk.services.s3.model.ListMultipartUploadsResponse;
|
|
import software.amazon.awssdk.services.s3.model.MultipartUpload;
|
|
import software.amazon.awssdk.services.s3.model.MultipartUpload;
|
|
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
|
|
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
|
|
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
|
|
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
|
|
|
|
+
|
|
|
|
+import org.apache.hadoop.fs.s3a.S3AInternals;
|
|
|
|
+import org.apache.hadoop.fs.s3a.S3AStore;
|
|
import org.apache.hadoop.util.Lists;
|
|
import org.apache.hadoop.util.Lists;
|
|
import org.apache.hadoop.thirdparty.com.google.common.collect.Maps;
|
|
import org.apache.hadoop.thirdparty.com.google.common.collect.Maps;
|
|
import org.junit.AfterClass;
|
|
import org.junit.AfterClass;
|
|
@@ -129,9 +132,10 @@ public class StagingTestBase {
|
|
* @throws IOException IO problems.
|
|
* @throws IOException IO problems.
|
|
*/
|
|
*/
|
|
protected static S3AFileSystem createAndBindMockFSInstance(Configuration conf,
|
|
protected static S3AFileSystem createAndBindMockFSInstance(Configuration conf,
|
|
- Pair<StagingTestBase.ClientResults, StagingTestBase.ClientErrors> outcome)
|
|
|
|
|
|
+ Pair<StagingTestBase.ClientResults, StagingTestBase.ClientErrors> outcome,
|
|
|
|
+ S3Client mockS3Client)
|
|
throws IOException {
|
|
throws IOException {
|
|
- S3AFileSystem mockFs = mockS3AFileSystemRobustly();
|
|
|
|
|
|
+ S3AFileSystem mockFs = mockS3AFileSystemRobustly(mockS3Client);
|
|
MockS3AFileSystem wrapperFS = new MockS3AFileSystem(mockFs, outcome);
|
|
MockS3AFileSystem wrapperFS = new MockS3AFileSystem(mockFs, outcome);
|
|
URI uri = RAW_BUCKET_URI;
|
|
URI uri = RAW_BUCKET_URI;
|
|
wrapperFS.initialize(uri, conf);
|
|
wrapperFS.initialize(uri, conf);
|
|
@@ -142,8 +146,13 @@ public class StagingTestBase {
|
|
return mockFs;
|
|
return mockFs;
|
|
}
|
|
}
|
|
|
|
|
|
- private static S3AFileSystem mockS3AFileSystemRobustly() {
|
|
|
|
|
|
+ private static S3AFileSystem mockS3AFileSystemRobustly(S3Client mockS3Client) {
|
|
S3AFileSystem mockFS = mock(S3AFileSystem.class);
|
|
S3AFileSystem mockFS = mock(S3AFileSystem.class);
|
|
|
|
+ S3AInternals s3AInternals = mock(S3AInternals.class);
|
|
|
|
+ when(mockFS.getS3AInternals()).thenReturn(s3AInternals);
|
|
|
|
+ when(s3AInternals.getStore()).thenReturn(mock(S3AStore.class));
|
|
|
|
+ when(s3AInternals.getAmazonS3Client(anyString()))
|
|
|
|
+ .thenReturn(mockS3Client);
|
|
doNothing().when(mockFS).incrementReadOperations();
|
|
doNothing().when(mockFS).incrementReadOperations();
|
|
doNothing().when(mockFS).incrementWriteOperations();
|
|
doNothing().when(mockFS).incrementWriteOperations();
|
|
doNothing().when(mockFS).incrementWriteOperations();
|
|
doNothing().when(mockFS).incrementWriteOperations();
|
|
@@ -350,7 +359,7 @@ public class StagingTestBase {
|
|
this.errors = new StagingTestBase.ClientErrors();
|
|
this.errors = new StagingTestBase.ClientErrors();
|
|
this.mockClient = newMockS3Client(results, errors);
|
|
this.mockClient = newMockS3Client(results, errors);
|
|
this.mockFS = createAndBindMockFSInstance(jobConf,
|
|
this.mockFS = createAndBindMockFSInstance(jobConf,
|
|
- Pair.of(results, errors));
|
|
|
|
|
|
+ Pair.of(results, errors), mockClient);
|
|
this.wrapperFS = lookupWrapperFS(jobConf);
|
|
this.wrapperFS = lookupWrapperFS(jobConf);
|
|
// and bind the FS
|
|
// and bind the FS
|
|
wrapperFS.setAmazonS3Client(mockClient);
|
|
wrapperFS.setAmazonS3Client(mockClient);
|