|
@@ -18,26 +18,15 @@
|
|
|
|
|
|
package org.apache.hadoop.fs.s3a.impl;
|
|
|
|
|
|
-import javax.annotation.Nullable;
|
|
|
import java.io.File;
|
|
|
import java.io.IOException;
|
|
|
-import java.net.URI;
|
|
|
-import java.net.URISyntaxException;
|
|
|
-import java.util.ArrayList;
|
|
|
import java.util.Arrays;
|
|
|
-import java.util.Collection;
|
|
|
import java.util.Collections;
|
|
|
import java.util.List;
|
|
|
-import java.util.Map;
|
|
|
-import java.util.concurrent.CompletableFuture;
|
|
|
-import java.util.concurrent.TimeUnit;
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
-import com.amazonaws.AmazonClientException;
|
|
|
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
|
|
|
-import com.amazonaws.services.s3.model.DeleteObjectsResult;
|
|
|
import com.amazonaws.services.s3.model.MultiObjectDeleteException;
|
|
|
-import com.amazonaws.services.s3.transfer.model.CopyResult;
|
|
|
import com.google.common.collect.Lists;
|
|
|
import org.assertj.core.api.Assertions;
|
|
|
import org.junit.Before;
|
|
@@ -45,32 +34,8 @@ import org.junit.Test;
|
|
|
|
|
|
import org.apache.commons.lang3.tuple.Pair;
|
|
|
import org.apache.commons.lang3.tuple.Triple;
|
|
|
-import org.apache.hadoop.conf.Configuration;
|
|
|
-import org.apache.hadoop.fs.FileStatus;
|
|
|
-import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
-import org.apache.hadoop.fs.RemoteIterator;
|
|
|
-import org.apache.hadoop.fs.s3a.Constants;
|
|
|
-import org.apache.hadoop.fs.s3a.Invoker;
|
|
|
-import org.apache.hadoop.fs.s3a.S3AFileStatus;
|
|
|
-import org.apache.hadoop.fs.s3a.S3AInputPolicy;
|
|
|
-import org.apache.hadoop.fs.s3a.S3AInstrumentation;
|
|
|
-import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus;
|
|
|
-import org.apache.hadoop.fs.s3a.S3AReadOpContext;
|
|
|
-import org.apache.hadoop.fs.s3a.S3AStorageStatistics;
|
|
|
-import org.apache.hadoop.fs.s3a.S3ListRequest;
|
|
|
-import org.apache.hadoop.fs.s3a.S3ListResult;
|
|
|
-import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
|
|
|
-import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;
|
|
|
-import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata;
|
|
|
-import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider;
|
|
|
-import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
|
|
|
-import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
|
|
|
-import org.apache.hadoop.fs.s3a.s3guard.RenameTracker;
|
|
|
-import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
|
|
|
-import org.apache.hadoop.io.retry.RetryPolicies;
|
|
|
-import org.apache.hadoop.security.UserGroupInformation;
|
|
|
-import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
|
|
|
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
|
|
|
|
|
|
import static org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport.ACCESS_DENIED;
|
|
|
import static org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport.removeUndeletedPaths;
|
|
@@ -93,8 +58,8 @@ public class TestPartialDeleteFailures {
|
|
|
|
|
|
@Before
|
|
|
public void setUp() throws Exception {
|
|
|
- context = createMockStoreContext(true,
|
|
|
- new OperationTrackingStore());
|
|
|
+ context = S3ATestUtils.createMockStoreContext(true,
|
|
|
+ new S3ATestUtils.OperationTrackingStore(), CONTEXT_ACCESSORS);
|
|
|
}
|
|
|
|
|
|
@Test
|
|
@@ -187,9 +152,10 @@ public class TestPartialDeleteFailures {
|
|
|
final List<Path> deleteAllowed = Lists.newArrayList(pathA, pathAC);
|
|
|
MultiObjectDeleteException ex = createDeleteException(ACCESS_DENIED,
|
|
|
deleteForbidden);
|
|
|
- OperationTrackingStore store
|
|
|
- = new OperationTrackingStore();
|
|
|
- StoreContext storeContext = createMockStoreContext(true, store);
|
|
|
+ S3ATestUtils.OperationTrackingStore store
|
|
|
+ = new S3ATestUtils.OperationTrackingStore();
|
|
|
+ StoreContext storeContext = S3ATestUtils
|
|
|
+ .createMockStoreContext(true, store, CONTEXT_ACCESSORS);
|
|
|
MultiObjectDeleteSupport deleteSupport
|
|
|
= new MultiObjectDeleteSupport(storeContext, null);
|
|
|
Triple<List<Path>, List<Path>, List<Pair<Path, IOException>>>
|
|
@@ -210,174 +176,6 @@ public class TestPartialDeleteFailures {
|
|
|
}
|
|
|
|
|
|
|
|
|
- private StoreContext createMockStoreContext(boolean multiDelete,
|
|
|
- OperationTrackingStore store) throws URISyntaxException, IOException {
|
|
|
- URI name = new URI("s3a://bucket");
|
|
|
- Configuration conf = new Configuration();
|
|
|
- return new StoreContextBuilder().setFsURI(name)
|
|
|
- .setBucket("bucket")
|
|
|
- .setConfiguration(conf)
|
|
|
- .setUsername("alice")
|
|
|
- .setOwner(UserGroupInformation.getCurrentUser())
|
|
|
- .setExecutor(BlockingThreadPoolExecutorService.newInstance(
|
|
|
- 4,
|
|
|
- 4,
|
|
|
- 10, TimeUnit.SECONDS,
|
|
|
- "s3a-transfer-shared"))
|
|
|
- .setExecutorCapacity(Constants.DEFAULT_EXECUTOR_CAPACITY)
|
|
|
- .setInvoker(
|
|
|
- new Invoker(RetryPolicies.TRY_ONCE_THEN_FAIL, Invoker.LOG_EVENT))
|
|
|
- .setInstrumentation(new S3AInstrumentation(name))
|
|
|
- .setStorageStatistics(new S3AStorageStatistics())
|
|
|
- .setInputPolicy(S3AInputPolicy.Normal)
|
|
|
- .setChangeDetectionPolicy(
|
|
|
- ChangeDetectionPolicy.createPolicy(ChangeDetectionPolicy.Mode.None,
|
|
|
- ChangeDetectionPolicy.Source.ETag, false))
|
|
|
- .setMultiObjectDeleteEnabled(multiDelete)
|
|
|
- .setMetadataStore(store)
|
|
|
- .setUseListV1(false)
|
|
|
- .setContextAccessors(CONTEXT_ACCESSORS)
|
|
|
- .setTimeProvider(new S3Guard.TtlTimeProvider(conf))
|
|
|
- .build();
|
|
|
- }
|
|
|
-
|
|
|
- private static class MinimalListingOperationCallbacks
|
|
|
- implements ListingOperationCallbacks {
|
|
|
- @Override
|
|
|
- public CompletableFuture<S3ListResult> listObjectsAsync(
|
|
|
- S3ListRequest request)
|
|
|
- throws IOException {
|
|
|
- return null;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public CompletableFuture<S3ListResult> continueListObjectsAsync(
|
|
|
- S3ListRequest request,
|
|
|
- S3ListResult prevResult)
|
|
|
- throws IOException {
|
|
|
- return null;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public S3ALocatedFileStatus toLocatedFileStatus(
|
|
|
- S3AFileStatus status) throws IOException {
|
|
|
- return null;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public S3ListRequest createListObjectsRequest(
|
|
|
- String key,
|
|
|
- String delimiter) {
|
|
|
- return null;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public long getDefaultBlockSize(Path path) {
|
|
|
- return 0;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public int getMaxKeys() {
|
|
|
- return 0;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public ITtlTimeProvider getUpdatedTtlTimeProvider() {
|
|
|
- return null;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public boolean allowAuthoritative(Path p) {
|
|
|
- return false;
|
|
|
- }
|
|
|
- }
|
|
|
- private static class MinimalOperationCallbacks
|
|
|
- implements OperationCallbacks {
|
|
|
- @Override
|
|
|
- public S3ObjectAttributes createObjectAttributes(
|
|
|
- Path path,
|
|
|
- String eTag,
|
|
|
- String versionId,
|
|
|
- long len) {
|
|
|
- return null;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public S3ObjectAttributes createObjectAttributes(
|
|
|
- S3AFileStatus fileStatus) {
|
|
|
- return null;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public S3AReadOpContext createReadContext(
|
|
|
- FileStatus fileStatus) {
|
|
|
- return null;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void finishRename(
|
|
|
- Path sourceRenamed,
|
|
|
- Path destCreated)
|
|
|
- throws IOException {
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void deleteObjectAtPath(
|
|
|
- Path path,
|
|
|
- String key,
|
|
|
- boolean isFile,
|
|
|
- BulkOperationState operationState)
|
|
|
- throws IOException {
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public RemoteIterator<S3ALocatedFileStatus> listFilesAndEmptyDirectories(
|
|
|
- Path path,
|
|
|
- S3AFileStatus status,
|
|
|
- boolean collectTombstones,
|
|
|
- boolean includeSelf)
|
|
|
- throws IOException {
|
|
|
- return null;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public CopyResult copyFile(
|
|
|
- String srcKey,
|
|
|
- String destKey,
|
|
|
- S3ObjectAttributes srcAttributes,
|
|
|
- S3AReadOpContext readContext)
|
|
|
- throws IOException {
|
|
|
- return null;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public DeleteObjectsResult removeKeys(
|
|
|
- List<DeleteObjectsRequest.KeyVersion> keysToDelete,
|
|
|
- boolean deleteFakeDir,
|
|
|
- List<Path> undeletedObjectsOnFailure,
|
|
|
- BulkOperationState operationState,
|
|
|
- boolean quiet)
|
|
|
- throws MultiObjectDeleteException, AmazonClientException,
|
|
|
- IOException {
|
|
|
- return null;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public boolean allowAuthoritative(Path p) {
|
|
|
- return false;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public RemoteIterator<S3AFileStatus> listObjects(
|
|
|
- Path path,
|
|
|
- String key)
|
|
|
- throws IOException {
|
|
|
- return null;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
private static class MinimalContextAccessor implements ContextAccessors {
|
|
|
|
|
|
@Override
|
|
@@ -406,155 +204,5 @@ public class TestPartialDeleteFailures {
|
|
|
return path;
|
|
|
}
|
|
|
}
|
|
|
- /**
|
|
|
- * MetadataStore which tracks what is deleted and added.
|
|
|
- */
|
|
|
- private static class OperationTrackingStore implements MetadataStore {
|
|
|
-
|
|
|
- private final List<Path> deleted = new ArrayList<>();
|
|
|
-
|
|
|
- private final List<Path> created = new ArrayList<>();
|
|
|
-
|
|
|
- @Override
|
|
|
- public void initialize(final FileSystem fs,
|
|
|
- ITtlTimeProvider ttlTimeProvider) {
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void initialize(final Configuration conf,
|
|
|
- ITtlTimeProvider ttlTimeProvider) {
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void forgetMetadata(final Path path) {
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public PathMetadata get(final Path path) {
|
|
|
- return null;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public PathMetadata get(final Path path,
|
|
|
- final boolean wantEmptyDirectoryFlag) {
|
|
|
- return null;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public DirListingMetadata listChildren(final Path path) {
|
|
|
- return null;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void put(final PathMetadata meta) {
|
|
|
- put(meta, null);
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void put(final PathMetadata meta,
|
|
|
- final BulkOperationState operationState) {
|
|
|
- created.add(meta.getFileStatus().getPath());
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void put(final Collection<? extends PathMetadata> metas,
|
|
|
- final BulkOperationState operationState) {
|
|
|
- metas.stream().forEach(meta -> put(meta, null));
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void put(final DirListingMetadata meta,
|
|
|
- final List<Path> unchangedEntries,
|
|
|
- final BulkOperationState operationState) {
|
|
|
- created.add(meta.getPath());
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void destroy() {
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void delete(final Path path,
|
|
|
- final BulkOperationState operationState) {
|
|
|
- deleted.add(path);
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void deletePaths(final Collection<Path> paths,
|
|
|
- @Nullable final BulkOperationState operationState)
|
|
|
- throws IOException {
|
|
|
- deleted.addAll(paths);
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void deleteSubtree(final Path path,
|
|
|
- final BulkOperationState operationState) {
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void move(@Nullable final Collection<Path> pathsToDelete,
|
|
|
- @Nullable final Collection<PathMetadata> pathsToCreate,
|
|
|
- @Nullable final BulkOperationState operationState) {
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void prune(final PruneMode pruneMode, final long cutoff) {
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public long prune(final PruneMode pruneMode,
|
|
|
- final long cutoff,
|
|
|
- final String keyPrefix) {
|
|
|
- return 0;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public BulkOperationState initiateBulkWrite(
|
|
|
- final BulkOperationState.OperationType operation,
|
|
|
- final Path dest) {
|
|
|
- return null;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void setTtlTimeProvider(ITtlTimeProvider ttlTimeProvider) {
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public Map<String, String> getDiagnostics() {
|
|
|
- return null;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void updateParameters(final Map<String, String> parameters) {
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void close() {
|
|
|
- }
|
|
|
-
|
|
|
- public List<Path> getDeleted() {
|
|
|
- return deleted;
|
|
|
- }
|
|
|
-
|
|
|
- public List<Path> getCreated() {
|
|
|
- return created;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public RenameTracker initiateRenameOperation(
|
|
|
- final StoreContext storeContext,
|
|
|
- final Path source,
|
|
|
- final S3AFileStatus sourceStatus,
|
|
|
- final Path dest) {
|
|
|
- throw new UnsupportedOperationException("unsupported");
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void addAncestors(final Path qualifiedPath,
|
|
|
- @Nullable final BulkOperationState operationState) {
|
|
|
-
|
|
|
- }
|
|
|
- }
|
|
|
|
|
|
}
|